Documentation ¶
Overview ¶
Package executor provides the graph execution engine.
Index ¶
- func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
- func WithActorKey(ctx context.Context, key string) context.Context
- type Checkpoint
- type CheckpointCleaner
- type CheckpointManager
- type CheckpointStore
- type CleanupOptions
- type CloneableResolver
- type FileCheckpointConfig
- type FileCheckpointStore
- func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)
- func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
- func (s *FileCheckpointStore) Delete(graphName string) error
- func (s *FileCheckpointStore) List() ([]string, error)
- func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
- func (s *FileCheckpointStore) Save(cp Checkpoint) error
- type LocalExecutor
- func NewLocalExecutor() *LocalExecutor
- type MergeFunc
- type MergeStrategy
- type ParallelConfig
- type RunOption
- func WithHost(h engine.Host) RunOption
- func WithMaxIterations(n int) RunOption
- func WithMaxNodeRetries(n int) RunOption
- func WithParallel(cfg ParallelConfig) RunOption
- func WithResolver(r VariableResolver) RunOption
- func WithRunID(id string) RunOption
- func WithStartNode(id string) RunOption
- func WithTimeout(d time.Duration) RunOption
- type VariableResolver
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterMergeStrategy ¶
func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)
RegisterMergeStrategy registers a custom merge strategy.
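A name-keyed registry of this kind can be sketched as follows. This is a minimal illustration, not the package's implementation: the simplified MergeFunc signature (plain maps instead of boards and branch results), the register and lookup helpers, and the "first_wins" strategy are all hypothetical, and replacing an existing entry on re-registration is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// MergeStrategy and MergeFunc are simplified stand-ins for the package types.
// The real MergeFunc takes a board, a snapshot, and branch results.
type MergeStrategy string

type MergeFunc func(parent map[string]any, branches []map[string]any) error

var (
	mu       sync.RWMutex
	registry = map[MergeStrategy]MergeFunc{}
)

// register stores fn under name, replacing any earlier entry (assumed behavior).
func register(name MergeStrategy, fn MergeFunc) {
	mu.Lock()
	defer mu.Unlock()
	registry[name] = fn
}

// lookup returns the function registered under name, if any.
func lookup(name MergeStrategy) (MergeFunc, bool) {
	mu.RLock()
	defer mu.RUnlock()
	fn, ok := registry[name]
	return fn, ok
}

// firstWins is a hypothetical custom strategy: a key keeps the value from the
// earliest branch that wrote it.
func firstWins(parent map[string]any, branches []map[string]any) error {
	for _, b := range branches {
		for k, v := range b {
			if _, exists := parent[k]; !exists {
				parent[k] = v
			}
		}
	}
	return nil
}

func main() {
	register("first_wins", firstWins)

	fn, ok := lookup("first_wins")
	if !ok {
		panic("strategy not registered")
	}
	parent := map[string]any{}
	if err := fn(parent, []map[string]any{{"a": 1}, {"a": 2, "b": 3}}); err != nil {
		panic(err)
	}
	fmt.Println(parent["a"], parent["b"]) // prints "1 3"
}
```

Guarding the map with a sync.RWMutex keeps registration safe if it happens concurrently with lookups; whether the real registry does this is not stated in this section.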
Types ¶
type Checkpoint ¶
type Checkpoint struct {
GraphName string `json:"graph_name"`
RunID string `json:"run_id,omitempty"`
NodeID string `json:"node_id"`
Iteration int `json:"iteration"`
Board *graph.BoardSnapshot `json:"board"`
Timestamp time.Time `json:"timestamp"`
}
Checkpoint represents a snapshot of graph execution state.
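The JSON tags above determine the on-disk shape of a checkpoint. The sketch below mirrors the documented fields in a local type to show the encoding; Board is held as raw JSON because the layout of graph.BoardSnapshot is not shown in this section, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// checkpoint mirrors the documented Checkpoint fields and tags.
// Board is an assumption: raw JSON stands in for *graph.BoardSnapshot.
type checkpoint struct {
	GraphName string          `json:"graph_name"`
	RunID     string          `json:"run_id,omitempty"`
	NodeID    string          `json:"node_id"`
	Iteration int             `json:"iteration"`
	Board     json.RawMessage `json:"board"`
	Timestamp time.Time       `json:"timestamp"`
}

// sample returns a checkpoint with illustrative values and no RunID.
func sample() checkpoint {
	return checkpoint{
		GraphName: "demo",
		NodeID:    "n1",
		Iteration: 3,
		Board:     json.RawMessage(`{}`),
		Timestamp: time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC),
	}
}

// encode marshals a checkpoint to its JSON text.
func encode(cp checkpoint) (string, error) {
	b, err := json.Marshal(cp)
	return string(b), err
}

func main() {
	s, err := encode(sample())
	if err != nil {
		panic(err)
	}
	// run_id is absent from the output because of omitempty.
	fmt.Println(s)
}
```

Because run_id carries omitempty, a checkpoint saved without a run identifier has no run_id key at all rather than an empty string.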
type CheckpointCleaner ¶
type CheckpointCleaner interface {
Cleanup(opts CleanupOptions) (deleted int, err error)
}
CheckpointCleaner supports periodic cleanup of old checkpoints.
type CheckpointManager ¶
type CheckpointManager interface {
CheckpointStore
List() ([]string, error)
Delete(graphName string) error
}
CheckpointManager extends CheckpointStore with lifecycle operations. FileCheckpointStore implements this interface.
type CheckpointStore ¶
type CheckpointStore interface {
Save(cp Checkpoint) error
// Load retrieves the latest checkpoint. When runID is non-empty, only
// checkpoints for that specific run are considered.
Load(graphName, runID string) (*Checkpoint, error)
}
CheckpointStore is the interface for persisting and loading checkpoints.
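To make the Load contract concrete, here is a minimal in-memory store with the same method shapes. It is a sketch under assumptions: the trimmed checkpoint type, the memStore name, and returning an error when nothing matches are all hypothetical, since this section does not say how a missing checkpoint is reported.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// checkpoint is a trimmed stand-in for the package's Checkpoint type.
type checkpoint struct {
	GraphName string
	RunID     string
	NodeID    string
}

// errNotFound is an assumed way to signal a missing checkpoint.
var errNotFound = errors.New("checkpoint not found")

// memStore keeps checkpoints per graph in insertion order.
type memStore struct {
	mu      sync.Mutex
	byGraph map[string][]checkpoint
}

func newMemStore() *memStore {
	return &memStore{byGraph: map[string][]checkpoint{}}
}

// Save appends cp to the history of its graph.
func (m *memStore) Save(cp checkpoint) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.byGraph[cp.GraphName] = append(m.byGraph[cp.GraphName], cp)
	return nil
}

// Load returns the most recently saved checkpoint for graphName.
// When runID is non-empty, only checkpoints of that run are considered.
func (m *memStore) Load(graphName, runID string) (*checkpoint, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	list := m.byGraph[graphName]
	for i := len(list) - 1; i >= 0; i-- {
		if runID == "" || list[i].RunID == runID {
			cp := list[i] // copy so callers cannot mutate stored state
			return &cp, nil
		}
	}
	return nil, errNotFound
}

func main() {
	s := newMemStore()
	_ = s.Save(checkpoint{GraphName: "g", RunID: "a", NodeID: "n1"})
	_ = s.Save(checkpoint{GraphName: "g", RunID: "b", NodeID: "n2"})

	latest, _ := s.Load("g", "")
	forRunA, _ := s.Load("g", "a")
	fmt.Println(latest.NodeID, forRunA.NodeID) // prints "n2 n1"
}
```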
type CleanupOptions ¶
CleanupOptions configures checkpoint cleanup behavior.
type CloneableResolver ¶
type CloneableResolver interface {
VariableResolver
Clone() *variable.Resolver
}
CloneableResolver is an optional interface for resolvers that support cloning for use in parallel branches.
type FileCheckpointConfig ¶
FileCheckpointConfig configures the file-based checkpoint store.
type FileCheckpointStore ¶
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore persists checkpoints as JSON files.
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)
NewFileCheckpointStore creates a file-based checkpoint store.
func (*FileCheckpointStore) Cleanup ¶
func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)
Cleanup removes stale checkpoint files.
func (*FileCheckpointStore) Delete ¶
func (s *FileCheckpointStore) Delete(graphName string) error
Delete removes the primary checkpoint and all backups for the given graph.
func (*FileCheckpointStore) List ¶
func (s *FileCheckpointStore) List() ([]string, error)
List returns graph names that have a primary checkpoint file. Primary files match "{name}.checkpoint.json"; backup files have an additional timestamp segment ("{name}.checkpoint.{ts}.json") and are skipped.
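The naming rule can be illustrated with a small classifier. This is a sketch of the rule as described, not the store's code; the primaryName helper and the sample file names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// primarySuffix is the ending of a primary checkpoint file name.
const primarySuffix = ".checkpoint.json"

// primaryName returns the graph name when file is a primary checkpoint
// ("{name}.checkpoint.json"). Backups ("{name}.checkpoint.{ts}.json") do not
// end in primarySuffix, so they are rejected along with unrelated files.
func primaryName(file string) (string, bool) {
	if !strings.HasSuffix(file, primarySuffix) {
		return "", false
	}
	name := strings.TrimSuffix(file, primarySuffix)
	if name == "" {
		return "", false
	}
	return name, true
}

func main() {
	files := []string{
		"demo.checkpoint.json",
		"demo.checkpoint.20240102T030405.json",
		"notes.txt",
	}
	for _, f := range files {
		name, ok := primaryName(f)
		fmt.Printf("%q -> %q %v\n", f, name, ok)
	}
}
```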
func (*FileCheckpointStore) Load ¶
func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)
Load reads the latest checkpoint from disk.
func (*FileCheckpointStore) Save ¶
func (s *FileCheckpointStore) Save(cp Checkpoint) error
Save writes a checkpoint to disk with multi-version retention.
type LocalExecutor ¶
type LocalExecutor struct{}
LocalExecutor is the default single-process executor.
func NewLocalExecutor ¶
func NewLocalExecutor() *LocalExecutor
NewLocalExecutor creates a new LocalExecutor.
type MergeFunc ¶
type MergeFunc func(board *graph.Board, snapshot *graph.BoardSnapshot, results []branchResult) error
MergeFunc merges parallel branch results into the parent board. snapshot is the pre-fork board state (for conflict detection).
type MergeStrategy ¶
type MergeStrategy string
MergeStrategy defines how parallel branch results are merged.
const (
MergeLastWins MergeStrategy = "last_wins"
MergeNamespace MergeStrategy = "namespace"
MergeErrorOnConflict MergeStrategy = "error_on_conflict"
)
type ParallelConfig ¶
type ParallelConfig struct {
Enabled bool `json:"enabled"`
MaxBranches int `json:"max_branches"`
MaxNesting int `json:"max_nesting"`
MergeStrategy MergeStrategy `json:"merge_strategy"`
}
ParallelConfig configures parallel fork/join execution.
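Given the JSON tags, a configuration document for this struct could be decoded as sketched below. The types are local copies of the documented declarations so the example runs on its own; the parse helper and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented declarations.
type MergeStrategy string

const (
	MergeLastWins        MergeStrategy = "last_wins"
	MergeNamespace       MergeStrategy = "namespace"
	MergeErrorOnConflict MergeStrategy = "error_on_conflict"
)

type ParallelConfig struct {
	Enabled       bool          `json:"enabled"`
	MaxBranches   int           `json:"max_branches"`
	MaxNesting    int           `json:"max_nesting"`
	MergeStrategy MergeStrategy `json:"merge_strategy"`
}

// parse decodes a JSON document into a ParallelConfig.
func parse(data string) (ParallelConfig, error) {
	var cfg ParallelConfig
	err := json.Unmarshal([]byte(data), &cfg)
	return cfg, err
}

func main() {
	cfg, err := parse(`{"enabled":true,"max_branches":4,"max_nesting":2,"merge_strategy":"namespace"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Enabled, cfg.MaxBranches, cfg.MaxNesting, cfg.MergeStrategy == MergeNamespace)
	// prints "true 4 2 true"
}
```

Note that encoding/json accepts any string for merge_strategy, so a caller that wants to reject unknown strategies has to validate the value separately.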
type RunOption ¶
type RunOption func(*runConfig)
RunOption configures a single graph execution run.
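RunOption follows the functional options pattern: each option is a function that mutates a private configuration struct. The sketch below shows the mechanics with a hypothetical runConfig; the real struct is unexported and its fields and defaults are not shown in this section, so the field names, the lower-case option constructors, and the default of 100 are placeholders.

```go
package main

import (
	"fmt"
	"time"
)

// runConfig is a hypothetical stand-in for the package's unexported struct.
type runConfig struct {
	maxIterations int
	timeout       time.Duration
	runID         string
}

// RunOption has the same shape as the documented type.
type RunOption func(*runConfig)

func withMaxIterations(n int) RunOption {
	return func(c *runConfig) { c.maxIterations = n }
}

func withTimeout(d time.Duration) RunOption {
	return func(c *runConfig) { c.timeout = d }
}

func withRunID(id string) RunOption {
	return func(c *runConfig) { c.runID = id }
}

// newRunConfig starts from defaults and applies options in order, so a later
// option overrides an earlier one that sets the same field.
func newRunConfig(opts ...RunOption) runConfig {
	cfg := runConfig{maxIterations: 100} // placeholder default, not the package's
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := newRunConfig(
		withMaxIterations(5),
		withTimeout(30*time.Second),
		withRunID("run-1"),
	)
	fmt.Println(cfg.maxIterations, cfg.timeout, cfg.runID) // prints "5 30s run-1"
}
```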
func WithHost ¶
func WithHost(h engine.Host) RunOption
WithHost installs the engine.Host that the executor hands to nodes via ExecutionContext.Host. When omitted, the executor falls back to engine.NoopHost{} so nodes can call ctx.Host methods unconditionally.
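The fallback described here is the null-object pattern: callers always receive a usable value, so no nil checks are needed at the call site. A minimal sketch follows, assuming a one-method Host interface; the actual method set of engine.Host is not shown in this section, and hostOrNoop and recordingHost are hypothetical names.

```go
package main

import "fmt"

// Host is a hypothetical one-method interface standing in for engine.Host.
type Host interface {
	Log(msg string)
}

// noopHost discards every call, playing the role of engine.NoopHost{}.
type noopHost struct{}

func (noopHost) Log(string) {}

// recordingHost is a sample real implementation that keeps messages.
type recordingHost struct {
	msgs []string
}

func (r *recordingHost) Log(msg string) { r.msgs = append(r.msgs, msg) }

// hostOrNoop returns h when one was supplied and a no-op host otherwise.
func hostOrNoop(h Host) Host {
	if h == nil {
		return noopHost{}
	}
	return h
}

func main() {
	// No host configured: the call is safe and does nothing.
	hostOrNoop(nil).Log("dropped")

	// Host configured: the call reaches it.
	rec := &recordingHost{}
	hostOrNoop(rec).Log("kept")
	fmt.Println(len(rec.msgs)) // prints "1"
}
```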
func WithMaxIterations ¶
func WithMaxIterations(n int) RunOption
func WithMaxNodeRetries ¶
func WithMaxNodeRetries(n int) RunOption
func WithParallel ¶
func WithParallel(cfg ParallelConfig) RunOption
func WithResolver ¶
func WithResolver(r VariableResolver) RunOption
func WithRunID ¶
func WithRunID(id string) RunOption
WithRunID sets the run identifier the executor uses in telemetry and event subjects.