executor

package
v0.2.4
Published: Apr 29, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package executor provides the graph execution engine.

Constants

This section is empty.

Variables

This section is empty.

Functions

func RegisterMergeStrategy

func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)

RegisterMergeStrategy registers a custom merge strategy.

func WithActorKey

func WithActorKey(ctx context.Context, key string) context.Context

Types

type Checkpoint deprecated

type Checkpoint struct {
	GraphName string               `json:"graph_name"`
	RunID     string               `json:"run_id,omitempty"`
	NodeID    string               `json:"node_id"`
	Iteration int                  `json:"iteration"`
	Board     *graph.BoardSnapshot `json:"board"`
	Timestamp time.Time            `json:"timestamp"`
}

Checkpoint represents a snapshot of graph execution state.

Deprecated: use engine.Checkpoint. This type predates the engine.Host.Checkpointer abstraction; the executor now persists checkpoints through the host. Scheduled for removal in v0.3.0 together with WithCheckpointStore. Existing CheckpointStore implementations keep working via storeOnlyHost.
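The struct tags above determine the on-disk JSON shape. The following is a minimal sketch, not the package's own code: `checkpoint` is a local copy of the documented fields, and `boardSnapshot` is a hypothetical stand-in for graph.BoardSnapshot, whose fields are not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// boardSnapshot is a hypothetical stand-in for graph.BoardSnapshot.
type boardSnapshot struct {
	Values map[string]any `json:"values"`
}

// checkpoint mirrors the documented Checkpoint fields and JSON tags.
type checkpoint struct {
	GraphName string         `json:"graph_name"`
	RunID     string         `json:"run_id,omitempty"`
	NodeID    string         `json:"node_id"`
	Iteration int            `json:"iteration"`
	Board     *boardSnapshot `json:"board"`
	Timestamp time.Time      `json:"timestamp"`
}

// encode marshals a checkpoint to its JSON representation.
func encode(cp checkpoint) (string, error) {
	b, err := json.Marshal(cp)
	return string(b), err
}

func main() {
	cp := checkpoint{
		GraphName: "demo",
		NodeID:    "fetch",
		Iteration: 2,
		Timestamp: time.Now().UTC(),
	}
	out, err := encode(cp)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because only run_id carries omitempty, an empty RunID is dropped from the output, while a nil Board is still written as "board":null.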

type CheckpointCleaner

type CheckpointCleaner interface {
	Cleanup(opts CleanupOptions) (deleted int, err error)
}

CheckpointCleaner supports periodic cleanup of old checkpoints.

type CheckpointManager

type CheckpointManager interface {
	CheckpointStore
	List() ([]string, error)
	Delete(graphName string) error
}

CheckpointManager extends CheckpointStore with lifecycle operations. FileCheckpointStore implements this interface.

type CheckpointStore deprecated

type CheckpointStore interface {
	Save(cp Checkpoint) error
	// Load retrieves the latest checkpoint. When runID is non-empty, only
	// checkpoints for that specific run are considered.
	Load(graphName, runID string) (*Checkpoint, error)
}

CheckpointStore is the interface for persisting and loading checkpoints.

Deprecated: implement engine.CheckpointStore (or expose a Host with a real Checkpointer) and pass it via WithHost. The executor now writes every checkpoint through host.Checkpoint; this interface is kept so code already using WithCheckpointStore keeps compiling and is folded into the host path via storeOnlyHost. Scheduled for removal in v0.3.0.
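To illustrate the Save/Load contract, here is a minimal in-memory sketch. It uses local stand-in types rather than importing the package; `memoryStore`, `newMemoryStore`, and the `errNotFound` sentinel are hypothetical names, and the documentation does not say what Load returns when nothing matches.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// checkpoint is a trimmed local stand-in for executor.Checkpoint.
type checkpoint struct {
	GraphName string
	RunID     string
	NodeID    string
	Iteration int
}

// errNotFound is an assumed sentinel for "no matching checkpoint".
var errNotFound = errors.New("checkpoint not found")

// memoryStore keeps every saved checkpoint per graph, in save order.
type memoryStore struct {
	mu      sync.Mutex
	byGraph map[string][]checkpoint
}

func newMemoryStore() *memoryStore {
	return &memoryStore{byGraph: make(map[string][]checkpoint)}
}

// Save appends the checkpoint to its graph's history.
func (s *memoryStore) Save(cp checkpoint) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.byGraph[cp.GraphName] = append(s.byGraph[cp.GraphName], cp)
	return nil
}

// Load returns the most recently saved checkpoint for graphName.
// A non-empty runID restricts the search to that run.
func (s *memoryStore) Load(graphName, runID string) (*checkpoint, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	history := s.byGraph[graphName]
	for i := len(history) - 1; i >= 0; i-- {
		if runID == "" || history[i].RunID == runID {
			cp := history[i] // copy so callers cannot mutate stored state
			return &cp, nil
		}
	}
	return nil, errNotFound
}

func main() {
	store := newMemoryStore()
	_ = store.Save(checkpoint{GraphName: "demo", RunID: "run-1", NodeID: "a", Iteration: 1})
	_ = store.Save(checkpoint{GraphName: "demo", RunID: "run-2", NodeID: "b", Iteration: 1})

	latest, _ := store.Load("demo", "")
	forRun, _ := store.Load("demo", "run-1")
	fmt.Println(latest.NodeID, forRun.NodeID)
}
```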

type CleanupOptions

type CleanupOptions struct {
	MaxAge   time.Duration
	MaxCount int
}

CleanupOptions configures checkpoint cleanup behavior.

type CloneableResolver

type CloneableResolver interface {
	VariableResolver
	Clone() *variable.Resolver
}

CloneableResolver is an optional interface for resolvers that support cloning for use in parallel branches.
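An optional interface like this is typically detected with a type assertion. The sketch below shows that pattern under stated assumptions: `resolver` is a hypothetical stand-in for variable.Resolver, its ResolveMap only copies the map (real reference substitution is not modelled), and `forBranch` is an illustrative helper, not a function of this package.

```go
package main

import "fmt"

// variableResolver mirrors the documented VariableResolver interface.
type variableResolver interface {
	ResolveMap(m map[string]any) (map[string]any, error)
	AddScope(name string, vars map[string]any)
}

// resolver is a hypothetical stand-in for variable.Resolver.
type resolver struct {
	scopes map[string]map[string]any
}

func newResolver() *resolver {
	return &resolver{scopes: make(map[string]map[string]any)}
}

// ResolveMap returns a shallow copy; substitution is out of scope here.
func (r *resolver) ResolveMap(m map[string]any) (map[string]any, error) {
	out := make(map[string]any, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out, nil
}

// AddScope registers a named set of variables.
func (r *resolver) AddScope(name string, vars map[string]any) {
	r.scopes[name] = vars
}

// Clone copies the scope table so a branch can add scopes independently.
func (r *resolver) Clone() *resolver {
	c := newResolver()
	for name, vars := range r.scopes {
		c.scopes[name] = vars
	}
	return c
}

// cloneableResolver mirrors the documented optional interface.
type cloneableResolver interface {
	variableResolver
	Clone() *resolver
}

// forBranch returns a private copy when the resolver supports cloning
// and falls back to the shared instance otherwise.
func forBranch(r variableResolver) variableResolver {
	if c, ok := r.(cloneableResolver); ok {
		return c.Clone()
	}
	return r
}

func main() {
	parent := newResolver()
	parent.AddScope("input", map[string]any{"id": 1})

	branch := forBranch(parent)
	branch.AddScope("branch", map[string]any{"n": 2})

	fmt.Println(len(parent.scopes), len(branch.(*resolver).scopes))
}
```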

type Executor deprecated

type Executor interface {
	Execute(ctx context.Context, g *graph.Graph, board *graph.Board, opts ...RunOption) (*graph.Board, error)
}

Executor is the interface for graph execution engines.

Deprecated: superseded by engine.Engine (satisfied by graph/runner.Runner). The new contract folds run identity, host, and initial board into typed parameters, which lets the agent runtime drive any engine uniformly without reaching into a Strategy layer. Scheduled for removal in v0.3.0; until then, the LocalExecutor that implements this interface is kept as the in-process default that the runner delegates to internally.

type FileCheckpointConfig

type FileCheckpointConfig struct {
	Dir            string
	MaxCheckpoints int
}

FileCheckpointConfig configures the file-based checkpoint store.

type FileCheckpointStore

type FileCheckpointStore struct {
	// contains filtered or unexported fields
}

FileCheckpointStore persists checkpoints as JSON files.

func NewFileCheckpointStore

func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)

NewFileCheckpointStore creates a file-based checkpoint store.

func (*FileCheckpointStore) Cleanup

func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)

Cleanup removes stale checkpoint files.

func (*FileCheckpointStore) Delete

func (s *FileCheckpointStore) Delete(graphName string) error

Delete removes the primary checkpoint and all backups for the given graph.

func (*FileCheckpointStore) List

func (s *FileCheckpointStore) List() ([]string, error)

List returns graph names that have a primary checkpoint file. Primary files match "{name}.checkpoint.json"; backup files have an additional timestamp segment ("{name}.checkpoint.{ts}.json") and are skipped.
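The naming rule can be sketched as a small classifier. This is an illustration of the two documented patterns, not the store's actual code: `classify` and `listPrimary` are hypothetical helpers, the timestamp format is not specified here so any non-empty segment is accepted, and a graph name that itself contains ".checkpoint." would be ambiguous.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	primarySuffix = ".checkpoint"
	backupInfix   = ".checkpoint."
	jsonSuffix    = ".json"
)

// classify extracts the graph name from a checkpoint file name and reports
// whether the file is a primary checkpoint. ok is false when the name
// matches neither documented pattern.
func classify(fileName string) (graphName string, primary, ok bool) {
	if !strings.HasSuffix(fileName, jsonSuffix) {
		return "", false, false
	}
	base := strings.TrimSuffix(fileName, jsonSuffix)

	// Primary: "{name}.checkpoint.json" leaves "{name}.checkpoint".
	if strings.HasSuffix(base, primarySuffix) {
		if name := strings.TrimSuffix(base, primarySuffix); name != "" {
			return name, true, true
		}
	}

	// Backup: "{name}.checkpoint.{ts}.json" leaves "{name}.checkpoint.{ts}".
	idx := strings.LastIndex(base, backupInfix)
	if idx <= 0 || idx+len(backupInfix) == len(base) {
		return "", false, false
	}
	return base[:idx], false, true
}

// listPrimary returns the graph names that have a primary checkpoint file,
// skipping backups and unrelated files.
func listPrimary(fileNames []string) []string {
	var names []string
	for _, f := range fileNames {
		if name, primary, ok := classify(f); ok && primary {
			names = append(names, name)
		}
	}
	return names
}

func main() {
	files := []string{
		"orders.checkpoint.json",
		"orders.checkpoint.1714380000.json",
		"notes.txt",
	}
	fmt.Println(listPrimary(files))
}
```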

func (*FileCheckpointStore) Load

func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)

Load reads the latest checkpoint from disk.

func (*FileCheckpointStore) Save

func (s *FileCheckpointStore) Save(cp Checkpoint) error

Save writes a checkpoint to disk with multi-version retention.

type LocalExecutor

type LocalExecutor struct{}

LocalExecutor is the default single-process executor.

func NewLocalExecutor

func NewLocalExecutor() *LocalExecutor

NewLocalExecutor creates a new LocalExecutor.

func (*LocalExecutor) Execute

func (e *LocalExecutor) Execute(ctx context.Context, g *graph.Graph, board *graph.Board, opts ...RunOption) (*graph.Board, error)

Execute runs the graph from its entry node (or from the start node, when one is set) to END.

type MergeFunc

type MergeFunc func(board *graph.Board, snapshot *graph.BoardSnapshot, results []branchResult) error

MergeFunc merges parallel branch results into the parent board. snapshot is the pre-fork board state (for conflict detection).

type MergeStrategy

type MergeStrategy string

MergeStrategy defines how parallel branch results are merged.

const (
	MergeLastWins        MergeStrategy = "last_wins"
	MergeNamespace       MergeStrategy = "namespace"
	MergeErrorOnConflict MergeStrategy = "error_on_conflict"
)
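The strategy names are keys into a registry of merge functions. The sketch below illustrates one way such a registry can work, using local stand-in types: `board` and `branchResult` are hypothetical simplifications, and the behaviour shown for "last_wins" and "error_on_conflict" is an assumption inferred from the names, not documented semantics. The snapshot argument is accepted but unused in this simplified version.

```go
package main

import (
	"fmt"
	"sync"
)

type mergeStrategy string

// board and branchResult are hypothetical stand-ins for the real types.
type board map[string]any

type branchResult struct {
	Branch string
	Writes map[string]any
}

type mergeFunc func(b, snapshot board, results []branchResult) error

var (
	mu         sync.RWMutex
	strategies = make(map[mergeStrategy]mergeFunc)
)

// registerMergeStrategy stores fn under name, replacing any earlier entry.
func registerMergeStrategy(name mergeStrategy, fn mergeFunc) {
	mu.Lock()
	defer mu.Unlock()
	strategies[name] = fn
}

// merge looks up the named strategy and applies it.
func merge(name mergeStrategy, b, snapshot board, results []branchResult) error {
	mu.RLock()
	fn, ok := strategies[name]
	mu.RUnlock()
	if !ok {
		return fmt.Errorf("unknown merge strategy %q", name)
	}
	return fn(b, snapshot, results)
}

// lastWins applies branch writes in order, so later branches overwrite
// earlier ones (assumed meaning of "last_wins").
func lastWins(b, _ board, results []branchResult) error {
	for _, r := range results {
		for k, v := range r.Writes {
			b[k] = v
		}
	}
	return nil
}

// errorOnConflict rejects the merge when two branches write the same key
// (assumed meaning of "error_on_conflict"). Nothing is applied on failure.
func errorOnConflict(b, _ board, results []branchResult) error {
	owner := make(map[string]string)
	for _, r := range results {
		for k := range r.Writes {
			if prev, seen := owner[k]; seen && prev != r.Branch {
				return fmt.Errorf("key %q written by %s and %s", k, prev, r.Branch)
			}
			owner[k] = r.Branch
		}
	}
	return lastWins(b, nil, results)
}

func main() {
	registerMergeStrategy("last_wins", lastWins)
	registerMergeStrategy("error_on_conflict", errorOnConflict)

	results := []branchResult{
		{Branch: "a", Writes: map[string]any{"x": 1}},
		{Branch: "b", Writes: map[string]any{"x": 2}},
	}
	b := board{}
	fmt.Println(merge("last_wins", b, nil, results), b["x"])
	fmt.Println(merge("error_on_conflict", board{}, nil, results))
}
```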

type ParallelConfig

type ParallelConfig struct {
	Enabled       bool          `json:"enabled"`
	MaxBranches   int           `json:"max_branches"`
	MaxNesting    int           `json:"max_nesting"`
	MergeStrategy MergeStrategy `json:"merge_strategy"`
}

ParallelConfig configures parallel fork/join execution.

type RunOption

type RunOption func(*runConfig)

RunOption configures a single graph execution run.
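RunOption follows Go's functional options pattern: each option is a closure that mutates a private configuration struct. The sketch below shows the mechanics with local stand-ins; the fields of `runConfig`, the default value, and `newRunConfig` are hypothetical, since the real runConfig is unexported and not documented here.

```go
package main

import "fmt"

// runConfig is a hypothetical stand-in for the package's unexported config.
type runConfig struct {
	maxIterations  int
	maxNodeRetries int
	startNode      string
}

// runOption mirrors the documented RunOption shape.
type runOption func(*runConfig)

func withMaxIterations(n int) runOption {
	return func(c *runConfig) { c.maxIterations = n }
}

func withMaxNodeRetries(n int) runOption {
	return func(c *runConfig) { c.maxNodeRetries = n }
}

func withStartNode(id string) runOption {
	return func(c *runConfig) { c.startNode = id }
}

// newRunConfig starts from defaults and applies options in order, so a
// later option overrides an earlier one that sets the same field.
func newRunConfig(opts ...runOption) runConfig {
	cfg := runConfig{maxIterations: 100} // assumed default, for illustration
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	return cfg
}

func main() {
	cfg := newRunConfig(
		withMaxIterations(10),
		withStartNode("fetch"),
		withMaxIterations(25), // overrides the earlier value
	)
	fmt.Printf("%+v\n", cfg)
}
```

The pattern keeps Execute's signature stable as options are added or deprecated, which is why the deprecated options in this package can remain callable until removal.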

func WithCheckpointStore deprecated

func WithCheckpointStore(s CheckpointStore) RunOption

WithCheckpointStore installs a graph-format CheckpointStore that persists a checkpoint after every node completes.

Deprecated: prefer WithHost with a host whose Checkpointer wraps an engine.CheckpointStore. The executor now writes every checkpoint through host.Checkpoint, and the store passed here is folded into the host via storeOnlyHost (see resolveCheckpointHost). When both WithHost and WithCheckpointStore are supplied, the host wins and the store is silently ignored: checkpointing is state, so it is not fanned out the way events are. Scheduled for removal in v0.3.0.
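The precedence rule can be sketched as a three-way resolution: an explicit host wins, a bare store is wrapped in an adapter, and otherwise a no-op host is used. Everything below is a local illustration: the `host` and `checkpointStore` method shapes are simplified assumptions, and this `resolveCheckpointHost` only borrows the name mentioned above without claiming to match its real signature.

```go
package main

import "fmt"

// host and checkpointStore use simplified, assumed method shapes.
type host interface {
	Checkpoint(nodeID string) error
}

type checkpointStore interface {
	Save(nodeID string) error
}

// noopHost accepts every call and does nothing.
type noopHost struct{}

func (noopHost) Checkpoint(string) error { return nil }

// storeOnlyHost adapts a legacy store to the host interface.
type storeOnlyHost struct{ store checkpointStore }

func (h storeOnlyHost) Checkpoint(nodeID string) error { return h.store.Save(nodeID) }

// recordingHost and recordingStore record calls for demonstration.
type recordingHost struct{ seen []string }

func (h *recordingHost) Checkpoint(nodeID string) error {
	h.seen = append(h.seen, nodeID)
	return nil
}

type recordingStore struct{ saved []string }

func (s *recordingStore) Save(nodeID string) error {
	s.saved = append(s.saved, nodeID)
	return nil
}

// resolveCheckpointHost picks the single destination for checkpoints.
// Callers must pass a nil interface, not a typed nil pointer, to mean "unset".
func resolveCheckpointHost(h host, s checkpointStore) host {
	switch {
	case h != nil:
		return h // host wins; the store is ignored
	case s != nil:
		return storeOnlyHost{store: s}
	default:
		return noopHost{}
	}
}

func main() {
	h, s := &recordingHost{}, &recordingStore{}

	_ = resolveCheckpointHost(h, s).Checkpoint("n1")   // goes to the host only
	_ = resolveCheckpointHost(nil, s).Checkpoint("n2") // goes to the store
	_ = resolveCheckpointHost(nil, nil).Checkpoint("n3")

	fmt.Println(h.seen, s.saved)
}
```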

func WithEventBus deprecated

func WithEventBus(bus event.Bus) RunOption

WithEventBus installs the event bus used for graph- and node-level envelopes.

Deprecated: pass an engine.Host to WithHost instead. The executor now publishes envelopes through host.Publish, which lets the host centralise routing, fan-out and observability. Scheduled for removal in v0.3.0 alongside the other host-overlapping options.

func WithHost

func WithHost(h engine.Host) RunOption

WithHost installs the engine.Host the executor will hand to nodes via ExecutionContext.Host. When omitted the executor falls back to engine.NoopHost{} so nodes can call ctx.Host methods unconditionally.

WithHost subsumes WithEventBus and WithCheckpointStore: when a host is supplied together with either option, the host wins for publishing and checkpointing, while the bus and store remain available for legacy code paths during the transition.

func WithMaxIterations

func WithMaxIterations(n int) RunOption

func WithMaxNodeRetries

func WithMaxNodeRetries(n int) RunOption

func WithParallel

func WithParallel(cfg ParallelConfig) RunOption

func WithResolver

func WithResolver(r VariableResolver) RunOption

func WithRunID deprecated

func WithRunID(id string) RunOption

WithRunID sets the run identifier the executor uses in telemetry and event subjects.

Deprecated: pass engine.Run.ID to engine.Engine.Execute (i.e. via agent.Run, which mints and forwards it for you). When invoking graph/runner.Runner directly through engine.Engine.Execute, the run.ID parameter is the canonical source. Scheduled for removal in v0.3.0 once executor.Executor is removed.

func WithStartNode

func WithStartNode(id string) RunOption

func WithStreamCallback deprecated

func WithStreamCallback(cb graph.StreamCallback) RunOption

WithStreamCallback installs a legacy stream callback receiving every node delta.

Deprecated: subscribe to engine.Host's event bus, or read ExecutionContext.Publisher inside a node, instead. Scheduled for removal in v0.3.0.

func WithTimeout

func WithTimeout(d time.Duration) RunOption

type VariableResolver

type VariableResolver interface {
	ResolveMap(m map[string]any) (map[string]any, error)
	AddScope(name string, vars map[string]any)
}

VariableResolver resolves variable references in node configs.
