Documentation ¶
Index ¶
- Constants
- func AddressRequiresToken(addr string) bool
- func MetricsServeTask(server *http.Server, listener net.Listener, address string) func() error
- func MetricsShutdownTask(ctx context.Context, server *http.Server, address string) func() error
- func NewMetricsServer(handler http.Handler, bearerToken string) *http.Server
- func NewPprofHandler() http.Handler
- func NewPprofServer(bearerToken string) *http.Server
- func PprofServeTask(server *http.Server, listener net.Listener, address string) func() error
- func PprofShutdownTask(ctx context.Context, server *http.Server, address string) func() error
- func ProtectHandler(handler http.Handler, bearerToken string) http.Handler
- type DispatchCollector
- type DispatchCounterSource
- type DispatchSource
- type DynamoDBMetrics
- type DynamoDBRequestObserver
- type DynamoDBRequestReport
- type DynamoDBTableMetrics
- type HotPathMetrics
- type LeaseReadObserver
- type LuaFastPathCmd
- type LuaFastPathFallbackReason
- type LuaFastPathObserver
- type LuaMetrics
- type LuaScriptObserver
- type LuaScriptReport
- type PebbleCacheCapacitySource
- type PebbleCollector
- type PebbleMetrics
- type PebbleMetricsSource
- type PebbleSource
- type RaftMetrics
- type RaftObserver
- type RaftRuntime
- type RedisMetrics
- type RedisRequestObserver
- type RedisRequestReport
- type Registry
- func (r *Registry) DispatchCollector() *DispatchCollector
- func (r *Registry) DynamoDBObserver() DynamoDBRequestObserver
- func (r *Registry) Gatherer() prometheus.Gatherer
- func (r *Registry) Handler() http.Handler
- func (r *Registry) LeaseReadObserver() LeaseReadObserver
- func (r *Registry) LuaFastPathObserver() LuaFastPathObserver
- func (r *Registry) LuaObserver() LuaScriptObserver
- func (r *Registry) PebbleCollector() *PebbleCollector
- func (r *Registry) RaftObserver() *RaftObserver
- func (r *Registry) RaftProposalObserver(groupID uint64) *raftProposalObserver
- func (r *Registry) RedisObserver() RedisRequestObserver
- func (r *Registry) SQSObserver() *SQSObserver
- func (r *Registry) SQSPartitionObserver() SQSPartitionObserver
- func (r *Registry) SetFSMApplySyncMode(activeLabel string)
- func (r *Registry) WriteConflictCollector() *WriteConflictCollector
- type SQSDepthSource
- type SQSMetrics
- type SQSObserver
- type SQSPartitionObserver
- type SQSQueueDepth
- type WriteConflictCollector
- type WriteConflictCounterSource
- type WriteConflictMetrics
- type WriteConflictSource
Constants ¶
const (
	LuaFastPathOutcomeHit            = "hit"
	LuaFastPathOutcomeSkipLoaded     = "skip_loaded"
	LuaFastPathOutcomeSkipCachedType = "skip_cached_type"
)
LuaFastPathOutcome labels tag each Lua-side read fast-path decision so operators can see how often a given command (ZRANGEBYSCORE, ZSCORE, HGET, etc.) actually takes the fast path vs falls back.
const (
	SQSPartitionActionSend    = "send"
	SQSPartitionActionReceive = "receive"
	SQSPartitionActionDelete  = "delete"
)
SQS HT-FIFO partition action labels. Stable string set so dashboards / alerts can rely on the values not changing.
Variables ¶
This section is empty.
Functions ¶
func AddressRequiresToken ¶
func AddressRequiresToken(addr string) bool
AddressRequiresToken reports whether the address is exposed beyond loopback and therefore requires bearer-token protection.
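The loopback test described above could look roughly like the following. This is a minimal sketch, not the package's implementation: the helper name requiresToken, the fail-closed handling of unparseable addresses, and the special case for "localhost" are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"net"
)

// requiresToken is a hypothetical stand-in for AddressRequiresToken: it
// reports true unless the host part of addr is clearly loopback.
func requiresToken(addr string) bool {
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		// Assumption: an address we cannot parse is treated as exposed.
		return true
	}
	if host == "localhost" {
		return false
	}
	ip := net.ParseIP(host)
	// An empty host (":9090") binds every interface, so ip is nil here
	// and the address counts as exposed.
	return ip == nil || !ip.IsLoopback()
}

func main() {
	addrs := []string{"127.0.0.1:9090", "[::1]:9090", "localhost:9090", ":9090", "0.0.0.0:9090"}
	for _, addr := range addrs {
		fmt.Printf("%-16s requires token: %v\n", addr, requiresToken(addr))
	}
}
```

Failing closed on parse errors is a deliberate choice in this sketch: a misconfigured listen address then ends up protected rather than silently open.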
func MetricsServeTask ¶
func MetricsServeTask(server *http.Server, listener net.Listener, address string) func() error
MetricsServeTask returns an errgroup task that serves the metrics endpoint until shutdown.
func MetricsShutdownTask ¶
func MetricsShutdownTask(ctx context.Context, server *http.Server, address string) func() error
MetricsShutdownTask returns an errgroup task that stops the metrics server on context cancellation.
func NewMetricsServer ¶
func NewMetricsServer(handler http.Handler, bearerToken string) *http.Server
NewMetricsServer constructs an HTTP server for the protected metrics handler.
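Bearer-token protection of a handler is commonly written as a small middleware. The sketch below illustrates that general pattern and is not the package's ProtectHandler: the names protect, okHandler, and status are hypothetical, and treating an empty token as "no protection" is an assumption.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// protect wraps next so that requests must carry "Authorization: Bearer <token>".
// Assumption: an empty token leaves the handler unprotected.
func protect(next http.Handler, token string) http.Handler {
	if token == "" {
		return next
	}
	want := []byte(token)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		got, ok := strings.CutPrefix(r.Header.Get("Authorization"), "Bearer ")
		// Constant-time comparison avoids leaking the token through timing.
		if !ok || subtle.ConstantTimeCompare([]byte(got), want) != 1 {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// okHandler is a trivial handler that always answers 200.
func okHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
}

// status issues one in-memory request and returns the response code.
func status(h http.Handler, authorization string) int {
	req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
	if authorization != "" {
		req.Header.Set("Authorization", authorization)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	h := protect(okHandler(), "s3cret")
	fmt.Println(status(h, ""), status(h, "Bearer wrong"), status(h, "Bearer s3cret"))
}
```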
func NewPprofHandler ¶
func NewPprofHandler() http.Handler
NewPprofHandler returns an http.Handler that serves the Go runtime profiling endpoints under /debug/pprof/.
func NewPprofServer ¶
func NewPprofServer(bearerToken string) *http.Server
NewPprofServer creates an HTTP server for the pprof debug endpoints, optionally protected by bearer-token authentication.
func PprofServeTask ¶
func PprofServeTask(server *http.Server, listener net.Listener, address string) func() error
PprofServeTask returns an errgroup task that serves the pprof endpoint until shutdown.
func PprofShutdownTask ¶
func PprofShutdownTask(ctx context.Context, server *http.Server, address string) func() error
PprofShutdownTask returns an errgroup task that stops the pprof server on context cancellation.
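The serve and shutdown tasks form a pair: one blocks in Serve, the other waits for cancellation and stops the server. The sketch below shows that pairing with plain goroutines from the standard library. The names serveTask, shutdownTask, and runPair, the 5-second shutdown timeout, and the mapping of http.ErrServerClosed to nil are assumptions; the real tasks return func() error values intended for an errgroup.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveTask blocks in Serve until the server is shut down.
func serveTask(server *http.Server, listener net.Listener) func() error {
	return func() error {
		err := server.Serve(listener)
		if errors.Is(err, http.ErrServerClosed) {
			return nil // a graceful shutdown is not a failure
		}
		return err
	}
}

// shutdownTask waits for ctx to be canceled, then stops the server.
func shutdownTask(ctx context.Context, server *http.Server) func() error {
	return func() error {
		<-ctx.Done()
		stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		return server.Shutdown(stopCtx)
	}
}

// runPair starts both tasks on a loopback listener, cancels, and
// returns what each task reported.
func runPair() (serveErr, shutdownErr error) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err, nil
	}
	server := &http.Server{Handler: http.NotFoundHandler(), ReadHeaderTimeout: time.Second}
	ctx, cancel := context.WithCancel(context.Background())
	serveDone := make(chan error, 1)
	shutdownDone := make(chan error, 1)
	go func() { serveDone <- serveTask(server, listener)() }()
	go func() { shutdownDone <- shutdownTask(ctx, server)() }()
	cancel() // simulate process shutdown
	return <-serveDone, <-shutdownDone
}

func main() {
	serveErr, shutdownErr := runPair()
	fmt.Println("serve:", serveErr, "shutdown:", shutdownErr)
}
```

Because both tasks have the signature func() error, the same values could be handed to errgroup.Group.Go instead of bare goroutines.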
Types ¶
type DispatchCollector ¶
type DispatchCollector struct {
// contains filtered or unexported fields
}
DispatchCollector polls the etcd raft Engine's atomic dispatch counters on a fixed interval and mirrors them into monotonic Prometheus counters. We poll rather than calling Add() inline in the raft path because those code paths are already hot and must not take any additional interface call; the counters are atomic.Uint64 in the engine and polling is cheap (O(groups) reads every 5s).
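The split between a hot path that only touches an atomic counter and a poller that reads it later can be sketched as follows. The types engine and source and the function pollOnce are hypothetical stand-ins, not the package's types; only the accessor name DispatchDropCount is taken from the documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// engine is a hypothetical stand-in for the raft engine.
type engine struct {
	dropped atomic.Uint64
}

// onDrop is the hot path: one atomic add, no lock, no interface call.
func (e *engine) onDrop() { e.dropped.Add(1) }

// DispatchDropCount is the accessor a poller reads.
func (e *engine) DispatchDropCount() uint64 { return e.dropped.Load() }

// source binds a group ID to its engine, similar in spirit to DispatchSource.
type source struct {
	groupID uint64
	eng     *engine
}

// pollOnce reads every source once; the cost is O(groups) atomic loads.
func pollOnce(sources []source, out map[uint64]uint64) {
	for _, s := range sources {
		out[s.groupID] = s.eng.DispatchDropCount()
	}
}

func main() {
	a, b := &engine{}, &engine{}
	a.onDrop()
	a.onDrop()
	snapshot := map[uint64]uint64{}
	pollOnce([]source{{1, a}, {2, b}}, snapshot)
	fmt.Println(snapshot[1], snapshot[2])
}
```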
func (*DispatchCollector) ObserveOnce ¶
func (c *DispatchCollector) ObserveOnce(sources []DispatchSource)
ObserveOnce is exposed for tests and single-shot callers.
func (*DispatchCollector) Start ¶
func (c *DispatchCollector) Start(ctx context.Context, sources []DispatchSource, interval time.Duration)
Start polls sources on the given interval until ctx is canceled. Passing interval <= 0 uses defaultDispatchPollInterval (5 s), which matches the cadence of RaftObserver so operators see consistent refresh rates across dashboards.
type DispatchCounterSource ¶
type DispatchCounterSource interface {
DispatchDropCount() uint64
DispatchErrorCount() uint64
StepQueueFullCount() uint64
// DispatchErrorCountsByCode returns a snapshot of dispatch error
// counts keyed by grpc status code ("Unavailable",
// "DeadlineExceeded", ...). Sum of values equals
// DispatchErrorCount(). Implementations that do not break out by
// code may return an empty map.
DispatchErrorCountsByCode() map[string]uint64
}
DispatchCounterSource abstracts the etcd raft Engine's monotonic dispatch counters so monitoring can scrape them without importing the etcd package. The concrete etcd Engine satisfies this interface via its DispatchDropCount / DispatchErrorCount / StepQueueFullCount accessors.
type DispatchSource ¶
type DispatchSource struct {
GroupID uint64
Source DispatchCounterSource
}
DispatchSource binds a raft group ID to its counter source. Multiple groups can be polled by a single collector on a sharded node.
type DynamoDBMetrics ¶
type DynamoDBMetrics struct {
// contains filtered or unexported fields
}
func (*DynamoDBMetrics) ObserveDynamoDBRequest ¶
func (m *DynamoDBMetrics) ObserveDynamoDBRequest(report DynamoDBRequestReport)
ObserveDynamoDBRequest records the final outcome of a request.
func (*DynamoDBMetrics) ObserveInFlightChange ¶
func (m *DynamoDBMetrics) ObserveInFlightChange(operation string, delta float64)
ObserveInFlightChange adjusts the in-flight request gauge for an operation.
type DynamoDBRequestObserver ¶
type DynamoDBRequestObserver interface {
ObserveInFlightChange(operation string, delta float64)
ObserveDynamoDBRequest(report DynamoDBRequestReport)
}
DynamoDBRequestObserver records per-request DynamoDB-compatible API metrics.
type DynamoDBRequestReport ¶
type DynamoDBRequestReport struct {
Operation string
HTTPStatus int
ErrorType string
Duration time.Duration
RequestBytes int
ResponseBytes int
Tables []string
TableMetrics map[string]DynamoDBTableMetrics
}
DynamoDBRequestReport is the normalized result of a single request.
type DynamoDBTableMetrics ¶
DynamoDBTableMetrics captures table-scoped work done by a request.
type HotPathMetrics ¶
type HotPathMetrics struct {
// contains filtered or unexported fields
}
HotPathMetrics owns the Prometheus vectors introduced for the Redis GET hot-path dashboard. Kept in its own type so the Registry can hold a single instance and hand out scoped observer/collector objects.
type LeaseReadObserver ¶
type LeaseReadObserver struct {
// contains filtered or unexported fields
}
LeaseReadObserver implements kv.LeaseReadObserver by incrementing the elastickv_lease_read_total counter vector. Callers grab an instance via Registry.LeaseReadObserver(); the zero value is safe and silently drops samples, so tests can pass LeaseReadObserver{} as a stub.
func (LeaseReadObserver) ObserveLeaseRead ¶
func (o LeaseReadObserver) ObserveLeaseRead(hit bool)
ObserveLeaseRead records a single lease-read outcome.
type LuaFastPathCmd ¶
type LuaFastPathCmd struct {
// contains filtered or unexported fields
}
LuaFastPathCmd is a pre-resolved bundle of fast-path outcome counters for a single Lua command. Construct once via LuaFastPathObserver.ForCommand(cmd) at server startup; call the Observe* methods per redis.call(). Safe to copy.
func (LuaFastPathCmd) ObserveFallback ¶
func (c LuaFastPathCmd) ObserveFallback(reason LuaFastPathFallbackReason)
ObserveFallback routes to the counter for the given reason. Use the LuaFastPathFallback* constants; unknown values (including the zero value LuaFastPathFallbackNone, which should never reach here) land on the "other" bucket so cardinality stays bounded.
func (LuaFastPathCmd) ObserveHit ¶
func (c LuaFastPathCmd) ObserveHit()
Observe* methods record one outcome. Each is a single atomic increment when the counter is wired; a no-op on the zero value.
func (LuaFastPathCmd) ObserveSkipCachedType ¶
func (c LuaFastPathCmd) ObserveSkipCachedType()
func (LuaFastPathCmd) ObserveSkipLoaded ¶
func (c LuaFastPathCmd) ObserveSkipLoaded()
type LuaFastPathFallbackReason ¶
type LuaFastPathFallbackReason string
LuaFastPathFallbackReason is the typed label for server-side hit=false branches. A dedicated type (instead of a bare string) lets Go catch typos in call sites — producers in adapter/ and the switch in ObserveFallback stay in lockstep with the constants below, and the Prometheus label value is derived directly from the underlying string via string(reason).
const (
	LuaFastPathFallbackIneligible  LuaFastPathFallbackReason = "fallback_ineligible"
	LuaFastPathFallbackMissingKey  LuaFastPathFallbackReason = "fallback_missing_key"
	LuaFastPathFallbackWrongType   LuaFastPathFallbackReason = "fallback_wrong_type"
	LuaFastPathFallbackTruncated   LuaFastPathFallbackReason = "fallback_truncated"
	LuaFastPathFallbackLargeOffset LuaFastPathFallbackReason = "fallback_large_offset"
	LuaFastPathFallbackOther       LuaFastPathFallbackReason = "fallback_other"
	// LuaFastPathFallbackNone is the sentinel returned alongside
	// hit=true; callers must not record it.
	LuaFastPathFallbackNone LuaFastPathFallbackReason = ""
)
Known fallback reasons. Subdivides the generic "fallback" outcome so operators can tell WHY the fast path gave up and route the fix accordingly (eligibility check, truncation, empty-key short-circuit, etc.).
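The routing of unknown reasons onto the "other" bucket can be illustrated with a typed string and a switch. This is a reduced sketch: the type reason, its constants, and the function label are hypothetical and cover only a subset of the reasons listed above; the label strings themselves are the documented ones.

```go
package main

import "fmt"

// reason mirrors the idea of a typed label; it is not the package's type.
type reason string

const (
	reasonIneligible reason = "fallback_ineligible"
	reasonMissingKey reason = "fallback_missing_key"
	reasonOther      reason = "fallback_other"
	reasonNone       reason = "" // sentinel paired with hit=true
)

// label maps a reason onto a bounded set of label values. Anything not
// listed explicitly, including the empty sentinel, lands on "other".
func label(r reason) string {
	switch r {
	case reasonIneligible, reasonMissingKey:
		return string(r)
	default:
		return string(reasonOther)
	}
}

func main() {
	fmt.Println(label(reasonMissingKey))
	fmt.Println(label("fallback_typo"))
	fmt.Println(label(reasonNone))
}
```

Keeping the default branch means a new or misspelled reason can never create a fresh label value, which is what bounds the series count.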
type LuaFastPathObserver ¶
type LuaFastPathObserver struct {
// contains filtered or unexported fields
}
LuaFastPathObserver records fast-path outcomes for redis.call() inside Lua scripts. The zero value is safe and silently drops samples so tests can pass LuaFastPathObserver{} as a stub.
Hot-path shape: each Observe* call on a LuaFastPathCmd handle is a single non-blocking atomic increment on a pre-resolved prometheus.Counter (client_golang's default Counter uses sync/atomic internally). Callers resolve one LuaFastPathCmd per command at server construction to avoid CounterVec.WithLabelValues (mutex-guarded map lookup) on the hot path.
func (LuaFastPathObserver) ForCommand ¶
func (o LuaFastPathObserver) ForCommand(cmd string) LuaFastPathCmd
ForCommand pre-resolves the counter handles for cmd. Returns a zero-value LuaFastPathCmd when the observer is empty (tests), which silently drops all Observe* calls.
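The pre-resolution pattern, including the zero-value no-op behaviour, can be sketched without Prometheus by using atomic counters as the handles. All names here (observer, cmdCounters, newObserver, forCommand, hitCount) are hypothetical; the real types hold prometheus.Counter handles instead.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cmdCounters is a per-command handle; the zero value drops samples.
type cmdCounters struct {
	hit *atomic.Uint64
}

// observeHit is a single atomic increment when wired, a no-op otherwise.
func (c cmdCounters) observeHit() {
	if c.hit != nil {
		c.hit.Add(1)
	}
}

// observer owns the counters; the zero value has a nil map.
type observer struct {
	hits map[string]*atomic.Uint64
}

func newObserver(cmds ...string) observer {
	o := observer{hits: make(map[string]*atomic.Uint64, len(cmds))}
	for _, cmd := range cmds {
		o.hits[cmd] = new(atomic.Uint64)
	}
	return o
}

// forCommand does the map lookup once, at construction time, so the hot
// path never touches the map. Reading a nil map yields a nil pointer,
// which gives the zero-value handle for an empty observer.
func (o observer) forCommand(cmd string) cmdCounters {
	return cmdCounters{hit: o.hits[cmd]}
}

// hitCount reads the current value for cmd (0 when unknown).
func (o observer) hitCount(cmd string) uint64 {
	if c := o.hits[cmd]; c != nil {
		return c.Load()
	}
	return 0
}

func main() {
	o := newObserver("HGET", "ZSCORE")
	hget := o.forCommand("HGET") // resolved once at startup
	hget.observeHit()
	hget.observeHit()
	observer{}.forCommand("HGET").observeHit() // stub: silently dropped
	fmt.Println(o.hitCount("HGET"), o.hitCount("ZSCORE"))
}
```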
type LuaMetrics ¶
type LuaMetrics struct {
// contains filtered or unexported fields
}
LuaMetrics holds Prometheus metrics that break down where time goes inside a Lua script execution so that Lua VM slowness, Raft latency, and write conflict retries can be distinguished in dashboards.
func (*LuaMetrics) ObserveLuaScript ¶
func (m *LuaMetrics) ObserveLuaScript(report LuaScriptReport)
ObserveLuaScript records a completed script invocation.
type LuaScriptObserver ¶
type LuaScriptObserver interface {
ObserveLuaScript(report LuaScriptReport)
}
LuaScriptObserver is implemented by anything that wants to record Lua script execution metrics.
type LuaScriptReport ¶
type LuaScriptReport struct {
// LuaExecDuration is the cumulative time spent inside state.PCall across
// all retry attempts. This includes RedisCallDuration.
LuaExecDuration time.Duration
// RedisCallDuration is the cumulative time spent inside redis.call() /
// redis.pcall() handlers (scriptCtx.exec) — Raft reads and in-memory
// state updates. Pure VM time = LuaExecDuration - RedisCallDuration.
RedisCallDuration time.Duration
// RedisCallCount is the total number of redis.call()/pcall() invocations
// across all retry attempts. High counts explain high RedisCallDuration.
RedisCallCount int
// CommitDuration is the cumulative time spent in scriptCtx.commit()
// (coordinator.Dispatch → Raft consensus + storage write).
CommitDuration time.Duration
// ConflictRetries is the number of extra attempts caused by write
// conflicts or locked transactions (0 = succeeded on the first try).
ConflictRetries int
// IsError is true when the final outcome was an error.
IsError bool
}
LuaScriptReport summarises one Lua script invocation (EVAL / EVALSHA), covering all retryRedisWrite attempts.
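The field comments give the relation pure VM time = LuaExecDuration - RedisCallDuration. A small worked example follows; the helper name pureVMTime is hypothetical, and clamping to zero when the inputs are inconsistent is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// pureVMTime derives the time spent in the Lua VM itself from the two
// cumulative durations carried by a report.
func pureVMTime(luaExec, redisCall time.Duration) time.Duration {
	if redisCall > luaExec {
		return 0 // assumption: never report a negative duration
	}
	return luaExec - redisCall
}

func main() {
	// 12ms inside PCall, of which 9ms was spent in redis.call handlers.
	fmt.Println(pureVMTime(12*time.Millisecond, 9*time.Millisecond)) // 3ms
}
```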
type PebbleCacheCapacitySource ¶
type PebbleCacheCapacitySource interface {
BlockCacheCapacityBytes() int64
}
PebbleCacheCapacitySource is an optional companion to PebbleMetricsSource: sources that expose the configured block-cache capacity (in bytes) are queried by the collector to populate elastickv_pebble_block_cache_capacity_bytes. Implementations return 0 to indicate "not known / store closed"; the collector then leaves the gauge at its last observed value for that tick. The concrete *store.pebbleStore satisfies this via BlockCacheCapacityBytes(); tests can omit it.
type PebbleCollector ¶
type PebbleCollector struct {
// contains filtered or unexported fields
}
PebbleCollector polls each registered Pebble store on a fixed interval and mirrors the snapshot into the Prometheus vectors. Gauges are overwritten; counters advance by the positive delta against the previous snapshot.
func (*PebbleCollector) ObserveOnce ¶
func (c *PebbleCollector) ObserveOnce(sources []PebbleSource)
ObserveOnce is exposed for tests and single-shot callers.
func (*PebbleCollector) Start ¶
func (c *PebbleCollector) Start(ctx context.Context, sources []PebbleSource, interval time.Duration)
Start begins polling sources on interval until ctx is canceled. Passing interval <= 0 uses defaultPebblePollInterval (5 s), matching the DispatchCollector cadence so operators see consistent refresh rates across dashboards. Pebble.Metrics() acquires internal mutexes but is not expensive; 5 s gives ample headroom.
type PebbleMetrics ¶
type PebbleMetrics struct {
// contains filtered or unexported fields
}
PebbleMetrics owns the Prometheus vectors for Pebble LSM internals. One instance per registry; shared by all groups (labelled by group ID + level where relevant).
func (*PebbleMetrics) SetFSMApplySyncMode ¶
func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string)
SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active. activeLabel must be "sync" or "nosync"; any other value is coerced to "sync" to match the store resolver's fallback behaviour for unknown ELASTICKV_FSM_SYNC_MODE values (see store.resolveFSMApplyWriteOpts). This keeps the gauge's two-row shape stable: exactly one of {"sync","nosync"} is 1 at any time and the other is 0.
Call this once at startup after the store package has resolved the env var. Invoking again is safe and idempotent: the new label goes to 1 and the other known label goes to 0.
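The two-row gauge shape and the coercion of unknown labels can be shown with a plain map. This is a sketch of the documented behaviour only; the function syncModeGauge is hypothetical and does not touch Prometheus.

```go
package main

import "fmt"

// syncModeGauge returns the value each of the two gauge rows would hold
// for the given label. Unknown labels are coerced to "sync".
func syncModeGauge(activeLabel string) map[string]float64 {
	if activeLabel != "sync" && activeLabel != "nosync" {
		activeLabel = "sync"
	}
	rows := map[string]float64{"sync": 0, "nosync": 0}
	rows[activeLabel] = 1
	return rows
}

func main() {
	fmt.Println(syncModeGauge("nosync"))
	fmt.Println(syncModeGauge("bogus"))
}
```

Because both rows are always written, calling the function again with a different label flips the pair rather than leaving a stale 1 behind.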
type PebbleMetricsSource ¶
PebbleMetricsSource abstracts the per-group access to a Pebble DB's Metrics(). The concrete *store.pebbleStore satisfies this via its Metrics() accessor. Returning nil (e.g. store closed mid-restore) is allowed; the collector will skip that group for the tick.
type PebbleSource ¶
type PebbleSource struct {
GroupID uint64
GroupIDStr string
Source PebbleMetricsSource
}
PebbleSource binds a raft group ID to its Pebble store. Multiple groups can be polled by a single collector on a sharded node. GroupIDStr is the pre-formatted decimal form of GroupID used as the "group" Prometheus label; pre-computing it avoids a per-tick strconv.FormatUint allocation in observeOnce.
type RaftMetrics ¶
type RaftMetrics struct {
// contains filtered or unexported fields
}
RaftMetrics holds all Prometheus gauge vectors for a single Raft group. Fields are populated by observeRuntime from raftengine status snapshots.
type RaftObserver ¶
type RaftObserver struct {
// contains filtered or unexported fields
}
func (*RaftObserver) ObserveOnce ¶
func (o *RaftObserver) ObserveOnce(runtimes []RaftRuntime)
ObserveOnce captures the latest raft state for all configured runtimes.
func (*RaftObserver) Start ¶
func (o *RaftObserver) Start(ctx context.Context, runtimes []RaftRuntime, interval time.Duration)
Start polls raft state and configuration on a fixed interval until ctx is canceled.
type RaftRuntime ¶
type RaftRuntime struct {
GroupID uint64
StatusReader raftengine.StatusReader
ConfigReader raftengine.ConfigReader
}
RaftRuntime describes a raft group observed by the metrics exporter.
type RedisMetrics ¶
type RedisMetrics struct {
// contains filtered or unexported fields
}
RedisMetrics holds all Prometheus metric vectors for the Redis adapter.
func (*RedisMetrics) ObserveRedisRequest ¶
func (m *RedisMetrics) ObserveRedisRequest(report RedisRequestReport)
ObserveRedisRequest records the final outcome of a Redis command.
type RedisRequestObserver ¶
type RedisRequestObserver interface {
ObserveRedisRequest(report RedisRequestReport)
}
RedisRequestObserver records per-command Redis API metrics.
type RedisRequestReport ¶
type RedisRequestReport struct {
Command string
IsError bool
Duration time.Duration
// Unsupported indicates the command was rejected because the adapter
// has no route for it. When true, ObserveRedisRequest additionally
// records the real (bounded) command name in
// elastickv_redis_unsupported_commands_total alongside the existing
// "unknown"-bucketed counters, which are preserved unchanged.
Unsupported bool
}
RedisRequestReport is the normalized result of a single Redis command.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry owns the Prometheus registry used by a single Elastickv node.
func NewRegistry ¶
NewRegistry builds a registry with constant labels that identify the local node.
func (*Registry) DispatchCollector ¶
func (r *Registry) DispatchCollector() *DispatchCollector
DispatchCollector returns a collector that polls the etcd raft engine's dispatch counters and exports them to Prometheus. Start it with the node's raft sources after engine Open() completes.
func (*Registry) DynamoDBObserver ¶
func (r *Registry) DynamoDBObserver() DynamoDBRequestObserver
DynamoDBObserver returns the DynamoDB request observer backed by this registry.
func (*Registry) Gatherer ¶
func (r *Registry) Gatherer() prometheus.Gatherer
Gatherer exposes the underlying gatherer for tests and custom exporters.
func (*Registry) Handler ¶
func (r *Registry) Handler() http.Handler
Handler returns an HTTP handler that exposes the Prometheus scrape endpoint.
func (*Registry) LeaseReadObserver ¶
func (r *Registry) LeaseReadObserver() LeaseReadObserver
LeaseReadObserver returns an observer for the kv coordinator's LeaseRead fast-path counter. Returns a zero-value observer when the registry is nil so callers can pass the result through without checking; the zero value silently drops samples.
func (*Registry) LuaFastPathObserver ¶
func (r *Registry) LuaFastPathObserver() LuaFastPathObserver
LuaFastPathObserver returns an observer for Lua-side redis.call() fast-path outcomes (hit / skip / fallback per command). Zero-value safe for tests and tools that do not wire a registry.
func (*Registry) LuaObserver ¶
func (r *Registry) LuaObserver() LuaScriptObserver
LuaObserver returns the Lua script execution observer backed by this registry.
func (*Registry) PebbleCollector ¶
func (r *Registry) PebbleCollector() *PebbleCollector
PebbleCollector returns a collector that polls each Pebble store's Metrics() snapshot and mirrors the operationally useful fields (L0 sublevels, compaction debt, memtable, block cache) into Prometheus. Start it with the node's Pebble sources after the stores have been opened.
func (*Registry) RaftObserver ¶
func (r *Registry) RaftObserver() *RaftObserver
RaftObserver returns the Raft topology observer backed by this registry.
func (*Registry) RaftProposalObserver ¶
func (r *Registry) RaftProposalObserver(groupID uint64) *raftProposalObserver
RaftProposalObserver returns a group-scoped observer for failed raft proposals.
func (*Registry) RedisObserver ¶
func (r *Registry) RedisObserver() RedisRequestObserver
RedisObserver returns the Redis request observer backed by this registry.
func (*Registry) SQSObserver ¶
func (r *Registry) SQSObserver() *SQSObserver
SQSObserver returns the queue-depth gauge observer backed by this registry. Same shape as RaftObserver / RedisObserver: callers pull it via the registry, then drive Start(ctx, source, interval) from main.go's startMonitoringCollectors.
func (*Registry) SQSPartitionObserver ¶
func (r *Registry) SQSPartitionObserver() SQSPartitionObserver
SQSPartitionObserver returns the HT-FIFO partition-messages observer backed by this registry. Returns nil when the registry itself is nil so adapter call sites can pass the result through without checking; SQSMetrics.ObservePartitionMessage is also nil-receiver safe.
func (*Registry) SetFSMApplySyncMode ¶
func (r *Registry) SetFSMApplySyncMode(activeLabel string)
SetFSMApplySyncMode forwards the resolved ELASTICKV_FSM_SYNC_MODE label to the PebbleMetrics gauge so operators can observe the active durability posture on this node. Safe to call with a nil registry.
func (*Registry) WriteConflictCollector ¶
func (r *Registry) WriteConflictCollector() *WriteConflictCollector
WriteConflictCollector returns a collector that polls each MVCC store's per-(kind, key_prefix) OCC conflict counters and mirrors them into the elastickv_store_write_conflict_total Prometheus counter vector. Start it with the node's MVCC sources after the stores have been opened.
type SQSDepthSource ¶
type SQSDepthSource interface {
SnapshotQueueDepths(ctx context.Context) (snaps []SQSQueueDepth, ok bool)
}
SQSDepthSource is the contract a per-tick queue-depth source must satisfy. Implemented by *adapter.SQSServer; SQSObserver.Start calls SnapshotQueueDepths on every interval and writes the returned slice to the elastickv_sqs_queue_messages gauges.
Mirrors the Raft observer's StatusReader / ConfigReader pattern (monitoring/raft.go): the source returns ready-to-use snapshots and the observer owns the gauge state machine (forget-on-disappear, cardinality cap).
Two distinct empty-snapshot states are signalled by ok:
- ok=true with an empty/nil slice: "this source legitimately has no queues this tick". This happens when the node is a follower (leader-only emission) or the leader genuinely has zero queues configured. The observer diffs against the previous tick and calls ForgetQueue for any queue that disappeared, so a former leader's gauges are cleared on step-down.
- ok=false (regardless of the slice contents): "the source could not produce a snapshot this tick" (transient catalog-read failure on the leader, context canceled mid-scan). The observer must skip the diff entirely: leave existing gauges in place AND leave lastSeen untouched so the next successful tick can still diff against the previous good state. Without this branch a single failed scrape would wipe every depth gauge and produce a false "all queues drained" event on the dashboard until the next successful tick.
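The difference between the two ok states comes down to whether the previous-tick set is diffed and replaced. The sketch below models only that decision. The type depthObserver, its fields, and its methods are hypothetical; it tracks queue names rather than full depth snapshots.

```go
package main

import (
	"fmt"
	"sort"
)

// depthObserver keeps the set of queues seen on the last good tick.
type depthObserver struct {
	lastSeen  map[string]struct{}
	forgotten []string // queues whose gauges would have been dropped
}

// observe applies one tick. With ok=false nothing changes at all.
func (o *depthObserver) observe(queues []string, ok bool) {
	if !ok {
		return // failed snapshot: keep gauges and lastSeen untouched
	}
	current := make(map[string]struct{}, len(queues))
	for _, q := range queues {
		current[q] = struct{}{}
	}
	for q := range o.lastSeen {
		if _, still := current[q]; !still {
			o.forgotten = append(o.forgotten, q)
		}
	}
	o.lastSeen = current
}

// forgottenSorted returns the forgotten queues in a stable order.
func (o *depthObserver) forgottenSorted() []string {
	out := append([]string(nil), o.forgotten...)
	sort.Strings(out)
	return out
}

func main() {
	o := &depthObserver{}
	o.observe([]string{"orders", "emails"}, true)
	o.observe(nil, false) // transient failure: nothing is forgotten
	fmt.Println(len(o.forgottenSorted()))
	o.observe(nil, true) // step-down: both queues are forgotten
	fmt.Println(o.forgottenSorted())
}
```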
type SQSMetrics ¶
type SQSMetrics struct {
// contains filtered or unexported fields
}
SQSMetrics owns the Prometheus collectors for the SQS adapter. Mirrors DynamoDBMetrics' shape: per-Registry instance, label-cardinality-bounded by sqsMaxTrackedQueues, and split between counters (HT-FIFO partition activity) and gauges (queue depth).
The cardinality budget is split into two independent maps because the two metrics have different deletion semantics:
- partitionMessages is a CounterVec. Counters are cumulative; deleting a series throws away its observed-since-process-start value, so we never call DeleteLabelValues on it. The counter budget therefore only ever grows — once a queue is admitted to trackedCounterQueues it stays admitted, and ForgetQueue does NOT touch this map.
- queueDepth is a GaugeVec. Gauges have no cumulative state, so DeleteLabelValues is safe (and necessary, otherwise a deleted queue keeps reporting a frozen backlog on the dashboard). ForgetQueue both removes the gauge series and frees the queue's slot in trackedDepthQueues so a churn-heavy deployment can reuse the budget.
Sharing one map across the two metrics regresses the counter cap: ForgetQueue would free a slot, a new queue would be admitted, and the previous queue's counter series would still occupy a real-name label in Prometheus — letting cardinality grow without bound under queue churn (or after a leader step-down clears the observer's lastSeen). This was the P1 finding on PR #743.
func (*SQSMetrics) ForgetQueue ¶
func (m *SQSMetrics) ForgetQueue(queue string)
ForgetQueue drops the three gauge series for a queue and frees its slot in the depth-side cardinality budget so a long-running deployment that regularly creates and deletes queues (CI workloads, ephemeral per-job queues) doesn't permanently wedge the 512-entry depth budget.
Three cases, by membership at call time:
- Queue is in trackedDepthQueues (admitted under its real name): drop the three state-labelled series and free the budget slot.
- Queue is in overflowDepthQueues (admitted, then collapsed onto the shared sqsQueueOverflow / "_other" label because the budget was saturated): remove from the overflow set. If that leaves the overflow set empty, drop the three _other series too — there's no remaining queue reporting into them, so the gauge would otherwise pin phantom backlog. While the overflow set is still non-empty the _other series stays put: tearing it down would zero out values that other overflow queues are legitimately maintaining.
- Queue is in neither map (never depth-observed): no-op.
The (queue, partition, action) counter series stays in all three cases — cumulative-by-design — and the queue's slot in trackedCounterQueues stays consumed. Reclaiming the counter slot would let a later queue be admitted under its real name while the original counter series still sat in Prometheus, which would let counter cardinality grow past sqsMaxTrackedQueues under churn or after a leader step-down clears the observer's lastSeen (the P1 finding on PR #743). Counter-side _other has no equivalent of the gauge phantom-backlog problem either, because cumulative counters legitimately reflect total operations from queues that have since been deleted.
Caller-audit per the standing semantic-change rule: only SQSObserver.observeOnce calls this (registry plumbing aside), and it's invoked exactly when a queue is observed in the previous tick but not the current one. The caller's contract — "drop gauges for a queue that disappeared so dashboards don't show frozen backlog" — is preserved AND extended consistently: overflow queues, previously a silent no-op, now also stop pinning the shared gauge once the last one disappears. The narrowed-but-still-correct scope (counter budget never reclaimed) is invisible to observeOnce because the observer only writes gauges; counters are fed via ObservePartitionMessage from a different code path entirely.
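The three ForgetQueue cases on the depth side can be modelled with two sets and a record of which label values are currently exported. This is an illustrative sketch under stated assumptions: the type depthBudget and its methods are hypothetical, the series set stands in for the gauge vector, and only the "_other" label value is taken from the text above.

```go
package main

import "fmt"

const overflowLabel = "_other"

// depthBudget models the depth-side cardinality budget.
type depthBudget struct {
	max      int
	tracked  map[string]struct{} // queues exported under their own name
	overflow map[string]struct{} // queues collapsed onto overflowLabel
	series   map[string]struct{} // label values currently exported
}

func newDepthBudget(max int) *depthBudget {
	return &depthBudget{
		max:      max,
		tracked:  map[string]struct{}{},
		overflow: map[string]struct{}{},
		series:   map[string]struct{}{},
	}
}

// admit returns the label a queue's gauges are written under.
func (b *depthBudget) admit(queue string) string {
	if _, ok := b.tracked[queue]; ok {
		return queue
	}
	if _, ok := b.overflow[queue]; !ok && len(b.tracked) < b.max {
		b.tracked[queue] = struct{}{}
		b.series[queue] = struct{}{}
		return queue
	}
	b.overflow[queue] = struct{}{}
	b.series[overflowLabel] = struct{}{}
	return overflowLabel
}

// forget implements the three cases described above.
func (b *depthBudget) forget(queue string) {
	if _, ok := b.tracked[queue]; ok {
		delete(b.tracked, queue) // case 1: free the slot
		delete(b.series, queue)  // and drop the real-name series
		return
	}
	if _, ok := b.overflow[queue]; ok {
		delete(b.overflow, queue) // case 2: leave the overflow set
		if len(b.overflow) == 0 {
			delete(b.series, overflowLabel) // last one out drops _other
		}
	}
	// case 3: never observed, nothing to do
}

// hasSeries reports whether a label value is currently exported.
func (b *depthBudget) hasSeries(label string) bool {
	_, ok := b.series[label]
	return ok
}

func main() {
	b := newDepthBudget(1)
	fmt.Println(b.admit("a"), b.admit("b"), b.admit("c"))
	b.forget("b")
	fmt.Println(b.hasSeries(overflowLabel)) // c still reports into _other
	b.forget("c")
	fmt.Println(b.hasSeries(overflowLabel))
}
```

A counter-side budget would deliberately omit forget, since the documentation states that counter slots are never reclaimed.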
func (*SQSMetrics) ObservePartitionMessage ¶
func (m *SQSMetrics) ObservePartitionMessage(queue string, partition uint32, action string)
ObservePartitionMessage implements SQSPartitionObserver. The (queue, action) pair is validated and (queue) is collapsed to the overflow label past sqsMaxTrackedQueues distinct names.
func (*SQSMetrics) ObserveQueueDepth ¶
func (m *SQSMetrics) ObserveQueueDepth(queue string, visible, notVisible, delayed int64)
ObserveQueueDepth implements SQSDepthObserver. Updates the three state-labelled gauges for queue. Negative values are clamped to 0 so a transient scan failure (returning -1 sentinel from a future caller) cannot blast a fake backlog onto the dashboard.
type SQSObserver ¶
type SQSObserver struct {
// contains filtered or unexported fields
}
SQSObserver polls a SQSDepthSource on a fixed cadence and writes the result into the SQSMetrics gauge. Same shape as RaftObserver (monitoring/raft.go): the observer owns the state machine (current-vs-previous queue diff for ForgetQueue) and the source just returns ready-to-use snapshots. The observer is nil-tolerant at every entrypoint so test fixtures and metrics-disabled deployments can no-op without a defensive nil check.
func (*SQSObserver) ObserveOnce ¶
func (o *SQSObserver) ObserveOnce(source SQSDepthSource)
ObserveOnce captures the latest depth snapshot synchronously. Mirrors RaftObserver.ObserveOnce; intended for tests that want deterministic single-tick behaviour without spinning up a ticker.
func (*SQSObserver) Start ¶
func (o *SQSObserver) Start(ctx context.Context, source SQSDepthSource, interval time.Duration)
Start kicks off a background ticker that polls source every interval (defaulting to sqsDepthObserveInterval when zero) until ctx is canceled. The first observation runs synchronously so /metrics has fresh data on the first scrape; subsequent ticks run on the goroutine.
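The "synchronous first observation, then ticker" shape can be sketched as below. The names start, defaultInterval, and firstTickCount are hypothetical, and the 5-second fallback is an assumed value, since the text does not state what sqsDepthObserveInterval is.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// defaultInterval is an assumed fallback for a non-positive interval.
const defaultInterval = 5 * time.Second

// start runs observe once before returning, then keeps calling it on a
// ticker in the background until ctx is canceled.
func start(ctx context.Context, interval time.Duration, observe func()) {
	if interval <= 0 {
		interval = defaultInterval
	}
	observe() // first observation is synchronous
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				observe()
			}
		}
	}()
}

// firstTickCount shows that exactly one observation has happened by the
// time start returns, independent of the ticker.
func firstTickCount() int64 {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the background loop exits as soon as it is scheduled
	var n atomic.Int64
	start(ctx, time.Hour, func() { n.Add(1) })
	return n.Load()
}

func main() {
	fmt.Println(firstTickCount())
}
```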
type SQSPartitionObserver ¶
type SQSPartitionObserver interface {
// ObservePartitionMessage increments the
// sqs_partition_messages_total counter for one operation on
// one (queue, partition) pair. Action must be one of
// SQSPartitionActionSend / Receive / Delete; any other value
// is silently dropped so a typo at a future call site cannot
// crash the process.
ObservePartitionMessage(queue string, partition uint32, action string)
}
SQSPartitionObserver records per-(queue, partition, action) counters for HT-FIFO operations. The interface is small so adapter call sites can pass a no-op observer in tests without pulling in the full Prometheus registry.
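Because the interface has a single method, a test double is a few lines. The sketch below re-declares an equivalent interface locally (partitionObserver) alongside a no-op and a counting implementation. These names are hypothetical, and the action strings are the values of the SQSPartitionAction* constants.

```go
package main

import "fmt"

// partitionObserver has the same method set as SQSPartitionObserver.
type partitionObserver interface {
	ObservePartitionMessage(queue string, partition uint32, action string)
}

// noopObserver satisfies the interface and records nothing.
type noopObserver struct{}

func (noopObserver) ObservePartitionMessage(string, uint32, string) {}

// countingObserver counts valid operations and drops unknown actions.
type countingObserver struct {
	counts map[string]int
}

func newCountingObserver() *countingObserver {
	return &countingObserver{counts: map[string]int{}}
}

func (c *countingObserver) ObservePartitionMessage(queue string, partition uint32, action string) {
	switch action {
	case "send", "receive", "delete":
	default:
		return // unknown action: silently dropped
	}
	c.counts[fmt.Sprintf("%s/%d/%s", queue, partition, action)]++
}

// record drives any observer, as an adapter call site would.
func record(o partitionObserver, queue string, partition uint32, action string) {
	o.ObservePartitionMessage(queue, partition, action)
}

func main() {
	c := newCountingObserver()
	record(c, "orders.fifo", 3, "send")
	record(c, "orders.fifo", 3, "send")
	record(c, "orders.fifo", 3, "sned") // typo: ignored
	record(noopObserver{}, "orders.fifo", 3, "send")
	fmt.Println(c.counts["orders.fifo/3/send"], len(c.counts))
}
```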
type SQSQueueDepth ¶
SQSQueueDepth is one queue's depth-attribute snapshot. Mirrors adapter.SQSQueueDepth byte-for-byte and is re-declared here to keep the monitoring package free of an adapter import. A drift between the two definitions surfaces as a compile error at the SQSObserver call site.
type WriteConflictCollector ¶
type WriteConflictCollector struct {
// contains filtered or unexported fields
}
WriteConflictCollector polls each registered store on a fixed interval and mirrors the snapshot into the Prometheus counter vector. Store-side counts are monotonic for the lifetime of a single store instance; counters advance by the positive delta against the last snapshot so a store reopen (Restore swap) does not produce negative values.
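Advancing a counter by the positive delta only can be sketched as follows. The type deltaMirror and its methods are hypothetical. Re-baselining to the lower value after a reset is an assumption about how the last snapshot is updated, made so that growth after a reopen is counted from the new starting point.

```go
package main

import "fmt"

// deltaMirror turns successive raw snapshots into monotonic totals.
type deltaMirror struct {
	last  map[string]uint64 // previous raw reading per key
	total map[string]uint64 // exported monotonic value per key
}

func newDeltaMirror() *deltaMirror {
	return &deltaMirror{last: map[string]uint64{}, total: map[string]uint64{}}
}

// observe folds one snapshot into the totals. A reading lower than the
// previous one (store reopened) adds nothing and becomes the new baseline.
func (m *deltaMirror) observe(snapshot map[string]uint64) {
	for key, cur := range snapshot {
		if prev := m.last[key]; cur > prev {
			m.total[key] += cur - prev
		}
		m.last[key] = cur
	}
}

func main() {
	m := newDeltaMirror()
	for _, raw := range []uint64{5, 8, 2, 4} { // 8 -> 2 is a store reopen
		m.observe(map[string]uint64{"k": raw})
		fmt.Println(raw, m.total["k"])
	}
}
```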
func (*WriteConflictCollector) ObserveOnce ¶
func (c *WriteConflictCollector) ObserveOnce(sources []WriteConflictSource)
ObserveOnce is exposed for tests and single-shot callers.
func (*WriteConflictCollector) Start ¶
func (c *WriteConflictCollector) Start(ctx context.Context, sources []WriteConflictSource, interval time.Duration)
Start polls sources on the given interval until ctx is canceled. Passing interval <= 0 uses defaultWriteConflictPollInterval (5 s), matching the DispatchCollector / PebbleCollector cadence.
type WriteConflictCounterSource ¶
WriteConflictCounterSource abstracts per-group access to the MVCC store's OCC conflict counters. The concrete store implementations (pebbleStore, mvccStore, ShardStore, LeaderRoutedStore) satisfy this via WriteConflictCountsByPrefix(); keys in the returned map follow the "<kind>|<key_prefix>" encoding from the store package.
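A collector that turns the "<kind>|<key_prefix>" keys into separate labels has to split them somewhere. One plausible way is sketched below, assuming the first "|" is the separator and that the prefix itself may contain further "|" characters. The function name splitConflictKey and the sample keys are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitConflictKey splits "<kind>|<key_prefix>" at the first "|".
// ok is false when the key carries no separator at all.
func splitConflictKey(key string) (kind, prefix string, ok bool) {
	return strings.Cut(key, "|")
}

func main() {
	for _, key := range []string{"write|user/", "txn|a|b", "malformed"} {
		kind, prefix, ok := splitConflictKey(key)
		fmt.Printf("%q -> kind=%q prefix=%q ok=%v\n", key, kind, prefix, ok)
	}
}
```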
type WriteConflictMetrics ¶
type WriteConflictMetrics struct {
// contains filtered or unexported fields
}
WriteConflictMetrics owns the per-(group, kind, key_prefix) counter vector used by the write-conflict dashboard. Registered once per Registry.
type WriteConflictSource ¶
type WriteConflictSource struct {
GroupID uint64
GroupIDStr string
Source WriteConflictCounterSource
}
WriteConflictSource binds a raft group ID to a counter source. Multiple groups can be polled by a single collector on a sharded node. GroupIDStr is the pre-formatted decimal form of GroupID used as the "group" Prometheus label; pre-computing it avoids a per-tick strconv allocation.