store

package
v0.3.2
Warning

This package is not in the latest version of its module.

Published: May 4, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package store implements the file-backed persistence layer for iterion runs. It manages the lifecycle of runs, their events, artifacts, and human interactions using a local filesystem layout:

runs/<run_id>/run.json
runs/<run_id>/events.jsonl
runs/<run_id>/artifacts/<node>/<version>.json
runs/<run_id>/interactions/<interaction_id>.json

Constants

const RunFormatVersion = 1

RunFormatVersion is the current version of the persisted run.json format. Bump this when making breaking changes to the Run struct.
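A loader can use RunFormatVersion to refuse files written by a newer, incompatible format. The sketch below is hypothetical (the package does not document such a check); it decodes only the format_version field and treats a missing field, which decodes as 0, as a legacy file it still accepts.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const RunFormatVersion = 1

// runHeader decodes only the version field of run.json.
type runHeader struct {
	FormatVersion int `json:"format_version"`
}

// checkFormatVersion is a hypothetical guard: it rejects run.json files
// whose format_version is newer than this build understands. A missing
// field decodes as 0 and is accepted as legacy.
func checkFormatVersion(raw []byte) (int, error) {
	var h runHeader
	if err := json.Unmarshal(raw, &h); err != nil {
		return 0, fmt.Errorf("decode run.json: %w", err)
	}
	if h.FormatVersion > RunFormatVersion {
		return h.FormatVersion, fmt.Errorf(
			"run.json format_version %d is newer than supported version %d",
			h.FormatVersion, RunFormatVersion)
	}
	return h.FormatVersion, nil
}

func main() {
	for _, raw := range []string{`{"format_version":1}`, `{"format_version":2}`} {
		v, err := checkFormatVersion([]byte(raw))
		fmt.Println(v, err)
	}
}
```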

const StoreDirName = ".iterion"

StoreDirName is the conventional directory name for an iterion run store.

Variables

This section is empty.

Functions

func GenerateRunName

func GenerateRunName(seed string) string

GenerateRunName derives a stable, human-friendly run label from an opaque seed. Same seed → same name. The output is kebab-case ASCII, path- and URL-safe: <adjective>-<noun>-<4hex>, e.g. "swift-cedar-a3f2".

The 4-char hex suffix (16 bits) lifts the namespace from ~6.4k adj-noun pairs to ~419M combinations, keeping pairwise collision risk negligible at any realistic run count.
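GenerateRunName's hashing scheme and word lists are not documented, so the following is a hypothetical reconstruction of the idea rather than the package's code. It hashes the seed with SHA-256 and uses separate digest bytes to pick an adjective, a noun, and a 16-bit hex suffix; the five-word lists are placeholders. It also estimates, via the standard birthday approximation 1 − exp(−n(n−1)/2N), how likely it is that *some* pair among n names collides in a namespace of size N, taking the documented ~6.4k pairs as exactly 6,400 (an assumption).

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math"
)

// Hypothetical placeholder word lists; the real lists are not shown.
var adjectives = []string{"swift", "quiet", "amber", "brave", "calm"}
var nouns = []string{"cedar", "falcon", "river", "harbor", "meadow"}

// runName deterministically maps a seed to <adjective>-<noun>-<4hex>
// using independent slices of the SHA-256 digest.
func runName(seed string) string {
	sum := sha256.Sum256([]byte(seed))
	adj := adjectives[binary.BigEndian.Uint32(sum[0:4])%uint32(len(adjectives))]
	noun := nouns[binary.BigEndian.Uint32(sum[4:8])%uint32(len(nouns))]
	suffix := binary.BigEndian.Uint16(sum[8:10]) // 16 bits -> 4 hex chars
	return fmt.Sprintf("%s-%s-%04x", adj, noun, suffix)
}

// collisionProb approximates the probability that at least two of n
// names collide in a namespace of size space (birthday bound).
func collisionProb(n, space float64) float64 {
	return -math.Expm1(-n * (n - 1) / (2 * space))
}

func main() {
	fmt.Println(runName("/work/flow.iter:run-1"))
	space := 6400.0 * 65536 // ~419M, assuming exactly 6,400 adjective-noun pairs
	for _, n := range []float64{1_000, 10_000, 100_000} {
		fmt.Printf("n=%d: P(any collision) ~ %.4f\n", int(n), collisionProb(n, space))
	}
}
```

Under these assumptions any two given runs share a name with probability about 1 in 419M, while the chance that some pair collides grows roughly with n²: about 0.1% at 1,000 runs and about 11% at 10,000. Since Name is display-only and ID stays the canonical identifier, a rare collision affects labels, not identity.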

func ResolveStoreDir

func ResolveStoreDir(start, override string) string

ResolveStoreDir picks the run-store directory shared by the CLI and the editor. An explicit override wins; otherwise it walks up from start looking for an existing .iterion directory (git-style discovery), and falls back to creating one alongside start.

Types

type Artifact

type Artifact struct {
	RunID     string                 `json:"run_id"`
	NodeID    string                 `json:"node_id"`
	Version   int                    `json:"version"`
	Data      map[string]interface{} `json:"data"`
	WrittenAt time.Time              `json:"written_at"`
}

Artifact is a versioned output persisted under artifacts/<node>/<version>.json.

type ArtifactVersionInfo

type ArtifactVersionInfo struct {
	Version   int
	WrittenAt time.Time
}

ArtifactVersionInfo is the lightweight (version, mtime) pair returned by ListArtifactVersions — the directory enumeration without the full body decode that LoadArtifact incurs.

type Checkpoint

type Checkpoint struct {
	NodeID             string                            `json:"node_id"`                        // the node where we paused
	InteractionID      string                            `json:"interaction_id"`                 // pending interaction ID
	Outputs            map[string]map[string]interface{} `json:"outputs"`                        // per-node outputs accumulated so far
	LoopCounters       map[string]int                    `json:"loop_counters"`                  // current loop iteration counts
	RoundRobinCounters map[string]int                    `json:"round_robin_counters,omitempty"` // round-robin router counters (keyed by router node ID)
	// LoopPreviousOutput / LoopCurrentOutput preserve the rotating snapshot
	// of source-node outputs at each loop-edge traversal so that
	// {{loop.<name>.previous_output}} resolves correctly across resume.
	// Without these, a paused/failed run would lose the prior-iteration
	// snapshot and the very next iteration would see nil.
	LoopPreviousOutput map[string]map[string]interface{} `json:"loop_previous_output,omitempty"`
	LoopCurrentOutput  map[string]map[string]interface{} `json:"loop_current_output,omitempty"`
	ArtifactVersions   map[string]int                    `json:"artifact_versions"` // next artifact version per node
	Vars               map[string]interface{}            `json:"vars"`              // resolved workflow variables
	// InteractionQuestions embeds the questions from the interaction record
	// so that resume is self-sufficient even if the interaction file is deleted.
	InteractionQuestions map[string]interface{} `json:"interaction_questions,omitempty"`
	// BackendSessionID is the session ID of a blocked backend, enabling
	// re-invocation with session: inherit on resume.
	BackendSessionID string `json:"backend_session_id,omitempty"`
	// BackendName identifies which backend was used.
	BackendName string `json:"backend_name,omitempty"`
	// BackendConversation is the opaque, backend-specific persisted
	// conversation captured at the moment of an ask_user pause. On
	// resume, the backend rehydrates from this blob (claw: []api.Message)
	// and appends a tool_result block answering BackendPendingToolUseID,
	// avoiding a stateless restart from system+user prompts.
	BackendConversation json.RawMessage `json:"backend_conversation,omitempty"`
	// BackendPendingToolUseID is the ID of the tool_use block awaiting
	// an answer in BackendConversation. Required when BackendConversation
	// is non-nil.
	BackendPendingToolUseID string `json:"backend_pending_tool_use_id,omitempty"`
	// NodeAttempts records prior failed attempts per (node_id, error_code) so
	// that resume preserves the recovery dispatcher's retry budget. Outer key
	// is the node ID, inner key is the runtime error code (string-typed).
	NodeAttempts map[string]map[string]int `json:"node_attempts,omitempty"`
}

Checkpoint captures the runtime state at a pause point (human node or backend interaction), enabling exact resume without replaying upstream nodes.

The checkpoint embedded in run.json is the authoritative source of truth for resume. Events (events.jsonl) are observational only — they are not replayed to reconstruct state. If the checkpoint is lost, recovery is not possible via event replay. The separate interaction file (interactions/<id>.json) is a convenience for tooling; InteractionQuestions is embedded here for resilience.

type Event

type Event struct {
	Seq       int64                  `json:"seq"`       // monotonic sequence within the run
	Timestamp time.Time              `json:"timestamp"` // wall-clock time
	Type      EventType              `json:"type"`
	RunID     string                 `json:"run_id"`
	BranchID  string                 `json:"branch_id,omitempty"`
	NodeID    string                 `json:"node_id,omitempty"`
	Data      map[string]interface{} `json:"data,omitempty"`
}

Event is a single timestamped fact persisted in events.jsonl. The Data field carries event-specific payload; its concrete shape depends on Type.

type EventType

type EventType string

EventType enumerates the minimum set of events to persist, per the V4 plan.

const (
	EventRunStarted           EventType = "run_started"
	EventBranchStarted        EventType = "branch_started"
	EventBranchFinished       EventType = "branch_finished"
	EventNodeStarted          EventType = "node_started"
	EventLLMRequest           EventType = "llm_request"
	EventLLMPrompt            EventType = "llm_prompt"
	EventLLMRetry             EventType = "llm_retry"
	EventNodeRecovery         EventType = "node_recovery"
	EventLLMStepFinished      EventType = "llm_step_finished"
	EventLLMCompacted         EventType = "llm_compacted"
	EventToolCalled           EventType = "tool_called"
	EventToolError            EventType = "tool_error"
	EventArtifactWritten      EventType = "artifact_written"
	EventHumanInputRequested  EventType = "human_input_requested"
	EventRunPaused            EventType = "run_paused"
	EventHumanAnswersRecorded EventType = "human_answers_recorded"
	EventRunResumed           EventType = "run_resumed"
	EventJoinReady            EventType = "join_ready"
	EventNodeFinished         EventType = "node_finished"
	EventEdgeSelected         EventType = "edge_selected"
	EventBudgetWarning        EventType = "budget_warning"
	EventBudgetExceeded       EventType = "budget_exceeded"
	EventRunFinished          EventType = "run_finished"
	EventRunFailed            EventType = "run_failed"
	EventRunCancelled         EventType = "run_cancelled"
	// EventRunInterrupted is emitted when the editor server drains in-flight
	// runs during shutdown (SIGTERM, watchexec rebuild, etc.). The companion
	// run.json status flips to failed_resumable so the next boot can offer
	// one-click resume — distinct from EventRunCancelled (user-initiated).
	EventRunInterrupted   EventType = "run_interrupted"
	EventDelegateStarted  EventType = "delegate_started"
	EventDelegateFinished EventType = "delegate_finished"
	EventDelegateError    EventType = "delegate_error"
	EventDelegateRetry    EventType = "delegate_retry"
)

type Interaction

type Interaction struct {
	ID          string                 `json:"id"`
	RunID       string                 `json:"run_id"`
	NodeID      string                 `json:"node_id"`
	RequestedAt time.Time              `json:"requested_at"`
	AnsweredAt  *time.Time             `json:"answered_at,omitempty"`
	Questions   map[string]interface{} `json:"questions,omitempty"`
	Answers     map[string]interface{} `json:"answers,omitempty"`
}

Interaction records a human pause/resume exchange.

type Run

type Run struct {
	FormatVersion int    `json:"format_version"`
	ID            string `json:"id"`
	// Name is a deterministic, human-friendly label derived from
	// (file_path + run_id) at run creation. Display-only — the
	// canonical identifier remains ID. Empty for runs persisted
	// before this field existed; surfaces should fall back to
	// WorkflowName in that case.
	Name          string                 `json:"name,omitempty"`
	WorkflowName  string                 `json:"workflow_name"`
	WorkflowHash  string                 `json:"workflow_hash,omitempty"` // SHA-256 of the .iter source at run start
	FilePath      string                 `json:"file_path,omitempty"`     // absolute .iter source path captured at launch (resume without re-supplying file)
	Status        RunStatus              `json:"status"`
	Inputs        map[string]interface{} `json:"inputs,omitempty"`
	CreatedAt     time.Time              `json:"created_at"`
	UpdatedAt     time.Time              `json:"updated_at"`
	FinishedAt    *time.Time             `json:"finished_at,omitempty"`
	Error         string                 `json:"error,omitempty"`
	Checkpoint    *Checkpoint            `json:"checkpoint,omitempty"`
	ArtifactIndex map[string]int         `json:"artifact_index,omitempty"` // node_id → latest version written
	// WorkDir is the absolute filesystem path the run executes in
	// (the per-run git worktree when Worktree is true, otherwise the
	// engine's resolved cwd at start). Persisted so editor surfaces
	// (e.g. modified-files panel) can locate the run's working tree
	// without re-deriving it from the runtime.
	WorkDir string `json:"work_dir,omitempty"`
	// Worktree is true when WorkDir was created by `worktree: auto`,
	// false when WorkDir is the inherited cwd.
	Worktree bool `json:"worktree,omitempty"`
	// RepoRoot is the absolute path of the main git repository the
	// worktree was forked from. Used by the editor's modified-files
	// panel after the worktree directory is gc'd to compute the diff
	// against FinalCommit (the persistent branch lives in this repo's
	// shared .git). Empty for non-worktree runs.
	RepoRoot string `json:"repo_root,omitempty"`
	// BaseCommit is the SHA of HEAD on the main repo at the moment the
	// worktree was created — i.e. the run's baseline. The post-finalization
	// diff renders FinalCommit relative to this commit. Empty for non-
	// worktree runs and for legacy runs that predate this field.
	BaseCommit string `json:"base_commit,omitempty"`
	// FinalCommit is the SHA the worktree's HEAD pointed to when the
	// run finished successfully, captured before the worktree was torn
	// down. Empty when the run made no commits, didn't use a worktree,
	// or didn't finish.
	FinalCommit string `json:"final_commit,omitempty"`
	// FinalBranch is the persistent branch name created on
	// FinalCommit (default "iterion/run/<friendly-name>", overridable
	// via launch params). Acts as a GC guard so the commits remain
	// reachable after the worktree directory is removed.
	FinalBranch string `json:"final_branch,omitempty"`
	// MergedInto is the branch the engine fast-forwarded to FinalCommit
	// after the run, or empty when the FF was skipped (dirty main,
	// non-FF, branch divergence, opt-out, or detached HEAD at start).
	MergedInto string `json:"merged_into,omitempty"`
}

Run is the top-level metadata for a single workflow invocation.

type RunLock

type RunLock interface {
	// Unlock releases the lock. Must be called when done.
	Unlock() error
}

RunLock represents an exclusive advisory lock on a run directory. The lock prevents concurrent processes from modifying the same run.

type RunStatus

type RunStatus string

RunStatus represents the current state of a run.

const (
	RunStatusRunning            RunStatus = "running"
	RunStatusPausedWaitingHuman RunStatus = "paused_waiting_human"
	RunStatusFinished           RunStatus = "finished"
	RunStatusFailed             RunStatus = "failed"
	RunStatusFailedResumable    RunStatus = "failed_resumable"
	RunStatusCancelled          RunStatus = "cancelled"
)
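Callers such as a resume command typically need to classify statuses. The helpers below are hypothetical: they treat paused_waiting_human and failed_resumable as resumable, because PauseRun and FailRunResumable are the documented paths that leave a checkpoint behind, and they assume finished, failed, and cancelled are terminal.

```go
package main

import "fmt"

type RunStatus string

const (
	RunStatusRunning            RunStatus = "running"
	RunStatusPausedWaitingHuman RunStatus = "paused_waiting_human"
	RunStatusFinished           RunStatus = "finished"
	RunStatusFailed             RunStatus = "failed"
	RunStatusFailedResumable    RunStatus = "failed_resumable"
	RunStatusCancelled          RunStatus = "cancelled"
)

// canResume is a hypothetical helper: true for the statuses written by
// PauseRun and FailRunResumable, which persist a checkpoint.
func canResume(s RunStatus) bool {
	switch s {
	case RunStatusPausedWaitingHuman, RunStatusFailedResumable:
		return true
	}
	return false
}

// isTerminal is a hypothetical helper for statuses assumed final.
func isTerminal(s RunStatus) bool {
	switch s {
	case RunStatusFinished, RunStatusFailed, RunStatusCancelled:
		return true
	}
	return false
}

func main() {
	for _, s := range []RunStatus{RunStatusRunning, RunStatusPausedWaitingHuman, RunStatusFailed} {
		fmt.Printf("%-22s resume=%v terminal=%v\n", s, canResume(s), isTerminal(s))
	}
}
```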

type RunStore

type RunStore struct {
	// contains filtered or unexported fields
}

RunStore manages the on-disk layout:

<root>/runs/<run_id>/run.json
<root>/runs/<run_id>/events.jsonl
<root>/runs/<run_id>/artifacts/<node>/<version>.json
<root>/runs/<run_id>/interactions/<interaction_id>.json

func New

func New(root string, opts ...StoreOption) (*RunStore, error)

New creates a RunStore rooted at the given directory. The directory is created if it does not exist.

func (*RunStore) AppendEvent

func (s *RunStore) AppendEvent(runID string, evt Event) (*Event, error)

AppendEvent appends an event to the run's events.jsonl and sets Seq and Timestamp automatically. The whole operation runs under the store's internal mutex (mu), so concurrent branches cannot interleave their writes. The sequence counter is incremented only after a successful write, so a failed append leaves no gap in the event stream.

func (*RunStore) CreateRun

func (s *RunStore) CreateRun(id, workflowName string, inputs map[string]interface{}) (*Run, error)

CreateRun persists a new run with status "running".

func (*RunStore) FailRunResumable

func (s *RunStore) FailRunResumable(id string, cp *Checkpoint, runErr string) error

FailRunResumable atomically sets the checkpoint, error message, and status to failed_resumable in a single write, enabling resume from the last successfully completed node.

func (*RunStore) ListArtifactVersions

func (s *RunStore) ListArtifactVersions(runID, nodeID string) ([]ArtifactVersionInfo, error)

ListArtifactVersions enumerates the persisted artifact versions for a node in ascending order, returning each version's mtime without decoding the body. Returns (nil, nil) when the node has no artifact directory (a node that hasn't published anything yet).

func (*RunStore) ListInteractions

func (s *RunStore) ListInteractions(runID string) ([]string, error)

ListInteractions returns all interaction IDs for a run.

runID is sanitised before path-joining (see LoadRun for rationale).

func (*RunStore) ListRuns

func (s *RunStore) ListRuns() ([]string, error)

ListRuns returns the IDs of all persisted runs.

func (*RunStore) LoadArtifact

func (s *RunStore) LoadArtifact(runID, nodeID string, version int) (*Artifact, error)

LoadArtifact reads a specific artifact version.

func (*RunStore) LoadEvents

func (s *RunStore) LoadEvents(runID string) ([]*Event, error)

LoadEvents reads all events for a run in sequence order.

runID is sanitised before path-joining (see LoadRun for rationale).

func (*RunStore) LoadEventsRange

func (s *RunStore) LoadEventsRange(runID string, from, to int64, limit int) ([]*Event, error)

LoadEventsRange streams events whose seq lies in [from, to) (to == 0 means "no upper bound") and caps the returned slice at limit entries (limit == 0 means "no cap"). It is designed for paginating long events.jsonl files without loading the whole file: against a 200 MB events.jsonl, limit=5000 returns at most 5000 entries instead of materialising every event in memory only to keep the first page.

A caller can detect that more events may be available by passing a non-zero limit and checking whether len(out) == limit; if so, the next page starts at from = out[len(out)-1].Seq+1.
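The range and pagination contract can be modelled in memory. The sketch below is hypothetical: eventsRange reimplements the documented [from, to) and limit semantics over a slice (assuming events appear in ascending seq order, as in the file), and paginate applies the "len(out) == limit" rule to walk every page. When the final page is exactly full, one extra empty request is needed to confirm the end.

```go
package main

import "fmt"

type Event struct{ Seq int64 }

// eventsRange is a hypothetical in-memory model of LoadEventsRange:
// seq in [from, to), to == 0 means unbounded, limit == 0 means uncapped.
func eventsRange(all []*Event, from, to int64, limit int) []*Event {
	var out []*Event
	for _, e := range all {
		if e.Seq < from || (to != 0 && e.Seq >= to) {
			continue
		}
		out = append(out, e)
		if limit != 0 && len(out) == limit {
			break
		}
	}
	return out
}

// paginate walks all events page by page; a full page means "maybe more".
func paginate(all []*Event, limit int) [][]*Event {
	if limit <= 0 {
		return [][]*Event{eventsRange(all, 0, 0, 0)} // uncapped: one page
	}
	var pages [][]*Event
	var from int64
	for {
		page := eventsRange(all, from, 0, limit)
		if len(page) > 0 {
			pages = append(pages, page)
		}
		if len(page) < limit {
			return pages
		}
		from = page[len(page)-1].Seq + 1
	}
}

func main() {
	var all []*Event
	for seq := int64(1); seq <= 7; seq++ {
		all = append(all, &Event{Seq: seq})
	}
	for i, page := range paginate(all, 3) {
		fmt.Printf("page %d: seq %d..%d\n", i, page[0].Seq, page[len(page)-1].Seq)
	}
}
```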

func (*RunStore) LoadInteraction

func (s *RunStore) LoadInteraction(runID, interactionID string) (*Interaction, error)

LoadInteraction reads a specific interaction by ID.

func (*RunStore) LoadLatestArtifact

func (s *RunStore) LoadLatestArtifact(runID, nodeID string) (*Artifact, error)

LoadLatestArtifact returns the artifact with the highest version for a node. It first checks the run's artifact index for an O(1) lookup and falls back to a directory scan for backward compatibility with older run formats.

func (*RunStore) LoadRun

func (s *RunStore) LoadRun(id string) (*Run, error)

LoadRun reads run.json for the given run ID.

The run ID is sanitised before path-joining so a hostile or network-sourced ID cannot escape the store root. The write side (CreateRun/WriteArtifact/WriteInteraction) already sanitises its inputs; the read paths must do the same so the defence is symmetric.

As a one-shot migration step, a legacy run with empty Name gets a deterministic friendly label generated and persisted on read. After the first call the field is on disk; subsequent LoadRuns skip the fixup. The seed mirrors the CLI/launch path (file_path:run_id) so the backfill produces the exact name a new launch would have produced.

func (*RunStore) LockRun

func (s *RunStore) LockRun(runID string) (RunLock, error)

LockRun acquires an exclusive file-based lock for the given run. On Unix it uses flock(2), whose lock the kernel releases automatically if the process crashes. On Windows it uses a lockfile with PID-based stale detection.

The lock is advisory: it protects against concurrent iterion processes sharing the same store directory. It does not replace the internal sync.Mutex which handles intra-process concurrency.

Limitations: does not work over NFS (flock is local-only on Linux).

func (*RunStore) PIDFilePath

func (s *RunStore) PIDFilePath(runID string) string

PIDFilePath returns the canonical path to the .pid file for runID. The file may or may not exist.

func (*RunStore) PauseRun

func (s *RunStore) PauseRun(id string, cp *Checkpoint) error

PauseRun sets the checkpoint and changes the status to paused in a single atomic write. Doing both in one write avoids the inconsistent state that two separate writes could leave behind if the second one failed.

func (*RunStore) ReadPIDFile

func (s *RunStore) ReadPIDFile(runID string) (int, error)

ReadPIDFile returns the PID recorded for runID, or (0, nil) if no .pid file exists. A malformed file returns (0, err) so callers can decide whether to clean it up.

func (*RunStore) RemovePIDFile

func (s *RunStore) RemovePIDFile(runID string) error

RemovePIDFile deletes the run's .pid file. Idempotent — missing files are not errors. Called by the runner subprocess on exit and by the reconciler when it detects a dead PID.

func (*RunStore) Root

func (s *RunStore) Root() string

Root returns the store root directory.

func (*RunStore) SaveCheckpoint

func (s *RunStore) SaveCheckpoint(id string, cp *Checkpoint) error

SaveCheckpoint persists a checkpoint on a paused run. Protected by mu to prevent concurrent read-modify-write races.

func (*RunStore) SaveRun

func (s *RunStore) SaveRun(r *Run) error

SaveRun persists the run metadata to disk.

func (*RunStore) ScanEvents

func (s *RunStore) ScanEvents(runID string, visit func(*Event) bool) error

ScanEvents streams events for a run through visit, in file order, and stops as soon as visit returns false. It allocates one *Event per scanned line (decoded into a fresh struct) so the caller can retain references freely, but it never materialises the full events.jsonl slice — callers searching for a single match (e.g. node-touched filter) or paginating a window can short-circuit without paying the O(file) memory of LoadEvents.

Errors decoding a single line are skipped (consistent with LoadEvents). The returned error reflects file-open / scanner-buffer failures, not per-line parse errors.

runID is sanitised before path-joining (see LoadRun for rationale).

func (*RunStore) UpdateRunStatus

func (s *RunStore) UpdateRunStatus(id string, status RunStatus, runErr string) error

UpdateRunStatus updates the status (and optional error) of a run. Protected by mu to prevent concurrent read-modify-write races.

func (*RunStore) WriteArtifact

func (s *RunStore) WriteArtifact(a *Artifact) error

WriteArtifact persists an artifact for a node at the given version and updates the run's artifact index for O(1) latest-version lookups.

func (*RunStore) WriteInteraction

func (s *RunStore) WriteInteraction(i *Interaction) error

WriteInteraction persists a human interaction.

func (*RunStore) WritePIDFile

func (s *RunStore) WritePIDFile(runID string, pid int) error

WritePIDFile writes pid to the run's .pid file. The directory is created if it doesn't exist. Writes are atomic via writeFileAtomic (tmp + fsync + rename) so a crashed writer cannot leave a half-written .pid that would confuse the reconciler.

type StoreOption

type StoreOption func(*RunStore)

StoreOption configures a RunStore.

func WithLogger

func WithLogger(l *iterlog.Logger) StoreOption

WithLogger sets a leveled logger on the store.