Documentation
¶
Overview ¶
Package server provides the HTTP server for the Collaboration Hub. ailang_bridge.go provides AILANG integration for dashboard transforms.
audit.go provides audit logging functionality for tracking important operations.
handlers_auth.go provides authentication and authorization middleware for HTTP handlers.
Package server provides HTTP handlers for the collaboration hub.
Package server provides HTTP handlers for the collaboration hub.
Package server implements the HTTP server for the Collaboration Hub dashboard.
API Namespace Reference ¶
The dashboard exposes several API namespaces:
## /api/observatory/* (Core Telemetry - 44 endpoints)
Primary telemetry data: spans, traces, tasks, sessions, messages. Source: internal/observatory/api.go
Key endpoints:
- GET /api/observatory/spans - List spans with filters
- GET /api/observatory/spans/enriched - Spans with display names from session_tools
- GET /api/observatory/traces - List traces
- GET /api/observatory/sessions - List Claude Code sessions
- GET /api/observatory/sessions/{id}/tools - Session tool usage
## /api/controlplane/* (Aggregated Analytics - 6 endpoints)
Unified stats combining Observatory + Coordinator data. Source: internal/server/handlers_controlplane.go
Key endpoints:
- GET /api/controlplane/stats - Unified statistics with filters (USE THIS)
- GET /api/controlplane/stats/breakdown - Detailed breakdown by dimension
- GET /api/controlplane/exec-hierarchy - Execution tree (Messages→Execs→Turns→Tools)
- GET /api/controlplane/heatmap - Activity heatmap
## /api/coordinator/* (Task Execution - ~8 endpoints)
Task management and approval workflow. Source: internal/server/handlers_coordinator.go
Key endpoints:
- POST /api/coordinator/approve/{id} - Approve a task
- POST /api/coordinator/reject/{id} - Reject a task
- GET /api/coordinator/events - SSE stream of task events
## Legacy Endpoints (Deprecated)
- GET /api/statistics - DEPRECATED: Use /api/controlplane/stats
- GET /api/observatory/metrics/summary - DEPRECATED: Use /api/controlplane/stats
Data Source Architecture ¶
All endpoints pull from the same underlying data stores:
┌─────────────────────────────────────────┐
│ SQLite Databases │
├─────────────────────────────────────────┤
│ ~/.ailang/state/collaboration.db │
│ ├── spans (OTEL telemetry) │
│ ├── traces, tasks, sessions │
│ └── session_tools, messages │
├─────────────────────────────────────────┤
│ ~/.ailang/state/coordinator.db │
│ └── tasks, approvals, agent_state │
└─────────────────────────────────────────┘
│
▼
Same Go store methods used by:
- API handlers (this package)
- CLI commands (cmd/ailang/dashboard.go)
- UI hooks (ui/src/features/controlplane/hooks/)
Index ¶
- func BenchmarkHookEventMarshal(b *testing.B)
- func BenchmarkHookEventUnmarshal(b *testing.B)
- func BenchmarkHookEventWithComplexStructure(b *testing.B)
- func BenchmarkHookEventWithLargeToolResponse(b *testing.B)
- func GetUserFromContext(r *http.Request) (*auth.User, error)
- func InferInboxSourceType(fromAgent, toInbox string) string
- func RequireApprover(next http.Handler) http.Handler
- func RequireAuth(next http.Handler) http.Handler
- func TestHookEventComplexJSONStructures(t *testing.T)
- func TestHookEventContextHandling(t *testing.T)
- func TestHookEventEdgeCaseToolNames(t *testing.T)
- func TestHookEventNullValues(t *testing.T)
- func TestHookEventTimestampPrecision(t *testing.T)
- func TestHookEventUnknownEventType(t *testing.T)
- func WhoAmIHandler(w http.ResponseWriter, r *http.Request)
- type AILANGBridge
- func (b *AILANGBridge) BuildHeatmapGrid(cells []HeatmapCell, totalTasks int, totalCost float64, days int) HeatmapGridResponse
- func (b *AILANGBridge) CalculateBurnRate(costs []CostRecord, windowMillis int64) float64
- func (b *AILANGBridge) CheckTaskBudget(config BudgetConfig, estimatedCost, workspaceSpend, dailySpend float64) BudgetStatus
- func (b *AILANGBridge) Close() error
- func (b *AILANGBridge) CountTurns(events []*coordinator.TaskEventRecord) int
- func (b *AILANGBridge) ForecastExhaustion(remainingBudget, burnRate float64) int
- func (b *AILANGBridge) IsEnabled() bool
- func (b *AILANGBridge) SummarizeEvents(events []*coordinator.TaskEventRecord) string
- func (b *AILANGBridge) Truncate(text string, maxLen int) string
- type AgentProcess
- type ApprovalsResponse
- type AuditLogEntry
- type AuditLogger
- func (al *AuditLogger) GetAuditLogs(ctx context.Context, workspaceID string, limit int) ([]*AuditLogEntry, error)
- func (al *AuditLogger) LogAction(ctx context.Context, user *auth.User, action, resource, resourceID string, ...) error
- func (al *AuditLogger) LogApproval(ctx context.Context, user *auth.User, taskID, action string) error
- func (al *AuditLogger) LogFailedAttempt(ctx context.Context, email, reason string) error
- func (al *AuditLogger) LogRoleChange(ctx context.Context, user *auth.User, targetUserID, oldRole, newRole string) error
- func (al *AuditLogger) LogWorkspaceAccess(ctx context.Context, user *auth.User, workspaceID string) error
- type AuthContextKey
- type BreakdownItem
- type BreakdownResponse
- type BudgetCheckRequest
- type BudgetConfig
- type BudgetStatus
- type BudgetStatusResponse
- type BudgetUsage
- type BurnRateInfo
- type ChatContextPreview
- type ClaudeHistoryHandler
- type ClaudeHookPayload
- type CoordinatorApprovalStore
- type CoordinatorRuntimeStats
- type CoordinatorStats
- type CoordinatorStatus
- type CoordinatorStore
- type CoordinatorSummary
- type CoordinatorTaskEventStore
- type CostRecord
- type DataSources
- type ErrorResponse
- type ExecEventRequest
- type ExecEventStorer
- type ExecSessionRequest
- type HeatmapCell
- type HeatmapGridCell
- type HeatmapGridResponse
- type HeatmapMonthLabel
- type HeatmapResponse
- type HookEvent
- type InboxSourceCount
- type MonitorResponse
- type MonitorSummary
- type ObservatoryStats
- type ObservedTopologyEdge
- type ObservedTopologyNode
- type ObservedTopologyResponse
- type OutliersAnalysisResponse
- type ProcessStats
- type ProviderBudget
- type ProviderUsage
- type PubSubEventSubscriber
- type Server
- func (s *Server) AddToHistory(proc ProcessStats)
- func (srv *Server) AuthMiddleware(next http.Handler) http.Handler
- func (s *Server) Close() error
- func (s *Server) GetChatContextForSpan(ctx context.Context, span *observatory.Span) *ChatContextPreview
- func (s *Server) GetExecEventStorer() ExecEventStorer
- func (s *Server) GetExternalTelemetry(pid int) *websocket.TelemetryEvent
- func (srv *Server) HealthHandler(w http.ResponseWriter, r *http.Request)
- func (srv *Server) OptionalAuthMiddleware(next http.Handler) http.Handler
- func (srv *Server) RequireWorkspaceAccessMiddleware(next http.Handler) http.Handler
- func (s *Server) SetApprovalStore(store CoordinatorApprovalStore)
- func (s *Server) SetCoordinatorStore(store CoordinatorStore)
- func (s *Server) SetCoordinatorStoreRaw(store coordinator.Store)
- func (s *Server) SetTaskEventStore(store CoordinatorTaskEventStore)
- func (s *Server) Start() error
- type ServerOption
- func WithCoordinatorStore(store CoordinatorStore) ServerOption
- func WithFirebaseAuth(projectID string) ServerOption
- func WithHookToken(token string) ServerOption
- func WithMessagingStore(store messaging.MessageStore) ServerOption
- func WithObservatoryBackend(backend observatory.Backend) ServerOption
- func WithObservatoryDB(dbPath string) ServerOption
- func WithPubSubEvents(subscriber *pubsub.Subscriber, subName string) ServerOption
- func WithResourceRegistry(registry *coordinator.ResourceTrackerRegistry) ServerOption
- func WithVersion(version string) ServerOption
- func WithWebSocketToken(token string) ServerOption
- type StatisticsResponse
- type TaskEvolutionPoint
- type TaskEvolutionResponse
- type TaskEvolutionTask
- type TaskHierarchyEdge
- type TaskHierarchyNode
- type TaskHierarchyResult
- type TaskMetrics
- type TaskSpanNode
- type TaskStreamEventRequest
- type TeeWriter
- type TelemetryParser
- type TelemetryReportRequest
- type ThreadStatistics
- type TokenDistributionBucket
- type TokenDistributionResponse
- type TopologyAgent
- type TopologyEdge
- type TopologyResponse
- type TopologySink
- type UIApproval
- type UnifiedEvent
- type UnifiedStatsResponse
- type UsageTimeSeriesPoint
- type UsageTimeSeriesResponse
- type WorkspaceAccessInfo
- type WorkspaceWithAccess
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BenchmarkHookEventMarshal ¶
BenchmarkHookEventMarshal benchmarks marshaling of hook events
func BenchmarkHookEventUnmarshal ¶
BenchmarkHookEventUnmarshal benchmarks unmarshaling of hook events
func BenchmarkHookEventWithComplexStructure ¶
BenchmarkHookEventWithComplexStructure benchmarks handling of complex nested JSON
func BenchmarkHookEventWithLargeToolResponse ¶
BenchmarkHookEventWithLargeToolResponse benchmarks handling of large tool responses
func GetUserFromContext ¶
GetUserFromContext is exported for use by other packages.
func InferInboxSourceType ¶
InferInboxSourceType derives source_type from inbox message fields. This uses the SAME taxonomy as GetBreakdownBySourceType for consistency.
Canonical source types (matching breakdown): - "github": Messages from GitHub sync/webhooks - "eval": Messages from eval suite - "coordinator": Messages from/to coordinator agents - "user_session": Messages from user/dashboard (manual sends) - "messaging": General agent-to-agent messaging - "cli": CLI-related messages (rarely used for inbox) - "other": Catch-all
func RequireApprover ¶
RequireApprover is a middleware that requires the Approver role.
func RequireAuth ¶
RequireAuth is a middleware that requires authentication.
func TestHookEventComplexJSONStructures ¶
TestHookEventComplexJSONStructures tests handling of nested JSON in tool input/output
func TestHookEventContextHandling ¶
TestHookEventContextHandling tests that context is properly handled
func TestHookEventEdgeCaseToolNames ¶
TestHookEventEdgeCaseToolNames tests edge case tool names
func TestHookEventNullValues ¶
TestHookEventNullValues tests handling of null vs empty values
func TestHookEventTimestampPrecision ¶
TestHookEventTimestampPrecision tests timestamp precision handling
func TestHookEventUnknownEventType ¶
TestHookEventUnknownEventType tests handling of unknown event types
func WhoAmIHandler ¶
func WhoAmIHandler(w http.ResponseWriter, r *http.Request)
WhoAmIHandler returns information about the current authenticated user.
Types ¶
type AILANGBridge ¶
type AILANGBridge struct {
// contains filtered or unexported fields
}
AILANGBridge provides AILANG-based event formatting as an alternative to Go. Enable with AILANG_DASHBOARD=1 environment variable.
func GetAILANGBridge ¶
func GetAILANGBridge() *AILANGBridge
GetAILANGBridge returns the singleton AILANG bridge instance.
func (*AILANGBridge) BuildHeatmapGrid ¶
func (b *AILANGBridge) BuildHeatmapGrid(cells []HeatmapCell, totalTasks int, totalCost float64, days int) HeatmapGridResponse
BuildHeatmapGrid calls the AILANG buildHeatmapGridAt function. Falls back to Go implementation on error.
func (*AILANGBridge) CalculateBurnRate ¶
func (b *AILANGBridge) CalculateBurnRate(costs []CostRecord, windowMillis int64) float64
CalculateBurnRate calls the AILANG calculateBurnRate function. Returns cost per hour based on recent spending within the time window. Falls back to Go implementation on error.
func (*AILANGBridge) CheckTaskBudget ¶
func (b *AILANGBridge) CheckTaskBudget(config BudgetConfig, estimatedCost, workspaceSpend, dailySpend float64) BudgetStatus
CheckTaskBudget calls the AILANG checkTaskBudget function with contracts. Falls back to Go implementation on error. Demonstrates: requires contracts for input validation
func (*AILANGBridge) Close ¶
func (b *AILANGBridge) Close() error
Close shuts down the AILANG engine.
func (*AILANGBridge) CountTurns ¶
func (b *AILANGBridge) CountTurns(events []*coordinator.TaskEventRecord) int
CountTurns calls the AILANG countTurns function. Falls back to Go implementation on error.
func (*AILANGBridge) ForecastExhaustion ¶
func (b *AILANGBridge) ForecastExhaustion(remainingBudget, burnRate float64) int
ForecastExhaustion calls the AILANG forecastExhaustion function. Returns estimated hours until budget exhaustion, or -1 if burn rate is zero. Falls back to Go implementation on error.
func (*AILANGBridge) IsEnabled ¶
func (b *AILANGBridge) IsEnabled() bool
IsEnabled returns true if the AILANG bridge is enabled.
func (*AILANGBridge) SummarizeEvents ¶
func (b *AILANGBridge) SummarizeEvents(events []*coordinator.TaskEventRecord) string
SummarizeEvents calls the AILANG summarizeEvents function. Falls back to Go implementation on error.
type AgentProcess ¶
type AgentProcess struct {
InstanceID string `json:"instance_id"`
PID int `json:"pid"`
StartedAt time.Time `json:"started_at"`
// contains filtered or unexported fields
}
AgentProcess tracks a running agent process
type ApprovalsResponse ¶
type ApprovalsResponse struct {
Approvals []UIApproval `json:"approvals"`
Total int `json:"total"`
Pending int `json:"pending_count"`
Approved int `json:"approved_count"`
Rejected int `json:"rejected_count"`
}
ApprovalsResponse wraps approval list with metadata for the unified endpoint
type AuditLogEntry ¶
type AuditLogEntry struct {
ID string `firestore:"id"`
UserID string `firestore:"user_id"`
UserEmail string `firestore:"user_email"`
WorkspaceID string `firestore:"workspace_id"`
Action string `firestore:"action"`
Resource string `firestore:"resource"`
ResourceID string `firestore:"resource_id"`
Status string `firestore:"status"` // "success", "failure"
Details map[string]interface{} `firestore:"details"`
Timestamp time.Time `firestore:"timestamp"`
IP string `firestore:"ip_address"`
UserAgent string `firestore:"user_agent"`
}
AuditLogEntry represents a single audit log entry.
type AuditLogger ¶
type AuditLogger struct {
// contains filtered or unexported fields
}
AuditLogger provides audit logging functionality.
func NewAuditLogger ¶
func NewAuditLogger(fs *firestore.Client) *AuditLogger
NewAuditLogger creates a new AuditLogger.
func (*AuditLogger) GetAuditLogs ¶
func (al *AuditLogger) GetAuditLogs(ctx context.Context, workspaceID string, limit int) ([]*AuditLogEntry, error)
GetAuditLogs retrieves audit logs for a user or workspace.
func (*AuditLogger) LogAction ¶
func (al *AuditLogger) LogAction(ctx context.Context, user *auth.User, action, resource, resourceID string, details map[string]interface{}) error
LogAction logs an audit entry for a user action.
func (*AuditLogger) LogApproval ¶
func (al *AuditLogger) LogApproval(ctx context.Context, user *auth.User, taskID, action string) error
LogApproval logs an approval action.
func (*AuditLogger) LogFailedAttempt ¶
func (al *AuditLogger) LogFailedAttempt(ctx context.Context, email, reason string) error
LogFailedAttempt logs a failed authentication or authorization attempt.
func (*AuditLogger) LogRoleChange ¶
func (al *AuditLogger) LogRoleChange(ctx context.Context, user *auth.User, targetUserID, oldRole, newRole string) error
LogRoleChange logs a role change for a user.
func (*AuditLogger) LogWorkspaceAccess ¶
func (al *AuditLogger) LogWorkspaceAccess(ctx context.Context, user *auth.User, workspaceID string) error
LogWorkspaceAccess logs workspace access events.
type AuthContextKey ¶
type AuthContextKey string
AuthContextKey is used to store user context in HTTP requests.
const UserContextKey AuthContextKey = "user"
const WorkspaceAccessContextKey AuthContextKey = "workspace_access"
WorkspaceAccessContextKey is used to store workspace access info in HTTP requests.
type BreakdownItem ¶
type BreakdownItem struct {
ID string `json:"id"`
Label string `json:"label"`
SpanCount int `json:"span_count"`
TaskCount int `json:"task_count,omitempty"`
TokensIn int64 `json:"tokens_in"`
TokensOut int64 `json:"tokens_out"`
CostUSD float64 `json:"cost_usd"`
DurationMs int64 `json:"duration_ms"` // Total execution time in ms
Percentage float64 `json:"percentage,omitempty"` // Percentage of total cost
// Cache metrics
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheCreationTokens int64 `json:"cache_creation_tokens"`
CacheSavingsUSD float64 `json:"cache_savings_usd"`
}
BreakdownItem represents a single item in a breakdown
type BreakdownResponse ¶
type BreakdownResponse struct {
// By provider (claude, gemini, openai)
ByProvider []BreakdownItem `json:"by_provider"`
// By source type (inferred from span name patterns)
BySourceType []BreakdownItem `json:"by_source_type"`
// By model
ByModel []BreakdownItem `json:"by_model"`
// By workspace
ByWorkspace []BreakdownItem `json:"by_workspace"`
// Totals for percentage calculations
TotalCost float64 `json:"total_cost"`
}
BreakdownResponse contains hierarchical breakdown data
type BudgetCheckRequest ¶
type BudgetCheckRequest struct {
EstimatedCost float64 `json:"estimatedCost"`
Provider string `json:"provider,omitempty"` // Optional: specific provider to check against
}
BudgetCheckRequest is the request body for checking task budget
type BudgetConfig ¶
type BudgetConfig struct {
WorkspaceBudget float64 `json:"workspaceBudget"`
DailyBudget float64 `json:"dailyBudget"`
TaskMaxCost float64 `json:"taskMaxCost"`
WarningThreshold float64 `json:"warningThreshold"`
ProviderBudgets map[string]*ProviderBudget `json:"providerBudgets,omitempty"` // Per-provider overrides
}
BudgetConfig represents budget configuration for task execution.
func DefaultBudgetConfig ¶
func DefaultBudgetConfig() BudgetConfig
DefaultBudgetConfig returns a sensible default budget configuration
func LoadBudgetConfig ¶
func LoadBudgetConfig() BudgetConfig
LoadBudgetConfig loads budget configuration from ~/.ailang/config.yaml and converts it to the server's BudgetConfig format.
func (*BudgetConfig) GetProviderBudget ¶
func (c *BudgetConfig) GetProviderBudget(provider string) *ProviderBudget
GetProviderBudget returns the budget for a specific provider, falling back to global limits.
type BudgetStatus ¶
type BudgetStatus struct {
Allowed bool `json:"allowed"`
RemainingWorkspace float64 `json:"remainingWorkspace"`
RemainingDaily float64 `json:"remainingDaily"`
WarningLevel string `json:"warningLevel"`
Message string `json:"message"`
}
BudgetStatus represents the result of a budget check.
type BudgetStatusResponse ¶
type BudgetStatusResponse struct {
Config BudgetConfig `json:"config"`
Status BudgetStatus `json:"status"`
Usage BudgetUsage `json:"usage"`
BurnRate BurnRateInfo `json:"burnRate"` // Burn rate and forecast
ByProvider map[string]*ProviderUsage `json:"byProvider,omitempty"` // Per-provider breakdown
UsingAILANG bool `json:"usingAilang"` // True if AILANG bridge is active
}
BudgetStatusResponse provides budget status for the dashboard
type BudgetUsage ¶
type BudgetUsage struct {
WorkspaceSpend float64 `json:"workspaceSpend"`
DailySpend float64 `json:"dailySpend"`
UsagePercent float64 `json:"usagePercent"`
}
BudgetUsage represents current budget usage
type BurnRateInfo ¶
type BurnRateInfo struct {
CostPerHour float64 `json:"costPerHour"` // $/hour based on recent activity
HoursUntilExhaustion int `json:"hoursUntilExhaustion"` // -1 if no burn rate
WindowHours int `json:"windowHours"` // Time window used for calculation
}
BurnRateInfo represents burn rate and exhaustion forecast
type ChatContextPreview ¶
type ChatContextPreview struct {
UserPrompt string `json:"user_prompt,omitempty"` // First 500 chars of user prompt
AssistantResponse string `json:"assistant_response,omitempty"` // First 500 chars of response
HasThinking bool `json:"has_thinking"`
TurnNumber int `json:"turn_number"`
FullChatURL string `json:"full_chat_url"` // Link to full conversation
}
ChatContextPreview represents a preview of chat context for embedding in span responses
type ClaudeHistoryHandler ¶
type ClaudeHistoryHandler struct {
// contains filtered or unexported fields
}
ClaudeHistoryHandler provides HTTP handlers for Claude Code conversation history.
func NewClaudeHistoryHandler ¶
func NewClaudeHistoryHandler() *ClaudeHistoryHandler
NewClaudeHistoryHandler creates a new handler with a claudehistory reader.
type ClaudeHookPayload ¶
type ClaudeHookPayload struct {
// Common fields (always present in all hook events)
SessionID string `json:"session_id"`
TranscriptPath string `json:"transcript_path"`
Cwd string `json:"cwd"`
PermissionMode string `json:"permission_mode"`
HookEventName string `json:"hook_event_name"`
// Agent fields (present in --agent mode or inside subagents)
AgentID string `json:"agent_id,omitempty"`
AgentType string `json:"agent_type,omitempty"`
// Tool fields (PreToolUse, PostToolUse, PostToolUseFailure)
ToolName string `json:"tool_name,omitempty"`
ToolInput json.RawMessage `json:"tool_input,omitempty"`
ToolResponse json.RawMessage `json:"tool_response,omitempty"`
ToolUseID string `json:"tool_use_id,omitempty"`
// SessionStart fields
Source string `json:"source,omitempty"`
Model string `json:"model,omitempty"`
// AILANG correlation IDs (extracted from HTTP headers, not JSON body)
TaskID string `json:"-"`
ChainID string `json:"-"`
StageID string `json:"-"`
MessageID string `json:"-"`
}
ClaudeHookPayload matches Claude Code's native hook JSON schema. HTTP hooks POST the full JSON — we store it without field subsetting.
type CoordinatorApprovalStore ¶
type CoordinatorApprovalStore interface {
GetApprovalRequest(ctx context.Context, id string) (*coordinator.ApprovalRequestRecord, error)
ListPendingApprovals(ctx context.Context) ([]*coordinator.ApprovalRequestRecord, error)
ListResolvedApprovals(ctx context.Context, limit int) ([]*coordinator.ApprovalRequestRecord, error)
ResolveApprovalRequest(ctx context.Context, id string, status string, resolvedBy string) error
}
CoordinatorApprovalStore provides approval request operations
type CoordinatorRuntimeStats ¶
type CoordinatorRuntimeStats struct {
Running bool `json:"running"`
CompletedTasks int `json:"completed_tasks"`
PendingTasks int `json:"pending_tasks"`
RunningTasks int `json:"running_tasks"`
FailedTasks int `json:"failed_tasks"`
PendingApprovals int `json:"pending_approvals"`
ActiveAgents int `json:"active_agents"`
TotalCost float64 `json:"total_cost"` // Coordinator-tracked subset
TotalTokens int `json:"total_tokens"` // Coordinator-tracked subset
}
CoordinatorRuntimeStats holds live coordinator state
type CoordinatorStats ¶
type CoordinatorStats struct {
Running bool
PID int
Uptime string
TasksRun int
PendingTasks int
RunningTasks int
FailedTasks int
TotalCost float64
TotalTokens int
}
CoordinatorStats holds coordinator daemon statistics
type CoordinatorStatus ¶
type CoordinatorStatus struct {
Running bool `json:"running"`
PID int `json:"pid,omitempty"`
Uptime string `json:"uptime,omitempty"`
TasksRun int `json:"tasks_run"`
PendingTasks int `json:"pending_tasks"`
RunningTasks int `json:"running_tasks"`
FailedTasks int `json:"failed_tasks"`
TotalCost float64 `json:"total_cost"`
TotalTokens int `json:"total_tokens"`
ActiveTasks []*TaskMetrics `json:"active_tasks,omitempty"`
}
CoordinatorStatus represents the coordinator daemon status
type CoordinatorStore ¶
type CoordinatorStore interface {
GetCoordinatorStats() (*CoordinatorStats, error)
GetCostByProvider() (map[string]float64, error)
}
CoordinatorStore provides coordinator statistics
type CoordinatorSummary ¶
type CoordinatorSummary struct {
TotalTasks int `json:"total_tasks"`
PendingTasks int `json:"pending_tasks"`
RunningTasks int `json:"running_tasks"`
CompletedTasks int `json:"completed_tasks"`
FailedTasks int `json:"failed_tasks"`
ByProvider map[string]int `json:"by_provider,omitempty"`
ByWorkspace map[string]int `json:"by_workspace,omitempty"`
TotalCost float64 `json:"total_cost"`
TotalTokens int `json:"total_tokens"`
ActiveAgents int `json:"active_agents"` // Count of agents with running tasks
PendingApprovals int `json:"pending_approvals"` // Tasks awaiting human approval
SuccessRate float64 `json:"success_rate"` // Completed / (Completed + Failed)
}
CoordinatorSummary provides coordinator task statistics
type CoordinatorTaskEventStore ¶
type CoordinatorTaskEventStore interface {
GetTaskEvents(ctx context.Context, taskID string, limit int) ([]*coordinator.TaskEventRecord, error)
ListTasks(ctx context.Context, filter *coordinator.TaskFilter) ([]*coordinator.TaskRecord, error)
GetTask(ctx context.Context, id string) (*coordinator.TaskRecord, error)
}
CoordinatorTaskEventStore provides task event operations for historical replay
type CostRecord ¶
type CostRecord struct {
Timestamp int64 `json:"timestamp"` // Unix milliseconds
Cost float64 `json:"cost"`
}
CostRecord represents a historical cost entry for burn rate calculation.
type DataSources ¶
type DataSources struct {
ObservatoryDB string `json:"observatory_db"` // Path to observatory.db
CoordinatorDB string `json:"coordinator_db"` // Path to coordinator.db
ObservatoryOK bool `json:"observatory_ok"` // Whether observatory is available
CoordinatorOK bool `json:"coordinator_ok"` // Whether coordinator is available
}
DataSources documents where each metric comes from
type ErrorResponse ¶
type ErrorResponse struct {
Error string `json:"error"`
Status int `json:"status"`
Message string `json:"message,omitempty"`
}
ErrorResponse represents a JSON error response.
type ExecEventRequest ¶
type ExecEventRequest struct {
SessionID string `json:"session_id"`
StreamType string `json:"stream_type"` // text, tool_use, tool_result, turn_start, turn_end, error
TurnNum int `json:"turn_num,omitempty"`
Text string `json:"text,omitempty"`
ToolName string `json:"tool_name,omitempty"`
ToolInput string `json:"tool_input,omitempty"`
ToolOutput string `json:"tool_output,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
}
ExecEventRequest represents a request to store an exec event
type ExecEventStorer ¶
type ExecEventStorer interface {
StoreTaskEvent(ctx context.Context, event *coordinator.TaskEventRecord) error
}
ExecEventStorer is a helper interface for storing exec events Can be used by the Claude Code hooks endpoint
type ExecSessionRequest ¶
type ExecSessionRequest struct {
SessionID string `json:"session_id"`
Workspace string `json:"workspace,omitempty"`
Provider string `json:"provider,omitempty"`
}
ExecSessionRequest represents a request to create an exec session
type HeatmapCell ¶
type HeatmapCell struct {
Date string `json:"date"` // YYYY-MM-DD
TaskCount int `json:"taskCount"`
Cost float64 `json:"cost"`
SuccessRate float64 `json:"successRate"` // 0.0 to 1.0
}
HeatmapCell represents a single day's activity data
type HeatmapGridCell ¶
type HeatmapGridCell struct {
Date string `json:"date"`
TaskCount int `json:"count"`
Cost float64 `json:"cost"`
SuccessRate float64 `json:"successRate"`
Intensity float64 `json:"intensity"` // 0.0-1.0 for coloring
DayOfWeek int `json:"dayOfWeek"` // 0=Sunday, 6=Saturday
}
HeatmapGridCell is a cell in the grid format response
type HeatmapGridResponse ¶
type HeatmapGridResponse struct {
Weeks [][]HeatmapGridCell `json:"weeks"` // weeks[weekIndex][dayIndex]
MonthLabels []HeatmapMonthLabel `json:"monthLabels"` // month markers
Totals struct {
Tasks int `json:"tasks"`
Cost float64 `json:"cost"`
} `json:"totals"`
DateRange struct {
Start string `json:"start"`
End string `json:"end"`
} `json:"dateRange"`
}
HeatmapGridResponse is the grid format response for heatmap
type HeatmapMonthLabel ¶
type HeatmapMonthLabel struct {
Name string `json:"name"` // "Jan", "Feb", etc.
WeekIndex int `json:"weekIndex"` // 0-based week column index
}
HeatmapMonthLabel is a month label for the grid header
type HeatmapResponse ¶
type HeatmapResponse struct {
Cells []HeatmapCell `json:"cells"`
Totals struct {
Tasks int `json:"tasks"`
Cost float64 `json:"cost"`
} `json:"totals"`
}
HeatmapResponse is the response for GET /api/controlplane/heatmap
type HookEvent ¶
type HookEvent struct {
Event string `json:"event"`
SessionID string `json:"session_id"`
Workspace string `json:"workspace,omitempty"`
ClaudeVersion string `json:"claude_version,omitempty"`
ToolName string `json:"tool_name,omitempty"`
ToolUseID string `json:"tool_use_id,omitempty"`
ToolInput json.RawMessage `json:"tool_input,omitempty"`
ToolResponse json.RawMessage `json:"tool_response,omitempty"`
Timestamp time.Time `json:"timestamp"`
// Correlation IDs from environment (M-CHAINS-SIMPLIFY)
// These are set when coordinator spawns Claude Code with env vars
TaskID string `json:"task_id,omitempty"`
ChainID string `json:"chain_id,omitempty"`
StageID string `json:"stage_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
}
HookEvent represents a Claude Code hook event from the telemetry script. These events are sent by ~/.claude/hooks/claude_telemetry.sh.
type InboxSourceCount ¶
InboxSourceCount holds message count for a source type
type MonitorResponse ¶
type MonitorResponse struct {
Timestamp time.Time `json:"timestamp"`
Processes []ProcessStats `json:"processes"`
History []ProcessStats `json:"history,omitempty"` // Recently completed/failed processes
Summary MonitorSummary `json:"summary"`
}
MonitorResponse is the API response for /api/monitor
type MonitorSummary ¶
type MonitorSummary struct {
TotalProcesses int `json:"total_processes"`
TotalCPU float64 `json:"total_cpu_percent"`
TotalMemoryMB float64 `json:"total_memory_mb"`
TotalCost float64 `json:"total_cost"`
WarningCount int `json:"warning_count"` // Processes exceeding thresholds
}
MonitorSummary provides aggregate stats
type ObservatoryStats ¶
type ObservatoryStats struct {
TotalSpans int `json:"total_spans"`
TotalTasks int `json:"total_tasks"`
TotalWorkspaces int `json:"total_workspaces"`
TotalAgents int `json:"total_agents"`
TotalTokensIn int64 `json:"total_tokens_in"`
TotalTokensOut int64 `json:"total_tokens_out"`
TotalCostUSD float64 `json:"total_cost_usd"`
SuccessRate float64 `json:"success_rate"`
// Cache metrics (from spans)
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
TotalCacheCreationTokens int64 `json:"total_cache_creation_tokens"`
CacheSavingsUSD float64 `json:"cache_savings_usd"`
// Lines of Code metrics (from metrics table)
LinesAdded int64 `json:"lines_added"`
LinesRemoved int64 `json:"lines_removed"`
// Activity metrics (from metrics table)
CommitCount int64 `json:"commit_count"`
PullRequestCount int64 `json:"pull_request_count"`
ActiveTimeMs int64 `json:"active_time_ms"`
// Session metrics
TurnCount int `json:"turn_count"`
ToolCalls int `json:"tool_calls"`
ErrorCount int `json:"error_count"`
}
ObservatoryStats holds metrics from the Observatory database
type ObservedTopologyEdge ¶
type ObservedTopologyEdge struct {
Source string `json:"source"`
Target string `json:"target"`
MessageCount int `json:"message_count"`
LastActivity string `json:"last_activity,omitempty"`
Active bool `json:"active"`
}
ObservedTopologyEdge represents an edge derived from actual message flows
type ObservedTopologyNode ¶
type ObservedTopologyNode struct {
ID string `json:"id"`
Label string `json:"label"`
NodeType string `json:"node_type"` // agent, source, sink
MessagesSent int `json:"messages_sent"`
MessagesRecv int `json:"messages_recv"`
LastActivity string `json:"last_activity,omitempty"`
}
ObservedTopologyNode represents a node in the observed topology
type ObservedTopologyResponse ¶
type ObservedTopologyResponse struct {
Nodes []ObservedTopologyNode `json:"nodes"`
Edges []ObservedTopologyEdge `json:"edges"`
IsEmpty bool `json:"is_empty"`
}
ObservedTopologyResponse is the response for GET /api/controlplane/topology/observed
type OutliersAnalysisResponse ¶
type OutliersAnalysisResponse struct {
TaskID string `json:"task_id"`
TaskTitle string `json:"task_title"`
SpanCount int `json:"span_count"`
Threshold float64 `json:"threshold"`
Stats []*observatory.TaskMetricStats `json:"stats"`
Outliers []*observatory.SpanOutlier `json:"outliers"`
RateOfChange *observatory.RateAnalysis `json:"rate_of_change,omitempty"`
CliCommand string `json:"cli_command"`
AnalyzedAt string `json:"analyzed_at"`
}
OutliersAnalysisResponse is the response for GET /api/controlplane/outliers
type ProcessStats ¶
type ProcessStats struct {
InstanceID string `json:"instance_id"`
PID int `json:"pid"`
StartedAt time.Time `json:"started_at"`
DurationSec int `json:"duration_sec"`
CPUPercent float64 `json:"cpu_percent"`
MemoryMB float64 `json:"memory_mb"`
Status string `json:"status"` // running, completed, failed
// Source information - where the process was started from
Source string `json:"source"` // "ui", "eval", "cli", "agent"
Command string `json:"command,omitempty"` // Short command description
FullCmd string `json:"full_cmd,omitempty"` // Full command line (for debugging)
StoppedAt *time.Time `json:"stopped_at,omitempty"` // When the process stopped
// Telemetry from Claude sessions (populated when available)
Turns int `json:"turns,omitempty"`
TokensIn int `json:"tokens_in,omitempty"`
TokensOut int `json:"tokens_out,omitempty"`
Cost float64 `json:"cost,omitempty"`
}
ProcessStats contains runtime statistics for a monitored process
type ProviderBudget ¶
type ProviderBudget struct {
DailyBudget float64 `json:"dailyBudget" yaml:"daily_budget"` // Per-provider daily limit
TaskMaxCost float64 `json:"taskMaxCost" yaml:"task_max_cost"` // Per-provider task limit
HardLimit bool `json:"hardLimit" yaml:"hard_limit"` // Block if exceeded (vs warn only)
WarningThreshold float64 `json:"warningThreshold" yaml:"warning_threshold"` // Override global threshold
}
ProviderBudget defines budget limits for a specific AI provider.
type ProviderUsage ¶
type ProviderUsage struct {
Spend float64 `json:"spend"`
Budget float64 `json:"budget"`
UsagePercent float64 `json:"usagePercent"`
WarningLevel string `json:"warningLevel"`
HardLimit bool `json:"hardLimit"`
}
ProviderUsage represents budget usage for a specific provider
type PubSubEventSubscriber ¶
type PubSubEventSubscriber struct {
// contains filtered or unexported fields
}
PubSubEventSubscriber pulls real-time task stream events from the ailang-events-dashboard Pub/Sub subscription and broadcasts them to connected WebSocket clients. Only active in cloud mode.
func NewPubSubEventSubscriber ¶
func NewPubSubEventSubscriber(subscriber *pubsub.Subscriber, wsServer *websocket.Server, subName string, logger *log.Logger) *PubSubEventSubscriber
NewPubSubEventSubscriber creates a subscriber that bridges Pub/Sub events to WebSocket broadcasts. The subName should include the topic prefix (e.g., "ailang-dev-events-dashboard").
func (*PubSubEventSubscriber) Start ¶
func (s *PubSubEventSubscriber) Start(ctx context.Context)
Start begins pulling events from Pub/Sub. Blocks until ctx is cancelled or an unrecoverable error occurs. Should be called in a goroutine.
func (*PubSubEventSubscriber) Stop ¶
func (s *PubSubEventSubscriber) Stop()
Stop cancels the subscription pull loop.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents the HTTP server for the collaboration hub
func NewServer ¶
func NewServer(dbPath string, httpAddr string, opts ...ServerOption) (*Server, error)
NewServer creates a new HTTP server. If a messaging store was set via WithMessagingStore, dbPath is only used for display.
func (*Server) AddToHistory ¶
func (s *Server) AddToHistory(proc ProcessStats)
AddToHistory adds a completed/failed process to history
func (*Server) AuthMiddleware ¶
AuthMiddleware verifies Firebase JWT tokens and loads user information.
func (*Server) GetChatContextForSpan ¶
func (s *Server) GetChatContextForSpan(ctx context.Context, span *observatory.Span) *ChatContextPreview
GetChatContextForSpan retrieves chat context preview for a span from the database. This is used to enrich span responses with conversation context.
func (*Server) GetExecEventStorer ¶
func (s *Server) GetExecEventStorer() ExecEventStorer
GetExecEventStorer returns the event storer for external use
func (*Server) GetExternalTelemetry ¶
func (s *Server) GetExternalTelemetry(pid int) *websocket.TelemetryEvent
GetExternalTelemetry returns stored telemetry for a PID (used by monitor)
func (*Server) HealthHandler ¶
func (srv *Server) HealthHandler(w http.ResponseWriter, r *http.Request)
HealthHandler returns a simple health check response.
func (*Server) OptionalAuthMiddleware ¶
OptionalAuthMiddleware extracts user info if a token is present but doesn't reject unauthenticated requests. Use this for endpoints that work for both authenticated and unauthenticated users.
func (*Server) RequireWorkspaceAccessMiddleware ¶
RequireWorkspaceAccess is a middleware that checks if the user has access to the specified workspace. For authenticated users, checks against Firestore permissions. For unauthenticated users, only allows access to public workspaces.
func (*Server) SetApprovalStore ¶
func (s *Server) SetApprovalStore(store CoordinatorApprovalStore)
SetApprovalStore sets the coordinator approval store
func (*Server) SetCoordinatorStore ¶
func (s *Server) SetCoordinatorStore(store CoordinatorStore)
SetCoordinatorStore sets the coordinator store for statistics
func (*Server) SetCoordinatorStoreRaw ¶
func (s *Server) SetCoordinatorStoreRaw(store coordinator.Store)
SetCoordinatorStoreRaw sets the raw coordinator store for Control Plane queries
func (*Server) SetTaskEventStore ¶
func (s *Server) SetTaskEventStore(store CoordinatorTaskEventStore)
SetTaskEventStore sets the coordinator task event store
type ServerOption ¶
type ServerOption func(*Server)
ServerOption configures the server
func WithCoordinatorStore ¶
func WithCoordinatorStore(store CoordinatorStore) ServerOption
WithCoordinatorStore sets the coordinator store for task statistics
func WithFirebaseAuth ¶
func WithFirebaseAuth(projectID string) ServerOption
WithFirebaseAuth initializes Firebase authentication and Firestore-based access control. The projectID should match your Firebase/GCP project (e.g., "ailang-dev"). Requires GOOGLE_APPLICATION_CREDENTIALS environment variable to be set with service account key.
If initialization fails, the server will continue without authentication (all routes public). Check logs for "Firebase Auth initialized" to confirm successful setup.
func WithHookToken ¶
func WithHookToken(token string) ServerOption
WithHookToken sets a bearer token for authenticating hook requests. When set, requests to /api/hooks/* must include Authorization: Bearer <token>. When empty (default), no authentication is required (local mode).
func WithMessagingStore ¶
func WithMessagingStore(store messaging.MessageStore) ServerOption
WithMessagingStore sets a pre-created messaging store, skipping local SQLite open. Use this when running with cloud backends (AILANG_STORAGE=gcp).
func WithObservatoryBackend ¶
func WithObservatoryBackend(backend observatory.Backend) ServerOption
WithObservatoryBackend sets a pre-created observatory backend. Use this instead of WithObservatoryDB when running with cloud backends.
func WithObservatoryDB ¶
func WithObservatoryDB(dbPath string) ServerOption
WithObservatoryDB sets up the observatory backend with SQLite at the given path. If GCP project is configured (via GOOGLE_CLOUD_PROJECT or OTLP_GOOGLE_CLOUD_PROJECT), it also adds a GCP Trace remote backend for federated trace queries.
func WithPubSubEvents ¶
func WithPubSubEvents(subscriber *pubsub.Subscriber, subName string) ServerOption
WithPubSubEvents enables Pub/Sub event streaming for cloud mode. The subscriber and subName are stored; the actual PubSubEventSubscriber is created in NewServer after the WebSocket server is initialized.
func WithResourceRegistry ¶
func WithResourceRegistry(registry *coordinator.ResourceTrackerRegistry) ServerOption
WithResourceRegistry sets the resource tracker registry for coordinator metrics
func WithVersion ¶
func WithVersion(version string) ServerOption
WithVersion sets the AILANG version displayed in the UI
func WithWebSocketToken ¶
func WithWebSocketToken(token string) ServerOption
WithWebSocketToken sets a token for authenticating external WebSocket connections. When set, external clients must connect with ?token=<value> query parameter. Same-origin browser connections (embedded React UI) are exempt. When empty (default), no authentication is required (local mode).
type StatisticsResponse ¶
type StatisticsResponse struct {
Threads ThreadStatistics `json:"threads"`
Coordinator *CoordinatorSummary `json:"coordinator,omitempty"`
}
StatisticsResponse represents aggregate statistics for the dashboard
type TaskEvolutionPoint ¶
type TaskEvolutionPoint struct {
X int `json:"x"` // Normalized index (0 = start)
Timestamp string `json:"timestamp"` // ISO8601
Cost float64 `json:"cost"` // Cumulative cost
Tokens int64 `json:"tokens"` // Cumulative tokens (in + out)
TokensIn int64 `json:"tokens_in"` // Cumulative input tokens
TokensOut int64 `json:"tokens_out"` // Cumulative output tokens
Turns int `json:"turns"` // Cumulative turn count
Spans int `json:"spans"` // Cumulative span count
DeltaCost float64 `json:"delta_cost"` // Cost change since last point
DeltaSpans int `json:"delta_spans"` // Span count change since last point
DurationMs int64 `json:"duration_ms"` // Cumulative execution time in ms
DeltaDurationMs int64 `json:"delta_duration_ms"` // Duration of this span in ms
ElapsedMs int64 `json:"elapsed_ms"` // Wall clock time since task start in ms
}
TaskEvolutionPoint represents a single point in time for task metrics
type TaskEvolutionResponse ¶
type TaskEvolutionResponse struct {
Tasks []TaskEvolutionTask `json:"tasks"`
CliCommand string `json:"cli_command"`
}
TaskEvolutionResponse is the response for GET /api/controlplane/task-evolution
type TaskEvolutionTask ¶
type TaskEvolutionTask struct {
TaskID string `json:"task_id"`
Title string `json:"title,omitempty"`
Provider string `json:"provider,omitempty"`
Model string `json:"model,omitempty"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time,omitempty"`
Status string `json:"status,omitempty"`
Points []TaskEvolutionPoint `json:"points"`
}
TaskEvolutionTask represents a single task's evolution data
type TaskHierarchyEdge ¶
type TaskHierarchyEdge struct {
Source string `json:"source"`
Target string `json:"target"`
Type string `json:"type"` // "handoff" (parent_task_id) or "session" (shared session_id)
}
TaskHierarchyEdge represents a relationship between tasks.
type TaskHierarchyNode ¶
type TaskHierarchyNode struct {
ID string `json:"id"`
Title string `json:"title"`
AgentID string `json:"agent_id,omitempty"`
ParentTaskID string `json:"parent_task_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Status string `json:"status"`
ApprovalStatus string `json:"approval_status,omitempty"` // "pending", "approved", "rejected", ""
ApprovalType string `json:"approval_type,omitempty"` // "merge", "merge_handoff", etc.
Iteration int `json:"iteration,omitempty"`
Cost float64 `json:"cost"`
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
Turns int `json:"turns,omitempty"`
DurationMs int64 `json:"duration_ms"`
CreatedAt time.Time `json:"created_at"`
Provider string `json:"provider,omitempty"`
Workspace string `json:"workspace,omitempty"`
Children []*TaskHierarchyNode `json:"children,omitempty"` // Child tasks (via parent_task_id)
// Execution spans nested within this task (from observatory.db)
Spans []*TaskSpanNode `json:"spans,omitempty"`
// Turn-grouped hierarchy (when group_by=turns is requested)
TurnGrouped *observatory.TurnGroupedHierarchy `json:"turn_grouped,omitempty"`
}
TaskHierarchyNode represents a task with its relationships for cross-task visualization.
type TaskHierarchyResult ¶
type TaskHierarchyResult struct {
Tasks []*TaskHierarchyNode `json:"tasks"`
Edges []TaskHierarchyEdge `json:"edges"`
Stats struct {
TotalTasks int `json:"total_tasks"`
TotalSpans int `json:"total_spans"`
PendingApprovals int `json:"pending_approvals"`
TotalCost float64 `json:"total_cost"`
} `json:"stats"`
}
TaskHierarchyResult contains the full cross-task hierarchy.
type TaskMetrics ¶
type TaskMetrics struct {
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
Status string `json:"status"`
CPUPercent float64 `json:"cpu_percent"`
MemoryMB float64 `json:"memory_mb"`
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
Cost float64 `json:"cost"`
DurationSec int `json:"duration_sec"`
PeakCPU float64 `json:"peak_cpu"`
PeakMemory float64 `json:"peak_memory_mb"`
TurnNum int `json:"turn_num,omitempty"`
LastEvent string `json:"last_event,omitempty"`
}
TaskMetrics represents live metrics for a running task
type TaskSpanNode ¶
type TaskSpanNode struct {
ID string `json:"id"`
Name string `json:"name"`
NodeType string `json:"node_type"` // coordinator, executor, turn, tool, other
DurationMs int64 `json:"duration_ms"`
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
CostUSD float64 `json:"cost_usd,omitempty"`
TurnNumber int `json:"turn_number,omitempty"`
ToolName string `json:"tool_name,omitempty"`
Status string `json:"status"`
Children []*TaskSpanNode `json:"children,omitempty"`
}
TaskSpanNode represents a span within a task for the unified task hierarchy view. Simplified version of SpanHierarchyNode for API response.
type TaskStreamEventRequest ¶
type TaskStreamEventRequest struct {
TaskID string `json:"task_id"`
ThreadID string `json:"thread_id,omitempty"`
StreamType string `json:"stream_type"`
TurnNum int `json:"turn_num,omitempty"`
Text string `json:"text,omitempty"`
ToolName string `json:"tool_name,omitempty"`
ToolInput string `json:"tool_input,omitempty"`
ToolOutput string `json:"tool_output,omitempty"`
Status string `json:"status,omitempty"`
TokensIn int `json:"tokens_in,omitempty"`
TokensOut int `json:"tokens_out,omitempty"`
Cost float64 `json:"cost,omitempty"`
DurationSec int `json:"duration_sec,omitempty"`
ErrorMsg string `json:"error_msg,omitempty"`
}
TaskStreamEventRequest is the JSON body for task stream events
type TeeWriter ¶
type TeeWriter struct {
// contains filtered or unexported fields
}
TeeWriter wraps an io.Writer and also writes to a TelemetryParser
func NewTeeWriter ¶
func NewTeeWriter(original io.Writer, parser *TelemetryParser) *TeeWriter
NewTeeWriter creates a writer that writes to both original and telemetry parser
type TelemetryParser ¶
type TelemetryParser struct {
// contains filtered or unexported fields
}
TelemetryParser captures stdout from a Claude process and broadcasts telemetry updates
func NewTelemetryParser ¶
func NewTelemetryParser(instanceID string, pid int, wsServer *websocket.Server) *TelemetryParser
NewTelemetryParser creates a new telemetry parser for a process
func (*TelemetryParser) GetTelemetry ¶
func (t *TelemetryParser) GetTelemetry() (turns, tokensIn, tokensOut int, cost float64, status string)
GetTelemetry returns current accumulated telemetry
func (*TelemetryParser) MarkComplete ¶
func (t *TelemetryParser) MarkComplete(exitErr error)
MarkComplete marks the process as complete with final status
func (*TelemetryParser) PipeReader ¶
func (t *TelemetryParser) PipeReader(r io.Reader) io.Reader
PipeReader creates a pipe that captures output for telemetry while still allowing normal reading
type TelemetryReportRequest ¶
type TelemetryReportRequest struct {
PID int `json:"pid"` // Process ID (required)
InstanceID string `json:"instance_id"` // Optional friendly name
Turns int `json:"turns"`
TokensIn int `json:"tokens_in"`
TokensOut int `json:"tokens_out"`
Cost float64 `json:"cost"`
Status string `json:"status"` // running, completed, error
}
TelemetryReportRequest is the JSON body for POST /api/telemetry
type ThreadStatistics ¶
type ThreadStatistics struct {
Total int `json:"total"`
ByStatus map[string]int `json:"by_status"`
ByWorkspace map[string]int `json:"by_workspace"`
}
ThreadStatistics provides thread-level statistics
type TokenDistributionBucket ¶
type TokenDistributionBucket struct {
Label string `json:"label"` // e.g., "0-1K", "1K-5K"
Min int64 `json:"min"` // Minimum tokens (inclusive)
Max int64 `json:"max"` // Maximum tokens (exclusive, -1 for no limit)
TaskCount int `json:"task_count"` // Number of tasks in bucket
SpanCount int `json:"span_count"` // Number of spans in bucket
TotalCost float64 `json:"total_cost"` // Total cost in bucket
}
TokenDistributionBucket represents a bucket in the token histogram
type TokenDistributionResponse ¶
type TokenDistributionResponse struct {
Buckets []TokenDistributionBucket `json:"buckets"`
TotalTasks int `json:"total_tasks"`
TotalCost float64 `json:"total_cost"`
CliCommand string `json:"cli_command"`
}
TokenDistributionResponse is the response for GET /api/controlplane/token-distribution
type TopologyAgent ¶
type TopologyAgent struct {
ID string `json:"id"`
Label string `json:"label"`
Status string `json:"status"` // idle, busy, blocked, error
TrustScore int `json:"trustScore"`
TaskCount int `json:"taskCount"`
Cost float64 `json:"cost"`
}
TopologyAgent represents an agent in the topology graph
type TopologyEdge ¶
type TopologyEdge struct {
Source string `json:"source"`
Target string `json:"target"`
MessageCount int `json:"messageCount"`
LastActivity string `json:"lastActivity,omitempty"`
}
TopologyEdge represents a connection between agents
type TopologyResponse ¶
type TopologyResponse struct {
Agents []TopologyAgent `json:"agents"`
Edges []TopologyEdge `json:"edges"`
Sinks []TopologySink `json:"sinks"`
}
TopologyResponse is the response for GET /api/controlplane/topology
type TopologySink ¶
type TopologySink struct {
ID string `json:"id"`
Label string `json:"label,omitempty"`
PendingCount int `json:"pendingCount,omitempty"`
}
TopologySink represents a terminal node (approval, main branch)
type UIApproval ¶
type UIApproval struct {
ID string `json:"id"`
ThreadID string `json:"thread_id"` // Maps to task_id
ThreadTitle string `json:"thread_title"` // Task title
InstanceID string `json:"instance_id"` // Agent ID
CreatedAt int64 `json:"created_at"` // Unix timestamp
EffectDeltaJSON string `json:"effect_delta_json"` // Empty for coordinator approvals
Proposal string `json:"proposal"` // Description
Impact string `json:"impact"` // low/medium/high
EstimatedCost float64 `json:"estimated_cost"` // From task if available
Status string `json:"status"` // pending/approved/rejected
ReviewedBy string `json:"reviewed_by,omitempty"`
ReviewedAt *int64 `json:"reviewed_at,omitempty"`
ReviewNotes string `json:"review_notes,omitempty"`
// Multi-channel fields
RequestType string `json:"request_type,omitempty"` // merge/handoff
TaskID string `json:"task_id,omitempty"` // Direct task reference
WorktreePath string `json:"worktree_path,omitempty"`
BranchName string `json:"branch_name,omitempty"`
Workspace string `json:"workspace,omitempty"` // Source workspace (e.g., "/Users/mark/dev/sunholo/stapledons_voyage")
Summary string `json:"summary,omitempty"` // Short summary for display
// Display info for consistent rendering
StatusDisplay *display.StatusDisplay `json:"status_display,omitempty"`
}
UIApproval is the format expected by the frontend Approval interface
type UnifiedEvent ¶
type UnifiedEvent struct {
ID string `json:"id"`
CreatedAt string `json:"created_at"` // ISO8601 timestamp
Type string `json:"message_type"` // e.g., "notification", "handoff", "claude_code_turn"
FromAgent string `json:"from_agent"` // e.g., "eval-suite", "claude-code"
ToInbox string `json:"to_inbox"` // e.g., "user", "coordinator"
Title string `json:"title"` // Display title
TaskID string `json:"task_id"` // For linking to hierarchy/waterfall
Status string `json:"status"` // e.g., "unread", "read"
Payload string `json:"payload,omitempty"` // Message payload (inbox messages only)
CorrelationID string `json:"correlation_id,omitempty"` // For linking related messages
Source string `json:"source"` // "inbox" or "claude_code"
// Claude Code specific fields
CostUSD float64 `json:"cost_usd,omitempty"`
TokensIn int64 `json:"tokens_in,omitempty"`
TokensOut int64 `json:"tokens_out,omitempty"`
TurnCount int `json:"turn_count,omitempty"`
DurationMs int `json:"duration_ms,omitempty"`
Workspace string `json:"workspace,omitempty"` // Working directory for Claude Code events
Model string `json:"model,omitempty"` // AI model used (e.g., "claude-opus-4-5-20251101")
Provider string `json:"provider,omitempty"` // AI provider (e.g., "claude", "gemini")
SourceType string `json:"source_type,omitempty"` // Source type: coordinator, eval, user_session, etc.
Directive string `json:"directive,omitempty"` // Initial user prompt (truncated preview)
DirectiveFull string `json:"directive_full,omitempty"` // Full directive (for detail views)
MetricsSummary string `json:"metrics_summary,omitempty"` // "3 turns • $0.42 • 12.5s"
}
UnifiedEvent represents either an inbox message or a Claude Code event This provides a consistent format for the Event Queue in the dashboard
type UnifiedStatsResponse ¶
type UnifiedStatsResponse struct {
// Observatory metrics (canonical source of truth for telemetry)
Observatory *ObservatoryStats `json:"observatory"`
// Coordinator runtime state (subset of observatory - delegated tasks only)
Coordinator *CoordinatorRuntimeStats `json:"coordinator"`
// Metadata about data sources
Sources DataSources `json:"sources"`
}
UnifiedStatsResponse combines Observatory telemetry with Coordinator runtime state
type UsageTimeSeriesPoint ¶
type UsageTimeSeriesPoint struct {
Bucket string `json:"bucket"` // ISO8601 bucket start
BucketEnd string `json:"bucket_end"` // ISO8601 bucket end
Cost float64 `json:"cost"` // Total cost in bucket
Tokens int64 `json:"tokens"` // Total tokens
TokensIn int64 `json:"tokens_in"` // Total input tokens
TokensOut int64 `json:"tokens_out"` // Total output tokens
Turns int `json:"turns"` // Turn count (api_request spans)
Spans int `json:"spans"` // Total span count
TaskCount int `json:"task_count"` // Distinct tasks
DurationMs int64 `json:"duration_ms"` // Total duration in bucket (ms)
ByDimension map[string]float64 `json:"by_dimension,omitempty"` // Split by dimension
}
UsageTimeSeriesPoint represents aggregated metrics for a time bucket
type UsageTimeSeriesResponse ¶
type UsageTimeSeriesResponse struct {
Points []UsageTimeSeriesPoint `json:"points"`
Interval string `json:"interval"` // hour, day, week
Metric string `json:"metric"` // Primary metric shown
SplitBy string `json:"split_by,omitempty"` // Dimension for split
TotalCost float64 `json:"total_cost"`
CliCommand string `json:"cli_command"`
}
UsageTimeSeriesResponse is the response for GET /api/controlplane/usage-timeseries
type WorkspaceAccessInfo ¶
type WorkspaceAccessInfo struct {
RequestedWorkspace string // The workspace requested in query/header
AccessibleWorkspaces []string // All workspaces the user can access
Role string // User's role in the requested workspace (Viewer/Approver)
IsPublicOnly bool // True if user is unauthenticated (public workspaces only)
}
WorkspaceAccessInfo contains the user's workspace access information for this request.
func GetWorkspaceAccessFromContext ¶
func GetWorkspaceAccessFromContext(r *http.Request) *WorkspaceAccessInfo
GetWorkspaceAccessFromContext retrieves workspace access info from request context.
Source Files
¶
- ailang_bridge.go
- audit.go
- handlers_agents.go
- handlers_analytics.go
- handlers_analytics_types.go
- handlers_analytics_usage.go
- handlers_approvals.go
- handlers_auth.go
- handlers_budget.go
- handlers_chains.go
- handlers_chains_routes.go
- handlers_claude_hooks.go
- handlers_claudehistory.go
- handlers_controlplane_exec_hierarchy.go
- handlers_controlplane_heatmap.go
- handlers_controlplane_stats.go
- handlers_controlplane_task_hierarchy.go
- handlers_controlplane_topology.go
- handlers_coordinator.go
- handlers_exec.go
- handlers_hooks.go
- handlers_hooks_test_bench.go
- handlers_inbox.go
- handlers_messages.go
- handlers_monitor.go
- handlers_statistics.go
- handlers_threads.go
- handlers_util.go
- monitor.go
- pubsub_events.go
- response_cache.go
- server.go
- telemetry.go