graph

package
v0.8.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2026 License: MIT Imports: 21 Imported by: 13

Documentation

Overview

Package graph provides the core graph construction and execution engine for LangGraph Go.

This package implements the fundamental building blocks for creating stateful, multi-agent applications using directed graphs. It offers both untyped and typed interfaces for building workflows, with support for parallel execution, checkpointing, streaming, and comprehensive event handling.

Core Concepts

StateGraph The primary component for building graphs is StateGraph, which maintains state as it flows through nodes. Each node can process and transform the state before passing it to the next node based on defined edges.

Nodes and Edges Nodes represent processing units (functions, agents, tools) that transform state. Edges define the flow between nodes, supporting conditional routing based on state content.

Typed Support For type safety, the package provides StateGraph[S] which uses Go generics to enforce state types at compile time, reducing runtime errors and improving code maintainability.

Key Features

  • Parallel node execution with coordination
  • Checkpointing for durable execution with resume capability
  • Streaming for real-time event monitoring
  • Comprehensive listener system for observability
  • Built-in retry mechanisms with configurable policies
  • Subgraph composition for modular design
  • Graph visualization (Mermaid, ASCII, DOT)
  • Interrupt support for human-in-the-loop workflows

Example Usage

Basic State Graph

g := graph.NewStateGraph()

// Add nodes
g.AddNode("process", "Process node", func(ctx context.Context, state any) (any, error) {
	// Process the state
	s := state.(map[string]any)
	s["processed"] = true
	return s, nil
})

g.AddNode("validate", "Validate node", func(ctx context.Context, state any) (any, error) {
	// Validate the processed state
	s := state.(map[string]any)
	if s["processed"].(bool) {
		s["valid"] = true
	}
	return s, nil
})

// Set entry point and edges
g.SetEntryPoint("process")
g.AddEdge("process", "validate")
g.AddEdge("validate", graph.END)

// Compile and run
runnable := g.Compile()
result, err := runnable.Invoke(context.Background(), map[string]any{
	"data": "example",
})

Typed State Graph

type WorkflowState struct {
	Input    string `json:"input"`
	Output   string `json:"output"`
	Complete bool   `json:"complete"`
}

g := graph.NewStateGraph[WorkflowState]()

g.AddNode("process", "Process the input", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
	state.Output = strings.ToUpper(state.Input)
	state.Complete = true
	return state, nil
})

// Add validate node
g.AddNode("validate", "Validate the output", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
	return state, nil
})
g.AddNode("retry", "Retry processing", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
	return state, nil
})

// Conditional routing
g.AddConditionalEdge("process", func(ctx context.Context, state WorkflowState) string {
	if state.Complete {
		return "validate"
	}
	return "retry"
})
g.AddEdge("validate", graph.END)
g.AddEdge("retry", "process")

Parallel Execution

// Add parallel nodes
g.AddParallelNodes("parallel_tasks", map[string]func(context.Context, any) (any, error){
	"task1": func(ctx context.Context, state any) (any, error) {
		// First task logic
		return state, nil
	},
	"task2": func(ctx context.Context, state any) (any, error) {
		// Second task logic
		return state, nil
	},
})

Checkpointing

// Note: Checkpointing is handled at the runnable level
// See store package examples for checkpointing implementation

runnable := g.Compile()

// Execute with context
result, err := runnable.Invoke(context.Background(), initialState)

Streaming

// Create listenable graph for streaming
g := graph.NewListenableStateGraph()
g.AddNode("process", "Process node", func(ctx context.Context, state map[string]any) (map[string]any, error) {
	state["processed"] = true
	return state, nil
})
g.SetEntryPoint("process")
g.AddEdge("process", graph.END)

// Compile to listenable runnable
runnable, _ := g.CompileListenable()

// Create streaming runnable
streaming := graph.NewStreamingRunnableWithDefaults(runnable)

// Stream execution
result := streaming.Stream(context.Background(), initialState)

// Process events
for event := range result.Events {
	fmt.Printf("Event: %v\n", event)
}

Listener System

The package provides a powerful listener system for monitoring and reacting to graph events:

  • ProgressListener: Track execution progress
  • LoggingListener: Structured logging of events
  • MetricsListener: Collect performance metrics
  • ChatListener: Chat-style output formatting
  • Custom listeners: Implement NodeListener interface

Error Handling

  • Built-in retry policies with exponential backoff
  • Custom error filtering for selective retries
  • Interrupt handling for pausing execution
  • Comprehensive error context in events

Visualization

Export graphs for documentation and debugging:

exporter := graph.NewExporter(g)

// Mermaid diagram
mermaid := exporter.DrawMermaid()

// Mermaid with options
mermaidWithOptions := exporter.DrawMermaidWithOptions(graph.MermaidOptions{
	Direction: "LR", // Left to right
})

Thread Safety

All graph structures are thread-safe for read operations. Write operations (adding nodes, edges, or listeners) should be performed before compilation or protected by external synchronization.

Best Practices

  1. Use typed graphs when possible for better type safety
  2. Set appropriate buffer sizes for streaming to balance memory and performance
  3. Implement proper error handling in node functions
  4. Use checkpoints for long-running or critical workflows
  5. Add listeners for debugging and monitoring
  6. Keep node functions pure and stateless when possible
  7. Use conditional edges for complex routing logic
  8. Leverage parallel execution for independent tasks

Index

Examples

Constants

View Source
const END = "END"

END is a special constant used to represent the end node in the graph.

Variables

View Source
var (
	// ErrEntryPointNotSet is returned when the entry point of the graph is not set.
	ErrEntryPointNotSet = errors.New("entry point not set")

	// ErrNodeNotFound is returned when a node is not found in the graph.
	ErrNodeNotFound = errors.New("node not found")

	// ErrNoOutgoingEdge is returned when no outgoing edge is found for a node.
	ErrNoOutgoingEdge = errors.New("no outgoing edge found for node")
)

Functions

func AddMessages

func AddMessages(current, new any) (any, error)

AddMessages is a reducer designed for merging chat messages. It handles ID-based deduplication and upserts. If a new message has the same ID as an existing one, it replaces the existing one. Otherwise, it appends the new message.

func AddNestedConditionalSubgraph added in v0.8.0

func AddNestedConditionalSubgraph[S, SubS any](
	g *StateGraph[S],
	name string,
	router func(S) string,
	subgraphs map[string]*StateGraph[SubS],
	converter func(S) SubS,
	resultConverter func(SubS) S,
) error

AddNestedConditionalSubgraph creates a subgraph with its own conditional routing

func AddRecursiveSubgraph added in v0.8.0

func AddRecursiveSubgraph[S, SubS any](
	g *StateGraph[S],
	name string,
	maxDepth int,
	condition func(SubS, int) bool,
	builder func(*StateGraph[SubS]) error,
	converter func(S) SubS,
	resultConverter func(SubS) S,
) error

AddRecursiveSubgraph adds a recursive subgraph to the parent graph

func AddSubgraph added in v0.8.0

func AddSubgraph[S, SubS any](g *StateGraph[S], name string, subgraph *StateGraph[SubS], converter func(S) SubS, resultConverter func(SubS) S) error

AddSubgraph adds a subgraph as a node in the parent graph

func AppendReducer

func AppendReducer(current, new any) (any, error)

AppendReducer appends the new value to the current slice. It supports appending a slice to a slice, or a single element to a slice.

func AppendSliceMerge added in v0.6.0

func AppendSliceMerge(current, new reflect.Value) reflect.Value

AppendSliceMerge appends new slice to current slice.

func ContextWithSpan

func ContextWithSpan(ctx context.Context, span *TraceSpan) context.Context

ContextWithSpan returns a new context with the span stored

func CreateSubgraph added in v0.8.0

func CreateSubgraph[S, SubS any](g *StateGraph[S], name string, builder func(*StateGraph[SubS]) error, converter func(S) SubS, resultConverter func(SubS) S) error

CreateSubgraph creates and adds a subgraph using a builder function

func DefaultStructMerge added in v0.6.0

func DefaultStructMerge[S any](current, new S) (S, error)

DefaultStructMerge provides a default merge function for struct states. It uses reflection to merge non-zero fields from new into current. This is a sensible default for most struct types.

func ExponentialBackoffRetry

func ExponentialBackoffRetry(
	ctx context.Context,
	fn func() (any, error),
	maxAttempts int,
	baseDelay time.Duration,
) (any, error)

ExponentialBackoffRetry implements exponential backoff with jitter

func GetResumeValue

func GetResumeValue(ctx context.Context) any

GetResumeValue retrieves the resume value from the context.

func Interrupt

func Interrupt(ctx context.Context, value any) (any, error)

Interrupt pauses execution and waits for input. If resuming, it returns the value provided in the resume command.

func KeepCurrentMerge added in v0.6.0

func KeepCurrentMerge(current, new reflect.Value) reflect.Value

KeepCurrentMerge always keeps the current value (ignores new).

func MaxIntMerge added in v0.6.0

func MaxIntMerge(current, new reflect.Value) reflect.Value

MaxIntMerge takes the maximum of two integer values.

func MinIntMerge added in v0.6.0

func MinIntMerge(current, new reflect.Value) reflect.Value

MinIntMerge takes the minimum of two integer values.

func NewFileCheckpointStore

func NewFileCheckpointStore(path string) (store.CheckpointStore, error)

NewFileCheckpointStore creates a new file-based checkpoint store

func NewMemoryCheckpointStore

func NewMemoryCheckpointStore() store.CheckpointStore

NewMemoryCheckpointStore creates a new in-memory checkpoint store

func OverwriteMerge added in v0.6.0

func OverwriteMerge(current, new reflect.Value) reflect.Value

OverwriteMerge always uses the new value.

func OverwriteReducer

func OverwriteReducer(current, new any) (any, error)

OverwriteReducer replaces the old value with the new one.

func OverwriteStructMerge added in v0.6.0

func OverwriteStructMerge[S any](current, new S) (S, error)

OverwriteStructMerge is a merge function that completely replaces the current state with the new state.

func SafeGo added in v0.6.0

func SafeGo(wg *sync.WaitGroup, fn func(), onPanic func(any))

SafeGo runs a function in a goroutine with panic recovery. It uses a WaitGroup (if provided) and supports a custom panic handler.

func SumIntMerge added in v0.6.0

func SumIntMerge(current, new reflect.Value) reflect.Value

SumIntMerge adds two integer values.

func WithConfig

func WithConfig(ctx context.Context, config *Config) context.Context

WithConfig adds the config to the context

func WithResumeValue

func WithResumeValue(ctx context.Context, value any) context.Context

WithResumeValue adds a resume value to the context. This value will be returned by Interrupt() when re-executing a node.

Types

type BackoffStrategy

type BackoffStrategy int

BackoffStrategy defines different backoff strategies

const (
	FixedBackoff BackoffStrategy = iota
	ExponentialBackoff
	LinearBackoff
)

type CallbackHandler

type CallbackHandler interface {
	// Chain callbacks (for graph/workflow execution)
	OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, runID string, parentRunID *string, tags []string, metadata map[string]any)
	OnChainEnd(ctx context.Context, outputs map[string]any, runID string)
	OnChainError(ctx context.Context, err error, runID string)

	// LLM callbacks (for AI model calls)
	OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]any)
	OnLLMEnd(ctx context.Context, response any, runID string)
	OnLLMError(ctx context.Context, err error, runID string)

	// Tool callbacks (for tool/function calls)
	OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]any)
	OnToolEnd(ctx context.Context, output string, runID string)
	OnToolError(ctx context.Context, err error, runID string)

	// Retriever callbacks (for data retrieval operations)
	OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, parentRunID *string, tags []string, metadata map[string]any)
	OnRetrieverEnd(ctx context.Context, documents []any, runID string)
	OnRetrieverError(ctx context.Context, err error, runID string)
}

CallbackHandler defines the interface for handling graph execution callbacks This matches Python's LangChain callback pattern

type ChatListener

type ChatListener struct {
	// contains filtered or unexported fields
}

ChatListener provides real-time chat-style updates

func NewChatListener

func NewChatListener() *ChatListener

NewChatListener creates a new chat-style listener

func NewChatListenerWithWriter

func NewChatListenerWithWriter(writer io.Writer) *ChatListener

NewChatListenerWithWriter creates a chat listener with custom writer

func (*ChatListener) OnNodeEvent

func (cl *ChatListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ map[string]any, err error)

OnNodeEvent implements the NodeListener[map[string]any] interface

func (*ChatListener) SetNodeMessage

func (cl *ChatListener) SetNodeMessage(nodeName, message string)

SetNodeMessage sets a custom message for a specific node

func (*ChatListener) WithTime

func (cl *ChatListener) WithTime(enabled bool) *ChatListener

WithTime enables or disables timestamps

type Checkpoint

type Checkpoint = store.Checkpoint

Checkpoint is an alias for store.Checkpoint

type CheckpointConfig

type CheckpointConfig struct {
	// Store is the checkpoint storage backend
	Store store.CheckpointStore

	// AutoSave enables automatic checkpointing after each node
	AutoSave bool

	// SaveInterval specifies how often to save (when AutoSave is false)
	SaveInterval time.Duration

	// MaxCheckpoints limits the number of checkpoints to keep
	MaxCheckpoints int
}

CheckpointConfig configures checkpointing behavior

func DefaultCheckpointConfig

func DefaultCheckpointConfig() CheckpointConfig

DefaultCheckpointConfig returns a default checkpoint configuration

type CheckpointListener

type CheckpointListener[S any] struct {
	// contains filtered or unexported fields
}

CheckpointListener automatically creates checkpoints during execution

func (*CheckpointListener[S]) OnChainEnd added in v0.8.0

func (cl *CheckpointListener[S]) OnChainEnd(context.Context, map[string]any, string)

func (*CheckpointListener[S]) OnChainError added in v0.8.0

func (cl *CheckpointListener[S]) OnChainError(context.Context, error, string)

func (*CheckpointListener[S]) OnChainStart added in v0.8.0

func (cl *CheckpointListener[S]) OnChainStart(context.Context, map[string]any, map[string]any, string, *string, []string, map[string]any)

Implement other methods of CallbackHandler as no-ops

func (*CheckpointListener[S]) OnGraphStep

func (cl *CheckpointListener[S]) OnGraphStep(ctx context.Context, nodeName string, state any)

OnGraphStep is called after a step in the graph has completed and the state has been merged.

func (*CheckpointListener[S]) OnLLMEnd added in v0.8.0

func (cl *CheckpointListener[S]) OnLLMEnd(context.Context, any, string)

func (*CheckpointListener[S]) OnLLMError added in v0.8.0

func (cl *CheckpointListener[S]) OnLLMError(context.Context, error, string)

func (*CheckpointListener[S]) OnLLMStart added in v0.8.0

func (cl *CheckpointListener[S]) OnLLMStart(context.Context, map[string]any, []string, string, *string, []string, map[string]any)

func (*CheckpointListener[S]) OnRetrieverEnd added in v0.8.0

func (cl *CheckpointListener[S]) OnRetrieverEnd(context.Context, []any, string)

func (*CheckpointListener[S]) OnRetrieverError added in v0.8.0

func (cl *CheckpointListener[S]) OnRetrieverError(context.Context, error, string)

func (*CheckpointListener[S]) OnRetrieverStart added in v0.8.0

func (cl *CheckpointListener[S]) OnRetrieverStart(context.Context, map[string]any, string, string, *string, []string, map[string]any)

func (*CheckpointListener[S]) OnToolEnd added in v0.8.0

func (cl *CheckpointListener[S]) OnToolEnd(context.Context, string, string)

func (*CheckpointListener[S]) OnToolError added in v0.8.0

func (cl *CheckpointListener[S]) OnToolError(context.Context, error, string)

func (*CheckpointListener[S]) OnToolStart added in v0.8.0

func (cl *CheckpointListener[S]) OnToolStart(context.Context, map[string]any, string, string, *string, []string, map[string]any)

type CheckpointStore

type CheckpointStore = store.CheckpointStore

CheckpointStore is an alias for store.CheckpointStore

type CheckpointableRunnable

type CheckpointableRunnable[S any] struct {
	// contains filtered or unexported fields
}

CheckpointableRunnable[S] wraps a ListenableRunnable[S] with checkpointing capabilities

func NewCheckpointableRunnable

func NewCheckpointableRunnable[S any](runnable *ListenableRunnable[S], config CheckpointConfig) *CheckpointableRunnable[S]

NewCheckpointableRunnable creates a new checkpointable runnable from a listenable runnable

func (*CheckpointableRunnable[S]) ClearCheckpoints

func (cr *CheckpointableRunnable[S]) ClearCheckpoints(ctx context.Context) error

ClearCheckpoints removes all checkpoints for this execution

func (*CheckpointableRunnable[S]) GetExecutionID added in v0.8.0

func (cr *CheckpointableRunnable[S]) GetExecutionID() string

GetExecutionID returns the current execution ID

func (*CheckpointableRunnable[S]) GetGraph added in v0.8.0

func (cr *CheckpointableRunnable[S]) GetGraph() *ListenableStateGraph[S]

GetGraph returns the underlying graph

func (*CheckpointableRunnable[S]) GetState

func (cr *CheckpointableRunnable[S]) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)

GetState retrieves the state for the given config

func (*CheckpointableRunnable[S]) GetTracer added in v0.8.0

func (cr *CheckpointableRunnable[S]) GetTracer() *Tracer

GetTracer returns the tracer from the underlying runnable

func (*CheckpointableRunnable[S]) Invoke

func (cr *CheckpointableRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)

Invoke executes the graph with checkpointing support

func (*CheckpointableRunnable[S]) InvokeWithConfig

func (cr *CheckpointableRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)

InvokeWithConfig executes the graph with checkpointing support and config

func (*CheckpointableRunnable[S]) ListCheckpoints

func (cr *CheckpointableRunnable[S]) ListCheckpoints(ctx context.Context) ([]*store.Checkpoint, error)

ListCheckpoints lists all checkpoints for the current execution

func (*CheckpointableRunnable[S]) LoadCheckpoint

func (cr *CheckpointableRunnable[S]) LoadCheckpoint(ctx context.Context, checkpointID string) (*store.Checkpoint, error)

LoadCheckpoint loads a specific checkpoint

func (*CheckpointableRunnable[S]) SaveCheckpoint

func (cr *CheckpointableRunnable[S]) SaveCheckpoint(ctx context.Context, nodeName string, state S) error

SaveCheckpoint manually saves a checkpoint at the current state

func (*CheckpointableRunnable[S]) SetExecutionID added in v0.8.0

func (cr *CheckpointableRunnable[S]) SetExecutionID(executionID string)

SetExecutionID sets a new execution ID

func (*CheckpointableRunnable[S]) SetTracer added in v0.8.0

func (cr *CheckpointableRunnable[S]) SetTracer(tracer *Tracer)

SetTracer sets the tracer on the underlying runnable

func (*CheckpointableRunnable[S]) Stream added in v0.8.0

func (cr *CheckpointableRunnable[S]) Stream(ctx context.Context, initialState S) <-chan StreamEvent[S]

Stream executes the graph with checkpointing and streaming support

func (*CheckpointableRunnable[S]) UpdateState

func (cr *CheckpointableRunnable[S]) UpdateState(ctx context.Context, config *Config, asNode string, values S) (*Config, error)

UpdateState updates the state and saves a checkpoint.

func (*CheckpointableRunnable[S]) WithTracer added in v0.8.0

func (cr *CheckpointableRunnable[S]) WithTracer(tracer *Tracer) *CheckpointableRunnable[S]

WithTracer returns a new CheckpointableRunnable with the given tracer

type CheckpointableStateGraph added in v0.6.0

type CheckpointableStateGraph[S any] struct {
	*ListenableStateGraph[S]
	// contains filtered or unexported fields
}

CheckpointableStateGraph[S any] extends ListenableStateGraph[S] with checkpointing

func NewCheckpointableStateGraph added in v0.6.0

func NewCheckpointableStateGraph[S any]() *CheckpointableStateGraph[S]

NewCheckpointableStateGraph creates a new checkpointable state graph with type parameter

func NewCheckpointableStateGraphWithConfig added in v0.6.0

func NewCheckpointableStateGraphWithConfig[S any](config CheckpointConfig) *CheckpointableStateGraph[S]

NewCheckpointableStateGraphWithConfig creates a checkpointable graph with custom config

func (*CheckpointableStateGraph[S]) CompileCheckpointable added in v0.6.0

func (g *CheckpointableStateGraph[S]) CompileCheckpointable() (*CheckpointableRunnable[S], error)

CompileCheckpointable compiles the graph into a checkpointable runnable

func (*CheckpointableStateGraph[S]) GetCheckpointConfig added in v0.6.0

func (g *CheckpointableStateGraph[S]) GetCheckpointConfig() CheckpointConfig

GetCheckpointConfig returns the current checkpointing configuration

func (*CheckpointableStateGraph[S]) SetCheckpointConfig added in v0.6.0

func (g *CheckpointableStateGraph[S]) SetCheckpointConfig(config CheckpointConfig)

SetCheckpointConfig updates the checkpointing configuration

type CircuitBreaker

type CircuitBreaker[S any] struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker

func NewCircuitBreaker[S any](node TypedNode[S], config CircuitBreakerConfig) *CircuitBreaker[S]

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker[S]) Execute

