Documentation ¶
Overview ¶
ABOUTME: Named, typed artifact storage for large pipeline outputs. ABOUTME: Transparently file-backs artifacts exceeding a size threshold (default 100KB).
ABOUTME: Defines the CodergenBackend interface that decouples CodergenHandler from the agent loop. ABOUTME: Provides AgentRunConfig and AgentRunResult types for configuring and receiving agent outcomes.
ABOUTME: AgentBackend wires CodergenBackend to the real agent loop and LLM SDK. ABOUTME: Creates agent sessions, configures provider profiles, and runs ProcessInput for each codergen node.
ABOUTME: ClaudeCodeBackend shells out to the `claude` CLI for codergen pipeline nodes. ABOUTME: Parses streaming JSONL output to extract results, token usage, and OUTCOME markers.
ABOUTME: Checkpoint serialization for persisting execution state to disk. ABOUTME: Supports JSON save/load for resuming pipeline runs from a known point.
ABOUTME: Type aliases bridging attractor/ to the consolidated dot/ package. ABOUTME: Provides backward-compatible Graph, Node, Edge, Subgraph types and Parse wrapper.
ABOUTME: Condition expression language for edge guards in the pipeline graph. ABOUTME: Evaluates clauses like "outcome = success && context.mode = prod" against Outcome and Context.
ABOUTME: Thread-safe key-value context store shared across pipeline stages. ABOUTME: Also defines StageStatus and Outcome types for node execution results.
ABOUTME: Edge selection algorithm for choosing the next edge during pipeline graph traversal. ABOUTME: Implements five-step priority: condition match > preferred label > suggested IDs > weight > lexical.
ABOUTME: Pipeline execution engine implementing the 5-phase lifecycle: PARSE, VALIDATE, INITIALIZE, EXECUTE, FINALIZE. ABOUTME: Orchestrates graph traversal, handler dispatch, retry logic, checkpointing, and edge selection.
ABOUTME: Query API for the append-only event log stored in RunStateStore. ABOUTME: Provides filtering, pagination, counting, tailing, and summarization of EngineEvents.
ABOUTME: Normalizes error messages into stable signatures for detecting repeat deterministic failures. ABOUTME: Replaces hex strings, UUIDs, numbers, timestamps, and file paths with placeholders.
ABOUTME: Context fidelity modes controlling how much context is carried between pipeline nodes. ABOUTME: Implements precedence resolution: edge > node > graph default > "compact".
ABOUTME: Applies fidelity-based context compaction and generates preamble strings. ABOUTME: Transforms pipeline context based on the fidelity mode before passing to the next node.
ABOUTME: Common handler interface, registry, and shape-to-type mapping for the attractor pipeline runner. ABOUTME: All 9 built-in node handlers implement NodeHandler and are registered via DefaultHandlerRegistry.
ABOUTME: Codergen (LLM coding agent) handler for the attractor pipeline runner. ABOUTME: Delegates to a CodergenBackend for actual LLM execution; returns error when backend is nil.
ABOUTME: Conditional branching handler for the attractor pipeline runner. ABOUTME: Returns success for diamond-shaped routing nodes; actual routing is handled by the engine's edge selection.
ABOUTME: Exit node handler for the attractor pipeline runner. ABOUTME: Captures the final pipeline state and returns success at the terminal node.
ABOUTME: Parallel fan-in handler for the attractor pipeline runner. ABOUTME: Waits for all incoming parallel branches to complete and merges their results.
ABOUTME: Wait for human handler for the attractor pipeline runner. ABOUTME: Presents choices derived from outgoing edges to a human via the Interviewer interface.
ABOUTME: Stack manager loop handler implementing observe/guard/steer supervision for child pipelines. ABOUTME: Uses the ManagerBackend interface for LLM-powered supervision; nil backend falls back to stub logging.
ABOUTME: Parallel fan-out handler for the attractor pipeline runner. ABOUTME: Records outgoing branches for concurrent execution by the engine.
ABOUTME: Start node handler for the attractor pipeline runner. ABOUTME: Initializes pipeline execution by recording a start timestamp and returning success.
ABOUTME: Tool handler that executes shell commands via os/exec for the attractor pipeline. ABOUTME: Supports timeout, working directory, env vars, and stores large output as artifacts.
ABOUTME: Interviewer interface and built-in implementations for human-in-the-loop interaction. ABOUTME: Provides AutoApprove, Callback, Queue, Recording, and Console interviewers.
ABOUTME: Defines the LogSink interface for structured event storage with query, retention, and indexing. ABOUTME: Provides FSLogSink, a filesystem-backed implementation wrapping FSRunStateStore and FSEventQuery.
ABOUTME: Parallel branch execution and context merging for concurrent pipeline fan-out/fan-in. ABOUTME: Provides ExecuteParallelBranches, MergeContexts, and supporting types for the engine.
ABOUTME: Pre-execution validation that checks provider accessibility and pipeline requirements. ABOUTME: Runs before the engine starts to provide fast, clear failure messages.
ABOUTME: Append-only NDJSON event logger for pipeline execution observability. ABOUTME: Writes engine events to a .ndjson file and maintains a live.json status snapshot.
ABOUTME: Loop restart handling for edges with loop_restart=true attribute. ABOUTME: Provides ErrLoopRestart sentinel, EdgeHasLoopRestart check, RestartConfig, and restart wrapper logic.
ABOUTME: Retry policy configuration and exponential backoff delay calculation for pipeline node execution. ABOUTME: Provides preset policies (none, standard, aggressive, linear, patient) and helper functions.
ABOUTME: RunDirectory manages the per-run directory layout for pipeline executions. ABOUTME: Provides structured storage for node artifacts, prompts, responses, and checkpoints.
ABOUTME: Defines RunState types and the RunStateStore interface for tracking pipeline run lifecycle. ABOUTME: Provides run ID generation using crypto/rand and the core data model for persistent run tracking.
ABOUTME: Filesystem-backed implementation of RunStateStore for persisting pipeline run state. ABOUTME: Stores each run in a directory with manifest.json, context.json, and an append-only events.jsonl log.
ABOUTME: HTTP server for managing pipeline execution via REST API with SSE streaming. ABOUTME: Provides endpoints for submitting, querying, cancelling pipelines, and human-in-the-loop Q&A.
ABOUTME: HTMX web frontend for the PipelineServer dashboard and pipeline detail views. ABOUTME: Uses go:embed for HTML templates and serves a browser-friendly UI alongside the JSON API.
ABOUTME: Computes a content-addressable hash of pipeline DOT source for matching runs. ABOUTME: Uses SHA-256 with no normalization — any byte change produces a different hash.
ABOUTME: CSS-like model stylesheet parser and applicator for assigning LLM properties to graph nodes. ABOUTME: Supports universal (*), class (.name), and ID (#name) selectors with specificity-based resolution.
ABOUTME: Sub-pipeline composition for inlining child DOT graphs into a parent pipeline. ABOUTME: Provides LoadSubPipeline, ComposeGraphs, and SubPipelineTransform for graph merging with namespace isolation.
ABOUTME: AST transforms applied between parsing and validation for the pipeline graph. ABOUTME: Implements variable expansion ($goal) and stylesheet application as a transform chain.
ABOUTME: Pipeline validation rules that check graph structure and node/edge attributes for correctness. ABOUTME: Provides a pluggable LintRule interface, built-in rules, Validate, and ValidateOrError functions.
ABOUTME: Background watchdog that detects stalled pipeline stages via progress timestamps. ABOUTME: Emits warning events when a node exceeds its configured stall timeout without progress.
Index ¶
- func DefaultShouldRetry(err error) bool
- func EdgeHasLoopRestart(edge *Edge) bool
- func EvaluateCondition(condition string, outcome *Outcome, ctx *Context) bool
- func FailureSignature(msg string) string
- func GeneratePreamble(prevNode string, mode FidelityMode, removedKeys int) string
- func GenerateRunID() (string, error)
- func HasCodergenNodes(graph *Graph) bool
- func IsValidFidelity(mode string) bool
- func MergeContexts(parent *Context, branches []BranchResult, policy string) error
- func NodeIDFromContext(ctx context.Context) string
- func NormalizeFailure(msg string) string
- func NormalizeLabel(label string) string
- func ShapeToHandlerType(shape string) string
- func SourceHash(source string) string
- func ValidFidelityModes() []string
- func ValidateConditionSyntax(condition string) bool
- func WithNodeID(ctx context.Context, nodeID string) context.Context
- type AgentBackend
- type AgentRunConfig
- type AgentRunResult
- type ArtifactInfo
- type ArtifactStore
- func (s *ArtifactStore) BaseDir() string
- func (s *ArtifactStore) Clear() error
- func (s *ArtifactStore) Has(id string) bool
- func (s *ArtifactStore) List() []ArtifactInfo
- func (s *ArtifactStore) Remove(id string) error
- func (s *ArtifactStore) Retrieve(id string) ([]byte, error)
- func (s *ArtifactStore) Store(id, name string, data []byte) (*ArtifactInfo, error)
- type AutoApproveInterviewer
- type BackoffConfig
- type BranchResult
- type CallbackInterviewer
- type Checkpoint
- type ClaudeCodeBackend
- type ClaudeCodeOption
- func WithClaudeAllowedTools(tools []string) ClaudeCodeOption
- func WithClaudeAppendSystemPrompt(prompt string) ClaudeCodeOption
- func WithClaudeBinaryPath(path string) ClaudeCodeOption
- func WithClaudeMaxBudgetUSD(budget float64) ClaudeCodeOption
- func WithClaudeModel(model string) ClaudeCodeOption
- func WithClaudeSkipPermissions(skip bool) ClaudeCodeOption
- type CodergenBackend
- type CodergenHandler
- type ConditionalHandler
- type ConsoleInterviewer
- type Context
- func (c *Context) AppendLog(entry string)
- func (c *Context) ApplyUpdates(updates map[string]any)
- func (c *Context) Clone() *Context
- func (c *Context) Get(key string) any
- func (c *Context) GetString(key string, defaultVal string) string
- func (c *Context) Logs() []string
- func (c *Context) Set(key string, value any)
- func (c *Context) Snapshot() map[string]any
- type DOTRenderFunc
- type Diagnostic
- type Edge
- type Engine
- func (e *Engine) GetEventHandler() func(EngineEvent)
- func (e *Engine) GetHandler(typeName string) NodeHandler
- func (e *Engine) ResumeFromCheckpoint(ctx context.Context, graph *Graph, checkpointPath string) (*RunResult, error)
- func (e *Engine) Run(ctx context.Context, source string) (*RunResult, error)
- func (e *Engine) RunGraph(ctx context.Context, graph *Graph) (*RunResult, error)
- func (e *Engine) SetEventHandler(handler func(EngineEvent))
- func (e *Engine) SetHandler(handler NodeHandler)
- type EngineConfig
- type EngineEvent
- type EngineEventType
- type ErrLoopRestart
- type EventFilter
- type EventQuery
- type EventQueryResponse
- type EventSummary
- type EventSummaryResponse
- type EventTailResponse
- type ExitHandler
- type FSEventQuery
- func (q *FSEventQuery) CountEvents(runID string, filter EventFilter) (int, error)
- func (q *FSEventQuery) QueryEvents(runID string, filter EventFilter) ([]EngineEvent, error)
- func (q *FSEventQuery) SummarizeEvents(runID string) (*EventSummary, error)
- func (q *FSEventQuery) TailEvents(runID string, n int) ([]EngineEvent, error)
- type FSLogSink
- func (s *FSLogSink) Append(runID string, event EngineEvent) error
- func (s *FSLogSink) Close() error
- func (s *FSLogSink) ListRuns() ([]RunIndexEntry, error)
- func (s *FSLogSink) Prune(olderThan time.Duration) (int, error)
- func (s *FSLogSink) Query(runID string, filter EventFilter) ([]EngineEvent, int, error)
- func (s *FSLogSink) Summarize(runID string) (*EventSummary, error)
- func (s *FSLogSink) Tail(runID string, n int) ([]EngineEvent, error)
- type FSRunStateStore
- func (s *FSRunStateStore) AddEvent(id string, event EngineEvent) error
- func (s *FSRunStateStore) CheckpointPath(runID string) string
- func (s *FSRunStateStore) Create(state *RunState) error
- func (s *FSRunStateStore) FindResumable(sourceHash string) (*RunState, error)
- func (s *FSRunStateStore) Get(id string) (*RunState, error)
- func (s *FSRunStateStore) List() ([]*RunState, error)
- func (s *FSRunStateStore) RunDir(runID string) string
- func (s *FSRunStateStore) Update(state *RunState) error
- type FailureTracker
- type FanInHandler
- type FidelityMode
- type FidelityOptions
- type Graph
- type GraphDOTFunc
- type GraphDOTWithStatusFunc
- type HandlerRegistry
- type Interviewer
- type LintRule
- type LiveState
- type LogSink
- type ManagerBackend
- type ManagerLoopHandler
- type Node
- type NodeHandler
- type NodeHandlerUnwrapper
- type Outcome
- type ParallelConfig
- type ParallelHandler
- type PendingQuestion
- type PipelineRun
- type PipelineServer
- type PipelineStatus
- type PreflightCheck
- type PreflightFailure
- type PreflightResult
- type ProgressEntry
- type ProgressLogger
- type QAPair
- type Question
- type QueueInterviewer
- type RecordingInterviewer
- type RestartConfig
- type RetentionConfig
- type RetryPolicy
- type RunDirectory
- func (rd *RunDirectory) EnsureNodeDir(nodeID string) error
- func (rd *RunDirectory) ListNodeArtifacts(nodeID string) ([]string, error)
- func (rd *RunDirectory) LoadCheckpoint() (*Checkpoint, error)
- func (rd *RunDirectory) NodeDir(nodeID string) string
- func (rd *RunDirectory) ReadNodeArtifact(nodeID, filename string) ([]byte, error)
- func (rd *RunDirectory) SaveCheckpoint(cp *Checkpoint) error
- func (rd *RunDirectory) WriteNodeArtifact(nodeID, filename string, data []byte) error
- func (rd *RunDirectory) WritePrompt(nodeID, prompt string) error
- func (rd *RunDirectory) WriteResponse(nodeID, response string) error
- type RunIndex
- type RunIndexEntry
- type RunResult
- type RunState
- type RunStateStore
- type Severity
- type StageStatus
- type StartHandler
- type StyleRule
- type Stylesheet
- type StylesheetApplicationTransform
- type SubPipelineTransform
- type Subgraph
- type TokenUsage
- type ToolCallEntry
- type ToolHandler
- type Transform
- type VariableExpansionTransform
- type WaitForHumanHandler
- type Watchdog
- type WatchdogConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DefaultShouldRetry ¶
DefaultShouldRetry returns true for most errors as a simple default retry predicate. Returns false for nil errors.
func EdgeHasLoopRestart ¶
EdgeHasLoopRestart returns true if the edge has the loop_restart attribute set to "true".
func EvaluateCondition ¶
EvaluateCondition evaluates a condition expression against an outcome and context.
Condition grammar:
    Clause ('&&' Clause)*
    Clause: Key Operator Literal
    Key: 'outcome' | 'preferred_label' | 'context.' Path | bare identifier
    Operator: '=' | '!='
An empty or whitespace-only condition evaluates to true (unconditional edge).
func FailureSignature ¶
FailureSignature returns a deterministic signature for the given error message. Messages that differ only in variable content (UUIDs, timestamps, line numbers, etc.) will produce identical signatures.
func GeneratePreamble ¶
func GeneratePreamble(prevNode string, mode FidelityMode, removedKeys int) string
GeneratePreamble produces a human-readable string describing what fidelity transformation was applied when transitioning from a previous node.
func GenerateRunID ¶
GenerateRunID produces a random 16-character hex string (8 bytes of entropy).
func HasCodergenNodes ¶
HasCodergenNodes returns true if the graph contains any nodes that would be resolved to the codergen handler. This mirrors the resolution logic in HandlerRegistry.Resolve: explicit type attribute first, then shape-based mapping, then default to codergen.
func IsValidFidelity ¶
IsValidFidelity checks if a string is a valid fidelity mode.
func MergeContexts ¶
func MergeContexts(parent *Context, branches []BranchResult, policy string) error
MergeContexts merges branch results back into the parent context according to the specified join policy. All merge operations are logged to the parent context's log for visibility, including which branch wrote which values and conflict resolution via last-write-wins. Artifact references from branch contexts are consolidated into a parallel.artifacts manifest.
Policies:
- "wait_all": All branches must succeed. Returns error if any branch failed. Merges all branch contexts into parent (last-write-wins for conflicts).
- "wait_any": At least one branch must succeed. Only successful branches are merged. Returns error if all branches failed.
- "k_of_n": At least K branches must succeed (K read from parent context key "parallel.k_required", defaults to N). Only successful branches merged.
- "quorum": A strict majority (>50%) of branches must succeed. Only successful branches are merged.
func NodeIDFromContext ¶
NodeIDFromContext extracts the pipeline node ID from the context. Returns an empty string when no node ID is present.
func NormalizeFailure ¶
NormalizeFailure takes an error message and replaces variable content (UUIDs, hex strings, timestamps, file paths, numbers) with stable placeholders. This allows structural comparison of errors that differ only in runtime-specific values.
func NormalizeLabel ¶
NormalizeLabel lowercases a label, trims whitespace, and strips accelerator prefixes like "[Y] ", "Y) ", "Y - " that are used for keyboard shortcuts in human interaction nodes.
func ShapeToHandlerType ¶
ShapeToHandlerType returns the handler type string for a given Graphviz shape. Unknown shapes default to "codergen" (the LLM handler).
func SourceHash ¶
SourceHash returns the lowercase hex-encoded SHA-256 hash of the raw source bytes. No normalization is applied: if the file changed at all, the hash changes.
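A minimal sketch of the hashing behavior (standard-library imports assumed); the DOT snippet is illustrative:

    src := "digraph pipeline { start -> done }"
    h := SourceHash(src)
    fmt.Println(len(h))                    // 64: lowercase hex SHA-256 of the raw bytes
    fmt.Println(h == SourceHash(src))      // true: identical bytes always hash identically
    fmt.Println(h == SourceHash(src+"\n")) // false: even a trailing newline changes the hash

FSRunStateStore.FindResumable uses this hash to match a resumable run against the current pipeline source.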
func ValidFidelityModes ¶
func ValidFidelityModes() []string
ValidFidelityModes returns the list of valid fidelity mode strings.
func ValidateConditionSyntax ¶
ValidateConditionSyntax checks whether a condition string is syntactically valid. Returns true if the condition can be parsed, false otherwise.
Types ¶
type AgentBackend ¶
type AgentBackend struct {
// Client is the LLM client to use for making API calls. If nil,
// a client is created from environment variables at runtime.
Client *llm.Client
}
AgentBackend implements CodergenBackend by wiring to the real agent loop. It creates an agent.Session, configures the appropriate provider profile, sets up the execution environment, and runs the agent loop to completion.
func (*AgentBackend) RunAgent ¶
func (b *AgentBackend) RunAgent(ctx context.Context, config AgentRunConfig) (*AgentRunResult, error)
RunAgent executes an agent loop with the given configuration. It creates a session, selects the provider profile, sets up the execution environment, and runs ProcessInput until the agent completes or hits limits.
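A hedged sketch of calling the backend directly (normally the engine does this for each codergen node). Provider credentials are read from the environment when Client is nil; the model, provider, and paths below are illustrative placeholders:

    backend := &AgentBackend{} // nil Client: created from environment variables at runtime
    result, err := backend.RunAgent(context.Background(), AgentRunConfig{
        Prompt:   "Add table-driven tests for the parser",
        Model:    "claude-sonnet-4-5", // illustrative
        Provider: "anthropic",         // illustrative
        WorkDir:  "/tmp/workspace",    // illustrative
        NodeID:   "write_tests",
        MaxTurns: 10,
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(result.Success, result.TurnCount, result.TokensUsed)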
type AgentRunConfig ¶
type AgentRunConfig struct {
Prompt string // the prompt/instructions for the LLM
Model string // LLM model name (e.g., "claude-sonnet-4-5")
Provider string // LLM provider name (e.g., "anthropic", "openai", "gemini")
BaseURL string // custom API base URL (overrides provider default)
WorkDir string // working directory for file operations and commands
Goal string // pipeline-level goal for additional context
NodeID string // pipeline node identifier for logging/tracking
MaxTurns int // maximum agent loop turns (0 = use default of 20)
FidelityMode string // fidelity mode controlling conversation history management ("full", "compact", "truncate", "summary:*")
SystemPrompt string // user override appended to the agent's system prompt (empty = no override)
EventHandler func(EngineEvent) // engine event callback for agent-level observability
}
AgentRunConfig holds all configuration needed to execute an agent run for a single codergen pipeline node.
type AgentRunResult ¶
type AgentRunResult struct {
Output string // final text output from the agent
ToolCalls int // total number of tool calls made during the run
TokensUsed int // total tokens consumed across all LLM calls
Success bool // whether the agent completed without errors
ToolCallLog []ToolCallEntry // individual tool call details
TurnCount int // LLM call rounds
Usage TokenUsage // granular per-category token breakdown
}
AgentRunResult holds the outcome of an agent run.
type ArtifactInfo ¶
type ArtifactInfo struct {
ID string
Name string
SizeBytes int
StoredAt time.Time
IsFileBacked bool
}
ArtifactInfo describes a stored artifact.
type ArtifactStore ¶
type ArtifactStore struct {
// contains filtered or unexported fields
}
ArtifactStore provides named, typed storage for large outputs.
func NewArtifactStore ¶
func NewArtifactStore(baseDir string) *ArtifactStore
NewArtifactStore creates a new artifact store rooted at the given directory.
func (*ArtifactStore) BaseDir ¶
func (s *ArtifactStore) BaseDir() string
BaseDir returns the base directory for artifact storage.
func (*ArtifactStore) Clear ¶
func (s *ArtifactStore) Clear() error
Clear removes all artifacts, including any file-backed data on disk.
func (*ArtifactStore) Has ¶
func (s *ArtifactStore) Has(id string) bool
Has checks whether an artifact with the given ID exists.
func (*ArtifactStore) List ¶
func (s *ArtifactStore) List() []ArtifactInfo
List returns metadata for all stored artifacts.
func (*ArtifactStore) Remove ¶
func (s *ArtifactStore) Remove(id string) error
Remove deletes an artifact by ID. File-backed artifacts have their disk file removed.
func (*ArtifactStore) Retrieve ¶
func (s *ArtifactStore) Retrieve(id string) ([]byte, error)
Retrieve returns the data for the given artifact ID.
func (*ArtifactStore) Store ¶
func (s *ArtifactStore) Store(id, name string, data []byte) (*ArtifactInfo, error)
Store saves an artifact. Large artifacts (exceeding the threshold) are written to disk.
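A minimal usage sketch (error handling abbreviated); the IDs and payload are illustrative, and whether an artifact is file-backed depends on the size threshold noted in the Overview:

    store := NewArtifactStore(filepath.Join(os.TempDir(), "attractor-artifacts"))
    info, err := store.Store("build_log", "build.log", []byte("...build output..."))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(info.SizeBytes, info.IsFileBacked) // small payloads stay in memory

    data, _ := store.Retrieve("build_log")
    fmt.Println(len(data))

    for _, a := range store.List() {
        fmt.Println(a.ID, a.Name, a.StoredAt)
    }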
type AutoApproveInterviewer ¶
type AutoApproveInterviewer struct {
// contains filtered or unexported fields
}
AutoApproveInterviewer always returns a configured answer (or the first option). Intended for testing and automated pipelines where no human is available.
func NewAutoApproveInterviewer ¶
func NewAutoApproveInterviewer(defaultAnswer string) *AutoApproveInterviewer
NewAutoApproveInterviewer creates an AutoApproveInterviewer with the given default answer.
type BackoffConfig ¶
type BackoffConfig struct {
InitialDelay time.Duration // default 200ms
Factor float64 // default 2.0
MaxDelay time.Duration // default 60s
Jitter bool // default true
}
BackoffConfig controls delay timing between retry attempts.
func (BackoffConfig) DelayForAttempt ¶
func (b BackoffConfig) DelayForAttempt(attempt int) time.Duration
DelayForAttempt calculates the delay for a given attempt number (0-indexed). The formula is: InitialDelay * Factor^attempt, capped at MaxDelay. If Jitter is enabled, the delay is randomized in [0, calculated_delay].
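A worked sketch with jitter disabled so the delays follow the formula directly:

    b := BackoffConfig{
        InitialDelay: 200 * time.Millisecond,
        Factor:       2.0,
        MaxDelay:     60 * time.Second,
        Jitter:       false, // deterministic delays for illustration
    }
    fmt.Println(b.DelayForAttempt(0))  // 200ms * 2^0 = 200ms
    fmt.Println(b.DelayForAttempt(3))  // 200ms * 2^3 = 1.6s
    fmt.Println(b.DelayForAttempt(20)) // formula exceeds 60s, so capped at MaxDelay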
type BranchResult ¶
BranchResult holds the outcome of executing a single parallel branch.
func ExecuteParallelBranches ¶
func ExecuteParallelBranches(
    ctx context.Context,
    graph *Graph,
    pctx *Context,
    store *ArtifactStore,
    registry *HandlerRegistry,
    branches []string,
    config ParallelConfig,
) ([]BranchResult, error)
ExecuteParallelBranches forks the context for each branch and executes them concurrently in separate goroutines. Each branch follows edges from its start node until it reaches a fan-in node (shape=tripleoctagon) or a terminal node. A buffered channel is used as a semaphore to respect MaxParallel.
Error policies:
- "continue": all branches run to completion regardless of failures (default).
- "fail_fast": on the first branch error or failure outcome, cancel remaining branches.
type CallbackInterviewer ¶
type CallbackInterviewer struct {
// contains filtered or unexported fields
}
CallbackInterviewer delegates question answering to a provided callback function. Useful for integrating with external systems (Slack, web UI, API).
func NewCallbackInterviewer ¶
func NewCallbackInterviewer(fn func(ctx context.Context, question string, options []string) (string, error)) *CallbackInterviewer
NewCallbackInterviewer creates a CallbackInterviewer that delegates to the given function.
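A small sketch of bridging questions to an external system; here the callback simply picks the first offered option (standard-library imports assumed):

    iv := NewCallbackInterviewer(func(ctx context.Context, question string, options []string) (string, error) {
        // Forward question/options to Slack, a web UI, etc., and return the reply.
        if len(options) > 0 {
            return options[0], nil
        }
        return "approved", nil
    })
    answer, err := iv.Ask(context.Background(), "Proceed with deployment?", []string{"yes", "no"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(answer) // "yes"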
type Checkpoint ¶
type Checkpoint struct {
Timestamp time.Time `json:"timestamp"`
CurrentNode string `json:"current_node"`
CompletedNodes []string `json:"completed_nodes"`
NodeRetries map[string]int `json:"node_retries"`
ContextValues map[string]any `json:"context_values"`
Logs []string `json:"logs"`
}
Checkpoint is a serializable snapshot of execution state.
func LoadCheckpoint ¶
func LoadCheckpoint(path string) (*Checkpoint, error)
LoadCheckpoint deserializes a checkpoint from JSON at the given path.
func NewCheckpoint ¶
func NewCheckpoint(ctx *Context, currentNode string, completedNodes []string, nodeRetries map[string]int) *Checkpoint
NewCheckpoint creates a checkpoint from the current execution state.
func (*Checkpoint) Save ¶
func (cp *Checkpoint) Save(path string) error
Save serializes the checkpoint to JSON and writes it to the given path.
type ClaudeCodeBackend ¶
type ClaudeCodeBackend struct {
BinaryPath string // resolved via exec.LookPath("claude") if empty
DefaultModel string // falls back to "" (let claude pick)
AllowedTools []string // e.g. ["Bash","Read","Edit","Write","Glob","Grep"]
SkipPermissions bool // default: true (required for autonomous pipelines)
AppendSystemPrompt string // appended to claude's default system prompt
MaxBudgetUSD float64 // maximum dollar spend per run (0 = no limit)
}
ClaudeCodeBackend implements CodergenBackend by shelling out to the `claude` CLI. It uses --print --output-format stream-json to get streaming JSONL output with real token breakdowns, then parses the result events.
The claude CLI does not support a --max-turns flag. AgentRunConfig.MaxTurns is not honored by this backend. Use --max-budget-usd for cost control instead.
func NewClaudeCodeBackend ¶
func NewClaudeCodeBackend(opts ...ClaudeCodeOption) (*ClaudeCodeBackend, error)
NewClaudeCodeBackend creates a ClaudeCodeBackend with the given options. By default it resolves the "claude" binary from PATH and enables SkipPermissions (required for non-interactive pipeline execution).
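A hedged construction sketch; the model name and budget are illustrative, and the claude binary must be installed and resolvable on PATH:

    backend, err := NewClaudeCodeBackend(
        WithClaudeModel("claude-sonnet-4-5"), // illustrative
        WithClaudeAllowedTools([]string{"Bash", "Read", "Edit", "Write"}),
        WithClaudeMaxBudgetUSD(2.00),
    )
    if err != nil {
        log.Fatal(err) // e.g. the claude binary could not be resolved
    }
    // The backend is then wired into the engine as its CodergenBackend:
    engine := NewEngine(EngineConfig{Backend: backend})
    _ = engine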
func (*ClaudeCodeBackend) RunAgent ¶
func (b *ClaudeCodeBackend) RunAgent(ctx context.Context, config AgentRunConfig) (*AgentRunResult, error)
RunAgent executes the claude CLI with the given configuration and parses the streaming JSONL output to build an AgentRunResult.
type ClaudeCodeOption ¶
type ClaudeCodeOption func(*ClaudeCodeBackend)
ClaudeCodeOption configures a ClaudeCodeBackend via functional options.
func WithClaudeAllowedTools ¶
func WithClaudeAllowedTools(tools []string) ClaudeCodeOption
WithClaudeAllowedTools sets the allowed tools list for claude CLI invocations.
func WithClaudeAppendSystemPrompt ¶
func WithClaudeAppendSystemPrompt(prompt string) ClaudeCodeOption
WithClaudeAppendSystemPrompt sets additional system prompt text.
func WithClaudeBinaryPath ¶
func WithClaudeBinaryPath(path string) ClaudeCodeOption
WithClaudeBinaryPath sets the path to the claude binary.
func WithClaudeMaxBudgetUSD ¶
func WithClaudeMaxBudgetUSD(budget float64) ClaudeCodeOption
WithClaudeMaxBudgetUSD sets the maximum dollar spend per invocation.
func WithClaudeModel ¶
func WithClaudeModel(model string) ClaudeCodeOption
WithClaudeModel sets the default model for claude CLI invocations.
func WithClaudeSkipPermissions ¶
func WithClaudeSkipPermissions(skip bool) ClaudeCodeOption
WithClaudeSkipPermissions controls whether --dangerously-skip-permissions is passed.
type CodergenBackend ¶
type CodergenBackend interface {
// RunAgent executes an agent loop with the given configuration and returns
// the result. The context controls cancellation and timeouts.
RunAgent(ctx context.Context, config AgentRunConfig) (*AgentRunResult, error)
}
CodergenBackend abstracts the LLM agent execution so that CodergenHandler does not depend directly on the agent or llm packages.
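Because the interface has a single method, a stub backend for tests is straightforward. echoBackend below is a hypothetical example, not part of the package:

    // echoBackend is a hypothetical stub that never calls an LLM; it reports
    // success and echoes the prompt so pipelines can be exercised offline.
    type echoBackend struct{}

    func (echoBackend) RunAgent(ctx context.Context, cfg AgentRunConfig) (*AgentRunResult, error) {
        return &AgentRunResult{
            Output:  "echo: " + cfg.Prompt,
            Success: true,
        }, nil
    }

Passing echoBackend{} as EngineConfig.Backend lets codergen nodes run without any provider credentials.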
type CodergenHandler ¶
type CodergenHandler struct {
// Backend is the agent execution backend. When nil, the handler
// returns StatusFail indicating no LLM backend is configured.
Backend CodergenBackend
// BaseURL is the default API base URL for the LLM provider. Set by the
// engine during backend wiring. Can be overridden per-node via base_url attr.
BaseURL string
// EventHandler receives agent-level observability events bridged from the
// agent session into the engine event system. Set by the engine during
// backend wiring.
EventHandler func(EngineEvent)
}
CodergenHandler handles LLM-powered coding task nodes (shape=box). This is the default handler for nodes without an explicit type. When Backend is set, it delegates to the agent loop for real LLM execution. When Backend is nil, it returns StatusFail with a configuration error.
func (*CodergenHandler) Execute ¶
func (h *CodergenHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute processes a codergen node by reading its prompt, label, model, and provider. If a Backend is configured, it runs the agent loop for real LLM execution. Otherwise, it falls back to stub behavior.
func (*CodergenHandler) Type ¶
func (h *CodergenHandler) Type() string
Type returns the handler type string "codergen".
type ConditionalHandler ¶
type ConditionalHandler struct{}
ConditionalHandler handles conditional routing nodes (shape=diamond). The handler itself is a no-op that returns success. The actual routing is handled by the execution engine's edge selection algorithm, which evaluates conditions on outgoing edges using EvaluateCondition.
func (*ConditionalHandler) Execute ¶
func (h *ConditionalHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute returns success with a note describing the conditional evaluation. Edge condition evaluation and routing are performed by the engine after this handler returns.
func (*ConditionalHandler) Type ¶
func (h *ConditionalHandler) Type() string
Type returns the handler type string "conditional".
type ConsoleInterviewer ¶
type ConsoleInterviewer struct {
// contains filtered or unexported fields
}
ConsoleInterviewer reads answers from an io.Reader and writes prompts to an io.Writer. By default it uses os.Stdin and os.Stdout.
func NewConsoleInterviewer ¶
func NewConsoleInterviewer() *ConsoleInterviewer
NewConsoleInterviewer creates a ConsoleInterviewer using os.Stdin and os.Stdout.
func NewConsoleInterviewerWithIO ¶
func NewConsoleInterviewerWithIO(r io.Reader, w io.Writer) *ConsoleInterviewer
NewConsoleInterviewerWithIO creates a ConsoleInterviewer with configurable reader and writer.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context is a thread-safe key-value store shared across pipeline stages.
func ApplyFidelity ¶
func ApplyFidelity(pctx *Context, mode FidelityMode, opts FidelityOptions) (*Context, string)
ApplyFidelity applies the given fidelity mode to the context and returns the transformed context and a preamble string describing what was done.
func (*Context) ApplyUpdates ¶
ApplyUpdates merges the given key-value pairs into the context.
func (*Context) GetString ¶
GetString retrieves the string value for the given key. If the key is missing or the value is not a string, defaultVal is returned.
type DOTRenderFunc ¶
DOTRenderFunc renders raw DOT text to the specified format (svg, png).
type Diagnostic ¶
type Diagnostic struct {
Rule string
Severity Severity
Message string
NodeID string // optional
Edge *[2]string // optional (from, to)
Fix string // optional suggested fix
}
Diagnostic represents a validation finding.
func Validate ¶
func Validate(g *Graph, extraRules ...LintRule) []Diagnostic
Validate runs all built-in lint rules plus any extra rules on the graph.
func ValidateOrError ¶
func ValidateOrError(g *Graph, extraRules ...LintRule) ([]Diagnostic, error)
ValidateOrError runs validation and returns an error if any ERROR-severity diagnostics exist.
type Edge ¶
func SelectEdge ¶
SelectEdge chooses the next edge from a node using five-step priority:
1. Condition-matching edges (non-empty condition that evaluates true), best by weight then lexical
2. Preferred label match (outcome.PreferredLabel matches edge label after normalization)
3. Suggested next IDs (outcome.SuggestedNextIDs matches edge.To)
4. Highest weight among unconditional edges (no condition attribute or empty condition)
5. Lexical tiebreak on the To field
Returns nil if no outgoing edges exist.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the pipeline execution engine that runs attractor graph pipelines.
func NewEngine ¶
func NewEngine(config EngineConfig) *Engine
NewEngine creates a new pipeline execution engine with the given configuration.
func (*Engine) GetEventHandler ¶
func (e *Engine) GetEventHandler() func(EngineEvent)
GetEventHandler returns the engine's current event callback, or nil if none is set.
func (*Engine) GetHandler ¶
func (e *Engine) GetHandler(typeName string) NodeHandler
GetHandler returns the handler registered for the given type string from the engine's handler registry. If no registry was configured, a default registry is initialized first. Returns nil if the handler type is not found.
func (*Engine) ResumeFromCheckpoint ¶
func (e *Engine) ResumeFromCheckpoint(ctx context.Context, graph *Graph, checkpointPath string) (*RunResult, error)
ResumeFromCheckpoint loads a checkpoint from disk and resumes graph execution from the node after the checkpointed node. If the previous node used full fidelity, the first hop after resume is degraded to summary:high since in-memory LLM sessions cannot be serialized.
func (*Engine) Run ¶
Run parses DOT source, then runs the resulting graph through the full 5-phase lifecycle.
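A minimal end-to-end sketch, assuming the node attributes below (explicit type and prompt attributes, as described under the handler types) satisfy the validation rules for your graph; the backend can be any CodergenBackend:

    source := `digraph pipeline {
        start [type="start"];
        build [type="codergen", label="Implement feature", prompt="Write the code and tests"];
        done  [type="exit"];
        start -> build;
        build -> done;
    }`

    engine := NewEngine(EngineConfig{
        Backend:          &AgentBackend{}, // or a ClaudeCodeBackend / custom stub
        ArtifactsBaseDir: "./artifacts",
        EventHandler: func(ev EngineEvent) {
            fmt.Println(ev.Timestamp.Format(time.RFC3339), ev.Type, ev.NodeID)
        },
    })
    result, err := engine.Run(context.Background(), source)
    if err != nil {
        log.Fatal(err)
    }
    _ = result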
func (*Engine) RunGraph ¶
RunGraph runs an already-parsed graph through the VALIDATE, INITIALIZE, EXECUTE, and FINALIZE phases.
func (*Engine) SetEventHandler ¶
func (e *Engine) SetEventHandler(handler func(EngineEvent))
SetEventHandler sets the engine's event callback after creation. This allows external components (like the TUI) to wire into the event stream.
func (*Engine) SetHandler ¶
func (e *Engine) SetHandler(handler NodeHandler)
SetHandler registers a handler in the engine's handler registry. If no registry was configured, a default registry is initialized first.
type EngineConfig ¶
type EngineConfig struct {
CheckpointDir string // directory for checkpoint files (empty = no checkpoints)
AutoCheckpointPath string // path to overwrite with latest checkpoint after each node (empty = disabled)
ArtifactDir string // directory for artifact storage (empty = use ArtifactsBaseDir/<RunID>)
ArtifactsBaseDir string // base directory for run directories (default = "./artifacts")
RunID string // run identifier for the artifact subdirectory (empty = auto-generated)
Transforms []Transform // transforms to apply (nil = DefaultTransforms)
ExtraLintRules []LintRule // additional validation rules
DefaultRetry RetryPolicy // default retry policy for nodes
Handlers *HandlerRegistry // nil = DefaultHandlerRegistry
EventHandler func(EngineEvent) // optional event callback
Backend CodergenBackend // backend for codergen nodes (nil = stub behavior)
BaseURL string // default API base URL for codergen nodes (overridable per-node)
RestartConfig *RestartConfig // loop restart configuration (nil = DefaultRestartConfig)
DefaultNodeTimeout time.Duration // global fallback timeout for node execution (0 = no timeout)
}
EngineConfig holds configuration for the pipeline execution engine.
type EngineEvent ¶
type EngineEvent struct {
Type EngineEventType
NodeID string
Data map[string]any
Timestamp time.Time
}
EngineEvent represents a lifecycle event emitted by the engine during pipeline execution.
type EngineEventType ¶
type EngineEventType string
EngineEventType identifies the kind of engine lifecycle event.
const (
    EventPipelineStarted   EngineEventType = "pipeline.started"
    EventPipelineCompleted EngineEventType = "pipeline.completed"
    EventPipelineFailed    EngineEventType = "pipeline.failed"
    EventStageStarted      EngineEventType = "stage.started"
    EventStageCompleted    EngineEventType = "stage.completed"
    EventStageFailed       EngineEventType = "stage.failed"
    EventStageRetrying     EngineEventType = "stage.retrying"
    EventStageStalled      EngineEventType = "stage.stalled"
    EventCheckpointSaved   EngineEventType = "checkpoint.saved"

    // Agent-level observability events bridged from the coding agent session.
    EventAgentToolCallStart EngineEventType = "agent.tool_call.start"
    EventAgentToolCallEnd   EngineEventType = "agent.tool_call.end"
    EventAgentLLMTurn       EngineEventType = "agent.llm_turn"
    EventAgentSteering      EngineEventType = "agent.steering"
    EventAgentLoopDetected  EngineEventType = "agent.loop_detected"
)
type ErrLoopRestart ¶
type ErrLoopRestart struct {
TargetNode string
}
ErrLoopRestart is a sentinel error returned by executeGraph when a selected edge has the loop_restart=true attribute. It carries the target node ID so the engine can restart traversal from that node with a fresh context.
func (*ErrLoopRestart) Error ¶
func (e *ErrLoopRestart) Error() string
Error implements the error interface.
type EventFilter ¶
type EventFilter struct {
Types []EngineEventType // filter by event type(s); empty means all types
NodeID string // filter by specific node; empty means all nodes
Since *time.Time // events at or after this time; nil means no lower bound
Until *time.Time // events at or before this time; nil means no upper bound
Limit int // max results; 0 means unlimited
Offset int // skip first N results after filtering
}
EventFilter specifies criteria for filtering engine events from a run's event log.
type EventQuery ¶
type EventQuery interface {
QueryEvents(runID string, filter EventFilter) ([]EngineEvent, error)
CountEvents(runID string, filter EventFilter) (int, error)
TailEvents(runID string, n int) ([]EngineEvent, error)
SummarizeEvents(runID string) (*EventSummary, error)
}
EventQuery defines the interface for querying engine events from a run.
type EventQueryResponse ¶
type EventQueryResponse struct {
Events []EngineEvent `json:"events"`
Total int `json:"total"`
}
EventQueryResponse is the JSON response for event query endpoints.
type EventSummary ¶
type EventSummary struct {
TotalEvents int
ByType map[EngineEventType]int
ByNode map[string]int
FirstEvent *time.Time
LastEvent *time.Time
}
EventSummary holds aggregate statistics about a run's events.
type EventSummaryResponse ¶
type EventSummaryResponse struct {
TotalEvents int `json:"total_events"`
ByType map[string]int `json:"by_type"`
ByNode map[string]int `json:"by_node"`
FirstEvent string `json:"first_event,omitempty"`
LastEvent string `json:"last_event,omitempty"`
}
EventSummaryResponse is the JSON response for the event summary endpoint.
type EventTailResponse ¶
type EventTailResponse struct {
Events []EngineEvent `json:"events"`
}
EventTailResponse is the JSON response for the event tail endpoint.
type ExitHandler ¶
type ExitHandler struct{}
ExitHandler handles the pipeline exit point node (shape=Msquare). It records the finish time and returns success. Goal gate enforcement is handled by the execution engine, not by this handler.
func (*ExitHandler) Execute ¶
func (h *ExitHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute captures the final outcome and writes a summary to context.
func (*ExitHandler) Type ¶
func (h *ExitHandler) Type() string
Type returns the handler type string "exit".
type FSEventQuery ¶
type FSEventQuery struct {
// contains filtered or unexported fields
}
FSEventQuery implements EventQuery using an FSRunStateStore as its backing store.
func NewFSEventQuery ¶
func NewFSEventQuery(store *FSRunStateStore) *FSEventQuery
NewFSEventQuery creates a new FSEventQuery backed by the given FSRunStateStore.
func (*FSEventQuery) CountEvents ¶
func (q *FSEventQuery) CountEvents(runID string, filter EventFilter) (int, error)
CountEvents returns the count of events matching the filter criteria. Pagination (Limit/Offset) is ignored for counting purposes.
func (*FSEventQuery) QueryEvents ¶
func (q *FSEventQuery) QueryEvents(runID string, filter EventFilter) ([]EngineEvent, error)
QueryEvents returns events from the given run matching the filter criteria. Events are loaded from the JSONL file and filtered in memory.
func (*FSEventQuery) SummarizeEvents ¶
func (q *FSEventQuery) SummarizeEvents(runID string) (*EventSummary, error)
SummarizeEvents produces aggregate statistics about a run's event log.
func (*FSEventQuery) TailEvents ¶
func (q *FSEventQuery) TailEvents(runID string, n int) ([]EngineEvent, error)
TailEvents returns the last n events from the run. If there are fewer than n events, all events are returned.
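A usage sketch; runID is assumed to identify an existing run under ./runs, and the node ID is illustrative:

    store, err := NewFSRunStateStore("./runs")
    if err != nil {
        log.Fatal(err)
    }
    q := NewFSEventQuery(store)

    // Last 20 events of the run.
    recent, err := q.TailEvents(runID, 20)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(recent))

    // Stage failures for one node, first page of 50.
    failures, err := q.QueryEvents(runID, EventFilter{
        Types:  []EngineEventType{EventStageFailed},
        NodeID: "build",
        Limit:  50,
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(failures))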
type FSLogSink ¶
type FSLogSink struct {
// contains filtered or unexported fields
}
FSLogSink is a filesystem-backed LogSink that wraps FSRunStateStore for storage and FSEventQuery for querying. It maintains a JSON index file for fast run enumeration.
func NewFSLogSink ¶
NewFSLogSink creates a new filesystem-backed LogSink rooted at baseDir. The index file is created if it does not already exist.
func (*FSLogSink) Append ¶
func (s *FSLogSink) Append(runID string, event EngineEvent) error
Append writes an event to the run's event log and updates the index.
func (*FSLogSink) Close ¶
Close releases any resources held by the sink. Calling Close multiple times is safe.
func (*FSLogSink) ListRuns ¶
func (s *FSLogSink) ListRuns() ([]RunIndexEntry, error)
ListRuns returns all run index entries from the index file.
func (*FSLogSink) Prune ¶
Prune deletes all runs that started more than olderThan ago. It removes both the run directory and its entry from the index. Returns the number of runs pruned.
func (*FSLogSink) Query ¶
func (s *FSLogSink) Query(runID string, filter EventFilter) ([]EngineEvent, int, error)
Query returns events matching the filter and the total count of matching events (before pagination). The total reflects the full filtered result set, while the returned slice respects Limit and Offset.
type FSRunStateStore ¶
type FSRunStateStore struct {
// contains filtered or unexported fields
}
FSRunStateStore is a filesystem-backed RunStateStore. Each run is stored in a subdirectory of baseDir named by run ID.
func NewFSRunStateStore ¶
func NewFSRunStateStore(baseDir string) (*FSRunStateStore, error)
NewFSRunStateStore creates a new filesystem-backed run state store rooted at baseDir. The base directory is created if it does not already exist.
func (*FSRunStateStore) AddEvent ¶
func (s *FSRunStateStore) AddEvent(id string, event EngineEvent) error
AddEvent appends an EngineEvent to the run's events.jsonl file. Returns an error if the run does not exist.
func (*FSRunStateStore) CheckpointPath ¶
func (s *FSRunStateStore) CheckpointPath(runID string) string
CheckpointPath returns the path to the checkpoint.json file for a given run ID.
func (*FSRunStateStore) Create ¶
func (s *FSRunStateStore) Create(state *RunState) error
Create persists a new RunState to disk. Returns an error if a run with the same ID already exists.
func (*FSRunStateStore) FindResumable ¶
func (s *FSRunStateStore) FindResumable(sourceHash string) (*RunState, error)
FindResumable returns the most recent non-completed run whose SourceHash matches the given hash AND has a checkpoint.json file in its run directory. Returns nil if no matching run is found.
func (*FSRunStateStore) Get ¶
func (s *FSRunStateStore) Get(id string) (*RunState, error)
Get loads a RunState from disk by its ID. Returns an error if the run does not exist or if any of the stored files are corrupt.
func (*FSRunStateStore) List ¶
func (s *FSRunStateStore) List() ([]*RunState, error)
List returns all RunStates stored on disk. Non-directory entries in the base directory are silently ignored.
func (*FSRunStateStore) RunDir ¶
func (s *FSRunStateStore) RunDir(runID string) string
RunDir returns the base directory path for a given run ID.
func (*FSRunStateStore) Update ¶
func (s *FSRunStateStore) Update(state *RunState) error
Update overwrites the manifest and context for an existing run. Returns an error if the run does not exist.
type FailureTracker ¶
type FailureTracker struct {
// contains filtered or unexported fields
}
FailureTracker tracks failure signatures across retries to detect deterministic (repeating) failures. A failure is considered deterministic when the same normalized signature has been seen 2 or more times. FailureTracker is safe for concurrent use.
func NewFailureTracker ¶
func NewFailureTracker() *FailureTracker
NewFailureTracker creates a FailureTracker ready to record errors.
func (*FailureTracker) Count ¶
func (t *FailureTracker) Count(signature string) int
Count returns how many times the given signature has been recorded.
func (*FailureTracker) IsDeterministic ¶
func (t *FailureTracker) IsDeterministic(signature string) bool
IsDeterministic returns true if the given signature has been seen 2 or more times, indicating the failure is likely deterministic and not transient.
func (*FailureTracker) Record ¶
func (t *FailureTracker) Record(err error) string
Record normalizes the error message, increments the count for its signature, and returns the signature string.
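A small sketch; the two messages differ only in numeric values, so they should normalize to the same signature:

    t := NewFailureTracker()
    sig := t.Record(errors.New("dial tcp 10.0.0.12:5432: timeout after 30s"))
    t.Record(errors.New("dial tcp 10.0.0.99:5432: timeout after 45s"))

    fmt.Println(t.Count(sig))           // 2
    fmt.Println(t.IsDeterministic(sig)) // true: same signature seen twice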
type FanInHandler ¶
type FanInHandler struct{}
FanInHandler handles parallel fan-in nodes (shape=tripleoctagon). It reads parallel results from the pipeline context and consolidates them. If no parallel results are available, it returns a failure.
func (*FanInHandler) Execute ¶
func (h *FanInHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute reads parallel branch results from context and merges them. Returns success when results are present, or failure if none are found.
func (*FanInHandler) Type ¶
func (h *FanInHandler) Type() string
Type returns the handler type string "parallel.fan_in".
type FidelityMode ¶
type FidelityMode string
FidelityMode represents how much context is carried between nodes.
const (
    FidelityFull          FidelityMode = "full"
    FidelityTruncate      FidelityMode = "truncate"
    FidelityCompact       FidelityMode = "compact"
    FidelitySummaryLow    FidelityMode = "summary:low"
    FidelitySummaryMedium FidelityMode = "summary:medium"
    FidelitySummaryHigh   FidelityMode = "summary:high"
)
func ResolveFidelity ¶
func ResolveFidelity(edge *Edge, targetNode *Node, graph *Graph) FidelityMode
ResolveFidelity resolves the fidelity mode for a target node using precedence:
1. Edge fidelity attribute (on the incoming edge)
2. Target node fidelity attribute
3. Graph default_fidelity attribute
4. Default: "compact"
type FidelityOptions ¶
type FidelityOptions struct {
MaxKeys int // max context keys to keep (default 50 for truncate)
MaxValueLength int // max string value length before truncation (default 1024 for compact, 500 for summary:high)
MaxLogs int // max log entries to keep (default 20 for compact)
Whitelist []string // custom whitelist keys for summary modes (overrides defaults)
}
FidelityOptions configures the behavior of fidelity-based context compaction.
type Graph ¶
Graph is a type alias: attractor code continues to use Graph, Node, Edge, and Subgraph unchanged, while the actual types come from the dot/ package.
func ApplyTransforms ¶
ApplyTransforms applies a sequence of transforms to a graph, returning the final result.
func ComposeGraphs ¶
func ComposeGraphs(parent *Graph, childGraph *Graph, insertNodeID string, namespace string) (*Graph, error)
ComposeGraphs merges a child graph into a parent graph by replacing the node identified by insertNodeID. Child node IDs are prefixed with "{namespace}." to avoid ID conflicts. Parent edges pointing to or from the insert node are reconnected to the child's start and terminal nodes respectively. Child graph attributes are copied, but parent attributes take precedence on conflicts.
func LoadSubPipeline ¶
LoadSubPipeline reads a DOT file from disk and parses it into a Graph.
type GraphDOTFunc ¶
GraphDOTFunc converts a Graph to DOT text. Used by PipelineServer for graph rendering.
type GraphDOTWithStatusFunc ¶
GraphDOTWithStatusFunc converts a Graph to DOT text with execution status color overlays.
type HandlerRegistry ¶
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry maps handler type strings to handler instances.
func DefaultHandlerRegistry ¶
func DefaultHandlerRegistry() *HandlerRegistry
DefaultHandlerRegistry creates a registry with all 9 built-in handlers registered.
func NewHandlerRegistry ¶
func NewHandlerRegistry() *HandlerRegistry
NewHandlerRegistry creates a new empty handler registry.
func (*HandlerRegistry) Get ¶
func (r *HandlerRegistry) Get(typeName string) NodeHandler
Get returns the handler registered for the given type string, or nil if not found.
func (*HandlerRegistry) Register ¶
func (r *HandlerRegistry) Register(handler NodeHandler)
Register adds a handler to the registry, keyed by its Type() string. Registering for an already-registered type replaces the previous handler.
func (*HandlerRegistry) Resolve ¶
func (r *HandlerRegistry) Resolve(node *Node) NodeHandler
Resolve finds the appropriate handler for a node using the resolution order:
1. Explicit type attribute on the node
2. Shape-based resolution using the shape-to-handler-type mapping
3. Default to the codergen handler
type Interviewer ¶
type Interviewer interface {
Ask(ctx context.Context, question string, options []string) (string, error)
}
Interviewer is the abstraction for human-in-the-loop interaction. Any frontend (CLI, web, Slack, programmatic) implements this interface.
type LintRule ¶
type LintRule interface {
Name() string
Apply(g *Graph) []Diagnostic
}
LintRule is the interface for validation rules.
type LiveState ¶
type LiveState struct {
Status string `json:"status"`
ActiveNode string `json:"active_node"`
Completed []string `json:"completed"`
Failed []string `json:"failed"`
StartedAt string `json:"started_at"`
UpdatedAt string `json:"updated_at"`
EventCount int `json:"event_count"`
}
LiveState represents the current pipeline execution snapshot, written to live.json after each event so external tools can poll for status.
type LogSink ¶
type LogSink interface {
// Append writes an event to the log for the given run.
Append(runID string, event EngineEvent) error
// Query returns events matching the filter, along with the total count of
// matching events (before pagination). This allows callers to paginate while
// knowing the full result set size.
Query(runID string, filter EventFilter) ([]EngineEvent, int, error)
// Tail returns the last n events from the run's event log.
Tail(runID string, n int) ([]EngineEvent, error)
// Summarize returns aggregate statistics for a run's event log.
Summarize(runID string) (*EventSummary, error)
// Prune deletes all runs that started more than olderThan ago.
// Returns the number of runs pruned.
Prune(olderThan time.Duration) (int, error)
// Close releases any resources held by the sink.
Close() error
}
LogSink defines the interface for structured event log storage with query, retention, and lifecycle management capabilities.
type ManagerBackend ¶
type ManagerBackend interface {
// Observe inspects the current state of the supervised pipeline and returns
// a textual observation summarizing progress.
Observe(ctx context.Context, nodeID string, iteration int, pctx *Context) (string, error)
// Guard evaluates whether the supervised agent is on track. Returns true if
// the guard condition is satisfied, false if steering is needed.
Guard(ctx context.Context, nodeID string, iteration int, observation string, guardCondition string, pctx *Context) (bool, error)
// Steer applies a correction to the supervised pipeline, returning a textual
// description of the steering action taken.
Steer(ctx context.Context, nodeID string, iteration int, steerPrompt string, pctx *Context) (string, error)
}
ManagerBackend defines the interface for LLM-powered supervision actions. Implementations handle observing agent progress, guarding against drift, and steering corrections when the agent goes off-track.
type ManagerLoopHandler ¶
type ManagerLoopHandler struct {
// Backend provides the LLM-powered observe/guard/steer operations.
// If nil, the handler uses stub behavior that logs supervision steps.
Backend ManagerBackend
}
ManagerLoopHandler handles stack manager loop nodes (shape=house). It runs a supervision loop that observes, guards, and steers a child pipeline or agent. When Backend is nil, the handler operates in stub mode, logging each step and returning success.
func (*ManagerLoopHandler) Execute ¶
func (h *ManagerLoopHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute runs the supervision loop for the given manager node. It reads configuration from node attributes, then iterates through observe/guard/steer cycles up to max_iterations.
func (*ManagerLoopHandler) Type ¶
func (h *ManagerLoopHandler) Type() string
Type returns the handler type string "stack.manager_loop".
type NodeHandler ¶
type NodeHandler interface {
// Type returns the handler type string (e.g., "start", "codergen", "wait.human").
Type() string
// Execute runs the handler logic for the given node.
// ctx is the Go context for cancellation/timeout.
// node is the parsed Node with all its attributes.
// pctx is the shared pipeline Context (thread-safe KV store).
// store is the artifact store for large outputs.
Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
}
NodeHandler is the interface that all node handlers implement. The execution engine dispatches to the appropriate handler based on node type or shape.
type NodeHandlerUnwrapper ¶
type NodeHandlerUnwrapper interface {
InnerHandler() NodeHandler
}
NodeHandlerUnwrapper allows handler wrappers to expose their inner handler. This enables backend wiring to reach through decorator layers (e.g. interviewerInjectingHandler) to the underlying CodergenHandler.
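A hypothetical decorator sketch showing both interfaces together; loggingHandler is not part of the package:

    // loggingHandler wraps another handler and records entry/exit in the
    // pipeline context log; InnerHandler exposes the wrapped handler so
    // backend wiring can still reach a CodergenHandler underneath.
    type loggingHandler struct {
        inner NodeHandler
    }

    func (h *loggingHandler) Type() string { return h.inner.Type() }

    func (h *loggingHandler) InnerHandler() NodeHandler { return h.inner }

    func (h *loggingHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error) {
        pctx.AppendLog("entering " + h.inner.Type())
        out, err := h.inner.Execute(ctx, node, pctx, store)
        pctx.AppendLog("leaving " + h.inner.Type())
        return out, err
    }

Registering the wrapper with Engine.SetHandler (or HandlerRegistry.Register) replaces the built-in handler of the same Type().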
type Outcome ¶
type Outcome struct {
Status StageStatus
PreferredLabel string
SuggestedNextIDs []string
ContextUpdates map[string]any
Notes string
FailureReason string
}
Outcome is the result of executing a node handler.
type ParallelConfig ¶
type ParallelConfig struct {
MaxParallel int
JoinPolicy string
ErrorPolicy string
KRequired int // For k_of_n policy: minimum number of branches that must succeed
}
ParallelConfig holds parsed configuration for parallel execution.
func ParallelConfigFromContext ¶
func ParallelConfigFromContext(pctx *Context) ParallelConfig
ParallelConfigFromContext reads parallel configuration values from the pipeline context and returns a ParallelConfig with defaults applied for missing values.
type ParallelHandler ¶
type ParallelHandler struct{}
ParallelHandler handles parallel fan-out nodes (shape=component). It identifies all outgoing edges as parallel branches and records them in the outcome. The actual concurrent execution is managed by the engine.
func (*ParallelHandler) Execute ¶
func (h *ParallelHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute identifies outgoing branches and returns an outcome listing them. If there are no outgoing edges, it returns a failure.
func (*ParallelHandler) Type ¶
func (h *ParallelHandler) Type() string
Type returns the handler type string "parallel".
type PendingQuestion ¶
type PendingQuestion struct {
ID string `json:"id"`
Question string `json:"question"`
Options []string `json:"options"`
Answered bool `json:"answered"`
Answer string `json:"answer,omitempty"`
}
PendingQuestion represents a question waiting for a human answer.
type PipelineRun ¶
type PipelineRun struct {
ID string
Status string // "running", "completed", "failed", "cancelled"
Source string // original DOT source
Result *RunResult
Error string
ArtifactDir string // path to the run's artifact directory
Events []EngineEvent // collected events
Cancel context.CancelFunc
Questions []PendingQuestion // for human-in-the-loop
CreatedAt time.Time
// contains filtered or unexported fields
}
PipelineRun tracks a running pipeline.
type PipelineServer ¶
type PipelineServer struct {
// ToDOT converts a Graph to DOT text. If nil, handleGetGraph uses a minimal fallback.
ToDOT GraphDOTFunc
// ToDOTWithStatus converts a Graph to DOT text with status color overlays.
// If nil, falls back to ToDOT.
ToDOTWithStatus GraphDOTWithStatusFunc
// RenderDOTSource renders raw DOT text to svg/png. If nil, only DOT format is available.
RenderDOTSource DOTRenderFunc
// contains filtered or unexported fields
}
PipelineServer manages HTTP endpoints for running pipelines.
func NewPipelineServer ¶
func NewPipelineServer(engine *Engine) *PipelineServer
NewPipelineServer creates a new PipelineServer with the given engine.
func (*PipelineServer) Handler ¶
func (s *PipelineServer) Handler() http.Handler
Handler returns the HTTP handler for this server, wrapped with request logging.
func (*PipelineServer) LoadPersistedRuns ¶
func (s *PipelineServer) LoadPersistedRuns() error
LoadPersistedRuns loads all previously persisted pipeline runs from the configured RunStateStore into the server's in-memory map. This allows the server to serve historical run data after a restart.
func (*PipelineServer) ServeHTTP ¶
func (s *PipelineServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP delegates to the internal mux.
func (*PipelineServer) SetEventQuery ¶
func (s *PipelineServer) SetEventQuery(eq EventQuery)
SetEventQuery configures the EventQuery backing store for the event query REST endpoints.
func (*PipelineServer) SetRunStateStore ¶
func (s *PipelineServer) SetRunStateStore(store RunStateStore)
SetRunStateStore configures the persistent RunStateStore for the server. When set, the server persists pipeline runs on creation and completion, and can load previously persisted runs on startup.
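Putting the server pieces together: a minimal sketch (hypothetical helper, package clause and the "fmt" and "net/http" imports omitted) that assumes the *Engine and RunStateStore are constructed elsewhere.

    // servePipelines exposes the pipeline HTTP API on addr.
    func servePipelines(addr string, engine *Engine, store RunStateStore) error {
        srv := NewPipelineServer(engine)
        srv.SetRunStateStore(store)
        if err := srv.LoadPersistedRuns(); err != nil {
            return fmt.Errorf("load persisted runs: %w", err)
        }
        // Handler wraps the internal mux with request logging.
        return http.ListenAndServe(addr, srv.Handler())
    }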
type PipelineStatus ¶
type PipelineStatus struct {
ID string `json:"id"`
Status string `json:"status"`
CompletedNodes []string `json:"completed_nodes,omitempty"`
ArtifactDir string `json:"artifact_dir,omitempty"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
PipelineStatus is the JSON response for pipeline status queries.
type PreflightCheck ¶
type PreflightCheck struct {
Name string // human-readable check name
Check func(ctx context.Context) error // the actual check; nil error means pass
}
PreflightCheck represents a single validation check to run before pipeline execution.
func BuildPreflightChecks ¶
func BuildPreflightChecks(graph *Graph, cfg EngineConfig) []PreflightCheck
BuildPreflightChecks analyzes the graph and engine configuration to produce the set of preflight checks appropriate for this pipeline. Checks include:
- Backend availability when codergen nodes are present
- Required environment variables declared via env_required node attributes
type PreflightFailure ¶
PreflightFailure records a single check failure with its name and reason.
type PreflightResult ¶
type PreflightResult struct {
Passed []string // names of checks that passed
Failed []PreflightFailure // checks that failed with reasons
}
PreflightResult holds the aggregated results of all preflight checks.
func RunPreflight ¶
func RunPreflight(ctx context.Context, checks []PreflightCheck) PreflightResult
RunPreflight executes all checks and collects results. Every check is run regardless of whether earlier checks fail, so the caller gets a complete picture of what needs to be fixed.
func (PreflightResult) Error ¶
func (r PreflightResult) Error() string
Error formats all failures as a multi-line string. Returns empty string if no failures.
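A typical call site, sketched below with a hypothetical helper ("context" and "errors" imports omitted), runs every check and refuses to start the pipeline when any of them failed:

    // preflightOrFail aborts before execution if any check fails.
    func preflightOrFail(ctx context.Context, graph *Graph, cfg EngineConfig) error {
        result := RunPreflight(ctx, BuildPreflightChecks(graph, cfg))
        if len(result.Failed) > 0 {
            // Error formats every failure, so the operator sees the full list at once.
            return errors.New(result.Error())
        }
        return nil
    }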
type ProgressEntry ¶
type ProgressEntry struct {
Timestamp string `json:"timestamp"`
Type string `json:"type"`
NodeID string `json:"node_id,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
ProgressEntry is a JSON-serializable record written as one line in the NDJSON log.
type ProgressLogger ¶
type ProgressLogger struct {
WriteErrors int // count of write errors encountered (for diagnostics)
// contains filtered or unexported fields
}
ProgressLogger writes engine events to an append-only NDJSON file and maintains a live.json snapshot reflecting current pipeline state.
func NewProgressLogger ¶
func NewProgressLogger(dir string) (*ProgressLogger, error)
NewProgressLogger creates a progress logger that writes to the given directory. It opens progress.ndjson for appending and writes an initial live.json with pending status.
func (*ProgressLogger) Close ¶
func (p *ProgressLogger) Close() error
Close closes the underlying NDJSON file. After Close, HandleEvent becomes a no-op.
func (*ProgressLogger) HandleEvent ¶
func (p *ProgressLogger) HandleEvent(evt EngineEvent)
HandleEvent converts an EngineEvent to a ProgressEntry, appends it to the NDJSON file, updates the live state, and atomically rewrites live.json. This method signature matches EngineConfig.EventHandler so it can be wired directly.
func (*ProgressLogger) State ¶
func (p *ProgressLogger) State() LiveState
State returns a copy of the current live state.
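Wiring the logger into an engine's event stream is a single assignment, since HandleEvent matches the EventHandler signature. A sketch with a hypothetical helper, assuming EngineConfig.EventHandler is a plain func(EngineEvent) field as described above:

    // withProgress attaches NDJSON progress logging to an engine config.
    // The returned close function should be called when the run finishes.
    func withProgress(dir string, cfg EngineConfig) (EngineConfig, func() error, error) {
        logger, err := NewProgressLogger(dir)
        if err != nil {
            return cfg, nil, err
        }
        cfg.EventHandler = logger.HandleEvent
        return cfg, logger.Close, nil
    }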
type Question ¶
type Question struct {
ID string
Text string
Options []string // empty means free-text
Default string // default answer if timeout
Metadata map[string]string // arbitrary key-value pairs
}
Question represents a structured question for human review.
type QueueInterviewer ¶
type QueueInterviewer struct {
// contains filtered or unexported fields
}
QueueInterviewer reads answers from a pre-filled queue (FIFO order). Intended for deterministic testing and replay scenarios.
func NewQueueInterviewer ¶
func NewQueueInterviewer(answers ...string) *QueueInterviewer
NewQueueInterviewer creates a QueueInterviewer pre-loaded with the given answers.
type RecordingInterviewer ¶
type RecordingInterviewer struct {
// contains filtered or unexported fields
}
RecordingInterviewer wraps another Interviewer and records all Q&A pairs. Useful for replay, debugging, and audit trails.
func NewRecordingInterviewer ¶
func NewRecordingInterviewer(inner Interviewer) *RecordingInterviewer
NewRecordingInterviewer wraps the given inner Interviewer with recording capability.
func (*RecordingInterviewer) Ask ¶
func (r *RecordingInterviewer) Ask(ctx context.Context, question string, options []string) (string, error)
Ask delegates to the inner Interviewer and records the Q&A pair.
func (*RecordingInterviewer) Recordings ¶
func (r *RecordingInterviewer) Recordings() []QAPair
Recordings returns a copy of all recorded Q&A pairs.
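For deterministic tests the two interviewers compose naturally: scripted answers come from the queue while every exchange is captured for later inspection. A sketch with a hypothetical helper ("context" and "fmt" imports omitted); QAPair's fields are not shown here, so the recordings are printed generically:

    func scriptedReview() error {
        inner := NewQueueInterviewer("approve", "retry")
        rec := NewRecordingInterviewer(inner)

        answer, err := rec.Ask(context.Background(), "Proceed with deployment?", []string{"approve", "reject"})
        if err != nil {
            return err
        }
        fmt.Println("selected:", answer) // "approve", the first queued answer

        // Recordings returns a copy of every Q&A pair seen so far.
        for _, qa := range rec.Recordings() {
            fmt.Printf("%+v\n", qa)
        }
        return nil
    }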
type RestartConfig ¶
type RestartConfig struct {
MaxRestarts int // maximum number of restarts before giving up (default 5)
}
RestartConfig controls loop restart behavior.
func DefaultRestartConfig ¶
func DefaultRestartConfig() *RestartConfig
DefaultRestartConfig returns a RestartConfig with sensible defaults.
type RetentionConfig ¶
type RetentionConfig struct {
MaxAge time.Duration // prune runs older than this; 0 means no age limit
MaxRuns int // keep at most this many runs; 0 means unlimited
}
RetentionConfig specifies how long and how many runs to retain.
func (RetentionConfig) PruneByMaxRuns ¶
func (rc RetentionConfig) PruneByMaxRuns(sink LogSink) (int, error)
PruneByMaxRuns removes the oldest runs that exceed the MaxRuns limit. Runs are sorted by start time; the oldest beyond the limit are deleted. Returns the number of runs pruned.
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int // minimum 1 (1 = no retries)
Backoff BackoffConfig
ShouldRetry func(error) bool
}
RetryPolicy controls how many times a node execution is retried on failure.
func RetryPolicyAggressive ¶
func RetryPolicyAggressive() RetryPolicy
RetryPolicyAggressive returns a policy with 5 attempts and a higher initial delay.
func RetryPolicyLinear ¶
func RetryPolicyLinear() RetryPolicy
RetryPolicyLinear returns a policy with 3 attempts and constant delay (factor=1.0).
func RetryPolicyNone ¶
func RetryPolicyNone() RetryPolicy
RetryPolicyNone returns a policy with no retries (single attempt).
func RetryPolicyPatient ¶
func RetryPolicyPatient() RetryPolicy
RetryPolicyPatient returns a policy with 3 attempts, high initial delay, and steep backoff.
func RetryPolicyStandard ¶
func RetryPolicyStandard() RetryPolicy
RetryPolicyStandard returns a standard retry policy with 5 attempts and exponential backoff.
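A policy can also start from a preset and override individual fields; a short sketch in which the error classifier is a hypothetical function supplied by the caller:

    // transientOnly keeps the standard attempt count and backoff, but only
    // retries errors the caller classifies as transient.
    func transientOnly(isTransient func(error) bool) RetryPolicy {
        p := RetryPolicyStandard() // 5 attempts, exponential backoff
        p.ShouldRetry = isTransient
        return p
    }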
type RunDirectory ¶
RunDirectory represents the directory structure for a single pipeline run.
func NewRunDirectory ¶
func NewRunDirectory(baseDir, runID string) (*RunDirectory, error)
NewRunDirectory creates a new run directory structure at baseDir/runID.
func (*RunDirectory) EnsureNodeDir ¶
func (rd *RunDirectory) EnsureNodeDir(nodeID string) error
EnsureNodeDir creates the directory for a node if it doesn't exist.
func (*RunDirectory) ListNodeArtifacts ¶
func (rd *RunDirectory) ListNodeArtifacts(nodeID string) ([]string, error)
ListNodeArtifacts returns the filenames of all artifacts for a node.
func (*RunDirectory) LoadCheckpoint ¶
func (rd *RunDirectory) LoadCheckpoint() (*Checkpoint, error)
LoadCheckpoint loads a checkpoint from checkpoint.json in the run directory.
func (*RunDirectory) NodeDir ¶
func (rd *RunDirectory) NodeDir(nodeID string) string
NodeDir returns the path for a node's artifact directory. The nodeID is sanitized to prevent path traversal attacks.
func (*RunDirectory) ReadNodeArtifact ¶
func (rd *RunDirectory) ReadNodeArtifact(nodeID, filename string) ([]byte, error)
ReadNodeArtifact reads data from a file within a node's directory.
func (*RunDirectory) SaveCheckpoint ¶
func (rd *RunDirectory) SaveCheckpoint(cp *Checkpoint) error
SaveCheckpoint saves a checkpoint to checkpoint.json in the run directory.
func (*RunDirectory) WriteNodeArtifact ¶
func (rd *RunDirectory) WriteNodeArtifact(nodeID, filename string, data []byte) error
WriteNodeArtifact writes data to a file within a node's directory.
func (*RunDirectory) WritePrompt ¶
func (rd *RunDirectory) WritePrompt(nodeID, prompt string) error
WritePrompt writes a prompt to prompt.md in a node's directory.
func (*RunDirectory) WriteResponse ¶
func (rd *RunDirectory) WriteResponse(nodeID, response string) error
WriteResponse writes a response to response.md in a node's directory.
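A sketch of per-node artifact handling during a run, using a hypothetical helper ("fmt" import omitted); baseDir and runID are assumed to come from the caller:

    // writeNodeOutputs persists a node's prompt and response and lists the result.
    func writeNodeOutputs(baseDir, runID, nodeID, prompt, response string) error {
        rd, err := NewRunDirectory(baseDir, runID)
        if err != nil {
            return err
        }
        if err := rd.EnsureNodeDir(nodeID); err != nil {
            return err
        }
        if err := rd.WritePrompt(nodeID, prompt); err != nil {
            return err
        }
        if err := rd.WriteResponse(nodeID, response); err != nil {
            return err
        }
        names, err := rd.ListNodeArtifacts(nodeID)
        if err != nil {
            return err
        }
        fmt.Println("node artifacts:", names)
        return nil
    }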
type RunIndex ¶
type RunIndex struct {
Runs map[string]RunIndexEntry `json:"runs"`
Updated time.Time `json:"updated"`
}
RunIndex is the top-level structure persisted as index.json in the store root. It maps run IDs to their metadata for fast enumeration and filtering.
type RunIndexEntry ¶
type RunIndexEntry struct {
ID string `json:"id"`
Status string `json:"status"`
StartTime time.Time `json:"start_time"`
EventCount int `json:"event_count"`
}
RunIndexEntry holds metadata about a single run for fast lookup without scanning the full directory tree.
type RunResult ¶
type RunResult struct {
FinalOutcome *Outcome
CompletedNodes []string
NodeOutcomes map[string]*Outcome
Context *Context
}
RunResult holds the final state of a completed pipeline execution.
type RunState ¶
type RunState struct {
ID string `json:"id"`
PipelineFile string `json:"pipeline_file"`
Status string `json:"status"` // "running", "completed", "failed", "cancelled"
Source string `json:"source,omitempty"`
SourceHash string `json:"source_hash,omitempty"`
StartedAt time.Time `json:"started_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
CurrentNode string `json:"current_node"`
CompletedNodes []string `json:"completed_nodes"`
Context map[string]any `json:"context"`
Events []EngineEvent `json:"events"`
Error string `json:"error,omitempty"`
}
RunState represents the full state of a single pipeline run.
type RunStateStore ¶
type RunStateStore interface {
Create(state *RunState) error
Get(id string) (*RunState, error)
Update(state *RunState) error
List() ([]*RunState, error)
AddEvent(id string, event EngineEvent) error
}
RunStateStore is the interface for persisting and retrieving pipeline run state.
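Any storage layer that satisfies these five methods can back the server. As an illustration only (this is not the package's own implementation), a minimal in-memory sketch ("sync" and "fmt" imports omitted) could look like:

    // memStore is a toy in-memory RunStateStore for tests and illustration.
    type memStore struct {
        mu   sync.Mutex
        runs map[string]*RunState
    }

    func newMemStore() *memStore {
        return &memStore{runs: make(map[string]*RunState)}
    }

    func (m *memStore) Create(state *RunState) error {
        m.mu.Lock()
        defer m.mu.Unlock()
        m.runs[state.ID] = state
        return nil
    }

    func (m *memStore) Get(id string) (*RunState, error) {
        m.mu.Lock()
        defer m.mu.Unlock()
        s, ok := m.runs[id]
        if !ok {
            return nil, fmt.Errorf("run %q not found", id)
        }
        return s, nil
    }

    func (m *memStore) Update(state *RunState) error { return m.Create(state) }

    func (m *memStore) List() ([]*RunState, error) {
        m.mu.Lock()
        defer m.mu.Unlock()
        out := make([]*RunState, 0, len(m.runs))
        for _, s := range m.runs {
            out = append(out, s)
        }
        return out, nil
    }

    func (m *memStore) AddEvent(id string, event EngineEvent) error {
        m.mu.Lock()
        defer m.mu.Unlock()
        s, ok := m.runs[id]
        if !ok {
            return fmt.Errorf("run %q not found", id)
        }
        s.Events = append(s.Events, event)
        return nil
    }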
type StageStatus ¶
type StageStatus string
StageStatus represents the outcome of executing a node.
const (
	StatusSuccess StageStatus = "success"
	StatusFail StageStatus = "fail"
	StatusPartialSuccess StageStatus = "partial_success"
	StatusRetry StageStatus = "retry"
	StatusSkipped StageStatus = "skipped"
)
type StartHandler ¶
type StartHandler struct{}
StartHandler handles the pipeline entry point node (shape=Mdiamond). It performs no work beyond recording the start time and returning success.
func (*StartHandler) Execute ¶
func (h *StartHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute initializes context with a start timestamp and returns success.
func (*StartHandler) Type ¶
func (h *StartHandler) Type() string
Type returns the handler type string "start".
type StyleRule ¶
StyleRule represents a single CSS-like rule with a selector, properties, and specificity.
type Stylesheet ¶
type Stylesheet struct {
Rules []StyleRule
}
Stylesheet is a collection of style rules parsed from a CSS-like DSL.
func ParseStylesheet ¶
func ParseStylesheet(input string) (*Stylesheet, error)
ParseStylesheet parses a CSS-like stylesheet string into a Stylesheet. Supported selectors: * (universal, specificity=0), .class (specificity=1), #id (specificity=2).
func (*Stylesheet) Apply ¶
func (ss *Stylesheet) Apply(g *Graph)
Apply applies the stylesheet rules to all nodes in the graph. Higher specificity rules override lower ones. Explicit node attributes override all.
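A sketch of styling an already-parsed *Graph with a hypothetical helper; the property names are illustrative, and the rule syntax is assumed to follow the CSS-like selector { key: value } form described above:

    // styleGraph applies an inline stylesheet to g. Universal rules lose to
    // class rules, which lose to id rules; explicit node attributes always win.
    func styleGraph(g *Graph) error {
        css := "* { model: default-model }\n" +
            ".codergen { model: big-model }\n" +
            "#final_review { model: review-model }\n"
        ss, err := ParseStylesheet(css)
        if err != nil {
            return err
        }
        ss.Apply(g)
        return nil
    }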
type StylesheetApplicationTransform ¶
type StylesheetApplicationTransform struct{}
StylesheetApplicationTransform applies the model_stylesheet graph attribute to all nodes using CSS-like specificity rules.
func (*StylesheetApplicationTransform) Apply ¶
func (t *StylesheetApplicationTransform) Apply(g *Graph) *Graph
Apply parses and applies the model_stylesheet from graph attributes.
type SubPipelineTransform ¶
type SubPipelineTransform struct{}
SubPipelineTransform is a Transform that scans for nodes with a "sub_pipeline" attribute, loads the referenced DOT file, and composes it into the graph. The insert node's ID is used as the namespace to avoid conflicts.
func (*SubPipelineTransform) Apply ¶
func (t *SubPipelineTransform) Apply(g *Graph) *Graph
Apply scans the graph for nodes with a sub_pipeline attribute and inlines the referenced child graphs. If a sub-pipeline file cannot be loaded or composed, the node is left intact and the error is silently ignored.
type TokenUsage ¶
type TokenUsage struct {
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
TotalTokens int `json:"total_tokens"`
ReasoningTokens int `json:"reasoning_tokens"`
CacheReadTokens int `json:"cache_read_tokens"`
CacheWriteTokens int `json:"cache_write_tokens"`
}
TokenUsage tracks granular token consumption across an agent run, broken down by input, output, reasoning, and cache categories.
func (TokenUsage) Add ¶
func (u TokenUsage) Add(other TokenUsage) TokenUsage
Add combines two TokenUsage values by summing all fields.
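Since Add returns a new value rather than mutating the receiver, per-node usage can be folded into a run total; a short sketch with a hypothetical helper:

    // totalUsage sums the usage reported by each agent run.
    func totalUsage(perNode []TokenUsage) TokenUsage {
        var total TokenUsage
        for _, u := range perNode {
            total = total.Add(u)
        }
        return total
    }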
type ToolCallEntry ¶
type ToolCallEntry struct {
ToolName string `json:"tool_name"`
CallID string `json:"call_id"`
Duration time.Duration `json:"duration"`
Output string `json:"output"` // truncated to 500 chars
}
ToolCallEntry records details about a single tool invocation during an agent run.
type ToolHandler ¶
type ToolHandler struct{}
ToolHandler handles external tool execution nodes (shape=parallelogram). It reads the command from node attributes, executes it via the system shell, and captures stdout, stderr, and exit code.
func (*ToolHandler) Execute ¶
func (h *ToolHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute runs the shell command specified in the node's "command" (or "prompt") attribute. It configures timeout, working directory, and environment variables from node attributes, then returns an Outcome with stdout, stderr, and exit code.
func (*ToolHandler) Type ¶
func (h *ToolHandler) Type() string
Type returns the handler type string "tool".
type Transform ¶
Transform is the interface for AST transformations applied to a parsed graph.
func DefaultTransforms ¶
func DefaultTransforms() []Transform
DefaultTransforms returns the standard ordered transform chain.
type VariableExpansionTransform ¶
type VariableExpansionTransform struct{}
VariableExpansionTransform expands $variable references in node attributes using graph-level attribute values.
func (*VariableExpansionTransform) Apply ¶
func (t *VariableExpansionTransform) Apply(g *Graph) *Graph
Apply expands $variable references in node prompt and label attributes.
type WaitForHumanHandler ¶
type WaitForHumanHandler struct {
// Interviewer is the human interaction frontend. If nil, the handler
// returns a failure indicating no interviewer is available.
Interviewer Interviewer
}
WaitForHumanHandler handles human gate nodes (shape=hexagon). It presents choices derived from outgoing edges to a human via the Interviewer interface and returns their selection.
func (*WaitForHumanHandler) Execute ¶
func (h *WaitForHumanHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
Execute presents choices to a human and returns their selection. Choices are derived from outgoing edges of the node.
Supports optional node attributes:
- timeout: Duration string (e.g. "5m", "1h") limiting how long to wait for human input.
- default_choice: Edge label to select if the timeout expires.
- reminder_interval: Duration string for periodic re-prompting (parsed and validated, but only effective if the Interviewer implementation supports it).
Context updates always include human.timed_out (bool) and human.response_time_ms (int64).
func (*WaitForHumanHandler) Type ¶
func (h *WaitForHumanHandler) Type() string
Type returns the handler type string "wait.human".
type Watchdog ¶
type Watchdog struct {
// contains filtered or unexported fields
}
Watchdog monitors active pipeline nodes and emits EventStageStalled warnings when a node has not made progress within the configured StallTimeout. It never cancels execution -- it is purely an observability tool.
func NewWatchdog ¶
func NewWatchdog(cfg WatchdogConfig, eventHandler func(EngineEvent)) *Watchdog
NewWatchdog creates a Watchdog with the given config and event handler. The event handler is called (from the watchdog goroutine) whenever a stall warning is emitted.
func (*Watchdog) ActiveNodes ¶
ActiveNodes returns a slice of node IDs currently being tracked. The order is non-deterministic.
func (*Watchdog) HandleEvent ¶
func (w *Watchdog) HandleEvent(evt EngineEvent)
HandleEvent is a convenience method that routes engine events to NodeStarted or NodeFinished based on event type. This lets the watchdog be composed with another event handler in a pipeline's EventHandler chain.
func (*Watchdog) NodeFinished ¶
NodeFinished removes a node from active tracking. After this call the watchdog will no longer consider the node for stall detection.
func (*Watchdog) NodeStarted ¶
NodeStarted records that a node has become active. If the node was previously tracked and warned, the warning state is reset so a new stall can be detected if the node stalls again.
type WatchdogConfig ¶
type WatchdogConfig struct {
StallTimeout time.Duration // how long before a node is considered stalled
CheckInterval time.Duration // how often to check for stalls
}
WatchdogConfig holds configuration for the stall-detection watchdog.
func DefaultWatchdogConfig ¶
func DefaultWatchdogConfig() WatchdogConfig
DefaultWatchdogConfig returns a WatchdogConfig with sensible defaults: 5 minute stall timeout and 10 second check interval.
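A sketch of fanning engine events out to both a progress logger and a stall watchdog, using a hypothetical helper; it assumes EngineConfig.EventHandler is a single func(EngineEvent) field, and any start/stop lifecycle the watchdog's internal goroutine may need is omitted:

    // withWatchdog composes stall detection with progress logging.
    func withWatchdog(cfg EngineConfig, logger *ProgressLogger) EngineConfig {
        wd := NewWatchdog(DefaultWatchdogConfig(), func(evt EngineEvent) {
            // Stall warnings arrive as ordinary engine events; log them too.
            logger.HandleEvent(evt)
        })
        cfg.EventHandler = func(evt EngineEvent) {
            logger.HandleEvent(evt)
            wd.HandleEvent(evt)
        }
        return cfg
    }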
Source Files
¶
- artifact.go
- backend.go
- backend_agent.go
- backend_claude_code.go
- checkpoint.go
- compat.go
- conditions.go
- context.go
- edge_selection.go
- engine.go
- eventlog.go
- failure_signature.go
- fidelity.go
- fidelity_transform.go
- handlers.go
- handlers_codergen.go
- handlers_conditional.go
- handlers_exit.go
- handlers_fanin.go
- handlers_human.go
- handlers_manager.go
- handlers_parallel.go
- handlers_start.go
- handlers_tool.go
- interviewer.go
- logsink.go
- parallel.go
- preflight.go
- progress.go
- restart.go
- retry.go
- rundir.go
- runstate.go
- runstate_fs.go
- server.go
- server_ui.go
- sourcehash.go
- stylesheet.go
- subpipeline.go
- transforms.go
- validate.go
- watchdog.go