agentrun

package
v1.0.0-beta.102 Latest
Published: Jun 7, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package agentrun implements the AgentRun lifecycle Participant (ADR-053 D1–D6).

An AgentRun represents the framework-level entity for a nested agentic loop tree: a coordinator loop that spawns research/architect/builder child loops forms a "run" whose lifecycle (dispatched → executing ⇄ awaiting_approval → terminal) is managed here via the pkg/lifecycle harness.

The package provides:

  • AgentRun — the lifecycle.Participant struct (D1)
  • Register — registers the "agent-run" workflow with a lifecycle.Manager (D2)
  • Mint — idempotent run creation at dispatch time (D4)
  • ResolveRun — typed-first resolution with ancestry-walk fallback (D6)
  • MilestoneSubscriber — subscribes to terminal loop events, pre-resolves the run, and fans out to product-registered MilestoneHandlers (D6)

Import discipline: this package imports agentic + pkg/lifecycle only. pkg/lifecycle MUST NOT import agentic/agentrun — verify with go mod graph.

Package agentrun — NATS-backed production adapters.

NATSLoopTripleReader satisfies LoopTripleReader by reading entity triples directly from the ENTITY_STATES KV bucket via a natsclient.Client.

NATSTriplePublisher satisfies TriplePublisher by writing triples through the graph.mutation.triple.add NATS request/response surface (same path as the rule engine's TripleMutator). Neither adapter tracks KV revisions for feedback-loop prevention — they are event-subscriber side effects, not rule-engine transitions.

Both adapters are wired in cmd/semstreams and cmd/e2e-semstreams at startup.

Constants

const AgentStreamName = "AGENT"

AgentStreamName is the default JetStream stream name for agentic events. Matches agentic-loop config.go default ("AGENT").

const EntityIDPattern = "*.*.agent.chain.execution.*"

EntityIDPattern matches agent-run chain execution entities in the federated graph. Six-segment shape per the federated EntityID contract; org + platform + instance are wildcarded; domain (agent), system (chain), type (execution) are pinned.
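
As a rough illustration of the six-segment shape, the sketch below matches an entity ID against the pattern segment by segment, assuming NATS-style single-token `*` wildcards. The helper `matchesPattern` and the sample IDs are hypothetical; the package's own matching logic is not shown in these docs.

```go
package main

import (
	"fmt"
	"strings"
)

// entityIDPattern mirrors the documented EntityIDPattern constant.
const entityIDPattern = "*.*.agent.chain.execution.*"

// matchesPattern is a hypothetical helper: it reports whether id has the
// same number of dot-separated segments as pattern, no empty segments,
// and an exact match on every segment that is not the "*" wildcard.
func matchesPattern(pattern, id string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(id, ".")
	if len(p) != len(s) {
		return false
	}
	for i := range p {
		if s[i] == "" {
			return false
		}
		if p[i] != "*" && p[i] != s[i] {
			return false
		}
	}
	return true
}

func main() {
	// org and platform are wildcarded; domain, system, and type are pinned.
	fmt.Println(matchesPattern(entityIDPattern, "acme.prod.agent.chain.execution.run-1"))
	fmt.Println(matchesPattern(entityIDPattern, "acme.prod.agent.loop.execution.run-1"))
}
```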

const (
	// PhasePredicate is the triple carrying the run's current phase.
	PhasePredicate = "agent.run.phase"
)

Predicates stamped by the Manager for agent-run entities.

const WorkflowName = "agent-run"

WorkflowName is the registered workflow type name for agent-run entries. Matches what AgentRun.Workflow() returns.

Variables

This section is empty.

Functions

func Register

func Register(mgr *lifecycle.Manager) error

Register declares the "agent-run" workflow to the given Manager (ADR-053 D2). Must be called at app startup before any create/transition calls land. Returns ErrWorkflowAlreadyRegistered (from lifecycle package) on duplicate registration.

func WorkflowDeclaration

func WorkflowDeclaration() lifecycle.Workflow

WorkflowDeclaration returns the lifecycle.Workflow ready to pass to Manager.Register (ADR-053 D2). Callers use Register(mgr) rather than calling this directly — it is exported for diagnostic/test use.

Types

type AgentRun

type AgentRun struct {
	// EntityIDField is the full 6-part federated ID: org.platform.agent.chain.execution.<runID>
	// Tagged lifecycle:"id" so the projection layer populates it from the KV key, not a triple.
	EntityIDField string `json:"-" lifecycle:"id"`

	// PhaseField is the current lifecycle phase.
	PhaseField string `json:"phase" lifecycle:"phase,predicate=agent.run.phase"`

	// ParentRunEntityID is the parent run's full entity ID, or empty for root runs.
	ParentRunEntityID string `json:"parent_run_entity_id,omitempty" lifecycle:"predicate=agent.run.parent_entity_id"`
}

AgentRun is the lifecycle.Participant for an agent run (ADR-053 D1).

Field tags:

  • lifecycle:"id" — entity identity (full 6-part ID, from KV key)
  • lifecycle:"phase,predicate=agent.run.phase" — current phase triple
  • lifecycle:"predicate=agent.run.parent_entity_id" — parent run entity ID triple

D1 CRITICAL: EntityIDField MUST hold the FULL 6-part chain.execution entity ID because the projection layer populates it from the entity-state KEY (not a triple). A bare RunID would round-trip to the full dotted key and garble when passed back through TryChainExecutionEntityID (which rejects dots). RunID() derives the bare loop UUID from EntityIDField at read time.

func Mint

func Mint(ctx context.Context, mgr MintableManager, org, platform, rootLoopID string) (*AgentRun, error)

Mint creates (or retrieves if already exists) an AgentRun for the given rootLoopID (ADR-053 D4). The run's entity ID is org.platform.agent.chain.execution.<rootLoopID>, initial phase is "dispatched".

Idempotent: if Manager.Create returns lifecycle.ErrAlreadyExists (the run was already minted — common on JetStream redelivery or concurrent rule firings), Mint treats it as success and returns the existing run via Manager.Get.

NOTE: there is a narrow concurrent-create race (gh#178) where two goroutines both call Mint for the same ID simultaneously; both may observe ErrAlreadyExists but only one wrote the initial "dispatched" phase. The second caller falls back to Manager.Get which returns the already-minted run — this is correct: the run exists and is in a valid state. The gh#178 concern (which caller's Create "won") does not apply here because all callers want the same initial phase ("dispatched") and the run entity is immutable in terms of identity.

func ResolveRun

func ResolveRun(ctx context.Context, mgr MockableManager, reader LoopTripleReader, org, platform, loopID string) (*AgentRun, error)

ResolveRun resolves the AgentRun for a given loop (ADR-053 D6).

Resolution order:

  1. Typed-first: read the loop entity's agent.run triple (the RunID stamped at spawn by buildSpawnIdentityTriples). If present, construct the run entity ID directly.
  2. Ancestry-walk fallback (for pre-migration / un-threaded loops): walk agent.loop.parent triples up to the root (bounded at maxAncestryHops), then use the root loop's ID as the run ID. Logs WARN on the fallback path so un-threaded loops are visible in operator dashboards.

Returns lifecycle.ErrEntityNotFound when neither the typed path nor the walk can locate a valid run entity.

func (*AgentRun) EntityID

func (r *AgentRun) EntityID() string

EntityID returns the full 6-part federated entity ID. Implements lifecycle.Participant.

func (*AgentRun) IsTerminal

func (r *AgentRun) IsTerminal() bool

IsTerminal returns true when the current phase has no declared out-edges. Consults the package-level agentRunTransitions table. Implements lifecycle.Participant.
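
A table-driven terminal check of this kind might look like the sketch below. The `transitions` map is a hypothetical stand-in for agentRunTransitions: the edges follow the dispatched → executing ⇄ awaiting_approval → terminal shape from the overview, but the terminal phase names ("completed", "failed", "cancelled") and the exact edge set are assumptions.

```go
package main

import "fmt"

// transitions is a hypothetical stand-in for agentRunTransitions.
// A phase with no out-edges is terminal.
var transitions = map[string][]string{
	"dispatched":        {"executing", "failed", "cancelled"},
	"executing":         {"awaiting_approval", "completed", "failed", "cancelled"},
	"awaiting_approval": {"executing", "failed", "cancelled"},
	"completed":         nil,
	"failed":            nil,
	"cancelled":         nil,
}

// isTerminal reports whether phase has no declared out-edges.
// In this sketch an unknown phase also counts as terminal.
func isTerminal(phase string) bool {
	return len(transitions[phase]) == 0
}

func main() {
	for _, p := range []string{"dispatched", "executing", "failed"} {
		fmt.Println(p, isTerminal(p))
	}
}
```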

func (*AgentRun) ParentEntityID

func (r *AgentRun) ParentEntityID() string

ParentEntityID returns the parent run's full entity ID, or "" for root runs. Implements lifecycle.Participant.

func (*AgentRun) Phase

func (r *AgentRun) Phase() string

Phase returns the current lifecycle phase. Implements lifecycle.Participant.

func (*AgentRun) RunID

func (r *AgentRun) RunID() (string, bool)

RunID derives the bare run loop-id from the full entity ID. Returns ("", false) when EntityIDField is not a valid chain.execution entity ID.

The bare RunID == the dispatch-root loop UUID; it is NOT stored as a triple but derived by parsing parts[5] from the 6-part entity ID.
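
A minimal sketch of that derivation, assuming the only validation is the segment count, non-empty wildcard segments, and the pinned agent.chain.execution segments. The function `runIDFromEntityID` is hypothetical and stands in for the method; the real validation may be stricter.

```go
package main

import (
	"fmt"
	"strings"
)

// runIDFromEntityID is a hypothetical stand-in for AgentRun.RunID.
// It returns parts[5] of a valid 6-part chain.execution entity ID.
func runIDFromEntityID(entityID string) (string, bool) {
	parts := strings.Split(entityID, ".")
	if len(parts) != 6 {
		return "", false
	}
	if parts[0] == "" || parts[1] == "" || parts[5] == "" {
		return "", false
	}
	if parts[2] != "agent" || parts[3] != "chain" || parts[4] != "execution" {
		return "", false
	}
	return parts[5], true
}

func main() {
	fmt.Println(runIDFromEntityID("acme.prod.agent.chain.execution.loop-1"))
	fmt.Println(runIDFromEntityID("loop-1")) // a bare ID is not a valid entity ID
}
```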

func (*AgentRun) Workflow

func (r *AgentRun) Workflow() string

Workflow returns the registered workflow type name. Implements lifecycle.Participant.

type LoopTerminalEvent

type LoopTerminalEvent struct {
	// LoopID is the bare loop UUID that terminated.
	LoopID string
	// RunID is the bare run loop-id from the event wire (ADR-053 D8).
	RunID string
	// RunEntityID is the full 6-part chain execution entity ID from the wire.
	RunEntityID string
	// Category is the agentic message category that identifies the event type:
	// CategoryLoopCompleted, CategoryLoopFailed, or CategoryLoopCancelled.
	// Cancellation rides agent.complete (not agent.cancelled), so callers MUST
	// demux by Category, not by subject.
	Category string
	// Outcome is the loop outcome string (success/failed/cancelled/truncated).
	Outcome string
	// Role is the loop's role.
	Role string
}

LoopTerminalEvent carries the terminal event data passed to MilestoneHandlers. Product handlers receive this along with the pre-resolved *AgentRun.
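
Since cancellation arrives on agent.complete, a handler that needs to tell the three cases apart would switch on Category rather than on the subject. The sketch below shows that shape; the constant values and the trimmed `loopTerminalEvent` type are hypothetical, because the real CategoryLoop* wire values are not listed in these docs.

```go
package main

import "fmt"

// Hypothetical wire values; the real CategoryLoop* constants live in the
// agentic package and may differ.
const (
	categoryLoopCompleted = "loop_completed"
	categoryLoopFailed    = "loop_failed"
	categoryLoopCancelled = "loop_cancelled"
)

// loopTerminalEvent is a trimmed stand-in for LoopTerminalEvent.
type loopTerminalEvent struct {
	LoopID   string
	Category string
}

// classify demuxes by Category, never by NATS subject.
func classify(ev loopTerminalEvent) string {
	switch ev.Category {
	case categoryLoopCompleted:
		return "completed"
	case categoryLoopFailed:
		return "failed"
	case categoryLoopCancelled:
		return "cancelled"
	default:
		return "unknown"
	}
}

func main() {
	// This event would have arrived on an agent.complete subject.
	ev := loopTerminalEvent{LoopID: "loop-1", Category: categoryLoopCancelled}
	fmt.Println(classify(ev))
}
```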

type LoopTripleReader

type LoopTripleReader interface {
	// GetLoopRunID reads the agent.run triple from the given loop entity ID.
	// Returns ("", false, nil) when the triple is absent (not an error — the
	// loop simply has no run association).
	// Returns ("", false, err) on read failures (NATS, decode errors).
	GetLoopRunID(ctx context.Context, loopEntityID string) (runID string, ok bool, err error)

	// GetLoopParentEntityID reads the agent.loop.parent triple from the given
	// loop entity ID. Returns ("", false, nil) when absent.
	GetLoopParentEntityID(ctx context.Context, loopEntityID string) (parentEntityID string, ok bool, err error)
}

LoopTripleReader is the narrow interface ResolveRun requires to read entity triples. NATSLoopTripleReader (backed by a *natsclient.Client) or any other adapter satisfies it. It is defined here so callers can mock resolution in tests without a live NATS server.

type MilestoneHandler

type MilestoneHandler interface {
	OnLoopTerminal(ctx context.Context, ev LoopTerminalEvent, run *AgentRun, pub TriplePublisher) error
}

MilestoneHandler is the product-registered handler for terminal loop events. Implementations receive the pre-resolved *AgentRun and a TriplePublisher so they can stamp milestone triples directly onto the run entity via graph-ingest (NOT via Manager — see ADR-053 D5 for the cardinality contract).

type MilestoneSubscriber

type MilestoneSubscriber struct {
	// contains filtered or unexported fields
}

MilestoneSubscriber decodes terminal loop events from NATS (agent.complete.* and agent.failed.*), pre-resolves the run, and fans out to registered handlers (ADR-053 D6, D3).

Terminal authority (D3): the subscriber initiates a run → failed/cancelled transition ONLY when the terminating loop IS the run root AND the run is still in "dispatched" with no children (zombie prevention). Product/coordinator closes runs via lifecycle_transition rule actions in all other cases.

Never call Manager.Complete: executing has three terminal out-edges, so the terminal that Complete selects is non-deterministic (manager.go:671). Always use Manager.Transition with an explicit terminal phase.
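
The terminal-authority condition (D3) reduces to a three-part predicate, sketched below. `runState` and `subscriberMayClose` are hypothetical names, and the child count is assumed to be available to the caller; the docs do not say how the subscriber determines that a run has no children.

```go
package main

import "fmt"

// runState is a hypothetical snapshot of what the subscriber knows about a run.
type runState struct {
	RunID      string // bare run ID, equal to the dispatch-root loop ID
	Phase      string
	ChildCount int // assumed input; its derivation is not documented
}

// subscriberMayClose reports whether the subscriber itself may move the
// run to failed/cancelled: the terminated loop must be the run root, and
// the run must still be "dispatched" with no children.
func subscriberMayClose(terminatedLoopID string, rs runState) bool {
	return terminatedLoopID == rs.RunID &&
		rs.Phase == "dispatched" &&
		rs.ChildCount == 0
}

func main() {
	fmt.Println(subscriberMayClose("root", runState{RunID: "root", Phase: "dispatched"}))
	fmt.Println(subscriberMayClose("child", runState{RunID: "root", Phase: "dispatched"}))
	fmt.Println(subscriberMayClose("root", runState{RunID: "root", Phase: "executing"}))
}
```

In every case where the predicate is false, closing the run is left to the product or coordinator.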

func NewMilestoneSubscriber

func NewMilestoneSubscriber(
	mgr *lifecycle.Manager,
	reader LoopTripleReader,
	pub TriplePublisher,
	org, platform string,
	logger *slog.Logger,
) *MilestoneSubscriber

NewMilestoneSubscriber constructs a subscriber wired to the given concrete lifecycle.Manager, reader, publisher, org, and platform. Handlers are registered separately via AddHandler. A nil logger falls back to slog.Default.

Production callers use this constructor with a *lifecycle.Manager. Test callers use NewMilestoneSubscriberWithManager with a mock.

func NewMilestoneSubscriberWithManager

func NewMilestoneSubscriberWithManager(
	mgr MockableManager,
	reader LoopTripleReader,
	pub TriplePublisher,
	org, platform string,
	logger *slog.Logger,
) *MilestoneSubscriber

NewMilestoneSubscriberWithManager constructs a subscriber with any MockableManager (real or test double). Prefer NewMilestoneSubscriber for production callers.

func (*MilestoneSubscriber) AddHandler

func (s *MilestoneSubscriber) AddHandler(h MilestoneHandler)

AddHandler registers a product MilestoneHandler. Register all handlers before the subscriber is started: AddHandler is safe with respect to HandleEvent only when it is called before Start, not concurrently with event processing.

func (*MilestoneSubscriber) HandleEvent

func (s *MilestoneSubscriber) HandleEvent(ctx context.Context, data []byte) error

HandleEvent processes a raw NATS message payload from agent.complete.* or agent.failed.* subjects. It decodes the BaseMessage envelope, demuxes by payload category (not subject — cancellation rides agent.complete), resolves the AgentRun, applies terminal-authority logic (D3), and fans out to handlers.

Panic guard: each handler invocation is wrapped in a recover so a panicking product handler does not crash the subscriber goroutine.

Returns an error only for infrastructure failures (decode, NATS). Handler errors are logged but do not propagate — the subscriber continues processing subsequent events.

func (*MilestoneSubscriber) Start

func (s *MilestoneSubscriber) Start(ctx context.Context, client *natsclient.Client, cfg StartConfig) (stop func(), err error)

Start wires the MilestoneSubscriber to the live NATS connection using DURABLE JetStream consumers. agent.complete.* and agent.failed.* are published into the AGENT JetStream stream by the agentic-loop component; core Subscribe would drop events during subscriber downtime, violating D3/milestone restart-recovery.

Two stable durable consumers are created (one per filter subject). They survive subscriber restarts and resume from the last-acked message.

cfg.StreamName must be non-empty (use AgentStreamName as the default). The ctx controls the consumers' lifetime; the returned stop func cancels consumption but preserves the durable consumer offsets in NATS for restart recovery.

Returns a stop func that cancels consumption — call it on shutdown.

type MintableManager

type MintableManager interface {
	// Get reads the entity at entityID for the given workflow.
	Get(ctx context.Context, workflow, entityID string) (lifecycle.Participant, error)
	// Create attaches lifecycle to the entity. Returns lifecycle.ErrAlreadyExists
	// when already lifecycle-managed.
	Create(ctx context.Context, initial lifecycle.Participant) error
}

MintableManager is the narrowest interface Mint requires: Create and Get only. This lets the rule engine's LifecycleManager satisfy Mint without implementing Transition, which the rule path never uses for minting. Production callers use *lifecycle.Manager, which satisfies both MintableManager and MockableManager.

type MockableManager

type MockableManager interface {
	// Get reads the entity at entityID for the given workflow and projects its
	// triples into a fresh Participant. Returns lifecycle.ErrEntityNotFound when absent.
	Get(ctx context.Context, workflow, entityID string) (lifecycle.Participant, error)
	// Transition moves the entity to newPhase. Used by the terminal-authority
	// zombie-prevention path (D3). Never called with Manager.Complete — use
	// an explicit terminal phase.
	Transition(ctx context.Context, workflow, entityID, newPhase string, source lifecycle.TransitionSource, note string) error
	// Create attaches lifecycle to the entity at initial.EntityID(). Returns
	// lifecycle.ErrAlreadyExists when already lifecycle-managed.
	Create(ctx context.Context, initial lifecycle.Participant) error
}

MockableManager is the subset of *lifecycle.Manager that MilestoneSubscriber uses, expressed as an interface so tests can inject a mock without constructing a real Manager. Production callers use *lifecycle.Manager which satisfies this.

type NATSLoopTripleReader

type NATSLoopTripleReader struct {
	// contains filtered or unexported fields
}

NATSLoopTripleReader implements LoopTripleReader backed by the ENTITY_STATES KV bucket. Reads are lazy-bucket: the bucket is opened on first use and cached for subsequent calls.

Satisfies the LoopTripleReader interface required by ResolveRun and MilestoneSubscriber.

func NewNATSLoopTripleReader

func NewNATSLoopTripleReader(client *natsclient.Client) *NATSLoopTripleReader

NewNATSLoopTripleReader constructs a LoopTripleReader backed by a natsclient. The client must be connected; the ENTITY_STATES bucket is opened lazily on first read.

func (*NATSLoopTripleReader) GetLoopParentEntityID

func (r *NATSLoopTripleReader) GetLoopParentEntityID(ctx context.Context, loopEntityID string) (string, bool, error)

GetLoopParentEntityID reads the agent.loop.parent triple from the given loop entity ID. Returns ("", false, nil) when absent.

func (*NATSLoopTripleReader) GetLoopRunID

func (r *NATSLoopTripleReader) GetLoopRunID(ctx context.Context, loopEntityID string) (string, bool, error)

GetLoopRunID reads the agent.run triple from the given loop entity ID. Returns ("", false, nil) when the triple is absent (not an error). Returns ("", false, err) on NATS or decode failures.

type NATSTriplePublisher

type NATSTriplePublisher struct {
	// contains filtered or unexported fields
}

NATSTriplePublisher satisfies agentrun.TriplePublisher via the graph.mutation.triple.add NATS surface. Used by MilestoneSubscriber to stamp milestone triples onto run entities via graph-ingest.

The ruleID argument is accepted to satisfy the interface but is NOT forwarded — milestone writes are not rule-engine transitions and need no feedback-loop revision tracking.

func NewNATSTriplePublisher

func NewNATSTriplePublisher(client *natsclient.Client) *NATSTriplePublisher

NewNATSTriplePublisher builds a TriplePublisher backed by the shared graph.mutation.triple.add NATS surface.

func (*NATSTriplePublisher) AddTriple

func (p *NATSTriplePublisher) AddTriple(ctx context.Context, _ string, triple message.Triple) (uint64, error)

AddTriple writes a triple through graph-ingest and returns the KV revision.

type StartConfig

type StartConfig struct {
	// StreamName is the JetStream stream that holds agent.complete.* and
	// agent.failed.* messages (default "AGENT"). Must match the agentic-loop's
	// stream_name config value.
	StreamName string

	// ConsumerNameSuffix is appended to the stable durable consumer names to
	// disambiguate instances (e.g. test-specific suffixes). Empty = no suffix.
	ConsumerNameSuffix string
}

StartConfig configures the durable JetStream consumers created by Start. Zero-value is invalid — callers must supply a non-empty StreamName.

type TriplePublisher

type TriplePublisher interface {
	// AddTriple writes a triple via graph-ingest. Returns the KV revision after write.
	AddTriple(ctx context.Context, ruleID string, triple message.Triple) (uint64, error)
}

TriplePublisher is the narrow interface MilestoneHandlers use to write milestone triples onto the run entity (or any entity) via graph-ingest. It reuses the TripleMutator interface shape from the rule action executor; a concrete natsclient-backed implementation such as NATSTriplePublisher satisfies it.
