package keyviz

v0.0.0-...-65e725a
Warning

This package is not in the latest version of its module.

Published: Apr 29, 2026 License: AGPL-3.0 Imports: 6 Imported by: 0

Documentation

Overview

Package keyviz implements the in-memory sampler that backs the heatmap described in docs/admin_ui_key_visualizer_design.md.

The sampler sits on the data-plane hot path and counts requests per Raft route. The contract for callers (today, kv.ShardedCoordinator):

  • Construct a MemSampler with NewMemSampler.
  • Wire it through the coordinator with a constructor option; the coordinator calls Sampler.Observe at the dispatch entry, after resolving the RouteID.
  • Call Flush once per Step interval from a background goroutine; RunFlusher does this for you, driven by the supplied ctx and step.
  • Read the rendered matrix via Snapshot for the Admin gRPC service.

Hot-path properties (see design §5.1, §10):

  • Observe is a single atomic.Pointer[routeTable].Load, a plain map lookup against an immutable snapshot, and at most two atomic.AddUint64 calls (one for the count, one for bytes — skipped when both keyLen and valueLen are zero). No allocation, no mutex.
  • Flush drains the per-route counters in place with atomic.SwapUint64 instead of retiring slot pointers, so a late writer cannot race past the snapshot and lose counts.
  • Adding / removing routes (RegisterRoute, RemoveRoute today; ApplySplit / ApplyMerge in a future PR) builds a fresh routeTable copy under a non-hot-path mutex and publishes it with a single atomic.Pointer.Store. Routes mutated mid-step keep their counters in the new table by design.
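The hot-path properties above can be sketched in a few dozen lines. This is a simplified, hypothetical model rather than the package's code: `sampler`, `routeTable`, `slot`, `register`, `observe` and `drain` are illustrative names, and only a read-count/read-bytes pair is modeled. Writers copy the map under a mutex and publish it with one atomic store; readers never lock.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// slot holds one route's counters. Both fields sit at 8-byte offsets from
// the start of an allocated struct, which satisfies the 64-bit alignment
// atomic.AddUint64 needs on 32-bit platforms.
type slot struct {
	reads     uint64
	readBytes uint64
}

// routeTable is an immutable snapshot: it is never mutated after publish.
type routeTable struct {
	slots map[uint64]*slot
}

type sampler struct {
	table atomic.Pointer[routeTable]
	mu    sync.Mutex // serializes register; never taken by observe
}

func newSampler() *sampler {
	s := &sampler{}
	s.table.Store(&routeTable{slots: map[uint64]*slot{}})
	return s
}

// register copies the current table, adds the route, and publishes the copy
// with a single atomic store. Existing *slot pointers are carried over, so
// counts recorded before the swap are not lost.
func (s *sampler) register(routeID uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	old := s.table.Load()
	if _, ok := old.slots[routeID]; ok {
		return // idempotent
	}
	next := make(map[uint64]*slot, len(old.slots)+1)
	for id, sl := range old.slots {
		next[id] = sl
	}
	next[routeID] = &slot{}
	s.table.Store(&routeTable{slots: next})
}

// observe: one atomic load, one map lookup, at most two atomic adds.
func (s *sampler) observe(routeID uint64, keyLen, valueLen int) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return // unregistered route: dropped silently
	}
	atomic.AddUint64(&sl.reads, 1)
	if n := keyLen + valueLen; n > 0 {
		atomic.AddUint64(&sl.readBytes, uint64(n))
	}
}

// drain swaps each counter to zero and returns what had accumulated.
func (s *sampler) drain(routeID uint64) (reads, bytes uint64) {
	sl, ok := s.table.Load().slots[routeID]
	if !ok {
		return 0, 0
	}
	return atomic.SwapUint64(&sl.reads, 0), atomic.SwapUint64(&sl.readBytes, 0)
}

func main() {
	s := newSampler()
	s.observe(7, 3, 0) // dropped: route 7 is not registered yet
	s.register(7)
	s.observe(7, 3, 5)
	s.observe(7, 0, 0)      // count only; the bytes add is skipped
	s.register(9)           // republishes the table; route 7 keeps its slot
	fmt.Println(s.drain(7)) // 2 8
	fmt.Println(s.drain(7)) // 0 0
}
```

Because `register` carries the existing `*slot` pointers into the new map, a route's counters survive every republish, which is the property the last bullet relies on.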


Constants

const (
	DefaultStep                   = 60 * time.Second
	DefaultHistoryColumns         = 1440 // 24 hours at 60s steps.
	DefaultMaxTrackedRoutes       = 10_000
	DefaultMaxMemberRoutesPerSlot = 256
)

Defaults for MemSamplerOptions when fields are left zero.

const MaxHistoryColumns = 100_000

MaxHistoryColumns is the upper bound on opts.HistoryColumns. The ring buffer pre-allocates a slice of capacity HistoryColumns at construction, so without this cap a misconfiguration (e.g. an operator typo of 100_000_000) would reserve gigabytes up front. At the default 60s Step, 100_000 columns hold about 70 days of history; longer retention is the job of the Phase 3 persistence path, not the in-memory ring.

Variables

This section is empty.

Functions

func RunFlusher

func RunFlusher(ctx context.Context, s *MemSampler, step time.Duration)

RunFlusher calls MemSampler.Flush at the supplied interval until ctx is cancelled, then returns. No final flush runs on cancellation, so a graceful shutdown that wants to harvest the in-progress step should call Flush once more after RunFlusher returns.

step <= 0 falls back to DefaultStep.

RunFlusher is a thin wrapper so that call sites in main.go need not spell out the ticker boilerplate; that boilerplate is then covered once by this package's unit tests rather than by every caller.

Types

type MatrixColumn

type MatrixColumn struct {
	At   time.Time
	Rows []MatrixRow
}

MatrixColumn is one slice of the heatmap at a single flush time.

type MatrixRow

type MatrixRow struct {
	RouteID      uint64
	Start, End   []byte
	Aggregate    bool
	MemberRoutes []uint64
	// MemberRoutesTotal is how many distinct route IDs contributed to
	// this row's counters, including ones that exceeded
	// MaxMemberRoutesPerSlot and so are NOT listed in MemberRoutes.
	// Snapshot consumers should treat MemberRoutes as the visible
	// prefix of this list when MemberRoutesTotal > len(MemberRoutes).
	MemberRoutesTotal uint64

	// RaftGroupID + LeaderTerm carry the route's Raft identity at the
	// time the column was flushed. Stamped from the per-group term
	// snapshot SetLeaderTerm publishes (Phase 2-C+ fan-out merge
	// uses (RouteID/BucketID, RaftGroupID, LeaderTerm, columnAt) as
	// the dedupe key). Zero values mean "term not tracked" — emitted
	// when no SetLeaderTerm call has been made for the group yet, or
	// for synthetic virtual-bucket slots that span groups.
	RaftGroupID uint64
	LeaderTerm  uint64

	Reads      uint64
	Writes     uint64
	ReadBytes  uint64
	WriteBytes uint64
}

MatrixRow is a single route or virtual bucket's counter snapshot taken at flush time.
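The relationship between MemberRoutes and MemberRoutesTotal can be illustrated with a toy bucket. This is an assumption-laden sketch, not the package's implementation: `bucket` and `addMember` are hypothetical, and the caller is assumed to deduplicate route IDs (RegisterRoute is documented as idempotent).

```go
package main

import "fmt"

// bucket is a hypothetical virtual aggregate slot. Every folded route is
// counted, but only the first maxMembers IDs are listed.
type bucket struct {
	maxMembers int
	members    []uint64 // visible prefix, at most maxMembers long
	total      uint64   // every distinct route folded in
}

// addMember is called once per distinct route ID (the caller deduplicates).
func (b *bucket) addMember(routeID uint64) {
	b.total++
	if len(b.members) < b.maxMembers {
		b.members = append(b.members, routeID)
	}
}

func main() {
	b := &bucket{maxMembers: 2}
	for id := uint64(10); id < 15; id++ {
		b.addMember(id)
	}
	// A consumer detects truncation by comparing the two fields.
	truncated := b.total > uint64(len(b.members))
	fmt.Println(b.members, b.total, truncated) // [10 11] 5 true
}
```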

type MemSampler

type MemSampler struct {
	// contains filtered or unexported fields
}

MemSampler is the in-process Sampler implementation. The zero value is not usable — construct via NewMemSampler.

func NewMemSampler

func NewMemSampler(opts MemSamplerOptions) *MemSampler

NewMemSampler constructs a sampler with the supplied options. Zero fields fall back to the Default* constants, and HistoryColumns values above MaxHistoryColumns are clamped to it; any value that is still non-positive after defaulting (e.g. an explicitly negative HistoryColumns) is clamped to a safe minimum by the internal newRingBuffer. It always returns a usable sampler; callers should pass a zero options struct for the default configuration.
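The defaulting rules can be sketched as a normalization pass over a copy of the options. `options` and `withDefaults` are hypothetical names; the zero-to-default and MaxHistoryColumns rules come from this page, while the minimum of 1 for a negative HistoryColumns is an assumption, since the actual "safe minimum" is not documented here.

```go
package main

import (
	"fmt"
	"time"
)

const (
	DefaultStep                   = 60 * time.Second
	DefaultHistoryColumns         = 1440
	DefaultMaxTrackedRoutes       = 10_000
	DefaultMaxMemberRoutesPerSlot = 256
	MaxHistoryColumns             = 100_000
)

// options mirrors the exported fields of MemSamplerOptions.
type options struct {
	Step                   time.Duration
	HistoryColumns         int
	MaxTrackedRoutes       int
	MaxMemberRoutesPerSlot int
	Now                    func() time.Time
}

// withDefaults fills zero fields from the Default* constants and clamps
// HistoryColumns into [1, MaxHistoryColumns]. The lower bound of 1 is an
// assumption made for this sketch.
func withDefaults(o options) options {
	if o.Step == 0 {
		o.Step = DefaultStep
	}
	switch {
	case o.HistoryColumns == 0:
		o.HistoryColumns = DefaultHistoryColumns
	case o.HistoryColumns < 0:
		o.HistoryColumns = 1
	case o.HistoryColumns > MaxHistoryColumns:
		o.HistoryColumns = MaxHistoryColumns
	}
	if o.MaxTrackedRoutes == 0 {
		o.MaxTrackedRoutes = DefaultMaxTrackedRoutes
	}
	if o.MaxMemberRoutesPerSlot == 0 {
		o.MaxMemberRoutesPerSlot = DefaultMaxMemberRoutesPerSlot
	}
	if o.Now == nil {
		o.Now = time.Now
	}
	return o
}

func main() {
	// Struct-literal call style: set only the field you care about.
	o := withDefaults(options{HistoryColumns: 100_000_000})
	fmt.Println(o.Step, o.HistoryColumns, o.MaxTrackedRoutes) // 1m0s 100000 10000
}
```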

func (*MemSampler) Flush

func (s *MemSampler) Flush()

Flush drains every slot's counters with atomic.SwapUint64 and appends one MatrixColumn to the ring buffer. Idle slots (all counters zero) are skipped to keep the column compact. Slots that RemoveRoute retired since the previous Flush are drained alongside the live table, so route churn does not silently lose counts.

Rows are emitted in Start-key order regardless of which slot list they came from (live, retired, or virtual-member-pruned), so matrix consumers can rely on Start increasing monotonically within every column.
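The drain-and-sort step can be sketched as follows. The `slot` and `row` types and the `flushRows` function are hypothetical simplifications (two counters, no virtual buckets); only the use of atomic.SwapUint64, the idle-slot skip, and the Start-key ordering come from the description above.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
	"sync/atomic"
)

type slot struct {
	reads   uint64 // accessed atomically
	writes  uint64 // accessed atomically
	routeID uint64
	start   []byte
}

type row struct {
	RouteID       uint64
	Start         []byte
	Reads, Writes uint64
}

// flushRows drains every slot in live and retired, drops idle ones, and
// returns the rows sorted by Start regardless of which list they came from.
func flushRows(live, retired []*slot) []row {
	rows := make([]row, 0, len(live)+len(retired))
	for _, list := range [][]*slot{live, retired} {
		for _, s := range list {
			r := atomic.SwapUint64(&s.reads, 0)
			w := atomic.SwapUint64(&s.writes, 0)
			if r == 0 && w == 0 {
				continue // idle slot: keep the column compact
			}
			rows = append(rows, row{RouteID: s.routeID, Start: s.start, Reads: r, Writes: w})
		}
	}
	sort.Slice(rows, func(i, j int) bool {
		return bytes.Compare(rows[i].Start, rows[j].Start) < 0
	})
	return rows
}

func main() {
	live := []*slot{
		{routeID: 2, start: []byte("m"), reads: 4},
		{routeID: 3, start: []byte("t")}, // idle: skipped
	}
	retired := []*slot{{routeID: 1, start: []byte("a"), writes: 1}} // removed since last flush
	for _, r := range flushRows(live, retired) {
		fmt.Printf("route %d start=%q reads=%d writes=%d\n", r.RouteID, r.Start, r.Reads, r.Writes)
	}
}
```

In this sketch the caller would discard the retired list after one call, matching the "one final drain" that RemoveRoute promises.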

func (*MemSampler) HistoryColumns

func (s *MemSampler) HistoryColumns() int

HistoryColumns returns the configured ring-buffer length after applying defaults and the MaxHistoryColumns clamp. Wiring tests use this to verify --keyvizHistoryColumns is forwarded end-to-end without exposing the internal opts struct.

func (*MemSampler) Observe

func (s *MemSampler) Observe(routeID uint64, op Op, keyLen, valueLen int)

Observe records one request against a route. Cost on a hit: one atomic.Pointer.Load, one plain map lookup, and at most two atomic.AddUint64 calls (count and bytes; the bytes add is skipped when keyLen and valueLen are both zero). Misses (a RouteID that was never registered) are dropped silently; the route-watch subscriber is responsible for calling RegisterRoute before the coordinator publishes the new RouteID.

func (*MemSampler) RegisterRoute

func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte, groupID uint64) bool

RegisterRoute adds a (RouteID, [Start, End)) pair to the tracking set. groupID is the Raft group the route belongs to; it is stamped onto every MatrixRow this slot eventually emits so the Phase 2-C+ fan-out merge can dedupe write samples by (groupID, leaderTerm). Pass 0 when the deployment doesn't use multiple groups (legacy / single-group setups); the legacy max-merge fallback handles those.

Returns true when the route gets its own slot, false when the MaxTrackedRoutes cap was hit and the route was folded into a virtual aggregate bucket. Idempotent: calling it twice with the same RouteID is a no-op. The original slot stays in place and a different groupID on the second call is silently ignored; RegisterRoute is not the API for retagging a slot, so call RemoveRoute followed by RegisterRoute instead.

If a previous RemoveRoute(routeID) queued a deferred member-prune and the route is now re-registered into the SAME bucket inside the grace window, that prune is cancelled — otherwise the routeID would disappear from bucket.MemberRoutes despite Observe still attributing fresh traffic to it. Prunes for different buckets (or when the route rejoins as an individual slot) are left alone so the old bucket's MemberRoutes is correctly cleaned up.

func (*MemSampler) RemoveRoute

func (s *MemSampler) RemoveRoute(routeID uint64)

RemoveRoute drops a RouteID from tracking. Counts accumulated since the last flush are NOT lost: the retired slot (or, for virtual-bucket members, just the membership entry) is queued for one final drain by the next Flush. Subsequent Observe(routeID, …) calls are silent no-ops. Idempotent.

func (*MemSampler) SetLeaderTerm

func (s *MemSampler) SetLeaderTerm(groupID, term uint64)

SetLeaderTerm publishes the current Raft leader term for the given group. Called by main.go on a periodic ticker that polls the engine Status, and on every term-change observed via the engine. Phase 2-C+ stamps each MatrixRow's LeaderTerm from this snapshot at Flush time.

Calling with term == 0 is allowed but is treated as "term unknown" during merge: for cells whose LeaderTerm is 0, the merge falls back from the canonical (groupID, term) dedupe key to the legacy max-merge. This lets nodes that have not finished engine startup contribute partial data without poisoning the merge.

groupID == 0 is reserved for virtual aggregate buckets (which span multiple real groups). Calls with groupID == 0 are silently ignored so a future caller cannot accidentally stamp a non-zero term on aggregate rows — they must remain LeaderTerm == 0 so the fan-out merge falls back to max-merge for cross-group cells.

SetLeaderTerm is nil-receiver-safe.
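A hedged sketch of a per-group term snapshot that honors the three rules above (groupID 0 ignored, term 0 meaning "unknown", nil-receiver safety). `termTracker` and `termFor` are hypothetical; using a copy-on-write map published through atomic.Pointer mirrors the routeTable pattern from the overview, but is an assumption for the term snapshot.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type termTracker struct {
	mu    sync.Mutex                        // serializes writers
	terms atomic.Pointer[map[uint64]uint64] // immutable snapshot read at flush time
}

// SetLeaderTerm publishes a new snapshot containing the group's term.
func (t *termTracker) SetLeaderTerm(groupID, term uint64) {
	if t == nil || groupID == 0 {
		return // nil-receiver-safe; group 0 is reserved for virtual buckets
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	next := map[uint64]uint64{}
	if cur := t.terms.Load(); cur != nil {
		for g, v := range *cur {
			next[g] = v
		}
	}
	next[groupID] = term
	t.terms.Store(&next)
}

// termFor returns the published term, or 0 ("term not tracked").
func (t *termTracker) termFor(groupID uint64) uint64 {
	if t == nil {
		return 0
	}
	cur := t.terms.Load()
	if cur == nil {
		return 0
	}
	return (*cur)[groupID]
}

func main() {
	var t termTracker
	t.SetLeaderTerm(3, 12)
	t.SetLeaderTerm(0, 99) // ignored: aggregate rows must stay at LeaderTerm 0
	fmt.Println(t.termFor(3), t.termFor(0), t.termFor(4)) // 12 0 0

	var nilTracker *termTracker
	nilTracker.SetLeaderTerm(1, 1) // no panic
}
```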

func (*MemSampler) Snapshot

func (s *MemSampler) Snapshot(from, to time.Time) []MatrixColumn

Snapshot returns the matrix columns whose flush time falls in the half-open range [from, to), ordered oldest first. Every row carries all four counters (Reads, Writes, ReadBytes, WriteBytes) plus the slot metadata (RouteID, Start, End, Aggregate, MemberRoutes), so the caller's Series choice only decides which counter to display, and a UI can render the same response with a different series without a re-query.

func (*MemSampler) Step

func (s *MemSampler) Step() time.Duration

Step returns the configured flush interval after applying default fallbacks. Callers wiring up RunFlusher can use this to align their ticker with the sampler's expectations rather than passing the interval through two configuration paths.

type MemSamplerOptions

type MemSamplerOptions struct {
	// Step is the flush interval. The ring buffer's resolution.
	Step time.Duration
	// HistoryColumns caps the ring buffer length. Older columns are
	// dropped on push when the buffer is full.
	HistoryColumns int
	// MaxTrackedRoutes caps the number of routes whose counters are
	// kept individually before coarsening kicks in. RegisterRoute
	// returns false past this cap, the route ID maps into a virtual
	// bucket, and Snapshot reports it with Aggregate=true.
	MaxTrackedRoutes int
	// MaxMemberRoutesPerSlot caps how many distinct RouteIDs a single
	// virtual bucket records in MemberRoutes. Beyond this cap the
	// route still folds into the bucket counters (so traffic is not
	// dropped) but the routeID is not appended — keeping per-column
	// payload size bounded when total routes far exceed
	// MaxTrackedRoutes. Snapshot consumers should treat the list as
	// "first N members" rather than authoritative attribution.
	MaxMemberRoutesPerSlot int
	// Now overrides time.Now for tests; nil falls back to time.Now.
	Now func() time.Time
}

MemSamplerOptions configures NewMemSampler. Zero values fall back to the Default* constants above; passing a struct literal with only the fields you care about is the expected call style.

type Op

type Op uint8

Op identifies which counter family Observe should bump.

const (
	OpRead Op = iota
	OpWrite
)

type Sampler

type Sampler interface {
	// Observe records a single request against a route. Op identifies
	// the counter family. keyLen and valueLen are summed into the
	// matching *Bytes counter; pass 0 for read-only ops where the
	// payload size is irrelevant. Implementations must no-op (not
	// panic) when invoked on a typed-nil receiver.
	Observe(routeID uint64, op Op, keyLen, valueLen int)
}

Sampler is the narrow interface the coordinator depends on. The nil-safety contract is documented per method so that a coordinator wired without a sampler turns every Observe call into a no-op.

Implementations MUST be nil-receiver-safe: a typed-nil implementation passed through this interface (e.g. `var s Sampler = (*MemSampler)(nil)`) must not panic when its methods are called. The coordinator stores the interface value as supplied and dispatches through it on the hot path; a guard at the call site only checks for an interface-nil, not a typed-nil.
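The typed-nil pitfall is easy to reproduce. In this standalone sketch `Sampler` and `Op` mirror the declarations on this page, while `countingSampler` and `coordinator` are hypothetical stand-ins.

```go
package main

import "fmt"

type Op uint8

const (
	OpRead Op = iota
	OpWrite
)

// Sampler mirrors the package's narrow interface.
type Sampler interface {
	Observe(routeID uint64, op Op, keyLen, valueLen int)
}

// countingSampler is a hypothetical implementation used only here.
type countingSampler struct{ n int }

// Observe is nil-receiver-safe: a typed-nil *countingSampler is a no-op.
func (c *countingSampler) Observe(routeID uint64, op Op, keyLen, valueLen int) {
	if c == nil {
		return
	}
	c.n++
}

type coordinator struct{ sampler Sampler }

func (co *coordinator) dispatch(routeID uint64) {
	if co.sampler != nil { // catches only an interface-nil
		co.sampler.Observe(routeID, OpRead, 0, 0)
	}
}

func main() {
	var typedNil *countingSampler
	co := &coordinator{sampler: typedNil}
	fmt.Println(co.sampler != nil) // true: the interface holds (*countingSampler)(nil)
	co.dispatch(1)                 // guard passes; Observe's nil check prevents a panic
}
```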

type Series

type Series uint8

Series selects which counter the matrix response should expose.

const (
	SeriesReads Series = iota
	SeriesWrites
	SeriesReadBytes
	SeriesWriteBytes
)
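Because every MatrixRow carries all four counters, choosing a series can happen entirely on the client. A hypothetical helper (`value` is not part of the package, and `row` keeps only the counter fields of MatrixRow) might look like:

```go
package main

import "fmt"

type Series uint8

const (
	SeriesReads Series = iota
	SeriesWrites
	SeriesReadBytes
	SeriesWriteBytes
)

// row carries the four counters of MatrixRow (other fields omitted).
type row struct {
	Reads, Writes, ReadBytes, WriteBytes uint64
}

// value picks the counter a heatmap cell displays for the chosen series;
// ok is false for an unknown Series value.
func value(r row, s Series) (v uint64, ok bool) {
	switch s {
	case SeriesReads:
		return r.Reads, true
	case SeriesWrites:
		return r.Writes, true
	case SeriesReadBytes:
		return r.ReadBytes, true
	case SeriesWriteBytes:
		return r.WriteBytes, true
	}
	return 0, false
}

func main() {
	r := row{Reads: 3, Writes: 1, ReadBytes: 96, WriteBytes: 40}
	for _, s := range []Series{SeriesReads, SeriesWriteBytes} {
		v, _ := value(r, s)
		fmt.Println(v)
	}
}
```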
