protocol

package
v1.3.1 Latest
Published: Jun 11, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package protocol implements the runtime side of the Console Flows page (Phase 73i / D-117). It is the transport-agnostic surface the six `flows.*` Protocol methods dispatch through:

  • flows.list — paginated catalog of registered flows
  • flows.describe — a flow's full engine-graph description
  • flows.runs.list — a flow's paginated run history
  • flows.runs.describe — a single run's per-node timeline
  • flows.run — invoke a one-shot run of a flow (mutating)
  • flows.metrics — a flow's time-bucketed sparkline metrics

Five methods are read-only; `flows.run` is the single mutating method. The Flows page is view-only at V1 (D-063) — there is no authoring surface here, by construction.

The source-of-truth seam (§4.4)

The Surface depends only on two interfaces — Catalog (the registered flows + their engine-graph descriptions + run history) and Invoker (the one-shot run launcher). The concrete Catalog is the runtime's flow registry; the concrete Invoker is the task registry's `start` path. Keeping the Surface behind interfaces means the wire surface is testable with deterministic fixtures (no live engine) and the concrete registry can evolve without a Protocol-shape break.

Multi-isolation (CLAUDE.md §6)

Identity is mandatory on every method: an incomplete (tenant, user, session) triple fails closed with ErrIdentityRequired. The catalog is per-runtime (flow definitions are tenant-agnostic descriptors), but run history is tenant-scoped: a non-admin caller sees only their own tenant's runs. A cross-tenant filter (a `Tenants` value that reaches outside the caller's own tenant or names more than one tenant) requires the verified `auth.ScopeAdmin` claim (D-079); the Surface fails closed with ErrCrossTenantScope when the claim is absent.
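The cross-tenant rule above can be sketched as a small predicate plus a gate. This is a minimal illustration under stated assumptions, not the package's code: `isCrossTenant` and `gateTenants` are hypothetical names, tenants are modeled as plain strings, and a filter with more than one entry is treated as cross-tenant even if the entries repeat.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for the package's sentinel (assumption: real code would
// use the exported ErrCrossTenantScope).
var ErrCrossTenantScope = errors.New("cross-tenant filter requires the admin scope claim")

// isCrossTenant reports whether the filter names more than one tenant or
// any tenant other than the caller's own. Hypothetical helper.
func isCrossTenant(callerTenant string, tenants []string) bool {
	if len(tenants) > 1 {
		return true
	}
	for _, t := range tenants {
		if t != callerTenant {
			return true
		}
	}
	return false
}

// gateTenants fails closed when a cross-tenant filter arrives without the
// verified admin claim. Hypothetical helper.
func gateTenants(callerTenant string, tenants []string, adminScoped bool) error {
	if isCrossTenant(callerTenant, tenants) && !adminScoped {
		return ErrCrossTenantScope
	}
	return nil
}

func main() {
	fmt.Println(gateTenants("acme", []string{"acme"}, false))           // own tenant only
	fmt.Println(gateTenants("acme", []string{"acme", "globex"}, false)) // rejected
	fmt.Println(gateTenants("acme", []string{"acme", "globex"}, true))  // admin may fan out
}
```

An empty filter passes the gate in this sketch, on the assumption that it means "the caller's own tenant".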

The mutating gate (D-079)

`flows.run` mutates: it launches a run. It is gated on identity AND the verified `auth.ScopeAdmin` claim. The Surface never mints a new scope (D-079 defines a closed two-scope set); `auth.ScopeAdmin` is the run entitlement. A request without the claim fails closed with ErrRunScopeRequired.
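A minimal sketch of the two-step gate, assuming identity is checked before the scope claim (the order the Run method's description suggests). The `Identity` struct, its field names, and `gateRun` are illustrative stand-ins, not the real `identity.Identity` type or the Surface's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinels.
var (
	ErrIdentityRequired = errors.New("identity scope incomplete")
	ErrRunScopeRequired = errors.New("flows.run requires the admin scope claim")
)

// Identity models the (tenant, user, session) triple. Field names are
// assumptions made for this sketch.
type Identity struct {
	Tenant, User, Session string
}

// complete reports whether every element of the triple is set.
func (id Identity) complete() bool {
	return id.Tenant != "" && id.User != "" && id.Session != ""
}

// gateRun fails closed: first on an incomplete identity, then on a missing
// admin claim. Only a nil result would let the call reach the Invoker.
func gateRun(id Identity, adminScoped bool) error {
	if !id.complete() {
		return ErrIdentityRequired
	}
	if !adminScoped {
		return ErrRunScopeRequired
	}
	return nil
}

func main() {
	full := Identity{Tenant: "acme", User: "u-1", Session: "s-1"}
	fmt.Println(gateRun(Identity{Tenant: "acme"}, true)) // incomplete triple
	fmt.Println(gateRun(full, false))                    // no admin claim
	fmt.Println(gateRun(full, true))                     // allowed
}
```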

Concurrent reuse (D-025)

A Surface is a compiled artifact: every field is set once at construction and never mutated. ServeMethod holds no per-call state — per-call data flows through the request argument. One Surface is safe for N concurrent callers under -race; concurrent_reuse_test.go pins N≥100.
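The concurrent-reuse property can be illustrated with a toy immutable value shared by 100 goroutines. This is a sketch only: `surface`, `serve`, and `runConcurrent` are hypothetical, and the contents of concurrent_reuse_test.go are not shown in this documentation. Running a program like this with `go run -race` is how a data race on shared state would surface.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// surface is a stand-in for an immutable artifact: its only field is set at
// construction and never written again.
type surface struct {
	prefix string
}

// serve keeps all per-call data in its argument and return value.
func (s *surface) serve(req string) string {
	return s.prefix + req
}

// runConcurrent shares one surface across n goroutines and returns how many
// calls produced the expected result.
func runConcurrent(n int) int64 {
	s := &surface{prefix: "flows."}
	var ok atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			req := fmt.Sprintf("req-%d", i)
			if s.serve(req) == "flows."+req {
				ok.Add(1)
			}
		}(i)
	}
	wg.Wait()
	return ok.Load()
}

func main() {
	fmt.Println(runConcurrent(100)) // prints 100
}
```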

Index

Constants

This section is empty.

Variables

var (
	// ErrMisconfigured — NewSurface was called with a nil mandatory
	// dependency.
	ErrMisconfigured = errors.New("flow/protocol: Surface missing a mandatory dependency")
	// ErrIdentityRequired — the request carried an incomplete identity
	// triple. Maps onto CodeIdentityRequired / HTTP 401.
	ErrIdentityRequired = errors.New("flow/protocol: identity scope incomplete")
	// ErrInvalidRequest — the request was structurally malformed (an
	// empty required id, an out-of-range page size). Maps onto
	// CodeInvalidRequest / HTTP 400.
	ErrInvalidRequest = errors.New("flow/protocol: request is invalid")
	// ErrNotFound — the request's target flow or run does not exist.
	// Maps onto CodeNotFound / HTTP 404.
	ErrNotFound = errors.New("flow/protocol: target not found")
	// ErrCrossTenantScope — a cross-tenant filter was requested without
	// the verified `auth.ScopeAdmin` claim. Maps onto
	// CodeIdentityScopeRequired / HTTP 403.
	ErrCrossTenantScope = errors.New("flow/protocol: cross-tenant filter requires the admin scope claim")
	// ErrRunScopeRequired — `flows.run` was called without the verified
	// `auth.ScopeAdmin` claim. Maps onto CodeScopeMismatch / HTTP 403.
	ErrRunScopeRequired = errors.New("flow/protocol: flows.run requires the admin scope claim")
	// ErrRuntime — a Catalog / Invoker call failed for a reason the
	// Surface could not classify. Maps onto CodeRuntimeError / HTTP 500.
	ErrRuntime = errors.New("flow/protocol: runtime error")
)

Sentinel errors the Surface returns. Transport adapters map each onto a canonical Protocol error Code + HTTP status. They are explicit — the Surface never silently degrades (CLAUDE.md §5 fail-loudly).
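As an illustration of the mapping a transport adapter might perform, the sketch below translates the sentinels into the HTTP statuses named in their comments. `httpStatus` is a hypothetical helper, the sentinels are redeclared locally so the example is self-contained, and ErrMisconfigured is left out because it is a construction-time error rather than a request error.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local stand-ins for the package's sentinels (a real adapter would import
// them from this package).
var (
	ErrIdentityRequired = errors.New("flow/protocol: identity scope incomplete")
	ErrInvalidRequest   = errors.New("flow/protocol: request is invalid")
	ErrNotFound         = errors.New("flow/protocol: target not found")
	ErrCrossTenantScope = errors.New("flow/protocol: cross-tenant filter requires the admin scope claim")
	ErrRunScopeRequired = errors.New("flow/protocol: flows.run requires the admin scope claim")
	ErrRuntime          = errors.New("flow/protocol: runtime error")
)

// httpStatus maps an error onto the documented HTTP status. errors.Is is
// used so that wrapped sentinels still match. Anything unrecognized is
// treated like ErrRuntime (an assumption of this sketch).
func httpStatus(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrIdentityRequired):
		return http.StatusUnauthorized
	case errors.Is(err, ErrInvalidRequest):
		return http.StatusBadRequest
	case errors.Is(err, ErrNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrCrossTenantScope), errors.Is(err, ErrRunScopeRequired):
		return http.StatusForbidden
	default:
		return http.StatusInternalServerError
	}
}

func main() {
	wrapped := fmt.Errorf("describe flow %q: %w", "f-1", ErrNotFound)
	fmt.Println(httpStatus(wrapped)) // prints 404
}
```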

Functions

This section is empty.

Types

type Catalog

type Catalog interface {
	// ListFlows returns every registered graph-family flow with its
	// aggregate run metrics over the trailing 24h window, scoped to the
	// caller's identity (admin fans across tenants).
	ListFlows(ctx context.Context, id identity.Identity, adminScoped bool) ([]prototypes.Flow, error)
	// DescribeFlow returns a single flow's full engine-graph
	// description. An unknown id returns ErrNotFound.
	DescribeFlow(ctx context.Context, id identity.Identity, adminScoped bool, flowID string) (prototypes.FlowDescription, error)
	// ListRuns returns a flow's run history, newest first, scoped to the
	// caller's identity (admin fans across the tenants named in the
	// tenants filter). An unknown flow id returns ErrNotFound.
	ListRuns(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, tenants []string) ([]prototypes.FlowRun, error)
	// DescribeRun returns a single run's per-node timeline + output
	// reference. An unknown run id returns ErrNotFound. The
	// implementation routes heavy outputs by-reference (D-026).
	DescribeRun(ctx context.Context, id identity.Identity, adminScoped bool, runID string) (prototypes.FlowRunDescription, error)
	// FlowMetrics returns a flow's time-bucketed sparkline aggregates
	// over the requested window. An unknown flow id returns ErrNotFound.
	FlowMetrics(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, window, bucket time.Duration) (prototypes.FlowMetrics, error)
}

Catalog is the read-only source-of-truth seam the Flows-page Surface depends on. The concrete implementation is the runtime's flow registry + run-history store; tests inject a deterministic fixture.

Every method takes the caller identity + an adminScoped flag. The implementation MUST scope run history to the caller's tenant unless adminScoped is true. Flow definitions themselves are tenant-agnostic (they are descriptors registered at agent-definition time), so the catalog listing is not tenant-filtered — but the per-flow run aggregates ARE (a non-admin caller sees only their own tenant's run counts).

type CatalogOption

type CatalogOption func(*RegistryCatalog)

CatalogOption configures a RegistryCatalog at construction.

func WithCatalogClock

func WithCatalogClock(now func() time.Time) CatalogOption

WithCatalogClock overrides the wall-clock seam the trailing-24h-window run aggregates read. A nil clock keeps the default `time.Now`. Production never sets this; the Phase 73i flows integration test injects a fixed clock so its run-count assertions are deterministic regardless of the real wall clock.
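The clock seam follows the usual functional-option pattern. The sketch below shows one way such an option could behave, with a nil clock keeping `time.Now`; `catalog`, `withClock`, `newCatalog`, and `countTrailing24h` are hypothetical stand-ins, and the inclusive window boundary is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// catalog is a stand-in for RegistryCatalog; only the clock seam is modeled.
type catalog struct {
	now func() time.Time
}

type option func(*catalog)

// withClock overrides the clock; a nil clock keeps the default.
func withClock(now func() time.Time) option {
	return func(c *catalog) {
		if now != nil {
			c.now = now
		}
	}
}

func newCatalog(opts ...option) *catalog {
	c := &catalog{now: time.Now}
	for _, o := range opts {
		o(c)
	}
	return c
}

// countTrailing24h counts run start times at or after now minus 24h.
func (c *catalog) countTrailing24h(starts []time.Time) int {
	cutoff := c.now().Add(-24 * time.Hour)
	n := 0
	for _, s := range starts {
		if !s.Before(cutoff) {
			n++
		}
	}
	return n
}

// demoCount pins the clock so the count does not depend on the wall clock.
func demoCount() int {
	fixed := time.Date(2026, 1, 2, 0, 0, 0, 0, time.UTC)
	c := newCatalog(withClock(func() time.Time { return fixed }))
	starts := []time.Time{
		fixed.Add(-12 * time.Hour), // inside the window
		fixed.Add(-36 * time.Hour), // outside the window
	}
	return c.countTrailing24h(starts)
}

func main() {
	fmt.Println(demoCount()) // prints 1
}
```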

type FuncInvoker

type FuncInvoker struct {
	// contains filtered or unexported fields
}

FuncInvoker is the production Invoker implementation. It adapts a runtime-supplied LaunchFunc onto the Invoker interface. It is NOT a test stub (CLAUDE.md §13): the binary binds the LaunchFunc to the real `start` path; the FuncInvoker is the thin adapter that keeps the Flows-page surface decoupled from the task subsystem.

Concurrent reuse (D-025): the FuncInvoker is a compiled artifact — the LaunchFunc is set once at construction and never mutated.

func NewFuncInvoker

func NewFuncInvoker(launch LaunchFunc, registry *flow.Registry) (*FuncInvoker, error)

NewFuncInvoker builds the production Invoker over a runtime-supplied LaunchFunc + the flow Registry. launch is mandatory — a nil fails loud with ErrMisconfigured. registry is mandatory: the invoker rejects a run targeting an unregistered flow before reaching the launcher, so an unknown flow id is a clean CodeNotFound rather than an opaque launcher failure.

The returned *FuncInvoker is immutable after construction and safe for concurrent use by N goroutines.

func (*FuncInvoker) Invoke

func (i *FuncInvoker) Invoke(ctx context.Context, id identity.Identity, flowID string, inputs map[string]any) (prototypes.FlowRunResponse, error)

Invoke launches a one-shot run of flowID under the caller's identity. It rejects an unregistered flow id with ErrNotFound before reaching the launcher.
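The "reject before launch" behaviour can be sketched with a thin adapter. Everything here is illustrative: the registry is modeled as a `map[string]bool`, `launchFunc` drops the context and identity arguments for brevity, and `countingLauncher` is a hypothetical launcher used only to show that an unknown flow id never reaches it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for the package's sentinels.
var (
	ErrMisconfigured = errors.New("missing a mandatory dependency")
	ErrNotFound      = errors.New("target not found")
)

// launchFunc is a simplified LaunchFunc shape (no ctx, no identity).
type launchFunc func(flowID string, inputs map[string]any) (runID string, startedAt time.Time, err error)

// funcInvoker is set once at construction and never mutated.
type funcInvoker struct {
	launch launchFunc
	known  map[string]bool // stand-in for *flow.Registry
}

func newFuncInvoker(launch launchFunc, known map[string]bool) (*funcInvoker, error) {
	if launch == nil || known == nil {
		return nil, ErrMisconfigured
	}
	return &funcInvoker{launch: launch, known: known}, nil
}

// invoke rejects an unregistered flow before calling the launcher.
func (i *funcInvoker) invoke(flowID string, inputs map[string]any) (string, error) {
	if !i.known[flowID] {
		return "", fmt.Errorf("flow %q: %w", flowID, ErrNotFound)
	}
	runID, _, err := i.launch(flowID, inputs)
	return runID, err
}

// countingLauncher records how often the launcher was reached.
func countingLauncher(calls *int) launchFunc {
	return func(flowID string, _ map[string]any) (string, time.Time, error) {
		*calls++
		return fmt.Sprintf("run-%d", *calls), time.Now(), nil
	}
}

func main() {
	calls := 0
	inv, err := newFuncInvoker(countingLauncher(&calls), map[string]bool{"nightly-etl": true})
	if err != nil {
		panic(err)
	}
	_, err = inv.invoke("missing", nil)
	fmt.Println(errors.Is(err, ErrNotFound), calls) // prints true 0
	runID, err := inv.invoke("nightly-etl", nil)
	fmt.Println(runID, err, calls) // prints run-1 <nil> 1
}
```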

type Invoker

type Invoker interface {
	// Invoke launches a one-shot run of flowID with the supplied inputs
	// under the caller's identity. It returns the accepted run's
	// identifier + start time. An unknown flow id returns ErrNotFound; a
	// malformed input form returns ErrInvalidRequest.
	Invoke(ctx context.Context, id identity.Identity, flowID string, inputs map[string]any) (prototypes.FlowRunResponse, error)
}

Invoker launches a one-shot run of a registered flow. The concrete implementation wraps the task registry's `start` path; tests inject a deterministic fixture. Invoke runs under the caller's identity — the launched run inherits the (tenant, user, session) triple.

type LaunchFunc

type LaunchFunc func(ctx context.Context, id identity.Identity, flowID string, inputs map[string]any) (runID string, startedAt time.Time, err error)

LaunchFunc is the runtime-supplied launcher the FuncInvoker wraps. It launches a one-shot run of the named flow under the caller's identity and returns the accepted run's identifier + start time. The runtime binds this to its real task-registry `start` path (Phase 54); the FuncInvoker never reaches the task registry directly, so the Flows-page surface stays decoupled from the task subsystem's concrete type.

A LaunchFunc that targets an unknown flow returns an error wrapping ErrNotFound; a malformed input form returns an error wrapping ErrInvalidRequest. Any other failure is wrapped as ErrRuntime by the Surface's classifyCatalogErr.

type RegistryCatalog

type RegistryCatalog struct {
	// contains filtered or unexported fields
}

RegistryCatalog is the production Catalog implementation. It projects the Console Flows-page wire shapes from a *flow.Registry — the runtime's source-of-truth for registered flows + run history. It is NOT a test stub (CLAUDE.md §13): it is the real catalog backed by the real registry; the binary wires it at boot.

Heavy run outputs are routed by-reference through an ArtifactStore (D-026): a run whose final output meets or exceeds the configured heavy-content threshold ships a FlowArtifactRef, never inline bytes.

Concurrent reuse (D-025): the RegistryCatalog is a compiled artifact — registry / store / threshold are set once at construction. Every method reads through the registry's own RWMutex; the catalog holds no per-call state.

func NewRegistryCatalog

func NewRegistryCatalog(registry *flow.Registry, store artifacts.ArtifactStore, threshold int, opts ...CatalogOption) (*RegistryCatalog, error)

NewRegistryCatalog builds the production Catalog over a flow.Registry + an ArtifactStore. Both are mandatory — a nil fails loud with ErrMisconfigured. threshold is the configured heavy-content byte size (cfg.Artifacts.HeavyOutputThresholdBytes); a non-positive value fails loud (a zero threshold would route every output by-reference).

The returned *RegistryCatalog is immutable after construction and safe for concurrent use by N goroutines.

func (*RegistryCatalog) DescribeFlow

func (c *RegistryCatalog) DescribeFlow(ctx context.Context, id identity.Identity, adminScoped bool, flowID string) (prototypes.FlowDescription, error)

DescribeFlow projects a single flow's full engine-graph description.

func (*RegistryCatalog) DescribeRun

func (c *RegistryCatalog) DescribeRun(ctx context.Context, id identity.Identity, adminScoped bool, runID string) (prototypes.FlowRunDescription, error)

DescribeRun projects a single run's per-node timeline + final-output reference. A run whose output meets or exceeds the heavy-content threshold (D-026) is routed by-reference through the ArtifactStore.
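By-reference routing of heavy outputs can be sketched as a size check against the threshold. The types below (`artifactRef`, `runOutput`, `memStore`) and the key layout are assumptions made for illustration; they are not the real `FlowArtifactRef` or `artifacts.ArtifactStore` shapes. The comparison is `>=`, matching the "meets or exceeds" wording on RegistryCatalog.

```go
package main

import "fmt"

// artifactRef is a stand-in for a by-reference pointer to stored bytes.
type artifactRef struct {
	Key  string
	Size int
}

// runOutput carries either inline bytes or a reference, never both.
type runOutput struct {
	Inline []byte
	Ref    *artifactRef
}

// memStore is an in-memory stand-in for an artifact store.
type memStore struct {
	blobs map[string][]byte
}

func (s *memStore) put(key string, b []byte) artifactRef {
	s.blobs[key] = b
	return artifactRef{Key: key, Size: len(b)}
}

// routeOutput ships outputs at or above the threshold by reference and
// smaller outputs inline.
func routeOutput(store *memStore, runID string, out []byte, threshold int) runOutput {
	if len(out) >= threshold {
		ref := store.put("runs/"+runID+"/output", out)
		return runOutput{Ref: &ref}
	}
	return runOutput{Inline: out}
}

func main() {
	store := &memStore{blobs: map[string][]byte{}}
	small := routeOutput(store, "run-1", []byte("abc"), 4)
	large := routeOutput(store, "run-2", []byte("abcd"), 4)
	fmt.Println(small.Ref == nil, large.Ref != nil) // prints true true
}
```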

func (*RegistryCatalog) FlowMetrics

func (c *RegistryCatalog) FlowMetrics(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, window, bucket time.Duration) (prototypes.FlowMetrics, error)

FlowMetrics projects a flow's run history into time-bucketed sparkline aggregates over the requested window.
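One way to bucket run start times over a window is shown below. It is a sketch under assumptions: only a per-bucket run count is computed, the window is the half-open interval from `now - window` up to `now`, and `bucketCounts` and `demoBuckets` are hypothetical names. The real `prototypes.FlowMetrics` shape is not described here.

```go
package main

import (
	"fmt"
	"time"
)

// bucketCounts returns one run count per bucket across the window, oldest
// bucket first. A non-positive window or bucket yields nil.
func bucketCounts(starts []time.Time, now time.Time, window, bucket time.Duration) []int {
	if window <= 0 || bucket <= 0 {
		return nil
	}
	n := int((window + bucket - 1) / bucket) // ceiling division
	counts := make([]int, n)
	from := now.Add(-window)
	for _, s := range starts {
		if s.Before(from) || !s.Before(now) {
			continue // outside [from, now)
		}
		counts[int(s.Sub(from)/bucket)]++
	}
	return counts
}

// demoBuckets uses a fixed "now" so the result is deterministic.
func demoBuckets() []int {
	now := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	at := func(h, m int) time.Time { return time.Date(2026, 1, 1, h, m, 0, 0, time.UTC) }
	starts := []time.Time{at(11, 5), at(11, 20), at(11, 25), at(11, 59), at(10, 0)}
	return bucketCounts(starts, now, time.Hour, 15*time.Minute)
}

func main() {
	fmt.Println(demoBuckets()) // prints [1 2 0 1]
}
```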

func (*RegistryCatalog) ListFlows

func (c *RegistryCatalog) ListFlows(ctx context.Context, id identity.Identity, adminScoped bool) ([]prototypes.Flow, error)

ListFlows projects every registered flow into a wire Flow row with its aggregate run metrics over the trailing 24h window. The run aggregates are tenant-scoped: a non-admin caller's counts cover only their own tenant.

func (*RegistryCatalog) ListRuns

func (c *RegistryCatalog) ListRuns(ctx context.Context, id identity.Identity, adminScoped bool, flowID string, tenants []string) ([]prototypes.FlowRun, error)

ListRuns projects a flow's run history into wire FlowRun rows, scoped to the caller's identity (admin fans across the requested tenants).

type Surface

type Surface struct {
	// contains filtered or unexported fields
}

Surface is the transport-agnostic Flows-page Protocol surface. It is a compiled artifact (D-025): catalog + invoker are set once at construction and never mutated; ServeMethod holds no per-call state.

func NewSurface

func NewSurface(catalog Catalog, invoker Invoker) (*Surface, error)

NewSurface builds the Flows-page Surface over a Catalog + an Invoker. Both are mandatory — a nil fails loud with ErrMisconfigured rather than building a Surface that would nil-panic on the first request (CLAUDE.md §5). The returned *Surface is immutable after construction and safe for concurrent use by N goroutines.

func (*Surface) Describe

Describe handles `flows.describe`. It validates identity + the flow id, dispatches to the Catalog, and sorts the projection deterministically (nodes by ID, edges by From/To).
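The deterministic ordering described for Describe can be sketched with the standard `sort` package. The `node` and `edge` types are minimal stand-ins; only the `ID`, `From`, and `To` fields named in the description are modeled.

```go
package main

import (
	"fmt"
	"sort"
)

type node struct{ ID string }

type edge struct{ From, To string }

// sortGraph orders nodes by ID and edges by From, then To, in place.
func sortGraph(nodes []node, edges []edge) {
	sort.Slice(nodes, func(i, j int) bool { return nodes[i].ID < nodes[j].ID })
	sort.Slice(edges, func(i, j int) bool {
		if edges[i].From != edges[j].From {
			return edges[i].From < edges[j].From
		}
		return edges[i].To < edges[j].To
	})
}

func main() {
	nodes := []node{{"load"}, {"extract"}, {"transform"}}
	edges := []edge{{"transform", "load"}, {"extract", "transform"}, {"extract", "load"}}
	sortGraph(nodes, edges)
	fmt.Println(nodes) // prints [{extract} {load} {transform}]
	fmt.Println(edges) // prints [{extract load} {extract transform} {transform load}]
}
```

Sorting on every response means two calls against the same registry state return byte-identical projections, which keeps fixture-based tests stable.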

func (*Surface) List

List handles `flows.list`. It validates identity, gates a cross-tenant filter on the admin scope, dispatches to the Catalog, and paginates + sorts the result deterministically (by flow ID).
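Sort-then-paginate can be sketched as follows. The offset/size parameters, the `maxPageSize` bound of 100, and the `paginate` helper are all assumptions; the documentation only says that an out-of-range page size is an ErrInvalidRequest and that results are ordered by flow ID.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Local stand-in for the package's sentinel.
var ErrInvalidRequest = errors.New("request is invalid")

// maxPageSize is an assumed upper bound, not a documented value.
const maxPageSize = 100

// paginate sorts a copy of ids and returns the requested page. An offset
// past the end yields an empty page rather than an error.
func paginate(ids []string, offset, size int) ([]string, error) {
	if size < 1 || size > maxPageSize || offset < 0 {
		return nil, ErrInvalidRequest
	}
	sorted := append([]string(nil), ids...) // leave the caller's slice untouched
	sort.Strings(sorted)
	if offset >= len(sorted) {
		return []string{}, nil
	}
	end := offset + size
	if end > len(sorted) {
		end = len(sorted)
	}
	return sorted[offset:end], nil
}

func main() {
	page, err := paginate([]string{"c", "a", "b"}, 1, 2)
	fmt.Println(page, err) // prints [b c] <nil>
}
```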

func (*Surface) Metrics

func (s *Surface) Metrics(ctx context.Context, req prototypes.FlowMetricsRequest, adminScoped bool) (prototypes.FlowMetrics, error)

Metrics handles `flows.metrics`. It validates identity + the flow id and dispatches to the Catalog with the resolved window / bucket.

func (*Surface) Run

Run handles `flows.run` — the single mutating Flows-page method. It validates identity, gates the call on the verified admin scope claim (D-079), and dispatches to the Invoker. A request without the claim fails closed with ErrRunScopeRequired.

func (*Surface) RunsDescribe

RunsDescribe handles `flows.runs.describe`. It validates identity + the run id and dispatches to the Catalog. Heavy outputs are routed by-reference by the Catalog (D-026).

func (*Surface) RunsList

RunsList handles `flows.runs.list`. It validates identity + the flow id, gates a cross-tenant filter on the admin scope, dispatches to the Catalog, and paginates + sorts the runs (newest first).
