Documentation
¶
Overview ¶
Package concurrency ships Harbor's runtime concurrency primitives — Phase 14 of the runtime kernel chain (RFC §6.1).
Two stateless helpers:
- MapConcurrent: runs fn over each input envelope with at most maxConcurrency goroutines in flight; preserves output order.
- JoinK: reads exactly K envelopes from a channel; cancels the remaining producers via ctx; short-read returns ErrJoinKShortRead.
Both are pure functions — no shared state, no compiled artifacts, trivially safe to call from N concurrent runs (D-025 N/A by construction).
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidConcurrency = errors.New("concurrency: maxConcurrency must be > 0")
ErrInvalidConcurrency — MapConcurrent was called with maxConcurrency <= 0. Wraps the offending value for operator visibility.
var ErrInvalidK = errors.New("concurrency: k must be > 0")
ErrInvalidK — JoinK was called with k <= 0.
var ErrJoinKShortRead = errors.New("concurrency: JoinK source closed before K envelopes arrived")
ErrJoinKShortRead — the input channel closed before K envelopes arrived. The slice returned alongside the error contains however many envelopes did arrive (may be empty).
Functions ¶
func JoinK ¶
JoinK reads exactly K envelopes from `in`. After K, derives ctx cancellation so upstream producers blocked on send observe the cancellation and exit. Returns the K envelopes (caller-owned slice).
If `in` closes before K arrive, returns ErrJoinKShortRead alongside the partial slice. If the caller's ctx cancels mid-read, returns ctx.Err() with the partial slice (which may be empty).
Cancellation note: JoinK derives a child ctx and returns it via the `cancel` parameter so the caller wires upstream producers to honor it. Callers who don't need upstream cancellation can ignore the derived ctx; the cancel function is invoked internally on the happy path.
nil channel is rejected at call time. K must be > 0.
func MapConcurrent ¶
func MapConcurrent( ctx context.Context, in []messages.Envelope, fn func(context.Context, messages.Envelope) (messages.Envelope, error), maxConcurrency int, ) ([]messages.Envelope, error)
MapConcurrent runs fn over each envelope in `in` with at most maxConcurrency goroutines in flight. Output preserves input order. Returns the first error encountered; remaining work is cancelled.
Cancellation: the function derives a child ctx from the caller's ctx. On the first fn error, the child ctx is cancelled, so any in-flight fn invocations that honor ctx will exit promptly. The caller's ctx is never cancelled.
Order preservation: a pre-allocated output slice is indexed by input position; goroutines write to their own slot. No locks needed for the slice itself — distinct indices are written by distinct goroutines.
nil fn is rejected at call time. Empty input returns nil, nil.
Types ¶
This section is empty.