workflow

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	ID        string         `json:"id"`
	StepID    string         `json:"step_id"`
	State     map[string]any `json:"state"`
	CreatedAt time.Time      `json:"created_at"`
}

Checkpoint represents a restorable point in workflow execution

type ExecuteOptions

type ExecuteOptions struct {
	// ResumeFromCheckpoint enables resumption from last checkpoint
	ResumeFromCheckpoint bool
	// Context provides initial workflow context
	Context map[string]any
	// ExecutionID allows specifying a custom execution ID
	ExecutionID string
}

ExecuteOptions configures workflow execution

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

Executor executes workflows with persistence

func NewExecutor

func NewExecutor(store Store) *Executor

NewExecutor creates a new workflow executor

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, workflowID string, opts *ExecuteOptions) (*State, error)

Execute starts or resumes workflow execution

func (*Executor) GetState

func (e *Executor) GetState(executionID string) (*State, error)

GetState returns the current state of a workflow execution

func (*Executor) ListExecutions

func (e *Executor) ListExecutions(workflowID string) ([]*State, error)

ListExecutions returns all executions for a workflow

func (*Executor) Pause

func (e *Executor) Pause(executionID string) error

Pause pauses a running workflow

func (*Executor) RegisterWorkflow

func (e *Executor) RegisterWorkflow(workflow *Workflow) error

RegisterWorkflow registers a workflow with the executor

func (*Executor) Resume

func (e *Executor) Resume(ctx context.Context, executionID string) (*State, error)

Resume resumes a paused workflow

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore implements Store using the filesystem

func NewFileStore

func NewFileStore(baseDir string) (*FileStore, error)

NewFileStore creates a new file-based workflow store

func (*FileStore) Delete

func (s *FileStore) Delete(executionID string) error

Delete removes a workflow state file

func (*FileStore) List

func (s *FileStore) List(workflowID string) ([]*State, error)

List returns all workflow states

func (*FileStore) Load

func (s *FileStore) Load(executionID string) (*State, error)

Load retrieves workflow state from a file

func (*FileStore) LoadLatestCheckpoint

func (s *FileStore) LoadLatestCheckpoint(executionID string) (*Checkpoint, error)

LoadLatestCheckpoint loads the most recent checkpoint

func (*FileStore) Save

func (s *FileStore) Save(state *State) error

Save persists workflow state to a file

func (*FileStore) SaveCheckpoint

func (s *FileStore) SaveCheckpoint(executionID string, checkpoint *Checkpoint) error

SaveCheckpoint saves a checkpoint for recovery

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore implements Store using in-memory storage (useful for testing)

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates a new in-memory workflow store

func (*MemoryStore) Delete

func (s *MemoryStore) Delete(executionID string) error

Delete removes workflow state from memory

func (*MemoryStore) List

func (s *MemoryStore) List(workflowID string) ([]*State, error)

List returns all workflow states from memory

func (*MemoryStore) Load

func (s *MemoryStore) Load(executionID string) (*State, error)

Load retrieves workflow state from memory

func (*MemoryStore) LoadLatestCheckpoint

func (s *MemoryStore) LoadLatestCheckpoint(executionID string) (*Checkpoint, error)

LoadLatestCheckpoint loads the most recent checkpoint from memory

func (*MemoryStore) Save

func (s *MemoryStore) Save(state *State) error

Save persists workflow state in memory

func (*MemoryStore) SaveCheckpoint

func (s *MemoryStore) SaveCheckpoint(executionID string, checkpoint *Checkpoint) error

SaveCheckpoint saves a checkpoint in memory

type State

type State struct {
	ID          string         `json:"id"`
	WorkflowID  string         `json:"workflow_id"`
	Status      Status         `json:"status"`
	CurrentStep string         `json:"current_step"`
	StepStates  map[string]any `json:"step_states"`
	Context     map[string]any `json:"context"`
	StartedAt   time.Time      `json:"started_at"`
	UpdatedAt   time.Time      `json:"updated_at"`
	CompletedAt *time.Time     `json:"completed_at,omitempty"`
	Error       string         `json:"error,omitempty"`
	Checkpoints []Checkpoint   `json:"checkpoints,omitempty"`
}

State represents the current state of a workflow execution

type Status

type Status string

Status represents workflow execution status

const (
	StatusPending   Status = "pending"
	StatusRunning   Status = "running"
	StatusPaused    Status = "paused"
	StatusCompleted Status = "completed"
	StatusFailed    Status = "failed"
	StatusCancelled Status = "cancelled"
)

type Step

type Step struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	Handler     StepHandler    `json:"-"`
	NextSteps   []string       `json:"next_steps,omitempty"`
	Config      map[string]any `json:"config,omitempty"`
	Retries     int            `json:"retries,omitempty"`
	Timeout     time.Duration  `json:"timeout,omitempty"`
}

Step represents a single step in a workflow

type StepHandler

type StepHandler func(ctx context.Context, input map[string]any) (map[string]any, error)

StepHandler is the function that executes a step

type Store

type Store interface {
	// Save persists the current workflow state
	Save(state *State) error

	// Load retrieves a workflow state by execution ID
	Load(executionID string) (*State, error)

	// List returns all workflow states, optionally filtered by workflow ID
	List(workflowID string) ([]*State, error)

	// Delete removes a workflow state
	Delete(executionID string) error

	// SaveCheckpoint saves a checkpoint for recovery
	SaveCheckpoint(executionID string, checkpoint *Checkpoint) error

	// LoadLatestCheckpoint loads the most recent checkpoint
	LoadLatestCheckpoint(executionID string) (*Checkpoint, error)
}

Store defines the interface for workflow state persistence

type Workflow

type Workflow struct {
	ID          string           `json:"id"`
	Name        string           `json:"name"`
	Description string           `json:"description,omitempty"`
	Steps       map[string]*Step `json:"steps"`
	StartStep   string           `json:"start_step"`
}

Workflow defines a workflow with steps

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL