Documentation
Overview ¶
Package executor provides the graph execution engine.
Index ¶
- func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
- func WithActorKey(ctx context.Context, key string) context.Context (deprecated)
- type Checkpoint
- type CheckpointCleaner
- type CheckpointManager
- type CheckpointStore
- type CleanupOptions
- type CloneableResolver
- type FileCheckpointConfig
- type FileCheckpointStore
- func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
- func (s *FileCheckpointStore) Delete(graphName string) error
- func (s *FileCheckpointStore) List() ([]string, error)
- func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
- func (s *FileCheckpointStore) Save(cp Checkpoint) error
- type LocalExecutor
- type MergeFunc
- type MergeStrategy
- type ParallelConfig
- type RunOption
- func WithAttributes(m map[string]string) RunOption
- func WithDeps(d *engine.Dependencies) RunOption
- func WithHost(h engine.Host) RunOption
- func WithMaxIterations(n int) RunOption
- func WithMaxNodeRetries(n int) RunOption
- func WithParallel(cfg ParallelConfig) RunOption
- func WithResolver(r VariableResolver) RunOption
- func WithResumeFrom(cp *engine.Checkpoint) RunOption
- func WithRunID(id string) RunOption
- func WithStartNode(id string) RunOption
- func WithTimeout(d time.Duration) RunOption
- type VariableResolver
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterMergeStrategy ¶
func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
RegisterMergeStrategy registers a custom merge strategy.
func WithActorKey ¶ deprecated
func WithActorKey(ctx context.Context, key string) context.Context
WithActorKey stamps an agent identifier onto ctx for legacy callers that drive the executor directly (no agent.Run wrapper).
Deprecated: prefer populating engine.Run.Attributes with the canonical telemetry.AttrAgentID key — agent.Run already does this, and unlike a context value the attribute survives cross-process hand-offs (HTTP, A2A, vessel inline engines). The executor's agentIDFor resolver still honours this ctx-key as a fallback when no attribute is set, so existing callers keep working until the v0.5.0 removal.
Types ¶
type Checkpoint ¶
type Checkpoint struct {
GraphName string `json:"graph_name"`
RunID string `json:"run_id,omitempty"`
NodeID string `json:"node_id"`
Iteration int `json:"iteration"`
Board *graph.BoardSnapshot `json:"board"`
Timestamp time.Time `json:"timestamp"`
// OriginalStartedAt mirrors [engine.Checkpoint.OriginalStartedAt].
// Stored at the executor's own checkpoint type so the
// FileCheckpointStore round-trips it in JSON without losing the
// resume-anchored start time.
OriginalStartedAt time.Time `json:"original_started_at,omitempty"`
}
Checkpoint represents a snapshot of graph execution state.
type CheckpointCleaner ¶
type CheckpointCleaner interface {
Cleanup(opts CleanupOptions) (deleted int, err error)
}
CheckpointCleaner supports periodic cleanup of old checkpoints.
type CheckpointManager ¶
type CheckpointManager interface {
CheckpointStore
List() ([]string, error)
Delete(graphName string) error
}
CheckpointManager extends CheckpointStore with lifecycle operations. FileCheckpointStore implements this interface.
type CheckpointStore ¶
type CheckpointStore interface {
Save(cp Checkpoint) error
// Load retrieves the latest checkpoint. When runID is non-empty, only
// checkpoints for that specific run are considered.
Load(graphName, runID string) (*Checkpoint, error)
}
CheckpointStore is the interface for persisting and loading checkpoints.
type CleanupOptions ¶
CleanupOptions configures checkpoint cleanup behavior.
type CloneableResolver ¶
type CloneableResolver interface {
VariableResolver
Clone() *variable.Resolver
}
CloneableResolver is an optional interface for resolvers that support cloning for use in parallel branches.
type FileCheckpointConfig ¶
FileCheckpointConfig configures the file-based checkpoint store.
type FileCheckpointStore ¶
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore persists checkpoints as JSON files.
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)
NewFileCheckpointStore creates a file-based checkpoint store.
func (*FileCheckpointStore) Cleanup ¶
func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
Cleanup removes stale checkpoint files.
func (*FileCheckpointStore) Delete ¶
func (s *FileCheckpointStore) Delete(graphName string) error
Delete removes the primary checkpoint and all backups for the given graph.
func (*FileCheckpointStore) List ¶
func (s *FileCheckpointStore) List() ([]string, error)
List returns graph names that have a primary checkpoint file. Primary files match "{name}.checkpoint.json"; backup files have an additional timestamp segment ("{name}.checkpoint.{ts}.json") and are skipped.
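The naming rule above can be checked with a suffix test. The sketch below is illustrative only: primaryGraphName and describe are hypothetical helpers, and the timestamp in the sample backup name is a made-up format.

```go
package main

import (
	"fmt"
	"strings"
)

const primarySuffix = ".checkpoint.json"

// primaryGraphName (hypothetical helper) returns the graph name encoded in a
// primary checkpoint file name. Backup names end in ".{ts}.json" rather than
// ".checkpoint.json", so they yield ok == false.
func primaryGraphName(fileName string) (name string, ok bool) {
	if !strings.HasSuffix(fileName, primarySuffix) {
		return "", false
	}
	name = strings.TrimSuffix(fileName, primarySuffix)
	if name == "" {
		return "", false
	}
	return name, true
}

// describe renders the classification of a single file name.
func describe(fileName string) string {
	if name, ok := primaryGraphName(fileName); ok {
		return fmt.Sprintf("primary checkpoint for %q", name)
	}
	return fmt.Sprintf("skipped %q", fileName)
}

func main() {
	files := []string{
		"orders.checkpoint.json",
		"orders.checkpoint.20240102T030405.json", // backup; timestamp format is assumed
		"README.md",
	}
	for _, f := range files {
		fmt.Println(describe(f))
	}
}
```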
func (*FileCheckpointStore) Load ¶
func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
Load reads the latest checkpoint from disk.
func (*FileCheckpointStore) Save ¶
func (s *FileCheckpointStore) Save(cp Checkpoint) error
Save writes a checkpoint to disk with multi-version retention.
type LocalExecutor ¶
type LocalExecutor struct{}
LocalExecutor is the default single-process executor.
func NewLocalExecutor ¶
func NewLocalExecutor() *LocalExecutor
NewLocalExecutor creates a new LocalExecutor.
type MergeFunc ¶
type MergeFunc func(board *graph.Board, snapshot *graph.BoardSnapshot, results []branchResult) error
MergeFunc merges parallel branch results into the parent board. snapshot is the pre-fork board state (for conflict detection).
type MergeStrategy ¶
type MergeStrategy string
MergeStrategy defines how parallel branch results are merged.
const (
MergeLastWins MergeStrategy = "last_wins"
MergeNamespace MergeStrategy = "namespace"
MergeErrorOnConflict MergeStrategy = "error_on_conflict"
)
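The exact semantics of each strategy are not spelled out here, so the following is only a sketch of how a strategy registry and two of the strategies might behave, inferred from the names and from the MergeFunc description. Everything in it is an assumption: board and branchResult are simplified stand-ins for graph.Board and the unexported branchResult type, and "conflict" is taken to mean that two branches changed the same key relative to the pre-fork snapshot.

```go
package main

import (
	"fmt"
	"sort"
)

// board and branchResult are hypothetical stand-ins for the real types.
type board map[string]string

type branchResult struct {
	Name  string
	Board board // the branch's view of the board when it finished
}

// mergeFunc mirrors the shape of MergeFunc using the stand-in types.
type mergeFunc func(parent, snapshot board, results []branchResult) error

// registry maps strategy names to merge functions (assumed design).
var registry = map[string]mergeFunc{}

func register(name string, fn mergeFunc) { registry[name] = fn }

// changed returns, in sorted order, the keys whose value in b differs from
// the pre-fork snapshot.
func changed(snapshot, b board) []string {
	var keys []string
	for k, v := range b {
		if old, ok := snapshot[k]; !ok || old != v {
			keys = append(keys, k)
		}
	}
	sort.Strings(keys)
	return keys
}

// lastWins applies branch changes in order, so later branches overwrite
// earlier ones.
func lastWins(parent, snapshot board, results []branchResult) error {
	for _, r := range results {
		for _, k := range changed(snapshot, r.Board) {
			parent[k] = r.Board[k]
		}
	}
	return nil
}

// errorOnConflict rejects the merge when two branches changed the same key;
// the parent board is left untouched in that case.
func errorOnConflict(parent, snapshot board, results []branchResult) error {
	owner := map[string]string{}
	for _, r := range results {
		for _, k := range changed(snapshot, r.Board) {
			if prev, ok := owner[k]; ok {
				return fmt.Errorf("key %q changed by both %s and %s", k, prev, r.Name)
			}
			owner[k] = r.Name
		}
	}
	return lastWins(parent, snapshot, results)
}

func main() {
	register("last_wins", lastWins)
	register("error_on_conflict", errorOnConflict)

	snapshot := board{"x": "0"}
	results := []branchResult{
		{Name: "a", Board: board{"x": "1"}},
		{Name: "b", Board: board{"x": "2", "y": "9"}},
	}

	parent := board{"x": "0"}
	err := registry["last_wins"](parent, snapshot, results)
	fmt.Println(err, parent["x"], parent["y"])

	err = registry["error_on_conflict"](board{"x": "0"}, snapshot, results)
	fmt.Println(err)
}
```

Detecting conflicts before writing anything keeps the parent board unchanged when the merge is rejected.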
type ParallelConfig ¶
type ParallelConfig struct {
Enabled bool `json:"enabled"`
MaxBranches int `json:"max_branches"`
MaxNesting int `json:"max_nesting"`
MergeStrategy MergeStrategy `json:"merge_strategy"`
}
ParallelConfig configures parallel fork/join execution.
type RunOption ¶
type RunOption func(*runConfig)
RunOption configures a single graph execution run.
func WithAttributes ¶ added in v0.3.4
func WithAttributes(m map[string]string) RunOption
WithAttributes installs the string-keyed attribute bag the executor will forward into graph.ExecutionContext.Attributes. Used for canonical agent-scoped identity (sdk/telemetry.AttrAgentID, AttrTaskID, AttrContextID) and any caller-supplied tags. Passing nil clears the attribute bag.
runner.Runner.Execute populates this from engine.Run.Attributes.
func WithDeps ¶ added in v0.3.4
func WithDeps(d *engine.Dependencies) RunOption
WithDeps installs the engine.Dependencies the executor will forward into graph.ExecutionContext.Deps so nodes can recover host-supplied dependencies (LLM clients, tool registries, …) via engine.GetDep. nil is allowed and means "no deps for this run"; nodes that need a dep must tolerate the nil case.
runner.Runner.Execute populates this from engine.Run.Deps, so callers that drive the executor through the Engine surface get the wiring for free.
func WithHost ¶
func WithHost(h engine.Host) RunOption
WithHost installs the engine.Host the executor will hand to nodes via ExecutionContext.Host. When omitted, the executor falls back to engine.NoopHost{} so nodes can call ctx.Host methods unconditionally.
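The fallback is an instance of the null-object pattern. The sketch below is an assumption-laden illustration: the host interface and its Notify method are invented for the example (the real engine.Host method set is not shown here), as are recordingHost and resolveHost.

```go
package main

import "fmt"

// host is a hypothetical one-method interface standing in for engine.Host.
type host interface {
	Notify(msg string)
}

// noopHost discards every call, playing the role of engine.NoopHost{}.
type noopHost struct{}

func (noopHost) Notify(string) {}

// recordingHost is a hypothetical host that remembers what it was told.
type recordingHost struct {
	msgs []string
}

func (h *recordingHost) Notify(msg string) { h.msgs = append(h.msgs, msg) }

func (h *recordingHost) String() string {
	return fmt.Sprintf("%d message(s)", len(h.msgs))
}

// runConfig is a stand-in holding only the optional host.
type runConfig struct {
	host host
}

// resolveHost returns the configured host, or a no-op host when none was set,
// so callers never need a nil check.
func resolveHost(cfg runConfig) host {
	if cfg.host == nil {
		return noopHost{}
	}
	return cfg.host
}

func main() {
	// No host configured: the call is safe and does nothing.
	resolveHost(runConfig{}).Notify("ignored")

	rec := &recordingHost{}
	resolveHost(runConfig{host: rec}).Notify("node started")
	fmt.Println(rec)
}
```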
func WithMaxIterations ¶
func WithMaxIterations(n int) RunOption
func WithMaxNodeRetries ¶
func WithMaxNodeRetries(n int) RunOption
func WithParallel ¶
func WithParallel(cfg ParallelConfig) RunOption
func WithResolver ¶
func WithResolver(r VariableResolver) RunOption
func WithResumeFrom ¶ added in v0.3.4
func WithResumeFrom(cp *engine.Checkpoint) RunOption
WithResumeFrom installs an engine.Checkpoint the executor should resume from. The checkpoint must come from a previous run of the same graph: cp.Step names a node already executed, cp.Board carries the snapshot taken right after that node completed, and cp.Iteration is the iteration count at that point.
Execute restores cp.Board over the caller-supplied board (the checkpoint is the source of truth for state at the resume point), resolves the downstream nodes of cp.Step against the restored board (so conditional edges branch correctly on resumed state), and continues execution from there. The completed node is NOT re-executed.
nil is a no-op (a fresh start). Supplying an unknown cp.Step surfaces errdefs.Validation; Run.ResumeFrom contract validation (foreign ExecID, etc.) is the runner.Runner's responsibility, not the executor's.
func WithRunID ¶
func WithRunID(id string) RunOption
WithRunID sets the run identifier the executor uses in telemetry and event subjects.