Documentation
Overview ¶
Package checkpoint provides execution state capture and recovery.
Architecture (ported from legacy Hector) ¶
ExecutionState captures the full state of an agent execution at a point in time. This enables:
- Fault tolerance: Resume after crashes
- HITL workflows: Pause for human approval, resume later
- Long-running tasks: Survive server restarts
- Cost optimization: Don't re-execute completed work
The checkpoint system is built on top of session.Service: checkpoints are stored in session state (under the "pending_executions" key) and can be recovered on startup.
Multi-Agent Scope ¶
Checkpoints capture the state of the CURRENTLY EXECUTING agent only, not the entire agent tree. This is intentional and matches legacy Hector behavior.
Why single-agent scope is sufficient:
1. Session events are the source of truth: all agent interactions are persisted to session.Service, providing complete conversation history across all agents.
2. Runner determines the correct agent: on recovery, runner.findAgentToRun() uses session event history to determine which agent should resume.
3. Context is preserved: the LLM sees the full conversation history via session events when the agent resumes.
Recovery Flow ¶
┌─────────────────────────────────────────────────────────────────────────┐
│ CHECKPOINT CREATION │
│ ─────────────────── │
│ User → Coordinator → Researcher (tool approval needed) │
│ ↓ │
│ CHECKPOINT: AgentName="researcher" │
│ AgentState={iteration: 1, ...} │
│ PendingToolCall={requires_approval: true} │
├─────────────────────────────────────────────────────────────────────────┤
│ RECOVERY │
│ ──────── │
│ 1. Load checkpoint → AgentName="researcher" │
│ 2. Load session → Events from all agents (full history) │
│ 3. Runner.findAgentToRun() → Returns "researcher" │
│ 4. Resume researcher with full context │
└─────────────────────────────────────────────────────────────────────────┘
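The routing step in the diagram can be sketched as follows. This is only an illustration: the `event` type, its `Author` field, and the "last non-user author wins" rule are assumptions made for the example, and the real runner.findAgentToRun() may apply different rules.

```go
package main

import "fmt"

// event is a trimmed, hypothetical stand-in for a session event.
type event struct {
	Author string // "user" or an agent name (assumed field)
	Text   string
}

// findAgentToRun is a guess at the routing rule: walk the session history
// backwards and resume the agent that produced the most recent non-user event.
// If no agent has spoken yet, fall back to the root agent.
func findAgentToRun(events []event, rootAgent string) string {
	for i := len(events) - 1; i >= 0; i-- {
		if a := events[i].Author; a != "" && a != "user" {
			return a
		}
	}
	return rootAgent
}

func main() {
	history := []event{
		{Author: "user", Text: "Research topic X"},
		{Author: "coordinator", Text: "Delegating to researcher"},
		{Author: "researcher", Text: "Requesting tool approval"},
	}
	fmt.Println(findAgentToRun(history, "coordinator")) // prints "researcher"
}
```

Because the decision is derived from the session history, the checkpoint itself only needs to carry the active agent's own state.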
Workflow Agent Support ¶
For workflow agents (sequential, parallel, loop), additional state is captured:
- WorkflowType: The workflow pattern being executed
- WorkflowStage: Current stage in sequential workflows
- LoopIteration: Current iteration in loop workflows
Parallel workflows have limited checkpoint support because multiple agents may be executing concurrently. In this case, recovery starts the parallel workflow from the beginning.
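A minimal sketch of the resume rules described above, assuming a hypothetical `resumePoint` helper that is not part of the package. It mirrors only the workflow fields of AgentStateSnapshot and treats parallel workflows as restarting from the beginning.

```go
package main

import "fmt"

// workflowSnapshot mirrors only the workflow-related fields of AgentStateSnapshot.
type workflowSnapshot struct {
	WorkflowType  string // "sequential", "parallel", "loop"
	WorkflowStage int
	LoopIteration int
}

// resumePoint (hypothetical) returns the stage and loop iteration at which
// a recovered workflow would continue.
func resumePoint(s workflowSnapshot) (stage, iteration int) {
	switch s.WorkflowType {
	case "sequential":
		return s.WorkflowStage, 0
	case "loop":
		return 0, s.LoopIteration
	default:
		// "parallel" has limited checkpoint support: restart from the beginning.
		return 0, 0
	}
}

func main() {
	stage, iter := resumePoint(workflowSnapshot{WorkflowType: "sequential", WorkflowStage: 2})
	fmt.Println(stage, iter) // prints "2 0"

	stage, iter = resumePoint(workflowSnapshot{WorkflowType: "parallel", WorkflowStage: 2})
	fmt.Println(stage, iter) // prints "0 0"
}
```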
Integration ¶
Agents can implement the agent.Checkpointable interface to provide custom state capture/restore logic. The checkpoint hooks in manager.go provide integration points for the runner.
Index ¶
- type AgentStateSnapshot
- type CheckpointHooks
- func (h *CheckpointHooks) AfterLLMCall(ctx context.Context, state *State)
- func (h *CheckpointHooks) AfterToolExecution(ctx context.Context, state *State, toolName string)
- func (h *CheckpointHooks) BeforeLLMCall(ctx context.Context, state *State)
- func (h *CheckpointHooks) BeforeToolExecution(ctx context.Context, state *State, toolName string)
- func (h *CheckpointHooks) OnComplete(ctx context.Context, appName, userID, sessionID, taskID string)
- func (h *CheckpointHooks) OnError(ctx context.Context, state *State, err error)
- func (h *CheckpointHooks) OnIterationEnd(ctx context.Context, state *State, iteration int)
- func (h *CheckpointHooks) OnToolApprovalRequired(ctx context.Context, state *State, pendingTool *PendingToolCall)
- type CheckpointStats
- type Config
- func (c *Config) GetRecoveryTimeout() time.Duration
- func (c *Config) IsEnabled() bool
- func (c *Config) SetDefaults()
- func (c *Config) ShouldAutoResume() bool
- func (c *Config) ShouldAutoResumeHITL() bool
- func (c *Config) ShouldCheckpointAfterTools() bool
- func (c *Config) ShouldCheckpointAtIteration(iteration int) bool
- func (c *Config) ShouldCheckpointBeforeLLM() bool
- func (c *Config) ShouldCheckpointInterval() bool
- func (c *Config) Validate() error
- type Manager
- func (m *Manager) ClearCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) error
- func (m *Manager) Config() *Config
- func (m *Manager) GetPendingCheckpoints(ctx context.Context, appName, userID string) ([]*State, error)
- func (m *Manager) GetStats(ctx context.Context, appName string) (*CheckpointStats, error)
- func (m *Manager) IsEnabled() bool
- func (m *Manager) LoadCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)
- func (m *Manager) RecoverOnStartup(ctx context.Context, appName string) error
- func (m *Manager) ResumeTask(ctx context.Context, appName, userID, sessionID, taskID string, ...) error
- func (m *Manager) SaveCheckpoint(ctx context.Context, state *State) error
- func (m *Manager) SetResumeCallback(cb ResumeCallback)
- func (m *Manager) ShouldCheckpointAfterTools() bool
- func (m *Manager) ShouldCheckpointAtIteration(iteration int) bool
- func (m *Manager) ShouldCheckpointBeforeLLM() bool
- type PendingToolCall
- type Phase
- type RecoveryConfig
- type RecoveryManager
- func (m *RecoveryManager) CancelCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) error
- func (m *RecoveryManager) GetCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)
- func (m *RecoveryManager) GetPendingCheckpoints(ctx context.Context, appName, userID string) ([]*State, error)
- func (m *RecoveryManager) GetStats(ctx context.Context, appName string) (*CheckpointStats, error)
- func (m *RecoveryManager) RecoverPendingTasks(ctx context.Context, appName string) error
- func (m *RecoveryManager) ResumeTask(ctx context.Context, appName, userID, sessionID, taskID string, ...) error
- func (m *RecoveryManager) SetResumeCallback(cb ResumeCallback)
- type ResumeCallback
- type State
- func (s *State) IsExpired(timeout time.Duration) bool
- func (s *State) IsRecoverable() bool
- func (s *State) NeedsUserInput() bool
- func (s *State) Serialize() ([]byte, error)
- func (s *State) WithAgentState(as *AgentStateSnapshot) *State
- func (s *State) WithError(err error) *State
- func (s *State) WithLastEventIndex(idx int) *State
- func (s *State) WithPendingToolCall(tc *PendingToolCall) *State
- func (s *State) WithPhase(phase Phase) *State
- func (s *State) WithType(t Type) *State
- type Storage
- func (s *Storage) Clear(ctx context.Context, appName, userID, sessionID, taskID string) error
- func (s *Storage) ListAllPending(ctx context.Context, appName string) ([]*State, error)
- func (s *Storage) ListPending(ctx context.Context, appName, userID string) ([]*State, error)
- func (s *Storage) Load(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)
- func (s *Storage) Save(ctx context.Context, state *State) error
- type Strategy
- type ToolCallSnapshot
- type Type
Types ¶
type AgentStateSnapshot ¶
type AgentStateSnapshot struct {
// Iteration tracking
Iteration int `json:"iteration"`
TotalTokens int `json:"total_tokens"`
// Conversation state (from legacy ReasoningStateSnapshot)
History []*agent.Event `json:"history,omitempty"`
LastEvent *agent.Event `json:"last_event,omitempty"`
CurrentTurn []*agent.Event `json:"current_turn,omitempty"` // Messages in current turn
// Response accumulation
AccumulatedResponse string `json:"accumulated_response,omitempty"`
FinalResponseAdded bool `json:"final_response_added"` // Response complete flag
// Tool execution tracking
PendingToolCalls []*ToolCallSnapshot `json:"pending_tool_calls,omitempty"`
FirstIterationToolCalls []*ToolCallSnapshot `json:"first_iteration_tool_calls,omitempty"` // For agentic loop
// Multi-agent context (from legacy SubAgents field)
SubAgents []string `json:"sub_agents,omitempty"` // Available sub-agents (for transfer)
ParentAgent string `json:"parent_agent,omitempty"` // Who invoked this agent
Branch string `json:"branch,omitempty"` // Agent branch path (e.g., "root.coordinator.researcher")
// Workflow state (for sequential/parallel/loop agents)
WorkflowType string `json:"workflow_type,omitempty"` // "sequential", "parallel", "loop"
WorkflowStage int `json:"workflow_stage,omitempty"` // Current stage index in sequential
LoopIteration int `json:"loop_iteration,omitempty"` // Current loop iteration
LoopMaxIterations int `json:"loop_max_iterations,omitempty"` // Max loop iterations
// Agent-specific state (for custom agents)
Custom map[string]any `json:"custom,omitempty"`
}
AgentStateSnapshot captures the state of an LLM agent during execution.
Multi-Agent Scope (ported from legacy Hector):
Checkpoints capture the state of the CURRENTLY EXECUTING agent only. This is intentional: the full multi-agent history is preserved in session events (the source of truth). On recovery:
1. Checkpoint tells us which agent was active
2. Session events provide full conversation history
3. Runner.findAgentToRun() routes to the correct agent
For workflow agents (sequential, parallel, loop), the workflow fields (WorkflowType, WorkflowStage, LoopIteration, LoopMaxIterations) track progress within the workflow.
type CheckpointHooks ¶
type CheckpointHooks struct {
// contains filtered or unexported fields
}
CheckpointHooks provides integration hooks for the runner.
func NewCheckpointHooks ¶
func NewCheckpointHooks(manager *Manager) *CheckpointHooks
NewCheckpointHooks creates hooks for runner integration.
func (*CheckpointHooks) AfterLLMCall ¶
func (h *CheckpointHooks) AfterLLMCall(ctx context.Context, state *State)
AfterLLMCall creates a checkpoint after an LLM API call.
func (*CheckpointHooks) AfterToolExecution ¶
func (h *CheckpointHooks) AfterToolExecution(ctx context.Context, state *State, toolName string)
AfterToolExecution creates a checkpoint after tool execution.
func (*CheckpointHooks) BeforeLLMCall ¶
func (h *CheckpointHooks) BeforeLLMCall(ctx context.Context, state *State)
BeforeLLMCall creates a checkpoint before an LLM API call.
func (*CheckpointHooks) BeforeToolExecution ¶
func (h *CheckpointHooks) BeforeToolExecution(ctx context.Context, state *State, toolName string)
BeforeToolExecution creates a checkpoint before tool execution.
func (*CheckpointHooks) OnComplete ¶
func (h *CheckpointHooks) OnComplete(ctx context.Context, appName, userID, sessionID, taskID string)
OnComplete clears the checkpoint when execution completes successfully.
func (*CheckpointHooks) OnError ¶
func (h *CheckpointHooks) OnError(ctx context.Context, state *State, err error)
OnError creates a checkpoint when an error occurs.
func (*CheckpointHooks) OnIterationEnd ¶
func (h *CheckpointHooks) OnIterationEnd(ctx context.Context, state *State, iteration int)
OnIterationEnd creates a checkpoint at end of an iteration.
func (*CheckpointHooks) OnToolApprovalRequired ¶
func (h *CheckpointHooks) OnToolApprovalRequired(ctx context.Context, state *State, pendingTool *PendingToolCall)
OnToolApprovalRequired creates a checkpoint when HITL approval is needed.
type CheckpointStats ¶
type CheckpointStats struct {
Total int
Working int
InputRequired int
Expired int
OldestAge time.Duration
AverageAge time.Duration
}
CheckpointStats contains statistics about pending checkpoints.
type Config ¶
type Config struct {
// Enabled enables checkpointing.
// Default: false
Enabled *bool `yaml:"enabled,omitempty"`
// Strategy determines when checkpoints are created.
// Values: "event", "interval", "hybrid"
// Default: "event"
Strategy Strategy `yaml:"strategy,omitempty"`
// Interval specifies checkpoint frequency (every N iterations).
// Only used when Strategy is "interval" or "hybrid".
// Default: 0 (disabled)
Interval int `yaml:"interval,omitempty"`
// AfterTools checkpoints after tool executions complete.
// Default: false
AfterTools *bool `yaml:"after_tools,omitempty"`
// BeforeLLM checkpoints before LLM API calls.
// Default: false
BeforeLLM *bool `yaml:"before_llm,omitempty"`
// Recovery configures checkpoint recovery behavior.
Recovery *RecoveryConfig `yaml:"recovery,omitempty"`
}
Config configures checkpoint behavior.
Example YAML configuration:
checkpoint:
  enabled: true
  strategy: hybrid
  interval: 5
  after_tools: true
  before_llm: false
  recovery:
    auto_resume: true
    auto_resume_hitl: false
    timeout: 3600
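The same configuration could be built in Go roughly as follows. The types here are local copies of the documented fields (YAML tags omitted), and `boolPtr` and `boolOr` are hypothetical helpers for this sketch. They illustrate why the optional flags are `*bool`: a nil pointer means "not set", so the documented default applies.

```go
package main

import "fmt"

type Strategy string

const StrategyHybrid Strategy = "hybrid"

// Local mirrors of the documented config types, for illustration only.
type RecoveryConfig struct {
	AutoResume     *bool
	AutoResumeHITL *bool
	Timeout        int // seconds
}

type Config struct {
	Enabled    *bool
	Strategy   Strategy
	Interval   int
	AfterTools *bool
	BeforeLLM  *bool
	Recovery   *RecoveryConfig
}

// boolPtr (hypothetical helper) returns a pointer to b.
func boolPtr(b bool) *bool { return &b }

// boolOr (hypothetical helper) returns *p, or def when the field was omitted.
func boolOr(p *bool, def bool) bool {
	if p == nil {
		return def
	}
	return *p
}

func main() {
	cfg := Config{
		Enabled:    boolPtr(true),
		Strategy:   StrategyHybrid,
		Interval:   5,
		AfterTools: boolPtr(true),
		// BeforeLLM left nil: falls back to the documented default (false).
		Recovery: &RecoveryConfig{AutoResume: boolPtr(true), Timeout: 3600},
	}
	fmt.Println(boolOr(cfg.Enabled, false))   // prints "true"
	fmt.Println(boolOr(cfg.BeforeLLM, false)) // prints "false"
}
```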
func (*Config) GetRecoveryTimeout ¶
GetRecoveryTimeout returns the recovery timeout as a duration.
func (*Config) ShouldAutoResume ¶
ShouldAutoResume returns whether to auto-resume on startup.
func (*Config) ShouldAutoResumeHITL ¶
ShouldAutoResumeHITL returns whether to auto-resume INPUT_REQUIRED tasks.
func (*Config) ShouldCheckpointAfterTools ¶
ShouldCheckpointAfterTools returns whether to checkpoint after tool execution.
func (*Config) ShouldCheckpointAtIteration ¶
ShouldCheckpointAtIteration returns whether to checkpoint at the given iteration.
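The exact rule is not shown here. One plausible reading of the Strategy and Interval documentation is sketched below: interval checkpoints fire only for the "interval" and "hybrid" strategies, only when Interval is positive, and on every Nth iteration. The function is a local stand-in and the modulo rule is an assumption.

```go
package main

import "fmt"

type Strategy string

const (
	StrategyEvent    Strategy = "event"
	StrategyInterval Strategy = "interval"
	StrategyHybrid   Strategy = "hybrid"
)

// shouldCheckpointAtIteration is an assumed implementation: checkpoint every
// `interval` iterations when the strategy includes interval checkpointing.
func shouldCheckpointAtIteration(strategy Strategy, interval, iteration int) bool {
	if strategy != StrategyInterval && strategy != StrategyHybrid {
		return false
	}
	if interval <= 0 || iteration <= 0 {
		return false // Interval 0 is documented as "disabled"
	}
	return iteration%interval == 0
}

func main() {
	for i := 1; i <= 10; i++ {
		if shouldCheckpointAtIteration(StrategyHybrid, 5, i) {
			fmt.Println("checkpoint at iteration", i)
		}
	}
}
```

With `interval: 5`, this prints a line for iterations 5 and 10 only.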
func (*Config) ShouldCheckpointBeforeLLM ¶
ShouldCheckpointBeforeLLM returns whether to checkpoint before LLM calls.
func (*Config) ShouldCheckpointInterval ¶
ShouldCheckpointInterval returns whether interval checkpointing is enabled.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager orchestrates checkpointing and recovery operations.
It provides a unified interface for:
- Creating checkpoints during execution
- Recovering from checkpoints on startup
- Managing checkpoint lifecycle
func NewManager ¶
NewManager creates a new checkpoint Manager.
func (*Manager) ClearCheckpoint ¶
func (m *Manager) ClearCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) error
ClearCheckpoint removes a checkpoint.
func (*Manager) GetPendingCheckpoints ¶
func (m *Manager) GetPendingCheckpoints(ctx context.Context, appName, userID string) ([]*State, error)
GetPendingCheckpoints returns all pending checkpoints for a user.
func (*Manager) LoadCheckpoint ¶
func (m *Manager) LoadCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)
LoadCheckpoint retrieves a checkpoint by identifiers.
func (*Manager) RecoverOnStartup ¶
RecoverOnStartup recovers pending tasks on startup.
func (*Manager) ResumeTask ¶
func (m *Manager) ResumeTask(ctx context.Context, appName, userID, sessionID, taskID string, userInput string) error
ResumeTask manually resumes a task from checkpoint.
func (*Manager) SaveCheckpoint ¶
SaveCheckpoint creates and persists a checkpoint.
func (*Manager) SetResumeCallback ¶
func (m *Manager) SetResumeCallback(cb ResumeCallback)
SetResumeCallback sets the callback for resuming tasks.
func (*Manager) ShouldCheckpointAfterTools ¶
ShouldCheckpointAfterTools returns whether to checkpoint after tool execution.
func (*Manager) ShouldCheckpointAtIteration ¶
ShouldCheckpointAtIteration returns whether to checkpoint at the given iteration.
func (*Manager) ShouldCheckpointBeforeLLM ¶
ShouldCheckpointBeforeLLM returns whether to checkpoint before LLM calls.
type PendingToolCall ¶
type PendingToolCall struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Arguments map[string]any `json:"arguments,omitempty"`
RequiresApproval bool `json:"requires_approval"`
}
PendingToolCall represents a tool call awaiting execution or approval.
type Phase ¶
type Phase string
Phase represents the execution phase when checkpoint was created.
const (
	// PhaseInitialized - Agent execution just started.
	PhaseInitialized Phase = "initialized"
	// PhasePreLLM - Before LLM call.
	PhasePreLLM Phase = "pre_llm"
	// PhasePostLLM - After LLM response received.
	PhasePostLLM Phase = "post_llm"
	// PhaseToolExecution - During tool execution.
	PhaseToolExecution Phase = "tool_execution"
	// PhasePostTool - After tool execution completed.
	PhasePostTool Phase = "post_tool"
	// PhaseIterationEnd - End of an agent loop iteration.
	PhaseIterationEnd Phase = "iteration_end"
	// PhaseToolApproval - Waiting for HITL tool approval.
	PhaseToolApproval Phase = "tool_approval"
	// PhaseError - Checkpoint created due to error.
	PhaseError Phase = "error"
)
type RecoveryConfig ¶
type RecoveryConfig struct {
// AutoResume enables automatic recovery on startup.
// Default: false
AutoResume *bool `yaml:"auto_resume,omitempty"`
// AutoResumeHITL enables automatic recovery for INPUT_REQUIRED tasks.
// When false, INPUT_REQUIRED tasks wait for explicit user action.
// Default: false
AutoResumeHITL *bool `yaml:"auto_resume_hitl,omitempty"`
// Timeout is the maximum age (in seconds) for a checkpoint to be recoverable.
// Checkpoints older than this are considered expired and marked as FAILED.
// Default: 3600 (1 hour)
Timeout int `yaml:"timeout,omitempty"`
}
RecoveryConfig configures checkpoint recovery behavior.
func (*RecoveryConfig) SetDefaults ¶
func (c *RecoveryConfig) SetDefaults()
SetDefaults applies default values for RecoveryConfig.
func (*RecoveryConfig) Validate ¶
func (c *RecoveryConfig) Validate() error
Validate checks the RecoveryConfig.
type RecoveryManager ¶
type RecoveryManager struct {
// contains filtered or unexported fields
}
RecoveryManager handles checkpoint recovery on startup and during runtime.
Architecture (ported from legacy Hector):
On startup, RecoveryManager scans for pending checkpoints and:
1. Validates checkpoint states (not expired, recoverable)
2. For WORKING tasks: Auto-resumes if configured
3. For INPUT_REQUIRED tasks: Waits for user action (unless AutoResumeHITL)
4. For expired checkpoints: Marks tasks as FAILED
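The startup rules above amount to a small decision table. The sketch below encodes it with a hypothetical `decide` function and `action` type. Leaving a WORKING task pending when auto-resume is off is an assumption, since the documentation does not say what happens in that case.

```go
package main

import "fmt"

// action is a hypothetical label for what recovery does with one checkpoint.
type action string

const (
	actionResume       action = "resume"
	actionWaitForUser  action = "wait_for_user"
	actionMarkFailed   action = "mark_failed"
	actionLeavePending action = "leave_pending" // assumed behavior, see lead-in
)

// decide maps a checkpoint's status and the recovery flags to an action.
func decide(expired, needsUserInput, autoResume, autoResumeHITL bool) action {
	if expired {
		return actionMarkFailed // rule 4
	}
	if needsUserInput { // INPUT_REQUIRED, rule 3
		if autoResumeHITL {
			return actionResume
		}
		return actionWaitForUser
	}
	if autoResume { // WORKING, rule 2
		return actionResume
	}
	return actionLeavePending
}

func main() {
	fmt.Println(decide(false, false, true, false)) // prints "resume"
	fmt.Println(decide(false, true, true, false))  // prints "wait_for_user"
	fmt.Println(decide(true, true, true, true))    // prints "mark_failed"
}
```

Checking expiry first means an expired INPUT_REQUIRED checkpoint is failed rather than left waiting indefinitely.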
func NewRecoveryManager ¶
func NewRecoveryManager(cfg *Config, storage *Storage) *RecoveryManager
NewRecoveryManager creates a new RecoveryManager.
func (*RecoveryManager) CancelCheckpoint ¶
func (m *RecoveryManager) CancelCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) error
CancelCheckpoint removes a checkpoint without resuming.
func (*RecoveryManager) GetCheckpoint ¶
func (m *RecoveryManager) GetCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)
GetCheckpoint returns a specific checkpoint.
func (*RecoveryManager) GetPendingCheckpoints ¶
func (m *RecoveryManager) GetPendingCheckpoints(ctx context.Context, appName, userID string) ([]*State, error)
GetPendingCheckpoints returns all pending checkpoints for a user.
func (*RecoveryManager) GetStats ¶
func (m *RecoveryManager) GetStats(ctx context.Context, appName string) (*CheckpointStats, error)
GetStats returns statistics about pending checkpoints.
func (*RecoveryManager) RecoverPendingTasks ¶
func (m *RecoveryManager) RecoverPendingTasks(ctx context.Context, appName string) error
RecoverPendingTasks recovers tasks with checkpoints on startup. This should be called during server initialization.
func (*RecoveryManager) ResumeTask ¶
func (m *RecoveryManager) ResumeTask(ctx context.Context, appName, userID, sessionID, taskID string, userInput string) error
ResumeTask manually resumes a task from checkpoint. This is used when a user explicitly requests to resume an INPUT_REQUIRED task.
func (*RecoveryManager) SetResumeCallback ¶
func (m *RecoveryManager) SetResumeCallback(cb ResumeCallback)
SetResumeCallback sets the callback for resuming tasks.
type ResumeCallback ¶
ResumeCallback is called to resume execution from a checkpoint.
type State ¶
type State struct {
// Core identifiers
TaskID string `json:"task_id"`
SessionID string `json:"session_id"`
UserID string `json:"user_id"`
AppName string `json:"app_name"`
// Original user input
Query string `json:"query"`
// Agent state snapshot
AgentName string `json:"agent_name"`
AgentState *AgentStateSnapshot `json:"agent_state,omitempty"`
InvocationID string `json:"invocation_id"`
LastEventIndex int `json:"last_event_index"` // Index of last processed event
// Pending tool call (for HITL approval)
PendingToolCall *PendingToolCall `json:"pending_tool_call,omitempty"`
// Checkpoint metadata
Phase Phase `json:"phase"`
CheckpointType Type `json:"checkpoint_type"`
CheckpointTime time.Time `json:"checkpoint_time"`
// Error information (if Phase == PhaseError)
Error string `json:"error,omitempty"`
}
State represents the full execution state at a checkpoint.
This captures everything needed to resume agent execution:
- Task and session identifiers
- The original user query
- Agent execution state (messages, iteration count, etc.)
- Pending tool calls awaiting approval
- Checkpoint metadata (phase, type, time)
func Deserialize ¶
Deserialize reconstructs a State from JSON bytes.
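Since State carries JSON struct tags, the Serialize and Deserialize pair is presumably a thin wrapper over `encoding/json`. The round trip below uses a trimmed local copy of State and hypothetical lowercase helpers. It is a sketch of the idea, not the package's code.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// State is a trimmed local copy of the documented struct, for illustration.
type State struct {
	TaskID         string    `json:"task_id"`
	SessionID      string    `json:"session_id"`
	AgentName      string    `json:"agent_name"`
	Phase          string    `json:"phase"`
	CheckpointTime time.Time `json:"checkpoint_time"`
	Error          string    `json:"error,omitempty"`
}

// serialize (hypothetical) encodes a State as JSON bytes.
func serialize(s *State) ([]byte, error) {
	return json.Marshal(s)
}

// deserialize (hypothetical) reconstructs a State from JSON bytes.
func deserialize(data []byte) (*State, error) {
	var s State
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, fmt.Errorf("decode checkpoint: %w", err)
	}
	return &s, nil
}

func main() {
	in := &State{
		TaskID:         "task-1",
		SessionID:      "sess-1",
		AgentName:      "researcher",
		Phase:          "tool_approval",
		CheckpointTime: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC),
	}
	data, err := serialize(in)
	if err != nil {
		panic(err)
	}
	out, err := deserialize(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.AgentName, out.Phase) // prints "researcher tool_approval"
}
```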
func (*State) IsRecoverable ¶
IsRecoverable returns true if the checkpoint can be recovered.
func (*State) NeedsUserInput ¶
NeedsUserInput returns true if the checkpoint is waiting for user input.
func (*State) WithAgentState ¶
func (s *State) WithAgentState(as *AgentStateSnapshot) *State
WithAgentState sets the agent state snapshot.
func (*State) WithLastEventIndex ¶
WithLastEventIndex sets the index of the last processed event.
func (*State) WithPendingToolCall ¶
func (s *State) WithPendingToolCall(tc *PendingToolCall) *State
WithPendingToolCall sets a pending tool call.
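Because each With* method returns *State, calls can be chained when building a checkpoint. The sketch below uses trimmed local types and assumes the methods mutate the receiver and return it. The real methods may return a copy instead.

```go
package main

import "fmt"

type Phase string

const PhaseToolApproval Phase = "tool_approval"

// Trimmed local copies of the documented types, for illustration only.
type PendingToolCall struct {
	ID               string
	Name             string
	RequiresApproval bool
}

type State struct {
	TaskID          string
	Phase           Phase
	PendingToolCall *PendingToolCall
}

// WithPhase sets the phase and returns the same State for chaining (assumed).
func (s *State) WithPhase(p Phase) *State {
	s.Phase = p
	return s
}

// WithPendingToolCall sets the pending tool call and returns the same State (assumed).
func (s *State) WithPendingToolCall(tc *PendingToolCall) *State {
	s.PendingToolCall = tc
	return s
}

func main() {
	s := (&State{TaskID: "task-1"}).
		WithPhase(PhaseToolApproval).
		WithPendingToolCall(&PendingToolCall{ID: "call-1", Name: "web_search", RequiresApproval: true})

	fmt.Println(s.Phase, s.PendingToolCall.Name) // prints "tool_approval web_search"
}
```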
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage manages checkpoint persistence.
Architecture (derived from legacy Hector):
Checkpoints are stored in session state (metadata) under the "pending_executions" key. This keeps checkpoints co-located with the session they belong to, making recovery simple.
Storage layout:
session.state["pending_executions"] = {
"<task_id>": { ... checkpoint state ... }
}
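To make the layout concrete, the sketch below models session state as a plain `map[string]any` and nests checkpoints under the "pending_executions" key by task ID. The helper names are hypothetical and the real Storage goes through session.Service, so this shows only the shape of the data.

```go
package main

import "fmt"

const pendingKey = "pending_executions"

// saveCheckpoint stores a checkpoint under state["pending_executions"][taskID],
// creating the nested map on first use.
func saveCheckpoint(state map[string]any, taskID string, checkpoint map[string]any) {
	pending, ok := state[pendingKey].(map[string]any)
	if !ok {
		pending = map[string]any{}
		state[pendingKey] = pending
	}
	pending[taskID] = checkpoint
}

// loadCheckpoint returns the checkpoint for taskID, if one is pending.
func loadCheckpoint(state map[string]any, taskID string) (map[string]any, bool) {
	pending, ok := state[pendingKey].(map[string]any)
	if !ok {
		return nil, false
	}
	cp, ok := pending[taskID].(map[string]any)
	return cp, ok
}

// clearCheckpoint removes the checkpoint for taskID; it is a no-op if absent.
func clearCheckpoint(state map[string]any, taskID string) {
	if pending, ok := state[pendingKey].(map[string]any); ok {
		delete(pending, taskID)
	}
}

func main() {
	sessionState := map[string]any{}
	saveCheckpoint(sessionState, "task-1", map[string]any{"agent_name": "researcher"})

	cp, ok := loadCheckpoint(sessionState, "task-1")
	fmt.Println(ok, cp["agent_name"]) // prints "true researcher"

	clearCheckpoint(sessionState, "task-1")
	_, ok = loadCheckpoint(sessionState, "task-1")
	fmt.Println(ok) // prints "false"
}
```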
func NewStorage ¶
NewStorage creates a new checkpoint Storage.
func (*Storage) ListAllPending ¶
ListAllPending returns all pending checkpoints across all users. This is used for startup recovery.
func (*Storage) ListPending ¶
ListPending returns all pending checkpoints for a user.
type Strategy ¶
type Strategy string
Strategy determines when checkpoints are created.
const (
	// StrategyEvent - Checkpoint on specific events (tool approval, errors).
	StrategyEvent Strategy = "event"
	// StrategyInterval - Checkpoint every N iterations.
	StrategyInterval Strategy = "interval"
	// StrategyHybrid - Both event and interval checkpointing.
	StrategyHybrid Strategy = "hybrid"
)
type ToolCallSnapshot ¶
type ToolCallSnapshot struct {
ID string `json:"id"`
Name string `json:"name"`
Arguments map[string]any `json:"arguments,omitempty"`
Result any `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Completed bool `json:"completed"`
}
ToolCallSnapshot captures the state of a tool call in progress.
type Type ¶
type Type string
Type represents why the checkpoint was created.
const (
	// TypeEvent - Event-driven (tool approval, error, etc.).
	TypeEvent Type = "event"
	// TypeInterval - Interval-based (every N iterations).
	TypeInterval Type = "interval"
	// TypeManual - Manual pause requested.
	TypeManual Type = "manual"
	// TypeError - Error recovery checkpoint.
	TypeError Type = "error"
)