func (cb *CircuitBreaker[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the node with circuit breaker logic

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	FailureThreshold int           // Number of failures before opening
	SuccessThreshold int           // Number of successes before closing
	Timeout          time.Duration // Time before attempting to close
	HalfOpenMaxCalls int           // Max calls in half-open state
}

CircuitBreakerConfig configures circuit breaker behavior

type CircuitBreakerState

type CircuitBreakerState int

CircuitBreakerState represents the state of a circuit breaker

const (
	CircuitClosed CircuitBreakerState = iota
	CircuitOpen
	CircuitHalfOpen
)

type Command

type Command struct {
	// Update is the value to update the state with.
	// It will be processed by the schema's reducers.
	Update any

	// Goto specifies the next node(s) to execute.
	// If set, it overrides the graph's edges.
	// Can be a single string (node name) or []string.
	Goto any
}

Command allows a node to dynamically update the state and control the flow. It can be returned by a node function instead of a direct state update.

type CompositeGraph

type CompositeGraph[S any] struct {
	// contains filtered or unexported fields
}

CompositeGraph allows composing multiple graphs together

func NewCompositeGraph

func NewCompositeGraph[S any]() *CompositeGraph[S]

NewCompositeGraph creates a new composite graph

func (*CompositeGraph[S]) AddGraph

func (cg *CompositeGraph[S]) AddGraph(name string, graph *StateGraph[S])

AddGraph adds a named graph to the composite

func (*CompositeGraph[S]) Compile

func (cg *CompositeGraph[S]) Compile() (*StateRunnable[S], error)

Compile compiles the composite graph into a single runnable

func (*CompositeGraph[S]) Connect

func (cg *CompositeGraph[S]) Connect(
	fromGraph string,
	fromNode string,
	toGraph string,
	toNode string,
	transform func(S) S,
) error

Connect connects two graphs with a transformation function

type Config

type Config struct {
	// Callbacks to be invoked during execution
	Callbacks []CallbackHandler `json:"callbacks"`

	// Metadata to attach to the execution
	Metadata map[string]any `json:"metadata"`

	// Tags to categorize the execution
	Tags []string `json:"tags"`

	// Configurable parameters for the execution
	Configurable map[string]any `json:"configurable"`

	// RunName for this execution
	RunName string `json:"run_name"`

	// Timeout for the execution
	Timeout *time.Duration `json:"timeout"`

	// InterruptBefore nodes to stop before execution
	InterruptBefore []string `json:"interrupt_before"`

	// InterruptAfter nodes to stop after execution
	InterruptAfter []string `json:"interrupt_after"`

	// ResumeFrom nodes to start execution from (bypassing entry point)
	ResumeFrom []string `json:"resume_from"`

	// ResumeValue provides the value to return from an Interrupt() call when resuming
	ResumeValue any `json:"resume_value"`
}

Config represents configuration for graph invocation This matches Python's config dict pattern

func GetConfig

func GetConfig(ctx context.Context) *Config

GetConfig retrieves the config from the context

func WithInterruptAfter added in v0.8.2

func WithInterruptAfter(nodes ...string) *Config

WithInterruptAfter creates a Config with interrupt points set after specified nodes.

Example:

config := graph.WithInterruptAfter("node1", "node2")
result, err := runnable.Invoke(ctx, state, config)

func WithInterruptBefore added in v0.8.2

func WithInterruptBefore(nodes ...string) *Config

WithInterruptBefore creates a Config with interrupt points set before specified nodes.

Example:

config := graph.WithInterruptBefore("node1", "node2")
result, err := runnable.Invoke(ctx, state, config)

func WithThreadID added in v0.8.2

func WithThreadID(threadID string) *Config

WithThreadID creates a Config with the given thread_id set in the configurable map. This is a convenience function for setting up checkpoint-based conversation resumption.

Example:

result, err := runnable.Invoke(ctx, state, graph.WithThreadID("conversation-1"))

type Edge

type Edge struct {
	// From is the name of the node from which the edge originates.
	From string

	// To is the name of the node to which the edge points.
	To string
}

Edge represents an edge in the graph.

type Exporter

type Exporter[S any] struct {
	// contains filtered or unexported fields
}

Exporter provides methods to export graphs in different formats

func GetGraphForRunnable added in v0.8.0

func GetGraphForRunnable(r *Runnable) *Exporter[map[string]any]

GetGraphForRunnable returns a Exporter for the compiled graph's visualization

func NewExporter

func NewExporter[S any](graph *StateGraph[S]) *Exporter[S]

NewExporter creates a new graph exporter for the given graph

func (*Exporter[S]) DrawASCII

func (ge *Exporter[S]) DrawASCII() string

DrawASCII generates an ASCII tree representation of the graph

func (*Exporter[S]) DrawDOT

func (ge *Exporter[S]) DrawDOT() string

DrawDOT generates a DOT (Graphviz) representation of the graph

func (*Exporter[S]) DrawMermaid

func (ge *Exporter[S]) DrawMermaid() string

DrawMermaid generates a Mermaid diagram representation of the graph

func (*Exporter[S]) DrawMermaidWithOptions

func (ge *Exporter[S]) DrawMermaidWithOptions(opts MermaidOptions) string

DrawMermaidWithOptions generates a Mermaid diagram with custom options

type FieldMerger added in v0.6.0

type FieldMerger[S any] struct {
	InitialValue  S
	FieldMergeFns map[string]func(currentVal, newVal reflect.Value) reflect.Value
}

FieldMerger provides fine-grained control over how individual struct fields are merged.

func NewFieldMerger added in v0.6.0

func NewFieldMerger[S any](initial S) *FieldMerger[S]

NewFieldMerger creates a new FieldMerger with the given initial value.

func (*FieldMerger[S]) Init added in v0.6.0

func (fm *FieldMerger[S]) Init() S

Init returns the initial state.

func (*FieldMerger[S]) RegisterFieldMerge added in v0.6.0

func (fm *FieldMerger[S]) RegisterFieldMerge(fieldName string, mergeFn func(currentVal, newVal reflect.Value) reflect.Value)

RegisterFieldMerge registers a custom merge function for a specific field.

func (*FieldMerger[S]) Update added in v0.6.0

func (fm *FieldMerger[S]) Update(current, new S) (S, error)

Update merges the new state into the current state using registered field merge functions.

type GraphCallbackHandler

type GraphCallbackHandler interface {
	CallbackHandler
	// OnGraphStep is called after a step (node execution + state update) is completed
	OnGraphStep(ctx context.Context, stepNode string, state any)
}

GraphCallbackHandler extends CallbackHandler with graph-specific events

type GraphInterrupt

type GraphInterrupt struct {
	// Node that caused the interruption
	Node string
	// State at the time of interruption
	State any
	// NextNodes that would have been executed if not interrupted
	NextNodes []string
	// InterruptValue is the value provided by the dynamic interrupt (if any)
	InterruptValue any
}

GraphInterrupt is returned when execution is interrupted by configuration or dynamic interrupt

func (*GraphInterrupt) Error

func (e *GraphInterrupt) Error() string

type ListenableNode

type ListenableNode[S any] struct {
	TypedNode[S]
	// contains filtered or unexported fields
}

ListenableNode extends TypedNode with listener capabilities

func NewListenableNode

func NewListenableNode[S any](node TypedNode[S]) *ListenableNode[S]

NewListenableNode creates a new listenable node from a regular typed node

func (*ListenableNode[S]) AddListener

func (ln *ListenableNode[S]) AddListener(listener NodeListener[S]) *ListenableNode[S]

AddListener adds a listener to the node and returns the listenable node for chaining

func (*ListenableNode[S]) AddListenerWithID added in v0.8.0

func (ln *ListenableNode[S]) AddListenerWithID(listener NodeListener[S]) string

AddListenerWithID adds a listener to the node and returns its ID

func (*ListenableNode[S]) Execute

func (ln *ListenableNode[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the node function with listener notifications

func (*ListenableNode[S]) GetListenerIDs added in v0.8.0

func (ln *ListenableNode[S]) GetListenerIDs() []string

GetListenerIDs returns a copy of the current listener IDs

func (*ListenableNode[S]) GetListeners

func (ln *ListenableNode[S]) GetListeners() []NodeListener[S]

GetListeners returns a copy of the current listeners

func (*ListenableNode[S]) NotifyListeners

func (ln *ListenableNode[S]) NotifyListeners(ctx context.Context, event NodeEvent, state S, err error)

NotifyListeners notifies all listeners of an event

func (*ListenableNode[S]) RemoveListener

func (ln *ListenableNode[S]) RemoveListener(listenerID string)

RemoveListener removes a listener from the node by ID

func (*ListenableNode[S]) RemoveListenerByFunc added in v0.8.0

func (ln *ListenableNode[S]) RemoveListenerByFunc(listener NodeListener[S])

RemoveListenerByFunc removes a listener from the node by comparing listener values

type ListenableRunnable

type ListenableRunnable[S any] struct {
	// contains filtered or unexported fields
}

ListenableRunnable wraps a StateRunnable with listener capabilities

func (*ListenableRunnable[S]) GetGraph

func (lr *ListenableRunnable[S]) GetGraph() *Exporter[S]

GetGraph returns an Exporter for visualization

func (*ListenableRunnable[S]) GetListenableGraph added in v0.8.0

func (lr *ListenableRunnable[S]) GetListenableGraph() *ListenableStateGraph[S]

GetListenableGraph returns the underlying ListenableStateGraph

func (*ListenableRunnable[S]) GetTracer added in v0.8.0

func (lr *ListenableRunnable[S]) GetTracer() *Tracer

GetTracer returns the tracer from the underlying runnable

func (*ListenableRunnable[S]) Invoke

func (lr *ListenableRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)

Invoke executes the graph with listener notifications

func (*ListenableRunnable[S]) InvokeWithConfig

func (lr *ListenableRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)

InvokeWithConfig executes the graph with listener notifications and config

func (*ListenableRunnable[S]) SetTracer added in v0.8.0

func (lr *ListenableRunnable[S]) SetTracer(tracer *Tracer)

SetTracer sets a tracer for the underlying runnable

func (*ListenableRunnable[S]) Stream added in v0.8.0

func (lr *ListenableRunnable[S]) Stream(ctx context.Context, initialState S) <-chan StreamEvent[S]

Stream executes the graph with listener notifications and streams events

func (*ListenableRunnable[S]) WithTracer added in v0.8.0

func (lr *ListenableRunnable[S]) WithTracer(tracer *Tracer) *ListenableRunnable[S]

WithTracer returns a new ListenableRunnableWith the given tracer

type ListenableRunnableMap added in v0.8.0

type ListenableRunnableMap = ListenableRunnable[map[string]any]

ListenableRunnableMap is an alias for ListenableRunnable[map[string]any].

type ListenableStateGraph

type ListenableStateGraph[S any] struct {
	*StateGraph[S]
	// contains filtered or unexported fields
}

ListenableStateGraph extends StateGraph with listener capabilities

func NewListenableStateGraph

func NewListenableStateGraph[S any]() *ListenableStateGraph[S]

NewListenableStateGraph creates a new typed state graph with listener support

func (*ListenableStateGraph[S]) AddGlobalListener added in v0.6.0

func (g *ListenableStateGraph[S]) AddGlobalListener(listener NodeListener[S])

AddGlobalListener adds a listener to all nodes in the graph

func (*ListenableStateGraph[S]) AddNode added in v0.6.0

func (g *ListenableStateGraph[S]) AddNode(name string, description string, fn func(ctx context.Context, state S) (S, error)) *ListenableNode[S]

AddNode adds a node with listener capabilities

func (*ListenableStateGraph[S]) CompileListenable added in v0.6.0

func (g *ListenableStateGraph[S]) CompileListenable() (*ListenableRunnable[S], error)

CompileListenable creates a runnable with listener support

func (*ListenableStateGraph[S]) GetListenableNode added in v0.6.0

func (g *ListenableStateGraph[S]) GetListenableNode(name string) *ListenableNode[S]

GetListenableNode returns the listenable node by name

func (*ListenableStateGraph[S]) RemoveGlobalListener added in v0.6.0

func (g *ListenableStateGraph[S]) RemoveGlobalListener(listener NodeListener[S])

RemoveGlobalListener removes a listener from all nodes in the graph by function reference

func (*ListenableStateGraph[S]) RemoveGlobalListenerByID added in v0.8.0

func (g *ListenableStateGraph[S]) RemoveGlobalListenerByID(listenerID string)

RemoveGlobalListenerByID removes a listener from all nodes in the graph by ID

type ListenableStateGraphMap added in v0.8.0

type ListenableStateGraphMap = ListenableStateGraph[map[string]any]

ListenableStateGraphMap is an alias for ListenableStateGraph[map[string]any].

type LogLevel

type LogLevel int

LogLevel defines logging levels

const (
	LogLevelDebug LogLevel = iota
	LogLevelInfo
	LogLevelWarn
	LogLevelError
)

type LoggingListener

type LoggingListener struct {
	// contains filtered or unexported fields
}

LoggingListener provides structured logging for node events

func NewLoggingListener

func NewLoggingListener() *LoggingListener

NewLoggingListener creates a new logging listener

func NewLoggingListenerWithLogger

func NewLoggingListenerWithLogger(logger *log.Logger) *LoggingListener

NewLoggingListenerWithLogger creates a logging listener with custom logger

func (*LoggingListener) OnNodeEvent

func (ll *LoggingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state map[string]any, err error)

OnNodeEvent implements the NodeListener[map[string]any] interface

func (*LoggingListener) WithLogLevel

func (ll *LoggingListener) WithLogLevel(level LogLevel) *LoggingListener

WithLogLevel sets the minimum log level

func (*LoggingListener) WithState

func (ll *LoggingListener) WithState(enabled bool) *LoggingListener

WithState enables or disables state logging

type MapReduceNode

type MapReduceNode[S any] struct {
	// contains filtered or unexported fields
}

MapReduceNode executes nodes in parallel and reduces results

func NewMapReduceNode

func NewMapReduceNode[S any](name string, reducer func([]S) (S, error), mapNodes ...TypedNode[S]) *MapReduceNode[S]

NewMapReduceNode creates a new map-reduce node

func (*MapReduceNode[S]) Execute

func (mr *MapReduceNode[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs map nodes in parallel and reduces results

type MapSchema

type MapSchema struct {
	Reducers map[string]Reducer
}

MapSchema implements StateSchema for map[string]any. It allows defining reducers for specific keys.

func NewMapSchema

func NewMapSchema() *MapSchema

NewMapSchema creates a new MapSchema.

func (*MapSchema) Init

func (s *MapSchema) Init() map[string]any

Init returns an empty map.

func (*MapSchema) RegisterReducer

func (s *MapSchema) RegisterReducer(key string, reducer Reducer)

RegisterReducer adds a reducer for a specific key.

func (*MapSchema) Update

func (s *MapSchema) Update(current, new map[string]any) (map[string]any, error)

Update merges the new map into the current map using registered reducers.

type MermaidOptions

type MermaidOptions struct {
	// Direction of the flowchart (e.g., "TD", "LR")
	Direction string
}

MermaidOptions defines configuration for Mermaid diagram generation

type MessageWithID

type MessageWithID interface {
	GetID() string
	GetContent() llms.MessageContent
}

MessageWithID is an interface that allows messages to have an ID for deduplication/upsert. Since langchaingo's MessageContent doesn't have an ID field, we can wrap it or use a custom struct. For now, we'll check if the message implements this interface or is a map with an "id" key.

type MetricsListener

type MetricsListener struct {
	// contains filtered or unexported fields
}

MetricsListener collects performance and execution metrics

func NewMetricsListener

func NewMetricsListener() *MetricsListener

NewMetricsListener creates a new metrics listener

func (*MetricsListener) GetNodeAverageDuration

func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration

GetNodeAverageDuration returns the average duration for each node

func (*MetricsListener) GetNodeErrors

func (ml *MetricsListener) GetNodeErrors() map[string]int

GetNodeErrors returns the number of errors for each node

func (*MetricsListener) GetNodeExecutions

func (ml *MetricsListener) GetNodeExecutions() map[string]int

GetNodeExecutions returns the number of executions for each node

func (*MetricsListener) GetTotalExecutions

func (ml *MetricsListener) GetTotalExecutions() int

GetTotalExecutions returns the total number of node executions

func (*MetricsListener) OnNodeEvent

func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ map[string]any, _ error)

OnNodeEvent implements the NodeListener[map[string]any] interface

func (*MetricsListener) PrintSummary

func (ml *MetricsListener) PrintSummary(writer io.Writer)

PrintSummary prints a summary of collected metrics

func (*MetricsListener) Reset

func (ml *MetricsListener) Reset()

Reset clears all collected metrics

type NoOpCallbackHandler

type NoOpCallbackHandler struct{}

NoOpCallbackHandler provides a no-op implementation of CallbackHandler

func (*NoOpCallbackHandler) OnChainEnd

func (n *NoOpCallbackHandler) OnChainEnd(ctx context.Context, outputs map[string]any, runID string)

func (*NoOpCallbackHandler) OnChainError

func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnChainStart

func (n *NoOpCallbackHandler) OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, runID string, parentRunID *string, tags []string, metadata map[string]any)

func (*NoOpCallbackHandler) OnLLMEnd

func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response any, runID string)

func (*NoOpCallbackHandler) OnLLMError

func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnLLMStart

func (n *NoOpCallbackHandler) OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]any)

