runview

package
v0.3.2
Published: May 4, 2026 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Package runview exposes a service-layer view of iterion runs for programmatic consumers — the HTTP server and the future "run console" UI. It contains the canonical Launch / Resume / Cancel / Snapshot implementations that the CLI also delegates to, along with a pure reducer that derives a per-execution snapshot from the persisted event stream.

Index

Constants

const IRWorkflowEndpointPath = "/api/runs/{id}/workflow"

IRWorkflowEndpointPath is exposed for symmetry with other runview helpers; the server wires the handler manually.

const MainBranch = "main"

MainBranch is the synthetic branch name used when an event carries no explicit branch_id (single-threaded execution before any fan-out).

const MaxEventsPerPage = 5000

MaxEventsPerPage caps the number of events any single LoadEvents response materialises. A 200MB events.jsonl from a long-running run with hundreds of LLM I/O events would otherwise allocate the full file into memory on every reconnect / scrubber drag — exhausting memory in typical devcontainers. Callers paginate by passing the next page's `from` as previous_last.Seq+1; len(out) == cap means "more available".
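The pagination recipe can be sketched as a standalone loop. Everything here is illustrative: `event` stands in for store.Event, and `loadEvents` simulates the page-capped loader over an in-memory slice rather than events.jsonl.

```go
package main

import "fmt"

// event stands in for store.Event; only Seq matters for pagination.
type event struct{ Seq int64 }

const maxEventsPerPage = 3 // tiny cap for illustration; the package uses 5000

// loadEvents simulates a page-capped LoadEvents: it returns events with
// Seq >= from, at most maxEventsPerPage of them.
func loadEvents(all []event, from int64) []event {
	out := make([]event, 0, maxEventsPerPage)
	for _, e := range all {
		if e.Seq >= from && len(out) < maxEventsPerPage {
			out = append(out, e)
		}
	}
	return out
}

// collectAll pages through the stream using the documented recipe: the
// next page's `from` is previous_last.Seq+1, and a full page (len == cap)
// means more may be available.
func collectAll(all []event) []event {
	var got []event
	from := int64(0)
	for {
		page := loadEvents(all, from)
		got = append(got, page...)
		if len(page) < maxEventsPerPage {
			return got // short page: stream exhausted
		}
		from = page[len(page)-1].Seq + 1
	}
}

func main() {
	stream := []event{{0}, {1}, {2}, {3}, {4}, {5}, {6}}
	fmt.Println(len(collectAll(stream))) // prints 7
}
```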

const NoEventsSeq int64 = -1

NoEventsSeq is the sentinel value of RunSnapshot.LastSeq when no events have been applied yet. Distinguishing "empty stream" from "one event at seq 0" matters for WS catch-up dedup: we must not drop the first live event after subscribing to a fresh run.

Variables

var ErrRunNotActive = errors.New("runview: run is not active in this process")

ErrRunNotActive is returned when a manager operation references a run ID that has no in-process handle (either it was never launched in this process or it has already terminated).

Functions

func BuildExecutor

func BuildExecutor(spec ExecutorSpec) (*model.ClawExecutor, error)

BuildExecutor wires up the default ClawExecutor: registry, default delegate registry, store-backed event hooks (chained with spec.ExtraHooks), tool registry with claw built-ins, MCP catalog (when the workflow declares servers), and the per-run plan-mode state directory. Used by both the CLI and the HTTP service so the two transports stay aligned on tool policies, MCP auth, and executor lifecycle.

func CompileWorkflow

func CompileWorkflow(path string) (*ir.Workflow, error)

CompileWorkflow parses and compiles a .iter file at path. It returns the compiled workflow or an error with the first parse / compile diagnostic encountered.

MCP server resolution is finalised against the file's directory so relative `command` paths in `mcp_server` blocks resolve correctly.

func CompileWorkflowWithHash

func CompileWorkflowWithHash(path string) (*ir.Workflow, string, error)

CompileWorkflowWithHash is CompileWorkflow plus a SHA-256 hash of the raw source bytes. The hash is persisted in run.json so that resume can detect when the .iter file has changed under it (and require --force to proceed). Use this everywhere a workflow is loaded for execution; CompileWorkflow is for static-only callers (validate, diagram).

func MCPHealthCheck

func MCPHealthCheck(ctx context.Context, executor runtime.NodeExecutor, servers []string) error

MCPHealthCheck runs the executor's optional MCP health-check implementation. The `iterion run` and `iterion resume` paths invoke this just before eng.Run / eng.Resume so a misconfigured catalog surfaces an error before any node is dispatched.

func MakeExecutionID

func MakeExecutionID(branch, nodeID string, iteration int) string

MakeExecutionID composes a stable ID from (branch, node, iteration). The format is documented in the WS protocol; clients depend on it for tab/anchor URLs and for matching events to executions. Empty branch is normalised to MainBranch.

func ParseExecutionID

func ParseExecutionID(id string) (branch, nodeID string, iteration int, err error)

ParseExecutionID is the inverse of MakeExecutionID. It returns the branch, node ID, and iteration. Returns an error if the input is not a well-formed exec ID.
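The roundtrip contract can be illustrated with a standalone sketch. The concrete wire format is defined by the WS protocol and is not reproduced here; the colon-separated layout below is a made-up stand-in, chosen only to show the inverse relationship and the MainBranch normalisation.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const mainBranch = "main"

// makeExecID composes branch:node:iteration. This separator choice is
// purely illustrative; the real format lives in the WS protocol docs.
func makeExecID(branch, nodeID string, iteration int) string {
	if branch == "" {
		branch = mainBranch // documented normalisation of the empty branch
	}
	return branch + ":" + nodeID + ":" + strconv.Itoa(iteration)
}

// parseExecID inverts makeExecID, rejecting malformed input.
func parseExecID(id string) (branch, nodeID string, iteration int, err error) {
	parts := strings.Split(id, ":")
	if len(parts) != 3 {
		return "", "", 0, fmt.Errorf("malformed exec ID %q", id)
	}
	iteration, err = strconv.Atoi(parts[2])
	if err != nil {
		return "", "", 0, fmt.Errorf("malformed iteration in %q", id)
	}
	return parts[0], parts[1], iteration, nil
}

func main() {
	id := makeExecID("", "review", 2)
	b, n, i, err := parseExecID(id)
	fmt.Println(id, b, n, i, err) // main:review:2 main review 2 <nil>
}
```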

Types

type ArtifactSummary

type ArtifactSummary struct {
	Version   int       `json:"version"`
	WrittenAt time.Time `json:"written_at"`
}

ArtifactSummary is the lightweight shape returned by ListArtifacts — just enough for the UI to populate a version selector without reading every artifact body.

type EventBroker

type EventBroker struct {
	// contains filtered or unexported fields
}

EventBroker is the in-process fan-out for persistent run events. The runtime publishes via runtime.WithEventObserver(broker.Publish); WS handlers subscribe via broker.Subscribe(runID).

The broker does NOT read from disk. Callers that need historical events (catch-up replay) should LoadEvents(runID) first, then drain the live channel and dedup by seq — see Service.Subscribe for the canonical recipe.
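The catch-up-then-dedup recipe can be sketched with hypothetical local types (the canonical implementation lives in Service.Subscribe; `event` here stands in for store.Event):

```go
package main

import "fmt"

type event struct{ Seq int64 }

// mergeLive applies the documented recipe: take the historical page
// first, then accept live events only when their seq advances past
// everything already seen. lastSeq starts at -1 (the NoEventsSeq
// sentinel) so the first live event at seq 0 on a fresh run is kept.
func mergeLive(history []event, live <-chan event) []event {
	out := append([]event(nil), history...)
	lastSeq := int64(-1)
	if n := len(history); n > 0 {
		lastSeq = history[n-1].Seq
	}
	for evt := range live {
		if evt.Seq <= lastSeq {
			continue // duplicate delivered during the catch-up window
		}
		lastSeq = evt.Seq
		out = append(out, evt)
	}
	return out
}

func main() {
	live := make(chan event, 4)
	for _, s := range []int64{1, 2, 3} {
		live <- event{s}
	}
	close(live)
	merged := mergeLive([]event{{0}, {1}}, live)
	fmt.Println(len(merged)) // history {0,1} + new live {2,3}: 4 events
}
```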

func NewEventBroker

func NewEventBroker() *EventBroker

NewEventBroker creates an empty broker.

func (*EventBroker) CloseRun

func (b *EventBroker) CloseRun(runID string)

CloseRun terminates every subscription for runID. Used at run completion to signal that no further events will arrive on this stream. After CloseRun the broker forgets about the run; new Subscribe calls would receive a fresh empty subscription that never fires (callers should use cold reads for terminated runs).

func (*EventBroker) Publish

func (b *EventBroker) Publish(evt store.Event)

Publish fans out an event to every active subscriber for evt.RunID. Safe for concurrent use. This is the function passed to runtime.WithEventObserver. Slow subscribers are dropped lossily — the publisher never blocks (see subscriberBufferSize for the rationale).

The read lock is held through the entire fan-out, including the non-blocking channel sends. This serialises against Cancel and CloseRun (which take the write lock to close subscriber channels), preventing the "send on closed channel" race that would otherwise occur if Cancel ran between Publish's slice copy and its send. Because every send is wrapped in a `select { default }`, the read lock is held only briefly even under buffer pressure.

Publish takes a value to match the runtime.WithEventObserver signature; we copy the pointer-shaped fields on the way out so downstream consumers receive a stable snapshot of the event even if the runtime mutated its local copy after emit.
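The non-blocking fan-out described above boils down to a `select` with a `default` case per subscriber. A minimal standalone sketch of that pattern (the `subscriber` type and counters are illustrative, not the package's internals):

```go
package main

import "fmt"

// subscriber pairs a buffered channel with a drop counter, mirroring
// the lossy-send behaviour described above.
type subscriber struct {
	ch    chan int
	drops int
}

// publish fans out v without ever blocking: a full buffer counts as a
// drop instead of stalling the publisher.
func publish(subs []*subscriber, v int) {
	for _, s := range subs {
		select {
		case s.ch <- v:
		default:
			s.drops++ // buffer full: drop lossily
		}
	}
}

func main() {
	fast := &subscriber{ch: make(chan int, 4)}
	slow := &subscriber{ch: make(chan int, 1)}
	for v := 0; v < 3; v++ {
		publish([]*subscriber{fast, slow}, v)
	}
	fmt.Println(len(fast.ch), len(slow.ch), slow.drops) // 3 1 2
}
```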

func (*EventBroker) Subscribe

func (b *EventBroker) Subscribe(runID string) *EventSubscription

Subscribe registers a new subscriber for runID and returns its handle. The subscription only delivers events received AFTER this call returns; for catch-up + live replay use Service.Subscribe.

func (*EventBroker) SubscriberCount

func (b *EventBroker) SubscriberCount(runID string) int

SubscriberCount returns the live subscriber count for runID. Useful for tests and metrics.

type EventSubscription

type EventSubscription struct {
	C <-chan *store.Event
	// contains filtered or unexported fields
}

EventSubscription is the public handle returned to callers. Receive from C, call Cancel to unsubscribe (idempotent).

func (*EventSubscription) Cancel

func (s *EventSubscription) Cancel()

Cancel unregisters the subscription and closes the channel. Idempotent — safe to call multiple times.

func (*EventSubscription) Drops

func (s *EventSubscription) Drops() int

Drops reports the number of events that were dropped because this subscriber's buffer was full. The transport layer can surface this to the client so the UI knows to re-subscribe with from_seq.

type ExecStatus

type ExecStatus string

ExecStatus is the lifecycle state of a single execution (one branch × one loop iteration of an IR node).

const (
	ExecStatusRunning  ExecStatus = "running"
	ExecStatusFinished ExecStatus = "finished"
	ExecStatusFailed   ExecStatus = "failed"
	ExecStatusPaused   ExecStatus = "paused_waiting_human"
	ExecStatusSkipped  ExecStatus = "skipped"
)

type ExecutionState

type ExecutionState struct {
	ExecutionID         string     `json:"execution_id"`
	IRNodeID            string     `json:"ir_node_id"`
	BranchID            string     `json:"branch_id"`
	LoopIteration       int        `json:"loop_iteration"`
	Status              ExecStatus `json:"status"`
	Kind                string     `json:"kind,omitempty"` // node kind (Agent / Judge / Router / ...)
	StartedAt           *time.Time `json:"started_at,omitempty"`
	FinishedAt          *time.Time `json:"finished_at,omitempty"`
	LastArtifactVersion *int       `json:"last_artifact_version,omitempty"`
	CurrentEventSeq     int64      `json:"current_event_seq"`
	Error               string     `json:"error,omitempty"`
	// FirstSeq / LastSeq mark the persisted event range that produced
	// this execution row, allowing clients to scrub directly to the
	// segment of events.jsonl describing this execution.
	FirstSeq int64 `json:"first_seq"`
	LastSeq  int64 `json:"last_seq"`
}

ExecutionState is one rendered row in the dynamic execution graph: a concrete invocation of an IR node within a specific branch and loop iteration. The same IR node may appear N times across branches and loop iterations — each gets its own ExecutionState with a distinct ExecutionID.

type ExecutorSpec

type ExecutorSpec struct {
	Workflow *ir.Workflow
	Vars     map[string]string
	Store    model.EventEmitter // typically *store.RunStore
	RunID    string
	Logger   *iterlog.Logger
	StoreDir string
	// ExtraHooks are merged into the store-backed event hooks. Pass
	// the prometheus exporter's hooks here (cli does this); the HTTP
	// service can pass nothing or a future broker-side hook chain.
	ExtraHooks []model.EventHooks
}

ExecutorSpec carries the inputs required to construct a default ClawExecutor. Splitting the args into a struct keeps cli/run.go and the HTTP service layer in sync as new options accrue (compactor callbacks, recipe overrides, etc.).

type HandleSnapshot

type HandleSnapshot struct {
	RunID  string
	Cancel context.CancelFunc
	Done   <-chan struct{}
	PID    int // 0 for in-process; >0 for detached subprocess
}

HandleSnapshot is one row in the Snapshot view: the run ID plus the in-memory primitives Drain needs (cancel + done) and the optional PID so callers can distinguish in-process from detached runners.

type LaunchResult

type LaunchResult struct {
	RunID string
	// Done is closed when the run goroutine exits (success or
	// failure). Callers that want to wait can `<-result.Done`.
	Done <-chan struct{}
}

LaunchResult is returned by Launch on success.

type LaunchSpec

type LaunchSpec struct {
	FilePath string            // absolute .iter path; sandbox check is the caller's job
	Vars     map[string]string // --var-style overrides
	RunID    string            // optional explicit ID; auto-generated when empty
	Timeout  time.Duration     // 0 disables
	// MergeInto controls the worktree-finalization fast-forward target
	// for `worktree: auto` runs. "" or "current" → FF the user's
	// currently-checked-out branch (default); "none" → skip FF;
	// <branch-name> → FF that branch (only honoured when it matches
	// the currently-checked-out branch).
	MergeInto string
	// BranchName overrides the default storage branch
	// `iterion/run/<friendly>` created on the worktree's HEAD.
	BranchName string
}

LaunchSpec describes a workflow invocation. Mirrors the inputs of `iterion run` but framed as data so HTTP handlers (and any future programmatic caller) construct it without going through cobra flags.

type ListFilter

type ListFilter struct {
	Status   store.RunStatus // exact match
	Workflow string          // exact match on WorkflowName
	Since    time.Time       // UpdatedAt >= Since
	Limit    int             // 0 = no limit
	// Node filters runs to those whose persisted events include at
	// least one node_started for this IR node ID. Used by the editor
	// to surface "this node was touched by N runs" without scanning
	// every run on the client. Scanning happens at request time —
	// fine for hundreds of runs; wire an inverted index later if the
	// store grows past low thousands.
	Node string
}

ListFilter scopes a List request. Empty fields mean no filter.

type LogSlice

type LogSlice struct {
	BestEffort bool      `json:"best_effort"`
	StartTime  time.Time `json:"start_time"`
	EndTime    time.Time `json:"end_time"`
	StartByte  int64     `json:"start_byte,omitempty"`
	EndByte    int64     `json:"end_byte,omitempty"`
	Truncated  bool      `json:"truncated,omitempty"`
	Notes      []string  `json:"notes,omitempty"`
	Body       string    `json:"body,omitempty"`
}

LogSlice is a best-effort timestamp-windowed slice of run.log for a single ExecutionState. The free-form log format the iterion logger writes (HH:MM:SS in the host's local TZ + emoji + free text) carries no node ID, so the slice is derived from the execution's [StartedAt, FinishedAt] range. Multi-branch concurrent runs may interleave lines from sibling executions in this window — consumers should treat the slice as a hint, not ground truth.

func BuildLogSlice

func BuildLogSlice(storeDir, runID string, exec *ExecutionState, tail int) *LogSlice

BuildLogSlice extracts the timestamp-windowed slice of run.log matching exec. The window is [exec.StartedAt, exec.FinishedAt]; if FinishedAt is nil, the upper bound is open (read to EOF).

The log timestamp prefix is interpreted in time.Local (matching what the iterion logger writes via time.Now().Format("15:04:05")), so the reference date for each line is the local-TZ calendar date of the execution's start. We track day rollovers within the file by detecting backwards jumps in HH:MM:SS (e.g. 23:59:59 → 00:00:00) and incrementing the working date.

When tail > 0, the matched region is kept in a rolling tail buffer (last tail lines) instead of accumulating the full window in memory and trimming at the end. Truncation is reported in the result.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns the lifecycle of in-process workflow goroutines. A run is "active" between Register and Deregister; Cancel signals it to stop; Stop drains every active run on server shutdown.

func NewManager

func NewManager() *Manager

NewManager creates an empty manager.

func (*Manager) Active

func (m *Manager) Active(runID string) bool

Active reports whether a handle exists for runID.

func (*Manager) ActiveRuns

func (m *Manager) ActiveRuns() []string

ActiveRuns returns the IDs of every run currently held by the manager. Order is undefined.

func (*Manager) Cancel

func (m *Manager) Cancel(runID string) error

Cancel signals the engine goroutine for runID to stop. The goroutine observes ctx.Done() and translates it into a checkpoint + RunCancelled event. Returns ErrRunNotActive if no handle exists.

func (*Manager) Deregister

func (m *Manager) Deregister(runID string)

Deregister removes the handle and closes its done channel. Called by the goroutine on its way out, regardless of success/failure. Idempotent.

func (*Manager) Register

func (m *Manager) Register(parent context.Context, runID string) (context.Context, error)

Register installs a new run handle and returns the cancellable ctx the engine goroutine should use. Register MUST be called before spawning the goroutine — otherwise an immediate Cancel could miss the registration and the run would be uncancellable.

The returned ctx inherits from parent so any parent cancellation (e.g. server shutdown) propagates as well.

Returns an error if the manager has been Stop'd or a handle is already registered for runID (defensive — Service.Launch generates IDs that should be unique).

func (*Manager) RegisterDetached

func (m *Manager) RegisterDetached(runID string, pid int, cancel context.CancelFunc, done chan struct{}) error

RegisterDetached installs a handle for a runner running as a detached subprocess (PID > 0). Cancel is the closure that the caller wants invoked when Manager.Cancel(runID) is called — typically `func() { syscall.Kill(-pid, syscall.SIGTERM) }`. The caller is responsible for closing done when the runner exits.

Unlike Register, this method does NOT create a context — detached runners own their own context inside the spawned process, so the server-side handle has no ctx to propagate.

func (*Manager) Snapshot

func (m *Manager) Snapshot() []HandleSnapshot

Snapshot returns a point-in-time copy of every active handle. Drain uses this to issue cancel + wait without holding the manager's lock across the wait (which would deadlock with concurrent Deregister).

func (*Manager) Stop

func (m *Manager) Stop(ctx context.Context)

Stop cancels every active run and waits for them to drain. Used during server shutdown — we want every goroutine to reach its failRunWithCheckpoint path so the on-disk checkpoint is preserved for resume. After ctx expires, any still-running goroutine is forcibly forgotten (the goroutine itself keeps running but the manager drops its handle); callers should accept that this drops a small amount of in-flight progress in favour of bounded shutdown latency.

func (*Manager) Wait

func (m *Manager) Wait(ctx context.Context, runID string) error

Wait blocks until the goroutine for runID completes, or until ctx is done. Returns ErrRunNotActive immediately if no handle exists.

type ResumeSpec

type ResumeSpec struct {
	RunID    string
	FilePath string                 // .iter file (loaded fresh; must match the run's WorkflowHash unless Force)
	Answers  map[string]interface{} // answers for human nodes; ignored for failed_resumable
	Force    bool                   // skip workflow hash check
	Timeout  time.Duration          // 0 disables
}

ResumeSpec describes a resume request.

type RunHeader

type RunHeader struct {
	ID string `json:"id"`
	// Name is the deterministic, human-friendly label for the run.
	// Empty for legacy runs persisted before this field existed.
	Name         string                 `json:"name,omitempty"`
	WorkflowName string                 `json:"workflow_name"`
	WorkflowHash string                 `json:"workflow_hash,omitempty"`
	FilePath     string                 `json:"file_path,omitempty"`
	Status       store.RunStatus        `json:"status"`
	Inputs       map[string]interface{} `json:"inputs,omitempty"`
	CreatedAt    time.Time              `json:"created_at"`
	UpdatedAt    time.Time              `json:"updated_at"`
	FinishedAt   *time.Time             `json:"finished_at,omitempty"`
	Error        string                 `json:"error,omitempty"`
	Checkpoint   *store.Checkpoint      `json:"checkpoint,omitempty"`
	// WorkDir is the absolute filesystem path the run executed in
	// (per-run worktree when Worktree is true, otherwise inherited cwd).
	// Empty for runs created before this field was persisted; the editor
	// hides the modified-files panel in that case.
	WorkDir string `json:"work_dir,omitempty"`
	// Worktree is true when WorkDir was created by `worktree: auto`.
	Worktree bool `json:"worktree,omitempty"`
	// Worktree finalization summary (only populated for `worktree:
	// auto` runs that reached a clean exit). The editor uses these to
	// surface the persistent branch and FF status in the run header.
	FinalCommit string `json:"final_commit,omitempty"`
	FinalBranch string `json:"final_branch,omitempty"`
	MergedInto  string `json:"merged_into,omitempty"`
}

RunHeader is the run-level metadata embedded in a snapshot.

type RunLogBuffer

type RunLogBuffer struct {
	// contains filtered or unexported fields
}

RunLogBuffer captures per-run logger output: io.Writer for io.MultiWriter composition, a bounded ring for catch-up replay, fan-out to live subscribers, and an optional file tee for post-mortem inspection. Thread-safe.

func NewRunLogBuffer

func NewRunLogBuffer(filePath string) (*RunLogBuffer, error)

NewRunLogBuffer creates an empty buffer. Pass filePath to also persist writes to disk; pass "" to keep the buffer purely in-memory. The returned buffer is always usable; the error return is non-nil when the optional disk persistence couldn't be set up, in which case callers should log a warning and proceed (the in-memory ring is the source of truth for WS subscribers; the file is best-effort post-mortem).

The file is opened with O_APPEND so a resumed run extends the existing file rather than truncating it.

func (*RunLogBuffer) Close

func (b *RunLogBuffer) Close()

Close terminates all live subscriptions and closes the persisted file. Late writes from goroutines racing run completion are silently dropped to avoid surfacing benign races as errors.

func (*RunLogBuffer) Snapshot

func (b *RunLogBuffer) Snapshot(from int64) (offset int64, data []byte, total int64)

Snapshot returns the bytes available with offset >= from. The returned offset is the actual offset of the returned bytes (== from when from is in range, == b.start when from is older than the retained tail). The returned total is the total bytes ever written, so callers can detect whether more will arrive.

Pass from=0 for "everything we still have".

func (*RunLogBuffer) Subscribe

func (b *RunLogBuffer) Subscribe() *RunLogSubscription

Subscribe registers a live consumer. Callers should Snapshot(from) historical bytes BEFORE Subscribe to avoid racing the next Write, then drain live chunks dropping any whose offset overlaps the snapshot tail.
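Deduping the snapshot tail against live chunks is byte-offset arithmetic. A minimal standalone sketch (the helper name and shapes are illustrative, not package API):

```go
package main

import "fmt"

// trimOverlap drops the prefix of a live chunk already covered by the
// snapshot, where snapEnd = snapshot offset + len(snapshot data). It
// returns nil when the chunk is entirely stale.
func trimOverlap(snapEnd, chunkOff int64, chunk []byte) []byte {
	if chunkOff >= snapEnd {
		return chunk // no overlap: chunk starts at or past snapshot end
	}
	overlap := snapEnd - chunkOff
	if overlap >= int64(len(chunk)) {
		return nil // whole chunk was already in the snapshot
	}
	return chunk[overlap:]
}

func main() {
	// Snapshot covered bytes [0, 10); a live chunk arrives at offset 6.
	out := trimOverlap(10, 6, []byte("abcdefgh"))
	fmt.Printf("%s\n", out) // keeps only the bytes past offset 10: "efgh"
}
```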

func (*RunLogBuffer) Total

func (b *RunLogBuffer) Total() int64

Total returns the total bytes ever written.

func (*RunLogBuffer) Write

func (b *RunLogBuffer) Write(p []byte) (int, error)

Write implements io.Writer.

io.Writer's contract permits the caller to reuse p after Write returns, so we copy bytes before retaining them — the ring (via append) and the subscriber channel each get their own copy.

type RunLogSubscription

type RunLogSubscription struct {
	C <-chan runLogChunk
	// contains filtered or unexported fields
}

RunLogSubscription mirrors EventSubscription: receive on C, call Cancel to unregister. Drops counts chunks lost to a full buffer; clients that see drops > 0 should re-anchor via the REST endpoint.

func (*RunLogSubscription) Cancel

func (s *RunLogSubscription) Cancel()

Cancel unregisters the subscription. Idempotent.

func (*RunLogSubscription) Drops

func (s *RunLogSubscription) Drops() int

Drops reports the number of chunks lost because this subscriber's buffer was full.

type RunSnapshot

type RunSnapshot struct {
	Run        RunHeader        `json:"run"`
	Executions []ExecutionState `json:"executions"`
	LastSeq    int64            `json:"last_seq"`
}

RunSnapshot is the structured view returned by GET /api/runs/{id} and pushed to WS subscribers on connect. It bundles a RunHeader (slowly-changing run-level metadata) with the dynamic ExecutionState rows derived by folding the run's events.

func BuildSnapshot

func BuildSnapshot(s *store.RunStore, runID string) (*RunSnapshot, error)

BuildSnapshot is the cold-read convenience: load run.json + events from the store, then fold them into a RunSnapshot. Events are streamed via ScanEvents to keep memory bounded for long runs.

type RunSummary

type RunSummary struct {
	ID string `json:"id"`
	// Name is the deterministic, human-friendly label for the run.
	// Empty for legacy runs persisted before this field existed.
	Name         string          `json:"name,omitempty"`
	WorkflowName string          `json:"workflow_name"`
	Status       store.RunStatus `json:"status"`
	FilePath     string          `json:"file_path,omitempty"`
	CreatedAt    time.Time       `json:"created_at"`
	UpdatedAt    time.Time       `json:"updated_at"`
	FinishedAt   *time.Time      `json:"finished_at,omitempty"`
	Error        string          `json:"error,omitempty"`
	// Active reports whether the run is currently held by this
	// process's manager. A run with status "running" but Active=false
	// belongs to another process or to a previous boot — Cancel won't
	// reach it from here.
	Active bool `json:"active"`
	// Worktree finalization summary (only populated for `worktree:
	// auto` runs that reached a clean exit). See store.Run for the
	// full semantics.
	FinalCommit string `json:"final_commit,omitempty"`
	FinalBranch string `json:"final_branch,omitempty"`
	MergedInto  string `json:"merged_into,omitempty"`
}

RunSummary is the lightweight per-row shape returned by List. Heavier fields (events, artifacts, checkpoint detail) live in RunSnapshot — call Snapshot for the full view.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the canonical façade over runtime + store + broker + manager. The HTTP server, the editor, and (optionally) the CLI all route through here — keeping a single source of truth for run lifecycle, validation, and event fan-out.

func NewService

func NewService(storeDir string, opts ...ServiceOption) (*Service, error)

NewService constructs a Service rooted at storeDir.

func (*Service) Broker

func (s *Service) Broker() *EventBroker

Broker exposes the event broker for transports that need to subscribe directly (the WS handler).

func (*Service) Cancel

func (s *Service) Cancel(runID string) error

Cancel signals an active run to stop. Returns ErrRunNotActive if the run is not held by this process — cross-process cancel is not supported in the current design.

func (*Service) Drain

func (s *Service) Drain(ctx context.Context)

Drain performs a graceful shutdown of every active run:

  1. Sets the draining flag so subsequent Launch / Resume return runtime.ErrServerDraining.
  2. Snapshots active handles and cancels each one.
  3. Waits on each handle's done channel up to ctx's deadline.
  4. For every run that was active at the moment of Drain — whether its goroutine exited cleanly within the deadline or not — emits EventRunInterrupted and flips the persisted status to failed_resumable with reason "server drained".

The status flip happens regardless of clean exit so the on-disk state is unambiguous; the runtime's own failure event (typically EventRunFailed with cause "context canceled") may also land in the same events.jsonl, which is acceptable telemetry noise — both events accurately describe what happened.

Drain is intended to be called once during process shutdown. After it returns, the service should not be used to launch new work.

func (*Service) GetLogBuffer

func (s *Service) GetLogBuffer(runID string) *RunLogBuffer

GetLogBuffer returns the live log buffer for runID, or nil if the run is not held by this process. Valid only while the run is active; the buffer is Close'd and removed when the run goroutine exits.

func (*Service) Launch

func (s *Service) Launch(parent context.Context, spec LaunchSpec) (*LaunchResult, error)

Launch starts a workflow asynchronously and returns once the run handle has been registered with the manager (i.e. Cancel will work from the moment Launch returns nil error).

The caller is expected to have already validated spec.FilePath against any sandbox / origin policy. The service does not double-check origins — its job is lifecycle, not authentication.

func (*Service) List

func (s *Service) List(f ListFilter) ([]RunSummary, error)

List returns every run in the store filtered by f. The result is sorted by CreatedAt descending (newest first); Limit truncates after sort.

func (*Service) ListArtifacts

func (s *Service) ListArtifacts(runID, nodeID string) ([]ArtifactSummary, error)

ListArtifacts enumerates the persisted artifacts for one node by reading the artifact directory directly — avoids the O(versions) JSON-decode of the full bodies that LoadArtifact would do just to extract the version number. Returns the versions in ascending order.

func (*Service) LoadArtifact

func (s *Service) LoadArtifact(runID, nodeID string, version int) (*store.Artifact, error)

LoadArtifact returns one persisted artifact body.

func (*Service) LoadEvents

func (s *Service) LoadEvents(runID string, from, to int64) ([]*store.Event, error)

LoadEvents returns events in the half-open range [from, to), inclusive on from and exclusive on to, capped at MaxEventsPerPage. Pass to=0 for "no upper bound". Used by the scrubber to lazy-load segments of a long run.

Streams via store.LoadEventsRange so we never materialise more than the page-cap worth of events at once; callers paginate.

func (*Service) LoadRun

func (s *Service) LoadRun(runID string) (*store.Run, error)

LoadRun returns the persisted Run metadata for runID.

func (*Service) LoadWireWorkflow

func (s *Service) LoadWireWorkflow(runID string) (*WireWorkflow, error)

LoadWireWorkflow recompiles the .iter source for runID and projects the resulting IR into WireWorkflow shape. Results are memoised by (filePath, content hash) so repeated calls for the same revision don't re-parse. The stale_hash flag is derived by comparing the freshly-computed hash against the one persisted in run.json at launch.

func (*Service) Resume

func (s *Service) Resume(parent context.Context, spec ResumeSpec) (*LaunchResult, error)

Resume re-enters a paused, failed_resumable, or cancelled run with optional answers. The .iter source must be supplied (and must hash-match the original unless spec.Force).

func (*Service) Snapshot

func (s *Service) Snapshot(runID string) (*RunSnapshot, error)

Snapshot returns the structured RunSnapshot for runID by folding the persisted events through the canonical reducer.

func (*Service) Stop

func (s *Service) Stop(ctx context.Context)

Stop cancels every active run and waits for their goroutines to finish, but does not flip persisted statuses or emit any observability event. Use Stop in tests or for a quiet teardown where the caller takes responsibility for the on-disk state.

Production shutdown should call Drain instead, which additionally publishes EventRunInterrupted and flips each in-flight run to failed_resumable so the next server boot can offer one-click resume.

func (*Service) StoreDir

func (s *Service) StoreDir() string

StoreDir returns the on-disk store directory. Exposed so HTTP handlers can fall back to persisted run.log when the in-memory buffer is gone.

type ServiceOption

type ServiceOption func(*Service)

ServiceOption configures a Service at construction time.

func WithBroker

func WithBroker(b *EventBroker) ServiceOption

WithBroker injects an existing event broker. When omitted, the service creates its own.

func WithExtraEventObservers

func WithExtraEventObservers(observers ...func(store.Event)) ServiceOption

WithExtraEventObservers adds observers chained alongside the broker.Publish observer. Use this to wire Prometheus / OTLP exporters into the HTTP service's run goroutines.

func WithLogger

func WithLogger(l *iterlog.Logger) ServiceOption

WithLogger sets the logger used for service-level diagnostics.

func WithManager

func WithManager(m *Manager) ServiceOption

WithManager injects an existing lifecycle manager. When omitted, the service creates its own.

type SnapshotBuilder

type SnapshotBuilder struct {
	// contains filtered or unexported fields
}

SnapshotBuilder is a stateful incremental reducer: feed it events in sequence order via Apply, and read out the current RunSnapshot via Snapshot. The same builder is used for cold reads (replay every event from disk) and for live subscribers (replay history, then accept new events as they arrive).

The reducer is deterministic: BuildSnapshot(run, events) always produces the same output for the same input, which lets the frontend derive the same per-seq snapshots locally to power the time-travel scrubber.

func NewSnapshotBuilder

func NewSnapshotBuilder(run *store.Run) *SnapshotBuilder

NewSnapshotBuilder seeds a builder from the persisted Run metadata. Pass run=nil for an empty initial snapshot (e.g. when the WS catch-up races run.json creation).

func (*SnapshotBuilder) Apply

func (b *SnapshotBuilder) Apply(evt *store.Event)

Apply folds a single event into the running snapshot. Events MUST be applied in non-decreasing seq order; out-of-order events are ignored, since the reducer is monotonic and re-applying a stale event would make the derived state non-deterministic.

func (*SnapshotBuilder) LastSeq

func (b *SnapshotBuilder) LastSeq() int64

LastSeq exposes the highest seq applied so far so live subscribers can resume cleanly via WS subscribe{from_seq}.

func (*SnapshotBuilder) SetRun

func (b *SnapshotBuilder) SetRun(run *store.Run)

SetRun refreshes the run-level header. Call this when a fresh run.json was just persisted (e.g. on terminal events).

func (*SnapshotBuilder) Snapshot

func (b *SnapshotBuilder) Snapshot() *RunSnapshot

Snapshot returns the current snapshot. Callers receive a fresh value (the slice is copied); the underlying ExecutionState pointers are shared but treated as immutable from the caller's side.

type WireEdge

type WireEdge struct {
	From       string `json:"from"`
	To         string `json:"to"`
	Condition  string `json:"condition,omitempty"`
	Negated    bool   `json:"negated,omitempty"`
	Expression string `json:"expression,omitempty"`
	Loop       string `json:"loop,omitempty"`
}

WireEdge mirrors the runtime-relevant fields of ir.Edge. Expression is sent as the original source string (the AST itself isn't useful to the UI, and serializing it would leak compiler internals).

type WireNode

type WireNode struct {
	ID              string `json:"id"`
	Kind            string `json:"kind"`
	Model           string `json:"model,omitempty"`
	Backend         string `json:"backend,omitempty"`
	ReasoningEffort string `json:"reasoning_effort,omitempty"`
}

WireNode is the minimal node projection used by the run-console canvas. It carries id + kind plus the LLM-call metadata (model / backend / reasoning_effort) for nodes that drive an LLM (Agent, Judge, Router-LLM) so the canvas can render those fields next to the node without the frontend having to parse the .iter source itself.

type WireWorkflow

type WireWorkflow struct {
	Name string `json:"name"`
	// Entry is the first node ID the runtime picks. Useful so the
	// frontend can mark it specially and start its layout from there.
	Entry string     `json:"entry"`
	Nodes []WireNode `json:"nodes"`
	Edges []WireEdge `json:"edges"`
	// StaleHash signals that the .iter source on disk no longer matches
	// the hash captured at run launch. The frontend can warn the user
	// that the IR they are viewing may diverge from what was executed.
	StaleHash bool `json:"stale_hash,omitempty"`
}

WireWorkflow is the JSON projection of an IR workflow used by the editor's "IR overlay" view. Heavier fields (schemas, prompts, vars, MCP config, full expression ASTs) are intentionally omitted — the overlay only needs the topology so it can layer execution counts.
