distribution

package
v0.0.0-...-ebdf8eb
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Constants

const DefaultRouteHistoryDepth = 32

DefaultRouteHistoryDepth is the size of Engine's versioned-snapshot ring used by the Composed-1 M2 plumbing. 32 is conservative against current single-leader catalog churn (operator-frequency, not data-plane) per the design doc §9 Q2; raise if a future control plane generates more than ~tens of versions per second.

Variables

var (
	ErrCatalogStoreRequired        = errors.New("catalog store is required")
	ErrCatalogVersionMismatch      = errors.New("catalog version mismatch")
	ErrCatalogVersionOverflow      = errors.New("catalog version overflow")
	ErrCatalogRouteIDOverflow      = errors.New("catalog route id overflow")
	ErrCatalogRouteIDRequired      = errors.New("catalog route id is required")
	ErrCatalogGroupIDRequired      = errors.New("catalog group id is required")
	ErrCatalogDuplicateRouteID     = errors.New("catalog route id must be unique")
	ErrCatalogInvalidRouteRange    = errors.New("catalog route range is invalid")
	ErrCatalogInvalidVersionRecord = errors.New("catalog version record is invalid")
	ErrCatalogInvalidNextRouteID   = errors.New("catalog next route id record is invalid")
	ErrCatalogInvalidRouteRecord   = errors.New("catalog route record is invalid")
	ErrCatalogInvalidRouteState    = errors.New("catalog route state is invalid")
	ErrCatalogInvalidRouteKey      = errors.New("catalog route key is invalid")
	ErrCatalogRouteKeyIDMismatch   = errors.New("catalog route key and record route id mismatch")
)
var (
	ErrEngineSnapshotVersionStale = errors.New("engine snapshot version is stale")
	ErrEngineSnapshotDuplicateID  = errors.New("engine snapshot has duplicate route id")
	ErrEngineSnapshotRouteOverlap = errors.New("engine snapshot has overlapping routes")
	ErrEngineSnapshotRouteOrder   = errors.New("engine snapshot has invalid route order")
)
var ErrEngineRequired = errors.New("engine is required")

Functions

func CatalogNextRouteIDKey

func CatalogNextRouteIDKey() []byte

CatalogNextRouteIDKey returns the reserved key used for next route id storage.

func CatalogRouteIDFromKey

func CatalogRouteIDFromKey(key []byte) (uint64, bool)

CatalogRouteIDFromKey parses the route ID from a catalog route key.

func CatalogRouteKey

func CatalogRouteKey(routeID uint64) []byte

CatalogRouteKey returns the reserved key used for a route descriptor.

func CatalogVersionKey

func CatalogVersionKey() []byte

CatalogVersionKey returns the reserved key used for catalog version storage.

func CloneBytes

func CloneBytes(b []byte) []byte

CloneBytes returns a copied byte slice.

func DecodeCatalogNextRouteID

func DecodeCatalogNextRouteID(raw []byte) (uint64, error)

DecodeCatalogNextRouteID deserializes a next route id record.

func DecodeCatalogVersion

func DecodeCatalogVersion(raw []byte) (uint64, error)

DecodeCatalogVersion deserializes a catalog version record.

func EncodeCatalogNextRouteID

func EncodeCatalogNextRouteID(nextRouteID uint64) []byte

EncodeCatalogNextRouteID serializes a next route id record.

func EncodeCatalogVersion

func EncodeCatalogVersion(version uint64) []byte

EncodeCatalogVersion serializes a catalog version record.

func EncodeRouteDescriptor

func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error)

EncodeRouteDescriptor serializes a route descriptor record.

func IsCatalogRouteKey

func IsCatalogRouteKey(key []byte) bool

IsCatalogRouteKey reports whether key belongs to the route catalog keyspace.
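
CatalogRouteKey, CatalogRouteIDFromKey, and IsCatalogRouteKey together form a round trip between a route ID and its reserved key. The sketch below illustrates that idea only. The prefix value and the 8-byte big-endian suffix are assumptions made for this example, since the documentation does not show the real key layout, and routeKey, routeIDFromKey, and isRouteKey are hypothetical stand-ins for the package functions.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// routePrefix is a hypothetical prefix chosen for this sketch; the package's
// real reserved prefix is not shown in its documentation.
var routePrefix = []byte("!catalog|route|")

// routeKey appends the route ID as 8 big-endian bytes (an assumed layout)
// so that keys sort in route-ID order.
func routeKey(routeID uint64) []byte {
	key := make([]byte, len(routePrefix), len(routePrefix)+8)
	copy(key, routePrefix)
	return binary.BigEndian.AppendUint64(key, routeID)
}

// isRouteKey reports whether key lies in the (assumed) route keyspace.
func isRouteKey(key []byte) bool {
	return bytes.HasPrefix(key, routePrefix)
}

// routeIDFromKey parses the ID back out, rejecting foreign or truncated keys.
func routeIDFromKey(key []byte) (uint64, bool) {
	if !isRouteKey(key) || len(key) != len(routePrefix)+8 {
		return 0, false
	}
	return binary.BigEndian.Uint64(key[len(routePrefix):]), true
}

func main() {
	id, ok := routeIDFromKey(routeKey(42))
	fmt.Println(id, ok) // 42 true

	_, ok = routeIDFromKey([]byte("user-key"))
	fmt.Println(ok) // false
}
```

The boolean second return value mirrors the documented (uint64, bool) signature: a caller scanning a mixed keyspace can skip non-route keys without treating them as errors.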

func NextRouteIDFloor

func NextRouteIDFloor(routes []RouteDescriptor) (uint64, error)

NextRouteIDFloor returns the minimum valid next route ID for routes. It is shared by catalog persistence and split planning to keep route-ID allocation rules consistent.
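
One plausible reading of "minimum valid next route ID" is the largest existing RouteID plus one, with 1 as the lower bound because catalog RouteIDs are non-zero. The sketch below assumes exactly that rule; it is not taken from the package source, and routeDescriptor, nextRouteIDFloor, and errRouteIDOverflow are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// routeDescriptor is a trimmed stand-in that keeps only the field used here.
type routeDescriptor struct {
	RouteID uint64
}

var errRouteIDOverflow = errors.New("route id overflow")

// nextRouteIDFloor returns max(RouteID)+1, or 1 for an empty catalog
// (assumed rule). It refuses to wrap around at math.MaxUint64.
func nextRouteIDFloor(routes []routeDescriptor) (uint64, error) {
	floor := uint64(1)
	for _, r := range routes {
		if r.RouteID == math.MaxUint64 {
			return 0, errRouteIDOverflow
		}
		if next := r.RouteID + 1; next > floor {
			floor = next
		}
	}
	return floor, nil
}

func main() {
	floor, err := nextRouteIDFloor([]routeDescriptor{{RouteID: 3}, {RouteID: 7}})
	fmt.Println(floor, err) // 8 <nil>
}
```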

func RunCatalogWatcher

func RunCatalogWatcher(ctx context.Context, catalog *CatalogStore, engine *Engine, logger *slog.Logger) error

RunCatalogWatcher runs CatalogWatcher with optional logger override.

Types

type CatalogSnapshot

type CatalogSnapshot struct {
	Version uint64
	Routes  []RouteDescriptor
	ReadTS  uint64
}

CatalogSnapshot is a point-in-time snapshot of the route catalog.

func EnsureCatalogSnapshot

func EnsureCatalogSnapshot(ctx context.Context, catalog *CatalogStore, engine *Engine) (CatalogSnapshot, error)

EnsureCatalogSnapshot makes engine and durable catalog consistent at startup.

If the durable catalog is empty (version 0, no routes), the current in-memory engine routes are persisted as the initial catalog snapshot with generated non-zero RouteIDs. Then the resolved snapshot is applied back to engine.

type CatalogStore

type CatalogStore struct {
	// contains filtered or unexported fields
}

CatalogStore provides persistence helpers for route catalog state.

func NewCatalogStore

func NewCatalogStore(st store.MVCCStore) *CatalogStore

NewCatalogStore creates a route catalog persistence helper.

func (*CatalogStore) NextRouteID

func (s *CatalogStore) NextRouteID(ctx context.Context) (uint64, error)

NextRouteID reads the next route id counter from catalog metadata.

func (*CatalogStore) NextRouteIDAt

func (s *CatalogStore) NextRouteIDAt(ctx context.Context, ts uint64) (uint64, error)

NextRouteIDAt reads the next route id counter at a given snapshot timestamp.

func (*CatalogStore) Save

func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error)

Save updates the route catalog using optimistic version checks and bumps the catalog version by exactly one on success.
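
The optimistic check described for Save can be pictured as a compare-and-bump on the version. The following is a minimal in-memory sketch of that pattern, not the MVCC-backed implementation; memCatalog, its save method, and the string route type are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"sync"
)

var (
	errVersionMismatch = errors.New("catalog version mismatch")
	errVersionOverflow = errors.New("catalog version overflow")
)

// memCatalog is a toy stand-in for a durable catalog.
type memCatalog struct {
	mu      sync.Mutex
	version uint64
	routes  []string
}

// save succeeds only if the caller's expected version matches the stored
// one, then bumps the version by exactly one.
func (c *memCatalog) save(expected uint64, routes []string) (uint64, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.version != expected {
		return 0, errVersionMismatch
	}
	if c.version == math.MaxUint64 {
		return 0, errVersionOverflow
	}
	c.version++
	c.routes = append([]string(nil), routes...) // keep a private copy
	return c.version, nil
}

func main() {
	c := &memCatalog{}
	v, err := c.save(0, []string{"[a, m)", "[m, +inf)"})
	fmt.Println(v, err) // 1 <nil>

	// A writer that still believes the version is 0 loses the race.
	_, err = c.save(0, []string{"[a, +inf)"})
	fmt.Println(err) // catalog version mismatch
}
```

A caller that receives the mismatch error would typically re-read the catalog and retry against the new version.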

func (*CatalogStore) Snapshot

func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error)

Snapshot reads a consistent route catalog snapshot at the store's latest known commit timestamp.

func (*CatalogStore) Version

func (s *CatalogStore) Version(ctx context.Context) (uint64, error)

Version reads only the durable catalog version at the latest commit timestamp.

type CatalogWatcher

type CatalogWatcher struct {
	// contains filtered or unexported fields
}

CatalogWatcher periodically refreshes Engine from durable catalog snapshots.

func NewCatalogWatcher

func NewCatalogWatcher(catalog *CatalogStore, engine *Engine, opts ...CatalogWatcherOption) *CatalogWatcher

NewCatalogWatcher creates a watcher that polls the durable route catalog and applies newer snapshots to the in-memory engine.

func (*CatalogWatcher) Run

func (w *CatalogWatcher) Run(ctx context.Context) error

Run starts polling and only returns when ctx is canceled or initialization requirements are not met. Snapshot read/apply failures are retried.
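
The retry-until-canceled behaviour described for Run can be sketched as a ticker loop. This is an illustration under assumptions: poll and runDemo are hypothetical names, the real watcher's logging and its return value on cancellation are not shown in the documentation, and returning ctx.Err() here is a choice made for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// poll calls syncOnce once per interval. Failures are reported and retried
// on the next tick; only context cancellation ends the loop.
func poll(ctx context.Context, interval time.Duration, syncOnce func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := syncOnce(ctx); err != nil {
			fmt.Println("sync failed, will retry:", err)
		}
		select {
		case <-ctx.Done():
		case <-ticker.C:
		}
	}
}

// runDemo drives poll with a sync function that always fails and cancels
// the context on its third call.
func runDemo() (calls int, canceled bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	err := poll(ctx, time.Millisecond, func(context.Context) error {
		calls++
		if calls == 3 {
			cancel()
		}
		return errors.New("transient read failure")
	})
	return calls, errors.Is(err, context.Canceled)
}

func main() {
	calls, canceled := runDemo()
	fmt.Println(calls, canceled)
}
```

Checking ctx.Err() at the top of the loop keeps shutdown deterministic even when the ticker and the cancellation become ready at the same time.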

func (*CatalogWatcher) SyncOnce

func (w *CatalogWatcher) SyncOnce(ctx context.Context) error

SyncOnce applies the latest durable snapshot when its version is newer than the current engine version.

type CatalogWatcherOption

type CatalogWatcherOption func(*CatalogWatcher)

CatalogWatcherOption customizes CatalogWatcher behavior.

func WithCatalogWatcherInterval

func WithCatalogWatcherInterval(interval time.Duration) CatalogWatcherOption

WithCatalogWatcherInterval sets the catalog polling interval.

func WithCatalogWatcherLogger

func WithCatalogWatcherLogger(logger *slog.Logger) CatalogWatcherOption

WithCatalogWatcherLogger sets the logger for watcher background retries.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine holds in-memory metadata of routes and provides timestamp generation.

func NewEngine

func NewEngine() *Engine

NewEngine creates an Engine with no hotspot splitting.

func NewEngineWithDefaultRoute

func NewEngineWithDefaultRoute() *Engine

NewEngineWithDefaultRoute creates an Engine and registers a default route covering the full keyspace with a default group ID. The default route is also recorded in the M2 history ring as the version-0 snapshot so transactions that observed catalogVersion = 0 can resolve their read-set owner through SnapshotAt(0).

func NewEngineWithThreshold

func NewEngineWithThreshold(threshold uint64) *Engine

NewEngineWithThreshold creates an Engine and sets a threshold for hotspot detection. A non-zero threshold enables automatic range splitting when the number of accesses to a range exceeds the threshold.

func (*Engine) ApplySnapshot

func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error

ApplySnapshot atomically replaces all in-memory routes with the provided catalog snapshot when the snapshot version is newer.
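
The ErrEngineSnapshot* variables suggest that a snapshot is checked for duplicate route IDs, route order, and overlap before it replaces the in-memory routes. The sketch below shows one way such checks could look for routes sorted by Start. It is an assumption-laden illustration: validateRoutes and the mapping of each condition to an error are hypothetical, and the version staleness check is left out.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

type route struct {
	ID         uint64
	Start, End []byte // End == nil means unbounded
}

var (
	errDuplicateID = errors.New("duplicate route id")
	errRouteOrder  = errors.New("invalid route order")
	errOverlap     = errors.New("overlapping routes")
)

// validateRoutes expects routes sorted by strictly increasing Start, with
// each route ending at or before the next one starts. Gaps are allowed.
func validateRoutes(routes []route) error {
	seen := make(map[uint64]struct{}, len(routes))
	for i, r := range routes {
		if _, dup := seen[r.ID]; dup {
			return errDuplicateID
		}
		seen[r.ID] = struct{}{}
		if i == 0 {
			continue
		}
		prev := routes[i-1]
		if bytes.Compare(prev.Start, r.Start) >= 0 {
			return errRouteOrder
		}
		// An unbounded previous route, or one ending past r.Start, overlaps r.
		if prev.End == nil || bytes.Compare(prev.End, r.Start) > 0 {
			return errOverlap
		}
	}
	return nil
}

func main() {
	ok := []route{{1, []byte("a"), []byte("m")}, {2, []byte("m"), nil}}
	fmt.Println(validateRoutes(ok)) // <nil>

	bad := []route{{1, []byte("a"), []byte("n")}, {2, []byte("m"), nil}}
	fmt.Println(validateRoutes(bad)) // overlapping routes
}
```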

func (*Engine) Current

func (e *Engine) Current() (RouteHistorySnapshot, bool)

Current returns the route catalog snapshot at the engine's current catalogVersion. Returns (zero, false) when the history ring has not been initialised (bare-struct Engine). Used by the M3 Composed-1 cross-version-read fence (design doc §4.4) — the gate compares the txn's observed-version owner against the current owner so a route shift between BeginTxn and Commit is caught before it can produce a G1c anomaly across a cross-group MoveRange / SplitRange.

func (*Engine) GetIntersectingRoutes

func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route

GetIntersectingRoutes returns all routes whose key ranges intersect with [start, end). A route [rStart, rEnd) intersects with [start, end) if both of the following hold:

  - rStart < end (or end is nil, meaning unbounded scan)
  - start < rEnd (or rEnd is nil, meaning unbounded route)
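
The intersection rule above translates directly into a predicate over byte slices. The sketch below is a standalone illustration; intersects is a hypothetical helper, not a function exported by the package.

```go
package main

import (
	"bytes"
	"fmt"
)

// intersects reports whether route [rStart, rEnd) overlaps scan [start, end).
// A nil end or rEnd means that side is unbounded.
func intersects(rStart, rEnd, start, end []byte) bool {
	startsBeforeScanEnd := end == nil || bytes.Compare(rStart, end) < 0
	endsAfterScanStart := rEnd == nil || bytes.Compare(start, rEnd) < 0
	return startsBeforeScanEnd && endsAfterScanStart
}

func main() {
	a, l, m, n, z := []byte("a"), []byte("l"), []byte("m"), []byte("n"), []byte("z")

	// Route [a, m) and scan [m, z) only touch at the exclusive bound.
	fmt.Println(intersects(a, m, m, z)) // false

	// Scan [l, n) straddles the end of route [a, m).
	fmt.Println(intersects(a, m, l, n)) // true

	// An unbounded scan starting at "a" reaches the unbounded route [m, +inf).
	fmt.Println(intersects(m, nil, a, nil)) // true
}
```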

func (*Engine) GetRoute

func (e *Engine) GetRoute(key []byte) (Route, bool)

GetRoute finds a route for the given key using right half-open intervals.

func (*Engine) HistoryDepth

func (e *Engine) HistoryDepth() int

HistoryDepth returns the configured ring depth for diagnostics.

func (*Engine) NextTimestamp

func (e *Engine) NextTimestamp() uint64

NextTimestamp returns a monotonically increasing timestamp.

func (*Engine) RecordAccess

func (e *Engine) RecordAccess(key []byte)

RecordAccess increases the access counter for the range containing key and splits the range if it turns into a hotspot. The load counter is updated atomically under a read lock to allow concurrent access recording. If the hotspot threshold is exceeded, RecordAccess acquires a full write lock and re-checks the condition before splitting to avoid races with concurrent splits.
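
The locking scheme described for RecordAccess is a form of double-checked locking: an atomic increment under the read lock, then a re-check under the write lock before mutating. The sketch below shows the pattern in isolation. The tracker and hotRange types are hypothetical, ranges are addressed by index rather than by key, and the "split" merely replaces one range with two fresh ones.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hotRange stands in for a route; only its load counter matters here.
type hotRange struct {
	load atomic.Uint64
}

type tracker struct {
	mu        sync.RWMutex
	threshold uint64 // immutable after construction; 0 disables splitting
	ranges    []*hotRange
}

func newTracker(threshold uint64) *tracker {
	return &tracker{threshold: threshold, ranges: []*hotRange{new(hotRange)}}
}

// recordAccess assumes i is a valid index into the current range list.
func (t *tracker) recordAccess(i int) {
	// Fast path: many goroutines may count concurrently under the read lock.
	t.mu.RLock()
	r := t.ranges[i]
	load := r.load.Add(1)
	t.mu.RUnlock()

	if t.threshold == 0 || load <= t.threshold {
		return
	}

	// Slow path: take the write lock and re-check, because another
	// goroutine may have split this range between the two lock sections.
	t.mu.Lock()
	defer t.mu.Unlock()
	if i >= len(t.ranges) || t.ranges[i] != r {
		return
	}
	next := make([]*hotRange, 0, len(t.ranges)+1)
	next = append(next, t.ranges[:i]...)
	next = append(next, new(hotRange), new(hotRange)) // two halves, load reset
	next = append(next, t.ranges[i+1:]...)
	t.ranges = next
}

func (t *tracker) rangeCount() int {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return len(t.ranges)
}

func main() {
	tr := newTracker(2)
	for n := 0; n < 3; n++ {
		tr.recordAccess(0)
	}
	fmt.Println(tr.rangeCount()) // 2
}
```

The pointer-identity comparison in the slow path is what makes the re-check cheap: if the slot no longer holds the range that was counted, some other goroutine already handled the hotspot.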

func (*Engine) SetHistoryDepthForTest

func (e *Engine) SetHistoryDepthForTest(depth int)

SetHistoryDepthForTest overrides the FIFO ring depth from outside the package. Test-only. Callers should set the depth before sharing the Engine with concurrent SnapshotAt/Current readers to avoid interleaving surprises around the eviction watermark. The write itself is lock-protected (it takes e.mu.Lock), so it is safe to call from any goroutine that does not also expect a consistent SnapshotAt view across the depth change.

Exists so tests in the kv package can drive eviction-trigger scenarios without adding a constructor option just for tests (claude review on PR #894). Production code must use DefaultRouteHistoryDepth (32) or a future operator-exposed config knob.

Fails fast on depth <= 0 (coderabbit minor on PR #895): recordHistorySnapshotLocked's eviction path indexes historyOrder[0], so a zero or negative depth would surface as a confusing index-out-of-range deep in the apply path instead of at the misconfigured test seam.

When shrinking depth below the current ring size, it evicts the excess oldest entries immediately rather than letting the next record see len(historyOrder) > historyDepth (gemini medium on PR #895). Without this trim, the next recordHistorySnapshotLocked's `make([]uint64, len-1, historyDepth)` would panic on len-1 > historyDepth.

func (*Engine) SnapshotAt

func (e *Engine) SnapshotAt(v uint64) (RouteHistorySnapshot, bool)

SnapshotAt returns the route catalog snapshot recorded at version v. Returns (zero, false) when v is not in the ring — either because v is in the future (> catalogVersion), or because the FIFO ring has evicted v (it was older than the historyDepth-most-recent versions). The M3 Composed-1 gate (design doc §4.3) treats the not-found case as a hard error and triggers a coordinator retry, so retention depth is a liveness knob, not a safety knob.
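
The two not-found cases for SnapshotAt (future version, evicted version) fall out naturally from a bounded FIFO of versions. The sketch below is a simplified model, not the Engine's implementation: history, record, and snapshotAt are hypothetical names, routes are plain strings, and there is no locking.

```go
package main

import "fmt"

// history keeps at most depth versions, evicting the oldest first.
type history struct {
	depth int
	order []uint64            // recorded versions, oldest first
	byVer map[uint64][]string // version -> private copy of the routes
}

func newHistory(depth int) *history {
	if depth <= 0 {
		panic("history depth must be positive")
	}
	return &history{depth: depth, byVer: make(map[uint64][]string)}
}

// record stores a copy of routes at version, evicting the oldest entry
// when the ring is full.
func (h *history) record(version uint64, routes []string) {
	if len(h.order) >= h.depth {
		delete(h.byVer, h.order[0])
		h.order = h.order[1:]
	}
	h.order = append(h.order, version)
	h.byVer[version] = append([]string(nil), routes...)
}

// snapshotAt returns (nil, false) for versions that were never recorded
// or have already been evicted.
func (h *history) snapshotAt(v uint64) ([]string, bool) {
	routes, ok := h.byVer[v]
	return routes, ok
}

func main() {
	h := newHistory(2)
	h.record(1, []string{"[a, +inf) -> g1"})
	h.record(2, []string{"[a, m) -> g1", "[m, +inf) -> g2"})
	h.record(3, []string{"[a, +inf) -> g2"})

	_, ok := h.snapshotAt(1)
	fmt.Println(ok) // false: evicted
	_, ok = h.snapshotAt(3)
	fmt.Println(ok) // true
	_, ok = h.snapshotAt(4)
	fmt.Println(ok) // false: not recorded yet
}
```

Because a miss only forces the caller to retry against a newer version, a deeper ring reduces retries under churn but is not needed for correctness, which matches the "liveness knob, not a safety knob" remark.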

func (*Engine) Stats

func (e *Engine) Stats() []Route

Stats returns a snapshot of current ranges and their load counters.

func (*Engine) UpdateRoute

func (e *Engine) UpdateRoute(start, end []byte, group uint64)

UpdateRoute registers or updates a route for the given key range. Routes are stored sorted by Start.

func (*Engine) Version

func (e *Engine) Version() uint64

Version returns the current route catalog version applied to the engine.

type Route

type Route struct {
	// RouteID is the durable identifier assigned by route catalog.
	// Zero means ephemeral/non-catalog routes.
	RouteID uint64
	// Start marks the inclusive beginning of the range.
	Start []byte
	// End marks the exclusive end of the range. nil means unbounded.
	End []byte
	// GroupID identifies the raft group for the range starting at Start.
	GroupID uint64
	// State tracks control-plane state for this route.
	State RouteState
	// Load tracks the number of accesses served by this range.
	Load uint64
}

Route represents a mapping from a key range to a raft group. Ranges are right half-open intervals: [Start, End). Start is inclusive and End is exclusive. A nil End denotes an unbounded interval extending to positive infinity.

type RouteDescriptor

type RouteDescriptor struct {
	RouteID       uint64
	Start         []byte
	End           []byte
	GroupID       uint64
	State         RouteState
	ParentRouteID uint64
}

RouteDescriptor is the durable representation of a route.

func CloneRouteDescriptor

func CloneRouteDescriptor(route RouteDescriptor) RouteDescriptor

CloneRouteDescriptor returns a deep copy of route.

func DecodeRouteDescriptor

func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error)

DecodeRouteDescriptor deserializes a route descriptor record.

type RouteHistorySnapshot

type RouteHistorySnapshot struct {
	// contains filtered or unexported fields
}

RouteHistorySnapshot is a point-in-time view of the route catalog at a specific version. Returned by Engine.SnapshotAt for the M3 Composed-1 commit-time gate. Carries an immutable copy of the catalog's routes at the recorded version so a caller can resolve ownership without holding the Engine lock.

func (RouteHistorySnapshot) OwnerOf

func (s RouteHistorySnapshot) OwnerOf(key []byte) (uint64, bool)

OwnerOf returns the Raft group ID that owned key at this snapshot's version. Returns (0, false) when no route covers key (the pre-bootstrap state or an explicitly-uncovered range). Mirrors Engine.GetRoute's right-half-open interval semantics but against the historical snapshot, not the live engine state.

Routes are sorted by Start (recordHistorySnapshotLocked clones from e.routes, which Engine.UpdateRoute / routesFromCatalog keep sorted), so the scan can break the moment key < r.Start — every later route has a strictly greater Start and cannot cover key either. This matters because M3 puts OwnerOf on every txn commit's apply path (claude review on PR #894 — break-vs-continue lifts the worst-case scan from O(N) to "first non-covering gap" without changing the resolution semantics).
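
The early-exit scan described above can be sketched as follows. This is an illustration over a plain slice, assuming routes sorted by Start; the route type and the ownerOf function are hypothetical stand-ins for the unexported snapshot internals.

```go
package main

import (
	"bytes"
	"fmt"
)

type route struct {
	Start, End []byte // [Start, End); End == nil means unbounded
	GroupID    uint64
}

// ownerOf scans routes sorted by Start and stops as soon as key < r.Start,
// since every later route starts even further right.
func ownerOf(routes []route, key []byte) (uint64, bool) {
	for _, r := range routes {
		if bytes.Compare(key, r.Start) < 0 {
			break
		}
		if r.End == nil || bytes.Compare(key, r.End) < 0 {
			return r.GroupID, true
		}
	}
	return 0, false
}

func main() {
	routes := []route{
		{Start: []byte("a"), End: []byte("g"), GroupID: 1},
		{Start: []byte("m"), End: nil, GroupID: 2},
	}
	fmt.Println(ownerOf(routes, []byte("c"))) // 1 true
	fmt.Println(ownerOf(routes, []byte("g"))) // 0 false: falls in the gap [g, m)
	fmt.Println(ownerOf(routes, []byte("z"))) // 2 true
}
```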

func (RouteHistorySnapshot) Version

func (s RouteHistorySnapshot) Version() uint64

Version returns the catalog version this snapshot was recorded at.

type RouteState

type RouteState byte

RouteState describes the control-plane state of a route.

const (
	// RouteStateActive is a normal serving route.
	RouteStateActive RouteState = iota
	// RouteStateWriteFenced blocks writes during cutover.
	RouteStateWriteFenced
	// RouteStateMigratingSource means range data is being copied out.
	RouteStateMigratingSource
	// RouteStateMigratingTarget means range data is being copied in.
	RouteStateMigratingTarget
)
