graph

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: MIT Imports: 16 Imported by: 13

Documentation

Index

Examples

Constants

View Source
const END = "END"

END is a special constant used to represent the end node in the graph.

Variables

View Source
var (
	// ErrEntryPointNotSet is returned when the entry point of the graph is not set.
	ErrEntryPointNotSet = errors.New("entry point not set")

	// ErrNodeNotFound is returned when a node is not found in the graph.
	ErrNodeNotFound = errors.New("node not found")

	// ErrNoOutgoingEdge is returned when no outgoing edge is found for a node.
	ErrNoOutgoingEdge = errors.New("no outgoing edge found for node")
)

Functions

func AddMessages

func AddMessages(current, new interface{}) (interface{}, error)

AddMessages is a reducer designed for merging chat messages. It handles ID-based deduplication and upserts. If a new message has the same ID as an existing one, it replaces the existing one. Otherwise, it appends the new message.

func AppendReducer

func AppendReducer(current, new interface{}) (interface{}, error)

AppendReducer appends the new value to the current slice. It supports appending a slice to a slice, or a single element to a slice.

func ContextWithSpan

func ContextWithSpan(ctx context.Context, span *TraceSpan) context.Context

ContextWithSpan returns a new context with the span stored

func ExponentialBackoffRetry

func ExponentialBackoffRetry(
	ctx context.Context,
	fn func() (interface{}, error),
	maxAttempts int,
	baseDelay time.Duration,
) (interface{}, error)

ExponentialBackoffRetry implements exponential backoff with jitter

func GetResumeValue

func GetResumeValue(ctx context.Context) interface{}

GetResumeValue retrieves the resume value from the context.

func Interrupt

func Interrupt(ctx context.Context, value interface{}) (interface{}, error)

Interrupt pauses execution and waits for input. If resuming, it returns the value provided in the resume command.

func OverwriteReducer

func OverwriteReducer(current, new interface{}) (interface{}, error)

OverwriteReducer replaces the old value with the new one.

func WithConfig

func WithConfig(ctx context.Context, config *Config) context.Context

WithConfig adds the config to the context

func WithResumeValue

func WithResumeValue(ctx context.Context, value interface{}) context.Context

WithResumeValue adds a resume value to the context. This value will be returned by Interrupt() when re-executing a node.

Types

type BackoffStrategy

type BackoffStrategy int

BackoffStrategy defines different backoff strategies

// Available BackoffStrategy values. They are numbered from 0 via iota, so
// FixedBackoff is also the zero value of BackoffStrategy.
const (
	// FixedBackoff has value 0.
	// NOTE(review): presumably a constant delay between attempts — confirm
	// against the retry implementation.
	FixedBackoff BackoffStrategy = iota
	// ExponentialBackoff has value 1.
	// NOTE(review): presumably a delay that grows multiplicatively per
	// attempt — confirm against the retry implementation.
	ExponentialBackoff
	// LinearBackoff has value 2.
	// NOTE(review): presumably a delay that grows by a fixed step per
	// attempt — confirm against the retry implementation.
	LinearBackoff
)

type CallbackHandler

type CallbackHandler interface {
	// Chain callbacks (for graph/workflow execution)
	OnChainStart(ctx context.Context, serialized map[string]interface{}, inputs map[string]interface{}, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
	OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)
	OnChainError(ctx context.Context, err error, runID string)

	// LLM callbacks (for AI model calls)
	OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
	OnLLMEnd(ctx context.Context, response interface{}, runID string)
	OnLLMError(ctx context.Context, err error, runID string)

	// Tool callbacks (for tool/function calls)
	OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
	OnToolEnd(ctx context.Context, output string, runID string)
	OnToolError(ctx context.Context, err error, runID string)

	// Retriever callbacks (for data retrieval operations)
	OnRetrieverStart(ctx context.Context, serialized map[string]interface{}, query string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})
	OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)
	OnRetrieverError(ctx context.Context, err error, runID string)
}

CallbackHandler defines the interface for handling graph execution callbacks. This matches Python's LangChain callback pattern.

type ChatListener

type ChatListener struct {
	// contains filtered or unexported fields
}

ChatListener provides real-time chat-style updates

func NewChatListener

func NewChatListener() *ChatListener

NewChatListener creates a new chat-style listener

func NewChatListenerWithWriter

func NewChatListenerWithWriter(writer io.Writer) *ChatListener

NewChatListenerWithWriter creates a chat listener with custom writer

func (*ChatListener) OnNodeEvent

func (cl *ChatListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ interface{}, err error)

OnNodeEvent implements the NodeListener interface

func (*ChatListener) SetNodeMessage

func (cl *ChatListener) SetNodeMessage(nodeName, message string)

SetNodeMessage sets a custom message for a specific node

func (*ChatListener) WithTime

func (cl *ChatListener) WithTime(enabled bool) *ChatListener

WithTime enables or disables timestamps

type Checkpoint

type Checkpoint struct {
	// ID identifies the checkpoint; it is the key CheckpointStore.Load and
	// CheckpointStore.Delete look a checkpoint up by. Serialized as "id".
	ID        string                 `json:"id"`
	// NodeName is serialized as "node_name".
	// NOTE(review): presumably the node whose execution produced State —
	// confirm against SaveCheckpoint.
	NodeName  string                 `json:"node_name"`
	// State is the saved graph state. It is untyped, so after a JSON round
	// trip its concrete type is whatever encoding/json decodes into.
	State     interface{}            `json:"state"`
	// Metadata holds arbitrary key/value data attached to the checkpoint.
	Metadata  map[string]interface{} `json:"metadata"`
	// Timestamp is serialized as "timestamp".
	// NOTE(review): presumably the time the checkpoint was created — confirm.
	Timestamp time.Time              `json:"timestamp"`
	// Version is serialized as "version".
	// NOTE(review): meaning (schema version vs. per-execution sequence
	// number) is not visible here — confirm.
	Version   int                    `json:"version"`
}

Checkpoint represents a saved state at a specific point in execution

type CheckpointConfig

type CheckpointConfig struct {
	// Store is the checkpoint storage backend
	Store CheckpointStore

	// AutoSave enables automatic checkpointing after each node
	AutoSave bool

	// SaveInterval specifies how often to save (when AutoSave is false)
	SaveInterval time.Duration

	// MaxCheckpoints limits the number of checkpoints to keep
	MaxCheckpoints int
}

CheckpointConfig configures checkpointing behavior

func DefaultCheckpointConfig

func DefaultCheckpointConfig() CheckpointConfig

DefaultCheckpointConfig returns a default checkpoint configuration

type CheckpointListener

type CheckpointListener struct {

	// Embed NoOpCallbackHandler to satisfy other CallbackHandler methods
	NoOpCallbackHandler
	// contains filtered or unexported fields
}

CheckpointListener automatically creates checkpoints during execution

func (*CheckpointListener) OnGraphStep

func (cl *CheckpointListener) OnGraphStep(ctx context.Context, stepNode string, state interface{})

OnGraphStep implements GraphCallbackHandler

type CheckpointStore

type CheckpointStore interface {
	// Save stores a checkpoint
	Save(ctx context.Context, checkpoint *Checkpoint) error

	// Load retrieves a checkpoint by ID
	Load(ctx context.Context, checkpointID string) (*Checkpoint, error)

	// List returns all checkpoints for a given execution
	List(ctx context.Context, executionID string) ([]*Checkpoint, error)

	// Delete removes a checkpoint
	Delete(ctx context.Context, checkpointID string) error

	// Clear removes all checkpoints for an execution
	Clear(ctx context.Context, executionID string) error
}

