Documentation
¶
Index ¶
- Constants
- func ApplyCanonicalTelemetryViewWithOptions(ctx context.Context, dbPath string, snaps map[string]core.UsageSnapshot, ...) (map[string]core.UsageSnapshot, error)
- func BuildDedupKey(event IngestRequest) string
- func DefaultDBPath() (string, error)
- func DefaultHookSpoolDir() (string, error)
- func DefaultSocketPath() (string, error)
- func DefaultSpoolDir() (string, error)
- func DefaultStateDir() (string, error)
- type CanonicalEvent
- type Collector
- type EventStatus
- type EventType
- type FlushResult
- type IngestRequest
- type IngestResult
- type PendingRecord
- type Pipeline
- type QuotaSnapshotIngestor
- type ReadModelOptions
- type SourceChannel
- type SourceCollector
- type SourceSystem
- type Spool
- type SpoolCleanupPolicy
- type SpoolCleanupResult
- type SpoolRecord
- type Store
- func (s *Store) Close() error
- func (s *Store) Ingest(ctx context.Context, req IngestRequest) (IngestResult, error)
- func (s *Store) Init(ctx context.Context) error
- func (s *Store) PruneOldEvents(ctx context.Context, retentionDays int) (int64, error)
- func (s *Store) PruneOrphanRawEvents(ctx context.Context, limit int) (int64, error)
Constants ¶
View Source
const DefaultNormalizationVersion = "v1"
Variables ¶
This section is empty.
Functions ¶
func ApplyCanonicalTelemetryViewWithOptions ¶
func ApplyCanonicalTelemetryViewWithOptions( ctx context.Context, dbPath string, snaps map[string]core.UsageSnapshot, options ReadModelOptions, ) (map[string]core.UsageSnapshot, error)
ApplyCanonicalTelemetryViewWithOptions hydrates snapshots from canonical telemetry streams. Root quota values come from limit_snapshot events, then canonical usage aggregates are applied.
func BuildDedupKey ¶
func BuildDedupKey(event IngestRequest) string
BuildDedupKey computes a stable event fingerprint. Identifiers are used in priority order: tool_call_id, then message_id, then turn_id, and finally a fallback fingerprint.
func DefaultDBPath ¶
func DefaultHookSpoolDir ¶ added in v0.6.2
func DefaultSocketPath ¶
func DefaultSpoolDir ¶
func DefaultStateDir ¶
Types ¶
type CanonicalEvent ¶
type CanonicalEvent struct {
EventID string `json:"event_id"`
OccurredAt time.Time `json:"occurred_at"`
ProviderID string `json:"provider_id,omitempty"`
AgentName string `json:"agent_name"`
AccountID string `json:"account_id,omitempty"`
WorkspaceID string `json:"workspace_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
TurnID string `json:"turn_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
EventType EventType `json:"event_type"`
ModelRaw string `json:"model_raw,omitempty"`
ModelCanonical string `json:"model_canonical,omitempty"`
ModelLineageID string `json:"model_lineage_id,omitempty"`
InputTokens *int64 `json:"input_tokens,omitempty"`
OutputTokens *int64 `json:"output_tokens,omitempty"`
ReasoningTokens *int64 `json:"reasoning_tokens,omitempty"`
CacheReadTokens *int64 `json:"cache_read_tokens,omitempty"`
CacheWriteTokens *int64 `json:"cache_write_tokens,omitempty"`
TotalTokens *int64 `json:"total_tokens,omitempty"`
CostUSD *float64 `json:"cost_usd,omitempty"`
Requests *int64 `json:"requests,omitempty"`
ToolName string `json:"tool_name,omitempty"`
Status EventStatus `json:"status"`
DedupKey string `json:"dedup_key"`
RawEventID string `json:"raw_event_id"`
NormalizationVersion string `json:"normalization_version"`
}
type Collector ¶
type Collector interface {
Name() string
Collect(ctx context.Context) ([]IngestRequest, error)
}
type EventStatus ¶
type EventStatus string
const ( EventStatusOK EventStatus = "ok" EventStatusError EventStatus = "error" EventStatusAborted EventStatus = "aborted" EventStatusUnknown EventStatus = "unknown" )
type EventType ¶
type EventType string
const ( EventTypeTurnCompleted EventType = "turn_completed" EventTypeMessageUsage EventType = "message_usage" EventTypeToolUsage EventType = "tool_usage" EventTypeRawEnvelope EventType = "raw_envelope" EventTypeLimitSnapshot EventType = "limit_snapshot" EventTypeReconcileAdjust EventType = "reconcile_adjustment" )
type IngestRequest ¶
type IngestRequest struct {
SourceSystem SourceSystem `json:"source_system"`
SourceChannel SourceChannel `json:"source_channel"`
SourceSchemaVersion string `json:"source_schema_version"`
OccurredAt time.Time `json:"occurred_at"`
WorkspaceID string `json:"workspace_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
TurnID string `json:"turn_id,omitempty"`
MessageID string `json:"message_id,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
ProviderID string `json:"provider_id,omitempty"`
AccountID string `json:"account_id,omitempty"`
AgentName string `json:"agent_name,omitempty"`
EventType EventType `json:"event_type,omitempty"`
ModelRaw string `json:"model_raw,omitempty"`
ModelCanonical string `json:"model_canonical,omitempty"`
ModelLineageID string `json:"model_lineage_id,omitempty"`
InputTokens *int64 `json:"input_tokens,omitempty"`
OutputTokens *int64 `json:"output_tokens,omitempty"`
ReasoningTokens *int64 `json:"reasoning_tokens,omitempty"`
CacheReadTokens *int64 `json:"cache_read_tokens,omitempty"`
CacheWriteTokens *int64 `json:"cache_write_tokens,omitempty"`
TotalTokens *int64 `json:"total_tokens,omitempty"`
CostUSD *float64 `json:"cost_usd,omitempty"`
Requests *int64 `json:"requests,omitempty"`
ToolName string `json:"tool_name,omitempty"`
Status EventStatus `json:"status,omitempty"`
NormalizationVersion string `json:"normalization_version,omitempty"`
Payload any `json:"payload,omitempty"`
}
IngestRequest is the normalized contract used by local adapters and workers before writing to the telemetry store.
func BuildLimitSnapshotRequests ¶
func BuildLimitSnapshotRequests(snaps map[string]core.UsageSnapshot) []IngestRequest
BuildLimitSnapshotRequests turns provider fetch snapshots into normalized telemetry events. This makes provider quota usage part of the same canonical stream.
func ParseSourceHookPayload ¶
func ParseSourceHookPayload( source shared.TelemetrySource, raw []byte, options shared.TelemetryCollectOptions, accountOverride string, ) ([]IngestRequest, error)
type IngestResult ¶
type PendingRecord ¶
type PendingRecord struct {
Path string
Record SpoolRecord
}
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func NewPipeline ¶
func (*Pipeline) EnqueueRequests ¶
func (p *Pipeline) EnqueueRequests(reqs []IngestRequest) (int, error)
type QuotaSnapshotIngestor ¶
type QuotaSnapshotIngestor struct {
// contains filtered or unexported fields
}
func NewQuotaSnapshotIngestor ¶
func NewQuotaSnapshotIngestor(store *Store) *QuotaSnapshotIngestor
func (*QuotaSnapshotIngestor) Ingest ¶
func (i *QuotaSnapshotIngestor) Ingest(ctx context.Context, snaps map[string]core.UsageSnapshot) error
type ReadModelOptions ¶
type SourceChannel ¶
type SourceChannel string
const ( SourceChannelHook SourceChannel = "hook" SourceChannelSSE SourceChannel = "sse" SourceChannelJSONL SourceChannel = "jsonl" SourceChannelAPI SourceChannel = "api" SourceChannelSQLite SourceChannel = "sqlite" )
type SourceCollector ¶
type SourceCollector struct {
Source shared.TelemetrySource
Options shared.TelemetryCollectOptions
AccountOverride string
}
func NewSourceCollector ¶
func NewSourceCollector( source shared.TelemetrySource, options shared.TelemetryCollectOptions, accountOverride string, ) *SourceCollector
func (*SourceCollector) Collect ¶
func (c *SourceCollector) Collect(ctx context.Context) ([]IngestRequest, error)
func (*SourceCollector) Name ¶
func (c *SourceCollector) Name() string
type SourceSystem ¶
type SourceSystem string
const (
SourceSystemPoller SourceSystem = "provider_poller"
)
type Spool ¶
type Spool struct {
// contains filtered or unexported fields
}
func (*Spool) Cleanup ¶
func (s *Spool) Cleanup(policy SpoolCleanupPolicy) (SpoolCleanupResult, error)
func (*Spool) MarkFailed ¶
func (*Spool) ReadOldest ¶
func (s *Spool) ReadOldest(limit int) ([]PendingRecord, error)
type SpoolCleanupPolicy ¶
type SpoolCleanupResult ¶
type SpoolRecord ¶
type SpoolRecord struct {
SpoolID string `json:"spool_id"`
CreatedAt time.Time `json:"created_at"`
SourceSystem SourceSystem `json:"source_system"`
SourceChannel SourceChannel `json:"source_channel"`
Payload json.RawMessage `json:"payload"`
Attempt int `json:"attempt"`
LastError string `json:"last_error,omitempty"`
}
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
func (*Store) Ingest ¶
func (s *Store) Ingest(ctx context.Context, req IngestRequest) (IngestResult, error)
func (*Store) PruneOldEvents ¶ added in v0.5.2
PruneOldEvents deletes usage_events older than retentionDays and returns the count deleted.
Click to show internal directories.
Click to hide internal directories.