Documentation
¶
Index ¶
- Constants
- Variables
- func AddMessages(current, new interface{}) (interface{}, error)
- func AppendReducer(current, new interface{}) (interface{}, error)
- func ContextWithSpan(ctx context.Context, span *TraceSpan) context.Context
- func ExponentialBackoffRetry(ctx context.Context, fn func() (interface{}, error), maxAttempts int, ...) (interface{}, error)
- func GetResumeValue(ctx context.Context) interface{}
- func Interrupt(ctx context.Context, value interface{}) (interface{}, error)
- func OverwriteReducer(current, new interface{}) (interface{}, error)
- func WithConfig(ctx context.Context, config *Config) context.Context
- func WithResumeValue(ctx context.Context, value interface{}) context.Context
- type BackoffStrategy
- type CallbackHandler
- type ChatListener
- type Checkpoint
- type CheckpointConfig
- type CheckpointListener
- type CheckpointStore
- type CheckpointableMessageGraph
- type CheckpointableRunnable
- func (cr *CheckpointableRunnable) ClearCheckpoints(ctx context.Context) error
- func (cr *CheckpointableRunnable) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)
- func (cr *CheckpointableRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
- func (cr *CheckpointableRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
- func (cr *CheckpointableRunnable) ListCheckpoints(ctx context.Context) ([]*Checkpoint, error)
- func (cr *CheckpointableRunnable) LoadCheckpoint(ctx context.Context, checkpointID string) (*Checkpoint, error)
- func (cr *CheckpointableRunnable) ResumeFromCheckpoint(ctx context.Context, checkpointID string) (interface{}, error)
- func (cr *CheckpointableRunnable) SaveCheckpoint(ctx context.Context, nodeName string, state interface{}) error
- func (cr *CheckpointableRunnable) UpdateState(ctx context.Context, config *Config, values interface{}, asNode string) (*Config, error)
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerState
- type CleaningStateSchema
- type Command
- type CompositeGraph
- type Config
- type Edge
- type Event
- type EventEmitter
- type EventListener
- type Exporter
- type FileCheckpointStore
- func (f *FileCheckpointStore) Clear(_ context.Context, executionID string) error
- func (f *FileCheckpointStore) Delete(_ context.Context, checkpointID string) error
- func (f *FileCheckpointStore) List(_ context.Context, _ string) ([]*Checkpoint, error)
- func (f *FileCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
- func (f *FileCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
- type GraphCallbackHandler
- type GraphInterrupt
- type ListenableMessageGraph
- func (g *ListenableMessageGraph) AddGlobalListener(listener NodeListener)
- func (g *ListenableMessageGraph) AddNode(name string, ...) *ListenableNode
- func (g *ListenableMessageGraph) CompileListenable() (*ListenableRunnable, error)
- func (g *ListenableMessageGraph) GetListenableNode(name string) *ListenableNode
- func (g *ListenableMessageGraph) RemoveGlobalListener(listener NodeListener)
- type ListenableNode
- func (ln *ListenableNode) AddListener(listener NodeListener) *ListenableNode
- func (ln *ListenableNode) Execute(ctx context.Context, state interface{}) (interface{}, error)
- func (ln *ListenableNode) GetListeners() []NodeListener
- func (ln *ListenableNode) NotifyListeners(ctx context.Context, event NodeEvent, state interface{}, err error)
- func (ln *ListenableNode) RemoveListener(listener NodeListener)
- type ListenableRunnable
- type ListenableStateGraph
- type LogLevel
- type LoggingListener
- type MapReduceNode
- type MapSchema
- func (s *MapSchema) Cleanup(state interface{}) interface{}
- func (s *MapSchema) Init() interface{}
- func (s *MapSchema) RegisterChannel(key string, reducer Reducer, isEphemeral bool)
- func (s *MapSchema) RegisterReducer(key string, reducer Reducer)
- func (s *MapSchema) Update(current, new interface{}) (interface{}, error)
- type MemoryCheckpointStore
- func (m *MemoryCheckpointStore) Clear(_ context.Context, executionID string) error
- func (m *MemoryCheckpointStore) Delete(_ context.Context, checkpointID string) error
- func (m *MemoryCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)
- func (m *MemoryCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
- func (m *MemoryCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
- type MermaidOptions
- type MessageGraph
- func (g *MessageGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)
- func (g *MessageGraph) AddEdge(from, to string)
- func (g *MessageGraph) AddMapReduceNode(name string, ...)
- func (g *MessageGraph) AddNestedConditionalSubgraph(name string, router func(interface{}) string, ...) error
- func (g *MessageGraph) AddNode(name string, ...)
- func (g *MessageGraph) AddNodeWithCircuitBreaker(name string, fn func(context.Context, interface{}) (interface{}, error), ...)
- func (g *MessageGraph) AddNodeWithRateLimit(name string, fn func(context.Context, interface{}) (interface{}, error), ...)
- func (g *MessageGraph) AddNodeWithRetry(name string, fn func(context.Context, interface{}) (interface{}, error), ...)
- func (g *MessageGraph) AddNodeWithTimeout(name string, fn func(context.Context, interface{}) (interface{}, error), ...)
- func (g *MessageGraph) AddParallelNodes(groupName string, ...)
- func (g *MessageGraph) AddRecursiveSubgraph(name string, maxDepth int, condition func(interface{}, int) bool, ...)
- func (g *MessageGraph) AddSubgraph(name string, subgraph *MessageGraph) error
- func (g *MessageGraph) Compile() (*Runnable, error)
- func (g *MessageGraph) CreateSubgraph(name string, builder func(*MessageGraph)) error
- func (g *MessageGraph) FanOutFanIn(source string, _ []string, collector string, ...)
- func (g *MessageGraph) SetEntryPoint(name string)
- func (g *MessageGraph) SetSchema(schema StateSchema)
- func (g *MessageGraph) SetStateMerger(merger StateMerger)
- type MessageWithID
- type MetricsListener
- func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration
- func (ml *MetricsListener) GetNodeErrors() map[string]int
- func (ml *MetricsListener) GetNodeExecutions() map[string]int
- func (ml *MetricsListener) GetTotalExecutions() int
- func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ interface{}, _ error)
- func (ml *MetricsListener) PrintSummary(writer io.Writer)
- func (ml *MetricsListener) Reset()
- type NoOpCallbackHandler
- func (n *NoOpCallbackHandler) OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)
- func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnChainStart(ctx context.Context, serialized map[string]interface{}, ...)
- func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response interface{}, runID string)
- func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, ...)
- func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)
- func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnRetrieverStart(ctx context.Context, serialized map[string]interface{}, query string, ...)
- func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)
- func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)
- func (n *NoOpCallbackHandler) OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, ...)
- type Node
- type NodeEvent
- type NodeInterrupt
- type NodeListener
- type NodeListenerFunc
- type ParallelNode
- type ProgressListener
- func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, ...)
- func (pl *ProgressListener) SetNodeStep(nodeName, step string)
- func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener
- func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener
- func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener
- type RateLimiter
- type RecursiveSubgraph
- type Reducer
- type RetryConfig
- type RetryNode
- type RetryPolicy
- type Runnable
- func (r *Runnable) GetGraph() *Exporter
- func (r *Runnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
- func (r *Runnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
- func (r *Runnable) SetTracer(tracer *Tracer)
- func (r *Runnable) WithTracer(tracer *Tracer) *Runnable
- type StateGraph
- func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)
- func (g *StateGraph) AddEdge(from, to string)
- func (g *StateGraph) AddNode(name string, ...)
- func (g *StateGraph) Compile() (*StateRunnable, error)
- func (g *StateGraph) SetEntryPoint(name string)
- func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)
- func (g *StateGraph) SetSchema(schema StateSchema)
- func (g *StateGraph) SetStateMerger(merger StateMerger)
- type StateMerger
- type StateRunnable
- type StateSchema
- type StateSnapshot
- type StreamConfig
- type StreamEvent
- type StreamMode
- type StreamResult
- type StreamingExecutor
- type StreamingListener
- func (sl *StreamingListener) Close()
- func (sl *StreamingListener) GetDroppedEventsCount() int
- func (sl *StreamingListener) OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)
- func (sl *StreamingListener) OnChainError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnChainStart(ctx context.Context, serialized map[string]interface{}, ...)
- func (sl *StreamingListener) OnGraphStep(ctx context.Context, stepNode string, state interface{})
- func (sl *StreamingListener) OnLLMEnd(ctx context.Context, response interface{}, runID string)
- func (sl *StreamingListener) OnLLMError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, ...)
- func (sl *StreamingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, ...)
- func (sl *StreamingListener) OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)
- func (sl *StreamingListener) OnRetrieverError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnRetrieverStart(ctx context.Context, serialized map[string]interface{}, query string, ...)
- func (sl *StreamingListener) OnToolEnd(ctx context.Context, output string, runID string)
- func (sl *StreamingListener) OnToolError(ctx context.Context, err error, runID string)
- func (sl *StreamingListener) OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, ...)
- type StreamingMessageGraph
- type StreamingRunnable
- type Subgraph
- type TimeoutNode
- type TraceEvent
- type TraceHook
- type TraceHookFunc
- type TraceSpan
- type TracedRunnable
- type Tracer
- func (t *Tracer) AddHook(hook TraceHook)
- func (t *Tracer) Clear()
- func (t *Tracer) EndSpan(ctx context.Context, span *TraceSpan, state interface{}, err error)
- func (t *Tracer) GetSpans() map[string]*TraceSpan
- func (t *Tracer) StartSpan(ctx context.Context, event TraceEvent, nodeName string) *TraceSpan
- func (t *Tracer) TraceEdgeTraversal(ctx context.Context, fromNode, toNode string)
Examples ¶
Constants ¶
const END = "END"
END is a special constant used to represent the end node in the graph.
Variables ¶
var (
// ErrEntryPointNotSet is returned when the entry point of the graph is not set.
ErrEntryPointNotSet = errors.New("entry point not set")
// ErrNodeNotFound is returned when a node is not found in the graph.
ErrNodeNotFound = errors.New("node not found")
// ErrNoOutgoingEdge is returned when no outgoing edge is found for a node.
ErrNoOutgoingEdge = errors.New("no outgoing edge found for node")
)
Functions ¶
func AddMessages ¶
func AddMessages(current, new interface{}) (interface{}, error)
AddMessages is a reducer designed for merging chat messages. It handles ID-based deduplication and upserts. If a new message has the same ID as an existing one, it replaces the existing one. Otherwise, it appends the new message.
func AppendReducer ¶
func AppendReducer(current, new interface{}) (interface{}, error)
AppendReducer appends the new value to the current slice. It supports appending a slice to a slice, or a single element to a slice.
func ContextWithSpan ¶
ContextWithSpan returns a new context with the span stored
func ExponentialBackoffRetry ¶
func ExponentialBackoffRetry( ctx context.Context, fn func() (interface{}, error), maxAttempts int, baseDelay time.Duration, ) (interface{}, error)
ExponentialBackoffRetry implements exponential backoff with jitter
func GetResumeValue ¶
GetResumeValue retrieves the resume value from the context.
func Interrupt ¶
Interrupt pauses execution and waits for input. If resuming, it returns the value provided in the resume command.
func OverwriteReducer ¶
func OverwriteReducer(current, new interface{}) (interface{}, error)
OverwriteReducer replaces the old value with the new one.
func WithConfig ¶
WithConfig adds the config to the context
Types ¶
type BackoffStrategy ¶
type BackoffStrategy int
BackoffStrategy defines different backoff strategies
const (
FixedBackoff BackoffStrategy = iota
ExponentialBackoff
LinearBackoff
)
type CallbackHandler ¶
type CallbackHandler interface {
// Chain callbacks (for graph/workflow execution)
OnChainStart(ctx context.Context, serialized map[string]interface{}, inputs map[string]interface{}, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)
OnChainError(ctx context.Context, err error, runID string)
// LLM callbacks (for AI model calls)
OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
OnLLMEnd(ctx context.Context, response interface{}, runID string)
OnLLMError(ctx context.Context, err error, runID string)
// Tool callbacks (for tool/function calls)
OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
OnToolEnd(ctx context.Context, output string, runID string)
OnToolError(ctx context.Context, err error, runID string)
// Retriever callbacks (for data retrieval operations)
OnRetrieverStart(ctx context.Context, serialized map[string]interface{}, query string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)
OnRetrieverError(ctx context.Context, err error, runID string)
}
CallbackHandler defines the interface for handling graph execution callbacks. This matches Python's LangChain callback pattern.
type ChatListener ¶
type ChatListener struct {
// contains filtered or unexported fields
}
ChatListener provides real-time chat-style updates
func NewChatListener ¶
func NewChatListener() *ChatListener
NewChatListener creates a new chat-style listener
func NewChatListenerWithWriter ¶
func NewChatListenerWithWriter(writer io.Writer) *ChatListener
NewChatListenerWithWriter creates a chat listener with custom writer
func (*ChatListener) OnNodeEvent ¶
func (cl *ChatListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ interface{}, err error)
OnNodeEvent implements the NodeListener interface
func (*ChatListener) SetNodeMessage ¶
func (cl *ChatListener) SetNodeMessage(nodeName, message string)
SetNodeMessage sets a custom message for a specific node
func (*ChatListener) WithTime ¶
func (cl *ChatListener) WithTime(enabled bool) *ChatListener
WithTime enables or disables timestamps
type Checkpoint ¶
type Checkpoint struct {
ID string `json:"id"`
NodeName string `json:"node_name"`
State interface{} `json:"state"`
Metadata map[string]interface{} `json:"metadata"`
Timestamp time.Time `json:"timestamp"`
Version int `json:"version"`
}
Checkpoint represents a saved state at a specific point in execution
type CheckpointConfig ¶
type CheckpointConfig struct {
// Store is the checkpoint storage backend
Store CheckpointStore
// AutoSave enables automatic checkpointing after each node
AutoSave bool
// SaveInterval specifies how often to save (when AutoSave is false)
SaveInterval time.Duration
// MaxCheckpoints limits the number of checkpoints to keep
MaxCheckpoints int
}
CheckpointConfig configures checkpointing behavior
func DefaultCheckpointConfig ¶
func DefaultCheckpointConfig() CheckpointConfig
DefaultCheckpointConfig returns a default checkpoint configuration
type CheckpointListener ¶
type CheckpointListener struct {
// Embed NoOpCallbackHandler to satisfy other CallbackHandler methods
NoOpCallbackHandler
// contains filtered or unexported fields
}
CheckpointListener automatically creates checkpoints during execution
func (*CheckpointListener) OnGraphStep ¶
func (cl *CheckpointListener) OnGraphStep(ctx context.Context, stepNode string, state interface{})
OnGraphStep implements GraphCallbackHandler
type CheckpointStore ¶
type CheckpointStore interface {
// Save stores a checkpoint
Save(ctx context.Context, checkpoint *Checkpoint) error
// Load retrieves a checkpoint by ID
Load(ctx context.Context, checkpointID string) (*Checkpoint, error)
// List returns all checkpoints for a given execution
List(ctx context.Context, executionID string) ([]*Checkpoint, error)
// Delete removes a checkpoint
Delete(ctx context.Context, checkpointID string) error
// Clear removes all checkpoints for an execution
Clear(ctx context.Context, executionID string) error
}
CheckpointStore defines the interface for checkpoint persistence
type CheckpointableMessageGraph ¶
type CheckpointableMessageGraph struct {
*ListenableMessageGraph
// contains filtered or unexported fields
}
CheckpointableMessageGraph extends ListenableMessageGraph with checkpointing
func NewCheckpointableMessageGraph ¶
func NewCheckpointableMessageGraph() *CheckpointableMessageGraph
NewCheckpointableMessageGraph creates a new checkpointable message graph
func NewCheckpointableMessageGraphWithConfig ¶
func NewCheckpointableMessageGraphWithConfig(config CheckpointConfig) *CheckpointableMessageGraph
NewCheckpointableMessageGraphWithConfig creates a checkpointable graph with custom config
func (*CheckpointableMessageGraph) CompileCheckpointable ¶
func (g *CheckpointableMessageGraph) CompileCheckpointable() (*CheckpointableRunnable, error)
CompileCheckpointable compiles the graph into a checkpointable runnable
func (*CheckpointableMessageGraph) GetCheckpointConfig ¶
func (g *CheckpointableMessageGraph) GetCheckpointConfig() CheckpointConfig
GetCheckpointConfig returns the current checkpointing configuration
func (*CheckpointableMessageGraph) SetCheckpointConfig ¶
func (g *CheckpointableMessageGraph) SetCheckpointConfig(config CheckpointConfig)
SetCheckpointConfig updates the checkpointing configuration
type CheckpointableRunnable ¶
type CheckpointableRunnable struct {
// contains filtered or unexported fields
}
CheckpointableRunnable wraps a runnable with checkpointing capabilities
func NewCheckpointableRunnable ¶
func NewCheckpointableRunnable(runnable *ListenableRunnable, config CheckpointConfig) *CheckpointableRunnable
NewCheckpointableRunnable creates a new checkpointable runnable
func (*CheckpointableRunnable) ClearCheckpoints ¶
func (cr *CheckpointableRunnable) ClearCheckpoints(ctx context.Context) error
ClearCheckpoints removes all checkpoints for this execution
func (*CheckpointableRunnable) GetState ¶
func (cr *CheckpointableRunnable) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)
GetState retrieves the state for the given config
func (*CheckpointableRunnable) Invoke ¶
func (cr *CheckpointableRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
Invoke executes the graph with checkpointing
func (*CheckpointableRunnable) InvokeWithConfig ¶
func (cr *CheckpointableRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
InvokeWithConfig executes the graph with checkpointing and config
func (*CheckpointableRunnable) ListCheckpoints ¶
func (cr *CheckpointableRunnable) ListCheckpoints(ctx context.Context) ([]*Checkpoint, error)
ListCheckpoints returns all checkpoints for this execution
func (*CheckpointableRunnable) LoadCheckpoint ¶
func (cr *CheckpointableRunnable) LoadCheckpoint(ctx context.Context, checkpointID string) (*Checkpoint, error)
LoadCheckpoint loads a specific checkpoint
func (*CheckpointableRunnable) ResumeFromCheckpoint ¶
func (cr *CheckpointableRunnable) ResumeFromCheckpoint(ctx context.Context, checkpointID string) (interface{}, error)
ResumeFromCheckpoint resumes execution from a specific checkpoint
func (*CheckpointableRunnable) SaveCheckpoint ¶
func (cr *CheckpointableRunnable) SaveCheckpoint(ctx context.Context, nodeName string, state interface{}) error
SaveCheckpoint manually saves a checkpoint
func (*CheckpointableRunnable) UpdateState ¶
func (cr *CheckpointableRunnable) UpdateState(ctx context.Context, config *Config, values interface{}, asNode string) (*Config, error)
UpdateState updates the state for the given config
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶
func NewCircuitBreaker(node Node, config CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
FailureThreshold int // Number of failures before opening
SuccessThreshold int // Number of successes before closing
Timeout time.Duration // Time before attempting to close
HalfOpenMaxCalls int // Max calls in half-open state
}
CircuitBreakerConfig configures circuit breaker behavior
type CircuitBreakerState ¶
type CircuitBreakerState int
CircuitBreakerState represents the state of a circuit breaker
const (
CircuitClosed CircuitBreakerState = iota
CircuitOpen
CircuitHalfOpen
)
type CleaningStateSchema ¶
type CleaningStateSchema interface {
StateSchema
// Cleanup performs any necessary cleanup on the state after a step.
Cleanup(state interface{}) interface{}
}
CleaningStateSchema extends StateSchema with cleanup capabilities. This allows implementing ephemeral channels (values that are cleared after each step).
type Command ¶
type Command struct {
// Update is the value to update the state with.
// It will be processed by the schema's reducers.
Update interface{}
// Goto specifies the next node(s) to execute.
// If set, it overrides the graph's edges.
// Can be a single string (node name) or []string.
Goto interface{}
}
Command allows a node to dynamically update the state and control the flow. It can be returned by a node function instead of a direct state update.
type CompositeGraph ¶
type CompositeGraph struct {
// contains filtered or unexported fields
}
CompositeGraph allows composing multiple graphs together
func NewCompositeGraph ¶
func NewCompositeGraph() *CompositeGraph
NewCompositeGraph creates a new composite graph
func (*CompositeGraph) AddGraph ¶
func (cg *CompositeGraph) AddGraph(name string, graph *MessageGraph)
AddGraph adds a named graph to the composite
func (*CompositeGraph) Compile ¶
func (cg *CompositeGraph) Compile() (*Runnable, error)
Compile compiles the composite graph into a single runnable
type Config ¶
type Config struct {
// Callbacks to be invoked during execution
Callbacks []CallbackHandler `json:"callbacks"`
// Metadata to attach to the execution
Metadata map[string]interface{} `json:"metadata"`
// Tags to categorize the execution
Tags []string `json:"tags"`
// Configurable parameters for the execution
Configurable map[string]interface{} `json:"configurable"`
// RunName for this execution
RunName string `json:"run_name"`
// Timeout for the execution
Timeout *time.Duration `json:"timeout"`
// InterruptBefore nodes to stop before execution
InterruptBefore []string `json:"interrupt_before"`
// InterruptAfter nodes to stop after execution
InterruptAfter []string `json:"interrupt_after"`
// ResumeFrom nodes to start execution from (bypassing entry point)
ResumeFrom []string `json:"resume_from"`
// ResumeValue provides the value to return from an Interrupt() call when resuming
ResumeValue interface{} `json:"resume_value"`
}
Config represents configuration for graph invocation. This matches Python's config dict pattern.
type Edge ¶
type Edge struct {
// From is the name of the node from which the edge originates.
From string
// To is the name of the node to which the edge points.
To string
}
Edge represents an edge in the message graph.
type Event ¶
type Event struct {
Type string `json:"type"`
NodeName string `json:"node_name,omitempty"`
Timestamp time.Time `json:"timestamp"`
Duration time.Duration `json:"duration,omitempty"`
Error error `json:"error,omitempty"`
State interface{} `json:"state,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
Event represents an event (matching listeners.go pattern)
type EventEmitter ¶
type EventEmitter struct {
// contains filtered or unexported fields
}
EventEmitter handles emitting events to listeners (from listeners.go integration)
func NewEventEmitter ¶
func NewEventEmitter() *EventEmitter
NewEventEmitter creates a new event emitter
func (*EventEmitter) AddListener ¶
func (e *EventEmitter) AddListener(listener EventListener)
AddListener adds an event listener
type EventListener ¶
EventListener defines the interface for event listeners (matching listeners.go)
type Exporter ¶
type Exporter struct {
// contains filtered or unexported fields
}
Exporter provides methods to export graphs in different formats
func NewExporter ¶
func NewExporter(graph *MessageGraph) *Exporter
NewExporter creates a new graph exporter for the given graph
func (*Exporter) DrawMermaid ¶
DrawMermaid generates a Mermaid diagram representation of the graph
func (*Exporter) DrawMermaidWithOptions ¶
func (ge *Exporter) DrawMermaidWithOptions(opts MermaidOptions) string
DrawMermaidWithOptions generates a Mermaid diagram with custom options
type FileCheckpointStore ¶
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore provides file-based checkpoint storage
func NewFileCheckpointStore ¶
func NewFileCheckpointStore(writer io.Writer, reader io.Reader) *FileCheckpointStore
NewFileCheckpointStore creates a new file-based checkpoint store
func (*FileCheckpointStore) Clear ¶
func (f *FileCheckpointStore) Clear(_ context.Context, executionID string) error
Clear implements CheckpointStore interface for file storage
func (*FileCheckpointStore) Delete ¶
func (f *FileCheckpointStore) Delete(_ context.Context, checkpointID string) error
Delete implements CheckpointStore interface for file storage
func (*FileCheckpointStore) List ¶
func (f *FileCheckpointStore) List(_ context.Context, _ string) ([]*Checkpoint, error)
List implements CheckpointStore interface for file storage
func (*FileCheckpointStore) Load ¶
func (f *FileCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
Load implements CheckpointStore interface for file storage
func (*FileCheckpointStore) Save ¶
func (f *FileCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
Save implements CheckpointStore interface for file storage
type GraphCallbackHandler ¶
type GraphCallbackHandler interface {
CallbackHandler
// OnGraphStep is called after a step (node execution + state update) is completed
OnGraphStep(ctx context.Context, stepNode string, state interface{})
}
GraphCallbackHandler extends CallbackHandler with graph-specific events
type GraphInterrupt ¶
type GraphInterrupt struct {
// Node that caused the interruption
Node string
// State at the time of interruption
State interface{}
// NextNodes that would have been executed if not interrupted
NextNodes []string
// InterruptValue is the value provided by the dynamic interrupt (if any)
InterruptValue interface{}
}
GraphInterrupt is returned when execution is interrupted by configuration or dynamic interrupt
func (*GraphInterrupt) Error ¶
func (e *GraphInterrupt) Error() string
type ListenableMessageGraph ¶
type ListenableMessageGraph struct {
*MessageGraph
// contains filtered or unexported fields
}
ListenableMessageGraph extends MessageGraph with listener capabilities
func NewListenableMessageGraph ¶
func NewListenableMessageGraph() *ListenableMessageGraph
NewListenableMessageGraph creates a new message graph with listener support
func (*ListenableMessageGraph) AddGlobalListener ¶
func (g *ListenableMessageGraph) AddGlobalListener(listener NodeListener)
AddGlobalListener adds a listener to all nodes in the graph
func (*ListenableMessageGraph) AddNode ¶
func (g *ListenableMessageGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error)) *ListenableNode
AddNode adds a node with listener capabilities
func (*ListenableMessageGraph) CompileListenable ¶
func (g *ListenableMessageGraph) CompileListenable() (*ListenableRunnable, error)
CompileListenable compiles the graph into a runnable with listener support
func (*ListenableMessageGraph) GetListenableNode ¶
func (g *ListenableMessageGraph) GetListenableNode(name string) *ListenableNode
GetListenableNode returns the listenable node by name
func (*ListenableMessageGraph) RemoveGlobalListener ¶
func (g *ListenableMessageGraph) RemoveGlobalListener(listener NodeListener)
RemoveGlobalListener removes a listener from all nodes in the graph
type ListenableNode ¶
type ListenableNode struct {
Node
// contains filtered or unexported fields
}
ListenableNode extends Node with listener capabilities
func NewListenableNode ¶
func NewListenableNode(node Node) *ListenableNode
NewListenableNode creates a new listenable node from a regular node
func (*ListenableNode) AddListener ¶
func (ln *ListenableNode) AddListener(listener NodeListener) *ListenableNode
AddListener adds a listener to the node
func (*ListenableNode) Execute ¶
func (ln *ListenableNode) Execute(ctx context.Context, state interface{}) (interface{}, error)
Execute runs the node function with listener notifications
func (*ListenableNode) GetListeners ¶
func (ln *ListenableNode) GetListeners() []NodeListener
GetListeners returns a copy of the current listeners
func (*ListenableNode) NotifyListeners ¶
func (ln *ListenableNode) NotifyListeners(ctx context.Context, event NodeEvent, state interface{}, err error)
NotifyListeners notifies all listeners of an event
func (*ListenableNode) RemoveListener ¶
func (ln *ListenableNode) RemoveListener(listener NodeListener)
RemoveListener removes a listener from the node
type ListenableRunnable ¶
type ListenableRunnable struct {
// contains filtered or unexported fields
}
ListenableRunnable wraps a Runnable with listener capabilities
func (*ListenableRunnable) GetGraph ¶
func (lr *ListenableRunnable) GetGraph() *Exporter
GetGraph returns an Exporter for visualization
func (*ListenableRunnable) Invoke ¶
func (lr *ListenableRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
Invoke executes the graph with listener notifications
func (*ListenableRunnable) InvokeWithConfig ¶
func (lr *ListenableRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
InvokeWithConfig executes the graph with listener notifications and config
type ListenableStateGraph ¶
type ListenableStateGraph struct {
*StateGraph
// contains filtered or unexported fields
}
ListenableStateGraph extends StateGraph with listener capabilities
func NewListenableStateGraph ¶
func NewListenableStateGraph() *ListenableStateGraph
NewListenableStateGraph creates a state graph with listener support
func (*ListenableStateGraph) AddListener ¶
func (g *ListenableStateGraph) AddListener(listener EventListener)
AddListener adds an event listener to the graph
type LoggingListener ¶
type LoggingListener struct {
// contains filtered or unexported fields
}
LoggingListener provides structured logging for node events
func NewLoggingListener ¶
func NewLoggingListener() *LoggingListener
NewLoggingListener creates a new logging listener
func NewLoggingListenerWithLogger ¶
func NewLoggingListenerWithLogger(logger *log.Logger) *LoggingListener
NewLoggingListenerWithLogger creates a logging listener with custom logger
func (*LoggingListener) OnNodeEvent ¶
func (ll *LoggingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, err error)
OnNodeEvent implements the NodeListener interface
func (*LoggingListener) WithLogLevel ¶
func (ll *LoggingListener) WithLogLevel(level LogLevel) *LoggingListener
WithLogLevel sets the minimum log level
func (*LoggingListener) WithState ¶
func (ll *LoggingListener) WithState(enabled bool) *LoggingListener
WithState enables or disables state logging
type MapReduceNode ¶
type MapReduceNode struct {
// contains filtered or unexported fields
}
MapReduceNode executes nodes in parallel and reduces results
func NewMapReduceNode ¶
func NewMapReduceNode(name string, reducer func([]interface{}) (interface{}, error), mapNodes ...Node) *MapReduceNode
NewMapReduceNode creates a new map-reduce node
type MapSchema ¶
MapSchema implements StateSchema for map[string]interface{}. It allows defining reducers for specific keys.
func (*MapSchema) Cleanup ¶
func (s *MapSchema) Cleanup(state interface{}) interface{}
Cleanup removes ephemeral keys from the state.
func (*MapSchema) RegisterChannel ¶
RegisterChannel adds a channel definition (reducer + ephemeral flag).
func (*MapSchema) RegisterReducer ¶
RegisterReducer adds a reducer for a specific key.
type MemoryCheckpointStore ¶
type MemoryCheckpointStore struct {
// contains filtered or unexported fields
}
MemoryCheckpointStore provides in-memory checkpoint storage
func NewMemoryCheckpointStore ¶
func NewMemoryCheckpointStore() *MemoryCheckpointStore
NewMemoryCheckpointStore creates a new in-memory checkpoint store
func (*MemoryCheckpointStore) Clear ¶
func (m *MemoryCheckpointStore) Clear(_ context.Context, executionID string) error
Clear implements CheckpointStore interface
func (*MemoryCheckpointStore) Delete ¶
func (m *MemoryCheckpointStore) Delete(_ context.Context, checkpointID string) error
Delete implements CheckpointStore interface
func (*MemoryCheckpointStore) List ¶
func (m *MemoryCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)
List implements CheckpointStore interface
func (*MemoryCheckpointStore) Load ¶
func (m *MemoryCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)
Load implements CheckpointStore interface
func (*MemoryCheckpointStore) Save ¶
func (m *MemoryCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error
Save implements CheckpointStore interface
type MermaidOptions ¶
type MermaidOptions struct {
// Direction of the flowchart (e.g., "TD", "LR")
Direction string
}
MermaidOptions defines configuration for Mermaid diagram generation
type MessageGraph ¶
type MessageGraph struct {
// Schema defines the state structure and update logic
Schema StateSchema
// contains filtered or unexported fields
}
MessageGraph represents a message graph.
Example ¶
package main
import (
"context"
"fmt"
"os"
"github.com/smallnest/langgraphgo/graph"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/openai"
)
// main builds a one-node message graph whose "oracle" node forwards the
// conversation to an OpenAI chat model, runs the graph once with a single
// human message, and prints the resulting message list.
func main() {
	// Skip if no OpenAI API key is available: emit the canned transcript
	// instead of calling the API.
	if os.Getenv("OPENAI_API_KEY") == "" {
		fmt.Println("[{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]")
		return
	}

	model, err := openai.New()
	if err != nil {
		panic(err)
	}

	g := graph.NewMessageGraph()

	// The oracle node appends the model's first choice to the conversation.
	g.AddNode("oracle", func(ctx context.Context, state interface{}) (interface{}, error) {
		// Use the comma-ok form so an unexpected state type is reported as an
		// error from the node rather than a panic.
		messages, ok := state.([]llms.MessageContent)
		if !ok {
			return nil, fmt.Errorf("oracle: unexpected state type %T", state)
		}

		r, err := model.GenerateContent(ctx, messages, llms.WithTemperature(0.0))
		if err != nil {
			return nil, err
		}
		// Guard the index below: an empty Choices slice would otherwise panic.
		if len(r.Choices) == 0 {
			return nil, fmt.Errorf("oracle: model returned no choices for %d message(s)", len(messages))
		}

		return append(messages,
			llms.TextParts("ai", r.Choices[0].Content),
		), nil
	})

	// Register a pass-through node under graph.END that returns the state as is.
	g.AddNode(graph.END, func(_ context.Context, state interface{}) (interface{}, error) {
		return state, nil
	})

	g.AddEdge("oracle", graph.END)
	g.SetEntryPoint("oracle")

	runnable, err := g.Compile()
	if err != nil {
		panic(err)
	}

	ctx := context.Background()

	// Let's run it!
	res, err := runnable.Invoke(ctx, []llms.MessageContent{
		llms.TextParts("human", "What is 1 + 1?"),
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(res)
}
Output: [{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]
func NewMessageGraph ¶
func NewMessageGraph() *MessageGraph
NewMessageGraph creates a new instance of MessageGraph.
func (*MessageGraph) AddConditionalEdge ¶
func (g *MessageGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)
AddConditionalEdge adds a conditional edge where the target node is determined at runtime. The condition function receives the current state and returns the name of the next node.
func (*MessageGraph) AddEdge ¶
func (g *MessageGraph) AddEdge(from, to string)
AddEdge adds a new edge to the message graph between the "from" and "to" nodes.
func (*MessageGraph) AddMapReduceNode ¶
func (g *MessageGraph) AddMapReduceNode( name string, mapFunctions map[string]func(context.Context, interface{}) (interface{}, error), reducer func([]interface{}) (interface{}, error), )
AddMapReduceNode adds a map-reduce pattern node
func (*MessageGraph) AddNestedConditionalSubgraph ¶
func (g *MessageGraph) AddNestedConditionalSubgraph( name string, router func(interface{}) string, subgraphs map[string]*MessageGraph, ) error
AddNestedConditionalSubgraph creates a subgraph with its own conditional routing
func (*MessageGraph) AddNode ¶
func (g *MessageGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error))
AddNode adds a new node to the message graph with the given name and function.
func (*MessageGraph) AddNodeWithCircuitBreaker ¶
func (g *MessageGraph) AddNodeWithCircuitBreaker( name string, fn func(context.Context, interface{}) (interface{}, error), config CircuitBreakerConfig, )
AddNodeWithCircuitBreaker adds a node with circuit breaker
func (*MessageGraph) AddNodeWithRateLimit ¶
func (g *MessageGraph) AddNodeWithRateLimit( name string, fn func(context.Context, interface{}) (interface{}, error), maxCalls int, window time.Duration, )
AddNodeWithRateLimit adds a node with rate limiting
func (*MessageGraph) AddNodeWithRetry ¶
func (g *MessageGraph) AddNodeWithRetry( name string, fn func(context.Context, interface{}) (interface{}, error), config *RetryConfig, )
AddNodeWithRetry adds a node with retry logic
func (*MessageGraph) AddNodeWithTimeout ¶
func (g *MessageGraph) AddNodeWithTimeout( name string, fn func(context.Context, interface{}) (interface{}, error), timeout time.Duration, )
AddNodeWithTimeout adds a node with timeout
func (*MessageGraph) AddParallelNodes ¶
func (g *MessageGraph) AddParallelNodes(groupName string, nodes map[string]func(context.Context, interface{}) (interface{}, error))
AddParallelNodes adds a set of nodes that execute in parallel
func (*MessageGraph) AddRecursiveSubgraph ¶
func (g *MessageGraph) AddRecursiveSubgraph( name string, maxDepth int, condition func(interface{}, int) bool, builder func(*MessageGraph), )
AddRecursiveSubgraph adds a recursive subgraph to the parent graph
func (*MessageGraph) AddSubgraph ¶
func (g *MessageGraph) AddSubgraph(name string, subgraph *MessageGraph) error
AddSubgraph adds a subgraph as a node in the parent graph
func (*MessageGraph) Compile ¶
func (g *MessageGraph) Compile() (*Runnable, error)
Compile compiles the message graph and returns a Runnable instance. It returns an error if the entry point is not set.
func (*MessageGraph) CreateSubgraph ¶
func (g *MessageGraph) CreateSubgraph(name string, builder func(*MessageGraph)) error
CreateSubgraph creates and adds a subgraph using a builder function
func (*MessageGraph) FanOutFanIn ¶
func (g *MessageGraph) FanOutFanIn( source string, _ []string, collector string, workerFuncs map[string]func(context.Context, interface{}) (interface{}, error), collectFunc func([]interface{}) (interface{}, error), )
FanOutFanIn creates a fan-out/fan-in pattern
func (*MessageGraph) SetEntryPoint ¶
func (g *MessageGraph) SetEntryPoint(name string)
SetEntryPoint sets the entry point node name for the message graph.
func (*MessageGraph) SetSchema ¶
func (g *MessageGraph) SetSchema(schema StateSchema)
SetSchema sets the state schema for the message graph.
func (*MessageGraph) SetStateMerger ¶
func (g *MessageGraph) SetStateMerger(merger StateMerger)
SetStateMerger sets the state merger function for the message graph.
type MessageWithID ¶
type MessageWithID interface {
GetID() string
GetContent() llms.MessageContent
}
MessageWithID is an interface that allows messages to have an ID for deduplication/upsert. Since langchaingo's MessageContent doesn't have an ID field, we can wrap it or use a custom struct. For now, we'll check if the message implements this interface or is a map with an "id" key.
type MetricsListener ¶
type MetricsListener struct {
// contains filtered or unexported fields
}
MetricsListener collects performance and execution metrics
func NewMetricsListener ¶
func NewMetricsListener() *MetricsListener
NewMetricsListener creates a new metrics listener
func (*MetricsListener) GetNodeAverageDuration ¶
func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration
GetNodeAverageDuration returns the average duration for each node
func (*MetricsListener) GetNodeErrors ¶
func (ml *MetricsListener) GetNodeErrors() map[string]int
GetNodeErrors returns the number of errors for each node
func (*MetricsListener) GetNodeExecutions ¶
func (ml *MetricsListener) GetNodeExecutions() map[string]int
GetNodeExecutions returns the number of executions for each node
func (*MetricsListener) GetTotalExecutions ¶
func (ml *MetricsListener) GetTotalExecutions() int
GetTotalExecutions returns the total number of node executions
func (*MetricsListener) OnNodeEvent ¶
func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ interface{}, _ error)
OnNodeEvent implements the NodeListener interface
func (*MetricsListener) PrintSummary ¶
func (ml *MetricsListener) PrintSummary(writer io.Writer)
PrintSummary prints a summary of collected metrics
func (*MetricsListener) Reset ¶
func (ml *MetricsListener) Reset()
Reset clears all collected metrics
type NoOpCallbackHandler ¶
type NoOpCallbackHandler struct{}
NoOpCallbackHandler provides a no-op implementation of CallbackHandler
func (*NoOpCallbackHandler) OnChainEnd ¶
func (n *NoOpCallbackHandler) OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)
func (*NoOpCallbackHandler) OnChainError ¶
func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnChainStart ¶
func (*NoOpCallbackHandler) OnLLMEnd ¶
func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response interface{}, runID string)
func (*NoOpCallbackHandler) OnLLMError ¶
func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnLLMStart ¶
func (*NoOpCallbackHandler) OnRetrieverEnd ¶
func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)
func (*NoOpCallbackHandler) OnRetrieverError ¶
func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)
func (*NoOpCallbackHandler) OnRetrieverStart ¶
func (*NoOpCallbackHandler) OnToolEnd ¶
func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)
func (*NoOpCallbackHandler) OnToolError ¶
func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)
type Node ¶
type Node struct {
// Name is the unique identifier for the node.
Name string
// Function is the function associated with the node.
// It takes a context and any state as input and returns the updated state and an error.
Function func(ctx context.Context, state interface{}) (interface{}, error)
}
Node represents a node in the message graph.
type NodeEvent ¶
type NodeEvent string
NodeEvent represents different types of node events
const ( // NodeEventStart indicates a node has started execution NodeEventStart NodeEvent = "start" // NodeEventProgress indicates progress during node execution NodeEventProgress NodeEvent = "progress" // NodeEventComplete indicates a node has completed successfully NodeEventComplete NodeEvent = "complete" // NodeEventError indicates a node encountered an error NodeEventError NodeEvent = "error" // EventChainStart indicates the graph execution has started EventChainStart NodeEvent = "chain_start" // EventChainEnd indicates the graph execution has completed EventChainEnd NodeEvent = "chain_end" // EventToolStart indicates a tool execution has started EventToolStart NodeEvent = "tool_start" // EventToolEnd indicates a tool execution has completed EventToolEnd NodeEvent = "tool_end" // EventLLMStart indicates an LLM call has started EventLLMStart NodeEvent = "llm_start" // EventLLMEnd indicates an LLM call has completed EventLLMEnd NodeEvent = "llm_end" // EventToken indicates a generated token (for streaming) EventToken NodeEvent = "token" // EventCustom indicates a custom user-defined event EventCustom NodeEvent = "custom" )
type NodeInterrupt ¶
type NodeInterrupt struct {
// Node is the name of the node that triggered the interrupt
Node string
// Value is the data/query provided by the interrupt
Value interface{}
}
NodeInterrupt is returned when a node requests an interrupt (e.g. waiting for human input).
func (*NodeInterrupt) Error ¶
func (e *NodeInterrupt) Error() string
type NodeListener ¶
type NodeListener interface {
// OnNodeEvent is called when a node event occurs
OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)
}
NodeListener defines the interface for node event listeners
type NodeListenerFunc ¶
type NodeListenerFunc func(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)
NodeListenerFunc is a function adapter for NodeListener
func (NodeListenerFunc) OnNodeEvent ¶
func (f NodeListenerFunc) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)
OnNodeEvent implements the NodeListener interface
type ParallelNode ¶
type ParallelNode struct {
// contains filtered or unexported fields
}
ParallelNode represents a set of nodes that can execute in parallel
func NewParallelNode ¶
func NewParallelNode(name string, nodes ...Node) *ParallelNode
NewParallelNode creates a new parallel node
type ProgressListener ¶
type ProgressListener struct {
// contains filtered or unexported fields
}
ProgressListener provides progress tracking with customizable output
func NewProgressListener ¶
func NewProgressListener() *ProgressListener
NewProgressListener creates a new progress listener
func NewProgressListenerWithWriter ¶
func NewProgressListenerWithWriter(writer io.Writer) *ProgressListener
NewProgressListenerWithWriter creates a progress listener with custom writer
func (*ProgressListener) OnNodeEvent ¶
func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, err error)
OnNodeEvent implements the NodeListener interface
func (*ProgressListener) SetNodeStep ¶
func (pl *ProgressListener) SetNodeStep(nodeName, step string)
SetNodeStep sets a custom message for a specific node
func (*ProgressListener) WithDetails ¶
func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener
WithDetails enables or disables detailed output
func (*ProgressListener) WithPrefix ¶
func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener
WithPrefix sets a custom prefix for progress messages
func (*ProgressListener) WithTiming ¶
func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener
WithTiming enables or disables timing information
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements rate limiting for nodes
func NewRateLimiter ¶
func NewRateLimiter(node Node, maxCalls int, window time.Duration) *RateLimiter
NewRateLimiter creates a new rate limiter
type RecursiveSubgraph ¶
type RecursiveSubgraph struct {
// contains filtered or unexported fields
}
RecursiveSubgraph allows a subgraph to call itself recursively
func NewRecursiveSubgraph ¶
func NewRecursiveSubgraph( name string, maxDepth int, condition func(interface{}, int) bool, ) *RecursiveSubgraph
NewRecursiveSubgraph creates a new recursive subgraph
type Reducer ¶
type Reducer func(current, new interface{}) (interface{}, error)
Reducer defines how a state value should be updated. It takes the current value and the new value, and returns the merged value.
type RetryConfig ¶
type RetryConfig struct {
MaxAttempts int
InitialDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
RetryableErrors func(error) bool // Determines if an error should trigger retry
}
RetryConfig configures retry behavior for nodes
func DefaultRetryConfig ¶
func DefaultRetryConfig() *RetryConfig
DefaultRetryConfig returns a default retry configuration
type RetryNode ¶
type RetryNode struct {
// contains filtered or unexported fields
}
RetryNode wraps a node with retry logic
func NewRetryNode ¶
func NewRetryNode(node Node, config *RetryConfig) *RetryNode
NewRetryNode creates a new retry node
type RetryPolicy ¶
type RetryPolicy struct {
MaxRetries int
BackoffStrategy BackoffStrategy
RetryableErrors []string
}
RetryPolicy defines how to handle node failures
type Runnable ¶
type Runnable struct {
// contains filtered or unexported fields
}
Runnable represents a compiled message graph that can be invoked.
func (*Runnable) Invoke ¶
Invoke executes the compiled message graph with the given input state. It returns the resulting state and an error if any occurs during the execution.
func (*Runnable) InvokeWithConfig ¶
func (r *Runnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
InvokeWithConfig executes the compiled message graph with the given input state and config. It returns the resulting state and an error if any occurs during the execution.
func (*Runnable) WithTracer ¶
WithTracer returns a new Runnable with the given tracer
type StateGraph ¶
type StateGraph struct {
// Schema defines the state structure and update logic
Schema StateSchema
// contains filtered or unexported fields
}
StateGraph represents a state-based graph similar to Python's LangGraph StateGraph
func NewMessagesStateGraph ¶
func NewMessagesStateGraph() *StateGraph
NewMessagesStateGraph creates a StateGraph with a default schema that handles "messages" using the AddMessages reducer. This is the recommended starting point for chat-based agents.
func NewStateGraph ¶
func NewStateGraph() *StateGraph
NewStateGraph creates a new instance of StateGraph
func (*StateGraph) AddConditionalEdge ¶
func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)
AddConditionalEdge adds a conditional edge where the target node is determined at runtime
func (*StateGraph) AddEdge ¶
func (g *StateGraph) AddEdge(from, to string)
AddEdge adds a new edge to the state graph between the "from" and "to" nodes
func (*StateGraph) AddNode ¶
func (g *StateGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error))
AddNode adds a new node to the state graph with the given name and function
func (*StateGraph) Compile ¶
func (g *StateGraph) Compile() (*StateRunnable, error)
Compile compiles the state graph and returns a StateRunnable instance
func (*StateGraph) SetEntryPoint ¶
func (g *StateGraph) SetEntryPoint(name string)
SetEntryPoint sets the entry point node name for the state graph
func (*StateGraph) SetRetryPolicy ¶
func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)
SetRetryPolicy sets the retry policy for the graph
func (*StateGraph) SetSchema ¶
func (g *StateGraph) SetSchema(schema StateSchema)
SetSchema sets the state schema for the graph
func (*StateGraph) SetStateMerger ¶
func (g *StateGraph) SetStateMerger(merger StateMerger)
SetStateMerger sets the state merger function for the state graph
type StateMerger ¶
type StateMerger func(ctx context.Context, currentState interface{}, newStates []interface{}) (interface{}, error)
StateMerger merges multiple state updates into a single state.
type StateRunnable ¶
type StateRunnable struct {
// contains filtered or unexported fields
}
StateRunnable represents a compiled state graph that can be invoked
func (*StateRunnable) Invoke ¶
func (r *StateRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)
Invoke executes the compiled state graph with the given input state
func (*StateRunnable) InvokeWithConfig ¶
func (r *StateRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)
InvokeWithConfig executes the compiled state graph with the given input state and config
type StateSchema ¶
type StateSchema interface {
// Init returns the initial state.
Init() interface{}
// Update merges the new state into the current state.
Update(current, new interface{}) (interface{}, error)
}
StateSchema defines the structure and update logic for the graph state.
type StateSnapshot ¶
type StateSnapshot struct {
Values interface{}
Next []string
Config Config
Metadata map[string]interface{}
CreatedAt time.Time
ParentID string
}
StateSnapshot represents a snapshot of the graph state
type StreamConfig ¶
type StreamConfig struct {
// BufferSize is the size of the event channel buffer
BufferSize int
// EnableBackpressure determines if backpressure handling is enabled
EnableBackpressure bool
// MaxDroppedEvents is the maximum number of events to drop before logging
MaxDroppedEvents int
// Mode specifies what kind of events to stream
Mode StreamMode
}
StreamConfig configures streaming behavior
func DefaultStreamConfig ¶
func DefaultStreamConfig() StreamConfig
DefaultStreamConfig returns the default streaming configuration
type StreamEvent ¶
type StreamEvent struct {
// Timestamp when the event occurred
Timestamp time.Time
// NodeName is the name of the node that generated the event
NodeName string
// Event is the type of event
Event NodeEvent
// State is the current state at the time of the event
State interface{}
// Error contains any error that occurred (if Event is NodeEventError)
Error error
// Metadata contains additional event-specific data
Metadata map[string]interface{}
// Duration is how long the node took (only for Complete events)
Duration time.Duration
}
StreamEvent represents an event in the streaming execution
type StreamMode ¶
type StreamMode string
StreamMode defines the mode of streaming
const ( // StreamModeValues emits the full state after each step StreamModeValues StreamMode = "values" // StreamModeUpdates emits the updates (deltas) from each node StreamModeUpdates StreamMode = "updates" // StreamModeMessages emits LLM messages/tokens (if available) StreamModeMessages StreamMode = "messages" // StreamModeDebug emits all events (default) StreamModeDebug StreamMode = "debug" )
type StreamResult ¶
type StreamResult struct {
// Events channel receives StreamEvent objects in real-time
Events <-chan StreamEvent
// Result channel receives the final result when execution completes
Result <-chan interface{}
// Errors channel receives any errors that occur during execution
Errors <-chan error
// Done channel is closed when streaming is complete
Done <-chan struct{}
// Cancel function can be called to stop streaming
Cancel context.CancelFunc
}
StreamResult contains the channels returned by streaming execution
type StreamingExecutor ¶
type StreamingExecutor struct {
// contains filtered or unexported fields
}
StreamingExecutor provides a high-level interface for streaming execution
func NewStreamingExecutor ¶
func NewStreamingExecutor(runnable *StreamingRunnable) *StreamingExecutor
NewStreamingExecutor creates a new streaming executor
func (*StreamingExecutor) ExecuteAsync ¶
func (se *StreamingExecutor) ExecuteAsync(ctx context.Context, initialState interface{}) *StreamResult
ExecuteAsync executes the graph asynchronously and returns immediately
func (*StreamingExecutor) ExecuteWithCallback ¶
func (se *StreamingExecutor) ExecuteWithCallback( ctx context.Context, initialState interface{}, eventCallback func(event StreamEvent), resultCallback func(result interface{}, err error), ) error
ExecuteWithCallback executes the graph and calls the callback for each event
type StreamingListener ¶
type StreamingListener struct {
// contains filtered or unexported fields
}
StreamingListener implements NodeListener for streaming events
func NewStreamingListener ¶
func NewStreamingListener(eventChan chan<- StreamEvent, config StreamConfig) *StreamingListener
NewStreamingListener creates a new streaming listener
func (*StreamingListener) Close ¶
func (sl *StreamingListener) Close()
Close marks the listener as closed to prevent sending to closed channels
func (*StreamingListener) GetDroppedEventsCount ¶
func (sl *StreamingListener) GetDroppedEventsCount() int
GetDroppedEventsCount returns the number of dropped events
func (*StreamingListener) OnChainEnd ¶
func (sl *StreamingListener) OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)
func (*StreamingListener) OnChainError ¶
func (sl *StreamingListener) OnChainError(ctx context.Context, err error, runID string)
func (*StreamingListener) OnChainStart ¶
func (*StreamingListener) OnGraphStep ¶
func (sl *StreamingListener) OnGraphStep(ctx context.Context, stepNode string, state interface{})
OnGraphStep implements GraphCallbackHandler
func (*StreamingListener) OnLLMEnd ¶
func (sl *StreamingListener) OnLLMEnd(ctx context.Context, response interface{}, runID string)
func (*StreamingListener) OnLLMError ¶
func (sl *StreamingListener) OnLLMError(ctx context.Context, err error, runID string)
func (*StreamingListener) OnLLMStart ¶
func (*StreamingListener) OnNodeEvent ¶
func (sl *StreamingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, err error)
OnNodeEvent implements the NodeListener interface
func (*StreamingListener) OnRetrieverEnd ¶
func (sl *StreamingListener) OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)
func (*StreamingListener) OnRetrieverError ¶
func (sl *StreamingListener) OnRetrieverError(ctx context.Context, err error, runID string)
func (*StreamingListener) OnRetrieverStart ¶
func (*StreamingListener) OnToolEnd ¶
func (sl *StreamingListener) OnToolEnd(ctx context.Context, output string, runID string)
func (*StreamingListener) OnToolError ¶
func (sl *StreamingListener) OnToolError(ctx context.Context, err error, runID string)
type StreamingMessageGraph ¶
type StreamingMessageGraph struct {
*ListenableMessageGraph
// contains filtered or unexported fields
}
StreamingMessageGraph extends ListenableMessageGraph with streaming capabilities
func NewStreamingMessageGraph ¶
func NewStreamingMessageGraph() *StreamingMessageGraph
NewStreamingMessageGraph creates a new streaming message graph
func NewStreamingMessageGraphWithConfig ¶
func NewStreamingMessageGraphWithConfig(config StreamConfig) *StreamingMessageGraph
NewStreamingMessageGraphWithConfig creates a streaming graph with custom config
func (*StreamingMessageGraph) CompileStreaming ¶
func (g *StreamingMessageGraph) CompileStreaming() (*StreamingRunnable, error)
CompileStreaming compiles the graph into a streaming runnable
func (*StreamingMessageGraph) GetStreamConfig ¶
func (g *StreamingMessageGraph) GetStreamConfig() StreamConfig
GetStreamConfig returns the current streaming configuration
func (*StreamingMessageGraph) SetStreamConfig ¶
func (g *StreamingMessageGraph) SetStreamConfig(config StreamConfig)
SetStreamConfig updates the streaming configuration
type StreamingRunnable ¶
type StreamingRunnable struct {
// contains filtered or unexported fields
}
StreamingRunnable wraps a ListenableRunnable with streaming capabilities
func NewStreamingRunnable ¶
func NewStreamingRunnable(runnable *ListenableRunnable, config StreamConfig) *StreamingRunnable
NewStreamingRunnable creates a new streaming runnable
func NewStreamingRunnableWithDefaults ¶
func NewStreamingRunnableWithDefaults(runnable *ListenableRunnable) *StreamingRunnable
NewStreamingRunnableWithDefaults creates a streaming runnable with default config
func (*StreamingRunnable) GetGraph ¶
func (sr *StreamingRunnable) GetGraph() *Exporter
GetGraph returns an Exporter for the streaming runnable
func (*StreamingRunnable) Stream ¶
func (sr *StreamingRunnable) Stream(ctx context.Context, initialState interface{}) *StreamResult
Stream executes the graph with real-time event streaming
type Subgraph ¶
type Subgraph struct {
// contains filtered or unexported fields
}
Subgraph represents a nested graph that can be used as a node
func NewSubgraph ¶
func NewSubgraph(name string, graph *MessageGraph) (*Subgraph, error)
NewSubgraph creates a new subgraph
type TimeoutNode ¶
type TimeoutNode struct {
// contains filtered or unexported fields
}
TimeoutNode wraps a node with timeout logic
func NewTimeoutNode ¶
func NewTimeoutNode(node Node, timeout time.Duration) *TimeoutNode
NewTimeoutNode creates a new timeout node
type TraceEvent ¶
type TraceEvent string
TraceEvent represents different types of events in graph execution
const ( // TraceEventGraphStart indicates the start of graph execution TraceEventGraphStart TraceEvent = "graph_start" // TraceEventGraphEnd indicates the end of graph execution TraceEventGraphEnd TraceEvent = "graph_end" // TraceEventNodeStart indicates the start of node execution TraceEventNodeStart TraceEvent = "node_start" // TraceEventNodeEnd indicates the end of node execution TraceEventNodeEnd TraceEvent = "node_end" // TraceEventNodeError indicates an error occurred in node execution TraceEventNodeError TraceEvent = "node_error" // TraceEventEdgeTraversal indicates traversal from one node to another TraceEventEdgeTraversal TraceEvent = "edge_traversal" )
type TraceHook ¶
type TraceHook interface {
// OnEvent is called when a trace event occurs
OnEvent(ctx context.Context, span *TraceSpan)
}
TraceHook defines the interface for trace event handlers
type TraceHookFunc ¶
TraceHookFunc is a function adapter for TraceHook
type TraceSpan ¶
type TraceSpan struct {
// ID is a unique identifier for this span
ID string
// ParentID is the ID of the parent span (empty for root spans)
ParentID string
// Event indicates the type of event this span represents
Event TraceEvent
// NodeName is the name of the node being executed (if applicable)
NodeName string
// FromNode is the source node for edge traversals
FromNode string
// ToNode is the destination node for edge traversals
ToNode string
// StartTime is when this span began
StartTime time.Time
// EndTime is when this span completed (zero for ongoing spans)
EndTime time.Time
// Duration is the total time taken (calculated when span ends)
Duration time.Duration
// State is a snapshot of the state at this point (optional)
State interface{}
// Error contains any error that occurred during execution
Error error
// Metadata contains additional key-value pairs for observability
Metadata map[string]interface{}
}
TraceSpan represents a span of execution with timing and metadata
func SpanFromContext ¶
SpanFromContext extracts a span from context
type TracedRunnable ¶
type TracedRunnable struct {
*Runnable
// contains filtered or unexported fields
}
TracedRunnable wraps a Runnable with tracing capabilities
func NewTracedRunnable ¶
func NewTracedRunnable(runnable *Runnable, tracer *Tracer) *TracedRunnable
NewTracedRunnable creates a new traced runnable
func (*TracedRunnable) GetTracer ¶
func (tr *TracedRunnable) GetTracer() *Tracer
GetTracer returns the tracer instance
type Tracer ¶
type Tracer struct {
// contains filtered or unexported fields
}
Tracer manages trace collection and hooks