Documentation ¶
Overview ¶
Package graph provides the core graph construction and execution engine for LangGraph Go.
This package implements the fundamental building blocks for creating stateful, multi-agent applications using directed graphs. It offers both untyped and typed interfaces for building workflows, with support for parallel execution, checkpointing, streaming, and comprehensive event handling.
Core Concepts ¶
## StateGraph
The primary component for building graphs is StateGraph, which maintains state as it flows through nodes. Each node can process and transform the state before passing it to the next node based on defined edges.
## Nodes and Edges
Nodes represent processing units (functions, agents, tools) that transform state. Edges define the flow between nodes, supporting conditional routing based on state content.
## Typed Support
For type safety, the package provides StateGraphTyped[S] which uses Go generics to enforce state types at compile time, reducing runtime errors and improving code maintainability.
Key Features ¶
- Parallel node execution with coordination
- Checkpointing for durable execution with resume capability
- Streaming for real-time event monitoring
- Comprehensive listener system for observability
- Built-in retry mechanisms with configurable policies
- Subgraph composition for modular design
- Graph visualization (Mermaid, PlantUML)
- Interrupt support for human-in-the-loop workflows
Example Usage ¶
## Basic State Graph
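g := graph.NewStateGraph()
// Add nodes. Per the index below, AddNode takes a description after the name,
// and untyped node functions receive and return the state as any.
g.AddNode("process", "Mark the state as processed", func(ctx context.Context, state any) (any, error) {
	s := state.(map[string]any)
	s["processed"] = true
	return s, nil
})
g.AddNode("validate", "Validate the processed state", func(ctx context.Context, state any) (any, error) {
	s := state.(map[string]any)
	// The comma-ok form avoids a panic when the key is missing or not a bool
	if processed, ok := s["processed"].(bool); ok && processed {
		s["valid"] = true
	}
	return s, nil
})
// Set entry point and edges
g.SetEntryPoint("process")
g.AddEdge("process", "validate")
g.AddEdge("validate", graph.END)
// Compile and run; Compile returns an error that must be checked
runnable, err := g.Compile()
if err != nil {
	log.Fatal(err)
}
result, err := runnable.Invoke(context.Background(), map[string]any{
	"data": "example",
})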
## Typed State Graph
type WorkflowState struct {
	Input    string `json:"input"`
	Output   string `json:"output"`
	Complete bool   `json:"complete"`
}
g := graph.NewStateGraphTyped(func() WorkflowState { return WorkflowState{} })
// Per the index below, the typed AddNode takes a name, a description, and the node function
g.AddNode("process", "Uppercase the input", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
	state.Output = strings.ToUpper(state.Input)
	state.Complete = true
	return state, nil
})
// Conditional routing: the returned string names the next node
g.AddConditionalEdge("process", func(ctx context.Context, state WorkflowState) string {
	if state.Complete {
		return "next"
	}
	return "retry"
})
## Parallel Execution
// AddParallelNodes takes a group name and a map from node name to node function
g.AddParallelNodes("parallel_tasks", map[string]func(context.Context, any) (any, error){
	"task1": task1Func,
	"task2": task2Func,
})
## Checkpointing
store := graph.NewMemoryCheckpointStore()
g := graph.NewCheckpointableStateGraphWithConfig(graph.CheckpointConfig{
	Store: store,
})
// ... add nodes and edges as in the basic example ...
// Compile and execute with checkpointing
runnable, err := g.CompileCheckpointable()
if err != nil {
	log.Fatal(err)
}
result, err := runnable.Invoke(context.Background(), initialState)
// Resume from a saved checkpoint, identified by its ID
resumed, err := runnable.ResumeFromCheckpoint(context.Background(), "checkpoint-456")
## Streaming
streaming := graph.NewStreamingStateGraph(g, graph.StreamConfig{
	BufferSize: 100,
})
runnable := streaming.Compile()
result, err := runnable.Stream(context.Background(), initialState)
// Process events
for event := range result.Events {
	fmt.Printf("Event: %v\n", event)
}
Listener System ¶
The package provides a powerful listener system for monitoring and reacting to graph events:
- ProgressListener: Track execution progress
- LoggingListener: Structured logging of events
- MetricsListener: Collect performance metrics
- ChatListener: Chat-style output formatting
- Custom listeners: Implement NodeListener interface
Error Handling ¶
- Built-in retry policies with exponential backoff
- Custom error filtering for selective retries
- Interrupt handling for pausing execution
- Comprehensive error context in events
Visualization ¶
Export graphs for documentation and debugging:
exporter := graph.NewExporter(g)
// Mermaid diagram
mermaid, err := exporter.Mermaid(graph.MermaidOptions{
	Direction: "TD",
})
// PlantUML diagram
puml, err := exporter.PlantUML()
Thread Safety ¶
All graph structures are thread-safe for read operations. Write operations (adding nodes, edges, or listeners) should be performed before compilation or protected by external synchronization.
Best Practices ¶
- Use typed graphs when possible for better type safety
- Set appropriate buffer sizes for streaming to balance memory and performance
- Implement proper error handling in node functions
- Use checkpoints for long-running or critical workflows
- Add listeners for debugging and monitoring
- Keep node functions pure and stateless when possible
- Use conditional edges for complex routing logic
- Leverage parallel execution for independent tasks
Index ¶
- Constants
- Variables
- func AddMessages(current, new any) (any, error)
- func AppendReducer(current, new any) (any, error)
- func AppendSliceMerge(current, new reflect.Value) reflect.Value
- func ContextWithSpan(ctx context.Context, span *TraceSpan) context.Context
- func DefaultStructMerge[S any](current, new S) (S, error)
- func ExponentialBackoffRetry(ctx context.Context, fn func() (any, error), maxAttempts int, ...) (any, error)
- func GetResumeValue(ctx context.Context) any
- func Interrupt(ctx context.Context, value any) (any, error)
- func KeepCurrentMerge(current, new reflect.Value) reflect.Value
- func MaxIntMerge(current, new reflect.Value) reflect.Value
- func MinIntMerge(current, new reflect.Value) reflect.Value
- func OverwriteMerge(current, new reflect.Value) reflect.Value
- func OverwriteReducer(current, new any) (any, error)
- func OverwriteStructMerge[S any](current, new S) (S, error)
- func SafeGo(wg *sync.WaitGroup, fn func(), onPanic func(any))
- func SumIntMerge(current, new reflect.Value) reflect.Value
- func WithConfig(ctx context.Context, config *Config) context.Context
- func WithResumeValue(ctx context.Context, value any) context.Context
- type BackoffStrategy
- type CallbackHandler
- type ChatListener
- type Checkpoint
- type CheckpointConfig
- type CheckpointListener
- type CheckpointStore
- type CheckpointableRunnable
- func (cr *CheckpointableRunnable) ClearCheckpoints(ctx context.Context) error
- func (cr *CheckpointableRunnable) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)
- func (cr *CheckpointableRunnable) Invoke(ctx context.Context, initialState any) (any, error)
- func (cr *CheckpointableRunnable) InvokeWithConfig(ctx context.Context, initialState any, config *Config) (any, error)
- func (cr *CheckpointableRunnable) ListCheckpoints(ctx context.Context) ([]*Checkpoint, error)
- func (cr *CheckpointableRunnable) LoadCheckpoint(ctx context.Context, checkpointID string) (*Checkpoint, error)
- func (cr *CheckpointableRunnable) ResumeFromCheckpoint(ctx context.Context, checkpointID string) (any, error)
- func (cr *CheckpointableRunnable) SaveCheckpoint(ctx context.Context, nodeName string, state any) error
- func (cr *CheckpointableRunnable) UpdateState(ctx context.Context, config *Config, values any, asNode string) (*Config, error)
- type CheckpointableStateGraph
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type CleaningStateSchema
- type CleaningStateSchemaTyped
- type Command
- type CompositeGraph
- type Config
- type Edge
- type Exporter
- type FieldMerger
- type FileCheckpointStore
- func (f *FileCheckpointStore) Clear(ctx context.Context, executionID string) error
- func (f *FileCheckpointStore) Delete(_ context.Context, checkpointID string) error
- func (f *FileCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)
- func (f *FileCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
- func (f *FileCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
- type GraphCallbackHandler
- type GraphInterrupt
- type ListenableNode
- func (ln *ListenableNode) AddListener(listener NodeListener) *ListenableNode
- func (ln *ListenableNode) Execute(ctx context.Context, state any) (any, error)
- func (ln *ListenableNode) GetListeners() []NodeListener
- func (ln *ListenableNode) NotifyListeners(ctx context.Context, event NodeEvent, state any, err error)
- func (ln *ListenableNode) RemoveListener(listener NodeListener)
- type ListenableNodeTyped
- func (ln *ListenableNodeTyped[S]) AddListener(listener NodeListenerTyped[S]) *ListenableNodeTyped[S]
- func (ln *ListenableNodeTyped[S]) AddListenerWithID(listener NodeListenerTyped[S]) string
- func (ln *ListenableNodeTyped[S]) Execute(ctx context.Context, state S) (S, error)
- func (ln *ListenableNodeTyped[S]) GetListenerIDs() []string
- func (ln *ListenableNodeTyped[S]) GetListeners() []NodeListenerTyped[S]
- func (ln *ListenableNodeTyped[S]) NotifyListeners(ctx context.Context, event NodeEvent, state S, err error)
- func (ln *ListenableNodeTyped[S]) RemoveListener(listenerID string)
- func (ln *ListenableNodeTyped[S]) RemoveListenerByFunc(listener NodeListenerTyped[S])
- type ListenableRunnable
- type ListenableRunnableTyped
- func (lr *ListenableRunnableTyped[S]) GetGraph() *Exporter
- func (lr *ListenableRunnableTyped[S]) Invoke(ctx context.Context, initialState S) (S, error)
- func (lr *ListenableRunnableTyped[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
- func (lr *ListenableRunnableTyped[S]) SetTracer(tracer *Tracer)
- func (lr *ListenableRunnableTyped[S]) Stream(ctx context.Context, initialState S) <-chan StreamEventTyped[S]
- func (lr *ListenableRunnableTyped[S]) WithTracer(tracer *Tracer) *ListenableRunnableTyped[S]
- type ListenableStateGraph
- func (g *ListenableStateGraph) AddGlobalListener(listener NodeListener)
- func (g *ListenableStateGraph) AddNode(name string, description string, ...) *ListenableNode
- func (g *ListenableStateGraph) CompileListenable() (*ListenableRunnable, error)
- func (g *ListenableStateGraph) GetListenableNode(name string) *ListenableNode
- func (g *ListenableStateGraph) RemoveGlobalListener(listener NodeListener)
- type ListenableStateGraphTyped
- func (g *ListenableStateGraphTyped[S]) AddGlobalListener(listener NodeListenerTyped[S])
- func (g *ListenableStateGraphTyped[S]) AddNode(name string, description string, ...) *ListenableNodeTyped[S]
- func (g *ListenableStateGraphTyped[S]) CompileListenable() (*ListenableRunnableTyped[S], error)
- func (g *ListenableStateGraphTyped[S]) GetListenableNode(name string) *ListenableNodeTyped[S]
- func (g *ListenableStateGraphTyped[S]) RemoveGlobalListener(listener NodeListenerTyped[S])
- func (g *ListenableStateGraphTyped[S]) RemoveGlobalListenerByID(listenerID string)
- type LogLevel
- type LoggingListener
- type MapReduceNode
- type MapSchema
- type MemoryCheckpointStore
- func (m *MemoryCheckpointStore) Clear(_ context.Context, executionID string) error
- func (m *MemoryCheckpointStore) Delete(_ context.Context, checkpointID string) error
- func (m *MemoryCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)
- func (m *MemoryCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
- func (m *MemoryCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
- type MermaidOptions
- type MessageWithID
- type MetricsListener
- func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration
- func (ml *MetricsListener) GetNodeErrors() map[string]int
- func (ml *MetricsListener) GetNodeExecutions() map[string]int
- func (ml *MetricsListener) GetTotalExecutions() int
- func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ any, _ error)
- func (ml *MetricsListener) PrintSummary(writer io.Writer)
- func (ml *MetricsListener) Reset()
- type NoOpCallbackHandler
- func (n *NoOpCallbackHandler) OnChainEnd(ctx context.Context, outputs map[string]any, runID string)
- func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, ...)
- func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response any, runID string)
- func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, ...)
- func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []any, runID string)
- func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, ...)
- func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)
- func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, ...)
- type Node
- type NodeEvent
- type NodeInterrupt
- type NodeListener
- type NodeListenerFunc
- type NodeListenerTyped
- type NodeListenerTypedFunc
- type NodeTyped
- type ParallelNode
- type ProgressListener
- func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state any, err error)
- func (pl *ProgressListener) SetNodeStep(nodeName, step string)
- func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener
- func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener
- func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener
- type RateLimiter
- type RecursiveSubgraph
- type Reducer
- type RetryConfig
- type RetryNode
- type RetryPolicy
- type Runnable
- type StateGraph
- func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state any) string)
- func (g *StateGraph) AddEdge(from, to string)
- func (g *StateGraph) AddMapReduceNode(name string, mapFunctions map[string]func(context.Context, any) (any, error), ...)
- func (g *StateGraph) AddNestedConditionalSubgraph(name string, router func(any) string, subgraphs map[string]*StateGraph) error
- func (g *StateGraph) AddNode(name string, description string, ...)
- func (g *StateGraph) AddNodeWithCircuitBreaker(name string, description string, fn func(context.Context, any) (any, error), ...)
- func (g *StateGraph) AddNodeWithRateLimit(name string, description string, fn func(context.Context, any) (any, error), ...)
- func (g *StateGraph) AddNodeWithRetry(name string, description string, fn func(context.Context, any) (any, error), ...)
- func (g *StateGraph) AddNodeWithTimeout(name string, description string, fn func(context.Context, any) (any, error), ...)
- func (g *StateGraph) AddParallelNodes(groupName string, nodes map[string]func(context.Context, any) (any, error))
- func (g *StateGraph) AddRecursiveSubgraph(name string, maxDepth int, condition func(any, int) bool, ...)
- func (g *StateGraph) AddSubgraph(name string, subgraph *StateGraph) error
- func (g *StateGraph) Compile() (*StateRunnable, error)
- func (g *StateGraph) CreateSubgraph(name string, builder func(*StateGraph)) error
- func (g *StateGraph) FanOutFanIn(source string, _ []string, collector string, ...)
- func (g *StateGraph) SetEntryPoint(name string)
- func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)
- func (g *StateGraph) SetSchema(schema StateSchema)
- func (g *StateGraph) SetStateMerger(merger StateMerger)
- type StateGraphTyped
- func (g *StateGraphTyped[S]) AddConditionalEdge(from string, condition func(ctx context.Context, state S) string)
- func (g *StateGraphTyped[S]) AddEdge(from, to string)
- func (g *StateGraphTyped[S]) AddNode(name string, description string, ...)
- func (g *StateGraphTyped[S]) Compile() (*StateRunnableTyped[S], error)
- func (g *StateGraphTyped[S]) SetEntryPoint(name string)
- func (g *StateGraphTyped[S]) SetRetryPolicy(policy *RetryPolicy)
- func (g *StateGraphTyped[S]) SetSchema(schema StateSchemaTyped[S])
- func (g *StateGraphTyped[S]) SetStateMerger(merger StateMergerTyped[S])
- type StateMerger
- type StateMergerTyped
- type StateRunnable
- func (r *StateRunnable) Invoke(ctx context.Context, initialState any) (any, error)
- func (r *StateRunnable) InvokeWithConfig(ctx context.Context, initialState any, config *Config) (any, error)
- func (r *StateRunnable) SetTracer(tracer *Tracer)
- func (r *StateRunnable) WithTracer(tracer *Tracer) *StateRunnable
- type StateRunnableTyped
- func (r *StateRunnableTyped[S]) Invoke(ctx context.Context, initialState S) (S, error)
- func (r *StateRunnableTyped[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
- func (r *StateRunnableTyped[S]) SetTracer(tracer *Tracer)
- func (r *StateRunnableTyped[S]) WithTracer(tracer *Tracer) *StateRunnableTyped[S]
- type StateSchema
- type StateSchemaTyped
- type StateSnapshot
- type StreamConfig
- type StreamEvent
- type StreamEventTyped
- type StreamMode
- type StreamResult
- type StreamingExecutor
- type StreamingListener
- func (sl *StreamingListener) Close()
- func (sl *StreamingListener) GetDroppedEventsCount() int
- func (sl *StreamingListener) OnChainEnd(ctx context.Context, outputs map[string]any, runID string)
- func (sl *StreamingListener) OnChainError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, ...)
- func (sl *StreamingListener) OnGraphStep(ctx context.Context, stepNode string, state any)
- func (sl *StreamingListener) OnLLMEnd(ctx context.Context, response any, runID string)
- func (sl *StreamingListener) OnLLMError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, ...)
- func (sl *StreamingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state any, err error)
- func (sl *StreamingListener) OnRetrieverEnd(ctx context.Context, documents []any, runID string)
- func (sl *StreamingListener) OnRetrieverError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, ...)
- func (sl *StreamingListener) OnToolEnd(ctx context.Context, output string, runID string)
- func (sl *StreamingListener) OnToolError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, ...)
- type StreamingListenerTyped
- type StreamingRunnable
- type StreamingStateGraph
- type StructSchema
- type Subgraph
- type TimeoutNode
- type TraceEvent
- type TraceHook
- type TraceHookFunc
- type TraceSpan
- type TracedRunnable
- type Tracer
- func (t *Tracer) AddHook(hook TraceHook)
- func (t *Tracer) Clear()
- func (t *Tracer) EndSpan(ctx context.Context, span *TraceSpan, state any, err error)
- func (t *Tracer) GetSpans() map[string]*TraceSpan
- func (t *Tracer) StartSpan(ctx context.Context, event TraceEvent, nodeName string) *TraceSpan
- func (t *Tracer) TraceEdgeTraversal(ctx context.Context, fromNode, toNode string)
Constants ¶
const END = "END"
END is a special constant used to represent the end node in the graph.
Variables ¶
var (
	// ErrEntryPointNotSet is returned when the entry point of the graph is not set.
	ErrEntryPointNotSet = errors.New("entry point not set")
	// ErrNodeNotFound is returned when a node is not found in the graph.
	ErrNodeNotFound = errors.New("node not found")
	// ErrNoOutgoingEdge is returned when no outgoing edge is found for a node.
	ErrNoOutgoingEdge = errors.New("no outgoing edge found for node")
)
Functions ¶
func AddMessages ¶
AddMessages is a reducer designed for merging chat messages. It handles ID-based deduplication and upserts. If a new message has the same ID as an existing one, it replaces the existing one. Otherwise, it appends the new message.
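The upsert behaviour described above can be illustrated with a minimal, self-contained sketch. The message type and the upsertMessages helper are hypothetical stand-ins written for this illustration; they are not the package's types, and the real AddMessages works on any values rather than a concrete slice.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a chat message that carries an ID.
type message struct {
	ID      string
	Content string
}

// upsertMessages merges incoming into current: a message whose ID already
// exists replaces the stored one in place; any other message is appended.
// The input slices are not modified.
func upsertMessages(current, incoming []message) []message {
	merged := make([]message, len(current), len(current)+len(incoming))
	copy(merged, current)
	index := make(map[string]int, len(merged))
	for i, m := range merged {
		index[m.ID] = i
	}
	for _, m := range incoming {
		if i, ok := index[m.ID]; ok {
			merged[i] = m // same ID: replace (upsert)
			continue
		}
		index[m.ID] = len(merged)
		merged = append(merged, m)
	}
	return merged
}

func main() {
	current := []message{{ID: "1", Content: "hello"}, {ID: "2", Content: "draft"}}
	incoming := []message{{ID: "2", Content: "final"}, {ID: "3", Content: "bye"}}
	for _, m := range upsertMessages(current, incoming) {
		fmt.Println(m.ID, m.Content)
	}
}
```

Replacing in place keeps the original ordering of the conversation, which is usually what a chat history needs.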
func AppendReducer ¶
AppendReducer appends the new value to the current slice. It supports appending a slice to a slice, or a single element to a slice.
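As a rough illustration of the two cases mentioned above (slice onto slice, single element onto slice), here is a reflection-based sketch. The appendAny helper is hypothetical and only approximates the described behaviour; the package's own implementation may handle more cases or report errors differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// appendAny appends next to the slice current. next may be a slice of the
// same type, or a single value assignable to the slice's element type.
func appendAny(current, next any) (any, error) {
	cur := reflect.ValueOf(current)
	if cur.Kind() != reflect.Slice {
		return nil, fmt.Errorf("current must be a slice, got %T", current)
	}
	nv := reflect.ValueOf(next)
	if !nv.IsValid() {
		return nil, fmt.Errorf("next must not be nil")
	}
	switch {
	case nv.Type() == cur.Type():
		return reflect.AppendSlice(cur, nv).Interface(), nil
	case nv.Type().AssignableTo(cur.Type().Elem()):
		return reflect.Append(cur, nv).Interface(), nil
	default:
		return nil, fmt.Errorf("cannot append %T to %T", next, current)
	}
}

func main() {
	out, err := appendAny([]int{1, 2}, []int{3, 4})
	fmt.Println(out, err)
	out, err = appendAny([]string{"a"}, "b")
	fmt.Println(out, err)
}
```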
func AppendSliceMerge ¶ added in v0.6.0
AppendSliceMerge appends new slice to current slice.
func ContextWithSpan ¶
ContextWithSpan returns a new context with the span stored
func DefaultStructMerge ¶ added in v0.6.0
DefaultStructMerge provides a default merge function for struct states. It uses reflection to merge non-zero fields from new into current. This is a sensible default for most struct types.
Example:
current := MyState{Count: 5, Name: "Old"}
new := MyState{Count: 3} // Name is zero value
result, _ := DefaultStructMerge(current, new)
// result.Count = 3 (overwritten)
// result.Name = "Old" (preserved, because new.Name was zero)
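To make the "merge non-zero fields" rule concrete, the following sketch reimplements it with reflection. mergeNonZero is a hypothetical helper written for this illustration, not the package's code, and it only handles plain struct types with exported fields.

```go
package main

import (
	"fmt"
	"reflect"
)

// MyState mirrors the struct used in the example above.
type MyState struct {
	Count int
	Name  string
}

// mergeNonZero copies every non-zero, settable field of next over current
// and returns the merged copy.
func mergeNonZero[S any](current, next S) (S, error) {
	cur := reflect.ValueOf(&current).Elem()
	if cur.Kind() != reflect.Struct {
		return current, fmt.Errorf("expected a struct, got %T", current)
	}
	nv := reflect.ValueOf(next)
	for i := 0; i < cur.NumField(); i++ {
		field := nv.Field(i)
		if cur.Field(i).CanSet() && !field.IsZero() {
			cur.Field(i).Set(field)
		}
	}
	return current, nil
}

func main() {
	result, err := mergeNonZero(MyState{Count: 5, Name: "Old"}, MyState{Count: 3})
	fmt.Printf("%+v %v\n", result, err) // Count becomes 3, Name stays "Old"
}
```

One consequence of this rule is that a node cannot reset a field to its zero value (for example Count back to 0), because a zero in the new state is indistinguishable from "not set". OverwriteStructMerge avoids that by replacing the whole state.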
func ExponentialBackoffRetry ¶
func ExponentialBackoffRetry(
	ctx context.Context,
	fn func() (any, error),
	maxAttempts int,
	baseDelay time.Duration,
) (any, error)
ExponentialBackoffRetry implements exponential backoff with jitter.
func GetResumeValue ¶
GetResumeValue retrieves the resume value from the context.
func Interrupt ¶
Interrupt pauses execution and waits for input. If resuming, it returns the value provided in the resume command.
func KeepCurrentMerge ¶ added in v0.6.0
KeepCurrentMerge always keeps the current value (ignores new).
func MaxIntMerge ¶ added in v0.6.0
MaxIntMerge takes the maximum of two integer values.
func MinIntMerge ¶ added in v0.6.0
MinIntMerge takes the minimum of two integer values.
func OverwriteMerge ¶ added in v0.6.0
OverwriteMerge always uses the new value.
func OverwriteReducer ¶
OverwriteReducer replaces the old value with the new one.
func OverwriteStructMerge ¶ added in v0.6.0
OverwriteStructMerge is a merge function that completely replaces the current state with the new state.
func SafeGo ¶ added in v0.6.0
SafeGo runs a function in a goroutine with panic recovery. It uses a WaitGroup (if provided) and supports a custom panic handler.
func SumIntMerge ¶ added in v0.6.0
SumIntMerge adds two integer values.
func WithConfig ¶
WithConfig adds the config to the context
Types ¶
type BackoffStrategy ¶
type BackoffStrategy int
BackoffStrategy defines different backoff strategies
const (
	FixedBackoff BackoffStrategy = iota
	ExponentialBackoff
	LinearBackoff
)
type CallbackHandler ¶
type CallbackHandler interface {
// Chain callbacks (for graph/workflow execution)
OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnChainEnd(ctx context.Context, outputs map[string]any, runID string)
OnChainError(ctx context.Context, err error, runID string)
// LLM callbacks (for AI model calls)
OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnLLMEnd(ctx context.Context, response any, runID string)
OnLLMError(ctx context.Context, err error, runID string)
// Tool callbacks (for tool/function calls)
OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnToolEnd(ctx context.Context, output string, runID string)
OnToolError(ctx context.Context, err error, runID string)
// Retriever callbacks (for data retrieval operations)
OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnRetrieverEnd(ctx context.Context, documents []any, runID string)
OnRetrieverError(ctx context.Context, err error, runID string)
}
CallbackHandler defines the interface for handling graph execution callbacks. This matches Python's LangChain callback pattern.
type ChatListener ¶
type ChatListener struct {
// contains filtered or unexported fields
}
ChatListener provides real-time chat-style updates
func NewChatListener ¶
func NewChatListener() *ChatListener
NewChatListener creates a new chat-style listener
func NewChatListenerWithWriter ¶
func NewChatListenerWithWriter(writer io.Writer) *ChatListener
NewChatListenerWithWriter creates a chat listener with custom writer
func (*ChatListener) OnNodeEvent ¶
func (cl *ChatListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ any, err error)
OnNodeEvent implements the NodeListener interface
func (*ChatListener) SetNodeMessage ¶
func (cl *ChatListener) SetNodeMessage(nodeName, message string)
SetNodeMessage sets a custom message for a specific node
func (*ChatListener) WithTime ¶
func (cl *ChatListener) WithTime(enabled bool) *ChatListener
WithTime enables or disables timestamps
type Checkpoint ¶
type Checkpoint struct {
ID string `json:"id"`
NodeName string `json:"node_name"`
State any `json:"state"`
Metadata map[string]any `json:"metadata"`
Timestamp time.Time `json:"timestamp"`
Version int `json:"version"`
}
Checkpoint represents a saved state at a specific point in execution
type CheckpointConfig ¶
type CheckpointConfig struct {
// Store is the checkpoint storage backend
Store CheckpointStore
// AutoSave enables automatic checkpointing after each node
AutoSave bool
// SaveInterval specifies how often to save (when AutoSave is false)
SaveInterval time.Duration
// MaxCheckpoints limits the number of checkpoints to keep
MaxCheckpoints int
}
CheckpointConfig configures checkpointing behavior
func DefaultCheckpointConfig ¶
func DefaultCheckpointConfig() CheckpointConfig
DefaultCheckpointConfig returns a default checkpoint configuration
type CheckpointListener ¶
type CheckpointListener struct {
// Embed NoOpCallbackHandler to satisfy other CallbackHandler methods
NoOpCallbackHandler
// contains filtered or unexported fields
}
CheckpointListener automatically creates checkpoints during execution
func (*CheckpointListener) OnGraphStep ¶
func (cl *CheckpointListener) OnGraphStep(ctx context.Context, stepNode string, state any)
OnGraphStep implements GraphCallbackHandler
type CheckpointStore ¶
type CheckpointStore interface {
// Save stores a checkpoint
Save(ctx context.Context, checkpoint *Checkpoint) error
// Load retrieves a checkpoint by ID
Load(ctx context.Context, checkpointID string) (*Checkpoint, error)
// List returns all checkpoints for a given execution
List(ctx context.Context, executionID string) ([]*Checkpoint, error)
// Delete removes a checkpoint
Delete(ctx context.Context, checkpointID string) error
// Clear removes all checkpoints for an execution
Clear(ctx context.Context, executionID string) error
}
CheckpointStore defines the interface for checkpoint persistence
type CheckpointableRunnable ¶
type CheckpointableRunnable struct {
// contains filtered or unexported fields
}
CheckpointableRunnable wraps a runnable with checkpointing capabilities
func NewCheckpointableRunnable ¶
func NewCheckpointableRunnable(runnable *ListenableRunnable, config CheckpointConfig) *CheckpointableRunnable
NewCheckpointableRunnable creates a new checkpointable runnable
func (*CheckpointableRunnable) ClearCheckpoints ¶
func (cr *CheckpointableRunnable) ClearCheckpoints(ctx context.Context) error
ClearCheckpoints removes all checkpoints for this execution
func (*CheckpointableRunnable) GetState ¶
func (cr *CheckpointableRunnable) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)
GetState retrieves the state for the given config
func (*CheckpointableRunnable) InvokeWithConfig ¶
func (cr *CheckpointableRunnable) InvokeWithConfig(ctx context.Context, initialState any, config *Config) (any, error)
InvokeWithConfig executes the graph with checkpointing and config
func (*CheckpointableRunnable) ListCheckpoints ¶
func (cr *CheckpointableRunnable) ListCheckpoints(ctx context.Context) ([]*Checkpoint, error)
ListCheckpoints returns all checkpoints for this execution
func (*CheckpointableRunnable) LoadCheckpoint ¶
func (cr *CheckpointableRunnable) LoadCheckpoint(ctx context.Context, checkpointID string) (*Checkpoint, error)
LoadCheckpoint loads a specific checkpoint
func (*CheckpointableRunnable) ResumeFromCheckpoint ¶
func (cr *CheckpointableRunnable) ResumeFromCheckpoint(ctx context.Context, checkpointID string) (any, error)
ResumeFromCheckpoint resumes execution from a specific checkpoint
func (*CheckpointableRunnable) SaveCheckpoint ¶
func (cr *CheckpointableRunnable) SaveCheckpoint(ctx context.Context, nodeName string, state any) error
SaveCheckpoint manually saves a checkpoint
func (*CheckpointableRunnable) UpdateState ¶
func (cr *CheckpointableRunnable) UpdateState(ctx context.Context, config *Config, values any, asNode string) (*Config, error)
UpdateState updates the state for the given config
type CheckpointableStateGraph ¶ added in v0.6.0
type CheckpointableStateGraph struct {
*ListenableStateGraph
// contains filtered or unexported fields
}
CheckpointableStateGraph extends ListenableStateGraph with checkpointing
func NewCheckpointableStateGraph ¶ added in v0.6.0
func NewCheckpointableStateGraph() *CheckpointableStateGraph
NewCheckpointableStateGraph creates a new checkpointable state graph
func NewCheckpointableStateGraphWithConfig ¶ added in v0.6.0
func NewCheckpointableStateGraphWithConfig(config CheckpointConfig) *CheckpointableStateGraph
NewCheckpointableStateGraphWithConfig creates a checkpointable graph with custom config
func (*CheckpointableStateGraph) CompileCheckpointable ¶ added in v0.6.0
func (g *CheckpointableStateGraph) CompileCheckpointable() (*CheckpointableRunnable, error)
CompileCheckpointable compiles the graph into a checkpointable runnable
func (*CheckpointableStateGraph) GetCheckpointConfig ¶ added in v0.6.0
func (g *CheckpointableStateGraph) GetCheckpointConfig() CheckpointConfig
GetCheckpointConfig returns the current checkpointing configuration
func (*CheckpointableStateGraph) SetCheckpointConfig ¶ added in v0.6.0
func (g *CheckpointableStateGraph) SetCheckpointConfig(config CheckpointConfig)
SetCheckpointConfig updates the checkpointing configuration
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶
func NewCircuitBreaker(node Node, config CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
FailureThreshold int // Number of failures before opening
SuccessThreshold int // Number of successes before closing
Timeout time.Duration // Time before attempting to close
HalfOpenMaxCalls int // Max calls in half-open state
}
CircuitBreakerConfig configures circuit breaker behavior
type CircuitBreakerState ¶
type CircuitBreakerState int
CircuitBreakerState represents the state of a circuit breaker
const (
	CircuitClosed CircuitBreakerState = iota
	CircuitOpen
	CircuitHalfOpen
)
type CleaningStateSchema ¶
type CleaningStateSchema interface {
StateSchema
// Cleanup performs any necessary cleanup on the state after a step.
Cleanup(state any) any
}
CleaningStateSchema extends StateSchema with cleanup capabilities. This allows implementing ephemeral channels (values that are cleared after each step).
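An ephemeral channel can be modelled as a state key that Cleanup deletes after every step. The ephemeralKeys type below is a hypothetical sketch that implements only the Cleanup method for map-based state; the methods required by the embedded StateSchema are not shown in this section and are omitted here.

```go
package main

import "fmt"

// ephemeralKeys lists the state keys that should not survive a step.
type ephemeralKeys map[string]bool

// Cleanup returns a copy of the state without the ephemeral keys.
// State values that are not map[string]any are returned unchanged.
func (e ephemeralKeys) Cleanup(state any) any {
	m, ok := state.(map[string]any)
	if !ok {
		return state
	}
	cleaned := make(map[string]any, len(m))
	for k, v := range m {
		if !e[k] {
			cleaned[k] = v
		}
	}
	return cleaned
}

func main() {
	schema := ephemeralKeys{"scratch": true}
	state := map[string]any{"messages": []string{"hi"}, "scratch": "temporary"}
	fmt.Println(schema.Cleanup(state))
}
```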
type CleaningStateSchemaTyped ¶ added in v0.6.0
type CleaningStateSchemaTyped[S any] interface {
	StateSchemaTyped[S]
	// Cleanup performs any necessary cleanup on the state after a step.
	Cleanup(state S) S
}
CleaningStateSchemaTyped extends StateSchemaTyped with cleanup capabilities. This allows implementing ephemeral channels (values that are cleared after each step).
type Command ¶
type Command struct {
// Update is the value to update the state with.
// It will be processed by the schema's reducers.
Update any
// Goto specifies the next node(s) to execute.
// If set, it overrides the graph's edges.
// Can be a single string (node name) or []string.
Goto any
}
Command allows a node to dynamically update the state and control the flow. It can be returned by a node function instead of a direct state update.
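Because `Goto` is typed `any`, code that consumes a `Command` has to normalize it. The sketch below shows one plausible way to do that with a type switch; `nextNodes` is a hypothetical helper, the fallback to static edges when `Goto` is unset follows the field comment above, and the error for other types is an assumption.

```go
package main

import "fmt"

// Command mirrors the documented struct.
type Command struct {
	Update any
	Goto   any
}

// nextNodes resolves Goto into a list of node names. When Goto is unset,
// the graph's static edges are used instead.
func nextNodes(cmd Command, staticEdges []string) ([]string, error) {
	switch v := cmd.Goto.(type) {
	case nil:
		return staticEdges, nil
	case string:
		return []string{v}, nil
	case []string:
		return v, nil
	default:
		return nil, fmt.Errorf("unsupported Goto type %T", v)
	}
}

func main() {
	static := []string{"validate"}
	fmt.Println(nextNodes(Command{}, static))
	fmt.Println(nextNodes(Command{Goto: "review"}, static))
	fmt.Println(nextNodes(Command{Goto: []string{"a", "b"}}, static))
}
```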
type CompositeGraph ¶
type CompositeGraph struct {
// contains filtered or unexported fields
}
CompositeGraph allows composing multiple graphs together
func NewCompositeGraph ¶
func NewCompositeGraph() *CompositeGraph
NewCompositeGraph creates a new composite graph
func (*CompositeGraph) AddGraph ¶
func (cg *CompositeGraph) AddGraph(name string, graph *StateGraph)
AddGraph adds a named graph to the composite
func (*CompositeGraph) Compile ¶
func (cg *CompositeGraph) Compile() (*Runnable, error)
Compile compiles the composite graph into a single runnable
type Config ¶
type Config struct {
// Callbacks to be invoked during execution
Callbacks []CallbackHandler `json:"callbacks"`
// Metadata to attach to the execution
Metadata map[string]any `json:"metadata"`
// Tags to categorize the execution
Tags []string `json:"tags"`
// Configurable parameters for the execution
Configurable map[string]any `json:"configurable"`
// RunName for this execution
RunName string `json:"run_name"`
// Timeout for the execution
Timeout *time.Duration `json:"timeout"`
// InterruptBefore nodes to stop before execution
InterruptBefore []string `json:"interrupt_before"`
// InterruptAfter nodes to stop after execution
InterruptAfter []string `json:"interrupt_after"`
// ResumeFrom nodes to start execution from (bypassing entry point)
ResumeFrom []string `json:"resume_from"`
// ResumeValue provides the value to return from an Interrupt() call when resuming
ResumeValue any `json:"resume_value"`
}
Config represents configuration for graph invocation. This matches Python's config dict pattern.
type Edge ¶
type Edge struct {
// From is the name of the node from which the edge originates.
From string
// To is the name of the node to which the edge points.
To string
}
Edge represents an edge in the graph.
type Exporter ¶
type Exporter struct {
// contains filtered or unexported fields
}
Exporter provides methods to export graphs in different formats
func NewExporter ¶
func NewExporter(graph *StateGraph) *Exporter
NewExporter creates a new graph exporter for the given graph
func (*Exporter) DrawMermaid ¶
DrawMermaid generates a Mermaid diagram representation of the graph
func (*Exporter) DrawMermaidWithOptions ¶
func (ge *Exporter) DrawMermaidWithOptions(opts MermaidOptions) string
DrawMermaidWithOptions generates a Mermaid diagram with custom options
type FieldMerger ¶ added in v0.6.0
type FieldMerger[S any] struct {
	InitialValue  S
	FieldMergeFns map[string]func(currentVal, newVal reflect.Value) reflect.Value
}
FieldMerger provides fine-grained control over how individual struct fields are merged.
func NewFieldMerger ¶ added in v0.6.0
func NewFieldMerger[S any](initial S) *FieldMerger[S]
NewFieldMerger creates a new FieldMerger with the given initial value.
func (*FieldMerger[S]) Init ¶ added in v0.6.0
func (fm *FieldMerger[S]) Init() S
Init returns the initial state.
func (*FieldMerger[S]) RegisterFieldMerge ¶ added in v0.6.0
func (fm *FieldMerger[S]) RegisterFieldMerge(fieldName string, mergeFn func(currentVal, newVal reflect.Value) reflect.Value)
RegisterFieldMerge registers a custom merge function for a specific field.
func (*FieldMerger[S]) Update ¶ added in v0.6.0
func (fm *FieldMerger[S]) Update(current, new S) (S, error)
Update merges the new state into the current state using registered field merge functions.
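To make the role of the `reflect.Value`-based merge functions concrete, here is a stand-alone sketch of per-field merging. It is not the package's `Update` implementation: `mergeFields` is a hypothetical function, and the default rule for fields without a registered function (overwrite only when the new value is non-zero) is an assumption.

```go
package main

import (
	"fmt"
	"reflect"
)

// mergeFn has the same shape as the functions stored in FieldMergeFns.
type mergeFn func(currentVal, newVal reflect.Value) reflect.Value

// mergeFields merges next into current field by field. Fields with a
// registered function use it; other exported fields are overwritten only
// when the new value is non-zero (an assumed default).
func mergeFields[S any](current, next S, fns map[string]mergeFn) (S, error) {
	cur := reflect.ValueOf(&current).Elem() // addressable copy of current
	if cur.Kind() != reflect.Struct {
		return current, fmt.Errorf("state must be a struct, got %s", cur.Kind())
	}
	nv := reflect.ValueOf(next)
	for i := 0; i < cur.NumField(); i++ {
		field := cur.Type().Field(i)
		if !field.IsExported() {
			continue
		}
		if fn, ok := fns[field.Name]; ok {
			cur.Field(i).Set(fn(cur.Field(i), nv.Field(i)))
		} else if !nv.Field(i).IsZero() {
			cur.Field(i).Set(nv.Field(i))
		}
	}
	return current, nil
}

// State is an example state type used only in this sketch.
type State struct {
	Name  string
	Count int
	Logs  []string
}

// demo sums Count, appends Logs, and leaves Name to the default rule.
func demo() (State, error) {
	fns := map[string]mergeFn{
		"Count": func(c, n reflect.Value) reflect.Value {
			return reflect.ValueOf(int(c.Int() + n.Int()))
		},
		"Logs": func(c, n reflect.Value) reflect.Value {
			return reflect.AppendSlice(c, n)
		},
	}
	current := State{Name: "job", Count: 1, Logs: []string{"start"}}
	update := State{Count: 2, Logs: []string{"step"}}
	return mergeFields(current, update, fns)
}

func main() {
	fmt.Println(demo())
}
```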
type FileCheckpointStore ¶
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore provides file-based checkpoint storage
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(path string) (*FileCheckpointStore, error)
NewFileCheckpointStore creates a new file-based checkpoint store
func (*FileCheckpointStore) Clear ¶
func (f *FileCheckpointStore) Clear(ctx context.Context, executionID string) error
Clear implements CheckpointStore interface for file storage
func (*FileCheckpointStore) Delete ¶
func (f *FileCheckpointStore) Delete(_ context.Context, checkpointID string) error
Delete implements CheckpointStore interface for file storage
func (*FileCheckpointStore) List ¶
func (f *FileCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)
List implements CheckpointStore interface for file storage
func (*FileCheckpointStore) Load ¶
func (f *FileCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
Load implements CheckpointStore interface for file storage
func (*FileCheckpointStore) Save ¶
func (f *FileCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
Save implements CheckpointStore interface for file storage
type GraphCallbackHandler ¶
type GraphCallbackHandler interface {
CallbackHandler
// OnGraphStep is called after a step (node execution + state update) is completed
OnGraphStep(ctx context.Context, stepNode string, state any)
}
GraphCallbackHandler extends CallbackHandler with graph-specific events
type GraphInterrupt ¶
type GraphInterrupt struct {
// Node that caused the interruption
Node string
// State at the time of interruption
State any
// NextNodes that would have been executed if not interrupted
NextNodes []string
// InterruptValue is the value provided by the dynamic interrupt (if any)
InterruptValue any
}
GraphInterrupt is returned when execution is interrupted by configuration or dynamic interrupt
func (*GraphInterrupt) Error ¶
func (e *GraphInterrupt) Error() string
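Since `Error` is declared on the pointer receiver, a caller would typically detect an interrupt with `errors.As` and a `*GraphInterrupt` target. The sketch below uses a local stand-in type so it runs without the package; the `run` function and the error message text are hypothetical, and whether the real runnable wraps the interrupt is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// GraphInterrupt is a local stand-in with the documented fields.
type GraphInterrupt struct {
	Node           string
	State          any
	NextNodes      []string
	InterruptValue any
}

// Error makes *GraphInterrupt satisfy the error interface. The message
// format is illustrative only.
func (e *GraphInterrupt) Error() string {
	return "graph interrupted at node " + e.Node
}

// run is a hypothetical stand-in for an invocation that stops at "review".
func run() error {
	gi := &GraphInterrupt{Node: "review", NextNodes: []string{"publish"}}
	return fmt.Errorf("invoke: %w", gi)
}

// interruptOf extracts a *GraphInterrupt from err, even when it is wrapped.
func interruptOf(err error) (*GraphInterrupt, bool) {
	var gi *GraphInterrupt
	ok := errors.As(err, &gi)
	return gi, ok
}

func main() {
	if gi, ok := interruptOf(run()); ok {
		fmt.Println("paused at", gi.Node, "next:", gi.NextNodes)
	}
}
```

After handling the pause, execution could presumably be continued by passing `ResumeFrom` (for example the `NextNodes` value) and `ResumeValue` in a `Config`, as described in the Config type above.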
type ListenableNode ¶
type ListenableNode struct {
Node
// contains filtered or unexported fields
}
ListenableNode extends Node with listener capabilities
func NewListenableNode ¶
func NewListenableNode(node Node) *ListenableNode
NewListenableNode creates a new listenable node from a regular node
func (*ListenableNode) AddListener ¶
func (ln *ListenableNode) AddListener(listener NodeListener) *ListenableNode
AddListener adds a listener to the node
func (*ListenableNode) GetListeners ¶
func (ln *ListenableNode) GetListeners() []NodeListener
GetListeners returns a copy of the current listeners
func (*ListenableNode) NotifyListeners ¶
func (ln *ListenableNode) NotifyListeners(ctx context.Context, event NodeEvent, state any, err error)
NotifyListeners notifies all listeners of an event
func (*ListenableNode) RemoveListener ¶
func (ln *ListenableNode) RemoveListener(listener NodeListener)
RemoveListener removes a listener from the node
type ListenableNodeTyped ¶ added in v0.6.0
ListenableNodeTyped extends NodeTyped with listener capabilities
func NewListenableNodeTyped ¶ added in v0.6.0
func NewListenableNodeTyped[S any](node NodeTyped[S]) *ListenableNodeTyped[S]
NewListenableNodeTyped creates a new listenable node from a regular typed node
func (*ListenableNodeTyped[S]) AddListener ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) AddListener(listener NodeListenerTyped[S]) *ListenableNodeTyped[S]
AddListener adds a listener to the node and returns the listenable node for chaining
func (*ListenableNodeTyped[S]) AddListenerWithID ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) AddListenerWithID(listener NodeListenerTyped[S]) string
AddListenerWithID adds a listener to the node and returns its ID
func (*ListenableNodeTyped[S]) Execute ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) Execute(ctx context.Context, state S) (S, error)
Execute runs the node function with listener notifications
func (*ListenableNodeTyped[S]) GetListenerIDs ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) GetListenerIDs() []string
GetListenerIDs returns a copy of the current listener IDs
func (*ListenableNodeTyped[S]) GetListeners ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) GetListeners() []NodeListenerTyped[S]
GetListeners returns a copy of the current listeners
func (*ListenableNodeTyped[S]) NotifyListeners ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) NotifyListeners(ctx context.Context, event NodeEvent, state S, err error)
NotifyListeners notifies all listeners of an event
func (*ListenableNodeTyped[S]) RemoveListener ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) RemoveListener(listenerID string)
RemoveListener removes a listener from the node by ID
func (*ListenableNodeTyped[S]) RemoveListenerByFunc ¶ added in v0.6.0
func (ln *ListenableNodeTyped[S]) RemoveListenerByFunc(listener NodeListenerTyped[S])
RemoveListenerByFunc removes a listener from the node by comparing pointer values
type ListenableRunnable ¶
type ListenableRunnable struct {
// contains filtered or unexported fields
}
ListenableRunnable wraps a Runnable with listener capabilities
func (*ListenableRunnable) GetGraph ¶
func (lr *ListenableRunnable) GetGraph() *Exporter
GetGraph returns an Exporter for visualization
func (*ListenableRunnable) InvokeWithConfig ¶
func (lr *ListenableRunnable) InvokeWithConfig(ctx context.Context, initialState any, config *Config) (any, error)
InvokeWithConfig executes the graph with listener notifications and config
type ListenableRunnableTyped ¶ added in v0.6.0
type ListenableRunnableTyped[S any] struct {
	// contains filtered or unexported fields
}
ListenableRunnableTyped wraps a StateRunnableTyped with listener capabilities
func (*ListenableRunnableTyped[S]) GetGraph ¶ added in v0.6.0
func (lr *ListenableRunnableTyped[S]) GetGraph() *Exporter
GetGraph returns an Exporter for visualization
func (*ListenableRunnableTyped[S]) Invoke ¶ added in v0.6.0
func (lr *ListenableRunnableTyped[S]) Invoke(ctx context.Context, initialState S) (S, error)
Invoke executes the graph with listener notifications
func (*ListenableRunnableTyped[S]) InvokeWithConfig ¶ added in v0.6.0
func (lr *ListenableRunnableTyped[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
InvokeWithConfig executes the graph with listener notifications and config
func (*ListenableRunnableTyped[S]) SetTracer ¶ added in v0.6.0
func (lr *ListenableRunnableTyped[S]) SetTracer(tracer *Tracer)
SetTracer sets a tracer for the underlying runnable
func (*ListenableRunnableTyped[S]) Stream ¶ added in v0.6.0
func (lr *ListenableRunnableTyped[S]) Stream(ctx context.Context, initialState S) <-chan StreamEventTyped[S]
Stream executes the graph with listener notifications and streams events
func (*ListenableRunnableTyped[S]) WithTracer ¶ added in v0.6.0
func (lr *ListenableRunnableTyped[S]) WithTracer(tracer *Tracer) *ListenableRunnableTyped[S]
WithTracer returns a new ListenableRunnableTyped with the given tracer
type ListenableStateGraph ¶
type ListenableStateGraph struct {
*StateGraph
// contains filtered or unexported fields
}
ListenableStateGraph extends StateGraph with listener capabilities
func NewListenableStateGraph ¶
func NewListenableStateGraph() *ListenableStateGraph
NewListenableStateGraph creates a new state graph with listener support
func (*ListenableStateGraph) AddGlobalListener ¶ added in v0.6.0
func (g *ListenableStateGraph) AddGlobalListener(listener NodeListener)
AddGlobalListener adds a listener to all nodes in the graph
func (*ListenableStateGraph) AddNode ¶ added in v0.6.0
func (g *ListenableStateGraph) AddNode(name string, description string, fn func(ctx context.Context, state any) (any, error)) *ListenableNode
AddNode adds a node with listener capabilities
func (*ListenableStateGraph) CompileListenable ¶ added in v0.6.0
func (g *ListenableStateGraph) CompileListenable() (*ListenableRunnable, error)
CompileListenable creates a runnable with listener support
func (*ListenableStateGraph) GetListenableNode ¶ added in v0.6.0
func (g *ListenableStateGraph) GetListenableNode(name string) *ListenableNode
GetListenableNode returns the listenable node by name
func (*ListenableStateGraph) RemoveGlobalListener ¶ added in v0.6.0
func (g *ListenableStateGraph) RemoveGlobalListener(listener NodeListener)
RemoveGlobalListener removes a listener from all nodes in the graph
type ListenableStateGraphTyped ¶ added in v0.6.0
type ListenableStateGraphTyped[S any] struct {
	*StateGraphTyped[S]
	// contains filtered or unexported fields
}
ListenableStateGraphTyped extends StateGraphTyped with listener capabilities
func NewListenableStateGraphTyped ¶ added in v0.6.0
func NewListenableStateGraphTyped[S any]() *ListenableStateGraphTyped[S]
NewListenableStateGraphTyped creates a new typed state graph with listener support
func (*ListenableStateGraphTyped[S]) AddGlobalListener ¶ added in v0.6.0
func (g *ListenableStateGraphTyped[S]) AddGlobalListener(listener NodeListenerTyped[S])
AddGlobalListener adds a listener to all nodes in the graph
func (*ListenableStateGraphTyped[S]) AddNode ¶ added in v0.6.0
func (g *ListenableStateGraphTyped[S]) AddNode(name string, description string, fn func(ctx context.Context, state S) (S, error)) *ListenableNodeTyped[S]
AddNode adds a node with listener capabilities
func (*ListenableStateGraphTyped[S]) CompileListenable ¶ added in v0.6.0
func (g *ListenableStateGraphTyped[S]) CompileListenable() (*ListenableRunnableTyped[S], error)
CompileListenable creates a runnable with listener support
func (*ListenableStateGraphTyped[S]) GetListenableNode ¶ added in v0.6.0
func (g *ListenableStateGraphTyped[S]) GetListenableNode(name string) *ListenableNodeTyped[S]
GetListenableNode returns the listenable node by name
func (*ListenableStateGraphTyped[S]) RemoveGlobalListener ¶ added in v0.6.0
func (g *ListenableStateGraphTyped[S]) RemoveGlobalListener(listener NodeListenerTyped[S])
RemoveGlobalListener removes a listener from all nodes in the graph by function reference
func (*ListenableStateGraphTyped[S]) RemoveGlobalListenerByID ¶ added in v0.6.0
func (g *ListenableStateGraphTyped[S]) RemoveGlobalListenerByID(listenerID string)
RemoveGlobalListenerByID removes a listener from all nodes in the graph by ID
type LoggingListener ¶
type LoggingListener struct {
// contains filtered or unexported fields
}
LoggingListener provides structured logging for node events
func NewLoggingListener ¶
func NewLoggingListener() *LoggingListener
NewLoggingListener creates a new logging listener
func NewLoggingListenerWithLogger ¶
func NewLoggingListenerWithLogger(logger *log.Logger) *LoggingListener
NewLoggingListenerWithLogger creates a logging listener with custom logger
func (*LoggingListener) OnNodeEvent ¶
func (ll *LoggingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state any, err error)
OnNodeEvent implements the NodeListener interface
func (*LoggingListener) WithLogLevel ¶
func (ll *LoggingListener) WithLogLevel(level LogLevel) *LoggingListener
WithLogLevel sets the minimum log level
func (*LoggingListener) WithState ¶
func (ll *LoggingListener) WithState(enabled bool) *LoggingListener
WithState enables or disables state logging
type MapReduceNode ¶
type MapReduceNode struct {
// contains filtered or unexported fields
}
MapReduceNode executes nodes in parallel and reduces results
func NewMapReduceNode ¶
func NewMapReduceNode(name string, reducer func([]any) (any, error), mapNodes ...Node) *MapReduceNode
NewMapReduceNode creates a new map-reduce node
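The map-reduce pattern itself can be sketched with goroutines and a `sync.WaitGroup`. This is a generic illustration under stated assumptions, not the node's actual implementation: `mapReduce`, `scale`, and `sum` are hypothetical names, every mapper is assumed to receive the same input state, and results are passed to the reducer in mapper order.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// nodeFunc has the same shape as the documented Node.Function field.
type nodeFunc func(ctx context.Context, state any) (any, error)

// mapReduce runs every mapper concurrently on the same input state, then
// hands the results to the reducer in mapper order.
func mapReduce(ctx context.Context, state any, reducer func([]any) (any, error), mappers ...nodeFunc) (any, error) {
	results := make([]any, len(mappers))
	errs := make([]error, len(mappers))
	var wg sync.WaitGroup
	for i, m := range mappers {
		wg.Add(1)
		go func(i int, m nodeFunc) {
			defer wg.Done()
			results[i], errs[i] = m(ctx, state) // each goroutine writes only its own slot
		}(i, m)
	}
	wg.Wait()
	if err := errors.Join(errs...); err != nil {
		return nil, err
	}
	return reducer(results)
}

// scale returns a mapper that multiplies an int state by factor.
func scale(factor int) nodeFunc {
	return func(_ context.Context, state any) (any, error) {
		n, ok := state.(int)
		if !ok {
			return nil, fmt.Errorf("scale: want int, got %T", state)
		}
		return n * factor, nil
	}
}

// sum adds up the int results produced by the mappers.
func sum(results []any) (any, error) {
	total := 0
	for _, r := range results {
		total += r.(int)
	}
	return total, nil
}

// demo fans the input out to three mappers and sums their results.
func demo(input any) (any, error) {
	return mapReduce(context.Background(), input, sum, scale(1), scale(2), scale(3))
}

func main() {
	fmt.Println(demo(5))
}
```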
type MapSchema ¶
MapSchema implements StateSchema for map[string]any. It allows defining reducers for specific keys.
func (*MapSchema) RegisterChannel ¶
RegisterChannel adds a channel definition (reducer + ephemeral flag).
func (*MapSchema) RegisterReducer ¶
RegisterReducer adds a reducer for a specific key.
type MemoryCheckpointStore ¶
type MemoryCheckpointStore struct {
// contains filtered or unexported fields
}
MemoryCheckpointStore provides in-memory checkpoint storage
func NewMemoryCheckpointStore ¶
func NewMemoryCheckpointStore() *MemoryCheckpointStore
NewMemoryCheckpointStore creates a new in-memory checkpoint store
func (*MemoryCheckpointStore) Clear ¶
func (m *MemoryCheckpointStore) Clear(_ context.Context, executionID string) error
Clear implements CheckpointStore interface
func (*MemoryCheckpointStore) Delete ¶
func (m *MemoryCheckpointStore) Delete(_ context.Context, checkpointID string) error
Delete implements CheckpointStore interface
func (*MemoryCheckpointStore) List ¶
func (m *MemoryCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)
List implements CheckpointStore interface
func (*MemoryCheckpointStore) Load ¶
func (m *MemoryCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
Load implements CheckpointStore interface
func (*MemoryCheckpointStore) Save ¶
func (m *MemoryCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
Save implements CheckpointStore interface
type MermaidOptions ¶
type MermaidOptions struct {
// Direction of the flowchart (e.g., "TD", "LR")
Direction string
}
MermaidOptions defines configuration for Mermaid diagram generation
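For orientation, a Mermaid flowchart is plain text with a direction header followed by one line per edge. The sketch below renders a list of edges that way; it is not the Exporter's implementation, `renderMermaid` is a hypothetical function, and both the "TD" default and the exact output layout are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// Edge and MermaidOptions mirror the documented structs.
type Edge struct {
	From string
	To   string
}

type MermaidOptions struct {
	Direction string
}

// renderMermaid writes a flowchart header and one arrow per edge.
func renderMermaid(edges []Edge, opts MermaidOptions) string {
	dir := opts.Direction
	if dir == "" {
		dir = "TD" // assumed default: top-down layout
	}
	var sb strings.Builder
	sb.WriteString("flowchart " + dir + "\n")
	for _, e := range edges {
		fmt.Fprintf(&sb, "    %s --> %s\n", e.From, e.To)
	}
	return sb.String()
}

func main() {
	edges := []Edge{{From: "process", To: "validate"}, {From: "validate", To: "done"}}
	fmt.Print(renderMermaid(edges, MermaidOptions{Direction: "LR"}))
}
```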
type MessageWithID ¶
type MessageWithID interface {
GetID() string
GetContent() llms.MessageContent
}
MessageWithID is an interface that allows messages to have an ID for deduplication/upsert. Since langchaingo's MessageContent doesn't have an ID field, we can wrap it or use a custom struct. For now, we'll check if the message implements this interface or is a map with an "id" key.
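The ID lookup described above (interface first, then a map with an "id" key) lends itself to a small upsert routine. The following is a simplified sketch, not the package's reducer: `upsert`, `idOf`, and the `message` type are hypothetical, and the `GetContent` method is left out so the example does not depend on langchaingo.

```go
package main

import "fmt"

// identified is the GetID half of the documented MessageWithID interface.
type identified interface {
	GetID() string
}

// message is a hypothetical message type carrying an ID.
type message struct {
	id   string
	text string
}

func (m message) GetID() string { return m.id }

// idOf returns the message ID, or "" when the message has none.
func idOf(m any) string {
	switch v := m.(type) {
	case identified:
		return v.GetID()
	case map[string]any:
		id, _ := v["id"].(string)
		return id
	}
	return ""
}

// upsert replaces existing messages that share an ID with an incoming
// message and appends everything else.
func upsert(existing []any, incoming ...any) []any {
	out := append([]any(nil), existing...)
	index := make(map[string]int)
	for i, m := range out {
		if id := idOf(m); id != "" {
			index[id] = i
		}
	}
	for _, m := range incoming {
		id := idOf(m)
		if pos, ok := index[id]; ok && id != "" {
			out[pos] = m // same ID: replace in place
			continue
		}
		if id != "" {
			index[id] = len(out)
		}
		out = append(out, m)
	}
	return out
}

func main() {
	history := []any{message{id: "1", text: "hi"}}
	history = upsert(history, message{id: "1", text: "hello"}, map[string]any{"id": "2"}, "raw")
	fmt.Println(len(history), history[0])
}
```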
type MetricsListener ¶
type MetricsListener struct {
// contains filtered or unexported fields
}
MetricsListener collects performance and execution metrics
func NewMetricsListener ¶
func NewMetricsListener() *MetricsListener
NewMetricsListener creates a new metrics listener
func (*MetricsListener) GetNodeAverageDuration ¶
func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration
GetNodeAverageDuration returns the average duration for each node
func (*MetricsListener) GetNodeErrors ¶
func (ml *MetricsListener) GetNodeErrors() map[string]int
GetNodeErrors returns the number of errors for each node
func (*MetricsListener) GetNodeExecutions ¶
func (ml *MetricsListener) GetNodeExecutions() map[string]int
GetNodeExecutions returns the number of executions for each node
func (*MetricsListener) GetTotalExecutions ¶
func (ml *MetricsListener) GetTotalExecutions() int
GetTotalExecutions returns the total number of node executions
func (*MetricsListener) OnNodeEvent ¶
func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ any, _ error)
OnNodeEvent implements the NodeListener interface
func (*MetricsListener) PrintSummary ¶
func (ml *MetricsListener) PrintSummary(writer io.Writer)
PrintSummary prints a summary of collected metrics
func (*MetricsListener) Reset ¶
func (ml *MetricsListener) Reset()
Reset clears all collected metrics
type NoOpCallbackHandler ¶
type NoOpCallbackHandler struct{}
NoOpCallbackHandler provides a no-op implementation of CallbackHandler
func (*NoOpCallbackHandler) OnChainEnd ¶
func (*NoOpCallbackHandler) OnChainError ¶
func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnChainStart ¶
func (*NoOpCallbackHandler) OnLLMEnd ¶
func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response any, runID string)
func (*NoOpCallbackHandler) OnLLMError ¶
func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnLLMStart ¶
func (*NoOpCallbackHandler) OnRetrieverEnd ¶
func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []any, runID string)
func (*NoOpCallbackHandler) OnRetrieverError ¶
func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnRetrieverStart ¶
func (*NoOpCallbackHandler) OnToolEnd ¶
func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)
func (*NoOpCallbackHandler) OnToolError ¶
func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)
type Node ¶
type Node struct {
// Name is the unique identifier for the node.
Name string
// Description describes the functionality of the node.
Description string
// Function is the function associated with the node.
// It takes a context and any state as input and returns the updated state and an error.
Function func(ctx context.Context, state any) (any, error)
}
Node represents a node in the graph.
type NodeEvent ¶
type NodeEvent string
NodeEvent represents different types of node events
const (
	// NodeEventStart indicates a node has started execution
	NodeEventStart NodeEvent = "start"
	// NodeEventProgress indicates progress during node execution
	NodeEventProgress NodeEvent = "progress"
	// NodeEventComplete indicates a node has completed successfully
	NodeEventComplete NodeEvent = "complete"
	// NodeEventError indicates a node encountered an error
	NodeEventError NodeEvent = "error"
	// EventChainStart indicates the graph execution has started
	EventChainStart NodeEvent = "chain_start"
	// EventChainEnd indicates the graph execution has completed
	EventChainEnd NodeEvent = "chain_end"
	// EventToolStart indicates a tool execution has started
	EventToolStart NodeEvent = "tool_start"
	// EventToolEnd indicates a tool execution has completed
	EventToolEnd NodeEvent = "tool_end"
	// EventLLMStart indicates an LLM call has started
	EventLLMStart NodeEvent = "llm_start"
	// EventLLMEnd indicates an LLM call has completed
	EventLLMEnd NodeEvent = "llm_end"
	// EventToken indicates a generated token (for streaming)
	EventToken NodeEvent = "token"
	// EventCustom indicates a custom user-defined event
	EventCustom NodeEvent = "custom"
)
type NodeInterrupt ¶
type NodeInterrupt struct {
// Node is the name of the node that triggered the interrupt
Node string
// Value is the data/query provided by the interrupt
Value any
}
NodeInterrupt is returned when a node requests an interrupt (e.g. waiting for human input).
func (*NodeInterrupt) Error ¶
func (e *NodeInterrupt) Error() string
type NodeListener ¶
type NodeListener interface {
// OnNodeEvent is called when a node event occurs
OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state any, err error)
}
NodeListener defines the interface for node event listeners
type NodeListenerFunc ¶
type NodeListenerFunc func(ctx context.Context, event NodeEvent, nodeName string, state any, err error)
NodeListenerFunc is a function adapter for NodeListener
func (NodeListenerFunc) OnNodeEvent ¶
func (f NodeListenerFunc) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state any, err error)
OnNodeEvent implements the NodeListener interface
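The function adapter follows the same idea as `http.HandlerFunc`: a plain function or closure becomes a listener without declaring a struct. The sketch below redeclares the documented types locally so it runs on its own; the adapter body (calling the function itself) is the conventional form and is assumed here, and `notifyAll` and `record` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types.
type NodeEvent string

type NodeListener interface {
	OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state any, err error)
}

type NodeListenerFunc func(ctx context.Context, event NodeEvent, nodeName string, state any, err error)

// OnNodeEvent calls the underlying function, which lets any function with
// the right signature satisfy NodeListener.
func (f NodeListenerFunc) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state any, err error) {
	f(ctx, event, nodeName, state, err)
}

// notifyAll delivers one event to every listener.
func notifyAll(listeners []NodeListener, event NodeEvent, node string) {
	for _, l := range listeners {
		l.OnNodeEvent(context.Background(), event, node, nil, nil)
	}
}

// record registers a closure as a listener and returns what it observed.
func record() []string {
	var seen []string
	l := NodeListenerFunc(func(_ context.Context, event NodeEvent, node string, _ any, _ error) {
		seen = append(seen, node+":"+string(event))
	})
	notifyAll([]NodeListener{l}, "start", "process")
	notifyAll([]NodeListener{l}, "complete", "process")
	return seen
}

func main() {
	fmt.Println(record())
}
```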
type NodeListenerTyped ¶ added in v0.6.0
type NodeListenerTyped[S any] interface {
	// OnNodeEvent is called when a node event occurs
	OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
}
NodeListenerTyped defines the interface for typed node event listeners
type NodeListenerTypedFunc ¶ added in v0.6.0
type NodeListenerTypedFunc[S any] func(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
NodeListenerTypedFunc is a function adapter for NodeListenerTyped
func (NodeListenerTypedFunc[S]) OnNodeEvent ¶ added in v0.6.0
func (f NodeListenerTypedFunc[S]) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
OnNodeEvent implements the NodeListenerTyped interface
type NodeTyped ¶ added in v0.6.0
type NodeTyped[S any] struct {
	Name        string
	Description string
	Function    func(ctx context.Context, state S) (S, error)
}
NodeTyped represents a typed node in the graph.
type ParallelNode ¶
type ParallelNode struct {
// contains filtered or unexported fields
}
ParallelNode represents a set of nodes that can execute in parallel
func NewParallelNode ¶
func NewParallelNode(name string, nodes ...Node) *ParallelNode
NewParallelNode creates a new parallel node
type ProgressListener ¶
type ProgressListener struct {
// contains filtered or unexported fields
}
ProgressListener provides progress tracking with customizable output
func NewProgressListener ¶
func NewProgressListener() *ProgressListener
NewProgressListener creates a new progress listener
func NewProgressListenerWithWriter ¶
func NewProgressListenerWithWriter(writer io.Writer) *ProgressListener
NewProgressListenerWithWriter creates a progress listener with custom writer
func (*ProgressListener) OnNodeEvent ¶
func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state any, err error)
OnNodeEvent implements the NodeListener interface
func (*ProgressListener) SetNodeStep ¶
func (pl *ProgressListener) SetNodeStep(nodeName, step string)
SetNodeStep sets a custom message for a specific node
func (*ProgressListener) WithDetails ¶
func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener
WithDetails enables or disables detailed output
func (*ProgressListener) WithPrefix ¶
func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener
WithPrefix sets a custom prefix for progress messages
func (*ProgressListener) WithTiming ¶
func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener
WithTiming enables or disables timing information
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements rate limiting for nodes
func NewRateLimiter ¶
func NewRateLimiter(node Node, maxCalls int, window time.Duration) *RateLimiter
NewRateLimiter creates a new rate limiter
type RecursiveSubgraph ¶
type RecursiveSubgraph struct {
// contains filtered or unexported fields
}
RecursiveSubgraph allows a subgraph to call itself recursively
func NewRecursiveSubgraph ¶
func NewRecursiveSubgraph(
	name string,
	maxDepth int,
	condition func(any, int) bool,
) *RecursiveSubgraph
NewRecursiveSubgraph creates a new recursive subgraph
type Reducer ¶
Reducer defines how a state value should be updated. It takes the current value and the new value, and returns the merged value.
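The type definition is not shown in this listing, so the sketch below assumes a signature of `func(current, next any) (any, error)`. It illustrates how a per-key reducer differs from plain overwriting; `appendStrings` and `apply` are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// Reducer uses an assumed signature; the listing omits the definition.
type Reducer func(current, next any) (any, error)

// appendStrings appends the incoming []string to the current slice.
func appendStrings(current, next any) (any, error) {
	cur, _ := current.([]string) // a missing current value starts empty
	add, ok := next.([]string)
	if !ok {
		return nil, fmt.Errorf("appendStrings: want []string, got %T", next)
	}
	return append(append([]string(nil), cur...), add...), nil
}

// apply merges update into state: keys with a reducer are merged, all
// other keys are overwritten.
func apply(state map[string]any, reducers map[string]Reducer, update map[string]any) error {
	for key, val := range update {
		r, ok := reducers[key]
		if !ok {
			state[key] = val
			continue
		}
		merged, err := r(state[key], val)
		if err != nil {
			return err
		}
		state[key] = merged
	}
	return nil
}

func main() {
	state := map[string]any{"count": 1}
	reducers := map[string]Reducer{"log": appendStrings}
	_ = apply(state, reducers, map[string]any{"log": []string{"a"}, "count": 2})
	_ = apply(state, reducers, map[string]any{"log": []string{"b"}})
	fmt.Println(state)
}
```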
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
RetryableErrors func(error) bool // Determines if an error should trigger retry
}
RetryConfig configures retry behavior for nodes
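The delay fields suggest an exponential backoff schedule. As a sketch, assuming the delay before retry n (zero-based) is InitialDelay × BackoffFactor^n capped at MaxDelay, the schedule could be computed as follows. The formula and the `delayFor` helper are assumptions; the package may compute delays differently.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// RetryConfig mirrors the documented struct.
type RetryConfig struct {
	MaxAttempts     int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	RetryableErrors func(error) bool
}

// delayFor returns the assumed wait before the retry with the given
// zero-based index: InitialDelay * BackoffFactor^attempt, capped at MaxDelay.
func delayFor(cfg RetryConfig, attempt int) time.Duration {
	d := float64(cfg.InitialDelay) * math.Pow(cfg.BackoffFactor, float64(attempt))
	if cfg.MaxDelay > 0 && d > float64(cfg.MaxDelay) {
		return cfg.MaxDelay
	}
	return time.Duration(d)
}

func main() {
	cfg := RetryConfig{
		MaxAttempts:   5,
		InitialDelay:  100 * time.Millisecond,
		MaxDelay:      time.Second,
		BackoffFactor: 2,
	}
	for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
		fmt.Println(attempt, delayFor(cfg, attempt))
	}
}
```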
func DefaultRetryConfig ¶
func DefaultRetryConfig() *RetryConfig
DefaultRetryConfig returns a default retry configuration
type RetryNode ¶
type RetryNode struct {
// contains filtered or unexported fields
}
RetryNode wraps a node with retry logic
func NewRetryNode ¶
func NewRetryNode(node Node, config *RetryConfig) *RetryNode
NewRetryNode creates a new retry node
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int
BackoffStrategy BackoffStrategy
RetryableErrors []string
}
RetryPolicy defines how to handle node failures
type Runnable ¶
type Runnable = StateRunnable
Runnable is an alias for StateRunnable for backward compatibility. All Runnable functionality is now provided by StateRunnable.
type StateGraph ¶
type StateGraph struct {
// Schema defines the state structure and update logic
Schema StateSchema
// contains filtered or unexported fields
}
StateGraph represents a state-based graph similar to Python's LangGraph StateGraph
Example ¶
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/smallnest/langgraphgo/graph"
	"github.com/tmc/langchaingo/llms"
	"github.com/tmc/langchaingo/llms/openai"
)

func main() {
	// Skip if no OpenAI API key is available
	if os.Getenv("OPENAI_API_KEY") == "" {
		fmt.Println("[{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]")
		return
	}
	model, err := openai.New()
	if err != nil {
		panic(err)
	}
	g := graph.NewStateGraph()
	g.AddNode("oracle", "oracle", func(ctx context.Context, state any) (any, error) {
		messages := state.([]llms.MessageContent)
		r, err := model.GenerateContent(ctx, messages, llms.WithTemperature(0.0))
		if err != nil {
			return nil, err
		}
		return append(messages,
			llms.TextParts("ai", r.Choices[0].Content),
		), nil
	})
	g.AddNode(graph.END, graph.END, func(_ context.Context, state any) (any, error) {
		return state, nil
	})
	g.AddEdge("oracle", graph.END)
	g.SetEntryPoint("oracle")
	runnable, err := g.Compile()
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	// Let's run it!
	res, err := runnable.Invoke(ctx, []llms.MessageContent{
		llms.TextParts("human", "What is 1 + 1?"),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(res)
}
Output:
func NewMessageGraph ¶
func NewMessageGraph() *StateGraph
NewMessageGraph creates a new instance of StateGraph with a default schema that handles "messages" using the AddMessages reducer. This is the recommended constructor for chat-based agents that use map[string]any as state with a "messages" key.
This replaces the old NewMessageGraphWithSchema() function.
func NewStateGraph ¶
func NewStateGraph() *StateGraph
NewStateGraph creates a new instance of StateGraph without a schema. For chat-based agents that need message handling, use NewMessageGraph() instead.
func (*StateGraph) AddConditionalEdge ¶
func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state any) string)
AddConditionalEdge adds a conditional edge where the target node is determined at runtime
func (*StateGraph) AddEdge ¶
func (g *StateGraph) AddEdge(from, to string)
AddEdge adds a new edge to the state graph between the "from" and "to" nodes
func (*StateGraph) AddMapReduceNode ¶ added in v0.6.0
func (g *StateGraph) AddMapReduceNode(
	name string,
	mapFunctions map[string]func(context.Context, any) (any, error),
	reducer func([]any) (any, error),
)
AddMapReduceNode adds a map-reduce pattern node
func (*StateGraph) AddNestedConditionalSubgraph ¶ added in v0.6.0
func (g *StateGraph) AddNestedConditionalSubgraph(
	name string,
	router func(any) string,
	subgraphs map[string]*StateGraph,
) error
AddNestedConditionalSubgraph creates a subgraph with its own conditional routing
func (*StateGraph) AddNode ¶
func (g *StateGraph) AddNode(name string, description string, fn func(ctx context.Context, state any) (any, error))
AddNode adds a new node to the state graph with the given name, description and function
func (*StateGraph) AddNodeWithCircuitBreaker ¶ added in v0.6.0
func (g *StateGraph) AddNodeWithCircuitBreaker(
	name string,
	description string,
	fn func(context.Context, any) (any, error),
	config CircuitBreakerConfig,
)
AddNodeWithCircuitBreaker adds a node with circuit breaker
func (*StateGraph) AddNodeWithRateLimit ¶ added in v0.6.0
func (g *StateGraph) AddNodeWithRateLimit(
	name string,
	description string,
	fn func(context.Context, any) (any, error),
	maxCalls int,
	window time.Duration,
)
AddNodeWithRateLimit adds a node with rate limiting
func (*StateGraph) AddNodeWithRetry ¶ added in v0.6.0
func (g *StateGraph) AddNodeWithRetry(
	name string,
	description string,
	fn func(context.Context, any) (any, error),
	config *RetryConfig,
)
AddNodeWithRetry adds a node with retry logic
func (*StateGraph) AddNodeWithTimeout ¶ added in v0.6.0
func (g *StateGraph) AddNodeWithTimeout(
	name string,
	description string,
	fn func(context.Context, any) (any, error),
	timeout time.Duration,
)
AddNodeWithTimeout adds a node with timeout
func (*StateGraph) AddParallelNodes ¶ added in v0.6.0
func (g *StateGraph) AddParallelNodes(groupName string, nodes map[string]func(context.Context, any) (any, error))
AddParallelNodes adds a set of nodes that execute in parallel
func (*StateGraph) AddRecursiveSubgraph ¶ added in v0.6.0
func (g *StateGraph) AddRecursiveSubgraph(
	name string,
	maxDepth int,
	condition func(any, int) bool,
	builder func(*StateGraph),
)
AddRecursiveSubgraph adds a recursive subgraph to the parent graph
func (*StateGraph) AddSubgraph ¶ added in v0.6.0
func (g *StateGraph) AddSubgraph(name string, subgraph *StateGraph) error
AddSubgraph adds a subgraph as a node in the parent graph
func (*StateGraph) Compile ¶
func (g *StateGraph) Compile() (*StateRunnable, error)
Compile compiles the state graph and returns a StateRunnable instance
func (*StateGraph) CreateSubgraph ¶ added in v0.6.0
func (g *StateGraph) CreateSubgraph(name string, builder func(*StateGraph)) error
CreateSubgraph creates and adds a subgraph using a builder function
func (*StateGraph) FanOutFanIn ¶ added in v0.6.0
func (g *StateGraph) FanOutFanIn(
source string,
_ []string,
collector string,
workerFuncs map[string]func(context.Context, any) (any, error),
collectFunc func([]any) (any, error),
)
FanOutFanIn creates a fan-out/fan-in pattern
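The signature pairs a map of worker functions with a collectFunc over []any. The sketch below shows the general fan-out/fan-in shape with plain goroutines. How the package orders results and handles worker errors is not documented here, so sorting by worker name and failing on the first error are assumptions, and fanOutFanIn is a hypothetical standalone function.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// nodeFunc mirrors the worker function signature in FanOutFanIn.
type nodeFunc func(context.Context, any) (any, error)

// fanOutFanIn runs every worker concurrently on the same input and passes
// the results to collect. Results are ordered by worker name so the output
// is deterministic.
func fanOutFanIn(ctx context.Context, input any, workers map[string]nodeFunc, collect func([]any) (any, error)) (any, error) {
	names := make([]string, 0, len(workers))
	for name := range workers {
		names = append(names, name)
	}
	sort.Strings(names)

	results := make([]any, len(names))
	errs := make([]error, len(names))
	var wg sync.WaitGroup
	for i, name := range names {
		wg.Add(1)
		go func(i int, fn nodeFunc) {
			defer wg.Done()
			results[i], errs[i] = fn(ctx, input) // each goroutine owns slot i
		}(i, workers[name])
	}
	wg.Wait()

	for i, err := range errs {
		if err != nil {
			return nil, fmt.Errorf("worker %q: %w", names[i], err)
		}
	}
	return collect(results)
}

// demo fans the input 3 out to two workers and sums their results.
func demo() (any, error) {
	workers := map[string]nodeFunc{
		"double": func(ctx context.Context, s any) (any, error) { return s.(int) * 2, nil },
		"square": func(ctx context.Context, s any) (any, error) { return s.(int) * s.(int), nil },
	}
	sum := func(parts []any) (any, error) {
		total := 0
		for _, p := range parts {
			total += p.(int)
		}
		return total, nil
	}
	return fanOutFanIn(context.Background(), 3, workers, sum)
}

func main() {
	fmt.Println(demo()) // prints 15 <nil>
}
```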
func (*StateGraph) SetEntryPoint ¶
func (g *StateGraph) SetEntryPoint(name string)
SetEntryPoint sets the entry point node name for the state graph
func (*StateGraph) SetRetryPolicy ¶
func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)
SetRetryPolicy sets the retry policy for the graph
func (*StateGraph) SetSchema ¶
func (g *StateGraph) SetSchema(schema StateSchema)
SetSchema sets the state schema for the graph
func (*StateGraph) SetStateMerger ¶
func (g *StateGraph) SetStateMerger(merger StateMerger)
SetStateMerger sets the state merger function for the state graph
type StateGraphTyped ¶ added in v0.6.0
type StateGraphTyped[S any] struct {
// Schema defines the state structure and update logic
Schema StateSchemaTyped[S]
// contains filtered or unexported fields
}
StateGraphTyped represents a generic state-based graph with compile-time type safety. The type parameter S represents the state type, which is typically a struct.
Example usage:
type MyState struct {
Count int
Name string
}
g := graph.NewStateGraphTyped[MyState]()
g.AddNode("increment", "Increment counter", func(ctx context.Context, state MyState) (MyState, error) {
state.Count++
return state, nil
})
func NewStateGraphTyped ¶ added in v0.6.0
func NewStateGraphTyped[S any]() *StateGraphTyped[S]
NewStateGraphTyped creates a new instance of StateGraphTyped with type safety. The type parameter S specifies the state type.
Example:
g := graph.NewStateGraphTyped[MyState]()
func (*StateGraphTyped[S]) AddConditionalEdge ¶ added in v0.6.0
func (g *StateGraphTyped[S]) AddConditionalEdge(from string, condition func(ctx context.Context, state S) string)
AddConditionalEdge adds a conditional edge where the target node is determined at runtime. The condition function is fully typed - no type assertions needed!
Example:
g.AddConditionalEdge("check", func(ctx context.Context, state MyState) string {
if state.Count > 10 { // Type-safe access!
return "high"
}
return "low"
})
func (*StateGraphTyped[S]) AddEdge ¶ added in v0.6.0
func (g *StateGraphTyped[S]) AddEdge(from, to string)
AddEdge adds a new edge to the state graph between the "from" and "to" nodes.
func (*StateGraphTyped[S]) AddNode ¶ added in v0.6.0
func (g *StateGraphTyped[S]) AddNode(name string, description string, fn func(ctx context.Context, state S) (S, error))
AddNode adds a new node to the state graph with the given name, description and function. The node function is fully typed - no type assertions needed!
Example:
g.AddNode("process", "Process data", func(ctx context.Context, state MyState) (MyState, error) {
state.Count++ // Type-safe access!
return state, nil
})
func (*StateGraphTyped[S]) Compile ¶ added in v0.6.0
func (g *StateGraphTyped[S]) Compile() (*StateRunnableTyped[S], error)
Compile compiles the state graph and returns a StateRunnableTyped instance.
func (*StateGraphTyped[S]) SetEntryPoint ¶ added in v0.6.0
func (g *StateGraphTyped[S]) SetEntryPoint(name string)
SetEntryPoint sets the entry point node name for the state graph.
func (*StateGraphTyped[S]) SetRetryPolicy ¶ added in v0.6.0
func (g *StateGraphTyped[S]) SetRetryPolicy(policy *RetryPolicy)
SetRetryPolicy sets the retry policy for the graph.
func (*StateGraphTyped[S]) SetSchema ¶ added in v0.6.0
func (g *StateGraphTyped[S]) SetSchema(schema StateSchemaTyped[S])
SetSchema sets the state schema for the graph.
func (*StateGraphTyped[S]) SetStateMerger ¶ added in v0.6.0
func (g *StateGraphTyped[S]) SetStateMerger(merger StateMergerTyped[S])
SetStateMerger sets the state merger function for the state graph.
type StateMerger ¶
StateMerger merges multiple state updates into a single state.
type StateMergerTyped ¶ added in v0.6.0
StateMergerTyped is a typed function to merge states from parallel execution.
type StateRunnable ¶
type StateRunnable struct {
// contains filtered or unexported fields
}
StateRunnable represents a compiled state graph that can be invoked.
func (*StateRunnable) InvokeWithConfig ¶
func (r *StateRunnable) InvokeWithConfig(ctx context.Context, initialState any, config *Config) (any, error)
InvokeWithConfig executes the compiled state graph with the given input state and config
func (*StateRunnable) SetTracer ¶ added in v0.6.0
func (r *StateRunnable) SetTracer(tracer *Tracer)
SetTracer sets a tracer for observability
func (*StateRunnable) WithTracer ¶ added in v0.6.0
func (r *StateRunnable) WithTracer(tracer *Tracer) *StateRunnable
WithTracer returns a new StateRunnable with the given tracer
type StateRunnableTyped ¶ added in v0.6.0
type StateRunnableTyped[S any] struct {
// contains filtered or unexported fields
}
StateRunnableTyped represents a compiled state graph that can be invoked with type safety.
func (*StateRunnableTyped[S]) Invoke ¶ added in v0.6.0
func (r *StateRunnableTyped[S]) Invoke(ctx context.Context, initialState S) (S, error)
Invoke executes the compiled state graph with the given input state. Returns the final state with full type safety - no type assertions needed!
Example:
initialState := MyState{Count: 0}
finalState, err := app.Invoke(ctx, initialState)
// finalState is MyState type - no casting needed!
func (*StateRunnableTyped[S]) InvokeWithConfig ¶ added in v0.6.0
func (r *StateRunnableTyped[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
InvokeWithConfig executes the compiled state graph with the given input state and config.
func (*StateRunnableTyped[S]) SetTracer ¶ added in v0.6.0
func (r *StateRunnableTyped[S]) SetTracer(tracer *Tracer)
SetTracer sets a tracer for observability.
func (*StateRunnableTyped[S]) WithTracer ¶ added in v0.6.0
func (r *StateRunnableTyped[S]) WithTracer(tracer *Tracer) *StateRunnableTyped[S]
WithTracer returns a new StateRunnableTyped with the given tracer.
type StateSchema ¶
type StateSchema interface {
// Init returns the initial state.
Init() any
// Update merges the new state into the current state.
Update(current, new any) (any, error)
}
StateSchema defines the structure and update logic for the graph state.
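As an illustration of the two methods, here is a minimal sketch of a schema for map[string]any states in which keys from the new state overwrite keys in the current one. The interface is copied from the definition above so the example compiles on its own; mapSchema is a hypothetical type, not one provided by the package.

```go
package main

import "fmt"

// StateSchema is copied from the interface definition above.
type StateSchema interface {
	Init() any
	Update(current, new any) (any, error)
}

// mapSchema is a hypothetical schema for map[string]any states.
type mapSchema struct{}

// Init returns an empty map as the initial state.
func (mapSchema) Init() any { return map[string]any{} }

// Update returns a fresh map holding the current keys overlaid with the
// keys of the new state. Neither input map is modified.
func (mapSchema) Update(current, new any) (any, error) {
	cur, ok := current.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("current state is %T, want map[string]any", current)
	}
	upd, ok := new.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("new state is %T, want map[string]any", new)
	}
	merged := make(map[string]any, len(cur)+len(upd))
	for k, v := range cur {
		merged[k] = v
	}
	for k, v := range upd {
		merged[k] = v
	}
	return merged, nil
}

// demo applies two updates to the initial state and returns the result.
func demo() (map[string]any, error) {
	var schema StateSchema = mapSchema{}
	state := schema.Init()
	state, err := schema.Update(state, map[string]any{"processed": true})
	if err != nil {
		return nil, err
	}
	state, err = schema.Update(state, map[string]any{"valid": true})
	if err != nil {
		return nil, err
	}
	return state.(map[string]any), nil
}

func main() {
	fmt.Println(demo())
}
```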
type StateSchemaTyped ¶ added in v0.6.0
type StateSchemaTyped[S any] interface {
// Init returns the initial state.
Init() S
// Update merges the new state into the current state.
Update(current, new S) (S, error)
}
StateSchemaTyped defines the structure and update logic for the graph state with type safety.
type StateSnapshot ¶
type StateSnapshot struct {
Values any
Next []string
Config Config
Metadata map[string]any
CreatedAt time.Time
ParentID string
}
StateSnapshot represents a snapshot of the graph state
type StreamConfig ¶
type StreamConfig struct {
// BufferSize is the size of the event channel buffer
BufferSize int
// EnableBackpressure determines if backpressure handling is enabled
EnableBackpressure bool
// MaxDroppedEvents is the maximum number of events to drop before logging
MaxDroppedEvents int
// Mode specifies what kind of events to stream
Mode StreamMode
}
StreamConfig configures streaming behavior
func DefaultStreamConfig ¶
func DefaultStreamConfig() StreamConfig
DefaultStreamConfig returns the default streaming configuration
type StreamEvent ¶
type StreamEvent struct {
// Timestamp when the event occurred
Timestamp time.Time
// NodeName is the name of the node that generated the event
NodeName string
// Event is the type of event
Event NodeEvent
// State is the current state at the time of the event
State any
// Error contains any error that occurred (if Event is NodeEventError)
Error error
// Metadata contains additional event-specific data
Metadata map[string]any
// Duration is how long the node took (only for Complete events)
Duration time.Duration
}
StreamEvent represents an event in the streaming execution
type StreamEventTyped ¶ added in v0.6.0
type StreamEventTyped[S any] struct {
// Timestamp when the event occurred
Timestamp time.Time
// NodeName is the name of the node that generated the event
NodeName string
// Event is the type of event
Event NodeEvent
// State is the current state at the time of the event (typed)
State S
// Error contains any error that occurred (if Event is NodeEventError)
Error error
// Metadata contains additional event-specific data
Metadata map[string]any
// Duration is how long the node took (only for Complete events)
Duration time.Duration
}
StreamEventTyped represents a typed event in the streaming execution
type StreamMode ¶
type StreamMode string
StreamMode defines the mode of streaming
const (
// StreamModeValues emits the full state after each step
StreamModeValues StreamMode = "values"
// StreamModeUpdates emits the updates (deltas) from each node
StreamModeUpdates StreamMode = "updates"
// StreamModeMessages emits LLM messages/tokens (if available)
StreamModeMessages StreamMode = "messages"
// StreamModeDebug emits all events (default)
StreamModeDebug StreamMode = "debug"
)
type StreamResult ¶
type StreamResult struct {
// Events channel receives StreamEvent objects in real-time
Events <-chan StreamEvent
// Result channel receives the final result when execution completes
Result <-chan any
// Errors channel receives any errors that occur during execution
Errors <-chan error
// Done channel is closed when streaming is complete
Done <-chan struct{}
// Cancel function can be called to stop streaming
Cancel context.CancelFunc
}
StreamResult contains the channels returned by streaming execution
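A consumer of these channels typically selects over all of them until Done is closed. The sketch below uses simplified stand-in types (event, streamResult) and a hypothetical producer (fakeStream) so that it compiles without the graph package. Whether the real channels are buffered or closed in this way is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// event and streamResult are simplified stand-ins for StreamEvent and
// StreamResult.
type event struct{ NodeName string }

type streamResult struct {
	Events <-chan event
	Result <-chan any
	Errors <-chan error
	Done   <-chan struct{}
	Cancel context.CancelFunc
}

// fakeStream is a hypothetical producer: it emits one event per node name,
// then the final result, then closes Done.
func fakeStream(ctx context.Context, nodes []string) *streamResult {
	ctx, cancel := context.WithCancel(ctx)
	events := make(chan event, len(nodes))
	result := make(chan any, 1)
	errs := make(chan error, 1)
	done := make(chan struct{})
	go func() {
		defer close(done)
		for _, n := range nodes {
			select {
			case events <- event{NodeName: n}:
			case <-ctx.Done():
				errs <- ctx.Err()
				return
			}
		}
		result <- len(nodes)
	}()
	return &streamResult{Events: events, Result: result, Errors: errs, Done: done, Cancel: cancel}
}

// consume reads from all channels until Done is closed, then drains
// whatever is still buffered before returning.
func consume(sr *streamResult) (seen []string, final any, err error) {
	defer sr.Cancel()
	for {
		select {
		case ev := <-sr.Events:
			seen = append(seen, ev.NodeName)
		case final = <-sr.Result:
		case err = <-sr.Errors:
		case <-sr.Done:
			for {
				select {
				case ev := <-sr.Events:
					seen = append(seen, ev.NodeName)
				case final = <-sr.Result:
				case err = <-sr.Errors:
				default:
					return seen, final, err
				}
			}
		}
	}
}

// demo streams two node events and returns what the consumer observed.
func demo() ([]string, any, error) {
	return consume(fakeStream(context.Background(), []string{"process", "validate"}))
}

func main() {
	fmt.Println(demo())
}
```

The drain step matters because select chooses at random among ready cases, so Done can be observed while events or the result are still sitting in their buffers.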
type StreamingExecutor ¶
type StreamingExecutor struct {
// contains filtered or unexported fields
}
StreamingExecutor provides a high-level interface for streaming execution
func NewStreamingExecutor ¶
func NewStreamingExecutor(runnable *StreamingRunnable) *StreamingExecutor
NewStreamingExecutor creates a new streaming executor
func (*StreamingExecutor) ExecuteAsync ¶
func (se *StreamingExecutor) ExecuteAsync(ctx context.Context, initialState any) *StreamResult
ExecuteAsync executes the graph asynchronously and returns immediately
func (*StreamingExecutor) ExecuteWithCallback ¶
func (se *StreamingExecutor) ExecuteWithCallback(
ctx context.Context,
initialState any,
eventCallback func(event StreamEvent),
resultCallback func(result any, err error),
) error
ExecuteWithCallback executes the graph and calls the callback for each event
type StreamingListener ¶
type StreamingListener struct {
// contains filtered or unexported fields
}
StreamingListener implements NodeListener for streaming events
func NewStreamingListener ¶
func NewStreamingListener(eventChan chan<- StreamEvent, config StreamConfig) *StreamingListener
NewStreamingListener creates a new streaming listener
func (*StreamingListener) Close ¶
func (sl *StreamingListener) Close()
Close marks the listener as closed to prevent sending to closed channels
func (*StreamingListener) GetDroppedEventsCount ¶
func (sl *StreamingListener) GetDroppedEventsCount() int
GetDroppedEventsCount returns the number of dropped events
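A dropped-event counter usually pairs with a non-blocking send on a buffered channel: when the buffer is full, the event is discarded and counted rather than stalling the graph. The sketch below illustrates that idea with a hypothetical dropCounter type. It is an assumption about the general technique, not a description of how StreamingListener is implemented.

```go
package main

import "fmt"

// dropCounter is a hypothetical stand-in showing how a listener could count
// events it cannot deliver.
type dropCounter struct {
	events  chan string
	dropped int
}

// send attempts a non-blocking send and counts the event as dropped when
// the channel buffer is full.
func (d *dropCounter) send(ev string) {
	select {
	case d.events <- ev:
	default:
		d.dropped++
	}
}

// demo sends five events into a buffer of two with no reader and returns
// how many were delivered and how many were dropped.
func demo() (delivered, dropped int) {
	d := &dropCounter{events: make(chan string, 2)} // plays the role of a small BufferSize
	for i := 0; i < 5; i++ {
		d.send(fmt.Sprintf("event-%d", i))
	}
	return len(d.events), d.dropped
}

func main() {
	fmt.Println(demo()) // prints 2 3
}
```

The sketch is single-goroutine, so the counter needs no locking; a listener called from parallel nodes would need a mutex or an atomic counter.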
func (*StreamingListener) OnChainEnd ¶
func (*StreamingListener) OnChainError ¶
func (sl *StreamingListener) OnChainError(ctx context.Context, err error, runID string)
func (*StreamingListener) OnChainStart ¶
func (*StreamingListener) OnGraphStep ¶
func (sl *StreamingListener) OnGraphStep(ctx context.Context, stepNode string, state any)
OnGraphStep implements GraphCallbackHandler
func (*StreamingListener) OnLLMEnd ¶
func (sl *StreamingListener) OnLLMEnd(ctx context.Context, response any, runID string)
func (*StreamingListener) OnLLMError ¶
func (sl *StreamingListener) OnLLMError(ctx context.Context, err error, runID string)
func (*StreamingListener) OnLLMStart ¶
func (*StreamingListener) OnNodeEvent ¶
func (sl *StreamingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state any, err error)
OnNodeEvent implements the NodeListener interface
func (*StreamingListener) OnRetrieverEnd ¶
func (sl *StreamingListener) OnRetrieverEnd(ctx context.Context, documents []any, runID string)
func (*StreamingListener) OnRetrieverError ¶
func (sl *StreamingListener) OnRetrieverError(ctx context.Context, err error, runID string)
func (*StreamingListener) OnRetrieverStart ¶
func (*StreamingListener) OnToolEnd ¶
func (sl *StreamingListener) OnToolEnd(ctx context.Context, output string, runID string)
func (*StreamingListener) OnToolError ¶
func (sl *StreamingListener) OnToolError(ctx context.Context, err error, runID string)
type StreamingListenerTyped ¶ added in v0.6.0
type StreamingListenerTyped[S any] struct {
// contains filtered or unexported fields
}
StreamingListenerTyped is a listener that streams node events
func (*StreamingListenerTyped[S]) OnNodeEvent ¶ added in v0.6.0
func (sl *StreamingListenerTyped[S]) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
OnNodeEvent implements the NodeListenerTyped interface
type StreamingRunnable ¶
type StreamingRunnable struct {
// contains filtered or unexported fields
}
StreamingRunnable wraps a ListenableRunnable with streaming capabilities
func NewStreamingRunnable ¶
func NewStreamingRunnable(runnable *ListenableRunnable, config StreamConfig) *StreamingRunnable
NewStreamingRunnable creates a new streaming runnable
func NewStreamingRunnableWithDefaults ¶
func NewStreamingRunnableWithDefaults(runnable *ListenableRunnable) *StreamingRunnable
NewStreamingRunnableWithDefaults creates a streaming runnable with default config
func (*StreamingRunnable) GetGraph ¶
func (sr *StreamingRunnable) GetGraph() *Exporter
GetGraph returns an Exporter for the streaming runnable
func (*StreamingRunnable) Stream ¶
func (sr *StreamingRunnable) Stream(ctx context.Context, initialState any) *StreamResult
Stream executes the graph with real-time event streaming
type StreamingStateGraph ¶ added in v0.6.0
type StreamingStateGraph struct {
*ListenableStateGraph
// contains filtered or unexported fields
}
StreamingStateGraph extends ListenableStateGraph with streaming capabilities
func NewStreamingStateGraph ¶ added in v0.6.0
func NewStreamingStateGraph() *StreamingStateGraph
NewStreamingStateGraph creates a new streaming state graph
func NewStreamingStateGraphWithConfig ¶ added in v0.6.0
func NewStreamingStateGraphWithConfig(config StreamConfig) *StreamingStateGraph
NewStreamingStateGraphWithConfig creates a streaming graph with custom config
func (*StreamingStateGraph) CompileStreaming ¶ added in v0.6.0
func (g *StreamingStateGraph) CompileStreaming() (*StreamingRunnable, error)
CompileStreaming compiles the graph into a streaming runnable
func (*StreamingStateGraph) GetStreamConfig ¶ added in v0.6.0
func (g *StreamingStateGraph) GetStreamConfig() StreamConfig
GetStreamConfig returns the current streaming configuration
func (*StreamingStateGraph) SetStreamConfig ¶ added in v0.6.0
func (g *StreamingStateGraph) SetStreamConfig(config StreamConfig)
SetStreamConfig updates the streaming configuration
type StructSchema ¶ added in v0.6.0
StructSchema implements StateSchemaTyped for struct-based states. It provides a simple and type-safe way to manage struct states.
Example:
type MyState struct {
Count int
Logs []string
}
schema := graph.NewStructSchema(
MyState{Count: 0},
func(current, new MyState) (MyState, error) {
// Merge logs (append)
current.Logs = append(current.Logs, new.Logs...)
// Add counts
current.Count += new.Count
return current, nil
},
)
func NewStructSchema ¶ added in v0.6.0
func NewStructSchema[S any](initial S, merge func(S, S) (S, error)) *StructSchema[S]
NewStructSchema creates a new StructSchema with the given initial value and merge function. If merge function is nil, a default merge function will be used that overwrites non-zero fields.
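The default merge is described only as overwriting non-zero fields. One plausible reading is that each non-zero field of the new state replaces the corresponding field of the current state. The reflection-based sketch below implements that reading as a hypothetical overwriteNonZero function. It is an assumption about the behavior, not the package's actual default.

```go
package main

import (
	"fmt"
	"reflect"
)

// MyState matches the example state used for StructSchema.
type MyState struct {
	Count int
	Logs  []string
}

// overwriteNonZero copies every non-zero exported field of update into
// current and returns the result. Non-struct states are replaced wholesale.
func overwriteNonZero[S any](current, update S) (S, error) {
	cur := reflect.ValueOf(&current).Elem()
	if cur.Kind() != reflect.Struct {
		return update, nil
	}
	upd := reflect.ValueOf(update)
	for i := 0; i < cur.NumField(); i++ {
		field := upd.Field(i)
		if cur.Field(i).CanSet() && !field.IsZero() {
			cur.Field(i).Set(field)
		}
	}
	return current, nil
}

// demo applies two partial updates: the first sets only Count, the second
// sets only Logs.
func demo() (MyState, error) {
	state := MyState{Count: 1, Logs: []string{"a"}}
	state, err := overwriteNonZero(state, MyState{Count: 5})
	if err != nil {
		return state, err
	}
	return overwriteNonZero(state, MyState{Logs: []string{"b"}})
}

func main() {
	fmt.Println(demo())
}
```

Under this reading a slice field is replaced rather than appended to, and a field can never be reset to its zero value, which is why a custom merge function like the one in the StructSchema example is often preferable.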
func (*StructSchema[S]) Init ¶ added in v0.6.0
func (s *StructSchema[S]) Init() S
Init returns the initial state.
func (*StructSchema[S]) Update ¶ added in v0.6.0
func (s *StructSchema[S]) Update(current, new S) (S, error)
Update merges the new state into the current state using the merge function.
type Subgraph ¶
type Subgraph struct {
// contains filtered or unexported fields
}
Subgraph represents a nested graph that can be used as a node
func NewSubgraph ¶
func NewSubgraph(name string, graph *StateGraph) (*Subgraph, error)
NewSubgraph creates a new subgraph
type TimeoutNode ¶
type TimeoutNode struct {
// contains filtered or unexported fields
}
TimeoutNode wraps a node with timeout logic
func NewTimeoutNode ¶
func NewTimeoutNode(node Node, timeout time.Duration) *TimeoutNode
NewTimeoutNode creates a new timeout node
type TraceEvent ¶
type TraceEvent string
TraceEvent represents different types of events in graph execution
const (
// TraceEventGraphStart indicates the start of graph execution
TraceEventGraphStart TraceEvent = "graph_start"
// TraceEventGraphEnd indicates the end of graph execution
TraceEventGraphEnd TraceEvent = "graph_end"
// TraceEventNodeStart indicates the start of node execution
TraceEventNodeStart TraceEvent = "node_start"
// TraceEventNodeEnd indicates the end of node execution
TraceEventNodeEnd TraceEvent = "node_end"
// TraceEventNodeError indicates an error occurred in node execution
TraceEventNodeError TraceEvent = "node_error"
// TraceEventEdgeTraversal indicates traversal from one node to another
TraceEventEdgeTraversal TraceEvent = "edge_traversal"
)
type TraceHook ¶
type TraceHook interface {
// OnEvent is called when a trace event occurs
OnEvent(ctx context.Context, span *TraceSpan)
}
TraceHook defines the interface for trace event handlers
type TraceHookFunc ¶
TraceHookFunc is a function adapter for TraceHook
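The definition of TraceHookFunc is not shown in this section. A function adapter of this kind usually follows the same pattern as http.HandlerFunc: a named function type with a method that calls itself. The sketch below demonstrates the pattern with simplified stand-ins (span, hook, hookFunc) rather than the package's own types.

```go
package main

import (
	"context"
	"fmt"
)

// span and hook are simplified stand-ins for TraceSpan and TraceHook.
type span struct {
	Event    string
	NodeName string
}

type hook interface {
	OnEvent(ctx context.Context, s *span)
}

// hookFunc adapts a plain function to the hook interface.
type hookFunc func(ctx context.Context, s *span)

// OnEvent calls the underlying function, which is what makes hookFunc
// satisfy hook.
func (f hookFunc) OnEvent(ctx context.Context, s *span) { f(ctx, s) }

// demo registers a closure as a hook and feeds it two spans.
func demo() []string {
	var seen []string
	var h hook = hookFunc(func(ctx context.Context, s *span) {
		seen = append(seen, s.Event+":"+s.NodeName)
	})
	ctx := context.Background()
	h.OnEvent(ctx, &span{Event: "node_start", NodeName: "process"})
	h.OnEvent(ctx, &span{Event: "node_end", NodeName: "process"})
	return seen
}

func main() {
	fmt.Println(demo()) // prints [node_start:process node_end:process]
}
```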
type TraceSpan ¶
type TraceSpan struct {
// ID is a unique identifier for this span
ID string
// ParentID is the ID of the parent span (empty for root spans)
ParentID string
// Event indicates the type of event this span represents
Event TraceEvent
// NodeName is the name of the node being executed (if applicable)
NodeName string
// FromNode is the source node for edge traversals
FromNode string
// ToNode is the destination node for edge traversals
ToNode string
// StartTime is when this span began
StartTime time.Time
// EndTime is when this span completed (zero for ongoing spans)
EndTime time.Time
// Duration is the total time taken (calculated when span ends)
Duration time.Duration
// State is a snapshot of the state at this point (optional)
State any
// Error contains any error that occurred during execution
Error error
// Metadata contains additional key-value pairs for observability
Metadata map[string]any
}
TraceSpan represents a span of execution with timing and metadata
func SpanFromContext ¶
SpanFromContext extracts a span from context
type TracedRunnable ¶
type TracedRunnable struct {
*Runnable
// contains filtered or unexported fields
}
TracedRunnable wraps a Runnable with tracing capabilities
func NewTracedRunnable ¶
func NewTracedRunnable(runnable *Runnable, tracer *Tracer) *TracedRunnable
NewTracedRunnable creates a new traced runnable
func (*TracedRunnable) GetTracer ¶
func (tr *TracedRunnable) GetTracer() *Tracer
GetTracer returns the tracer instance
type Tracer ¶
type Tracer struct {
// contains filtered or unexported fields
}
Tracer manages trace collection and hooks