Documentation
¶
Index ¶
- type Edge
- type Engine
- func (e *Engine[S]) Add(nodeID string, node Node[S]) error
- func (e *Engine[S]) Connect(from, to string, predicate Predicate[S]) error
- func (e *Engine[S]) ResumeFromCheckpoint(ctx context.Context, cpID string, newRunID string, startNode string) (S, error)
- func (e *Engine[S]) Run(ctx context.Context, runID string, initial S) (S, error)
- func (e *Engine[S]) SaveCheckpoint(ctx context.Context, runID string, cpID string) error
- func (e *Engine[S]) StartAt(nodeID string) error
- type EngineError
- type Next
- type Node
- type NodeError
- type NodeFunc
- type NodeResult
- type Options
- type Predicate
- type Reducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Edge ¶
type Edge[S any] struct { // From is the source node ID. From string // To is the destination node ID. To string // When is an optional predicate that determines if this edge should be traversed. // If nil, the edge is unconditional (always traverse). // If non-nil, the edge is only traversed when When(state) returns true. When Predicate[S] }
Edge represents a connection between two nodes in the workflow graph.
Edges define the control flow between nodes. They can be:
- Unconditional: Always traverse (When = nil)
- Conditional: Only traverse if predicate returns true (When != nil)
Edges are used during graph construction to define possible transitions. At runtime, the Engine evaluates predicates to determine which edge to follow.
For explicit routing, nodes can return Next in NodeResult which overrides edge-based routing.
Type parameter S is the state type used for predicate evaluation.
type Engine ¶
type Engine[S any] struct { // contains filtered or unexported fields }
Engine orchestrates stateful workflow execution with checkpointing support.
The Engine is the core runtime that:
- Manages workflow graph topology (nodes and edges)
- Executes nodes in sequence or parallel
- Merges state updates via the reducer
- Persists state at each step via the store
- Emits observability events via the emitter
- Enforces execution limits (MaxSteps, Retries)
- Supports checkpoint save/resume
Type parameter S is the state type shared across the workflow.
Example:
reducer := func(prev, delta MyState) MyState {
if delta.Query != "" {
prev.Query = delta.Query
}
prev.Steps++
return prev
}
store := store.NewMemStore[MyState]()
emitter := emit.NewLogEmitter()
opts := Options{MaxSteps: 100}
engine := New(reducer, store, emitter, opts)
engine.Add("process", processNode)
engine.StartAt("process")
final, err := engine.Run(ctx, "run-001", MyState{Query: "hello"})
func New ¶
func New[S any](reducer Reducer[S], st store.Store[S], emitter emit.Emitter, opts Options) *Engine[S]
New creates a new Engine with the given configuration.
Parameters:
- reducer: Function to merge partial state updates (required for Run)
- store: Persistence backend for state and checkpoints (required for Run)
- emitter: Observability event receiver (optional, can be nil)
- opts: Execution configuration (MaxSteps, Retries)
The constructor does not validate all parameters to allow flexible initialization. Validation occurs when Run() is called.
Example:
engine := New(
myReducer,
store.NewMemStore[MyState](),
emit.NewLogEmitter(),
Options{MaxSteps: 100, Retries: 3},
)
func (*Engine[S]) Add ¶
Add registers a node in the workflow graph.
Nodes must be added before calling StartAt or Run. Node IDs must be unique within the workflow.
Parameters:
- nodeID: Unique identifier for this node (cannot be empty)
- node: Node implementation (cannot be nil)
Returns error if:
- nodeID is empty
- node is nil
- a node with this ID already exists
Example:
processNode := NodeFunc[MyState](func(ctx context.Context, s MyState) NodeResult[MyState] {
return NodeResult[MyState]{
Delta: MyState{Result: "processed"},
Route: Stop(),
}
})
err := engine.Add("process", processNode)
func (*Engine[S]) Connect ¶
Connect creates an edge between two nodes.
Edges define possible transitions in the workflow graph. They can be:
- Unconditional: Always traverse (predicate = nil)
- Conditional: Only traverse if predicate returns true
Node explicit routing via NodeResult.Route takes precedence over edges.
Parameters:
- from: Source node ID (cannot be empty)
- to: Destination node ID (cannot be empty)
- predicate: Optional condition for traversal (nil = unconditional)
Returns error if:
- from or to is empty
Note: Node existence is not validated (lazy validation) to allow flexible graph construction order.
Example:
// Unconditional edge
engine.Connect("nodeA", "nodeB", nil)
// Conditional edge
engine.Connect("router", "pathA", func(s MyState) bool {
return s.Score > 0.8
})
func (*Engine[S]) ResumeFromCheckpoint ¶
func (e *Engine[S]) ResumeFromCheckpoint(ctx context.Context, cpID string, newRunID string, startNode string) (S, error)
ResumeFromCheckpoint resumes workflow execution from a saved checkpoint.
This enables:
- Crash recovery (save checkpoints, resume after failure)
- Branching workflows (checkpoint, try path A, resume from checkpoint, try path B)
- Manual intervention (pause at checkpoint, human review, resume)
- A/B testing (checkpoint before experiment, resume multiple times with variants)
The resume operation:
- Loads the checkpoint state
- Starts a new workflow run with the checkpoint state as initial state
- Begins execution at the specified node (typically the next node after checkpoint)
- Continues until workflow completes or errors
Parameters:
- ctx: Context for cancellation
- cpID: Checkpoint identifier to resume from
- newRunID: New unique run ID for the resumed execution
- startNode: Node to begin execution at (typically next node after checkpoint)
Returns:
- Final state after resumed execution completes
- Error if checkpoint doesn't exist, startNode invalid, or execution fails
Example:
// Original run with checkpoint _, _ = engine.Run(ctx, "run-001", initial) _ = engine.SaveCheckpoint(ctx, "run-001", "after-validation") // Resume from checkpoint (e.g., after crash or for A/B test) finalA, _ := engine.ResumeFromCheckpoint(ctx, "after-validation", "run-002-pathA", "pathA") finalB, _ := engine.ResumeFromCheckpoint(ctx, "after-validation", "run-003-pathB", "pathB")
func (*Engine[S]) Run ¶
Run executes the workflow from start to completion or error.
Workflow execution:
- Validates engine configuration (reducer, store, startNode)
- Initializes state with initial value
- Executes nodes starting from startNode
- Follows routing decisions (Stop, Goto, Many)
- Applies reducer to merge state updates
- Persists state after each node
- Emits observability events
- Enforces MaxSteps limit
- Respects context cancellation
Parameters:
- ctx: Context for cancellation and request-scoped values
- runID: Unique identifier for this workflow execution
- initial: Starting state value
Returns:
- Final state after workflow completion
- Error if validation fails, node execution fails, or limits exceeded
Example:
ctx := context.Background()
final, err := engine.Run(ctx, "run-001", MyState{Query: "hello"})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Final state: %+v\n", final)
func (*Engine[S]) SaveCheckpoint ¶
SaveCheckpoint creates a named checkpoint for the most recent state of a run.
Checkpoints enable:
- Branching workflows (save checkpoint, try different paths)
- Manual resumption points
- Rollback to known-good states
- A/B testing (checkpoint before experiment)
The checkpoint captures the latest persisted state from the specified run. Multiple checkpoints can be created for the same run with different labels.
Parameters:
- ctx: Context for cancellation
- runID: The workflow run to checkpoint
- cpID: Unique identifier for this checkpoint (e.g., "after-validation", "before-deploy")
Returns error if:
- runID doesn't exist or has no persisted state
- Store operation fails
Example:
// Run workflow
final, _ := engine.Run(ctx, "run-001", initial)
// Save checkpoint at completion
err := engine.SaveCheckpoint(ctx, "run-001", "before-deploy")
if err != nil {
log.Fatal(err)
}
// Later, can resume from this checkpoint
resumed, _ := engine.ResumeFromCheckpoint(ctx, "before-deploy", "run-002")
func (*Engine[S]) StartAt ¶
StartAt sets the entry point for workflow execution.
The start node is executed first when Run() is called. The node must have been registered via Add() before calling StartAt.
Parameters:
- nodeID: ID of the node to start execution at
Returns error if:
- nodeID is empty
- node with this ID doesn't exist
Example:
engine.Add("start", startNode)
engine.StartAt("start")
type EngineError ¶
EngineError represents an error from Engine operations.
func (*EngineError) Error ¶
func (e *EngineError) Error() string
type Next ¶
type Next struct {
// To specifies the next single node to execute.
// Mutually exclusive with Many and Terminal.
To string
// Many specifies multiple nodes to execute in parallel (fan-out).
// Mutually exclusive with To and Terminal.
Many []string
// Terminal indicates workflow execution should stop.
// Mutually exclusive with To and Many.
Terminal bool
}
Next specifies the next step(s) in workflow execution after a node completes.
It supports three routing modes:
- Terminal: Stop execution (Route.Terminal = true)
- Single: Go to a specific node (Route.To = "nodeID")
- Fan-out: Go to multiple nodes in parallel (Route.Many = []string{"node1", "node2"})
type Node ¶
type Node[S any] interface { // Run executes the node's logic with the given context and state. // It returns a NodeResult containing state changes, routing decisions, // events, and any errors encountered. Run(ctx context.Context, state S) NodeResult[S] }
Node represents a processing unit in the workflow graph. It receives state of type S, performs computation, and returns a NodeResult.
Nodes are the fundamental building blocks of LangGraph workflows. Each node can:
- Access the current state
- Perform computation (call LLMs, tools, or custom logic)
- Return state modifications via Delta
- Control routing via Route
- Emit observability events
- Handle errors
Type parameter S is the state type shared across the workflow.
type NodeError ¶
type NodeError struct {
// Message is the human-readable error description.
Message string
// Code is a machine-readable error code for programmatic handling.
Code string
// NodeID identifies which node produced this error.
NodeID string
// Cause is the underlying error that caused this NodeError.
Cause error
}
NodeError represents an error that occurred during node execution. It provides structured error information for better observability and debugging.
type NodeFunc ¶
type NodeFunc[S any] func(ctx context.Context, state S) NodeResult[S]
NodeFunc is a function adapter that implements the Node interface. It allows using plain functions as nodes without creating custom types.
Example:
processNode := NodeFunc[MyState](func(ctx context.Context, s MyState) NodeResult[MyState] {
return NodeResult[MyState]{
Delta: MyState{Result: "processed"},
Route: Stop(),
}
})
type NodeResult ¶
type NodeResult[S any] struct { // Delta is the partial state update produced by this node. // It will be merged with the current state using the configured reducer. Delta S // Route specifies the next step(s) in workflow execution. // Use Stop() for terminal nodes, Goto(id) for explicit routing, // or set Many for fan-out to multiple nodes. Route Next // Err contains any error that occurred during node execution. // Non-nil errors halt the workflow unless custom error handling is implemented. Err error }
NodeResult represents the output of a node execution.
It contains all information needed to continue workflow execution:
- Delta: Partial state update to be merged via reducer
- Route: Next hop(s) for execution flow
- Events: Observability events emitted during execution
- Err: Node-level error (if any)
type Options ¶
type Options struct {
// MaxSteps limits workflow execution to prevent infinite loops.
// If 0, no limit is enforced (use with caution).
//
// Workflow loops (A → B → A) are fully supported. Use MaxSteps to prevent
// infinite loops when a conditional exit is missing or misconfigured.
//
// Loop patterns:
// 1. Node-based conditional loop:
// nodeA returns Goto("B") if shouldContinue(state), else Stop()
// 2. Edge predicate loop:
// Connect("A", "B", loopPredicate)
// Connect("A", "exit", exitPredicate)
//
// Recommended values:
// - Simple workflows (3-5 nodes): MaxSteps = 20
// - Workflows with loops: MaxSteps = depth × max_iterations (e.g., 5 nodes × 10 iterations = 50)
// - Complex multi-loop workflows: MaxSteps = 100-200
//
// When MaxSteps is exceeded, Run() returns EngineError with code "MAX_STEPS_EXCEEDED".
MaxSteps int
// Retries specifies how many times to retry a node on transient errors.
// If 0, nodes are not retried.
// Transient errors are identified by checking if error implements Retryable interface.
Retries int
}
Options configures Engine execution behavior.
Zero values are valid - the Engine will use sensible defaults.
type Predicate ¶
Predicate is a function that evaluates state to determine if an edge should be traversed.
Predicates enable conditional routing based on workflow state. They should be pure functions (deterministic, no side effects).
Common patterns:
- Threshold: state.Score > 0.8
- Presence: state.Result != ""
- Boolean flag: state.IsReady
- Complex logic: state.Retries < 3 && state.Error == nil
Type parameter S is the state type to evaluate.
type Reducer ¶
type Reducer[S any] func(prev, delta S) S
Reducer is a function that merges a partial state update (delta) into the previous state.
Reducers are responsible for deterministic state composition, enabling:
- Sequential state updates across workflow nodes
- Parallel branch result merging
- Checkpoint-based state reconstruction
The reducer must be:
- **Pure**: Same inputs always produce same output
- **Deterministic**: No randomness or side effects
- **Commutative** (for parallel merges): Order of deltas shouldn't matter for correctness
Example:
reducer := func(prev, delta MyState) MyState {
if delta.Query != "" {
prev.Query = delta.Query // Last write wins
}
prev.Counter += delta.Counter // Accumulate
return prev
}