monitoring

package
v0.0.0-...-ebdf8eb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2026 License: AGPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LuaFastPathOutcomeHit            = "hit"
	LuaFastPathOutcomeSkipLoaded     = "skip_loaded"
	LuaFastPathOutcomeSkipCachedType = "skip_cached_type"
)

LuaFastPathOutcome labels tag each Lua-side read fast-path decision so operators can see how often a given command (ZRANGEBYSCORE, ZSCORE, HGET, etc.) actually takes the fast path vs falls back.

View Source
const (
	SQSPartitionActionSend    = "send"
	SQSPartitionActionReceive = "receive"
	SQSPartitionActionDelete  = "delete"
)

SQS HT-FIFO partition action labels. Stable string set so dashboards / alerts can rely on the values not changing.

Variables

This section is empty.

Functions

func AddressRequiresToken

func AddressRequiresToken(addr string) bool

AddressRequiresToken reports whether the address is exposed beyond loopback and therefore requires bearer-token protection.

func MetricsServeTask

func MetricsServeTask(server *http.Server, listener net.Listener, address string) func() error

MetricsServeTask returns an errgroup task that serves the metrics endpoint until shutdown.

func MetricsShutdownTask

func MetricsShutdownTask(ctx context.Context, server *http.Server, address string) func() error

MetricsShutdownTask returns an errgroup task that stops the metrics server on context cancellation.

func NewMetricsServer

func NewMetricsServer(handler http.Handler, bearerToken string) *http.Server

NewMetricsServer constructs an HTTP server for the protected metrics handler.

func NewPprofHandler

func NewPprofHandler() http.Handler

NewPprofHandler returns an http.Handler that serves the Go runtime profiling endpoints under /debug/pprof/.

func NewPprofServer

func NewPprofServer(bearerToken string) *http.Server

NewPprofServer creates an HTTP server for the pprof debug endpoints, optionally protected by bearer-token authentication.

func PprofServeTask

func PprofServeTask(server *http.Server, listener net.Listener, address string) func() error

PprofServeTask returns an errgroup task that serves the pprof endpoint until shutdown.

func PprofShutdownTask

func PprofShutdownTask(ctx context.Context, server *http.Server, address string) func() error

PprofShutdownTask returns an errgroup task that stops the pprof server on context cancellation.

func ProtectHandler

func ProtectHandler(handler http.Handler, bearerToken string) http.Handler

ProtectHandler wraps a metrics handler with optional bearer-token authentication.

func RegisterLuaPool

func RegisterLuaPool(registerer prometheus.Registerer, src LuaPoolSource) error

RegisterLuaPool wires `src` into the Prometheus registerer as a set of CounterFunc / GaugeFunc metrics:

  • elastickv_lua_pool_hits_total
  • elastickv_lua_pool_misses_total
  • elastickv_lua_pool_drops_total
  • elastickv_lua_pool_idle
  • elastickv_lua_pool_max_idle

Returns nil if src or registerer is nil; returns the first registration error otherwise. The caller (typically adapter.RedisServer.RegisterLuaPoolMetrics) should log-and- continue on error rather than refusing to start: a missing pool metric is observability degradation, not a correctness issue.

The CounterFunc / GaugeFunc closures capture `src` directly, so they read live values at scrape time. There is no polling goroutine and no snapshot staleness.

Types

type ColdStartMetrics

type ColdStartMetrics struct {
	// contains filtered or unexported fields
}

ColdStartMetrics exposes the cold-start snapshot-restore skip gate's three outcomes as Prometheus series. The skip gate fires at most once per process lifetime (on cold start) so these are low-cardinality, low-rate signals — counters rather than histograms. Operators alert when the skip rate falls below a soak threshold (design target: ≥ 90% in steady state) or when fallback_reason rises unexpectedly.

See docs/design/2026_06_02_idempotent_snapshot_restore.md §9.

type ColdStartObserver

type ColdStartObserver struct {
	// contains filtered or unexported fields
}

ColdStartObserver is the monitoring-side implementation of raftengine.ColdStartObserver. Holds nothing but the metrics handle; safe to share across goroutines (Prometheus collectors are concurrency-safe).

func (*ColdStartObserver) RestoreExecuted

func (o *ColdStartObserver) RestoreExecuted(snapIndex, have uint64)

RestoreExecuted records a full restore that ran. gap is the absolute distance |snapIndex - have| in entry indices.

The skip-gate threshold is no longer snap.Metadata.Index — it shifted to the WAL committed-tail (codex P1 #934). An executed restore therefore no longer implies have < snapIndex: a crashed follower can have the FSM ahead of the snapshot pointer but behind the WAL committed tail. Using snapIndex - have directly would underflow into ~2^64 in that case and corrupt the gauge. Codex P2 #934 round 3.

func (*ColdStartObserver) RestoreFallback

func (o *ColdStartObserver) RestoreFallback(snapIndex uint64, reason string)

RestoreFallback records the strictly-additive fallback path. reason is a stable enum the engine supplies (not_reader / missing_meta / read_err). No gap is reported — the store could not authoritatively name a value.

func (*ColdStartObserver) RestoreSkipped

func (o *ColdStartObserver) RestoreSkipped(snapIndex, have uint64)

RestoreSkipped records a successful skip. gap = have - snapIndex (positive: how far ahead the live store was).

type DispatchCollector

type DispatchCollector struct {
	// contains filtered or unexported fields
}

DispatchCollector polls the etcd raft Engine's atomic dispatch counters on a fixed interval and mirrors them into monotonic Prometheus counters. We poll rather than calling Add() inline in the raft path because those code paths are already hot and must not take any additional interface call; the counters are atomic.Uint64 in the engine and polling is cheap (O(groups) reads every 5s).

func (*DispatchCollector) ObserveOnce

func (c *DispatchCollector) ObserveOnce(sources []DispatchSource)

ObserveOnce is exposed for tests and single-shot callers.

func (*DispatchCollector) Start

func (c *DispatchCollector) Start(ctx context.Context, sources []DispatchSource, interval time.Duration)

Start polls sources on the given interval until ctx is canceled. Passing interval <= 0 uses defaultDispatchPollInterval (5 s), which matches the cadence of RaftObserver so operators see consistent refresh rates across dashboards.

type DispatchCounterSource

type DispatchCounterSource interface {
	DispatchDropCount() uint64
	DispatchErrorCount() uint64
	StepQueueFullCount() uint64
	// DispatchErrorCountsByCode returns a snapshot of dispatch error
	// counts keyed by grpc status code ("Unavailable",
	// "DeadlineExceeded", ...). Sum of values equals
	// DispatchErrorCount(). Implementations that do not break out by
	// code may return an empty map.
	DispatchErrorCountsByCode() map[string]uint64
}

DispatchCounterSource abstracts the etcd raft Engine's monotonic dispatch counters so monitoring can scrape them without importing the etcd package. The concrete etcd Engine satisfies this interface via its DispatchDropCount / DispatchErrorCount / StepQueueFullCount accessors.

type DispatchSource

type DispatchSource struct {
	GroupID uint64
	Source  DispatchCounterSource
}

DispatchSource binds a raft group ID to its counter source. Multiple groups can be polled by a single collector on a sharded node.

type DynamoDBMetrics

type DynamoDBMetrics struct {
	// contains filtered or unexported fields
}

func (*DynamoDBMetrics) ObserveDynamoDBRequest

func (m *DynamoDBMetrics) ObserveDynamoDBRequest(report DynamoDBRequestReport)

ObserveDynamoDBRequest records the final outcome of a request.

func (*DynamoDBMetrics) ObserveInFlightChange

func (m *DynamoDBMetrics) ObserveInFlightChange(operation string, delta float64)

ObserveInFlightChange adjusts the in-flight request gauge for an operation.

type DynamoDBRequestObserver

type DynamoDBRequestObserver interface {
	ObserveInFlightChange(operation string, delta float64)
	ObserveDynamoDBRequest(report DynamoDBRequestReport)
}

DynamoDBRequestObserver records per-request DynamoDB-compatible API metrics.

type DynamoDBRequestReport

type DynamoDBRequestReport struct {
	Operation     string
	HTTPStatus    int
	ErrorType     string
	Duration      time.Duration
	RequestBytes  int
	ResponseBytes int
	Tables        []string
	TableMetrics  map[string]DynamoDBTableMetrics
}

DynamoDBRequestReport is the normalized result of a single request.

type DynamoDBTableMetrics

type DynamoDBTableMetrics struct {
	ReturnedItems int
	ScannedItems  int
	WrittenItems  int
}

DynamoDBTableMetrics captures table-scoped work done by a request.

type HLCMetrics

type HLCMetrics struct {
	// contains filtered or unexported fields
}

HLCMetrics exposes the HLC-4 (i) bounded-skew assumption and the HLC-4 (iii) fence-trip rate as Prometheus series so operators can alert before the safety property is exercised in earnest.

The two gauges together let operators see *why* a NextFenced rejection happened (was the ceiling stale? was the wall clock ahead?), and the counter captures the load-bearing fact: NextFenced refused to issue a persistence ts.

type HLCObserver

type HLCObserver struct {
	// contains filtered or unexported fields
}

HLCObserver polls an HLCSource on a fixed interval and updates the matching HLCMetrics. Created via Registry.HLCObserver(); started by main.go alongside the other observers once the HLC is wired.

func (*HLCObserver) ObserveOnce

func (o *HLCObserver) ObserveOnce(source HLCSource)

ObserveOnce captures a single snapshot. Exposed for tests.

func (*HLCObserver) Start

func (o *HLCObserver) Start(ctx context.Context, source HLCSource, interval time.Duration)

Start polls the source on every tick until ctx is cancelled. An interval of zero falls back to defaultObserveInterval (5s) to match the Raft observer. Safe to call with a nil receiver / nil source — both shapes silently no-op so test fixtures that omit the registry do not need conditional wiring.

type HLCSource

type HLCSource interface {
	PhysicalCeiling() int64
	NextFencedRejections() uint64
}

HLCSource is the surface monitoring needs from the kv-layer HLC. Defining it here (rather than importing *kv.HLC directly) keeps the monitoring package decoupled from the kv module and lets a test or shim supply its own snapshot.

PhysicalCeiling returns the Raft-agreed physical ceiling in Unix milliseconds (zero before bootstrap). NextFencedRejections is a cumulative counter incremented inside HLC.NextFenced() whenever it returns ErrCeilingExpired; the observer reads the difference between successive ticks so Prometheus sees a counter, not a gauge.

type HotPathMetrics

type HotPathMetrics struct {
	// contains filtered or unexported fields
}

HotPathMetrics owns the Prometheus vectors introduced for the Redis GET hot-path dashboard. Kept in its own type so the Registry can hold a single instance and hand out scoped observer/collector objects.

type LeaseReadObserver

type LeaseReadObserver struct {
	// contains filtered or unexported fields
}

LeaseReadObserver implements kv.LeaseReadObserver by incrementing the elastickv_lease_read_total counter vector. Callers grab an instance via Registry.LeaseReadObserver(); the zero value is safe and silently drops samples, so tests can pass LeaseReadObserver{} as a stub.

func (LeaseReadObserver) ObserveLeaseRead

func (o LeaseReadObserver) ObserveLeaseRead(hit bool)

ObserveLeaseRead records a single lease-read outcome.

type LuaFastPathCmd

type LuaFastPathCmd struct {
	// contains filtered or unexported fields
}

LuaFastPathCmd is a pre-resolved bundle of fast-path outcome counters for a single Lua command. Construct once via LuaFastPathObserver.ForCommand(cmd) at server startup; call the Observe* methods per redis.call(). Safe to copy.

func (LuaFastPathCmd) ObserveFallback

func (c LuaFastPathCmd) ObserveFallback(reason LuaFastPathFallbackReason)

ObserveFallback routes to the counter for the given reason. Use the LuaFastPathFallback* constants; unknown values (including the zero value LuaFastPathFallbackNone, which should never reach here) land on the "other" bucket so cardinality stays bounded.

func (LuaFastPathCmd) ObserveHit

func (c LuaFastPathCmd) ObserveHit()

Observe* methods record one outcome. Each is a single atomic increment when the counter is wired; a no-op on the zero value.

func (LuaFastPathCmd) ObserveSkipCachedType

func (c LuaFastPathCmd) ObserveSkipCachedType()

func (LuaFastPathCmd) ObserveSkipLoaded

func (c LuaFastPathCmd) ObserveSkipLoaded()

type LuaFastPathFallbackReason

type LuaFastPathFallbackReason string

LuaFastPathFallbackReason is the typed label for server-side hit=false branches. A dedicated type (instead of a bare string) lets Go catch typos in call sites — producers in adapter/ and the switch in ObserveFallback stay in lockstep with the constants below, and the Prometheus label value is derived directly from the underlying string via string(reason).

const (
	LuaFastPathFallbackIneligible  LuaFastPathFallbackReason = "fallback_ineligible"
	LuaFastPathFallbackMissingKey  LuaFastPathFallbackReason = "fallback_missing_key"
	LuaFastPathFallbackWrongType   LuaFastPathFallbackReason = "fallback_wrong_type"
	LuaFastPathFallbackTruncated   LuaFastPathFallbackReason = "fallback_truncated"
	LuaFastPathFallbackLargeOffset LuaFastPathFallbackReason = "fallback_large_offset"
	LuaFastPathFallbackOther       LuaFastPathFallbackReason = "fallback_other"
	// LuaFastPathFallbackNone is the sentinel returned alongside
	// hit=true; callers must not record it.
	LuaFastPathFallbackNone LuaFastPathFallbackReason = ""
)

Known fallback reasons. Subdivides the generic "fallback" outcome so operators can tell WHY the fast path gave up and route the fix accordingly (eligibility check, truncation, empty-key short-circuit, etc.).

type LuaFastPathObserver

type LuaFastPathObserver struct {
	// contains filtered or unexported fields
}

LuaFastPathObserver records fast-path outcomes for redis.call() inside Lua scripts. The zero value is safe and silently drops samples so tests can pass LuaFastPathObserver{} as a stub.

Hot-path shape: each Observe* call on a LuaFastPathCmd handle is a single non-blocking atomic increment on a pre-resolved prometheus.Counter (client_golang's default Counter uses sync/atomic internally). Callers resolve one LuaFastPathCmd per command at server construction to avoid CounterVec.WithLabelValues (mutex-guarded map lookup) on the hot path.

func (LuaFastPathObserver) ForCommand

func (o LuaFastPathObserver) ForCommand(cmd string) LuaFastPathCmd

ForCommand pre-resolves the counter handles for cmd. Returns a zero-value LuaFastPathCmd when the observer is empty (tests), which silently drops all Observe* calls.

type LuaMetrics

type LuaMetrics struct {
	// contains filtered or unexported fields
}

LuaMetrics holds Prometheus metrics that break down where time goes inside a Lua script execution so that Lua VM slowness, Raft latency, and write conflict retries can be distinguished in dashboards.

func (*LuaMetrics) ObserveLuaScript

func (m *LuaMetrics) ObserveLuaScript(report LuaScriptReport)

ObserveLuaScript records a completed script invocation.

type LuaPoolSource

type LuaPoolSource interface {
	// Hits is the total number of get() calls served from the idle
	// pool (counts pool reuses).
	Hits() uint64
	// Misses is the total number of get() calls that fell through
	// to a fresh *lua.LState allocation (counts pool starvation).
	Misses() uint64
	// Drops is the total number of put() calls rejected because
	// the idle pool was full (counts pool saturation — i.e.
	// MaxIdle could be raised to retain more states).
	Drops() uint64
	// Idle is the current number of *lua.LState instances sitting
	// in the pool ready for reuse.
	Idle() int
	// MaxIdle is the configured upper bound on Idle. Exported so
	// dashboards can plot Idle/MaxIdle as a saturation ratio.
	MaxIdle() int
}

LuaPoolSource exposes the bounded *lua.LState pool counters that the Redis adapter maintains. The interface is a narrow projection of adapter.luaStatePool so the monitoring package does not depend on the adapter package (avoiding an import cycle): adapter wires `*luaStatePool` as a LuaPoolSource at NewRedisServer time.

All four accessors are read at Prometheus scrape time via prometheus.NewCounterFunc / NewGaugeFunc, so the lua hot path incurs zero observability overhead — the counters live on the pool as atomic.Uint64 either way; the metrics collector merely reads them when /metrics is hit.

type LuaScriptObserver

type LuaScriptObserver interface {
	ObserveLuaScript(report LuaScriptReport)
}

LuaScriptObserver is implemented by anything that wants to record Lua script execution metrics.

type LuaScriptReport

type LuaScriptReport struct {
	// LuaExecDuration is the cumulative time spent inside state.PCall across
	// all retry attempts. This includes RedisCallDuration.
	LuaExecDuration time.Duration
	// RedisCallDuration is the cumulative time spent inside redis.call() /
	// redis.pcall() handlers (scriptCtx.exec) — Raft reads and in-memory
	// state updates. Pure VM time = LuaExecDuration - RedisCallDuration.
	RedisCallDuration time.Duration
	// RedisCallCount is the total number of redis.call()/pcall() invocations
	// across all retry attempts. High counts explain high RedisCallDuration.
	RedisCallCount int
	// CommitDuration is the cumulative time spent in scriptCtx.commit()
	// (coordinator.Dispatch → Raft consensus + storage write).
	CommitDuration time.Duration
	// ConflictRetries is the number of extra attempts caused by write
	// conflicts or locked transactions (0 = succeeded on the first try).
	ConflictRetries int
	// IsError is true when the final outcome was an error.
	IsError bool
}

LuaScriptReport summarises one Lua script invocation (EVAL / EVALSHA), covering all retryRedisWrite attempts.

type PebbleCacheCapacitySource

type PebbleCacheCapacitySource interface {
	BlockCacheCapacityBytes() int64
}

PebbleCacheCapacitySource is an optional companion to PebbleMetricsSource: sources that expose the configured block-cache capacity (in bytes) are queried by the collector to populate elastickv_pebble_block_cache_capacity_bytes. Implementations return 0 to indicate "not known / store closed"; the collector then leaves the gauge at its last observed value for that tick. The concrete *store pebbleStore satisfies this via BlockCacheCapacityBytes(); tests can omit it.

type PebbleCollector

type PebbleCollector struct {
	// contains filtered or unexported fields
}

PebbleCollector polls each registered Pebble store on a fixed interval and mirrors the snapshot into the Prometheus vectors. Gauges are overwritten; counters advance by the positive delta against the previous snapshot.

func (*PebbleCollector) ObserveOnce

func (c *PebbleCollector) ObserveOnce(sources []PebbleSource)

ObserveOnce is exposed for tests and single-shot callers.

func (*PebbleCollector) Start

func (c *PebbleCollector) Start(ctx context.Context, sources []PebbleSource, interval time.Duration)

Start begins polling sources on interval until ctx is canceled. Passing interval <= 0 uses defaultPebblePollInterval (5 s), matching the DispatchCollector cadence so operators see consistent refresh rates across dashboards. Pebble.Metrics() acquires internal mutexes but is not expensive; 5 s gives ample headroom.

type PebbleMetrics

type PebbleMetrics struct {
	// contains filtered or unexported fields
}

PebbleMetrics owns the Prometheus vectors for Pebble LSM internals. One instance per registry; shared by all groups (labelled by group ID + level where relevant).

func (*PebbleMetrics) SetFSMApplySyncMode

func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string)

SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active. activeLabel must be "sync" or "nosync"; any other value is coerced to "sync" to match the store resolver's fallback behaviour for unknown ELASTICKV_FSM_SYNC_MODE values (see store.resolveFSMApplyWriteOpts). This keeps the gauge's two-row shape stable: exactly one of {"sync","nosync"} is 1 at any time and the other is 0.

Call this once at startup after the store package has resolved the env var. Invoking again is safe and idempotent: the new label goes to 1 and the other known label goes to 0.

type PebbleMetricsSource

type PebbleMetricsSource interface {
	Metrics() *pebble.Metrics
}

PebbleMetricsSource abstracts the per-group access to a Pebble DB's Metrics(). The concrete *store pebbleStore satisfies this via its Metrics() accessor. Returning nil (e.g. store closed mid-restore) is allowed; the collector will skip that group for the tick.

type PebbleSource

type PebbleSource struct {
	GroupID    uint64
	GroupIDStr string
	Source     PebbleMetricsSource
}

PebbleSource binds a raft group ID to its Pebble store. Multiple groups can be polled by a single collector on a sharded node. GroupIDStr is the pre-formatted decimal form of GroupID used as the "group" Prometheus label; pre-computing it avoids a per-tick strconv.FormatUint allocation in observeOnce.

type RaftMetrics

type RaftMetrics struct {
	// contains filtered or unexported fields
}

RaftMetrics holds all Prometheus gauge vectors for a single Raft group. Fields are populated by observeRuntime from raftengine status snapshots.

type RaftObserver

type RaftObserver struct {
	// contains filtered or unexported fields
}

func (*RaftObserver) ObserveOnce

func (o *RaftObserver) ObserveOnce(runtimes []RaftRuntime)

ObserveOnce captures the latest raft state for all configured runtimes.

func (*RaftObserver) Start

func (o *RaftObserver) Start(ctx context.Context, runtimes []RaftRuntime, interval time.Duration)

Start polls raft state and configuration on a fixed interval until ctx is canceled.

type RaftRuntime

type RaftRuntime struct {
	GroupID      uint64
	StatusReader raftengine.StatusReader
	ConfigReader raftengine.ConfigReader
}

RaftRuntime describes a raft group observed by the metrics exporter.

type RedisMetrics

type RedisMetrics struct {
	// contains filtered or unexported fields
}

RedisMetrics holds all Prometheus metric vectors for the Redis adapter.

func (*RedisMetrics) ObserveRedisRequest

func (m *RedisMetrics) ObserveRedisRequest(report RedisRequestReport)

ObserveRedisRequest records the final outcome of a Redis command.

type RedisRequestObserver

type RedisRequestObserver interface {
	ObserveRedisRequest(report RedisRequestReport)
}

RedisRequestObserver records per-command Redis API metrics.

type RedisRequestReport

type RedisRequestReport struct {
	Command  string
	IsError  bool
	Duration time.Duration
	// Unsupported indicates the command was rejected because the adapter
	// has no route for it. When true, ObserveRedisRequest additionally
	// records the real (bounded) command name in
	// elastickv_redis_unsupported_commands_total alongside the existing
	// "unknown"-bucketed counters, which are preserved unchanged.
	Unsupported bool
}

RedisRequestReport is the normalized result of a single Redis command.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry owns the Prometheus registry used by a single Elastickv node.

func NewRegistry

func NewRegistry(nodeID string, nodeAddress string) *Registry

NewRegistry builds a registry with constant labels that identify the local node.

func (*Registry) ColdStartObserver

func (r *Registry) ColdStartObserver() *ColdStartObserver

ColdStartObserver returns the cold-start snapshot-restore observer backed by this registry. The engine receives it through raftengine/etcd.OpenConfig.ColdStartObserver and calls it on each skip-gate outcome (PR #910 design §9).

func (*Registry) DispatchCollector

func (r *Registry) DispatchCollector() *DispatchCollector

DispatchCollector returns a collector that polls the etcd raft engine's dispatch counters and exports them to Prometheus. Start it with the node's raft sources after engine Open() completes.

func (*Registry) DynamoDBObserver

func (r *Registry) DynamoDBObserver() DynamoDBRequestObserver

DynamoDBObserver returns the DynamoDB request observer backed by this registry.

func (*Registry) Gatherer

func (r *Registry) Gatherer() prometheus.Gatherer

Gatherer exposes the underlying gatherer for tests and custom exporters.

func (*Registry) HLCObserver

func (r *Registry) HLCObserver() *HLCObserver

HLCObserver returns the HLC physical-ceiling + fence-rejection observer backed by this registry. Same shape as SQSObserver: a single observer is constructed inside NewRegistry and returned by reference here, so callers that pull it more than once observe the same lastRejections delta state (returning a fresh observer each call would reset lastRejections and risk double-counting rejections against the cumulative Prometheus counter — claude review on PR #879). Returns nil if r is nil so test fixtures can drop the observer without conditional wiring.

func (*Registry) Handler

func (r *Registry) Handler() http.Handler

Handler returns an HTTP handler that exposes the Prometheus scrape endpoint.

func (*Registry) LeaseReadObserver

func (r *Registry) LeaseReadObserver() LeaseReadObserver

LeaseReadObserver returns an observer for the kv coordinator's LeaseRead fast-path counter. Returns a zero-value observer when the registry is nil so callers can pass the result through without checking; the zero value silently drops samples.

func (*Registry) LuaFastPathObserver

func (r *Registry) LuaFastPathObserver() LuaFastPathObserver

LuaFastPathObserver returns an observer for Lua-side redis.call() fast-path outcomes (hit / skip / fallback per command). Zero-value safe for tests and tools that do not wire a registry.

func (*Registry) LuaObserver

func (r *Registry) LuaObserver() LuaScriptObserver

LuaObserver returns the Lua script execution observer backed by this registry.

func (*Registry) PebbleCollector

func (r *Registry) PebbleCollector() *PebbleCollector

PebbleCollector returns a collector that polls each Pebble store's Metrics() snapshot and mirrors the operationally useful fields (L0 sublevels, compaction debt, memtable, block cache) into Prometheus. Start it with the node's Pebble sources after the stores have been opened.

func (*Registry) RaftObserver

func (r *Registry) RaftObserver() *RaftObserver

RaftObserver returns the Raft topology observer backed by this registry.

func (*Registry) RaftProposalObserver

func (r *Registry) RaftProposalObserver(groupID uint64) *raftProposalObserver

RaftProposalObserver returns a group-scoped observer for failed raft proposals.

func (*Registry) RedisObserver

func (r *Registry) RedisObserver() RedisRequestObserver

RedisObserver returns the Redis request observer backed by this registry.

func (*Registry) Registerer

func (r *Registry) Registerer() prometheus.Registerer

Registerer exposes the label-wrapped registerer for callers that own a metric source whose lifecycle does not fit the newXxxMetrics(registerer) pattern in NewRegistry — currently the Redis adapter's Lua VM pool, which materializes inside NewRedisServer and registers via CounterFunc / GaugeFunc at that point. Returns nil if r is nil so callers can dereference safely in test fixtures.

func (*Registry) SQSObserver

func (r *Registry) SQSObserver() *SQSObserver

SQSObserver returns the queue-depth gauge observer backed by this registry. Same shape as RaftObserver / RedisObserver: callers pull it via the registry, then drive Start(ctx, source, interval) from main.go's startMonitoringCollectors.

func (*Registry) SQSPartitionObserver

func (r *Registry) SQSPartitionObserver() SQSPartitionObserver

SQSPartitionObserver returns the HT-FIFO partition-messages observer backed by this registry. Returns nil when the registry itself is nil so adapter call sites can pass the result through without checking; SQSMetrics.ObservePartitionMessage is also nil-receiver safe.

func (*Registry) SetFSMApplySyncMode

func (r *Registry) SetFSMApplySyncMode(activeLabel string)

SetFSMApplySyncMode forwards the resolved ELASTICKV_FSM_SYNC_MODE label to the PebbleMetrics gauge so operators can observe the active durability posture on this node. Safe to call with a nil registry.

func (*Registry) WriteConflictCollector

func (r *Registry) WriteConflictCollector() *WriteConflictCollector

WriteConflictCollector returns a collector that polls each MVCC store's per-(kind, key_prefix) OCC conflict counters and mirrors them into the elastickv_store_write_conflict_total Prometheus counter vector. Start it with the node's MVCC sources after the stores have been opened.

type SQSDepthSource

type SQSDepthSource interface {
	SnapshotQueueDepths(ctx context.Context) (snaps []SQSQueueDepth, ok bool)
}

SQSDepthSource is the contract a per-tick queue-depth source must satisfy. Implemented by *adapter.SQSServer; SQSObserver.Start calls SnapshotQueueDepths on every interval and writes the returned slice to the elastickv_sqs_queue_messages gauges.

Mirrors the Raft observer's StatusReader / ConfigReader pattern (monitoring/raft.go): the source returns ready-to-use snapshots and the observer owns the gauge state machine (forget-on-disappear, cardinality cap).

Two distinct empty-snapshot states, signalled by ok:

  • ok=true with an empty/nil slice — "this source legitimately has no queues this tick". Triggers when the node is a follower (leader-only emission) or the leader genuinely has zero queues configured. The observer diffs against the previous tick and ForgetQueue's any queue that disappeared so a former leader's gauges are cleared on step-down.

  • ok=false (regardless of the slice contents) — "the source could not produce a snapshot this tick" (transient catalog- read failure on the leader, context cancelled mid-scan). The observer must skip the diff entirely: leave existing gauges in place AND leave lastSeen untouched so the next successful tick can still diff against the previous good state. Without this branch a single failed scrape would wipe every depth gauge and produce a false "all queues drained" event on the dashboard until the next successful tick.

type SQSMetrics

type SQSMetrics struct {
	// contains filtered or unexported fields
}

SQSMetrics owns the Prometheus collectors for the SQS adapter. Mirrors DynamoDBMetrics' shape: per-Registry instance, label- cardinality-bounded by sqsMaxTrackedQueues, and split between counters (HT-FIFO partition activity) and gauges (queue depth).

The cardinality budget is split into two independent maps because the two metrics have different deletion semantics:

  • partitionMessages is a CounterVec. Counters are cumulative; deleting a series throws away its observed-since-process-start value, so we never call DeleteLabelValues on it. The counter budget therefore only ever grows — once a queue is admitted to trackedCounterQueues it stays admitted, and ForgetQueue does NOT touch this map.
  • queueDepth is a GaugeVec. Gauges have no cumulative state, so DeleteLabelValues is safe (and necessary, otherwise a deleted queue keeps reporting a frozen backlog on the dashboard). ForgetQueue both removes the gauge series and frees the queue's slot in trackedDepthQueues so a churn-heavy deployment can reuse the budget.

Sharing one map across the two metrics regresses the counter cap: ForgetQueue would free a slot, a new queue would be admitted, and the previous queue's counter series would still occupy a real-name label in Prometheus — letting cardinality grow without bound under queue churn (or after a leader step-down clears the observer's lastSeen). This was the P1 finding on PR #743.

func (*SQSMetrics) ForgetQueue

func (m *SQSMetrics) ForgetQueue(queue string)

ForgetQueue drops the three gauge series for a queue and frees its slot in the depth-side cardinality budget so a long-running deployment that regularly creates and deletes queues (CI workloads, ephemeral per-job queues) doesn't permanently wedge the 512-entry depth budget.

Three cases, by membership at call time:

  1. Queue is in trackedDepthQueues (admitted under its real name): drop the three state-labelled series and free the budget slot.
  2. Queue is in overflowDepthQueues (admitted, then collapsed onto the shared sqsQueueOverflow / "_other" label because the budget was saturated): remove from the overflow set. If that leaves the overflow set empty, drop the three _other series too — there's no remaining queue reporting into them, so the gauge would otherwise pin phantom backlog. While the overflow set is still non-empty the _other series stays put: tearing it down would zero out values that other overflow queues are legitimately maintaining.
  3. Queue is in neither map (never depth-observed): no-op.

The (queue, partition, action) counter series stays in all three cases — cumulative-by-design — and the queue's slot in trackedCounterQueues stays consumed. Reclaiming the counter slot would let a later queue be admitted under its real name while the original counter series still sat in Prometheus, which would let counter cardinality grow past sqsMaxTrackedQueues under churn or after a leader step-down clears the observer's lastSeen (the P1 finding on PR #743). Counter-side _other has no equivalent of the gauge phantom-backlog problem either, because cumulative counters legitimately reflect total operations from queues that have since been deleted.

Caller-audit per the standing semantic-change rule: only SQSObserver.observeOnce calls this (registry plumbing aside), and it's invoked exactly when a queue is observed in the previous tick but not the current one. The caller's contract — "drop gauges for a queue that disappeared so dashboards don't show frozen backlog" — is preserved AND extended consistently: overflow queues, previously a silent no-op, now also stop pinning the shared gauge once the last one disappears. The narrowed-but-still-correct scope (counter budget never reclaimed) is invisible to observeOnce because the observer only writes gauges; counters are fed via ObservePartitionMessage from a different code path entirely.

func (*SQSMetrics) ObservePartitionMessage

func (m *SQSMetrics) ObservePartitionMessage(queue string, partition uint32, action string)

ObservePartitionMessage implements SQSPartitionObserver. The (queue, action) pair is validated and (queue) is collapsed to the overflow label past sqsMaxTrackedQueues distinct names.

func (*SQSMetrics) ObserveQueueDepth

func (m *SQSMetrics) ObserveQueueDepth(queue string, visible, notVisible, delayed int64)

ObserveQueueDepth implements SQSDepthObserver. Updates the three state-labelled gauges for queue. Negative values are clamped to 0 so a transient scan failure (returning -1 sentinel from a future caller) cannot blast a fake backlog onto the dashboard.

type SQSObserver

type SQSObserver struct {
	// contains filtered or unexported fields
}

SQSObserver polls a SQSDepthSource on a fixed cadence and writes the result into the SQSMetrics gauge. Same shape as RaftObserver (monitoring/raft.go): the observer owns the state machine (current-vs-previous queue diff for ForgetQueue) and the source just returns ready-to-use snapshots. The observer is nil-tolerant at every entrypoint so test fixtures and metrics-disabled deployments can no-op without a defensive nil check.

func (*SQSObserver) ObserveOnce

func (o *SQSObserver) ObserveOnce(source SQSDepthSource)

ObserveOnce captures the latest depth snapshot synchronously. Mirrors RaftObserver.ObserveOnce; intended for tests that want deterministic single-tick behaviour without spinning up a ticker.

func (*SQSObserver) Start

func (o *SQSObserver) Start(ctx context.Context, source SQSDepthSource, interval time.Duration)

Start kicks off a background ticker that polls source every interval (defaulting to sqsDepthObserveInterval when zero) until ctx is canceled. The first observation runs synchronously so /metrics has fresh data on the first scrape; subsequent ticks run on the goroutine.

type SQSPartitionObserver

type SQSPartitionObserver interface {
	// ObservePartitionMessage increments the
	// sqs_partition_messages_total counter for one operation on
	// one (queue, partition) pair. Action must be one of
	// SQSPartitionActionSend / Receive / Delete; any other value
	// is silently dropped so a typo at a future call site cannot
	// crash the process.
	ObservePartitionMessage(queue string, partition uint32, action string)
}

SQSPartitionObserver records per-(queue, partition, action) counters for HT-FIFO operations. The interface is small so adapter call sites can pass a no-op observer in tests without pulling in the full Prometheus registry.

type SQSQueueDepth

type SQSQueueDepth struct {
	Queue      string
	Visible    int64
	NotVisible int64
	Delayed    int64
}

SQSQueueDepth is one queue's depth-attribute snapshot. Mirrors adapter.SQSQueueDepth byte-for-byte and is re-declared here to keep the monitoring package free of an adapter import. A drift between the two definitions surfaces as a compile error at the SQSObserver call site.

type WriteConflictCollector

type WriteConflictCollector struct {
	// contains filtered or unexported fields
}

WriteConflictCollector polls each registered store on a fixed interval and mirrors the snapshot into the Prometheus counter vector. Store-side counts are monotonic for the lifetime of a single store instance; counters advance by the positive delta against the last snapshot so a store reopen (Restore swap) does not produce negative values.

func (*WriteConflictCollector) ObserveOnce

func (c *WriteConflictCollector) ObserveOnce(sources []WriteConflictSource)

ObserveOnce is exposed for tests and single-shot callers.

func (*WriteConflictCollector) Start

func (c *WriteConflictCollector) Start(ctx context.Context, sources []WriteConflictSource, interval time.Duration)

Start polls sources on the given interval until ctx is canceled. Passing interval <= 0 uses defaultWriteConflictPollInterval (5 s), matching the DispatchCollector / PebbleCollector cadence.

type WriteConflictCounterSource

type WriteConflictCounterSource interface {
	WriteConflictCountsByPrefix() map[string]uint64
}

WriteConflictCounterSource abstracts per-group access to the MVCC store's OCC conflict counters. The concrete store implementations (pebbleStore, mvccStore, ShardStore, LeaderRoutedStore) satisfy this via WriteConflictCountsByPrefix(); keys in the returned map follow the "<kind>|<key_prefix>" encoding from the store package.

type WriteConflictMetrics

type WriteConflictMetrics struct {
	// contains filtered or unexported fields
}

WriteConflictMetrics owns the per-(group, kind, key_prefix) counter vector used by the write-conflict dashboard. Registered once per Registry.

type WriteConflictSource

type WriteConflictSource struct {
	GroupID    uint64
	GroupIDStr string
	Source     WriteConflictCounterSource
}

WriteConflictSource binds a raft group ID to a counter source. Multiple groups can be polled by a single collector on a sharded node. GroupIDStr is the pre-formatted decimal form of GroupID used as the "group" Prometheus label; pre-computing it avoids a per-tick strconv allocation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL