flow

package
v1.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package flow implements Harbor's Flow-as-Tool registration (RFC §6.1, D-023): a typed DAG of Nodes assembled into a runnable Engine via `Compose(def)`, then registered as a single Tool in the catalog via `RegisterAsTool(catalog, def, eng)`. The planner sees one Tool with an args/result schema; invoking it runs the underlying DAG with the runtime's full reliability shell — per-node `NodePolicy` plus an aggregate `Budget` enforced at the flow boundary.

Composition with the parent run + identity-tier ceilings is via `min()` on each axis (deadline / hop budget / cost cap); whichever fires first aborts the flow with `ErrFlowBudgetExceeded`. Identity- tier governance budgets (Phase 36a, not yet shipped) compose uniformly through the same min() path — the Budget composition is open-ended.

Layering rule (D-024): when a flow is invoked AS a tool, the dispatcher's `ToolPolicy` wraps the OUTER invocation; the per-node `NodePolicy` runs INSIDE the flow's engine. No double-wrapping at any single layer.

Concurrent reuse contract (D-025): a composed `engine.Engine` is reusable across invocations; each invocation gets its own per-call Budget accumulator (lock-free atomic counters) so budget state never bleeds between concurrent flow invocations.

Index

Constants

View Source
const (
	// EventTypeFlowBudgetExceeded — emitted by invokeFlow when
	// any axis of the per-call Budget accumulator fires
	// (deadline / hop_budget / cost_cap). Carries the flow name
	// + the triggering axis. SafePayload.
	EventTypeFlowBudgetExceeded events.EventType = "flow.budget_exceeded"
)

Phase 26a flow event types.

Variables

View Source
var (
	// ErrFlowBudgetExceeded is returned when a run exceeds its
	// configured step/hop budget.
	ErrFlowBudgetExceeded = errors.New("flow: budget exceeded")
	// ErrFlowInvalidDefinition is returned when a Definition fails
	// structural validation.
	ErrFlowInvalidDefinition = errors.New("flow: invalid definition")
	// ErrFlowEntryExitMismatch is returned when a Definition's entry or
	// exit node is not present in the graph.
	ErrFlowEntryExitMismatch = errors.New("flow: entry/exit node not in graph")
)

Sentinel errors returned by the flow engine. Callers compare against them with errors.Is.

Functions

func Compose

func Compose(def Definition, opts ...ComposeOption) (engine.Engine, error)

Compose builds a runnable engine.Engine from a Definition. Engine sizing: per-channel queue size of 256 by default — large enough to absorb burst-concurrent invocations (N=100+) without blocking the worker on the dispatcher's anyRun channel.

func RegisterAsTool

func RegisterAsTool(cat tools.ToolCatalog, def Definition, eng engine.Engine) (tools.Tool, error)

RegisterAsTool wires a composed Engine into the Tool catalog with `Transport: TransportFlow`.

func WithBudget

func WithBudget(ctx context.Context, b Budget) context.Context

WithBudget attaches a parent Budget to ctx.

Types

type Budget

type Budget struct {
	Deadline  time.Duration
	HopBudget int
	CostCap   float64
}

Budget is the per-flow aggregate cap.

type BudgetExceededPayload

type BudgetExceededPayload struct {
	events.SafeSealed
	FlowName string
	Axis     string
}

BudgetExceededPayload is the typed payload for EventTypeFlowBudgetExceeded.

Axis names: "deadline", "hop_budget", "cost_cap".

type ComposeOption

type ComposeOption func(*composeConfig)

ComposeOption configures a Compose call.

func WithComposeQueueSize

func WithComposeQueueSize(n int) ComposeOption

WithComposeQueueSize overrides the engine's per-channel queue capacity (default 256). Higher values absorb more burst concurrency at the cost of memory; lower values apply tighter backpressure.

func WithRunErrorHandler added in v1.3.0

func WithRunErrorHandler(h engine.RunErrorHandler) ComposeOption

WithRunErrorHandler forwards a run-error handler to the composed engine (`engine.WithRunErrorHandler` — Phase 111f, D-203). An embedder composing flows passes the assembly's handler (`assemble.Stack.RunErrorHandler` — see docs/recipes/observe-an-embedded-runtime.md), which routes the structured engine.RunError through `telemetry.Logger.Error` so a terminal node failure on a flow-as-tool run emits the paired `runtime.error` bus event (RFC §6.14). The shipped binary registers flow Definitions without composing an engine, so the recipe + the flow-as-tool failure E2E are the seam's executing consumers. Nil is a no-op: the engine's slog path still logs the failure (the Phase 10 behaviour).

type Definition

type Definition struct {
	Name        string
	Description string
	Entry       NodeID
	Exit        NodeID
	Nodes       map[NodeID]NodeSpec
	Budget      Budget
	InSchema    json.RawMessage
	OutSchema   json.RawMessage
}

Definition is the canonical Go shape for a Flow.

func WithSchemasFrom

func WithSchemasFrom[I any, O any](def Definition) (Definition, error)

