Documentation ¶
Overview ¶
Package dispatch provides the BoundedDispatcher substrate primitive — a bounded-concurrency parallel worker pool with optional KV-twofer-aware completion handling.
What this is ¶
BoundedDispatcher is the framework-provided primitive for the "rules sequence, components parallelize" architecture (see the Orchestration Boundaries section of CLAUDE.md). Components compose it internally when they need to do parallel work over a known list of items. Examples: a drone-fleet weather monitor walking all active missions, a scenario orchestrator dispatching ready requirements under DAG gating, a manufacturing batch processing each widget at a station, and the bounded-concurrency dispatch pattern in semspec's scenario-orchestrator.
BoundedDispatcher is NOT:
- A workflow engine (no DAG semantics, no branching, no lifecycle — see pkg/lifecycle for those)
- A rule-engine extension (rules don't gain new fan-out primitives; for_each is the fan-out mechanism at the rule layer)
- A replacement for pkg/worker.Pool. BoundedDispatcher wraps it. New uses should prefer BoundedDispatcher (higher-level, KV-twofer aware); existing pkg/worker.Pool consumers stay as-is.
Use when ¶
- A component does internal parallel work over a list of items
- Bounded concurrency is required (caller picks the worker count)
- Optionally: each work item completes asynchronously, and the dispatcher should fire OnComplete when a matching KV signal arrives
Do NOT use for ¶
- At-the-rule-layer fan-out (use rule engine's for_each instead)
- Sequential per-item processing (use a plain loop)
- Unbounded concurrency (use a bare goroutine pool)
Example usage (no completion watcher) ¶
d, err := dispatch.New(ctx, dispatch.Config[*Requirement]{
	Workers:   c.MaxConcurrent,
	QueueSize: 256,
	Process:   c.processRequirement,
}, dispatch.Deps{
	NATSClient: c.natsClient,
	Logger:     c.logger,
})
if err != nil {
	return fmt.Errorf("dispatch new: %w", err)
}
defer func() {
	if err := d.Stop(context.Background()); err != nil {
		c.logger.Warn("dispatch stop", slog.String("error", err.Error()))
	}
}()
for _, req := range filterReady(...) {
	if err := d.Submit(req); err != nil {
		// ErrQueueFull on overflow; caller chooses
		// retry/drop/backpressure-propagate.
	}
}
Example usage (with KV-twofer completion watcher) ¶
d, err := dispatch.New(ctx, dispatch.Config[*Requirement]{
	Workers:            c.MaxConcurrent,
	QueueSize:          256,
	Process:            c.processRequirement,
	CompletionKVBucket: "EXECUTION_STATES",
	CompletionKeyForWorkItem: func(r *Requirement) string {
		return "req." + r.Slug + "." + r.ID
	},
	OnComplete: c.onRequirementComplete,
}, deps)
In the completion-watcher mode, the dispatcher subscribes to the configured KV bucket BEFORE accepting any Submit. Each Submit registers a tracking entry keyed by CompletionKeyForWorkItem(work) before enqueuing to the underlying pool, so a completion-signal write that arrives between Submit and Process can't slip past the watcher.
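The register-before-enqueue ordering described above can be sketched in isolation. The following is a minimal illustration, not the dispatcher's actual implementation: tracker, submit, and onKVUpdate are hypothetical names, work items are plain strings, and rolling back the registration when the queue is full is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errQueueFull is a stand-in for dispatch.ErrQueueFull (assumption for this sketch).
var errQueueFull = errors.New("queue full")

// tracker is a hypothetical stand-in for the watcher's tracking map plus queue.
type tracker struct {
	mu      sync.Mutex
	pending map[string]string // completion key -> work item
	queue   chan string
}

func newTracker(queueSize int) *tracker {
	return &tracker{
		pending: make(map[string]string),
		queue:   make(chan string, queueSize),
	}
}

// submit registers the completion key first, then attempts a non-blocking
// enqueue. If the queue is full, the registration is rolled back.
func (t *tracker) submit(key, work string) error {
	t.mu.Lock()
	t.pending[key] = work
	t.mu.Unlock()

	select {
	case t.queue <- work:
		return nil
	default:
		t.mu.Lock()
		delete(t.pending, key)
		t.mu.Unlock()
		return errQueueFull
	}
}

// onKVUpdate is what a watcher goroutine would call for each KV write. It
// reports whether the key matched a tracked item and removes the entry so
// the completion fires at most once.
func (t *tracker) onKVUpdate(key string) (string, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	work, ok := t.pending[key]
	if ok {
		delete(t.pending, key)
	}
	return work, ok
}

func main() {
	tr := newTracker(1)
	_ = tr.submit("req.demo.1", "requirement-1")
	// The completion write arrives before any worker has dequeued the item,
	// yet it still matches because the key was registered first.
	work, ok := tr.onKVUpdate("req.demo.1")
	fmt.Println(work, ok)
}
```

If the enqueue happened first, a fast completion write could reach the watcher while the map had no entry for the key, and the signal would be dropped.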
Shutdown ¶
Stop drains the underlying worker.Pool first (blocking until in-flight Process calls complete), then stops the completion watcher. Callers waiting on KV-triggered OnComplete callbacks should ensure those complete before calling Stop, typically by canceling the caller's own context and letting Process see the cancellation.
See also ¶
- pkg/worker — the underlying worker pool (Pool[T])
- pkg/lifecycle — the workflow-shaped substrate that often pairs with BoundedDispatcher (component-internal fan-out over a Lifecycle workflow's instances)
- ADR-048 — the canonical decision
Index ¶
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrInvalidConfig is returned by New when the Config is
	// internally inconsistent (e.g. Workers <= 0, missing Process,
	// CompletionKVBucket set without CompletionKeyForWorkItem +
	// OnComplete). Catches wiring bugs at construction time.
	ErrInvalidConfig = errors.New("dispatch: invalid config")

	// ErrQueueFull is the underlying pkg/worker.ErrQueueFull
	// re-exported so callers can compare via errors.Is without
	// reaching into pkg/worker directly.
	ErrQueueFull = worker.ErrQueueFull

	// ErrStopped is the underlying pkg/worker.ErrPoolStopped
	// re-exported for the same reason.
	ErrStopped = worker.ErrPoolStopped

	// ErrNATSClientRequired is returned by New when
	// CompletionKVBucket is configured but Deps.NATSClient is nil
	// (no way to subscribe to the bucket).
	ErrNATSClientRequired = errors.New("dispatch: Deps.NATSClient is required when CompletionKVBucket is set")
)
Package error sentinels. Callers compare with errors.Is.
Functions ¶
This section is empty.
Types ¶
type BoundedDispatcher ¶
type BoundedDispatcher[W any] struct {
	// contains filtered or unexported fields
}
BoundedDispatcher is the framework's bounded-concurrency parallel work primitive. Components compose it for internal fan-out over known work lists; optionally with KV-twofer-aware completion handling when the work involves async writes by other processors.
Generic over work type W — work items can be any Go type. The underlying pkg/worker.Pool[W] enforces the bounded queue + non-blocking submit + graceful shutdown semantics; this wrapper adds the optional KV-twofer-completion overlay.
Concurrency: Submit is safe to call from concurrent goroutines (delegates to pool.Submit which is lock-free via channel send). Stop is idempotent. Stats reads atomic counters via the underlying pool.
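To make the bounded queue, non-blocking submit, and drain-on-stop semantics concrete, here is a minimal generic pool written from scratch. It is an illustration under stated assumptions, not the code of pkg/worker.Pool: pool, newPool, submit, stop, and runDemo are hypothetical names, and the sketch omits the stopped-state check a production pool would need (submitting after stop would panic here).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errQueueFull is a stand-in for the queue-full sentinel (assumption).
var errQueueFull = errors.New("queue full")

// pool is a minimal bounded worker pool for illustration only.
type pool[W any] struct {
	queue chan W
	wg    sync.WaitGroup
}

// newPool starts exactly `workers` goroutines reading from a buffered queue,
// so at most `workers` process calls run at once.
func newPool[W any](workers, queueSize int, process func(W)) *pool[W] {
	p := &pool[W]{queue: make(chan W, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for w := range p.queue {
				process(w)
			}
		}()
	}
	return p
}

// submit never blocks: a full queue yields errQueueFull.
func (p *pool[W]) submit(w W) error {
	select {
	case p.queue <- w:
		return nil
	default:
		return errQueueFull
	}
}

// stop closes the queue and waits for workers to drain what is left.
func (p *pool[W]) stop() {
	close(p.queue)
	p.wg.Wait()
}

// runDemo submits n items and returns how many were processed and the
// highest number of process calls observed in flight at once.
func runDemo(workers, n int) (processed, maxInFlight int64) {
	var inFlight, peak, done atomic.Int64
	p := newPool[int](workers, n, func(int) {
		cur := inFlight.Add(1)
		for {
			old := peak.Load()
			if cur <= old || peak.CompareAndSwap(old, cur) {
				break
			}
		}
		done.Add(1)
		inFlight.Add(-1)
	})
	for i := 0; i < n; i++ {
		_ = p.submit(i) // queue holds n items, so none are rejected here
	}
	p.stop()
	return done.Load(), peak.Load()
}

func main() {
	processed, peak := runDemo(3, 50)
	fmt.Println(processed, peak <= 3)
}
```

The select with a default case is what makes submit non-blocking; the fixed number of worker goroutines is what enforces the concurrency bound.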
func New ¶
New constructs a BoundedDispatcher with the given Config + Deps. Returns ErrInvalidConfig when the Config is missing required fields or is internally inconsistent, or ErrNATSClientRequired when CompletionKVBucket is set but Deps.NATSClient is nil.
If CompletionKVBucket is configured, New blocks briefly to resolve the bucket via natsclient and start the completion-watcher goroutine. Subsequent Submit calls register their tracking entries against the live watcher.
The pool is started immediately — callers don't need to call Start. Workers are running and ready to receive Submit calls when New returns.
func (*BoundedDispatcher[W]) Stats ¶
func (d *BoundedDispatcher[W]) Stats() worker.PoolStats
Stats returns current dispatcher statistics via the underlying pool. Thread-safe; reads atomic counters.
func (*BoundedDispatcher[W]) Stop ¶
func (d *BoundedDispatcher[W]) Stop(ctx context.Context) error
Stop halts the dispatcher gracefully. Drains the underlying worker pool first (blocks until in-flight Process calls complete or the given context expires), then stops the completion watcher (if configured). Subsequent Submit calls return ErrStopped.
Stop is idempotent. Multiple calls return nil after the first successful Stop.
Callers waiting on KV-triggered OnComplete callbacks should ensure those complete before Stop returns — typically by canceling the caller's own context and letting Process see the cancellation.
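The shutdown order and idempotence described above can be sketched with sync.Once and a context-bounded wait. This is an illustrative model only: stopper, its fields, and demoStop are hypothetical, the "pool" is a bare WaitGroup, and returning the first call's result on every later call is an assumption of the sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// stopper sketches a two-phase, idempotent shutdown (hypothetical type).
type stopper struct {
	once    sync.Once
	err     error
	workers *sync.WaitGroup // stands in for in-flight Process calls
	order   []string        // records the shutdown phases for the demo
}

// Stop waits for workers to drain or for ctx to expire, then stops the
// watcher. Only the first call does any work.
func (s *stopper) Stop(ctx context.Context) error {
	s.once.Do(func() {
		drained := make(chan struct{})
		go func() {
			s.workers.Wait()
			close(drained)
		}()
		select {
		case <-drained:
			s.order = append(s.order, "pool drained")
		case <-ctx.Done():
			s.err = ctx.Err()
		}
		s.order = append(s.order, "watcher stopped")
	})
	return s.err
}

// demoStop runs one simulated in-flight call and stops twice.
func demoStop() (order []string, first, second error) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond) // simulated in-flight Process call
	}()
	s := &stopper{workers: &wg}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	first = s.Stop(ctx)
	second = s.Stop(ctx)
	return s.order, first, second
}

func main() {
	order, first, second := demoStop()
	fmt.Println(order, first, second)
}
```

Passing a context with a deadline to Stop, rather than context.Background(), bounds how long shutdown can block on a slow Process call.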
func (*BoundedDispatcher[W]) Submit ¶
func (d *BoundedDispatcher[W]) Submit(work W) error
Submit queues a work item. Returns ErrQueueFull if the queue is at capacity. When a completion watcher is active, Submit also registers the work item in the watcher's tracking map BEFORE enqueuing to the pool so a completion signal that arrives between Submit and Process can't slip past the watcher.
Submit is safe to call from concurrent goroutines.
type Config ¶
type Config[W any] struct {
	// Workers is the bounded concurrency target. The dispatcher
	// never runs more than Workers Process calls in flight at once.
	// Must be > 0.
	Workers int

	// QueueSize bounds the submit queue. Submit returns
	// ErrQueueFull when the queue is at capacity. Must be > 0.
	QueueSize int

	// Process is called for each submitted work item, in one of
	// the worker goroutines. Must not be nil.
	Process func(ctx context.Context, work W) error

	// CompletionKVBucket is optional. When set, the dispatcher
	// subscribes to this bucket via natsclient and tracks
	// completion signals for each submitted work item. The bucket
	// must exist before New is called; the dispatcher does NOT
	// create it. Required if CompletionKeyForWorkItem or
	// OnComplete is set; the three fields are all-or-nothing.
	CompletionKVBucket string

	// CompletionKeyForWorkItem is required if CompletionKVBucket is
	// set. Returns the KV key the dispatcher watches for this work
	// item's completion. The function must be pure (no per-call
	// state) because it's called on Submit AND on each KV-watch
	// update for matching.
	CompletionKeyForWorkItem func(W) string

	// OnComplete is required if CompletionKVBucket is set. Called
	// when CompletionKVBucket has a write at the key returned by
	// CompletionKeyForWorkItem for some tracked work item.
	// Invoked from the dispatcher's watcher goroutine; callers
	// must not block on shared mutexes acquired by Submit.
	OnComplete func(ctx context.Context, work W) error
}
Config parameterizes BoundedDispatcher construction. The zero value is NOT valid — New rejects Workers <= 0, missing Process, and partial completion-watcher configuration.
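The validation rules can be sketched as a standalone function. This is a guess at the shape of the checks, not the code inside New: config is a simplified non-generic mirror of Config, validate and isInvalid are hypothetical helpers, errInvalidConfig stands in for ErrInvalidConfig, and rejecting QueueSize <= 0 is an assumption drawn from the field comment rather than from the documented behavior of New.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidConfig is a stand-in for dispatch.ErrInvalidConfig (assumption).
var errInvalidConfig = errors.New("invalid config")

// config is a simplified mirror of Config for a string work type.
type config struct {
	Workers                  int
	QueueSize                int
	Process                  func(string) error
	CompletionKVBucket       string
	CompletionKeyForWorkItem func(string) string
	OnComplete               func(string) error
}

// validate applies the required-field checks and the all-or-nothing rule
// for the three completion-watcher fields.
func validate(c config) error {
	if c.Workers <= 0 {
		return fmt.Errorf("%w: Workers must be > 0", errInvalidConfig)
	}
	if c.QueueSize <= 0 { // assumed from the field comment
		return fmt.Errorf("%w: QueueSize must be > 0", errInvalidConfig)
	}
	if c.Process == nil {
		return fmt.Errorf("%w: Process must not be nil", errInvalidConfig)
	}
	set := 0
	if c.CompletionKVBucket != "" {
		set++
	}
	if c.CompletionKeyForWorkItem != nil {
		set++
	}
	if c.OnComplete != nil {
		set++
	}
	if set != 0 && set != 3 {
		return fmt.Errorf("%w: completion fields are all-or-nothing", errInvalidConfig)
	}
	return nil
}

// isInvalid reports whether err wraps errInvalidConfig.
func isInvalid(err error) bool { return errors.Is(err, errInvalidConfig) }

func main() {
	base := config{Workers: 2, QueueSize: 8, Process: func(string) error { return nil }}
	partial := base
	partial.CompletionKVBucket = "EXECUTION_STATES"
	fmt.Println(validate(base) == nil, isInvalid(validate(partial)), isInvalid(validate(config{})))
	// prints "true true true"
}
```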