keyviz

Version: v0.0.0-...-ebdf8eb
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 | License: AGPL-3.0 | Imports: 9 | Imported by: 0

Documentation

Overview

Package keyviz implements the in-memory sampler that backs the key-visualizer heatmap described in docs/admin_ui_key_visualizer_design.md.

The sampler sits on the data-plane hot path and counts requests per Raft route. The contract for callers (today, kv.ShardedCoordinator):

  • Construct a MemSampler with NewMemSampler.
  • Wire it through the coordinator with a constructor option; the coordinator calls Sampler.Observe at the dispatch entry, after resolving the RouteID.
  • Run Flush once per Step in a background goroutine; RunFlusher does this for you, given a ctx and the step interval.
  • Read the rendered matrix via Snapshot for the Admin gRPC service.

Hot-path properties (see design §5.1, §10):

  • Observe is a single atomic.Pointer[routeTable].Load, a plain map lookup against an immutable snapshot, and at most two atomic.AddUint64 calls: one for the count and one for bytes, the latter skipped when len(key) and valueLen are both zero. No allocation, no mutex.
  • Flush drains the per-route counters with atomic.SwapUint64; no pointer retirement, so a late writer cannot race past the snapshot and lose counts.
  • Adding / removing routes (RegisterRoute, RemoveRoute today; ApplySplit / ApplyMerge in a future PR) builds a fresh routeTable copy under a non-hot-path mutex and publishes it with a single atomic.Pointer.Store. Routes mutated mid-step keep their counters in the new table by design.
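The hot-path properties above can be modelled in a few dozen lines. The sketch below is a minimal model under stated assumptions, not the package's code: only the name routeTable comes from the text, while slot, sampler, register, observeWrite and drain are hypothetical, and only the write counters are exercised to keep it short. It shows copy-on-write publication through atomic.Pointer, a lock-free observe path, and Swap-based draining.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// slot holds one route's counters (hypothetical layout). atomic.Uint64
// fields are 64-bit aligned automatically, even on 32-bit platforms.
type slot struct {
	reads, writes, readBytes, writeBytes atomic.Uint64
}

// routeTable is an immutable snapshot: never mutated after it is published.
type routeTable struct {
	slots map[uint64]*slot
}

type sampler struct {
	mu    sync.Mutex                 // serialises table writers only (off the hot path)
	table atomic.Pointer[routeTable] // read lock-free on the hot path
}

func newSampler() *sampler {
	s := &sampler{}
	s.table.Store(&routeTable{slots: map[uint64]*slot{}})
	return s
}

// register copies the current table, adds the route, and publishes the copy
// with one Store. Existing *slot pointers are carried over, so counters that
// accumulated mid-step survive the republish.
func (s *sampler) register(routeID uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.table.Load()
	if _, ok := old.slots[routeID]; ok {
		return // idempotent
	}
	next := &routeTable{slots: make(map[uint64]*slot, len(old.slots)+1)}
	for id, sl := range old.slots {
		next.slots[id] = sl
	}
	next.slots[routeID] = &slot{}
	s.table.Store(next)
}

// observeWrite is the hot path: one Load, one map lookup, at most two Adds.
func (s *sampler) observeWrite(routeID uint64, key []byte, valueLen int) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return // unknown route: dropped silently
	}
	sl.writes.Add(1)
	if n := len(key) + valueLen; n > 0 {
		sl.writeBytes.Add(uint64(n))
	}
}

// drain returns and zeroes one slot's write counters with Swap, so an Add
// racing with the drain is counted in this step or the next, never lost.
func (s *sampler) drain(routeID uint64) (uint64, uint64) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return 0, 0
	}
	return sl.writes.Swap(0), sl.writeBytes.Swap(0)
}
```

Because register copies the existing *slot pointers into the new table, a route that was already counting keeps its counters across the republish.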


Constants

const (
	DefaultStep                   = 60 * time.Second
	DefaultHistoryColumns         = 1440 // 24 hours at 60s steps.
	DefaultMaxTrackedRoutes       = 10_000
	DefaultMaxMemberRoutesPerSlot = 256
	// DefaultKeyBucketsPerRoute is 1 — no sub-range bucketing, i.e.
	// today's exact route-granular behaviour. Operators opt into
	// hot-key sub-range sampling by raising it.
	DefaultKeyBucketsPerRoute = 1
)

Defaults for MemSamplerOptions when fields are left zero.

const (
	DefaultHotKeysPerRoute   = 64
	MaxHotKeysPerRoute       = 256
	DefaultHotKeysSampleRate = 16
	MaxHotKeysSampleRate     = 1024
	DefaultHotKeysQueueSize  = 8192
	MaxHotKeysQueueSize      = 65536
	DefaultHotKeysMaxKeyLen  = 1024
	MaxHotKeysMaxKeyLen      = 4096
)

Defaults and caps for the per-cell hot-key drill-down knobs (see docs/design/2026_05_28_proposed_keyviz_hot_key_topk.md §8). The feature is off by default: with HotKeysEnabled=false the binary behaves exactly as before, the hot path pays only a single early-return branch, and no real key bytes are retained.

const MaxHistoryColumns = 100_000

MaxHistoryColumns is the upper bound on opts.HistoryColumns. The ring buffer pre-allocates a slice of capacity HistoryColumns at construction; misconfiguration (e.g. an operator typo of 100_000_000) would otherwise reserve gigabytes up front. 100 000 columns at the default 60s Step is ~70 days of history — longer retention is the Phase 3 persistence path's job, not the in-memory ring's.

const MaxKeyBucketsPerRoute = 256

MaxKeyBucketsPerRoute caps KeyBucketsPerRoute. The cap is an operationally safe bound, not merely a finite one: per-route memory is K × 4 × 8 bytes (four 8-byte counters per sub-bucket), so at the default MaxTrackedRoutes = 10_000 the cap of 256 bounds the worst case to ~80 MB of counters, versus ~1.3 GB at K = 4096, far more than a monitoring subsystem should ever need. The design's operator rule of thumb is K_max ≈ memBudget / (32 × MaxTrackedRoutes).
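The sizing arithmetic above, written out as a small helper. kMax and worstCaseBytes are hypothetical names; the clamp to [1, MaxKeyBucketsPerRoute] mirrors the KeyBucketsPerRoute clamp documented on MemSamplerOptions.

```go
package main

const (
	maxKeyBucketsPerRoute = 256   // mirrors MaxKeyBucketsPerRoute
	bytesPerBucket        = 4 * 8 // four uint64 counters per sub-bucket
)

// worstCaseBytes is the counter memory for K sub-buckets on every route.
func worstCaseBytes(k, maxTrackedRoutes int64) int64 {
	return k * bytesPerBucket * maxTrackedRoutes
}

// kMax applies K_max ≈ memBudget / (32 × MaxTrackedRoutes) and clamps
// the result to [1, MaxKeyBucketsPerRoute].
func kMax(memBudget, maxTrackedRoutes int64) int64 {
	if maxTrackedRoutes <= 0 {
		return 1
	}
	k := memBudget / (bytesPerBucket * maxTrackedRoutes)
	switch {
	case k < 1:
		return 1
	case k > maxKeyBucketsPerRoute:
		return maxKeyBucketsPerRoute
	}
	return k
}
```

For example, a 64 MiB counter budget at MaxTrackedRoutes = 10_000 gives K_max = 209, while a 1 GiB budget exceeds the cap and is clamped to 256.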

Variables

This section is empty.

Functions

func RunFlusher

func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration)

RunFlusher calls s.Flush at the supplied interval until ctx is cancelled, then returns. The in-progress step is not flushed on return: a graceful shutdown that wants to harvest it should call s.Flush once more after RunFlusher returns.

step <= 0 falls back to DefaultStep.

This is a small wrapper so that call sites in main.go need not spell out the ticker boilerplate; that boilerplate is covered by this package's unit tests rather than by each caller's.

func RunHotKeysAggregator

func RunHotKeysAggregator(ctx context.Context, s *MemSampler)

RunHotKeysAggregator drives the per-route Top-K aggregator goroutine until ctx is cancelled, returning afterwards. Mirrors RunFlusher; the caller in main.go is expected to launch one of each in the same errgroup. No-op (blocks on ctx) when the sampler is nil or HotKeysEnabled is false — keeping the call site uniform so a future flip of the flag does not require restructuring startup wiring.

Types

type KeyvizHotKeyEntry

type KeyvizHotKeyEntry struct {
	Key   []byte
	Count uint64
}

KeyvizHotKeyEntry is one tracked (key, count) pair in a snapshot. Count is the sketch's raw count (sampled-stream observations); the admin layer scales it by SampleRate to estimate true frequency and computes error_bound = SampleRate * SampledN / Capacity.
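A worked version of that scaling, as a hedged sketch. The function names are hypothetical, and integer arithmetic (multiply before divide) is an assumption about how the admin layer rounds.

```go
package main

// estimate scales a sampled-stream count back to the full stream.
func estimate(count uint64, sampleRate int) uint64 {
	return count * uint64(sampleRate)
}

// errorBound computes error_bound = SampleRate * SampledN / Capacity.
// Multiplying first keeps integer truncation to a single step.
func errorBound(sampleRate int, sampledN uint64, capacity int) uint64 {
	if capacity <= 0 {
		return 0
	}
	return uint64(sampleRate) * sampledN / uint64(capacity)
}
```

For instance, an entry with Count = 40 at SampleRate = 16 estimates about 640 requests; if the window sampled SampledN = 10_000 events into a sketch of Capacity = 64, the error bound is 2_500, so that entry cannot be told apart from noise.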

type KeyvizHotKeysSnapshot

type KeyvizHotKeysSnapshot struct {
	RouteID         uint64
	SampledN        uint64    // events the sketch actually saw this window
	DroppedSamples  uint64    // node-global: bounded-queue back-pressure drops
	SkippedLongKeys uint64    // node-global: pre-sample length-cap rejects
	SnapshotAt      time.Time // when the aggregator deep-copied this snapshot
	SampleRate      int       // R at the time of snapshot
	Capacity        int       // m at the time of snapshot
	Entries         []KeyvizHotKeyEntry
}

KeyvizHotKeysSnapshot is the immutable per-route view a drill-down handler reads. The aggregator deep-copies every key's bytes before publishing, so a reader can hold the snapshot across resets and evictions on the live sketch without racing (design §4, published-snapshot ownership).

Exported so the admin handler can read it; the type lives in the keyviz package because the aggregator owns and publishes it.
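The options below name a Space-Saving sketch per route, but its internals are not shown. The following is a naive, O(m)-per-miss Space-Saving implementation offered only as an illustration under that assumption; spaceSaving, offer and snapshot are hypothetical names. snapshot builds fresh []byte keys, which is the deep-copy ownership property described above.

```go
package main

import "sort"

type entry struct {
	Key   []byte
	Count uint64
}

// spaceSaving keeps at most capacity counters (naive min scan on eviction).
type spaceSaving struct {
	capacity int
	counts   map[string]uint64
	sampledN uint64
}

func newSpaceSaving(m int) *spaceSaving {
	if m < 1 {
		m = 1
	}
	return &spaceSaving{capacity: m, counts: make(map[string]uint64, m)}
}

// offer records one sampled key. When the sketch is full and the key is
// new, the minimum counter is evicted and the newcomer inherits min+1.
func (s *spaceSaving) offer(key []byte) {
	s.sampledN++
	k := string(key) // the conversion copies the bytes
	if _, ok := s.counts[k]; ok || len(s.counts) < s.capacity {
		s.counts[k]++
		return
	}
	var minKey string
	var minCount uint64
	first := true
	for mk, c := range s.counts {
		if first || c < minCount {
			minKey, minCount, first = mk, c, false
		}
	}
	delete(s.counts, minKey)
	s.counts[k] = minCount + 1
}

// snapshot returns entries by descending count. Each Key is a fresh slice,
// so readers never alias memory the live sketch owns.
func (s *spaceSaving) snapshot() []entry {
	out := make([]entry, 0, len(s.counts))
	for k, c := range s.counts {
		out = append(out, entry{Key: []byte(k), Count: c})
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Count != out[j].Count {
			return out[i].Count > out[j].Count
		}
		return string(out[i].Key) < string(out[j].Key)
	})
	return out
}
```

A Space-Saving count can overestimate a key's true sampled frequency by at most the evicted minimum it inherited, which is why the admin layer reports an error bound alongside each count.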

type MatrixColumn

type MatrixColumn struct {
	At   time.Time
	Rows []MatrixRow
}

MatrixColumn is one slice of the heatmap at a single flush time.

type MatrixRow

type MatrixRow struct {
	RouteID      uint64
	Start, End   []byte
	Aggregate    bool
	MemberRoutes []uint64
	// MemberRoutesTotal is how many distinct route IDs contributed to
	// this row's counters, including ones that exceeded
	// MaxMemberRoutesPerSlot and so are NOT listed in MemberRoutes.
	// Snapshot consumers should treat MemberRoutes as the visible
	// prefix of this list when MemberRoutesTotal > len(MemberRoutes).
	MemberRoutesTotal uint64

	// RaftGroupID + LeaderTerm carry the route's Raft identity at the
	// time the column was flushed. Stamped from the per-group term
	// snapshot SetLeaderTerm publishes (Phase 2-C+ fan-out merge
	// uses (RouteID/BucketID, RaftGroupID, LeaderTerm, columnAt) as
	// the dedupe key). Zero values mean "term not tracked" — emitted
	// when no SetLeaderTerm call has been made for the group yet, or
	// for synthetic virtual-bucket slots that span groups.
	RaftGroupID uint64
	LeaderTerm  uint64

	// SubBucket is this row's sub-range index within its parent route,
	// 0 for the first (or only) sub-bucket. SubBucketCount is how many
	// sub-buckets the route is divided into: 1 for a K==1 / aggregate /
	// degenerate / unbounded-tail slot, > 1 for a genuinely sub-divided
	// route. SubBucketCount is the disambiguator a row's BucketID
	// suffix needs — a K==1 slot's only row and a K>1 slot's bucket-0
	// row both have SubBucket == 0, so the count is what tells them
	// apart (the #subIdx suffix is added only when SubBucketCount > 1).
	// Both are int (bounded 0..MaxKeyBucketsPerRoute); they are
	// keyviz-internal (the proto KeyVizRow has no equivalent), so no
	// fixed-width type is required.
	SubBucket      int
	SubBucketCount int

	Reads      uint64
	Writes     uint64
	ReadBytes  uint64
	WriteBytes uint64
}

MatrixRow is a single route or virtual bucket's counter snapshot taken at flush time.

type MemSampler

type MemSampler struct {
	// contains filtered or unexported fields
}

MemSampler is the in-process Sampler implementation. The zero value is not usable — construct via NewMemSampler.

func NewMemSampler

func NewMemSampler(opts MemSamplerOptions) *MemSampler

NewMemSampler constructs a sampler with the supplied options. Zero fields fall back to the Default* constants; non-positive values (including explicitly-negative HistoryColumns) are clamped to a safe minimum by newRingBuffer. Always returns a usable sampler — callers should pass a zero options struct for the default configuration.

func (*MemSampler) Flush

func (s *MemSampler) Flush()

Flush drains every slot's counters with atomic.SwapUint64 and appends one MatrixColumn to the ring buffer. Idle slots (all counters zero) are skipped to keep the column compact. Slots that RemoveRoute retired since the previous Flush are drained alongside the live table, so route churn does not silently lose counts.

Rows are emitted in Start-key order regardless of which slot list they came from (live, retired, or virtual-member-pruned), preserving the API contract that matrix consumers can rely on monotone Start across columns.
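The drain-and-order step can be sketched as follows. It is an illustration only: counters, slotRef, row and buildColumn are hypothetical, only two counters are modelled, and clearing the retired list after the drain is left to the caller.

```go
package main

import (
	"bytes"
	"sort"
	"sync/atomic"
)

type counters struct{ reads, writes atomic.Uint64 }

type slotRef struct {
	routeID uint64
	start   []byte
	c       *counters
}

type row struct {
	RouteID       uint64
	Start         []byte
	Reads, Writes uint64
}

// buildColumn drains live and retired slots alike, skips idle ones, and
// orders the surviving rows by Start key regardless of their source list.
func buildColumn(live, retired []slotRef) []row {
	var rows []row
	for _, list := range [][]slotRef{live, retired} {
		for _, s := range list {
			r := row{
				RouteID: s.routeID,
				Start:   s.start,
				Reads:   s.c.reads.Swap(0),
				Writes:  s.c.writes.Swap(0),
			}
			if r.Reads == 0 && r.Writes == 0 {
				continue // idle slot: keep the column compact
			}
			rows = append(rows, r)
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		return bytes.Compare(rows[i].Start, rows[j].Start) < 0
	})
	return rows
}
```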

func (*MemSampler) HistoryColumns

func (s *MemSampler) HistoryColumns() int

HistoryColumns returns the configured ring-buffer length after applying defaults and the MaxHistoryColumns clamp. Wiring tests use this to verify --keyvizHistoryColumns is forwarded end-to-end without exposing the internal opts struct.

func (*MemSampler) HotKeysOptions

func (s *MemSampler) HotKeysOptions() (enabled bool, capacity, sampleRate, maxKeyLen int)

HotKeysOptions returns the effective hot-keys configuration after the NewMemSampler clamps. Used by the admin handler so the response can echo the same `sample_rate` / `m` values the sketch was built with, regardless of what an operator passed on the CLI. nil-receiver-safe.

func (*MemSampler) HotKeysSnapshot

func (s *MemSampler) HotKeysSnapshot(routeID uint64) *KeyvizHotKeysSnapshot

HotKeysSnapshot returns the most-recently-published per-route Top-K snapshot for the given individual route, or nil if hot-keys tracking is disabled, the route is unknown, the route is an aggregate, or no publish has fired yet. Lock-free: an atomic.Pointer.Load (Codex-P1 fix from the design's round-4 review — no shared mutable PRNG state, no per-route lock).

func (*MemSampler) Observe

func (s *MemSampler) Observe(routeID uint64, key []byte, op Op, valueLen int)

Observe records one request against a route. Cost on a hit: atomic.Pointer.Load + plain map lookup + up to 2× atomic.AddUint64 (count and bytes). Misses (a RouteID that was never registered) are dropped silently; the route-watch subscriber is responsible for calling RegisterRoute before the coordinator publishes the new RouteID.

func (*MemSampler) RegisterRoute

func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool

RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking set. groupID is the Raft group the route belongs to; it is stamped onto every MatrixRow this slot eventually emits so the Phase 2-C+ fan-out merge can dedupe write samples by (groupID, leaderTerm). Pass 0 when the deployment doesn't use multiple groups (legacy / single-group setups); the legacy max-merge fallback handles those.

Returns true when the route gets its own slot, false when the MaxTrackedRoutes cap was hit and the route was folded into a virtual aggregate bucket. Idempotent: calling twice with the same RouteID is a no-op (the original slot stays in place; a different groupID on the second call is silently ignored — RegisterRoute is not the right API to retag a slot, RemoveRoute + RegisterRoute is).

If a previous RemoveRoute(routeID) queued a deferred member-prune and the route is now re-registered into the SAME bucket inside the grace window, that prune is cancelled — otherwise the routeID would disappear from bucket.MemberRoutes despite Observe still attributing fresh traffic to it. Prunes for different buckets (or when the route rejoins as an individual slot) are left alone so the old bucket's MemberRoutes is correctly cleaned up.

func (*MemSampler) RemoveRoute

func (s *MemSampler) RemoveRoute(routeID uint64)

RemoveRoute drops a RouteID from tracking. Counts accumulated since the last flush are NOT lost: the retired slot (or, for virtual-bucket members, just the membership entry) is queued for one final drain by the next Flush. Subsequent Observe(routeID, …) calls are silent no-ops. Idempotent.

func (*MemSampler) SetLeaderTerm

func (s *MemSampler) SetLeaderTerm(groupID, term uint64)

SetLeaderTerm publishes the current Raft leader term for the given group. Called by main.go on a periodic ticker that polls the engine Status, and on every term-change observed via the engine. Phase 2-C+ stamps each MatrixRow's LeaderTerm from this snapshot at Flush time.

Calling with term == 0 is allowed but treated as "term unknown" during merge — the canonical (groupID, term) dedupe key collapses to the legacy max-merge for cells whose LeaderTerm is 0. This lets nodes that have not finished engine startup contribute partial data without poisoning the merge.

groupID == 0 is reserved for virtual aggregate buckets (which span multiple real groups). Calls with groupID == 0 are silently ignored so a future caller cannot accidentally stamp a non-zero term on aggregate rows — they must remain LeaderTerm == 0 so the fan-out merge falls back to max-merge for cross-group cells.

nil-receiver-safe.
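One way to publish per-group terms that honours the nil-safety and groupID == 0 rules above is a copy-on-write map behind atomic.Pointer, mirroring the routeTable pattern. This is an assumption about the mechanism, not the package's code; termTable, setLeaderTerm and termFor are hypothetical.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// termTable is a hypothetical per-group term snapshot: writers copy the
// map under mu and publish it; Flush-time readers do one atomic Load.
type termTable struct {
	mu    sync.Mutex
	terms atomic.Pointer[map[uint64]uint64]
}

func (t *termTable) setLeaderTerm(groupID, term uint64) {
	if t == nil || groupID == 0 {
		return // nil-safe; groupID 0 is reserved for aggregate buckets
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	next := map[uint64]uint64{groupID: term}
	if cur := t.terms.Load(); cur != nil {
		for g, v := range *cur {
			if g != groupID {
				next[g] = v
			}
		}
	}
	t.terms.Store(&next)
}

// termFor returns 0 ("term not tracked") for unknown groups and for 0.
func (t *termTable) termFor(groupID uint64) uint64 {
	if t == nil || groupID == 0 {
		return 0
	}
	if cur := t.terms.Load(); cur != nil {
		return (*cur)[groupID]
	}
	return 0
}
```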

func (*MemSampler) Snapshot

func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn

Snapshot returns the matrix columns whose flush time falls in the half-open range [from, to), oldest first. Every row carries all four counters (Reads, Writes, ReadBytes, WriteBytes) together with the slot metadata (RouteID, Start, End, Aggregate, MemberRoutes), so a UI can switch the displayed Series without re-querying.

func (*MemSampler) Step

func (s *MemSampler) Step() time.Duration

Step returns the configured flush interval after applying default fallbacks. Callers wiring up RunFlusher can use this to align their ticker with the sampler's expectations rather than passing the interval through two configuration paths.

func (*MemSampler) SubBucketBoundsFor

func (s *MemSampler) SubBucketBoundsFor(routeID uint64, subBucket int) (lo, hi []byte, ok bool)

SubBucketBoundsFor returns the [lo, hi) key bounds of sub-bucket subBucket within the individual route routeID, mirroring what Flush emits as a MatrixRow's Start/End. The admin hot-keys handler uses this to filter the route's top-m snapshot to keys in the drill-down-clicked sub-range (design §5).

ok is false when the route doesn't exist, is an aggregate (aggregates are not eligible for hot keys), or subBucket is outside [0, len(subBuckets)). For a single-bucket slot (K=1, unbounded, or degenerate), calls with subBucket == 0 succeed and return the route's own Start/End. hi == nil means the sub-bucket extends to the route's unbounded tail.

nil-receiver-safe.
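The admin-side filtering this enables can be sketched as a half-open range check in which a nil hi means "unbounded". hotKey, inRange and filterToSubBucket are hypothetical names; only the [lo, hi) and hi == nil semantics come from the text.

```go
package main

import "bytes"

type hotKey struct {
	Key   []byte
	Count uint64
}

// inRange reports lo <= key < hi, treating hi == nil as unbounded.
func inRange(key, lo, hi []byte) bool {
	return bytes.Compare(key, lo) >= 0 && (hi == nil || bytes.Compare(key, hi) < 0)
}

// filterToSubBucket keeps only the entries whose key falls in [lo, hi).
func filterToSubBucket(entries []hotKey, lo, hi []byte) []hotKey {
	var out []hotKey
	for _, e := range entries {
		if inRange(e.Key, lo, hi) {
			out = append(out, e)
		}
	}
	return out
}
```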

type MemSamplerOptions

type MemSamplerOptions struct {
	// Step is the flush interval, i.e. the ring buffer's time resolution.
	Step time.Duration
	// HistoryColumns caps the ring buffer length. Older columns are
	// dropped on push when the buffer is full.
	HistoryColumns int
	// MaxTrackedRoutes caps the number of routes whose counters are
	// kept individually before coarsening kicks in. RegisterRoute
	// returns false past this cap, the route ID maps into a virtual
	// bucket, and Snapshot reports it with Aggregate=true.
	MaxTrackedRoutes int
	// MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single
	// virtual bucket records in MemberRoutes. Beyond this cap the
	// route still folds into the bucket counters (so traffic is not
	// dropped) but the routeID is not appended — keeping per-column
	// payload size bounded when total routes far exceed
	// MaxTrackedRoutes. Snapshot consumers should treat the list as
	// "first N members" rather than authoritative attribution.
	MaxMemberRoutesPerSlot int
	// KeyBucketsPerRoute (K) is how many order-preserving sub-range
	// buckets each individual route's [Start, End) is divided into for
	// the hot-key heatmap. 1 (the default) disables sub-bucketing and
	// reproduces today's route-granular behaviour exactly. Clamped to
	// [1, MaxKeyBucketsPerRoute] at construction. Virtual aggregate
	// buckets are never sub-divided regardless of this value.
	KeyBucketsPerRoute int
	// HotKeysEnabled opts in to per-route Top-K hot-key tracking that
	// backs the heatmap drill-down (Phase 2-A++; see
	// docs/design/2026_05_28_proposed_keyviz_hot_key_topk.md). When
	// false the hot path adds one early-return branch and nothing else
	// — disabled-case behaviour is byte-identical to today. When true
	// the sampler retains REAL key bytes in memory and exposes them on
	// the admin drill-down API (gated behind admin auth + audit).
	HotKeysEnabled bool
	// HotKeysPerRoute is the Space-Saving sketch capacity m per route
	// (default 64, cap 256 — design §8). Larger m tightens the noise
	// floor (error_bound ≈ N_total / m) at the cost of memory.
	HotKeysPerRoute int
	// HotKeysSampleRate is R: the hot path enqueues 1 in R observes.
	// Default 16, cap 1024. Higher R reduces hot-path cost but raises
	// the probability that a heavy hitter is missed (Chernoff variance
	// over the sampled stream).
	HotKeysSampleRate int
	// HotKeysQueueSize bounds the channel between the hot path and
	// the aggregator goroutine. Default 8192, cap 65536. Drops past
	// the queue are counted, not silent (`dropped_samples` →
	// `degraded`).
	HotKeysQueueSize int
	// HotKeysMaxKeyLen caps the key length sampled into the hot-keys
	// sketch. Default 1024 B, cap 4096 B. Longer keys bump the
	// node-global `skipped_long_keys` counter and contribute to the
	// snapshot's `degraded` flag — they are never truncated (truncation
	// would alias different keys).
	HotKeysMaxKeyLen int
	// Now overrides time.Now for tests; nil falls back to time.Now.
	Now func() time.Time
}

MemSamplerOptions configures NewMemSampler. Zero values fall back to the Default* constants above; passing a struct literal with only the fields you care about is the expected call style.
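The hot-path side of the hot-key feed, as described by HotKeysSampleRate, HotKeysQueueSize and HotKeysMaxKeyLen, can be sketched as below: a pre-sample length check, 1-in-R sampling, and a non-blocking send whose failures are counted. All names are hypothetical, representing "disabled" as a nil feed is an assumption, and math/rand is used for brevity rather than as a recommendation for the hot path.

```go
package main

import (
	"math/rand"
	"sync/atomic"
)

type sample struct {
	routeID uint64
	key     []byte
}

type hotKeyFeed struct {
	rate      int         // R: enqueue 1 in R observes
	maxKeyLen int         // HotKeysMaxKeyLen
	queue     chan sample // bounded by HotKeysQueueSize
	dropped   atomic.Uint64
	skipped   atomic.Uint64
}

func newHotKeyFeed(rate, queueSize, maxKeyLen int) *hotKeyFeed {
	return &hotKeyFeed{rate: rate, maxKeyLen: maxKeyLen, queue: make(chan sample, queueSize)}
}

// offer is the hot-path side; it never blocks.
func (f *hotKeyFeed) offer(routeID uint64, key []byte) {
	if f == nil {
		return // disabled: a single early-return branch
	}
	if len(key) > f.maxKeyLen {
		f.skipped.Add(1) // never truncate: that would alias distinct keys
		return
	}
	if f.rate > 1 && rand.Intn(f.rate) != 0 {
		return // not sampled
	}
	// Copy, since the caller may reuse key's backing array afterwards.
	k := append([]byte(nil), key...)
	select {
	case f.queue <- sample{routeID: routeID, key: k}:
	default:
		f.dropped.Add(1) // back-pressure: counted, not silent
	}
}
```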

type Op

type Op uint8

Op identifies which counter family Observe should bump.

const (
	OpRead Op = iota
	OpWrite
)

type Sampler

type Sampler interface {
	// Observe records a single request against a route. Op identifies
	// the counter family. The key drives sub-range bucketing (the hot
	// key/sub-range heatmap, see docs/design/2026_05_25_proposed_keyviz_subrange_sampling.md);
	// len(key) and valueLen are summed into the matching *Bytes
	// counter; pass valueLen 0 for read-only ops where the response
	// size is irrelevant. Implementations must no-op (not panic) when
	// invoked on a typed-nil receiver.
	Observe(routeID uint64, key []byte, op Op, valueLen int)
}

Sampler is the narrow interface the coordinator depends on. The nil-safe contract is documented per-method so a coordinator wired without a sampler compiles to a no-op call.

Implementations MUST be nil-receiver-safe: a typed-nil implementation passed through this interface (e.g. `var s Sampler = (*MemSampler)(nil)`) must not panic when its methods are called. The coordinator stores the interface value as supplied and dispatches through it on the hot path; a guard at the call site only checks for an interface-nil, not a typed-nil.
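The typed-nil pitfall is easy to reproduce. In the sketch below, Op and Sampler copy the declarations documented here, while memSampler and coordinator are hypothetical. The coordinator's `!= nil` guard passes for a typed-nil pointer, so only the receiver check inside Observe prevents a panic.

```go
package main

type Op uint8

const (
	OpRead Op = iota
	OpWrite
)

type Sampler interface {
	Observe(routeID uint64, key []byte, op Op, valueLen int)
}

type memSampler struct{ observed int }

// Observe guards its own receiver: a typed-nil *memSampler stored in a
// Sampler is not an interface-nil, so the caller's check does not catch it.
func (s *memSampler) Observe(routeID uint64, key []byte, op Op, valueLen int) {
	if s == nil {
		return
	}
	s.observed++
}

type coordinator struct{ sampler Sampler }

func (c *coordinator) dispatch(routeID uint64, key []byte) {
	if c.sampler != nil { // only catches an interface-nil
		c.sampler.Observe(routeID, key, OpWrite, 0)
	}
}
```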

type Series

type Series uint8

Series selects which counter the matrix response should expose.

const (
	SeriesReads Series = iota
	SeriesWrites
	SeriesReadBytes
	SeriesWriteBytes
)
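A caller that renders one Series at a time can select the matching counter with a switch. row and value are hypothetical names; the Series constants copy the declarations above.

```go
package main

type Series uint8

const (
	SeriesReads Series = iota
	SeriesWrites
	SeriesReadBytes
	SeriesWriteBytes
)

// row mirrors the four counter fields of MatrixRow.
type row struct {
	Reads, Writes, ReadBytes, WriteBytes uint64
}

// value returns the counter named by s; unknown Series values yield 0.
func value(r row, s Series) uint64 {
	switch s {
	case SeriesReads:
		return r.Reads
	case SeriesWrites:
		return r.Writes
	case SeriesReadBytes:
		return r.ReadBytes
	case SeriesWriteBytes:
		return r.WriteBytes
	}
	return 0
}
```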