WithSchemasFrom annotates a Definition with reflection-derived InSchema / OutSchema from the given Go input/output types.

func (Definition) Validate

func (d Definition) Validate() error

Validate runs structural checks on the Definition.

type Metadata

type Metadata struct {
	// Owner is the agent / team that registered the flow. May be empty.
	Owner string
	// Version is the flow's version string. May be empty.
	Version string
	// PlannerFamily is the graph-family planner the flow runs on —
	// "graph" / "workflow" / "deterministic". Empty defaults to "graph".
	PlannerFamily string
	// Source is the source-of-truth reference — a Go path or a YAML
	// descriptor path (D-023: Go-coded V1; YAML V1.1). A string
	// reference, never executable code.
	Source string
}

Metadata carries the Console-facing descriptive fields a registered flow advertises beyond its runnable Definition. The runnable Definition (the engine graph) is tenant-agnostic; Metadata is the catalog-row decoration the Flows page (Phase 73i) renders.

type NodeID

type NodeID string

NodeID is the in-flow identifier for a node.

type NodeRunRecord

type NodeRunRecord struct {
	// NodeID is the node's identifier within the flow.
	NodeID string
	// Status is the node's outcome within the run.
	Status string
	// Duration is the node's wall-clock duration.
	Duration time.Duration
	// Retries is the number of times the node was retried.
	Retries int
	// ErrorClass is a short classification of a node failure.
	ErrorClass string
}

NodeRunRecord is one node's slice of a run's per-node timeline.

type NodeSpec

type NodeSpec struct {
	Name   string
	Func   engine.NodeFunc
	Policy engine.NodePolicy
	To     []NodeID
}

NodeSpec describes a single node in a Flow Definition.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the runtime's source-of-truth for registered flows and their run history. It is the seam the Phase 73i Console Flows-page Catalog reads from. It is NOT a test stub — it is a real runtime subsystem: a flow registers into it at agent-definition time, the run loop records each invocation, and the Console projects the catalog from it.

Concurrent reuse (D-025): the Registry is safe for N concurrent callers — every field access is guarded by an RWMutex. Registration and run recording take the write lock; catalog reads take the read lock. The Registry holds no per-call state.

func NewRegistry

func NewRegistry() *Registry

NewRegistry builds an empty flow Registry.

func (*Registry) Definition

func (r *Registry) Definition(name string) (Definition, Metadata, bool)

Definition returns a registered flow's runnable Definition + its Metadata. The bool is false when the name is not registered.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns the registered flow names, sorted lexicographically.

func (*Registry) RecordRun

func (r *Registry) RecordRun(rec RunRecord) error

RecordRun appends a RunRecord to the named flow's run-history ring. An unknown flow name fails loud. The ring is bounded — the oldest record is dropped once the ring is full.

func (*Registry) Register

func (r *Registry) Register(def Definition, meta Metadata) error

Register adds a flow Definition + its catalog Metadata to the Registry. A flow with an empty name, or a name already registered, fails loud — registration is not silently overwritten (CLAUDE.md §5).

func (*Registry) RunByID

func (r *Registry) RunByID(runID string) (RunRecord, bool)

RunByID finds a run record by its RunID across every registered flow. The bool is false when no flow holds a run with that id.

func (*Registry) Runs

func (r *Registry) Runs(name string) ([]RunRecord, bool)

Runs returns a copy of the named flow's run-history ring. The bool is false when the name is not registered. The caller is free to mutate the returned slice — it is a defensive copy.

type RunRecord

type RunRecord struct {
	// RunID is the run's stable identifier.
	RunID string
	// FlowName is the registered flow this run executed.
	FlowName string
	// Identity is the (tenant, user, session) the run executed under.
	Identity identity.Identity
	// Trigger is what initiated the run — "user" / "planner" / "system".
	Trigger string
	// Status is the run's outcome — "running" / "succeeded" / "failed"
	// / "cancelled".
	Status string
	// StartedAt is the wall-clock time the run started.
	StartedAt time.Time
	// Duration is the run's wall-clock duration. Zero for an in-flight
	// run.
	Duration time.Duration
	// CostUSD is the run's recorded cost.
	CostUSD float64
	// ErrorClass is a short classification of a failure. Empty for a
	// non-failed run.
	ErrorClass string
	// NodeStates is the run's per-node execution timeline.
	NodeStates []NodeRunRecord
	// Output is the run's final output preview (post-redaction). The
	// Catalog applies the D-026 heavy-content bypass before shipping it.
	Output string
}

RunRecord is a single recorded invocation of a registered flow. The Registry keeps a bounded run-history ring per flow; the Flows-page Catalog (Phase 73i) projects these into the wire `FlowRun` rows.

A RunRecord is identity-scoped: run history is tenant-scoped (a non-admin Console caller sees only their own tenant's runs), so the record carries the full triple it executed under.

Directories

Path Synopsis
Package protocol implements the runtime side of the Console Flows page (Phase 73i / D-117).
Package protocol implements the runtime side of the Console Flows page (Phase 73i / D-117).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL