Documentation
¶
Overview ¶
Package graph provides the core graph construction and execution engine for LangGraph Go.
This package implements the fundamental building blocks for creating stateful, multi-agent applications using directed graphs. It offers both untyped and typed interfaces for building workflows, with support for parallel execution, checkpointing, streaming, and comprehensive event handling.
Core Concepts ¶
StateGraph The primary component for building graphs is StateGraph, which maintains state as it flows through nodes. Each node can process and transform the state before passing it to the next node based on defined edges.
Nodes and Edges Nodes represent processing units (functions, agents, tools) that transform state. Edges define the flow between nodes, supporting conditional routing based on state content.
Typed Support For type safety, the package provides StateGraph[S] which uses Go generics to enforce state types at compile time, reducing runtime errors and improving code maintainability.
Key Features ¶
- Parallel node execution with coordination
- Checkpointing for durable execution with resume capability
- Streaming for real-time event monitoring
- Comprehensive listener system for observability
- Built-in retry mechanisms with configurable policies
- Subgraph composition for modular design
- Graph visualization (Mermaid, ASCII, DOT)
- Interrupt support for human-in-the-loop workflows
Example Usage ¶
Basic State Graph
g := graph.NewStateGraph()
// Add nodes
g.AddNode("process", "Process node", func(ctx context.Context, state any) (any, error) {
// Process the state
s := state.(map[string]any)
s["processed"] = true
return s, nil
})
g.AddNode("validate", "Validate node", func(ctx context.Context, state any) (any, error) {
// Validate the processed state
s := state.(map[string]any)
if s["processed"].(bool) {
s["valid"] = true
}
return s, nil
})
// Set entry point and edges
g.SetEntryPoint("process")
g.AddEdge("process", "validate")
g.AddEdge("validate", graph.END)
// Compile and run
runnable := g.Compile()
result, err := runnable.Invoke(context.Background(), map[string]any{
"data": "example",
})
Typed State Graph
type WorkflowState struct {
Input string `json:"input"`
Output string `json:"output"`
Complete bool `json:"complete"`
}
g := graph.NewStateGraph[WorkflowState]()
g.AddNode("process", "Process the input", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
state.Output = strings.ToUpper(state.Input)
state.Complete = true
return state, nil
})
// Add validate node
g.AddNode("validate", "Validate the output", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
return state, nil
})
g.AddNode("retry", "Retry processing", func(ctx context.Context, state WorkflowState) (WorkflowState, error) {
return state, nil
})
// Conditional routing
g.AddConditionalEdge("process", func(ctx context.Context, state WorkflowState) string {
if state.Complete {
return "validate"
}
return "retry"
})
g.AddEdge("validate", graph.END)
g.AddEdge("retry", "process")
Parallel Execution
// Add parallel nodes
g.AddParallelNodes("parallel_tasks", map[string]func(context.Context, any) (any, error){
"task1": func(ctx context.Context, state any) (any, error) {
// First task logic
return state, nil
},
"task2": func(ctx context.Context, state any) (any, error) {
// Second task logic
return state, nil
},
})
Checkpointing
// Note: Checkpointing is handled at the runnable level // See store package examples for checkpointing implementation runnable := g.Compile() // Execute with context result, err := runnable.Invoke(context.Background(), initialState)
Streaming
// Create listenable graph for streaming
g := graph.NewListenableStateGraph()
g.AddNode("process", "Process node", func(ctx context.Context, state map[string]any) (map[string]any, error) {
state["processed"] = true
return state, nil
})
g.SetEntryPoint("process")
g.AddEdge("process", graph.END)
// Compile to listenable runnable
runnable, _ := g.CompileListenable()
// Create streaming runnable
streaming := graph.NewStreamingRunnableWithDefaults(runnable)
// Stream execution
result := streaming.Stream(context.Background(), initialState)
// Process events
for event := range result.Events {
fmt.Printf("Event: %v\n", event)
}
Listener System ¶
The package provides a powerful listener system for monitoring and reacting to graph events:
- ProgressListener: Track execution progress
- LoggingListener: Structured logging of events
- MetricsListener: Collect performance metrics
- ChatListener: Chat-style output formatting
- Custom listeners: Implement NodeListener interface
Error Handling
- Built-in retry policies with exponential backoff
- Custom error filtering for selective retries
- Interrupt handling for pausing execution
- Comprehensive error context in events
Visualization ¶
Export graphs for documentation and debugging:
exporter := graph.NewExporter(g)
// Mermaid diagram
mermaid := exporter.DrawMermaid()
// Mermaid with options
mermaidWithOptions := exporter.DrawMermaidWithOptions(graph.MermaidOptions{
Direction: "LR", // Left to right
})
Thread Safety ¶
All graph structures are thread-safe for read operations. Write operations (adding nodes, edges, or listeners) should be performed before compilation or protected by external synchronization.
Best Practices
- Use typed graphs when possible for better type safety
- Set appropriate buffer sizes for streaming to balance memory and performance
- Implement proper error handling in node functions
- Use checkpoints for long-running or critical workflows
- Add listeners for debugging and monitoring
- Keep node functions pure and stateless when possible
- Use conditional edges for complex routing logic
- Leverage parallel execution for independent tasks
Index ¶
- Constants
- Variables
- func AddMessages(current, new any) (any, error)
- func AddNestedConditionalSubgraph[S, SubS any](g *StateGraph[S], name string, router func(S) string, ...) error
- func AddRecursiveSubgraph[S, SubS any](g *StateGraph[S], name string, maxDepth int, condition func(SubS, int) bool, ...) error
- func AddSubgraph[S, SubS any](g *StateGraph[S], name string, subgraph *StateGraph[SubS], ...) error
- func AppendReducer(current, new any) (any, error)
- func AppendSliceMerge(current, new reflect.Value) reflect.Value
- func ContextWithSpan(ctx context.Context, span *TraceSpan) context.Context
- func CreateSubgraph[S, SubS any](g *StateGraph[S], name string, builder func(*StateGraph[SubS]) error, ...) error
- func DefaultStructMerge[S any](current, new S) (S, error)
- func ExponentialBackoffRetry(ctx context.Context, fn func() (any, error), maxAttempts int, ...) (any, error)
- func GetResumeValue(ctx context.Context) any
- func Interrupt(ctx context.Context, value any) (any, error)
- func KeepCurrentMerge(current, new reflect.Value) reflect.Value
- func MaxIntMerge(current, new reflect.Value) reflect.Value
- func MinIntMerge(current, new reflect.Value) reflect.Value
- func NewFileCheckpointStore(path string) (store.CheckpointStore, error)
- func NewMemoryCheckpointStore() store.CheckpointStore
- func OverwriteMerge(current, new reflect.Value) reflect.Value
- func OverwriteReducer(current, new any) (any, error)
- func OverwriteStructMerge[S any](current, new S) (S, error)
- func SafeGo(wg *sync.WaitGroup, fn func(), onPanic func(any))
- func SumIntMerge(current, new reflect.Value) reflect.Value
- func WithConfig(ctx context.Context, config *Config) context.Context
- func WithResumeValue(ctx context.Context, value any) context.Context
- type BackoffStrategy
- type CallbackHandler
- type ChatListener
- type Checkpoint
- type CheckpointConfig
- type CheckpointListener
- func (cl *CheckpointListener[S]) OnChainEnd(context.Context, map[string]any, string)
- func (cl *CheckpointListener[S]) OnChainError(context.Context, error, string)
- func (cl *CheckpointListener[S]) OnChainStart(context.Context, map[string]any, map[string]any, string, *string, []string, ...)
- func (cl *CheckpointListener[S]) OnGraphStep(ctx context.Context, nodeName string, state any)
- func (cl *CheckpointListener[S]) OnLLMEnd(context.Context, any, string)
- func (cl *CheckpointListener[S]) OnLLMError(context.Context, error, string)
- func (cl *CheckpointListener[S]) OnLLMStart(context.Context, map[string]any, []string, string, *string, []string, ...)
- func (cl *CheckpointListener[S]) OnRetrieverEnd(context.Context, []any, string)
- func (cl *CheckpointListener[S]) OnRetrieverError(context.Context, error, string)
- func (cl *CheckpointListener[S]) OnRetrieverStart(context.Context, map[string]any, string, string, *string, []string, ...)
- func (cl *CheckpointListener[S]) OnToolEnd(context.Context, string, string)
- func (cl *CheckpointListener[S]) OnToolError(context.Context, error, string)
- func (cl *CheckpointListener[S]) OnToolStart(context.Context, map[string]any, string, string, *string, []string, ...)
- type CheckpointStore
- type CheckpointableRunnable
- func (cr *CheckpointableRunnable[S]) ClearCheckpoints(ctx context.Context) error
- func (cr *CheckpointableRunnable[S]) GetExecutionID() string
- func (cr *CheckpointableRunnable[S]) GetGraph() *ListenableStateGraph[S]
- func (cr *CheckpointableRunnable[S]) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)
- func (cr *CheckpointableRunnable[S]) GetTracer() *Tracer
- func (cr *CheckpointableRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)
- func (cr *CheckpointableRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
- func (cr *CheckpointableRunnable[S]) ListCheckpoints(ctx context.Context) ([]*store.Checkpoint, error)
- func (cr *CheckpointableRunnable[S]) LoadCheckpoint(ctx context.Context, checkpointID string) (*store.Checkpoint, error)
- func (cr *CheckpointableRunnable[S]) SaveCheckpoint(ctx context.Context, nodeName string, state S) error
- func (cr *CheckpointableRunnable[S]) SetExecutionID(executionID string)
- func (cr *CheckpointableRunnable[S]) SetTracer(tracer *Tracer)
- func (cr *CheckpointableRunnable[S]) Stream(ctx context.Context, initialState S) <-chan StreamEvent[S]
- func (cr *CheckpointableRunnable[S]) UpdateState(ctx context.Context, config *Config, asNode string, values S) (*Config, error)
- func (cr *CheckpointableRunnable[S]) WithTracer(tracer *Tracer) *CheckpointableRunnable[S]
- type CheckpointableStateGraph
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type Command
- type CompositeGraph
- type Config
- type Edge
- type Exporter
- type FieldMerger
- type GraphCallbackHandler
- type GraphInterrupt
- type ListenableNode
- func (ln *ListenableNode[S]) AddListener(listener NodeListener[S]) *ListenableNode[S]
- func (ln *ListenableNode[S]) AddListenerWithID(listener NodeListener[S]) string
- func (ln *ListenableNode[S]) Execute(ctx context.Context, state S) (S, error)
- func (ln *ListenableNode[S]) GetListenerIDs() []string
- func (ln *ListenableNode[S]) GetListeners() []NodeListener[S]
- func (ln *ListenableNode[S]) NotifyListeners(ctx context.Context, event NodeEvent, state S, err error)
- func (ln *ListenableNode[S]) RemoveListener(listenerID string)
- func (ln *ListenableNode[S]) RemoveListenerByFunc(listener NodeListener[S])
- type ListenableRunnable
- func (lr *ListenableRunnable[S]) GetGraph() *Exporter[S]
- func (lr *ListenableRunnable[S]) GetListenableGraph() *ListenableStateGraph[S]
- func (lr *ListenableRunnable[S]) GetTracer() *Tracer
- func (lr *ListenableRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)
- func (lr *ListenableRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
- func (lr *ListenableRunnable[S]) SetTracer(tracer *Tracer)
- func (lr *ListenableRunnable[S]) Stream(ctx context.Context, initialState S) <-chan StreamEvent[S]
- func (lr *ListenableRunnable[S]) WithTracer(tracer *Tracer) *ListenableRunnable[S]
- type ListenableRunnableMap
- type ListenableStateGraph
- func (g *ListenableStateGraph[S]) AddGlobalListener(listener NodeListener[S])
- func (g *ListenableStateGraph[S]) AddNode(name string, description string, ...) *ListenableNode[S]
- func (g *ListenableStateGraph[S]) CompileListenable() (*ListenableRunnable[S], error)
- func (g *ListenableStateGraph[S]) GetListenableNode(name string) *ListenableNode[S]
- func (g *ListenableStateGraph[S]) RemoveGlobalListener(listener NodeListener[S])
- func (g *ListenableStateGraph[S]) RemoveGlobalListenerByID(listenerID string)
- type ListenableStateGraphMap
- type LogLevel
- type LoggingListener
- type MapReduceNode
- type MapSchema
- type MermaidOptions
- type MessageWithID
- type MetricsListener
- func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration
- func (ml *MetricsListener) GetNodeErrors() map[string]int
- func (ml *MetricsListener) GetNodeExecutions() map[string]int
- func (ml *MetricsListener) GetTotalExecutions() int
- func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ map[string]any, _ error)
- func (ml *MetricsListener) PrintSummary(writer io.Writer)
- func (ml *MetricsListener) Reset()
- type NoOpCallbackHandler
- func (n *NoOpCallbackHandler) OnChainEnd(ctx context.Context, outputs map[string]any, runID string)
- func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, ...)
- func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response any, runID string)
- func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, ...)
- func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []any, runID string)
- func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, ...)
- func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)
- func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, ...)
- type NodeEvent
- type NodeInterrupt
- type NodeListener
- type NodeListenerFunc
- type ParallelNode
- type ProgressListener
- func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state map[string]any, ...)
- func (pl *ProgressListener) SetNodeStep(nodeName, step string)
- func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener
- func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener
- func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener
- type RateLimiter
- type RecursiveSubgraph
- type Reducer
- type RetryConfig
- type RetryNode
- type RetryPolicy
- type Runnable
- type StateGraph
- func (g *StateGraph[S]) AddConditionalEdge(from string, condition func(ctx context.Context, state S) string)
- func (g *StateGraph[S]) AddEdge(from, to string)
- func (g *StateGraph[S]) AddMapReduceNode(name string, mapFunctions map[string]func(context.Context, S) (S, error), ...)
- func (g *StateGraph[S]) AddNode(name string, description string, ...)
- func (g *StateGraph[S]) AddNodeWithCircuitBreaker(name string, description string, fn func(context.Context, S) (S, error), ...)
- func (g *StateGraph[S]) AddNodeWithRateLimit(name string, description string, fn func(context.Context, S) (S, error), ...)
- func (g *StateGraph[S]) AddNodeWithRetry(name string, description string, fn func(context.Context, S) (S, error), ...)
- func (g *StateGraph[S]) AddNodeWithTimeout(name string, description string, fn func(context.Context, S) (S, error), ...)
- func (g *StateGraph[S]) AddParallelNodes(groupName string, nodes map[string]func(context.Context, S) (S, error), ...)
- func (g *StateGraph[S]) Compile() (*StateRunnable[S], error)
- func (g *StateGraph[S]) FanOutFanIn(source string, _ []string, collector string, ...)
- func (g *StateGraph[S]) SetEntryPoint(name string)
- func (g *StateGraph[S]) SetRetryPolicy(policy *RetryPolicy)
- func (g *StateGraph[S]) SetSchema(schema StateSchema[S])
- func (g *StateGraph[S]) SetStateMerger(merger TypedStateMerger[S])
- type StateGraphMap
- type StateRunnable
- func (r *StateRunnable[S]) GetTracer() *Tracer
- func (r *StateRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)
- func (r *StateRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
- func (r *StateRunnable[S]) SetTracer(tracer *Tracer)
- func (r *StateRunnable[S]) WithTracer(tracer *Tracer) *StateRunnable[S]
- type StateSchema
- type StateSnapshot
- type StateTracedRunnable
- type StreamConfig
- type StreamEvent
- type StreamMode
- type StreamResult
- type StreamingExecutor
- type StreamingListener
- type StreamingRunnable
- func (sr *StreamingRunnable[S]) GetGraph() *Exporter[S]
- func (sr *StreamingRunnable[S]) GetTracer() *Tracer
- func (sr *StreamingRunnable[S]) SetTracer(tracer *Tracer)
- func (sr *StreamingRunnable[S]) Stream(ctx context.Context, initialState S) *StreamResult[S]
- func (sr *StreamingRunnable[S]) WithTracer(tracer *Tracer) *StreamingRunnable[S]
- type StreamingStateGraph
- type StructSchema
- type Subgraph
- type TimeoutNode
- type TraceEvent
- type TraceHook
- type TraceHookFunc
- type TraceSpan
- type Tracer
- func (t *Tracer) AddHook(hook TraceHook)
- func (t *Tracer) Clear()
- func (t *Tracer) EndSpan(ctx context.Context, span *TraceSpan, state any, err error)
- func (t *Tracer) GetSpans() map[string]*TraceSpan
- func (t *Tracer) StartSpan(ctx context.Context, event TraceEvent, nodeName string) *TraceSpan
- func (t *Tracer) TraceEdgeTraversal(ctx context.Context, fromNode, toNode string)
- type TypedNode
- type TypedStateMerger
Examples ¶
Constants ¶
const END = "END"
END is a special constant used to represent the end node in the graph.
Variables ¶
var ( // ErrEntryPointNotSet is returned when the entry point of the graph is not set. ErrEntryPointNotSet = errors.New("entry point not set") // ErrNodeNotFound is returned when a node is not found in the graph. ErrNodeNotFound = errors.New("node not found") // ErrNoOutgoingEdge is returned when no outgoing edge is found for a node. ErrNoOutgoingEdge = errors.New("no outgoing edge found for node") )
Functions ¶
func AddMessages ¶
AddMessages is a reducer designed for merging chat messages. It handles ID-based deduplication and upserts. If a new message has the same ID as an existing one, it replaces the existing one. Otherwise, it appends the new message.
func AddNestedConditionalSubgraph ¶ added in v0.8.0
func AddNestedConditionalSubgraph[S, SubS any]( g *StateGraph[S], name string, router func(S) string, subgraphs map[string]*StateGraph[SubS], converter func(S) SubS, resultConverter func(SubS) S, ) error
AddNestedConditionalSubgraph creates a subgraph with its own conditional routing
func AddRecursiveSubgraph ¶ added in v0.8.0
func AddRecursiveSubgraph[S, SubS any]( g *StateGraph[S], name string, maxDepth int, condition func(SubS, int) bool, builder func(*StateGraph[SubS]) error, converter func(S) SubS, resultConverter func(SubS) S, ) error
AddRecursiveSubgraph adds a recursive subgraph to the parent graph
func AddSubgraph ¶ added in v0.8.0
func AddSubgraph[S, SubS any](g *StateGraph[S], name string, subgraph *StateGraph[SubS], converter func(S) SubS, resultConverter func(SubS) S) error
AddSubgraph adds a subgraph as a node in the parent graph
func AppendReducer ¶
AppendReducer appends the new value to the current slice. It supports appending a slice to a slice, or a single element to a slice.
func AppendSliceMerge ¶ added in v0.6.0
AppendSliceMerge appends new slice to current slice.
func ContextWithSpan ¶
ContextWithSpan returns a new context with the span stored
func CreateSubgraph ¶ added in v0.8.0
func CreateSubgraph[S, SubS any](g *StateGraph[S], name string, builder func(*StateGraph[SubS]) error, converter func(S) SubS, resultConverter func(SubS) S) error
CreateSubgraph creates and adds a subgraph using a builder function
func DefaultStructMerge ¶ added in v0.6.0
DefaultStructMerge provides a default merge function for struct states. It uses reflection to merge non-zero fields from new into current. This is a sensible default for most struct types.
func ExponentialBackoffRetry ¶
func ExponentialBackoffRetry( ctx context.Context, fn func() (any, error), maxAttempts int, baseDelay time.Duration, ) (any, error)
ExponentialBackoffRetry implements exponential backoff with jitter
func GetResumeValue ¶
GetResumeValue retrieves the resume value from the context.
func Interrupt ¶
Interrupt pauses execution and waits for input. If resuming, it returns the value provided in the resume command.
func KeepCurrentMerge ¶ added in v0.6.0
KeepCurrentMerge always keeps the current value (ignores new).
func MaxIntMerge ¶ added in v0.6.0
MaxIntMerge takes the maximum of two integer values.
func MinIntMerge ¶ added in v0.6.0
MinIntMerge takes the minimum of two integer values.
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(path string) (store.CheckpointStore, error)
NewFileCheckpointStore creates a new file-based checkpoint store
func NewMemoryCheckpointStore ¶
func NewMemoryCheckpointStore() store.CheckpointStore
NewMemoryCheckpointStore creates a new in-memory checkpoint store
func OverwriteMerge ¶ added in v0.6.0
OverwriteMerge always uses the new value.
func OverwriteReducer ¶
OverwriteReducer replaces the old value with the new one.
func OverwriteStructMerge ¶ added in v0.6.0
OverwriteStructMerge is a merge function that completely replaces the current state with the new state.
func SafeGo ¶ added in v0.6.0
SafeGo runs a function in a goroutine with panic recovery. It uses a WaitGroup (if provided) and supports a custom panic handler.
func SumIntMerge ¶ added in v0.6.0
SumIntMerge adds two integer values.
func WithConfig ¶
WithConfig adds the config to the context
Types ¶
type BackoffStrategy ¶
type BackoffStrategy int
BackoffStrategy defines different backoff strategies
const ( FixedBackoff BackoffStrategy = iota ExponentialBackoff LinearBackoff )
type CallbackHandler ¶
type CallbackHandler interface {
// Chain callbacks (for graph/workflow execution)
OnChainStart(ctx context.Context, serialized map[string]any, inputs map[string]any, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnChainEnd(ctx context.Context, outputs map[string]any, runID string)
OnChainError(ctx context.Context, err error, runID string)
// LLM callbacks (for AI model calls)
OnLLMStart(ctx context.Context, serialized map[string]any, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnLLMEnd(ctx context.Context, response any, runID string)
OnLLMError(ctx context.Context, err error, runID string)
// Tool callbacks (for tool/function calls)
OnToolStart(ctx context.Context, serialized map[string]any, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnToolEnd(ctx context.Context, output string, runID string)
OnToolError(ctx context.Context, err error, runID string)
// Retriever callbacks (for data retrieval operations)
OnRetrieverStart(ctx context.Context, serialized map[string]any, query string, runID string, parentRunID *string, tags []string, metadata map[string]any)
OnRetrieverEnd(ctx context.Context, documents []any, runID string)
OnRetrieverError(ctx context.Context, err error, runID string)
}
CallbackHandler defines the interface for handling graph execution callbacks This matches Python's LangChain callback pattern
type ChatListener ¶
type ChatListener struct {
// contains filtered or unexported fields
}
ChatListener provides real-time chat-style updates
func NewChatListener ¶
func NewChatListener() *ChatListener
NewChatListener creates a new chat-style listener
func NewChatListenerWithWriter ¶
func NewChatListenerWithWriter(writer io.Writer) *ChatListener
NewChatListenerWithWriter creates a chat listener with custom writer
func (*ChatListener) OnNodeEvent ¶
func (cl *ChatListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ map[string]any, err error)
OnNodeEvent implements the NodeListener[map[string]any] interface
func (*ChatListener) SetNodeMessage ¶
func (cl *ChatListener) SetNodeMessage(nodeName, message string)
SetNodeMessage sets a custom message for a specific node
func (*ChatListener) WithTime ¶
func (cl *ChatListener) WithTime(enabled bool) *ChatListener
WithTime enables or disables timestamps
type CheckpointConfig ¶
type CheckpointConfig struct {
// Store is the checkpoint storage backend
Store store.CheckpointStore
// AutoSave enables automatic checkpointing after each node
AutoSave bool
// SaveInterval specifies how often to save (when AutoSave is false)
SaveInterval time.Duration
// MaxCheckpoints limits the number of checkpoints to keep
MaxCheckpoints int
}
CheckpointConfig configures checkpointing behavior
func DefaultCheckpointConfig ¶
func DefaultCheckpointConfig() CheckpointConfig
DefaultCheckpointConfig returns a default checkpoint configuration
type CheckpointListener ¶
type CheckpointListener[S any] struct { // contains filtered or unexported fields }
CheckpointListener automatically creates checkpoints during execution
func (*CheckpointListener[S]) OnChainEnd ¶ added in v0.8.0
func (*CheckpointListener[S]) OnChainError ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnChainError(context.Context, error, string)
func (*CheckpointListener[S]) OnChainStart ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnChainStart(context.Context, map[string]any, map[string]any, string, *string, []string, map[string]any)
Implement other methods of CallbackHandler as no-ops
func (*CheckpointListener[S]) OnGraphStep ¶
func (cl *CheckpointListener[S]) OnGraphStep(ctx context.Context, nodeName string, state any)
OnGraphStep is called after a step in the graph has completed and the state has been merged.
func (*CheckpointListener[S]) OnLLMEnd ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnLLMEnd(context.Context, any, string)
func (*CheckpointListener[S]) OnLLMError ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnLLMError(context.Context, error, string)
func (*CheckpointListener[S]) OnLLMStart ¶ added in v0.8.0
func (*CheckpointListener[S]) OnRetrieverEnd ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnRetrieverEnd(context.Context, []any, string)
func (*CheckpointListener[S]) OnRetrieverError ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnRetrieverError(context.Context, error, string)
func (*CheckpointListener[S]) OnRetrieverStart ¶ added in v0.8.0
func (*CheckpointListener[S]) OnToolEnd ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnToolEnd(context.Context, string, string)
func (*CheckpointListener[S]) OnToolError ¶ added in v0.8.0
func (cl *CheckpointListener[S]) OnToolError(context.Context, error, string)
type CheckpointStore ¶
type CheckpointStore = store.CheckpointStore
CheckpointStore is an alias for store.CheckpointStore
type CheckpointableRunnable ¶
type CheckpointableRunnable[S any] struct { // contains filtered or unexported fields }
CheckpointableRunnable[S] wraps a ListenableRunnable[S] with checkpointing capabilities
func NewCheckpointableRunnable ¶
func NewCheckpointableRunnable[S any](runnable *ListenableRunnable[S], config CheckpointConfig) *CheckpointableRunnable[S]
NewCheckpointableRunnable creates a new checkpointable runnable from a listenable runnable
func (*CheckpointableRunnable[S]) ClearCheckpoints ¶
func (cr *CheckpointableRunnable[S]) ClearCheckpoints(ctx context.Context) error
ClearCheckpoints removes all checkpoints for this execution
func (*CheckpointableRunnable[S]) GetExecutionID ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) GetExecutionID() string
GetExecutionID returns the current execution ID
func (*CheckpointableRunnable[S]) GetGraph ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) GetGraph() *ListenableStateGraph[S]
GetGraph returns the underlying graph
func (*CheckpointableRunnable[S]) GetState ¶
func (cr *CheckpointableRunnable[S]) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)
GetState retrieves the state for the given config
func (*CheckpointableRunnable[S]) GetTracer ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) GetTracer() *Tracer
GetTracer returns the tracer from the underlying runnable
func (*CheckpointableRunnable[S]) Invoke ¶
func (cr *CheckpointableRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)
Invoke executes the graph with checkpointing support
func (*CheckpointableRunnable[S]) InvokeWithConfig ¶
func (cr *CheckpointableRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
InvokeWithConfig executes the graph with checkpointing support and config
func (*CheckpointableRunnable[S]) ListCheckpoints ¶
func (cr *CheckpointableRunnable[S]) ListCheckpoints(ctx context.Context) ([]*store.Checkpoint, error)
ListCheckpoints lists all checkpoints for the current execution
func (*CheckpointableRunnable[S]) LoadCheckpoint ¶
func (cr *CheckpointableRunnable[S]) LoadCheckpoint(ctx context.Context, checkpointID string) (*store.Checkpoint, error)
LoadCheckpoint loads a specific checkpoint
func (*CheckpointableRunnable[S]) SaveCheckpoint ¶
func (cr *CheckpointableRunnable[S]) SaveCheckpoint(ctx context.Context, nodeName string, state S) error
SaveCheckpoint manually saves a checkpoint at the current state
func (*CheckpointableRunnable[S]) SetExecutionID ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) SetExecutionID(executionID string)
SetExecutionID sets a new execution ID
func (*CheckpointableRunnable[S]) SetTracer ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) SetTracer(tracer *Tracer)
SetTracer sets the tracer on the underlying runnable
func (*CheckpointableRunnable[S]) Stream ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) Stream(ctx context.Context, initialState S) <-chan StreamEvent[S]
Stream executes the graph with checkpointing and streaming support
func (*CheckpointableRunnable[S]) UpdateState ¶
func (cr *CheckpointableRunnable[S]) UpdateState(ctx context.Context, config *Config, asNode string, values S) (*Config, error)
UpdateState updates the state and saves a checkpoint.
func (*CheckpointableRunnable[S]) WithTracer ¶ added in v0.8.0
func (cr *CheckpointableRunnable[S]) WithTracer(tracer *Tracer) *CheckpointableRunnable[S]
WithTracer returns a new CheckpointableRunnable with the given tracer
type CheckpointableStateGraph ¶ added in v0.6.0
type CheckpointableStateGraph[S any] struct { *ListenableStateGraph[S] // contains filtered or unexported fields }
CheckpointableStateGraph[S any] extends ListenableStateGraph[S] with checkpointing
func NewCheckpointableStateGraph ¶ added in v0.6.0
func NewCheckpointableStateGraph[S any]() *CheckpointableStateGraph[S]
NewCheckpointableStateGraph creates a new checkpointable state graph with type parameter
func NewCheckpointableStateGraphWithConfig ¶ added in v0.6.0
func NewCheckpointableStateGraphWithConfig[S any](config CheckpointConfig) *CheckpointableStateGraph[S]
NewCheckpointableStateGraphWithConfig creates a checkpointable graph with custom config
func (*CheckpointableStateGraph[S]) CompileCheckpointable ¶ added in v0.6.0
func (g *CheckpointableStateGraph[S]) CompileCheckpointable() (*CheckpointableRunnable[S], error)
CompileCheckpointable compiles the graph into a checkpointable runnable
func (*CheckpointableStateGraph[S]) GetCheckpointConfig ¶ added in v0.6.0
func (g *CheckpointableStateGraph[S]) GetCheckpointConfig() CheckpointConfig
GetCheckpointConfig returns the current checkpointing configuration
func (*CheckpointableStateGraph[S]) SetCheckpointConfig ¶ added in v0.6.0
func (g *CheckpointableStateGraph[S]) SetCheckpointConfig(config CheckpointConfig)
SetCheckpointConfig updates the checkpointing configuration
type CircuitBreaker ¶
type CircuitBreaker[S any] struct { // contains filtered or unexported fields }
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶
func NewCircuitBreaker[S any](node TypedNode[S], config CircuitBreakerConfig) *CircuitBreaker[S]
NewCircuitBreaker creates a new circuit breaker
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
FailureThreshold int // Number of failures before opening
SuccessThreshold int // Number of successes before closing
Timeout time.Duration // Time before attempting to close
HalfOpenMaxCalls int // Max calls in half-open state
}
CircuitBreakerConfig configures circuit breaker behavior
type CircuitBreakerState ¶
type CircuitBreakerState int
CircuitBreakerState represents the state of a circuit breaker
const ( CircuitClosed CircuitBreakerState = iota CircuitOpen CircuitHalfOpen )
type Command ¶
type Command struct {
// Update is the value to update the state with.
// It will be processed by the schema's reducers.
Update any
// Goto specifies the next node(s) to execute.
// If set, it overrides the graph's edges.
// Can be a single string (node name) or []string.
Goto any
}
Command allows a node to dynamically update the state and control the flow. It can be returned by a node function instead of a direct state update.
type CompositeGraph ¶
type CompositeGraph[S any] struct { // contains filtered or unexported fields }
CompositeGraph allows composing multiple graphs together
func NewCompositeGraph ¶
func NewCompositeGraph[S any]() *CompositeGraph[S]
NewCompositeGraph creates a new composite graph
func (*CompositeGraph[S]) AddGraph ¶
func (cg *CompositeGraph[S]) AddGraph(name string, graph *StateGraph[S])
AddGraph adds a named graph to the composite
func (*CompositeGraph[S]) Compile ¶
func (cg *CompositeGraph[S]) Compile() (*StateRunnable[S], error)
Compile compiles the composite graph into a single runnable
type Config ¶
type Config struct {
// Callbacks to be invoked during execution
Callbacks []CallbackHandler `json:"callbacks"`
// Metadata to attach to the execution
Metadata map[string]any `json:"metadata"`
// Tags to categorize the execution
Tags []string `json:"tags"`
// Configurable parameters for the execution
Configurable map[string]any `json:"configurable"`
// RunName for this execution
RunName string `json:"run_name"`
// Timeout for the execution
Timeout *time.Duration `json:"timeout"`
// InterruptBefore nodes to stop before execution
InterruptBefore []string `json:"interrupt_before"`
// InterruptAfter nodes to stop after execution
InterruptAfter []string `json:"interrupt_after"`
// ResumeFrom nodes to start execution from (bypassing entry point)
ResumeFrom []string `json:"resume_from"`
// ResumeValue provides the value to return from an Interrupt() call when resuming
ResumeValue any `json:"resume_value"`
}
Config represents configuration for graph invocation This matches Python's config dict pattern
func WithInterruptAfter ¶ added in v0.8.2
WithInterruptAfter creates a Config with interrupt points set after specified nodes.
Example:
config := graph.WithInterruptAfter("node1", "node2")
result, err := runnable.Invoke(ctx, state, config)
func WithInterruptBefore ¶ added in v0.8.2
WithInterruptBefore creates a Config with interrupt points set before specified nodes.
Example:
config := graph.WithInterruptBefore("node1", "node2")
result, err := runnable.Invoke(ctx, state, config)
func WithThreadID ¶ added in v0.8.2
WithThreadID creates a Config with the given thread_id set in the configurable map. This is a convenience function for setting up checkpoint-based conversation resumption.
Example:
result, err := runnable.Invoke(ctx, state, graph.WithThreadID("conversation-1"))
type Edge ¶
type Edge struct {
// From is the name of the node from which the edge originates.
From string
// To is the name of the node to which the edge points.
To string
}
Edge represents an edge in the graph.
type Exporter ¶
type Exporter[S any] struct { // contains filtered or unexported fields }
Exporter provides methods to export graphs in different formats
func GetGraphForRunnable ¶ added in v0.8.0
GetGraphForRunnable returns a Exporter for the compiled graph's visualization
func NewExporter ¶
func NewExporter[S any](graph *StateGraph[S]) *Exporter[S]
NewExporter creates a new graph exporter for the given graph
func (*Exporter[S]) DrawMermaid ¶
DrawMermaid generates a Mermaid diagram representation of the graph
func (*Exporter[S]) DrawMermaidWithOptions ¶
func (ge *Exporter[S]) DrawMermaidWithOptions(opts MermaidOptions) string
DrawMermaidWithOptions generates a Mermaid diagram with custom options
type FieldMerger ¶ added in v0.6.0
type FieldMerger[S any] struct { InitialValue S FieldMergeFns map[string]func(currentVal, newVal reflect.Value) reflect.Value }
FieldMerger provides fine-grained control over how individual struct fields are merged.
func NewFieldMerger ¶ added in v0.6.0
func NewFieldMerger[S any](initial S) *FieldMerger[S]
NewFieldMerger creates a new FieldMerger with the given initial value.
func (*FieldMerger[S]) Init ¶ added in v0.6.0
func (fm *FieldMerger[S]) Init() S
Init returns the initial state.
func (*FieldMerger[S]) RegisterFieldMerge ¶ added in v0.6.0
func (fm *FieldMerger[S]) RegisterFieldMerge(fieldName string, mergeFn func(currentVal, newVal reflect.Value) reflect.Value)
RegisterFieldMerge registers a custom merge function for a specific field.
func (*FieldMerger[S]) Update ¶ added in v0.6.0
func (fm *FieldMerger[S]) Update(current, new S) (S, error)
Update merges the new state into the current state using registered field merge functions.
type GraphCallbackHandler ¶
type GraphCallbackHandler interface {
CallbackHandler
// OnGraphStep is called after a step (node execution + state update) is completed
OnGraphStep(ctx context.Context, stepNode string, state any)
}
GraphCallbackHandler extends CallbackHandler with graph-specific events
type GraphInterrupt ¶
type GraphInterrupt struct {
// Node that caused the interruption
Node string
// State at the time of interruption
State any
// NextNodes that would have been executed if not interrupted
NextNodes []string
// InterruptValue is the value provided by the dynamic interrupt (if any)
InterruptValue any
}
GraphInterrupt is returned when execution is interrupted by configuration or dynamic interrupt
func (*GraphInterrupt) Error ¶
func (e *GraphInterrupt) Error() string
type ListenableNode ¶
ListenableNode extends TypedNode with listener capabilities
func NewListenableNode ¶
func NewListenableNode[S any](node TypedNode[S]) *ListenableNode[S]
NewListenableNode creates a new listenable node from a regular typed node
func (*ListenableNode[S]) AddListener ¶
func (ln *ListenableNode[S]) AddListener(listener NodeListener[S]) *ListenableNode[S]
AddListener adds a listener to the node and returns the listenable node for chaining
func (*ListenableNode[S]) AddListenerWithID ¶ added in v0.8.0
func (ln *ListenableNode[S]) AddListenerWithID(listener NodeListener[S]) string
AddListenerWithID adds a listener to the node and returns its ID
func (*ListenableNode[S]) Execute ¶
func (ln *ListenableNode[S]) Execute(ctx context.Context, state S) (S, error)
Execute runs the node function with listener notifications
func (*ListenableNode[S]) GetListenerIDs ¶ added in v0.8.0
func (ln *ListenableNode[S]) GetListenerIDs() []string
GetListenerIDs returns a copy of the current listener IDs
func (*ListenableNode[S]) GetListeners ¶
func (ln *ListenableNode[S]) GetListeners() []NodeListener[S]
GetListeners returns a copy of the current listeners
func (*ListenableNode[S]) NotifyListeners ¶
func (ln *ListenableNode[S]) NotifyListeners(ctx context.Context, event NodeEvent, state S, err error)
NotifyListeners notifies all listeners of an event
func (*ListenableNode[S]) RemoveListener ¶
func (ln *ListenableNode[S]) RemoveListener(listenerID string)
RemoveListener removes a listener from the node by ID
func (*ListenableNode[S]) RemoveListenerByFunc ¶ added in v0.8.0
func (ln *ListenableNode[S]) RemoveListenerByFunc(listener NodeListener[S])
RemoveListenerByFunc removes a listener from the node by comparing listener values
type ListenableRunnable ¶
type ListenableRunnable[S any] struct { // contains filtered or unexported fields }
ListenableRunnable wraps a StateRunnable with listener capabilities
func (*ListenableRunnable[S]) GetGraph ¶
func (lr *ListenableRunnable[S]) GetGraph() *Exporter[S]
GetGraph returns an Exporter for visualization
func (*ListenableRunnable[S]) GetListenableGraph ¶ added in v0.8.0
func (lr *ListenableRunnable[S]) GetListenableGraph() *ListenableStateGraph[S]
GetListenableGraph returns the underlying ListenableStateGraph
func (*ListenableRunnable[S]) GetTracer ¶ added in v0.8.0
func (lr *ListenableRunnable[S]) GetTracer() *Tracer
GetTracer returns the tracer from the underlying runnable
func (*ListenableRunnable[S]) Invoke ¶
func (lr *ListenableRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)
Invoke executes the graph with listener notifications
func (*ListenableRunnable[S]) InvokeWithConfig ¶
func (lr *ListenableRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
InvokeWithConfig executes the graph with listener notifications and config
func (*ListenableRunnable[S]) SetTracer ¶ added in v0.8.0
func (lr *ListenableRunnable[S]) SetTracer(tracer *Tracer)
SetTracer sets a tracer for the underlying runnable
func (*ListenableRunnable[S]) Stream ¶ added in v0.8.0
func (lr *ListenableRunnable[S]) Stream(ctx context.Context, initialState S) <-chan StreamEvent[S]
Stream executes the graph with listener notifications and streams events
func (*ListenableRunnable[S]) WithTracer ¶ added in v0.8.0
func (lr *ListenableRunnable[S]) WithTracer(tracer *Tracer) *ListenableRunnable[S]
WithTracer returns a new ListenableRunnableWith the given tracer
type ListenableRunnableMap ¶ added in v0.8.0
type ListenableRunnableMap = ListenableRunnable[map[string]any]
ListenableRunnableMap is an alias for ListenableRunnable[map[string]any].
type ListenableStateGraph ¶
type ListenableStateGraph[S any] struct { *StateGraph[S] // contains filtered or unexported fields }
ListenableStateGraph extends StateGraph with listener capabilities
func NewListenableStateGraph ¶
func NewListenableStateGraph[S any]() *ListenableStateGraph[S]
NewListenableStateGraph creates a new typed state graph with listener support
func (*ListenableStateGraph[S]) AddGlobalListener ¶ added in v0.6.0
func (g *ListenableStateGraph[S]) AddGlobalListener(listener NodeListener[S])
AddGlobalListener adds a listener to all nodes in the graph
func (*ListenableStateGraph[S]) AddNode ¶ added in v0.6.0
func (g *ListenableStateGraph[S]) AddNode(name string, description string, fn func(ctx context.Context, state S) (S, error)) *ListenableNode[S]
AddNode adds a node with listener capabilities
func (*ListenableStateGraph[S]) CompileListenable ¶ added in v0.6.0
func (g *ListenableStateGraph[S]) CompileListenable() (*ListenableRunnable[S], error)
CompileListenable creates a runnable with listener support
func (*ListenableStateGraph[S]) GetListenableNode ¶ added in v0.6.0
func (g *ListenableStateGraph[S]) GetListenableNode(name string) *ListenableNode[S]
GetListenableNode returns the listenable node by name
func (*ListenableStateGraph[S]) RemoveGlobalListener ¶ added in v0.6.0
func (g *ListenableStateGraph[S]) RemoveGlobalListener(listener NodeListener[S])
RemoveGlobalListener removes a listener from all nodes in the graph by function reference
func (*ListenableStateGraph[S]) RemoveGlobalListenerByID ¶ added in v0.8.0
func (g *ListenableStateGraph[S]) RemoveGlobalListenerByID(listenerID string)
RemoveGlobalListenerByID removes a listener from all nodes in the graph by ID
type ListenableStateGraphMap ¶ added in v0.8.0
type ListenableStateGraphMap = ListenableStateGraph[map[string]any]
ListenableStateGraphMap is an alias for ListenableStateGraph[map[string]any].
type LoggingListener ¶
type LoggingListener struct {
// contains filtered or unexported fields
}
LoggingListener provides structured logging for node events
func NewLoggingListener ¶
func NewLoggingListener() *LoggingListener
NewLoggingListener creates a new logging listener
func NewLoggingListenerWithLogger ¶
func NewLoggingListenerWithLogger(logger *log.Logger) *LoggingListener
NewLoggingListenerWithLogger creates a logging listener with custom logger
func (*LoggingListener) OnNodeEvent ¶
func (ll *LoggingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state map[string]any, err error)
OnNodeEvent implements the NodeListener[map[string]any] interface
func (*LoggingListener) WithLogLevel ¶
func (ll *LoggingListener) WithLogLevel(level LogLevel) *LoggingListener
WithLogLevel sets the minimum log level
func (*LoggingListener) WithState ¶
func (ll *LoggingListener) WithState(enabled bool) *LoggingListener
WithState enables or disables state logging
type MapReduceNode ¶
type MapReduceNode[S any] struct { // contains filtered or unexported fields }
MapReduceNode executes nodes in parallel and reduces results
func NewMapReduceNode ¶
func NewMapReduceNode[S any](name string, reducer func([]S) (S, error), mapNodes ...TypedNode[S]) *MapReduceNode[S]
NewMapReduceNode creates a new map-reduce node
type MapSchema ¶
MapSchema implements StateSchema for map[string]any. It allows defining reducers for specific keys.
func (*MapSchema) RegisterReducer ¶
RegisterReducer adds a reducer for a specific key.
type MermaidOptions ¶
type MermaidOptions struct {
// Direction of the flowchart (e.g., "TD", "LR")
Direction string
}
MermaidOptions defines configuration for Mermaid diagram generation
type MessageWithID ¶
type MessageWithID interface {
GetID() string
GetContent() llms.MessageContent
}
MessageWithID is an interface that allows messages to have an ID for deduplication/upsert. Since langchaingo's MessageContent doesn't have an ID field, we can wrap it or use a custom struct. For now, we'll check if the message implements this interface or is a map with an "id" key.
type MetricsListener ¶
type MetricsListener struct {
// contains filtered or unexported fields
}
MetricsListener collects performance and execution metrics
func NewMetricsListener ¶
func NewMetricsListener() *MetricsListener
NewMetricsListener creates a new metrics listener
func (*MetricsListener) GetNodeAverageDuration ¶
func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration
GetNodeAverageDuration returns the average duration for each node
func (*MetricsListener) GetNodeErrors ¶
func (ml *MetricsListener) GetNodeErrors() map[string]int
GetNodeErrors returns the number of errors for each node
func (*MetricsListener) GetNodeExecutions ¶
func (ml *MetricsListener) GetNodeExecutions() map[string]int
GetNodeExecutions returns the number of executions for each node
func (*MetricsListener) GetTotalExecutions ¶
func (ml *MetricsListener) GetTotalExecutions() int
GetTotalExecutions returns the total number of node executions
func (*MetricsListener) OnNodeEvent ¶
func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ map[string]any, _ error)
OnNodeEvent implements the NodeListener[map[string]any] interface
func (*MetricsListener) PrintSummary ¶
func (ml *MetricsListener) PrintSummary(writer io.Writer)
PrintSummary prints a summary of collected metrics
func (*MetricsListener) Reset ¶
func (ml *MetricsListener) Reset()
Reset clears all collected metrics
type NoOpCallbackHandler ¶
type NoOpCallbackHandler struct{}
NoOpCallbackHandler provides a no-op implementation of CallbackHandler
func (*NoOpCallbackHandler) OnChainEnd ¶
func (*NoOpCallbackHandler) OnChainError ¶
func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnChainStart ¶
func (*NoOpCallbackHandler) OnLLMEnd ¶
func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response any, runID string)
func (*NoOpCallbackHandler) OnLLMError ¶
func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnLLMStart ¶
func (*NoOpCallbackHandler) OnRetrieverEnd ¶
func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []any, runID string)
func (*NoOpCallbackHandler) OnRetrieverError ¶
func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnRetrieverStart ¶
func (*NoOpCallbackHandler) OnToolEnd ¶
func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)
func (*NoOpCallbackHandler) OnToolError ¶
func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)
type NodeEvent ¶
type NodeEvent string
NodeEvent represents different types of node events
const ( // NodeEventStart indicates a node has started execution NodeEventStart NodeEvent = "start" // NodeEventProgress indicates progress during node execution NodeEventProgress NodeEvent = "progress" // NodeEventComplete indicates a node has completed successfully NodeEventComplete NodeEvent = "complete" // NodeEventError indicates a node encountered an error NodeEventError NodeEvent = "error" // EventChainStart indicates the graph execution has started EventChainStart NodeEvent = "chain_start" // EventChainEnd indicates the graph execution has completed EventChainEnd NodeEvent = "chain_end" // EventToolStart indicates a tool execution has started EventToolStart NodeEvent = "tool_start" // EventToolEnd indicates a tool execution has completed EventToolEnd NodeEvent = "tool_end" // EventLLMStart indicates an LLM call has started EventLLMStart NodeEvent = "llm_start" // EventLLMEnd indicates an LLM call has completed EventLLMEnd NodeEvent = "llm_end" // EventToken indicates a generated token (for streaming) EventToken NodeEvent = "token" // EventCustom indicates a custom user-defined event EventCustom NodeEvent = "custom" )
type NodeInterrupt ¶
type NodeInterrupt struct {
// Node is the name of the node that triggered the interrupt
Node string
// Value is the data/query provided by the interrupt
Value any
}
NodeInterrupt is returned when a node requests an interrupt (e.g. waiting for human input).
func (*NodeInterrupt) Error ¶
func (e *NodeInterrupt) Error() string
type NodeListener ¶
type NodeListener[S any] interface { // OnNodeEvent is called when a node event occurs OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error) }
NodeListener defines the interface for typed node event listeners
type NodeListenerFunc ¶
type NodeListenerFunc[S any] func(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
NodeListenerFunc is a function adapter for NodeListener
func (NodeListenerFunc[S]) OnNodeEvent ¶
func (f NodeListenerFunc[S]) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
OnNodeEvent implements the NodeListener interface
type ParallelNode ¶
type ParallelNode[S any] struct { // contains filtered or unexported fields }
ParallelNode represents a set of nodes that can execute in parallel
func NewParallelNode ¶
func NewParallelNode[S any](name string, nodes ...TypedNode[S]) *ParallelNode[S]
NewParallelNode creates a new parallel node
type ProgressListener ¶
type ProgressListener struct {
// contains filtered or unexported fields
}
ProgressListener provides progress tracking with customizable output
func NewProgressListener ¶
func NewProgressListener() *ProgressListener
NewProgressListener creates a new progress listener
func NewProgressListenerWithWriter ¶
func NewProgressListenerWithWriter(writer io.Writer) *ProgressListener
NewProgressListenerWithWriter creates a progress listener with custom writer
func (*ProgressListener) OnNodeEvent ¶
func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state map[string]any, err error)
OnNodeEvent implements the NodeListener[map[string]any] interface
func (*ProgressListener) SetNodeStep ¶
func (pl *ProgressListener) SetNodeStep(nodeName, step string)
SetNodeStep sets a custom message for a specific node
func (*ProgressListener) WithDetails ¶
func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener
WithDetails enables or disables detailed output
func (*ProgressListener) WithPrefix ¶
func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener
WithPrefix sets a custom prefix for progress messages
func (*ProgressListener) WithTiming ¶
func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener
WithTiming enables or disables timing information
type RateLimiter ¶
type RateLimiter[S any] struct { // contains filtered or unexported fields }
RateLimiter implements rate limiting for nodes
func NewRateLimiter ¶
NewRateLimiter creates a new rate limiter
type RecursiveSubgraph ¶
type RecursiveSubgraph[S any] struct { // contains filtered or unexported fields }
RecursiveSubgraph allows a subgraph to call itself recursively
func NewRecursiveSubgraph ¶
func NewRecursiveSubgraph[S any]( name string, maxDepth int, condition func(S, int) bool, ) *RecursiveSubgraph[S]
NewRecursiveSubgraph creates a new recursive subgraph
type Reducer ¶
Reducer defines how a state value should be updated. It takes the current value and the new value, and returns the merged value.
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
RetryableErrors func(error) bool // Determines if an error should trigger retry
}
RetryConfig configures retry behavior for nodes
func DefaultRetryConfig ¶
func DefaultRetryConfig() *RetryConfig
DefaultRetryConfig returns a default retry configuration
type RetryNode ¶
type RetryNode[S any] struct { // contains filtered or unexported fields }
RetryNode wraps a node with retry logic
func NewRetryNode ¶
func NewRetryNode[S any](node TypedNode[S], config *RetryConfig) *RetryNode[S]
NewRetryNode creates a new retry node
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int
BackoffStrategy BackoffStrategy
RetryableErrors []string
}
RetryPolicy defines how to handle node failures
type Runnable ¶
type Runnable = StateRunnable[map[string]any]
Runnable is an alias for StateRunnable[map[string]any] for convenience.
type StateGraph ¶
type StateGraph[S any] struct { // Schema defines the state structure and update logic Schema StateSchema[S] // contains filtered or unexported fields }
StateGraph represents a generic state-based graph with compile-time type safety. The type parameter S represents the state type, which is typically a struct.
Example usage:
type MyState struct {
Count int
Name string
}
g := graph.NewStateGraph[MyState]()
g.AddNode("increment", "Increment counter", func(ctx context.Context, state MyState) (MyState, error) {
state.Count++
return state, nil
})
Example ¶
package main
import (
"context"
"fmt"
"os"
"github.com/smallnest/langgraphgo/graph"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/openai"
)
func main() {
// Skip if no OpenAI API key is available
if os.Getenv("OPENAI_API_KEY") == "" {
fmt.Println("[{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]")
return
}
model, err := openai.New()
if err != nil {
panic(err)
}
g := graph.NewStateGraph[[]llms.MessageContent]()
g.AddNode("oracle", "oracle", func(ctx context.Context, state []llms.MessageContent) ([]llms.MessageContent, error) {
r, err := model.GenerateContent(ctx, state, llms.WithTemperature(0.0))
if err != nil {
return nil, err
}
return append(state,
llms.TextParts("ai", r.Choices[0].Content),
), nil
})
g.AddNode(graph.END, graph.END, func(_ context.Context, state []llms.MessageContent) ([]llms.MessageContent, error) {
return state, nil
})
g.AddEdge("oracle", graph.END)
g.SetEntryPoint("oracle")
runnable, err := g.Compile()
if err != nil {
panic(err)
}
ctx := context.Background()
// Let's run it!
res, err := runnable.Invoke(ctx, []llms.MessageContent{
llms.TextParts("human", "What is 1 + 1?"),
})
if err != nil {
panic(err)
}
fmt.Println(res)
}
func NewMessageGraph
deprecated
func NewMessageGraph() *StateGraph[map[string]any]
NewMessageGraph creates a new instance of StateGraph[map[string]any] with a default schema that handles "messages" using the AddMessages reducer. This is the recommended constructor for chat-based agents that use map[string]any as state with a "messages" key.
Deprecated: Use NewStateGraph[MessageState]() for type-safe state management.
func NewStateGraph ¶
func NewStateGraph[S any]() *StateGraph[S]
NewStateGraph creates a new instance of StateGraph with type safety. The type parameter S specifies the state type.
Example:
g := graph.NewStateGraph[MyState]()
func (*StateGraph[S]) AddConditionalEdge ¶
func (g *StateGraph[S]) AddConditionalEdge(from string, condition func(ctx context.Context, state S) string)
AddConditionalEdge adds a conditional edge where the target node is determined at runtime. The condition function is fully typed - no type assertions needed!
Example:
g.AddConditionalEdge("check", func(ctx context.Context, state MyState) string {
if state.Count > 10 { // Type-safe access!
return "high"
}
return "low"
})
func (*StateGraph[S]) AddEdge ¶
func (g *StateGraph[S]) AddEdge(from, to string)
AddEdge adds a new edge to the state graph between the "from" and "to" nodes.
func (*StateGraph[S]) AddMapReduceNode ¶ added in v0.6.0
func (g *StateGraph[S]) AddMapReduceNode( name string, mapFunctions map[string]func(context.Context, S) (S, error), reducer func([]S) (S, error), )
AddMapReduceNode adds a map-reduce pattern node
func (*StateGraph[S]) AddNode ¶
func (g *StateGraph[S]) AddNode(name string, description string, fn func(ctx context.Context, state S) (S, error))
AddNode adds a new node to the state graph with the given name, description and function. The node function is fully typed - no type assertions needed!
Example:
g.AddNode("process", "Process data", func(ctx context.Context, state MyState) (MyState, error) {
state.Count++ // Type-safe access!
return state, nil
})
func (*StateGraph[S]) AddNodeWithCircuitBreaker ¶ added in v0.6.0
func (g *StateGraph[S]) AddNodeWithCircuitBreaker( name string, description string, fn func(context.Context, S) (S, error), config CircuitBreakerConfig, )
AddNodeWithCircuitBreaker adds a node with circuit breaker
func (*StateGraph[S]) AddNodeWithRateLimit ¶ added in v0.6.0
func (g *StateGraph[S]) AddNodeWithRateLimit( name string, description string, fn func(context.Context, S) (S, error), maxCalls int, window time.Duration, )
AddNodeWithRateLimit adds a node with rate limiting
func (*StateGraph[S]) AddNodeWithRetry ¶ added in v0.6.0
func (g *StateGraph[S]) AddNodeWithRetry( name string, description string, fn func(context.Context, S) (S, error), config *RetryConfig, )
AddNodeWithRetry adds a node with retry logic
func (*StateGraph[S]) AddNodeWithTimeout ¶ added in v0.6.0
func (g *StateGraph[S]) AddNodeWithTimeout( name string, description string, fn func(context.Context, S) (S, error), timeout time.Duration, )
AddNodeWithTimeout adds a node with timeout
func (*StateGraph[S]) AddParallelNodes ¶ added in v0.6.0
func (g *StateGraph[S]) AddParallelNodes( groupName string, nodes map[string]func(context.Context, S) (S, error), merger func([]S) S, )
AddParallelNodes adds a set of nodes that execute in parallel. merger is used to combine the results from parallel execution into a single state S.
func (*StateGraph[S]) Compile ¶
func (g *StateGraph[S]) Compile() (*StateRunnable[S], error)
Compile compiles the state graph and returns a StateRunnable instance.
func (*StateGraph[S]) FanOutFanIn ¶ added in v0.6.0
func (g *StateGraph[S]) FanOutFanIn( source string, _ []string, collector string, workerFuncs map[string]func(context.Context, S) (S, error), aggregator func([]S) S, collectFunc func(S) (S, error), )
FanOutFanIn creates a fan-out/fan-in pattern. aggregator merges worker results into a state S that is passed to the collector.
func (*StateGraph[S]) SetEntryPoint ¶
func (g *StateGraph[S]) SetEntryPoint(name string)
SetEntryPoint sets the entry point node name for the state graph.
func (*StateGraph[S]) SetRetryPolicy ¶
func (g *StateGraph[S]) SetRetryPolicy(policy *RetryPolicy)
SetRetryPolicy sets the retry policy for the graph.
func (*StateGraph[S]) SetSchema ¶
func (g *StateGraph[S]) SetSchema(schema StateSchema[S])
SetSchema sets the state schema for the graph.
func (*StateGraph[S]) SetStateMerger ¶
func (g *StateGraph[S]) SetStateMerger(merger TypedStateMerger[S])
SetStateMerger sets the state merger function for the state graph.
type StateGraphMap ¶ added in v0.8.0
type StateGraphMap = StateGraph[map[string]any]
StateGraphMap is an alias for StateGraph[map[string]any] for convenience. Use NewStateGraph[map[string]any]() or NewStateGraph[S]() for other types.
type StateRunnable ¶
type StateRunnable[S any] struct { // contains filtered or unexported fields }
StateRunnable represents a compiled state graph that can be invoked with type safety.
func (*StateRunnable[S]) GetTracer ¶ added in v0.8.0
func (r *StateRunnable[S]) GetTracer() *Tracer
GetTracer returns the current tracer.
func (*StateRunnable[S]) Invoke ¶
func (r *StateRunnable[S]) Invoke(ctx context.Context, initialState S) (S, error)
Invoke executes the compiled state graph with the given input state. Returns the final state with full type safety - no type assertions needed!
Example:
initialState := MyState{Count: 0}
finalState, err := app.Invoke(ctx, initialState)
// finalState is MyState type - no casting needed!
func (*StateRunnable[S]) InvokeWithConfig ¶
func (r *StateRunnable[S]) InvokeWithConfig(ctx context.Context, initialState S, config *Config) (S, error)
InvokeWithConfig executes the compiled state graph with the given input state and config.
func (*StateRunnable[S]) SetTracer ¶ added in v0.6.0
func (r *StateRunnable[S]) SetTracer(tracer *Tracer)
SetTracer sets a tracer for observability.
func (*StateRunnable[S]) WithTracer ¶ added in v0.6.0
func (r *StateRunnable[S]) WithTracer(tracer *Tracer) *StateRunnable[S]
WithTracer returns a new StateRunnable with the given tracer.
type StateSchema ¶
type StateSchema[S any] interface { // Init returns the initial state. Init() S // Update merges the new state into the current state. Update(current, new S) (S, error) }
StateSchema defines the structure and update logic for the graph state with type safety.
type StateSnapshot ¶
type StateSnapshot struct {
Values any
Next []string
Config Config
Metadata map[string]any
CreatedAt time.Time
ParentID string
}
StateSnapshot represents a snapshot of the graph state
type StateTracedRunnable ¶ added in v0.8.0
type StateTracedRunnable[S any] struct { // contains filtered or unexported fields }
StateTracedRunnable[S] wraps a StateRunnable[S] with tracing capabilities
func NewStateTracedRunnable ¶ added in v0.8.0
func NewStateTracedRunnable[S any](runnable *StateRunnable[S], tracer *Tracer) *StateTracedRunnable[S]
NewStateTracedRunnable creates a new generic traced runnable
func (*StateTracedRunnable[S]) GetTracer ¶ added in v0.8.0
func (tr *StateTracedRunnable[S]) GetTracer() *Tracer
GetTracer returns the tracer instance
type StreamConfig ¶
type StreamConfig struct {
// BufferSize is the size of the event channel buffer
BufferSize int
// EnableBackpressure determines if backpressure handling is enabled
EnableBackpressure bool
// MaxDroppedEvents is the maximum number of events to drop before logging
MaxDroppedEvents int
// Mode specifies what kind of events to stream
Mode StreamMode
}
StreamConfig configures streaming behavior
func DefaultStreamConfig ¶
func DefaultStreamConfig() StreamConfig
DefaultStreamConfig returns the default streaming configuration
type StreamEvent ¶
type StreamEvent[S any] struct { // Timestamp when the event occurred Timestamp time.Time // NodeName is the name of the node that generated the event NodeName string // Event is the type of event Event NodeEvent // State is the current state at the time of the event (typed) State S // Error contains any error that occurred (if Event is NodeEventError) Error error // Metadata contains additional event-specific data Metadata map[string]any // Duration is how long the node took (only for Complete events) Duration time.Duration }
StreamEvent represents a typed event in the streaming execution
type StreamMode ¶
type StreamMode string
StreamMode defines the mode of streaming
const ( // StreamModeValues emits the full state after each step StreamModeValues StreamMode = "values" // StreamModeUpdates emits the updates (deltas) from each node StreamModeUpdates StreamMode = "updates" // StreamModeMessages emits LLM messages/tokens (if available) StreamModeMessages StreamMode = "messages" // StreamModeDebug emits all events (default) StreamModeDebug StreamMode = "debug" )
type StreamResult ¶
type StreamResult[S any] struct { // Events channel receives StreamEvent objects in real-time Events <-chan StreamEvent[S] // Result channel receives the final result when execution completes Result <-chan S // Errors channel receives any errors that occur during execution Errors <-chan error // Done channel is closed when streaming is complete Done <-chan struct{} // Cancel function can be called to stop streaming Cancel context.CancelFunc }
StreamResult contains the channels returned by streaming execution
type StreamingExecutor ¶
type StreamingExecutor[S any] struct { // contains filtered or unexported fields }
StreamingExecutor[S] provides a high-level interface for streaming execution
func NewStreamingExecutor ¶
func NewStreamingExecutor[S any](runnable *StreamingRunnable[S]) *StreamingExecutor[S]
NewStreamingExecutor creates a new streaming executor
func (*StreamingExecutor[S]) ExecuteAsync ¶
func (se *StreamingExecutor[S]) ExecuteAsync(ctx context.Context, initialState S) *StreamResult[S]
ExecuteAsync executes the graph asynchronously and returns immediately
func (*StreamingExecutor[S]) ExecuteWithCallback ¶
func (se *StreamingExecutor[S]) ExecuteWithCallback( ctx context.Context, initialState S, eventCallback func(event StreamEvent[S]), resultCallback func(result S, err error), ) error
ExecuteWithCallback executes the graph and calls the callback for each event
type StreamingListener ¶
type StreamingListener[S any] struct { // contains filtered or unexported fields }
StreamingListener implements NodeListener for streaming events
func NewStreamingListener ¶
func NewStreamingListener[S any](eventChan chan<- StreamEvent[S], config StreamConfig) *StreamingListener[S]
NewStreamingListener creates a new streaming listener
func (*StreamingListener[S]) Close ¶
func (sl *StreamingListener[S]) Close()
Close marks the listener as closed to prevent sending to closed channels
func (*StreamingListener[S]) GetDroppedEventsCount ¶
func (sl *StreamingListener[S]) GetDroppedEventsCount() int
GetDroppedEventsCount returns the number of dropped events
func (*StreamingListener[S]) OnNodeEvent ¶
func (sl *StreamingListener[S]) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state S, err error)
OnNodeEvent implements the NodeListener interface
type StreamingRunnable ¶
type StreamingRunnable[S any] struct { // contains filtered or unexported fields }
StreamingRunnable wraps a ListenableRunnable with streaming capabilities
func NewStreamingRunnable ¶
func NewStreamingRunnable[S any](runnable *ListenableRunnable[S], config StreamConfig) *StreamingRunnable[S]
NewStreamingRunnable creates a new streaming runnable
func NewStreamingRunnableWithDefaults ¶
func NewStreamingRunnableWithDefaults[S any](runnable *ListenableRunnable[S]) *StreamingRunnable[S]
NewStreamingRunnableWithDefaults creates a streaming runnable with default config
func (*StreamingRunnable[S]) GetGraph ¶
func (sr *StreamingRunnable[S]) GetGraph() *Exporter[S]
GetGraph returns a Exporter for the streaming runnable
func (*StreamingRunnable[S]) GetTracer ¶ added in v0.8.0
func (sr *StreamingRunnable[S]) GetTracer() *Tracer
GetTracer returns the tracer from the underlying runnable
func (*StreamingRunnable[S]) SetTracer ¶ added in v0.8.0
func (sr *StreamingRunnable[S]) SetTracer(tracer *Tracer)
SetTracer sets the tracer on the underlying runnable
func (*StreamingRunnable[S]) Stream ¶
func (sr *StreamingRunnable[S]) Stream(ctx context.Context, initialState S) *StreamResult[S]
Stream executes the graph with real-time event streaming
func (*StreamingRunnable[S]) WithTracer ¶ added in v0.8.0
func (sr *StreamingRunnable[S]) WithTracer(tracer *Tracer) *StreamingRunnable[S]
WithTracer returns a new StreamingRunnable with the given tracer
type StreamingStateGraph ¶ added in v0.6.0
type StreamingStateGraph[S any] struct { *ListenableStateGraph[S] // contains filtered or unexported fields }
StreamingStateGraph[S any] extends ListenableStateGraph[S] with streaming capabilities
func NewStreamingStateGraph ¶ added in v0.6.0
func NewStreamingStateGraph[S any]() *StreamingStateGraph[S]
NewStreamingStateGraph creates a new streaming state graph with type parameter
func NewStreamingStateGraphWithConfig ¶ added in v0.6.0
func NewStreamingStateGraphWithConfig[S any](config StreamConfig) *StreamingStateGraph[S]
NewStreamingStateGraphWithConfig creates a streaming graph with custom config
func (*StreamingStateGraph[S]) CompileStreaming ¶ added in v0.6.0
func (g *StreamingStateGraph[S]) CompileStreaming() (*StreamingRunnable[S], error)
CompileStreaming compiles the graph into a streaming runnable
func (*StreamingStateGraph[S]) GetStreamConfig ¶ added in v0.6.0
func (g *StreamingStateGraph[S]) GetStreamConfig() StreamConfig
GetStreamConfig returns the current streaming configuration
func (*StreamingStateGraph[S]) SetStreamConfig ¶ added in v0.6.0
func (g *StreamingStateGraph[S]) SetStreamConfig(config StreamConfig)
SetStreamConfig updates the streaming configuration
type StructSchema ¶ added in v0.6.0
StructSchema implements StateSchema for struct-based states. It provides a simple and type-safe way to manage struct states.
Example:
type MyState struct {
Count int
Logs []string
}
schema := graph.NewStructSchema(
MyState{Count: 0},
func(current, new MyState) (MyState, error) {
// Merge logs (append)
current.Logs = append(current.Logs, new.Logs...)
// Add counts
current.Count += new.Count
return current, nil
},
)
func NewStructSchema ¶ added in v0.6.0
func NewStructSchema[S any](initial S, merge func(S, S) (S, error)) *StructSchema[S]
NewStructSchema creates a new StructSchema with the given initial value and merge function. If merge function is nil, a default merge function will be used that overwrites non-zero fields.
func (*StructSchema[S]) Init ¶ added in v0.6.0
func (s *StructSchema[S]) Init() S
Init returns the initial state.
func (*StructSchema[S]) Update ¶ added in v0.6.0
func (s *StructSchema[S]) Update(current, new S) (S, error)
Update merges the new state into the current state using the merge function.
type Subgraph ¶
type Subgraph[S any] struct { // contains filtered or unexported fields }
Subgraph represents a nested graph that can be used as a node
func NewSubgraph ¶
func NewSubgraph[S any](name string, graph *StateGraph[S]) (*Subgraph[S], error)
NewSubgraph creates a new generic subgraph
type TimeoutNode ¶
type TimeoutNode[S any] struct { // contains filtered or unexported fields }
TimeoutNode wraps a node with timeout logic
func NewTimeoutNode ¶
func NewTimeoutNode[S any](node TypedNode[S], timeout time.Duration) *TimeoutNode[S]
NewTimeoutNode creates a new timeout node
type TraceEvent ¶
type TraceEvent string
TraceEvent represents different types of events in graph execution
const ( // TraceEventGraphStart indicates the start of graph execution TraceEventGraphStart TraceEvent = "graph_start" // TraceEventGraphEnd indicates the end of graph execution TraceEventGraphEnd TraceEvent = "graph_end" // TraceEventNodeStart indicates the start of node execution TraceEventNodeStart TraceEvent = "node_start" // TraceEventNodeEnd indicates the end of node execution TraceEventNodeEnd TraceEvent = "node_end" // TraceEventNodeError indicates an error occurred in node execution TraceEventNodeError TraceEvent = "node_error" // TraceEventEdgeTraversal indicates traversal from one node to another TraceEventEdgeTraversal TraceEvent = "edge_traversal" )
type TraceHook ¶
type TraceHook interface {
// OnEvent is called when a trace event occurs
OnEvent(ctx context.Context, span *TraceSpan)
}
TraceHook defines the interface for trace event handlers
type TraceHookFunc ¶
TraceHookFunc is a function adapter for TraceHook
type TraceSpan ¶
type TraceSpan struct {
// ID is a unique identifier for this span
ID string
// ParentID is the ID of the parent span (empty for root spans)
ParentID string
// Event indicates the type of event this span represents
Event TraceEvent
// NodeName is the name of the node being executed (if applicable)
NodeName string
// FromNode is the source node for edge traversals
FromNode string
// ToNode is the destination node for edge traversals
ToNode string
// StartTime is when this span began
StartTime time.Time
// EndTime is when this span completed (zero for ongoing spans)
EndTime time.Time
// Duration is the total time taken (calculated when span ends)
Duration time.Duration
// State is a snapshot of the state at this point (optional)
State any
// Error contains any error that occurred during execution
Error error
// Metadata contains additional key-value pairs for observability
Metadata map[string]any
}
TraceSpan represents a span of execution with timing and metadata
func SpanFromContext ¶
SpanFromContext extracts a span from context
type Tracer ¶
type Tracer struct {
// contains filtered or unexported fields
}
Tracer manages trace collection and hooks