Documentation ¶
Overview ¶
Package metrics is Murmur's observability surface. Runtimes call the Recorder interface to record per-pipeline events, errors, and latencies; the Recorder implementation decides what to do with them (drop them, push them to Prometheus, and so on).
Implementations:
- Noop: discards everything. The default when no recorder is configured.
- InMemory: keeps per-pipeline running stats and bounded latency histograms in RAM. Powers the admin REST API and the web UI's "live metrics" cards. Suitable for single-process workers; for multi-replica deployments, aggregate through your observability stack of choice.
Index ¶
- Constants
- type InMemory
- func (m *InMemory) RecordBatch(pipeline string, mode string, n int, d time.Duration)
- func (m *InMemory) RecordError(pipeline string, err error)
- func (m *InMemory) RecordEvent(pipeline string)
- func (m *InMemory) RecordLatency(pipeline string, op string, d time.Duration)
- func (m *InMemory) Snapshot() []PipelineStats
- func (m *InMemory) SnapshotOne(name string) PipelineStats
- type LatencyStats
- type Noop
- type PipelineStats
- type Recorder
Constants ¶
const (
	ModeStreaming = "streaming"
	ModeBootstrap = "bootstrap"
	ModeReplay    = "replay"
)
Mode labels for RecordBatch. Stable strings runtimes pass so dashboards have a known vocabulary to filter against.
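As a rough illustration of how a runtime might pass these labels, the sketch below redeclares the three constants locally and reports each drained batch through RecordBatch. The `batchRecorder` interface, the `countByMode` type, and the `drain` function are hypothetical names invented for this example; they are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Mode labels, redeclared locally with the same values as the package constants.
const (
	ModeStreaming = "streaming"
	ModeBootstrap = "bootstrap"
	ModeReplay    = "replay"
)

// batchRecorder is a hypothetical, narrowed view of Recorder: RecordBatch only.
type batchRecorder interface {
	RecordBatch(pipeline string, mode string, n int, d time.Duration)
}

// countByMode is a hypothetical recorder that tallies records per mode label.
// It is not safe for concurrent use; it exists only to make the labels visible.
type countByMode map[string]int

func (c countByMode) RecordBatch(pipeline, mode string, n int, d time.Duration) {
	c[mode] += n
}

// drain is a hypothetical runtime step: process one batch, then report it
// once with the record count and the wall-clock time spent.
func drain(r batchRecorder, pipeline, mode string, batch []string) {
	start := time.Now()
	for range batch {
		// Per-record work would happen here.
	}
	r.RecordBatch(pipeline, mode, len(batch), time.Since(start))
}

func main() {
	rec := countByMode{}
	drain(rec, "orders", ModeBootstrap, []string{"a", "b", "c"})
	drain(rec, "orders", ModeStreaming, []string{"d"})
	fmt.Println(rec[ModeBootstrap], rec[ModeStreaming], rec[ModeReplay]) // 3 1 0
}
```

Because every runtime uses the same fixed strings, a dashboard can group or filter on the mode label without knowing which runtime produced a given batch.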
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type InMemory ¶
type InMemory struct {
	// contains filtered or unexported fields
}
InMemory is a process-local Recorder. It is safe for concurrent use, and its memory is bounded: each op's latency window is capped at MaxLatencySamples.
func (*InMemory) RecordBatch ¶
func (m *InMemory) RecordBatch(pipeline string, mode string, n int, d time.Duration)
RecordBatch records n events and a single batch-latency sample. The event count is stored under the synthetic pipeline name "<pipeline>:batch:<mode>". The latency sample is stored under the unmodified pipeline with the synthetic op name "batch:<mode>".
Keeping the latency on the unmodified pipeline lets query tools find both batch latency and store_merge latency under the same pipeline key, and the "batch:<mode>" op name lets dashboards filter batch throughput by runtime mode without colliding with the per-op store_merge / cache_merge histograms.
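The naming scheme described here can be sketched as a small helper. `batchKeys` is a hypothetical function written for this example, not something the package exports; it only shows how the two names relate to each other.

```go
package main

import "fmt"

// batchKeys is a hypothetical helper that derives the two names described
// above: the synthetic pipeline name that carries the event count, and the
// synthetic op name under which the batch latency is stored on the
// unmodified pipeline.
func batchKeys(pipeline, mode string) (countKey, latencyOp string) {
	latencyOp = "batch:" + mode
	countKey = pipeline + ":" + latencyOp
	return countKey, latencyOp
}

func main() {
	countKey, op := batchKeys("orders", "replay")
	fmt.Println(countKey) // orders:batch:replay
	fmt.Println(op)       // batch:replay
}
```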
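func (*InMemory) RecordError ¶
func (m *InMemory) RecordError(pipeline string, err error)
RecordError increments the error counter and stores the most recent error string.
func (*InMemory) RecordEvent ¶
func (m *InMemory) RecordEvent(pipeline string)
RecordEvent increments the events-processed counter for the given pipeline.
func (*InMemory) RecordLatency ¶
func (m *InMemory) RecordLatency(pipeline string, op string, d time.Duration)
RecordLatency stores a duration sample in the ring buffer for (pipeline, op).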
func (*InMemory) Snapshot ¶
func (m *InMemory) Snapshot() []PipelineStats
Snapshot returns a point-in-time view of all pipeline metrics. Suitable for the admin REST API; copies are taken so callers can't mutate internal state.
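The copy-on-snapshot behaviour can be sketched as follows. The `stats` and `store` types are hypothetical, trimmed stand-ins written for this example; the point is only that the map inside each entry is cloned, not shared, before it is handed to the caller.

```go
package main

import (
	"fmt"
	"sync"
)

// stats is a hypothetical, trimmed stand-in for PipelineStats.
type stats struct {
	Pipeline  string
	Events    uint64
	Latencies map[string]float64 // op name -> a single value in ms (simplified)
}

// store is a hypothetical holder of per-pipeline state.
type store struct {
	mu   sync.Mutex
	data map[string]*stats
}

// snapshot returns deep copies so callers cannot reach the internal maps.
func (s *store) snapshot() []stats {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]stats, 0, len(s.data))
	for _, st := range s.data {
		cp := *st // copies scalar fields; the map is still shared at this point
		cp.Latencies = make(map[string]float64, len(st.Latencies))
		for op, v := range st.Latencies {
			cp.Latencies[op] = v
		}
		out = append(out, cp)
	}
	return out
}

func main() {
	s := &store{data: map[string]*stats{
		"orders": {Pipeline: "orders", Events: 7, Latencies: map[string]float64{"store_merge": 1.5}},
	}}
	snap := s.snapshot()
	snap[0].Latencies["store_merge"] = 99 // mutates the copy only
	fmt.Println(s.data["orders"].Latencies["store_merge"]) // 1.5
}
```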
func (*InMemory) SnapshotOne ¶
func (m *InMemory) SnapshotOne(name string) PipelineStats
SnapshotOne returns the snapshot for a single pipeline.
type LatencyStats ¶
LatencyStats is a histogram summary in milliseconds.
type Noop ¶
type Noop struct{}
Noop discards all metrics. It is useful as a default and in tests: it satisfies Recorder with zero allocations, and its empty methods cost nothing beyond whatever call overhead the compiler cannot inline away.
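A common way to use a no-op default is to substitute it for a nil recorder at construction time, so the hot path never needs a nil check. The sketch below mirrors the Recorder interface and Noop with local types (`recorder`, `noop`) so that it compiles on its own; `worker`, `newWorker`, and `handle` are hypothetical names and do not come from this package.

```go
package main

import (
	"fmt"
	"time"
)

// recorder mirrors the four methods of the Recorder interface documented here.
type recorder interface {
	RecordEvent(pipeline string)
	RecordError(pipeline string, err error)
	RecordLatency(pipeline string, op string, d time.Duration)
	RecordBatch(pipeline string, mode string, n int, d time.Duration)
}

// noop is a local stand-in for Noop: every method has an empty body.
type noop struct{}

func (noop) RecordEvent(string)                             {}
func (noop) RecordError(string, error)                      {}
func (noop) RecordLatency(string, string, time.Duration)    {}
func (noop) RecordBatch(string, string, int, time.Duration) {}

// worker is a hypothetical consumer of a recorder.
type worker struct {
	rec recorder
}

// newWorker substitutes noop for a nil recorder so callers never nil-check.
func newWorker(rec recorder) *worker {
	if rec == nil {
		rec = noop{}
	}
	return &worker{rec: rec}
}

// handle processes one record and reports it unconditionally.
func (w *worker) handle(pipeline string) {
	start := time.Now()
	// Record processing would happen here.
	w.rec.RecordLatency(pipeline, "store_merge", time.Since(start))
	w.rec.RecordEvent(pipeline)
}

func main() {
	w := newWorker(nil) // no recorder configured
	w.handle("orders")  // safe: the calls land on noop
	_, isNoop := w.rec.(noop)
	fmt.Println(isNoop) // true
}
```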
func (Noop) RecordBatch ¶
RecordBatch implements Recorder.RecordBatch.
func (Noop) RecordError ¶
RecordError implements Recorder.RecordError.
func (Noop) RecordEvent ¶
RecordEvent implements Recorder.RecordEvent.
type PipelineStats ¶
type PipelineStats struct {
	Pipeline        string
	EventsProcessed uint64
	Errors          uint64
	LastEventAt     time.Time
	LastErrorAt     time.Time
	LastError       string
	// Latencies maps op name (e.g. "store_merge") to p50/p95/p99 over the last N samples.
	Latencies map[string]LatencyStats
}
PipelineStats is an immutable snapshot of metrics for a single pipeline.
type Recorder ¶
type Recorder interface {
	// RecordEvent is called once per record successfully processed.
	RecordEvent(pipeline string)
	// RecordError is called when processing a record fails. The runtime decides
	// whether to retry, drop, or surface the error.
	RecordError(pipeline string, err error)
	// RecordLatency records the duration of a named operation. Typical names:
	// "store_merge", "cache_merge", "ack". Histograms expose p50/p95/p99.
	RecordLatency(pipeline string, op string, d time.Duration)
	// RecordBatch is called once per source batch processed by a runtime. The
	// mode label distinguishes which runtime emitted the batch (typically
	// "streaming", "bootstrap", or "replay") so a single dashboard can filter
	// or stack batch throughput by runtime. n is the number of records in the
	// batch; d is the wall-clock time spent draining the batch.
	//
	// Implementations should be cheap: a counter add and a latency sample is
	// enough. The Noop implementation is a true no-op so backfills running
	// without metrics configured pay zero cost.
	RecordBatch(pipeline string, mode string, n int, d time.Duration)
}
Recorder is the abstraction runtimes call to publish observability events. Methods must be safe for concurrent use. Implementations should aim for nanosecond-scale cost on the hot path so that instrumenting a Counter pipeline does not measurably change its throughput.
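As one way to meet both requirements (concurrent safety and a cheap hot path), a custom implementation might keep per-pipeline counters behind atomics. The sketch below is a fragment under that assumption: `eventCounter` and its `events` accessor are hypothetical, and only RecordEvent is shown; a full Recorder would also need RecordError, RecordLatency, and RecordBatch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// eventCounter is a hypothetical recorder fragment; only RecordEvent is shown.
type eventCounter struct {
	counts sync.Map // pipeline name -> *uint64
}

// RecordEvent is safe for concurrent use. After the first call for a
// pipeline, it is one sync.Map load plus one atomic add.
func (c *eventCounter) RecordEvent(pipeline string) {
	v, ok := c.counts.Load(pipeline)
	if !ok {
		// First event for this pipeline. LoadOrStore resolves the race
		// between goroutines that arrive here at the same time.
		v, _ = c.counts.LoadOrStore(pipeline, new(uint64))
	}
	atomic.AddUint64(v.(*uint64), 1)
}

// events returns the current count for a pipeline, or 0 if none was recorded.
func (c *eventCounter) events(pipeline string) uint64 {
	v, ok := c.counts.Load(pipeline)
	if !ok {
		return 0
	}
	return atomic.LoadUint64(v.(*uint64))
}

func main() {
	var c eventCounter
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.RecordEvent("orders")
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.events("orders")) // 8000
}
```

Running the example under the race detector (`go run -race`) is a quick way to check that a recorder of this shape really is safe to call from many goroutines.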