CheckpointStore defines the interface for checkpoint persistence

type CheckpointableMessageGraph

type CheckpointableMessageGraph struct {
	*ListenableMessageGraph
	// contains filtered or unexported fields
}

CheckpointableMessageGraph extends ListenableMessageGraph with checkpointing

func NewCheckpointableMessageGraph

func NewCheckpointableMessageGraph() *CheckpointableMessageGraph

NewCheckpointableMessageGraph creates a new checkpointable message graph

func NewCheckpointableMessageGraphWithConfig

func NewCheckpointableMessageGraphWithConfig(config CheckpointConfig) *CheckpointableMessageGraph

NewCheckpointableMessageGraphWithConfig creates a checkpointable graph with custom config

func (*CheckpointableMessageGraph) CompileCheckpointable

func (g *CheckpointableMessageGraph) CompileCheckpointable() (*CheckpointableRunnable, error)

CompileCheckpointable compiles the graph into a checkpointable runnable

func (*CheckpointableMessageGraph) GetCheckpointConfig

func (g *CheckpointableMessageGraph) GetCheckpointConfig() CheckpointConfig

GetCheckpointConfig returns the current checkpointing configuration

func (*CheckpointableMessageGraph) SetCheckpointConfig

func (g *CheckpointableMessageGraph) SetCheckpointConfig(config CheckpointConfig)

SetCheckpointConfig updates the checkpointing configuration

type CheckpointableRunnable

type CheckpointableRunnable struct {
	// contains filtered or unexported fields
}

CheckpointableRunnable wraps a runnable with checkpointing capabilities

func NewCheckpointableRunnable

func NewCheckpointableRunnable(runnable *ListenableRunnable, config CheckpointConfig) *CheckpointableRunnable

NewCheckpointableRunnable creates a new checkpointable runnable

func (*CheckpointableRunnable) ClearCheckpoints

func (cr *CheckpointableRunnable) ClearCheckpoints(ctx context.Context) error

ClearCheckpoints removes all checkpoints for this execution

func (*CheckpointableRunnable) GetState

func (cr *CheckpointableRunnable) GetState(ctx context.Context, config *Config) (*StateSnapshot, error)

GetState retrieves the state for the given config

func (*CheckpointableRunnable) Invoke

func (cr *CheckpointableRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

Invoke executes the graph with checkpointing

func (*CheckpointableRunnable) InvokeWithConfig

func (cr *CheckpointableRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)

InvokeWithConfig executes the graph with checkpointing and config

func (*CheckpointableRunnable) ListCheckpoints

func (cr *CheckpointableRunnable) ListCheckpoints(ctx context.Context) ([]*Checkpoint, error)

ListCheckpoints returns all checkpoints for this execution

func (*CheckpointableRunnable) LoadCheckpoint

func (cr *CheckpointableRunnable) LoadCheckpoint(ctx context.Context, checkpointID string) (*Checkpoint, error)

LoadCheckpoint loads a specific checkpoint

func (*CheckpointableRunnable) ResumeFromCheckpoint

func (cr *CheckpointableRunnable) ResumeFromCheckpoint(ctx context.Context, checkpointID string) (interface{}, error)

ResumeFromCheckpoint resumes execution from a specific checkpoint

func (*CheckpointableRunnable) SaveCheckpoint

func (cr *CheckpointableRunnable) SaveCheckpoint(ctx context.Context, nodeName string, state interface{}) error

SaveCheckpoint manually saves a checkpoint

func (*CheckpointableRunnable) UpdateState

func (cr *CheckpointableRunnable) UpdateState(ctx context.Context, config *Config, values interface{}, asNode string) (*Config, error)

UpdateState updates the state for the given config

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker

func NewCircuitBreaker(node Node, config CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker

func (*CircuitBreaker) Execute

func (cb *CircuitBreaker) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the node with circuit breaker logic

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	FailureThreshold int           // Number of failures before opening
	SuccessThreshold int           // Number of successes before closing
	Timeout          time.Duration // Time before attempting to close
	HalfOpenMaxCalls int           // Max calls in half-open state
}

CircuitBreakerConfig configures circuit breaker behavior

type CircuitBreakerState

type CircuitBreakerState int

CircuitBreakerState represents the state of a circuit breaker

// Possible CircuitBreakerState values. They are numbered from 0 via iota, so
// CircuitClosed is also the zero value of CircuitBreakerState.
const (
	// CircuitClosed has value 0.
	// NOTE(review): presumably the normal state in which calls reach the
	// wrapped node — confirm against CircuitBreaker.Execute.
	CircuitClosed CircuitBreakerState = iota
	// CircuitOpen has value 1.
	// NOTE(review): presumably entered after CircuitBreakerConfig.FailureThreshold
	// failures — confirm against CircuitBreaker.Execute.
	CircuitOpen
	// CircuitHalfOpen has value 2. CircuitBreakerConfig.HalfOpenMaxCalls
	// limits the calls made while in this state.
	CircuitHalfOpen
)

type CleaningStateSchema

type CleaningStateSchema interface {
	StateSchema
	// Cleanup performs any necessary cleanup on the state after a step.
	Cleanup(state interface{}) interface{}
}

CleaningStateSchema extends StateSchema with cleanup capabilities. This allows implementing ephemeral channels (values that are cleared after each step).

type Command

type Command struct {
	// Update is the value to update the state with.
	// It will be processed by the schema's reducers.
	Update interface{}

	// Goto specifies the next node(s) to execute.
	// If set, it overrides the graph's edges.
	// Can be a single string (node name) or []string.
	Goto interface{}
}

Command allows a node to dynamically update the state and control the flow. It can be returned by a node function instead of a direct state update.

type CompositeGraph

type CompositeGraph struct {
	// contains filtered or unexported fields
}

CompositeGraph allows composing multiple graphs together

func NewCompositeGraph

func NewCompositeGraph() *CompositeGraph

NewCompositeGraph creates a new composite graph

func (*CompositeGraph) AddGraph

func (cg *CompositeGraph) AddGraph(name string, graph *MessageGraph)

AddGraph adds a named graph to the composite

func (*CompositeGraph) Compile

func (cg *CompositeGraph) Compile() (*Runnable, error)

Compile compiles the composite graph into a single runnable

func (*CompositeGraph) Connect

func (cg *CompositeGraph) Connect(
	fromGraph string,
	fromNode string,
	toGraph string,
	toNode string,
	transform func(interface{}) interface{},
) error

Connect connects two graphs with a transformation function

type Config

type Config struct {
	// Callbacks to be invoked during execution
	Callbacks []CallbackHandler `json:"callbacks"`

	// Metadata to attach to the execution
	Metadata map[string]interface{} `json:"metadata"`

	// Tags to categorize the execution
	Tags []string `json:"tags"`

	// Configurable parameters for the execution
	Configurable map[string]interface{} `json:"configurable"`

	// RunName for this execution
	RunName string `json:"run_name"`

	// Timeout for the execution
	Timeout *time.Duration `json:"timeout"`

	// InterruptBefore nodes to stop before execution
	InterruptBefore []string `json:"interrupt_before"`

	// InterruptAfter nodes to stop after execution
	InterruptAfter []string `json:"interrupt_after"`

	// ResumeFrom nodes to start execution from (bypassing entry point)
	ResumeFrom []string `json:"resume_from"`

	// ResumeValue provides the value to return from an Interrupt() call when resuming
	ResumeValue interface{} `json:"resume_value"`
}

Config represents configuration for graph invocation. This matches Python's config dict pattern.

func GetConfig

func GetConfig(ctx context.Context) *Config

GetConfig retrieves the config from the context

type Edge

type Edge struct {
	// From is the name of the node from which the edge originates.
	From string

	// To is the name of the node to which the edge points.
	To string
}

Edge represents an edge in the message graph.

type Event

type Event struct {
	// Type names the kind of event. Serialized as "type".
	Type      string                 `json:"type"`
	// NodeName is the node the event relates to; omitted from JSON when empty.
	NodeName  string                 `json:"node_name,omitempty"`
	// Timestamp is serialized as "timestamp" (always present in JSON).
	// NOTE(review): presumably the time the event was emitted — confirm.
	Timestamp time.Time              `json:"timestamp"`
	// Duration is omitted from JSON when zero.
	// NOTE(review): encoding/json writes time.Duration as an integer count
	// of nanoseconds, not a human-readable string.
	Duration  time.Duration          `json:"duration,omitempty"`
	// Error is omitted from JSON when nil.
	// NOTE(review): error values without exported fields or a MarshalJSON
	// method (e.g. those from errors.New) encode as "{}", so the message is
	// lost when an Event is marshalled — confirm whether that matters.
	Error     error                  `json:"error,omitempty"`
	// State is an untyped state value attached to the event; omitted from
	// JSON when nil.
	State     interface{}            `json:"state,omitempty"`
	// Metadata holds arbitrary key/value data; omitted from JSON when empty.
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

Event represents an event (matching listeners.go pattern)

type EventEmitter

type EventEmitter struct {
	// contains filtered or unexported fields
}

EventEmitter handles emitting events to listeners (from listeners.go integration)

func NewEventEmitter

func NewEventEmitter() *EventEmitter

NewEventEmitter creates a new event emitter

func (*EventEmitter) AddListener

func (e *EventEmitter) AddListener(listener EventListener)

AddListener adds an event listener

func (*EventEmitter) EmitEvent

func (e *EventEmitter) EmitEvent(ctx context.Context, event Event) error

EmitEvent emits an event to all listeners

type EventListener

type EventListener interface {
	// OnEvent receives one Event together with the caller's context.
	// NOTE(review): how EventEmitter.EmitEvent treats a non-nil error
	// (abort vs. continue with remaining listeners) is not visible here —
	// confirm before relying on it.
	OnEvent(ctx context.Context, event Event) error
}

EventListener defines the interface for event listeners (matching listeners.go)

type Exporter

type Exporter struct {
	// contains filtered or unexported fields
}

Exporter provides methods to export graphs in different formats

func NewExporter

func NewExporter(graph *MessageGraph) *Exporter

NewExporter creates a new graph exporter for the given graph

func (*Exporter) DrawASCII

func (ge *Exporter) DrawASCII() string

DrawASCII generates an ASCII tree representation of the graph

func (*Exporter) DrawDOT

func (ge *Exporter) DrawDOT() string

DrawDOT generates a DOT (Graphviz) representation of the graph

func (*Exporter) DrawMermaid

func (ge *Exporter) DrawMermaid() string

DrawMermaid generates a Mermaid diagram representation of the graph

func (*Exporter) DrawMermaidWithOptions

func (ge *Exporter) DrawMermaidWithOptions(opts MermaidOptions) string

DrawMermaidWithOptions generates a Mermaid diagram with custom options

type FileCheckpointStore

type FileCheckpointStore struct {
	// contains filtered or unexported fields
}

FileCheckpointStore provides file-based checkpoint storage

func NewFileCheckpointStore

func NewFileCheckpointStore(writer io.Writer, reader io.Reader) *FileCheckpointStore

NewFileCheckpointStore creates a new file-based checkpoint store

func (*FileCheckpointStore) Clear

func (f *FileCheckpointStore) Clear(_ context.Context, executionID string) error

Clear implements CheckpointStore interface for file storage

func (*FileCheckpointStore) Delete

func (f *FileCheckpointStore) Delete(_ context.Context, checkpointID string) error

Delete implements CheckpointStore interface for file storage

func (*FileCheckpointStore) List

func (f *FileCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)

List implements CheckpointStore interface for file storage

func (*FileCheckpointStore) Load

func (f *FileCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)

Load implements CheckpointStore interface for file storage

func (*FileCheckpointStore) Save

func (f *FileCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error

Save implements CheckpointStore interface for file storage

type GraphCallbackHandler

type GraphCallbackHandler interface {
	CallbackHandler
	// OnGraphStep is called after a step (node execution + state update) is completed
	OnGraphStep(ctx context.Context, stepNode string, state interface{})
}

GraphCallbackHandler extends CallbackHandler with graph-specific events

type GraphInterrupt

type GraphInterrupt struct {
	// Node that caused the interruption
	Node string
	// State at the time of interruption
	State interface{}
	// NextNodes that would have been executed if not interrupted
	NextNodes []string
	// InterruptValue is the value provided by the dynamic interrupt (if any)
	InterruptValue interface{}
}

GraphInterrupt is returned when execution is interrupted by configuration or dynamic interrupt

func (*GraphInterrupt) Error

func (e *GraphInterrupt) Error() string

type ListenableMessageGraph

type ListenableMessageGraph struct {
	*MessageGraph
	// contains filtered or unexported fields
}

ListenableMessageGraph extends MessageGraph with listener capabilities

func NewListenableMessageGraph

func NewListenableMessageGraph() *ListenableMessageGraph

NewListenableMessageGraph creates a new message graph with listener support

func (*ListenableMessageGraph) AddGlobalListener

func (g *ListenableMessageGraph) AddGlobalListener(listener NodeListener)

AddGlobalListener adds a listener to all nodes in the graph

func (*ListenableMessageGraph) AddNode

func (g *ListenableMessageGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error)) *ListenableNode

AddNode adds a node with listener capabilities

func (*ListenableMessageGraph) CompileListenable

func (g *ListenableMessageGraph) CompileListenable() (*ListenableRunnable, error)

CompileListenable compiles the graph into a runnable with listener support

func (*ListenableMessageGraph) GetListenableNode

func (g *ListenableMessageGraph) GetListenableNode(name string) *ListenableNode

GetListenableNode returns the listenable node by name

func (*ListenableMessageGraph) RemoveGlobalListener

func (g *ListenableMessageGraph) RemoveGlobalListener(listener NodeListener)

RemoveGlobalListener removes a listener from all nodes in the graph

type ListenableNode

type ListenableNode struct {
	Node
	// contains filtered or unexported fields
}

ListenableNode extends Node with listener capabilities

func NewListenableNode

func NewListenableNode(node Node) *ListenableNode

NewListenableNode creates a new listenable node from a regular node

func (*ListenableNode) AddListener

func (ln *ListenableNode) AddListener(listener NodeListener) *ListenableNode

AddListener adds a listener to the node

func (*ListenableNode) Execute

func (ln *ListenableNode) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the node function with listener notifications

func (*ListenableNode) GetListeners

func (ln *ListenableNode) GetListeners() []NodeListener

GetListeners returns a copy of the current listeners

func (*ListenableNode) NotifyListeners

func (ln *ListenableNode) NotifyListeners(ctx context.Context, event NodeEvent, state interface{}, err error)

NotifyListeners notifies all listeners of an event

func (*ListenableNode) RemoveListener

func (ln *ListenableNode) RemoveListener(listener NodeListener)

RemoveListener removes a listener from the node

type ListenableRunnable

type ListenableRunnable struct {
	// contains filtered or unexported fields
}

ListenableRunnable wraps a Runnable with listener capabilities

func (*ListenableRunnable) GetGraph

func (lr *ListenableRunnable) GetGraph() *Exporter

GetGraph returns an Exporter for visualization

func (*ListenableRunnable) Invoke

func (lr *ListenableRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

Invoke executes the graph with listener notifications

func (*ListenableRunnable) InvokeWithConfig

func (lr *ListenableRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)

InvokeWithConfig executes the graph with listener notifications and config

type ListenableStateGraph

type ListenableStateGraph struct {
	*StateGraph
	// contains filtered or unexported fields
}

ListenableStateGraph extends StateGraph with listener capabilities

func NewListenableStateGraph

func NewListenableStateGraph() *ListenableStateGraph

NewListenableStateGraph creates a state graph with listener support

func (*ListenableStateGraph) AddListener

func (g *ListenableStateGraph) AddListener(listener EventListener)

AddListener adds an event listener to the graph

type LogLevel

type LogLevel int

LogLevel defines logging levels

// Available LogLevel values, numbered from 0 via iota in the order listed, so
// LogLevelDebug < LogLevelInfo < LogLevelWarn < LogLevelError and
// LogLevelDebug is the zero value. LoggingListener.WithLogLevel takes one of
// these as the minimum level.
const (
	// LogLevelDebug has value 0.
	LogLevelDebug LogLevel = iota
	// LogLevelInfo has value 1.
	LogLevelInfo
	// LogLevelWarn has value 2.
	LogLevelWarn
	// LogLevelError has value 3.
	LogLevelError
)

type LoggingListener

type LoggingListener struct {
	// contains filtered or unexported fields
}

LoggingListener provides structured logging for node events

func NewLoggingListener

func NewLoggingListener() *LoggingListener

NewLoggingListener creates a new logging listener

func NewLoggingListenerWithLogger

func NewLoggingListenerWithLogger(logger *log.Logger) *LoggingListener

NewLoggingListenerWithLogger creates a logging listener with custom logger

func (*LoggingListener) OnNodeEvent

func (ll *LoggingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, err error)

OnNodeEvent implements the NodeListener interface

func (*LoggingListener) WithLogLevel

func (ll *LoggingListener) WithLogLevel(level LogLevel) *LoggingListener

WithLogLevel sets the minimum log level

func (*LoggingListener) WithState

func (ll *LoggingListener) WithState(enabled bool) *LoggingListener

WithState enables or disables state logging

type MapReduceNode

type MapReduceNode struct {
	// contains filtered or unexported fields
}

MapReduceNode executes nodes in parallel and reduces results

func NewMapReduceNode

func NewMapReduceNode(name string, reducer func([]interface{}) (interface{}, error), mapNodes ...Node) *MapReduceNode

NewMapReduceNode creates a new map-reduce node

func (*MapReduceNode) Execute

func (mr *MapReduceNode) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs map nodes in parallel and reduces results

type MapSchema

type MapSchema struct {
	// Reducers maps a state key to the Reducer used when that key is merged
	// by Update. Entries are added by RegisterReducer and RegisterChannel.
	Reducers      map[string]Reducer
	// EphemeralKeys marks the state keys that Cleanup removes from the
	// state. Entries are added through RegisterChannel's isEphemeral flag.
	EphemeralKeys map[string]bool
}

MapSchema implements StateSchema for map[string]interface{}. It allows defining reducers for specific keys.

func NewMapSchema

func NewMapSchema() *MapSchema

NewMapSchema creates a new MapSchema.

func (*MapSchema) Cleanup

func (s *MapSchema) Cleanup(state interface{}) interface{}

Cleanup removes ephemeral keys from the state.

func (*MapSchema) Init

func (s *MapSchema) Init() interface{}

Init returns an empty map.

func (*MapSchema) RegisterChannel

func (s *MapSchema) RegisterChannel(key string, reducer Reducer, isEphemeral bool)

RegisterChannel adds a channel definition (reducer + ephemeral flag).

func (*MapSchema) RegisterReducer

func (s *MapSchema) RegisterReducer(key string, reducer Reducer)

RegisterReducer adds a reducer for a specific key.

func (*MapSchema) Update

func (s *MapSchema) Update(current, new interface{}) (interface{}, error)

Update merges the new map into the current map using registered reducers.

type MemoryCheckpointStore

type MemoryCheckpointStore struct {
	// contains filtered or unexported fields
}

MemoryCheckpointStore provides in-memory checkpoint storage

func NewMemoryCheckpointStore

func NewMemoryCheckpointStore() *MemoryCheckpointStore

NewMemoryCheckpointStore creates a new in-memory checkpoint store

func (*MemoryCheckpointStore) Clear

func (m *MemoryCheckpointStore) Clear(_ context.Context, executionID string) error

Clear implements CheckpointStore interface

func (*MemoryCheckpointStore) Delete

func (m *MemoryCheckpointStore) Delete(_ context.Context, checkpointID string) error

Delete implements CheckpointStore interface

func (*MemoryCheckpointStore) List

func (m *MemoryCheckpointStore) List(_ context.Context, executionID string) ([]*Checkpoint, error)

List implements CheckpointStore interface

func (*MemoryCheckpointStore) Load

func (m *MemoryCheckpointStore) Load(_ context.Context, checkpointID string) (*Checkpoint, error)

Load implements CheckpointStore interface

func (*MemoryCheckpointStore) Save

func (m *MemoryCheckpointStore) Save(_ context.Context, checkpoint *Checkpoint) error

Save implements CheckpointStore interface

type MermaidOptions

type MermaidOptions struct {
	// Direction of the flowchart (e.g., "TD", "LR")
	Direction string
}

MermaidOptions defines configuration for Mermaid diagram generation

type MessageGraph

type MessageGraph struct {

	// Schema defines the state structure and update logic
	Schema StateSchema
	// contains filtered or unexported fields
}

MessageGraph represents a message graph.

Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/smallnest/langgraphgo/graph"
	"github.com/tmc/langchaingo/llms"
	"github.com/tmc/langchaingo/llms/openai"
)

// main builds a message graph with a single "oracle" node that forwards the
// conversation to an OpenAI chat model, wires it to graph.END, and runs it on
// one question.
func main() {
	// Without credentials the model cannot be called, so print the expected
	// transcript directly; this keeps the example's Output deterministic.
	if os.Getenv("OPENAI_API_KEY") == "" {
		fmt.Println("[{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]")
		return
	}

	model, err := openai.New()
	if err != nil {
		panic(err)
	}

	g := graph.NewMessageGraph()

	g.AddNode("oracle", func(ctx context.Context, state interface{}) (interface{}, error) {
		// State is untyped; use the comma-ok form so an unexpected type is
		// reported as a node error instead of panicking inside the graph.
		messages, ok := state.([]llms.MessageContent)
		if !ok {
			return nil, fmt.Errorf("oracle: unexpected state type %T", state)
		}
		r, err := model.GenerateContent(ctx, messages, llms.WithTemperature(0.0))
		if err != nil {
			return nil, err
		}
		// Guard the index below: an empty Choices slice would panic.
		if len(r.Choices) == 0 {
			return nil, fmt.Errorf("oracle: model returned %d choices", len(r.Choices))
		}
		return append(messages,
			llms.TextParts("ai", r.Choices[0].Content),
		), nil
	})
	// The END node passes the state through unchanged.
	g.AddNode(graph.END, func(_ context.Context, state interface{}) (interface{}, error) {
		return state, nil
	})

	g.AddEdge("oracle", graph.END)
	g.SetEntryPoint("oracle")

	runnable, err := g.Compile()
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	// Let's run it!
	res, err := runnable.Invoke(ctx, []llms.MessageContent{
		llms.TextParts("human", "What is 1 + 1?"),
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(res)
}
Output:
[{human [{What is 1 + 1?}]} {ai [{1 + 1 equals 2.}]}]

func NewMessageGraph

func NewMessageGraph() *MessageGraph

NewMessageGraph creates a new instance of MessageGraph.

func (*MessageGraph) AddConditionalEdge

func (g *MessageGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)

AddConditionalEdge adds a conditional edge where the target node is determined at runtime. The condition function receives the current state and returns the name of the next node.

func (*MessageGraph) AddEdge

func (g *MessageGraph) AddEdge(from, to string)

AddEdge adds a new edge to the message graph between the "from" and "to" nodes.

func (*MessageGraph) AddMapReduceNode

func (g *MessageGraph) AddMapReduceNode(
	name string,
	mapFunctions map[string]func(context.Context, interface{}) (interface{}, error),
	reducer func([]interface{}) (interface{}, error),
)

AddMapReduceNode adds a map-reduce pattern node

func (*MessageGraph) AddNestedConditionalSubgraph

func (g *MessageGraph) AddNestedConditionalSubgraph(
	name string,
	router func(interface{}) string,
	subgraphs map[string]*MessageGraph,
) error

AddNestedConditionalSubgraph creates a subgraph with its own conditional routing

func (*MessageGraph) AddNode

func (g *MessageGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error))

AddNode adds a new node to the message graph with the given name and function.

func (*MessageGraph) AddNodeWithCircuitBreaker

func (g *MessageGraph) AddNodeWithCircuitBreaker(
	name string,
	fn func(context.Context, interface{}) (interface{}, error),
	config CircuitBreakerConfig,
)

AddNodeWithCircuitBreaker adds a node with circuit breaker

func (*MessageGraph) AddNodeWithRateLimit

func (g *MessageGraph) AddNodeWithRateLimit(
	name string,
	fn func(context.Context, interface{}) (interface{}, error),
	maxCalls int,
	window time.Duration,
)

AddNodeWithRateLimit adds a node with rate limiting

func (*MessageGraph) AddNodeWithRetry

func (g *MessageGraph) AddNodeWithRetry(
	name string,
	fn func(context.Context, interface{}) (interface{}, error),
	config *RetryConfig,
)

AddNodeWithRetry adds a node with retry logic

func (*MessageGraph) AddNodeWithTimeout

func (g *MessageGraph) AddNodeWithTimeout(
	name string,
	fn func(context.Context, interface{}) (interface{}, error),
	timeout time.Duration,
)

AddNodeWithTimeout adds a node with timeout

func (*MessageGraph) AddParallelNodes

func (g *MessageGraph) AddParallelNodes(groupName string, nodes map[string]func(context.Context, interface{}) (interface{}, error))

AddParallelNodes adds a set of nodes that execute in parallel

func (*MessageGraph) AddRecursiveSubgraph

func (g *MessageGraph) AddRecursiveSubgraph(
	name string,
	maxDepth int,
	condition func(interface{}, int) bool,
	builder func(*MessageGraph),
)

AddRecursiveSubgraph adds a recursive subgraph to the parent graph

func (*MessageGraph) AddSubgraph

func (g *MessageGraph) AddSubgraph(name string, subgraph *MessageGraph) error

AddSubgraph adds a subgraph as a node in the parent graph

func (*MessageGraph) Compile

func (g *MessageGraph) Compile() (*Runnable, error)

Compile compiles the message graph and returns a Runnable instance. It returns an error if the entry point is not set.

func (*MessageGraph) CreateSubgraph

func (g *MessageGraph) CreateSubgraph(name string, builder func(*MessageGraph)) error

CreateSubgraph creates and adds a subgraph using a builder function

func (*MessageGraph) FanOutFanIn

func (g *MessageGraph) FanOutFanIn(
	source string,
	_ []string,
	collector string,
	workerFuncs map[string]func(context.Context, interface{}) (interface{}, error),
	collectFunc func([]interface{}) (interface{}, error),
)

FanOutFanIn creates a fan-out/fan-in pattern

func (*MessageGraph) SetEntryPoint

func (g *MessageGraph) SetEntryPoint(name string)

SetEntryPoint sets the entry point node name for the message graph.

func (*MessageGraph) SetSchema

func (g *MessageGraph) SetSchema(schema StateSchema)

SetSchema sets the state schema for the message graph.

func (*MessageGraph) SetStateMerger

func (g *MessageGraph) SetStateMerger(merger StateMerger)

SetStateMerger sets the state merger function for the message graph.

type MessageWithID

type MessageWithID interface {
	GetID() string
	GetContent() llms.MessageContent
}

MessageWithID is an interface that allows messages to have an ID for deduplication/upsert. Since langchaingo's MessageContent doesn't have an ID field, we can wrap it or use a custom struct. For now, we'll check if the message implements this interface or is a map with an "id" key.

type MetricsListener

type MetricsListener struct {
	// contains filtered or unexported fields
}

MetricsListener collects performance and execution metrics

func NewMetricsListener

func NewMetricsListener() *MetricsListener

NewMetricsListener creates a new metrics listener

func (*MetricsListener) GetNodeAverageDuration

func (ml *MetricsListener) GetNodeAverageDuration() map[string]time.Duration

GetNodeAverageDuration returns the average duration for each node

func (*MetricsListener) GetNodeErrors

func (ml *MetricsListener) GetNodeErrors() map[string]int

GetNodeErrors returns the number of errors for each node

func (*MetricsListener) GetNodeExecutions

func (ml *MetricsListener) GetNodeExecutions() map[string]int

GetNodeExecutions returns the number of executions for each node

func (*MetricsListener) GetTotalExecutions

func (ml *MetricsListener) GetTotalExecutions() int

GetTotalExecutions returns the total number of node executions

func (*MetricsListener) OnNodeEvent

func (ml *MetricsListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, _ interface{}, _ error)

OnNodeEvent implements the NodeListener interface

func (*MetricsListener) PrintSummary

func (ml *MetricsListener) PrintSummary(writer io.Writer)

PrintSummary prints a summary of collected metrics

func (*MetricsListener) Reset

func (ml *MetricsListener) Reset()

Reset clears all collected metrics

type NoOpCallbackHandler

type NoOpCallbackHandler struct{}

NoOpCallbackHandler provides a no-op implementation of CallbackHandler

func (*NoOpCallbackHandler) OnChainEnd

func (n *NoOpCallbackHandler) OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)

func (*NoOpCallbackHandler) OnChainError

func (n *NoOpCallbackHandler) OnChainError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnChainStart

func (n *NoOpCallbackHandler) OnChainStart(ctx context.Context, serialized map[string]interface{}, inputs map[string]interface{}, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

func (*NoOpCallbackHandler) OnLLMEnd

func (n *NoOpCallbackHandler) OnLLMEnd(ctx context.Context, response interface{}, runID string)

func (*NoOpCallbackHandler) OnLLMError

func (n *NoOpCallbackHandler) OnLLMError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnLLMStart

func (n *NoOpCallbackHandler) OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

func (*NoOpCallbackHandler) OnRetrieverEnd

func (n *NoOpCallbackHandler) OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)

func (*NoOpCallbackHandler) OnRetrieverError

func (n *NoOpCallbackHandler) OnRetrieverError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnRetrieverStart

func (n *NoOpCallbackHandler) OnRetrieverStart(ctx context.Context, serialized map[string]interface{}, query string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

func (*NoOpCallbackHandler) OnToolEnd

func (n *NoOpCallbackHandler) OnToolEnd(ctx context.Context, output string, runID string)

func (*NoOpCallbackHandler) OnToolError

func (n *NoOpCallbackHandler) OnToolError(ctx context.Context, err error, runID string)

func (*NoOpCallbackHandler) OnToolStart

func (n *NoOpCallbackHandler) OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

type Node

type Node struct {
	// Name is the unique identifier for the node.
	Name string

	// Function is the function associated with the node.
	// It takes a context and any state as input and returns the updated state and an error.
	Function func(ctx context.Context, state interface{}) (interface{}, error)
}

Node represents a node in the message graph.

type NodeEvent

type NodeEvent string

NodeEvent represents different types of node events

const (
	// NodeEventStart indicates a node has started execution
	NodeEventStart NodeEvent = "start"

	// NodeEventProgress indicates progress during node execution
	NodeEventProgress NodeEvent = "progress"

	// NodeEventComplete indicates a node has completed successfully
	NodeEventComplete NodeEvent = "complete"

	// NodeEventError indicates a node encountered an error
	NodeEventError NodeEvent = "error"

	// EventChainStart indicates the graph execution has started
	EventChainStart NodeEvent = "chain_start"

	// EventChainEnd indicates the graph execution has completed
	EventChainEnd NodeEvent = "chain_end"

	// EventToolStart indicates a tool execution has started
	EventToolStart NodeEvent = "tool_start"

	// EventToolEnd indicates a tool execution has completed
	EventToolEnd NodeEvent = "tool_end"

	// EventLLMStart indicates an LLM call has started
	EventLLMStart NodeEvent = "llm_start"

	// EventLLMEnd indicates an LLM call has completed
	EventLLMEnd NodeEvent = "llm_end"

	// EventToken indicates a generated token (for streaming)
	EventToken NodeEvent = "token"

	// EventCustom indicates a custom user-defined event
	EventCustom NodeEvent = "custom"
)

type NodeInterrupt

type NodeInterrupt struct {
	// Node is the name of the node that triggered the interrupt
	Node string
	// Value is the data/query provided by the interrupt
	Value interface{}
}

NodeInterrupt is returned when a node requests an interrupt (e.g. waiting for human input).

func (*NodeInterrupt) Error

func (e *NodeInterrupt) Error() string

type NodeListener

type NodeListener interface {
	// OnNodeEvent is called when a node event occurs
	OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)
}

NodeListener defines the interface for node event listeners

type NodeListenerFunc

type NodeListenerFunc func(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)

NodeListenerFunc is a function adapter for NodeListener

func (NodeListenerFunc) OnNodeEvent

func (f NodeListenerFunc) OnNodeEvent(ctx context.Context, event NodeEvent, nodeName string, state interface{}, err error)

OnNodeEvent implements the NodeListener interface

type ParallelNode

type ParallelNode struct {
	// contains filtered or unexported fields
}

ParallelNode represents a set of nodes that can execute in parallel

func NewParallelNode

func NewParallelNode(name string, nodes ...Node) *ParallelNode

NewParallelNode creates a new parallel node

func (*ParallelNode) Execute

func (pn *ParallelNode) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs all nodes in parallel and collects results

type ProgressListener

type ProgressListener struct {
	// contains filtered or unexported fields
}

ProgressListener provides progress tracking with customizable output

func NewProgressListener

func NewProgressListener() *ProgressListener

NewProgressListener creates a new progress listener

func NewProgressListenerWithWriter

func NewProgressListenerWithWriter(writer io.Writer) *ProgressListener

NewProgressListenerWithWriter creates a progress listener with a custom writer

func (*ProgressListener) OnNodeEvent

func (pl *ProgressListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, err error)

OnNodeEvent implements the NodeListener interface

func (*ProgressListener) SetNodeStep

func (pl *ProgressListener) SetNodeStep(nodeName, step string)

SetNodeStep sets a custom message for a specific node

func (*ProgressListener) WithDetails

func (pl *ProgressListener) WithDetails(enabled bool) *ProgressListener

WithDetails enables or disables detailed output

func (*ProgressListener) WithPrefix

func (pl *ProgressListener) WithPrefix(prefix string) *ProgressListener

WithPrefix sets a custom prefix for progress messages

func (*ProgressListener) WithTiming

func (pl *ProgressListener) WithTiming(enabled bool) *ProgressListener

WithTiming enables or disables timing information

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements rate limiting for nodes

func NewRateLimiter

func NewRateLimiter(node Node, maxCalls int, window time.Duration) *RateLimiter

NewRateLimiter creates a new rate limiter

func (*RateLimiter) Execute

func (rl *RateLimiter) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the node with rate limiting

type RecursiveSubgraph

type RecursiveSubgraph struct {
	// contains filtered or unexported fields
}

RecursiveSubgraph allows a subgraph to call itself recursively

func NewRecursiveSubgraph

func NewRecursiveSubgraph(
	name string,
	maxDepth int,
	condition func(interface{}, int) bool,
) *RecursiveSubgraph

NewRecursiveSubgraph creates a new recursive subgraph

func (*RecursiveSubgraph) Execute

func (rs *RecursiveSubgraph) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the recursive subgraph

type Reducer

type Reducer func(current, new interface{}) (interface{}, error)

Reducer defines how a state value should be updated. It takes the current value and the new value, and returns the merged value.

type RetryConfig

type RetryConfig struct {
	MaxAttempts     int
	InitialDelay    time.Duration
	MaxDelay        time.Duration
	BackoffFactor   float64
	RetryableErrors func(error) bool // Determines if an error should trigger retry
}

RetryConfig configures retry behavior for nodes

func DefaultRetryConfig

func DefaultRetryConfig() *RetryConfig

DefaultRetryConfig returns a default retry configuration

type RetryNode

type RetryNode struct {
	// contains filtered or unexported fields
}

RetryNode wraps a node with retry logic

func NewRetryNode

func NewRetryNode(node Node, config *RetryConfig) *RetryNode

NewRetryNode creates a new retry node

func (*RetryNode) Execute

func (rn *RetryNode) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the node with retry logic

type RetryPolicy

type RetryPolicy struct {
	MaxRetries      int
	BackoffStrategy BackoffStrategy
	RetryableErrors []string
}

RetryPolicy defines how to handle node failures

type Runnable

type Runnable struct {
	// contains filtered or unexported fields
}

Runnable represents a compiled message graph that can be invoked.

func (*Runnable) GetGraph

func (r *Runnable) GetGraph() *Exporter

GetGraph returns an Exporter for the compiled graph's visualization

func (*Runnable) Invoke

func (r *Runnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

Invoke executes the compiled message graph with the given input state. It returns the resulting state and an error if any occurs during the execution.

func (*Runnable) InvokeWithConfig

func (r *Runnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)

InvokeWithConfig executes the compiled message graph with the given input state and config. It returns the resulting state and an error if any occurs during the execution.

func (*Runnable) SetTracer

func (r *Runnable) SetTracer(tracer *Tracer)

SetTracer sets a tracer for observability

func (*Runnable) WithTracer

func (r *Runnable) WithTracer(tracer *Tracer) *Runnable

WithTracer returns a new Runnable with the given tracer

type StateGraph

type StateGraph struct {

	// Schema defines the state structure and update logic
	Schema StateSchema
	// contains filtered or unexported fields
}

StateGraph represents a state-based graph similar to Python's LangGraph StateGraph

func NewMessagesStateGraph

func NewMessagesStateGraph() *StateGraph

NewMessagesStateGraph creates a StateGraph with a default schema that handles "messages" using the AddMessages reducer. This is the recommended starting point for chat-based agents.

func NewStateGraph

func NewStateGraph() *StateGraph

NewStateGraph creates a new instance of StateGraph

func (*StateGraph) AddConditionalEdge

func (g *StateGraph) AddConditionalEdge(from string, condition func(ctx context.Context, state interface{}) string)

AddConditionalEdge adds a conditional edge where the target node is determined at runtime

func (*StateGraph) AddEdge

func (g *StateGraph) AddEdge(from, to string)

AddEdge adds a new edge to the state graph between the "from" and "to" nodes

func (*StateGraph) AddNode

func (g *StateGraph) AddNode(name string, fn func(ctx context.Context, state interface{}) (interface{}, error))

AddNode adds a new node to the state graph with the given name and function

func (*StateGraph) Compile

func (g *StateGraph) Compile() (*StateRunnable, error)

Compile compiles the state graph and returns a StateRunnable instance

func (*StateGraph) SetEntryPoint

func (g *StateGraph) SetEntryPoint(name string)

SetEntryPoint sets the entry point node name for the state graph

func (*StateGraph) SetRetryPolicy

func (g *StateGraph) SetRetryPolicy(policy *RetryPolicy)

SetRetryPolicy sets the retry policy for the graph

func (*StateGraph) SetSchema

func (g *StateGraph) SetSchema(schema StateSchema)

SetSchema sets the state schema for the graph

func (*StateGraph) SetStateMerger

func (g *StateGraph) SetStateMerger(merger StateMerger)

SetStateMerger sets the state merger function for the state graph

type StateMerger

type StateMerger func(ctx context.Context, currentState interface{}, newStates []interface{}) (interface{}, error)

StateMerger merges multiple state updates into a single state.

type StateRunnable

type StateRunnable struct {
	// contains filtered or unexported fields
}

StateRunnable represents a compiled state graph that can be invoked

func (*StateRunnable) Invoke

func (r *StateRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

Invoke executes the compiled state graph with the given input state

func (*StateRunnable) InvokeWithConfig

func (r *StateRunnable) InvokeWithConfig(ctx context.Context, initialState interface{}, config *Config) (interface{}, error)

InvokeWithConfig executes the compiled state graph with the given input state and config

type StateSchema

type StateSchema interface {
	// Init returns the initial state.
	Init() interface{}

	// Update merges the new state into the current state.
	Update(current, new interface{}) (interface{}, error)
}

StateSchema defines the structure and update logic for the graph state.

type StateSnapshot

type StateSnapshot struct {
	Values    interface{}
	Next      []string
	Config    Config
	Metadata  map[string]interface{}
	CreatedAt time.Time
	ParentID  string
}

StateSnapshot represents a snapshot of the graph state

type StreamConfig

type StreamConfig struct {
	// BufferSize is the size of the event channel buffer
	BufferSize int

	// EnableBackpressure determines if backpressure handling is enabled
	EnableBackpressure bool

	// MaxDroppedEvents is the maximum number of events to drop before logging
	MaxDroppedEvents int

	// Mode specifies what kind of events to stream
	Mode StreamMode
}

StreamConfig configures streaming behavior

func DefaultStreamConfig

func DefaultStreamConfig() StreamConfig

DefaultStreamConfig returns the default streaming configuration

type StreamEvent

type StreamEvent struct {
	// Timestamp when the event occurred
	Timestamp time.Time

	// NodeName is the name of the node that generated the event
	NodeName string

	// Event is the type of event
	Event NodeEvent

	// State is the current state at the time of the event
	State interface{}

	// Error contains any error that occurred (if Event is NodeEventError)
	Error error

	// Metadata contains additional event-specific data
	Metadata map[string]interface{}

	// Duration is how long the node took (only for Complete events)
	Duration time.Duration
}

StreamEvent represents an event in the streaming execution

type StreamMode

type StreamMode string

StreamMode defines the mode of streaming

const (
	// StreamModeValues emits the full state after each step
	StreamModeValues StreamMode = "values"
	// StreamModeUpdates emits the updates (deltas) from each node
	StreamModeUpdates StreamMode = "updates"
	// StreamModeMessages emits LLM messages/tokens (if available)
	StreamModeMessages StreamMode = "messages"
	// StreamModeDebug emits all events (default)
	StreamModeDebug StreamMode = "debug"
)

type StreamResult

type StreamResult struct {
	// Events channel receives StreamEvent objects in real-time
	Events <-chan StreamEvent

	// Result channel receives the final result when execution completes
	Result <-chan interface{}

	// Errors channel receives any errors that occur during execution
	Errors <-chan error

	// Done channel is closed when streaming is complete
	Done <-chan struct{}

	// Cancel function can be called to stop streaming
	Cancel context.CancelFunc
}

StreamResult contains the channels returned by streaming execution

type StreamingExecutor

type StreamingExecutor struct {
	// contains filtered or unexported fields
}

StreamingExecutor provides a high-level interface for streaming execution

func NewStreamingExecutor

func NewStreamingExecutor(runnable *StreamingRunnable) *StreamingExecutor

NewStreamingExecutor creates a new streaming executor

func (*StreamingExecutor) ExecuteAsync

func (se *StreamingExecutor) ExecuteAsync(ctx context.Context, initialState interface{}) *StreamResult

ExecuteAsync executes the graph asynchronously and returns immediately

func (*StreamingExecutor) ExecuteWithCallback

func (se *StreamingExecutor) ExecuteWithCallback(
	ctx context.Context,
	initialState interface{},
	eventCallback func(event StreamEvent),
	resultCallback func(result interface{}, err error),
) error

ExecuteWithCallback executes the graph and calls the callback for each event

type StreamingListener

type StreamingListener struct {
	// contains filtered or unexported fields
}

StreamingListener implements NodeListener for streaming events

func NewStreamingListener

func NewStreamingListener(eventChan chan<- StreamEvent, config StreamConfig) *StreamingListener

NewStreamingListener creates a new streaming listener

func (*StreamingListener) Close

func (sl *StreamingListener) Close()

Close marks the listener as closed to prevent sending to closed channels

func (*StreamingListener) GetDroppedEventsCount

func (sl *StreamingListener) GetDroppedEventsCount() int

GetDroppedEventsCount returns the number of dropped events

func (*StreamingListener) OnChainEnd

func (sl *StreamingListener) OnChainEnd(ctx context.Context, outputs map[string]interface{}, runID string)

func (*StreamingListener) OnChainError

func (sl *StreamingListener) OnChainError(ctx context.Context, err error, runID string)

func (*StreamingListener) OnChainStart

func (sl *StreamingListener) OnChainStart(ctx context.Context, serialized map[string]interface{}, inputs map[string]interface{}, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

func (*StreamingListener) OnGraphStep

func (sl *StreamingListener) OnGraphStep(ctx context.Context, stepNode string, state interface{})

OnGraphStep implements GraphCallbackHandler

func (*StreamingListener) OnLLMEnd

func (sl *StreamingListener) OnLLMEnd(ctx context.Context, response interface{}, runID string)

func (*StreamingListener) OnLLMError

func (sl *StreamingListener) OnLLMError(ctx context.Context, err error, runID string)

func (*StreamingListener) OnLLMStart

func (sl *StreamingListener) OnLLMStart(ctx context.Context, serialized map[string]interface{}, prompts []string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

func (*StreamingListener) OnNodeEvent

func (sl *StreamingListener) OnNodeEvent(_ context.Context, event NodeEvent, nodeName string, state interface{}, err error)

OnNodeEvent implements the NodeListener interface

func (*StreamingListener) OnRetrieverEnd

func (sl *StreamingListener) OnRetrieverEnd(ctx context.Context, documents []interface{}, runID string)

func (*StreamingListener) OnRetrieverError

func (sl *StreamingListener) OnRetrieverError(ctx context.Context, err error, runID string)

func (*StreamingListener) OnRetrieverStart

func (sl *StreamingListener) OnRetrieverStart(ctx context.Context, serialized map[string]interface{}, query string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

func (*StreamingListener) OnToolEnd

func (sl *StreamingListener) OnToolEnd(ctx context.Context, output string, runID string)

func (*StreamingListener) OnToolError

func (sl *StreamingListener) OnToolError(ctx context.Context, err error, runID string)

func (*StreamingListener) OnToolStart

func (sl *StreamingListener) OnToolStart(ctx context.Context, serialized map[string]interface{}, inputStr string, runID string, parentRunID *string, tags []string, metadata map[string]interface{})

type StreamingMessageGraph

type StreamingMessageGraph struct {
	*ListenableMessageGraph
	// contains filtered or unexported fields
}

StreamingMessageGraph extends ListenableMessageGraph with streaming capabilities

func NewStreamingMessageGraph

func NewStreamingMessageGraph() *StreamingMessageGraph

NewStreamingMessageGraph creates a new streaming message graph

func NewStreamingMessageGraphWithConfig

func NewStreamingMessageGraphWithConfig(config StreamConfig) *StreamingMessageGraph

NewStreamingMessageGraphWithConfig creates a streaming graph with a custom config

func (*StreamingMessageGraph) CompileStreaming

func (g *StreamingMessageGraph) CompileStreaming() (*StreamingRunnable, error)

CompileStreaming compiles the graph into a streaming runnable

func (*StreamingMessageGraph) GetStreamConfig

func (g *StreamingMessageGraph) GetStreamConfig() StreamConfig

GetStreamConfig returns the current streaming configuration

func (*StreamingMessageGraph) SetStreamConfig

func (g *StreamingMessageGraph) SetStreamConfig(config StreamConfig)

SetStreamConfig updates the streaming configuration

type StreamingRunnable

type StreamingRunnable struct {
	// contains filtered or unexported fields
}

StreamingRunnable wraps a ListenableRunnable with streaming capabilities

func NewStreamingRunnable

func NewStreamingRunnable(runnable *ListenableRunnable, config StreamConfig) *StreamingRunnable

NewStreamingRunnable creates a new streaming runnable

func NewStreamingRunnableWithDefaults

func NewStreamingRunnableWithDefaults(runnable *ListenableRunnable) *StreamingRunnable

NewStreamingRunnableWithDefaults creates a streaming runnable with the default config

func (*StreamingRunnable) GetGraph

func (sr *StreamingRunnable) GetGraph() *Exporter

GetGraph returns an Exporter for the streaming runnable

func (*StreamingRunnable) Stream

func (sr *StreamingRunnable) Stream(ctx context.Context, initialState interface{}) *StreamResult

Stream executes the graph with real-time event streaming

type Subgraph

type Subgraph struct {
	// contains filtered or unexported fields
}

Subgraph represents a nested graph that can be used as a node

func NewSubgraph

func NewSubgraph(name string, graph *MessageGraph) (*Subgraph, error)

NewSubgraph creates a new subgraph

func (*Subgraph) Execute

func (s *Subgraph) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the subgraph as a node

type TimeoutNode

type TimeoutNode struct {
	// contains filtered or unexported fields
}

TimeoutNode wraps a node with timeout logic

func NewTimeoutNode

func NewTimeoutNode(node Node, timeout time.Duration) *TimeoutNode

NewTimeoutNode creates a new timeout node

func (*TimeoutNode) Execute

func (tn *TimeoutNode) Execute(ctx context.Context, state interface{}) (interface{}, error)

Execute runs the node with a timeout

type TraceEvent

type TraceEvent string

TraceEvent represents different types of events in graph execution

const (
	// TraceEventGraphStart indicates the start of graph execution
	TraceEventGraphStart TraceEvent = "graph_start"

	// TraceEventGraphEnd indicates the end of graph execution
	TraceEventGraphEnd TraceEvent = "graph_end"

	// TraceEventNodeStart indicates the start of node execution
	TraceEventNodeStart TraceEvent = "node_start"

	// TraceEventNodeEnd indicates the end of node execution
	TraceEventNodeEnd TraceEvent = "node_end"

	// TraceEventNodeError indicates an error occurred in node execution
	TraceEventNodeError TraceEvent = "node_error"

	// TraceEventEdgeTraversal indicates traversal from one node to another
	TraceEventEdgeTraversal TraceEvent = "edge_traversal"
)

type TraceHook

type TraceHook interface {
	// OnEvent is called when a trace event occurs
	OnEvent(ctx context.Context, span *TraceSpan)
}

TraceHook defines the interface for trace event handlers

type TraceHookFunc

type TraceHookFunc func(ctx context.Context, span *TraceSpan)

TraceHookFunc is a function adapter for TraceHook

func (TraceHookFunc) OnEvent

func (f TraceHookFunc) OnEvent(ctx context.Context, span *TraceSpan)

OnEvent implements the TraceHook interface

type TraceSpan

type TraceSpan struct {
	// ID is a unique identifier for this span
	ID string

	// ParentID is the ID of the parent span (empty for root spans)
	ParentID string

	// Event indicates the type of event this span represents
	Event TraceEvent

	// NodeName is the name of the node being executed (if applicable)
	NodeName string

	// FromNode is the source node for edge traversals
	FromNode string

	// ToNode is the destination node for edge traversals
	ToNode string

	// StartTime is when this span began
	StartTime time.Time

	// EndTime is when this span completed (zero for ongoing spans)
	EndTime time.Time

	// Duration is the total time taken (calculated when span ends)
	Duration time.Duration

	// State is a snapshot of the state at this point (optional)
	State interface{}

	// Error contains any error that occurred during execution
	Error error

	// Metadata contains additional key-value pairs for observability
	Metadata map[string]interface{}
}

TraceSpan represents a span of execution with timing and metadata

func SpanFromContext

func SpanFromContext(ctx context.Context) *TraceSpan

SpanFromContext extracts a span from context

type TracedRunnable

type TracedRunnable struct {
	*Runnable
	// contains filtered or unexported fields
}

TracedRunnable wraps a Runnable with tracing capabilities

func NewTracedRunnable

func NewTracedRunnable(runnable *Runnable, tracer *Tracer) *TracedRunnable

NewTracedRunnable creates a new traced runnable

func (*TracedRunnable) GetTracer

func (tr *TracedRunnable) GetTracer() *Tracer

GetTracer returns the tracer instance

func (*TracedRunnable) Invoke

func (tr *TracedRunnable) Invoke(ctx context.Context, initialState interface{}) (interface{}, error)

Invoke executes the graph with tracing enabled

type Tracer

type Tracer struct {
	// contains filtered or unexported fields
}

Tracer manages trace collection and hooks

func NewTracer

func NewTracer() *Tracer

NewTracer creates a new tracer instance

func (*Tracer) AddHook

func (t *Tracer) AddHook(hook TraceHook)

AddHook registers a new trace hook

func (*Tracer) Clear

func (t *Tracer) Clear()

Clear removes all collected spans

func (*Tracer) EndSpan

func (t *Tracer) EndSpan(ctx context.Context, span *TraceSpan, state interface{}, err error)

EndSpan completes a trace span

func (*Tracer) GetSpans

func (t *Tracer) GetSpans() map[string]*TraceSpan

GetSpans returns all collected spans

func (*Tracer) StartSpan

func (t *Tracer) StartSpan(ctx context.Context, event TraceEvent, nodeName string) *TraceSpan

StartSpan creates a new trace span

func (*Tracer) TraceEdgeTraversal

func (t *Tracer) TraceEdgeTraversal(ctx context.Context, fromNode, toNode string)

TraceEdgeTraversal records an edge traversal event

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL