Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeTxnMeta(m TxnMeta) []byte
- func ExtractTxnUserKey(key []byte) []byte
- func LeaseReadForKeyThrough(c Coordinator, ctx context.Context, key []byte) (uint64, error)
- func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error)
- func MaxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error)
- func NewTxnLockedError(key []byte) error
- func NewTxnLockedErrorWithDetail(key []byte, detail string) error
- func TxnLockedDetails(err error) ([]byte, string, bool)
- type ActiveTimestampToken
- type ActiveTimestampTracker
- type Coordinate
- func (c *Coordinate) Clock() *HLC
- func (c *Coordinate) Close() error
- func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
- func (c *Coordinate) IsLeader() bool
- func (c *Coordinate) IsLeaderAcceptingWrites() bool
- func (c *Coordinate) IsLeaderForKey(_ []byte) bool
- func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error)
- func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error)
- func (c *Coordinate) LinearizableRead(ctx context.Context) (uint64, error)
- func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint64, error)
- func (c *Coordinate) ProposeHLCLease(ctx context.Context, ceilingMs int64) error
- func (c *Coordinate) RaftLeader() string
- func (c *Coordinate) RaftLeaderForKey(_ []byte) string
- func (c *Coordinate) RunHLCLeaseRenewal(ctx context.Context)
- func (c *Coordinate) VerifyLeader() error
- func (c *Coordinate) VerifyLeaderForKey(_ []byte) error
- type CoordinateResponse
- type Coordinator
- type CoordinatorOption
- type Elem
- type FSM
- type FSMCompactRuntime
- type FSMCompactor
- type FSMCompactorOption
- func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption
- func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption
- func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption
- func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption
- func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption
- type GRPCConnCache
- type HLC
- type LeaderProxy
- type LeaderRoutedStore
- func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *LeaderRoutedStore) Close() error
- func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error
- func (s *LeaderRoutedStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
- func (s *LeaderRoutedStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
- func (s *LeaderRoutedStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
- func (s *LeaderRoutedStore) GlobalLastCommitTS(ctx context.Context) uint64
- func (s *LeaderRoutedStore) LastCommitTS() uint64
- func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
- func (s *LeaderRoutedStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *LeaderRoutedStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *LeaderRoutedStore) Restore(buf io.Reader) error
- func (s *LeaderRoutedStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *LeaderRoutedStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *LeaderRoutedStore) Snapshot() (store.Snapshot, error)
- func (s *LeaderRoutedStore) WriteConflictCountsByPrefix() map[string]uint64
- type LeaseReadObserver
- type LeaseReadableCoordinator
- type LockResolver
- type OP
- type OperationGroup
- type PartitionResolver
- type ProposalObserver
- type RaftStatusProvider
- type ShardGroup
- type ShardRouter
- func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error)
- func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error)
- func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error)
- func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)
- func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool)
- func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter
- type ShardStore
- func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *ShardStore) Close() error
- func (s *ShardStore) Compact(ctx context.Context, minTS uint64) error
- func (s *ShardStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
- func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *ShardStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
- func (s *ShardStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
- func (s *ShardStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
- func (s *ShardStore) LastCommitTS() uint64
- func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
- func (s *ShardStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *ShardStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *ShardStore) Restore(_ io.Reader) error
- func (s *ShardStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *ShardStore) Snapshot() (store.Snapshot, error)
- func (s *ShardStore) WriteConflictCountsByPrefix() map[string]uint64
- type ShardedCoordinator
- func (c *ShardedCoordinator) Clock() *HLC
- func (c *ShardedCoordinator) Close() error
- func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
- func (c *ShardedCoordinator) IsLeader() bool
- func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool
- func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error)
- func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
- func (c *ShardedCoordinator) LinearizableRead(ctx context.Context) (uint64, error)
- func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []byte) (uint64, error)
- func (c *ShardedCoordinator) RaftLeader() string
- func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string
- func (c *ShardedCoordinator) RunHLCLeaseRenewal(ctx context.Context)
- func (c *ShardedCoordinator) VerifyLeader() error
- func (c *ShardedCoordinator) VerifyLeaderForKey(key []byte) error
- func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *ShardedCoordinator
- func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator
- func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator
- type TransactionManager
- type TransactionOption
- type TransactionResponse
- type Transactional
- type TxnLockedError
- type TxnMeta
Constants ¶
const (
	// DynamoTableMetaPrefix prefixes DynamoDB table metadata keys.
	DynamoTableMetaPrefix = "!ddb|meta|table|"
	// DynamoTableGenerationPrefix prefixes DynamoDB table generation keys.
	DynamoTableGenerationPrefix = "!ddb|meta|gen|"
	// DynamoItemPrefix prefixes DynamoDB item storage keys.
	DynamoItemPrefix = "!ddb|item|"
	// DynamoGSIPrefix prefixes DynamoDB GSI storage keys.
	DynamoGSIPrefix = "!ddb|gsi|"
)
const HLCLogicalBits = hlcLogicalBits
HLCLogicalBits is the number of low bits in an HLC timestamp reserved for the in-memory logical counter (vs the upper bits which encode the Raft-agreed wall-clock millis). Exported so downstream tools — admin dashboard ISO-8601 formatting being the motivating case — can recover the physical half without hard-coding a magic number that silently drifts when the layout changes (Claude Issue 4 on PR #658).
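For instance, a dashboard can split a timestamp with the exported constant instead of a hard-coded shift. A minimal sketch, assuming the 48/16-bit layout documented under type HLC below (the helper name is illustrative; real code should shift by kv.HLCLogicalBits rather than the stand-in literal):

import "time"

// splitHLC recovers both halves of an HLC timestamp: the Raft-agreed
// wall-clock milliseconds and the in-memory logical counter.
func splitHLC(ts uint64) (wall time.Time, logical uint64) {
	const logicalBits = 16 // stand-in for kv.HLCLogicalBits
	wall = time.UnixMilli(int64(ts >> logicalBits)).UTC()
	logical = ts & ((1 << logicalBits) - 1)
	return wall, logical
}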
const (
	// TxnKeyPrefix is the common prefix shared by all transaction internal
	// key namespaces. All per-namespace prefixes below are derived from it.
	// NOTE: store/store.go duplicates this literal as txnInternalKeyPrefix
	// because an import cycle prevents store from importing kv.
	TxnKeyPrefix = "!txn|"
)
const TxnMetaPrefix = txnMetaPrefix
TxnMetaPrefix is the key prefix used for transaction metadata mutations.
Variables ¶
var (
	ErrTxnMetaMissing        = errors.New("txn meta missing")
	ErrTxnInvalidMeta        = errors.New("txn meta invalid")
	ErrTxnLocked             = errors.New("txn locked")
	ErrTxnCommitTSRequired   = errors.New("txn commit ts required")
	ErrTxnAlreadyCommitted   = errors.New("txn already committed")
	ErrTxnAlreadyAborted     = errors.New("txn already aborted")
	ErrTxnPrimaryKeyRequired = errors.New("txn primary key required")
)
var ErrCrossShardMutationBatchNotSupported = errors.New("cross-shard mutation batches are not supported")
var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported")
var ErrInvalidRequest = errors.New("invalid request")
var ErrLeaderNotFound = errors.New("leader not found")
var ErrNotImplemented = errors.New("not implemented")
var ErrUnknownRequestType = errors.New("unknown request type")
Functions ¶
func EncodeTxnMeta ¶
func EncodeTxnMeta(m TxnMeta) []byte
func ExtractTxnUserKey ¶
func ExtractTxnUserKey(key []byte) []byte
ExtractTxnUserKey returns the logical user key embedded in a transaction-internal key such as !txn|lock|, !txn|cmt|, or !txn|meta|. It returns nil when the key does not use a transaction-internal namespace or is malformed.
func LeaseReadForKeyThrough ¶
func LeaseReadForKeyThrough(c Coordinator, ctx context.Context, key []byte) (uint64, error)
LeaseReadForKeyThrough is the key-routed counterpart of LeaseReadThrough.
func LeaseReadThrough ¶
func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error)
LeaseReadThrough is a helper that calls LeaseRead when the coordinator supports it, falling back to LinearizableRead otherwise. Adapter call sites use this so they don't have to repeat the type-assertion dance.
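The "dance" it wraps looks roughly like this (a sketch of the documented behaviour, not the package source; see LeaseReadableCoordinator below for the capability interface):

func leaseReadOrFallback(ctx context.Context, c Coordinator) (uint64, error) {
	// Fast path only when the coordinator opts into lease reads.
	if lr, ok := c.(LeaseReadableCoordinator); ok {
		return lr.LeaseRead(ctx)
	}
	// Otherwise pay for a full quorum read.
	return c.LinearizableRead(ctx)
}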
func MaxLatestCommitTS ¶
func MaxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error)
MaxLatestCommitTS returns the maximum commit timestamp for the provided keys.
Missing keys are ignored. If any LatestCommitTS lookup returns an error, the error is returned to the caller.
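The documented semantics reduce to a loop like the following (a sketch, not the package source; it assumes store.MVCCStore exposes the LatestCommitTS method shown on the store wrappers below):

func maxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error) {
	var max uint64
	for _, key := range keys {
		ts, ok, err := st.LatestCommitTS(ctx, key)
		if err != nil {
			return 0, err // any lookup error is surfaced to the caller
		}
		if ok && ts > max { // ok == false means the key is missing: ignored
			max = ts
		}
	}
	return max, nil
}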
func NewTxnLockedError ¶
func NewTxnLockedError(key []byte) error
func NewTxnLockedErrorWithDetail ¶
func NewTxnLockedErrorWithDetail(key []byte, detail string) error
func TxnLockedDetails ¶
func TxnLockedDetails(err error) ([]byte, string, bool)
Types ¶
type ActiveTimestampToken ¶
type ActiveTimestampToken struct {
// contains filtered or unexported fields
}
ActiveTimestampToken releases one tracked timestamp when the owning operation completes.
func (*ActiveTimestampToken) Release ¶
func (t *ActiveTimestampToken) Release()
type ActiveTimestampTracker ¶
type ActiveTimestampTracker struct {
// contains filtered or unexported fields
}
ActiveTimestampTracker tracks in-flight read or transaction timestamps that must remain readable while background compaction is running.
func NewActiveTimestampTracker ¶
func NewActiveTimestampTracker() *ActiveTimestampTracker
func (*ActiveTimestampTracker) Oldest ¶
func (t *ActiveTimestampTracker) Oldest() uint64
func (*ActiveTimestampTracker) Pin ¶
func (t *ActiveTimestampTracker) Pin(ts uint64) *ActiveTimestampToken
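Typical usage pins a read timestamp for the lifetime of the operation (sketch; tracker and readTS are placeholders):

tok := tracker.Pin(readTS) // keep readTS visible to compaction
defer tok.Release()        // release when the read finishes
// ... serve the read at readTS ...
// A background compactor bounds its minTS by tracker.Oldest().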
type Coordinate ¶
type Coordinate struct {
// contains filtered or unexported fields
}
func NewCoordinatorWithEngine ¶
func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts ...CoordinatorOption) *Coordinate
func (*Coordinate) Clock ¶
func (c *Coordinate) Clock() *HLC
func (*Coordinate) Close ¶
func (c *Coordinate) Close() error
Close releases any engine-side registrations (currently the leader-loss callback) held by this Coordinate. It is safe to call on a nil receiver and multiple times. Owners whose lifetime matches the engine's do not need to call Close; owners who discard the Coordinate before closing the engine MUST.
func (*Coordinate) Dispatch ¶
func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
func (*Coordinate) IsLeader ¶
func (c *Coordinate) IsLeader() bool
func (*Coordinate) IsLeaderAcceptingWrites ¶
func (c *Coordinate) IsLeaderAcceptingWrites() bool
IsLeaderAcceptingWrites reports whether this node is leader and not currently transferring leadership. Background proposers should gate on this to avoid piling up dropped proposals while a transfer is in flight.
func (*Coordinate) IsLeaderForKey ¶
func (c *Coordinate) IsLeaderForKey(_ []byte) bool
func (*Coordinate) LeaseRead ¶
func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error)
LeaseRead returns a read fence backed by a leader-local lease when available, falling back to a full LinearizableRead when no fast path is live or the engine does not implement LeaseProvider.
The PRIMARY lease path is maintained inside the engine from ongoing MsgAppResp / MsgHeartbeatResp traffic, so it does not rely on callers sampling time.Now() before the slow path and "extending" the lease afterwards. That earlier pre-read sampling was racy under congestion: if a LinearizableRead took longer than LeaseDuration, the extension landed already expired and the lease never warmed up. The engine-driven anchor is refreshed on every heartbeat, independent of read latency.
The SECONDARY caller-side lease remains as a rollout fallback, still populated by the original pre-read sampling; it covers the narrow window between startup and the first quorum heartbeat round landing on the engine.
The returned index is the engine's current applied index (fast path) or the index returned by LinearizableRead (slow path). Callers that resolve timestamps via store.LastCommitTS may discard the value.
func (*Coordinate) LeaseReadForKey ¶
func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error)
func (*Coordinate) LinearizableRead ¶
func (c *Coordinate) LinearizableRead(ctx context.Context) (uint64, error)
func (*Coordinate) LinearizableReadForKey ¶
func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint64, error)
func (*Coordinate) ProposeHLCLease ¶
func (c *Coordinate) ProposeHLCLease(ctx context.Context, ceilingMs int64) error
ProposeHLCLease proposes a new physical ceiling to the Raft cluster. Only the current leader should call this; proposals from non-leaders are silently dropped by Raft's leader-only write guarantee.
func (*Coordinate) RaftLeader ¶
func (c *Coordinate) RaftLeader() string
RaftLeader returns the current leader's address as known by this node.
func (*Coordinate) RaftLeaderForKey ¶
func (c *Coordinate) RaftLeaderForKey(_ []byte) string
func (*Coordinate) RunHLCLeaseRenewal ¶
func (c *Coordinate) RunHLCLeaseRenewal(ctx context.Context)
RunHLCLeaseRenewal runs a background loop that periodically proposes a new physical ceiling to the Raft cluster while this node is the leader.
The ceiling is set to now + hlcPhysicalWindowMs (3 s) and is renewed every hlcRenewalInterval (1 s), mirroring TiDB's TSO window strategy. Because the window is always at least 2 s ahead of any real timestamp, a new leader will never issue timestamps that overlap with the previous leader's window.
RunHLCLeaseRenewal blocks until ctx is cancelled; call it in a goroutine.
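A typical call site (sketch; coord is a placeholder):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()                   // stops the renewal loop
go coord.RunHLCLeaseRenewal(ctx) // blocks until ctx is cancelled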
func (*Coordinate) VerifyLeader ¶
func (c *Coordinate) VerifyLeader() error
func (*Coordinate) VerifyLeaderForKey ¶
func (c *Coordinate) VerifyLeaderForKey(_ []byte) error
type CoordinateResponse ¶
type CoordinateResponse struct {
CommitIndex uint64
}
type Coordinator ¶
type Coordinator interface {
Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
IsLeader() bool
VerifyLeader() error
LinearizableRead(ctx context.Context) (uint64, error)
RaftLeader() string
IsLeaderForKey(key []byte) bool
VerifyLeaderForKey(key []byte) error
RaftLeaderForKey(key []byte) string
Clock() *HLC
}
type CoordinatorOption ¶
type CoordinatorOption func(*Coordinate)
CoordinatorOption is a functional option for Coordinate constructors.
func WithHLC ¶
func WithHLC(hlc *HLC) CoordinatorOption
WithHLC sets a pre-created HLC on the coordinator. Use this together with NewKvFSMWithHLC so the FSM and coordinator share the same clock instance: the FSM advances physicalCeiling on every applied HLC lease entry, and the coordinator reads it inside Next() to floor new timestamps above the previous leader's committed window.
func WithLeaseReadObserver ¶
func WithLeaseReadObserver(observer LeaseReadObserver) CoordinatorOption
WithLeaseReadObserver wires a LeaseReadObserver onto a Coordinate. This is the mechanism monitoring uses to surface the lease-hit ratio panel on the Redis hot-path dashboard (see the "Hot Path" row in monitoring/grafana/dashboards/elastickv-redis-summary.json).
Typed-nil guard: a caller passing a typed-nil pointer (e.g. `var o *myObserver; WithLeaseReadObserver(o)`) produces an interface value that is NOT equal to nil under the normal `!= nil` check, yet invoking ObserveLeaseRead would panic. Normalise here with reflect.Value.IsNil so the hot-path nil check in LeaseRead stays a single branch on a real nil interface.
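The pitfall the guard defends against is standard Go interface behaviour and easy to reproduce (self-contained demo; the observer type is made up for illustration):

package main

import "fmt"

type observer interface{ ObserveLeaseRead(hit bool) }

type myObserver struct{ hits int }

func (o *myObserver) ObserveLeaseRead(bool) { o.hits++ } // dereferences o

func main() {
	var o *myObserver         // typed nil pointer
	var iface observer = o    // interface wrapping a nil pointer
	fmt.Println(iface != nil) // true: the plain nil check passes...
	// iface.ObserveLeaseRead(true) // ...but this would panic (nil deref)
}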
type FSM ¶
type FSM interface {
raftengine.StateMachine
}
type FSMCompactRuntime ¶
type FSMCompactRuntime struct {
GroupID uint64
StatusReader RaftStatusProvider
Store store.MVCCStore
}
type FSMCompactor ¶
type FSMCompactor struct {
// contains filtered or unexported fields
}
func NewFSMCompactor ¶
func NewFSMCompactor(runtimes []FSMCompactRuntime, opts ...FSMCompactorOption) *FSMCompactor
type FSMCompactorOption ¶
type FSMCompactorOption func(*FSMCompactor)
func WithFSMCompactorActiveTimestampTracker ¶
func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption
func WithFSMCompactorInterval ¶
func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption
func WithFSMCompactorLogger ¶
func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption
func WithFSMCompactorRetentionWindow ¶
func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption
func WithFSMCompactorTimeout ¶
func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption
type GRPCConnCache ¶
type GRPCConnCache struct {
// contains filtered or unexported fields
}
GRPCConnCache reuses gRPC connections per address. gRPC itself handles reconnection on transient failures; we only force a re-dial if the conn has already been closed (Shutdown).
func (*GRPCConnCache) Close ¶
func (c *GRPCConnCache) Close() error
func (*GRPCConnCache) ConnFor ¶
func (c *GRPCConnCache) ConnFor(addr string) (*grpc.ClientConn, error)
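Usage sketch (addr is a placeholder); the cache owns the connections, so callers hand them to generated gRPC clients and close only the cache at shutdown:

func dial(cache *GRPCConnCache, addr string) error {
	conn, err := cache.ConnFor(addr) // reused across calls for the same address
	if err != nil {
		return err
	}
	_ = conn // pass to a generated gRPC client constructor
	// At process shutdown close the cache (cache.Close), not the conns.
	return nil
}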
type HLC ¶
type HLC struct {
// contains filtered or unexported fields
}
HLC implements a hybrid logical clock where the physical part is agreed upon via Raft consensus and the logical counter is managed purely in memory.
Layout (ms | logical):

	high 48 bits: wall clock milliseconds since Unix epoch ← Raft-agreed physical part
	low 16 bits:  logical counter                          ← in-memory only
Physical ceiling (first half, consensus):
The leader periodically commits an HLC lease entry to the Raft log that establishes an upper bound for the physical timestamp (physicalCeiling). All nodes apply this entry via the FSM, advancing their local ceiling. When a new leader is elected it inherits the committed ceiling from the FSM state so it never issues timestamps below the previous leader's window.
Logical counter (second half, memory):
The 16-bit counter increments purely in memory on every Next() call within the same millisecond. It resets to 0 whenever wall time advances, and overflows by bumping the wall millisecond by one. No Raft round-trip is needed for logical counter advancement.
func (*HLC) Current ¶
Current returns the last issued or observed HLC value without advancing it. If no timestamp has been generated yet, it returns 0.
func (*HLC) Next ¶
Next returns the next hybrid logical timestamp.
Physical part (upper 48 bits): derived from the wall clock, but never less than the Raft-agreed physicalCeiling so that a newly elected leader cannot issue timestamps that collide with the previous leader's window.
Logical part (lower 16 bits): a pure in-memory counter that increments without any Raft round-trip. It resets to 0 whenever the physical millisecond advances and overflows by bumping the physical millisecond by one.
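In sketch form, the documented behaviour corresponds to logic like this (illustrative only, not the package source; real code shifts by kv.HLCLogicalBits):

func nextHLC(wallMs, ceiling, last uint64) uint64 {
	const logicalBits = 16 // stand-in for kv.HLCLogicalBits
	phys := wallMs
	if phys < ceiling {
		phys = ceiling // never issue below the Raft-agreed window
	}
	lastPhys := last >> logicalBits
	if phys > lastPhys {
		return phys << logicalBits // millisecond advanced: counter resets to 0
	}
	phys = lastPhys
	logical := (last & ((1 << logicalBits) - 1)) + 1
	if logical == 1<<logicalBits {
		return (phys + 1) << logicalBits // overflow bumps the millisecond by one
	}
	return phys<<logicalBits | logical
}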
func (*HLC) PhysicalCeiling ¶
PhysicalCeiling returns the last Raft-committed physical ceiling in Unix milliseconds. Returns 0 if no ceiling has been established yet.
func (*HLC) SetPhysicalCeiling ¶
SetPhysicalCeiling atomically advances the Raft-agreed physical ceiling. It is called by the FSM whenever an HLC lease entry is applied to the log. The ceiling is monotonically increasing: calls with a smaller value are silently ignored.
type LeaderProxy ¶
type LeaderProxy struct {
// contains filtered or unexported fields
}
LeaderProxy forwards transactional requests to the current raft leader when the local node is not the leader.
func NewLeaderProxyWithEngine ¶
func NewLeaderProxyWithEngine(engine raftengine.Engine, opts ...TransactionOption) *LeaderProxy
func (*LeaderProxy) Abort ¶
func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error)
func (*LeaderProxy) Close ¶
func (p *LeaderProxy) Close() error
func (*LeaderProxy) Commit ¶
func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error)
type LeaderRoutedStore ¶
type LeaderRoutedStore struct {
// contains filtered or unexported fields
}
LeaderRoutedStore is an MVCCStore wrapper that serves reads from the local store only when leadership is verified; otherwise it proxies reads to the current leader via gRPC.
This is intended for single-raft-group deployments where the underlying store itself is not leader-aware (e.g. *store.MVCCStore).
Writes and maintenance operations are delegated to the local store.
func NewLeaderRoutedStore ¶
func NewLeaderRoutedStore(local store.MVCCStore, coordinator Coordinator) *LeaderRoutedStore
func (*LeaderRoutedStore) ApplyMutations ¶
func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
func (*LeaderRoutedStore) ApplyMutationsRaft ¶
func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
ApplyMutationsRaft forwards to the local store's raft-apply variant. See store.MVCCStore for the durability contract.
func (*LeaderRoutedStore) Close ¶
func (s *LeaderRoutedStore) Close() error
func (*LeaderRoutedStore) Compact ¶
func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error
func (*LeaderRoutedStore) DeletePrefixAt ¶
func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
func (*LeaderRoutedStore) DeletePrefixAtRaft ¶
func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
DeletePrefixAtRaft forwards to the local store's raft-apply variant.
func (*LeaderRoutedStore) GlobalLastCommitTS ¶
func (s *LeaderRoutedStore) GlobalLastCommitTS(ctx context.Context) uint64
GlobalLastCommitTS returns the most recently committed HLC timestamp from the authoritative leader. On the leader this is the local LastCommitTS. On a follower the method issues a lightweight RPC (RawLatestCommitTS with an empty key) so callers obtain a non-stale snapshot — critical for ConsistentRead semantics where followers must not serve reads at a stale local watermark. Falls back to the local LastCommitTS on any error.
func (*LeaderRoutedStore) LastCommitTS ¶
func (s *LeaderRoutedStore) LastCommitTS() uint64
func (*LeaderRoutedStore) LatestCommitTS ¶
func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
func (*LeaderRoutedStore) PutWithTTLAt ¶
func (*LeaderRoutedStore) ReverseScanAt ¶
func (s *LeaderRoutedStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
func (*LeaderRoutedStore) WriteConflictCountsByPrefix ¶
func (s *LeaderRoutedStore) WriteConflictCountsByPrefix() map[string]uint64
WriteConflictCountsByPrefix delegates to the local MVCC store. The leader-routed wrapper does not add cross-group conflict detection of its own, so the node-local view IS the authoritative view.
type LeaseReadObserver ¶
type LeaseReadObserver interface {
// ObserveLeaseRead is called with hit=true when the lease fast path
// served the read from local AppliedIndex, or hit=false when the
// coordinator fell back to a full LinearizableRead (expired lease,
// engine reported non-leader, or leader-loss callback raced with
// the request).
ObserveLeaseRead(hit bool)
}
LeaseReadObserver records lease-read fast-path vs slow-path outcomes without coupling kv to a concrete monitoring backend. It is called once per LeaseRead invocation that actually evaluates the lease (the initial type-assertion/LeaseDuration==0 short-circuits are NOT counted because they indicate the engine does not participate in lease reads at all).
Implementations MUST be safe for concurrent use and MUST NOT block; the observer is invoked on the Redis GET hot path.
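A minimal conforming implementation (sketch) uses atomic counters so the hot path never blocks:

import "sync/atomic"

type leaseReadCounter struct {
	hits, misses atomic.Uint64
}

// ObserveLeaseRead is lock-free and safe for concurrent use.
func (c *leaseReadCounter) ObserveLeaseRead(hit bool) {
	if hit {
		c.hits.Add(1)
	} else {
		c.misses.Add(1)
	}
}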
type LeaseReadableCoordinator ¶
type LeaseReadableCoordinator interface {
LeaseRead(ctx context.Context) (uint64, error)
LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
}
LeaseReadableCoordinator is the optional capability implemented by coordinators that participate in the leader-local lease read path (see docs/design/2026_04_20_implemented_lease_read.md). Callers that want lease reads should type-assert to this interface and fall back to LinearizableRead when the assertion fails, following the same pattern as raftengine.LeaseProvider. Keeping the lease methods OFF the Coordinator interface avoids breaking existing external implementations that predate the lease-read feature.
type LockResolver ¶
type LockResolver struct {
// contains filtered or unexported fields
}
LockResolver periodically scans for expired transaction locks and resolves them. This handles the case where secondary commit fails and leaves orphaned locks that no read path would discover (e.g., cold keys).
func NewLockResolver ¶
func NewLockResolver(ss *ShardStore, groups map[uint64]*ShardGroup, log *slog.Logger) *LockResolver
NewLockResolver creates and starts a background lock resolver.
func (*LockResolver) Close ¶
func (lr *LockResolver) Close()
Close stops the background resolver and waits for it to finish.
type OperationGroup ¶
type OperationGroup[T OP] struct {
	Elems []*Elem[T]
	IsTxn bool
	// StartTS is a logical timestamp captured at transaction begin.
	// It is ignored for non-transactional groups.
	StartTS uint64
	// CommitTS optionally pins the transaction commit timestamp.
	// Coordinators choose one automatically when this is zero.
	CommitTS uint64
	// ReadKeys carries the transaction's read set so the FSM can validate
	// read-write conflicts atomically with the commit.
	ReadKeys [][]byte
}
OperationGroup is a group of operations that should be executed atomically.
type PartitionResolver ¶
type PartitionResolver interface {
ResolveGroup(key []byte) (uint64, bool)
// RecognisesPartitionedKey reports whether the key SHAPE is
// one this resolver is responsible for. Implementations
// answer based on prefix / structural inspection only — the
// answer must NOT depend on any in-memory mapping that could
// drift out of sync, otherwise the router cannot reliably
// fail-closed for unresolved-but-recognised keys.
RecognisesPartitionedKey(key []byte) bool
}
PartitionResolver maps a key to its owning Raft group when the key belongs to a partition-scheme keyspace (e.g. SQS HT-FIFO, where each (queue, partition) pair lives on a different group). ShardRouter consults the resolver before falling through to the byte-range engine, so partition routing can override the default shard-range layout without breaking the engine's non-overlapping- cover invariant.
Implementations must be safe for concurrent use — ResolveGroup is called on the request hot path. Returning (0, false) for a key the resolver does not recognise lets the router fall through to the engine. Returning (0, false) for a key the resolver DOES recognise (the partitioned shape matches but the queue is not in the map, or the partition is out of range) is also valid; the router uses RecognisesPartitionedKey to distinguish "not partitioned, fall through" from "partitioned but unresolved, fail closed".
The fail-closed split matters under partition-map drift / a partial rollout: without it, an unresolved partitioned key would silently land on the engine's SQS-catalog default group (because routeKey normalises every !sqs|... key to !sqs|route|global) instead of surfacing a routing error.
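A resolver honouring the recognise/resolve split might look like this (sketch; the prefix and map layout are illustrative, not the package's actual SQS scheme):

import (
	"bytes"
	"sync"
)

type fifoResolver struct {
	mu     sync.RWMutex
	groups map[string]uint64 // partitioned key -> owning raft group
}

// RecognisesPartitionedKey answers from key shape alone: no map lookup,
// so the answer cannot drift out of sync with the routing table.
func (r *fifoResolver) RecognisesPartitionedKey(key []byte) bool {
	return bytes.HasPrefix(key, []byte("!sqs|fifo|"))
}

func (r *fifoResolver) ResolveGroup(key []byte) (uint64, bool) {
	if !r.RecognisesPartitionedKey(key) {
		return 0, false // not ours: router falls through to the engine
	}
	r.mu.RLock()
	defer r.mu.RUnlock()
	g, ok := r.groups[string(key)]
	return g, ok // recognised but unresolved -> (0, false): router fails closed
}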
type ProposalObserver ¶
type ProposalObserver interface {
ObserveProposalFailure()
}
ProposalObserver records raft proposal failures for operational metrics.
type RaftStatusProvider ¶
type RaftStatusProvider interface {
Status() raftengine.Status
}
type ShardGroup ¶
type ShardGroup struct {
Engine raftengine.Engine
Store store.MVCCStore
Txn Transactional
// contains filtered or unexported fields
}
type ShardRouter ¶
type ShardRouter struct {
// contains filtered or unexported fields
}
ShardRouter routes requests to multiple raft groups based on key ranges.
Cross-shard transactions are not supported. They require distributed coordination (for example, 2PC) to ensure atomicity.
Non-transactional request batches may still partially succeed across shards.
func NewShardRouter ¶
func NewShardRouter(e *distribution.Engine) *ShardRouter
NewShardRouter creates a new router.
func (*ShardRouter) Abort ¶
func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error)
Abort dispatches aborts to the correct raft group.
func (*ShardRouter) Commit ¶
func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error)
func (*ShardRouter) Register ¶
func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)
Register associates a raft group ID with its transactional manager and store.
func (*ShardRouter) ResolveGroup ¶
func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool)
ResolveGroup tries the partition resolver first (when installed), then falls through to the byte-range engine. Exposed at package scope so ShardedCoordinator's per-key helpers (groupForKey, routeAndGroupForKey, engineGroupIDForKey, groupMutations) can consult the same dispatch path Commit / Abort / Get use — without it those helpers would bypass the resolver and partitioned-FIFO traffic would silently mis-route through 2PC and the read paths.
The resolver runs on the RAW key before any user-key normalization. SQS keys in particular are collapsed to !sqs|route|global by routeKey to keep the engine's per-shard layout simple, but that collapse hides the partitioned-prefix information the resolver needs (issue: codex P1 / gemini high on PR #715). The engine still sees the post-normalization key, so legacy routing (catalog → !sqs|route|global → default group) stays unchanged.
Fail-closed for recognised-but-unresolved keys: when the resolver recognises a partitioned shape (RecognisesPartitionedKey == true) but cannot resolve the queue/partition pair (ResolveGroup returns ok=false), the router refuses to fall through to the engine. Otherwise the engine would route the partitioned key to !sqs|route|global's default group, silently mis-routing HT-FIFO traffic during partition-map drift or partial rollout (codex P1 round 2 on PR #715).
Returns (0, false) when neither the resolver nor the engine recognises the key. Caller surfaces this as an "unknown group" error so a partitioned-prefix key whose queue is missing from the resolver map fails closed rather than landing on whichever engine-default group happens to cover the raw bytes.
func (*ShardRouter) WithPartitionResolver ¶
func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter
WithPartitionResolver installs a partition-keyspace resolver that is consulted before the byte-range engine on every dispatch. A nil resolver clears any previously-installed resolver. Returns the receiver so callers can chain.
Intended for use during startup, before the router begins handling requests. Interface assignment in Go is not atomic, so a call that races with a concurrent ResolveGroup in resolveGroup may produce a torn read; callers must wire the resolver once during construction (parseRuntimeConfig → NewShardedCoordinator → WithPartitionResolver) and treat any post-startup re-assignment as undefined behaviour.
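Startup wiring then looks like this (sketch; engine and resolver are placeholders):

router := NewShardRouter(engine).WithPartitionResolver(resolver)
// Do not re-assign the resolver once the router serves traffic:
// a racing ResolveGroup may observe a torn interface value.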
type ShardStore ¶
type ShardStore struct {
// contains filtered or unexported fields
}
ShardStore routes MVCC reads to shard-specific stores and proxies to leaders when needed.
func NewShardStore ¶
func NewShardStore(engine *distribution.Engine, groups map[uint64]*ShardGroup) *ShardStore
NewShardStore creates a sharded MVCC store wrapper.
func (*ShardStore) ApplyMutations ¶
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
ApplyMutations applies a batch of mutations to the correct shard store.
All mutations must belong to the same shard. Cross-shard mutation batches are not supported.
func (*ShardStore) ApplyMutationsRaft ¶
func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
ApplyMutationsRaft is the raft-apply variant; see store.MVCCStore for the durability contract. Only the FSM may call this method.
func (*ShardStore) Close ¶
func (s *ShardStore) Close() error
func (*ShardStore) DeletePrefixAt ¶
func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
DeletePrefixAt applies a prefix delete to every shard in the store.
func (*ShardStore) DeletePrefixAtRaft ¶
func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt.
func (*ShardStore) LastCommitTS ¶
func (s *ShardStore) LastCommitTS() uint64
func (*ShardStore) LatestCommitTS ¶
func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
func (*ShardStore) PutWithTTLAt ¶
func (*ShardStore) ReverseScanAt ¶
func (s *ShardStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
func (*ShardStore) ScanAt ¶
func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
ScanAt scans keys across shards at the given timestamp. Note: when the range spans multiple shards, each shard may have a different Raft apply position. This means the returned view is NOT a globally consistent snapshot — it is a best-effort point-in-time scan. Callers requiring cross-shard consistency should use a transaction or implement a cross-shard snapshot fence.
func (*ShardStore) WriteConflictCountsByPrefix ¶
func (s *ShardStore) WriteConflictCountsByPrefix() map[string]uint64
WriteConflictCountsByPrefix aggregates OCC conflict counts across every shard group owned by this ShardStore. Per-shard counts share the same "<kind>|<key_prefix>" label schema, so a simple sum gives the node-wide view. The result is always non-nil.
type ShardedCoordinator ¶
type ShardedCoordinator struct {
// contains filtered or unexported fields
}
ShardedCoordinator routes operations to shard-specific raft groups. It issues timestamps via a shared HLC and uses ShardRouter to dispatch.
func NewShardedCoordinator ¶
func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator
NewShardedCoordinator builds a coordinator for the provided shard groups. The defaultGroup is used for non-keyed leader checks.
func (*ShardedCoordinator) Clock ¶
func (c *ShardedCoordinator) Clock() *HLC
func (*ShardedCoordinator) Close ¶
func (c *ShardedCoordinator) Close() error
Close releases per-shard engine-side registrations. Idempotent.
func (*ShardedCoordinator) Dispatch ¶
func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
func (*ShardedCoordinator) IsLeader ¶
func (c *ShardedCoordinator) IsLeader() bool
func (*ShardedCoordinator) IsLeaderForKey ¶
func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool
func (*ShardedCoordinator) LeaseRead ¶
func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error)
LeaseRead routes through the default group's lease. See Coordinate.LeaseRead for semantics.
func (*ShardedCoordinator) LeaseReadForKey ¶
func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
LeaseReadForKey performs the lease check on the shard group that owns key. Each group maintains its own lease since each group has independent leadership and term.
func (*ShardedCoordinator) LinearizableRead ¶
func (c *ShardedCoordinator) LinearizableRead(ctx context.Context) (uint64, error)
func (*ShardedCoordinator) LinearizableReadForKey ¶
func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []byte) (uint64, error)
func (*ShardedCoordinator) RaftLeader ¶
func (c *ShardedCoordinator) RaftLeader() string
func (*ShardedCoordinator) RaftLeaderForKey ¶
func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string
func (*ShardedCoordinator) RunHLCLeaseRenewal ¶
func (c *ShardedCoordinator) RunHLCLeaseRenewal(ctx context.Context)
RunHLCLeaseRenewal periodically proposes a new physical ceiling to the default shard group's Raft cluster while this node is the leader of that group. This mirrors the single-shard Coordinate.RunHLCLeaseRenewal behaviour, ensuring the shared HLC ceiling is maintained in multi-shard deployments.
RunHLCLeaseRenewal blocks until ctx is cancelled; call it in a goroutine.
func (*ShardedCoordinator) VerifyLeader ¶
func (c *ShardedCoordinator) VerifyLeader() error
func (*ShardedCoordinator) VerifyLeaderForKey ¶
func (c *ShardedCoordinator) VerifyLeaderForKey(key []byte) error
func (*ShardedCoordinator) WithLeaseReadObserver ¶
func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *ShardedCoordinator
WithLeaseReadObserver wires a LeaseReadObserver onto a ShardedCoordinator. Applied after construction because the NewShardedCoordinator signature is already heavily overloaded; see the package-level WithLeaseReadObserver CoordinatorOption for the single-group equivalent, including the typed-nil guard rationale.
func (*ShardedCoordinator) WithPartitionResolver ¶
func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator
WithPartitionResolver wires a PartitionResolver onto the coordinator's underlying ShardRouter. The resolver runs before the byte-range engine on every dispatch, so partition-keyspace schemes (e.g. SQS HT-FIFO) can override the default shard layout without breaking the engine's non-overlapping-cover invariant.
Applied after construction for the same reason as the other With* options on this type — NewShardedCoordinator is already heavily overloaded. Passing a nil resolver clears any previously-installed resolver.
func (*ShardedCoordinator) WithSampler ¶
func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator
WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The coordinator calls sampler.Observe at dispatch entry — once per resolved (RouteID, mutation key) pair — to feed the key visualizer heatmap (design doc §5.1). Applied after construction for the same reason as WithLeaseReadObserver: NewShardedCoordinator is already heavily overloaded.
Passing a nil interface value is supported and disables sampling (the call site guards against it). Passing a typed-nil *keyviz.MemSampler also works because Observe is nil-safe by contract.
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
func NewTransactionWithProposer ¶
func NewTransactionWithProposer(proposer raftengine.Proposer, opts ...TransactionOption) *TransactionManager
func (*TransactionManager) Abort ¶
func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, error)
func (*TransactionManager) Close ¶
func (t *TransactionManager) Close()
Close signals the TransactionManager to stop and drains any pending raw commit items, sending each an error so callers are not blocked forever.
func (*TransactionManager) Commit ¶
func (t *TransactionManager) Commit(reqs []*pb.Request) (*TransactionResponse, error)
type TransactionOption ¶
type TransactionOption func(*TransactionManager)
func WithProposalObserver ¶
func WithProposalObserver(observer ProposalObserver) TransactionOption
WithProposalObserver records raft.Apply failures without coupling kv to a concrete monitoring backend.
type TransactionResponse ¶
type TransactionResponse struct {
CommitIndex uint64
}
type Transactional ¶
type Transactional interface {
Commit(reqs []*pb.Request) (*TransactionResponse, error)
Abort(reqs []*pb.Request) (*TransactionResponse, error)
}
type TxnLockedError ¶
type TxnLockedError struct {
// contains filtered or unexported fields
}
func (*TxnLockedError) Error ¶
func (e *TxnLockedError) Error() string
func (*TxnLockedError) Unwrap ¶
func (e *TxnLockedError) Unwrap() error
type TxnMeta ¶
TxnMeta is embedded into transactional raft log requests via a synthetic mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store.
func DecodeTxnMeta ¶
Source Files ¶
- active_timestamp_tracker.go
- compactor.go
- coordinator.go
- fsm.go
- grpc_conn_cache.go
- hlc.go
- hlc_wall.go
- latest_commit_ts.go
- leader_proxy.go
- leader_routed_store.go
- lease_state.go
- lock_resolver.go
- raft_engine.go
- shard_key.go
- shard_router.go
- shard_store.go
- sharded_coordinator.go
- snapshot.go
- transaction.go
- transcoder.go
- txn_codec.go
- txn_consts.go
- txn_errors.go
- txn_keys.go