Documentation
¶
Index ¶
- Constants
- func ComputeExponentialHistogramPercentiles(dp *metricspb.ExponentialHistogramDataPoint) map[string]float64
- func ComputeHistogramPercentiles(dp *metricspb.HistogramDataPoint) map[string]float64
- func GroupSpansByTraceID(spans []*StoredSpan) map[string][]*StoredSpan
- type ActivityCache
- func (h *ActivityCache) Clear()
- func (h *ActivityCache) Close()
- func (h *ActivityCache) Generation() uint64
- func (h *ActivityCache) LogsReceived() uint64
- func (h *ActivityCache) MetricsReceived() uint64
- func (h *ActivityCache) PeekMetrics(names []string) []*MetricPeek
- func (h *ActivityCache) RecentErrorCount() int
- func (h *ActivityCache) RecentErrors(n int) []*ErrorEntry
- func (h *ActivityCache) RecentTraces(n int) []*TraceEntry
- func (h *ActivityCache) RecordLog()
- func (h *ActivityCache) RecordMetric(stored *StoredMetric)
- func (h *ActivityCache) RecordSpan(span *StoredSpan)
- func (h *ActivityCache) SpansReceived() uint64
- func (h *ActivityCache) Subscribe() (<-chan struct{}, func())
- func (h *ActivityCache) UptimeSeconds() float64
- type AllStats
- type ErrorEntry
- type FilterOptions
- type LogStorage
- func (ls *LogStorage) Clear()
- func (ls *LogStorage) CurrentPosition() int
- func (ls *LogStorage) GetAllLogs() []*StoredLog
- func (ls *LogStorage) GetLogsByService(serviceName string) []*StoredLog
- func (ls *LogStorage) GetLogsBySeverity(severity string) []*StoredLog
- func (ls *LogStorage) GetLogsByTraceID(traceID string) []*StoredLog
- func (ls *LogStorage) GetRange(start, end int) []*StoredLog
- func (ls *LogStorage) GetRecentLogs(n int) []*StoredLog
- func (ls *LogStorage) ReceiveLogs(ctx context.Context, resourceLogs []*logspb.ResourceLogs) error
- func (ls *LogStorage) Stats() LogStorageStats
- type LogStorageStats
- type MetricPeek
- type MetricStorage
- func (ms *MetricStorage) Clear()
- func (ms *MetricStorage) CurrentPosition() int
- func (ms *MetricStorage) GetAllMetrics() []*StoredMetric
- func (ms *MetricStorage) GetMetricNames() []string
- func (ms *MetricStorage) GetMetricsByName(name string) []*StoredMetric
- func (ms *MetricStorage) GetMetricsByService(serviceName string) []*StoredMetric
- func (ms *MetricStorage) GetMetricsByType(metricType MetricType) []*StoredMetric
- func (ms *MetricStorage) GetRange(start, end int) []*StoredMetric
- func (ms *MetricStorage) GetRecentMetrics(n int) []*StoredMetric
- func (ms *MetricStorage) ReceiveMetrics(ctx context.Context, resourceMetrics []*metricspb.ResourceMetrics) error
- func (ms *MetricStorage) Stats() MetricStorageStats
- type MetricStorageStats
- type MetricType
- type ObservabilityStorage
- func (os *ObservabilityStorage) ActivityCache() *ActivityCache
- func (os *ObservabilityStorage) Clear()
- func (os *ObservabilityStorage) CreateSnapshot(name string) error
- func (os *ObservabilityStorage) GetSnapshotData(startSnapshot, endSnapshot string) (*SnapshotData, error)
- func (os *ObservabilityStorage) Logs() *LogStorage
- func (os *ObservabilityStorage) Metrics() *MetricStorage
- func (os *ObservabilityStorage) Query(filter QueryFilter) (*QueryResult, error)
- func (os *ObservabilityStorage) ReceiveLogs(ctx context.Context, resourceLogs []*logspb.ResourceLogs) error
- func (os *ObservabilityStorage) ReceiveMetrics(ctx context.Context, resourceMetrics []*metricspb.ResourceMetrics) error
- func (os *ObservabilityStorage) ReceiveSpans(ctx context.Context, resourceSpans []*tracepb.ResourceSpans) error
- func (os *ObservabilityStorage) Services() []string
- func (os *ObservabilityStorage) Snapshots() *SnapshotManager
- func (os *ObservabilityStorage) Stats() AllStats
- func (os *ObservabilityStorage) Traces() *TraceStorage
- type QueryFilter
- type QueryResult
- type RingBuffer
- func (rb *RingBuffer[T]) Add(item T)
- func (rb *RingBuffer[T]) Capacity() int
- func (rb *RingBuffer[T]) Clear()
- func (rb *RingBuffer[T]) CurrentPosition() int
- func (rb *RingBuffer[T]) GetAll() []T
- func (rb *RingBuffer[T]) GetRange(start, end int) []T
- func (rb *RingBuffer[T]) GetRecent(n int) []T
- func (rb *RingBuffer[T]) Size() int
- type Snapshot
- type SnapshotData
- type SnapshotDataSummary
- type SnapshotManager
- func (sm *SnapshotManager) Clear()
- func (sm *SnapshotManager) Count() int
- func (sm *SnapshotManager) Create(name string, tracePos, logPos, metricPos int) error
- func (sm *SnapshotManager) Delete(name string) error
- func (sm *SnapshotManager) Get(name string) (*Snapshot, error)
- func (sm *SnapshotManager) List() []string
- type StorageStats
- type StoredLog
- type StoredMetric
- type StoredSpan
- func FilterSpans(spans []*StoredSpan, opts FilterOptions) []*StoredSpan
- func FilterSpansByName(spans []*StoredSpan, spanName string) []*StoredSpan
- func FilterSpansByService(spans []*StoredSpan, service string) []*StoredSpan
- func FilterSpansByTraceID(spans []*StoredSpan, traceID string) []*StoredSpan
- type TimeRange
- type TraceEntry
- type TraceStorage
- func (ts *TraceStorage) Clear()
- func (ts *TraceStorage) CurrentPosition() int
- func (ts *TraceStorage) GetAllSpans() []*StoredSpan
- func (ts *TraceStorage) GetRange(start, end int) []*StoredSpan
- func (ts *TraceStorage) GetRecentSpans(n int) []*StoredSpan
- func (ts *TraceStorage) GetSpansByName(spanName string) []*StoredSpan
- func (ts *TraceStorage) GetSpansByService(serviceName string) []*StoredSpan
- func (ts *TraceStorage) GetSpansByTraceID(traceID string) []*StoredSpan
- func (ts *TraceStorage) ReceiveSpans(ctx context.Context, resourceSpans []*tracepb.ResourceSpans) error
- func (ts *TraceStorage) Stats() StorageStats
Constants ¶
const (
	// DefaultRecentErrorsCapacity is the number of recent errors to track.
	DefaultRecentErrorsCapacity = 100
	// DefaultRecentTracesCapacity is the number of recent traces to track.
	DefaultRecentTracesCapacity = 50
)
Variables ¶
This section is empty.
Functions ¶
func ComputeExponentialHistogramPercentiles ¶
func ComputeExponentialHistogramPercentiles(dp *metricspb.ExponentialHistogramDataPoint) map[string]float64
ComputeExponentialHistogramPercentiles estimates percentiles from exponential histogram data. Exponential histograms use a base and scale to define bucket boundaries. Returns nil if the histogram is empty or has no buckets.
func ComputeHistogramPercentiles ¶
func ComputeHistogramPercentiles(dp *metricspb.HistogramDataPoint) map[string]float64
ComputeHistogramPercentiles estimates percentiles from OTLP histogram bucket data. Uses linear interpolation within buckets (standard approach used by Prometheus). Returns nil if bucket data is not available or the histogram is empty.
func GroupSpansByTraceID ¶
func GroupSpansByTraceID(spans []*StoredSpan) map[string][]*StoredSpan
GroupSpansByTraceID groups spans by their trace ID. Returns a map of trace ID to spans.
Types ¶
type ActivityCache ¶
type ActivityCache struct {
// contains filtered or unexported fields
}
ActivityCache provides fast access to telemetry data for frequent polling. All counters use atomic operations for lock-free reads.
func NewActivityCache ¶
func NewActivityCache() *ActivityCache
NewActivityCache creates a new activity cache.
func (*ActivityCache) Close ¶
func (h *ActivityCache) Close()
Close shuts down the activity cache by closing all subscriber channels. After Close, no further notifications will be sent.
func (*ActivityCache) Generation ¶
func (h *ActivityCache) Generation() uint64
Generation returns the current generation counter.
func (*ActivityCache) LogsReceived ¶
func (h *ActivityCache) LogsReceived() uint64
LogsReceived returns the total number of logs received.
func (*ActivityCache) MetricsReceived ¶
func (h *ActivityCache) MetricsReceived() uint64
MetricsReceived returns the total number of metrics received.
func (*ActivityCache) PeekMetrics ¶
func (h *ActivityCache) PeekMetrics(names []string) []*MetricPeek
PeekMetrics returns the current values for the specified metric names. Only metrics that exist in the cache are returned. The number of names is limited to MaxMetricPeek.
func (*ActivityCache) RecentErrorCount ¶
func (h *ActivityCache) RecentErrorCount() int
RecentErrorCount returns the number of recent errors tracked.
func (*ActivityCache) RecentErrors ¶
func (h *ActivityCache) RecentErrors(n int) []*ErrorEntry
RecentErrors returns the N most recent errors.
func (*ActivityCache) RecentTraces ¶
func (h *ActivityCache) RecentTraces(n int) []*TraceEntry
RecentTraces returns the N most recent traces.
func (*ActivityCache) RecordLog ¶
func (h *ActivityCache) RecordLog()
RecordLog records a log entry for activity tracking.
func (*ActivityCache) RecordMetric ¶
func (h *ActivityCache) RecordMetric(stored *StoredMetric)
RecordMetric records a metric for activity tracking.
func (*ActivityCache) RecordSpan ¶
func (h *ActivityCache) RecordSpan(span *StoredSpan)
RecordSpan records a span for activity tracking. Called from ObservabilityStorage when spans are received.
func (*ActivityCache) SpansReceived ¶
func (h *ActivityCache) SpansReceived() uint64
SpansReceived returns the total number of spans received.
func (*ActivityCache) Subscribe ¶
func (h *ActivityCache) Subscribe() (<-chan struct{}, func())
Subscribe returns a notification channel and an unsubscribe function. The channel receives a signal (non-blocking) whenever new telemetry arrives. The channel is buffered with capacity 1 to coalesce rapid updates.
func (*ActivityCache) UptimeSeconds ¶
func (h *ActivityCache) UptimeSeconds() float64
UptimeSeconds returns the uptime in seconds.
type AllStats ¶
type AllStats struct {
Traces StorageStats `json:"traces"`
Logs LogStorageStats `json:"logs"`
Metrics MetricStorageStats `json:"metrics"`
Snapshots int `json:"snapshot_count"`
}
AllStats holds comprehensive statistics across all signal types (traces, logs, and metrics) plus the number of snapshots.
type ErrorEntry ¶
type ErrorEntry struct {
TraceID string
SpanID string
Service string
SpanName string
ErrorMsg string
Timestamp uint64 // Unix nano
}
ErrorEntry captures minimal error info for activity tracking.
type FilterOptions ¶
type FilterOptions struct {
TraceID string
Service string
SpanName string
Severity string
MinTime int64
MaxTime int64
}
FilterOptions contains options for filtering telemetry data.
type LogStorage ¶
type LogStorage struct {
// contains filtered or unexported fields
}
LogStorage stores OTLP log records without content indexes. Queries use position-based ranges with in-memory filtering.
func NewLogStorage ¶
func NewLogStorage(capacity int) *LogStorage
NewLogStorage creates a new log storage with the specified capacity.
func (*LogStorage) CurrentPosition ¶
func (ls *LogStorage) CurrentPosition() int
CurrentPosition returns the current write position. Used by snapshots to bookmark a point in time.
func (*LogStorage) GetAllLogs ¶
func (ls *LogStorage) GetAllLogs() []*StoredLog
GetAllLogs returns all stored logs in chronological order.
func (*LogStorage) GetLogsByService ¶
func (ls *LogStorage) GetLogsByService(serviceName string) []*StoredLog
GetLogsByService returns all logs for a given service. This performs an in-memory scan.
func (*LogStorage) GetLogsBySeverity ¶
func (ls *LogStorage) GetLogsBySeverity(severity string) []*StoredLog
GetLogsBySeverity returns all logs matching a severity level. This performs an in-memory scan.
func (*LogStorage) GetLogsByTraceID ¶
func (ls *LogStorage) GetLogsByTraceID(traceID string) []*StoredLog
GetLogsByTraceID returns all currently stored logs for a given trace ID. This performs an in-memory scan.
func (*LogStorage) GetRange ¶
func (ls *LogStorage) GetRange(start, end int) []*StoredLog
GetRange returns logs between start and end positions (inclusive). Positions are absolute and represent the logical sequence of logs added.
func (*LogStorage) GetRecentLogs ¶
func (ls *LogStorage) GetRecentLogs(n int) []*StoredLog
GetRecentLogs returns the N most recent logs.
func (*LogStorage) ReceiveLogs ¶
func (ls *LogStorage) ReceiveLogs(ctx context.Context, resourceLogs []*logspb.ResourceLogs) error
ReceiveLogs stores received log records.
func (*LogStorage) Stats ¶
func (ls *LogStorage) Stats() LogStorageStats
Stats returns current storage statistics.
type LogStorageStats ¶
type LogStorageStats struct {
LogCount int
Capacity int
TraceCount int
ServiceCount int
Severities map[string]int
}
LogStorageStats contains statistics about log storage.
type MetricPeek ¶
type MetricPeek struct {
Name string
Type MetricType
LastUpdated uint64
// For Gauge/Sum
Value *float64
// For Histogram
Count *uint64
Sum *float64
Min *float64
Max *float64
Percentiles map[string]float64 // p50, p95, p99
}
MetricPeek holds the current value(s) of a metric for quick access.
type MetricStorage ¶
type MetricStorage struct {
// contains filtered or unexported fields
}
MetricStorage stores OTLP metric data without content indexes. Queries use position-based ranges with in-memory filtering.
func NewMetricStorage ¶
func NewMetricStorage(capacity int) *MetricStorage
NewMetricStorage creates a new metric storage with the specified capacity.
func (*MetricStorage) CurrentPosition ¶
func (ms *MetricStorage) CurrentPosition() int
CurrentPosition returns the current write position. Used by snapshots to bookmark a point in time.
func (*MetricStorage) GetAllMetrics ¶
func (ms *MetricStorage) GetAllMetrics() []*StoredMetric
GetAllMetrics returns all stored metrics in chronological order.
func (*MetricStorage) GetMetricNames ¶
func (ms *MetricStorage) GetMetricNames() []string
GetMetricNames returns all unique metric names currently in storage.
func (*MetricStorage) GetMetricsByName ¶
func (ms *MetricStorage) GetMetricsByName(name string) []*StoredMetric
GetMetricsByName returns all currently stored metrics with the given name. This performs an in-memory scan.
func (*MetricStorage) GetMetricsByService ¶
func (ms *MetricStorage) GetMetricsByService(serviceName string) []*StoredMetric
GetMetricsByService returns all metrics for a given service. This performs an in-memory scan.
func (*MetricStorage) GetMetricsByType ¶
func (ms *MetricStorage) GetMetricsByType(metricType MetricType) []*StoredMetric
GetMetricsByType returns all metrics of a specific type. This performs an in-memory scan.
func (*MetricStorage) GetRange ¶
func (ms *MetricStorage) GetRange(start, end int) []*StoredMetric
GetRange returns metrics between start and end positions (inclusive). Positions are absolute and represent the logical sequence of metrics added.
func (*MetricStorage) GetRecentMetrics ¶
func (ms *MetricStorage) GetRecentMetrics(n int) []*StoredMetric
GetRecentMetrics returns the N most recent metrics.
func (*MetricStorage) ReceiveMetrics ¶
func (ms *MetricStorage) ReceiveMetrics(ctx context.Context, resourceMetrics []*metricspb.ResourceMetrics) error
ReceiveMetrics stores received metric data.
func (*MetricStorage) Stats ¶
func (ms *MetricStorage) Stats() MetricStorageStats
Stats returns current storage statistics.
type MetricStorageStats ¶
type MetricStorageStats struct {
MetricCount int
Capacity int
UniqueNames int
ServiceCount int
TypeCounts map[string]int
TotalDataPoints int
}
MetricStorageStats contains statistics about metric storage.
type MetricType ¶
type MetricType int
MetricType represents the type of metric.
const (
	MetricTypeUnknown MetricType = iota
	MetricTypeGauge
	MetricTypeSum
	MetricTypeHistogram
	MetricTypeExponentialHistogram
	MetricTypeSummary
)
func (MetricType) String ¶
func (mt MetricType) String() string
type ObservabilityStorage ¶
type ObservabilityStorage struct {
// contains filtered or unexported fields
}
ObservabilityStorage provides unified access to all telemetry signals (traces, logs, metrics) with snapshot support for time-based queries. This is the primary interface for MCP tools.
func NewObservabilityStorage ¶
func NewObservabilityStorage(traceCapacity, logCapacity, metricCapacity int) *ObservabilityStorage
NewObservabilityStorage creates a unified storage layer with the specified capacities.
func (*ObservabilityStorage) ActivityCache ¶
func (os *ObservabilityStorage) ActivityCache() *ActivityCache
ActivityCache returns the activity cache for fast polling.
func (*ObservabilityStorage) Clear ¶
func (os *ObservabilityStorage) Clear()
Clear removes all telemetry data AND snapshots. This is a complete reset - use sparingly. For normal cleanup, delete individual snapshots with manage_snapshots instead.
func (*ObservabilityStorage) CreateSnapshot ¶
func (os *ObservabilityStorage) CreateSnapshot(name string) error
CreateSnapshot creates a named snapshot of current buffer positions. This allows querying "what happened between snapshot A and snapshot B?"
func (*ObservabilityStorage) GetSnapshotData ¶
func (os *ObservabilityStorage) GetSnapshotData(startSnapshot, endSnapshot string) (*SnapshotData, error)
GetSnapshotData retrieves all telemetry data between two snapshots. If endSnapshot is empty, the current buffer positions are used as the end of the range.
func (*ObservabilityStorage) Logs ¶
func (os *ObservabilityStorage) Logs() *LogStorage
Logs returns the underlying log storage for receiver integration.
func (*ObservabilityStorage) Metrics ¶
func (os *ObservabilityStorage) Metrics() *MetricStorage
Metrics returns the underlying metric storage for receiver integration.
func (*ObservabilityStorage) Query ¶
func (os *ObservabilityStorage) Query(filter QueryFilter) (*QueryResult, error)
Query performs a multi-signal query with optional snapshot-based time range.
func (*ObservabilityStorage) ReceiveLogs ¶
func (os *ObservabilityStorage) ReceiveLogs(ctx context.Context, resourceLogs []*logspb.ResourceLogs) error
ReceiveLogs implements the logs receiver interface. It delegates storage to the underlying log storage and updates the activity cache counters.
func (*ObservabilityStorage) ReceiveMetrics ¶
func (os *ObservabilityStorage) ReceiveMetrics(ctx context.Context, resourceMetrics []*metricspb.ResourceMetrics) error
ReceiveMetrics implements the metrics receiver interface. It stores metrics inline and updates the activity cache for fast polling.
func (*ObservabilityStorage) ReceiveSpans ¶
func (os *ObservabilityStorage) ReceiveSpans(ctx context.Context, resourceSpans []*tracepb.ResourceSpans) error
ReceiveSpans implements the trace receiver interface. It stores spans inline and updates the activity cache for fast polling.
func (*ObservabilityStorage) Services ¶
func (os *ObservabilityStorage) Services() []string
Services returns a sorted, deduplicated list of service names across all signal types.
func (*ObservabilityStorage) Snapshots ¶
func (os *ObservabilityStorage) Snapshots() *SnapshotManager
Snapshots returns the snapshot manager.
func (*ObservabilityStorage) Stats ¶
func (os *ObservabilityStorage) Stats() AllStats
Stats returns comprehensive statistics for all storage.
func (*ObservabilityStorage) Traces ¶
func (os *ObservabilityStorage) Traces() *TraceStorage
Traces returns the underlying trace storage for receiver integration.
type QueryFilter ¶
type QueryFilter struct {
// Basic filters
ServiceName string `json:"service_name,omitempty"`
TraceID string `json:"trace_id,omitempty"`
SpanName string `json:"span_name,omitempty"`
LogSeverity string `json:"log_severity,omitempty"`
MetricNames []string `json:"metric_names,omitempty"`
StartSnapshot string `json:"start_snapshot,omitempty"`
EndSnapshot string `json:"end_snapshot,omitempty"`
Limit int `json:"limit,omitempty"` // 0 = no limit
// Status filters
ErrorsOnly bool `json:"errors_only,omitempty"`
SpanStatus string `json:"span_status,omitempty"` // "OK", "ERROR", "UNSET"
// Duration filters (nanoseconds)
MinDurationNs *uint64 `json:"min_duration_ns,omitempty"`
MaxDurationNs *uint64 `json:"max_duration_ns,omitempty"`
// Attribute filters
HasAttribute string `json:"has_attribute,omitempty"`
AttributeEquals map[string]string `json:"attribute_equals,omitempty"`
}
QueryFilter specifies multi-signal query criteria.
type QueryResult ¶
type QueryResult struct {
Filter QueryFilter `json:"filter"`
Traces []*StoredSpan `json:"traces"`
Logs []*StoredLog `json:"logs"`
Metrics []*StoredMetric `json:"metrics"`
Summary SnapshotDataSummary `json:"summary"`
}
QueryResult contains filtered telemetry data across all signals.
type RingBuffer ¶
RingBuffer is a generic thread-safe ring buffer that stores a fixed number of items. When the buffer is full, adding a new item overwrites the oldest item. All operations are O(1) except GetAll() which is O(n) where n is the current size.
func NewRingBuffer ¶
func NewRingBuffer[T any](capacity int) *RingBuffer[T]
NewRingBuffer creates a new ring buffer with the specified capacity. The capacity must be greater than zero.
func (*RingBuffer[T]) Add ¶
func (rb *RingBuffer[T]) Add(item T)
Add inserts an item into the ring buffer. If the buffer is at capacity, this overwrites the oldest item.
func (*RingBuffer[T]) Capacity ¶
func (rb *RingBuffer[T]) Capacity() int
Capacity returns the maximum capacity of the buffer.
func (*RingBuffer[T]) Clear ¶
func (rb *RingBuffer[T]) Clear()
Clear removes all items from the buffer.
func (*RingBuffer[T]) CurrentPosition ¶
func (rb *RingBuffer[T]) CurrentPosition() int
CurrentPosition returns the total number of items ever added to the buffer. This is a monotonically increasing value used by snapshots to bookmark a point in time. Use with GetRange to retrieve items between two positions.
func (*RingBuffer[T]) GetAll ¶
func (rb *RingBuffer[T]) GetAll() []T
GetAll returns all items in chronological order (oldest to newest). The returned slice is a copy and safe to modify.
func (*RingBuffer[T]) GetRange ¶
func (rb *RingBuffer[T]) GetRange(start, end int) []T
GetRange returns items between start and end positions (inclusive). Positions are absolute (monotonically increasing) values from CurrentPosition. Returns nil if the range is invalid, empty, or has been evicted from the buffer.
func (*RingBuffer[T]) GetRecent ¶
func (rb *RingBuffer[T]) GetRecent(n int) []T
GetRecent returns the N most recent items in chronological order. If N is greater than the current size, all items are returned.
func (*RingBuffer[T]) Size ¶
func (rb *RingBuffer[T]) Size() int
Size returns the current number of items in the buffer.
type Snapshot ¶
type Snapshot struct {
Name string
CreatedAt time.Time
TracePos int // Position in trace buffer
LogPos int // Position in log buffer
MetricPos int // Position in metric buffer
}
Snapshot represents a point-in-time bookmark across all storage buffers. Besides its name and creation time, each snapshot records only three buffer positions (3 ints, 24 bytes on 64-bit platforms), making snapshots extremely lightweight.
type SnapshotData ¶
type SnapshotData struct {
StartSnapshot string `json:"start_snapshot"`
EndSnapshot string `json:"end_snapshot"`
TimeRange TimeRange `json:"time_range"`
Traces []*StoredSpan `json:"traces"`
Logs []*StoredLog `json:"logs"`
Metrics []*StoredMetric `json:"metrics"`
Summary SnapshotDataSummary `json:"summary"`
}
SnapshotData represents all telemetry data between two points in time.
type SnapshotDataSummary ¶
type SnapshotDataSummary struct {
SpanCount int `json:"span_count"`
LogCount int `json:"log_count"`
MetricCount int `json:"metric_count"`
Services []string `json:"services"`
TraceIDs []string `json:"trace_ids"`
LogSeverities map[string]int `json:"log_severities"`
MetricNames []string `json:"metric_names"`
}
SnapshotDataSummary provides quick stats about the snapshot data.
type SnapshotManager ¶
SnapshotManager manages named snapshots of buffer positions. Snapshots provide lightweight bookmarks for time-based queries.
func NewSnapshotManager ¶
func NewSnapshotManager() *SnapshotManager
NewSnapshotManager creates a new snapshot manager.
func (*SnapshotManager) Count ¶
func (sm *SnapshotManager) Count() int
Count returns the number of snapshots.
func (*SnapshotManager) Create ¶
func (sm *SnapshotManager) Create(name string, tracePos, logPos, metricPos int) error
Create creates a new snapshot with the specified name and positions. Returns an error if a snapshot with the same name already exists.
func (*SnapshotManager) Delete ¶
func (sm *SnapshotManager) Delete(name string) error
Delete removes a snapshot by name. Returns an error if the snapshot does not exist.
func (*SnapshotManager) Get ¶
func (sm *SnapshotManager) Get(name string) (*Snapshot, error)
Get retrieves a snapshot by name. Returns an error if the snapshot does not exist.
func (*SnapshotManager) List ¶
func (sm *SnapshotManager) List() []string
List returns the names of all snapshots.
type StorageStats ¶
type StorageStats struct {
SpanCount int // Current number of spans stored
Capacity int // Maximum number of spans that can be stored
TraceCount int // Number of distinct traces
}
StorageStats contains statistics about trace storage.
type StoredLog ¶
type StoredLog struct {
ResourceLog *logspb.ResourceLogs
ScopeLog *logspb.ScopeLogs
LogRecord *logspb.LogRecord
// Extracted fields for in-memory filtering
TraceID string
SpanID string
ServiceName string
Severity string
SeverityNum int32
Body string
Timestamp uint64
}
StoredLog wraps a protobuf log record with extracted fields for filtering.
type StoredMetric ¶
type StoredMetric struct {
ResourceMetric *metricspb.ResourceMetrics
ScopeMetric *metricspb.ScopeMetrics
Metric *metricspb.Metric
// Extracted fields for in-memory filtering
MetricName string
ServiceName string
MetricType MetricType
Timestamp uint64
DataPointCount int
// Summary data for quick stats
NumericValue *float64
Count *uint64
Sum *float64
}
StoredMetric wraps a protobuf metric with extracted fields for filtering.
type StoredSpan ¶
type StoredSpan struct {
ResourceSpan *tracepb.ResourceSpans
ScopeSpan *tracepb.ScopeSpans
Span *tracepb.Span
// Indexed fields for fast lookup
TraceID string
SpanID string
ServiceName string
SpanName string
}
StoredSpan wraps a protobuf span with indexed fields for efficient querying. It preserves the full OTLP hierarchy: ResourceSpans -> ScopeSpans -> Span.
func FilterSpans ¶
func FilterSpans(spans []*StoredSpan, opts FilterOptions) []*StoredSpan
FilterSpans applies multiple filters using AND logic. Empty filter values are ignored.
func FilterSpansByName ¶
func FilterSpansByName(spans []*StoredSpan, spanName string) []*StoredSpan
FilterSpansByName returns spans matching the specified span name.
func FilterSpansByService ¶
func FilterSpansByService(spans []*StoredSpan, service string) []*StoredSpan
FilterSpansByService returns spans matching the specified service name.
func FilterSpansByTraceID ¶
func FilterSpansByTraceID(spans []*StoredSpan, traceID string) []*StoredSpan
FilterSpansByTraceID returns spans matching the specified trace ID.
type TimeRange ¶
type TimeRange struct {
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Duration string `json:"duration"`
}
TimeRange represents a time window.
type TraceEntry ¶
type TraceEntry struct {
TraceID string
Service string
RootSpan string
Status string // OK, ERROR, UNSET
DurationMs float64
ErrorMsg string // If failed
Timestamp uint64 // Start time
SpanCount int
HasRoot bool // True if we've seen the actual root span
}
TraceEntry captures trace-level info for activity tracking.
type TraceStorage ¶
type TraceStorage struct {
// contains filtered or unexported fields
}
TraceStorage stores OTLP trace spans without content indexes. Queries use position-based ranges with in-memory filtering. It implements the ReceiveSpans method used by the unified receiver.
func NewTraceStorage ¶
func NewTraceStorage(capacity int) *TraceStorage
NewTraceStorage creates a new trace storage with the specified capacity.
func (*TraceStorage) CurrentPosition ¶
func (ts *TraceStorage) CurrentPosition() int
CurrentPosition returns the current write position. Used by snapshots to bookmark a point in time.
func (*TraceStorage) GetAllSpans ¶
func (ts *TraceStorage) GetAllSpans() []*StoredSpan
GetAllSpans returns all stored spans in chronological order (oldest to newest).
func (*TraceStorage) GetRange ¶
func (ts *TraceStorage) GetRange(start, end int) []*StoredSpan
GetRange returns spans between start and end positions (inclusive). Positions are absolute and represent the logical sequence of spans added.
func (*TraceStorage) GetRecentSpans ¶
func (ts *TraceStorage) GetRecentSpans(n int) []*StoredSpan
GetRecentSpans returns the N most recent spans in chronological order.
func (*TraceStorage) GetSpansByName ¶
func (ts *TraceStorage) GetSpansByName(spanName string) []*StoredSpan
GetSpansByName returns all spans with a given span name. This performs a linear scan of all stored spans.
func (*TraceStorage) GetSpansByService ¶
func (ts *TraceStorage) GetSpansByService(serviceName string) []*StoredSpan
GetSpansByService returns all spans for a given service name. This performs a linear scan of all stored spans.
func (*TraceStorage) GetSpansByTraceID ¶
func (ts *TraceStorage) GetSpansByTraceID(traceID string) []*StoredSpan
GetSpansByTraceID returns all currently stored spans for a given trace ID. This performs an in-memory scan of all stored spans.
func (*TraceStorage) ReceiveSpans ¶
func (ts *TraceStorage) ReceiveSpans(ctx context.Context, resourceSpans []*tracepb.ResourceSpans) error
ReceiveSpans stores incoming OTLP resource spans and updates indexes for querying.
func (*TraceStorage) Stats ¶
func (ts *TraceStorage) Stats() StorageStats
Stats returns current storage statistics.