Documentation
Overview ¶
Package handqueue implements the dispatcher's worker-execution queue per hand-dispatch v0.1 §2-§3: a fairness-scheduled FIFO queue with per-aspect active-worker tracking, soft-cap N, hard-ceiling H, and spillover for idle aspects.
The package name is legacy (per spec §9 directory-rename amnesty); identifiers and types inside use the generic protocol vocabulary (dispatch / worker) rather than the deployment-specific surface vocabulary (hand / summon).
Model:
- One in-process FIFO list of pending dispatches.
- Per-aspect set of active worker IDs (map[aspect]map[workerID]struct{}).
- On arrival:
  - if total active workers < SoftCap N → spawn immediately.
  - else if calling aspect has zero active workers → spillover spawn (still bounded by HardCeiling H).
  - else → enqueue at FIFO tail.
  - if at H regardless → reject ErrHardCeiling.
- On worker release:
  - if queue empty → nothing.
  - else scan queue head-first preferring an item from an idle aspect (no active workers); else FIFO head. Spawn for picked.
v1 scope: single-host execution.
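The arrival and release rules above can be sketched as a small, self-contained model. This is an illustration only, not the package's Queue: the `sched` type, its fields, and the `decision` values are hypothetical names, and the sketch assumes the hard-ceiling check takes precedence over every other rule (one reading of "if at H regardless").

```go
package main

import "fmt"

// decision is the outcome of an arrival; the names are illustrative only.
type decision int

const (
	spawn decision = iota
	spillover
	enqueue
	rejectHardCeiling
)

// sched is a toy model of the state described above, not the real Queue.
type sched struct {
	softCap, hardCeiling int
	active               map[string]int // aspect -> active worker count
	pending              []string       // FIFO of aspects with a waiting dispatch
}

// total returns the number of active workers across all aspects.
func (s *sched) total() int {
	n := 0
	for _, c := range s.active {
		n += c
	}
	return n
}

// arrive applies the arrival rules. Assumption: the hard ceiling is checked first.
func (s *sched) arrive(aspect string) decision {
	total := s.total()
	switch {
	case total >= s.hardCeiling:
		return rejectHardCeiling
	case total < s.softCap:
		s.active[aspect]++
		return spawn
	case s.active[aspect] == 0:
		s.active[aspect]++
		return spillover
	default:
		s.pending = append(s.pending, aspect)
		return enqueue
	}
}

// release frees one worker of aspect, then picks the next pending item:
// the first one whose aspect is idle, otherwise the FIFO head.
func (s *sched) release(aspect string) (next string, ok bool) {
	s.active[aspect]--
	if len(s.pending) == 0 {
		return "", false
	}
	pick := 0
	for i, a := range s.pending {
		if s.active[a] == 0 {
			pick = i
			break
		}
	}
	next = s.pending[pick]
	s.pending = append(s.pending[:pick], s.pending[pick+1:]...)
	s.active[next]++
	return next, true
}

func main() {
	s := &sched{softCap: 2, hardCeiling: 3, active: map[string]int{}}
	for _, aspect := range []string{"a", "a", "a", "b", "c"} {
		fmt.Println(aspect, s.arrive(aspect))
	}
}
```

In this model a busy aspect's extra dispatches wait in the FIFO while an idle aspect may still spill over up to H, which is the fairness property the overview describes.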
Index ¶
Constants ¶
const (
	DefaultDeadline = 30 * time.Minute
	MaxDeadline     = 2 * time.Hour
)
Default and maximum per-dispatch deadlines per spec §5.5.
Variables ¶
var (
	// ErrQueueShutdown is returned by Submit if Shutdown has been called.
	ErrQueueShutdown = errors.New("handqueue: shutdown")

	// ErrHardCeiling is returned when a dispatch arrives while the
	// dispatcher is at HardCeiling H. Per spec §6.3 the broker maps
	// this to a structured DispatchErrorPayload with code=hard_ceiling.
	ErrHardCeiling = errors.New("handqueue: hard_ceiling")

	// ErrQueueFull is returned when a dispatch arrives while the
	// pending FIFO already has MaxQueueDepth entries (#25). Caller
	// should backpressure or surface to the dispatching aspect.
	ErrQueueFull = errors.New("handqueue: queue_full")
)
Errors returned through Submit / via DispatchErrorPayload at the broker layer.
Functions ¶
This section is empty.
Types ¶
type AspectHomeResolver ¶
AspectHomeResolver looks up the filesystem path for an aspect by name. The dispatcher gets this from its in-memory roster.
type AspectHomeResolverFunc ¶
AspectHomeResolverFunc adapts a plain function to AspectHomeResolver.
type AspectTokenResolver ¶
AspectTokenResolver looks up the per-aspect bearer token. Per hand-dispatch v0.1 §2.1 (identity inheritance / result attribution): when the dispatcher spawns a worker for aspect X, that worker must authenticate to the broker AS aspect X — same bearer token aspect X uses for its own connection. This makes hand-posts indistinguishable from aspect-posts at the auth layer.
Returns (token, true) when the token store has an entry for the aspect; (empty, false) if not. SpawnExecutor falls back to a static FallbackToken (typically the legacy shared NEXUS_TOKEN) on miss so boot/test paths keep working until per-aspect tokens are everywhere.
type AspectTokenResolverFunc ¶
AspectTokenResolverFunc adapts a plain function to AspectTokenResolver.
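The resolver declarations are not shown on this page, so the following is only a sketch of the lookup-with-fallback behaviour described above. The interface name `TokenResolver`, the method name `ResolveToken`, and the helper `tokenFor` are assumptions made for illustration; only the `(token, ok)` return shape comes from the text.

```go
package main

import "fmt"

// TokenResolver is a hypothetical stand-in for AspectTokenResolver.
// The method name and signature are assumed, not taken from the package.
type TokenResolver interface {
	ResolveToken(aspect string) (string, bool)
}

// TokenResolverFunc adapts a plain function to TokenResolver,
// in the same spirit as AspectTokenResolverFunc.
type TokenResolverFunc func(aspect string) (string, bool)

// ResolveToken calls the wrapped function.
func (f TokenResolverFunc) ResolveToken(aspect string) (string, bool) {
	return f(aspect)
}

// tokenFor returns the per-aspect token when the resolver has one,
// and the static fallback on a miss or when no resolver is configured.
func tokenFor(r TokenResolver, aspect, fallback string) string {
	if r != nil {
		if tok, ok := r.ResolveToken(aspect); ok {
			return tok
		}
	}
	return fallback
}

func main() {
	store := map[string]string{"alpha": "tok-alpha"}
	r := TokenResolverFunc(func(aspect string) (string, bool) {
		tok, ok := store[aspect]
		return tok, ok
	})
	fmt.Println(tokenFor(r, "alpha", "shared-token")) // per-aspect hit
	fmt.Println(tokenFor(r, "beta", "shared-token"))  // miss, falls back
}
```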
type Config ¶
type Config struct {
// MaxConcurrent is the soft cap N — steady-state max-concurrent
// worker count. Default 3 per spec §2.1. Spillover may push the
// active count past N up to HardCeiling H for idle aspects.
MaxConcurrent int
// HardCeiling is the absolute cap H. Past H, dispatch arrivals
// reject with ErrHardCeiling. If unset (or < MaxConcurrent),
// defaults to MaxConcurrent + 1 (a defensive minimum); production
// callers should pass roster_size + 1 per spec §2.1.
HardCeiling int
// DefaultDeadline overrides the spec default of 30 minutes per
// dispatch when set; use 0 for the spec default.
DefaultDeadline time.Duration
// MaxDeadline overrides the spec hard maximum of 2 hours; use 0
// for the spec default. Caller-supplied deadline_secs above this
// is silently capped, not errored.
MaxDeadline time.Duration
// MaxQueueDepth caps the number of dispatches in the pending FIFO
// (#25). Pre-cap, q.pending was unbounded — an authenticated peer
// flooding dispatches faster than workers drained could grow the
// queue (and the goroutines waiting on Submit) without limit. New
// arrivals past this depth reject with ErrQueueFull. Defaults to
// defaultMaxQueueDepth when zero.
MaxQueueDepth int
// Executor runs jobs. Required.
Executor Executor
// Logger is optional.
Logger *slog.Logger
// Now is the time source; tests inject deterministic clocks.
// Default time.Now.
Now func() time.Time
}
Config tunes the queue.
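The defaulting rules in the Config comments can be sketched as two small helpers. This is a hedged illustration, not the package's code: `effectiveCaps` and `effectiveDeadline` are hypothetical names, and treating non-positive values as "unset" is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Spec defaults as documented above.
const (
	specDefaultDeadline = 30 * time.Minute
	specMaxDeadline     = 2 * time.Hour
)

// effectiveCaps resolves the soft cap N and hard ceiling H.
// Assumption: values <= 0 mean "unset".
func effectiveCaps(maxConcurrent, hardCeiling int) (n, h int) {
	n = maxConcurrent
	if n <= 0 {
		n = 3 // documented default soft cap
	}
	h = hardCeiling
	if h < n { // also covers the unset (zero) case
		h = n + 1 // defensive minimum
	}
	return n, h
}

// effectiveDeadline picks the per-dispatch deadline: the default when the
// caller supplied none, otherwise the requested value capped at the maximum.
func effectiveDeadline(requested, def, max time.Duration) time.Duration {
	if def == 0 {
		def = specDefaultDeadline
	}
	if max == 0 {
		max = specMaxDeadline
	}
	if requested <= 0 {
		return def
	}
	if requested > max {
		return max // silently capped, not an error
	}
	return requested
}

func main() {
	fmt.Println(effectiveCaps(0, 0))
	fmt.Println(effectiveDeadline(3*time.Hour, 0, 0))
}
```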
type Executor ¶
type Executor interface {
Execute(ctx context.Context, req frames.DispatchPayload) (frames.DispatchResultPayload, error)
}
Executor runs a single dispatch and returns the result payload. Implementations may take arbitrarily long; the Queue applies the per-dispatch deadline via ctx and kills the worker on expiry by cancelling ctx (subprocess executors should honor it via exec.CommandContext).
type ExecutorFunc ¶
type ExecutorFunc func(ctx context.Context, req frames.DispatchPayload) (frames.DispatchResultPayload, error)
ExecutorFunc adapts a plain function to the Executor interface.
func (ExecutorFunc) Execute ¶
func (f ExecutorFunc) Execute(ctx context.Context, req frames.DispatchPayload) (frames.DispatchResultPayload, error)
Execute implements Executor.
type HardCeilingError ¶
HardCeilingError carries the structured fields the broker needs to build a §6.3 DispatchErrorPayload (active / soft_cap / limit). errors.As-friendly.
func (*HardCeilingError) Is ¶
func (e *HardCeilingError) Is(target error) bool
Is reports a match against ErrHardCeiling, so errors.Is(err, ErrHardCeiling) works on a *HardCeilingError.
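The sentinel-plus-structured-error pattern can be sketched as below. The type here is a local stand-in: the field names `Active`, `SoftCap`, and `Limit` are guesses based on the "active / soft_cap / limit" description, and `classify` is a hypothetical caller-side helper.

```go
package main

import (
	"errors"
	"fmt"
)

// errHardCeiling plays the role of the ErrHardCeiling sentinel.
var errHardCeiling = errors.New("handqueue: hard_ceiling")

// hardCeilingError is an illustrative stand-in; the real type's field
// names are not shown in this documentation.
type hardCeilingError struct {
	Active, SoftCap, Limit int
}

// Error formats the structured fields into a message.
func (e *hardCeilingError) Error() string {
	return fmt.Sprintf("handqueue: hard_ceiling (active=%d soft_cap=%d limit=%d)",
		e.Active, e.SoftCap, e.Limit)
}

// Is lets errors.Is match the sentinel without wrapping it.
func (e *hardCeilingError) Is(target error) bool { return target == errHardCeiling }

// matchesSentinel reports whether err is (or wraps) a hard-ceiling error.
func matchesSentinel(err error) bool { return errors.Is(err, errHardCeiling) }

// classify shows how a caller might recover the structured fields.
func classify(err error) string {
	var hc *hardCeilingError
	switch {
	case errors.As(err, &hc):
		return fmt.Sprintf("hard_ceiling active=%d limit=%d", hc.Active, hc.Limit)
	case err != nil:
		return "other"
	default:
		return "ok"
	}
}

func main() {
	err := fmt.Errorf("submit: %w", &hardCeilingError{Active: 4, SoftCap: 3, Limit: 4})
	fmt.Println(matchesSentinel(err))
	fmt.Println(classify(err))
}
```

Callers that only need the category can use errors.Is; callers that build a structured payload can use errors.As to reach the fields.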
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the fairness-scheduled FIFO dispatcher.
func (*Queue) Shutdown ¶
Shutdown stops accepting new dispatches and waits for in-flight workers to finish. Pending items still in the FIFO queue receive ErrQueueShutdown on their response channel (respCh), so blocked Submit callers unblock.
func (*Queue) Submit ¶
func (q *Queue) Submit(ctx context.Context, req frames.DispatchPayload) (frames.DispatchResultPayload, error)
Submit submits a dispatch and blocks until the executor returns, the queue shuts down, or ctx cancels. Per fairness rules:
- Spawns immediately if total active < N.
- Spawns spillover if calling aspect has no active worker, up to H.
- Else enqueues at FIFO tail.
- Rejects with *HardCeilingError if at H.
type QueueFullError ¶
QueueFullError carries the structured fields for the §6.3 dispatch error payload when MaxQueueDepth is reached.
func (*QueueFullError) Is ¶
func (e *QueueFullError) Is(target error) bool
Is reports a match against ErrQueueFull, so errors.Is(err, ErrQueueFull) works on a *QueueFullError.
type SpawnExecutor ¶
type SpawnExecutor struct {
// HarnessPath is the absolute path to the harness executable.
// If empty, defaults to looking up "harness" on PATH via
// exec.LookPath semantics.
HarnessPath string
// HomeResolver maps aspect name → home folder on this host.
HomeResolver AspectHomeResolver
// TokenResolver maps aspect name → bearer token. Per spec §2.1
// identity inheritance: workers authenticate to the broker as the
// dispatching aspect (same token), so hand-posts are
// indistinguishable from aspect-posts. Optional; if nil, falls
// back to whatever NEXUS_TOKEN is in ExtraEnv.
TokenResolver AspectTokenResolver
// Env entries passed to the child, in addition to the parent's.
// Typically carries NEXUS_UPSTREAM / NEXUS_OUTPOST so the worker
// can dial back. Per-aspect NEXUS_TOKEN is injected by Execute
// when TokenResolver returns one for the dispatching aspect; an
// ExtraEnv NEXUS_TOKEN is the fallback for the no-resolver case
// and is overridden by the resolver-supplied token when present.
ExtraEnv []string
}
SpawnExecutor runs a dispatch by spawning a harness subprocess in dispatch mode. The harness binary path is configurable so tests can point at a mock; production wires it to the same binary that runs the aspect (one binary, two modes per transport spec §2.2).
Per hand-dispatch v0.1: the spawned worker boots loaded with the dispatching aspect's home directory — its NEXUS.md, SOUL.md, PRIMER frame the persona for this single fresh-context turn. There is no per-named-hand lookup; slots are interchangeable, persona inherits from the dispatcher.
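The environment precedence described in the ExtraEnv and TokenResolver comments can be sketched as a pure function. `buildEnv` is a hypothetical helper, not part of the package; it assumes the child environment is the parent's entries, then ExtraEnv, with a resolver-supplied NEXUS_TOKEN replacing any earlier one.

```go
package main

import (
	"fmt"
	"strings"
)

// buildEnv assembles a child environment: parent entries, then extra
// entries, and finally the per-aspect token (if any), which replaces
// every earlier NEXUS_TOKEN entry.
func buildEnv(parent, extra []string, token string, haveToken bool) []string {
	// Copy into a fresh slice so the inputs are never modified.
	env := append(append([]string{}, parent...), extra...)
	if !haveToken {
		return env // any NEXUS_TOKEN in extra remains the fallback
	}
	out := env[:0]
	for _, kv := range env {
		if !strings.HasPrefix(kv, "NEXUS_TOKEN=") {
			out = append(out, kv)
		}
	}
	return append(out, "NEXUS_TOKEN="+token)
}

func main() {
	parent := []string{"PATH=/usr/bin"}
	extra := []string{"NEXUS_UPSTREAM=example", "NEXUS_TOKEN=shared"}
	fmt.Println(buildEnv(parent, extra, "tok-alpha", true))
	fmt.Println(buildEnv(parent, extra, "", false))
}
```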
func (*SpawnExecutor) Execute ¶
func (s *SpawnExecutor) Execute(ctx context.Context, req frames.DispatchPayload) (frames.DispatchResultPayload, error)
Execute spawns a harness subprocess, pipes the request as JSON on stdin, reads the DispatchResultPayload JSON from stdout, and returns it.
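The JSON-over-stdio round trip can be sketched as follows. This is not the package's implementation: `payload` and `result` are placeholder types with made-up fields, the harness's real command-line arguments are not shown here and are omitted, and the demo uses the Unix `cat` utility as a mock harness that echoes stdin to stdout.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"os/exec"
)

// payload and result are placeholders; the real frames types differ.
type payload struct {
	ID     string `json:"id"`
	Prompt string `json:"prompt"`
}

type result struct {
	ID     string `json:"id"`
	Output string `json:"output"`
}

// execute runs the harness binary, writes req as JSON to its stdin, and
// decodes one JSON result from its stdout. Cancelling ctx kills the child.
func execute(ctx context.Context, harness string, req payload) (result, error) {
	in, err := json.Marshal(req)
	if err != nil {
		return result{}, fmt.Errorf("encode request: %w", err)
	}
	cmd := exec.CommandContext(ctx, harness)
	cmd.Stdin = bytes.NewReader(in)
	out, err := cmd.Output()
	if err != nil {
		return result{}, fmt.Errorf("run harness: %w", err)
	}
	var res result
	if err := json.Unmarshal(out, &res); err != nil {
		return result{}, fmt.Errorf("decode result: %w", err)
	}
	return res, nil
}

// demoRoundTrip uses "cat" as a mock harness, so the decoded result
// carries the same id that was sent in the request.
func demoRoundTrip() (string, error) {
	res, err := execute(context.Background(), "cat", payload{ID: "d-1", Prompt: "hello"})
	return res.ID, err
}

func main() {
	id, err := demoRoundTrip()
	fmt.Println(id, err)
}
```

Pointing the harness path at a trivial program like this is the same trick the SpawnExecutor description mentions for tests.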