Documentation
Overview
Package observatory provides a unified observability platform for AILANG.
Package observatory provides cross-trace merging and session-based correlation for hierarchy views.
Package observatory provides turn-based span grouping for hierarchy views.
Package observatory provides database migrations v8-v14 for the observatory schema.
Package observatory provides a unified observability platform for AILANG. It stores and queries traces, spans, metrics, and events from AILANG operations and external CLI tools (Claude Code, Gemini CLI).
Package observatory provides helper functions for the OTLP receiver.
Package observatory provides OTLP metrics handling for the observatory.
Package observatory provides outlier detection for span metrics within tasks.
Package observatory provides span hierarchy query operations.
Package observatory provides execution chain CRUD operations.
Package observatory provides eval assessment operations for execution chains.
Package observatory provides chain query helpers, stats, and journey operations.
Package observatory provides chat message query methods for the Store.
Package observatory provides session tracking for Claude Code hooks.
Package observatory provides trace listing and task span summary operations.
Index
- func CalculateCacheSavings(model string, cacheRead int64) float64
- func CalculateCostFromTokens(model string, tokensIn, tokensOut int64) float64
- func CalculateCostFromTokensWithCache(model string, tokensIn, tokensOut, cacheRead, cacheCreation int64) float64
- func CheckHealth(dbPath string)
- func DefaultDatabasePath() string
- func GetPricingConfig() *eval_harness.ModelsConfig
- func Migrate(db *sql.DB) error
- func MigrateWithVersion(db *sql.DB) (int, error)
- func RecalculateAgentAssignmentAggregates(ctx context.Context, db *sql.DB, assignmentID string) error
- func RecalculateTaskAggregates(ctx context.Context, db *sql.DB, taskID string) error
- func ResetPricingConfig()
- func UpdateAgentAssignmentAggregates(ctx context.Context, tx *sql.Tx, span *Span) error
- func UpdateTaskAggregates(ctx context.Context, tx *sql.Tx, span *Span) error
- func ValidateSchema(db *sql.DB) error
- type API
- type AgentAssignment
- type AgentAssignmentStatus
- type AgentHierarchy
- type AgentStats
- type AgentStatsResult
- type ApprovalStatus
- type ApprovalType
- type Backend
- type BreakdownItem
- type ChainCreateRequest
- type ChainListOptions
- type ChainReadOptions
- type ChainSourceType
- type ChainStage
- type ChainStageStatus
- type ChainStats
- type ChainStatus
- type ChainStatusCounts
- type ChainSummary
- type ChatContext
- type ChatMessage
- type ChatMessageQuery
- type ClaudeCodeEvent
- type ClaudeEvent
- type ClaudeMetrics
- type Client
- type CompositeBackend
- func (b *CompositeBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
- func (b *CompositeBackend) Close() error
- func (b *CompositeBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
- func (b *CompositeBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *CompositeBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
- func (b *CompositeBackend) CreateMessage(ctx context.Context, m *Message) error
- func (b *CompositeBackend) CreateMetric(ctx context.Context, m *Metric) error
- func (b *CompositeBackend) CreateSpan(ctx context.Context, span *Span) error
- func (b *CompositeBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
- func (b *CompositeBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
- func (b *CompositeBackend) CreateTask(ctx context.Context, t *Task) error
- func (b *CompositeBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
- func (b *CompositeBackend) DeleteAgentAssignment(ctx context.Context, id string) error
- func (b *CompositeBackend) DeleteMessage(ctx context.Context, id string) error
- func (b *CompositeBackend) DeleteSpan(ctx context.Context, id string) error
- func (b *CompositeBackend) DeleteSpanEvent(ctx context.Context, id int64) error
- func (b *CompositeBackend) DeleteTask(ctx context.Context, id string) error
- func (b *CompositeBackend) DeleteWorkspace(ctx context.Context, id string) error
- func (b *CompositeBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
- func (b *CompositeBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
- func (b *CompositeBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
- func (b *CompositeBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
- func (b *CompositeBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
- func (b *CompositeBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
- func (b *CompositeBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
- func (b *CompositeBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
- func (b *CompositeBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
- func (b *CompositeBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
- func (b *CompositeBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
- func (b *CompositeBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
- func (b *CompositeBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
- func (b *CompositeBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
- func (b *CompositeBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
- func (b *CompositeBackend) GetMessage(ctx context.Context, id string) (*Message, error)
- func (b *CompositeBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
- func (b *CompositeBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
- func (b *CompositeBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)
- func (b *CompositeBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
- func (b *CompositeBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
- func (b *CompositeBackend) GetSessionWorkspace(sessionID string) (string, error)
- func (b *CompositeBackend) GetSpan(ctx context.Context, id string) (*Span, error)
- func (b *CompositeBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)
- func (b *CompositeBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
- func (b *CompositeBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
- func (b *CompositeBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
- func (b *CompositeBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
- func (b *CompositeBackend) GetTask(ctx context.Context, id string) (*Task, error)
- func (b *CompositeBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
- func (b *CompositeBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
- func (b *CompositeBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
- func (b *CompositeBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)
- func (b *CompositeBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)
- func (b *CompositeBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
- func (b *CompositeBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
- func (b *CompositeBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
- func (b *CompositeBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
- func (b *CompositeBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
- func (b *CompositeBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
- func (b *CompositeBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
- func (b *CompositeBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
- func (b *CompositeBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
- func (b *CompositeBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
- func (b *CompositeBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
- func (b *CompositeBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
- func (b *CompositeBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
- func (b *CompositeBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)
- func (b *CompositeBackend) MarkMessageArchived(ctx context.Context, id string) error
- func (b *CompositeBackend) MarkMessageRead(ctx context.Context, id string) error
- func (b *CompositeBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
- func (b *CompositeBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *CompositeBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
- func (b *CompositeBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
- func (b *CompositeBackend) UpdateMessage(ctx context.Context, m *Message) error
- func (b *CompositeBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
- func (b *CompositeBackend) UpdateSpan(ctx context.Context, span *Span) error
- func (b *CompositeBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
- func (b *CompositeBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, ...) error
- func (b *CompositeBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
- func (b *CompositeBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, ...) error
- func (b *CompositeBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
- func (b *CompositeBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
- func (b *CompositeBackend) UpdateTask(ctx context.Context, t *Task) error
- func (b *CompositeBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
- func (b *CompositeBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
- func (b *CompositeBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
- func (b *CompositeBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, ...) error
- type CompositeConfig
- type ControlPlaneFilter
- type CumulativePoint
- type EvalAssessment
- type EvalQueryOptions
- type Event
- type EventType
- type ExecHierarchyWithMessages
- type ExecTaskNode
- type ExecutionChain
- type FilterPattern
- type GCPConfig
- type GCPTraceBackend
- func (b *GCPTraceBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
- func (b *GCPTraceBackend) Close() error
- func (b *GCPTraceBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
- func (b *GCPTraceBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *GCPTraceBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
- func (b *GCPTraceBackend) CreateMessage(ctx context.Context, m *Message) error
- func (b *GCPTraceBackend) CreateMetric(ctx context.Context, m *Metric) error
- func (b *GCPTraceBackend) CreateSpan(ctx context.Context, span *Span) error
- func (b *GCPTraceBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
- func (b *GCPTraceBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
- func (b *GCPTraceBackend) CreateTask(ctx context.Context, t *Task) error
- func (b *GCPTraceBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
- func (b *GCPTraceBackend) DeleteAgentAssignment(ctx context.Context, id string) error
- func (b *GCPTraceBackend) DeleteMessage(ctx context.Context, id string) error
- func (b *GCPTraceBackend) DeleteSpan(ctx context.Context, id string) error
- func (b *GCPTraceBackend) DeleteSpanEvent(ctx context.Context, id int64) error
- func (b *GCPTraceBackend) DeleteTask(ctx context.Context, id string) error
- func (b *GCPTraceBackend) DeleteWorkspace(ctx context.Context, id string) error
- func (b *GCPTraceBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
- func (b *GCPTraceBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
- func (b *GCPTraceBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
- func (b *GCPTraceBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
- func (b *GCPTraceBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
- func (b *GCPTraceBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
- func (b *GCPTraceBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
- func (b *GCPTraceBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
- func (b *GCPTraceBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
- func (b *GCPTraceBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
- func (b *GCPTraceBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
- func (b *GCPTraceBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
- func (b *GCPTraceBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
- func (b *GCPTraceBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
- func (b *GCPTraceBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
- func (b *GCPTraceBackend) GetMessage(ctx context.Context, id string) (*Message, error)
- func (b *GCPTraceBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
- func (b *GCPTraceBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
- func (b *GCPTraceBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)
- func (b *GCPTraceBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
- func (b *GCPTraceBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
- func (b *GCPTraceBackend) GetSessionWorkspace(sessionID string) (string, error)
- func (b *GCPTraceBackend) GetSpan(ctx context.Context, id string) (*Span, error)
- func (b *GCPTraceBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)
- func (b *GCPTraceBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
- func (b *GCPTraceBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
- func (b *GCPTraceBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
- func (b *GCPTraceBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
- func (b *GCPTraceBackend) GetTask(ctx context.Context, id string) (*Task, error)
- func (b *GCPTraceBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
- func (b *GCPTraceBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
- func (b *GCPTraceBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
- func (b *GCPTraceBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)
- func (b *GCPTraceBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)
- func (b *GCPTraceBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
- func (b *GCPTraceBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
- func (b *GCPTraceBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
- func (b *GCPTraceBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
- func (b *GCPTraceBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
- func (b *GCPTraceBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
- func (b *GCPTraceBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
- func (b *GCPTraceBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
- func (b *GCPTraceBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
- func (b *GCPTraceBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
- func (b *GCPTraceBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
- func (b *GCPTraceBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
- func (b *GCPTraceBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
- func (b *GCPTraceBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)
- func (b *GCPTraceBackend) MarkMessageArchived(ctx context.Context, id string) error
- func (b *GCPTraceBackend) MarkMessageRead(ctx context.Context, id string) error
- func (b *GCPTraceBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
- func (b *GCPTraceBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *GCPTraceBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
- func (b *GCPTraceBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
- func (b *GCPTraceBackend) UpdateMessage(ctx context.Context, m *Message) error
- func (b *GCPTraceBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
- func (b *GCPTraceBackend) UpdateSpan(ctx context.Context, span *Span) error
- func (b *GCPTraceBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
- func (b *GCPTraceBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, ...) error
- func (b *GCPTraceBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
- func (b *GCPTraceBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, ...) error
- func (b *GCPTraceBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
- func (b *GCPTraceBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
- func (b *GCPTraceBackend) UpdateTask(ctx context.Context, t *Task) error
- func (b *GCPTraceBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
- func (b *GCPTraceBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
- func (b *GCPTraceBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
- func (b *GCPTraceBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, ...) error
- type GeminiOTELSpan
- type HealthAction
- type HeatmapDataPoint
- type HierarchicalSpan
- type HierarchyOptions
- type HierarchyTraceSummary
- type Hub
- func (h *Hub) Broadcast(event *Event)
- func (h *Hub) BroadcastApprovalDecision(data any)
- func (h *Hub) BroadcastApprovalRequested(data any)
- func (h *Hub) BroadcastMessageCreated(msg *Message)
- func (h *Hub) BroadcastMetricsUpdated(summary *MetricsSummary)
- func (h *Hub) BroadcastSpanCreated(span *Span)
- func (h *Hub) BroadcastSpanUpdated(span *Span)
- func (h *Hub) BroadcastTaskCompleted(task *Task)
- func (h *Hub) BroadcastTaskCreated(task *Task)
- func (h *Hub) BroadcastTaskUpdated(task *Task)
- func (h *Hub) ClientCount() int
- func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request)
- func (h *Hub) Run()
- func (h *Hub) SetToken(token string)
- func (h *Hub) Stop()
- type JaegerBackend
- func (b *JaegerBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
- func (b *JaegerBackend) Close() error
- func (b *JaegerBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
- func (b *JaegerBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *JaegerBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
- func (b *JaegerBackend) CreateMessage(ctx context.Context, m *Message) error
- func (b *JaegerBackend) CreateMetric(ctx context.Context, m *Metric) error
- func (b *JaegerBackend) CreateSpan(ctx context.Context, span *Span) error
- func (b *JaegerBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
- func (b *JaegerBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
- func (b *JaegerBackend) CreateTask(ctx context.Context, t *Task) error
- func (b *JaegerBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
- func (b *JaegerBackend) DeleteAgentAssignment(ctx context.Context, id string) error
- func (b *JaegerBackend) DeleteMessage(ctx context.Context, id string) error
- func (b *JaegerBackend) DeleteSpan(ctx context.Context, id string) error
- func (b *JaegerBackend) DeleteSpanEvent(ctx context.Context, id int64) error
- func (b *JaegerBackend) DeleteTask(ctx context.Context, id string) error
- func (b *JaegerBackend) DeleteWorkspace(ctx context.Context, id string) error
- func (b *JaegerBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
- func (b *JaegerBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
- func (b *JaegerBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
- func (b *JaegerBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
- func (b *JaegerBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
- func (b *JaegerBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
- func (b *JaegerBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
- func (b *JaegerBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
- func (b *JaegerBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
- func (b *JaegerBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
- func (b *JaegerBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
- func (b *JaegerBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
- func (b *JaegerBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
- func (b *JaegerBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
- func (b *JaegerBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
- func (b *JaegerBackend) GetMessage(ctx context.Context, id string) (*Message, error)
- func (b *JaegerBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
- func (b *JaegerBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
- func (b *JaegerBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)
- func (b *JaegerBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
- func (b *JaegerBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
- func (b *JaegerBackend) GetSessionWorkspace(sessionID string) (string, error)
- func (b *JaegerBackend) GetSpan(ctx context.Context, id string) (*Span, error)
- func (b *JaegerBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)
- func (b *JaegerBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
- func (b *JaegerBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
- func (b *JaegerBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
- func (b *JaegerBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
- func (b *JaegerBackend) GetTask(ctx context.Context, id string) (*Task, error)
- func (b *JaegerBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
- func (b *JaegerBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
- func (b *JaegerBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
- func (b *JaegerBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)
- func (b *JaegerBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)
- func (b *JaegerBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
- func (b *JaegerBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
- func (b *JaegerBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
- func (b *JaegerBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
- func (b *JaegerBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
- func (b *JaegerBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
- func (b *JaegerBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
- func (b *JaegerBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
- func (b *JaegerBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
- func (b *JaegerBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
- func (b *JaegerBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
- func (b *JaegerBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
- func (b *JaegerBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
- func (b *JaegerBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)
- func (b *JaegerBackend) MarkMessageArchived(ctx context.Context, id string) error
- func (b *JaegerBackend) MarkMessageRead(ctx context.Context, id string) error
- func (b *JaegerBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
- func (b *JaegerBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *JaegerBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
- func (b *JaegerBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
- func (b *JaegerBackend) UpdateMessage(ctx context.Context, m *Message) error
- func (b *JaegerBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
- func (b *JaegerBackend) UpdateSpan(ctx context.Context, span *Span) error
- func (b *JaegerBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
- func (b *JaegerBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, ...) error
- func (b *JaegerBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
- func (b *JaegerBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, ...) error
- func (b *JaegerBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
- func (b *JaegerBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
- func (b *JaegerBackend) UpdateTask(ctx context.Context, t *Task) error
- func (b *JaegerBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
- func (b *JaegerBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
- func (b *JaegerBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
- func (b *JaegerBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, ...) error
- type JaegerConfig
- type JourneyResponse
- type JourneyStep
- type KeyValue
- type Message
- type MessageListOptions
- type MessageNode
- type MessageStatus
- type Metric
- type MetricListOptions
- type MetricsSummary
- type OTELEvent
- type OTELResource
- type OTELStatus
- type OTLPReceiver
- type OutlierAnalysis
- type OutlierOptions
- type PendingApprovalInfo
- type Provider
- type ProviderComparison
- type ProviderNormalizer
- func (n *ProviderNormalizer) NormalizeClaudeMetrics(metrics *ClaudeMetrics) (*Span, error)
- func (n *ProviderNormalizer) NormalizeGeminiSpan(otelSpan *GeminiOTELSpan, taskID string) (*Span, error)
- func (n *ProviderNormalizer) NormalizeGeminiTrace(otelSpans []*GeminiOTELSpan, taskID string) ([]*Span, error)
- type Range
- type RateAnalysis
- type RetentionStats
- type SQLiteBackend
- func (b *SQLiteBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
- func (b *SQLiteBackend) Close() error
- func (b *SQLiteBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
- func (b *SQLiteBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *SQLiteBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
- func (b *SQLiteBackend) CreateMessage(ctx context.Context, m *Message) error
- func (b *SQLiteBackend) CreateMetric(ctx context.Context, m *Metric) error
- func (b *SQLiteBackend) CreateSpan(ctx context.Context, span *Span) error
- func (b *SQLiteBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
- func (b *SQLiteBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
- func (b *SQLiteBackend) CreateTask(ctx context.Context, t *Task) error
- func (b *SQLiteBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
- func (b *SQLiteBackend) DB() *sql.DB
- func (b *SQLiteBackend) DeleteAgentAssignment(ctx context.Context, id string) error
- func (b *SQLiteBackend) DeleteMessage(ctx context.Context, id string) error
- func (b *SQLiteBackend) DeleteSpan(ctx context.Context, id string) error
- func (b *SQLiteBackend) DeleteSpanEvent(ctx context.Context, id int64) error
- func (b *SQLiteBackend) DeleteTask(ctx context.Context, id string) error
- func (b *SQLiteBackend) DeleteWorkspace(ctx context.Context, id string) error
- func (b *SQLiteBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
- func (b *SQLiteBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
- func (b *SQLiteBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
- func (b *SQLiteBackend) GetBreakdownByModel(ctx context.Context) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetBreakdownByProvider(ctx context.Context) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetBreakdownBySourceType(ctx context.Context) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetBreakdownByWorkspace(ctx context.Context) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetBreakdownByWorkspaceWithMapping(ctx context.Context, mapping WorkspaceMapping, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
- func (b *SQLiteBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
- func (b *SQLiteBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
- func (b *SQLiteBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
- func (b *SQLiteBackend) GetChainJourney(ctx context.Context, chainID string) (*JourneyResponse, error)
- func (b *SQLiteBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
- func (b *SQLiteBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
- func (b *SQLiteBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
- func (b *SQLiteBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
- func (b *SQLiteBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
- func (b *SQLiteBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
- func (b *SQLiteBackend) GetClaudeCodeEvents(ctx context.Context, limit int) ([]ClaudeCodeEvent, error)
- func (b *SQLiteBackend) GetClaudeCodeEventsWithLookup(ctx context.Context, limit int, lookup TaskAgentLookup, ...) ([]ClaudeCodeEvent, error)
- func (b *SQLiteBackend) GetClaudeCodeHierarchy(ctx context.Context, sessionID string) (*TaskHierarchy, error)
- func (b *SQLiteBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
- func (b *SQLiteBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
- func (b *SQLiteBackend) GetFilteredBreakdownByModel(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetFilteredBreakdownByProvider(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetFilteredBreakdownBySourceType(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetFilteredBreakdownByWorkspace(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetFilteredBreakdownByWorkspaceWithMapping(ctx context.Context, filter *ControlPlaneFilter, mapping WorkspaceMapping, ...) ([]BreakdownItem, error)
- func (b *SQLiteBackend) GetFilteredHeatmapData(ctx context.Context, filter *ControlPlaneFilter, days int, ...) ([]HeatmapDataPoint, error)
- func (b *SQLiteBackend) GetFilteredMetricsSummary(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) (*MetricsSummary, error)
- func (b *SQLiteBackend) GetMessage(ctx context.Context, id string) (*Message, error)
- func (b *SQLiteBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
- func (b *SQLiteBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
- func (b *SQLiteBackend) GetSession(ctx context.Context, sessionID string) (*Session, error)
- func (b *SQLiteBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
- func (b *SQLiteBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
- func (b *SQLiteBackend) GetSessionWorkspace(sessionID string) (string, error)
- func (b *SQLiteBackend) GetSpan(ctx context.Context, id string) (*Span, error)
- func (b *SQLiteBackend) GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)
- func (b *SQLiteBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
- func (b *SQLiteBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
- func (b *SQLiteBackend) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
- func (b *SQLiteBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
- func (b *SQLiteBackend) GetTask(ctx context.Context, id string) (*Task, error)
- func (b *SQLiteBackend) GetTaskSpanSummary(ctx context.Context, taskID string) (*TaskSpanSummary, error)
- func (b *SQLiteBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
- func (b *SQLiteBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
- func (b *SQLiteBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
- func (b *SQLiteBackend) GetTrace(ctx context.Context, traceID string) (*Trace, error)
- func (b *SQLiteBackend) GetWorkspace(ctx context.Context, id string) (*Workspace, error)
- func (b *SQLiteBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
- func (b *SQLiteBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
- func (b *SQLiteBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
- func (b *SQLiteBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
- func (b *SQLiteBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
- func (b *SQLiteBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
- func (b *SQLiteBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
- func (b *SQLiteBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
- func (b *SQLiteBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
- func (b *SQLiteBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
- func (b *SQLiteBackend) ListSpansByTaskIDs(ctx context.Context, taskIDs []string, limitPerTask int) (map[string][]*Span, error)
- func (b *SQLiteBackend) ListSpansLightweight(ctx context.Context, opts SpanListOptions) ([]*Span, error)
- func (b *SQLiteBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
- func (b *SQLiteBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
- func (b *SQLiteBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
- func (b *SQLiteBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (taskID, assignmentID, traceID string)
- func (b *SQLiteBackend) MarkMessageArchived(ctx context.Context, id string) error
- func (b *SQLiteBackend) MarkMessageRead(ctx context.Context, id string) error
- func (b *SQLiteBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
- func (b *SQLiteBackend) Store() *Store
- func (b *SQLiteBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
- func (b *SQLiteBackend) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
- func (b *SQLiteBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
- func (b *SQLiteBackend) UpdateMessage(ctx context.Context, m *Message) error
- func (b *SQLiteBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
- func (b *SQLiteBackend) UpdateSpan(ctx context.Context, span *Span) error
- func (b *SQLiteBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
- func (b *SQLiteBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, ...) error
- func (b *SQLiteBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
- func (b *SQLiteBackend) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, ...) error
- func (b *SQLiteBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
- func (b *SQLiteBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
- func (b *SQLiteBackend) UpdateTask(ctx context.Context, t *Task) error
- func (b *SQLiteBackend) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
- func (b *SQLiteBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
- func (b *SQLiteBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
- func (b *SQLiteBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, ...) error
- type SeedConfig
- type SeedResult
- type Session
- type SessionCorrelation
- type SessionMetricsSummary
- type SessionTool
- type Span
- type SpanEvent
- type SpanFilterConfig
- type SpanHierarchyNode
- type SpanHierarchyNodeType
- type SpanHierarchyResult
- type SpanHierarchyStats
- type SpanKind
- type SpanListOptions
- type SpanLite
- type SpanLitePage
- type SpanNameCount
- type SpanNode
- type SpanOutlier
- type SpanStatus
- type StageCreateRequest
- type StageUpdateRequest
- type Store
- func (s *Store) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
- func (s *Store) Close() error
- func (s *Store) CountChatMessages(ctx context.Context, q ChatMessageQuery) (total int, withTaskID int, err error)
- func (s *Store) CreateAgentAssignment(a *AgentAssignment) error
- func (s *Store) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
- func (s *Store) CreateMessage(m *Message) error
- func (s *Store) CreateMetric(m *Metric) error
- func (s *Store) CreateSpan(span *Span) error
- func (s *Store) CreateSpanEvent(e *SpanEvent) error
- func (s *Store) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
- func (s *Store) CreateTask(t *Task) error
- func (s *Store) CreateWorkspace(w *Workspace) error
- func (s *Store) DB() *sql.DB
- func (s *Store) DeleteAgentAssignment(id string) error
- func (s *Store) DeleteChain(ctx context.Context, id string) error
- func (s *Store) DeleteMessage(id string) error
- func (s *Store) DeleteSpan(id string) error
- func (s *Store) DeleteSpanEvent(id int64) error
- func (s *Store) DeleteTask(id string) error
- func (s *Store) DeleteWorkspace(id string) error
- func (s *Store) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
- func (s *Store) GetAgentAssignment(id string) (*AgentAssignment, error)
- func (s *Store) GetAgentStats(agentID string) (*AgentStats, error)
- func (s *Store) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
- func (s *Store) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
- func (s *Store) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
- func (s *Store) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
- func (s *Store) GetChainJourney(ctx context.Context, chainID string) (*JourneyResponse, error)
- func (s *Store) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
- func (s *Store) GetChainStats(ctx context.Context) (*ChainStats, error)
- func (s *Store) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
- func (s *Store) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
- func (s *Store) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
- func (s *Store) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
- func (s *Store) GetExecTaskHierarchy(limit int) ([]*ExecTaskNode, error)
- func (s *Store) GetExecTaskHierarchyWithMessages(limit int) (*ExecHierarchyWithMessages, error)
- func (s *Store) GetMessage(id string) (*Message, error)
- func (s *Store) GetMetricsSummary() (*MetricsSummary, error)
- func (s *Store) GetProviderComparison() ([]*ProviderComparison, error)
- func (s *Store) GetSession(ctx context.Context, sessionID string) (*Session, error)
- func (s *Store) GetSessionMetricsSummary(sessionID string) (*SessionMetricsSummary, error)
- func (s *Store) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
- func (s *Store) GetSessionWorkspace(sessionID string) (string, error)
- func (s *Store) GetSpan(id string) (*Span, error)
- func (s *Store) GetSpanEvents(spanID string) ([]SpanEvent, error)
- func (s *Store) GetSpanHierarchy(limit int) (*SpanHierarchyResult, error)
- func (s *Store) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
- func (s *Store) GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
- func (s *Store) GetStage(ctx context.Context, id string) (*ChainStage, error)
- func (s *Store) GetStageEvalAssessment(ctx context.Context, stageID string) (*EvalAssessment, error)
- func (s *Store) GetTask(id string) (*Task, error)
- func (s *Store) GetTaskSpanSummary(ctx context.Context, taskID string) (*TaskSpanSummary, error)
- func (s *Store) GetTaskTimeline(taskID string) ([]*TaskTimeline, error)
- func (s *Store) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
- func (s *Store) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
- func (s *Store) GetToolsForSessions(ctx context.Context, sessionIDs []string) (map[string][]SessionTool, error)
- func (s *Store) GetTrace(traceID string) (*Trace, error)
- func (s *Store) GetWorkspace(id string) (*Workspace, error)
- func (s *Store) GetWorkspaceStats(workspaceID string) (*WorkspaceStats, error)
- func (s *Store) IncrementSessionTurns(ctx context.Context, sessionID string) error
- func (s *Store) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
- func (s *Store) LinkOrphanedSpansBySession(sessionID, taskID, assignmentID string) (int64, error)
- func (s *Store) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
- func (s *Store) ListAgentAssignments(taskID string) ([]*AgentAssignment, error)
- func (s *Store) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
- func (s *Store) ListEvalChains(ctx context.Context, limit int) ([]*ChainSummary, error)
- func (s *Store) ListMessages(opts MessageListOptions) ([]*Message, error)
- func (s *Store) ListMetrics(opts MetricListOptions) ([]*Metric, error)
- func (s *Store) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
- func (s *Store) ListRecentSessions(ctx context.Context, limit int) ([]Session, error)
- func (s *Store) ListSpans(opts SpanListOptions) ([]*Span, error)
- func (s *Store) ListSpansByTaskIDs(taskIDs []string, limitPerTask int) (map[string][]*Span, error)
- func (s *Store) ListSpansLightweight(opts SpanListOptions) ([]*Span, error)
- func (s *Store) ListTasks(opts TaskListOptions) ([]*Task, error)
- func (s *Store) ListTraces(opts TraceQuery) ([]*TraceSummary, error)
- func (s *Store) ListWorkspaces() ([]*Workspace, error)
- func (s *Store) LookupTaskBySessionID(sessionID string) (taskID, assignmentID, traceID string)
- func (s *Store) MarkMessageArchived(id string) error
- func (s *Store) MarkMessageRead(id string) error
- func (s *Store) QueryEvalResults(ctx context.Context, opts EvalQueryOptions) ([]*ChainStage, error)
- func (s *Store) RecalculateAgentAssignmentAggregates(assignmentID string) error
- func (s *Store) RecalculateTaskAggregates(taskID string) error
- func (s *Store) RunRetention(ctx context.Context) (RetentionStats, error)
- func (s *Store) UpdateAgentAssignment(a *AgentAssignment) error
- func (s *Store) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
- func (s *Store) UpdateChainStatus(ctx context.Context, id string, status ChainStatus) error
- func (s *Store) UpdateMessage(m *Message) error
- func (s *Store) UpdateSessionEnded(ctx context.Context, sessionID string) error
- func (s *Store) UpdateSpan(span *Span) error
- func (s *Store) UpdateSpanLinks(spanID, taskID, assignmentID string) error
- func (s *Store) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, ...) error
- func (s *Store) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
- func (s *Store) UpdateStageEvalAssessment(ctx context.Context, stageID string, assessment *EvalAssessment) error
- func (s *Store) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, ...) error
- func (s *Store) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
- func (s *Store) UpdateStageStatus(ctx context.Context, id string, status ChainStageStatus) error
- func (s *Store) UpdateTask(t *Task) error
- func (s *Store) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
- func (s *Store) UpdateWorkspace(w *Workspace) error
- func (s *Store) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
- func (s *Store) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, ...) error
- type Subscription
- type Task
- type TaskAgentLookup
- type TaskHierarchy
- type TaskListOptions
- type TaskMetricStats
- type TaskSourceType
- type TaskSpanSummary
- type TaskStatus
- type TaskTimeline
- type TimeRange
- type Trace
- type TraceHierarchy
- type TraceQuery
- type TraceSource
- type TraceSummary
- type TurnGroup
- type TurnGroupSession
- type TurnGroupStats
- type TurnGroupedHierarchy
- type TurnTool
- type ValueUnion
- type WSEventType
- type Workspace
- type WorkspaceMapping
- type WorkspaceStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CalculateCacheSavings ¶
CalculateCacheSavings calculates how much was saved by using cache reads. Returns the difference between what full-price input would have cost and cache read cost. Cache reads are 90% cheaper than regular input tokens.
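The arithmetic described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the helper name `cacheSavings` and the `inputPricePerToken` parameter are assumptions, since the real function's signature and pricing lookup are not shown here.

```go
package main

import "fmt"

// cacheReadDiscount is the fraction saved on cache reads, taken from the
// description above (cache reads cost 10% of the regular input price).
const cacheReadDiscount = 0.90

// cacheSavings is a hypothetical helper, not the package's
// CalculateCacheSavings. inputPricePerToken is an assumed per-token USD
// price for regular input tokens.
func cacheSavings(cacheReadTokens int64, inputPricePerToken float64) float64 {
	if cacheReadTokens <= 0 || inputPricePerToken <= 0 {
		return 0
	}
	fullPrice := float64(cacheReadTokens) * inputPricePerToken
	cachePrice := fullPrice * (1 - cacheReadDiscount)
	return fullPrice - cachePrice
}

func main() {
	// 1,000,000 cache-read tokens at an assumed $3 per million input tokens.
	fmt.Printf("%.2f\n", cacheSavings(1_000_000, 3.0/1_000_000)) // prints 2.70
}
```

With these assumed numbers, full-price input would cost $3.00 and the cache reads cost $0.30, so the saving is $2.70.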
func CalculateCostFromTokens ¶
CalculateCostFromTokens calculates cost from model name and token counts. Returns 0.0 if:
- Pricing config not loaded
- Model not found in config
- Tokens are zero
This follows the "no silent fallbacks" principle - return 0 rather than guess.
func CalculateCostFromTokensWithCache ¶
func CalculateCostFromTokensWithCache(model string, tokensIn, tokensOut, cacheRead, cacheCreation int64) float64
CalculateCostFromTokensWithCache calculates cost including cache token pricing. Cache read tokens are charged at 10% of input price (90% discount). Cache creation tokens are charged at 125% of input price (25% premium). Returns 0.0 if model not found or tokens are all zero.
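A rough sketch of the pricing rule described above, assuming per-token prices are available in a simple map. The `modelPrice` type, the `prices` map, and the model name are hypothetical; the real function reads its prices from the loaded pricing configuration, whose structure is not shown here.

```go
package main

import "fmt"

// modelPrice holds assumed per-token USD prices. It is a hypothetical
// stand-in for the package's pricing configuration.
type modelPrice struct {
	InputPerToken  float64
	OutputPerToken float64
}

const (
	cacheReadMultiplier     = 0.10 // cache reads: 10% of input price
	cacheCreationMultiplier = 1.25 // cache creation: 125% of input price
)

// costWithCache mirrors the documented rule: return 0 when the model is
// unknown or all token counts are zero, rather than guessing a price.
func costWithCache(prices map[string]modelPrice, model string,
	tokensIn, tokensOut, cacheRead, cacheCreation int64) float64 {
	p, ok := prices[model]
	if !ok {
		return 0
	}
	if tokensIn == 0 && tokensOut == 0 && cacheRead == 0 && cacheCreation == 0 {
		return 0
	}
	return float64(tokensIn)*p.InputPerToken +
		float64(tokensOut)*p.OutputPerToken +
		float64(cacheRead)*p.InputPerToken*cacheReadMultiplier +
		float64(cacheCreation)*p.InputPerToken*cacheCreationMultiplier
}

func main() {
	// Assumed prices: $2 per million input tokens, $10 per million output tokens.
	prices := map[string]modelPrice{
		"example-model": {InputPerToken: 2e-6, OutputPerToken: 10e-6},
	}
	cost := costWithCache(prices, "example-model", 1000, 500, 10000, 2000)
	fmt.Printf("%.4f\n", cost) // prints 0.0140
	fmt.Println(costWithCache(prices, "unknown-model", 1000, 500, 0, 0))
}
```

In this example the four terms are $0.002 (input), $0.005 (output), $0.002 (cache reads), and $0.005 (cache creation).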
func CheckHealth ¶
func CheckHealth(dbPath string)
CheckHealth examines the observatory DB size and takes action. Fast path: just os.Stat (no DB open). Only opens the DB if cleanup is needed.
Thresholds:
- < 200MB: ok (no output)
- 200-500MB: warn (log only)
- 500MB-2GB: auto-cleanup (run retention)
- > 2GB: loud warning, no destructive action
The >2GB branch intentionally does NOT delete the database. The previous implementation called os.Remove(dbPath) on this threshold, which destroyed production data if the WAL bloated past 2GB (e.g. when retention issued a large DELETE with a long-lived reader holding back the WAL checkpoint). Silent data destruction violates the project's "no silent fallbacks" rule. If the DB is genuinely >2GB, the user needs to intervene (stop the server, VACUUM, etc.) — we surface the problem loudly and leave the data alone.
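The threshold table above could be expressed roughly as below. This is a sketch under stated assumptions: `healthAction`, `classifySize`, and `classifyPath` are hypothetical names, megabytes are taken to be binary (1<<20 bytes), and the handling of exact boundary values is a guess, since the source does not specify either.

```go
package main

import (
	"fmt"
	"os"
)

// healthAction names the four outcomes listed in the thresholds above.
type healthAction string

const (
	actionOK          healthAction = "ok"
	actionWarn        healthAction = "warn"
	actionAutoCleanup healthAction = "auto-cleanup"
	actionLoudWarning healthAction = "loud-warning"
)

// mb assumes binary megabytes; the source does not say whether the
// thresholds are decimal or binary.
const mb int64 = 1 << 20

// classifySize maps a file size to an action. Treatment of the exact
// boundary values (200MB, 500MB, 2GB) is an assumption of this sketch.
func classifySize(sizeBytes int64) healthAction {
	switch {
	case sizeBytes < 200*mb:
		return actionOK
	case sizeBytes < 500*mb:
		return actionWarn
	case sizeBytes <= 2048*mb:
		return actionAutoCleanup
	default:
		return actionLoudWarning // warn only; the database is never deleted
	}
}

// classifyPath is the fast path: one os.Stat call, no database handle opened.
func classifyPath(dbPath string) (healthAction, error) {
	info, err := os.Stat(dbPath)
	if err != nil {
		return "", err
	}
	return classifySize(info.Size()), nil
}

func main() {
	for _, size := range []int64{50 * mb, 300 * mb, 1024 * mb, 3072 * mb} {
		fmt.Printf("%d MB -> %s\n", size/mb, classifySize(size))
	}
	if _, err := classifyPath("does-not-exist.db"); err != nil {
		fmt.Println("stat failed:", err)
	}
}
```

Keeping the classification separate from the stat call makes the thresholds testable without touching the filesystem; only the `auto-cleanup` outcome would need to open the database.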
func DefaultDatabasePath ¶
func DefaultDatabasePath() string
DefaultDatabasePath returns the default path for the observatory database.
func GetPricingConfig ¶
func GetPricingConfig() *eval_harness.ModelsConfig
GetPricingConfig returns the loaded pricing configuration. Returns nil if not loaded. Used for testing.
func Migrate ¶
Migrate runs database migrations to create or update the observatory schema. It is idempotent - safe to call multiple times.
func MigrateWithVersion ¶
MigrateWithVersion runs migrations and tracks schema version. Returns the current schema version after migration.
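One common way to get idempotent, version-tracked migrations is to skip any step whose version is not newer than the recorded schema version. The sketch below illustrates that idea with an in-memory stand-in for the database; it is not the package's implementation. The `schema` and `migration` types and the `spans` table are hypothetical, while the v4, v6, and v7 table changes are taken from the ValidateSchema notes in this document.

```go
package main

import "fmt"

// schema is an in-memory stand-in for a real database: a recorded version
// plus the set of tables that currently exist.
type schema struct {
	version int
	tables  map[string]bool
}

// migration is one versioned step. Steps must be listed in ascending order.
type migration struct {
	version int
	apply   func(s *schema)
}

var migrations = []migration{
	{version: 4, apply: func(s *schema) { delete(s.tables, "span_events") }},
	{version: 6, apply: func(s *schema) {
		s.tables["chat_messages"] = true
		s.tables["chat_import_status"] = true
	}},
	{version: 7, apply: func(s *schema) {
		s.tables["execution_chains"] = true
		s.tables["chain_stages"] = true
	}},
}

// migrateWithVersion applies only the steps newer than the recorded version
// and returns the version after migration. Calling it again is a no-op.
func migrateWithVersion(s *schema) int {
	for _, m := range migrations {
		if m.version <= s.version {
			continue // already applied
		}
		m.apply(s)
		s.version = m.version
	}
	return s.version
}

func main() {
	// Assumed starting point: a v3 schema with two tables.
	s := &schema{version: 3, tables: map[string]bool{"spans": true, "span_events": true}}
	fmt.Println(migrateWithVersion(s), len(s.tables)) // prints 7 5
	fmt.Println(migrateWithVersion(s), len(s.tables)) // prints 7 5 again
}
```

In a real database the version would be stored in a table and each step would run inside a transaction, so that a failed step does not leave the recorded version ahead of the actual schema.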
func RecalculateAgentAssignmentAggregates ¶
func RecalculateAgentAssignmentAggregates(ctx context.Context, db *sql.DB, assignmentID string) error
RecalculateAgentAssignmentAggregates recalculates all aggregate metrics for an agent assignment. Use this for backfill operations or to fix inconsistent aggregates.
Deprecated: Use Store.RecalculateAgentAssignmentAggregates instead for encapsulation.
func RecalculateTaskAggregates ¶
RecalculateTaskAggregates recalculates all aggregate metrics for a task from its spans. Use this for backfill operations or to fix inconsistent aggregates.
Deprecated: Use Store.RecalculateTaskAggregates instead for encapsulation.
func ResetPricingConfig ¶
func ResetPricingConfig()
ResetPricingConfig resets the pricing config for testing. NOT for production use.
func UpdateAgentAssignmentAggregates ¶
UpdateAgentAssignmentAggregates updates an agent assignment's metrics based on a new span. This should be called within a transaction when creating a span with an agent_assignment_id.
func UpdateTaskAggregates ¶
UpdateTaskAggregates updates a task's aggregate metrics based on a new span. This should be called within a transaction when creating a span with a task_id.
func ValidateSchema ¶
ValidateSchema checks that all expected tables exist. Returns nil if schema is valid, error describing what's missing otherwise.
NOTE: span_events table removed in v4 migration (M-DB-CLEANUP)
NOTE: chat_messages and chat_import_status added in v6 migration (M-CHAT-HISTORY-DB)
NOTE: execution_chains and chain_stages added in v7 migration (M-CHAINS-SIMPLIFY)
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
API provides HTTP handlers for the observatory REST API.
func (*API) RegisterRoutes ¶
RegisterRoutes registers all observatory routes on the given mux. All routes are prefixed with /api/observatory/
type AgentAssignment ¶
type AgentAssignment struct {
ID string `json:"id"`
TaskID string `json:"task_id"`
AgentID string `json:"agent_id"`
Provider Provider `json:"provider"`
Status AgentAssignmentStatus `json:"status"`
AssignedAt time.Time `json:"assigned_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
ParentAssignmentID string `json:"parent_assignment_id,omitempty"`
// Agent-level aggregates
DurationMs int64 `json:"duration_ms"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
CostUSD float64 `json:"cost_usd"`
ToolCalls int `json:"tool_calls"`
Turns int `json:"turns"`
}
AgentAssignment represents a coordinator → agent delegation.
type AgentAssignmentStatus ¶
type AgentAssignmentStatus string
AgentAssignmentStatus represents the status of an agent assignment.
const (
AgentStatusPending AgentAssignmentStatus = "pending"
AgentStatusRunning AgentAssignmentStatus = "running"
AgentStatusCompleted AgentAssignmentStatus = "completed"
AgentStatusFailed AgentAssignmentStatus = "failed"
)
type AgentHierarchy ¶
type AgentHierarchy struct {
Agent *AgentAssignment `json:"agent"`
Traces []*TraceHierarchy `json:"traces"`
}
AgentHierarchy represents an agent assignment with its spans grouped by trace.
type AgentStats ¶
type AgentStats struct {
AgentID string `json:"agent_id"`
Provider Provider `json:"provider"`
ExecutionCount int `json:"execution_count"`
TotalDurationMs int64 `json:"total_duration_ms"`
AvgDurationMs float64 `json:"avg_duration_ms"`
TotalTokensIn int64 `json:"total_tokens_in"`
TotalTokensOut int64 `json:"total_tokens_out"`
TotalCost float64 `json:"total_cost"`
TotalToolCalls int `json:"total_tool_calls"`
SuccessRate float64 `json:"success_rate"`
}
AgentStats contains aggregated metrics for an agent.
type AgentStatsResult ¶
type AgentStatsResult struct {
AgentID string `json:"agent_id"`
Stages int `json:"stages"`
Completed int `json:"completed"`
Failed int `json:"failed"`
TotalCost float64 `json:"total_cost"`
TokensIn int `json:"total_tokens_in"`
TokensOut int `json:"total_tokens_out"`
}
AgentStatsResult holds per-agent aggregated stats from a single SQL query. Used by GetChainStatsByAgent to replace the N+1 query pattern (M-PERF-OBSERVATORY).
type ApprovalStatus ¶
type ApprovalStatus string
ApprovalStatus represents the status of an approval event.
const (
ApprovalStatusPending ApprovalStatus = "pending"
ApprovalStatusApproved ApprovalStatus = "approved"
ApprovalStatusRejected ApprovalStatus = "rejected"
)
type ApprovalType ¶
type ApprovalType string
ApprovalType represents the type of approval required.
const (
ApprovalTypeMerge ApprovalType = "merge"
ApprovalTypeHandoff ApprovalType = "handoff"
ApprovalTypeMergeHandoff ApprovalType = "merge_handoff"
)
type Backend ¶
type Backend interface {
// Workspace operations
CreateWorkspace(ctx context.Context, w *Workspace) error
GetWorkspace(ctx context.Context, id string) (*Workspace, error)
ListWorkspaces(ctx context.Context) ([]*Workspace, error)
UpdateWorkspace(ctx context.Context, w *Workspace) error
DeleteWorkspace(ctx context.Context, id string) error
GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
// Task operations
CreateTask(ctx context.Context, t *Task) error
GetTask(ctx context.Context, id string) (*Task, error)
ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
UpdateTask(ctx context.Context, t *Task) error
DeleteTask(ctx context.Context, id string) error
// Agent assignment operations
CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
DeleteAgentAssignment(ctx context.Context, id string) error
GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
// Span operations
CreateSpan(ctx context.Context, span *Span) error
GetSpan(ctx context.Context, id string) (*Span, error)
ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
UpdateSpan(ctx context.Context, span *Span) error
UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
RecalculateTaskAggregates(ctx context.Context, taskID string) error
DeleteSpan(ctx context.Context, id string) error
GetTrace(ctx context.Context, traceID string) (*Trace, error)
ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
// LookupTaskBySessionID finds task hierarchy for Claude Code session correlation
LookupTaskBySessionID(ctx context.Context, sessionID string) (taskID, assignmentID, traceID string)
// LinkOrphanedSpansBySession updates spans with matching session.id that lack task linkage
// Called after storing claude.execute span to retroactively link orphaned Claude Code events
LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
// Session operations (M-SESSION-WORKSPACE-HOOKS)
// GetSessionWorkspace returns workspace for a session (for span enrichment)
GetSessionWorkspace(sessionID string) (string, error)
// UpsertSession inserts or updates a session record from hook data
UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
// UpsertSessionWithCorrelation inserts/updates a session with correlation IDs (M-DETERMINISTIC-CHAT-LINKING)
UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
// UpdateSessionEnded marks a session as ended
UpdateSessionEnded(ctx context.Context, sessionID string) error
// InsertToolStart records the start of a tool call
InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
// FindLatestUnfinishedTool finds the most recent tool call that hasn't completed yet
// Used to correlate PostToolUse with PreToolUse when tool_use_id is not provided
FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
// UpdateToolEnd records the completion of a tool call
UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
// GetToolForSpan finds the session_tool that best matches a span by timestamp + tool name
GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
// BackfillSpansWorkspace updates existing spans that have session.id but missing workspace
BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
// Span event operations
CreateSpanEvent(ctx context.Context, e *SpanEvent) error
GetSpanEvents(ctx context.Context, spanID string) ([]SpanEvent, error)
DeleteSpanEvent(ctx context.Context, id int64) error
// Message operations
CreateMessage(ctx context.Context, m *Message) error
GetMessage(ctx context.Context, id string) (*Message, error)
ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
UpdateMessage(ctx context.Context, m *Message) error
DeleteMessage(ctx context.Context, id string) error
MarkMessageRead(ctx context.Context, id string) error
MarkMessageArchived(ctx context.Context, id string) error
// Aggregate operations
GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
// GetExecTaskHierarchy returns hierarchy of ailang command spans (exec, run, check)
GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
// GetExecTaskHierarchyWithMessages returns hierarchy grouped by triggering messages (4-level)
GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
// GetSpanHierarchy returns hierarchical tree of spans using parent_span_id relationships
GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
// GetToolsByTimestampRange returns session tools within a time range for enrichment
GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
// Metric operations (Claude Code telemetry metrics)
CreateMetric(ctx context.Context, m *Metric) error
ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
// Session detail operations (M-CHAINS-SOURCE-OF-TRUTH)
GetSession(ctx context.Context, sessionID string) (*Session, error)
GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
// Chat message operations (M-CHAINS-SOURCE-OF-TRUTH)
GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
CountChatMessages(ctx context.Context, q ChatMessageQuery) (total int, withTaskID int, err error)
// Chain operations (M-CHAINS-SIMPLIFY unified hierarchy)
CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
GetChainStats(ctx context.Context) (*ChainStats, error)
// GetChainStatusCounts returns chain counts grouped by status (single query, M-PERF-OBSERVATORY).
GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
// GetChainStatsByAgent returns per-agent stats in a single SQL query (replaces N+1, M-PERF-OBSERVATORY).
GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
// Chain stage operations
CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
GetStage(ctx context.Context, id string) (*ChainStage, error)
GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
UpdateStageSession(ctx context.Context, stageID, sessionID string) error
UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error
UpdateStageError(ctx context.Context, stageID, errorMessage string) error
GetSpansByStageID(ctx context.Context, stageID string) ([]*Span, error)
// GetSpanLitesByStageID returns lightweight spans without attributes (M-PERF-OBSERVATORY).
GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
// Lifecycle
Close() error
}
Backend defines the interface for observatory storage backends. Implementations include SQLite (local), GCP Trace, and Jaeger.
type BreakdownItem ¶
type BreakdownItem struct {
ID string `json:"id"`
Label string `json:"label"`
SpanCount int `json:"span_count"`
TaskCount int `json:"task_count,omitempty"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
CostUSD float64 `json:"cost_usd"`
DurationMs int64 `json:"duration_ms"` // Total execution time in ms
// Cache metrics
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheCreationTokens int64 `json:"cache_creation_tokens"`
CacheSavingsUSD float64 `json:"cache_savings_usd"`
}
BreakdownItem represents a single item in a breakdown aggregation.
type ChainCreateRequest ¶
type ChainCreateRequest struct {
SourceType ChainSourceType `json:"source_type"`
SourceRef string `json:"source_ref,omitempty"`
GitHubRepo string `json:"github_repo,omitempty"`
GitHubIssueNumber int `json:"github_issue_number,omitempty"`
WorkspaceID string `json:"workspace_id,omitempty"`
WorkspacePath string `json:"workspace_path,omitempty"`
}
ChainCreateRequest contains the data needed to create a new chain.
type ChainListOptions ¶
type ChainListOptions struct {
Status ChainStatus `json:"status,omitempty"`
SourceType string `json:"source_type,omitempty"`
WorkspaceID string `json:"workspace_id,omitempty"`
GitHubRepo string `json:"github_repo,omitempty"`
AgentID string `json:"agent_id,omitempty"`
CreatedAfter *time.Time `json:"created_after,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
ChainListOptions specifies filters for listing chains.
type ChainReadOptions ¶
type ChainReadOptions struct {
IncludeStages bool `json:"include_stages"` // Fetch stages
IncludeSpans bool `json:"include_spans"` // Fetch spans for each stage
IncludeSessions bool `json:"include_sessions"` // Fetch session data for each stage
}
ChainReadOptions controls what additional data is fetched when a chain is read: its stages, the spans for each stage, and the session data for each stage.
func DefaultChainReadOptions ¶
func DefaultChainReadOptions() ChainReadOptions
DefaultChainReadOptions returns options for a full chain read.
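As a rough illustration of how these options might be used, the sketch below mirrors the three documented flags in a local type. It assumes that a "full chain read" means every flag is enabled, which the documentation does not state explicitly; fullRead and listViewRead are hypothetical helper names.

```go
package main

import "fmt"

// chainReadOptions is a local mirror of the documented ChainReadOptions fields.
type chainReadOptions struct {
	IncludeStages   bool
	IncludeSpans    bool
	IncludeSessions bool
}

// fullRead is an assumed equivalent of DefaultChainReadOptions: all flags on.
func fullRead() chainReadOptions {
	return chainReadOptions{IncludeStages: true, IncludeSpans: true, IncludeSessions: true}
}

// listViewRead is a hypothetical lighter preset that skips spans and sessions.
func listViewRead() chainReadOptions {
	return chainReadOptions{IncludeStages: true}
}

func main() {
	fmt.Printf("%+v\n", fullRead())
	fmt.Printf("%+v\n", listViewRead())
}
```

Callers that only need stage status can leave IncludeSpans and IncludeSessions unset to avoid fetching data they will not render.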
type ChainSourceType ¶
type ChainSourceType string
ChainSourceType represents the origin of a chain.
const (
ChainSourceGitHubIssue ChainSourceType = "github_issue"
ChainSourceMessage ChainSourceType = "message"
ChainSourceManual ChainSourceType = "manual"
ChainSourceEvalSuite ChainSourceType = "eval_suite"
)
type ChainStage ¶
type ChainStage struct {
ID string `json:"id"` // UUID
ChainID string `json:"chain_id"`
StageNumber int `json:"stage_number"`
// Agent info
AgentID string `json:"agent_id"`
Provider Provider `json:"provider,omitempty"`
// Links to other tables (foreign keys or cross-DB IDs)
MessageID string `json:"message_id,omitempty"` // -> collaboration.db:inbox_messages.id
TaskID string `json:"task_id,omitempty"` // -> coordinator.db:tasks.id
SessionID string `json:"session_id,omitempty"` // -> sessions.session_id
// State
Status ChainStageStatus `json:"status"`
ApprovalStatus ApprovalStatus `json:"approval_status,omitempty"`
ApprovalType ApprovalType `json:"approval_type,omitempty"`
// Handoff info
HandoffTo string `json:"handoff_to,omitempty"`
Iteration int `json:"iteration"`
HumanFeedback string `json:"human_feedback,omitempty"`
// Timestamps
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
// Summary (denormalized from spans)
Cost float64 `json:"cost"`
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
Turns int `json:"turns"`
ToolCalls int `json:"tool_calls"`
DurationMs int64 `json:"duration_ms"`
// Error tracking
ErrorMessage string `json:"error_message,omitempty"`
ErrorCount int `json:"error_count"`
// Eval assessment (M-EVAL-CHAINS: populated for eval_suite chains)
EvalAssessment *EvalAssessment `json:"eval_assessment,omitempty"`
// Session data (populated on read with full=true)
Session *Session `json:"session,omitempty"`
// Spans (populated on read with include_spans=true)
Spans []*Span `json:"spans,omitempty"`
}
ChainStage represents a single agent execution within a chain.
type ChainStageStatus ¶
type ChainStageStatus string
ChainStageStatus represents the status of a chain stage.
const (
StageStatusPending ChainStageStatus = "pending"
StageStatusRunning ChainStageStatus = "running"
StageStatusAwaitingApproval ChainStageStatus = "awaiting_approval"
StageStatusCompleted ChainStageStatus = "completed"
StageStatusFailed ChainStageStatus = "failed"
)
type ChainStats ¶
type ChainStats struct {
TotalChains int `json:"total_chains"`
ActiveChains int `json:"active_chains"`
PendingApprovals int `json:"pending_approvals"`
CompletedChains int `json:"completed_chains"`
FailedChains int `json:"failed_chains"`
TotalCost float64 `json:"total_cost"`
TotalTokens int64 `json:"total_tokens"`
AverageStagesCount float64 `json:"average_stages_count"`
AverageDurationMs float64 `json:"average_duration_ms"`
}
ChainStats contains aggregate statistics about chains.
type ChainStatus ¶
type ChainStatus string
ChainStatus represents the status of an execution chain.
const (
ChainStatusActive ChainStatus = "active"
ChainStatusPendingApproval ChainStatus = "pending_approval"
ChainStatusCompleted ChainStatus = "completed"
ChainStatusFailed ChainStatus = "failed"
)
type ChainStatusCounts ¶
type ChainStatusCounts struct {
Total int `json:"total_chains"`
Completed int `json:"completed"`
Active int `json:"active"`
Pending int `json:"pending_approval"`
Failed int `json:"failed"`
TotalCost float64 `json:"total_cost"`
TotalTokens int64 `json:"total_tokens"`
}
ChainStatusCounts holds per-status chain counts from a single SQL aggregation.
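The shape of that aggregation can be shown with a single pass over in-memory rows. This is only a sketch: the package performs the grouping in SQL, and the chainRow type and countByStatus function here are hypothetical stand-ins used to show how the fields relate.

```go
package main

import "fmt"

type chainStatus string

const (
	statusActive          chainStatus = "active"
	statusPendingApproval chainStatus = "pending_approval"
	statusCompleted       chainStatus = "completed"
	statusFailed          chainStatus = "failed"
)

// chainRow is a hypothetical row with only the columns the aggregation needs.
type chainRow struct {
	Status chainStatus
	Cost   float64
	Tokens int64
}

// statusCounts mirrors the documented ChainStatusCounts fields.
type statusCounts struct {
	Total, Completed, Active, Pending, Failed int
	TotalCost                                 float64
	TotalTokens                               int64
}

// countByStatus accumulates all counters in one pass over the rows.
func countByStatus(rows []chainRow) statusCounts {
	var c statusCounts
	for _, r := range rows {
		c.Total++
		c.TotalCost += r.Cost
		c.TotalTokens += r.Tokens
		switch r.Status {
		case statusCompleted:
			c.Completed++
		case statusActive:
			c.Active++
		case statusPendingApproval:
			c.Pending++
		case statusFailed:
			c.Failed++
		}
	}
	return c
}

func main() {
	rows := []chainRow{
		{Status: statusActive, Cost: 0.5, Tokens: 100},
		{Status: statusCompleted, Cost: 0.25, Tokens: 50},
	}
	fmt.Printf("%+v\n", countByStatus(rows))
}
```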
type ChainSummary ¶
type ChainSummary struct {
ID string `json:"id"`
SourceType string `json:"source_type"`
SourceRef string `json:"source_ref"`
GitHubRepo string `json:"github_repo,omitempty"`
GitHubIssueNumber int `json:"github_issue_number,omitempty"`
Status ChainStatus `json:"status"`
CurrentStage int `json:"current_stage"`
TotalCost float64 `json:"total_cost"`
TotalTokens int `json:"total_tokens"`
TotalTurns int `json:"total_turns"`
StagesCompleted int `json:"stages_completed"`
CreatedAt time.Time `json:"created_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
StageCount int `json:"stage_count"`
MaxStage int `json:"max_stage"`
AgentFlow string `json:"agent_flow"` // e.g., "design-doc-creator -> sprint-planner -> sprint-executor"
}
ChainSummary is a lightweight view of a chain for list views.
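The AgentFlow field is documented only by example ("design-doc-creator -> sprint-planner -> sprint-executor"). A minimal sketch of producing such a string, assuming the agents are joined in stage-number order with " -> " as the separator; the stage type and agentFlow function are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// stage is a hypothetical reduction of a chain stage to the two fields used here.
type stage struct {
	StageNumber int
	AgentID     string
}

// agentFlow joins agent IDs in ascending stage order without mutating the input.
func agentFlow(stages []stage) string {
	ordered := append([]stage(nil), stages...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].StageNumber < ordered[j].StageNumber
	})
	ids := make([]string, len(ordered))
	for i, s := range ordered {
		ids[i] = s.AgentID
	}
	return strings.Join(ids, " -> ")
}

func main() {
	fmt.Println(agentFlow([]stage{
		{StageNumber: 2, AgentID: "sprint-planner"},
		{StageNumber: 1, AgentID: "design-doc-creator"},
		{StageNumber: 3, AgentID: "sprint-executor"},
	}))
}
```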
type ChatContext ¶
type ChatContext struct {
UserPrompt string `json:"user_prompt,omitempty"` // First 500 chars of user prompt
AssistantResponse string `json:"assistant_response,omitempty"` // First 500 chars of response
HasThinking bool `json:"has_thinking"`
TurnNumber int `json:"turn_number,omitempty"`
FullChatURL string `json:"full_chat_url,omitempty"` // Link to full conversation
}
ChatContext contains conversation content for spans with chat history. Populated when include_chat=true on API requests. This enables embedding chat prompts/responses directly in span data.
type ChatMessage ¶
type ChatMessage struct {
ID string `json:"id"`
SessionID string `json:"session_id"`
TurnNumber int `json:"turn_number"`
Role string `json:"role"`
ContentJSON string `json:"content_json,omitempty"` // Raw JSON content blocks
TokensIn int `json:"tokens_in,omitempty"`
TokensOut int `json:"tokens_out,omitempty"`
Model string `json:"model,omitempty"`
Timestamp time.Time `json:"timestamp"`
TaskID string `json:"task_id,omitempty"`
ChainID string `json:"chain_id,omitempty"`
}
ChatMessage represents a stored chat message from a Claude/Gemini session.
type ChatMessageQuery ¶
type ChatMessageQuery struct {
TaskID string
SessionID string
StartTime time.Time
EndTime time.Time
Limit int
}
ChatMessageQuery specifies options for filtering chat messages.
type ClaudeCodeEvent ¶
type ClaudeCodeEvent struct {
ID string `json:"id"` // span_id (also used as task_id for hierarchy lookup)
CreatedAt string `json:"created_at"` // ISO8601 timestamp (matches inbox message format)
Type string `json:"message_type"` // "claude_code_turn"
FromAgent string `json:"from_agent"` // Agent ID (e.g., "design-doc-creator") or "claude-code" for user sessions
ToInbox string `json:"to_inbox"` // Agent inbox (e.g., "design-doc-creator") or "user" for user sessions
Title string `json:"title"` // "Claude Code Turn ($X.XX)"
TaskID string `json:"task_id"` // Same as ID for hierarchy lookup
Status string `json:"status"` // "read" (not actionable like inbox messages)
CostUSD float64 `json:"cost_usd"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
DurationMs int `json:"duration_ms"`
Workspace string `json:"workspace,omitempty"` // Working directory (from resource attributes)
Model string `json:"model,omitempty"` // Model used for this event (e.g., "claude-sonnet-4-5")
Provider string `json:"provider,omitempty"` // AI provider (e.g., "claude", "gemini")
Directive string `json:"directive,omitempty"` // Initial user prompt (truncated preview)
DirectiveFull string `json:"directive_full,omitempty"` // Full directive (for detail views)
TurnCount int `json:"turn_count"` // Number of turns in session
MetricsSummary string `json:"metrics_summary,omitempty"` // "3 turns • $0.42 • 12.5s"
}
ClaudeCodeEvent represents a Claude Code API request span formatted as an event for the Event Queue. Each api_request span becomes an event that can be clicked to show its hierarchy (tool calls correlated by timestamp).
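The Title and MetricsSummary comments give display formats ("Claude Code Turn ($X.XX)" and "3 turns • $0.42 • 12.5s"). A small sketch of formatting helpers that reproduce those shapes follows; eventTitle and metricsSummary are hypothetical names, and the sketch does not handle the singular "1 turn" case.

```go
package main

import "fmt"

// eventTitle renders the cost with two decimals, following the documented title shape.
func eventTitle(costUSD float64) string {
	return fmt.Sprintf("Claude Code Turn ($%.2f)", costUSD)
}

// metricsSummary renders turns, cost, and duration in seconds with one decimal.
func metricsSummary(turns int, costUSD float64, durationMs int) string {
	return fmt.Sprintf("%d turns • $%.2f • %.1fs", turns, costUSD, float64(durationMs)/1000)
}

func main() {
	fmt.Println(eventTitle(0.42))
	fmt.Println(metricsSummary(3, 0.42, 12500))
}
```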
type ClaudeEvent ¶
type ClaudeEvent struct {
Type string `json:"type"` // "tool_call", "approval", "error"
Timestamp time.Time `json:"timestamp"`
ToolName string `json:"tool_name,omitempty"`
ToolInput map[string]any `json:"tool_input,omitempty"`
ToolResult string `json:"tool_result,omitempty"`
Approved *bool `json:"approved,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
ClaudeEvent represents an event from Claude Code CLI.
type ClaudeMetrics ¶
type ClaudeMetrics struct {
SessionID string `json:"session_id"`
TaskID string `json:"task_id,omitempty"`
Model string `json:"model"`
TotalTokensIn int64 `json:"total_input_tokens"`
TotalTokenOut int64 `json:"total_output_tokens"`
TotalCostUSD float64 `json:"total_cost_usd"`
DurationMs int64 `json:"duration_ms"`
TurnCount int `json:"turn_count"`
ToolCalls int `json:"tool_calls"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Status string `json:"status"` // "completed", "failed", "cancelled"
ErrorMessage string `json:"error_message,omitempty"`
// Events captured during session
Events []ClaudeEvent `json:"events,omitempty"`
}
ClaudeMetrics represents metrics exported by Claude Code CLI. Claude exports metrics/events only, not full OTEL traces.
func ParseClaudeMetricsJSON ¶
func ParseClaudeMetricsJSON(data []byte) (*ClaudeMetrics, error)
ParseClaudeMetricsJSON parses Claude metrics from JSON.
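A minimal sketch of what such a parser could look like, using a local struct that mirrors a subset of the documented ClaudeMetrics JSON tags. Whether the real function performs extra validation is not documented; claudeMetrics and parseMetrics below are illustrative stand-ins built on encoding/json.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// claudeMetrics mirrors a subset of the documented ClaudeMetrics fields and tags.
type claudeMetrics struct {
	SessionID     string  `json:"session_id"`
	Model         string  `json:"model"`
	TotalTokensIn int64   `json:"total_input_tokens"`
	TotalTokenOut int64   `json:"total_output_tokens"`
	TotalCostUSD  float64 `json:"total_cost_usd"`
	TurnCount     int     `json:"turn_count"`
	Status        string  `json:"status"`
}

// parseMetrics decodes a metrics payload and wraps any decode error with context.
func parseMetrics(data []byte) (*claudeMetrics, error) {
	var m claudeMetrics
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, fmt.Errorf("parse claude metrics: %w", err)
	}
	return &m, nil
}

func main() {
	payload := []byte(`{"session_id":"s1","model":"claude-sonnet-4-5","total_input_tokens":1200,"total_output_tokens":300,"turn_count":3,"status":"completed"}`)
	m, err := parseMetrics(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(m.SessionID, m.TotalTokensIn, m.TotalTokenOut, m.Status)
}
```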
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a connected WebSocket client.
type CompositeBackend ¶
type CompositeBackend struct {
// contains filtered or unexported fields
}
CompositeBackend implements Backend with write-local, read-remote pattern.
- Writes always go to local SQLite for durability
- Reads can query remote backends (GCP Trace, Jaeger) for historical traces
- Merges results from multiple sources for comprehensive views
func NewCompositeBackend ¶
func NewCompositeBackend(config CompositeConfig) (*CompositeBackend, error)
NewCompositeBackend creates a new composite backend.
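The write-local, read-remote pattern can be sketched with a much smaller interface than Backend. Everything below is hypothetical (spanStore, memStore, composite), and two behaviors are assumptions rather than documented facts: results are deduplicated by ID, and a failing remote is skipped instead of failing the whole read.

```go
package main

import (
	"fmt"
	"sort"
)

// spanStore is a hypothetical two-method reduction of the Backend interface.
type spanStore interface {
	CreateSpan(id string) error
	ListSpanIDs() ([]string, error)
}

// memStore is an in-memory spanStore used for both the local and remote roles.
type memStore struct{ ids []string }

func (m *memStore) CreateSpan(id string) error     { m.ids = append(m.ids, id); return nil }
func (m *memStore) ListSpanIDs() ([]string, error) { return m.ids, nil }

// composite writes only to local and merges reads from local plus remotes.
type composite struct {
	local   spanStore
	remotes []spanStore
}

func (c *composite) CreateSpan(id string) error { return c.local.CreateSpan(id) }

func (c *composite) ListSpanIDs() ([]string, error) {
	ids, err := c.local.ListSpanIDs()
	if err != nil {
		return nil, err
	}
	seen := make(map[string]bool)
	var merged []string
	add := func(batch []string) {
		for _, id := range batch {
			if !seen[id] {
				seen[id] = true
				merged = append(merged, id)
			}
		}
	}
	add(ids)
	for _, r := range c.remotes {
		remoteIDs, err := r.ListSpanIDs()
		if err != nil {
			continue // assumption: a failing remote degrades the view, not the call
		}
		add(remoteIDs)
	}
	sort.Strings(merged)
	return merged, nil
}

func main() {
	remote := &memStore{ids: []string{"b", "c"}}
	c := &composite{local: &memStore{}, remotes: []spanStore{remote}}
	_ = c.CreateSpan("a")
	_ = c.CreateSpan("b")
	ids, _ := c.ListSpanIDs()
	fmt.Println(ids, len(remote.ids))
}
```

Keeping all writes on the local store means a remote outage can affect what is visible in merged views but not what is recorded.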
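func (*CompositeBackend) BackfillSpansWorkspace ¶
func (b *CompositeBackend) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)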
func (*CompositeBackend) Close ¶
func (b *CompositeBackend) Close() error
Close closes all backends.
func (*CompositeBackend) CountChatMessages ¶
func (b *CompositeBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
func (*CompositeBackend) CreateAgentAssignment ¶
func (b *CompositeBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*CompositeBackend) CreateChain ¶
func (b *CompositeBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
func (*CompositeBackend) CreateMessage ¶
func (b *CompositeBackend) CreateMessage(ctx context.Context, m *Message) error
func (*CompositeBackend) CreateMetric ¶
func (b *CompositeBackend) CreateMetric(ctx context.Context, m *Metric) error
func (*CompositeBackend) CreateSpan ¶
func (b *CompositeBackend) CreateSpan(ctx context.Context, span *Span) error
func (*CompositeBackend) CreateSpanEvent ¶
func (b *CompositeBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
func (*CompositeBackend) CreateStage ¶
func (b *CompositeBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
func (*CompositeBackend) CreateTask ¶
func (b *CompositeBackend) CreateTask(ctx context.Context, t *Task) error
func (*CompositeBackend) CreateWorkspace ¶
func (b *CompositeBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
func (*CompositeBackend) DeleteAgentAssignment ¶
func (b *CompositeBackend) DeleteAgentAssignment(ctx context.Context, id string) error
func (*CompositeBackend) DeleteMessage ¶
func (b *CompositeBackend) DeleteMessage(ctx context.Context, id string) error
func (*CompositeBackend) DeleteSpan ¶
func (b *CompositeBackend) DeleteSpan(ctx context.Context, id string) error
func (*CompositeBackend) DeleteSpanEvent ¶
func (b *CompositeBackend) DeleteSpanEvent(ctx context.Context, id int64) error
func (*CompositeBackend) DeleteTask ¶
func (b *CompositeBackend) DeleteTask(ctx context.Context, id string) error
func (*CompositeBackend) DeleteWorkspace ¶
func (b *CompositeBackend) DeleteWorkspace(ctx context.Context, id string) error
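func (*CompositeBackend) FindLatestUnfinishedTool ¶
func (b *CompositeBackend) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)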
func (*CompositeBackend) GetAgentAssignment ¶
func (b *CompositeBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
func (*CompositeBackend) GetAgentStats ¶
func (b *CompositeBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
func (*CompositeBackend) GetChain ¶
func (b *CompositeBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
func (*CompositeBackend) GetChainByGitHubIssue ¶
func (b *CompositeBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
func (*CompositeBackend) GetChainByMessageID ¶
func (b *CompositeBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
func (*CompositeBackend) GetChainByTaskID ¶
func (b *CompositeBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
func (*CompositeBackend) GetChainStages ¶
func (b *CompositeBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
func (*CompositeBackend) GetChainStats ¶
func (b *CompositeBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
func (*CompositeBackend) GetChainStatsByAgent ¶
func (b *CompositeBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
func (*CompositeBackend) GetChainStatusCounts ¶
func (b *CompositeBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
func (*CompositeBackend) GetChatMessagesBySession ¶
func (b *CompositeBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
func (*CompositeBackend) GetChatMessagesByTaskID ¶
func (b *CompositeBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
func (*CompositeBackend) GetExecTaskHierarchy ¶
func (b *CompositeBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
func (*CompositeBackend) GetExecTaskHierarchyWithMessages ¶
func (b *CompositeBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
func (*CompositeBackend) GetMessage ¶
func (*CompositeBackend) GetMetricsSummary ¶
func (b *CompositeBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
func (*CompositeBackend) GetProviderComparison ¶
func (b *CompositeBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
func (*CompositeBackend) GetSession ¶
func (*CompositeBackend) GetSessionMetricsSummary ¶
func (b *CompositeBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
func (*CompositeBackend) GetSessionTools ¶
func (b *CompositeBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
func (*CompositeBackend) GetSessionWorkspace ¶
func (b *CompositeBackend) GetSessionWorkspace(sessionID string) (string, error)
func (*CompositeBackend) GetSpanEvents ¶
func (*CompositeBackend) GetSpanHierarchy ¶
func (b *CompositeBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
func (*CompositeBackend) GetSpanLitesByStageID ¶
func (b *CompositeBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
func (*CompositeBackend) GetSpansByStageID ¶
func (*CompositeBackend) GetStage ¶
func (b *CompositeBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
func (*CompositeBackend) GetTaskTimeline ¶
func (b *CompositeBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
func (*CompositeBackend) GetToolForSpan ¶
func (b *CompositeBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
func (*CompositeBackend) GetToolsByTimestampRange ¶
func (b *CompositeBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
func (*CompositeBackend) GetWorkspace ¶
func (*CompositeBackend) GetWorkspaceStats ¶
func (b *CompositeBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
func (*CompositeBackend) InsertToolStart ¶
func (b *CompositeBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
func (*CompositeBackend) LinkOrphanedSpansBySession ¶
func (b *CompositeBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
LinkOrphanedSpansBySession delegates to the local backend (session correlation is local only).
func (*CompositeBackend) LinkSpanToChain ¶
func (b *CompositeBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
func (*CompositeBackend) ListAgentAssignments ¶
func (b *CompositeBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
func (*CompositeBackend) ListChains ¶
func (b *CompositeBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
func (*CompositeBackend) ListMessages ¶
func (b *CompositeBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
func (*CompositeBackend) ListMetrics ¶
func (b *CompositeBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
func (*CompositeBackend) ListPendingApprovals ¶
func (b *CompositeBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
func (*CompositeBackend) ListSpans ¶
func (b *CompositeBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
func (*CompositeBackend) ListTasks ¶
func (b *CompositeBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
func (*CompositeBackend) ListTraces ¶
func (b *CompositeBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
func (*CompositeBackend) ListWorkspaces ¶
func (b *CompositeBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
func (*CompositeBackend) LookupTaskBySessionID ¶
func (b *CompositeBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)
LookupTaskBySessionID delegates to the local backend (session correlation is local only).
func (*CompositeBackend) MarkMessageArchived ¶
func (b *CompositeBackend) MarkMessageArchived(ctx context.Context, id string) error
func (*CompositeBackend) MarkMessageRead ¶
func (b *CompositeBackend) MarkMessageRead(ctx context.Context, id string) error
func (*CompositeBackend) RecalculateTaskAggregates ¶
func (b *CompositeBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
func (*CompositeBackend) UpdateAgentAssignment ¶
func (b *CompositeBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*CompositeBackend) UpdateChainMetrics ¶
func (*CompositeBackend) UpdateChainStatus ¶
func (b *CompositeBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
func (*CompositeBackend) UpdateMessage ¶
func (b *CompositeBackend) UpdateMessage(ctx context.Context, m *Message) error
func (*CompositeBackend) UpdateSessionEnded ¶
func (b *CompositeBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
func (*CompositeBackend) UpdateSpan ¶
func (b *CompositeBackend) UpdateSpan(ctx context.Context, span *Span) error
func (*CompositeBackend) UpdateSpanLinks ¶
func (b *CompositeBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
func (*CompositeBackend) UpdateStageApproval ¶
func (b *CompositeBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
func (*CompositeBackend) UpdateStageError ¶
func (b *CompositeBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
func (*CompositeBackend) UpdateStageMetrics ¶
func (*CompositeBackend) UpdateStageSession ¶
func (b *CompositeBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
func (*CompositeBackend) UpdateStageStatus ¶
func (b *CompositeBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
func (*CompositeBackend) UpdateTask ¶
func (b *CompositeBackend) UpdateTask(ctx context.Context, t *Task) error
func (*CompositeBackend) UpdateToolEnd ¶
func (*CompositeBackend) UpdateWorkspace ¶
func (b *CompositeBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
func (*CompositeBackend) UpsertSession ¶
func (b *CompositeBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
func (*CompositeBackend) UpsertSessionWithCorrelation ¶
func (b *CompositeBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
type CompositeConfig ¶
type CompositeConfig struct {
Local Backend // Required: local storage backend
Remotes []Backend // Optional: remote backends for trace queries
}
CompositeConfig contains configuration for composite backend.
type ControlPlaneFilter ¶
type ControlPlaneFilter struct {
SourceType string // eval, coordinator, direct_api, local, other
Provider string // claude, gemini, openai, etc.
Model string // claude-sonnet-4-5, gemini-2-5-pro, etc.
Workspace string // workspace ID
StartDate string // YYYY-MM-DD format for time range filter (inclusive)
EndDate string // YYYY-MM-DD format for time range filter (inclusive)
}
ControlPlaneFilter defines filter parameters for Control Plane queries.
func (*ControlPlaneFilter) HasTimeRange ¶
func (f *ControlPlaneFilter) HasTimeRange() bool
HasTimeRange returns true if a time range filter is set.
func (*ControlPlaneFilter) IsEmpty ¶
func (f *ControlPlaneFilter) IsEmpty() bool
IsEmpty returns true if no filters are set.
type CumulativePoint ¶
type CumulativePoint struct {
SpanIndex int `json:"span_index"`
SpanName string `json:"span_name"`
Timestamp time.Time `json:"timestamp"`
Value float64 `json:"value"` // Value of this span
Cumulative float64 `json:"cumulative"` // Running total up to this span
DeltaPercent float64 `json:"delta_percent"` // This span's contribution as % of final total
}
CumulativePoint represents a point in the cumulative progression.
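The relationship between Value, Cumulative, and DeltaPercent can be made concrete with a short calculation over per-span values. This is an illustrative sketch with hypothetical names (cumulativePoint, cumulative); it assumes DeltaPercent is value / final total × 100 and reports 0 when the total is zero.

```go
package main

import "fmt"

// cumulativePoint mirrors the numeric fields of the documented CumulativePoint.
type cumulativePoint struct {
	SpanIndex    int
	Value        float64
	Cumulative   float64
	DeltaPercent float64
}

// cumulative computes the running total and each span's share of the final total.
func cumulative(values []float64) []cumulativePoint {
	total := 0.0
	for _, v := range values {
		total += v
	}
	points := make([]cumulativePoint, len(values))
	running := 0.0
	for i, v := range values {
		running += v
		pct := 0.0
		if total != 0 {
			pct = v / total * 100
		}
		points[i] = cumulativePoint{SpanIndex: i, Value: v, Cumulative: running, DeltaPercent: pct}
	}
	return points
}

func main() {
	for _, p := range cumulative([]float64{1, 1, 2}) {
		fmt.Printf("%d value=%.2f cumulative=%.2f delta=%.1f%%\n", p.SpanIndex, p.Value, p.Cumulative, p.DeltaPercent)
	}
}
```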
type EvalAssessment ¶
type EvalAssessment struct {
// Identity
BenchmarkID string `json:"benchmark_id"`
Model string `json:"model"`
Language string `json:"language"`
Condition string `json:"condition,omitempty"`
EvalMode string `json:"eval_mode"` // "agent" or "standard"
Executor string `json:"executor,omitempty"` // "claude", "gemini"
Seed int64 `json:"seed"`
// Assessment results
CompileOk bool `json:"compile_ok"`
RuntimeOk bool `json:"runtime_ok"`
StdoutOk bool `json:"stdout_ok"`
ErrorCategory string `json:"error_category"`
// Self-repair
FirstAttemptOk bool `json:"first_attempt_ok"`
RepairUsed bool `json:"repair_used"`
RepairOk bool `json:"repair_ok"`
ErrCode string `json:"err_code,omitempty"`
// Contract verification
VerifyOk bool `json:"verify_ok"`
VerifyVerified int `json:"verify_verified"`
VerifyCounterex int `json:"verify_counterexample"`
VerifySkipped int `json:"verify_skipped"`
VerifyErrors int `json:"verify_errors"`
// Reproducibility
PromptVersion string `json:"prompt_version,omitempty"`
CodeHash string `json:"code_hash,omitempty"`
// Output (truncated for storage efficiency)
Code string `json:"code,omitempty"`
Stdout string `json:"stdout,omitempty"`
ExpectedStdout string `json:"expected_stdout,omitempty"`
Stderr string `json:"stderr,omitempty"`
}
EvalAssessment stores structured evaluation results for agent benchmark stages. Stored as JSON in chain_stages.eval_assessment column (M-EVAL-CHAINS).
type EvalQueryOptions ¶
type EvalQueryOptions struct {
ChainID string // Filter by chain
Model string // Filter by model
Language string // Filter by language
BenchmarkID string // Filter by benchmark
Condition string // Filter by experimental condition
EvalMode string // "standard" or "agent"
SuccessOnly bool // Only passing benchmarks (stdout_ok = true)
FailureOnly bool // Only failing benchmarks (stdout_ok = false)
Limit int
}
EvalQueryOptions specifies filters for querying eval assessment stages.
type Event ¶
type Event struct {
Type WSEventType `json:"type"`
Timestamp time.Time `json:"timestamp"`
Data any `json:"data"`
}
Event represents a real-time event for WebSocket broadcast.
type ExecHierarchyWithMessages ¶
type ExecHierarchyWithMessages struct {
Messages []*MessageNode `json:"messages,omitempty"` // Messages that triggered execs
Orphan []*ExecTaskNode `json:"orphan,omitempty"` // Execs without a triggering message
Count int `json:"count"`
}
ExecHierarchyWithMessages groups exec tasks by their triggering messages.
type ExecTaskNode ¶
type ExecTaskNode struct {
TaskID string `json:"task_id"`
ParentTaskID string `json:"parent_task_id"`
Command string `json:"command"` // exec, run, check, turn, tool_use
Provider string `json:"provider"` // for exec: claude, gemini, etc.
Workspace string `json:"workspace"` // for exec
FilePath string `json:"file_path"` // for run, check
Status string `json:"status"`
StartTime *time.Time `json:"start_time,omitempty"`
DurationMs int `json:"duration_ms,omitempty"`
Children []*ExecTaskNode `json:"children,omitempty"`
// Turn/tool specific fields
TurnNumber int `json:"turn_number,omitempty"` // for turn spans
ToolName string `json:"tool_name,omitempty"` // for tool_use spans
ToolInput string `json:"tool_input,omitempty"` // for tool_use spans
ToolOutput string `json:"tool_output,omitempty"` // for tool_use spans
DisplayName string `json:"display_name,omitempty"` // enriched name from session_tools (e.g., "Read: /path/file.go")
}
ExecTaskNode represents a node in the exec task hierarchy.
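Because each node carries its own Children, the hierarchy can be consumed with a plain depth-first walk. The sketch below uses a trimmed local copy of the struct; walk, render, and countByCommand are hypothetical helpers written for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// ExecTaskNode is a trimmed local copy of the documented struct.
type ExecTaskNode struct {
	TaskID     string
	Command    string // exec, run, check, turn, tool_use
	DurationMs int
	Children   []*ExecTaskNode
}

// walk visits n and all of its descendants depth-first, parents before children.
func walk(n *ExecTaskNode, depth int, visit func(n *ExecTaskNode, depth int)) {
	if n == nil {
		return
	}
	visit(n, depth)
	for _, c := range n.Children {
		walk(c, depth+1, visit)
	}
}

// render returns an indented outline of the tree, one node per line.
func render(root *ExecTaskNode) string {
	var sb strings.Builder
	walk(root, 0, func(n *ExecTaskNode, depth int) {
		fmt.Fprintf(&sb, "%s%s %s (%dms)\n", strings.Repeat("  ", depth), n.Command, n.TaskID, n.DurationMs)
	})
	return sb.String()
}

// countByCommand tallies how many nodes of each Command value the tree holds.
func countByCommand(root *ExecTaskNode) map[string]int {
	counts := make(map[string]int)
	walk(root, 0, func(n *ExecTaskNode, _ int) { counts[n.Command]++ })
	return counts
}

func main() {
	root := &ExecTaskNode{TaskID: "t1", Command: "exec", DurationMs: 900, Children: []*ExecTaskNode{
		{TaskID: "t2", Command: "turn", DurationMs: 400, Children: []*ExecTaskNode{
			{TaskID: "t3", Command: "tool_use", DurationMs: 120},
		}},
		{TaskID: "t4", Command: "turn", DurationMs: 300},
	}}
	fmt.Print(render(root))
	fmt.Println(countByCommand(root))
}
```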
type ExecutionChain ¶
type ExecutionChain struct {
ID string `json:"id"` // UUID
// Source (what triggered this chain)
SourceType ChainSourceType `json:"source_type"`
SourceRef string `json:"source_ref,omitempty"`
GitHubRepo string `json:"github_repo,omitempty"`
GitHubIssueNumber int `json:"github_issue_number,omitempty"`
// Current state
Status ChainStatus `json:"status"`
CurrentStage int `json:"current_stage"`
// Workspace context
WorkspaceID string `json:"workspace_id,omitempty"`
WorkspacePath string `json:"workspace_path,omitempty"`
// Timestamps
CreatedAt time.Time `json:"created_at"`
UpdatedAt *time.Time `json:"updated_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
// Summary metrics (denormalized for quick queries)
TotalCost float64 `json:"total_cost"`
TotalTokens int `json:"total_tokens"`
TotalTurns int `json:"total_turns"`
StagesCompleted int `json:"stages_completed"`
// Stages (populated on read)
Stages []*ChainStage `json:"stages,omitempty"`
}
ExecutionChain represents a complete execution flow from source to completion. It is the top level of the hierarchy: Issue -> Message -> Task -> Session -> Traces.
func (*ExecutionChain) ChainEnvironmentVars ¶
func (c *ExecutionChain) ChainEnvironmentVars(stageID, taskID, messageID string) map[string]string
ChainEnvironmentVars returns the environment variables to pass to executors for write-time linking of spans to this chain/stage.
type FilterPattern ¶
type FilterPattern struct {
Type string // "prefix", "exact", "suffix"
Pattern string // The pattern to match against span name
Service string // Optional: only match spans from this service (service.name)
}
FilterPattern defines a single span filter rule.
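One plausible reading of the three documented Type values is sketched below. The matches function is a hypothetical helper, not the package's own matcher, and treating an unknown Type as "no match" is an assumption; the span and service names are placeholders.

```go
package main

import (
	"fmt"
	"strings"
)

// FilterPattern is a local copy of the documented struct.
type FilterPattern struct {
	Type    string // "prefix", "exact", "suffix"
	Pattern string // matched against the span name
	Service string // optional service.name restriction
}

// matches reports whether a span with the given name and service satisfies the rule.
// An empty Service on the rule means the rule applies to every service.
func matches(p FilterPattern, spanName, service string) bool {
	if p.Service != "" && p.Service != service {
		return false
	}
	switch p.Type {
	case "prefix":
		return strings.HasPrefix(spanName, p.Pattern)
	case "exact":
		return spanName == p.Pattern
	case "suffix":
		return strings.HasSuffix(spanName, p.Pattern)
	default:
		return false // assumption: unknown rule types never match
	}
}

func main() {
	rules := []FilterPattern{
		{Type: "prefix", Pattern: "http."},
		{Type: "suffix", Pattern: ".internal", Service: "example-service"},
	}
	for _, name := range []string{"http.request", "cache.internal", "db.query"} {
		hit := false
		for _, r := range rules {
			if matches(r, name, "example-service") {
				hit = true
				break
			}
		}
		fmt.Printf("%-16s matched=%v\n", name, hit)
	}
}
```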
type GCPConfig ¶
type GCPConfig struct {
ProjectID string
CredentialsPath string // optional - uses default credentials if empty
CacheTTL time.Duration // Cache TTL for trace results (default: 60s)
}
GCPConfig contains configuration for the GCP Trace backend.
type GCPTraceBackend ¶
type GCPTraceBackend struct {
// contains filtered or unexported fields
}
GCPTraceBackend implements Backend using Google Cloud Trace. This is a read-only backend for querying traces stored in GCP. Results are cached for the configured CacheTTL to avoid exhausting the API quota.
func NewGCPTraceBackend ¶
func NewGCPTraceBackend(config GCPConfig) (*GCPTraceBackend, error)
NewGCPTraceBackend creates a new GCP Trace backend.
func (*GCPTraceBackend) BackfillSpansWorkspace ¶
func (*GCPTraceBackend) Close ¶
func (b *GCPTraceBackend) Close() error
Close closes the GCP client.
func (*GCPTraceBackend) CountChatMessages ¶
func (b *GCPTraceBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
func (*GCPTraceBackend) CreateAgentAssignment ¶
func (b *GCPTraceBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*GCPTraceBackend) CreateChain ¶
func (b *GCPTraceBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
func (*GCPTraceBackend) CreateMessage ¶
func (b *GCPTraceBackend) CreateMessage(ctx context.Context, m *Message) error
func (*GCPTraceBackend) CreateMetric ¶
func (b *GCPTraceBackend) CreateMetric(ctx context.Context, m *Metric) error
func (*GCPTraceBackend) CreateSpan ¶
func (b *GCPTraceBackend) CreateSpan(ctx context.Context, span *Span) error
func (*GCPTraceBackend) CreateSpanEvent ¶
func (b *GCPTraceBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
func (*GCPTraceBackend) CreateStage ¶
func (b *GCPTraceBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
func (*GCPTraceBackend) CreateTask ¶
func (b *GCPTraceBackend) CreateTask(ctx context.Context, t *Task) error
func (*GCPTraceBackend) CreateWorkspace ¶
func (b *GCPTraceBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
func (*GCPTraceBackend) DeleteAgentAssignment ¶
func (b *GCPTraceBackend) DeleteAgentAssignment(ctx context.Context, id string) error
func (*GCPTraceBackend) DeleteMessage ¶
func (b *GCPTraceBackend) DeleteMessage(ctx context.Context, id string) error
func (*GCPTraceBackend) DeleteSpan ¶
func (b *GCPTraceBackend) DeleteSpan(ctx context.Context, id string) error
func (*GCPTraceBackend) DeleteSpanEvent ¶
func (b *GCPTraceBackend) DeleteSpanEvent(ctx context.Context, id int64) error
func (*GCPTraceBackend) DeleteTask ¶
func (b *GCPTraceBackend) DeleteTask(ctx context.Context, id string) error
func (*GCPTraceBackend) DeleteWorkspace ¶
func (b *GCPTraceBackend) DeleteWorkspace(ctx context.Context, id string) error
func (*GCPTraceBackend) FindLatestUnfinishedTool ¶
func (*GCPTraceBackend) GetAgentAssignment ¶
func (b *GCPTraceBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
func (*GCPTraceBackend) GetAgentStats ¶
func (b *GCPTraceBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
func (*GCPTraceBackend) GetChain ¶
func (b *GCPTraceBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
func (*GCPTraceBackend) GetChainByGitHubIssue ¶
func (b *GCPTraceBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
func (*GCPTraceBackend) GetChainByMessageID ¶
func (b *GCPTraceBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
func (*GCPTraceBackend) GetChainByTaskID ¶
func (b *GCPTraceBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
func (*GCPTraceBackend) GetChainStages ¶
func (b *GCPTraceBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
func (*GCPTraceBackend) GetChainStats ¶
func (b *GCPTraceBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
func (*GCPTraceBackend) GetChainStatsByAgent ¶
func (b *GCPTraceBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
func (*GCPTraceBackend) GetChainStatusCounts ¶
func (b *GCPTraceBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
func (*GCPTraceBackend) GetChatMessagesBySession ¶
func (b *GCPTraceBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
func (*GCPTraceBackend) GetChatMessagesByTaskID ¶
func (b *GCPTraceBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
func (*GCPTraceBackend) GetExecTaskHierarchy ¶
func (b *GCPTraceBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
func (*GCPTraceBackend) GetExecTaskHierarchyWithMessages ¶
func (b *GCPTraceBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
func (*GCPTraceBackend) GetMessage ¶
func (*GCPTraceBackend) GetMetricsSummary ¶
func (b *GCPTraceBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
func (*GCPTraceBackend) GetProviderComparison ¶
func (b *GCPTraceBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
func (*GCPTraceBackend) GetSession ¶
func (*GCPTraceBackend) GetSessionMetricsSummary ¶
func (b *GCPTraceBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
func (*GCPTraceBackend) GetSessionTools ¶
func (b *GCPTraceBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
func (*GCPTraceBackend) GetSessionWorkspace ¶
func (b *GCPTraceBackend) GetSessionWorkspace(sessionID string) (string, error)
func (*GCPTraceBackend) GetSpanEvents ¶
func (*GCPTraceBackend) GetSpanHierarchy ¶
func (b *GCPTraceBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
func (*GCPTraceBackend) GetSpanLitesByStageID ¶
func (b *GCPTraceBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
func (*GCPTraceBackend) GetSpansByStageID ¶
func (*GCPTraceBackend) GetStage ¶
func (b *GCPTraceBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
func (*GCPTraceBackend) GetTaskTimeline ¶
func (b *GCPTraceBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
func (*GCPTraceBackend) GetToolForSpan ¶
func (b *GCPTraceBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
func (*GCPTraceBackend) GetToolsByTimestampRange ¶
func (b *GCPTraceBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
func (*GCPTraceBackend) GetWorkspace ¶
func (*GCPTraceBackend) GetWorkspaceStats ¶
func (b *GCPTraceBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
func (*GCPTraceBackend) InsertToolStart ¶
func (b *GCPTraceBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
func (*GCPTraceBackend) LinkOrphanedSpansBySession ¶
func (b *GCPTraceBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
LinkOrphanedSpansBySession is not supported by the GCP backend (session correlation is local only).
func (*GCPTraceBackend) LinkSpanToChain ¶
func (b *GCPTraceBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
func (*GCPTraceBackend) ListAgentAssignments ¶
func (b *GCPTraceBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
func (*GCPTraceBackend) ListChains ¶
func (b *GCPTraceBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
func (*GCPTraceBackend) ListMessages ¶
func (b *GCPTraceBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
func (*GCPTraceBackend) ListMetrics ¶
func (b *GCPTraceBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
func (*GCPTraceBackend) ListPendingApprovals ¶
func (b *GCPTraceBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
func (*GCPTraceBackend) ListSpans ¶
func (b *GCPTraceBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
func (*GCPTraceBackend) ListTasks ¶
func (b *GCPTraceBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
func (*GCPTraceBackend) ListTraces ¶
func (b *GCPTraceBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
func (*GCPTraceBackend) ListWorkspaces ¶
func (b *GCPTraceBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
func (*GCPTraceBackend) LookupTaskBySessionID ¶
func (b *GCPTraceBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)
LookupTaskBySessionID is not supported by the GCP backend (session correlation is local only).
func (*GCPTraceBackend) MarkMessageArchived ¶
func (b *GCPTraceBackend) MarkMessageArchived(ctx context.Context, id string) error
func (*GCPTraceBackend) MarkMessageRead ¶
func (b *GCPTraceBackend) MarkMessageRead(ctx context.Context, id string) error
func (*GCPTraceBackend) RecalculateTaskAggregates ¶
func (b *GCPTraceBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
func (*GCPTraceBackend) UpdateAgentAssignment ¶
func (b *GCPTraceBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*GCPTraceBackend) UpdateChainMetrics ¶
func (*GCPTraceBackend) UpdateChainStatus ¶
func (b *GCPTraceBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
func (*GCPTraceBackend) UpdateMessage ¶
func (b *GCPTraceBackend) UpdateMessage(ctx context.Context, m *Message) error
func (*GCPTraceBackend) UpdateSessionEnded ¶
func (b *GCPTraceBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
func (*GCPTraceBackend) UpdateSpan ¶
func (b *GCPTraceBackend) UpdateSpan(ctx context.Context, span *Span) error
func (*GCPTraceBackend) UpdateSpanLinks ¶
func (b *GCPTraceBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
func (*GCPTraceBackend) UpdateStageApproval ¶
func (b *GCPTraceBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
func (*GCPTraceBackend) UpdateStageError ¶
func (b *GCPTraceBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
func (*GCPTraceBackend) UpdateStageMetrics ¶
func (*GCPTraceBackend) UpdateStageSession ¶
func (b *GCPTraceBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
func (*GCPTraceBackend) UpdateStageStatus ¶
func (b *GCPTraceBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
func (*GCPTraceBackend) UpdateTask ¶
func (b *GCPTraceBackend) UpdateTask(ctx context.Context, t *Task) error
func (*GCPTraceBackend) UpdateToolEnd ¶
func (*GCPTraceBackend) UpdateWorkspace ¶
func (b *GCPTraceBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
func (*GCPTraceBackend) UpsertSession ¶
func (b *GCPTraceBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
func (*GCPTraceBackend) UpsertSessionWithCorrelation ¶
func (b *GCPTraceBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
type GeminiOTELSpan ¶
type GeminiOTELSpan struct {
TraceID string `json:"traceId"`
SpanID string `json:"spanId"`
ParentSpanID string `json:"parentSpanId,omitempty"`
Name string `json:"name"`
Kind int `json:"kind"` // OTEL span kind enum
StartTimeUnixNano int64 `json:"startTimeUnixNano"`
EndTimeUnixNano int64 `json:"endTimeUnixNano"`
Attributes []KeyValue `json:"attributes,omitempty"`
Status *OTELStatus `json:"status,omitempty"`
Events []OTELEvent `json:"events,omitempty"`
Resource *OTELResource `json:"resource,omitempty"`
}
GeminiOTELSpan represents an OTEL span from Gemini CLI. Gemini exports full OTEL traces with parent-child relationships.
func ParseGeminiTraceJSON ¶
func ParseGeminiTraceJSON(data []byte) ([]*GeminiOTELSpan, error)
ParseGeminiTraceJSON parses a Gemini OTEL trace from JSON.
type HealthAction ¶
type HealthAction string
HealthAction describes what action to take based on DB size.
const (
HealthOK HealthAction = "ok"
HealthWarn HealthAction = "warn"
HealthCleanup HealthAction = "cleanup"
HealthDanger HealthAction = "danger"
)
type HeatmapDataPoint ¶
type HeatmapDataPoint struct {
Date string `json:"date"` // YYYY-MM-DD
SpanCount int `json:"span_count"` // Number of spans
TaskCount int `json:"task_count"` // Number of distinct tasks
Cost float64 `json:"cost"` // Total cost USD
SuccessRate float64 `json:"success_rate"` // 0.0 to 1.0
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
}
HeatmapDataPoint represents activity data for a single day.
type HierarchicalSpan ¶
type HierarchicalSpan struct {
ID string `json:"id"`
TraceID string `json:"trace_id"`
ParentSpanID string `json:"parent_span_id,omitempty"`
Name string `json:"name"`
DisplayName string `json:"display_name,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time,omitempty"`
DurationMs int64 `json:"duration_ms"`
Status SpanStatus `json:"status"`
Attributes map[string]any `json:"attributes,omitempty"`
Provider Provider `json:"provider,omitempty"`
Model string `json:"model,omitempty"`
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
TaskID string `json:"task_id,omitempty"`
ChatContext *ChatContext `json:"chat_context,omitempty"`
Children []*HierarchicalSpan `json:"children,omitempty"`
}
HierarchicalSpan represents a span together with its children for hierarchical JSON responses. This type is used by the /spans/enriched?hierarchical=true endpoint.
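A flat list of spans can be nested into this shape by indexing on ID and attaching each span to its parent. The sketch below is one way to do that with a trimmed local copy of the struct; buildTree is a hypothetical helper, and promoting spans whose parent is missing to roots is an assumption rather than documented behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HierarchicalSpan is a trimmed local copy of the documented struct.
type HierarchicalSpan struct {
	ID           string              `json:"id"`
	ParentSpanID string              `json:"parent_span_id,omitempty"`
	Name         string              `json:"name"`
	Children     []*HierarchicalSpan `json:"children,omitempty"`
}

// buildTree nests a flat span list and returns the root spans in input order.
// Spans without a parent, or whose parent is not in the list, become roots.
func buildTree(flat []*HierarchicalSpan) []*HierarchicalSpan {
	byID := make(map[string]*HierarchicalSpan, len(flat))
	for _, s := range flat {
		byID[s.ID] = s
	}
	var roots []*HierarchicalSpan
	for _, s := range flat {
		parent, ok := byID[s.ParentSpanID]
		if s.ParentSpanID == "" || !ok || parent == s {
			roots = append(roots, s)
			continue
		}
		parent.Children = append(parent.Children, s)
	}
	return roots
}

// toJSON renders the nested spans as indented JSON.
func toJSON(roots []*HierarchicalSpan) (string, error) {
	b, err := json.MarshalIndent(roots, "", "  ")
	if err != nil {
		return "", fmt.Errorf("encode hierarchy: %w", err)
	}
	return string(b), nil
}

func main() {
	flat := []*HierarchicalSpan{
		{ID: "a", Name: "task"},
		{ID: "b", ParentSpanID: "a", Name: "turn"},
		{ID: "c", ParentSpanID: "b", Name: "tool_use"},
	}
	out, err := toJSON(buildTree(flat))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```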
type HierarchyOptions ¶
type HierarchyOptions struct {
// MaxDepth limits the span tree depth (0 = unlimited)
MaxDepth int
// IncludeSpans controls whether to include individual spans
IncludeSpans bool
// Workspace filters spans to only include those belonging to this workspace path
// This prevents cross-workspace span bleeding in hierarchy views
Workspace string
// WorkspaceID filters spans by workspace_id directly (set automatically from task)
WorkspaceID string
}
HierarchyOptions configures the hierarchy query.
func DefaultHierarchyOptions ¶
func DefaultHierarchyOptions() HierarchyOptions
DefaultHierarchyOptions returns default options for hierarchy queries.
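To make the MaxDepth convention (0 = unlimited) concrete, the sketch below trims a span tree to a given depth. It is illustrative only: spanNode, limitDepth, and depthOf are hypothetical, and counting the root as depth 1 is an assumption about how the limit is interpreted.

```go
package main

import "fmt"

// HierarchyOptions is a local copy of the documented struct.
type HierarchyOptions struct {
	MaxDepth     int // 0 = unlimited
	IncludeSpans bool
	Workspace    string
	WorkspaceID  string
}

// spanNode is a hypothetical stand-in for a node in the span tree.
type spanNode struct {
	Name     string
	Children []*spanNode
}

// limitDepth drops every node deeper than opts.MaxDepth, counting roots as depth 1.
// A MaxDepth of 0 (or less) leaves the tree untouched.
func limitDepth(roots []*spanNode, opts HierarchyOptions) {
	if opts.MaxDepth <= 0 {
		return
	}
	for _, r := range roots {
		trim(r, 1, opts.MaxDepth)
	}
}

func trim(n *spanNode, depth, limit int) {
	if depth >= limit {
		n.Children = nil
		return
	}
	for _, c := range n.Children {
		trim(c, depth+1, limit)
	}
}

// depthOf returns the number of levels in the subtree rooted at n.
func depthOf(n *spanNode) int {
	deepest := 0
	for _, c := range n.Children {
		if d := depthOf(c); d > deepest {
			deepest = d
		}
	}
	return deepest + 1
}

func main() {
	root := &spanNode{Name: "task", Children: []*spanNode{
		{Name: "turn", Children: []*spanNode{{Name: "tool_use"}}},
	}}
	limitDepth([]*spanNode{root}, HierarchyOptions{MaxDepth: 2})
	fmt.Println("levels after trimming:", depthOf(root))
}
```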
type HierarchyTraceSummary ¶
type HierarchyTraceSummary struct {
SpanCount int `json:"span_count"`
TotalTokens int64 `json:"total_tokens"`
TotalCostUSD float64 `json:"total_cost_usd"`
DurationMs int64 `json:"duration_ms"`
ErrorCount int `json:"error_count"`
}
HierarchyTraceSummary contains aggregate metrics for a trace in hierarchy view. This differs from TraceSummary in models.go which is for trace list views.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub maintains the set of active clients and broadcasts events.
func (*Hub) BroadcastApprovalDecision ¶
BroadcastApprovalDecision sends an approval.decision event.
func (*Hub) BroadcastApprovalRequested ¶
BroadcastApprovalRequested sends an approval.requested event.
func (*Hub) BroadcastMessageCreated ¶
BroadcastMessageCreated sends a message.created event.
func (*Hub) BroadcastMetricsUpdated ¶
func (h *Hub) BroadcastMetricsUpdated(summary *MetricsSummary)
BroadcastMetricsUpdated sends a metrics.updated event.
func (*Hub) BroadcastSpanCreated ¶
BroadcastSpanCreated sends a span.created event.
func (*Hub) BroadcastSpanUpdated ¶
BroadcastSpanUpdated sends a span.updated event.
func (*Hub) BroadcastTaskCompleted ¶
BroadcastTaskCompleted sends a task.completed event.
func (*Hub) BroadcastTaskCreated ¶
BroadcastTaskCreated sends a task.created event.
func (*Hub) BroadcastTaskUpdated ¶
BroadcastTaskUpdated sends a task.updated event.
func (*Hub) ClientCount ¶
ClientCount returns the number of connected clients.
func (*Hub) HandleWebSocket ¶
func (h *Hub) HandleWebSocket(w http.ResponseWriter, r *http.Request)
HandleWebSocket handles a WebSocket connection upgrade request.
type JaegerBackend ¶
type JaegerBackend struct {
// contains filtered or unexported fields
}
JaegerBackend implements Backend using Jaeger for trace storage. This is a read-only backend for querying traces from Jaeger.
func NewJaegerBackend ¶
func NewJaegerBackend(config JaegerConfig) (*JaegerBackend, error)
NewJaegerBackend creates a new Jaeger backend.
func (*JaegerBackend) BackfillSpansWorkspace ¶
func (*JaegerBackend) CountChatMessages ¶
func (b *JaegerBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
func (*JaegerBackend) CreateAgentAssignment ¶
func (b *JaegerBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*JaegerBackend) CreateChain ¶
func (b *JaegerBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
func (*JaegerBackend) CreateMessage ¶
func (b *JaegerBackend) CreateMessage(ctx context.Context, m *Message) error
func (*JaegerBackend) CreateMetric ¶
func (b *JaegerBackend) CreateMetric(ctx context.Context, m *Metric) error
func (*JaegerBackend) CreateSpan ¶
func (b *JaegerBackend) CreateSpan(ctx context.Context, span *Span) error
func (*JaegerBackend) CreateSpanEvent ¶
func (b *JaegerBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
func (*JaegerBackend) CreateStage ¶
func (b *JaegerBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
func (*JaegerBackend) CreateTask ¶
func (b *JaegerBackend) CreateTask(ctx context.Context, t *Task) error
func (*JaegerBackend) CreateWorkspace ¶
func (b *JaegerBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
func (*JaegerBackend) DeleteAgentAssignment ¶
func (b *JaegerBackend) DeleteAgentAssignment(ctx context.Context, id string) error
func (*JaegerBackend) DeleteMessage ¶
func (b *JaegerBackend) DeleteMessage(ctx context.Context, id string) error
func (*JaegerBackend) DeleteSpan ¶
func (b *JaegerBackend) DeleteSpan(ctx context.Context, id string) error
func (*JaegerBackend) DeleteSpanEvent ¶
func (b *JaegerBackend) DeleteSpanEvent(ctx context.Context, id int64) error
func (*JaegerBackend) DeleteTask ¶
func (b *JaegerBackend) DeleteTask(ctx context.Context, id string) error
func (*JaegerBackend) DeleteWorkspace ¶
func (b *JaegerBackend) DeleteWorkspace(ctx context.Context, id string) error
func (*JaegerBackend) FindLatestUnfinishedTool ¶
func (*JaegerBackend) GetAgentAssignment ¶
func (b *JaegerBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
func (*JaegerBackend) GetAgentStats ¶
func (b *JaegerBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
func (*JaegerBackend) GetChain ¶
func (b *JaegerBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
func (*JaegerBackend) GetChainByGitHubIssue ¶
func (b *JaegerBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
func (*JaegerBackend) GetChainByMessageID ¶
func (b *JaegerBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
func (*JaegerBackend) GetChainByTaskID ¶
func (b *JaegerBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
func (*JaegerBackend) GetChainStages ¶
func (b *JaegerBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
func (*JaegerBackend) GetChainStats ¶
func (b *JaegerBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
func (*JaegerBackend) GetChainStatsByAgent ¶
func (b *JaegerBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
func (*JaegerBackend) GetChainStatusCounts ¶
func (b *JaegerBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
func (*JaegerBackend) GetChatMessagesBySession ¶
func (b *JaegerBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
func (*JaegerBackend) GetChatMessagesByTaskID ¶
func (b *JaegerBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
func (*JaegerBackend) GetExecTaskHierarchy ¶
func (b *JaegerBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
func (*JaegerBackend) GetExecTaskHierarchyWithMessages ¶
func (b *JaegerBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
func (*JaegerBackend) GetMessage ¶
func (*JaegerBackend) GetMetricsSummary ¶
func (b *JaegerBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
func (*JaegerBackend) GetProviderComparison ¶
func (b *JaegerBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
func (*JaegerBackend) GetSession ¶
func (*JaegerBackend) GetSessionMetricsSummary ¶
func (b *JaegerBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
func (*JaegerBackend) GetSessionTools ¶
func (b *JaegerBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
func (*JaegerBackend) GetSessionWorkspace ¶
func (b *JaegerBackend) GetSessionWorkspace(sessionID string) (string, error)
func (*JaegerBackend) GetSpanEvents ¶
func (*JaegerBackend) GetSpanHierarchy ¶
func (b *JaegerBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
func (*JaegerBackend) GetSpanLitesByStageID ¶
func (b *JaegerBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
func (*JaegerBackend) GetSpansByStageID ¶
func (*JaegerBackend) GetStage ¶
func (b *JaegerBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
func (*JaegerBackend) GetTaskTimeline ¶
func (b *JaegerBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
func (*JaegerBackend) GetToolForSpan ¶
func (b *JaegerBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
func (*JaegerBackend) GetToolsByTimestampRange ¶
func (b *JaegerBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
func (*JaegerBackend) GetWorkspace ¶
func (*JaegerBackend) GetWorkspaceStats ¶
func (b *JaegerBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
func (*JaegerBackend) InsertToolStart ¶
func (b *JaegerBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
func (*JaegerBackend) LinkOrphanedSpansBySession ¶
func (b *JaegerBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
LinkOrphanedSpansBySession is not supported by the Jaeger backend (session correlation is local only).
func (*JaegerBackend) LinkSpanToChain ¶
func (b *JaegerBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
func (*JaegerBackend) ListAgentAssignments ¶
func (b *JaegerBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
func (*JaegerBackend) ListChains ¶
func (b *JaegerBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
func (*JaegerBackend) ListMessages ¶
func (b *JaegerBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
func (*JaegerBackend) ListMetrics ¶
func (b *JaegerBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
func (*JaegerBackend) ListPendingApprovals ¶
func (b *JaegerBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
func (*JaegerBackend) ListSpans ¶
func (b *JaegerBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
func (*JaegerBackend) ListTasks ¶
func (b *JaegerBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
func (*JaegerBackend) ListTraces ¶
func (b *JaegerBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
func (*JaegerBackend) ListWorkspaces ¶
func (b *JaegerBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
func (*JaegerBackend) LookupTaskBySessionID ¶
func (b *JaegerBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (string, string, string)
LookupTaskBySessionID is not supported by the Jaeger backend (session correlation is local only).
func (*JaegerBackend) MarkMessageArchived ¶
func (b *JaegerBackend) MarkMessageArchived(ctx context.Context, id string) error
func (*JaegerBackend) MarkMessageRead ¶
func (b *JaegerBackend) MarkMessageRead(ctx context.Context, id string) error
func (*JaegerBackend) RecalculateTaskAggregates ¶
func (b *JaegerBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
func (*JaegerBackend) UpdateAgentAssignment ¶
func (b *JaegerBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*JaegerBackend) UpdateChainMetrics ¶
func (*JaegerBackend) UpdateChainStatus ¶
func (b *JaegerBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
func (*JaegerBackend) UpdateMessage ¶
func (b *JaegerBackend) UpdateMessage(ctx context.Context, m *Message) error
func (*JaegerBackend) UpdateSessionEnded ¶
func (b *JaegerBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
func (*JaegerBackend) UpdateSpan ¶
func (b *JaegerBackend) UpdateSpan(ctx context.Context, span *Span) error
func (*JaegerBackend) UpdateSpanLinks ¶
func (b *JaegerBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
func (*JaegerBackend) UpdateStageApproval ¶
func (b *JaegerBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
func (*JaegerBackend) UpdateStageError ¶
func (b *JaegerBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
func (*JaegerBackend) UpdateStageMetrics ¶
func (*JaegerBackend) UpdateStageSession ¶
func (b *JaegerBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
func (*JaegerBackend) UpdateStageStatus ¶
func (b *JaegerBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
func (*JaegerBackend) UpdateTask ¶
func (b *JaegerBackend) UpdateTask(ctx context.Context, t *Task) error
func (*JaegerBackend) UpdateToolEnd ¶
func (*JaegerBackend) UpdateWorkspace ¶
func (b *JaegerBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
func (*JaegerBackend) UpsertSession ¶
func (b *JaegerBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
func (*JaegerBackend) UpsertSessionWithCorrelation ¶
func (b *JaegerBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
type JaegerConfig ¶
type JaegerConfig struct {
Endpoint string // Jaeger query endpoint (e.g., http://localhost:16686)
Username string // Optional basic auth
Password string
}
JaegerConfig contains configuration for the Jaeger backend.
type JourneyResponse ¶
type JourneyResponse struct {
ChainID string `json:"chain_id"`
Steps []JourneyStep `json:"steps"`
Summary string `json:"summary"` // "Design doc approved -> Sprint planned -> Implementation in progress"
}
JourneyResponse is the pre-computed narrative for a chain's execution flow.
type JourneyStep ¶
type JourneyStep struct {
StageNumber int `json:"stage_number"`
AgentID string `json:"agent_id"`
Action string `json:"action"` // "Created design doc", "Planned sprint", etc.
Status string `json:"status"` // completed, failed, running, etc.
ApprovalStatus string `json:"approval_status,omitempty"` // approved, rejected, pending
Iteration int `json:"iteration"`
Feedback string `json:"feedback,omitempty"` // Truncated human feedback
ErrorExcerpt string `json:"error_excerpt,omitempty"` // First line of error
Cost float64 `json:"cost"`
DurationMs int64 `json:"duration_ms"`
}
JourneyStep describes one step in the execution journey narrative.
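The Summary string in JourneyResponse reads like the step actions joined with arrows. A minimal sketch of that idea follows, assuming a trimmed local copy of JourneyStep; summarize is a hypothetical helper, and the package's actual wording (which appears to fold in status) may differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// JourneyStep is a trimmed local copy of the documented struct.
type JourneyStep struct {
	StageNumber int
	Action      string
	Status      string
}

// summarize orders steps by stage number and joins their actions with " -> ".
// Steps with an empty Action are skipped. The input slice is not modified.
func summarize(steps []JourneyStep) string {
	ordered := append([]JourneyStep(nil), steps...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].StageNumber < ordered[j].StageNumber
	})
	parts := make([]string, 0, len(ordered))
	for _, s := range ordered {
		if s.Action == "" {
			continue
		}
		parts = append(parts, s.Action)
	}
	return strings.Join(parts, " -> ")
}

func main() {
	steps := []JourneyStep{
		{StageNumber: 2, Action: "Planned sprint", Status: "completed"},
		{StageNumber: 1, Action: "Created design doc", Status: "completed"},
		{StageNumber: 3, Action: "Implementation in progress", Status: "running"},
	}
	fmt.Println(summarize(steps))
}
```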
type KeyValue ¶
type KeyValue struct {
Key string `json:"key"`
Value ValueUnion `json:"value"`
}
KeyValue represents an OTEL attribute.
type Message ¶
type Message struct {
ID string `json:"id"`
TaskID string `json:"task_id,omitempty"`
Inbox string `json:"inbox"`
FromAgent string `json:"from_agent"`
Title string `json:"title"`
Content string `json:"content"`
MessageType string `json:"message_type"`
Status MessageStatus `json:"status"`
Priority string `json:"priority"`
// GitHub sync
GitHubIssueNumber int `json:"github_issue_number,omitempty"`
GitHubRepo string `json:"github_repo,omitempty"`
// Correlation
CorrelationID string `json:"correlation_id,omitempty"`
ReplyToID string `json:"reply_to_id,omitempty"`
CreatedAt time.Time `json:"created_at"`
ReadAt *time.Time `json:"read_at,omitempty"`
ArchivedAt *time.Time `json:"archived_at,omitempty"`
// Search
ContentHash string `json:"content_hash,omitempty"`
}
Message represents an agent-to-agent message.
type MessageListOptions ¶
type MessageListOptions struct {
Inbox string
Status MessageStatus
TaskID string
FromAgent string
Limit int
Offset int
}
MessageListOptions configures message listing.
type MessageNode ¶
type MessageNode struct {
MessageID string `json:"message_id"`
Title string `json:"title"`
FromAgent string `json:"from_agent"`
ToInbox string `json:"to_inbox"`
MessageType string `json:"message_type"`
Status string `json:"status"`
CreatedAt *time.Time `json:"created_at,omitempty"`
Execs []*ExecTaskNode `json:"execs,omitempty"`
}
MessageNode represents a message that triggered coordinator tasks.
type MessageStatus ¶
type MessageStatus string
MessageStatus represents the status of a message.
const (
MessageStatusUnread MessageStatus = "unread"
MessageStatusRead MessageStatus = "read"
MessageStatusArchived MessageStatus = "archived"
)
type Metric ¶
type Metric struct {
ID int64 `json:"id,omitempty"`
Name string `json:"name"`
Type string `json:"metric_type"` // "counter", "gauge"
SessionID string `json:"session_id,omitempty"`
Workspace string `json:"workspace,omitempty"`
Provider string `json:"provider,omitempty"`
// Denormalized labels for efficient queries
LabelType string `json:"label_type,omitempty"` // "added"/"removed" for LOC
LabelTool string `json:"label_tool,omitempty"` // tool name for tool metrics
LabelDecision string `json:"label_decision,omitempty"` // "approved"/"rejected" for code_edit_tool
LabelLanguage string `json:"label_language,omitempty"` // language for LOC
LabelModel string `json:"label_model,omitempty"` // model name for model-specific metrics
// Values (only one will be set based on metric type)
ValueInt int64 `json:"value_int,omitempty"`
ValueFloat float64 `json:"value_float,omitempty"`
// Full labels as JSON (for provider-specific data)
Labels map[string]any `json:"labels,omitempty"`
ResourceAttributes map[string]any `json:"resource_attributes,omitempty"`
Timestamp time.Time `json:"timestamp"`
CreatedAt time.Time `json:"created_at"`
}
Metric represents an OTLP counter or gauge metric from Claude Code telemetry. Captures: lines_of_code.count, commit.count, pull_request.count, active_time.total, etc.
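Because both value fields carry `omitempty`, only the populated one appears in the JSON form. The sketch below illustrates this with a trimmed, hypothetical `metric` stand-in whose tags are copied from the struct above; the `encode` helper and the gauge name are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metric is a trimmed, hypothetical stand-in for Metric; the JSON tags are
// copied from the documented struct.
type metric struct {
	Name       string  `json:"name"`
	Type       string  `json:"metric_type"`
	ValueInt   int64   `json:"value_int,omitempty"`
	ValueFloat float64 `json:"value_float,omitempty"`
}

// encode marshals a metric to its JSON form.
func encode(m metric) string {
	b, err := json.Marshal(m)
	if err != nil {
		panic(err) // not expected for this plain struct
	}
	return string(b)
}

func main() {
	// A counter carries value_int only.
	fmt.Println(encode(metric{Name: "claude_code.lines_of_code.count", Type: "counter", ValueInt: 42}))
	// A gauge carries value_float only ("example.gauge" is a made-up name).
	fmt.Println(encode(metric{Name: "example.gauge", Type: "gauge", ValueFloat: 0.5}))
	// A zero value is dropped entirely by omitempty.
	fmt.Println(encode(metric{Name: "claude_code.lines_of_code.count", Type: "counter"}))
}
```

One consequence of `omitempty` worth keeping in mind: a metric whose value is exactly zero serializes with no value field at all, so a consumer has to treat a missing field as zero.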
func (*Metric) LabelsJSON ¶
LabelsJSON returns labels as a JSON string for storage.
func (*Metric) ResourceAttributesJSON ¶
ResourceAttributesJSON returns resource attributes as a JSON string for storage.
type MetricListOptions ¶
type MetricListOptions struct {
SessionID string `json:"session_id,omitempty"`
Workspace string `json:"workspace,omitempty"`
Name string `json:"name,omitempty"` // e.g., "claude_code.lines_of_code.count"
TimeRange *TimeRange `json:"time_range,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
MetricListOptions specifies filters for listing metrics.
type MetricsSummary ¶
type MetricsSummary struct {
TotalWorkspaces int `json:"total_workspaces"`
TotalTasks int `json:"total_tasks"`
TotalSpans int `json:"total_spans"`
TotalAgents int `json:"total_agents"`
TotalTokensIn int64 `json:"total_tokens_in"`
TotalTokensOut int64 `json:"total_tokens_out"`
TotalCostUSD float64 `json:"total_cost_usd"`
SuccessRate float64 `json:"success_rate"`
// Cache metrics (from spans)
TotalCacheReadTokens int64 `json:"total_cache_read_tokens,omitempty"`
TotalCacheCreationTokens int64 `json:"total_cache_creation_tokens,omitempty"`
CacheSavingsUSD float64 `json:"cache_savings_usd,omitempty"`
// Lines of Code metrics (from metrics table)
LinesAdded int64 `json:"lines_added,omitempty"`
LinesRemoved int64 `json:"lines_removed,omitempty"`
// Activity metrics (from metrics table)
CommitCount int64 `json:"commit_count,omitempty"`
PullRequestCount int64 `json:"pull_request_count,omitempty"`
ActiveTimeMs int64 `json:"active_time_ms,omitempty"`
// Session metrics
TurnCount int `json:"turn_count,omitempty"`
ToolCalls int `json:"tool_calls,omitempty"`
ErrorCount int `json:"error_count,omitempty"`
}
MetricsSummary contains global metrics summary.
type OTELEvent ¶
type OTELEvent struct {
Name string `json:"name"`
TimeUnixNano int64 `json:"timeUnixNano"`
Attributes []KeyValue `json:"attributes,omitempty"`
}
OTELEvent represents an OTEL span event.
type OTELResource ¶
type OTELResource struct {
Attributes []KeyValue `json:"attributes,omitempty"`
}
OTELResource represents OTEL resource attributes.
type OTELStatus ¶
type OTELStatus struct {
Code int `json:"code"` // 0=Unset, 1=OK, 2=Error
Message string `json:"message,omitempty"`
}
OTELStatus represents OTEL span status.
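The numeric codes in the field comment can be turned into labels with a small switch. This is an illustrative sketch only; `statusName` is a hypothetical helper, and the "Unknown" fallback for other values is an assumption.

```go
package main

import "fmt"

// statusName maps the status codes listed on OTELStatus.Code to labels.
// Hypothetical helper; the fallback label is an assumption.
func statusName(code int) string {
	switch code {
	case 0:
		return "Unset"
	case 1:
		return "OK"
	case 2:
		return "Error"
	default:
		return "Unknown"
	}
}

func main() {
	for _, c := range []int{0, 1, 2} {
		fmt.Println(c, statusName(c))
	}
}
```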
type OTLPReceiver ¶
type OTLPReceiver struct {
// contains filtered or unexported fields
}
OTLPReceiver receives spans via the standard OTLP HTTP protocol. This allows any OTEL-compatible exporter to send traces to the observatory.
func NewOTLPReceiver ¶
func NewOTLPReceiver(backend Backend) *OTLPReceiver
NewOTLPReceiver creates a new OTLP receiver that stores spans in the backend.
func (*OTLPReceiver) RegisterRoutes ¶
func (r *OTLPReceiver) RegisterRoutes(mux *http.ServeMux)
RegisterRoutes registers the OTLP HTTP endpoints on the given mux. Implements the OTLP/HTTP specification:
- POST /v1/traces - Receive trace data (protobuf or JSON)
- POST /v1/logs - Receive logs/events (for Claude Code events)
- POST /v1/metrics - Receive metrics (for Claude Code metrics)
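To show the shape of the three endpoints without depending on this package's import path, the sketch below registers stub handlers on a standard `http.ServeMux` and posts an empty OTLP/JSON trace request to one of them. `newStubMux` and `postJSON` are hypothetical helpers; the stub only mimics the routing and does not store anything, so treat it as a stand-in for `RegisterRoutes`, not as its implementation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// newStubMux registers stub handlers on the three OTLP/HTTP paths.
// Hypothetical stand-in for NewOTLPReceiver(backend).RegisterRoutes(mux).
func newStubMux() http.Handler {
	mux := http.NewServeMux()
	for _, path := range []string{"/v1/traces", "/v1/logs", "/v1/metrics"} {
		mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
			if r.Method != http.MethodPost {
				http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
				return
			}
			// An empty JSON object is used here as the success body.
			w.Header().Set("Content-Type", "application/json")
			fmt.Fprint(w, "{}")
		})
	}
	return mux
}

// postJSON sends a JSON body to the handler in-process and returns the
// HTTP status code, so no network listener is needed.
func postJSON(h http.Handler, path, body string) int {
	req := httptest.NewRequest(http.MethodPost, path, strings.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := newStubMux()
	fmt.Println("POST /v1/traces ->", postJSON(h, "/v1/traces", `{"resourceSpans":[]}`))
	fmt.Println("POST /v1/other  ->", postJSON(h, "/v1/other", `{}`))
}
```

With the real receiver, the same mux would typically be passed to `http.ListenAndServe` so that an OTEL exporter can be pointed at it.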
type OutlierAnalysis ¶
type OutlierAnalysis struct {
TaskID string `json:"task_id"`
TaskTitle string `json:"task_title"`
SpanCount int `json:"span_count"`
Stats []*TaskMetricStats `json:"stats"` // Per-metric statistics
Outliers []*SpanOutlier `json:"outliers"` // Detected outliers (sorted by |z-score| desc)
RateOfChange *RateAnalysis `json:"rate_of_change,omitempty"` // Optional rate analysis
Threshold float64 `json:"threshold"` // Z-score threshold used
AnalyzedAt time.Time `json:"analyzed_at"`
}
OutlierAnalysis contains the full outlier analysis for a task.
func AnalyzeTaskOutliers ¶
func AnalyzeTaskOutliers(ctx context.Context, backend Backend, taskID string, opts OutlierOptions) (*OutlierAnalysis, error)
AnalyzeTaskOutliers performs statistical outlier detection on spans within a task.
type OutlierOptions ¶
type OutlierOptions struct {
Threshold float64 // Z-score threshold (default: 2.0)
Metric string // Filter to specific metric: "cost", "duration", "tokens", or "" for all
ShowRate bool // Include rate-of-change analysis
Limit int // Max outliers to return (default: 10)
}
OutlierOptions configures outlier detection.
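The threshold and limit options describe a z-score test. The sketch below shows one common way to run such a test over a single metric, assuming a population standard deviation and a strict `|z| > threshold` comparison; the package's exact statistics are not specified here, and `outlier` and `findOutliers` are hypothetical names.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// outlier is a hypothetical, trimmed stand-in for SpanOutlier.
type outlier struct {
	Index  int
	Value  float64
	ZScore float64
}

// findOutliers flags values whose |z-score| exceeds threshold and returns
// at most limit of them, sorted by |z-score| descending.
// Assumption: population standard deviation (divide by n).
func findOutliers(values []float64, threshold float64, limit int) []outlier {
	n := float64(len(values))
	if n < 2 {
		return nil
	}
	var sum float64
	for _, v := range values {
		sum += v
	}
	mean := sum / n
	var sq float64
	for _, v := range values {
		sq += (v - mean) * (v - mean)
	}
	std := math.Sqrt(sq / n)
	if std == 0 {
		return nil // all values identical, nothing can be an outlier
	}
	var out []outlier
	for i, v := range values {
		z := (v - mean) / std
		if math.Abs(z) > threshold {
			out = append(out, outlier{i, v, z})
		}
	}
	sort.Slice(out, func(i, j int) bool {
		return math.Abs(out[i].ZScore) > math.Abs(out[j].ZScore)
	})
	if limit > 0 && len(out) > limit {
		out = out[:limit]
	}
	return out
}

func main() {
	// Nine cheap spans and one expensive one, using the documented defaults.
	costs := []float64{1, 1, 1, 1, 1, 1, 1, 1, 1, 10}
	for _, o := range findOutliers(costs, 2.0, 10) {
		fmt.Printf("span %d: value=%.2f z=%.2f\n", o.Index, o.Value, o.ZScore)
	}
}
```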
func DefaultOutlierOptions ¶
func DefaultOutlierOptions() OutlierOptions
DefaultOutlierOptions returns sensible defaults.
type PendingApprovalInfo ¶
type PendingApprovalInfo struct {
ChainID string `json:"chain_id"`
SourceType string `json:"source_type"`
SourceRef string `json:"source_ref"`
StageID string `json:"stage_id"`
StageNumber int `json:"stage_number"`
AgentID string `json:"agent_id"`
ApprovalStatus string `json:"approval_status"`
ApprovalType ApprovalType `json:"approval_type"`
TaskID string `json:"task_id"`
SessionID string `json:"session_id"`
Cost float64 `json:"cost"`
Turns int `json:"turns"`
StageCreated time.Time `json:"stage_created"`
}
PendingApprovalInfo contains approval info for dashboard views.
type ProviderComparison ¶
type ProviderComparison struct {
Provider Provider `json:"provider"`
TotalExecutions int `json:"total_executions"`
TotalTokensIn int64 `json:"total_tokens_in"`
TotalTokensOut int64 `json:"total_tokens_out"`
TotalCost float64 `json:"total_cost"`
AvgDurationMs float64 `json:"avg_duration_ms"`
SuccessRate float64 `json:"success_rate"`
}
ProviderComparison contains comparison metrics between providers.
type ProviderNormalizer ¶
type ProviderNormalizer struct{}
ProviderNormalizer normalizes telemetry from different providers into Spans.
func NewProviderNormalizer ¶
func NewProviderNormalizer() *ProviderNormalizer
NewProviderNormalizer creates a new normalizer.
func (*ProviderNormalizer) NormalizeClaudeMetrics ¶
func (n *ProviderNormalizer) NormalizeClaudeMetrics(metrics *ClaudeMetrics) (*Span, error)
NormalizeClaudeMetrics converts Claude metrics into a Span with events.
func (*ProviderNormalizer) NormalizeGeminiSpan ¶
func (n *ProviderNormalizer) NormalizeGeminiSpan(otelSpan *GeminiOTELSpan, taskID string) (*Span, error)
NormalizeGeminiSpan converts a Gemini OTEL span into our normalized Span.
func (*ProviderNormalizer) NormalizeGeminiTrace ¶
func (n *ProviderNormalizer) NormalizeGeminiTrace(otelSpans []*GeminiOTELSpan, taskID string) ([]*Span, error)
NormalizeGeminiTrace converts a complete Gemini OTEL trace into normalized Spans.
type RateAnalysis ¶
type RateAnalysis struct {
CumulativeCost []CumulativePoint `json:"cumulative_cost"`
CumulativeTokens []CumulativePoint `json:"cumulative_tokens"`
CumulativeDuration []CumulativePoint `json:"cumulative_duration"`
}
RateAnalysis shows how metrics accumulated during task execution.
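Accumulation of this kind is a running total over the task's spans in order. The sketch below assumes that shape; CumulativePoint is not shown in this section, so the `point` type, its fields, and `cumulative` are hypothetical.

```go
package main

import "fmt"

// point is a hypothetical stand-in for CumulativePoint.
type point struct {
	Step  int     // 1-based position of the span within the task
	Total float64 // running total up to and including this span
}

// cumulative returns the running total of values in order.
func cumulative(values []float64) []point {
	points := make([]point, 0, len(values))
	var running float64
	for i, v := range values {
		running += v
		points = append(points, point{Step: i + 1, Total: running})
	}
	return points
}

func main() {
	// Per-span costs in execution order.
	for _, p := range cumulative([]float64{0.5, 0.25, 1}) {
		fmt.Printf("step %d: total=%.2f\n", p.Step, p.Total)
	}
}
```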
type RetentionStats ¶
type RetentionStats struct {
SpansDeleted int64
SummariesDeleted int64
MetricsDeleted int64
ChatDeleted int64
ToolsDeleted int64
}
RetentionStats reports what was cleaned up by RunRetention.
func (RetentionStats) String ¶
func (rs RetentionStats) String() string
String formats retention stats for logging.
func (RetentionStats) Total ¶
func (rs RetentionStats) Total() int64
Total returns the total number of rows deleted.
type SQLiteBackend ¶
type SQLiteBackend struct {
// contains filtered or unexported fields
}
SQLiteBackend implements Backend using SQLite storage.
func NewSQLiteBackend ¶
func NewSQLiteBackend(db *sql.DB) (*SQLiteBackend, error)
NewSQLiteBackend creates a new SQLite backend.
func NewSQLiteBackendFromPath ¶
func NewSQLiteBackendFromPath(path string) (*SQLiteBackend, error)
NewSQLiteBackendFromPath creates a SQLite backend from a file path.
func (*SQLiteBackend) BackfillSpansWorkspace ¶
func (*SQLiteBackend) Close ¶
func (b *SQLiteBackend) Close() error
Close closes the database connection.
func (*SQLiteBackend) CountChatMessages ¶
func (b *SQLiteBackend) CountChatMessages(ctx context.Context, q ChatMessageQuery) (int, int, error)
func (*SQLiteBackend) CreateAgentAssignment ¶
func (b *SQLiteBackend) CreateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*SQLiteBackend) CreateChain ¶
func (b *SQLiteBackend) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
func (*SQLiteBackend) CreateMessage ¶
func (b *SQLiteBackend) CreateMessage(ctx context.Context, m *Message) error
func (*SQLiteBackend) CreateMetric ¶
func (b *SQLiteBackend) CreateMetric(ctx context.Context, m *Metric) error
func (*SQLiteBackend) CreateSpan ¶
func (b *SQLiteBackend) CreateSpan(ctx context.Context, span *Span) error
func (*SQLiteBackend) CreateSpanEvent ¶
func (b *SQLiteBackend) CreateSpanEvent(ctx context.Context, e *SpanEvent) error
func (*SQLiteBackend) CreateStage ¶
func (b *SQLiteBackend) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
func (*SQLiteBackend) CreateTask ¶
func (b *SQLiteBackend) CreateTask(ctx context.Context, t *Task) error
func (*SQLiteBackend) CreateWorkspace ¶
func (b *SQLiteBackend) CreateWorkspace(ctx context.Context, w *Workspace) error
func (*SQLiteBackend) DB ¶
func (b *SQLiteBackend) DB() *sql.DB
DB returns the underlying database for direct SQL queries.
func (*SQLiteBackend) DeleteAgentAssignment ¶
func (b *SQLiteBackend) DeleteAgentAssignment(ctx context.Context, id string) error
func (*SQLiteBackend) DeleteMessage ¶
func (b *SQLiteBackend) DeleteMessage(ctx context.Context, id string) error
func (*SQLiteBackend) DeleteSpan ¶
func (b *SQLiteBackend) DeleteSpan(ctx context.Context, id string) error
func (*SQLiteBackend) DeleteSpanEvent ¶
func (b *SQLiteBackend) DeleteSpanEvent(ctx context.Context, id int64) error
func (*SQLiteBackend) DeleteTask ¶
func (b *SQLiteBackend) DeleteTask(ctx context.Context, id string) error
func (*SQLiteBackend) DeleteWorkspace ¶
func (b *SQLiteBackend) DeleteWorkspace(ctx context.Context, id string) error
func (*SQLiteBackend) FindLatestUnfinishedTool ¶
func (*SQLiteBackend) GetAgentAssignment ¶
func (b *SQLiteBackend) GetAgentAssignment(ctx context.Context, id string) (*AgentAssignment, error)
func (*SQLiteBackend) GetAgentStats ¶
func (b *SQLiteBackend) GetAgentStats(ctx context.Context, agentID string) (*AgentStats, error)
func (*SQLiteBackend) GetBreakdownByModel ¶
func (b *SQLiteBackend) GetBreakdownByModel(ctx context.Context) ([]BreakdownItem, error)
GetBreakdownByModel returns cost/token breakdown by model. Scoped to the last 30 days for performance on large databases (M-PERF-OBSERVATORY).
func (*SQLiteBackend) GetBreakdownByProvider ¶
func (b *SQLiteBackend) GetBreakdownByProvider(ctx context.Context) ([]BreakdownItem, error)
GetBreakdownByProvider returns cost/token breakdown by provider. Scoped to the last 30 days for performance on large databases (M-PERF-OBSERVATORY).
func (*SQLiteBackend) GetBreakdownBySourceType ¶
func (b *SQLiteBackend) GetBreakdownBySourceType(ctx context.Context) ([]BreakdownItem, error)
GetBreakdownBySourceType returns cost/token breakdown by source type. Uses two-phase aggregation: spans are first aggregated per trace_id, and the result is then joined with trace_summaries. M-PERF-OBSERVATORY: this avoids a 662K×213K JOIN by reducing the spans to ~130K trace groups first.
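The two-phase idea can be illustrated in memory: collapse spans to one row per trace, then look up each trace's source type once. This is a sketch of the technique only, not the SQL the backend runs; `spanRow`, `totals`, `breakdownBySource`, and the "unknown" bucket for traces without a summary are all assumptions.

```go
package main

import "fmt"

// spanRow is a hypothetical, trimmed view of a span.
type spanRow struct {
	TraceID string
	CostUSD float64
	Tokens  int64
}

// totals accumulates cost, tokens, and trace count for one group.
type totals struct {
	CostUSD float64
	Tokens  int64
	Traces  int
}

// breakdownBySource aggregates spans per trace first, then groups the
// per-trace rows by source type using one lookup per trace.
func breakdownBySource(spans []spanRow, sourceByTrace map[string]string) map[string]totals {
	// Phase 1: one aggregate row per trace_id.
	perTrace := make(map[string]totals)
	for _, s := range spans {
		t := perTrace[s.TraceID]
		t.CostUSD += s.CostUSD
		t.Tokens += s.Tokens
		perTrace[s.TraceID] = t
	}
	// Phase 2: join the much smaller per-trace set with the summaries.
	bySource := make(map[string]totals)
	for traceID, t := range perTrace {
		source, ok := sourceByTrace[traceID]
		if !ok {
			source = "unknown" // assumption: traces without a summary
		}
		agg := bySource[source]
		agg.CostUSD += t.CostUSD
		agg.Tokens += t.Tokens
		agg.Traces++
		bySource[source] = agg
	}
	return bySource
}

func main() {
	spans := []spanRow{
		{"t1", 1.5, 10}, {"t1", 0.5, 5}, {"t2", 2, 20}, {"t3", 1, 1},
	}
	sources := map[string]string{"t1": "coordinator", "t2": "coordinator"}
	fmt.Println(breakdownBySource(spans, sources))
}
```

The lookup in phase 2 runs once per trace instead of once per span, which is the same reduction the doc comment describes for the JOIN.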
func (*SQLiteBackend) GetBreakdownByWorkspace ¶
func (b *SQLiteBackend) GetBreakdownByWorkspace(ctx context.Context) ([]BreakdownItem, error)
GetBreakdownByWorkspace groups spans by workspace using config-driven path mapping. Note: Claude Code spans don't include process.cwd, so they show as "Unknown Workspace".
func (*SQLiteBackend) GetBreakdownByWorkspaceWithMapping ¶
func (b *SQLiteBackend) GetBreakdownByWorkspaceWithMapping(ctx context.Context, mapping WorkspaceMapping, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
GetBreakdownByWorkspaceWithMapping uses the provided mapping to convert file paths to workspace IDs.
func (*SQLiteBackend) GetChain ¶
func (b *SQLiteBackend) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
func (*SQLiteBackend) GetChainByGitHubIssue ¶
func (b *SQLiteBackend) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
func (*SQLiteBackend) GetChainByMessageID ¶
func (b *SQLiteBackend) GetChainByMessageID(ctx context.Context, messageID string) (*ExecutionChain, error)
func (*SQLiteBackend) GetChainByTaskID ¶
func (b *SQLiteBackend) GetChainByTaskID(ctx context.Context, taskID string) (*ExecutionChain, error)
func (*SQLiteBackend) GetChainJourney ¶
func (b *SQLiteBackend) GetChainJourney(ctx context.Context, chainID string) (*JourneyResponse, error)
func (*SQLiteBackend) GetChainStages ¶
func (b *SQLiteBackend) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
func (*SQLiteBackend) GetChainStats ¶
func (b *SQLiteBackend) GetChainStats(ctx context.Context) (*ChainStats, error)
func (*SQLiteBackend) GetChainStatsByAgent ¶
func (b *SQLiteBackend) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
func (*SQLiteBackend) GetChainStatusCounts ¶
func (b *SQLiteBackend) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
func (*SQLiteBackend) GetChatMessagesBySession ¶
func (b *SQLiteBackend) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
func (*SQLiteBackend) GetChatMessagesByTaskID ¶
func (b *SQLiteBackend) GetChatMessagesByTaskID(ctx context.Context, taskID string) ([]*ChatMessage, error)
func (*SQLiteBackend) GetClaudeCodeEvents ¶
func (b *SQLiteBackend) GetClaudeCodeEvents(ctx context.Context, limit int) ([]ClaudeCodeEvent, error)
GetClaudeCodeEvents returns Claude Code sessions aggregated by session.id. Multiple api_request spans (turns) in one session are aggregated into a single event. This provides a cleaner Event Queue with one entry per Claude Code conversation.
func (*SQLiteBackend) GetClaudeCodeEventsWithLookup ¶
func (b *SQLiteBackend) GetClaudeCodeEventsWithLookup(ctx context.Context, limit int, lookup TaskAgentLookup, sourceType, workspaceFilter string, wsConfig WorkspaceMapping) ([]ClaudeCodeEvent, error)
GetClaudeCodeEventsWithLookup returns Claude Code sessions with agent info resolved. For sessions spawned by coordinator tasks, FromAgent and ToInbox are set from agent_assignments. For user-initiated sessions (no coordinator task), they default to "claude-code" / "user".
Note: The lookup parameter is kept for API compatibility but is no longer used; agent info comes directly from observatory.db via a JOIN with agent_assignments.
The sourceType parameter filters by source:
- "coordinator": Only sessions with agent_assignment (coordinator-spawned)
- "user_session": Only sessions without agent_assignment (user-initiated)
- "": No filter (all sessions)
The workspaceFilter parameter filters by workspace path:
- Full path: "/Users/mark/dev/TwilightGame" (exact or contains match)
- "unknown" or "No Workspace": Sessions with empty workspace
- "coordinator_worktrees": Sessions in worktree directories
- "": No filter (all workspaces)
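The two filters above can be read as simple predicates over a session. The sketch below spells them out under stated assumptions: the `session` type is a hypothetical stand-in, "contains match" is taken to mean the session's workspace contains the filter string, unrecognized sourceType values match nothing, and worktree membership is supplied as a precomputed flag because how worktree directories are detected is not stated.

```go
package main

import (
	"fmt"
	"strings"
)

// session is a hypothetical, trimmed view of a Claude Code session.
type session struct {
	Workspace     string
	HasAssignment bool // true when an agent_assignment row exists
	InWorktree    bool // assumption: computed elsewhere
}

// matchesSource applies the documented sourceType values.
func matchesSource(s session, sourceType string) bool {
	switch sourceType {
	case "":
		return true
	case "coordinator":
		return s.HasAssignment
	case "user_session":
		return !s.HasAssignment
	default:
		return false // assumption: unrecognized values match nothing
	}
}

// matchesWorkspace applies the documented workspaceFilter values.
func matchesWorkspace(s session, filter string) bool {
	switch filter {
	case "":
		return true
	case "unknown", "No Workspace":
		return s.Workspace == ""
	case "coordinator_worktrees":
		return s.InWorktree
	default:
		// Covers both the exact and the "contains" case.
		return strings.Contains(s.Workspace, filter)
	}
}

func main() {
	s := session{Workspace: "/Users/mark/dev/TwilightGame", HasAssignment: true}
	fmt.Println(matchesSource(s, "coordinator"), matchesWorkspace(s, "/Users/mark/dev/TwilightGame"))
	fmt.Println(matchesSource(s, "user_session"), matchesWorkspace(s, "unknown"))
}
```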
func (*SQLiteBackend) GetClaudeCodeHierarchy ¶
func (b *SQLiteBackend) GetClaudeCodeHierarchy(ctx context.Context, sessionID string) (*TaskHierarchy, error)
GetClaudeCodeHierarchy returns the hierarchy for a Claude Code session. The sessionID is the session.id from Claude Code telemetry (UUID format).
Claude Code telemetry creates a SEPARATE trace for each span (api_request and tool calls). This means we can't use standard trace hierarchy building (which relies on parent_span_id). Instead, we build a turn-based hierarchy using timestamp correlation:
- api_request spans = "turns" (top level)
- tool calls that occur before the next api_request = children of that turn
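The timestamp correlation described above can be sketched as a single pass over spans sorted by start time. This is a minimal illustration, assuming the turn spans are named exactly "api_request" and that tool calls seen before the first turn are dropped; `span`, `turn`, `at`, and `groupIntoTurns` are hypothetical names, and the package's real hierarchy types are richer.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// span is a hypothetical, trimmed stand-in for the package's Span type.
type span struct {
	Name      string
	StartTime time.Time
}

// turn groups one api_request span with the tool calls that follow it.
type turn struct {
	Request span
	Tools   []span
}

// at builds a timestamp from a number of seconds, for readable examples.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

// groupIntoTurns sorts spans by start time and attaches every other span
// to the most recent api_request that started before it.
func groupIntoTurns(spans []span) []turn {
	sorted := append([]span(nil), spans...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].StartTime.Before(sorted[j].StartTime)
	})
	var turns []turn
	for _, s := range sorted {
		if s.Name == "api_request" {
			turns = append(turns, turn{Request: s})
			continue
		}
		if len(turns) == 0 {
			continue // assumption: tool calls before any turn are dropped
		}
		last := &turns[len(turns)-1]
		last.Tools = append(last.Tools, s)
	}
	return turns
}

func main() {
	turns := groupIntoTurns([]span{
		{"Read", at(2)}, {"api_request", at(1)},
		{"api_request", at(5)}, {"Bash", at(6)}, {"Edit", at(3)},
	})
	for i, t := range turns {
		fmt.Printf("turn %d:", i+1)
		for _, tool := range t.Tools {
			fmt.Printf(" %s", tool.Name)
		}
		fmt.Println()
	}
}
```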
func (*SQLiteBackend) GetExecTaskHierarchy ¶
func (b *SQLiteBackend) GetExecTaskHierarchy(ctx context.Context, limit int) ([]*ExecTaskNode, error)
func (*SQLiteBackend) GetExecTaskHierarchyWithMessages ¶
func (b *SQLiteBackend) GetExecTaskHierarchyWithMessages(ctx context.Context, limit int) (*ExecHierarchyWithMessages, error)
func (*SQLiteBackend) GetFilteredBreakdownByModel ¶
func (b *SQLiteBackend) GetFilteredBreakdownByModel(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
GetFilteredBreakdownByModel returns model breakdown with filters applied.
func (*SQLiteBackend) GetFilteredBreakdownByProvider ¶
func (b *SQLiteBackend) GetFilteredBreakdownByProvider(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
GetFilteredBreakdownByProvider returns provider breakdown with filters applied.
func (*SQLiteBackend) GetFilteredBreakdownBySourceType ¶
func (b *SQLiteBackend) GetFilteredBreakdownBySourceType(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
GetFilteredBreakdownBySourceType returns source type breakdown with filters applied. Uses trace_summaries.root_span_name for fast categorization (M-PERF-OBSERVATORY).
func (*SQLiteBackend) GetFilteredBreakdownByWorkspace ¶
func (b *SQLiteBackend) GetFilteredBreakdownByWorkspace(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
GetFilteredBreakdownByWorkspace returns workspace breakdown with filters applied. Uses a basic fallback mapping; for full config support, use GetFilteredBreakdownByWorkspaceWithMapping.
func (*SQLiteBackend) GetFilteredBreakdownByWorkspaceWithMapping ¶
func (b *SQLiteBackend) GetFilteredBreakdownByWorkspaceWithMapping(ctx context.Context, filter *ControlPlaneFilter, mapping WorkspaceMapping, wsConfig WorkspaceMapping) ([]BreakdownItem, error)
GetFilteredBreakdownByWorkspaceWithMapping returns workspace breakdown with config-driven path mapping. The mapping parameter converts file paths to Firestore workspace IDs. Note: Workspace filter is deliberately excluded from conditions since this query groups BY workspace.
func (*SQLiteBackend) GetFilteredHeatmapData ¶
func (b *SQLiteBackend) GetFilteredHeatmapData(ctx context.Context, filter *ControlPlaneFilter, days int, wsConfig WorkspaceMapping) ([]HeatmapDataPoint, error)
GetFilteredHeatmapData returns daily activity data aggregated from spans.
func (*SQLiteBackend) GetFilteredMetricsSummary ¶
func (b *SQLiteBackend) GetFilteredMetricsSummary(ctx context.Context, filter *ControlPlaneFilter, wsConfig WorkspaceMapping) (*MetricsSummary, error)
GetFilteredMetricsSummary returns metrics filtered by the Control Plane filter.
func (*SQLiteBackend) GetMessage ¶
func (*SQLiteBackend) GetMetricsSummary ¶
func (b *SQLiteBackend) GetMetricsSummary(ctx context.Context) (*MetricsSummary, error)
func (*SQLiteBackend) GetProviderComparison ¶
func (b *SQLiteBackend) GetProviderComparison(ctx context.Context) ([]*ProviderComparison, error)
func (*SQLiteBackend) GetSession ¶
func (*SQLiteBackend) GetSessionMetricsSummary ¶
func (b *SQLiteBackend) GetSessionMetricsSummary(ctx context.Context, sessionID string) (*SessionMetricsSummary, error)
func (*SQLiteBackend) GetSessionTools ¶
func (b *SQLiteBackend) GetSessionTools(ctx context.Context, sessionID string) ([]SessionTool, error)
func (*SQLiteBackend) GetSessionWorkspace ¶
func (b *SQLiteBackend) GetSessionWorkspace(sessionID string) (string, error)
func (*SQLiteBackend) GetSpanEvents ¶
func (*SQLiteBackend) GetSpanHierarchy ¶
func (b *SQLiteBackend) GetSpanHierarchy(ctx context.Context, limit int) (*SpanHierarchyResult, error)
func (*SQLiteBackend) GetSpanLitesByStageID ¶
func (b *SQLiteBackend) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
func (*SQLiteBackend) GetSpansByStageID ¶
func (*SQLiteBackend) GetStage ¶
func (b *SQLiteBackend) GetStage(ctx context.Context, id string) (*ChainStage, error)
func (*SQLiteBackend) GetTaskSpanSummary ¶
func (b *SQLiteBackend) GetTaskSpanSummary(ctx context.Context, taskID string) (*TaskSpanSummary, error)
GetTaskSpanSummary returns aggregated span statistics for a task_id. Works for ALL task_id formats (coordinator, eval, UUID sessions).
func (*SQLiteBackend) GetTaskTimeline ¶
func (b *SQLiteBackend) GetTaskTimeline(ctx context.Context, taskID string) ([]*TaskTimeline, error)
func (*SQLiteBackend) GetToolForSpan ¶
func (b *SQLiteBackend) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
func (*SQLiteBackend) GetToolsByTimestampRange ¶
func (b *SQLiteBackend) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
func (*SQLiteBackend) GetWorkspace ¶
func (*SQLiteBackend) GetWorkspaceStats ¶
func (b *SQLiteBackend) GetWorkspaceStats(ctx context.Context, id string) (*WorkspaceStats, error)
func (*SQLiteBackend) InsertToolStart ¶
func (b *SQLiteBackend) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
func (*SQLiteBackend) LinkOrphanedSpansBySession ¶
func (b *SQLiteBackend) LinkOrphanedSpansBySession(ctx context.Context, sessionID, taskID, assignmentID string) (int64, error)
LinkOrphanedSpansBySession updates orphaned spans when the claude.execute span arrives. Fixes a race condition where Claude Code events arrive before the parent span is batch-flushed.
func (*SQLiteBackend) LinkSpanToChain ¶
func (b *SQLiteBackend) LinkSpanToChain(ctx context.Context, spanID, chainID, stageID string) error
func (*SQLiteBackend) ListAgentAssignments ¶
func (b *SQLiteBackend) ListAgentAssignments(ctx context.Context, taskID string) ([]*AgentAssignment, error)
func (*SQLiteBackend) ListChains ¶
func (b *SQLiteBackend) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
func (*SQLiteBackend) ListMessages ¶
func (b *SQLiteBackend) ListMessages(ctx context.Context, opts MessageListOptions) ([]*Message, error)
func (*SQLiteBackend) ListMetrics ¶
func (b *SQLiteBackend) ListMetrics(ctx context.Context, opts MetricListOptions) ([]*Metric, error)
func (*SQLiteBackend) ListPendingApprovals ¶
func (b *SQLiteBackend) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
func (*SQLiteBackend) ListSpans ¶
func (b *SQLiteBackend) ListSpans(ctx context.Context, opts SpanListOptions) ([]*Span, error)
func (*SQLiteBackend) ListSpansByTaskIDs ¶
func (*SQLiteBackend) ListSpansLightweight ¶
func (b *SQLiteBackend) ListSpansLightweight(ctx context.Context, opts SpanListOptions) ([]*Span, error)
ListSpansLightweight returns spans without heavy attributes/resource_attributes columns. Extracts only tool_name and session.id from attributes via json_extract (M-AUDIT-OBSERVATORY).
func (*SQLiteBackend) ListTasks ¶
func (b *SQLiteBackend) ListTasks(ctx context.Context, opts TaskListOptions) ([]*Task, error)
func (*SQLiteBackend) ListTraces ¶
func (b *SQLiteBackend) ListTraces(ctx context.Context, opts TraceQuery) ([]*TraceSummary, error)
func (*SQLiteBackend) ListWorkspaces ¶
func (b *SQLiteBackend) ListWorkspaces(ctx context.Context) ([]*Workspace, error)
func (*SQLiteBackend) LookupTaskBySessionID ¶
func (b *SQLiteBackend) LookupTaskBySessionID(ctx context.Context, sessionID string) (taskID, assignmentID, traceID string)
LookupTaskBySessionID finds the task hierarchy for Claude Code session correlation. Used by the OTLP receiver to link Claude Code internal events to their parent executor span.
func (*SQLiteBackend) MarkMessageArchived ¶
func (b *SQLiteBackend) MarkMessageArchived(ctx context.Context, id string) error
func (*SQLiteBackend) MarkMessageRead ¶
func (b *SQLiteBackend) MarkMessageRead(ctx context.Context, id string) error
func (*SQLiteBackend) RecalculateTaskAggregates ¶
func (b *SQLiteBackend) RecalculateTaskAggregates(ctx context.Context, taskID string) error
func (*SQLiteBackend) Store ¶
func (b *SQLiteBackend) Store() *Store
Store returns the underlying store for direct queries.
func (*SQLiteBackend) UpdateAgentAssignment ¶
func (b *SQLiteBackend) UpdateAgentAssignment(ctx context.Context, a *AgentAssignment) error
func (*SQLiteBackend) UpdateChainMetrics ¶
func (*SQLiteBackend) UpdateChainStatus ¶
func (b *SQLiteBackend) UpdateChainStatus(ctx context.Context, chainID string, status ChainStatus) error
func (*SQLiteBackend) UpdateMessage ¶
func (b *SQLiteBackend) UpdateMessage(ctx context.Context, m *Message) error
func (*SQLiteBackend) UpdateSessionEnded ¶
func (b *SQLiteBackend) UpdateSessionEnded(ctx context.Context, sessionID string) error
func (*SQLiteBackend) UpdateSpan ¶
func (b *SQLiteBackend) UpdateSpan(ctx context.Context, span *Span) error
func (*SQLiteBackend) UpdateSpanLinks ¶
func (b *SQLiteBackend) UpdateSpanLinks(ctx context.Context, spanID, taskID, assignmentID string) error
func (*SQLiteBackend) UpdateStageApproval ¶
func (b *SQLiteBackend) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
func (*SQLiteBackend) UpdateStageError ¶
func (b *SQLiteBackend) UpdateStageError(ctx context.Context, stageID, errorMessage string) error
func (*SQLiteBackend) UpdateStageMetrics ¶
func (*SQLiteBackend) UpdateStageSession ¶
func (b *SQLiteBackend) UpdateStageSession(ctx context.Context, stageID, sessionID string) error
func (*SQLiteBackend) UpdateStageStatus ¶
func (b *SQLiteBackend) UpdateStageStatus(ctx context.Context, stageID string, status ChainStageStatus) error
func (*SQLiteBackend) UpdateTask ¶
func (b *SQLiteBackend) UpdateTask(ctx context.Context, t *Task) error
func (*SQLiteBackend) UpdateToolEnd ¶
func (*SQLiteBackend) UpdateWorkspace ¶
func (b *SQLiteBackend) UpdateWorkspace(ctx context.Context, w *Workspace) error
func (*SQLiteBackend) UpsertSession ¶
func (b *SQLiteBackend) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
func (*SQLiteBackend) UpsertSessionWithCorrelation ¶
func (b *SQLiteBackend) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
type SeedConfig ¶
type SeedConfig struct {
NumWorkspaces int
TasksPerWorkspace Range
SpansPerTask Range
IncludeMessages bool
CleanFirst bool
Verbose bool
}
SeedConfig configures the seed data generator.
func DefaultSeedConfig ¶
func DefaultSeedConfig() SeedConfig
DefaultSeedConfig returns a realistic default configuration.
func MinimalSeedConfig ¶
func MinimalSeedConfig() SeedConfig
MinimalSeedConfig returns a minimal configuration for quick testing.
func StressSeedConfig ¶
func StressSeedConfig() SeedConfig
StressSeedConfig returns a configuration for load testing.
type SeedResult ¶
type SeedResult struct {
WorkspacesCreated int
TasksCreated int
AssignmentsCreated int
SpansCreated int
EventsCreated int
MessagesCreated int
}
SeedResult contains statistics from the seed operation.
func SeedDatabase ¶
func SeedDatabase(ctx context.Context, backend Backend, cfg SeedConfig) (*SeedResult, error)
SeedDatabase generates test data in the database.
type Session ¶
type Session struct {
SessionID string `json:"session_id"`
Workspace string `json:"workspace"`
ClaudeVersion string `json:"claude_version,omitempty"`
Source string `json:"source"`
StartedAt time.Time `json:"started_at"`
EndedAt *time.Time `json:"ended_at,omitempty"`
TurnCount int `json:"turn_count"`
ToolCount int `json:"tool_count"`
// Correlation IDs from environment (M-DETERMINISTIC-CHAT-LINKING)
// Set by Claude Code hooks when coordinator spawns with AILANG_* env vars
TaskID string `json:"task_id,omitempty"`
ChainID string `json:"chain_id,omitempty"`
StageID string `json:"stage_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
}
Session represents a Claude Code session with workspace metadata.
type SessionCorrelation ¶
SessionCorrelation holds optional correlation IDs from environment.
type SessionMetricsSummary ¶
type SessionMetricsSummary struct {
SessionID string `json:"session_id"`
// Token metrics (from spans)
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheCreationTokens int64 `json:"cache_creation_tokens"`
// Cost metrics (from spans + calculated)
TotalCostUSD float64 `json:"total_cost_usd"`
CacheSavingsUSD float64 `json:"cache_savings_usd"` // Savings from cache reads
// Lines of code (from metrics)
LinesAdded int64 `json:"lines_added"`
LinesRemoved int64 `json:"lines_removed"`
// Activity metrics (from metrics)
CommitCount int64 `json:"commit_count"`
PullRequestCount int64 `json:"pull_request_count"`
ActiveTimeMs int64 `json:"active_time_ms"`
TurnCount int `json:"turn_count"`
ToolCalls int `json:"tool_calls"`
DurationMs int64 `json:"duration_ms"`
SpanCount int `json:"span_count"`
ErrorCount int `json:"error_count"`
SuccessRate float64 `json:"success_rate"`
}
SessionMetricsSummary aggregates all metrics for a session.
type SessionTool ¶
type SessionTool struct {
ToolUseID string `json:"tool_use_id"`
SessionID string `json:"session_id"`
ToolName string `json:"tool_name"`
ToolInput json.RawMessage `json:"tool_input,omitempty"`
ToolResponse json.RawMessage `json:"tool_response,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time,omitempty"`
Success *bool `json:"success,omitempty"`
}
SessionTool represents a tool call within a Claude Code session.
type Span ¶
type Span struct {
ID string `json:"id"`
TraceID string `json:"trace_id"`
ParentSpanID string `json:"parent_span_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
AgentAssignmentID string `json:"agent_assignment_id,omitempty"`
ChainID string `json:"chain_id,omitempty"` // M-CHAINS-SIMPLIFY: links to execution_chains
StageID string `json:"stage_id,omitempty"` // M-CHAINS-SIMPLIFY: links to chain_stages
Name string `json:"name"`
Kind SpanKind `json:"kind"`
Status SpanStatus `json:"status"`
StatusMessage string `json:"status_message,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
// Normalized attributes (common across providers)
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
CacheReadTokens int64 `json:"cache_read_tokens,omitempty"`
CacheCreationTokens int64 `json:"cache_creation_tokens,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
Model string `json:"model,omitempty"`
Provider Provider `json:"provider,omitempty"`
// Full attributes as JSON (for provider-specific data)
Attributes map[string]any `json:"attributes,omitempty"`
ResourceAttributes map[string]any `json:"resource_attributes,omitempty"`
// Children (for tree building, not stored in DB)
Children []*Span `json:"children,omitempty"`
// Events (loaded separately)
Events []SpanEvent `json:"events,omitempty"`
// DisplayName is an enriched human-readable label (not stored in DB, computed from session_tools)
DisplayName string `json:"display_name,omitempty"`
// ChatContext contains embedded chat content (populated with include_chat=true API param)
ChatContext *ChatContext `json:"chat_context,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
Span represents an OTEL span with normalized attributes.
func (*Span) AttributesJSON ¶
AttributesJSON returns attributes as a JSON string for storage.
func (*Span) ParseAttributes ¶
ParseAttributes parses a JSON string into attributes.
func (*Span) ParseResourceAttributes ¶
ParseResourceAttributes parses a JSON string into resource attributes.
func (*Span) ResourceAttributesJSON ¶
ResourceAttributesJSON returns resource attributes as a JSON string for storage.
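The JSON helpers above can be pictured as a plain round trip through encoding/json. The following is a minimal sketch, not the package's implementation: the `span` type, the lowercase method names, and the handling of nil maps and empty strings are all assumptions made for illustration, since the real signatures are not shown here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// span is a hypothetical stand-in for observatory.Span, reduced to one field.
type span struct {
	Attributes map[string]any
}

// attributesJSON serializes the attribute map to a JSON string.
// Assumption: a nil map is stored as "{}"; the real method may differ.
func (s *span) attributesJSON() (string, error) {
	if s.Attributes == nil {
		return "{}", nil
	}
	b, err := json.Marshal(s.Attributes)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// parseAttributes replaces the attribute map with the decoded JSON object.
// Assumption: an empty string clears the map instead of returning an error.
func (s *span) parseAttributes(raw string) error {
	if raw == "" {
		s.Attributes = nil
		return nil
	}
	var m map[string]any
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		return err
	}
	s.Attributes = m
	return nil
}

func main() {
	s := &span{Attributes: map[string]any{"tool_name": "Read", "session.id": "abc"}}
	raw, err := s.attributesJSON()
	if err != nil {
		panic(err)
	}
	fmt.Println(raw)

	var back span
	if err := back.parseAttributes(raw); err != nil {
		panic(err)
	}
	fmt.Println(back.Attributes["tool_name"])
}
```

Numbers decoded into `map[string]any` come back as `float64`, so callers that store integer attributes may need a conversion after parsing.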
type SpanEvent ¶
type SpanEvent struct {
ID int64 `json:"id,omitempty"`
SpanID string `json:"span_id"`
Name string `json:"name"`
Timestamp time.Time `json:"timestamp"`
EventType EventType `json:"event_type,omitempty"`
// Denormalized for common event types
ApprovalStatus ApprovalStatus `json:"approval_status,omitempty"`
ToolName string `json:"tool_name,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
Attributes map[string]any `json:"attributes,omitempty"`
}
SpanEvent represents an event attached to a span (approvals, tool calls, errors).
func (*SpanEvent) AttributesJSON ¶
AttributesJSON returns attributes as a JSON string for storage.
func (*SpanEvent) ParseAttributes ¶
ParseAttributes parses a JSON string into attributes.
type SpanFilterConfig ¶
type SpanFilterConfig struct {
AllowPatterns []FilterPattern // Allow takes priority — matching spans are ALWAYS kept
DenyPatterns []FilterPattern // Matching spans are dropped (unless allow-listed)
DisableAll bool // Bypass all filtering (debug mode)
}
SpanFilterConfig holds configurable span filtering rules. Loaded once at startup from environment variables; immutable after creation.
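The precedence described by the field comments (allow beats deny, and DisableAll bypasses everything) could be evaluated as in the sketch below. FilterPattern is not documented in this section, so `pattern` is a hypothetical stand-in that matches a span-name prefix, and keeping spans that match no rule is likewise an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// pattern is a hypothetical stand-in for FilterPattern; it matches a name prefix.
type pattern struct{ NamePrefix string }

func (p pattern) matches(name string) bool { return strings.HasPrefix(name, p.NamePrefix) }

// filterConfig mirrors the three documented fields of SpanFilterConfig.
type filterConfig struct {
	AllowPatterns []pattern
	DenyPatterns  []pattern
	DisableAll    bool
}

// keep reports whether a span with the given name survives filtering.
func (c *filterConfig) keep(name string) bool {
	if c == nil || c.DisableAll {
		return true // no config, or debug mode: nothing is dropped
	}
	for _, p := range c.AllowPatterns {
		if p.matches(name) {
			return true // allow wins over any deny rule
		}
	}
	for _, p := range c.DenyPatterns {
		if p.matches(name) {
			return false
		}
	}
	return true // assumption: spans that match no rule are kept
}

func main() {
	cfg := &filterConfig{
		AllowPatterns: []pattern{{NamePrefix: "claude_code.tool."}},
		DenyPatterns:  []pattern{{NamePrefix: "claude_code."}},
	}
	fmt.Println(cfg.keep("claude_code.tool.Read")) // allow-listed
	fmt.Println(cfg.keep("claude_code.internal"))  // denied
	fmt.Println(cfg.keep("api_request"))           // matches nothing
}
```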
func DefaultSpanFilterConfig ¶
func DefaultSpanFilterConfig() *SpanFilterConfig
DefaultSpanFilterConfig returns the default deny rules matching the original hard-coded behavior.
func LoadSpanFilterConfig ¶
func LoadSpanFilterConfig() *SpanFilterConfig
LoadSpanFilterConfig creates a SpanFilterConfig from environment variables, merging with defaults. Exported for testing.
type SpanHierarchyNode ¶
type SpanHierarchyNode struct {
ID string `json:"id"`
Name string `json:"name"`
ParentID string `json:"parent_id,omitempty"`
Depth int `json:"depth"`
StartTime time.Time `json:"start_time"`
DurationMs int64 `json:"duration_ms"`
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
CacheReadTokens int64 `json:"cache_read_tokens,omitempty"`
CacheCreationTokens int64 `json:"cache_creation_tokens,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
TurnNumber int `json:"turn_number,omitempty"`
ToolName string `json:"tool_name,omitempty"`
NodeType SpanHierarchyNodeType `json:"node_type"`
SessionID string `json:"session_id,omitempty"`
Status SpanStatus `json:"status"`
Provider Provider `json:"provider,omitempty"`
Children []*SpanHierarchyNode `json:"children,omitempty"`
Attributes map[string]interface{} `json:"attributes,omitempty"` // Selected useful attributes
}
SpanHierarchyNode represents a span with its children for hierarchy visualization. Unlike Span, this is optimized for graph/tree rendering with parent_span_id-based relationships.
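A tree of this shape can be assembled from a flat list by indexing nodes by ID and attaching each one to its parent. The sketch below assumes a reduced `node` type with only the fields needed for linking; treating spans whose parent is missing from the input as roots is an assumption, not documented behavior.

```go
package main

import "fmt"

// node is a hypothetical, reduced version of SpanHierarchyNode.
type node struct {
	ID       string
	ParentID string
	Depth    int
	Children []*node
}

// buildTree links nodes through ParentID and returns the roots.
// Nodes whose parent is absent from the input are treated as roots (assumption).
func buildTree(flat []*node) []*node {
	byID := make(map[string]*node, len(flat))
	for _, n := range flat {
		byID[n.ID] = n
	}
	var roots []*node
	for _, n := range flat {
		parent, ok := byID[n.ParentID]
		if n.ParentID == "" || !ok {
			roots = append(roots, n)
			continue
		}
		parent.Children = append(parent.Children, n)
	}
	for _, r := range roots {
		setDepth(r, 0)
	}
	return roots
}

// setDepth assigns depth 0 to a root and parent depth + 1 to each child.
func setDepth(n *node, depth int) {
	n.Depth = depth
	for _, c := range n.Children {
		setDepth(c, depth+1)
	}
}

func main() {
	flat := []*node{
		{ID: "exec"},
		{ID: "turn-1", ParentID: "exec"},
		{ID: "tool-1", ParentID: "turn-1"},
	}
	roots := buildTree(flat)
	fmt.Println(len(roots), flat[2].Depth)
}
```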
type SpanHierarchyNodeType ¶
type SpanHierarchyNodeType string
SpanHierarchyNodeType categorizes spans for visualization.
const (
NodeTypeCoordinator SpanHierarchyNodeType = "coordinator"
NodeTypeExecutor SpanHierarchyNodeType = "executor"
NodeTypeTurn SpanHierarchyNodeType = "turn"
NodeTypeTool SpanHierarchyNodeType = "tool"
NodeTypeOther SpanHierarchyNodeType = "other"
)
type SpanHierarchyResult ¶
type SpanHierarchyResult struct {
Roots []*SpanHierarchyNode `json:"roots"`
Sessions map[string]int `json:"sessions"` // session_id -> turn_count
Stats SpanHierarchyStats `json:"stats"`
}
SpanHierarchyResult contains the complete hierarchy result with roots and sessions.
type SpanHierarchyStats ¶
type SpanHierarchyStats struct {
TotalSpans int `json:"total_spans"`
TotalCost float64 `json:"total_cost"`
TotalTokens struct {
In int64 `json:"in"`
Out int64 `json:"out"`
CacheRead int64 `json:"cache_read"`
CacheCreation int64 `json:"cache_creation"`
} `json:"total_tokens"`
CacheSavingsUSD float64 `json:"cache_savings_usd,omitempty"`
TimeRange struct {
Start time.Time `json:"start"`
End time.Time `json:"end"`
} `json:"time_range"`
MaxDepth int `json:"max_depth"`
}
SpanHierarchyStats contains aggregated stats for a hierarchy result.
type SpanListOptions ¶
type SpanListOptions struct {
TraceID string
TaskID string
AgentAssignmentID string
StartAfter time.Time
StartBefore time.Time
Limit int
Offset int
// Filter by provider (claude, gemini, openai, etc.)
Provider string
// Filter by model name (claude-sonnet-4-5, gemini-2-5-flash, etc.)
Model string
// Filter by span status (ok, error)
Status string
// Filter by workspace path (filters via task→workspace relationship)
Workspace string
// Filter by workspace ID directly (more efficient when workspace_id is known)
WorkspaceID string
}
SpanListOptions configures span listing.
type SpanLite ¶
type SpanLite struct {
ID string `json:"id"`
TraceID string `json:"trace_id"`
ParentSpanID string `json:"parent_span_id,omitempty"`
ChainID string `json:"chain_id,omitempty"`
StageID string `json:"stage_id,omitempty"`
Name string `json:"name"`
Kind SpanKind `json:"kind"`
Status string `json:"status"`
StatusMessage string `json:"status_message,omitempty"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time,omitempty"`
DurationMs int64 `json:"duration_ms"`
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
Model string `json:"model,omitempty"`
Provider string `json:"provider,omitempty"`
}
SpanLite is a lightweight span without the heavy attributes/resource_attributes columns. Used for tree rendering, timeline, and waterfall views (M-PERF-OBSERVATORY). The full attributes are only loaded on-demand via GetSpan (Level 3).
type SpanLitePage ¶
type SpanLitePage struct {
Spans []*SpanLite `json:"spans"`
Total int `json:"total"`
Limit int `json:"limit"`
Offset int `json:"offset"`
}
SpanLitePage is a paginated response of SpanLite records.
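A client walking such a paginated response typically derives the next offset from the current page. The sketch below uses a hypothetical `page` type over strings in place of SpanLite records; the default limit of 50 is an invented value for illustration only.

```go
package main

import "fmt"

// page is a hypothetical analogue of SpanLitePage holding strings instead of spans.
type page struct {
	Items  []string
	Total  int
	Limit  int
	Offset int
}

// paginate slices the full result set the way a LIMIT/OFFSET query would.
func paginate(all []string, limit, offset int) page {
	if limit <= 0 {
		limit = 50 // hypothetical default, not taken from the package
	}
	if offset < 0 {
		offset = 0
	}
	if offset > len(all) {
		offset = len(all)
	}
	end := offset + limit
	if end > len(all) {
		end = len(all)
	}
	return page{Items: all[offset:end], Total: len(all), Limit: limit, Offset: offset}
}

// nextOffset returns the offset of the following page and whether one exists.
func (p page) nextOffset() (int, bool) {
	next := p.Offset + len(p.Items)
	return next, next < p.Total
}

func main() {
	all := []string{"s1", "s2", "s3", "s4", "s5"}
	for offset, more := 0, true; more; {
		p := paginate(all, 2, offset)
		fmt.Println(p.Items)
		offset, more = p.nextOffset()
	}
}
```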
type SpanNameCount ¶
SpanNameCount holds a span name and its count.
type SpanOutlier ¶
type SpanOutlier struct {
Span *Span `json:"span"`
Metric string `json:"metric"` // "cost_usd", "duration_ms", "tokens"
Value float64 `json:"value"` // Actual metric value
Mean float64 `json:"mean"` // Task mean for this metric
StdDev float64 `json:"std_dev"` // Task standard deviation
ZScore float64 `json:"z_score"` // (value - mean) / stddev
PercentOfTotal float64 `json:"percent_of_total"` // What % of task total this span represents
}
SpanOutlier represents a span that is statistically anomalous within its task.
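The z-score formula in the field comment, (value - mean) / stddev, can be worked through on a small set of per-span values. This sketch assumes population standard deviation and a two-sided threshold; the detector in this package may use a different estimator or cutoff, and the `outlier` type is a simplified stand-in.

```go
package main

import (
	"fmt"
	"math"
)

// outlier is a simplified stand-in for SpanOutlier without the span pointer.
type outlier struct {
	Index          int
	Value          float64
	Mean           float64
	StdDev         float64
	ZScore         float64
	PercentOfTotal float64
}

// findOutliers flags values whose absolute z-score reaches the threshold.
// Assumption: population standard deviation (divide by n).
func findOutliers(values []float64, threshold float64) []outlier {
	n := float64(len(values))
	if n < 2 {
		return nil
	}
	var sum float64
	for _, v := range values {
		sum += v
	}
	mean := sum / n
	var sq float64
	for _, v := range values {
		sq += (v - mean) * (v - mean)
	}
	std := math.Sqrt(sq / n)
	if std == 0 {
		return nil // all values identical: nothing stands out
	}
	var out []outlier
	for i, v := range values {
		z := (v - mean) / std
		if math.Abs(z) < threshold {
			continue
		}
		o := outlier{Index: i, Value: v, Mean: mean, StdDev: std, ZScore: z}
		if sum != 0 {
			o.PercentOfTotal = 100 * v / sum
		}
		out = append(out, o)
	}
	return out
}

func main() {
	costs := []float64{1, 1, 1, 1, 10} // per-span cost in USD
	for _, o := range findOutliers(costs, 1.5) {
		fmt.Printf("span %d: z=%.2f, %.1f%% of total\n", o.Index, o.ZScore, o.PercentOfTotal)
	}
}
```

For the sample input the mean is 2.8 and the standard deviation is 3.6, so the last span has a z-score of about 2.0 and is the only value flagged.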
type SpanStatus ¶
type SpanStatus string
SpanStatus represents the status of a span.
const (
SpanStatusOK SpanStatus = "ok"
SpanStatusError SpanStatus = "error"
SpanStatusUnset SpanStatus = "unset"
)
type StageCreateRequest ¶
type StageCreateRequest struct {
ChainID string `json:"chain_id"`
AgentID string `json:"agent_id"`
Provider Provider `json:"provider,omitempty"`
MessageID string `json:"message_id,omitempty"`
TaskID string `json:"task_id,omitempty"`
HandoffTo string `json:"handoff_to,omitempty"`
Iteration int `json:"iteration,omitempty"` // Defaults to 1
}
StageCreateRequest contains the data needed to create a new stage in a chain.
type StageUpdateRequest ¶
type StageUpdateRequest struct {
Status *ChainStageStatus `json:"status,omitempty"`
SessionID *string `json:"session_id,omitempty"`
ApprovalStatus *ApprovalStatus `json:"approval_status,omitempty"`
ApprovalType *ApprovalType `json:"approval_type,omitempty"`
HumanFeedback *string `json:"human_feedback,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
}
StageUpdateRequest contains fields that can be updated on a stage.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store provides CRUD operations for the observatory platform.
func OpenDefaultStore ¶
OpenDefaultStore opens the observatory database at the default path, runs migrations, and returns a ready-to-use Store. This is the recommended way to access observatory.db from CLI tools.
func OpenStore ¶
OpenStore opens the observatory database at the given path, runs migrations, and returns a ready-to-use Store.
func (*Store) BackfillSpansWorkspace ¶
func (s *Store) BackfillSpansWorkspace(ctx context.Context, sessionID, workspace string) (int64, error)
BackfillSpansWorkspace updates existing spans that have the given session.id but are missing workspace information. Returns the number of spans updated.
func (*Store) CountChatMessages ¶
func (s *Store) CountChatMessages(ctx context.Context, q ChatMessageQuery) (total int, withTaskID int, err error)
CountChatMessages returns total and linked message counts for health diagnostics.
func (*Store) CreateAgentAssignment ¶
func (s *Store) CreateAgentAssignment(a *AgentAssignment) error
CreateAgentAssignment inserts a new agent assignment.
func (*Store) CreateChain ¶
func (s *Store) CreateChain(ctx context.Context, req *ChainCreateRequest) (*ExecutionChain, error)
CreateChain creates a new execution chain. Returns the chain with its generated ID populated.
func (*Store) CreateMessage ¶
CreateMessage inserts a new message.
func (*Store) CreateMetric ¶
CreateMetric inserts a new metric.
func (*Store) CreateSpan ¶
CreateSpan inserts a new span. Note: This is the low-level insert WITHOUT aggregation updates. For transactional span creation with aggregation updates, use SQLiteBackend.CreateSpan which wraps this in a transaction with UpdateTaskAggregates and UpdateAgentAssignmentAggregates.
func (*Store) CreateSpanEvent ¶
CreateSpanEvent inserts a new span event.
func (*Store) CreateStage ¶
func (s *Store) CreateStage(ctx context.Context, req *StageCreateRequest) (*ChainStage, error)
CreateStage creates a new stage within a chain. Returns the stage with its generated ID populated.
func (*Store) CreateWorkspace ¶
CreateWorkspace inserts a new workspace.
func (*Store) DeleteAgentAssignment ¶
DeleteAgentAssignment removes an assignment by ID.
func (*Store) DeleteChain ¶
DeleteChain removes a chain and all its stages (CASCADE).
func (*Store) DeleteMessage ¶
DeleteMessage removes a message by ID.
func (*Store) DeleteSpan ¶
DeleteSpan removes a span by ID.
func (*Store) DeleteSpanEvent ¶
DeleteSpanEvent removes a span event by ID.
func (*Store) DeleteTask ¶
DeleteTask removes a task by ID.
func (*Store) DeleteWorkspace ¶
DeleteWorkspace removes a workspace by ID.
func (*Store) FindLatestUnfinishedTool ¶
func (s *Store) FindLatestUnfinishedTool(ctx context.Context, sessionID, toolName string) (string, error)
FindLatestUnfinishedTool finds the most recent tool call that hasn't completed yet. Used to correlate PostToolUse with PreToolUse when tool_use_id is not provided.
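The correlation rule can be illustrated in memory: among calls for the same session and tool that have no end time, pick the one that started last. This is a sketch over a slice, with a hypothetical `toolCall` type and helper functions; the store itself presumably runs an equivalent database query.

```go
package main

import (
	"fmt"
	"time"
)

// toolCall is a reduced, hypothetical version of SessionTool.
type toolCall struct {
	ToolUseID string
	SessionID string
	ToolName  string
	StartTime time.Time
	EndTime   *time.Time // nil while the call is still running
}

// at and ptr are small helpers for building sample data.
func at(sec int64) time.Time       { return time.Unix(sec, 0).UTC() }
func ptr(t time.Time) *time.Time { return &t }

// latestUnfinished returns the ID of the most recently started call for the
// session and tool that has no end time, or "" when there is none.
func latestUnfinished(calls []toolCall, sessionID, toolName string) string {
	var best *toolCall
	for i := range calls {
		c := &calls[i]
		if c.SessionID != sessionID || c.ToolName != toolName || c.EndTime != nil {
			continue
		}
		if best == nil || c.StartTime.After(best.StartTime) {
			best = c
		}
	}
	if best == nil {
		return ""
	}
	return best.ToolUseID
}

func main() {
	calls := []toolCall{
		{ToolUseID: "t1", SessionID: "s1", ToolName: "Read", StartTime: at(10)},
		{ToolUseID: "t2", SessionID: "s1", ToolName: "Read", StartTime: at(20), EndTime: ptr(at(25))},
		{ToolUseID: "t3", SessionID: "s1", ToolName: "Read", StartTime: at(15)},
	}
	fmt.Println(latestUnfinished(calls, "s1", "Read"))
}
```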
func (*Store) GetAgentAssignment ¶
func (s *Store) GetAgentAssignment(id string) (*AgentAssignment, error)
GetAgentAssignment retrieves an agent assignment by ID.
func (*Store) GetAgentStats ¶
func (s *Store) GetAgentStats(agentID string) (*AgentStats, error)
GetAgentStats returns aggregated stats for an agent.
func (*Store) GetChain ¶
func (s *Store) GetChain(ctx context.Context, id string, opts ChainReadOptions) (*ExecutionChain, error)
GetChain retrieves a chain by ID with optional related data.
func (*Store) GetChainByGitHubIssue ¶
func (s *Store) GetChainByGitHubIssue(ctx context.Context, repo string, issueNumber int) (*ExecutionChain, error)
GetChainByGitHubIssue finds the chain for a GitHub issue.
func (*Store) GetChainByMessageID ¶
GetChainByMessageID finds the chain containing a given message ID.
func (*Store) GetChainByTaskID ¶
GetChainByTaskID finds the chain containing a given task ID.
func (*Store) GetChainJourney ¶
GetChainJourney computes a narrative journey for a chain from its stages.
func (*Store) GetChainStages ¶
func (s *Store) GetChainStages(ctx context.Context, chainID string, opts ChainReadOptions) ([]*ChainStage, error)
GetChainStages returns all stages for a chain.
func (*Store) GetChainStats ¶
func (s *Store) GetChainStats(ctx context.Context) (*ChainStats, error)
GetChainStats returns aggregate statistics about chains.
func (*Store) GetChainStatsByAgent ¶
func (s *Store) GetChainStatsByAgent(ctx context.Context, createdAfter *time.Time) ([]*AgentStatsResult, error)
GetChainStatsByAgent returns per-agent aggregated stats in a single SQL query. Replaces the N+1 pattern of fetching all chains then querying stages per chain (M-PERF-OBSERVATORY).
func (*Store) GetChainStatusCounts ¶
func (s *Store) GetChainStatusCounts(ctx context.Context, createdAfter *time.Time) (*ChainStatusCounts, error)
GetChainStatusCounts returns chain counts grouped by status in a single query. Replaces the pattern of fetching all chains and counting in Go (M-PERF-OBSERVATORY).
func (*Store) GetChatMessagesBySession ¶
func (s *Store) GetChatMessagesBySession(ctx context.Context, sessionID string, startTime, endTime time.Time) ([]*ChatMessage, error)
GetChatMessagesBySession fetches chat messages for a session, optionally filtered by time range. Pass zero-value times to skip time filtering.
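The "zero-value times skip filtering" convention maps naturally onto time.Time.IsZero. The sketch below applies it to an in-memory slice; the `message` type, the helper names, and the choice of inclusive bounds are assumptions for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical, reduced chat message.
type message struct {
	ID        string
	Timestamp time.Time
}

// unset is the zero time.Time, meaning "no bound on this side".
var unset time.Time

// at builds a sample timestamp from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }

// filterByTime keeps messages inside [start, end]; a zero bound is ignored.
// Assumption: both bounds are inclusive.
func filterByTime(msgs []message, start, end time.Time) []message {
	var out []message
	for _, m := range msgs {
		if !start.IsZero() && m.Timestamp.Before(start) {
			continue
		}
		if !end.IsZero() && m.Timestamp.After(end) {
			continue
		}
		out = append(out, m)
	}
	return out
}

func main() {
	msgs := []message{{"m1", at(100)}, {"m2", at(200)}, {"m3", at(300)}}
	fmt.Println(len(filterByTime(msgs, unset, unset)))   // no filtering
	fmt.Println(len(filterByTime(msgs, at(150), unset))) // lower bound only
}
```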
func (*Store) GetChatMessagesByTaskID ¶
GetChatMessagesByTaskID fetches chat messages linked to a task via task_id. This is the preferred deterministic query (M-DETERMINISTIC-CHAT-LINKING).
func (*Store) GetExecTaskHierarchy ¶
func (s *Store) GetExecTaskHierarchy(limit int) ([]*ExecTaskNode, error)
GetExecTaskHierarchy returns the hierarchy of ailang commands (exec, run, check) reconstructed from span attributes, including turn and tool_use child spans, for complete hierarchy visualization.
func (*Store) GetExecTaskHierarchyWithMessages ¶
func (s *Store) GetExecTaskHierarchyWithMessages(limit int) (*ExecHierarchyWithMessages, error)
GetExecTaskHierarchyWithMessages returns the exec hierarchy grouped by triggering messages. This provides a 4-level view: Messages -> Execs -> Turns -> Tool Uses.
func (*Store) GetMessage ¶
GetMessage retrieves a message by ID.
func (*Store) GetMetricsSummary ¶
func (s *Store) GetMetricsSummary() (*MetricsSummary, error)
GetMetricsSummary returns global metrics.
func (*Store) GetProviderComparison ¶
func (s *Store) GetProviderComparison() ([]*ProviderComparison, error)
GetProviderComparison returns comparison metrics across providers.
func (*Store) GetSession ¶
GetSession returns full session details.
func (*Store) GetSessionMetricsSummary ¶
func (s *Store) GetSessionMetricsSummary(sessionID string) (*SessionMetricsSummary, error)
GetSessionMetricsSummary aggregates all metrics for a session.
func (*Store) GetSessionTools ¶
GetSessionTools returns all tool calls for a session.
func (*Store) GetSessionWorkspace ¶
GetSessionWorkspace returns the workspace for a session. Returns an empty string if the session is not found.
func (*Store) GetSpanEvents ¶
GetSpanEvents retrieves all events for a span.
func (*Store) GetSpanHierarchy ¶
func (s *Store) GetSpanHierarchy(limit int) (*SpanHierarchyResult, error)
GetSpanHierarchy returns a hierarchical tree of spans using parent_span_id relationships. Unlike GetExecTaskHierarchy which requires custom attributes, this works with standard OTEL parenting. Returns roots (coordinator/executor spans), session groups, and stats.
func (*Store) GetSpanLitesByStageID ¶
func (s *Store) GetSpanLitesByStageID(ctx context.Context, stageID string, limit, offset int) (*SpanLitePage, error)
GetSpanLitesByStageID returns lightweight spans for a stage without the heavy attributes columns. This avoids reading the 3.9GB attributes data when only metadata is needed (M-PERF-OBSERVATORY).
func (*Store) GetSpansByStageID ¶
GetSpansByStageID returns spans linked to a stage via stage_id column.
func (*Store) GetStageEvalAssessment ¶
func (s *Store) GetStageEvalAssessment(ctx context.Context, stageID string) (*EvalAssessment, error)
GetStageEvalAssessment reads eval assessment data from a chain stage. Returns nil, nil if the stage exists but has no eval assessment.
func (*Store) GetTaskSpanSummary ¶
GetTaskSpanSummary returns aggregated span statistics for a task_id. Works for ALL task_id formats (coordinator, eval, UUID sessions).
func (*Store) GetTaskTimeline ¶
func (s *Store) GetTaskTimeline(taskID string) ([]*TaskTimeline, error)
GetTaskTimeline returns timeline data for a task.
func (*Store) GetToolForSpan ¶
func (s *Store) GetToolForSpan(ctx context.Context, sessionID, toolName string, spanTime time.Time) (*SessionTool, error)
GetToolForSpan finds the session_tool that best matches a span by timestamp + tool name. Uses a ±10 second window to handle clock drift and hook execution delay. Returns nil if no matching tool is found.
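The ±10 second matching window can be sketched over an in-memory slice. Interpreting "best matches" as "smallest absolute time difference" is an assumption, as are the `tool` type and the helper names; session filtering is omitted to keep the example short.

```go
package main

import (
	"fmt"
	"time"
)

// tool is a reduced, hypothetical version of SessionTool.
type tool struct {
	ToolUseID string
	ToolName  string
	StartTime time.Time
}

// matchWindow mirrors the documented ±10 second tolerance.
const matchWindow = 10 * time.Second

// at builds a sample timestamp from Unix seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }

// bestMatch returns the tool with the given name whose start time is closest
// to spanTime, provided it lies within matchWindow; otherwise it returns nil.
func bestMatch(tools []tool, toolName string, spanTime time.Time) *tool {
	var best *tool
	var bestDelta time.Duration
	for i := range tools {
		t := &tools[i]
		if t.ToolName != toolName {
			continue
		}
		delta := t.StartTime.Sub(spanTime)
		if delta < 0 {
			delta = -delta
		}
		if delta > matchWindow {
			continue
		}
		if best == nil || delta < bestDelta {
			best, bestDelta = t, delta
		}
	}
	return best
}

func main() {
	tools := []tool{
		{ToolUseID: "t1", ToolName: "Read", StartTime: at(100)},
		{ToolUseID: "t2", ToolName: "Read", StartTime: at(108)},
		{ToolUseID: "t3", ToolName: "Bash", StartTime: at(105)},
	}
	if m := bestMatch(tools, "Read", at(105)); m != nil {
		fmt.Println(m.ToolUseID)
	}
}
```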
func (*Store) GetToolsByTimestampRange ¶
func (s *Store) GetToolsByTimestampRange(ctx context.Context, start, end time.Time, toolName string) ([]SessionTool, error)
GetToolsByTimestampRange returns tools that started within a time range. Used for cross-system correlation when session IDs don't match.
func (*Store) GetToolsForSessions ¶
func (s *Store) GetToolsForSessions(ctx context.Context, sessionIDs []string) (map[string][]SessionTool, error)
GetToolsForSessions returns tools for multiple session IDs, grouped by session. This is used for enriching spans with tool metadata.
func (*Store) GetWorkspace ¶
GetWorkspace retrieves a workspace by ID.
func (*Store) GetWorkspaceStats ¶
func (s *Store) GetWorkspaceStats(workspaceID string) (*WorkspaceStats, error)
GetWorkspaceStats returns aggregated stats for a workspace.
func (*Store) IncrementSessionTurns ¶
IncrementSessionTurns increments the turn count for a session.
func (*Store) InsertToolStart ¶
func (s *Store) InsertToolStart(ctx context.Context, sessionID, toolUseID, toolName, toolInput string) error
InsertToolStart records the start of a tool call.
func (*Store) LinkOrphanedSpansBySession ¶
LinkOrphanedSpansBySession updates spans that have a matching session.id but no task_id. This fixes the race condition where Claude Code internal events arrive via OTLP before the claude.execute span is batch-flushed by the OTEL SDK. Called after storing a claude.execute span to retroactively link orphaned child spans.
func (*Store) LinkSpanToChain ¶
LinkSpanToChain updates a span's chain_id and stage_id. Called during OTEL ingest when AILANG_CHAIN_ID/AILANG_STAGE_ID env vars are present.
func (*Store) ListAgentAssignments ¶
func (s *Store) ListAgentAssignments(taskID string) ([]*AgentAssignment, error)
ListAgentAssignments returns assignments for a task.
func (*Store) ListChains ¶
func (s *Store) ListChains(ctx context.Context, opts ChainListOptions) ([]*ChainSummary, error)
ListChains returns chains matching the given options.
func (*Store) ListEvalChains ¶
ListEvalChains returns chains with source_type = "eval_suite".
func (*Store) ListMessages ¶
func (s *Store) ListMessages(opts MessageListOptions) ([]*Message, error)
ListMessages returns messages with optional filtering.
func (*Store) ListMetrics ¶
func (s *Store) ListMetrics(opts MetricListOptions) ([]*Metric, error)
ListMetrics retrieves metrics with optional filtering.
func (*Store) ListPendingApprovals ¶
func (s *Store) ListPendingApprovals(ctx context.Context, limit int) ([]*PendingApprovalInfo, error)
ListPendingApprovals returns all stages awaiting approval.
func (*Store) ListRecentSessions ¶
ListRecentSessions returns recent sessions ordered by start time.
func (*Store) ListSpans ¶
func (s *Store) ListSpans(opts SpanListOptions) ([]*Span, error)
ListSpans returns spans with optional filtering.
func (*Store) ListSpansByTaskIDs ¶
ListSpansByTaskIDs fetches spans for multiple task IDs in a single query. Returns a map of task_id -> []*Span. Much more efficient than calling ListSpans once per task (single query vs N queries).
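The single-query approach usually comes down to an IN clause with one placeholder per ID, followed by grouping the rows in Go. The sketch below shows both halves without touching a database; the table name, column names, and `spanRow` type are assumptions for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// spanRow is a hypothetical, minimal row as it might be scanned from the query.
type spanRow struct {
	ID     string
	TaskID string
}

// spansByTaskQuery builds one parameterized query for all task IDs.
// It returns an empty query for an empty input so the caller can skip the
// round trip. Table and column names are assumptions.
func spansByTaskQuery(taskIDs []string) (string, []any) {
	if len(taskIDs) == 0 {
		return "", nil
	}
	placeholders := make([]string, len(taskIDs))
	args := make([]any, len(taskIDs))
	for i, id := range taskIDs {
		placeholders[i] = "?"
		args[i] = id
	}
	q := "SELECT id, task_id FROM spans WHERE task_id IN (" + strings.Join(placeholders, ",") + ")"
	return q, args
}

// groupByTask turns the flat result into a task_id -> rows map.
func groupByTask(rows []spanRow) map[string][]spanRow {
	out := make(map[string][]spanRow)
	for _, r := range rows {
		out[r.TaskID] = append(out[r.TaskID], r)
	}
	return out
}

func main() {
	q, args := spansByTaskQuery([]string{"task-a", "task-b"})
	fmt.Println(q, len(args))

	grouped := groupByTask([]spanRow{{"s1", "task-a"}, {"s2", "task-b"}, {"s3", "task-a"}})
	fmt.Println(len(grouped["task-a"]), len(grouped["task-b"]))
}
```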
func (*Store) ListSpansLightweight ¶
func (s *Store) ListSpansLightweight(opts SpanListOptions) ([]*Span, error)
ListSpansLightweight returns spans without the heavy attributes/resource_attributes columns. Instead of reading the full JSON blobs (3.9GB total), it extracts only the specific attribute fields needed for enrichment: tool_name and session.id. This is 10-50x faster than ListSpans for large datasets. (M-AUDIT-OBSERVATORY)
func (*Store) ListTasks ¶
func (s *Store) ListTasks(opts TaskListOptions) ([]*Task, error)
ListTasks returns tasks with optional filtering.
func (*Store) ListTraces ¶
func (s *Store) ListTraces(opts TraceQuery) ([]*TraceSummary, error)
ListTraces returns trace summaries. Uses the trace_summaries materialized table (M-PERF-OBSERVATORY v12 migration) instead of correlated subqueries on the spans table.
func (*Store) ListWorkspaces ¶
ListWorkspaces returns all workspaces.
func (*Store) LookupTaskBySessionID ¶
LookupTaskBySessionID finds task hierarchy info for a given session ID. This enables correlating Claude Code internal events to their parent executor span. Returns taskID, agentAssignmentID, traceID (for trace linking), or empty strings if not found.
func (*Store) MarkMessageArchived ¶
MarkMessageArchived marks a message as archived.
func (*Store) MarkMessageRead ¶
MarkMessageRead marks a message as read.
func (*Store) QueryEvalResults ¶
func (s *Store) QueryEvalResults(ctx context.Context, opts EvalQueryOptions) ([]*ChainStage, error)
QueryEvalResults returns chain stages that have eval assessments, with optional filters. Uses json_extract() for filtering on assessment fields.
func (*Store) RecalculateAgentAssignmentAggregates ¶
RecalculateAgentAssignmentAggregates recalculates all aggregate metrics for an agent assignment. Use this for backfill operations or to fix inconsistent aggregates.
func (*Store) RecalculateTaskAggregates ¶
RecalculateTaskAggregates recalculates all aggregate metrics for a task from its spans. Use this for backfill operations or to fix inconsistent aggregates.
func (*Store) RunRetention ¶
func (s *Store) RunRetention(ctx context.Context) (RetentionStats, error)
RunRetention deletes rows older than their TTL and checkpoints the WAL.
TTLs:
- spans, trace_summaries, metrics: 7 days
- chat_messages, session_tools: 30 days
Time columns are stored as TEXT (ISO-8601 via the go-sqlite3 driver), so comparisons go through SQLite's datetime() function on both sides. Comparing a TEXT column directly against an int64 Unix/UnixNano cutoff silently matches zero rows — SQLite's storage-class ordering always ranks INTEGER < TEXT.
Deletions are chunked to avoid WAL bloat when another connection holds back the checkpoint. VACUUM is not attempted automatically: it requires an exclusive lock that the running server process blocks, and the failure modes are ugly (temp file growth, SQLITE_BUSY loops). VACUUM should be run manually during a maintenance window when 'ailang serve' is stopped.
Called periodically by the coordinator daemon and on startup if the DB is oversized.
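The chunked-deletion loop and TTL cutoff described above might look roughly like the following. This is a sketch under stated assumptions: the chunk callback stands in for a real DELETE statement, the SQL in the comment uses assumed column names, and the RFC 3339 cutoff format is illustrative only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// spanTTL mirrors the documented 7-day TTL for spans.
const spanTTL = 7 * 24 * time.Hour

// sampleNow is a fixed clock value used for the demonstration.
var sampleNow = time.Date(2024, 1, 8, 0, 0, 0, 0, time.UTC)

// cutoff returns the timestamp before which rows are considered expired.
// It would be bound as a TEXT parameter and compared with datetime() on both
// sides, for example (column names are assumptions):
//
//	DELETE FROM spans WHERE rowid IN (
//	  SELECT rowid FROM spans WHERE datetime(start_time) < datetime(?) LIMIT ?)
func cutoff(now time.Time, ttl time.Duration) string {
	return now.Add(-ttl).UTC().Format(time.RFC3339)
}

// deleteInChunks calls deleteChunk until it removes fewer rows than chunkSize,
// so no single transaction grows the WAL by more than one chunk.
func deleteInChunks(chunkSize int64, deleteChunk func(limit int64) (int64, error)) (int64, error) {
	if chunkSize <= 0 {
		return 0, errors.New("chunk size must be positive")
	}
	var total int64
	for {
		n, err := deleteChunk(chunkSize)
		total += n
		if err != nil {
			return total, err
		}
		if n < chunkSize {
			return total, nil
		}
	}
}

func main() {
	fmt.Println(cutoff(sampleNow, spanTTL))

	remaining := int64(2500) // simulated number of expired rows
	total, err := deleteInChunks(1000, func(limit int64) (int64, error) {
		n := limit
		if remaining < n {
			n = remaining
		}
		remaining -= n
		return n, nil
	})
	fmt.Println(total, err)
}
```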
func (*Store) UpdateAgentAssignment ¶
func (s *Store) UpdateAgentAssignment(a *AgentAssignment) error
UpdateAgentAssignment updates an existing assignment.
func (*Store) UpdateChainMetrics ¶
func (s *Store) UpdateChainMetrics(ctx context.Context, id string, cost float64, tokens, turns int) error
UpdateChainMetrics updates the denormalized metrics on a chain.
func (*Store) UpdateChainStatus ¶
UpdateChainStatus updates the status of a chain.
func (*Store) UpdateMessage ¶
UpdateMessage updates an existing message.
func (*Store) UpdateSessionEnded ¶
UpdateSessionEnded marks a session as ended.
func (*Store) UpdateSpan ¶
UpdateSpan updates an existing span.
func (*Store) UpdateSpanLinks ¶
UpdateSpanLinks updates a span's task and agent assignment links. Used for session correlation to link orphaned spans to their parent task.
func (*Store) UpdateStageApproval ¶
func (s *Store) UpdateStageApproval(ctx context.Context, stageID string, status ApprovalStatus, approvalType ApprovalType, feedback string) error
UpdateStageApproval updates the approval status of a stage.
func (*Store) UpdateStageError ¶
UpdateStageError records an error on a stage.
func (*Store) UpdateStageEvalAssessment ¶
func (s *Store) UpdateStageEvalAssessment(ctx context.Context, stageID string, assessment *EvalAssessment) error
UpdateStageEvalAssessment stores eval assessment data as JSON on a chain stage.
func (*Store) UpdateStageMetrics ¶
func (s *Store) UpdateStageMetrics(ctx context.Context, stageID string, cost float64, tokensIn, tokensOut, turns, toolCalls int, durationMs int64) error
UpdateStageMetrics updates the denormalized metrics on a stage.
func (*Store) UpdateStageSession ¶
UpdateStageSession links a session to a stage.
func (*Store) UpdateStageStatus ¶
UpdateStageStatus updates a stage's execution status.
func (*Store) UpdateTask ¶
UpdateTask updates an existing task.
func (*Store) UpdateToolEnd ¶
func (s *Store) UpdateToolEnd(ctx context.Context, toolUseID, toolResponse string, success bool) error
UpdateToolEnd records the completion of a tool call.
func (*Store) UpdateWorkspace ¶
UpdateWorkspace updates an existing workspace.
func (*Store) UpsertSession ¶
func (s *Store) UpsertSession(ctx context.Context, sessionID, workspace, version, source string) error
UpsertSession inserts or updates a session record. If the session exists, updates workspace and claude_version (for reconnection scenarios).
func (*Store) UpsertSessionWithCorrelation ¶
func (s *Store) UpsertSessionWithCorrelation(ctx context.Context, sessionID, workspace, version, source string, corr *SessionCorrelation) error
UpsertSessionWithCorrelation inserts or updates a session with optional correlation IDs. Correlation IDs enable deterministic linking from Claude Code hooks via AILANG_* env vars.
type Subscription ¶
type Subscription struct {
// Filter events by workspace
WorkspaceID string `json:"workspace_id,omitempty"`
// Filter events by task
TaskID string `json:"task_id,omitempty"`
// Filter events by type (empty = all types)
EventTypes []WSEventType `json:"event_types,omitempty"`
}
Subscription represents a client's subscription preferences.
type Task ¶
type Task struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
ParentTaskID string `json:"parent_task_id,omitempty"` // Links to parent task for handoff chains
Title string `json:"title"`
Description string `json:"description,omitempty"`
SourceType TaskSourceType `json:"source_type"`
SourceRef string `json:"source_ref,omitempty"`
Status TaskStatus `json:"status"`
Priority string `json:"priority"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
// Aggregated metrics
TotalDurationMs int64 `json:"total_duration_ms"`
TotalTokensIn int64 `json:"total_tokens_in"`
TotalTokensOut int64 `json:"total_tokens_out"`
TotalCostUSD float64 `json:"total_cost_usd"`
AgentCount int `json:"agent_count"`
SpanCount int `json:"span_count"`
ErrorCount int `json:"error_count"`
}
Task represents a unit of work (from GitHub, messages, email, etc.).
type TaskAgentLookup ¶
type TaskAgentLookup func(ctx context.Context, taskID string) (agentID, inbox, title string, err error)
TaskAgentLookup is a callback that resolves a coordinator task_id to agent info. It returns agentID (used for FromAgent), inbox (used for ToInbox), and title. If the task_id is not found, it returns empty strings (not an error).
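A callback with this signature can be backed by anything that maps task IDs to agent info. The sketch below uses an in-memory map; `taskAgentLookup` restates the documented function type locally, while `taskInfo`, `mapLookup`, and `resolve` are hypothetical helpers introduced for the example.

```go
package main

import (
	"context"
	"fmt"
)

// taskAgentLookup restates the documented callback signature locally.
type taskAgentLookup func(ctx context.Context, taskID string) (agentID, inbox, title string, err error)

// taskInfo is a hypothetical record holding the three resolved values.
type taskInfo struct {
	AgentID string
	Inbox   string
	Title   string
}

// mapLookup returns a lookup backed by an in-memory map.
// An unknown task ID yields empty strings and a nil error, as documented.
func mapLookup(tasks map[string]taskInfo) taskAgentLookup {
	return func(ctx context.Context, taskID string) (string, string, string, error) {
		if err := ctx.Err(); err != nil {
			return "", "", "", err // honor cancellation before doing any work
		}
		info, ok := tasks[taskID]
		if !ok {
			return "", "", "", nil
		}
		return info.AgentID, info.Inbox, info.Title, nil
	}
}

// resolve invokes a lookup with a background context.
func resolve(l taskAgentLookup, taskID string) (agentID, inbox, title string, err error) {
	return l(context.Background(), taskID)
}

func main() {
	lookup := mapLookup(map[string]taskInfo{
		"task-1": {AgentID: "planner", Inbox: "planner-inbox", Title: "Plan release"},
	})
	agentID, inbox, title, err := resolve(lookup, "task-1")
	fmt.Println(agentID, inbox, title, err)
}
```

Callers therefore need to check for an empty agentID rather than rely on the error alone to detect a missing task.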
type TaskHierarchy ¶
type TaskHierarchy struct {
Task *Task `json:"task"`
Agents []*AgentHierarchy `json:"agents"`
}
TaskHierarchy represents a task with its full agent and span hierarchy.
func GetTaskHierarchy ¶
func GetTaskHierarchy(ctx context.Context, backend Backend, taskID string, opts HierarchyOptions) (*TaskHierarchy, error)
GetTaskHierarchy builds the full hierarchy for a task.
type TaskListOptions ¶
type TaskListOptions struct {
WorkspaceID string
ParentTaskID string // Filter by parent_task_id (for handoff chains)
Status TaskStatus
SourceType TaskSourceType
Limit int
Offset int
}
TaskListOptions configures task listing.
type TaskMetricStats ¶
type TaskMetricStats struct {
Metric string `json:"metric"`
Count int `json:"count"` // Number of spans with non-zero values
Sum float64 `json:"sum"` // Total value across all spans
Mean float64 `json:"mean"` // Average value
StdDev float64 `json:"std_dev"` // Standard deviation
Min float64 `json:"min"`
Max float64 `json:"max"`
}
TaskMetricStats holds statistical summary for a single metric within a task.
type TaskSourceType ¶
type TaskSourceType string
TaskSourceType represents the origin of a task.
const (
TaskSourceGitHub TaskSourceType = "github_issue"
TaskSourceMessage TaskSourceType = "message"
TaskSourceEmail TaskSourceType = "email"
TaskSourceManual TaskSourceType = "manual"
)
type TaskSpanSummary ¶
type TaskSpanSummary struct {
TaskID string `json:"task_id"`
SpanCount int `json:"span_count"`
TraceCount int `json:"trace_count"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
CostUSD float64 `json:"cost_usd"`
DurationMs int64 `json:"duration_ms"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
// Top span names by count (e.g., "api_request: 487, claude_code.tool.Read: 338")
TopSpanNames []SpanNameCount `json:"top_span_names"`
}
TaskSpanSummary holds aggregated span statistics for a task_id. Used by "ailang chains find --task-id" when no execution_chain exists (e.g., Claude Code user sessions that aren't coordinator-managed).
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the status of a task.
const (
TaskStatusPending TaskStatus = "pending"
TaskStatusRunning TaskStatus = "running"
TaskStatusCompleted TaskStatus = "completed"
TaskStatusFailed TaskStatus = "failed"
)
type TaskTimeline ¶
type TaskTimeline struct {
TaskID string `json:"task_id"`
Title string `json:"title"`
Status TaskStatus `json:"status"`
SpanID string `json:"span_id,omitempty"`
SpanName string `json:"span_name,omitempty"`
StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"`
DurationMs int64 `json:"duration_ms,omitempty"`
SpanStatus SpanStatus `json:"span_status,omitempty"`
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
Provider Provider `json:"provider,omitempty"`
}
TaskTimeline represents a task with its span timeline.
type Trace ¶
type Trace struct {
TraceID string `json:"trace_id"`
RootSpan *Span `json:"root_span,omitempty"`
Spans []*Span `json:"spans"`
SpanCount int `json:"span_count"`
DurationMs int64 `json:"duration_ms"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
}
Trace represents a complete trace with its span tree.
type TraceHierarchy ¶
type TraceHierarchy struct {
TraceID string `json:"trace_id"`
RootSpan *SpanNode `json:"root_span,omitempty"`
Spans []*SpanNode `json:"spans"` // Flat list for easy rendering
Summary *HierarchyTraceSummary `json:"summary"`
}
TraceHierarchy represents a trace with its span tree.
type TraceQuery ¶
type TraceQuery struct {
TaskID string `json:"task_id,omitempty"`
TraceID string `json:"trace_id,omitempty"`
TimeRange *TimeRange `json:"time_range,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
TraceQuery represents query parameters for listing traces.
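A minimal sketch of how the paging fields might be used. The `traceQuery` type below is a local mirror of the documented fields (without `TimeRange`, whose definition is not shown in this section), and `nextPage` and `encode` are hypothetical helpers, not part of the package. It also shows that the `omitempty` tags drop zero-valued fields from the JSON form.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// traceQuery mirrors the documented TraceQuery fields, minus TimeRange.
type traceQuery struct {
	TaskID  string `json:"task_id,omitempty"`
	TraceID string `json:"trace_id,omitempty"`
	Limit   int    `json:"limit,omitempty"`
	Offset  int    `json:"offset,omitempty"`
}

// nextPage is a hypothetical helper that advances the offset by one page.
func nextPage(q traceQuery) traceQuery {
	q.Offset += q.Limit
	return q
}

// encode returns the JSON form of q, or an empty string on error.
func encode(q traceQuery) string {
	b, err := json.Marshal(q)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	q := traceQuery{TaskID: "task-123", Limit: 50}
	fmt.Println(encode(q))           // offset is zero, so it is omitted
	fmt.Println(encode(nextPage(q))) // offset is now 50
}
```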
type TraceSource ¶
type TraceSource string
TraceSource indicates where the trace data originated from.
const (
TraceSourceLocal TraceSource = "local" // Local OTLP receiver
TraceSourceGCP TraceSource = "gcp" // Google Cloud Trace
TraceSourceJaeger TraceSource = "jaeger" // Jaeger backend
)
type TraceSummary ¶
type TraceSummary struct {
TraceID string `json:"trace_id"`
RootSpan string `json:"root_span"`
SpanCount int `json:"span_count"`
DurationMs int64 `json:"duration_ms"`
StartTime time.Time `json:"start_time"`
Status SpanStatus `json:"status"`
TaskID string `json:"task_id,omitempty"`
ServiceName string `json:"service_name,omitempty"` // e.g., "ailang-run", "ailang-eval", "claude-code"
Source TraceSource `json:"source,omitempty"` // Where trace came from: local, gcp, jaeger
}
TraceSummary represents a summary of a trace for list views.
type TurnGroup ¶
type TurnGroup struct {
TurnNumber int `json:"turn_number"`
SpanID string `json:"span_id"`
DurationMs int64 `json:"duration_ms"`
Cost float64 `json:"cost"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
Tools []*TurnTool `json:"tools,omitempty"`
}
TurnGroup represents a single conversation turn with its tools.
type TurnGroupSession ¶
type TurnGroupSession struct {
ID string `json:"id"`
Name string `json:"name"`
DurationMs int64 `json:"duration_ms"`
Cost float64 `json:"cost"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
Provider string `json:"provider,omitempty"`
Model string `json:"model,omitempty"`
}
TurnGroupSession represents the top-level session/executor span.
type TurnGroupStats ¶
type TurnGroupStats struct {
TotalTurns int `json:"total_turns"`
TotalTools int `json:"total_tools"`
TotalCost float64 `json:"total_cost"`
TotalTokens int64 `json:"total_tokens"`
DurationMs int64 `json:"duration_ms"`
}
TurnGroupStats contains aggregate statistics for the turn-grouped view.
type TurnGroupedHierarchy ¶
type TurnGroupedHierarchy struct {
Session *TurnGroupSession `json:"session,omitempty"`
Turns []*TurnGroup `json:"turns"`
Stats *TurnGroupStats `json:"stats"`
}
TurnGroupedHierarchy represents spans organized by conversation turns.
func GroupSpansByTurn ¶
func GroupSpansByTurn(spans []*SpanNode) *TurnGroupedHierarchy
GroupSpansByTurn transforms a span tree into a turn-based hierarchy. The result lists each conversation turn with the tool calls made during it, which is easier to read than the raw span tree.
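The package's own aggregation is not shown in this section, so the following is only a sketch of how the documented `TurnGroup` fields could roll up into `TurnGroupStats`. The lowercase types are local mirrors trimmed to the fields used, and `summarize` is a hypothetical helper. It assumes that `TotalTokens` is input plus output tokens and that `DurationMs` is the sum of per-turn durations; the real implementation may differ on both points.

```go
package main

import "fmt"

// Local mirrors of the documented types, trimmed to the fields used here.
type turnTool struct {
	Name string
}

type turnGroup struct {
	TurnNumber int
	DurationMs int64
	Cost       float64
	TokensIn   int64
	TokensOut  int64
	Tools      []*turnTool
}

type turnGroupStats struct {
	TotalTurns  int
	TotalTools  int
	TotalCost   float64
	TotalTokens int64
	DurationMs  int64
}

// summarize is a hypothetical helper. Assumptions: TotalTokens counts input
// plus output tokens, and DurationMs is the sum of per-turn durations.
func summarize(turns []*turnGroup) turnGroupStats {
	var s turnGroupStats
	for _, t := range turns {
		if t == nil {
			continue // skip missing entries rather than panic
		}
		s.TotalTurns++
		s.TotalTools += len(t.Tools)
		s.TotalCost += t.Cost
		s.TotalTokens += t.TokensIn + t.TokensOut
		s.DurationMs += t.DurationMs
	}
	return s
}

func main() {
	turns := []*turnGroup{
		{TurnNumber: 1, DurationMs: 1200, Cost: 0.25, TokensIn: 100, TokensOut: 40,
			Tools: []*turnTool{{Name: "Read"}, {Name: "Bash"}}},
		{TurnNumber: 2, DurationMs: 800, Cost: 0.5, TokensIn: 60, TokensOut: 20},
	}
	fmt.Printf("%+v\n", summarize(turns))
}
```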
type TurnTool ¶
type TurnTool struct {
ID string `json:"id"`
Name string `json:"name"`
ToolName string `json:"tool_name,omitempty"` // Extracted tool name (e.g., "Read", "Bash")
DurationMs int64 `json:"duration_ms"`
Cost float64 `json:"cost,omitempty"`
Status string `json:"status"`
}
TurnTool represents a tool call within a turn.
type ValueUnion ¶
type ValueUnion struct {
StringValue *string `json:"stringValue,omitempty"`
IntValue *int64 `json:"intValue,omitempty"`
DoubleValue *float64 `json:"doubleValue,omitempty"`
BoolValue *bool `json:"boolValue,omitempty"`
}
ValueUnion represents an OTEL attribute value.
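Because every field is an optional pointer, a reader has to check which one is set. The sketch below uses a local mirror of the struct; `value` and `decode` are hypothetical helpers that return whichever field is present. The field priority (string, int, double, bool) is an assumption for illustration only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// valueUnion is a local mirror of the documented ValueUnion type.
type valueUnion struct {
	StringValue *string  `json:"stringValue,omitempty"`
	IntValue    *int64   `json:"intValue,omitempty"`
	DoubleValue *float64 `json:"doubleValue,omitempty"`
	BoolValue   *bool    `json:"boolValue,omitempty"`
}

// value returns the first field that is set, or nil if none is.
func (v valueUnion) value() any {
	switch {
	case v.StringValue != nil:
		return *v.StringValue
	case v.IntValue != nil:
		return *v.IntValue
	case v.DoubleValue != nil:
		return *v.DoubleValue
	case v.BoolValue != nil:
		return *v.BoolValue
	}
	return nil
}

// decode parses one JSON-encoded attribute value and unwraps it.
func decode(raw string) (any, error) {
	var v valueUnion
	if err := json.Unmarshal([]byte(raw), &v); err != nil {
		return nil, err
	}
	return v.value(), nil
}

func main() {
	for _, raw := range []string{`{"stringValue":"Read"}`, `{"intValue":42}`, `{"boolValue":false}`} {
		val, err := decode(raw)
		fmt.Println(val, err)
	}
}
```

Pointers let the sketch tell an explicit `false` or `0` apart from an absent field. Exporters that follow the protobuf JSON mapping may send `intValue` as a quoted string, which this plain mirror would reject; how the package handles that case is not shown here.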
type WSEventType ¶
type WSEventType string
WSEventType represents the type of real-time WebSocket event.
const (
EventTypeSpanCreated WSEventType = "span.created"
EventTypeSpanUpdated WSEventType = "span.updated"
EventTypeTaskCreated WSEventType = "task.created"
EventTypeTaskUpdated WSEventType = "task.updated"
EventTypeTaskCompleted WSEventType = "task.completed"
EventTypeMessageCreated WSEventType = "message.created"
EventTypeMessageRead WSEventType = "message.read"
EventTypeAgentAssigned WSEventType = "agent.assigned"
EventTypeAgentCompleted WSEventType = "agent.completed"
EventTypeApprovalRequested WSEventType = "approval.requested"
EventTypeApprovalDecision WSEventType = "approval.decision"
EventTypeMetricsUpdated WSEventType = "metrics.updated"
)
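The event type strings follow a "resource.action" shape, so a client could route events by the part before the dot. This is a sketch only: `wsEventType` mirrors the documented string type with two of its constants, and `category` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// wsEventType is a local mirror of the documented WSEventType.
type wsEventType string

const (
	eventTypeSpanCreated   wsEventType = "span.created"
	eventTypeTaskCompleted wsEventType = "task.completed"
)

// category returns the text before the first dot, or the whole string
// if there is no dot.
func category(t wsEventType) string {
	prefix, _, _ := strings.Cut(string(t), ".")
	return prefix
}

func main() {
	for _, t := range []wsEventType{eventTypeSpanCreated, eventTypeTaskCompleted} {
		switch category(t) {
		case "span":
			fmt.Println("refresh span view for", t)
		case "task":
			fmt.Println("refresh task view for", t)
		default:
			fmt.Println("ignore", t)
		}
	}
}
```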
type Workspace ¶
type Workspace struct {
ID string `json:"id"`
Name string `json:"name"`
Path string `json:"path"`
GitRemote string `json:"git_remote,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Workspace represents a git repository or project root.
type WorkspaceMapping ¶
type WorkspaceMapping interface {
// BuildWorkspaceMappingSQL returns a SQL CASE statement mapping cwdColumn values to workspace IDs.
BuildWorkspaceMappingSQL(cwdColumn string) string
// GetWorkspaceLabel returns a human-friendly label for a workspace ID.
GetWorkspaceLabel(workspaceID string) string
// GetPathPatternsForWorkspace returns SQL LIKE patterns that match a workspace ID.
// Used for reverse mapping (workspace ID → path patterns) when filtering spans.
GetPathPatternsForWorkspace(workspaceID string) []string
}
WorkspaceMapping provides workspace path-to-ID and ID-to-label mappings. This interface allows the observatory package to use workspace config without importing coordinator.
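To make the contract concrete, here is a sketch of one possible implementation backed by static maps. Everything below is illustrative: `workspaceMapping` is a local mirror of the interface, and `staticMapping`, `quote`, the prefix-based `LIKE` matching, and the `'unknown'` fallback are assumptions rather than the package's actual behavior.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// workspaceMapping is a local mirror of the documented interface.
type workspaceMapping interface {
	BuildWorkspaceMappingSQL(cwdColumn string) string
	GetWorkspaceLabel(workspaceID string) string
	GetPathPatternsForWorkspace(workspaceID string) []string
}

// staticMapping is a hypothetical implementation keyed by workspace ID.
type staticMapping struct {
	prefixes map[string]string // workspace ID -> path prefix
	labels   map[string]string // workspace ID -> display label
}

// Compile-time check that staticMapping satisfies the interface.
var _ workspaceMapping = staticMapping{}

// quote renders s as a SQL string literal, doubling single quotes.
func quote(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

// BuildWorkspaceMappingSQL emits one WHEN branch per workspace, in sorted
// ID order so the output is deterministic. cwdColumn must be a trusted
// identifier: it is interpolated as-is.
func (m staticMapping) BuildWorkspaceMappingSQL(cwdColumn string) string {
	ids := make([]string, 0, len(m.prefixes))
	for id := range m.prefixes {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	var b strings.Builder
	b.WriteString("CASE")
	for _, id := range ids {
		fmt.Fprintf(&b, " WHEN %s LIKE %s THEN %s", cwdColumn, quote(m.prefixes[id]+"%"), quote(id))
	}
	b.WriteString(" ELSE 'unknown' END")
	return b.String()
}

// GetWorkspaceLabel falls back to the ID when no label is configured.
func (m staticMapping) GetWorkspaceLabel(workspaceID string) string {
	if label, ok := m.labels[workspaceID]; ok {
		return label
	}
	return workspaceID
}

// GetPathPatternsForWorkspace returns the LIKE pattern for a known ID.
func (m staticMapping) GetPathPatternsForWorkspace(workspaceID string) []string {
	prefix, ok := m.prefixes[workspaceID]
	if !ok {
		return nil
	}
	return []string{prefix + "%"}
}

func main() {
	m := staticMapping{
		prefixes: map[string]string{"ailang": "/src/ailang"},
		labels:   map[string]string{"ailang": "AILANG"},
	}
	fmt.Println(m.BuildWorkspaceMappingSQL("cwd"))
	fmt.Println(m.GetWorkspaceLabel("ailang"), m.GetPathPatternsForWorkspace("ailang"))
}
```

The sketch does not escape `%` or `_` inside path prefixes, so a path containing those characters would match more broadly than intended.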
type WorkspaceStats ¶
type WorkspaceStats struct {
ID string `json:"id"`
Name string `json:"name"`
Path string `json:"path"`
TaskCount int `json:"task_count"`
TotalCost float64 `json:"total_cost"`
TotalTokens int64 `json:"total_tokens"`
SuccessRate float64 `json:"success_rate"`
UniqueAgents int `json:"unique_agents"`
LastActivity time.Time `json:"last_activity,omitempty"`
}
WorkspaceStats contains aggregated metrics for a workspace.
Source Files
¶
- aggregations.go
- api.go
- api_agents.go
- api_analytics.go
- api_enrichment.go
- api_ingest.go
- api_messages.go
- api_sessions.go
- api_spans.go
- api_tasks.go
- api_traces.go
- api_workspaces.go
- backend.go
- backend_composite.go
- backend_controlplane_breakdowns.go
- backend_controlplane_events.go
- backend_controlplane_filters.go
- backend_controlplane_hierarchy.go
- backend_controlplane_metrics.go
- backend_controlplane_types.go
- backend_controlplane_workspace.go
- backend_gcp.go
- backend_jaeger.go
- backend_sqlite.go
- health.go
- hierarchy.go
- hierarchy_correlation.go
- hierarchy_turns.go
- migrate.go
- migrate_v8.go
- models.go
- models_chains.go
- normalize.go
- otlp_receiver.go
- otlp_receiver_helpers.go
- otlp_receiver_metrics.go
- outliers.go
- pricing.go
- retention.go
- seed.go
- store.go
- store_agents.go
- store_aggregates.go
- store_aggregates_hierarchy.go
- store_chains.go
- store_chains_eval.go
- store_chains_query.go
- store_chat.go
- store_messages.go
- store_metrics.go
- store_sessions.go
- store_span_events.go
- store_spans.go
- store_spans_traces.go
- store_tasks.go
- websocket.go