telemetry

package
v0.9.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: MIT Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultNormalizationVersion = "v1"

Variables

This section is empty.

Functions

func ApplyCanonicalTelemetryViewWithOptions

func ApplyCanonicalTelemetryViewWithOptions(
	ctx context.Context,
	dbPath string,
	snaps map[string]core.UsageSnapshot,
	options ReadModelOptions,
) (map[string]core.UsageSnapshot, error)

ApplyCanonicalTelemetryViewWithOptions hydrates snapshots from canonical telemetry streams. Root quota values come from limit_snapshot events, then canonical usage aggregates are applied.

func BuildDedupKey

func BuildDedupKey(event IngestRequest) string

BuildDedupKey computes a stable event fingerprint with priority for tool_call_id > message_id > turn_id > fallback fingerprint.

func DefaultDBPath

func DefaultDBPath() (string, error)

func DefaultHookSpoolDir added in v0.6.2

func DefaultHookSpoolDir() (string, error)

func DefaultSocketPath

func DefaultSocketPath() (string, error)

func DefaultSpoolDir

func DefaultSpoolDir() (string, error)

func DefaultStateDir

func DefaultStateDir() (string, error)

func RunWALCheckpointLoop added in v0.6.10

func RunWALCheckpointLoop(ctx context.Context, db *sql.DB, dbPath string, logFn func(string, string, string))

RunWALCheckpointLoop periodically checkpoints the WAL file to prevent unbounded growth. This is critical because with multiple open connections and continuous reads, SQLite's auto-checkpoint may never find a window to run.

func WALCheckpoint added in v0.6.10

func WALCheckpoint(ctx context.Context, db *sql.DB) error

WALCheckpoint runs a TRUNCATE checkpoint, folding the WAL back into the main database file and truncating the WAL to zero bytes. It is safe to call concurrently — SQLite serialises checkpoint operations internally.

func WALSizeBytes added in v0.6.10

func WALSizeBytes(dbPath string) int64

WALSizeBytes returns the current size of the WAL file for the given DB path. Returns 0 if the file does not exist.

Types

type CanonicalEvent

type CanonicalEvent struct {
	EventID string `json:"event_id"`

	OccurredAt     time.Time `json:"occurred_at"`
	ProviderID     string    `json:"provider_id,omitempty"`
	AgentName      string    `json:"agent_name"`
	AccountID      string    `json:"account_id,omitempty"`
	WorkspaceID    string    `json:"workspace_id,omitempty"`
	SessionID      string    `json:"session_id,omitempty"`
	TurnID         string    `json:"turn_id,omitempty"`
	MessageID      string    `json:"message_id,omitempty"`
	ToolCallID     string    `json:"tool_call_id,omitempty"`
	EventType      EventType `json:"event_type"`
	ModelRaw       string    `json:"model_raw,omitempty"`
	ModelCanonical string    `json:"model_canonical,omitempty"`
	ModelLineageID string    `json:"model_lineage_id,omitempty"`
	core.TokenUsage
	ToolName             string      `json:"tool_name,omitempty"`
	Status               EventStatus `json:"status"`
	DedupKey             string      `json:"dedup_key"`
	RawEventID           string      `json:"raw_event_id"`
	NormalizationVersion string      `json:"normalization_version"`
}

type Collector

type Collector interface {
	Name() string
	Collect(ctx context.Context) ([]IngestRequest, error)
}

type EventStatus

type EventStatus string
const (
	EventStatusOK      EventStatus = "ok"
	EventStatusError   EventStatus = "error"
	EventStatusAborted EventStatus = "aborted"
	EventStatusUnknown EventStatus = "unknown"
)

type EventType

type EventType string
const (
	EventTypeTurnCompleted   EventType = "turn_completed"
	EventTypeMessageUsage    EventType = "message_usage"
	EventTypeToolUsage       EventType = "tool_usage"
	EventTypeRawEnvelope     EventType = "raw_envelope"
	EventTypeLimitSnapshot   EventType = "limit_snapshot"
	EventTypeReconcileAdjust EventType = "reconcile_adjustment"
)

type FlushResult

type FlushResult struct {
	Processed int
	Ingested  int
	Deduped   int
	Failed    int
}

type IngestRequest

type IngestRequest struct {
	SourceSystem        SourceSystem  `json:"source_system"`
	SourceChannel       SourceChannel `json:"source_channel"`
	SourceSchemaVersion string        `json:"source_schema_version"`
	OccurredAt          time.Time     `json:"occurred_at"`
	WorkspaceID         string        `json:"workspace_id,omitempty"`
	SessionID           string        `json:"session_id,omitempty"`
	TurnID              string        `json:"turn_id,omitempty"`
	MessageID           string        `json:"message_id,omitempty"`
	ToolCallID          string        `json:"tool_call_id,omitempty"`
	ProviderID          string        `json:"provider_id,omitempty"`
	AccountID           string        `json:"account_id,omitempty"`

	AgentName      string    `json:"agent_name,omitempty"`
	EventType      EventType `json:"event_type,omitempty"`
	ModelRaw       string    `json:"model_raw,omitempty"`
	ModelCanonical string    `json:"model_canonical,omitempty"`
	ModelLineageID string    `json:"model_lineage_id,omitempty"`
	core.TokenUsage
	ToolName             string      `json:"tool_name,omitempty"`
	Status               EventStatus `json:"status,omitempty"`
	NormalizationVersion string      `json:"normalization_version,omitempty"`
	Payload              any         `json:"payload,omitempty"`
}

IngestRequest is the normalized contract used by local adapters and workers before writing to the telemetry store.

func BuildLimitSnapshotRequests

func BuildLimitSnapshotRequests(snaps map[string]core.UsageSnapshot) []IngestRequest

BuildLimitSnapshotRequests turns provider fetch snapshots into normalized telemetry events. This makes provider quota usage part of the same canonical stream.

func ParseSourceHookPayload

func ParseSourceHookPayload(
	source shared.TelemetrySource,
	raw []byte,
	options shared.TelemetryCollectOptions,
	accountOverride string,
) ([]IngestRequest, error)

type IngestResult

type IngestResult struct {
	Status     string `json:"status"`
	Deduped    bool   `json:"deduped"`
	EventID    string `json:"event_id"`
	RawEventID string `json:"raw_event_id"`
}

type PendingRecord

type PendingRecord struct {
	Path   string
	Record SpoolRecord
}

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(store *Store, spool *Spool) *Pipeline

func (*Pipeline) EnqueueRequests

func (p *Pipeline) EnqueueRequests(reqs []IngestRequest) (int, error)

func (*Pipeline) Flush

func (p *Pipeline) Flush(ctx context.Context, limit int) (FlushResult, error)

type QuotaSnapshotIngestor

type QuotaSnapshotIngestor struct {
	// contains filtered or unexported fields
}

func NewQuotaSnapshotIngestor

func NewQuotaSnapshotIngestor(store *Store) *QuotaSnapshotIngestor

func (*QuotaSnapshotIngestor) Ingest

type ReadModelOptions

type ReadModelOptions struct {
	ProviderLinks map[string]string
	Since         time.Time
	TodaySince    time.Time
	TimeWindow    core.TimeWindow
}

type SourceChannel

type SourceChannel string
const (
	SourceChannelHook   SourceChannel = "hook"
	SourceChannelSSE    SourceChannel = "sse"
	SourceChannelJSONL  SourceChannel = "jsonl"
	SourceChannelAPI    SourceChannel = "api"
	SourceChannelSQLite SourceChannel = "sqlite"
)

type SourceCollector

type SourceCollector struct {
	Source          shared.TelemetrySource
	Options         shared.TelemetryCollectOptions
	AccountOverride string
}

func NewSourceCollector

func NewSourceCollector(
	source shared.TelemetrySource,
	options shared.TelemetryCollectOptions,
	accountOverride string,
) *SourceCollector

func (*SourceCollector) Collect

func (c *SourceCollector) Collect(ctx context.Context) ([]IngestRequest, error)

func (*SourceCollector) Name

func (c *SourceCollector) Name() string

type SourceSystem

type SourceSystem string
const (
	SourceSystemPoller SourceSystem = "provider_poller"
)

type Spool

type Spool struct {
	// contains filtered or unexported fields
}

func NewSpool

func NewSpool(dir string) *Spool

func (*Spool) Ack

func (s *Spool) Ack(path string) error

func (*Spool) Append

func (s *Spool) Append(record SpoolRecord) (string, error)

func (*Spool) Cleanup

func (s *Spool) Cleanup(policy SpoolCleanupPolicy) (SpoolCleanupResult, error)

func (*Spool) MarkFailed

func (s *Spool) MarkFailed(path, lastError string) error

func (*Spool) ReadOldest

func (s *Spool) ReadOldest(limit int) ([]PendingRecord, error)

type SpoolCleanupPolicy

type SpoolCleanupPolicy struct {
	MaxAge   time.Duration
	MaxFiles int
	MaxBytes int64
}

type SpoolCleanupResult

type SpoolCleanupResult struct {
	RemovedFiles   int
	RemovedBytes   int64
	RemainingFiles int
	RemainingBytes int64
}

type SpoolRecord

type SpoolRecord struct {
	SpoolID       string          `json:"spool_id"`
	CreatedAt     time.Time       `json:"created_at"`
	SourceSystem  SourceSystem    `json:"source_system"`
	SourceChannel SourceChannel   `json:"source_channel"`
	Payload       json.RawMessage `json:"payload"`
	Attempt       int             `json:"attempt"`
	LastError     string          `json:"last_error,omitempty"`
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(db *sql.DB) *Store

func OpenStore

func OpenStore(path string) (*Store, error)

func (*Store) Analyze added in v0.8.0

func (s *Store) Analyze(ctx context.Context) error

Analyze updates SQLite's query planner statistics for all tables and indexes.

func (*Store) Close

func (s *Store) Close() error

func (*Store) DB added in v0.6.10

func (s *Store) DB() *sql.DB

DB returns the underlying database handle for operations that need direct access (e.g. WAL checkpointing).

func (*Store) Ingest

func (s *Store) Ingest(ctx context.Context, req IngestRequest) (IngestResult, error)

func (*Store) Init

func (s *Store) Init(ctx context.Context) error

func (*Store) PruneOldEvents added in v0.5.2

func (s *Store) PruneOldEvents(ctx context.Context, retentionDays int) (int64, error)

PruneOldEvents deletes usage_events older than retentionDays and returns the count deleted.

func (*Store) PruneOrphanRawEvents

func (s *Store) PruneOrphanRawEvents(ctx context.Context, limit int) (int64, error)

func (*Store) PruneRawEventPayloads added in v0.6.5

func (s *Store) PruneRawEventPayloads(ctx context.Context, retentionHours int, limit int) (int64, error)

PruneRawEventPayloads clears source_payload from old raw events to reclaim disk space. All useful data has already been extracted into usage_events. Keeps payloads for events newer than retentionHours.

func (*Store) RunMigrations added in v0.6.5

func (s *Store) RunMigrations(ctx context.Context) error

RunMigrations runs one-shot data repair migrations. Called at daemon startup.

func (*Store) Vacuum added in v0.8.0

func (s *Store) Vacuum(ctx context.Context) error

Vacuum reclaims disk space from deleted rows. Should be called after large batch deletions (e.g. retention pruning). This can be slow on large databases.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL