observatory

package
v0.14.3
Published: May 2, 2026 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Overview

Package observatory provides a unified observability platform for AILANG.

Package observatory provides cross-trace merging and session-based correlation for hierarchy views.

Package observatory provides turn-based span grouping for hierarchy views.

Package observatory provides database migrations v8-v14 for the observatory schema.

Package observatory provides a unified observability platform for AILANG. It stores and queries traces, spans, metrics, and events from AILANG operations and external CLI tools (Claude Code, Gemini CLI).

Package observatory provides helper functions for the OTLP receiver.

Package observatory provides OTLP metrics handling for the observatory.

Package observatory provides outlier detection for span metrics within tasks.

Package observatory provides span hierarchy query operations.

Package observatory provides execution chain CRUD operations.

Package observatory provides eval assessment operations for execution chains.

Package observatory provides chain query helpers, stats, and journey operations.

Package observatory provides chat message query methods for the Store.

Package observatory provides session tracking for Claude Code hooks.

Package observatory provides trace listing and task span summary operations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculateCacheSavings

func CalculateCacheSavings(model string, cacheRead int64) float64

CalculateCacheSavings calculates how much was saved by using cache reads. Returns the difference between what full-price input would have cost and cache read cost. Cache reads are 90% cheaper than regular input tokens.
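The 90% figure can be checked with a short worked example. This is a minimal sketch, not the package's implementation: cacheSavings and its inputPricePerMTok parameter are hypothetical, and the price used is an illustrative value rather than one taken from the pricing config.

```go
package main

import "fmt"

// cacheSavings is a hypothetical stand-in for CalculateCacheSavings.
// inputPricePerMTok is an assumed USD price per one million input tokens;
// the real function looks the price up by model name instead.
func cacheSavings(inputPricePerMTok float64, cacheRead int64) float64 {
	fullPrice := float64(cacheRead) / 1e6 * inputPricePerMTok
	cachePrice := fullPrice * 0.10 // cache reads are billed at 10% of the input price
	return fullPrice - cachePrice
}

func main() {
	// 2,000,000 cache-read tokens at an assumed $3.00 per million input tokens:
	// full price $6.00, cache price $0.60, savings $5.40.
	fmt.Printf("%.2f\n", cacheSavings(3.00, 2_000_000)) // prints 5.40
}
```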

func CalculateCostFromTokens

func CalculateCostFromTokens(model string, tokensIn, tokensOut int64) float64

CalculateCostFromTokens calculates cost from model name and token counts. Returns 0.0 if:

  • Pricing config not loaded
  • Model not found in config
  • Tokens are zero

This follows the "no silent fallbacks" principle - return 0 rather than guess.

func CalculateCostFromTokensWithCache

func CalculateCostFromTokensWithCache(model string, tokensIn, tokensOut, cacheRead, cacheCreation int64) float64

CalculateCostFromTokensWithCache calculates cost including cache token pricing. Cache read tokens are charged at 10% of input price (90% discount). Cache creation tokens are charged at 125% of input price (25% premium). Returns 0.0 if model not found or tokens are all zero.
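The pricing rules above can be sketched as follows. Everything here is illustrative: the price type, the prices map, the model name, and the per-million-token unit are assumptions, and the sketch treats tokensIn as excluding cached tokens, which the documentation does not state.

```go
package main

import "fmt"

// price holds assumed USD prices per one million tokens (illustrative values only).
type price struct {
	inputPerMTok  float64
	outputPerMTok float64
}

// costWithCache is a hypothetical sketch of the documented rules:
// cache reads at 10% of the input price, cache creation at 125%,
// and 0 when the model is unknown or every token count is zero.
func costWithCache(prices map[string]price, model string, tokensIn, tokensOut, cacheRead, cacheCreation int64) float64 {
	p, ok := prices[model]
	if !ok {
		return 0 // unknown model: report 0 rather than guess
	}
	if tokensIn == 0 && tokensOut == 0 && cacheRead == 0 && cacheCreation == 0 {
		return 0
	}
	const perMTok = 1e6
	in := float64(tokensIn) / perMTok * p.inputPerMTok
	out := float64(tokensOut) / perMTok * p.outputPerMTok
	read := float64(cacheRead) / perMTok * p.inputPerMTok * 0.10
	create := float64(cacheCreation) / perMTok * p.inputPerMTok * 1.25
	return in + out + read + create
}

func main() {
	prices := map[string]price{"example-model": {inputPerMTok: 3, outputPerMTok: 15}}
	// 3.00 (input) + 1.50 (output) + 0.60 (cache read) + 1.50 (cache creation)
	fmt.Printf("%.2f\n", costWithCache(prices, "example-model", 1_000_000, 100_000, 2_000_000, 400_000)) // prints 6.60
	fmt.Printf("%.2f\n", costWithCache(prices, "unknown-model", 1_000_000, 0, 0, 0))                     // prints 0.00
}
```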

func CheckHealth

func CheckHealth(dbPath string)

CheckHealth examines the observatory DB size and takes action. Fast path: just os.Stat (no DB open). Only opens the DB if cleanup is needed.

Thresholds:

  • < 200MB: ok (no output)
  • 200-500MB: warn (log only)
  • 500MB-2GB: auto-cleanup (run retention)
  • > 2GB: loud warning, no destructive action

The >2GB branch intentionally does NOT delete the database. The previous implementation called os.Remove(dbPath) on this threshold, which destroyed production data if the WAL bloated past 2GB (e.g. when retention issued a large DELETE with a long-lived reader holding back the WAL checkpoint). Silent data destruction violates the project's "no silent fallbacks" rule. If the DB is genuinely >2GB, the user needs to intervene (stop the server, VACUUM, etc.) — we surface the problem loudly and leave the data alone.
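The threshold table can be read as a pure classification over the file size that os.Stat reports. The sketch below is an assumption-laden illustration: the healthAction type, the classify function, the use of binary megabytes, and the exact boundary handling are all hypothetical and may differ from CheckHealth.

```go
package main

import "fmt"

// healthAction names the four documented outcomes (hypothetical type).
type healthAction string

const (
	actionOK      healthAction = "ok"           // no output
	actionWarn    healthAction = "warn"         // log only
	actionCleanup healthAction = "auto-cleanup" // run retention
	actionLoud    healthAction = "loud-warning" // never deletes data
)

// mb assumes binary megabytes; the real thresholds may use decimal units.
const mb = int64(1) << 20

// classify maps a database size in bytes to an action.
// Boundary inclusivity is an assumption of this sketch.
func classify(sizeBytes int64) healthAction {
	switch {
	case sizeBytes < 200*mb:
		return actionOK
	case sizeBytes < 500*mb:
		return actionWarn
	case sizeBytes <= 2048*mb:
		return actionCleanup
	default:
		return actionLoud
	}
}

func main() {
	for _, size := range []int64{50 * mb, 300 * mb, 1024 * mb, 3072 * mb} {
		fmt.Println(size/mb, "MB:", classify(size))
	}
}
```

Keeping the classification separate from the side effects makes it straightforward to verify that the largest bucket never maps to a destructive action.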

func DefaultDatabasePath

func DefaultDatabasePath() string

DefaultDatabasePath returns the default path for the observatory database.

func GetPricingConfig

func GetPricingConfig() *eval_harness.ModelsConfig

GetPricingConfig returns the loaded pricing configuration. Returns nil if not loaded. Used for testing.

func Migrate

func Migrate(db *sql.DB) error

Migrate runs database migrations to create or update the observatory schema. It is idempotent - safe to call multiple times.

func MigrateWithVersion

func MigrateWithVersion(db *sql.DB) (int, error)

MigrateWithVersion runs migrations and tracks schema version. Returns the current schema version after migration.

func RecalculateAgentAssignmentAggregates

func RecalculateAgentAssignmentAggregates(ctx context.Context, db *sql.DB, assignmentID string) error

RecalculateAgentAssignmentAggregates recalculates all aggregate metrics for an agent assignment. Use this for backfill operations or to fix inconsistent aggregates.

Deprecated: Use Store.RecalculateAgentAssignmentAggregates instead for encapsulation.

func RecalculateTaskAggregates

func RecalculateTaskAggregates(ctx context.Context, db *sql.DB, taskID string) error

RecalculateTaskAggregates recalculates all aggregate metrics for a task from its spans. Use this for backfill operations or to fix inconsistent aggregates.

Deprecated: Use Store.RecalculateTaskAggregates instead for encapsulation.

func ResetPricingConfig

func ResetPricingConfig()

ResetPricingConfig resets the pricing config for testing. NOT for production use.

func UpdateAgentAssignmentAggregates

func UpdateAgentAssignmentAggregates(ctx context.Context, tx *sql.Tx, span *Span) error

UpdateAgentAssignmentAggregates updates an agent assignment's metrics based on a new span. This should be called within a transaction when creating a span with an agent_assignment_id.

func UpdateTaskAggregates

func UpdateTaskAggregates(ctx context.Context, tx *sql.Tx, span *Span) error

UpdateTaskAggregates updates a task's aggregate metrics based on a new span. This should be called within a transaction when creating a span with a task_id.

func ValidateSchema

func ValidateSchema(db *sql.DB) error

ValidateSchema checks that all expected tables exist. Returns nil if the schema is valid, or an error describing what is missing otherwise.

Notes:

  • span_events table removed in v4 migration (M-DB-CLEANUP)
  • chat_messages and chat_import_status added in v6 migration (M-CHAT-HISTORY-DB)
  • execution_chains and chain_stages added in v7 migration (M-CHAINS-SIMPLIFY)

Types

type API

type API struct {
	// contains filtered or unexported fields
}

API provides HTTP handlers for the observatory REST API.

func NewAPI

func NewAPI(backend Backend) *API

NewAPI creates a new API handler.

func (*API) RegisterRoutes

func (a *API) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers all observatory routes on the given mux. All routes are prefixed with /api/observatory/
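The prefixing convention can be illustrated with the standard library alone. This is only a sketch: registerRoutes, probe, and the "health" route are hypothetical stand-ins, since the actual routes registered by API.RegisterRoutes are not listed here.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// routePrefix is the prefix the documentation says all routes share.
const routePrefix = "/api/observatory/"

// registerRoutes mimics the shape of RegisterRoutes.
// The "health" route is a hypothetical example, not a documented endpoint.
func registerRoutes(mux *http.ServeMux) {
	mux.HandleFunc(routePrefix+"health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"status":"ok"}`)
	})
}

// probe sends an in-memory GET request through a freshly configured mux
// and returns the status code and response body.
func probe(path string) (int, string) {
	mux := http.NewServeMux()
	registerRoutes(mux)
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := probe(routePrefix + "health")
	fmt.Println(code, body) // prints 200 {"status":"ok"}

	code, _ = probe("/health") // outside the prefix, so the mux has no handler
	fmt.Println(code)          // prints 404
}
```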

type AgentAssignment

type AgentAssignment struct {
	ID                 string                `json:"id"`
	TaskID             string                `json:"task_id"`
	AgentID            string                `json:"agent_id"`
	Provider           Provider              `json:"provider"`
	Status             AgentAssignmentStatus `json:"status"`
	AssignedAt         time.Time             `json:"assigned_at"`
	StartedAt          *time.Time            `json:"started_at,omitempty"`
	CompletedAt        *time.Time            `json:"completed_at,omitempty"`
	ParentAssignmentID string                `json:"parent_assignment_id,omitempty"`

	// Agent-level aggregates
	DurationMs int64   `json:"duration_ms"`
	TokensIn   int64   `json:"tokens_in"`
	TokensOut  int64   `json:"tokens_out"`
	CostUSD    float64 `json:"cost_usd"`
	ToolCalls  int     `json:"tool_calls"`
	Turns      int     `json:"turns"`
}

AgentAssignment represents a coordinator → agent delegation.

type AgentAssignmentStatus

type AgentAssignmentStatus string

AgentAssignmentStatus represents the status of an agent assignment.

const (
	AgentStatusPending   AgentAssignmentStatus = "pending"
	AgentStatusRunning   AgentAssignmentStatus = "running"
	AgentStatusCompleted AgentAssignmentStatus = "completed"
	AgentStatusFailed    AgentAssignmentStatus = "failed"
)

type AgentHierarchy

type AgentHierarchy struct {
	Agent  *AgentAssignment  `json:"agent"`
	Traces []*TraceHierarchy `json:"traces"`
}

AgentHierarchy represents an agent assignment with its spans grouped by trace.

type AgentStats

type AgentStats struct {
	AgentID         string   `json:"agent_id"`
	Provider        Provider `json:"provider"`
	ExecutionCount  int      `json:"execution_count"`
	TotalDurationMs int64    `json:"total_duration_ms"`
	AvgDurationMs   float64  `json:"avg_duration_ms"`
	TotalTokensIn   int64    `json:"total_tokens_in"`
	TotalTokensOut  int64    `json:"total_tokens_out"`
	TotalCost       float64  `json:"total_cost"`
	TotalToolCalls  int      `json:"total_tool_calls"`
	SuccessRate     float64  `json:"success_rate"`
}

AgentStats contains aggregated metrics for an agent.

type AgentStatsResult

type AgentStatsResult struct {
	AgentID   string  `json:"agent_id"`
	Stages    int     `json:"stages"`
	Completed int     `json:"completed"`
	Failed    int     `json:"failed"`
	TotalCost float64 `json:"total_cost"`
	TokensIn  int     `json:"total_tokens_in"`
	TokensOut int     `json:"total_tokens_out"`
}

AgentStatsResult holds per-agent aggregated stats from a single SQL query. Used by GetChainStatsByAgent to replace the N+1 query pattern (M-PERF-OBSERVATORY).

type ApprovalStatus

type ApprovalStatus string

ApprovalStatus represents the status of an approval event.

const (
	ApprovalStatusPending  ApprovalStatus = "pending"
	ApprovalStatusApproved ApprovalStatus = "approved"
	ApprovalStatusRejected ApprovalStatus = "rejected"
)

type ApprovalType

type ApprovalType string

ApprovalType represents the type of approval required.

const (
	ApprovalTypeMerge        ApprovalType = "merge"
	ApprovalTypeHandoff      ApprovalType = "handoff"
	ApprovalTypeMergeHandoff ApprovalType = "merge_handoff"
)

type Backend

type Backend interface {
	// Workspace operations
	CreateWorkspace(ctx context.Context, w *Workspace) error
	GetWorkspace(ctx context.Context, id string) (*Workspace, error)
	ListWorkspaces(ctx context.Context) ([]*Workspace, error)
	UpdateWorkspace(ctx context.Context, w *Workspace) error
	DeleteWorkspace(ctx context.Context, id string) error
	GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)

	// Task operations
	CreateTask(ctx context.Context, t *Task) error
	GetTask(ctx context.Context, id string) (*Task, error)
	ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
	UpdateTask(ctx context.Context, t *Task) error
	DeleteTask(ctx context.Context, id string) error

	// Agent assignment operations
	CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
	GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
	ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
	UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
	DeleteAgentAssignment(ctx context.Context, id string) error
	GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)

	// Span operations
	CreateSpan(ctx context.Context, span *Span) error
	GetSpan(ctx context.Context, id string) (*Span, error)
	ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
	UpdateSpan(ctx context.Context, span *Span) error
	UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
	RecalculateTaskAggregates(ctx context.Context, taskID string) error
	DeleteSpan(ctx context.Context, id string) error
	GetTrace(ctx context.Context, traceID string) (*Trace, error)
	ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
	// LookupTaskBySessionID finds task hierarchy for Claude Code session correlation
	LookupTaskBySessionID(ctx context.Context, sessionID string) (taskID, assignmentID, traceID string)
	// LinkOrphanedSpansBySession updates spans with matching session.id that lack task linkage
	// Called after storing claude.execute span to retroactively link orphaned Claude Code events
	LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)

	// Session operations (M-SESSION-WORKSPACE-HOOKS)
	// GetSessionWorkspace returns workspace for a session (for span enrichment)
	GetSessionWorkspace(sessionID string) (string, error)
	// UpsertSession inserts or updates a session record from hook data
	UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
	// UpsertSessionWithCorrelation inserts/updates a session with correlation IDs (M-DETERMINISTIC-CHAT-LINKING)
	UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
	// UpdateSessionEnded marks a session as ended
	UpdateSessionEnded(ctx context.Context, sessionID string) error
	// InsertToolStart records the start of a tool call
	InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
	// FindLatestUnfinishedTool finds the most recent tool call that hasn't completed yet
	// Used to correlate PostToolUse with PreToolUse when tool_use_id is not provided
	FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
	// UpdateToolEnd records the completion of a tool call
	UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
	// GetToolForSpan finds the session_tool that best matches a span by timestamp + tool name
	GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
	// BackfillSpansWorkspace updates existing spans that have session.id but missing workspace
	BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)

	// Span event operations
	CreateSpanEvent(ctx context.Context, e *SpanEvent) error
	GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)
	DeleteSpanEvent(ctx context.Context, id int64) error

	// Message operations
	CreateMessage(ctx context.Context, m *Message) error
	GetMessage(ctx context.Context, id string) (*Message, error)
	ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
	UpdateMessage(ctx context.Context, m *Message) error
	DeleteMessage(ctx context.Context, id string) error
	MarkMessageRead(ctx context.Context, id string) error
	MarkMessageArchived(ctx context.Context, id string) error

	// Aggregate operations
	GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
	GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
	GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
	// GetExecTaskHierarchy returns hierarchy of ailang command spans (exec, run, check)
	GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
	// GetExecTaskHierarchyWithMessages returns hierarchy grouped by triggering messages (4-level)
	GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
	// GetSpanHierarchy returns hierarchical tree of spans using parent_span_id relationships
	GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
	// GetToolsByTimestampRange returns session tools within a time range for enrichment
	GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)

	// Metric operations (Claude Code telemetry metrics)
	CreateMetric(ctx context.Context, m *Metric) error
	ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
	GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)

	// Session detail operations (M-CHAINS-SOURCE-OF-TRUTH)
	GetSession(ctx context.Context, sessionID string) (*Session, error)
	GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)

	// Chat message operations (M-CHAINS-SOURCE-OF-TRUTH)
	GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
	GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
	CountChatMessages(ctx context.Context, q ChatMessageQuery) (total int, withTaskID int, err error)

	// Chain operations (M-CHAINS-SIMPLIFY unified hierarchy)
	CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
	GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
	GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
	GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
	GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
	ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
	UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
	UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
	GetChainStats(ctx context.Context) (*ChainStats, error)
	// GetChainStatusCounts returns chain counts grouped by status (single query, M-PERF-OBSERVATORY).
	GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
	// GetChainStatsByAgent returns per-agent stats in a single SQL query (replaces N+1, M-PERF-OBSERVATORY).
	GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)

	// Chain stage operations
	CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
	GetStage(ctx context.Context, id string) (*ChainStage, error)
	GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
	UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
	UpdateStageSession(ctx context.Context, stageID, sessionID string) error
	UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
	UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error
	UpdateStageError(ctx context.Context, stageID, errorMessage string) error
	GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
	// GetSpanLitesByStageID returns lightweight spans without attributes (M-PERF-OBSERVATORY).
	GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
	LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
	ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)

	// Lifecycle
	Close() error
}

Backend defines the interface for observatory storage backends. Implementations include SQLite (local), GCP Trace, and Jaeger.

type BreakdownItem

type BreakdownItem struct {
	ID         string  `json:"id"`
	Label      string  `json:"label"`
	SpanCount  int     `json:"span_count"`
	TaskCount  int     `json:"task_count,omitempty"`
	TokensIn   int64   `json:"tokens_in"`
	TokensOut  int64   `json:"tokens_out"`
	CostUSD    float64 `json:"cost_usd"`
	DurationMs int64   `json:"duration_ms"` // Total execution time in ms

	// Cache metrics
	CacheReadTokens     int64   `json:"cache_read_tokens"`
	CacheCreationTokens int64   `json:"cache_creation_tokens"`
	CacheSavingsUSD     float64 `json:"cache_savings_usd"`
}

BreakdownItem represents a single item in a breakdown aggregation.

type ChainCreateRequest

type ChainCreateRequest struct {
	SourceType        ChainSourceType `json:"source_type"`
	SourceRef         string          `json:"source_ref,omitempty"`
	GitHubRepo        string          `json:"github_repo,omitempty"`
	GitHubIssueNumber int             `json:"github_issue_number,omitempty"`
	WorkspaceID       string          `json:"workspace_id,omitempty"`
	WorkspacePath     string          `json:"workspace_path,omitempty"`
}

ChainCreateRequest contains the data needed to create a new chain.
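Because most fields carry omitempty, a request only serializes the fields that are set. The sketch below copies the type definitions from this page so that it is self-contained; the encode helper and the repository name are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, so the sketch compiles on its own.
type ChainSourceType string

const ChainSourceGitHubIssue ChainSourceType = "github_issue"

type ChainCreateRequest struct {
	SourceType        ChainSourceType `json:"source_type"`
	SourceRef         string          `json:"source_ref,omitempty"`
	GitHubRepo        string          `json:"github_repo,omitempty"`
	GitHubIssueNumber int             `json:"github_issue_number,omitempty"`
	WorkspaceID       string          `json:"workspace_id,omitempty"`
	WorkspacePath     string          `json:"workspace_path,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of a request,
// or an empty string if marshalling fails.
func encode(req ChainCreateRequest) string {
	b, err := json.Marshal(req)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	req := ChainCreateRequest{
		SourceType:        ChainSourceGitHubIssue,
		GitHubRepo:        "example/repo", // illustrative value
		GitHubIssueNumber: 42,
	}
	// Unset omitempty fields (source_ref, workspace_id, workspace_path) are dropped.
	fmt.Println(encode(req))
	// prints {"source_type":"github_issue","github_repo":"example/repo","github_issue_number":42}
}
```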

type ChainListOptions

type ChainListOptions struct {
	Status       ChainStatus `json:"status,omitempty"`
	SourceType   string      `json:"source_type,omitempty"`
	WorkspaceID  string      `json:"workspace_id,omitempty"`
	GitHubRepo   string      `json:"github_repo,omitempty"`
	AgentID      string      `json:"agent_id,omitempty"`
	CreatedAfter *time.Time  `json:"created_after,omitempty"`
	Limit        int         `json:"limit,omitempty"`
	Offset       int         `json:"offset,omitempty"`
}

ChainListOptions specifies filters for listing chains.

type ChainReadOptions

type ChainReadOptions struct {
	IncludeStages   bool `json:"include_stages"`   // Fetch stages
	IncludeSpans    bool `json:"include_spans"`    // Fetch spans for each stage
	IncludeSessions bool `json:"include_sessions"` // Fetch session data for each stage
}

ChainReadOptions controls what additional data is fetched when reading a chain: its stages, the spans for each stage, and the session data for each stage.

func DefaultChainReadOptions

func DefaultChainReadOptions() ChainReadOptions

DefaultChainReadOptions returns options for a full chain read.

type ChainSourceType

type ChainSourceType string

ChainSourceType represents the origin of a chain.

const (
	ChainSourceGitHubIssue ChainSourceType = "github_issue"
	ChainSourceMessage     ChainSourceType = "message"
	ChainSourceManual      ChainSourceType = "manual"
	ChainSourceEvalSuite   ChainSourceType = "eval_suite"
)

type ChainStage

type ChainStage struct {
	ID          string `json:"id"` // UUID
	ChainID     string `json:"chain_id"`
	StageNumber int    `json:"stage_number"`

	// Agent info
	AgentID  string   `json:"agent_id"`
	Provider Provider `json:"provider,omitempty"`

	// Links to other tables (foreign keys or cross-DB IDs)
	MessageID string `json:"message_id,omitempty"` // -> collaboration.db:inbox_messages.id
	TaskID    string `json:"task_id,omitempty"`    // -> coordinator.db:tasks.id
	SessionID string `json:"session_id,omitempty"` // -> sessions.session_id

	// State
	Status         ChainStageStatus `json:"status"`
	ApprovalStatus ApprovalStatus   `json:"approval_status,omitempty"`
	ApprovalType   ApprovalType     `json:"approval_type,omitempty"`

	// Handoff info
	HandoffTo     string `json:"handoff_to,omitempty"`
	Iteration     int    `json:"iteration"`
	HumanFeedback string `json:"human_feedback,omitempty"`

	// Timestamps
	StartedAt   *time.Time `json:"started_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// Summary (denormalized from spans)
	Cost       float64 `json:"cost"`
	TokensIn   int     `json:"tokens_in"`
	TokensOut  int     `json:"tokens_out"`
	Turns      int     `json:"turns"`
	ToolCalls  int     `json:"tool_calls"`
	DurationMs int64   `json:"duration_ms"`

	// Error tracking
	ErrorMessage string `json:"error_message,omitempty"`
	ErrorCount   int    `json:"error_count"`

	// Eval assessment (M-EVAL-CHAINS: populated for eval_suite chains)
	EvalAssessment *EvalAssessment `json:"eval_assessment,omitempty"`

	// Session data (populated on read with full=true)
	Session *Session `json:"session,omitempty"`

	// Spans (populated on read with include_spans=true)
	Spans []*Span `json:"spans,omitempty"`
}

ChainStage represents a single agent execution within a chain.

type ChainStageStatus

type ChainStageStatus string

ChainStageStatus represents the status of a chain stage.

const (
	StageStatusPending          ChainStageStatus = "pending"
	StageStatusRunning          ChainStageStatus = "running"
	StageStatusAwaitingApproval ChainStageStatus = "awaiting_approval"
	StageStatusCompleted        ChainStageStatus = "completed"
	StageStatusFailed           ChainStageStatus = "failed"
)

type ChainStats

type ChainStats struct {
	TotalChains        int     `json:"total_chains"`
	ActiveChains       int     `json:"active_chains"`
	PendingApprovals   int     `json:"pending_approvals"`
	CompletedChains    int     `json:"completed_chains"`
	FailedChains       int     `json:"failed_chains"`
	TotalCost          float64 `json:"total_cost"`
	TotalTokens        int64   `json:"total_tokens"`
	AverageStagesCount float64 `json:"average_stages_count"`
	AverageDurationMs  float64 `json:"average_duration_ms"`
}

ChainStats contains aggregate statistics about chains.

type ChainStatus

type ChainStatus string

ChainStatus represents the status of an execution chain.

const (
	ChainStatusActive          ChainStatus = "active"
	ChainStatusPendingApproval ChainStatus = "pending_approval"
	ChainStatusCompleted       ChainStatus = "completed"
	ChainStatusFailed          ChainStatus = "failed"
)

type ChainStatusCounts

type ChainStatusCounts struct {
	Total       int     `json:"total_chains"`
	Completed   int     `json:"completed"`
	Active      int     `json:"active"`
	Pending     int     `json:"pending_approval"`
	Failed      int     `json:"failed"`
	TotalCost   float64 `json:"total_cost"`
	TotalTokens int64   `json:"total_tokens"`
}

ChainStatusCounts holds per-status chain counts from a single SQL aggregation.

type ChainSummary

type ChainSummary struct {
	ID                string      `json:"id"`
	SourceType        string      `json:"source_type"`
	SourceRef         string      `json:"source_ref"`
	GitHubRepo        string      `json:"github_repo,omitempty"`
	GitHubIssueNumber int         `json:"github_issue_number,omitempty"`
	Status            ChainStatus `json:"status"`
	CurrentStage      int         `json:"current_stage"`
	TotalCost         float64     `json:"total_cost"`
	TotalTokens       int         `json:"total_tokens"`
	TotalTurns        int         `json:"total_turns"`
	StagesCompleted   int         `json:"stages_completed"`
	CreatedAt         time.Time   `json:"created_at"`
	CompletedAt       *time.Time  `json:"completed_at,omitempty"`
	StageCount        int         `json:"stage_count"`
	MaxStage          int         `json:"max_stage"`
	AgentFlow         string      `json:"agent_flow"` // e.g., "design-doc-creator -> sprint-planner -> sprint-executor"
}

ChainSummary is a lightweight view of a chain for list views.

type ChatContext

type ChatContext struct {
	UserPrompt        string `json:"user_prompt,omitempty"`        // First 500 chars of user prompt
	AssistantResponse string `json:"assistant_response,omitempty"` // First 500 chars of response
	HasThinking       bool   `json:"has_thinking"`
	TurnNumber        int    `json:"turn_number,omitempty"`
	FullChatURL       string `json:"full_chat_url,omitempty"` // Link to full conversation
}

ChatContext contains conversation content for spans with chat history. Populated when include_chat=true on API requests. This enables embedding chat prompts/responses directly in span data.
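The field comments describe previews limited to the first 500 characters. One way such truncation could work is sketched below; truncateRunes is a hypothetical helper, and counting runes rather than bytes is an assumption made here to avoid splitting multi-byte UTF-8 characters.

```go
package main

import (
	"fmt"
	"strings"
)

// truncateRunes returns at most max runes of s.
// Whether the package counts bytes or runes is not documented; this sketch
// counts runes so that a multi-byte character is never cut in half.
func truncateRunes(s string, max int) string {
	if max <= 0 {
		return ""
	}
	r := []rune(s)
	if len(r) <= max {
		return s
	}
	return string(r[:max])
}

func main() {
	long := strings.Repeat("é", 600) // 600 runes, 1200 bytes
	preview := truncateRunes(long, 500)
	fmt.Println(len([]rune(preview))) // prints 500
	fmt.Println(truncateRunes("short prompt", 500))
}
```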

type ChatMessage

type ChatMessage struct {
	ID          string    `json:"id"`
	SessionID   string    `json:"session_id"`
	TurnNumber  int       `json:"turn_number"`
	Role        string    `json:"role"`
	ContentJSON string    `json:"content_json,omitempty"` // Raw JSON content blocks
	TokensIn    int       `json:"tokens_in,omitempty"`
	TokensOut   int       `json:"tokens_out,omitempty"`
	Model       string    `json:"model,omitempty"`
	Timestamp   time.Time `json:"timestamp"`
	TaskID      string    `json:"task_id,omitempty"`
	ChainID     string    `json:"chain_id,omitempty"`
}

ChatMessage represents a stored chat message from a Claude/Gemini session.

type ChatMessageQuery

type ChatMessageQuery struct {
	TaskID    string
	SessionID string
	StartTime time.Time
	EndTime   time.Time
	Limit     int
}

ChatMessageQuery holds options for filtering chat messages.

type ClaudeCodeEvent

type ClaudeCodeEvent struct {
	ID             string  `json:"id"`           // span_id (also used as task_id for hierarchy lookup)
	CreatedAt      string  `json:"created_at"`   // ISO8601 timestamp (matches inbox message format)
	Type           string  `json:"message_type"` // "claude_code_turn"
	FromAgent      string  `json:"from_agent"`   // Agent ID (e.g., "design-doc-creator") or "claude-code" for user sessions
	ToInbox        string  `json:"to_inbox"`     // Agent inbox (e.g., "design-doc-creator") or "user" for user sessions
	Title          string  `json:"title"`        // "Claude Code Turn ($X.XX)"
	TaskID         string  `json:"task_id"`      // Same as ID for hierarchy lookup
	Status         string  `json:"status"`       // "read" (not actionable like inbox messages)
	CostUSD        float64 `json:"cost_usd"`
	TokensIn       int64   `json:"tokens_in"`
	TokensOut      int64   `json:"tokens_out"`
	DurationMs     int     `json:"duration_ms"`
	Workspace      string  `json:"workspace,omitempty"`       // Working directory (from resource attributes)
	Model          string  `json:"model,omitempty"`           // Model used for this event (e.g., "claude-sonnet-4-5")
	Provider       string  `json:"provider,omitempty"`        // AI provider (e.g., "claude", "gemini")
	Directive      string  `json:"directive,omitempty"`       // Initial user prompt (truncated preview)
	DirectiveFull  string  `json:"directive_full,omitempty"`  // Full directive (for detail views)
	TurnCount      int     `json:"turn_count"`                // Number of turns in session
	MetricsSummary string  `json:"metrics_summary,omitempty"` // "3 turns • $0.42 • 12.5s"
}

ClaudeCodeEvent represents a Claude Code API request span formatted as an event for the Event Queue. Each api_request span becomes an event that can be clicked to show its hierarchy (tool calls correlated by timestamp).
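The Title and MetricsSummary field comments show display strings such as "Claude Code Turn ($X.XX)" and "3 turns • $0.42 • 12.5s". A minimal sketch of producing strings in that shape follows; eventTitle and metricsSummary are hypothetical helpers, and the sketch makes no attempt at singular forms such as "1 turn".

```go
package main

import "fmt"

// eventTitle formats a title in the shape shown in the Title field comment.
func eventTitle(costUSD float64) string {
	return fmt.Sprintf("Claude Code Turn ($%.2f)", costUSD)
}

// metricsSummary formats a one-line summary in the shape shown in the
// MetricsSummary field comment. durationMs is converted to seconds.
func metricsSummary(turns int, costUSD float64, durationMs int64) string {
	seconds := float64(durationMs) / 1000
	return fmt.Sprintf("%d turns • $%.2f • %.1fs", turns, costUSD, seconds)
}

func main() {
	fmt.Println(eventTitle(0.42))               // prints Claude Code Turn ($0.42)
	fmt.Println(metricsSummary(3, 0.42, 12500)) // prints 3 turns • $0.42 • 12.5s
}
```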

type ClaudeEvent

type ClaudeEvent struct {
	Type         string         `json:"type"` // "tool_call", "approval", "error"
	Timestamp    time.Time      `json:"timestamp"`
	ToolName     string         `json:"tool_name,omitempty"`
	ToolInput    map[string]any `json:"tool_input,omitempty"`
	ToolResult   string         `json:"tool_result,omitempty"`
	Approved     *bool          `json:"approved,omitempty"`
	ErrorMessage string         `json:"error_message,omitempty"`
}

ClaudeEvent represents an event from Claude Code CLI.

type ClaudeMetrics

type ClaudeMetrics struct {
	SessionID     string    `json:"session_id"`
	TaskID        string    `json:"task_id,omitempty"`
	Model         string    `json:"model"`
	TotalTokensIn int64     `json:"total_input_tokens"`
	TotalTokenOut int64     `json:"total_output_tokens"`
	TotalCostUSD  float64   `json:"total_cost_usd"`
	DurationMs    int64     `json:"duration_ms"`
	TurnCount     int       `json:"turn_count"`
	ToolCalls     int       `json:"tool_calls"`
	StartTime     time.Time `json:"start_time"`
	EndTime       time.Time `json:"end_time"`
	Status        string    `json:"status"` // "completed", "failed", "cancelled"
	ErrorMessage  string    `json:"error_message,omitempty"`

	// Events captured during session
	Events []ClaudeEvent `json:"events,omitempty"`
}

ClaudeMetrics represents metrics exported by Claude Code CLI. Claude exports metrics/events only, not full OTEL traces.

func ParseClaudeMetricsJSON

func ParseClaudeMetricsJSON(data []byte) (*ClaudeMetrics, error)

ParseClaudeMetricsJSON parses Claude metrics from JSON.
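
As an illustration of the payload shape implied by the JSON tags above, the sketch below decodes a small document with encoding/json. The types metrics and event are local mirrors of a subset of ClaudeMetrics and ClaudeEvent, and parseMetrics is a hypothetical stand-in; the actual ParseClaudeMetricsJSON may validate or normalize more than this.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event mirrors a subset of ClaudeEvent, using the documented JSON tags.
type event struct {
	Type      string    `json:"type"`
	Timestamp time.Time `json:"timestamp"`
	ToolName  string    `json:"tool_name,omitempty"`
}

// metrics mirrors a subset of ClaudeMetrics, using the documented JSON tags.
type metrics struct {
	SessionID     string  `json:"session_id"`
	Model         string  `json:"model"`
	TotalTokensIn int64   `json:"total_input_tokens"`
	TotalTokenOut int64   `json:"total_output_tokens"`
	TotalCostUSD  float64 `json:"total_cost_usd"`
	TurnCount     int     `json:"turn_count"`
	Status        string  `json:"status"`
	Events        []event `json:"events,omitempty"`
}

// parseMetrics is a hypothetical stand-in for ParseClaudeMetricsJSON.
func parseMetrics(data []byte) (*metrics, error) {
	var m metrics
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, fmt.Errorf("parse claude metrics: %w", err)
	}
	return &m, nil
}

func main() {
	payload := []byte(`{
		"session_id": "s-1",
		"model": "claude-sonnet-4-5",
		"total_input_tokens": 1200,
		"total_output_tokens": 340,
		"total_cost_usd": 0.42,
		"turn_count": 3,
		"status": "completed",
		"events": [
			{"type": "tool_call", "timestamp": "2026-05-02T10:00:00Z", "tool_name": "Read"}
		]
	}`)

	m, err := parseMetrics(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.SessionID, m.TurnCount, len(m.Events), m.Events[0].ToolName)
}
```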

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a connected WebSocket client.

type CompositeBackend

type CompositeBackend struct {
	// contains filtered or unexported fields
}

CompositeBackend implements Backend with a write-local, read-remote pattern:

- Writes always go to local SQLite for durability
- Reads can query remote backends (GCP Trace, Jaeger) for historical traces
- Results from multiple sources are merged for comprehensive views

func NewCompositeBackend

func NewCompositeBackend(config CompositeConfig) (*CompositeBackend, error)

NewCompositeBackend creates a new composite backend.
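
The write-local, read-remote pattern can be sketched with a much smaller interface. Everything below (traceStore, memStore, composite) is hypothetical and only illustrates the idea; the real Backend interface is far larger, and how CompositeBackend handles remote errors or ordering is not documented here.

```go
package main

import (
	"fmt"
	"sort"
)

// traceStore is a hypothetical, much smaller stand-in for Backend.
type traceStore interface {
	Put(id string) error
	List() ([]string, error)
}

// memStore is an in-memory traceStore used for the demonstration.
type memStore struct{ ids []string }

func (m *memStore) Put(id string) error     { m.ids = append(m.ids, id); return nil }
func (m *memStore) List() ([]string, error) { return m.ids, nil }

// composite mirrors the Local/Remotes split of CompositeConfig.
type composite struct {
	local   traceStore
	remotes []traceStore
}

// Put writes only to the local store.
func (c *composite) Put(id string) error { return c.local.Put(id) }

// List merges local and remote results, dropping duplicate IDs.
func (c *composite) List() ([]string, error) {
	seen := map[string]bool{}
	var out []string
	for _, s := range append([]traceStore{c.local}, c.remotes...) {
		ids, err := s.List()
		if err != nil {
			return nil, err // assumption: any failing source fails the read
		}
		for _, id := range ids {
			if !seen[id] {
				seen[id] = true
				out = append(out, id)
			}
		}
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	remote := &memStore{ids: []string{"t1", "t2"}}
	c := &composite{local: &memStore{}, remotes: []traceStore{remote}}
	_ = c.Put("t2")
	_ = c.Put("t3")
	ids, _ := c.List()
	fmt.Println(ids) // prints: [t1 t2 t3]
}
```

Failing the whole read when one source errors is a choice made for brevity in this sketch; a production backend might instead log the error and return partial results.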

func (*CompositeBackend) BackfillSpansWorkspace

func (b *CompositeBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)

func (*CompositeBackend) Close

func (b *CompositeBackend) Close() error

Close closes all backends.

func (*CompositeBackend) CountChatMessages

func (b *CompositeBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)

func (*CompositeBackend) CreateAgentAssignment

func (b *CompositeBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*CompositeBackend) CreateChain

func (b *CompositeBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)

func (*CompositeBackend) CreateMessage

func (b *CompositeBackend) CreateMessage(ctx context.Context, m *Message) error

func (*CompositeBackend) CreateMetric

func (b *CompositeBackend) CreateMetric(ctx context.Context, m *Metric) error

func (*CompositeBackend) CreateSpan

func (b *CompositeBackend) CreateSpan(ctx context.Context, span *Span) error

func (*CompositeBackend) CreateSpanEvent

func (b *CompositeBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error

func (*CompositeBackend) CreateStage

func (b *CompositeBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)

func (*CompositeBackend) CreateTask

func (b *CompositeBackend) CreateTask(ctx context.Context, t *Task) error

func (*CompositeBackend) CreateWorkspace

func (b *CompositeBackend) CreateWorkspace(ctx context.Context, w *Workspace) error

func (*CompositeBackend) DeleteAgentAssignment

func (b *CompositeBackend) DeleteAgentAssignment(ctx context.Context, id string) error

func (*CompositeBackend) DeleteMessage

func (b *CompositeBackend) DeleteMessage(ctx context.Context, id string) error

func (*CompositeBackend) DeleteSpan

func (b *CompositeBackend) DeleteSpan(ctx context.Context, id string) error

func (*CompositeBackend) DeleteSpanEvent

func (b *CompositeBackend) DeleteSpanEvent(ctx context.Context, id int64) error

func (*CompositeBackend) DeleteTask

func (b *CompositeBackend) DeleteTask(ctx context.Context, id string) error

func (*CompositeBackend) DeleteWorkspace

func (b *CompositeBackend) DeleteWorkspace(ctx context.Context, id string) error

func (*CompositeBackend) FindLatestUnfinishedTool

func (b *CompositeBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)

func (*CompositeBackend) GetAgentAssignment

func (b *CompositeBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)

func (*CompositeBackend) GetAgentStats

func (b *CompositeBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)

func (*CompositeBackend) GetChain

func (*CompositeBackend) GetChainByGitHubIssue

func (b *CompositeBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)

func (*CompositeBackend) GetChainByMessageID

func (b *CompositeBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)

func (*CompositeBackend) GetChainByTaskID

func (b *CompositeBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)

func (*CompositeBackend) GetChainStages

func (b *CompositeBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)

func (*CompositeBackend) GetChainStats

func (b *CompositeBackend) GetChainStats(ctx context.Context) (*ChainStats, error)

func (*CompositeBackend) GetChainStatsByAgent

func (b *CompositeBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)

func (*CompositeBackend) GetChainStatusCounts

func (b *CompositeBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)

func (*CompositeBackend) GetChatMessagesBySession

func (b *CompositeBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)

func (*CompositeBackend) GetChatMessagesByTaskID

func (b *CompositeBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)

func (*CompositeBackend) GetExecTaskHierarchy

func (b *CompositeBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)

func (*CompositeBackend) GetExecTaskHierarchyWithMessages

func (b *CompositeBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)

func (*CompositeBackend) GetMessage

func (b *CompositeBackend) GetMessage(ctx context.Context, id string) (*Message, error)

func (*CompositeBackend) GetMetricsSummary

func (b *CompositeBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)

func (*CompositeBackend) GetProviderComparison

func (b *CompositeBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)

func (*CompositeBackend) GetSession

func (b *CompositeBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)

func (*CompositeBackend) GetSessionMetricsSummary

func (b *CompositeBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)

func (*CompositeBackend) GetSessionTools

func (b *CompositeBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)

func (*CompositeBackend) GetSessionWorkspace

func (b *CompositeBackend) GetSessionWorkspace(sessionID string) (string, error)

func (*CompositeBackend) GetSpan

func (b *CompositeBackend) GetSpan(ctx context.Context, id string) (*Span, error)

func (*CompositeBackend) GetSpanEvents

func (b *CompositeBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)

func (*CompositeBackend) GetSpanHierarchy

func (b *CompositeBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)

func (*CompositeBackend) GetSpanLitesByStageID

func (b *CompositeBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)

func (*CompositeBackend) GetSpansByStageID

func (b *CompositeBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)

func (*CompositeBackend) GetStage

func (b *CompositeBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)

func (*CompositeBackend) GetTask

func (b *CompositeBackend) GetTask(ctx context.Context, id string) (*Task, error)

func (*CompositeBackend) GetTaskTimeline

func (b *CompositeBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)

func (*CompositeBackend) GetToolForSpan

func (b *CompositeBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)

func (*CompositeBackend) GetToolsByTimestampRange

func (b *CompositeBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)

func (*CompositeBackend) GetTrace

func (b *CompositeBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)

func (*CompositeBackend) GetWorkspace

func (b *CompositeBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)

func (*CompositeBackend) GetWorkspaceStats

func (b *CompositeBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)

func (*CompositeBackend) InsertToolStart

func (b *CompositeBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error

func (*CompositeBackend) LinkOrphanedSpansBySession

func (b *CompositeBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)

LinkOrphanedSpansBySession delegates to the local backend (session correlation is local only).

func (*CompositeBackend) LinkSpanToChain

func (b *CompositeBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error

func (*CompositeBackend) ListAgentAssignments

func (b *CompositeBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)

func (*CompositeBackend) ListChains

func (b *CompositeBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)

func (*CompositeBackend) ListMessages

func (b *CompositeBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)

func (*CompositeBackend) ListMetrics

func (b *CompositeBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)

func (*CompositeBackend) ListPendingApprovals

func (b *CompositeBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)

func (*CompositeBackend) ListSpans

func (b *CompositeBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)

func (*CompositeBackend) ListTasks

func (b *CompositeBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)

func (*CompositeBackend) ListTraces

func (b *CompositeBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)

func (*CompositeBackend) ListWorkspaces

func (b *CompositeBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)

func (*CompositeBackend) LookupTaskBySessionID

func (b *CompositeBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)

LookupTaskBySessionID delegates to the local backend (session correlation is local only).

func (*CompositeBackend) MarkMessageArchived

func (b *CompositeBackend) MarkMessageArchived(ctx context.Context, id string) error

func (*CompositeBackend) MarkMessageRead

func (b *CompositeBackend) MarkMessageRead(ctx context.Context, id string) error

func (*CompositeBackend) RecalculateTaskAggregates

func (b *CompositeBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error

func (*CompositeBackend) UpdateAgentAssignment

func (b *CompositeBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*CompositeBackend) UpdateChainMetrics

func (b *CompositeBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error

func (*CompositeBackend) UpdateChainStatus

func (b *CompositeBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error

func (*CompositeBackend) UpdateMessage

func (b *CompositeBackend) UpdateMessage(ctx context.Context, m *Message) error

func (*CompositeBackend) UpdateSessionEnded

func (b *CompositeBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error

func (*CompositeBackend) UpdateSpan

func (b *CompositeBackend) UpdateSpan(ctx context.Context, span *Span) error

func (*CompositeBackend) UpdateSpanLinks

func (b *CompositeBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error

func (*CompositeBackend) UpdateStageApproval

func (b *CompositeBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error

func (*CompositeBackend) UpdateStageError

func (b *CompositeBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error

func (*CompositeBackend) UpdateStageMetrics

func (b *CompositeBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error

func (*CompositeBackend) UpdateStageSession

func (b *CompositeBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error

func (*CompositeBackend) UpdateStageStatus

func (b *CompositeBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error

func (*CompositeBackend) UpdateTask

func (b *CompositeBackend) UpdateTask(ctx context.Context, t *Task) error

func (*CompositeBackend) UpdateToolEnd

func (b *CompositeBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error

func (*CompositeBackend) UpdateWorkspace

func (b *CompositeBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error

func (*CompositeBackend) UpsertSession

func (b *CompositeBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error

func (*CompositeBackend) UpsertSessionWithCorrelation

func (b *CompositeBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error

type CompositeConfig

type CompositeConfig struct {
	Local   Backend   // Required: local storage backend
	Remotes []Backend // Optional: remote backends for trace queries
}

CompositeConfig contains configuration for composite backend.

type ControlPlaneFilter

type ControlPlaneFilter struct {
	SourceType string // eval, coordinator, direct_api, local, other
	Provider   string // claude, gemini, openai, etc.
	Model      string // claude-sonnet-4-5, gemini-2-5-pro, etc.
	Workspace  string // workspace ID
	StartDate  string // YYYY-MM-DD format for time range filter (inclusive)
	EndDate    string // YYYY-MM-DD format for time range filter (inclusive)
}

ControlPlaneFilter defines filter parameters for Control Plane queries.

func (*ControlPlaneFilter) HasTimeRange

func (f *ControlPlaneFilter) HasTimeRange() bool

HasTimeRange returns true if a time range filter is set.

func (*ControlPlaneFilter) IsEmpty

func (f *ControlPlaneFilter) IsEmpty() bool

IsEmpty returns true if no filters are set.
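
A minimal sketch of the two predicates follows, using a local mirror of the documented fields. It assumes that either StartDate or EndDate alone counts as a time range and that an empty string means "no filter"; the package's actual rules are not stated in this documentation.

```go
package main

import "fmt"

// filter mirrors the documented ControlPlaneFilter fields.
type filter struct {
	SourceType, Provider, Model, Workspace string
	StartDate, EndDate                     string // YYYY-MM-DD, inclusive
}

// hasTimeRange assumes a single bound is enough to count as a time range.
func (f *filter) hasTimeRange() bool {
	return f.StartDate != "" || f.EndDate != ""
}

// isEmpty reports whether every field is unset.
func (f *filter) isEmpty() bool {
	return f.SourceType == "" && f.Provider == "" && f.Model == "" &&
		f.Workspace == "" && !f.hasTimeRange()
}

func main() {
	f := &filter{Provider: "claude", StartDate: "2026-05-01"}
	fmt.Println(f.isEmpty(), f.hasTimeRange()) // prints: false true
}
```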

type CumulativePoint

type CumulativePoint struct {
	SpanIndex    int       `json:"span_index"`
	SpanName     string    `json:"span_name"`
	Timestamp    time.Time `json:"timestamp"`
	Value        float64   `json:"value"`         // Value of this span
	Cumulative   float64   `json:"cumulative"`    // Running total up to this span
	DeltaPercent float64   `json:"delta_percent"` // This span's contribution as % of final total
}

CumulativePoint represents a point in the cumulative progression.
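
The relationship between Value, Cumulative, and DeltaPercent can be illustrated as follows. The point type and cumulative function are hypothetical; the sketch assumes DeltaPercent is Value divided by the final total, times 100, as the field comment suggests.

```go
package main

import "fmt"

// point mirrors the numeric fields of CumulativePoint.
type point struct {
	SpanIndex    int
	Value        float64
	Cumulative   float64
	DeltaPercent float64
}

// cumulative builds the running total and each span's share of the final total.
func cumulative(values []float64) []point {
	var total float64
	for _, v := range values {
		total += v
	}
	pts := make([]point, len(values))
	var running float64
	for i, v := range values {
		running += v
		pts[i] = point{SpanIndex: i, Value: v, Cumulative: running}
		if total != 0 { // avoid dividing by zero when all values are 0
			pts[i].DeltaPercent = v / total * 100
		}
	}
	return pts
}

func main() {
	for _, p := range cumulative([]float64{1, 3, 4}) {
		fmt.Printf("%d value=%.1f cumulative=%.1f delta=%.1f%%\n",
			p.SpanIndex, p.Value, p.Cumulative, p.DeltaPercent)
	}
	// prints:
	// 0 value=1.0 cumulative=1.0 delta=12.5%
	// 1 value=3.0 cumulative=4.0 delta=37.5%
	// 2 value=4.0 cumulative=8.0 delta=50.0%
}
```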

type EvalAssessment

type EvalAssessment struct {
	// Identity
	BenchmarkID string `json:"benchmark_id"`
	Model       string `json:"model"`
	Language    string `json:"language"`
	Condition   string `json:"condition,omitempty"`
	EvalMode    string `json:"eval_mode"`          // "agent" or "standard"
	Executor    string `json:"executor,omitempty"` // "claude", "gemini"
	Seed        int64  `json:"seed"`

	// Assessment results
	CompileOk     bool   `json:"compile_ok"`
	RuntimeOk     bool   `json:"runtime_ok"`
	StdoutOk      bool   `json:"stdout_ok"`
	ErrorCategory string `json:"error_category"`

	// Self-repair
	FirstAttemptOk bool   `json:"first_attempt_ok"`
	RepairUsed     bool   `json:"repair_used"`
	RepairOk       bool   `json:"repair_ok"`
	ErrCode        string `json:"err_code,omitempty"`

	// Contract verification
	VerifyOk        bool `json:"verify_ok"`
	VerifyVerified  int  `json:"verify_verified"`
	VerifyCounterex int  `json:"verify_counterexample"`
	VerifySkipped   int  `json:"verify_skipped"`
	VerifyErrors    int  `json:"verify_errors"`

	// Reproducibility
	PromptVersion string `json:"prompt_version,omitempty"`
	CodeHash      string `json:"code_hash,omitempty"`

	// Output (truncated for storage efficiency)
	Code           string `json:"code,omitempty"`
	Stdout         string `json:"stdout,omitempty"`
	ExpectedStdout string `json:"expected_stdout,omitempty"`
	Stderr         string `json:"stderr,omitempty"`
}

EvalAssessment stores structured evaluation results for agent benchmark stages. Stored as JSON in chain_stages.eval_assessment column (M-EVAL-CHAINS).

type EvalQueryOptions

type EvalQueryOptions struct {
	ChainID     string // Filter by chain
	Model       string // Filter by model
	Language    string // Filter by language
	BenchmarkID string // Filter by benchmark
	Condition   string // Filter by experimental condition
	EvalMode    string // "standard" or "agent"
	SuccessOnly bool   // Only passing benchmarks (stdout_ok = true)
	FailureOnly bool   // Only failing benchmarks (stdout_ok = false)
	Limit       int
}

EvalQueryOptions specifies filters for querying eval assessment stages.
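
To make the filter semantics concrete, the sketch below applies a subset of these options to an in-memory slice. The types and the filterAssessments function are hypothetical; it assumes an empty string means "no filter" and a Limit of 0 means "no limit". The package itself presumably applies these filters in its storage queries rather than in memory.

```go
package main

import "fmt"

// assessment mirrors a few EvalAssessment fields.
type assessment struct {
	BenchmarkID string
	Model       string
	StdoutOk    bool
}

// queryOptions mirrors a subset of EvalQueryOptions.
type queryOptions struct {
	Model       string
	SuccessOnly bool // keep only stdout_ok = true
	FailureOnly bool // keep only stdout_ok = false
	Limit       int
}

// filterAssessments keeps the entries that satisfy every set option.
func filterAssessments(all []assessment, o queryOptions) []assessment {
	var out []assessment
	for _, a := range all {
		if o.Model != "" && a.Model != o.Model {
			continue
		}
		if o.SuccessOnly && !a.StdoutOk {
			continue
		}
		if o.FailureOnly && a.StdoutOk {
			continue
		}
		out = append(out, a)
		if o.Limit > 0 && len(out) == o.Limit {
			break
		}
	}
	return out
}

func main() {
	all := []assessment{
		{BenchmarkID: "b1", Model: "m1", StdoutOk: true},
		{BenchmarkID: "b2", Model: "m1", StdoutOk: false},
		{BenchmarkID: "b3", Model: "m2", StdoutOk: true},
	}
	for _, a := range filterAssessments(all, queryOptions{Model: "m1", FailureOnly: true}) {
		fmt.Println(a.BenchmarkID) // prints: b2
	}
}
```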

type Event

type Event struct {
	Type      WSEventType `json:"type"`
	Timestamp time.Time   `json:"timestamp"`
	Data      any         `json:"data"`
}

Event represents a real-time event for WebSocket broadcast.

type EventType

type EventType string

EventType represents the type of span event.

const (
	EventTypeApproval EventType = "approval"
	EventTypeTool     EventType = "tool"
	EventTypeError    EventType = "error"
	EventTypeCustom   EventType = "custom"
)

type ExecHierarchyWithMessages

type ExecHierarchyWithMessages struct {
	Messages []*MessageNode  `json:"messages,omitempty"` // Messages that triggered execs
	Orphan   []*ExecTaskNode `json:"orphan,omitempty"`   // Execs without a triggering message
	Count    int             `json:"count"`
}

ExecHierarchyWithMessages groups exec tasks by their triggering messages.

type ExecTaskNode

type ExecTaskNode struct {
	TaskID       string          `json:"task_id"`
	ParentTaskID string          `json:"parent_task_id"`
	Command      string          `json:"command"`   // exec, run, check, turn, tool_use
	Provider     string          `json:"provider"`  // for exec: claude, gemini, etc.
	Workspace    string          `json:"workspace"` // for exec
	FilePath     string          `json:"file_path"` // for run, check
	Status       string          `json:"status"`
	StartTime    *time.Time      `json:"start_time,omitempty"`
	DurationMs   int             `json:"duration_ms,omitempty"`
	Children     []*ExecTaskNode `json:"children,omitempty"`
	// Turn/tool specific fields
	TurnNumber  int    `json:"turn_number,omitempty"`  // for turn spans
	ToolName    string `json:"tool_name,omitempty"`    // for tool_use spans
	ToolInput   string `json:"tool_input,omitempty"`   // for tool_use spans
	ToolOutput  string `json:"tool_output,omitempty"`  // for tool_use spans
	DisplayName string `json:"display_name,omitempty"` // enriched name from session_tools (e.g., "Read: /path/file.go")
}

ExecTaskNode represents a node in the exec task hierarchy.

type ExecutionChain

type ExecutionChain struct {
	ID string `json:"id"` // UUID

	// Source (what triggered this chain)
	SourceType        ChainSourceType `json:"source_type"`
	SourceRef         string          `json:"source_ref,omitempty"`
	GitHubRepo        string          `json:"github_repo,omitempty"`
	GitHubIssueNumber int             `json:"github_issue_number,omitempty"`

	// Current state
	Status       ChainStatus `json:"status"`
	CurrentStage int         `json:"current_stage"`

	// Workspace context
	WorkspaceID   string `json:"workspace_id,omitempty"`
	WorkspacePath string `json:"workspace_path,omitempty"`

	// Timestamps
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   *time.Time `json:"updated_at,omitempty"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`

	// Summary metrics (denormalized for quick queries)
	TotalCost       float64 `json:"total_cost"`
	TotalTokens     int     `json:"total_tokens"`
	TotalTurns      int     `json:"total_turns"`
	StagesCompleted int     `json:"stages_completed"`

	// Stages (populated on read)
	Stages []*ChainStage `json:"stages,omitempty"`
}

ExecutionChain represents a complete execution flow from source to completion. This is the top-level hierarchy: Issue -> Message -> Task -> Session -> Traces

func (*ExecutionChain) ChainEnvironmentVars

func (c *ExecutionChain) ChainEnvironmentVars(stageID, taskID, messageID string) map[string]string

ChainEnvironmentVars returns the environment variables to pass to executors for write-time linking of spans to this chain/stage.

type FilterPattern

type FilterPattern struct {
	Type    string // "prefix", "exact", "suffix"
	Pattern string // The pattern to match against span name
	Service string // Optional: only match spans from this service (service.name)
}

FilterPattern defines a single span filter rule.
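
One plausible way to evaluate such a rule against a span is sketched below. The pattern type and matches method are hypothetical; the sketch assumes an empty Service matches any service and that an unrecognized Type matches nothing. Whether a match includes or excludes the span is not stated in this documentation.

```go
package main

import (
	"fmt"
	"strings"
)

// pattern mirrors the documented FilterPattern fields.
type pattern struct {
	Type    string // "prefix", "exact", "suffix"
	Pattern string
	Service string // optional service.name restriction
}

// matches reports whether the rule applies to the given span name and service.
func (p pattern) matches(spanName, service string) bool {
	if p.Service != "" && p.Service != service {
		return false
	}
	switch p.Type {
	case "prefix":
		return strings.HasPrefix(spanName, p.Pattern)
	case "exact":
		return spanName == p.Pattern
	case "suffix":
		return strings.HasSuffix(spanName, p.Pattern)
	}
	return false // assumption: unknown types never match
}

func main() {
	p := pattern{Type: "prefix", Pattern: "http.", Service: "gateway"}
	fmt.Println(p.matches("http.request", "gateway")) // prints: true
	fmt.Println(p.matches("http.request", "worker"))  // prints: false
}
```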

type GCPConfig

type GCPConfig struct {
	ProjectID       string
	CredentialsPath string        // optional - uses default credentials if empty
	CacheTTL        time.Duration // Cache TTL for trace results (default: 60s)
}

GCPConfig contains configuration for GCP Trace backend.

type GCPTraceBackend

type GCPTraceBackend struct {
	// contains filtered or unexported fields
}

GCPTraceBackend implements Backend using Google Cloud Trace. This is a read-only backend for querying traces stored in GCP. Results are cached for CacheTTL duration to avoid quota exhaustion.

func NewGCPTraceBackend

func NewGCPTraceBackend(config GCPConfig) (*GCPTraceBackend, error)

NewGCPTraceBackend creates a new GCP Trace backend.

func (*GCPTraceBackend) BackfillSpansWorkspace

func (b *GCPTraceBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)

func (*GCPTraceBackend) Close

func (b *GCPTraceBackend) Close() error

Close closes the GCP client.

func (*GCPTraceBackend) CountChatMessages

func (b *GCPTraceBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)

func (*GCPTraceBackend) CreateAgentAssignment

func (b *GCPTraceBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*GCPTraceBackend) CreateChain

func (b *GCPTraceBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)

func (*GCPTraceBackend) CreateMessage

func (b *GCPTraceBackend) CreateMessage(ctx context.Context, m *Message) error

func (*GCPTraceBackend) CreateMetric

func (b *GCPTraceBackend) CreateMetric(ctx context.Context, m *Metric) error

func (*GCPTraceBackend) CreateSpan

func (b *GCPTraceBackend) CreateSpan(ctx context.Context, span *Span) error

func (*GCPTraceBackend) CreateSpanEvent

func (b *GCPTraceBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error

func (*GCPTraceBackend) CreateStage

func (b *GCPTraceBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)

func (*GCPTraceBackend) CreateTask

func (b *GCPTraceBackend) CreateTask(ctx context.Context, t *Task) error

func (*GCPTraceBackend) CreateWorkspace

func (b *GCPTraceBackend) CreateWorkspace(ctx context.Context, w *Workspace) error

func (*GCPTraceBackend) DeleteAgentAssignment

func (b *GCPTraceBackend) DeleteAgentAssignment(ctx context.Context, id string) error

func (*GCPTraceBackend) DeleteMessage

func (b *GCPTraceBackend) DeleteMessage(ctx context.Context, id string) error

func (*GCPTraceBackend) DeleteSpan

func (b *GCPTraceBackend) DeleteSpan(ctx context.Context, id string) error

func (*GCPTraceBackend) DeleteSpanEvent

func (b *GCPTraceBackend) DeleteSpanEvent(ctx context.Context, id int64) error

func (*GCPTraceBackend) DeleteTask

func (b *GCPTraceBackend) DeleteTask(ctx context.Context, id string) error

func (*GCPTraceBackend) DeleteWorkspace

func (b *GCPTraceBackend) DeleteWorkspace(ctx context.Context, id string) error

func (*GCPTraceBackend) FindLatestUnfinishedTool

func (b *GCPTraceBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)

func (*GCPTraceBackend) GetAgentAssignment

func (b *GCPTraceBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)

func (*GCPTraceBackend) GetAgentStats

func (b *GCPTraceBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)

func (*GCPTraceBackend) GetChain

func (*GCPTraceBackend) GetChainByGitHubIssue

func (b *GCPTraceBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)

func (*GCPTraceBackend) GetChainByMessageID

func (b *GCPTraceBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)

func (*GCPTraceBackend) GetChainByTaskID

func (b *GCPTraceBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)

func (*GCPTraceBackend) GetChainStages

func (b *GCPTraceBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)

func (*GCPTraceBackend) GetChainStats

func (b *GCPTraceBackend) GetChainStats(ctx context.Context) (*ChainStats, error)

func (*GCPTraceBackend) GetChainStatsByAgent

func (b *GCPTraceBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)

func (*GCPTraceBackend) GetChainStatusCounts

func (b *GCPTraceBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)

func (*GCPTraceBackend) GetChatMessagesBySession

func (b *GCPTraceBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)

func (*GCPTraceBackend) GetChatMessagesByTaskID

func (b *GCPTraceBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)

func (*GCPTraceBackend) GetExecTaskHierarchy

func (b *GCPTraceBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)

func (*GCPTraceBackend) GetExecTaskHierarchyWithMessages

func (b *GCPTraceBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)

func (*GCPTraceBackend) GetMessage

func (b *GCPTraceBackend) GetMessage(ctx context.Context, id string) (*Message, error)

func (*GCPTraceBackend) GetMetricsSummary

func (b *GCPTraceBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)

func (*GCPTraceBackend) GetProviderComparison

func (b *GCPTraceBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)

func (*GCPTraceBackend) GetSession

func (b *GCPTraceBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)

func (*GCPTraceBackend) GetSessionMetricsSummary

func (b *GCPTraceBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)

func (*GCPTraceBackend) GetSessionTools

func (b *GCPTraceBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)

func (*GCPTraceBackend) GetSessionWorkspace

func (b *GCPTraceBackend) GetSessionWorkspace(sessionID string) (string, error)

func (*GCPTraceBackend) GetSpan

func (b *GCPTraceBackend) GetSpan(ctx context.Context, id string) (*Span, error)

func (*GCPTraceBackend) GetSpanEvents

func (b *GCPTraceBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)

func (*GCPTraceBackend) GetSpanHierarchy

func (b *GCPTraceBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)

func (*GCPTraceBackend) GetSpanLitesByStageID

func (b *GCPTraceBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)

func (*GCPTraceBackend) GetSpansByStageID

func (b *GCPTraceBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)

func (*GCPTraceBackend) GetStage

func (b *GCPTraceBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)

func (*GCPTraceBackend) GetTask

func (b *GCPTraceBackend) GetTask(ctx context.Context, id string) (*Task, error)

func (*GCPTraceBackend) GetTaskTimeline

func (b *GCPTraceBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)

func (*GCPTraceBackend) GetToolForSpan

func (b *GCPTraceBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)

func (*GCPTraceBackend) GetToolsByTimestampRange

func (b *GCPTraceBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)

func (*GCPTraceBackend) GetTrace

func (b *GCPTraceBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)

func (*GCPTraceBackend) GetWorkspace

func (b *GCPTraceBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)

func (*GCPTraceBackend) GetWorkspaceStats

func (b *GCPTraceBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)

func (*GCPTraceBackend) InsertToolStart

func (b *GCPTraceBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error

func (*GCPTraceBackend) LinkOrphanedSpansBySession

func (b *GCPTraceBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)

LinkOrphanedSpansBySession is not supported by the GCP backend (session correlation is local only).

func (*GCPTraceBackend) LinkSpanToChain

func (b *GCPTraceBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error

func (*GCPTraceBackend) ListAgentAssignments

func (b *GCPTraceBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)

func (*GCPTraceBackend) ListChains

func (b *GCPTraceBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)

func (*GCPTraceBackend) ListMessages

func (b *GCPTraceBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)

func (*GCPTraceBackend) ListMetrics

func (b *GCPTraceBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)

func (*GCPTraceBackend) ListPendingApprovals

func (b *GCPTraceBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)

func (*GCPTraceBackend) ListSpans

func (b *GCPTraceBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)

func (*GCPTraceBackend) ListTasks

func (b *GCPTraceBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)

func (*GCPTraceBackend) ListTraces

func (b *GCPTraceBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)

func (*GCPTraceBackend) ListWorkspaces

func (b *GCPTraceBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)

func (*GCPTraceBackend) LookupTaskBySessionID

func (b *GCPTraceBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)

LookupTaskBySessionID is not supported by the GCP backend (session correlation is local only).

func (*GCPTraceBackend) MarkMessageArchived

func (b *GCPTraceBackend) MarkMessageArchived(ctx context.Context, id string) error

func (*GCPTraceBackend) MarkMessageRead

func (b *GCPTraceBackend) MarkMessageRead(ctx context.Context, id string) error

func (*GCPTraceBackend) RecalculateTaskAggregates

func (b *GCPTraceBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error

func (*GCPTraceBackend) UpdateAgentAssignment

func (b *GCPTraceBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*GCPTraceBackend) UpdateChainMetrics

func (b *GCPTraceBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error

func (*GCPTraceBackend) UpdateChainStatus

func (b *GCPTraceBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error

func (*GCPTraceBackend) UpdateMessage

func (b *GCPTraceBackend) UpdateMessage(ctx context.Context, m *Message) error

func (*GCPTraceBackend) UpdateSessionEnded

func (b *GCPTraceBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error

func (*GCPTraceBackend) UpdateSpan

func (b *GCPTraceBackend) UpdateSpan(ctx context.Context, span *Span) error

func (*GCPTraceBackend) UpdateSpanLinks

func (b *GCPTraceBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error

func (*GCPTraceBackend) UpdateStageApproval

func (b *GCPTraceBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error

func (*GCPTraceBackend) UpdateStageError

func (b *GCPTraceBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error

func (*GCPTraceBackend) UpdateStageMetrics

func (b *GCPTraceBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error

func (*GCPTraceBackend) UpdateStageSession

func (b *GCPTraceBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error

func (*GCPTraceBackend) UpdateStageStatus

func (b *GCPTraceBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error

func (*GCPTraceBackend) UpdateTask

func (b *GCPTraceBackend) UpdateTask(ctx context.Context, t *Task) error

func (*GCPTraceBackend) UpdateToolEnd

func (b *GCPTraceBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error

func (*GCPTraceBackend) UpdateWorkspace

func (b *GCPTraceBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error

func (*GCPTraceBackend) UpsertSession

func (b *GCPTraceBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error

func (*GCPTraceBackend) UpsertSessionWithCorrelation

func (b *GCPTraceBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error

type GeminiOTELSpan

type GeminiOTELSpan struct {
	TraceID           string        `json:"traceId"`
	SpanID            string        `json:"spanId"`
	ParentSpanID      string        `json:"parentSpanId,omitempty"`
	Name              string        `json:"name"`
	Kind              int           `json:"kind"` // OTEL span kind enum
	StartTimeUnixNano int64         `json:"startTimeUnixNano"`
	EndTimeUnixNano   int64         `json:"endTimeUnixNano"`
	Attributes        []KeyValue    `json:"attributes,omitempty"`
	Status            *OTELStatus   `json:"status,omitempty"`
	Events            []OTELEvent   `json:"events,omitempty"`
	Resource          *OTELResource `json:"resource,omitempty"`
}

GeminiOTELSpan represents an OTEL span from Gemini CLI. Gemini exports full OTEL traces with parent-child relationships.

func ParseGeminiTraceJSON

func ParseGeminiTraceJSON(data []byte) ([]*GeminiOTELSpan, error)

ParseGeminiTraceJSON parses Gemini OTEL trace from JSON.
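As a rough illustration of the decoding step, the sketch below unmarshals spans into a local struct that mirrors a subset of the GeminiOTELSpan JSON tags. It assumes the input is a plain JSON array of spans with numeric timestamps; the envelope that ParseGeminiTraceJSON actually accepts is not documented here, and the names geminiSpan, decodeSpans, and duration, along with the sample payload, are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// geminiSpan mirrors a subset of the GeminiOTELSpan fields and JSON tags
// listed above. It is a local stand-in, not the package type.
type geminiSpan struct {
	TraceID           string `json:"traceId"`
	SpanID            string `json:"spanId"`
	ParentSpanID      string `json:"parentSpanId,omitempty"`
	Name              string `json:"name"`
	Kind              int    `json:"kind"`
	StartTimeUnixNano int64  `json:"startTimeUnixNano"`
	EndTimeUnixNano   int64  `json:"endTimeUnixNano"`
}

// decodeSpans assumes the input is a JSON array of spans; the real
// envelope accepted by ParseGeminiTraceJSON may differ.
func decodeSpans(data []byte) ([]*geminiSpan, error) {
	var spans []*geminiSpan
	if err := json.Unmarshal(data, &spans); err != nil {
		return nil, fmt.Errorf("decode spans: %w", err)
	}
	return spans, nil
}

// duration derives the span duration from the nanosecond timestamps.
func duration(s *geminiSpan) time.Duration {
	return time.Duration(s.EndTimeUnixNano - s.StartTimeUnixNano)
}

func main() {
	// Hypothetical payload for illustration only.
	data := []byte(`[{"traceId":"t1","spanId":"s1","name":"root","kind":1,
		"startTimeUnixNano":1000000000,"endTimeUnixNano":1250000000}]`)
	spans, err := decodeSpans(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(spans[0].Name, duration(spans[0]))
}
```

An empty ParentSpanID after decoding would mark a root span, which is how the parent-child relationships mentioned above can be recovered.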

type HealthAction

type HealthAction string

HealthAction describes what action to take based on DB size.

const (
	HealthOK      HealthAction = "ok"
	HealthWarn    HealthAction = "warn"
	HealthCleanup HealthAction = "cleanup"
	HealthDanger  HealthAction = "danger"
)

type HeatmapDataPoint

type HeatmapDataPoint struct {
	Date        string  `json:"date"`         // YYYY-MM-DD
	SpanCount   int     `json:"span_count"`   // Number of spans
	TaskCount   int     `json:"task_count"`   // Number of distinct tasks
	Cost        float64 `json:"cost"`         // Total cost USD
	SuccessRate float64 `json:"success_rate"` // 0.0 to 1.0
	TokensIn    int64   `json:"tokens_in"`
	TokensOut   int64   `json:"tokens_out"`
}

HeatmapDataPoint represents activity data for a single day.

type HierarchicalSpan

type HierarchicalSpan struct {
	ID           string              `json:"id"`
	TraceID      string              `json:"trace_id"`
	ParentSpanID string              `json:"parent_span_id,omitempty"`
	Name         string              `json:"name"`
	DisplayName  string              `json:"display_name,omitempty"`
	StartTime    time.Time           `json:"start_time"`
	EndTime      *time.Time          `json:"end_time,omitempty"`
	DurationMs   int64               `json:"duration_ms"`
	Status       SpanStatus          `json:"status"`
	Attributes   map[string]any      `json:"attributes,omitempty"`
	Provider     Provider            `json:"provider,omitempty"`
	Model        string              `json:"model,omitempty"`
	TokensIn     int64               `json:"tokens_in,omitempty"`
	TokensOut    int64               `json:"tokens_out,omitempty"`
	CostUSD      float64             `json:"cost_usd,omitempty"`
	TaskID       string              `json:"task_id,omitempty"`
	ChatContext  *ChatContext        `json:"chat_context,omitempty"`
	Children     []*HierarchicalSpan `json:"children,omitempty"`
}

HierarchicalSpan represents a span with children for hierarchical JSON response. This type is used by the /spans/enriched?hierarchical=true endpoint.
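The nesting implied by ParentSpanID and Children can be sketched as follows. This is a minimal illustration using a local node type with only four of the fields; treating spans whose parent is missing from the input as roots is an assumption of the sketch, not documented behavior of the endpoint.

```go
package main

import "fmt"

// node mirrors only the ID, ParentSpanID, Name, and Children fields of
// HierarchicalSpan; it is a local stand-in for illustration.
type node struct {
	ID           string
	ParentSpanID string
	Name         string
	Children     []*node
}

// buildTree links each span to its parent and returns the roots.
// Spans with no parent, or whose parent is absent from the input,
// are treated as roots.
func buildTree(flat []*node) []*node {
	byID := make(map[string]*node, len(flat))
	for _, n := range flat {
		byID[n.ID] = n
	}
	var roots []*node
	for _, n := range flat {
		parent, ok := byID[n.ParentSpanID]
		if n.ParentSpanID == "" || !ok {
			roots = append(roots, n)
			continue
		}
		parent.Children = append(parent.Children, n)
	}
	return roots
}

// depth returns the height of the tree rooted at n (1 for a leaf).
func depth(n *node) int {
	deepest := 0
	for _, c := range n.Children {
		if d := depth(c); d > deepest {
			deepest = d
		}
	}
	return deepest + 1
}

func main() {
	flat := []*node{
		{ID: "a", Name: "task"},
		{ID: "b", ParentSpanID: "a", Name: "llm call"},
		{ID: "c", ParentSpanID: "b", Name: "tool use"},
	}
	roots := buildTree(flat)
	fmt.Println(len(roots), depth(roots[0]))
}
```

The sketch does not guard against a span that names itself as its parent; real input would need that check before recursing.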

type HierarchyOptions

type HierarchyOptions struct {
	// MaxDepth limits the span tree depth (0 = unlimited)
	MaxDepth int
	// IncludeSpans controls whether to include individual spans
	IncludeSpans bool
	// Workspace filters spans to only include those belonging to this workspace path
	// This prevents cross-workspace span bleeding in hierarchy views
	Workspace string
	// WorkspaceID filters spans by workspace_id directly (set automatically from task)
	WorkspaceID string
}

HierarchyOptions configures the hierarchy query.

func DefaultHierarchyOptions

func DefaultHierarchyOptions() HierarchyOptions

DefaultHierarchyOptions returns default options for hierarchy queries.

type HierarchyTraceSummary

type HierarchyTraceSummary struct {
	SpanCount    int     `json:"span_count"`
	TotalTokens  int64   `json:"total_tokens"`
	TotalCostUSD float64 `json:"total_cost_usd"`
	DurationMs   int64   `json:"duration_ms"`
	ErrorCount   int     `json:"error_count"`
}

HierarchyTraceSummary contains aggregate metrics for a trace in hierarchy view. This differs from TraceSummary in models.go, which is for trace list views.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub maintains the set of active clients and broadcasts events.

func NewHub

func NewHub() *Hub

NewHub creates a new WebSocket hub.

func (*Hub) Broadcast

func (h *Hub) Broadcast(event *Event)

Broadcast sends an event to all matching clients.

func (*Hub) BroadcastApprovalDecision

func (h *Hub) BroadcastApprovalDecision(data any)

BroadcastApprovalDecision sends an approval.decision event.

func (*Hub) BroadcastApprovalRequested

func (h *Hub) BroadcastApprovalRequested(data any)

BroadcastApprovalRequested sends an approval.requested event.

func (*Hub) BroadcastMessageCreated

func (h *Hub) BroadcastMessageCreated(msg *Message)

BroadcastMessageCreated sends a message.created event.

func (*Hub) BroadcastMetricsUpdated

func (h *Hub) BroadcastMetricsUpdated(summary *MetricsSummary)

BroadcastMetricsUpdated sends a metrics.updated event.

func (*Hub) BroadcastSpanCreated

func (h *Hub) BroadcastSpanCreated(span *Span)

BroadcastSpanCreated sends a span.created event.

func (*Hub) BroadcastSpanUpdated

func (h *Hub) BroadcastSpanUpdated(span *Span)

BroadcastSpanUpdated sends a span.updated event.

func (*Hub) BroadcastTaskCompleted

func (h *Hub) BroadcastTaskCompleted(task *Task)

BroadcastTaskCompleted sends a task.completed event.

func (*Hub) BroadcastTaskCreated

func (h *Hub) BroadcastTaskCreated(task *Task)

BroadcastTaskCreated sends a task.created event.

func (*Hub) BroadcastTaskUpdated

func (h *Hub) BroadcastTaskUpdated(task *Task)

BroadcastTaskUpdated sends a task.updated event.

func (*Hub) ClientCount

func (h *Hub) ClientCount() int

ClientCount returns the number of connected clients.

func (*Hub) HandleWebSocket

func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request)

HandleWebSocket handles a WebSocket connection upgrade request.

func (*Hub) Run

func (h *Hub) Run()

Run starts the hub's main event loop.

func (*Hub) SetToken

func (h *Hub) SetToken(token string)

SetToken sets the authentication token for external WebSocket clients.

func (*Hub) Stop

func (h *Hub) Stop()

Stop gracefully shuts down the hub.

type JaegerBackend

type JaegerBackend struct {
	// contains filtered or unexported fields
}

JaegerBackend implements Backend using Jaeger for trace storage. This is a read-only backend for querying traces from Jaeger.

func NewJaegerBackend

func NewJaegerBackend(config JaegerConfig) (*JaegerBackend, error)

NewJaegerBackend creates a new Jaeger backend.

func (*JaegerBackend) BackfillSpansWorkspace

func (b *JaegerBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)

func (*JaegerBackend) Close

func (b *JaegerBackend) Close() error

Close closes the Jaeger client.

func (*JaegerBackend) CountChatMessages

func (b *JaegerBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)

func (*JaegerBackend) CreateAgentAssignment

func (b *JaegerBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*JaegerBackend) CreateChain

func (b *JaegerBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)

func (*JaegerBackend) CreateMessage

func (b *JaegerBackend) CreateMessage(ctx context.Context, m *Message) error

func (*JaegerBackend) CreateMetric

func (b *JaegerBackend) CreateMetric(ctx context.Context, m *Metric) error

func (*JaegerBackend) CreateSpan

func (b *JaegerBackend) CreateSpan(ctx context.Context, span *Span) error

func (*JaegerBackend) CreateSpanEvent

func (b *JaegerBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error

func (*JaegerBackend) CreateStage

func (b *JaegerBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)

func (*JaegerBackend) CreateTask

func (b *JaegerBackend) CreateTask(ctx context.Context, t *Task) error

func (*JaegerBackend) CreateWorkspace

func (b *JaegerBackend) CreateWorkspace(ctx context.Context, w *Workspace) error

func (*JaegerBackend) DeleteAgentAssignment

func (b *JaegerBackend) DeleteAgentAssignment(ctx context.Context, id string) error

func (*JaegerBackend) DeleteMessage

func (b *JaegerBackend) DeleteMessage(ctx context.Context, id string) error

func (*JaegerBackend) DeleteSpan

func (b *JaegerBackend) DeleteSpan(ctx context.Context, id string) error

func (*JaegerBackend) DeleteSpanEvent

func (b *JaegerBackend) DeleteSpanEvent(ctx context.Context, id int64) error

func (*JaegerBackend) DeleteTask

func (b *JaegerBackend) DeleteTask(ctx context.Context, id string) error

func (*JaegerBackend) DeleteWorkspace

func (b *JaegerBackend) DeleteWorkspace(ctx context.Context, id string) error

func (*JaegerBackend) FindLatestUnfinishedTool

func (b *JaegerBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)

func (*JaegerBackend) GetAgentAssignment

func (b *JaegerBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)

func (*JaegerBackend) GetAgentStats

func (b *JaegerBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)

func (*JaegerBackend) GetChain

func (b *JaegerBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)

func (*JaegerBackend) GetChainByGitHubIssue

func (b *JaegerBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)

func (*JaegerBackend) GetChainByMessageID

func (b *JaegerBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)

func (*JaegerBackend) GetChainByTaskID

func (b *JaegerBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)

func (*JaegerBackend) GetChainStages

func (b *JaegerBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)

func (*JaegerBackend) GetChainStats

func (b *JaegerBackend) GetChainStats(ctx context.Context) (*ChainStats, error)

func (*JaegerBackend) GetChainStatsByAgent

func (b *JaegerBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)

func (*JaegerBackend) GetChainStatusCounts

func (b *JaegerBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)

func (*JaegerBackend) GetChatMessagesBySession

func (b *JaegerBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)

func (*JaegerBackend) GetChatMessagesByTaskID

func (b *JaegerBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)

func (*JaegerBackend) GetExecTaskHierarchy

func (b *JaegerBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)

func (*JaegerBackend) GetExecTaskHierarchyWithMessages

func (b *JaegerBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)

func (*JaegerBackend) GetMessage

func (b *JaegerBackend) GetMessage(ctx context.Context, id string) (*Message, error)

func (*JaegerBackend) GetMetricsSummary

func (b *JaegerBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)

func (*JaegerBackend) GetProviderComparison

func (b *JaegerBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)

func (*JaegerBackend) GetSession

func (b *JaegerBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)

func (*JaegerBackend) GetSessionMetricsSummary

func (b *JaegerBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)

func (*JaegerBackend) GetSessionTools

func (b *JaegerBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)

func (*JaegerBackend) GetSessionWorkspace

func (b *JaegerBackend) GetSessionWorkspace(sessionID string) (string, error)

func (*JaegerBackend) GetSpan

func (b *JaegerBackend) GetSpan(ctx context.Context, id string) (*Span, error)

func (*JaegerBackend) GetSpanEvents

func (b *JaegerBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)

func (*JaegerBackend) GetSpanHierarchy

func (b *JaegerBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)

func (*JaegerBackend) GetSpanLitesByStageID

func (b *JaegerBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)

func (*JaegerBackend) GetSpansByStageID

func (b *JaegerBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)

func (*JaegerBackend) GetStage

func (b *JaegerBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)

func (*JaegerBackend) GetTask

func (b *JaegerBackend) GetTask(ctx context.Context, id string) (*Task, error)

func (*JaegerBackend) GetTaskTimeline

func (b *JaegerBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)

func (*JaegerBackend) GetToolForSpan

func (b *JaegerBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)

func (*JaegerBackend) GetToolsByTimestampRange

func (b *JaegerBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)

func (*JaegerBackend) GetTrace

func (b *JaegerBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)

func (*JaegerBackend) GetWorkspace

func (b *JaegerBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)

func (*JaegerBackend) GetWorkspaceStats

func (b *JaegerBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)

func (*JaegerBackend) InsertToolStart

func (b *JaegerBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error

func (*JaegerBackend) LinkOrphanedSpansBySession

func (b *JaegerBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)

LinkOrphanedSpansBySession is not supported by the Jaeger backend (session correlation is local only).

func (*JaegerBackend) LinkSpanToChain

func (b *JaegerBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error

func (*JaegerBackend) ListAgentAssignments

func (b *JaegerBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)

func (*JaegerBackend) ListChains

func (b *JaegerBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)

func (*JaegerBackend) ListMessages

func (b *JaegerBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)

func (*JaegerBackend) ListMetrics

func (b *JaegerBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)

func (*JaegerBackend) ListPendingApprovals

func (b *JaegerBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)

func (*JaegerBackend) ListSpans

func (b *JaegerBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)

func (*JaegerBackend) ListTasks

func (b *JaegerBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)

func (*JaegerBackend) ListTraces

func (b *JaegerBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)

func (*JaegerBackend) ListWorkspaces

func (b *JaegerBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)

func (*JaegerBackend) LookupTaskBySessionID

func (b *JaegerBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)

LookupTaskBySessionID is not supported by the Jaeger backend (session correlation is local only).

func (*JaegerBackend) MarkMessageArchived

func (b *JaegerBackend) MarkMessageArchived(ctx context.Context, id string) error

func (*JaegerBackend) MarkMessageRead

func (b *JaegerBackend) MarkMessageRead(ctx context.Context, id string) error

func (*JaegerBackend) RecalculateTaskAggregates

func (b *JaegerBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error

func (*JaegerBackend) UpdateAgentAssignment

func (b *JaegerBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*JaegerBackend) UpdateChainMetrics

func (b *JaegerBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error

func (*JaegerBackend) UpdateChainStatus

func (b *JaegerBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error

func (*JaegerBackend) UpdateMessage

func (b *JaegerBackend) UpdateMessage(ctx context.Context, m *Message) error

func (*JaegerBackend) UpdateSessionEnded

func (b *JaegerBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error

func (*JaegerBackend) UpdateSpan

func (b *JaegerBackend) UpdateSpan(ctx context.Context, span *Span) error

func (*JaegerBackend) UpdateSpanLinks

func (b *JaegerBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error

func (*JaegerBackend) UpdateStageApproval

func (b *JaegerBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error

func (*JaegerBackend) UpdateStageError

func (b *JaegerBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error

func (*JaegerBackend) UpdateStageMetrics

func (b *JaegerBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error

func (*JaegerBackend) UpdateStageSession

func (b *JaegerBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error

func (*JaegerBackend) UpdateStageStatus

func (b *JaegerBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error

func (*JaegerBackend) UpdateTask

func (b *JaegerBackend) UpdateTask(ctx context.Context, t *Task) error

func (*JaegerBackend) UpdateToolEnd

func (b *JaegerBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error

func (*JaegerBackend) UpdateWorkspace

func (b *JaegerBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error

func (*JaegerBackend) UpsertSession

func (b *JaegerBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error

func (*JaegerBackend) UpsertSessionWithCorrelation

func (b *JaegerBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error

type JaegerConfig

type JaegerConfig struct {
	Endpoint string // Jaeger query endpoint (e.g., http://localhost:16686)
	Username string // Optional basic auth
	Password string
}

JaegerConfig contains configuration for Jaeger backend.
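To show how the optional basic-auth fields might be applied, the sketch below builds an HTTP request from a local copy of the config. It is an assumption about usage, not the backend's implementation: jaegerConfig and newQueryRequest are hypothetical names, and the path passed in main is only an illustrative value.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"strings"
)

// jaegerConfig mirrors the JaegerConfig fields listed above.
type jaegerConfig struct {
	Endpoint string
	Username string
	Password string
}

// newQueryRequest builds a GET request against the configured endpoint and
// attaches basic auth only when a username is set.
func newQueryRequest(ctx context.Context, cfg jaegerConfig, path string) (*http.Request, error) {
	url := strings.TrimRight(cfg.Endpoint, "/") + path
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	if cfg.Username != "" {
		req.SetBasicAuth(cfg.Username, cfg.Password)
	}
	return req, nil
}

// newAnonymousRequest is a convenience wrapper for the no-auth case.
func newAnonymousRequest(endpoint, path string) (*http.Request, error) {
	return newQueryRequest(context.Background(), jaegerConfig{Endpoint: endpoint}, path)
}

func main() {
	cfg := jaegerConfig{Endpoint: "http://localhost:16686", Username: "reader", Password: "secret"}
	// The path is illustrative; no request is actually sent.
	req, err := newQueryRequest(context.Background(), cfg, "/api/services")
	if err != nil {
		panic(err)
	}
	user, _, ok := req.BasicAuth()
	fmt.Println(req.URL.String(), user, ok)
}
```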

type JourneyResponse

type JourneyResponse struct {
	ChainID string        `json:"chain_id"`
	Steps   []JourneyStep `json:"steps"`
	Summary string        `json:"summary"` // "Design doc approved -> Sprint planned -> Implementation in progress"
}

JourneyResponse is the pre-computed narrative for a chain's execution flow.

type JourneyStep

type JourneyStep struct {
	StageNumber    int     `json:"stage_number"`
	AgentID        string  `json:"agent_id"`
	Action         string  `json:"action"`                    // "Created design doc", "Planned sprint", etc.
	Status         string  `json:"status"`                    // completed, failed, running, etc.
	ApprovalStatus string  `json:"approval_status,omitempty"` // approved, rejected, pending
	Iteration      int     `json:"iteration"`
	Feedback       string  `json:"feedback,omitempty"`      // Truncated human feedback
	ErrorExcerpt   string  `json:"error_excerpt,omitempty"` // First line of error
	Cost           float64 `json:"cost"`
	DurationMs     int64   `json:"duration_ms"`
}

JourneyStep describes one step in the execution journey narrative.

type KeyValue

type KeyValue struct {
	Key   string     `json:"key"`
	Value ValueUnion `json:"value"`
}

KeyValue represents an OTEL attribute.

type Message

type Message struct {
	ID          string        `json:"id"`
	TaskID      string        `json:"task_id,omitempty"`
	Inbox       string        `json:"inbox"`
	FromAgent   string        `json:"from_agent"`
	Title       string        `json:"title"`
	Content     string        `json:"content"`
	MessageType string        `json:"message_type"`
	Status      MessageStatus `json:"status"`
	Priority    string        `json:"priority"`

	// GitHub sync
	GitHubIssueNumber int    `json:"github_issue_number,omitempty"`
	GitHubRepo        string `json:"github_repo,omitempty"`

	// Correlation
	CorrelationID string `json:"correlation_id,omitempty"`
	ReplyToID     string `json:"reply_to_id,omitempty"`

	CreatedAt  time.Time  `json:"created_at"`
	ReadAt     *time.Time `json:"read_at,omitempty"`
	ArchivedAt *time.Time `json:"archived_at,omitempty"`

	// Search
	ContentHash string `json:"content_hash,omitempty"`
}

Message represents an agent-to-agent message.

type MessageListOptions

type MessageListOptions struct {
	Inbox     string
	Status    MessageStatus
	TaskID    string
	FromAgent string
	Limit     int
	Offset    int
}

MessageListOptions configures message listing.

type MessageNode

type MessageNode struct {
	MessageID   string          `json:"message_id"`
	Title       string          `json:"title"`
	FromAgent   string          `json:"from_agent"`
	ToInbox     string          `json:"to_inbox"`
	MessageType string          `json:"message_type"`
	Status      string          `json:"status"`
	CreatedAt   *time.Time      `json:"created_at,omitempty"`
	Execs       []*ExecTaskNode `json:"execs,omitempty"`
}

MessageNode represents a message that triggered coordinator tasks.

type MessageStatus

type MessageStatus string

MessageStatus represents the status of a message.

const (
	MessageStatusUnread   MessageStatus = "unread"
	MessageStatusRead     MessageStatus = "read"
	MessageStatusArchived MessageStatus = "archived"
)

type Metric

type Metric struct {
	ID        int64  `json:"id,omitempty"`
	Name      string `json:"name"`
	Type      string `json:"metric_type"` // "counter", "gauge"
	SessionID string `json:"session_id,omitempty"`
	Workspace string `json:"workspace,omitempty"`
	Provider  string `json:"provider,omitempty"`

	// Denormalized labels for efficient queries
	LabelType     string `json:"label_type,omitempty"`     // "added"/"removed" for LOC
	LabelTool     string `json:"label_tool,omitempty"`     // tool name for tool metrics
	LabelDecision string `json:"label_decision,omitempty"` // "approved"/"rejected" for code_edit_tool
	LabelLanguage string `json:"label_language,omitempty"` // language for LOC
	LabelModel    string `json:"label_model,omitempty"`    // model name for model-specific metrics

	// Values (only one will be set based on metric type)
	ValueInt   int64   `json:"value_int,omitempty"`
	ValueFloat float64 `json:"value_float,omitempty"`

	// Full labels as JSON (for provider-specific data)
	Labels             map[string]any `json:"labels,omitempty"`
	ResourceAttributes map[string]any `json:"resource_attributes,omitempty"`

	Timestamp time.Time `json:"timestamp"`
	CreatedAt time.Time `json:"created_at"`
}

Metric represents an OTLP counter or gauge metric from Claude Code telemetry. Captures: lines_of_code.count, commit.count, pull_request.count, active_time.total, etc.

func (*Metric) LabelsJSON

func (m *Metric) LabelsJSON() string

LabelsJSON returns labels as a JSON string for storage.

func (*Metric) ResourceAttributesJSON

func (m *Metric) ResourceAttributesJSON() string

ResourceAttributesJSON returns resource attributes as a JSON string for storage.

type MetricListOptions

type MetricListOptions struct {
	SessionID string     `json:"session_id,omitempty"`
	Workspace string     `json:"workspace,omitempty"`
	Name      string     `json:"name,omitempty"` // e.g., "claude_code.lines_of_code.count"
	TimeRange *TimeRange `json:"time_range,omitempty"`
	Limit     int        `json:"limit,omitempty"`
	Offset    int        `json:"offset,omitempty"`
}

MetricListOptions specifies filters for listing metrics.

type MetricsSummary

type MetricsSummary struct {
	TotalWorkspaces int     `json:"total_workspaces"`
	TotalTasks      int     `json:"total_tasks"`
	TotalSpans      int     `json:"total_spans"`
	TotalAgents     int     `json:"total_agents"`
	TotalTokensIn   int64   `json:"total_tokens_in"`
	TotalTokensOut  int64   `json:"total_tokens_out"`
	TotalCostUSD    float64 `json:"total_cost_usd"`
	SuccessRate     float64 `json:"success_rate"`

	// Cache metrics (from spans)
	TotalCacheReadTokens     int64   `json:"total_cache_read_tokens,omitempty"`
	TotalCacheCreationTokens int64   `json:"total_cache_creation_tokens,omitempty"`
	CacheSavingsUSD          float64 `json:"cache_savings_usd,omitempty"`

	// Lines of Code metrics (from metrics table)
	LinesAdded   int64 `json:"lines_added,omitempty"`
	LinesRemoved int64 `json:"lines_removed,omitempty"`

	// Activity metrics (from metrics table)
	CommitCount      int64 `json:"commit_count,omitempty"`
	PullRequestCount int64 `json:"pull_request_count,omitempty"`
	ActiveTimeMs     int64 `json:"active_time_ms,omitempty"`

	// Session metrics
	TurnCount  int `json:"turn_count,omitempty"`
	ToolCalls  int `json:"tool_calls,omitempty"`
	ErrorCount int `json:"error_count,omitempty"`
}

MetricsSummary contains global metrics summary.

type OTELEvent

type OTELEvent struct {
	Name         string     `json:"name"`
	TimeUnixNano int64      `json:"timeUnixNano"`
	Attributes   []KeyValue `json:"attributes,omitempty"`
}

OTELEvent represents an OTEL span event.

type OTELResource

type OTELResource struct {
	Attributes []KeyValue `json:"attributes,omitempty"`
}

OTELResource represents OTEL resource attributes.

type OTELStatus

type OTELStatus struct {
	Code    int    `json:"code"` // 0=Unset, 1=OK, 2=Error
	Message string `json:"message,omitempty"`
}

OTELStatus represents OTEL span status.
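The numeric codes noted in the Code field comment (0=Unset, 1=OK, 2=Error) can be turned into readable labels with a small helper. The sketch below is illustrative only; statusName and isError are hypothetical names, and the label used for unknown codes is an assumption.

```go
package main

import "fmt"

// statusName maps the OTEL status codes documented on OTELStatus.Code
// to labels. Codes outside 0..2 are reported as unknown.
func statusName(code int) string {
	switch code {
	case 0:
		return "unset"
	case 1:
		return "ok"
	case 2:
		return "error"
	default:
		return fmt.Sprintf("unknown(%d)", code)
	}
}

// isError reports whether the code marks a failed span.
func isError(code int) bool {
	return code == 2
}

func main() {
	for _, code := range []int{0, 1, 2, 7} {
		fmt.Println(code, statusName(code), isError(code))
	}
}
```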

type OTLPReceiver

type OTLPReceiver struct {
	// contains filtered or unexported fields
}

OTLPReceiver receives spans via the standard OTLP HTTP protocol. This allows any OTEL-compatible exporter to send traces to the observatory.

func NewOTLPReceiver

func NewOTLPReceiver(backend Backend) *OTLPReceiver

NewOTLPReceiver creates a new OTLP receiver that stores spans in the backend.

func (*OTLPReceiver) RegisterRoutes

func (r *OTLPReceiver) RegisterRoutes(mux *http.ServeMux)

RegisterRoutes registers the OTLP HTTP endpoints on the given mux. Implements the OTLP/HTTP specification:

  - POST /v1/traces - Receive trace data (protobuf or JSON)
  - POST /v1/logs - Receive logs/events (for Claude Code events)
  - POST /v1/metrics - Receive metrics (for Claude Code metrics)
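The route layout described for RegisterRoutes can be sketched with the standard library alone. The handler body here is a placeholder that only enforces POST; the real receiver decodes the payload and stores it in a Backend. The names registerRoutes, newMux, and statusFor are hypothetical, and the requests are served in-process with httptest rather than over the network.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// registerRoutes wires the three OTLP/HTTP paths onto mux with a
// placeholder handler that accepts POST and rejects other methods.
func registerRoutes(mux *http.ServeMux) {
	for _, path := range []string{"/v1/traces", "/v1/logs", "/v1/metrics"} {
		mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
			if r.Method != http.MethodPost {
				w.Header().Set("Allow", http.MethodPost)
				http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
				return
			}
			w.WriteHeader(http.StatusOK)
		})
	}
}

// newMux returns a mux with the sketch routes registered.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	registerRoutes(mux)
	return mux
}

// statusFor sends a request through mux in-process and returns the status code.
func statusFor(mux *http.ServeMux, method, path string) int {
	req := httptest.NewRequest(method, path, strings.NewReader("{}"))
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	mux := newMux()
	fmt.Println(statusFor(mux, http.MethodPost, "/v1/traces"))
	fmt.Println(statusFor(mux, http.MethodGet, "/v1/logs"))
}
```

An exporter pointed at a server using such a mux would send its trace, log, and metric payloads to the three paths respectively.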

type OutlierAnalysis

type OutlierAnalysis struct {
	TaskID       string             `json:"task_id"`
	TaskTitle    string             `json:"task_title"`
	SpanCount    int                `json:"span_count"`
	Stats        []*TaskMetricStats `json:"stats"`                    // Per-metric statistics
	Outliers     []*SpanOutlier     `json:"outliers"`                 // Detected outliers (sorted by |z-score| desc)
	RateOfChange *RateAnalysis      `json:"rate_of_change,omitempty"` // Optional rate analysis
	Threshold    float64            `json:"threshold"`                // Z-score threshold used
	AnalyzedAt   time.Time          `json:"analyzed_at"`
}

OutlierAnalysis contains the full outlier analysis for a task.

func AnalyzeTaskOutliers

func AnalyzeTaskOutliers(ctx context.Context, backend Backend, taskID string, opts OutlierOptions) (*OutlierAnalysis, error)

AnalyzeTaskOutliers performs statistical outlier detection on spans within a task.

type OutlierOptions

type OutlierOptions struct {
	Threshold float64 // Z-score threshold (default: 2.0)
	Metric    string  // Filter to specific metric: "cost", "duration", "tokens", or "" for all
	ShowRate  bool    // Include rate-of-change analysis
	Limit     int     // Max outliers to return (default: 10)
}

OutlierOptions configures outlier detection.

func DefaultOutlierOptions

func DefaultOutlierOptions() OutlierOptions

DefaultOutlierOptions returns sensible defaults.
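For readers unfamiliar with z-score thresholds, here is a minimal sketch of the idea behind the Threshold and Limit options. It uses the population standard deviation and a strict greater-than comparison; whether the package uses population or sample deviation, or how it handles ties, is not stated in these docs. The names outlier and zScoreOutliers are hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// outlier records one flagged value and its z-score.
type outlier struct {
	Index  int
	Value  float64
	ZScore float64
}

// zScoreOutliers flags values whose |z-score| exceeds threshold and returns
// at most limit results (limit <= 0 means no cap), sorted by |z-score|
// in descending order.
func zScoreOutliers(values []float64, threshold float64, limit int) []outlier {
	n := float64(len(values))
	if n < 2 {
		return nil
	}
	var sum float64
	for _, v := range values {
		sum += v
	}
	mean := sum / n

	var sq float64
	for _, v := range values {
		sq += (v - mean) * (v - mean)
	}
	std := math.Sqrt(sq / n) // population standard deviation
	if std == 0 {
		return nil // all values identical: nothing can be an outlier
	}

	var out []outlier
	for i, v := range values {
		z := (v - mean) / std
		if math.Abs(z) > threshold {
			out = append(out, outlier{Index: i, Value: v, ZScore: z})
		}
	}
	sort.Slice(out, func(a, b int) bool {
		return math.Abs(out[a].ZScore) > math.Abs(out[b].ZScore)
	})
	if limit > 0 && len(out) > limit {
		out = out[:limit]
	}
	return out
}

func main() {
	// Nine similar span costs and one much larger one.
	costs := []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 10}
	for _, o := range zScoreOutliers(costs, 2.0, 10) {
		fmt.Printf("index=%d value=%.2f z=%.2f\n", o.Index, o.Value, o.ZScore)
	}
}
```

With very few spans the z-score is bounded (for the population deviation it cannot exceed the square root of n-1), so a threshold of 2.0 only becomes meaningful once a task has more than a handful of spans.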

type PendingApprovalInfo

type PendingApprovalInfo struct {
	ChainID        string       `json:"chain_id"`
	SourceType     string       `json:"source_type"`
	SourceRef      string       `json:"source_ref"`
	StageID        string       `json:"stage_id"`
	StageNumber    int          `json:"stage_number"`
	AgentID        string       `json:"agent_id"`
	ApprovalStatus string       `json:"approval_status"`
	ApprovalType   ApprovalType `json:"approval_type"`
	TaskID         string       `json:"task_id"`
	SessionID      string       `json:"session_id"`
	Cost           float64      `json:"cost"`
	Turns          int          `json:"turns"`
	StageCreated   time.Time    `json:"stage_created"`
}

PendingApprovalInfo contains approval info for dashboard views.

type Provider

type Provider string

Provider represents an AI provider.

const (
	ProviderClaude Provider = "claude"
	ProviderGemini Provider = "gemini"
	ProviderOllama Provider = "ollama"
)

type ProviderComparison

type ProviderComparison struct {
	Provider        Provider `json:"provider"`
	TotalExecutions int      `json:"total_executions"`
	TotalTokensIn   int64    `json:"total_tokens_in"`
	TotalTokensOut  int64    `json:"total_tokens_out"`
	TotalCost       float64  `json:"total_cost"`
	AvgDurationMs   float64  `json:"avg_duration_ms"`
	SuccessRate     float64  `json:"success_rate"`
}

ProviderComparison contains comparison metrics between providers.

type ProviderNormalizer

type ProviderNormalizer struct{}

ProviderNormalizer normalizes telemetry from different providers into Spans.

func NewProviderNormalizer

func NewProviderNormalizer() *ProviderNormalizer

NewProviderNormalizer creates a new normalizer.

func (*ProviderNormalizer) NormalizeClaudeMetrics

func (n *ProviderNormalizer) NormalizeClaudeMetrics(metrics *ClaudeMetrics) (*Span, error)

NormalizeClaudeMetrics converts Claude metrics into a Span with events.

func (*ProviderNormalizer) NormalizeGeminiSpan

func (n *ProviderNormalizer) NormalizeGeminiSpan(otelSpan *GeminiOTELSpan, taskID string) (*Span, error)

NormalizeGeminiSpan converts a Gemini OTEL span into our normalized Span.

func (*ProviderNormalizer) NormalizeGeminiTrace

func (n *ProviderNormalizer) NormalizeGeminiTrace(otelSpans []*GeminiOTELSpan, taskID string) ([]*Span, error)

NormalizeGeminiTrace converts a complete Gemini OTEL trace into normalized Spans.

type Range

type Range struct {
	Min int
	Max int
}

Range represents a min/max range for random generation.

type RateAnalysis

type RateAnalysis struct {
	CumulativeCost     []CumulativePoint `json:"cumulative_cost"`
	CumulativeTokens   []CumulativePoint `json:"cumulative_tokens"`
	CumulativeDuration []CumulativePoint `json:"cumulative_duration"`
}

RateAnalysis shows how metrics accumulated during task execution.
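To illustrate what a cumulative series looks like, the sketch below turns per-span values into running totals. The point type is a hypothetical stand-in for CumulativePoint, whose fields are not shown in this section, and cumulative is not a function of this package.

```go
package main

import "fmt"

// point is a hypothetical stand-in for CumulativePoint.
type point struct {
	Index int     // position of the span in execution order
	Total float64 // running total up to and including this span
}

// cumulative converts per-span values into a running-total series.
func cumulative(values []float64) []point {
	out := make([]point, 0, len(values))
	var running float64
	for i, v := range values {
		running += v
		out = append(out, point{Index: i, Total: running})
	}
	return out
}

func main() {
	for _, p := range cumulative([]float64{1, 2, 4}) {
		fmt.Printf("span %d: cumulative %.0f\n", p.Index, p.Total)
	}
}
```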

type RetentionStats

type RetentionStats struct {
	SpansDeleted     int64
	SummariesDeleted int64
	MetricsDeleted   int64
	ChatDeleted      int64
	ToolsDeleted     int64
}

RetentionStats reports what was cleaned up by RunRetention.

func (RetentionStats) String

func (rs RetentionStats) String() string

String formats retention stats for logging.

func (RetentionStats) Total

func (rs RetentionStats) Total() int64

Total returns the total number of rows deleted.

type SQLiteBackend

type SQLiteBackend struct {
	// contains filtered or unexported fields
}

SQLiteBackend implements Backend using SQLite storage.

func NewSQLiteBackend

func NewSQLiteBackend(db *sql.DB) (*SQLiteBackend, error)

NewSQLiteBackend creates a new SQLite backend.

func NewSQLiteBackendFromPath

func NewSQLiteBackendFromPath(path string) (*SQLiteBackend, error)

NewSQLiteBackendFromPath creates a SQLite backend from a file path.

func (*SQLiteBackend) BackfillSpansWorkspace

func (b *SQLiteBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)

func (*SQLiteBackend) Close

func (b *SQLiteBackend) Close() error

Close closes the database connection.

func (*SQLiteBackend) CountChatMessages

func (b *SQLiteBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)

func (*SQLiteBackend) CreateAgentAssignment

func (b *SQLiteBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*SQLiteBackend) CreateChain

func (b *SQLiteBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)

func (*SQLiteBackend) CreateMessage

func (b *SQLiteBackend) CreateMessage(ctx context.Context, m *Message) error

func (*SQLiteBackend) CreateMetric

func (b *SQLiteBackend) CreateMetric(ctx context.Context, m *Metric) error

func (*SQLiteBackend) CreateSpan

func (b *SQLiteBackend) CreateSpan(ctx context.Context, span *Span) error

func (*SQLiteBackend) CreateSpanEvent

func (b *SQLiteBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error

func (*SQLiteBackend) CreateStage

func (b *SQLiteBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)

func (*SQLiteBackend) CreateTask

func (b *SQLiteBackend) CreateTask(ctx context.Context, t *Task) error

func (*SQLiteBackend) CreateWorkspace

func (b *SQLiteBackend) CreateWorkspace(ctx context.Context, w *Workspace) error

func (*SQLiteBackend) DB

func (b *SQLiteBackend) DB() *sql.DB

DB returns the underlying database for direct SQL queries.

func (*SQLiteBackend) DeleteAgentAssignment

func (b *SQLiteBackend) DeleteAgentAssignment(ctx context.Context, id string) error

func (*SQLiteBackend) DeleteMessage

func (b *SQLiteBackend) DeleteMessage(ctx context.Context, id string) error

func (*SQLiteBackend) DeleteSpan

func (b *SQLiteBackend) DeleteSpan(ctx context.Context, id string) error

func (*SQLiteBackend) DeleteSpanEvent

func (b *SQLiteBackend) DeleteSpanEvent(ctx context.Context, id int64) error

func (*SQLiteBackend) DeleteTask

func (b *SQLiteBackend) DeleteTask(ctx context.Context, id string) error

func (*SQLiteBackend) DeleteWorkspace

func (b *SQLiteBackend) DeleteWorkspace(ctx context.Context, id string) error

func (*SQLiteBackend) FindLatestUnfinishedTool

func (b *SQLiteBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)

func (*SQLiteBackend) GetAgentAssignment

func (b *SQLiteBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)

func (*SQLiteBackend) GetAgentStats

func (b *SQLiteBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)

func (*SQLiteBackend) GetBreakdownByModel

func (b *SQLiteBackend) GetBreakdownByModel(ctx context.Context) ([]BreakdownItem, error)

GetBreakdownByModel returns cost/token breakdown by model. Scoped to the last 30 days for performance on large databases (M-PERF-OBSERVATORY).

func (*SQLiteBackend) GetBreakdownByProvider

func (b *SQLiteBackend) GetBreakdownByProvider(ctx context.Context) ([]BreakdownItem, error)

GetBreakdownByProvider returns cost/token breakdown by provider. Scoped to the last 30 days for performance on large databases (M-PERF-OBSERVATORY).

func (*SQLiteBackend) GetBreakdownBySourceType

func (b *SQLiteBackend) GetBreakdownBySourceType(ctx context.Context) ([]BreakdownItem, error)

GetBreakdownBySourceType returns cost/token breakdown by source type. Uses two-phase aggregation: first aggregate per trace_id, then join with trace_summaries. M-PERF-OBSERVATORY: avoids 662K×213K JOIN by reducing to ~130K trace groups first.
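The two-phase idea can be sketched in memory: collapse spans to one row per trace first, then join that much smaller set against the per-trace summaries. The types, the "unknown" fallback, and the source-type labels below are illustrative assumptions; the real method runs this as SQL against the spans and trace_summaries tables.

```go
package main

import "fmt"

// spanRow and totals are hypothetical, simplified row shapes.
type spanRow struct {
	TraceID string
	CostUSD float64
	Tokens  int64
}

type totals struct {
	CostUSD float64
	Tokens  int64
}

// aggregateBySourceType mimics the two phases on in-memory data.
func aggregateBySourceType(spans []spanRow, sourceByTrace map[string]string) map[string]totals {
	// Phase 1: one aggregate per trace_id.
	perTrace := make(map[string]totals)
	for _, s := range spans {
		t := perTrace[s.TraceID]
		t.CostUSD += s.CostUSD
		t.Tokens += s.Tokens
		perTrace[s.TraceID] = t
	}

	// Phase 2: join the per-trace aggregates with the trace summaries.
	bySource := make(map[string]totals)
	for traceID, t := range perTrace {
		src, ok := sourceByTrace[traceID]
		if !ok {
			src = "unknown" // assumption: traces without a summary are bucketed separately
		}
		agg := bySource[src]
		agg.CostUSD += t.CostUSD
		agg.Tokens += t.Tokens
		bySource[src] = agg
	}
	return bySource
}

func main() {
	spans := []spanRow{
		{TraceID: "t1", CostUSD: 1, Tokens: 100},
		{TraceID: "t1", CostUSD: 2, Tokens: 200},
		{TraceID: "t2", CostUSD: 4, Tokens: 400},
	}
	summaries := map[string]string{"t1": "coordinator", "t2": "user_session"}
	got := aggregateBySourceType(spans, summaries)
	fmt.Println(got["coordinator"].Tokens, got["user_session"].Tokens)
}
```

The join in phase 2 touches one row per trace rather than one row per span, which is the reduction the doc comment describes.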

func (*SQLiteBackend) GetBreakdownByWorkspace

func (b *SQLiteBackend) GetBreakdownByWorkspace(ctx context.Context) ([]BreakdownItem, error)

GetBreakdownByWorkspace groups spans by workspace using config-driven path mapping. Note: Claude Code spans don't include process.cwd, so they show as "Unknown Workspace".

func (*SQLiteBackend) GetBreakdownByWorkspaceWithMapping

func (b *SQLiteBackend) GetBreakdownByWorkspaceWithMapping(ctx context.Context, mapping WorkspaceMapping, wsConfig WorkspaceMapping) ([]BreakdownItem, error)

GetBreakdownByWorkspaceWithMapping uses the provided mapping to convert file paths to workspace IDs.

func (*SQLiteBackend) GetChain

func (b *SQLiteBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)

func (*SQLiteBackend) GetChainByGitHubIssue

func (b *SQLiteBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)

func (*SQLiteBackend) GetChainByMessageID

func (b *SQLiteBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)

func (*SQLiteBackend) GetChainByTaskID

func (b *SQLiteBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)

func (*SQLiteBackend) GetChainJourney

func (b *SQLiteBackend) GetChainJourney(ctx context.Context, chainID string) (*JourneyResponse, error)

func (*SQLiteBackend) GetChainStages

func (b *SQLiteBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)

func (*SQLiteBackend) GetChainStats

func (b *SQLiteBackend) GetChainStats(ctx context.Context) (*ChainStats, error)

func (*SQLiteBackend) GetChainStatsByAgent

func (b *SQLiteBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)

func (*SQLiteBackend) GetChainStatusCounts

func (b *SQLiteBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)

func (*SQLiteBackend) GetChatMessagesBySession

func (b *SQLiteBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)

func (*SQLiteBackend) GetChatMessagesByTaskID

func (b *SQLiteBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)

func (*SQLiteBackend) GetClaudeCodeEvents

func (b *SQLiteBackend) GetClaudeCodeEvents(ctx context.Context, limit int) ([]ClaudeCodeEvent, error)

GetClaudeCodeEvents returns Claude Code sessions aggregated by session.id. Multiple api_request spans (turns) in one session are aggregated into a single event. This provides a cleaner Event Queue with one entry per Claude Code conversation.

func (*SQLiteBackend) GetClaudeCodeEventsWithLookup

func (b *SQLiteBackend) GetClaudeCodeEventsWithLookup(ctx context.Context, limit int, lookup TaskAgentLookup, sourceType, workspaceFilter string, wsConfig WorkspaceMapping) ([]ClaudeCodeEvent, error)

GetClaudeCodeEventsWithLookup returns Claude Code sessions with agent info resolved. For sessions spawned by coordinator tasks, FromAgent and ToInbox are set from agent_assignments. For user-initiated sessions (no coordinator task), they default to "claude-code" / "user".

Note: The lookup parameter is kept for API compatibility but is no longer used; agent info comes directly from observatory.db via a JOIN with agent_assignments.

The sourceType parameter filters by source:

  • "coordinator": Only sessions with agent_assignment (coordinator-spawned)
  • "user_session": Only sessions without agent_assignment (user-initiated)
  • "": No filter (all sessions)

The workspaceFilter parameter filters by workspace path:

  • Full path: "/Users/mark/dev/TwilightGame" (exact or contains match)
  • "unknown" or "No Workspace": Sessions with empty workspace
  • "coordinator_worktrees": Sessions in worktree directories
  • "": No filter (all workspaces)
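The filter semantics listed above can be read as two predicates over a session. The sketch below is an in-memory approximation, not the SQL the method runs. The sessionRow type is hypothetical, the "/worktrees/" path check is an assumption (the rule for recognizing worktree directories is not documented here), and rejecting unrecognized sourceType values is also an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// sessionRow is a hypothetical, simplified view of a session.
type sessionRow struct {
	Workspace     string
	HasAssignment bool // true when an agent_assignment row exists
}

// matchesSourceType applies the documented sourceType values.
func matchesSourceType(s sessionRow, sourceType string) bool {
	switch sourceType {
	case "":
		return true
	case "coordinator":
		return s.HasAssignment
	case "user_session":
		return !s.HasAssignment
	default:
		return false // assumption: unrecognized values match nothing
	}
}

// matchesWorkspace applies the documented workspaceFilter values.
func matchesWorkspace(s sessionRow, filter string) bool {
	switch filter {
	case "":
		return true
	case "unknown", "No Workspace":
		return s.Workspace == ""
	case "coordinator_worktrees":
		// Assumption: worktrees are detected by a path segment.
		return strings.Contains(s.Workspace, "/worktrees/")
	default:
		// An exact match is also a "contains" match.
		return strings.Contains(s.Workspace, filter)
	}
}

func main() {
	s := sessionRow{Workspace: "/Users/mark/dev/TwilightGame", HasAssignment: false}
	fmt.Println(matchesSourceType(s, "user_session"), matchesWorkspace(s, "TwilightGame"))
}
```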

func (*SQLiteBackend) GetClaudeCodeHierarchy

func (b *SQLiteBackend) GetClaudeCodeHierarchy(ctx context.Context, sessionID string) (*TaskHierarchy, error)

GetClaudeCodeHierarchy returns the hierarchy for a Claude Code session. The sessionID is the session.id from Claude Code telemetry (UUID format).

Claude Code telemetry creates a SEPARATE trace for each span (api_request and tool calls). This means we can't use standard trace hierarchy building (which relies on parent_span_id). Instead, we build a turn-based hierarchy using timestamp correlation:

  • api_request spans = "turns" (top level)
  • tool calls that occur before the next api_request = children of that turn

func (*SQLiteBackend) GetExecTaskHierarchy

func (b *SQLiteBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)

func (*SQLiteBackend) GetExecTaskHierarchyWithMessages

func (b *SQLiteBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)

func (*SQLiteBackend) GetFilteredBreakdownByModel

func (b *SQLiteBackend) GetFilteredBreakdownByModel(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)

GetFilteredBreakdownByModel returns model breakdown with filters applied.

func (*SQLiteBackend) GetFilteredBreakdownByProvider

func (b *SQLiteBackend) GetFilteredBreakdownByProvider(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)

GetFilteredBreakdownByProvider returns provider breakdown with filters applied.

func (*SQLiteBackend) GetFilteredBreakdownBySourceType

func (b *SQLiteBackend) GetFilteredBreakdownBySourceType(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)

GetFilteredBreakdownBySourceType returns source type breakdown with filters applied. Uses trace_summaries.root_span_name for fast categorization (M-PERF-OBSERVATORY).

func (*SQLiteBackend) GetFilteredBreakdownByWorkspace

func (b *SQLiteBackend) GetFilteredBreakdownByWorkspace(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)

GetFilteredBreakdownByWorkspace returns workspace breakdown with filters applied. Uses a basic fallback mapping - for full config support use GetFilteredBreakdownByWorkspaceWithMapping.

func (*SQLiteBackend) GetFilteredBreakdownByWorkspaceWithMapping

func (b *SQLiteBackend) GetFilteredBreakdownByWorkspaceWithMapping(ctx context.Context, filter *ControlPlaneFilter, mapping WorkspaceMapping, wsConfig WorkspaceMapping) ([]BreakdownItem, error)

GetFilteredBreakdownByWorkspaceWithMapping returns workspace breakdown with config-driven path mapping. The mapping parameter converts file paths to Firestore workspace IDs. Note: Workspace filter is deliberately excluded from conditions since this query groups BY workspace.

func (*SQLiteBackend) GetFilteredHeatmapData

func (b *SQLiteBackend) GetFilteredHeatmapData(ctx context.Context, filter *ControlPlaneFilter, days int, wsConfig WorkspaceMapping) ([]HeatmapDataPoint, error)

GetFilteredHeatmapData returns daily activity data aggregated from spans.

func (*SQLiteBackend) GetFilteredMetricsSummary

func (b *SQLiteBackend) GetFilteredMetricsSummary(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) (*MetricsSummary, error)

GetFilteredMetricsSummary returns metrics filtered by the Control Plane filter.

func (*SQLiteBackend) GetMessage

func (b *SQLiteBackend) GetMessage(ctx context.Context, id string) (*Message, error)

func (*SQLiteBackend) GetMetricsSummary

func (b *SQLiteBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)

func (*SQLiteBackend) GetProviderComparison

func (b *SQLiteBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)

func (*SQLiteBackend) GetSession

func (b *SQLiteBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)

func (*SQLiteBackend) GetSessionMetricsSummary

func (b *SQLiteBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)

func (*SQLiteBackend) GetSessionTools

func (b *SQLiteBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)

func (*SQLiteBackend) GetSessionWorkspace

func (b *SQLiteBackend) GetSessionWorkspace(sessionID string) (string, error)

func (*SQLiteBackend) GetSpan

func (b *SQLiteBackend) GetSpan(ctx context.Context, id string) (*Span, error)

func (*SQLiteBackend) GetSpanEvents

func (b *SQLiteBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)

func (*SQLiteBackend) GetSpanHierarchy

func (b *SQLiteBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)

func (*SQLiteBackend) GetSpanLitesByStageID

func (b *SQLiteBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)

func (*SQLiteBackend) GetSpansByStageID

func (b *SQLiteBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)

func (*SQLiteBackend) GetStage

func (b *SQLiteBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)

func (*SQLiteBackend) GetTask

func (b *SQLiteBackend) GetTask(ctx context.Context, id string) (*Task, error)

func (*SQLiteBackend) GetTaskSpanSummary

func (b *SQLiteBackend) GetTaskSpanSummary(ctx context.Context, taskID string) (*TaskSpanSummary, error)

GetTaskSpanSummary returns aggregated span statistics for a task_id. Works for ALL task_id formats (coordinator, eval, UUID sessions).

func (*SQLiteBackend) GetTaskTimeline

func (b *SQLiteBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)

func (*SQLiteBackend) GetToolForSpan

func (b *SQLiteBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)

func (*SQLiteBackend) GetToolsByTimestampRange

func (b *SQLiteBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)

func (*SQLiteBackend) GetTrace

func (b *SQLiteBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)

func (*SQLiteBackend) GetWorkspace

func (b *SQLiteBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)

func (*SQLiteBackend) GetWorkspaceStats

func (b *SQLiteBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)

func (*SQLiteBackend) InsertToolStart

func (b *SQLiteBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error

func (*SQLiteBackend) LinkOrphanedSpansBySession

func (b *SQLiteBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)

LinkOrphanedSpansBySession updates orphaned spans when claude.execute span arrives. Fixes race condition where Claude Code events arrive before parent span is batch-flushed.
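The late-linking step can be pictured as a bulk update over spans of one session that have no task yet. The sketch below works on an in-memory slice and returns the number of rows changed, mirroring the int64 result. The spanRow type is hypothetical, and treating "orphaned" as "empty task ID" is an assumption about the real query.

```go
package main

import "fmt"

// spanRow is a hypothetical, reduced span record.
type spanRow struct {
	ID           string
	SessionID    string
	TaskID       string
	AssignmentID string
}

// linkOrphans fills in task and assignment IDs on spans of the given
// session that arrived before their parent span was known.
func linkOrphans(spans []spanRow, sessionID, taskID, assignmentID string) int64 {
	var updated int64
	for i := range spans {
		s := &spans[i]
		// Assumption: an orphan is a span with no task ID yet.
		if s.SessionID == sessionID && s.TaskID == "" {
			s.TaskID = taskID
			s.AssignmentID = assignmentID
			updated++
		}
	}
	return updated
}

func main() {
	spans := []spanRow{
		{ID: "a", SessionID: "s1"},
		{ID: "b", SessionID: "s1", TaskID: "task-0"},
		{ID: "c", SessionID: "s2"},
	}
	fmt.Println(linkOrphans(spans, "s1", "task-1", "assign-1")) // prints 1
}
```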

func (*SQLiteBackend) LinkSpanToChain

func (b *SQLiteBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error

func (*SQLiteBackend) ListAgentAssignments

func (b *SQLiteBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)

func (*SQLiteBackend) ListChains

func (b *SQLiteBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)

func (*SQLiteBackend) ListMessages

func (b *SQLiteBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)

func (*SQLiteBackend) ListMetrics

func (b *SQLiteBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)

func (*SQLiteBackend) ListPendingApprovals

func (b *SQLiteBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)

func (*SQLiteBackend) ListSpans

func (b *SQLiteBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)

func (*SQLiteBackend) ListSpansByTaskIDs

func (b *SQLiteBackend) ListSpansByTaskIDs(ctx context.Context, taskIDs []string, limitPerTask int) (map[string][]*Span, error)

func (*SQLiteBackend) ListSpansLightweight

func (b *SQLiteBackend) ListSpansLightweight(ctx context.Context, opts SpanListOptions) ([]*Span, error)

ListSpansLightweight returns spans without heavy attributes/resource_attributes columns. Extracts only tool_name and session.id from attributes via json_extract (M-AUDIT-OBSERVATORY).

func (*SQLiteBackend) ListTasks

func (b *SQLiteBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)

func (*SQLiteBackend) ListTraces

func (b *SQLiteBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)

func (*SQLiteBackend) ListWorkspaces

func (b *SQLiteBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)

func (*SQLiteBackend) LookupTaskBySessionID

func (b *SQLiteBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (taskID, assignmentID, traceID string)

LookupTaskBySessionID finds task hierarchy for Claude Code session correlation. Used by OTLP receiver to link Claude Code internal events to their parent executor span.

func (*SQLiteBackend) MarkMessageArchived

func (b *SQLiteBackend) MarkMessageArchived(ctx context.Context, id string) error

func (*SQLiteBackend) MarkMessageRead

func (b *SQLiteBackend) MarkMessageRead(ctx context.Context, id string) error

func (*SQLiteBackend) RecalculateTaskAggregates

func (b *SQLiteBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error

func (*SQLiteBackend) Store

func (b *SQLiteBackend) Store() *Store

Store returns the underlying store for direct queries.

func (*SQLiteBackend) UpdateAgentAssignment

func (b *SQLiteBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error

func (*SQLiteBackend) UpdateChainMetrics

func (b *SQLiteBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error

func (*SQLiteBackend) UpdateChainStatus

func (b *SQLiteBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error

func (*SQLiteBackend) UpdateMessage

func (b *SQLiteBackend) UpdateMessage(ctx context.Context, m *Message) error

func (*SQLiteBackend) UpdateSessionEnded

func (b *SQLiteBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error

func (*SQLiteBackend) UpdateSpan

func (b *SQLiteBackend) UpdateSpan(ctx context.Context, span *Span) error

func (*SQLiteBackend) UpdateSpanLinks

func (b *SQLiteBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error

func (*SQLiteBackend) UpdateStageApproval

func (b *SQLiteBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error

func (*SQLiteBackend) UpdateStageError

func (b *SQLiteBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error

func (*SQLiteBackend) UpdateStageMetrics

func (b *SQLiteBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error

func (*SQLiteBackend) UpdateStageSession

func (b *SQLiteBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error

func (*SQLiteBackend) UpdateStageStatus

func (b *SQLiteBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error

func (*SQLiteBackend) UpdateTask

func (b *SQLiteBackend) UpdateTask(ctx context.Context, t *Task) error

func (*SQLiteBackend) UpdateToolEnd

func (b *SQLiteBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error

func (*SQLiteBackend) UpdateWorkspace

func (b *SQLiteBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error

func (*SQLiteBackend) UpsertSession

func (b *SQLiteBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error

func (*SQLiteBackend) UpsertSessionWithCorrelation

func (b *SQLiteBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error

type SeedConfig

type SeedConfig struct {
	NumWorkspaces     int
	TasksPerWorkspace Range
	SpansPerTask      Range
	IncludeMessages   bool
	CleanFirst        bool
	Verbose           bool
}

SeedConfig configures the seed data generator.

func DefaultSeedConfig

func DefaultSeedConfig() SeedConfig

DefaultSeedConfig returns a realistic default configuration.

func MinimalSeedConfig

func MinimalSeedConfig() SeedConfig

MinimalSeedConfig returns a minimal configuration for quick testing.

func StressSeedConfig

func StressSeedConfig() SeedConfig

StressSeedConfig returns a configuration for load testing.

type SeedResult

type SeedResult struct {
	WorkspacesCreated  int
	TasksCreated       int
	AssignmentsCreated int
	SpansCreated       int
	EventsCreated      int
	MessagesCreated    int
}

SeedResult contains statistics from the seed operation.

func SeedDatabase

func SeedDatabase(ctx context.Context, backend Backend, cfg SeedConfig) (*SeedResult, error)

SeedDatabase generates test data in the database.

type Session

type Session struct {
	SessionID     string     `json:"session_id"`
	Workspace     string     `json:"workspace"`
	ClaudeVersion string     `json:"claude_version,omitempty"`
	Source        string     `json:"source"`
	StartedAt     time.Time  `json:"started_at"`
	EndedAt       *time.Time `json:"ended_at,omitempty"`
	TurnCount     int        `json:"turn_count"`
	ToolCount     int        `json:"tool_count"`

	// Correlation IDs from environment (M-DETERMINISTIC-CHAT-LINKING)
	// Set by Claude Code hooks when coordinator spawns with AILANG_* env vars
	TaskID    string `json:"task_id,omitempty"`
	ChainID   string `json:"chain_id,omitempty"`
	StageID   string `json:"stage_id,omitempty"`
	MessageID string `json:"message_id,omitempty"`
}

Session represents a Claude Code session with workspace metadata.

type SessionCorrelation

type SessionCorrelation struct {
	TaskID    string
	ChainID   string
	StageID   string
	MessageID string
}

SessionCorrelation holds optional correlation IDs from environment.

type SessionMetricsSummary

type SessionMetricsSummary struct {
	SessionID string `json:"session_id"`

	// Token metrics (from spans)
	TokensIn            int64 `json:"tokens_in"`
	TokensOut           int64 `json:"tokens_out"`
	CacheReadTokens     int64 `json:"cache_read_tokens"`
	CacheCreationTokens int64 `json:"cache_creation_tokens"`

	// Cost metrics (from spans + calculated)
	TotalCostUSD    float64 `json:"total_cost_usd"`
	CacheSavingsUSD float64 `json:"cache_savings_usd"` // Savings from cache reads

	// Lines of code (from metrics)
	LinesAdded   int64 `json:"lines_added"`
	LinesRemoved int64 `json:"lines_removed"`

	// Activity metrics (from metrics)
	CommitCount      int64   `json:"commit_count"`
	PullRequestCount int64   `json:"pull_request_count"`
	ActiveTimeMs     int64   `json:"active_time_ms"`
	TurnCount        int     `json:"turn_count"`
	ToolCalls        int     `json:"tool_calls"`
	DurationMs       int64   `json:"duration_ms"`
	SpanCount        int     `json:"span_count"`
	ErrorCount       int     `json:"error_count"`
	SuccessRate      float64 `json:"success_rate"`
}

SessionMetricsSummary aggregates all metrics for a session.

type SessionTool

type SessionTool struct {
	ToolUseID    string          `json:"tool_use_id"`
	SessionID    string          `json:"session_id"`
	ToolName     string          `json:"tool_name"`
	ToolInput    json.RawMessage `json:"tool_input,omitempty"`
	ToolResponse json.RawMessage `json:"tool_response,omitempty"`
	StartTime    time.Time       `json:"start_time"`
	EndTime      *time.Time      `json:"end_time,omitempty"`
	Success      *bool           `json:"success,omitempty"`
}

SessionTool represents a tool call within a Claude Code session.

type Span

type Span struct {
	ID                string     `json:"id"`
	TraceID           string     `json:"trace_id"`
	ParentSpanID      string     `json:"parent_span_id,omitempty"`
	TaskID            string     `json:"task_id,omitempty"`
	AgentAssignmentID string     `json:"agent_assignment_id,omitempty"`
	ChainID           string     `json:"chain_id,omitempty"` // M-CHAINS-SIMPLIFY: links to execution_chains
	StageID           string     `json:"stage_id,omitempty"` // M-CHAINS-SIMPLIFY: links to chain_stages
	Name              string     `json:"name"`
	Kind              SpanKind   `json:"kind"`
	Status            SpanStatus `json:"status"`
	StatusMessage     string     `json:"status_message,omitempty"`
	StartTime         time.Time  `json:"start_time"`
	EndTime           *time.Time `json:"end_time,omitempty"`
	DurationMs        int64      `json:"duration_ms,omitempty"`

	// Normalized attributes (common across providers)
	TokensIn            int64    `json:"tokens_in,omitempty"`
	TokensOut           int64    `json:"tokens_out,omitempty"`
	CacheReadTokens     int64    `json:"cache_read_tokens,omitempty"`
	CacheCreationTokens int64    `json:"cache_creation_tokens,omitempty"`
	CostUSD             float64  `json:"cost_usd,omitempty"`
	Model               string   `json:"model,omitempty"`
	Provider            Provider `json:"provider,omitempty"`

	// Full attributes as JSON (for provider-specific data)
	Attributes         map[string]any `json:"attributes,omitempty"`
	ResourceAttributes map[string]any `json:"resource_attributes,omitempty"`

	// Children (for tree building, not stored in DB)
	Children []*Span `json:"children,omitempty"`

	// Events (loaded separately)
	Events []SpanEvent `json:"events,omitempty"`

	// DisplayName is an enriched human-readable label (not stored in DB, computed from session_tools)
	DisplayName string `json:"display_name,omitempty"`

	// ChatContext contains embedded chat content (populated with include_chat=true API param)
	ChatContext *ChatContext `json:"chat_context,omitempty"`

	CreatedAt time.Time `json:"created_at"`
}

Span represents an OTEL span with normalized attributes.

func (*Span) AttributesJSON

func (s *Span) AttributesJSON() string

AttributesJSON returns attributes as a JSON string for storage.

func (*Span) ParseAttributes

func (s *Span) ParseAttributes(jsonStr string) error

ParseAttributes parses a JSON string into attributes.

func (*Span) ParseResourceAttributes

func (s *Span) ParseResourceAttributes(jsonStr string) error

ParseResourceAttributes parses a JSON string into resource attributes.

func (*Span) ResourceAttributesJSON

func (s *Span) ResourceAttributesJSON() string

ResourceAttributesJSON returns resource attributes as a JSON string for storage.
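The four JSON helpers above amount to a round trip between a map[string]any and a JSON string column. The sketch below shows that round trip with encoding/json on a standalone map type. The attrs type and both functions are hypothetical, and returning "{}" for an empty map or a marshalling failure is an assumption about how the real methods behave.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// attrs is a hypothetical stand-in for the Attributes field type.
type attrs map[string]any

// toJSON serializes attributes for storage.
func toJSON(a attrs) string {
	if len(a) == 0 {
		return "{}" // assumption: empty attributes are stored as an empty object
	}
	b, err := json.Marshal(a)
	if err != nil {
		return "{}" // assumption: unserializable values fall back to empty
	}
	return string(b)
}

// fromJSON parses a stored JSON string back into attributes.
func fromJSON(s string) (attrs, error) {
	if s == "" {
		return attrs{}, nil
	}
	var a attrs
	if err := json.Unmarshal([]byte(s), &a); err != nil {
		return nil, err
	}
	return a, nil
}

func main() {
	stored := toJSON(attrs{"tool_name": "Bash", "turn": 3})
	fmt.Println(stored)

	back, err := fromJSON(stored)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// Numbers decode as float64 when the target is map[string]any.
	fmt.Printf("%T\n", back["turn"])
}
```

One consequence worth keeping in mind: integer attributes do not survive the round trip as int; they come back as float64, so callers reading numeric attributes need a type assertion on float64.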

type SpanEvent

type SpanEvent struct {
	ID        int64     `json:"id,omitempty"`
	SpanID    string    `json:"span_id"`
	Name      string    `json:"name"`
	Timestamp time.Time `json:"timestamp"`
	EventType EventType `json:"event_type,omitempty"`

	// Denormalized for common event types
	ApprovalStatus ApprovalStatus `json:"approval_status,omitempty"`
	ToolName       string         `json:"tool_name,omitempty"`
	ErrorMessage   string         `json:"error_message,omitempty"`

	Attributes map[string]any `json:"attributes,omitempty"`
}

SpanEvent represents an event attached to a span (approvals, tool calls, errors).

func (*SpanEvent) AttributesJSON

func (e *SpanEvent) AttributesJSON() string

AttributesJSON returns attributes as a JSON string for storage.

func (*SpanEvent) ParseAttributes

func (e *SpanEvent) ParseAttributes(jsonStr string) error

ParseAttributes parses a JSON string into attributes.

type SpanFilterConfig

type SpanFilterConfig struct {
	AllowPatterns []FilterPattern // Allow takes priority — matching spans are ALWAYS kept
	DenyPatterns  []FilterPattern // Matching spans are dropped (unless allow-listed)
	DisableAll    bool            // Bypass all filtering (debug mode)
}

SpanFilterConfig holds configurable span filtering rules. Loaded once at startup from environment variables; immutable after creation.
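The precedence stated in the field comments (DisableAll bypasses everything, allow beats deny, deny drops) can be sketched as a single decision function. FilterPattern is not shown in this section, so the sketch substitutes plain name prefixes. The filterConfig type, the keep method, and keeping spans that match no pattern are all assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// filterConfig is a hypothetical stand-in that uses name prefixes
// in place of the package's FilterPattern type.
type filterConfig struct {
	AllowPrefixes []string
	DenyPrefixes  []string
	DisableAll    bool
}

// keep reports whether a span with the given name should be stored.
func (c filterConfig) keep(spanName string) bool {
	if c.DisableAll {
		return true // debug mode: nothing is filtered
	}
	for _, p := range c.AllowPrefixes {
		if strings.HasPrefix(spanName, p) {
			return true // allow takes priority over any deny rule
		}
	}
	for _, p := range c.DenyPrefixes {
		if strings.HasPrefix(spanName, p) {
			return false
		}
	}
	return true // assumption: spans matching no rule are kept
}

func main() {
	cfg := filterConfig{
		AllowPrefixes: []string{"http.important"},
		DenyPrefixes:  []string{"http."},
	}
	fmt.Println(cfg.keep("http.important.call"), cfg.keep("http.poll"), cfg.keep("claude.execute"))
	// prints: true false true
}
```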

func DefaultSpanFilterConfig

func DefaultSpanFilterConfig() *SpanFilterConfig

DefaultSpanFilterConfig returns the default deny rules matching the original hard-coded behavior.

func LoadSpanFilterConfig

func LoadSpanFilterConfig() *SpanFilterConfig

LoadSpanFilterConfig creates a SpanFilterConfig from environment variables, merging with defaults. Exported for testing.

type SpanHierarchyNode

type SpanHierarchyNode struct {
	ID                  string                 `json:"id"`
	Name                string                 `json:"name"`
	ParentID            string                 `json:"parent_id,omitempty"`
	Depth               int                    `json:"depth"`
	StartTime           time.Time              `json:"start_time"`
	DurationMs          int64                  `json:"duration_ms"`
	TokensIn            int64                  `json:"tokens_in,omitempty"`
	TokensOut           int64                  `json:"tokens_out,omitempty"`
	CacheReadTokens     int64                  `json:"cache_read_tokens,omitempty"`
	CacheCreationTokens int64                  `json:"cache_creation_tokens,omitempty"`
	CostUSD             float64                `json:"cost_usd,omitempty"`
	TurnNumber          int                    `json:"turn_number,omitempty"`
	ToolName            string                 `json:"tool_name,omitempty"`
	NodeType            SpanHierarchyNodeType  `json:"node_type"`
	SessionID           string                 `json:"session_id,omitempty"`
	Status              SpanStatus             `json:"status"`
	Provider            Provider               `json:"provider,omitempty"`
	Children            []*SpanHierarchyNode   `json:"children,omitempty"`
	Attributes          map[string]interface{} `json:"attributes,omitempty"` // Selected useful attributes
}

SpanHierarchyNode represents a span with its children for hierarchy visualization. Unlike Span, this is optimized for graph/tree rendering with parent_span_id-based relationships.
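As an illustration of parent_span_id-based tree building, the sketch below links a flat list of nodes into roots and children and fills in Depth. The node type is a reduced, hypothetical version of SpanHierarchyNode, and treating a node whose parent is missing from the input as a root is an assumption.

```go
package main

import "fmt"

// node is a hypothetical, reduced version of SpanHierarchyNode.
type node struct {
	ID       string
	ParentID string
	Depth    int
	Children []*node
}

// buildTree links nodes by ParentID and returns the roots.
func buildTree(flat []*node) []*node {
	byID := make(map[string]*node, len(flat))
	for _, n := range flat {
		byID[n.ID] = n
	}

	var roots []*node
	for _, n := range flat {
		parent, ok := byID[n.ParentID]
		if n.ParentID == "" || !ok {
			// Assumption: a span whose parent was not loaded becomes a root.
			roots = append(roots, n)
			continue
		}
		parent.Children = append(parent.Children, n)
	}

	for _, r := range roots {
		setDepth(r, 0)
	}
	return roots
}

// setDepth assigns depth recursively, with roots at depth 0.
func setDepth(n *node, depth int) {
	n.Depth = depth
	for _, c := range n.Children {
		setDepth(c, depth+1)
	}
}

func main() {
	flat := []*node{
		{ID: "tool-1", ParentID: "turn-1"},
		{ID: "exec", ParentID: ""},
		{ID: "turn-1", ParentID: "exec"},
	}
	roots := buildTree(flat)
	fmt.Println(len(roots), roots[0].ID, flat[0].Depth) // prints: 1 exec 2
}
```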

type SpanHierarchyNodeType

type SpanHierarchyNodeType string

SpanHierarchyNodeType categorizes spans for visualization.

const (
	NodeTypeCoordinator SpanHierarchyNodeType = "coordinator"
	NodeTypeExecutor    SpanHierarchyNodeType = "executor"
	NodeTypeTurn        SpanHierarchyNodeType = "turn"
	NodeTypeTool        SpanHierarchyNodeType = "tool"
	NodeTypeOther       SpanHierarchyNodeType = "other"
)

type SpanHierarchyResult

type SpanHierarchyResult struct {
	Roots    []*SpanHierarchyNode `json:"roots"`
	Sessions map[string]int       `json:"sessions"` // session_id -> turn_count
	Stats    SpanHierarchyStats   `json:"stats"`
}

SpanHierarchyResult contains the complete hierarchy result with roots and sessions.

type SpanHierarchyStats

type SpanHierarchyStats struct {
	TotalSpans  int     `json:"total_spans"`
	TotalCost   float64 `json:"total_cost"`
	TotalTokens struct {
		In            int64 `json:"in"`
		Out           int64 `json:"out"`
		CacheRead     int64 `json:"cache_read"`
		CacheCreation int64 `json:"cache_creation"`
	} `json:"total_tokens"`
	CacheSavingsUSD float64 `json:"cache_savings_usd,omitempty"`
	TimeRange       struct {
		Start time.Time `json:"start"`
		End   time.Time `json:"end"`
	} `json:"time_range"`
	MaxDepth int `json:"max_depth"`
}

SpanHierarchyStats contains aggregated stats for a hierarchy result.

type SpanKind

type SpanKind string

SpanKind represents the type of span.

const (
	SpanKindInternal SpanKind = "internal"
	SpanKindClient   SpanKind = "client"
	SpanKindServer   SpanKind = "server"
	SpanKindProducer SpanKind = "producer"
	SpanKindConsumer SpanKind = "consumer"
)

type SpanListOptions

type SpanListOptions struct {
	TraceID           string
	TaskID            string
	AgentAssignmentID string
	StartAfter        time.Time
	StartBefore       time.Time
	Limit             int
	Offset            int
	// Filter by provider (claude, gemini, openai, etc.)
	Provider string
	// Filter by model name (claude-sonnet-4-5, gemini-2-5-flash, etc.)
	Model string
	// Filter by span status (ok, error)
	Status string
	// Filter by workspace path (filters via task→workspace relationship)
	Workspace string
	// Filter by workspace ID directly (more efficient when workspace_id is known)
	WorkspaceID string
}

SpanListOptions configures span listing.

type SpanLite

type SpanLite struct {
	ID            string    `json:"id"`
	TraceID       string    `json:"trace_id"`
	ParentSpanID  string    `json:"parent_span_id,omitempty"`
	ChainID       string    `json:"chain_id,omitempty"`
	StageID       string    `json:"stage_id,omitempty"`
	Name          string    `json:"name"`
	Kind          SpanKind  `json:"kind"`
	Status        string    `json:"status"`
	StatusMessage string    `json:"status_message,omitempty"`
	StartTime     time.Time `json:"start_time"`
	EndTime       time.Time `json:"end_time,omitempty"`
	DurationMs    int64     `json:"duration_ms"`
	TokensIn      int64     `json:"tokens_in,omitempty"`
	TokensOut     int64     `json:"tokens_out,omitempty"`
	CostUSD       float64   `json:"cost_usd,omitempty"`
	Model         string    `json:"model,omitempty"`
	Provider      string    `json:"provider,omitempty"`
}

SpanLite is a lightweight span without the heavy attributes/resource_attributes columns. Used for tree rendering, timeline, and waterfall views (M-PERF-OBSERVATORY). The full attributes are only loaded on-demand via GetSpan (Level 3).

type SpanLitePage

type SpanLitePage struct {
	Spans  []*SpanLite `json:"spans"`
	Total  int         `json:"total"`
	Limit  int         `json:"limit"`
	Offset int         `json:"offset"`
}

SpanLitePage is a paginated response of SpanLite records.
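A client paging through results might derive the next offset as sketched below. The litePage type is a local stand-in, and the sketch assumes that Total counts all matching rows rather than only the rows on the current page; the documentation does not state this explicitly.

```go
package main

// litePage is a local stand-in for SpanLitePage; Spans holds only IDs here.
type litePage struct {
	Spans  []string
	Total  int
	Limit  int
	Offset int
}

// nextOffset reports the offset to request next and whether more rows remain.
// It assumes Total counts all matching rows, not just this page.
func nextOffset(p litePage) (int, bool) {
	next := p.Offset + len(p.Spans)
	if len(p.Spans) == 0 || next >= p.Total {
		return 0, false
	}
	return next, true
}
```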

type SpanNameCount

type SpanNameCount struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

SpanNameCount holds a span name and its count.

type SpanNode

type SpanNode struct {
	Span     *Span       `json:"span"`
	Children []*SpanNode `json:"children,omitempty"`
}

SpanNode represents a span with its children for hierarchical display.

type SpanOutlier

type SpanOutlier struct {
	Span           *Span   `json:"span"`
	Metric         string  `json:"metric"`           // "cost_usd", "duration_ms", "tokens"
	Value          float64 `json:"value"`            // Actual metric value
	Mean           float64 `json:"mean"`             // Task mean for this metric
	StdDev         float64 `json:"std_dev"`          // Task standard deviation
	ZScore         float64 `json:"z_score"`          // (value - mean) / stddev
	PercentOfTotal float64 `json:"percent_of_total"` // What % of task total this span represents
}

SpanOutlier represents a span that is statistically anomalous within its task.
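The ZScore field follows the formula given in the struct comment, (value - mean) / stddev. The sketch below applies that formula and flags values whose absolute z-score meets a threshold. The function names, the two-sided test, the threshold, and the choice to return 0 when the standard deviation is 0 are assumptions for illustration; the package's actual detection rule is not documented here.

```go
package main

import "math"

// zScore applies (value - mean) / stdDev.
// When stdDev is 0 it returns 0 to avoid dividing by zero (an assumption).
func zScore(value, mean, stdDev float64) float64 {
	if stdDev == 0 {
		return 0
	}
	return (value - mean) / stdDev
}

// flagOutliers returns the indexes of values whose |z| is at least threshold.
func flagOutliers(values []float64, mean, stdDev, threshold float64) []int {
	var out []int
	for i, v := range values {
		if math.Abs(zScore(v, mean, stdDev)) >= threshold {
			out = append(out, i)
		}
	}
	return out
}
```

For example, with mean 5 and standard deviation 2, a value of 9 has a z-score of (9 - 5) / 2 = 2.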

type SpanStatus

type SpanStatus string

SpanStatus represents the status of a span.

const (
	SpanStatusOK    SpanStatus = "ok"
	SpanStatusError SpanStatus = "error"
	SpanStatusUnset SpanStatus = "unset"
)

type StageCreateRequest

type StageCreateRequest struct {
	ChainID   string   `json:"chain_id"`
	AgentID   string   `json:"agent_id"`
	Provider  Provider `json:"provider,omitempty"`
	MessageID string   `json:"message_id,omitempty"`
	TaskID    string   `json:"task_id,omitempty"`
	HandoffTo string   `json:"handoff_to,omitempty"`
	Iteration int      `json:"iteration,omitempty"` // Defaults to 1
}

StageCreateRequest contains the data needed to create a new stage in a chain.

type StageUpdateRequest

type StageUpdateRequest struct {
	Status         *ChainStageStatus `json:"status,omitempty"`
	SessionID      *string           `json:"session_id,omitempty"`
	ApprovalStatus *ApprovalStatus   `json:"approval_status,omitempty"`
	ApprovalType   *ApprovalType     `json:"approval_type,omitempty"`
	HumanFeedback  *string           `json:"human_feedback,omitempty"`
	ErrorMessage   *string           `json:"error_message,omitempty"`
}

StageUpdateRequest contains fields that can be updated on a stage.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store provides CRUD operations for the observatory platform.

func NewStore

func NewStore(db *sql.DB) *Store

NewStore creates a new Store with the given database connection.

func OpenDefaultStore

func OpenDefaultStore() (*Store, error)

OpenDefaultStore opens the observatory database at the default path, runs migrations, and returns a ready-to-use Store. This is the recommended way to access observatory.db from CLI tools.

func OpenStore

func OpenStore(dbPath string) (*Store, error)

OpenStore opens the observatory database at the given path, runs migrations, and returns a ready-to-use Store.

func (*Store) BackfillSpansWorkspace

func (s *Store) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)

BackfillSpansWorkspace updates existing spans that have the given session.id but are missing workspace information. Returns the number of spans updated.

func (*Store) Close

func (s *Store) Close() error

Close closes the underlying database connection.

func (*Store) CountChatMessages

func (s *Store) CountChatMessages(ctx context.Context, q ChatMessageQuery) (total int, withTaskID int, err error)

CountChatMessages returns total and linked message counts for health diagnostics.

func (*Store) CreateAgentAssignment

func (s *Store) CreateAgentAssignment(a *AgentAssignment) error

CreateAgentAssignment inserts a new agent assignment.

func (*Store) CreateChain

func (s *Store) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)

CreateChain creates a new execution chain. Returns the chain with its generated ID populated.

func (*Store) CreateMessage

func (s *Store) CreateMessage(m *Message) error

CreateMessage inserts a new message.

func (*Store) CreateMetric

func (s *Store) CreateMetric(m *Metric) error

CreateMetric inserts a new metric.

func (*Store) CreateSpan

func (s *Store) CreateSpan(span *Span) error

CreateSpan inserts a new span. Note: This is the low-level insert WITHOUT aggregation updates. For transactional span creation with aggregation updates, use SQLiteBackend.CreateSpan which wraps this in a transaction with UpdateTaskAggregates and UpdateAgentAssignmentAggregates.

func (*Store) CreateSpanEvent

func (s *Store) CreateSpanEvent(e *SpanEvent) error

CreateSpanEvent inserts a new span event.

func (*Store) CreateStage

func (s *Store) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)

CreateStage creates a new stage within a chain. Returns the stage with its generated ID populated.

func (*Store) CreateTask

func (s *Store) CreateTask(t *Task) error

CreateTask inserts a new task.

func (*Store) CreateWorkspace

func (s *Store) CreateWorkspace(w *Workspace) error

CreateWorkspace inserts a new workspace.

func (*Store) DB

func (s *Store) DB() *sql.DB

DB returns the underlying database connection.

func (*Store) DeleteAgentAssignment

func (s *Store) DeleteAgentAssignment(id string) error

DeleteAgentAssignment removes an assignment by ID.

func (*Store) DeleteChain

func (s *Store) DeleteChain(ctx context.Context, id string) error

DeleteChain removes a chain and all its stages (CASCADE).

func (*Store) DeleteMessage

func (s *Store) DeleteMessage(id string) error

DeleteMessage removes a message by ID.

func (*Store) DeleteSpan

func (s *Store) DeleteSpan(id string) error

DeleteSpan removes a span by ID.

func (*Store) DeleteSpanEvent

func (s *Store) DeleteSpanEvent(id int64) error

DeleteSpanEvent removes a span event by ID.

func (*Store) DeleteTask

func (s *Store) DeleteTask(id string) error

DeleteTask removes a task by ID.

func (*Store) DeleteWorkspace

func (s *Store) DeleteWorkspace(id string) error

DeleteWorkspace removes a workspace by ID.

func (*Store) FindLatestUnfinishedTool

func (s *Store) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)

FindLatestUnfinishedTool finds the most recent tool call that hasn't completed yet. Used to correlate PostToolUse with PreToolUse when tool_use_id is not provided.

func (*Store) GetAgentAssignment

func (s *Store) GetAgentAssignment(id string) (*AgentAssignment, error)

GetAgentAssignment retrieves an agent assignment by ID.

func (*Store) GetAgentStats

func (s *Store) GetAgentStats(agentID string) (*AgentStats, error)

GetAgentStats returns aggregated stats for an agent.

func (*Store) GetChain

func (s *Store) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)

GetChain retrieves a chain by ID with optional related data.

func (*Store) GetChainByGitHubIssue

func (s *Store) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)

GetChainByGitHubIssue finds the chain for a GitHub issue.

func (*Store) GetChainByMessageID

func (s *Store) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)

GetChainByMessageID finds the chain containing a given message ID.

func (*Store) GetChainByTaskID

func (s *Store) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)

GetChainByTaskID finds the chain containing a given task ID.

func (*Store) GetChainJourney

func (s *Store) GetChainJourney(ctx context.Context, chainID string) (*JourneyResponse, error)

GetChainJourney computes a narrative journey for a chain from its stages.

func (*Store) GetChainStages

func (s *Store) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)

GetChainStages returns all stages for a chain.

func (*Store) GetChainStats

func (s *Store) GetChainStats(ctx context.Context) (*ChainStats, error)

GetChainStats returns aggregate statistics about chains.

func (*Store) GetChainStatsByAgent

func (s *Store) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)

GetChainStatsByAgent returns per-agent aggregated stats in a single SQL query. Replaces the N+1 pattern of fetching all chains then querying stages per chain (M-PERF-OBSERVATORY).

func (*Store) GetChainStatusCounts

func (s *Store) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)

GetChainStatusCounts returns chain counts grouped by status in a single query. Replaces the pattern of fetching all chains and counting in Go (M-PERF-OBSERVATORY).

func (*Store) GetChatMessagesBySession

func (s *Store) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)

GetChatMessagesBySession fetches chat messages for a session, optionally filtered by time range. Pass zero-value times to skip time filtering.
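One way an optional time filter like this can be built is sketched below: a bound is added to the WHERE clause only when the corresponding time is non-zero. The column names (session_id, timestamp), the inclusive bounds, and the helper names are hypothetical; the real query is not shown in this documentation.

```go
package main

import "time"

// chatFilter builds a WHERE clause and its arguments. The column names
// (session_id, timestamp) are hypothetical.
func chatFilter(sessionID string, start, end time.Time) (string, []any) {
	where := "session_id = ?"
	args := []any{sessionID}
	if !start.IsZero() {
		where += " AND datetime(timestamp) >= datetime(?)"
		args = append(args, start.UTC().Format(time.RFC3339))
	}
	if !end.IsZero() {
		where += " AND datetime(timestamp) <= datetime(?)"
		args = append(args, end.UTC().Format(time.RFC3339))
	}
	return where, args
}

// unfiltered passes zero-value times, so no time bounds are added.
func unfiltered(sessionID string) (string, []any) {
	return chatFilter(sessionID, time.Time{}, time.Time{})
}

// fromOnly sets only a lower bound, one hour after the Unix epoch.
func fromOnly(sessionID string) (string, []any) {
	return chatFilter(sessionID, time.Unix(0, 0).Add(time.Hour), time.Time{})
}
```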

func (*Store) GetChatMessagesByTaskID

func (s *Store) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)

GetChatMessagesByTaskID fetches chat messages linked to a task via task_id. This is the preferred deterministic query (M-DETERMINISTIC-CHAT-LINKING).

func (*Store) GetExecTaskHierarchy

func (s *Store) GetExecTaskHierarchy(limit int) ([]*ExecTaskNode, error)

GetExecTaskHierarchy returns the hierarchy of ailang commands (exec, run, check) built from span attributes. It includes turn and tool_use child spans so that the complete hierarchy can be visualized.

func (*Store) GetExecTaskHierarchyWithMessages

func (s *Store) GetExecTaskHierarchyWithMessages(limit int) (*ExecHierarchyWithMessages, error)

GetExecTaskHierarchyWithMessages returns the exec hierarchy grouped by triggering messages. This provides a 4-level view: Messages -> Execs -> Turns -> Tool Uses.

func (*Store) GetMessage

func (s *Store) GetMessage(id string) (*Message, error)

GetMessage retrieves a message by ID.

func (*Store) GetMetricsSummary

func (s *Store) GetMetricsSummary() (*MetricsSummary, error)

GetMetricsSummary returns global metrics.

func (*Store) GetProviderComparison

func (s *Store) GetProviderComparison() ([]*ProviderComparison, error)

GetProviderComparison returns comparison metrics across providers.

func (*Store) GetSession

func (s *Store) GetSession(ctx context.Context, sessionID string) (*Session, error)

GetSession returns full session details.

func (*Store) GetSessionMetricsSummary

func (s *Store) GetSessionMetricsSummary(sessionID string) (*SessionMetricsSummary, error)

GetSessionMetricsSummary aggregates all metrics for a session.

func (*Store) GetSessionTools

func (s *Store) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)

GetSessionTools returns all tool calls for a session.

func (*Store) GetSessionWorkspace

func (s *Store) GetSessionWorkspace(sessionID string) (string, error)

GetSessionWorkspace returns the workspace for a session. Returns empty string if session not found.

func (*Store) GetSpan

func (s *Store) GetSpan(id string) (*Span, error)

GetSpan retrieves a span by ID.

func (*Store) GetSpanEvents

func (s *Store) GetSpanEvents(spanID string) ([]SpanEvent, error)

GetSpanEvents retrieves all events for a span.

func (*Store) GetSpanHierarchy

func (s *Store) GetSpanHierarchy(limit int) (*SpanHierarchyResult, error)

GetSpanHierarchy returns a hierarchical tree of spans using parent_span_id relationships. Unlike GetExecTaskHierarchy which requires custom attributes, this works with standard OTEL parenting. Returns roots (coordinator/executor spans), session groups, and stats.
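The general technique of turning a flat span list into a tree via parent IDs can be sketched as follows. This is not the package's implementation: flatSpan, treeNode, and buildTree are local names, and treating a span with an empty, unknown, or self-referencing parent as a root is an assumption.

```go
package main

// flatSpan is a minimal stand-in for a stored span row.
type flatSpan struct {
	ID       string
	ParentID string
}

// treeNode is a minimal stand-in for SpanHierarchyNode.
type treeNode struct {
	ID       string
	Depth    int
	Children []*treeNode
}

// buildTree links spans to their parents and returns the roots.
// Spans whose parent is empty, unknown, or themselves become roots.
func buildTree(spans []flatSpan) []*treeNode {
	nodes := make(map[string]*treeNode, len(spans))
	for _, s := range spans {
		nodes[s.ID] = &treeNode{ID: s.ID}
	}
	var roots []*treeNode
	for _, s := range spans {
		n := nodes[s.ID]
		parent, ok := nodes[s.ParentID]
		if ok && s.ParentID != "" && s.ParentID != s.ID {
			parent.Children = append(parent.Children, n)
		} else {
			roots = append(roots, n)
		}
	}
	for _, r := range roots {
		setDepth(r, 0)
	}
	return roots
}

// setDepth assigns depths top-down, starting at d for n.
func setDepth(n *treeNode, d int) {
	n.Depth = d
	for _, c := range n.Children {
		setDepth(c, d+1)
	}
}
```

Two passes keep the construction linear in the number of spans: the first indexes nodes by ID, the second attaches each node to its parent regardless of input order.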

func (*Store) GetSpanLitesByStageID

func (s *Store) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)

GetSpanLitesByStageID returns lightweight spans for a stage without the heavy attributes columns. This avoids reading the 3.9GB attributes data when only metadata is needed (M-PERF-OBSERVATORY).

func (*Store) GetSpansByStageID

func (s *Store) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)

GetSpansByStageID returns spans linked to a stage via stage_id column.

func (*Store) GetStage

func (s *Store) GetStage(ctx context.Context, id string) (*ChainStage, error)

GetStage retrieves a single stage by ID.

func (*Store) GetStageEvalAssessment

func (s *Store) GetStageEvalAssessment(ctx context.Context, stageID string) (*EvalAssessment, error)

GetStageEvalAssessment reads eval assessment data from a chain stage. Returns nil, nil if the stage exists but has no eval assessment.

func (*Store) GetTask

func (s *Store) GetTask(id string) (*Task, error)

GetTask retrieves a task by ID.

func (*Store) GetTaskSpanSummary

func (s *Store) GetTaskSpanSummary(ctx context.Context, taskID string) (*TaskSpanSummary, error)

GetTaskSpanSummary returns aggregated span statistics for a task_id. Works for ALL task_id formats (coordinator, eval, UUID sessions).

func (*Store) GetTaskTimeline

func (s *Store) GetTaskTimeline(taskID string) ([]*TaskTimeline, error)

GetTaskTimeline returns timeline data for a task.

func (*Store) GetToolForSpan

func (s *Store) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)

GetToolForSpan finds the session_tool that best matches a span by timestamp + tool name. Uses a ±10 second window to handle clock drift and hook execution delay. Returns nil if no matching tool is found.
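The windowed matching described above can be sketched in memory as follows. The toolCall type and closestTool function are local to this example, and choosing the candidate nearest in time as the "best" match is an assumption; only the ±10 second window and the tool-name match come from the description.

```go
package main

import "time"

// toolCall is a local stand-in for a session_tools row (illustration only).
type toolCall struct {
	Name      string
	StartedAt time.Time
}

// matchWindow mirrors the documented ±10 second tolerance.
const matchWindow = 10 * time.Second

// closestTool returns the same-named call nearest to spanTime within
// matchWindow, or nil when none qualifies.
func closestTool(calls []toolCall, name string, spanTime time.Time) *toolCall {
	var best *toolCall
	var bestGap time.Duration
	for i := range calls {
		c := &calls[i]
		if c.Name != name {
			continue
		}
		gap := spanTime.Sub(c.StartedAt)
		if gap < 0 {
			gap = -gap
		}
		if gap > matchWindow {
			continue
		}
		if best == nil || gap < bestGap {
			best, bestGap = c, gap
		}
	}
	return best
}

// at returns a fixed base time plus sec seconds, for deterministic demos.
func at(sec int) time.Time {
	base := time.Date(2026, 5, 2, 12, 0, 0, 0, time.UTC)
	return base.Add(time.Duration(sec) * time.Second)
}
```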

func (*Store) GetToolsByTimestampRange

func (s *Store) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)

GetToolsByTimestampRange returns tools that started within a time range. Used for cross-system correlation when session IDs don't match.

func (*Store) GetToolsForSessions

func (s *Store) GetToolsForSessions(ctx context.Context, sessionIDs []string) (map[string][]SessionTool, error)

GetToolsForSessions returns tools for multiple session IDs, grouped by session. This is used for enriching spans with tool metadata.

func (*Store) GetTrace

func (s *Store) GetTrace(traceID string) (*Trace, error)

GetTrace retrieves a complete trace with all spans.

func (*Store) GetWorkspace

func (s *Store) GetWorkspace(id string) (*Workspace, error)

GetWorkspace retrieves a workspace by ID.

func (*Store) GetWorkspaceStats

func (s *Store) GetWorkspaceStats(workspaceID string) (*WorkspaceStats, error)

GetWorkspaceStats returns aggregated stats for a workspace.

func (*Store) IncrementSessionTurns

func (s *Store) IncrementSessionTurns(ctx context.Context, sessionID string) error

IncrementSessionTurns increments the turn count for a session.

func (*Store) InsertToolStart

func (s *Store) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error

InsertToolStart records the start of a tool call.

func (*Store) LinkOrphanedSpansBySession

func (s *Store) LinkOrphanedSpansBySession(sessionID, taskID, assignmentID string) (int64, error)

LinkOrphanedSpansBySession updates spans that have a matching session.id but no task_id. This fixes the race condition where Claude Code internal events arrive via OTLP before the claude.execute span is batch-flushed by the OTEL SDK. Called after storing a claude.execute span to retroactively link orphaned child spans.

func (*Store) LinkSpanToChain

func (s *Store) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error

LinkSpanToChain updates a span's chain_id and stage_id. Called during OTEL ingest when AILANG_CHAIN_ID/AILANG_STAGE_ID env vars are present.

func (*Store) ListAgentAssignments

func (s *Store) ListAgentAssignments(taskID string) ([]*AgentAssignment, error)

ListAgentAssignments returns assignments for a task.

func (*Store) ListChains

func (s *Store) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)

ListChains returns chains matching the given options.

func (*Store) ListEvalChains

func (s *Store) ListEvalChains(ctx context.Context, limit int) ([]*ChainSummary, error)

ListEvalChains returns chains with source_type = "eval_suite".

func (*Store) ListMessages

func (s *Store) ListMessages(opts MessageListOptions) ([]*Message, error)

ListMessages returns messages with optional filtering.

func (*Store) ListMetrics

func (s *Store) ListMetrics(opts MetricListOptions) ([]*Metric, error)

ListMetrics retrieves metrics with optional filtering.

func (*Store) ListPendingApprovals

func (s *Store) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)

ListPendingApprovals returns all stages awaiting approval.

func (*Store) ListRecentSessions

func (s *Store) ListRecentSessions(ctx context.Context, limit int) ([]Session, error)

ListRecentSessions returns recent sessions ordered by start time.

func (*Store) ListSpans

func (s *Store) ListSpans(opts SpanListOptions) ([]*Span, error)

ListSpans returns spans with optional filtering.

func (*Store) ListSpansByTaskIDs

func (s *Store) ListSpansByTaskIDs(taskIDs []string, limitPerTask int) (map[string][]*Span, error)

ListSpansByTaskIDs fetches spans for multiple task IDs in a single query. Returns a map of task_id -> []*Span. Much more efficient than calling ListSpans once per task (single query vs N queries).
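A single-query batch fetch of this kind typically needs two pieces: a placeholder list for an IN clause and a grouping step over the returned rows. The sketch below shows both in isolation. The row type and both helpers are hypothetical, and treating a limit of 0 or less as "no cap" is an assumption, not documented behavior.

```go
package main

import "strings"

// row is a local stand-in for one result row: a span ID and its task ID.
type row struct {
	TaskID string
	SpanID string
}

// placeholders returns "?, ?, ?" style markers for n bound values.
func placeholders(n int) string {
	if n <= 0 {
		return ""
	}
	return strings.TrimSuffix(strings.Repeat("?, ", n), ", ")
}

// groupByTask buckets rows by task ID, keeping at most limitPerTask per task.
// A limit of 0 or less is treated as "no cap", which is an assumption.
func groupByTask(rows []row, limitPerTask int) map[string][]string {
	out := make(map[string][]string)
	for _, r := range rows {
		if limitPerTask > 0 && len(out[r.TaskID]) >= limitPerTask {
			continue
		}
		out[r.TaskID] = append(out[r.TaskID], r.SpanID)
	}
	return out
}
```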

func (*Store) ListSpansLightweight

func (s *Store) ListSpansLightweight(opts SpanListOptions) ([]*Span, error)

ListSpansLightweight returns spans without the heavy attributes/resource_attributes columns. Instead of reading the full JSON blobs (3.9GB total), it extracts only the specific attribute fields needed for enrichment: tool_name and session.id. This is 10-50x faster than ListSpans for large datasets. (M-AUDIT-OBSERVATORY)

func (*Store) ListTasks

func (s *Store) ListTasks(opts TaskListOptions) ([]*Task, error)

ListTasks returns tasks with optional filtering.

func (*Store) ListTraces

func (s *Store) ListTraces(opts TraceQuery) ([]*TraceSummary, error)

ListTraces returns trace summaries. Uses the trace_summaries materialized table (M-PERF-OBSERVATORY v12 migration) instead of correlated subqueries on the spans table.

func (*Store) ListWorkspaces

func (s *Store) ListWorkspaces() ([]*Workspace, error)

ListWorkspaces returns all workspaces.

func (*Store) LookupTaskBySessionID

func (s *Store) LookupTaskBySessionID(sessionID string) (taskID, assignmentID, traceID string)

LookupTaskBySessionID finds task hierarchy info for a given session ID. This enables correlating Claude Code internal events to their parent executor span. Returns taskID, agentAssignmentID, traceID (for trace linking), or empty strings if not found.

func (*Store) MarkMessageArchived

func (s *Store) MarkMessageArchived(id string) error

MarkMessageArchived marks a message as archived.

func (*Store) MarkMessageRead

func (s *Store) MarkMessageRead(id string) error

MarkMessageRead marks a message as read.

func (*Store) QueryEvalResults

func (s *Store) QueryEvalResults(ctx context.Context, opts EvalQueryOptions) ([]*ChainStage, error)

QueryEvalResults returns chain stages that have eval assessments, with optional filters. Uses json_extract() for filtering on assessment fields.

func (*Store) RecalculateAgentAssignmentAggregates

func (s *Store) RecalculateAgentAssignmentAggregates(assignmentID string) error

RecalculateAgentAssignmentAggregates recalculates all aggregate metrics for an agent assignment. Use this for backfill operations or to fix inconsistent aggregates.

func (*Store) RecalculateTaskAggregates

func (s *Store) RecalculateTaskAggregates(taskID string) error

RecalculateTaskAggregates recalculates all aggregate metrics for a task from its spans. Use this for backfill operations or to fix inconsistent aggregates.

func (*Store) RunRetention

func (s *Store) RunRetention(ctx context.Context) (RetentionStats, error)

RunRetention deletes rows older than their TTL and checkpoints the WAL.

TTLs:

  • spans, trace_summaries, metrics: 7 days
  • chat_messages, session_tools: 30 days

Time columns are stored as TEXT (ISO-8601 via the go-sqlite3 driver), so comparisons go through SQLite's datetime() function on both sides. Comparing a TEXT column directly against an int64 Unix/UnixNano cutoff silently matches zero rows — SQLite's storage-class ordering always ranks INTEGER < TEXT.

Deletions are chunked to avoid WAL bloat when another connection holds back the checkpoint. VACUUM is not attempted automatically: it requires an exclusive lock that the running server process blocks, and the failure modes are ugly (temp file growth, SQLITE_BUSY loops). VACUUM should be run manually during a maintenance window when 'ailang serve' is stopped.

Called periodically by the coordinator daemon and on startup if the DB is oversized.
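The retention rules above can be sketched as a TTL table, a cutoff computation, and a bounded DELETE statement that passes both sides of the comparison through datetime(). This is an illustration under stated assumptions, not the package's code: the helper names, the time column passed to chunkedDelete, and the rowid subquery used for chunking are all hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// ttls mirrors the documented retention windows.
var ttls = map[string]time.Duration{
	"spans":           7 * 24 * time.Hour,
	"trace_summaries": 7 * 24 * time.Hour,
	"metrics":         7 * 24 * time.Hour,
	"chat_messages":   30 * 24 * time.Hour,
	"session_tools":   30 * 24 * time.Hour,
}

// cutoff returns the oldest timestamp to keep for table, as ISO-8601 text.
// The second result is false for tables without a TTL.
func cutoff(table string, now time.Time) (string, bool) {
	ttl, ok := ttls[table]
	if !ok {
		return "", false
	}
	return now.UTC().Add(-ttl).Format(time.RFC3339), true
}

// chunkedDelete builds one bounded DELETE. timeCol is a hypothetical column
// name. Both sides go through datetime(), so TEXT is compared with TEXT.
func chunkedDelete(table, timeCol string) string {
	return fmt.Sprintf(
		"DELETE FROM %s WHERE rowid IN (SELECT rowid FROM %s WHERE datetime(%s) < datetime(?) LIMIT ?)",
		table, table, timeCol)
}

// demoNow is a fixed clock for the example.
func demoNow() time.Time {
	return time.Date(2026, 5, 2, 12, 0, 0, 0, time.UTC)
}
```

A caller would presumably execute the statement repeatedly with the cutoff and a chunk size as arguments until no rows are affected. Table names are interpolated because SQL identifiers cannot be bound as parameters, so they should come only from a fixed list such as ttls.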

func (*Store) UpdateAgentAssignment

func (s *Store) UpdateAgentAssignment(a *AgentAssignment) error

UpdateAgentAssignment updates an existing assignment.

func (*Store) UpdateChainMetrics

func (s *Store) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error

UpdateChainMetrics updates the denormalized metrics on a chain.

func (*Store) UpdateChainStatus

func (s *Store) UpdateChainStatus(ctx context.Context, id string, status ChainStatus) error

UpdateChainStatus updates the status of a chain.

func (*Store) UpdateMessage

func (s *Store) UpdateMessage(m *Message) error

UpdateMessage updates an existing message.

func (*Store) UpdateSessionEnded

func (s *Store) UpdateSessionEnded(ctx context.Context, sessionID string) error

UpdateSessionEnded marks a session as ended.

func (*Store) UpdateSpan

func (s *Store) UpdateSpan(span *Span) error

UpdateSpan updates an existing span.

func (*Store) UpdateSpanLinks

func (s *Store) UpdateSpanLinks(spanID, taskID, assignmentID string) error

UpdateSpanLinks updates a span's task and agent assignment links. Used for session correlation to link orphaned spans to their parent task.

func (*Store) UpdateStageApproval

func (s *Store) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error

UpdateStageApproval updates the approval status of a stage.

func (*Store) UpdateStageError

func (s *Store) UpdateStageError(ctx context.Context, stageID, errorMessage string) error

UpdateStageError records an error on a stage.

func (*Store) UpdateStageEvalAssessment

func (s *Store) UpdateStageEvalAssessment(ctx context.Context, stageID string, assessment *EvalAssessment) error

UpdateStageEvalAssessment stores eval assessment data as JSON on a chain stage.

func (*Store) UpdateStageMetrics

func (s *Store) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error

UpdateStageMetrics updates the denormalized metrics on a stage.

func (*Store) UpdateStageSession

func (s *Store) UpdateStageSession(ctx context.Context, stageID, sessionID string) error

UpdateStageSession links a session to a stage.

func (*Store) UpdateStageStatus

func (s *Store) UpdateStageStatus(ctx context.Context, id string, status ChainStageStatus) error

UpdateStageStatus updates a stage's execution status.

func (*Store) UpdateTask

func (s *Store) UpdateTask(t *Task) error

UpdateTask updates an existing task.

func (*Store) UpdateToolEnd

func (s *Store) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error

UpdateToolEnd records the completion of a tool call.

func (*Store) UpdateWorkspace

func (s *Store) UpdateWorkspace(w *Workspace) error

UpdateWorkspace updates an existing workspace.

func (*Store) UpsertSession

func (s *Store) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error

UpsertSession inserts or updates a session record. If the session exists, updates workspace and claude_version (for reconnection scenarios).

func (*Store) UpsertSessionWithCorrelation

func (s *Store) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error

UpsertSessionWithCorrelation inserts or updates a session with optional correlation IDs. Correlation IDs enable deterministic linking from Claude Code hooks via AILANG_* env vars.

type Subscription

type Subscription struct {
	// Filter events by workspace
	WorkspaceID string `json:"workspace_id,omitempty"`
	// Filter events by task
	TaskID string `json:"task_id,omitempty"`
	// Filter events by type (empty = all types)
	EventTypes []WSEventType `json:"event_types,omitempty"`
}

Subscription represents a client's subscription preferences.
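How such preferences might be applied to an outgoing event is sketched below. The field comment establishes that an empty EventTypes list means all types; treating an empty WorkspaceID or TaskID as "no filter" is an assumption, and the event type and matches function are hypothetical.

```go
package main

// eventType and subscription are local stand-ins for WSEventType and
// Subscription (illustration only).
type eventType string

type subscription struct {
	WorkspaceID string
	TaskID      string
	EventTypes  []eventType
}

// event is a hypothetical shape for something being broadcast.
type event struct {
	WorkspaceID string
	TaskID      string
	Type        eventType
}

// matches reports whether e passes every filter set on s.
// Empty WorkspaceID or TaskID is assumed to mean "no filter".
func matches(s subscription, e event) bool {
	if s.WorkspaceID != "" && s.WorkspaceID != e.WorkspaceID {
		return false
	}
	if s.TaskID != "" && s.TaskID != e.TaskID {
		return false
	}
	if len(s.EventTypes) == 0 {
		return true // empty = all types
	}
	for _, et := range s.EventTypes {
		if et == e.Type {
			return true
		}
	}
	return false
}
```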

type Task

type Task struct {
	ID           string         `json:"id"`
	WorkspaceID  string         `json:"workspace_id"`
	ParentTaskID string         `json:"parent_task_id,omitempty"` // Links to parent task for handoff chains
	Title        string         `json:"title"`
	Description  string         `json:"description,omitempty"`
	SourceType   TaskSourceType `json:"source_type"`
	SourceRef    string         `json:"source_ref,omitempty"`
	Status       TaskStatus     `json:"status"`
	Priority     string         `json:"priority"`
	CreatedAt    time.Time      `json:"created_at"`
	StartedAt    *time.Time     `json:"started_at,omitempty"`
	CompletedAt  *time.Time     `json:"completed_at,omitempty"`

	// Aggregated metrics
	TotalDurationMs int64   `json:"total_duration_ms"`
	TotalTokensIn   int64   `json:"total_tokens_in"`
	TotalTokensOut  int64   `json:"total_tokens_out"`
	TotalCostUSD    float64 `json:"total_cost_usd"`
	AgentCount      int     `json:"agent_count"`
	SpanCount       int     `json:"span_count"`
	ErrorCount      int     `json:"error_count"`
}

Task represents a unit of work (from GitHub, messages, email, etc.).

type TaskAgentLookup

type TaskAgentLookup func(ctx context.Context, taskID string) (agentID, inbox, title string, err error)

TaskAgentLookup is a callback that resolves a coordinator task_id to agent info. It returns agentID (used for FromAgent), inbox (used for ToInbox), and title. If the task_id is not found, it returns empty strings (not an error).
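A callback satisfying this contract could be backed by an in-memory map, as in the sketch below. The type is redeclared locally so the example stands alone, and agentInfo, mapLookup, demoLookup, and the sample values are hypothetical.

```go
package main

import "context"

// taskAgentLookup copies the documented callback signature locally.
type taskAgentLookup func(ctx context.Context, taskID string) (agentID, inbox, title string, err error)

// agentInfo is a hypothetical record holding the three returned values.
type agentInfo struct {
	AgentID string
	Inbox   string
	Title   string
}

// mapLookup returns a lookup backed by an in-memory map.
// Unknown task IDs yield empty strings and a nil error.
func mapLookup(known map[string]agentInfo) taskAgentLookup {
	return func(ctx context.Context, taskID string) (string, string, string, error) {
		if err := ctx.Err(); err != nil {
			return "", "", "", err
		}
		info := known[taskID] // zero value when the task is absent
		return info.AgentID, info.Inbox, info.Title, nil
	}
}

// demoLookup resolves taskID against a small fixed table.
func demoLookup(taskID string) (string, string, string, error) {
	lookup := mapLookup(map[string]agentInfo{
		"task-1": {AgentID: "executor-a", Inbox: "coordinator", Title: "Fix parser"},
	})
	return lookup(context.Background(), taskID)
}
```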

type TaskHierarchy

type TaskHierarchy struct {
	Task   *Task             `json:"task"`
	Agents []*AgentHierarchy `json:"agents"`
}

TaskHierarchy represents a task with its full agent and span hierarchy.

func GetTaskHierarchy

func GetTaskHierarchy(ctx context.Context, backend Backend, taskID string, opts HierarchyOptions) (*TaskHierarchy, error)

GetTaskHierarchy builds the full hierarchy for a task.

type TaskListOptions

type TaskListOptions struct {
	WorkspaceID  string
	ParentTaskID string // Filter by parent_task_id (for handoff chains)
	Status       TaskStatus
	SourceType   TaskSourceType
	Limit        int
	Offset       int
}

TaskListOptions configures task listing.

type TaskMetricStats

type TaskMetricStats struct {
	Metric string  `json:"metric"`
	Count  int     `json:"count"`   // Number of spans with non-zero values
	Sum    float64 `json:"sum"`     // Total value across all spans
	Mean   float64 `json:"mean"`    // Average value
	StdDev float64 `json:"std_dev"` // Standard deviation
	Min    float64 `json:"min"`
	Max    float64 `json:"max"`
}

TaskMetricStats holds statistical summary for a single metric within a task.
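A summary of this shape could be computed as sketched below. The Count comment says only spans with non-zero values are counted; this sketch goes further and assumes that Mean, StdDev, Min, and Max are also taken over the non-zero values, and that StdDev is the population standard deviation. Neither assumption is confirmed by the documentation, and metricStats and summarize are local names.

```go
package main

import "math"

// metricStats is a local stand-in for TaskMetricStats.
type metricStats struct {
	Metric string
	Count  int
	Sum    float64
	Mean   float64
	StdDev float64
	Min    float64
	Max    float64
}

// summarize computes stats over the non-zero values only (an assumption).
func summarize(metric string, values []float64) metricStats {
	st := metricStats{Metric: metric}
	for _, v := range values {
		if v == 0 {
			continue
		}
		if st.Count == 0 || v < st.Min {
			st.Min = v
		}
		if st.Count == 0 || v > st.Max {
			st.Max = v
		}
		st.Count++
		st.Sum += v
	}
	if st.Count == 0 {
		return st
	}
	st.Mean = st.Sum / float64(st.Count)
	var sq float64
	for _, v := range values {
		if v != 0 {
			sq += (v - st.Mean) * (v - st.Mean)
		}
	}
	// Population standard deviation: divide by Count, not Count-1.
	st.StdDev = math.Sqrt(sq / float64(st.Count))
	return st
}
```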

type TaskSourceType

type TaskSourceType string

TaskSourceType represents the origin of a task.

const (
	TaskSourceGitHub  TaskSourceType = "github_issue"
	TaskSourceMessage TaskSourceType = "message"
	TaskSourceEmail   TaskSourceType = "email"
	TaskSourceManual  TaskSourceType = "manual"
)

type TaskSpanSummary

type TaskSpanSummary struct {
	TaskID     string    `json:"task_id"`
	SpanCount  int       `json:"span_count"`
	TraceCount int       `json:"trace_count"`
	TokensIn   int64     `json:"tokens_in"`
	TokensOut  int64     `json:"tokens_out"`
	CostUSD    float64   `json:"cost_usd"`
	DurationMs int64     `json:"duration_ms"`
	StartTime  time.Time `json:"start_time"`
	EndTime    time.Time `json:"end_time"`
	// Top span names by count (e.g., "api_request: 487, claude_code.tool.Read: 338")
	TopSpanNames []SpanNameCount `json:"top_span_names"`
}

TaskSpanSummary holds aggregated span statistics for a task_id. Used by "ailang chains find --task-id" when no execution_chain exists (e.g., Claude Code user sessions that aren't coordinator-managed).

type TaskStatus

type TaskStatus string

TaskStatus represents the status of a task.

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
)

type TaskTimeline

type TaskTimeline struct {
	TaskID     string     `json:"task_id"`
	Title      string     `json:"title"`
	Status     TaskStatus `json:"status"`
	SpanID     string     `json:"span_id,omitempty"`
	SpanName   string     `json:"span_name,omitempty"`
	StartTime  *time.Time `json:"start_time,omitempty"`
	EndTime    *time.Time `json:"end_time,omitempty"`
	DurationMs int64      `json:"duration_ms,omitempty"`
	SpanStatus SpanStatus `json:"span_status,omitempty"`
	TokensIn   int64      `json:"tokens_in,omitempty"`
	TokensOut  int64      `json:"tokens_out,omitempty"`
	CostUSD    float64    `json:"cost_usd,omitempty"`
	Provider   Provider   `json:"provider,omitempty"`
}

TaskTimeline represents a task with its span timeline.

type TimeRange

type TimeRange struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

TimeRange represents a time range for queries.
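A short sketch of building and testing a TimeRange follows. The lastN and contains helpers are hypothetical. Treating the range as half-open, [Start, End), is an assumption; the documentation does not state whether the package includes or excludes the End instant.

```go
package main

import (
	"fmt"
	"time"
)

// TimeRange mirrors the documented struct so the sketch compiles on its own.
type TimeRange struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

// lastN is a hypothetical helper that builds the window of length d ending at now.
func lastN(now time.Time, d time.Duration) TimeRange {
	return TimeRange{Start: now.Add(-d), End: now}
}

// contains reports whether t falls in [Start, End).
// The half-open convention is an assumption.
func contains(r TimeRange, t time.Time) bool {
	return !t.Before(r.Start) && t.Before(r.End)
}

// sampleRange returns a fixed one-hour window so the demo is reproducible.
func sampleRange() TimeRange {
	now := time.Date(2026, time.May, 2, 12, 0, 0, 0, time.UTC)
	return lastN(now, time.Hour)
}

func main() {
	r := sampleRange()
	fmt.Println(contains(r, r.Start))                     // true
	fmt.Println(contains(r, r.Start.Add(30*time.Minute))) // true
	fmt.Println(contains(r, r.End))                       // false
}
```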

type Trace

type Trace struct {
	TraceID    string    `json:"trace_id"`
	RootSpan   *Span     `json:"root_span,omitempty"`
	Spans      []*Span   `json:"spans"`
	SpanCount  int       `json:"span_count"`
	DurationMs int64     `json:"duration_ms"`
	StartTime  time.Time `json:"start_time"`
	EndTime    time.Time `json:"end_time"`
}

Trace represents a complete trace with its span tree.

type TraceHierarchy

type TraceHierarchy struct {
	TraceID  string                 `json:"trace_id"`
	RootSpan *SpanNode              `json:"root_span,omitempty"`
	Spans    []*SpanNode            `json:"spans"` // Flat list for easy rendering
	Summary  *HierarchyTraceSummary `json:"summary"`
}

TraceHierarchy represents a trace with its span tree.

type TraceQuery

type TraceQuery struct {
	TaskID    string     `json:"task_id,omitempty"`
	TraceID   string     `json:"trace_id,omitempty"`
	TimeRange *TimeRange `json:"time_range,omitempty"`
	Limit     int        `json:"limit,omitempty"`
	Offset    int        `json:"offset,omitempty"`
}

TraceQuery represents query parameters for listing traces.
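The struct tags determine what a TraceQuery looks like when serialized. The sketch below marshals a query with only two fields set to show the effect of omitempty. The encodeQuery helper is hypothetical, and how the package actually transports queries is not stated in this section.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TimeRange and TraceQuery mirror the documented structs, tags included.
type TimeRange struct {
	Start time.Time `json:"start"`
	End   time.Time `json:"end"`
}

type TraceQuery struct {
	TaskID    string     `json:"task_id,omitempty"`
	TraceID   string     `json:"trace_id,omitempty"`
	TimeRange *TimeRange `json:"time_range,omitempty"`
	Limit     int        `json:"limit,omitempty"`
	Offset    int        `json:"offset,omitempty"`
}

// encodeQuery is a hypothetical helper that returns the JSON form of q.
func encodeQuery(q TraceQuery) (string, error) {
	b, err := json.Marshal(q)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeQuery(TraceQuery{TaskID: "task-123", Limit: 50})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(out) // {"task_id":"task-123","limit":50}
}
```

Because every field is tagged omitempty, a Limit or Offset of 0 is dropped from the output, so a receiver cannot distinguish an explicit zero from an unset field.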

type TraceSource

type TraceSource string

TraceSource indicates where the trace data originated from.

const (
	TraceSourceLocal  TraceSource = "local"  // Local OTLP receiver
	TraceSourceGCP    TraceSource = "gcp"    // Google Cloud Trace
	TraceSourceJaeger TraceSource = "jaeger" // Jaeger backend
)

type TraceSummary

type TraceSummary struct {
	TraceID     string      `json:"trace_id"`
	RootSpan    string      `json:"root_span"`
	SpanCount   int         `json:"span_count"`
	DurationMs  int64       `json:"duration_ms"`
	StartTime   time.Time   `json:"start_time"`
	Status      SpanStatus  `json:"status"`
	TaskID      string      `json:"task_id,omitempty"`
	ServiceName string      `json:"service_name,omitempty"` // e.g., "ailang-run", "ailang-eval", "claude-code"
	Source      TraceSource `json:"source,omitempty"`       // Where trace came from: local, gcp, jaeger
}

TraceSummary represents a summary of a trace for list views.

type TurnGroup

type TurnGroup struct {
	TurnNumber int         `json:"turn_number"`
	SpanID     string      `json:"span_id"`
	DurationMs int64       `json:"duration_ms"`
	Cost       float64     `json:"cost"`
	TokensIn   int64       `json:"tokens_in"`
	TokensOut  int64       `json:"tokens_out"`
	Tools      []*TurnTool `json:"tools,omitempty"`
}

TurnGroup represents a single conversation turn with its tools.

type TurnGroupSession

type TurnGroupSession struct {
	ID         string  `json:"id"`
	Name       string  `json:"name"`
	DurationMs int64   `json:"duration_ms"`
	Cost       float64 `json:"cost"`
	TokensIn   int64   `json:"tokens_in"`
	TokensOut  int64   `json:"tokens_out"`
	Provider   string  `json:"provider,omitempty"`
	Model      string  `json:"model,omitempty"`
}

TurnGroupSession represents the top-level session/executor span.

type TurnGroupStats

type TurnGroupStats struct {
	TotalTurns  int     `json:"total_turns"`
	TotalTools  int     `json:"total_tools"`
	TotalCost   float64 `json:"total_cost"`
	TotalTokens int64   `json:"total_tokens"`
	DurationMs  int64   `json:"duration_ms"`
}

TurnGroupStats contains aggregate statistics for the turn-grouped view.
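One plausible way to derive these aggregates from a list of turns is sketched below. The computeStats helper is hypothetical. In particular, defining TotalTokens as TokensIn plus TokensOut and DurationMs as the sum of turn durations are assumptions; the package may instead take the duration from the session span.

```go
package main

import "fmt"

// Minimal mirrors of the documented structs (JSON tags omitted for brevity).
type TurnTool struct {
	ID   string
	Name string
}

type TurnGroup struct {
	TurnNumber int
	DurationMs int64
	Cost       float64
	TokensIn   int64
	TokensOut  int64
	Tools      []*TurnTool
}

type TurnGroupStats struct {
	TotalTurns  int
	TotalTools  int
	TotalCost   float64
	TotalTokens int64
	DurationMs  int64
}

// computeStats is a hypothetical aggregation over turns. Nil entries are skipped.
func computeStats(turns []*TurnGroup) TurnGroupStats {
	var s TurnGroupStats
	for _, t := range turns {
		if t == nil {
			continue
		}
		s.TotalTurns++
		s.TotalTools += len(t.Tools)
		s.TotalCost += t.Cost
		s.TotalTokens += t.TokensIn + t.TokensOut // assumption: in + out
		s.DurationMs += t.DurationMs              // assumption: sum of turns
	}
	return s
}

// sampleTurns returns two turns with three tool calls in total.
func sampleTurns() []*TurnGroup {
	return []*TurnGroup{
		{TurnNumber: 1, DurationMs: 1200, Cost: 0.25, TokensIn: 1000, TokensOut: 200,
			Tools: []*TurnTool{{ID: "a", Name: "Read"}, {ID: "b", Name: "Bash"}}},
		{TurnNumber: 2, DurationMs: 800, Cost: 0.5, TokensIn: 500, TokensOut: 100,
			Tools: []*TurnTool{{ID: "c", Name: "Read"}}},
	}
}

func main() {
	s := computeStats(sampleTurns())
	// Prints: turns=2 tools=3 cost=0.75 tokens=1800 duration_ms=2000
	fmt.Printf("turns=%d tools=%d cost=%.2f tokens=%d duration_ms=%d\n",
		s.TotalTurns, s.TotalTools, s.TotalCost, s.TotalTokens, s.DurationMs)
}
```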

type TurnGroupedHierarchy

type TurnGroupedHierarchy struct {
	Session *TurnGroupSession `json:"session,omitempty"`
	Turns   []*TurnGroup      `json:"turns"`
	Stats   *TurnGroupStats   `json:"stats"`
}

TurnGroupedHierarchy represents spans organized by conversation turns.

func GroupSpansByTurn

func GroupSpansByTurn(spans []*SpanNode) *TurnGroupedHierarchy

GroupSpansByTurn transforms a span tree into a turn-based hierarchy. This is useful for displaying an execution as a sequence of conversation turns, each with its tool calls, rather than as a raw span tree.

type TurnTool

type TurnTool struct {
	ID         string  `json:"id"`
	Name       string  `json:"name"`
	ToolName   string  `json:"tool_name,omitempty"` // Extracted tool name (e.g., "Read", "Bash")
	DurationMs int64   `json:"duration_ms"`
	Cost       float64 `json:"cost,omitempty"`
	Status     string  `json:"status"`
}

TurnTool represents a tool call within a turn.

type ValueUnion

type ValueUnion struct {
	StringValue *string  `json:"stringValue,omitempty"`
	IntValue    *int64   `json:"intValue,omitempty"`
	DoubleValue *float64 `json:"doubleValue,omitempty"`
	BoolValue   *bool    `json:"boolValue,omitempty"`
}

ValueUnion represents an OTEL attribute value.
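Since at most one pointer field is expected to be set, callers typically need to find which one. The sketch below decodes a few JSON attribute values and returns the populated field. The decodeValue and unwrap helpers are hypothetical, the field precedence in unwrap is an arbitrary choice, and the sample inputs use plain JSON numbers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ValueUnion mirrors the documented struct so the sketch compiles on its own.
type ValueUnion struct {
	StringValue *string  `json:"stringValue,omitempty"`
	IntValue    *int64   `json:"intValue,omitempty"`
	DoubleValue *float64 `json:"doubleValue,omitempty"`
	BoolValue   *bool    `json:"boolValue,omitempty"`
}

// decodeValue is a hypothetical helper that parses one attribute value.
func decodeValue(raw string) (ValueUnion, error) {
	var v ValueUnion
	err := json.Unmarshal([]byte(raw), &v)
	return v, err
}

// unwrap returns the first populated field and true, or (nil, false)
// when no field is set.
func unwrap(v ValueUnion) (any, bool) {
	switch {
	case v.StringValue != nil:
		return *v.StringValue, true
	case v.IntValue != nil:
		return *v.IntValue, true
	case v.DoubleValue != nil:
		return *v.DoubleValue, true
	case v.BoolValue != nil:
		return *v.BoolValue, true
	}
	return nil, false
}

func main() {
	samples := []string{
		`{"stringValue":"claude-code"}`,
		`{"intValue":42}`,
		`{"boolValue":true}`,
		`{}`,
	}
	for _, raw := range samples {
		v, err := decodeValue(raw)
		if err != nil {
			fmt.Println("decode error:", err)
			continue
		}
		val, ok := unwrap(v)
		fmt.Printf("%v %v\n", val, ok)
	}
}
```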

type WSEventType

type WSEventType string

WSEventType represents the type of real-time WebSocket event.

const (
	EventTypeSpanCreated       WSEventType = "span.created"
	EventTypeSpanUpdated       WSEventType = "span.updated"
	EventTypeTaskCreated       WSEventType = "task.created"
	EventTypeTaskUpdated       WSEventType = "task.updated"
	EventTypeTaskCompleted     WSEventType = "task.completed"
	EventTypeMessageCreated    WSEventType = "message.created"
	EventTypeMessageRead       WSEventType = "message.read"
	EventTypeAgentAssigned     WSEventType = "agent.assigned"
	EventTypeAgentCompleted    WSEventType = "agent.completed"
	EventTypeApprovalRequested WSEventType = "approval.requested"
	EventTypeApprovalDecision  WSEventType = "approval.decision"
	EventTypeMetricsUpdated    WSEventType = "metrics.updated"
)

type Workspace

type Workspace struct {
	ID        string    `json:"id"`
	Name      string    `json:"name"`
	Path      string    `json:"path"`
	GitRemote string    `json:"git_remote,omitempty"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

Workspace represents a git repository or project root.

type WorkspaceMapping

type WorkspaceMapping interface {
	// BuildWorkspaceMappingSQL returns a SQL CASE statement mapping cwdColumn values to workspace IDs.
	BuildWorkspaceMappingSQL(cwdColumn string) string
	// GetWorkspaceLabel returns a human-friendly label for a workspace ID.
	GetWorkspaceLabel(workspaceID string) string
	// GetPathPatternsForWorkspace returns SQL LIKE patterns that match a workspace ID.
	// Used for reverse mapping (workspace ID → path patterns) when filtering spans.
	GetPathPatternsForWorkspace(workspaceID string) []string
}

WorkspaceMapping provides workspace path-to-ID and ID-to-label mappings. This interface allows the observatory package to use workspace config without importing coordinator.
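To make the contract concrete, here is a minimal sketch of an implementation backed by an in-memory table. The staticMapping type, the prefix-based LIKE matching, and the 'unknown' fallback are all assumptions; the exact SQL shape the observatory queries expect is not documented in this section.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// WorkspaceMapping mirrors the documented interface.
type WorkspaceMapping interface {
	BuildWorkspaceMappingSQL(cwdColumn string) string
	GetWorkspaceLabel(workspaceID string) string
	GetPathPatternsForWorkspace(workspaceID string) []string
}

// staticMapping is a hypothetical implementation for illustration only.
type staticMapping struct {
	prefixes map[string]string // path prefix -> workspace ID
	labels   map[string]string // workspace ID -> human-friendly label
}

var _ WorkspaceMapping = staticMapping{} // compile-time interface check

// quote renders s as a SQL string literal, doubling single quotes.
func quote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

// sortedPrefixes returns the prefixes in a stable order.
func (m staticMapping) sortedPrefixes() []string {
	keys := make([]string, 0, len(m.prefixes))
	for p := range m.prefixes {
		keys = append(keys, p)
	}
	sort.Strings(keys)
	return keys
}

// BuildWorkspaceMappingSQL emits a CASE expression. cwdColumn is inserted
// verbatim, so it must be a trusted column name. LIKE wildcards inside
// the prefixes themselves are not escaped in this sketch.
func (m staticMapping) BuildWorkspaceMappingSQL(cwdColumn string) string {
	if len(m.prefixes) == 0 {
		return "'unknown'"
	}
	var b strings.Builder
	b.WriteString("CASE")
	for _, p := range m.sortedPrefixes() {
		fmt.Fprintf(&b, " WHEN %s LIKE %s THEN %s",
			cwdColumn, quote(p+"%"), quote(m.prefixes[p]))
	}
	b.WriteString(" ELSE 'unknown' END")
	return b.String()
}

// GetWorkspaceLabel falls back to the ID when no label is configured.
func (m staticMapping) GetWorkspaceLabel(workspaceID string) string {
	if label, ok := m.labels[workspaceID]; ok {
		return label
	}
	return workspaceID
}

// GetPathPatternsForWorkspace returns one LIKE pattern per matching prefix.
func (m staticMapping) GetPathPatternsForWorkspace(workspaceID string) []string {
	var patterns []string
	for _, p := range m.sortedPrefixes() {
		if m.prefixes[p] == workspaceID {
			patterns = append(patterns, p+"%")
		}
	}
	return patterns
}

// sampleMapping returns a one-workspace table for the demo.
func sampleMapping() staticMapping {
	return staticMapping{
		prefixes: map[string]string{"/home/dev/ailang": "ailang"},
		labels:   map[string]string{"ailang": "AILANG"},
	}
}

func main() {
	m := sampleMapping()
	fmt.Println(m.BuildWorkspaceMappingSQL("cwd"))
	fmt.Println(m.GetWorkspaceLabel("ailang"))
	fmt.Println(m.GetPathPatternsForWorkspace("ailang"))
}
```

Keeping the mapping behind an interface, as the package does, lets a caller supply this kind of table from its own configuration without the observatory package depending on where that configuration lives.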

type WorkspaceStats

type WorkspaceStats struct {
	ID           string    `json:"id"`
	Name         string    `json:"name"`
	Path         string    `json:"path"`
	TaskCount    int       `json:"task_count"`
	TotalCost    float64   `json:"total_cost"`
	TotalTokens  int64     `json:"total_tokens"`
	SuccessRate  float64   `json:"success_rate"`
	UniqueAgents int       `json:"unique_agents"`
	LastActivity time.Time `json:"last_activity,omitempty"`
}

WorkspaceStats contains aggregated metrics for a workspace.
