tsdb

package
v0.2.0-beta.3
Published: May 3, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package tsdb provides an in-memory ring buffer that keeps per-metric sliding windows with pre-computed aggregates. It serves dashboard queries over recent data from memory, so those queries do not need to hit the relational DB.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator manages in-memory tumbling windows for metrics.

func NewAggregator

func NewAggregator(repo *storage.Repository, windowSize time.Duration) *Aggregator

NewAggregator creates a new TSDB aggregator.

func (*Aggregator) BucketCount

func (a *Aggregator) BucketCount() int

BucketCount returns the current number of in-memory buckets (for metrics/health).

func (*Aggregator) DroppedBatches

func (a *Aggregator) DroppedBatches() int64

DroppedBatches returns the total number of batches dropped due to a full flush channel.
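
The counter described above suggests a non-blocking hand-off to the flush goroutine. The following is a minimal sketch of that pattern, not the package's actual implementation: the `flusher` type, its `flushCh` field, and the `enqueue` method are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// batch is a hypothetical stand-in for one flushable group of points.
type batch []float64

// flusher is a hypothetical type; only the dropped-batch counting idea
// comes from the documentation above.
type flusher struct {
	flushCh chan batch
	dropped atomic.Int64
}

// enqueue tries to hand the batch to the flush channel without blocking.
// If the channel is full, the batch is discarded and the counter is bumped.
func (f *flusher) enqueue(b batch) bool {
	select {
	case f.flushCh <- b:
		return true
	default:
		f.dropped.Add(1)
		return false
	}
}

func main() {
	f := &flusher{flushCh: make(chan batch, 1)}
	fmt.Println(f.enqueue(batch{1, 2})) // fits in the buffer
	fmt.Println(f.enqueue(batch{3, 4})) // buffer is full, so this one is dropped
	fmt.Println("dropped:", f.dropped.Load())
}
```

Dropping instead of blocking keeps the ingest path fast when the flush side falls behind, at the cost of losing data; exposing the counter makes that loss observable.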

func (*Aggregator) Ingest

func (a *Aggregator) Ingest(m RawMetric)

Ingest adds a raw metric point to the current aggregator window.

func (*Aggregator) SetCardinalityLimit

func (a *Aggregator) SetCardinalityLimit(global, perTenant int, onOverflow func(tenantID string))

SetCardinalityLimit configures the global and per-tenant series caps.

global     : total distinct series across all tenants; 0 = unlimited.
perTenant  : distinct series per tenant; 0 = no per-tenant cap (only the
             global cap applies).
onOverflow : called once per overflow event with the ID of the tenant
             that exceeded its cap, or with overflowSentinelGlobal when
             the global cap (rather than a per-tenant cap) is the trigger.

Pass nil for onOverflow to disable the callback. Either cap may be 0 independently.

func (*Aggregator) SetMetrics

func (a *Aggregator) SetMetrics(onIngest, onDropped func())

SetMetrics wires Prometheus metric callbacks.

func (*Aggregator) SetRingBuffer

func (a *Aggregator) SetRingBuffer(rb *RingBuffer)

SetRingBuffer attaches a RingBuffer that receives every ingested data point.

func (*Aggregator) Start

func (a *Aggregator) Start(ctx context.Context)

Start begins the aggregation background processes.

func (*Aggregator) Stop

func (a *Aggregator) Stop()

Stop stops the aggregator.

type MetricRing

type MetricRing struct {
	// contains filtered or unexported fields
}

MetricRing is a fixed-size circular buffer for a single metric+service key. Each slot represents one windowDuration-wide time bucket.

func (*MetricRing) Record

func (r *MetricRing) Record(value float64, at time.Time)

Record adds a new data point, advancing the window if necessary.

func (*MetricRing) Windows

func (r *MetricRing) Windows(n int) []WindowAgg

Windows returns up to n of the most recent completed window aggregates. It takes a single snapshot under the ring lock rather than acquiring each slot lock (120 or more) in sequence.
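
The snapshot approach can be sketched as follows: copy the slots while holding one lock, then do the filtering on the copy. The `ring` and `window` types, the `head` index, and the newest-first ordering are assumptions for illustration; the documentation does not state the order in which Windows returns its results.

```go
package main

import (
	"fmt"
	"sync"
)

// window is a hypothetical, trimmed-down aggregate for one slot.
type window struct {
	Start int64 // window start as Unix seconds
	Count int64
}

// ring is a hypothetical ring guarded by a single mutex.
type ring struct {
	mu    sync.Mutex
	slots []window
	head  int // index of the slot currently being filled
}

// recent returns up to n completed windows, newest first (an assumed order).
func (r *ring) recent(n int) []window {
	if n <= 0 {
		return nil
	}
	// Copy everything under one lock, then release it before processing.
	r.mu.Lock()
	snap := make([]window, len(r.slots))
	copy(snap, r.slots)
	head := r.head
	r.mu.Unlock()

	out := make([]window, 0, n)
	// Walk backwards from the slot before head; head itself is still open.
	for i := 1; i < len(snap) && len(out) < n; i++ {
		w := snap[(head-i+len(snap))%len(snap)]
		if w.Count == 0 {
			continue // slot never filled
		}
		out = append(out, w)
	}
	return out
}

func main() {
	r := &ring{
		slots: []window{{Start: 10, Count: 1}, {Start: 20, Count: 2}, {Start: 30, Count: 3}, {}},
		head:  2, // the window starting at 30 is still in progress
	}
	fmt.Println(r.recent(5))
}
```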

type RawMetric

type RawMetric struct {
	Name        string
	ServiceName string
	Value       float64
	Timestamp   time.Time
	Attributes  map[string]any
	// TenantID identifies the owning tenant for this point. When empty the
	// DB default ("default") applies at persist time.
	TenantID string
}

RawMetric represents an incoming single metric data point before aggregation.

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer manages per-metric ring buffers, keyed by "service|metric".

func NewRingBuffer

func NewRingBuffer(slots int, windowDur time.Duration) *RingBuffer

NewRingBuffer creates a RingBuffer. slots × windowDur = total retention (e.g. 120 × 30s = 1 hour).

func (*RingBuffer) AllKeys

func (rb *RingBuffer) AllKeys() []string

AllKeys returns all registered metric+service keys.

func (*RingBuffer) MetricCount

func (rb *RingBuffer) MetricCount() int

MetricCount returns the number of distinct metric series tracked.

func (*RingBuffer) QueryRecent

func (rb *RingBuffer) QueryRecent(metricName, serviceName string, windowCount int) []WindowAgg

QueryRecent returns up to windowCount aggregated windows for the given metric+service. Pass an empty serviceName to match the metric under any service; only the first matching series is returned.
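
The lookup rule can be illustrated with the "service|metric" key format mentioned for RingBuffer. This is a sketch under assumptions: `seriesKey` and `findKey` are hypothetical helpers, and the keys are sorted here only to make "first match" deterministic. The documentation does not say which series counts as first when several services report the same metric.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// seriesKey builds a key in the documented "service|metric" form.
func seriesKey(service, metric string) string {
	return service + "|" + metric
}

// findKey resolves a query to one key. With a service name it requires an
// exact match; with an empty service name it returns the first key (in
// sorted order, an assumption) whose metric part matches.
func findKey(keys []string, metric, service string) (string, bool) {
	if service != "" {
		want := seriesKey(service, metric)
		for _, k := range keys {
			if k == want {
				return k, true
			}
		}
		return "", false
	}
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)
	for _, k := range sorted {
		if strings.HasSuffix(k, "|"+metric) {
			return k, true
		}
	}
	return "", false
}

func main() {
	keys := []string{"web|latency", "api|latency", "api|errors"}
	fmt.Println(findKey(keys, "latency", "web"))
	fmt.Println(findKey(keys, "latency", ""))
	fmt.Println(findKey(keys, "missing", ""))
}
```

Because an empty serviceName yields a single series rather than a merged view, callers that need per-service results would query each service explicitly.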

func (*RingBuffer) Record

func (rb *RingBuffer) Record(metricName, serviceName string, value float64, at time.Time)

Record records a value for the given metric+service at time at.

type WindowAgg

type WindowAgg struct {
	MetricName  string
	ServiceName string
	WindowStart time.Time
	Count       int64
	Sum         float64
	Min         float64
	Max         float64
	P50         float64
	P95         float64
	P99         float64
	// contains filtered or unexported fields
}

WindowAgg is the pre-computed aggregate for one time window slot.
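
For readers unfamiliar with these fields, the sketch below computes the same set of statistics from a slice of raw values. It is an illustration only: the `agg` type and `aggregate` function are hypothetical, and the nearest-rank percentile method is an assumption, since the documentation does not say how P50, P95, and P99 are derived.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// agg mirrors the numeric fields of WindowAgg for illustration.
type agg struct {
	Count         int64
	Sum, Min, Max float64
	P50, P95, P99 float64
}

// percentile returns the nearest-rank percentile of a non-empty sorted slice.
func percentile(sorted []float64, p float64) float64 {
	rank := int(math.Ceil(p * float64(len(sorted)) / 100))
	if rank < 1 {
		rank = 1
	}
	return sorted[rank-1]
}

// aggregate computes count, sum, min, max, and three percentiles.
// An empty input yields the zero value.
func aggregate(values []float64) agg {
	if len(values) == 0 {
		return agg{}
	}
	sorted := append([]float64(nil), values...) // leave the caller's slice untouched
	sort.Float64s(sorted)
	a := agg{
		Count: int64(len(sorted)),
		Min:   sorted[0],
		Max:   sorted[len(sorted)-1],
	}
	for _, v := range sorted {
		a.Sum += v
	}
	a.P50 = percentile(sorted, 50)
	a.P95 = percentile(sorted, 95)
	a.P99 = percentile(sorted, 99)
	return a
}

func main() {
	values := make([]float64, 0, 100)
	for i := 100; i >= 1; i-- {
		values = append(values, float64(i))
	}
	fmt.Printf("%+v\n", aggregate(values))
}
```

Storing Count and Sum rather than a mean lets adjacent windows be merged exactly for those two fields; percentiles cannot be combined that way, so they are only meaningful per window.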
