core

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package core provides the fundamental building blocks for creating and executing graph-based workflows in GoLangGraph.

The core package implements a graph execution engine that allows you to define workflows as directed graphs where nodes represent computational units and edges define the flow of execution. This package is the foundation of the GoLangGraph framework and provides the essential abstractions for building AI agent workflows.

Graph Execution Model

The core execution model revolves around two main concepts:

  • Graph: A directed graph structure containing nodes and edges that defines the workflow topology
  • BaseState: A thread-safe state container that carries data between nodes during execution

Basic Usage

Creating and executing a simple graph:

graph := core.NewGraph("my-workflow")

// Add nodes with processing functions
graph.AddNode("start", "Start Node", func(ctx context.Context, state *core.BaseState) (*core.BaseState, error) {
	state.Set("message", "Hello, World!")
	return state, nil
})

graph.AddNode("end", "End Node", func(ctx context.Context, state *core.BaseState) (*core.BaseState, error) {
	message, _ := state.Get("message")
	fmt.Println(message)
	return state, nil
})

// Connect nodes
graph.AddEdge("start", "end", nil)
graph.SetStartNode("start")
graph.AddEndNode("end")

// Execute the graph
initialState := core.NewBaseState()
ctx := context.Background()
finalState, err := graph.Execute(ctx, initialState)

Conditional Execution

Graphs support conditional edges that determine the next node based on the current state:

graph.AddEdge("decision", "path_a", func(ctx context.Context, state *core.BaseState) (string, error) {
	if condition, _ := state.Get("condition"); condition == "A" {
		return "path_a", nil
	}
	return "path_b", nil
})

State Management

The BaseState provides thread-safe access to workflow data:

state := core.NewBaseState()
state.Set("key", "value")
value, exists := state.Get("key")
state.SetMetadata("execution_id", "12345")

// Clone state for parallel processing
clonedState := state.Clone()

// Merge states from parallel branches
state.Merge(otherState)

Streaming Execution

For long-running workflows, use streaming execution to receive intermediate results:

resultChan := make(chan *core.BaseState, 10)
go func() {
	err := graph.Stream(ctx, initialState, resultChan)
	close(resultChan)
}()

for state := range resultChan {
	// Process intermediate state
}

Error Handling

The package provides comprehensive error handling with automatic retries and graceful degradation:

  • Node execution errors are wrapped with context information
  • Validation errors prevent invalid graph configurations
  • Timeout handling for long-running operations
  • Interrupt support for graceful cancellation

Thread Safety

All core types are designed to be thread-safe:

  • BaseState uses read-write mutexes for concurrent access
  • Graph execution supports parallel node processing
  • State cloning enables safe parallel branches

Performance Considerations

The core package is optimized for performance:

  • Minimal memory allocation during execution
  • Efficient state management with copy-on-write semantics
  • Lazy evaluation of conditional edges
  • Configurable retry policies and timeouts

For more advanced usage patterns and integration with other GoLangGraph packages, see the examples in the examples/ directory and the comprehensive documentation in the docs/ directory.

Index

Constants

View Source
const (
	START = "__start__"
	END   = "__end__"
)

START and END constants for graph flow control

Variables

This section is empty.

Functions

func RouteByMessageType

func RouteByMessageType(ctx context.Context, state *BaseState) (string, error)

RouteByMessageType routes based on the type of the last message

func RouteByToolCalls

func RouteByToolCalls(ctx context.Context, state *BaseState) (string, error)

RouteByToolCalls routes based on whether tool calls are present

Types

type BaseState

type BaseState struct {
	// contains filtered or unexported fields
}

BaseState represents the base state structure

func NewBaseState

func NewBaseState() *BaseState

NewBaseState creates a new base state

func (*BaseState) Clone

func (bs *BaseState) Clone() *BaseState

Clone creates a deep copy of the state

func (*BaseState) CreateSnapshot

func (bs *BaseState) CreateSnapshot() StateSnapshot

CreateSnapshot creates a snapshot of the current state

func (*BaseState) Delete

func (bs *BaseState) Delete(key string)

Delete removes a key from the state

func (*BaseState) FromJSON

func (bs *BaseState) FromJSON(data []byte) error

FromJSON loads the state from JSON

func (*BaseState) Get

func (bs *BaseState) Get(key string) (StateValue, bool)

Get retrieves a value from the state

func (*BaseState) GetAll

func (bs *BaseState) GetAll() map[string]StateValue

GetAll returns a copy of all data in the state

func (*BaseState) GetHistory

func (bs *BaseState) GetHistory() *StateHistory

GetHistory returns the state history

func (*BaseState) GetMetadata

func (bs *BaseState) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves metadata from the state

func (*BaseState) Keys

func (bs *BaseState) Keys() []string

Keys returns all keys in the state

func (*BaseState) Merge

func (bs *BaseState) Merge(other *BaseState)

Merge merges another state into this state

func (*BaseState) RestoreFromSnapshot

func (bs *BaseState) RestoreFromSnapshot(snapshot StateSnapshot)

RestoreFromSnapshot restores the state from a snapshot

func (*BaseState) Set

func (bs *BaseState) Set(key string, value StateValue)

Set sets a value in the state

func (*BaseState) SetMetadata

func (bs *BaseState) SetMetadata(key string, value interface{})

SetMetadata sets metadata for the state

func (*BaseState) ToJSON

func (bs *BaseState) ToJSON() ([]byte, error)

ToJSON converts the state to JSON

type ConditionalEdge

type ConditionalEdge struct {
	ID        string                 `json:"id"`
	From      string                 `json:"from"`
	Condition EdgeCondition          `json:"-"`
	Routes    map[string]string      `json:"routes"` // condition result -> target node
	Metadata  map[string]interface{} `json:"metadata"`
}

ConditionalEdge represents a conditional edge that can route to different nodes

type ConditionalRouter

type ConditionalRouter struct {
	// contains filtered or unexported fields
}

ConditionalRouter manages conditional routing logic

func NewConditionalRouter

func NewConditionalRouter(fallback string) *ConditionalRouter

NewConditionalRouter creates a new conditional router

func (*ConditionalRouter) AddRoute

func (cr *ConditionalRouter) AddRoute(condition string, router RouterFunction)

AddRoute adds a route with a condition

func (*ConditionalRouter) Route

func (cr *ConditionalRouter) Route(ctx context.Context, state *BaseState) (string, error)

Route determines the next node based on state

type Edge

type Edge struct {
	ID        string                 `json:"id"`
	From      string                 `json:"from"`
	To        string                 `json:"to"`
	Condition EdgeCondition          `json:"-"`
	Metadata  map[string]interface{} `json:"metadata"`
}

Edge represents an edge in the graph

type EdgeCondition

type EdgeCondition func(ctx context.Context, state *BaseState) (string, error)

EdgeCondition represents a condition function for conditional edges

type ExecutionResult

type ExecutionResult struct {
	NodeID    string        `json:"node_id"`
	Success   bool          `json:"success"`
	Error     error         `json:"error,omitempty"`
	Duration  time.Duration `json:"duration"`
	Timestamp time.Time     `json:"timestamp"`
	State     *BaseState    `json:"state,omitempty"`
}

ExecutionResult represents the result of node execution

type Graph

type Graph struct {
	ID        string                 `json:"id"`
	Name      string                 `json:"name"`
	Nodes     map[string]*Node       `json:"nodes"`
	Edges     map[string]*Edge       `json:"edges"`
	StartNode string                 `json:"start_node"`
	EndNodes  []string               `json:"end_nodes"`
	Config    *GraphConfig           `json:"config"`
	Metadata  map[string]interface{} `json:"metadata"`
	// contains filtered or unexported fields
}

Graph represents the execution graph

func NewGraph

func NewGraph(name string) *Graph

NewGraph creates a new graph

func (*Graph) AddConditionalEdges

func (g *Graph) AddConditionalEdges(from string, condition EdgeCondition, routes map[string]string) error

AddConditionalEdges adds conditional edges to the graph

func (*Graph) AddEdge

func (g *Graph) AddEdge(from, to string, condition EdgeCondition) *Edge

AddEdge adds an edge to the graph

func (*Graph) AddEndNode

func (g *Graph) AddEndNode(nodeID string) error

AddEndNode adds an end node to the graph

func (*Graph) AddNode

func (g *Graph) AddNode(id, name string, fn NodeFunc) *Node

AddNode adds a node to the graph

func (*Graph) Close

func (g *Graph) Close()

Close closes the graph and cleans up resources

func (*Graph) Execute

func (g *Graph) Execute(ctx context.Context, initialState *BaseState) (*BaseState, error)

Execute executes the graph with the given initial state

func (*Graph) ExecuteParallel

func (g *Graph) ExecuteParallel(ctx context.Context, nodeIDs []string, state *BaseState) (map[string]*ExecutionResult, error)

ExecuteParallel executes multiple nodes in parallel (for super-step execution)

func (*Graph) GetConditionalEdge

func (g *Graph) GetConditionalEdge(nodeID string) (*ConditionalEdge, bool)

GetConditionalEdge retrieves a conditional edge for a node

func (*Graph) GetCurrentState

func (g *Graph) GetCurrentState() *BaseState

GetCurrentState returns the current state

func (*Graph) GetExecutionHistory

func (g *Graph) GetExecutionHistory() []*ExecutionResult

GetExecutionHistory returns the execution history

func (*Graph) GetNextNodes

func (g *Graph) GetNextNodes(ctx context.Context, currentNodeID string, state *BaseState) ([]string, error)

GetNextNodes determines the next nodes to execute based on current node and state

func (*Graph) GetNodesByType

func (g *Graph) GetNodesByType(nodeType string) []*Node

GetNodesByType returns nodes filtered by metadata type

func (*Graph) GetTopology

func (g *Graph) GetTopology() map[string][]string

GetTopology returns the graph topology as adjacency list

func (*Graph) Interrupt

func (g *Graph) Interrupt()

Interrupt interrupts the current execution

func (*Graph) IsEndNode

func (g *Graph) IsEndNode(nodeID string) bool

IsEndNode checks if a node is an end node

func (*Graph) IsRunning

func (g *Graph) IsRunning() bool

IsRunning returns whether the graph is currently executing

func (*Graph) IsStartNode

func (g *Graph) IsStartNode(nodeID string) bool

IsStartNode checks if a node is the start node

func (*Graph) Reset

func (g *Graph) Reset()

Reset resets the graph execution state

func (*Graph) SetStartNode

func (g *Graph) SetStartNode(nodeID string) error

SetStartNode sets the starting node for execution

func (*Graph) Stream

func (g *Graph) Stream() <-chan *ExecutionResult

Stream returns a channel for streaming execution results

func (*Graph) Validate

func (g *Graph) Validate() error

Validate validates the graph structure

type GraphConfig

type GraphConfig struct {
	MaxIterations     int           `json:"max_iterations"`
	Timeout           time.Duration `json:"timeout"`
	EnableStreaming   bool          `json:"enable_streaming"`
	EnableCheckpoints bool          `json:"enable_checkpoints"`
	ParallelExecution bool          `json:"parallel_execution"`
	RetryAttempts     int           `json:"retry_attempts"`
	RetryDelay        time.Duration `json:"retry_delay"`
}

GraphConfig represents configuration for graph execution

func DefaultGraphConfig

func DefaultGraphConfig() *GraphConfig

DefaultGraphConfig returns default configuration

type Node

type Node struct {
	ID       string                 `json:"id"`
	Name     string                 `json:"name"`
	Function NodeFunc               `json:"-"`
	Metadata map[string]interface{} `json:"metadata"`
}

Node represents a node in the graph

type NodeFunc

type NodeFunc func(ctx context.Context, state *BaseState) (*BaseState, error)

NodeFunc represents a function that can be executed as a node

type RouterFunction

type RouterFunction func(ctx context.Context, state *BaseState) (string, error)

RouterFunction represents a function that determines the next node based on state

func RouteByCondition

func RouteByCondition(conditionKey string, trueRoute string, falseRoute string) RouterFunction

RouteByCondition routes based on a boolean condition in state

func RouteByCounter

func RouteByCounter(counterKey string, maxCount int, continueRoute string, exitRoute string) RouterFunction

RouteByCounter routes based on a counter value

func RouteByStateValue

func RouteByStateValue(key string, routes map[interface{}]string, defaultRoute string) RouterFunction

RouteByStateValue routes based on a specific state value

type StateHistory

type StateHistory struct {
	// contains filtered or unexported fields
}

StateHistory manages the history of state changes

func NewStateHistory

func NewStateHistory(maxSize int) *StateHistory

NewStateHistory creates a new state history with a maximum size

func (*StateHistory) AddSnapshot

func (sh *StateHistory) AddSnapshot(snapshot StateSnapshot)

AddSnapshot adds a new snapshot to the history

func (*StateHistory) GetSnapshot

func (sh *StateHistory) GetSnapshot(id string) (*StateSnapshot, error)

GetSnapshot returns a specific snapshot by ID

func (*StateHistory) GetSnapshots

func (sh *StateHistory) GetSnapshots() []StateSnapshot

GetSnapshots returns all snapshots in the history

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager manages multiple states and provides advanced operations

func NewStateManager

func NewStateManager() *StateManager

NewStateManager creates a new state manager

func (*StateManager) CreateState

func (sm *StateManager) CreateState(id string) *BaseState

CreateState creates a new state with the given ID

func (*StateManager) DeleteState

func (sm *StateManager) DeleteState(id string)

DeleteState removes a state by ID

func (*StateManager) GetState

func (sm *StateManager) GetState(id string) (*BaseState, bool)

GetState retrieves a state by ID

func (*StateManager) ListStates

func (sm *StateManager) ListStates() []string

ListStates returns all state IDs

type StateSnapshot

type StateSnapshot struct {
	ID        string                 `json:"id"`
	Timestamp time.Time              `json:"timestamp"`
	Data      map[string]StateValue  `json:"data"`
	Metadata  map[string]interface{} `json:"metadata"`
}

StateSnapshot represents a snapshot of the state at a specific point in time

type StateValue

type StateValue interface{}

StateValue represents any value that can be stored in state

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL