agent

package
v1.4.6 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 14, 2026 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Overview

Package agent — run_state.go provides structured run lifecycle tracking.

RunRegistry replaces the implicit "in-progress or gone" model with an observable state machine: QUEUED → RUNNING → COMPLETED|FAILED|TERMINATED|BLOCKED|DENIED. Operators can list active runs, inspect per-run cost/step metrics, and issue kill signals via the admin API.

Package agent implements the core agent orchestration pipeline.

The pipeline executes in a fixed sequence: load policy → classify input → scan attachments → evaluate OPA policy → resolve secrets → route LLM → call provider → classify output → generate evidence. Every invocation produces a signed evidence record, even on failures or policy denials.

Extension points:

  • Hooks: register pre/post callbacks at any pipeline stage (see HookRegistry).
  • Plan review: gate LLM calls behind human approval (see PlanReviewStore).
  • Tools: register MCP-compatible tools via agent/tools.ToolRegistry.

Index

Constants

This section is empty.

Variables

var (
	ErrPlanNotFound   = errors.New("execution plan not found")
	ErrPlanNotPending = errors.New("plan is not in pending status")
)

Functions

func RequiresReview

func RequiresReview(humanOversight string, dataTier int, costEstimate float64, hasTools bool, planConfig *PlanReviewConfig, planText ...string) bool

RequiresReview reports whether the current request needs human review based on policy. planText is optional; when provided, volume detection (Gap E) checks for destructive verbs near large numbers.
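The volume detection mentioned above can be pictured with a small sketch. This is not the package's heuristic: the verb list, the regular expression, the "within four words" window, and the helper name looksLikeBulkDestructive are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// destructiveNearNumber is a hypothetical pattern: a destructive verb
// followed, within at most four intervening words, by a run of digits.
var destructiveNearNumber = regexp.MustCompile(
	`(?i)\b(delete|drop|remove|purge|truncate)\b(?:\W+\w+){0,4}?\W+(\d[\d,]*)\b`)

// looksLikeBulkDestructive reports whether planText mentions a destructive
// verb close to a number that is at least threshold.
func looksLikeBulkDestructive(planText string, threshold int) bool {
	for _, m := range destructiveNearNumber.FindAllStringSubmatch(planText, -1) {
		// Strip thousands separators before parsing ("5,000" becomes "5000").
		n, err := strconv.Atoi(strings.ReplaceAll(m[2], ",", ""))
		if err == nil && n >= threshold {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(looksLikeBulkDestructive("Delete 5,000 stale customer records", 100)) // true
	fmt.Println(looksLikeBulkDestructive("Summarize 5000 support tickets", 100))      // false
	fmt.Println(looksLikeBulkDestructive("Remove 3 duplicate rows", 100))             // false
}
```

In a setup like this the threshold would presumably come from PlanReviewConfig.VolumeThreshold, but how the package wires that value in is not shown in this documentation.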

Types

type ActiveRunTracker

type ActiveRunTracker struct {
	// contains filtered or unexported fields
}

ActiveRunTracker counts in-flight runs per tenant for rate-limit policy (concurrent_executions) and provides kill switch support via correlation-ID-keyed cancel functions. Safe for concurrent use. When nil, rate-limit policy input concurrent_executions is not set.

func NewActiveRunTracker

func NewActiveRunTracker() *ActiveRunTracker

NewActiveRunTracker creates a new tracker.

func (*ActiveRunTracker) ActiveRunCount

func (t *ActiveRunTracker) ActiveRunCount() int

ActiveRunCount returns the number of active runs being tracked with cancel functions.

func (*ActiveRunTracker) Count

func (t *ActiveRunTracker) Count(tenantID string) int

Count returns the current in-flight run count for the tenant (including the run that just called Increment).

func (*ActiveRunTracker) Decrement

func (t *ActiveRunTracker) Decrement(tenantID string)

Decrement removes one in-flight run for the tenant.

func (*ActiveRunTracker) Deregister

func (t *ActiveRunTracker) Deregister(correlationID string)

Deregister removes a completed run from the tracker.

func (*ActiveRunTracker) Increment

func (t *ActiveRunTracker) Increment(tenantID string)

Increment adds one in-flight run for the tenant.

func (*ActiveRunTracker) Kill

func (t *ActiveRunTracker) Kill(correlationID string) bool

Kill cancels a running agent by correlation ID. Returns true if found.

func (*ActiveRunTracker) KillAllForTenant

func (t *ActiveRunTracker) KillAllForTenant(tenantID string) int

KillAllForTenant cancels all running agents for a tenant. Returns count of killed runs.

func (*ActiveRunTracker) Register

func (t *ActiveRunTracker) Register(tenantID, correlationID string, cancel context.CancelFunc)

Register stores a cancel function for a running agent, keyed by correlation ID.
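To make the two responsibilities of the tracker concrete (per-tenant counting and correlation-ID-keyed cancellation), here is a minimal self-contained sketch. The type runTracker and its lower-case methods are hypothetical stand-ins, not the package's implementation; the real tracker's Register also takes a tenant ID, which this sketch omits.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// runTracker is a hypothetical stand-in: one mutex guards both maps.
type runTracker struct {
	mu      sync.Mutex
	counts  map[string]int                // tenant ID -> in-flight runs
	cancels map[string]context.CancelFunc // correlation ID -> cancel func
}

func newRunTracker() *runTracker {
	return &runTracker{
		counts:  make(map[string]int),
		cancels: make(map[string]context.CancelFunc),
	}
}

func (t *runTracker) increment(tenantID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.counts[tenantID]++
}

func (t *runTracker) decrement(tenantID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.counts[tenantID] > 0 { // never go negative on a stray Decrement
		t.counts[tenantID]--
	}
}

func (t *runTracker) count(tenantID string) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.counts[tenantID]
}

func (t *runTracker) register(correlationID string, cancel context.CancelFunc) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.cancels[correlationID] = cancel
}

// kill cancels the run's context and forgets the cancel func.
func (t *runTracker) kill(correlationID string) bool {
	t.mu.Lock()
	cancel, ok := t.cancels[correlationID]
	delete(t.cancels, correlationID)
	t.mu.Unlock()
	if ok {
		cancel() // called outside the lock
	}
	return ok
}

func main() {
	t := newRunTracker()
	ctx, cancel := context.WithCancel(context.Background())
	t.increment("tenant-a")
	t.register("run-1", cancel)
	fmt.Println(t.count("tenant-a"), t.kill("run-1"), ctx.Err() != nil) // 1 true true
	t.decrement("tenant-a")
	fmt.Println(t.count("tenant-a")) // 0
}
```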

type Annotation

type Annotation struct {
	ID        string    `json:"id"`
	Type      string    `json:"type"` // "comment" | "delete" | "modify"
	Content   string    `json:"content"`
	Section   string    `json:"section,omitempty"`
	CreatedBy string    `json:"created_by"`
	CreatedAt time.Time `json:"created_at"`
}

Annotation represents a reviewer's comment or modification on a plan.

type ApprovalLevel

type ApprovalLevel struct {
	Role              string `yaml:"role"`
	TimeoutMinutes    int    `yaml:"timeout_minutes,omitempty"`
	EscalateOnTimeout bool   `yaml:"escalate_on_timeout,omitempty"`
}

type Attachment

type Attachment struct {
	Filename string
	Content  []byte
}

Attachment is a file attached to a run request.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker tracks policy denial counts per agent and opens the circuit when repeated denials exceed the threshold within a window. Only policy denials (not tool execution failures) feed the circuit breaker.

func NewCircuitBreaker

func NewCircuitBreaker(threshold int, window time.Duration) *CircuitBreaker

NewCircuitBreaker creates a circuit breaker with the given threshold and window. threshold: number of denials in window to trip the circuit (default 5). window: sliding window duration (default 60s).

func (*CircuitBreaker) Check

func (cb *CircuitBreaker) Check(tenantID, agentID string) error

Check returns nil if the agent is allowed to proceed, or an error if the circuit is open. In half-open state, allows one probe request.

func (*CircuitBreaker) RecordPolicyDenial

func (cb *CircuitBreaker) RecordPolicyDenial(tenantID, agentID string)

RecordPolicyDenial records a policy denial for the agent. If the threshold is exceeded within the window, the circuit opens. In half-open state, a single denial (failed probe) reopens the circuit immediately without requiring threshold denials again.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess(tenantID, agentID string)

RecordSuccess records a successful policy evaluation. If the circuit is half-open, this closes it (the probe succeeded).

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset(tenantID, agentID string)

Reset manually resets the circuit for an agent (operator override).

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State(tenantID, agentID string) CircuitState

State returns the current circuit state for an agent.

type CircuitState

type CircuitState int

CircuitState represents the circuit breaker state.

const (
	CircuitClosed   CircuitState = iota // Normal: requests flow through
	CircuitOpen                         // Tripped: requests denied immediately
	CircuitHalfOpen                     // Probe: one request allowed to test recovery
)
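The closed, open, and half-open behaviour described for CircuitBreaker can be sketched for a single agent as follows. Several things here are assumptions rather than documented behaviour: the cooldown field (the documentation does not say what moves an open circuit to half-open), tripping at "count >= threshold", and all identifiers. The sketch passes the clock in explicitly so it is deterministic, and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type state int

const (
	closed state = iota
	open
	halfOpen
)

var errOpen = errors.New("circuit open")

// breaker is a hypothetical single-agent breaker.
type breaker struct {
	threshold int
	window    time.Duration
	cooldown  time.Duration // assumption: time spent open before one probe is allowed
	denials   []time.Time
	st        state
	openedAt  time.Time
	probing   bool
}

// at returns a fixed base time plus sec seconds, to keep the example deterministic.
func at(sec int) time.Time {
	return time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(sec) * time.Second)
}

func (b *breaker) check(now time.Time) error {
	if b.st == open && now.Sub(b.openedAt) >= b.cooldown {
		b.st, b.probing = halfOpen, false
	}
	switch b.st {
	case open:
		return errOpen
	case halfOpen:
		if b.probing {
			return errOpen // only one probe request at a time
		}
		b.probing = true
	}
	return nil
}

func (b *breaker) recordDenial(now time.Time) {
	if b.st == halfOpen { // a failed probe reopens immediately
		b.st, b.openedAt = open, now
		return
	}
	kept := b.denials[:0]
	for _, t := range b.denials { // drop denials that fell out of the window
		if now.Sub(t) < b.window {
			kept = append(kept, t)
		}
	}
	b.denials = append(kept, now)
	if len(b.denials) >= b.threshold {
		b.st, b.openedAt, b.denials = open, now, nil
	}
}

func (b *breaker) recordSuccess() {
	if b.st == halfOpen { // the probe succeeded
		b.st, b.probing = closed, false
	}
}

func main() {
	b := &breaker{threshold: 3, window: time.Minute, cooldown: 30 * time.Second}
	for i := 0; i < 3; i++ {
		b.recordDenial(at(i))
	}
	fmt.Println(b.check(at(5)))  // circuit open
	fmt.Println(b.check(at(40))) // <nil>
	b.recordSuccess()
	fmt.Println(b.st == closed) // true
}
```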

type ExecutionPlan

type ExecutionPlan struct {
	ID               string       `json:"id"`
	CorrelationID    string       `json:"correlation_id"`
	SessionID        string       `json:"session_id,omitempty"`
	TenantID         string       `json:"tenant_id"`
	AgentID          string       `json:"agent_id"`
	Status           PlanStatus   `json:"status"`
	SelectedModel    string       `json:"selected_model"`
	DataTier         int          `json:"data_tier"`
	ToolsAvailable   []string     `json:"tools_available"`
	CostEstimate     float64      `json:"cost_estimate"`
	PolicyDecision   string       `json:"policy_decision"`
	SystemPromptHash string       `json:"system_prompt_hash"`
	InputHash        string       `json:"input_hash"`
	Prompt           string       `json:"prompt,omitempty"`      // canonical prompt to execute after approval
	PolicyPath       string       `json:"policy_path,omitempty"` // resolved policy path used when plan was created
	ReviewedBy       string       `json:"reviewed_by,omitempty"`
	ReviewedAt       *time.Time   `json:"reviewed_at,omitempty"`
	ReviewReason     string       `json:"review_reason,omitempty"`
	Annotations      []Annotation `json:"annotations,omitempty"`
	ProposedSteps    []string     `json:"proposed_steps,omitempty"` // LLM-proposed task steps for Art. 11/13 transparency
	CreatedAt        time.Time    `json:"created_at"`
	TimeoutAt        time.Time    `json:"timeout_at"`
}

ExecutionPlan captures agent intent before LLM invocation. Stored as first-class evidence for EU AI Act Art. 11 (Technical Documentation) and Art. 13 (Transparency) compliance.

func GenerateExecutionPlan

func GenerateExecutionPlan(
	correlationID, tenantID, agentID, selectedModel string,
	dataTier int,
	toolsAvailable []string,
	costEstimate float64,
	policyDecision string,
	systemPrompt string,
	inputPrompt string,
	timeoutMinutes int,
) *ExecutionPlan

GenerateExecutionPlan creates a plan capturing agent intent before execution.

type FailureReason

type FailureReason string

FailureReason classifies why a run ended in a non-success state.

const (
	FailureNone                 FailureReason = ""
	FailureCostExceeded         FailureReason = "cost_exceeded"
	FailureToolTimeout          FailureReason = "tool_timeout"
	FailureToolFailure          FailureReason = "tool_failure"
	FailureLLMError             FailureReason = "llm_error"
	FailureLLMTimeout           FailureReason = "llm_timeout"
	FailureContainmentDeny      FailureReason = "containment_deny"
	FailureCircuitBreaker       FailureReason = "circuit_breaker"
	FailureOperatorKill         FailureReason = "operator_kill"
	FailureContextTimeout       FailureReason = "context_timeout"
	FailurePolicyDeny           FailureReason = "policy_deny"
	FailurePIIBlock             FailureReason = "pii_block"
	FailureToolEscalation       FailureReason = "tool_escalation"
	FailureHookDeny             FailureReason = "hook_deny"
	FailureInternalError        FailureReason = "internal_error"
	FailureMaxStepsExceeded     FailureReason = "max_steps_exceeded"
	FailureMaxToolCallsExceeded FailureReason = "max_tool_calls_exceeded"
)

type Hook

type Hook interface {
	Point() HookPoint
	Execute(ctx context.Context, data *HookData) (*HookResult, error)
}

Hook is the interface for all pipeline hooks.

type HookConfig

type HookConfig struct {
	Type string `yaml:"type"` // "webhook"
	URL  string `yaml:"url"`
	On   string `yaml:"on"` // "fired" | "allowed" | "denied" | "all"
}

HookConfig from .talon.yaml.

type HookData

type HookData struct {
	TenantID      string          `json:"tenant_id"`
	AgentID       string          `json:"agent_id"`
	CorrelationID string          `json:"correlation_id"`
	Stage         HookPoint       `json:"stage"`
	Payload       json.RawMessage `json:"payload"`
}

HookData provides context to hook implementations.

type HookPoint

type HookPoint string

HookPoint identifies where in the pipeline a hook fires.

const (
	HookPrePolicy      HookPoint = "pre_policy"
	HookPostPolicy     HookPoint = "post_policy"
	HookPrePlanReview  HookPoint = "pre_plan_review"
	HookPostPlanReview HookPoint = "post_plan_review"
	HookPreLLM         HookPoint = "pre_llm"
	HookPostLLM        HookPoint = "post_llm"
	HookPreTool        HookPoint = "pre_tool"
	HookPostTool       HookPoint = "post_tool"
	HookPreMemory      HookPoint = "pre_memory_write"
	HookPostEvidence   HookPoint = "post_evidence"
)

type HookRegistry

type HookRegistry struct {
	// contains filtered or unexported fields
}

HookRegistry manages registered hooks for each pipeline stage.

func LoadHooksFromConfig

func LoadHooksFromConfig(hooksConfig map[string][]HookConfig) *HookRegistry

LoadHooksFromConfig creates hooks from .talon.yaml hook configuration.

func NewHookRegistry

func NewHookRegistry() *HookRegistry

NewHookRegistry creates an empty hook registry.

func (*HookRegistry) Execute

func (r *HookRegistry) Execute(ctx context.Context, point HookPoint, data *HookData) (*HookResult, error)

Execute runs all hooks for a given pipeline point. Hook failures do not abort the pipeline by default.

func (*HookRegistry) Register

func (r *HookRegistry) Register(hook Hook)

Register adds a hook at the specified pipeline point.

type HookResult

type HookResult struct {
	Continue bool            `json:"continue"`
	Modified json.RawMessage `json:"modified,omitempty"`
}

HookResult controls pipeline flow after hook execution.
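The sketch below shows how a Hook implementation and a fail-open execution loop might fit together. The types are trimmed local copies of the ones documented above so that the example compiles on its own; funcHook, alwaysFail, deny, and runHooks are hypothetical helpers, and the rule that Continue == false stops the chain is an assumption about how HookResult is interpreted.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

type HookPoint string

type HookData struct {
	Stage   HookPoint
	Payload json.RawMessage
}

type HookResult struct {
	Continue bool
	Modified json.RawMessage
}

type Hook interface {
	Point() HookPoint
	Execute(ctx context.Context, data *HookData) (*HookResult, error)
}

// funcHook adapts a plain function to the Hook interface.
type funcHook struct {
	point HookPoint
	fn    func(context.Context, *HookData) (*HookResult, error)
}

func (h funcHook) Point() HookPoint { return h.point }

func (h funcHook) Execute(ctx context.Context, d *HookData) (*HookResult, error) {
	return h.fn(ctx, d)
}

// alwaysFail simulates a hook whose backend is unreachable.
func alwaysFail(p HookPoint) Hook {
	return funcHook{point: p, fn: func(context.Context, *HookData) (*HookResult, error) {
		return nil, errors.New("webhook unreachable")
	}}
}

// deny simulates a hook that asks the pipeline to stop.
func deny(p HookPoint) Hook {
	return funcHook{point: p, fn: func(context.Context, *HookData) (*HookResult, error) {
		return &HookResult{Continue: false}, nil
	}}
}

// runHooks runs the hooks registered for point in order. A failing hook is
// skipped (fail-open); a result with Continue == false ends the chain.
func runHooks(ctx context.Context, hooks []Hook, point HookPoint, data *HookData) *HookResult {
	for _, h := range hooks {
		if h.Point() != point {
			continue
		}
		res, err := h.Execute(ctx, data)
		if err != nil || res == nil {
			continue
		}
		if !res.Continue {
			return res
		}
	}
	return &HookResult{Continue: true}
}

func main() {
	ctx := context.Background()
	data := &HookData{Stage: "pre_llm", Payload: json.RawMessage(`{}`)}
	fmt.Println(runHooks(ctx, []Hook{alwaysFail("pre_llm")}, "pre_llm", data).Continue)                  // true
	fmt.Println(runHooks(ctx, []Hook{alwaysFail("pre_llm"), deny("pre_llm")}, "pre_llm", data).Continue) // false
}
```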

type IdempotencyKey

type IdempotencyKey struct {
	AgentID       string
	CorrelationID string
	ToolName      string
	ArgumentHash  string // sha256 of canonical JSON args
}

IdempotencyKey identifies a unique tool call by its deterministic inputs.

func DeriveIdempotencyKey

func DeriveIdempotencyKey(agentID, correlationID, toolName string, args json.RawMessage) IdempotencyKey

DeriveIdempotencyKey builds a deterministic key from tool call inputs. The argument hash is SHA-256 of the canonical (sorted-key) JSON representation.

func (IdempotencyKey) CompositeKey

func (k IdempotencyKey) CompositeKey() string

CompositeKey returns the dedup lookup key: "agent_id:correlation_id:tool_name:argument_hash".
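One way to obtain a sorted-key canonical form with only the standard library is to decode the arguments and re-encode them, since encoding/json writes map keys in sorted order. The sketch below does that and then joins the parts in the documented order. The helper names canonicalArgHash and compositeKey are hypothetical, and whether the package canonicalizes in exactly this way is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// canonicalArgHash decodes args and re-encodes them; json.Marshal emits map
// keys sorted, so key order and whitespace in the input no longer matter.
func canonicalArgHash(args json.RawMessage) (string, error) {
	var v any
	if err := json.Unmarshal(args, &v); err != nil {
		return "", err
	}
	canon, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(canon)
	return hex.EncodeToString(sum[:]), nil
}

// compositeKey joins the parts as "agent_id:correlation_id:tool_name:argument_hash".
func compositeKey(agentID, correlationID, toolName, argHash string) string {
	return agentID + ":" + correlationID + ":" + toolName + ":" + argHash
}

func main() {
	a, _ := canonicalArgHash(json.RawMessage(`{"to":"ops","subject":"hi"}`))
	b, _ := canonicalArgHash(json.RawMessage(`{ "subject": "hi", "to": "ops" }`))
	fmt.Println(a == b) // true
	fmt.Println(compositeKey("agent-1", "corr-1", "send_email", a))
}
```

A caveat of this approach: decoding into any turns every JSON number into a float64, so very large integers can lose precision before hashing. A production canonicalizer would probably decode with json.Decoder.UseNumber or an equivalent.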

type IdempotencyResult

type IdempotencyResult struct {
	Found  bool
	Status string // "pending" or "completed"
	Result []byte // cached result (only when Status == "completed")
}

IdempotencyResult is the outcome of checking the idempotency store.

type IdempotencyStore

type IdempotencyStore struct {
	// contains filtered or unexported fields
}

IdempotencyStore checks whether a tool call has already completed successfully in this or a prior run, using a deterministic key derived from (agent_id, correlation_id, tool_name, argument_hash). Backed by SQLite via the same *sql.DB as the evidence store.

func NewIdempotencyStore

func NewIdempotencyStore(db *sql.DB) (*IdempotencyStore, error)

NewIdempotencyStore creates the idempotency store and ensures the schema exists.

func (*IdempotencyStore) Check

Check looks up a tool call by its idempotency key. If maxAge > 0 and the stored row is completed, the row is treated as not found when completed_at is older than maxAge.

func (*IdempotencyStore) ClaimPending

func (s *IdempotencyStore) ClaimPending(ctx context.Context, key IdempotencyKey, maxAge time.Duration) (claimed bool, err error)

ClaimPending atomically claims the slot for this key so only one caller may execute the tool. When maxAge > 0, it first tries to transition an expired completed row (completed_at older than maxAge) to pending; otherwise it inserts a new pending row. Returns true if this caller claimed the slot, false if another request already has it (pending or completed within TTL). Used to prevent duplicate execution after TTL expiry.

func (*IdempotencyStore) RecordCompleted

func (s *IdempotencyStore) RecordCompleted(ctx context.Context, key IdempotencyKey, result []byte) error

RecordCompleted updates a pending entry to "completed" with the tool result.

func (*IdempotencyStore) RecordPending

func (s *IdempotencyStore) RecordPending(ctx context.Context, key IdempotencyKey) error

RecordPending inserts a "pending" entry before tool execution begins. It does not transition an existing row (e.g. TTL-expired completed). Use ClaimPending when a TTL may have expired.
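The claim semantics described for ClaimPending can be illustrated with an in-memory stand-in. The real store is backed by SQLite; memStore, its map, and the explicit now parameter below are assumptions that keep the sketch self-contained and deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	status      string // "pending" or "completed"
	completedAt time.Time
}

// memStore is a hypothetical in-memory stand-in for the SQLite-backed store.
type memStore struct {
	mu   sync.Mutex
	rows map[string]entry
}

// at returns a fixed base time plus the given number of minutes.
func at(minutes int) time.Time {
	return time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(minutes) * time.Minute)
}

// claimPending returns true only for the one caller allowed to run the tool.
func (s *memStore) claimPending(key string, maxAge time.Duration, now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, exists := s.rows[key]
	switch {
	case !exists:
		// First caller for this key: fall through and insert a pending row.
	case e.status == "completed" && maxAge > 0 && now.Sub(e.completedAt) > maxAge:
		// Completed but older than the TTL: reclaim the slot.
	default:
		return false // pending, or completed within the TTL
	}
	s.rows[key] = entry{status: "pending"}
	return true
}

func (s *memStore) recordCompleted(key string, now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[key] = entry{status: "completed", completedAt: now}
}

func main() {
	s := &memStore{rows: map[string]entry{}}
	ttl := time.Hour
	fmt.Println(s.claimPending("k", ttl, at(0))) // true
	fmt.Println(s.claimPending("k", ttl, at(0))) // false
	s.recordCompleted("k", at(0))
	fmt.Println(s.claimPending("k", ttl, at(1)))   // false
	fmt.Println(s.claimPending("k", ttl, at(120))) // true
}
```

Doing the check and the write under one lock is what makes the claim atomic here; in a SQL-backed store the same effect would usually come from a single conditional statement or a transaction.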

type IntentClassDefinition

type IntentClassDefinition struct {
	Class       string   `json:"class"`
	DefaultRisk string   `json:"default_risk"`
	Description string   `json:"description"`
	Examples    []string `json:"examples"`
}

IntentClassDefinition documents a supported operation class and example tool names.

func IntentClassCatalog

func IntentClassCatalog() []IntentClassDefinition

IntentClassCatalog returns the supported intent classes for CLI/help output.

type IntentClassification

type IntentClassification struct {
	ToolName       string `json:"tool_name"`
	OperationClass string `json:"operation_class"`
	RiskLevel      string `json:"risk_level"`
	IsBulk         bool   `json:"is_bulk"`
	RequiresReview bool   `json:"requires_review"`
	Reason         string `json:"reason"`
}

IntentClassification describes the inferred risk posture for a tool invocation.

func ClassifyToolIntent

func ClassifyToolIntent(toolName string, paramsJSON []byte, planReviewCfg *PlanReviewConfig) *IntentClassification

ClassifyToolIntent classifies a tool call into operation class and review posture.

type OverrideStore

type OverrideStore struct {
	// contains filtered or unexported fields
}

OverrideStore provides in-memory runtime overrides per tenant. Thread-safe. Checked at Run() entry for lockdown and in buildLLMTools for tool disabling.

func NewOverrideStore

func NewOverrideStore() *OverrideStore

NewOverrideStore creates an empty override store.

func (*OverrideStore) ClearOverride

func (os *OverrideStore) ClearOverride(tenantID string)

ClearOverride removes all overrides for a tenant.

func (*OverrideStore) DisableTools

func (os *OverrideStore) DisableTools(tenantID string, tools []string, reason string)

DisableTools adds tools to the disabled list for a tenant.

func (*OverrideStore) DisabledToolsFor

func (os *OverrideStore) DisabledToolsFor(tenantID string) []string

DisabledToolsFor returns the list of disabled tools for a tenant.

func (*OverrideStore) EnableTools

func (os *OverrideStore) EnableTools(tenantID string, tools []string)

EnableTools removes tools from the disabled list for a tenant.

func (*OverrideStore) Get

func (os *OverrideStore) Get(tenantID string) *TenantOverride

Get returns the current override for a tenant, or nil if none.

func (*OverrideStore) IsLocked

func (os *OverrideStore) IsLocked(tenantID string) bool

IsLocked returns true if the tenant is in lockdown mode.

func (*OverrideStore) ListAll

func (os *OverrideStore) ListAll() map[string]TenantOverride

ListAll returns all tenant overrides.

func (*OverrideStore) SetLockdown

func (os *OverrideStore) SetLockdown(tenantID string, locked bool, by string)

SetLockdown enables or disables tenant lockdown.

func (*OverrideStore) SetPolicyOverride

func (os *OverrideStore) SetPolicyOverride(tenantID string, maxCostPerRun *float64, maxToolCalls *int)

SetPolicyOverride sets a tighter cost or tool call cap for a tenant. Only non-nil fields are updated; omit a field to leave it unchanged.
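The "only non-nil fields are updated" contract is the usual pointer-as-optional pattern in Go. A minimal sketch follows, in which the override type, applyOverride, and the generic ptr helper are hypothetical names introduced for illustration.

```go
package main

import "fmt"

type override struct {
	MaxCostPerRun *float64
	MaxToolCalls  *int
}

// applyOverride copies only the non-nil arguments; a nil argument leaves the
// corresponding field as it was.
func applyOverride(o *override, maxCost *float64, maxToolCalls *int) {
	if maxCost != nil {
		v := *maxCost // copy so later changes by the caller do not leak in
		o.MaxCostPerRun = &v
	}
	if maxToolCalls != nil {
		v := *maxToolCalls
		o.MaxToolCalls = &v
	}
}

// ptr returns a pointer to v, convenient for literals.
func ptr[T any](v T) *T { return &v }

func main() {
	var o override
	applyOverride(&o, ptr(0.50), nil) // cap cost only
	applyOverride(&o, nil, ptr(10))   // later: cap tool calls, cost cap unchanged
	fmt.Println(*o.MaxCostPerRun, *o.MaxToolCalls) // 0.5 10
}
```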

type PlanReviewConfig

type PlanReviewConfig struct {
	RequireForTools bool            `yaml:"require_for_tools"`
	RequireForTier  string          `yaml:"require_for_tier"`
	CostThreshold   float64         `yaml:"cost_threshold"`
	TimeoutMinutes  int             `yaml:"timeout_minutes"`
	NotifyWebhook   string          `yaml:"notify_webhook"`
	VolumeThreshold int             `yaml:"volume_threshold,omitempty"`
	Mode            string          `yaml:"mode,omitempty"` // sequential | any
	ApprovalChain   []ApprovalLevel `yaml:"approval_chain,omitempty"`
}

PlanReviewConfig from .talon.yaml.

type PlanReviewStore

type PlanReviewStore struct {
	// contains filtered or unexported fields
}

PlanReviewStore persists execution plans for human review.

func NewPlanReviewStore

func NewPlanReviewStore(db *sql.DB) (*PlanReviewStore, error)

NewPlanReviewStore creates the plan review store with SQLite backend.

func (*PlanReviewStore) Approve

func (s *PlanReviewStore) Approve(ctx context.Context, planID, tenantID, reviewedBy string) error

Approve marks a plan as approved by a reviewer, scoped to tenantID.

func (*PlanReviewStore) Get

func (s *PlanReviewStore) Get(ctx context.Context, planID, tenantID string) (*ExecutionPlan, error)

Get returns a single plan by ID, scoped to tenantID. Returns ErrPlanNotFound if the plan does not exist or belongs to another tenant.

func (*PlanReviewStore) GetApprovedUndispatched

func (s *PlanReviewStore) GetApprovedUndispatched(ctx context.Context, tenantID string) ([]*ExecutionPlan, error)

GetApprovedUndispatched returns approved plans that have not been auto-dispatched yet. If tenantID is empty, all tenants are included.

func (*PlanReviewStore) GetPending

func (s *PlanReviewStore) GetPending(ctx context.Context, tenantID string) ([]*ExecutionPlan, error)

GetPending returns all plans awaiting review, optionally filtered by tenant. Uses a bound time parameter (time.Now()) so the comparison matches go-sqlite3's serialization of timeout_at; SQLite's datetime('now') would produce a different format and break the comparison outside UTC.

func (*PlanReviewStore) ListReviewed

func (s *PlanReviewStore) ListReviewed(ctx context.Context, tenantID string, limit int) ([]ReviewHistoryEntry, error)

ListReviewed returns recently reviewed plans (approved/rejected/modified) for the dashboard review history.

func (*PlanReviewStore) MarkDispatched

func (s *PlanReviewStore) MarkDispatched(ctx context.Context, planID, tenantID, dispatchErr string) error

MarkDispatched marks an approved plan as dispatched so it won't be executed again. dispatchErr is persisted for diagnostics (empty string means success).

func (*PlanReviewStore) Modify

func (s *PlanReviewStore) Modify(ctx context.Context, planID, tenantID, reviewedBy string, annotations []Annotation) error

Modify marks a plan as approved-with-modifications and stores reviewer annotations, scoped to tenantID.

func (*PlanReviewStore) Reject

func (s *PlanReviewStore) Reject(ctx context.Context, planID, tenantID, reviewedBy, reason string) error

Reject marks a plan as rejected with a reason, scoped to tenantID.

func (*PlanReviewStore) Save

func (s *PlanReviewStore) Save(ctx context.Context, plan *ExecutionPlan) error

Save persists a new execution plan.

func (*PlanReviewStore) Stats

func (s *PlanReviewStore) Stats(ctx context.Context, tenantID string) (PlanStats, error)

Stats returns aggregate plan lifecycle counters, optionally scoped by tenant.

func (*PlanReviewStore) UpdateDispatchResult

func (s *PlanReviewStore) UpdateDispatchResult(ctx context.Context, planID, tenantID, dispatchErr string) error

UpdateDispatchResult updates the dispatch_error for an already-claimed plan. Used after MarkDispatched("dispatching") to record the final dispatch outcome.

type PlanStats

type PlanStats struct {
	Pending          int `json:"pending"`
	Approved         int `json:"approved"`
	Rejected         int `json:"rejected"`
	Modified         int `json:"modified"`
	Dispatched       int `json:"dispatched"`
	DispatchFailures int `json:"dispatch_failures"`
}

PlanStats aggregates plan lifecycle counters for dashboards/CLI summaries.

type PlanStatus

type PlanStatus string

PlanStatus represents the review state of an execution plan.

const (
	PlanPending      PlanStatus = "pending"
	PlanApproved     PlanStatus = "approved"
	PlanRejected     PlanStatus = "rejected"
	PlanModified     PlanStatus = "modified"
	PlanAutoApproved PlanStatus = "auto_approved"
	PlanTimedOut     PlanStatus = "timed_out"
)

type ReviewHistoryEntry

type ReviewHistoryEntry struct {
	PlanID     string     `json:"plan_id"`
	TenantID   string     `json:"tenant_id"`
	AgentID    string     `json:"agent_id"`
	Status     PlanStatus `json:"status"`
	ReviewedBy string     `json:"reviewed_by"`
	ReviewedAt *time.Time `json:"reviewed_at"`
}

ReviewHistoryEntry is a minimal record for the dashboard review history.

type RunRegistry

type RunRegistry struct {
	// contains filtered or unexported fields
}

RunRegistry tracks all in-flight runs with observable state. Thread-safe. Replaces ActiveRunTracker for state management while delegating count/cancel responsibilities.

func NewRunRegistry

func NewRunRegistry() *RunRegistry

NewRunRegistry creates an empty registry.

func (*RunRegistry) ActiveRunCount

func (rr *RunRegistry) ActiveRunCount() int

ActiveRunCount returns total active (non-terminal) runs across all tenants.

func (*RunRegistry) Count

func (rr *RunRegistry) Count(tenantID string) int

Count returns the number of active (non-terminal) runs for a tenant.

func (*RunRegistry) Deregister

func (rr *RunRegistry) Deregister(correlationID string)

Deregister removes a run from the registry. Called after terminal evidence is recorded and the run is fully cleaned up.

func (*RunRegistry) Get

func (rr *RunRegistry) Get(correlationID string) *RunState

Get returns a snapshot of a run's state. Returns nil if not found.

func (*RunRegistry) IsPaused

func (rr *RunRegistry) IsPaused(correlationID string) bool

IsPaused returns true if the run is currently in PAUSED state.

func (*RunRegistry) IsPausedWithCh

func (rr *RunRegistry) IsPausedWithCh(correlationID string) (paused bool, ch <-chan struct{})

IsPausedWithCh atomically checks if a run is paused and returns the resume channel under a single lock acquisition. This avoids a race where Resume() closes the old channel and replaces it between separate IsPaused() and channel-read calls — the new channel would never be closed, hanging the agent loop until context cancellation.
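The reason for reading the flag and the channel together can be seen in a small sketch. The pauseGate type below is hypothetical and simplified to a single run: it creates the channel on pause and closes it on resume, which may differ from how the registry manages its channels internally.

```go
package main

import (
	"fmt"
	"sync"
)

type pauseGate struct {
	mu     sync.Mutex
	paused bool
	ch     chan struct{} // closed by resume to release waiters
}

func (g *pauseGate) pause() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if !g.paused {
		g.paused, g.ch = true, make(chan struct{})
	}
}

func (g *pauseGate) resume() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.paused {
		g.paused = false
		close(g.ch)
	}
}

// pausedWithCh reads the flag and the channel under one lock, so the caller
// always waits on the channel that belongs to the pause it observed.
func (g *pauseGate) pausedWithCh() (bool, <-chan struct{}) {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.paused, g.ch
}

func main() {
	g := &pauseGate{}
	g.pause()
	paused, ch := g.pausedWithCh()
	done := make(chan struct{})
	go func() {
		if paused {
			<-ch // the agent loop would block here until resume
		}
		close(done)
	}()
	g.resume()
	<-done
	fmt.Println("resumed")
}
```

In a real agent loop the wait would normally sit in a select that also watches ctx.Done(), so that a kill unblocks a paused run.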

func (*RunRegistry) Kill

func (rr *RunRegistry) Kill(correlationID string) bool

Kill cancels a run and transitions it to TERMINATED. Returns true if found.

func (*RunRegistry) KillAllForTenant

func (rr *RunRegistry) KillAllForTenant(tenantID string) int

KillAllForTenant cancels all non-terminal runs for a tenant. Returns count.

func (*RunRegistry) List

func (rr *RunRegistry) List() []RunState

List returns snapshots of all active (non-terminal) runs.

func (*RunRegistry) ListAll

func (rr *RunRegistry) ListAll() []RunState

ListAll returns snapshots of all runs including terminal ones still in the registry.

func (*RunRegistry) Pause

func (rr *RunRegistry) Pause(correlationID string) bool

Pause sends a pause signal to a run that is currently running. The agentic loop checks IsPaused between iterations and blocks on WaitResume until resumed or killed.

func (*RunRegistry) Register

func (rr *RunRegistry) Register(correlationID, tenantID, agentID, sessionID string, cancel context.CancelFunc)

Register adds a new run in QUEUED state.

func (*RunRegistry) Resume

func (rr *RunRegistry) Resume(correlationID string) bool

Resume unpauses a paused run. Closes the pauseCh so WaitResume unblocks.

func (*RunRegistry) SetStatus

func (rr *RunRegistry) SetStatus(correlationID string, status RunStatus, reason FailureReason)

SetStatus transitions a run to a new status. No-op if the run is not found or already in a terminal state.

func (*RunRegistry) UpdateMetrics

func (rr *RunRegistry) UpdateMetrics(correlationID string, steps, toolCalls int, cost float64)

UpdateMetrics atomically updates step/cost/tool counters for a run.

type RunRequest

type RunRequest struct {
	TenantID         string
	AgentName        string
	Prompt           string
	AgentReasoning   string // Optional caller-provided reasoning (e.g. X-Talon-Reasoning)
	AgentVerified    bool   // True when per-agent signature has been verified by API layer
	SessionID        string // Optional session to join; empty means create a new one
	Attachments      []Attachment
	InvocationType   string // "manual", "scheduled", "webhook:name"
	DryRun           bool
	PolicyPath       string           // explicit path to .talon.yaml
	ToolInvocations  []ToolInvocation // optional; when set, each is policy-checked and executed, and names recorded in evidence
	SkipMemory       bool             // if true, do not write memory observation for this run (e.g. --no-memory)
	SovereigntyMode  string           // optional: eu_strict | eu_preferred | global; when set, router uses OPA routing and records RouteDecision for evidence
	BypassPlanReview bool             // internal: when true, skip plan-review gate (used by approved-plan dispatcher)
}

RunRequest is the input for a single agent invocation.

type RunResponse

type RunResponse struct {
	Response     string
	EvidenceID   string
	SessionID    string
	Cost         float64
	DurationMS   int64
	PolicyAllow  bool
	DenyReason   string
	PlanPending  string   // set when execution is gated for human review (EU AI Act Art. 14)
	ModelUsed    string   // LLM model used for generation
	ToolsCalled  []string // MCP tools invoked
	InputTokens  int      // prompt tokens (for OpenAI-compatible API responses)
	OutputTokens int      // completion tokens (for OpenAI-compatible API responses)
	// Dry-run / CLI feedback: PII and injection scan results (set even when DryRun is true).
	PIIDetected                  []string // entity names detected in input
	InputTier                    int      // classification tier of input (0–2)
	AttachmentInjectionsDetected int      // number of injection patterns found in attachments
	AttachmentBlocked            bool     // true if any attachment was blocked due to injection
}

RunResponse is the output of an agent invocation.

type RunState

type RunState struct {
	CorrelationID string        `json:"correlation_id"`
	TenantID      string        `json:"tenant_id"`
	AgentID       string        `json:"agent_id"`
	SessionID     string        `json:"session_id,omitempty"`
	Status        RunStatus     `json:"status"`
	FailureReason FailureReason `json:"failure_reason,omitempty"`
	StartedAt     time.Time     `json:"started_at"`
	UpdatedAt     time.Time     `json:"updated_at"`
	StepCount     int           `json:"step_count"`
	CostAccrued   float64       `json:"cost_accrued"`
	ToolCalls     int           `json:"tool_calls"`
	DurationMS    int64         `json:"duration_ms"`
	// contains filtered or unexported fields
}

RunState captures the observable state of a single agent run.

type RunStatus

type RunStatus string

RunStatus is the lifecycle state of an agent run.

const (
	RunStatusQueued           RunStatus = "queued"
	RunStatusRunning          RunStatus = "running"
	RunStatusPaused           RunStatus = "paused"
	RunStatusAwaitingApproval RunStatus = "awaiting_approval"
	RunStatusCompleted        RunStatus = "completed"
	RunStatusFailed           RunStatus = "failed"
	RunStatusTerminated       RunStatus = "terminated"
	RunStatusBlocked          RunStatus = "blocked"
	RunStatusDenied           RunStatus = "denied"
)

func (RunStatus) IsTerminal

func (s RunStatus) IsTerminal() bool

IsTerminal returns true for states that represent a finished run.
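As a rough illustration, a terminal check over these statuses could be a plain switch. Which statuses count as terminal is an assumption here, inferred from the end states listed in the package overview (COMPLETED, FAILED, TERMINATED, BLOCKED, DENIED); the constants are local copies so the sketch compiles on its own.

```go
package main

import "fmt"

type RunStatus string

const (
	RunStatusQueued           RunStatus = "queued"
	RunStatusRunning          RunStatus = "running"
	RunStatusPaused           RunStatus = "paused"
	RunStatusAwaitingApproval RunStatus = "awaiting_approval"
	RunStatusCompleted        RunStatus = "completed"
	RunStatusFailed           RunStatus = "failed"
	RunStatusTerminated       RunStatus = "terminated"
	RunStatusBlocked          RunStatus = "blocked"
	RunStatusDenied           RunStatus = "denied"
)

// isTerminal treats the five end states from the overview as terminal
// (an assumption; the package's own IsTerminal may differ).
func isTerminal(s RunStatus) bool {
	switch s {
	case RunStatusCompleted, RunStatusFailed, RunStatusTerminated,
		RunStatusBlocked, RunStatusDenied:
		return true
	}
	return false
}

func main() {
	fmt.Println(isTerminal(RunStatusRunning))   // false
	fmt.Println(isTerminal(RunStatusPaused))    // false
	fmt.Println(isTerminal(RunStatusCompleted)) // true
}
```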

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes the full agent orchestration pipeline.

func NewRunner

func NewRunner(cfg RunnerConfig) *Runner

NewRunner creates an agent runner with the given dependencies.

func (*Runner) OverrideStoreRef

func (r *Runner) OverrideStoreRef() *OverrideStore

OverrideStoreRef returns the runner's OverrideStore (may be nil).

func (*Runner) Run

func (r *Runner) Run(ctx context.Context, req *RunRequest) (*RunResponse, error)

Run executes the complete agent pipeline:

  1. Load policy
  2. Classify input (PII detection)
  3. Process attachments (extract + scan + sandbox)
  4. Evaluate policy (OPA)
  5. Check secrets access
  6. Route LLM (tier-based)
  7. Call LLM provider
  8. Classify output
  9. Generate evidence
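The fixed-sequence shape of the steps above, together with the overview's statement that evidence is produced even on failures or denials, can be sketched with a deferred recorder. Everything in this example (stage, runPipeline, errDenied, the record callback) is hypothetical and only illustrates the control flow, not the Runner's internals.

```go
package main

import (
	"errors"
	"fmt"
)

var errDenied = errors.New("denied by policy")

type stage struct {
	name string
	run  func() error
}

// runPipeline runs stages in order and stops at the first error. The deferred
// call records evidence on every exit path, success or failure.
func runPipeline(stages []stage, record func(completed []string, err error)) (err error) {
	var completed []string
	defer func() { record(completed, err) }()
	for _, s := range stages {
		if err = s.run(); err != nil {
			return fmt.Errorf("%s: %w", s.name, err)
		}
		completed = append(completed, s.name)
	}
	return nil
}

func main() {
	ok := func() error { return nil }
	stages := []stage{
		{"load policy", ok},
		{"classify input", ok},
		{"evaluate policy", func() error { return errDenied }},
		{"call provider", ok}, // never reached
	}
	err := runPipeline(stages, func(done []string, err error) {
		fmt.Printf("evidence: completed=%d err=%v\n", len(done), err)
	})
	fmt.Println(err)
}
```

Using a named result lets the deferred recorder see the final, wrapped error rather than the raw stage error.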

func (*Runner) RunFromTrigger

func (r *Runner) RunFromTrigger(ctx context.Context, agentName, prompt, invocationType string) error

RunFromTrigger implements the trigger.AgentRunner interface for cron/webhook execution. It uses the runner's default policy path so cron/webhook runs load the same .talon.yaml as the process (e.g. agent.talon.yaml), instead of deriving agentName+".talon.yaml".

func (*Runner) RunRegistryRef

func (r *Runner) RunRegistryRef() *RunRegistry

RunRegistryRef returns the runner's RunRegistry (may be nil).

func (*Runner) ToolApprovalStoreRef

func (r *Runner) ToolApprovalStoreRef() *ToolApprovalStore

ToolApprovalStoreRef returns the runner's ToolApprovalStore (may be nil).

type RunnerCacheConfig

type RunnerCacheConfig struct {
	Enabled             bool
	DefaultTTL          int
	SimilarityThreshold float64
	MaxEntriesPerTenant int
}

RunnerCacheConfig is a subset of config.CacheConfig for the runner (avoids circular import).

type RunnerConfig

type RunnerConfig struct {
	PolicyDir         string // base directory for policy path resolution
	DefaultPolicyPath string // path to default .talon.yaml (e.g. agent.talon.yaml); used by RunFromTrigger when request has no PolicyPath
	Classifier        *classifier.Scanner
	AttScanner        *attachment.Scanner
	Extractor         *attachment.Extractor
	Router            *llm.Router
	Secrets           *secrets.SecretStore
	Evidence          *evidence.Store
	PlanReview        *PlanReviewStore
	ToolRegistry      *tools.ToolRegistry
	ActiveRunTracker  *ActiveRunTracker     // optional; when set, rate-limit policy receives concurrent_executions
	RunRegistry       *RunRegistry          // optional; when set, tracks run lifecycle states for observability and control
	Overrides         *OverrideStore        // optional; when set, runtime overrides (lockdown, tool disable, cost caps) are checked
	ToolApprovals     *ToolApprovalStore    // optional; when set, tools requiring approval pause for human decision
	CircuitBreaker    *CircuitBreaker       // optional; when set, suspends agents after repeated policy denials
	ToolFailures      *ToolFailureTracker   // optional; tracks tool execution failures separately from circuit breaker
	Hooks             *HookRegistry         // optional; nil = no hooks
	Memory            *memory.Store         // optional; nil = memory disabled
	Pricing           *pricing.PricingTable // optional; when set, evidence gets pre_request_estimate and post_request_cost
	// Semantic cache (all optional; when nil or CacheConfig.Enabled false, cache is skipped)
	CacheStore    *cache.Store
	CacheEmbedder *cache.BM25
	CacheScrubber *cache.PIIScrubber
	CachePolicy   *cache.Evaluator
	CacheConfig   *RunnerCacheConfig
	SessionStore  *talonsession.Store
	PromptStore   *talonprompt.Store
	Idempotency   *IdempotencyStore // optional; deduplicates tool calls by (agent, correlation, tool, args_hash)
}

RunnerConfig holds the dependencies for constructing a Runner.

type TenantOverride

type TenantOverride struct {
	Lockdown      bool      `json:"lockdown"`                 // When true, all new runs are rejected
	LockdownAt    time.Time `json:"lockdown_at,omitempty"`    // When lockdown was activated
	LockdownBy    string    `json:"lockdown_by,omitempty"`    // Admin identifier who set lockdown
	DisabledTools []string  `json:"disabled_tools,omitempty"` // Tools blocked by override
	DisableReason string    `json:"disable_reason,omitempty"` // Incident ID or explanation

	MaxCostPerRun *float64 `json:"max_cost_per_run,omitempty"` // Tighter cost cap (overrides YAML)
	MaxToolCalls  *int     `json:"max_tool_calls,omitempty"`   // Tighter tool call cap
}

TenantOverride holds runtime overrides for a single tenant. Overrides take precedence over .talon.yaml policy and are applied at Run() entry and tool dispatch.
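
A rough sketch of how such an override might be combined with policy values follows. The struct is copied from the declaration above; `effectiveCostCap`, `toolAllowed`, and `demoOverride` are hypothetical helpers, and the rule that an override can only tighten the YAML cap (never loosen it) is an assumption drawn from the "Tighter cost cap" field comment.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TenantOverride is copied from the declaration above.
type TenantOverride struct {
	Lockdown      bool      `json:"lockdown"`
	LockdownAt    time.Time `json:"lockdown_at,omitempty"`
	LockdownBy    string    `json:"lockdown_by,omitempty"`
	DisabledTools []string  `json:"disabled_tools,omitempty"`
	DisableReason string    `json:"disable_reason,omitempty"`

	MaxCostPerRun *float64 `json:"max_cost_per_run,omitempty"`
	MaxToolCalls  *int     `json:"max_tool_calls,omitempty"`
}

// effectiveCostCap returns the per-run cap after applying the override.
// Assumption: the override wins only when it is lower than the YAML cap.
func effectiveCostCap(yamlCap float64, o *TenantOverride) float64 {
	if o == nil || o.MaxCostPerRun == nil {
		return yamlCap
	}
	if *o.MaxCostPerRun < yamlCap {
		return *o.MaxCostPerRun
	}
	return yamlCap
}

// toolAllowed reports whether a tool is absent from DisabledTools.
func toolAllowed(tool string, o *TenantOverride) bool {
	if o == nil {
		return true
	}
	for _, disabled := range o.DisabledTools {
		if disabled == tool {
			return false
		}
	}
	return true
}

// demoOverride builds an illustrative override; the values are made up.
func demoOverride() *TenantOverride {
	limit := 0.50
	return &TenantOverride{
		DisabledTools: []string{"shell_exec"},
		DisableReason: "INC-1234",
		MaxCostPerRun: &limit,
	}
}

func main() {
	o := demoOverride()
	out, _ := json.Marshal(o)
	fmt.Println(string(out))
	fmt.Println(effectiveCostCap(2.00, o), toolAllowed("shell_exec", o))
}
```

The pointer fields let "no override" (nil) be distinguished from an explicit zero. Note that encoding/json's omitempty does not omit a zero time.Time, so lockdown_at appears in the output even when unset.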

type ToolApprovalRequest

type ToolApprovalRequest struct {
	ID            string             `json:"id"`
	CorrelationID string             `json:"correlation_id"`
	TenantID      string             `json:"tenant_id"`
	AgentID       string             `json:"agent_id"`
	ToolName      string             `json:"tool_name"`
	ToolCallID    string             `json:"tool_call_id"`
	Arguments     map[string]any     `json:"arguments"`
	Status        ToolApprovalStatus `json:"status"`
	CreatedAt     time.Time          `json:"created_at"`
	ResolvedAt    time.Time          `json:"resolved_at,omitempty"`
	ResolvedBy    string             `json:"resolved_by,omitempty"`
	Reason        string             `json:"reason,omitempty"`
	// contains filtered or unexported fields
}

ToolApprovalRequest represents a pending tool execution awaiting human approval.

type ToolApprovalStatus

type ToolApprovalStatus string

ToolApprovalStatus tracks the state of a tool approval request.

const (
	ToolApprovalPending   ToolApprovalStatus = "pending"
	ToolApprovalApproved  ToolApprovalStatus = "approved"
	ToolApprovalDenied    ToolApprovalStatus = "denied"
	ToolApprovalTimeout   ToolApprovalStatus = "timeout"
	ToolApprovalCancelled ToolApprovalStatus = "cancelled"
)

type ToolApprovalStore

type ToolApprovalStore struct {
	// contains filtered or unexported fields
}

ToolApprovalStore tracks pending tool approval requests. The agentic loop creates a request and blocks on the channel until approval/denial/timeout.
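
The block-until-decided behaviour described here is commonly built from a channel, a timer, and the caller's context. The following is a minimal sketch of that pattern, not the store's actual implementation: `approvalStatus`, `waitForDecision`, and the demo helpers are hypothetical, and mapping context cancellation to "cancelled" is an assumption based on the status constants listed above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// approvalStatus is a local stand-in for ToolApprovalStatus.
type approvalStatus string

const (
	approved  approvalStatus = "approved"
	timedOut  approvalStatus = "timeout"
	cancelled approvalStatus = "cancelled"
)

// waitForDecision blocks until a decision arrives on the channel, the
// timeout elapses, or ctx is cancelled, whichever happens first.
func waitForDecision(ctx context.Context, decision <-chan approvalStatus, timeout time.Duration) approvalStatus {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case s := <-decision:
		return s
	case <-timer.C:
		return timedOut
	case <-ctx.Done():
		return cancelled
	}
}

// demoApproved simulates an operator who has already approved.
func demoApproved() approvalStatus {
	ch := make(chan approvalStatus, 1)
	ch <- approved
	return waitForDecision(context.Background(), ch, time.Second)
}

// demoTimeout simulates nobody responding within the timeout.
func demoTimeout() approvalStatus {
	return waitForDecision(context.Background(), make(chan approvalStatus), 10*time.Millisecond)
}

// demoCancelled simulates the run being cancelled while waiting.
func demoCancelled() approvalStatus {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return waitForDecision(ctx, make(chan approvalStatus), time.Second)
}

func main() {
	fmt.Println(demoApproved(), demoTimeout(), demoCancelled())
}
```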

func NewToolApprovalStore

func NewToolApprovalStore(timeout time.Duration) *ToolApprovalStore

NewToolApprovalStore creates a store with the given default approval timeout.

func (*ToolApprovalStore) Approve

func (s *ToolApprovalStore) Approve(reqID, approvedBy, reason string) bool

Approve approves a pending tool execution. Returns true if the request was found and pending.

func (*ToolApprovalStore) Cleanup

func (s *ToolApprovalStore) Cleanup(olderThan time.Duration) int

Cleanup removes resolved requests older than the given duration.

func (*ToolApprovalStore) Deny

func (s *ToolApprovalStore) Deny(reqID, deniedBy, reason string) bool

Deny rejects a pending tool execution. Returns true if the request was found and pending.

func (*ToolApprovalStore) Get

Get returns a specific approval request.

func (*ToolApprovalStore) ListPending

func (s *ToolApprovalStore) ListPending() []ToolApprovalRequest

ListPending returns all pending approval requests.

func (*ToolApprovalStore) RequestApproval

func (s *ToolApprovalStore) RequestApproval(ctx context.Context, correlationID, tenantID, agentID, toolName, toolCallID string, args map[string]any) ToolApprovalStatus

RequestApproval creates a pending approval and blocks until approved, denied, or timed out. Returns the final status. The caller should check the status to decide whether to execute.
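
On the caller side, the returned status decides whether the tool runs. A sketch of that check, using local copies of the status constants above, might look like this; `dispatch` is a hypothetical helper, not a function in this package.

```go
package main

import "fmt"

// ToolApprovalStatus and its constants are copied from the declarations above.
type ToolApprovalStatus string

const (
	ToolApprovalPending   ToolApprovalStatus = "pending"
	ToolApprovalApproved  ToolApprovalStatus = "approved"
	ToolApprovalDenied    ToolApprovalStatus = "denied"
	ToolApprovalTimeout   ToolApprovalStatus = "timeout"
	ToolApprovalCancelled ToolApprovalStatus = "cancelled"
)

// dispatch runs execute only when the request was approved. Every other
// status, including an unexpected one, is treated as "do not execute".
func dispatch(status ToolApprovalStatus, execute func() (string, error)) (string, error) {
	switch status {
	case ToolApprovalApproved:
		return execute()
	case ToolApprovalDenied, ToolApprovalTimeout, ToolApprovalCancelled:
		return "", fmt.Errorf("tool not executed: approval %s", status)
	default:
		return "", fmt.Errorf("tool not executed: unexpected approval status %q", status)
	}
}

func main() {
	run := func() (string, error) { return "tool output", nil }
	for _, s := range []ToolApprovalStatus{ToolApprovalApproved, ToolApprovalDenied, ToolApprovalTimeout} {
		out, err := dispatch(s, run)
		fmt.Printf("%s: out=%q err=%v\n", s, out, err)
	}
}
```

Failing closed in the default branch means a status added later cannot accidentally allow execution.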

type ToolCallResult

type ToolCallResult struct {
	Content        string
	Executed       bool
	ToolName       string
	PIIFindings    []ToolPIIFinding
	ExecutionError string // non-empty when tool.Execute() returned an error (distinct from policy denial)
}

ToolCallResult extends the basic return values with PII findings for evidence.

type ToolFailureTracker

type ToolFailureTracker struct {
	// contains filtered or unexported fields
}

ToolFailureTracker counts tool execution failures per agent, separate from the circuit breaker (which only tracks policy denials). When a tool execution fails (allowed by policy but the tool returned an error), the failure is recorded here for operator alerting without suspending the agent.

func NewToolFailureTracker

func NewToolFailureTracker(threshold int, window time.Duration) *ToolFailureTracker

NewToolFailureTracker creates a tracker. When an agent exceeds threshold failures within window, a warning is logged for operator alerting. threshold <= 0 defaults to 10; window <= 0 defaults to 5 minutes.

func (*ToolFailureTracker) FailureCount

func (t *ToolFailureTracker) FailureCount(tenantID, agentID string) int

FailureCount returns the current failure count within the window for an agent.

func (*ToolFailureTracker) RecordToolFailure

func (t *ToolFailureTracker) RecordToolFailure(tenantID, agentID, toolName, errMsg string) bool

RecordToolFailure records a tool execution failure for the agent. If the threshold is exceeded within the window, an operator alert is logged. Returns true if the alert threshold was just crossed.

type ToolInvocation

type ToolInvocation struct {
	Name   string          `json:"name"`
	Params json.RawMessage `json:"params"`
}

ToolInvocation represents a single tool call (e.g. from MCP or a future agent loop).
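
Because Params is a json.RawMessage, the envelope can be decoded first and the parameters decoded later, once the tool name is known. The sketch below shows that two-step decode; the `searchParams` schema, the "search" tool name, and `decodeSearch` are hypothetical and serve only as an illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ToolInvocation is copied from the declaration above.
type ToolInvocation struct {
	Name   string          `json:"name"`
	Params json.RawMessage `json:"params"`
}

// searchParams is a hypothetical parameter schema for a tool named "search".
type searchParams struct {
	Query string `json:"query"`
	Limit int    `json:"limit"`
}

// decodeSearch decodes the envelope, then decodes Params into the
// tool-specific struct. Params stays as raw bytes until the second step.
func decodeSearch(raw []byte) (string, searchParams, error) {
	var inv ToolInvocation
	if err := json.Unmarshal(raw, &inv); err != nil {
		return "", searchParams{}, err
	}
	var p searchParams
	if err := json.Unmarshal(inv.Params, &p); err != nil {
		return inv.Name, searchParams{}, fmt.Errorf("params for %q: %w", inv.Name, err)
	}
	return inv.Name, p, nil
}

func main() {
	name, p, err := decodeSearch([]byte(`{"name":"search","params":{"query":"talon","limit":3}}`))
	fmt.Println(name, p.Query, p.Limit, err)
}
```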

type ToolPIIFinding

type ToolPIIFinding struct {
	Field     string   `json:"field"`
	Action    string   `json:"action"`
	PIITypes  []string `json:"pii_types"`
	PIICount  int      `json:"pii_count"`
	Direction string   `json:"direction"` // "argument" or "result"
}

ToolPIIFinding records a PII detection in a tool argument or result.

type WebhookHook

type WebhookHook struct {
	// contains filtered or unexported fields
}

WebhookHook sends HTTP POST to a configured URL.

func NewWebhookHook

func NewWebhookHook(point HookPoint, config HookConfig) *WebhookHook

NewWebhookHook creates a webhook hook from config.

func (*WebhookHook) Execute

func (h *WebhookHook) Execute(ctx context.Context, data *HookData) (*HookResult, error)

Execute sends the hook data as a JSON POST to the configured URL. It delivers only when the configured On filter matches the payload outcome (e.g. with on: "denied", only denials are delivered).
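
The filter-then-POST behaviour can be sketched with the standard library alone. Everything here is illustrative: `payload` stands in for HookData (whose real fields are not shown in this section), `postIfMatch` and `demoDeliveries` are hypothetical names, the "post_policy" stage value is made up, and treating an empty filter as "match everything" is an assumption.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// payload is a hypothetical stand-in for HookData.
type payload struct {
	Stage   string `json:"stage"`
	Outcome string `json:"outcome"`
}

// postIfMatch POSTs p as JSON when on is empty or equals p.Outcome.
// The bool reports whether a request was actually sent.
func postIfMatch(ctx context.Context, client *http.Client, url, on string, p payload) (bool, error) {
	if on != "" && on != p.Outcome {
		return false, nil // filtered out: nothing is delivered
	}
	body, err := json.Marshal(p)
	if err != nil {
		return false, err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return false, err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return true, fmt.Errorf("webhook returned %s", resp.Status)
	}
	return true, nil
}

// demoDeliveries sends one "allowed" and one "denied" payload through an
// on: "denied" filter and returns how many requests reached a local test server.
func demoDeliveries() int {
	var received int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		atomic.AddInt32(&received, 1)
		w.WriteHeader(http.StatusNoContent)
	}))
	defer srv.Close()
	ctx := context.Background()
	for _, outcome := range []string{"allowed", "denied"} {
		_, _ = postIfMatch(ctx, srv.Client(), srv.URL, "denied", payload{Stage: "post_policy", Outcome: outcome})
	}
	return int(atomic.LoadInt32(&received))
}

func main() {
	fmt.Println(demoDeliveries())
}
```

Passing ctx into the request lets a cancelled run abandon a slow webhook instead of waiting on it.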

func (*WebhookHook) Point

func (h *WebhookHook) Point() HookPoint

Point returns the pipeline point this hook is registered at.

Directories

Path Synopsis
Package graphadapter defines the framework-agnostic event contract for governing external agent runtimes (LangGraph, LangChain, OpenAI SDK, etc.).
Package tools provides a thread-safe registry for MCP-compatible tools that agents can invoke during execution.
