tsdb

package
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package tsdb provides an in-memory ring buffer that holds per-metric sliding windows with pre-computed aggregates. It serves dashboard queries for recent data from memory, so they do not need to hit the relational database.

Types

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator manages in-memory tumbling windows for metrics.

func NewAggregator

func NewAggregator(repo *storage.Repository, windowSize time.Duration) *Aggregator

NewAggregator creates a new TSDB aggregator.

func (*Aggregator) BucketCount

func (a *Aggregator) BucketCount() int

BucketCount returns the current number of in-memory buckets (for metrics/health).

func (*Aggregator) DroppedBatches

func (a *Aggregator) DroppedBatches() int64

DroppedBatches returns the total number of batches dropped due to a full flush channel.
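
A drop counter of this kind is commonly paired with a non-blocking channel send. The sketch below assumes that pattern; the flusher type, its enqueue method, and the batch element type are hypothetical and are not taken from the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// flusher is a hypothetical stand-in for a component that hands batches to a
// background writer through a bounded channel.
type flusher struct {
	ch      chan []float64
	dropped atomic.Int64 // batches refused because ch was full
}

// enqueue tries to send without blocking. When the channel is full the batch
// is dropped and the counter is incremented.
func (f *flusher) enqueue(batch []float64) bool {
	select {
	case f.ch <- batch:
		return true
	default:
		f.dropped.Add(1)
		return false
	}
}

func main() {
	f := &flusher{ch: make(chan []float64, 1)}
	fmt.Println(f.enqueue([]float64{1})) // true: the buffer has room
	fmt.Println(f.enqueue([]float64{2})) // false: the buffer is full
	fmt.Println(f.dropped.Load())        // 1
}
```

The trade-off is that ingestion never stalls on a slow consumer, at the cost of losing data under sustained back-pressure, which is why the count is worth exporting.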

func (*Aggregator) Ingest

func (a *Aggregator) Ingest(m RawMetric)

Ingest adds a raw metric point to the current aggregator window.

func (*Aggregator) SetCardinalityLimit

func (a *Aggregator) SetCardinalityLimit(global, perTenant int, onOverflow func(tenantID string))

SetCardinalityLimit configures the global and per-tenant series caps.

global       — total distinct series across all tenants; 0 = unlimited.
perTenant    — distinct series per tenant; 0 = unlimited (global only).
onOverflow   — called once per overflow event with the tenant ID
               that exceeded its cap, or overflowSentinelGlobal when
               the global cap (not per-tenant) is the trigger.

Pass nil for onOverflow to disable the callback. Either cap may be 0 independently.
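
The interaction of the two caps can be sketched as follows. This is an illustration under stated assumptions, not the package's logic: the limiter type is hypothetical, the per-tenant check is assumed to run before the global check, an over-cap series is assumed to be refused, and globalSentinel is a made-up stand-in for the unexported overflowSentinelGlobal, whose real value is not documented here.

```go
package main

import "fmt"

// globalSentinel is a hypothetical placeholder for overflowSentinelGlobal.
const globalSentinel = "*global*"

// limiter is a hypothetical, non-thread-safe model of the two series caps.
type limiter struct {
	global, perTenant int // 0 = unlimited
	onOverflow        func(tenantID string)
	total             int
	byTenant          map[string]int
	seen              map[string]struct{}
}

func newLimiter(global, perTenant int, onOverflow func(string)) *limiter {
	return &limiter{
		global: global, perTenant: perTenant, onOverflow: onOverflow,
		byTenant: map[string]int{}, seen: map[string]struct{}{},
	}
}

// admit reports whether the series may be tracked. Known series always pass.
func (l *limiter) admit(tenantID, series string) bool {
	key := tenantID + "|" + series
	if _, ok := l.seen[key]; ok {
		return true
	}
	switch {
	case l.perTenant > 0 && l.byTenant[tenantID] >= l.perTenant:
		l.notify(tenantID)
		return false
	case l.global > 0 && l.total >= l.global:
		l.notify(globalSentinel)
		return false
	}
	l.seen[key] = struct{}{}
	l.byTenant[tenantID]++
	l.total++
	return true
}

// notify invokes the callback if one was supplied (nil disables it).
func (l *limiter) notify(id string) {
	if l.onOverflow != nil {
		l.onOverflow(id)
	}
}

func main() {
	l := newLimiter(3, 2, func(id string) { fmt.Println("overflow:", id) })
	l.admit("a", "x")
	l.admit("a", "y")
	l.admit("a", "z") // tenant "a" exceeds its per-tenant cap of 2
	l.admit("b", "x")
	l.admit("b", "y") // the global cap of 3 is the trigger
}
```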

func (*Aggregator) SetMetrics

func (a *Aggregator) SetMetrics(onIngest, onDropped func())

SetMetrics wires Prometheus metric callbacks.

func (*Aggregator) SetRingBuffer

func (a *Aggregator) SetRingBuffer(rb *RingBuffer)

SetRingBuffer attaches a RingBuffer that receives every ingested data point.

func (*Aggregator) Start

func (a *Aggregator) Start(ctx context.Context)

Start begins the aggregation background processes.

func (*Aggregator) Stop

func (a *Aggregator) Stop()

Stop stops the aggregator.

type MetricRing

type MetricRing struct {
	// contains filtered or unexported fields
}

MetricRing is a fixed-size circular buffer for a single metric+service key. Each slot represents one windowDuration-wide time bucket.

func (*MetricRing) Record

func (r *MetricRing) Record(value float64, at time.Time)

Record adds a new data point, advancing the window if necessary.

func (*MetricRing) Windows

func (r *MetricRing) Windows(n int) []WindowAgg

Windows returns up to the n most recent completed window aggregates. It takes a snapshot under the ring lock to avoid acquiring 120+ slot locks sequentially.
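
The snapshot approach can be illustrated with a single mutex guarding the whole ring: the reader copies what it needs while holding the lock once, then works on the copy. The ring type and its int64 slots below are hypothetical simplifications, not the package's data layout.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a hypothetical simplification: one lock guards every slot.
type ring struct {
	mu    sync.Mutex
	slots []int64 // oldest first, newest last
}

// snapshot copies up to n of the newest slots under a single lock acquisition.
// The returned slice is independent of later writes to the ring.
func (r *ring) snapshot(n int) []int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	if n > len(r.slots) {
		n = len(r.slots)
	}
	if n < 0 {
		n = 0
	}
	out := make([]int64, n)
	copy(out, r.slots[len(r.slots)-n:])
	return out
}

func main() {
	r := &ring{slots: []int64{10, 20, 30, 40}}
	snap := r.snapshot(2)
	r.slots[3] = 99 // a later write does not affect the snapshot
	fmt.Println(snap) // [30 40]
}
```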

type RawMetric

type RawMetric struct {
	Name        string
	ServiceName string
	Value       float64
	Timestamp   time.Time
	Attributes  map[string]any
	// TenantID identifies the owning tenant for this point. When empty the
	// DB default ("default") applies at persist time.
	TenantID string
}

RawMetric represents an incoming single metric data point before aggregation.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer manages per-metric ring buffers, keyed by "tenant|service|metric".
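
A key in that shape could be built as in the sketch below. seriesKey is a hypothetical helper, and the sketch assumes none of the parts contains the "|" separator; how the package handles such input is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// seriesKey is a hypothetical helper (not part of package tsdb) that joins the
// three identifiers into a "tenant|service|metric" key.
func seriesKey(tenantID, serviceName, metricName string) string {
	return strings.Join([]string{tenantID, serviceName, metricName}, "|")
}

func main() {
	fmt.Println(seriesKey("acme", "checkout", "http.latency")) // prints acme|checkout|http.latency
}
```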

func NewRingBuffer

func NewRingBuffer(slots int, windowDur time.Duration, maxSeries int, onSeriesRejected func()) *RingBuffer

NewRingBuffer creates a RingBuffer. slots × windowDur = total retention (e.g. 120 × 30s = 1 hour). maxSeries caps distinct tenant|service|metric series (0 = unlimited); onSeriesRejected is invoked once per point refused at the cap (nil = no-op).

func (*RingBuffer) AllKeys

func (rb *RingBuffer) AllKeys() []string

AllKeys returns all registered tenant|service|metric keys.

func (*RingBuffer) MetricCount

func (rb *RingBuffer) MetricCount() int

MetricCount returns the number of distinct tenant-scoped metric series tracked.

func (*RingBuffer) QueryRecent

func (rb *RingBuffer) QueryRecent(tenantID, metricName, serviceName string, windowCount int) []WindowAgg

QueryRecent returns aggregated windows for the given tenant+metric+service.

func (*RingBuffer) Record

func (rb *RingBuffer) Record(tenantID, metricName, serviceName string, value float64, at time.Time) bool

Record records a value for the given tenant+metric+service at time at. It returns false when the point was refused because creating a new series would exceed maxSeries; existing series always keep recording.
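
The admission rule (refuse only new series at the cap, never existing ones) can be sketched with a plain map. cappedSeries is a hypothetical, non-thread-safe stand-in that counts points per key instead of aggregating them.

```go
package main

import "fmt"

// cappedSeries is a hypothetical stand-in for a series registry with a cap.
type cappedSeries struct {
	maxSeries int // 0 = unlimited
	counts    map[string]int
}

func newCappedSeries(maxSeries int) *cappedSeries {
	return &cappedSeries{maxSeries: maxSeries, counts: make(map[string]int)}
}

// record returns false only when key is new and admitting it would exceed
// maxSeries. Points for keys that already exist are always accepted.
func (c *cappedSeries) record(key string) bool {
	if _, exists := c.counts[key]; !exists {
		if c.maxSeries > 0 && len(c.counts) >= c.maxSeries {
			return false
		}
	}
	c.counts[key]++
	return true
}

func main() {
	c := newCappedSeries(1)
	fmt.Println(c.record("t|svc|a")) // true: new series, under the cap
	fmt.Println(c.record("t|svc|b")) // false: new series, cap reached
	fmt.Println(c.record("t|svc|a")) // true: existing series keeps recording
}
```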

type WindowAgg

type WindowAgg struct {
	MetricName  string
	ServiceName string
	WindowStart time.Time
	Count       int64
	Sum         float64
	Min         float64
	Max         float64
	P50         float64
	P95         float64
	P99         float64
	// contains filtered or unexported fields
}

WindowAgg is the pre-computed aggregate for one time window slot.
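
To make the fields concrete, the sketch below computes the same kind of summary from a slice of raw values. It is an assumption-laden illustration: the agg type and both functions are hypothetical, and the nearest-rank percentile used here is one of several common definitions; the method the package actually uses is not documented on this page.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// agg is a hypothetical mirror of the numeric fields of WindowAgg.
type agg struct {
	Count                        int64
	Sum, Min, Max, P50, P95, P99 float64
}

// percentile returns the nearest-rank percentile of a sorted, non-empty slice.
func percentile(sorted []float64, p float64) float64 {
	rank := int(math.Ceil(p / 100 * float64(len(sorted))))
	if rank < 1 {
		rank = 1
	}
	return sorted[rank-1]
}

// aggregate summarises values without modifying the caller's slice.
func aggregate(values []float64) agg {
	if len(values) == 0 {
		return agg{}
	}
	sorted := append([]float64(nil), values...)
	sort.Float64s(sorted)
	var sum float64
	for _, v := range sorted {
		sum += v
	}
	return agg{
		Count: int64(len(sorted)),
		Sum:   sum,
		Min:   sorted[0],
		Max:   sorted[len(sorted)-1],
		P50:   percentile(sorted, 50),
		P95:   percentile(sorted, 95),
		P99:   percentile(sorted, 99),
	}
}

func main() {
	a := aggregate([]float64{5, 1, 4, 2, 3})
	fmt.Println(a.Count, a.Sum, a.Min, a.Max, a.P50, a.P95, a.P99) // prints 5 15 1 5 3 5 5
}
```

Sorting every window is the simplest approach; an implementation that must bound memory per slot would more likely keep a sketch or histogram instead of the raw values.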
