Documentation
¶
Overview ¶
Package core provides the fundamental building blocks for creating and executing graph-based workflows in GoLangGraph.
The core package implements a graph execution engine that allows you to define workflows as directed graphs where nodes represent computational units and edges define the flow of execution. This package is the foundation of the GoLangGraph framework and provides the essential abstractions for building AI agent workflows.
Graph Execution Model ¶
The core execution model revolves around two main concepts:
- Graph: A directed graph structure containing nodes and edges that defines the workflow topology
- BaseState: A thread-safe state container that carries data between nodes during execution
Basic Usage ¶
Creating and executing a simple graph:
graph := core.NewGraph("my-workflow")
// Add nodes with processing functions
graph.AddNode("start", "Start Node", func(ctx context.Context, state *core.BaseState) (*core.BaseState, error) {
state.Set("message", "Hello, World!")
return state, nil
})
graph.AddNode("end", "End Node", func(ctx context.Context, state *core.BaseState) (*core.BaseState, error) {
message, _ := state.Get("message")
fmt.Println(message)
return state, nil
})
// Connect nodes
graph.AddEdge("start", "end", nil)
graph.SetStartNode("start")
graph.AddEndNode("end")
// Execute the graph
initialState := core.NewBaseState()
ctx := context.Background()
finalState, err := graph.Execute(ctx, initialState)
Conditional Execution ¶
Graphs support conditional edges that determine the next node based on the current state:
graph.AddEdge("decision", "path_a", func(ctx context.Context, state *core.BaseState) (string, error) {
if condition, _ := state.Get("condition"); condition == "A" {
return "path_a", nil
}
return "path_b", nil
})
State Management ¶
The BaseState provides thread-safe access to workflow data:
state := core.NewBaseState()
state.Set("key", "value")
value, exists := state.Get("key")
state.SetMetadata("execution_id", "12345")
// Clone state for parallel processing
clonedState := state.Clone()
// Merge states from parallel branches
state.Merge(clonedState)
Streaming Execution ¶
For long-running workflows, use streaming execution to receive intermediate results:
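// Stream returns a receive-only channel of per-node execution results.
results := graph.Stream()
go func() {
// NOTE: the loop ends only when the channel is closed; presumably Graph.Close does this — confirm.
// NOTE: streaming may also require GraphConfig.EnableStreaming to be set — confirm.
for result := range results {
// Process intermediate result, e.g. result.NodeID and result.State
}
}()
finalState, err := graph.Execute(ctx, initialState)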
Error Handling ¶
The package provides comprehensive error handling with automatic retries and graceful degradation:
- Node execution errors are wrapped with context information
- Validation errors prevent invalid graph configurations
- Timeout handling for long-running operations
- Interrupt support for graceful cancellation
Thread Safety ¶
All core types are designed to be thread-safe:
- BaseState uses read-write mutexes for concurrent access
- Graph execution supports parallel node processing
- State cloning enables safe parallel branches
Performance Considerations ¶
The core package is optimized for performance:
- Minimal memory allocation during execution
- Efficient state management with copy-on-write semantics
- Lazy evaluation of conditional edges
- Configurable retry policies and timeouts
For more advanced usage patterns and integration with other GoLangGraph packages, see the examples in the examples/ directory and the comprehensive documentation in the docs/ directory.
Index ¶
- Constants
- func RouteByMessageType(ctx context.Context, state *BaseState) (string, error)
- func RouteByToolCalls(ctx context.Context, state *BaseState) (string, error)
- type BaseState
- func (bs *BaseState) Clone() *BaseState
- func (bs *BaseState) CreateSnapshot() StateSnapshot
- func (bs *BaseState) Delete(key string)
- func (bs *BaseState) FromJSON(data []byte) error
- func (bs *BaseState) Get(key string) (StateValue, bool)
- func (bs *BaseState) GetAll() map[string]StateValue
- func (bs *BaseState) GetHistory() *StateHistory
- func (bs *BaseState) GetMetadata(key string) (interface{}, bool)
- func (bs *BaseState) Keys() []string
- func (bs *BaseState) Merge(other *BaseState)
- func (bs *BaseState) RestoreFromSnapshot(snapshot StateSnapshot)
- func (bs *BaseState) Set(key string, value StateValue)
- func (bs *BaseState) SetMetadata(key string, value interface{})
- func (bs *BaseState) ToJSON() ([]byte, error)
- type ConditionalEdge
- type ConditionalRouter
- type Edge
- type EdgeCondition
- type ExecutionResult
- type Graph
- func (g *Graph) AddConditionalEdges(from string, condition EdgeCondition, routes map[string]string) error
- func (g *Graph) AddEdge(from, to string, condition EdgeCondition) *Edge
- func (g *Graph) AddEndNode(nodeID string) error
- func (g *Graph) AddNode(id, name string, fn NodeFunc) *Node
- func (g *Graph) Close()
- func (g *Graph) Execute(ctx context.Context, initialState *BaseState) (*BaseState, error)
- func (g *Graph) ExecuteParallel(ctx context.Context, nodeIDs []string, state *BaseState) (map[string]*ExecutionResult, error)
- func (g *Graph) GetConditionalEdge(nodeID string) (*ConditionalEdge, bool)
- func (g *Graph) GetCurrentState() *BaseState
- func (g *Graph) GetExecutionHistory() []*ExecutionResult
- func (g *Graph) GetNextNodes(ctx context.Context, currentNodeID string, state *BaseState) ([]string, error)
- func (g *Graph) GetNodesByType(nodeType string) []*Node
- func (g *Graph) GetTopology() map[string][]string
- func (g *Graph) Interrupt()
- func (g *Graph) IsEndNode(nodeID string) bool
- func (g *Graph) IsRunning() bool
- func (g *Graph) IsStartNode(nodeID string) bool
- func (g *Graph) Reset()
- func (g *Graph) SetStartNode(nodeID string) error
- func (g *Graph) Stream() <-chan *ExecutionResult
- func (g *Graph) Validate() error
- type GraphConfig
- type Node
- type NodeFunc
- type RouterFunction
- func RouteByCondition(conditionKey string, trueRoute string, falseRoute string) RouterFunction
- func RouteByCounter(counterKey string, maxCount int, continueRoute string, exitRoute string) RouterFunction
- func RouteByStateValue(key string, routes map[interface{}]string, defaultRoute string) RouterFunction
- type StateHistory
- type StateManager
- type StateSnapshot
- type StateValue
Constants ¶
const ( START = "__start__" END = "__end__" )
START and END are the constants used for graph flow control.
Variables ¶
This section is empty.
Functions ¶
func RouteByMessageType ¶
func RouteByMessageType(ctx context.Context, state *BaseState) (string, error)
RouteByMessageType routes based on the type of the last message
Types ¶
type BaseState ¶
type BaseState struct {
// contains filtered or unexported fields
}
BaseState is the thread-safe state container that carries data between nodes during execution
func (*BaseState) CreateSnapshot ¶
func (bs *BaseState) CreateSnapshot() StateSnapshot
CreateSnapshot creates a snapshot of the current state
func (*BaseState) Get ¶
func (bs *BaseState) Get(key string) (StateValue, bool)
Get retrieves a value from the state
func (*BaseState) GetAll ¶
func (bs *BaseState) GetAll() map[string]StateValue
GetAll returns a copy of all data in the state
func (*BaseState) GetHistory ¶
func (bs *BaseState) GetHistory() *StateHistory
GetHistory returns the state history
func (*BaseState) GetMetadata ¶
func (bs *BaseState) GetMetadata(key string) (interface{}, bool)
GetMetadata retrieves metadata from the state
func (*BaseState) RestoreFromSnapshot ¶
func (bs *BaseState) RestoreFromSnapshot(snapshot StateSnapshot)
RestoreFromSnapshot restores the state from a snapshot
func (*BaseState) Set ¶
func (bs *BaseState) Set(key string, value StateValue)
Set sets a value in the state
func (*BaseState) SetMetadata ¶
func (bs *BaseState) SetMetadata(key string, value interface{})
SetMetadata sets metadata for the state
type ConditionalEdge ¶
type ConditionalEdge struct {
ID string `json:"id"`
From string `json:"from"`
Condition EdgeCondition `json:"-"`
Routes map[string]string `json:"routes"` // condition result -> target node
Metadata map[string]interface{} `json:"metadata"`
}
ConditionalEdge represents a conditional edge that can route to different nodes
type ConditionalRouter ¶
type ConditionalRouter struct {
// contains filtered or unexported fields
}
ConditionalRouter manages conditional routing logic
func NewConditionalRouter ¶
func NewConditionalRouter(fallback string) *ConditionalRouter
NewConditionalRouter creates a new conditional router
func (*ConditionalRouter) AddRoute ¶
func (cr *ConditionalRouter) AddRoute(condition string, router RouterFunction)
AddRoute adds a route with a condition
type Edge ¶
type Edge struct {
ID string `json:"id"`
From string `json:"from"`
To string `json:"to"`
Condition EdgeCondition `json:"-"`
Metadata map[string]interface{} `json:"metadata"`
}
Edge represents an edge in the graph
type EdgeCondition ¶
EdgeCondition represents a condition function for conditional edges
type ExecutionResult ¶
type ExecutionResult struct {
NodeID string `json:"node_id"`
Success bool `json:"success"`
Error error `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Timestamp time.Time `json:"timestamp"`
State *BaseState `json:"state,omitempty"`
}
ExecutionResult represents the result of node execution
type Graph ¶
type Graph struct {
ID string `json:"id"`
Name string `json:"name"`
Nodes map[string]*Node `json:"nodes"`
Edges map[string]*Edge `json:"edges"`
StartNode string `json:"start_node"`
EndNodes []string `json:"end_nodes"`
Config *GraphConfig `json:"config"`
Metadata map[string]interface{} `json:"metadata"`
// contains filtered or unexported fields
}
Graph represents the execution graph
func (*Graph) AddConditionalEdges ¶
func (g *Graph) AddConditionalEdges(from string, condition EdgeCondition, routes map[string]string) error
AddConditionalEdges adds conditional edges to the graph
func (*Graph) AddEdge ¶
func (g *Graph) AddEdge(from, to string, condition EdgeCondition) *Edge
AddEdge adds an edge to the graph
func (*Graph) AddEndNode ¶
func (g *Graph) AddEndNode(nodeID string) error
AddEndNode adds an end node to the graph
func (*Graph) ExecuteParallel ¶
func (g *Graph) ExecuteParallel(ctx context.Context, nodeIDs []string, state *BaseState) (map[string]*ExecutionResult, error)
ExecuteParallel executes multiple nodes in parallel (for super-step execution)
func (*Graph) GetConditionalEdge ¶
func (g *Graph) GetConditionalEdge(nodeID string) (*ConditionalEdge, bool)
GetConditionalEdge retrieves a conditional edge for a node
func (*Graph) GetCurrentState ¶
func (g *Graph) GetCurrentState() *BaseState
GetCurrentState returns the current state
func (*Graph) GetExecutionHistory ¶
func (g *Graph) GetExecutionHistory() []*ExecutionResult
GetExecutionHistory returns the execution history
func (*Graph) GetNextNodes ¶
func (g *Graph) GetNextNodes(ctx context.Context, currentNodeID string, state *BaseState) ([]string, error)
GetNextNodes determines the next nodes to execute based on current node and state
func (*Graph) GetNodesByType ¶
func (g *Graph) GetNodesByType(nodeType string) []*Node
GetNodesByType returns nodes filtered by metadata type
func (*Graph) GetTopology ¶
func (g *Graph) GetTopology() map[string][]string
GetTopology returns the graph topology as adjacency list
func (*Graph) IsStartNode ¶
func (g *Graph) IsStartNode(nodeID string) bool
IsStartNode checks if a node is the start node
func (*Graph) SetStartNode ¶
func (g *Graph) SetStartNode(nodeID string) error
SetStartNode sets the starting node for execution
func (*Graph) Stream ¶
func (g *Graph) Stream() <-chan *ExecutionResult
Stream returns a channel for streaming execution results
type GraphConfig ¶
type GraphConfig struct {
MaxIterations int `json:"max_iterations"`
Timeout time.Duration `json:"timeout"`
EnableStreaming bool `json:"enable_streaming"`
EnableCheckpoints bool `json:"enable_checkpoints"`
ParallelExecution bool `json:"parallel_execution"`
RetryAttempts int `json:"retry_attempts"`
RetryDelay time.Duration `json:"retry_delay"`
}
GraphConfig represents configuration for graph execution
func DefaultGraphConfig ¶
func DefaultGraphConfig() *GraphConfig
DefaultGraphConfig returns default configuration
type Node ¶
type Node struct {
ID string `json:"id"`
Name string `json:"name"`
Function NodeFunc `json:"-"`
Metadata map[string]interface{} `json:"metadata"`
}
Node represents a node in the graph
type RouterFunction ¶
RouterFunction represents a function that determines the next node based on state
func RouteByCondition ¶
func RouteByCondition(conditionKey string, trueRoute string, falseRoute string) RouterFunction
RouteByCondition routes based on a boolean condition in state
func RouteByCounter ¶
func RouteByCounter(counterKey string, maxCount int, continueRoute string, exitRoute string) RouterFunction
RouteByCounter routes based on a counter value
func RouteByStateValue ¶
func RouteByStateValue(key string, routes map[interface{}]string, defaultRoute string) RouterFunction
RouteByStateValue routes based on a specific state value
type StateHistory ¶
type StateHistory struct {
// contains filtered or unexported fields
}
StateHistory manages the history of state changes
func NewStateHistory ¶
func NewStateHistory(maxSize int) *StateHistory
NewStateHistory creates a new state history with a maximum size
func (*StateHistory) AddSnapshot ¶
func (sh *StateHistory) AddSnapshot(snapshot StateSnapshot)
AddSnapshot adds a new snapshot to the history
func (*StateHistory) GetSnapshot ¶
func (sh *StateHistory) GetSnapshot(id string) (*StateSnapshot, error)
GetSnapshot returns a specific snapshot by ID
func (*StateHistory) GetSnapshots ¶
func (sh *StateHistory) GetSnapshots() []StateSnapshot
GetSnapshots returns all snapshots in the history
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
StateManager manages multiple states and provides advanced operations
func NewStateManager ¶
func NewStateManager() *StateManager
NewStateManager creates a new state manager
func (*StateManager) CreateState ¶
func (sm *StateManager) CreateState(id string) *BaseState
CreateState creates a new state with the given ID
func (*StateManager) DeleteState ¶
func (sm *StateManager) DeleteState(id string)
DeleteState removes a state by ID
func (*StateManager) GetState ¶
func (sm *StateManager) GetState(id string) (*BaseState, bool)
GetState retrieves a state by ID
func (*StateManager) ListStates ¶
func (sm *StateManager) ListStates() []string
ListStates returns all state IDs
type StateSnapshot ¶
type StateSnapshot struct {
ID string `json:"id"`
Timestamp time.Time `json:"timestamp"`
Data map[string]StateValue `json:"data"`
Metadata map[string]interface{} `json:"metadata"`
}
StateSnapshot represents a snapshot of the state at a specific point in time
type StateValue ¶
type StateValue interface{}
StateValue represents any value that can be stored in state