Documentation
¶
Overview ¶
Package executor provides the graph execution engine.
Index ¶
- func Assemble(compiled *compiler.CompiledGraph, factory *node.Factory) (*graph.Graph, error)
- func PatternAllRuns() event.Pattern
- func PatternRun(runID string) event.Pattern
- func PatternRunNodes(runID string) event.Pattern
- func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
- func WithActorKey(ctx context.Context, key string) context.Context
- type Checkpoint
- type CheckpointCleaner
- type CheckpointManager
- type CheckpointStore
- type CleanupOptions
- type CloneableResolver
- type Executor
- type FileCheckpointConfig
- type FileCheckpointStore
- func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
- func (s *FileCheckpointStore) Delete(graphName string) error
- func (s *FileCheckpointStore) List() ([]string, error)
- func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
- func (s *FileCheckpointStore) Save(cp Checkpoint) error
- type LocalExecutor
- type MergeFunc
- type MergeStrategy
- type ParallelConfig
- type RunOption
- func WithCheckpointStore(s CheckpointStore) RunOption
- func WithEventBus(bus event.Bus) RunOption
- func WithMaxIterations(n int) RunOption
- func WithMaxNodeRetries(n int) RunOption
- func WithParallel(cfg ParallelConfig) RunOption
- func WithResolver(r VariableResolver) RunOption
- func WithRunID(id string) RunOption
- func WithStartNode(id string) RunOption
- func WithStreamCallback(cb graph.StreamCallback) RunOption
- func WithTimeout(d time.Duration) RunOption
- type Runner
- type RunnerOption
- type VariableResolver
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Assemble ¶
Assemble takes a CompiledGraph (static analysis result) and a Factory, then constructs all real node instances and returns an immutable executable Graph.
func PatternAllRuns ¶ added in v0.1.12
PatternAllRuns returns "graph.run.>" — every executor event from any run.
func PatternRun ¶ added in v0.1.12
PatternRun returns "graph.run.<runID>.>" — a Pattern that matches every event emitted by the executor for the given run.
Exposed for callers that want to subscribe to a specific run without hard-coding the subject convention.
func PatternRunNodes ¶ added in v0.1.12
PatternRunNodes returns "graph.run.<runID>.node.>" — every node-level event for the given run.
func RegisterMergeStrategy ¶
func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
RegisterMergeStrategy registers a custom merge strategy.
Types ¶
type Checkpoint ¶
type Checkpoint struct {
GraphName string `json:"graph_name"`
RunID string `json:"run_id,omitempty"`
NodeID string `json:"node_id"`
Iteration int `json:"iteration"`
Board *graph.BoardSnapshot `json:"board"`
Timestamp time.Time `json:"timestamp"`
}
Checkpoint represents a snapshot of graph execution state.
type CheckpointCleaner ¶
type CheckpointCleaner interface {
Cleanup(opts CleanupOptions) (deleted int, err error)
}
CheckpointCleaner supports periodic cleanup of old checkpoints.
type CheckpointManager ¶
type CheckpointManager interface {
CheckpointStore
List() ([]string, error)
Delete(graphName string) error
}
CheckpointManager extends CheckpointStore with lifecycle operations. FileCheckpointStore implements this interface.
type CheckpointStore ¶
type CheckpointStore interface {
Save(cp Checkpoint) error
// Load retrieves the latest checkpoint. When runID is non-empty, only
// checkpoints for that specific run are considered.
Load(graphName, runID string) (*Checkpoint, error)
}
CheckpointStore is the interface for persisting and loading checkpoints.
type CleanupOptions ¶
CleanupOptions configures checkpoint cleanup behavior.
type CloneableResolver ¶
type CloneableResolver interface {
VariableResolver
Clone() *variable.Resolver
}
CloneableResolver is an optional interface for resolvers that support cloning for use in parallel branches.
type Executor ¶
type Executor interface {
Execute(ctx context.Context, g *graph.Graph, board *graph.Board, opts ...RunOption) (*graph.Board, error)
}
Executor is the interface for graph execution engines.
type FileCheckpointConfig ¶
FileCheckpointConfig configures the file-based checkpoint store.
type FileCheckpointStore ¶
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore persists checkpoints as JSON files.
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)
NewFileCheckpointStore creates a file-based checkpoint store.
func (*FileCheckpointStore) Cleanup ¶
func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
Cleanup removes stale checkpoint files.
func (*FileCheckpointStore) Delete ¶
func (s *FileCheckpointStore) Delete(graphName string) error
Delete removes the primary checkpoint and all backups for the given graph.
func (*FileCheckpointStore) List ¶
func (s *FileCheckpointStore) List() ([]string, error)
List returns graph names that have a primary checkpoint file. Primary files match "{name}.checkpoint.json"; backup files have an additional timestamp segment ("{name}.checkpoint.{ts}.json") and are skipped.
func (*FileCheckpointStore) Load ¶
func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
Load reads the latest checkpoint from disk.
func (*FileCheckpointStore) Save ¶
func (s *FileCheckpointStore) Save(cp Checkpoint) error
Save writes a checkpoint to disk with multi-version retention.
type LocalExecutor ¶
type LocalExecutor struct{}
LocalExecutor is the default single-process executor.
func NewLocalExecutor ¶
func NewLocalExecutor() *LocalExecutor
NewLocalExecutor creates a new LocalExecutor.
type MergeFunc ¶
type MergeFunc func(board *graph.Board, snapshot *graph.BoardSnapshot, results []branchResult) error
MergeFunc merges parallel branch results into the parent board. snapshot is the pre-fork board state (for conflict detection).
type MergeStrategy ¶
type MergeStrategy string
MergeStrategy defines how parallel branch results are merged.
const ( MergeLastWins MergeStrategy = "last_wins" MergeNamespace MergeStrategy = "namespace" MergeErrorOnConflict MergeStrategy = "error_on_conflict" )
type ParallelConfig ¶
type ParallelConfig struct {
Enabled bool `json:"enabled"`
MaxBranches int `json:"max_branches"`
MaxNesting int `json:"max_nesting"`
MergeStrategy MergeStrategy `json:"merge_strategy"`
}
ParallelConfig configures parallel fork/join execution.
type RunOption ¶
type RunOption func(*runConfig)
RunOption configures a single graph execution run.
func WithCheckpointStore ¶
func WithCheckpointStore(s CheckpointStore) RunOption
func WithEventBus ¶
func WithMaxIterations ¶
func WithMaxNodeRetries ¶
func WithParallel ¶
func WithParallel(cfg ParallelConfig) RunOption
func WithResolver ¶
func WithResolver(r VariableResolver) RunOption
func WithStartNode ¶
func WithStreamCallback ¶
func WithStreamCallback(cb graph.StreamCallback) RunOption
func WithTimeout ¶
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner is a lightweight, concurrency-safe graph executor.
It caches the CompiledGraph (static analysis result) and re-assembles fresh Node instances on every Run call, so concurrent callers never share mutable node state.
func NewRunner ¶
func NewRunner(def *graph.GraphDefinition, factory *node.Factory, opts ...RunnerOption) (*Runner, error)
NewRunner compiles a GraphDefinition and returns a ready-to-use Runner. The factory provides runtime dependencies (LLM resolver, tool registry, etc.) needed to instantiate nodes.
type RunnerOption ¶
type RunnerOption func(*Runner)
RunnerOption configures a Runner.
func WithRunnerEventBus ¶
func WithRunnerEventBus(bus event.Bus) RunnerOption
WithRunnerEventBus sets the Bus used for graph lifecycle events. Defaults to event.NoopBus{}.
func WithRunnerExecutor ¶
func WithRunnerExecutor(e Executor) RunnerOption
WithRunnerExecutor overrides the default LocalExecutor.