runtime

package
v0.3.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 4, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package runtime implements the workflow execution engine. It walks the compiled IR graph node by node, persists outputs and artifacts via the store, evaluates edge conditions and loop counters, and emits lifecycle events. It supports both sequential execution and parallel fan-out/join patterns via a bounded branch scheduler.

Package runtime also provides git worktree helpers for `worktree: auto` workflows.

When a workflow declares `worktree: auto`, the engine creates a fresh git worktree at run start (under <store-dir>/worktrees/<run-id>) so the run executes in an isolated checkout. This decouples the run's mutations from the user's main working tree: uncommitted work (WIP) in the main tree stays invisible to the run, the run's commits land in the shared .git, and a failed run leaves its worktree in place for inspection.

On a successful run, finalizeWorktree promotes any commits the run produced onto a persistent branch (default `iterion/run/<friendly>`) and best-effort fast-forwards the user's checked-out branch, then removes the worktree directory. Without that promotion the commits are reachable only via reflog and are eligible for GC.
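The worktree lifecycle described above maps onto a short sequence of git commands. The sketch below only builds argument lists and never runs git. It is not iterion's implementation: the helper names (worktreeDir, createArgs, finalizeArgs) are hypothetical, and the exact flags (for example `--detach`) are assumptions. Only the `<store-dir>/worktrees/<run-id>` layout and the create-branch, fast-forward, remove order come from the text above.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// worktreeDir mirrors the documented layout <store-dir>/worktrees/<run-id>.
func worktreeDir(storeDir, runID string) string {
	return filepath.Join(storeDir, "worktrees", runID)
}

// createArgs returns a git invocation that adds a detached worktree at HEAD.
// Hypothetical: the real engine may pass different flags.
func createArgs(dir string) []string {
	return []string{"git", "worktree", "add", "--detach", dir, "HEAD"}
}

// finalizeArgs returns the commands for a successful run: pin the run's
// commits on a storage branch, optionally fast-forward the user's branch,
// then remove the worktree directory.
func finalizeArgs(dir, branch, runHead string, fastForward bool) [][]string {
	cmds := [][]string{
		// Creating the branch keeps the run's commits reachable (GC guard).
		{"git", "branch", branch, runHead},
	}
	if fastForward {
		// Run in the user's main working tree; git refuses without
		// touching anything if this is not a fast-forward.
		cmds = append(cmds, []string{"git", "merge", "--ff-only", branch})
	}
	cmds = append(cmds, []string{"git", "worktree", "remove", dir})
	return cmds
}

func main() {
	dir := worktreeDir("/tmp/store", "run-123")
	fmt.Println(createArgs(dir))
	for _, c := range finalizeArgs(dir, "iterion/run/brave-otter", "abc123", true) {
		fmt.Println(c)
	}
}
```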

Index

Constants

This section is empty.

Variables

var ErrBudgetExceeded = fmt.Errorf("runtime: budget exceeded")

ErrBudgetExceeded is returned when a budget limit has been reached.

var ErrCompactionUnsupported = model.ErrCompactionUnsupported

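ErrCompactionUnsupported is re-exported from the model package so runtime callers can match on it without importing model directly. It is a variable alias, not a const: it holds the same error value as model.ErrCompactionUnsupported, so errors.Is matches either name. The canonical sentinel lives in model/.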

var ErrRunCancelled = errors.New("runtime: run cancelled")

ErrRunCancelled is returned when a run is interrupted by context cancellation (e.g. SIGINT). It is kept distinct from failures so callers can handle cancellation gracefully.

var ErrRunPaused = errors.New("runtime: run paused waiting for human input")

ErrRunPaused is returned by Run or Resume when execution is suspended at a human node. This is not a failure — the run can be resumed via Engine.Resume.

var ErrServerDraining = errors.New("runtime: server draining")

ErrServerDraining is returned by the runview Service when Launch or Resume is called after the server has begun graceful shutdown. The HTTP layer translates this to 503 Service Unavailable.
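Because these variables are sentinel values, callers should test for them with errors.Is rather than by comparing strings, so wrapped errors still match. The sketch below declares local stand-ins with the same messages so it compiles on its own. In real code you would match runtime.ErrRunPaused and the other sentinels directly. The outcome labels returned by classify are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the runtime sentinels (same messages, different package).
var (
	ErrRunPaused      = errors.New("runtime: run paused waiting for human input")
	ErrRunCancelled   = errors.New("runtime: run cancelled")
	ErrBudgetExceeded = errors.New("runtime: budget exceeded")
)

// classify maps an error returned by Run or Resume to a caller-facing outcome.
func classify(err error) string {
	switch {
	case err == nil:
		return "completed"
	case errors.Is(err, ErrRunPaused):
		return "paused" // not a failure: resume later with answers
	case errors.Is(err, ErrRunCancelled):
		return "cancelled" // e.g. SIGINT, handle gracefully
	case errors.Is(err, ErrBudgetExceeded):
		return "budget"
	default:
		return "failed"
	}
}

func main() {
	wrapped := fmt.Errorf("node review: %w", ErrRunPaused)
	fmt.Println(classify(wrapped))            // paused
	fmt.Println(classify(errors.New("boom"))) // failed
}
```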

Functions

This section is empty.

Types

type Compactor

type Compactor interface {
	Compact(ctx context.Context, nodeID string) error
}

Compactor is an optional executor capability surfaced for RecoveryCompactAndRetry. Backends that can drop older conversation turns (e.g. claw's ConversationLoop.Compact) implement it structurally; the engine falls back to a plain retry when the underlying executor does not. Compact may return model.ErrCompactionUnsupported to signal an architectural no-op without alarming the operator.
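The fallback described above is Go's optional-interface pattern. The executor is type-asserted against Compactor, and either a missing method or ErrCompactionUnsupported is treated as "just retry". This is a minimal sketch, not the engine's code. It uses local stand-ins for the interface and sentinel, and the compactThenRetry helper and executor types are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Compactor mirrors the documented optional capability.
type Compactor interface {
	Compact(ctx context.Context, nodeID string) error
}

// Local stand-in for model.ErrCompactionUnsupported.
var ErrCompactionUnsupported = errors.New("compaction unsupported")

// plainExecutor does not implement Compactor.
type plainExecutor struct{}

// noopCompactor implements Compactor but reports an architectural no-op.
type noopCompactor struct{}

func (noopCompactor) Compact(context.Context, string) error { return ErrCompactionUnsupported }

// compactingExecutor records which nodes it compacted.
type compactingExecutor struct{ compacted []string }

func (c *compactingExecutor) Compact(_ context.Context, nodeID string) error {
	c.compacted = append(c.compacted, nodeID)
	return nil
}

// compactThenRetry reports whether compaction actually happened before the
// retry. Any other compaction error is returned for the caller to handle.
func compactThenRetry(ctx context.Context, exec any, nodeID string) (bool, error) {
	c, ok := exec.(Compactor)
	if !ok {
		return false, nil // capability absent: plain retry
	}
	if err := c.Compact(ctx, nodeID); err != nil {
		if errors.Is(err, ErrCompactionUnsupported) {
			return false, nil // no-op by design: plain retry, no alarm
		}
		return false, err
	}
	return true, nil
}

func main() {
	ctx := context.Background()
	ok, _ := compactThenRetry(ctx, plainExecutor{}, "review")
	fmt.Println(ok) // false
	ce := &compactingExecutor{}
	ok, _ = compactThenRetry(ctx, ce, "review")
	fmt.Println(ok, ce.compacted) // true [review]
}
```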

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine executes workflows. It supports sequential execution and parallel fan-out via bounded branch scheduling.

func New

func New(wf *ir.Workflow, s *store.RunStore, exec NodeExecutor, opts ...EngineOption) *Engine

New creates a new Engine for a raw workflow.

func NewFromRecipe

func NewFromRecipe(r *recipe.RecipeSpec, wf *ir.Workflow, s *store.RunStore, exec NodeExecutor, opts ...EngineOption) (*Engine, error)

NewFromRecipe creates a new Engine by applying a recipe's presets onto the given workflow. The recipe merges preset variables, prompt overrides, and budget limits, producing a self-contained execution unit.

func (*Engine) Resume

func (e *Engine) Resume(ctx context.Context, runID string, answers map[string]interface{}) error

Resume resumes a paused or failed-resumable run. For paused runs, human answers are recorded and execution continues from the human node. For failed-resumable runs, execution restarts from the node after the last successfully completed one (re-executing the failed node).

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, runID string, inputs map[string]interface{}) error

Run executes the workflow. It creates a run, walks the graph from the entry node, and returns when a terminal node is reached, a human pause is hit (ErrRunPaused), or an error occurs.
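An interactive driver built on Run and Resume can loop while the error is ErrRunPaused, feeding fresh answers to Resume under the same run ID. A CLI would more likely persist the pause and exit, resuming later. The sketch below codes against a small interface that matches the documented Run and Resume signatures and uses a fake engine, so it does not depend on the real package. The drive helper, fakeEngine, and the answer keys are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runner matches the documented Run/Resume signatures of *runtime.Engine.
type runner interface {
	Run(ctx context.Context, runID string, inputs map[string]interface{}) error
	Resume(ctx context.Context, runID string, answers map[string]interface{}) error
}

// Stand-in for runtime.ErrRunPaused.
var ErrRunPaused = errors.New("runtime: run paused waiting for human input")

// drive runs to completion, answering each human pause via ask.
func drive(ctx context.Context, r runner, runID string, inputs map[string]interface{},
	ask func() map[string]interface{}) error {
	err := r.Run(ctx, runID, inputs)
	for errors.Is(err, ErrRunPaused) {
		err = r.Resume(ctx, runID, ask())
	}
	return err
}

// fakeEngine pauses a fixed number of times before completing.
type fakeEngine struct{ pauses, resumed int }

func (f *fakeEngine) Run(context.Context, string, map[string]interface{}) error {
	if f.pauses > 0 {
		return ErrRunPaused
	}
	return nil
}

func (f *fakeEngine) Resume(context.Context, string, map[string]interface{}) error {
	f.resumed++
	if f.resumed < f.pauses {
		return ErrRunPaused
	}
	return nil
}

func main() {
	f := &fakeEngine{pauses: 2}
	err := drive(context.Background(), f, "run-1", nil, func() map[string]interface{} {
		return map[string]interface{}{"approve": true}
	})
	fmt.Println(err, f.resumed) // <nil> 2
}
```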

type EngineOption

type EngineOption func(*Engine)

EngineOption configures an Engine.
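EngineOption follows Go's functional-options pattern. Each With... constructor returns a closure that mutates the Engine, and the options are applied in order. The sketch below reproduces the pattern on a toy type, because the real Engine's fields are unexported. The engine fields and the newEngine constructor are hypothetical stand-ins.

```go
package main

import "fmt"

type engine struct {
	runName     string
	forceResume bool
	workDir     string
}

type engineOption func(*engine)

func withRunName(name string) engineOption {
	return func(e *engine) {
		if name != "" { // empty string is ignored, as documented for WithRunName
			e.runName = name
		}
	}
}

func withForceResume(force bool) engineOption {
	return func(e *engine) { e.forceResume = force }
}

// newEngine sets defaults, then applies options in order; later options win.
func newEngine(opts ...engineOption) *engine {
	e := &engine{workDir: "."}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

func main() {
	e := newEngine(withRunName("brave-otter"), withRunName(""), withForceResume(true))
	fmt.Println(e.runName, e.forceResume) // brave-otter true
}
```

With the real package the call has the same shape, for example runtime.New(wf, s, exec, runtime.WithRunName(name), runtime.WithOutputValidation(true)).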

func WithBranchName

func WithBranchName(name string) EngineOption

WithBranchName overrides the storage branch name for the worktree finalization. The default `iterion/run/<runName>` is used when this is empty. The branch is always created (it is the GC guard for the run's commits); on collision the engine appends a numeric suffix.

No effect on runs without `worktree: auto`.
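The collision rule can be sketched as a loop that probes for a free branch name. The suffix format (`-2`, `-3`, ...) is an assumption, since the text only says "a numeric suffix". The exists callback is also hypothetical: in practice it might wrap a ref lookup such as `git rev-parse --verify --quiet refs/heads/<name>`.

```go
package main

import "fmt"

// pickBranchName returns the override (or the documented default) if free,
// otherwise base-2, base-3, ... Suffix format is hypothetical.
func pickBranchName(override, runName string, exists func(string) bool) string {
	base := override
	if base == "" {
		base = "iterion/run/" + runName // documented default
	}
	if !exists(base) {
		return base
	}
	for n := 2; ; n++ {
		candidate := fmt.Sprintf("%s-%d", base, n)
		if !exists(candidate) {
			return candidate
		}
	}
}

func main() {
	taken := map[string]bool{
		"iterion/run/brave-otter":   true,
		"iterion/run/brave-otter-2": true,
	}
	exists := func(name string) bool { return taken[name] }
	fmt.Println(pickBranchName("", "brave-otter", exists))          // iterion/run/brave-otter-3
	fmt.Println(pickBranchName("feature/x", "brave-otter", exists)) // feature/x
}
```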

func WithEventObserver

func WithEventObserver(fn func(evt store.Event)) EngineOption

WithEventObserver registers a callback invoked after every successful event append (including branch_started/finished). It must be safe for concurrent use; the callback runs in the goroutine that emitted the event. Use it to fan out events to non-store observers (Prometheus, custom metrics) without changing the persistence layer.
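Because the observer runs on whichever goroutine emitted the event, including parallel branches, any state it touches needs synchronization. Here is a minimal sketch of a counting observer guarded by a mutex. It assumes a local event type with a Type field as a hypothetical stand-in for store.Event, whose fields are not shown here.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for store.Event.
type event struct{ Type string }

// counter tallies events by type; safe for concurrent use.
type counter struct {
	mu     sync.Mutex
	counts map[string]int
}

func (c *counter) observe(evt event) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[evt.Type]++
}

func (c *counter) get(t string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[t]
}

func main() {
	c := &counter{counts: map[string]int{}}
	var wg sync.WaitGroup
	// Simulate parallel branches emitting events concurrently.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.observe(event{Type: "branch_started"})
			c.observe(event{Type: "branch_finished"})
		}()
	}
	wg.Wait()
	fmt.Println(c.get("branch_started"), c.get("branch_finished")) // 8 8
}
```

With the real package this would be wired through runtime.WithEventObserver(func(evt store.Event) { ... }), mapping the fields you need before forwarding. Keep the callback cheap: it runs inline on the emitting goroutine, so slow work there likely delays that branch.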

func WithFilePath

func WithFilePath(path string) EngineOption

WithFilePath records the absolute .iter source path on the run metadata so that resume (and the run console) can re-locate the workflow without the caller having to thread it back through the API. Optional — empty string is ignored.

func WithForceResume

func WithForceResume(force bool) EngineOption

WithForceResume allows resuming a run even when the workflow source has changed since the run was started. The hash mismatch is logged as a warning instead of causing an error.

func WithLogger

func WithLogger(l *iterlog.Logger) EngineOption

WithLogger sets a leveled logger for console output during execution.

func WithMergeInto

func WithMergeInto(target string) EngineOption

WithMergeInto controls the worktree-finalization fast-forward target for `worktree: auto` runs. Values:

  • "" or "current" → fast-forward the user's currently-checked-out branch (default behaviour)
  • "none" → skip the fast-forward; the storage branch remains the only landing
  • <branch-name> → fast-forward this named branch (only honoured when it matches the currently-checked-out branch; otherwise a warning is logged and the FF is skipped)

No effect on runs without `worktree: auto`.
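The three documented values reduce to a small decision function. This is a sketch under assumptions: the ffTarget name and its return shape are hypothetical, and the warning text is illustrative rather than the engine's actual log message.

```go
package main

import "fmt"

// ffTarget decides which branch, if any, to fast-forward after a worktree run.
// current is the branch checked out in the user's main working tree.
func ffTarget(mergeInto, current string) (branch string, warning string) {
	switch mergeInto {
	case "", "current":
		return current, ""
	case "none":
		return "", "" // the storage branch is the only landing
	default:
		if mergeInto == current {
			return mergeInto, ""
		}
		return "", fmt.Sprintf("merge-into %q is not the checked-out branch %q; skipping fast-forward", mergeInto, current)
	}
}

func main() {
	fmt.Println(ffTarget("", "main"))
	fmt.Println(ffTarget("none", "main"))
	fmt.Println(ffTarget("release", "main"))
}
```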

func WithOnNodeFinished

func WithOnNodeFinished(fn func(nodeID string, output map[string]interface{})) EngineOption

WithOnNodeFinished registers a callback invoked after each node finishes with the node's ID and output. The callback must be safe for concurrent use.

func WithOutputValidation

func WithOutputValidation(enabled bool) EngineOption

WithOutputValidation enables post-execution validation of node outputs against their declared output schemas. When enabled, a node whose output does not conform to its schema will cause the run to fail immediately.

func WithRecoveryDispatch

func WithRecoveryDispatch(d RecoveryDispatch) EngineOption

WithRecoveryDispatch installs the dispatcher consulted when a node's executor returns an error. The dispatcher decides between retry, compact-and-retry, pause for human, and terminal failure. When unset, every error falls straight through to failed_resumable (legacy behaviour).

Build the dispatcher with recovery.Dispatch(recovery.DefaultRecipes()).

func WithRunName

func WithRunName(name string) EngineOption

WithRunName records a deterministic, human-friendly label on the run metadata at creation. Display-only — the canonical identifier remains the run ID. Optional — empty string is ignored.

func WithWorkDir

func WithWorkDir(dir string) EngineOption

WithWorkDir sets the working directory used for backend subprocesses and for resolving the `${PROJECT_DIR}` placeholder in workflow var defaults. When unset, defaults to os.Getwd() at Run() time. With `worktree: auto` on the workflow, the engine overrides this with the per-run worktree path.
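The precedence reads as follows: the per-run worktree path (with `worktree: auto`), else the WithWorkDir value, else os.Getwd(). Below is a sketch of that order plus the `${PROJECT_DIR}` substitution. The helper names are hypothetical, and the sketch assumes plain textual replacement, which may be simpler than what the engine actually does.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// resolveWorkDir applies the documented precedence.
func resolveWorkDir(worktreePath, configured string) (string, error) {
	if worktreePath != "" {
		return worktreePath, nil // worktree: auto overrides WithWorkDir
	}
	if configured != "" {
		return configured, nil
	}
	return os.Getwd() // evaluated at Run() time
}

// expandProjectDir substitutes every ${PROJECT_DIR} in a var default.
func expandProjectDir(def, dir string) string {
	return strings.ReplaceAll(def, "${PROJECT_DIR}", dir)
}

func main() {
	dir, err := resolveWorkDir("", "/src/app")
	if err != nil {
		panic(err)
	}
	fmt.Println(expandProjectDir("${PROJECT_DIR}/docs", dir)) // /src/app/docs
}
```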

func WithWorkflowHash

func WithWorkflowHash(hash string) EngineOption

WithWorkflowHash sets a hash of the .iter source so that Resume can detect whether the workflow has changed since the run was started.
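WithWorkflowHash and WithForceResume interact on Resume. A mismatch between the stored hash and the current hash is an error unless force is set, in which case it is only logged as a warning. This sketch assumes SHA-256 over the raw .iter bytes, since the docs do not say which hash is used. The checkResume helper and its choice to skip the check when no hash was recorded are also hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"log"
)

// hashSource hashes the raw workflow source (algorithm is an assumption).
func hashSource(src []byte) string {
	sum := sha256.Sum256(src)
	return hex.EncodeToString(sum[:])
}

var errWorkflowChanged = errors.New("workflow source changed since run start")

// checkResume compares the hash recorded at run start with the current one.
func checkResume(stored, current string, force bool) error {
	if stored == "" || stored == current {
		return nil // nothing recorded (hypothetical rule), or unchanged
	}
	if force {
		log.Printf("warning: %v (forced resume)", errWorkflowChanged)
		return nil
	}
	return errWorkflowChanged
}

func main() {
	orig := hashSource([]byte("workflow v1"))
	edited := hashSource([]byte("workflow v2"))
	fmt.Println(checkResume(orig, orig, false))   // <nil>
	fmt.Println(checkResume(orig, edited, false)) // workflow source changed since run start
	fmt.Println(checkResume(orig, edited, true))  // <nil> (warning logged to stderr)
}
```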

type ErrorCode

type ErrorCode string

ErrorCode categorizes runtime errors for programmatic handling.

const (
	ErrCodeNodeNotFound          ErrorCode = "NODE_NOT_FOUND"
	ErrCodeNoOutgoingEdge        ErrorCode = "NO_OUTGOING_EDGE"
	ErrCodeLoopExhausted         ErrorCode = "LOOP_EXHAUSTED"
	ErrCodeBudgetExceeded        ErrorCode = "BUDGET_EXCEEDED"
	ErrCodeExecutionFailed       ErrorCode = "EXECUTION_FAILED"
	ErrCodeWorkspaceSafety       ErrorCode = "WORKSPACE_SAFETY"
	ErrCodeTimeout               ErrorCode = "TIMEOUT"
	ErrCodeCancelled             ErrorCode = "CANCELLED"
	ErrCodeJoinFailed            ErrorCode = "JOIN_FAILED"
	ErrCodeResumeInvalid         ErrorCode = "RESUME_INVALID"
	ErrCodeSchemaValidation      ErrorCode = "SCHEMA_VALIDATION"
	ErrCodeRateLimited           ErrorCode = "RATE_LIMITED"
	ErrCodeContextLengthExceeded ErrorCode = "CONTEXT_LENGTH_EXCEEDED"
	ErrCodeToolFailedTransient   ErrorCode = "TOOL_FAILED_TRANSIENT"
	ErrCodeToolFailedPermanent   ErrorCode = "TOOL_FAILED_PERMANENT"
)

type NodeExecutor

type NodeExecutor interface {
	// Execute runs the given node with the provided input and returns its
	// output. For terminal nodes (done/fail) this is never called.
	Execute(ctx context.Context, node ir.Node, input map[string]interface{}) (map[string]interface{}, error)
}

NodeExecutor is the abstraction called by the engine to actually run a node (LLM call, tool invocation, etc.). The runtime itself is agnostic to the concrete implementation — tests supply stubs, production code plugs in real providers.

type RecoveryAction

type RecoveryAction struct {
	Kind         RecoveryActionKind
	Delay        time.Duration
	AttemptsLeft int
	Reason       string
}

RecoveryAction is the engine-facing decision returned by a RecoveryDispatch. The zero value (RecoveryRetrySameNode with no delay, no attempts left) is safe to apply.

type RecoveryActionKind

type RecoveryActionKind int

RecoveryActionKind enumerates how the engine should handle a node failure.

const (
	RecoveryRetrySameNode RecoveryActionKind = iota
	// RecoveryCompactAndRetry: the engine asks the executor to drop
	// older conversation turns first; falls back to a plain retry when
	// the executor does not implement Compactor.
	RecoveryCompactAndRetry
	// RecoveryPauseForHuman writes a synthetic interaction so the run
	// is resumable via `iterion resume --answers-file`.
	RecoveryPauseForHuman
	// RecoveryFailTerminal still produces a checkpoint (failRunWithCheckpoint),
	// just no further retries.
	RecoveryFailTerminal
)

type RecoveryDispatch

type RecoveryDispatch func(ctx context.Context, err error, priorAttempts func(ErrorCode) int) (RecoveryAction, ErrorCode)

RecoveryDispatch is the callback consulted by the engine when a node execution returns an error. The engine passes a `priorAttempts` resolver so the dispatcher can classify the error first and only then look up the per-class attempt count — avoiding a redundant double-call. Implementations classify, look up the recipe, and return the action together with the matched ErrorCode (so the engine can bucket attempt counts on runState).

Implementations live in runtime/recovery so they don't cycle back into runtime; this signature is the only contract the engine cares about.
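The contract is easiest to see in a toy dispatcher: classify the error first, then ask priorAttempts for that class only, then pick an action. The sketch below redeclares the relevant types locally so it compiles without the runtime package. The classification rule, the attempt budget of 3, and the exponential backoff are invented for illustration and are not what recovery.DefaultRecipes() does.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type ErrorCode string

const (
	ErrCodeRateLimited     ErrorCode = "RATE_LIMITED"
	ErrCodeExecutionFailed ErrorCode = "EXECUTION_FAILED"
)

type RecoveryActionKind int

const (
	RecoveryRetrySameNode RecoveryActionKind = iota
	RecoveryCompactAndRetry
	RecoveryPauseForHuman
	RecoveryFailTerminal
)

type RecoveryAction struct {
	Kind         RecoveryActionKind
	Delay        time.Duration
	AttemptsLeft int
	Reason       string
}

// errRateLimited is a hypothetical error an executor might return.
var errRateLimited = errors.New("429 too many requests")

// dispatch classifies err, then consults the per-class attempt count.
func dispatch(_ context.Context, err error, priorAttempts func(ErrorCode) int) (RecoveryAction, ErrorCode) {
	if errors.Is(err, errRateLimited) {
		const maxAttempts = 3 // illustrative budget
		n := priorAttempts(ErrCodeRateLimited)
		if n >= maxAttempts {
			return RecoveryAction{Kind: RecoveryPauseForHuman, Reason: "rate limit persists"}, ErrCodeRateLimited
		}
		return RecoveryAction{
			Kind:         RecoveryRetrySameNode,
			Delay:        time.Duration(1<<n) * time.Second, // 1s, 2s, 4s
			AttemptsLeft: maxAttempts - n - 1,
			Reason:       "rate limited",
		}, ErrCodeRateLimited
	}
	return RecoveryAction{Kind: RecoveryFailTerminal, Reason: err.Error()}, ErrCodeExecutionFailed
}

func main() {
	attempts := map[ErrorCode]int{ErrCodeRateLimited: 1}
	act, code := dispatch(context.Background(), errRateLimited, func(c ErrorCode) int { return attempts[c] })
	fmt.Println(code, act.Kind == RecoveryRetrySameNode, act.Delay, act.AttemptsLeft) // RATE_LIMITED true 2s 1
}
```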

type RuntimeError

type RuntimeError struct {
	Code    ErrorCode // machine-readable error category
	Message string    // human-readable description
	NodeID  string    // node where the error originated (may be empty)
	Hint    string    // suggested resolution for the user
	Cause   error     // underlying error (may be nil)
}

RuntimeError is a structured error carrying a machine-readable code, the node where the error occurred, and a human-friendly hint for resolution. It implements the error interface and can wrap an underlying cause.

func (*RuntimeError) Error

func (e *RuntimeError) Error() string

func (*RuntimeError) Unwrap

func (e *RuntimeError) Unwrap() error
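Because RuntimeError implements Unwrap, callers can use errors.As to recover the code, node, and hint from a wrapped error, and errors.Is still reaches the Cause. The sketch below uses a local copy of the struct. Its Error() format is an assumption, since the docs do not show it, and the hint text is illustrative.

```go
package main

import (
	"errors"
	"fmt"
)

type ErrorCode string

type RuntimeError struct {
	Code    ErrorCode
	Message string
	NodeID  string
	Hint    string
	Cause   error
}

// Error builds "CODE: message (node N): cause". The format is hypothetical.
func (e *RuntimeError) Error() string {
	msg := string(e.Code) + ": " + e.Message
	if e.NodeID != "" {
		msg += " (node " + e.NodeID + ")"
	}
	if e.Cause != nil {
		msg += ": " + e.Cause.Error()
	}
	return msg
}

// Unwrap exposes the cause to errors.Is / errors.As.
func (e *RuntimeError) Unwrap() error { return e.Cause }

var errDeadline = errors.New("deadline exceeded")

func main() {
	err := fmt.Errorf("run failed: %w", &RuntimeError{
		Code: "TIMEOUT", Message: "node timed out", NodeID: "review",
		Hint: "raise the node timeout", Cause: errDeadline,
	})
	var re *RuntimeError
	if errors.As(err, &re) {
		fmt.Println(re.Code, re.NodeID, re.Hint) // TIMEOUT review raise the node timeout
	}
	fmt.Println(errors.Is(err, errDeadline)) // true
}
```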

type SharedBudget

type SharedBudget struct {
	// contains filtered or unexported fields
}

SharedBudget tracks resource consumption across a workflow run. It is safe for concurrent use by parallel branches (first-come-first-served).

Budget enforcement is "soft": because nodes are checked before execution and recorded after, concurrent branches may slightly exceed limits when multiple nodes pass the pre-check simultaneously. This is by design — hard enforcement would require holding the lock across the entire node execution, which would serialize all parallel branches.

func (*SharedBudget) Check

func (b *SharedBudget) Check() []budgetCheckResult

Check reports the current budget status without recording usage.

func (*SharedBudget) RecordUsage

func (b *SharedBudget) RecordUsage(tokens int, costUSD float64) []budgetCheckResult

RecordUsage records resource consumption from a node execution and returns check results. tokens and costUSD may be zero if the executor does not report them.

Because budget enforcement is soft (pre-check and post-record are not atomic), concurrent branches may push usage past the limit. When overage exceeds 20% of the limit, a warning is logged to aid debugging.
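The soft-enforcement trade-off shows up clearly in a stripped-down version. The lock is held only for the pre-check and for the record, never across node execution, so two branches can both pass the check and then both record. This is a sketch with hypothetical names and a token-only limit. The real SharedBudget also tracks cost and returns unexported check results.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

type budget struct {
	mu        sync.Mutex
	maxTokens int
	used      int
}

// allow is the pre-check; it reserves nothing.
func (b *budget) allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.used < b.maxTokens
}

// record adds usage after a node ran and reports whether the limit is reached.
func (b *budget) record(tokens int) (exceeded bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.used += tokens
	// Warn when overage exceeds 20% of the limit (over/max > 1/5).
	if over := b.used - b.maxTokens; over*5 > b.maxTokens {
		log.Printf("budget overage %d tokens (%.0f%% of limit)", over, 100*float64(over)/float64(b.maxTokens))
	}
	return b.used >= b.maxTokens
}

func main() {
	b := &budget{maxTokens: 1000}
	// Two parallel branches both pass the pre-check before either records.
	okA, okB := b.allow(), b.allow()
	fmt.Println(okA, okB)                     // true true
	fmt.Println(b.record(700), b.record(700)) // false true (used=1400, 40% over: warning logged)
}
```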

Directories

Path Synopsis
recovery    Package recovery defines typed recovery recipes that decide what to do when a node fails.
