metrics

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package metrics is Murmur's observability surface. The Recorder interface is what runtimes call to record per-pipeline events, errors, and latencies; the recorder implementation decides what to do with them (drop them, push to Prometheus, etc).

Implementations:

  • Noop: discards everything. The default when no recorder is configured.
  • InMemory: keeps per-pipeline running stats and bounded latency histograms in RAM. Powers the admin REST API and the web UI's "live metrics" cards. Suitable for single-process workers; for multi-replica deployments aggregate via your observability stack of choice.

Index

Constants

View Source
const (
	ModeStreaming = "streaming"
	ModeBootstrap = "bootstrap"
	ModeReplay    = "replay"
)

Mode labels for RecordBatch. Stable strings runtimes pass so dashboards have a known vocabulary to filter against.

Variables

This section is empty.

Functions

This section is empty.

Types

type InMemory

type InMemory struct {
	// contains filtered or unexported fields
}

InMemory is a process-local Recorder. Concurrent-safe; bounded memory: each op's latency window is capped at MaxLatencySamples.

func NewInMemory

func NewInMemory() *InMemory

NewInMemory returns an InMemory recorder.

func (*InMemory) RecordBatch

func (m *InMemory) RecordBatch(pipeline string, mode string, n int, d time.Duration)

RecordBatch records n events and a single batch-latency sample under a synthetic op name "batch:<mode>", so dashboards can filter batch throughput by runtime mode without colliding with per-op store_merge / cache_merge histograms.

The synthetic pipeline name "<pipeline>:batch:<mode>" carries the event count; the op-named latency belongs to the unmodified pipeline so query tools can find both batch latency and store_merge latency under the same pipeline key.

func (*InMemory) RecordError

func (m *InMemory) RecordError(pipeline string, err error)

RecordError increments the error counter and stores the most recent error string.

func (*InMemory) RecordEvent

func (m *InMemory) RecordEvent(pipeline string)

RecordEvent increments the events-processed counter for the given pipeline.

func (*InMemory) RecordLatency

func (m *InMemory) RecordLatency(pipeline string, op string, d time.Duration)

RecordLatency stores a duration sample in the ring buffer for (pipeline, op).

func (*InMemory) Snapshot

func (m *InMemory) Snapshot() []PipelineStats

Snapshot returns a point-in-time view of all pipeline metrics. Suitable for the admin REST API; copies are taken so callers can't mutate internal state.

func (*InMemory) SnapshotOne

func (m *InMemory) SnapshotOne(name string) PipelineStats

SnapshotOne returns the snapshot for a single pipeline.

type LatencyStats

type LatencyStats struct {
	N   int
	P50 float64
	P95 float64
	P99 float64
	Max float64
}

LatencyStats is a histogram summary in milliseconds.

type Noop

type Noop struct{}

Noop discards all metrics. Useful as a default and in tests; satisfies Recorder with zero allocations and zero method-call overhead the compiler can't inline away.

func (Noop) RecordBatch

func (Noop) RecordBatch(string, string, int, time.Duration)

RecordBatch implements Recorder.RecordBatch.

func (Noop) RecordError

func (Noop) RecordError(string, error)

RecordError implements Recorder.RecordError.

func (Noop) RecordEvent

func (Noop) RecordEvent(string)

RecordEvent implements Recorder.RecordEvent.

func (Noop) RecordLatency

func (Noop) RecordLatency(string, string, time.Duration)

RecordLatency implements Recorder.RecordLatency.

type PipelineStats

type PipelineStats struct {
	Pipeline        string
	EventsProcessed uint64
	Errors          uint64
	LastEventAt     time.Time
	LastErrorAt     time.Time
	LastError       string
	// Latencies maps op name (e.g. "store_merge") to p50/p95/p99 over the last N samples.
	Latencies map[string]LatencyStats
}

PipelineStats is an immutable snapshot of metrics for a single pipeline.

type Recorder

type Recorder interface {
	// RecordEvent is called once per record successfully processed.
	RecordEvent(pipeline string)

	// RecordError is called when processing a record fails. The runtime decides
	// whether to retry, drop, or surface the error.
	RecordError(pipeline string, err error)

	// RecordLatency records the duration of a named operation. Typical names:
	// "store_merge", "cache_merge", "ack". Histograms expose p50/p95/p99.
	RecordLatency(pipeline string, op string, d time.Duration)

	// RecordBatch is called once per source batch processed by a runtime. The
	// mode label distinguishes which runtime emitted the batch (typically
	// "streaming", "bootstrap", or "replay") so a single dashboard can filter
	// or stack batch throughput by runtime. n is the number of records in the
	// batch; d is the wall-clock time spent draining the batch.
	//
	// Implementations should be cheap: a counter add and a latency sample is
	// enough. The Noop implementation is a true no-op so backfills running
	// without metrics configured pay zero cost.
	RecordBatch(pipeline string, mode string, n int, d time.Duration)
}

Recorder is the abstraction runtimes call to publish observability events. Methods must be safe for concurrent use; implementations should aim for nanosecond-cost on the hot path so wrapping a Counter pipeline doesn't reshape its throughput.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL