Documentation
¶
Overview ¶
Package metrics provides a centralized metrics collection system for ai-provider-kit. It includes the DefaultMetricsCollector implementation, a streaming metrics wrapper, a cost calculation interface, and a histogram for percentile calculations.
Example (BasicUsage) ¶
Example demonstrating basic usage of DefaultMetricsCollector
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	ctx := context.Background()

	// Close the collector when the example finishes.
	mc := metrics.NewDefaultMetricsCollector()
	defer func() { _ = mc.Close() }()

	// Both events below describe the same provider/model pair.
	const (
		providerName = "openai-prod"
		modelID      = "gpt-4"
	)

	// Step 1: the outgoing request. Recording errors are ignored to keep
	// the example short.
	requestEvent := types.MetricEvent{
		Type:         types.MetricEventRequest,
		ProviderName: providerName,
		ProviderType: types.ProviderTypeOpenAI,
		ModelID:      modelID,
		Timestamp:    time.Now(),
	}
	_ = mc.RecordEvent(ctx, requestEvent)

	// Step 2: the successful response, with latency and token usage.
	successEvent := types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: providerName,
		ProviderType: types.ProviderTypeOpenAI,
		ModelID:      modelID,
		Timestamp:    time.Now(),
		Latency:      150 * time.Millisecond,
		TokensUsed:   200,
		InputTokens:  50,
		OutputTokens: 150,
	}
	_ = mc.RecordEvent(ctx, successEvent)

	// Summarize the aggregated state.
	snap := mc.GetSnapshot()
	fmt.Printf("Total requests: %d\n", snap.TotalRequests)
	fmt.Printf("Success rate: %.2f%%\n", snap.SuccessRate*100)
	fmt.Printf("Total tokens: %d\n", snap.Tokens.TotalTokens)
}
Output:
Total requests: 1
Success rate: 100.00%
Total tokens: 200
Example (BatchEvents) ¶
Example demonstrating batch event recording
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	ctx := context.Background()

	mc := metrics.NewDefaultMetricsCollector()
	defer func() { _ = mc.Close() }()

	// Build the batch: a request followed by its successful completion.
	batch := make([]types.MetricEvent, 0, 2)
	batch = append(batch, types.MetricEvent{
		Type:         types.MetricEventRequest,
		ProviderName: "batch-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
	})
	batch = append(batch, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "batch-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
		Latency:      100 * time.Millisecond,
	})

	// Record the whole batch in one call; report only when it succeeds.
	if err := mc.RecordEvents(ctx, batch); err != nil {
		return
	}
	fmt.Printf("Recorded %d events\n", len(batch))
}
Output: Recorded 2 events
Example (ErrorMetrics) ¶
Example demonstrating error metrics tracking
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()
	ctx := context.Background()

	// Record different kinds of failures. The slice is deliberately not named
	// "errors" so it does not shadow the standard library package of that name.
	// A StatusCode of 0 is used for the timeout case.
	failures := []struct {
		eventType types.MetricEventType
		errorType string
		status    int
	}{
		{types.MetricEventError, "rate_limit", 429},
		{types.MetricEventTimeout, "timeout", 0},
		{types.MetricEventError, "authentication", 401},
		{types.MetricEventError, "server_error", 500},
	}
	for _, f := range failures {
		// Recording errors are ignored to keep the example focused.
		_ = collector.RecordEvent(ctx, types.MetricEvent{
			Type:         f.eventType,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
			ErrorType:    f.errorType,
			ErrorMessage: fmt.Sprintf("Error: %s", f.errorType),
			StatusCode:   f.status,
		})
	}

	// Get error metrics
	snapshot := collector.GetSnapshot()
	fmt.Printf("Total errors: %d\n", snapshot.Errors.TotalErrors)
	fmt.Printf("Rate limit errors: %d\n", snapshot.Errors.RateLimitErrors)
	fmt.Printf("Timeout errors: %d\n", snapshot.Errors.TimeoutErrors)
	fmt.Printf("Server errors: %d\n", snapshot.Errors.ServerErrors)
}
Output:
Total errors: 4
Rate limit errors: 0
Timeout errors: 1
Server errors: 1
Example (FilteredSubscription) ¶
Example demonstrating filtered subscriptions
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()
	ctx := context.Background()

	// Create a filtered subscription (only errors from specific provider)
	filter := types.MetricFilter{
		ProviderNames: []string{"critical-provider"},
		EventTypes:    []types.MetricEventType{types.MetricEventError, types.MetricEventRateLimit},
	}
	sub := collector.SubscribeFiltered(50, filter)
	defer sub.Unsubscribe()

	// alerted is signalled after each alert is printed, so main can wait for
	// delivery instead of guessing with a fixed sleep (which races the
	// goroutine). The one-slot buffer plus the non-blocking send mean the
	// handler never blocks on it.
	alerted := make(chan struct{}, 1)

	// Start a goroutine to handle critical errors
	go func() {
		for event := range sub.Events() {
			fmt.Printf("ALERT: Error in critical provider: %s\n", event.ErrorMessage)
			select {
			case alerted <- struct{}{}:
			default:
			}
		}
	}()

	// Record various events; per the filter above, only the error matches.
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "critical-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
	})
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventError,
		ProviderName: "critical-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
		ErrorMessage: "Service temporarily unavailable",
	})

	// Wait until the alert has been printed. The timeout keeps the example
	// from hanging if the event is never delivered.
	select {
	case <-alerted:
	case <-time.After(time.Second):
	}
}
Output:
Example (Hooks) ¶
Example demonstrating hooks for synchronous event handling
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	ctx := context.Background()

	mc := metrics.NewDefaultMetricsCollector()
	defer func() { _ = mc.Close() }()

	// Register a hook that alerts after three consecutive errors. Deferred
	// calls run in LIFO order, so the hook is unregistered before the
	// collector is closed.
	id := mc.RegisterHook(&alertHook{threshold: 3})
	defer mc.UnregisterHook(id)

	// Emit five error events in a row ("Error 1" .. "Error 5").
	for n := 1; n <= 5; n++ {
		_ = mc.RecordEvent(ctx, types.MetricEvent{
			Type:         types.MetricEventError,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
			ErrorMessage: fmt.Sprintf("Error %d", n),
		})
	}

	// Brief pause before the deferred cleanup runs.
	time.Sleep(10 * time.Millisecond)
}
// alertHook is a custom hook that tracks runs of consecutive error events.
// Once a run reaches threshold it prints an alert for that error and for every
// further error in the same run; any non-error event resets the run.
type alertHook struct {
	threshold         int // run length at which alerts start
	consecutiveErrors int // length of the current run of error events
}

// OnEvent updates the consecutive-error counter for one event and prints an
// alert when the counter has reached the threshold.
func (h *alertHook) OnEvent(ctx context.Context, event types.MetricEvent) {
	if !event.Type.IsError() {
		h.consecutiveErrors = 0
		return
	}
	h.consecutiveErrors++
	if h.consecutiveErrors < h.threshold {
		return
	}
	fmt.Printf("ALERT: %d consecutive errors detected!\n", h.consecutiveErrors)
}

// Name returns the hook's identifier.
func (h *alertHook) Name() string {
	return "alert-hook"
}

// Filter returns nil; presumably this means the hook is offered every event
// (confirm against the types.MetricsHook contract).
func (h *alertHook) Filter() *types.MetricFilter {
	return nil
}
Output:
Example (JsonSerialization) ¶
Example demonstrating JSON serialization of metrics
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	ctx := context.Background()

	mc := metrics.NewDefaultMetricsCollector()
	defer func() { _ = mc.Close() }()

	// Record a single successful request so the snapshot has content.
	_ = mc.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "test-provider",
		ProviderType: types.ProviderTypeOpenAI,
		ModelID:      "gpt-4",
		Timestamp:    time.Now(),
		Latency:      100 * time.Millisecond,
		TokensUsed:   150,
	})

	// Serialize the current snapshot as indented JSON; stay silent on failure.
	payload, err := json.MarshalIndent(mc.GetSnapshot(), "", " ")
	if err != nil {
		return
	}
	// In production, the payload could be shipped to a monitoring system.
	fmt.Printf("JSON output length: %d bytes\n", len(payload))
}
Output:
Example (ModelMetrics) ¶
Example demonstrating per-model metrics
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()
	ctx := context.Background()

	// Record events for different models
	models := []string{"gpt-4", "gpt-3.5-turbo", "claude-3-opus"}
	for _, model := range models {
		_ = collector.RecordEvent(ctx, types.MetricEvent{
			Type:         types.MetricEventSuccess,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			ModelID:      model,
			Timestamp:    time.Now(),
			TokensUsed:   1000,
			InputTokens:  200,
			OutputTokens: 800,
		})
	}

	// Get metrics for each model. The result is held in mm rather than a
	// variable named "metrics", which would shadow the imported metrics package.
	for _, model := range models {
		mm := collector.GetModelMetrics(model)
		if mm == nil {
			continue
		}
		fmt.Printf("%s: %d total tokens, avg tokens/request: %.0f\n",
			model,
			mm.Tokens.TotalTokens,
			mm.AverageTokensPerRequest,
		)
	}
}
Output:
Example (ProviderMetrics) ¶
Example demonstrating per-provider metrics
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()
	ctx := context.Background()

	// Record ten successful events per provider, with latencies of
	// 50ms, 60ms, ..., 140ms.
	providers := []string{"openai-prod", "anthropic-prod", "azure-prod"}
	for _, provider := range providers {
		for i := 0; i < 10; i++ {
			_ = collector.RecordEvent(ctx, types.MetricEvent{
				Type:         types.MetricEventSuccess,
				ProviderName: provider,
				ProviderType: types.ProviderTypeOpenAI,
				Timestamp:    time.Now(),
				Latency:      time.Duration(50+i*10) * time.Millisecond,
			})
		}
	}

	// Get metrics for each provider. The result is held in pm rather than a
	// variable named "metrics", which would shadow the imported metrics package.
	for _, provider := range providers {
		pm := collector.GetProviderMetrics(provider)
		if pm == nil {
			continue
		}
		fmt.Printf("%s: %d requests, avg latency: %v\n",
			provider,
			pm.TotalRequests,
			pm.Latency.AverageLatency,
		)
	}
}
Output:
Example (Reset) ¶
Example demonstrating reset functionality
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	ctx := context.Background()

	mc := metrics.NewDefaultMetricsCollector()
	defer func() { _ = mc.Close() }()

	// Populate the collector with 100 request events.
	const eventCount = 100
	for n := 0; n < eventCount; n++ {
		_ = mc.RecordEvent(ctx, types.MetricEvent{
			Type:         types.MetricEventRequest,
			ProviderName: "test-provider",
			ProviderType: types.ProviderTypeOpenAI,
			Timestamp:    time.Now(),
		})
	}

	before := mc.GetSnapshot().TotalRequests
	fmt.Printf("Before reset: %d requests\n", before)

	// Reset clears all metrics data recorded so far.
	mc.Reset()

	after := mc.GetSnapshot().TotalRequests
	fmt.Printf("After reset: %d requests\n", after)
}
Output:
Before reset: 100 requests
After reset: 0 requests
Example (StreamingMetrics) ¶
Example demonstrating streaming metrics
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	ctx := context.Background()

	mc := metrics.NewDefaultMetricsCollector()
	defer func() { _ = mc.Close() }()

	const (
		provider = "openai-stream"
		model    = "gpt-4"
	)

	// Stream start: carries the time to first token.
	_ = mc.RecordEvent(ctx, types.MetricEvent{
		Type:             types.MetricEventStreamStart,
		ProviderName:     provider,
		ProviderType:     types.ProviderTypeOpenAI,
		ModelID:          model,
		Timestamp:        time.Now(),
		IsStreaming:      true,
		TimeToFirstToken: 50 * time.Millisecond,
	})

	// Stream end: carries total tokens, duration, and throughput.
	_ = mc.RecordEvent(ctx, types.MetricEvent{
		Type:            types.MetricEventStreamEnd,
		ProviderName:    provider,
		ProviderType:    types.ProviderTypeOpenAI,
		ModelID:         model,
		Timestamp:       time.Now(),
		IsStreaming:     true,
		TokensUsed:      150,
		Latency:         2 * time.Second,
		TokensPerSecond: 75.0,
	})

	// Print the streaming section of the snapshot, if present.
	stream := mc.GetSnapshot().Streaming
	if stream == nil {
		return
	}
	fmt.Printf("Streaming requests: %d\n", stream.TotalStreamRequests)
	fmt.Printf("Avg TTFT: %v\n", stream.TimeToFirstToken.AverageTTFT)
	fmt.Printf("Avg tokens/second: %.2f\n", stream.AverageTokensPerSecond)
}
Output:
Example (Subscription) ¶
Example demonstrating subscription to metrics events
package main
import (
"context"
"fmt"
"time"
"github.com/cecil-the-coder/ai-provider-kit/pkg/metrics"
"github.com/cecil-the-coder/ai-provider-kit/pkg/types"
)
func main() {
	collector := metrics.NewDefaultMetricsCollector()
	defer func() { _ = collector.Close() }()
	ctx := context.Background()

	// Create a subscription
	sub := collector.Subscribe(100)
	defer sub.Unsubscribe()

	// received is signalled after each event is printed, so main can wait for
	// delivery instead of guessing with a fixed sleep (which races the
	// goroutine). The one-slot buffer plus the non-blocking send mean the
	// handler never blocks on it.
	received := make(chan struct{}, 1)

	// Start a goroutine to process events
	go func() {
		for event := range sub.Events() {
			fmt.Printf("Received event: %s from %s\n", event.Type, event.ProviderName)
			select {
			case received <- struct{}{}:
			default:
			}
		}
	}()

	// Record some events
	_ = collector.RecordEvent(ctx, types.MetricEvent{
		Type:         types.MetricEventSuccess,
		ProviderName: "test-provider",
		ProviderType: types.ProviderTypeOpenAI,
		Timestamp:    time.Now(),
	})

	// Wait until the event has been processed. The timeout keeps the example
	// from hanging if the event is never delivered.
	select {
	case <-received:
	case <-time.After(time.Second):
	}
}
Output:
Index ¶
- type Cost
- type CostCalculator
- type DefaultMetricsCollector
- func (c *DefaultMetricsCollector) Close() error
- func (c *DefaultMetricsCollector) GetModelIDs() []string
- func (c *DefaultMetricsCollector) GetModelMetrics(modelID string) *types.ModelMetricsSnapshot
- func (c *DefaultMetricsCollector) GetProviderMetrics(providerName string) *types.ProviderMetricsSnapshot
- func (c *DefaultMetricsCollector) GetProviderNames() []string
- func (c *DefaultMetricsCollector) GetSnapshot() types.MetricsSnapshot
- func (c *DefaultMetricsCollector) RecordEvent(ctx context.Context, event types.MetricEvent) error
- func (c *DefaultMetricsCollector) RecordEvents(ctx context.Context, events []types.MetricEvent) error
- func (c *DefaultMetricsCollector) RegisterHook(hook types.MetricsHook) types.HookID
- func (c *DefaultMetricsCollector) Reset()
- func (c *DefaultMetricsCollector) Subscribe(bufferSize int) types.MetricsSubscription
- func (c *DefaultMetricsCollector) SubscribeFiltered(bufferSize int, filter types.MetricFilter) types.MetricsSubscription
- func (c *DefaultMetricsCollector) UnregisterHook(id types.HookID)
- type Histogram
- type MetricsStreamWrapper
- type MetricsStreamWrapperConfig
- type NullCostCalculator
- type StreamMetrics
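Examples ¶
- Package (BasicUsage)
- Package (BatchEvents)
- Package (ErrorMetrics)
- Package (FilteredSubscription)
- Package (Hooks)
- Package (JsonSerialization)
- Package (ModelMetrics)
- Package (ProviderMetrics)
- Package (Reset)
- Package (StreamingMetrics)
- Package (Subscription)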
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Cost ¶
type Cost struct {
	// InputCost is the portion of the cost attributed to input tokens.
	InputCost float64 `json:"input_cost"`
	// OutputCost is the portion of the cost attributed to output tokens.
	OutputCost float64 `json:"output_cost"`
	// TotalCost is the overall cost of the request. NOTE(review): presumably
	// InputCost + OutputCost, but this is set by the CostCalculator in use — confirm.
	TotalCost float64 `json:"total_cost"`
	// Currency is the currency the amounts are expressed in, e.g. "USD".
	Currency string `json:"currency"`
}
Cost represents the calculated cost of an AI request.
type CostCalculator ¶
type CostCalculator interface {
	// CalculateCost returns the cost for a request to the given provider and
	// model that consumed inputTokens input tokens and produced outputTokens
	// output tokens.
	CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost
	// GetPricing returns pricing info (for display/debugging).
	// It returns the input and output prices per 1,000 tokens, with ok=true if
	// pricing exists for the provider/model pair and ok=false otherwise
	// (NullCostCalculator always reports ok=false).
	GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool)
}
CostCalculator calculates costs for AI requests.
Example usage:
// Define your own cost calculator with your pricing
type MyCostCalculator struct {
// your pricing data
}
func (c *MyCostCalculator) CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost {
// your pricing logic
return Cost{
InputCost: float64(inputTokens) * inputPricePerToken,
OutputCost: float64(outputTokens) * outputPricePerToken,
TotalCost: totalCost,
Currency: "USD",
}
}
func (c *MyCostCalculator) GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool) {
// return your pricing per 1K tokens
}
// Use it with the collector
costCalc := &MyCostCalculator{}
collector := NewDefaultMetricsCollector(costCalc)
CostCalculator is a pluggable interface: consumers provide their own implementation with their specific pricing models.
type DefaultMetricsCollector ¶
type DefaultMetricsCollector struct {
// contains filtered or unexported fields
}
DefaultMetricsCollector is the default implementation of types.MetricsCollector. It provides thread-safe metrics collection with support for subscriptions and hooks.
func NewDefaultMetricsCollector ¶
func NewDefaultMetricsCollector(costCalculator ...CostCalculator) *DefaultMetricsCollector
NewDefaultMetricsCollector creates a new DefaultMetricsCollector instance. If no CostCalculator is provided, a NullCostCalculator will be used.
func (*DefaultMetricsCollector) Close ¶
func (c *DefaultMetricsCollector) Close() error
Close shuts down the collector
func (*DefaultMetricsCollector) GetModelIDs ¶
func (c *DefaultMetricsCollector) GetModelIDs() []string
GetModelIDs returns a sorted list of all model IDs
func (*DefaultMetricsCollector) GetModelMetrics ¶
func (c *DefaultMetricsCollector) GetModelMetrics(modelID string) *types.ModelMetricsSnapshot
GetModelMetrics returns metrics for a specific model
func (*DefaultMetricsCollector) GetProviderMetrics ¶
func (c *DefaultMetricsCollector) GetProviderMetrics(providerName string) *types.ProviderMetricsSnapshot
GetProviderMetrics returns metrics for a specific provider
func (*DefaultMetricsCollector) GetProviderNames ¶
func (c *DefaultMetricsCollector) GetProviderNames() []string
GetProviderNames returns a sorted list of all provider names
func (*DefaultMetricsCollector) GetSnapshot ¶
func (c *DefaultMetricsCollector) GetSnapshot() types.MetricsSnapshot
GetSnapshot returns a complete snapshot of all metrics
func (*DefaultMetricsCollector) RecordEvent ¶
func (c *DefaultMetricsCollector) RecordEvent(ctx context.Context, event types.MetricEvent) error
RecordEvent records a single metrics event
func (*DefaultMetricsCollector) RecordEvents ¶
func (c *DefaultMetricsCollector) RecordEvents(ctx context.Context, events []types.MetricEvent) error
RecordEvents records multiple events in a batch
func (*DefaultMetricsCollector) RegisterHook ¶
func (c *DefaultMetricsCollector) RegisterHook(hook types.MetricsHook) types.HookID
RegisterHook registers a hook and returns its ID
func (*DefaultMetricsCollector) Reset ¶
func (c *DefaultMetricsCollector) Reset()
Reset clears all metrics data
func (*DefaultMetricsCollector) Subscribe ¶
func (c *DefaultMetricsCollector) Subscribe(bufferSize int) types.MetricsSubscription
Subscribe creates a new subscription with the given buffer size
func (*DefaultMetricsCollector) SubscribeFiltered ¶
func (c *DefaultMetricsCollector) SubscribeFiltered(bufferSize int, filter types.MetricFilter) types.MetricsSubscription
SubscribeFiltered creates a filtered subscription
func (*DefaultMetricsCollector) UnregisterHook ¶
func (c *DefaultMetricsCollector) UnregisterHook(id types.HookID)
UnregisterHook removes a hook
type Histogram ¶
type Histogram struct {
// contains filtered or unexported fields
}
Histogram is a simple circular buffer that stores latency samples and calculates percentiles from them.
func NewHistogram ¶
NewHistogram creates a new histogram with the given sample size
func (*Histogram) GetLatencyMetrics ¶
func (h *Histogram) GetLatencyMetrics() types.LatencyMetrics
GetLatencyMetrics returns the current latency metrics including percentiles
type MetricsStreamWrapper ¶
type MetricsStreamWrapper struct {
// contains filtered or unexported fields
}
MetricsStreamWrapper wraps a ChatCompletionStream and tracks detailed streaming metrics. It measures:
- TimeToFirstToken (TTFT): Time from stream start to first chunk
- TokensPerSecond: Output throughput
- ChunksReceived: Total SSE chunks received
- StreamDuration: Total time from first Next() to Close()
- StreamInterruptions: Connection drops that recovered
The wrapper emits MetricEvents to the MetricsCollector at key points:
- MetricEventStreamStart: When first Next() is called (includes TTFT when first chunk arrives)
- MetricEventStreamChunk: For each chunk received (optional, can be disabled for performance)
- MetricEventStreamEnd: When stream completes successfully (includes tokens/sec)
- MetricEventStreamAbort: If an error occurs during streaming
Thread-safe: Safe for concurrent use by multiple goroutines.
func NewMetricsStreamWrapper ¶
func NewMetricsStreamWrapper(config MetricsStreamWrapperConfig) (*MetricsStreamWrapper, error)
NewMetricsStreamWrapper creates a new MetricsStreamWrapper with the given configuration.
func (*MetricsStreamWrapper) Close ¶
func (w *MetricsStreamWrapper) Close() error
Close closes the wrapped stream and records final metrics.
func (*MetricsStreamWrapper) GetMetrics ¶
func (w *MetricsStreamWrapper) GetMetrics() StreamMetrics
GetMetrics returns a snapshot of the current streaming metrics.
func (*MetricsStreamWrapper) Next ¶
func (w *MetricsStreamWrapper) Next() (types.ChatCompletionChunk, error)
Next returns the next chunk from the stream and tracks metrics. On the first call, it records the stream start time and emits MetricEventStreamStart. On each subsequent call, it tracks chunks and, when EmitChunkEvents is enabled, emits MetricEventStreamChunk.
type MetricsStreamWrapperConfig ¶
type MetricsStreamWrapperConfig struct {
	// Stream to wrap (required)
	Stream types.ChatCompletionStream
	// Metrics collector (required); receives the wrapper's stream events.
	Collector types.MetricsCollector
	// Context for event recording (required).
	// NOTE(review): Go convention discourages storing a Context in a struct;
	// here it is passed through to the events the wrapper records over the
	// stream's lifetime.
	Context context.Context
	// Provider name (required)
	ProviderName string
	// Provider type (required)
	ProviderType types.ProviderType
	// Model ID (required)
	ModelID string
	// Session ID for correlating stream events (optional, generated if not provided)
	SessionID string
	// EmitChunkEvents enables per-chunk MetricEventStreamChunk emission (default: false)
	// WARNING: This can generate high event volume for streams with many chunks
	EmitChunkEvents bool
}
MetricsStreamWrapperConfig configures the MetricsStreamWrapper.
type NullCostCalculator ¶
type NullCostCalculator struct{} // stateless: no fields, so the zero value is ready to use
NullCostCalculator is a no-op cost calculator that returns zero costs. Use this when no pricing configuration is available.
func NewNullCostCalculator ¶
func NewNullCostCalculator() *NullCostCalculator
NewNullCostCalculator creates a new NullCostCalculator
func (*NullCostCalculator) CalculateCost ¶
func (n *NullCostCalculator) CalculateCost(provider, model string, inputTokens, outputTokens int64) Cost
CalculateCost returns zero cost
func (*NullCostCalculator) GetPricing ¶
func (n *NullCostCalculator) GetPricing(provider, model string) (inputPer1K, outputPer1K float64, ok bool)
GetPricing returns ok=false indicating no pricing is available