Overview
Package keyviz implements the in-memory sampler behind the key visualizer heatmap described in docs/admin_ui_key_visualizer_design.md.
The sampler sits on the data-plane hot path and counts requests per Raft route. The contract for callers (today, kv.ShardedCoordinator):
- Construct a MemSampler with NewMemSampler.
- Wire it through the coordinator with a constructor option; the coordinator calls Sampler.Observe at the dispatch entry, after resolving the RouteID.
- Call Flush every Step in a background goroutine; RunFlusher does this for you, given a ctx and the step interval.
- Read the rendered matrix via Snapshot for the Admin gRPC service.
Hot-path properties (see design §5.1, §10):
- Observe is a single atomic.Pointer[routeTable].Load, a plain map lookup against an immutable snapshot, and at most two atomic.AddUint64 calls (one for the count and one for bytes, the latter skipped when both keyLen and valueLen are zero). No allocation, no mutex.
- Flush drains the per-route counters in place with atomic.SwapUint64 instead of retiring them behind a pointer swap, so a late writer cannot race past the snapshot and lose counts.
- Adding or removing routes (RegisterRoute and RemoveRoute today; ApplySplit / ApplyMerge in a future PR) builds a fresh routeTable copy under a mutex that is off the hot path and publishes it with a single atomic.Pointer.Store. Routes mutated mid-step keep their counters in the new table by design.
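To make these hot-path properties concrete, here is a minimal sketch of the copy-on-write route table with in-place atomic counters. It assumes a simplified slot that tracks reads only; sketchSampler, slot, register, observeRead and flush are hypothetical stand-ins, not this package's unexported types.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// slot holds one route's counters. Observe updates them in place with
// atomic adds, so the same *slot can be shared by successive tables.
type slot struct {
	reads, readBytes uint64
}

// routeTable is never mutated after it is published.
type routeTable struct {
	slots map[uint64]*slot
}

// sketchSampler is a hypothetical stand-in for MemSampler that models
// only reads, the copy-on-write table, and the atomic counters.
type sketchSampler struct {
	table atomic.Pointer[routeTable]
	mu    sync.Mutex // serializes writers; observeRead never takes it
}

func newSketchSampler() *sketchSampler {
	s := &sketchSampler{}
	s.table.Store(&routeTable{slots: map[uint64]*slot{}})
	return s
}

// register copies the current table, adds the route, and publishes the
// copy with one Store. Existing *slot pointers are carried over, so
// counters accumulated earlier in the step survive the swap.
func (s *sketchSampler) register(routeID uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.table.Load()
	if _, ok := old.slots[routeID]; ok {
		return // already tracked: no-op
	}
	next := &routeTable{slots: make(map[uint64]*slot, len(old.slots)+1)}
	for id, sl := range old.slots {
		next.slots[id] = sl
	}
	next.slots[routeID] = &slot{}
	s.table.Store(next)
}

// observeRead does one atomic Load, one map lookup, and at most two atomic adds.
func (s *sketchSampler) observeRead(routeID uint64, keyLen, valueLen int) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return // unregistered route: dropped silently
	}
	atomic.AddUint64(&sl.reads, 1)
	if n := keyLen + valueLen; n > 0 {
		atomic.AddUint64(&sl.readBytes, uint64(n))
	}
}

// flush drains every slot in place with SwapUint64 and returns the
// non-zero read counts keyed by route ID.
func (s *sketchSampler) flush() map[uint64]uint64 {
	out := map[uint64]uint64{}
	for id, sl := range s.table.Load().slots {
		reads := atomic.SwapUint64(&sl.reads, 0)
		atomic.SwapUint64(&sl.readBytes, 0) // drained too, but not returned here
		if reads > 0 {
			out[id] = reads
		}
	}
	return out
}
```

Because register reuses the existing *slot pointers, a route that was already counting when the table was republished keeps its in-flight counts, which is the "keep their counters in the new table" property described above.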
Index
- Constants
- func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration)
- type MatrixColumn
- type MatrixRow
- type MemSampler
- func NewMemSampler(opts MemSamplerOptions) *MemSampler
- func (s *MemSampler) Flush()
- func (s *MemSampler) HistoryColumns() int
- func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int)
- func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool
- func (s *MemSampler) RemoveRoute(routeID uint64)
- func (s *MemSampler) SetLeaderTerm(groupID, term uint64)
- func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn
- func (s *MemSampler) Step() time.Duration
- type MemSamplerOptions
- type Op
- type Sampler
- type Series
Constants
const (
DefaultStep = 60 * time.Second
DefaultHistoryColumns = 1440 // 24 hours at 60s steps.
DefaultMaxTrackedRoutes = 10_000
DefaultMaxMemberRoutesPerSlot = 256
)
Defaults for MemSamplerOptions when fields are left zero.
const MaxHistoryColumns = 100_000
MaxHistoryColumns is the upper bound on MemSamplerOptions.HistoryColumns. The ring buffer pre-allocates a slice of capacity HistoryColumns at construction, so a misconfiguration (e.g. an operator typo of 100_000_000) would otherwise reserve gigabytes up front. 100_000 columns at the default 60s Step is roughly 69 days of history; longer retention is the job of the Phase 3 persistence path, not the in-memory ring.
Functions
func RunFlusher
func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration)
RunFlusher calls s.Flush at the supplied interval until ctx is cancelled, then returns. No final Flush runs on cancellation; a graceful shutdown that wants to harvest the in-progress step should call s.Flush once more after RunFlusher returns.
step <= 0 falls back to DefaultStep.
This is a tiny wrapper so that call sites in main.go don't need to spell out the ticker boilerplate; the boilerplate is tested once, by this package's unit tests, rather than by each caller.
Types
type MatrixColumn
MatrixColumn is one slice of the heatmap at a single flush time.
type MatrixRow
type MatrixRow struct {
RouteID uint64
Start, End []byte
Aggregate bool
MemberRoutes []uint64
// MemberRoutesTotal is how many distinct route IDs contributed to
// this row's counters, including ones that exceeded
// MaxMemberRoutesPerSlot and so are NOT listed in MemberRoutes.
// Snapshot consumers should treat MemberRoutes as the visible
// prefix of this list when MemberRoutesTotal > len(MemberRoutes).
MemberRoutesTotal uint64
// RaftGroupID + LeaderTerm carry the route's Raft identity at the
// time the column was flushed. Stamped from the per-group term
// snapshot SetLeaderTerm publishes (Phase 2-C+ fan-out merge
// uses (RouteID/BucketID, RaftGroupID, LeaderTerm, columnAt) as
// the dedupe key). Zero values mean "term not tracked"; they are
// emitted when no SetLeaderTerm call has been made for the group
// yet, and for synthetic virtual-bucket slots that span groups.
RaftGroupID uint64
LeaderTerm uint64
Reads uint64
Writes uint64
ReadBytes uint64
WriteBytes uint64
}
MatrixRow is the counter snapshot of a single route or virtual bucket, taken at flush time.
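A consumer rendering aggregate rows has to respect the visible-prefix rule from the MemberRoutesTotal comment. A minimal sketch, using a trimmed copy of the struct; row and describeMembers are hypothetical names:

```go
package main

import "fmt"

// row carries only the MatrixRow fields this sketch needs.
type row struct {
	Aggregate         bool
	MemberRoutes      []uint64
	MemberRoutesTotal uint64
}

// describeMembers renders an aggregate row's membership, treating
// MemberRoutes as a visible prefix when MemberRoutesTotal is larger.
func describeMembers(r row) string {
	if !r.Aggregate {
		return "single route"
	}
	shown := uint64(len(r.MemberRoutes))
	if r.MemberRoutesTotal > shown {
		return fmt.Sprintf("routes %v and %d more", r.MemberRoutes, r.MemberRoutesTotal-shown)
	}
	return fmt.Sprintf("routes %v", r.MemberRoutes)
}
```

Comparing before subtracting avoids an unsigned underflow if a row ever reported fewer total members than it lists.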
type MemSampler
type MemSampler struct {
// contains filtered or unexported fields
}
MemSampler is the in-process Sampler implementation. The zero value is not usable — construct via NewMemSampler.
func NewMemSampler
func NewMemSampler(opts MemSamplerOptions) *MemSampler
NewMemSampler constructs a sampler with the supplied options. Zero fields fall back to the Default* constants; remaining non-positive values (such as an explicitly negative HistoryColumns) are clamped to a safe minimum by newRingBuffer. It always returns a usable sampler; pass a zero options struct for the default configuration.
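The HistoryColumns rules can be summarized as a small normalization function. This is a sketch under assumptions: the floor of 1 is a guess (the text only says "a safe minimum"), and effectiveHistoryColumns is a hypothetical name; the HistoryColumns method reports the value the real sampler actually settled on.

```go
package main

const (
	defaultHistoryColumns = 1440    // DefaultHistoryColumns
	maxHistoryColumns     = 100_000 // MaxHistoryColumns
	// minHistoryColumns is an ASSUMED floor; the real minimum applied by
	// newRingBuffer is not documented here.
	minHistoryColumns = 1
)

// effectiveHistoryColumns applies the rules described above: zero takes
// the default, negatives are raised to a floor, large values are capped.
func effectiveHistoryColumns(n int) int {
	switch {
	case n == 0:
		return defaultHistoryColumns
	case n < minHistoryColumns:
		return minHistoryColumns
	case n > maxHistoryColumns:
		return maxHistoryColumns
	default:
		return n
	}
}
```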
func (*MemSampler) Flush
func (s *MemSampler) Flush()
Flush drains every slot's counters with atomic.SwapUint64 and appends one MatrixColumn to the ring buffer. Idle slots (all counters zero) are skipped to keep the column compact. Slots that RemoveRoute retired since the previous Flush are drained alongside the live table, so route churn does not silently lose counts.
Rows are emitted in Start-key order regardless of which slot list they came from (live, retired, or virtual-member-pruned), preserving the API contract that matrix consumers can rely on Start increasing monotonically within every column.
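The drain-and-order step can be sketched as follows, assuming a single write counter per slot and plain slices standing in for the live table and the retired queue (the virtual-member-pruned list is left out). slot, drainedRow and drainColumn are hypothetical names.

```go
package main

import (
	"bytes"
	"sort"
	"sync/atomic"
)

type slot struct {
	routeID uint64
	start   []byte
	writes  uint64 // incremented with atomic.AddUint64 on the hot path
}

type drainedRow struct {
	RouteID uint64
	Start   []byte
	Writes  uint64
}

// drainColumn swaps every counter to zero, skips idle slots, and sorts
// the rows by Start so consumers see one key order regardless of which
// list (live or retired) a slot came from.
func drainColumn(live, retired []*slot) []drainedRow {
	var rows []drainedRow
	for _, list := range [][]*slot{live, retired} {
		for _, sl := range list {
			w := atomic.SwapUint64(&sl.writes, 0)
			if w == 0 {
				continue // idle slot: omitted to keep the column compact
			}
			rows = append(rows, drainedRow{RouteID: sl.routeID, Start: sl.start, Writes: w})
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		return bytes.Compare(rows[i].Start, rows[j].Start) < 0
	})
	return rows
}
```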
func (*MemSampler) HistoryColumns
func (s *MemSampler) HistoryColumns() int
HistoryColumns returns the configured ring-buffer length after applying defaults and the MaxHistoryColumns clamp. Wiring tests use this to verify --keyvizHistoryColumns is forwarded end-to-end without exposing the internal opts struct.
func (*MemSampler) Observe
func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int)
Observe records one request against a route. Cost on a hit: atomic.Pointer.Load + plain map lookup + up to 2× atomic.AddUint64 (the count, plus bytes when keyLen or valueLen is non-zero). Misses (a RouteID that is not registered) are dropped silently; the route-watch subscriber is responsible for calling RegisterRoute before the coordinator publishes the new RouteID.
func (*MemSampler) RegisterRoute
func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool
RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking set. groupID is the Raft group the route belongs to; it is stamped onto every MatrixRow this slot eventually emits so the Phase 2-C+ fan-out merge can dedupe write samples by (groupID, leaderTerm). Pass 0 when the deployment doesn't use multiple groups (legacy / single-group setups); the legacy max-merge fallback handles those.
Returns true when the route gets its own slot, and false when the MaxTrackedRoutes cap was hit and the route was folded into a virtual aggregate bucket. Idempotent: calling it twice with the same RouteID is a no-op. The original slot stays in place and a different groupID on the second call is silently ignored; RegisterRoute is not the API for retagging a slot, RemoveRoute followed by RegisterRoute is.
If a previous RemoveRoute(routeID) queued a deferred member-prune and the route is re-registered into the SAME bucket inside the grace window, that prune is cancelled; otherwise the routeID would disappear from bucket.MemberRoutes even though Observe still attributes fresh traffic to it. Prunes for a different bucket (or when the route rejoins as an individual slot) are left alone so that the old bucket's MemberRoutes is cleaned up correctly.
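The cancellation rule reduces to a comparison on the bucket a pending prune targets. A sketch, assuming pending prunes live in a map keyed by routeID; pruneQueue, onRemove and onRegister are hypothetical names, and the grace window and locking are left out.

```go
package main

// pruneQueue models the deferred member-prunes that RemoveRoute queues
// for virtual-bucket members: routeID -> bucket the member should leave.
type pruneQueue map[uint64]uint64

// onRemove queues a prune of routeID from bucket for the next Flush.
func (q pruneQueue) onRemove(routeID, bucket uint64) { q[routeID] = bucket }

// onRegister cancels a pending prune only when the route rejoins the
// same bucket. individual reports whether the route got its own slot;
// in that case, or for a different bucket, the prune must still run so
// the old bucket's member list is cleaned up.
func (q pruneQueue) onRegister(routeID, bucket uint64, individual bool) {
	if b, ok := q[routeID]; ok && !individual && b == bucket {
		delete(q, routeID)
	}
}
```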
func (*MemSampler) RemoveRoute
func (s *MemSampler) RemoveRoute(routeID uint64)
RemoveRoute drops a RouteID from tracking. Counts accumulated since the last flush are NOT lost: the retired slot (or, for virtual-bucket members, just the membership entry) is queued for one final drain by the next Flush. Subsequent Observe(routeID, …) calls are silent no-ops. Idempotent.
func (*MemSampler) SetLeaderTerm
func (s *MemSampler) SetLeaderTerm(groupID, term uint64)
SetLeaderTerm publishes the current Raft leader term for the given group. main.go calls it on a periodic ticker that polls the engine Status, and on every term change observed via the engine. Phase 2-C+ stamps each MatrixRow's LeaderTerm from this snapshot at Flush time.
Calling with term == 0 is allowed but is treated as "term unknown" during merge: the canonical (groupID, term) dedupe key collapses to the legacy max-merge for cells whose LeaderTerm is 0. This lets nodes that have not finished engine startup contribute partial data without poisoning the merge.
groupID == 0 is reserved for virtual aggregate buckets (which span multiple real groups). Calls with groupID == 0 are silently ignored so that a future caller cannot accidentally stamp a non-zero term on aggregate rows; those rows must keep LeaderTerm == 0 so the fan-out merge falls back to max-merge for cross-group cells.
SetLeaderTerm is nil-receiver-safe.
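A sketch of the three documented guarantees (nil-receiver safety, ignoring groupID 0, and a per-group snapshot that Flush can read). The copy-on-write map mirrors the route-table pattern but is an assumption, since the real storage is not documented; termSampler and termFor are hypothetical names.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// termSampler keeps only the per-group term snapshot.
type termSampler struct {
	mu    sync.Mutex // serializes writers
	terms atomic.Pointer[map[uint64]uint64] // groupID -> leader term
}

func (s *termSampler) SetLeaderTerm(groupID, term uint64) {
	if s == nil || groupID == 0 {
		return // nil-receiver-safe; group 0 is reserved for aggregate buckets
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	next := map[uint64]uint64{}
	if old := s.terms.Load(); old != nil {
		for g, t := range *old {
			next[g] = t
		}
	}
	next[groupID] = term
	s.terms.Store(&next)
}

// termFor is what a flush would stamp on a row; 0 means "term not tracked".
func (s *termSampler) termFor(groupID uint64) uint64 {
	if m := s.terms.Load(); m != nil {
		return (*m)[groupID]
	}
	return 0
}
```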
func (*MemSampler) Snapshot
func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn
Snapshot returns the matrix columns in the half-open time range [from, to), oldest first. The caller's Series choice selects which MatrixRow counter to display; because every row carries all four counters plus the slot metadata (RouteID, Start, End, Aggregate, MemberRoutes), a UI can render the same response under a different Series without re-querying.
func (*MemSampler) Step
func (s *MemSampler) Step() time.Duration
Step returns the configured flush interval after applying default fallbacks. Callers wiring up RunFlusher can pass Step() as its step argument, which keeps the flusher aligned with the sampler instead of threading the interval through two configuration paths.
type MemSamplerOptions
type MemSamplerOptions struct {
// Step is the flush interval, and therefore the ring buffer's resolution.
Step time.Duration
// HistoryColumns caps the ring buffer length. Older columns are
// dropped on push when the buffer is full.
HistoryColumns int
// MaxTrackedRoutes caps the number of routes whose counters are
// kept individually before coarsening kicks in. RegisterRoute
// returns false past this cap, the route ID maps into a virtual
// bucket, and Snapshot reports it with Aggregate=true.
MaxTrackedRoutes int
// MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single
// virtual bucket records in MemberRoutes. Beyond this cap the
// route still folds into the bucket counters (so traffic is not
// dropped) but the routeID is not appended, which keeps per-column
// payload size bounded when total routes far exceed
// MaxTrackedRoutes. Snapshot consumers should treat the list as
// "first N members" rather than authoritative attribution.
MaxMemberRoutesPerSlot int
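The MaxMemberRoutesPerSlot cap amounts to listing only the first N members while still counting every one. A sketch, assuming each routeID is added once (RegisterRoute is idempotent) and leaving the shared traffic counters out; bucket and addMember are hypothetical names.

```go
package main

// bucket holds the membership bookkeeping of one virtual aggregate slot.
type bucket struct {
	memberRoutes      []uint64
	memberRoutesTotal uint64
}

// addMember folds routeID into the bucket: the total always grows, but
// the ID is listed only while fewer than maxMembers are recorded.
func (b *bucket) addMember(routeID uint64, maxMembers int) {
	b.memberRoutesTotal++
	if len(b.memberRoutes) < maxMembers {
		b.memberRoutes = append(b.memberRoutes, routeID)
	}
}
```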
// Now overrides time.Now for tests; nil falls back to time.Now.
Now func() time.Time
}
MemSamplerOptions configures NewMemSampler. Zero values fall back to the Default* constants above; passing a struct literal with only the fields you care about is the expected call style.
type Sampler
type Sampler interface {
// Observe records a single request against a route. Op identifies
// the counter family. keyLen and valueLen are summed into the
// matching *Bytes counter; pass 0 for read-only ops where the
// payload size is irrelevant. Implementations must no-op (not
// panic) when invoked on a typed-nil receiver.
Observe(routeID uint64, op Op, keyLen, valueLen int)
}
Sampler is the narrow interface the coordinator depends on. The nil-safe contract is documented per method so that a coordinator wired without a sampler makes harmless no-op calls.
Implementations MUST be nil-receiver-safe: a typed-nil implementation passed through this interface (e.g. `var s Sampler = (*MemSampler)(nil)`) must not panic when its methods are called. The coordinator stores the interface value as supplied and dispatches through it on the hot path; a guard at the call site only checks for an interface-nil, not a typed-nil.
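The typed-nil pitfall is easy to reproduce. In this sketch, Op is assumed to be a small integer enum, and memSampler and observeVia are hypothetical stand-ins for an implementation and a coordinator call site.

```go
package main

// Op is ASSUMED to be a small integer enum for this sketch.
type Op uint8

// Sampler mirrors the interface documented above.
type Sampler interface {
	Observe(routeID uint64, op Op, keyLen, valueLen int)
}

// memSampler shows the nil guard an implementation needs.
type memSampler struct{ observed int }

func (s *memSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
	if s == nil {
		return // typed-nil receiver: no-op instead of a nil dereference
	}
	s.observed++
}

// observeVia models a call site: its interface-nil check does not catch
// a typed nil, so the method itself must tolerate a nil receiver.
func observeVia(s Sampler, routeID uint64) bool {
	if s == nil {
		return false
	}
	s.Observe(routeID, 0, 0, 0)
	return true
}
```

An interface value holding (*memSampler)(nil) has a non-nil dynamic type, so it compares unequal to nil and the call goes through; without the receiver guard, s.observed++ would panic.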