Documentation
¶
Overview ¶
Package executor provides the graph execution engine.
Index ¶
- func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
- func WithActorKey(ctx context.Context, key string) context.Context
- type Checkpointdeprecated
- type CheckpointCleaner
- type CheckpointManager
- type CheckpointStoredeprecated
- type CleanupOptions
- type CloneableResolver
- type Executordeprecated
- type FileCheckpointConfig
- type FileCheckpointStore
- func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
- func (s *FileCheckpointStore) Delete(graphName string) error
- func (s *FileCheckpointStore) List() ([]string, error)
- func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
- func (s *FileCheckpointStore) Save(cp Checkpoint) error
- type LocalExecutor
- type MergeFunc
- type MergeStrategy
- type ParallelConfig
- type RunOption
- func WithCheckpointStore(s CheckpointStore) RunOptiondeprecated
- func WithEventBus(bus event.Bus) RunOptiondeprecated
- func WithHost(h engine.Host) RunOption
- func WithMaxIterations(n int) RunOption
- func WithMaxNodeRetries(n int) RunOption
- func WithParallel(cfg ParallelConfig) RunOption
- func WithResolver(r VariableResolver) RunOption
- func WithRunID(id string) RunOptiondeprecated
- func WithStartNode(id string) RunOption
- func WithStreamCallback(cb graph.StreamCallback) RunOptiondeprecated
- func WithTimeout(d time.Duration) RunOption
- type VariableResolver
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterMergeStrategy ¶
func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
RegisterMergeStrategy registers a custom merge strategy.
Types ¶
type Checkpoint
deprecated
type Checkpoint struct {
GraphName string `json:"graph_name"`
RunID string `json:"run_id,omitempty"`
NodeID string `json:"node_id"`
Iteration int `json:"iteration"`
Board *graph.BoardSnapshot `json:"board"`
Timestamp time.Time `json:"timestamp"`
}
Checkpoint represents a snapshot of graph execution state.
Deprecated: use engine.Checkpoint. This type predates the engine.Host.Checkpointer abstraction; the executor now persists checkpoints through the host. Scheduled for removal in v0.3.0 together with WithCheckpointStore. Existing CheckpointStore implementations keep working via storeOnlyHost.
type CheckpointCleaner ¶
type CheckpointCleaner interface {
Cleanup(opts CleanupOptions) (deleted int, err error)
}
CheckpointCleaner supports periodic cleanup of old checkpoints.
type CheckpointManager ¶
type CheckpointManager interface {
CheckpointStore
List() ([]string, error)
Delete(graphName string) error
}
CheckpointManager extends CheckpointStore with lifecycle operations. FileCheckpointStore implements this interface.
type CheckpointStore
deprecated
type CheckpointStore interface {
Save(cp Checkpoint) error
// Load retrieves the latest checkpoint. When runID is non-empty, only
// checkpoints for that specific run are considered.
Load(graphName, runID string) (*Checkpoint, error)
}
CheckpointStore is the interface for persisting and loading checkpoints.
Deprecated: implement engine.CheckpointStore (or expose a Host with a real Checkpointer) and pass it via WithHost. The executor now writes every checkpoint through host.Checkpoint; this interface is kept so code already using WithCheckpointStore keeps compiling and is folded into the host path via storeOnlyHost. Scheduled for removal in v0.3.0.
type CleanupOptions ¶
CleanupOptions configures checkpoint cleanup behavior.
type CloneableResolver ¶
type CloneableResolver interface {
VariableResolver
Clone() *variable.Resolver
}
CloneableResolver is an optional interface for resolvers that support cloning for use in parallel branches.
type Executor
deprecated
type Executor interface {
Execute(ctx context.Context, g *graph.Graph, board *graph.Board, opts ...RunOption) (*graph.Board, error)
}
Executor is the interface for graph execution engines.
Deprecated: superseded by engine.Engine (satisfied by graph/runner.Runner). The new contract folds run identity / host / initial board into typed parameters, which lets the agent runtime drive any engine uniformly without reaching into a Strategy layer. Scheduled for removal in v0.3.0; until then the LocalExecutor that implements this interface is kept as the in-process default the runner delegates to internally.
type FileCheckpointConfig ¶
FileCheckpointConfig configures the file-based checkpoint store.
type FileCheckpointStore ¶
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore persists checkpoints as JSON files.
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)
NewFileCheckpointStore creates a file-based checkpoint store.
func (*FileCheckpointStore) Cleanup ¶
func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
Cleanup removes stale checkpoint files.
func (*FileCheckpointStore) Delete ¶
func (s *FileCheckpointStore) Delete(graphName string) error
Delete removes the primary checkpoint and all backups for the given graph.
func (*FileCheckpointStore) List ¶
func (s *FileCheckpointStore) List() ([]string, error)
List returns graph names that have a primary checkpoint file. Primary files match "{name}.checkpoint.json"; backup files have an additional timestamp segment ("{name}.checkpoint.{ts}.json") and are skipped.
func (*FileCheckpointStore) Load ¶
func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
Load reads the latest checkpoint from disk.
func (*FileCheckpointStore) Save ¶
func (s *FileCheckpointStore) Save(cp Checkpoint) error
Save writes a checkpoint to disk with multi-version retention.
type LocalExecutor ¶
type LocalExecutor struct{}
LocalExecutor is the default single-process executor.
func NewLocalExecutor ¶
func NewLocalExecutor() *LocalExecutor
NewLocalExecutor creates a new LocalExecutor.
type MergeFunc ¶
type MergeFunc func(board *graph.Board, snapshot *graph.BoardSnapshot, results []branchResult) error
MergeFunc merges parallel branch results into the parent board. snapshot is the pre-fork board state (for conflict detection).
type MergeStrategy ¶
type MergeStrategy string
MergeStrategy defines how parallel branch results are merged.
const ( MergeLastWins MergeStrategy = "last_wins" MergeNamespace MergeStrategy = "namespace" MergeErrorOnConflict MergeStrategy = "error_on_conflict" )
type ParallelConfig ¶
type ParallelConfig struct {
Enabled bool `json:"enabled"`
MaxBranches int `json:"max_branches"`
MaxNesting int `json:"max_nesting"`
MergeStrategy MergeStrategy `json:"merge_strategy"`
}
ParallelConfig configures parallel fork/join execution.
type RunOption ¶
type RunOption func(*runConfig)
RunOption configures a single graph execution run.
func WithCheckpointStore
deprecated
func WithCheckpointStore(s CheckpointStore) RunOption
WithCheckpointStore installs a graph-format CheckpointStore that persists a checkpoint after every node completes.
Deprecated: prefer WithHost with a host whose Checkpointer wraps an engine.CheckpointStore. The executor now writes every checkpoint through host.Checkpoint and the store passed here is folded into the host via storeOnlyHost (see resolveCheckpointHost). When BOTH WithHost and WithCheckpointStore are supplied the host wins and the store is silently ignored — checkpointing is state, so we do not fan out the way we do for events. Scheduled for removal in v0.3.0.
func WithEventBus
deprecated
WithEventBus installs the event bus used for graph- and node-level envelopes.
Deprecated: pass an engine.Host to WithHost instead — the executor now publishes envelopes through host.Publish, which lets the host centralise routing, fan-out and observability. Scheduled for removal in v0.3.0 alongside the other host-overlapping options.
func WithHost ¶
WithHost installs the engine.Host the executor will hand to nodes via ExecutionContext.Host. When omitted the executor falls back to engine.NoopHost{} so nodes can call ctx.Host methods unconditionally.
WithHost subsumes WithEventBus / WithCheckpointStore: when both are supplied the host wins for publishing and checkpointing while the bus / store remain available for legacy code paths during the transition.
func WithMaxIterations ¶
func WithMaxNodeRetries ¶
func WithParallel ¶
func WithParallel(cfg ParallelConfig) RunOption
func WithResolver ¶
func WithResolver(r VariableResolver) RunOption
func WithRunID
deprecated
WithRunID sets the run identifier the executor uses in telemetry and event subjects.
Deprecated: pass engine.Run.ID to engine.Engine.Execute (i.e. via agent.Run, which mints and forwards it for you). When invoking graph/runner.Runner directly through engine.Engine.Execute, the run.ID parameter is the canonical source. Scheduled for removal in v0.3.0 once executor.Executor is removed.
func WithStartNode ¶
func WithStreamCallback
deprecated
func WithStreamCallback(cb graph.StreamCallback) RunOption
WithStreamCallback installs a legacy stream callback receiving every node delta.
Deprecated: subscribe to engine.Host's event bus, or read ExecutionContext.Publisher inside a node, instead. Scheduled for removal in v0.3.0.