distribution

package
v0.0.0-...-a779712 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors whose messages describe route catalog failures: a missing
// store, version mismatch/overflow, route ID problems (missing, duplicate,
// overflow), invalid ranges or states, and malformed or mismatched records
// and keys.
//
// Each value is created with errors.New, so callers should match them with
// errors.Is rather than by comparing message text.
//
// NOTE(review): which function returns which error is not visible from this
// declaration — confirm against the implementation before relying on a
// specific error for a specific call.
var (
	ErrCatalogStoreRequired        = errors.New("catalog store is required")
	ErrCatalogVersionMismatch      = errors.New("catalog version mismatch")
	ErrCatalogVersionOverflow      = errors.New("catalog version overflow")
	ErrCatalogRouteIDOverflow      = errors.New("catalog route id overflow")
	ErrCatalogRouteIDRequired      = errors.New("catalog route id is required")
	ErrCatalogGroupIDRequired      = errors.New("catalog group id is required")
	ErrCatalogDuplicateRouteID     = errors.New("catalog route id must be unique")
	ErrCatalogInvalidRouteRange    = errors.New("catalog route range is invalid")
	ErrCatalogInvalidVersionRecord = errors.New("catalog version record is invalid")
	ErrCatalogInvalidNextRouteID   = errors.New("catalog next route id record is invalid")
	ErrCatalogInvalidRouteRecord   = errors.New("catalog route record is invalid")
	ErrCatalogInvalidRouteState    = errors.New("catalog route state is invalid")
	ErrCatalogInvalidRouteKey      = errors.New("catalog route key is invalid")
	ErrCatalogRouteKeyIDMismatch   = errors.New("catalog route key and record route id mismatch")
)
View Source
// Sentinel errors whose messages describe problems with an engine snapshot:
// a stale version, a duplicate route ID, overlapping routes, or routes in an
// invalid order. Match them with errors.Is.
//
// NOTE(review): presumably returned by Engine.ApplySnapshot when it validates
// its input — confirm against the implementation.
var (
	ErrEngineSnapshotVersionStale = errors.New("engine snapshot version is stale")
	ErrEngineSnapshotDuplicateID  = errors.New("engine snapshot has duplicate route id")
	ErrEngineSnapshotRouteOverlap = errors.New("engine snapshot has overlapping routes")
	ErrEngineSnapshotRouteOrder   = errors.New("engine snapshot has invalid route order")
)
View Source
// ErrEngineRequired is the sentinel error for a missing engine. Match it with
// errors.Is.
// NOTE(review): presumably returned when a nil *Engine is supplied — confirm
// which entry points perform that check.
var ErrEngineRequired = errors.New("engine is required")

Functions

func CatalogNextRouteIDKey

func CatalogNextRouteIDKey() []byte

CatalogNextRouteIDKey returns the reserved key used for next route id storage.

func CatalogRouteIDFromKey

func CatalogRouteIDFromKey(key []byte) (uint64, bool)

CatalogRouteIDFromKey parses the route ID from a catalog route key.

func CatalogRouteKey

func CatalogRouteKey(routeID uint64) []byte

CatalogRouteKey returns the reserved key used for a route descriptor.

func CatalogVersionKey

func CatalogVersionKey() []byte

CatalogVersionKey returns the reserved key used for catalog version storage.

func CloneBytes

func CloneBytes(b []byte) []byte

CloneBytes returns a copied byte slice.

func DecodeCatalogNextRouteID

func DecodeCatalogNextRouteID(raw []byte) (uint64, error)

DecodeCatalogNextRouteID deserializes a next route id record.

func DecodeCatalogVersion

func DecodeCatalogVersion(raw []byte) (uint64, error)

DecodeCatalogVersion deserializes a catalog version record.

func EncodeCatalogNextRouteID

func EncodeCatalogNextRouteID(nextRouteID uint64) []byte

EncodeCatalogNextRouteID serializes a next route id record.

func EncodeCatalogVersion

func EncodeCatalogVersion(version uint64) []byte

EncodeCatalogVersion serializes a catalog version record.

func EncodeRouteDescriptor

func EncodeRouteDescriptor(route RouteDescriptor) ([]byte, error)

EncodeRouteDescriptor serializes a route descriptor record.

func IsCatalogRouteKey

func IsCatalogRouteKey(key []byte) bool

IsCatalogRouteKey reports whether key belongs to the route catalog keyspace.

func NextRouteIDFloor

func NextRouteIDFloor(routes []RouteDescriptor) (uint64, error)

NextRouteIDFloor returns the minimum valid next route ID for routes. It is shared by catalog persistence and split planning to keep route-ID allocation rules consistent.

func RunCatalogWatcher

func RunCatalogWatcher(ctx context.Context, catalog *CatalogStore, engine *Engine, logger *slog.Logger) error

RunCatalogWatcher runs CatalogWatcher with optional logger override.

Types

type CatalogSnapshot

type CatalogSnapshot struct {
	// Version is the route catalog version this snapshot corresponds to.
	Version uint64
	// Routes holds the route descriptors contained in the snapshot.
	Routes  []RouteDescriptor
	// ReadTS is presumably the store timestamp at which the snapshot was
	// read — TODO confirm against CatalogStore.Snapshot.
	ReadTS  uint64
}

CatalogSnapshot is a point-in-time snapshot of the route catalog.

func EnsureCatalogSnapshot

func EnsureCatalogSnapshot(ctx context.Context, catalog *CatalogStore, engine *Engine) (CatalogSnapshot, error)

EnsureCatalogSnapshot makes engine and durable catalog consistent at startup.

If the durable catalog is empty (version 0, no routes), the current in-memory engine routes are persisted as the initial catalog snapshot with generated non-zero RouteIDs. The resolved snapshot is then applied back to the engine.

type CatalogStore

type CatalogStore struct {
	// contains filtered or unexported fields
}

CatalogStore provides persistence helpers for route catalog state.

func NewCatalogStore

func NewCatalogStore(st store.MVCCStore) *CatalogStore

NewCatalogStore creates a route catalog persistence helper.

func (*CatalogStore) NextRouteID

func (s *CatalogStore) NextRouteID(ctx context.Context) (uint64, error)

NextRouteID reads the next route id counter from catalog metadata.

func (*CatalogStore) NextRouteIDAt

func (s *CatalogStore) NextRouteIDAt(ctx context.Context, ts uint64) (uint64, error)

NextRouteIDAt reads the next route id counter at a given snapshot timestamp.

func (*CatalogStore) Save

func (s *CatalogStore) Save(ctx context.Context, expectedVersion uint64, routes []RouteDescriptor) (CatalogSnapshot, error)

Save updates the route catalog using optimistic version checks and bumps the catalog version by exactly one on success.

func (*CatalogStore) Snapshot

func (s *CatalogStore) Snapshot(ctx context.Context) (CatalogSnapshot, error)

Snapshot reads a consistent route catalog snapshot at the store's latest known commit timestamp.

func (*CatalogStore) Version

func (s *CatalogStore) Version(ctx context.Context) (uint64, error)

Version reads only the durable catalog version at the latest commit timestamp.

type CatalogWatcher

type CatalogWatcher struct {
	// contains filtered or unexported fields
}

CatalogWatcher periodically refreshes Engine from durable catalog snapshots.

func NewCatalogWatcher

func NewCatalogWatcher(catalog *CatalogStore, engine *Engine, opts ...CatalogWatcherOption) *CatalogWatcher

NewCatalogWatcher creates a watcher that polls the durable route catalog and applies newer snapshots to the in-memory engine.

func (*CatalogWatcher) Run

func (w *CatalogWatcher) Run(ctx context.Context) error

Run starts polling and only returns when ctx is canceled or initialization requirements are not met. Snapshot read/apply failures are retried.

func (*CatalogWatcher) SyncOnce

func (w *CatalogWatcher) SyncOnce(ctx context.Context) error

SyncOnce applies the latest durable snapshot when its version is newer than the current engine version.

type CatalogWatcherOption

// CatalogWatcherOption is a function that receives the *CatalogWatcher being
// configured; values of this type are passed to NewCatalogWatcher via its
// variadic opts parameter.
type CatalogWatcherOption func(*CatalogWatcher)

CatalogWatcherOption customizes CatalogWatcher behavior.

func WithCatalogWatcherInterval

func WithCatalogWatcherInterval(interval time.Duration) CatalogWatcherOption

WithCatalogWatcherInterval sets the catalog polling interval.

func WithCatalogWatcherLogger

func WithCatalogWatcherLogger(logger *slog.Logger) CatalogWatcherOption

WithCatalogWatcherLogger sets the logger for watcher background retries.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine holds in-memory metadata of routes and provides timestamp generation.

func NewEngine

func NewEngine() *Engine

NewEngine creates an Engine with no hotspot splitting.

func NewEngineWithDefaultRoute

func NewEngineWithDefaultRoute() *Engine

NewEngineWithDefaultRoute creates an Engine and registers a default route covering the full keyspace with a default group ID.

func NewEngineWithThreshold

func NewEngineWithThreshold(threshold uint64) *Engine

NewEngineWithThreshold creates an Engine and sets a threshold for hotspot detection. A non-zero threshold enables automatic range splitting when the number of accesses to a range exceeds the threshold.

func (*Engine) ApplySnapshot

func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error

ApplySnapshot atomically replaces all in-memory routes with the provided catalog snapshot when the snapshot version is newer.

