dispatch

package
v1.0.0-beta.101 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package dispatch provides the BoundedDispatcher substrate primitive — a bounded-concurrency parallel worker pool with optional KV-twofer-aware completion handling.

What this is

BoundedDispatcher is the framework-provided primitive for the "rules sequence, components parallelize" architecture (CLAUDE.md Orchestration Boundaries section). It's what components compose internally when they need to do parallel work over a known list of items — drone fleet weather-monitor walking all active missions, scenario-orchestrator dispatching ready requirements under DAG gating, manufacturing batch's per-widget station processing, semspec scenario-orchestrator's bounded-concurrency dispatch pattern.

BoundedDispatcher is NOT:

  • A workflow engine (no DAG semantics, no branching, no lifecycle — see pkg/lifecycle for those)
  • A rule-engine extension (rules don't gain new fan-out primitives; for_each at the rule layer is the at-the-rule- layer fan-out)
  • A replacement for pkg/worker.Pool — it WRAPS it. New uses prefer BoundedDispatcher (higher-level, KV-twofer aware); existing pkg/worker.Pool consumers stay as-is.

Use when

  • A component does internal parallel work over a list of items
  • Bounded concurrency is required (caller picks the worker count)
  • Optionally: each work item completes async and the dispatcher should fire OnComplete when KV signals match

Do NOT use for

  • At-the-rule-layer fan-out (use rule engine's for_each instead)
  • Sequential per-item processing (use a plain loop)
  • Unbounded concurrency (use a bare goroutine pool)

Example usage (no completion watcher)

d, err := dispatch.New(ctx, dispatch.Config[*Requirement]{
    Workers:   c.MaxConcurrent,
    QueueSize: 256,
    Process:   c.processRequirement,
}, dispatch.Deps{
    NATSClient: c.natsClient,
    Logger:     c.logger,
})
if err != nil {
    return fmt.Errorf("dispatch new: %w", err)
}
defer func() {
    if err := d.Stop(context.Background()); err != nil {
        c.logger.Warn("dispatch stop", slog.String("error", err.Error()))
    }
}()

for _, req := range filterReady(...) {
    if err := d.Submit(req); err != nil {
        // ErrQueueFull on overflow; caller chooses
        // retry/drop/backpressure-propagate.
    }
}

Example usage (with KV-twofer completion watcher)

d, err := dispatch.New(ctx, dispatch.Config[*Requirement]{
    Workers:                   c.MaxConcurrent,
    QueueSize:                 256,
    Process:                   c.processRequirement,
    CompletionKVBucket:        "EXECUTION_STATES",
    CompletionKeyForWorkItem:  func(r *Requirement) string {
        return "req." + r.Slug + "." + r.ID
    },
    OnComplete: c.onRequirementComplete,
}, deps)

In the completion-watcher mode, the dispatcher subscribes to the configured KV bucket BEFORE accepting any Submit. Each Submit registers a tracking entry keyed by CompletionKeyForWorkItem(work) before enqueuing to the underlying pool, so a completion-signal write that arrives between Submit and Process can't slip past the watcher.

Shutdown

Stop drains the underlying worker.Pool first (blocks until in-flight Process calls complete), then stops the completion watcher. Callers waiting on KV-triggered OnComplete callbacks should ensure those complete before Stop — typically by canceling the caller's own context and letting Process see the cancellation.

See also

  • pkg/worker — the underlying worker pool (Pool[T])
  • pkg/lifecycle — the workflow-shaped substrate that often pairs with BoundedDispatcher (component-internal fan-out over a Lifecycle workflow's instances)
  • ADR-048 — the canonical decision

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidConfig is returned by New when the Config is
	// internally inconsistent (e.g. Workers <= 0, missing Process,
	// CompletionKVBucket set without CompletionKeyForWorkItem +
	// OnComplete). Catches wiring bugs at construction time.
	ErrInvalidConfig = errors.New("dispatch: invalid config")

	// ErrQueueFull is the underlying pkg/worker.ErrQueueFull
	// re-exported so callers can compare via errors.Is without
	// reaching into pkg/worker directly.
	ErrQueueFull = worker.ErrQueueFull

	// ErrStopped is the underlying pkg/worker.ErrPoolStopped
	// re-exported for the same reason.
	ErrStopped = worker.ErrPoolStopped

	// ErrNATSClientRequired is returned by New when
	// CompletionKVBucket is configured but Deps.NATSClient is nil
	// (no way to subscribe to the bucket).
	ErrNATSClientRequired = errors.New("dispatch: Deps.NATSClient is required when CompletionKVBucket is set")
)

Package error sentinels. Callers compare with errors.Is.

Functions

This section is empty.

Types

type BoundedDispatcher

type BoundedDispatcher[W any] struct {
	// contains filtered or unexported fields
}

BoundedDispatcher is the framework's bounded-concurrency parallel work primitive. Components compose it for internal fan-out over known work lists; optionally with KV-twofer-aware completion handling when the work involves async writes by other processors.

Generic over work type W — work items can be any Go type. The underlying pkg/worker.Pool[W] enforces the bounded queue + non-blocking submit + graceful shutdown semantics; this wrapper adds the optional KV-twofer-completion overlay.

Concurrency: Submit is safe to call from concurrent goroutines (delegates to pool.Submit which is lock-free via channel send). Stop is idempotent. Stats reads atomic counters via the underlying pool.

func New

func New[W any](ctx context.Context, cfg Config[W], deps Deps) (*BoundedDispatcher[W], error)

New constructs a BoundedDispatcher with the given Config + Deps. Returns ErrInvalidConfig for missing required fields, or ErrNATSClientRequired when CompletionKVBucket is set without a natsclient.

If CompletionKVBucket is configured, New blocks briefly to resolve the bucket via natsclient and start the completion- watcher goroutine. Subsequent Submit calls register their tracking entries against the live watcher.

The pool is started immediately — callers don't need to call Start. Workers are running and ready to receive Submit calls when New returns.

func (*BoundedDispatcher[W]) Stats

func (d *BoundedDispatcher[W]) Stats() worker.PoolStats

Stats returns current dispatcher statistics via the underlying pool. Thread-safe; reads atomic counters.

func (*BoundedDispatcher[W]) Stop

func (d *BoundedDispatcher[W]) Stop(ctx context.Context) error

Stop halts the dispatcher gracefully. Drains the underlying worker pool first (blocks until in-flight Process calls complete or the given context expires), then stops the completion watcher (if configured). Subsequent Submit calls return ErrStopped.

Stop is idempotent. Multiple calls return nil after the first successful Stop.

Callers waiting on KV-triggered OnComplete callbacks should ensure those complete before Stop returns — typically by canceling the caller's own context and letting Process see the cancellation.

func (*BoundedDispatcher[W]) Submit

func (d *BoundedDispatcher[W]) Submit(work W) error

Submit queues a work item. Returns ErrQueueFull if the queue is at capacity. When a completion watcher is active, Submit also registers the work item in the watcher's tracking map BEFORE enqueuing to the pool so a completion signal that arrives between Submit and Process can't slip past the watcher.

Submit is safe to call from concurrent goroutines.

type Config

type Config[W any] struct {
	// Workers is the bounded concurrency target. The dispatcher
	// never runs more than Workers Process calls in flight at once.
	// Must be > 0.
	Workers int

	// QueueSize bounds the submit queue. Submit returns
	// ErrQueueFull when the queue is at capacity. Must be > 0.
	QueueSize int

	// Process is called for each submitted work item, in one of
	// the worker goroutines. Must not be nil.
	Process func(ctx context.Context, work W) error

	// CompletionKVBucket — optional. When set, the dispatcher
	// subscribes to this bucket via natsclient and tracks
	// completion signals for each submitted work item. The bucket
	// must exist before New is called; the dispatcher does NOT
	// create it. Required if CompletionKeyForWorkItem or
	// OnComplete are set; the three fields are all-or-nothing.
	CompletionKVBucket string

	// CompletionKeyForWorkItem — required if CompletionKVBucket is
	// set. Returns the KV key the dispatcher watches for this work
	// item's completion. The function must be pure (no per-call
	// state) because it's called on Submit AND on each KV-watch
	// update for matching.
	CompletionKeyForWorkItem func(W) string

	// OnComplete — required if CompletionKVBucket is set. Called
	// when CompletionKVBucket has a write at the key returned by
	// CompletionKeyForWorkItem for some tracked work item.
	// Invoked from the dispatcher's watcher goroutine; callers
	// must not block on shared mutexes acquired by Submit.
	OnComplete func(ctx context.Context, work W) error
}

Config parameterizes BoundedDispatcher construction. The zero value is NOT valid — New rejects Workers <= 0, missing Process, and partial completion-watcher configuration.

type Deps

type Deps struct {
	NATSClient *natsclient.Client
	Logger     *slog.Logger
}

Deps carries the framework-provided dependencies the dispatcher needs at runtime. Logger is optional (falls back to slog.Default); NATSClient is required when CompletionKVBucket is configured.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL