Documentation ¶
Overview ¶
Package toc provides a constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints), with pipeline composition via Pipe, NewBatcher, NewTee, and NewMerge.
A Stage owns a bounded input queue and one or more workers with bounded concurrency. Producers submit items via Stage.Submit; the stage processes them through fn and emits results on Stage.Out. When the stage is saturated, Submit blocks — this is the "rope" limiting upstream WIP.
The stage tracks constraint utilization, idle time, and output-blocked time via Stage.Stats, helping operators assess whether this stage is acting as the constraint and whether downstream backpressure is suppressing throughput.
Single-Stage Lifecycle ¶
1. Start a stage with Start
2. Submit items with Stage.Submit from one or more goroutines
3. Call Stage.CloseInput when all submissions are done (see below)
4. Read results from Stage.Out until closed; callers must drain it to completion
5. Call Stage.Wait (or Stage.Cause) to block until shutdown completes
Or combine steps 4-5: Stage.DiscardAndWait / Stage.DiscardAndCause
The goroutine or coordinator that knows no more submissions will occur owns CloseInput. In single-producer code, deferring CloseInput in the submitting goroutine is a good safety net. With multiple producers, a coordinator should call CloseInput after all producers finish (e.g., after a sync.WaitGroup). CloseInput is also called internally on fail-fast error or parent context cancellation, so the input side closes automatically on abnormal paths.
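The multi-producer ownership rule can be sketched with the standard library alone. In this minimal sketch a plain channel stands in for the stage input, a channel send stands in for Stage.Submit, and close stands in for Stage.CloseInput; runProducers and its parameters are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// runProducers starts n producers that each send per items into a shared
// channel. A separate coordinator closes the channel once every producer
// has finished. The channel is a stand-in for a stage's input, and close(in)
// is a stand-in for CloseInput.
func runProducers(n, per int) int {
	in := make(chan int)
	var wg sync.WaitGroup
	for p := 0; p < n; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < per; i++ {
				in <- id*100 + i
			}
		}(p)
	}

	// Coordinator: the only goroutine that knows no more submissions will occur.
	go func() {
		wg.Wait()
		close(in)
	}()

	// Consumer: drain until closed.
	count := 0
	for range in {
		count++
	}
	return count
}

func main() {
	fmt.Println(runProducers(3, 4)) // prints 12
}
```

No producer closes the input itself, so a slow producer can never hit a send on a closed channel.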
Cardinality: under the liveness conditions below, every Stage.Submit that returns nil yields exactly one rslt.Result on Stage.Out. Submit calls that return an error produce no result.
Operational notes: callers must drain Stage.Out until closed, or use Stage.DiscardAndWait / Stage.DiscardAndCause. If callers stop draining Out, workers block on result delivery and Stage.Wait / Stage.Cause may never return. If fn blocks forever or ignores cancellation, the stage leaks goroutines and never completes. See Stage.Out for full liveness details. Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). See Stage.Cause for terminal-status semantics (Wait vs Cause). See Stage.Wait for completion semantics.
Pipeline Composition ¶
Pipe composes stages by reading from an upstream Result channel, forwarding Ok values to workers and passing Err values directly to the output (error passthrough). NewBatcher accumulates items into fixed-count batches between stages. NewWeightedBatcher accumulates items into weight-based batches (flush when accumulated weight reaches threshold). NewTee broadcasts each item to N branches (synchronous lockstep — slowest consumer governs pace). NewMerge recombines multiple upstream Result channels into a single nondeterministic stream (fan-in). NewJoin recombines two branch results into one combined output (strict branch recombination — one item from each source, combined via a function).
Pipelines have two error planes: data-plane errors (per-item rslt.Err in Stage.Out) and control-plane errors (Stage.Wait / Stage.Cause). Forwarded upstream errors are data-plane only — they never trigger fail-fast in the downstream stage.
See the package README for pipeline lifecycle contract, cancellation topology, and selection rubric (call.MapErr vs toc.Pipe).
This package is for pipelines with a known bottleneck stage. If you don't know your constraint, profile first.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	// double concatenates a string with itself.
	double := func(_ context.Context, s string) (string, error) {
		return s + s, nil
	}

	ctx := context.Background()

	// Start's ctx governs stage lifetime; Submit's ctx bounds only admission.
	stage := toc.Start(ctx, double, toc.Options[string]{Capacity: 3, Workers: 1})

	go func() {
		defer stage.CloseInput()
		for _, item := range []string{"a", "b", "c"} {
			if err := stage.Submit(ctx, item); err != nil {
				break
			}
		}
	}()

	for result := range stage.Out() {
		val, err := result.Unpack()
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(val)
	}

	if err := stage.Wait(); err != nil {
		fmt.Println("stage error:", err)
	}
}
Output:
aa
bb
cc
Example (Pipe) ¶
Example_pipe demonstrates basic error passthrough through a Pipe stage.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/rslt"
	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	src := make(chan rslt.Result[int], 3)
	src <- rslt.Ok(10)
	src <- rslt.Err[int](errors.New("oops"))
	src <- rslt.Ok(20)
	close(src)

	// doubleFn doubles the input.
	doubleFn := func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	}

	stage := toc.Pipe(ctx, src, doubleFn, toc.Options[int]{})

	for r := range stage.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	stage.Wait()

	// Forwarded errors bypass the worker queue, so the error
	// may arrive before queued Ok results complete.
}
Output:
error: oops
20
40
Example (Pipeline) ¶
Example_pipeline demonstrates a four-handle pipeline modeled on the era-indexer: Start → Batcher → Pipe → Pipe, with error passthrough and reverse-order Wait.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	// Stage 1: "chunk" each number into a string representation.
	chunkFn := func(_ context.Context, n int) (string, error) {
		if n == 3 {
			return "", errors.New("bad input: 3")
		}
		return fmt.Sprintf("chunk(%d)", n), nil
	}
	chunker := toc.Start(ctx, chunkFn, toc.Options[int]{Capacity: 5, ContinueOnError: true})

	// Stage 2: Batch strings into groups of 2.
	batched := toc.NewBatcher(ctx, chunker.Out(), 2)

	// Stage 3: "embed" each batch by joining.
	embedFn := func(_ context.Context, batch []string) (string, error) {
		result := ""
		for i, s := range batch {
			if i > 0 {
				result += "+"
			}
			result += s
		}
		return fmt.Sprintf("embed[%s]", result), nil
	}
	embedder := toc.Pipe(ctx, batched.Out(), embedFn, toc.Options[[]string]{})

	// Stage 4: "store" each embedding by wrapping it in store(...).
	storeFn := func(_ context.Context, s string) (string, error) {
		return fmt.Sprintf("store(%s)", s), nil
	}
	storer := toc.Pipe(ctx, embedder.Out(), storeFn, toc.Options[string]{})

	// Feed the head stage.
	go func() {
		defer chunker.CloseInput()
		for _, n := range []int{1, 2, 3, 4, 5} {
			if err := chunker.Submit(ctx, n); err != nil {
				break
			}
		}
	}()

	// Drain the tail.
	for r := range storer.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	// Wait in reverse order (recommended).
	storer.Wait()
	embedder.Wait()
	batched.Wait()
	chunker.Wait()

	// Forwarded errors bypass worker queues, so the error from item 3
	// may arrive before the batch containing items 1-2 is processed.
}
Output:
error: bad input: 3
store(embed[chunk(1)+chunk(2)])
store(embed[chunk(4)+chunk(5)])
Index ¶
- Variables
- func Adapt(name string, prev, curr Stats, elapsed time.Duration) core.StageObservation
- func BufferCapacity(throughput float64, protectionTime time.Duration) int
- func FromChan[T any](ch <-chan T) <-chan rslt.Result[T]
- type Batcher
- type BatcherStats
- type BufferZone
- type FocusingStep
- type IncomingEdge
- type IntervalStats
- type Join
- type JoinStats
- type MemoryFeverZone
- type Merge
- type MergeStats
- type MissingResultError
- type Options
- type Pipeline
- func (p *Pipeline) AddEdge(from, to string)
- func (p *Pipeline) AddEdgeWithRatio(from, to string, ratio int)
- func (p *Pipeline) AddStage(name string, stats func() Stats)
- func (p *Pipeline) AncestorsOf(target string) []string
- func (p *Pipeline) DirectPredecessors(name string) []string
- func (p *Pipeline) EdgeRatio(from, to string) int
- func (p *Pipeline) Freeze()
- func (p *Pipeline) HasPath(from, to string) bool
- func (p *Pipeline) Heads() []string
- func (p *Pipeline) HeadsTo(target string) []string
- func (p *Pipeline) Incoming(name string) []IncomingEdge
- func (p *Pipeline) StageStats(name string) func() Stats
- func (p *Pipeline) Stages() []string
- type Reporter
- type ReporterOption
- type RopeController
- type RopeOption
- type RopeStats
- type ServiceTimeSummary
- type Stage
- func (s *Stage[T, R]) ActiveWorkers() int
- func (s *Stage[T, R]) Cause() error
- func (s *Stage[T, R]) CloseInput()
- func (s *Stage[T, R]) DiscardAndCause() error
- func (s *Stage[T, R]) DiscardAndWait() error
- func (s *Stage[T, R]) MaxWIP() int
- func (s *Stage[T, R]) MaxWIPWeight() int64
- func (s *Stage[T, R]) Out() <-chan rslt.Result[R]
- func (s *Stage[T, R]) PauseAdmission()
- func (s *Stage[T, R]) Paused() bool
- func (s *Stage[T, R]) ResumeAdmission()
- func (s *Stage[T, R]) SetMaxWIP(n int) int
- func (s *Stage[T, R]) SetMaxWIPWeight(n int64) int64
- func (s *Stage[T, R]) SetWorkers(n int) (int, error)
- func (s *Stage[T, R]) Stats() Stats
- func (s *Stage[T, R]) Submit(ctx context.Context, item T) error
- func (s *Stage[T, R]) TargetWorkers() int
- func (s *Stage[T, R]) Wait() error
- type Stats
- type Tee
- type TeeStats
- type WeightedBatcher
- type WeightedBatcherStats
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("toc: stage closed")
ErrClosed is returned by Stage.Submit when the stage is no longer accepting input — after Stage.CloseInput, fail-fast shutdown, or parent context cancellation. See Stage.Submit for race semantics.
var ErrStopping = errors.New("toc: stage is stopping")
ErrStopping is returned by SetWorkers when the stage is stopping or stopped.
var ErrWeightExceedsLimit = errors.New("toc: item weight exceeds MaxWIPWeight")
ErrWeightExceedsLimit is returned by Stage.Submit when the item's weight exceeds the current Options.MaxWIPWeight. The item is rejected immediately and never enqueued.
Functions ¶
func Adapt ¶ added in v0.84.0
func Adapt(name string, prev, curr Stats, elapsed time.Duration) core.StageObservation
Adapt converts a pair of consecutive Stats snapshots into a core.StageObservation for the deterministic analyzer. The elapsed duration is the wall-clock time between the two snapshots.
Work units are nanoseconds. CapacityWork is approximated as avgWorkers × elapsed (same approximation as the legacy analyzer). For exact CapacityWork, use a simulation adapter that tracks integrated worker-time per tick.
func BufferCapacity ¶ added in v0.82.0
func BufferCapacity(throughput float64, protectionTime time.Duration) int
BufferCapacity computes how many items a buffer needs to hold to protect the drum from starvation over a given protection window. Goldratt's formula: ceil(throughput × protectionTime).
The caller provides protectionTime — it might come from upstream lead time measurement, drum P95, or a configured value.
Returns 0 if throughput <= 0 or protectionTime <= 0.
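The formula above can be sketched as follows. This is an illustration, not the package's implementation: bufferCapacity is a hypothetical local function, and it assumes throughput is expressed in items per second.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// bufferCapacity is a hypothetical stand-in that applies
// ceil(throughput × protectionTime).
// Assumption: throughput is measured in items per second.
func bufferCapacity(throughput float64, protectionTime time.Duration) int {
	if throughput <= 0 || protectionTime <= 0 {
		return 0
	}
	return int(math.Ceil(throughput * protectionTime.Seconds()))
}

func main() {
	fmt.Println(bufferCapacity(10, 250*time.Millisecond)) // prints 3
	fmt.Println(bufferCapacity(2.5, 2*time.Second))       // prints 5
	fmt.Println(bufferCapacity(0, time.Second))           // prints 0
}
```

Rounding up matters: at 10 items/sec over 250ms the drum consumes 2.5 items, so a buffer of 2 would leave it starved for part of the window.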
Types ¶
type Batcher ¶
type Batcher[T any] struct { // contains filtered or unexported fields }
Batcher accumulates up to n Ok items from an upstream Result channel into batches, emitting each batch as rslt.Result[[]T]. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.
Created by NewBatcher. The zero value is not usable.
func NewBatcher ¶
NewBatcher creates a Batcher that reads from src, accumulates up to n Ok values per batch, and emits batches on Out(). Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.
The Batcher drains src to completion (source ownership rule), provided the consumer drains Batcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the Batcher blocks on output delivery and cannot drain src.
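The batch-boundary rule can be illustrated on a slice instead of a channel. In this minimal sketch, item, output, errBoom, and batchWithBoundaries are hypothetical names; cancellation and discard mode are not modeled, and the sketch assumes a trailing partial batch is flushed when the source ends.

```go
package main

import (
	"errors"
	"fmt"
)

// item models one upstream result: an Ok value or an error.
type item struct {
	val string
	err error
}

// output models one downstream result: a batch or a forwarded error.
type output struct {
	batch []string
	err   error
}

var errBoom = errors.New("boom")

// batchWithBoundaries groups up to n Ok values per batch. An error flushes
// the partial batch (if non-empty), is forwarded, and starts a fresh batch.
func batchWithBoundaries(src []item, n int) []output {
	var outs []output
	var acc []string
	flush := func() {
		if len(acc) > 0 {
			outs = append(outs, output{batch: acc})
			acc = nil
		}
	}
	for _, it := range src {
		if it.err != nil {
			flush()
			outs = append(outs, output{err: it.err})
			continue
		}
		acc = append(acc, it.val)
		if len(acc) == n {
			flush()
		}
	}
	flush() // assumption: the trailing partial batch is emitted at end of input
	return outs
}

func main() {
	src := []item{{val: "a"}, {val: "b"}, {val: "c"}, {err: errBoom}, {val: "d"}}
	for _, o := range batchWithBoundaries(src, 2) {
		if o.err != nil {
			fmt.Println("error:", o.err)
			continue
		}
		fmt.Println(o.batch)
	}
}
```

With n = 2 the sketch emits [a b], then the partial batch [c], then the error, then [d]: the error never lands inside a batch and never reorders the items around it.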
Panics if n <= 0, src is nil, or ctx is nil.
func (*Batcher[T]) Out ¶
Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.
Callers MUST drain Out to completion. If the consumer stops reading, the Batcher blocks and cannot drain its source, potentially causing upstream deadlocks.
func (*Batcher[T]) Stats ¶
func (b *Batcher[T]) Stats() BatcherStats
Stats returns approximate metrics. See BatcherStats for caveats. Stats are only reliable as final values after Batcher.Wait returns.
type BatcherStats ¶
type BatcherStats struct {
Received int64 // individual items consumed from src
Emitted int64 // individual Ok items included in emitted batches
Forwarded int64 // Err items forwarded downstream
Dropped int64 // items lost to shutdown/cancel (includes partial batch items)
BufferedDepth int64 // current items in partial batch accumulator
BatchCount int64 // number of batch results emitted
OutputBlockedTime time.Duration // cumulative time blocked sending to out
}
BatcherStats holds metrics for a Batcher.
Invariant (after Wait): Received = Emitted + Forwarded + Dropped.
func (BatcherStats) ToStats ¶ added in v0.80.0
func (s BatcherStats) ToStats() Stats
ToStats converts BatcherStats to Stats for use with the analyze package.
type BufferZone ¶ added in v0.74.0
type BufferZone int
BufferZone represents Goldratt's fever chart zone for buffer penetration.
const (
	// BufferGreen indicates the buffer is less than 33% full.
	// The constraint may be underutilized; rope may be too tight.
	BufferGreen BufferZone = iota
	// BufferYellow indicates the buffer is 33-66% full. Healthy operating zone.
	BufferYellow
	// BufferRed indicates the buffer is more than 66% full.
	// The constraint is about to starve; upstream can't keep up.
	BufferRed
)
func (BufferZone) String ¶ added in v0.74.0
func (z BufferZone) String() string
type FocusingStep ¶ added in v0.83.0
type FocusingStep int
FocusingStep represents which of Goldratt's Five Focusing Steps the pipeline is currently in.
The steps are not a state machine — they are a classification of the current system condition based on observable signals. Consumers call ClassifyStep per interval from their own coordination logic.
const (
	// StepIdentify is Step 1: identify the system's constraint.
	// No constraint has been identified yet: the analyzer is still
	// collecting data or no stage is saturated.
	StepIdentify FocusingStep = iota + 1
	// StepExploit is Step 2: exploit the constraint, don't waste it.
	// A constraint is identified but exploitation is incomplete: the
	// rope/buffer is not yet active, or the drum is starving.
	StepExploit
	// StepSubordinate is Step 3: subordinate everything else.
	// The constraint is identified, the rope is active, and the drum
	// is not starving. Non-constraints defer to the drum's pace.
	StepSubordinate
	// StepElevate is Step 4: elevate the constraint's capacity.
	// The rebalancer is actively moving resources to the constraint
	// (e.g., adding workers to the drum stage).
	StepElevate
	// StepPreventInertia is Step 5: if the constraint has moved,
	// go back to Step 1; do not allow inertia to become the
	// system's constraint. The old rope must be rebuilt for the
	// new drum.
	//
	// Constraint migration protocol: cancel old [RopeController]
	// context, call [NewRopeController] with the new drum. EWMA
	// state starts fresh; old drum's signals are irrelevant.
	//
	// Migration safety: the new rope controls a prefix of the old
	// chain. If the constraint moved upstream (embed → walk), the
	// new rope limits head → walk WIP, which limits what reaches
	// embed. The old drum's protection is preserved by reduced supply.
	//
	// CAUTION: do not remove workers from the old drum during
	// migration. If embed was elevated (workers added) and then
	// those workers are moved to walk, embed loses capacity and
	// may become the constraint again. Migration simulation
	// (examples/migration-sim) demonstrated this: stealing one
	// worker caused embed queue to spike and throughput to drop.
	// The rebalancer should NOT steal workers from a recently-
	// elevated stage until the new constraint is confirmed stable
	// across multiple intervals.
	StepPreventInertia
)
func ClassifyStep ¶ added in v0.83.0
func ClassifyStep(
	prevConstraint string,
	currConstraint string,
	ropeActive bool,
	rebalancing bool,
	starving bool,
) FocusingStep
ClassifyStep determines the current focusing step from system state. Pure function — no side effects.
The caller provides prev and curr constraint names so the comparison logic lives with the classification. The caller tracks prevConstraint across intervals (typically one string variable updated per tick).
Classification priority (highest first):
1. No constraint (currConstraint empty) → StepIdentify
2. Constraint changed (prev ≠ curr, both non-empty) → StepPreventInertia
3. Rebalancer active → StepElevate
4. Drum starving or rope not active → StepExploit
5. Otherwise → StepSubordinate
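The priority order above maps directly onto a switch whose cases are evaluated top to bottom. The following is a sketch of that ordering using hypothetical lower-case names (focusingStep, classifyStep); it mirrors the documented rules but is not the package source.

```go
package main

import "fmt"

// focusingStep is a local stand-in for the package's FocusingStep.
type focusingStep int

const (
	stepIdentify focusingStep = iota + 1
	stepExploit
	stepSubordinate
	stepElevate
	stepPreventInertia
)

// classifyStep applies the documented priority order; the first matching
// case wins.
func classifyStep(prev, curr string, ropeActive, rebalancing, starving bool) focusingStep {
	switch {
	case curr == "":
		return stepIdentify
	case prev != "" && prev != curr:
		return stepPreventInertia
	case rebalancing:
		return stepElevate
	case starving || !ropeActive:
		return stepExploit
	default:
		return stepSubordinate
	}
}

func main() {
	fmt.Println(classifyStep("", "", false, false, false))           // prints 1
	fmt.Println(classifyStep("embed", "walk", true, true, false))    // prints 5
	fmt.Println(classifyStep("embed", "embed", true, true, false))   // prints 4
	fmt.Println(classifyStep("embed", "embed", false, false, false)) // prints 2
	fmt.Println(classifyStep("embed", "embed", true, false, false))  // prints 3
}
```

Note the second call: a constraint change outranks an active rebalancer, so migration is reported even while workers are still being moved.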
func (FocusingStep) String ¶ added in v0.83.0
func (s FocusingStep) String() string
type IncomingEdge ¶ added in v0.80.0
type IncomingEdge struct {
From string // predecessor stage name
Ratio int // consumption ratio (items from From per unit of output)
}
IncomingEdge describes one incoming edge to a stage.
type IntervalStats ¶ added in v0.74.0
type IntervalStats struct {
Duration time.Duration
ResetDetected bool // true if any cumulative counter decreased
// Interval deltas (cumulative counter differences).
ItemsSubmitted int64
ItemsCompleted int64 // includes failed+panicked (same as Stats.Completed)
ItemsFailed int64 // subset of completed
ItemsCanceled int64
// Derived rates. Zero when Duration <= 0 or denominator is zero.
Throughput float64 // completed items/sec (includes failed)
Goodput float64 // successful completions/sec: (completed - failed) / elapsed
ArrivalRate float64 // submitted items/sec at this stage
ErrorRate float64 // failed / completed; bounded [0,1]
MeanServiceTime time.Duration // ServiceTimeDelta / ItemsCompleted
ApproxUtilization float64 // ServiceTimeDelta / (Duration * avg workers); approximate
// Interval time deltas (cumulative across all workers).
ServiceTimeDelta time.Duration
IdleTimeDelta time.Duration
OutputBlockedDelta time.Duration
// Point-in-time gauges from curr snapshot.
CurrBufferedDepth int64
CurrQueueCapacity int
CurrBufferPenetration float64 // depth/capacity; clamped to [0,1]; 0 if unbuffered
CurrActiveWorkers int
CurrTargetWorkers int
// Interval-derived queue signal.
QueueGrowthRate float64 // items/sec; negative = draining
}
IntervalStats holds per-interval deltas between two Stats snapshots. Fields prefixed with Curr are point-in-time gauges from the curr snapshot, not interval-derived. All other fields are computed from the delta between prev and curr.
func Delta ¶ added in v0.74.0
func Delta(prev, curr Stats, elapsed time.Duration) IntervalStats
Delta computes interval stats between two Stats snapshots. elapsed is the wall-clock time between the two samples. Both snapshots should come from the same stage.
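As a worked illustration of the derived rates documented on IntervalStats, the sketch below computes them from two cumulative snapshots. The snapshot and rates types and the deriveRates function are hypothetical and cover only a few counters; the real Stats fields are not shown in this section.

```go
package main

import (
	"fmt"
	"time"
)

// snapshot is a hypothetical, reduced stand-in for a cumulative Stats sample.
type snapshot struct {
	submitted, completed, failed int64
}

// rates holds a subset of the derived per-interval rates.
type rates struct {
	throughput, goodput, arrivalRate, errorRate float64
}

// deriveRates computes interval rates from counter deltas. Each rate is
// zero when elapsed is not positive or when its denominator is zero.
func deriveRates(prev, curr snapshot, elapsed time.Duration) rates {
	var r rates
	completed := curr.completed - prev.completed
	failed := curr.failed - prev.failed
	if secs := elapsed.Seconds(); secs > 0 {
		r.throughput = float64(completed) / secs
		r.goodput = float64(completed-failed) / secs
		r.arrivalRate = float64(curr.submitted-prev.submitted) / secs
	}
	if completed > 0 {
		r.errorRate = float64(failed) / float64(completed)
	}
	return r
}

func main() {
	prev := snapshot{submitted: 100, completed: 90, failed: 5}
	curr := snapshot{submitted: 160, completed: 140, failed: 15}
	r := deriveRates(prev, curr, 2*time.Second)
	fmt.Printf("throughput=%.1f goodput=%.1f arrival=%.1f errorRate=%.2f\n",
		r.throughput, r.goodput, r.arrivalRate, r.errorRate)
	// prints: throughput=25.0 goodput=20.0 arrival=30.0 errorRate=0.20
}
```

In this interval, arrivals (30/sec) exceed completions (25/sec), which is the situation a positive QueueGrowthRate would signal.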
func (IntervalStats) BufferZone ¶ added in v0.74.0
func (s IntervalStats) BufferZone(yellowAt, redAt float64) BufferZone
BufferZone returns the Goldratt fever chart zone based on CurrBufferPenetration. The caller provides yellowAt and redAt penetration thresholds (0.0-1.0). Goldratt convention is 0.33/0.66.
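A minimal sketch of the zone classification follows. The function names are hypothetical, and the sketch assumes each threshold is an inclusive lower bound; the source does not say how exact boundary values are classified.

```go
package main

import "fmt"

// penetration returns depth/capacity clamped to [0,1], or 0 when the queue
// is unbuffered, matching the description of CurrBufferPenetration.
func penetration(depth int64, capacity int) float64 {
	if capacity <= 0 {
		return 0
	}
	p := float64(depth) / float64(capacity)
	if p < 0 {
		return 0
	}
	if p > 1 {
		return 1
	}
	return p
}

// zone classifies a penetration value against caller-supplied thresholds.
// Assumption: thresholds are inclusive lower bounds.
func zone(p, yellowAt, redAt float64) string {
	switch {
	case p >= redAt:
		return "red"
	case p >= yellowAt:
		return "yellow"
	default:
		return "green"
	}
}

func main() {
	for _, depth := range []int64{1, 5, 9} {
		p := penetration(depth, 10)
		fmt.Printf("depth=%d penetration=%.1f zone=%s\n", depth, p, zone(p, 0.33, 0.66))
	}
}
```

With the conventional 0.33/0.66 thresholds and a capacity of 10, depths 1, 5, and 9 fall in the green, yellow, and red zones respectively.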
type Join ¶ added in v0.44.0
type Join[R any] struct { // contains filtered or unexported fields }
Join is a strict branch recombination operator: it uses the first item from each of two source channels for join semantics (combine or error), then drains all remaining items from both sources (source ownership rule).
Join is designed for recombining Tee branches — each branch is expected to produce exactly one result. Missing items (source closes without producing) and extra items (source produces more than one) are contract violations handled gracefully: missing items produce MissingResultError, extra items are drained and counted in stats.
Created by NewJoin. The zero value is not usable. A Join must not be copied after first use.
func NewJoin ¶ added in v0.44.0
func NewJoin[A, B, R any](
	ctx context.Context,
	srcA <-chan rslt.Result[A],
	srcB <-chan rslt.Result[B],
	fn func(A, B) R,
) *Join[R]
NewJoin creates a Join that uses the first item from each source for join semantics, then drains remaining items. Ok/Ok pairs are combined via fn; errors are forwarded. The result is emitted on Join.Out.
Error matrix:
- Ok(a), Ok(b) → Ok(fn(a, b))
- Ok(a), Err(e) → Err(e); a discarded
- Err(e), Ok(b) → Err(e); b discarded
- Err(ea), Err(eb) → Err(errors.Join(ea, eb))
- Ok(a), missing → Err(MissingResultError{Source: "B"}); a discarded
- missing, Ok(b) → Err(MissingResultError{Source: "A"}); b discarded
- Err(e), missing → Err(errors.Join(e, MissingResultError{Source: "B"}))
- missing, Err(e) → Err(errors.Join(MissingResultError{Source: "A"}, e))
- missing, missing → no output
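The error matrix can be expressed as one decision function over the first item from each side. This is an illustrative sketch with hypothetical names (side, decision, decide, and a local missingResultError); it is fixed to int values and does not model draining, cancellation, or panic recovery.

```go
package main

import (
	"errors"
	"fmt"
)

// missingResultError is a local stand-in for the package's MissingResultError.
type missingResultError struct{ source string }

func (e *missingResultError) Error() string {
	return "missing result from source " + e.source
}

// side models the first item seen from one source: absent (source closed
// without producing), Ok(val), or Err(err).
type side struct {
	present bool
	val     int
	err     error
}

// decision is the join outcome; emit is false when nothing is sent.
type decision struct {
	val  int
	err  error
	emit bool
}

var errBoom = errors.New("boom")

// decide applies the error matrix to the first item from each side.
func decide(a, b side, fn func(int, int) int) decision {
	if !a.present && !b.present {
		return decision{} // missing, missing: no output
	}
	errA, errB := a.err, b.err
	if !a.present {
		errA = &missingResultError{source: "A"}
	}
	if !b.present {
		errB = &missingResultError{source: "B"}
	}
	switch {
	case errA == nil && errB == nil:
		return decision{val: fn(a.val, b.val), emit: true}
	case errA == nil:
		return decision{err: errB, emit: true}
	case errB == nil:
		return decision{err: errA, emit: true}
	default:
		return decision{err: errors.Join(errA, errB), emit: true}
	}
}

func main() {
	add := func(a, b int) int { return a + b }
	fmt.Println(decide(side{present: true, val: 3}, side{present: true, val: 4}, add).val) // prints 7
	fmt.Println(decide(side{present: true, val: 3}, side{}, add).err)                      // prints missing result from source B
}
```

Treating a missing item as an error for that side collapses the nine rows into four cases, with the no-output row handled up front.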
Each source is drained to completion (source ownership rule). Extra items beyond the first are counted in JoinStats.ExtraA / JoinStats.ExtraB. Sources may close at different times.
fn must be a pure, synchronous combiner — it must not block, perform I/O, or depend on cancellation. fn runs on the Join's only goroutine; a blocking fn prevents cancellation observation, source draining, and Join.Wait from returning. Similarly, if the consumer stops reading Join.Out without canceling, the goroutine blocks on the output send with the same consequences. If combining can fail or block, use a downstream Pipe for the error-capable, context-aware transform. Panics in fn are recovered as rslt.PanicError.
Cancellation is best-effort and observed only in the Phase 1 select and the output send. On ctx cancellation, consumed items are discarded and both sources are drained. A pre-send checkpoint catches already-observable cancellation before attempting the output send, but a result may still be emitted if the send races with cancellation (both select cases ready). Join.Wait returns the latched context error if cancellation was observed during collection or output; Join.Wait may return nil even if the context was canceled, if the goroutine completed without observing it. Drain is unconditional — sources must close for the goroutine to exit.
Panics if srcA, srcB, ctx, or fn is nil.
func (*Join[R]) Out ¶ added in v0.44.0
Out returns the receive-only output channel. At most one result will appear on this channel.
The consumer MUST drain Out() to completion or cancel the shared context. If the consumer stops reading without canceling, the goroutine blocks on the output send.
Out() is idempotent — it always returns the same channel.
func (*Join[R]) Stats ¶ added in v0.44.0
Stats returns approximate metrics. See JoinStats for caveats. Stats are only reliable as final values after Join.Wait returns.
func (*Join[R]) Wait ¶ added in v0.44.0
Wait blocks until the goroutine exits and the output channel is closed. Returns a latched context error if cancellation was observed during collection or output. May return nil even if the context was canceled, if the goroutine completed without observing it (e.g., both sources closed before cancellation was checked).
Multiple Wait() calls are safe — subsequent calls return immediately with the same value.
type JoinStats ¶ added in v0.44.0
type JoinStats struct {
ReceivedA int64 // total items consumed from srcA
ReceivedB int64 // total items consumed from srcB
Combined int64 // successful fn(a,b) combinations (0 or 1)
Errors int64 // error results delivered to Out (0 or 1)
DiscardedA int64 // A items consumed but not part of a successful combination (error, missing, cancel, panic)
DiscardedB int64 // B items consumed but not part of a successful combination (error, missing, cancel, panic)
ExtraA int64 // A items beyond the first, drained after the join decision (contract violation)
ExtraB int64 // B items beyond the first, drained after the join decision (contract violation)
OutputBlockedTime time.Duration // time blocked sending result to out
}
JoinStats holds metrics for a Join.
Fields are read from independent atomics, so a mid-flight Stats value is NOT a consistent snapshot. Invariants are guaranteed only after Join.Wait returns.
Conservation invariant (after Wait):
- ReceivedA = Combined + DiscardedA + ExtraA
- ReceivedB = Combined + DiscardedB + ExtraB
- Combined + Errors <= 1
Counter precedence: DiscardedX counts only first items that were consumed but not successfully combined (error, cancel, panic, or other side missing). ExtraX counts items beyond the first, drained after the join decision is reached. Post-decision items are always classified as ExtraX, even if cancellation later prevents result delivery.
type MemoryFeverZone ¶ added in v0.82.0
type MemoryFeverZone int
MemoryFeverZone classifies memory headroom.
const (
	// MemoryUnknown indicates the memory limit is unavailable or zero.
	MemoryUnknown MemoryFeverZone = iota
	// MemoryGreen indicates headroom is above the yellow threshold.
	MemoryGreen
	// MemoryYellow indicates headroom is between yellow and red thresholds.
	MemoryYellow
	// MemoryRed indicates headroom is below the red threshold.
	MemoryRed
)
func MemoryFever ¶ added in v0.82.0
func MemoryFever(headroom, limit uint64, yellowAt, redAt float64) MemoryFeverZone
MemoryFever classifies memory headroom into a fever zone. yellowAt and redAt are penetration thresholds (0.0-1.0), where penetration = 1.0 - (headroom / limit). Goldratt convention: 0.33/0.66. Returns MemoryUnknown if limit == 0.
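The headroom-to-penetration conversion can be sketched as below. memoryZone and memoryFever are hypothetical local names, and the handling of values exactly at a threshold (treated here as belonging to the higher zone) is an assumption.

```go
package main

import "fmt"

// memoryZone is a local stand-in for the package's MemoryFeverZone.
type memoryZone int

const (
	memoryUnknown memoryZone = iota
	memoryGreen
	memoryYellow
	memoryRed
)

// memoryFever converts headroom into penetration = 1 - headroom/limit and
// classifies it. Assumption: a value exactly at a threshold falls into the
// higher zone.
func memoryFever(headroom, limit uint64, yellowAt, redAt float64) memoryZone {
	if limit == 0 {
		return memoryUnknown
	}
	p := 1.0 - float64(headroom)/float64(limit)
	switch {
	case p >= redAt:
		return memoryRed
	case p >= yellowAt:
		return memoryYellow
	default:
		return memoryGreen
	}
}

func main() {
	const limit = 1000
	fmt.Println(memoryFever(800, limit, 0.33, 0.66) == memoryGreen)  // prints true
	fmt.Println(memoryFever(500, limit, 0.33, 0.66) == memoryYellow) // prints true
	fmt.Println(memoryFever(100, limit, 0.33, 0.66) == memoryRed)    // prints true
	fmt.Println(memoryFever(100, 0, 0.33, 0.66) == memoryUnknown)    // prints true
}
```

Penetration rises as headroom shrinks, so 800 of 1000 bytes free is 20% penetration (green) and 100 of 1000 free is 90% penetration (red).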
func (MemoryFeverZone) String ¶ added in v0.82.0
func (z MemoryFeverZone) String() string
type Merge ¶ added in v0.44.0
type Merge[T any] struct { // contains filtered or unexported fields }
Merge is a nondeterministic interleaving fan-in from N sources.
One goroutine per source forwards items to a shared unbuffered output channel. The Go runtime scheduler determines send order, so there is no cross-source ordering guarantee, no fairness guarantee, and no provenance tracking. Per-source order IS preserved: items from each individual source appear in the merged output in the same order they were received from that source (this follows from one goroutine per source with sequential receive/send).
Merge is NOT the inverse of Tee. Tee broadcasts identical items to all branches. Merge interleaves distinct items from independent sources. Tee → ... → Merge does not restore original ordering, does not correlate outputs from sibling branches, and does not pair items across sources.
Created by NewMerge. The zero value is not usable. A Merge must not be copied after first use.
func NewMerge ¶ added in v0.44.0
NewMerge creates a Merge that reads from each source and forwards all items to a single unbuffered output channel. Items are interleaved nondeterministically — Go runtime scheduler determines send order.
Each source is drained to completion by its own goroutine (source ownership rule). Sources may close at different times — early closure of one source does not affect others. All sources must be finite and must eventually close (including on cancellation paths). If a source never closes, the corresponding goroutine blocks indefinitely and Merge.Wait hangs.
Cancellation is advisory, not a hard stop. On ctx cancellation, each source goroutine enters discard mode at its next cancellation checkpoint: it stops forwarding but continues draining its source to completion. Each iteration has two cancellation checkpoints: a non-blocking pre-send check and a blocking send-select. This bounds post-cancel forwarding to at most 1 item per source goroutine that has already passed the pre-send checkpoint. Cancel-aware sends ensure goroutines are not blocked on output when downstream stops reading. If a goroutine is blocked waiting on its source (for r := range src) when ctx cancels, it does not observe cancellation until the source produces an item or closes.
Merge.Wait returns only after all source goroutines exit — which requires all sources to close. Cancellation alone does not guarantee prompt return. After observing cancellation, each goroutine drains and discards remaining items from its source until that source closes.
Merge.Out is closed before done is closed, so Out() is guaranteed closed before Wait() returns. Callers can safely range Out() and then call Wait().
Goroutine lifecycle: constructor launches N source goroutines + 1 closer goroutine. Each source goroutine drains its source and sends to output. The closer goroutine waits on a WaitGroup for all source goroutines, closes output, and closes done.
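The lifecycle described above (N source goroutines plus one closer) is a common fan-in shape. The following stdlib-only sketch shows it with int channels; merge, mergeAll, and source are hypothetical names, and stats and error latching are omitted.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// merge forwards every item from each source to one unbuffered output.
// After ctx is canceled, a source goroutine stops forwarding but keeps
// draining its source until that source closes.
func merge(ctx context.Context, sources ...<-chan int) (<-chan int, <-chan struct{}) {
	out := make(chan int)
	done := make(chan struct{})
	var wg sync.WaitGroup
	for _, src := range sources {
		wg.Add(1)
		go func(src <-chan int) {
			defer wg.Done()
			discard := false
			for v := range src {
				// Pre-send checkpoint (non-blocking).
				if discard || ctx.Err() != nil {
					discard = true
					continue
				}
				// Send-select checkpoint (blocking, cancel-aware).
				select {
				case out <- v:
				case <-ctx.Done():
					discard = true
				}
			}
		}(src)
	}
	// Closer goroutine: close out before done so a range over out always
	// terminates before a wait on done returns.
	go func() {
		wg.Wait()
		close(out)
		close(done)
	}()
	return out, done
}

// source returns a closed, pre-filled channel for demonstration.
func source(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mergeAll drains a merge of the given sources and waits for completion.
func mergeAll(sources ...<-chan int) []int {
	out, done := merge(context.Background(), sources...)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	<-done
	return got
}

func main() {
	got := mergeAll(source(1, 2, 3), source(10, 20))
	fmt.Println(len(got)) // prints 5
}
```

The interleaving of the two sources varies between runs, but 1, 2, 3 always appear in that relative order, as do 10 and 20.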
Each source channel must be distinct and exclusively owned by the Merge. Passing the same channel twice creates two goroutines racing on one source — per-source ordering and stats become meaningless. The constructor does not check for duplicates.
Panics if len(sources) == 0, ctx is nil, or any source is nil.
func (*Merge[T]) Out ¶ added in v0.44.0
Out returns the receive-only output channel. All items from all sources appear on this single channel in nondeterministic order.
The consumer MUST drain Out() to completion or cancel the shared context. If the consumer stops reading without canceling, all source goroutines block on the shared output send and cannot drain their sources.
Out() is idempotent — it always returns the same channel.
func (*Merge[T]) Stats ¶ added in v0.44.0
func (m *Merge[T]) Stats() MergeStats
Stats returns approximate metrics. See MergeStats for caveats.
Per-source counters are loaded once into plain int64 slices, then aggregates are computed from those copied values. This guarantees single-call coherence: Received == sum(SourceReceived) within one Stats() return, even mid-flight.
Stats are only reliable as final values after Merge.Wait returns.
func (*Merge[T]) Wait ¶ added in v0.44.0
Wait blocks until all source goroutines exit and the output channel is closed. Returns a latched context error if any source goroutine entered a cancel path (pre-send checkpoint or send-select), nil otherwise.
Wait may return nil even if ctx was canceled. This happens when no goroutine observes cancellation on a checked path — e.g., all sources close before any goroutine loops back to the pre-send check, or a goroutine is blocked in range src when cancel fires and the source closes without sending. This is intentional: the operator completed its work, cancellation had no observable effect on forwarding, and reporting it would be a false positive.
Multiple Wait() calls are safe — subsequent calls return immediately with the same value.
type MergeStats ¶ added in v0.44.0
type MergeStats struct {
Received int64 // total items consumed from all sources
Forwarded int64 // items sent to output
Dropped int64 // items discarded during cancel
// Per-source stats. Len == N (number of sources).
// Index i corresponds to sources[i] as passed to NewMerge.
SourceReceived []int64
SourceForwarded []int64
SourceDropped []int64
}
MergeStats holds metrics for a Merge.
Fields are derived from per-source atomic counters, so a mid-flight Stats value is NOT a consistent snapshot — cross-metric invariants (e.g., Received == Forwarded + Dropped) may not hold because an in-flight item has been counted as received but not yet as forwarded or dropped. Invariants are guaranteed only after Merge.Wait returns.
Per-metric aggregates are coherent within a single Stats() call even mid-flight: Received == sum(SourceReceived), Forwarded == sum(SourceForwarded), Dropped == sum(SourceDropped). This holds because aggregates are computed from the same copied values.
Invariant (after Wait): Received = Forwarded + Dropped. Per-source invariant (after Wait): SourceReceived[i] = SourceForwarded[i] + SourceDropped[i].
SourceReceived[i] corresponds to sources[i] as passed to NewMerge. The index mapping is stable and matches construction order.
type MissingResultError ¶ added in v0.44.0
type MissingResultError struct {
Source string // "A" or "B"
}
MissingResultError indicates a source closed without producing a result. Callers can use errors.As to extract the Source field and determine which side was missing.
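Because Error has a pointer receiver, the errors.As target is a pointer to the error type. The sketch below uses a local stand-in type (missingResultError) and a hypothetical helper (missingSource) to show the extraction, including through an errors.Join wrapper like the ones in the Join error matrix.

```go
package main

import (
	"errors"
	"fmt"
)

// missingResultError is a local stand-in with the same shape as the
// package's MissingResultError.
type missingResultError struct {
	Source string // "A" or "B"
}

func (e *missingResultError) Error() string {
	return "missing result from source " + e.Source
}

var (
	errBranch = errors.New("branch failed")
	joined    = errors.Join(errBranch, &missingResultError{Source: "B"})
)

// missingSource reports which side was missing, if any.
func missingSource(err error) (string, bool) {
	var mre *missingResultError
	if errors.As(err, &mre) {
		return mre.Source, true
	}
	return "", false
}

func main() {
	src, ok := missingSource(joined)
	fmt.Println(src, ok) // prints B true
}
```

errors.As walks the joined error's children, so the caller does not need to know whether the missing-result error arrived alone or combined with a branch error.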
func (*MissingResultError) Error ¶ added in v0.44.0
func (e *MissingResultError) Error() string
type Options ¶
type Options[T any] struct {
	// Capacity is the number of items the input buffer can hold.
	// Submit blocks when the buffer is full.
	// Zero means unbuffered: Submit blocks until a worker dequeues.
	// Negative values panic.
	Capacity int
	// MaxWIP is the maximum number of admitted items (buffered +
	// in-flight). Submit blocks when the limit is reached (the "rope").
	// Zero means default: Capacity + Workers (backward compatible).
	// Clamped to max(1, min(MaxWIP, Capacity + Workers)) at construction.
	// Adjust at runtime with [Stage.SetMaxWIP].
	MaxWIP int
	// MaxWIPWeight is the maximum accumulated weight of admitted items.
	// Zero means disabled (default, backward compatible). If > 0,
	// admission checks accumulated weight alongside count. Items with
	// weight > MaxWIPWeight are rejected with [ErrWeightExceedsLimit].
	// Adjust at runtime with [Stage.SetMaxWIPWeight].
	MaxWIPWeight int64
	// Weight returns the cost of item t for stats tracking only
	// ([Stats.InFlightWeight]). Does not affect admission: capacity
	// is count-based. Called on the Submit path, so must be cheap.
	// Must be pure, non-negative, and safe for concurrent calls.
	// If nil, every item costs 1.
	Weight func(T) int64
	// Workers is the number of concurrent fn invocations.
	// Zero means default: 1 (serial constraint, the common case).
	// Negative values panic.
	Workers int
	// ContinueOnError, when true, keeps processing after fn errors
	// instead of canceling the stage. Default: false (fail-fast).
	ContinueOnError bool
	// TrackAllocations, when true, samples process-wide heap allocation
	// counters (runtime/metrics /gc/heap/allocs:bytes and :objects) before
	// and after each fn invocation and accumulates the deltas into
	// [Stats.ObservedAllocBytes] and [Stats.ObservedAllocObjects].
	//
	// Scope: each sample captures the invocation window of a single fn
	// call. Counters are process-global: they include allocations by any
	// goroutine during that window, not just the stage's own work.
	//
	// Concurrent over-attribution: with Workers > 1, overlapping
	// invocation windows can each capture the same unrelated allocation,
	// so per-stage totals can exceed actual process allocations. Totals
	// are also not additive across stages for the same reason.
	//
	// Overhead: on the order of 1µs per item in single-worker throughput
	// benchmarks (two runtime/metrics.Read calls plus counter extraction
	// and atomic accumulation). Negligible when fn does real work;
	// roughly doubles overhead for no-op or sub-microsecond fns.
	// Multi-worker contention on shared atomic counters may add cost.
	//
	// Default: false (disabled). Enable when diagnosing allocation-heavy
	// stages. Silently disabled if the runtime does not support the
	// required metrics (validated on first use via sync.Once).
	TrackAllocations bool
	// TrackServiceTimeDist, when true, records per-item service time
	// into a per-worker HDR histogram. Stats gains [ServiceTimeSummary]
	// with p50/p95/p99/min/max/mean/stddev. Cumulative since Start.
	// Operational telemetry with ~1% precision.
	TrackServiceTimeDist bool
}
Options configures a Stage.
Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). Capacity is always an item-count bound; [Options.Weight] affects stats only, not admission.
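The MaxWIP clamping rule documented on the field can be written out as a small function. This is a sketch of the stated rule only (effectiveMaxWIP is a hypothetical helper, not a package function), and it assumes Workers has already been defaulted to at least 1.

```go
package main

import "fmt"

// effectiveMaxWIP applies the documented rule: zero selects the default
// (capacity + workers); any other value is clamped to
// max(1, min(maxWIP, capacity+workers)).
// Assumption: workers has already been defaulted to at least 1.
func effectiveMaxWIP(maxWIP, capacity, workers int) int {
	limit := capacity + workers
	v := maxWIP
	if v == 0 || v > limit {
		v = limit
	}
	if v < 1 {
		v = 1
	}
	return v
}

func main() {
	fmt.Println(effectiveMaxWIP(0, 4, 2))   // default: 4 + 2 = 6
	fmt.Println(effectiveMaxWIP(100, 4, 2)) // clamped down to 6
	fmt.Println(effectiveMaxWIP(3, 4, 2))   // within range: 3
}
```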
type Pipeline ¶ added in v0.80.0
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a DAG topology descriptor for a toc pipeline. Configure with Pipeline.AddStage and Pipeline.AddEdge, then call Pipeline.Freeze. After Freeze, the Pipeline is immutable and safe for concurrent reads.
Pipeline does NOT own or create stages. It stores stage names and stats accessors only — a passive metadata layer over existing stages. Consumers use target-relative queries (Pipeline.AncestorsOf, Pipeline.HeadsTo, Pipeline.DirectPredecessors) to reason about upstream subgraphs relative to a chosen drum.
Panics on misconfiguration (empty names, nil stats, cycles, etc.) following the toc convention: configuration errors are programming bugs, caught on first run during development.
func NewPipeline ¶ added in v0.80.0
func NewPipeline() *Pipeline
NewPipeline creates an empty pipeline topology.
func (*Pipeline) AddEdge ¶ added in v0.80.0
AddEdge registers a directed edge from → to with a 1:1 consumption ratio. Both stages must already be registered. Panics if from or to is unknown, the edge is a duplicate, from == to (self-loop), or the pipeline is frozen.
func (*Pipeline) AddEdgeWithRatio ¶ added in v0.80.0
AddEdgeWithRatio registers a directed edge with a consumption ratio. The ratio specifies how many items from 'from' are consumed to produce one unit of output at 'to' — the Bill of Materials ratio for this edge. For example, ratio=2 means 'to' consumes 2 items from 'from' per output. The rope uses this to release inputs at the correct ratio to maximize goodput at merge points.
Panics if ratio <= 0, from or to is unknown, the edge is a duplicate, from == to (self-loop), or the pipeline is frozen.
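To make the Bill of Materials ratio concrete, the sketch below computes how many outputs a merge point could produce from the items available on each incoming edge. The incoming type, the integer ratio, and outputsPossible are all assumptions made for illustration; the package's own edge type and ratio type are not shown in this section.

```go
package main

import "fmt"

// incoming is a hypothetical stand-in for an incoming edge:
// a predecessor name plus its consumption ratio.
type incoming struct {
	from  string
	ratio int // items consumed from `from` per unit of output (assumed integer)
}

// outputsPossible returns how many output units can be produced given the
// items available from each predecessor. The scarcest input, after
// dividing by its ratio, bounds the result.
func outputsPossible(edges []incoming, available map[string]int) int {
	best := -1
	for _, e := range edges {
		n := available[e.from] / e.ratio
		if best < 0 || n < best {
			best = n
		}
	}
	if best < 0 {
		return 0
	}
	return best
}

func main() {
	edges := []incoming{{from: "frames", ratio: 2}, {from: "labels", ratio: 1}}
	available := map[string]int{"frames": 7, "labels": 5}
	fmt.Println(outputsPossible(edges, available)) // min(7/2, 5/1) = 3
}
```

Releasing more than ratio × outputs items from any one predecessor only adds WIP in front of the merge point, which is the waste a ratio-aware rope is meant to avoid.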
func (*Pipeline) AddStage ¶ added in v0.80.0
AddStage registers a named stage with its stats accessor. Panics if name is empty, stats is nil, name is duplicate, or the pipeline is frozen.
func (*Pipeline) AncestorsOf ¶ added in v0.80.0
AncestorsOf returns all stages transitively upstream of the target, in BFS order (closest first). Excludes the target itself. Panics if name is unknown or pipeline is not frozen.
func (*Pipeline) DirectPredecessors ¶ added in v0.80.0
DirectPredecessors returns the immediate upstream stages of the named stage. Panics if name is unknown or pipeline is not frozen.
func (*Pipeline) EdgeRatio ¶ added in v0.80.0
EdgeRatio returns the consumption ratio for the edge from → to. Returns 1 for edges added with Pipeline.AddEdge. Returns the explicit ratio for edges added with Pipeline.AddEdgeWithRatio. Panics if the edge does not exist or pipeline is not frozen.
func (*Pipeline) Freeze ¶ added in v0.80.0
func (p *Pipeline) Freeze()
Freeze validates the topology and makes the pipeline read-only. Builds adjacency lists and computes heads (zero in-degree stages). Panics if the graph contains a cycle, has no stages, or Freeze was already called.
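One common way to compute heads and detect cycles in a single pass is Kahn's algorithm. The sketch below illustrates that technique on plain string names; whether Freeze uses this particular algorithm is not stated here, and freeze is a hypothetical function that returns a flag rather than panicking.

```go
package main

import "fmt"

// freeze returns the heads (zero in-degree stages, in registration order)
// and whether the graph is acyclic, using Kahn's algorithm.
func freeze(stages []string, edges [][2]string) (heads []string, acyclic bool) {
	inDeg := make(map[string]int, len(stages))
	succ := make(map[string][]string, len(stages))
	for _, e := range edges {
		succ[e[0]] = append(succ[e[0]], e[1])
		inDeg[e[1]]++
	}
	var queue []string
	for _, s := range stages {
		if inDeg[s] == 0 {
			heads = append(heads, s)
			queue = append(queue, s)
		}
	}
	visited := 0
	for len(queue) > 0 {
		s := queue[0]
		queue = queue[1:]
		visited++
		for _, t := range succ[s] {
			inDeg[t]--
			if inDeg[t] == 0 {
				queue = append(queue, t)
			}
		}
	}
	// Any stage left unvisited sits on a cycle or downstream of one.
	return heads, visited == len(stages)
}

func main() {
	heads, ok := freeze(
		[]string{"parse", "fetch", "merge"},
		[][2]string{{"parse", "merge"}, {"fetch", "merge"}},
	)
	fmt.Println(heads, ok)
}
```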
func (*Pipeline) HasPath ¶ added in v0.80.0
HasPath returns true if there is a directed path from → to. Panics if either name is unknown or pipeline is not frozen.
func (*Pipeline) Heads ¶ added in v0.80.0
Heads returns all zero in-degree stages (pipeline entry points), in registration order. Panics if not frozen.
func (*Pipeline) HeadsTo ¶ added in v0.80.0
HeadsTo returns the subset of heads that can reach the target stage. If the pipeline has multiple entry points but only some feed the target (drum), only those heads are returned. Uses a single reverse BFS from target, then intersects with heads. Panics if target is unknown or pipeline is not frozen.
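The target-relative queries can be illustrated with a reverse breadth-first search over a predecessor map. This is a self-contained sketch on plain maps, not the package's implementation; ancestorsOf and headsTo are local functions, and treating a head that is itself the target as reachable is an assumption.

```go
package main

import "fmt"

// ancestorsOf returns every stage transitively upstream of target in BFS
// order (closest first), excluding target. pred maps a stage to its
// direct predecessors.
func ancestorsOf(pred map[string][]string, target string) []string {
	seen := map[string]bool{target: true}
	queue := []string{target}
	var out []string
	for len(queue) > 0 {
		s := queue[0]
		queue = queue[1:]
		for _, p := range pred[s] {
			if !seen[p] {
				seen[p] = true
				out = append(out, p)
				queue = append(queue, p)
			}
		}
	}
	return out
}

// headsTo keeps the heads (in their given order) that can reach target:
// one reverse BFS, then an intersection with the head list.
// Assumption: a head that is the target itself counts as reaching it.
func headsTo(heads []string, pred map[string][]string, target string) []string {
	reach := map[string]bool{target: true}
	for _, a := range ancestorsOf(pred, target) {
		reach[a] = true
	}
	var out []string
	for _, h := range heads {
		if reach[h] {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	// a -> b -> d, c -> d, and an unrelated chain e -> f.
	pred := map[string][]string{"b": {"a"}, "d": {"b", "c"}, "f": {"e"}}
	fmt.Println(ancestorsOf(pred, "d"))                      // [b c a]
	fmt.Println(headsTo([]string{"a", "c", "e"}, pred, "d")) // [a c]
}
```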
func (*Pipeline) Incoming ¶ added in v0.80.0
func (p *Pipeline) Incoming(name string) []IncomingEdge
Incoming returns the incoming edges to the named stage, each with its predecessor name and consumption ratio. The rope controller uses this to compute per-predecessor budget allocation at merge points. Panics if name is unknown or pipeline is not frozen.
func (*Pipeline) StageStats ¶ added in v0.80.0
StageStats returns the stats accessor for a named stage. Panics if name is unknown or pipeline is not frozen.
type Reporter ¶ added in v0.66.0
type Reporter struct {
// contains filtered or unexported fields
}
Reporter periodically logs pipeline stats and process memory. Create with NewReporter, register stages with Reporter.AddStage, then call Reporter.Run to start logging.
Config is frozen after Run starts — AddStage panics if called after Run. Run panics if called twice.
Provider contract: functions passed to AddStage must be fast (< 1ms typical), non-blocking, and safe for concurrent calls. Panics are recovered and logged; hangs stall the reporting loop.
func NewReporter ¶ added in v0.66.0
func NewReporter(interval time.Duration, opts ...ReporterOption) *Reporter
NewReporter creates a reporter that logs every interval. Panics if interval <= 0.
type ReporterOption ¶ added in v0.66.0
type ReporterOption func(*Reporter)
ReporterOption configures a Reporter.
func WithLogger ¶ added in v0.66.0
func WithLogger(l *log.Logger) ReporterOption
WithLogger sets the logger for reporter output. If l is nil, log.Default is used.
type RopeController ¶ added in v0.81.0
type RopeController struct {
// contains filtered or unexported fields
}
RopeController is a periodic controller that bounds aggregate upstream WIP between the pipeline head and the drum (constraint) by adjusting the head stage's MaxWIP.
It computes rope length from drum goodput, upstream flow time, and a safety factor using a Little's Law heuristic. This is an approximate soft control — SetMaxWIP cannot revoke existing permits and has a floor of 1. After a target decrease, already-admitted items persist until completion.
Phase 3 scope: single head, linear chain (no branches between head and drum), count-based (not weight-aware).
Create with NewRopeController, configure with RopeOption functions, then call RopeController.Run.
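The Little's Law heuristic can be sketched as follows. The exact formula the controller uses is not given in this section, so the sketch assumes rope length = ceil(goodput × flow time × safety factor), floored at 1 to match the SetMaxWIP floor. ropeLength and its parameters are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
)

// ropeLength estimates an upstream WIP bound from Little's Law
// (L = lambda * W), scaled by a safety factor.
// Assumptions: the result is rounded up and never drops below 1.
func ropeLength(goodputPerSec, flowTimeSec, safety float64) int {
	n := int(math.Ceil(goodputPerSec * flowTimeSec * safety))
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	// 200 items/sec through the drum, 0.25s upstream flow time, 1.5x safety.
	fmt.Println(ropeLength(200, 0.25, 1.5)) // 200 * 0.25 * 1.5 = 75
	// No goodput measured yet: fall back to the floor.
	fmt.Println(ropeLength(0, 0.25, 1.5)) // 1
}
```

A larger safety factor trades extra upstream WIP for a lower risk of starving the drum.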
func NewRopeController ¶ added in v0.81.0
func NewRopeController(
	pipeline *Pipeline,
	drum string,
	setHeadWIP func(int) int,
	stageSnapshot func(string) IntervalStats,
	interval time.Duration,
	opts ...RopeOption,
) *RopeController
NewRopeController creates a rope controller for the given pipeline and drum stage.
The pipeline must be frozen and contain exactly one head feeding the drum via a linear chain (no branches or joins between head and drum).
setHeadWIP is typically headStage.SetMaxWIP. stageSnapshot returns the latest IntervalStats for a named stage.
Panics if pipeline is not frozen, drum is unknown, topology is not a single linear chain from head to drum, setHeadWIP or stageSnapshot is nil, or interval <= 0.
func (*RopeController) Run ¶ added in v0.81.0
func (rc *RopeController) Run(ctx context.Context)
Run blocks, adjusting rope length every interval until ctx is canceled. Panics if called twice.
func (*RopeController) RunWithTicker ¶ added in v0.81.0
func (rc *RopeController) RunWithTicker(ctx context.Context, ticks <-chan time.Time)
RunWithTicker is like RopeController.Run but uses the provided tick channel instead of creating a real ticker. For testing. Panics if called twice or after Run.
func (*RopeController) Stats ¶ added in v0.81.0
func (rc *RopeController) Stats() RopeStats
Stats returns a snapshot of the rope controller's current state. Safe for concurrent calls.
type RopeOption ¶ added in v0.81.0
type RopeOption func(*RopeController)
RopeOption configures a RopeController.
func WithInitialRopeLength ¶ added in v0.81.0
func WithInitialRopeLength(n int) RopeOption
WithInitialRopeLength sets the rope length used before the first valid goodput measurement. Default is 1 (conservative). Must be >= 1.
func WithRopeLogger ¶ added in v0.81.0
func WithRopeLogger(l *log.Logger) RopeOption
WithRopeLogger sets the logger. If nil, log.Default is used.
func WithRopeSafetyFactor ¶ added in v0.81.0
func WithRopeSafetyFactor(factor float64) RopeOption
WithRopeSafetyFactor sets the safety multiplier for rope length. Default is 1.5. Panics if factor <= 0.
type RopeStats ¶ added in v0.81.0
type RopeStats struct {
RopeLength int // current computed rope length
RopeWIP int // current aggregate WIP across upstream stages
RopeUtilization float64 // WIP / Length; 0 if length is 0
DrumGoodput float64 // EWMA-smoothed drum goodput (items/sec)
DrumErrorRate float64 // EWMA-smoothed drum error rate
AdjustmentCount int64 // how many times rope was adjusted
HeadAppliedWIP int // last value returned by setHeadWIP
}
RopeStats is a point-in-time snapshot of the controller's state.
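Two of the fields above are derived values. The sketch below shows a standard exponentially weighted moving average update and the utilization rule from the field comment. The smoothing factor alpha is an assumption (the controller's actual constant is not documented here), and both function names are hypothetical.

```go
package main

import "fmt"

// ewma blends a new sample into the previous smoothed value.
// alpha must be in (0, 1]; higher values weight recent samples more.
// The alpha used by the controller is not documented; this is illustrative.
func ewma(prev, sample, alpha float64) float64 {
	return alpha*sample + (1-alpha)*prev
}

// utilization follows the RopeUtilization comment: WIP / Length,
// and 0 when length is 0.
func utilization(wip, length int) float64 {
	if length == 0 {
		return 0
	}
	return float64(wip) / float64(length)
}

func main() {
	smoothed := 100.0
	for _, sample := range []float64{200, 200, 200} {
		smoothed = ewma(smoothed, sample, 0.5)
		fmt.Println(smoothed) // converges toward 200: 150, 175, 187.5
	}
	fmt.Println(utilization(3, 6)) // 0.5
}
```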
type ServiceTimeSummary ¶ added in v0.75.0
type ServiceTimeSummary struct {
Count int64
Min time.Duration
Max time.Duration
Mean time.Duration
StdDev time.Duration
P50 time.Duration
P95 time.Duration
P99 time.Duration
Underflow int64 // items with duration below recordable minimum (< 1ns)
Overflow int64 // items with duration above recordable maximum (> 10min)
}
ServiceTimeSummary holds the distribution of per-item service times. Cumulative since Start. Zero when [Options.TrackServiceTimeDist] is disabled or no items processed.
Operational telemetry with ~1% precision. For recent-only distribution, a future windowed mode will use per-worker histogram rotation.
type Stage ¶
type Stage[T, R any] struct {
	// contains filtered or unexported fields
}
Stage is a running constrained stage. Created by Start. The zero value is not usable.
func Pipe ¶
func Pipe[T, R any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]
Pipe creates a stage that reads from an upstream Result channel, forwarding Ok values to fn via workers and passing Err values directly to the output (error passthrough). The feeder goroutine drains src to completion, provided the consumer drains Stage.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src.
Error passthrough is best-effort during shutdown: if ctx is canceled or fail-fast fires, upstream Err values may be dropped instead of forwarded (reflected in [Stats.Dropped]). During normal operation, all upstream errors are forwarded.
The returned stage's input side is owned by the feeder — do not call Stage.Submit or Stage.CloseInput directly. Both are handled gracefully (no panic, no deadlock) but are misuse. External Submit calls void the stats invariant (Received will not account for externally submitted items).
Stats: [Stats.Received] = [Stats.Submitted] + [Stats.Forwarded] + [Stats.Dropped]. Forwarded errors do not trigger fail-fast and do not affect Stage.Wait.
Panics if ctx is nil, src is nil, or fn is nil.
func Start ¶
func Start[T, R any](
	ctx context.Context,
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]
Start launches a constrained stage that processes items through fn.
The stage starts a cancel watcher goroutine that calls Stage.CloseInput when the context is canceled (either parent cancel or fail-fast). This ensures workers always eventually exit.
Panics if ctx is nil, fn is nil, Capacity is negative, or Workers is negative.
func (*Stage[T, R]) ActiveWorkers ¶ added in v0.71.0
ActiveWorkers returns the number of live (non-exited) workers. May differ from TargetWorkers during scale-down drain.
func (*Stage[T, R]) Cause ¶
Cause returns the latched terminal cause of the stage, blocking until shutdown completes. Like Stage.Wait, Cause does not initiate shutdown. Unlike Stage.Wait, Cause distinguishes all three outcomes:
- nil: all items completed successfully (or ContinueOnError with no cancel)
- fail-fast error: the first fn error that caused shutdown
- parent cancel cause: context.Cause of the parent context at completion
The terminal cause is latched when the last worker finishes, so Cause is stable and idempotent — it returns the same value regardless of later parent context changes. If parent cancellation races with a worker error, Cause may return either depending on observation order.
A parent cancellation that occurs after fn returns but before the worker completes result handoff is reported as stage cancellation, even though all business-level computations succeeded. Use Stage.Wait if only fail-fast errors matter.
Requires Out to be drained (see Stage.DiscardAndCause when individual results are not needed).
func (*Stage[T, R]) CloseInput ¶
func (s *Stage[T, R]) CloseInput()
CloseInput signals that no more items will be submitted. Workers finish processing buffered items, then shut down.
Linearization point: closedForAdmission=true under admissionMu. Any Submit whose acquireAdmission serializes after this point returns ErrClosed without creating a permit. A Submit already granted before this point may still succeed or roll back via trySend — that is a concurrent operation that linearized before close, not a post-close admission.
Close state is split across three signals:
- closedForAdmission (admissionMu): authoritative gate for permit creation
- closed (atomic): fast-path rejection in Submit and trySend
- closing (channel): shutdown broadcast to blocked selects
These are set in order; closed/closing may lag closedForAdmission.
Blocks briefly until all in-flight Stage.Submit calls exit, then closes the input channel.
Idempotent — safe to call multiple times (use defer as safety net). Also called internally on fail-fast or parent context cancellation.
func (*Stage[T, R]) DiscardAndCause ¶
DiscardAndCause drains all remaining results from Stage.Out and returns Stage.Cause's latched terminal cause. Use when individual results are not needed but composable terminal status is required.
Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndCause with direct Out consumption causes a consumption race (results go to the wrong reader).
func (*Stage[T, R]) DiscardAndWait ¶
DiscardAndWait drains all remaining results from Stage.Out and returns Stage.Wait's error. Use when individual results are not needed.
Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndWait with direct Out consumption causes a consumption race (results go to the wrong reader).
func (*Stage[T, R]) MaxWIPWeight ¶ added in v0.68.0
MaxWIPWeight returns the current weight-based WIP limit. Zero means disabled.
func (*Stage[T, R]) Out ¶
Out returns the receive-only output channel. It closes after all workers finish and all results have been sent.
Cardinality: every successful Stage.Submit produces exactly one result.
Ordering: with Workers == 1, results are delivered in submit order. With Workers > 1, result order is nondeterministic.
Callers MUST drain Out to completion (or use Stage.DiscardAndWait / Stage.DiscardAndCause):
for result := range stage.Out() {
	val, err := result.Unpack()
	// handle val and err here; do NOT break out of this loop
}
After cancellation or fail-fast, Out may still emit: success results from work already in fn, ordinary error results from in-flight work, and canceled results for buffered items drained post-cancel. With Workers > 1, the fail-fast triggering error is not guaranteed to appear before cancellation results; use Stage.Wait or Stage.Cause for stage-level terminal status, not stream order.
If the consumer stops reading, workers block sending results, which prevents shutdown and causes Stage.Wait to hang — leaking goroutines, context resources, and the stage itself. This is the same contract as consuming from any Go channel-based pipeline.
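The drain contract is the same one that applies to any channel-based worker pool. The sketch below uses a toy stand-in built from plain channels (result and runAndDrain are hypothetical, not the package API) to show the shape: close the input, close the output after the last worker exits, and range over the output until it closes.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a local stand-in for rslt.Result.
type result struct {
	val int
	err error
}

// runAndDrain pushes items through `workers` goroutines that double each
// value, drains the output to completion, and returns the number of
// successful results and their sum.
func runAndDrain(items []int, workers int) (count, sum int) {
	in := make(chan int)
	out := make(chan result) // unbuffered: workers block until the consumer reads

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- result{val: v * 2}
			}
		}()
	}
	go func() { // producer: submit everything, then close the input side
		for _, v := range items {
			in <- v
		}
		close(in)
	}()
	go func() { // close out only after every worker has exited
		wg.Wait()
		close(out)
	}()

	for r := range out { // drain to completion; an early break would strand workers
		if r.err == nil {
			count++
			sum += r.val
		}
	}
	return count, sum
}

func main() {
	fmt.Println(runAndDrain([]int{1, 2, 3, 4}, 3)) // 4 20
}
```

With more than one worker the order of results varies between runs, which is why the example aggregates a count and a sum instead of comparing sequences.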
func (*Stage[T, R]) PauseAdmission ¶ added in v0.65.0
func (s *Stage[T, R]) PauseAdmission()
PauseAdmission blocks all new admissions. In-flight items complete normally. Queued waiters are held (not rejected) until Stage.ResumeAdmission is called. Concurrency-safe. Idempotent.
func (*Stage[T, R]) Paused ¶ added in v0.65.0
Paused returns true if admission is paused via Stage.PauseAdmission.
func (*Stage[T, R]) ResumeAdmission ¶ added in v0.65.0
func (s *Stage[T, R]) ResumeAdmission()
ResumeAdmission unblocks admission and wakes held waiters. Concurrency-safe. Idempotent.
func (*Stage[T, R]) SetMaxWIP ¶ added in v0.59.0
SetMaxWIP dynamically adjusts the WIP limit. Wakes blocked Submits if the new limit is higher than the current one. The value is clamped to [1, Capacity+Workers]. Returns the applied value. Concurrency-safe.
func (*Stage[T, R]) SetMaxWIPWeight ¶ added in v0.68.0
SetMaxWIPWeight dynamically adjusts the weight-based WIP limit. Zero disables weight limiting. Negative values are clamped to 0. Wakes blocked Submits if the new limit allows more weight. Returns the applied value. Concurrency-safe.
func (*Stage[T, R]) SetWorkers ¶ added in v0.71.0
SetWorkers adjusts the number of workers. New workers start immediately. Excess workers are canceled in LIFO order and finish their current item before exiting. Because Go's select is nondeterministic, a canceled worker may process one more item before exiting. n must be >= 1. Returns the applied count and nil, or (0, ErrStopping) if the stage is stopping or stopped. Concurrency-safe.
func (*Stage[T, R]) Stats ¶
Stats returns approximate metrics. See Stats for caveats. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.
func (*Stage[T, R]) Submit ¶
Submit sends item into the stage for processing. Blocks when the buffer is full (backpressure / "rope"). Returns ErrClosed after Stage.CloseInput, fail-fast shutdown, or parent context cancellation.
If cancellation or CloseInput has already occurred before Submit is called, Submit deterministically returns ErrClosed without blocking. A Submit that is blocked or entering concurrently when shutdown fires may nondeterministically succeed or return an error, per Go select semantics — even a blocked Submit can succeed if capacity becomes available at the same instant. Items admitted during this window are processed normally (or canceled if the stage context is already done).
The ctx parameter controls only admission blocking — it is NOT passed to fn. The stage's own context (derived from the ctx passed to Start) is what fn receives. This means canceling a submitter's context does not cancel the item's processing once admitted.
Panics if ctx is nil (same as context.Context method calls). Panics if Weight returns a negative value. Note: a panic in Weight propagates to the caller (unlike fn panics, which are recovered and wrapped in rslt.PanicError). Safe for concurrent use from multiple goroutines. Safe to call concurrently with Stage.CloseInput (will not panic).
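The admission behavior described above (deterministic rejection once closed, and a caller context that governs only the wait for admission) can be sketched with a two-step select. This is an illustration on plain channels under stated assumptions, not the package's implementation; submit, errClosed, and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errClosed is a local stand-in for the package's ErrClosed.
var errClosed = errors.New("closed")

// submit tries to enqueue item. A non-blocking pre-check makes rejection
// deterministic when closing has already fired; the blocking select then
// races admission against ctx and closing.
func submit(ctx context.Context, in chan<- int, closing <-chan struct{}, item int) error {
	select {
	case <-closing:
		return errClosed
	default:
	}
	select {
	case in <- item:
		return nil // admitted; ctx no longer affects this item
	case <-ctx.Done():
		return ctx.Err()
	case <-closing:
		return errClosed
	}
}

// demo exercises the three outcomes and returns them as strings.
func demo() (first, second, third string) {
	describe := func(err error) string {
		if err == nil {
			return "admitted"
		}
		return err.Error()
	}
	in := make(chan int, 1)
	closing := make(chan struct{})

	// The buffer has room, so the item is admitted.
	first = describe(submit(context.Background(), in, closing, 1))

	// The buffer is now full and the caller's ctx is already canceled.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	second = describe(submit(ctx, in, closing, 2))

	// After closing fires, rejection is immediate.
	close(closing)
	third = describe(submit(context.Background(), in, closing, 3))
	return first, second, third
}

func main() {
	fmt.Println(demo()) // admitted context canceled closed
}
```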
func (*Stage[T, R]) TargetWorkers ¶ added in v0.71.0
TargetWorkers returns the current desired worker count.
func (*Stage[T, R]) Wait ¶
Wait blocks until all workers have finished and Out is closed. Wait does not initiate shutdown — call Stage.CloseInput first. Without CloseInput, Wait blocks forever if no more items are submitted. Requires that Out is drained concurrently (see Stage.Out). The stage is complete when all workers have finished, terminal status is latched, and the done channel closes.
Returns the first observed fail-fast error, or nil. Specifically:
- In fail-fast mode: returns the first fn error that caused shutdown (nondeterministic among concurrent workers — first to acquire lock)
- In ContinueOnError mode: always returns nil (check individual results)
- On parent context cancellation: returns nil (caller already knows; errors returned by fn after parent cancel are not stored)
If parent cancellation races with a worker error, Wait may return either nil or the error depending on observation order. Use Stage.Cause for terminal status that distinguishes all three outcomes.
type Stats ¶
type Stats struct {
Submitted int64 // items accepted by Submit (successful return)
Completed int64 // items where fn returned (includes Failed and Panicked)
Failed int64 // subset of Completed where result is error (includes Panicked)
Panicked int64 // subset of Failed where fn panicked
Canceled int64 // items dequeued but not passed to fn because cancellation was observed first (not in Completed)
// Pipe-specific counters. Zero for Start-created stages.
// Invariant (after Wait): Received = Submitted + Forwarded + Dropped.
Received int64 // items consumed from src by feeder (any Result)
Forwarded int64 // upstream Err items sent directly to out (bypassed fn)
Dropped int64 // items seen but neither submitted nor forwarded (shutdown/cancel)
ServiceTime time.Duration // cumulative time fn was executing
IdleTime time.Duration // cumulative worker time waiting for input (includes startup and tail wait)
OutputBlockedTime time.Duration // cumulative worker time blocked handing result to consumer (unbuffered out channel)
BufferedDepth int64 // approximate items in queue; may transiently be negative mid-flight; 0 when Capacity is 0 (unbuffered)
InFlightWeight int64 // weighted cost of items currently in fn (stats-only, not admission)
QueueCapacity int // configured capacity
Paused bool // true if admission is paused via PauseAdmission
MaxWIP int // current WIP limit (>= 1)
MaxWIPWeight int64 // current weight limit (0 = disabled)
AdmittedWeight int64 // current accumulated weight of admitted items
Admitted int64 // reserved permits, not active workers. Includes buffered, in-flight, and reserved-but-not-yet-enqueued. Can exceed current MaxWIP after SetMaxWIP shrinks the limit (existing permits are not revoked) or during the brief grant-to-enqueue window. Not a hard invariant gauge — use for observability, not alerting thresholds.
WaiterCount int // current number of Submits blocked on rope
MaxWaiterCount int // high-water mark for waiter queue depth
TargetWorkers int // desired worker count from SetWorkers
ActiveWorkers int // currently running workers (may lag target during drain)
WIPWaitCount int64 // cumulative: total submissions that blocked on WIP limit (not current blocked count)
WIPWaitNs int64 // cumulative WIP-limit wait time (nanoseconds)
ServiceTimeDist ServiceTimeSummary // zero if TrackServiceTimeDist disabled
// AllocTrackingActive reports whether allocation sampling is
// effectively enabled for this stage. False when
// [Options.TrackAllocations] was not set, or when the runtime does
// not support the required metrics. Allows callers to distinguish
// "tracking requested but unsupported" from "tracking not requested"
// or "tracking active but fn allocated zero."
AllocTrackingActive bool
// ObservedAllocBytes and ObservedAllocObjects are cumulative heap
// allocation counters sampled via runtime/metrics around each fn
// invocation. Zero when AllocTrackingActive is false.
//
// Process-global, not stage-exclusive: includes allocations by any
// goroutine during each fn invocation window. With Workers > 1,
// overlapping windows can capture the same unrelated allocation in
// multiple workers, so per-stage totals can exceed actual process
// allocations over the same period. Not additive across stages.
// Biased upward by longer service times (more background noise).
// Best used as a directional signal under stable workload where the
// stage dominates allocations, not for precise attribution. For
// exact allocation profiling, use go tool pprof.
ObservedAllocBytes uint64
ObservedAllocObjects uint64
}
Stats holds approximate metrics for a Stage.
Fields are read from independent atomics, so a single Stats value is NOT a consistent snapshot — individual fields may reflect slightly different moments. Relationships like Submitted >= Completed + Canceled are not guaranteed mid-flight. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.
All durations are cumulative across all workers since Start. With Workers > 1, durations can exceed wall-clock time.
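The ObservedAllocBytes and ObservedAllocObjects counters are described as deltas of process-wide runtime/metrics counters sampled around each fn call. The sketch below shows that sampling pattern using the two metric names given in the Options documentation. heapAllocs and observe are hypothetical helpers, and the code is an approximation of the idea, not the stage's implementation.

```go
package main

import (
	"fmt"
	"runtime/metrics"
)

// sink keeps the demo allocation reachable so it lands on the heap.
var sink []byte

// heapAllocs reads the cumulative process-wide heap allocation counters.
// ok is false if the runtime does not expose them as uint64 values.
func heapAllocs() (bytes, objects uint64, ok bool) {
	samples := []metrics.Sample{
		{Name: "/gc/heap/allocs:bytes"},
		{Name: "/gc/heap/allocs:objects"},
	}
	metrics.Read(samples)
	for _, s := range samples {
		if s.Value.Kind() != metrics.KindUint64 {
			return 0, 0, false
		}
	}
	return samples[0].Value.Uint64(), samples[1].Value.Uint64(), true
}

// observe runs fn and returns the counter deltas across its invocation
// window. The deltas include allocations by every goroutine in the
// process during that window, not just fn's own.
func observe(fn func()) (uint64, uint64, bool) {
	b0, o0, ok := heapAllocs()
	if !ok {
		return 0, 0, false
	}
	fn()
	b1, o1, _ := heapAllocs()
	return b1 - b0, o1 - o0, true
}

func main() {
	dBytes, dObjects, ok := observe(func() {
		sink = make([]byte, 1<<20) // one large heap allocation
	})
	fmt.Println(ok, dBytes >= 1<<20, dObjects >= 1)
}
```

Because the counters are process-global, two overlapping observe windows would both count the same allocation, which is the over-attribution caveat noted above.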
type Tee ¶ added in v0.43.0
type Tee[T any] struct {
	// contains filtered or unexported fields
}
Tee is a synchronous lockstep broadcast from one source to N branches.
Created by NewTee. The zero value is not usable. A Tee must not be copied after first use.
func NewTee ¶ added in v0.43.0
NewTee creates a Tee that reads from src and broadcasts each item to n unbuffered output branches. Items are sent to branches sequentially in index order — branch 0 first, then branch 1, etc. The slowest consumer governs pace (synchronous lockstep).
The Tee drains src to completion (source ownership rule), provided all branch consumers drain their branches or ctx is canceled, and src eventually closes. Cancellation unblocks branch sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items. Branch sends are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If a branch consumer stops reading and ctx is never canceled, the Tee blocks on that branch's send and stalls all branches.
Tee does not clone payloads. Reference-containing payloads (pointers, slices, maps) may alias across branches. Consumers must treat received values as immutable; mutation after receipt is a data race.
Panics if n <= 0, src is nil, or ctx is nil.
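The lockstep broadcast and discard mode described above can be sketched with plain channels. This is a simplified illustration with no stats or Wait (tee and collect are hypothetical functions, not the package's NewTee); it shows sends in index order and draining src after cancellation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// tee broadcasts each item from src to n unbuffered branches in index
// order. After ctx is canceled it keeps draining src but discards items.
// All branches are closed once src closes.
func tee(ctx context.Context, src <-chan int, n int) []chan int {
	branches := make([]chan int, n)
	for i := range branches {
		branches[i] = make(chan int)
	}
	go func() {
		defer func() {
			for _, b := range branches {
				close(b)
			}
		}()
		discard := false
		for v := range src {
			if discard {
				continue // discard mode: read src, deliver nothing
			}
			for _, b := range branches {
				select {
				case b <- v:
				case <-ctx.Done():
					discard = true
				}
				if discard {
					break // the remaining branches do not receive this item
				}
			}
		}
	}()
	return branches
}

// collect feeds items through tee and returns what each branch received.
// Every branch has its own consumer goroutine, as lockstep requires.
func collect(items []int, n int) [][]int {
	src := make(chan int)
	go func() {
		for _, v := range items {
			src <- v
		}
		close(src)
	}()
	branches := tee(context.Background(), src, n)
	got := make([][]int, n)
	var wg sync.WaitGroup
	for i, b := range branches {
		wg.Add(1)
		go func(i int, b <-chan int) {
			defer wg.Done()
			for v := range b {
				got[i] = append(got[i], v)
			}
		}(i, b)
	}
	wg.Wait()
	return got
}

func main() {
	fmt.Println(collect([]int{1, 2, 3}, 2)) // [[1 2 3] [1 2 3]]
}
```

Reading the branches one after another from a single goroutine would deadlock here, since branch 1 cannot receive item k until branch 0 has taken it and branch 0 cannot receive item k+1 until branch 1 has taken item k.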
func (*Tee[T]) Branch ¶ added in v0.43.0
Branch returns the receive-only output channel for branch i.
Callers MUST drain all branches to completion or cancel the shared context. An undrained branch blocks the Tee and stalls all branches.
Panics if i is out of range [0, n).
type TeeStats ¶ added in v0.43.0
type TeeStats struct {
Received int64 // items consumed from src
FullyDelivered int64 // items sent to ALL branches
PartiallyDelivered int64 // items sent to some branches before cancel (≥1, <N)
Undelivered int64 // items not sent to any branch (cancel before first send, or discard mode)
// Per-branch stats. Len == N (number of branches).
// BranchDelivered[i] = items successfully sent to branch i.
// BranchBlockedTime[i] = cumulative time blocked sending to branch i.
BranchDelivered []int64
BranchBlockedTime []time.Duration
}
TeeStats holds metrics for a Tee.
Fields are read from independent atomics, so a mid-flight Stats value is NOT a consistent snapshot — individual fields may reflect different moments. Invariants are guaranteed only after Tee.Wait returns.
Invariant (after Wait): Received = FullyDelivered + PartiallyDelivered + Undelivered.
PartiallyDelivered is at most 1 per Tee lifetime: once cancellation interrupts delivery mid-item, the goroutine enters discard mode and does not attempt delivery on subsequent items.
BranchBlockedTime[i] measures direct send-wait time on branch i. It does not measure end-to-end latency imposed by other branches. Because branches are sent in index order, earlier branches' blocked time reflects their consumer's speed directly; later branches' blocked time is near zero even if they are throttled by earlier branches.
type WeightedBatcher ¶
type WeightedBatcher[T any] struct {
	// contains filtered or unexported fields
}
WeightedBatcher accumulates Ok items from an upstream Result channel into batches, flushing when accumulated weight OR item count reaches the threshold. Each item's weight is determined by weightFn. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.
Created by NewWeightedBatcher. The zero value is not usable.
func NewWeightedBatcher ¶
func NewWeightedBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	threshold int,
	weightFn func(T) int,
) *WeightedBatcher[T]
NewWeightedBatcher creates a WeightedBatcher that reads from src, accumulates Ok values, and emits batches on Out(). A batch is flushed when either accumulated weight (per weightFn) reaches threshold or item count reaches threshold — whichever comes first. The item-count fallback prevents unbounded accumulation of zero/low-weight items. Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.
weightFn must return a non-negative weight for each item; the batcher panics if it returns a negative value.
The WeightedBatcher drains src to completion (source ownership rule), provided the consumer drains WeightedBatcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the WeightedBatcher blocks on output delivery and cannot drain src.
Panics if threshold <= 0, weightFn is nil, src is nil, or ctx is nil.
func (*WeightedBatcher[T]) Out ¶
func (b *WeightedBatcher[T]) Out() <-chan rslt.Result[[]T]
Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.
Callers MUST drain Out to completion. If the consumer stops reading and ctx is never canceled, the WeightedBatcher blocks on output delivery and cannot drain its source, which can deadlock upstream producers.
func (*WeightedBatcher[T]) Stats ¶
func (b *WeightedBatcher[T]) Stats() WeightedBatcherStats
Stats returns approximate metrics. See WeightedBatcherStats for caveats. Stats are only reliable as final values after WeightedBatcher.Wait returns.
func (*WeightedBatcher[T]) Wait ¶
func (b *WeightedBatcher[T]) Wait() error
Wait blocks until the WeightedBatcher goroutine exits. It returns ctx.Err() if context cancellation caused items to be dropped (discard mode or an interrupted output send), and nil otherwise. Unlike a Stage, the WeightedBatcher has no fn, so there is no fail-fast error.
type WeightedBatcherStats ¶
type WeightedBatcherStats struct {
Received int64 // individual items consumed from src
Emitted int64 // individual Ok items included in emitted batches
Forwarded int64 // Err items forwarded downstream
Dropped int64 // items lost to shutdown/cancel (includes partial batch items)
BufferedDepth int64 // current items in partial batch accumulator
BufferedWeight int64 // current accumulated weight in partial batch
BatchCount int64 // number of batch results emitted
OutputBlockedTime time.Duration // cumulative time blocked sending to out
}
WeightedBatcherStats holds metrics for a WeightedBatcher.
Invariant (after Wait): Received = Emitted + Forwarded + Dropped.
func (WeightedBatcherStats) ToStats ¶ added in v0.80.0
func (s WeightedBatcherStats) ToStats() Stats
ToStats converts WeightedBatcherStats to Stats for use with the analyze package. Maps: Received→Received, Emitted→Submitted, BatchCount→Completed, Forwarded→Forwarded, Dropped→Dropped, BufferedDepth→BufferedDepth, OutputBlockedTime→OutputBlockedTime.