Documentation ¶
Overview ¶
Package graph provides the core graph execution engine for LangGraph-Go.
Index ¶
- Constants
- Variables
- func ComputeOrderKey(parentNodeID string, edgeIndex int) uint64
- type Checkpoint
- type ConflictPolicy
- type CostTracker
- func (ct *CostTracker) Disable()
- func (ct *CostTracker) Enable()
- func (ct *CostTracker) GetCallHistory() []LLMCall
- func (ct *CostTracker) GetCostByModel() map[string]float64
- func (ct *CostTracker) GetTokenUsage() (inputTokens, outputTokens int64)
- func (ct *CostTracker) GetTotalCost() float64
- func (ct *CostTracker) RecordLLMCall(model string, inputTokens, outputTokens int, nodeID string) error
- func (ct *CostTracker) Reset()
- func (ct *CostTracker) SetCustomPricing(model string, inputPer1M, outputPer1M float64)
- func (ct *CostTracker) String() string
- type Edge
- type Engine
- func (e *Engine[S]) Add(nodeID string, node Node[S]) error
- func (e *Engine[S]) Connect(from, to string, predicate Predicate[S]) error
- func (e *Engine[S]) ReplayRun(ctx context.Context, runID string) (S, error)
- func (e *Engine[S]) ResumeFromCheckpoint(ctx context.Context, cpID string, newRunID string, startNode string) (S, error)
- func (e *Engine[S]) Run(ctx context.Context, runID string, initial S) (S, error)
- func (e *Engine[S]) RunWithCheckpoint(ctx context.Context, checkpoint store.CheckpointV2[S]) (S, error)
- func (e *Engine[S]) SaveCheckpoint(ctx context.Context, runID string, cpID string) error
- func (e *Engine[S]) StartAt(nodeID string) error
- type EngineError
- type Frontier
- type LLMCall
- type ModelPricing
- type Next
- type Node
- type NodeError
- type NodeFunc
- type NodePolicy
- type NodeResult
- type Option
- func WithBackpressureTimeout(d time.Duration) Option
- func WithConflictPolicy(policy ConflictPolicy) Option
- func WithCostTracker(tracker *CostTracker) Option
- func WithDefaultNodeTimeout(d time.Duration) Option
- func WithMaxConcurrent(n int) Option
- func WithMaxSteps(n int) Option
- func WithMetrics(metrics *PrometheusMetrics) Option
- func WithQueueDepth(n int) Option
- func WithReplayMode(enabled bool) Option
- func WithRunWallClockBudget(d time.Duration) Option
- func WithStrictReplay(enabled bool) Option
- type Options
- type Predicate
- type PrometheusMetrics
- func (pm *PrometheusMetrics) Disable()
- func (pm *PrometheusMetrics) Enable()
- func (pm *PrometheusMetrics) IncrementBackpressure(runID, reason string)
- func (pm *PrometheusMetrics) IncrementMergeConflicts(runID, conflictType string)
- func (pm *PrometheusMetrics) IncrementRetries(runID, nodeID, reason string)
- func (pm *PrometheusMetrics) RecordStepLatency(runID, nodeID string, latency time.Duration, status string)
- func (pm *PrometheusMetrics) Reset()
- func (pm *PrometheusMetrics) UpdateInflightNodes(count int)
- func (pm *PrometheusMetrics) UpdateQueueDepth(depth int)
- type RecordedIO
- type Reducer
- type RetryPolicy
- type SchedulerMetrics
- type SideEffectPolicy
- type StateCopier
- type WorkItem
Constants ¶
const (
	// RunIDKey is the context key for the unique workflow run identifier.
	RunIDKey contextKey = "langgraph.run_id"

	// StepIDKey is the context key for the current execution step number.
	StepIDKey contextKey = "langgraph.step_id"

	// NodeIDKey is the context key for the current node identifier.
	NodeIDKey contextKey = "langgraph.node_id"

	// OrderKeyKey is the context key for the deterministic ordering key.
	// This hash determines the order in which concurrent node results are merged.
	OrderKeyKey contextKey = "langgraph.order_key"

	// AttemptKey is the context key for the current retry attempt number (0-based).
	// Value is 0 for first execution, incremented on each retry.
	AttemptKey contextKey = "langgraph.attempt"

	// RNGKey is the context key for the seeded random number generator.
	// Provides deterministic randomness for replay scenarios.
	// Type: *rand.Rand (from math/rand package)
	RNGKey contextKey = "langgraph.rng"

	// RecordedIOsKey is the context key for storing recorded I/O during replay.
	RecordedIOsKey contextKey = "langgraph.recordedIOs"
)
Context keys for propagating execution metadata to nodes. These values are injected into the context passed to Node.Run() and can be retrieved by nodes to access execution metadata such as the current run ID, step number, and node ID.
Example usage in a node:
func (n *MyNode) Run(ctx context.Context, state MyState) NodeResult[MyState] {
runID := ctx.Value(RunIDKey).(string)
stepID := ctx.Value(StepIDKey).(int)
// Use metadata for logging, tracing, etc.
}
Variables ¶
var ErrBackpressure = errors.New("downstream backpressure exceeded threshold")
ErrBackpressure indicates that downstream processing cannot keep up with the current execution rate. This typically occurs when output buffers are full or rate limits are exceeded. It is distinct from ErrBackpressureTimeout, which is specifically for frontier queue overflow.
var ErrBackpressureTimeout = errors.New("backpressure timeout: frontier queue full")
ErrBackpressureTimeout is returned when the frontier queue remains full beyond the configured timeout. This indicates nodes are being enqueued faster than they can be executed. The engine checkpoints state and pauses execution to prevent memory exhaustion. Resume execution after reducing load or increasing MaxConcurrentNodes.
var ErrIdempotencyViolation = errors.New("idempotency violation: checkpoint already committed")
ErrIdempotencyViolation is returned when attempting to commit a checkpoint with a duplicate idempotency key. This prevents duplicate execution of non-idempotent operations (e.g., charging payment twice). Idempotency keys are computed from runID, stepID, frontier state, and accumulated state. If this error occurs, the checkpoint was already successfully committed in a previous execution.
var ErrInvalidRetryPolicy = errors.New("invalid retry policy configuration")
ErrInvalidRetryPolicy indicates that a RetryPolicy configuration is invalid. This occurs when:
- MaxAttempts < 1 (at least one attempt is required)
- MaxDelay > 0 and MaxDelay < BaseDelay (cap cannot be less than base)
var ErrMaxAttemptsExceeded = errors.New("max retry attempts exceeded")
ErrMaxAttemptsExceeded is returned when a node fails more times than allowed by its retry policy. The node's MaxAttempts limit has been reached, and the error is considered non-recoverable. Check the node's error logs to diagnose the root cause of repeated failures.
var ErrMaxStepsExceeded = errors.New("execution exceeded maximum steps limit")
ErrMaxStepsExceeded indicates that the graph execution reached the maximum allowed step count without completing. This prevents infinite loops and runaway executions.
var ErrNoProgress = errors.New("no progress: no runnable nodes in frontier")
ErrNoProgress is returned when the scheduler detects a deadlock condition. This occurs when the frontier queue is empty and no nodes are actively running, meaning the workflow cannot make forward progress. Common causes:
- All nodes waiting on conditions that will never be satisfied
- Circular dependencies without a conditional break
- Missing edges or routing logic
var ErrReplayMismatch = errors.New("replay mismatch: recorded I/O hash mismatch")
ErrReplayMismatch is returned when recorded I/O hash does not match current execution during replay. This indicates non-deterministic behavior in a node (e.g., random values, system time, or external state). Replay mode expects nodes to produce identical outputs given identical inputs.
Functions ¶
func ComputeOrderKey ¶
ComputeOrderKey generates a deterministic sort key from the parent node ID and edge index. This key ensures consistent execution ordering across replays, regardless of runtime scheduling variations or goroutine completion order.
The key is computed as follows:
- Hash the concatenation of parentNodeID + edgeIndex (as 4-byte big-endian int)
- Extract the first 8 bytes of the SHA-256 hash
- Interpret as uint64 (big-endian)
This approach guarantees:
- Determinism: Same inputs always produce the same order key
- Low collision probability: SHA-256 provides cryptographic collision resistance
- Total ordering: uint64 keys can be consistently sorted
- Path awareness: Keys capture the execution path context
The order key enables the frontier queue to maintain deterministic work item priority even when nodes execute concurrently and complete out of order.
Types ¶
type Checkpoint ¶
type Checkpoint[S any] struct {
	// RunID uniquely identifies the execution this checkpoint belongs to.
	RunID string `json:"run_id"`

	// StepID is the execution step number at checkpoint time.
	// Monotonically increasing within a run.
	StepID int `json:"step_id"`

	// State is the current accumulated state after applying all deltas up to StepID.
	// Must be JSON-serializable for persistence.
	State S `json:"state"`

	// Frontier contains the work items ready to execute at this checkpoint.
	// These represent the nodes queued for execution when resuming from this checkpoint.
	Frontier []WorkItem[S] `json:"frontier"`

	// RNGSeed is the seed for deterministic random number generation.
	// Computed from RunID to ensure consistent random values across replays.
	RNGSeed int64 `json:"rng_seed"`

	// RecordedIOs contains all captured external interactions up to this checkpoint.
	// Indexed by (NodeID, Attempt) for lookup during replay.
	RecordedIOs []RecordedIO `json:"recorded_ios"`

	// IdempotencyKey is a hash of (RunID, StepID, State, Frontier) that prevents
	// duplicate checkpoint commits. Format: "sha256:hex_encoded_hash".
	IdempotencyKey string `json:"idempotency_key"`

	// Timestamp records when this checkpoint was created.
	Timestamp time.Time `json:"timestamp"`

	// Label is an optional user-defined name for this checkpoint, useful for
	// debugging or creating named save points (e.g., "before_summary", "after_validation").
	// Empty string for automatic checkpoints.
	Label string `json:"label,omitempty"`
}
Checkpoint represents a durable snapshot of execution state, enabling resumption and deterministic replay of graph executions.

Checkpoints are created atomically after each execution step and contain all the information needed to resume from that point:
- Current accumulated state
- Work items ready to execute (frontier)
- Recorded I/O for replay
- RNG seed for deterministic random number generation
- Idempotency key for preventing duplicate commits

Checkpoints support both automatic resumption after failures and user-initiated labeled snapshots for debugging or branching workflows.
type ConflictPolicy ¶
type ConflictPolicy int
ConflictPolicy defines how concurrent state updates are handled when multiple branches modify the same state fields.
This is a placeholder for future CRDT (Conflict-free Replicated Data Type) support. Currently only ConflictFail is supported, which returns an error on detected conflicts.
Planned policies:
- ConflictFail: Return error on conflict (default, implemented)
- LastWriterWins: Last merge wins based on OrderKey
- ConflictCRDT: Use conflict-free replicated data types for automatic resolution
const (
	// ConflictFail returns an error when concurrent branches modify the same state field.
	// This is the safest default, requiring explicit conflict resolution.
	ConflictFail ConflictPolicy = iota

	// LastWriterWins uses the delta with the highest OrderKey when conflicts occur.
	// WARNING: Not yet implemented. Will return error if specified.
	LastWriterWins

	// ConflictCRDT uses CRDT semantics for automatic conflict resolution.
	// WARNING: Not yet implemented. Will return error if specified.
	ConflictCRDT
)
type CostTracker ¶
type CostTracker struct {
// RunID associates costs with a specific workflow execution.
RunID string
// Currency is the cost unit (e.g., "USD").
Currency string
// Pricing maps model names to their input/output token costs.
Pricing map[string]ModelPricing
// Calls records all LLM invocations with full details.
Calls []LLMCall
// TotalCost accumulates all costs in the specified currency.
TotalCost float64
// ModelCosts tracks costs per model for attribution.
ModelCosts map[string]float64
// InputTokens counts total input tokens across all calls.
InputTokens int64
// OutputTokens counts total output tokens across all calls.
OutputTokens int64
// CreatedAt marks when cost tracking began.
CreatedAt time.Time
// contains filtered or unexported fields
}
CostTracker (T040) tracks financial costs associated with LLM API calls, providing detailed token usage and cost attribution for production monitoring.
Features:
- Per-model token counting (input/output separate)
- Accurate cost calculation using static pricing tables
- Cumulative cost tracking across multiple calls
- Per-model cost breakdown for attribution
- Thread-safe concurrent recording

Pricing is based on static tables (defaultModelPricing) for major providers:
- OpenAI: GPT-4o, GPT-4o-mini, GPT-4-turbo, GPT-3.5-turbo
- Anthropic: Claude 3.5 Sonnet, Claude 3 Opus/Sonnet/Haiku
- Google: Gemini 1.5 Pro/Flash, Gemini 1.0 Pro
Usage:
// Create tracker for a run
tracker := NewCostTracker("run-123", "USD")

// Record LLM calls
tracker.RecordLLMCall("gpt-4o", 1000, 500, "nodeA")
tracker.RecordLLMCall("claude-3-sonnet", 2000, 800, "nodeB")

// Get total cost
total := tracker.GetTotalCost() // e.g., $0.0345

// Get per-model breakdown
costs := tracker.GetCostByModel() // map[string]float64{"gpt-4o": 0.0125, "claude-3-sonnet": 0.0220}
Thread-safe: All methods use mutex protection for concurrent access.
func NewCostTracker ¶
func NewCostTracker(runID, currency string) *CostTracker
NewCostTracker (T040) creates a new cost tracker with default pricing tables.
Parameters:
- runID: Unique workflow execution identifier
- currency: Cost unit (e.g., "USD")

Returns:
- *CostTracker: Fully initialized cost tracker with default model pricing
Example:
tracker := NewCostTracker("run-123", "USD")
func (*CostTracker) Disable ¶
func (ct *CostTracker) Disable()
Disable temporarily disables cost tracking (useful for testing).
func (*CostTracker) Enable ¶
func (ct *CostTracker) Enable()
Enable re-enables cost tracking after Disable().
func (*CostTracker) GetCallHistory ¶
func (ct *CostTracker) GetCallHistory() []LLMCall
GetCallHistory returns all recorded LLM calls with full metadata.
Returns:
- []LLMCall: Slice of all calls in chronological order
Thread-safe: Uses read lock and returns a copy.
func (*CostTracker) GetCostByModel ¶
func (ct *CostTracker) GetCostByModel() map[string]float64
GetCostByModel (T043) returns a breakdown of costs attributed to each model.
Returns:
- map[string]float64: Map of model name to cumulative cost in the tracker's currency
Example:
costs := tracker.GetCostByModel()
for model, cost := range costs {
	fmt.Printf("%s: $%.4f\n", model, cost)
}
// Output:
// gpt-4o: $0.0125
// claude-3-sonnet: $0.0220
Thread-safe: Uses read lock and returns a copy to prevent mutation.
func (*CostTracker) GetTokenUsage ¶
func (ct *CostTracker) GetTokenUsage() (inputTokens, outputTokens int64)
GetTokenUsage returns total input and output token counts.
Returns:
- inputTokens: Total input tokens across all calls
- outputTokens: Total output tokens across all calls
Thread-safe: Uses read lock.
func (*CostTracker) GetTotalCost ¶
func (ct *CostTracker) GetTotalCost() float64
GetTotalCost (T042) returns the cumulative cost across all recorded LLM calls.
Returns:
- float64: Total cost in the tracker's currency (e.g., USD)
Example:
total := tracker.GetTotalCost()
fmt.Printf("Total LLM cost: $%.4f\n", total)
Thread-safe: Uses read lock for concurrent access.
func (*CostTracker) RecordLLMCall ¶
func (ct *CostTracker) RecordLLMCall(model string, inputTokens, outputTokens int, nodeID string) error
RecordLLMCall (T041) records a single LLM API invocation with token usage and calculates cost.
This method:
1. Looks up model pricing from the static pricing table
2. Calculates cost: (inputTokens * inputPrice + outputTokens * outputPrice) / 1M
3. Records the call with full metadata
4. Updates cumulative totals (TotalCost, ModelCosts, InputTokens, OutputTokens)

Parameters:
- model: Model identifier (must exist in pricing table, e.g., "gpt-4o", "claude-3-sonnet")
- inputTokens: Number of input tokens consumed
- outputTokens: Number of output tokens generated
- nodeID: Node that made the call (optional, use "" if not applicable)

Returns:
- error: ErrUnknownModel if the model is not found in the pricing table; nil otherwise
Example:
// Record a GPT-4o call: 1000 input tokens, 500 output tokens
err := tracker.RecordLLMCall("gpt-4o", 1000, 500, "research_node")
if err != nil {
	log.Printf("Cost tracking error: %v", err)
}
Thread-safe: Uses mutex protection for concurrent recording.
func (*CostTracker) Reset ¶
func (ct *CostTracker) Reset()
Reset clears all recorded data and resets cumulative totals. Preserves pricing configuration.
func (*CostTracker) SetCustomPricing ¶
func (ct *CostTracker) SetCustomPricing(model string, inputPer1M, outputPer1M float64)
SetCustomPricing allows overriding default pricing for specific models. Useful for custom deployments, enterprise pricing, or price updates.
Parameters:
- model: Model identifier
- inputPer1M: Cost per 1M input tokens in USD
- outputPer1M: Cost per 1M output tokens in USD
Example:
// Override GPT-4o pricing for enterprise rate
tracker.SetCustomPricing("gpt-4o", 2.00, 8.00)
func (*CostTracker) String ¶
func (ct *CostTracker) String() string
String returns a human-readable summary of cost tracking.
type Edge ¶
type Edge[S any] struct {
	// From is the source node ID.
	From string

	// To is the destination node ID.
	To string

	// When is an optional predicate that determines if this edge should be traversed.
	// If nil, the edge is unconditional (always traverse).
	// If non-nil, the edge is only traversed when When(state) returns true.
	When Predicate[S]
}
Edge represents a connection between two nodes in the workflow graph.
Edges define the control flow between nodes. They can be:
- Unconditional: Always traverse (When = nil)
- Conditional: Only traverse if predicate returns true (When != nil)
Edges are used during graph construction to define possible transitions. At runtime, the Engine evaluates predicates to determine which edge to follow.
For explicit routing, nodes can return Next in NodeResult, which overrides edge-based routing.
Type parameter S is the state type used for predicate evaluation.
type Engine ¶
type Engine[S any] struct { // contains filtered or unexported fields }
Engine orchestrates stateful workflow execution with checkpointing support.
The Engine is the core runtime that:
- Manages workflow graph topology (nodes and edges)
- Executes nodes in sequence or parallel
- Merges state updates via the reducer
- Persists state at each step via the store
- Emits observability events via the emitter
- Enforces execution limits (MaxSteps, Retries)
- Supports checkpoint save/resume
Concurrency Model ¶
The Engine supports two execution modes:
**Sequential Mode** (MaxConcurrentNodes = 0):
- Nodes execute one at a time in strict order
- Deterministic execution: Same inputs → Same outputs
- Best for workflows requiring strict ordering or replay guarantees
- Lower resource usage, simpler debugging
**Concurrent Mode** (MaxConcurrentNodes > 0):
- Multiple nodes execute simultaneously (bounded by MaxConcurrentNodes)
- Worker pool with FIFO work stealing from priority heap
- Higher throughput for parallel workflows
- Recommended: MaxConcurrentNodes = NumCPU for CPU-bound tasks
Concurrency Safety Mechanisms (Bug Fixes 2025-10-29) ¶
The Engine implements four critical concurrency patterns:
**1. Per-Worker RNG Derivation (BUG-002 Fix)**
- Each worker receives a unique deterministic RNG instance
- Derived from RunID + WorkerID to ensure thread safety
- Enables deterministic random values in concurrent execution
- Context key: RNGKey (*rand.Rand)
**2. Heap-Based Priority Ordering (BUG-003 Fix)**
- Frontier uses heap as single source of truth
- Channel provides notifications only (empty struct{})
- Ensures deterministic OrderKey-based merge ordering
- Prevents out-of-order processing in concurrent scenarios
**3. Buffered Error Delivery (BUG-001 Fix)**
- Results channel sized at MaxConcurrentNodes*2
- sendErrorAndCancel blocks on error delivery (errors are rare + critical)
- Guarantees 100% error delivery rate
- Prevents deadlocks under simultaneous error conditions
**4. Atomic Completion Detection (BUG-004 Fix)**
- CompareAndSwap-based completion flag (race-free)
- Workers check completion after dequeue failure and node execution
- 290x faster than polling (36µs vs 10.5ms)
- Zero premature or delayed termination scenarios
Deterministic Replay Guarantees ¶
When using the same RunID across multiple executions:
- Sequential mode (MaxConcurrentNodes=0): 100% deterministic (byte-identical states)
- Concurrent mode: Deterministic when using OrderKey for merge ordering
- RNG sequences identical across replays (per-worker seed derivation from RunID)
- Retry delays and backoff jitter deterministic (RNG-based with deterministic seed)
Validation Results (see CHANGELOG.md "Deterministic Replay Validation"):
- 1000-iteration stress test: byte-identical final states across all runs
- RNG sequence validation: 100 runs with identical random value sequences
- OrderKey merge consistency: 50 runs with identical parallel branch merge order
- Throughput: 9,750 executions/sec with zero determinism failures
Prerequisites for deterministic replay:
- Use consistent RunID across replay attempts
- Nodes must use context-provided RNG (ctx.Value(RNGKey)), not global rand
- External I/O (network, filesystem) must be mocked or use deterministic responses
- System time calls (time.Now()) should use context-provided clock for testing
See graph/replay_validation_test.go for comprehensive validation test suite.
Type parameter S is the state type shared across the workflow.
Example:
reducer := func(prev, delta MyState) MyState {
if delta.Query != "" {
prev.Query = delta.Query
}
prev.Steps++
return prev
}
store := store.NewMemStore[MyState]()
emitter := emit.NewLogEmitter()
opts := Options{
MaxSteps: 100,
MaxConcurrentNodes: 4, // 4 parallel workers
}
engine := New(reducer, store, emitter, opts)
engine.Add("process", processNode)
engine.StartAt("process")
final, err := engine.Run(ctx, "run-001", MyState{Query: "hello"})
func New ¶
func New[S any](reducer Reducer[S], st store.Store[S], emitter emit.Emitter, options ...interface{}) *Engine[S]
New creates a new Engine with the given configuration.
Supports two configuration patterns for backward compatibility:
1. Options struct (legacy):
engine := New(reducer, store, emitter, Options{MaxSteps: 100})
2. Functional options (recommended):
engine := New(
reducer, store, emitter,
WithMaxConcurrent(16),
WithQueueDepth(2048),
WithDefaultNodeTimeout(10*time.Second),
)
3. Mixed (Options struct + functional options):
baseOpts := Options{MaxSteps: 100}
engine := New(
reducer, store, emitter,
baseOpts,
WithMaxConcurrent(8), // Overrides baseOpts if specified
)
Parameters:
- reducer: Function to merge partial state updates (required for Run)
- store: Persistence backend for state and checkpoints (required for Run)
- emitter: Observability event receiver (optional, can be nil)
- options: Configuration via Options struct or variadic Option functions
The constructor does not validate all parameters to allow flexible initialization. Validation occurs when Run() is called.
Functional options (recommended):
- WithMaxConcurrent(n): Set max concurrent nodes
- WithQueueDepth(n): Set frontier queue capacity
- WithBackpressureTimeout(d): Set queue full timeout
- WithDefaultNodeTimeout(d): Set default node timeout
- WithRunWallClockBudget(d): Set total execution timeout
- WithReplayMode(bool): Enable replay mode
- WithStrictReplay(bool): Enable strict replay validation
- WithConflictPolicy(policy): Set conflict resolution policy
func (*Engine[S]) Add ¶
Add registers a node in the workflow graph.
Nodes must be added before calling StartAt or Run. Node IDs must be unique within the workflow.
Parameters:
- nodeID: Unique identifier for this node (cannot be empty)
- node: Node implementation (cannot be nil)
Returns error if:
- nodeID is empty
- node is nil
- a node with this ID already exists
Example:
processNode := NodeFunc[MyState](func(ctx context.Context, s MyState) NodeResult[MyState] {
return NodeResult[MyState]{
Delta: MyState{Result: "processed"},
Route: Stop(),
}
})
err := engine.Add("process", processNode)
func (*Engine[S]) Connect ¶
Connect creates an edge between two nodes.
Edges define possible transitions in the workflow graph. They can be:
- Unconditional: Always traverse (predicate = nil)
- Conditional: Only traverse if predicate returns true
Node explicit routing via NodeResult.Route takes precedence over edges.
Parameters:
- from: Source node ID (cannot be empty)
- to: Destination node ID (cannot be empty)
- predicate: Optional condition for traversal (nil = unconditional)
Returns error if:
- from or to is empty
Note: Node existence is not validated (lazy validation) to allow flexible graph construction order.
Example:
// Unconditional edge
engine.Connect("nodeA", "nodeB", nil)
// Conditional edge
engine.Connect("router", "pathA", func(s MyState) bool {
return s.Score > 0.8
})
func (*Engine[S]) ReplayRun ¶
ReplayRun replays a previous execution using recorded I/O without re-invoking external services.
This method enables exact replay of executions for debugging, auditing, or testing. It loads the latest checkpoint for the given runID and replays the execution using recorded I/O responses instead of making live external calls.
The replay process:
- Loads the latest checkpoint containing recorded I/O and state
- Configures engine in replay mode (Options.ReplayMode=true)
- Executes nodes using recorded responses instead of live calls
- Verifies execution matches original via hash comparison (if StrictReplay=true)
Parameters:
- ctx: Context for cancellation and request-scoped values
- runID: Unique identifier of the run to replay
Returns:
- Final state after replayed execution completes
- Error if replay fails, checkpoint not found, or mismatch detected
Example:
// Original execution with I/O recording
opts := Options{ReplayMode: false} // Record mode
engine := New(reducer, store, emitter, opts)
_, err := engine.Run(ctx, "run-001", initialState)
// Later: replay execution for debugging
replayOpts := Options{ReplayMode: true, StrictReplay: true}
replayEngine := New(reducer, store, emitter, replayOpts)
replayedState, err := replayEngine.ReplayRun(ctx, "run-001")
if err != nil {
log.Fatal(err)
}
fmt.Printf("Replayed state: %+v\n", replayedState)
Use Cases:
- Debugging: Replay production issues locally without external dependencies
- Testing: Verify workflow logic without mocking external services
- Auditing: Reconstruct exact execution flow for compliance
- Development: Test changes against recorded production data
Requirements:
- Original run must have been executed with recordable nodes
- Checkpoint must contain recorded I/O (RecordedIOs field populated)
- Engine must be configured with ReplayMode=true
Thread-safety: This method is safe for concurrent use with different runIDs.
func (*Engine[S]) ResumeFromCheckpoint ¶
func (e *Engine[S]) ResumeFromCheckpoint(ctx context.Context, cpID string, newRunID string, startNode string) (S, error)
ResumeFromCheckpoint resumes workflow execution from a saved checkpoint.
This enables:
- Crash recovery (save checkpoints, resume after failure)
- Branching workflows (checkpoint, try path A, resume from checkpoint, try path B)
- Manual intervention (pause at checkpoint, human review, resume)
- A/B testing (checkpoint before experiment, resume multiple times with variants)
The resume operation:
- Loads the checkpoint state
- Starts a new workflow run with the checkpoint state as initial state
- Begins execution at the specified node (typically the next node after checkpoint)
- Continues until workflow completes or errors
Parameters:
- ctx: Context for cancellation
- cpID: Checkpoint identifier to resume from
- newRunID: New unique run ID for the resumed execution
- startNode: Node to begin execution at (typically next node after checkpoint)
Returns:
- Final state after resumed execution completes
- Error if checkpoint doesn't exist, startNode invalid, or execution fails
Example:
// Original run with checkpoint
_, _ = engine.Run(ctx, "run-001", initial)
_ = engine.SaveCheckpoint(ctx, "run-001", "after-validation")

// Resume from checkpoint (e.g., after crash or for A/B test)
finalA, _ := engine.ResumeFromCheckpoint(ctx, "after-validation", "run-002-pathA", "pathA")
finalB, _ := engine.ResumeFromCheckpoint(ctx, "after-validation", "run-003-pathB", "pathB")
func (*Engine[S]) Run ¶
Run executes the workflow from start to completion or error.
Workflow execution:
- Validates engine configuration (reducer, store, startNode)
- Initializes state with initial value
- Executes nodes starting from startNode
- Follows routing decisions (Stop, Goto, Many)
- Applies reducer to merge state updates
- Persists state after each node
- Emits observability events
- Enforces MaxSteps limit
- Respects context cancellation
Parameters:
- ctx: Context for cancellation and request-scoped values
- runID: Unique identifier for this workflow execution
- initial: Starting state value
Returns:
- Final state after workflow completion
- Error if validation fails, node execution fails, or limits exceeded
Example:
ctx := context.Background()
final, err := engine.Run(ctx, "run-001", MyState{Query: "hello"})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Final state: %+v\n", final)
func (*Engine[S]) RunWithCheckpoint ¶
func (e *Engine[S]) RunWithCheckpoint(ctx context.Context, checkpoint store.CheckpointV2[S]) (S, error)
RunWithCheckpoint resumes execution from a saved checkpoint.
This method enables crash recovery and branching workflows by resuming execution from a previously saved checkpoint. It restores the complete execution context:
- Accumulated state
- Execution frontier (pending work items)
- RNG seed for deterministic random values
- Recorded I/O for replay
The resumed execution continues from where the checkpoint was created, processing all work items in the frontier until the workflow completes or encounters an error.
Parameters:
- ctx: Context for cancellation and request-scoped values
- checkpoint: Complete checkpoint containing execution state and context
Returns:
- Final state after resumed execution completes
- Error if validation fails, node execution fails, or limits exceeded
Example:
// Original execution with checkpoint
_, _ = engine.Run(ctx, "run-001", initialState)
// Load checkpoint from store
checkpoint, err := store.LoadCheckpointV2(ctx, "run-001", 5)
if err != nil {
log.Fatal(err)
}
// Resume execution from checkpoint
final, err := engine.RunWithCheckpoint(ctx, checkpoint)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Resumed execution completed: %+v\n", final)
Use Cases:
- Crash recovery: Resume after application restart
- Debugging: Replay execution from specific checkpoint
- Branching: Try different paths from same checkpoint
- Long-running workflows: Checkpoint and resume across restarts
Thread-safety: This method is safe for concurrent use with different checkpoints.
func (*Engine[S]) SaveCheckpoint ¶
SaveCheckpoint creates a named checkpoint for the most recent state of a run.
Checkpoints enable:
- Branching workflows (save checkpoint, try different paths)
- Manual resumption points
- Rollback to known-good states
- A/B testing (checkpoint before experiment)
The checkpoint captures the latest persisted state from the specified run. Multiple checkpoints can be created for the same run with different labels.
Parameters:
- ctx: Context for cancellation
- runID: The workflow run to checkpoint
- cpID: Unique identifier for this checkpoint (e.g., "after-validation", "before-deploy")
Returns error if:
- runID doesn't exist or has no persisted state
- Store operation fails
Example:
// Run workflow
final, _ := engine.Run(ctx, "run-001", initial)
// Save checkpoint at completion
err := engine.SaveCheckpoint(ctx, "run-001", "before-deploy")
if err != nil {
log.Fatal(err)
}
// Later, can resume from this checkpoint
resumed, _ := engine.ResumeFromCheckpoint(ctx, "before-deploy", "run-002", "deploy") // "deploy" is the node to start the resumed run at
func (*Engine[S]) StartAt ¶
StartAt sets the entry point for workflow execution.
The start node is executed first when Run() is called. The node must have been registered via Add() before calling StartAt.
Parameters:
- nodeID: ID of the node to start execution at
Returns error if:
- nodeID is empty
- node with this ID doesn't exist
Example:
engine.Add("start", startNode)
engine.StartAt("start")
type EngineError ¶
EngineError represents an error from Engine operations.
func (*EngineError) Error ¶
func (e *EngineError) Error() string
type Frontier ¶
type Frontier[S any] struct {
	// contains filtered or unexported fields
}
Frontier manages the work queue for concurrent graph execution with bounded capacity and deterministic ordering. It combines a priority queue (heap) for ordering with a buffered channel for bounded queue depth and backpressure.
The Frontier ensures that work items are dequeued in deterministic order (by OrderKey) even when they are enqueued concurrently from multiple goroutines. This is critical for deterministic replay of graph executions.
Architecture (BUG-003 fix):
- Heap is the single source of truth for work item storage and ordering
- Channel carries empty struct notifications only (not data)
- Enqueue: Push to heap THEN send notification to channel
- Dequeue: Wait for notification from channel THEN pop from heap
This prevents heap/channel desynchronization and reduces memory usage.
The bounded channel provides backpressure: when the queue is full, Enqueue will block until capacity becomes available or the context is cancelled. This prevents unbounded memory growth when nodes produce work faster than it can be consumed.
Thread-safety: All methods are safe for concurrent use by multiple goroutines.
func NewFrontier ¶
func NewFrontier[S any](ctx context.Context, capacity int, runID string, metrics *PrometheusMetrics, emitter emit.Emitter) *Frontier[S]
NewFrontier creates a new Frontier with the specified capacity and context. The capacity determines the maximum number of work items that can be queued. The context is used for cancellation propagation.
US3: Optional observability parameters enable backpressure monitoring:
- runID: Run identifier for event/metric attribution
- metrics: Prometheus metrics collector for backpressure_events_total
- emitter: Event emitter for backpressure events with metadata
BUG-003 fix (T019): Channel is notification-only (empty struct), not data carrier.
func (*Frontier[S]) Dequeue ¶
Dequeue retrieves the work item with the smallest OrderKey from the frontier. This method blocks until:
- A work item becomes available, or
- The context is cancelled
Returns the work item with the minimum OrderKey and nil error on success. Returns a zero-value work item and context error if the context is cancelled.
BUG-003 fix (T021): Dequeue waits for notification, then pops from heap. This ensures deterministic ordering: heap is the single source of truth for work item storage and ordering. The channel provides notification only.
func (*Frontier[S]) Enqueue ¶
Enqueue adds a work item to the frontier queue. The item is first added to the internal heap (sorted by OrderKey), then sent to the buffered channel.
If the channel is full, this method blocks until:
- Space becomes available in the channel (backpressure), or
- The context is cancelled
Returns an error if the context is cancelled before the item can be enqueued. This blocking behavior provides natural backpressure when nodes produce work faster than the system can process it.
Backpressure Implementation (T063): The buffered channel f.queue enforces capacity limits. When the channel reaches its configured capacity (set via NewFrontier), the channel send blocks until a receiver (Dequeue) consumes an item, freeing up space. This implements FR-011: the system MUST implement backpressure by blocking admission when the frontier queue reaches QueueDepth capacity.
The select statement ensures that context cancellation is respected even while blocked on channel send, providing graceful shutdown during backpressure.
func (*Frontier[S]) Len ¶
Len returns the current number of work items in the frontier queue. This method is thread-safe and can be called concurrently with Enqueue/Dequeue.
func (*Frontier[S]) Metrics ¶
func (f *Frontier[S]) Metrics() SchedulerMetrics
Metrics returns a snapshot of current scheduler metrics (T068).
The returned SchedulerMetrics is a point-in-time snapshot and may be stale immediately after return if execution is ongoing. For consistent reads, call this method when execution is paused or completed.
This method is thread-safe and uses atomic operations to read metric values.
type LLMCall ¶
type LLMCall struct {
Model string // Model identifier (e.g., "gpt-4o", "claude-3-sonnet")
InputTokens int // Number of input tokens consumed
OutputTokens int // Number of output tokens generated
CostUSD float64 // Calculated cost in USD
Timestamp time.Time // When the call was made
NodeID string // Node that made the call (optional)
}
LLMCall represents a single LLM API invocation with token usage and cost.
type ModelPricing ¶
type ModelPricing struct {
InputPer1M float64 // Cost per 1M input tokens in USD
OutputPer1M float64 // Cost per 1M output tokens in USD
}
ModelPricing defines input and output token costs for LLM models. Prices are in USD per 1M tokens (per million tokens).
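The cost arithmetic implied by per-1M-token pricing can be sketched directly. The pricing values below are hypothetical, not the package's static pricing table:

```go
package main

import "fmt"

// ModelPricing mirrors the type above; prices are USD per 1M tokens.
type ModelPricing struct {
	InputPer1M  float64
	OutputPer1M float64
}

// cost computes the USD cost of a single call under a pricing entry:
// tokens are divided by one million, then multiplied by the per-1M rate.
func cost(p ModelPricing, inputTokens, outputTokens int) float64 {
	return float64(inputTokens)/1e6*p.InputPer1M +
		float64(outputTokens)/1e6*p.OutputPer1M
}

func main() {
	// Hypothetical pricing: $2.50 per 1M input tokens, $10.00 per 1M output tokens.
	p := ModelPricing{InputPer1M: 2.50, OutputPer1M: 10.00}
	// 1200 input tokens -> $0.0030; 300 output tokens -> $0.0030
	fmt.Printf("%.4f\n", cost(p, 1200, 300)) // 0.0060
}
```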
type Next ¶
type Next struct {
// To specifies the next single node to execute.
// Mutually exclusive with Many and Terminal.
To string
// Many specifies multiple nodes to execute in parallel (fan-out).
// Mutually exclusive with To and Terminal.
Many []string
// Terminal indicates workflow execution should stop.
// Mutually exclusive with To and Many.
Terminal bool
}
Next specifies the next step(s) in workflow execution after a node completes.
It supports three routing modes:
- Terminal: Stop execution (Route.Terminal = true)
- Single: Go to a specific node (Route.To = "nodeID")
- Fan-out: Go to multiple nodes in parallel (Route.Many = []string{"node1", "node2"})
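The three routing modes can be illustrated with a small sketch. The local Next struct below mirrors graph.Next so the example is self-contained; in real workflow code you would construct graph.Next values (or use the Stop()/Goto() helpers referenced elsewhere in this package):

```go
package main

import "fmt"

// Next mirrors graph.Next for illustration.
type Next struct {
	To       string
	Many     []string
	Terminal bool
}

// describe reports which of the three mutually exclusive routing modes is set.
func describe(n Next) string {
	switch {
	case n.Terminal:
		return "terminal"
	case n.To != "":
		return "goto " + n.To
	case len(n.Many) > 0:
		return fmt.Sprintf("fan-out to %d nodes", len(n.Many))
	}
	return "invalid: no routing set"
}

func main() {
	fmt.Println(describe(Next{Terminal: true}))                    // terminal
	fmt.Println(describe(Next{To: "summarize"}))                   // goto summarize
	fmt.Println(describe(Next{Many: []string{"search", "fetch"}})) // fan-out to 2 nodes
}
```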
type Node ¶
type Node[S any] interface {
	// Run executes the node's logic with the given context and state.
	// It returns a NodeResult containing state changes, routing decisions,
	// events, and any errors encountered.
	Run(ctx context.Context, state S) NodeResult[S]
}
Node represents a processing unit in the workflow graph. It receives state of type S, performs computation, and returns a NodeResult.
Nodes are the fundamental building blocks of LangGraph workflows. Each node can:
- Access the current state
- Perform computation (call LLMs, tools, or custom logic)
- Return state modifications via Delta
- Control routing via Route
- Emit observability events
- Handle errors
Type parameter S is the state type shared across the workflow.
Optional Methods:
Nodes can optionally implement Policy() NodePolicy to configure execution behavior such as timeouts, retry policies, and idempotency keys. If not implemented, the engine uses default settings from Options.
To maintain backward compatibility, embed DefaultPolicy in your node struct to get a zero-value Policy() implementation:
type MyNode struct {
DefaultPolicy // Provides Policy() returning zero value
// ... your fields
}
For custom policies, implement the method directly:
func (n *MyNode) Policy() NodePolicy {
return NodePolicy{
Timeout: 30 * time.Second,
RetryPolicy: &RetryPolicy{MaxAttempts: 3},
}
}
Nodes can also optionally implement Effects() SideEffectPolicy to declare their I/O characteristics for deterministic replay. If not implemented, the node is assumed to be pure (no external side effects).
For nodes with external I/O (LLM calls, API requests, database queries):
func (n *MyNode) Effects() SideEffectPolicy {
return SideEffectPolicy{
Recordable: true, // I/O can be captured for replay
RequiresIdempotency: true, // Use idempotency keys
}
}
Pure nodes (computation only, no I/O) don't need to implement Effects() - the default zero value indicates no side effects.
type NodeError ¶
type NodeError struct {
// Message is the human-readable error description.
Message string
// Code is a machine-readable error code for programmatic handling.
Code string
// NodeID identifies which node produced this error.
NodeID string
// Cause is the underlying error that caused this NodeError.
Cause error
}
NodeError represents an error that occurred during node execution. It provides structured error information for better observability and debugging.
type NodeFunc ¶
type NodeFunc[S any] func(ctx context.Context, state S) NodeResult[S]
NodeFunc is a function adapter that implements the Node interface. It allows using plain functions as nodes without creating custom types.
Example:
processNode := NodeFunc[MyState](func(ctx context.Context, s MyState) NodeResult[MyState] {
return NodeResult[MyState]{
Delta: MyState{Result: "processed"},
Route: Stop(),
}
})
type NodePolicy ¶
type NodePolicy struct {
// Timeout is the maximum execution time allowed for this node.
// If zero, Options.DefaultNodeTimeout is used.
Timeout time.Duration
// RetryPolicy specifies automatic retry behavior for transient failures.
// If nil, no retries are attempted.
RetryPolicy *RetryPolicy
// IdempotencyKeyFunc generates a custom idempotency key from the state.
// If nil, a default key based on node ID and step ID is used.
// This is useful for side-effecting nodes that need exactly-once semantics.
IdempotencyKeyFunc func(state any) string
}
NodePolicy configures the execution behavior for a specific node, including timeouts, retry logic, and idempotency key generation.
Policies are attached to nodes and enforced by the scheduler. If not specified, default values from Options are used.
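A sketch of a custom IdempotencyKeyFunc, matching the field's func(state any) string signature. OrderState and the key scheme are hypothetical; the point is that the key is derived from stable state fields, so a retried side-effecting node maps to the same key on every attempt:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// OrderState is a hypothetical workflow state.
type OrderState struct {
	OrderID string
	Amount  int
}

// idempotencyKey derives a stable key from the state so that a retried
// side-effecting node (e.g. "charge payment") is deduplicated per logical
// order rather than per attempt. Only OrderID feeds the key on purpose.
func idempotencyKey(state any) string {
	s, ok := state.(OrderState)
	if !ok {
		return "" // fall back to the engine's default key
	}
	sum := sha256.Sum256([]byte("charge:" + s.OrderID))
	return hex.EncodeToString(sum[:8])
}

func main() {
	k1 := idempotencyKey(OrderState{OrderID: "ord-42", Amount: 100})
	k2 := idempotencyKey(OrderState{OrderID: "ord-42", Amount: 100})
	fmt.Println(k1 == k2) // same logical order -> same key
}
```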
type NodeResult ¶
type NodeResult[S any] struct {
	// Delta is the partial state update produced by this node.
	// It will be merged with the current state using the configured reducer.
	Delta S
	// Route specifies the next step(s) in workflow execution.
	// Use Stop() for terminal nodes, Goto(id) for explicit routing,
	// or set Many for fan-out to multiple nodes.
	Route Next
	// Err contains any error that occurred during node execution.
	// Non-nil errors halt the workflow unless custom error handling is implemented.
	Err error
}
NodeResult represents the output of a node execution.
It contains all information needed to continue workflow execution:
- Delta: Partial state update to be merged via reducer
- Route: Next hop(s) for execution flow
- Events: Observability events emitted during execution
- Err: Node-level error (if any)
type Option ¶
type Option func(*engineConfig) error
Option is a functional option for configuring an Engine.
Functional options provide a clean, extensible API for engine configuration:
- Chainable: engine := New(reducer, store, emitter, WithMaxConcurrent(8), WithQueueDepth(1024))
- Self-documenting: Option names clearly describe their purpose
- Optional: Only specify the configuration you need
- Backward compatible: Existing Options struct still works
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithMaxConcurrent(16),
	graph.WithQueueDepth(2048),
	graph.WithDefaultNodeTimeout(10*time.Second),
)
Options can be mixed with the Options struct:
opts := graph.Options{MaxSteps: 100}
engine := graph.New(
	reducer, store, emitter, opts,
	graph.WithMaxConcurrent(8), // Overrides opts if specified
)
func WithBackpressureTimeout ¶
WithBackpressureTimeout sets the maximum time to wait when the frontier queue is full.
Default: 30s. If exceeded, Run() returns ErrBackpressureTimeout.
Set lower for fast-failing systems, higher for workflows with bursty fan-outs.
When the backpressure timeout is exceeded:
1. Execution pauses with a checkpoint saved.
2. ErrBackpressureTimeout is returned.
3. Resume from the checkpoint after reducing load or increasing MaxConcurrentNodes.
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithMaxConcurrent(8),
	graph.WithBackpressureTimeout(60*time.Second), // Wait up to 60s for queue space
)
func WithConflictPolicy ¶
func WithConflictPolicy(policy ConflictPolicy) Option
WithConflictPolicy sets the policy for handling concurrent state update conflicts.
Default: ConflictFail (returns error on conflict).
Only ConflictFail is currently supported. Other policies are planned for future releases.
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithConflictPolicy(graph.ConflictFail), // Explicit error on conflicts
)
func WithCostTracker ¶
func WithCostTracker(tracker *CostTracker) Option
WithCostTracker enables LLM cost tracking with static pricing.
Cost tracker monitors:
- Per-model token usage (input/output)
- Cost calculation using static pricing tables
- Cumulative cost tracking across calls
- Per-model cost attribution
Static pricing includes OpenAI, Anthropic, and Google models.
Example:
tracker := graph.NewCostTracker("run-123", "USD")
engine := graph.New(
reducer, store, emitter,
graph.WithCostTracker(tracker),
)
// After execution, get cost summary
totalCost := tracker.GetTotalCost()
modelCosts := tracker.GetCostByModel()
func WithDefaultNodeTimeout ¶
WithDefaultNodeTimeout sets the maximum execution time for nodes without explicit Policy().Timeout.
Default: 30s. Individual nodes can override via NodePolicy.Timeout.
Prevents single slow nodes from blocking workflow progress indefinitely. When exceeded, node execution is cancelled and returns context.DeadlineExceeded.
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithDefaultNodeTimeout(10*time.Second), // All nodes time out after 10s by default
)
func WithMaxConcurrent ¶
WithMaxConcurrent sets the maximum number of nodes executing concurrently.
Default: 8 when concurrent mode is enabled (determined by presence of other concurrent options). Set to 0 for sequential execution (backward compatible default).
Tuning guidance:
- CPU-bound workflows: Set to runtime.NumCPU()
- I/O-bound workflows: Set to 10-50 depending on external service limits
- Memory-constrained: Reduce to prevent excessive state copies
Each concurrent node holds a deep copy of state, so memory usage scales linearly with MaxConcurrentNodes.
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithMaxConcurrent(16), // Up to 16 nodes execute in parallel
)
func WithMaxSteps ¶
WithMaxSteps limits workflow execution to prevent infinite loops.
Default: 0 (no limit, use with caution).
Workflow loops (A → B → A) are fully supported. Use MaxSteps to prevent infinite loops when a conditional exit is missing or misconfigured.
Loop patterns:
- Node-based conditional loop: nodeA returns Goto("B") if shouldContinue(state), else Stop()
- Edge predicate loop: Connect("A", "B", loopPredicate) and Connect("A", "exit", exitPredicate)
Recommended values:
- Simple workflows (3-5 nodes): MaxSteps = 20
- Workflows with loops: MaxSteps = depth × max_iterations (e.g., 5 nodes × 10 iterations = 50)
- Complex multi-loop workflows: MaxSteps = 100-200
When MaxSteps is exceeded, Run() returns EngineError with code "MAX_STEPS_EXCEEDED".
Example:
engine := graph.New(
reducer, store, emitter,
graph.WithMaxSteps(100), // Limit to 100 execution steps
)
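The node-based conditional loop pattern described above can be sketched as a plain routing decision. State is hypothetical; in a real node the two branches would return Route: Goto("B") and Route: Stop():

```go
package main

import "fmt"

// State is a hypothetical loop state.
type State struct{ Iterations, Max int }

// route mirrors the node-based conditional loop: keep routing back to "B"
// while work remains, otherwise stop. MaxSteps is the safety net for the
// case where this exit condition is missing or misconfigured.
func route(s State) string {
	if s.Iterations < s.Max {
		return "B" // in a real node: Route: Goto("B")
	}
	return "stop" // in a real node: Route: Stop()
}

func main() {
	s := State{Max: 3}
	for route(s) == "B" {
		s.Iterations++
	}
	fmt.Println(s.Iterations) // 3
}
```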
func WithMetrics ¶
func WithMetrics(metrics *PrometheusMetrics) Option
WithMetrics enables Prometheus metrics collection.
Metrics enable production monitoring with 6 key metrics:
- inflight_nodes: Current concurrent node count
- queue_depth: Pending nodes in scheduler queue
- step_latency_ms: Node execution duration histogram
- retries_total: Cumulative retry attempts
- merge_conflicts_total: Concurrent state merge conflicts
- backpressure_events_total: Queue saturation events
All metrics are automatically updated during workflow execution.
Example:
registry := prometheus.NewRegistry()
metrics := graph.NewPrometheusMetrics(registry)
engine := graph.New(
reducer, store, emitter,
graph.WithMetrics(metrics),
)
// Expose metrics endpoint
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
func WithQueueDepth ¶
WithQueueDepth sets the capacity of the execution frontier queue.
Default: 1024. Increase for workflows with large fan-outs.
When the queue fills, new work items block until space is available. This provides backpressure to prevent unbounded memory growth.
Recommended: MaxConcurrentNodes × 100 for initial estimate.
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithMaxConcurrent(8),
	graph.WithQueueDepth(2048), // Queue can hold 2048 work items
)
func WithReplayMode ¶
WithReplayMode enables deterministic replay using recorded I/O.
Default: false (record mode - captures I/O for later replay).
When true, nodes with SideEffectPolicy.Recordable=true will use recorded responses instead of executing live I/O. This enables:
- Debugging: Replay production executions locally
- Testing: Verify workflow logic without external dependencies
- Auditing: Reconstruct exact execution flow from checkpoints
Requires prior execution with ReplayMode=false to record I/O.
Example:
// Original execution (record mode)
recordEngine := graph.New(reducer, store, emitter, graph.WithReplayMode(false))
_, _ = recordEngine.Run(ctx, "run-001", initialState)
// Later: replay execution
replayEngine := graph.New(reducer, store, emitter, graph.WithReplayMode(true))
replayedState, _ := replayEngine.ReplayRun(ctx, "run-001")
func WithRunWallClockBudget ¶
WithRunWallClockBudget sets the maximum total execution time for Run().
Default: 10m. If exceeded, Run() returns context.DeadlineExceeded.
Use this to enforce hard deadlines on entire workflow execution. Set to 0 to disable (workflow runs until completion or MaxSteps).
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithRunWallClockBudget(5*time.Minute), // Entire workflow must complete in 5 minutes
)
func WithStrictReplay ¶
WithStrictReplay controls replay mismatch behavior.
Default: true (fail on I/O hash mismatch).
When true, replay mode verifies recorded I/O hashes match expected values. If a mismatch is detected (indicating logic changes), Run() returns ErrReplayMismatch.
Set to false to allow "best effort" replay that tolerates minor changes. Useful when debugging with modified node logic.
Example:
engine := graph.New(
	reducer, store, emitter,
	graph.WithReplayMode(true),
	graph.WithStrictReplay(false), // Allow replay even if logic changed
)
type Options ¶
type Options struct {
// MaxSteps limits workflow execution to prevent infinite loops.
// If 0, no limit is enforced (use with caution).
//
// Workflow loops (A → B → A) are fully supported. Use MaxSteps to prevent
// infinite loops when a conditional exit is missing or misconfigured.
//
// Loop patterns:
// 1. Node-based conditional loop:
// nodeA returns Goto("B") if shouldContinue(state), else Stop()
// 2. Edge predicate loop:
// Connect("A", "B", loopPredicate)
// Connect("A", "exit", exitPredicate)
//
// Recommended values:
// - Simple workflows (3-5 nodes): MaxSteps = 20
// - Workflows with loops: MaxSteps = depth × max_iterations (e.g., 5 nodes × 10 iterations = 50)
// - Complex multi-loop workflows: MaxSteps = 100-200
//
// When MaxSteps is exceeded, Run() returns EngineError with code "MAX_STEPS_EXCEEDED".
MaxSteps int
// Retries specifies how many times to retry a node on transient errors.
// If 0, nodes are not retried.
// Transient errors are identified by checking if error implements Retryable interface.
// Deprecated: Use NodePolicy.RetryPolicy for per-node retry configuration.
Retries int
// MaxConcurrentNodes limits the number of nodes executing in parallel.
// Default: 8. Set to 0 for sequential execution (backward compatible).
//
// Tuning guidance:
// - CPU-bound workflows: Set to runtime.NumCPU()
// - I/O-bound workflows: Set to 10-50 depending on external service limits
// - Memory-constrained: Reduce to prevent excessive state copies
//
// Each concurrent node holds a deep copy of state, so memory usage scales
// linearly with MaxConcurrentNodes.
MaxConcurrentNodes int
// QueueDepth sets the capacity of the execution frontier queue.
// Default: 1024. Increase for workflows with large fan-outs.
//
// When the queue fills, new work items block until space is available.
// This provides backpressure to prevent unbounded memory growth.
//
// Recommended: MaxConcurrentNodes × 100 for initial estimate.
QueueDepth int
// BackpressureTimeout is the maximum time to wait when the frontier queue is full.
// Default: 30s. If exceeded, Run() returns ErrBackpressureTimeout.
//
// Set lower for fast-failing systems, higher for workflows with bursty fan-outs.
BackpressureTimeout time.Duration
// DefaultNodeTimeout is the maximum execution time for nodes without explicit Policy().Timeout.
// Default: 30s. Individual nodes can override via NodePolicy.Timeout.
//
// Prevents single slow nodes from blocking workflow progress indefinitely.
// When exceeded, node execution is cancelled and returns context.DeadlineExceeded.
DefaultNodeTimeout time.Duration
// RunWallClockBudget is the maximum total execution time for Run().
// Default: 10m. If exceeded, Run() returns context.DeadlineExceeded.
//
// Use this to enforce hard deadlines on entire workflow execution.
// Set to 0 to disable (workflow runs until completion or MaxSteps).
RunWallClockBudget time.Duration
// ReplayMode enables deterministic replay using recorded I/O.
// Default: false (record mode - captures I/O for later replay).
//
// When true, nodes with SideEffectPolicy.Recordable=true will use recorded
// responses instead of executing live I/O. This enables:
// - Debugging: Replay production executions locally
// - Testing: Verify workflow logic without external dependencies
// - Auditing: Reconstruct exact execution flow from checkpoints
//
// Requires prior execution with ReplayMode=false to record I/O.
ReplayMode bool
// StrictReplay controls replay mismatch behavior.
// Default: true (fail on I/O hash mismatch).
//
// When true, replay mode verifies recorded I/O hashes match expected values.
// If a mismatch is detected (indicating logic changes), Run() returns ErrReplayMismatch.
//
// Set to false to allow "best effort" replay that tolerates minor changes.
// Useful when debugging with modified node logic.
StrictReplay bool
// Metrics enables Prometheus metrics collection (T044).
// If nil, metrics are not collected.
//
// Create with NewPrometheusMetrics(registry) to enable production monitoring.
// All 6 metrics (inflight_nodes, queue_depth, step_latency_ms, retries_total,
// merge_conflicts_total, backpressure_events_total) are automatically updated.
//
// Example:
// registry := prometheus.NewRegistry()
// metrics := NewPrometheusMetrics(registry)
// engine := New(reducer, store, emitter, Options{Metrics: metrics})
Metrics *PrometheusMetrics
// CostTracker enables LLM cost tracking with static pricing (T045).
// If nil, cost tracking is disabled.
//
// Create with NewCostTracker(runID, "USD") to track token usage and costs.
// Static pricing includes OpenAI, Anthropic, and Google models.
//
// Example:
// tracker := NewCostTracker("run-123", "USD")
// engine := New(reducer, store, emitter, Options{CostTracker: tracker})
CostTracker *CostTracker
}
Options configures Engine execution behavior.
Zero values are valid - the Engine will use sensible defaults.
type Predicate ¶
Predicate is a function that evaluates state to determine if an edge should be traversed.
Predicates enable conditional routing based on workflow state. They should be pure functions (deterministic, no side effects).
Common patterns:
- Threshold: state.Score > 0.8
- Presence: state.Result != ""
- Boolean flag: state.IsReady
- Complex logic: state.Retries < 3 && state.Error == nil
Type parameter S is the state type to evaluate.
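A predicate is just a pure function over the state. A sketch with a hypothetical MyState; with an engine these would be wired as edge conditions via Connect(from, to, predicate):

```go
package main

import "fmt"

// MyState is a hypothetical workflow state.
type MyState struct {
	Score   float64
	Retries int
	Err     error
}

// Predicates are pure: they read state and return a bool, with no side effects.
func highScore(s MyState) bool { return s.Score > 0.8 }
func canRetry(s MyState) bool  { return s.Retries < 3 && s.Err == nil }

func main() {
	s := MyState{Score: 0.9, Retries: 1}
	fmt.Println(highScore(s), canRetry(s)) // true true
	// With an engine, these would gate edges, e.g.:
	//   engine.Connect("review", "publish", highScore)
	//   engine.Connect("review", "retry", canRetry)
}
```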
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics (T032) provides comprehensive Prometheus-compatible metrics collection for graph execution monitoring in production environments.
Metrics exposed (all namespaced with "langgraph_"):
1. inflight_nodes (gauge): Current number of nodes executing concurrently. Labels: run_id, graph_id. Use: Monitor concurrency levels and detect bottlenecks.
2. queue_depth (gauge): Number of pending nodes waiting for execution. Labels: run_id, graph_id. Use: Track backpressure and queue saturation.
3. step_latency_ms (histogram): Node execution duration in milliseconds. Labels: run_id, node_id, status (success/error). Buckets: [1, 5, 10, 50, 100, 500, 1000, 5000, 10000]. Use: P50/P95/P99 latency analysis per node.
4. retries_total (counter): Cumulative retry attempts across all nodes. Labels: run_id, node_id, reason. Use: Identify flaky nodes and error patterns.
5. merge_conflicts_total (counter): Concurrent state merge conflicts detected. Labels: run_id, conflict_type. Use: Monitor determinism violations in concurrent execution.
6. backpressure_events_total (counter): Queue saturation events triggering backpressure. Labels: run_id, reason. Use: Track when execution is throttled due to resource limits.
Usage:
// Create metrics with custom registry
registry := prometheus.NewRegistry()
metrics := NewPrometheusMetrics(registry)
// Integrate with engine
engine := New[MyState](
	WithMetrics(metrics),
)
// Metrics automatically update during execution.
// Expose via HTTP for Prometheus scraping:
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
Thread-safe: All methods use atomic operations or mutex protection.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics
NewPrometheusMetrics (T033) creates and registers all graph execution metrics with the provided Prometheus registry.
Parameters:
- registry: Prometheus registry to register metrics with (use prometheus.DefaultRegisterer for the global registry)
Returns:
- *PrometheusMetrics: Fully initialized metrics collector
All metrics are registered with namespace "langgraph" and appropriate labels. Histograms use buckets optimized for typical node execution times (1ms to 10s).
Example:
// Use the default global registry
metrics := NewPrometheusMetrics(prometheus.DefaultRegisterer)

// Or use a custom registry (recommended for isolation)
registry := prometheus.NewRegistry()
metrics = NewPrometheusMetrics(registry)
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
func (*PrometheusMetrics) Disable ¶
func (pm *PrometheusMetrics) Disable()
Disable temporarily disables metric recording (useful for testing).
func (*PrometheusMetrics) Enable ¶
func (pm *PrometheusMetrics) Enable()
Enable re-enables metric recording after Disable().
func (*PrometheusMetrics) IncrementBackpressure ¶
func (pm *PrometheusMetrics) IncrementBackpressure(runID, reason string)
IncrementBackpressure (T039) increments the backpressure event counter.
This updates the backpressure_events_total counter with labels for run_id and reason. Use this to track when execution is throttled due to resource limits (queue full, max concurrent reached, etc.).
Parameters:
- runID: Unique workflow execution identifier
- reason: Backpressure cause ("queue_full", "max_concurrent", "timeout")
Example:
if queueDepth >= maxQueueDepth {
	metrics.IncrementBackpressure(runID, "queue_full")
	return ErrBackpressure
}
func (*PrometheusMetrics) IncrementMergeConflicts ¶
func (pm *PrometheusMetrics) IncrementMergeConflicts(runID, conflictType string)
IncrementMergeConflicts (T038) increments the merge conflict counter.
This updates the merge_conflicts_total counter with labels for run_id and conflict_type. Use this to detect determinism violations or reducer errors in concurrent execution.
Parameters:
- runID: Unique workflow execution identifier
- conflictType: Type of conflict ("reducer_error", "state_divergence")
Example:
if err := reducer(prev, delta); err != nil {
	metrics.IncrementMergeConflicts(runID, "reducer_error")
}
func (*PrometheusMetrics) IncrementRetries ¶
func (pm *PrometheusMetrics) IncrementRetries(runID, nodeID, reason string)
IncrementRetries (T035) increments the retry counter for a specific node and reason.
This updates the retries_total counter with labels for run_id, node_id, and reason. Use this to identify flaky nodes and error patterns requiring investigation.
Parameters:
- runID: Unique workflow execution identifier
- nodeID: Node that is being retried
- reason: Retry cause ("error", "timeout", "transient")
Example:
if result.Err != nil {
	metrics.IncrementRetries(runID, nodeID, "error")
	// Retry logic...
}
func (*PrometheusMetrics) RecordStepLatency ¶
func (pm *PrometheusMetrics) RecordStepLatency(runID, nodeID string, latency time.Duration, status string)
RecordStepLatency (T034) records the execution duration of a node in milliseconds.
This updates the step_latency_ms histogram with labels for run_id, node_id, and status. Use this to track P50/P95/P99 latencies per node for performance monitoring.
Parameters:
- runID: Unique workflow execution identifier
- nodeID: Node that was executed
- latency: Execution duration
- status: Execution outcome ("success", "error", "timeout")
Example:
start := time.Now()
result := node.Run(ctx, state)
metrics.RecordStepLatency(runID, nodeID, time.Since(start), "success")
func (*PrometheusMetrics) Reset ¶
func (pm *PrometheusMetrics) Reset()
Reset clears all metric values (useful for testing). This does not unregister metrics from the registry.
func (*PrometheusMetrics) UpdateInflightNodes ¶
func (pm *PrometheusMetrics) UpdateInflightNodes(count int)
UpdateInflightNodes (T037) sets the current number of nodes executing concurrently.
This updates the inflight_nodes gauge. Use this to monitor concurrency levels and detect whether MaxConcurrent limits are being reached.
Parameters:
- count: Current number of nodes in execution
Example:
metrics.UpdateInflightNodes(len(activeNodes))
func (*PrometheusMetrics) UpdateQueueDepth ¶
func (pm *PrometheusMetrics) UpdateQueueDepth(depth int)
UpdateQueueDepth (T036) sets the current number of pending nodes in the scheduler queue.
This updates the queue_depth gauge. Use this to monitor backpressure and detect when the system is saturated with pending work.
Parameters:
- depth: Current number of nodes waiting for execution
Example:
metrics.UpdateQueueDepth(scheduler.PendingCount())
type RecordedIO ¶
type RecordedIO struct {
// NodeID identifies the node that performed this I/O operation.
NodeID string `json:"node_id"`
// Attempt is the retry attempt number this I/O corresponds to.
// This allows matching I/O recordings to specific retry attempts.
Attempt int `json:"attempt"`
// Request is the serialized request data sent to the external service.
// Stored as JSON for cross-language compatibility and human readability.
Request json.RawMessage `json:"request"`
// Response is the serialized response data received from the external service.
// Stored as JSON for cross-language compatibility and human readability.
Response json.RawMessage `json:"response"`
// Hash is a SHA-256 hash of the response content, used for mismatch detection
// during replay. Format: "sha256:hex_encoded_hash".
Hash string `json:"hash"`
// Timestamp records when this I/O operation was captured.
Timestamp time.Time `json:"timestamp"`
// Duration is how long the I/O operation took to complete.
// This can be used for performance analysis and replay simulation.
Duration time.Duration `json:"duration"`
}
RecordedIO captures an external interaction (API call, database query, etc.) for deterministic replay without re-invoking the external service.
During initial execution, RecordedIO instances are created for nodes with SideEffectPolicy.Recordable=true. During replay, these recorded I/Os are matched by (NodeID, Attempt) and their responses are returned directly without executing the node.
The Hash field enables mismatch detection: if a live execution produces a different response hash than recorded, ErrReplayMismatch is raised, indicating non-deterministic behavior.
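The "sha256:hex_encoded_hash" format can be sketched with a small helper (the helper name is illustrative, not part of the package API):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashResponse computes a hash in the "sha256:hex_encoded_hash" format
// described for RecordedIO.Hash. Illustrative only; the package's own
// implementation may differ in detail.
func hashResponse(response []byte) string {
	sum := sha256.Sum256(response)
	return "sha256:" + hex.EncodeToString(sum[:])
}

func main() {
	recorded := hashResponse([]byte(`{"answer":42}`))
	live := hashResponse([]byte(`{"answer":42}`))
	// Equal hashes mean the live response agrees with the recording;
	// a mismatch would surface as ErrReplayMismatch during replay.
	fmt.Println(recorded == live)
}
```

Because the hash covers the full response content, any byte-level divergence between live and recorded responses is detected.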
type Reducer ¶
type Reducer[S any] func(prev, delta S) S
Reducer is a function that merges a partial state update (delta) into the previous state.
Reducers are responsible for deterministic state composition, enabling:
- Sequential state updates across workflow nodes
- Parallel branch result merging
- Checkpoint-based state reconstruction
The reducer must be:
- **Pure**: Same inputs always produce same output
- **Deterministic**: No randomness or side effects
- **Commutative** (for parallel merges): Order of deltas shouldn't matter for correctness
Example:
reducer := func(prev, delta MyState) MyState {
if delta.Query != "" {
prev.Query = delta.Query // Last write wins
}
prev.Counter += delta.Counter // Accumulate
return prev
}
type RetryPolicy ¶
type RetryPolicy struct {
// MaxAttempts is the maximum number of execution attempts (including initial attempt).
// Must be >= 1. A value of 1 means no retries.
MaxAttempts int
// BaseDelay is the base delay for exponential backoff between retries.
// The actual delay is computed as: min(BaseDelay * 2^attempt + jitter, MaxDelay).
BaseDelay time.Duration
// MaxDelay is the maximum delay cap for exponential backoff.
// Must be >= BaseDelay.
MaxDelay time.Duration
// Retryable is a predicate function that determines if an error is retryable.
// If nil, all errors are considered non-retryable.
// Common patterns:
// - Network errors: temporary, connection refused, timeout.
// - HTTP 429, 503, 504.
// - Database deadlocks.
Retryable func(error) bool
}
RetryPolicy defines automatic retry configuration for transient node failures.
When a node execution fails, the retry policy determines whether the failure is retryable and how long to wait before the next attempt. Exponential backoff with jitter is used to avoid thundering herd problems.
func (*RetryPolicy) Validate ¶
func (rp *RetryPolicy) Validate() error
Validate checks if the RetryPolicy configuration is valid. Returns an error if any constraints are violated:
- MaxAttempts must be >= 1 (1 means no retries, just initial attempt)
- If both MaxDelay and BaseDelay are > 0, then MaxDelay must be >= BaseDelay (MaxDelay == 0 is treated as "no maximum delay cap")
type SchedulerMetrics ¶
type SchedulerMetrics struct {
// ActiveNodes is the current number of nodes executing concurrently.
// This value should never exceed MaxConcurrentNodes.
// Use atomic.LoadInt32() to read safely from multiple goroutines.
ActiveNodes int32
// QueueDepth is the current number of work items waiting in the frontier.
// High queue depth indicates work is being produced faster than consumed.
// When this reaches capacity, backpressure kicks in.
QueueDepth int32
// QueueCapacity is the maximum queue depth configured via Options.QueueDepth.
// This is the capacity at which backpressure will block new work items.
QueueCapacity int32
// TotalSteps is the cumulative count of execution steps since run start.
// Monotonically increasing counter.
TotalSteps int64
// TotalEnqueued is the total number of work items enqueued since run start.
// Useful for calculating throughput (enqueued/second).
TotalEnqueued int64
// TotalDequeued is the total number of work items dequeued since run start.
// Should equal TotalEnqueued when run completes.
TotalDequeued int64
// BackpressureEvents counts how many times backpressure was triggered.
// High count indicates sustained overload or insufficient concurrency.
BackpressureEvents int32
// PeakActiveNodes is the maximum concurrent execution observed during this run.
// Should be <= MaxConcurrentNodes.
PeakActiveNodes int32
// PeakQueueDepth is the maximum queue depth observed during this run.
// If this equals QueueCapacity, backpressure was triggered.
PeakQueueDepth int32
}
SchedulerMetrics tracks execution metrics for monitoring and observability (T067).
These metrics provide insight into the runtime behavior of the concurrent scheduler, enabling operators to:
- Detect bottlenecks and capacity issues
- Monitor backpressure conditions
- Tune concurrency parameters (MaxConcurrentNodes, QueueDepth)
- Alert on unhealthy execution patterns
Metrics are updated atomically by the scheduler and can be safely read concurrently.
According to spec.md FR-027: System MUST expose metrics for queue_depth, step_latency_ms, active_nodes, and step_count.
type SideEffectPolicy ¶
type SideEffectPolicy struct {
// Recordable indicates whether this node's I/O can be captured for replay.
// Examples:
// - LLM API calls: true (responses are cacheable).
// - Pure functions: false (no external I/O).
// - Database queries: false (may be non-deterministic).
Recordable bool
// RequiresIdempotency indicates whether this node requires an idempotency key
// to prevent duplicate execution. This is important for side-effecting operations
// like database writes, payments, or notifications.
//
// If true, the node must provide an IdempotencyKeyFunc in its NodePolicy.
RequiresIdempotency bool
}
SideEffectPolicy declares the external I/O characteristics of a node, informing the replay engine whether the node's interactions should be recorded and replayed.
This policy affects deterministic replay behavior:

- Recordable=true: I/O is captured and can be replayed without re-execution.
- RequiresIdempotency=true: Node needs idempotency key to ensure exactly-once semantics.
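Following the field docs above, typical policies for the two cases they mention might look like this (struct literals only; how the engine consumes them is not shown here):

```go
package main

import "fmt"

// SideEffectPolicy mirrors the struct documented above.
type SideEffectPolicy struct {
	Recordable          bool
	RequiresIdempotency bool
}

func main() {
	// LLM API call: responses are cacheable, so record them for replay.
	llmPolicy := SideEffectPolicy{Recordable: true}

	// Payment write: must never run twice, so require an idempotency key
	// (which in turn requires an IdempotencyKeyFunc in the NodePolicy).
	paymentPolicy := SideEffectPolicy{RequiresIdempotency: true}

	fmt.Println(llmPolicy.Recordable, paymentPolicy.RequiresIdempotency)
}
```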
type StateCopier ¶
StateCopier is an optional interface that state types can implement to provide custom deep copy logic. When a state type implements this interface, the engine will use the custom DeepCopy method instead of JSON-based serialization.
This interface allows users to:

- Avoid JSON serialization overhead in performance-critical paths
- Properly handle unexported fields, channels, or other non-JSON-serializable types
- Implement application-specific copying semantics
The interface uses any return type to accommodate both value and pointer receiver implementations, avoiding generic interface instantiation issues where S = *T would require DeepCopy() (*T, error) instead of DeepCopy() (T, error).
Example implementations:
Value receiver (returns value):
type MyState struct {
Counter int
Data []byte
}
func (s MyState) DeepCopy() (any, error) {
copied := MyState{
Counter: s.Counter,
Data: append([]byte(nil), s.Data...),
}
return copied, nil
}
Pointer receiver (returns pointer):
type MyState struct {
Counter int
Data []byte
}
func (s *MyState) DeepCopy() (any, error) {
copied := &MyState{
Counter: s.Counter,
Data: append([]byte(nil), s.Data...),
}
return copied, nil
}
Thread-safety: DeepCopy implementations must be safe to call from multiple goroutines without external synchronization.
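Because DeepCopy returns any, an engine can detect the interface with a single type assertion regardless of receiver kind. A sketch of that dispatch, with a JSON round-trip fallback as the docs describe (the helper name copyState is illustrative, not the engine's actual function):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stateCopier matches the StateCopier shape described above.
type stateCopier interface {
	DeepCopy() (any, error)
}

type MyState struct {
	Counter int
	Data    []byte
}

// DeepCopy implements the value-receiver variant from the examples above.
func (s MyState) DeepCopy() (any, error) {
	return MyState{Counter: s.Counter, Data: append([]byte(nil), s.Data...)}, nil
}

// copyState prefers a custom DeepCopy when the state implements it, and
// falls back to JSON round-tripping otherwise.
func copyState[S any](state S) (S, error) {
	var zero S
	if c, ok := any(state).(stateCopier); ok {
		copied, err := c.DeepCopy()
		if err != nil {
			return zero, err
		}
		return copied.(S), nil
	}
	raw, err := json.Marshal(state)
	if err != nil {
		return zero, err
	}
	var out S
	return out, json.Unmarshal(raw, &out)
}

func main() {
	orig := MyState{Counter: 1, Data: []byte{1, 2}}
	cp, _ := copyState(orig)
	cp.Data[0] = 9 // mutating the copy must not touch the original
	fmt.Println(orig.Data[0], cp.Data[0]) // the backing arrays are independent
}
```

Note the `copied.(S)` assertion: it works for both value and pointer receivers precisely because DeepCopy returns any rather than S.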
type WorkItem ¶
type WorkItem[S any] struct {
	// StepID is the monotonically increasing step number in the run
	StepID int `json:"step_id"`
	// OrderKey is a deterministic sort key computed from hash(path_hash, edge_index).
	// This ensures consistent execution order across replays.
	OrderKey uint64 `json:"order_key"`
	// NodeID identifies the node to execute for this work item
	NodeID string `json:"node_id"`
	// State is the snapshot of state for this work item's execution
	State S `json:"state"`
	// Attempt is the retry counter (0 for first execution, 1+ for retries)
	Attempt int `json:"attempt"`
	// ParentNodeID is the node that spawned this work item, used for path hash computation
	ParentNodeID string `json:"parent_node_id"`
	// EdgeIndex is the index of the edge taken from parent, used for deterministic ordering
	EdgeIndex int `json:"edge_index"`
}
WorkItem represents a schedulable unit of work in the execution frontier. Each WorkItem contains all the context needed to execute a node, including the node's input state, execution metadata, and provenance information for deterministic ordering.
WorkItems are created when nodes produce routing decisions and are queued in the frontier for concurrent execution. The OrderKey ensures deterministic processing order even when nodes complete out of order.
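The deterministic ordering can be sketched by sorting items on OrderKey. The FNV-based key derivation below is a stand-in illustrating the idea; the package's ComputeOrderKey may use a different hash:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"sort"
)

// orderKey derives a deterministic uint64 from the parent node and edge
// index. Illustrative only; ComputeOrderKey's actual derivation may differ.
func orderKey(parentNodeID string, edgeIndex int) uint64 {
	h := fnv.New64a()
	h.Write([]byte(parentNodeID))
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(edgeIndex))
	h.Write(buf[:])
	return h.Sum64()
}

type workItem struct {
	NodeID   string
	OrderKey uint64
}

func main() {
	// Parallel branches may finish in any order...
	items := []workItem{
		{NodeID: "b", OrderKey: orderKey("root", 1)},
		{NodeID: "a", OrderKey: orderKey("root", 0)},
	}
	// ...but sorting by OrderKey yields the same processing order on
	// every replay, regardless of completion order.
	sort.Slice(items, func(i, j int) bool { return items[i].OrderKey < items[j].OrderKey })
	for _, it := range items {
		fmt.Println(it.NodeID, it.OrderKey)
	}
}
```

Since the key depends only on (parent node, edge index), the resulting order is a pure function of graph topology, never of scheduling timing.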
Source Files ¶

Directories ¶

| Path | Synopsis |
|---|---|
| emit | Package emit provides event emission and observability for graph execution. |
| mcp | Package mcp provides MCP (Model Context Protocol) server implementation for LangGraph workflows. |
| model | Package model provides LLM integration adapters. |
| anthropic | Package anthropic provides ChatModel adapter for Anthropic Claude API. |
| bedrock | Package bedrock provides AWS Bedrock LLM integration for LangGraph-Go. |
| google | Package google provides ChatModel adapter for Google Gemini API. |
| ollama | Package ollama provides ChatModel adapter for Ollama API. |
| openai | Package openai provides ChatModel adapter for OpenAI GPT API. |
| store | Package store provides persistence implementations for graph state. |
| tool | Package tool provides tool interfaces for graph nodes. |