graph

package
v0.1.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2025 License: MIT Imports: 7 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Edge

type Edge[S any] struct {
	// From is the source node ID.
	From string

	// To is the destination node ID.
	To string

	// When is an optional predicate that determines if this edge should be traversed.
	// If nil, the edge is unconditional (always traverse).
	// If non-nil, the edge is only traversed when When(state) returns true.
	When Predicate[S]
}

Edge represents a connection between two nodes in the workflow graph.

Edges define the control flow between nodes. They can be:

  • Unconditional: Always traverse (When = nil)
  • Conditional: Only traverse if predicate returns true (When != nil)

Edges are used during graph construction to define possible transitions. At runtime, the Engine evaluates predicates to determine which edge to follow.

For explicit routing, nodes can return Next in NodeResult which overrides edge-based routing.

Type parameter S is the state type used for predicate evaluation.

type Engine

type Engine[S any] struct {
	// contains filtered or unexported fields
}

Engine orchestrates stateful workflow execution with checkpointing support.

The Engine is the core runtime that:

  • Manages workflow graph topology (nodes and edges)
  • Executes nodes in sequence or parallel
  • Merges state updates via the reducer
  • Persists state at each step via the store
  • Emits observability events via the emitter
  • Enforces execution limits (MaxSteps, Retries)
  • Supports checkpoint save/resume

Type parameter S is the state type shared across the workflow.

Example:

reducer := func(prev, delta MyState) MyState {
    if delta.Query != "" {
        prev.Query = delta.Query
    }
    prev.Steps++
    return prev
}

store := store.NewMemStore[MyState]()
emitter := emit.NewLogEmitter()
opts := Options{MaxSteps: 100}

engine := New(reducer, store, emitter, opts)
engine.Add("process", processNode)
engine.StartAt("process")

final, err := engine.Run(ctx, "run-001", MyState{Query: "hello"})

func New

func New[S any](reducer Reducer[S], st store.Store[S], emitter emit.Emitter, opts Options) *Engine[S]

New creates a new Engine with the given configuration.

Parameters:

  • reducer: Function to merge partial state updates (required for Run)
  • store: Persistence backend for state and checkpoints (required for Run)
  • emitter: Observability event receiver (optional, can be nil)
  • opts: Execution configuration (MaxSteps, Retries)

The constructor does not validate all parameters to allow flexible initialization. Validation occurs when Run() is called.

Example:

engine := New(
    myReducer,
    store.NewMemStore[MyState](),
    emit.NewLogEmitter(),
    Options{MaxSteps: 100, Retries: 3},
)

func (*Engine[S]) Add

func (e *Engine[S]) Add(nodeID string, node Node[S]) error

Add registers a node in the workflow graph.

Nodes must be added before calling StartAt or Run. Node IDs must be unique within the workflow.

Parameters:

  • nodeID: Unique identifier for this node (cannot be empty)
  • node: Node implementation (cannot be nil)

Returns error if:

  • nodeID is empty
  • node is nil
  • a node with this ID already exists

Example:

processNode := NodeFunc[MyState](func(ctx context.Context, s MyState) NodeResult[MyState] {
    return NodeResult[MyState]{
        Delta: MyState{Result: "processed"},
        Route: Stop(),
    }
})

err := engine.Add("process", processNode)

func (*Engine[S]) Connect

func (e *Engine[S]) Connect(from, to string, predicate Predicate[S]) error

Connect creates an edge between two nodes.

Edges define possible transitions in the workflow graph. They can be:

  • Unconditional: Always traverse (predicate = nil)
  • Conditional: Only traverse if predicate returns true

Node explicit routing via NodeResult.Route takes precedence over edges.

Parameters:

  • from: Source node ID (cannot be empty)
  • to: Destination node ID (cannot be empty)
  • predicate: Optional condition for traversal (nil = unconditional)

Returns error if:

  • from or to is empty

Note: Node existence is not validated (lazy validation) to allow flexible graph construction order.

Example:

// Unconditional edge
engine.Connect("nodeA", "nodeB", nil)

// Conditional edge
engine.Connect("router", "pathA", func(s MyState) bool {
    return s.Score > 0.8
})

func (*Engine[S]) ResumeFromCheckpoint

func (e *Engine[S]) ResumeFromCheckpoint(ctx context.Context, cpID string, newRunID string, startNode string) (S, error)

ResumeFromCheckpoint resumes workflow execution from a saved checkpoint.

This enables:

  • Crash recovery (save checkpoints, resume after failure)
  • Branching workflows (checkpoint, try path A, resume from checkpoint, try path B)
  • Manual intervention (pause at checkpoint, human review, resume)
  • A/B testing (checkpoint before experiment, resume multiple times with variants)

The resume operation:

  1. Loads the checkpoint state
  2. Starts a new workflow run with the checkpoint state as initial state
  3. Begins execution at the specified node (typically the next node after checkpoint)
  4. Continues until workflow completes or errors

Parameters:

  • ctx: Context for cancellation
  • cpID: Checkpoint identifier to resume from
  • newRunID: New unique run ID for the resumed execution
  • startNode: Node to begin execution at (typically next node after checkpoint)

Returns:

  • Final state after resumed execution completes
  • Error if checkpoint doesn't exist, startNode invalid, or execution fails

Example:

// Original run with checkpoint
_, _ = engine.Run(ctx, "run-001", initial)
_ = engine.SaveCheckpoint(ctx, "run-001", "after-validation")

// Resume from checkpoint (e.g., after crash or for A/B test)
finalA, _ := engine.ResumeFromCheckpoint(ctx, "after-validation", "run-002-pathA", "pathA")
finalB, _ := engine.ResumeFromCheckpoint(ctx, "after-validation", "run-003-pathB", "pathB")

func (*Engine[S]) Run

func (e *Engine[S]) Run(ctx context.Context, runID string, initial S) (S, error)

Run executes the workflow from start to completion or error.

Workflow execution:

  1. Validates engine configuration (reducer, store, startNode)
  2. Initializes state with initial value
  3. Executes nodes starting from startNode
  4. Follows routing decisions (Stop, Goto, Many)
  5. Applies reducer to merge state updates
  6. Persists state after each node
  7. Emits observability events
  8. Enforces MaxSteps limit
  9. Respects context cancellation

Parameters:

  • ctx: Context for cancellation and request-scoped values
  • runID: Unique identifier for this workflow execution
  • initial: Starting state value

Returns:

  • Final state after workflow completion
  • Error if validation fails, node execution fails, or limits exceeded

Example:

ctx := context.Background()
final, err := engine.Run(ctx, "run-001", MyState{Query: "hello"})
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Final state: %+v\n", final)

func (*Engine[S]) SaveCheckpoint

func (e *Engine[S]) SaveCheckpoint(ctx context.Context, runID string, cpID string) error

SaveCheckpoint creates a named checkpoint for the most recent state of a run.

Checkpoints enable:

  • Branching workflows (save checkpoint, try different paths)
  • Manual resumption points
  • Rollback to known-good states
  • A/B testing (checkpoint before experiment)

The checkpoint captures the latest persisted state from the specified run. Multiple checkpoints can be created for the same run with different labels.

Parameters:

  • ctx: Context for cancellation
  • runID: The workflow run to checkpoint
  • cpID: Unique identifier for this checkpoint (e.g., "after-validation", "before-deploy")

Returns error if:

  • runID doesn't exist or has no persisted state
  • Store operation fails

Example:

// Run workflow
final, _ := engine.Run(ctx, "run-001", initial)

// Save checkpoint at completion
err := engine.SaveCheckpoint(ctx, "run-001", "before-deploy")
if err != nil {
    log.Fatal(err)
}

// Later, can resume from this checkpoint
resumed, _ := engine.ResumeFromCheckpoint(ctx, "before-deploy", "run-002")

func (*Engine[S]) StartAt

func (e *Engine[S]) StartAt(nodeID string) error

StartAt sets the entry point for workflow execution.

The start node is executed first when Run() is called. The node must have been registered via Add() before calling StartAt.

Parameters:

  • nodeID: ID of the node to start execution at

Returns error if:

  • nodeID is empty
  • node with this ID doesn't exist

Example:

engine.Add("start", startNode)
engine.StartAt("start")

type EngineError

type EngineError struct {
	Message string
	Code    string
}

EngineError represents an error from Engine operations.

func (*EngineError) Error

func (e *EngineError) Error() string

type Next

type Next struct {
	// To specifies the next single node to execute.
	// Mutually exclusive with Many and Terminal.
	To string

	// Many specifies multiple nodes to execute in parallel (fan-out).
	// Mutually exclusive with To and Terminal.
	Many []string

	// Terminal indicates workflow execution should stop.
	// Mutually exclusive with To and Many.
	Terminal bool
}

Next specifies the next step(s) in workflow execution after a node completes.

It supports three routing modes:

  • Terminal: Stop execution (Route.Terminal = true)
  • Single: Go to a specific node (Route.To = "nodeID")
  • Fan-out: Go to multiple nodes in parallel (Route.Many = []string{"node1", "node2"})

func Goto

func Goto(nodeID string) Next

Goto returns a Next that routes to the specified node.

func Stop

func Stop() Next

Stop returns a Next that terminates workflow execution.

type Node

type Node[S any] interface {
	// Run executes the node's logic with the given context and state.
	// It returns a NodeResult containing state changes, routing decisions,
	// events, and any errors encountered.
	Run(ctx context.Context, state S) NodeResult[S]
}

Node represents a processing unit in the workflow graph. It receives state of type S, performs computation, and returns a NodeResult.

Nodes are the fundamental building blocks of LangGraph workflows. Each node can:

  • Access the current state
  • Perform computation (call LLMs, tools, or custom logic)
  • Return state modifications via Delta
  • Control routing via Route
  • Emit observability events
  • Handle errors

Type parameter S is the state type shared across the workflow.

type NodeError

type NodeError struct {
	// Message is the human-readable error description.
	Message string

	// Code is a machine-readable error code for programmatic handling.
	Code string

	// NodeID identifies which node produced this error.
	NodeID string

	// Cause is the underlying error that caused this NodeError.
	Cause error
}

NodeError represents an error that occurred during node execution. It provides structured error information for better observability and debugging.

func (*NodeError) Error

func (e *NodeError) Error() string

Error implements the error interface.

func (*NodeError) Unwrap

func (e *NodeError) Unwrap() error

Unwrap returns the underlying cause error for error wrapping support.

type NodeFunc

type NodeFunc[S any] func(ctx context.Context, state S) NodeResult[S]

NodeFunc is a function adapter that implements the Node interface. It allows using plain functions as nodes without creating custom types.

Example:

processNode := NodeFunc[MyState](func(ctx context.Context, s MyState) NodeResult[MyState] {
    return NodeResult[MyState]{
        Delta: MyState{Result: "processed"},
        Route: Stop(),
    }
})

func (NodeFunc[S]) Run

func (f NodeFunc[S]) Run(ctx context.Context, state S) NodeResult[S]

Run implements the Node interface for NodeFunc.

type NodeResult

type NodeResult[S any] struct {
	// Delta is the partial state update produced by this node.
	// It will be merged with the current state using the configured reducer.
	Delta S

	// Route specifies the next step(s) in workflow execution.
	// Use Stop() for terminal nodes, Goto(id) for explicit routing,
	// or set Many for fan-out to multiple nodes.
	Route Next

	// Err contains any error that occurred during node execution.
	// Non-nil errors halt the workflow unless custom error handling is implemented.
	Err error
}

NodeResult represents the output of a node execution.

It contains all information needed to continue workflow execution:

  • Delta: Partial state update to be merged via reducer
  • Route: Next hop(s) for execution flow
  • Events: Observability events emitted during execution
  • Err: Node-level error (if any)

type Options

type Options struct {
	// MaxSteps limits workflow execution to prevent infinite loops.
	// If 0, no limit is enforced (use with caution).
	//
	// Workflow loops (A → B → A) are fully supported. Use MaxSteps to prevent
	// infinite loops when a conditional exit is missing or misconfigured.
	//
	// Loop patterns:
	//   1. Node-based conditional loop:
	//        nodeA returns Goto("B") if shouldContinue(state), else Stop()
	//   2. Edge predicate loop:
	//        Connect("A", "B", loopPredicate)
	//        Connect("A", "exit", exitPredicate)
	//
	// Recommended values:
	//   - Simple workflows (3-5 nodes): MaxSteps = 20
	//   - Workflows with loops: MaxSteps = depth × max_iterations (e.g., 5 nodes × 10 iterations = 50)
	//   - Complex multi-loop workflows: MaxSteps = 100-200
	//
	// When MaxSteps is exceeded, Run() returns EngineError with code "MAX_STEPS_EXCEEDED".
	MaxSteps int

	// Retries specifies how many times to retry a node on transient errors.
	// If 0, nodes are not retried.
	// Transient errors are identified by checking if error implements Retryable interface.
	Retries int
}

Options configures Engine execution behavior.

Zero values are valid - the Engine will use sensible defaults.

type Predicate

type Predicate[S any] func(state S) bool

Predicate is a function that evaluates state to determine if an edge should be traversed.

Predicates enable conditional routing based on workflow state. They should be pure functions (deterministic, no side effects).

Common patterns:

  • Threshold: state.Score > 0.8
  • Presence: state.Result != ""
  • Boolean flag: state.IsReady
  • Complex logic: state.Retries < 3 && state.Error == nil

Type parameter S is the state type to evaluate.

type Reducer

type Reducer[S any] func(prev, delta S) S

Reducer is a function that merges a partial state update (delta) into the previous state.

Reducers are responsible for deterministic state composition, enabling:

  • Sequential state updates across workflow nodes
  • Parallel branch result merging
  • Checkpoint-based state reconstruction

The reducer must be:

  • **Pure**: Same inputs always produce same output
  • **Deterministic**: No randomness or side effects
  • **Commutative** (for parallel merges): Order of deltas shouldn't matter for correctness

Example:

reducer := func(prev, delta MyState) MyState {
    if delta.Query != "" {
        prev.Query = delta.Query  // Last write wins
    }
    prev.Counter += delta.Counter  // Accumulate
    return prev
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL