Documentation
¶
Overview ¶
Package engine is Harbor's typed, async, queue-backed graph executor — the runtime kernel every other phase sits on. Phase 10 shipped the Engine interface, the worker loop (one goroutine per node), bounded per-adjacency channels (default 64), the always-on egress dispatcher (RunID demux), cycle detection at construction, and Run / Stop / Emit / EmitTo / Fetch.
Phase 11 layered the reliability shell on top (NodePolicy, RunError); Phase 12 added streaming (StreamFrame, EmitChunk) + per-run capacity backpressure; Phase 13 lands Cancel(runID) + FetchByRun (replacing Phase 10's stubs) plus engine-Cancel mirroring into Phase 14's Subflow; Phase 14 adds routers, concurrency utilities, and Subflow. None of those phases change this surface — they extend it.
Concurrent reuse contract (D-025): a compiled *engine is reusable across goroutines after Run starts. Per-run state lives in the dispatcher's subqueues + the worker stacks; never on the engine struct itself. The N=100 reuse test pins this — see concurrent_test.go.
Index ¶
- Constants
- Variables
- type Adjacency
- type EmitOption
- type Engine
- type FetchOption
- type Node
- type NodeContext
- func (nctx *NodeContext) CallSubflow(ctx context.Context, factory SubflowFactory, parentEnv messages.Envelope) (messages.Envelope, error)
- func (nctx *NodeContext) Emit(ctx context.Context, env messages.Envelope) error
- func (nctx *NodeContext) EmitChunk(ctx context.Context, frame StreamFrame) error
- func (nctx *NodeContext) EmitNoWait(ctx context.Context, env messages.Envelope) error
- type NodeFunc
- type NodePolicy
- type NodeRef
- type Option
- func WithCancelTTL(d time.Duration) Option
- func WithChannelOverride(from, to NodeRef, n int) Option
- func WithErrorEmissionToEgress(enabled bool) Option
- func WithEventBus(b events.EventBus) Option
- func WithQueueSize(n int) Option
- func WithRunCancelledHandler(h RunCancelledHandler) Option
- func WithRunErrorHandler(h RunErrorHandler) Option
- type RunCancelledHandler
- type RunCancelledNotice
- type RunError
- type RunErrorCode
- type RunErrorHandler
- type StreamFrame
- type SubflowFactory
- type ValidateMode
Constants ¶
const DefaultCancelTTL = 60 * time.Second
DefaultCancelTTL is how long the engine remembers a run's cancellation flag after Cancel. Default is 60s — generous enough for an operator who pre-computes a RunID and calls Cancel before Emit (covering legitimate "abort an inflight client" flows).
const DefaultQueueSize = 64
DefaultQueueSize is the bounded per-adjacency channel capacity when no override is configured. RFC §6.1 settles the value at 64 (resolves brief 01 Q-4). Engine-wide override via WithQueueSize(n); per-channel override via WithChannelOverride(from, to, n).
Variables ¶
var ( // ErrCycleDetected — engine.New detected an unintended cycle in // the adjacency graph. The wrapped message includes the cycle // path (e.g. "A -> B -> C -> A"). Per-node AllowCycle: true opts // the node out of the detector for legitimate self-loops or // controller-loop graphs. ErrCycleDetected = errors.New("engine: cycle detected without AllowCycle") // ErrIdentityRequired — Emit was called with an Envelope whose // identity triple (TenantID, UserID, SessionID) is incomplete. // Wraps identity.ErrIdentityIncomplete so callers can errors.Is // against either sentinel. ErrIdentityRequired = errors.New("engine: Emit requires non-empty identity triple") // ErrChannelFull — EmitNoWait found the outgoing channel // saturated. Use Emit for blocking semantics with backpressure. ErrChannelFull = errors.New("engine: channel full (use Emit for blocking semantics)") // ErrEngineStopped — operation attempted on an engine whose Stop // has been called (or whose internal context was cancelled). The // engine never resumes after Stop; callers must construct a new // engine. ErrEngineStopped = errors.New("engine: stopped") // ErrInvalidQueueSize — WithQueueSize / WithChannelOverride // received n <= 0. Returned from New. ErrInvalidQueueSize = errors.New("engine: queue size must be > 0") // ErrNodeNotFound — a NodeRef referenced a name that doesn't // exist in the engine's adjacency set. ErrNodeNotFound = errors.New("engine: node not found") // ErrNotImplemented — a method that lands in a later phase was // called. Reserved for forward stubs; no current method returns // this (Phase 10's Cancel + FetchByRun stubs were filled in Phase // 13). Callers that errors.Is on this can detect "this surface // isn't ready yet" without crashing. ErrNotImplemented = errors.New("engine: not implemented in this phase") // ErrDuplicateNodeName — two adjacencies referenced different // nodes with the same Name. The engine's worker map is keyed by // Name; duplicates are a build mis-configuration. ErrDuplicateNodeName = errors.New("engine: duplicate node name") // ErrDeadlineExceeded — the worker observed an Envelope whose // DeadlineAt has passed before it could invoke the node. Phase 11 // will promote this to a structured RunError; Phase 10 returns // the typed sentinel directly. ErrDeadlineExceeded = errors.New("engine: envelope deadline exceeded") // ErrEmptyAdjacencies — New was called with an empty adjacency // list. Engines must have at least one node to do useful work. ErrEmptyAdjacencies = errors.New("engine: adjacencies must be non-empty") )
Sentinel errors. Callers compare via errors.Is.
var ( // ErrSeqProvided — caller pre-filled StreamFrame.Seq. The engine // owns sequence assignment so callers can't accidentally desync // the per-StreamID monotonic invariant. ErrSeqProvided = errors.New("engine: caller pre-filled StreamFrame.Seq; engine owns sequencing") // ErrStreamClosed — EmitChunk called for a StreamID whose Done // frame already drained. The dispatcher deletes the StreamID's // Seq counter on Done; subsequent calls fail loud rather than // silently re-opening the stream. ErrStreamClosed = errors.New("engine: stream closed (Done frame already drained)") // ErrEmptyRunID — EmitChunk called from a NodeFunc that was // invoked on an envelope without a RunID. Stream backpressure is // keyed by RunID; an empty RunID would collapse all anonymous // streams onto one bucket. ErrEmptyRunID = errors.New("engine: EmitChunk requires a non-empty RunID on the originating envelope") )
Sentinel errors specific to streaming. Phase 10 sentinels (e.g. ErrEngineStopped, ErrIdentityRequired) cover engine-wide failures; these cover stream-shape-specific ones.
var ErrConcurrentFetchByRun = errors.New("engine: only one FetchByRun in flight per run")
ErrConcurrentFetchByRun — two goroutines called FetchByRun for the same RunID at the same time. Per brief 01 §5 ("no half-measure"), the dispatcher's per-run subqueue has a single consumer; concurrent fetchers fight for ordering and the API forbids the contention rather than serializing under the hood.
var ErrRunCancelled = errors.New("engine: run cancelled")
ErrRunCancelled — operation observed a cancelled run. Returned by Cancel-aware code paths (worker loop between iterations, reliability shell between retries, EmitChunk capacity-waiter, FetchByRun, Emit for runs whose cancellation flag is still within the TTL).
Wraps via fmt.Errorf("...: %w", ErrRunCancelled, ...) so callers can errors.Is against this sentinel AND read the structured RunError on the same chain.
var ErrSubflowFactoryFailed = errors.New("engine: subflow factory failed")
ErrSubflowFactoryFailed — the factory passed to CallSubflow returned a non-nil error. The factory result is not used; the caller's err chain wraps the original.
Functions ¶
This section is empty.
Types ¶
type Adjacency ¶
Adjacency is a (From Node, To []Node) pair the engine consumes to allocate channels. The full set of adjacencies forms the runtime DAG (with cycle opt-in per node). New validates the set:
- every To node must appear as a From in some adjacency OR be terminal (no children — i.e. an Outlet);
- no two adjacencies share a Node.Name;
- the graph contains no cycle unless the cycle's nodes have AllowCycle: true.
type EmitOption ¶
type EmitOption func(*emitOptions)
EmitOption is the Emit-time option type. Phase 12 added WithRunCapacity for per-run streaming backpressure overrides.
func WithRunCapacity ¶
func WithRunCapacity(n int) EmitOption
WithRunCapacity overrides the default per-run capacity for the run initiated by this Emit. Pass to Engine.Emit at run start. Default is the originating node's Policy.RunCapacity (when > 0) falling back to the engine's DefaultQueueSize (64).
The override is recorded under the envelope's RunID and consulted by the first EmitChunk for that run. Useful for tighter streaming budgets on cost-sensitive runs (e.g. a 16-frame cap for a chat run vs the default 64 for a batch).
n must be > 0; non-positive values are ignored (the engine falls back to the next resolution step).
type Engine ¶
type Engine interface {
Emit(ctx context.Context, env messages.Envelope, opts ...EmitOption) error
EmitTo(ctx context.Context, env messages.Envelope, target NodeRef) error
Fetch(ctx context.Context, opts ...FetchOption) (messages.Envelope, error)
FetchByRun(ctx context.Context, runID string) (messages.Envelope, error)
Cancel(ctx context.Context, runID string) (bool, error)
Run(ctx context.Context) error
Stop(ctx context.Context) error
// Topology builds a canonical types.TopologyProjection of the
// engine's static node graph + live per-edge queue depth (Phase 74
// / D-114). Identity-mandatory; pure read; safe for N concurrent
// callers (D-025). See topology.go.
Topology(ctx context.Context) (types.TopologyProjection, error)
}
Engine is the runtime container — the typed, async, queue-backed graph executor. One concrete implementation in V1 (the in-memory engine); future remote-engine drivers (post-V1) plug behind the same interface via the §4.4 seam pattern.
Lifecycle: New constructs the engine + validates the adjacency set (cycle detection, duplicate-name check, queue-size validation); Run starts one goroutine per node + the dispatcher; Stop joins them all under a deadline. Emit lands at the engine's inlet channel(s); Fetch drains the dispatcher's any-run subqueue.
func New ¶
New constructs an Engine from a list of adjacencies + options. Cycle detection runs at construction; per-node AllowCycle opts out for legitimate self-loops.
Validation errors: ErrEmptyAdjacencies, ErrDuplicateNodeName, ErrCycleDetected, ErrInvalidQueueSize, ErrNodeNotFound (when a channel override targets an unknown node).
type FetchOption ¶
type FetchOption func(*fetchOptions)
FetchOption is the Fetch-time option type. Phase 13 will add per-run filtering via FetchByRun (a dedicated method, not an option), but the type exists today for fetch-side knobs that may land later (e.g. FetchTimeout).
type Node ¶
type Node struct {
// Name is the unique identifier for the node within an engine.
// New rejects duplicates with ErrDuplicateNodeName.
Name string
// Func is invoked by the worker loop on each incoming envelope.
// nil Func is rejected at New time.
Func NodeFunc
// Policy controls validation, timeout, retry, and backoff for
// each invocation of Func. Zero value = "no policy" (single
// invocation, no timeout, no retry — Phase 10's bare worker
// behavior). See policy.go for fields.
Policy NodePolicy
// AllowCycle opts this node out of the cycle detector. Set true
// for legitimate self-loop or controller-loop graphs (e.g. a
// planner node that emits to itself for the next reasoning step).
AllowCycle bool
}
Node wraps a typed async function with reliability policy + cycle opt-in. Phase 10 shipped the shape; Phase 11 fills NodePolicy with real fields (Validate / Timeout / Retry / Backoff) and the worker loop now applies them via the reliability shell.
type NodeContext ¶
type NodeContext struct {
// contains filtered or unexported fields
}
NodeContext is the per-invocation handle the worker passes to the NodeFunc. Carries the engine reference so the function can Emit / EmitNoWait / EmitChunk / Fetch through the same channel mechanic as external callers. CallSubflow (Phase 14) hangs off this same type.
lastEnv records the incoming envelope for the current invocation so per-invocation operations (EmitChunk's identity propagation, future pause-resume hooks) can see the originating run's quadruple without requiring callers to thread it through manually. The worker loop sets this before invoking Func.
NodeContext is constructed by the worker; callers must not build one directly. The struct's internal fields are unexported.
func (*NodeContext) CallSubflow ¶
func (nctx *NodeContext) CallSubflow(ctx context.Context, factory SubflowFactory, parentEnv messages.Envelope) (messages.Envelope, error)
CallSubflow constructs a child engine via factory, runs it under the parent's RunID, mirrors the parent's ctx cancellation AND the parent's Engine.Cancel(parentRunID) into the child, returns the first egress envelope, then Stops the child.
**Cancellation scope (Phase 13/14):** two paths cooperate.
- ctx propagation: childCtx is derived from ctx via WithCancel, so a parent ctx cancel terminates the child immediately.
- Engine.Cancel mirroring: the parent engine fires registered observers when Engine.Cancel(parentRunID) lands. CallSubflow installs an observer that calls child.Cancel(parentRunID), so a steering-side cancel from the parent's Protocol surface reaches the child without requiring the parent's worker ctx to die.
Multi-result subflows compose via concurrency.MapConcurrent over a list of factories; CallSubflow itself returns exactly one envelope.
Cleanup ordering on success: drain first egress → deregister cancel observer → cancel watcher ctx → child.Stop. On factory error or child.Run failure, the child's Stop is still invoked (defer) so no goroutines leak.
Identity propagation: the parent envelope (parentEnv) carries the quadruple. The child engine sees it via the inbound envelope on its inlet; no separate identity copy is needed because Envelope is the identity carrier (RFC §6.1).
func (*NodeContext) Emit ¶
Emit sends env down the node's outgoing channel(s). Blocks if any outgoing channel is full — this is the backpressure path. Phase 12 will hook capacity waiters here so per-run streaming doesn't deadlock against shared bounded queues.
When the node has multiple outgoing edges (fan-out), Emit copies env to each edge in adjacency order. A single full channel pauses the entire emit until that channel drains.
func (*NodeContext) EmitChunk ¶
func (nctx *NodeContext) EmitChunk(ctx context.Context, frame StreamFrame) error
EmitChunk emits a stream frame. Blocks when the originating run's pending-frame count has reached its RunCapacity (default = the engine's DefaultQueueSize, 64). The block is per-run, never per- engine — a single run's saturation does not pause other runs (this is the deadlock-prevention guarantee from brief 01 §4).
The frame is wrapped in an Envelope whose Payload is the StreamFrame and whose identity inherits from the originating NodeFunc's incoming envelope (the worker passes that ctx to EmitChunk's caller; the NodeContext.lastEnv field carries the run's identity).
Done: true marks the terminal frame for the StreamID. After a Done frame drains, subsequent EmitChunk for that StreamID returns ErrStreamClosed.
Failure modes:
- ErrSeqProvided: caller pre-filled frame.Seq.
- ErrEmptyRunID: the originating envelope had no RunID.
- ErrStreamClosed: the StreamID was previously Done-terminated.
- ErrEngineStopped: Stop fired while waiting.
- ctx.Err(): the caller's ctx cancelled while waiting.
func (*NodeContext) EmitNoWait ¶
EmitNoWait is the non-blocking variant. Returns ErrChannelFull immediately if any outgoing channel is saturated. Callers that want backpressure should use Emit; callers that want to drop rather than wait use EmitNoWait.
`ctx` carries identity (D-001) into the downstream emit path and honours the caller's cancellation. The send itself is non-blocking; ctx is read for identity propagation + early-exit on a cancelled run, NOT to wait for channel capacity.
type NodeFunc ¶
type NodeFunc func(ctx context.Context, in messages.Envelope, nctx *NodeContext) (messages.Envelope, error)
NodeFunc is the unit of computation a Node wraps. It receives the incoming envelope plus a per-invocation NodeContext (engine handle scoped to the running node) and returns the outgoing envelope.
Returning a nil envelope WITH nil error means "no emission on this hop" — the worker drops the result and waits for the next ingress envelope. Returning a non-nil error logs through the audit-redacted logger (Phase 04 wiring) and continues; Phase 11 promotes errors to the structured RunError envelope.
type NodePolicy ¶
type NodePolicy struct {
// Validate selects which side(s) of the invocation the worker
// runs ValidateFunc on. Zero value is ValidateNone (no validation
// — matches the Phase 10 bare-worker behavior).
Validate ValidateMode
// TimeoutMS is the per-invocation deadline in milliseconds. 0
// means "no timeout" — the worker invokes Func with the engine's
// ctx unchanged. > 0 wraps each invocation in
// context.WithTimeout(ctx, time.Duration(TimeoutMS) * Millisecond).
TimeoutMS int
// MaxRetries is the count of retry attempts AFTER the initial
// invocation. 0 means "no retries" (the node runs exactly once on
// failure). Total invocations = MaxRetries + 1.
MaxRetries int
// BackoffBase is the first-retry sleep before retry attempt 1.
// Subsequent retries multiply by BackoffMult, capped at
// MaxBackoff. 0 means no sleep between retries.
BackoffBase time.Duration
// BackoffMult is the multiplier between successive retry sleeps.
// 0 or 1 means "no growth" (linear retries at BackoffBase). 2 is
// the canonical exponential value.
BackoffMult float64
// MaxBackoff caps the per-retry sleep regardless of BackoffMult.
// 0 means no cap.
MaxBackoff time.Duration
// ValidateFunc is the function pointer the worker calls on input
// (Validate=Both/In) and/or output (Validate=Both/Out) envelopes.
// nil with Validate != ValidateNone is a no-op (the worker treats
// it as "no validator configured" — fail-loud at construction
// time is the engine's responsibility, not the shell's; Phase 10
// could harden this if needed).
ValidateFunc func(messages.Envelope) error
// RunCapacity (Phase 12) caps the number of pending stream frames
// (EmitChunk-emitted) for a run originating at this node before
// EmitChunk blocks. 0 means "use the engine's DefaultQueueSize"
// (64). Per-run override via WithRunCapacity on the originating
// Engine.Emit takes precedence over this field.
//
// Backpressure is per-run, not per-engine: one run's saturation
// never pauses other runs (this is the deadlock-prevention
// guarantee from brief 01 §4).
RunCapacity int
}
NodePolicy controls per-node reliability semantics. Zero value is "no policy" — Phase 10's bare worker behavior (single invocation, no timeout, no retry, no validation). Construct explicitly for production nodes; the engine never silently applies defaults (per AGENTS.md §5 "Fail loudly").
Concurrent-reuse safe: NodePolicy is a value type. ValidateFunc is a function pointer; the function itself must be safe for concurrent invocation across the same Node.
type NodeRef ¶
type NodeRef struct {
Name string
}
NodeRef identifies a node by name. Used for per-channel queue overrides and EmitTo's explicit-target form.
type Option ¶
type Option func(*engineConfig)
Option configures an engine at construction.
func WithCancelTTL ¶
WithCancelTTL overrides DefaultCancelTTL. Must be > 0; non-positive values silently fall back to the default. The TTL applies to the cancellation flag: an Emit landing within the TTL of a Cancel for the same RunID is rejected with ErrRunCancelled.
func WithChannelOverride ¶
WithChannelOverride sets a per-channel queue size for the (from -> to) edge. Wins over WithQueueSize for that specific edge.
func WithErrorEmissionToEgress ¶
WithErrorEmissionToEgress toggles whether internal worker errors (Phase 11's RunError) ALSO land on the egress channel as a special error-shaped envelope. Default is false: errors go to Phase 04's logger + Phase 05's bus (via the configured RunErrorHandler) only.
Operators who want to consume errors via Fetch (instead of via the bus) opt in here. The egress envelope's Payload is the *RunError; callers Fetch and type-assert.
func WithEventBus ¶
WithEventBus wires the canonical events.EventBus the engine publishes its construction-time `topology.changed` event onto (Phase 74 / D-114). The event carries the engine's initial TopologyProjection so a Protocol consumer that subscribed before the engine was built catches the graph the moment it exists.
The option is additive: an engine constructed WITHOUT WithEventBus (the Phase 02 default) publishes nothing — every existing engine test sees zero behavioural change and Phase 02 callers gain no new mandatory dependency. A nil bus passed to WithEventBus is treated as "WithEventBus not supplied".
When a bus IS supplied and it rejects the construction-time event, New fails loud (CLAUDE.md §5) rather than building an engine whose topology surface silently never reached the bus.
func WithQueueSize ¶
WithQueueSize overrides the engine-wide bounded per-adjacency channel capacity (default DefaultQueueSize). n must be > 0; New returns ErrInvalidQueueSize otherwise.
func WithRunCancelledHandler ¶
func WithRunCancelledHandler(h RunCancelledHandler) Option
WithRunCancelledHandler installs the callback the engine fires after Cancel(runID) observed an active run. The seam for an engine-hosting assembly to translate the notice to a runtime.run_cancelled bus event (no production assembly installs THIS handler today, unlike the run-error handler — see WithRunErrorHandler); tests use a recording callback. Phase 13.
func WithRunErrorHandler ¶
func WithRunErrorHandler(h RunErrorHandler) Option
WithRunErrorHandler installs the callback the engine fires on terminal node failure. The production handler (Phase 111f, D-203) is a callback that invokes telemetry.Logger.Error so the wave-2 BusEmitter adapter publishes a runtime.error event: `assemble.Assemble` builds it as `Stack.RunErrorHandler`. The composition site is the EMBEDDER's flow composition — `flow.WithRunErrorHandler(stack.RunErrorHandler)`, per docs/recipes/observe-an-embedded-runtime.md; the shipped binary registers flow Definitions without composing an engine, so it has no in-binary call site today. Tests install recording callbacks to assert the structured RunError shape.
When unset, the engine logs the failure via its slog.Logger only (Phase 10 behavior). The handler is invoked AFTER the slog log so both paths see the failure regardless of the handler's outcome.
type RunCancelledHandler ¶
type RunCancelledHandler func(ctx context.Context, n RunCancelledNotice)
RunCancelledHandler is the callback the engine fires after a Cancel observed an active run. Same hook pattern as RunErrorHandler: the seam exists for assemblies that host an engine graph to publish a runtime.run_cancelled event on the bus. No production assembly installs one today — `harbor dev` is planner/RunLoop-shaped and boots no engine graph (see docs/notes/sdk-friction-audit.md §3). Tests install recording callbacks.
Best-effort: a panic is recovered; bus errors must not block Cancel from returning.
type RunCancelledNotice ¶
RunCancelledNotice is the payload the engine hands to a RunCancelledHandler after Cancel(runID). The handler is the seam production wiring uses to publish runtime.run_cancelled on the bus without the engine importing the events package.
CancelledAt is wall-clock; DroppedEnvelopeCount counts the envelopes Cancel drained from channels (step 2 of the four-step propagation). Useful for operators measuring "how loaded was the cancelled run."
type RunError ¶
type RunError struct {
// RunID identifies the run the failed envelope belonged to.
// Empty when the envelope had no RunID (Phase 10 allows that;
// Phase 13 will tighten when FetchByRun arrives).
RunID string
// TenantID, UserID, SessionID complete the identity triple from
// the failing envelope. Used by audit subscribers + slog
// attribute set.
TenantID string
UserID string
SessionID string
// NodeName is the unique node identifier within the engine.
NodeName string
// NodeID is reserved for future stable runtime identifiers; for
// Phase 11 it mirrors NodeName (engines are single-process and
// the Name is the stable id).
NodeID string
// Code categorises the failure. See RunErrorCode constants.
Code RunErrorCode
// Message is a short human-readable summary. The redactor sees
// it via the bus emit path; do NOT include raw tool args or
// secrets.
Message string
// Cause is the wrapped underlying error (if any). errors.Unwrap
// follows this; deeper chains compose via the underlying error's
// Unwrap method.
Cause error
// Metadata carries policy-relevant context (e.g. attempt count,
// timeout in ms, validate side). Bounded; free-form.
Metadata map[string]any
}
RunError is the structured error envelope emitted on terminal node failure. Carries the full identity quadruple via RunID + the per-invocation context (NodeName, NodeID). Cause carries one level of wrapping; deeper chains use errors.Unwrap on Cause.
Identity propagation: every RunError carries the failing envelope's (TenantID, UserID, SessionID, RunID) so audit logs and bus subscribers can scope by the multi-isolation triple.
func (*RunError) Error ¶
Error implements the error interface. Format includes the code + node name + message; downstream consumers should errors.As to a *RunError to read structured fields.
type RunErrorCode ¶
type RunErrorCode string
RunErrorCode categorises the failure mode. Stable across Harbor's runtime — downstream consumers (audit subscribers, Console) match against these constants.
const ( // CodeNodeTimeout — the node's invocation exceeded NodePolicy.TimeoutMS. CodeNodeTimeout RunErrorCode = "node_timeout" // CodeNodeException — the node's Func returned a non-nil error // or panicked. The most common terminal-failure code. CodeNodeException RunErrorCode = "node_exception" // CodeRunCancelled — the engine's context (or the per-run cancel // flag, Phase 13) was triggered before invocation completed. CodeRunCancelled RunErrorCode = "run_cancelled" // CodeDeadlineExceeded — the envelope's wall-clock DeadlineAt // expired before the worker invoked the node. Distinct from // CodeNodeTimeout (per-invocation) — DeadlineExceeded is per- // envelope. CodeDeadlineExceeded RunErrorCode = "deadline_exceeded" // CodeValidationFailed — NodePolicy.ValidateFunc rejected the // input or output envelope. CodeValidationFailed RunErrorCode = "validation_failed" )
type RunErrorHandler ¶
RunErrorHandler is the callback the engine fires on terminal node failure. The engine's default (when no handler is configured) is a no-op AFTER the engine's slog.Logger has logged the failure — the handler is the seam production wiring uses to route the structured RunError to Phase 04's *telemetry.Logger.Error so the wave-2 BusEmitter adapter publishes a runtime.error event.
Decoupled by design: the engine package does not import internal/telemetry. Callers who want bus-side runtime.error events pass a callback that invokes Logger.Error with the RunError's structured attrs.
type StreamFrame ¶
StreamFrame is a chunked payload tied to a parent run. StreamID defaults to RunID; sub-streams within a run use a custom StreamID. Seq is monotonic per StreamID and is engine-assigned (callers must NOT pre-fill — the engine rejects with ErrSeqProvided).
Per-stream order is preserved as long as a single goroutine emits per StreamID — the dispatcher's per-run subqueue is FIFO. Two goroutines emitting on the same StreamID concurrently will interleave (the engine has no per-StreamID lock); operators who need cross-goroutine ordering must serialize the emit themselves.
type SubflowFactory ¶
SubflowFactory returns a fresh child Engine per call. Caller never reuses subflow engines; cheap construction is the contract per brief 01 §5 ("a subflow is a freshly-built engine that runs to completion for one parent envelope, then Stops").
type ValidateMode ¶
type ValidateMode string
ValidateMode selects which side of a node invocation the worker passes through ValidateFunc.
const ( // ValidateNone disables validation. The perf escape hatch for hot // streaming paths (Phase 12 will lean on this for stream nodes). // Zero value of ValidateMode. ValidateNone ValidateMode = "" // ValidateBoth runs ValidateFunc on the input AND output envelope. // The default for production nodes — fail-loud per CLAUDE.md §5. ValidateBoth ValidateMode = "both" // ValidateIn runs ValidateFunc on the input only. ValidateIn ValidateMode = "in" // ValidateOut runs ValidateFunc on the output only. ValidateOut ValidateMode = "out" )