Documentation ¶
Overview ¶
Package flow implements Harbor's Flow-as-Tool registration (RFC §6.1, D-023): a typed DAG of Nodes assembled into a runnable Engine via `Compose(def)`, then registered as a single Tool in the catalog via `RegisterAsTool(catalog, def, eng)`. The planner sees one Tool with an args/result schema; invoking it runs the underlying DAG with the runtime's full reliability shell — per-node `NodePolicy` plus an aggregate `Budget` enforced at the flow boundary.
The flow's Budget composes with the parent run's budget and the identity-tier ceilings by taking `min()` on each axis (deadline / hop budget / cost cap); whichever limit fires first aborts the flow with `ErrFlowBudgetExceeded`. Identity-tier governance budgets (Phase 36a, not yet shipped) will compose through the same `min()` path, so Budget composition is open-ended.
Layering rule (D-024): when a flow is invoked AS a tool, the dispatcher's `ToolPolicy` wraps the OUTER invocation; the per-node `NodePolicy` runs INSIDE the flow's engine. No double-wrapping at any single layer.
Concurrent reuse contract (D-025): a composed `engine.Engine` is reusable across invocations; each invocation gets its own per-call Budget accumulator (lock-free atomic counters) so budget state never bleeds between concurrent flow invocations.
Index ¶
- Constants
- Variables
- func Compose(def Definition, opts ...ComposeOption) (engine.Engine, error)
- func RegisterAsTool(cat tools.ToolCatalog, def Definition, eng engine.Engine) (tools.Tool, error)
- func WithBudget(ctx context.Context, b Budget) context.Context
- type Budget
- type BudgetExceededPayload
- type ComposeOption
- type Definition
- type Metadata
- type NodeID
- type NodeRunRecord
- type NodeSpec
- type Registry
- func (r *Registry) Definition(name string) (Definition, Metadata, bool)
- func (r *Registry) Names() []string
- func (r *Registry) RecordRun(rec RunRecord) error
- func (r *Registry) Register(def Definition, meta Metadata) error
- func (r *Registry) RunByID(runID string) (RunRecord, bool)
- func (r *Registry) Runs(name string) ([]RunRecord, bool)
- type RunRecord
Constants ¶
const (
	// EventTypeFlowBudgetExceeded is emitted by invokeFlow when
	// any axis of the per-call Budget accumulator fires
	// (deadline / hop_budget / cost_cap). Carries the flow name
	// + the triggering axis. SafePayload.
	EventTypeFlowBudgetExceeded events.EventType = "flow.budget_exceeded"
)
Phase 26a flow event types.
Variables ¶
var (
	// ErrFlowBudgetExceeded is returned when a run exceeds any axis of
	// its Budget (deadline, hop budget, or cost cap).
	ErrFlowBudgetExceeded = errors.New("flow: budget exceeded")

	// ErrFlowInvalidDefinition is returned when a Definition fails
	// structural validation.
	ErrFlowInvalidDefinition = errors.New("flow: invalid definition")

	// ErrFlowEntryExitMismatch is returned when a Definition's entry or
	// exit node is not present in the graph.
	ErrFlowEntryExitMismatch = errors.New("flow: entry/exit node not in graph")
)
Sentinel errors returned by the flow engine. Callers compare against them with errors.Is.
Functions ¶
func Compose ¶
func Compose(def Definition, opts ...ComposeOption) (engine.Engine, error)
Compose builds a runnable engine.Engine from a Definition. Engine sizing: each channel queue holds 256 entries by default, which is large enough to absorb burst-concurrent invocations (N=100+) without blocking the worker on the dispatcher's anyRun channel. Use WithComposeQueueSize to override the default.
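To make the sizing argument concrete, here is a small sketch using a plain buffered channel. It is not the engine's implementation: the `trySend` and `burst` helpers are hypothetical, and the non-blocking send is used only to make "the queue is full" observable, whereas a real sender would more likely block at that point.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the queue
// had room for the value.
func trySend(q chan<- int, v int) bool {
	select {
	case q <- v:
		return true
	default:
		return false
	}
}

// burst enqueues n items into a queue of the given capacity while no
// consumer is running, and returns how many were accepted immediately.
func burst(capacity, n int) int {
	q := make(chan int, capacity)
	accepted := 0
	for i := 0; i < n; i++ {
		if trySend(q, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(burst(256, 100)) // the whole burst fits in the buffer
	fmt.Println(burst(64, 100))  // a smaller buffer fills up partway through
}
```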
func RegisterAsTool ¶
func RegisterAsTool(cat tools.ToolCatalog, def Definition, eng engine.Engine) (tools.Tool, error)
RegisterAsTool wires a composed Engine into the Tool catalog with `Transport: TransportFlow`.
Types ¶
type BudgetExceededPayload ¶
type BudgetExceededPayload struct {
	events.SafeSealed
	FlowName string
	Axis     string
}
BudgetExceededPayload is the typed payload for EventTypeFlowBudgetExceeded.
Axis names: "deadline", "hop_budget", "cost_cap".
type ComposeOption ¶
type ComposeOption func(*composeConfig)
ComposeOption configures a Compose call.
func WithComposeQueueSize ¶
func WithComposeQueueSize(n int) ComposeOption
WithComposeQueueSize overrides the engine's per-channel queue capacity (default 256). Higher values absorb more burst concurrency at the cost of memory; lower values apply tighter backpressure.
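`ComposeOption` follows Go's functional-options pattern. The sketch below mirrors the declared shapes with local types; the `resolve` helper and the rejection of non-positive sizes are assumptions made for illustration, not documented behaviour of `Compose`.

```go
package main

import (
	"errors"
	"fmt"
)

// composeConfig and ComposeOption mirror the documented shapes locally.
type composeConfig struct{ queueSize int }

type ComposeOption func(*composeConfig)

// WithComposeQueueSize returns an option that overrides the queue size.
func WithComposeQueueSize(n int) ComposeOption {
	return func(c *composeConfig) { c.queueSize = n }
}

// resolve is a hypothetical helper: start from the default of 256,
// apply each option in order, then validate the result.
func resolve(opts ...ComposeOption) (composeConfig, error) {
	cfg := composeConfig{queueSize: 256}
	for _, opt := range opts {
		if opt != nil {
			opt(&cfg)
		}
	}
	if cfg.queueSize <= 0 { // assumed validation rule
		return composeConfig{}, errors.New("compose: queue size must be positive")
	}
	return cfg, nil
}

func main() {
	def, _ := resolve()
	big, _ := resolve(WithComposeQueueSize(1024))
	_, err := resolve(WithComposeQueueSize(0))
	fmt.Println(def.queueSize, big.queueSize, err)
}
```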
func WithRunErrorHandler ¶ added in v1.3.0
func WithRunErrorHandler(h engine.RunErrorHandler) ComposeOption
WithRunErrorHandler forwards a run-error handler to the composed engine (`engine.WithRunErrorHandler`; Phase 111f, D-203). An embedder composing flows passes the assembly's handler (`assemble.Stack.RunErrorHandler`; see docs/recipes/observe-an-embedded-runtime.md), which routes the structured engine.RunError through `telemetry.Logger.Error`, so a terminal node failure on a flow-as-tool run emits the paired `runtime.error` bus event (RFC §6.14). The shipped binary registers flow Definitions without composing an engine, so the only code paths that currently exercise this option are the recipe and the flow-as-tool failure E2E test. A nil handler is a no-op: the engine's slog path still logs the failure (the Phase 10 behaviour).
type Definition ¶
type Definition struct {
	Name        string
	Description string
	Entry       NodeID
	Exit        NodeID
	Nodes       map[NodeID]NodeSpec
	Budget      Budget
	InSchema    json.RawMessage
	OutSchema   json.RawMessage
}
Definition is the canonical Go shape for a Flow.
func WithSchemasFrom ¶
func WithSchemasFrom[I any, O any](def Definition) (Definition, error)
WithSchemasFrom annotates a Definition with reflection-derived InSchema / OutSchema from the given Go input/output types.
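One way a schema could be derived from a Go type by reflection is sketched below. This is not the package's algorithm: `schemaFor`, `jsonType`, and `searchArgs` are hypothetical names, only flat structs with scalar fields are handled, and the emitted JSON Schema subset (`type` and `properties` only) is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

// searchArgs is a hypothetical flow input type.
type searchArgs struct {
	Query string `json:"query"`
	Limit int    `json:"limit"`
}

// jsonType maps a scalar Go kind to a JSON Schema type name.
func jsonType(k reflect.Kind) (string, bool) {
	switch k {
	case reflect.String:
		return "string", true
	case reflect.Bool:
		return "boolean", true
	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
		return "integer", true
	case reflect.Float32, reflect.Float64:
		return "number", true
	}
	return "", false
}

// schemaFor derives a minimal object schema from the struct type T.
func schemaFor[T any]() (json.RawMessage, error) {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() != reflect.Struct {
		return nil, fmt.Errorf("schemaFor: %s is not a struct", t)
	}
	props := map[string]any{}
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		if !f.IsExported() {
			continue
		}
		name := f.Name
		if tag, ok := f.Tag.Lookup("json"); ok {
			n, _, _ := strings.Cut(tag, ",")
			if n == "-" {
				continue // field is excluded from JSON
			}
			if n != "" {
				name = n
			}
		}
		jt, ok := jsonType(f.Type.Kind())
		if !ok {
			return nil, fmt.Errorf("schemaFor: unsupported field %s (%s)", f.Name, f.Type)
		}
		props[name] = map[string]string{"type": jt}
	}
	b, err := json.Marshal(map[string]any{"type": "object", "properties": props})
	if err != nil {
		return nil, err
	}
	return b, nil
}

func main() {
	schema, err := schemaFor[searchArgs]()
	fmt.Println(string(schema), err)
}
```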
func (Definition) Validate ¶
func (d Definition) Validate() error
Validate runs structural checks on the Definition.
type Metadata ¶
type Metadata struct {
	// Owner is the agent / team that registered the flow. May be empty.
	Owner string
	// Version is the flow's version string. May be empty.
	Version string
	// PlannerFamily is the graph-family planner the flow runs on:
	// "graph" / "workflow" / "deterministic". Empty defaults to "graph".
	PlannerFamily string
	// Source is the source-of-truth reference: a Go path or a YAML
	// descriptor path (D-023: Go-coded V1; YAML V1.1). A string
	// reference, never executable code.
	Source string
}
Metadata carries the Console-facing descriptive fields a registered flow advertises beyond its runnable Definition. The runnable Definition (the engine graph) is tenant-agnostic; Metadata is the catalog-row decoration the Flows page (Phase 73i) renders.
type NodeRunRecord ¶
type NodeRunRecord struct {
	// NodeID is the node's identifier within the flow.
	NodeID string
	// Status is the node's outcome within the run.
	Status string
	// Duration is the node's wall-clock duration.
	Duration time.Duration
	// Retries is the number of times the node was retried.
	Retries int
	// ErrorClass is a short classification of a node failure.
	ErrorClass string
}
NodeRunRecord is one node's slice of a run's per-node timeline.
type Registry ¶
type Registry struct {
	// contains filtered or unexported fields
}
Registry is the runtime's source-of-truth for registered flows and their run history. It is the seam the Phase 73i Console Flows-page Catalog reads from. It is NOT a test stub — it is a real runtime subsystem: a flow registers into it at agent-definition time, the run loop records each invocation, and the Console projects the catalog from it.
Concurrent reuse (D-025): the Registry is safe for N concurrent callers — every field access is guarded by an RWMutex. Registration and run recording take the write lock; catalog reads take the read lock. The Registry holds no per-call state.
func (*Registry) Definition ¶
func (r *Registry) Definition(name string) (Definition, Metadata, bool)
Definition returns a registered flow's runnable Definition + its Metadata. The bool is false when the name is not registered.
func (*Registry) RecordRun ¶
func (r *Registry) RecordRun(rec RunRecord) error
RecordRun appends a RunRecord to the named flow's run-history ring. An unknown flow name fails loud. The ring is bounded: once it is full, the oldest record is dropped to make room for the new one.
func (*Registry) Register ¶
func (r *Registry) Register(def Definition, meta Metadata) error
Register adds a flow Definition + its catalog Metadata to the Registry. A flow with an empty name, or a name already registered, fails loud — registration is not silently overwritten (CLAUDE.md §5).
type RunRecord ¶
type RunRecord struct {
	// RunID is the run's stable identifier.
	RunID string
	// FlowName is the registered flow this run executed.
	FlowName string
	// Identity is the (tenant, user, session) the run executed under.
	Identity identity.Identity
	// Trigger is what initiated the run: "user" / "planner" / "system".
	Trigger string
	// Status is the run's outcome: "running" / "succeeded" / "failed"
	// / "cancelled".
	Status string
	// StartedAt is the wall-clock time the run started.
	StartedAt time.Time
	// Duration is the run's wall-clock duration. Zero for an in-flight
	// run.
	Duration time.Duration
	// CostUSD is the run's recorded cost.
	CostUSD float64
	// ErrorClass is a short classification of a failure. Empty for a
	// non-failed run.
	ErrorClass string
	// NodeStates is the run's per-node execution timeline.
	NodeStates []NodeRunRecord
	// Output is the run's final output preview (post-redaction). The
	// Catalog applies the D-026 heavy-content bypass before shipping it.
	Output string
}
RunRecord is a single recorded invocation of a registered flow. The Registry keeps a bounded run-history ring per flow; the Flows-page Catalog (Phase 73i) projects these into the wire `FlowRun` rows.
A RunRecord is identity-scoped: run history is tenant-scoped (a non-admin Console caller sees only their own tenant's runs), so the record carries the full triple it executed under.
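Tenant scoping of run history could look like the following filter. The `Identity` field names (`Tenant`, `User`, `Session`) are assumed from the "(tenant, user, session)" triple, `visibleRuns` is a hypothetical helper, and letting admin callers see every tenant's runs is an inference from the wording above rather than documented behaviour.

```go
package main

import "fmt"

// Identity is a local stand-in for identity.Identity with assumed fields.
type Identity struct{ Tenant, User, Session string }

// RunRecord is trimmed to the fields this sketch needs.
type RunRecord struct {
	RunID    string
	Identity Identity
}

// visibleRuns returns the runs a caller may see: a non-admin caller is
// limited to runs recorded under their own tenant.
func visibleRuns(all []RunRecord, caller Identity, isAdmin bool) []RunRecord {
	out := make([]RunRecord, 0, len(all))
	for _, r := range all {
		if isAdmin || r.Identity.Tenant == caller.Tenant {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	history := []RunRecord{
		{RunID: "r1", Identity: Identity{Tenant: "acme", User: "ana"}},
		{RunID: "r2", Identity: Identity{Tenant: "globex", User: "bo"}},
		{RunID: "r3", Identity: Identity{Tenant: "acme", User: "cy"}},
	}
	caller := Identity{Tenant: "acme", User: "ana"}
	fmt.Println(len(visibleRuns(history, caller, false)))
	fmt.Println(len(visibleRuns(history, caller, true)))
}
```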