Documentation ¶
Overview ¶
Package pipeline provides channel-based functional primitives with persistent worker pools.
Unlike [slice.FanOut] (semaphore-per-call, push model), pipeline functions use persistent worker goroutines that pull from input channels. Blocked workers naturally stop pulling, creating backpressure upstream.
The core primitive is Map, which applies a call.Func to each input using N workers while preserving input order. Compose resilience first via call decorators (Retry, CircuitBreaker, Throttle), then execute through Map:
	fn := fetchOrder.With(call.Retrier(3, call.ExponentialBackoff(time.Second), isRetryable))
	results := pipeline.Map(ctx, orderIDs, fn, 8)
MapUnordered emits results in completion order for higher throughput.
Supporting primitives (Filter, Batch, Merge, Tee) compose freely with Map. They operate on plain T values, so when T is rslt.Result, error results pass through them like any other value.
All functions respect context cancellation and guarantee no goroutine leaks.
Index ¶
- func Batch[T any](ctx context.Context, in <-chan T, size int) <-chan []T
- func Filter[T any](ctx context.Context, in <-chan T, fn func(T) bool) <-chan T
- func FromSlice[T any](ctx context.Context, ts []T) <-chan T
- func Generate[T any](ctx context.Context, fn func() (T, bool)) <-chan T
- func Map[T, R any](ctx context.Context, in <-chan T, fn call.Func[T, R], workers int) <-chan rslt.Result[R]
- func MapUnordered[T, R any](ctx context.Context, in <-chan T, fn call.Func[T, R], workers int) <-chan rslt.Result[R]
- func Merge[T any](ctx context.Context, ins ...<-chan T) <-chan T
- func Tee[T any](ctx context.Context, in <-chan T, n int) []<-chan T
Functions ¶
func Batch ¶
func Batch[T any](ctx context.Context, in <-chan T, size int) <-chan []T
Batch collects elements into slices of the given size. Emits a partial batch when input closes mid-batch. Each emitted slice is an independent copy. Panics if size <= 0.
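The batching behavior described above can be sketched with the standard library alone. The lowercase batch function and the feed and demoBatch helpers are hypothetical names written for this illustration. They are not the package's implementation, only one way the documented semantics (full batches, a trailing partial batch, independent copies, a panic on non-positive size) could be realized.

```go
package main

import "context"

// feed is a hypothetical helper that sends vals on a channel and closes it.
func feed(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// batch is an illustrative stand-in for Batch, not the package's code.
func batch[T any](ctx context.Context, in <-chan T, size int) <-chan []T {
	if size <= 0 {
		panic("batch: size must be positive")
	}
	out := make(chan []T)
	go func() {
		defer close(out)
		buf := make([]T, 0, size)
		// flush emits a copy of buf so no two batches share a backing array.
		flush := func() bool {
			if len(buf) == 0 {
				return true
			}
			b := make([]T, len(buf))
			copy(b, buf)
			buf = buf[:0]
			select {
			case out <- b:
				return true
			case <-ctx.Done():
				return false
			}
		}
		for {
			select {
			case v, ok := <-in:
				if !ok {
					flush() // input closed mid-batch: emit the partial batch
					return
				}
				buf = append(buf, v)
				if len(buf) == size && !flush() {
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demoBatch groups the values 1..5 into batches of two.
func demoBatch() [][]int {
	var got [][]int
	for b := range batch(context.Background(), feed(1, 2, 3, 4, 5), 2) {
		got = append(got, b)
	}
	return got
}
```

Calling demoBatch yields two full batches followed by the one-element partial batch [5].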
func Filter ¶
func Filter[T any](ctx context.Context, in <-chan T, fn func(T) bool) <-chan T
Filter sends only elements where fn returns true. Single goroutine — no concurrency needed for a pure predicate. fn must not be nil.
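A single-goroutine filter stage of this kind might look like the following sketch. The names filter, feed, and demoFilter are hypothetical and exist only for this example, which uses nothing beyond the standard library and makes no claim about how the package itself is written.

```go
package main

import "context"

// feed is a hypothetical helper that sends vals on a channel and closes it.
func feed(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// filter is an illustrative stand-in for Filter, not the package's code.
// One goroutine reads, tests, and forwards; the predicate is assumed pure.
func filter[T any](ctx context.Context, in <-chan T, keep func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return // input closed: close the output too
				}
				if !keep(v) {
					continue // drop the element and read the next one
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demoFilter keeps the even values from 1..6.
func demoFilter() []int {
	isEven := func(n int) bool { return n%2 == 0 }
	var got []int
	for v := range filter(context.Background(), feed(1, 2, 3, 4, 5, 6), isEven) {
		got = append(got, v)
	}
	return got
}
```

Because a single goroutine forwards elements, the output keeps the input's relative order: demoFilter returns 2, 4, 6.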
func FromSlice ¶
func FromSlice[T any](ctx context.Context, ts []T) <-chan T
FromSlice sends each element of ts to the returned channel, then closes it. Respects ctx cancellation.
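As a rough illustration of a slice-backed source that honors cancellation, consider the sketch below. fromSlice and demoFromSlice are hypothetical names; the code is a standard-library approximation of the described behavior rather than the package's own source.

```go
package main

import "context"

// fromSlice is an illustrative stand-in for FromSlice, not the package's code.
func fromSlice[T any](ctx context.Context, ts []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out) // closes after the last element or on cancellation
		for _, t := range ts {
			select {
			case out <- t:
			case <-ctx.Done():
				return // stop early so the goroutine does not leak
			}
		}
	}()
	return out
}

// demoFromSlice reads a three-element slice back off the channel.
func demoFromSlice() []string {
	var got []string
	for s := range fromSlice(context.Background(), []string{"a", "b", "c"}) {
		got = append(got, s)
	}
	return got
}
```

The select on ctx.Done is what lets the producer exit when a consumer stops reading before the slice is exhausted.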
func Generate ¶
func Generate[T any](ctx context.Context, fn func() (T, bool)) <-chan T
Generate calls fn repeatedly, sending results to the returned channel. fn returns (value, more). When more is false or ctx cancels, the channel closes. fn must not be nil.
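The (value, more) protocol can be illustrated with a small standard-library sketch. generate and demoGenerate are hypothetical names, and one detail is an assumption of this sketch rather than something the text states: the value returned alongside more == false is discarded instead of being sent.

```go
package main

import "context"

// generate is an illustrative stand-in for Generate, not the package's code.
// Assumption: the value that accompanies more == false is not emitted.
func generate[T any](ctx context.Context, next func() (T, bool)) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for ctx.Err() == nil { // re-check cancellation before each call
			v, more := next()
			if !more {
				return
			}
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demoGenerate runs a finite generator to completion, then takes three
// values from an endless generator and cancels it.
func demoGenerate() (finite, firstThree []int) {
	n := 0
	countToThree := func() (int, bool) {
		n++
		return n, n <= 3
	}
	for v := range generate(context.Background(), countToThree) {
		finite = append(finite, v)
	}

	ctx, cancel := context.WithCancel(context.Background())
	m := 0
	forever := func() (int, bool) {
		m++
		return m, true
	}
	ch := generate(ctx, forever)
	for i := 0; i < 3; i++ {
		firstThree = append(firstThree, <-ch)
	}
	cancel()
	for range ch {
		// Drain until the producer observes cancellation and closes ch.
	}
	return finite, firstThree
}
```

The endless generator shows why cancellation matters: without it, the producer goroutine would have no way to stop.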
func Map ¶
func Map[T, R any](ctx context.Context, in <-chan T, fn call.Func[T, R], workers int) <-chan rslt.Result[R]
Map applies fn to each input using workers persistent goroutines (pull model). A reorder buffer makes output order match input order. Because workers pull from the input channel, a blocked worker stops pulling, which creates natural backpressure. A panic in fn is recovered as *rslt.PanicError. Map itself panics if workers <= 0. fn must not be nil.
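One common way to get ordered output from a worker pool is sketched below. It is a swapped-in technique, not a description of how Map is implemented: a dispatcher queues one single-slot result channel per input in arrival order, workers fill the slots as they finish, and an emitter reads the slots first-in, first-out. All names (mapOrdered, job, feed, demoMapOrdered) are hypothetical, fn is simplified to a plain func(T) R instead of call.Func, and panic recovery is left out.

```go
package main

import (
	"context"
	"time"
)

// job pairs an input with the single-slot channel that receives its result.
type job[T, R any] struct {
	in  T
	out chan R
}

// feed is a hypothetical helper that sends vals on a channel and closes it.
func feed(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// mapOrdered is an illustrative sketch, not the package's Map.
func mapOrdered[T, R any](ctx context.Context, in <-chan T, fn func(T) R, workers int) <-chan R {
	if workers <= 0 {
		panic("mapOrdered: workers must be positive")
	}
	jobs := make(chan job[T, R])
	slots := make(chan chan R, workers) // bounded FIFO of pending results
	out := make(chan R)

	// Dispatcher: records input order by queueing a slot before handing off the job.
	go func() {
		defer close(jobs)
		defer close(slots)
		for {
			var v T
			var ok bool
			select {
			case v, ok = <-in:
				if !ok {
					return
				}
			case <-ctx.Done():
				return
			}
			slot := make(chan R, 1)
			select {
			case slots <- slot: // blocks when too many results are pending
			case <-ctx.Done():
				return
			}
			select {
			case jobs <- job[T, R]{in: v, out: slot}:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Persistent workers: pull jobs and fill each job's slot.
	for i := 0; i < workers; i++ {
		go func() {
			for j := range jobs {
				j.out <- fn(j.in) // never blocks: the slot has capacity 1
			}
		}()
	}

	// Emitter: reads slots in queue order, so output matches input order.
	go func() {
		defer close(out)
		for slot := range slots {
			select {
			case r := <-slot:
				select {
				case out <- r:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demoMapOrdered squares 1..6; earlier inputs sleep longer than later ones.
func demoMapOrdered() []int {
	square := func(n int) int {
		time.Sleep(time.Duration(7-n) * time.Millisecond)
		return n * n
	}
	var got []int
	for r := range mapOrdered(context.Background(), feed(1, 2, 3, 4, 5, 6), square, 3) {
		got = append(got, r)
	}
	return got
}
```

Even though later inputs finish sooner in demoMapOrdered, the results come out as 1, 4, 9, 16, 25, 36. The bounded slots queue also limits how far the pool can run ahead of a slow head-of-line item.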
func MapUnordered ¶
func MapUnordered[T, R any](ctx context.Context, in <-chan T, fn call.Func[T, R], workers int) <-chan rslt.Result[R]
MapUnordered applies fn to each input using workers persistent goroutines. Results are emitted in completion order, which gives higher throughput when processing times vary. A panic in fn is recovered as *rslt.PanicError. MapUnordered itself panics if workers <= 0. fn must not be nil.
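The pull model with completion-order output can be approximated in a few lines of standard-library Go. In this sketch the names mapUnordered, feed, and demoMapUnordered are hypothetical, fn is simplified to a plain func(T) R rather than call.Func, and panic recovery is omitted. It is meant to show the shape of the idea, not the package's implementation.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// feed is a hypothetical helper that sends vals on a channel and closes it.
func feed(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// mapUnordered is an illustrative sketch, not the package's MapUnordered.
func mapUnordered[T, R any](ctx context.Context, in <-chan T, fn func(T) R, workers int) <-chan R {
	if workers <= 0 {
		panic("mapUnordered: workers must be positive")
	}
	out := make(chan R)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for {
				select {
				case v, ok := <-in:
					if !ok {
						return
					}
					r := fn(v)
					// While blocked here the worker does not read from in,
					// which is where backpressure comes from.
					select {
					case out <- r:
					case <-ctx.Done():
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	// Close out once every worker has exited.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// demoMapUnordered squares 1..3 with uneven delays and returns the results
// in whatever order they complete.
func demoMapUnordered() []int {
	square := func(n int) int {
		time.Sleep(time.Duration(4-n) * 5 * time.Millisecond)
		return n * n
	}
	var got []int
	for r := range mapUnordered(context.Background(), feed(1, 2, 3), square, 3) {
		got = append(got, r)
	}
	return got
}
```

The order of the values returned by demoMapUnordered depends on scheduling and timing, so callers should rely only on the set of results, not on their sequence.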