Documentation
¶
Overview ¶
Package protocol implements the runtime side of the Console Flows page (Phase 73i / D-117). It is the transport-agnostic surface the six `flows.*` Protocol methods dispatch through:
- flows.list — paginated catalog of registered flows
- flows.describe — a flow's full engine-graph description
- flows.runs.list — a flow's paginated run history
- flows.runs.describe — a single run's per-node timeline
- flows.run — invoke a one-shot run of a flow (mutating)
- flows.metrics — a flow's time-bucketed sparkline metrics
Five methods are read-only; `flows.run` is the single mutating method. The Flows page is view-only at V1 (D-063) — there is no authoring surface here, by construction.
The source-of-truth seam (§4.4) ¶
The Surface depends only on two interfaces — Catalog (the registered flows + their engine-graph descriptions + run history) and Invoker (the one-shot run launcher). The concrete Catalog is the runtime's flow registry; the concrete Invoker is the task registry's `start` path. Keeping the Surface behind interfaces means the wire surface is testable with deterministic fixtures (no live engine) and the concrete registry can evolve without a Protocol-shape break.
Multi-isolation (CLAUDE.md §6) ¶
Identity is mandatory on every method — an incomplete triple fails closed with ErrIdentityRequired. The catalog is per-runtime (flow definitions are tenant-agnostic descriptors), but run history is tenant-scoped: a non-admin caller sees only their own tenant's runs. A cross-tenant filter (a `Tenants` value reaching outside the caller's own tenant, or naming more than one tenant) requires the verified `auth.ScopeAdmin` claim (D-079) — the Surface fails closed with ErrCrossTenantScope when the claim is absent.
The mutating gate (D-079) ¶
`flows.run` mutates: it launches a run. It is gated on identity AND the verified `auth.ScopeAdmin` claim. The Surface never mints a new scope (D-079 closed two-scope set) — `auth.ScopeAdmin` is the run entitlement. A request without the claim fails closed with ErrRunScopeRequired.
Concurrent reuse (D-025) ¶
A Surface is a compiled artifact: every field is set once at construction and never mutated. ServeMethod holds no per-call state — per-call data flows through the request argument. One Surface is safe for N concurrent callers under -race; concurrent_reuse_test.go pins N≥100.
Index ¶
- Variables
- type Catalog
- type CatalogOption
- type FuncInvoker
- type Invoker
- type LaunchFunc
- type RegistryCatalog
- func (c *RegistryCatalog) DescribeFlow(ctx context.Context, id identity.Identity, adminScoped bool, flowID string) (prototypes.FlowDescription, error)
- func (c *RegistryCatalog) DescribeRun(ctx context.Context, id identity.Identity, adminScoped bool, runID string) (prototypes.FlowRunDescription, error)
- func (c *RegistryCatalog) FlowMetrics(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, ...) (prototypes.FlowMetrics, error)
- func (c *RegistryCatalog) ListFlows(ctx context.Context, id identity.Identity, adminScoped bool) ([]prototypes.Flow, error)
- func (c *RegistryCatalog) ListRuns(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, ...) ([]prototypes.FlowRun, error)
- type Surface
- func (s *Surface) Describe(ctx context.Context, req prototypes.FlowDescribeRequest, adminScoped bool) (prototypes.FlowDescription, error)
- func (s *Surface) List(ctx context.Context, req prototypes.FlowListRequest, adminScoped bool) (prototypes.FlowListResponse, error)
- func (s *Surface) Metrics(ctx context.Context, req prototypes.FlowMetricsRequest, adminScoped bool) (prototypes.FlowMetrics, error)
- func (s *Surface) Run(ctx context.Context, req prototypes.FlowRunRequest, adminScoped bool) (prototypes.FlowRunResponse, error)
- func (s *Surface) RunsDescribe(ctx context.Context, req prototypes.FlowRunDescribeRequest, adminScoped bool) (prototypes.FlowRunDescription, error)
- func (s *Surface) RunsList(ctx context.Context, req prototypes.FlowRunsListRequest, adminScoped bool) (prototypes.FlowRunsListResponse, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrMisconfigured — NewSurface was called with a nil mandatory // dependency. ErrMisconfigured = errors.New("flow/protocol: Surface missing a mandatory dependency") // ErrIdentityRequired — the request carried an incomplete identity // triple. Maps onto CodeIdentityRequired / HTTP 401. ErrIdentityRequired = errors.New("flow/protocol: identity scope incomplete") // ErrInvalidRequest — the request was structurally malformed (an // empty required id, an out-of-range page size). Maps onto // CodeInvalidRequest / HTTP 400. ErrInvalidRequest = errors.New("flow/protocol: request is invalid") // ErrNotFound — the request's target flow or run does not exist. // Maps onto CodeNotFound / HTTP 404. ErrNotFound = errors.New("flow/protocol: target not found") // ErrCrossTenantScope — a cross-tenant filter was requested without // the verified `auth.ScopeAdmin` claim. Maps onto // CodeIdentityScopeRequired / HTTP 403. ErrCrossTenantScope = errors.New("flow/protocol: cross-tenant filter requires the admin scope claim") // ErrRunScopeRequired — `flows.run` was called without the verified // `auth.ScopeAdmin` claim. Maps onto CodeScopeMismatch / HTTP 403. ErrRunScopeRequired = errors.New("flow/protocol: flows.run requires the admin scope claim") // ErrRuntime — a Catalog / Invoker call failed for a reason the // Surface could not classify. Maps onto CodeRuntimeError / HTTP 500. ErrRuntime = errors.New("flow/protocol: runtime error") )
Sentinel errors the Surface returns. Transport adapters map each onto a canonical Protocol error Code + HTTP status. They are explicit — the Surface never silently degrades (CLAUDE.md §5 fail-loudly).
Functions ¶
This section is empty.
Types ¶
type Catalog ¶
type Catalog interface {
// ListFlows returns every registered graph-family flow with its
// aggregate run metrics over the trailing 24h window, scoped to the
// caller's identity (admin fans across tenants).
ListFlows(ctx context.Context, id identity.Identity, adminScoped bool) ([]prototypes.Flow, error)
// DescribeFlow returns a single flow's full engine-graph
// description. An unknown id returns ErrNotFound.
DescribeFlow(ctx context.Context, id identity.Identity, adminScoped bool, flowID string) (prototypes.FlowDescription, error)
// ListRuns returns a flow's run history, newest first, scoped to the
// caller's identity (admin fans across tenants when crossTenant is
// requested). An unknown flow id returns ErrNotFound.
ListRuns(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, tenants []string) ([]prototypes.FlowRun, error)
// DescribeRun returns a single run's per-node timeline + output
// reference. An unknown run id returns ErrNotFound. The
// implementation routes heavy outputs by-reference (D-026).
DescribeRun(ctx context.Context, id identity.Identity, adminScoped bool, runID string) (prototypes.FlowRunDescription, error)
// FlowMetrics returns a flow's time-bucketed sparkline aggregates
// over the requested window. An unknown flow id returns ErrNotFound.
FlowMetrics(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, window, bucket time.Duration) (prototypes.FlowMetrics, error)
}
Catalog is the read-only source-of-truth seam the Flows-page Surface depends on. The concrete implementation is the runtime's flow registry + run-history store; tests inject a deterministic fixture.
Every method takes the caller identity + an adminScoped flag. The implementation MUST scope run history to the caller's tenant unless adminScoped is true. Flow definitions themselves are tenant-agnostic (they are descriptors registered at agent-definition time), so the catalog listing is not tenant-filtered — but the per-flow run aggregates ARE (a non-admin caller sees only their own tenant's run counts).
type CatalogOption ¶
type CatalogOption func(*RegistryCatalog)
CatalogOption configures a RegistryCatalog at construction.
func WithCatalogClock ¶
func WithCatalogClock(now func() time.Time) CatalogOption
WithCatalogClock overrides the wall-clock seam the trailing-24h-window run aggregates read. A nil clock keeps the default `time.Now`. Production never sets this; the Phase 73i flows integration test injects a fixed clock so its run-count assertions are deterministic regardless of the real wall clock.
type FuncInvoker ¶
type FuncInvoker struct {
// contains filtered or unexported fields
}
FuncInvoker is the production Invoker implementation. It adapts a runtime-supplied LaunchFunc onto the Invoker interface. It is NOT a test stub (CLAUDE.md §13): the binary binds the LaunchFunc to the real `start` path; the FuncInvoker is the thin adapter that keeps the Flows-page surface decoupled from the task subsystem.
Concurrent reuse (D-025): the FuncInvoker is a compiled artifact — the LaunchFunc is set once at construction and never mutated.
func NewFuncInvoker ¶
func NewFuncInvoker(launch LaunchFunc, registry *flow.Registry) (*FuncInvoker, error)
NewFuncInvoker builds the production Invoker over a runtime-supplied LaunchFunc + the flow Registry. launch is mandatory — a nil fails loud with ErrMisconfigured. registry is mandatory: the invoker rejects a run targeting an unregistered flow before reaching the launcher, so an unknown flow id is a clean CodeNotFound rather than an opaque launcher failure.
The returned *FuncInvoker is immutable after construction and safe for concurrent use by N goroutines.
func (*FuncInvoker) Invoke ¶
func (i *FuncInvoker) Invoke(ctx context.Context, id identity.Identity, flowID string, inputs map[string]any) (prototypes.FlowRunResponse, error)
Invoke launches a one-shot run of flowID under the caller's identity. It rejects an unregistered flow id with ErrNotFound before reaching the launcher.
type Invoker ¶
type Invoker interface {
// Invoke launches a one-shot run of flowID with the supplied inputs
// under the caller's identity. It returns the accepted run's
// identifier + start time. An unknown flow id returns ErrNotFound; a
// malformed input form returns ErrInvalidRequest.
Invoke(ctx context.Context, id identity.Identity, flowID string, inputs map[string]any) (prototypes.FlowRunResponse, error)
}
Invoker launches a one-shot run of a registered flow. The concrete implementation wraps the task registry's `start` path; tests inject a deterministic fixture. Invoke runs under the caller's identity — the launched run inherits the (tenant, user, session) triple.
type LaunchFunc ¶
type LaunchFunc func(ctx context.Context, id identity.Identity, flowID string, inputs map[string]any) (runID string, startedAt time.Time, err error)
LaunchFunc is the runtime-supplied launcher the FuncInvoker wraps. It launches a one-shot run of the named flow under the caller's identity and returns the accepted run's identifier + start time. The runtime binds this to its real task-registry `start` path (Phase 54) — the FuncInvoker never reaches the task registry directly, so the Flows- page surface stays decoupled from the task subsystem's concrete type.
A LaunchFunc that targets an unknown flow returns an error wrapping ErrNotFound; a malformed input form returns an error wrapping ErrInvalidRequest. Any other failure is wrapped as ErrRuntime by the Surface's classifyCatalogErr.
type RegistryCatalog ¶
type RegistryCatalog struct {
// contains filtered or unexported fields
}
RegistryCatalog is the production Catalog implementation. It projects the Console Flows-page wire shapes from a *flow.Registry — the runtime's source-of-truth for registered flows + run history. It is NOT a test stub (CLAUDE.md §13): it is the real catalog backed by the real registry; the binary wires it at boot.
Heavy run outputs are routed by-reference through an ArtifactStore (D-026): a run whose final output meets or exceeds the configured heavy-content threshold ships a FlowArtifactRef, never inline bytes.
Concurrent reuse (D-025): the RegistryCatalog is a compiled artifact — registry / store / threshold are set once at construction. Every method reads through the registry's own RWMutex; the catalog holds no per-call state.
func NewRegistryCatalog ¶
func NewRegistryCatalog(registry *flow.Registry, store artifacts.ArtifactStore, threshold int, opts ...CatalogOption) (*RegistryCatalog, error)
NewRegistryCatalog builds the production Catalog over a flow.Registry + an ArtifactStore. Both are mandatory — a nil fails loud with ErrMisconfigured. threshold is the configured heavy-content byte size (cfg.Artifacts.HeavyOutputThresholdBytes); a non-positive value fails loud (a zero threshold would route every output by-reference).
The returned *RegistryCatalog is immutable after construction and safe for concurrent use by N goroutines.
func (*RegistryCatalog) DescribeFlow ¶
func (c *RegistryCatalog) DescribeFlow(ctx context.Context, id identity.Identity, adminScoped bool, flowID string) (prototypes.FlowDescription, error)
DescribeFlow projects a single flow's full engine-graph description.
func (*RegistryCatalog) DescribeRun ¶
func (c *RegistryCatalog) DescribeRun(ctx context.Context, id identity.Identity, adminScoped bool, runID string) (prototypes.FlowRunDescription, error)
DescribeRun projects a single run's per-node timeline + final-output reference. A run whose output exceeds the heavy-content threshold (D-026) is routed by-reference through the ArtifactStore.
func (*RegistryCatalog) FlowMetrics ¶
func (c *RegistryCatalog) FlowMetrics(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, window, bucket time.Duration) (prototypes.FlowMetrics, error)
FlowMetrics projects a flow's run history into time-bucketed sparkline aggregates over the requested window.
func (*RegistryCatalog) ListFlows ¶
func (c *RegistryCatalog) ListFlows(ctx context.Context, id identity.Identity, adminScoped bool) ([]prototypes.Flow, error)
ListFlows projects every registered flow into a wire Flow row with its aggregate run metrics over the trailing 24h window. The run aggregates are tenant-scoped: a non-admin caller's counts cover only their own tenant.
func (*RegistryCatalog) ListRuns ¶
func (c *RegistryCatalog) ListRuns(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, tenants []string) ([]prototypes.FlowRun, error)
ListRuns projects a flow's run history into wire FlowRun rows, scoped to the caller's identity (admin fans across the requested tenants).
type Surface ¶
type Surface struct {
// contains filtered or unexported fields
}
Surface is the transport-agnostic Flows-page Protocol surface. It is a compiled artifact (D-025): catalog + invoker are set once at construction and never mutated; ServeMethod holds no per-call state.
func NewSurface ¶
NewSurface builds the Flows-page Surface over a Catalog + an Invoker. Both are mandatory — a nil fails loud with ErrMisconfigured rather than building a Surface that would nil-panic on the first request (CLAUDE.md §5). The returned *Surface is immutable after construction and safe for concurrent use by N goroutines.
func (*Surface) Describe ¶
func (s *Surface) Describe(ctx context.Context, req prototypes.FlowDescribeRequest, adminScoped bool) (prototypes.FlowDescription, error)
Describe handles `flows.describe`. It validates identity + the flow id, dispatches to the Catalog, and sorts the projection deterministically (nodes by ID, edges by From/To).
func (*Surface) List ¶
func (s *Surface) List(ctx context.Context, req prototypes.FlowListRequest, adminScoped bool) (prototypes.FlowListResponse, error)
List handles `flows.list`. It validates identity, gates a cross-tenant filter on the admin scope, dispatches to the Catalog, and paginates + sorts the result deterministically (by flow ID).
func (*Surface) Metrics ¶
func (s *Surface) Metrics(ctx context.Context, req prototypes.FlowMetricsRequest, adminScoped bool) (prototypes.FlowMetrics, error)
Metrics handles `flows.metrics`. It validates identity + the flow id and dispatches to the Catalog with the resolved window / bucket.
func (*Surface) Run ¶
func (s *Surface) Run(ctx context.Context, req prototypes.FlowRunRequest, adminScoped bool) (prototypes.FlowRunResponse, error)
Run handles `flows.run` — the single mutating Flows-page method. It validates identity, gates the call on the verified admin scope claim (D-079), and dispatches to the Invoker. A request without the claim fails closed with ErrRunScopeRequired.
func (*Surface) RunsDescribe ¶
func (s *Surface) RunsDescribe(ctx context.Context, req prototypes.FlowRunDescribeRequest, adminScoped bool) (prototypes.FlowRunDescription, error)
RunsDescribe handles `flows.runs.describe`. It validates identity + the run id and dispatches to the Catalog. Heavy outputs are routed by-reference by the Catalog (D-026).
func (*Surface) RunsList ¶
func (s *Surface) RunsList(ctx context.Context, req prototypes.FlowRunsListRequest, adminScoped bool) (prototypes.FlowRunsListResponse, error)
RunsList handles `flows.runs.list`. It validates identity + the flow id, gates a cross-tenant filter on the admin scope, dispatches to the Catalog, and paginates + sorts the runs (newest first).