Documentation ¶
Index ¶
- func AddRerouteHint(input string, failure RecoveryContext) string
- func BuildControlTools(cp *AgentControlPlane) []*agent.Tool
- func BuildTaskTools(store TaskStore) []*agent.Tool
- func ComputeBackoff(attempt int) time.Duration
- type AgentControlPlane
- type AgentRun
- type AgentRunProjection
- type AgentRunStatus
- type AgentRunStore
- type BudgetAlert
- type BudgetAlertEvent
- type BudgetPolicy
- func (b *BudgetPolicy) Clone() *BudgetPolicy
- func (b *BudgetPolicy) DelegationCount() int
- func (b *BudgetPolicy) RecordDelegation(target string)
- func (b *BudgetPolicy) RecordTurn()
- func (b *BudgetPolicy) Reset()
- func (b *BudgetPolicy) Restore(state map[string]string)
- func (b *BudgetPolicy) Serialize() map[string]string
- func (b *BudgetPolicy) SetAlertHandler(fn func(BudgetAlert))
- func (b *BudgetPolicy) TurnCount() int
- func (b *BudgetPolicy) UniqueAgentCount() int
- type CauseClass
- type CircuitBreakerTrippedEvent
- type CircuitState
- type CoordinatingExecutor
- type DelegationGuard
- func (g *DelegationGuard) IsOpen(agentName string) bool
- func (g *DelegationGuard) IsProviderOpen(provider string) bool
- func (g *DelegationGuard) RecordOutcome(agentName string, success bool)
- func (g *DelegationGuard) RecordProviderFailure(provider string, success bool)
- func (g *DelegationGuard) State(agentName string) CircuitState
- type DelegationObservedEvent
- type InMemoryAgentRunStore
- func (s *InMemoryAgentRunStore) Cancel(id string) error
- func (s *InMemoryAgentRunStore) Create(run *AgentRun) error
- func (s *InMemoryAgentRunStore) Get(id string) (*AgentRun, error)
- func (s *InMemoryAgentRunStore) List() []*AgentRun
- func (s *InMemoryAgentRunStore) UpdateStatus(id string, status AgentRunStatus, result, errMsg string) error
- type InMemoryTaskStore
- type RecoveryAction
- type RecoveryContext
- type RecoveryDecisionEvent
- type RecoveryEvent
- type RecoveryPolicy
- type RecursionGuard
- type RunStats
- type TaskEntry
- type TaskStore
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AddRerouteHint ¶
func AddRerouteHint(input string, failure RecoveryContext) string
AddRerouteHint wraps the input with a hint asking the root orchestrator to try a different specialist agent. It incorporates a learning-based fix if one is available.
func BuildControlTools ¶
func BuildControlTools(cp *AgentControlPlane) []*agent.Tool
BuildControlTools creates the agent lifecycle tools: agent_spawn, agent_wait, agent_stop.
func BuildTaskTools ¶
func BuildTaskTools(store TaskStore) []*agent.Tool
BuildTaskTools creates task management tools backed by the given TaskStore.
func ComputeBackoff ¶
func ComputeBackoff(attempt int) time.Duration
ComputeBackoff returns an exponential backoff duration for the given attempt. The formula is min(baseDelay * 2^attempt, maxBackoff).
Types ¶
type AgentControlPlane ¶
type AgentControlPlane struct {
	RunStore   AgentRunStore
	Projection *AgentRunProjection
}
AgentControlPlane provides the dependencies needed by agent lifecycle tools. Actual bgManager.Submit integration is deferred to the wiring layer (D4).
type AgentRun ¶
type AgentRun struct {
	ID             string // unified with bgManager task ID
	ParentID       string // parent agent/session
	RequestedAgent string // advisory target specialist (not guaranteed routing)
	Instruction    string
	Status         AgentRunStatus
	ChildSession   string // child session key
	Result         string
	Error          string
	SpawnDepth     int
	AllowedTools   []string // tool restrictions for this run
	CreatedAt      time.Time
	CompletedAt    time.Time
	CancelFn       context.CancelFunc `json:"-"`
}
AgentRun tracks a spawned agent's lifecycle. ID is unified with the background manager's task ID via D1a projection.
type AgentRunProjection ¶
type AgentRunProjection struct {
	// contains filtered or unexported fields
}
AgentRunProjection implements background.Projection to synchronize background task lifecycle events to AgentRunStore.
ID unification: RegisterPending pre-assigns the AgentRun.ID so that PrepareTask returns it to the background manager, ensuring both layers share the same canonical ID.
func NewAgentRunProjection ¶
func NewAgentRunProjection(store AgentRunStore) *AgentRunProjection
NewAgentRunProjection creates a new AgentRunProjection backed by the given store.
func (*AgentRunProjection) PrepareTask ¶
func (p *AgentRunProjection) PrepareTask(_ context.Context, _ string, _ background.Origin) (string, error)
PrepareTask implements background.Projection. It returns a pre-assigned AgentRun ID if one was registered via RegisterPending. If no pending ID exists, it returns an error — callers must always register before submit.
PrepareTask does NOT change AgentRun status; the run stays in its current state (typically Spawned) until SyncTask is called by the manager.
func (*AgentRunProjection) RegisterPending ¶
func (p *AgentRunProjection) RegisterPending(agentRunID string)
RegisterPending pre-registers an AgentRun ID so that the next PrepareTask call returns it instead of generating a new one. Called by the spawn path (D2) before bgManager.Submit.
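The register-then-prepare handshake might look roughly like the sketch below. The pendingIDs type is hypothetical, and using a FIFO queue for pending IDs is an assumption; the real projection may hold a single slot or key pending IDs differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pendingIDs is a hypothetical stand-in for the projection's internal state.
type pendingIDs struct {
	mu  sync.Mutex
	ids []string // assumption: pending IDs are consumed in FIFO order
}

// RegisterPending queues an AgentRun ID for the next PrepareTask call.
func (p *pendingIDs) RegisterPending(agentRunID string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.ids = append(p.ids, agentRunID)
}

// PrepareTask hands out the oldest registered ID, or fails if none exists.
// It never generates an ID on its own, so callers must register first.
func (p *pendingIDs) PrepareTask() (string, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.ids) == 0 {
		return "", errors.New("no pending agent run ID registered")
	}
	id := p.ids[0]
	p.ids = p.ids[1:]
	return id, nil
}

func main() {
	p := &pendingIDs{}
	p.RegisterPending("run-1")
	id, err := p.PrepareTask()
	fmt.Println(id, err)
	_, err = p.PrepareTask()
	fmt.Println(err)
}
```

Because the ID is chosen before submission, the spawn path and the background manager can refer to the same run without a lookup table.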
func (*AgentRunProjection) SyncTask ¶
func (p *AgentRunProjection) SyncTask(_ context.Context, snap background.TaskSnapshot) error
SyncTask implements background.Projection. It maps background task status transitions to AgentRun status and persists them via the store.
Status mapping:
- Pending → AgentRunSpawned
- Running → AgentRunRunning
- Done → AgentRunCompleted (with result)
- Failed → AgentRunFailed (with error)
- Cancelled → AgentRunCancelled
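The mapping above can be sketched as a plain switch. The taskStatus type and its string values are hypothetical stand-ins for the background package's status type, which is not shown on this page.

```go
package main

import "fmt"

type AgentRunStatus string

const (
	AgentRunSpawned   AgentRunStatus = "spawned"
	AgentRunRunning   AgentRunStatus = "running"
	AgentRunCompleted AgentRunStatus = "completed"
	AgentRunFailed    AgentRunStatus = "failed"
	AgentRunCancelled AgentRunStatus = "cancelled"
)

// taskStatus is a hypothetical stand-in for the background task status.
type taskStatus string

const (
	taskPending   taskStatus = "pending"
	taskRunning   taskStatus = "running"
	taskDone      taskStatus = "done"
	taskFailed    taskStatus = "failed"
	taskCancelled taskStatus = "cancelled"
)

// mapStatus translates a background task status into an AgentRunStatus.
// The boolean is false for statuses this sketch does not recognize.
func mapStatus(s taskStatus) (AgentRunStatus, bool) {
	switch s {
	case taskPending:
		return AgentRunSpawned, true
	case taskRunning:
		return AgentRunRunning, true
	case taskDone:
		return AgentRunCompleted, true
	case taskFailed:
		return AgentRunFailed, true
	case taskCancelled:
		return AgentRunCancelled, true
	}
	return "", false
}

func main() {
	for _, s := range []taskStatus{taskPending, taskRunning, taskDone, taskFailed, taskCancelled} {
		mapped, _ := mapStatus(s)
		fmt.Printf("%s -> %s\n", s, mapped)
	}
}
```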
type AgentRunStatus ¶
type AgentRunStatus string
AgentRunStatus represents the lifecycle status of an agent run.
const (
	AgentRunSpawned   AgentRunStatus = "spawned"
	AgentRunRunning   AgentRunStatus = "running"
	AgentRunCompleted AgentRunStatus = "completed"
	AgentRunFailed    AgentRunStatus = "failed"
	AgentRunCancelled AgentRunStatus = "cancelled"
)
type AgentRunStore ¶
type AgentRunStore interface {
	Create(run *AgentRun) error
	Get(id string) (*AgentRun, error)
	List() []*AgentRun
	UpdateStatus(id string, status AgentRunStatus, result, errMsg string) error
	Cancel(id string) error
}
AgentRunStore manages agent run lifecycle.
type BudgetAlert ¶
type BudgetAlert struct {
	Resource   string // "turns" or "delegations"
	Used       int
	Limit      int
	Percentage float64
}
BudgetAlert describes a budget threshold crossing.
type BudgetAlertEvent ¶
type BudgetAlertEvent struct {
	Resource   string // "turns" or "delegations"
	Used       int
	Limit      int
	Percentage float64
	SessionID  string
}
BudgetAlertEvent is published when a budget threshold is crossed.
func (BudgetAlertEvent) EventName ¶
func (e BudgetAlertEvent) EventName() string
type BudgetPolicy ¶
type BudgetPolicy struct {
	// contains filtered or unexported fields
}
BudgetPolicy mirrors the inner executor's turn and delegation counts observationally. It does NOT enforce limits — the inner executor's hardcoded limits remain authoritative. Mirroring uses the same counting semantics as agent.go:350: only events with function calls that are not delegations count as turns.
func NewBudgetPolicy ¶
func NewBudgetPolicy(cfg config.BudgetCfg) *BudgetPolicy
NewBudgetPolicy creates a budget policy from config.
func (*BudgetPolicy) Clone ¶
func (b *BudgetPolicy) Clone() *BudgetPolicy
Clone returns a new per-run tracker with the same immutable thresholds and alert handler. Mutable counters are intentionally reset so concurrent turns never share observational state.
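A rough sketch of these clone semantics follows. The tracker type and its field names (maxTurns, maxDelegations, and so on) are hypothetical; only the behavior of copying thresholds and the handler while resetting counters is taken from the description above.

```go
package main

import "fmt"

type BudgetAlert struct {
	Resource   string
	Used       int
	Limit      int
	Percentage float64
}

// tracker is a hypothetical stand-in for BudgetPolicy's unexported state.
type tracker struct {
	maxTurns       int               // immutable threshold (assumed name)
	maxDelegations int               // immutable threshold (assumed name)
	onAlert        func(BudgetAlert) // shared alert handler
	turns          int
	delegations    int
	agents         map[string]struct{}
}

func newTracker(maxTurns, maxDelegations int) *tracker {
	return &tracker{
		maxTurns:       maxTurns,
		maxDelegations: maxDelegations,
		agents:         make(map[string]struct{}),
	}
}

// Clone copies thresholds and the handler but starts with fresh counters,
// so each run observes its own state.
func (t *tracker) Clone() *tracker {
	c := newTracker(t.maxTurns, t.maxDelegations)
	c.onAlert = t.onAlert
	return c
}

func (t *tracker) RecordDelegation(target string) {
	t.delegations++
	t.agents[target] = struct{}{}
}

func (t *tracker) UniqueAgentCount() int { return len(t.agents) }

func main() {
	base := newTracker(10, 5)
	base.RecordDelegation("coder")
	run := base.Clone()
	run.RecordDelegation("researcher")
	fmt.Println(base.delegations, run.delegations, run.UniqueAgentCount())
}
```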
func (*BudgetPolicy) DelegationCount ¶
func (b *BudgetPolicy) DelegationCount() int
DelegationCount returns the current mirrored delegation count.
func (*BudgetPolicy) RecordDelegation ¶
func (b *BudgetPolicy) RecordDelegation(target string)
RecordDelegation records a delegation to a target agent.
func (*BudgetPolicy) RecordTurn ¶
func (b *BudgetPolicy) RecordTurn()
RecordTurn increments the turn counter. Should only be called for function-call events that are not delegations, matching inner budget semantics (agent.go:350).
func (*BudgetPolicy) Reset ¶
func (b *BudgetPolicy) Reset()
Reset clears all counters for a new turn.
func (*BudgetPolicy) Restore ¶
func (b *BudgetPolicy) Restore(state map[string]string)
Restore sets the budget counters from a previously serialized state map. Missing or malformed keys are silently ignored.
func (*BudgetPolicy) Serialize ¶
func (b *BudgetPolicy) Serialize() map[string]string
Serialize returns the budget counters as a string map suitable for persisting into Session.Metadata.
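A Serialize/Restore round trip could be sketched as below. The map keys "budget_turns" and "budget_delegations" are hypothetical; the actual key names used in Session.Metadata are not documented here.

```go
package main

import (
	"fmt"
	"strconv"
)

// Hypothetical metadata keys, chosen for illustration only.
const (
	keyTurns       = "budget_turns"
	keyDelegations = "budget_delegations"
)

// counters is a hypothetical stand-in for the mirrored budget counters.
type counters struct {
	turns       int
	delegations int
}

// Serialize encodes the counters as strings for a string-valued metadata map.
func (c *counters) Serialize() map[string]string {
	return map[string]string{
		keyTurns:       strconv.Itoa(c.turns),
		keyDelegations: strconv.Itoa(c.delegations),
	}
}

// Restore loads counters from state. A missing key yields "", which fails
// to parse, so missing and malformed values are both skipped silently.
func (c *counters) Restore(state map[string]string) {
	if v, err := strconv.Atoi(state[keyTurns]); err == nil {
		c.turns = v
	}
	if v, err := strconv.Atoi(state[keyDelegations]); err == nil {
		c.delegations = v
	}
}

func main() {
	saved := (&counters{turns: 4, delegations: 2}).Serialize()
	var restored counters
	restored.Restore(saved)
	fmt.Println(restored.turns, restored.delegations)
}
```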
func (*BudgetPolicy) SetAlertHandler ¶
func (b *BudgetPolicy) SetAlertHandler(fn func(BudgetAlert))
SetAlertHandler sets the callback for budget alerts.
func (*BudgetPolicy) TurnCount ¶
func (b *BudgetPolicy) TurnCount() int
TurnCount returns the current mirrored turn count.
func (*BudgetPolicy) UniqueAgentCount ¶
func (b *BudgetPolicy) UniqueAgentCount() int
UniqueAgentCount returns the number of distinct agents delegated to.
type CauseClass ¶
type CauseClass string
CauseClass categorizes errors for per-class retry limit decisions.
const (
	CauseRateLimit         CauseClass = "rate_limit"
	CauseTransient         CauseClass = "transient"
	CauseMalformedToolCall CauseClass = "malformed_tool_call"
	CauseTimeout           CauseClass = "timeout"
	CauseUnknown           CauseClass = "unknown"
)
type CircuitBreakerTrippedEvent ¶
CircuitBreakerTrippedEvent is published when an agent's circuit breaker opens.
func (CircuitBreakerTrippedEvent) EventName ¶
func (e CircuitBreakerTrippedEvent) EventName() string
type CircuitState ¶
type CircuitState int
CircuitState represents the circuit breaker state per agent.
const (
	CircuitClosed CircuitState = iota + 1
	CircuitOpen
	CircuitHalfOpen
)
type CoordinatingExecutor ¶
type CoordinatingExecutor struct {
	// contains filtered or unexported fields
}
CoordinatingExecutor wraps a turnrunner.Executor to apply DelegationGuard, BudgetPolicy, and RecoveryPolicy. It is a policy/observation wrapper, not a new execution engine. Routing authority remains with the root orchestrator LLM.
func NewCoordinatingExecutor ¶
func NewCoordinatingExecutor(
	inner turnrunner.Executor,
	guard *DelegationGuard,
	budget *BudgetPolicy,
	recovery *RecoveryPolicy,
	bus *eventbus.Bus,
) *CoordinatingExecutor
NewCoordinatingExecutor creates a coordinating executor wrapping the inner executor.
func (*CoordinatingExecutor) LastRunStatsForSession ¶
func (c *CoordinatingExecutor) LastRunStatsForSession(sessionID string) (RunStats, bool)
LastRunStatsForSession returns the budget counters from the most recent RunStreamingDetailed call for the given session, then removes the entry. This is session-safe: concurrent runs for different sessions do not overwrite each other's stats.
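The read-and-remove behavior can be sketched with a mutex-protected map keyed by session ID. The statsBySession type and the runStats fields are hypothetical; RunStats' actual fields are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// runStats is a hypothetical stand-in; the real RunStats fields may differ.
type runStats struct {
	Turns       int
	Delegations int
}

// statsBySession keeps the latest stats per session under one lock.
type statsBySession struct {
	mu sync.Mutex
	m  map[string]runStats
}

func newStatsBySession() *statsBySession {
	return &statsBySession{m: make(map[string]runStats)}
}

// put records the stats of the most recent run for a session.
func (s *statsBySession) put(sessionID string, st runStats) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[sessionID] = st
}

// take returns the stats for a session and deletes the entry, so a
// second call for the same session reports false.
func (s *statsBySession) take(sessionID string) (runStats, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	st, ok := s.m[sessionID]
	if ok {
		delete(s.m, sessionID)
	}
	return st, ok
}

func main() {
	s := newStatsBySession()
	s.put("session-a", runStats{Turns: 3, Delegations: 1})
	fmt.Println(s.take("session-a"))
	fmt.Println(s.take("session-a"))
}
```

Keying by session ID is what keeps concurrent sessions from overwriting each other; removing on read keeps the map from growing with finished sessions.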
func (*CoordinatingExecutor) RunStreamingDetailed ¶
func (c *CoordinatingExecutor) RunStreamingDetailed(
	ctx context.Context,
	sessionID, input string,
	onChunk adk.ChunkCallback,
	opts ...adk.RunOption,
) (adk.RunReport, error)
RunStreamingDetailed implements turnrunner.Executor.
type DelegationGuard ¶
type DelegationGuard struct {
	// contains filtered or unexported fields
}
DelegationGuard observes delegation events and maintains per-agent circuit breaker state. It does NOT make routing decisions — routing authority remains with the root orchestrator LLM.
func NewDelegationGuard ¶
func NewDelegationGuard(cfg config.CircuitBreakerCfg, bus *eventbus.Bus) *DelegationGuard
NewDelegationGuard creates a delegation guard.
func (*DelegationGuard) IsOpen ¶
func (g *DelegationGuard) IsOpen(agentName string) bool
IsOpen returns true if the agent's circuit is open (failing, should not receive delegations).
func (*DelegationGuard) IsProviderOpen ¶
func (g *DelegationGuard) IsProviderOpen(provider string) bool
IsProviderOpen returns true if the provider's circuit is open.
func (*DelegationGuard) RecordOutcome ¶
func (g *DelegationGuard) RecordOutcome(agentName string, success bool)
RecordOutcome records the result of a delegation to update circuit breaker state.
func (*DelegationGuard) RecordProviderFailure ¶
func (g *DelegationGuard) RecordProviderFailure(provider string, success bool)
RecordProviderFailure records the result of a provider-level operation to update circuit breaker state. Provider keys are prefixed with "provider:" to avoid collision with agent names.
func (*DelegationGuard) State ¶
func (g *DelegationGuard) State(agentName string) CircuitState
State returns the current circuit state for an agent.
type DelegationObservedEvent ¶
type DelegationObservedEvent struct {
	From      string
	To        string
	IsOpen    bool // true if target agent's circuit is open
	SessionID string
}
DelegationObservedEvent is published when a delegation is observed by the guard.
func (DelegationObservedEvent) EventName ¶
func (e DelegationObservedEvent) EventName() string
type InMemoryAgentRunStore ¶
type InMemoryAgentRunStore struct {
	// contains filtered or unexported fields
}
InMemoryAgentRunStore is a thread-safe in-memory implementation of AgentRunStore.
func NewInMemoryAgentRunStore ¶
func NewInMemoryAgentRunStore() *InMemoryAgentRunStore
NewInMemoryAgentRunStore creates a new in-memory agent run store.
func (*InMemoryAgentRunStore) Cancel ¶
func (s *InMemoryAgentRunStore) Cancel(id string) error
Cancel cancels an agent run by calling its CancelFn (if set) and setting the status to Cancelled. Returns an error if the run is not found or is already in a terminal status. This follows the same guard pattern as background.Manager.
func (*InMemoryAgentRunStore) Create ¶
func (s *InMemoryAgentRunStore) Create(run *AgentRun) error
Create stores a new agent run. Returns an error if the ID already exists.
func (*InMemoryAgentRunStore) Get ¶
func (s *InMemoryAgentRunStore) Get(id string) (*AgentRun, error)
Get returns a copy of the agent run with the given ID. Returns an error if the run is not found.
func (*InMemoryAgentRunStore) List ¶
func (s *InMemoryAgentRunStore) List() []*AgentRun
List returns copies of all agent runs.
func (*InMemoryAgentRunStore) UpdateStatus ¶
func (s *InMemoryAgentRunStore) UpdateStatus(id string, status AgentRunStatus, result, errMsg string) error
UpdateStatus updates the status, result, error, and completedAt fields of an agent run. Returns an error if the run is not found or is already in a terminal status.
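The terminal-status guard shared by Cancel and UpdateStatus can be sketched as follows. The store and run types are hypothetical, and setting CompletedAt only when the new status is terminal is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type AgentRunStatus string

const (
	AgentRunSpawned   AgentRunStatus = "spawned"
	AgentRunRunning   AgentRunStatus = "running"
	AgentRunCompleted AgentRunStatus = "completed"
	AgentRunFailed    AgentRunStatus = "failed"
	AgentRunCancelled AgentRunStatus = "cancelled"
)

// isTerminal reports whether a status admits no further transitions.
func isTerminal(s AgentRunStatus) bool {
	return s == AgentRunCompleted || s == AgentRunFailed || s == AgentRunCancelled
}

// run and store are hypothetical, trimmed-down stand-ins.
type run struct {
	Status      AgentRunStatus
	Result      string
	Error       string
	CompletedAt time.Time
}

type store struct {
	mu   sync.Mutex
	runs map[string]*run
}

func newStore() *store { return &store{runs: make(map[string]*run)} }

// UpdateStatus rejects unknown IDs and runs that already finished.
func (s *store) UpdateStatus(id string, status AgentRunStatus, result, errMsg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.runs[id]
	if !ok {
		return errors.New("agent run not found: " + id)
	}
	if isTerminal(r.Status) {
		return errors.New("agent run already in terminal status: " + string(r.Status))
	}
	r.Status, r.Result, r.Error = status, result, errMsg
	if isTerminal(status) {
		r.CompletedAt = time.Now() // assumption: stamped on terminal transitions only
	}
	return nil
}

func main() {
	s := newStore()
	s.runs["r1"] = &run{Status: AgentRunSpawned}
	fmt.Println(s.UpdateStatus("r1", AgentRunCompleted, "done", ""))
	fmt.Println(s.UpdateStatus("r1", AgentRunFailed, "", "too late"))
}
```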
type InMemoryTaskStore ¶
type InMemoryTaskStore struct {
	// contains filtered or unexported fields
}
InMemoryTaskStore is a thread-safe in-memory implementation of TaskStore.
func NewInMemoryTaskStore ¶
func NewInMemoryTaskStore() *InMemoryTaskStore
NewInMemoryTaskStore creates a new in-memory task store.
func (*InMemoryTaskStore) Create ¶
func (s *InMemoryTaskStore) Create(entry *TaskEntry) error
Create stores a new task entry. Returns an error if the entry is nil or the ID already exists.
func (*InMemoryTaskStore) Get ¶
func (s *InMemoryTaskStore) Get(id string) (*TaskEntry, error)
Get returns a copy of the task entry with the given ID. Returns an error if the task is not found.
func (*InMemoryTaskStore) List ¶
func (s *InMemoryTaskStore) List(statusFilter, parentFilter string) []*TaskEntry
List returns copies of tasks matching the optional filters. Empty filter strings match all tasks.
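The filter semantics can be sketched as a standalone function over a slice. The listTasks helper and the trimmed taskEntry type are hypothetical; matching statusFilter against Status and parentFilter against ParentID is inferred from the parameter names.

```go
package main

import "fmt"

// taskEntry is a trimmed, hypothetical version of TaskEntry.
type taskEntry struct {
	ID       string
	Status   string
	ParentID string
}

// listTasks returns copies of entries matching both filters.
// An empty filter string matches every task.
func listTasks(tasks []*taskEntry, statusFilter, parentFilter string) []*taskEntry {
	var out []*taskEntry
	for _, t := range tasks {
		if statusFilter != "" && t.Status != statusFilter {
			continue
		}
		if parentFilter != "" && t.ParentID != parentFilter {
			continue
		}
		cp := *t // copy so callers cannot mutate stored entries
		out = append(out, &cp)
	}
	return out
}

func main() {
	tasks := []*taskEntry{
		{ID: "t1", Status: "todo", ParentID: ""},
		{ID: "t2", Status: "done", ParentID: "t1"},
		{ID: "t3", Status: "todo", ParentID: "t1"},
	}
	fmt.Println(len(listTasks(tasks, "", "")))
	fmt.Println(len(listTasks(tasks, "todo", "")))
	fmt.Println(len(listTasks(tasks, "todo", "t1")))
}
```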
type RecoveryAction ¶
type RecoveryAction int
RecoveryAction describes the decision made by the recovery policy.
const (
	RecoveryNone          RecoveryAction = iota + 1
	RecoveryRetry                        // same agent, same input
	RecoveryRetryWithHint                // root orchestrator with "try different specialist" hint
	RecoveryDirectAnswer                 // use partial result to compose response
	RecoveryEscalate                     // return error to caller
)
func (RecoveryAction) String ¶
func (a RecoveryAction) String() string
type RecoveryContext ¶
type RecoveryContext struct {
	Error            error
	AgentName        string
	PartialResult    string
	RetryCount       int
	SessionID        string
	LearningFix      string             // populated by tryLearningFix if ErrorFixProvider returns a fix
	ClassRetryCounts map[CauseClass]int // per-class retry counts across attempts
}
RecoveryContext provides information about a failed execution.
type RecoveryDecisionEvent ¶
type RecoveryDecisionEvent struct {
	CauseClass string
	Action     string
	Attempt    int
	Backoff    time.Duration
	SessionKey string
}
RecoveryDecisionEvent is published when a recovery decision is made, carrying detailed metadata for observability.
func (RecoveryDecisionEvent) EventName ¶
func (e RecoveryDecisionEvent) EventName() string
type RecoveryEvent ¶
type RecoveryEvent struct {
	Action    RecoveryAction
	AgentName string
	Error     string
	SessionID string
}
RecoveryEvent is published when a recovery action is taken.
func (RecoveryEvent) EventName ¶
func (e RecoveryEvent) EventName() string
type RecoveryPolicy ¶
type RecoveryPolicy struct {
	// contains filtered or unexported fields
}
RecoveryPolicy decides how to handle agent execution failures. RecoveryRetryWithHint is NOT per-agent direct execution — it adds a prompt hint requesting the root orchestrator to try a different specialist.
func NewRecoveryPolicy ¶
func NewRecoveryPolicy(cfg config.RecoveryCfg, provider adk.ErrorFixProvider) *RecoveryPolicy
NewRecoveryPolicy creates a recovery policy from config.
func (*RecoveryPolicy) Decide ¶
func (p *RecoveryPolicy) Decide(ctx context.Context, failure *RecoveryContext) RecoveryAction
Decide evaluates a failure and returns the recommended recovery action. It checks per-error-class retry limits in addition to the global maxRetries.
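How a global limit and per-class limits might combine is sketched below. Everything about the decision rule is an assumption: the policy type, the classLimits map, and the choice to escalate when either limit is reached are illustrative only, and the real Decide may also return RecoveryRetryWithHint or RecoveryDirectAnswer depending on the failure.

```go
package main

import "fmt"

type CauseClass string

const (
	CauseRateLimit CauseClass = "rate_limit"
	CauseTransient CauseClass = "transient"
)

type RecoveryAction int

const (
	RecoveryNone RecoveryAction = iota + 1
	RecoveryRetry
	RecoveryRetryWithHint
	RecoveryDirectAnswer
	RecoveryEscalate
)

// policy is a hypothetical stand-in for RecoveryPolicy's configuration.
type policy struct {
	maxRetries  int                // global retry cap
	classLimits map[CauseClass]int // assumed: optional per-class caps
}

// decide escalates when the global cap or the cap for this error class
// is reached, and otherwise retries. Reading a nil map is safe in Go.
func (p *policy) decide(class CauseClass, retryCount int, classCounts map[CauseClass]int) RecoveryAction {
	if retryCount >= p.maxRetries {
		return RecoveryEscalate
	}
	if limit, ok := p.classLimits[class]; ok && classCounts[class] >= limit {
		return RecoveryEscalate
	}
	return RecoveryRetry
}

func main() {
	p := &policy{maxRetries: 3, classLimits: map[CauseClass]int{CauseRateLimit: 1}}
	fmt.Println(p.decide(CauseTransient, 0, nil) == RecoveryRetry)
	fmt.Println(p.decide(CauseRateLimit, 1, map[CauseClass]int{CauseRateLimit: 1}) == RecoveryEscalate)
}
```

Tracking counts per class lets a policy give up quickly on errors that rarely resolve by retrying while staying patient with transient ones.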
type RecursionGuard ¶
type RecursionGuard struct {
	MaxDepth int
}
RecursionGuard prevents runaway agent spawn recursion by enforcing depth limits, self-spawn rejection, and cycle detection.
func NewRecursionGuard ¶
func NewRecursionGuard(maxDepth int) *RecursionGuard
NewRecursionGuard creates a RecursionGuard with the given maximum spawn depth. If maxDepth <= 0, the default of 3 is used.
func (*RecursionGuard) Check ¶
func (g *RecursionGuard) Check(ctx context.Context, spawner, target string) error
Check verifies that a spawn from spawner to target is allowed under the current context. It returns an error if:
- SpawnDepth from context >= MaxDepth (depth exceeded)
- spawner == target (self-spawn)
- target already appears in the spawn chain (cycle detected)
type TaskEntry ¶
type TaskEntry struct {
	ID          string
	Title       string
	Status      string // "todo", "in_progress", "done", "blocked"
	AgentID     string // optional link to spawned agent
	ParentID    string // for hierarchical tasks
	Description string
	CreatedAt   time.Time
	UpdatedAt   time.Time
}
TaskEntry represents a structured task for tracking work.