trace

package
v1.1.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2026 | License: Apache-2.0 | Imports: 18 | Imported by: 0

Documentation

Overview

Package trace defines trace models plus storage and async writer components.

Index

Constants

View Source
// Queue pressure state labels, ordered here from "ok" to "saturated".
//
// TracePipelineDiagnostics carries two string pressure-state fields
// (QueuePressureState and QueueHighWatermarkPressureState); presumably they
// hold one of these values — TODO confirm against the Writer implementation.
// The utilization thresholds that separate the states are not shown here.
const (
	TraceQueuePressureOK        = "ok"
	TraceQueuePressureElevated  = "elevated"
	TraceQueuePressureHigh      = "high"
	TraceQueuePressureSaturated = "saturated"
)

Variables

View Source
// ErrInvalidCursor is the sentinel error for an invalid trace cursor.
// NOTE(review): presumably returned by QueryTraces when TraceFilter.Cursor
// cannot be decoded — confirm against the store implementations.
var ErrInvalidCursor = errors.New("trace cursor is invalid")
View Source
// ErrNotFound is the sentinel error for a trace store record that does not exist.
// NOTE(review): presumably returned by GetTrace for an unknown id — confirm.
var ErrNotFound = errors.New("trace store record not found")
View Source
// ErrNotImplemented is the sentinel error for a trace store method that is not implemented.
// Which store methods return it is not shown here.
var ErrNotImplemented = errors.New("trace store method not implemented")

Functions

func CoerceInt64 added in v1.1.7

func CoerceInt64(value any) (int64, bool)

CoerceInt64 converts a loosely-typed value to int64, handling float64, float32, int, int64, int32, json.Number, and string representations.

func DecodeMetadataMap added in v1.1.7

func DecodeMetadataMap(raw string) map[string]any

DecodeMetadataMap decodes a JSON metadata string into a generic map. Returns nil for empty input or JSON parse errors.

func MetadataBool added in v1.1.7

func MetadataBool(metadata map[string]any, key string) (bool, bool)

MetadataBool extracts the boolean value stored under key in a metadata map, handling native bools and "true"/"false" strings.

func MetadataInt64 added in v1.1.7

func MetadataInt64(metadata map[string]any, key string) (int64, bool)

MetadataInt64 extracts the int64 value stored under key in a metadata map.

func MetadataString added in v1.1.7

func MetadataString(metadata map[string]any, key string) string

MetadataString extracts the trimmed string value stored under key in a metadata map.

func OrderTime added in v1.1.7

func OrderTime(item *Trace) time.Time

OrderTime returns the canonical ordering timestamp for a trace, preferring CreatedAt over Timestamp.

func SortLineageTraces added in v1.1.7

func SortLineageTraces(items []*Trace)

SortLineageTraces orders traces using lineage checkpoint metadata when present, then falls back to deterministic timestamp/id ordering.

Types

type AnalyticsFilter

// AnalyticsFilter is the filter argument accepted by the TraceStore analytics
// methods: GetUsageSummary, GetUsageSeries, GetCostSummary, GetCostSeries,
// GetModelStats, and GetKeyStats.
//
// NOTE(review): OrgID, WorkspaceID, GatewayKeyID, Provider, and Model share
// names with fields on Trace; presumably each restricts results to traces with
// a matching value and the zero value means "no restriction" — confirm.
type AnalyticsFilter struct {
	OrgID        string
	WorkspaceID  string
	GatewayKeyID string
	Provider     string
	Model        string
	// From and To presumably bound the time window; whether the bounds are
	// inclusive and which Trace timestamp they apply to is not shown — confirm.
	From         time.Time
	To           time.Time
}

type CostPoint

// CostPoint is the element type of the slice returned by GetCostSeries.
//
// NOTE(review): BucketStart and Group presumably correspond to the bucket and
// groupBy arguments of GetCostSeries — confirm accepted values with the stores.
type CostPoint struct {
	BucketStart  time.Time
	Group        string
	TotalCostUSD float64
}

type CostSummary

// CostSummary is the result type returned (by pointer) from GetCostSummary.
type CostSummary struct {
	TotalCostUSD float64
}

type KeyStats

// KeyStats is the element type of the slice returned by GetKeyStats.
//
// NOTE(review): APIKeyHash shares its name with Trace.APIKeyHash, so each
// entry presumably aggregates the traces recorded for one key hash — confirm.
type KeyStats struct {
	APIKeyHash   string
	RequestCount int64
	TotalTokens  int64
	TotalCostUSD float64
	LastActiveAt time.Time
}

type ModelStats

// ModelStats is the element type of the slice returned by GetModelStats.
//
// NOTE(review): AvgLatencyMS and AvgTTFTMS presumably average Trace.LatencyMS
// and Trace.TimeToFirstTokenMS for the model, in milliseconds — confirm.
type ModelStats struct {
	Model        string
	RequestCount int64
	AvgLatencyMS float64
	AvgTTFTMS    float64
	TotalTokens  int64
	TotalCostUSD float64
}

type PostgresStore

// PostgresStore is created by NewPostgresStore. The methods listed for
// *PostgresStore on this page are the ten TraceStore methods plus Close.
type PostgresStore struct {
	// NOTE(review): presumably the dsn passed to NewPostgresStore — confirm.
	DSN string
	// contains filtered or unexported fields
}

func NewPostgresStore

func NewPostgresStore(dsn string) (*PostgresStore, error)

func (*PostgresStore) Close

func (s *PostgresStore) Close() error

func (*PostgresStore) GetCostSeries

func (s *PostgresStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)

func (*PostgresStore) GetCostSummary

func (s *PostgresStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)

func (*PostgresStore) GetKeyStats

func (s *PostgresStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)

func (*PostgresStore) GetModelStats

func (s *PostgresStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)

func (*PostgresStore) GetTrace

func (s *PostgresStore) GetTrace(ctx context.Context, id string) (*Trace, error)

func (*PostgresStore) GetUsageSeries

func (s *PostgresStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)

func (*PostgresStore) GetUsageSummary

func (s *PostgresStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)

func (*PostgresStore) QueryTraces

func (s *PostgresStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)

func (*PostgresStore) WriteBatch

func (s *PostgresStore) WriteBatch(ctx context.Context, traces []*Trace) error

func (*PostgresStore) WriteTrace

func (s *PostgresStore) WriteTrace(ctx context.Context, trace *Trace) error

type SQLiteStore

// SQLiteStore is created by NewSQLiteStore. The methods listed for
// *SQLiteStore on this page are the ten TraceStore methods plus Close.
type SQLiteStore struct {
	// NOTE(review): presumably the path passed to NewSQLiteStore — confirm.
	Path string
	// contains filtered or unexported fields
}

func NewSQLiteStore

func NewSQLiteStore(path string) (*SQLiteStore, error)

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

func (*SQLiteStore) GetCostSeries

func (s *SQLiteStore) GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)

func (*SQLiteStore) GetCostSummary

func (s *SQLiteStore) GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)

func (*SQLiteStore) GetKeyStats

func (s *SQLiteStore) GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)

func (*SQLiteStore) GetModelStats

func (s *SQLiteStore) GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)

func (*SQLiteStore) GetTrace

func (s *SQLiteStore) GetTrace(ctx context.Context, id string) (*Trace, error)

func (*SQLiteStore) GetUsageSeries

func (s *SQLiteStore) GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)

func (*SQLiteStore) GetUsageSummary

func (s *SQLiteStore) GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)

func (*SQLiteStore) QueryTraces

func (s *SQLiteStore) QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)

func (*SQLiteStore) WriteBatch

func (s *SQLiteStore) WriteBatch(ctx context.Context, traces []*Trace) error

func (*SQLiteStore) WriteTrace

func (s *SQLiteStore) WriteTrace(ctx context.Context, trace *Trace) error

type Trace

// Trace is the record type written and read through TraceStore (WriteTrace,
// WriteBatch, GetTrace, QueryTraces) and queued by Writer.Enqueue.
//
// Timestamp and CreatedAt are both time values; OrderTime is documented as
// preferring CreatedAt over Timestamp when ordering traces.
type Trace struct {
	ID                 string
	TraceGroupID       string
	OrgID              string
	WorkspaceID        string
	Timestamp          time.Time
	Provider           string
	Model              string
	RequestMethod      string
	RequestPath        string
	// Headers and bodies are plain strings; their encoding is not shown here.
	RequestHeaders     string
	RequestBody        string
	ResponseStatus     int
	ResponseHeaders    string
	ResponseBody       string
	InputTokens        int
	OutputTokens       int
	TotalTokens        int
	LatencyMS          int64
	// NOTE(review): the MS/US suffixes presumably mean milliseconds and
	// microseconds for the same measurement — confirm.
	TimeToFirstTokenMS int64
	TimeToFirstTokenUS int64
	APIKeyHash         string
	GatewayKeyID       string
	EstimatedCostUSD   float64
	// NOTE(review): presumably the JSON string that DecodeMetadataMap decodes
	// and that SortLineageTraces reads lineage data from — confirm.
	Metadata           string
	CreatedAt          time.Time
}

type TraceFilter

// TraceFilter is the filter argument accepted by QueryTraces.
//
// NOTE(review): most fields share a name with a Trace field and presumably
// restrict results to matching traces, with zero values meaning "no
// restriction" — confirm against the store implementations.
type TraceFilter struct {
	OrgID        string
	WorkspaceID  string
	TraceGroupID string
	// ThreadID and RunID have no same-named field on Trace; how they are
	// matched (possibly via Trace.Metadata) is not shown here — confirm.
	ThreadID     string
	RunID        string
	Provider     string
	Model        string
	APIKeyHash   string
	StatusCode   int
	MinTokens    int
	MaxTokens    int
	From         time.Time
	To           time.Time
	Limit        int
	// NOTE(review): presumably a TraceResult.NextCursor value from an earlier
	// query; ErrInvalidCursor exists for cursors that are invalid — confirm.
	Cursor       string
}

type TracePipelineDiagnostics added in v1.1.7

// TracePipelineDiagnostics captures trace pipeline queue pressure and drop
// signals. It is the value returned by (*Writer).TracePipelineDiagnostics.
//
// Every field carries a JSON tag. The two drop timestamps and
// LastWriteDropOperation use omitempty, so they are left out of the JSON
// encoding when nil / empty.
type TracePipelineDiagnostics struct {
	QueueCapacity                    int        `json:"queue_capacity"`
	QueueDepth                       int        `json:"queue_depth"`
	QueueDepthHighWatermark          int        `json:"queue_depth_high_watermark"`
	QueueUtilizationPct              int        `json:"queue_utilization_pct"`
	QueueHighWatermarkUtilizationPct int        `json:"queue_high_watermark_utilization_pct"`
	// NOTE(review): presumably one of the TraceQueuePressure* constants — confirm.
	QueuePressureState               string     `json:"queue_pressure_state"`
	QueueHighWatermarkPressureState  string     `json:"queue_high_watermark_pressure_state"`
	EnqueueAcceptedTotal             int64      `json:"enqueue_accepted_total"`
	EnqueueDroppedTotal              int64      `json:"enqueue_dropped_total"`
	WriteDroppedTotal                int64      `json:"write_dropped_total"`
	// NOTE(review): presumably EnqueueDroppedTotal + WriteDroppedTotal — confirm.
	TotalDroppedTotal                int64      `json:"total_dropped_total"`
	LastEnqueueDropAt                *time.Time `json:"last_enqueue_drop_at,omitempty"`
	LastWriteDropAt                  *time.Time `json:"last_write_drop_at,omitempty"`
	LastWriteDropOperation           string     `json:"last_write_drop_operation,omitempty"`
}

TracePipelineDiagnostics captures trace pipeline queue pressure and drop signals.

type TracePipelineDiagnosticsReader added in v1.1.7

// TracePipelineDiagnosticsReader exposes runtime queue/drop diagnostics.
// *Writer declares a method with this exact signature.
type TracePipelineDiagnosticsReader interface {
	// TracePipelineDiagnostics returns the current diagnostics value.
	TracePipelineDiagnostics() TracePipelineDiagnostics
}

TracePipelineDiagnosticsReader exposes runtime queue/drop diagnostics.

type TraceResult

// TraceResult is the result type returned (by pointer) from QueryTraces.
type TraceResult struct {
	Items      []*Trace
	// NOTE(review): presumably passed back as TraceFilter.Cursor to fetch the
	// next page, and empty when there are no more results — confirm.
	NextCursor string
}

type TraceStore

// TraceStore is the storage interface for traces. NewWriter accepts a
// TraceStore, and both *PostgresStore and *SQLiteStore declare every method
// listed here.
type TraceStore interface {
	// Writes: a single trace, or a slice of traces in one call.
	WriteTrace(ctx context.Context, trace *Trace) error
	WriteBatch(ctx context.Context, traces []*Trace) error
	// Reads: one trace by id, or a filtered query returning a TraceResult.
	GetTrace(ctx context.Context, id string) (*Trace, error)
	QueryTraces(ctx context.Context, filter TraceFilter) (*TraceResult, error)
	// Analytics: all take an AnalyticsFilter. The *Series variants also take
	// groupBy and bucket strings whose accepted values are not shown here.
	GetUsageSummary(ctx context.Context, filter AnalyticsFilter) (*UsageSummary, error)
	GetUsageSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]UsagePoint, error)
	GetCostSummary(ctx context.Context, filter AnalyticsFilter) (*CostSummary, error)
	GetCostSeries(ctx context.Context, filter AnalyticsFilter, groupBy, bucket string) ([]CostPoint, error)
	GetModelStats(ctx context.Context, filter AnalyticsFilter) ([]ModelStats, error)
	GetKeyStats(ctx context.Context, filter AnalyticsFilter) ([]KeyStats, error)
}

type UsagePoint

// UsagePoint is the element type of the slice returned by GetUsageSeries.
//
// NOTE(review): BucketStart and Group presumably correspond to the bucket and
// groupBy arguments of GetUsageSeries — confirm accepted values with the stores.
type UsagePoint struct {
	BucketStart  time.Time
	Group        string
	InputTokens  int64
	OutputTokens int64
	TotalTokens  int64
}

type UsageSummary

// UsageSummary is the result type returned (by pointer) from GetUsageSummary.
type UsageSummary struct {
	TotalInputTokens  int64
	TotalOutputTokens int64
	TotalTokens       int64
}

type WriteFailure

// WriteFailure describes trace records that could not be persisted.
// It is the argument passed to a WriteFailureHandler.
type WriteFailure struct {
	// NOTE(review): presumably names the failed store call and matches
	// TracePipelineDiagnostics.LastWriteDropOperation — confirm.
	Operation   string
	// NOTE(review): presumably the attempted batch size and how many of its
	// records were not persisted — confirm.
	BatchSize   int
	FailedCount int
	Err         error
}

WriteFailure describes trace records that could not be persisted.

type WriteFailureHandler

// WriteFailureHandler receives asynchronous trace write failure signals.
// A handler is installed on a Writer with SetWriteFailureHandler.
type WriteFailureHandler func(WriteFailure)

WriteFailureHandler receives asynchronous trace write failure signals.

type Writer

// Writer is created by NewWriter from a TraceStore and a buffer size. Its
// listed methods are Enqueue, Start, Stop, Shutdown, SetWriteFailureHandler,
// and TracePipelineDiagnostics.
// NOTE(review): presumably the "async writer" component named in the package
// overview — confirm.
type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(store TraceStore, bufferSize int) *Writer

func (*Writer) Enqueue

func (w *Writer) Enqueue(t *Trace) bool

func (*Writer) SetWriteFailureHandler

func (w *Writer) SetWriteFailureHandler(handler WriteFailureHandler)

SetWriteFailureHandler replaces the callback used for dropped trace write signals.

func (*Writer) Shutdown

func (w *Writer) Shutdown(ctx context.Context) error

func (*Writer) Start

func (w *Writer) Start(ctx context.Context)

func (*Writer) Stop

func (w *Writer) Stop()

func (*Writer) TracePipelineDiagnostics added in v1.1.7

func (w *Writer) TracePipelineDiagnostics() TracePipelineDiagnostics

TracePipelineDiagnostics returns a point-in-time snapshot of queue pressure and dropped-trace counters for operator diagnostics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL