handqueue

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package handqueue implements the dispatcher's worker-execution queue per hand-dispatch v0.1 §2-§3: a fairness-scheduled FIFO queue with per-aspect active-worker tracking, soft-cap N, hard-ceiling H, and spillover for idle aspects.

The package name is legacy (per spec §9 directory-rename amnesty); identifiers + types inside use the generic protocol vocabulary (dispatch / worker) rather than the deployment-specific surface vocabulary (hand / summon).

Model:

  • One in-process FIFO list of pending dispatches.
  • Per-aspect set of active worker IDs (map[aspect]map[workerID]struct{}).
  • On arrival:
  • if total active workers < SoftCap N → spawn immediately.
  • else if calling aspect has zero active workers → spillover spawn (still bounded by HardCeiling H).
  • else → enqueue at FIFO tail.
  • if at H regardless → reject ErrHardCeiling.
  • On worker release:
  • if queue empty → nothing.
  • else scan queue head-first preferring an item from an idle aspect (no active workers); else FIFO head. Spawn for picked.

v1 scope: single-host execution.

Index

Constants

View Source
const (
	DefaultDeadline = 30 * time.Minute
	MaxDeadline     = 2 * time.Hour
)

Default deadlines per spec §5.5.

Variables

View Source
var (
	// ErrQueueShutdown is returned by Submit if Shutdown has been called.
	ErrQueueShutdown = errors.New("handqueue: shutdown")
	// ErrHardCeiling is returned when a dispatch arrives while the
	// dispatcher is at HardCeiling H. Per spec §6.3 the broker maps
	// this to a structured DispatchErrorPayload with code=hard_ceiling.
	ErrHardCeiling = errors.New("handqueue: hard_ceiling")
	// ErrQueueFull is returned when a dispatch arrives while the
	// pending FIFO already has MaxQueueDepth entries (#25). Caller
	// should backpressure or surface to the dispatching aspect.
	ErrQueueFull = errors.New("handqueue: queue_full")
)

Errors returned through Submit / via DispatchErrorPayload at the broker layer.

Functions

This section is empty.

Types

type AspectHomeResolver

type AspectHomeResolver interface {
	HomeFor(aspect string) (string, bool)
}

AspectHomeResolver looks up the filesystem path for an aspect by name. The dispatcher gets this from its in-memory roster.

type AspectHomeResolverFunc

type AspectHomeResolverFunc func(aspect string) (string, bool)

AspectHomeResolverFunc adapts a plain function to AspectHomeResolver.

func (AspectHomeResolverFunc) HomeFor

func (f AspectHomeResolverFunc) HomeFor(aspect string) (string, bool)

HomeFor implements AspectHomeResolver.

type AspectTokenResolver

type AspectTokenResolver interface {
	TokenFor(aspect string) (string, bool)
}

AspectTokenResolver looks up the per-aspect bearer token. Per hand-dispatch v0.1 §2.1 (identity inheritance / result attribution): when the dispatcher spawns a worker for aspect X, that worker must authenticate to the broker AS aspect X — same bearer token aspect X uses for its own connection. This makes hand-posts indistinguishable from aspect-posts at the auth layer.

Returns (token, true) when the token store has an entry for the aspect; (empty, false) if not. SpawnExecutor falls back to a static FallbackToken (typically the legacy shared NEXUS_TOKEN) on miss so boot/test paths keep working until per-aspect tokens are everywhere.

type AspectTokenResolverFunc

type AspectTokenResolverFunc func(aspect string) (string, bool)

AspectTokenResolverFunc adapts a plain function to AspectTokenResolver.

func (AspectTokenResolverFunc) TokenFor

func (f AspectTokenResolverFunc) TokenFor(aspect string) (string, bool)

TokenFor implements AspectTokenResolver.

type Config

type Config struct {
	// MaxConcurrent is the soft cap N — steady-state max-concurrent
	// worker count. Default 3 per spec §2.1. Spillover may push the
	// active count past N up to HardCeiling H for idle aspects.
	MaxConcurrent int

	// HardCeiling is the absolute cap H. Past H, dispatch arrivals
	// reject with ErrHardCeiling. If unset (or < MaxConcurrent),
	// defaults to MaxConcurrent + 1 (a defensive minimum); production
	// callers should pass roster_size + 1 per spec §2.1.
	HardCeiling int

	// DefaultDeadline overrides the spec default of 30 minutes per
	// dispatch when set; use 0 for the spec default.
	DefaultDeadline time.Duration

	// MaxDeadline overrides the spec hard maximum of 2 hours; use 0
	// for the spec default. Caller-supplied deadline_secs above this
	// is silently capped, not errored.
	MaxDeadline time.Duration

	// MaxQueueDepth caps the number of dispatches in the pending FIFO
	// (#25). Pre-cap, q.pending was unbounded — an authenticated peer
	// flooding dispatches faster than workers drained could grow the
	// queue (and the goroutines waiting on Submit) without limit. New
	// arrivals past this depth reject with ErrQueueFull. Default
	// from defaultMaxQueueDepth when zero.
	MaxQueueDepth int

	// Executor runs jobs. Required.
	Executor Executor

	// Logger is optional.
	Logger *slog.Logger

	// Now is the time source; tests inject deterministic clocks.
	// Default time.Now.
	Now func() time.Time
}

Config tunes the queue.

type Executor

type Executor interface {
	Execute(ctx context.Context, req frames.DispatchPayload) (frames.DispatchResultPayload, error)
}

Executor runs a single dispatch and returns the result payload. Implementations may take arbitrarily long; the Queue applies the per-dispatch deadline via ctx and kills the worker on expiry by cancelling ctx (subprocess executors should honor it via exec.CommandContext).

type ExecutorFunc

ExecutorFunc adapts a plain function to the Executor interface.

func (ExecutorFunc) Execute

Execute implements Executor.

type HardCeilingError

type HardCeilingError struct {
	Active  int
	SoftCap int
	Limit   int
}

HardCeilingError carries the structured fields the broker needs to build a §6.3 DispatchErrorPayload (active / soft_cap / limit). errors.As-friendly.

func (*HardCeilingError) Error

func (e *HardCeilingError) Error() string

Error implements error.

func (*HardCeilingError) Is

func (e *HardCeilingError) Is(target error) bool

Is so errors.Is(err, ErrHardCeiling) works.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the fairness-scheduled FIFO dispatcher.

func New

func New(cfg Config) (*Queue, error)

New constructs a Queue.

func (*Queue) Shutdown

func (q *Queue) Shutdown(ctx context.Context) error

Shutdown stops accepting new dispatches and waits for in-flight workers to finish. Pending items still in the FIFO queue have their respCh closed with ErrQueueShutdown so blocked Submit callers unblock.

func (*Queue) Stats

func (q *Queue) Stats() Stats

Stats returns a snapshot of current dispatcher state.

func (*Queue) Submit

Submit submits a dispatch and blocks until the executor returns, the queue shuts down, or ctx cancels. Per fairness rules:

  • Spawns immediately if total active < N.
  • Spawns spillover if calling aspect has no active worker, up to H.
  • Else enqueues at FIFO tail.
  • Rejects with *HardCeilingError if at H.

type QueueFullError

type QueueFullError struct {
	Depth int
	Limit int
}

QueueFullError carries the structured fields for the §6.3 dispatch error payload when MaxQueueDepth is reached.

func (*QueueFullError) Error

func (e *QueueFullError) Error() string

Error implements error.

func (*QueueFullError) Is

func (e *QueueFullError) Is(target error) bool

Is so errors.Is(err, ErrQueueFull) works.

type SpawnExecutor

type SpawnExecutor struct {
	// HarnessPath is the absolute path to the harness executable.
	// If empty, defaults to looking up "harness" on PATH via
	// exec.LookPath semantics.
	HarnessPath string

	// HomeResolver maps aspect name → home folder on this host.
	HomeResolver AspectHomeResolver

	// TokenResolver maps aspect name → bearer token. Per spec §2.1
	// identity inheritance: workers authenticate to the broker as the
	// dispatching aspect (same token), so hand-posts are
	// indistinguishable from aspect-posts. Optional; if nil, falls
	// back to whatever NEXUS_TOKEN is in ExtraEnv.
	TokenResolver AspectTokenResolver

	// Env entries passed to the child, in addition to the parent's.
	// Typically carries NEXUS_UPSTREAM / NEXUS_OUTPOST so the worker
	// can dial back. Per-aspect NEXUS_TOKEN is injected by Execute
	// when TokenResolver returns one for the dispatching aspect; an
	// ExtraEnv NEXUS_TOKEN is the fallback for the no-resolver case
	// and is overridden by the resolver-supplied token when present.
	ExtraEnv []string
}

SpawnExecutor runs a dispatch by spawning a harness subprocess in dispatch mode. The harness binary path is configurable so tests can point at a mock; production wires it to the same binary that runs the aspect (one binary, two modes per transport spec §2.2).

Per hand-dispatch v0.1: the spawned worker boots loaded with the dispatching aspect's home directory — its NEXUS.md, SOUL.md, PRIMER frame the persona for this single fresh-context turn. There is no per-named-hand lookup; slots are interchangeable, persona inherits from the dispatcher.

func (*SpawnExecutor) Execute

Execute spawns a harness subprocess, pipes the request as JSON on stdin, reads the DispatchResultPayload JSON on stdout, returns it.

type Stats

type Stats struct {
	ActiveTotal    int
	SoftCap        int
	HardCeiling    int
	QueueDepth     int
	ActiveByAspect map[string]int
}

Stats returns a snapshot of dispatcher state. Used by Frame and tests to inspect the pool without poking internals.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL