Documentation
¶
Overview ¶
Package trace defines trace models plus storage and async writer components.
Index ¶
- Constants
- Variables
- func ClassifyWriteError(err error) string
- func CoerceInt64(value any) (int64, bool)
- func DecodeMetadataMap(raw string) map[string]any
- func MetadataBool(metadata map[string]any, key string) (bool, bool)
- func MetadataInt64(metadata map[string]any, key string) (int64, bool)
- func MetadataString(metadata map[string]any, key string) string
- func OrderTime(item *Trace) time.Time
- func SortLineageTraces(items []*Trace)
- type AnalyticsFilter
- type CostPoint
- type CostSummary
- type ErrorRateStats
- type KeyStats
- type LatencyStats
- type ModelStats
- type PostgresStore
- func (s *PostgresStore) Close() error
- func (s *PostgresStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
- func (s *PostgresStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
- func (s *PostgresStore) GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)
- func (s *PostgresStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
- func (s *PostgresStore) GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)
- func (s *PostgresStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
- func (s *PostgresStore) GetTrace(ctx context.Context, id string) (*Trace, error)
- func (s *PostgresStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
- func (s *PostgresStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
- func (s *PostgresStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
- func (s *PostgresStore) WriteBatch(ctx context.Context, traces []*Trace) error
- func (s *PostgresStore) WriteTrace(ctx context.Context, trace *Trace) error
- type SQLiteStore
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
- func (s *SQLiteStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
- func (s *SQLiteStore) GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)
- func (s *SQLiteStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
- func (s *SQLiteStore) GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)
- func (s *SQLiteStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
- func (s *SQLiteStore) GetTrace(ctx context.Context, id string) (*Trace, error)
- func (s *SQLiteStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
- func (s *SQLiteStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
- func (s *SQLiteStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
- func (s *SQLiteStore) WriteBatch(ctx context.Context, traces []*Trace) error
- func (s *SQLiteStore) WriteTrace(ctx context.Context, trace *Trace) error
- type Trace
- type TraceFilter
- type TracePipelineDiagnostics
- type TracePipelineDiagnosticsReader
- type TraceResult
- type TraceStore
- type UsagePoint
- type UsageSummary
- type WriteFailure
- type WriteFailureHandler
- type Writer
- func (w *Writer) Enqueue(t *Trace) bool
- func (w *Writer) QueueCap() int
- func (w *Writer) QueueLen() int
- func (w *Writer) SetMetrics(m *WriterMetrics)
- func (w *Writer) SetWriteFailureHandler(handler WriteFailureHandler)
- func (w *Writer) Shutdown(ctx context.Context) error
- func (w *Writer) Start(ctx context.Context)
- func (w *Writer) Stop()
- func (w *Writer) TracePipelineDiagnostics() TracePipelineDiagnostics
- type WriterMetrics
Constants ¶
const (
WriteErrorClassConnection = "connection"
WriteErrorClassTimeout = "timeout"
WriteErrorClassContention = "contention"
WriteErrorClassConstraint = "constraint"
WriteErrorClassUnknown = "unknown"
)
Error class constants for trace write failure classification.
const (
TraceQueuePressureOK = "ok"
TraceQueuePressureElevated = "elevated"
TraceQueuePressureHigh = "high"
TraceQueuePressureSaturated = "saturated"
)
Variables ¶
var ErrInvalidCursor = errors.New("trace cursor is invalid")
var ErrNotFound = errors.New("trace store record not found")
var ErrNotImplemented = errors.New("trace store method not implemented")
Functions ¶
func ClassifyWriteError ¶ added in v1.1.8
func ClassifyWriteError(err error) string
ClassifyWriteError maps a trace write error to one of the defined error classes so operators can alert and dashboard on failure categories rather than opaque Go type names.
func CoerceInt64 ¶ added in v1.1.7
func CoerceInt64(value any) (int64, bool)
CoerceInt64 converts a loosely-typed value to int64, handling float64, float32, int, int64, int32, json.Number, and string representations.
func DecodeMetadataMap ¶ added in v1.1.7
func DecodeMetadataMap(raw string) map[string]any
DecodeMetadataMap decodes a JSON metadata string into a generic map. It returns nil for empty input or JSON parse errors.
func MetadataBool ¶ added in v1.1.7
func MetadataBool(metadata map[string]any, key string) (bool, bool)
MetadataBool extracts the boolean value stored under key in a metadata map, handling native bools and "true"/"false" strings.
func MetadataInt64 ¶ added in v1.1.7
func MetadataInt64(metadata map[string]any, key string) (int64, bool)
MetadataInt64 extracts the int64 value stored under key in a metadata map.
func MetadataString ¶ added in v1.1.7
func MetadataString(metadata map[string]any, key string) string
MetadataString extracts the trimmed string value stored under key in a metadata map.
func OrderTime ¶ added in v1.1.7
func OrderTime(item *Trace) time.Time
OrderTime returns the canonical ordering timestamp for a trace, preferring CreatedAt over Timestamp.
func SortLineageTraces ¶ added in v1.1.7
func SortLineageTraces(items []*Trace)
SortLineageTraces orders traces using lineage checkpoint metadata when present, then falls back to deterministic timestamp/id ordering.
Types ¶
type AnalyticsFilter ¶
type CostSummary ¶
type CostSummary struct {
TotalCostUSD float64
}
type ErrorRateStats ¶ added in v1.1.8
type LatencyStats ¶ added in v1.1.8
type ModelStats ¶
type PostgresStore ¶
type PostgresStore struct {
DSN string
// contains filtered or unexported fields
}
func NewPostgresStore ¶
func NewPostgresStore(dsn string) (*PostgresStore, error)
func (*PostgresStore) Close ¶
func (s *PostgresStore) Close() error
func (*PostgresStore) GetCostSeries ¶
func (s *PostgresStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
func (*PostgresStore) GetCostSummary ¶
func (s *PostgresStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
func (*PostgresStore) GetErrorRateBreakdown ¶ added in v1.1.8
func (s *PostgresStore) GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)
func (*PostgresStore) GetKeyStats ¶
func (s *PostgresStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
func (*PostgresStore) GetLatencyPercentiles ¶ added in v1.1.8
func (s *PostgresStore) GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)
func (*PostgresStore) GetModelStats ¶
func (s *PostgresStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
func (*PostgresStore) GetUsageSeries ¶
func (s *PostgresStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
func (*PostgresStore) GetUsageSummary ¶
func (s *PostgresStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
func (*PostgresStore) QueryTraces ¶
func (s *PostgresStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
func (*PostgresStore) WriteBatch ¶
func (s *PostgresStore) WriteBatch(ctx context.Context, traces []*Trace) error
func (*PostgresStore) WriteTrace ¶
func (s *PostgresStore) WriteTrace(ctx context.Context, trace *Trace) error
type SQLiteStore ¶
type SQLiteStore struct {
Path string
// contains filtered or unexported fields
}
func NewSQLiteStore ¶
func NewSQLiteStore(path string) (*SQLiteStore, error)
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
func (*SQLiteStore) GetCostSeries ¶
func (s *SQLiteStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
func (*SQLiteStore) GetCostSummary ¶
func (s *SQLiteStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
func (*SQLiteStore) GetErrorRateBreakdown ¶ added in v1.1.8
func (s *SQLiteStore) GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)
func (*SQLiteStore) GetKeyStats ¶
func (s *SQLiteStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
func (*SQLiteStore) GetLatencyPercentiles ¶ added in v1.1.8
func (s *SQLiteStore) GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)
func (*SQLiteStore) GetModelStats ¶
func (s *SQLiteStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
func (*SQLiteStore) GetUsageSeries ¶
func (s *SQLiteStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
func (*SQLiteStore) GetUsageSummary ¶
func (s *SQLiteStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
func (*SQLiteStore) QueryTraces ¶
func (s *SQLiteStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
func (*SQLiteStore) WriteBatch ¶
func (s *SQLiteStore) WriteBatch(ctx context.Context, traces []*Trace) error
func (*SQLiteStore) WriteTrace ¶
func (s *SQLiteStore) WriteTrace(ctx context.Context, trace *Trace) error
type Trace ¶
type Trace struct {
ID string
TraceGroupID string
OrgID string
WorkspaceID string
Timestamp time.Time
Provider string
Model string
RequestMethod string
RequestPath string
RequestHeaders string
RequestBody string
ResponseStatus int
ResponseHeaders string
ResponseBody string
InputTokens int
OutputTokens int
TotalTokens int
LatencyMS int64
TimeToFirstTokenMS int64
TimeToFirstTokenUS int64
APIKeyHash string
GatewayKeyID string
EstimatedCostUSD float64
Metadata string
CreatedAt time.Time
}
type TraceFilter ¶
type TracePipelineDiagnostics ¶ added in v1.1.7
type TracePipelineDiagnostics struct {
QueueCapacity int `json:"queue_capacity"`
QueueDepth int `json:"queue_depth"`
QueueDepthHighWatermark int `json:"queue_depth_high_watermark"`
QueueUtilizationPct int `json:"queue_utilization_pct"`
QueueHighWatermarkUtilizationPct int `json:"queue_high_watermark_utilization_pct"`
QueuePressureState string `json:"queue_pressure_state"`
QueueHighWatermarkPressureState string `json:"queue_high_watermark_pressure_state"`
EnqueueAcceptedTotal int64 `json:"enqueue_accepted_total"`
EnqueueDroppedTotal int64 `json:"enqueue_dropped_total"`
WriteDroppedTotal int64 `json:"write_dropped_total"`
TotalDroppedTotal int64 `json:"total_dropped_total"`
LastEnqueueDropAt *time.Time `json:"last_enqueue_drop_at,omitempty"`
LastWriteDropAt *time.Time `json:"last_write_drop_at,omitempty"`
LastWriteDropOperation string `json:"last_write_drop_operation,omitempty"`
WriteFailuresByClass map[string]int64 `json:"write_failures_by_class,omitempty"`
StoreDriver string `json:"store_driver,omitempty"`
}
TracePipelineDiagnostics captures trace pipeline queue pressure and drop signals.
type TracePipelineDiagnosticsReader ¶ added in v1.1.7
type TracePipelineDiagnosticsReader interface {
TracePipelineDiagnostics() TracePipelineDiagnostics
}
TracePipelineDiagnosticsReader exposes runtime queue/drop diagnostics.
type TraceResult ¶
type TraceStore ¶
type TraceStore interface {
WriteTrace(ctx context.Context, trace *Trace) error
WriteBatch(ctx context.Context, traces []*Trace) error
GetTrace(ctx context.Context, id string) (*Trace, error)
QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
GetLatencyPercentiles(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]LatencyStats, error)
GetErrorRateBreakdown(ctx context.Context, filter AnalyticsFilter, groupBy string) ([]ErrorRateStats, error)
}
type UsagePoint ¶
type UsageSummary ¶
type WriteFailure ¶
type WriteFailure struct {
Operation string
BatchSize int
FailedCount int
Err error
ErrorClass string
}
WriteFailure describes trace records that could not be persisted.
type WriteFailureHandler ¶
type WriteFailureHandler func(WriteFailure)
WriteFailureHandler receives asynchronous trace write failure signals.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
func NewWriter ¶
func NewWriter(store TraceStore, bufferSize int) *Writer
func (*Writer) QueueLen ¶ added in v1.1.8
func (w *Writer) QueueLen() int
QueueLen returns the current number of items waiting in the write queue.
func (*Writer) SetMetrics ¶ added in v1.1.8
func (w *Writer) SetMetrics(m *WriterMetrics)
SetMetrics replaces the metric callbacks used by the writer pipeline.
func (*Writer) SetWriteFailureHandler ¶
func (w *Writer) SetWriteFailureHandler(handler WriteFailureHandler)
SetWriteFailureHandler replaces the callback used for dropped trace write signals.
func (*Writer) TracePipelineDiagnostics ¶ added in v1.1.7
func (w *Writer) TracePipelineDiagnostics() TracePipelineDiagnostics
TracePipelineDiagnostics returns a point-in-time snapshot of queue pressure and dropped-trace counters for operator diagnostics.
type WriterMetrics ¶ added in v1.1.8
type WriterMetrics struct {
// OnEnqueue is called each time a trace is successfully placed on the queue.
OnEnqueue func()
// OnDrop is called each time a trace is dropped because the queue is full.
OnDrop func()
// OnFlush is called after each batch is flushed to storage.
OnFlush func(batchSize int, duration time.Duration)
// OnWriteStart is called before each storage write. It returns an end
// function that the writer calls after the write completes (with error or nil).
OnWriteStart func(batchSize int) func(error)
// OnWriteSuccess is called after traces are successfully persisted to storage.
// The count parameter indicates how many traces were written.
OnWriteSuccess func(count int)
}
WriterMetrics holds optional callbacks the Writer invokes at key pipeline points.