package kv

Documentation

Constants

const (

	// DynamoTableMetaPrefix prefixes DynamoDB table metadata keys.
	DynamoTableMetaPrefix = "!ddb|meta|table|"
	// DynamoTableGenerationPrefix prefixes DynamoDB table generation keys.
	DynamoTableGenerationPrefix = "!ddb|meta|gen|"
	// DynamoItemPrefix prefixes DynamoDB item storage keys.
	DynamoItemPrefix = "!ddb|item|"
	// DynamoGSIPrefix prefixes DynamoDB GSI storage keys.
	DynamoGSIPrefix = "!ddb|gsi|"
)
const HLCLogicalBits = hlcLogicalBits

HLCLogicalBits is the number of low bits in an HLC timestamp reserved for the in-memory logical counter (vs the upper bits which encode the Raft-agreed wall-clock millis). Exported so downstream tools — admin dashboard ISO-8601 formatting being the motivating case — can recover the physical half without hard-coding a magic number that silently drifts when the layout changes (Claude Issue 4 on PR #658).
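
For illustration, a hedged sketch of how a downstream formatter might split a timestamp using this constant rather than a literal (the helper name is hypothetical; kv refers to this package, and the time/kv imports are assumed):

	func splitHLC(ts uint64) (wall time.Time, logical uint64) {
		physMs := int64(ts >> kv.HLCLogicalBits)      // upper bits: Raft-agreed wall-clock millis
		logical = ts & ((1 << kv.HLCLogicalBits) - 1) // lower bits: in-memory logical counter
		return time.UnixMilli(physMs).UTC(), logical
	}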

const (
	// TxnKeyPrefix is the common prefix shared by all transaction internal
	// key namespaces. All per-namespace prefixes below are derived from it.
	// NOTE: store/store.go duplicates this literal as txnInternalKeyPrefix
	// because an import cycle prevents store from importing kv.
	TxnKeyPrefix = "!txn|"
)
const TxnMetaPrefix = txnMetaPrefix

TxnMetaPrefix is the key prefix used for transaction metadata mutations.

Variables

var (
	ErrTxnMetaMissing        = errors.New("txn meta missing")
	ErrTxnInvalidMeta        = errors.New("txn meta invalid")
	ErrTxnLocked             = errors.New("txn locked")
	ErrTxnCommitTSRequired   = errors.New("txn commit ts required")
	ErrTxnAlreadyCommitted   = errors.New("txn already committed")
	ErrTxnAlreadyAborted     = errors.New("txn already aborted")
	ErrTxnPrimaryKeyRequired = errors.New("txn primary key required")
)
var ErrCrossShardMutationBatchNotSupported = errors.New("cross-shard mutation batches are not supported")

var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported")

var ErrInvalidRequest = errors.New("invalid request")

var ErrLeaderNotFound = errors.New("leader not found")

var ErrNotImplemented = errors.New("not implemented")

var ErrUnknownRequestType = errors.New("unknown request type")

Functions

func EncodeTxnMeta

func EncodeTxnMeta(m TxnMeta) []byte

func ExtractTxnUserKey

func ExtractTxnUserKey(key []byte) []byte

ExtractTxnUserKey returns the logical user key embedded in a transaction-internal key such as !txn|lock|, !txn|cmt|, or !txn|meta|. It returns nil when the key does not use a transaction-internal namespace or is malformed.
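
A hedged usage sketch relying only on the nil contract above (pairs is a hypothetical scan result; the byte layout after each namespace prefix is an internal detail):

	for _, pair := range pairs {
		if user := kv.ExtractTxnUserKey(pair.Key); user != nil {
			// pair.Key is transaction-internal; user is the embedded logical key.
			continue
		}
		// pair.Key is a regular user-visible key.
	}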

func LeaseReadForKeyThrough

func LeaseReadForKeyThrough(c Coordinator, ctx context.Context, key []byte) (uint64, error)

LeaseReadForKeyThrough is the key-routed counterpart of LeaseReadThrough.

func LeaseReadThrough

func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error)

LeaseReadThrough is a helper that calls LeaseRead when the coordinator supports it, falling back to LinearizableRead otherwise. Adapter call sites use this so they don't have to repeat the type-assertion dance.
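
A minimal sketch of the pattern this helper encapsulates, assuming only the interfaces documented in this package (the real implementation may differ in detail):

	func leaseReadThrough(c kv.Coordinator, ctx context.Context) (uint64, error) {
		// Fast path: the coordinator opts into lease reads.
		if lr, ok := c.(kv.LeaseReadableCoordinator); ok {
			return lr.LeaseRead(ctx)
		}
		// Fallback: a full linearizable read fence.
		return c.LinearizableRead(ctx)
	}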

func MaxLatestCommitTS

func MaxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error)

MaxLatestCommitTS returns the maximum commit timestamp for the provided keys.

Missing keys are ignored. If any LatestCommitTS lookup returns an error, the error is returned to the caller.

func NewTxnLockedError

func NewTxnLockedError(key []byte) error

func NewTxnLockedErrorWithDetail

func NewTxnLockedErrorWithDetail(key []byte, detail string) error

func TxnLockedDetails

func TxnLockedDetails(err error) ([]byte, string, bool)

Types

type ActiveTimestampToken

type ActiveTimestampToken struct {
	// contains filtered or unexported fields
}

ActiveTimestampToken releases one tracked timestamp when the owning operation completes.

func (*ActiveTimestampToken) Release

func (t *ActiveTimestampToken) Release()

type ActiveTimestampTracker

type ActiveTimestampTracker struct {
	// contains filtered or unexported fields
}

ActiveTimestampTracker tracks in-flight read or transaction timestamps that must remain readable while background compaction is running.

func NewActiveTimestampTracker

func NewActiveTimestampTracker() *ActiveTimestampTracker

func (*ActiveTimestampTracker) Oldest

func (t *ActiveTimestampTracker) Oldest() uint64

func (*ActiveTimestampTracker) Pin

func (t *ActiveTimestampTracker) Pin(ts uint64) *ActiveTimestampToken

type Coordinate

type Coordinate struct {
	// contains filtered or unexported fields
}

func NewCoordinatorWithEngine

func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts ...CoordinatorOption) *Coordinate

func (*Coordinate) Clock

func (c *Coordinate) Clock() *HLC

func (*Coordinate) Close

func (c *Coordinate) Close() error

Close releases any engine-side registrations (currently the leader-loss callback) held by this Coordinate. It is safe to call on a nil receiver and multiple times. Owners whose lifetime matches the engine's do not need to call Close; owners who discard the Coordinate before closing the engine MUST call it.

func (*Coordinate) Dispatch

func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)

func (*Coordinate) IsLeader

func (c *Coordinate) IsLeader() bool

func (*Coordinate) IsLeaderAcceptingWrites

func (c *Coordinate) IsLeaderAcceptingWrites() bool

IsLeaderAcceptingWrites reports whether this node is leader and not currently transferring leadership. Background proposers should gate on this to avoid piling up dropped proposals while a transfer is in flight.

func (*Coordinate) IsLeaderForKey

func (c *Coordinate) IsLeaderForKey(_ []byte) bool

func (*Coordinate) LeaseRead

func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error)

LeaseRead returns a read fence backed by a leader-local lease when available, falling back to a full LinearizableRead when no fast path is live or the engine does not implement LeaseProvider.

The PRIMARY lease path is maintained inside the engine from ongoing MsgAppResp / MsgHeartbeatResp traffic, so that path does not rely on callers sampling time.Now() before the slow path to "extend" a lease afterwards. The earlier pre-read sampling was racy under congestion: if a LinearizableRead took longer than LeaseDuration, the extension would land already expired and the lease never warmed up. The engine-driven anchor is refreshed every heartbeat independent of read latency.

The SECONDARY caller-side lease remains as a rollout fallback, still populated by the original pre-read sampling; it covers the narrow window between startup and the first quorum heartbeat round landing on the engine.

The returned index is the engine's current applied index (fast path) or the index returned by LinearizableRead (slow path). Callers that resolve timestamps via store.LastCommitTS may discard the value.

func (*Coordinate) LeaseReadForKey

func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error)

func (*Coordinate) LinearizableRead

func (c *Coordinate) LinearizableRead(ctx context.Context) (uint64, error)

func (*Coordinate) LinearizableReadForKey

func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint64, error)

func (*Coordinate) ProposeHLCLease

func (c *Coordinate) ProposeHLCLease(ctx context.Context, ceilingMs int64) error

ProposeHLCLease proposes a new physical ceiling to the Raft cluster. Only the current leader should call this; proposals from non-leaders are silently dropped by Raft's leader-only write guarantee.

func (*Coordinate) RaftLeader

func (c *Coordinate) RaftLeader() string

RaftLeader returns the current leader's address as known by this node.

func (*Coordinate) RaftLeaderForKey

func (c *Coordinate) RaftLeaderForKey(_ []byte) string

func (*Coordinate) RunHLCLeaseRenewal

func (c *Coordinate) RunHLCLeaseRenewal(ctx context.Context)

RunHLCLeaseRenewal runs a background loop that periodically proposes a new physical ceiling to the Raft cluster while this node is the leader.

The ceiling is set to now + hlcPhysicalWindowMs (3 s) and is renewed every hlcRenewalInterval (1 s), mirroring TiDB's TSO window strategy. Because the window is always at least 2 s ahead of any real timestamp, a new leader will never issue timestamps that overlap with the previous leader's window.

RunHLCLeaseRenewal blocks until ctx is cancelled; call it in a goroutine.
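
Typical wiring, following the note above (coordinator is assumed to be a *Coordinate owned by the caller):

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go coordinator.RunHLCLeaseRenewal(ctx) // returns when ctx is cancelled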

func (*Coordinate) VerifyLeader

func (c *Coordinate) VerifyLeader() error

func (*Coordinate) VerifyLeaderForKey

func (c *Coordinate) VerifyLeaderForKey(_ []byte) error

type CoordinateResponse

type CoordinateResponse struct {
	CommitIndex uint64
}

type Coordinator

type Coordinator interface {
	Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
	IsLeader() bool
	VerifyLeader() error
	LinearizableRead(ctx context.Context) (uint64, error)
	RaftLeader() string
	IsLeaderForKey(key []byte) bool
	VerifyLeaderForKey(key []byte) error
	RaftLeaderForKey(key []byte) string
	Clock() *HLC
}

type CoordinatorOption

type CoordinatorOption func(*Coordinate)

CoordinatorOption is a functional option for Coordinate constructors.

func WithHLC

func WithHLC(hlc *HLC) CoordinatorOption

WithHLC sets a pre-created HLC on the coordinator. Use this together with NewKvFSMWithHLC so the FSM and coordinator share the same clock instance: the FSM advances physicalCeiling on every applied HLC lease entry, and the coordinator reads it inside Next() to floor new timestamps above the previous leader's committed window.
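
A hedged wiring sketch of the shared-clock setup described above (st, txm, and engine are assumed to exist with the documented types):

	hlc := kv.NewHLC()
	fsm := kv.NewKvFSMWithHLC(st, hlc) // FSM advances physicalCeiling on applied lease entries
	coord := kv.NewCoordinatorWithEngine(txm, engine, kv.WithHLC(hlc))
	// fsm is handed to the raft engine during setup (engine-specific);
	// coord reads the same ceiling inside Next().
	_ = fsm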

func WithLeaseReadObserver

func WithLeaseReadObserver(observer LeaseReadObserver) CoordinatorOption

WithLeaseReadObserver wires a LeaseReadObserver onto a Coordinate. This is the mechanism monitoring uses to surface the lease-hit ratio panel on the Redis hot-path dashboard (see the "Hot Path" row in monitoring/grafana/dashboards/elastickv-redis-summary.json).

Typed-nil guard: a caller passing a typed-nil pointer (e.g. `var o *myObserver; WithLeaseReadObserver(o)`) produces an interface value that is NOT equal to nil under the normal `!= nil` check, yet invoking ObserveLeaseRead would panic. Normalise here with reflect.Value.IsNil so the hot-path nil check in LeaseRead stays a single branch on a real nil interface.
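
The gotcha the guard protects against, shown in isolation (counter is a hypothetical observer, not part of this package; fmt and kv imports assumed):

	type counter struct{ hits int }

	// ObserveLeaseRead dereferences its receiver, so calling it through a
	// typed-nil *counter panics.
	func (c *counter) ObserveLeaseRead(hit bool) {
		if hit {
			c.hits++
		}
	}

	func main() {
		var c *counter                 // typed nil
		var o kv.LeaseReadObserver = c // interface value is NOT nil
		fmt.Println(o != nil)          // true: non-nil type descriptor inside
		// o.ObserveLeaseRead(true)    // would panic: nil pointer dereference
	}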

type Elem

type Elem[T OP] struct {
	Op    T
	Key   []byte
	Value []byte
}

Elem is an element of a transaction.

type FSM

type FSM interface {
	raftengine.StateMachine
}

func NewKvFSMWithHLC

func NewKvFSMWithHLC(store store.MVCCStore, hlc *HLC) FSM

NewKvFSMWithHLC creates a KV FSM that updates hlc.physicalCeiling whenever an HLC lease entry is applied. The caller must pass the same *HLC instance to the coordinator so both sides share the agreed physical ceiling.

type FSMCompactRuntime

type FSMCompactRuntime struct {
	GroupID      uint64
	StatusReader RaftStatusProvider
	Store        store.MVCCStore
}

type FSMCompactor

type FSMCompactor struct {
	// contains filtered or unexported fields
}

func NewFSMCompactor

func NewFSMCompactor(runtimes []FSMCompactRuntime, opts ...FSMCompactorOption) *FSMCompactor

func (*FSMCompactor) Run

func (c *FSMCompactor) Run(ctx context.Context) error

func (*FSMCompactor) SyncOnce

func (c *FSMCompactor) SyncOnce(ctx context.Context) error

type FSMCompactorOption

type FSMCompactorOption func(*FSMCompactor)

func WithFSMCompactorActiveTimestampTracker

func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption

func WithFSMCompactorInterval

func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption

func WithFSMCompactorLogger

func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption

func WithFSMCompactorRetentionWindow

func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption

func WithFSMCompactorTimeout

func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption

type GRPCConnCache

type GRPCConnCache struct {
	// contains filtered or unexported fields
}

GRPCConnCache reuses gRPC connections per address. gRPC itself handles reconnection on transient failures; we only force a re-dial if the conn has already been closed (Shutdown).

func (*GRPCConnCache) Close

func (c *GRPCConnCache) Close() error

func (*GRPCConnCache) ConnFor

func (c *GRPCConnCache) ConnFor(addr string) (*grpc.ClientConn, error)

type HLC

type HLC struct {
	// contains filtered or unexported fields
}

HLC implements a hybrid logical clock where the physical part is agreed upon via Raft consensus and the logical counter is managed purely in memory.

Layout (ms | logical):

high 48 bits: wall clock milliseconds since Unix epoch  ← Raft-agreed physical part
low 16 bits : logical counter                           ← in-memory only

Physical ceiling (upper half, consensus):

The leader periodically commits an HLC lease entry to the Raft log that
establishes an upper bound for the physical timestamp (physicalCeiling).
All nodes apply this entry via the FSM, advancing their local ceiling.
When a new leader is elected it inherits the committed ceiling from the
FSM state so it never issues timestamps below the previous leader's window.

Logical counter (lower half, memory):

The 16-bit counter increments purely in memory on every Next() call within
the same millisecond. It resets to 0 whenever wall time advances, and
overflows by bumping the wall millisecond by one. No Raft round-trip is
needed for logical counter advancement.
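
Restated as code, a hedged sketch of the documented bit layout (the package composes timestamps internally; this only mirrors the comment above, and real code should prefer kv.HLCLogicalBits over the literal 16):

	func composeHLC(wallMs int64, logical uint16) uint64 {
		return uint64(wallMs)<<16 | uint64(logical) // high 48 bits | low 16 bits
	}

	func decomposeHLC(ts uint64) (wallMs int64, logical uint16) {
		return int64(ts >> 16), uint16(ts & 0xFFFF)
	}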

func NewHLC

func NewHLC() *HLC

func (*HLC) Current

func (h *HLC) Current() uint64

Current returns the last issued or observed HLC value without advancing it. If no timestamp has been generated yet, it returns 0.

func (*HLC) Next

func (h *HLC) Next() uint64

Next returns the next hybrid logical timestamp.

Physical part (upper 48 bits): derived from the wall clock, but never less than the Raft-agreed physicalCeiling so that a newly elected leader cannot issue timestamps that collide with the previous leader's window.

Logical part (lower 16 bits): a pure in-memory counter that increments without any Raft round-trip. It resets to 0 whenever the physical millisecond advances and overflows by bumping the physical millisecond by one.

func (*HLC) Observe

func (h *HLC) Observe(ts uint64)

Observe bumps the local clock if a higher timestamp is seen.

func (*HLC) PhysicalCeiling

func (h *HLC) PhysicalCeiling() int64

PhysicalCeiling returns the last Raft-committed physical ceiling in Unix milliseconds. Returns 0 if no ceiling has been established yet.

func (*HLC) SetPhysicalCeiling

func (h *HLC) SetPhysicalCeiling(ms int64)

SetPhysicalCeiling atomically advances the Raft-agreed physical ceiling. It is called by the FSM whenever an HLC lease entry is applied to the log. The ceiling is monotonically increasing: calls with a smaller value are silently ignored.

type LeaderProxy

type LeaderProxy struct {
	// contains filtered or unexported fields
}

LeaderProxy forwards transactional requests to the current raft leader when the local node is not the leader.

func NewLeaderProxyWithEngine

func NewLeaderProxyWithEngine(engine raftengine.Engine, opts ...TransactionOption) *LeaderProxy

func (*LeaderProxy) Abort

func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error)

func (*LeaderProxy) Close

func (p *LeaderProxy) Close() error

func (*LeaderProxy) Commit

func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error)

type LeaderRoutedStore

type LeaderRoutedStore struct {
	// contains filtered or unexported fields
}

LeaderRoutedStore is an MVCCStore wrapper that serves reads from the local store only when leadership is verified; otherwise it proxies reads to the current leader via gRPC.

This is intended for single-raft-group deployments where the underlying store itself is not leader-aware (e.g. *store.MVCCStore).

Writes and maintenance operations are delegated to the local store.

func NewLeaderRoutedStore

func NewLeaderRoutedStore(local store.MVCCStore, coordinator Coordinator) *LeaderRoutedStore

func (*LeaderRoutedStore) ApplyMutations

func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error

func (*LeaderRoutedStore) ApplyMutationsRaft

func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error

ApplyMutationsRaft forwards to the local store's raft-apply variant. See store.MVCCStore for the durability contract.

func (*LeaderRoutedStore) Close

func (s *LeaderRoutedStore) Close() error

func (*LeaderRoutedStore) Compact

func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error

func (*LeaderRoutedStore) DeleteAt

func (s *LeaderRoutedStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error

func (*LeaderRoutedStore) DeletePrefixAt

func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error

func (*LeaderRoutedStore) DeletePrefixAtRaft

func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error

DeletePrefixAtRaft forwards to the local store's raft-apply variant.

func (*LeaderRoutedStore) ExistsAt

func (s *LeaderRoutedStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)

func (*LeaderRoutedStore) ExpireAt

func (s *LeaderRoutedStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error

func (*LeaderRoutedStore) GetAt

func (s *LeaderRoutedStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)

func (*LeaderRoutedStore) GlobalLastCommitTS

func (s *LeaderRoutedStore) GlobalLastCommitTS(ctx context.Context) uint64

GlobalLastCommitTS returns the most recently committed HLC timestamp from the authoritative leader. On the leader this is the local LastCommitTS. On a follower the method issues a lightweight RPC (RawLatestCommitTS with an empty key) so callers obtain a non-stale snapshot — critical for ConsistentRead semantics where followers must not serve reads at a stale local watermark. Falls back to the local LastCommitTS on any error.

func (*LeaderRoutedStore) LastCommitTS

func (s *LeaderRoutedStore) LastCommitTS() uint64

func (*LeaderRoutedStore) LatestCommitTS

func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)

func (*LeaderRoutedStore) PutAt

func (s *LeaderRoutedStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*LeaderRoutedStore) PutWithTTLAt

func (s *LeaderRoutedStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*LeaderRoutedStore) Restore

func (s *LeaderRoutedStore) Restore(buf io.Reader) error

func (*LeaderRoutedStore) ReverseScanAt

func (s *LeaderRoutedStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*LeaderRoutedStore) ScanAt

func (s *LeaderRoutedStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*LeaderRoutedStore) Snapshot

func (s *LeaderRoutedStore) Snapshot() (store.Snapshot, error)

func (*LeaderRoutedStore) WriteConflictCountsByPrefix

func (s *LeaderRoutedStore) WriteConflictCountsByPrefix() map[string]uint64

WriteConflictCountsByPrefix delegates to the local MVCC store. The leader-routed wrapper does not add cross-group conflict detection of its own, so the node-local view IS the authoritative view.

type LeaseReadObserver

type LeaseReadObserver interface {
	// ObserveLeaseRead is called with hit=true when the lease fast path
	// served the read from local AppliedIndex, or hit=false when the
	// coordinator fell back to a full LinearizableRead (expired lease,
	// engine reported non-leader, or leader-loss callback raced with
	// the request).
	ObserveLeaseRead(hit bool)
}

LeaseReadObserver records lease-read fast-path vs slow-path outcomes without coupling kv to a concrete monitoring backend. It is called once per LeaseRead invocation that actually evaluates the lease (the initial type-assertion/LeaseDuration==0 short-circuits are NOT counted because they indicate the engine does not participate in lease reads at all).

Implementations MUST be safe for concurrent use and MUST NOT block; the observer is invoked on the Redis GET hot path.
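
A hedged sample implementation satisfying the contract (lock-free counters; the type name is illustrative, and sync/atomic is assumed imported):

	// leaseReadCounters is concurrency-safe and never blocks, as required
	// on the Redis GET hot path.
	type leaseReadCounters struct {
		hits, misses atomic.Uint64
	}

	func (c *leaseReadCounters) ObserveLeaseRead(hit bool) {
		if hit {
			c.hits.Add(1)
			return
		}
		c.misses.Add(1)
	}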

type LeaseReadableCoordinator

type LeaseReadableCoordinator interface {
	LeaseRead(ctx context.Context) (uint64, error)
	LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
}

LeaseReadableCoordinator is the optional capability implemented by coordinators that participate in the leader-local lease read path (see docs/design/2026_04_20_implemented_lease_read.md). Callers that want lease reads should type-assert to this interface and fall back to LinearizableRead when the assertion fails, following the same pattern as raftengine.LeaseProvider. Keeping the lease methods OFF the Coordinator interface avoids breaking existing external implementations that predate the lease-read feature.

type LockResolver

type LockResolver struct {
	// contains filtered or unexported fields
}

LockResolver periodically scans for expired transaction locks and resolves them. This handles the case where secondary commit fails and leaves orphaned locks that no read path would discover (e.g., cold keys).

func NewLockResolver

func NewLockResolver(ss *ShardStore, groups map[uint64]*ShardGroup, log *slog.Logger) *LockResolver

NewLockResolver creates and starts a background lock resolver.

func (*LockResolver) Close

func (lr *LockResolver) Close()

Close stops the background resolver and waits for it to finish.

type OP

type OP int

OP is an operation type.

const (
	Put OP = iota
	Del
	// DelPrefix deletes all visible keys matching the prefix stored in Key.
	// An empty Key means "all keys". Transaction-internal keys are excluded.
	DelPrefix
)

Operation types.

type OperationGroup

type OperationGroup[T OP] struct {
	Elems []*Elem[T]
	IsTxn bool
	// StartTS is a logical timestamp captured at transaction begin.
	// It is ignored for non-transactional groups.
	StartTS uint64
	// CommitTS optionally pins the transaction commit timestamp.
	// Coordinators choose one automatically when this is zero.
	CommitTS uint64
	// ReadKeys carries the transaction's read set so the FSM can validate
	// read-write conflicts atomically with the commit.
	ReadKeys [][]byte
}

OperationGroup is a group of operations that should be executed atomically.

type PartitionResolver

type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)

	// RecognisesPartitionedKey reports whether the key SHAPE is
	// one this resolver is responsible for. Implementations
	// answer based on prefix / structural inspection only — the
	// answer must NOT depend on any in-memory mapping that could
	// drift out of sync, otherwise the router cannot reliably
	// fail-closed for unresolved-but-recognised keys.
	RecognisesPartitionedKey(key []byte) bool
}

PartitionResolver maps a key to its owning Raft group when the key belongs to a partition-scheme keyspace (e.g. SQS HT-FIFO, where each (queue, partition) pair lives on a different group). ShardRouter consults the resolver before falling through to the byte-range engine, so partition routing can override the default shard-range layout without breaking the engine's non-overlapping-cover invariant.

Implementations must be safe for concurrent use — ResolveGroup is called on the request hot path. Returning (0, false) for a key the resolver does not recognise lets the router fall through to the engine. Returning (0, false) for a key the resolver DOES recognise (the partitioned shape matches but the queue is not in the map, or the partition is out of range) is also valid; the router uses RecognisesPartitionedKey to distinguish "not partitioned, fall through" from "partitioned but unresolved, fail closed".

The fail-closed split matters under partition-map drift or a partial rollout: without it, an unresolved partitioned key would silently land on the engine's SQS-catalog default group (because routeKey normalises every !sqs|... key to !sqs|route|global) instead of surfacing a routing error.
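
A hedged sketch of a conforming resolver for a hypothetical "!part|<name>|..." keyspace, making the recognised-vs-resolved split concrete (the prefix and map layout are illustrative only; bytes is assumed imported):

	type staticResolver struct {
		prefix []byte            // structural shape: fixed at construction
		groups map[string]uint64 // name -> owning raft group; may drift
	}

	// RecognisesPartitionedKey inspects shape only and never consults the
	// group map, so the router can fail closed when the map lags.
	func (r *staticResolver) RecognisesPartitionedKey(key []byte) bool {
		return bytes.HasPrefix(key, r.prefix)
	}

	func (r *staticResolver) ResolveGroup(key []byte) (uint64, bool) {
		if !bytes.HasPrefix(key, r.prefix) {
			return 0, false // not our keyspace: router falls through to the engine
		}
		name, _, _ := bytes.Cut(key[len(r.prefix):], []byte("|"))
		g, ok := r.groups[string(name)]
		return g, ok // ok=false here means "recognised but unresolved": fail closed
	}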

type ProposalObserver

type ProposalObserver interface {
	ObserveProposalFailure()
}

ProposalObserver records raft proposal failures for operational metrics.

type RaftStatusProvider

type RaftStatusProvider interface {
	Status() raftengine.Status
}

type ShardGroup

type ShardGroup struct {
	Engine raftengine.Engine
	Store  store.MVCCStore
	Txn    Transactional
	// contains filtered or unexported fields
}

type ShardRouter

type ShardRouter struct {
	// contains filtered or unexported fields
}

ShardRouter routes requests to multiple raft groups based on key ranges.

Cross-shard transactions are not supported. They require distributed coordination (for example, 2PC) to ensure atomicity.

Non-transactional request batches may still partially succeed across shards.

func NewShardRouter

func NewShardRouter(e *distribution.Engine) *ShardRouter

NewShardRouter creates a new router.

func (*ShardRouter) Abort

func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error)

Abort dispatches aborts to the correct raft group.

func (*ShardRouter) Commit

func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error)

func (*ShardRouter) Get

func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error)

Get retrieves a key routed to the correct shard.

func (*ShardRouter) Register

func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)

Register associates a raft group ID with its transactional manager and store.

func (*ShardRouter) ResolveGroup

func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool)

ResolveGroup tries the partition resolver first (when installed), then falls through to the byte-range engine. It is exported so ShardedCoordinator's per-key helpers (groupForKey, routeAndGroupForKey, engineGroupIDForKey, groupMutations) can consult the same dispatch path that Commit / Abort / Get use — without it those helpers would bypass the resolver and partitioned-FIFO traffic would silently mis-route through 2PC and the read paths.

The resolver runs on the RAW key before any user-key normalization. SQS keys in particular are collapsed to !sqs|route|global by routeKey to keep the engine's per-shard layout simple, but that collapse hides the partitioned-prefix information the resolver needs (issue: codex P1 / gemini high on PR #715). The engine still sees the post-normalization key, so legacy routing (catalog → !sqs|route|global → default group) stays unchanged.

Fail-closed for recognised-but-unresolved keys: when the resolver recognises a partitioned shape (RecognisesPartitionedKey == true) but cannot resolve the queue/partition pair (ResolveGroup returns ok=false), the router refuses to fall through to the engine. Otherwise the engine would route the partitioned key to !sqs|route|global's default group, silently mis-routing HT-FIFO traffic during partition-map drift or partial rollout (codex P1 round 2 on PR #715).

Returns (0, false) when neither the resolver nor the engine recognises the key. Caller surfaces this as an "unknown group" error so a partitioned-prefix key whose queue is missing from the resolver map fails closed rather than landing on whichever engine-default group happens to cover the raw bytes.

func (*ShardRouter) WithPartitionResolver

func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter

WithPartitionResolver installs a partition-keyspace resolver that is consulted before the byte-range engine on every dispatch. A nil resolver clears any previously-installed resolver. Returns the receiver so callers can chain.

Intended for use during startup, before the router begins handling requests. Interface assignment in Go is not atomic, so a call that races with a concurrent ResolveGroup in resolveGroup may produce a torn read; callers must wire the resolver once during construction (parseRuntimeConfig → NewShardedCoordinator → WithPartitionResolver) and treat any post-startup re-assignment as undefined behaviour.

type ShardStore

type ShardStore struct {
	// contains filtered or unexported fields
}

ShardStore routes MVCC reads to shard-specific stores and proxies to leaders when needed.

func NewShardStore

func NewShardStore(engine *distribution.Engine, groups map[uint64]*ShardGroup) *ShardStore

NewShardStore creates a sharded MVCC store wrapper.

func (*ShardStore) ApplyMutations

func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error

ApplyMutations applies a batch of mutations to the correct shard store.

All mutations must belong to the same shard. Cross-shard mutation batches are not supported.

func (*ShardStore) ApplyMutationsRaft

func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error

ApplyMutationsRaft is the raft-apply variant; see store.MVCCStore for the durability contract. Only the FSM may call this method.

func (*ShardStore) Close

func (s *ShardStore) Close() error

func (*ShardStore) Compact

func (s *ShardStore) Compact(ctx context.Context, minTS uint64) error

func (*ShardStore) DeleteAt

func (s *ShardStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error

func (*ShardStore) DeletePrefixAt

func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error

DeletePrefixAt applies a prefix delete to every shard in the store.

func (*ShardStore) DeletePrefixAtRaft

func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error

DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt.

func (*ShardStore) ExistsAt

func (s *ShardStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)

func (*ShardStore) ExpireAt

func (s *ShardStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error

func (*ShardStore) GetAt

func (s *ShardStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)

func (*ShardStore) LastCommitTS

func (s *ShardStore) LastCommitTS() uint64

func (*ShardStore) LatestCommitTS

func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)

func (*ShardStore) PutAt

func (s *ShardStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*ShardStore) PutWithTTLAt

func (s *ShardStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*ShardStore) Restore

func (s *ShardStore) Restore(_ io.Reader) error

func (*ShardStore) ReverseScanAt

func (s *ShardStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*ShardStore) ScanAt

func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

ScanAt scans keys across shards at the given timestamp. Note: when the range spans multiple shards, each shard may have a different Raft apply position. This means the returned view is NOT a globally consistent snapshot — it is a best-effort point-in-time scan. Callers requiring cross-shard consistency should use a transaction or implement a cross-shard snapshot fence.

func (*ShardStore) Snapshot

func (s *ShardStore) Snapshot() (store.Snapshot, error)

func (*ShardStore) WriteConflictCountsByPrefix

func (s *ShardStore) WriteConflictCountsByPrefix() map[string]uint64

WriteConflictCountsByPrefix aggregates OCC conflict counts across every shard group owned by this ShardStore. Per-shard counts share the same "<kind>|<key_prefix>" label schema, so a simple sum gives the node-wide view. The result is always non-nil.

type ShardedCoordinator

type ShardedCoordinator struct {
	// contains filtered or unexported fields
}

ShardedCoordinator routes operations to shard-specific raft groups. It issues timestamps via a shared HLC and uses ShardRouter to dispatch.

func NewShardedCoordinator

func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator

NewShardedCoordinator builds a coordinator for the provided shard groups. The defaultGroup is used for non-keyed leader checks.

func (*ShardedCoordinator) Clock

func (c *ShardedCoordinator) Clock() *HLC

func (*ShardedCoordinator) Close

func (c *ShardedCoordinator) Close() error

Close releases per-shard engine-side registrations. Idempotent.

func (*ShardedCoordinator) Dispatch

func (*ShardedCoordinator) IsLeader

func (c *ShardedCoordinator) IsLeader() bool

func (*ShardedCoordinator) IsLeaderForKey

func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool

func (*ShardedCoordinator) LeaseRead

func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error)

LeaseRead routes through the default group's lease. See Coordinate.LeaseRead for semantics.

func (*ShardedCoordinator) LeaseReadForKey

func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)

LeaseReadForKey performs the lease check on the shard group that owns key. Each group maintains its own lease since each group has independent leadership and term.

func (*ShardedCoordinator) LinearizableRead

func (c *ShardedCoordinator) LinearizableRead(ctx context.Context) (uint64, error)

func (*ShardedCoordinator) LinearizableReadForKey

func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []byte) (uint64, error)

func (*ShardedCoordinator) RaftLeader

func (c *ShardedCoordinator) RaftLeader() string

func (*ShardedCoordinator) RaftLeaderForKey

func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string

func (*ShardedCoordinator) RunHLCLeaseRenewal

func (c *ShardedCoordinator) RunHLCLeaseRenewal(ctx context.Context)

RunHLCLeaseRenewal periodically proposes a new physical ceiling to the default shard group's Raft cluster while this node is the leader of that group. This mirrors the single-shard Coordinate.RunHLCLeaseRenewal behaviour, ensuring the shared HLC ceiling is maintained in multi-shard deployments.

RunHLCLeaseRenewal blocks until ctx is cancelled; call it in a goroutine.

func (*ShardedCoordinator) VerifyLeader

func (c *ShardedCoordinator) VerifyLeader() error

func (*ShardedCoordinator) VerifyLeaderForKey

func (c *ShardedCoordinator) VerifyLeaderForKey(key []byte) error

func (*ShardedCoordinator) WithLeaseReadObserver

func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *ShardedCoordinator

WithLeaseReadObserver wires a LeaseReadObserver onto a ShardedCoordinator. Applied after construction because the NewShardedCoordinator signature is already heavily overloaded; see the package-level WithLeaseReadObserver CoordinatorOption for the single-group equivalent, including the typed-nil guard rationale.

func (*ShardedCoordinator) WithPartitionResolver

func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator

WithPartitionResolver wires a PartitionResolver onto the coordinator's underlying ShardRouter. The resolver runs before the byte-range engine on every dispatch, so partition-keyspace schemes (e.g. SQS HT-FIFO) can override the default shard layout without breaking the engine's non-overlapping-cover invariant.

Applied after construction for the same reason as the other With* options on this type — NewShardedCoordinator is already heavily overloaded. Passing a nil resolver clears any previously-installed resolver.

func (*ShardedCoordinator) WithSampler

func (c *ShardedCoordinator) WithSampler(sampler keyviz.Sampler) *ShardedCoordinator

WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The coordinator calls sampler.Observe at dispatch entry — once per resolved (RouteID, mutation key) pair — to feed the key visualizer heatmap (design doc §5.1). Applied after construction for the same reason as WithLeaseReadObserver: NewShardedCoordinator is already heavily overloaded.

Passing a nil interface value is supported and disables sampling (the call site guards against it). Passing a typed-nil *keyviz.MemSampler also works because Observe is nil-safe by contract.

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

func NewTransactionWithProposer

func NewTransactionWithProposer(proposer raftengine.Proposer, opts ...TransactionOption) *TransactionManager

func (*TransactionManager) Abort

func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, error)

func (*TransactionManager) Close

func (t *TransactionManager) Close()

Close signals the TransactionManager to stop and drains any pending raw commit items, sending each an error so callers are not blocked forever.

func (*TransactionManager) Commit

func (t *TransactionManager) Commit(reqs []*pb.Request) (*TransactionResponse, error)

type TransactionOption

type TransactionOption func(*TransactionManager)

func WithProposalObserver

func WithProposalObserver(observer ProposalObserver) TransactionOption

WithProposalObserver records raft.Apply failures without coupling kv to a concrete monitoring backend.

type TransactionResponse

type TransactionResponse struct {
	CommitIndex uint64
}

type Transactional

type Transactional interface {
	Commit(reqs []*pb.Request) (*TransactionResponse, error)
	Abort(reqs []*pb.Request) (*TransactionResponse, error)
}

type TxnLockedError

type TxnLockedError struct {
	// contains filtered or unexported fields
}

func (*TxnLockedError) Error

func (e *TxnLockedError) Error() string

func (*TxnLockedError) Unwrap

func (e *TxnLockedError) Unwrap() error

type TxnMeta

type TxnMeta struct {
	PrimaryKey []byte
	LockTTLms  uint64
	CommitTS   uint64
}

TxnMeta is embedded into transactional raft log requests via a synthetic mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store.

func DecodeTxnMeta

func DecodeTxnMeta(b []byte) (TxnMeta, error)
