steering

package
v1.3.1
Warning

This package is not in the latest version of its module.

Published: Jun 11, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package steering ships Harbor's per-run steering inbox + the nine-event control taxonomy + the Protocol-edge validation / sanitisation pass (RFC §6.3, brief 02 §2-§4).

What Phase 52 ships

Steering is a Runtime capability surfaced over the Protocol. Planners observe accumulated `Control` signals via `RunContext`; they NEVER touch the inbox. The Runtime owns the inbox. Phase 52 lands the data structures and the edge enforcement:

  • The nine-type control taxonomy (`ControlType` + the canonical `ControlEvent` record) — `INJECT_CONTEXT`, `REDIRECT`, `CANCEL`, `PRIORITIZE`, `PAUSE`, `RESUME`, `APPROVE`, `REJECT`, `USER_MESSAGE`.
  • The per-run `Inbox` owned by the Runtime: an enqueue + drain surface, per-run, identity-scoped. A process-wide `Registry` mints / looks up / retires per-run inboxes.
  • The Protocol-edge `Validate` pass: the RFC §6.3 payload bounds (depth ≤ 6, ≤ 64 keys, ≤ 50 list items, ≤ 4096 chars / string, ≤ 16 KiB total) — enforced loud, never silently truncated.
  • The per-event `Scope` check: each control type declares the minimum caller scope per RFC §6.3; a mismatch is rejected loud with `ErrScopeMismatch` (the Protocol projection maps that to 403 + an audit emit).

What Phase 53 adds (the run-loop wiring)

Phase 52 shipped the primitive (taxonomy + inbox + validation + scope). Phase 53 ships `RunLoop` — the per-run planner-step loop that is the §13 first consumer of BOTH this primitive AND the Phase 50 `pauseresume.Coordinator`:

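  • `RunLoop.Run` drives a `planner.Planner` to a terminal `planner.Finish`, applying the per-run `Inbox`'s controls exactly ONCE per step boundary (`Inbox.Drain`, the drain-between-steps invariant from brief 02 §6). The sole mid-step exception (D-192) consumes only approval-bridge-eligible APPROVE / REJECT controls while a decision is executing and defers every other control to the next boundary.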
  • The nine control events' side effects are applied (`apply.go`): CANCEL hard/soft, PAUSE/RESUME/APPROVE/REJECT onto the unified `pauseresume.Coordinator`, INJECT_CONTEXT/REDIRECT/USER_MESSAGE projected onto `RunContext.Control` (the planner sees ONLY this — never the `Inbox`), PRIORITIZE onto the `tasks.TaskRegistry`.
  • A planner's `RequestPause` decision routes through `Coordinator.Request`; `Inbox.WaitForEvent` blocks the loop (no busy-spin) until a control arrives. A RESUME / APPROVE routes through `Coordinator.Resume`; a REJECT that advances the pause terminates the run with `Finish{ConstraintsConflict}`.
  • Per-session applied-control history is capped (`controlHistory`, `MaxControlHistory`, newest-wins ring).
  • `control.received` / `control.applied` lifecycle events are emitted (`events.go`).

See D-071 and the phase-53 plan's "§13 primitive-with-consumer — discharged here" section.

Pause-family controls converge on the unified primitive

`PAUSE` / `RESUME` / `APPROVE` / `REJECT` are taxonomy entries here — Phase 52 validates them and scope-checks them. Phase 53 wires their side effects onto the ONE pause/resume primitive (`internal/runtime/pauseresume`, Phase 50) — Phase 52 does NOT reinvent pause coordination (CLAUDE.md §7 rule 4).

Fail loudly

There is no silent-degradation path (RFC §3.4, CLAUDE.md §13 + §5). An oversize / over-deep payload is REJECTED with a wrapped `ErrPayloadInvalid`, never truncated to fit. A missing identity triple fails closed with `ErrIdentityRequired`. A scope mismatch fails closed with `ErrScopeMismatch`. An unknown control type is rejected with `ErrUnknownControlType`.

Concurrent reuse (D-025)

The process-wide `Registry` is a compiled artifact: immutable after construction, with the per-run inbox map behind a documented-invariant `sync.Mutex`. A per-run `Inbox` is itself concurrent-safe — N Protocol-edge goroutines may `Enqueue` while the run loop `Drain`s — its queue is mutex-guarded. Per-run state never leaks across runs: each `Inbox` is keyed by the run's identity quadruple and holds only that run's events. concurrent_test.go pins N≥100 under -race.


Constants

const (
	// EventTypeControlRejected — emitted when a steering submission is
	// rejected at the edge: an unknown control type, a payload-bounds
	// violation, or — the master-plan acceptance case — a per-event
	// scope mismatch. Payload is ControlRejectedPayload.
	EventTypeControlRejected events.EventType = "control.rejected"

	// EventTypeControlReceived — emitted by the RunLoop when a control
	// event is drained from the per-run inbox at a step boundary
	// (before its side effect is applied). Payload is
	// ControlLifecyclePayload.
	EventTypeControlReceived events.EventType = "control.received"

	// EventTypeControlApplied — emitted by the RunLoop after a drained
	// control event's side effect has been applied (the goal was
	// rewritten, the pause was requested / resumed, the task was
	// reprioritised, etc.). Payload is ControlLifecyclePayload — the
	// Err field is non-empty when the side effect failed.
	EventTypeControlApplied events.EventType = "control.applied"
)

Canonical event types this package registers into the events package's canonical registry from init(), so a Publish never trips events.ErrUnknownEventType.

Phase 52 emitted exactly one of these: control.rejected, on a validation / scope failure at Enqueue time. Phase 53 adds the two lifecycle events brief 02 §3 names — control.received (a control event was drained from the per-run inbox by the RunLoop) and control.applied (the RunLoop applied the control's side effect). Together with control.rejected they are the full steering audit trail; Phase 54's Protocol edge surfaces them over the wire.

const (
	// MaxPayloadDepth — the deepest nesting level a payload may reach.
	// The top-level map is depth 1; a value nested one map / list
	// inside it is depth 2; and so on. Depth > 6 is rejected.
	MaxPayloadDepth = 6
	// MaxPayloadKeys — the most keys any single map in the payload
	// may carry. Applies per-map, not cumulatively (a payload with
	// two maps of 64 keys each is valid; one map of 65 is not).
	MaxPayloadKeys = 64
	// MaxPayloadListItems — the most elements any single list in the
	// payload may carry.
	MaxPayloadListItems = 50
	// MaxPayloadStringLen — the most characters (runes) any single
	// string leaf may carry.
	MaxPayloadStringLen = 4096
	// MaxPayloadTotalBytes — the cap on the payload's canonical JSON
	// encoding. 16 KiB. The total-size check runs first so a
	// pathologically large flat payload is rejected before the
	// structural walk.
	MaxPayloadTotalBytes = 16 * 1024
)

Payload bounds — RFC §6.3 "Steering payload bounds" (Settled), verbatim from the predecessor's steering constants (brief 02 §6 finding 8: "Harbor keeps the same caps"). Enforced at the Protocol edge by Validate before a ControlEvent is ever enqueued.

const DefaultMaxSteps = 64

DefaultMaxSteps is the planner-step cap RunLoop.Run applies when RunSpec.MaxSteps is ≤ 0. A run that has not reached a terminal Finish after this many steps terminates loud with ErrMaxStepsExceeded — an unbounded planner loop is a misconfiguration, never a silent spin.

const MaxControlHistory = 256

MaxControlHistory is the default per-session cap on the applied-control history ring (RFC §6.3 — "control-history capped per session"). A long-lived session can receive an unbounded number of control events over its lifetime; the runtime keeps only the newest MaxControlHistory applied entries so the bookkeeping is bounded. Overridable per RunLoop via WithMaxControlHistory.

Variables

var (
	// ErrIdentityRequired — a steering operation was called with an
	// identity quadruple missing one of (tenant, user, session, run).
	// The inbox fails closed (CLAUDE.md §6 rule 9 + D-001); there is
	// no identity-downgrading knob. The run component is mandatory
	// too: the inbox is per-run.
	ErrIdentityRequired = errors.New("steering: identity quadruple incomplete")

	// ErrUnknownControlType — a ControlEvent carried a Type that is
	// not one of the nine canonical control types (RFC §6.3). Rejected
	// at the edge rather than enqueued.
	ErrUnknownControlType = errors.New("steering: unknown control type")

	// ErrPayloadInvalid — a ControlEvent payload violated one of the
	// RFC §6.3 bounds (depth > 6, > 64 keys, > 50 list items, a string
	// > 4096 chars, or > 16 KiB total). The payload is REJECTED loud,
	// never silently truncated to fit (CLAUDE.md §5 "fail loudly").
	// The wrapped message names which bound was exceeded.
	ErrPayloadInvalid = errors.New("steering: control payload failed validation")

	// ErrUnsupportedPayloadValue — a ControlEvent payload carried a
	// leaf value of a type the JSON-shaped steering surface does not
	// accept (a channel, a func, a complex number, etc.). Distinct
	// from ErrPayloadInvalid (a bound exceeded): this is a structural
	// rejection. Fails loud rather than coercing.
	ErrUnsupportedPayloadValue = errors.New("steering: control payload carries an unsupported value type")

	// ErrScopeMismatch — the caller's presented Scope is below the
	// minimum scope the control Type requires (RFC §6.3 per-event
	// scopes), or a cross-tenant steering attempt was made without the
	// admin scope. Fails closed; the Protocol edge maps this to 403 +
	// audit.
	ErrScopeMismatch = errors.New("steering: caller scope insufficient for control type")

	// ErrInvalidScope — a Scope value outside the three canonical
	// scopes was presented. Rejected rather than treated as the
	// weakest scope.
	ErrInvalidScope = errors.New("steering: invalid caller scope")

	// ErrInboxNotFound — a Registry lookup / drain / retire was asked
	// for a run quadruple with no live inbox. Typical cause: the run
	// already ended and its inbox was retired, or it never started.
	ErrInboxNotFound = errors.New("steering: no inbox for run")

	// ErrInboxExists — Registry.Open was called for a run quadruple
	// that already has a live inbox. Opening twice would orphan the
	// first inbox's queued events; the second call is rejected loud.
	ErrInboxExists = errors.New("steering: inbox already open for run")

	// ErrNoPlanner — RunLoop.Run was called with a nil RunSpec.Planner.
	// The planner is the swappable reasoning policy the loop drives;
	// there is no default. Fails closed (CLAUDE.md §5) rather than
	// no-op'ing.
	ErrNoPlanner = errors.New("steering: RunSpec.Planner is nil")

	// ErrRunLoopMisconfigured — NewRunLoop was called with a nil
	// Registry or a nil Coordinator. Both are mandatory: the Registry
	// owns the per-run inbox the loop drains; the Coordinator is the
	// ONE pause/resume primitive PAUSE / RESUME / APPROVE / REJECT
	// converge on (CLAUDE.md §7 rule 4).
	ErrRunLoopMisconfigured = errors.New("steering: RunLoop missing a mandatory dependency")

	// ErrNoOutstandingPause — a RESUME / APPROVE / REJECT control event
	// arrived for a run with no pause Token recorded. The run is not
	// paused; advancing a non-existent pause is a caller / operator bug
	// and is surfaced loud rather than silently ignored.
	ErrNoOutstandingPause = errors.New("steering: control targets a run with no outstanding pause")

	// ErrMaxStepsExceeded — RunLoop.Run drove the planner for
	// RunSpec.MaxSteps planner steps without reaching a terminal
	// Finish decision. The loop terminates loud rather than spinning
	// forever — an unbounded planner loop is a misconfiguration.
	ErrMaxStepsExceeded = errors.New("steering: run loop exceeded the configured max planner steps")
)

Sentinel errors. Callers compare via errors.Is. The Protocol-edge projection (Phase 54) maps ErrScopeMismatch to a 403 + an audit emit, and ErrPayloadInvalid / ErrUnknownControlType to a 400.

var ErrDecisionShapeUnsupported = errors.New("steering: ToolExecutor does not support this decision shape")

ErrDecisionShapeUnsupported — returned by ToolExecutor implementations for decision shapes the executor does not yet dispatch (e.g. the dev binary's V1.1 executor handles CallTool only; CallParallel / SpawnTask / AwaitTask need their own dispatcher layers). The runloop records the error as the step's observation so the planner sees "this didn't run" and can re-plan.

Functions

func CheckScope

func CheckScope(t ControlType, callerScope Scope, callerTenant string, runIdentity identity.Quadruple) error

CheckScope enforces the RFC §6.3 per-event scope mapping for one steering submission. It fails closed:

  • an unknown control type → ErrUnknownControlType;
  • an unrecognised caller scope → ErrInvalidScope;
  • a cross-tenant submission (callerTenant != run tenant) by a non-admin caller → ErrScopeMismatch ("Cross-tenant steering requires admin" — RFC §6.3);
  • a caller scope below the control type's per-type minimum → ErrScopeMismatch.

callerScope is the (trust-based at Phase 52) Scope the Protocol edge derived from the caller's JWT. callerTenant is the tenant the caller authenticated under; runIdentity is the run the steering targets. CheckScope is pure and holds no state — safe for concurrent use (D-025).

func EmitRejection

func EmitRejection(ctx context.Context, bus events.EventBus, q identity.Quadruple, t ControlType, callerScope Scope, rejectErr error) error

EmitRejection publishes a control.rejected event onto the bus for a steering submission that Inbox.Enqueue rejected. It is the audit-on-scope-mismatch path the master-plan Phase 52 acceptance names ("per-event scope mismatch returns 403 + audit") — the 403 is the Protocol edge's job (Phase 54); the audit emit is this. The Protocol edge calls EmitRejection whenever Enqueue returns a non-nil error.

rejectErr is the error Enqueue returned; it is classified into a stable Reason string (never inspected for its message bytes, which may quote caller data). The event carries the run's identity quadruple so identity-scoped subscribers see it. A nil bus, a nil rejectErr, or an events.Publish failure each yields a returned, wrapped error: the caller (the Protocol edge) decides whether an un-emittable audit event should fail the request loud, and EmitRejection never silently swallows it.

func IsValidControlType

func IsValidControlType(t ControlType) bool

IsValidControlType reports whether t is one of the nine canonical control types.

func IsValidScope

func IsValidScope(s Scope) bool

IsValidScope reports whether s is one of the three canonical scopes.

func ValidatePayload

func ValidatePayload(p map[string]any) error

ValidatePayload runs the RFC §6.3 payload bounds against p. It returns nil for a valid payload (including a nil / empty payload — a control event with no payload is valid, e.g. a bare CANCEL) and a wrapped ErrPayloadInvalid / ErrUnsupportedPayloadValue naming the violated bound otherwise.

Order of checks (each fails loud, none truncates — CLAUDE.md §5):

  1. Total-bytes: the canonical JSON encoding must be ≤ 16 KiB. A payload that cannot be JSON-encoded at all (a channel / func leaf) fails here with ErrUnsupportedPayloadValue.
  2. Structural walk: depth ≤ 6, per-map keys ≤ 64, per-list items ≤ 50, per-string runes ≤ 4096. The walk also rejects any leaf whose Go type is outside the JSON-shaped accepted set.

ValidatePayload is pure and holds no state — safe for concurrent use by N goroutines (D-025).

Types

type AppliedControl

type AppliedControl struct {
	// Type is the control type that was applied.
	Type ControlType
	// RunID is the run the control targeted (a session may host multiple
	// concurrent runs — the run component disambiguates).
	RunID string
	// AppliedAt is the wall-clock time the side effect was applied,
	// stamped from the RunLoop's Clock.
	AppliedAt time.Time
	// Err is the non-nil error when the side effect failed (e.g. a
	// PRIORITIZE whose task does not exist). A failed apply is still
	// recorded — the history is the audit trail, and a silent drop would
	// violate CLAUDE.md §5 "fail loudly".
	Err error
}

AppliedControl is one entry in a session's applied-control history. It is the runtime's own bookkeeping record: what control was applied to which run, when, and whether the side effect succeeded. The caller-supplied control payload itself is NOT carried (mirroring ControlRejectedPayload): the history is a low-cardinality audit trail, not a payload archive.

type Clock

type Clock interface {
	Now() time.Time
}

Clock is the minimal time source the inbox uses for the per-event `EnqueuedAt` stamp. Tests inject a controllable clock so no test sleeps for synchronisation (CLAUDE.md §11). Production code uses the real-time `systemClock`.

type ControlEvent

type ControlEvent struct {
	// Type is one of the nine canonical control types. An invalid
	// Type is rejected by Inbox.Enqueue with ErrUnknownControlType.
	Type ControlType
	// Identity is the run quadruple this control targets. It MUST
	// match the Inbox's own quadruple — Enqueue rejects a mismatch
	// (an event for run A must never land on run B's inbox).
	Identity identity.Quadruple
	// CallerScope is the (trust-based at Phase 52) Scope the Protocol
	// edge derived from the submitting caller's JWT. Enqueue runs
	// CheckScope against it.
	CallerScope Scope
	// CallerTenant is the tenant the submitting caller authenticated
	// under. Used by the cross-tenant scope check (RFC §6.3).
	CallerTenant string
	// Payload is the sanitised, bounds-checked control payload. May
	// be nil — a bare CANCEL / PAUSE carries no payload. Enqueue runs
	// ValidatePayload against it.
	Payload map[string]any
	// EventID is the caller-supplied idempotency / correlation key
	// (ULID-shaped, mirrors events.EventID). Optional at Phase 52 —
	// Phase 53's control-history dedupe uses it. Empty is permitted.
	EventID string
	// EnqueuedAt is stamped by Inbox.Enqueue from the Inbox's Clock.
	// Callers MUST NOT pre-fill it; a non-zero value is rejected so
	// the Inbox owns the timeline (mirrors events.Event.Sequence).
	EnqueuedAt time.Time
}

ControlEvent is the canonical steering record (RFC §6.3, brief 02 §2 `ControlEvent`). It is what the Protocol edge constructs from an inbound control request, validates, scope-checks, and enqueues on the run's Inbox. Phase 53 drains these between planner steps and projects the result onto RunContext.Control.

type ControlLifecyclePayload

type ControlLifecyclePayload struct {
	events.SafeSealed
	// Type is the control type that was received / applied.
	Type string
	// Outcome is a stable, low-cardinality classification of the apply
	// result — "received" for control.received, and one of "applied" /
	// "failed" for control.applied. Empty on a control.received event.
	Outcome string
	// Err is a short, redaction-safe description of why the side
	// effect failed, when Outcome == "failed". Empty otherwise. The
	// RunLoop derives this from a sentinel classification, never the
	// raw error message (which may quote caller data).
	Err string
}

ControlLifecyclePayload is the typed payload for control.received and control.applied events. SafePayload by construction: every field is the RunLoop's own bookkeeping — the control Type is one of nine canonical enum values, the Outcome / Err strings are low-cardinality runtime-derived classifications. The caller-controlled control payload itself is NOT carried (mirroring ControlRejectedPayload): these events are a low-cardinality audit trail, not a payload archive.

type ControlRejectedPayload

type ControlRejectedPayload struct {
	events.SafeSealed
	// Type is the control type that was rejected (may be the empty
	// string when the rejection was an unknown / unparsable type).
	Type string
	// Reason is a stable, low-cardinality classification of why the
	// submission was rejected — one of "unknown_type",
	// "payload_invalid", "scope_mismatch", "identity_invalid".
	Reason string
	// CallerScope is the scope the rejected caller presented.
	CallerScope string
}

ControlRejectedPayload is the typed payload for a control.rejected event. SafePayload by construction: every field is the steering edge's own bookkeeping — the control Type is one of nine canonical enum values, the Reason is a fixed sentinel-derived string, the CallerScope is one of three canonical enum values. The rejected payload itself is NOT carried — it may hold caller-controlled data and is exactly what was rejected; persisting it would defeat the rejection.

type ControlType

type ControlType string

ControlType is the string-typed enum of the nine canonical control event types (RFC §6.3 — Settled). The wire strings are the RFC's verbatim uppercase identifiers; the Protocol projection (Phase 54) accepts exactly these.

const (
	// ControlInjectContext — append operator-supplied context to the
	// run's trajectory; visible on the planner's next step.
	ControlInjectContext ControlType = "INJECT_CONTEXT"
	// ControlRedirect — rewrite the run's goal. Requires the user
	// (the agent's owner) — RFC §6.3.
	ControlRedirect ControlType = "REDIRECT"
	// ControlCancel — cancel the run (hard or soft; the soft/hard
	// distinction is a Phase 53 payload concern).
	ControlCancel ControlType = "CANCEL"
	// ControlPrioritize — change the run's task priority. Requires
	// admin — RFC §6.3.
	ControlPrioritize ControlType = "PRIORITIZE"
	// ControlPause — pause the run at the next planner-step boundary.
	// Phase 53 wires this onto the unified pause/resume primitive.
	ControlPause ControlType = "PAUSE"
	// ControlResume — resume a paused run. Phase 53 wires this onto
	// the unified pause/resume primitive.
	ControlResume ControlType = "RESUME"
	// ControlApprove — approve a HITL-gated step. Phase 53 wires this
	// onto the unified pause/resume primitive (advance a pause).
	ControlApprove ControlType = "APPROVE"
	// ControlReject — reject a HITL-gated step. Phase 53 wires this
	// onto the unified pause/resume primitive.
	ControlReject ControlType = "REJECT"
	// ControlUserMessage — inject a user-authored message into the
	// run; visible on the planner's next step.
	ControlUserMessage ControlType = "USER_MESSAGE"
)

The nine canonical control types (RFC §6.3, brief 02 §2). Adding a tenth is an RFC change — the taxonomy is Settled.

func ControlTypes

func ControlTypes() []ControlType

ControlTypes returns a deterministic, lexicographically-sorted snapshot of the nine canonical control types. Useful for the Protocol projection's allow-list and for exhaustiveness tests.

type Inbox

type Inbox struct {
	// contains filtered or unexported fields
}

Inbox is the per-run steering inbox. The Runtime owns it; planners never touch it (RFC §6.3 — "the Runtime owns the inbox"; planners observe RunContext.Control only). It is a FIFO enqueue + drain surface, identity-scoped to exactly one run quadruple.

An Inbox is concurrent-safe: N Protocol-edge goroutines may Enqueue while the run loop Drains, all against one Inbox — the queue is mutex-guarded (D-025). Per-run state never leaks across runs because each Inbox holds only its own run's events and is keyed by its own quadruple.

Construct an Inbox via Registry.Open; do not construct one directly.

func (*Inbox) Drain

func (in *Inbox) Drain() ([]ControlEvent, error)

Drain atomically removes and returns every queued ControlEvent in FIFO order, leaving the inbox empty. This is the surface Phase 53's run loop calls between planner steps. Drain on an empty inbox returns an empty (non-nil) slice. Drain on a retired inbox returns ErrInboxNotFound.

The returned slice is owned by the caller — the Inbox keeps no reference to it.

func (*Inbox) Enqueue

func (in *Inbox) Enqueue(ev ControlEvent) error

Enqueue validates, scope-checks, and appends a ControlEvent to the inbox queue. It fails closed (no silent drop — CLAUDE.md §5):

  • ErrIdentityRequired — ev.Identity is not the Inbox's own quadruple (a control for another run must never land here), or the quadruple is incomplete.
  • ErrUnknownControlType — ev.Type is not a canonical type.
  • ErrScopeMismatch / ErrInvalidScope — the caller scope is below the type's RFC §6.3 minimum, or a cross-tenant non-admin submission.
  • ErrPayloadInvalid / ErrUnsupportedPayloadValue — the payload violated an RFC §6.3 bound or carried an unencodable leaf.
  • ErrInboxNotFound — the inbox has been retired (Close called).

On success the event's EnqueuedAt is stamped from the Inbox's Clock and the event is appended. A caller-supplied non-zero EnqueuedAt is rejected with ErrPayloadInvalid — the Inbox owns the timeline.

func (*Inbox) Identity

func (in *Inbox) Identity() identity.Quadruple

Identity returns the run quadruple this Inbox is scoped to.

func (*Inbox) Len

func (in *Inbox) Len() int

Len returns the number of currently-queued events. Primarily for tests and observability; Phase 53's run loop uses Drain.

func (*Inbox) WaitForEvent

func (in *Inbox) WaitForEvent(ctx context.Context) error

WaitForEvent blocks until the inbox has at least one queued event, the inbox is retired, or ctx is cancelled. It is the surface Phase 53's RunLoop uses to wait — without busy-spinning — for a steering control to arrive while a run is paused (a pause is outstanding and the planner must not be re-entered until a RESUME / APPROVE / REJECT lands).

It returns:

  • nil — an event is queued; the caller should Drain.
  • ctx.Err() — the wait was cancelled.
  • ErrInboxNotFound — the inbox was retired while waiting.

WaitForEvent does NOT itself Drain — the caller calls Drain after a nil return. This keeps Drain the single atomic-remove surface.

type Option

type Option func(*Registry)

Option configures a Registry at construction time.

func WithClock

func WithClock(c Clock) Option

WithClock overrides the Registry's time source — the Clock each Inbox stamps EnqueuedAt from. Tests inject a controllable clock so no test sleeps for synchronisation (CLAUDE.md §11). The default is the real-time system clock.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the process-wide owner of per-run steering inboxes. The Runtime builds ONE Registry per process and shares it across every run; it mints an Inbox when a run starts (Open), hands the Inbox to the Protocol edge (Lookup) and the run loop, and retires the Inbox when the run ends (Retire).

Registry is a compiled artifact (D-025): immutable after construction, with the run→inbox map behind a documented-invariant sync.Mutex. One Registry is safe to share across N concurrent goroutines — concurrent_test.go pins N≥100 under -race. Per-run state never leaks across runs: each run's events live on its own Inbox, keyed by its own identity quadruple.

func NewRegistry

func NewRegistry(opts ...Option) *Registry

NewRegistry builds a process-wide steering inbox Registry. The returned Registry is ready for concurrent use.

func (*Registry) Len

func (r *Registry) Len() int

Len returns the number of currently-open inboxes. Primarily for tests and observability.

func (*Registry) Lookup

func (r *Registry) Lookup(q identity.Quadruple) (*Inbox, error)

Lookup returns the live Inbox for the run quadruple. The Protocol edge calls Lookup to enqueue an inbound control onto the right run's inbox; the run loop calls it to Drain. It fails closed with ErrInboxNotFound when no Inbox is open for the quadruple (the run never started, or its inbox was already retired) and ErrIdentityRequired on an incomplete quadruple.

func (*Registry) Open

func (r *Registry) Open(q identity.Quadruple) (*Inbox, error)

Open mints a fresh per-run Inbox for the run quadruple and registers it. It fails closed:

  • ErrIdentityRequired — the quadruple is incomplete (missing one of tenant / user / session / run).
  • ErrInboxExists — an Inbox is already open for this exact quadruple. Opening twice would orphan the first inbox's queued events, so the second call is rejected loud rather than silently replacing it.

The returned Inbox is owned by the Registry; retire it with Retire when the run ends.

func (*Registry) Retire

func (r *Registry) Retire(q identity.Quadruple) error

Retire removes the run's Inbox from the Registry and closes it: any queued-but-undrained events are dropped (the run is ending; there is nothing left to apply them to) and further Enqueue / Drain on the retired Inbox fail with ErrInboxNotFound. Retire fails closed with ErrInboxNotFound when no Inbox is open for the quadruple, and ErrIdentityRequired on an incomplete quadruple. Retire is the run-lifecycle counterpart of Open; calling it twice for the same run returns ErrInboxNotFound on the second call.

type RunLoop

type RunLoop struct {
	// contains filtered or unexported fields
}

RunLoop is Harbor's per-run planner-step loop — the runtime component that drives a planner.Planner to a terminal planner.Finish decision, draining the per-run steering Inbox between steps and routing pause decisions through the unified pauseresume.Coordinator.

Why this is the steering wiring

Phase 52 shipped the steering primitive (the inbox, the nine-type taxonomy, ValidatePayload, CheckScope, the Registry). Phase 50 shipped the pause/resume primitive (the Coordinator). Neither did anything by itself — there was no run loop to drain the inbox or to route a RequestPause decision through the Coordinator. RunLoop IS that loop. It is the §13 first consumer of BOTH primitives, landing in the same wave (Wave 9, Stage 3) per CLAUDE.md §13 + D-067 §4 + D-070 §5.

The loop (brief 02 §4)

Per run, RunLoop owns a tight loop:

Open the run's Inbox on the Registry
for step := 0; step < MaxSteps; step++ {
    drain the Inbox            -- once, at the step boundary
    apply each control event   -- CANCEL / PAUSE / REDIRECT / ... side effects
    project onto RunContext.Control  -- the planner sees ONLY this
    if a RESUME/APPROVE advanced a pause: clear the pause, continue
    if a REJECT advanced a pause: terminate Finish{ConstraintsConflict}
    decision := Planner.Next(ctx, runContext)
    switch decision {
        RequestPause -> Coordinator.Request; block; re-enter on RESUME/APPROVE
        Finish       -> Retire the Inbox; return
        other        -> (decision execution is a later-phase concern;
                         Phase 53 records the step and re-enters)
    }
}
Retire the Inbox  -- always, even on error

The full applyEvent treatment happens exactly ONCE per step boundary. While a decision execution is in flight the loop ALSO drains the inbox (D-192), but consumes ONLY approval-bridge-eligible APPROVE / REJECT controls there (the D-097 gate bridge — without the mid-step drain an approval-gated tool deadlocks the run: RunGuarded parks until ResolveApproval, whose only production caller is this loop's drain). Every other control drained mid-step is deferred verbatim to the next boundary, preserving brief 02 §6's step-boundary semantics. The planner observes the result via RunContext.Control; it never touches the Inbox (brief 02 §5 sharp-edge #2).
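A sketch of the mid-step partition. It assumes eligibility means an APPROVE / REJECT whose token some applied approval gate owns; the exact eligibility test is not spelled out here. `control` and `partitionMidStep` are hypothetical, and `gateOwns` stands in for asking the gate bridge. Deferred controls keep their arrival order, so the next boundary sees them in the order they arrived.

```go
package main

import "fmt"

// control is a hypothetical, trimmed stand-in for ControlEvent.
type control struct {
	Type  string
	Token string // approval token, if any
}

// partitionMidStep splits a mid-step drain into controls consumed now and
// controls deferred verbatim to the next step boundary.
func partitionMidStep(drained []control, gateOwns func(token string) bool) (consume, deferred []control) {
	for _, c := range drained {
		isVerdict := c.Type == "APPROVE" || c.Type == "REJECT"
		if isVerdict && c.Token != "" && gateOwns(c.Token) {
			consume = append(consume, c) // routed to the owning gate now
		} else {
			deferred = append(deferred, c) // kept as-is, in arrival order
		}
	}
	return consume, deferred
}

func main() {
	pending := map[string]bool{"tok-1": true}
	drained := []control{
		{Type: "USER_MESSAGE"},
		{Type: "APPROVE", Token: "tok-1"},
		{Type: "APPROVE", Token: "oauth-7"},
		{Type: "CANCEL"},
	}
	now, later := partitionMidStep(drained, func(t string) bool { return pending[t] })
	fmt.Println(len(now), len(later)) // 1 3
}
```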

Concurrent reuse (D-025)

RunLoop is a compiled artifact: every field is set once at construction (the Registry, the Coordinator, the applier's dependencies, the control-history ring, the clock — all immutable after NewRunLoop returns). There is NO per-run state on the struct: Run reads its run-specific data from ctx + the RunSpec argument, and the per-step accumulator (stepControl) lives on the run's own goroutine stack. One RunLoop is safe to share across N concurrent goroutines; concurrent_test.go pins N≥100 under -race.

func NewRunLoop

func NewRunLoop(reg *Registry, coord pauseresume.Coordinator, opts ...RunLoopOption) (*RunLoop, error)

NewRunLoop builds a RunLoop. The Registry (Phase 52 — owns the per-run inboxes the loop drains) and the Coordinator (Phase 50 — the ONE pause/resume primitive PAUSE / RESUME / APPROVE / REJECT converge on) are mandatory; a nil either fails loud with ErrRunLoopMisconfigured. Everything else is optional (see the WithXxx options).

The returned RunLoop is immutable after construction (D-025) and safe for concurrent use by N goroutines.

func (*RunLoop) ControlHistory

func (rl *RunLoop) ControlHistory(sessionID string) []AppliedControl

ControlHistory returns a copy of a session's applied-control history, oldest-to-newest. Primarily for observability + tests; the Protocol edge (Phase 54) projects this as the session's steering audit trail.

func (*RunLoop) Run

func (rl *RunLoop) Run(ctx context.Context, spec RunSpec) (planner.Finish, error)

Run drives the planner to a terminal planner.Finish decision. It opens the run's Inbox on the Registry, drives the drain-apply-project-Next loop, and retires the Inbox on exit — ALWAYS, even on error (a leaked inbox would orphan a run's steering surface).

Run fails closed:

  • ErrNoPlanner — spec.Planner is nil.
  • ErrIdentityRequired — spec.Base.Quadruple is an incomplete quadruple (the per-run isolation gate, CLAUDE.md §6).
  • ErrInboxExists — an Inbox is already open for this run quadruple.
  • ErrMaxStepsExceeded — the planner did not reach a terminal Finish within spec.MaxSteps steps.
  • any wrapped planner / Coordinator / TaskRegistry error from a step's Next call or a control event's side effect.

On success Run returns the terminal planner.Finish the planner emitted.

type RunLoopOption

type RunLoopOption func(*runLoopConfig)

RunLoopOption configures a RunLoop at construction. Options are applied in order; later options override earlier ones for the same field.

func WithApprovalGates

func WithApprovalGates(gates map[string]*approval.ApprovalGate) RunLoopOption

WithApprovalGates hands the RunLoop the catalog-applied approval gates keyed by tool name (the assembly's catalog band — `assemble.Assemble` → `Stack.Gates` — produces this map via the catalog Builder's `Deps.AppliedGates` out-channel; D-090, D-097, D-197). When a drained CONTROL_APPROVE / CONTROL_REJECT event references a `token`, the bridge tries each gate's `ResolveApproval` in turn; the gate that owns the token resumes its `pending` waiter so the wrapped tool's `Invoke` unblocks. When no gate owns the token (a plain RESUME or an OAuth-pause APPROVE), the apply path falls back to the direct `Coordinator.Resume`. A nil / empty map disables the bridge — the loop behaves exactly as before D-097 (direct Resume only). See `applier.advancePause` for the step-boundary routing and `applier.routeApprovalControl` + `RunLoop.dispatchDecision` for the mid-step routing that fires while a decision execution is in flight (D-192 — the path a planner-dispatched approval-gated tool resumes through).
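The routing rule can be sketched with hypothetical interfaces. In this sketch, gate stands in for *approval.ApprovalGate. The ResolveApproval signature used here, which reports whether the gate owned the token, is an assumption and not the approval package's API. Iterating a Go map visits gates in unspecified order. That is harmless as long as each token has at most one owning gate, which this sketch assumes.

```go
package main

// gate is a hypothetical stand-in for *approval.ApprovalGate. The
// ResolveApproval signature is assumed: owned reports whether this gate
// was waiting on token.
type gate interface {
	ResolveApproval(token string, approved bool) (owned bool, err error)
}

// resumer is the one Coordinator method the fallback needs (assumed).
type resumer interface {
	Resume(runID string) error
}

// routeApproval offers the token to each gate; the owning gate resumes
// its parked waiter. With no gates, no token, or no owner, it falls back
// to a direct Coordinator resume. It returns which path fired.
func routeApproval(gates map[string]gate, coord resumer, runID, token string, approved bool) (string, error) {
	if token != "" {
		for name, g := range gates {
			owned, err := g.ResolveApproval(token, approved)
			if err != nil {
				return "", err
			}
			if owned {
				return "gate:" + name, nil
			}
		}
	}
	return "coordinator", coord.Resume(runID)
}

// tokenGate and countingCoord are test doubles.
type tokenGate struct {
	token    string
	resolved bool
}

func (g *tokenGate) ResolveApproval(token string, approved bool) (bool, error) {
	if token != g.token {
		return false, nil
	}
	g.resolved = true
	return true, nil
}

type countingCoord struct{ resumes int }

func (c *countingCoord) Resume(string) error { c.resumes++; return nil }
```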

Coupling note (acceptable; D-097): `internal/runtime/steering` imports `internal/tools/approval` for the gate type. Both packages are runtime mechanism — the boundary is acceptable because the bridge IS the runtime-side wiring the gate needs to receive wire-side decisions.

func WithHardCancelHook

func WithHardCancelHook(fn func(ctx context.Context, runID string) error) RunLoopOption

WithHardCancelHook wires the cancellation propagator a hard CANCEL fires. The hook is typically engine.Cancel(runID) — it propagates a cancellation context into an in-flight decision execution (brief 02 §6). The RunLoop holds ONLY a func(ctx, runID) error, never a hard import of internal/runtime/engine — this keeps the step-loop family decoupled from the graph engine. A nil hook is tolerated: a hard CANCEL still sets Control.Cancelled (so the run terminates at the next boundary); the hook only accelerates an in-flight tool's teardown.

func WithMaxControlHistory

func WithMaxControlHistory(n int) RunLoopOption

WithMaxControlHistory overrides the per-session applied-control history cap. A non-positive value falls back to MaxControlHistory.
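The following sketches the option mechanics documented here and under RunLoopOption: options apply in order, later ones win, and a non-positive history cap falls back to the default. The config field name and the default value are illustrative. The package's MaxControlHistory constant is not shown on this page.

```go
package main

const defaultMaxControlHistory = 256 // illustrative; not the package's MaxControlHistory

type runLoopConfig struct{ maxControlHistory int }

type RunLoopOption func(*runLoopConfig)

// WithMaxControlHistory mirrors the documented rule: a non-positive n
// falls back to the default instead of disabling the history.
func WithMaxControlHistory(n int) RunLoopOption {
	return func(c *runLoopConfig) {
		v := n
		if v <= 0 {
			v = defaultMaxControlHistory
		}
		c.maxControlHistory = v
	}
}

// buildConfig starts from defaults and applies options in order, so a
// later option for the same field overrides an earlier one.
func buildConfig(opts ...RunLoopOption) runLoopConfig {
	cfg := runLoopConfig{maxControlHistory: defaultMaxControlHistory}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}
```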

func WithPauseStatusRecheckInterval added in v1.3.0

func WithPauseStatusRecheckInterval(d time.Duration) RunLoopOption

WithPauseStatusRecheckInterval overrides the parked run's Coordinator.Status re-check cadence — the delivery-independent backstop that makes the timeout-terminal guarantee hold even when the `pause.resumed` bus wake is dropped (and the ONLY wake channel on a bus-less RunLoop). The default (pauseStatusRecheckInterval, 30s) is deliberately coarse; tests inject a small interval so the backstop branch is exercisable without a 30s wall-clock wait (Wave C checkpoint audit). A non-positive d keeps the default.

func WithRunLoopBus

func WithRunLoopBus(b events.EventBus) RunLoopOption

WithRunLoopBus hands the RunLoop an events.EventBus. When set, the RunLoop emits control.received (a control event was drained) and control.applied (its side effect was applied or failed). When NOT set, no lifecycle events are emitted — event emission is observability, not correctness.
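The sketch below shows event emission that cannot affect correctness. It assumes a hypothetical one-method EventBus, because the real events.EventBus API is not shown on this page. The two lifecycle events bracket the side effect, and the apply result is returned unchanged whether or not a bus is set.

```go
package main

import "errors"

// Event and EventBus are hypothetical stand-ins for the events package.
type Event struct {
	Type   string
	RunID  string
	Failed bool
}

type EventBus interface {
	Publish(ev Event)
}

// errApply is an example side-effect failure.
var errApply = errors.New("apply failed")

// applyWithEvents brackets one control's side effect with the two
// lifecycle events. The bus never changes the returned error.
func applyWithEvents(bus EventBus, runID string, apply func() error) error {
	publish := func(ev Event) {
		if bus != nil {
			bus.Publish(ev)
		}
	}
	publish(Event{Type: "control.received", RunID: runID})
	err := apply()
	publish(Event{Type: "control.applied", RunID: runID, Failed: err != nil})
	return err
}

// recordingBus captures events for inspection.
type recordingBus struct{ events []Event }

func (b *recordingBus) Publish(ev Event) { b.events = append(b.events, ev) }
```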

func WithRunLoopClock

func WithRunLoopClock(c Clock) RunLoopOption

WithRunLoopClock overrides the RunLoop's time source — the Clock the applied-control history stamps AppliedAt from. Tests inject a controllable clock so no test sleeps for synchronisation (CLAUDE.md §11). The default is the real-time system clock.

func WithRunLoopLogger added in v1.3.0

func WithRunLoopLogger(l *slog.Logger) RunLoopOption

WithRunLoopLogger hands the RunLoop a logger for degradation / recovery lines (CLAUDE.md §5 — "Warn: unexpected but recovered", e.g. a parked run's bus subscription failing and falling back to the Status re-check). Defaults to slog.Default(); the production assembly threads the telemetry-backed logger.

func WithTaskRegistry

func WithTaskRegistry(tr tasks.TaskRegistry) RunLoopOption

WithTaskRegistry hands the RunLoop a tasks.TaskRegistry. Required for the PRIORITIZE control event — a PRIORITIZE with no TaskRegistry fails loud (it cannot reach a task). Optional otherwise: the other eight control events do not touch the task registry.

type RunSpec

type RunSpec struct {
	// Planner is the swappable reasoning policy the loop drives. Nil
	// fails loud with ErrNoPlanner.
	Planner planner.Planner
	// Base is the run's RunContext template. RunLoop refreshes the
	// per-step fields (Control, Goal) on a copy each step; the planner
	// receives a fresh RunContext per Next call (Phase 42 contract).
	// Base.Quadruple is the run's identity — its triple is validated
	// identity-mandatory before the loop starts.
	Base planner.RunContext
	// TaskID is the run's task. Optional — when set, a PRIORITIZE
	// control event targets it; when empty, a PRIORITIZE fails loud.
	TaskID tasks.TaskID
	// MaxSteps caps the planner-step count. ≤ 0 ⇒ DefaultMaxSteps.
	MaxSteps int

	// ToolExecutor dispatches the planner's non-Finish, non-RequestPause
	// decisions (CallTool, CallParallel, SpawnTask, AwaitTask). Phase
	// 83i (D-152): when nil, the runloop's default case logs and
	// appends an empty-observation step (the Phase 53 behaviour) so
	// existing pause/steering tests still drive deterministic finishes.
	// In production the dev binary wires a real executor backed by the
	// tool catalog so the planner's CallTool decisions actually run.
	ToolExecutor ToolExecutor

	// Compression is the optional trajectory-compression runner
	// (Phase 111e — D-202; the §13 first call site of
	// planner.CompressionRunner.MaybeCompress). When non-nil AND the
	// run's Base.Budget.TokenBudget > 0, the runloop invokes
	// MaybeCompress at each step boundary (after the control drain +
	// projection, before Planner.Next) so an over-budget trajectory is
	// compacted into Trajectory.Summary BEFORE the next prompt build —
	// the React prompt builder's `Summary != nil` branch then renders
	// the five-field summary instead of the per-step history and the
	// prompt shrinks. Nil (or a zero TokenBudget) is byte-identical to
	// the pre-111e behaviour: no estimate, no summariser, no events.
	//
	// One compression per run at V1.1.x: the runner is idempotent on
	// `Trajectory.Summary != nil` (the documented scope fence — RFC
	// §6.5; re-compaction cadence is the recorded D-202 follow-up).
	// A MaybeCompress error fails the run LOUDLY (the runner already
	// emitted trajectory.compression_failed) — never a silent
	// fall-through that pretends compression happened.
	Compression *planner.CompressionRunner

	// OnToolDispatched is the optional per-run hook the runloop
	// invokes after the ToolExecutor returns WITHOUT ERROR (Phase 83m
	// item 7). The dev binary wires it to
	// `taskReg.IncrementToolCount(ctx, taskID)` so the Console Tasks
	// page reflects the per-task tool-dispatch count. A nil hook is
	// the legacy / test path (no counter wired); a hook that errors
	// fails the run loud — silent degradation of an observability
	// counter is forbidden per §13 (the counter is an integrity
	// surface, not a best-effort log line).
	//
	// The runloop calls the hook for every successful executor
	// dispatch — CallTool today, CallParallel + SpawnTask + AwaitTask
	// once those executor shapes land. A dispatch that the executor
	// reports as failed (the executor's own error path) does NOT
	// invoke the hook; the planner's repair / re-plan flow records
	// the failure on the trajectory and the counter stays put.
	OnToolDispatched func(ctx context.Context) error
}

RunSpec is the per-run input to RunLoop.Run. ALL run-specific state lives here + ctx — never on the RunLoop struct (D-025).

type Scope

type Scope string

Scope is the privilege tier a steering caller presents. It is a trust-based claim in Phase 52 — cryptographic verification arrives with Protocol auth (Phase 61), mirroring the events package's Admin-claim posture (RFC §6.3 + events.Filter.Admin). The Protocol edge derives the Scope from the caller's JWT scope claim before calling CheckScope.

const (
	// ScopeSessionUser — a user authenticated into the session the
	// run belongs to. The weakest steering scope. Sufficient for
	// INJECT_CONTEXT and USER_MESSAGE.
	ScopeSessionUser Scope = "session_user"
	// ScopeOwnerUser — the user who owns the agent / run (the
	// "originating user"). Sufficient for REDIRECT and the
	// originating-user-or-admin controls (CANCEL, PAUSE, RESUME,
	// APPROVE, REJECT).
	ScopeOwnerUser Scope = "owner_user"
	// ScopeAdmin — an administrator. Sufficient for every control
	// type, including PRIORITIZE (admin-only) and any cross-tenant
	// steering.
	ScopeAdmin Scope = "admin"
)

The three canonical caller scopes (RFC §6.3 per-event scope mapping). They form a total order: ScopeSessionUser < ScopeOwnerUser < ScopeAdmin. A caller presenting a higher scope satisfies any requirement a lower scope satisfies.

func RequiredScope

func RequiredScope(t ControlType) (Scope, bool)

RequiredScope returns the minimum caller Scope that may submit a control of type t (RFC §6.3). The bool is false when t is not a canonical control type.

type ToolExecutor

type ToolExecutor interface {
	// ExecuteDecision dispatches `decision` and returns BOTH the raw
	// observation (preserved for inspect-runs / audit) AND the
	// projection the next prompt sees (`llmObservation`, the D-026
	// heavy-content-discipline projection: a small summary +
	// ArtifactRef when the raw result is over the heavy threshold,
	// or just == raw when the result is small enough to inline).
	//
	// The runloop appends a trajectory.Step{Action: decision,
	// Observation: raw, LLMObservation: projection} so the planner's
	// renderer (Phase 46 / D-055) sees only the projection.
	//
	// `rc` is the per-step RunContext (identity, ToolContext, etc.).
	// ctx is the per-step ctx; the executor MUST honour cancellation.
	ExecuteDecision(ctx context.Context, rc planner.RunContext, decision planner.Decision) (observation, llmObservation any, err error)
}

ToolExecutor is the runtime-side dispatch surface the RunLoop calls when the planner returns a non-Finish, non-RequestPause decision (CallTool, CallParallel, SpawnTask, AwaitTask). The executor:

  • Looks up the tool descriptor by name.
  • Invokes it under the run's identity-scoped ctx.
  • Returns a planner-readable observation (the runtime appends it onto trajectory.Step.Observation for the planner's next step).

Phase 83i (D-152) introduces this seam so the dev binary can wire a real `tools.ToolCatalog`-backed executor; before 83i the runloop's `default:` case dropped every CallTool on the floor (Phase 53's punted scope), which made multi-step ReAct structurally broken against real LLMs because the planner saw the same trajectory on every step.

An executor that does not support a given decision shape returns ErrDecisionShapeUnsupported with a message naming the unsupported shape — the runloop surfaces this as the step's observation so the planner can choose a different path (repair, finish, alternative tool).
