attractor

package
v0.4.0
Published: Feb 17, 2026 License: MIT Imports: 34 Imported by: 0

Documentation

Overview

ABOUTME: Named, typed artifact storage for large pipeline outputs. ABOUTME: Transparently file-backs artifacts exceeding a size threshold (default 100KB).

ABOUTME: Defines the CodergenBackend interface that decouples CodergenHandler from the agent loop. ABOUTME: Provides AgentRunConfig and AgentRunResult types for configuring and receiving agent outcomes.

ABOUTME: AgentBackend wires CodergenBackend to the real agent loop and LLM SDK. ABOUTME: Creates agent sessions, configures provider profiles, and runs ProcessInput for each codergen node.

ABOUTME: ClaudeCodeBackend shells out to the `claude` CLI for codergen pipeline nodes. ABOUTME: Parses streaming JSONL output to extract results, token usage, and OUTCOME markers.

ABOUTME: Checkpoint serialization for persisting execution state to disk. ABOUTME: Supports JSON save/load for resuming pipeline runs from a known point.

ABOUTME: Type aliases bridging attractor/ to the consolidated dot/ package. ABOUTME: Provides backward-compatible Graph, Node, Edge, Subgraph types and Parse wrapper.

ABOUTME: Condition expression language for edge guards in the pipeline graph. ABOUTME: Evaluates clauses like "outcome = success && context.mode = prod" against Outcome and Context.

ABOUTME: Thread-safe key-value context store shared across pipeline stages. ABOUTME: Also defines StageStatus and Outcome types for node execution results.

ABOUTME: Edge selection algorithm for choosing the next edge during pipeline graph traversal. ABOUTME: Implements five-step priority: condition match > preferred label > suggested IDs > weight > lexical.

ABOUTME: Pipeline execution engine implementing the 5-phase lifecycle: PARSE, VALIDATE, INITIALIZE, EXECUTE, FINALIZE. ABOUTME: Orchestrates graph traversal, handler dispatch, retry logic, checkpointing, and edge selection.

ABOUTME: Query API for the append-only event log stored in RunStateStore. ABOUTME: Provides filtering, pagination, counting, tailing, and summarization of EngineEvents.

ABOUTME: Normalizes error messages into stable signatures for detecting repeat deterministic failures. ABOUTME: Replaces hex strings, UUIDs, numbers, timestamps, and file paths with placeholders.

ABOUTME: Context fidelity modes controlling how much context is carried between pipeline nodes. ABOUTME: Implements precedence resolution: edge > node > graph default > "compact".

ABOUTME: Applies fidelity-based context compaction and generates preamble strings. ABOUTME: Transforms pipeline context based on the fidelity mode before passing to the next node.

ABOUTME: Common handler interface, registry, and shape-to-type mapping for the attractor pipeline runner. ABOUTME: All 9 built-in node handlers implement NodeHandler and are registered via DefaultHandlerRegistry.

ABOUTME: Codergen (LLM coding agent) handler for the attractor pipeline runner. ABOUTME: Delegates to a CodergenBackend for actual LLM execution; returns error when backend is nil.

ABOUTME: Conditional branching handler for the attractor pipeline runner. ABOUTME: Returns success for diamond-shaped routing nodes; actual routing is handled by the engine's edge selection.

ABOUTME: Exit node handler for the attractor pipeline runner. ABOUTME: Captures the final pipeline state and returns success at the terminal node.

ABOUTME: Parallel fan-in handler for the attractor pipeline runner. ABOUTME: Waits for all incoming parallel branches to complete and merges their results.

ABOUTME: Wait for human handler for the attractor pipeline runner. ABOUTME: Presents choices derived from outgoing edges to a human via the Interviewer interface.

ABOUTME: Stack manager loop handler implementing observe/guard/steer supervision for child pipelines. ABOUTME: Uses the ManagerBackend interface for LLM-powered supervision; nil backend falls back to stub logging.

ABOUTME: Parallel fan-out handler for the attractor pipeline runner. ABOUTME: Records outgoing branches for concurrent execution by the engine.

ABOUTME: Start node handler for the attractor pipeline runner. ABOUTME: Initializes pipeline execution by recording a start timestamp and returning success.

ABOUTME: Tool handler that executes shell commands via os/exec for the attractor pipeline. ABOUTME: Supports timeout, working directory, env vars, and stores large output as artifacts.

ABOUTME: Interviewer interface and built-in implementations for human-in-the-loop interaction. ABOUTME: Provides AutoApprove, Callback, Queue, Recording, and Console interviewers.

ABOUTME: Defines the LogSink interface for structured event storage with query, retention, and indexing. ABOUTME: Provides FSLogSink, a filesystem-backed implementation wrapping FSRunStateStore and FSEventQuery.

ABOUTME: Parallel branch execution and context merging for concurrent pipeline fan-out/fan-in. ABOUTME: Provides ExecuteParallelBranches, MergeContexts, and supporting types for the engine.

ABOUTME: Pre-execution validation that checks provider accessibility and pipeline requirements. ABOUTME: Runs before the engine starts to provide fast, clear failure messages.

ABOUTME: Append-only NDJSON event logger for pipeline execution observability. ABOUTME: Writes engine events to a .ndjson file and maintains a live.json status snapshot.

ABOUTME: Loop restart handling for edges with loop_restart=true attribute. ABOUTME: Provides ErrLoopRestart sentinel, EdgeHasLoopRestart check, RestartConfig, and restart wrapper logic.

ABOUTME: Retry policy configuration and exponential backoff delay calculation for pipeline node execution. ABOUTME: Provides preset policies (none, standard, aggressive, linear, patient) and helper functions.

ABOUTME: RunDirectory manages the per-run directory layout for pipeline executions. ABOUTME: Provides structured storage for node artifacts, prompts, responses, and checkpoints.

ABOUTME: Defines RunState types and the RunStateStore interface for tracking pipeline run lifecycle. ABOUTME: Provides run ID generation using crypto/rand and the core data model for persistent run tracking.

ABOUTME: Filesystem-backed implementation of RunStateStore for persisting pipeline run state. ABOUTME: Stores each run in a directory with manifest.json, context.json, and an append-only events.jsonl log.

ABOUTME: HTTP server for managing pipeline execution via REST API with SSE streaming. ABOUTME: Provides endpoints for submitting, querying, cancelling pipelines, and human-in-the-loop Q&A.

ABOUTME: HTMX web frontend for the PipelineServer dashboard and pipeline detail views. ABOUTME: Uses go:embed for HTML templates and serves a browser-friendly UI alongside the JSON API.

ABOUTME: Computes a content-addressable hash of pipeline DOT source for matching runs. ABOUTME: Uses SHA-256 with no normalization — any byte change produces a different hash.

ABOUTME: CSS-like model stylesheet parser and applicator for assigning LLM properties to graph nodes. ABOUTME: Supports universal (*), class (.name), and ID (#name) selectors with specificity-based resolution.

ABOUTME: Sub-pipeline composition for inlining child DOT graphs into a parent pipeline. ABOUTME: Provides LoadSubPipeline, ComposeGraphs, and SubPipelineTransform for graph merging with namespace isolation.

ABOUTME: AST transforms applied between parsing and validation for the pipeline graph. ABOUTME: Implements variable expansion ($goal) and stylesheet application as a transform chain.

ABOUTME: Pipeline validation rules that check graph structure and node/edge attributes for correctness. ABOUTME: Provides a pluggable LintRule interface, built-in rules, Validate, and ValidateOrError functions.

ABOUTME: Background watchdog that detects stalled pipeline stages via progress timestamps. ABOUTME: Emits warning events when a node exceeds its configured stall timeout without progress.

Functions

func DefaultShouldRetry

func DefaultShouldRetry(err error) bool

DefaultShouldRetry returns true for most errors as a simple default retry predicate. Returns false for nil errors.

func EdgeHasLoopRestart

func EdgeHasLoopRestart(edge *Edge) bool

EdgeHasLoopRestart returns true if the edge has the loop_restart attribute set to "true".

func EvaluateCondition

func EvaluateCondition(condition string, outcome *Outcome, ctx *Context) bool

EvaluateCondition evaluates a condition expression against an outcome and context.

Condition grammar:

  Clause ('&&' Clause)*
  Clause:   Key Operator Literal
  Key:      'outcome' | 'preferred_label' | 'context.' Path | bare identifier
  Operator: '=' | '!='

An empty or whitespace-only condition evaluates to true (unconditional edge).
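To make the grammar concrete, here is a minimal sketch of an evaluator for it. It is not the package's implementation: evalCondition and the flat vals map (standing in for Outcome and Context) are hypothetical, and treating a clause without an operator as false is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// evalCondition is a hypothetical stand-in for EvaluateCondition.
// vals maps keys such as "outcome" or "context.mode" to string values.
func evalCondition(cond string, vals map[string]string) bool {
	if strings.TrimSpace(cond) == "" {
		return true // empty condition: unconditional edge
	}
	for _, clause := range strings.Split(cond, "&&") {
		clause = strings.TrimSpace(clause)
		// Check "!=" first so it is not mistaken for "=".
		if key, lit, ok := strings.Cut(clause, "!="); ok {
			if vals[strings.TrimSpace(key)] == strings.TrimSpace(lit) {
				return false
			}
			continue
		}
		key, lit, ok := strings.Cut(clause, "=")
		// Assumption: a clause with no operator counts as not matching.
		if !ok || vals[strings.TrimSpace(key)] != strings.TrimSpace(lit) {
			return false
		}
	}
	return true
}

func main() {
	vals := map[string]string{"outcome": "success", "context.mode": "prod"}
	fmt.Println(evalCondition("outcome = success && context.mode = prod", vals)) // true
	fmt.Println(evalCondition("outcome != success", vals))                       // false
	fmt.Println(evalCondition("   ", vals))                                      // true
}
```

How the real function treats missing keys or malformed clauses is not stated here; ValidateConditionSyntax is the documented way to check syntax up front.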

func FailureSignature

func FailureSignature(msg string) string

FailureSignature returns a deterministic signature for the given error message. Messages that differ only in variable content (UUIDs, timestamps, line numbers, etc.) will produce identical signatures.

func GeneratePreamble

func GeneratePreamble(prevNode string, mode FidelityMode, removedKeys int) string

GeneratePreamble produces a human-readable string describing what fidelity transformation was applied when transitioning from a previous node.

func GenerateRunID

func GenerateRunID() (string, error)

GenerateRunID produces a random 16-character hex string (8 bytes of entropy).
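A 16-character hex ID from 8 random bytes can be produced as in the sketch below. newRunID is a hypothetical name and this is only one way to do it; the overview states that the package uses crypto/rand, which is what the sketch assumes.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newRunID is a hypothetical equivalent of GenerateRunID: it reads 8 bytes
// from the system CSPRNG and hex-encodes them into 16 characters.
func newRunID() (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	id, err := newRunID()
	if err != nil {
		fmt.Println("generating run ID:", err)
		return
	}
	fmt.Println(id, len(id))
}
```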

func HasCodergenNodes

func HasCodergenNodes(graph *Graph) bool

HasCodergenNodes returns true if the graph contains any nodes that would be resolved to the codergen handler. This mirrors the resolution logic in HandlerRegistry.Resolve: explicit type attribute first, then shape-based mapping, then default to codergen.

func IsValidFidelity

func IsValidFidelity(mode string) bool

IsValidFidelity checks if a string is a valid fidelity mode.

func MergeContexts

func MergeContexts(parent *Context, branches []BranchResult, policy string) error

MergeContexts merges branch results back into the parent context according to the specified join policy. All merge operations are logged to the parent context's log for visibility, including which branch wrote which values and conflict resolution via last-write-wins. Artifact references from branch contexts are consolidated into a parallel.artifacts manifest.

Policies:

  • "wait_all": All branches must succeed. Returns error if any branch failed. Merges all branch contexts into parent (last-write-wins for conflicts).
  • "wait_any": At least one branch must succeed. Only successful branches are merged. Returns error if all branches failed.
  • "k_of_n": At least K branches must succeed (K read from parent context key "parallel.k_required", defaults to N). Only successful branches merged.
  • "quorum": A strict majority (>50%) of branches must succeed. Only successful branches are merged.
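The success thresholds of the four policies above can be summarized in a small sketch. policySatisfied is a hypothetical helper, not part of the package. It covers only the pass/fail decision, not the context merging, and the error for an unknown policy is an assumption.

```go
package main

import "fmt"

// policySatisfied reports whether `succeeded` out of `total` branches meets
// the join policy. k is only used by "k_of_n"; k <= 0 means "not set".
func policySatisfied(policy string, succeeded, total, k int) (bool, error) {
	switch policy {
	case "wait_all":
		return succeeded == total, nil
	case "wait_any":
		return succeeded >= 1, nil
	case "k_of_n":
		if k <= 0 {
			k = total // K defaults to N when not provided
		}
		return succeeded >= k, nil
	case "quorum":
		return 2*succeeded > total, nil // strict majority (>50%)
	default:
		// Assumption: unknown policies are rejected.
		return false, fmt.Errorf("unknown join policy %q", policy)
	}
}

func main() {
	for _, p := range []string{"wait_all", "wait_any", "k_of_n", "quorum"} {
		ok, _ := policySatisfied(p, 2, 4, 2)
		fmt.Printf("%-8s 2/4 succeeded -> %v\n", p, ok)
	}
}
```

Note that with 2 of 4 branches succeeding, "quorum" is not met because exactly 50% is not a strict majority.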

func NodeIDFromContext

func NodeIDFromContext(ctx context.Context) string

NodeIDFromContext extracts the pipeline node ID from the context. Returns an empty string when no node ID is present.

func NormalizeFailure

func NormalizeFailure(msg string) string

NormalizeFailure takes an error message and replaces variable content (UUIDs, hex strings, timestamps, file paths, numbers) with stable placeholders. This allows structural comparison of errors that differ only in runtime-specific values.
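The kind of normalization described can be sketched with ordered regular-expression rewrites. Everything below is illustrative: the patterns, the placeholder names (<UUID>, <TS>, <PATH>, <HEX>, <N>), and the rule order are assumptions, not the package's actual rules.

```go
package main

import (
	"fmt"
	"regexp"
)

// rules run in order: more specific patterns first, so a UUID or timestamp
// is not broken up by the generic number rule.
var rules = []struct {
	re   *regexp.Regexp
	repl string
}{
	{regexp.MustCompile(`[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}`), "<UUID>"},
	{regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?`), "<TS>"},
	{regexp.MustCompile(`(/[\w.\-]+)+`), "<PATH>"},
	{regexp.MustCompile(`\b0x[0-9a-fA-F]+\b`), "<HEX>"},
	{regexp.MustCompile(`\d+`), "<N>"},
}

// normalize is a hypothetical stand-in for NormalizeFailure.
func normalize(msg string) string {
	for _, r := range rules {
		msg = r.re.ReplaceAllString(msg, r.repl)
	}
	return msg
}

func main() {
	a := normalize("open /tmp/run-12/out.txt: line 42")
	b := normalize("open /var/x/y.log: line 7")
	fmt.Println(a)
	fmt.Println(a == b)
}
```

Two messages that differ only in path and line number collapse to the same string, which is the property a failure signature relies on.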

func NormalizeLabel

func NormalizeLabel(label string) string

NormalizeLabel lowercases a label, trims whitespace, and strips accelerator prefixes like "[Y] ", "Y) ", "Y - " that are used for keyboard shortcuts in human interaction nodes.
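A sketch of this normalization for the three prefix styles mentioned follows. normalizeLabel and its regular expression are hypothetical; the real function may recognize more (or fewer) accelerator forms.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// accel matches a leading "[y] ", "y) ", or "y - " accelerator
// (applied after lowercasing). The exact pattern is an assumption.
var accel = regexp.MustCompile(`^(\[\w\]\s*|\w\)\s*|\w\s+-\s+)`)

// normalizeLabel is a hypothetical stand-in for NormalizeLabel.
func normalizeLabel(label string) string {
	s := strings.ToLower(strings.TrimSpace(label))
	s = accel.ReplaceAllString(s, "")
	return strings.TrimSpace(s)
}

func main() {
	for _, l := range []string{"[Y] Yes", "Y) Yes", "Y - Yes", "  Retry "} {
		fmt.Printf("%q -> %q\n", l, normalizeLabel(l))
	}
}
```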

func ShapeToHandlerType

func ShapeToHandlerType(shape string) string

ShapeToHandlerType returns the handler type string for a given Graphviz shape. Unknown shapes default to "codergen" (the LLM handler).

func SourceHash

func SourceHash(source string) string

SourceHash returns the lowercase hex-encoded SHA-256 hash of the raw source bytes. No normalization is applied: if the file changed at all, the hash changes.
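The described behavior corresponds to a plain SHA-256 over the source bytes. The sketch below uses a hypothetical sourceHash helper to show that even a whitespace-only change yields a different hash.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sourceHash is a hypothetical equivalent of SourceHash: lowercase hex
// SHA-256 of the raw bytes, with no normalization.
func sourceHash(source string) string {
	sum := sha256.Sum256([]byte(source))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := sourceHash("digraph { a -> b }")
	b := sourceHash("digraph { a -> b }\n") // trailing newline only
	fmt.Println(a)
	fmt.Println(a == b) // false
}
```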

func ValidFidelityModes

func ValidFidelityModes() []string

ValidFidelityModes returns the list of valid fidelity mode strings.

func ValidateConditionSyntax

func ValidateConditionSyntax(condition string) bool

ValidateConditionSyntax checks whether a condition string is syntactically valid. Returns true if the condition can be parsed, false otherwise.

func WithNodeID

func WithNodeID(ctx context.Context, nodeID string) context.Context

WithNodeID attaches a pipeline node ID to the context. The node ID can later be extracted with NodeIDFromContext.
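WithNodeID and NodeIDFromContext follow the usual context-value pattern. The sketch below shows that pattern with hypothetical lowercase equivalents; the unexported key type is an assumption about how such helpers are typically written, not a statement about this package's internals.

```go
package main

import (
	"context"
	"fmt"
)

// nodeIDKey is an unexported key type, so no other package can collide with it.
type nodeIDKey struct{}

// rootCtx is a demo-only root context.
var rootCtx = context.Background()

// withNodeID returns a child context carrying nodeID.
func withNodeID(ctx context.Context, nodeID string) context.Context {
	return context.WithValue(ctx, nodeIDKey{}, nodeID)
}

// nodeIDFromContext returns the stored node ID, or "" when none is present.
func nodeIDFromContext(ctx context.Context) string {
	id, _ := ctx.Value(nodeIDKey{}).(string)
	return id
}

func main() {
	ctx := withNodeID(rootCtx, "build")
	fmt.Println(nodeIDFromContext(ctx))            // build
	fmt.Printf("%q\n", nodeIDFromContext(rootCtx)) // ""
}
```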

Types

type AgentBackend

type AgentBackend struct {
	// Client is the LLM client to use for making API calls. If nil,
	// a client is created from environment variables at runtime.
	Client *llm.Client
}

AgentBackend implements CodergenBackend by wiring to the real agent loop. It creates an agent.Session, configures the appropriate provider profile, sets up the execution environment, and runs the agent loop to completion.

func (*AgentBackend) RunAgent

func (b *AgentBackend) RunAgent(ctx context.Context, config AgentRunConfig) (*AgentRunResult, error)

RunAgent executes an agent loop with the given configuration. It creates a session, selects the provider profile, sets up the execution environment, and runs ProcessInput until the agent completes or hits limits.

type AgentRunConfig

type AgentRunConfig struct {
	Prompt       string            // the prompt/instructions for the LLM
	Model        string            // LLM model name (e.g., "claude-sonnet-4-5")
	Provider     string            // LLM provider name (e.g., "anthropic", "openai", "gemini")
	BaseURL      string            // custom API base URL (overrides provider default)
	WorkDir      string            // working directory for file operations and commands
	Goal         string            // pipeline-level goal for additional context
	NodeID       string            // pipeline node identifier for logging/tracking
	MaxTurns     int               // maximum agent loop turns (0 = use default of 20)
	FidelityMode string            // fidelity mode controlling conversation history management ("full", "compact", "truncate", "summary:*")
	SystemPrompt string            // user override appended to the agent's system prompt (empty = no override)
	EventHandler func(EngineEvent) // engine event callback for agent-level observability
}

AgentRunConfig holds all configuration needed to execute an agent run for a single codergen pipeline node.

type AgentRunResult

type AgentRunResult struct {
	Output      string          // final text output from the agent
	ToolCalls   int             // total number of tool calls made during the run
	TokensUsed  int             // total tokens consumed across all LLM calls
	Success     bool            // whether the agent completed without errors
	ToolCallLog []ToolCallEntry // individual tool call details
	TurnCount   int             // LLM call rounds
	Usage       TokenUsage      // granular per-category token breakdown
}

AgentRunResult holds the outcome of an agent run.

type ArtifactInfo

type ArtifactInfo struct {
	ID           string
	Name         string
	SizeBytes    int
	StoredAt     time.Time
	IsFileBacked bool
}

ArtifactInfo describes a stored artifact.

type ArtifactStore

type ArtifactStore struct {
	// contains filtered or unexported fields
}

ArtifactStore provides named, typed storage for large outputs.

func NewArtifactStore

func NewArtifactStore(baseDir string) *ArtifactStore

NewArtifactStore creates a new artifact store rooted at the given directory.

func (*ArtifactStore) BaseDir

func (s *ArtifactStore) BaseDir() string

BaseDir returns the base directory for artifact storage.

func (*ArtifactStore) Clear

func (s *ArtifactStore) Clear() error

Clear removes all artifacts, including any file-backed data on disk.

func (*ArtifactStore) Has

func (s *ArtifactStore) Has(id string) bool

Has checks whether an artifact with the given ID exists.

func (*ArtifactStore) List

func (s *ArtifactStore) List() []ArtifactInfo

List returns metadata for all stored artifacts.

func (*ArtifactStore) Remove

func (s *ArtifactStore) Remove(id string) error

Remove deletes an artifact by ID. File-backed artifacts have their disk file removed.

func (*ArtifactStore) Retrieve

func (s *ArtifactStore) Retrieve(id string) ([]byte, error)

Retrieve returns the data for the given artifact ID.

func (*ArtifactStore) Store

func (s *ArtifactStore) Store(id, name string, data []byte) (*ArtifactInfo, error)

Store saves an artifact. Large artifacts (exceeding the threshold) are written to disk.

type AutoApproveInterviewer

type AutoApproveInterviewer struct {
	// contains filtered or unexported fields
}

AutoApproveInterviewer always returns a configured answer (or the first option). Intended for testing and automated pipelines where no human is available.

func NewAutoApproveInterviewer

func NewAutoApproveInterviewer(defaultAnswer string) *AutoApproveInterviewer

NewAutoApproveInterviewer creates an AutoApproveInterviewer with the given default answer.

func (*AutoApproveInterviewer) Ask

func (a *AutoApproveInterviewer) Ask(ctx context.Context, question string, options []string) (string, error)

Ask returns the configured default answer, or the first option if no default is set.

type BackoffConfig

type BackoffConfig struct {
	InitialDelay time.Duration // default 200ms
	Factor       float64       // default 2.0
	MaxDelay     time.Duration // default 60s
	Jitter       bool          // default true
}

BackoffConfig controls delay timing between retry attempts.

func (BackoffConfig) DelayForAttempt

func (b BackoffConfig) DelayForAttempt(attempt int) time.Duration

DelayForAttempt calculates the delay for a given attempt number (0-indexed). The formula is: InitialDelay * Factor^attempt, capped at MaxDelay. If Jitter is enabled, the delay is randomized in [0, calculated_delay].
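The formula can be sketched as follows, using the documented defaults (200ms initial delay, factor 2.0, 60s cap). The backoff type and method are hypothetical copies for illustration. One difference to note: math/rand's Float64 yields values in [0, 1), so this sketch's jitter range is half-open, whereas the text above describes [0, calculated_delay].

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// backoff mirrors the fields of BackoffConfig for illustration only.
type backoff struct {
	InitialDelay time.Duration
	Factor       float64
	MaxDelay     time.Duration
	Jitter       bool
}

// delayForAttempt computes InitialDelay * Factor^attempt, capped at MaxDelay.
// With Jitter set, the result is scaled by a random factor in [0, 1).
func (b backoff) delayForAttempt(attempt int) time.Duration {
	d := float64(b.InitialDelay) * math.Pow(b.Factor, float64(attempt))
	if limit := float64(b.MaxDelay); d > limit {
		d = limit
	}
	if b.Jitter {
		d *= rand.Float64()
	}
	return time.Duration(d)
}

func main() {
	b := backoff{InitialDelay: 200 * time.Millisecond, Factor: 2.0, MaxDelay: 60 * time.Second}
	for attempt := 0; attempt < 10; attempt++ {
		fmt.Println(attempt, b.delayForAttempt(attempt))
	}
}
```

Without jitter, attempt 3 gives 200ms * 2^3 = 1.6s, and from attempt 9 onward (200ms * 512 = 102.4s) the delay is capped at 60s.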

type BranchResult

type BranchResult struct {
	NodeID        string
	Outcome       *Outcome
	BranchContext *Context
	Error         error
}

BranchResult holds the outcome of executing a single parallel branch.

func ExecuteParallelBranches

func ExecuteParallelBranches(
	ctx context.Context,
	graph *Graph,
	pctx *Context,
	store *ArtifactStore,
	registry *HandlerRegistry,
	branches []string,
	config ParallelConfig,
) ([]BranchResult, error)

ExecuteParallelBranches forks the context for each branch and executes them concurrently in separate goroutines. Each branch follows edges from its start node until it reaches a fan-in node (shape=tripleoctagon) or a terminal node. A buffered channel is used as a semaphore to respect MaxParallel.

Error policies:

  • "continue": all branches run to completion regardless of failures (default).
  • "fail_fast": on the first branch error or failure outcome, cancel remaining branches.
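The buffered-channel semaphore mentioned above is a common Go idiom for bounding concurrency. The sketch below shows it in isolation; runLimited is a hypothetical helper, it models only the "continue" policy (every task runs to completion), and it does not fork contexts or follow graph edges.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited runs all tasks concurrently, with at most maxParallel running
// at once, and returns each task's error at the matching index.
func runLimited(tasks []func() error, maxParallel int) []error {
	if maxParallel < 1 {
		maxParallel = 1 // avoid an unbuffered channel that would block forever
	}
	sem := make(chan struct{}, maxParallel) // buffered channel as semaphore
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() error) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when done
			errs[i] = task()         // each goroutine writes its own index
		}(i, task)
	}
	wg.Wait()
	return errs
}

func main() {
	tasks := []func() error{
		func() error { return nil },
		func() error { return fmt.Errorf("branch b failed") },
		func() error { return nil },
	}
	for i, err := range runLimited(tasks, 2) {
		fmt.Println(i, err)
	}
}
```

A "fail_fast" variant would typically derive a cancellable context and cancel it on the first error; that part is left out here.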

type CallbackInterviewer

type CallbackInterviewer struct {
	// contains filtered or unexported fields
}

CallbackInterviewer delegates question answering to a provided callback function. Useful for integrating with external systems (Slack, web UI, API).

func NewCallbackInterviewer

func NewCallbackInterviewer(fn func(ctx context.Context, question string, options []string) (string, error)) *CallbackInterviewer

NewCallbackInterviewer creates a CallbackInterviewer that delegates to the given function.

func (*CallbackInterviewer) Ask

func (c *CallbackInterviewer) Ask(ctx context.Context, question string, options []string) (string, error)

Ask delegates to the callback function.

type Checkpoint

type Checkpoint struct {
	Timestamp      time.Time      `json:"timestamp"`
	CurrentNode    string         `json:"current_node"`
	CompletedNodes []string       `json:"completed_nodes"`
	NodeRetries    map[string]int `json:"node_retries"`
	ContextValues  map[string]any `json:"context_values"`
	Logs           []string       `json:"logs"`
}

Checkpoint is a serializable snapshot of execution state.

func LoadCheckpoint

func LoadCheckpoint(path string) (*Checkpoint, error)

LoadCheckpoint deserializes a checkpoint from JSON at the given path.

func NewCheckpoint

func NewCheckpoint(ctx *Context, currentNode string, completedNodes []string, nodeRetries map[string]int) *Checkpoint

NewCheckpoint creates a checkpoint from the current execution state.

func (*Checkpoint) Save

func (cp *Checkpoint) Save(path string) error

Save serializes the checkpoint to JSON and writes it to the given path.

type ClaudeCodeBackend

type ClaudeCodeBackend struct {
	BinaryPath         string   // resolved via exec.LookPath("claude") if empty
	DefaultModel       string   // falls back to "" (let claude pick)
	AllowedTools       []string // e.g. ["Bash","Read","Edit","Write","Glob","Grep"]
	SkipPermissions    bool     // default: true (required for autonomous pipelines)
	AppendSystemPrompt string   // appended to claude's default system prompt
	MaxBudgetUSD       float64  // maximum dollar spend per run (0 = no limit)
}

ClaudeCodeBackend implements CodergenBackend by shelling out to the `claude` CLI. It uses --print --output-format stream-json to get streaming JSONL output with real token breakdowns, then parses the result events.

The claude CLI does not support a --max-turns flag. AgentRunConfig.MaxTurns is not honored by this backend. Use --max-budget-usd for cost control instead.

func NewClaudeCodeBackend

func NewClaudeCodeBackend(opts ...ClaudeCodeOption) (*ClaudeCodeBackend, error)

NewClaudeCodeBackend creates a ClaudeCodeBackend with the given options. By default it resolves the "claude" binary from PATH and enables SkipPermissions (required for non-interactive pipeline execution).

func (*ClaudeCodeBackend) RunAgent

func (b *ClaudeCodeBackend) RunAgent(ctx context.Context, config AgentRunConfig) (*AgentRunResult, error)

RunAgent executes the claude CLI with the given configuration and parses the streaming JSONL output to build an AgentRunResult.

type ClaudeCodeOption

type ClaudeCodeOption func(*ClaudeCodeBackend)

ClaudeCodeOption configures a ClaudeCodeBackend via functional options.

func WithClaudeAllowedTools

func WithClaudeAllowedTools(tools []string) ClaudeCodeOption

WithClaudeAllowedTools sets the allowed tools list for claude CLI invocations.

func WithClaudeAppendSystemPrompt

func WithClaudeAppendSystemPrompt(prompt string) ClaudeCodeOption

WithClaudeAppendSystemPrompt sets additional system prompt text.

func WithClaudeBinaryPath

func WithClaudeBinaryPath(path string) ClaudeCodeOption

WithClaudeBinaryPath sets the path to the claude binary.

func WithClaudeMaxBudgetUSD

func WithClaudeMaxBudgetUSD(budget float64) ClaudeCodeOption

WithClaudeMaxBudgetUSD sets the maximum dollar spend per invocation.

func WithClaudeModel

func WithClaudeModel(model string) ClaudeCodeOption

WithClaudeModel sets the default model for claude CLI invocations.

func WithClaudeSkipPermissions

func WithClaudeSkipPermissions(skip bool) ClaudeCodeOption

WithClaudeSkipPermissions controls whether --dangerously-skip-permissions is passed.

type CodergenBackend

type CodergenBackend interface {
	// RunAgent executes an agent loop with the given configuration and returns
	// the result. The context controls cancellation and timeouts.
	RunAgent(ctx context.Context, config AgentRunConfig) (*AgentRunResult, error)
}

CodergenBackend abstracts the LLM agent execution so that CodergenHandler does not depend directly on the agent or llm packages.

type CodergenHandler

type CodergenHandler struct {
	// Backend is the agent execution backend. When nil, the handler
	// returns StatusFail indicating no LLM backend is configured.
	Backend CodergenBackend

	// BaseURL is the default API base URL for the LLM provider. Set by the
	// engine during backend wiring. Can be overridden per-node via base_url attr.
	BaseURL string

	// EventHandler receives agent-level observability events bridged from the
	// agent session into the engine event system. Set by the engine during
	// backend wiring.
	EventHandler func(EngineEvent)
}

CodergenHandler handles LLM-powered coding task nodes (shape=box). This is the default handler for nodes without an explicit type. When Backend is set, it delegates to the agent loop for real LLM execution. When Backend is nil, it returns StatusFail with a configuration error.

func (*CodergenHandler) Execute

func (h *CodergenHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute processes a codergen node by reading its prompt, label, model, and provider. If a Backend is configured, it runs the agent loop for real LLM execution. Otherwise, it returns StatusFail with a configuration error.

func (*CodergenHandler) Type

func (h *CodergenHandler) Type() string

Type returns the handler type string "codergen".

type ConditionalHandler

type ConditionalHandler struct{}

ConditionalHandler handles conditional routing nodes (shape=diamond). The handler itself is a no-op that returns success. The actual routing is handled by the execution engine's edge selection algorithm, which evaluates conditions on outgoing edges using EvaluateCondition.

func (*ConditionalHandler) Execute

func (h *ConditionalHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute returns success with a note describing the conditional evaluation. Edge condition evaluation and routing are performed by the engine after this handler returns.

func (*ConditionalHandler) Type

func (h *ConditionalHandler) Type() string

Type returns the handler type string "conditional".

type ConsoleInterviewer

type ConsoleInterviewer struct {
	// contains filtered or unexported fields
}

ConsoleInterviewer reads answers from an io.Reader and writes prompts to an io.Writer. By default, uses os.Stdin and os.Stdout.

func NewConsoleInterviewer

func NewConsoleInterviewer() *ConsoleInterviewer

NewConsoleInterviewer creates a ConsoleInterviewer using os.Stdin and os.Stdout.

func NewConsoleInterviewerWithIO

func NewConsoleInterviewerWithIO(r io.Reader, w io.Writer) *ConsoleInterviewer

NewConsoleInterviewerWithIO creates a ConsoleInterviewer with configurable reader and writer.

func (*ConsoleInterviewer) Ask

func (c *ConsoleInterviewer) Ask(ctx context.Context, question string, options []string) (string, error)

Ask prints the question and options, then reads a line from the reader. If options are provided, validates the answer is one of the options.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context is a thread-safe key-value store shared across pipeline stages.

func ApplyFidelity

func ApplyFidelity(pctx *Context, mode FidelityMode, opts FidelityOptions) (*Context, string)

ApplyFidelity applies the given fidelity mode to the context and returns the transformed context and a preamble string describing what was done.

func NewContext

func NewContext() *Context

NewContext creates a new empty Context.

func (*Context) AppendLog

func (c *Context) AppendLog(entry string)

AppendLog adds an entry to the context's log.

func (*Context) ApplyUpdates

func (c *Context) ApplyUpdates(updates map[string]any)

ApplyUpdates merges the given key-value pairs into the context.

func (*Context) Clone

func (c *Context) Clone() *Context

Clone creates a deep copy of the Context with independent values and logs.

func (*Context) Get

func (c *Context) Get(key string) any

Get retrieves the value for the given key, or nil if not found.

func (*Context) GetString

func (c *Context) GetString(key string, defaultVal string) string

GetString retrieves the string value for the given key. If the key is missing or the value is not a string, defaultVal is returned.

func (*Context) Logs

func (c *Context) Logs() []string

Logs returns a copy of the context's log entries.

func (*Context) Set

func (c *Context) Set(key string, value any)

Set stores a value under the given key.

func (*Context) Snapshot

func (c *Context) Snapshot() map[string]any

Snapshot returns a shallow copy of all key-value pairs.

type DOTRenderFunc

type DOTRenderFunc func(ctx context.Context, dotText string, format string) ([]byte, error)

DOTRenderFunc renders raw DOT text to the specified format (svg, png).

type Diagnostic

type Diagnostic struct {
	Rule     string
	Severity Severity
	Message  string
	NodeID   string     // optional
	Edge     *[2]string // optional (from, to)
	Fix      string     // optional suggested fix
}

Diagnostic represents a validation finding.

func Validate

func Validate(g *Graph, extraRules ...LintRule) []Diagnostic

Validate runs all built-in lint rules plus any extra rules on the graph.

func ValidateOrError

func ValidateOrError(g *Graph, extraRules ...LintRule) ([]Diagnostic, error)

ValidateOrError runs validation and returns an error if any ERROR-severity diagnostics exist.

type Edge

type Edge = dot.Edge

func SelectEdge

func SelectEdge(node *Node, outcome *Outcome, ctx *Context, graph *Graph) *Edge

SelectEdge chooses the next edge from a node using five-step priority:

  1. Condition-matching edges (non-empty condition that evaluates true), best by weight then lexical
  2. Preferred label match (outcome.PreferredLabel matches edge label after normalization)
  3. Suggested next IDs (outcome.SuggestedNextIDs matches edge.To)
  4. Highest weight among unconditional edges (no condition attribute or empty condition)
  5. Lexical tiebreak on To field

Returns nil if no outgoing edges exist.
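The priority order can be sketched with simplified types. This is not the package's code: the edge struct, selectEdge, and the condTrue callback are hypothetical. Matching labels case-insensitively stands in for NormalizeLabel, and searching all outgoing edges in steps 2 and 3 is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// edge is a simplified, hypothetical stand-in for the package's Edge.
type edge struct {
	To, Label, Condition string
	Weight               int
}

// best picks the highest weight, breaking ties by lexically smallest To.
func best(edges []edge) *edge {
	var win *edge
	for i := range edges {
		e := &edges[i]
		if win == nil || e.Weight > win.Weight || (e.Weight == win.Weight && e.To < win.To) {
			win = e
		}
	}
	return win
}

func selectEdge(edges []edge, condTrue func(string) bool, preferred string, suggested []string) *edge {
	var matched, unconditional []edge
	for _, e := range edges {
		switch {
		case strings.TrimSpace(e.Condition) == "":
			unconditional = append(unconditional, e)
		case condTrue(e.Condition):
			matched = append(matched, e)
		}
	}
	if w := best(matched); w != nil { // step 1: condition match
		return w
	}
	if preferred != "" { // step 2: preferred label
		for i := range edges {
			if strings.EqualFold(strings.TrimSpace(edges[i].Label), strings.TrimSpace(preferred)) {
				return &edges[i]
			}
		}
	}
	for _, id := range suggested { // step 3: suggested next IDs
		for i := range edges {
			if edges[i].To == id {
				return &edges[i]
			}
		}
	}
	return best(unconditional) // steps 4 and 5: weight, then lexical
}

func main() {
	edges := []edge{{To: "b", Weight: 1}, {To: "a", Weight: 1}, {To: "c", Condition: "outcome = fail"}}
	never := func(string) bool { return false }
	fmt.Println(selectEdge(edges, never, "", nil).To) // a
}
```

In this sketch, a node whose edges are all conditional and all false yields nil; the text above only specifies nil for a node with no outgoing edges.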

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the pipeline execution engine that runs attractor graph pipelines.

func NewEngine

func NewEngine(config EngineConfig) *Engine

NewEngine creates a new pipeline execution engine with the given configuration.

func (*Engine) GetEventHandler

func (e *Engine) GetEventHandler() func(EngineEvent)

GetEventHandler returns the engine's current event callback, or nil if none is set.

func (*Engine) GetHandler

func (e *Engine) GetHandler(typeName string) NodeHandler

GetHandler returns the handler registered for the given type string from the engine's handler registry. If no registry was configured, a default registry is initialized first. Returns nil if the handler type is not found.

func (*Engine) ResumeFromCheckpoint

func (e *Engine) ResumeFromCheckpoint(ctx context.Context, graph *Graph, checkpointPath string) (*RunResult, error)

ResumeFromCheckpoint loads a checkpoint from disk and resumes graph execution from the node after the checkpointed node. If the previous node used full fidelity, the first hop after resume is degraded to summary:high since in-memory LLM sessions cannot be serialized.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, source string) (*RunResult, error)

Run parses DOT source, then runs the resulting graph through the full 5-phase lifecycle.

func (*Engine) RunGraph

func (e *Engine) RunGraph(ctx context.Context, graph *Graph) (*RunResult, error)

RunGraph runs an already-parsed graph through the VALIDATE, INITIALIZE, EXECUTE, and FINALIZE phases.

func (*Engine) SetEventHandler

func (e *Engine) SetEventHandler(handler func(EngineEvent))

SetEventHandler sets the engine's event callback after creation. This allows external components (like the TUI) to wire into the event stream.

func (*Engine) SetHandler

func (e *Engine) SetHandler(handler NodeHandler)

SetHandler registers a handler in the engine's handler registry. If no registry was configured, a default registry is initialized first.

type EngineConfig

type EngineConfig struct {
	CheckpointDir      string            // directory for checkpoint files (empty = no checkpoints)
	AutoCheckpointPath string            // path to overwrite with latest checkpoint after each node (empty = disabled)
	ArtifactDir        string            // directory for artifact storage (empty = use ArtifactsBaseDir/<RunID>)
	ArtifactsBaseDir   string            // base directory for run directories (default = "./artifacts")
	RunID              string            // run identifier for the artifact subdirectory (empty = auto-generated)
	Transforms         []Transform       // transforms to apply (nil = DefaultTransforms)
	ExtraLintRules     []LintRule        // additional validation rules
	DefaultRetry       RetryPolicy       // default retry policy for nodes
	Handlers           *HandlerRegistry  // nil = DefaultHandlerRegistry
	EventHandler       func(EngineEvent) // optional event callback
	Backend            CodergenBackend   // backend for codergen nodes (nil = stub behavior)
	BaseURL            string            // default API base URL for codergen nodes (overridable per-node)
	RestartConfig      *RestartConfig    // loop restart configuration (nil = DefaultRestartConfig)
	DefaultNodeTimeout time.Duration     // global fallback timeout for node execution (0 = no timeout)
}

EngineConfig holds configuration for the pipeline execution engine.

type EngineEvent

type EngineEvent struct {
	Type      EngineEventType
	NodeID    string
	Data      map[string]any
	Timestamp time.Time
}

EngineEvent represents a lifecycle event emitted by the engine during pipeline execution.

type EngineEventType

type EngineEventType string

EngineEventType identifies the kind of engine lifecycle event.

const (
	EventPipelineStarted   EngineEventType = "pipeline.started"
	EventPipelineCompleted EngineEventType = "pipeline.completed"
	EventPipelineFailed    EngineEventType = "pipeline.failed"
	EventStageStarted      EngineEventType = "stage.started"
	EventStageCompleted    EngineEventType = "stage.completed"
	EventStageFailed       EngineEventType = "stage.failed"
	EventStageRetrying     EngineEventType = "stage.retrying"
	EventStageStalled      EngineEventType = "stage.stalled"
	EventCheckpointSaved   EngineEventType = "checkpoint.saved"

	// Agent-level observability events bridged from the coding agent session.
	EventAgentToolCallStart EngineEventType = "agent.tool_call.start"
	EventAgentToolCallEnd   EngineEventType = "agent.tool_call.end"
	EventAgentLLMTurn       EngineEventType = "agent.llm_turn"
	EventAgentSteering      EngineEventType = "agent.steering"
	EventAgentLoopDetected  EngineEventType = "agent.loop_detected"
)

type ErrLoopRestart

type ErrLoopRestart struct {
	TargetNode string
}

ErrLoopRestart is a sentinel error returned by executeGraph when a selected edge has the loop_restart=true attribute. It carries the target node ID so the engine can restart traversal from that node with a fresh context.

func (*ErrLoopRestart) Error

func (e *ErrLoopRestart) Error() string

Error implements the error interface.

type EventFilter

type EventFilter struct {
	Types  []EngineEventType // filter by event type(s); empty means all types
	NodeID string            // filter by specific node; empty means all nodes
	Since  *time.Time        // events at or after this time; nil means no lower bound
	Until  *time.Time        // events at or before this time; nil means no upper bound
	Limit  int               // max results; 0 means unlimited
	Offset int               // skip first N results after filtering
}

EventFilter specifies criteria for filtering engine events from a run's event log.

type EventQuery

type EventQuery interface {
	QueryEvents(runID string, filter EventFilter) ([]EngineEvent, error)
	CountEvents(runID string, filter EventFilter) (int, error)
	TailEvents(runID string, n int) ([]EngineEvent, error)
	SummarizeEvents(runID string) (*EventSummary, error)
}

EventQuery defines the interface for querying engine events from a run.

type EventQueryResponse

type EventQueryResponse struct {
	Events []EngineEvent `json:"events"`
	Total  int           `json:"total"`
}

EventQueryResponse is the JSON response for event query endpoints.

type EventSummary

type EventSummary struct {
	TotalEvents int
	ByType      map[EngineEventType]int
	ByNode      map[string]int
	FirstEvent  *time.Time
	LastEvent   *time.Time
}

EventSummary holds aggregate statistics about a run's events.

type EventSummaryResponse

type EventSummaryResponse struct {
	TotalEvents int            `json:"total_events"`
	ByType      map[string]int `json:"by_type"`
	ByNode      map[string]int `json:"by_node"`
	FirstEvent  string         `json:"first_event,omitempty"`
	LastEvent   string         `json:"last_event,omitempty"`
}

EventSummaryResponse is the JSON response for the event summary endpoint.

type EventTailResponse

type EventTailResponse struct {
	Events []EngineEvent `json:"events"`
}

EventTailResponse is the JSON response for the event tail endpoint.

type ExitHandler

type ExitHandler struct{}

ExitHandler handles the pipeline exit point node (shape=Msquare). It records the finish time and returns success. Goal gate enforcement is handled by the execution engine, not by this handler.

func (*ExitHandler) Execute

func (h *ExitHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute captures the final outcome and writes a summary to context.

func (*ExitHandler) Type

func (h *ExitHandler) Type() string

Type returns the handler type string "exit".

type FSEventQuery

type FSEventQuery struct {
	// contains filtered or unexported fields
}

FSEventQuery implements EventQuery using an FSRunStateStore as its backing store.

func NewFSEventQuery

func NewFSEventQuery(store *FSRunStateStore) *FSEventQuery

NewFSEventQuery creates a new FSEventQuery backed by the given FSRunStateStore.

func (*FSEventQuery) CountEvents

func (q *FSEventQuery) CountEvents(runID string, filter EventFilter) (int, error)

CountEvents returns the count of events matching the filter criteria. Pagination (Limit/Offset) is ignored for counting purposes.

func (*FSEventQuery) QueryEvents

func (q *FSEventQuery) QueryEvents(runID string, filter EventFilter) ([]EngineEvent, error)

QueryEvents returns events from the given run matching the filter criteria. Events are loaded from the JSONL file and filtered in memory.

func (*FSEventQuery) SummarizeEvents

func (q *FSEventQuery) SummarizeEvents(runID string) (*EventSummary, error)

SummarizeEvents produces aggregate statistics about a run's event log.

func (*FSEventQuery) TailEvents

func (q *FSEventQuery) TailEvents(runID string, n int) ([]EngineEvent, error)

TailEvents returns the last n events from the run. If there are fewer than n events, all events are returned.

type FSLogSink

type FSLogSink struct {
	// contains filtered or unexported fields
}

FSLogSink is a filesystem-backed LogSink that wraps FSRunStateStore for storage and FSEventQuery for querying. It maintains a JSON index file for fast run enumeration.

func NewFSLogSink

func NewFSLogSink(baseDir string) (*FSLogSink, error)

NewFSLogSink creates a new filesystem-backed LogSink rooted at baseDir. The index file is created if it does not already exist.

func (*FSLogSink) Append

func (s *FSLogSink) Append(runID string, event EngineEvent) error

Append writes an event to the run's event log and updates the index.

func (*FSLogSink) Close

func (s *FSLogSink) Close() error

Close releases any resources held by the sink. Calling Close multiple times is safe.

func (*FSLogSink) ListRuns

func (s *FSLogSink) ListRuns() ([]RunIndexEntry, error)

ListRuns returns all run index entries from the index file.

func (*FSLogSink) Prune

func (s *FSLogSink) Prune(olderThan time.Duration) (int, error)

Prune deletes all runs that started more than olderThan ago. It removes both the run directory and its entry from the index. Returns the number of runs pruned.

func (*FSLogSink) Query

func (s *FSLogSink) Query(runID string, filter EventFilter) ([]EngineEvent, int, error)

Query returns events matching the filter and the total count of matching events (before pagination). The total reflects the full filtered result set, while the returned slice respects Limit and Offset.
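The relationship between the returned slice and the total can be sketched as follows. This is a minimal illustration rather than the package's implementation: event, filter, matches, and query are hypothetical stand-ins that keep only the Types, NodeID, Limit, and Offset criteria from EventFilter.

```go
package main

import "fmt"

// event and filter are trimmed, hypothetical stand-ins for EngineEvent and EventFilter.
type event struct {
	Type   string
	NodeID string
}

type filter struct {
	Types  []string // empty means all types
	NodeID string   // empty means all nodes
	Limit  int      // 0 means unlimited
	Offset int      // skip first N results after filtering
}

// matches reports whether e satisfies the type and node criteria of f.
func matches(e event, f filter) bool {
	if f.NodeID != "" && e.NodeID != f.NodeID {
		return false
	}
	if len(f.Types) == 0 {
		return true
	}
	for _, t := range f.Types {
		if e.Type == t {
			return true
		}
	}
	return false
}

// query returns one page of matches plus the total match count before pagination.
func query(events []event, f filter) ([]event, int) {
	var matched []event
	for _, e := range events {
		if matches(e, f) {
			matched = append(matched, e)
		}
	}
	total := len(matched)

	offset := f.Offset
	if offset < 0 {
		offset = 0 // assumption: negative offsets are treated as zero
	}
	if offset >= total {
		return nil, total
	}
	page := matched[offset:]
	if f.Limit > 0 && len(page) > f.Limit {
		page = page[:f.Limit]
	}
	return page, total
}

func main() {
	events := []event{
		{Type: "stage.started", NodeID: "a"},
		{Type: "stage.completed", NodeID: "a"},
		{Type: "stage.completed", NodeID: "b"},
		{Type: "stage.completed", NodeID: "c"},
	}
	page, total := query(events, filter{Types: []string{"stage.completed"}, Limit: 2, Offset: 1})
	fmt.Println(len(page), total)
}
```

Because the total is computed before Limit and Offset are applied, a caller can render "page 2 of N" style output from a single call.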

func (*FSLogSink) Summarize

func (s *FSLogSink) Summarize(runID string) (*EventSummary, error)

Summarize returns aggregate statistics for a run's event log.

func (*FSLogSink) Tail

func (s *FSLogSink) Tail(runID string, n int) ([]EngineEvent, error)

Tail returns the last n events from the run's event log.

type FSRunStateStore

type FSRunStateStore struct {
	// contains filtered or unexported fields
}

FSRunStateStore is a filesystem-backed RunStateStore. Each run is stored in a subdirectory of baseDir named by run ID.

func NewFSRunStateStore

func NewFSRunStateStore(baseDir string) (*FSRunStateStore, error)

NewFSRunStateStore creates a new filesystem-backed run state store rooted at baseDir. The base directory is created if it does not already exist.

func (*FSRunStateStore) AddEvent

func (s *FSRunStateStore) AddEvent(id string, event EngineEvent) error

AddEvent appends an EngineEvent to the run's events.jsonl file. Returns an error if the run does not exist.

func (*FSRunStateStore) CheckpointPath

func (s *FSRunStateStore) CheckpointPath(runID string) string

CheckpointPath returns the path to the checkpoint.json file for a given run ID.

func (*FSRunStateStore) Create

func (s *FSRunStateStore) Create(state *RunState) error

Create persists a new RunState to disk. Returns an error if a run with the same ID already exists.

func (*FSRunStateStore) FindResumable

func (s *FSRunStateStore) FindResumable(sourceHash string) (*RunState, error)

FindResumable returns the most recent non-completed run whose SourceHash matches the given hash AND has a checkpoint.json file in its run directory. Returns nil if no matching run is found.

func (*FSRunStateStore) Get

func (s *FSRunStateStore) Get(id string) (*RunState, error)

Get loads a RunState from disk by its ID. Returns an error if the run does not exist or if any of the stored files are corrupt.

func (*FSRunStateStore) List

func (s *FSRunStateStore) List() ([]*RunState, error)

List returns all RunStates stored on disk. Non-directory entries in the base directory are silently ignored.

func (*FSRunStateStore) RunDir

func (s *FSRunStateStore) RunDir(runID string) string

RunDir returns the base directory path for a given run ID.

func (*FSRunStateStore) Update

func (s *FSRunStateStore) Update(state *RunState) error

Update overwrites the manifest and context for an existing run. Returns an error if the run does not exist.

type FailureTracker

type FailureTracker struct {
	// contains filtered or unexported fields
}

FailureTracker tracks failure signatures across retries to detect deterministic (repeating) failures. A failure is considered deterministic when the same normalized signature has been seen 2 or more times. FailureTracker is safe for concurrent use.

func NewFailureTracker

func NewFailureTracker() *FailureTracker

NewFailureTracker creates a FailureTracker ready to record errors.

func (*FailureTracker) Count

func (t *FailureTracker) Count(signature string) int

Count returns how many times the given signature has been recorded.

func (*FailureTracker) IsDeterministic

func (t *FailureTracker) IsDeterministic(signature string) bool

IsDeterministic returns true if the given signature has been seen 2 or more times, indicating the failure is likely deterministic and not transient.

func (*FailureTracker) Record

func (t *FailureTracker) Record(err error) string

Record normalizes the error message, increments the count for its signature, and returns the signature string.

type FanInHandler

type FanInHandler struct{}

FanInHandler handles parallel fan-in nodes (shape=tripleoctagon). It reads parallel results from the pipeline context and consolidates them. If no parallel results are available, it returns a failure.

func (*FanInHandler) Execute

func (h *FanInHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute reads parallel branch results from context and merges them. Returns success when results are present, or failure if none are found.

func (*FanInHandler) Type

func (h *FanInHandler) Type() string

Type returns the handler type string "parallel.fan_in".

type FidelityMode

type FidelityMode string

FidelityMode represents how much context is carried between nodes.

const (
	FidelityFull          FidelityMode = "full"
	FidelityTruncate      FidelityMode = "truncate"
	FidelityCompact       FidelityMode = "compact"
	FidelitySummaryLow    FidelityMode = "summary:low"
	FidelitySummaryMedium FidelityMode = "summary:medium"
	FidelitySummaryHigh   FidelityMode = "summary:high"
)

func ResolveFidelity

func ResolveFidelity(edge *Edge, targetNode *Node, graph *Graph) FidelityMode

ResolveFidelity resolves the fidelity mode for a target node using the following precedence:

  1. Edge fidelity attribute (on the incoming edge)
  2. Target node fidelity attribute
  3. Graph default_fidelity attribute
  4. Default: compact
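The precedence above amounts to a first-non-empty lookup. The sketch below is illustrative only: attrs and resolveFidelity are hypothetical stand-ins, and it assumes the edge and node attribute is keyed "fidelity" and the graph attribute "default_fidelity".

```go
package main

import "fmt"

// FidelityMode mirrors the documented string type.
type FidelityMode string

const FidelityCompact FidelityMode = "compact"

// attrs is a hypothetical stand-in for the attribute maps on Edge, Node, and Graph.
type attrs map[string]string

// resolveFidelity walks the precedence chain and returns the first non-empty value.
// Reading from a nil map yields "", so nil may be passed for a missing edge, node, or graph.
func resolveFidelity(edge, node, graph attrs) FidelityMode {
	if v := edge["fidelity"]; v != "" {
		return FidelityMode(v)
	}
	if v := node["fidelity"]; v != "" {
		return FidelityMode(v)
	}
	if v := graph["default_fidelity"]; v != "" {
		return FidelityMode(v)
	}
	return FidelityCompact
}

func main() {
	edge := attrs{"fidelity": "full"}
	node := attrs{"fidelity": "truncate"}
	graph := attrs{"default_fidelity": "summary:low"}

	fmt.Println(resolveFidelity(edge, node, graph)) // edge wins
	fmt.Println(resolveFidelity(nil, node, graph))  // node wins
	fmt.Println(resolveFidelity(nil, nil, graph))   // graph default
	fmt.Println(resolveFidelity(nil, nil, nil))     // built-in default
}
```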

type FidelityOptions

type FidelityOptions struct {
	MaxKeys        int      // max context keys to keep (default 50 for truncate)
	MaxValueLength int      // max string value length before truncation (default 1024 for compact, 500 for summary:high)
	MaxLogs        int      // max log entries to keep (default 20 for compact)
	Whitelist      []string // custom whitelist keys for summary modes (overrides defaults)
}

FidelityOptions configures the behavior of fidelity-based context compaction.

type Graph

type Graph = dot.Graph

Type aliases: all attractor code continues to use Graph, Node, Edge, Subgraph without any changes, but the actual types come from the dot/ package.

func ApplyTransforms

func ApplyTransforms(g *Graph, transforms ...Transform) *Graph

ApplyTransforms applies a sequence of transforms to a graph, returning the final result.

func ComposeGraphs

func ComposeGraphs(parent *Graph, childGraph *Graph, insertNodeID string, namespace string) (*Graph, error)

ComposeGraphs merges a child graph into a parent graph by replacing the node identified by insertNodeID. Child node IDs are prefixed with "{namespace}." to avoid ID conflicts. Parent edges pointing to or from the insert node are reconnected to the child's start and terminal nodes respectively. Child graph attributes are copied, but parent attributes take precedence on conflicts.

func LoadSubPipeline

func LoadSubPipeline(path string) (*Graph, error)

LoadSubPipeline reads a DOT file from disk and parses it into a Graph.

func Parse

func Parse(input string) (*Graph, error)

Parse delegates to the consolidated dot/ parser. This maintains backward compatibility for all callers within attractor/ and its tests.

type GraphDOTFunc

type GraphDOTFunc func(g *Graph) string

GraphDOTFunc converts a Graph to DOT text. Used by PipelineServer for graph rendering.

type GraphDOTWithStatusFunc

type GraphDOTWithStatusFunc func(g *Graph, outcomes map[string]*Outcome) string

GraphDOTWithStatusFunc converts a Graph to DOT text with execution status color overlays.

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry maps handler type strings to handler instances.

func DefaultHandlerRegistry

func DefaultHandlerRegistry() *HandlerRegistry

DefaultHandlerRegistry creates a registry with all 9 built-in handlers registered.

func NewHandlerRegistry

func NewHandlerRegistry() *HandlerRegistry

NewHandlerRegistry creates a new empty handler registry.

func (*HandlerRegistry) Get

func (r *HandlerRegistry) Get(typeName string) NodeHandler

Get returns the handler registered for the given type string, or nil if not found.

func (*HandlerRegistry) Register

func (r *HandlerRegistry) Register(handler NodeHandler)

Register adds a handler to the registry, keyed by its Type() string. Registering for an already-registered type replaces the previous handler.

func (*HandlerRegistry) Resolve

func (r *HandlerRegistry) Resolve(node *Node) NodeHandler

Resolve finds the appropriate handler for a node using the following resolution order:

  1. Explicit type attribute on the node
  2. Shape-based resolution using the shape-to-handler-type mapping
  3. Default to the codergen handler
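The resolution order can be sketched at the level of type strings. This is not the package's code: node, shapeToType, and resolveType are hypothetical, the attribute keys "type" and "shape" are assumed, and the shape table lists only the shapes named elsewhere in this documentation. The real Resolve returns a NodeHandler rather than a string.

```go
package main

import "fmt"

// node is a hypothetical stand-in for Node, reduced to an attribute map.
type node struct {
	attrs map[string]string
}

// shapeToType holds only the shape mappings mentioned in this documentation;
// the package's full table presumably contains more entries.
var shapeToType = map[string]string{
	"Msquare":       "exit",
	"tripleoctagon": "parallel.fan_in",
	"house":         "stack.manager_loop",
	"component":     "parallel",
}

// resolveType returns the handler type string for n:
// explicit type first, then shape, then the codergen default.
func resolveType(n node) string {
	if t := n.attrs["type"]; t != "" {
		return t
	}
	if t, ok := shapeToType[n.attrs["shape"]]; ok {
		return t
	}
	return "codergen"
}

func main() {
	fmt.Println(resolveType(node{attrs: map[string]string{"type": "wait.human", "shape": "house"}}))
	fmt.Println(resolveType(node{attrs: map[string]string{"shape": "Msquare"}}))
	fmt.Println(resolveType(node{attrs: map[string]string{"shape": "box"}}))
}
```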

type Interviewer

type Interviewer interface {
	Ask(ctx context.Context, question string, options []string) (string, error)
}

Interviewer is the abstraction for human-in-the-loop interaction. Any frontend (CLI, web, Slack, programmatic) implements this interface.

type LintRule

type LintRule interface {
	Name() string
	Apply(g *Graph) []Diagnostic
}

LintRule is the interface for validation rules.

type LiveState

type LiveState struct {
	Status     string   `json:"status"`
	ActiveNode string   `json:"active_node"`
	Completed  []string `json:"completed"`
	Failed     []string `json:"failed"`
	StartedAt  string   `json:"started_at"`
	UpdatedAt  string   `json:"updated_at"`
	EventCount int      `json:"event_count"`
}

LiveState represents the current pipeline execution snapshot, written to live.json after each event so external tools can poll for status.

type LogSink

type LogSink interface {
	// Append writes an event to the log for the given run.
	Append(runID string, event EngineEvent) error

	// Query returns events matching the filter, along with the total count of
	// matching events (before pagination). This allows callers to paginate while
	// knowing the full result set size.
	Query(runID string, filter EventFilter) ([]EngineEvent, int, error)

	// Tail returns the last n events from the run's event log.
	Tail(runID string, n int) ([]EngineEvent, error)

	// Summarize returns aggregate statistics for a run's event log.
	Summarize(runID string) (*EventSummary, error)

	// Prune deletes all runs whose start time is older than the given duration ago.
	// Returns the number of runs pruned.
	Prune(olderThan time.Duration) (int, error)

	// Close releases any resources held by the sink.
	Close() error
}

LogSink defines the interface for structured event log storage with query, retention, and lifecycle management capabilities.

type ManagerBackend

type ManagerBackend interface {
	// Observe inspects the current state of the supervised pipeline and returns
	// a textual observation summarizing progress.
	Observe(ctx context.Context, nodeID string, iteration int, pctx *Context) (string, error)

	// Guard evaluates whether the supervised agent is on track. Returns true if
	// the guard condition is satisfied, false if steering is needed.
	Guard(ctx context.Context, nodeID string, iteration int, observation string, guardCondition string, pctx *Context) (bool, error)

	// Steer applies a correction to the supervised pipeline, returning a textual
	// description of the steering action taken.
	Steer(ctx context.Context, nodeID string, iteration int, steerPrompt string, pctx *Context) (string, error)
}

ManagerBackend defines the interface for LLM-powered supervision actions. Implementations handle observing agent progress, guarding against drift, and steering corrections when the agent goes off-track.

type ManagerLoopHandler

type ManagerLoopHandler struct {
	// Backend provides the LLM-powered observe/guard/steer operations.
	// If nil, the handler uses stub behavior that logs supervision steps.
	Backend ManagerBackend
}

ManagerLoopHandler handles stack manager loop nodes (shape=house). It runs a supervision loop that observes, guards, and steers a child pipeline or agent. When Backend is nil, the handler operates in stub mode, logging each step and returning success.

func (*ManagerLoopHandler) Execute

func (h *ManagerLoopHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute runs the supervision loop for the given manager node. It reads configuration from node attributes, then iterates through observe/guard/steer cycles up to max_iterations.

func (*ManagerLoopHandler) Type

func (h *ManagerLoopHandler) Type() string

Type returns the handler type string "stack.manager_loop".

type Node

type Node = dot.Node

type NodeHandler

type NodeHandler interface {
	// Type returns the handler type string (e.g., "start", "codergen", "wait.human").
	Type() string

	// Execute runs the handler logic for the given node.
	// ctx is the Go context for cancellation/timeout.
	// node is the parsed Node with all its attributes.
	// pctx is the shared pipeline Context (thread-safe KV store).
	// store is the artifact store for large outputs.
	Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)
}

NodeHandler is the interface that all node handlers implement. The execution engine dispatches to the appropriate handler based on node type or shape.

type NodeHandlerUnwrapper

type NodeHandlerUnwrapper interface {
	InnerHandler() NodeHandler
}

NodeHandlerUnwrapper allows handler wrappers to expose their inner handler. This enables backend wiring to reach through decorator layers (e.g. interviewerInjectingHandler) to the underlying CodergenHandler.

type Outcome

type Outcome struct {
	Status           StageStatus
	PreferredLabel   string
	SuggestedNextIDs []string
	ContextUpdates   map[string]any
	Notes            string
	FailureReason    string
}

Outcome is the result of executing a node handler.

type ParallelConfig

type ParallelConfig struct {
	MaxParallel int
	JoinPolicy  string
	ErrorPolicy string
	KRequired   int // For k_of_n policy: minimum number of branches that must succeed
}

ParallelConfig holds parsed configuration for parallel execution.

func ParallelConfigFromContext

func ParallelConfigFromContext(pctx *Context) ParallelConfig

ParallelConfigFromContext reads parallel configuration values from the pipeline context and returns a ParallelConfig with defaults applied for missing values.

type ParallelHandler

type ParallelHandler struct{}

ParallelHandler handles parallel fan-out nodes (shape=component). It identifies all outgoing edges as parallel branches and records them in the outcome. The actual concurrent execution is managed by the engine.

func (*ParallelHandler) Execute

func (h *ParallelHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute identifies outgoing branches and returns an outcome listing them. If there are no outgoing edges, it returns a failure.

func (*ParallelHandler) Type

func (h *ParallelHandler) Type() string

Type returns the handler type string "parallel".

type PendingQuestion

type PendingQuestion struct {
	ID       string   `json:"id"`
	Question string   `json:"question"`
	Options  []string `json:"options"`
	Answered bool     `json:"answered"`
	Answer   string   `json:"answer,omitempty"`
}

PendingQuestion represents a question waiting for a human answer.

type PipelineRun

type PipelineRun struct {
	ID          string
	Status      string // "running", "completed", "failed", "cancelled"
	Source      string // original DOT source
	Result      *RunResult
	Error       string
	ArtifactDir string        // path to the run's artifact directory
	Events      []EngineEvent // collected events
	Cancel      context.CancelFunc
	Questions   []PendingQuestion // for human-in-the-loop

	CreatedAt time.Time
	// contains filtered or unexported fields
}

PipelineRun tracks a running pipeline.

type PipelineServer

type PipelineServer struct {

	// ToDOT converts a Graph to DOT text. If nil, handleGetGraph uses a minimal fallback.
	ToDOT GraphDOTFunc

	// ToDOTWithStatus converts a Graph to DOT text with status color overlays.
	// If nil, falls back to ToDOT.
	ToDOTWithStatus GraphDOTWithStatusFunc

	// RenderDOTSource renders raw DOT text to svg/png. If nil, only DOT format is available.
	RenderDOTSource DOTRenderFunc
	// contains filtered or unexported fields
}

PipelineServer manages HTTP endpoints for running pipelines.

func NewPipelineServer

func NewPipelineServer(engine *Engine) *PipelineServer

NewPipelineServer creates a new PipelineServer with the given engine.

func (*PipelineServer) Handler

func (s *PipelineServer) Handler() http.Handler

Handler returns the HTTP handler for this server, wrapped with request logging.

func (*PipelineServer) LoadPersistedRuns

func (s *PipelineServer) LoadPersistedRuns() error

LoadPersistedRuns loads all previously persisted pipeline runs from the configured RunStateStore into the server's in-memory map. This allows the server to serve historical run data after a restart.

func (*PipelineServer) ServeHTTP

func (s *PipelineServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP delegates to the internal mux.

func (*PipelineServer) SetEventQuery

func (s *PipelineServer) SetEventQuery(eq EventQuery)

SetEventQuery configures the EventQuery backing store for the event query REST endpoints.

func (*PipelineServer) SetRunStateStore

func (s *PipelineServer) SetRunStateStore(store RunStateStore)

SetRunStateStore configures the persistent RunStateStore for the server. When set, the server persists pipeline runs on creation and completion, and can load previously persisted runs on startup.

type PipelineStatus

type PipelineStatus struct {
	ID             string    `json:"id"`
	Status         string    `json:"status"`
	CompletedNodes []string  `json:"completed_nodes,omitempty"`
	ArtifactDir    string    `json:"artifact_dir,omitempty"`
	Error          string    `json:"error,omitempty"`
	CreatedAt      time.Time `json:"created_at"`
}

PipelineStatus is the JSON response for pipeline status queries.

type PreflightCheck

type PreflightCheck struct {
	Name  string                          // human-readable check name
	Check func(ctx context.Context) error // the actual check; nil error means pass
}

PreflightCheck represents a single validation check to run before pipeline execution.

func BuildPreflightChecks

func BuildPreflightChecks(graph *Graph, cfg EngineConfig) []PreflightCheck

BuildPreflightChecks analyzes the graph and engine configuration to produce the set of preflight checks appropriate for this pipeline. Checks include:

  • Backend availability when codergen nodes are present
  • Required environment variables declared via env_required node attributes

type PreflightFailure

type PreflightFailure struct {
	Name   string
	Reason string
}

PreflightFailure records a single check failure with its name and reason.

type PreflightResult

type PreflightResult struct {
	Passed []string           // names of checks that passed
	Failed []PreflightFailure // checks that failed with reasons
}

PreflightResult holds the aggregated results of all preflight checks.

func RunPreflight

func RunPreflight(ctx context.Context, checks []PreflightCheck) PreflightResult

RunPreflight executes all checks and collects results. Every check is run regardless of whether earlier checks fail, so the caller gets a complete picture of what needs to be fixed.

func (PreflightResult) Error

func (r PreflightResult) Error() string

Error formats all failures as a multi-line string. Returns an empty string if there are no failures.

func (PreflightResult) OK

func (r PreflightResult) OK() bool

OK returns true if no checks failed.
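The run-everything-and-collect behavior can be sketched with local copies of the documented types. The "name: reason" line format in Error and the helper demoResult are assumptions for illustration; runPreflight is a lowercase stand-in, not the package function.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// Local copies of the documented types.
type PreflightCheck struct {
	Name  string
	Check func(ctx context.Context) error
}

type PreflightFailure struct {
	Name   string
	Reason string
}

type PreflightResult struct {
	Passed []string
	Failed []PreflightFailure
}

// OK reports whether no checks failed.
func (r PreflightResult) OK() bool { return len(r.Failed) == 0 }

// Error joins failures one per line; the "name: reason" layout is assumed.
func (r PreflightResult) Error() string {
	lines := make([]string, 0, len(r.Failed))
	for _, f := range r.Failed {
		lines = append(lines, f.Name+": "+f.Reason)
	}
	return strings.Join(lines, "\n")
}

// runPreflight runs every check, continuing past failures.
func runPreflight(ctx context.Context, checks []PreflightCheck) PreflightResult {
	var res PreflightResult
	for _, c := range checks {
		if err := c.Check(ctx); err != nil {
			res.Failed = append(res.Failed, PreflightFailure{Name: c.Name, Reason: err.Error()})
			continue
		}
		res.Passed = append(res.Passed, c.Name)
	}
	return res
}

// demoResult is a hypothetical helper: the second check fails, the third still runs.
func demoResult() PreflightResult {
	checks := []PreflightCheck{
		{Name: "backend", Check: func(context.Context) error { return nil }},
		{Name: "env:API_KEY", Check: func(context.Context) error { return errors.New("variable not set") }},
		{Name: "workdir", Check: func(context.Context) error { return nil }},
	}
	return runPreflight(context.Background(), checks)
}

func main() {
	res := demoResult()
	if !res.OK() {
		fmt.Println(res.Error())
	}
	fmt.Println(res.Passed)
}
```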

type ProgressEntry

type ProgressEntry struct {
	Timestamp string         `json:"timestamp"`
	Type      string         `json:"type"`
	NodeID    string         `json:"node_id,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
}

ProgressEntry is a JSON-serializable record written as one line in the NDJSON log.

type ProgressLogger

type ProgressLogger struct {
	WriteErrors int // count of write errors encountered (for diagnostics)
	// contains filtered or unexported fields
}

ProgressLogger writes engine events to an append-only NDJSON file and maintains a live.json snapshot reflecting current pipeline state.

func NewProgressLogger

func NewProgressLogger(dir string) (*ProgressLogger, error)

NewProgressLogger creates a progress logger that writes to the given directory. It opens progress.ndjson for appending and writes an initial live.json with pending status.

func (*ProgressLogger) Close

func (p *ProgressLogger) Close() error

Close closes the underlying NDJSON file. After Close, HandleEvent becomes a no-op.

func (*ProgressLogger) HandleEvent

func (p *ProgressLogger) HandleEvent(evt EngineEvent)

HandleEvent converts an EngineEvent to a ProgressEntry, appends it to the NDJSON file, updates the live state, and atomically rewrites live.json. This method signature matches EngineConfig.EventHandler so it can be wired directly.
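The event-to-line conversion might look roughly like the sketch below. It reuses the documented ProgressEntry JSON tags, but the RFC 3339 timestamp format is an assumption, EngineEvent.Type is simplified to a plain string, and toLine and sampleEvent are hypothetical helpers. File appending and the live.json rewrite are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// EngineEvent is a local copy with Type simplified to string.
type EngineEvent struct {
	Type      string
	NodeID    string
	Data      map[string]any
	Timestamp time.Time
}

// ProgressEntry uses the JSON tags shown in this documentation.
type ProgressEntry struct {
	Timestamp string         `json:"timestamp"`
	Type      string         `json:"type"`
	NodeID    string         `json:"node_id,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
}

// toLine renders one event as a single NDJSON line: a JSON object followed by "\n".
func toLine(evt EngineEvent) (string, error) {
	entry := ProgressEntry{
		Timestamp: evt.Timestamp.UTC().Format(time.RFC3339), // assumed format
		Type:      evt.Type,
		NodeID:    evt.NodeID,
		Data:      evt.Data,
	}
	b, err := json.Marshal(entry)
	if err != nil {
		return "", err
	}
	return string(b) + "\n", nil
}

// sampleEvent returns a fixed event so the output is reproducible.
func sampleEvent() EngineEvent {
	return EngineEvent{
		Type:      "stage.completed",
		NodeID:    "build",
		Timestamp: time.Date(2026, 2, 17, 12, 0, 0, 0, time.UTC),
	}
}

func main() {
	line, err := toLine(sampleEvent())
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Print(line)
}
```

Since HandleEvent has the signature func(EngineEvent), a logger value's method can presumably be assigned to EngineConfig.EventHandler or passed to SetEventHandler without an adapter.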

func (*ProgressLogger) State

func (p *ProgressLogger) State() LiveState

State returns a copy of the current live state.

type QAPair

type QAPair struct {
	Question string
	Options  []string
	Answer   string
}

QAPair records a question-answer interaction for auditing and replay.

type Question

type Question struct {
	ID       string
	Text     string
	Options  []string          // empty means free-text
	Default  string            // default answer if timeout
	Metadata map[string]string // arbitrary key-value pairs
}

Question represents a structured question for human review.

type QueueInterviewer

type QueueInterviewer struct {
	// contains filtered or unexported fields
}

QueueInterviewer reads answers from a pre-filled queue (FIFO order). Intended for deterministic testing and replay scenarios.

func NewQueueInterviewer

func NewQueueInterviewer(answers ...string) *QueueInterviewer

NewQueueInterviewer creates a QueueInterviewer pre-loaded with the given answers.

func (*QueueInterviewer) Ask

func (q *QueueInterviewer) Ask(ctx context.Context, question string, options []string) (string, error)

Ask dequeues the next answer. Returns an error when the queue is exhausted.

type RecordingInterviewer

type RecordingInterviewer struct {
	// contains filtered or unexported fields
}

RecordingInterviewer wraps another Interviewer and records all Q&A pairs. Useful for replay, debugging, and audit trails.

func NewRecordingInterviewer

func NewRecordingInterviewer(inner Interviewer) *RecordingInterviewer

NewRecordingInterviewer wraps the given inner Interviewer with recording capability.

func (*RecordingInterviewer) Ask

func (r *RecordingInterviewer) Ask(ctx context.Context, question string, options []string) (string, error)

Ask delegates to the inner Interviewer and records the Q&A pair.

func (*RecordingInterviewer) Recordings

func (r *RecordingInterviewer) Recordings() []QAPair

Recordings returns a copy of all recorded Q&A pairs.
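A queue-backed interviewer wrapped by a recorder can be sketched as follows. These are local, lowercase stand-ins rather than the package's types; askAll is a hypothetical helper, and skipping failed asks in the recording is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Interviewer and QAPair are local copies of the documented declarations.
type Interviewer interface {
	Ask(ctx context.Context, question string, options []string) (string, error)
}

type QAPair struct {
	Question string
	Options  []string
	Answer   string
}

// queueInterviewer answers from a pre-filled FIFO queue.
type queueInterviewer struct {
	mu      sync.Mutex
	answers []string
}

func (q *queueInterviewer) Ask(_ context.Context, _ string, _ []string) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.answers) == 0 {
		return "", errors.New("answer queue exhausted")
	}
	a := q.answers[0]
	q.answers = q.answers[1:]
	return a, nil
}

// recordingInterviewer delegates to inner and records successful exchanges.
type recordingInterviewer struct {
	mu    sync.Mutex
	inner Interviewer
	pairs []QAPair
}

func (r *recordingInterviewer) Ask(ctx context.Context, question string, options []string) (string, error) {
	answer, err := r.inner.Ask(ctx, question, options)
	if err != nil {
		return "", err // assumption: failed asks are not recorded
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.pairs = append(r.pairs, QAPair{Question: question, Options: options, Answer: answer})
	return answer, nil
}

// Recordings returns a copy so callers cannot mutate the internal slice.
func (r *recordingInterviewer) Recordings() []QAPair {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]QAPair(nil), r.pairs...)
}

// askAll asks each question in order, stopping at the first error.
func askAll(iv Interviewer, questions ...string) ([]string, error) {
	var out []string
	for _, q := range questions {
		a, err := iv.Ask(context.Background(), q, nil)
		if err != nil {
			return out, err
		}
		out = append(out, a)
	}
	return out, nil
}

func main() {
	rec := &recordingInterviewer{inner: &queueInterviewer{answers: []string{"yes", "no"}}}
	answers, err := askAll(rec, "Deploy?", "Retry?", "Again?")
	fmt.Println(answers, err)
	fmt.Println(len(rec.Recordings()))
}
```

Pairing the two is useful in tests: the queue makes the run deterministic, and the recording shows which questions the pipeline actually asked.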

type RestartConfig

type RestartConfig struct {
	MaxRestarts int // maximum number of restarts before giving up (default 5)
}

RestartConfig controls loop restart behavior.

func DefaultRestartConfig

func DefaultRestartConfig() *RestartConfig

DefaultRestartConfig returns a RestartConfig with sensible defaults.

type RetentionConfig

type RetentionConfig struct {
	MaxAge  time.Duration // prune runs older than this; 0 means no age limit
	MaxRuns int           // keep at most this many runs; 0 means unlimited
}

RetentionConfig specifies how long and how many runs to retain.

func (RetentionConfig) PruneByMaxRuns

func (rc RetentionConfig) PruneByMaxRuns(sink LogSink) (int, error)

PruneByMaxRuns removes the oldest runs that exceed the MaxRuns limit. Runs are sorted by start time; the oldest beyond the limit are deleted. Returns the number of runs pruned.
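The selection step can be sketched as a sort followed by a slice. The run type and its fields are hypothetical stand-ins for RunIndexEntry, whose fields are not shown in this section. toPrune only picks the run IDs and deletes nothing.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// run is a hypothetical stand-in for a run index entry.
type run struct {
	ID        string
	StartTime time.Time
}

// toPrune returns the IDs of the oldest runs beyond maxRuns, oldest first.
// A maxRuns of 0 means unlimited, matching the RetentionConfig field comment.
func toPrune(runs []run, maxRuns int) []string {
	if maxRuns <= 0 || len(runs) <= maxRuns {
		return nil
	}
	sorted := append([]run(nil), runs...) // copy so the caller's slice keeps its order
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].StartTime.Before(sorted[j].StartTime)
	})
	excess := sorted[:len(sorted)-maxRuns]
	ids := make([]string, 0, len(excess))
	for _, r := range excess {
		ids = append(ids, r.ID)
	}
	return ids
}

// sampleRuns returns three runs, one day apart, in unsorted order.
func sampleRuns() []run {
	base := time.Date(2026, 2, 1, 0, 0, 0, 0, time.UTC)
	return []run{
		{ID: "run-c", StartTime: base.Add(48 * time.Hour)},
		{ID: "run-a", StartTime: base},
		{ID: "run-b", StartTime: base.Add(24 * time.Hour)},
	}
}

func main() {
	fmt.Println(toPrune(sampleRuns(), 2))
	fmt.Println(toPrune(sampleRuns(), 1))
}
```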

func (RetentionConfig) PruneLoop

func (rc RetentionConfig) PruneLoop(ctx context.Context, sink LogSink, interval time.Duration)

PruneLoop runs periodic retention cleanup until ctx is cancelled. It prunes by MaxAge each interval. This blocks until the context is done.

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int // minimum 1 (1 = no retries)
	Backoff     BackoffConfig
	ShouldRetry func(error) bool
}

RetryPolicy controls how many times a node execution is retried on failure.
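How MaxAttempts and ShouldRetry interact can be sketched with a small loop. This is an illustration under assumptions, not the engine's retry logic: Backoff is omitted because BackoffConfig's fields are not shown in this section, a nil ShouldRetry is assumed to mean "retry every error", and retryPolicy, run, errTransient, and errFatal are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// retryPolicy mirrors two of the documented RetryPolicy fields.
type retryPolicy struct {
	MaxAttempts int // minimum 1 (1 = no retries)
	ShouldRetry func(error) bool
}

var (
	errTransient = errors.New("transient failure")
	errFatal     = errors.New("fatal failure")
)

// run calls fn until it succeeds, the policy declines to retry, or attempts run out.
// It returns the number of attempts made and the last error.
func run(p retryPolicy, fn func(attempt int) error) (int, error) {
	limit := p.MaxAttempts
	if limit < 1 {
		limit = 1 // enforce the documented minimum
	}
	var err error
	for attempt := 1; attempt <= limit; attempt++ {
		if err = fn(attempt); err == nil {
			return attempt, nil
		}
		if p.ShouldRetry != nil && !p.ShouldRetry(err) {
			return attempt, err
		}
		// A real loop would sleep here according to the backoff settings.
	}
	return limit, err
}

func main() {
	policy := retryPolicy{
		MaxAttempts: 3,
		ShouldRetry: func(err error) bool { return errors.Is(err, errTransient) },
	}

	// Fails twice with a retryable error, then succeeds.
	attempts, err := run(policy, func(attempt int) error {
		if attempt < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(attempts, err)

	// A non-retryable error stops the loop immediately.
	attempts, err = run(policy, func(int) error { return errFatal })
	fmt.Println(attempts, err)
}
```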

func RetryPolicyAggressive

func RetryPolicyAggressive() RetryPolicy

RetryPolicyAggressive returns a policy with 5 attempts and a higher initial delay.

func RetryPolicyLinear

func RetryPolicyLinear() RetryPolicy

RetryPolicyLinear returns a policy with 3 attempts and constant delay (factor=1.0).

func RetryPolicyNone

func RetryPolicyNone() RetryPolicy

RetryPolicyNone returns a policy with no retries (single attempt).

func RetryPolicyPatient

func RetryPolicyPatient() RetryPolicy

RetryPolicyPatient returns a policy with 3 attempts, high initial delay, and steep backoff.

func RetryPolicyStandard

func RetryPolicyStandard() RetryPolicy

RetryPolicyStandard returns a standard retry policy with 5 attempts and exponential backoff.
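The preset descriptions refer to an initial delay and a backoff factor. The sketch below shows how such parameters usually combine into a per-retry delay. The backoff struct, its field names, the cap, and the concrete durations are all assumptions made for illustration; BackoffConfig itself is not shown in this section.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoff is a hypothetical shape; the real BackoffConfig may differ.
type backoff struct {
	Initial time.Duration
	Factor  float64
	Max     time.Duration // 0 means no cap
}

// delay returns the wait before retry number n (n starts at 1):
// Initial * Factor^(n-1), capped at Max when Max is set.
func (b backoff) delay(n int) time.Duration {
	d := time.Duration(float64(b.Initial) * math.Pow(b.Factor, float64(n-1)))
	if b.Max > 0 && d > b.Max {
		return b.Max
	}
	return d
}

// delaysMillis lists the first few delays in milliseconds.
func delaysMillis(b backoff, retries int) []int64 {
	out := make([]int64, 0, retries)
	for n := 1; n <= retries; n++ {
		out = append(out, b.delay(n).Milliseconds())
	}
	return out
}

// linear uses factor 1.0, so every delay equals the initial delay.
func linear() backoff {
	return backoff{Initial: 500 * time.Millisecond, Factor: 1.0}
}

// exponential doubles the delay each time, up to a one second cap.
func exponential() backoff {
	return backoff{Initial: 200 * time.Millisecond, Factor: 2.0, Max: time.Second}
}

func main() {
	fmt.Println("linear:     ", delaysMillis(linear(), 3))
	fmt.Println("exponential:", delaysMillis(exponential(), 4))
}
```

A factor of 1.0 keeps the delay constant, which is what "linear" means in RetryPolicyLinear's description; larger factors grow the delay geometrically.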

type RunDirectory

type RunDirectory struct {
	BaseDir string
	RunID   string
}

RunDirectory represents the directory structure for a single pipeline run.

func NewRunDirectory

func NewRunDirectory(baseDir, runID string) (*RunDirectory, error)

NewRunDirectory creates a new run directory structure at baseDir/runID.

func (*RunDirectory) EnsureNodeDir

func (rd *RunDirectory) EnsureNodeDir(nodeID string) error

EnsureNodeDir creates the directory for a node if it doesn't exist.

func (*RunDirectory) ListNodeArtifacts

func (rd *RunDirectory) ListNodeArtifacts(nodeID string) ([]string, error)

ListNodeArtifacts returns the filenames of all artifacts for a node.

func (*RunDirectory) LoadCheckpoint

func (rd *RunDirectory) LoadCheckpoint() (*Checkpoint, error)

LoadCheckpoint loads a checkpoint from checkpoint.json in the run directory.

func (*RunDirectory) NodeDir

func (rd *RunDirectory) NodeDir(nodeID string) string

NodeDir returns the path for a node's artifact directory. The nodeID is sanitized to prevent path traversal attacks.
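The exact sanitization rules are not documented here. As an illustration of the idea, the sketch below reduces a node ID to a single safe path element before joining it to the run directory. The sanitizeNodeID helper and its character whitelist are assumptions, not the package's actual rules.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// sanitizeNodeID maps a node ID to one path element with no separators.
// This is one possible rule set, not necessarily what NodeDir uses.
func sanitizeNodeID(id string) string {
	var b strings.Builder
	for _, r := range id {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9',
			r == '-', r == '_', r == '.':
			b.WriteRune(r)
		default:
			// Path separators and anything unusual become underscores.
			b.WriteRune('_')
		}
	}
	s := b.String()
	// Reject names that would resolve to the current or parent directory.
	if s == "" || s == "." || s == ".." {
		return "_"
	}
	return s
}

func main() {
	for _, id := range []string{"build-1", "../../etc/passwd", ".."} {
		fmt.Println(filepath.Join("runs", "run-42", sanitizeNodeID(id)))
	}
}
```

Because the result never contains a path separator and is never "." or "..", the joined path cannot escape the run directory.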

func (*RunDirectory) ReadNodeArtifact

func (rd *RunDirectory) ReadNodeArtifact(nodeID, filename string) ([]byte, error)

ReadNodeArtifact reads data from a file within a node's directory.

func (*RunDirectory) SaveCheckpoint

func (rd *RunDirectory) SaveCheckpoint(cp *Checkpoint) error

SaveCheckpoint saves a checkpoint to checkpoint.json in the run directory.

func (*RunDirectory) WriteNodeArtifact

func (rd *RunDirectory) WriteNodeArtifact(nodeID, filename string, data []byte) error

WriteNodeArtifact writes data to a file within a node's directory.

func (*RunDirectory) WritePrompt

func (rd *RunDirectory) WritePrompt(nodeID, prompt string) error

WritePrompt writes a prompt to prompt.md in a node's directory.

func (*RunDirectory) WriteResponse

func (rd *RunDirectory) WriteResponse(nodeID, response string) error

WriteResponse writes a response to response.md in a node's directory.

type RunIndex

type RunIndex struct {
	Runs    map[string]RunIndexEntry `json:"runs"`
	Updated time.Time                `json:"updated"`
}

RunIndex is the top-level structure persisted as index.json in the store root. It maps run IDs to their metadata for fast enumeration and filtering.

type RunIndexEntry

type RunIndexEntry struct {
	ID         string    `json:"id"`
	Status     string    `json:"status"`
	StartTime  time.Time `json:"start_time"`
	EventCount int       `json:"event_count"`
}

RunIndexEntry holds metadata about a single run for fast lookup without scanning the full directory tree.
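To make the on-disk shape of index.json concrete, the sketch below marshals local copies of the two structs as documented above. The run ID, status value, and timestamps are made-up sample data, and sampleIndexJSON is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the two types as documented above.
type RunIndexEntry struct {
	ID         string    `json:"id"`
	Status     string    `json:"status"`
	StartTime  time.Time `json:"start_time"`
	EventCount int       `json:"event_count"`
}

type RunIndex struct {
	Runs    map[string]RunIndexEntry `json:"runs"`
	Updated time.Time                `json:"updated"`
}

// sampleIndexJSON encodes an index holding a single run.
func sampleIndexJSON() (string, error) {
	start := time.Date(2026, time.February, 17, 9, 0, 0, 0, time.UTC)
	idx := RunIndex{
		Runs: map[string]RunIndexEntry{
			"run-1": {ID: "run-1", Status: "completed", StartTime: start, EventCount: 12},
		},
		Updated: start.Add(time.Minute),
	}
	b, err := json.Marshal(idx)
	return string(b), err
}

func main() {
	s, err := sampleIndexJSON()
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(s)
}
```

Keying Runs by run ID lets a reader look up one entry directly, while the per-entry fields support filtering without opening each run's directory.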

type RunResult

type RunResult struct {
	FinalOutcome   *Outcome
	CompletedNodes []string
	NodeOutcomes   map[string]*Outcome
	Context        *Context
}

RunResult holds the final state of a completed pipeline execution.

type RunState

type RunState struct {
	ID             string         `json:"id"`
	PipelineFile   string         `json:"pipeline_file"`
	Status         string         `json:"status"` // "running", "completed", "failed", "cancelled"
	Source         string         `json:"source,omitempty"`
	SourceHash     string         `json:"source_hash,omitempty"`
	StartedAt      time.Time      `json:"started_at"`
	CompletedAt    *time.Time     `json:"completed_at,omitempty"`
	CurrentNode    string         `json:"current_node"`
	CompletedNodes []string       `json:"completed_nodes"`
	Context        map[string]any `json:"context"`
	Events         []EngineEvent  `json:"events"`
	Error          string         `json:"error,omitempty"`
}

RunState represents the full state of a single pipeline run.

type RunStateStore

type RunStateStore interface {
	Create(state *RunState) error
	Get(id string) (*RunState, error)
	Update(state *RunState) error
	List() ([]*RunState, error)
	AddEvent(id string, event EngineEvent) error
}

RunStateStore is the interface for persisting and retrieving pipeline run state.
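An in-memory implementation is a convenient way to see what the interface asks of a backend, for example in tests. The sketch below is hypothetical: RunState and EngineEvent are trimmed stand-ins, memStore is not part of the package, and the error behavior for missing or duplicate IDs is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// EngineEvent and RunState are trimmed stand-ins for the package types.
type EngineEvent struct{ Type string }

type RunState struct {
	ID     string
	Status string
	Events []EngineEvent
}

// RunStateStore repeats the interface documented above.
type RunStateStore interface {
	Create(state *RunState) error
	Get(id string) (*RunState, error)
	Update(state *RunState) error
	List() ([]*RunState, error)
	AddEvent(id string, event EngineEvent) error
}

var errNotFound = errors.New("run not found")

type memStore struct {
	mu   sync.RWMutex
	runs map[string]*RunState
}

func newMemStore() *memStore { return &memStore{runs: map[string]*RunState{}} }

// clone copies a state so callers never share memory with the store.
func clone(s *RunState) *RunState {
	c := *s
	c.Events = append([]EngineEvent(nil), s.Events...)
	return &c
}

func (m *memStore) Create(state *RunState) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.runs[state.ID]; ok {
		return fmt.Errorf("run %q already exists", state.ID)
	}
	m.runs[state.ID] = clone(state)
	return nil
}

func (m *memStore) Get(id string) (*RunState, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	s, ok := m.runs[id]
	if !ok {
		return nil, errNotFound
	}
	return clone(s), nil
}

func (m *memStore) Update(state *RunState) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.runs[state.ID]; !ok {
		return errNotFound
	}
	m.runs[state.ID] = clone(state)
	return nil
}

// List returns copies of all runs in unspecified order.
func (m *memStore) List() ([]*RunState, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make([]*RunState, 0, len(m.runs))
	for _, s := range m.runs {
		out = append(out, clone(s))
	}
	return out, nil
}

func (m *memStore) AddEvent(id string, event EngineEvent) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.runs[id]
	if !ok {
		return errNotFound
	}
	s.Events = append(s.Events, event)
	return nil
}

func main() {
	var store RunStateStore = newMemStore()
	_ = store.Create(&RunState{ID: "run-1", Status: "running"})
	_ = store.AddEvent("run-1", EngineEvent{Type: "stage_started"})
	got, err := store.Get("run-1")
	fmt.Println(got.Status, len(got.Events), err)
}
```

Returning copies from Get and List keeps callers from mutating stored state without going through Update or AddEvent.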

type Severity

type Severity int

Severity represents diagnostic severity level.

const (
	SeverityError Severity = iota
	SeverityWarning
	SeverityInfo
)

func (Severity) String

func (s Severity) String() string

String returns a human-readable name for the severity level.
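For an iota-based enum like this, String is typically a switch over the constants. The sketch below follows that pattern; the returned names ("error", "warning", "info") and the fallback format are assumptions, since the actual strings are not documented here.

```go
package main

import "fmt"

// Severity and its constants repeat the declarations documented above.
type Severity int

const (
	SeverityError Severity = iota
	SeverityWarning
	SeverityInfo
)

// String returns an assumed lowercase name for each level.
func (s Severity) String() string {
	switch s {
	case SeverityError:
		return "error"
	case SeverityWarning:
		return "warning"
	case SeverityInfo:
		return "info"
	default:
		// Unknown values stay printable instead of panicking.
		return fmt.Sprintf("Severity(%d)", int(s))
	}
}

func main() {
	// fmt uses the String method automatically for %s and %v.
	fmt.Printf("%s %v %s\n", SeverityError, SeverityWarning, Severity(7))
}
```

Because SeverityError is the zero value, an uninitialized Severity reads as the most severe level.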

type StageStatus

type StageStatus string

StageStatus represents the outcome of executing a node.

const (
	StatusSuccess        StageStatus = "success"
	StatusFail           StageStatus = "fail"
	StatusPartialSuccess StageStatus = "partial_success"
	StatusRetry          StageStatus = "retry"
	StatusSkipped        StageStatus = "skipped"
)

type StartHandler

type StartHandler struct{}

StartHandler handles the pipeline entry point node (shape=Mdiamond). It performs no work beyond recording the start time and returning success.

func (*StartHandler) Execute

func (h *StartHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute initializes context with a start timestamp and returns success.

func (*StartHandler) Type

func (h *StartHandler) Type() string

Type returns the handler type string "start".

type StyleRule

type StyleRule struct {
	Selector    string
	Properties  map[string]string
	Specificity int
}

StyleRule represents a single CSS-like rule with a selector, properties, and specificity.

type Stylesheet

type Stylesheet struct {
	Rules []StyleRule
}

Stylesheet is a collection of style rules parsed from a CSS-like DSL.

func ParseStylesheet

func ParseStylesheet(input string) (*Stylesheet, error)

ParseStylesheet parses a CSS-like stylesheet string into a Stylesheet. Supported selectors: * (universal, specificity=0), .class (specificity=1), #id (specificity=2).

func (*Stylesheet) Apply

func (ss *Stylesheet) Apply(g *Graph)

Apply applies the stylesheet rules to all nodes in the graph. Higher specificity rules override lower ones. Explicit node attributes override all stylesheet rules.

func (*Stylesheet) MatchNode

func (ss *Stylesheet) MatchNode(node *Node) map[string]string

MatchNode resolves all properties that apply to a node from the stylesheet rules, applying specificity ordering (higher specificity overrides lower).
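The specificity ordering can be sketched by applying matching rules from lowest to highest specificity, so later writes win. Everything below is illustrative: styleRule and node are local stand-ins, the way a node carries its class is an assumption, and the property names are invented sample data.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// styleRule mirrors the documented StyleRule fields.
type styleRule struct {
	Selector    string
	Properties  map[string]string
	Specificity int
}

// node is a hypothetical stand-in; how a real Node stores its class is assumed.
type node struct {
	ID    string
	Class string
}

// matches reports whether a selector applies to a node.
func matches(sel string, n node) bool {
	switch {
	case sel == "*":
		return true
	case strings.HasPrefix(sel, "."):
		return n.Class == sel[1:]
	case strings.HasPrefix(sel, "#"):
		return n.ID == sel[1:]
	}
	return false
}

// matchNode merges properties from all matching rules.
// Rules are applied in ascending specificity so higher ones overwrite lower ones.
func matchNode(rules []styleRule, n node) map[string]string {
	sorted := append([]styleRule(nil), rules...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Specificity < sorted[j].Specificity
	})
	out := map[string]string{}
	for _, r := range sorted {
		if !matches(r.Selector, n) {
			continue
		}
		for k, v := range r.Properties {
			out[k] = v
		}
	}
	return out
}

// resolveSample resolves properties for a node that matches all three rules.
func resolveSample() map[string]string {
	rules := []styleRule{
		{Selector: "#review", Properties: map[string]string{"model": "large"}, Specificity: 2},
		{Selector: "*", Properties: map[string]string{"model": "base", "temperature": "0.2"}, Specificity: 0},
		{Selector: ".fast", Properties: map[string]string{"model": "small"}, Specificity: 1},
	}
	return matchNode(rules, node{ID: "review", Class: "fast"})
}

func main() {
	props := resolveSample()
	fmt.Println(props["model"], props["temperature"])
}
```

The #id rule wins for model, while temperature falls through from the universal rule because no more specific rule sets it.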

type StylesheetApplicationTransform

type StylesheetApplicationTransform struct{}

StylesheetApplicationTransform applies the model_stylesheet graph attribute to all nodes using CSS-like specificity rules.

func (*StylesheetApplicationTransform) Apply

Apply parses and applies the model_stylesheet from graph attributes.

type SubPipelineTransform

type SubPipelineTransform struct{}

SubPipelineTransform is a Transform that scans for nodes with a "sub_pipeline" attribute, loads the referenced DOT file, and composes it into the graph. The insert node's ID is used as the namespace to avoid conflicts.

func (*SubPipelineTransform) Apply

func (t *SubPipelineTransform) Apply(g *Graph) *Graph

Apply scans the graph for nodes with a sub_pipeline attribute and inlines the referenced child graphs. If a sub-pipeline file cannot be loaded or composed, the node is left intact and the error is silently skipped.

type Subgraph

type Subgraph = dot.Subgraph

type TokenUsage

type TokenUsage struct {
	InputTokens      int `json:"input_tokens"`
	OutputTokens     int `json:"output_tokens"`
	TotalTokens      int `json:"total_tokens"`
	ReasoningTokens  int `json:"reasoning_tokens"`
	CacheReadTokens  int `json:"cache_read_tokens"`
	CacheWriteTokens int `json:"cache_write_tokens"`
}

TokenUsage tracks granular token consumption across an agent run, broken down by input, output, reasoning, and cache categories.

func (TokenUsage) Add

func (u TokenUsage) Add(other TokenUsage) TokenUsage

Add combines two TokenUsage values by summing all fields.
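Since Add takes and returns values, it suits accumulating usage across several agent turns. The sketch below re-declares the struct as documented and shows a field-by-field sum; the method body is an assumed implementation of the described behavior, and the numbers are sample data.

```go
package main

import "fmt"

// TokenUsage repeats the struct documented above.
type TokenUsage struct {
	InputTokens      int `json:"input_tokens"`
	OutputTokens     int `json:"output_tokens"`
	TotalTokens      int `json:"total_tokens"`
	ReasoningTokens  int `json:"reasoning_tokens"`
	CacheReadTokens  int `json:"cache_read_tokens"`
	CacheWriteTokens int `json:"cache_write_tokens"`
}

// Add returns a new value with every field summed; neither operand is modified.
func (u TokenUsage) Add(other TokenUsage) TokenUsage {
	return TokenUsage{
		InputTokens:      u.InputTokens + other.InputTokens,
		OutputTokens:     u.OutputTokens + other.OutputTokens,
		TotalTokens:      u.TotalTokens + other.TotalTokens,
		ReasoningTokens:  u.ReasoningTokens + other.ReasoningTokens,
		CacheReadTokens:  u.CacheReadTokens + other.CacheReadTokens,
		CacheWriteTokens: u.CacheWriteTokens + other.CacheWriteTokens,
	}
}

func main() {
	turns := []TokenUsage{
		{InputTokens: 100, OutputTokens: 50, TotalTokens: 150, CacheReadTokens: 20},
		{InputTokens: 200, OutputTokens: 100, TotalTokens: 300, ReasoningTokens: 40},
	}
	var total TokenUsage // the zero value is a valid starting point
	for _, t := range turns {
		total = total.Add(t)
	}
	fmt.Printf("%+v\n", total)
}
```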

type ToolCallEntry

type ToolCallEntry struct {
	ToolName string        `json:"tool_name"`
	CallID   string        `json:"call_id"`
	Duration time.Duration `json:"duration"`
	Output   string        `json:"output"` // truncated to 500 chars
}

ToolCallEntry records details about a single tool invocation during an agent run.

type ToolHandler

type ToolHandler struct{}

ToolHandler handles external tool execution nodes (shape=parallelogram). It reads the command from node attributes, executes it via the system shell, and captures stdout, stderr, and exit code.

func (*ToolHandler) Execute

func (h *ToolHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute runs the shell command specified in the node's "command" (or "prompt") attribute. It configures timeout, working directory, and environment variables from node attributes, then returns an Outcome with stdout, stderr, and exit code.
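Running a command through the shell while capturing stdout, stderr, and the exit code can be done with os/exec. The sketch below shows that general technique under stated assumptions: it uses a POSIX sh, and the result type, runShell, and runSample are hypothetical names rather than the handler's actual internals.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"os"
	"os/exec"
	"time"
)

// result is a hypothetical container for what the handler reports.
type result struct {
	Stdout   string
	Stderr   string
	ExitCode int
}

// runShell executes command via "sh -c" with an optional timeout,
// working directory, and extra environment variables.
func runShell(ctx context.Context, command, dir string, env []string, timeout time.Duration) (result, error) {
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	cmd := exec.CommandContext(ctx, "sh", "-c", command)
	cmd.Dir = dir // empty means the current directory
	cmd.Env = append(os.Environ(), env...)

	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	err := cmd.Run()
	res := result{Stdout: stdout.String(), Stderr: stderr.String()}

	// A non-zero exit is reported through ExitCode rather than as a Go error.
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) {
		res.ExitCode = exitErr.ExitCode()
		return res, nil
	}
	return res, err
}

// runSample writes to both streams and exits with status 3.
func runSample() (result, error) {
	return runShell(context.Background(),
		`echo "$GREETING"; echo oops 1>&2; exit 3`,
		"", []string{"GREETING=hello"}, 5*time.Second)
}

func main() {
	res, err := runSample()
	fmt.Printf("%q %q %d %v\n", res.Stdout, res.Stderr, res.ExitCode, err)
}
```

Separating "the command ran and failed" (ExitCode) from "the command could not be run" (error) lets a caller map the former to a failed Outcome without losing the captured output.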

func (*ToolHandler) Type

func (h *ToolHandler) Type() string

Type returns the handler type string "tool".

type Transform

type Transform interface {
	Apply(g *Graph) *Graph
}

Transform is the interface for AST transformations applied to a parsed graph.

func DefaultTransforms

func DefaultTransforms() []Transform

DefaultTransforms returns the standard ordered transform chain.
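Because each Transform takes a graph and returns a graph, an ordered chain reduces to a simple fold. The sketch below illustrates why order matters; Graph is a trimmed stand-in, and setAttr and applyAll are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// Graph is a trimmed stand-in for the package's graph type.
type Graph struct{ Attrs map[string]string }

// Transform repeats the interface documented above.
type Transform interface {
	Apply(g *Graph) *Graph
}

// setAttr is a toy transform that writes one graph attribute.
type setAttr struct{ key, value string }

func (s setAttr) Apply(g *Graph) *Graph {
	g.Attrs[s.key] = s.value
	return g
}

// applyAll runs the chain in order, feeding each result to the next transform.
func applyAll(g *Graph, chain []Transform) *Graph {
	for _, t := range chain {
		g = t.Apply(g)
	}
	return g
}

func main() {
	g := &Graph{Attrs: map[string]string{}}
	g = applyAll(g, []Transform{
		setAttr{"stage", "first"},
		setAttr{"stage", "second"}, // runs last, so its value wins
	})
	fmt.Println(g.Attrs["stage"])
}
```

Later transforms see the output of earlier ones, so a chain that inlines sub-pipelines before applying styles behaves differently from the reverse order.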

type VariableExpansionTransform

type VariableExpansionTransform struct{}

VariableExpansionTransform expands $variable references in node attributes using graph-level attribute values.

func (*VariableExpansionTransform) Apply

func (t *VariableExpansionTransform) Apply(g *Graph) *Graph

Apply expands $variable references in node prompt and label attributes.
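A minimal sketch of $variable expansion using a regular expression follows. The accepted identifier syntax and the choice to leave unknown references untouched are assumptions; the expand helper and the sample attribute names are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// varRef matches "$name" where name is an identifier (assumed syntax).
var varRef = regexp.MustCompile(`\$[A-Za-z_][A-Za-z0-9_]*`)

// expand replaces each $name in s with vars[name].
// References with no matching variable are left as written.
func expand(s string, vars map[string]string) string {
	return varRef.ReplaceAllStringFunc(s, func(match string) string {
		if v, ok := vars[match[1:]]; ok {
			return v
		}
		return match
	})
}

func main() {
	// graphAttrs stands in for graph-level attribute values.
	graphAttrs := map[string]string{"goal": "the parser", "lang": "Go"}
	fmt.Println(expand("Implement $goal in $lang, then ask $reviewer", graphAttrs))
}
```

Leaving unresolved references in place makes a missing graph attribute visible in the final prompt instead of silently producing an empty string.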

type WaitForHumanHandler

type WaitForHumanHandler struct {
	// Interviewer is the human interaction frontend. If nil, the handler
	// returns a failure indicating no interviewer is available.
	Interviewer Interviewer
}

WaitForHumanHandler handles human gate nodes (shape=hexagon). It presents choices derived from outgoing edges to a human via the Interviewer interface and returns their selection.

func (*WaitForHumanHandler) Execute

func (h *WaitForHumanHandler) Execute(ctx context.Context, node *Node, pctx *Context, store *ArtifactStore) (*Outcome, error)

Execute presents choices to a human and returns their selection. Choices are derived from outgoing edges of the node.

Supports optional node attributes:

  • timeout: Duration string (e.g. "5m", "1h") limiting how long to wait for human input.
  • default_choice: Edge label to select if the timeout expires.
  • reminder_interval: Duration string for periodic re-prompting (parsed and validated, but only effective if the Interviewer implementation supports it).

Context updates always include human.timed_out (bool) and human.response_time_ms (int64).
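The timeout and default_choice behavior can be sketched with a context deadline around the question. This is an assumed reading of the attributes above, not the handler's code: askFunc mirrors the Ask signature shown for RecordingInterviewer, while humanResult, askWithTimeout, and timedOutSample are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// askFunc has the same shape as the Ask method shown earlier in this package.
type askFunc func(ctx context.Context, question string, options []string) (string, error)

// humanResult carries the values the handler is documented to record.
type humanResult struct {
	Choice         string
	TimedOut       bool
	ResponseTimeMS int64
}

// askWithTimeout asks the question and falls back to defaultChoice
// when the timeout expires before an answer arrives.
func askWithTimeout(ctx context.Context, ask askFunc, question string, options []string,
	timeout time.Duration, defaultChoice string) (humanResult, error) {
	start := time.Now()
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	answer, err := ask(ctx, question, options)
	res := humanResult{Choice: answer, ResponseTimeMS: time.Since(start).Milliseconds()}
	if err != nil && errors.Is(ctx.Err(), context.DeadlineExceeded) && defaultChoice != "" {
		res.Choice, res.TimedOut = defaultChoice, true
		return res, nil
	}
	return res, err
}

// timedOutSample simulates a human who never answers within 20ms.
func timedOutSample() (humanResult, error) {
	timeout, err := time.ParseDuration("20ms") // same format as "5m" or "1h"
	if err != nil {
		return humanResult{}, err
	}
	neverAnswers := func(ctx context.Context, _ string, _ []string) (string, error) {
		<-ctx.Done()
		return "", ctx.Err()
	}
	return askWithTimeout(context.Background(), neverAnswers,
		"Ship it?", []string{"approve", "reject"}, timeout, "reject")
}

func main() {
	res, err := timedOutSample()
	fmt.Println(res.Choice, res.TimedOut, err)
}
```

Without a default choice, the same timeout surfaces as an error, which leaves the decision about how to proceed to the caller.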

func (*WaitForHumanHandler) Type

func (h *WaitForHumanHandler) Type() string

Type returns the handler type string "wait.human".

type Watchdog

type Watchdog struct {
	// contains filtered or unexported fields
}

Watchdog monitors active pipeline nodes and emits EventStageStalled warnings when a node has not made progress within the configured StallTimeout. It never cancels execution; it is purely an observability tool.

func NewWatchdog

func NewWatchdog(cfg WatchdogConfig, eventHandler func(EngineEvent)) *Watchdog

NewWatchdog creates a Watchdog with the given config and event handler. The event handler is called (from the watchdog goroutine) whenever a stall warning is emitted.

func (*Watchdog) ActiveNodes

func (w *Watchdog) ActiveNodes() []string

ActiveNodes returns a slice of node IDs currently being tracked. The order is non-deterministic.

func (*Watchdog) HandleEvent

func (w *Watchdog) HandleEvent(evt EngineEvent)

HandleEvent is a convenience method that routes engine events to NodeStarted or NodeFinished based on event type. This lets the watchdog be composed with another event handler in a pipeline's EventHandler chain.

func (*Watchdog) NodeFinished

func (w *Watchdog) NodeFinished(nodeID string)

NodeFinished removes a node from active tracking. After this call the watchdog will no longer consider the node for stall detection.

func (*Watchdog) NodeStarted

func (w *Watchdog) NodeStarted(nodeID string)

NodeStarted records that a node has become active. If the node was previously tracked and warned, the warning state is reset so a new stall can be detected if the node stalls again.

func (*Watchdog) Start

func (w *Watchdog) Start(ctx context.Context)

Start launches the background monitoring goroutine. It stops when ctx is cancelled.
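The tracking behavior described by NodeStarted, NodeFinished, and Start can be sketched as a mutex-guarded map checked on a ticker. The watchdog type below is a hypothetical reimplementation for illustration: it measures stalls from the node's start time only, reports through a plain callback instead of EngineEvent, and exposes a check method so the logic can be exercised without waiting.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type watchdog struct {
	mu      sync.Mutex
	started map[string]time.Time // nodeID -> when it became active
	warned  map[string]bool      // nodes already reported as stalled
	timeout time.Duration
	onStall func(nodeID string)
}

func newWatchdog(timeout time.Duration, onStall func(string)) *watchdog {
	return &watchdog{
		started: map[string]time.Time{},
		warned:  map[string]bool{},
		timeout: timeout,
		onStall: onStall,
	}
}

// NodeStarted begins tracking and clears any earlier warning for the node.
func (w *watchdog) NodeStarted(id string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.started[id] = time.Now()
	delete(w.warned, id)
}

// NodeFinished stops tracking the node.
func (w *watchdog) NodeFinished(id string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	delete(w.started, id)
	delete(w.warned, id)
}

// check warns once per stalled node; it never cancels anything.
func (w *watchdog) check(now time.Time) {
	w.mu.Lock()
	var stalled []string
	for id, at := range w.started {
		if !w.warned[id] && now.Sub(at) >= w.timeout {
			w.warned[id] = true
			stalled = append(stalled, id)
		}
	}
	w.mu.Unlock()
	// Callbacks run outside the lock so they may call back into the watchdog.
	for _, id := range stalled {
		w.onStall(id)
	}
}

// Start runs check on every tick until ctx is cancelled.
func (w *watchdog) Start(ctx context.Context, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case now := <-ticker.C:
				w.check(now)
			}
		}
	}()
}

// stallSample drives check directly with a synthetic clock.
func stallSample() []string {
	var stalled []string
	w := newWatchdog(5*time.Minute, func(id string) { stalled = append(stalled, id) })
	w.NodeStarted("build")
	later := time.Now().Add(10 * time.Minute)
	w.check(later) // past the timeout: warns
	w.check(later) // already warned: silent
	w.NodeFinished("build")
	w.check(later) // no longer tracked: silent
	return stalled
}

func main() {
	fmt.Println(stallSample())
}
```

Warning once per start keeps a long-stalled node from flooding the event stream on every check interval.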

type WatchdogConfig

type WatchdogConfig struct {
	StallTimeout  time.Duration // how long before a node is considered stalled
	CheckInterval time.Duration // how often to check for stalls
}

WatchdogConfig holds configuration for the stall-detection watchdog.

func DefaultWatchdogConfig

func DefaultWatchdogConfig() WatchdogConfig

DefaultWatchdogConfig returns a WatchdogConfig with sensible defaults: 5 minute stall timeout and 10 second check interval.
