executor

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2026 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package executor provides the graph execution engine.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Assemble

func Assemble(compiled *compiler.CompiledGraph, factory *node.Factory) (*graph.Graph, error)

Assemble takes a CompiledGraph (static analysis result) and a Factory, then constructs all real node instances and returns an immutable executable Graph.

func RegisterMergeStrategy

func RegisterMergeStrategy(name MergeStrategy, fn MergeFunc)

RegisterMergeStrategy registers a custom merge strategy.

func WithActorKey

func WithActorKey(ctx context.Context, key string) context.Context

Types

type Checkpoint

type Checkpoint struct {
	GraphName string               `json:"graph_name"`
	RunID     string               `json:"run_id,omitempty"`
	NodeID    string               `json:"node_id"`
	Iteration int                  `json:"iteration"`
	Board     *graph.BoardSnapshot `json:"board"`
	Timestamp time.Time            `json:"timestamp"`
}

Checkpoint represents a snapshot of graph execution state.

type CheckpointCleaner

type CheckpointCleaner interface {
	Cleanup(opts CleanupOptions) (deleted int, err error)
}

CheckpointCleaner supports periodic cleanup of old checkpoints.

type CheckpointManager

type CheckpointManager interface {
	CheckpointStore
	List() ([]string, error)
	Delete(graphName string) error
}

CheckpointManager extends CheckpointStore with lifecycle operations. FileCheckpointStore implements this interface.

type CheckpointStore

type CheckpointStore interface {
	Save(cp Checkpoint) error
	// Load retrieves the latest checkpoint. When runID is non-empty, only
	// checkpoints for that specific run are considered.
	Load(graphName, runID string) (*Checkpoint, error)
}

CheckpointStore is the interface for persisting and loading checkpoints.

type CleanupOptions

type CleanupOptions struct {
	MaxAge   time.Duration
	MaxCount int
}

CleanupOptions configures checkpoint cleanup behavior.

type CloneableResolver

type CloneableResolver interface {
	VariableResolver
	Clone() *variable.Resolver
}

CloneableResolver is an optional interface for resolvers that support cloning for use in parallel branches.

type Executor

type Executor interface {
	Execute(ctx context.Context, g *graph.Graph, board *graph.Board, opts ...RunOption) (*graph.Board, error)
}

Executor is the interface for graph execution engines.

type FileCheckpointConfig

type FileCheckpointConfig struct {
	Dir            string
	MaxCheckpoints int
}

FileCheckpointConfig configures the file-based checkpoint store.

type FileCheckpointStore

type FileCheckpointStore struct {
	// contains filtered or unexported fields
}

FileCheckpointStore persists checkpoints as JSON files.

func NewFileCheckpointStore

func NewFileCheckpointStore(cfg FileCheckpointConfig) (*FileCheckpointStore, error)

NewFileCheckpointStore creates a file-based checkpoint store.

func (*FileCheckpointStore) Cleanup

func (s *FileCheckpointStore) Cleanup(opts CleanupOptions) (int, error)

Cleanup removes stale checkpoint files.

func (*FileCheckpointStore) Delete

func (s *FileCheckpointStore) Delete(graphName string) error

Delete removes the primary checkpoint and all backups for the given graph.

func (*FileCheckpointStore) List

func (s *FileCheckpointStore) List() ([]string, error)

List returns graph names that have a primary checkpoint file. Primary files match "{name}.checkpoint.json"; backup files have an additional timestamp segment ("{name}.checkpoint.{ts}.json") and are skipped.

func (*FileCheckpointStore) Load

func (s *FileCheckpointStore) Load(graphName, runID string) (*Checkpoint, error)

Load reads the latest checkpoint from disk.

func (*FileCheckpointStore) Save

func (s *FileCheckpointStore) Save(cp Checkpoint) error

Save writes a checkpoint to disk with multi-version retention.

type LocalExecutor

type LocalExecutor struct{}

LocalExecutor is the default single-process executor.

func NewLocalExecutor

func NewLocalExecutor() *LocalExecutor

NewLocalExecutor creates a new LocalExecutor.

func (*LocalExecutor) Execute

func (e *LocalExecutor) Execute(ctx context.Context, g *graph.Graph, board *graph.Board, opts ...RunOption) (*graph.Board, error)

Execute runs the graph from entry (or startNode) to END.

type MergeFunc

type MergeFunc func(board *graph.Board, snapshot *graph.BoardSnapshot, results []branchResult) error

MergeFunc merges parallel branch results into the parent board. snapshot is the pre-fork board state (for conflict detection).

type MergeStrategy

type MergeStrategy string

MergeStrategy defines how parallel branch results are merged.

const (
	MergeLastWins        MergeStrategy = "last_wins"
	MergeNamespace       MergeStrategy = "namespace"
	MergeErrorOnConflict MergeStrategy = "error_on_conflict"
)

type ParallelConfig

type ParallelConfig struct {
	Enabled       bool          `json:"enabled"`
	MaxBranches   int           `json:"max_branches"`
	MaxNesting    int           `json:"max_nesting"`
	MergeStrategy MergeStrategy `json:"merge_strategy"`
}

ParallelConfig configures parallel fork/join execution.

type RunOption

type RunOption func(*runConfig)

RunOption configures a single graph execution run.

func WithCheckpointStore

func WithCheckpointStore(s CheckpointStore) RunOption

func WithEventBus

func WithEventBus(bus event.EventBus) RunOption

func WithMaxIterations

func WithMaxIterations(n int) RunOption

func WithMaxNodeRetries

func WithMaxNodeRetries(n int) RunOption

func WithParallel

func WithParallel(cfg ParallelConfig) RunOption

func WithResolver

func WithResolver(r VariableResolver) RunOption

func WithRunID

func WithRunID(id string) RunOption

func WithStartNode

func WithStartNode(id string) RunOption

func WithStreamCallback

func WithStreamCallback(cb graph.StreamCallback) RunOption

func WithTimeout

func WithTimeout(d time.Duration) RunOption

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner is a lightweight, concurrency-safe graph executor.

It caches the CompiledGraph (static analysis result) and re-assembles fresh Node instances on every Run call, so concurrent callers never share mutable node state.

func NewRunner

func NewRunner(def *graph.GraphDefinition, factory *node.Factory, opts ...RunnerOption) (*Runner, error)

NewRunner compiles a GraphDefinition and returns a ready-to-use Runner. The factory provides runtime dependencies (LLM resolver, tool registry, etc.) needed to instantiate nodes.

func (*Runner) Bus

func (r *Runner) Bus() event.EventBus

Bus returns the configured EventBus for external subscription.

func (*Runner) Graph

func (r *Runner) Graph() (*graph.Graph, error)

Graph returns a freshly assembled Graph snapshot for inspection. Intended for testing and debugging, not for execution.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context, vars map[string]any, opts ...RunOption) (*graph.Board, error)

Run assembles a fresh graph, populates a Board from vars, and executes. Safe for concurrent use — each call gets independent node instances.

type RunnerOption

type RunnerOption func(*Runner)

RunnerOption configures a Runner.

func WithRunnerEventBus

func WithRunnerEventBus(bus event.EventBus) RunnerOption

WithRunnerEventBus sets the EventBus used for graph lifecycle events. Defaults to event.NoopBus{}.

func WithRunnerExecutor

func WithRunnerExecutor(e Executor) RunnerOption

WithRunnerExecutor overrides the default LocalExecutor.

type VariableResolver

type VariableResolver interface {
	ResolveMap(m map[string]any) (map[string]any, error)
	AddScope(name string, vars map[string]any)
}

VariableResolver resolves variable references in node configs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL