Documentation
¶
Index ¶
- type Checkpoint
- type CheckpointStore
- type EventType
- type Execution
- type ExecutionEvent
- type ExecutionState
- type Executor
- func (e *Executor) AutoResumeAll(ctx context.Context) (int, error)
- func (e *Executor) CreateExecution(name string, steps []StepFunc) *Execution
- func (e *Executor) CreateNamedExecution(name string, steps []NamedStep) *Execution
- func (e *Executor) GetExecution(execID string) (*Execution, bool)
- func (e *Executor) ListExecutions() []*Execution
- func (e *Executor) LoadExecution(execID string) (*Execution, error)
- func (e *Executor) Pause(execID string) error
- func (e *Executor) Registry() *StepRegistry
- func (e *Executor) Resume(execID string) error
- func (e *Executor) ResumeExecution(ctx context.Context, execID string, lastState any) error
- func (e *Executor) Start(ctx context.Context, execID string, initialState any) error
- type ExecutorConfig
- type ExecutorOption
- type FileCheckpointStore
- func (s *FileCheckpointStore) DeleteCheckpoint(_ context.Context, execID string) error
- func (s *FileCheckpointStore) ListCheckpoints(_ context.Context) ([]*Execution, error)
- func (s *FileCheckpointStore) LoadCheckpoint(_ context.Context, execID string) (*Execution, error)
- func (s *FileCheckpointStore) SaveCheckpoint(_ context.Context, exec *Execution) error
- type NamedStep
- type PersistentCheckpointStore
- func (s *PersistentCheckpointStore) DeleteCheckpoint(ctx context.Context, execID string) error
- func (s *PersistentCheckpointStore) ListCheckpoints(ctx context.Context) ([]*Execution, error)
- func (s *PersistentCheckpointStore) LoadCheckpoint(ctx context.Context, execID string) (*Execution, error)
- func (s *PersistentCheckpointStore) SaveCheckpoint(ctx context.Context, exec *Execution) error
- type StepFunc
- type StepRegistry
- type TaskRecord
- type TaskStoreAdapter
- type TaskStoreBridge
- func (b *TaskStoreBridge) DeleteTask(ctx context.Context, taskID string) error
- func (b *TaskStoreBridge) GetTask(ctx context.Context, taskID string) (*TaskRecord, error)
- func (b *TaskStoreBridge) ListTasks(ctx context.Context) ([]*TaskRecord, error)
- func (b *TaskStoreBridge) SaveTask(ctx context.Context, task *TaskRecord) error
- func (b *TaskStoreBridge) UpdateProgress(ctx context.Context, taskID string, progress float64) error
- func (b *TaskStoreBridge) UpdateStatus(ctx context.Context, taskID string, status string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
type Checkpoint struct {
ID string `json:"id"`
Step int `json:"step"`
State any `json:"state"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]any `json:"metadata,omitempty"`
}
Checkpoint is a recoverable checkpoint snapshot.
type CheckpointStore ¶ added in v1.0.0
type CheckpointStore interface {
SaveCheckpoint(ctx context.Context, exec *Execution) error
LoadCheckpoint(ctx context.Context, execID string) (*Execution, error)
ListCheckpoints(ctx context.Context) ([]*Execution, error)
DeleteCheckpoint(ctx context.Context, execID string) error
}
CheckpointStore abstracts checkpoint persistence. The default implementation uses the filesystem (the existing behavior); the persistence-backed implementation uses a TaskStoreAdapter.
type EventType ¶ added in v1.0.0
type EventType string
EventType identifies the kind of execution event.
type Execution ¶
type Execution struct {
ID string `json:"id"`
Name string `json:"name"`
State ExecutionState `json:"state"`
Progress float64 `json:"progress"`
CurrentStep int `json:"current_step"`
TotalSteps int `json:"total_steps"`
StepNames []string `json:"step_names,omitempty"`
StartTime time.Time `json:"start_time"`
LastUpdate time.Time `json:"last_update"`
EndTime *time.Time `json:"end_time,omitempty"`
Checkpoints []Checkpoint `json:"checkpoints"`
Metadata map[string]any `json:"metadata,omitempty"`
Error string `json:"error,omitempty"`
// contains filtered or unexported fields
}
Execution is a long-running agent execution instance.
type ExecutionEvent ¶ added in v1.0.0
type ExecutionEvent struct {
Type EventType
ExecID string
Step int
Timestamp time.Time
Error error
State any
}
ExecutionEvent is emitted during execution lifecycle transitions.
type ExecutionState ¶
type ExecutionState string
ExecutionState is the state of a long-running execution.
const (
StateInitialized ExecutionState = "initialized"
StateRunning ExecutionState = "running"
StatePaused ExecutionState = "paused"
StateResuming ExecutionState = "resuming"
StateCompleted ExecutionState = "completed"
StateFailed ExecutionState = "failed"
StateCancelled ExecutionState = "cancelled"
)
type Executor ¶
type Executor struct {
OnEvent func(ExecutionEvent)
// contains filtered or unexported fields
}
Executor manages long-running agent executions.
func NewExecutor ¶
func NewExecutor(config ExecutorConfig, logger *zap.Logger, opts ...ExecutorOption) *Executor
NewExecutor creates a new long-running executor.
func (*Executor) AutoResumeAll ¶ added in v1.0.0
AutoResumeAll loads all checkpoints from the store and resumes any that are resumable (running, paused, or resuming) and have named steps registered in the step registry.
func (*Executor) CreateExecution ¶
CreateExecution creates a new long-running execution with anonymous steps.
func (*Executor) CreateNamedExecution ¶ added in v1.0.0
CreateNamedExecution creates a new execution with named steps for resume capability.
func (*Executor) GetExecution ¶
GetExecution retrieves an execution by ID.
func (*Executor) ListExecutions ¶
ListExecutions returns all tracked executions.
func (*Executor) LoadExecution ¶
LoadExecution loads an execution from the checkpoint store.
func (*Executor) Registry ¶ added in v1.0.0
func (e *Executor) Registry() *StepRegistry
Registry returns the executor's step registry.
func (*Executor) ResumeExecution ¶ added in v1.0.0
ResumeExecution resumes a loaded execution from its last checkpoint using the step registry.
type ExecutorConfig ¶
type ExecutorConfig struct {
CheckpointInterval time.Duration `json:"checkpoint_interval"`
CheckpointDir string `json:"checkpoint_dir"`
MaxRetries int `json:"max_retries"`
HeartbeatInterval time.Duration `json:"heartbeat_interval"`
AutoResume bool `json:"auto_resume"`
}
ExecutorConfig configures the long-running executor.
func DefaultExecutorConfig ¶
func DefaultExecutorConfig() ExecutorConfig
DefaultExecutorConfig returns the default configuration.
type ExecutorOption ¶ added in v1.0.0
type ExecutorOption func(*Executor)
ExecutorOption configures the Executor.
func WithCheckpointStore ¶ added in v1.0.0
func WithCheckpointStore(store CheckpointStore) ExecutorOption
WithCheckpointStore sets a custom CheckpointStore on the Executor.
type FileCheckpointStore ¶ added in v1.0.0
type FileCheckpointStore struct {
// contains filtered or unexported fields
}
FileCheckpointStore is the default, filesystem-based CheckpointStore implementation. It holds the existing os.WriteFile/os.ReadFile behavior extracted from executor.go.
func NewFileCheckpointStore ¶ added in v1.0.0
func NewFileCheckpointStore(dir string, logger *zap.Logger) *FileCheckpointStore
NewFileCheckpointStore creates a new filesystem-based checkpoint store.
func (*FileCheckpointStore) DeleteCheckpoint ¶ added in v1.0.0
func (s *FileCheckpointStore) DeleteCheckpoint(_ context.Context, execID string) error
DeleteCheckpoint removes a checkpoint file from disk.
func (*FileCheckpointStore) ListCheckpoints ¶ added in v1.0.0
func (s *FileCheckpointStore) ListCheckpoints(_ context.Context) ([]*Execution, error)
ListCheckpoints reads all checkpoint files from the directory.
func (*FileCheckpointStore) LoadCheckpoint ¶ added in v1.0.0
LoadCheckpoint reads an execution from a JSON file on disk.
func (*FileCheckpointStore) SaveCheckpoint ¶ added in v1.0.0
func (s *FileCheckpointStore) SaveCheckpoint(_ context.Context, exec *Execution) error
SaveCheckpoint persists execution state to a JSON file on disk.
type NamedStep ¶ added in v1.0.0
NamedStep pairs a step function with a name for checkpoint-based resume.
type PersistentCheckpointStore ¶ added in v1.0.0
type PersistentCheckpointStore struct {
// contains filtered or unexported fields
}
PersistentCheckpointStore implements CheckpointStore using TaskStoreAdapter.
func NewPersistentCheckpointStore ¶ added in v1.0.0
func NewPersistentCheckpointStore(store TaskStoreAdapter, logger *zap.Logger) *PersistentCheckpointStore
NewPersistentCheckpointStore creates a checkpoint store backed by a TaskStoreAdapter.
func (*PersistentCheckpointStore) DeleteCheckpoint ¶ added in v1.0.0
func (s *PersistentCheckpointStore) DeleteCheckpoint(ctx context.Context, execID string) error
DeleteCheckpoint removes a TaskRecord by ID.
func (*PersistentCheckpointStore) ListCheckpoints ¶ added in v1.0.0
func (s *PersistentCheckpointStore) ListCheckpoints(ctx context.Context) ([]*Execution, error)
ListCheckpoints retrieves all TaskRecords and unmarshals each to an Execution.
func (*PersistentCheckpointStore) LoadCheckpoint ¶ added in v1.0.0
func (s *PersistentCheckpointStore) LoadCheckpoint(ctx context.Context, execID string) (*Execution, error)
LoadCheckpoint retrieves a TaskRecord and unmarshals it back to an Execution.
func (*PersistentCheckpointStore) SaveCheckpoint ¶ added in v1.0.0
func (s *PersistentCheckpointStore) SaveCheckpoint(ctx context.Context, exec *Execution) error
SaveCheckpoint marshals the Execution to JSON and saves it as a TaskRecord.
type StepRegistry ¶ added in v1.0.0
type StepRegistry struct {
// contains filtered or unexported fields
}
StepRegistry allows named step registration for resume capability.
func NewStepRegistry ¶ added in v1.0.0
func NewStepRegistry() *StepRegistry
NewStepRegistry creates a new step registry.
func (*StepRegistry) Get ¶ added in v1.0.0
func (r *StepRegistry) Get(name string) (StepFunc, bool)
Get retrieves a step function by name.
func (*StepRegistry) Register ¶ added in v1.0.0
func (r *StepRegistry) Register(name string, fn StepFunc)
Register adds a named step function to the registry.
type TaskRecord ¶ added in v1.0.0
type TaskRecord struct {
ID string
Status string
Progress float64
Data []byte // JSON-serialized Execution
Metadata map[string]string
}
TaskRecord is the local representation of a persistent task.
type TaskStoreAdapter ¶ added in v1.0.0
type TaskStoreAdapter interface {
SaveTask(ctx context.Context, task *TaskRecord) error
GetTask(ctx context.Context, taskID string) (*TaskRecord, error)
ListTasks(ctx context.Context) ([]*TaskRecord, error)
DeleteTask(ctx context.Context, taskID string) error
UpdateProgress(ctx context.Context, taskID string, progress float64) error
UpdateStatus(ctx context.Context, taskID string, status string) error
}
TaskStoreAdapter is a local interface matching the subset of persistence.TaskStore needed for checkpoint storage. It follows the local-interface pattern so that most of the longrunning package does not have to import persistence directly.
type TaskStoreBridge ¶ added in v1.0.0
type TaskStoreBridge struct {
// contains filtered or unexported fields
}
TaskStoreBridge adapts persistence.TaskStore to TaskStoreAdapter. The file that defines it is the only file in the longrunning package that imports persistence.
func NewTaskStoreBridge ¶ added in v1.0.0
func NewTaskStoreBridge(store persistence.TaskStore) *TaskStoreBridge
NewTaskStoreBridge creates a new bridge from persistence.TaskStore to TaskStoreAdapter.
func (*TaskStoreBridge) DeleteTask ¶ added in v1.0.0
func (b *TaskStoreBridge) DeleteTask(ctx context.Context, taskID string) error
DeleteTask deletes a task by ID.
func (*TaskStoreBridge) GetTask ¶ added in v1.0.0
func (b *TaskStoreBridge) GetTask(ctx context.Context, taskID string) (*TaskRecord, error)
GetTask retrieves a persistence.AsyncTask and converts it to a TaskRecord.
func (*TaskStoreBridge) ListTasks ¶ added in v1.0.0
func (b *TaskStoreBridge) ListTasks(ctx context.Context) ([]*TaskRecord, error)
ListTasks retrieves all tasks and converts them to TaskRecords.
func (*TaskStoreBridge) SaveTask ¶ added in v1.0.0
func (b *TaskStoreBridge) SaveTask(ctx context.Context, task *TaskRecord) error
SaveTask converts a TaskRecord to persistence.AsyncTask and saves it.
func (*TaskStoreBridge) UpdateProgress ¶ added in v1.0.0
func (b *TaskStoreBridge) UpdateProgress(ctx context.Context, taskID string, progress float64) error
UpdateProgress updates the progress of a task.
func (*TaskStoreBridge) UpdateStatus ¶ added in v1.0.0
UpdateStatus updates the status of a task.