Documentation ¶
Overview ¶
Package keyviz implements the in-memory sampler that backs the heatmap described in docs/admin_ui_key_visualizer_design.md.
The sampler sits on the data-plane hot path and counts requests per Raft route. The contract for callers (today, kv.ShardedCoordinator):
- Construct a MemSampler with NewMemSampler.
- Wire it through the coordinator with a constructor option; the coordinator calls Sampler.Observe at the dispatch entry, after resolving the RouteID.
- Run Flush once per Step in a background goroutine; RunFlusher does this for you with the supplied ctx and step.
- Read the rendered matrix via Snapshot for the Admin gRPC service.
Hot-path properties (see design §5.1, §10):
- Observe is a single atomic.Pointer[routeTable].Load, a plain map lookup against an immutable snapshot, and at most two atomic.AddUint64 calls (one for the count, one for bytes — skipped when both keyLen and valueLen are zero). No allocation, no mutex.
- Flush drains the per-route counters with atomic.SwapUint64; no pointer retirement, so a late writer cannot race past the snapshot and lose counts.
- Adding / removing routes (RegisterRoute, RemoveRoute today; ApplySplit / ApplyMerge in a future PR) builds a fresh routeTable copy under a non-hot-path mutex and publishes it with a single atomic.Pointer.Store. Routes mutated mid-step keep their counters in the new table by design.
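The hot-path properties above can be illustrated with a minimal, self-contained sketch. It is not the package's implementation: the names sampler, routeTable, slot, register, observe and drain are hypothetical, and it uses the typed atomic.Uint64 helpers (Go 1.19+) in place of atomic.AddUint64 / atomic.SwapUint64. It only shows the shape of the technique: readers load an immutable table through atomic.Pointer, writers copy the table under a mutex and publish the copy, and counters survive because the slot pointers are carried over.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// slot holds one route's counters. All names in this sketch are hypothetical.
type slot struct {
	count  atomic.Uint64
	nbytes atomic.Uint64
}

// routeTable is treated as immutable once it has been published.
type routeTable struct {
	slots map[uint64]*slot
}

type sampler struct {
	mu    sync.Mutex // serialises writers only; observe never takes it
	table atomic.Pointer[routeTable]
}

func newSampler() *sampler {
	s := &sampler{}
	s.table.Store(&routeTable{slots: map[uint64]*slot{}})
	return s
}

// register copies the current table, adds the route, and publishes the copy.
// Existing *slot pointers are carried over, so their counters are preserved.
func (s *sampler) register(routeID uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.table.Load()
	if _, ok := old.slots[routeID]; ok {
		return // already tracked
	}
	next := &routeTable{slots: make(map[uint64]*slot, len(old.slots)+1)}
	for id, sl := range old.slots {
		next.slots[id] = sl
	}
	next.slots[routeID] = &slot{}
	s.table.Store(next)
}

// observe is the hot path: one pointer load, one map lookup, up to two adds.
func (s *sampler) observe(routeID uint64, keyLen, valueLen int) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return // unregistered routes are dropped silently
	}
	sl.count.Add(1)
	if n := keyLen + valueLen; n > 0 {
		sl.nbytes.Add(uint64(n))
	}
}

// drain swaps a route's counters to zero and returns the previous values.
func (s *sampler) drain(routeID uint64) (count, nbytes uint64) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return 0, 0
	}
	return sl.count.Swap(0), sl.nbytes.Swap(0)
}

func main() {
	s := newSampler()
	s.register(1)
	s.observe(1, 3, 5)
	s.register(2)      // publishes a new table; route 1 keeps its counters
	s.observe(1, 0, 0) // counted, but the bytes add is skipped
	s.observe(9, 1, 1) // miss: route 9 was never registered
	fmt.Println(s.drain(1)) // 2 8
}
```

Because drain swaps the counters in place rather than retiring the slot, an observe that lands just after the swap is simply counted in the next step.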
Index ¶
- Constants
- func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration)
- func RunHotKeysAggregator(ctx context.Context, s *MemSampler)
- type KeyvizHotKeyEntry
- type KeyvizHotKeysSnapshot
- type MatrixColumn
- type MatrixRow
- type MemSampler
- func (s *MemSampler) Flush()
- func (s *MemSampler) HistoryColumns() int
- func (s *MemSampler) HotKeysOptions() (enabled bool, capacity, sampleRate, maxKeyLen int)
- func (s *MemSampler) HotKeysSnapshot(routeID uint64) *KeyvizHotKeysSnapshot
- func (s *MemSampler) Observe(routeID uint64, key []byte, op Op, valueLen int)
- func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool
- func (s *MemSampler) RemoveRoute(routeID uint64)
- func (s *MemSampler) SetLeaderTerm(groupID, term uint64)
- func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn
- func (s *MemSampler) Step() time.Duration
- func (s *MemSampler) SubBucketBoundsFor(routeID uint64, subBucket int) (lo, hi []byte, ok bool)
- type MemSamplerOptions
- type Op
- type Sampler
- type Series
Constants ¶
const (
DefaultStep = 60 * time.Second
DefaultHistoryColumns = 1440 // 24 hours at 60s steps.
DefaultMaxTrackedRoutes = 10_000
DefaultMaxMemberRoutesPerSlot = 256
// DefaultKeyBucketsPerRoute is 1 — no sub-range bucketing, i.e.
// today's exact route-granular behaviour. Operators opt into
// hot-key sub-range sampling by raising it.
DefaultKeyBucketsPerRoute = 1
)
Defaults for MemSamplerOptions when fields are left zero.
const (
DefaultHotKeysPerRoute = 64
MaxHotKeysPerRoute = 256
DefaultHotKeysSampleRate = 16
MaxHotKeysSampleRate = 1024
DefaultHotKeysQueueSize = 8192
MaxHotKeysQueueSize = 65536
DefaultHotKeysMaxKeyLen = 1024
MaxHotKeysMaxKeyLen = 4096
)
Defaults / caps for the per-cell hot-key drill-down knobs (see docs/design/2026_05_28_proposed_keyviz_hot_key_topk.md §8). All off-by-default; HotKeysEnabled=false is the binary's existing behaviour — no extra hot-path cost, no real key bytes retained.
const MaxHistoryColumns = 100_000
MaxHistoryColumns is the upper bound on opts.HistoryColumns. The ring buffer pre-allocates a slice of capacity HistoryColumns at construction; misconfiguration (e.g. an operator typo of 100_000_000) would otherwise reserve gigabytes up front. 100 000 columns at the default 60s Step is ~70 days of history — longer retention is the Phase 3 persistence path's job, not the in-memory ring's.
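The retention figures quoted here follow from multiplying the column count by the step. A small sketch of that arithmetic, where retention is a hypothetical helper and not part of the package:

```go
package main

import (
	"fmt"
	"time"
)

// retention returns how much history a ring of `columns` entries covers
// when one column is appended every `step`.
func retention(columns int, step time.Duration) time.Duration {
	return time.Duration(columns) * step
}

func main() {
	// The MaxHistoryColumns cap at the default 60s step.
	capped := retention(100_000, 60*time.Second)
	fmt.Printf("%.1f days\n", capped.Hours()/24) // 69.4 days

	// DefaultHistoryColumns at the default step.
	fmt.Println(retention(1440, 60*time.Second)) // 24h0m0s
}
```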
const MaxKeyBucketsPerRoute = 256
MaxKeyBucketsPerRoute caps KeyBucketsPerRoute. The cap is an operationally-safe bound, not merely a finite one: per-route memory is K × 4 × 8 bytes, so at the default MaxTrackedRoutes = 10_000 the cap of 256 bounds the worst case to ~80 MB of counters (vs ~1.28 GB at K=4096, which a monitoring subsystem must not need). The design's operator rule of thumb is K_max ≈ memBudget / (32 × MaxTrackedRoutes).
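The bound can be checked with a few lines of arithmetic. In this sketch counterBytes and maxBuckets are hypothetical helpers, and reading the factor of 4 as the four uint64 counters on a row (Reads, Writes, ReadBytes, WriteBytes) is an assumption based on the MatrixRow fields.

```go
package main

import "fmt"

const (
	countersPerBucket = 4 // assumed: Reads, Writes, ReadBytes, WriteBytes
	bytesPerCounter   = 8 // one uint64 each
)

// counterBytes returns the worst-case counter memory, in bytes, for k
// sub-buckets per route across maxRoutes individually tracked routes.
func counterBytes(k, maxRoutes int) int {
	return k * countersPerBucket * bytesPerCounter * maxRoutes
}

// maxBuckets applies the rule of thumb
// K_max ≈ memBudget / (32 × MaxTrackedRoutes).
func maxBuckets(memBudgetBytes, maxRoutes int) int {
	return memBudgetBytes / (countersPerBucket * bytesPerCounter * maxRoutes)
}

func main() {
	fmt.Println(counterBytes(256, 10_000))      // 81920000
	fmt.Println(counterBytes(4096, 10_000))     // 1310720000
	fmt.Println(maxBuckets(64_000_000, 10_000)) // 200
}
```

The first result is the ~80 MB worst case at the cap. The second is the K=4096 case, which the text rounds to ~1.28 GB (16 times the ~80 MB figure). The last line shows that a 64 MB budget would allow roughly K = 200.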
Variables ¶
This section is empty.
Functions ¶
func RunFlusher ¶
func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration)
RunFlusher drives MemSampler.Flush at the supplied interval until ctx is cancelled. It returns when ctx fires; the final tick is not executed, so a graceful shutdown that wants to harvest the in-progress step should call Flush once more after RunFlusher returns.
step <= 0 falls back to DefaultStep.
This is a tiny wrapper so call sites in main.go don't need to spell out the ticker boilerplate; that boilerplate is covered by this package's unit tests, so callers need not test it again.
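For readers unfamiliar with the ticker boilerplate being wrapped, a minimal sketch follows. It is not RunFlusher's source: runEvery and flushesUntilCancelled are hypothetical names, and preferring cancellation when a tick and ctx.Done are ready together is an assumption about how "the final tick is not executed" might be achieved.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const defaultStep = 60 * time.Second // mirrors DefaultStep

// runEvery calls flush once per step until ctx is cancelled.
func runEvery(ctx context.Context, step time.Duration, flush func()) {
	if step <= 0 {
		step = defaultStep
	}
	t := time.NewTicker(step)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			// A tick and a cancellation can be ready at the same time;
			// prefer the cancellation so no flush runs after shutdown began.
			if ctx.Err() != nil {
				return
			}
			flush()
		}
	}
}

// flushesUntilCancelled runs the loop with a 1ms step, cancels the context
// from inside the n-th flush, and reports how many flushes ran in total.
func flushesUntilCancelled(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	runEvery(ctx, time.Millisecond, func() {
		calls++
		if calls == n {
			cancel()
		}
	})
	return calls
}

func main() {
	fmt.Println(flushesUntilCancelled(3)) // 3
}
```

A caller that wants the in-progress step would invoke its flush function one more time after runEvery returns, as the RunFlusher description recommends.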
func RunHotKeysAggregator ¶
func RunHotKeysAggregator(ctx context.Context, s *MemSampler)
RunHotKeysAggregator drives the per-route Top-K aggregator goroutine until ctx is cancelled, returning afterwards. Mirrors RunFlusher; the caller in main.go is expected to launch one of each in the same errgroup. No-op (blocks on ctx) when the sampler is nil or HotKeysEnabled is false — keeping the call site uniform so a future flip of the flag does not require restructuring startup wiring.
Types ¶
type KeyvizHotKeyEntry ¶
KeyvizHotKeyEntry is one tracked (key, count) pair in a snapshot. Count is the sketch's raw count (sampled-stream observations); the admin layer scales it by SampleRate to estimate true frequency and computes error_bound = SampleRate * SampledN / Capacity.
type KeyvizHotKeysSnapshot ¶
type KeyvizHotKeysSnapshot struct {
RouteID uint64
SampledN uint64 // events the sketch actually saw this window
DroppedSamples uint64 // node-global: bounded-queue back-pressure drops
SkippedLongKeys uint64 // node-global: pre-sample length-cap rejects
SnapshotAt time.Time // when the aggregator deep-copied this snapshot
SampleRate int // R at the time of snapshot
Capacity int // m at the time of snapshot
Entries []KeyvizHotKeyEntry
}
KeyvizHotKeysSnapshot is the immutable per-route view a drill-down handler reads. The aggregator deep-copies every key's bytes before publishing, so a reader can hold it across resets / evictions on the live sketch without races (design §4 published-snapshot ownership).
Exported so the admin handler can read it; the type lives in the keyviz package because the aggregator owns and publishes it.
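The scaling that the KeyvizHotKeyEntry description attributes to the admin layer can be written out as follows. This is a sketch only: estimate and errorBound are hypothetical helpers, and the use of integer division is an assumption.

```go
package main

import "fmt"

// estimate scales a sketch count, which was observed on the 1-in-R sampled
// stream, back up to an approximate true frequency.
func estimate(count uint64, sampleRate int) uint64 {
	return count * uint64(sampleRate)
}

// errorBound evaluates SampleRate * SampledN / Capacity.
func errorBound(sampleRate int, sampledN uint64, capacity int) uint64 {
	if capacity <= 0 {
		return 0 // no sketch, so there is nothing to bound
	}
	return uint64(sampleRate) * sampledN / uint64(capacity)
}

func main() {
	// R = 16, m = 64, 6400 sampled events, one key counted 500 times.
	fmt.Println(estimate(500, 16))        // 8000
	fmt.Println(errorBound(16, 6400, 64)) // 1600
}
```

In this example the key's estimated frequency (8000) is well above the error bound (1600), so it would stand out from the sketch's noise floor.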
type MatrixColumn ¶
MatrixColumn is one slice of the heatmap at a single flush time.
type MatrixRow ¶
type MatrixRow struct {
RouteID uint64
Start, End []byte
Aggregate bool
MemberRoutes []uint64
// MemberRoutesTotal is how many distinct route IDs contributed to
// this row's counters, including ones that exceeded
// MaxMemberRoutesPerSlot and so are NOT listed in MemberRoutes.
// Snapshot consumers should treat MemberRoutes as the visible
// prefix of this list when MemberRoutesTotal > len(MemberRoutes).
MemberRoutesTotal uint64
// RaftGroupID + LeaderTerm carry the route's Raft identity at the
// time the column was flushed. Stamped from the per-group term
// snapshot SetLeaderTerm publishes (Phase 2-C+ fan-out merge
// uses (RouteID/BucketID, RaftGroupID, LeaderTerm, columnAt) as
// the dedupe key). Zero values mean "term not tracked" — emitted
// when no SetLeaderTerm call has been made for the group yet, or
// for synthetic virtual-bucket slots that span groups.
RaftGroupID uint64
LeaderTerm uint64
// SubBucket is this row's sub-range index within its parent route,
// 0 for the first (or only) sub-bucket. SubBucketCount is how many
// sub-buckets the route is divided into: 1 for a K==1 / aggregate /
// degenerate / unbounded-tail slot, > 1 for a genuinely sub-divided
// route. SubBucketCount is the disambiguator a row's BucketID
// suffix needs — a K==1 slot's only row and a K>1 slot's bucket-0
// row both have SubBucket == 0, so the count is what tells them
// apart (the #subIdx suffix is added only when SubBucketCount > 1).
// Both are int (bounded 0..MaxKeyBucketsPerRoute); they are
// keyviz-internal (the proto KeyVizRow has no equivalent), so no
// fixed-width type is required.
SubBucket int
SubBucketCount int
Reads uint64
Writes uint64
ReadBytes uint64
WriteBytes uint64
}
MatrixRow is a single route or virtual bucket's counter snapshot taken at flush time.
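Two of the field comments above describe rules a consumer has to apply: the sub-bucket suffix is added only when SubBucketCount > 1, and MemberRoutes may be a truncated prefix. A hedged sketch of both, where the row type is a cut-down stand-in for MatrixRow and the "route:<id>" label format is invented purely for illustration (the real BucketID format is not shown in this documentation):

```go
package main

import "fmt"

// row keeps only the MatrixRow fields this sketch needs.
type row struct {
	RouteID           uint64
	SubBucket         int
	SubBucketCount    int
	MemberRoutes      []uint64
	MemberRoutesTotal uint64
}

// bucketLabel appends "#<SubBucket>" only for genuinely sub-divided routes.
// The "route:<id>" prefix is a made-up format, not the package's BucketID.
func bucketLabel(r row) string {
	base := fmt.Sprintf("route:%d", r.RouteID)
	if r.SubBucketCount > 1 {
		return fmt.Sprintf("%s#%d", base, r.SubBucket)
	}
	return base
}

// hiddenMembers reports how many contributing routes are not listed in
// MemberRoutes because the per-slot cap was exceeded.
func hiddenMembers(r row) uint64 {
	listed := uint64(len(r.MemberRoutes))
	if r.MemberRoutesTotal > listed {
		return r.MemberRoutesTotal - listed
	}
	return 0
}

func main() {
	fmt.Println(bucketLabel(row{RouteID: 7, SubBucket: 0, SubBucketCount: 1})) // route:7
	fmt.Println(bucketLabel(row{RouteID: 7, SubBucket: 0, SubBucketCount: 4})) // route:7#0
	fmt.Println(hiddenMembers(row{MemberRoutes: []uint64{1, 2}, MemberRoutesTotal: 5})) // 3
}
```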
type MemSampler ¶
type MemSampler struct {
// contains filtered or unexported fields
}
MemSampler is the in-process Sampler implementation. The zero value is not usable — construct via NewMemSampler.
func NewMemSampler ¶
func NewMemSampler(opts MemSamplerOptions) *MemSampler
NewMemSampler constructs a sampler with the supplied options. Zero fields fall back to the Default* constants; non-positive values (including an explicitly negative HistoryColumns) are clamped to a safe minimum by newRingBuffer. It always returns a usable sampler, so callers can pass a zero options struct to get the default configuration.
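The default-then-clamp behaviour described above might look like the sketch below. clampInt is a hypothetical helper, and the "safe minimum" of 1 is an assumption; the documentation does not state the value newRingBuffer actually uses.

```go
package main

import "fmt"

const (
	defaultHistoryColumns = 1440    // mirrors DefaultHistoryColumns
	maxHistoryColumns     = 100_000 // mirrors MaxHistoryColumns
	minHistoryColumns     = 1       // assumed safe minimum
)

// clampInt substitutes def for a zero value, then clamps to [lo, hi].
func clampInt(v, def, lo, hi int) int {
	if v == 0 {
		v = def
	}
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

func main() {
	for _, v := range []int{0, -5, 500, 100_000_000} {
		got := clampInt(v, defaultHistoryColumns, minHistoryColumns, maxHistoryColumns)
		fmt.Println(v, "->", got)
	}
}
```

Under these assumptions 0 becomes 1440, -5 becomes 1, 500 is kept, and the operator typo 100_000_000 is capped at 100000.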
func (*MemSampler) Flush ¶
func (s *MemSampler) Flush()
Flush drains every slot's counters with atomic.SwapUint64 and appends one MatrixColumn to the ring buffer. Idle slots (all counters zero) are skipped to keep the column compact. Slots that RemoveRoute retired since the previous Flush are drained alongside the live table, so route churn does not silently lose counts.
Rows are emitted in Start-key order regardless of which slot list they came from (live, retired, or virtual-member-pruned), preserving the API contract that matrix consumers can rely on monotone Start across columns.
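The drain, idle-skip and ordering rules can be sketched together. This is an illustration, not the package's Flush: slot, row, flush and newSlot are hypothetical, only two counters are modelled, and the virtual-member-pruned list is left out.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"sync/atomic"
)

type slot struct {
	routeID uint64
	start   []byte
	reads   atomic.Uint64
	writes  atomic.Uint64
}

type row struct {
	RouteID uint64
	Start   []byte
	Reads   uint64
	Writes  uint64
}

// flush drains live and retired slots, skips idle ones, and returns the
// resulting rows sorted by Start key.
func flush(live, retired []*slot) []row {
	var rows []row
	for _, list := range [][]*slot{live, retired} {
		for _, sl := range list {
			r, w := sl.reads.Swap(0), sl.writes.Swap(0)
			if r == 0 && w == 0 {
				continue // idle slot: keep the column compact
			}
			rows = append(rows, row{RouteID: sl.routeID, Start: sl.start, Reads: r, Writes: w})
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		return bytes.Compare(rows[i].Start, rows[j].Start) < 0
	})
	return rows
}

// newSlot builds a slot with a preset read count for the demo.
func newSlot(id uint64, start string, reads uint64) *slot {
	sl := &slot{routeID: id, start: []byte(start)}
	sl.reads.Store(reads)
	return sl
}

func main() {
	live := []*slot{newSlot(1, "m", 4), newSlot(2, "a", 0)}
	retired := []*slot{newSlot(3, "c", 9)}
	for _, r := range flush(live, retired) {
		fmt.Printf("%d %s %d\n", r.RouteID, r.Start, r.Reads)
	}
}
```

The idle route 2 is omitted, and the retired route 3 (Start "c") sorts ahead of the live route 1 (Start "m") even though it came from a different list.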
func (*MemSampler) HistoryColumns ¶
func (s *MemSampler) HistoryColumns() int
HistoryColumns returns the configured ring-buffer length after applying defaults and the MaxHistoryColumns clamp. Wiring tests use this to verify --keyvizHistoryColumns is forwarded end-to-end without exposing the internal opts struct.
func (*MemSampler) HotKeysOptions ¶
func (s *MemSampler) HotKeysOptions() (enabled bool, capacity, sampleRate, maxKeyLen int)
HotKeysOptions returns the effective hot-keys configuration after the NewMemSampler clamps. Used by the admin handler so the response can echo the same `sample_rate` / `m` values the sketch was built with, regardless of what an operator passed on the CLI. nil-receiver-safe.
func (*MemSampler) HotKeysSnapshot ¶
func (s *MemSampler) HotKeysSnapshot(routeID uint64) *KeyvizHotKeysSnapshot
HotKeysSnapshot returns the most-recently-published per-route Top-K snapshot for the given individual route, or nil if hot-keys tracking is disabled, the route is unknown, the route is an aggregate, or no publish has fired yet. Lock-free: an atomic.Pointer.Load (Codex-P1 fix from the design's round-4 review — no shared mutable PRNG state, no per-route lock).
func (*MemSampler) Observe ¶
func (s *MemSampler) Observe(routeID uint64, key []byte, op Op, valueLen int)
Observe records one request against a route. Cost on a hit: atomic.Pointer.Load + plain map lookup + up to 2× atomic.AddUint64 (count, plus bytes unless both the key and value lengths are zero). Misses (RouteID never registered) are dropped silently; the route-watch subscriber is responsible for calling RegisterRoute before the coordinator publishes the new RouteID.
func (*MemSampler) RegisterRoute ¶
func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool
RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking set. groupID is the Raft group the route belongs to; it is stamped onto every MatrixRow this slot eventually emits so the Phase 2-C+ fan-out merge can dedupe write samples by (groupID, leaderTerm). Pass 0 when the deployment doesn't use multiple groups (legacy / single-group setups); the legacy max-merge fallback handles those.
Returns true when the route gets its own slot, false when the MaxTrackedRoutes cap was hit and the route was folded into a virtual aggregate bucket. Idempotent: calling twice with the same RouteID is a no-op (the original slot stays in place; a different groupID on the second call is silently ignored — RegisterRoute is not the right API to retag a slot, RemoveRoute + RegisterRoute is).
If a previous RemoveRoute(routeID) queued a deferred member-prune and the route is now re-registered into the SAME bucket inside the grace window, that prune is cancelled — otherwise the routeID would disappear from bucket.MemberRoutes despite Observe still attributing fresh traffic to it. Prunes for different buckets (or when the route rejoins as an individual slot) are left alone so the old bucket's MemberRoutes is correctly cleaned up.
func (*MemSampler) RemoveRoute ¶
func (s *MemSampler) RemoveRoute(routeID uint64)
RemoveRoute drops a RouteID from tracking. Counts accumulated since the last flush are NOT lost: the retired slot (or, for virtual-bucket members, just the membership entry) is queued for one final drain by the next Flush. Subsequent Observe(routeID, …) calls are silent no-ops. Idempotent.
func (*MemSampler) SetLeaderTerm ¶
func (s *MemSampler) SetLeaderTerm(groupID, term uint64)
SetLeaderTerm publishes the current Raft leader term for the given group. Called by main.go on a periodic ticker that polls the engine Status, and on every term-change observed via the engine. Phase 2-C+ stamps each MatrixRow's LeaderTerm from this snapshot at Flush time.
Calling with term == 0 is allowed but treated as "term unknown" during merge — the canonical (groupID, term) dedupe key collapses to the legacy max-merge for cells whose LeaderTerm is 0. This lets nodes that have not finished engine startup contribute partial data without poisoning the merge.
groupID == 0 is reserved for virtual aggregate buckets (which span multiple real groups). Calls with groupID == 0 are silently ignored so a future caller cannot accidentally stamp a non-zero term on aggregate rows — they must remain LeaderTerm == 0 so the fan-out merge falls back to max-merge for cross-group cells.
nil-receiver-safe.
func (*MemSampler) Snapshot ¶
func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn
Snapshot returns the matrix columns in the supplied [from, to) half-open time range. Ordering is oldest-first. Series selects which MatrixRow value the caller is going to display; the slot metadata (RouteID, Start, End, Aggregate, MemberRoutes) is included on every row so a UI can render the same response with different series without a re-query.
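The half-open range semantics mean a column stamped exactly at from is included and one stamped exactly at to is not. A minimal sketch of that filter, where the column type and its At field are hypothetical (MatrixColumn's fields are not listed in this documentation):

```go
package main

import (
	"fmt"
	"time"
)

// column is a stand-in for MatrixColumn; At is an assumed timestamp field.
type column struct {
	At time.Time
}

// inRange returns the columns with from <= At < to. It assumes cols is
// already ordered oldest-first, so the result keeps that order.
func inRange(cols []column, from, to time.Time) []column {
	var out []column
	for _, c := range cols {
		if !c.At.Before(from) && c.At.Before(to) {
			out = append(out, c)
		}
	}
	return out
}

// demo builds three columns one minute apart and queries [t0, t0+2m).
func demo() int {
	t0 := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	cols := []column{{At: t0}, {At: t0.Add(time.Minute)}, {At: t0.Add(2 * time.Minute)}}
	return len(inRange(cols, t0, t0.Add(2*time.Minute)))
}

func main() {
	fmt.Println(demo()) // 2: the column at t0+2m falls outside the range
}
```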
func (*MemSampler) Step ¶
func (s *MemSampler) Step() time.Duration
Step returns the configured flush interval after applying default fallbacks. Callers wiring up RunFlusher can use this to align their ticker with the sampler's expectations rather than passing the interval through two configuration paths.
func (*MemSampler) SubBucketBoundsFor ¶
func (s *MemSampler) SubBucketBoundsFor(routeID uint64, subBucket int) (lo, hi []byte, ok bool)
SubBucketBoundsFor returns the [lo, hi) key bounds of sub-bucket subBucket within the individual route routeID, mirroring what Flush emits as a MatrixRow's Start/End. The admin hot-keys handler uses this to filter the route's top-m snapshot to keys in the drill-down-clicked sub-range (design §5).
ok is false when the route doesn't exist, is an aggregate (those are not eligible for hot-keys), or subBucket is outside the range [0, len(subBuckets)). For a single-bucket slot (K=1 / unbounded / degenerate), calls with subBucket == 0 succeed and return the route's own Start/End. hi == nil means the sub-bucket extends to the route's unbounded tail.
nil-receiver-safe.
type MemSamplerOptions ¶
type MemSamplerOptions struct {
// Step is the flush interval. The ring buffer's resolution.
Step time.Duration
// HistoryColumns caps the ring buffer length. Older columns are
// dropped on push when the buffer is full.
HistoryColumns int
// MaxTrackedRoutes caps the number of routes whose counters are
// kept individually before coarsening kicks in. RegisterRoute
// returns false past this cap, the route ID maps into a virtual
// bucket, and Snapshot reports it with Aggregate=true.
MaxTrackedRoutes int
// MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single
// virtual bucket records in MemberRoutes. Beyond this cap the
// route still folds into the bucket counters (so traffic is not
// dropped) but the routeID is not appended — keeping per-column
// payload size bounded when total routes far exceed
// MaxTrackedRoutes. Snapshot consumers should treat the list as
// "first N members" rather than authoritative attribution.
MaxMemberRoutesPerSlot int
// KeyBucketsPerRoute (K) is how many order-preserving sub-range
// buckets each individual route's [Start, End) is divided into for
// the hot-key heatmap. 1 (the default) disables sub-bucketing and
// reproduces today's route-granular behaviour exactly. Clamped to
// [1, MaxKeyBucketsPerRoute] at construction. Virtual aggregate
// buckets are never sub-divided regardless of this value.
KeyBucketsPerRoute int
// HotKeysEnabled opts in to per-route Top-K hot-key tracking that
// backs the heatmap drill-down (Phase 2-A++; see
// docs/design/2026_05_28_proposed_keyviz_hot_key_topk.md). When
// false the hot path adds one early-return branch and nothing else
// — disabled-case behaviour is byte-identical to today. When true
// the sampler retains REAL key bytes in memory and exposes them on
// the admin drill-down API (gated behind admin auth + audit).
HotKeysEnabled bool
// HotKeysPerRoute is the Space-Saving sketch capacity m per route
// (default 64, cap 256 — design §8). Larger m tightens the noise
// floor (error_bound ≈ N_total / m) at the cost of memory.
HotKeysPerRoute int
// HotKeysSampleRate is R: the hot path enqueues 1 in R observes.
// Default 16, cap 1024. Higher R reduces hot-path cost but raises
// the probability that a heavy hitter is missed (Chernoff variance
// over the sampled stream).
HotKeysSampleRate int
// HotKeysQueueSize bounds the channel between the hot path and
// the aggregator goroutine. Default 8192, cap 65536. Drops past
// the queue are counted, not silent (`dropped_samples` →
// `degraded`).
HotKeysQueueSize int
// HotKeysMaxKeyLen caps the key length sampled into the hot-keys
// sketch. Default 1024 B, cap 4096 B. Longer keys bump the
// node-global `skipped_long_keys` counter and contribute to the
// snapshot's `degraded` flag — they are never truncated (truncation
// would alias different keys).
HotKeysMaxKeyLen int
// Now overrides time.Now for tests; nil falls back to time.Now.
Now func() time.Time
}
MemSamplerOptions configures NewMemSampler. Zero values fall back to the Default* constants above; passing a struct literal with only the fields you care about is the expected call style.
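The hot-key knobs describe a pipeline of length cap, 1-in-R selection and a bounded queue whose overflow is counted. The sketch below shows one way such a pipeline could be arranged. Everything in it is hypothetical: hotKeySampler and offer are invented names, and the counter-based 1-in-R selection is chosen only to keep the example deterministic; the package's actual selection mechanism is not documented here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type hotKeySampler struct {
	rate      uint64        // R: enqueue 1 in R observes
	maxKeyLen int           // keys longer than this are skipped, never truncated
	seq       atomic.Uint64 // observe counter driving 1-in-R selection (assumed)
	dropped   atomic.Uint64 // queue-full drops
	skipped   atomic.Uint64 // length-cap rejects
	queue     chan []byte   // bounded hand-off to the aggregator
}

func newHotKeySampler(rate, queueSize, maxKeyLen int) *hotKeySampler {
	return &hotKeySampler{
		rate:      uint64(rate),
		maxKeyLen: maxKeyLen,
		queue:     make(chan []byte, queueSize),
	}
}

// offer applies the length cap, then 1-in-R selection, then a non-blocking
// enqueue so the hot path never waits on the aggregator.
func (h *hotKeySampler) offer(key []byte) {
	if len(key) > h.maxKeyLen {
		h.skipped.Add(1)
		return
	}
	if h.seq.Add(1)%h.rate != 0 {
		return
	}
	cp := append([]byte(nil), key...) // copy: the caller may reuse its buffer
	select {
	case h.queue <- cp:
	default:
		h.dropped.Add(1) // queue full: count the drop instead of blocking
	}
}

func main() {
	h := newHotKeySampler(4, 2, 8)
	for i := 0; i < 16; i++ {
		h.offer([]byte("k"))
	}
	h.offer([]byte("123456789")) // 9 bytes, over the 8-byte cap
	fmt.Println(len(h.queue), h.dropped.Load(), h.skipped.Load()) // 2 2 1
}
```

With R = 4, sixteen observes select four samples; the queue holds two, so two are counted as dropped, and the over-length key is counted as skipped without touching the selection counter.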
type Sampler ¶
type Sampler interface {
// Observe records a single request against a route. Op identifies
// the counter family. The key drives sub-range bucketing (the hot
// key/sub-range heatmap, see docs/design/2026_05_25_proposed_keyviz_subrange_sampling.md);
// len(key) and valueLen are summed into the matching *Bytes
// counter; pass valueLen 0 for read-only ops where the response
// size is irrelevant. Implementations must no-op (not panic) when
// invoked on a typed-nil receiver.
Observe(routeID uint64, key []byte, op Op, valueLen int)
}
Sampler is the narrow interface the coordinator depends on. The nil-safe contract is documented per-method so a coordinator wired without a sampler compiles to a no-op call.
Implementations MUST be nil-receiver-safe: a typed-nil implementation passed through this interface (e.g. `var s Sampler = (*MemSampler)(nil)`) must not panic when its methods are called. The coordinator stores the interface value as supplied and dispatches through it on the hot path; a guard at the call site only checks for an interface-nil, not a typed-nil.
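The typed-nil hazard is easy to reproduce in isolation. In the sketch below the Sampler interface is reduced to a one-argument Observe for brevity, and memSampler and dispatch are hypothetical stand-ins for an implementation and a coordinator call site.

```go
package main

import "fmt"

// Sampler is a simplified stand-in for the package's interface.
type Sampler interface {
	Observe(routeID uint64)
}

type memSampler struct{ n int }

// Observe tolerates a nil receiver, so a typed-nil stored in the interface
// behaves as a no-op instead of panicking on s.n.
func (s *memSampler) Observe(routeID uint64) {
	if s == nil {
		return
	}
	s.n++
}

// dispatch mimics a call-site guard that checks only for interface-nil.
func dispatch(s Sampler) (called bool) {
	if s == nil {
		return false
	}
	s.Observe(1)
	return true
}

func main() {
	var none Sampler
	var typedNil Sampler = (*memSampler)(nil)

	fmt.Println(dispatch(none))     // false: the guard catches interface-nil
	fmt.Println(typedNil == nil)    // false: the interface holds a type
	fmt.Println(dispatch(typedNil)) // true: the guard passes, the method copes
}
```

Without the nil check inside Observe, the last call would dereference a nil pointer, which is exactly the failure the contract above rules out.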