func (*Engine) GetIntersectingRoutes

func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route

GetIntersectingRoutes returns all routes whose key ranges intersect with [start, end). A route [rStart, rEnd) intersects with [start, end) if both of the following hold: (1) rStart < end, or end is nil (meaning an unbounded scan); and (2) start < rEnd, or rEnd is nil (meaning an unbounded route).

func (*Engine) GetRoute

func (e *Engine) GetRoute(key []byte) (Route, bool)

GetRoute finds a route for the given key using right half-open intervals.

func (*Engine) NextTimestamp

func (e *Engine) NextTimestamp() uint64

NextTimestamp returns a monotonically increasing timestamp.

func (*Engine) RecordAccess

func (e *Engine) RecordAccess(key []byte)

RecordAccess increases the access counter for the range containing key and splits the range if it turns into a hotspot. The load counter is updated atomically under a read lock to allow concurrent access recording. If the hotspot threshold is exceeded, RecordAccess acquires a full write lock and re-checks the condition before splitting to avoid races with concurrent splits.

func (*Engine) Stats

func (e *Engine) Stats() []Route

Stats returns a snapshot of current ranges and their load counters.

func (*Engine) UpdateRoute

func (e *Engine) UpdateRoute(start, end []byte, group uint64)

UpdateRoute registers or updates a route for the given key range. Routes are stored sorted by Start.

func (*Engine) Version

func (e *Engine) Version() uint64

Version returns the current route catalog version applied to the engine.

type Route

// Route maps the key range [Start, End) to a raft group. Compared with
// RouteDescriptor it additionally carries a Load counter and has no
// ParentRouteID field.
type Route struct {
	// RouteID is the durable identifier assigned by route catalog.
	// Zero means ephemeral/non-catalog routes.
	RouteID uint64
	// Start marks the inclusive beginning of the range.
	Start []byte
	// End marks the exclusive end of the range. nil means unbounded.
	End []byte
	// GroupID identifies the raft group for the range starting at Start.
	GroupID uint64
	// State tracks control-plane state for this route; see the RouteState
	// constants for the possible values.
	State RouteState
	// Load tracks the number of accesses served by this range.
	// NOTE(review): presumably the counter that Engine.RecordAccess
	// increments and Engine.Stats reports — confirm.
	Load uint64
}

Route represents a mapping from a key range to a raft group. Ranges are right half-open intervals: [Start, End). Start is inclusive and End is exclusive. A nil End denotes an unbounded interval extending to positive infinity.

type RouteDescriptor

type RouteDescriptor struct {
	// RouteID is the route's identifier.
	// NOTE(review): Route documents zero as "ephemeral/non-catalog", and
	// ErrCatalogRouteIDRequired exists, so a persisted descriptor presumably
	// needs a non-zero ID — confirm.
	RouteID       uint64
	// Start is the beginning of the key range.
	// NOTE(review): presumably inclusive, matching Route.Start — confirm.
	Start         []byte
	// End is the end of the key range.
	// NOTE(review): presumably exclusive with nil meaning unbounded,
	// matching Route.End — confirm.
	End           []byte
	// GroupID is the group this route maps to.
	// NOTE(review): presumably the raft group ID, matching Route.GroupID —
	// confirm.
	GroupID       uint64
	// State is the route's RouteState value.
	State         RouteState
	// ParentRouteID is presumably the ID of the route this one was derived
	// from (e.g. by a split) — TODO confirm, including what zero means.
	ParentRouteID uint64
}

RouteDescriptor is the durable representation of a route.

func CloneRouteDescriptor

func CloneRouteDescriptor(route RouteDescriptor) RouteDescriptor

CloneRouteDescriptor returns a deep copy of route.

func DecodeRouteDescriptor

func DecodeRouteDescriptor(raw []byte) (RouteDescriptor, error)

DecodeRouteDescriptor deserializes a route descriptor record.

type RouteState

// RouteState is a byte-sized enumeration; its defined values are the
// RouteState* constants.
type RouteState byte

RouteState describes the control-plane state of a route.

// RouteState values. They are numbered by iota in declaration order, from
// RouteStateActive = 0 through RouteStateMigratingTarget = 3.
//
// NOTE(review): State is a field of RouteDescriptor, the durable route
// representation, so these numeric values are presumably persisted — confirm
// before reordering existing states or inserting new ones in the middle.
const (
	// RouteStateActive is a normal serving route.
	RouteStateActive RouteState = iota
	// RouteStateWriteFenced blocks writes during cutover.
	RouteStateWriteFenced
	// RouteStateMigratingSource means range data is being copied out.
	RouteStateMigratingSource
	// RouteStateMigratingTarget means range data is being copied in.
	RouteStateMigratingTarget
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL