Documentation
¶
Index ¶
- Constants
- func ApplyCanonicalTelemetryViewWithOptions(ctx context.Context, dbPath string, snaps map[string]core.UsageSnapshot, ...) (map[string]core.UsageSnapshot, error)
- func BuildDedupKey(event IngestRequest) string
- func DefaultDBPath() (string, error)
- func DefaultHookSpoolDir() (string, error)
- func DefaultSocketPath() (string, error)
- func DefaultSpoolDir() (string, error)
- func DefaultStateDir() (string, error)
- func RunWALCheckpointLoop(ctx context.Context, db *sql.DB, dbPath string, ...)
- func WALCheckpoint(ctx context.Context, db *sql.DB) error
- func WALSizeBytes(dbPath string) int64
- type CanonicalEvent
- type Collector
- type EventStatus
- type EventType
- type FlushResult
- type IngestRequest
- type IngestResult
- type PendingRecord
- type Pipeline
- type QuotaSnapshotIngestor
- type ReadModelOptions
- type SourceChannel
- type SourceCollector
- type SourceSystem
- type Spool
- type SpoolCleanupPolicy
- type SpoolCleanupResult
- type SpoolRecord
- type Store
- func (s *Store) Analyze(ctx context.Context) error
- func (s *Store) Close() error
- func (s *Store) DB() *sql.DB
- func (s *Store) Ingest(ctx context.Context, req IngestRequest) (IngestResult, error)
- func (s *Store) Init(ctx context.Context) error
- func (s *Store) PruneOldEvents(ctx context.Context, retentionDays int) (int64, error)
- func (s *Store) PruneOrphanRawEvents(ctx context.Context, limit int) (int64, error)
- func (s *Store) PruneRawEventPayloads(ctx context.Context, retentionHours int, limit int) (int64, error)
- func (s *Store) RunMigrations(ctx context.Context) error
- func (s *Store) Vacuum(ctx context.Context) error
Constants ¶
const DefaultNormalizationVersion = "v1"
Variables ¶
This section is empty.
Functions ¶
func ApplyCanonicalTelemetryViewWithOptions ¶
func ApplyCanonicalTelemetryViewWithOptions( ctx context.Context, dbPath string, snaps map[string]core.UsageSnapshot, options ReadModelOptions, ) (map[string]core.UsageSnapshot, error)
ApplyCanonicalTelemetryView hydrates snapshots from canonical telemetry streams. Root quota values come from limit_snapshot events, then canonical usage aggregates are applied.
func BuildDedupKey ¶
func BuildDedupKey(event IngestRequest) string
BuildDedupKey computes a stable event fingerprint with priority for tool_call_id > message_id > turn_id > fallback fingerprint.
func DefaultDBPath ¶
func DefaultHookSpoolDir ¶ added in v0.6.2
func DefaultSocketPath ¶
func DefaultSpoolDir ¶
func DefaultStateDir ¶
func RunWALCheckpointLoop ¶ added in v0.6.10
func RunWALCheckpointLoop(ctx context.Context, db *sql.DB, dbPath string, logFn func(string, string, string))
RunWALCheckpointLoop periodically checkpoints the WAL file to prevent unbounded growth. This is critical because with multiple open connections and continuous reads, SQLite's auto-checkpoint may never find a window to run.
func WALCheckpoint ¶ added in v0.6.10
WALCheckpoint runs a TRUNCATE checkpoint, folding the WAL back into the main database file and truncating the WAL to zero bytes. It is safe to call concurrently — SQLite serialises checkpoint operations internally.
func WALSizeBytes ¶ added in v0.6.10
WALSizeBytes returns the current size of the WAL file for the given DB path. Returns 0 if the file does not exist.
Types ¶
type CanonicalEvent ¶
type CanonicalEvent struct {
EventID string `json:"event_id"`
OccurredAt time.Time `json:"occurred_at"`
ProviderID string `json:"provider_id,omitempty"`
AgentName string `json:"agent_name"`
AccountID string `json:"account_id,omitempty"`
WorkspaceID string `json:"workspace_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
TurnID string `json:"turn_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
EventType EventType `json:"event_type"`
ModelRaw string `json:"model_raw,omitempty"`
ModelCanonical string `json:"model_canonical,omitempty"`
ModelLineageID string `json:"model_lineage_id,omitempty"`
core.TokenUsage
ToolName string `json:"tool_name,omitempty"`
Status EventStatus `json:"status"`
DedupKey string `json:"dedup_key"`
RawEventID string `json:"raw_event_id"`
NormalizationVersion string `json:"normalization_version"`
}
type Collector ¶
type Collector interface {
Name() string
Collect(ctx context.Context) ([]IngestRequest, error)
}
type EventStatus ¶
type EventStatus string
const ( EventStatusOK EventStatus = "ok" EventStatusError EventStatus = "error" EventStatusAborted EventStatus = "aborted" EventStatusUnknown EventStatus = "unknown" )
type EventType ¶
type EventType string
const ( EventTypeTurnCompleted EventType = "turn_completed" EventTypeMessageUsage EventType = "message_usage" EventTypeToolUsage EventType = "tool_usage" EventTypeRawEnvelope EventType = "raw_envelope" EventTypeLimitSnapshot EventType = "limit_snapshot" EventTypeReconcileAdjust EventType = "reconcile_adjustment" )
type IngestRequest ¶
type IngestRequest struct {
SourceSystem SourceSystem `json:"source_system"`
SourceChannel SourceChannel `json:"source_channel"`
SourceSchemaVersion string `json:"source_schema_version"`
OccurredAt time.Time `json:"occurred_at"`
WorkspaceID string `json:"workspace_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
TurnID string `json:"turn_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
ProviderID string `json:"provider_id,omitempty"`
AccountID string `json:"account_id,omitempty"`
AgentName string `json:"agent_name,omitempty"`
EventType EventType `json:"event_type,omitempty"`
ModelRaw string `json:"model_raw,omitempty"`
ModelCanonical string `json:"model_canonical,omitempty"`
ModelLineageID string `json:"model_lineage_id,omitempty"`
core.TokenUsage
ToolName string `json:"tool_name,omitempty"`
Status EventStatus `json:"status,omitempty"`
NormalizationVersion string `json:"normalization_version,omitempty"`
Payload any `json:"payload,omitempty"`
}
IngestRequest is the normalized contract used by local adapters and workers before writing to the telemetry store.
func BuildLimitSnapshotRequests ¶
func BuildLimitSnapshotRequests(snaps map[string]core.UsageSnapshot) []IngestRequest
BuildLimitSnapshotRequests turns provider fetch snapshots into normalized telemetry events. This makes provider quota usage part of the same canonical stream.
func ParseSourceHookPayload ¶
func ParseSourceHookPayload( source shared.TelemetrySource, raw []byte, options shared.TelemetryCollectOptions, accountOverride string, ) ([]IngestRequest, error)
type IngestResult ¶
type PendingRecord ¶
type PendingRecord struct {
Path string
Record SpoolRecord
}
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func (*Pipeline) EnqueueRequests ¶
func (p *Pipeline) EnqueueRequests(reqs []IngestRequest) (int, error)
type QuotaSnapshotIngestor ¶
type QuotaSnapshotIngestor struct {
// contains filtered or unexported fields
}
func NewQuotaSnapshotIngestor ¶
func NewQuotaSnapshotIngestor(store *Store) *QuotaSnapshotIngestor
func (*QuotaSnapshotIngestor) Ingest ¶
func (i *QuotaSnapshotIngestor) Ingest(ctx context.Context, snaps map[string]core.UsageSnapshot) error
type ReadModelOptions ¶
type SourceChannel ¶
type SourceChannel string
const ( SourceChannelHook SourceChannel = "hook" SourceChannelSSE SourceChannel = "sse" SourceChannelJSONL SourceChannel = "jsonl" SourceChannelAPI SourceChannel = "api" SourceChannelSQLite SourceChannel = "sqlite" )
type SourceCollector ¶
type SourceCollector struct {
Source shared.TelemetrySource
Options shared.TelemetryCollectOptions
AccountOverride string
}
func NewSourceCollector ¶
func NewSourceCollector( source shared.TelemetrySource, options shared.TelemetryCollectOptions, accountOverride string, ) *SourceCollector
func (*SourceCollector) Collect ¶
func (c *SourceCollector) Collect(ctx context.Context) ([]IngestRequest, error)
func (*SourceCollector) Name ¶
func (c *SourceCollector) Name() string
type SourceSystem ¶
type SourceSystem string
const (
SourceSystemPoller SourceSystem = "provider_poller"
)
type Spool ¶
type Spool struct {
// contains filtered or unexported fields
}
func (*Spool) Cleanup ¶
func (s *Spool) Cleanup(policy SpoolCleanupPolicy) (SpoolCleanupResult, error)
func (*Spool) MarkFailed ¶
func (*Spool) ReadOldest ¶
func (s *Spool) ReadOldest(limit int) ([]PendingRecord, error)
type SpoolCleanupPolicy ¶
type SpoolCleanupResult ¶
type SpoolRecord ¶
type SpoolRecord struct {
SpoolID string `json:"spool_id"`
CreatedAt time.Time `json:"created_at"`
SourceSystem SourceSystem `json:"source_system"`
SourceChannel SourceChannel `json:"source_channel"`
Payload json.RawMessage `json:"payload"`
Attempt int `json:"attempt"`
LastError string `json:"last_error,omitempty"`
}
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) Analyze ¶ added in v0.8.0
Analyze updates SQLite's query planner statistics for all tables and indexes.
func (*Store) DB ¶ added in v0.6.10
DB returns the underlying database handle for operations that need direct access (e.g. WAL checkpointing).
func (*Store) Ingest ¶
func (s *Store) Ingest(ctx context.Context, req IngestRequest) (IngestResult, error)
func (*Store) PruneOldEvents ¶ added in v0.5.2
PruneOldEvents deletes usage_events older than retentionDays and returns the count deleted.
func (*Store) PruneOrphanRawEvents ¶
func (*Store) PruneRawEventPayloads ¶ added in v0.6.5
func (s *Store) PruneRawEventPayloads(ctx context.Context, retentionHours int, limit int) (int64, error)
PruneRawEventPayloads clears source_payload from old raw events to reclaim disk space. All useful data has already been extracted into usage_events. Keeps payloads for events newer than retentionHours.
func (*Store) RunMigrations ¶ added in v0.6.5
RunMigrations runs one-shot data repair migrations. Called at daemon startup.
Source Files
¶
- collector.go
- collector_source.go
- dedup.go
- hook_source.go
- paths.go
- pipeline.go
- provider_event_mapper.go
- provider_links.go
- quota_stream.go
- read_model.go
- spool.go
- sqlite.go
- store.go
- types.go
- usage_view.go
- usage_view_aggregate.go
- usage_view_helpers.go
- usage_view_languages.go
- usage_view_materialize.go
- usage_view_projection.go
- usage_view_queries.go
- usage_view_query_aggregates.go
- usage_view_query_daily.go