agentrt

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddRerouteHint

func AddRerouteHint(input string, failure RecoveryContext) string

AddRerouteHint wraps the input with a hint for the root orchestrator to try a different specialist agent. Incorporates learning-based fix if available.

func BuildControlTools

func BuildControlTools(cp *AgentControlPlane) []*agent.Tool

BuildControlTools creates the agent lifecycle tools: agent_spawn, agent_wait, agent_stop.

func BuildTaskTools

func BuildTaskTools(store TaskStore) []*agent.Tool

BuildTaskTools creates task management tools backed by the given TaskStore.

func ComputeBackoff

func ComputeBackoff(attempt int) time.Duration

ComputeBackoff returns an exponential backoff duration for the given attempt. The formula is min(baseDelay * 2^attempt, maxBackoff).

Types

type AgentControlPlane

type AgentControlPlane struct {
	RunStore   AgentRunStore
	Projection *AgentRunProjection
}

AgentControlPlane provides the dependencies needed by agent lifecycle tools. Actual bgManager.Submit integration is deferred to the wiring layer (D4).

type AgentRun

type AgentRun struct {
	ID             string // unified with bgManager task ID
	ParentID       string // parent agent/session
	RequestedAgent string // advisory target specialist (not guaranteed routing)
	Instruction    string
	Status         AgentRunStatus
	ChildSession   string // child session key
	Result         string
	Error          string
	SpawnDepth     int
	AllowedTools   []string // tool restrictions for this run
	CreatedAt      time.Time
	CompletedAt    time.Time
	CancelFn       context.CancelFunc `json:"-"`
}

AgentRun tracks a spawned agent's lifecycle. ID is unified with the background manager's task ID via D1a projection.

type AgentRunProjection

type AgentRunProjection struct {
	// contains filtered or unexported fields
}

AgentRunProjection implements background.Projection to synchronize background task lifecycle events to AgentRunStore.

ID unification: RegisterPending pre-assigns the AgentRun.ID so that PrepareTask returns it to the background manager, ensuring both layers share the same canonical ID.

func NewAgentRunProjection

func NewAgentRunProjection(store AgentRunStore) *AgentRunProjection

NewAgentRunProjection creates a new AgentRunProjection backed by the given store.

func (*AgentRunProjection) PrepareTask

PrepareTask implements background.Projection. It returns a pre-assigned AgentRun ID if one was registered via RegisterPending. If no pending ID exists, it returns an error — callers must always register before submit.

PrepareTask does NOT change AgentRun status; the run stays in its current state (typically Spawned) until SyncTask is called by the manager.

func (*AgentRunProjection) RegisterPending

func (p *AgentRunProjection) RegisterPending(agentRunID string)

RegisterPending pre-registers an AgentRun ID so that the next PrepareTask call returns it instead of generating a new one. Called by the spawn path (D2) before bgManager.Submit.

func (*AgentRunProjection) SyncTask

SyncTask implements background.Projection. It maps background task status transitions to AgentRun status and persists them via the store.

Status mapping:

  • Pending → AgentRunSpawned
  • Running → AgentRunRunning
  • Done → AgentRunCompleted (with result)
  • Failed → AgentRunFailed (with error)
  • Cancelled → AgentRunCancelled

type AgentRunStatus

type AgentRunStatus string

AgentRunStatus represents the lifecycle status of an agent run.

const (
	AgentRunSpawned   AgentRunStatus = "spawned"
	AgentRunRunning   AgentRunStatus = "running"
	AgentRunCompleted AgentRunStatus = "completed"
	AgentRunFailed    AgentRunStatus = "failed"
	AgentRunCancelled AgentRunStatus = "cancelled"
)

type AgentRunStore

type AgentRunStore interface {
	Create(run *AgentRun) error
	Get(id string) (*AgentRun, error)
	List() []*AgentRun
	UpdateStatus(id string, status AgentRunStatus, result, errMsg string) error
	Cancel(id string) error
}

AgentRunStore manages agent run lifecycle.

type BudgetAlert

type BudgetAlert struct {
	Resource   string // "turns" or "delegations"
	Used       int
	Limit      int
	Percentage float64
}

BudgetAlert describes a budget threshold crossing.

type BudgetAlertEvent

type BudgetAlertEvent struct {
	Resource   string // "turns" or "delegations"
	Used       int
	Limit      int
	Percentage float64
	SessionID  string
}

BudgetAlertEvent is published when a budget threshold is crossed.

func (BudgetAlertEvent) EventName

func (e BudgetAlertEvent) EventName() string

type BudgetPolicy

type BudgetPolicy struct {
	// contains filtered or unexported fields
}

BudgetPolicy mirrors the inner executor's turn and delegation counts observationally. It does NOT enforce limits — the inner executor's hardcoded limits remain authoritative. Mirroring uses the same counting semantics as agent.go:350: only events with function calls that are not delegations count as turns.

func NewBudgetPolicy

func NewBudgetPolicy(cfg config.BudgetCfg) *BudgetPolicy

NewBudgetPolicy creates a budget policy from config.

func (*BudgetPolicy) Clone

func (b *BudgetPolicy) Clone() *BudgetPolicy

Clone returns a new per-run tracker with the same immutable thresholds and alert handler. Mutable counters are intentionally reset so concurrent turns never share observational state.

func (*BudgetPolicy) DelegationCount

func (b *BudgetPolicy) DelegationCount() int

DelegationCount returns the current mirrored delegation count.

func (*BudgetPolicy) RecordDelegation

func (b *BudgetPolicy) RecordDelegation(target string)

RecordDelegation records a delegation to a target agent.

func (*BudgetPolicy) RecordTurn

func (b *BudgetPolicy) RecordTurn()

RecordTurn increments the turn counter. Should only be called for function-call events that are not delegations, matching inner budget semantics (agent.go:350).

func (*BudgetPolicy) Reset

func (b *BudgetPolicy) Reset()

Reset clears all counters for a new turn.

func (*BudgetPolicy) Restore

func (b *BudgetPolicy) Restore(state map[string]string)

Restore sets the budget counters from a previously serialized state map. Missing or malformed keys are silently ignored.

func (*BudgetPolicy) Serialize

func (b *BudgetPolicy) Serialize() map[string]string

Serialize returns the budget counters as a string map suitable for persisting into Session.Metadata.

func (*BudgetPolicy) SetAlertHandler

func (b *BudgetPolicy) SetAlertHandler(fn func(BudgetAlert))

SetAlertHandler sets the callback for budget alerts.

func (*BudgetPolicy) TurnCount

func (b *BudgetPolicy) TurnCount() int

TurnCount returns the current mirrored turn count.

func (*BudgetPolicy) UniqueAgentCount

func (b *BudgetPolicy) UniqueAgentCount() int

UniqueAgentCount returns the number of distinct agents delegated to.

type CauseClass

type CauseClass string

CauseClass categorizes errors for per-class retry limit decisions.

const (
	CauseRateLimit         CauseClass = "rate_limit"
	CauseTransient         CauseClass = "transient"
	CauseMalformedToolCall CauseClass = "malformed_tool_call"
	CauseTimeout           CauseClass = "timeout"
	CauseUnknown           CauseClass = "unknown"
)

type CircuitBreakerTrippedEvent

type CircuitBreakerTrippedEvent struct {
	AgentName    string
	FailureCount int
	ResetAt      time.Time
}

CircuitBreakerTrippedEvent is published when an agent's circuit breaker opens.

func (CircuitBreakerTrippedEvent) EventName

func (e CircuitBreakerTrippedEvent) EventName() string

type CircuitState

type CircuitState int

CircuitState represents the circuit breaker state per agent.

const (
	CircuitClosed CircuitState = iota + 1
	CircuitOpen
	CircuitHalfOpen
)

type CoordinatingExecutor

type CoordinatingExecutor struct {
	// contains filtered or unexported fields
}

CoordinatingExecutor wraps a turnrunner.Executor to apply DelegationGuard, BudgetPolicy, and RecoveryPolicy. It is a policy/observation wrapper, not a new execution engine. Routing authority remains with the root orchestrator LLM.

func NewCoordinatingExecutor

func NewCoordinatingExecutor(
	inner turnrunner.Executor,
	guard *DelegationGuard,
	budget *BudgetPolicy,
	recovery *RecoveryPolicy,
	bus *eventbus.Bus,
) *CoordinatingExecutor

NewCoordinatingExecutor creates a coordinating executor wrapping the inner executor.

func (*CoordinatingExecutor) LastRunStatsForSession

func (c *CoordinatingExecutor) LastRunStatsForSession(sessionID string) (RunStats, bool)

LastRunStatsForSession returns the budget counters from the most recent RunStreamingDetailed call for the given session, then removes the entry. This is session-safe: concurrent runs for different sessions do not overwrite each other's stats.

func (*CoordinatingExecutor) RunStreamingDetailed

func (c *CoordinatingExecutor) RunStreamingDetailed(
	ctx context.Context,
	sessionID, input string,
	onChunk adk.ChunkCallback,
	opts ...adk.RunOption,
) (adk.RunReport, error)

RunStreamingDetailed implements turnrunner.Executor.

type DelegationGuard

type DelegationGuard struct {
	// contains filtered or unexported fields
}

DelegationGuard observes delegation events and maintains per-agent circuit breaker state. It does NOT make routing decisions — routing authority remains with the root orchestrator LLM.

func NewDelegationGuard

func NewDelegationGuard(cfg config.CircuitBreakerCfg, bus *eventbus.Bus) *DelegationGuard

NewDelegationGuard creates a delegation guard.

func (*DelegationGuard) IsOpen

func (g *DelegationGuard) IsOpen(agentName string) bool

IsOpen returns true if the agent's circuit is open (failing, should not receive delegations).

func (*DelegationGuard) IsProviderOpen

func (g *DelegationGuard) IsProviderOpen(provider string) bool

IsProviderOpen returns true if the provider's circuit is open.

func (*DelegationGuard) RecordOutcome

func (g *DelegationGuard) RecordOutcome(agentName string, success bool)

RecordOutcome records the result of a delegation to update circuit breaker state.

func (*DelegationGuard) RecordProviderFailure

func (g *DelegationGuard) RecordProviderFailure(provider string, success bool)

RecordProviderFailure records the result of a provider-level operation to update circuit breaker state. Provider keys are prefixed with "provider:" to avoid collision with agent names.

func (*DelegationGuard) State

func (g *DelegationGuard) State(agentName string) CircuitState

State returns the current circuit state for an agent.

type DelegationObservedEvent

type DelegationObservedEvent struct {
	From      string
	To        string
	IsOpen    bool // true if target agent's circuit is open
	SessionID string
}

DelegationObservedEvent is published when a delegation is observed by the guard.

func (DelegationObservedEvent) EventName

func (e DelegationObservedEvent) EventName() string

type InMemoryAgentRunStore

type InMemoryAgentRunStore struct {
	// contains filtered or unexported fields
}

InMemoryAgentRunStore is a thread-safe in-memory implementation of AgentRunStore.

func NewInMemoryAgentRunStore

func NewInMemoryAgentRunStore() *InMemoryAgentRunStore

NewInMemoryAgentRunStore creates a new in-memory agent run store.

func (*InMemoryAgentRunStore) Cancel

func (s *InMemoryAgentRunStore) Cancel(id string) error

Cancel cancels an agent run by calling its CancelFn (if set) and setting the status to Cancelled. Returns an error if the run is not found or is already in a terminal status. This follows the same guard pattern as background.Manager.

func (*InMemoryAgentRunStore) Create

func (s *InMemoryAgentRunStore) Create(run *AgentRun) error

Create stores a new agent run. Returns an error if the ID already exists.

func (*InMemoryAgentRunStore) Get

func (s *InMemoryAgentRunStore) Get(id string) (*AgentRun, error)

Get returns a copy of the agent run with the given ID. Returns an error if the run is not found.

func (*InMemoryAgentRunStore) List

func (s *InMemoryAgentRunStore) List() []*AgentRun

List returns copies of all agent runs.

func (*InMemoryAgentRunStore) UpdateStatus

func (s *InMemoryAgentRunStore) UpdateStatus(id string, status AgentRunStatus, result, errMsg string) error

UpdateStatus updates the status, result, error, and completedAt fields of an agent run. Returns an error if the run is not found or is already in a terminal status.

type InMemoryTaskStore

type InMemoryTaskStore struct {
	// contains filtered or unexported fields
}

InMemoryTaskStore is a thread-safe in-memory implementation of TaskStore.

func NewInMemoryTaskStore

func NewInMemoryTaskStore() *InMemoryTaskStore

NewInMemoryTaskStore creates a new in-memory task store.

func (*InMemoryTaskStore) Create

func (s *InMemoryTaskStore) Create(entry *TaskEntry) error

Create stores a new task entry. Returns an error if the entry is nil or the ID already exists.

func (*InMemoryTaskStore) Get

func (s *InMemoryTaskStore) Get(id string) (*TaskEntry, error)

Get returns a copy of the task entry with the given ID. Returns an error if the task is not found.

func (*InMemoryTaskStore) List

func (s *InMemoryTaskStore) List(statusFilter, parentFilter string) []*TaskEntry

List returns copies of tasks matching the optional filters. Empty filter strings match all tasks.

func (*InMemoryTaskStore) Update

func (s *InMemoryTaskStore) Update(id string, status, description string) error

Update updates the status and/or description of a task. Returns an error if the task is not found. Empty strings are treated as "no change" for each field.

type RecoveryAction

type RecoveryAction int

RecoveryAction describes the decision made by the recovery policy.

const (
	RecoveryNone          RecoveryAction = iota + 1
	RecoveryRetry                        // same agent, same input
	RecoveryRetryWithHint                // root orchestrator with "try different specialist" hint
	RecoveryDirectAnswer                 // use partial result to compose response
	RecoveryEscalate                     // return error to caller
)

func (RecoveryAction) String

func (a RecoveryAction) String() string

type RecoveryContext

type RecoveryContext struct {
	Error            error
	AgentName        string
	PartialResult    string
	RetryCount       int
	SessionID        string
	LearningFix      string             // populated by tryLearningFix if ErrorFixProvider returns a fix
	ClassRetryCounts map[CauseClass]int // per-class retry counts across attempts
}

RecoveryContext provides information about a failed execution.

type RecoveryDecisionEvent

type RecoveryDecisionEvent struct {
	CauseClass string
	Action     string
	Attempt    int
	Backoff    time.Duration
	SessionKey string
}

RecoveryDecisionEvent is published when a recovery decision is made, carrying detailed metadata for observability.

func (RecoveryDecisionEvent) EventName

func (e RecoveryDecisionEvent) EventName() string

type RecoveryEvent

type RecoveryEvent struct {
	Action    RecoveryAction
	AgentName string
	Error     string
	SessionID string
}

RecoveryEvent is published when a recovery action is taken.

func (RecoveryEvent) EventName

func (e RecoveryEvent) EventName() string

type RecoveryPolicy

type RecoveryPolicy struct {
	// contains filtered or unexported fields
}

RecoveryPolicy decides how to handle agent execution failures. RecoveryRetryWithHint is NOT per-agent direct execution — it adds a prompt hint requesting the root orchestrator to try a different specialist.

func NewRecoveryPolicy

func NewRecoveryPolicy(cfg config.RecoveryCfg, provider adk.ErrorFixProvider) *RecoveryPolicy

NewRecoveryPolicy creates a recovery policy from config.

func (*RecoveryPolicy) Decide

func (p *RecoveryPolicy) Decide(ctx context.Context, failure *RecoveryContext) RecoveryAction

Decide evaluates a failure and returns the recommended recovery action. It checks per-error-class retry limits in addition to the global maxRetries.

type RecursionGuard

type RecursionGuard struct {
	MaxDepth int
}

RecursionGuard prevents runaway agent spawn recursion by enforcing depth limits, self-spawn rejection, and cycle detection.

func NewRecursionGuard

func NewRecursionGuard(maxDepth int) *RecursionGuard

NewRecursionGuard creates a RecursionGuard with the given maximum spawn depth. If maxDepth <= 0, the default of 3 is used.

func (*RecursionGuard) Check

func (g *RecursionGuard) Check(ctx context.Context, spawner, target string) error

Check verifies that a spawn from spawner to target is allowed under the current context. It returns an error if:

  • SpawnDepth from context >= MaxDepth (depth exceeded)
  • spawner == target (self-spawn)
  • target already appears in the spawn chain (cycle detected)

type RunStats

type RunStats struct {
	Turns       int
	Delegations int
}

RunStats holds the final budget counters from a single RunStreamingDetailed call.

type TaskEntry

type TaskEntry struct {
	ID          string
	Title       string
	Status      string // "todo", "in_progress", "done", "blocked"
	AgentID     string // optional link to spawned agent
	ParentID    string // for hierarchical tasks
	Description string
	CreatedAt   time.Time
	UpdatedAt   time.Time
}

TaskEntry represents a structured task for tracking work.

type TaskStore

type TaskStore interface {
	Create(entry *TaskEntry) error
	Get(id string) (*TaskEntry, error)
	List(statusFilter, parentFilter string) []*TaskEntry
	Update(id string, status, description string) error
}

TaskStore manages task lifecycle.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL