telemetry

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultNormalizationVersion = "v1"

Variables

This section is empty.

Functions

func ApplyCanonicalTelemetryViewWithOptions

func ApplyCanonicalTelemetryViewWithOptions(
	ctx context.Context,
	dbPath string,
	snaps map[string]core.UsageSnapshot,
	options ReadModelOptions,
) (map[string]core.UsageSnapshot, error)

ApplyCanonicalTelemetryView hydrates snapshots from canonical telemetry streams. Root quota values come from limit_snapshot events, then canonical usage aggregates are applied.

func BuildDedupKey

func BuildDedupKey(event IngestRequest) string

BuildDedupKey computes a stable event fingerprint with priority for tool_call_id > message_id > turn_id > fallback fingerprint.

func DefaultDBPath

func DefaultDBPath() (string, error)

func DefaultSocketPath

func DefaultSocketPath() (string, error)

func DefaultSpoolDir

func DefaultSpoolDir() (string, error)

func DefaultStateDir

func DefaultStateDir() (string, error)

Types

type CanonicalEvent

type CanonicalEvent struct {
	EventID string `json:"event_id"`

	OccurredAt           time.Time   `json:"occurred_at"`
	ProviderID           string      `json:"provider_id,omitempty"`
	AgentName            string      `json:"agent_name"`
	AccountID            string      `json:"account_id,omitempty"`
	WorkspaceID          string      `json:"workspace_id,omitempty"`
	SessionID            string      `json:"session_id,omitempty"`
	TurnID               string      `json:"turn_id,omitempty"`
	MessageID            string      `json:"message_id,omitempty"`
	ToolCallID           string      `json:"tool_call_id,omitempty"`
	EventType            EventType   `json:"event_type"`
	ModelRaw             string      `json:"model_raw,omitempty"`
	ModelCanonical       string      `json:"model_canonical,omitempty"`
	ModelLineageID       string      `json:"model_lineage_id,omitempty"`
	InputTokens          *int64      `json:"input_tokens,omitempty"`
	OutputTokens         *int64      `json:"output_tokens,omitempty"`
	ReasoningTokens      *int64      `json:"reasoning_tokens,omitempty"`
	CacheReadTokens      *int64      `json:"cache_read_tokens,omitempty"`
	CacheWriteTokens     *int64      `json:"cache_write_tokens,omitempty"`
	TotalTokens          *int64      `json:"total_tokens,omitempty"`
	CostUSD              *float64    `json:"cost_usd,omitempty"`
	Requests             *int64      `json:"requests,omitempty"`
	ToolName             string      `json:"tool_name,omitempty"`
	Status               EventStatus `json:"status"`
	DedupKey             string      `json:"dedup_key"`
	RawEventID           string      `json:"raw_event_id"`
	NormalizationVersion string      `json:"normalization_version"`
}

type Collector

type Collector interface {
	Name() string
	Collect(ctx context.Context) ([]IngestRequest, error)
}

type EventStatus

type EventStatus string
const (
	EventStatusOK      EventStatus = "ok"
	EventStatusError   EventStatus = "error"
	EventStatusAborted EventStatus = "aborted"
	EventStatusUnknown EventStatus = "unknown"
)

type EventType

type EventType string
const (
	EventTypeTurnCompleted   EventType = "turn_completed"
	EventTypeMessageUsage    EventType = "message_usage"
	EventTypeToolUsage       EventType = "tool_usage"
	EventTypeRawEnvelope     EventType = "raw_envelope"
	EventTypeLimitSnapshot   EventType = "limit_snapshot"
	EventTypeReconcileAdjust EventType = "reconcile_adjustment"
)

type FlushResult

type FlushResult struct {
	Processed int
	Ingested  int
	Deduped   int
	Failed    int
}

type IngestRequest

type IngestRequest struct {
	SourceSystem        SourceSystem  `json:"source_system"`
	SourceChannel       SourceChannel `json:"source_channel"`
	SourceSchemaVersion string        `json:"source_schema_version"`
	OccurredAt          time.Time     `json:"occurred_at"`
	WorkspaceID         string        `json:"workspace_id,omitempty"`
	SessionID           string        `json:"session_id,omitempty"`
	TurnID              string        `json:"turn_id,omitempty"`
	MessageID           string        `json:"message_id,omitempty"`
	ToolCallID          string        `json:"tool_call_id,omitempty"`
	ProviderID          string        `json:"provider_id,omitempty"`
	AccountID           string        `json:"account_id,omitempty"`

	AgentName            string      `json:"agent_name,omitempty"`
	EventType            EventType   `json:"event_type,omitempty"`
	ModelRaw             string      `json:"model_raw,omitempty"`
	ModelCanonical       string      `json:"model_canonical,omitempty"`
	ModelLineageID       string      `json:"model_lineage_id,omitempty"`
	InputTokens          *int64      `json:"input_tokens,omitempty"`
	OutputTokens         *int64      `json:"output_tokens,omitempty"`
	ReasoningTokens      *int64      `json:"reasoning_tokens,omitempty"`
	CacheReadTokens      *int64      `json:"cache_read_tokens,omitempty"`
	CacheWriteTokens     *int64      `json:"cache_write_tokens,omitempty"`
	TotalTokens          *int64      `json:"total_tokens,omitempty"`
	CostUSD              *float64    `json:"cost_usd,omitempty"`
	Requests             *int64      `json:"requests,omitempty"`
	ToolName             string      `json:"tool_name,omitempty"`
	Status               EventStatus `json:"status,omitempty"`
	NormalizationVersion string      `json:"normalization_version,omitempty"`
	Payload              any         `json:"payload,omitempty"`
}

IngestRequest is the normalized contract used by local adapters and workers before writing to the telemetry store.

func BuildLimitSnapshotRequests

func BuildLimitSnapshotRequests(snaps map[string]core.UsageSnapshot) []IngestRequest

BuildLimitSnapshotRequests turns provider fetch snapshots into normalized telemetry events. This makes provider quota usage part of the same canonical stream.

func ParseSourceHookPayload

func ParseSourceHookPayload(
	source shared.TelemetrySource,
	raw []byte,
	options shared.TelemetryCollectOptions,
	accountOverride string,
) ([]IngestRequest, error)

type IngestResult

type IngestResult struct {
	Status     string `json:"status"`
	Deduped    bool   `json:"deduped"`
	EventID    string `json:"event_id"`
	RawEventID string `json:"raw_event_id"`
}

type PendingRecord

type PendingRecord struct {
	Path   string
	Record SpoolRecord
}

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(store *Store, spool *Spool) *Pipeline

func (*Pipeline) EnqueueRequests

func (p *Pipeline) EnqueueRequests(reqs []IngestRequest) (int, error)

func (*Pipeline) Flush

func (p *Pipeline) Flush(ctx context.Context, limit int) (FlushResult, error)

type QuotaSnapshotIngestor

type QuotaSnapshotIngestor struct {
	// contains filtered or unexported fields
}

func NewQuotaSnapshotIngestor

func NewQuotaSnapshotIngestor(store *Store) *QuotaSnapshotIngestor

func (*QuotaSnapshotIngestor) Ingest

type ReadModelOptions

type ReadModelOptions struct {
	ProviderLinks map[string]string
}

type SourceChannel

type SourceChannel string
const (
	SourceChannelHook   SourceChannel = "hook"
	SourceChannelSSE    SourceChannel = "sse"
	SourceChannelJSONL  SourceChannel = "jsonl"
	SourceChannelAPI    SourceChannel = "api"
	SourceChannelSQLite SourceChannel = "sqlite"
)

type SourceCollector

type SourceCollector struct {
	Source          shared.TelemetrySource
	Options         shared.TelemetryCollectOptions
	AccountOverride string
}

func NewSourceCollector

func NewSourceCollector(
	source shared.TelemetrySource,
	options shared.TelemetryCollectOptions,
	accountOverride string,
) *SourceCollector

func (*SourceCollector) Collect

func (c *SourceCollector) Collect(ctx context.Context) ([]IngestRequest, error)

func (*SourceCollector) Name

func (c *SourceCollector) Name() string

type SourceSystem

type SourceSystem string
const (
	SourceSystemPoller SourceSystem = "provider_poller"
)

type Spool

type Spool struct {
	// contains filtered or unexported fields
}

func NewSpool

func NewSpool(dir string) *Spool

func (*Spool) Ack

func (s *Spool) Ack(path string) error

func (*Spool) Append

func (s *Spool) Append(record SpoolRecord) (string, error)

func (*Spool) Cleanup

func (s *Spool) Cleanup(policy SpoolCleanupPolicy) (SpoolCleanupResult, error)

func (*Spool) MarkFailed

func (s *Spool) MarkFailed(path, lastError string) error

func (*Spool) ReadOldest

func (s *Spool) ReadOldest(limit int) ([]PendingRecord, error)

type SpoolCleanupPolicy

type SpoolCleanupPolicy struct {
	MaxAge   time.Duration
	MaxFiles int
	MaxBytes int64
}

type SpoolCleanupResult

type SpoolCleanupResult struct {
	RemovedFiles   int
	RemovedBytes   int64
	RemainingFiles int
	RemainingBytes int64
}

type SpoolRecord

type SpoolRecord struct {
	SpoolID       string          `json:"spool_id"`
	CreatedAt     time.Time       `json:"created_at"`
	SourceSystem  SourceSystem    `json:"source_system"`
	SourceChannel SourceChannel   `json:"source_channel"`
	Payload       json.RawMessage `json:"payload"`
	Attempt       int             `json:"attempt"`
	LastError     string          `json:"last_error,omitempty"`
}

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(db *sql.DB) *Store

func OpenStore

func OpenStore(path string) (*Store, error)

func (*Store) Close

func (s *Store) Close() error

func (*Store) Ingest

func (s *Store) Ingest(ctx context.Context, req IngestRequest) (IngestResult, error)

func (*Store) Init

func (s *Store) Init(ctx context.Context) error

func (*Store) PruneOrphanRawEvents

func (s *Store) PruneOrphanRawEvents(ctx context.Context, limit int) (int64, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL