Documentation ¶
Overview ¶
Package agenticloop provides the agentic loop orchestrator component. It manages the state machine for agentic loops, coordinating between model calls and tool executions while capturing execution trajectories.
Package agenticloop provides the loop orchestrator for the SemStreams agentic system.
The agentic-loop processor orchestrates autonomous agent execution by managing the lifecycle of agentic loops. It coordinates communication between the model processor (LLM calls) and tools processor (tool execution), tracks state through a 10-state machine, supports signal handling for user control, manages context memory with automatic compaction, and captures complete execution trajectories for observability.
This is the central component of the agentic system - it receives task requests, routes messages between model and tools, handles iteration limits, processes control signals, manages context memory, and publishes completion events.
Architecture ¶
The loop orchestrator sits at the center of the agentic component family:
                  ┌─────────────────┐
agent.task.*   ──▶│                 │ ──▶ agent.request.*
                  │  agentic-loop   │
agent.response.>◀─│   (this pkg)    │◀── agent.response.*
                  │                 │
tool.result.>  ──▶│                 │ ──▶ tool.execute.*
                  │                 │
agent.signal.* ──▶│                 │ ──▶ agent.complete.*
                  │                 │
                  │                 │ ──▶ agent.context.compaction.*
                  └────────┬────────┘
                           │
                  ┌────────┴────────┐
                  │     NATS KV     │
                  │   AGENT_LOOPS   │
                  │  AGENT_TRAJ...  │
                  └─────────────────┘
Message Flow ¶
A typical loop execution follows this pattern:
- External system publishes TaskMessage to agent.task.*
- Loop creates LoopEntity, starts Trajectory, publishes AgentRequest to agent.request.*
- agentic-model processes request, publishes AgentResponse to agent.response.*
- Loop receives response:
  - If status="tool_call": publishes ToolCall to tool.execute.* for each tool
  - If status="complete": publishes completion to agent.complete.*
  - If status="error": marks loop as failed
- agentic-tools executes tools, publishes ToolResult to tool.result.*
- Loop receives tool results, when all complete: increments iteration, sends next request
- Cycle repeats until complete, failed, or max iterations reached
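The response-handling step of this flow can be sketched as a switch on the response status. This is a minimal illustration, not the package's handler: the AgentResponse struct below is a trimmed-down stand-in, and the subject suffixes (call ID, loop ID) are assumptions made for the example.

```go
package main

import "fmt"

// AgentResponse is a hypothetical stand-in for the real response type;
// only the fields needed to illustrate routing are included.
type AgentResponse struct {
	Status    string
	ToolCalls []string // tool call IDs, illustrative only
}

// route returns the subjects the loop would publish to for a response.
// The subject suffixes are assumptions made for this sketch.
func route(loopID string, resp AgentResponse) ([]string, error) {
	switch resp.Status {
	case "tool_call":
		subjects := make([]string, 0, len(resp.ToolCalls))
		for _, id := range resp.ToolCalls {
			subjects = append(subjects, "tool.execute."+id)
		}
		return subjects, nil
	case "complete":
		return []string{"agent.complete." + loopID}, nil
	case "error":
		return nil, fmt.Errorf("loop %s failed", loopID)
	default:
		return nil, fmt.Errorf("unknown status %q", resp.Status)
	}
}

func main() {
	subjects, err := route("loop_1", AgentResponse{
		Status:    "tool_call",
		ToolCalls: []string{"call_a", "call_b"},
	})
	fmt.Println(subjects, err)
}
```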
State Machine ¶
Loops progress through ten states defined in the agentic package:
exploring → planning → architecting → executing → reviewing → complete
    ↑          ↑            ↑             ↑           ↑     ↘ failed
    └──────────┴────────────┴─────────────┴───────────┘     ↘ cancelled
                                                            ↘ paused
                                                            ↘ awaiting_approval
States:
- exploring: Initial state, gathering information
- planning: Developing approach
- architecting: Designing solution
- executing: Implementing solution
- reviewing: Validating results
- complete: Successfully finished (terminal)
- failed: Failed due to error or max iterations (terminal)
- cancelled: Cancelled by user signal (terminal)
- paused: Paused by user signal, can resume
- awaiting_approval: Waiting for user approval
States are fluid checkpoints - the loop can transition backward (e.g., from executing back to exploring) to support agent rethinking. Only terminal states (complete, failed, cancelled) prevent further transitions.
State transitions are managed by the LoopManager and persisted to NATS KV.
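The transition rule described above (any state may move to any other, unless the current state is terminal) can be sketched as follows. The identifiers are hypothetical and do not mirror the agentic package, and the sketch models only this one rule; it does not cover the retry signal, which moves a failed loop back to exploring.

```go
package main

import "fmt"

// The ten state names come from the list above; the Go identifiers are
// hypothetical.
var states = map[string]bool{
	"exploring": true, "planning": true, "architecting": true,
	"executing": true, "reviewing": true, "complete": true,
	"failed": true, "cancelled": true, "paused": true,
	"awaiting_approval": true,
}

var terminalStates = map[string]bool{
	"complete": true, "failed": true, "cancelled": true,
}

// canTransition reports whether a loop in state from may move to state to.
// Unknown states are rejected; terminal states allow no further moves.
func canTransition(from, to string) bool {
	if !states[from] || !states[to] {
		return false
	}
	return !terminalStates[from]
}

func main() {
	fmt.Println(canTransition("executing", "exploring")) // backward move
	fmt.Println(canTransition("complete", "planning"))   // from a terminal state
}
```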
Signal Handling ¶
The loop accepts control signals via the agent.signal.* input port:
signal := agentic.UserSignal{
	SignalID:    "sig_abc123",
	Type:        "cancel", // cancel, pause, resume, approve, reject, feedback, retry
	LoopID:      "loop_456",
	UserID:      "user_789",
	ChannelType: "cli",
	ChannelID:   "session_001",
	Timestamp:   time.Now(),
}
Signal types and their effects:
- cancel: Stop execution immediately, transition to cancelled state
- pause: Pause at next checkpoint, transition to paused state
- resume: Continue paused loop, restore previous state
- approve: Approve pending result, transition to complete
- reject: Reject with optional reason, transition to failed
- feedback: Add feedback without decision, no state change
- retry: Retry failed loop, transition to exploring
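The signal effects listed above amount to a small mapping from signal type to resulting state. The following sketch is illustrative only: the function name is hypothetical, it ignores the "next checkpoint" timing of pause, and it assumes that resume is valid only for a paused loop.

```go
package main

import (
	"errors"
	"fmt"
)

// applySignal returns the state a loop would move to after a signal.
// previous is the state the loop was in before it was paused.
func applySignal(signalType, current, previous string) (string, error) {
	switch signalType {
	case "cancel":
		return "cancelled", nil
	case "pause":
		return "paused", nil
	case "resume":
		// Assumption: resuming a loop that is not paused is an error.
		if current != "paused" {
			return current, errors.New("resume requires a paused loop")
		}
		return previous, nil
	case "approve":
		return "complete", nil
	case "reject":
		return "failed", nil
	case "feedback":
		return current, nil // feedback never changes state
	case "retry":
		return "exploring", nil
	default:
		return current, fmt.Errorf("unknown signal type %q", signalType)
	}
}

func main() {
	next, err := applySignal("resume", "paused", "executing")
	fmt.Println(next, err)
}
```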
Context Management ¶
The loop includes automatic context memory management to handle long-running conversations that approach model token limits.
Context is organized into priority regions (lower priority evicted first):
- tool_results (priority 1) - Tool execution results, GC'd by age
- recent_history (priority 2) - Recent conversation messages
- hydrated_context (priority 3) - Retrieved context from memory
- compacted_history (priority 4) - Summarized old conversation
- system_prompt (priority 5) - Never evicted
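The priority ordering implies an eviction order: lowest priority first, with system_prompt excluded. A minimal sketch of that ordering, using a hypothetical region struct rather than the package's RegionType:

```go
package main

import (
	"fmt"
	"sort"
)

// region is a hypothetical pairing of a region name with its priority.
type region struct {
	name     string
	priority int
}

// systemPromptPriority marks the region that is never evicted.
const systemPromptPriority = 5

// evictionOrder returns evictable region names, lowest priority first.
func evictionOrder(regions []region) []string {
	candidates := make([]region, 0, len(regions))
	for _, r := range regions {
		if r.priority < systemPromptPriority {
			candidates = append(candidates, r)
		}
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].priority < candidates[j].priority
	})
	names := make([]string, len(candidates))
	for i, r := range candidates {
		names[i] = r.name
	}
	return names
}

func main() {
	fmt.Println(evictionOrder([]region{
		{"system_prompt", 5},
		{"compacted_history", 4},
		{"tool_results", 1},
		{"hydrated_context", 3},
		{"recent_history", 2},
	}))
}
```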
Configuration:
// Named contextCfg so the variable does not shadow the standard context package.
contextCfg := agenticloop.ContextConfig{
	Enabled:          true,
	CompactThreshold: 0.60, // Trigger compaction at 60% utilization
	HeadroomTokens:   6400, // Reserve tokens for new content
}
Model context limits are resolved from the unified model registry (component.Dependencies.ModelRegistry). If a model is not found in the registry, DefaultContextLimit (128000) is used as fallback.
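The limit fallback and the threshold check can be sketched as below. This is an assumption-laden illustration: the registry is modelled as a plain map, the function names are hypothetical, and the comparison (utilization at or above the threshold) is a guess at the trigger condition. How HeadroomTokens enters the calculation is not specified here, so the sketch leaves it out.

```go
package main

import "fmt"

// defaultContextLimit mirrors the documented DefaultContextLimit value.
const defaultContextLimit = 128000

// resolveLimit looks a model up in a (hypothetical) map-based registry and
// falls back to the default when the model is unknown.
func resolveLimit(registry map[string]int, model string) int {
	if limit, ok := registry[model]; ok && limit > 0 {
		return limit
	}
	return defaultContextLimit
}

// shouldCompact reports whether utilization has reached the threshold.
// Assumption: the check is inclusive (>=).
func shouldCompact(totalTokens, limit int, threshold float64) bool {
	if limit <= 0 {
		return false
	}
	return float64(totalTokens)/float64(limit) >= threshold
}

func main() {
	limit := resolveLimit(map[string]int{"small-model": 8192}, "unknown-model")
	fmt.Println(limit, shouldCompact(80000, limit, 0.60))
}
```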
Context events are published to agent.context.compaction.*:
- compaction_starting: Context approaching limit, compaction starting
- compaction_complete: Compaction finished, includes tokens saved
Component Architecture ¶
The package is organized into three main components:
**LoopManager** - Manages loop entity lifecycle:
manager := NewLoopManager()
// Create a loop
loopID, err := manager.CreateLoop("task_123", "general", "gpt-4", 20)
// State transitions
err = manager.TransitionLoop(loopID, agentic.LoopStateExecuting)
// Iteration tracking
err = manager.IncrementIteration(loopID)
// Pending tool management
manager.AddPendingTool(loopID, "call_001")
manager.RemovePendingTool(loopID, "call_001")
allDone := manager.AllToolsComplete(loopID)
// Context management
cm := manager.GetContextManager(loopID)
**TrajectoryManager** - Captures execution traces:
trajManager := NewTrajectoryManager()
// Start trajectory for a loop
trajectory, err := trajManager.StartTrajectory(loopID)
// Add steps (model calls, tool calls)
trajManager.AddStep(loopID, agentic.TrajectoryStep{
	Timestamp: time.Now(),
	StepType:  "model_call",
	TokensIn:  150,
	TokensOut: 200,
})
// Complete trajectory
trajectory, err = trajManager.CompleteTrajectory(loopID, "complete")
**MessageHandler** - Routes and processes messages:
handler := NewMessageHandler(config)
// Handle incoming task
result, err := handler.HandleTask(ctx, TaskMessage{
	TaskID: "task_123",
	Role:   "general",
	Model:  "gpt-4",
	Prompt: "Analyze this code for bugs",
})
// Handle model response
result, err = handler.HandleModelResponse(ctx, loopID, response)
// Handle tool result
result, err = handler.HandleToolResult(ctx, loopID, toolResult)
Configuration ¶
The processor is configured via JSON:
{
  "max_iterations": 20,
  "timeout": "120s",
  "stream_name": "AGENT",
  "loops_bucket": "AGENT_LOOPS",
  "trajectories_bucket": "AGENT_TRAJECTORIES",
  "context": {
    "enabled": true,
    "compact_threshold": 0.60,
    "headroom_tokens": 6400
  },
  "ports": {
    "inputs": [...],
    "outputs": [...]
  }
}
Configuration fields:
- max_iterations: Maximum loop iterations before failure (default: 20, range: 1-1000)
- timeout: Loop execution timeout as duration string (default: "120s")
- stream_name: JetStream stream name for agentic messages (default: "AGENT")
- loops_bucket: NATS KV bucket for loop state (default: "AGENT_LOOPS")
- trajectories_bucket: NATS KV bucket for trajectories (default: "AGENT_TRAJECTORIES")
- consumer_name_suffix: Optional suffix for JetStream consumer names (for testing)
- context: Context management configuration (see ContextConfig)
- ports: Port configuration for inputs and outputs
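The documented defaults and the max_iterations range can be checked with a few lines of decoding and validation. The struct and function below are hypothetical and cover only two of the fields; the package's own validation may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// loopConfig is a hypothetical subset of the processor configuration.
type loopConfig struct {
	MaxIterations int    `json:"max_iterations"`
	Timeout       string `json:"timeout"`
}

// parseConfig applies the documented defaults, decodes raw JSON over them,
// and validates the documented range and the duration syntax.
func parseConfig(raw []byte) (loopConfig, error) {
	cfg := loopConfig{MaxIterations: 20, Timeout: "120s"}
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return cfg, fmt.Errorf("decode config: %w", err)
	}
	if cfg.MaxIterations < 1 || cfg.MaxIterations > 1000 {
		return cfg, fmt.Errorf("max_iterations %d out of range 1-1000", cfg.MaxIterations)
	}
	if _, err := time.ParseDuration(cfg.Timeout); err != nil {
		return cfg, fmt.Errorf("timeout %q: %w", cfg.Timeout, err)
	}
	return cfg, nil
}

func main() {
	cfg, err := parseConfig([]byte(`{"max_iterations": 50}`))
	fmt.Println(cfg, err)
}
```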
Ports ¶
Input ports (JetStream consumers):
- agent.task: Task requests from external systems (subject: agent.task.*)
- agent.response: Model responses from agentic-model (subject: agent.response.>)
- tool.result: Tool results from agentic-tools (subject: tool.result.>)
- agent.signal: Control signals for loops (subject: agent.signal.*)
Output ports (JetStream publishers):
- agent.request: Model requests to agentic-model (subject: agent.request.*)
- tool.execute: Tool execution requests to agentic-tools (subject: tool.execute.*)
- agent.complete: Loop completion events (subject: agent.complete.*)
- agent.context.compaction: Context compaction events (subject: agent.context.compaction.*)
KV write ports:
- loops: Loop entity state (bucket: AGENT_LOOPS)
- trajectories: Trajectory data (bucket: AGENT_TRAJECTORIES)
KV Storage ¶
Loop state and trajectories are persisted to NATS KV for durability and queryability:
**AGENT_LOOPS bucket**: Stores LoopEntity as JSON, keyed by loop ID
{
  "id": "loop_123",
  "task_id": "task_456",
  "state": "executing",
  "role": "general",
  "model": "gpt-4",
  "iterations": 3,
  "max_iterations": 20,
  "parent_loop_id": "",
  "pause_requested": false,
  "user_id": "user_789",
  "channel_type": "cli",
  "channel_id": "session_001"
}
**COMPLETE_{loopID}**: Written when a loop completes, for rules engine consumption
{
  "loop_id": "loop_123",
  "task_id": "task_456",
  "outcome": "success",
  "role": "architect",
  "result": "Designed authentication system...",
  "model": "gpt-4",
  "iterations": 3,
  "parent_loop": ""
}
**AGENT_TRAJECTORIES bucket**: Stores Trajectory as JSON, keyed by loop ID
{
  "loop_id": "loop_123",
  "start_time": "2024-01-15T10:30:00Z",
  "end_time": "2024-01-15T10:31:45Z",
  "steps": [...],
  "outcome": "complete",
  "total_tokens_in": 1500,
  "total_tokens_out": 800,
  "duration": 105000
}
KV buckets are created automatically if they don't exist during component startup.
Rules/Workflow Integration ¶
The loop integrates with the rules engine for orchestration:
- On completion, writes COMPLETE_{loopID} key to KV
- Rules engine watches COMPLETE_* keys
- Rules can trigger follow-up actions (e.g., spawn editor when architect completes)
Architect/Editor pattern:
- Task arrives with role="architect"
- Architect loop executes and produces a plan
- On completion, COMPLETE_{loopID} written with role="architect"
- Rule matches COMPLETE_* where role="architect"
- Rule spawns new loop with role="editor", parent_loop={loopID}
- Editor receives architect's output as context
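The matching step of the architect/editor pattern can be sketched as a function over a KV key and its JSON value. This is not the rules engine's API: the completion struct uses only fields shown in the COMPLETE_{loopID} example, while spawnRequest and editorFollowUp are hypothetical names introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// completion holds the fields of a COMPLETE_{loopID} record used here.
type completion struct {
	LoopID string `json:"loop_id"`
	Role   string `json:"role"`
	Result string `json:"result"`
}

// spawnRequest is a hypothetical description of the follow-up loop.
type spawnRequest struct {
	Role       string
	ParentLoop string
	Context    string
}

// editorFollowUp returns a spawn request when the key is a completion
// record written by an architect loop, and nil otherwise.
func editorFollowUp(key string, value []byte) (*spawnRequest, error) {
	if !strings.HasPrefix(key, "COMPLETE_") {
		return nil, nil
	}
	var c completion
	if err := json.Unmarshal(value, &c); err != nil {
		return nil, fmt.Errorf("decode %s: %w", key, err)
	}
	if c.Role != "architect" {
		return nil, nil
	}
	return &spawnRequest{Role: "editor", ParentLoop: c.LoopID, Context: c.Result}, nil
}

func main() {
	req, err := editorFollowUp("COMPLETE_loop_123",
		[]byte(`{"loop_id":"loop_123","role":"architect","result":"plan"}`))
	fmt.Println(req, err)
}
```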
agentic-memory Integration ¶
The loop publishes context events that agentic-memory consumes:
- compaction_starting: agentic-memory extracts facts before compaction
- compaction_complete: agentic-memory injects recovered context
Quick Start ¶
Create and start the component:
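// Assumes this runs inside a function that returns an error.
config := agenticloop.Config{
	MaxIterations: 20,
	Timeout:       "120s",
	StreamName:    "AGENT",
	LoopsBucket:   "AGENT_LOOPS",
	Context:       agenticloop.DefaultContextConfig(),
}
rawConfig, err := json.Marshal(config)
if err != nil {
	return err
}
comp, err := agenticloop.NewComponent(rawConfig, deps)
if err != nil {
	return err
}
lc, ok := comp.(component.LifecycleComponent)
if !ok {
	return errors.New("component does not implement LifecycleComponent")
}
if err := lc.Initialize(); err != nil {
	return err
}
if err := lc.Start(ctx); err != nil {
	return err
}
defer lc.Stop(5 * time.Second)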
Publish a task:
task := agenticloop.TaskMessage{
	TaskID: "analyze_code",
	Role:   "general",
	Model:  "gpt-4",
	Prompt: "Review main.go for security issues",
}
taskData, _ := json.Marshal(task)
natsClient.PublishToStream(ctx, "agent.task.review", taskData)
Thread Safety ¶
The LoopManager, TrajectoryManager, and ContextManager are thread-safe, using RWMutex for concurrent access. Multiple goroutines can safely:
- Create and manage different loops concurrently
- Read loop state while other loops are being modified
- Track pending tools across concurrent tool executions
- Add messages to context regions
The Component itself is not designed for concurrent Start/Stop calls.
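The locking discipline described above can be illustrated with a small pending-tool tracker guarded by an RWMutex: writers take the exclusive lock, readers take the shared lock. The type and method names are hypothetical and are not the LoopManager's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingTools tracks outstanding tool calls per loop.
type pendingTools struct {
	mu    sync.RWMutex
	calls map[string]map[string]struct{} // loopID -> set of call IDs
}

func newPendingTools() *pendingTools {
	return &pendingTools{calls: make(map[string]map[string]struct{})}
}

// add records a pending call under the exclusive lock.
func (p *pendingTools) add(loopID, callID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.calls[loopID] == nil {
		p.calls[loopID] = make(map[string]struct{})
	}
	p.calls[loopID][callID] = struct{}{}
}

// remove clears a pending call; removing an unknown call is a no-op.
func (p *pendingTools) remove(loopID, callID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.calls[loopID], callID)
}

// allComplete only reads, so the shared lock is enough.
func (p *pendingTools) allComplete(loopID string) bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.calls[loopID]) == 0
}

func main() {
	p := newPendingTools()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			callID := fmt.Sprintf("call_%03d", n)
			p.add("loop_1", callID)
			p.remove("loop_1", callID)
		}(i)
	}
	wg.Wait()
	fmt.Println(p.allComplete("loop_1"))
}
```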
Error Handling ¶
Errors are handled at multiple levels:
- Validation errors: Returned immediately from handlers
- State transition errors: Logged, loop may continue or fail depending on severity
- Max iterations: Loop transitions to failed state, not returned as error
- KV persistence errors: Logged but don't block message processing
- Context cancellation: Propagated up, handlers check ctx.Err() early
Observability ¶
The component provides observability through:
- Structured logging (slog) for all significant events
- Trajectory capture for complete execution audit trails
- Context events for memory management visibility
- Health status via Health() method
- Flow metrics via DataFlow() method
Testing ¶
For testing, use the ConsumerNameSuffix config option to create unique JetStream consumer names per test:
config := agenticloop.Config{
	StreamName:         "AGENT",
	ConsumerNameSuffix: "test-" + t.Name(),
	// ...
}
This prevents consumer name conflicts when running tests in parallel.
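Subtest names returned by t.Name() contain a "/" separator, and JetStream consumer names are understood to disallow whitespace, ".", "*", ">" and path separators. Under that assumption, a suffix derived from the test name may need sanitizing first. The helper below is a hypothetical sketch, not part of this package.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// consumerSuffix turns a test name into a suffix that avoids characters
// assumed to be invalid in JetStream consumer names.
func consumerSuffix(testName string) string {
	var b strings.Builder
	b.WriteString("test-")
	for _, r := range testName {
		switch {
		case r == '.' || r == '*' || r == '>' || r == '/' || r == '\\',
			unicode.IsSpace(r), !unicode.IsPrint(r):
			b.WriteRune('_')
		default:
			b.WriteRune(r)
		}
	}
	return b.String()
}

func main() {
	fmt.Println(consumerSuffix("TestLoop/cancel_signal"))
}
```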
Limitations ¶
Current limitations:
- No streaming support for partial responses
- Trajectory size limited by NATS KV (1MB default)
- No built-in retry for failed tool executions
- Context summarization requires LLM call (cost consideration)
See Also ¶
Related packages:
- agentic: Shared types (LoopEntity, AgentRequest, UserSignal, etc.)
- processor/agentic-model: LLM endpoint integration
- processor/agentic-tools: Tool execution framework
- processor/agentic-memory: Graph-backed agent memory
- processor/agentic-dispatch: User message routing
- processor/workflow: Multi-step orchestration
Package agenticloop — subject-mode tool-call governance dispatcher (ADR-039).
The dispatcher sits between the model's tool-call response and the loop's serial-dispatch fan-out. It implements three modes per ToolCallGovernanceConfig.Mode:
- disabled: pass-through. All calls return as approved without publishing. No governance gate active. (Beta.70 retired the beta.67+68 in-process ToolCallFilter wiring; subject-mode is now the sole governance path.)
- audit: publishes each proposed call to agent.toolcall.proposed.* but returns all calls as approved IMMEDIATELY. Verdicts that arrive later are recorded for observability only. Shadow mode for rule development.
- enforce: subscribes-then-publishes per ADR-039 race-fix option 3 (per-call waiter registration BEFORE publish), waits up to ToolCallGovernanceConfig.Timeout for a per-call verdict, and returns approved/rejected sets. Fail-closed on timeout.
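The fail-closed behaviour of enforce mode comes down to a select between a verdict channel and a timer: if no verdict arrives in time, the call is treated as rejected. A minimal sketch with a hypothetical function name and string verdicts:

```go
package main

import (
	"fmt"
	"time"
)

// awaitVerdict waits for a verdict on ch for at most timeout.
// A missing verdict is treated as a rejection (fail-closed).
func awaitVerdict(ch <-chan string, timeout time.Duration) (approved bool, reason string) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case verdict := <-ch:
		return verdict == "approved", verdict
	case <-timer.C:
		return false, "timeout"
	}
}

func main() {
	ready := make(chan string, 1)
	ready <- "approved"
	fmt.Println(awaitVerdict(ready, time.Second))

	// No verdict is ever sent on this channel, so the wait times out.
	fmt.Println(awaitVerdict(make(chan string), 20*time.Millisecond))
}
```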
Race-condition fix: subscribe before publish ¶
JetStream consumer binding is asynchronous with respect to publishing. If the rule processor fires before the loop's verdict subscription is bound, the verdict is published to a subject with no subscribers, the loop times out, and the call fails. This is the same class of bug as the natsclient handler-error payload convention bug.
Fix: the Component sets up a SINGLE wildcard subscription to `agent.toolcall.approved.>` and `agent.toolcall.rejected.>` at Start. The dispatcher REGISTERS per-call waiter channels in-process BEFORE publishing the proposed call. The wildcard subscription is bound before any task arrives, and the in-process waiter registration is synchronous with the publish call. Verdicts that arrive demux by call_id (trailing path segment) into the registered waiter channel.
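The register-before-publish ordering can be sketched with an in-process waiter registry. The names are hypothetical; the sketch shows only the ordering guarantee (the waiter exists before the proposed call is published) and the demux by call ID, not the dispatcher itself.

```go
package main

import (
	"fmt"
	"sync"
)

// waiters maps call IDs to the channel a dispatcher is waiting on.
type waiters struct {
	mu     sync.Mutex
	byCall map[string]chan string
}

func newWaiters() *waiters {
	return &waiters{byCall: make(map[string]chan string)}
}

// register must be called before the proposed call is published.
// The channel is buffered so deliver never blocks the subscription callback.
func (w *waiters) register(callID string) <-chan string {
	ch := make(chan string, 1)
	w.mu.Lock()
	w.byCall[callID] = ch
	w.mu.Unlock()
	return ch
}

// deliver hands a verdict to the registered waiter, if any, and removes it.
func (w *waiters) deliver(callID, verdict string) bool {
	w.mu.Lock()
	ch, ok := w.byCall[callID]
	delete(w.byCall, callID)
	w.mu.Unlock()
	if !ok {
		return false
	}
	ch <- verdict
	return true
}

func main() {
	w := newWaiters()
	ch := w.register("call_9") // 1. register the waiter
	// 2. publish the proposed call here (omitted in this sketch)
	w.deliver("call_9", "approved") // 3. the wildcard subscription demuxes the verdict
	fmt.Println(<-ch)
}
```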
Approved subject shape (ADR-039 open question 1) ¶
Implementations consume verdict subjects of the shape `agent.toolcall.{approved,rejected}.<loop_id>.<call_id>`. The call_id is the trailing path segment. Operators templating verdict subjects in their rule actions are expected to use `$message.loop_id.$message.call_id`; ADR-039 §"Implementation" lists this contract.
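Parsing a verdict subject of this shape is a matter of splitting on "." and reading the trailing segment. The sketch below uses a hypothetical function name and assumes that loop and call IDs contain no dots.

```go
package main

import (
	"fmt"
	"strings"
)

// parseVerdictSubject splits agent.toolcall.{approved,rejected}.<loop_id>.<call_id>.
// Assumption: neither ID contains a "." character.
func parseVerdictSubject(subject string) (verdict, loopID, callID string, ok bool) {
	parts := strings.Split(subject, ".")
	if len(parts) != 5 || parts[0] != "agent" || parts[1] != "toolcall" {
		return "", "", "", false
	}
	if parts[2] != "approved" && parts[2] != "rejected" {
		return "", "", "", false
	}
	if parts[3] == "" || parts[4] == "" {
		return "", "", "", false
	}
	return parts[2], parts[3], parts[4], true
}

func main() {
	fmt.Println(parseVerdictSubject("agent.toolcall.approved.loop_1.call_9"))
}
```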
Package agenticloop provides Prometheus metrics for the agentic-loop component.
Index ¶
- Constants
- Variables
- func BuildIterationBudgetMessage(iteration, maxIterations int) agentic.ChatMessage
- func BuildTodoStateMessage(todos []TodoState) agentic.ChatMessage
- func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
- func Register(registry *component.Registry) error
- type ApprovalTimeoutCandidate
- type CompactionResult
- type Compactor
- type CompactorOption
- type Component
- func (c *Component) ConfigSchema() component.ConfigSchema
- func (c *Component) DataFlow() component.FlowMetrics
- func (c *Component) Health() component.HealthStatus
- func (c *Component) Initialize() error
- func (c *Component) InputPorts() []component.Port
- func (c *Component) Meta() component.Metadata
- func (c *Component) OpenAPISpec() *service.OpenAPISpec
- func (c *Component) OutputPorts() []component.Port
- func (c *Component) Phase() string
- func (c *Component) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
- func (c *Component) Start(ctx context.Context) error
- func (c *Component) StateManager() *workflow.StateManager
- func (c *Component) Stop(timeout time.Duration) error
- func (c *Component) WorkflowID() string
- type Config
- type ConsumerConfig
- type ContextConfig
- type ContextManager
- func (cm *ContextManager) AddGraphEntityContext(entityID string, content string) error
- func (cm *ContextManager) AddMessage(region RegionType, msg agentic.ChatMessage) error
- func (cm *ContextManager) AdvanceIteration()
- func (cm *ContextManager) CheckBudget(budgetTokens int) (bool, int)
- func (cm *ContextManager) ClearGraphEntities()
- func (cm *ContextManager) GetContext() []agentic.ChatMessage
- func (cm *ContextManager) GetGraphEntityTokens() int
- func (cm *ContextManager) GetRegionTokens(region RegionType) int
- func (cm *ContextManager) ModelLimit() int
- func (cm *ContextManager) RepairToolPairs() int
- func (cm *ContextManager) ShouldCompact() bool
- func (cm *ContextManager) SliceForBudget(budget int, slice ContextSlice) error
- func (cm *ContextManager) TotalTokens() int
- func (cm *ContextManager) Utilization() float64
- type ContextManagerOption
- type ContextSlice
- type DispatcherMetrics
- type DispatcherResult
- type GovernanceDispatcher
- type HandlerResult
- type LLMSummarizer
- type LoopManager
- func (m *LoopManager) AddPendingTool(loopID, callID string) error
- func (m *LoopManager) AllToolsComplete(loopID string) bool
- func (m *LoopManager) CacheMetadata(loopID string, metadata map[string]any)
- func (m *LoopManager) CacheRequestTimeout(loopID, timeout string)
- func (m *LoopManager) CacheResponseFormat(loopID string, rf *agentic.ResponseFormat)
- func (m *LoopManager) CacheTaskPrompt(loopID, prompt string)
- func (m *LoopManager) CacheToolChoice(loopID string, tc *agentic.ToolChoice)
- func (m *LoopManager) CacheTools(loopID string, tools []agentic.ToolDefinition)
- func (m *LoopManager) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
- func (m *LoopManager) ClearQueuedTools(loopID string)
- func (m *LoopManager) CreateLoop(taskID, role, model string, maxIterations ...int) (string, error)
- func (m *LoopManager) CreateLoopWithID(loopID, taskID, role, model string, maxIterations ...int) (string, error)
- func (m *LoopManager) DeleteLoop(loopID string) error
- func (m *LoopManager) DequeueToolCall(loopID string) (agentic.ToolCall, bool)
- func (m *LoopManager) ExtractLoopIDFromRequest(requestID string) string
- func (m *LoopManager) ExtractLoopIDFromToolCall(toolCallID string) string
- func (m *LoopManager) GenerateRequestID(loopID string) string
- func (m *LoopManager) GenerateToolCallID(loopID string) string
- func (m *LoopManager) GetAndClearToolResults(loopID string) []agentic.ToolResult
- func (m *LoopManager) GetCachedMetadata(loopID string) map[string]any
- func (m *LoopManager) GetCachedRequestTimeout(loopID string) string
- func (m *LoopManager) GetCachedResponseFormat(loopID string) *agentic.ResponseFormat
- func (m *LoopManager) GetCachedToolChoice(loopID string) *agentic.ToolChoice
- func (m *LoopManager) GetCachedTools(loopID string) []agentic.ToolDefinition
- func (m *LoopManager) GetContextManager(loopID string) *ContextManager
- func (m *LoopManager) GetCurrentIteration(loopID string) int
- func (m *LoopManager) GetDepth(loopID string) (depth, maxDepth int, err error)
- func (m *LoopManager) GetLoop(loopID string) (agentic.LoopEntity, error)
- func (m *LoopManager) GetLoopForRequest(requestID string) (string, bool)
- func (m *LoopManager) GetLoopForRequestWithRecovery(requestID string) (string, bool)
- func (m *LoopManager) GetLoopForToolCall(callID string) (string, bool)
- func (m *LoopManager) GetLoopForToolCallWithRecovery(toolCallID string) (string, bool)
- func (m *LoopManager) GetPendingTools(loopID string) []string
- func (m *LoopManager) GetRequestStart(requestID string) time.Time
- func (m *LoopManager) GetTaskPrompt(loopID string) string
- func (m *LoopManager) GetToolArguments(callID string) map[string]any
- func (m *LoopManager) GetToolName(callID string) string
- func (m *LoopManager) GetToolStart(callID string) time.Time
- func (m *LoopManager) HasActiveLoopForTask(taskID string) (string, bool)
- func (m *LoopManager) HasQueuedTools(loopID string) bool
- func (m *LoopManager) IncrementIteration(loopID string) error
- func (m *LoopManager) IncrementTruncationRetry(loopID string) int
- func (m *LoopManager) IsTimedOut(loopID string) bool
- func (m *LoopManager) QueueToolCalls(loopID string, calls []agentic.ToolCall)
- func (m *LoopManager) RemovePendingTool(loopID, callID string) error
- func (m *LoopManager) ResetTruncationRetry(loopID string)
- func (m *LoopManager) ResolveApprovalIfPending(loopID, callID string) (agentic.PendingApprovalState, bool, error)
- func (m *LoopManager) SetDepth(loopID string, depth, maxDepth int) error
- func (m *LoopManager) SetMetadata(loopID string, metadata map[string]any) error
- func (m *LoopManager) SetParentLoop(loopID, parentLoopID string) error
- func (m *LoopManager) SetParentLoopID(loopID, parentLoopID string) error
- func (m *LoopManager) SetTimeout(loopID string, timeout time.Duration) error
- func (m *LoopManager) SetUserContext(loopID, channelType, channelID, userID string) error
- func (m *LoopManager) SetWorkflowContext(loopID, workflowSlug, workflowStep string) error
- func (m *LoopManager) SnapshotExpiredApprovals(now time.Time) []ApprovalTimeoutCandidate
- func (m *LoopManager) StoreToolResult(loopID string, result agentic.ToolResult) error
- func (m *LoopManager) TrackRequest(requestID, loopID string)
- func (m *LoopManager) TrackRequestStart(requestID string)
- func (m *LoopManager) TrackToolArguments(callID string, args map[string]any)
- func (m *LoopManager) TrackToolCall(callID, loopID string)
- func (m *LoopManager) TrackToolName(callID, name string)
- func (m *LoopManager) TrackToolStart(callID string)
- func (m *LoopManager) TransitionLoop(loopID string, newState agentic.LoopState) error
- func (m *LoopManager) UpdateCompletion(loopID, outcome, result, errMsg string) error
- func (m *LoopManager) UpdateLoop(entity agentic.LoopEntity) error
- type LoopManagerOption
- type MessageHandler
- func (h *MessageHandler) BuildFailureEvent(loopID, reason, errorMsg string) (*agentic.LoopFailedEvent, error)
- func (h *MessageHandler) BuildFailureMessages(loopID, reason, errorMsg string) (*agentic.LoopFailedEvent, []PublishedMessage, error)
- func (h *MessageHandler) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
- func (h *MessageHandler) GetContextManager(loopID string) *ContextManager
- func (h *MessageHandler) GetLoop(loopID string) (agentic.LoopEntity, error)
- func (h *MessageHandler) GetTrajectory(loopID string) (agentic.Trajectory, error)
- func (h *MessageHandler) GovernanceDispatcher() GovernanceDispatcher
- func (h *MessageHandler) HandleApprovalResponse(ctx context.Context, response agentic.ApprovalResponse) (result HandlerResult, err error)
- func (h *MessageHandler) HandleModelResponse(ctx context.Context, loopID string, response agentic.AgentResponse) (HandlerResult, error)
- func (h *MessageHandler) HandleTask(ctx context.Context, task TaskMessage) (HandlerResult, error)
- func (h *MessageHandler) HandleToolResult(ctx context.Context, loopID string, toolResult agentic.ToolResult) (HandlerResult, error)
- func (h *MessageHandler) SetGovernanceDispatcher(d GovernanceDispatcher)
- func (h *MessageHandler) SetLogger(logger *slog.Logger)
- func (h *MessageHandler) SetPersonaFragments(src PersonaFragmentSource)
- func (h *MessageHandler) SetPlatform(p types.PlatformMeta)
- func (h *MessageHandler) SetPromptRegistry(r *prompt.Registry)
- func (h *MessageHandler) SetSummarizer(s Summarizer, modelName string)
- func (h *MessageHandler) SetTodoReader(r TodoReader)
- func (h *MessageHandler) SetToolRegistry(r component.ToolRegistryReader)
- func (h *MessageHandler) UpdateLoop(entity agentic.LoopEntity) error
- type PersonaFragmentSource
- type ProposedToolCallPayload
- type PublishedMessage
- type RegionType
- type Summarizer
- type TaskMessage
- type TodoReader
- type TodoState
- type ToolCallGovernanceConfig
- type ToolCallRejection
- type TrajectoryManager
- func (m *TrajectoryManager) AddStep(loopID string, step agentic.TrajectoryStep) (agentic.Trajectory, error)
- func (m *TrajectoryManager) CompleteTrajectory(loopID, outcome string) (agentic.Trajectory, error)
- func (m *TrajectoryManager) GetTrajectory(loopID string) (agentic.Trajectory, error)
- func (m *TrajectoryManager) SaveTrajectory(_ context.Context, _ agentic.Trajectory) error
- func (m *TrajectoryManager) StartTrajectory(loopID string) (agentic.Trajectory, error)
- type VerdictPayload
- type VerdictPublisher
Constants ¶
const (
	ToolCallGovernanceModeDisabled = "disabled"
	ToolCallGovernanceModeAudit    = "audit"
	ToolCallGovernanceModeEnforce  = "enforce"
)
Tool-call governance mode constants. ADR-039.
Disabled is the default — no publish to the proposed subject, direct dispatch. The in-process ToolCallFilter wiring beta.67+68 shipped was retired in beta.70; subject-mode is now the sole governance path.
Audit publishes proposed tool calls to the proposed subject and dispatches IMMEDIATELY without waiting for a verdict — shadow mode for operators developing governance rules without blocking production tool-call flow. Verdicts that arrive later are counted for observability only.
Enforce subscribes-then-publishes, waits up to ToolCallGovernance.Timeout for a per-call verdict, and dispatches on approve / fails on reject / fail-closed on timeout. The default 1s timeout is deliberately generous for beta.69 to surface real p99 latency before tightening in beta.70.
const DefaultContextLimit = 128000
DefaultContextLimit is the fallback context window size when the model is unknown.
const DefaultToolCallGovernanceTimeout = "1s"
DefaultToolCallGovernanceTimeout is the wait window for a per-call verdict in enforce mode before the dispatcher fails-closed (treats the absence of a verdict as a reject). 1s is generous for beta.69 to surface real p99 latency; tightens after measurement in beta.70 per ADR-039 §"Observability".
Variables ¶
var ErrGovernancePublishFailed = errors.New("governance publish failed")
ErrGovernancePublishFailed is returned by Propose only when the dispatcher is unable to make any forward progress at all — currently unused (per-call publish failures are recorded as per-call rejections). Reserved for a future architectural failure mode that short-circuits the entire batch.
Functions ¶
func BuildIterationBudgetMessage ¶
func BuildIterationBudgetMessage(iteration, maxIterations int) agentic.ChatMessage
BuildIterationBudgetMessage creates a system message informing the model of its iteration budget. Tone escalates as the budget is consumed: neutral at ≤50%, a nudge to wrap up at 51-75%, and urgent at >75%.
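The escalation thresholds can be sketched with integer arithmetic. The function name and tone labels are hypothetical, and the sketch assumes that anything above 50% and up to 75% counts as the nudge band.

```go
package main

import "fmt"

// budgetTone classifies how much of the iteration budget has been used.
// Integer math avoids floating-point edge cases at the 50% and 75% marks.
func budgetTone(iteration, maxIterations int) string {
	if maxIterations <= 0 {
		return "neutral"
	}
	switch {
	case iteration*100 > maxIterations*75:
		return "urgent"
	case iteration*100 > maxIterations*50:
		return "nudge"
	default:
		return "neutral"
	}
}

func main() {
	for _, i := range []int{5, 10, 11, 15, 16, 20} {
		fmt.Println(i, budgetTone(i, 20))
	}
}
```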
func BuildTodoStateMessage ¶
func BuildTodoStateMessage(todos []TodoState) agentic.ChatMessage
BuildTodoStateMessage formats the current todo list as a system message the agent sees at the top of its iteration. Empty list returns the zero ChatMessage (handler skips appending).
Format is compact and mirrors Claude Code's TodoWrite display: per-status marker + content, one per line. Operators reading the prompt see what the agent sees.
func NewComponent ¶
func NewComponent(rawConfig json.RawMessage, deps component.Dependencies) (component.Discoverable, error)
NewComponent creates a new agentic-loop component.
Types ¶
type ApprovalTimeoutCandidate ¶
type ApprovalTimeoutCandidate struct {
	LoopID      string
	CallID      string
	ToolName    string
	RequestedAt time.Time
	Timeout     time.Duration
}
ApprovalTimeoutCandidate captures the loop+call coordinates the approval-timeout sweeper needs to publish an auto-rejection. The sweeper builds an agentic.ApprovalResponse from these and feeds it through HandleApprovalResponse — same code path a real human rejection would take.
type CompactionResult ¶
type CompactionResult struct {
	Summary       string
	EvictedTokens int
	NewTokens     int
	Model         string // model used for summarization (empty if stub fallback)
}
CompactionResult contains the results of a compaction operation.
type Compactor ¶
type Compactor struct {
	// contains filtered or unexported fields
}
Compactor handles context compaction operations.
func NewCompactor ¶
func NewCompactor(config ContextConfig, opts ...CompactorOption) *Compactor
NewCompactor creates a new compactor. Variadic opts allow optional injection of a Summarizer and logger without breaking existing callers.
func (*Compactor) Compact ¶
func (c *Compactor) Compact(ctx context.Context, cm *ContextManager) (CompactionResult, error)
Compact performs compaction on the context manager. When a Summarizer is injected, it calls the LLM to generate a real summary. If the summarizer returns an error, it falls back to a stub summary and logs a warning.
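The fallback behaviour can be sketched as follows. The summarizer interface here is hypothetical (the real Summarizer's method set is not shown on this page, and a real one would likely also take a context), and the stub text is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// summarizer is a hypothetical stand-in for an LLM-backed summarizer.
type summarizer interface {
	summarize(text string) (string, error)
}

// failingSummarizer always errors, simulating an unavailable model.
type failingSummarizer struct{}

func (failingSummarizer) summarize(string) (string, error) {
	return "", errors.New("model unavailable")
}

// fixedSummarizer returns a canned summary.
type fixedSummarizer struct{ out string }

func (f fixedSummarizer) summarize(string) (string, error) { return f.out, nil }

// summarizeOrStub tries the summarizer first and falls back to a stub
// summary, logging a warning, when none is set or the call fails.
func summarizeOrStub(s summarizer, text string) (summary string, usedLLM bool) {
	if s != nil {
		out, err := s.summarize(text)
		if err == nil {
			return out, true
		}
		log.Printf("summarizer failed, falling back to stub: %v", err)
	}
	return fmt.Sprintf("[stub summary: %d characters of history evicted]", len(text)), false
}

func main() {
	fmt.Println(summarizeOrStub(fixedSummarizer{out: "user asked for a review"}, "long history"))
	fmt.Println(summarizeOrStub(failingSummarizer{}, "long history"))
}
```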
func (*Compactor) ShouldCompact ¶
func (c *Compactor) ShouldCompact(cm *ContextManager) bool
ShouldCompact delegates to the context manager's ShouldCompact.
type CompactorOption ¶
type CompactorOption func(*Compactor)
CompactorOption is a functional option for configuring a Compactor.
func WithCompactorLogger ¶
func WithCompactorLogger(l *slog.Logger) CompactorOption
WithCompactorLogger sets the logger used by the Compactor.
func WithModelName ¶
func WithModelName(name string) CompactorOption
WithModelName sets the resolved model name reported in CompactionResult.
func WithSummarizer ¶
func WithSummarizer(s Summarizer) CompactorOption
WithSummarizer injects an LLM-backed summarizer into the Compactor. When set, Compact() calls the summarizer instead of the stub fallback.
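For readers unfamiliar with the functional-options pattern used by NewCompactor, the following self-contained sketch shows the general shape. All names are hypothetical miniatures, not the package's types: each option is a function that mutates the value under construction, so new options can be added without changing the constructor signature.

```go
package main

import "fmt"

// compactor is a hypothetical miniature of a configurable type.
type compactor struct {
	modelName string
	verbose   bool
}

// option mutates a compactor during construction.
type option func(*compactor)

func withModelName(name string) option {
	return func(c *compactor) { c.modelName = name }
}

func withVerbose() option {
	return func(c *compactor) { c.verbose = true }
}

// newCompactor applies defaults first, then each option in order.
func newCompactor(opts ...option) *compactor {
	c := &compactor{modelName: "stub"}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", *newCompactor())
	fmt.Printf("%+v\n", *newCompactor(withModelName("gpt-4"), withVerbose()))
}
```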
type Component ¶
type Component struct {
	// contains filtered or unexported fields
}
Component implements the agentic-loop processor.
func (*Component) ConfigSchema ¶
func (c *Component) ConfigSchema() component.ConfigSchema
ConfigSchema returns the configuration schema.
func (*Component) DataFlow ¶
func (c *Component) DataFlow() component.FlowMetrics
DataFlow returns current data flow metrics.
func (*Component) Health ¶
func (c *Component) Health() component.HealthStatus
Health returns current health status.
func (*Component) Initialize ¶
func (c *Component) Initialize() error
Initialize prepares the component (no-op for this component).
func (*Component) InputPorts ¶
func (c *Component) InputPorts() []component.Port
InputPorts returns input port definitions.
func (*Component) OpenAPISpec ¶
func (c *Component) OpenAPISpec() *service.OpenAPISpec
OpenAPISpec returns the OpenAPI specification for trajectory endpoints.
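func (*Component) OutputPorts ¶
func (c *Component) OutputPorts() []component.Port
OutputPorts returns output port definitions.
func (*Component) RegisterHTTPHandlers ¶
func (c *Component) RegisterHTTPHandlers(prefix string, mux *http.ServeMux)
RegisterHTTPHandlers registers HTTP endpoints for trajectory access.
func (*Component) Start ¶
func (c *Component) Start(ctx context.Context) error
Start starts the component. The context is used for cancellation during startup operations.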
func (*Component) StateManager ¶
func (c *Component) StateManager() *workflow.StateManager
StateManager returns nil since agentic-loop manages its own state internally. Workflows interact with agentic-loop via events and KV watches, not direct state access.
func (*Component) WorkflowID ¶
func (c *Component) WorkflowID() string
WorkflowID returns an empty string since this component handles multiple workflows dynamically. The workflow context is tracked per-loop via WorkflowSlug/WorkflowStep fields.
type Config ¶
type Config struct {
	MaxIterations        int                      `` /* 153-byte string literal not displayed */
	Timeout              string                   `` /* 138-byte string literal not displayed */
	StreamName           string                   `json:"stream_name" schema:"type:string,description:JetStream stream name,default:AGENT,category:advanced"`
	ConsumerNameSuffix   string                   `json:"consumer_name_suffix" schema:"type:string,description:Suffix for consumer names,category:advanced"`
	DeleteConsumerOnStop bool                     `` /* 157-byte string literal not displayed */
	LoopsBucket          string                   `` /* 142-byte string literal not displayed */
	ToolResultMaxBytes   int                      `` /* 174-byte string literal not displayed */
	TrajectoryDetail     string                   `` /* 152-byte string literal not displayed */
	ContentBucket        string                   `` /* 191-byte string literal not displayed */
	TrajectoryCacheTTL   string                   `` /* 208-byte string literal not displayed */
	ApprovalTimeoutStr   string                   `` /* 181-byte string literal not displayed */
	Consumer             ConsumerConfig           `` /* 155-byte string literal not displayed */
	Context              ContextConfig            `` /* 142-byte string literal not displayed */
	ToolCallGovernance   ToolCallGovernanceConfig `` /* 185-byte string literal not displayed */
	Ports                *component.PortConfig    `json:"ports,omitempty" schema:"type:ports,description:Port configuration for inputs and outputs,category:basic"`
}
Config represents the configuration for the agentic-loop processor.
func (Config) ApprovalTimeout ¶
ApprovalTimeout returns the parsed duration for ApprovalTimeoutStr. Returns zero (wait indefinitely) when unset or unparseable; the validation step is the safety net for malformed input.
type ConsumerConfig ¶
type ConsumerConfig struct {
AckWait string `` /* 149-byte string literal not displayed */
HeartbeatInterval string `` /* 173-byte string literal not displayed */
MaxDeliver int `` /* 154-byte string literal not displayed */
}
ConsumerConfig holds JetStream consumer tuning for long-running ports. Deployments with slow LLMs (local Ollama, constrained GPUs) should increase ack_wait and heartbeat_interval to prevent premature redelivery.
func DefaultConsumerConfig ¶
func DefaultConsumerConfig() ConsumerConfig
DefaultConsumerConfig returns the default consumer configuration. These defaults match the original hardcoded values.
func (*ConsumerConfig) EnsureDefaults ¶
func (c *ConsumerConfig) EnsureDefaults()
EnsureDefaults fills zero-valued fields with defaults.
func (ConsumerConfig) ParsedAckWait ¶
func (c ConsumerConfig) ParsedAckWait() time.Duration
ParsedAckWait returns AckWait as a time.Duration. Caller must validate first.
func (ConsumerConfig) ParsedHeartbeatInterval ¶
func (c ConsumerConfig) ParsedHeartbeatInterval() time.Duration
ParsedHeartbeatInterval returns HeartbeatInterval as a time.Duration. Caller must validate first.
func (ConsumerConfig) Validate ¶
func (c ConsumerConfig) Validate() error
Validate validates the consumer configuration.
type ContextConfig ¶
type ContextConfig struct {
Enabled bool `json:"enabled" description:"Deprecated: context management is always enabled (required for Gemini compatibility)"`
CompactThreshold float64 `json:"compact_threshold" description:"Utilization threshold (0.01-1.0) that triggers context compaction"`
HeadroomRatio float64 `` /* 169-byte string literal not displayed */
HeadroomTokens int `json:"headroom_tokens" description:"Minimum token headroom floor — ratio-based headroom never goes below this value"`
// Multi-agent context budget fields
MaxBudgetTokens int `json:"max_budget_tokens,omitempty" description:"Hard token limit for context budget (overrides model limits when set)"`
SliceOnBudget bool `json:"slice_on_budget,omitempty" description:"Enable context slicing when budget is exceeded"`
PreserveEntities []string `json:"preserve_entities,omitempty" description:"Entity IDs to always keep in context during slicing"`
EntityPriority int `` /* 127-byte string literal not displayed */
InjectProfileContext bool `` /* 126-byte string literal not displayed */
}
ContextConfig represents configuration for context memory management. Model context limits are resolved from the unified model registry (component.Dependencies.ModelRegistry).
func DefaultContextConfig ¶
func DefaultContextConfig() ContextConfig
DefaultContextConfig returns the default context configuration
func (*ContextConfig) EnsureDefaults ¶
func (c *ContextConfig) EnsureDefaults()
EnsureDefaults fills zero-valued fields with defaults. This is needed because json.Unmarshal overwrites nested structs even when the JSON contains zero values (e.g., "compact_threshold": 0).
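The json.Unmarshal behaviour described above can be reproduced with a small sketch. The ctxSettings and loopSettings types and the default numbers are hypothetical; only the compact_threshold and headroom_tokens field names are taken from ContextConfig.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ctxSettings is a hypothetical two-field subset of ContextConfig.
type ctxSettings struct {
	CompactThreshold float64 `json:"compact_threshold"`
	HeadroomTokens   int     `json:"headroom_tokens"`
}

// loopSettings nests ctxSettings the way Config nests ContextConfig.
type loopSettings struct {
	Context ctxSettings `json:"context"`
}

// defaultCtxSettings returns placeholder defaults, not the package's values.
func defaultCtxSettings() ctxSettings {
	return ctxSettings{CompactThreshold: 0.6, HeadroomTokens: 6400}
}

// ensureDefaults restores defaults for fields left at their zero value.
func (c *ctxSettings) ensureDefaults() {
	d := defaultCtxSettings()
	if c.CompactThreshold == 0 {
		c.CompactThreshold = d.CompactThreshold
	}
	if c.HeadroomTokens == 0 {
		c.HeadroomTokens = d.HeadroomTokens
	}
}

// load pre-populates defaults, decodes, then repairs zeroed fields.
func load(raw string) (loopSettings, error) {
	cfg := loopSettings{Context: defaultCtxSettings()}
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return loopSettings{}, err
	}
	// An explicit "compact_threshold": 0 has overwritten the default here.
	cfg.Context.ensureDefaults()
	return cfg, nil
}

func main() {
	cfg, err := load(`{"context": {"compact_threshold": 0}}`)
	if err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Printf("%+v\n", cfg.Context)
}
```

One consequence of this approach is that zero can never be chosen deliberately for these fields, which is acceptable only when zero is not a meaningful setting.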
func (ContextConfig) Validate ¶
func (c ContextConfig) Validate() error
Validate validates the context configuration. Context management is always enabled; the Enabled field is deprecated.
type ContextManager ¶
type ContextManager struct {
// contains filtered or unexported fields
}
ContextManager manages conversation context with memory optimization
func NewContextManager ¶
func NewContextManager(loopID, model string, config ContextConfig, opts ...ContextManagerOption) *ContextManager
NewContextManager creates a new context manager
func (*ContextManager) AddGraphEntityContext ¶
func (cm *ContextManager) AddGraphEntityContext(entityID string, content string) error
AddGraphEntityContext adds context from graph entities to the dedicated region
func (*ContextManager) AddMessage ¶
func (cm *ContextManager) AddMessage(region RegionType, msg agentic.ChatMessage) error
AddMessage adds a message to a specific region
func (*ContextManager) AdvanceIteration ¶
func (cm *ContextManager) AdvanceIteration()
AdvanceIteration moves to the next iteration. Call this after processing each loop iteration to ensure proper age tracking for GC.
func (*ContextManager) CheckBudget ¶
func (cm *ContextManager) CheckBudget(budgetTokens int) (bool, int)
CheckBudget checks if current context is within the token budget. Returns (withinBudget, currentTokens).
func (*ContextManager) ClearGraphEntities ¶
func (cm *ContextManager) ClearGraphEntities()
ClearGraphEntities clears the graph entities region
func (*ContextManager) GetContext ¶
func (cm *ContextManager) GetContext() []agentic.ChatMessage
GetContext returns all messages in region priority order
func (*ContextManager) GetGraphEntityTokens ¶
func (cm *ContextManager) GetGraphEntityTokens() int
GetGraphEntityTokens returns the total tokens in the graph entities region
func (*ContextManager) GetRegionTokens ¶
func (cm *ContextManager) GetRegionTokens(region RegionType) int
GetRegionTokens returns the total token count for a region
func (*ContextManager) ModelLimit ¶
func (cm *ContextManager) ModelLimit() int
ModelLimit returns the resolved model context limit.
func (*ContextManager) RepairToolPairs ¶
func (cm *ContextManager) RepairToolPairs() int
RepairToolPairs is the public entry point for the tool-pair integrity audit. Locks cm.mu and delegates to repairToolPairsLocked.
Beta.25 hoisted this from compaction-only (it was previously called only by SliceForBudget) to a general-purpose pre-request safety net. Beta.25 C1 wired synth-result emission at every known terminal-transition path (failLoop, max-iter, cancel signal, dispatch failure) so most orphan tool_calls get matched results before they ever reach context. RepairToolPairs is the belt-and-suspenders for cases the synth-result path missed:
- loops restored from KV with corrupt context written by an older binary version,
- any future failure path that adds an assistant tool_call to context without ensuring matched results,
- third-party state mutations (e.g., manual KV edits during debugging).
Returns the number of messages removed; zero on the hot path (well-formed contexts iterate the recent-history slice once and return without allocations).
func (*ContextManager) ShouldCompact ¶
func (cm *ContextManager) ShouldCompact() bool
ShouldCompact returns true if compaction should be triggered
func (*ContextManager) SliceForBudget ¶
func (cm *ContextManager) SliceForBudget(budget int, slice ContextSlice) error
SliceForBudget removes content to fit within the budget. Uses graph-aware slicing that preserves entity context when EntityPriority is set.
func (*ContextManager) TotalTokens ¶
func (cm *ContextManager) TotalTokens() int
TotalTokens returns the sum of all region tokens.
func (*ContextManager) Utilization ¶
func (cm *ContextManager) Utilization() float64
Utilization returns the current context utilization (0.0 to 1.0). Reserves headroom so compaction triggers before the model limit, leaving room for the response. Headroom is resolved as the greater of (HeadroomRatio * modelLimit) and HeadroomTokens (floor).
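The headroom rule can be sketched as follows. The max of ratio-based and floor headroom comes from the description; dividing used tokens by the limit minus headroom, and clamping to 1.0, is an assumption about how the ratio is formed rather than the package's confirmed formula.

```go
package main

import "fmt"

// reservedHeadroom resolves headroom as the greater of ratio*modelLimit
// and the floor, as the Utilization description states.
func reservedHeadroom(modelLimit int, ratio float64, floorTokens int) int {
	h := int(ratio * float64(modelLimit))
	if h < floorTokens {
		return floorTokens
	}
	return h
}

// utilization is an assumed formula: used tokens over the usable window
// (model limit minus headroom), clamped to 1.0.
func utilization(used, modelLimit int, ratio float64, floorTokens int) float64 {
	usable := modelLimit - reservedHeadroom(modelLimit, ratio, floorTokens)
	if usable <= 0 {
		return 1.0
	}
	u := float64(used) / float64(usable)
	if u > 1.0 {
		return 1.0
	}
	return u
}

func main() {
	// With a 1000-token limit the 400-token floor beats 25% (250 tokens).
	fmt.Println("headroom:", reservedHeadroom(1000, 0.25, 400))
	fmt.Printf("utilization: %.2f\n", utilization(300, 1000, 0.25, 400))
}
```

Under this reading, a compaction threshold is compared against a value that already accounts for the reserved response space, so compaction fires before the raw model limit is reached.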
type ContextManagerOption ¶
type ContextManagerOption func(*ContextManager)
ContextManagerOption is a functional option for configuring ContextManager
func WithLogger ¶
func WithLogger(logger *slog.Logger) ContextManagerOption
WithLogger sets the logger for the ContextManager
func WithModelRegistry ¶
func WithModelRegistry(reg model.RegistryReader) ContextManagerOption
WithModelRegistry provides registry-based model limit resolution
type ContextSlice ¶
type ContextSlice struct {
IncludeRegions []RegionType // Regions to include
ExcludeRegions []RegionType // Regions to exclude (takes precedence)
PreserveEntities []string // Entity IDs to always keep
}
ContextSlice defines which regions to include when slicing context
type DispatcherMetrics ¶
type DispatcherMetrics interface {
// RecordGovernanceVerdict observes the wait duration and counts the
// verdict. decision is "approved" | "rejected" | "timeout"; mode is
// "audit" | "enforce".
RecordGovernanceVerdict(decision, mode string, duration float64)
// RecordGovernanceVerdictMissingWaiter signals that a verdict
// arrived for a call_id without a registered waiter — points at
// the subscribe-before-publish race regressing or a late arrival.
RecordGovernanceVerdictMissingWaiter()
}
DispatcherMetrics is the optional metrics-sink interface the dispatcher calls into for observability. The Component's *loopMetrics satisfies it; tests pass nil.
Kept narrow so the dispatcher stays free of Prometheus types and the metrics layer can evolve independently. Implementations should be non-blocking — the dispatcher is on the loop's hot path.
type DispatcherResult ¶
type DispatcherResult struct {
Approved []agentic.ToolCall
Rejected []ToolCallRejection
}
DispatcherResult is the verdict for a batch of proposed calls. Empty Rejected with full Approved is the disabled-mode/audit-mode steady state. Partial splits are produced by enforce mode.
type GovernanceDispatcher ¶
type GovernanceDispatcher interface {
// Propose submits a batch of tool calls for governance review.
// Returns the approved/rejected split. Disabled mode passes
// through; audit mode publishes-but-passes-through; enforce mode
// waits up to the configured timeout per call and fails-closed on
// timeout (treats absent verdict as a reject).
//
// loopID is the originating loop's bare UUID. parentLoopID is the
// spawn-tree parent (empty for top-level loops). Both ride along
// in the proposed payload so rule conditions can match across the
// hierarchy from day one.
Propose(ctx context.Context, loopID, parentLoopID string, calls []agentic.ToolCall) (DispatcherResult, error)
// HandleVerdict feeds an inbound verdict to the dispatcher. The
// Component's wildcard subscription handlers call this for every
// verdict, after extracting the decision and call_id from the
// payload via VerdictPayload.EffectiveCallID/EffectiveDecision.
// The dispatcher demuxes by call_id to the appropriate waiter
// channel. No-op when no waiter is registered for the call_id
// (audit mode, late arrivals, verdicts for other components'
// loops on a shared stream). Data is retained for audit logging
// only — routing decisions are made from decision + callID.
HandleVerdict(decision, callID string, data []byte)
// Mode returns the configured mode for inspection. Useful for
// observability gauges and conditional logging in the handler.
Mode() string
}
GovernanceDispatcher abstracts the subject-mode tool-call governance flow. The Component holds one instance per loop component and delegates from handleToolCallResponse before the existing serial dispatch. Verdict arrival from the JetStream subscription is fed in via HandleVerdict.
func NewGovernanceDispatcher ¶
func NewGovernanceDispatcher(cfg ToolCallGovernanceConfig, publisher VerdictPublisher, logger *slog.Logger, metrics DispatcherMetrics) GovernanceDispatcher
NewGovernanceDispatcher constructs the dispatcher matching the configured mode. Publisher may be nil in disabled mode (no publishes happen). Metrics may be nil — observability records are skipped when unset. The returned dispatcher's HandleVerdict is safe to call from JetStream consumer callbacks.
The dispatcher does NOT subscribe — the Component owns the wildcard JetStream subscription and routes verdicts through HandleVerdict. That keeps the consumer lifecycle (create / bind / cleanup-on-stop) in one place and lets the dispatcher stay free of NATS dependencies.
type HandlerResult ¶
type HandlerResult struct {
LoopID string
State agentic.LoopState
PublishedMessages []PublishedMessage
PendingTools []string
TrajectorySteps []agentic.TrajectoryStep
ContextEvents []agentic.ContextEvent
RetryScheduled bool
MaxIterationsReached bool
// Created is true only when HandleTask actually created a new loop.
// False on the dedup short-circuit path (TaskMessage redelivered for
// an already-active task). Component uses this to gate
// recordLoopCreated so the active_loops gauge does not drift on
// JetStream redelivery.
Created bool
// CompletionState contains enriched completion data for KV persistence.
// This is populated when a loop completes and is used by component.go
// to write to the loops bucket with key pattern COMPLETE_{loopID}.
CompletionState *agentic.LoopCompletedEvent
// FailureState contains enriched failure data for graph emission.
// Populated when a loop fails, mirrors CompletionState for the failure path.
FailureState *agentic.LoopFailedEvent
}
HandlerResult contains the results of a handler operation
type LLMSummarizer ¶
type LLMSummarizer struct {
// contains filtered or unexported fields
}
LLMSummarizer implements Summarizer using a graph/llm.Client.
func NewLLMSummarizer ¶
func NewLLMSummarizer(client llm.Client, logger *slog.Logger) *LLMSummarizer
NewLLMSummarizer creates a summarizer backed by an LLM client.
func (*LLMSummarizer) Summarize ¶
func (s *LLMSummarizer) Summarize(ctx context.Context, messages []agentic.ChatMessage, maxTokens int) (string, error)
Summarize calls the LLM to generate a summary of the conversation messages.
type LoopManager ¶
type LoopManager struct {
// contains filtered or unexported fields
}
LoopManager manages loop entity lifecycle and state
func NewLoopManager ¶
func NewLoopManager(opts ...LoopManagerOption) *LoopManager
NewLoopManager creates a new LoopManager
func NewLoopManagerWithConfig ¶
func NewLoopManagerWithConfig(contextConfig ContextConfig, opts ...LoopManagerOption) *LoopManager
NewLoopManagerWithConfig creates a new LoopManager with custom context config
func (*LoopManager) AddPendingTool ¶
func (m *LoopManager) AddPendingTool(loopID, callID string) error
AddPendingTool adds a pending tool call to the loop
func (*LoopManager) AllToolsComplete ¶
func (m *LoopManager) AllToolsComplete(loopID string) bool
AllToolsComplete returns true if there are no pending tool calls
func (*LoopManager) CacheMetadata ¶
func (m *LoopManager) CacheMetadata(loopID string, metadata map[string]any)
CacheMetadata stores domain context metadata for a loop (set once from task, reused for all tool calls). Makes a defensive copy to isolate from the caller's map.
func (*LoopManager) CacheRequestTimeout ¶
func (m *LoopManager) CacheRequestTimeout(loopID, timeout string)
CacheRequestTimeout stores the per-request timeout for a loop (from TaskMessage.Timeout). Reused for all continuation iterations so the task-level budget persists across LLM calls in the same loop.
func (*LoopManager) CacheResponseFormat ¶
func (m *LoopManager) CacheResponseFormat(loopID string, rf *agentic.ResponseFormat)
CacheResponseFormat stores the per-task response_format for a loop (from TaskMessage.ResponseFormat). Reused for all continuation iterations so the structured-output constraint persists across LLM calls in the same loop. ADR-034.
func (*LoopManager) CacheTaskPrompt ¶
func (m *LoopManager) CacheTaskPrompt(loopID, prompt string)
CacheTaskPrompt stores the original task prompt for context recovery. If GC/repair leaves the context empty, this prompt is re-injected as a synthetic user message so the model always has contents to work with.
func (*LoopManager) CacheToolChoice ¶
func (m *LoopManager) CacheToolChoice(loopID string, tc *agentic.ToolChoice)
CacheToolChoice stores the tool choice strategy for a loop (set once from task, reused for all requests)
func (*LoopManager) CacheTools ¶
func (m *LoopManager) CacheTools(loopID string, tools []agentic.ToolDefinition)
CacheTools stores tool definitions for a loop (discovered once, reused for all requests)
func (*LoopManager) CancelLoop ¶
func (m *LoopManager) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
CancelLoop atomically cancels a loop and populates completion data. Returns the updated entity for further processing, or an error if the loop cannot be cancelled (not found or already terminal).
func (*LoopManager) ClearQueuedTools ¶
func (m *LoopManager) ClearQueuedTools(loopID string)
ClearQueuedTools discards all queued tool calls (e.g., when StopLoop fires).
func (*LoopManager) CreateLoop ¶
func (m *LoopManager) CreateLoop(taskID, role, model string, maxIterations ...int) (string, error)
CreateLoop creates a new loop entity with a generated UUID
func (*LoopManager) CreateLoopWithID ¶
func (m *LoopManager) CreateLoopWithID(loopID, taskID, role, model string, maxIterations ...int) (string, error)
CreateLoopWithID creates a new loop entity with a specific ID
func (*LoopManager) DeleteLoop ¶
func (m *LoopManager) DeleteLoop(loopID string) error
DeleteLoop deletes a loop entity and all associated tracking data.
func (*LoopManager) DequeueToolCall ¶
func (m *LoopManager) DequeueToolCall(loopID string) (agentic.ToolCall, bool)
DequeueToolCall removes and returns the next queued tool call for dispatch.
func (*LoopManager) ExtractLoopIDFromRequest ¶
func (m *LoopManager) ExtractLoopIDFromRequest(requestID string) string
ExtractLoopIDFromRequest extracts the loop ID from a structured request ID. Returns empty string if the ID is not in structured format.
func (*LoopManager) ExtractLoopIDFromToolCall ¶
func (m *LoopManager) ExtractLoopIDFromToolCall(toolCallID string) string
ExtractLoopIDFromToolCall extracts the loop ID from a structured tool call ID. Returns empty string if the ID is not in structured format.
func (*LoopManager) GenerateRequestID ¶
func (m *LoopManager) GenerateRequestID(loopID string) string
GenerateRequestID creates a structured request ID that embeds the loop ID. Format: loopID:req:shortUUID. This allows recovery of the loop ID from the request ID if in-memory maps are lost.
func (*LoopManager) GenerateToolCallID ¶
func (m *LoopManager) GenerateToolCallID(loopID string) string
GenerateToolCallID creates a structured tool call ID that embeds the loop ID. Format: loopID:tool:shortUUID. This allows recovery of the loop ID from the tool call ID if in-memory maps are lost.
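The structured ID round trip can be sketched like this. The helper names and the eight-hex-character suffix are assumptions; only the loopID:req:short and loopID:tool:short layouts come from the descriptions above.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// shortID returns eight hex characters as a stand-in for the short UUID
// segment; the package's real suffix format is not documented here.
func shortID() string {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

func generateRequestID(loopID string) string  { return loopID + ":req:" + shortID() }
func generateToolCallID(loopID string) string { return loopID + ":tool:" + shortID() }

// extractLoopID returns the embedded loop ID when id has the structured
// form loopID:<kind>:suffix, and the empty string otherwise.
func extractLoopID(id, kind string) string {
	sep := ":" + kind + ":"
	i := strings.LastIndex(id, sep)
	if i <= 0 || i+len(sep) == len(id) {
		return ""
	}
	return id[:i]
}

func main() {
	loopID := "3f2b8c1e-0000-4000-8000-000000000001"
	reqID := generateRequestID(loopID)
	fmt.Println(reqID, "->", extractLoopID(reqID, "req"))

	// Model-issued call IDs are not structured, so recovery yields "".
	fmt.Printf("%q\n", extractLoopID("toolu_01ABC", "tool"))
}
```

Embedding the loop ID trades a longer identifier for the ability to route a message after the in-memory lookup maps have been lost.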
func (*LoopManager) GetAndClearToolResults ¶
func (m *LoopManager) GetAndClearToolResults(loopID string) []agentic.ToolResult
GetAndClearToolResults retrieves all accumulated tool results and clears them. It also evicts the CallID→loop routing entry for each drained result, so a late re-delivery (NATS redelivery, executor retry) finds no mapping at handleToolResultMessage and is dropped at the wire. Without the eviction, the late result would leak into the next turn's PendingToolResults and produce a duplicate tool message in the message array sent to the model.
Eviction effectiveness depends on GetLoopForToolCallWithRecovery NOT resolving an evicted CallID via ExtractLoopIDFromToolCall — true today because model-issued CallIDs (toolu_, call_) don't carry the structured {loopID}:tool:{short} form and GenerateToolCallID is not wired to dispatch. If structured CallIDs ever become the dispatch default, this needs an evicted-set check inside the recovery path or it silently regresses.
Metadata maps (callIDToName, callIDToArguments, toolStartTimes) are preserved — buildToolMessages's empty-name fallback and the trajectory step builder still read them. They grow O(total-tool-calls-in-loop) and are cleaned up at DeleteLoop along with the rest of the loop's bookkeeping.
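A simplified model of the drain-and-evict behaviour is sketched below. The resultStore type and its methods are hypothetical and omit the metadata maps and recovery path discussed above.

```go
package main

import (
	"fmt"
	"sync"
)

// toolResult is a hypothetical stand-in for agentic.ToolResult.
type toolResult struct {
	CallID  string
	Content string
}

// resultStore models the routing map and the per-loop result buffer.
type resultStore struct {
	mu         sync.Mutex
	callToLoop map[string]string
	pending    map[string][]toolResult
}

func newResultStore() *resultStore {
	return &resultStore{
		callToLoop: make(map[string]string),
		pending:    make(map[string][]toolResult),
	}
}

// track records which loop a dispatched call belongs to.
func (s *resultStore) track(callID, loopID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.callToLoop[callID] = loopID
}

// store buffers a result, or drops it when the call has no routing entry.
func (s *resultStore) store(r toolResult) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	loopID, ok := s.callToLoop[r.CallID]
	if !ok {
		return false
	}
	s.pending[loopID] = append(s.pending[loopID], r)
	return true
}

// drain returns the buffered results and evicts their routing entries so
// a redelivered copy can no longer be matched to the loop.
func (s *resultStore) drain(loopID string) []toolResult {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := s.pending[loopID]
	delete(s.pending, loopID)
	for _, r := range out {
		delete(s.callToLoop, r.CallID)
	}
	return out
}

func main() {
	s := newResultStore()
	s.track("call_1", "loop-1")
	res := toolResult{CallID: "call_1", Content: "ok"}
	fmt.Println("first delivery stored:", s.store(res))
	fmt.Println("drained:", len(s.drain("loop-1")))
	fmt.Println("redelivery stored:", s.store(res))
}
```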
func (*LoopManager) GetCachedMetadata ¶
func (m *LoopManager) GetCachedMetadata(loopID string) map[string]any
GetCachedMetadata retrieves the cached metadata for a loop
func (*LoopManager) GetCachedRequestTimeout ¶
func (m *LoopManager) GetCachedRequestTimeout(loopID string) string
GetCachedRequestTimeout retrieves the cached per-request timeout for a loop. Returns empty string when no task-level timeout was set.
func (*LoopManager) GetCachedResponseFormat ¶
func (m *LoopManager) GetCachedResponseFormat(loopID string) *agentic.ResponseFormat
GetCachedResponseFormat retrieves the cached response_format for a loop. Returns nil when no task-level response_format was set, in which case AgentRequest.ResponseFormat stays nil and tool-calling behaviour is preserved.
func (*LoopManager) GetCachedToolChoice ¶
func (m *LoopManager) GetCachedToolChoice(loopID string) *agentic.ToolChoice
GetCachedToolChoice retrieves the cached tool choice for a loop
func (*LoopManager) GetCachedTools ¶
func (m *LoopManager) GetCachedTools(loopID string) []agentic.ToolDefinition
GetCachedTools retrieves the cached tool definitions for a loop
func (*LoopManager) GetContextManager ¶
func (m *LoopManager) GetContextManager(loopID string) *ContextManager
GetContextManager retrieves the context manager for a loop
func (*LoopManager) GetCurrentIteration ¶
func (m *LoopManager) GetCurrentIteration(loopID string) int
GetCurrentIteration returns the current iteration for a loop
func (*LoopManager) GetDepth ¶
func (m *LoopManager) GetDepth(loopID string) (depth, maxDepth int, err error)
GetDepth returns the current depth and max depth for a loop
func (*LoopManager) GetLoop ¶
func (m *LoopManager) GetLoop(loopID string) (agentic.LoopEntity, error)
GetLoop retrieves a loop entity by ID
func (*LoopManager) GetLoopForRequest ¶
func (m *LoopManager) GetLoopForRequest(requestID string) (string, bool)
GetLoopForRequest retrieves the loop ID for a request ID
func (*LoopManager) GetLoopForRequestWithRecovery ¶
func (m *LoopManager) GetLoopForRequestWithRecovery(requestID string) (string, bool)
GetLoopForRequestWithRecovery retrieves the loop ID for a request ID, attempting recovery from structured ID if not found in cache.
func (*LoopManager) GetLoopForToolCall ¶
func (m *LoopManager) GetLoopForToolCall(callID string) (string, bool)
GetLoopForToolCall retrieves the loop ID for a tool call ID
func (*LoopManager) GetLoopForToolCallWithRecovery ¶
func (m *LoopManager) GetLoopForToolCallWithRecovery(toolCallID string) (string, bool)
GetLoopForToolCallWithRecovery retrieves the loop ID for a tool call ID, attempting recovery from structured ID if not found in cache.
func (*LoopManager) GetPendingTools ¶
func (m *LoopManager) GetPendingTools(loopID string) []string
GetPendingTools returns all pending tool calls for a loop
func (*LoopManager) GetRequestStart ¶
func (m *LoopManager) GetRequestStart(requestID string) time.Time
GetRequestStart retrieves the start time for a model request.
func (*LoopManager) GetTaskPrompt ¶
func (m *LoopManager) GetTaskPrompt(loopID string) string
GetTaskPrompt retrieves the cached task prompt for a loop
func (*LoopManager) GetToolArguments ¶
func (m *LoopManager) GetToolArguments(callID string) map[string]any
GetToolArguments retrieves a shallow copy of the arguments for a tool call ID.
func (*LoopManager) GetToolName ¶
func (m *LoopManager) GetToolName(callID string) string
GetToolName retrieves the function name for a tool call ID.
func (*LoopManager) GetToolStart ¶
func (m *LoopManager) GetToolStart(callID string) time.Time
GetToolStart retrieves the start time for a tool call.
func (*LoopManager) HasActiveLoopForTask ¶
func (m *LoopManager) HasActiveLoopForTask(taskID string) (string, bool)
HasActiveLoopForTask returns true if a non-terminal loop already exists for the given task ID. This prevents duplicate loop creation on JetStream redelivery.
func (*LoopManager) HasQueuedTools ¶
func (m *LoopManager) HasQueuedTools(loopID string) bool
HasQueuedTools returns true if there are tool calls waiting to be dispatched.
func (*LoopManager) IncrementIteration ¶
func (m *LoopManager) IncrementIteration(loopID string) error
IncrementIteration increments the loop iteration counter
func (*LoopManager) IncrementTruncationRetry ¶
func (m *LoopManager) IncrementTruncationRetry(loopID string) int
IncrementTruncationRetry bumps the within-loop truncation retry counter and returns the new value. Caller branches on the return to decide between "first retry — compact and try again" (==1) and "already retried — fail loud" (>1). The counter is cleared by ResetTruncationRetry whenever the loop makes forward progress.
func (*LoopManager) IsTimedOut ¶
func (m *LoopManager) IsTimedOut(loopID string) bool
IsTimedOut checks if a loop has exceeded its timeout
func (*LoopManager) QueueToolCalls ¶
func (m *LoopManager) QueueToolCalls(loopID string, calls []agentic.ToolCall)
QueueToolCalls stores tool calls to be dispatched serially after the current call completes.
func (*LoopManager) RemovePendingTool ¶
func (m *LoopManager) RemovePendingTool(loopID, callID string) error
RemovePendingTool removes a pending tool call from the loop
func (*LoopManager) ResetTruncationRetry ¶
func (m *LoopManager) ResetTruncationRetry(loopID string)
ResetTruncationRetry clears the within-loop truncation retry counter. Called when the loop makes forward progress (a normal StatusComplete or StatusToolCall response arrives) so a future truncation can self-heal once.
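The increment-then-branch usage can be sketched as below. The retryCounter type and the action strings are hypothetical; the "retry once, then fail" rule and the reset on forward progress follow the descriptions above.

```go
package main

import (
	"fmt"
	"sync"
)

// retryCounter is a hypothetical per-loop truncation retry counter.
type retryCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func newRetryCounter() *retryCounter {
	return &retryCounter{counts: make(map[string]int)}
}

// increment bumps the counter and returns the new value.
func (r *retryCounter) increment(loopID string) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.counts[loopID]++
	return r.counts[loopID]
}

// reset clears the counter once the loop makes forward progress.
func (r *retryCounter) reset(loopID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.counts, loopID)
}

// onTruncated decides what to do with a truncated response: compact and
// retry on the first occurrence, fail on any repeat.
func onTruncated(r *retryCounter, loopID string) string {
	if r.increment(loopID) == 1 {
		return "compact-and-retry"
	}
	return "fail"
}

func main() {
	r := newRetryCounter()
	fmt.Println(onTruncated(r, "loop-1")) // first truncation
	fmt.Println(onTruncated(r, "loop-1")) // repeat without progress
	r.reset("loop-1")                     // a normal response arrived
	fmt.Println(onTruncated(r, "loop-1")) // allowed to self-heal again
}
```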
func (*LoopManager) ResolveApprovalIfPending ¶
func (m *LoopManager) ResolveApprovalIfPending(loopID, callID string) (agentic.PendingApprovalState, bool, error)
ResolveApprovalIfPending atomically transitions the loop out of LoopStateAwaitingApproval if and only if the supplied call_id matches the currently pinned PendingApproval. Returns a snapshot of the pending state (so the caller has the original tool name + arguments + trace context for re-dispatch) plus a bool indicating whether the resolve actually happened. A false return is the idempotent drop case: the loop is no longer awaiting approval, or the response targets a different call_id (typical when a duplicate UI click races with an automated reject scheduler).
This is the only path that should mutate PendingApproval + State out of awaiting_approval after BeginAwaitingApproval. The previous load → mutate → UpdateLoop pattern in HandleApprovalResponse let two concurrent responses both pass the awaiting-state check and both dispatch — for a safety feature, that double-execution risk is unacceptable.
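A minimal sketch of the check-and-transition under a single lock follows. The approvalStore type is hypothetical, and the "executing" state name is a placeholder for whatever state the loop moves to; only awaiting_approval and the (snapshot, ok, error) shape come from the description.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingApproval is a hypothetical subset of agentic.PendingApprovalState.
type pendingApproval struct {
	CallID   string
	ToolName string
}

type loopRecord struct {
	State   string
	Pending *pendingApproval
}

type approvalStore struct {
	mu    sync.Mutex
	loops map[string]*loopRecord
}

func newApprovalStore() *approvalStore {
	return &approvalStore{loops: make(map[string]*loopRecord)}
}

// beginAwaiting pins a pending approval on the loop.
func (s *approvalStore) beginAwaiting(loopID string, p pendingApproval) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.loops[loopID] = &loopRecord{State: "awaiting_approval", Pending: &p}
}

// resolveIfPending performs the state check and the mutation inside one
// critical section, so only one of several concurrent callers can win.
func (s *approvalStore) resolveIfPending(loopID, callID string) (pendingApproval, bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	l, ok := s.loops[loopID]
	if !ok {
		return pendingApproval{}, false, fmt.Errorf("loop %s not found", loopID)
	}
	if l.State != "awaiting_approval" || l.Pending == nil || l.Pending.CallID != callID {
		return pendingApproval{}, false, nil // idempotent drop
	}
	snapshot := *l.Pending
	l.Pending = nil
	l.State = "executing" // placeholder next state
	return snapshot, true, nil
}

func main() {
	s := newApprovalStore()
	s.beginAwaiting("loop-1", pendingApproval{CallID: "call-1", ToolName: "delete_file"})

	results := make(chan bool, 2)
	for i := 0; i < 2; i++ {
		go func() {
			_, ok, _ := s.resolveIfPending("loop-1", "call-1")
			results <- ok
		}()
	}
	wins := 0
	for i := 0; i < 2; i++ {
		if <-results {
			wins++
		}
	}
	fmt.Println("responses that dispatched:", wins)
}
```

The load, mutate, then update pattern fails here because the check and the write happen in separate critical sections; folding both into one removes the window in which two responses can pass the check.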
func (*LoopManager) SetDepth ¶
func (m *LoopManager) SetDepth(loopID string, depth, maxDepth int) error
SetDepth sets the depth tracking for a loop in the multi-agent hierarchy
func (*LoopManager) SetMetadata ¶
func (m *LoopManager) SetMetadata(loopID string, metadata map[string]any) error
SetMetadata sets domain context metadata on the loop entity.
func (*LoopManager) SetParentLoop ¶
func (m *LoopManager) SetParentLoop(loopID, parentLoopID string) error
SetParentLoop sets the parent loop ID for tracking architect->editor relationships
func (*LoopManager) SetParentLoopID ¶
func (m *LoopManager) SetParentLoopID(loopID, parentLoopID string) error
SetParentLoopID is an alias for SetParentLoop for consistency with TaskMessage field names
func (*LoopManager) SetTimeout ¶
func (m *LoopManager) SetTimeout(loopID string, timeout time.Duration) error
SetTimeout sets the timeout for a loop
func (*LoopManager) SetUserContext ¶
func (m *LoopManager) SetUserContext(loopID, channelType, channelID, userID string) error
SetUserContext sets the user routing info for error notifications
func (*LoopManager) SetWorkflowContext ¶
func (m *LoopManager) SetWorkflowContext(loopID, workflowSlug, workflowStep string) error
SetWorkflowContext sets the workflow slug and step for loops created by workflow commands
func (*LoopManager) SnapshotExpiredApprovals ¶
func (m *LoopManager) SnapshotExpiredApprovals(now time.Time) []ApprovalTimeoutCandidate
SnapshotExpiredApprovals returns a snapshot of loops whose pending approval has timed out (RequestedAt + Timeout <= now). Skips loops whose Timeout is zero (wait-indefinitely policy). Read-locked; the snapshot is taken under the lock and the lock released before return so callers can act on each candidate without holding the mutex.
Beta.25 adds this for the orphan-tool-call recovery work. The approval-timeout timer was a deferred item from beta.19; closing it now ensures a stuck human-approval flow doesn't leave the gated tool_call orphaned indefinitely (mode f of orphan recovery).
func (*LoopManager) StoreToolResult ¶
func (m *LoopManager) StoreToolResult(loopID string, result agentic.ToolResult) error
StoreToolResult stores a tool result in the loop entity for later retrieval
func (*LoopManager) TrackRequest ¶
func (m *LoopManager) TrackRequest(requestID, loopID string)
TrackRequest associates a request ID with a loop ID
func (*LoopManager) TrackRequestStart ¶
func (m *LoopManager) TrackRequestStart(requestID string)
TrackRequestStart records when a model request was sent.
func (*LoopManager) TrackToolArguments ¶
func (m *LoopManager) TrackToolArguments(callID string, args map[string]any)
TrackToolArguments associates a tool call ID with its arguments. This is used to populate the ToolArguments field on trajectory steps for audit.
func (*LoopManager) TrackToolCall ¶
func (m *LoopManager) TrackToolCall(callID, loopID string)
TrackToolCall associates a tool call ID with a loop ID
func (*LoopManager) TrackToolName ¶
func (m *LoopManager) TrackToolName(callID, name string)
TrackToolName associates a tool call ID with its function name. This is used to populate the name field on tool result messages (required by Gemini).
func (*LoopManager) TrackToolStart ¶
func (m *LoopManager) TrackToolStart(callID string)
TrackToolStart records when a tool call was dispatched for execution.
func (*LoopManager) TransitionLoop ¶
func (m *LoopManager) TransitionLoop(loopID string, newState agentic.LoopState) error
TransitionLoop transitions a loop to a new state
func (*LoopManager) UpdateCompletion ¶
func (m *LoopManager) UpdateCompletion(loopID, outcome, result, errMsg string) error
UpdateCompletion updates a loop with completion data (outcome, result, error). This is called when a loop finishes to populate fields for SSE delivery via KV watch.
func (*LoopManager) UpdateLoop ¶
func (m *LoopManager) UpdateLoop(entity agentic.LoopEntity) error
UpdateLoop updates an existing loop entity
type LoopManagerOption ¶
type LoopManagerOption func(*LoopManager)
LoopManagerOption is a functional option for configuring LoopManager
func WithLoopManagerLogger ¶
func WithLoopManagerLogger(logger *slog.Logger) LoopManagerOption
WithLoopManagerLogger sets the logger for the LoopManager and its context managers
func WithLoopManagerModelRegistry ¶
func WithLoopManagerModelRegistry(reg model.RegistryReader) LoopManagerOption
WithLoopManagerModelRegistry sets the model registry for context managers
type MessageHandler ¶
type MessageHandler struct {
// contains filtered or unexported fields
}
MessageHandler handles incoming messages and coordinates loop execution
func NewMessageHandler ¶
func NewMessageHandler(config Config, loopManagerOpts ...LoopManagerOption) *MessageHandler
NewMessageHandler creates a new MessageHandler
func (*MessageHandler) BuildFailureEvent ¶
func (h *MessageHandler) BuildFailureEvent(loopID, reason, errorMsg string) (*agentic.LoopFailedEvent, error)
BuildFailureEvent creates a failure event (public wrapper for component.go).
func (*MessageHandler) BuildFailureMessages ¶
func (h *MessageHandler) BuildFailureMessages(loopID, reason, errorMsg string) (*agentic.LoopFailedEvent, []PublishedMessage, error)
BuildFailureMessages creates a failure event and serializes it for NATS publishing. Returns the event (for graph emission) and published messages (for reactive workflows).
func (*MessageHandler) CancelLoop ¶
func (h *MessageHandler) CancelLoop(loopID, cancelledBy string) (agentic.LoopEntity, error)
CancelLoop atomically cancels a loop and populates completion data.
func (*MessageHandler) GetContextManager ¶
func (h *MessageHandler) GetContextManager(loopID string) *ContextManager
GetContextManager returns the ContextManager for a given loop ID.
func (*MessageHandler) GetLoop ¶
func (h *MessageHandler) GetLoop(loopID string) (agentic.LoopEntity, error)
GetLoop retrieves a loop entity (for testing)
func (*MessageHandler) GetTrajectory ¶
func (h *MessageHandler) GetTrajectory(loopID string) (agentic.Trajectory, error)
GetTrajectory retrieves a trajectory snapshot for a given loop ID.
func (*MessageHandler) GovernanceDispatcher ¶
func (h *MessageHandler) GovernanceDispatcher() GovernanceDispatcher
GovernanceDispatcher returns the installed dispatcher (or nil). Used by the Component's verdict subscription handlers to route inbound verdicts via HandleVerdict.
func (*MessageHandler) HandleApprovalResponse ¶
func (h *MessageHandler) HandleApprovalResponse(ctx context.Context, response agentic.ApprovalResponse) (result HandlerResult, err error)
HandleApprovalResponse processes an ApprovalResponse for a loop awaiting human approval. Returns a HandlerResult populated with either a re-dispatched ToolCall (approve/modify) or a synthesised rejection ToolResult fed through the normal HandleToolResult path (reject). The component publishes any messages and persists the loop state.
The transition out of awaiting_approval happens atomically inside LoopManager.ResolveApprovalIfPending. Two concurrent responses (e.g., a human approve racing an automated reject scheduler) cannot both pass the awaiting check: exactly one wins, and the loser sees ok=false, which is treated as an idempotent drop of a stale response. This is load-bearing for the safety claim: a sensitive tool must not dispatch twice off two responses for the same call_id.
Other mismatches (loop not found, response.Validate() fails) are logged defensively and return an empty result with no error, so duplicate or stale UI clicks don't crash the loop.
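The "exactly one wins" guarantee can be illustrated with a check-and-clear under a mutex. This is a sketch only: approvalGate, resolveIfPending, and race are hypothetical names, and the actual mechanism inside LoopManager.ResolveApprovalIfPending is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// approvalGate is a hypothetical stand-in for the loop state that
// ResolveApprovalIfPending guards.
type approvalGate struct {
	mu      sync.Mutex
	pending map[string]bool // call_id -> still awaiting approval
}

// resolveIfPending checks and clears the pending flag in one critical section,
// so it reports ok=true for exactly one caller per call_id.
func (g *approvalGate) resolveIfPending(callID string) (ok bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.pending[callID] {
		return false // stale or duplicate response: drop it
	}
	delete(g.pending, callID)
	return true
}

// race fires n concurrent responses at one call_id and counts the winners.
func race(n int) int {
	g := &approvalGate{pending: map[string]bool{"call-1": true}}
	var wg sync.WaitGroup
	var mu sync.Mutex
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if g.resolveIfPending("call-1") {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(race(8)) // prints 1
}
```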
func (*MessageHandler) HandleModelResponse ¶
func (h *MessageHandler) HandleModelResponse(ctx context.Context, loopID string, response agentic.AgentResponse) (HandlerResult, error)
HandleModelResponse processes a model response
func (*MessageHandler) HandleTask ¶
func (h *MessageHandler) HandleTask(ctx context.Context, task TaskMessage) (HandlerResult, error)
HandleTask processes an incoming task message and creates a new loop
func (*MessageHandler) HandleToolResult ¶
func (h *MessageHandler) HandleToolResult(ctx context.Context, loopID string, toolResult agentic.ToolResult) (HandlerResult, error)
HandleToolResult processes a tool execution result
func (*MessageHandler) SetGovernanceDispatcher ¶
func (h *MessageHandler) SetGovernanceDispatcher(d GovernanceDispatcher)
SetGovernanceDispatcher installs the subject-mode tool-call governance dispatcher (ADR-039). The dispatcher field is not synchronized once the JetStream consumer callbacks start reading it — the setter must be called between NewComponent and Start.
Nil clears any previously installed dispatcher (effectively reverting to disabled-mode pass-through). The Component wires this automatically in NewComponent based on Config.ToolCallGovernance.Mode; tests stub via this setter.
func (*MessageHandler) SetLogger ¶
func (h *MessageHandler) SetLogger(logger *slog.Logger)
SetLogger sets the logger for the handler
func (*MessageHandler) SetPersonaFragments ¶
func (h *MessageHandler) SetPersonaFragments(src PersonaFragmentSource)
SetPersonaFragments installs a source for KV-backed persona overrides. On each task the handler calls src.Fragments(ctx) and applies the result to the registry with UpsertAll before assembly; this is what makes runtime persona edits take effect on the next loop without a restart. Passing nil disables the per-task refresh; the registry is used as-is.
func (*MessageHandler) SetPlatform ¶
func (h *MessageHandler) SetPlatform(p types.PlatformMeta)
SetPlatform installs the org/platform pair used to construct loop entity IDs for todo reads. The same identity is also used by other graph paths in this package (graph_writer); typically wired once from the same source at component boot.
func (*MessageHandler) SetPromptRegistry ¶
func (h *MessageHandler) SetPromptRegistry(r *prompt.Registry)
SetPromptRegistry installs the base fragment registry that composes the per-task system prompt. Passing nil clears the registry — callers get the legacy behaviour where buildInitialMessages emits only task.Context.Content + user prompt. The registry is expected to be preloaded with prompt.DefaultFragments. Product-supplied overrides (KV-backed personas) are merged at assembly time via the source passed to SetPersonaFragments.
func (*MessageHandler) SetSummarizer ¶
func (h *MessageHandler) SetSummarizer(s Summarizer, modelName string)
SetSummarizer injects an LLM-backed summarizer into the compactor. When set, context compaction generates real summaries instead of stubs. modelName is the resolved endpoint name reported in CompactionResult.
func (*MessageHandler) SetTodoReader ¶
func (h *MessageHandler) SetTodoReader(r TodoReader)
SetTodoReader installs the reader the handler uses to fetch each loop's write_todos list at iteration-build time (ADR-036 Stage 4). Passing nil disables the per-iteration todo block.
func (*MessageHandler) SetToolRegistry ¶
func (h *MessageHandler) SetToolRegistry(r component.ToolRegistryReader)
SetToolRegistry installs the shared tool registry used by discoverTools. Production wiring lives in component.go via deps.ToolRegistry; tests use this to inject a per-test registry.
func (*MessageHandler) UpdateLoop ¶
func (h *MessageHandler) UpdateLoop(entity agentic.LoopEntity) error
UpdateLoop updates a loop entity
type PersonaFragmentSource ¶
PersonaFragmentSource is the minimum surface the handler needs to pull KV-backed persona overrides at prompt-assembly time. persona.Manager satisfies it via its Fragments method; tests can stub with a trivial in-memory implementation. Kept as an interface here so the handler package doesn't carry a compile-time dependency on persona.
type ProposedToolCallPayload ¶
type ProposedToolCallPayload struct {
LoopID string `json:"loop_id"`
ParentLoopID string `json:"parent_loop_id,omitempty"`
CallID string `json:"call_id"`
ToolName string `json:"tool_name"`
Command string `json:"command,omitempty"`
URL string `json:"url,omitempty"`
Arguments map[string]any `json:"arguments,omitempty"`
}
ProposedToolCallPayload is the wire shape published to agent.toolcall.proposed.<loop_id> in audit and enforce modes. Rules match on these field paths via `$message.<field>` substitution.
ParentLoopID is nullable from day one (ADR-039) so rule conditions can match across loop hierarchies for sub-agent governance inheritance. Empty string means no parent.
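To make the wire shape concrete, the sketch below copies the struct definition and marshals one payload. The field values (loop-1, call-1, bash, ls) and the encode helper are illustrative assumptions, not values taken from the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ProposedToolCallPayload is copied from the documented definition above.
type ProposedToolCallPayload struct {
	LoopID       string         `json:"loop_id"`
	ParentLoopID string         `json:"parent_loop_id,omitempty"`
	CallID       string         `json:"call_id"`
	ToolName     string         `json:"tool_name"`
	Command      string         `json:"command,omitempty"`
	URL          string         `json:"url,omitempty"`
	Arguments    map[string]any `json:"arguments,omitempty"`
}

// encode is a hypothetical helper that returns the JSON wire form,
// or an empty string if marshalling fails.
func encode(p ProposedToolCallPayload) string {
	b, err := json.Marshal(p)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	p := ProposedToolCallPayload{
		LoopID:   "loop-1",
		CallID:   "call-1",
		ToolName: "bash", // hypothetical tool name
		Command:  "ls",
	}
	// Empty omitempty fields (parent_loop_id, url, arguments) are left out.
	fmt.Println(encode(p))
	// prints {"loop_id":"loop-1","call_id":"call-1","tool_name":"bash","command":"ls"}
}
```

With this shape, a rule condition written against `$message.tool_name` would see "bash", and a loop with no parent carries no parent_loop_id key at all.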
type PublishedMessage ¶
PublishedMessage represents a message published to NATS
type RegionType ¶
type RegionType string
RegionType defines the type of context region
const (
RegionSystemPrompt RegionType = "system_prompt" // System prompt and instructions
RegionCompactedHistory RegionType = "compacted_history" // Summarized old conversation
RegionRecentHistory RegionType = "recent_history" // Recent uncompacted messages
RegionToolResults RegionType = "tool_results" // Tool execution results
RegionHydratedContext RegionType = "hydrated_context" // Retrieved context from memory
RegionGraphEntities RegionType = "graph_entities" // Graph entity context (multi-agent)
)
Context region types define different areas of the conversation context
type Summarizer ¶
type Summarizer interface {
// Summarize generates a concise summary of the given conversation messages.
// maxTokens limits the response length.
Summarize(ctx context.Context, messages []agentic.ChatMessage, maxTokens int) (string, error)
}
Summarizer abstracts the LLM call for context compaction.
type TaskMessage ¶
type TaskMessage = agentic.TaskMessage
TaskMessage is an alias for agentic.TaskMessage for backward compatibility. This allows existing code to use agenticloop.TaskMessage without modification.
type TodoReader ¶
type TodoReader interface {
ReadTodos(ctx context.Context, loopEntityID string) ([]TodoState, error)
}
TodoReader is the narrow surface MessageHandler uses to fetch the current todo list for a loop. Production satisfies it via a NATS adapter against graph.ingest.query.entity; tests substitute an in-memory implementation.
func NewNATSTodoReader ¶
func NewNATSTodoReader(client *natsclient.Client) TodoReader
NewNATSTodoReader builds a TodoReader backed by the graph.ingest.query.entity NATS surface.
type TodoState ¶
TodoState is the reconstructed shape of one todo item, assembled from the five agent.todo.* triples on a loop entity.
func ReconstructTodos ¶
ReconstructTodos rebuilds the ordered todo list from a loop entity's triples. Filters to agent.todo.* predicates, groups them into todo items, and returns the items sorted by Position (the agent's original array order).
Stage 3's write_todos appends triples in the interleaved order [id, content, status, position, updated_at] for each item in array order, then graph-ingest preserves arrival order in the entity's triple slice. So filtered triples come back in groups of five with a known per-group field order. We parse in stride and skip any malformed group (partial mid-write state, e.g. remove-then-add failure with stale triples on disk).
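The stride parse described above can be sketched as follows. The triple and todo types, the exact predicate names (agent.todo.id and so on), and the integer encoding of position are assumptions for illustration; the real ReconstructTodos signature and TodoState fields are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// triple and todo are hypothetical stand-ins for the graph triple and TodoState.
type triple struct{ Predicate, Object string }

type todo struct {
	ID, Content, Status string
	Position            int
	UpdatedAt           string
}

// fieldOrder is the per-item write order stated in the text above.
var fieldOrder = []string{"id", "content", "status", "position", "updated_at"}

// reconstruct filters to agent.todo.* triples, parses them in groups of five,
// skips malformed groups, and sorts the result by Position.
func reconstruct(triples []triple) []todo {
	var filtered []triple
	for _, t := range triples {
		if strings.HasPrefix(t.Predicate, "agent.todo.") {
			filtered = append(filtered, t)
		}
	}
	var out []todo
	n := len(fieldOrder)
	for i := 0; i+n <= len(filtered); i += n {
		if item, ok := parseGroup(filtered[i : i+n]); ok {
			out = append(out, item)
		}
	}
	sort.SliceStable(out, func(a, b int) bool { return out[a].Position < out[b].Position })
	return out
}

// parseGroup validates the field order of one group and builds a todo from it.
func parseGroup(g []triple) (todo, bool) {
	for i, name := range fieldOrder {
		if g[i].Predicate != "agent.todo."+name {
			return todo{}, false
		}
	}
	pos, err := strconv.Atoi(g[3].Object)
	if err != nil {
		return todo{}, false
	}
	return todo{ID: g[0].Object, Content: g[1].Object, Status: g[2].Object, Position: pos, UpdatedAt: g[4].Object}, true
}

// todoTriples builds the five triples for one item (placeholder timestamp).
func todoTriples(id, content, status string, pos int) []triple {
	return []triple{
		{"agent.todo.id", id},
		{"agent.todo.content", content},
		{"agent.todo.status", status},
		{"agent.todo.position", strconv.Itoa(pos)},
		{"agent.todo.updated_at", "2024-01-01T00:00:00Z"},
	}
}

func main() {
	in := append(todoTriples("b", "second", "pending", 1), todoTriples("a", "first", "done", 0)...)
	for _, t := range reconstruct(in) {
		fmt.Println(t.Position, t.ID, t.Content)
	}
}
```

A fixed stride relies on every surviving group being complete; this sketch drops a group whose predicates are out of order or whose position does not parse, and drops a trailing partial group.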
type ToolCallGovernanceConfig ¶
type ToolCallGovernanceConfig struct {
// Mode is one of "disabled", "audit", or "enforce". Default
// "disabled" leaves no governance gate.
Mode string `` /* 173-byte string literal not displayed */
// Timeout is the per-call wait window in enforce mode before the
// dispatcher fails closed (rejects). Parsed as a Go duration string
// (e.g. "500ms", "2s"). Ignored when Mode is "disabled" or "audit".
Timeout string `` /* 162-byte string literal not displayed */
}
ToolCallGovernanceConfig configures subject-mode tool-call governance (ADR-039). When Mode is "disabled" (the default), tool calls dispatch directly with no governance gate. When Mode is "audit", every tool call is published to agent.toolcall.proposed.* but dispatch is NOT blocked — verdicts that arrive are logged/counted for observability. When Mode is "enforce", the loop publishes-then-waits for a per-call verdict on agent.toolcall.approved/rejected.* subjects, with Timeout as the fail-closed window.
Beta.70 retired the in-process ToolCallFilter wiring that the beta.67 and beta.68 releases shipped (per ADR-039 Phase 3); subject-mode is now the sole governance path.
func DefaultToolCallGovernanceConfig ¶
func DefaultToolCallGovernanceConfig() ToolCallGovernanceConfig
DefaultToolCallGovernanceConfig returns the default tool-call governance configuration: disabled mode (preserves pre-ADR-039 behavior) with the framework default Timeout for when an operator later flips to enforce.
func (*ToolCallGovernanceConfig) EnsureDefaults ¶
func (c *ToolCallGovernanceConfig) EnsureDefaults()
EnsureDefaults fills zero-valued fields with defaults. Called from Config-level defaulting at boot so an unset governance section has a stable Mode + Timeout instead of empty strings.
func (ToolCallGovernanceConfig) IsEnabled ¶
func (c ToolCallGovernanceConfig) IsEnabled() bool
IsEnabled returns true when the governance flow is active (audit or enforce). Callers use this to skip publish setup work when governance is disabled.
func (ToolCallGovernanceConfig) IsEnforcing ¶
func (c ToolCallGovernanceConfig) IsEnforcing() bool
IsEnforcing returns true when verdicts gate dispatch. Callers use this to decide whether to wait for verdicts before dispatching.
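The relationship between the three modes and the two predicates can be sketched as below. governanceConfig is a stand-in type, and the method bodies are inferred from the descriptions above rather than copied from the package.

```go
package main

import "fmt"

// governanceConfig is a hypothetical stand-in for ToolCallGovernanceConfig.
type governanceConfig struct {
	Mode string // "disabled", "audit", or "enforce"
}

// isEnabled: proposals are published in audit and enforce modes.
func (c governanceConfig) isEnabled() bool {
	return c.Mode == "audit" || c.Mode == "enforce"
}

// isEnforcing: only enforce mode waits for a verdict before dispatch.
func (c governanceConfig) isEnforcing() bool {
	return c.Mode == "enforce"
}

func main() {
	for _, mode := range []string{"disabled", "audit", "enforce"} {
		c := governanceConfig{Mode: mode}
		fmt.Printf("%-8s enabled=%t enforcing=%t\n", mode, c.isEnabled(), c.isEnforcing())
	}
}
```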
func (ToolCallGovernanceConfig) ParsedTimeout ¶
func (c ToolCallGovernanceConfig) ParsedTimeout() time.Duration
ParsedTimeout returns the parsed Timeout duration. Falls back to the default when unset or unparseable; Validate is the safety net for malformed input.
func (ToolCallGovernanceConfig) Validate ¶
func (c ToolCallGovernanceConfig) Validate() error
Validate validates the tool-call governance configuration.
type ToolCallRejection ¶
ToolCallRejection records why a tool call was denied by the governance dispatcher. The downstream serial-dispatch path consumes this to write per-call error results for the model to react to on its next iteration.
type TrajectoryManager ¶
type TrajectoryManager struct {
// contains filtered or unexported fields
}
TrajectoryManager manages trajectory capture and persistence
func NewTrajectoryManager ¶
func NewTrajectoryManager() *TrajectoryManager
NewTrajectoryManager creates a new TrajectoryManager
func (*TrajectoryManager) AddStep ¶
func (m *TrajectoryManager) AddStep(loopID string, step agentic.TrajectoryStep) (agentic.Trajectory, error)
AddStep adds a step to a trajectory
func (*TrajectoryManager) CompleteTrajectory ¶
func (m *TrajectoryManager) CompleteTrajectory(loopID, outcome string) (agentic.Trajectory, error)
CompleteTrajectory marks a trajectory as complete
func (*TrajectoryManager) GetTrajectory ¶
func (m *TrajectoryManager) GetTrajectory(loopID string) (agentic.Trajectory, error)
GetTrajectory retrieves a trajectory by loop ID
func (*TrajectoryManager) SaveTrajectory ¶
func (m *TrajectoryManager) SaveTrajectory(_ context.Context, _ agentic.Trajectory) error
SaveTrajectory saves a trajectory to KV storage
func (*TrajectoryManager) StartTrajectory ¶
func (m *TrajectoryManager) StartTrajectory(loopID string) (agentic.Trajectory, error)
StartTrajectory starts a new trajectory for a loop
type VerdictPayload ¶
type VerdictPayload struct {
Decision string `json:"decision,omitempty"`
CallID string `json:"call_id,omitempty"`
LoopID string `json:"loop_id,omitempty"`
RuleID string `json:"rule_id,omitempty"`
Reason string `json:"reason,omitempty"`
EntityID string `json:"entity_id,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
Properties map[string]any `json:"properties,omitempty"`
}
VerdictPayload is the wire shape inbound on agent.toolcall.{approved,rejected}.<loop_id>.<call_id>. Produced by the rule engine's executeApprove action (top-level fields) OR by a rule `publish` action emitting a rejection (fields nested under Properties — the canonical ADR-039 reject pattern).
Both shapes are supported because the ADR's example rule uses `publish` + `deny` together for rejections (no extra `reject` action). EffectiveCallID and EffectiveDecision fall through the two shapes so callers don't write the same parsing code twice.
func (VerdictPayload) EffectiveCallID ¶
func (v VerdictPayload) EffectiveCallID() string
EffectiveCallID returns the routing call_id from the payload, preferring the top-level CallID (approve-action shape) and falling back to Properties["call_id"] (publish-action shape).
func (VerdictPayload) EffectiveDecision ¶
func (v VerdictPayload) EffectiveDecision() string
EffectiveDecision returns the routing decision from the payload, preferring the top-level Decision (approve-action shape) and falling back to Properties["decision"] (publish-action shape).
func (VerdictPayload) EffectiveReason ¶
func (v VerdictPayload) EffectiveReason() string
EffectiveReason returns the human-readable reason, with the same fall-through semantics as EffectiveCallID.
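The fall-through between the two payload shapes can be sketched as follows. The verdict type copies only the fields needed, the method bodies are inferred from the descriptions above, and the decision strings "approved" and "rejected" are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// verdict copies the relevant fields of the documented VerdictPayload.
type verdict struct {
	Decision   string         `json:"decision,omitempty"`
	CallID     string         `json:"call_id,omitempty"`
	Properties map[string]any `json:"properties,omitempty"`
}

// fromProps reads a string property, returning "" when absent or not a string.
func (v verdict) fromProps(key string) string {
	s, _ := v.Properties[key].(string)
	return s
}

// effectiveCallID prefers the top-level field, then Properties["call_id"].
func (v verdict) effectiveCallID() string {
	if v.CallID != "" {
		return v.CallID
	}
	return v.fromProps("call_id")
}

// effectiveDecision prefers the top-level field, then Properties["decision"].
func (v verdict) effectiveDecision() string {
	if v.Decision != "" {
		return v.Decision
	}
	return v.fromProps("decision")
}

// decode parses one inbound verdict message.
func decode(raw string) (verdict, error) {
	var v verdict
	err := json.Unmarshal([]byte(raw), &v)
	return v, err
}

func main() {
	// Approve-action shape: top-level fields.
	a, _ := decode(`{"decision":"approved","call_id":"c1"}`)
	// Publish-action shape: fields nested under properties.
	b, _ := decode(`{"properties":{"decision":"rejected","call_id":"c2"}}`)
	fmt.Println(a.effectiveCallID(), a.effectiveDecision())
	fmt.Println(b.effectiveCallID(), b.effectiveDecision())
}
```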
type VerdictPublisher ¶
type VerdictPublisher interface {
PublishToStream(ctx context.Context, subject string, data []byte) error
}
VerdictPublisher is the minimum surface the governance dispatcher needs to publish proposed-call payloads. *natsclient.Client satisfies it via PublishToStream. Kept as an interface here so tests can stub the publish path without a real NATS connection.