Documentation
¶
Index ¶
- Variables
- func CatalogNextRouteIDKey() []byte
- func CatalogRouteIDFromKey(key []byte) (uint64, bool)
- func CatalogRouteKey(routeID uint64) []byte
- func CatalogVersionKey() []byte
- func CloneBytes(b []byte) []byte
- func DecodeCatalogNextRouteID(raw []byte) (uint64, error)
- func DecodeCatalogVersion(raw []byte) (uint64, error)
- func EncodeCatalogNextRouteID(nextRouteID uint64) []byte
- func EncodeCatalogVersion(version uint64) []byte
- func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error)
- func IsCatalogRouteKey(key []byte) bool
- func NextRouteIDFloor(routes []RouteDescriptor) (uint64, error)
- func RunCatalogWatcher(ctx context.Context, catalog *CatalogStore, engine *Engine, ...) error
- type CatalogSnapshot
- type CatalogStore
- func (s *CatalogStore) NextRouteID(ctx context.Context) (uint64, error)
- func (s *CatalogStore) NextRouteIDAt(ctx context.Context, ts uint64) (uint64, error)
- func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error)
- func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error)
- func (s *CatalogStore) Version(ctx context.Context) (uint64, error)
- type CatalogWatcher
- type CatalogWatcherOption
- type Engine
- func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error
- func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route
- func (e *Engine) GetRoute(key []byte) (Route, bool)
- func (e *Engine) NextTimestamp() uint64
- func (e *Engine) RecordAccess(key []byte)
- func (e *Engine) Stats() []Route
- func (e *Engine) UpdateRoute(start, end []byte, group uint64)
- func (e *Engine) Version() uint64
- type Route
- type RouteDescriptor
- type RouteState
Constants ¶
This section is empty.
Variables ¶
var ( ErrCatalogStoreRequired = errors.New("catalog store is required") ErrCatalogVersionMismatch = errors.New("catalog version mismatch") ErrCatalogVersionOverflow = errors.New("catalog version overflow") ErrCatalogRouteIDOverflow = errors.New("catalog route id overflow") ErrCatalogRouteIDRequired = errors.New("catalog route id is required") ErrCatalogGroupIDRequired = errors.New("catalog group id is required") ErrCatalogDuplicateRouteID = errors.New("catalog route id must be unique") ErrCatalogInvalidRouteRange = errors.New("catalog route range is invalid") ErrCatalogInvalidVersionRecord = errors.New("catalog version record is invalid") ErrCatalogInvalidNextRouteID = errors.New("catalog next route id record is invalid") ErrCatalogInvalidRouteRecord = errors.New("catalog route record is invalid") ErrCatalogInvalidRouteState = errors.New("catalog route state is invalid") ErrCatalogInvalidRouteKey = errors.New("catalog route key is invalid") ErrCatalogRouteKeyIDMismatch = errors.New("catalog route key and record route id mismatch") )
var ( ErrEngineSnapshotVersionStale = errors.New("engine snapshot version is stale") ErrEngineSnapshotDuplicateID = errors.New("engine snapshot has duplicate route id") ErrEngineSnapshotRouteOverlap = errors.New("engine snapshot has overlapping routes") ErrEngineSnapshotRouteOrder = errors.New("engine snapshot has invalid route order") )
var ErrEngineRequired = errors.New("engine is required")
Functions ¶
func CatalogNextRouteIDKey ¶
func CatalogNextRouteIDKey() []byte
CatalogNextRouteIDKey returns the reserved key used for next route id storage.
func CatalogRouteIDFromKey ¶
CatalogRouteIDFromKey parses the route ID from a catalog route key.
func CatalogRouteKey ¶
CatalogRouteKey returns the reserved key used for a route descriptor.
func CatalogVersionKey ¶
func CatalogVersionKey() []byte
CatalogVersionKey returns the reserved key used for catalog version storage.
func DecodeCatalogNextRouteID ¶
DecodeCatalogNextRouteID deserializes a next route id record.
func DecodeCatalogVersion ¶
DecodeCatalogVersion deserializes a catalog version record.
func EncodeCatalogNextRouteID ¶
EncodeCatalogNextRouteID serializes a next route id record.
func EncodeCatalogVersion ¶
EncodeCatalogVersion serializes a catalog version record.
func EncodeRouteDescriptor ¶
func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error)
EncodeRouteDescriptor serializes a route descriptor record.
func IsCatalogRouteKey ¶
IsCatalogRouteKey reports whether key belongs to the route catalog keyspace.
func NextRouteIDFloor ¶
func NextRouteIDFloor(routes []RouteDescriptor) (uint64, error)
NextRouteIDFloor returns the minimum valid next route ID for routes. It is shared by catalog persistence and split planning to keep route-ID allocation rules consistent.
func RunCatalogWatcher ¶
func RunCatalogWatcher(ctx context.Context, catalog *CatalogStore, engine *Engine, logger *slog.Logger) error
RunCatalogWatcher runs CatalogWatcher with optional logger override.
Types ¶
type CatalogSnapshot ¶
type CatalogSnapshot struct {
Version uint64
Routes []RouteDescriptor
ReadTS uint64
}
CatalogSnapshot is a point-in-time snapshot of the route catalog.
func EnsureCatalogSnapshot ¶
func EnsureCatalogSnapshot(ctx context.Context, catalog *CatalogStore, engine *Engine) (CatalogSnapshot, error)
EnsureCatalogSnapshot makes engine and durable catalog consistent at startup.
If the durable catalog is empty (version 0, no routes), the current in-memory engine routes are persisted as the initial catalog snapshot with generated non-zero RouteIDs. Then the resolved snapshot is applied back to engine.
type CatalogStore ¶
type CatalogStore struct {
// contains filtered or unexported fields
}
CatalogStore provides persistence helpers for route catalog state.
func NewCatalogStore ¶
func NewCatalogStore(st store.MVCCStore) *CatalogStore
NewCatalogStore creates a route catalog persistence helper.
func (*CatalogStore) NextRouteID ¶
func (s *CatalogStore) NextRouteID(ctx context.Context) (uint64, error)
NextRouteID reads the next route id counter from catalog metadata.
func (*CatalogStore) NextRouteIDAt ¶
NextRouteIDAt reads the next route id counter at a given snapshot timestamp.
func (*CatalogStore) Save ¶
func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error)
Save updates the route catalog using optimistic version checks and bumps the catalog version by exactly one on success.
func (*CatalogStore) Snapshot ¶
func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error)
Snapshot reads a consistent route catalog snapshot at the store's latest known commit timestamp.
type CatalogWatcher ¶
type CatalogWatcher struct {
// contains filtered or unexported fields
}
CatalogWatcher periodically refreshes Engine from durable catalog snapshots.
func NewCatalogWatcher ¶
func NewCatalogWatcher(catalog *CatalogStore, engine *Engine, opts ...CatalogWatcherOption) *CatalogWatcher
NewCatalogWatcher creates a watcher that polls the durable route catalog and applies newer snapshots to the in-memory engine.
type CatalogWatcherOption ¶
type CatalogWatcherOption func(*CatalogWatcher)
CatalogWatcherOption customizes CatalogWatcher behavior.
func WithCatalogWatcherInterval ¶
func WithCatalogWatcherInterval(interval time.Duration) CatalogWatcherOption
WithCatalogWatcherInterval sets the catalog polling interval.
func WithCatalogWatcherLogger ¶
func WithCatalogWatcherLogger(logger *slog.Logger) CatalogWatcherOption
WithCatalogWatcherLogger sets the logger for watcher background retries.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine holds in-memory metadata of routes and provides timestamp generation.
func NewEngineWithDefaultRoute ¶
func NewEngineWithDefaultRoute() *Engine
NewEngineWithDefaultRoute creates an Engine and registers a default route covering the full keyspace with a default group ID.
func NewEngineWithThreshold ¶
NewEngineWithThreshold creates an Engine and sets a threshold for hotspot detection. A non-zero threshold enables automatic range splitting when the number of accesses to a range exceeds the threshold.
func (*Engine) ApplySnapshot ¶
func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error
ApplySnapshot atomically replaces all in-memory routes with the provided catalog snapshot when the snapshot version is newer.
func (*Engine) GetIntersectingRoutes ¶
GetIntersectingRoutes returns all routes whose key ranges intersect with [start, end). A route [rStart, rEnd) intersects with [start, end) if: - rStart < end (or end is nil, meaning unbounded scan) - start < rEnd (or rEnd is nil, meaning unbounded route)
func (*Engine) NextTimestamp ¶
NextTimestamp returns a monotonic increasing timestamp.
func (*Engine) RecordAccess ¶
RecordAccess increases the access counter for the range containing key and splits the range if it turns into a hotspot. The load counter is updated atomically under a read lock to allow concurrent access recording. If the hotspot threshold is exceeded, RecordAccess acquires a full write lock and re-checks the condition before splitting to avoid races with concurrent splits.
func (*Engine) UpdateRoute ¶
UpdateRoute registers or updates a route for the given key range. Routes are stored sorted by Start.
type Route ¶
type Route struct {
// RouteID is the durable identifier assigned by route catalog.
// Zero means ephemeral/non-catalog routes.
RouteID uint64
// Start marks the inclusive beginning of the range.
Start []byte
// End marks the exclusive end of the range. nil means unbounded.
End []byte
// GroupID identifies the raft group for the range starting at Start.
GroupID uint64
// State tracks control-plane state for this route.
State RouteState
// Load tracks the number of accesses served by this range.
Load uint64
}
Route represents a mapping from a key range to a raft group. Ranges are right half-open intervals: [Start, End). Start is inclusive and End is exclusive. A nil End denotes an unbounded interval extending to positive infinity.
type RouteDescriptor ¶
type RouteDescriptor struct {
RouteID uint64
Start []byte
End []byte
GroupID uint64
State RouteState
ParentRouteID uint64
}
RouteDescriptor is the durable representation of a route.
func CloneRouteDescriptor ¶
func CloneRouteDescriptor(route RouteDescriptor) RouteDescriptor
CloneRouteDescriptor returns a deep copy of route.
func DecodeRouteDescriptor ¶
func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error)
DecodeRouteDescriptor deserializes a route descriptor record.
type RouteState ¶
type RouteState byte
RouteState describes the control-plane state of a route.
const ( // RouteStateActive is a normal serving route. RouteStateActive RouteState = iota // RouteStateWriteFenced blocks writes during cutover. RouteStateWriteFenced // RouteStateMigratingSource means range data is being copied out. RouteStateMigratingSource // RouteStateMigratingTarget means range data is being copied in. RouteStateMigratingTarget )