func (*NoOpCallbackHandler) OnRetrieverEnd

func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []any, runID string)

func (*NoOpCallbackHandler) OnRetrieverError

func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnRetrieverStart

func (n *NoOpCallbackHandler) OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, parentRunID *string, tags []string, metadata map[string]any)

func (*NoOpCallbackHandler) OnToolEnd

func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)

func (*NoOpCallbackHandler) OnToolError

func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnToolStart

func (n *NoOpCallbackHandler) OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]any)

type NodeEvent

type NodeEvent string

NodeEvent represents different types of node events

const (
	// NodeEventStart indicates a node has started execution
	NodeEventStart NodeEvent = "start"

	// NodeEventProgress indicates progress during node execution
	NodeEventProgress NodeEvent = "progress"

	// NodeEventComplete indicates a node has completed successfully
	NodeEventComplete NodeEvent = "complete"

	// NodeEventError indicates a node encountered an error
	NodeEventError NodeEvent = "error"

	// EventChainStart indicates the graph execution has started
	EventChainStart NodeEvent = "chain_start"

	// EventChainEnd indicates the graph execution has completed
	EventChainEnd NodeEvent = "chain_end"

	// EventToolStart indicates a tool execution has started
	EventToolStart NodeEvent = "tool_start"

	// EventToolEnd indicates a tool execution has completed
	EventToolEnd NodeEvent = "tool_end"

	// EventLLMStart indicates an LLM call has started
	EventLLMStart NodeEvent = "llm_start"

	// EventLLMEnd indicates an LLM call has completed
	EventLLMEnd NodeEvent = "llm_end"

	// EventToken indicates a generated token (for streaming)
	EventToken NodeEvent = "token"

	// EventCustom indicates a custom user-defined event
	EventCustom NodeEvent = "custom"
)

type NodeInterrupt

type NodeInterrupt struct {
	// Node is the name of the node that triggered the interrupt
	Node string
	// Value is the data/query provided by the interrupt
	Value any
}

NodeInterrupt is returned when a node requests an interrupt (e.g. waiting for human input).

func (*NodeInterrupt) Error

func (e *NodeInterrupt) Error() string

type NodeListener

type NodeListener[S any] interface {
	// OnNodeEvent is called when a node event occurs
	OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
}

NodeListener defines the interface for typed node event listeners

type NodeListenerFunc

type NodeListenerFunc[S any] func(ctx context.Context, event NodeEvent, nodeName string, state S, err error)

NodeListenerFunc is a function adapter for NodeListener

func (NodeListenerFunc[S]) OnNodeEvent

func (f NodeListenerFunc[S]) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)

OnNodeEvent implements the NodeListener interface

type ParallelNode

type ParallelNode[S any] struct {
	// contains filtered or unexported fields
}

ParallelNode represents a set of nodes that can execute in parallel

func NewParallelNode

func NewParallelNode[S any](name string, nodes ...TypedNode[S]) *ParallelNode[S]

NewParallelNode creates a new parallel node

func (*ParallelNode[S]) Execute

