checkpoint

package
v1.31.6
Warning

This package is not in the latest version of its module.

Published: Apr 16, 2026 | License: MIT | Imports: 11 | Imported by: 0

Documentation

Overview

Package checkpoint provides execution state capture and recovery.

Architecture (ported from legacy Hector)

State (the execution state) captures the full state of an agent execution at a point in time. This enables:

  • Fault tolerance: Resume after crashes
  • HITL workflows: Pause for human approval, resume later
  • Long-running tasks: Survive server restarts
  • Cost optimization: Don't re-execute completed work

The checkpoint system is built on top of session.Service: checkpoints are stored in session state under the "pending_executions" key and can be recovered on startup.

Multi-Agent Scope

Checkpoints capture the state of the CURRENTLY EXECUTING agent only, not the entire agent tree. This is intentional and matches legacy Hector behavior.

Why single-agent scope is sufficient:

  1. Session events are the source of truth - All agent interactions are persisted to session.Service, providing complete conversation history across all agents.

  2. Runner determines correct agent - On recovery, runner.findAgentToRun() uses session event history to determine which agent should resume.

  3. Context is preserved - The LLM sees full conversation history via session events when the agent resumes.

Recovery Flow

┌─────────────────────────────────────────────────────────────────────────┐
│   CHECKPOINT CREATION                                                    │
│   ───────────────────                                                    │
│   User → Coordinator → Researcher (tool approval needed)                 │
│                            ↓                                             │
│                   CHECKPOINT: AgentName="researcher"                     │
│                              AgentState={iteration: 1, ...}              │
│                              PendingToolCall={requires_approval: true}   │
├─────────────────────────────────────────────────────────────────────────┤
│   RECOVERY                                                               │
│   ────────                                                               │
│   1. Load checkpoint → AgentName="researcher"                            │
│   2. Load session → Events from all agents (full history)                │
│   3. Runner.findAgentToRun() → Returns "researcher"                      │
│   4. Resume researcher with full context                                 │
└─────────────────────────────────────────────────────────────────────────┘
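The recovery flow relies on session events, not the checkpoint, to decide which agent resumes. The sketch below illustrates that idea under loud assumptions: Event is a hypothetical simplified type with an Author field, and findAgentToRun here just returns the most recent non-user author. The real runner.findAgentToRun() may apply additional rules (for example around transfers), so treat this as an illustration of the routing principle only.

```go
package main

import "fmt"

// Event is a hypothetical, simplified stand-in for a session event.
type Event struct {
	Author string // "user" or the name of the agent that produced the event
	Text   string
}

// findAgentToRun is a hypothetical sketch of the routing idea: walk the
// session history backwards and return the most recent agent author.
// If no agent has produced an event yet, fall back to the root agent.
func findAgentToRun(events []Event, root string) string {
	for i := len(events) - 1; i >= 0; i-- {
		if events[i].Author != "user" {
			return events[i].Author
		}
	}
	return root
}

func main() {
	history := []Event{
		{Author: "user", Text: "Research checkpointing"},
		{Author: "coordinator", Text: "transferring to researcher"},
		{Author: "researcher", Text: "tool call needs approval"},
	}
	// The checkpoint's AgentName can serve as a cross-check of the routing result.
	checkpointAgent := "researcher"
	next := findAgentToRun(history, "coordinator")
	fmt.Println(next, next == checkpointAgent) // researcher true
}
```

Because the full history lives in the session, the checkpoint only has to record which agent was active; everything else is reconstructed from events.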

Workflow Agent Support

For workflow agents (sequential, parallel, loop), additional state is captured:

  • WorkflowType: The workflow pattern being executed
  • WorkflowStage: Current stage in sequential workflows
  • LoopIteration: Current iteration in loop workflows

Parallel workflows have limited checkpoint support because multiple agents may be executing concurrently. In this case, recovery starts the parallel workflow from the beginning.
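A minimal sketch of how the workflow fields could translate into a restart position, assuming a hypothetical resumePoint helper and a trimmed workflowSnapshot type (neither is part of this package). Sequential workflows resume at the recorded stage, loops at the recorded iteration, and parallel workflows always restart at 0, matching the behavior described above.

```go
package main

import "fmt"

// workflowSnapshot mirrors only the workflow fields of AgentStateSnapshot (hypothetical).
type workflowSnapshot struct {
	WorkflowType  string // "sequential", "parallel", "loop"
	WorkflowStage int
	LoopIteration int
}

// resumePoint is a hypothetical helper returning the stage or iteration to
// restart from. The in-progress stage/iteration is re-run from its start.
func resumePoint(s workflowSnapshot) (int, error) {
	switch s.WorkflowType {
	case "sequential":
		return s.WorkflowStage, nil
	case "loop":
		return s.LoopIteration, nil
	case "parallel":
		// Concurrent sub-agents have no single position, so start over.
		return 0, nil
	default:
		return 0, fmt.Errorf("unknown workflow type %q", s.WorkflowType)
	}
}

func main() {
	for _, s := range []workflowSnapshot{
		{WorkflowType: "sequential", WorkflowStage: 2},
		{WorkflowType: "loop", LoopIteration: 4},
		{WorkflowType: "parallel", WorkflowStage: 3},
	} {
		p, err := resumePoint(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %d\n", s.WorkflowType, p)
	}
}
```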

Integration

Agents can implement the agent.Checkpointable interface to provide custom state capture/restore logic. The checkpoint hooks in manager.go provide integration points for the runner.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentStateSnapshot

type AgentStateSnapshot struct {
	// Iteration tracking
	Iteration   int `json:"iteration"`
	TotalTokens int `json:"total_tokens"`

	// Conversation state (from legacy ReasoningStateSnapshot)
	History     []*agent.Event `json:"history,omitempty"`
	LastEvent   *agent.Event   `json:"last_event,omitempty"`
	CurrentTurn []*agent.Event `json:"current_turn,omitempty"` // Messages in current turn

	// Response accumulation
	AccumulatedResponse string `json:"accumulated_response,omitempty"`
	FinalResponseAdded  bool   `json:"final_response_added"` // Response complete flag

	// Tool execution tracking
	PendingToolCalls        []*ToolCallSnapshot `json:"pending_tool_calls,omitempty"`
	FirstIterationToolCalls []*ToolCallSnapshot `json:"first_iteration_tool_calls,omitempty"` // For agentic loop

	// Multi-agent context (from legacy SubAgents field)
	SubAgents   []string `json:"sub_agents,omitempty"`   // Available sub-agents (for transfer)
	ParentAgent string   `json:"parent_agent,omitempty"` // Who invoked this agent
	Branch      string   `json:"branch,omitempty"`       // Agent branch path (e.g., "root.coordinator.researcher")

	// Workflow state (for sequential/parallel/loop agents)
	WorkflowType      string `json:"workflow_type,omitempty"`       // "sequential", "parallel", "loop"
	WorkflowStage     int    `json:"workflow_stage,omitempty"`      // Current stage index in sequential
	LoopIteration     int    `json:"loop_iteration,omitempty"`      // Current loop iteration
	LoopMaxIterations int    `json:"loop_max_iterations,omitempty"` // Max loop iterations

	// Agent-specific state (for custom agents)
	Custom map[string]any `json:"custom,omitempty"`
}

AgentStateSnapshot captures the state of an LLM agent during execution.

Multi-Agent Scope (ported from legacy Hector):

Checkpoints capture the state of the CURRENTLY EXECUTING agent only.
This is intentional - the full multi-agent history is preserved in
session events (the source of truth). On recovery:
  1. Checkpoint tells us which agent was active
  2. Session events provide full conversation history
  3. Runner.findAgentToRun() routes to the correct agent

For workflow agents (sequential, parallel, loop), the workflow fields (WorkflowType, WorkflowStage, LoopIteration, LoopMaxIterations) track progress within the workflow.

type Callbacks added in v1.16.1

type Callbacks struct {
	// contains filtered or unexported fields
}

Callbacks adapts agent callbacks to checkpoint manager hooks. This bridges the agent execution flow (llmagent) to the checkpointing system.

func NewCallbacks added in v1.16.1

func NewCallbacks(manager *Manager) *Callbacks

NewCallbacks creates a new Callbacks adapter.

func (*Callbacks) AfterModel added in v1.16.1

func (c *Callbacks) AfterModel(ctx agent.CallbackContext, resp *model.Response, llmErr error) (*model.Response, error)

AfterModel implements agent.AfterModelCallback. Creates a checkpoint after the LLM response is received (PhasePostLLM).

func (*Callbacks) AfterTool added in v1.16.1

func (c *Callbacks) AfterTool(ctx tool.Context, t tool.Tool, args, result map[string]any, err error) (map[string]any, error)

AfterTool implements agent.AfterToolCallback. Creates a checkpoint after a tool execution if configured.

func (*Callbacks) BeforeModel added in v1.16.1

func (c *Callbacks) BeforeModel(ctx agent.CallbackContext, req *model.Request) (*model.Response, error)

BeforeModel implements agent.BeforeModelCallback. Creates a checkpoint before the LLM is called (PhasePreLLM).

func (*Callbacks) BeforeTool added in v1.16.1

func (c *Callbacks) BeforeTool(ctx tool.Context, t tool.Tool, args map[string]any) (map[string]any, error)

BeforeTool implements agent.BeforeToolCallback. Creates a checkpoint before a tool is executed (PhaseToolExecution).

func (*Callbacks) OnComplete added in v1.16.1

func (c *Callbacks) OnComplete(ctx agent.CallbackContext) (*a2a.Message, error)

OnComplete implements agent.AfterAgentCallback. Clears the checkpoint when the agent completes successfully.

type CheckpointHooks

type CheckpointHooks struct {
	// contains filtered or unexported fields
}

CheckpointHooks provides integration hooks for the runner.

func NewCheckpointHooks

func NewCheckpointHooks(manager *Manager) *CheckpointHooks

NewCheckpointHooks creates hooks for runner integration.

func (*CheckpointHooks) AfterLLMCall

func (h *CheckpointHooks) AfterLLMCall(ctx context.Context, state *State)

AfterLLMCall creates a checkpoint after an LLM API call.

func (*CheckpointHooks) AfterToolExecution

func (h *CheckpointHooks) AfterToolExecution(ctx context.Context, state *State, toolName string)

AfterToolExecution creates a checkpoint after tool execution.

func (*CheckpointHooks) BeforeLLMCall

func (h *CheckpointHooks) BeforeLLMCall(ctx context.Context, state *State)

BeforeLLMCall creates a checkpoint before an LLM API call.

func (*CheckpointHooks) BeforeToolExecution

func (h *CheckpointHooks) BeforeToolExecution(ctx context.Context, state *State, toolName string)

BeforeToolExecution creates a checkpoint before tool execution.

func (*CheckpointHooks) OnComplete

func (h *CheckpointHooks) OnComplete(ctx context.Context, appName, userID, sessionID, taskID string)

OnComplete clears the checkpoint when execution completes successfully.

func (*CheckpointHooks) OnError

func (h *CheckpointHooks) OnError(ctx context.Context, state *State, err error)

OnError creates a checkpoint when an error occurs.

func (*CheckpointHooks) OnIterationEnd

func (h *CheckpointHooks) OnIterationEnd(ctx context.Context, state *State, iteration int)

OnIterationEnd creates a checkpoint at the end of an iteration.

func (*CheckpointHooks) OnToolApprovalRequired

func (h *CheckpointHooks) OnToolApprovalRequired(ctx context.Context, state *State, pendingTool *PendingToolCall)

OnToolApprovalRequired creates a checkpoint when HITL approval is needed.
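To show where these hooks sit in an agent loop, here is a sketch with a hypothetical hookLog type standing in for *CheckpointHooks: each method records the Phase it would checkpoint at instead of persisting state. The hook-to-phase mapping is an assumption based on the Phase names, and the real hooks take context and *State arguments and may also consult Config (for example BeforeLLM and AfterTools) before saving. The loop pauses, leaving its checkpoint pending, when a tool needs approval.

```go
package main

import (
	"errors"
	"fmt"
)

type Phase string

const (
	PhasePreLLM        Phase = "pre_llm"
	PhasePostLLM       Phase = "post_llm"
	PhaseToolExecution Phase = "tool_execution"
	PhasePostTool      Phase = "post_tool"
	PhaseIterationEnd  Phase = "iteration_end"
	PhaseToolApproval  Phase = "tool_approval"
)

// hookLog is a hypothetical stand-in for *CheckpointHooks that only records phases.
type hookLog struct{ phases []Phase }

func (h *hookLog) record(p Phase)          { h.phases = append(h.phases, p) }
func (h *hookLog) BeforeLLMCall()          { h.record(PhasePreLLM) }
func (h *hookLog) AfterLLMCall()           { h.record(PhasePostLLM) }
func (h *hookLog) BeforeToolExecution()    { h.record(PhaseToolExecution) }
func (h *hookLog) AfterToolExecution()     { h.record(PhasePostTool) }
func (h *hookLog) OnIterationEnd()         { h.record(PhaseIterationEnd) }
func (h *hookLog) OnToolApprovalRequired() { h.record(PhaseToolApproval) }

// errNeedsApproval signals that the loop paused for HITL approval.
var errNeedsApproval = errors.New("tool approval required")

// runLoop is a hypothetical agent loop; toolNeedsApproval[i] says whether the
// tool requested in iteration i must wait for a human.
func runLoop(h *hookLog, toolNeedsApproval []bool) error {
	for _, needsApproval := range toolNeedsApproval {
		h.BeforeLLMCall()
		h.AfterLLMCall()
		if needsApproval {
			h.OnToolApprovalRequired()
			return errNeedsApproval // the checkpoint stays pending
		}
		h.BeforeToolExecution()
		h.AfterToolExecution()
		h.OnIterationEnd()
	}
	return nil // a real runner would call OnComplete here to clear the checkpoint
}

func main() {
	h := &hookLog{}
	err := runLoop(h, []bool{false, true})
	fmt.Println(h.phases, err)
}
```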

type CheckpointStats

type CheckpointStats struct {
	Total         int
	Working       int
	InputRequired int
	Expired       int
	OldestAge     time.Duration
	AverageAge    time.Duration
}

CheckpointStats contains statistics about pending checkpoints.

type Config

type Config struct {
	// Enabled enables checkpointing.
	// Default: false
	Enabled *bool `yaml:"enabled,omitempty"`

	// Strategy determines when checkpoints are created.
	// Values: "event", "interval", "hybrid"
	// Default: "event"
	Strategy Strategy `yaml:"strategy,omitempty"`

	// Interval specifies checkpoint frequency (every N iterations).
	// Only used when Strategy is "interval" or "hybrid".
	// Default: 0 (disabled)
	Interval int `yaml:"interval,omitempty"`

	// AfterTools checkpoints after tool executions complete.
	// Default: false
	AfterTools *bool `yaml:"after_tools,omitempty"`

	// BeforeLLM checkpoints before LLM API calls.
	// Default: false
	BeforeLLM *bool `yaml:"before_llm,omitempty"`

	// Recovery configures checkpoint recovery behavior.
	Recovery *RecoveryConfig `yaml:"recovery,omitempty"`
}

Config configures checkpoint behavior.

Example YAML configuration:

checkpoint:
  enabled: true
  strategy: hybrid
  interval: 5
  after_tools: true
  before_llm: false
  recovery:
    auto_resume: true
    auto_resume_hitl: false
    timeout: 3600

func (*Config) GetRecoveryTimeout

func (c *Config) GetRecoveryTimeout() time.Duration

GetRecoveryTimeout returns the recovery timeout as a duration.

func (*Config) IsEnabled

func (c *Config) IsEnabled() bool

IsEnabled returns whether checkpointing is enabled.

func (*Config) SetDefaults

func (c *Config) SetDefaults()

SetDefaults applies default values.

func (*Config) ShouldAutoResume

func (c *Config) ShouldAutoResume() bool

ShouldAutoResume returns whether to auto-resume on startup.

func (*Config) ShouldAutoResumeHITL

func (c *Config) ShouldAutoResumeHITL() bool

ShouldAutoResumeHITL returns whether to auto-resume INPUT_REQUIRED tasks.

func (*Config) ShouldCheckpointAfterTools

func (c *Config) ShouldCheckpointAfterTools() bool

ShouldCheckpointAfterTools returns whether to checkpoint after tool execution.

func (*Config) ShouldCheckpointAtIteration

func (c *Config) ShouldCheckpointAtIteration(iteration int) bool

ShouldCheckpointAtIteration returns whether to checkpoint at the given iteration.

func (*Config) ShouldCheckpointBeforeLLM

func (c *Config) ShouldCheckpointBeforeLLM() bool

ShouldCheckpointBeforeLLM returns whether to checkpoint before LLM calls.

func (*Config) ShouldCheckpointInterval

func (c *Config) ShouldCheckpointInterval() bool

ShouldCheckpointInterval returns whether interval checkpointing is enabled.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager orchestrates checkpointing and recovery operations.

It provides a unified interface for:

  • Creating checkpoints during execution
  • Recovering from checkpoints on startup
  • Managing checkpoint lifecycle

func NewManager

func NewManager(cfg *Config, sessionService session.Service) *Manager

NewManager creates a new checkpoint Manager.

func (*Manager) ClearCheckpoint

func (m *Manager) ClearCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) error

ClearCheckpoint removes a checkpoint.

func (*Manager) Config

func (m *Manager) Config() *Config

Config returns the checkpoint configuration.

func (*Manager) GetPendingCheckpoints

func (m *Manager) GetPendingCheckpoints(ctx context.Context, appName, userID string) ([]*State, error)

GetPendingCheckpoints returns all pending checkpoints for a user.

func (*Manager) GetStats

func (m *Manager) GetStats(ctx context.Context, appName string) (*CheckpointStats, error)

GetStats returns statistics about pending checkpoints.

func (*Manager) IsEnabled

func (m *Manager) IsEnabled() bool

IsEnabled returns whether checkpointing is enabled.

func (*Manager) LoadCheckpoint

func (m *Manager) LoadCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)

LoadCheckpoint retrieves a checkpoint by identifiers.

func (*Manager) RecoverOnStartup

func (m *Manager) RecoverOnStartup(ctx context.Context, appName string) error

RecoverOnStartup recovers pending tasks on startup.

func (*Manager) ResumeTask

func (m *Manager) ResumeTask(ctx context.Context, appName, userID, sessionID, taskID string, userInput string) error

ResumeTask manually resumes a task from checkpoint.

func (*Manager) SaveCheckpoint

func (m *Manager) SaveCheckpoint(ctx context.Context, state *State) error

SaveCheckpoint creates and persists a checkpoint.

func (*Manager) SetResumeCallback

func (m *Manager) SetResumeCallback(cb ResumeCallback)

SetResumeCallback sets the callback for resuming tasks.

func (*Manager) ShouldCheckpointAfterTools

func (m *Manager) ShouldCheckpointAfterTools() bool

ShouldCheckpointAfterTools returns whether to checkpoint after tool execution.

func (*Manager) ShouldCheckpointAtIteration

func (m *Manager) ShouldCheckpointAtIteration(iteration int) bool

ShouldCheckpointAtIteration returns whether to checkpoint at the given iteration.

func (*Manager) ShouldCheckpointBeforeLLM

func (m *Manager) ShouldCheckpointBeforeLLM() bool

ShouldCheckpointBeforeLLM returns whether to checkpoint before LLM calls.

type PendingToolCall

type PendingToolCall struct {
	ID               string         `json:"id"`
	Name             string         `json:"name"`
	Description      string         `json:"description,omitempty"`
	Arguments        map[string]any `json:"arguments,omitempty"`
	RequiresApproval bool           `json:"requires_approval"`
}

PendingToolCall represents a tool call awaiting execution or approval.

type Phase

type Phase string

Phase represents the execution phase at which the checkpoint was created.

const (
	// PhaseInitialized - Agent execution just started.
	PhaseInitialized Phase = "initialized"

	// PhasePreLLM - Before LLM call.
	PhasePreLLM Phase = "pre_llm"

	// PhasePostLLM - After LLM response received.
	PhasePostLLM Phase = "post_llm"

	// PhaseToolExecution - During tool execution.
	PhaseToolExecution Phase = "tool_execution"

	// PhasePostTool - After tool execution completed.
	PhasePostTool Phase = "post_tool"

	// PhaseIterationEnd - End of an agent loop iteration.
	PhaseIterationEnd Phase = "iteration_end"

	// PhaseToolApproval - Waiting for HITL tool approval.
	PhaseToolApproval Phase = "tool_approval"

	// PhaseError - Checkpoint created due to error.
	PhaseError Phase = "error"
)

type RecoveryConfig

type RecoveryConfig struct {
	// AutoResume enables automatic recovery on startup.
	// Default: false
	AutoResume *bool `yaml:"auto_resume,omitempty"`

	// AutoResumeHITL enables automatic recovery for INPUT_REQUIRED tasks.
	// When false, INPUT_REQUIRED tasks wait for explicit user action.
	// Default: false
	AutoResumeHITL *bool `yaml:"auto_resume_hitl,omitempty"`

	// Timeout is the maximum age (in seconds) for a checkpoint to be recoverable.
	// Checkpoints older than this are considered expired and marked as FAILED.
	// Default: 3600 (1 hour)
	Timeout int `yaml:"timeout,omitempty"`
}

RecoveryConfig configures checkpoint recovery behavior.

func (*RecoveryConfig) SetDefaults

func (c *RecoveryConfig) SetDefaults()

SetDefaults applies default values for RecoveryConfig.

func (*RecoveryConfig) Validate

func (c *RecoveryConfig) Validate() error

Validate checks the RecoveryConfig.

type RecoveryManager

type RecoveryManager struct {
	// contains filtered or unexported fields
}

RecoveryManager handles checkpoint recovery on startup and during runtime.

Architecture (ported from legacy Hector):

On startup, RecoveryManager scans for pending checkpoints and:
  1. Validates checkpoint states (not expired, recoverable)
  2. For WORKING tasks: Auto-resumes if configured
  3. For INPUT_REQUIRED tasks: Waits for user action (unless AutoResumeHITL)
  4. For expired checkpoints: Marks tasks as FAILED

func NewRecoveryManager

func NewRecoveryManager(cfg *Config, storage *Storage) *RecoveryManager

NewRecoveryManager creates a new RecoveryManager.

func (*RecoveryManager) CancelCheckpoint

func (m *RecoveryManager) CancelCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) error

CancelCheckpoint removes a checkpoint without resuming.

func (*RecoveryManager) GetCheckpoint

func (m *RecoveryManager) GetCheckpoint(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)

GetCheckpoint returns a specific checkpoint.

func (*RecoveryManager) GetPendingCheckpoints

func (m *RecoveryManager) GetPendingCheckpoints(ctx context.Context, appName, userID string) ([]*State, error)

GetPendingCheckpoints returns all pending checkpoints for a user.

func (*RecoveryManager) GetStats

func (m *RecoveryManager) GetStats(ctx context.Context, appName string) (*CheckpointStats, error)

GetStats returns statistics about pending checkpoints.

func (*RecoveryManager) RecoverPendingTasks

func (m *RecoveryManager) RecoverPendingTasks(ctx context.Context, appName string) error

RecoverPendingTasks recovers tasks with checkpoints on startup. This should be called during server initialization.

func (*RecoveryManager) ResumeTask

func (m *RecoveryManager) ResumeTask(ctx context.Context, appName, userID, sessionID, taskID string, userInput string) error

ResumeTask manually resumes a task from checkpoint. This is used when a user explicitly requests to resume an INPUT_REQUIRED task.

func (*RecoveryManager) SetResumeCallback

func (m *RecoveryManager) SetResumeCallback(cb ResumeCallback)

SetResumeCallback sets the callback for resuming tasks.

type ResumeCallback

type ResumeCallback func(ctx context.Context, state *State) error

ResumeCallback is called to resume execution from a checkpoint.

type State

type State struct {
	// Core identifiers
	TaskID    string `json:"task_id"`
	SessionID string `json:"session_id"`
	UserID    string `json:"user_id"`
	AppName   string `json:"app_name"`

	// Original user input
	Query string `json:"query"`

	// Agent state snapshot
	AgentName      string              `json:"agent_name"`
	AgentState     *AgentStateSnapshot `json:"agent_state,omitempty"`
	InvocationID   string              `json:"invocation_id"`
	LastEventIndex int                 `json:"last_event_index"` // Index of last processed event

	// Pending tool call (for HITL approval)
	PendingToolCall *PendingToolCall `json:"pending_tool_call,omitempty"`

	// Checkpoint metadata
	Phase          Phase     `json:"phase"`
	CheckpointType Type      `json:"checkpoint_type"`
	CheckpointTime time.Time `json:"checkpoint_time"`

	// Error information (if Phase == PhaseError)
	Error string `json:"error,omitempty"`
}

State represents the full execution state at a checkpoint.

This captures everything needed to resume agent execution:

  • Task and session identifiers
  • The original user query
  • Agent execution state (messages, iteration count, etc.)
  • Pending tool calls awaiting approval
  • Checkpoint metadata (phase, type, time)

func Deserialize

func Deserialize(data []byte) (*State, error)

Deserialize reconstructs a State from JSON bytes.

func NewState

func NewState(taskID, sessionID, userID, appName, query, agentName, invocationID string) *State

NewState creates a new checkpoint State with required fields.

func (*State) IsExpired

func (s *State) IsExpired(timeout time.Duration) bool

IsExpired checks if the checkpoint has expired based on the timeout.

func (*State) IsRecoverable

func (s *State) IsRecoverable() bool

IsRecoverable returns true if the checkpoint can be recovered.

func (*State) NeedsUserInput

func (s *State) NeedsUserInput() bool

NeedsUserInput returns true if the checkpoint is waiting for user input.

func (*State) Serialize

func (s *State) Serialize() ([]byte, error)

Serialize converts the State to JSON bytes.

func (*State) WithAgentState

func (s *State) WithAgentState(as *AgentStateSnapshot) *State

WithAgentState sets the agent state snapshot.

func (*State) WithError

func (s *State) WithError(err error) *State

WithError sets the error message.

func (*State) WithLastEventIndex

func (s *State) WithLastEventIndex(idx int) *State

WithLastEventIndex sets the index of the last processed event.

func (*State) WithPendingToolCall

func (s *State) WithPendingToolCall(tc *PendingToolCall) *State

WithPendingToolCall sets a pending tool call.

func (*State) WithPhase

func (s *State) WithPhase(phase Phase) *State

WithPhase sets the checkpoint phase.

func (*State) WithType

func (s *State) WithType(t Type) *State

WithType sets the checkpoint type.
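The With* methods return *State, which suggests a fluent builder style. The sketch below re-declares a trimmed, hypothetical State with the JSON tags shown above and pairs the builder with a Serialize/Deserialize round trip through encoding/json. That NewState starts in PhaseInitialized and stamps CheckpointTime with the current time is an assumption, as is the reduced NewState signature.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type Phase string

const (
	PhaseInitialized  Phase = "initialized"
	PhaseToolApproval Phase = "tool_approval"
)

type PendingToolCall struct {
	ID               string         `json:"id"`
	Name             string         `json:"name"`
	Arguments        map[string]any `json:"arguments,omitempty"`
	RequiresApproval bool           `json:"requires_approval"`
}

// State is a trimmed, hypothetical copy of checkpoint.State (same JSON tags).
type State struct {
	TaskID          string           `json:"task_id"`
	SessionID       string           `json:"session_id"`
	AgentName       string           `json:"agent_name"`
	PendingToolCall *PendingToolCall `json:"pending_tool_call,omitempty"`
	Phase           Phase            `json:"phase"`
	CheckpointTime  time.Time        `json:"checkpoint_time"`
}

// NewState sets identifiers and stamps the time (assumed behavior).
func NewState(taskID, sessionID, agentName string) *State {
	return &State{
		TaskID:         taskID,
		SessionID:      sessionID,
		AgentName:      agentName,
		Phase:          PhaseInitialized,
		CheckpointTime: time.Now().UTC(),
	}
}

// WithPhase mutates the receiver and returns it so calls can be chained.
func (s *State) WithPhase(p Phase) *State {
	s.Phase = p
	return s
}

func (s *State) WithPendingToolCall(tc *PendingToolCall) *State {
	s.PendingToolCall = tc
	return s
}

func (s *State) Serialize() ([]byte, error) { return json.Marshal(s) }

func Deserialize(data []byte) (*State, error) {
	var s State
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	st := NewState("task-1", "sess-1", "researcher").
		WithPhase(PhaseToolApproval).
		WithPendingToolCall(&PendingToolCall{ID: "call-1", Name: "web_search", RequiresApproval: true})
	data, err := st.Serialize()
	if err != nil {
		panic(err)
	}
	back, err := Deserialize(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.AgentName, back.Phase, back.PendingToolCall.Name) // researcher tool_approval web_search
}
```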

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage manages checkpoint persistence.

Architecture (derived from legacy Hector):

Checkpoints are stored in session state (metadata) under the "pending_executions" key.
This keeps checkpoints co-located with the session they belong to, making recovery simple.

Storage layout:

session.state["pending_executions"] = {
    "<task_id>": { ... checkpoint state ... }
}
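The storage layout can be mimicked in memory to see how save, load, and clear act on one session's state map. This sketch does not use session.Service; sessionState and checkpoint are hypothetical stand-ins, and the typed map under "pending_executions" only works because nothing here is persisted. State reloaded from a real store would come back as generic JSON values and need decoding first.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const pendingKey = "pending_executions"

// checkpoint is a trimmed, hypothetical stand-in for checkpoint.State.
type checkpoint struct {
	TaskID    string `json:"task_id"`
	AgentName string `json:"agent_name"`
	Phase     string `json:"phase"`
}

// sessionState models one session's state map (hypothetical).
type sessionState map[string]any

// pending returns the task_id -> checkpoint map, creating it when absent.
// In this in-memory sketch the stored value always has this concrete type.
func (st sessionState) pending() map[string]checkpoint {
	if m, ok := st[pendingKey].(map[string]checkpoint); ok {
		return m
	}
	m := map[string]checkpoint{}
	st[pendingKey] = m
	return m
}

func (st sessionState) save(cp checkpoint) { st.pending()[cp.TaskID] = cp }

func (st sessionState) load(taskID string) (checkpoint, bool) {
	cp, ok := st.pending()[taskID]
	return cp, ok
}

func (st sessionState) clear(taskID string) { delete(st.pending(), taskID) }

func main() {
	st := sessionState{}
	st.save(checkpoint{TaskID: "task-1", AgentName: "researcher", Phase: "tool_approval"})
	out, err := json.Marshal(st)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	st.clear("task-1")
	_, ok := st.load("task-1")
	fmt.Println(ok) // false
}
```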

func NewStorage

func NewStorage(sessionService session.Service) *Storage

NewStorage creates a new checkpoint Storage.

func (*Storage) Clear

func (s *Storage) Clear(ctx context.Context, appName, userID, sessionID, taskID string) error

Clear removes a checkpoint for a task.

func (*Storage) ListAllPending

func (s *Storage) ListAllPending(ctx context.Context, appName string) ([]*State, error)

ListAllPending returns all pending checkpoints across all users. This is used for startup recovery.

func (*Storage) ListPending

func (s *Storage) ListPending(ctx context.Context, appName, userID string) ([]*State, error)

ListPending returns all pending checkpoints for a user.

func (*Storage) Load

func (s *Storage) Load(ctx context.Context, appName, userID, sessionID, taskID string) (*State, error)

Load retrieves a checkpoint state for a task.

func (*Storage) Save

func (s *Storage) Save(ctx context.Context, state *State) error

Save persists a checkpoint state.

type Strategy

type Strategy string

Strategy determines when checkpoints are created.

const (
	// StrategyEvent - Checkpoint on specific events (tool approval, errors).
	StrategyEvent Strategy = "event"

	// StrategyInterval - Checkpoint every N iterations.
	StrategyInterval Strategy = "interval"

	// StrategyHybrid - Both event and interval checkpointing.
	StrategyHybrid Strategy = "hybrid"
)

type ToolCallSnapshot

type ToolCallSnapshot struct {
	ID        string         `json:"id"`
	Name      string         `json:"name"`
	Arguments map[string]any `json:"arguments,omitempty"`
	Result    any            `json:"result,omitempty"`
	Error     string         `json:"error,omitempty"`
	Completed bool           `json:"completed"`
}

ToolCallSnapshot captures the state of a tool call in progress.

type Type

type Type string

Type represents why the checkpoint was created.

const (
	// TypeEvent - Event-driven (tool approval, error, etc.).
	TypeEvent Type = "event"

	// TypeInterval - Interval-based (every N iterations).
	TypeInterval Type = "interval"

	// TypeManual - Manual pause requested.
	TypeManual Type = "manual"

	// TypeError - Error recovery checkpoint.
	TypeError Type = "error"
)
