Overview ¶
Package engine defines the foundational primitives shared by every local execution engine in FlowCraft (e.g. sdk/graph DAG executor, future script-based or native-Go executors).
Position in the layering ¶
engine sits below sdk/agent and below any concrete engine implementation. It is deliberately ignorant of agent-level concepts: there is no Agent, no Memory, no Request/Result, no chat-specific Var conventions in this package. Anything that knows about "agents", "messages" or "memory" belongs in sdk/agent or higher.
Allowed dependencies:
- sdk/event (for Envelope only; engine does NOT use Bus)
- sdk/errdefs (for the interrupted-error classification)
- sdk/model (for Message in Board's typed channels and Part in user-prompt payloads)
- standard library
engine MUST NOT import sdk/agent, sdk/agent/strategy, sdk/graph, sdk/script, sdk/history, sdk/recall, sdk/llm, sdk/tool, sdk/workflow.
The contract at a glance ¶
An engine receives three things at run time:
Execute(ctx, run Run, host Host, board *Board) (*Board, error)
- run: read-only metadata (ID, Attributes, Deps, ResumeFrom);
- host: capabilities the engine may invoke (Publish events, listen for Interrupts, AskUser, Checkpoint, ReportUsage);
- board: shared blackboard the engine mutates as it runs.
The Host interface is a *composition* of small interfaces:
type Host interface {
Publisher // Publish(ctx, env) error
Interrupter // Interrupts() <-chan Interrupt
UserPrompter // AskUser(ctx, prompt) (UserReply, error)
Checkpointer // Checkpoint(ctx, cp) error
UsageReporter // ReportUsage(ctx, usage)
}
Downstream code (graph nodes, tools, …) should depend on the smallest interface it actually needs (Publisher alone is the common case) rather than the full Host. This keeps node signatures honest about their requirements.
What lives here ¶
Board / BoardSnapshot / Cloneable — shared blackboard state and typed message channels. Any engine that wants a key/value store and ordered message lists reuses these.
Run — per-execution input bundle (ID, Attributes, Deps, ResumeFrom) as a plain data struct. Setting Run.ResumeFrom is how the host requests a resume; the engine interprets the opaque [Checkpoint.Step] / [Checkpoint.Payload] it produced earlier.
Host and the five small interfaces it composes — the surface the engine uses to interact with its host runtime.
Interrupt + Cause + InterruptedError — cooperative-stop primitive. Engines select on Host.Interrupts(); they convert a received Interrupt into an error via Interrupted, which satisfies errdefs.IsInterrupted and carries the Cause for the host to inspect via errors.As.
UserPrompt / UserReply — engine-agnostic, multi-modal (model.Part) prompt/response payloads for input-required steps.
Checkpoint + CheckpointStore — engine-agnostic persistence contract for resumable execution. Each engine decides what its own Step / Payload look like.
Engine — uniform Execute interface (and EngineFunc adapter) so the agent layer can drive any engine through a single shape.
NoopHost / NoopCheckpointStore — zero-cost stand-ins for tests and embedded scenarios.
Subject schema (subjects.go) — the cross-engine event-routing convention every implementation MUST follow when publishing run lifecycle, step lifecycle, and stream-delta envelopes. Public Subject* / Pattern* builders, the StreamDeltaPayload decode contract, and SanitiseID live here so consumers (voice, SSE bridges, dashboards) can route on subject without importing any concrete engine.
What does NOT live here ¶
- StreamCallback / StreamEvent — replaced by Publisher + event.Envelope.
- Memory / MemorySession — that is a sdk/history + sdk/recall concern at the agent layer.
- Strategy / Runnable / Disposition / ResumeToken — those are agent ↔ engine adapter contracts and live in sdk/agent and sdk/agent/strategy.
- VarMessages / VarQuery / VarAnswer — chat conventions that belong to sdk/agent.
- Engine kind enumeration — engine does not reserve a "type" namespace or list which engines exist; routing on subject is the only cross-engine identification mechanism.
See docs/agent-runtime-redesign.md for the full layering rationale.
Index ¶
- Constants
- func GetDep[T any](d *Dependencies, key any) (T, error)
- func GetTyped[T any](b *Board, key string) (T, bool)
- func Interrupted(intr Interrupt) error
- func IsStreamDelta(s event.Subject) bool
- func MustGetDep[T any](d *Dependencies, key any) T
- func PatternAllRuns() event.Pattern
- func PatternRun(runID string) event.Pattern
- func PatternRunSteps(runID string) event.Pattern
- func PatternRunStream(runID string) event.Pattern
- func SanitiseID(id string) string
- func SubjectRunEnd(runID string) event.Subject
- func SubjectRunStart(runID string) event.Subject
- func SubjectStepComplete(runID, actorID string) event.Subject
- func SubjectStepError(runID, actorID string) event.Subject
- func SubjectStepStart(runID, actorID string) event.Subject
- func SubjectStreamDelta(runID, actorID string) event.Subject
- type Board
- func (b *Board) AppendChannelMessage(name string, msg model.Message)
- func (b *Board) AppendSliceVar(key string, value any) error
- func (b *Board) Channel(name string) []model.Message
- func (b *Board) ChannelsCopy() map[string][]model.Message
- func (b *Board) GetVar(key string) (any, bool)
- func (b *Board) GetVarString(key string) string
- func (b *Board) RestoreFrom(snap *BoardSnapshot)
- func (b *Board) SetChannel(name string, msgs []model.Message)
- func (b *Board) SetVar(key string, value any)
- func (b *Board) Snapshot() *BoardSnapshot
- func (b *Board) UpdateSliceVarItem(key string, match func(any) bool, update func(any) any)
- func (b *Board) Vars() map[string]any
- type BoardSnapshot
- type Cause
- type Checkpoint
- type CheckpointDeleter
- type CheckpointLister
- type CheckpointStore
- type Checkpointer
- type Cloneable
- type Dependencies
- type Engine
- type EngineFunc
- type Host
- type Interrupt
- type InterruptedError
- type Interrupter
- type NoopCheckpointStore
- type NoopHost
- func (NoopHost) AskUser(context.Context, UserPrompt) (UserReply, error)
- func (NoopHost) Checkpoint(context.Context, Checkpoint) error
- func (NoopHost) Interrupts() <-chan Interrupt
- func (NoopHost) Publish(context.Context, event.Envelope) error
- func (NoopHost) ReportUsage(context.Context, model.TokenUsage)
- type Publisher
- type Run
- type StreamDeltaPayload
- type StreamDeltaType
- type UsageReporter
- type UserPrompt
- type UserPrompter
- type UserReply
Constants ¶
const MainChannel = ""
MainChannel is the default message channel key (empty string).
Channels are an engine-level primitive: they let nodes/steps share ordered message sequences without going through Vars. Convention-level keys for "the chat transcript", "the answer", etc. belong to the agent layer; this package only provides the channel mechanism.
const SubjectPrefix = "engine.run."
SubjectPrefix is the fixed root every engine envelope subject MUST start with. Exposed as a constant so consumers can check strings.HasPrefix without re-deriving it.
Variables ¶
This section is empty.
Functions ¶
func GetDep ¶
func GetDep[T any](d *Dependencies, key any) (T, error)
GetDep is a generic helper that retrieves a typed dependency. It returns an error when the key is missing or when the stored value is not assignable to T, so callers can surface configuration mistakes early instead of failing with a nil-pointer panic deep inside an engine.
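A minimal sketch of how such a typed lookup can behave, assuming a simplified map-backed container. The Dependencies type below is a local stand-in, not the package source, and clockKey is a hypothetical key type introduced for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Dependencies is a simplified stand-in for the documented container.
type Dependencies struct {
	mu   sync.RWMutex
	vals map[any]any
}

func NewDependencies() *Dependencies {
	return &Dependencies{vals: make(map[any]any)}
}

func (d *Dependencies) Set(key, value any) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.vals[key] = value
}

func (d *Dependencies) Get(key any) (any, bool) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	v, ok := d.vals[key]
	return v, ok
}

// GetDep sketches the documented behaviour: an error for a nil container,
// a missing key, or a stored value that is not a T.
func GetDep[T any](d *Dependencies, key any) (T, error) {
	var zero T
	if d == nil {
		return zero, fmt.Errorf("dependency %v: container is nil", key)
	}
	v, ok := d.Get(key)
	if !ok {
		return zero, fmt.Errorf("dependency %v: not registered", key)
	}
	t, ok := v.(T)
	if !ok {
		return zero, fmt.Errorf("dependency %v: have %T, want %T", key, v, zero)
	}
	return t, nil
}

// clockKey is a hypothetical typed key; a private struct type cannot
// collide with keys declared by other packages.
type clockKey struct{}

func main() {
	deps := NewDependencies()
	deps.Set(clockKey{}, "utc")

	name, err := GetDep[string](deps, clockKey{})
	fmt.Println(name, err)

	_, err = GetDep[int](deps, clockKey{})
	fmt.Println(err)
}
```

Returning an error for the type mismatch is what lets a host report a wiring mistake at startup rather than hitting a failed assertion deep inside a node.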
func GetTyped ¶
func GetTyped[T any](b *Board, key string) (T, bool)
GetTyped retrieves a typed value from the Board's vars. It is a generic helper rather than a method because Go does not allow type parameters on methods.
func Interrupted ¶
func Interrupted(intr Interrupt) error
Interrupted wraps an Interrupt as an error that satisfies errdefs.IsInterrupted. The recommended usage from an engine is:
case intr := <-h.Interrupts():
return engine.Interrupted(intr)
Hosts inspecting the result use the standard errdefs / errors.As idiom:
if errdefs.IsInterrupted(err) {
var ie engine.InterruptedError
if errors.As(err, &ie) {
switch ie.Cause {
case engine.CauseUserInput: ...
}
}
}
A zero-value Interrupt still produces a well-formed error so callers don't need to special-case CauseUnknown.
func IsStreamDelta ¶
func IsStreamDelta(s event.Subject) bool
IsStreamDelta reports whether s is a stream-delta subject. Cheap (string-only) so consumers can filter envelopes before the more expensive payload decode.
Implementation note: matches subjects shaped like "engine.run.<runID>.stream.<actorID>.delta" — i.e. the prefix is SubjectPrefix, contains ".stream." and ends with ".delta".
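The three string checks in the implementation note translate directly into code. This is a sketch over plain strings (the real function takes an event.Subject), and the lowercase names are local to the example.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectPrefix matches the documented SubjectPrefix constant.
const subjectPrefix = "engine.run."

// isStreamDelta applies the three checks from the implementation note:
// the fixed prefix, a ".stream." segment, and a ".delta" suffix.
func isStreamDelta(s string) bool {
	return strings.HasPrefix(s, subjectPrefix) &&
		strings.Contains(s, ".stream.") &&
		strings.HasSuffix(s, ".delta")
}

func main() {
	subjects := []string{
		"engine.run.r1.stream.llm.delta",
		"engine.run.r1.step.llm.start",
		"other.run.r1.stream.llm.delta",
	}
	for _, s := range subjects {
		fmt.Printf("%-34s %v\n", s, isStreamDelta(s))
	}
}
```

A consumer iterating a mixed stream would typically run this filter first and decode the payload only for subjects that pass.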
func MustGetDep ¶
func MustGetDep[T any](d *Dependencies, key any) T
MustGetDep is like GetDep but panics on error. Use it only inside engine internals where a missing dependency is a programming bug (e.g. a node referenced a dep that the host did not register).
func PatternAllRuns ¶
func PatternAllRuns() event.Pattern
PatternAllRuns returns the wildcard pattern matching every event from every run.
engine.run.>
func PatternRun ¶
func PatternRun(runID string) event.Pattern
PatternRun returns the wildcard pattern matching every event of one run, regardless of engine implementation or engine-private extension.
engine.run.<runID>.>
func PatternRunSteps ¶
func PatternRunSteps(runID string) event.Pattern
PatternRunSteps returns the wildcard pattern matching every step lifecycle event (start / complete / error and any engine-private step.* extension such as graph runner's "skipped") of one run.
engine.run.<runID>.step.>
func PatternRunStream ¶
func PatternRunStream(runID string) event.Pattern
PatternRunStream returns the wildcard pattern matching every stream delta of one run. Use this when you want LLM token / tool deltas but not the step lifecycle events.
engine.run.<runID>.stream.>
func SanitiseID ¶
func SanitiseID(id string) string
SanitiseID escapes characters that would corrupt an event.Subject when the input is concatenated into one. event.Subject segments are separated by '.', and '*' / '>' are reserved by event.Pattern for wildcards; any of these in a runID / actorID would either fragment the subject or turn it into an unintended pattern. SanitiseID replaces each occurrence with '_'.
Empty input becomes "_" so the resulting subject keeps a constant segment count even when the engine forgot to mint an id.
Engines are expected to call SanitiseID on every user-supplied fragment they splice into a subject. The Subject* / Pattern* builders in this file already do so for their parameters; engine implementations only need it when constructing private extensions of their own.
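The replacement rule described above fits in a few lines. The sketch below is an approximation of the documented behaviour, not the package source; subjectStepSkipped is a hypothetical engine-private extension used to show where an engine would call the sanitiser itself.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitiseID replaces the segment separator '.' and the wildcard
// characters '*' and '>' with '_'. Empty input becomes "_" so the
// resulting subject keeps a constant segment count.
func sanitiseID(id string) string {
	if id == "" {
		return "_"
	}
	return strings.NewReplacer(".", "_", "*", "_", ">", "_").Replace(id)
}

// subjectStepSkipped builds a hypothetical engine-private step subject.
// Both user-supplied fragments are sanitised before concatenation.
func subjectStepSkipped(runID, actorID string) string {
	return "engine.run." + sanitiseID(runID) +
		".step." + sanitiseID(actorID) + ".skipped"
}

func main() {
	fmt.Println(sanitiseID("node.v2"))
	fmt.Println(sanitiseID(""))
	fmt.Println(subjectStepSkipped("run-7", "fetch.*"))
}
```

Without the sanitiser, an actor id such as "fetch.*" would add an extra segment and a wildcard to the subject, so a step pattern subscription could match or miss it unpredictably.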
func SubjectRunEnd ¶
func SubjectRunEnd(runID string) event.Subject
SubjectRunEnd returns the subject every engine MUST publish exactly once when [Engine.Execute] returns, regardless of outcome (clean completion, interrupt, cancel, failure).
engine.run.<runID>.end
func SubjectRunStart ¶
func SubjectRunStart(runID string) event.Subject
SubjectRunStart returns the subject every engine MUST publish exactly once when [Engine.Execute] begins.
engine.run.<runID>.start
func SubjectStepComplete ¶
func SubjectStepComplete(runID, actorID string) event.Subject
SubjectStepComplete returns the subject every engine MUST publish when one step finishes successfully.
engine.run.<runID>.step.<actorID>.complete
func SubjectStepError ¶
func SubjectStepError(runID, actorID string) event.Subject
SubjectStepError returns the subject every engine MUST publish when one step fails (i.e. when it would normally cause Execute to return a non-nil non-interrupt error).
engine.run.<runID>.step.<actorID>.error
func SubjectStepStart ¶
func SubjectStepStart(runID, actorID string) event.Subject
SubjectStepStart returns the subject every engine MUST publish when it begins executing one step. actorID identifies the unit of work (graph runner: node id; script engine: statement id; etc.).
engine.run.<runID>.step.<actorID>.start
func SubjectStreamDelta ¶
func SubjectStreamDelta(runID, actorID string) event.Subject
SubjectStreamDelta returns the subject every engine MUST use when emitting an in-flight increment from the step identified by actorID — the canonical example is one LLM token, one dispatched tool call, or one tool result.
Payload MUST decode to a StreamDeltaPayload; see its docs for the per-Type field requirements.
engine.run.<runID>.stream.<actorID>.delta
Types ¶
type Board ¶
type Board struct {
// contains filtered or unexported fields
}
Board is the engine execution blackboard: typed message channels plus untyped control vars, both protected by the same mutex.
Thread safety: every public method takes a mutex. Concurrent reads use RLock; mutations use Lock. The contract matches the original graph.Board it replaces — callers that previously held graph.Board across goroutines do not need to add any new locking.
Board is intentionally ignorant of agent concepts. It does not know what "messages", "answer" or "run id" mean; those names are established by callers. Per-execution metadata (ID, Attributes, Deps) belongs on Run, not here.
func NewBoard ¶
func NewBoard() *Board
NewBoard creates an empty Board with an initialised main channel so callers can Board.AppendChannelMessage without a nil-check.
func RestoreBoard ¶
func RestoreBoard(snap *BoardSnapshot) *Board
RestoreBoard reconstructs a Board from a snapshot. Passing nil returns a fresh empty board so resume code can use this unconditionally.
func (*Board) AppendChannelMessage ¶
func (b *Board) AppendChannelMessage(name string, msg model.Message)
AppendChannelMessage appends a message to a channel, creating the channel on demand.
func (*Board) AppendSliceVar ¶
func (b *Board) AppendSliceVar(key string, value any) error
AppendSliceVar atomically appends a value to a slice stored in a board variable. It returns an error if the existing value is not a []any (instead of silently overwriting), so callers cannot lose data by typo'ing a key already used for a non-slice value.
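The "error instead of overwrite" rule can be sketched with a small mutex-guarded map. The board type below is a local stand-in, and creating the slice when the key is missing is an assumption of this sketch; the page does not state what the real method does in that case.

```go
package main

import (
	"fmt"
	"sync"
)

// board is a simplified stand-in holding only untyped vars.
type board struct {
	mu   sync.RWMutex
	vars map[string]any
}

func newBoard() *board { return &board{vars: make(map[string]any)} }

func (b *board) setVar(key string, value any) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.vars[key] = value
}

func (b *board) getVar(key string) (any, bool) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	v, ok := b.vars[key]
	return v, ok
}

// appendSliceVar does the read, type check and write under one write
// lock, so concurrent appends cannot lose items.
func (b *board) appendSliceVar(key string, value any) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	existing, ok := b.vars[key]
	if !ok {
		// Assumption: a missing key starts a new slice.
		b.vars[key] = []any{value}
		return nil
	}
	items, ok := existing.([]any)
	if !ok {
		return fmt.Errorf("board var %q holds %T, not []any", key, existing)
	}
	b.vars[key] = append(items, value)
	return nil
}

func main() {
	b := newBoard()
	_ = b.appendSliceVar("tool_calls", "search")
	_ = b.appendSliceVar("tool_calls", "fetch")
	calls, _ := b.getVar("tool_calls")
	fmt.Println(calls)

	b.setVar("answer", "42")
	fmt.Println(b.appendSliceVar("answer", "oops"))
}
```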
func (*Board) Channel ¶
func (b *Board) Channel(name string) []model.Message
Channel returns a copy of messages for the given channel. An empty or missing channel returns a nil slice (not a zero-length slice) so callers can use len() == 0 uniformly.
func (*Board) ChannelsCopy ¶
func (b *Board) ChannelsCopy() map[string][]model.Message
ChannelsCopy returns a deep copy of all channel message lists. Used by parallel branch execution to give each branch an independent view that can later be merged.
func (*Board) GetVarString ¶
func (b *Board) GetVarString(key string) string
GetVarString retrieves a board variable as a string. It returns "" when the key is missing or the stored value is not a string.
func (*Board) RestoreFrom ¶
func (b *Board) RestoreFrom(snap *BoardSnapshot)
RestoreFrom overwrites this board from a snapshot. Used by retry / rollback paths inside an executor: the executor takes a snapshot before a risky step, then restores the same Board (preserving its identity for nodes that captured a pointer to it) on failure.
func (*Board) SetChannel ¶
func (b *Board) SetChannel(name string, msgs []model.Message)
SetChannel replaces the entire message list for a channel. The input slice is copied; later mutations by the caller do not affect the Board.
func (*Board) Snapshot ¶
func (b *Board) Snapshot() *BoardSnapshot
Snapshot returns a serialisable copy of the current state. Vars are deep-copied so the snapshot is safe to retain after further Board mutations; values implementing Cloneable are duplicated through their Clone method, otherwise reflection is used.
func (*Board) UpdateSliceVarItem ¶
func (b *Board) UpdateSliceVarItem(key string, match func(any) bool, update func(any) any)
UpdateSliceVarItem finds and updates the first matching item in a slice variable. Missing keys and non-slice values are silently ignored — the typical use is "update the entry I just appended", and the caller has already verified the slice exists.
type BoardSnapshot ¶
type BoardSnapshot struct {
Vars map[string]any `json:"vars"`
Channels map[string][]model.Message `json:"channels,omitempty"`
}
BoardSnapshot is a serialisable representation of board state, used for resume / checkpoint flows. It carries no live mutex and is safe to JSON-encode.
type Cause ¶
type Cause string
Cause classifies why a run was asked to stop. The agent layer maps these onto its higher-level commit/discard policy (e.g. discard the partial output on user_input, commit it on host_shutdown).
Engines should NEVER branch on Cause for control flow — Cause is metadata for the host, not a directive for the engine. The engine's only correct response to any cause is "stop cleanly and return".
const (
// CauseUnknown is the zero value. Hosts should avoid sending it;
// it exists so a zero-value Interrupt is recognisable.
CauseUnknown Cause = ""
// CauseUserCancel is a user-initiated cancel ("stop talking",
// "abort this turn"). Output is typically discarded.
CauseUserCancel Cause = "user_cancel"
// CauseUserInput is a barge-in: the user spoke / typed and the
// agent should yield to fresh input. Output is typically discarded.
CauseUserInput Cause = "user_input"
// CauseHostShutdown is a graceful shutdown from the host.
// Output should typically be committed if any was produced.
CauseHostShutdown Cause = "host_shutdown"
// CauseCustom carries a host-defined cause in [Interrupt.Detail].
CauseCustom Cause = "custom"
)
type Checkpoint ¶
type Checkpoint struct {
// ExecID identifies the engine execution this checkpoint belongs
// to. MUST equal the producing [Run.ID].
ExecID string `json:"exec_id"`
// Step is an opaque, engine-defined marker that locates "where"
// the run is. For graph it is typically the next node id; for a
// script engine it might be a continuation id. The host treats
// this as an opaque string.
Step string `json:"step,omitempty"`
// Iteration is an optional monotonic counter for engines that
// loop (graph re-entry counter, scheduler tick, …). Zero is fine
// when the engine doesn't track iterations.
Iteration int `json:"iteration,omitempty"`
// Board is the Board state at the boundary. Always non-nil.
Board *BoardSnapshot `json:"board"`
// Payload is engine-specific extra state the engine wants to
// persist alongside the Board. Treated as opaque JSON by the
// store; the producing engine is the only consumer that knows
// how to decode it.
Payload json.RawMessage `json:"payload,omitempty"`
// Attributes mirrors [Run.Attributes] at the time the checkpoint
// was produced (run id at the agent layer, tenant, graph id, …).
// Stores may use these for indexing/lookup.
Attributes map[string]string `json:"attributes,omitempty"`
// Timestamp is the wall-clock time the engine produced the
// checkpoint. Hosts may overwrite when they actually persist.
Timestamp time.Time `json:"timestamp"`
}
Checkpoint is the engine-agnostic persistence record produced at a safe boundary during execution. Each engine decides what its own step marker / payload looks like; this struct only owns the common envelope shape.
Engines populate Checkpoint and hand it to [Checkpointer.Checkpoint] (the host method). The host is responsible for writing it durably; engines must not assume the call has persisted anything.
type CheckpointDeleter ¶
CheckpointDeleter optionally extends CheckpointStore with the ability to delete a single execution's checkpoints. Used by the host when a run completes successfully and its checkpoints are no longer needed.
type CheckpointLister ¶
CheckpointLister optionally extends CheckpointStore with the ability to enumerate persisted exec ids. Stores that support listing satisfy this interface; agent-level resume / dashboard code can type-assert to it.
type CheckpointStore ¶
type CheckpointStore interface {
Save(ctx context.Context, cp Checkpoint) error
Load(ctx context.Context, execID string) (*Checkpoint, error)
}
CheckpointStore is the host-side persistence contract. The host's [Checkpointer.Checkpoint] implementation typically delegates to a CheckpointStore. The interface is intentionally narrow: Save persists; Load returns the most-recent persisted record for the given exec id, or (nil, nil) if absent. All methods must be safe for concurrent use.
type Checkpointer ¶
type Checkpointer interface {
Checkpoint(ctx context.Context, cp Checkpoint) error
}
Checkpointer persists a checkpoint at a safe boundary the engine has reached. The host decides whether to actually write; engines must not assume durability.
Hosts without configured checkpointing should make this a no-op (return nil) rather than an error so engines can call it unconditionally.
type Cloneable ¶
type Cloneable interface {
Clone() any
}
Cloneable may be implemented by values stored in Board vars to provide a type-safe deep copy instead of the reflection fallback used by Board.Snapshot and RestoreBoard.
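To show why a value might implement Cloneable, the sketch below copies a vars map and routes Cloneable values through Clone. The history type and copyVars helper are hypothetical; the fallback here is a plain assignment, a deliberate simplification of the reflection-based copy the package describes.

```go
package main

import "fmt"

// Cloneable mirrors the documented interface.
type Cloneable interface {
	Clone() any
}

// history is a hypothetical var type that owns a slice.
type history struct {
	Items []string
}

// Clone returns an independent copy, including the backing slice.
func (h *history) Clone() any {
	return &history{Items: append([]string(nil), h.Items...)}
}

// copyVars duplicates Cloneable values through Clone. Other values are
// copied by assignment, which is only safe for immutable values; the
// real package falls back to reflection instead.
func copyVars(vars map[string]any) map[string]any {
	out := make(map[string]any, len(vars))
	for k, v := range vars {
		if c, ok := v.(Cloneable); ok {
			out[k] = c.Clone()
			continue
		}
		out[k] = v
	}
	return out
}

func main() {
	h := &history{Items: []string{"a"}}
	vars := map[string]any{"history": h, "count": 1}

	snap := copyVars(vars)
	h.Items[0] = "changed" // mutate the live value after the snapshot

	fmt.Println(snap["history"].(*history).Items[0])
}
```

The snapshot keeps its own slice, so later mutations of the live value do not leak into it.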
type Dependencies ¶
type Dependencies struct {
// contains filtered or unexported fields
}
Dependencies is a typed dependency-injection container that an engine host (typically the agent runtime) populates at build time and that engine implementations consume at run time.
The container is keyed by an opaque any (use a typed key constant per dependency, never a bare string) to keep the lookup explicit and to discourage stringly-typed coupling. Reads may run concurrently; mutations through Set/Remove acquire a write lock, so a host can adjust the container at any time, including after build time.
This replaces graph.Dependencies / workflow.ToolDeps and intentionally has no engine-specific knowledge.
func NewDependencies ¶
func NewDependencies() *Dependencies
NewDependencies creates an empty dependency container.
func (*Dependencies) Get ¶
func (d *Dependencies) Get(key any) (any, bool)
Get returns the dependency for the given key, untyped.
func (*Dependencies) Has ¶
func (d *Dependencies) Has(key any) bool
Has reports whether the given key is present.
func (*Dependencies) Remove ¶
func (d *Dependencies) Remove(key any)
Remove deletes a dependency. Missing keys are a no-op.
func (*Dependencies) Set ¶
func (d *Dependencies) Set(key, value any)
Set stores a dependency under the given key. Overwrites any existing value for the same key.
type Engine ¶
type Engine interface {
Execute(ctx context.Context, run Run, host Host, board *Board) (*Board, error)
}
Engine is the deliberately thin contract every local execution engine satisfies so the agent layer can drive it through a uniform shape. Concrete engines (sdk/graph DAG executor, future script-based engines, …) usually expose richer APIs in addition to this method.
Contract:
Execute MUST run to completion, until interrupted, or until ctx is cancelled.
On clean completion, return the final Board (often the same pointer as the input — engines mutate in place by default) and a nil error.
On cooperative interrupt (host sent through host.Interrupts()), return the (partial) Board together with the result of Interrupted. The error then satisfies errdefs.IsInterrupted(err) and can be destructured into an InterruptedError for the cause.
On ctx cancellation, return the (partial) Board and ctx.Err().
On any other failure, return the (partial) Board together with a domain error (preferably classified via errdefs). Returning a non-nil board on error lets the host decide whether to commit / discard / persist.
When run.ResumeFrom is non-nil, Execute resumes from that checkpoint instead of starting fresh. See [Run.ResumeFrom] for the resume contract; engines that do not support resume MUST return an errdefs.NotAvailable-classified error rather than silently restarting.
Engines MUST NOT close any host-owned channel and MUST NOT mutate run.Attributes or run.ResumeFrom.
type EngineFunc ¶
EngineFunc adapts a plain function to the Engine interface. Useful for test doubles and trivial engines.
type Host ¶
type Host interface {
Publisher
Interrupter
UserPrompter
Checkpointer
UsageReporter
}
Host is the contract a runtime exposes to a running engine.
Host is intentionally a *composition* of small, single-method interfaces: Publisher, Interrupter, UserPrompter, Checkpointer and UsageReporter. The composition exists to keep [Engine.Execute] readable; downstream code (graph nodes, tools, …) should depend on the smallest interface it actually needs:
// A pure-mapping node only emits events:
func (n *MapNode) Execute(ctx context.Context, board *engine.Board, pub engine.Publisher) error
// A streaming LLM node also wants the interrupt channel:
func (n *LLMNode) Execute(ctx context.Context, board *engine.Board,
pub engine.Publisher, intr engine.Interrupter) error
Host implementations must be safe for concurrent use. The engine may invoke any method from any goroutine.
type Interrupt ¶
Interrupt is the value the host sends through Host.Interrupts() to ask the running engine to stop. It is also the payload of the Interrupted error so the host can introspect why.
Interrupt is a plain value — copy it freely.
type InterruptedError ¶
type InterruptedError = interruptedErr
InterruptedError is the concrete error type returned by Interrupted. It is exported so hosts can use errors.As to destructure it; the preferred way to produce one is Interrupted, not direct construction.
Implements errdefs interrupted-marker so errdefs.IsInterrupted returns true on any error wrapping (or equal to) one of these.
type Interrupter ¶
type Interrupter interface {
Interrupts() <-chan Interrupt
}
Interrupter exposes the host's cooperative-interrupt channel.
Engines select on the returned channel between steps:
select {
case intr := <-h.Interrupts():
return engine.Interrupted(intr)
case <-ctx.Done():
return ctx.Err()
default:
}
A nil channel means "no cooperative interrupt available"; engines should treat it as "never fires" — receiving on nil blocks forever, which is the correct semantic.
type NoopCheckpointStore ¶
type NoopCheckpointStore struct{}
NoopCheckpointStore drops every checkpoint and reports no state. Use as a default when checkpointing is not configured.
func (NoopCheckpointStore) Load ¶
func (NoopCheckpointStore) Load(context.Context, string) (*Checkpoint, error)
Load satisfies CheckpointStore.
func (NoopCheckpointStore) Save ¶
func (NoopCheckpointStore) Save(context.Context, Checkpoint) error
Save satisfies CheckpointStore.
type NoopHost ¶
type NoopHost struct{}
NoopHost is a zero-cost Host implementation that discards events, never reports interrupts, refuses user prompts, skips checkpoints, and drops usage reports. It is meant for tests and embedded scenarios where an engine is invoked outside any real runtime.
func (NoopHost) AskUser ¶
func (NoopHost) AskUser(context.Context, UserPrompt) (UserReply, error)
AskUser returns errdefs.NotAvailable so engines can detect that user interaction is unsupported in this host.
func (NoopHost) Checkpoint ¶
func (NoopHost) Checkpoint(context.Context, Checkpoint) error
Checkpoint discards the checkpoint.
func (NoopHost) Interrupts ¶
func (NoopHost) Interrupts() <-chan Interrupt
Interrupts returns nil so engines that select on it block forever on that case (i.e. interrupts never fire under NoopHost).
func (NoopHost) ReportUsage ¶
func (NoopHost) ReportUsage(context.Context, model.TokenUsage)
ReportUsage discards the usage report.
type Publisher ¶
Publisher emits a single event envelope.
The subject schema for run lifecycle, step lifecycle and stream-delta envelopes is defined in this package (see the Subject* and Pattern* builders). Engines build those subjects with the builders and pass any fragment of an engine-private extension through SanitiseID before concatenating it.
Publish errors MUST NOT cause the producing engine to fail the run by default; the engine should log/record and continue. Backpressure or transport failures are an observability concern, not a control-flow signal.
type Run ¶
type Run struct {
// ID is a unique identifier for this engine execution. Engines
// use it as a correlation key in telemetry and may include it in
// any subjects/headers their host's subject schema requires.
//
// The host generates ID and is responsible for keeping it stable
// across resume / checkpoint cycles.
ID string
// Attributes carries arbitrary host-supplied metadata that should
// flow into telemetry spans and event headers (run id at the
// agent layer, tenant id, parent span links, engine kind for
// observability, …).
//
// There is intentionally no dedicated field for "engine kind",
// "run id (agent layer)" or similar: those are observability
// concerns whose key conventions the host owns. If the consumer
// side wants to filter by engine kind, the host sets
// Attributes["engine_kind"] (or whatever key it agreed on).
Attributes map[string]string
// Deps is the typed dependency container the host has populated
// for this run (LLM clients, tool registries, retrievers, …).
// May be nil if the engine needs no dependencies; engines that
// look up dependencies should use [GetDep] which handles nil.
Deps *Dependencies
// ResumeFrom, when non-nil, instructs the engine to continue
// execution from the provided checkpoint instead of starting a
// fresh run. The engine is the sole interpreter of
// [Checkpoint.Step] and [Checkpoint.Payload]; the host treats
// them as opaque.
//
// Contract:
//
// - When ResumeFrom is non-nil the engine SHOULD prefer
// ResumeFrom.Board over the board parameter passed to
// [Engine.Execute]; passing both is allowed but the
// checkpoint's board takes precedence as it represents the
// state at the boundary the run paused on.
//
// - When ResumeFrom.ExecID differs from [Run.ID] the engine MUST
// return an errdefs.Validation-classified error: forking a
// run requires a fresh execution id, not a resume.
//
// - Engines that do not support resume MUST return an
// errdefs.NotAvailable-classified error when they observe a
// non-nil ResumeFrom rather than silently restarting from
// scratch.
//
// Hosts that drive resumption typically [CheckpointStore.Load]
// the most recent checkpoint, set ResumeFrom, and call
// [Engine.Execute] again with the same Run.ID.
ResumeFrom *Checkpoint
}
Run is the per-execution input bundle an engine receives alongside the host. It is a plain data struct — no methods, no builder, no hidden state — assembled once by the host and passed to [Engine.Execute] read-only.
All fields are conceptually immutable for the duration of the run. Engines may read freely; they MUST NOT mutate the maps in place nor mutate the referenced ResumeFrom checkpoint.
type StreamDeltaPayload ¶
type StreamDeltaPayload struct {
// Type discriminates the payload variant. See StreamDeltaType
// constants for the standard values.
Type StreamDeltaType `json:"type"`
// Content carries the assistant text for "token" and the tool
// output (typically already serialised) for "tool_result".
Content string `json:"content,omitempty"`
// ID is the tool-call identifier the model assigned. Set on
// "tool_call" only; for "tool_result" use ToolCallID instead.
ID string `json:"id,omitempty"`
// Name is the tool name. Set on "tool_call"; recommended on
// "tool_result" so consumers can correlate without a separate
// dispatch table.
Name string `json:"name,omitempty"`
// Arguments is the tool input the model produced. Engines MAY
// pass it as either a string (raw JSON) or an already-decoded
// map / slice — consumers should accept both.
Arguments any `json:"arguments,omitempty"`
// ToolCallID pairs a "tool_result" with the originating
// "tool_call". Required on tool_result.
ToolCallID string `json:"tool_call_id,omitempty"`
// IsError reports whether the tool dispatch returned an error
// payload (vs. a successful result). Set on "tool_result".
IsError bool `json:"is_error,omitempty"`
// Cancelled reports whether this tool_result is a synthesised
// cancellation (the call was never dispatched because the round
// was interrupted). Set on "tool_result" only.
Cancelled bool `json:"cancelled,omitempty"`
}
StreamDeltaPayload is the canonical decoded shape of a SubjectStreamDelta envelope's payload.
Engines MUST emit payloads that JSON-decode into this struct; the runtime constraint is checked by DecodeStreamDelta. Engines MAY add fields beyond what this struct lists — the JSON decoder is permissive on unknowns — but consumers SHOULD only rely on the fields documented here.
Per-Type field requirements:
| Type | Required | Recommended |
|---|---|---|
| token | Content | (none) |
| tool_call | ID, Name | Arguments |
| tool_result | ToolCallID, Content | Name, IsError, Cancelled |
func DecodeStreamDelta ¶
func DecodeStreamDelta(env event.Envelope) (StreamDeltaPayload, error)
DecodeStreamDelta extracts the payload of a stream-delta envelope. It returns an error when the envelope payload is empty or does not JSON-decode into StreamDeltaPayload. It does NOT verify the subject; callers may pre-filter with IsStreamDelta when iterating a mixed stream.
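A consumer-side sketch of the decode step, working on raw bytes because event.Envelope is not shown on this page. The struct repeats a subset of the documented fields; decodeDelta and argumentsAsMap are hypothetical helpers, and argumentsAsMap handles only the string and object encodings of Arguments, not slices.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type StreamDeltaType string

// StreamDeltaPayload repeats a subset of the documented fields.
type StreamDeltaPayload struct {
	Type       StreamDeltaType `json:"type"`
	Content    string          `json:"content,omitempty"`
	ID         string          `json:"id,omitempty"`
	Name       string          `json:"name,omitempty"`
	Arguments  any             `json:"arguments,omitempty"`
	ToolCallID string          `json:"tool_call_id,omitempty"`
}

// decodeDelta rejects empty input and anything that is not valid JSON
// for the struct. Unknown fields are ignored by encoding/json.
func decodeDelta(raw []byte) (StreamDeltaPayload, error) {
	var p StreamDeltaPayload
	if len(raw) == 0 {
		return p, errors.New("stream delta: empty payload")
	}
	if err := json.Unmarshal(raw, &p); err != nil {
		return p, fmt.Errorf("stream delta: %w", err)
	}
	return p, nil
}

// argumentsAsMap accepts a raw JSON string or an already-decoded object.
func argumentsAsMap(args any) (map[string]any, error) {
	switch v := args.(type) {
	case map[string]any:
		return v, nil
	case string:
		var m map[string]any
		if err := json.Unmarshal([]byte(v), &m); err != nil {
			return nil, err
		}
		return m, nil
	default:
		return nil, fmt.Errorf("unsupported arguments type %T", args)
	}
}

func main() {
	raw := []byte(`{"type":"tool_call","id":"call-1","name":"search","arguments":"{\"q\":\"go\"}"}`)
	p, err := decodeDelta(raw)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	args, err := argumentsAsMap(p.Arguments)
	fmt.Println(p.Type, p.Name, args, err)
}
```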
type StreamDeltaType ¶
type StreamDeltaType string
StreamDeltaType enumerates the kinds of in-flight increments a stream envelope can carry. Engines MUST set [StreamDeltaPayload.Type] to one of these values; consumers SHOULD treat unknown values as forward-compatible additions and skip them.
const (
// StreamDeltaToken is one piece of generated assistant text.
// Required field: Content.
StreamDeltaToken StreamDeltaType = "token"
// StreamDeltaToolCall is one tool invocation the model just
// requested. Required fields: ID, Name. Recommended: Arguments.
StreamDeltaToolCall StreamDeltaType = "tool_call"
// StreamDeltaToolResult is the outcome of one tool invocation:
// either the actual result, or a synthesised cancellation when
// the round was interrupted before the call dispatched.
// Required fields: ToolCallID, Content. Recommended: Name,
// IsError, Cancelled.
StreamDeltaToolResult StreamDeltaType = "tool_result"
)
type UsageReporter ¶
type UsageReporter interface {
ReportUsage(ctx context.Context, usage model.TokenUsage)
}
UsageReporter accepts incremental LLM token-usage reports an engine observes during a run. Each call adds delta usage; the host is responsible for accumulation, billing, and downstream telemetry.
Engines should call ReportUsage once per LLM invocation that returns usage metadata (typical: streaming nodes call it on completion with the per-call totals). Reports SHOULD be best-effort — engines must not let a slow or failing reporter block forward progress, and must not return an error to the run because of usage reporting.
Hosts without billing or token tracking should make this a no-op.
type UserPrompt ¶
type UserPrompt struct {
Parts []model.Part
Schema []byte
Source string
Metadata map[string]string
}
UserPrompt describes what the engine is asking the host to relay to the end user. It deliberately stays one level below "chat message":
- Parts carries the multi-modal payload (text, image, audio, file, structured data) using model.Part — the same building block that model.Message uses, minus the chat-specific Role.
- Schema is an optional structured-input hint (JSON-schema-shaped bytes) for cases where the host wants to render a form or validate the response.
- Source identifies the engine step that produced the prompt; useful for trace correlation and resume.
- Metadata is free-form host-passed-through metadata.
Why []model.Part rather than model.Message: a Message also carries Role (system/user/assistant/tool), which is a chat-layer concept the engine has no business naming. Parts give us full multi-modality (image, audio, file, data) without tying the engine to chat semantics — the agent layer wraps Parts back into a Message with the right Role on its way out, and unwraps user-supplied Parts on the way in.
type UserPrompter ¶
type UserPrompter interface {
AskUser(ctx context.Context, prompt UserPrompt) (UserReply, error)
}
UserPrompter lets an engine ask the host to prompt the end user (chat input, voice DTMF, structured form, …) and block until the reply arrives.
Hosts that don't expose user interaction should return an errdefs.NotAvailable-classified error. Engines that get such an error from a step that strictly needs user input should propagate it so the host can decide whether to fail or fall back.
Directories
¶
| Path | Synopsis |
|---|---|
| | Package enginetest provides reusable contract-test machinery for implementations of engine.Engine. |