metrics

package
v1.0.49
Published: Dec 14, 2025 License: MIT Imports: 7 Imported by: 0

README

Metrics Package

The metrics package provides a complete implementation of the MetricsCollector interface for collecting, aggregating, and distributing metrics across all providers in ai-provider-kit.

Overview

The DefaultMetricsCollector is a thread-safe, high-performance metrics collector that supports:

  • Snapshot API: Poll metrics at any time with GetSnapshot()
  • Event Streaming: Subscribe to real-time metrics events via channels
  • Synchronous Hooks: Register callbacks for immediate event processing
  • Per-Provider Metrics: Track metrics separately for each provider
  • Per-Model Metrics: Track metrics separately for each model
  • Percentile Tracking: Calculate P50, P75, P90, P95, P99 latency percentiles
  • Streaming Metrics: Track Time-to-First-Token (TTFT) and throughput
  • Error Categorization: Detailed error tracking and categorization

Architecture

Files
  1. collector.go - Main DefaultMetricsCollector implementation

    • Implements all methods from MetricsCollector interface
    • Uses sync.RWMutex for thread-safety
    • Uses atomic operations for hot path counters
    • Manages subscriptions and hooks
  2. subscription.go - MetricsSubscription implementation

    • Non-blocking channel delivery with overflow tracking
    • Filter support for selective event delivery
    • Clean unsubscription and resource cleanup
  3. histogram.go - Percentile calculation helper

    • Circular buffer for latency samples
    • Efficient percentile calculation (P50, P75, P90, P95, P99)
    • Configurable sample size (default 1000)

Usage

Basic Usage
import (
    "context"
    "fmt"
    "time"

    "github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
    "github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

// Create a new collector
collector := metrics.NewDefaultMetricsCollector()
defer collector.Close()

// Record events
ctx := context.Background()
collector.RecordEvent(ctx, types.MetricEvent{
    Type:         types.MetricEventSuccess,
    ProviderName: "openai-prod",
    ProviderType: types.ProviderTypeOpenAI,
    ModelID:      "gpt-4",
    Timestamp:    time.Now(),
    Latency:      150 * time.Millisecond,
    TokensUsed:   200,
})

// Get snapshot
snapshot := collector.GetSnapshot()
fmt.Printf("Total requests: %d\n", snapshot.TotalRequests)
fmt.Printf("Success rate: %.2f%%\n", snapshot.SuccessRate*100)
Polling Pattern (Snapshots)

Get metrics on-demand:

// Get complete snapshot
snapshot := collector.GetSnapshot()

// Get provider-specific metrics
providerMetrics := collector.GetProviderMetrics("openai-prod")

// Get model-specific metrics
modelMetrics := collector.GetModelMetrics("gpt-4")

// List all providers and models
providers := collector.GetProviderNames()
models := collector.GetModelIDs()
Streaming Pattern (Subscriptions)

Subscribe to real-time events:

// Create subscription
sub := collector.Subscribe(1000) // Buffer size
defer sub.Unsubscribe()

// Process events
go func() {
    for event := range sub.Events() {
        fmt.Printf("Event: %s from %s\n", event.Type, event.ProviderName)
    }
}()

// Check for overflow
if sub.OverflowCount() > 0 {
    fmt.Printf("Warning: %d events dropped\n", sub.OverflowCount())
}
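
The overflow counter makes more sense once the delivery mechanism is visible. The following is a minimal sketch of non-blocking channel delivery with overflow tracking, not the package's actual implementation: the `event` and `subscription` types and the `deliver` and `overflowCount` methods are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// event is a stand-in for types.MetricEvent; only one field is modeled.
type event struct {
	Provider string
}

// subscription is a hypothetical, simplified subscriber: a buffered channel
// plus a counter of events that could not be delivered.
type subscription struct {
	ch       chan event
	overflow atomic.Int64
}

func newSubscription(bufferSize int) *subscription {
	return &subscription{ch: make(chan event, bufferSize)}
}

// deliver attempts a non-blocking send. It reports whether the event was
// queued; when the buffer is full the event is dropped and counted.
func (s *subscription) deliver(e event) bool {
	select {
	case s.ch <- e:
		return true
	default:
		s.overflow.Add(1)
		return false
	}
}

// overflowCount returns how many events have been dropped so far.
func (s *subscription) overflowCount() int64 {
	return s.overflow.Load()
}

func main() {
	sub := newSubscription(2)
	for i := 0; i < 5; i++ {
		sub.deliver(event{Provider: "openai-prod"})
	}
	// Nothing reads from the channel, so only the first two sends fit.
	fmt.Printf("buffered=%d dropped=%d\n", len(sub.ch), sub.overflowCount())
	// prints "buffered=2 dropped=3"
}
```

Under this design a slow consumer never stalls the producer; the cost is lost events, which is why the overflow count is worth checking periodically rather than once.
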
Filtered Subscriptions

Only receive specific events:

filter := types.MetricFilter{
    ProviderNames: []string{"critical-provider"},
    EventTypes:    []types.MetricEventType{
        types.MetricEventError,
        types.MetricEventRateLimit,
    },
}
sub := collector.SubscribeFiltered(500, filter)
defer sub.Unsubscribe()

// Only receive error and rate-limit events from critical-provider
for event := range sub.Events() {
    alert(event)
}
Synchronous Hooks

Register callbacks for immediate processing:

type AlertHook struct {
    threshold int
    count     int
}

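func (h *AlertHook) OnEvent(ctx context.Context, event types.MetricEvent) {
    if event.Type.IsError() {
        // count is not synchronized; guard it with a mutex or an atomic
        // if events may be recorded from multiple goroutines.
        h.count++
        if h.count >= h.threshold {
            sendAlert("Too many errors!")
        }
    }
}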

func (h *AlertHook) Name() string { return "alert-hook" }
func (h *AlertHook) Filter() *types.MetricFilter { return nil }

// Register hook
hook := &AlertHook{threshold: 5}
hookID := collector.RegisterHook(hook)
defer collector.UnregisterHook(hookID)

Features

Thread Safety

All methods are safe for concurrent use:

  • Uses sync.RWMutex for map access
  • Uses atomic.Int64 for counters
  • Lock-free operations for hot paths
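
How these pieces can fit together is sketched below: an atomic counter for the global total and an RWMutex-guarded map for per-provider entries. This is an illustration under assumptions, not the collector's real internals; `registry`, `providerStats`, `statsFor`, and `record` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// providerStats holds per-provider counters (hypothetical shape).
type providerStats struct {
	requests atomic.Int64
}

// registry combines a lock-free global counter with an RWMutex-guarded map.
type registry struct {
	total     atomic.Int64
	mu        sync.RWMutex
	providers map[string]*providerStats
}

func newRegistry() *registry {
	return &registry{providers: make(map[string]*providerStats)}
}

// statsFor returns the entry for name, creating it on first use.
// The common case (entry already exists) takes only the read lock.
func (r *registry) statsFor(name string) *providerStats {
	r.mu.RLock()
	s, ok := r.providers[name]
	r.mu.RUnlock()
	if ok {
		return s
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	// Re-check: another goroutine may have created it between the two locks.
	if s, ok = r.providers[name]; ok {
		return s
	}
	s = &providerStats{}
	r.providers[name] = s
	return s
}

// record increments the global and the per-provider counter.
func (r *registry) record(name string) {
	r.total.Add(1)
	r.statsFor(name).requests.Add(1)
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i%2 == 0 {
				r.record("openai-prod")
			} else {
				r.record("anthropic-prod")
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(r.total.Load(), r.statsFor("openai-prod").requests.Load())
	// prints "100 50"
}
```

The write lock is taken only the first time a provider name is seen, so steady-state recording touches the read lock and atomics only.
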
Memory Efficiency
  • Fixed-size circular buffers for histograms (default 1000 samples)
  • Non-blocking channel sends with overflow tracking
  • Deep copies for snapshots to avoid data races
Performance
  • Atomic operations for request counters (no locks)
  • Read-write locks for read-heavy workloads
  • Efficient percentile calculation using sorted samples
  • Minimal allocations in hot paths
Error Handling

Comprehensive error tracking:

  • By error type (rate_limit, timeout, authentication, etc.)
  • By HTTP status code
  • By provider and model
  • Consecutive error tracking
  • Last error tracking
Streaming Metrics

Track streaming-specific metrics:

  • Time to First Token (TTFT) with percentiles
  • Tokens per second (throughput)
  • Stream duration
  • Chunk statistics
  • Success/failure rates
Percentile Tracking

Calculate latency percentiles efficiently:

  • P50 (median)
  • P75
  • P90
  • P95
  • P99

Uses a circular buffer to maintain recent samples and calculate percentiles on-demand.
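
The idea can be sketched as follows. This is a simplified, single-goroutine illustration that assumes nearest-rank percentiles; the package's Histogram may use a different interpolation rule and adds its own synchronization. The `ringHistogram` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// ringHistogram keeps the most recent len(samples) values in a circular buffer.
type ringHistogram struct {
	samples []time.Duration
	next    int  // index of the slot to overwrite next
	full    bool // true once the buffer has wrapped around
}

func newRingHistogram(size int) *ringHistogram {
	if size < 1 {
		size = 1
	}
	return &ringHistogram{samples: make([]time.Duration, size)}
}

// add stores a sample, overwriting the oldest one when the buffer is full.
func (h *ringHistogram) add(d time.Duration) {
	h.samples[h.next] = d
	h.next = (h.next + 1) % len(h.samples)
	if h.next == 0 {
		h.full = true
	}
}

// percentile returns the nearest-rank percentile p (1..100) of the retained
// samples, or 0 if nothing has been recorded yet.
func (h *ringHistogram) percentile(p int) time.Duration {
	n := h.next
	if h.full {
		n = len(h.samples)
	}
	if n == 0 {
		return 0
	}
	// Sort a copy so the insertion order in the ring is preserved.
	sorted := make([]time.Duration, n)
	copy(sorted, h.samples[:n])
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	rank := (p*n + 99) / 100 // integer ceil(p*n/100)
	if rank < 1 {
		rank = 1
	}
	if rank > n {
		rank = n
	}
	return sorted[rank-1]
}

func main() {
	h := newRingHistogram(100)
	for i := 1; i <= 100; i++ {
		h.add(time.Duration(i) * time.Millisecond)
	}
	for _, p := range []int{50, 75, 90, 95, 99} {
		fmt.Printf("P%d = %v\n", p, h.percentile(p))
	}
	// prints P50 = 50ms, P75 = 75ms, P90 = 90ms, P95 = 95ms, P99 = 99ms
}
```

Sorting on demand keeps recording cheap (one slot write) and moves the O(n log n) cost to the reader, which suits a workload where events are recorded far more often than percentiles are read.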

Configuration

Histogram Sample Size

Control memory usage vs. percentile accuracy:

// Default: 1000 samples
histogram := metrics.NewHistogram(1000)

// Higher accuracy (more memory)
largeHistogram := metrics.NewHistogram(10000)

// Lower memory (less accurate)
smallHistogram := metrics.NewHistogram(100)
Subscription Buffer Size

Choose based on event rate:

// Low frequency (< 10 events/sec)
lowSub := collector.Subscribe(100)

// Medium frequency (10-100 events/sec)
mediumSub := collector.Subscribe(1000)

// High frequency (> 100 events/sec)
highSub := collector.Subscribe(10000)

Best Practices

1. Always Close Resources
collector := metrics.NewDefaultMetricsCollector()
defer collector.Close()

sub := collector.Subscribe(100)
defer sub.Unsubscribe()
2. Monitor Overflow
if count := sub.OverflowCount(); count > 0 {
    log.Printf("WARN: subscription buffer overflow: %d events dropped", count)
    // Consider increasing buffer size or optimizing processing
}
3. Use Filters for Efficiency
// Instead of filtering in code:
sub := collector.Subscribe(100)
for event := range sub.Events() {
    if event.Type == types.MetricEventError {
        process(event)
    }
}

// Use filtered subscription:
filter := types.MetricFilter{
    EventTypes: []types.MetricEventType{types.MetricEventError},
}
errSub := collector.SubscribeFiltered(100, filter)
for event := range errSub.Events() {
    process(event)
}
4. Keep Hooks Fast
// Bad: Slow hook blocks event processing
func (h *SlowHook) OnEvent(ctx context.Context, event types.MetricEvent) {
    sendHTTPRequest(event) // Blocks!
}

// Good: Fast hook spawns async work
func (h *FastHook) OnEvent(ctx context.Context, event types.MetricEvent) {
    select {
    case h.workQueue <- event:
    default:
        // Queue full, drop event
    }
}
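
The fast hook only helps if something drains the work queue. One possible shape for that consumer side is sketched below; `asyncHook`, `enqueue`, and `stop` are hypothetical names, and `event` stands in for types.MetricEvent.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a stand-in for types.MetricEvent.
type event struct {
	Message string
}

// asyncHook queues events for a background worker so that the hook
// callback itself returns immediately.
type asyncHook struct {
	queue   chan event
	wg      sync.WaitGroup
	handled []string // written only by the worker goroutine
}

func newAsyncHook(queueSize int) *asyncHook {
	h := &asyncHook{queue: make(chan event, queueSize)}
	h.wg.Add(1)
	go func() {
		defer h.wg.Done()
		for e := range h.queue {
			// Slow work (HTTP calls, disk writes) belongs here.
			h.handled = append(h.handled, e.Message)
		}
	}()
	return h
}

// enqueue never blocks; it reports false when the queue is full.
func (h *asyncHook) enqueue(e event) bool {
	select {
	case h.queue <- e:
		return true
	default:
		return false
	}
}

// stop closes the queue and waits for the worker to drain what is left.
// enqueue must not be called after stop.
func (h *asyncHook) stop() {
	close(h.queue)
	h.wg.Wait()
}

func main() {
	h := newAsyncHook(16)
	for i := 1; i <= 3; i++ {
		h.enqueue(event{Message: fmt.Sprintf("error %d", i)})
	}
	h.stop()
	fmt.Println(h.handled) // prints "[error 1 error 2 error 3]"
}
```

In a real hook, OnEvent would call something like `enqueue`, and `stop` would run after the hook has been unregistered so that no send can race with the close.
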
5. Use Batch Recording for Efficiency
// Instead of:
for _, event := range events {
    collector.RecordEvent(ctx, event)
}

// Use:
collector.RecordEvents(ctx, events)

Testing

The package includes comprehensive tests:

# Run all tests
go test ./pkg/metrics/...

# Run with coverage
go test -cover ./pkg/metrics/...

# Run with race detector
go test -race ./pkg/metrics/...

# Run benchmarks
go test -bench=. ./pkg/metrics/...

Integration

Provider Integration

Providers should record events during operations:

// Record request start
startTime := time.Now()
collector.RecordEvent(ctx, types.MetricEvent{
    Type:         types.MetricEventRequest,
    ProviderName: p.name,
    ProviderType: p.providerType,
    ModelID:      req.Model,
    Timestamp:    time.Now(),
})

// Record success
collector.RecordEvent(ctx, types.MetricEvent{
    Type:         types.MetricEventSuccess,
    ProviderName: p.name,
    ProviderType: p.providerType,
    ModelID:      req.Model,
    Timestamp:    time.Now(),
    Latency:      time.Since(startTime),
    TokensUsed:   resp.Usage.TotalTokens,
    InputTokens:  resp.Usage.PromptTokens,
    OutputTokens: resp.Usage.CompletionTokens,
})
Export to Monitoring Systems
// Export to Prometheus, Datadog, etc.
go func() {
    ticker := time.NewTicker(60 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        snapshot := collector.GetSnapshot()
        exportToMonitoring(snapshot)
    }
}()
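
The loop above runs until the process exits. If the exporter should stop cleanly, one option is to select on a done channel as well, as in this sketch. `runExporter` is a hypothetical helper, and the `export` callback stands in for `exportToMonitoring(collector.GetSnapshot())`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runExporter calls export on every tick until done is closed, then performs
// one final export so the last partial interval is not lost.
func runExporter(done <-chan struct{}, interval time.Duration, export func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			export()
		case <-done:
			export() // final flush
			return
		}
	}
}

func main() {
	// Short durations keep the demo quick; a real exporter would use
	// something like the 60-second interval shown above.
	ctx, cancel := context.WithTimeout(context.Background(), 55*time.Millisecond)
	defer cancel()

	exports := 0
	runExporter(ctx.Done(), 10*time.Millisecond, func() {
		exports++ // stand-in for exporting a snapshot
	})
	fmt.Printf("exported %d times before shutdown\n", exports)
}
```

Passing `ctx.Done()` ties the exporter's lifetime to whatever context governs the rest of the application.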

License

MIT License - See LICENSE file for details

Documentation

Overview

Package metrics provides a centralized metrics collection system for ai-provider-kit. It includes the DefaultMetricsCollector implementation, streaming metrics wrapper, cost calculation interface, and histogram for percentile calculations.

Example (BasicUsage)

Example demonstrating basic usage of DefaultMetricsCollector

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	// Create a new metrics collector
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Record a request
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventRequest,
		ProviderName: "openai-prod",
		ProviderType: types.ProviderTypeOpenAI,
		ModelID:      "gpt-4",
		Timestamp:    time.Now(),
	})

	// Record a successful response
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "openai-prod",
		ProviderType: types.ProviderTypeOpenAI,
		ModelID:      "gpt-4",
		Timestamp:    time.Now(),
		Latency:      150 * time.Millisecond,
		TokensUsed:   200,
		InputTokens:  50,
		OutputTokens: 150,
	})

	// Get snapshot
	snapshot := collector.GetSnapshot()
	fmt.Printf("Total requests: %d\n", snapshot.TotalRequests)
	fmt.Printf("Success rate: %.2f%%\n", snapshot.SuccessRate*100)
	fmt.Printf("Total tokens: %d\n", snapshot.Tokens.TotalTokens)

}
Output:
Total requests: 1
Success rate: 100.00%
Total tokens: 200
Example (BatchEvents)

Example demonstrating batch event recording

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Create multiple events
	events := []types.MetricEvent{
		{
			Type:         types.MetricEventRequest,
			ProviderName: "batch-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
		},
		{
			Type:         types.MetricEventSuccess,
			ProviderName: "batch-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
			Latency:      100 * time.Millisecond,
		},
	}

	// Record batch
	err := collector.RecordEvents(ctx, events)
	if err == nil {
		fmt.Printf("Recorded %d events\n", len(events))
	}

}
Output:
Recorded 2 events
Example (ErrorMetrics)

Example demonstrating error metrics tracking

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

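	// Record different types of errors. Note that the "rate_limit" entry is
	// recorded as a generic MetricEventError, and the output below shows it
	// is not counted in RateLimitErrors.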
	errors := []struct {
		eventType types.MetricEventType
		errorType string
		status    int
	}{
		{types.MetricEventError, "rate_limit", 429},
		{types.MetricEventTimeout, "timeout", 0},
		{types.MetricEventError, "authentication", 401},
		{types.MetricEventError, "server_error", 500},
	}

	for _, e := range errors {
		_ = collector.RecordEvent(ctx, types.MetricEvent{
			Type:         e.eventType,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
			ErrorType:    e.errorType,
			ErrorMessage: fmt.Sprintf("Error: %s", e.errorType),
			StatusCode:   e.status,
		})
	}

	// Get error metrics
	snapshot := collector.GetSnapshot()
	fmt.Printf("Total errors: %d\n", snapshot.Errors.TotalErrors)
	fmt.Printf("Rate limit errors: %d\n", snapshot.Errors.RateLimitErrors)
	fmt.Printf("Timeout errors: %d\n", snapshot.Errors.TimeoutErrors)
	fmt.Printf("Server errors: %d\n", snapshot.Errors.ServerErrors)

}
Output:
Total errors: 4
Rate limit errors: 0
Timeout errors: 1
Server errors: 1
Example (FilteredSubscription)

Example demonstrating filtered subscriptions

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Create a filtered subscription (only errors from specific provider)
	filter := types.MetricFilter{
		ProviderNames: []string{"critical-provider"},
		EventTypes:    []types.MetricEventType{types.MetricEventError, types.MetricEventRateLimit},
	}
	sub := collector.SubscribeFiltered(50, filter)
	defer sub.Unsubscribe()

	// Start a goroutine to handle critical errors
	go func() {
		for event := range sub.Events() {
			fmt.Printf("ALERT: Error in critical provider: %s\n", event.ErrorMessage)
		}
	}()

	// Record various events
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "critical-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
	})

	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventError,
		ProviderName: "critical-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
		ErrorMessage: "Service temporarily unavailable",
	})

	time.Sleep(10 * time.Millisecond)
}
Example (Hooks)

Example demonstrating hooks for synchronous event handling

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Create a custom hook
	hook := &alertHook{threshold: 3}
	hookID := collector.RegisterHook(hook)
	defer collector.UnregisterHook(hookID)

	// Record some error events
	for i := 0; i < 5; i++ {
		_ = collector.RecordEvent(ctx, types.MetricEvent{
			Type:         types.MetricEventError,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
			ErrorMessage: fmt.Sprintf("Error %d", i+1),
		})
	}

	time.Sleep(10 * time.Millisecond)
}

// alertHook is a custom hook that alerts once consecutive errors reach a threshold
type alertHook struct {
	threshold         int
	consecutiveErrors int
}

func (h *alertHook) OnEvent(ctx context.Context, event types.MetricEvent) {
	if event.Type.IsError() {
		h.consecutiveErrors++
		if h.consecutiveErrors >= h.threshold {
			fmt.Printf("ALERT: %d consecutive errors detected!\n", h.consecutiveErrors)
		}
	} else {
		h.consecutiveErrors = 0
	}
}

func (h *alertHook) Name() string {
	return "alert-hook"
}

func (h *alertHook) Filter() *types.MetricFilter {
	return nil
}
Example (JsonSerialization)

Example demonstrating JSON serialization of metrics

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Record some events
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "test-provider",
		ProviderType: types.ProviderTypeOpenAI,
		ModelID:      "gpt-4",
		Timestamp:    time.Now(),
		Latency:      100 * time.Millisecond,
		TokensUsed:   150,
	})

	// Get snapshot and serialize to JSON
	snapshot := collector.GetSnapshot()
	jsonData, err := json.MarshalIndent(snapshot, "", "  ")
	if err == nil {
		fmt.Printf("JSON output length: %d bytes\n", len(jsonData))
		// In production, you could send this to a monitoring system
	}
}
Example (ModelMetrics)

Example demonstrating per-model metrics

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Record events for different models
	models := []string{"gpt-4", "gpt-3.5-turbo", "claude-3-opus"}
	for _, model := range models {
		_ = collector.RecordEvent(ctx, types.MetricEvent{
			Type:         types.MetricEventSuccess,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			ModelID:      model,
			Timestamp:    time.Now(),
			TokensUsed:   1000,
			InputTokens:  200,
			OutputTokens: 800,
		})
	}

	// Get metrics for each model
	for _, model := range models {
		// Named modelMetrics to avoid shadowing the metrics package.
		modelMetrics := collector.GetModelMetrics(model)
		if modelMetrics != nil {
			fmt.Printf("%s: %d total tokens, avg tokens/request: %.0f\n",
				model,
				modelMetrics.Tokens.TotalTokens,
				modelMetrics.AverageTokensPerRequest,
			)
		}
	}
}
Example (ProviderMetrics)

Example demonstrating per-provider metrics

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Record events for multiple providers
	providers := []string{"openai-prod", "anthropic-prod", "azure-prod"}
	for _, provider := range providers {
		for i := 0; i < 10; i++ {
			_ = collector.RecordEvent(ctx, types.MetricEvent{
				Type:         types.MetricEventSuccess,
				ProviderName: provider,
				ProviderType: types.ProviderTypeOpenAI,
				Timestamp:    time.Now(),
				Latency:      time.Duration(50+i*10) * time.Millisecond,
			})
		}
	}

	// Get metrics for each provider
	for _, provider := range providers {
		// Named providerMetrics to avoid shadowing the metrics package.
		providerMetrics := collector.GetProviderMetrics(provider)
		if providerMetrics != nil {
			fmt.Printf("%s: %d requests, avg latency: %v\n",
				provider,
				providerMetrics.TotalRequests,
				providerMetrics.Latency.AverageLatency,
			)
		}
	}
}
Example (Reset)

Example demonstrating reset functionality

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Record some events
	for i := 0; i < 100; i++ {
		_ = collector.RecordEvent(ctx, types.MetricEvent{
			Type:         types.MetricEventRequest,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
		})
	}

	fmt.Printf("Before reset: %d requests\n", collector.GetSnapshot().TotalRequests)

	// Reset metrics
	collector.Reset()

	fmt.Printf("After reset: %d requests\n", collector.GetSnapshot().TotalRequests)

}
Output:
Before reset: 100 requests
After reset: 0 requests
Example (StreamingMetrics)

Example demonstrating streaming metrics

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Record streaming start
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:             types.MetricEventStreamStart,
		ProviderName:     "openai-stream",
		ProviderType:     types.ProviderTypeOpenAI,
		ModelID:          "gpt-4",
		Timestamp:        time.Now(),
		IsStreaming:      true,
		TimeToFirstToken: 50 * time.Millisecond,
	})

	// Record streaming end
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:            types.MetricEventStreamEnd,
		ProviderName:    "openai-stream",
		ProviderType:    types.ProviderTypeOpenAI,
		ModelID:         "gpt-4",
		Timestamp:       time.Now(),
		IsStreaming:     true,
		TokensUsed:      150,
		Latency:         2 * time.Second,
		TokensPerSecond: 75.0,
	})

	// Get snapshot with streaming metrics
	snapshot := collector.GetSnapshot()
	if snapshot.Streaming != nil {
		fmt.Printf("Streaming requests: %d\n", snapshot.Streaming.TotalStreamRequests)
		fmt.Printf("Avg TTFT: %v\n", snapshot.Streaming.TimeToFirstToken.AverageTTFT)
		fmt.Printf("Avg tokens/second: %.2f\n", snapshot.Streaming.AverageTokensPerSecond)
	}
}
Example (Subscription)

Example demonstrating subscription to metrics events

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
	"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)

func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()

	ctx := context.Background()

	// Create a subscription
	sub := collector.Subscribe(100)
	defer sub.Unsubscribe()

	// Start a goroutine to process events
	go func() {
		for event := range sub.Events() {
			fmt.Printf("Received event: %s from %s\n", event.Type, event.ProviderName)
		}
	}()

	// Record some events
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "test-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
	})

	// Give time for event processing
	time.Sleep(10 * time.Millisecond)
}

Types

type Cost

type Cost struct {
	InputCost  float64 `json:"input_cost"`
	OutputCost float64 `json:"output_cost"`
	TotalCost  float64 `json:"total_cost"`
	Currency   string  `json:"currency"`
}

Cost represents the calculated cost of an AI request

type CostCalculator

type CostCalculator interface {
	// CalculateCost returns the cost for a request
	CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost

	// GetPricing returns pricing info (for display/debugging)
	// Returns inputPer1K, outputPer1K prices and ok=true if pricing exists
	GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool)
}

CostCalculator calculates costs for AI requests.

Example usage:

// Define your own cost calculator with your pricing
type MyCostCalculator struct {
	// your pricing data, expressed per 1K tokens
	inputPer1K  float64
	outputPer1K float64
}

func (c *MyCostCalculator) CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost {
	// your pricing logic
	inputCost := float64(inputTokens) / 1000 * c.inputPer1K
	outputCost := float64(outputTokens) / 1000 * c.outputPer1K
	return Cost{
		InputCost:  inputCost,
		OutputCost: outputCost,
		TotalCost:  inputCost + outputCost,
		Currency:   "USD",
	}
}

func (c *MyCostCalculator) GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool) {
	// return your pricing per 1K tokens
	return c.inputPer1K, c.outputPer1K, true
}

// Use it with the collector (the prices here are placeholders)
costCalc := &MyCostCalculator{inputPer1K: 0.5, outputPer1K: 1.5}
collector := NewDefaultMetricsCollector(costCalc)

CostCalculator calculates costs for AI requests. This is a pluggable interface - consumers provide their own implementation with their specific pricing models.
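
Because both methods receive the provider and model, an implementation will often look prices up per model and report ok=false for anything it does not know. The sketch below shows one way to do that. It redeclares Cost and CostCalculator locally so it compiles on its own; `tableCostCalculator` and `price` are hypothetical names, and the prices are made-up numbers, not real pricing.

```go
package main

import "fmt"

// Cost and CostCalculator mirror the declarations documented above.
type Cost struct {
	InputCost  float64
	OutputCost float64
	TotalCost  float64
	Currency   string
}

type CostCalculator interface {
	CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost
	GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool)
}

// price holds per-1K-token prices for one model (illustrative values only).
type price struct {
	inputPer1K, outputPer1K float64
}

// tableCostCalculator looks prices up by "provider/model".
type tableCostCalculator struct {
	prices map[string]price
}

// Compile-time check that the sketch satisfies the interface.
var _ CostCalculator = (*tableCostCalculator)(nil)

func (c *tableCostCalculator) GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool) {
	p, ok := c.prices[provider+"/"+model]
	return p.inputPer1K, p.outputPer1K, ok
}

func (c *tableCostCalculator) CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost {
	in, out, ok := c.GetPricing(provider, model)
	if !ok {
		return Cost{} // unknown model: report zero cost rather than guess
	}
	cost := Cost{
		InputCost:  float64(inputTokens) / 1000 * in,
		OutputCost: float64(outputTokens) / 1000 * out,
		Currency:   "USD",
	}
	cost.TotalCost = cost.InputCost + cost.OutputCost
	return cost
}

func main() {
	calc := &tableCostCalculator{prices: map[string]price{
		"example-provider/example-model": {inputPer1K: 0.5, outputPer1K: 1.5},
	}}
	cost := calc.CalculateCost("example-provider", "example-model", 2000, 1000)
	fmt.Printf("%.2f %s\n", cost.TotalCost, cost.Currency) // prints "2.50 USD"
}
```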

type DefaultMetricsCollector

type DefaultMetricsCollector struct {
	// contains filtered or unexported fields
}

DefaultMetricsCollector is the default implementation of types.MetricsCollector. It provides thread-safe metrics collection with support for subscriptions and hooks.

func NewDefaultMetricsCollector

func NewDefaultMetricsCollector(costCalculator ...CostCalculator) *DefaultMetricsCollector

NewDefaultMetricsCollector creates a new DefaultMetricsCollector instance. If no CostCalculator is provided, a NullCostCalculator will be used.

func (*DefaultMetricsCollector) Close

func (c *DefaultMetricsCollector) Close() error

Close shuts down the collector

func (*DefaultMetricsCollector) GetModelIDs

func (c *DefaultMetricsCollector) GetModelIDs() []string

GetModelIDs returns a sorted list of all model IDs

func (*DefaultMetricsCollector) GetModelMetrics

func (c *DefaultMetricsCollector) GetModelMetrics(modelID string) *types.ModelMetricsSnapshot

GetModelMetrics returns metrics for a specific model

func (*DefaultMetricsCollector) GetProviderMetrics

func (c *DefaultMetricsCollector) GetProviderMetrics(providerName string) *types.ProviderMetricsSnapshot

GetProviderMetrics returns metrics for a specific provider

func (*DefaultMetricsCollector) GetProviderNames

func (c *DefaultMetricsCollector) GetProviderNames() []string

GetProviderNames returns a sorted list of all provider names

func (*DefaultMetricsCollector) GetSnapshot

GetSnapshot returns a complete snapshot of all metrics

func (*DefaultMetricsCollector) RecordEvent

func (c *DefaultMetricsCollector) RecordEvent(ctx context.Context, event types.MetricEvent) error

RecordEvent records a single metrics event

func (*DefaultMetricsCollector) RecordEvents

func (c *DefaultMetricsCollector) RecordEvents(ctx context.Context, events []types.MetricEvent) error

RecordEvents records multiple events in a batch

func (*DefaultMetricsCollector) RegisterHook

func (c *DefaultMetricsCollector) RegisterHook(hook types.MetricsHook) types.HookID

RegisterHook registers a hook and returns its ID

func (*DefaultMetricsCollector) Reset

func (c *DefaultMetricsCollector) Reset()

Reset clears all metrics data

func (*DefaultMetricsCollector) Subscribe

func (c *DefaultMetricsCollector) Subscribe(bufferSize int) types.MetricsSubscription

Subscribe creates a new subscription with the given buffer size

func (*DefaultMetricsCollector) SubscribeFiltered

func (c *DefaultMetricsCollector) SubscribeFiltered(bufferSize int, filter types.MetricFilter) types.MetricsSubscription

SubscribeFiltered creates a filtered subscription

func (*DefaultMetricsCollector) UnregisterHook

func (c *DefaultMetricsCollector) UnregisterHook(id types.HookID)

UnregisterHook removes a hook

type Histogram

type Histogram struct {
	// contains filtered or unexported fields
}

Histogram is a simple circular buffer for storing latency samples and calculating percentiles

func NewHistogram

func NewHistogram(sampleSize int) *Histogram

NewHistogram creates a new histogram with the given sample size

func (*Histogram) Add

func (h *Histogram) Add(latency time.Duration)

Add adds a latency sample to the histogram

func (*Histogram) GetLatencyMetrics

func (h *Histogram) GetLatencyMetrics() types.LatencyMetrics

GetLatencyMetrics returns the current latency metrics including percentiles

func (*Histogram) Reset

func (h *Histogram) Reset()

Reset clears all samples from the histogram

type MetricsStreamWrapper

type MetricsStreamWrapper struct {
	// contains filtered or unexported fields
}

MetricsStreamWrapper wraps a ChatCompletionStream and tracks detailed streaming metrics. It measures:

  • TimeToFirstToken (TTFT): Time from stream start to first chunk
  • TokensPerSecond: Output throughput
  • ChunksReceived: Total SSE chunks received
  • StreamDuration: Total time from first Next() to Close()
  • StreamInterruptions: Connection drops that recovered

The wrapper emits MetricEvents to the MetricsCollector at key points:

  • MetricEventStreamStart: When first Next() is called (includes TTFT when first chunk arrives)
  • MetricEventStreamChunk: For each chunk received (optional, can be disabled for performance)
  • MetricEventStreamEnd: When stream completes successfully (includes tokens/sec)
  • MetricEventStreamAbort: If error occurs during streaming

Thread-safe: Safe for concurrent use by multiple goroutines.
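
For reference, the two headline measurements reduce to simple arithmetic on timestamps. The sketch below assumes throughput is computed over the whole stream duration, which matches the StreamingMetrics example earlier (150 tokens over 2 seconds reported as 75 tokens/second); the wrapper itself may measure a different window. `timeToFirstToken` and `tokensPerSecond` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// timeToFirstToken is the delay between starting the stream and receiving
// the first chunk.
func timeToFirstToken(start, firstChunk time.Time) time.Duration {
	return firstChunk.Sub(start)
}

// tokensPerSecond divides the token count by the stream duration. It returns
// 0 for an empty or instantaneous stream to avoid dividing by zero.
func tokensPerSecond(tokens int64, duration time.Duration) float64 {
	if tokens <= 0 || duration <= 0 {
		return 0
	}
	return float64(tokens) / duration.Seconds()
}

func main() {
	// Fixed timestamps keep the output deterministic.
	start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	firstChunk := start.Add(50 * time.Millisecond)
	end := start.Add(2 * time.Second)

	fmt.Println("TTFT:", timeToFirstToken(start, firstChunk)) // prints "TTFT: 50ms"
	fmt.Printf("tokens/sec: %.2f\n", tokensPerSecond(150, end.Sub(start)))
	// prints "tokens/sec: 75.00"
}
```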

func NewMetricsStreamWrapper

func NewMetricsStreamWrapper(config MetricsStreamWrapperConfig) (*MetricsStreamWrapper, error)

NewMetricsStreamWrapper creates a new MetricsStreamWrapper with the given configuration.

func (*MetricsStreamWrapper) Close

func (w *MetricsStreamWrapper) Close() error

Close closes the wrapped stream and records final metrics.

func (*MetricsStreamWrapper) GetMetrics

func (w *MetricsStreamWrapper) GetMetrics() StreamMetrics

GetMetrics returns a snapshot of the current streaming metrics.

func (*MetricsStreamWrapper) Next

Next returns the next chunk from the stream and tracks metrics. On the first call, it records the stream start time and emits MetricEventStreamStart. On each subsequent call, it tracks chunks and optionally emits MetricEventStreamChunk.

type MetricsStreamWrapperConfig

type MetricsStreamWrapperConfig struct {
	// Stream to wrap (required)
	Stream types.ChatCompletionStream

	// Metrics collector (required)
	Collector types.MetricsCollector

	// Context for event recording (required)
	Context context.Context

	// Provider name (required)
	ProviderName string

	// Provider type (required)
	ProviderType types.ProviderType

	// Model ID (required)
	ModelID string

	// Session ID for correlating stream events (optional, generated if not provided)
	SessionID string

	// EmitChunkEvents enables per-chunk event emission (default: false)
	// WARNING: This can generate high event volume for streams with many chunks
	EmitChunkEvents bool
}

MetricsStreamWrapperConfig configures the MetricsStreamWrapper.

type NullCostCalculator

type NullCostCalculator struct{}

NullCostCalculator is a no-op cost calculator that returns zero costs. Use this when no pricing configuration is available.

func NewNullCostCalculator

func NewNullCostCalculator() *NullCostCalculator

NewNullCostCalculator creates a new NullCostCalculator

func (*NullCostCalculator) CalculateCost

func (n *NullCostCalculator) CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost

CalculateCost returns zero cost

func (*NullCostCalculator) GetPricing

func (n *NullCostCalculator) GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool)

GetPricing returns ok=false indicating no pricing is available

type StreamMetrics

type StreamMetrics struct {
	TimeToFirstToken    time.Duration
	TokensPerSecond     float64
	ChunksReceived      int64
	StreamDuration      time.Duration
	StreamInterruptions int64
	TokensReceived      int64
	Aborted             bool
}

StreamMetrics contains detailed metrics about a streaming session.
