longrunning

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	ID        string         `json:"id"`
	Step      int            `json:"step"`
	State     any            `json:"state"`
	Timestamp time.Time      `json:"timestamp"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

Checkpoint is a recoverable checkpoint snapshot.

type CheckpointStore added in v1.0.0

type CheckpointStore interface {
	SaveCheckpoint(ctx context.Context, exec *Execution) error
	LoadCheckpoint(ctx context.Context, execID string) (*Execution, error)
	ListCheckpoints(ctx context.Context) ([]*Execution, error)
	DeleteCheckpoint(ctx context.Context, execID string) error
}

CheckpointStore abstracts checkpoint persistence. Default implementation uses filesystem (existing behavior). Persistence-backed implementation uses TaskStoreAdapter.

type EventType added in v1.0.0

type EventType string

EventType identifies the kind of execution event.

const (
	EventStepStarted   EventType = "step_started"
	EventStepCompleted EventType = "step_completed"
	EventStepFailed    EventType = "step_failed"
	EventCheckpointed  EventType = "checkpointed"
	EventPaused        EventType = "paused"
	EventResumed       EventType = "resumed"
)

type Execution

type Execution struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	State       ExecutionState `json:"state"`
	Progress    float64        `json:"progress"`
	CurrentStep int            `json:"current_step"`
	TotalSteps  int            `json:"total_steps"`
	StepNames   []string       `json:"step_names,omitempty"`
	StartTime   time.Time      `json:"start_time"`
	LastUpdate  time.Time      `json:"last_update"`
	EndTime     *time.Time     `json:"end_time,omitempty"`
	Checkpoints []Checkpoint   `json:"checkpoints"`
	Metadata    map[string]any `json:"metadata,omitempty"`
	Error       string         `json:"error,omitempty"`
	// contains filtered or unexported fields
}

Execution is a long-running agent execution instance.

type ExecutionEvent added in v1.0.0

type ExecutionEvent struct {
	Type      EventType
	ExecID    string
	Step      int
	Timestamp time.Time
	Error     error
	State     any
}

ExecutionEvent is emitted during execution lifecycle transitions.

type ExecutionState

type ExecutionState string

ExecutionState is the state of a long-running execution.

const (
	StateInitialized ExecutionState = "initialized"
	StateRunning     ExecutionState = "running"
	StatePaused      ExecutionState = "paused"
	StateResuming    ExecutionState = "resuming"
	StateCompleted   ExecutionState = "completed"
	StateFailed      ExecutionState = "failed"
	StateCancelled   ExecutionState = "cancelled"
)

type Executor

type Executor struct {
	OnEvent func(ExecutionEvent)
	// contains filtered or unexported fields
}

Executor manages long-running agent executions.

func NewExecutor

func NewExecutor(config ExecutorConfig, logger *zap.Logger, opts ...ExecutorOption) *Executor

NewExecutor creates a new long-running executor.

func (*Executor) AutoResumeAll added in v1.0.0

func (e *Executor) AutoResumeAll(ctx context.Context) (int, error)

AutoResumeAll loads all checkpoints from the store and resumes any that are resumable (running, paused, or resuming) and have named steps registered in the step registry.

func (*Executor) CreateExecution

func (e *Executor) CreateExecution(name string, steps []StepFunc) *Execution

CreateExecution creates a new long-running execution with anonymous steps.

func (*Executor) CreateNamedExecution added in v1.0.0

func (e *Executor) CreateNamedExecution(name string, steps []NamedStep) *Execution

CreateNamedExecution creates a new execution with named steps for resume capability.

func (*Executor) GetExecution

func (e *Executor) GetExecution(execID string) (*Execution, bool)

GetExecution retrieves an execution by ID.

func (*Executor) ListExecutions

func (e *Executor) ListExecutions() []*Execution

ListExecutions returns all tracked executions.

func (*Executor) LoadExecution

func (e *Executor) LoadExecution(execID string) (*Execution, error)

LoadExecution loads an execution from the checkpoint store.

func (*Executor) Pause

func (e *Executor) Pause(execID string) error

Pause signals a running execution to pause.

func (*Executor) Registry added in v1.0.0

func (e *Executor) Registry() *StepRegistry

Registry returns the executor's step registry.

func (*Executor) Resume

func (e *Executor) Resume(execID string) error

Resume signals a paused execution to continue.

func (*Executor) ResumeExecution added in v1.0.0

func (e *Executor) ResumeExecution(ctx context.Context, execID string, lastState any) error

ResumeExecution resumes a loaded execution from its last checkpoint using the step registry.

func (*Executor) Start

func (e *Executor) Start(ctx context.Context, execID string, initialState any) error

Start begins a long-running execution.

type ExecutorConfig

type ExecutorConfig struct {
	CheckpointInterval time.Duration `json:"checkpoint_interval"`
	CheckpointDir      string        `json:"checkpoint_dir"`
	MaxRetries         int           `json:"max_retries"`
	HeartbeatInterval  time.Duration `json:"heartbeat_interval"`
	AutoResume         bool          `json:"auto_resume"`
}

ExecutorConfig configures the long-running executor.

func DefaultExecutorConfig

func DefaultExecutorConfig() ExecutorConfig

DefaultExecutorConfig returns the default configuration.

type ExecutorOption added in v1.0.0

type ExecutorOption func(*Executor)

ExecutorOption configures the Executor.

func WithCheckpointStore added in v1.0.0

func WithCheckpointStore(store CheckpointStore) ExecutorOption

WithCheckpointStore sets a custom CheckpointStore on the Executor.

type FileCheckpointStore added in v1.0.0

type FileCheckpointStore struct {
	// contains filtered or unexported fields
}

FileCheckpointStore is the default filesystem-based implementation. This extracts the existing os.WriteFile/os.ReadFile behavior from executor.go.

func NewFileCheckpointStore added in v1.0.0

func NewFileCheckpointStore(dir string, logger *zap.Logger) *FileCheckpointStore

NewFileCheckpointStore creates a new filesystem-based checkpoint store.

func (*FileCheckpointStore) DeleteCheckpoint added in v1.0.0

func (s *FileCheckpointStore) DeleteCheckpoint(_ context.Context, execID string) error

DeleteCheckpoint removes a checkpoint file from disk.

func (*FileCheckpointStore) ListCheckpoints added in v1.0.0

func (s *FileCheckpointStore) ListCheckpoints(_ context.Context) ([]*Execution, error)

ListCheckpoints reads all checkpoint files from the directory.

func (*FileCheckpointStore) LoadCheckpoint added in v1.0.0

func (s *FileCheckpointStore) LoadCheckpoint(_ context.Context, execID string) (*Execution, error)

LoadCheckpoint reads an execution from a JSON file on disk.

func (*FileCheckpointStore) SaveCheckpoint added in v1.0.0

func (s *FileCheckpointStore) SaveCheckpoint(_ context.Context, exec *Execution) error

SaveCheckpoint persists execution state to a JSON file on disk.

type NamedStep added in v1.0.0

type NamedStep struct {
	Name string
	Func StepFunc
}

NamedStep pairs a step function with a name for checkpoint-based resume.

type PersistentCheckpointStore added in v1.0.0

type PersistentCheckpointStore struct {
	// contains filtered or unexported fields
}

PersistentCheckpointStore implements CheckpointStore using TaskStoreAdapter.

func NewPersistentCheckpointStore added in v1.0.0

func NewPersistentCheckpointStore(store TaskStoreAdapter, logger *zap.Logger) *PersistentCheckpointStore

NewPersistentCheckpointStore creates a checkpoint store backed by a TaskStoreAdapter.

func (*PersistentCheckpointStore) DeleteCheckpoint added in v1.0.0

func (s *PersistentCheckpointStore) DeleteCheckpoint(ctx context.Context, execID string) error

DeleteCheckpoint removes a TaskRecord by ID.

func (*PersistentCheckpointStore) ListCheckpoints added in v1.0.0

func (s *PersistentCheckpointStore) ListCheckpoints(ctx context.Context) ([]*Execution, error)

ListCheckpoints retrieves all TaskRecords and unmarshals each to an Execution.

func (*PersistentCheckpointStore) LoadCheckpoint added in v1.0.0

func (s *PersistentCheckpointStore) LoadCheckpoint(ctx context.Context, execID string) (*Execution, error)

LoadCheckpoint retrieves a TaskRecord and unmarshals it back to an Execution.

func (*PersistentCheckpointStore) SaveCheckpoint added in v1.0.0

func (s *PersistentCheckpointStore) SaveCheckpoint(ctx context.Context, exec *Execution) error

SaveCheckpoint marshals the Execution to JSON and saves it as a TaskRecord.

type StepFunc

type StepFunc func(ctx context.Context, state any) (any, error)

StepFunc represents a single step in a long-running execution.

type StepRegistry added in v1.0.0

type StepRegistry struct {
	// contains filtered or unexported fields
}

StepRegistry allows named step registration for resume capability.

func NewStepRegistry added in v1.0.0

func NewStepRegistry() *StepRegistry

NewStepRegistry creates a new step registry.

func (*StepRegistry) Get added in v1.0.0

func (r *StepRegistry) Get(name string) (StepFunc, bool)

Get retrieves a step function by name.

func (*StepRegistry) Register added in v1.0.0

func (r *StepRegistry) Register(name string, fn StepFunc)

Register adds a named step function to the registry.

type TaskRecord added in v1.0.0

type TaskRecord struct {
	ID       string
	Status   string
	Progress float64
	Data     []byte // JSON-serialized Execution
	Metadata map[string]string
}

TaskRecord is the local representation of a persistent task.

type TaskStoreAdapter added in v1.0.0

type TaskStoreAdapter interface {
	SaveTask(ctx context.Context, task *TaskRecord) error
	GetTask(ctx context.Context, taskID string) (*TaskRecord, error)
	ListTasks(ctx context.Context) ([]*TaskRecord, error)
	DeleteTask(ctx context.Context, taskID string) error
	UpdateProgress(ctx context.Context, taskID string, progress float64) error
	UpdateStatus(ctx context.Context, taskID string, status string) error
}

TaskStoreAdapter is a local interface matching the subset of persistence.TaskStore needed for checkpoint storage. Using local interface pattern to avoid direct import of persistence in most of the longrunning package.

type TaskStoreBridge added in v1.0.0

type TaskStoreBridge struct {
	// contains filtered or unexported fields
}

TaskStoreBridge adapts persistence.TaskStore to TaskStoreAdapter. This is the only file in the longrunning package that imports persistence.

func NewTaskStoreBridge added in v1.0.0

func NewTaskStoreBridge(store persistence.TaskStore) *TaskStoreBridge

NewTaskStoreBridge creates a new bridge from persistence.TaskStore to TaskStoreAdapter.

func (*TaskStoreBridge) DeleteTask added in v1.0.0

func (b *TaskStoreBridge) DeleteTask(ctx context.Context, taskID string) error

DeleteTask deletes a task by ID.

func (*TaskStoreBridge) GetTask added in v1.0.0

func (b *TaskStoreBridge) GetTask(ctx context.Context, taskID string) (*TaskRecord, error)

GetTask retrieves a persistence.AsyncTask and converts it to a TaskRecord.

func (*TaskStoreBridge) ListTasks added in v1.0.0

func (b *TaskStoreBridge) ListTasks(ctx context.Context) ([]*TaskRecord, error)

ListTasks retrieves all tasks and converts them to TaskRecords.

func (*TaskStoreBridge) SaveTask added in v1.0.0

func (b *TaskStoreBridge) SaveTask(ctx context.Context, task *TaskRecord) error

SaveTask converts a TaskRecord to persistence.AsyncTask and saves it.

func (*TaskStoreBridge) UpdateProgress added in v1.0.0

func (b *TaskStoreBridge) UpdateProgress(ctx context.Context, taskID string, progress float64) error

UpdateProgress updates the progress of a task.

func (*TaskStoreBridge) UpdateStatus added in v1.0.0

func (b *TaskStoreBridge) UpdateStatus(ctx context.Context, taskID string, status string) error

UpdateStatus updates the status of a task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL