Documentation
Overview ¶
Package agent implements the core agent orchestration pipeline.
The pipeline executes in a fixed sequence: load policy → classify input → scan attachments → evaluate OPA policy → resolve secrets → route LLM → call provider → classify output → generate evidence. Every invocation produces a signed evidence record, even on failures or policy denials.
Run lifecycle tracking lives in run_state.go. RunRegistry replaces the implicit "in-progress or gone" model with an observable state machine: QUEUED → RUNNING → COMPLETED|FAILED|TERMINATED|BLOCKED|DENIED, with PAUSED and AWAITING_APPROVAL as additional non-terminal states. Operators can list active runs, inspect per-run cost and step metrics, and issue kill signals via the admin API.
Extension points:
- Hooks: register pre/post callbacks at any pipeline stage (see HookRegistry).
- Plan review: gate LLM calls behind human approval (see PlanReviewStore).
- Tools: register MCP-compatible tools via agent/tools.ToolRegistry.
Index ¶
- Variables
- func RequiresReview(humanOversight string, dataTier int, costEstimate float64, hasTools bool, ...) bool
- type ActiveRunTracker
- func (t *ActiveRunTracker) ActiveRunCount() int
- func (t *ActiveRunTracker) Count(tenantID string) int
- func (t *ActiveRunTracker) Decrement(tenantID string)
- func (t *ActiveRunTracker) Deregister(correlationID string)
- func (t *ActiveRunTracker) Increment(tenantID string)
- func (t *ActiveRunTracker) Kill(correlationID string) bool
- func (t *ActiveRunTracker) KillAllForTenant(tenantID string) int
- func (t *ActiveRunTracker) Register(tenantID, correlationID string, cancel context.CancelFunc)
- type Annotation
- type ApprovalLevel
- type Attachment
- type CircuitBreaker
- func (cb *CircuitBreaker) Check(tenantID, agentID string) error
- func (cb *CircuitBreaker) RecordPolicyDenial(tenantID, agentID string)
- func (cb *CircuitBreaker) RecordSuccess(tenantID, agentID string)
- func (cb *CircuitBreaker) Reset(tenantID, agentID string)
- func (cb *CircuitBreaker) State(tenantID, agentID string) CircuitState
- type CircuitState
- type ExecutionPlan
- type FailureReason
- type Hook
- type HookConfig
- type HookData
- type HookPoint
- type HookRegistry
- type HookResult
- type IdempotencyKey
- type IdempotencyResult
- type IdempotencyStore
- func (s *IdempotencyStore) Check(ctx context.Context, key IdempotencyKey, maxAge time.Duration) (IdempotencyResult, error)
- func (s *IdempotencyStore) ClaimPending(ctx context.Context, key IdempotencyKey, maxAge time.Duration) (claimed bool, err error)
- func (s *IdempotencyStore) RecordCompleted(ctx context.Context, key IdempotencyKey, result []byte) error
- func (s *IdempotencyStore) RecordPending(ctx context.Context, key IdempotencyKey) error
- type IntentClassDefinition
- type IntentClassification
- type OverrideStore
- func (os *OverrideStore) ClearOverride(tenantID string)
- func (os *OverrideStore) DisableTools(tenantID string, tools []string, reason string)
- func (os *OverrideStore) DisabledToolsFor(tenantID string) []string
- func (os *OverrideStore) EnableTools(tenantID string, tools []string)
- func (os *OverrideStore) Get(tenantID string) *TenantOverride
- func (os *OverrideStore) IsLocked(tenantID string) bool
- func (os *OverrideStore) ListAll() map[string]TenantOverride
- func (os *OverrideStore) SetLockdown(tenantID string, locked bool, by string)
- func (os *OverrideStore) SetPolicyOverride(tenantID string, maxCostPerRun *float64, maxToolCalls *int)
- type PlanReviewConfig
- type PlanReviewStore
- func (s *PlanReviewStore) Approve(ctx context.Context, planID, tenantID, reviewedBy string) error
- func (s *PlanReviewStore) Get(ctx context.Context, planID, tenantID string) (*ExecutionPlan, error)
- func (s *PlanReviewStore) GetApprovedUndispatched(ctx context.Context, tenantID string) ([]*ExecutionPlan, error)
- func (s *PlanReviewStore) GetPending(ctx context.Context, tenantID string) ([]*ExecutionPlan, error)
- func (s *PlanReviewStore) ListReviewed(ctx context.Context, tenantID string, limit int) ([]ReviewHistoryEntry, error)
- func (s *PlanReviewStore) MarkDispatched(ctx context.Context, planID, tenantID, dispatchErr string) error
- func (s *PlanReviewStore) Modify(ctx context.Context, planID, tenantID, reviewedBy string, ...) error
- func (s *PlanReviewStore) Reject(ctx context.Context, planID, tenantID, reviewedBy, reason string) error
- func (s *PlanReviewStore) Save(ctx context.Context, plan *ExecutionPlan) error
- func (s *PlanReviewStore) Stats(ctx context.Context, tenantID string) (PlanStats, error)
- func (s *PlanReviewStore) UpdateDispatchResult(ctx context.Context, planID, tenantID, dispatchErr string) error
- type PlanStats
- type PlanStatus
- type ReviewHistoryEntry
- type RunRegistry
- func (rr *RunRegistry) ActiveRunCount() int
- func (rr *RunRegistry) Count(tenantID string) int
- func (rr *RunRegistry) Deregister(correlationID string)
- func (rr *RunRegistry) Get(correlationID string) *RunState
- func (rr *RunRegistry) IsPaused(correlationID string) bool
- func (rr *RunRegistry) IsPausedWithCh(correlationID string) (paused bool, ch <-chan struct{})
- func (rr *RunRegistry) Kill(correlationID string) bool
- func (rr *RunRegistry) KillAllForTenant(tenantID string) int
- func (rr *RunRegistry) List() []RunState
- func (rr *RunRegistry) ListAll() []RunState
- func (rr *RunRegistry) Pause(correlationID string) bool
- func (rr *RunRegistry) Register(correlationID, tenantID, agentID, sessionID string, cancel context.CancelFunc)
- func (rr *RunRegistry) Resume(correlationID string) bool
- func (rr *RunRegistry) SetStatus(correlationID string, status RunStatus, reason FailureReason)
- func (rr *RunRegistry) UpdateMetrics(correlationID string, steps, toolCalls int, cost float64)
- type RunRequest
- type RunResponse
- type RunState
- type RunStatus
- type Runner
- func (r *Runner) OverrideStoreRef() *OverrideStore
- func (r *Runner) Run(ctx context.Context, req *RunRequest) (*RunResponse, error)
- func (r *Runner) RunFromTrigger(ctx context.Context, agentName, prompt, invocationType string) error
- func (r *Runner) RunRegistryRef() *RunRegistry
- func (r *Runner) ToolApprovalStoreRef() *ToolApprovalStore
- type RunnerCacheConfig
- type RunnerConfig
- type TenantOverride
- type ToolApprovalRequest
- type ToolApprovalStatus
- type ToolApprovalStore
- func (s *ToolApprovalStore) Approve(reqID, approvedBy, reason string) bool
- func (s *ToolApprovalStore) Cleanup(olderThan time.Duration) int
- func (s *ToolApprovalStore) Deny(reqID, deniedBy, reason string) bool
- func (s *ToolApprovalStore) Get(reqID string) *ToolApprovalRequest
- func (s *ToolApprovalStore) ListPending() []ToolApprovalRequest
- func (s *ToolApprovalStore) RequestApproval(ctx context.Context, ...) ToolApprovalStatus
- type ToolCallResult
- type ToolFailureTracker
- type ToolInvocation
- type ToolPIIFinding
- type WebhookHook
Constants ¶
This section is empty.
Variables ¶
var (
	ErrPlanNotFound   = errors.New("execution plan not found")
	ErrPlanNotPending = errors.New("plan is not in pending status")
)
Functions ¶
func RequiresReview ¶
func RequiresReview(humanOversight string, dataTier int, costEstimate float64, hasTools bool, planConfig *PlanReviewConfig, planText ...string) bool
RequiresReview reports whether a plan requires human review under the configured policy. planText is optional; when provided, volume detection (Gap E) checks for destructive verbs near large numbers.
Types ¶
type ActiveRunTracker ¶
type ActiveRunTracker struct {
// contains filtered or unexported fields
}
ActiveRunTracker counts in-flight runs per tenant for the rate-limit policy input concurrent_executions and supports the kill switch through cancel functions keyed by correlation ID. Safe for concurrent use. When the tracker is nil, the concurrent_executions policy input is not set.
func NewActiveRunTracker ¶
func NewActiveRunTracker() *ActiveRunTracker
NewActiveRunTracker creates a new tracker.
func (*ActiveRunTracker) ActiveRunCount ¶
func (t *ActiveRunTracker) ActiveRunCount() int
ActiveRunCount returns the number of active runs being tracked with cancel functions.
func (*ActiveRunTracker) Count ¶
func (t *ActiveRunTracker) Count(tenantID string) int
Count returns the current in-flight run count for the tenant (including the run that just called Increment).
func (*ActiveRunTracker) Decrement ¶
func (t *ActiveRunTracker) Decrement(tenantID string)
Decrement removes one in-flight run for the tenant.
func (*ActiveRunTracker) Deregister ¶
func (t *ActiveRunTracker) Deregister(correlationID string)
Deregister removes a completed run from the tracker.
func (*ActiveRunTracker) Increment ¶
func (t *ActiveRunTracker) Increment(tenantID string)
Increment adds one in-flight run for the tenant.
func (*ActiveRunTracker) Kill ¶
func (t *ActiveRunTracker) Kill(correlationID string) bool
Kill cancels a running agent by correlation ID. Returns true if found.
func (*ActiveRunTracker) KillAllForTenant ¶
func (t *ActiveRunTracker) KillAllForTenant(tenantID string) int
KillAllForTenant cancels all running agents for a tenant. Returns the number of runs cancelled.
func (*ActiveRunTracker) Register ¶
func (t *ActiveRunTracker) Register(tenantID, correlationID string, cancel context.CancelFunc)
Register stores a cancel function for a running agent, keyed by correlation ID.
type Annotation ¶
type Annotation struct {
ID string `json:"id"`
Type string `json:"type"` // "comment" | "delete" | "modify"
Content string `json:"content"`
Section string `json:"section,omitempty"`
CreatedBy string `json:"created_by"`
CreatedAt time.Time `json:"created_at"`
}
Annotation represents a reviewer's comment or modification on a plan.
type ApprovalLevel ¶
type Attachment ¶
Attachment is a file attached to a run request.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker tracks policy denial counts per agent and opens the circuit when repeated denials exceed the threshold within a window. Only policy denials (not tool execution failures) feed the circuit breaker.
func NewCircuitBreaker ¶
func NewCircuitBreaker(threshold int, window time.Duration) *CircuitBreaker
NewCircuitBreaker creates a circuit breaker with the given threshold and window. threshold: number of denials in window to trip the circuit (default 5). window: sliding window duration (default 60s).
func (*CircuitBreaker) Check ¶
func (cb *CircuitBreaker) Check(tenantID, agentID string) error
Check returns nil if the agent is allowed to proceed, or an error if the circuit is open. In half-open state, allows one probe request.
func (*CircuitBreaker) RecordPolicyDenial ¶
func (cb *CircuitBreaker) RecordPolicyDenial(tenantID, agentID string)
RecordPolicyDenial records a policy denial for the agent. If the threshold is exceeded within the window, the circuit opens. In half-open state, a single denial (a failed probe) reopens the circuit immediately, without waiting for the threshold to be reached again.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess(tenantID, agentID string)
RecordSuccess records a successful policy evaluation. If the circuit is half-open, this closes it (the probe succeeded).
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset(tenantID, agentID string)
Reset manually resets the circuit for an agent (operator override).
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State(tenantID, agentID string) CircuitState
State returns the current circuit state for an agent.
type CircuitState ¶
type CircuitState int
CircuitState represents the circuit breaker state.
const (
	CircuitClosed   CircuitState = iota // Normal: requests flow through
	CircuitOpen                         // Tripped: requests denied immediately
	CircuitHalfOpen                     // Probe: one request allowed to test recovery
)
type ExecutionPlan ¶
type ExecutionPlan struct {
ID string `json:"id"`
CorrelationID string `json:"correlation_id"`
SessionID string `json:"session_id,omitempty"`
TenantID string `json:"tenant_id"`
AgentID string `json:"agent_id"`
Status PlanStatus `json:"status"`
SelectedModel string `json:"selected_model"`
DataTier int `json:"data_tier"`
ToolsAvailable []string `json:"tools_available"`
CostEstimate float64 `json:"cost_estimate"`
PolicyDecision string `json:"policy_decision"`
SystemPromptHash string `json:"system_prompt_hash"`
InputHash string `json:"input_hash"`
Prompt string `json:"prompt,omitempty"` // canonical prompt to execute after approval
PolicyPath string `json:"policy_path,omitempty"` // resolved policy path used when plan was created
ReviewedBy string `json:"reviewed_by,omitempty"`
ReviewedAt *time.Time `json:"reviewed_at,omitempty"`
ReviewReason string `json:"review_reason,omitempty"`
Annotations []Annotation `json:"annotations,omitempty"`
ProposedSteps []string `json:"proposed_steps,omitempty"` // LLM-proposed task steps for Art. 11/13 transparency
CreatedAt time.Time `json:"created_at"`
TimeoutAt time.Time `json:"timeout_at"`
}
ExecutionPlan captures agent intent before LLM invocation. Stored as first-class evidence for EU AI Act Art. 11 (Technical Documentation) and Art. 13 (Transparency) compliance.
func GenerateExecutionPlan ¶
func GenerateExecutionPlan(
	correlationID, tenantID, agentID, selectedModel string,
	dataTier int,
	toolsAvailable []string,
	costEstimate float64,
	policyDecision string,
	systemPrompt string,
	inputPrompt string,
	timeoutMinutes int,
) *ExecutionPlan
GenerateExecutionPlan creates a plan capturing agent intent before execution.
type FailureReason ¶
type FailureReason string
FailureReason classifies why a run ended in a non-success state.
const (
	FailureNone                 FailureReason = ""
	FailureCostExceeded         FailureReason = "cost_exceeded"
	FailureToolTimeout          FailureReason = "tool_timeout"
	FailureToolFailure          FailureReason = "tool_failure"
	FailureLLMError             FailureReason = "llm_error"
	FailureLLMTimeout           FailureReason = "llm_timeout"
	FailureContainmentDeny      FailureReason = "containment_deny"
	FailureCircuitBreaker       FailureReason = "circuit_breaker"
	FailureOperatorKill         FailureReason = "operator_kill"
	FailureContextTimeout       FailureReason = "context_timeout"
	FailurePolicyDeny           FailureReason = "policy_deny"
	FailurePIIBlock             FailureReason = "pii_block"
	FailureToolEscalation       FailureReason = "tool_escalation"
	FailureHookDeny             FailureReason = "hook_deny"
	FailureInternalError        FailureReason = "internal_error"
	FailureMaxStepsExceeded     FailureReason = "max_steps_exceeded"
	FailureMaxToolCallsExceeded FailureReason = "max_tool_calls_exceeded"
)
type Hook ¶
type Hook interface {
Point() HookPoint
Execute(ctx context.Context, data *HookData) (*HookResult, error)
}
Hook is the interface for all pipeline hooks.
type HookConfig ¶
type HookConfig struct {
Type string `yaml:"type"` // "webhook"
URL string `yaml:"url"`
On string `yaml:"on"` // "fired" | "allowed" | "denied" | "all"
}
HookConfig is a single hook entry loaded from .talon.yaml.
type HookData ¶
type HookData struct {
TenantID string `json:"tenant_id"`
AgentID string `json:"agent_id"`
CorrelationID string `json:"correlation_id"`
Stage HookPoint `json:"stage"`
Payload json.RawMessage `json:"payload"`
}
HookData provides context to hook implementations.
type HookPoint ¶
type HookPoint string
HookPoint identifies where in the pipeline a hook fires.
const (
	HookPrePolicy      HookPoint = "pre_policy"
	HookPostPolicy     HookPoint = "post_policy"
	HookPrePlanReview  HookPoint = "pre_plan_review"
	HookPostPlanReview HookPoint = "post_plan_review"
	HookPreLLM         HookPoint = "pre_llm"
	HookPostLLM        HookPoint = "post_llm"
	HookPreTool        HookPoint = "pre_tool"
	HookPostTool       HookPoint = "post_tool"
	HookPreMemory      HookPoint = "pre_memory_write"
	HookPostEvidence   HookPoint = "post_evidence"
)
type HookRegistry ¶
type HookRegistry struct {
// contains filtered or unexported fields
}
HookRegistry manages registered hooks for each pipeline stage.
func LoadHooksFromConfig ¶
func LoadHooksFromConfig(hooksConfig map[string][]HookConfig) *HookRegistry
LoadHooksFromConfig creates hooks from .talon.yaml hook configuration.
func NewHookRegistry ¶
func NewHookRegistry() *HookRegistry
NewHookRegistry creates an empty hook registry.
func (*HookRegistry) Execute ¶
func (r *HookRegistry) Execute(ctx context.Context, point HookPoint, data *HookData) (*HookResult, error)
Execute runs all hooks for a given pipeline point. Hook failures do not abort the pipeline by default.
func (*HookRegistry) Register ¶
func (r *HookRegistry) Register(hook Hook)
Register adds a hook at the pipeline point reported by its Point method.
type HookResult ¶
type HookResult struct {
Continue bool `json:"continue"`
Modified json.RawMessage `json:"modified,omitempty"`
}
HookResult controls pipeline flow after hook execution.
type IdempotencyKey ¶
type IdempotencyKey struct {
AgentID string
CorrelationID string
ToolName string
ArgumentHash string // sha256 of canonical JSON args
}
IdempotencyKey identifies a unique tool call by its deterministic inputs.
func DeriveIdempotencyKey ¶
func DeriveIdempotencyKey(agentID, correlationID, toolName string, args json.RawMessage) IdempotencyKey
DeriveIdempotencyKey builds a deterministic key from tool call inputs. The argument hash is SHA-256 of the canonical (sorted-key) JSON representation.
func (IdempotencyKey) CompositeKey ¶
func (k IdempotencyKey) CompositeKey() string
CompositeKey returns the dedup lookup key: "agent_id:correlation_id:tool_name:argument_hash".
type IdempotencyResult ¶
type IdempotencyResult struct {
Found bool
Status string // "pending" or "completed"
Result []byte // cached result (only when Status == "completed")
}
IdempotencyResult is the outcome of checking the idempotency store.
type IdempotencyStore ¶
type IdempotencyStore struct {
// contains filtered or unexported fields
}
IdempotencyStore checks whether a tool call has already completed successfully in this or a prior run, using a deterministic key derived from (agent_id, correlation_id, tool_name, argument_hash). Backed by SQLite via the same *sql.DB as the evidence store.
func NewIdempotencyStore ¶
func NewIdempotencyStore(db *sql.DB) (*IdempotencyStore, error)
NewIdempotencyStore creates the idempotency store and ensures the schema exists.
func (*IdempotencyStore) Check ¶
func (s *IdempotencyStore) Check(ctx context.Context, key IdempotencyKey, maxAge time.Duration) (IdempotencyResult, error)
Check looks up a tool call by its idempotency key. If maxAge > 0 and the stored row is completed, the row is treated as not found when completed_at is older than maxAge.
func (*IdempotencyStore) ClaimPending ¶
func (s *IdempotencyStore) ClaimPending(ctx context.Context, key IdempotencyKey, maxAge time.Duration) (claimed bool, err error)
ClaimPending atomically claims the slot for this key so only one caller may execute the tool. When maxAge > 0, it first tries to transition an expired completed row (completed_at older than maxAge) to pending; otherwise it inserts a new pending row. Returns true if this caller claimed the slot, false if another request already has it (pending or completed within TTL). Used to prevent duplicate execution after TTL expiry.
func (*IdempotencyStore) RecordCompleted ¶
func (s *IdempotencyStore) RecordCompleted(ctx context.Context, key IdempotencyKey, result []byte) error
RecordCompleted updates a pending entry to "completed" with the tool result.
func (*IdempotencyStore) RecordPending ¶
func (s *IdempotencyStore) RecordPending(ctx context.Context, key IdempotencyKey) error
RecordPending inserts a "pending" entry before tool execution begins. It does not transition an existing row (for example, a completed row whose TTL has expired); use ClaimPending when a TTL may have expired.
type IntentClassDefinition ¶
type IntentClassDefinition struct {
Class string `json:"class"`
DefaultRisk string `json:"default_risk"`
Description string `json:"description"`
Examples []string `json:"examples"`
}
IntentClassDefinition documents a supported operation class and example tool names.
func IntentClassCatalog ¶
func IntentClassCatalog() []IntentClassDefinition
IntentClassCatalog returns the supported intent classes for CLI/help output.
type IntentClassification ¶
type IntentClassification struct {
ToolName string `json:"tool_name"`
OperationClass string `json:"operation_class"`
RiskLevel string `json:"risk_level"`
IsBulk bool `json:"is_bulk"`
RequiresReview bool `json:"requires_review"`
Reason string `json:"reason"`
}
IntentClassification describes the inferred risk posture for a tool invocation.
func ClassifyToolIntent ¶
func ClassifyToolIntent(toolName string, paramsJSON []byte, planReviewCfg *PlanReviewConfig) *IntentClassification
ClassifyToolIntent classifies a tool call into operation class and review posture.
type OverrideStore ¶
type OverrideStore struct {
// contains filtered or unexported fields
}
OverrideStore provides in-memory runtime overrides per tenant. Thread-safe. Checked at Run() entry for lockdown and in buildLLMTools for tool disabling.
func NewOverrideStore ¶
func NewOverrideStore() *OverrideStore
NewOverrideStore creates an empty override store.
func (*OverrideStore) ClearOverride ¶
func (os *OverrideStore) ClearOverride(tenantID string)
ClearOverride removes all overrides for a tenant.
func (*OverrideStore) DisableTools ¶
func (os *OverrideStore) DisableTools(tenantID string, tools []string, reason string)
DisableTools adds tools to the disabled list for a tenant.
func (*OverrideStore) DisabledToolsFor ¶
func (os *OverrideStore) DisabledToolsFor(tenantID string) []string
DisabledToolsFor returns the list of disabled tools for a tenant.
func (*OverrideStore) EnableTools ¶
func (os *OverrideStore) EnableTools(tenantID string, tools []string)
EnableTools removes tools from the disabled list for a tenant.
func (*OverrideStore) Get ¶
func (os *OverrideStore) Get(tenantID string) *TenantOverride
Get returns the current override for a tenant, or nil if none.
func (*OverrideStore) IsLocked ¶
func (os *OverrideStore) IsLocked(tenantID string) bool
IsLocked returns true if the tenant is in lockdown mode.
func (*OverrideStore) ListAll ¶
func (os *OverrideStore) ListAll() map[string]TenantOverride
ListAll returns all tenant overrides.
func (*OverrideStore) SetLockdown ¶
func (os *OverrideStore) SetLockdown(tenantID string, locked bool, by string)
SetLockdown enables or disables tenant lockdown.
func (*OverrideStore) SetPolicyOverride ¶
func (os *OverrideStore) SetPolicyOverride(tenantID string, maxCostPerRun *float64, maxToolCalls *int)
SetPolicyOverride sets a tighter cost or tool call cap for a tenant. Only non-nil fields are updated; omit a field to leave it unchanged.
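Pointer parameters are the usual Go idiom for "optional" arguments: nil means "leave this field unchanged", so a caller can tighten one cap without restating the other. A minimal sketch, assuming hypothetical `overrides` and `tenantOverride` types; it does not check that a new value is actually tighter than the policy default.

```go
package main

import "sync"

// tenantOverride mirrors only the two caps SetPolicyOverride touches; names are hypothetical.
type tenantOverride struct {
	MaxCostPerRun *float64
	MaxToolCalls  *int
}

type overrides struct {
	mu sync.Mutex
	m  map[string]*tenantOverride
}

// setPolicyOverride updates only the caps that are non-nil; nil means "leave unchanged".
func (o *overrides) setPolicyOverride(tenantID string, maxCost *float64, maxTools *int) {
	o.mu.Lock()
	defer o.mu.Unlock()
	ov, ok := o.m[tenantID]
	if !ok {
		ov = &tenantOverride{}
		o.m[tenantID] = ov
	}
	if maxCost != nil {
		v := *maxCost // copy so later writes to the caller's variable don't leak in
		ov.MaxCostPerRun = &v
	}
	if maxTools != nil {
		v := *maxTools
		ov.MaxToolCalls = &v
	}
}
```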
type PlanReviewConfig ¶
type PlanReviewConfig struct {
RequireForTools bool `yaml:"require_for_tools"`
RequireForTier string `yaml:"require_for_tier"`
CostThreshold float64 `yaml:"cost_threshold"`
TimeoutMinutes int `yaml:"timeout_minutes"`
NotifyWebhook string `yaml:"notify_webhook"`
VolumeThreshold int `yaml:"volume_threshold,omitempty"`
Mode string `yaml:"mode,omitempty"` // sequential | any
ApprovalChain []ApprovalLevel `yaml:"approval_chain,omitempty"`
}
PlanReviewConfig holds the plan-review settings loaded from .talon.yaml.
type PlanReviewStore ¶
type PlanReviewStore struct {
// contains filtered or unexported fields
}
PlanReviewStore persists execution plans for human review.
func NewPlanReviewStore ¶
func NewPlanReviewStore(db *sql.DB) (*PlanReviewStore, error)
NewPlanReviewStore creates the plan review store with SQLite backend.
func (*PlanReviewStore) Approve ¶
func (s *PlanReviewStore) Approve(ctx context.Context, planID, tenantID, reviewedBy string) error
Approve marks a plan as approved by a reviewer, scoped to tenantID.
func (*PlanReviewStore) Get ¶
func (s *PlanReviewStore) Get(ctx context.Context, planID, tenantID string) (*ExecutionPlan, error)
Get returns a single plan by ID, scoped to tenantID. Returns ErrPlanNotFound if the plan does not exist or belongs to another tenant.
func (*PlanReviewStore) GetApprovedUndispatched ¶
func (s *PlanReviewStore) GetApprovedUndispatched(ctx context.Context, tenantID string) ([]*ExecutionPlan, error)
GetApprovedUndispatched returns approved plans that have not been auto-dispatched yet. If tenantID is empty, all tenants are included.
func (*PlanReviewStore) GetPending ¶
func (s *PlanReviewStore) GetPending(ctx context.Context, tenantID string) ([]*ExecutionPlan, error)
GetPending returns all plans awaiting review, optionally filtered by tenant. The timeout comparison binds time.Now() as a query parameter so that it matches go-sqlite3's serialization of timeout_at; datetime('now') uses a different format and would produce incorrect comparisons when the local time zone is not UTC.
func (*PlanReviewStore) ListReviewed ¶
func (s *PlanReviewStore) ListReviewed(ctx context.Context, tenantID string, limit int) ([]ReviewHistoryEntry, error)
ListReviewed returns recently reviewed plans (approved/rejected/modified) for the dashboard review history.
func (*PlanReviewStore) MarkDispatched ¶
func (s *PlanReviewStore) MarkDispatched(ctx context.Context, planID, tenantID, dispatchErr string) error
MarkDispatched marks an approved plan as dispatched so it won't be executed again. dispatchErr is persisted for diagnostics (empty string means success).
func (*PlanReviewStore) Modify ¶
func (s *PlanReviewStore) Modify(ctx context.Context, planID, tenantID, reviewedBy string, annotations []Annotation) error
Modify marks a plan as approved-with-modifications and stores reviewer annotations, scoped to tenantID.
func (*PlanReviewStore) Reject ¶
func (s *PlanReviewStore) Reject(ctx context.Context, planID, tenantID, reviewedBy, reason string) error
Reject marks a plan as rejected with a reason, scoped to tenantID.
func (*PlanReviewStore) Save ¶
func (s *PlanReviewStore) Save(ctx context.Context, plan *ExecutionPlan) error
Save persists a new execution plan.
func (*PlanReviewStore) Stats ¶
func (s *PlanReviewStore) Stats(ctx context.Context, tenantID string) (PlanStats, error)
Stats returns aggregate plan lifecycle counters, optionally scoped by tenant.
func (*PlanReviewStore) UpdateDispatchResult ¶
func (s *PlanReviewStore) UpdateDispatchResult(ctx context.Context, planID, tenantID, dispatchErr string) error
UpdateDispatchResult updates the dispatch_error for an already-claimed plan. Used after MarkDispatched("dispatching") to record the final dispatch outcome.
type PlanStats ¶
type PlanStats struct {
Pending int `json:"pending"`
Approved int `json:"approved"`
Rejected int `json:"rejected"`
Modified int `json:"modified"`
Dispatched int `json:"dispatched"`
DispatchFailures int `json:"dispatch_failures"`
}
PlanStats aggregates plan lifecycle counters for dashboards/CLI summaries.
type PlanStatus ¶
type PlanStatus string
PlanStatus represents the review state of an execution plan.
const (
	PlanPending      PlanStatus = "pending"
	PlanApproved     PlanStatus = "approved"
	PlanRejected     PlanStatus = "rejected"
	PlanModified     PlanStatus = "modified"
	PlanAutoApproved PlanStatus = "auto_approved"
	PlanTimedOut     PlanStatus = "timed_out"
)
type ReviewHistoryEntry ¶
type ReviewHistoryEntry struct {
PlanID string `json:"plan_id"`
TenantID string `json:"tenant_id"`
AgentID string `json:"agent_id"`
Status PlanStatus `json:"status"`
ReviewedBy string `json:"reviewed_by"`
ReviewedAt *time.Time `json:"reviewed_at"`
}
ReviewHistoryEntry is a minimal record for the dashboard review history.
type RunRegistry ¶
type RunRegistry struct {
// contains filtered or unexported fields
}
RunRegistry tracks all in-flight runs with observable state. Thread-safe. Replaces ActiveRunTracker for state management while delegating count/cancel responsibilities.
func (*RunRegistry) ActiveRunCount ¶
func (rr *RunRegistry) ActiveRunCount() int
ActiveRunCount returns total active (non-terminal) runs across all tenants.
func (*RunRegistry) Count ¶
func (rr *RunRegistry) Count(tenantID string) int
Count returns the number of active (non-terminal) runs for a tenant.
func (*RunRegistry) Deregister ¶
func (rr *RunRegistry) Deregister(correlationID string)
Deregister removes a run from the registry. Called after terminal evidence is recorded and the run is fully cleaned up.
func (*RunRegistry) Get ¶
func (rr *RunRegistry) Get(correlationID string) *RunState
Get returns a snapshot of a run's state. Returns nil if not found.
func (*RunRegistry) IsPaused ¶
func (rr *RunRegistry) IsPaused(correlationID string) bool
IsPaused returns true if the run is currently in PAUSED state.
func (*RunRegistry) IsPausedWithCh ¶
func (rr *RunRegistry) IsPausedWithCh(correlationID string) (paused bool, ch <-chan struct{})
IsPausedWithCh atomically checks whether a run is paused and returns its resume channel under a single lock acquisition. Calling IsPaused() and reading the channel separately is racy: Resume() may close the old channel and replace it in between, leaving the agent loop waiting on a new channel that is never closed, so the loop hangs until its context is cancelled.
func (*RunRegistry) Kill ¶
func (rr *RunRegistry) Kill(correlationID string) bool
Kill cancels a run and transitions it to TERMINATED. Returns true if found.
func (*RunRegistry) KillAllForTenant ¶
func (rr *RunRegistry) KillAllForTenant(tenantID string) int
KillAllForTenant cancels all non-terminal runs for a tenant. Returns count.
func (*RunRegistry) List ¶
func (rr *RunRegistry) List() []RunState
List returns snapshots of all active (non-terminal) runs.
func (*RunRegistry) ListAll ¶
func (rr *RunRegistry) ListAll() []RunState
ListAll returns snapshots of all runs including terminal ones still in the registry.
func (*RunRegistry) Pause ¶
func (rr *RunRegistry) Pause(correlationID string) bool
Pause signals an in-flight run to pause. The agentic loop checks IsPaused between iterations and blocks on WaitResume until the run is resumed or killed.
func (*RunRegistry) Register ¶
func (rr *RunRegistry) Register(correlationID, tenantID, agentID, sessionID string, cancel context.CancelFunc)
Register adds a new run in QUEUED state.
func (*RunRegistry) Resume ¶
func (rr *RunRegistry) Resume(correlationID string) bool
Resume unpauses a paused run. Closes the pauseCh so WaitResume unblocks.
func (*RunRegistry) SetStatus ¶
func (rr *RunRegistry) SetStatus(correlationID string, status RunStatus, reason FailureReason)
SetStatus transitions a run to a new status. No-op if the run is not found or already in a terminal state.
func (*RunRegistry) UpdateMetrics ¶
func (rr *RunRegistry) UpdateMetrics(correlationID string, steps, toolCalls int, cost float64)
UpdateMetrics atomically updates step/cost/tool counters for a run.
type RunRequest ¶
type RunRequest struct {
TenantID string
AgentName string
Prompt string
AgentReasoning string // Optional caller-provided reasoning (e.g. X-Talon-Reasoning)
AgentVerified bool // True when per-agent signature has been verified by API layer
SessionID string // Optional session to join; empty means create a new one
Attachments []Attachment
InvocationType string // "manual", "scheduled", "webhook:name"
DryRun bool
PolicyPath string // explicit path to .talon.yaml
ToolInvocations []ToolInvocation // optional; when set, each is policy-checked and executed, and names recorded in evidence
SkipMemory bool // if true, do not write memory observation for this run (e.g. --no-memory)
SovereigntyMode string // optional: eu_strict | eu_preferred | global; when set, router uses OPA routing and records RouteDecision for evidence
BypassPlanReview bool // internal: when true, skip plan-review gate (used by approved-plan dispatcher)
}
RunRequest is the input for a single agent invocation.
type RunResponse ¶
type RunResponse struct {
Response string
EvidenceID string
SessionID string
Cost float64
DurationMS int64
PolicyAllow bool
DenyReason string
PlanPending string // set when execution is gated for human review (EU AI Act Art. 14)
ModelUsed string // LLM model used for generation
ToolsCalled []string // MCP tools invoked
InputTokens int // prompt tokens (for OpenAI-compatible API responses)
OutputTokens int // completion tokens (for OpenAI-compatible API responses)
// Dry-run / CLI feedback: PII and injection scan results (set even when DryRun is true).
PIIDetected []string // entity names detected in input
InputTier int // classification tier of input (0–2)
AttachmentInjectionsDetected int // number of injection patterns found in attachments
AttachmentBlocked bool // true if any attachment was blocked due to injection
}
RunResponse is the output of an agent invocation.
type RunState ¶
type RunState struct {
CorrelationID string `json:"correlation_id"`
TenantID string `json:"tenant_id"`
AgentID string `json:"agent_id"`
SessionID string `json:"session_id,omitempty"`
Status RunStatus `json:"status"`
FailureReason FailureReason `json:"failure_reason,omitempty"`
StartedAt time.Time `json:"started_at"`
UpdatedAt time.Time `json:"updated_at"`
StepCount int `json:"step_count"`
CostAccrued float64 `json:"cost_accrued"`
ToolCalls int `json:"tool_calls"`
DurationMS int64 `json:"duration_ms"`
// contains filtered or unexported fields
}
RunState captures the observable state of a single agent run.
type RunStatus ¶
type RunStatus string
RunStatus is the lifecycle state of an agent run.
const (
RunStatusQueued RunStatus = "queued"
RunStatusRunning RunStatus = "running"
RunStatusPaused RunStatus = "paused"
RunStatusAwaitingApproval RunStatus = "awaiting_approval"
RunStatusCompleted RunStatus = "completed"
RunStatusFailed RunStatus = "failed"
RunStatusTerminated RunStatus = "terminated"
RunStatusBlocked RunStatus = "blocked"
RunStatusDenied RunStatus = "denied"
)
func (RunStatus) IsTerminal ¶
IsTerminal returns true for states that represent a finished run.
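The package overview names COMPLETED, FAILED, TERMINATED, BLOCKED and DENIED as end states, so a plausible IsTerminal is a switch over those five. This is a sketch rather than the package source. Treating `paused` and `awaiting_approval` as non-terminal (resumable) is an assumption, since the overview's state diagram does not mention them.

```go
package main

// RunStatus mirrors the package's string type so the sketch compiles alone.
type RunStatus string

const (
	RunStatusQueued           RunStatus = "queued"
	RunStatusRunning          RunStatus = "running"
	RunStatusPaused           RunStatus = "paused"
	RunStatusAwaitingApproval RunStatus = "awaiting_approval"
	RunStatusCompleted        RunStatus = "completed"
	RunStatusFailed           RunStatus = "failed"
	RunStatusTerminated       RunStatus = "terminated"
	RunStatusBlocked          RunStatus = "blocked"
	RunStatusDenied           RunStatus = "denied"
)

// IsTerminal reports whether s ends the run lifecycle. Paused and
// awaiting_approval are assumed resumable, so they return false.
func (s RunStatus) IsTerminal() bool {
	switch s {
	case RunStatusCompleted, RunStatusFailed, RunStatusTerminated,
		RunStatusBlocked, RunStatusDenied:
		return true
	default:
		return false
	}
}
```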
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner executes the full agent orchestration pipeline.
func NewRunner ¶
func NewRunner(cfg RunnerConfig) *Runner
NewRunner creates an agent runner with the given dependencies.
func (*Runner) OverrideStoreRef ¶
func (r *Runner) OverrideStoreRef() *OverrideStore
OverrideStoreRef returns the runner's OverrideStore (may be nil).
func (*Runner) Run ¶
func (r *Runner) Run(ctx context.Context, req *RunRequest) (*RunResponse, error)
Run executes the complete agent pipeline:
- Load policy
- Classify input (PII detection)
- Process attachments (extract + scan + sandbox)
- Evaluate policy (OPA)
- Check secrets access
- Route LLM (tier-based)
- Call LLM provider
- Classify output
- Generate evidence
func (*Runner) RunFromTrigger ¶
func (r *Runner) RunFromTrigger(ctx context.Context, agentName, prompt, invocationType string) error
RunFromTrigger implements the trigger.AgentRunner interface for cron and webhook execution. It loads the runner's default policy path (RunnerConfig.DefaultPolicyPath) rather than deriving agentName+".talon.yaml", so cron and webhook runs use the same .talon.yaml as the process (e.g. agent.talon.yaml).
func (*Runner) RunRegistryRef ¶
func (r *Runner) RunRegistryRef() *RunRegistry
RunRegistryRef returns the runner's RunRegistry (may be nil).
func (*Runner) ToolApprovalStoreRef ¶
func (r *Runner) ToolApprovalStoreRef() *ToolApprovalStore
ToolApprovalStoreRef returns the runner's ToolApprovalStore (may be nil).
type RunnerCacheConfig ¶
type RunnerCacheConfig struct {
Enabled bool
DefaultTTL int
SimilarityThreshold float64
MaxEntriesPerTenant int
}
RunnerCacheConfig is a subset of config.CacheConfig for the runner (avoids circular import).
type RunnerConfig ¶
type RunnerConfig struct {
PolicyDir string // base directory for policy path resolution
DefaultPolicyPath string // path to default .talon.yaml (e.g. agent.talon.yaml); used by RunFromTrigger when request has no PolicyPath
Classifier *classifier.Scanner
AttScanner *attachment.Scanner
Extractor *attachment.Extractor
Router *llm.Router
Secrets *secrets.SecretStore
Evidence *evidence.Store
PlanReview *PlanReviewStore
ToolRegistry *tools.ToolRegistry
ActiveRunTracker *ActiveRunTracker // optional; when set, rate-limit policy receives concurrent_executions
RunRegistry *RunRegistry // optional; when set, tracks run lifecycle states for observability and control
Overrides *OverrideStore // optional; when set, runtime overrides (lockdown, tool disable, cost caps) are checked
ToolApprovals *ToolApprovalStore // optional; when set, tools requiring approval pause for human decision
CircuitBreaker *CircuitBreaker // optional; when set, suspends agents after repeated policy denials
ToolFailures *ToolFailureTracker // optional; tracks tool execution failures separately from circuit breaker
Hooks *HookRegistry // optional; nil = no hooks
Memory *memory.Store // optional; nil = memory disabled
Pricing *pricing.PricingTable // optional; when set, evidence gets pre_request_estimate and post_request_cost
// Semantic cache (all optional; when nil or CacheConfig.Enabled false, cache is skipped)
CacheStore *cache.Store
CacheEmbedder *cache.BM25
CacheScrubber *cache.PIIScrubber
CachePolicy *cache.Evaluator
CacheConfig *RunnerCacheConfig
SessionStore *talonsession.Store
PromptStore *talonprompt.Store
Idempotency *IdempotencyStore // optional; deduplicates tool calls by (agent, correlation, tool, args_hash)
}
RunnerConfig holds the dependencies for constructing a Runner.
type TenantOverride ¶
type TenantOverride struct {
Lockdown bool `json:"lockdown"` // When true, all new runs are rejected
LockdownAt time.Time `json:"lockdown_at,omitempty"` // When lockdown was activated
LockdownBy string `json:"lockdown_by,omitempty"` // Admin identifier who set lockdown
DisabledTools []string `json:"disabled_tools,omitempty"` // Tools blocked by override
DisableReason string `json:"disable_reason,omitempty"` // Incident ID or explanation
MaxCostPerRun *float64 `json:"max_cost_per_run,omitempty"` // Tighter cost cap (overrides YAML)
MaxToolCalls *int `json:"max_tool_calls,omitempty"` // Tighter tool call cap
}
TenantOverride holds runtime overrides for a single tenant. Overrides take precedence over .talon.yaml policy and are applied at Run() entry and tool dispatch.
type ToolApprovalRequest ¶
type ToolApprovalRequest struct {
ID string `json:"id"`
CorrelationID string `json:"correlation_id"`
TenantID string `json:"tenant_id"`
AgentID string `json:"agent_id"`
ToolName string `json:"tool_name"`
ToolCallID string `json:"tool_call_id"`
Arguments map[string]any `json:"arguments"`
Status ToolApprovalStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
ResolvedAt time.Time `json:"resolved_at,omitempty"`
ResolvedBy string `json:"resolved_by,omitempty"`
Reason string `json:"reason,omitempty"`
// contains filtered or unexported fields
}
ToolApprovalRequest represents a pending tool execution awaiting human approval.
type ToolApprovalStatus ¶
type ToolApprovalStatus string
ToolApprovalStatus tracks the state of a tool approval request.
const (
ToolApprovalPending ToolApprovalStatus = "pending"
ToolApprovalApproved ToolApprovalStatus = "approved"
ToolApprovalDenied ToolApprovalStatus = "denied"
ToolApprovalTimeout ToolApprovalStatus = "timeout"
ToolApprovalCancelled ToolApprovalStatus = "cancelled"
)
type ToolApprovalStore ¶
type ToolApprovalStore struct {
// contains filtered or unexported fields
}
ToolApprovalStore tracks pending tool approval requests. The agentic loop creates a request and blocks on its channel until the request is approved, denied, or times out.
func NewToolApprovalStore ¶
func NewToolApprovalStore(timeout time.Duration) *ToolApprovalStore
NewToolApprovalStore creates a store with the given default approval timeout.
func (*ToolApprovalStore) Approve ¶
func (s *ToolApprovalStore) Approve(reqID, approvedBy, reason string) bool
Approve approves a pending tool execution. Returns true if the request was found and pending.
func (*ToolApprovalStore) Cleanup ¶
func (s *ToolApprovalStore) Cleanup(olderThan time.Duration) int
Cleanup removes resolved requests older than the given duration.
func (*ToolApprovalStore) Deny ¶
func (s *ToolApprovalStore) Deny(reqID, deniedBy, reason string) bool
Deny rejects a pending tool execution. Returns true if the request was found and pending.
func (*ToolApprovalStore) Get ¶
func (s *ToolApprovalStore) Get(reqID string) *ToolApprovalRequest
Get returns a specific approval request.
func (*ToolApprovalStore) ListPending ¶
func (s *ToolApprovalStore) ListPending() []ToolApprovalRequest
ListPending returns all pending approval requests.
func (*ToolApprovalStore) RequestApproval ¶
func (s *ToolApprovalStore) RequestApproval(ctx context.Context, correlationID, tenantID, agentID, toolName, toolCallID string, args map[string]any) ToolApprovalStatus
RequestApproval creates a pending approval and blocks until approved, denied, or timed out. Returns the final status. The caller should check the status to decide whether to execute.
type ToolCallResult ¶
type ToolCallResult struct {
Content string
Executed bool
ToolName string
PIIFindings []ToolPIIFinding
ExecutionError string // non-empty when tool.Execute() returned an error (distinct from policy denial)
}
ToolCallResult extends the basic return values with PII findings for evidence.
type ToolFailureTracker ¶
type ToolFailureTracker struct {
// contains filtered or unexported fields
}
ToolFailureTracker counts tool execution failures per agent, separate from the circuit breaker (which only tracks policy denials). When a tool execution fails (allowed by policy but the tool returned an error), the failure is recorded here for operator alerting without suspending the agent.
func NewToolFailureTracker ¶
func NewToolFailureTracker(threshold int, window time.Duration) *ToolFailureTracker
NewToolFailureTracker creates a tracker. When an agent exceeds threshold failures within window, a warning is logged for operator alerting. threshold <= 0 defaults to 10; window <= 0 defaults to 5 minutes.
func (*ToolFailureTracker) FailureCount ¶
func (t *ToolFailureTracker) FailureCount(tenantID, agentID string) int
FailureCount returns the current failure count within the window for an agent.
func (*ToolFailureTracker) RecordToolFailure ¶
func (t *ToolFailureTracker) RecordToolFailure(tenantID, agentID, toolName, errMsg string) bool
RecordToolFailure records a tool execution failure for the agent. If the threshold is exceeded within the window, an operator alert is logged. Returns true if the alert threshold was just crossed.
type ToolInvocation ¶
type ToolInvocation struct {
Name string `json:"name"`
Params json.RawMessage `json:"params"`
}
ToolInvocation represents a single tool call (e.g. from MCP or a future agent loop).
type ToolPIIFinding ¶
type ToolPIIFinding struct {
Field string `json:"field"`
Action string `json:"action"`
PIITypes []string `json:"pii_types"`
PIICount int `json:"pii_count"`
Direction string `json:"direction"` // "argument" or "result"
}
ToolPIIFinding records a PII detection in a tool argument or result.
type WebhookHook ¶
type WebhookHook struct {
// contains filtered or unexported fields
}
WebhookHook sends HTTP POST to a configured URL.
func NewWebhookHook ¶
func NewWebhookHook(point HookPoint, config HookConfig) *WebhookHook
NewWebhookHook creates a webhook hook from config.
func (*WebhookHook) Execute ¶
func (h *WebhookHook) Execute(ctx context.Context, data *HookData) (*HookResult, error)
Execute sends the hook data as a JSON POST to the configured URL. It delivers only when the configured On filter matches the payload outcome (e.g. on: "denied" delivers denials only).
func (*WebhookHook) Point ¶
func (h *WebhookHook) Point() HookPoint
Point returns the pipeline point this hook is registered at.
Directories ¶
| Path | Synopsis |
|---|---|
| graphadapter | Package graphadapter defines the framework-agnostic event contract for governing external agent runtimes (LangGraph, LangChain, OpenAI SDK, etc.). |
| tools | Package tools provides a thread-safe registry for MCP-compatible tools that agents can invoke during execution. |