Documentation
¶
Index ¶
- Constants
- Variables
- func CatalogNextRouteIDKey() []byte
- func CatalogRouteIDFromKey(key []byte) (uint64, bool)
- func CatalogRouteKey(routeID uint64) []byte
- func CatalogVersionKey() []byte
- func CloneBytes(b []byte) []byte
- func DecodeCatalogNextRouteID(raw []byte) (uint64, error)
- func DecodeCatalogVersion(raw []byte) (uint64, error)
- func EncodeCatalogNextRouteID(nextRouteID uint64) []byte
- func EncodeCatalogVersion(version uint64) []byte
- func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error)
- func IsCatalogRouteKey(key []byte) bool
- func NextRouteIDFloor(routes []RouteDescriptor) (uint64, error)
- func RunCatalogWatcher(ctx context.Context, catalog *CatalogStore, engine *Engine, ...) error
- type CatalogSnapshot
- type CatalogStore
- func (s *CatalogStore) NextRouteID(ctx context.Context) (uint64, error)
- func (s *CatalogStore) NextRouteIDAt(ctx context.Context, ts uint64) (uint64, error)
- func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error)
- func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error)
- func (s *CatalogStore) Version(ctx context.Context) (uint64, error)
- type CatalogWatcher
- type CatalogWatcherOption
- type Engine
- func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error
- func (e *Engine) Current() (RouteHistorySnapshot, bool)
- func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route
- func (e *Engine) GetRoute(key []byte) (Route, bool)
- func (e *Engine) HistoryDepth() int
- func (e *Engine) NextTimestamp() uint64
- func (e *Engine) RecordAccess(key []byte)
- func (e *Engine) SetHistoryDepthForTest(depth int)
- func (e *Engine) SnapshotAt(v uint64) (RouteHistorySnapshot, bool)
- func (e *Engine) Stats() []Route
- func (e *Engine) UpdateRoute(start, end []byte, group uint64)
- func (e *Engine) Version() uint64
- type Route
- type RouteDescriptor
- type RouteHistorySnapshot
- type RouteState
Constants ¶
const DefaultRouteHistoryDepth = 32
DefaultRouteHistoryDepth is the size of Engine's versioned-snapshot ring used by the Composed-1 M2 plumbing. 32 is conservative against current single-leader catalog churn (operator-frequency, not data-plane) per the design doc §9 Q2; raise if a future control plane generates more than ~tens of versions per second.
Variables ¶
var ( ErrCatalogStoreRequired = errors.New("catalog store is required") ErrCatalogVersionMismatch = errors.New("catalog version mismatch") ErrCatalogVersionOverflow = errors.New("catalog version overflow") ErrCatalogRouteIDOverflow = errors.New("catalog route id overflow") ErrCatalogRouteIDRequired = errors.New("catalog route id is required") ErrCatalogGroupIDRequired = errors.New("catalog group id is required") ErrCatalogDuplicateRouteID = errors.New("catalog route id must be unique") ErrCatalogInvalidRouteRange = errors.New("catalog route range is invalid") ErrCatalogInvalidVersionRecord = errors.New("catalog version record is invalid") ErrCatalogInvalidNextRouteID = errors.New("catalog next route id record is invalid") ErrCatalogInvalidRouteRecord = errors.New("catalog route record is invalid") ErrCatalogInvalidRouteState = errors.New("catalog route state is invalid") ErrCatalogInvalidRouteKey = errors.New("catalog route key is invalid") ErrCatalogRouteKeyIDMismatch = errors.New("catalog route key and record route id mismatch") )
var ( ErrEngineSnapshotVersionStale = errors.New("engine snapshot version is stale") ErrEngineSnapshotDuplicateID = errors.New("engine snapshot has duplicate route id") ErrEngineSnapshotRouteOverlap = errors.New("engine snapshot has overlapping routes") ErrEngineSnapshotRouteOrder = errors.New("engine snapshot has invalid route order") )
var ErrEngineRequired = errors.New("engine is required")
Functions ¶
func CatalogNextRouteIDKey ¶
func CatalogNextRouteIDKey() []byte
CatalogNextRouteIDKey returns the reserved key used for next route id storage.
func CatalogRouteIDFromKey ¶
CatalogRouteIDFromKey parses the route ID from a catalog route key.
func CatalogRouteKey ¶
CatalogRouteKey returns the reserved key used for a route descriptor.
func CatalogVersionKey ¶
func CatalogVersionKey() []byte
CatalogVersionKey returns the reserved key used for catalog version storage.
func DecodeCatalogNextRouteID ¶
DecodeCatalogNextRouteID deserializes a next route id record.
func DecodeCatalogVersion ¶
DecodeCatalogVersion deserializes a catalog version record.
func EncodeCatalogNextRouteID ¶
EncodeCatalogNextRouteID serializes a next route id record.
func EncodeCatalogVersion ¶
EncodeCatalogVersion serializes a catalog version record.
func EncodeRouteDescriptor ¶
func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error)
EncodeRouteDescriptor serializes a route descriptor record.
func IsCatalogRouteKey ¶
IsCatalogRouteKey reports whether key belongs to the route catalog keyspace.
func NextRouteIDFloor ¶
func NextRouteIDFloor(routes []RouteDescriptor) (uint64, error)
NextRouteIDFloor returns the minimum valid next route ID for routes. It is shared by catalog persistence and split planning to keep route-ID allocation rules consistent.
func RunCatalogWatcher ¶
func RunCatalogWatcher(ctx context.Context, catalog *CatalogStore, engine *Engine, logger *slog.Logger) error
RunCatalogWatcher runs CatalogWatcher with optional logger override.
Types ¶
type CatalogSnapshot ¶
type CatalogSnapshot struct {
Version uint64
Routes []RouteDescriptor
ReadTS uint64
}
CatalogSnapshot is a point-in-time snapshot of the route catalog.
func EnsureCatalogSnapshot ¶
func EnsureCatalogSnapshot(ctx context.Context, catalog *CatalogStore, engine *Engine) (CatalogSnapshot, error)
EnsureCatalogSnapshot makes engine and durable catalog consistent at startup.
If the durable catalog is empty (version 0, no routes), the current in-memory engine routes are persisted as the initial catalog snapshot with generated non-zero RouteIDs. Then the resolved snapshot is applied back to engine.
type CatalogStore ¶
type CatalogStore struct {
// contains filtered or unexported fields
}
CatalogStore provides persistence helpers for route catalog state.
func NewCatalogStore ¶
func NewCatalogStore(st store.MVCCStore) *CatalogStore
NewCatalogStore creates a route catalog persistence helper.
func (*CatalogStore) NextRouteID ¶
func (s *CatalogStore) NextRouteID(ctx context.Context) (uint64, error)
NextRouteID reads the next route id counter from catalog metadata.
func (*CatalogStore) NextRouteIDAt ¶
NextRouteIDAt reads the next route id counter at a given snapshot timestamp.
func (*CatalogStore) Save ¶
func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error)
Save updates the route catalog using optimistic version checks and bumps the catalog version by exactly one on success.
func (*CatalogStore) Snapshot ¶
func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error)
Snapshot reads a consistent route catalog snapshot at the store's latest known commit timestamp.
type CatalogWatcher ¶
type CatalogWatcher struct {
// contains filtered or unexported fields
}
CatalogWatcher periodically refreshes Engine from durable catalog snapshots.
func NewCatalogWatcher ¶
func NewCatalogWatcher(catalog *CatalogStore, engine *Engine, opts ...CatalogWatcherOption) *CatalogWatcher
NewCatalogWatcher creates a watcher that polls the durable route catalog and applies newer snapshots to the in-memory engine.
type CatalogWatcherOption ¶
type CatalogWatcherOption func(*CatalogWatcher)
CatalogWatcherOption customizes CatalogWatcher behavior.
func WithCatalogWatcherInterval ¶
func WithCatalogWatcherInterval(interval time.Duration) CatalogWatcherOption
WithCatalogWatcherInterval sets the catalog polling interval.
func WithCatalogWatcherLogger ¶
func WithCatalogWatcherLogger(logger *slog.Logger) CatalogWatcherOption
WithCatalogWatcherLogger sets the logger for watcher background retries.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine holds in-memory metadata of routes and provides timestamp generation.
func NewEngineWithDefaultRoute ¶
func NewEngineWithDefaultRoute() *Engine
NewEngineWithDefaultRoute creates an Engine and registers a default route covering the full keyspace with a default group ID. The default route is also recorded in the M2 history ring as the version-0 snapshot so transactions that observed catalogVersion = 0 can resolve their read-set owner through SnapshotAt(0).
func NewEngineWithThreshold ¶
NewEngineWithThreshold creates an Engine and sets a threshold for hotspot detection. A non-zero threshold enables automatic range splitting when the number of accesses to a range exceeds the threshold.
func (*Engine) ApplySnapshot ¶
func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error
ApplySnapshot atomically replaces all in-memory routes with the provided catalog snapshot when the snapshot version is newer.
func (*Engine) Current ¶
func (e *Engine) Current() (RouteHistorySnapshot, bool)
Current returns the route catalog snapshot at the engine's current catalogVersion. Returns (zero, false) when the history ring has not been initialised (bare-struct Engine). Used by the M3 Composed-1 cross-version-read fence (design doc §4.4) — the gate compares the txn's observed-version owner against the current owner so a route shift between BeginTxn and Commit is caught before it can produce a G1c anomaly across a cross-group MoveRange / SplitRange.
func (*Engine) GetIntersectingRoutes ¶
GetIntersectingRoutes returns all routes whose key ranges intersect with [start, end). A route [rStart, rEnd) intersects with [start, end) if: - rStart < end (or end is nil, meaning unbounded scan) - start < rEnd (or rEnd is nil, meaning unbounded route)
func (*Engine) HistoryDepth ¶
HistoryDepth returns the configured ring depth for diagnostics.
func (*Engine) NextTimestamp ¶
NextTimestamp returns a monotonic increasing timestamp.
func (*Engine) RecordAccess ¶
RecordAccess increases the access counter for the range containing key and splits the range if it turns into a hotspot. The load counter is updated atomically under a read lock to allow concurrent access recording. If the hotspot threshold is exceeded, RecordAccess acquires a full write lock and re-checks the condition before splitting to avoid races with concurrent splits.
func (*Engine) SetHistoryDepthForTest ¶
SetHistoryDepthForTest overrides the FIFO ring depth from outside the package. Test-only. Callers should set the depth before sharing the Engine with concurrent SnapshotAt/Current readers to avoid interleaving surprises around the eviction watermark, but the write itself is lock-protected (e.mu.Lock below) so it is safe to call from any goroutine that does not also expect a consistent SnapshotAt view across the depth change.
Exists so tests in the kv package can drive eviction-trigger scenarios without adding a constructor option just for tests (claude review on PR #894). Production code must use DefaultRouteHistoryDepth (32) or a future operator-exposed config knob.
Fails fast on depth <= 0 (coderabbit minor on PR #895): recordHistorySnapshotLocked's eviction path indexes historyOrder[0], so a zero/negative depth would surface as a confusing index-out-of-range deep in the apply path instead of at the misconfigured test seam. When shrinking depth below the current ring size, evict the excess oldest entries immediately rather than letting the next record see len(historyOrder) > historyDepth (gemini medium on PR #895 — without this trim, the next recordHistorySnapshotLocked's `make([]uint64, len-1, historyDepth)` would panic on len-1 > historyDepth).
func (*Engine) SnapshotAt ¶
func (e *Engine) SnapshotAt(v uint64) (RouteHistorySnapshot, bool)
SnapshotAt returns the route catalog snapshot recorded at version v. Returns (zero, false) when v is not in the ring — either because v is in the future (> catalogVersion), or because the FIFO ring has evicted v (it was older than the historyDepth-most-recent versions). The M3 Composed-1 gate (design doc §4.3) treats the not-found case as a hard error and triggers a coordinator retry, so retention depth is a liveness knob, not a safety knob.
func (*Engine) UpdateRoute ¶
UpdateRoute registers or updates a route for the given key range. Routes are stored sorted by Start.
type Route ¶
type Route struct {
// RouteID is the durable identifier assigned by route catalog.
// Zero means ephemeral/non-catalog routes.
RouteID uint64
// Start marks the inclusive beginning of the range.
Start []byte
// End marks the exclusive end of the range. nil means unbounded.
End []byte
// GroupID identifies the raft group for the range starting at Start.
GroupID uint64
// State tracks control-plane state for this route.
State RouteState
// Load tracks the number of accesses served by this range.
Load uint64
}
Route represents a mapping from a key range to a raft group. Ranges are right half-open intervals: [Start, End). Start is inclusive and End is exclusive. A nil End denotes an unbounded interval extending to positive infinity.
type RouteDescriptor ¶
type RouteDescriptor struct {
RouteID uint64
Start []byte
End []byte
GroupID uint64
State RouteState
ParentRouteID uint64
}
RouteDescriptor is the durable representation of a route.
func CloneRouteDescriptor ¶
func CloneRouteDescriptor(route RouteDescriptor) RouteDescriptor
CloneRouteDescriptor returns a deep copy of route.
func DecodeRouteDescriptor ¶
func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error)
DecodeRouteDescriptor deserializes a route descriptor record.
type RouteHistorySnapshot ¶
type RouteHistorySnapshot struct {
// contains filtered or unexported fields
}
RouteHistorySnapshot is a point-in-time view of the route catalog at a specific version. Returned by Engine.SnapshotAt for the M3 Composed-1 commit-time gate. Carries an immutable copy of the catalog's routes at the recorded version so a caller can resolve ownership without holding the Engine lock.
func (RouteHistorySnapshot) OwnerOf ¶
func (s RouteHistorySnapshot) OwnerOf(key []byte) (uint64, bool)
OwnerOf returns the Raft group ID that owned key at this snapshot's version. Returns (0, false) when no route covers key (the pre-bootstrap state or an explicitly-uncovered range). Mirrors Engine.GetRoute's right-half-open interval semantics but against the historical snapshot, not the live engine state.
Routes are sorted by Start (recordHistorySnapshotLocked clones from e.routes, which Engine.UpdateRoute / routesFromCatalog keep sorted), so the scan can break the moment key < r.Start — every later route has a strictly greater Start and cannot cover key either. This matters because M3 puts OwnerOf on every txn commit's apply path (claude review on PR #894 — break-vs-continue lifts the worst-case scan from O(N) to "first non-covering gap" without changing the resolution semantics).
func (RouteHistorySnapshot) Version ¶
func (s RouteHistorySnapshot) Version() uint64
Version returns the catalog version this snapshot was recorded at.
type RouteState ¶
type RouteState byte
RouteState describes the control-plane state of a route.
const ( // RouteStateActive is a normal serving route. RouteStateActive RouteState = iota // RouteStateWriteFenced blocks writes during cutover. RouteStateWriteFenced // RouteStateMigratingSource means range data is being copied out. RouteStateMigratingSource // RouteStateMigratingTarget means range data is being copied in. RouteStateMigratingTarget )