Documentation ¶
Overview ¶
Package runner is the assembly + lifecycle layer on top of graph/executor.
It turns a graph.GraphDefinition into a long-lived Runner instance: the definition is compiled once, then each Run call assembles fresh node instances and dispatches them to the configured executor. This split keeps graph/executor focused on the execute step alone; assembly and node construction belong to a higher layer because they pull in the entire node factory dependency graph (LLM, tool registry, script runtime, …).
Stream-delta emission from custom nodes ¶
In-flight progress events flow through the engine-level SubjectStreamDelta channel — see engine.SubjectStreamDelta + engine.StreamDeltaPayload for the SPI. Built-in nodes (LLM, tool dispatch) emit token / tool_call / tool_result deltas automatically; custom nodes that want to surface their own progress have two options:
- Use the simplified graph.StreamPublisher handed to every node via NodeContext. Calling Emit("type", payload) goes through the runner's newNodePublisher wrapper, which packages the delta into a SubjectStreamDelta envelope. This is the natural choice when the node already lives inside graph execution.
- For nodes that need fine-grained control over the payload shape (e.g. emitting a forward-compatible Type the SDK does not yet ship a helper for), use the strongly-typed helpers in the engine package: engine.EmitStreamToken, engine.EmitStreamToolCall, engine.EmitStreamToolResult, or the lower-level engine.EmitStreamDelta. These build the envelope, attach HeaderRunID / HeaderAgentID / HeaderNodeID, and validate per-Type required fields before publishing, so a malformed delta is caught at emit time instead of silently flowing to subscribers.
Both paths land on the same Subject (engine.run.<runID>.stream.<stepActor>.delta), so consumers subscribing via engine.PatternRunStream observe LLM-emitted and node-emitted deltas through one subscription.
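The emit-time validation and the shared subject described above can be sketched with the standard library alone. This is a minimal illustration, not the engine package's code: the delta struct, its fields, the per-Type rules, and the helper names validate and streamSubject are all assumptions; only the subject layout comes from the text above.

```go
package main

import (
	"errors"
	"fmt"
)

// delta is a hypothetical stand-in for the stream-delta payload; the real
// engine.StreamDeltaPayload may carry different fields.
type delta struct {
	Type    string // "token", "tool_call", or "tool_result"
	Content string // assumed to be required by "token"
	Tool    string // assumed to be required by "tool_call" and "tool_result"
}

// validate rejects a delta that lacks the field its Type requires, so the
// mistake surfaces at emit time. The rules are assumed for illustration.
func validate(d delta) error {
	switch d.Type {
	case "":
		return errors.New("delta requires Type")
	case "token":
		if d.Content == "" {
			return errors.New("token delta requires Content")
		}
	case "tool_call", "tool_result":
		if d.Tool == "" {
			return fmt.Errorf("%s delta requires Tool", d.Type)
		}
	}
	// Unknown types pass through so forward-compatible deltas still flow.
	return nil
}

// streamSubject builds the subject that both emit paths publish to.
func streamSubject(runID, stepActor string) string {
	return fmt.Sprintf("engine.run.%s.stream.%s.delta", runID, stepActor)
}

func main() {
	fmt.Println(streamSubject("run-1", "summarize"))
	fmt.Println(validate(delta{Type: "token"}))
}
```

Because both paths share one subject, a subscriber never needs to know whether a delta came from a built-in node or a custom one.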
Index ¶
- Constants
- func Assemble(compiled *graph.CompiledGraph, factory *node.Factory) (*graph.Graph, error)
- func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
- func WithActorKey(ctx context.Context, key string) context.Context (deprecated)
- type CloneableResolver
- type MergeFunc
- type MergeStrategy
- type Option
- type ParallelConfig
- type Runner
- func (r *Runner) CanResume(cp engine.Checkpoint) error
- func (r *Runner) Capabilities() engine.Capabilities
- func (r *Runner) Execute(ctx context.Context, run engine.Run, host engine.Host, board *engine.Board) (*engine.Board, error)
- func (r *Runner) Graph() (*graph.Graph, error)
- func (r *Runner) Host() engine.Host
- func (r *Runner) Run(ctx context.Context, vars map[string]any, opts ...executor.RunOption) (*graph.Board, error)
- type VariableResolver
Constants ¶
const (
	MergeLastWins        = executor.MergeLastWins
	MergeNamespace       = executor.MergeNamespace
	MergeErrorOnConflict = executor.MergeErrorOnConflict
)
Built-in merge strategies. RegisterMergeStrategy lets callers add their own.
Variables ¶
This section is empty.
Functions ¶
func Assemble ¶
func Assemble(compiled *graph.CompiledGraph, factory *node.Factory) (*graph.Graph, error)
Assemble takes a CompiledGraph (static analysis result) and a Factory, then constructs all real node instances and returns an immutable executable Graph ready for the executor.
func RegisterMergeStrategy ¶
func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
RegisterMergeStrategy registers fn under name so ParallelConfig can refer to it by string. Callers SHOULD register custom strategies at init() time.
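A name-keyed registry populated at init() time might look like the sketch below. The mergeFunc signature, the strategy names "last_wins" and "error_on_conflict", and the helper lookupMergeStrategy are assumptions made for illustration; the real MergeFunc and constant values may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// mergeFunc is a hypothetical signature: it folds the boards produced by
// parallel branches into one. The real MergeFunc may look different.
type mergeFunc func(branches []map[string]any) (map[string]any, error)

var (
	mu         sync.RWMutex
	strategies = map[string]mergeFunc{}
)

// registerMergeStrategy stores fn under name, replacing any earlier entry.
func registerMergeStrategy(name string, fn mergeFunc) {
	mu.Lock()
	defer mu.Unlock()
	strategies[name] = fn
}

// lookupMergeStrategy returns the strategy registered under name, if any.
func lookupMergeStrategy(name string) (mergeFunc, bool) {
	mu.RLock()
	defer mu.RUnlock()
	fn, ok := strategies[name]
	return fn, ok
}

// lastWins lets later branches overwrite keys written by earlier ones.
func lastWins(branches []map[string]any) (map[string]any, error) {
	out := map[string]any{}
	for _, b := range branches {
		for k, v := range b {
			out[k] = v
		}
	}
	return out, nil
}

// errorOnConflict fails as soon as two branches write the same key.
func errorOnConflict(branches []map[string]any) (map[string]any, error) {
	out := map[string]any{}
	for _, b := range branches {
		for k, v := range b {
			if _, dup := out[k]; dup {
				return nil, fmt.Errorf("merge conflict on key %q", k)
			}
			out[k] = v
		}
	}
	return out, nil
}

// Registering in init() means every strategy exists before any graph runs.
func init() {
	registerMergeStrategy("last_wins", lastWins)
	registerMergeStrategy("error_on_conflict", errorOnConflict)
}

func main() {
	branches := []map[string]any{{"answer": 1}, {"answer": 2}}
	merge, _ := lookupMergeStrategy("last_wins")
	out, err := merge(branches)
	fmt.Println(out, err)
}
```

Registering at init() time avoids a race between registration and the first parallel run that looks the name up.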
func WithActorKey ¶ deprecated
func WithActorKey(ctx context.Context, key string) context.Context
WithActorKey stamps an agent identifier onto ctx so the runner forwards it onto every envelope header (HeaderAgentID) and uses it as the prefix of the step subject segment.
Deprecated: as of v0.4 the runner resolves the agent ID from the canonical engine.Run.Attributes[telemetry.AttrAgentID] key first (populated automatically by agent.Run) and falls back to this context key only when the attribute is absent. Prefer driving the runner through agent.Run, or stamp the attribute directly on engine.Run.Attributes: that path survives cross-process hand-offs (HTTP, vessel inline, A2A), where context values are dropped at the wire boundary. WithActorKey will be removed in v0.5.0.
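The lookup order in the deprecation note (attribute first, context key as fallback) can be sketched as follows. The attribute key string "agent.id", the map[string]string attribute type, and the helper resolveAgentID are assumptions for illustration; the real key is telemetry.AttrAgentID and the real attribute type may differ.

```go
package main

import (
	"context"
	"fmt"
)

// attrAgentID is a hypothetical stand-in for telemetry.AttrAgentID.
const attrAgentID = "agent.id"

// actorKey is an unexported context key type, which avoids collisions with
// keys defined by other packages.
type actorKey struct{}

// base is the root context used by this example.
var base = context.Background()

// withActorKey mirrors the deprecated helper: it stamps key onto ctx.
func withActorKey(ctx context.Context, key string) context.Context {
	return context.WithValue(ctx, actorKey{}, key)
}

// resolveAgentID prefers the run attribute and falls back to the context
// value only when the attribute is absent.
func resolveAgentID(ctx context.Context, attrs map[string]string) string {
	if id := attrs[attrAgentID]; id != "" {
		return id
	}
	if id, ok := ctx.Value(actorKey{}).(string); ok {
		return id
	}
	return ""
}

func main() {
	ctx := withActorKey(base, "legacy-agent")
	fmt.Println(resolveAgentID(ctx, map[string]string{attrAgentID: "planner"}))
	fmt.Println(resolveAgentID(ctx, nil))
}
```

The attribute wins because it travels with the run record, whereas a context value exists only inside the process that set it.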
Types ¶
type CloneableResolver ¶
type CloneableResolver = executor.CloneableResolver
CloneableResolver is the optional interface a resolver implements to support parallel branches. Branches need independent scope so the runner clones the resolver for each.
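The optional-interface pattern behind per-branch cloning can be sketched like this. The resolver and cloneableResolver method sets, the scopeResolver type, and the forBranch helper are hypothetical; the real executor interfaces may declare different methods.

```go
package main

import "fmt"

// resolver is a hypothetical minimal resolver interface.
type resolver interface {
	Resolve(name string) (any, bool)
}

// cloneableResolver is the optional extension a resolver can implement to
// hand each parallel branch its own scope.
type cloneableResolver interface {
	resolver
	Clone() resolver
}

// scopeResolver keeps variables in a plain map.
type scopeResolver struct {
	vars map[string]any
}

func (s *scopeResolver) Resolve(name string) (any, bool) {
	v, ok := s.vars[name]
	return v, ok
}

// Clone copies the map so writes in one branch stay invisible to the others.
func (s *scopeResolver) Clone() resolver {
	cp := make(map[string]any, len(s.vars))
	for k, v := range s.vars {
		cp[k] = v
	}
	return &scopeResolver{vars: cp}
}

// forBranch clones r when it supports cloning and shares it otherwise.
func forBranch(r resolver) resolver {
	if c, ok := r.(cloneableResolver); ok {
		return c.Clone()
	}
	return r
}

func main() {
	parent := &scopeResolver{vars: map[string]any{"topic": "go"}}
	branch := forBranch(parent).(*scopeResolver)
	branch.vars["topic"] = "rust" // only the branch copy changes
	v, _ := parent.Resolve("topic")
	fmt.Println(v)
}
```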
type MergeStrategy ¶
type MergeStrategy = executor.MergeStrategy
MergeStrategy names a parallel-branch merge policy.
type Option ¶
type Option func(*Runner)
Option configures a Runner.
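Option follows the functional-options pattern. A minimal sketch of how such options are applied at construction time is shown below; the runner fields, the default of 100 iterations, and the lowercase constructor are assumptions for illustration, not the package's actual internals.

```go
package main

import (
	"fmt"
	"time"
)

// runner holds hypothetical construction-time settings.
type runner struct {
	maxIterations int
	timeout       time.Duration
}

// option mutates a runner during construction.
type option func(*runner)

// withMaxIterations caps the number of executor iterations.
func withMaxIterations(n int) option {
	return func(r *runner) { r.maxIterations = n }
}

// withTimeout sets the wall-clock budget for a whole run.
func withTimeout(d time.Duration) option {
	return func(r *runner) { r.timeout = d }
}

// newRunner starts from defaults (assumed values) and applies opts in order,
// so a later option overrides an earlier one.
func newRunner(opts ...option) *runner {
	r := &runner{maxIterations: 100}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newRunner(withMaxIterations(20), withTimeout(30*time.Second))
	fmt.Println(r.maxIterations, r.timeout)
}
```

Collecting every knob through options keeps the constructor signature stable as new settings are added.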
func WithHost ¶
WithHost installs the engine.Host the Runner forwards to the executor on every Run. The host receives every published envelope and is also handed to nodes via ExecutionContext.Host so they can call Publish, Interrupt, AskUser etc. directly.
When omitted the Runner defaults to engine.NoopHost{} and envelopes are dropped. Note that Runner.Execute takes a Host parameter that overrides this default — WithHost only matters for Runner.Run callers.
func WithMaxIterations ¶
WithMaxIterations caps the number of executor iterations (one iteration ≈ one node frontier).
func WithMaxNodeRetries ¶
WithMaxNodeRetries caps the per-node retry budget the executor applies when a node returns a transient error.
func WithParallel ¶
func WithParallel(cfg executor.ParallelConfig) Option
WithParallel enables parallel fork/join execution with the supplied policy. Defaults are filled in by executor.WithParallel.
func WithResolver ¶
func WithResolver(r executor.VariableResolver) Option
WithResolver installs the variable resolver consulted by the executor when a node config contains references. The Runner installs a fresh variable.NewResolver() per execution when this option is omitted (see runner.go).
func WithStartNode ¶
WithStartNode overrides the entry node for the run. Useful for resume flows that want to restart from a specific point.
func WithTimeout ¶
WithTimeout sets a wall-clock budget for the whole run; the executor derives a context.WithTimeout for its main loop.
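Deriving a deadline-bound context for a main loop uses the standard context.WithTimeout call. The loop below is a stand-in for the executor's main loop: runLoop, demo, and the simulated step cost are illustrative assumptions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop executes up to steps iterations and stops early once the
// wall-clock budget is spent. It reports how many iterations completed.
func runLoop(ctx context.Context, budget time.Duration, steps int, stepCost time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(ctx, budget)
	defer cancel() // release the timer even when the loop finishes early

	for done := 0; done < steps; done++ {
		select {
		case <-ctx.Done():
			return done, ctx.Err()
		case <-time.After(stepCost): // simulated node work
		}
	}
	return steps, nil
}

// demo runs a loop whose total work far exceeds its budget.
func demo() (int, error) {
	return runLoop(context.Background(), 30*time.Millisecond, 1000, 10*time.Millisecond)
}

func main() {
	done, err := demo()
	fmt.Println(done, err)
}
```

Because the deadline lives on the context, any node that honors ctx cancellation stops together with the loop.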
type ParallelConfig ¶
type ParallelConfig = executor.ParallelConfig
ParallelConfig configures parallel fork/join execution. Passed to WithParallel.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner is a graph engine: it caches the CompiledGraph (static analysis result) and re-assembles fresh Node instances on every execution, so concurrent callers never share mutable node state.
Runner satisfies engine.Engine via Runner.Execute, which is the canonical entry point going forward — agent.Run accepts a Runner directly, no adapter required. Runner.Run remains as a convenience helper for callers who want to drive the graph without standing up an engine.Run + engine.Host pair (typical for tests or one-shot CLI usage); it is implemented as a thin wrapper around Execute so there is exactly one execution path to reason about.
Runner construction collects every graph-level configuration knob (max iterations, timeout, parallel policy, variable resolver, …) once via runner.WithXxx options. Per-run identity (engine.Run.ID) and host capabilities (engine.Host) flow through Execute parameters — they are NOT construction-time concerns.
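The compile-once, assemble-per-run split is what makes concurrent calls safe. A minimal sketch under assumed types (compiledGraph, node, and the lowercase runner are hypothetical stand-ins, not the package's real structures):

```go
package main

import (
	"fmt"
	"sync"
)

// compiledGraph is the immutable analysis result shared by all runs.
type compiledGraph struct {
	nodeNames []string
}

// node carries mutable per-run state, so it must never be shared.
type node struct {
	name  string
	calls int
}

type runner struct {
	compiled compiledGraph
}

func newRunner(names ...string) *runner {
	return &runner{compiled: compiledGraph{nodeNames: names}}
}

// assemble builds fresh node instances from the cached compiled graph.
func (r *runner) assemble() []*node {
	nodes := make([]*node, 0, len(r.compiled.nodeNames))
	for _, name := range r.compiled.nodeNames {
		nodes = append(nodes, &node{name: name})
	}
	return nodes
}

// run mutates only the nodes it assembled itself.
func (r *runner) run() int {
	total := 0
	for _, n := range r.assemble() {
		n.calls++
		total += n.calls
	}
	return total
}

func main() {
	r := newRunner("fetch", "summarize")
	results := make([]int, 4)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = r.run() // each goroutine writes its own slot
		}(i)
	}
	wg.Wait()
	fmt.Println(results)
}
```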
func New ¶
New compiles a GraphDefinition and returns a ready-to-use Runner. The factory provides runtime dependencies (LLM resolver, tool registry, etc.) needed to instantiate nodes.
func (*Runner) CanResume ¶ added in v0.3.4
func (r *Runner) CanResume(cp engine.Checkpoint) error
CanResume satisfies engine.Resumer. It is the cheap pre-flight probe a host runs before invoking Execute with a non-nil ResumeFrom so obvious incompatibilities surface as typed errors instead of failing partway through Execute.
Checks performed (all errdefs.Validation when violated — these are programmer errors, not transient conditions):
- cp.Step is non-empty: a checkpoint without a step marker has no "where to resume" and cannot be replayed.
- cp.Step names a node present in this Runner's compiled graph: prevents reusing a checkpoint produced by a different graph definition (rename / restructure ⇒ resume becomes invalid).
- cp.Attributes["graph_name"] (if set) matches this Runner's compiled graph name: defends against the cross-graph case where two graphs happen to share node ids.
CanResume does NOT perform the foreign-ExecID check — that lives in validateResume on the Execute path because it requires the calling Run.ID, which CanResume does not receive.
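The three checks listed above can be sketched as a pure function. The checkpoint and compiledGraph shapes and the errValidation sentinel are assumptions standing in for engine.Checkpoint, the compiled graph, and errdefs.Validation.

```go
package main

import (
	"errors"
	"fmt"
)

// errValidation is a hypothetical stand-in for errdefs.Validation.
var errValidation = errors.New("validation error")

// checkpoint mirrors only the fields the probe inspects (assumed shape).
type checkpoint struct {
	Step       string
	Attributes map[string]string
}

// compiledGraph holds the graph name and its node ids (assumed shape).
type compiledGraph struct {
	Name  string
	Nodes map[string]struct{}
}

// canResume runs the cheap pre-flight checks in the order listed above.
func canResume(g compiledGraph, cp checkpoint) error {
	if cp.Step == "" {
		return fmt.Errorf("%w: checkpoint has no step", errValidation)
	}
	if _, ok := g.Nodes[cp.Step]; !ok {
		return fmt.Errorf("%w: step %q is not in graph %q", errValidation, cp.Step, g.Name)
	}
	// The graph name is checked only when the checkpoint recorded one.
	if name, ok := cp.Attributes["graph_name"]; ok && name != g.Name {
		return fmt.Errorf("%w: checkpoint belongs to graph %q", errValidation, name)
	}
	return nil
}

func main() {
	g := compiledGraph{Name: "triage", Nodes: map[string]struct{}{"classify": {}}}
	err := canResume(g, checkpoint{Step: "route"})
	fmt.Println(err, errors.Is(err, errValidation))
}
```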
func (*Runner) Capabilities ¶ added in v0.3.4
func (r *Runner) Capabilities() engine.Capabilities
Capabilities reports what this engine implementation can do, per the engine.Describer contract. Hosts (agent.Run preflight, vessel build path, dashboards) read this to gate features on.
Current values:
- SupportsResume = true. Execute consumes Run.ResumeFrom: it restores board state from cp.Board and continues from cp.Step's downstream edges. CanResume is the synchronous probe.
- EmitsUserPrompt = false. The runner core never calls host.AskUser. Optional plugin nodes (e.g. scriptnode) MAY prompt the user via the host bridge; that is the user's graph decision, not a runner-intrinsic capability, so it is left unclaimed at the engine level.
- EmitsCheckpoint = true. The internal executor calls host.Checkpoint after every node completes. Pods that need durable replay should attach a CheckpointStore.
- RequiredDepNames = nil. The graph runner is a meta-engine — concrete dep needs (LLM clients, tool registries) are declared per-node-factory by whoever assembled the graph, not by the runner itself.
func (*Runner) Execute ¶
func (r *Runner) Execute(ctx context.Context, run engine.Run, host engine.Host, board *engine.Board) (*engine.Board, error)
Execute satisfies engine.Engine. It runs the bound graph using host as the event/interrupt/ask sink and run.ID as the executor's run identifier. board MUST be non-nil; engines mutate in place by contract and Execute therefore returns the same pointer on success.
Resume support: when run.ResumeFrom is non-nil the runner restores board state from cp.Board, locates the node id recorded in cp.Step, and continues from that node's downstream edges instead of re-executing it. The checkpoint MUST originate from a previous run of the same graph and run.ID: a mismatched ExecID surfaces errdefs.Validation from Execute, and a mismatched GraphName surfaces errdefs.Validation via the Resumer.CanResume probe.
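Continuing from a checkpointed node's downstream edges, rather than from the node itself, can be illustrated on a plain adjacency map. The graph type and the helpers resumeFrontier and walk are hypothetical, and the breadth-first order is only one possible traversal.

```go
package main

import "fmt"

// graph stores outgoing edges per node id (assumed representation).
type graph struct {
	edges map[string][]string
}

// resumeFrontier returns the nodes to run next after a checkpoint taken at
// step: the step's successors, never the step itself.
func resumeFrontier(g graph, step string) []string {
	return g.edges[step]
}

// walk visits every node reachable from frontier once, breadth first.
func walk(g graph, frontier []string) []string {
	queue := append([]string(nil), frontier...) // copy so g.edges is untouched
	seen := map[string]bool{}
	var order []string
	for len(queue) > 0 {
		id := queue[0]
		queue = queue[1:]
		if seen[id] {
			continue
		}
		seen[id] = true
		order = append(order, id)
		queue = append(queue, g.edges[id]...)
	}
	return order
}

func main() {
	g := graph{edges: map[string][]string{
		"fetch":     {"summarize"},
		"summarize": {"publish"},
	}}
	// A checkpoint recorded at "fetch" resumes with "summarize".
	fmt.Println(walk(g, resumeFrontier(g, "fetch")))
}
```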
func (*Runner) Graph ¶
func (r *Runner) Graph() (*graph.Graph, error)
Graph returns a freshly assembled Graph snapshot for inspection. Intended for testing and debugging, not for execution.
func (*Runner) Host ¶
func (r *Runner) Host() engine.Host
Host returns the configured engine.Host. Always non-nil — callers can invoke any Host method directly (Publish / Interrupts / AskUser / Checkpoint / ReportUsage). Subscribing to envelopes is the host implementation's concern; if the concrete type exposes a getter for that, callers can type-assert on the returned value.
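Type-asserting the returned host against an optional interface might look like the sketch below. The host interface is reduced to a single method here, and the subscribable interface with its Envelopes getter is purely hypothetical: whether a concrete host exposes such a getter depends on the implementation.

```go
package main

import "fmt"

// host is a reduced, hypothetical view of the host interface.
type host interface {
	Publish(subject string, payload any)
}

// subscribable is an optional capability some hosts might offer.
type subscribable interface {
	Envelopes() <-chan string
}

// noopHost drops everything and offers no subscription.
type noopHost struct{}

func (noopHost) Publish(string, any) {}

// chanHost records published subjects on a buffered channel.
type chanHost struct {
	ch chan string
}

func (h *chanHost) Publish(subject string, _ any) { h.ch <- subject }
func (h *chanHost) Envelopes() <-chan string      { return h.ch }

// envelopes returns the subscription channel when h supports it.
func envelopes(h host) (<-chan string, bool) {
	s, ok := h.(subscribable)
	if !ok {
		return nil, false
	}
	return s.Envelopes(), true
}

func main() {
	h := &chanHost{ch: make(chan string, 1)}
	h.Publish("engine.run.r1.stream.step.delta", nil)
	if ch, ok := envelopes(h); ok {
		fmt.Println(<-ch)
	}
	_, ok := envelopes(noopHost{})
	fmt.Println(ok)
}
```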
func (*Runner) Run ¶
func (r *Runner) Run(ctx context.Context, vars map[string]any, opts ...executor.RunOption) (*graph.Board, error)
Run is a convenience wrapper around Runner.Execute for callers who want to drive the graph without assembling an engine.Run + engine.Host pair themselves. It populates a fresh Board from vars, mints an engine.Run that carries any extra executor.RunOption supplied via opts, and forwards the Runner's configured host. Safe for concurrent use — each call gets independent node instances.
New code that runs through agent.Run should call agent.Run with the Runner directly; Run is preserved for tests and one-shot CLI usage where the engine.Engine plumbing would be ceremony.
type VariableResolver ¶
type VariableResolver = executor.VariableResolver
VariableResolver resolves variable references in node configs. The runner installs a default variable.NewResolver() per execution; supply your own via WithResolver when you need a different scope or resolution policy.