Published: Mar 23, 2026 License: MIT Imports: 19 Imported by: 0

README

toc

Constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints). Process items through a known bottleneck with bounded concurrency, backpressure, and constraint-centric stats.

stage := toc.Start(ctx, processChunk, toc.Options[Chunk]{Capacity: 10})

go func() {
    defer stage.CloseInput() // submitter owns closing input

    for _, chunk := range chunks {
        if err := stage.Submit(ctx, chunk); err != nil {
            break
        }
    }
}()

for result := range stage.Out() {
    val, err := result.Unpack()
    // handle result
}

err := stage.Wait()

DBR Background

If you already know DBR, skip to What It Adds.

In Goldratt's The Goal, a scout troop hike illustrates the constraint problem: the slowest hiker (Herbie) determines throughput for the whole group. Steps before the constraint can produce work faster than it can consume, so without limits the gap grows unboundedly.

Drum-Buffer-Rope (DBR) is the operational policy derived from this insight: the constraint's pace is the drum that sets the system's rhythm, a protective queue (the buffer) sits in front of the constraint so upstream stalls don't starve it, and a WIP limit (the rope) prevents upstream from outrunning the constraint.

DBR-inspired analogues in toc (approximate software analogues, not a literal factory-floor DBR implementation):

  • Constraint (bottleneck) → the stage's processing capacity — fn execution bounded by Workers
  • Drum (constraint's pace) → the stage's processing pace, primarily shaped by fn and Workers (actual throughput also depends on downstream consumption)
  • Buffer (protective queue) → Capacity — bounded input queue in front of the constrained step
  • Rope (WIP limit) → MaxWIP — bounded admission to the stage. Submit blocks when admitted items reach the limit. Adjustable at runtime via SetMaxWIP
  • Constraint monitoring → Stats — ServiceTime, IdleTime, and OutputBlockedTime indicate constraint utilization and downstream pressure

The hiking analogy is from Goldratt, Eliyahu M. The Goal. North River Press, 1984. DBR applied to software in Tendon, Steve and Wolfram Müller. Hyper-Productive Knowledge Work Performance, Ch 18. J. Ross Publishing, 2015.

What It Adds Over Raw Channels

  • Bounded admission — Submit blocks when the buffer is full (the "rope")
  • Lifecycle contract — Submit → CloseInput → drain Out → Wait
  • Fail-fast default — first error cancels remaining work
  • Constraint stats — service time, idle time, output-blocked time, queue depth
  • Panic recovery — panics in fn become rslt.PanicError results with stack traces

Key Concepts

Capacity is the input buffer size. Zero means unbuffered (Submit blocks until a worker dequeues). Submit blocks when full — this is the backpressure mechanism.

Workers is the number of concurrent fn invocations. Default 1 (serial constraint — the common case). Adjustable at runtime via Stage.SetWorkers(n) — new workers start immediately, excess workers retire after completing their current item. Returns (applied, error), where error is ErrStopping after shutdown.

Submit's ctx is admission-only — it controls how long Submit blocks, not what context fn receives. fn always gets the stage context (derived from Start's ctx).

MaxWIP limits total admitted-but-not-completed items (buffered + in-flight). Default is Capacity + Workers (no additional constraint). Set statically via Options.MaxWIP or dynamically via Stage.SetMaxWIP(n). When admitted items reach the limit, Submit blocks until a worker completes — this is the "rope" that prevents upstream overproduction.

stage := toc.Start(ctx, processChunk, toc.Options[Chunk]{
    Capacity: 10,
    Workers:  4,
    MaxWIP:   6, // at most 6 items in the system at once
})

// Dynamic adjustment based on runtime signal:
stage.SetMaxWIP(3)  // tighten under memory pressure
stage.SetMaxWIP(10) // relax when pressure subsides

MaxWIPWeight adds a weight-based limit alongside count. Both limits are enforced simultaneously — count prevents zero-weight floods, weight prevents memory blowout from heavy items. Items with weight exceeding the limit are rejected immediately with ErrWeightExceedsLimit.

stage := toc.Start(ctx, processChunk, toc.Options[Chunk]{
    Capacity:     10,
    Workers:      4,
    MaxWIP:       6,
    MaxWIPWeight: 1 << 30, // 1 GiB weight budget
    Weight:       func(c Chunk) int64 { return int64(len(c.Data)) },
})

stage.SetMaxWIPWeight(512 << 20) // tighten to 512 MiB under pressure

SetMaxWIP wakes blocked Submits if the new limit is higher. Value is clamped to [1, Capacity+Workers]. Returns the applied value.

Rope design tradeoffs:

  • FIFO fairness — blocked Submits are granted in submission order. A new Submit cannot jump ahead of a queued waiter. This is enforced by granting under admissionMu before releasing the lock for new fast-path admissions.
  • Reservation before enqueue — a granted waiter holds a permit before its item enters the channel. If the waiter goroutine is slow to run (scheduler contention), capacity sits temporarily unused. This is the price of exact no-herd handoff — each slot is reserved for exactly one waiter.
  • Stats.Admitted is reserved permits, not active workers — includes buffered items, in-flight items, and granted-but-not-yet-enqueued waiters. Can exceed current MaxWIP after SetMaxWIP shrinks the limit (existing permits are not revoked). Use for observability, not hard-threshold alerting.
  • Permit lifetime includes output publish — a permit is held until the worker sends the result to Out(). If the consumer stops draining, permits are held and upstream blocks. This is the same liveness contract as the stage itself.

PauseAdmission / ResumeAdmission explicitly pause and resume new admissions. In-flight items complete normally. Queued waiters are held (not rejected) until resume. Use for memory pressure, downstream outage, or operator intervention.

stage.PauseAdmission()  // block all new Submit calls
// ... wait for pressure to subside ...
stage.ResumeAdmission() // wake held waiters

Stats.Paused reports the current state. CloseInput while paused rejects held waiters with ErrClosed.

Output must be drained. Workers block on the unbuffered output channel if nobody reads. Always drain Out() or use DiscardAndWait().

Reporter

NewReporter periodically logs per-stage stats and process memory.

reporter := toc.NewReporter(2 * time.Second)
reporter.AddStage("chunker", chunker.Stats)
reporter.AddStage("embedder", embedder.Stats)
go reporter.Run(ctx) // blocks until ctx canceled

Output (every 2 seconds):

[toc] mem: rss=1.2GiB go=750MiB | chunker: sub=1420 comp=1418 svc=12.3s idle=0.8s | embedder: sub=890 comp=888 depth=2

  • Run(ctx) blocks — call in a goroutine. Panics if called twice.
  • AddStage must be called before Run (frozen lifecycle).
  • RSS is Linux-only (from /proc/self/status); omitted on other platforms.
  • Provider panics are recovered and logged; reporting continues.
  • WithLogger(l) injects a *log.Logger (default: log.Default()).

Stats

stats := stage.Stats()
fmt.Printf("utilization: %v service / %v total\n",
    stats.ServiceTime,
    stats.ServiceTime + stats.IdleTime + stats.OutputBlockedTime)

Stats are approximate mid-flight (independent atomics, not a snapshot). Reliable as final values after Wait returns.

Allocation Tracking

Enable TrackAllocations to sample process-wide heap allocation counters around each fn invocation:

stage := toc.Start(ctx, processChunk, toc.Options[Chunk]{
    Capacity:         10,
    TrackAllocations: true,
})
// ... submit, drain, wait ...
stats := stage.Stats()
if stats.AllocTrackingActive {
    fmt.Printf("observed alloc bytes: %d, objects: %d\n",
        stats.ObservedAllocBytes, stats.ObservedAllocObjects)
}

ObservedAllocBytes and ObservedAllocObjects are cumulative heap allocation counters sampled via runtime/metrics before and after each fn call. They are approximate directional signals, not precise attribution:

  • Process-global: includes allocations by any goroutine during each fn window, not just the stage's own work.
  • Not additive: overlapping workers within the same stage can capture the same unrelated allocation. Per-stage totals can exceed actual process allocations.
  • Biased by service time: longer fn calls observe more background noise.
  • Zero when inactive: Both fields are zero when AllocTrackingActive is false — either because TrackAllocations was not set, or because the runtime does not support the required metrics.
  • Discoverability: Check Stats.AllocTrackingActive to distinguish "tracking not requested" from "tracking requested but unsupported" from "tracking active but fn allocated zero."

Best used to identify allocation-heavy stages under stable workload where the stage dominates allocations. For precise attribution, use go tool pprof with allocation profiling.

Overhead: on the order of 1µs per item in single-worker throughput benchmarks (two runtime/metrics.Read calls plus counter extraction and atomic accumulation). Negligible when fn does real work; roughly doubles overhead for no-op or sub-microsecond fns. Multi-worker contention on shared atomic counters may add cost. Silently disabled if the runtime does not support the required metrics.

Pipeline Composition

Pipe and NewBatcher compose stages into multi-stage pipelines with per-stage observability, error passthrough, and backpressure.

chunker  := toc.Start(ctx, chunkFile, Options{Workers: N, Capacity: N*2})
batched  := toc.NewBatcher(ctx, chunker.Out(), 64)
embedder := toc.Pipe(ctx, batched.Out(), embedBatch, Options{Workers: E})
storer   := toc.Pipe(ctx, embedder.Out(), storeBatch, Options{Workers: 1})

// feed the head stage
go func() {
    defer chunker.CloseInput()
    for _, file := range files {
        if err := chunker.Submit(ctx, file); err != nil {
            break
        }
    }
}()

// drain the tail
for r := range storer.Out() { ... }

// wait — reverse order recommended
storer.Wait(); embedder.Wait(); batched.Wait(); chunker.Wait()

Two Error Planes

Pipelines have two distinct error systems:

  1. Data-plane errors — rslt.Err[R] values in Out(). Per-item results; the pipeline continues processing other items. Forwarded upstream errors are always data-plane.

  2. Control-plane errors — stage execution failure via Wait() / Cause(). Terminal: "the stage itself failed." In fail-fast mode, the first fn error becomes control-plane.

Wait() returning nil does NOT mean all items succeeded — it means the stage didn't terminally fail. Check individual Out() results for item-level errors.

Pipe

Pipe creates a stage from an upstream <-chan rslt.Result[T]. Ok values go to workers; Err values pass through directly to the output (error passthrough). The feeder goroutine drains the source to completion (see Lifecycle Contract for preconditions).

The returned stage's input side is owned by the feeder — do not call Submit or CloseInput (both are handled gracefully, but calling them is misuse). External Submit calls void the stats invariant (Received will not account for externally submitted items).

Pipe stats: Received = Submitted + Forwarded + Dropped.

Batcher

NewBatcher accumulates up to n Ok items into []T batches. Errors act as batch boundaries: flush partial batch, forward error, start fresh. Each emitted batch is a fresh allocation (no aliasing).

Batcher stats: Received = Emitted + Forwarded + Dropped.

Batcher introduces up to n-1 items of hidden buffering. Downstream capacity counts batches, not original items.

WeightedBatcher

NewWeightedBatcher flushes when accumulated weight OR item count reaches its threshold — whichever comes first. Each Ok item's weight is determined by weightFn func(T) int. The item-count fallback prevents unbounded accumulation of zero/low-weight items. weightFn must return non-negative values (a negative value panics).

Useful when items have variable cost (e.g., files with different text counts — batch until total texts >= 64, but also cap at 64 files regardless of weight).

WeightedBatcher stats: same as Batcher plus BufferedWeight (accumulated weight in partial batch). Invariant: Received = Emitted + Forwarded + Dropped.

Tee

NewTee broadcasts each item from a source channel to N branches (synchronous lockstep). The slowest consumer governs pace — this is intentional, not a limitation.

chunker := toc.Start(ctx, chunkFile, Options{Workers: N, Capacity: N*2})

tee := toc.NewTee(ctx, chunker.Out(), 2)

ftsRebuilder := toc.Pipe(ctx, tee.Branch(0), rebuildFTS, Options{Workers: 1})
hnswFinalizer := toc.Pipe(ctx, tee.Branch(1), finalizeHNSW, Options{Workers: 1})

go func() {
    defer chunker.CloseInput()
    for _, file := range files {
        if err := chunker.Submit(ctx, file); err != nil {
            break
        }
    }
}()

// drain both tails
go func() { for range hnswFinalizer.Out() {} }()
for r := range ftsRebuilder.Out() { ... }

// wait — reverse order
ftsRebuilder.Wait(); hnswFinalizer.Wait(); tee.Wait(); chunker.Wait()

Contract: Synchronous lockstep broadcast, not independent fan-out. No branch isolation — one branch stalling stalls all branches. No fairness — branch 0 always gets first send.

No deep copy: Tee does not clone payloads. Reference-containing payloads (pointers, slices, maps) may alias across branches. Consumers must treat received values as read-only; mutation after receipt is a data race.

Liveness (downstream): All branch consumers must drain their branch or cancel the shared context. An undrained branch blocks Tee and stalls all branches.

Liveness (upstream): On cancellation, Tee drains src until src is closed. Branch channels stay open until upstream closes. Same source ownership rule as Batcher and Pipe.

Per-branch stats: BranchDelivered[i] and BranchBlockedTime[i] identify which branch is the bottleneck. Aggregate stats: Received = FullyDelivered + PartiallyDelivered + Undelivered (after Wait).

When to use Tee vs manual channel wiring: Use Tee when you need broadcast with stats and lifecycle management. Use manual channels when branches have different types or when you need custom routing logic.

Merge

NewMerge recombines multiple upstream Result channels into a single nondeterministic stream (fan-in). One goroutine per source, all forwarding to a shared unbuffered output channel.

chunker  := toc.Start(ctx, chunkFile, Options{Workers: N, Capacity: N*2})
tee      := toc.NewTee(ctx, chunker.Out(), 2)
ftsPipe  := toc.Pipe(ctx, tee.Branch(0), rebuildFTS, Options{Workers: 1})
hnswPipe := toc.Pipe(ctx, tee.Branch(1), finalizeHNSW, Options{Workers: 1})
merged   := toc.NewMerge(ctx, ftsPipe.Out(), hnswPipe.Out())
storer   := toc.Pipe(ctx, merged.Out(), storeBatch, Options{Workers: 1})

// feed, drain tail, wait reverse order

Not the inverse of Tee. Tee broadcasts identical items to all branches. Merge interleaves distinct items from independent sources. Tee → ... → Merge does not restore original ordering and does not correlate outputs from sibling branches.

Per-source order preserved. Items from each individual source appear in the merged output in the same order they were received from that source. Cross-source order is nondeterministic.

Source ownership: Each source must be distinct and exclusively owned by the Merge. Passing the same channel twice creates two goroutines racing on one source. Each source is drained to completion by its own goroutine. Sources may close at different times — early closure of one does not affect others.

Cancellation: Advisory, not a hard stop. At most 1 item per source may forward after cancel, then discard mode. Wait() blocks until all sources close — cancellation alone does not guarantee prompt return. Wait() may return nil even after cancellation if no goroutine observed it on a checked path.

Liveness: Consumer must drain Out() or cancel the shared context. If the consumer stops reading without canceling, all source goroutines block on the shared output send.

Per-source stats: SourceReceived[i], SourceForwarded[i], SourceDropped[i] track each source's contribution. Aggregates are derived by summing per-source slices. Invariant (after Wait): Received = Forwarded + Dropped.

Join

NewJoin recombines results from two upstream channels into one combined output (strict branch recombination). Uses the first item from each source for join semantics (combine or error), then drains all remaining items from both sources.

chunker  := toc.Start(ctx, chunkFile, Options{Workers: N, Capacity: N*2})
tee      := toc.NewTee(ctx, chunker.Out(), 2)
extDocs  := toc.Pipe(ctx, tee.Branch(0), extractDocs, Options{Workers: 1})
callGraph := toc.Pipe(ctx, tee.Branch(1), buildCallGraph, Options{Workers: 1})
joined   := toc.NewJoin(ctx, extDocs.Out(), callGraph.Out(), combine)
resolver := toc.Pipe(ctx, joined.Out(), resolveEdges, Options{Workers: 1})

// feed, drain tail, wait reverse order

Error matrix: Ok/Ok → combine via fn. Ok/Err or Err/Ok → forward the error, discard the other. Err/Err → errors.Join preserving both. Missing item (source closes empty) → MissingResultError. Missing+Err → errors.Join(err, MissingResultError). Both missing → no output.

Contract: exactly one — each source is expected to produce exactly one item. Missing items (0) and extra items (2+) are contract violations handled gracefully, not panics.

Structural mismatches visible in stats: ExtraA/ExtraB count items beyond the first, drained after the join decision. DiscardedA/DiscardedB count first items that weren't combined (error path, cancel, panic). Post-decision items are always classified as Extra, even if cancellation later prevents result delivery. Conservation invariant (after Wait): ReceivedA = Combined + DiscardedA + ExtraA.

fn contract: func(A, B) R — pure, synchronous combiner. No context, no error return. Panics recovered as PanicError. If combining can fail, use a downstream Pipe for the error-capable transform.

Cancellation: Advisory. On cancel, consumed items are discarded and both sources are drained. Wait() returns the latched context error. Wait() may return nil after cancellation if no goroutine observed it on a checked path.

Liveness: Consumer must drain Out() or cancel the shared context. Both sources must eventually close for the goroutine to exit.

Lifecycle Contract

Source ownership: Pipe, Batcher, WeightedBatcher, Tee, Merge, and Join drain their source(s) to completion. This requires two conditions: (1) the consumer drains Out() or ctx is canceled (downstream liveness), and (2) the upstream source eventually closes (upstream completion). Cancellation solves downstream liveness — it unblocks output sends so the operator can continue draining. It does not force-close the source. If the source never closes, the operator blocks in drain/discard mode indefinitely. After cancellation, all switch to discard mode (continue reading source, discard items). If the consumer stops reading and ctx is never canceled, the operator blocks on output delivery and cannot drain its source.

Cancellation: Fail-fast is stage-local — it cancels only the stage, not upstream. For pipeline-wide shutdown, cancel the shared parent context. This favors deterministic draining over aggressive abort.

Best-effort passthrough: Error passthrough and batch emission use cancel-aware sends (select on ctx). During shutdown, a send may race with cancellation — either branch may win. This means: (1) output may still appear on Out() after cancellation if the send case wins, and (2) upstream errors may be dropped instead of forwarded if the cancel case wins. All drops are reflected in stats. During normal operation, all items are delivered.

Construction order (Tee): NewTee starts immediately. All branch consumers should be wired before upstream produces items. With unbuffered branch channels, if a branch is not yet being read, Tee blocks on that branch's send.

Drain order: Drain only the tail stage's Out(). Each Pipe/Batcher/Tee/Merge/Join drains its upstream internally. After tail Out() closes, Wait() may be called in any order. Reverse order is recommended.

Ordering: No ordering guarantee with Workers > 1. With Workers == 1, worker results preserve encounter order. However, forwarded errors bypass the worker queue, so in Pipe stages they may arrive before buffered worker results regardless of worker count.

When to Use Pipe vs hof.PipeErr

Use hof.PipeErr when transforms are cheap, one worker pool is enough, and per-step observability is unnecessary.

Use toc.Pipe when steps have different throughput/latency profiles, independent worker counts are needed, per-stage capacity matters, or you need to identify the bottleneck.

Documentation

Overview

Package toc provides a constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints), with pipeline composition via Pipe, NewBatcher, NewTee, and NewMerge.

A Stage owns a bounded input queue and one or more workers with bounded concurrency. Producers submit items via Stage.Submit; the stage processes them through fn and emits results on Stage.Out. When the stage is saturated, Submit blocks — this is the "rope" limiting upstream WIP.

The stage tracks constraint utilization, idle time, and output-blocked time via Stage.Stats, helping operators assess whether this stage is acting as the constraint and whether downstream backpressure is suppressing throughput.

Single-Stage Lifecycle

  1. Start a stage with Start
  2. Submit items with Stage.Submit from one or more goroutines
  3. Call Stage.CloseInput when all submissions are done (see below)
  4. Read results from Stage.Out until closed — must drain to completion
  5. Call Stage.Wait (or Stage.Cause) to block until shutdown completes. Steps 4-5 can be combined with Stage.DiscardAndWait / Stage.DiscardAndCause

The goroutine or coordinator that knows no more submissions will occur owns CloseInput. In single-producer code, deferring CloseInput in the submitting goroutine is a good safety net. With multiple producers, a coordinator should call CloseInput after all producers finish (e.g., after a sync.WaitGroup). CloseInput is also called internally on fail-fast error or parent context cancellation, so the input side closes automatically on abnormal paths.

Cardinality: under the liveness conditions below, every Stage.Submit that returns nil yields exactly one rslt.Result on Stage.Out. Submit calls that return an error produce no result.

Operational notes:

  • Callers must drain Stage.Out until closed, or use Stage.DiscardAndWait / Stage.DiscardAndCause. If callers stop draining Out, workers block on result delivery and Stage.Wait / Stage.Cause may never return.
  • If fn blocks forever or ignores cancellation, the stage leaks goroutines and never completes. See Stage.Out for full liveness details.
  • Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight).
  • See Stage.Cause for terminal-status semantics (Wait vs Cause) and Stage.Wait for completion semantics.

Pipeline Composition

Pipe composes stages by reading from an upstream Result channel, forwarding Ok values to workers and passing Err values directly to the output (error passthrough). NewBatcher accumulates items into fixed-count batches between stages. NewWeightedBatcher accumulates items into weight-based batches (flush when accumulated weight reaches threshold). NewTee broadcasts each item to N branches (synchronous lockstep — slowest consumer governs pace). NewMerge recombines multiple upstream Result channels into a single nondeterministic stream (fan-in). NewJoin recombines two branch results into one combined output (strict branch recombination — one item from each source, combined via a function).

Pipelines have two error planes: data-plane errors (per-item rslt.Err in Stage.Out) and control-plane errors (Stage.Wait / Stage.Cause). Forwarded upstream errors are data-plane only — they never trigger fail-fast in the downstream stage.

See the package README for pipeline lifecycle contract, cancellation topology, and selection rubric (call.MapErr vs toc.Pipe).

This package is for pipelines with a known bottleneck stage. If you don't know your constraint, profile first.

Example
package main

import (
	"context"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	// double concatenates a string with itself.
	double := func(_ context.Context, s string) (string, error) {
		return s + s, nil
	}

	ctx := context.Background()

	// Start's ctx governs stage lifetime; Submit's ctx bounds only admission.
	stage := toc.Start(ctx, double, toc.Options[string]{Capacity: 3, Workers: 1})

	go func() {
		defer stage.CloseInput()

		for _, item := range []string{"a", "b", "c"} {
			if err := stage.Submit(ctx, item); err != nil {
				break
			}
		}
	}()

	for result := range stage.Out() {
		val, err := result.Unpack()
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(val)
	}

	if err := stage.Wait(); err != nil {
		fmt.Println("stage error:", err)
	}
}
Output:

aa
bb
cc
Example (Pipe)

Example_pipe demonstrates basic error passthrough through a Pipe stage.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/rslt"
	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	src := make(chan rslt.Result[int], 3)
	src <- rslt.Ok(10)
	src <- rslt.Err[int](errors.New("oops"))
	src <- rslt.Ok(20)
	close(src)

	// doubleFn doubles the input.
	doubleFn := func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	}

	stage := toc.Pipe(ctx, src, doubleFn, toc.Options[int]{})

	for r := range stage.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	stage.Wait()

	// Forwarded errors bypass the worker queue, so the error
	// may arrive before queued Ok results complete.
}
Output:

error: oops
20
40
Example (Pipeline)

Example_pipeline demonstrates a four-handle pipeline modeled on the era-indexer: Start → Batcher → Pipe → Pipe, with error passthrough and reverse-order Wait.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	// Stage 1: "chunk" each number into a string representation.
	chunkFn := func(_ context.Context, n int) (string, error) {
		if n == 3 {
			return "", errors.New("bad input: 3")
		}

		return fmt.Sprintf("chunk(%d)", n), nil
	}
	chunker := toc.Start(ctx, chunkFn, toc.Options[int]{Capacity: 5, ContinueOnError: true})

	// Stage 2: Batch strings into groups of 2.
	batched := toc.NewBatcher(ctx, chunker.Out(), 2)

	// Stage 3: "embed" each batch by joining.
	embedFn := func(_ context.Context, batch []string) (string, error) {
		result := ""
		for i, s := range batch {
			if i > 0 {
				result += "+"
			}
			result += s
		}

		return fmt.Sprintf("embed[%s]", result), nil
	}
	embedder := toc.Pipe(ctx, batched.Out(), embedFn, toc.Options[[]string]{})

	// Stage 4: "store" by uppercasing (identity for this example).
	storeFn := func(_ context.Context, s string) (string, error) {
		return fmt.Sprintf("store(%s)", s), nil
	}
	storer := toc.Pipe(ctx, embedder.Out(), storeFn, toc.Options[string]{})

	// Feed the head stage.
	go func() {
		defer chunker.CloseInput()
		for _, n := range []int{1, 2, 3, 4, 5} {
			if err := chunker.Submit(ctx, n); err != nil {
				break
			}
		}
	}()

	// Drain the tail.
	for r := range storer.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	// Wait in reverse order (recommended).
	storer.Wait()
	embedder.Wait()
	batched.Wait()
	chunker.Wait()

	// Forwarded errors bypass worker queues, so the error from item 3
	// may arrive before the batch containing items 1-2 is processed.
}
Output:

error: bad input: 3
store(embed[chunk(1)+chunk(2)])
store(embed[chunk(4)+chunk(5)])

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("toc: stage closed")

ErrClosed is returned by Stage.Submit when the stage is no longer accepting input — after Stage.CloseInput, fail-fast shutdown, or parent context cancellation. See Stage.Submit for race semantics.

View Source
var ErrStopping = errors.New("toc: stage is stopping")

ErrStopping is returned by SetWorkers when the stage is stopping or stopped.

View Source
var ErrWeightExceedsLimit = errors.New("toc: item weight exceeds MaxWIPWeight")

ErrWeightExceedsLimit is returned by Stage.Submit when the item's weight exceeds the current [Options.MaxWIPWeight]. The item is rejected immediately and never enqueued.

Functions

func Adapt added in v0.84.0

func Adapt(name string, prev, curr Stats, elapsed time.Duration) core.StageObservation

Adapt converts a pair of consecutive Stats snapshots into a core.StageObservation for the deterministic analyzer. The elapsed duration is the wall-clock time between the two snapshots.

Work units are nanoseconds. CapacityWork is approximated as avgWorkers × elapsed (same approximation as the legacy analyzer). For exact CapacityWork, use a simulation adapter that tracks integrated worker-time per tick.

func BufferCapacity added in v0.82.0

func BufferCapacity(throughput float64, protectionTime time.Duration) int

BufferCapacity computes how many items a buffer needs to hold to protect the drum from starvation over a given protection window. Goldratt's formula: ceil(throughput × protectionTime).

The caller provides protectionTime — it might come from upstream lead time measurement, drum P95, or a configured value.

Returns 0 if throughput <= 0 or protectionTime <= 0.

func FromChan added in v0.53.0

func FromChan[T any](ch <-chan T) <-chan rslt.Result[T]

FromChan converts a plain channel into a Result channel suitable for use with NewTee, Pipe, and other toc operators. Each value is wrapped in rslt.Ok. The output channel closes when the input closes.

Types

type Batcher

type Batcher[T any] struct {
	// contains filtered or unexported fields
}

Batcher accumulates up to n Ok items from an upstream Result channel into batches, emitting each batch as rslt.Result[[]T]. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.

Created by NewBatcher. The zero value is not usable.

func NewBatcher

func NewBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	n int,
) *Batcher[T]

NewBatcher creates a Batcher that reads from src, accumulates up to n Ok values per batch, and emits batches on Out(). Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.

The Batcher drains src to completion (source ownership rule), provided the consumer drains Batcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the Batcher blocks on output delivery and cannot drain src.

Panics if n <= 0, src is nil, or ctx is nil.

func (*Batcher[T]) Out

func (b *Batcher[T]) Out() <-chan rslt.Result[[]T]

Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.

Callers MUST drain Out to completion. If the consumer stops reading, the Batcher blocks and cannot drain its source, potentially causing upstream deadlocks.

func (*Batcher[T]) Stats

func (b *Batcher[T]) Stats() BatcherStats

Stats returns approximate metrics. See BatcherStats for caveats. Stats are only reliable as final values after Batcher.Wait returns.

func (*Batcher[T]) Wait

func (b *Batcher[T]) Wait() error

Wait blocks until the Batcher goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted output send), nil otherwise. The Batcher has no fn, so there is no fail-fast error.

type BatcherStats

type BatcherStats struct {
	Received          int64         // individual items consumed from src
	Emitted           int64         // individual Ok items included in emitted batches
	Forwarded         int64         // Err items forwarded downstream
	Dropped           int64         // items lost to shutdown/cancel (includes partial batch items)
	BufferedDepth     int64         // current items in partial batch accumulator
	BatchCount        int64         // number of batch results emitted
	OutputBlockedTime time.Duration // cumulative time blocked sending to out
}

BatcherStats holds metrics for a Batcher.

Invariant (after Wait): Received = Emitted + Forwarded + Dropped.

func (BatcherStats) ToStats added in v0.80.0

func (s BatcherStats) ToStats() Stats

ToStats converts BatcherStats to Stats for use with the analyze package.

type BudgetAllocatorHandle added in v0.93.0

type BudgetAllocatorHandle struct {
	// contains filtered or unexported fields
}

BudgetAllocatorHandle is returned by BudgetAllocator. It provides the memctl.Watch callback and observable state.

func BudgetAllocator added in v0.93.0

func BudgetAllocator(
	stages []StageAllocation,
	drumName string,
	budgetFraction float64,
	constraintShare float64,
	logger *log.Logger,
) *BudgetAllocatorHandle

BudgetAllocator creates a memctl.Watch callback that distributes available memory budget across pipeline stages. The constraint (drum) gets a larger share; non-constraints get smaller shares.

budgetFraction controls total pipeline budget as a fraction of headroom. Each stage's allocation is budgetFraction × headroom × share.

constraintShare is the fraction of the total budget allocated to the drum stage. The remainder is split equally among non-drum stages. For example, constraintShare=0.5 with 4 stages: drum gets 50%, each of the 3 non-drum stages gets ~16.7%.

drumName identifies which stage is the constraint. If empty, budget is split equally across all stages.
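The share arithmetic above can be sketched as a pure function. The stage names and headroom figure are hypothetical, and this is a simplified model of the allocation rule, not the BudgetAllocator code.

```go
package main

import "fmt"

// allocate computes per-stage byte budgets per the rule above: each stage
// gets budgetFraction × headroom × share, where the drum's share is
// constraintShare and the remainder is split equally among the rest.
func allocate(stages []string, drum string, headroom uint64, budgetFraction, constraintShare float64) map[string]uint64 {
	total := budgetFraction * float64(headroom)
	out := make(map[string]uint64, len(stages))
	for _, s := range stages {
		share := 1.0 / float64(len(stages)) // equal split when no drum is named
		if drum != "" {
			if s == drum {
				share = constraintShare
			} else {
				share = (1 - constraintShare) / float64(len(stages)-1)
			}
		}
		out[s] = uint64(total * share)
	}
	return out
}

func main() {
	// Hypothetical 4-stage pipeline with 1 GiB headroom.
	a := allocate([]string{"walk", "read", "embed", "write"}, "embed", 1<<30, 0.5, 0.5)
	fmt.Println(a["embed"], a["walk"]) // drum gets 50% of the budget; each non-drum ~16.7%
}
```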

func (*BudgetAllocatorHandle) Callback added in v0.93.0

func (h *BudgetAllocatorHandle) Callback() func(context.Context, memctl.MemInfo)

Callback returns the function to pass to memctl.Watch.

type BufferZone added in v0.74.0

type BufferZone int

BufferZone represents Goldratt's fever chart zone for buffer penetration.

const (
	// BufferGreen indicates the buffer is less than 33% full.
	// The constraint may be underutilized — rope may be too tight.
	BufferGreen BufferZone = iota

	// BufferYellow indicates the buffer is 33-66% full. Healthy operating zone.
	BufferYellow

	// BufferRed indicates the buffer is more than 66% full.
	// The constraint is about to starve — upstream can't keep up.
	BufferRed
)

func (BufferZone) String added in v0.74.0

func (z BufferZone) String() string

type FocusingStep added in v0.83.0

type FocusingStep int

FocusingStep represents which of Goldratt's Five Focusing Steps the pipeline is currently in.

The steps are not a state machine — they are a classification of the current system condition based on observable signals. Consumers call ClassifyStep per interval from their own coordination logic.

const (
	// StepIdentify is Step 1: identify the system's constraint.
	// No constraint has been identified yet — the analyzer is still
	// collecting data or no stage is saturated.
	StepIdentify FocusingStep = iota + 1

	// StepExploit is Step 2: exploit the constraint — don't waste it.
	// A constraint is identified but exploitation is incomplete: the
	// rope/buffer is not yet active, or the drum is starving.
	StepExploit

	// StepSubordinate is Step 3: subordinate everything else.
	// The constraint is identified, the rope is active, and the drum
	// is not starving. Non-constraints defer to the drum's pace.
	StepSubordinate

	// StepElevate is Step 4: elevate the constraint's capacity.
	// The rebalancer is actively moving resources to the constraint
	// (e.g., adding workers to the drum stage).
	StepElevate

	// StepPreventInertia is Step 5: if the constraint has moved,
	// go back to Step 1 — do not allow inertia to become the
	// system's constraint. The old rope must be rebuilt for the
	// new drum.
	//
	// Constraint migration protocol: cancel old [RopeController]
	// context, call [NewRopeController] with the new drum. EWMA
	// state starts fresh — old drum's signals are irrelevant.
	//
	// Migration safety: the new rope controls a prefix of the old
	// chain. If the constraint moved upstream (embed → walk), the
	// new rope limits head → walk WIP, which limits what reaches
	// embed. The old drum's protection is preserved by reduced supply.
	//
	// CAUTION: do not remove workers from the old drum during
	// migration. If embed was elevated (workers added) and then
	// those workers are moved to walk, embed loses capacity and
	// may become the constraint again. Migration simulation
	// (examples/migration-sim) demonstrated this: stealing one
	// worker caused embed queue to spike and throughput to drop.
	// The rebalancer should NOT steal workers from a recently-
	// elevated stage until the new constraint is confirmed stable
	// across multiple intervals.
	StepPreventInertia
)

func ClassifyStep added in v0.83.0

func ClassifyStep(
	prevConstraint string,
	currConstraint string,
	ropeActive bool,
	rebalancing bool,
	starving bool,
) FocusingStep

ClassifyStep determines the current focusing step from system state. Pure function — no side effects.

The caller provides prev and curr constraint names so the comparison logic lives with the classification. The caller tracks prevConstraint across intervals (typically one string variable updated per tick).

Classification priority (highest first):

  1. No constraint (currConstraint empty) → StepIdentify
  2. Constraint changed (prev ≠ curr, both non-empty) → StepPreventInertia
  3. Rebalancer active → StepElevate
  4. Drum starving or rope not active → StepExploit
  5. Otherwise → StepSubordinate
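The priority order can be expressed as a single switch. This is a standalone sketch of the classification rules listed above, with local Step constants mirroring FocusingStep, not the package's ClassifyStep itself.

```go
package main

import "fmt"

// Step mirrors the FocusingStep values 1..5.
type Step int

const (
	Identify Step = iota + 1
	Exploit
	Subordinate
	Elevate
	PreventInertia
)

// classify applies the priority rules in order: first match wins.
func classify(prev, curr string, ropeActive, rebalancing, starving bool) Step {
	switch {
	case curr == "":
		return Identify // no constraint identified yet
	case prev != "" && prev != curr:
		return PreventInertia // constraint moved
	case rebalancing:
		return Elevate
	case starving || !ropeActive:
		return Exploit
	default:
		return Subordinate
	}
}

func main() {
	fmt.Println(classify("walk", "embed", true, false, false)) // constraint moved → 5
	fmt.Println(classify("embed", "embed", true, false, true)) // drum starving → 2
}
```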

func (FocusingStep) String added in v0.83.0

func (s FocusingStep) String() string

type IncomingEdge added in v0.80.0

type IncomingEdge struct {
	From  string // predecessor stage name
	Ratio int    // consumption ratio (items from From per unit of output)
}

IncomingEdge describes one incoming edge to a stage.

type IntervalStats added in v0.74.0

type IntervalStats struct {
	Duration      time.Duration
	ResetDetected bool // true if any cumulative counter decreased

	// Interval deltas (cumulative counter differences).
	ItemsSubmitted int64
	ItemsCompleted int64 // includes failed+panicked (same as Stats.Completed)
	ItemsFailed    int64 // subset of completed
	ItemsCanceled  int64

	// Derived rates. Zero when Duration <= 0 or denominator is zero.
	Throughput        float64       // completed items/sec (includes failed)
	Goodput           float64       // successful completions/sec: (completed - failed) / elapsed
	ArrivalRate       float64       // submitted items/sec at this stage
	ErrorRate         float64       // failed / completed; bounded [0,1]
	MeanServiceTime   time.Duration // ServiceTimeDelta / ItemsCompleted
	ApproxUtilization float64       // ServiceTimeDelta / (Duration * avg workers); approximate

	// Interval time deltas (cumulative across all workers).
	ServiceTimeDelta   time.Duration
	IdleTimeDelta      time.Duration
	OutputBlockedDelta time.Duration

	// Point-in-time gauges from curr snapshot.
	CurrBufferedDepth     int64
	CurrQueueCapacity     int
	CurrBufferPenetration float64 // depth/capacity; clamped to [0,1]; 0 if unbuffered
	CurrActiveWorkers     int
	CurrTargetWorkers     int

	// Interval-derived queue signal.
	QueueGrowthRate float64 // items/sec; negative = draining
}

IntervalStats holds per-interval deltas between two Stats snapshots. Fields prefixed with Curr are point-in-time gauges from the curr snapshot, not interval-derived. All other fields are computed from the delta between prev and curr.

func Delta added in v0.74.0

func Delta(prev, curr Stats, elapsed time.Duration) IntervalStats

Delta computes interval stats between two Stats snapshots. elapsed is the wall-clock time between the two samples. Both snapshots should come from the same stage.

func (IntervalStats) BufferZone added in v0.74.0

func (s IntervalStats) BufferZone(yellowAt, redAt float64) BufferZone

BufferZone returns the Goldratt fever chart zone based on CurrBufferPenetration. The caller provides yellowAt and redAt penetration thresholds (0.0-1.0). Goldratt convention is 0.33/0.66.
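The zone mapping can be sketched as a small classifier. This is an illustrative model of the fever-chart zones; the exact comparison used at the threshold boundaries is an assumption (here, red strictly above redAt, yellow at or above yellowAt, matching "less than 33%" / "33-66%" / "more than 66%").

```go
package main

import "fmt"

// zone classifies buffer penetration against caller-supplied thresholds.
func zone(penetration, yellowAt, redAt float64) string {
	switch {
	case penetration > redAt:
		return "red" // constraint about to starve
	case penetration >= yellowAt:
		return "yellow" // healthy operating zone
	default:
		return "green" // rope may be too tight
	}
}

func main() {
	// Goldratt convention: yellow at 0.33, red at 0.66.
	fmt.Println(zone(0.2, 0.33, 0.66), zone(0.5, 0.33, 0.66), zone(0.9, 0.33, 0.66))
}
```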

type Join added in v0.44.0

type Join[R any] struct {
	// contains filtered or unexported fields
}

Join is a strict branch recombination operator: it uses the first item from each of two source channels for join semantics (combine or error), then drains all remaining items from both sources (source ownership rule).

Join is designed for recombining Tee branches — each branch is expected to produce exactly one result. Missing items (source closes without producing) and extra items (source produces more than one) are contract violations handled gracefully: missing items produce MissingResultError, extra items are drained and counted in stats.

Created by NewJoin. The zero value is not usable. A Join must not be copied after first use.

func NewJoin added in v0.44.0

func NewJoin[A, B, R any](
	ctx context.Context,
	srcA <-chan rslt.Result[A],
	srcB <-chan rslt.Result[B],
	fn func(A, B) R,
) *Join[R]

NewJoin creates a Join that uses the first item from each source for join semantics, then drains remaining items. Ok/Ok pairs are combined via fn; errors are forwarded. The result is emitted on Join.Out.

Error matrix:

  • Ok(a), Ok(b) → Ok(fn(a, b))
  • Ok(a), Err(e) → Err(e); a discarded
  • Err(e), Ok(b) → Err(e); b discarded
  • Err(ea), Err(eb) → Err(errors.Join(ea, eb))
  • Ok(a), missing → Err(MissingResultError{Source: "B"}); a discarded
  • missing, Ok(b) → Err(MissingResultError{Source: "A"}); b discarded
  • Err(e), missing → Err(errors.Join(e, MissingResultError{Source: "B"}))
  • missing, Err(e) → Err(errors.Join(MissingResultError{Source: "A"}, e))
  • missing, missing → no output

Each source is drained to completion (source ownership rule). Extra items beyond the first are counted in [JoinStats.ExtraA] / [JoinStats.ExtraB]. Sources may close at different times.

fn must be a pure, synchronous combiner — it must not block, perform I/O, or depend on cancellation. fn runs on the Join's only goroutine; a blocking fn prevents cancellation observation, source draining, and Join.Wait from returning. Similarly, if the consumer stops reading Join.Out without canceling, the goroutine blocks on the output send with the same consequences. If combining can fail or block, use a downstream Pipe for the error-capable, context-aware transform. Panics in fn are recovered as rslt.PanicError.

Cancellation is best-effort and observed only in the Phase 1 select and the output send. On ctx cancellation, consumed items are discarded and both sources are drained. A pre-send checkpoint catches already-observable cancellation before attempting the output send, but a result may still be emitted if the send races with cancellation (both select cases ready). Join.Wait returns the latched context error if cancellation was observed during collection or output; Join.Wait may return nil even if the context was canceled, if the goroutine completed without observing it. Drain is unconditional — sources must close for the goroutine to exit.

Panics if srcA, srcB, ctx, or fn is nil.
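The error matrix above can be modeled as a pure decision function. This is a standalone sketch, not the Join code: `first` stands in for the first item from a branch (nil pointer means the source closed without producing), string concatenation stands in for fn, and "missing, missing → no output" is modeled as a nil error.

```go
package main

import (
	"errors"
	"fmt"
)

// first models a branch's first item: a value, an error, or (as a nil
// pointer) missing entirely.
type first struct {
	val string
	err error
}

var errMissing = errors.New("missing result")

// decide applies the Ok/Err/missing matrix; errors.Join drops nil inputs,
// so the single-error rows fall out of the same code path.
func decide(a, b *first) (string, error) {
	if a == nil && b == nil {
		return "", nil // missing, missing → no output
	}
	var aErr, bErr error
	switch {
	case a == nil:
		aErr = fmt.Errorf("A: %w", errMissing)
	case a.err != nil:
		aErr = a.err
	}
	switch {
	case b == nil:
		bErr = fmt.Errorf("B: %w", errMissing)
	case b.err != nil:
		bErr = b.err
	}
	if aErr != nil || bErr != nil {
		return "", errors.Join(aErr, bErr) // any Ok value on the other side is discarded
	}
	return a.val + b.val, nil // Ok, Ok → combine
}

func main() {
	out, _ := decide(&first{val: "x"}, &first{val: "y"})
	fmt.Println(out) // xy
	_, err := decide(&first{val: "x"}, nil)
	fmt.Println(err != nil) // B missing → error; x discarded
}
```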

func (*Join[R]) Out added in v0.44.0

func (j *Join[R]) Out() <-chan rslt.Result[R]

Out returns the receive-only output channel. At most one result will appear on this channel.

The consumer MUST drain Out() to completion or cancel the shared context. If the consumer stops reading without canceling, the goroutine blocks on the output send.

Out() is idempotent — it always returns the same channel.

func (*Join[R]) Stats added in v0.44.0

func (j *Join[R]) Stats() JoinStats

Stats returns approximate metrics. See JoinStats for caveats. Stats are only reliable as final values after Join.Wait returns.

func (*Join[R]) Wait added in v0.44.0

func (j *Join[R]) Wait() error

Wait blocks until the goroutine exits and the output channel is closed. Returns a latched context error if cancellation was observed during collection or output. May return nil even if the context was canceled, if the goroutine completed without observing it (e.g., both sources closed before cancellation was checked).

Multiple Wait() calls are safe — subsequent calls return immediately with the same value.

type JoinStats added in v0.44.0

type JoinStats struct {
	ReceivedA  int64 // total items consumed from srcA
	ReceivedB  int64 // total items consumed from srcB
	Combined   int64 // successful fn(a,b) combinations (0 or 1)
	Errors     int64 // error results delivered to Out (0 or 1)
	DiscardedA int64 // A items consumed but not part of a successful combination (error, missing, cancel, panic)
	DiscardedB int64 // B items consumed but not part of a successful combination (error, missing, cancel, panic)
	ExtraA     int64 // A items beyond the first, drained after the join decision (contract violation)
	ExtraB     int64 // B items beyond the first, drained after the join decision (contract violation)

	OutputBlockedTime time.Duration // time blocked sending result to out
}

JoinStats holds metrics for a Join.

Fields are read from independent atomics, so a mid-flight Stats value is NOT a consistent snapshot. Invariants are guaranteed only after Join.Wait returns.

Conservation invariant (after Wait):

  • ReceivedA = Combined + DiscardedA + ExtraA
  • ReceivedB = Combined + DiscardedB + ExtraB
  • Combined + Errors <= 1

Counter precedence: DiscardedX counts only first items that were consumed but not successfully combined (error, cancel, panic, or other side missing). ExtraX counts items beyond the first, drained after the join decision is reached. Post-decision items are always classified as ExtraX, even if cancellation later prevents result delivery.

type LimitHandle added in v0.90.0

type LimitHandle struct {
	// contains filtered or unexported fields
}

LimitHandle is returned by LimitManager.Register. It scopes proposals to a single source and provides automatic cleanup via Close.

func (*LimitHandle) Close added in v0.90.0

func (h *LimitHandle) Close()

Close withdraws all proposals for this source. Safe to call multiple times. Should be deferred by controllers for cleanup.

func (*LimitHandle) ProposeCount added in v0.90.0

func (h *LimitHandle) ProposeCount(limit int)

ProposeCount sets a count-limit proposal for this source.

func (*LimitHandle) ProposeWeight added in v0.90.0

func (h *LimitHandle) ProposeWeight(limit int64)

ProposeWeight sets a weight-limit proposal for this source.

type LimitManager added in v0.87.0

type LimitManager struct {
	// contains filtered or unexported fields
}

LimitManager accepts named limit proposals from multiple sources and applies the tightest (min) per dimension to the stage.

Two dimensions: count (Stage.SetMaxWIP) and weight (Stage.SetMaxWIPWeight). Each source proposes independently. The effective limit is min across all active proposals.

A permanent "default" baseline prevents withdrawal of all controller proposals from removing limits entirely.

All proposals are hard upper bounds. Min is the correct composition rule for hard caps.
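The min-composition rule can be sketched in a few lines. This models only the selection logic described above (tightest active proposal wins, permanent "default" baseline), not the LimitManager's locking or stage setters.

```go
package main

import "fmt"

// effective returns the tightest active proposal for one dimension,
// falling back to the permanent baseline when none are active.
func effective(baseline int, proposals map[string]int) (limit int, source string) {
	limit, source = baseline, "default"
	for src, p := range proposals {
		if p < limit {
			limit, source = p, src
		}
	}
	return
}

func main() {
	limit, src := effective(64, map[string]int{"processing-rope": 16, "memory-rope": 32})
	fmt.Println(limit, src) // 16 processing-rope

	// Withdrawing all controller proposals loosens back to the baseline.
	limit, src = effective(64, nil)
	fmt.Println(limit, src) // 64 default
}
```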

func NewLimitManager added in v0.87.0

func NewLimitManager(
	setCount func(int) int,
	setWeight func(int64) int64,
	defaultCount int,
	defaultWeight int64,
) *LimitManager

NewLimitManager creates a limit manager with construction defaults as baseline.

defaultCount is the stage's construction MaxWIP — always present as a permanent baseline proposal. defaultWeight is the construction MaxWIPWeight — added as baseline only if > 0 (0 means no weight limit configured).

setCount is typically stage.SetMaxWIP. setWeight is typically stage.SetMaxWIPWeight. Both must be non-nil.

func (*LimitManager) Effective added in v0.87.0

func (m *LimitManager) Effective() LimitSnapshot

Effective returns a snapshot of the current limits.

func (*LimitManager) ProposeCount added in v0.87.0

func (m *LimitManager) ProposeCount(source string, limit int)

ProposeCount sets a count-limit proposal for the named source. Recomputes and applies the effective count limit (min across sources). Panics if source is empty.

func (*LimitManager) ProposeWeight added in v0.87.0

func (m *LimitManager) ProposeWeight(source string, limit int64)

ProposeWeight sets a weight-limit proposal for the named source. Recomputes and applies the effective weight limit (min across sources). Panics if source is empty or "default" (reserved for baseline).

func (*LimitManager) Register added in v0.90.0

func (m *LimitManager) Register(source LimitSource) *LimitHandle

Register creates a LimitHandle for the given source. The handle provides scoped ProposeCount/ProposeWeight/Close methods. Defer handle.Close() for automatic cleanup when a controller exits.

Panics if source is empty or reserved.

func (*LimitManager) WithdrawCount added in v0.87.0

func (m *LimitManager) WithdrawCount(source string)

WithdrawCount removes a source's count proposal. The effective limit loosens to the next tightest (or baseline if none remain).

func (*LimitManager) WithdrawWeight added in v0.87.0

func (m *LimitManager) WithdrawWeight(source string)

WithdrawWeight removes a source's weight proposal.

type LimitSnapshot added in v0.87.0

type LimitSnapshot struct {
	EffectiveCount  int    // min across count proposals (always >= 1)
	EffectiveWeight int64  // min across weight proposals (>= 0 if any active, 0 if none)
	AppliedCount    int    // actual value returned by stage setter
	AppliedWeight   int64  // actual value returned by stage setter
	CountSource     string // which source is tightest for count
	WeightSource    string // which source is tightest for weight
	CountSources    int    // number of active count proposals
	WeightSources   int    // number of active weight proposals

	// Per-source proposals for debugging. Includes baseline as "default".
	CountProposals  map[string]int   // copy of all active count proposals
	WeightProposals map[string]int64 // copy of all active weight proposals
}

LimitSnapshot is a point-in-time view of effective limits.

type LimitSource added in v0.88.0

type LimitSource = string

LimitSource identifies a proposal source. Use the predefined constants for built-in controllers. Custom sources use any non-empty string.

const (
	// LimitSourceProcessingRope is the count-based processing rope.
	LimitSourceProcessingRope LimitSource = "processing-rope"

	// LimitSourceWeightRope is the weight-based processing rope.
	LimitSourceWeightRope LimitSource = "processing-weight-rope"

	// LimitSourceMemoryRope is the memory headroom rope.
	LimitSourceMemoryRope LimitSource = "memory-rope"
)
const (
	// LimitSourceBudgetAllocator is the source name for per-stage
	// weight budget proposals from the [BudgetAllocator].
	LimitSourceBudgetAllocator LimitSource = "budget-allocator"
)

type MemoryFeverZone added in v0.82.0

type MemoryFeverZone int

MemoryFeverZone classifies memory headroom.

const (
	// MemoryUnknown indicates the memory limit is unavailable or zero.
	MemoryUnknown MemoryFeverZone = iota

	// MemoryGreen indicates headroom is above the yellow threshold.
	MemoryGreen

	// MemoryYellow indicates headroom is between yellow and red thresholds.
	MemoryYellow

	// MemoryRed indicates headroom is below the red threshold.
	MemoryRed
)

func MemoryFever added in v0.82.0

func MemoryFever(headroom, limit uint64, yellowAt, redAt float64) MemoryFeverZone

MemoryFever classifies memory headroom into a fever zone. yellowAt and redAt are penetration thresholds (0.0-1.0), where penetration = 1.0 - (headroom / limit). Goldratt convention: 0.33/0.66. Returns MemoryUnknown if limit == 0.
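The penetration formula and zone mapping can be sketched as follows. This is an illustrative model of the documented rule; the comparison used exactly at the thresholds is an assumption.

```go
package main

import "fmt"

// fever classifies memory headroom by penetration = 1 - headroom/limit.
func fever(headroom, limit uint64, yellowAt, redAt float64) string {
	if limit == 0 {
		return "unknown" // limit unavailable
	}
	penetration := 1.0 - float64(headroom)/float64(limit)
	switch {
	case penetration >= redAt:
		return "red"
	case penetration >= yellowAt:
		return "yellow"
	default:
		return "green"
	}
}

func main() {
	// 1 GiB limit with 200 MiB headroom → ~80% penetration → red.
	fmt.Println(fever(200<<20, 1<<30, 0.33, 0.66))
}
```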

func (MemoryFeverZone) String added in v0.82.0

func (z MemoryFeverZone) String() string

type MemoryRopeHandle added in v0.86.0

type MemoryRopeHandle struct {
	// contains filtered or unexported fields
}

MemoryRopeHandle is the result of MemoryRope. It provides a memctl.Watch callback and observable stats.

func MemoryRope added in v0.86.0

func MemoryRope(
	pipeline *Pipeline,
	drum string,
	limits *LimitManager,
	budgetFraction float64,
	relaxRate float64,
	logger *log.Logger,
) *MemoryRopeHandle

MemoryRope creates a memctl.Watch callback that adjusts the head stage's MaxWIPWeight based on available memory headroom. This is the second rope — it operates simultaneously with the processing RopeController. An item is released only when BOTH ropes allow it.

budgetFraction controls what fraction of available headroom is allocated as WIP weight budget (e.g., 0.4 means use 40% of headroom). The rest is reserved as a safety buffer.

The callback computes aggregate weight across upstream stages and sets the head's weight budget to:

headWeightBudget = (headroom × budgetFraction) - downstreamWeight

Asymmetric damping: tightening (lower budget) is applied instantly. Relaxing (higher budget) moves at relaxRate per callback — the fraction of the gap between current and target to close each tick. For example, relaxRate=0.2 closes 20% of the gap per callback. This prevents GC-cycle-driven oscillation while responding to real pressure immediately.

Returns a MemoryRopeHandle for stats. Pass handle.Callback() to memctl.Watch.
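The asymmetric damping rule can be sketched as a one-step update function. This illustrates only the documented tighten-instantly / relax-gradually behavior, not the MemoryRope callback itself.

```go
package main

import "fmt"

// step applies asymmetric damping: tightening (lower target) is instant,
// relaxing closes only relaxRate of the gap per callback.
func step(current, target int64, relaxRate float64) int64 {
	if target <= current {
		return target // pressure: apply the lower budget immediately
	}
	return current + int64(relaxRate*float64(target-current))
}

func main() {
	cur := int64(100)
	cur = step(cur, 50, 0.2) // memory pressure: drop straight to 50
	fmt.Println(cur)
	cur = step(cur, 150, 0.2) // recovery: close 20% of the 100-unit gap
	fmt.Println(cur)
}
```

Repeated relax steps approach the target geometrically, which is what damps GC-cycle-driven oscillation in the headroom signal.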

func (*MemoryRopeHandle) Callback added in v0.86.0

func (h *MemoryRopeHandle) Callback() func(context.Context, memctl.MemInfo)

Callback returns the function to pass to memctl.Watch.

func (*MemoryRopeHandle) Stats added in v0.86.0

func (h *MemoryRopeHandle) Stats() MemoryRopeStats

Stats returns a snapshot of the memory rope's current state.

type MemoryRopeStats added in v0.86.0

type MemoryRopeStats struct {
	Headroom    int64 // last observed headroom (bytes)
	Budget      int64 // headroom × budgetFraction (bytes)
	Weight      int64 // aggregate AdmittedWeight across upstream
	Applied     int64 // last value passed to setHeadWIPWeight
	Adjustments int64 // total callback invocations with valid headroom
}

MemoryRopeStats is a point-in-time snapshot of the memory rope.

type Merge added in v0.44.0

type Merge[T any] struct {
	// contains filtered or unexported fields
}

Merge is a nondeterministic interleaving fan-in from N sources.

One goroutine per source forwards items to a shared unbuffered output channel. The Go runtime scheduler determines send order — no cross-source ordering guarantee, no fairness guarantee, no provenance tracking. Per-source order IS preserved: items from each individual source appear in the merged output in the same order they were received from that source (this follows from one goroutine per source with sequential receive/send).

Merge is NOT the inverse of Tee. Tee broadcasts identical items to all branches. Merge interleaves distinct items from independent sources. Tee → ... → Merge does not restore original ordering, does not correlate outputs from sibling branches, and does not pair items across sources.

Created by NewMerge. The zero value is not usable. A Merge must not be copied after first use.

func NewMerge added in v0.44.0

func NewMerge[T any](ctx context.Context, sources ...<-chan rslt.Result[T]) *Merge[T]

NewMerge creates a Merge that reads from each source and forwards all items to a single unbuffered output channel. Items are interleaved nondeterministically — the Go runtime scheduler determines send order.

Each source is drained to completion by its own goroutine (source ownership rule). Sources may close at different times — early closure of one source does not affect others. All sources must be finite and must eventually close (including on cancellation paths). If a source never closes, the corresponding goroutine blocks indefinitely and Merge.Wait hangs.

Cancellation is advisory, not a hard stop. On ctx cancellation, each source goroutine enters discard mode at its next cancellation checkpoint: stops forwarding but continues draining its source to completion. Two cancellation checkpoints per iteration: a non-blocking pre-send check and a blocking send-select. This bounds post-cancel forwarding to at most 1 item per source goroutine that has already passed the pre-send checkpoint. Cancel-aware sends ensure goroutines are not blocked on output when downstream stops reading. If a goroutine is blocked waiting on its source (for r := range src) when ctx cancels, it does not observe cancellation until the source produces an item or closes.

Merge.Wait returns only after all source goroutines exit — which requires all sources to close. Cancellation alone does not guarantee prompt return. After observing cancellation, each goroutine drains and discards remaining items from its source until that source closes.

Merge.Out is closed before done is closed, so Out() is guaranteed closed before Wait() returns. Callers can safely range Out() and then call Wait().

Goroutine lifecycle: constructor launches N source goroutines + 1 closer goroutine. Each source goroutine drains its source and sends to output. The closer goroutine waits on a WaitGroup for all source goroutines, closes output, and closes done.

Each source channel must be distinct and exclusively owned by the Merge. Passing the same channel twice creates two goroutines racing on one source — per-source ordering and stats become meaningless. The constructor does not check for duplicates.

Panics if len(sources) == 0, ctx is nil, or any source is nil.

func (*Merge[T]) Out added in v0.44.0

func (m *Merge[T]) Out() <-chan rslt.Result[T]

Out returns the receive-only output channel. All items from all sources appear on this single channel in nondeterministic order.

The consumer MUST drain Out() to completion or cancel the shared context. If the consumer stops reading without canceling, all source goroutines block on the shared output send and cannot drain their sources.

Out() is idempotent — it always returns the same channel.

func (*Merge[T]) Stats added in v0.44.0

func (m *Merge[T]) Stats() MergeStats

Stats returns approximate metrics. See MergeStats for caveats.

Per-source counters are loaded once into plain int64 slices, then aggregates are computed from those copied values. This guarantees single-call coherence: Received == sum(SourceReceived) within one Stats() return, even mid-flight.

Stats are only reliable as final values after Merge.Wait returns.

func (*Merge[T]) Wait added in v0.44.0

func (m *Merge[T]) Wait() error

Wait blocks until all source goroutines exit and the output channel is closed. Returns a latched context error if any source goroutine entered a cancel path (pre-send checkpoint or send-select), nil otherwise.

Wait may return nil even if ctx was canceled. This happens when no goroutine observes cancellation on a checked path — e.g., all sources close before any goroutine loops back to the pre-send check, or a goroutine is blocked in range src when cancel fires and the source closes without sending. This is intentional: the operator completed its work, cancellation had no observable effect on forwarding, and reporting it would be a false positive.

Multiple Wait() calls are safe — subsequent calls return immediately with the same value.

type MergeStats added in v0.44.0

type MergeStats struct {
	Received  int64 // total items consumed from all sources
	Forwarded int64 // items sent to output
	Dropped   int64 // items discarded during cancel

	// Per-source stats. Len == N (number of sources).
	// Index i corresponds to sources[i] as passed to NewMerge.
	SourceReceived  []int64
	SourceForwarded []int64
	SourceDropped   []int64
}

MergeStats holds metrics for a Merge.

Fields are derived from per-source atomic counters, so a mid-flight Stats value is NOT a consistent snapshot — cross-metric invariants (e.g., Received == Forwarded + Dropped) may not hold because an in-flight item has been counted as received but not yet as forwarded or dropped. Invariants are guaranteed only after Merge.Wait returns.

Per-metric aggregates are coherent within a single Stats() call even mid-flight: Received == sum(SourceReceived), Forwarded == sum(SourceForwarded), Dropped == sum(SourceDropped). This holds because aggregates are computed from the same copied values.

Invariant (after Wait): Received = Forwarded + Dropped. Per-source invariant (after Wait): SourceReceived[i] = SourceForwarded[i] + SourceDropped[i].

SourceReceived[i] corresponds to sources[i] as passed to NewMerge. The index mapping is stable and matches construction order.

type MissingResultError added in v0.44.0

type MissingResultError struct {
	Source string // "A" or "B"
}

MissingResultError indicates a source closed without producing a result. Callers can use errors.As to extract the Source field and determine which side was missing.

func (*MissingResultError) Error added in v0.44.0

func (e *MissingResultError) Error() string

type Observer added in v0.95.0

type Observer struct {
	// contains filtered or unexported fields
}

Observer produces atomic PipelineSnapshot values by sampling all registered stages at once. Unlike Reporter (which logs text), Observer returns structured data for dashboard consumers.

Create with NewObserver, register stages with Observer.AddStage, then call Observer.Snapshot to sample, or Observer.Run with a callback for periodic snapshots.

func NewObserver added in v0.95.0

func NewObserver(pipelineID string) *Observer

NewObserver creates an observer for the named pipeline.

func (*Observer) AddStage added in v0.95.0

func (o *Observer) AddStage(s ObserverStage)

AddStage registers a stage for observation. Must be called before Snapshot or Run. Panics if the stage's Name is empty, its Stats func is nil, or the observer configuration is frozen.

func (*Observer) Run added in v0.95.0

func (o *Observer) Run(ctx context.Context, interval time.Duration, fn func(PipelineSnapshot))

Run calls fn with a snapshot every interval until ctx is canceled. Freezes configuration on first call. Panics if interval <= 0.

func (*Observer) RunWithTicker added in v0.95.0

func (o *Observer) RunWithTicker(ctx context.Context, ticks <-chan time.Time, fn func(PipelineSnapshot))

RunWithTicker is like Run but uses the provided tick channel instead of an internal ticker. Intended for testing.

func (*Observer) SetDiagnosis added in v0.95.0

func (o *Observer) SetDiagnosis(fn func() *core.Diagnosis)

SetDiagnosis sets an optional diagnosis provider. When set, each snapshot includes analyzer output and per-stage classification.

func (*Observer) Snapshot added in v0.95.0

func (o *Observer) Snapshot() PipelineSnapshot

Snapshot captures an atomic pipeline snapshot. All stages are sampled in registration order within a single call — no interleaving with other operations. Freezes configuration on first call.

type ObserverStage added in v0.95.0

type ObserverStage struct {
	Name      string       // stage name
	UnitLabel string       // item type for display (e.g., "files")
	Stats     func() Stats // stats provider
}

ObserverStage describes a stage registered with Observer.

type Options

type Options[T any] struct {
	// Capacity is the number of items the input buffer can hold.
	// Submit blocks when the buffer is full.
	// Zero means unbuffered: Submit blocks until a worker dequeues.
	// Negative values panic.
	Capacity int

	// MaxWIP is the maximum number of admitted items (buffered +
	// in-flight). Submit blocks when the limit is reached (the "rope").
	// Zero means default: Capacity + Workers (backward compatible).
	// Clamped to max(1, min(MaxWIP, Capacity + Workers)) at construction.
	// Adjust at runtime with [Stage.SetMaxWIP].
	MaxWIP int

	// MaxWIPWeight is the maximum accumulated weight of admitted items.
	// Zero means disabled (default, backward compatible). If > 0,
	// admission checks accumulated weight alongside count.
	// Adjust at runtime with [Stage.SetMaxWIPWeight].
	MaxWIPWeight int64

	// OversizePolicy controls behavior when an item's weight exceeds
	// MaxWIPWeight. Default ([OversizeReject]) rejects immediately.
	// [OversizeWait] blocks until the limit increases enough.
	OversizePolicy OversizePolicy

	// Weight returns the cost of item t for stats tracking only
	// ([Stats.InFlightWeight]). Does not affect admission — capacity
	// is count-based. Called on the Submit path, so must be cheap.
	// Must be pure, non-negative, and safe for concurrent calls.
	// If nil, every item costs 1.
	Weight func(T) int64

	// Workers is the number of concurrent fn invocations.
	// Zero means default: 1 (serial constraint — the common case).
	// Negative values panic.
	Workers int

	// ContinueOnError, when true, keeps processing after fn errors
	// instead of canceling the stage. Default: false (fail-fast).
	ContinueOnError bool

	// TrackAllocations, when true, samples process-wide heap allocation
	// counters (runtime/metrics /gc/heap/allocs:bytes and :objects) before
	// and after each fn invocation and accumulates the deltas into
	// [Stats.ObservedAllocBytes] and [Stats.ObservedAllocObjects].
	//
	// Scope: each sample captures the invocation window of a single fn
	// call. Counters are process-global — they include allocations by any
	// goroutine during that window, not just the stage's own work.
	//
	// Concurrent over-attribution: with Workers > 1, overlapping
	// invocation windows can each capture the same unrelated allocation,
	// so per-stage totals can exceed actual process allocations. Totals
	// are also not additive across stages for the same reason.
	//
	// Overhead: on the order of 1µs per item in single-worker throughput
	// benchmarks (two runtime/metrics.Read calls plus counter extraction
	// and atomic accumulation). Negligible when fn does real work;
	// roughly doubles overhead for no-op or sub-microsecond fns.
	// Multi-worker contention on shared atomic counters may add cost.
	//
	// Default: false (disabled). Enable when diagnosing allocation-heavy
	// stages. Silently disabled if the runtime does not support the
	// required metrics (validated on first use via sync.Once).
	TrackAllocations bool

	// TrackServiceTimeDist, when true, records per-item service time
	// into a per-worker HDR histogram. Stats gains [ServiceTimeSummary]
	// with p50/p95/p99/min/max/mean/stddev. Cumulative since Start.
	// Operational telemetry with ~1% precision.
	TrackServiceTimeDist bool
}

Options configures a Stage.

Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). Capacity is always an item-count bound; [Options.Weight] affects stats only, not admission.

type OversizePolicy added in v0.90.0

type OversizePolicy int

OversizePolicy controls what happens when an item's weight exceeds the current MaxWIPWeight.

const (
	// OversizeReject rejects the item immediately with
	// [ErrWeightExceedsLimit]. Default behavior.
	OversizeReject OversizePolicy = iota

	// OversizeWait blocks until the weight limit increases enough to
	// admit the item (or context is canceled / stage closed). Use when
	// the rope controller may raise the limit dynamically.
	OversizeWait
)

type Pipeline added in v0.80.0

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline is a DAG topology descriptor for a toc pipeline. Configure with Pipeline.AddStage and Pipeline.AddEdge, then call Pipeline.Freeze. After Freeze, the Pipeline is immutable and safe for concurrent reads.

Pipeline does NOT own or create stages. It stores stage names and stats accessors only — a passive metadata layer over existing stages. Consumers use target-relative queries (Pipeline.AncestorsOf, Pipeline.HeadsTo, Pipeline.DirectPredecessors) to reason about upstream subgraphs relative to a chosen drum.

Panics on misconfiguration (empty names, nil stats, cycles, etc.) following the toc convention: configuration errors are programming bugs, caught on first run during development.

func NewPipeline added in v0.80.0

func NewPipeline() *Pipeline

NewPipeline creates an empty pipeline topology.

func (*Pipeline) AddEdge added in v0.80.0

func (p *Pipeline) AddEdge(from, to string)

AddEdge registers a directed edge from → to with a 1:1 consumption ratio. Both stages must already be registered. Panics if from or to is unknown, the edge is a duplicate, from == to (self-loop), or the pipeline is frozen.

func (*Pipeline) AddEdgeWithRatio added in v0.80.0

func (p *Pipeline) AddEdgeWithRatio(from, to string, ratio int)

AddEdgeWithRatio registers a directed edge with a consumption ratio. The ratio specifies how many items from 'from' are consumed to produce one unit of output at 'to' — the Bill of Materials ratio for this edge. For example, ratio=2 means 'to' consumes 2 items from 'from' per output. The rope uses this to release inputs at the correct ratio to maximize goodput at merge points.

Panics if ratio <= 0, from or to is unknown, the edge is a duplicate, from == to (self-loop), or the pipeline is frozen.
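The Bill of Materials arithmetic is simple: to produce N output units at a merge stage, each incoming edge must supply N × ratio items. A minimal sketch (the `releaseBudget` helper and stage names are hypothetical, not part of the package):

```go
package main

import "fmt"

// releaseBudget computes per-predecessor item releases for a merge
// stage: producing `outputs` units requires outputs*ratio items from
// each incoming edge.
func releaseBudget(outputs int, ratios map[string]int) map[string]int {
	budget := make(map[string]int, len(ratios))
	for pred, r := range ratios {
		budget[pred] = outputs * r
	}
	return budget
}

func main() {
	// A merge stage consuming 2 chunks and 1 manifest per output unit.
	b := releaseBudget(5, map[string]int{"chunker": 2, "manifest": 1})
	fmt.Println(b["chunker"], b["manifest"]) // 10 5
}
```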

func (*Pipeline) AddStage added in v0.80.0

func (p *Pipeline) AddStage(name string, stats func() Stats)

AddStage registers a named stage with its stats accessor. Panics if name is empty, stats is nil, name is duplicate, or the pipeline is frozen.

func (*Pipeline) AncestorsOf added in v0.80.0

func (p *Pipeline) AncestorsOf(target string) []string

AncestorsOf returns all stages transitively upstream of the target, in BFS order (closest first). Excludes the target itself. Panics if name is unknown or pipeline is not frozen.

func (*Pipeline) DirectPredecessors added in v0.80.0

func (p *Pipeline) DirectPredecessors(name string) []string

DirectPredecessors returns the immediate upstream stages of the named stage. Panics if name is unknown or pipeline is not frozen.

func (*Pipeline) EdgeRatio added in v0.80.0

func (p *Pipeline) EdgeRatio(from, to string) int

EdgeRatio returns the consumption ratio for the edge from → to. Returns 1 for edges added with Pipeline.AddEdge. Returns the explicit ratio for edges added with Pipeline.AddEdgeWithRatio. Panics if the edge does not exist or pipeline is not frozen.

func (*Pipeline) Freeze added in v0.80.0

func (p *Pipeline) Freeze()

Freeze validates the topology and makes the pipeline read-only. Builds adjacency lists and computes heads (zero in-degree stages). Panics if the graph contains a cycle, has no stages, or Freeze was already called.

func (*Pipeline) HasPath added in v0.80.0

func (p *Pipeline) HasPath(from, to string) bool

HasPath returns true if there is a directed path from → to. Panics if either name is unknown or pipeline is not frozen.

func (*Pipeline) Heads added in v0.80.0

func (p *Pipeline) Heads() []string

Heads returns all zero in-degree stages (pipeline entry points), in registration order. Panics if not frozen.

func (*Pipeline) HeadsTo added in v0.80.0

func (p *Pipeline) HeadsTo(target string) []string

HeadsTo returns the subset of heads that can reach the target stage. If the pipeline has multiple entry points but only some feed the target (drum), only those heads are returned. Uses a single reverse BFS from target, then intersects with heads. Panics if target is unknown or pipeline is not frozen.
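The described algorithm can be sketched in a few lines: a reverse BFS from the target over predecessor edges, then an intersection with the heads list. The `preds` map and stage names below are illustrative, not the package's internal representation.

```go
package main

import "fmt"

// headsTo sketches HeadsTo: reverse BFS from target, then intersect
// with heads, preserving head registration order.
func headsTo(target string, preds map[string][]string, heads []string) []string {
	reachable := map[string]bool{target: true}
	queue := []string{target}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		for _, p := range preds[n] {
			if !reachable[p] {
				reachable[p] = true
				queue = append(queue, p)
			}
		}
	}
	var out []string
	for _, h := range heads {
		if reachable[h] {
			out = append(out, h)
		}
	}
	return out
}

func main() {
	// Two entry points; only "scan" transitively feeds the drum "embed".
	preds := map[string][]string{
		"chunk": {"scan"},
		"embed": {"chunk"},
		"log":   {"audit"},
	}
	fmt.Println(headsTo("embed", preds, []string{"scan", "audit"})) // [scan]
}
```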

func (*Pipeline) Incoming added in v0.80.0

func (p *Pipeline) Incoming(name string) []IncomingEdge

Incoming returns the incoming edges to the named stage, each with its predecessor name and consumption ratio. The rope controller uses this to compute per-predecessor budget allocation at merge points. Panics if name is unknown or pipeline is not frozen.

func (*Pipeline) StageStats added in v0.80.0

func (p *Pipeline) StageStats(name string) func() Stats

StageStats returns the stats accessor for a named stage. Panics if name is unknown or pipeline is not frozen.

func (*Pipeline) Stages added in v0.80.0

func (p *Pipeline) Stages() []string

Stages returns stage names in registration order. Panics if not frozen.

type PipelineSnapshot added in v0.95.0

type PipelineSnapshot struct {
	// Pipeline identity.
	PipelineID string    // e.g., "file-index", "commit-index"
	At         time.Time // when the snapshot was taken

	// Per-stage data, ordered by registration.
	Stages []StageSnapshotEntry

	// Analyzer output (nil if no analyzer configured).
	Diagnosis *core.Diagnosis

	// System-level memory.
	RSS    uint64 // process RSS (bytes); 0 if unavailable
	RSSOK  bool   // true if RSS was successfully read
	GoHeap uint64 // Go runtime total memory (bytes)
}

PipelineSnapshot is an atomic point-in-time capture of all pipeline stages, system memory, and analyzer output. Produced by Observer. Exported and serializable for dashboard rendering, JSON export, or OTel integration.

type Reporter added in v0.66.0

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter periodically logs pipeline stats and process memory. Create with NewReporter, register stages with [AddStage], then call [Run] to start logging.

Config is frozen after Run starts — AddStage panics if called after Run. Run panics if called twice.

Provider contract: functions passed to AddStage must be fast (< 1ms typical), non-blocking, and safe for concurrent calls. Panics are recovered and logged; hangs stall the reporting loop.

func NewReporter added in v0.66.0

func NewReporter(interval time.Duration, opts ...ReporterOption) *Reporter

NewReporter creates a reporter that logs every interval. Panics if interval <= 0.

func (*Reporter) AddStage added in v0.66.0

func (r *Reporter) AddStage(name string, fn func() Stats)

AddStage registers a named stage for periodic reporting. fn is typically a method value: r.AddStage("chunker", chunker.Stats). Must be called before [Run]. Panics if name is empty, fn is nil, or Run has already started.

func (*Reporter) Run added in v0.66.0

func (r *Reporter) Run(ctx context.Context)

Run blocks, logging every interval until ctx is canceled. Panics if called twice.

type ReporterOption added in v0.66.0

type ReporterOption func(*Reporter)

ReporterOption configures a Reporter.

func WithLogger added in v0.66.0

func WithLogger(l *log.Logger) ReporterOption

WithLogger sets the logger for reporter output. If l is nil, log.Default is used.

type RopeController added in v0.81.0

type RopeController struct {
	// contains filtered or unexported fields
}

RopeController is a periodic controller that bounds aggregate upstream WIP between the pipeline head and the drum (constraint) by adjusting the head stage's MaxWIP.

It computes rope length from drum goodput, upstream flow time, and a safety factor using a Little's Law heuristic. This is an approximate soft control — SetMaxWIP cannot revoke existing permits and has a floor of 1. After a target decrease, already-admitted items persist until completion.

Phase 3 scope: single head, linear chain (no branches between head and drum), count-based (not weight-aware).

Create with NewRopeController, configure with RopeOption functions, then call RopeController.Run.
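The Little's Law heuristic boils down to L = λ × W: the WIP needed to keep the drum fed is roughly drum goodput times upstream flow time, padded by the safety factor. A minimal sketch of that arithmetic — the controller's exact rounding and smoothing may differ:

```go
package main

import (
	"fmt"
	"math"
)

// ropeLength applies the Little's Law heuristic with a safety pad and
// the documented floor of 1.
func ropeLength(goodputPerSec, flowTimeSec, safety float64) int {
	n := int(math.Ceil(goodputPerSec * flowTimeSec * safety))
	if n < 1 {
		return 1 // SetMaxWIP floor
	}
	return n
}

func main() {
	// Drum completes 4 items/s, upstream flow time 0.5s, safety 1.5.
	fmt.Println(ropeLength(4, 0.5, 1.5)) // 3
	// A degenerate goodput measurement still yields a workable floor.
	fmt.Println(ropeLength(0, 0.5, 1.5)) // 1
}
```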

func NewRopeController added in v0.81.0

func NewRopeController(
	pipeline *Pipeline,
	drum string,
	limits *LimitManager,
	stageSnapshot func(string) IntervalStats,
	interval time.Duration,
	opts ...RopeOption,
) *RopeController

NewRopeController creates a count-based rope controller.

The pipeline must be frozen and contain exactly one head feeding the drum via a linear chain (no branches or joins between head and drum).

limits is the LimitManager for the head stage. The controller proposes count limits via limits.ProposeCount("processing-rope", n). stageSnapshot returns the latest IntervalStats for a named stage.

Panics if pipeline is not frozen, drum is unknown, topology is not a single linear chain from head to drum, limits or stageSnapshot is nil, or interval <= 0.

func NewWeightRopeController added in v0.86.0

func NewWeightRopeController(
	pipeline *Pipeline,
	drum string,
	limits *LimitManager,
	stageSnapshot func(string) IntervalStats,
	interval time.Duration,
	opts ...RopeOption,
) *RopeController

NewWeightRopeController creates a weight-aware rope controller. Same as NewRopeController but limits aggregate WEIGHT between release and drum instead of item count. Items with variable processing cost are properly accounted for.

limits is the LimitManager for the head stage. The controller proposes weight limits via limits.ProposeWeight("processing-weight-rope", n).

Same linear chain and single-head requirements as NewRopeController.

func (*RopeController) Run added in v0.81.0

func (rc *RopeController) Run(ctx context.Context)

Run blocks, adjusting rope length every interval until ctx is canceled. Panics if called twice.

func (*RopeController) RunWithTicker added in v0.81.0

func (rc *RopeController) RunWithTicker(ctx context.Context, ticks <-chan time.Time)

RunWithTicker is like RopeController.Run but uses the provided tick channel instead of creating a real ticker. For testing. Panics if called twice or after Run.

func (*RopeController) Stats added in v0.81.0

func (rc *RopeController) Stats() RopeStats

Stats returns a snapshot of the rope controller's current state. Safe for concurrent calls.

type RopeOption added in v0.81.0

type RopeOption func(*RopeController)

RopeOption configures a RopeController.

func WithInitialRopeLength added in v0.81.0

func WithInitialRopeLength(n int) RopeOption

WithInitialRopeLength sets the rope length used before the first valid goodput measurement. Default is 1 (conservative). Must be >= 1.

func WithRopeLogger added in v0.81.0

func WithRopeLogger(l *log.Logger) RopeOption

WithRopeLogger sets the logger. If nil, log.Default is used.

func WithRopeSafetyFactor added in v0.81.0

func WithRopeSafetyFactor(factor float64) RopeOption

WithRopeSafetyFactor sets the safety multiplier for rope length. Default is 1.5. Panics if factor <= 0.

type RopeStats added in v0.81.0

type RopeStats struct {
	RopeLength      int     // current computed rope length
	RopeWIP         int     // current aggregate WIP across upstream stages
	RopeUtilization float64 // WIP / Length; 0 if length is 0
	DrumGoodput     float64 // EWMA-smoothed drum goodput (items/sec)
	DrumErrorRate   float64 // EWMA-smoothed drum error rate
	AdjustmentCount int64   // how many times rope was adjusted
	HeadAppliedWIP  int     // last value returned by setHeadWIP
}

RopeStats is a point-in-time snapshot of the controller's state.
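DrumGoodput and DrumErrorRate are EWMA-smoothed. The smoothing itself is the standard recurrence — the alpha below is an assumed value for illustration; the package does not document its coefficient:

```go
package main

import "fmt"

// ewma applies one step of exponential smoothing:
// next = alpha*sample + (1-alpha)*prev.
func ewma(prev, sample, alpha float64) float64 {
	return alpha*sample + (1-alpha)*prev
}

func main() {
	g := 0.0
	for range [3]struct{}{} {
		g = ewma(g, 10, 0.5) // steady 10 items/s samples
	}
	fmt.Printf("%.2f\n", g) // 8.75: converging toward 10
}
```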

type ScalingHistory added in v0.86.0

type ScalingHistory struct {
	// contains filtered or unexported fields
}

ScalingHistory tracks empirical throughput at each worker count for a single stage. After [SetWorkers] changes the count, the caller records the observed throughput. The history detects diminishing returns: if adding a worker gained less than a threshold fraction of the previous gain, further scaling is unlikely to help.

Used by the rebalancer (Step 4: Elevate) to make evidence-based worker allocation decisions instead of linear projection.

func NewScalingHistory added in v0.86.0

func NewScalingHistory() *ScalingHistory

NewScalingHistory creates an empty scaling history.

func (*ScalingHistory) DiminishingReturns added in v0.86.0

func (h *ScalingHistory) DiminishingReturns(workers int, threshold float64) bool

DiminishingReturns returns true if the most recent scaling gain is below the threshold fraction. For example, threshold=0.05 means "less than 5% improvement from the last worker added."

Returns false if insufficient history (optimistic: assume scaling helps until proven otherwise).

func (*ScalingHistory) Len added in v0.86.0

func (h *ScalingHistory) Len() int

Len returns the number of recorded worker-count observations.

func (*ScalingHistory) Record added in v0.86.0

func (h *ScalingHistory) Record(workers int, throughput float64)

Record stores an observed throughput at the given worker count. Overwrites any previous observation at that count.

func (*ScalingHistory) Reset added in v0.86.0

func (h *ScalingHistory) Reset()

Reset clears all history. Use when the constraint moves or the stage's work profile changes fundamentally.

func (*ScalingHistory) ScalingGain added in v0.86.0

func (h *ScalingHistory) ScalingGain(workers int) (float64, bool)

ScalingGain returns the throughput gain from adding the last worker. Returns (gain, true) if both N and N-1 have observations. gain = (throughput[N] - throughput[N-1]) / throughput[N-1]. Returns (0, false) if insufficient history.
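The gain formula and the diminishing-returns check can be sketched together. The `obs` table stands in for recorded history; this is an illustration of the documented formula, not the package's implementation:

```go
package main

import "fmt"

// scalingGain mirrors the documented formula:
// gain = (throughput[N] - throughput[N-1]) / throughput[N-1].
func scalingGain(obs map[int]float64, workers int) (float64, bool) {
	cur, ok1 := obs[workers]
	prev, ok2 := obs[workers-1]
	if !ok1 || !ok2 || prev == 0 {
		return 0, false // insufficient history
	}
	return (cur - prev) / prev, true
}

func main() {
	obs := map[int]float64{1: 100, 2: 180, 3: 185}
	g, _ := scalingGain(obs, 2)
	fmt.Printf("%.2f\n", g) // 0.80: the second worker added 80%
	g, _ = scalingGain(obs, 3)
	// The third worker added under 3% — below a 0.05 threshold,
	// so DiminishingReturns would report true.
	fmt.Println(g < 0.05) // true
}
```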

type ServiceTimeSummary added in v0.75.0

type ServiceTimeSummary struct {
	Count     int64
	Min       time.Duration
	Max       time.Duration
	Mean      time.Duration
	StdDev    time.Duration
	P50       time.Duration
	P95       time.Duration
	P99       time.Duration
	Underflow int64 // items with duration below recordable minimum (< 1ns)
	Overflow  int64 // items with duration above recordable maximum (> 10min)
}

ServiceTimeSummary holds the distribution of per-item service times. Cumulative since Start. Zero when [Options.TrackServiceTimeDist] is disabled or no items processed.

Operational telemetry with ~1% precision. For recent-only distribution, a future windowed mode will use per-worker histogram rotation.

type Stage

type Stage[T, R any] struct {
	// contains filtered or unexported fields
}

Stage is a running constrained stage. Created by Start. The zero value is not usable.

func Pipe

func Pipe[T, R any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]

Pipe creates a stage that reads from an upstream Result channel, forwarding Ok values to fn via workers and passing Err values directly to the output (error passthrough). The feeder goroutine drains src to completion, provided that the consumer drains Stage.Out (or ctx is canceled) and src eventually closes. Cancellation unblocks output sends but does not force-close src.

Error passthrough is best-effort during shutdown: if ctx is canceled or fail-fast fires, upstream Err values may be dropped instead of forwarded (reflected in [Stats.Dropped]). During normal operation, all upstream errors are forwarded.

The returned stage's input side is owned by the feeder — do not call Stage.Submit or Stage.CloseInput directly. Both are handled gracefully (no panic, no deadlock) but are misuse. External Submit calls void the stats invariant (Received will not account for externally submitted items).

Stats: [Stats.Received] = [Stats.Submitted] + [Stats.Forwarded] + [Stats.Dropped]. Forwarded errors do not trigger fail-fast and do not affect Stage.Wait.

Panics if ctx is nil, src is nil, or fn is nil.
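The post-Wait accounting invariant is easy to check mechanically. A minimal sketch — the standalone struct mirrors the relevant Stats field names for illustration only:

```go
package main

import "fmt"

// pipeStats mirrors the Pipe-related Stats counters.
type pipeStats struct {
	Received, Submitted, Forwarded, Dropped int64
}

// balanced verifies the documented invariant that holds after Wait:
// Received = Submitted + Forwarded + Dropped.
func balanced(s pipeStats) bool {
	return s.Received == s.Submitted+s.Forwarded+s.Dropped
}

func main() {
	// 100 items read from src: 95 processed, 3 upstream errors
	// forwarded, 2 dropped during shutdown.
	s := pipeStats{Received: 100, Submitted: 95, Forwarded: 3, Dropped: 2}
	fmt.Println(balanced(s)) // true
}
```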

func Start

func Start[T, R any](
	ctx context.Context,
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]

Start launches a constrained stage that processes items through fn.

The stage starts a cancel watcher goroutine that calls Stage.CloseInput when the context is canceled (either parent cancel or fail-fast). This ensures workers always eventually exit.

Panics if ctx is nil, fn is nil, Capacity is negative, or Workers is negative.

func (*Stage[T, R]) ActiveWorkers added in v0.71.0

func (s *Stage[T, R]) ActiveWorkers() int

ActiveWorkers returns the number of live (non-exited) workers. May differ from TargetWorkers during scale-down drain.

func (*Stage[T, R]) Cause

func (s *Stage[T, R]) Cause() error

Cause returns the latched terminal cause of the stage, blocking until shutdown completes. Like Stage.Wait, Cause does not initiate shutdown. Unlike Stage.Wait, Cause distinguishes all three outcomes:

  • nil: all items completed successfully (or ContinueOnError with no cancel)
  • fail-fast error: the first fn error that caused shutdown
  • parent cancel cause: context.Cause of the parent context at completion

The terminal cause is latched when the last worker finishes, so Cause is stable and idempotent — it returns the same value regardless of later parent context changes. If parent cancellation races with a worker error, Cause may return either depending on observation order.

A parent cancellation that occurs after fn returns but before the worker completes result handoff is reported as stage cancellation, even though all business-level computations succeeded. Use Stage.Wait if only fail-fast errors matter.

Requires Out to be drained (see Stage.DiscardAndCause when individual results are not needed).

func (*Stage[T, R]) CloseInput

func (s *Stage[T, R]) CloseInput()

CloseInput signals that no more items will be submitted. Workers finish processing buffered items, then shut down.

Linearization point: closedForAdmission=true under admissionMu. Any Submit whose acquireAdmission serializes after this point returns ErrClosed without creating a permit. A Submit already granted before this point may still succeed or roll back via trySend — that is a concurrent operation that linearized before close, not a post-close admission.

Close state is split across three signals:

  • closedForAdmission (admissionMu): authoritative gate for permit creation
  • closed (atomic): fast-path rejection in Submit and trySend
  • closing (channel): shutdown broadcast to blocked selects

These are set in order; closed/closing may lag closedForAdmission.

Blocks briefly until all in-flight Stage.Submit calls exit, then closes the input channel.

Idempotent — safe to call multiple times (use defer as safety net). Also called internally on fail-fast or parent context cancellation.

func (*Stage[T, R]) DisableMaxWIPWeight added in v0.87.0

func (s *Stage[T, R]) DisableMaxWIPWeight()

DisableMaxWIPWeight removes the weight-based WIP limit entirely. All items pass weight checks regardless of their weight. Concurrency-safe.

func (*Stage[T, R]) DiscardAndCause

func (s *Stage[T, R]) DiscardAndCause() error

DiscardAndCause drains all remaining results from Stage.Out and returns Stage.Cause's latched terminal cause. Use when individual results are not needed but composable terminal status is required.

Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndCause with direct Out consumption causes a consumption race (results go to the wrong reader).

func (*Stage[T, R]) DiscardAndWait

func (s *Stage[T, R]) DiscardAndWait() error

DiscardAndWait drains all remaining results from Stage.Out and returns Stage.Wait's error. Use when individual results are not needed.

Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndWait with direct Out consumption causes a consumption race (results go to the wrong reader).

func (*Stage[T, R]) Limits added in v0.90.0

func (s *Stage[T, R]) Limits() *LimitManager

Limits returns the stage's LimitManager for per-source limit composition. Multiple controllers can propose limits through the manager; the tightest (min) per dimension is applied.

Created lazily on first call. The manager is bound to this stage's SetMaxWIP and SetMaxWIPWeight, using the construction MaxWIP and MaxWIPWeight as permanent baselines.

This is the recommended path for rope controllers. Raw SetMaxWIP / SetMaxWIPWeight remain available for simple single-controller use.

func (*Stage[T, R]) MaxWIP added in v0.59.0

func (s *Stage[T, R]) MaxWIP() int

MaxWIP returns the current WIP limit.

func (*Stage[T, R]) MaxWIPWeight added in v0.68.0

func (s *Stage[T, R]) MaxWIPWeight() int64

MaxWIPWeight returns the current weight-based WIP limit. Negative means disabled (no weight limiting active).

func (*Stage[T, R]) Out

func (s *Stage[T, R]) Out() <-chan rslt.Result[R]

Out returns the receive-only output channel. It closes after all workers finish and all results have been sent.

Cardinality: every successful Stage.Submit produces exactly one result.

Ordering: with Workers == 1, results are delivered in submit order. With Workers > 1, result order is nondeterministic.

Callers MUST drain Out to completion (or use Stage.DiscardAndWait / Stage.DiscardAndCause):

for result := range stage.Out() {
    val, err := result.Unpack()
    // handle result — do NOT break out of this loop
}

After cancellation or fail-fast, Out may still emit: success results from work already in fn, ordinary error results from in-flight work, and canceled results for buffered items drained post-cancel. With Workers > 1, the fail-fast triggering error is not guaranteed to appear before cancellation results; use Stage.Wait or Stage.Cause for stage-level terminal status, not stream order.

If the consumer stops reading, workers block sending results, which prevents shutdown and causes Stage.Wait to hang — leaking goroutines, context resources, and the stage itself. This is the same contract as consuming from any Go channel-based pipeline.

func (*Stage[T, R]) PauseAdmission added in v0.65.0

func (s *Stage[T, R]) PauseAdmission()

PauseAdmission blocks all new admissions. In-flight items complete normally. Queued waiters are held (not rejected) until [ResumeAdmission]. Concurrency-safe. Idempotent.

func (*Stage[T, R]) Paused added in v0.65.0

func (s *Stage[T, R]) Paused() bool

Paused returns true if admission is paused via [PauseAdmission].

func (*Stage[T, R]) ResumeAdmission added in v0.65.0

func (s *Stage[T, R]) ResumeAdmission()

ResumeAdmission unblocks admission and wakes held waiters. Concurrency-safe. Idempotent.

func (*Stage[T, R]) SetMaxWIP added in v0.59.0

func (s *Stage[T, R]) SetMaxWIP(n int) int

SetMaxWIP dynamically adjusts the WIP limit. Wakes blocked Submits if the new limit is higher than the current one. The value is clamped to [1, Capacity+TargetWorkers]. The ceiling tracks dynamic worker count from Stage.SetWorkers, not the initial construction count. Returns the applied value. Concurrency-safe.
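The clamping rule can be written out directly. A sketch of the documented arithmetic — `clampMaxWIP` is a hypothetical helper, not an exported function:

```go
package main

import "fmt"

// clampMaxWIP mirrors the documented clamp: the applied limit is
// max(1, min(n, capacity+workers)), where workers is the current
// target worker count.
func clampMaxWIP(n, capacity, workers int) int {
	ceiling := capacity + workers
	if n > ceiling {
		n = ceiling
	}
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println(clampMaxWIP(100, 10, 2)) // 12: ceiling is Capacity+TargetWorkers
	fmt.Println(clampMaxWIP(0, 10, 2))   // 1: floor
	fmt.Println(clampMaxWIP(5, 10, 2))   // 5: within range, applied as-is
}
```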

func (*Stage[T, R]) SetMaxWIPWeight added in v0.68.0

func (s *Stage[T, R]) SetMaxWIPWeight(n int64) int64

SetMaxWIPWeight dynamically adjusts the weight-based WIP limit. Zero means zero limit (blocks all positive-weight admission). Panics if n < 0 — use Stage.DisableMaxWIPWeight to disable. Wakes blocked Submits if the new limit allows more weight. Returns the applied value. Concurrency-safe.

func (*Stage[T, R]) SetWorkers added in v0.71.0

func (s *Stage[T, R]) SetWorkers(n int) (int, error)

SetWorkers adjusts the number of workers. New workers start immediately. Excess workers are canceled LIFO and drain their current item before exiting. A canceled worker MAY process one more item before exiting (Go select nondeterminism). n must be >= 1. Returns the applied count and nil, or (0, ErrStopping) if the stage is stopping/stopped. Concurrency-safe.

func (*Stage[T, R]) Stats

func (s *Stage[T, R]) Stats() Stats

Stats returns approximate metrics. See Stats for caveats. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.

func (*Stage[T, R]) Submit

func (s *Stage[T, R]) Submit(ctx context.Context, item T) error

Submit sends item into the stage for processing. Blocks when the buffer is full (backpressure / "rope"). Returns ErrClosed after Stage.CloseInput, fail-fast shutdown, or parent context cancellation.

If cancellation or CloseInput has already occurred before Submit is called, Submit deterministically returns ErrClosed without blocking. A Submit that is blocked or entering concurrently when shutdown fires may nondeterministically succeed or return an error, per Go select semantics — even a blocked Submit can succeed if capacity becomes available at the same instant. Items admitted during this window are processed normally (or canceled if the stage context is already done).

The ctx parameter controls only admission blocking — it is NOT passed to fn. The stage's own context (derived from the ctx passed to Start) is what fn receives. This means canceling a submitter's context does not cancel the item's processing once admitted.

Panics if ctx is nil (same as context.Context method calls). Panics if Weight returns a negative value. Note: a panic in Weight propagates to the caller (unlike fn panics, which are recovered and wrapped in rslt.PanicError). Safe for concurrent use from multiple goroutines. Safe to call concurrently with Stage.CloseInput (will not panic).

func (*Stage[T, R]) TargetWorkers added in v0.71.0

func (s *Stage[T, R]) TargetWorkers() int

TargetWorkers returns the current desired worker count.

func (*Stage[T, R]) Wait

func (s *Stage[T, R]) Wait() error

Wait blocks until all workers have finished and Out is closed. Wait does not initiate shutdown — call Stage.CloseInput first. Without CloseInput, Wait blocks forever if no more items are submitted. Requires that Out is drained concurrently (see Stage.Out). The stage is complete when all workers have finished, terminal status is latched, and the done channel closes.

Returns the first observed fail-fast error, or nil. Specifically:

  • In fail-fast mode: returns the first fn error that caused shutdown (nondeterministic among concurrent workers — first to acquire lock)
  • In ContinueOnError mode: always returns nil (check individual results)
  • On parent context cancellation: returns nil (caller already knows; errors returned by fn after parent cancel are not stored)

If parent cancellation races with a worker error, Wait may return either nil or the error depending on observation order. Use Stage.Cause for terminal status that distinguishes all three outcomes.

type StageAllocation added in v0.93.0

type StageAllocation struct {
	Name   string
	Limits *LimitManager
	Share  float64 // fraction of total budget (0.0-1.0)
}

StageAllocation describes a stage's share of the memory budget.

type StageSnapshotEntry added in v0.95.0

type StageSnapshotEntry struct {
	Name          string // stage name
	Order         int    // display ordering (registration index)
	UnitLabel     string // e.g., "files", "chunks", "commits"
	Stats         Stats  // full stats snapshot
	QueueDepth    int64  // BufferedDepth from Stats
	QueueCapacity int    // QueueCapacity from Stats
	Workers       int    // ActiveWorkers from Stats

	// Analyzer classification for this stage (empty if no analyzer).
	State core.StageState // Unknown/Healthy/Starved/Blocked/Saturated/Broken
}

StageSnapshotEntry captures one stage's state at snapshot time.

type Stats

type Stats struct {
	Submitted int64 // items accepted by Submit (successful return)
	Completed int64 // items where fn returned (includes Failed and Panicked)
	Failed    int64 // subset of Completed where result is error (includes Panicked)
	Panicked  int64 // subset of Failed where fn panicked
	Canceled  int64 // items dequeued but not passed to fn because cancellation was observed first (not in Completed)

	// Pipe-specific counters. Zero for Start-created stages.
	// Invariant (after Wait): Received = Submitted + Forwarded + Dropped.
	Received  int64 // items consumed from src by feeder (any Result)
	Forwarded int64 // upstream Err items sent directly to out (bypassed fn)
	Dropped   int64 // items seen but neither submitted nor forwarded (shutdown/cancel)

	ServiceTime       time.Duration // cumulative time fn was executing
	IdleTime          time.Duration // cumulative worker time waiting for input (includes startup and tail wait)
	OutputBlockedTime time.Duration // cumulative worker time blocked handing result to consumer (unbuffered out channel)

	BufferedDepth  int64 // approximate items in queue; may transiently be negative mid-flight; 0 when Capacity is 0 (unbuffered)
	InFlightWeight int64 // weighted cost of items currently in fn (stats-only, not admission)
	QueueCapacity  int   // configured capacity

	Paused              bool  // true if admission is paused via PauseAdmission
	MaxWIP              int   // current WIP limit (>= 1)
	MaxWIPWeight        int64 // current weight limit (0 when disabled; check MaxWIPWeightEnabled)
	MaxWIPWeightEnabled bool  // true when weight limiting is active
	AdmittedWeight      int64 // current accumulated weight of admitted items
	Admitted            int64 // reserved permits, not active workers. Includes buffered, in-flight, and reserved-but-not-yet-enqueued. Can exceed current MaxWIP after SetMaxWIP shrinks the limit (existing permits are not revoked) or during the brief grant-to-enqueue window. Not a hard invariant gauge — use for observability, not alerting thresholds.
	WaiterCount         int   // current number of Submits blocked on rope
	MaxWaiterCount      int   // high-water mark for waiter queue depth
	TargetWorkers       int   // desired worker count from SetWorkers
	ActiveWorkers       int   // currently running workers (may lag target during drain)
	WIPWaitCount        int64 // cumulative: total submissions that blocked on WIP limit (not current blocked count)
	WIPWaitNs           int64 // cumulative WIP-limit wait time (nanoseconds)

	ServiceTimeDist ServiceTimeSummary // zero if TrackServiceTimeDist disabled

	// AllocTrackingActive reports whether allocation sampling is
	// effectively enabled for this stage. False when
	// [Options.TrackAllocations] was not set, or when the runtime does
	// not support the required metrics. Allows callers to distinguish
	// "tracking requested but unsupported" from "tracking not requested"
	// or "tracking active but fn allocated zero."
	AllocTrackingActive bool

	// ObservedAllocBytes and ObservedAllocObjects are cumulative heap
	// allocation counters sampled via runtime/metrics around each fn
	// invocation. Zero when AllocTrackingActive is false.
	//
	// Process-global, not stage-exclusive: includes allocations by any
	// goroutine during each fn invocation window. With Workers > 1,
	// overlapping windows can capture the same unrelated allocation in
	// multiple workers, so per-stage totals can exceed actual process
	// allocations over the same period. Not additive across stages.
	// Biased upward by longer service times (more background noise).
	// Best used as a directional signal under stable workload where the
	// stage dominates allocations, not for precise attribution. For
	// exact allocation profiling, use go tool pprof.
	ObservedAllocBytes   uint64
	ObservedAllocObjects uint64
}

Stats holds approximate metrics for a Stage.

Fields are read from independent atomics, so a single Stats value is NOT a consistent snapshot — individual fields may reflect slightly different moments. Relationships like Submitted >= Completed + Canceled are not guaranteed mid-flight. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.

All durations are cumulative across all workers since Start. With Workers > 1, durations can exceed wall-clock time.

type Tee added in v0.43.0

type Tee[T any] struct {
	// contains filtered or unexported fields
}

Tee is a synchronous lockstep broadcast from one source to N branches.

Created by NewTee. The zero value is not usable. A Tee must not be copied after first use.

func NewTee added in v0.43.0

func NewTee[T any](ctx context.Context, src <-chan rslt.Result[T], n int) *Tee[T]

NewTee creates a Tee that reads from src and broadcasts each item to n unbuffered output branches. Items are sent to branches sequentially in index order — branch 0 first, then branch 1, etc. The slowest consumer governs pace (synchronous lockstep).

The Tee drains src to completion (source ownership rule), provided all branch consumers drain their branches or ctx is canceled, and src eventually closes. Cancellation unblocks branch sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items. Branch sends are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If a branch consumer stops reading and ctx is never canceled, the Tee blocks on that branch's send and stalls all branches.

Tee does not clone payloads. Reference-containing payloads (pointers, slices, maps) may alias across branches. Consumers must treat received values as immutable; mutation after receipt is a data race.

Panics if n <= 0, src is nil, or ctx is nil.

func (*Tee[T]) Branch added in v0.43.0

func (t *Tee[T]) Branch(i int) <-chan rslt.Result[T]

Branch returns the receive-only output channel for branch i.

Callers MUST drain all branches to completion or cancel the shared context. An undrained branch blocks the Tee and stalls all branches.

Panics if i is out of range [0, n).

func (*Tee[T]) Stats added in v0.43.0

func (t *Tee[T]) Stats() TeeStats

Stats returns approximate metrics. See TeeStats for caveats. Stats are only reliable as final values after Tee.Wait returns.

func (*Tee[T]) Wait added in v0.43.0

func (t *Tee[T]) Wait() error

Wait blocks until the Tee goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted branch send), nil otherwise.

type TeeStats added in v0.43.0

type TeeStats struct {
	Received           int64 // items consumed from src
	FullyDelivered     int64 // items sent to ALL branches
	PartiallyDelivered int64 // items sent to some branches before cancel (≥1, <N)
	Undelivered        int64 // items not sent to any branch (cancel before first send, or discard mode)

	// Per-branch stats. Len == N (number of branches).
	// BranchDelivered[i] = items successfully sent to branch i.
	// BranchBlockedTime[i] = cumulative time blocked sending to branch i.
	BranchDelivered   []int64
	BranchBlockedTime []time.Duration
}

TeeStats holds metrics for a Tee.

Fields are read from independent atomics, so a mid-flight Stats value is NOT a consistent snapshot — individual fields may reflect different moments. Invariants are guaranteed only after Tee.Wait returns.

Invariant (after Wait): Received = FullyDelivered + PartiallyDelivered + Undelivered.

PartiallyDelivered is at most 1 per Tee lifetime: once cancellation interrupts delivery mid-item, the goroutine enters discard mode and does not attempt delivery on subsequent items.

BranchBlockedTime[i] measures direct send-wait time on branch i. It does not measure end-to-end latency imposed by other branches. Because branches are sent in index order, earlier branches' blocked time reflects their consumer's speed directly; later branches' blocked time is near zero even if they are throttled by earlier branches.

type WeightedBatcher

type WeightedBatcher[T any] struct {
	// contains filtered or unexported fields
}

WeightedBatcher accumulates Ok items from an upstream Result channel into batches, flushing when accumulated weight OR item count reaches the threshold. Each item's weight is determined by weightFn. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.

Created by NewWeightedBatcher. The zero value is not usable.

func NewWeightedBatcher

func NewWeightedBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	threshold int,
	weightFn func(T) int,
) *WeightedBatcher[T]

NewWeightedBatcher creates a WeightedBatcher that reads from src, accumulates Ok values, and emits batches on Out(). A batch is flushed when either accumulated weight (per weightFn) reaches threshold or item count reaches threshold — whichever comes first. The item-count fallback prevents unbounded accumulation of zero/low-weight items. Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.

weightFn must return a non-negative weight for each item. A panic occurs if weightFn returns a negative value.

The WeightedBatcher drains src to completion (source ownership rule), provided the consumer drains WeightedBatcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the WeightedBatcher blocks on output delivery and cannot drain src.

Panics if threshold <= 0, weightFn is nil, src is nil, or ctx is nil.

func (*WeightedBatcher[T]) Out

func (b *WeightedBatcher[T]) Out() <-chan rslt.Result[[]T]

Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.

Callers MUST drain Out to completion. If the consumer stops reading, the WeightedBatcher blocks and cannot drain its source, potentially causing upstream deadlocks.

func (*WeightedBatcher[T]) Stats

func (b *WeightedBatcher[T]) Stats() WeightedBatcherStats

Stats returns approximate metrics. See WeightedBatcherStats for caveats. Stats are only reliable as final values after WeightedBatcher.Wait returns.

func (*WeightedBatcher[T]) Wait

func (b *WeightedBatcher[T]) Wait() error

Wait blocks until the WeightedBatcher goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted output send), nil otherwise. The WeightedBatcher has no fn, so there is no fail-fast error.

type WeightedBatcherStats

type WeightedBatcherStats struct {
	Received          int64         // individual items consumed from src
	Emitted           int64         // individual Ok items included in emitted batches
	Forwarded         int64         // Err items forwarded downstream
	Dropped           int64         // items lost to shutdown/cancel (includes partial batch items)
	BufferedDepth     int64         // current items in partial batch accumulator
	BufferedWeight    int64         // current accumulated weight in partial batch
	BatchCount        int64         // number of batch results emitted
	OutputBlockedTime time.Duration // cumulative time blocked sending to out
}

WeightedBatcherStats holds metrics for a WeightedBatcher.

Invariant (after Wait): Received = Emitted + Forwarded + Dropped.

func (WeightedBatcherStats) ToStats added in v0.80.0

func (s WeightedBatcherStats) ToStats() Stats

ToStats converts WeightedBatcherStats to Stats for use with the analyze package. Maps: Received→Received, Emitted→Submitted, BatchCount→Completed, Forwarded→Forwarded, Dropped→Dropped, BufferedDepth→BufferedDepth, OutputBlockedTime→OutputBlockedTime.

Directories

Path Synopsis
core Package core provides a deterministic TOC constraint analyzer.
