monitoring

package v0.0.0-...-07ff07d

Warning: this package is not in the latest version of its module.

Published: May 8, 2026 · License: AGPL-3.0 · Imports: 19 · Imported by: 0

Documentation

Index

Constants

const (
	LuaFastPathOutcomeHit            = "hit"
	LuaFastPathOutcomeSkipLoaded     = "skip_loaded"
	LuaFastPathOutcomeSkipCachedType = "skip_cached_type"
)

LuaFastPathOutcome labels tag each Lua-side read fast-path decision so operators can see how often a given command (ZRANGEBYSCORE, ZSCORE, HGET, etc.) actually takes the fast path versus falling back.

const (
	SQSPartitionActionSend    = "send"
	SQSPartitionActionReceive = "receive"
	SQSPartitionActionDelete  = "delete"
)

SQS HT-FIFO partition action labels. The string set is stable so dashboards and alerts can rely on the values not changing.

Variables

This section is empty.

Functions

func AddressRequiresToken

func AddressRequiresToken(addr string) bool

AddressRequiresToken reports whether the address is exposed beyond loopback and therefore requires bearer-token protection.

func MetricsServeTask

func MetricsServeTask(server *http.Server, listener net.Listener, address string) func() error

MetricsServeTask returns an errgroup task that serves the metrics endpoint until shutdown.

func MetricsShutdownTask

func MetricsShutdownTask(ctx context.Context, server *http.Server, address string) func() error

MetricsShutdownTask returns an errgroup task that stops the metrics server on context cancellation.

func NewMetricsServer

func NewMetricsServer(handler http.Handler, bearerToken string) *http.Server

NewMetricsServer constructs an HTTP server for the protected metrics handler.
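
A minimal wiring sketch combining AddressRequiresToken, NewMetricsServer, and the serve/shutdown tasks, assuming a *Registry named reg (see NewRegistry below) and an errgroup-based main loop; the address and the environment variable name are illustrative:

addr := "0.0.0.0:9100"
token := os.Getenv("ELASTICKV_METRICS_TOKEN") // illustrative variable name

// Non-loopback exposure without a token is a configuration error.
if monitoring.AddressRequiresToken(addr) && token == "" {
	return fmt.Errorf("metrics address %q is reachable beyond loopback; set a bearer token", addr)
}

listener, err := net.Listen("tcp", addr)
if err != nil {
	return err
}

server := monitoring.NewMetricsServer(reg.Handler(), token)
g, ctx := errgroup.WithContext(ctx)
g.Go(monitoring.MetricsServeTask(server, listener, addr))
g.Go(monitoring.MetricsShutdownTask(ctx, server, addr))
return g.Wait()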

func NewPprofHandler

func NewPprofHandler() http.Handler

NewPprofHandler returns an http.Handler that serves the Go runtime profiling endpoints under /debug/pprof/.

func NewPprofServer

func NewPprofServer(bearerToken string) *http.Server

NewPprofServer creates an HTTP server for the pprof debug endpoints, optionally protected by bearer-token authentication.

func PprofServeTask

func PprofServeTask(server *http.Server, listener net.Listener, address string) func() error

PprofServeTask returns an errgroup task that serves the pprof endpoint until shutdown.

func PprofShutdownTask

func PprofShutdownTask(ctx context.Context, server *http.Server, address string) func() error

PprofShutdownTask returns an errgroup task that stops the pprof server on context cancellation.
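
The pprof endpoints wire up the same way. A sketch continuing the errgroup example above, bound to loopback so the empty bearer token is acceptable per AddressRequiresToken:

pprofAddr := "127.0.0.1:6060"
pprofListener, err := net.Listen("tcp", pprofAddr)
if err != nil {
	return err
}
pprofServer := monitoring.NewPprofServer("") // loopback address, so no token is required
g.Go(monitoring.PprofServeTask(pprofServer, pprofListener, pprofAddr))
g.Go(monitoring.PprofShutdownTask(ctx, pprofServer, pprofAddr))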

func ProtectHandler

func ProtectHandler(handler http.Handler, bearerToken string) http.Handler

ProtectHandler wraps a metrics handler with optional bearer-token authentication.

Types

type DispatchCollector

type DispatchCollector struct {
	// contains filtered or unexported fields
}

DispatchCollector polls the etcd raft Engine's atomic dispatch counters on a fixed interval and mirrors them into monotonic Prometheus counters. We poll rather than calling Add() inline in the raft path because those code paths are already hot and must not incur an additional interface call; the counters are atomic.Uint64 in the engine, and polling is cheap (O(groups) reads every 5 s).

func (*DispatchCollector) ObserveOnce

func (c *DispatchCollector) ObserveOnce(sources []DispatchSource)

ObserveOnce is exposed for tests and single-shot callers.

func (*DispatchCollector) Start

func (c *DispatchCollector) Start(ctx context.Context, sources []DispatchSource, interval time.Duration)

Start polls sources on the given interval until ctx is canceled. Passing interval <= 0 uses defaultDispatchPollInterval (5 s), which matches the cadence of RaftObserver so operators see consistent refresh rates across dashboards.
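
A wiring sketch, assuming reg is the node's *Registry and engine satisfies DispatchCounterSource (the concrete etcd Engine does):

collector := reg.DispatchCollector()
sources := []monitoring.DispatchSource{{GroupID: 1, Source: engine}}
go collector.Start(ctx, sources, 0) // interval <= 0 falls back to the 5 s default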

type DispatchCounterSource

type DispatchCounterSource interface {
	DispatchDropCount() uint64
	DispatchErrorCount() uint64
	StepQueueFullCount() uint64
	// DispatchErrorCountsByCode returns a snapshot of dispatch error
	// counts keyed by grpc status code ("Unavailable",
	// "DeadlineExceeded", ...). Sum of values equals
	// DispatchErrorCount(). Implementations that do not break out by
	// code may return an empty map.
	DispatchErrorCountsByCode() map[string]uint64
}

DispatchCounterSource abstracts the etcd raft Engine's monotonic dispatch counters so monitoring can scrape them without importing the etcd package. The concrete etcd Engine satisfies this interface via its DispatchDropCount / DispatchErrorCount / StepQueueFullCount accessors.
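
A test-stub sketch that satisfies the interface; the type and field names are illustrative, not part of the package:

type stubDispatchSource struct {
	drops, errs, queueFull uint64
	errsByCode             map[string]uint64
}

func (s stubDispatchSource) DispatchDropCount() uint64  { return s.drops }
func (s stubDispatchSource) DispatchErrorCount() uint64 { return s.errs }
func (s stubDispatchSource) StepQueueFullCount() uint64 { return s.queueFull }
func (s stubDispatchSource) DispatchErrorCountsByCode() map[string]uint64 {
	return s.errsByCode // may be nil; an empty map is allowed by the contract
}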

type DispatchSource

type DispatchSource struct {
	GroupID uint64
	Source  DispatchCounterSource
}

DispatchSource binds a raft group ID to its counter source. Multiple groups can be polled by a single collector on a sharded node.

type DynamoDBMetrics

type DynamoDBMetrics struct {
	// contains filtered or unexported fields
}

func (*DynamoDBMetrics) ObserveDynamoDBRequest

func (m *DynamoDBMetrics) ObserveDynamoDBRequest(report DynamoDBRequestReport)

ObserveDynamoDBRequest records the final outcome of a request.

func (*DynamoDBMetrics) ObserveInFlightChange

func (m *DynamoDBMetrics) ObserveInFlightChange(operation string, delta float64)

ObserveInFlightChange adjusts the in-flight request gauge for an operation.

type DynamoDBRequestObserver

type DynamoDBRequestObserver interface {
	ObserveInFlightChange(operation string, delta float64)
	ObserveDynamoDBRequest(report DynamoDBRequestReport)
}

DynamoDBRequestObserver records per-request DynamoDB-compatible API metrics.

type DynamoDBRequestReport

type DynamoDBRequestReport struct {
	Operation     string
	HTTPStatus    int
	ErrorType     string
	Duration      time.Duration
	RequestBytes  int
	ResponseBytes int
	Tables        []string
	TableMetrics  map[string]DynamoDBTableMetrics
}

DynamoDBRequestReport is the normalized result of a single request.

type DynamoDBTableMetrics

type DynamoDBTableMetrics struct {
	ReturnedItems int
	ScannedItems  int
	WrittenItems  int
}

DynamoDBTableMetrics captures table-scoped work done by a request.
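
An instrumentation sketch for one request; obs comes from Registry.DynamoDBObserver(), and the operation, byte counts, and table name are illustrative:

obs.ObserveInFlightChange("GetItem", 1)
start := time.Now()
// ... execute the request ...
obs.ObserveInFlightChange("GetItem", -1)
obs.ObserveDynamoDBRequest(monitoring.DynamoDBRequestReport{
	Operation:     "GetItem",
	HTTPStatus:    200,
	Duration:      time.Since(start),
	RequestBytes:  128,
	ResponseBytes: 512,
	Tables:        []string{"orders"},
	TableMetrics: map[string]monitoring.DynamoDBTableMetrics{
		"orders": {ReturnedItems: 1, ScannedItems: 1},
	},
})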

type HotPathMetrics

type HotPathMetrics struct {
	// contains filtered or unexported fields
}

HotPathMetrics owns the Prometheus vectors introduced for the Redis GET hot-path dashboard. Kept in its own type so the Registry can hold a single instance and hand out scoped observer/collector objects.

type LeaseReadObserver

type LeaseReadObserver struct {
	// contains filtered or unexported fields
}

LeaseReadObserver implements kv.LeaseReadObserver by incrementing the elastickv_lease_read_total counter vector. Callers grab an instance via Registry.LeaseReadObserver(); the zero value is safe and silently drops samples, so tests can pass LeaseReadObserver{} as a stub.

func (LeaseReadObserver) ObserveLeaseRead

func (o LeaseReadObserver) ObserveLeaseRead(hit bool)

ObserveLeaseRead records a single lease-read outcome.

type LuaFastPathCmd

type LuaFastPathCmd struct {
	// contains filtered or unexported fields
}

LuaFastPathCmd is a pre-resolved bundle of fast-path outcome counters for a single Lua command. Construct once via LuaFastPathObserver.ForCommand(cmd) at server startup; call the Observe* methods per redis.call(). Safe to copy.

func (LuaFastPathCmd) ObserveFallback

func (c LuaFastPathCmd) ObserveFallback(reason LuaFastPathFallbackReason)

ObserveFallback routes to the counter for the given reason. Use the LuaFastPathFallback* constants; unknown values (including the zero value LuaFastPathFallbackNone, which should never reach here) land on the "other" bucket so cardinality stays bounded.

func (LuaFastPathCmd) ObserveHit

func (c LuaFastPathCmd) ObserveHit()

Observe* methods record one outcome. Each is a single atomic increment when the counter is wired; a no-op on the zero value.

func (LuaFastPathCmd) ObserveSkipCachedType

func (c LuaFastPathCmd) ObserveSkipCachedType()

func (LuaFastPathCmd) ObserveSkipLoaded

func (c LuaFastPathCmd) ObserveSkipLoaded()

type LuaFastPathFallbackReason

type LuaFastPathFallbackReason string

LuaFastPathFallbackReason is the typed label for server-side hit=false branches. A dedicated type (instead of a bare string) lets Go catch typos in call sites — producers in adapter/ and the switch in ObserveFallback stay in lockstep with the constants below, and the Prometheus label value is derived directly from the underlying string via string(reason).

const (
	LuaFastPathFallbackIneligible  LuaFastPathFallbackReason = "fallback_ineligible"
	LuaFastPathFallbackMissingKey  LuaFastPathFallbackReason = "fallback_missing_key"
	LuaFastPathFallbackWrongType   LuaFastPathFallbackReason = "fallback_wrong_type"
	LuaFastPathFallbackTruncated   LuaFastPathFallbackReason = "fallback_truncated"
	LuaFastPathFallbackLargeOffset LuaFastPathFallbackReason = "fallback_large_offset"
	LuaFastPathFallbackOther       LuaFastPathFallbackReason = "fallback_other"
	// LuaFastPathFallbackNone is the sentinel returned alongside
	// hit=true; callers must not record it.
	LuaFastPathFallbackNone LuaFastPathFallbackReason = ""
)

Known fallback reasons. Subdivides the generic "fallback" outcome so operators can tell WHY the fast path gave up and route the fix accordingly (eligibility check, truncation, empty-key short-circuit, etc.).

type LuaFastPathObserver

type LuaFastPathObserver struct {
	// contains filtered or unexported fields
}

LuaFastPathObserver records fast-path outcomes for redis.call() inside Lua scripts. The zero value is safe and silently drops samples so tests can pass LuaFastPathObserver{} as a stub.

Hot-path shape: each Observe* call on a LuaFastPathCmd handle is a single non-blocking atomic increment on a pre-resolved prometheus.Counter (client_golang's default Counter uses sync/atomic internally). Callers resolve one LuaFastPathCmd per command at server construction to avoid CounterVec.WithLabelValues (mutex-guarded map lookup) on the hot path.

func (LuaFastPathObserver) ForCommand

func (o LuaFastPathObserver) ForCommand(cmd string) LuaFastPathCmd

ForCommand pre-resolves the counter handles for cmd. Returns a zero-value LuaFastPathCmd when the observer is empty (tests), which silently drops all Observe* calls.
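
A usage sketch: resolve the handle once at server construction, then record on each redis.call(). tryHGETFastPath and its return values are hypothetical placeholders for the adapter's fast-path check:

hgetFastPath := reg.LuaFastPathObserver().ForCommand("HGET")

// inside the redis.call("HGET", ...) handler:
value, reason, hit := tryHGETFastPath(key, field) // hypothetical fast-path helper
if hit {
	hgetFastPath.ObserveHit()
} else {
	hgetFastPath.ObserveFallback(reason) // reason is a LuaFastPathFallbackReason
}
_ = value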

type LuaMetrics

type LuaMetrics struct {
	// contains filtered or unexported fields
}

LuaMetrics holds Prometheus metrics that break down where time goes inside a Lua script execution so that Lua VM slowness, Raft latency, and write conflict retries can be distinguished in dashboards.

func (*LuaMetrics) ObserveLuaScript

func (m *LuaMetrics) ObserveLuaScript(report LuaScriptReport)

ObserveLuaScript records a completed script invocation.

type LuaScriptObserver

type LuaScriptObserver interface {
	ObserveLuaScript(report LuaScriptReport)
}

LuaScriptObserver is implemented by anything that wants to record Lua script execution metrics.

type LuaScriptReport

type LuaScriptReport struct {
	// LuaExecDuration is the cumulative time spent inside state.PCall across
	// all retry attempts. This includes RedisCallDuration.
	LuaExecDuration time.Duration
	// RedisCallDuration is the cumulative time spent inside redis.call() /
	// redis.pcall() handlers (scriptCtx.exec) — Raft reads and in-memory
	// state updates. Pure VM time = LuaExecDuration - RedisCallDuration.
	RedisCallDuration time.Duration
	// RedisCallCount is the total number of redis.call()/pcall() invocations
	// across all retry attempts. High counts explain high RedisCallDuration.
	RedisCallCount int
	// CommitDuration is the cumulative time spent in scriptCtx.commit()
	// (coordinator.Dispatch → Raft consensus + storage write).
	CommitDuration time.Duration
	// ConflictRetries is the number of extra attempts caused by write
	// conflicts or locked transactions (0 = succeeded on the first try).
	ConflictRetries int
	// IsError is true when the final outcome was an error.
	IsError bool
}

LuaScriptReport summarises one Lua script invocation (EVAL / EVALSHA), covering all retryRedisWrite attempts.
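
A reporting sketch; the duration and count variables are assumed to have been accumulated by the caller across all retry attempts:

reg.LuaObserver().ObserveLuaScript(monitoring.LuaScriptReport{
	LuaExecDuration:   totalPCall,      // includes RedisCallDuration
	RedisCallDuration: totalRedisCalls, // pure VM time = LuaExecDuration - RedisCallDuration
	RedisCallCount:    redisCallCount,
	CommitDuration:    totalCommit,
	ConflictRetries:   retries,
	IsError:           scriptErr != nil,
})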

type PebbleCacheCapacitySource

type PebbleCacheCapacitySource interface {
	BlockCacheCapacityBytes() int64
}

PebbleCacheCapacitySource is an optional companion to PebbleMetricsSource: sources that expose the configured block-cache capacity (in bytes) are queried by the collector to populate elastickv_pebble_block_cache_capacity_bytes. Implementations return 0 to indicate "not known / store closed"; the collector then leaves the gauge at its last observed value for that tick. The concrete *store pebbleStore satisfies this via BlockCacheCapacityBytes(); tests can omit it.

type PebbleCollector

type PebbleCollector struct {
	// contains filtered or unexported fields
}

PebbleCollector polls each registered Pebble store on a fixed interval and mirrors the snapshot into the Prometheus vectors. Gauges are overwritten; counters advance by the positive delta against the previous snapshot.

func (*PebbleCollector) ObserveOnce

func (c *PebbleCollector) ObserveOnce(sources []PebbleSource)

ObserveOnce is exposed for tests and single-shot callers.

func (*PebbleCollector) Start

func (c *PebbleCollector) Start(ctx context.Context, sources []PebbleSource, interval time.Duration)

Start begins polling sources on interval until ctx is canceled. Passing interval <= 0 uses defaultPebblePollInterval (5 s), matching the DispatchCollector cadence so operators see consistent refresh rates across dashboards. Pebble.Metrics() acquires internal mutexes but is not expensive; 5 s gives ample headroom.

type PebbleMetrics

type PebbleMetrics struct {
	// contains filtered or unexported fields
}

PebbleMetrics owns the Prometheus vectors for Pebble LSM internals. One instance per registry; shared by all groups (labelled by group ID + level where relevant).

func (*PebbleMetrics) SetFSMApplySyncMode

func (m *PebbleMetrics) SetFSMApplySyncMode(activeLabel string)

SetFSMApplySyncMode records which ELASTICKV_FSM_SYNC_MODE is active. activeLabel must be "sync" or "nosync"; any other value is coerced to "sync" to match the store resolver's fallback behaviour for unknown ELASTICKV_FSM_SYNC_MODE values (see store.resolveFSMApplyWriteOpts). This keeps the gauge's two-row shape stable: exactly one of {"sync","nosync"} is 1 at any time and the other is 0.

Call this once at startup after the store package has resolved the env var. Invoking again is safe and idempotent: the new label goes to 1 and the other known label goes to 0.

type PebbleMetricsSource

type PebbleMetricsSource interface {
	Metrics() *pebble.Metrics
}

PebbleMetricsSource abstracts the per-group access to a Pebble DB's Metrics(). The concrete *store pebbleStore satisfies this via its Metrics() accessor. Returning nil (e.g. store closed mid-restore) is allowed; the collector will skip that group for the tick.

type PebbleSource

type PebbleSource struct {
	GroupID    uint64
	GroupIDStr string
	Source     PebbleMetricsSource
}

PebbleSource binds a raft group ID to its Pebble store. Multiple groups can be polled by a single collector on a sharded node. GroupIDStr is the pre-formatted decimal form of GroupID used as the "group" Prometheus label; pre-computing it avoids a per-tick strconv.FormatUint allocation in observeOnce.
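
A wiring sketch combining PebbleSource construction with PebbleCollector.Start, assuming stores maps group IDs to values that satisfy PebbleMetricsSource:

var sources []monitoring.PebbleSource
for groupID, store := range stores {
	sources = append(sources, monitoring.PebbleSource{
		GroupID:    groupID,
		GroupIDStr: strconv.FormatUint(groupID, 10), // formatted once, not per tick
		Source:     store,
	})
}
go reg.PebbleCollector().Start(ctx, sources, 0) // interval <= 0 uses the 5 s default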

type RaftMetrics

type RaftMetrics struct {
	// contains filtered or unexported fields
}

RaftMetrics holds all Prometheus gauge vectors for a single Raft group. Fields are populated by observeRuntime from raftengine status snapshots.

type RaftObserver

type RaftObserver struct {
	// contains filtered or unexported fields
}

func (*RaftObserver) ObserveOnce

func (o *RaftObserver) ObserveOnce(runtimes []RaftRuntime)

ObserveOnce captures the latest raft state for all configured runtimes.

func (*RaftObserver) Start

func (o *RaftObserver) Start(ctx context.Context, runtimes []RaftRuntime, interval time.Duration)

Start polls raft state and configuration on a fixed interval until ctx is canceled.

type RaftRuntime

type RaftRuntime struct {
	GroupID      uint64
	StatusReader raftengine.StatusReader
	ConfigReader raftengine.ConfigReader
}

RaftRuntime describes a raft group observed by the metrics exporter.

type RedisMetrics

type RedisMetrics struct {
	// contains filtered or unexported fields
}

RedisMetrics holds all Prometheus metric vectors for the Redis adapter.

func (*RedisMetrics) ObserveRedisRequest

func (m *RedisMetrics) ObserveRedisRequest(report RedisRequestReport)

ObserveRedisRequest records the final outcome of a Redis command.

type RedisRequestObserver

type RedisRequestObserver interface {
	ObserveRedisRequest(report RedisRequestReport)
}

RedisRequestObserver records per-command Redis API metrics.

type RedisRequestReport

type RedisRequestReport struct {
	Command  string
	IsError  bool
	Duration time.Duration
	// Unsupported indicates the command was rejected because the adapter
	// has no route for it. When true, ObserveRedisRequest additionally
	// records the real (bounded) command name in
	// elastickv_redis_unsupported_commands_total alongside the existing
	// "unknown"-bucketed counters, which are preserved unchanged.
	Unsupported bool
}

RedisRequestReport is the normalized result of a single Redis command.
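
A recording sketch for one command; the command name and the error variable are illustrative:

start := time.Now()
// ... dispatch the command ...
reg.RedisObserver().ObserveRedisRequest(monitoring.RedisRequestReport{
	Command:     "GET",
	IsError:     cmdErr != nil, // cmdErr from the command handler (assumed)
	Duration:    time.Since(start),
	Unsupported: false,
})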

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry owns the Prometheus registry used by a single Elastickv node.

func NewRegistry

func NewRegistry(nodeID string, nodeAddress string) *Registry

NewRegistry builds a registry with constant labels that identify the local node.

func (*Registry) DispatchCollector

func (r *Registry) DispatchCollector() *DispatchCollector

DispatchCollector returns a collector that polls the etcd raft engine's dispatch counters and exports them to Prometheus. Start it with the node's raft sources after engine Open() completes.

func (*Registry) DynamoDBObserver

func (r *Registry) DynamoDBObserver() DynamoDBRequestObserver

DynamoDBObserver returns the DynamoDB request observer backed by this registry.

func (*Registry) Gatherer

func (r *Registry) Gatherer() prometheus.Gatherer

Gatherer exposes the underlying gatherer for tests and custom exporters.

func (*Registry) Handler

func (r *Registry) Handler() http.Handler

Handler returns an HTTP handler that exposes the Prometheus scrape endpoint.

func (*Registry) LeaseReadObserver

func (r *Registry) LeaseReadObserver() LeaseReadObserver

LeaseReadObserver returns an observer for the kv coordinator's LeaseRead fast-path counter. Returns a zero-value observer when the registry is nil so callers can pass the result through without checking; the zero value silently drops samples.

func (*Registry) LuaFastPathObserver

func (r *Registry) LuaFastPathObserver() LuaFastPathObserver

LuaFastPathObserver returns an observer for Lua-side redis.call() fast-path outcomes (hit / skip / fallback per command). Zero-value safe for tests and tools that do not wire a registry.

func (*Registry) LuaObserver

func (r *Registry) LuaObserver() LuaScriptObserver

LuaObserver returns the Lua script execution observer backed by this registry.

func (*Registry) PebbleCollector

func (r *Registry) PebbleCollector() *PebbleCollector

PebbleCollector returns a collector that polls each Pebble store's Metrics() snapshot and mirrors the operationally useful fields (L0 sublevels, compaction debt, memtable, block cache) into Prometheus. Start it with the node's Pebble sources after the stores have been opened.

func (*Registry) RaftObserver

func (r *Registry) RaftObserver() *RaftObserver

RaftObserver returns the Raft topology observer backed by this registry.

func (*Registry) RaftProposalObserver

func (r *Registry) RaftProposalObserver(groupID uint64) *raftProposalObserver

RaftProposalObserver returns a group-scoped observer for failed raft proposals.

func (*Registry) RedisObserver

func (r *Registry) RedisObserver() RedisRequestObserver

RedisObserver returns the Redis request observer backed by this registry.

func (*Registry) SQSObserver

func (r *Registry) SQSObserver() *SQSObserver

SQSObserver returns the queue-depth gauge observer backed by this registry. Same shape as RaftObserver / RedisObserver: callers pull it via the registry, then drive Start(ctx, source, interval) from main.go's startMonitoringCollectors.

func (*Registry) SQSPartitionObserver

func (r *Registry) SQSPartitionObserver() SQSPartitionObserver

SQSPartitionObserver returns the HT-FIFO partition-messages observer backed by this registry. Returns nil when the registry itself is nil so adapter call sites can pass the result through without checking; SQSMetrics.ObservePartitionMessage is also nil-receiver safe.

func (*Registry) SetFSMApplySyncMode

func (r *Registry) SetFSMApplySyncMode(activeLabel string)

SetFSMApplySyncMode forwards the resolved ELASTICKV_FSM_SYNC_MODE label to the PebbleMetrics gauge so operators can observe the active durability posture on this node. Safe to call with a nil registry.

func (*Registry) WriteConflictCollector

func (r *Registry) WriteConflictCollector() *WriteConflictCollector

WriteConflictCollector returns a collector that polls each MVCC store's per-(kind, key_prefix) OCC conflict counters and mirrors them into the elastickv_store_write_conflict_total Prometheus counter vector. Start it with the node's MVCC sources after the stores have been opened.

type SQSDepthSource

type SQSDepthSource interface {
	SnapshotQueueDepths(ctx context.Context) (snaps []SQSQueueDepth, ok bool)
}

SQSDepthSource is the contract a per-tick queue-depth source must satisfy. Implemented by *adapter.SQSServer; SQSObserver.Start calls SnapshotQueueDepths on every interval and writes the returned slice to the elastickv_sqs_queue_messages gauges.

Mirrors the Raft observer's StatusReader / ConfigReader pattern (monitoring/raft.go): the source returns ready-to-use snapshots and the observer owns the gauge state machine (forget-on-disappear, cardinality cap).

Two distinct empty-snapshot states, signalled by ok:

  • ok=true with an empty/nil slice — "this source legitimately has no queues this tick". Triggers when the node is a follower (leader-only emission) or the leader genuinely has zero queues configured. The observer diffs against the previous tick and calls ForgetQueue for any queue that disappeared, so a former leader's gauges are cleared on step-down.

  • ok=false (regardless of the slice contents) — "the source could not produce a snapshot this tick" (transient catalog-read failure on the leader, context canceled mid-scan). The observer must skip the diff entirely: leave existing gauges in place AND leave lastSeen untouched so the next successful tick can still diff against the previous good state. Without this branch a single failed scrape would wipe every depth gauge and produce a false "all queues drained" event on the dashboard until the next successful tick (a stub sketch illustrating both states follows this list).
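
A test-stub sketch that exercises both states; the type and field names are illustrative:

type stubDepthSource struct {
	snaps []monitoring.SQSQueueDepth
	ok    bool
}

// ok=false models a failed scan: the observer leaves gauges and lastSeen
// untouched. ok=true with an empty slice models a follower (or a leader
// with zero queues): the observer diffs and forgets vanished queues.
func (s stubDepthSource) SnapshotQueueDepths(ctx context.Context) ([]monitoring.SQSQueueDepth, bool) {
	return s.snaps, s.ok
}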

type SQSMetrics

type SQSMetrics struct {
	// contains filtered or unexported fields
}

SQSMetrics owns the Prometheus collectors for the SQS adapter. Mirrors DynamoDBMetrics' shape: per-Registry instance, label-cardinality-bounded by sqsMaxTrackedQueues, and split between counters (HT-FIFO partition activity) and gauges (queue depth).

The cardinality budget is split into two independent maps because the two metrics have different deletion semantics:

  • partitionMessages is a CounterVec. Counters are cumulative; deleting a series throws away its observed-since-process-start value, so we never call DeleteLabelValues on it. The counter budget therefore only ever grows — once a queue is admitted to trackedCounterQueues it stays admitted, and ForgetQueue does NOT touch this map.
  • queueDepth is a GaugeVec. Gauges have no cumulative state, so DeleteLabelValues is safe (and necessary, otherwise a deleted queue keeps reporting a frozen backlog on the dashboard). ForgetQueue both removes the gauge series and frees the queue's slot in trackedDepthQueues so a churn-heavy deployment can reuse the budget.

Sharing one map across the two metrics regresses the counter cap: ForgetQueue would free a slot, a new queue would be admitted, and the previous queue's counter series would still occupy a real-name label in Prometheus — letting cardinality grow without bound under queue churn (or after a leader step-down clears the observer's lastSeen). This was the P1 finding on PR #743.

func (*SQSMetrics) ForgetQueue

func (m *SQSMetrics) ForgetQueue(queue string)

ForgetQueue drops the three gauge series for a queue and frees its slot in the depth-side cardinality budget so a long-running deployment that regularly creates and deletes queues (CI workloads, ephemeral per-job queues) doesn't permanently wedge the 512-entry depth budget.

Three cases, by membership at call time:

  1. Queue is in trackedDepthQueues (admitted under its real name): drop the three state-labelled series and free the budget slot.
  2. Queue is in overflowDepthQueues (admitted, then collapsed onto the shared sqsQueueOverflow / "_other" label because the budget was saturated): remove from the overflow set. If that leaves the overflow set empty, drop the three _other series too — there's no remaining queue reporting into them, so the gauge would otherwise pin phantom backlog. While the overflow set is still non-empty the _other series stays put: tearing it down would zero out values that other overflow queues are legitimately maintaining.
  3. Queue is in neither map (never depth-observed): no-op.

The (queue, partition, action) counter series stays in all three cases — cumulative-by-design — and the queue's slot in trackedCounterQueues stays consumed. Reclaiming the counter slot would let a later queue be admitted under its real name while the original counter series still sat in Prometheus, which would let counter cardinality grow past sqsMaxTrackedQueues under churn or after a leader step-down clears the observer's lastSeen (the P1 finding on PR #743). Counter-side _other has no equivalent of the gauge phantom-backlog problem either, because cumulative counters legitimately reflect total operations from queues that have since been deleted.

Caller-audit per the standing semantic-change rule: only SQSObserver.observeOnce calls this (registry plumbing aside), and it's invoked exactly when a queue is observed in the previous tick but not the current one. The caller's contract — "drop gauges for a queue that disappeared so dashboards don't show frozen backlog" — is preserved AND extended consistently: overflow queues, previously a silent no-op, now also stop pinning the shared gauge once the last one disappears. The narrowed-but-still-correct scope (counter budget never reclaimed) is invisible to observeOnce because the observer only writes gauges; counters are fed via ObservePartitionMessage from a different code path entirely.

func (*SQSMetrics) ObservePartitionMessage

func (m *SQSMetrics) ObservePartitionMessage(queue string, partition uint32, action string)

ObservePartitionMessage implements SQSPartitionObserver. The (queue, action) pair is validated and (queue) is collapsed to the overflow label past sqsMaxTrackedQueues distinct names.

func (*SQSMetrics) ObserveQueueDepth

func (m *SQSMetrics) ObserveQueueDepth(queue string, visible, notVisible, delayed int64)

ObserveQueueDepth implements SQSDepthObserver. Updates the three state-labelled gauges for queue. Negative values are clamped to 0 so a transient scan failure (returning -1 sentinel from a future caller) cannot blast a fake backlog onto the dashboard.

type SQSObserver

type SQSObserver struct {
	// contains filtered or unexported fields
}

SQSObserver polls a SQSDepthSource on a fixed cadence and writes the result into the SQSMetrics gauge. Same shape as RaftObserver (monitoring/raft.go): the observer owns the state machine (current-vs-previous queue diff for ForgetQueue) and the source just returns ready-to-use snapshots. The observer is nil-tolerant at every entrypoint so test fixtures and metrics-disabled deployments can no-op without a defensive nil check.

func (*SQSObserver) ObserveOnce

func (o *SQSObserver) ObserveOnce(source SQSDepthSource)

ObserveOnce captures the latest depth snapshot synchronously. Mirrors RaftObserver.ObserveOnce; intended for tests that want deterministic single-tick behaviour without spinning up a ticker.

func (*SQSObserver) Start

func (o *SQSObserver) Start(ctx context.Context, source SQSDepthSource, interval time.Duration)

Start kicks off a background ticker that polls source every interval (defaulting to sqsDepthObserveInterval when zero) until ctx is canceled. The first observation runs synchronously so /metrics has fresh data on the first scrape; subsequent ticks run on the goroutine.
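
A wiring sketch, assuming sqsServer is the node's *adapter.SQSServer (which satisfies SQSDepthSource):

reg.SQSObserver().Start(ctx, sqsServer, 0) // 0 falls back to sqsDepthObserveInterval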

type SQSPartitionObserver

type SQSPartitionObserver interface {
	// ObservePartitionMessage increments the
	// sqs_partition_messages_total counter for one operation on
	// one (queue, partition) pair. Action must be one of
	// SQSPartitionActionSend / Receive / Delete; any other value
	// is silently dropped so a typo at a future call site cannot
	// crash the process.
	ObservePartitionMessage(queue string, partition uint32, action string)
}

SQSPartitionObserver records per-(queue, partition, action) counters for HT-FIFO operations. The interface is small so adapter call sites can pass a no-op observer in tests without pulling in the full Prometheus registry.
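
A call-site sketch using the stable action constants; the queue name and partition number are illustrative:

obs := reg.SQSPartitionObserver()
obs.ObservePartitionMessage("jobs", 3, monitoring.SQSPartitionActionSend)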

type SQSQueueDepth

type SQSQueueDepth struct {
	Queue      string
	Visible    int64
	NotVisible int64
	Delayed    int64
}

SQSQueueDepth is one queue's depth-attribute snapshot. Mirrors adapter.SQSQueueDepth byte-for-byte and is re-declared here to keep the monitoring package free of an adapter import. A drift between the two definitions surfaces as a compile error at the SQSObserver call site.

type WriteConflictCollector

type WriteConflictCollector struct {
	// contains filtered or unexported fields
}

WriteConflictCollector polls each registered store on a fixed interval and mirrors the snapshot into the Prometheus counter vector. Store-side counts are monotonic for the lifetime of a single store instance; counters advance by the positive delta against the last snapshot so a store reopen (Restore swap) does not produce negative values.

func (*WriteConflictCollector) ObserveOnce

func (c *WriteConflictCollector) ObserveOnce(sources []WriteConflictSource)

ObserveOnce is exposed for tests and single-shot callers.

func (*WriteConflictCollector) Start

func (c *WriteConflictCollector) Start(ctx context.Context, sources []WriteConflictSource, interval time.Duration)

Start polls sources on the given interval until ctx is canceled. Passing interval <= 0 uses defaultWriteConflictPollInterval (5 s), matching the DispatchCollector / PebbleCollector cadence.

type WriteConflictCounterSource

type WriteConflictCounterSource interface {
	WriteConflictCountsByPrefix() map[string]uint64
}

WriteConflictCounterSource abstracts per-group access to the MVCC store's OCC conflict counters. The concrete store implementations (pebbleStore, mvccStore, ShardStore, LeaderRoutedStore) satisfy this via WriteConflictCountsByPrefix(); keys in the returned map follow the "<kind>|<key_prefix>" encoding from the store package.

type WriteConflictMetrics

type WriteConflictMetrics struct {
	// contains filtered or unexported fields
}

WriteConflictMetrics owns the per-(group, kind, key_prefix) counter vector used by the write-conflict dashboard. Registered once per Registry.

type WriteConflictSource

type WriteConflictSource struct {
	GroupID    uint64
	GroupIDStr string
	Source     WriteConflictCounterSource
}

WriteConflictSource binds a raft group ID to a counter source. Multiple groups can be polled by a single collector on a sharded node. GroupIDStr is the pre-formatted decimal form of GroupID used as the "group" Prometheus label; pre-computing it avoids a per-tick strconv allocation.
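
A wiring sketch mirroring the Pebble collector, assuming mvccStores maps group IDs to WriteConflictCounterSource implementations:

var sources []monitoring.WriteConflictSource
for groupID, s := range mvccStores {
	sources = append(sources, monitoring.WriteConflictSource{
		GroupID:    groupID,
		GroupIDStr: strconv.FormatUint(groupID, 10),
		Source:     s,
	})
}
go reg.WriteConflictCollector().Start(ctx, sources, 0) // interval <= 0 uses the 5 s default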