func (pn *ParallelNode[S]) Execute(ctx context.Context, state S) ([]S, error)

Execute runs all nodes in parallel and collects results

type ProgressListener

type ProgressListener struct {
	// contains filtered or unexported fields
}

ProgressListener provides progress tracking with customizable output

func NewProgressListener

func NewProgressListener() *ProgressListener

NewProgressListener creates a new progress listener

func NewProgressListenerWithWriter

func NewProgressListenerWithWriter(writer io.Writer) *ProgressListener

NewProgressListenerWithWriter creates a progress listener with custom writer

func (*ProgressListener) OnNodeEvent

func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state map[string]any, err error)

OnNodeEvent implements the NodeListener[map[string]any] interface

func (*ProgressListener) SetNodeStep

func (pl *ProgressListener) SetNodeStep(nodeName, step string)

SetNodeStep sets a custom message for a specific node

func (*ProgressListener) WithDetails

func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener

WithDetails enables or disables detailed output

func (*ProgressListener) WithPrefix

func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener

WithPrefix sets a custom prefix for progress messages

func (*ProgressListener) WithTiming

func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener

WithTiming enables or disables timing information

type RateLimiter

type RateLimiter[S any] struct {
	// contains filtered or unexported fields
}

RateLimiter implements rate limiting for nodes

func NewRateLimiter

func NewRateLimiter[S any](node TypedNode[S], maxCalls int, window time.Duration) *RateLimiter[S]

NewRateLimiter creates a new rate limiter

func (*RateLimiter[S]) Execute

func (rl *RateLimiter[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the node with rate limiting

type RecursiveSubgraph

type RecursiveSubgraph[S any] struct {
	// contains filtered or unexported fields
}

RecursiveSubgraph allows a subgraph to call itself recursively

func NewRecursiveSubgraph

func NewRecursiveSubgraph[S any](
	name string,
	maxDepth int,
	condition func(S, int) bool,
) *RecursiveSubgraph[S]

NewRecursiveSubgraph creates a new recursive subgraph

func (*RecursiveSubgraph[S]) Execute

func (rs *RecursiveSubgraph[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the recursive subgraph

type Reducer

type Reducer func(current, new any) (any, error)

Reducer defines how a state value should be updated. It takes the current value and the new value, and returns the merged value.

type RetryConfig

type RetryConfig struct {
	MaxAttempts     int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	RetryableErrors func(error) bool // Determines if an error should trigger retry
}

RetryConfig configures retry behavior for nodes

func DefaultRetryConfig

func DefaultRetryConfig() *RetryConfig

DefaultRetryConfig returns a default retry configuration

type RetryNode

type RetryNode[S any] struct {
	// contains filtered or unexported fields
}

RetryNode wraps a node with retry logic

func NewRetryNode

func NewRetryNode[S any](node TypedNode[S], config *RetryConfig) *RetryNode[S]

NewRetryNode creates a new retry node

func (*RetryNode[S]) Execute

func (rn *RetryNode[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the node with retry logic

type RetryPolicy

type RetryPolicy struct {
	MaxRetries      int
	BackoffStrategy BackoffStrategy
	RetryableErrors []string
}

RetryPolicy defines how to handle node failures

type Runnable

type Runnable = StateRunnable[map[string]any]

Runnable is an alias for StateRunnable[map[string]any] for convenience.

type StateGraph

type StateGraph[S any] struct {

	// Schema defines the state structure and update logic
	Schema StateSchema[S]
	// contains filtered or unexported fields
}

StateGraph represents a generic state-based graph with compile-time type safety. The type parameter S represents the state type, which is typically a struct.

Example usage:

type MyState struct {
    Count int
    Name  string
}

g := graph.NewStateGraph[MyState]()
g.AddNode("increment", "Increment counter", func(ctx context.Context, state MyState) (MyState, error) {
    state.Count++
    return state, nil
})
Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/smallnest/langgraphgo/graph"
	"github.com/tmc/langchaingo/llms"
	"github.com/tmc/langchaingo/llms/openai"
)

func main() {
	// Skip if no OpenAI API key is available
	if os.Getenv("OPENAI_API_KEY") == "" {
		fmt.Println("[{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]")
		return
	}

	model, err := openai.New()
	if err != nil {
		panic(err)
	}

	g := graph.NewStateGraph[[]llms.MessageContent]()

	g.AddNode("oracle", "oracle", func(ctx context.Context, state []llms.MessageContent) ([]llms.MessageContent, error) {
		r, err := model.GenerateContent(ctx, state, llms.WithTemperature(0.0))
		if err != nil {
			return nil, err
		}
		return append(state,
			llms.TextParts("ai", r.Choices[0].Content),
		), nil
	})
	g.AddNode(graph.END, graph.END, func(_ context.Context, state []llms.MessageContent) ([]llms.MessageContent, error) {
		return state, nil
	})

	g.AddEdge("oracle", graph.END)
	g.SetEntryPoint("oracle")

	runnable, err := g.Compile()
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	// Let's run it!
	res, err := runnable.Invoke(ctx, []llms.MessageContent{
		llms.TextParts("human", "What is 1 + 1?"),
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(res)
}

func NewMessageGraph deprecated

func NewMessageGraph() *StateGraph[map[string]any]

NewMessageGraph creates a new instance of StateGraph[map[string]any] with a default schema that handles "messages" using the AddMessages reducer. This is the recommended constructor for chat-based agents that use map[string]any as state with a "messages" key.

Deprecated: Use NewStateGraph[MessageState]() for type-safe state management.

func NewStateGraph

func NewStateGraph[S any]() *StateGraph[S]

NewStateGraph creates a new instance of StateGraph with type safety. The type parameter S specifies the state type.

Example:

g := graph.NewStateGraph[MyState]()

func (*StateGraph[S]) AddConditionalEdge

func (g *StateGraph[S]) AddConditionalEdge(from string, condition func(ctx context.Context, state S) string)

AddConditionalEdge adds a conditional edge where the target node is determined at runtime. The condition function is fully typed - no type assertions needed!

Example:

g.AddConditionalEdge("check", func(ctx context.Context, state MyState) string {
    if state.Count > 10 {  // Type-safe access!
        return "high"
    }
    return "low"
})

func (*StateGraph[S]) AddEdge

func (g *StateGraph[S]) AddEdge(from, to string)

AddEdge adds a new edge to the state graph between the "from" and "to" nodes.

func (*StateGraph[S]) AddMapReduceNode added in v0.6.0

func (g *StateGraph[S]) AddMapReduceNode(
	name string,
	mapFunctions map[string]func(context.Context, S) (S, error),
	reducer func([]S) (S, error),
)

AddMapReduceNode adds a map-reduce pattern node

func (*StateGraph[S]) AddNode

func (g *StateGraph[S]) AddNode(name string, description string, fn func(ctx context.Context, state S) (S, error))

AddNode adds a new node to the state graph with the given name, description and function. The node function is fully typed - no type assertions needed!

Example:

g.AddNode("process", "Process data", func(ctx context.Context, state MyState) (MyState, error) {
    state.Count++  // Type-safe access!
    return state, nil
})

func (*StateGraph[S]) AddNodeWithCircuitBreaker added in v0.6.0

func (g *StateGraph[S]) AddNodeWithCircuitBreaker(
	name string,
	description string,
	fn func(context.Context, S) (S, error),
	config CircuitBreakerConfig,
)

AddNodeWithCircuitBreaker adds a node with circuit breaker

func (*StateGraph[S]) AddNodeWithRateLimit added in v0.6.0

func (g *StateGraph[S]) AddNodeWithRateLimit(
	name string,
	description string,
	fn func(context.Context, S) (S, error),
	maxCalls int,
	window time.Duration,
)

AddNodeWithRateLimit adds a node with rate limiting

func (*StateGraph[S]) AddNodeWithRetry added in v0.6.0

func (g *StateGraph[S]) AddNodeWithRetry(
	name string,
	description string,
	fn func(context.Context, S) (S, error),
	config *RetryConfig,
)

AddNodeWithRetry adds a node with retry logic

func (*StateGraph[S]) AddNodeWithTimeout added in v0.6.0

func (g *StateGraph[S]) AddNodeWithTimeout(
	name string,
	description string,
	fn func(context.Context, S) (S, error),
	timeout time.Duration,
)

AddNodeWithTimeout adds a node with timeout

func (*StateGraph[S]) AddParallelNodes added in v0.6.0

func (g *StateGraph[S]) AddParallelNodes(
	groupName string,
	nodes map[string]func(context.Context, S) (S, error),
	merger func([]S) S,
)

AddParallelNodes adds a set of nodes that execute in parallel. merger is used to combine the results from parallel execution into a single state S.

func (*StateGraph[S]) Compile

func (g *StateGraph[S]) Compile() (*StateRunnable[S], error)

Compile compiles the state graph and returns a StateRunnable instance.

func (*StateGraph[S]) FanOutFanIn added in v0.6.0

func (g *StateGraph[S]) FanOutFanIn(
	source string,
	_ []string,
	collector string,
	workerFuncs map[string]func(context.Context, S) (S, error),
	aggregator func([]S) S,
	collectFunc func(S) (S, error),
)

FanOutFanIn creates a fan-out/fan-in pattern. aggregator merges worker results into a state S that is passed to the collector.

func (*StateGraph[S]) SetEntryPoint

func (g *StateGraph[S]) SetEntryPoint(name string)

SetEntryPoint sets the entry point node name for the state graph.

func (*StateGraph[S]) SetRetryPolicy

func (g *StateGraph[S]) SetRetryPolicy(policy *RetryPolicy)

SetRetryPolicy sets the retry policy for the graph.

func (*StateGraph[S]) SetSchema

func (g *StateGraph[S]) SetSchema(schema StateSchema[S])

SetSchema sets the state schema for the graph.

func (*StateGraph[S]) SetStateMerger

func (g *StateGraph[S]) SetStateMerger(merger TypedStateMerger[S])

SetStateMerger sets the state merger function for the state graph.

type StateGraphMap added in v0.8.0

type StateGraphMap = StateGraph[map[string]any]

StateGraphMap is an alias for StateGraph[map[string]any] for convenience. Use NewStateGraph[map[string]any]() or NewStateGraph[S]() for other types.

type StateRunnable

type StateRunnable[S any] struct {
	// contains filtered or unexported fields
}

StateRunnable represents a compiled state graph that can be invoked with type safety.

func (*StateRunnable[S]) GetTracer added in v0.8.0

func (r *StateRunnable[S]) GetTracer() *Tracer

GetTracer returns the current tracer.

func (*StateRunnable[S]) Invoke

func (r *StateRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)

Invoke executes the compiled state graph with the given input state. Returns the final state with full type safety - no type assertions needed!

Example:

initialState := MyState{Count: 0}
finalState, err := app.Invoke(ctx, initialState)
// finalState is MyState type - no casting needed!

func (*StateRunnable[S]) InvokeWithConfig

func (r *StateRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)

InvokeWithConfig executes the compiled state graph with the given input state and config.

func (*StateRunnable[S]) SetTracer added in v0.6.0

func (r *StateRunnable[S]) SetTracer(tracer *Tracer)

SetTracer sets a tracer for observability.

func (*StateRunnable[S]) WithTracer added in v0.6.0

func (r *StateRunnable[S]) WithTracer(tracer *Tracer) *StateRunnable[S]

WithTracer returns a new StateRunnable with the given tracer.

type StateSchema

type StateSchema[S any] interface {
	// Init returns the initial state.
	Init() S

	// Update merges the new state into the current state.
	Update(current, new S) (S, error)
}

StateSchema defines the structure and update logic for the graph state with type safety.

type StateSnapshot

type StateSnapshot struct {
	Values    any
	Next      []string
	Config    Config
	Metadata  map[string]any
	CreatedAt time.Time
	ParentID  string
}

StateSnapshot represents a snapshot of the graph state

type StateTracedRunnable added in v0.8.0

type StateTracedRunnable[S any] struct {
	// contains filtered or unexported fields
}

StateTracedRunnable[S] wraps a StateRunnable[S] with tracing capabilities

func NewStateTracedRunnable added in v0.8.0

func NewStateTracedRunnable[S any](runnable *StateRunnable[S], tracer *Tracer) *StateTracedRunnable[S]

NewStateTracedRunnable creates a new generic traced runnable

func (*StateTracedRunnable[S]) GetTracer added in v0.8.0

func (tr *StateTracedRunnable[S]) GetTracer() *Tracer

GetTracer returns the tracer instance

func (*StateTracedRunnable[S]) Invoke added in v0.8.0

func (tr *StateTracedRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)

Invoke executes the graph with tracing enabled

type StreamConfig

type StreamConfig struct {
	// BufferSize is the size of the event channel buffer
	BufferSize int

	// EnableBackpressure determines if backpressure handling is enabled
	EnableBackpressure bool

	// MaxDroppedEvents is the maximum number of events to drop before logging
	MaxDroppedEvents int

	// Mode specifies what kind of events to stream
	Mode StreamMode
}

StreamConfig configures streaming behavior

func DefaultStreamConfig

func DefaultStreamConfig() StreamConfig

DefaultStreamConfig returns the default streaming configuration

type StreamEvent

type StreamEvent[S any] struct {
	// Timestamp when the event occurred
	Timestamp time.Time

	// NodeName is the name of the node that generated the event
	NodeName string

	// Event is the type of event
	Event NodeEvent

	// State is the current state at the time of the event (typed)
	State S

	// Error contains any error that occurred (if Event is NodeEventError)
	Error error

	// Metadata contains additional event-specific data
	Metadata map[string]any

	// Duration is how long the node took (only for Complete events)
	Duration time.Duration
}

StreamEvent represents a typed event in the streaming execution

type StreamMode

type StreamMode string

StreamMode defines the mode of streaming

const (
	// StreamModeValues emits the full state after each step
	StreamModeValues StreamMode = "values"
	// StreamModeUpdates emits the updates (deltas) from each node
	StreamModeUpdates StreamMode = "updates"
	// StreamModeMessages emits LLM messages/tokens (if available)
	StreamModeMessages StreamMode = "messages"
	// StreamModeDebug emits all events (default)
	StreamModeDebug StreamMode = "debug"
)

type StreamResult

type StreamResult[S any] struct {
	// Events channel receives StreamEvent objects in real-time
	Events <-chan StreamEvent[S]

	// Result channel receives the final result when execution completes
	Result <-chan S

	// Errors channel receives any errors that occur during execution
	Errors <-chan error

	// Done channel is closed when streaming is complete
	Done <-chan struct{}

	// Cancel function can be called to stop streaming
	Cancel context.CancelFunc
}

StreamResult contains the channels returned by streaming execution

type StreamingExecutor

type StreamingExecutor[S any] struct {
	// contains filtered or unexported fields
}

StreamingExecutor[S] provides a high-level interface for streaming execution

func NewStreamingExecutor

func NewStreamingExecutor[S any](runnable *StreamingRunnable[S]) *StreamingExecutor[S]

NewStreamingExecutor creates a new streaming executor

func (*StreamingExecutor[S]) ExecuteAsync

func (se *StreamingExecutor[S]) ExecuteAsync(ctx context.Context, initialState S) *StreamResult[S]

ExecuteAsync executes the graph asynchronously and returns immediately

func (*StreamingExecutor[S]) ExecuteWithCallback

func (se *StreamingExecutor[S]) ExecuteWithCallback(
	ctx context.Context,
	initialState S,
	eventCallback func(event StreamEvent[S]),
	resultCallback func(result S, err error),
) error

ExecuteWithCallback executes the graph and calls the callback for each event

type StreamingListener

type StreamingListener[S any] struct {
	// contains filtered or unexported fields
}

StreamingListener implements NodeListener for streaming events

func NewStreamingListener

func NewStreamingListener[S any](eventChan chan<- StreamEvent[S], config StreamConfig) *StreamingListener[S]

NewStreamingListener creates a new streaming listener

func (*StreamingListener[S]) Close

func (sl *StreamingListener[S]) Close()

Close marks the listener as closed to prevent sending to closed channels

func (*StreamingListener[S]) GetDroppedEventsCount

func (sl *StreamingListener[S]) GetDroppedEventsCount() int

GetDroppedEventsCount returns the number of dropped events

func (*StreamingListener[S]) OnNodeEvent

func (sl *StreamingListener[S]) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)

OnNodeEvent implements the NodeListener interface

type StreamingRunnable

type StreamingRunnable[S any] struct {
	// contains filtered or unexported fields
}

StreamingRunnable wraps a ListenableRunnable with streaming capabilities

func NewStreamingRunnable

func NewStreamingRunnable[S any](runnable *ListenableRunnable[S], config StreamConfig) *StreamingRunnable[S]

NewStreamingRunnable creates a new streaming runnable

func NewStreamingRunnableWithDefaults

func NewStreamingRunnableWithDefaults[S any](runnable *ListenableRunnable[S]) *StreamingRunnable[S]

NewStreamingRunnableWithDefaults creates a streaming runnable with default config

func (*StreamingRunnable[S]) GetGraph

func (sr *StreamingRunnable[S]) GetGraph() *Exporter[S]

GetGraph returns a Exporter for the streaming runnable

func (*StreamingRunnable[S]) GetTracer added in v0.8.0

func (sr *StreamingRunnable[S]) GetTracer() *Tracer

GetTracer returns the tracer from the underlying runnable

func (*StreamingRunnable[S]) SetTracer added in v0.8.0

func (sr *StreamingRunnable[S]) SetTracer(tracer *Tracer)

SetTracer sets the tracer on the underlying runnable

func (*StreamingRunnable[S]) Stream

func (sr *StreamingRunnable[S]) Stream(ctx context.Context, initialState S) *StreamResult[S]

Stream executes the graph with real-time event streaming

func (*StreamingRunnable[S]) WithTracer added in v0.8.0

func (sr *StreamingRunnable[S]) WithTracer(tracer *Tracer) *StreamingRunnable[S]

WithTracer returns a new StreamingRunnable with the given tracer

type StreamingStateGraph added in v0.6.0

type StreamingStateGraph[S any] struct {
	*ListenableStateGraph[S]
	// contains filtered or unexported fields
}

StreamingStateGraph[S any] extends ListenableStateGraph[S] with streaming capabilities

func NewStreamingStateGraph added in v0.6.0

func NewStreamingStateGraph[S any]() *StreamingStateGraph[S]

NewStreamingStateGraph creates a new streaming state graph with type parameter

func NewStreamingStateGraphWithConfig added in v0.6.0

func NewStreamingStateGraphWithConfig[S any](config StreamConfig) *StreamingStateGraph[S]

NewStreamingStateGraphWithConfig creates a streaming graph with custom config

func (*StreamingStateGraph[S]) CompileStreaming added in v0.6.0

func (g *StreamingStateGraph[S]) CompileStreaming() (*StreamingRunnable[S], error)

CompileStreaming compiles the graph into a streaming runnable

func (*StreamingStateGraph[S]) GetStreamConfig added in v0.6.0

func (g *StreamingStateGraph[S]) GetStreamConfig() StreamConfig

GetStreamConfig returns the current streaming configuration

func (*StreamingStateGraph[S]) SetStreamConfig added in v0.6.0

func (g *StreamingStateGraph[S]) SetStreamConfig(config StreamConfig)

SetStreamConfig updates the streaming configuration

type StructSchema added in v0.6.0

type StructSchema[S any] struct {
	InitialValue S
	MergeFunc    func(current, new S) (S, error)
}

StructSchema implements StateSchema for struct-based states. It provides a simple and type-safe way to manage struct states.

Example:

type MyState struct {
    Count int
    Logs  []string
}

schema := graph.NewStructSchema(
    MyState{Count: 0},
    func(current, new MyState) (MyState, error) {
        // Merge logs (append)
        current.Logs = append(current.Logs, new.Logs...)
        // Add counts
        current.Count += new.Count
        return current, nil
    },
)

func NewStructSchema added in v0.6.0

func NewStructSchema[S any](initial S, merge func(S, S) (S, error)) *StructSchema[S]

NewStructSchema creates a new StructSchema with the given initial value and merge function. If merge function is nil, a default merge function will be used that overwrites non-zero fields.

func (*StructSchema[S]) Init added in v0.6.0

func (s *StructSchema[S]) Init() S

Init returns the initial state.

func (*StructSchema[S]) Update added in v0.6.0

func (s *StructSchema[S]) Update(current, new S) (S, error)

Update merges the new state into the current state using the merge function.

type Subgraph

type Subgraph[S any] struct {
	// contains filtered or unexported fields
}

Subgraph represents a nested graph that can be used as a node

func NewSubgraph

func NewSubgraph[S any](name string, graph *StateGraph[S]) (*Subgraph[S], error)

NewSubgraph creates a new generic subgraph

func (*Subgraph[S]) Execute

func (s *Subgraph[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the subgraph as a node

type TimeoutNode

type TimeoutNode[S any] struct {
	// contains filtered or unexported fields
}

TimeoutNode wraps a node with timeout logic

func NewTimeoutNode

func NewTimeoutNode[S any](node TypedNode[S], timeout time.Duration) *TimeoutNode[S]

NewTimeoutNode creates a new timeout node

func (*TimeoutNode[S]) Execute

func (tn *TimeoutNode[S]) Execute(ctx context.Context, state S) (S, error)

Execute runs the node with timeout

type TraceEvent

type TraceEvent string

TraceEvent represents different types of events in graph execution

const (
	// TraceEventGraphStart indicates the start of graph execution
	TraceEventGraphStart TraceEvent = "graph_start"

	// TraceEventGraphEnd indicates the end of graph execution
	TraceEventGraphEnd TraceEvent = "graph_end"

	// TraceEventNodeStart indicates the start of node execution
	TraceEventNodeStart TraceEvent = "node_start"

	// TraceEventNodeEnd indicates the end of node execution
	TraceEventNodeEnd TraceEvent = "node_end"

	// TraceEventNodeError indicates an error occurred in node execution
	TraceEventNodeError TraceEvent = "node_error"

	// TraceEventEdgeTraversal indicates traversal from one node to another
	TraceEventEdgeTraversal TraceEvent = "edge_traversal"
)

type TraceHook

type TraceHook interface {
	// OnEvent is called when a trace event occurs
	OnEvent(ctx context.Context, span *TraceSpan)
}

TraceHook defines the interface for trace event handlers

type TraceHookFunc

type TraceHookFunc func(ctx context.Context, span *TraceSpan)

TraceHookFunc is a function adapter for TraceHook

func (TraceHookFunc) OnEvent

func (f TraceHookFunc) OnEvent(ctx context.Context, span *TraceSpan)

OnEvent implements the TraceHook interface

type TraceSpan

type TraceSpan struct {
	// ID is a unique identifier for this span
	ID string

	// ParentID is the ID of the parent span (empty for root spans)
	ParentID string

	// Event indicates the type of event this span represents
	Event TraceEvent

	// NodeName is the name of the node being executed (if applicable)
	NodeName string

	// FromNode is the source node for edge traversals
	FromNode string

	// ToNode is the destination node for edge traversals
	ToNode string

	// StartTime is when this span began
	StartTime time.Time

	// EndTime is when this span completed (zero for ongoing spans)
	EndTime time.Time

	// Duration is the total time taken (calculated when span ends)
	Duration time.Duration

	// State is a snapshot of the state at this point (optional)
	State any

	// Error contains any error that occurred during execution
	Error error

	// Metadata contains additional key-value pairs for observability
	Metadata map[string]any
}

TraceSpan represents a span of execution with timing and metadata

func SpanFromContext

func SpanFromContext(ctx context.Context) *TraceSpan

SpanFromContext extracts a span from context

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

Tracer manages trace collection and hooks

func NewTracer

func NewTracer() *Tracer

NewTracer creates a new tracer instance

func (*Tracer) AddHook

func (t *Tracer) AddHook(hook TraceHook)

AddHook registers a new trace hook

func (*Tracer) Clear

func (t *Tracer) Clear()

Clear removes all collected spans

func (*Tracer) EndSpan

func (t *Tracer) EndSpan(ctx context.Context, span *TraceSpan, state any, err error)

EndSpan completes a trace span

func (*Tracer) GetSpans

func (t *Tracer) GetSpans() map[string]*TraceSpan

GetSpans returns all collected spans

func (*Tracer) StartSpan

func (t *Tracer) StartSpan(ctx context.Context, event TraceEvent, nodeName string) *TraceSpan

StartSpan creates a new trace span

func (*Tracer) TraceEdgeTraversal

func (t *Tracer) TraceEdgeTraversal(ctx context.Context, fromNode, toNode string)

TraceEdgeTraversal records an edge traversal event

type TypedNode added in v0.8.0

type TypedNode[S any] struct {
	Name        string
	Description string
	Function    func(ctx context.Context, state S) (S, error)
}

TypedNode represents a typed node in the graph.

type TypedStateMerger added in v0.8.0

type TypedStateMerger[S any] func(ctx context.Context, currentState S, newStates []S) (S, error)

StateMerger is a typed function to merge states from parallel execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL