Documentation
¶
Index ¶
- Constants
- Variables
- func CancelWorkflow(ctx context.Context, wfFSM *WorkflowFSM, stepFSM *StepFSM, workflowID string, ...) error
- func ComputeBackoff(policy *schema.RetryPolicy, attempt int) time.Duration
- func IsRetryableError(err error) bool
- func WaitForBackoff(ctx context.Context, delay time.Duration) error
- type CircuitBreakerConfig
- type CircuitBreakerRegistry
- func (r *CircuitBreakerRegistry) AllowRequest(actionName string) error
- func (r *CircuitBreakerRegistry) GetState(actionName string) CircuitState
- func (r *CircuitBreakerRegistry) GetStats(actionName string) map[string]any
- func (r *CircuitBreakerRegistry) RecordFailure(actionName string) CircuitState
- func (r *CircuitBreakerRegistry) RecordSuccess(actionName string)
- type CircuitState
- type DAG
- type ErrorHandlerResult
- type EventAppender
- type EventLogger
- type ExecutionResult
- type Executor
- type ExecutorConfig
- type PoolMetrics
- type StepFSM
- type StepResult
- type TransitionHook
- type WorkerPool
- type WorkflowFSM
- type WorkflowStatus
Constants ¶
const DefaultOnTimeout = "fail"
DefaultOnTimeout is the default behavior when a workflow-level timeout fires.
const DefaultPoolSize = 10
DefaultPoolSize is the default worker pool concurrency.
Variables ¶
var ErrPoolShutdown = errors.New("worker pool is shut down")
ErrPoolShutdown is returned when work is submitted to a shut-down pool.
var ValidStepTransitions = map[schema.StepStatus][]schema.StepStatus{ schema.StepStatusPending: {schema.StepStatusScheduled, schema.StepStatusSkipped}, schema.StepStatusScheduled: {schema.StepStatusRunning, schema.StepStatusSkipped, schema.StepStatusSuspended}, schema.StepStatusRunning: {schema.StepStatusCompleted, schema.StepStatusFailed, schema.StepStatusSuspended, schema.StepStatusRetrying, schema.StepStatusSkipped}, schema.StepStatusRetrying: {schema.StepStatusRunning, schema.StepStatusFailed}, schema.StepStatusSuspended: {schema.StepStatusRunning, schema.StepStatusFailed, schema.StepStatusSkipped}, schema.StepStatusCompleted: {}, schema.StepStatusFailed: {}, schema.StepStatusSkipped: {}, }
ValidStepTransitions defines the allowed state transitions for steps.
var ValidWorkflowTransitions = map[schema.WorkflowStatus][]schema.WorkflowStatus{ schema.WorkflowStatusPending: {schema.WorkflowStatusActive, schema.WorkflowStatusCancelled}, schema.WorkflowStatusActive: {schema.WorkflowStatusSuspended, schema.WorkflowStatusCompleted, schema.WorkflowStatusFailed, schema.WorkflowStatusCancelled}, schema.WorkflowStatusSuspended: {schema.WorkflowStatusActive, schema.WorkflowStatusCancelled, schema.WorkflowStatusFailed}, schema.WorkflowStatusCompleted: {}, schema.WorkflowStatusFailed: {}, schema.WorkflowStatusCancelled: {}, }
ValidWorkflowTransitions defines the allowed state transitions for workflows.
Functions ¶
func CancelWorkflow ¶
func CancelWorkflow(ctx context.Context, wfFSM *WorkflowFSM, stepFSM *StepFSM, workflowID string, currentStatus schema.WorkflowStatus, stepStates map[string]schema.StepStatus) error
CancelWorkflow transitions a workflow to cancelled and skips all non-terminal steps. stepStates is a map of step_id -> current StepStatus for all known steps.
func ComputeBackoff ¶
func ComputeBackoff(policy *schema.RetryPolicy, attempt int) time.Duration
ComputeBackoff calculates the delay before the next retry attempt. Supports none, constant, linear, and exponential backoff with optional max_delay cap.
func IsRetryableError ¶
IsRetryableError classifies whether an error should be retried. Retryable by default: network errors, timeouts, context.DeadlineExceeded. Non-retryable: validation errors, permission denied, typed OpcodeErrors with non-retryable codes.
Types ¶
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// FailureThreshold is the number of consecutive failures before opening the circuit.
FailureThreshold int
// Cooldown is how long the circuit stays open before transitioning to half-open.
Cooldown time.Duration
// HalfOpenMax is the number of test requests allowed in half-open state.
HalfOpenMax int
}
CircuitBreakerConfig configures the circuit breaker behavior.
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig returns a sensible default configuration.
type CircuitBreakerRegistry ¶
type CircuitBreakerRegistry struct {
// contains filtered or unexported fields
}
CircuitBreakerRegistry manages per-action circuit breakers.
func NewCircuitBreakerRegistry ¶
func NewCircuitBreakerRegistry(config CircuitBreakerConfig) *CircuitBreakerRegistry
NewCircuitBreakerRegistry creates a new registry with the given config.
func (*CircuitBreakerRegistry) AllowRequest ¶
func (r *CircuitBreakerRegistry) AllowRequest(actionName string) error
AllowRequest checks whether a request to the given action is allowed. Returns nil if allowed, or an OpcodeError if the circuit is open.
func (*CircuitBreakerRegistry) GetState ¶
func (r *CircuitBreakerRegistry) GetState(actionName string) CircuitState
GetState returns the current state of the circuit for an action.
func (*CircuitBreakerRegistry) GetStats ¶
func (r *CircuitBreakerRegistry) GetStats(actionName string) map[string]any
GetStats returns diagnostic information about a circuit breaker.
func (*CircuitBreakerRegistry) RecordFailure ¶
func (r *CircuitBreakerRegistry) RecordFailure(actionName string) CircuitState
RecordFailure records a failed execution for the action. Returns the new circuit state.
func (*CircuitBreakerRegistry) RecordSuccess ¶
func (r *CircuitBreakerRegistry) RecordSuccess(actionName string)
RecordSuccess records a successful execution for the action.
type CircuitState ¶
type CircuitState int
CircuitState represents the state of a circuit breaker.
const ( CircuitClosed CircuitState = iota // Normal operation CircuitOpen // Failing, rejecting calls CircuitHalfOpen // Testing recovery )
func (CircuitState) String ¶
func (s CircuitState) String() string
type DAG ¶
type DAG struct {
Steps map[string]*schema.StepDefinition // step ID → definition
Edges map[string][]string // step ID → dependencies (depends_on)
Reverse map[string][]string // step ID → dependents (who depends on me)
Sorted []string // topological order
Roots []string // steps with no dependencies
Levels [][]string // parallel execution levels
}
DAG is the in-memory directed acyclic graph representation of a workflow. Built from a WorkflowDefinition, used by the Executor to determine execution order.
type ErrorHandlerResult ¶
type ErrorHandlerResult struct {
// Handled is true if the error was handled (ignore, fallback).
Handled bool
// FallbackStepID is set when the handler triggers a fallback step.
FallbackStepID string
// ShouldFailWorkflow is true when the handler explicitly requests workflow failure.
ShouldFailWorkflow bool
}
ErrorHandlerResult describes the outcome of an error handler invocation.
func HandleStepError ¶
func HandleStepError( ctx context.Context, eventLog EventAppender, workflowID, stepID string, handler *schema.ErrorHandler, stepErr error, ) (*ErrorHandlerResult, error)
HandleStepError evaluates the on_error handler for a step and determines the next action. It logs the invocation as an event. If no handler is configured, it returns unhandled.
type EventAppender ¶
EventAppender is satisfied by the Store and EventLog; used by FSMs to emit events on transitions.
type EventLogger ¶
type EventLogger interface {
EventAppender
GetEvents(ctx context.Context, workflowID string, since int64) ([]*store.Event, error)
GetEventsByType(ctx context.Context, eventType string, filter store.EventFilter) ([]*store.Event, error)
ReplayEvents(ctx context.Context, workflowID string) (map[string]*store.StepState, error)
}
EventLogger abstracts the event log operations needed by the executor. Satisfied by *store.EventLog and test mocks.
type ExecutionResult ¶
type ExecutionResult struct {
WorkflowID string `json:"workflow_id"`
Status schema.WorkflowStatus `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
Error *schema.OpcodeError `json:"error,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Steps map[string]*StepResult `json:"steps,omitempty"`
}
ExecutionResult is returned by Run and Resume with the workflow outcome.
type Executor ¶
type Executor interface {
// Run starts a new workflow from a persisted Workflow record.
Run(ctx context.Context, wf *store.Workflow, params map[string]any) (*ExecutionResult, error)
// Resume continues a suspended or interrupted workflow from its last checkpoint.
// Replays events to rebuild state, then continues from first pending step.
// Reasoning nodes are NEVER replayed — stored decisions are injected.
Resume(ctx context.Context, workflowID string) (*ExecutionResult, error)
// Signal delivers an agent message to a suspended workflow.
Signal(ctx context.Context, workflowID string, signal schema.Signal) error
// Extend mutates the DAG of a running workflow in-flight.
Extend(ctx context.Context, workflowID string, mutation schema.DAGMutation) error
// Cancel terminates a workflow with a reason, cascading to active steps.
Cancel(ctx context.Context, workflowID string, reason string) error
// Status returns the current state of a workflow.
Status(ctx context.Context, workflowID string) (*WorkflowStatus, error)
}
Executor is the central workflow execution coordinator.
func NewExecutor ¶
func NewExecutor(s store.Store, el EventLogger, registry actions.ActionRegistry, cfg ExecutorConfig, vault ...secrets.Vault) Executor
NewExecutor creates a new Executor with the given dependencies. vault is optional (nil = secrets interpolation disabled).
type ExecutorConfig ¶
type ExecutorConfig struct {
PoolSize int // max concurrent step goroutines
CircuitBreaker *CircuitBreakerConfig // circuit breaker config (nil = defaults)
}
ExecutorConfig holds configuration for the executor.
type PoolMetrics ¶
type PoolMetrics struct {
Active int64 `json:"active"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
Panics int64 `json:"panics"`
}
PoolMetrics tracks worker pool operational metrics.
type StepFSM ¶
type StepFSM struct {
// contains filtered or unexported fields
}
StepFSM manages step lifecycle state transitions.
func NewStepFSM ¶
func NewStepFSM(appender EventAppender) *StepFSM
NewStepFSM creates a new StepFSM that emits events via the given appender.
func (*StepFSM) OnAfter ¶
func (f *StepFSM) OnAfter(from, to schema.StepStatus, hook TransitionHook)
OnAfter registers a hook called after a step transition.
func (*StepFSM) OnBefore ¶
func (f *StepFSM) OnBefore(from, to schema.StepStatus, hook TransitionHook)
OnBefore registers a hook called before a step transition.
func (*StepFSM) Transition ¶
func (f *StepFSM) Transition(ctx context.Context, workflowID, stepID string, from, to schema.StepStatus) error
Transition validates and executes a step state transition. It emits the corresponding event via the appender.
type StepResult ¶
type StepResult struct {
StepID string `json:"step_id"`
Status schema.StepStatus `json:"status"`
Output json.RawMessage `json:"output,omitempty"`
Error *schema.OpcodeError `json:"error,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
}
StepResult summarizes the outcome of a single step.
type TransitionHook ¶
TransitionHook is called before or after a state transition.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a bounded goroutine pool for concurrent step execution.
func NewWorkerPool ¶
func NewWorkerPool(size int) *WorkerPool
NewWorkerPool creates a pool with the given max concurrency.
func (*WorkerPool) Metrics ¶
func (p *WorkerPool) Metrics() PoolMetrics
Metrics returns a snapshot of the current pool metrics.
func (*WorkerPool) Shutdown ¶
func (p *WorkerPool) Shutdown()
Shutdown gracefully stops the pool. It prevents new submissions and waits for all active work to complete.
func (*WorkerPool) Submit ¶
Submit enqueues work into the pool. It blocks if the pool is at capacity (backpressure) and respects context cancellation while waiting. Returns ErrPoolShutdown if the pool has been shut down.
func (*WorkerPool) Wait ¶
func (p *WorkerPool) Wait()
Wait blocks until all submitted work completes.
type WorkflowFSM ¶
type WorkflowFSM struct {
// contains filtered or unexported fields
}
WorkflowFSM manages workflow lifecycle state transitions.
func NewWorkflowFSM ¶
func NewWorkflowFSM(appender EventAppender) *WorkflowFSM
NewWorkflowFSM creates a new WorkflowFSM that emits events via the given appender.
func (*WorkflowFSM) OnAfter ¶
func (f *WorkflowFSM) OnAfter(from, to schema.WorkflowStatus, hook TransitionHook)
OnAfter registers a hook called after a workflow transition.
func (*WorkflowFSM) OnBefore ¶
func (f *WorkflowFSM) OnBefore(from, to schema.WorkflowStatus, hook TransitionHook)
OnBefore registers a hook called before a workflow transition.
func (*WorkflowFSM) Transition ¶
func (f *WorkflowFSM) Transition(ctx context.Context, workflowID string, from, to schema.WorkflowStatus) error
Transition validates and executes a workflow state transition. It emits the corresponding event via the appender. The caller (Executor) is responsible for persisting the new state to the store.
type WorkflowStatus ¶
type WorkflowStatus struct {
WorkflowID string `json:"workflow_id"`
Status schema.WorkflowStatus `json:"status"`
Steps map[string]*store.StepState `json:"steps,omitempty"`
PendingDecisions []*store.PendingDecision `json:"pending_decisions,omitempty"`
Context *store.WorkflowContext `json:"context,omitempty"`
Events []*store.Event `json:"events,omitempty"`
}
WorkflowStatus is a snapshot of a workflow's current state for querying.