Documentation
¶
Index ¶
- Constants
- Variables
- func EncodeTxnMeta(m TxnMeta) []byte
- func ExtractTxnUserKey(key []byte) []byte
- func LeaseReadAllGroupsThrough(c Coordinator, ctx context.Context) error
- func LeaseReadForKeyThrough(c Coordinator, ctx context.Context, key []byte) (uint64, error)
- func LeaseReadGroupKeys(c Coordinator, keys [][]byte) [][]byte
- func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error)
- func MaxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error)
- func NewTxnLockedError(key []byte) error
- func NewTxnLockedErrorWithDetail(key []byte, detail string) error
- func PrimaryKeyForElems(reqs []*Elem[OP]) []byte
- func ReadSnapshotHeader(r *bufio.Reader) (ceiling, cutover uint64, err error)
- func TxnLockedDetails(err error) ([]byte, string, bool)
- type ActiveTimestampToken
- type ActiveTimestampTracker
- type AllGroupsLeaseReadableCoordinator
- type Coordinate
- func (c *Coordinate) Clock() *HLC
- func (c *Coordinate) Close() error
- func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
- func (c *Coordinate) EngineGroupIDForKey(_ []byte) uint64
- func (c *Coordinate) IsLeader() bool
- func (c *Coordinate) IsLeaderAcceptingWrites() bool
- func (c *Coordinate) IsLeaderForKey(_ []byte) bool
- func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error)
- func (c *Coordinate) LeaseReadForKey(ctx context.Context, _ []byte) (uint64, error)
- func (c *Coordinate) LinearizableRead(ctx context.Context) (uint64, error)
- func (c *Coordinate) LinearizableReadForKey(ctx context.Context, _ []byte) (uint64, error)
- func (c *Coordinate) ProposeHLCLease(ctx context.Context, ceilingMs int64) error
- func (c *Coordinate) RaftLeader() string
- func (c *Coordinate) RaftLeaderForKey(_ []byte) string
- func (c *Coordinate) RunHLCLeaseRenewal(ctx context.Context)
- func (c *Coordinate) VerifyLeader(ctx context.Context) error
- func (c *Coordinate) VerifyLeaderForKey(ctx context.Context, _ []byte) error
- type CoordinateResponse
- type Coordinator
- type CoordinatorOption
- type CutoverSource
- type Elem
- type EncryptionApplier
- type FSM
- type FSMCompactRuntime
- type FSMCompactor
- type FSMCompactorOption
- func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption
- func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption
- func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption
- func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption
- func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption
- type FSMOption
- type GRPCConnCache
- type GroupRoutableCoordinator
- type HLC
- type LeaderProxy
- type LeaderRoutedStore
- func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *LeaderRoutedStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *LeaderRoutedStore) Close() error
- func (s *LeaderRoutedStore) CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error)
- func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error
- func (s *LeaderRoutedStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, ...) error
- func (s *LeaderRoutedStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
- func (s *LeaderRoutedStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
- func (s *LeaderRoutedStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
- func (s *LeaderRoutedStore) GlobalLastCommitTS(ctx context.Context) uint64
- func (s *LeaderRoutedStore) LastAppliedIndex() (uint64, bool, error)
- func (s *LeaderRoutedStore) LastCommitTS() uint64
- func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
- func (s *LeaderRoutedStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *LeaderRoutedStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *LeaderRoutedStore) Restore(buf io.Reader) error
- func (s *LeaderRoutedStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *LeaderRoutedStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *LeaderRoutedStore) SetDurableAppliedIndex(idx uint64) error
- func (s *LeaderRoutedStore) Snapshot() (store.Snapshot, error)
- func (s *LeaderRoutedStore) WriteConflictCountsByPrefix() map[string]uint64
- type LeaseReadObserver
- type LeaseReadableCoordinator
- type LockResolver
- type OP
- type OperationGroup
- type PartitionResolver
- type ProposalObserver
- type RaftPayloadWrapper
- type RaftStatusProvider
- type RegistrationGate
- type RouteHistory
- type RouteSnapshot
- type ShardGroup
- func (g *ShardGroup) BeginCutoverBarrier() <-chan struct{}
- func (g *ShardGroup) EndCutoverBarrier()
- func (g *ShardGroup) Proposer() raftengine.Proposer
- func (g *ShardGroup) RaftPayloadWrap() RaftPayloadWrapper
- func (g *ShardGroup) SetRaftPayloadWrap(wrap RaftPayloadWrapper)
- func (g *ShardGroup) WaitInflightDrained(ctx context.Context) error
- type ShardRouter
- func (s *ShardRouter) Abort(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
- func (s *ShardRouter) Commit(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
- func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error)
- func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)
- func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool)
- func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter
- type ShardStore
- func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *ShardStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, ...) error
- func (s *ShardStore) Close() error
- func (s *ShardStore) CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error)
- func (s *ShardStore) Compact(ctx context.Context, minTS uint64) error
- func (s *ShardStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
- func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
- func (s *ShardStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, ...) error
- func (s *ShardStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
- func (s *ShardStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
- func (s *ShardStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
- func (s *ShardStore) LastAppliedIndex() (uint64, bool, error)
- func (s *ShardStore) LastCommitTS() uint64
- func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
- func (s *ShardStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *ShardStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *ShardStore) Restore(_ io.Reader) error
- func (s *ShardStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *ShardStore) SetDurableAppliedIndex(idx uint64) error
- func (s *ShardStore) Snapshot() (store.Snapshot, error)
- func (s *ShardStore) WriteConflictCountsByPrefix() map[string]uint64
- type ShardedCoordinator
- func (c *ShardedCoordinator) Clock() *HLC
- func (c *ShardedCoordinator) Close() error
- func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
- func (c *ShardedCoordinator) EngineGroupIDForKey(key []byte) uint64
- func (c *ShardedCoordinator) IsLeader() bool
- func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool
- func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error)
- func (c *ShardedCoordinator) LeaseReadAllGroups(ctx context.Context) error
- func (c *ShardedCoordinator) LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
- func (c *ShardedCoordinator) LinearizableRead(ctx context.Context) (uint64, error)
- func (c *ShardedCoordinator) LinearizableReadForKey(ctx context.Context, key []byte) (uint64, error)
- func (c *ShardedCoordinator) RaftLeader() string
- func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string
- func (c *ShardedCoordinator) RunHLCLeaseRenewal(ctx context.Context)
- func (c *ShardedCoordinator) VerifyLeader(ctx context.Context) error
- func (c *ShardedCoordinator) VerifyLeaderForKey(ctx context.Context, key []byte) error
- func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *ShardedCoordinator
- func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator
- func (c *ShardedCoordinator) WithRegistrationGate(g *RegistrationGate) *ShardedCoordinator
- func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator
- type TransactionManager
- type TransactionOption
- type TransactionResponse
- type Transactional
- type TxnLockedError
- type TxnMeta
Constants ¶
const (
	// DynamoTableMetaPrefix prefixes DynamoDB table metadata keys.
	DynamoTableMetaPrefix = "!ddb|meta|table|"
	// DynamoTableGenerationPrefix prefixes DynamoDB table generation keys.
	DynamoTableGenerationPrefix = "!ddb|meta|gen|"
	// DynamoItemPrefix prefixes DynamoDB item storage keys.
	DynamoItemPrefix = "!ddb|item|"
	// DynamoGSIPrefix prefixes DynamoDB GSI storage keys.
	DynamoGSIPrefix = "!ddb|gsi|"
)
const HLCLogicalBits = hlcLogicalBits
HLCLogicalBits is the number of low bits in an HLC timestamp reserved for the in-memory logical counter (vs the upper bits which encode the Raft-agreed wall-clock millis). Exported so downstream tools — admin dashboard ISO-8601 formatting being the motivating case — can recover the physical half without hard-coding a magic number that silently drifts when the layout changes (Claude Issue 4 on PR #658).
const (
	// TxnKeyPrefix is the common prefix shared by all transaction internal
	// key namespaces. All per-namespace prefixes below are derived from it.
	// NOTE: store/store.go duplicates this literal as txnInternalKeyPrefix
	// because an import cycle prevents store from importing kv.
	TxnKeyPrefix = "!txn|"
)
const TxnMetaPrefix = txnMetaPrefix
TxnMetaPrefix is the key prefix used for transaction metadata mutations.
Variables ¶
var ( ErrTxnMetaMissing = errors.New("txn meta missing") ErrTxnInvalidMeta = errors.New("txn meta invalid") ErrTxnLocked = errors.New("txn locked") ErrTxnCommitTSRequired = errors.New("txn commit ts required") ErrTxnAlreadyCommitted = errors.New("txn already committed") ErrTxnAlreadyAborted = errors.New("txn already aborted") ErrTxnPrimaryKeyRequired = errors.New("txn primary key required") // ErrTxnDedupRequiresSingleShard is returned when a transaction request // carries OperationGroup.PrevCommitTS (the option-2 one-phase dedup probe // key) but its mutations or read keys span shards. The 2PC log builders // encode only CommitTS, so silently honoring such a request would drop // the probe at the FSM and let the original duplicate-elements anomaly // reappear. See codex P2 in PR #796 and the design doc. ErrTxnDedupRequiresSingleShard = errors.New("txn dedup (prev_commit_ts) requires a single-shard write set") // ErrTxnSecondaryRouteShiftedAfterPrimaryCommit is returned by // dispatchMultiShardTxn when a per-secondary commit (after primary // has durably committed) fails its M3 verifyComposed1 check — // i.e. the route catalog moved between primary-COMMIT and the // secondary-COMMIT, the secondary's FSM rejects with a Composed-1 // sentinel, and we cannot transparently recover (the prepared // lock lives at the old gid; the new owner per the catalog has no // commit record). The 2PC contract is half-broken at this point: // the primary's write is durable but at least one secondary's // write is missing. Surfacing this explicitly (rather than // swallowing per the original best-effort semantic OR silently // landing the write on a stale owner per a dropped-gate fix) is // the least bad outcome — the caller knows the txn state is // uncertain and can do application-level recovery. 
// // This is a DIFFERENT sentinel from ErrComposed1Violation by // design: the M4 retry path in dispatchTxnWithComposed1Retry // matches ErrComposed1Violation / ErrComposed1VersionGCd and // would otherwise loop here, re-prewriting against the same old // gid that already has the first attempt's prepared lock — pure // wasted work since the route catalog won't move backward. // codex P1 on 6202b964 (PR #900) raised the silent-partial-commit // hazard; codex P1 on d8487672 (PR #900) raised the symmetric // hazard of disabling the gate. This sentinel resolves both by // keeping the gate active and surfacing the error fatally. ErrTxnSecondaryRouteShiftedAfterPrimaryCommit = errors.New("txn secondary commit failed after primary commit: route catalog shifted") )
var ErrCeilingExpired = errors.New("hlc: physical ceiling expired (wall_now >= physicalCeiling); refusing to issue persistence timestamp")
ErrCeilingExpired is returned by HLC.NextFenced() when the Raft-agreed physical ceiling has expired — i.e. the wall clock has caught up to or passed the ceiling, meaning RunHLCLeaseRenewal has not applied a fresh ceiling within `hlcPhysicalWindowMs` of the current wall time. Callers MUST refuse to commit and propagate this to the client.
This implements HLC-4 precondition (iii) from docs/design/2026_05_28_partial_tla_safety_spec.md §5.1: every persistence-grade ts allocation is gated on `wall_now < physicalCeiling`. The TLA+ MCHLC_gap.cfg counterexample at depth 5 demonstrates the safety property this enforces.
Pre-bootstrap (ceiling == 0) is intentionally NOT fenced: there is no prior leader to protect against and tests / demo clusters need to issue ts before the first RunHLCLeaseRenewal cycle. Strict bootstrap fencing is a follow-up consideration; the current semantics match the spec wherever the prior-leader hazard is real.
var ErrComposed1VersionGCd = errors.New("composed-1: observed catalog version evicted from history ring; retry")
ErrComposed1VersionGCd is returned by verifyComposed1 when the txn's observed catalog version is no longer in the engine's retention ring — either because the FIFO ring evicted it (the txn lived longer than `routeHistoryDepth` versions' worth of catalog churn) or because the version was never seen on this node. Surfaces to the coordinator as a retryable error: the caller's M4 retry path reads the current route cache and re-issues the txn with a fresh observedVer.
The not-found ⇒ hard-error semantics (rather than soft-pass) matters because a soft-pass would let the gate be bypassed exactly in the long-running-txn / high-churn cases where the cross-version-read hazard is most likely (design doc §4.3 + gemini medium + codex P2 on PR #870).
var ErrComposed1Violation = errors.New("composed-1: route ownership shifted; retry on new owning group")
ErrComposed1Violation is returned by verifyComposed1 when the transaction's commit cannot proceed on this Raft group because the txn's read-set or write-set keys are not owned by this group at either the txn's observed catalog version (the spec-level §4.2(a) check) or the current catalog version observed by the FSM at apply time (the §4.4 cross-version-read fence). Surfaces to the coordinator as a retryable error: the M4 coordinator path re-reads the route cache, re-routes the txn, and re-issues it once on the new owning group.
Wrapped with errors.Wrapf at the call site to carry the per-key diagnostic (which key, which observed-version owner, which current-version owner) — the caller's retry path uses errors.Is(err, ErrComposed1Violation) to match.
var ErrCrossShardMutationBatchNotSupported = errors.New("cross-shard mutation batches are not supported")
var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported")
var ErrInvalidRequest = errors.New("invalid request")
var ErrLeaderNotFound = errors.New("leader not found")
var ErrNotImplemented = errors.New("not implemented")
var ErrSnapshotHeaderInvalidLength = errors.New("snapshot header: invalid v2 length prefix")
ErrSnapshotHeaderInvalidLength indicates a v2 header whose length prefix is below the required minimum (cannot hold ceiling+cutover) or above the maxSnapshotHeaderPayload DoS bound.
var ErrSnapshotHeaderUnknownMagic = errors.New("snapshot header: unknown EKVTHLC* magic")
ErrSnapshotHeaderUnknownMagic indicates an EKVTHLC* magic whose version byte the current binary does not recognise. The restore fail-closes; the operator must upgrade.
var ErrUnknownRequestType = errors.New("unknown request type")
Functions ¶
func EncodeTxnMeta ¶
func ExtractTxnUserKey ¶
ExtractTxnUserKey returns the logical user key embedded in a transaction-internal key such as !txn|lock|, !txn|cmt|, or !txn|meta|. It returns nil when the key does not use a transaction-internal namespace or is malformed.
func LeaseReadAllGroupsThrough ¶
func LeaseReadAllGroupsThrough(c Coordinator, ctx context.Context) error
LeaseReadAllGroupsThrough establishes the lease freshness bound across every shard group a multi-shard read can touch. When the coordinator owns multiple groups (AllGroupsLeaseReadableCoordinator) it fences all of them; otherwise it falls back to the single-group LeaseRead path so a single-group deployment still issues exactly one lease read. Adapter call sites use this for keyless reads (Scan, whole-table/GSI Query fallback) that the per-key LeaseReadForKey cannot route to one group.
func LeaseReadForKeyThrough ¶
LeaseReadForKeyThrough is the key-routed counterpart of LeaseReadThrough.
func LeaseReadGroupKeys ¶
func LeaseReadGroupKeys(c Coordinator, keys [][]byte) [][]byte
LeaseReadGroupKeys returns a representative key per distinct owning group for the supplied keys, so callers can issue one lease read per group rather than one per key. The returned slice preserves the order of first appearance. When the coordinator does not implement GroupRoutableCoordinator (single-group deployments), every distinct key is returned unchanged so the caller's per-key dedup still bounds the work. Keys that cannot be routed (group ID 0) are never collapsed — each is kept as its own representative so the lease check still runs and surfaces the routing failure.
func LeaseReadThrough ¶
func LeaseReadThrough(c Coordinator, ctx context.Context) (uint64, error)
LeaseReadThrough is a helper that calls LeaseRead when the coordinator supports it, falling back to LinearizableRead otherwise. Adapter call sites use this so they don't have to repeat the type-assertion dance.
func MaxLatestCommitTS ¶
MaxLatestCommitTS returns the maximum commit timestamp for the provided keys.
Missing keys are ignored. If any LatestCommitTS lookup returns an error, the error is returned to the caller.
func NewTxnLockedError ¶
func PrimaryKeyForElems ¶
PrimaryKeyForElems returns the primary key the coordinator derives for a single-shard one-phase txn over elems — the lexicographically smallest write key. Adapters that implement option-2 one-phase dedup must probe this exact key (it becomes the FSM's meta.PrimaryKey) so the adapter-side self-inflicted-conflict guard agrees with dedupProbeOnePhase. See docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md (R4).
func ReadSnapshotHeader ¶
ReadSnapshotHeader is the §3.2 read path. The caller wraps the input io.Reader in a *bufio.Reader and passes it here once, then MUST reuse the same *bufio.Reader for the inner-store restore — buffered bytes can sit in the bufio.Reader between calls; switching readers silently loses them.
Return contract:
- v1 magic: (ceiling, 0, nil), magic + ceiling consumed.
- v2 magic: (ceiling, cutover, nil), full v2 header consumed; any payload bytes beyond the parsed fields (forward-compat extension area) are consumed and discarded.
- EKVTHLC* with an unknown version byte: (0, 0, ErrSnapshotHeaderUnknownMagic) — fail-closed, restore aborts.
- v2 magic with a malformed length prefix: (0, 0, ErrSnapshotHeaderInvalidLength) — fail-closed.
- Anything else (including streams shorter than 8 bytes): (0, 0, nil) and ALL bytes left in the *bufio.Reader for the inner-store path.
Types ¶
type ActiveTimestampToken ¶
type ActiveTimestampToken struct {
// contains filtered or unexported fields
}
ActiveTimestampToken releases one tracked timestamp when the owning operation completes.
func (*ActiveTimestampToken) Release ¶
func (t *ActiveTimestampToken) Release()
type ActiveTimestampTracker ¶
type ActiveTimestampTracker struct {
// contains filtered or unexported fields
}
ActiveTimestampTracker tracks in-flight read or transaction timestamps that must remain readable while background compaction is running.
func NewActiveTimestampTracker ¶
func NewActiveTimestampTracker() *ActiveTimestampTracker
func (*ActiveTimestampTracker) Oldest ¶
func (t *ActiveTimestampTracker) Oldest() uint64
func (*ActiveTimestampTracker) Pin ¶
func (t *ActiveTimestampTracker) Pin(ts uint64) *ActiveTimestampToken
type AllGroupsLeaseReadableCoordinator ¶
type AllGroupsLeaseReadableCoordinator interface {
// LeaseReadAllGroups establishes the lease freshness bound on every
// group the coordinator owns, failing closed on the first group that
// cannot confirm its lease. The freshness bound is what a multi-shard
// read relies on, so a returned error MUST abort the read.
LeaseReadAllGroups(ctx context.Context) error
}
AllGroupsLeaseReadableCoordinator is the optional capability implemented by coordinators that own more than one Raft group and can establish the lease freshness bound on EVERY group in a single call. Multi-shard read handlers (Scan, GSI/whole-table Query) need this because the underlying scan visits all intersecting routes across all groups, whereas the plain LeaseRead only fences the default group. Single-group coordinators do not implement it: LeaseReadAllGroupsThrough falls back to LeaseRead so they still issue exactly one lease read.
type Coordinate ¶
type Coordinate struct {
// contains filtered or unexported fields
}
func NewCoordinatorWithEngine ¶
func NewCoordinatorWithEngine(txm Transactional, engine raftengine.Engine, opts ...CoordinatorOption) *Coordinate
func (*Coordinate) Clock ¶
func (c *Coordinate) Clock() *HLC
func (*Coordinate) Close ¶
func (c *Coordinate) Close() error
Close releases any engine-side registrations (currently the leader-loss callback) held by this Coordinate. It is safe to call on a nil receiver, and safe to call multiple times. Owners whose lifetime matches the engine's do not need to call Close; owners who discard the Coordinate before closing the engine MUST.
func (*Coordinate) Dispatch ¶
func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
func (*Coordinate) EngineGroupIDForKey ¶
func (c *Coordinate) EngineGroupIDForKey(_ []byte) uint64
EngineGroupIDForKey makes Coordinate satisfy GroupRoutableCoordinator. A Coordinate fronts exactly one Raft group, so every key maps to the same group and batched lease checks collapse to a single read.
func (*Coordinate) IsLeader ¶
func (c *Coordinate) IsLeader() bool
func (*Coordinate) IsLeaderAcceptingWrites ¶
func (c *Coordinate) IsLeaderAcceptingWrites() bool
IsLeaderAcceptingWrites reports whether this node is leader and not currently transferring leadership. Background proposers should gate on this to avoid piling up dropped proposals while a transfer is in flight.
func (*Coordinate) IsLeaderForKey ¶
func (c *Coordinate) IsLeaderForKey(_ []byte) bool
func (*Coordinate) LeaseRead ¶
func (c *Coordinate) LeaseRead(ctx context.Context) (uint64, error)
LeaseRead returns a read fence backed by a leader-local lease when available, falling back to a full LinearizableRead when no fast path is live or the engine does not implement LeaseProvider.
The PRIMARY lease path is maintained inside the engine from ongoing MsgAppResp / MsgHeartbeatResp traffic, so that path does not rely on callers sampling time.Now() before the slow path to "extend" a lease afterwards. The earlier pre-read sampling was racy under congestion: if a LinearizableRead took longer than LeaseDuration, the extension would land already expired and the lease never warmed up. The engine-driven anchor is refreshed every heartbeat independent of read latency.
The SECONDARY caller-side lease remains as a rollout fallback, still populated by the original pre-read sampling; it covers the narrow window between startup and the first quorum heartbeat round landing on the engine.
The returned index is the engine's current applied index (fast path) or the index returned by LinearizableRead (slow path). Callers that resolve timestamps via store.LastCommitTS may discard the value.
func (*Coordinate) LeaseReadForKey ¶
func (*Coordinate) LinearizableRead ¶
func (c *Coordinate) LinearizableRead(ctx context.Context) (uint64, error)
func (*Coordinate) LinearizableReadForKey ¶
func (*Coordinate) ProposeHLCLease ¶
func (c *Coordinate) ProposeHLCLease(ctx context.Context, ceilingMs int64) error
ProposeHLCLease proposes a new physical ceiling to the Raft cluster. Only the current leader should call this; followers silently ignore proposals from non-leaders via Raft's leader-only write guarantee.
A successful propose is a quorum-acked Raft commit, exactly the same confirmation Dispatch relies on, so it also warms the leader-local read lease. The lease-extension base (dispatchStart) and the invalidation generation are sampled BEFORE the propose, mirroring refreshLeaseAfterDispatch: the window can only ever be SHORTER than the true safety window, and a leader-loss callback that fires during the propose advances the generation so extend refuses to resurrect a stale lease. This is the background warm-up that flattens the read-only lease-expiry sawtooth on idle-write workloads -- no extra goroutine, no change to the lease window/duration semantics.
On a leadership-loss propose error the lease is invalidated eagerly, mirroring refreshLeaseAfterDispatch's error branch exactly: when Propose returns the loss before the async RegisterLeaderLossCallback fires, a stale-warm lease must not survive on a non-leader node for the callback latency window. Non-leadership errors (no quorum, validation) are NOT leadership signals and must not tear down a warm lease -- doing so would force every read onto the slow path.
func (*Coordinate) RaftLeader ¶
func (c *Coordinate) RaftLeader() string
RaftLeader returns the current leader's address as known by this node.
func (*Coordinate) RaftLeaderForKey ¶
func (c *Coordinate) RaftLeaderForKey(_ []byte) string
func (*Coordinate) RunHLCLeaseRenewal ¶
func (c *Coordinate) RunHLCLeaseRenewal(ctx context.Context)
RunHLCLeaseRenewal runs a background loop that periodically proposes a new physical ceiling to the Raft cluster while this node is the leader.
The ceiling is set to now + hlcPhysicalWindowMs (3 s) and is renewed every hlcRenewalInterval (1 s), mirroring TiDB's TSO window strategy. Because the window is always at least 2 s ahead of any real timestamp, a new leader will never issue timestamps that overlap with the previous leader's window.
RunHLCLeaseRenewal blocks until ctx is cancelled; call it in a goroutine.
func (*Coordinate) VerifyLeader ¶
func (c *Coordinate) VerifyLeader(ctx context.Context) error
func (*Coordinate) VerifyLeaderForKey ¶
func (c *Coordinate) VerifyLeaderForKey(ctx context.Context, _ []byte) error
type CoordinateResponse ¶
type CoordinateResponse struct {
CommitIndex uint64
}
type Coordinator ¶
type Coordinator interface {
Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
IsLeader() bool
VerifyLeader(ctx context.Context) error
LinearizableRead(ctx context.Context) (uint64, error)
RaftLeader() string
IsLeaderForKey(key []byte) bool
VerifyLeaderForKey(ctx context.Context, key []byte) error
RaftLeaderForKey(key []byte) string
Clock() *HLC
}
type CoordinatorOption ¶
type CoordinatorOption func(*Coordinate)
CoordinatorOption is a functional option for Coordinate constructors.
func WithHLC ¶
func WithHLC(hlc *HLC) CoordinatorOption
WithHLC sets a pre-created HLC on the coordinator. Use this together with NewKvFSMWithHLC so the FSM and coordinator share the same clock instance: the FSM advances physicalCeiling on every applied HLC lease entry, and the coordinator reads it inside Next() to floor new timestamps above the previous leader's committed window.
func WithLeaseReadObserver ¶
func WithLeaseReadObserver(observer LeaseReadObserver) CoordinatorOption
WithLeaseReadObserver wires a LeaseReadObserver onto a Coordinate. This is the mechanism monitoring uses to surface the lease-hit ratio panel on the Redis hot-path dashboard (see the "Hot Path" row in monitoring/grafana/dashboards/elastickv-redis-summary.json).
Typed-nil guard: a caller passing a typed-nil pointer (e.g. `var o *myObserver; WithLeaseReadObserver(o)`) produces an interface value that is NOT equal to nil under the normal `!= nil` check, yet invoking ObserveLeaseRead would panic. Normalise here with reflect.Value.IsNil so the hot-path nil check in LeaseRead stays a single branch on a real nil interface.
type CutoverSource ¶
type CutoverSource interface {
RaftEnvelopeCutoverIndex() uint64
}
CutoverSource is the writer-side view of the Phase-2 envelope cutover. kvFSMSnapshot consults it once per snapshot to decide v1 vs v2 layout. A nil source means "always v1" (matches the Phase-0/Phase-1 posture and every pre-8a code path).
type EncryptionApplier ¶
type EncryptionApplier interface {
ApplyRegistration(p fsmwire.RegistrationPayload) error
ApplyBootstrap(raftIdx uint64, p fsmwire.BootstrapPayload) error
ApplyRotation(raftIdx uint64, p fsmwire.RotationPayload) error
}
EncryptionApplier owns the side-effects an encryption FSM entry must persist on apply: keystore mutation, sidecar update, and writer-registry insert. Stage 4 ships the dispatch seam and HaltApply propagation; Stage 5/6/7 will provide a concrete implementation that
- KEK-unwraps the wrapped DEK and calls Keystore.Set
- mutates the local sidecar (Active.{Storage,Raft}, keys map, raft_envelope_cutover_index) via the §5.1 crash-durable WriteSidecar protocol
- inserts writer-registry rows under the §4.1 `!encryption|writers|<dek_id>|<uint16(node_id)>` Pebble key
The separation lets Stage 4 land the byte-tag dispatch + halt machinery without depending on the Stage 7 writer-registry storage layer or the Stage 5 admin RPC plumbing.
All three methods may return an error wrapped with encryption.ErrEncryptionApply to halt the apply loop. The kvFSM dispatcher converts any non-nil return into a haltApplyResponse so internal/raftengine/etcd's HaltApply seam recognises it.
The raftIdx parameter on ApplyBootstrap and ApplyRotation is the Raft entry index of the entry being applied. The applier persists this as sidecar.RaftAppliedIndex inside the same WriteSidecar fsync that mutates the keys[] map, so the §9.1 ErrSidecarBehindRaftLog startup guard can compare the sidecar's last-witnessed index against the engine's AppliedIndex on the next process start. ApplyRegistration does NOT take an index because writer-registry inserts do not touch the sidecar (§5.5 OpRegistration is intentionally excluded from the audit predicate).
type FSM ¶
type FSM interface {
raftengine.StateMachine
}
func NewKvFSMWithHLC ¶
NewKvFSMWithHLC creates a KV FSM that updates hlc.physicalCeiling whenever an HLC lease entry is applied. The caller must pass the same *HLC instance to the coordinator so both sides share the agreed physical ceiling.
Optional FSMOption arguments configure additional handlers (see WithEncryption). Existing callers without options keep the pre-Stage-4 behaviour byte-for-byte.
type FSMCompactRuntime ¶
type FSMCompactRuntime struct {
GroupID uint64
StatusReader RaftStatusProvider
Store store.MVCCStore
}
type FSMCompactor ¶
type FSMCompactor struct {
// contains filtered or unexported fields
}
func NewFSMCompactor ¶
func NewFSMCompactor(runtimes []FSMCompactRuntime, opts ...FSMCompactorOption) *FSMCompactor
type FSMCompactorOption ¶
type FSMCompactorOption func(*FSMCompactor)
func WithFSMCompactorActiveTimestampTracker ¶
func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption
func WithFSMCompactorInterval ¶
func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption
func WithFSMCompactorLogger ¶
func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption
func WithFSMCompactorRetentionWindow ¶
func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption
func WithFSMCompactorTimeout ¶
func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption
type FSMOption ¶
type FSMOption func(*kvFSM)
FSMOption configures a *kvFSM at construction. Stage 4 introduces WithEncryption; future stages may add more.
func WithCutoverSource ¶
func WithCutoverSource(src CutoverSource) FSMOption
WithCutoverSource installs the Stage 8a §3.3 writer-side view of the Phase-2 envelope cutover index. The FSM consults it once per Snapshot call to decide v1 vs v2 layout. Pass nil (or omit the option) to keep the pre-8a posture where every snapshot is v1.
func WithEncryption ¶
func WithEncryption(applier EncryptionApplier) FSMOption
WithEncryption installs the EncryptionApplier the kvFSM dispatches opcodes 0x03 / 0x04 / 0x05 to. Pass nil (or omit the option entirely) to leave the FSM in the Stage-4-default fail-closed state where any encryption opcode halts the apply loop via ErrEncryptionApply.
func WithRouteHistory ¶
func WithRouteHistory(routes RouteHistory, shardGroupID uint64) FSMOption
WithRouteHistory installs the M2 Composed-1 versioned-snapshot provider and the FSM's owning shard group ID. Both fields are consumed by the M3 verifyComposed1 gate. At M2 the values are stored but not consulted (M3 wires the check); a caller that constructs a kvFSM without this option remains "unpinned" — the M3 gate will short-circuit and behave exactly like the pre-feature FSM.
shardGroupID MUST match the Raft group ID this FSM serves — the gate uses it as the "this group" value when comparing against the historical owner-of-key resolution. Zero is reserved for the not-wired case.
See docs/design/2026_05_29_partial_composed1_cross_group_commit_guard.md §M2 + §4.2 prerequisite block.
type GRPCConnCache ¶
type GRPCConnCache struct {
// contains filtered or unexported fields
}
GRPCConnCache reuses gRPC connections per address. gRPC itself handles reconnection on transient failures; we only force a re-dial if the conn has already been closed (Shutdown).
func (*GRPCConnCache) Close ¶
func (c *GRPCConnCache) Close() error
func (*GRPCConnCache) ConnFor ¶
func (c *GRPCConnCache) ConnFor(addr string) (*grpc.ClientConn, error)
type GroupRoutableCoordinator ¶
type GroupRoutableCoordinator interface {
// EngineGroupIDForKey returns the owning group ID, or 0 when the
// key cannot be routed.
EngineGroupIDForKey(key []byte) uint64
}
GroupRoutableCoordinator is the optional capability implemented by coordinators that can resolve the owning Raft group of a key without any I/O. Callers that need to lease-check a set of keys use it to deduplicate by group: keys that resolve to the same group share one lease read instead of issuing one per key. Single-group coordinators do not implement it, so callers must fall back to per-key dedup.
type HLC ¶
type HLC struct {
// contains filtered or unexported fields
}
HLC implements a hybrid logical clock where the physical part is agreed upon via Raft consensus and the logical counter is managed purely in memory.
Layout (ms | logical):
high 48 bits: wall-clock milliseconds since the Unix epoch ← Raft-agreed physical part
low 16 bits:  logical counter ← in-memory only
Physical ceiling (upper 48 bits, consensus):
The leader periodically commits a HLC lease entry to the Raft log that establishes an upper bound for the physical timestamp (physicalCeiling). All nodes apply this entry via the FSM, advancing their local ceiling. When a new leader is elected it inherits the committed ceiling from the FSM state so it never issues timestamps below the previous leader's window.
Logical counter (lower 16 bits, in memory):
The 16-bit counter increments purely in memory on every Next() call within the same millisecond. It resets to 0 whenever wall time advances; on overflow it bumps the wall millisecond by one. No Raft round-trip is needed for logical counter advancement.
func (*HLC) Current ¶
Current returns the last issued or observed HLC value without advancing it. If no timestamp has been generated yet, it returns 0.
func (*HLC) Next ¶
Next returns the next hybrid logical timestamp, ignoring the HLC-4 physical-ceiling fence. Kept for non-persistence callers (diagnostics, identifiers, retry IDs, tests, demo wiring) and for adapter sites whose fail-closed migration is not yet completed.
NEW persistence-grade allocations (everything that ends up as a startTS / commitTS, an MVCC write timestamp, or a lease/expiry boundary) MUST go through NextFenced instead — Next bypasses the HLC-4 (iii) ceiling fence and can therefore issue a timestamp inside a stale leader window after lease renewal stops.
Physical part (upper 48 bits): derived from the wall clock, but never less than the Raft-agreed physicalCeiling so that a newly elected leader cannot issue timestamps that collide with the previous leader's window.
Logical part (lower 16 bits): a pure in-memory counter that increments without any Raft round-trip. It resets to 0 whenever the physical millisecond advances; on overflow it bumps the physical millisecond by one.
func (*HLC) NextFenced ¶
NextFenced returns the next hybrid logical timestamp with the HLC-4 (iii) physical-ceiling fence enforced. ALL persistence-grade allocations (startTS, commitTS, MVCC write ts, lease/expiry bounds) MUST go through this entry point so that an expired-ceiling allocation fails closed instead of silently issuing a ts that could collide with a subsequent leader's window after renewal catches up.
Fence semantics:
- ceiling == 0 (pre-bootstrap, no prior leader): no fence, identical to Next.
- ceiling > 0 AND wall_now >= ceiling: returns (0, ErrCeilingExpired).
- ceiling > 0 AND wall_now < ceiling: floor wall at ceiling, then proceed.
The TLA+ proof for this lives in tla/hlc/MCHLC_gap.cfg (HLC-4 counterexample, depth 5) — see docs/design/2026_05_28_partial_tla_safety_spec.md §5.1.
func (*HLC) NextFencedRejections ¶
NextFencedRejections returns the cumulative count of NextFenced() calls that returned ErrCeilingExpired since process start. The monitoring layer reads this on a fixed interval and exports it as a Prometheus counter so operators can alert on the rate.
A non-zero value here means the HLC-4 (i) bounded-skew assumption (`MaxClockSkewMs < HlcPhysicalWindowMs` — see docs/design/2026_05_28_partial_tla_safety_spec.md §5.1) has at some point been violated by enough margin that wall_now caught up to physicalCeiling and the fence fired — typically because the leader's lease renewal stopped (network partition, GC pause, …) for longer than `hlcPhysicalWindowMs`.
func (*HLC) PhysicalCeiling ¶
PhysicalCeiling returns the last Raft-committed physical ceiling in Unix milliseconds. Returns 0 if no ceiling has been established yet.
func (*HLC) SetPhysicalCeiling ¶
SetPhysicalCeiling atomically advances the Raft-agreed physical ceiling. It is called by the FSM whenever a HLC lease entry is applied to the log. The ceiling is monotonically increasing: calls with a smaller value are silently ignored.
type LeaderProxy ¶
type LeaderProxy struct {
// contains filtered or unexported fields
}
LeaderProxy forwards transactional requests to the current raft leader when the local node is not the leader.
func NewLeaderProxyForShardGroup ¶
func NewLeaderProxyForShardGroup(g *ShardGroup, opts ...TransactionOption) *LeaderProxy
NewLeaderProxyForShardGroup wires a LeaderProxy whose proposer consults g.raftPayloadWrap on every call, so SetRaftPayloadWrap becomes the hot-swap surface for the raft envelope cutover.
Use this in preference to NewLeaderProxyWithEngine(g.Engine, ...) for any ShardGroup that participates in the encryption cutover pipeline — without the dynamic wrap, post-cutover writes would land cleartext at index > cutoverIndex and halt the apply loop on strict-> unwrap. The non-wrap-aware constructor remains as a convenience for shard groups that opt out of encryption (test fixtures, transient groups).
Contract: g MUST be non-nil. The constructor takes the address of g.raftPayloadWrap, so a nil g is an immediate nil deref — caller bug, not silently swallowed. Returning nil here would only defer the panic to the first sg.Txn.Commit call site (see main.go's buildShardGroups), with worse diagnostics; CLAUDE.md's "don't validate for scenarios that can't happen at internal boundaries" applies.
func NewLeaderProxyWithEngine ¶
func NewLeaderProxyWithEngine(engine raftengine.Engine, opts ...TransactionOption) *LeaderProxy
func (*LeaderProxy) Abort ¶
func (p *LeaderProxy) Abort(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
func (*LeaderProxy) Close ¶
func (p *LeaderProxy) Close() error
func (*LeaderProxy) Commit ¶
func (p *LeaderProxy) Commit(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
type LeaderRoutedStore ¶
type LeaderRoutedStore struct {
// contains filtered or unexported fields
}
LeaderRoutedStore is an MVCCStore wrapper that serves reads from the local store only when leadership is verified; otherwise it proxies reads to the current leader via gRPC.
This is intended for single-raft-group deployments where the underlying store itself is not leader-aware (e.g. *store.MVCCStore).
Writes and maintenance operations are delegated to the local store.
func NewLeaderRoutedStore ¶
func NewLeaderRoutedStore(local store.MVCCStore, coordinator Coordinator) *LeaderRoutedStore
func (*LeaderRoutedStore) ApplyMutations ¶
func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
func (*LeaderRoutedStore) ApplyMutationsRaft ¶
func (s *LeaderRoutedStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
ApplyMutationsRaft forwards to the local store's raft-apply variant. See store.MVCCStore for the durability contract.
func (*LeaderRoutedStore) ApplyMutationsRaftAt ¶
func (s *LeaderRoutedStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error
ApplyMutationsRaftAt forwards to the local store's raft-entry-index-aware variant so the underlying pebbleStore can bundle metaAppliedIndex with the mutation. See PR #910 design §2.
func (*LeaderRoutedStore) Close ¶
func (s *LeaderRoutedStore) Close() error
func (*LeaderRoutedStore) CommittedVersionAt ¶
func (s *LeaderRoutedStore) CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error)
CommittedVersionAt gates the exact-timestamp existence probe so client reads through this wrapper get a fresh authoritative answer even on a deposed leader. The FSM apply path is NOT affected — it holds the raw local store (not a LeaderRoutedStore), so its deterministic probe never goes through this method. The option-2 reuse path (RedisServer.resolveReuseLength) DOES call this and needs the authoritative answer to preserve the pending.length fast-path (returning the per-our-commit length rather than the leader's current Len) when our prior attempt actually committed.
Two-path strategy, mirroring how LatestCommitTS uses a lease fast-path and a proxy slow-path:
- We are the leader with a valid lease (leaderOKForKey is true): the local replica is up-to-date by the lease invariant; read local.
- Not leader (deposed or never): there is no RawCommittedVersionAt RPC to proxy to, so use the coordinator's LinearizableRead to submit a Raft ReadIndex — that protocol forwards to the current leader and waits until our local applied index has caught up to the leader's commit point. After that, a local probe sees every committed version of this key (including any landed at commitTS). If the read-index fails (no leader reachable, ctx canceled), fall back to (false, nil); the adapter's resolveReuseLength then re-reads via the already-leader-fenced ScanAt/GetAt, returning the leader's current Len — a valid serialization, just not the per-our-commit value.
func (*LeaderRoutedStore) Compact ¶
func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error
func (*LeaderRoutedStore) DeletePrefixAt ¶
func (*LeaderRoutedStore) DeletePrefixAtRaft ¶
func (s *LeaderRoutedStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
DeletePrefixAtRaft forwards to the local store's raft-apply variant.
func (*LeaderRoutedStore) DeletePrefixAtRaftAt ¶
func (s *LeaderRoutedStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error
DeletePrefixAtRaftAt forwards to the local store's raft-entry-index-aware variant. See PR #910 design §2 "why both leaves".
func (*LeaderRoutedStore) GlobalLastCommitTS ¶
func (s *LeaderRoutedStore) GlobalLastCommitTS(ctx context.Context) uint64
GlobalLastCommitTS returns the most recently committed HLC timestamp from the authoritative leader. On the leader this is the local LastCommitTS. On a follower the method issues a lightweight RPC (RawLatestCommitTS with an empty key) so callers obtain a non-stale snapshot — critical for ConsistentRead semantics where followers must not serve reads at a stale local watermark. Falls back to the local LastCommitTS on any error.
func (*LeaderRoutedStore) LastAppliedIndex ¶
func (s *LeaderRoutedStore) LastAppliedIndex() (uint64, bool, error)
LastAppliedIndex forwards to the local store when it implements raftengine.AppliedIndexReader. Defensive: in production today the kvFSM holds a *pebbleStore directly (not a LeaderRoutedStore — that wrapper is used by adapter/server code for read routing, not by the FSM apply path); so this forward is currently dead code for the cold-start skip optimisation. We add it anyway because future refactors might wrap the FSM's store, and a silent no-op there would degrade the optimisation to full-restore-always with no failure signal.
(0, false, nil) returns are the strictly-additive fallback — either the wrapper has no local, the local does not implement the reader, or the local reports missing/truncated. The caller in internal/raftengine/etcd/wal_store.go (Branch 3) treats all of these as "fall back to full restore", which is correct.
func (*LeaderRoutedStore) LastCommitTS ¶
func (s *LeaderRoutedStore) LastCommitTS() uint64
func (*LeaderRoutedStore) LatestCommitTS ¶
func (*LeaderRoutedStore) PutWithTTLAt ¶
func (*LeaderRoutedStore) ReverseScanAt ¶
func (*LeaderRoutedStore) SetDurableAppliedIndex ¶
func (s *LeaderRoutedStore) SetDurableAppliedIndex(idx uint64) error
SetDurableAppliedIndex forwards to the local store when it implements raftengine.AppliedIndexWriter. Symmetric defensive no-op when the local store does not expose the writer seam — see LastAppliedIndex doc-comment.
func (*LeaderRoutedStore) WriteConflictCountsByPrefix ¶
func (s *LeaderRoutedStore) WriteConflictCountsByPrefix() map[string]uint64
WriteConflictCountsByPrefix delegates to the local MVCC store. The leader-routed wrapper does not add cross-group conflict detection of its own, so the node-local view IS the authoritative view.
type LeaseReadObserver ¶
type LeaseReadObserver interface {
// ObserveLeaseRead is called with hit=true when the lease fast path
// served the read from local AppliedIndex, or hit=false when the
// coordinator fell back to a full LinearizableRead (expired lease,
// engine reported non-leader, or leader-loss callback raced with
// the request).
ObserveLeaseRead(hit bool)
}
LeaseReadObserver records lease-read fast-path vs slow-path outcomes without coupling kv to a concrete monitoring backend. It is called once per LeaseRead invocation that actually evaluates the lease (the initial type-assertion/LeaseDuration==0 short-circuits are NOT counted because they indicate the engine does not participate in lease reads at all).
Implementations MUST be safe for concurrent use and MUST NOT block; the observer is invoked on the Redis GET hot path.
type LeaseReadableCoordinator ¶
type LeaseReadableCoordinator interface {
LeaseRead(ctx context.Context) (uint64, error)
LeaseReadForKey(ctx context.Context, key []byte) (uint64, error)
}
LeaseReadableCoordinator is the optional capability implemented by coordinators that participate in the leader-local lease read path (see docs/design/2026_04_20_implemented_lease_read.md). Callers that want lease reads should type-assert to this interface and fall back to LinearizableRead when the assertion fails, following the same pattern as raftengine.LeaseProvider. Keeping the lease methods OFF the Coordinator interface avoids breaking existing external implementations that predate the lease-read feature.
type LockResolver ¶
type LockResolver struct {
// contains filtered or unexported fields
}
LockResolver periodically scans for expired transaction locks and resolves them. This handles the case where secondary commit fails and leaves orphaned locks that no read path would discover (e.g., cold keys).
func NewLockResolver ¶
func NewLockResolver(ss *ShardStore, groups map[uint64]*ShardGroup, log *slog.Logger) *LockResolver
NewLockResolver creates and starts a background lock resolver.
func (*LockResolver) Close ¶
func (lr *LockResolver) Close()
Close stops the background resolver and waits for it to finish.
type OperationGroup ¶
type OperationGroup[T OP] struct {
	Elems []*Elem[T]
	IsTxn bool
	// StartTS is a logical timestamp captured at transaction begin.
	// It is ignored for non-transactional groups.
	StartTS uint64
	// CommitTS optionally pins the transaction commit timestamp.
	// Coordinators choose one automatically when this is zero.
	CommitTS uint64
	// PrevCommitTS carries the commit timestamp of a failed previous attempt
	// of the same single-shard transaction (option-2 one-phase idempotency
	// dedup). It is set only on a retry that reuses the prior attempt's write
	// set, and only flows to the one-phase apply path, where the FSM probes
	// whether that attempt already landed and no-ops the apply if so. Zero on
	// first attempts and on every non-retry caller. See
	// docs/design/2026_05_21_proposed_txn_secondary_idempotency.md.
	PrevCommitTS uint64
	// ReadKeys carries the transaction's read set so the FSM can validate
	// read-write conflicts atomically with the commit.
	ReadKeys [][]byte
	// ObservedRouteVersion is the durable catalog version this
	// transaction's read set was captured at (typically set on
	// BeginTxn from distribution.Engine.Version()). Zero means
	// "unpinned" — every existing caller leaves it at zero so this
	// is behaviour-neutral on the M1 plumbing PR. M3 of the
	// Composed-1 design
	// (docs/design/2026_05_29_partial_composed1_cross_group_commit_guard.md)
	// will gate the FSM apply path on this version so a route shift
	// between BeginTxn and Commit is caught before it can produce a
	// G1c anomaly across a cross-group MoveRange / SplitRange.
	ObservedRouteVersion uint64
}
OperationGroup is a group of operations that should be executed atomically.
type PartitionResolver ¶
type PartitionResolver interface {
ResolveGroup(key []byte) (uint64, bool)
// RecognisesPartitionedKey reports whether the key SHAPE is
// one this resolver is responsible for. Implementations
// answer based on prefix / structural inspection only — the
// answer must NOT depend on any in-memory mapping that could
// drift out of sync, otherwise the router cannot reliably
// fail-closed for unresolved-but-recognised keys.
RecognisesPartitionedKey(key []byte) bool
}
PartitionResolver maps a key to its owning Raft group when the key belongs to a partition-scheme keyspace (e.g. SQS HT-FIFO, where each (queue, partition) pair lives on a different group). ShardRouter consults the resolver before falling through to the byte-range engine, so partition routing can override the default shard-range layout without breaking the engine's non-overlapping-cover invariant.
Implementations must be safe for concurrent use — ResolveGroup is called on the request hot path. Returning (0, false) for a key the resolver does not recognise lets the router fall through to the engine. Returning (0, false) for a key the resolver DOES recognise (the partitioned shape matches but the queue is not in the map, or the partition is out of range) is also valid; the router uses RecognisesPartitionedKey to distinguish "not partitioned, fall through" from "partitioned but unresolved, fail closed".
The fail-closed split matters under partition-map drift / a partial rollout: without it, an unresolved partitioned key would silently land on the engine's SQS-catalog default group (because routeKey normalises every !sqs|... key to !sqs|route|global) instead of surfacing a routing error.
type ProposalObserver ¶
type ProposalObserver interface {
ObserveProposalFailure()
}
ProposalObserver records raft proposal failures for operational metrics.
type RaftPayloadWrapper ¶
RaftPayloadWrapper transforms an FSM payload into a §4.2 raft envelope just before submission to the engine. The Stage 3 default (when no wrapper is installed on a coordinator) is identity — payloads pass through unchanged. Stage 6's cluster-flag pipeline installs an active wrapper, sourced from the sidecar's currently-active raft DEK and a writer-registry-backed nonce factory.
Implementations MUST be safe to call concurrently from many goroutines: the coordinator may invoke this on every concurrent proposal. Encryption-state transitions (Phase 1 → Phase 2 cutover) publish a fresh closure via atomic.Pointer so the wrapper observes one consistent (cipher, key_id, nonce_factory) tuple per call.
type RaftStatusProvider ¶
type RaftStatusProvider interface {
Status() raftengine.Status
}
type RegistrationGate ¶
type RegistrationGate struct {
// Barrier is the open-or-closed channel described above. A nil
// Barrier (the zero value, or an explicitly nil field) means
// "never armed" → ungated; awaitRegistration checks for nil first
// so a nil channel is never received on (which would block forever).
Barrier <-chan struct{}
// StorageEnvelopeActive and ActiveStorageKeyID read the process-wide
// encryption StateCache. A write is gated only when the §7.1 cutover
// has fired AND a storage DEK is active — i.e. the write would land
// encrypted and thus emit a nonce under this node's identity.
StorageEnvelopeActive func() bool
ActiveStorageKeyID func() (uint32, bool)
}
RegistrationGate carries the Stage 7a §4.1 registration-before-first-write barrier into the coordinator. main.go owns the encryption StateCache and the registration goroutine and supplies this; kv stays decoupled from internal/encryption by taking a plain channel + predicate closures rather than the StateCache type.
Barrier three-state (per the 7a design §3.2):
- nil → no registration pending (Phase 0 / skip / off) → ungated
- open (non-nil, not closed) → registration in flight → mutating encrypted writes block on it
- closed → registration committed → ungated (fast path)
type RouteHistory ¶
type RouteHistory interface {
// SnapshotAt returns the route catalog at the given catalog
// version. Returns (zero, false) when the version is outside
// the ring (either evicted by depth, or in the future). The
// M3 gate maps the not-found case to ErrComposed1VersionGCd.
SnapshotAt(version uint64) (RouteSnapshot, bool)
// Current returns the route catalog snapshot at the engine's
// current catalog version. Returns (zero, false) when the
// engine has no history (bare-struct case used by some test
// seams). The M3 cross-version fence uses this to compare
// the txn's observed-version owner against the current
// owner — a mismatch is the §3 codex P1 trace.
Current() (RouteSnapshot, bool)
}
RouteHistory is the kv-side interface to the route catalog's versioned-snapshot ring. *distribution.Engine satisfies it via WrapDistributionEngine. Defined in the kv package so kvFSM does not have to import a concrete type for the field; the M3 verifyComposed1 gate uses only SnapshotAt + Current + the returned snapshot's OwnerOf, so the interface stays minimal.
func WrapDistributionEngine ¶
func WrapDistributionEngine(e *distribution.Engine) RouteHistory
WrapDistributionEngine adapts a *distribution.Engine so it satisfies the kv.RouteHistory interface that kvFSM's M2 Composed-1 plumbing consumes.
The adapter is a thin two-hop boxing: kv.RouteHistory.SnapshotAt returns a kv.RouteSnapshot interface; distribution.Engine.SnapshotAt returns a concrete distribution.RouteHistorySnapshot struct. Go's interface satisfaction requires method signatures to match exactly (return types are not covariant), so we cannot have *distribution.Engine satisfy kv.RouteHistory directly — and moving the interface to the distribution package would create an import cycle (kv already imports distribution via kv/sharded_coordinator.go).
Production wiring in main.go uses WrapDistributionEngine to install the engine as the FSM's route-history provider; tests that want to mock kv.RouteHistory bypass the wrapper entirely and implement the kv interface directly.
See docs/design/2026_05_29_partial_composed1_cross_group_commit_guard.md §M2.
type RouteSnapshot ¶
type RouteSnapshot interface {
// Version returns the catalog version this snapshot was
// recorded at.
Version() uint64
// OwnerOf returns the Raft group ID that owned key at this
// snapshot's version. (0, false) when no route covered key.
OwnerOf(key []byte) (uint64, bool)
}
RouteSnapshot is the historical view of the route catalog at a specific version. Returned by RouteHistory.SnapshotAt; the M3 gate uses Version + OwnerOf to compare against the FSM's shardGroupID.
type ShardGroup ¶
type ShardGroup struct {
Engine raftengine.Engine
Store store.MVCCStore
Txn Transactional
// contains filtered or unexported fields
}
func (*ShardGroup) BeginCutoverBarrier ¶
func (g *ShardGroup) BeginCutoverBarrier() <-chan struct{}
BeginCutoverBarrier opens the §7.1 step-1 quiescence barrier on this shard group's proposer chain. Returns a channel that closes when all in-flight user Propose calls drain; the typical caller uses WaitInflightDrained which composes context cancellation.
Forwards to *dynamicWrappedProposer when present. When the proposer is the bare engine (raw Engine fallback in test fixtures), returns a pre-closed channel so callers that don't distinguish barrier-capable from -incapable proposers can drive the same state-machine shape against either.
6E-2d wiring: every leader's EnableRaftEnvelope handler calls this on each ShardGroup that participates in the cutover before proposing the cutover entry. After return, the proposer's dynamicWrappedProposer.Propose rejects fresh user calls with raftengine.ErrEnvelopeCutoverInProgress.
func (*ShardGroup) EndCutoverBarrier ¶
func (g *ShardGroup) EndCutoverBarrier()
EndCutoverBarrier closes the §7.1 step-6 barrier on this shard group's proposer chain. Idempotent against barrier-incapable proposers (no-op). Callers MUST pair each BeginCutoverBarrier with exactly one EndCutoverBarrier (the EnableRaftEnvelope handler uses defer).
func (*ShardGroup) Proposer ¶
func (g *ShardGroup) Proposer() raftengine.Proposer
Proposer returns the wrap-aware proposer chain installed by NewLeaderProxyForShardGroup, or the raw Engine when the constructor was bypassed (legacy / test fixtures that build ShardGroup via struct literal). Direct shard proposals (HLC lease renewal, future cipher rotations, etc.) MUST go through this getter rather than g.Engine.Propose so the Stage 6E-2c dynamic wrap path applies — a direct g.Engine Propose call would bypass the wrap pointer and let post-cutover writes land cleartext above the raft-envelope cutover, halting the apply loop on §6.3 strict-> unwrap (codex P2 round-1).
func (*ShardGroup) RaftPayloadWrap ¶
func (g *ShardGroup) RaftPayloadWrap() RaftPayloadWrapper
RaftPayloadWrap returns the currently-installed wrap closure, or nil if the wrap is inactive. Primarily intended for tests and diagnostics; production proposers consult the underlying atomic.Pointer directly (see dynamicWrappedProposer).
func (*ShardGroup) SetRaftPayloadWrap ¶
func (g *ShardGroup) SetRaftPayloadWrap(wrap RaftPayloadWrapper)
SetRaftPayloadWrap publishes wrap as the active raft envelope closure for this shard group. Passing nil clears the wrap (the proposer reverts to cleartext pass-through). Safe to call from any goroutine; the next Propose / ProposeAdmin observes the new state via the proposer's atomic.Pointer.Load.
This is the sole supported way to install or rotate the wrap closure on a running coordinator. Stage 6E-2d's EnableRaftEnvelope handler will call this on every leader when the cutover entry commits.
func (*ShardGroup) WaitInflightDrained ¶
func (g *ShardGroup) WaitInflightDrained(ctx context.Context) error
WaitInflightDrained blocks until the in-flight Propose counter drops to 0 after BeginCutoverBarrier ran on this ShardGroup, or ctx fires. Returns nil on drain or when the proposer is barrier-incapable (degraded fast-path so test fixtures don't deadlock the handler). Wraps ctx.Err() on cancellation.
type ShardRouter ¶
type ShardRouter struct {
// contains filtered or unexported fields
}
ShardRouter routes requests to multiple raft groups based on key ranges.
Cross-shard transactions are not supported. They require distributed coordination (for example, 2PC) to ensure atomicity.
Non-transactional request batches may still partially succeed across shards.
func NewShardRouter ¶
func NewShardRouter(e *distribution.Engine) *ShardRouter
NewShardRouter creates a new router.
func (*ShardRouter) Abort ¶
func (s *ShardRouter) Abort(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
Abort dispatches aborts to the correct raft group.
func (*ShardRouter) Commit ¶
func (s *ShardRouter) Commit(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
func (*ShardRouter) Register ¶
func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)
Register associates a raft group ID with its transactional manager and store.
func (*ShardRouter) ResolveGroup ¶
func (s *ShardRouter) ResolveGroup(rawKey []byte) (uint64, bool)
ResolveGroup tries the partition resolver first (when installed), then falls through to the byte-range engine. It is exported so ShardedCoordinator's per-key helpers (groupForKey, routeAndGroupForKey, engineGroupIDForKey, groupMutations) can consult the same dispatch path Commit / Abort / Get use — without it those helpers would bypass the resolver and partitioned-FIFO traffic would silently mis-route through 2PC and the read paths.
The resolver runs on the RAW key before any user-key normalization. SQS keys in particular are collapsed to !sqs|route|global by routeKey to keep the engine's per-shard layout simple, but that collapse hides the partitioned-prefix information the resolver needs (issue: codex P1 / gemini high on PR #715). The engine still sees the post-normalization key, so legacy routing (catalog → !sqs|route|global → default group) stays unchanged.
Fail-closed for recognised-but-unresolved keys: when the resolver recognises a partitioned shape (RecognisesPartitionedKey == true) but cannot resolve the queue/partition pair (ResolveGroup returns ok=false), the router refuses to fall through to the engine. Otherwise the engine would route the partitioned key to !sqs|route|global's default group, silently mis-routing HT-FIFO traffic during partition-map drift or partial rollout (codex P1 round 2 on PR #715).
Returns (0, false) when neither the resolver nor the engine recognises the key. Caller surfaces this as an "unknown group" error so a partitioned-prefix key whose queue is missing from the resolver map fails closed rather than landing on whichever engine-default group happens to cover the raw bytes.
func (*ShardRouter) WithPartitionResolver ¶
func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter
WithPartitionResolver installs a partition-keyspace resolver that is consulted before the byte-range engine on every dispatch. A nil resolver clears any previously-installed resolver. Returns the receiver so callers can chain.
Intended for use during startup, before the router begins handling requests. Interface assignment in Go is not atomic, so a call that races with a concurrent ResolveGroup (whose resolver read happens in resolveGroup) is a data race and may observe a torn interface value. Callers must wire the resolver once during construction (parseRuntimeConfig → NewShardedCoordinator → WithPartitionResolver) and treat any post-startup re-assignment as undefined behaviour.
type ShardStore ¶
type ShardStore struct {
// contains filtered or unexported fields
}
ShardStore routes MVCC reads to shard-specific stores and proxies to leaders when needed.
func NewShardStore ¶
func NewShardStore(engine *distribution.Engine, groups map[uint64]*ShardGroup) *ShardStore
NewShardStore creates a sharded MVCC store wrapper.
func (*ShardStore) ApplyMutations ¶
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
ApplyMutations applies a batch of mutations to the correct shard store.
All mutations must belong to the same shard. Cross-shard mutation batches are not supported.
func (*ShardStore) ApplyMutationsRaft ¶
func (s *ShardStore) ApplyMutationsRaft(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
ApplyMutationsRaft is the raft-apply variant; see store.MVCCStore for the durability contract. Only the FSM may call this method.
func (*ShardStore) ApplyMutationsRaftAt ¶
func (s *ShardStore) ApplyMutationsRaftAt(ctx context.Context, mutations []*store.KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error
ApplyMutationsRaftAt is the raft-entry-index-aware variant. Threads appliedIndex through to the single owning shard so the leaf can bundle metaAppliedIndex with the mutation. See PR #910 design §2.
func (*ShardStore) Close ¶
func (s *ShardStore) Close() error
func (*ShardStore) CommittedVersionAt ¶
func (s *ShardStore) CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error)
CommittedVersionAt routes the exact-timestamp existence probe to the owning group's local store, gated on the same lease-aware leader check GetAt uses, so a deposed node that has not yet applied a freshly-committed entry does not silently return false to a client read. The FSM apply path is NOT affected — it holds the per-shard store directly (not ShardStore) and runs the probe on the deterministic local replica it is writing to. The option-2 reuse path (RedisServer.resolveReuseLength) goes through this wrapper, so during leader churn the probe must answer authoritatively or defer to a leader-routed re-read.
There is no RawCommittedVersionAt RPC to proxy to; when we are not the linearizable leader for the group we return (false, nil) and let the caller fall back to derived reads (resolveListMeta uses ScanAt/GetAt, which ARE leader-fenced / proxied per group). The fallback returns the leader's current Len — a valid serialization — at the cost of the pending.length fast-path during churn. Mirrors LeaderRoutedStore's fix for codex P1 #796.
func (*ShardStore) DeletePrefixAt ¶
func (s *ShardStore) DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
DeletePrefixAt applies a prefix delete to every shard in the store.
func (*ShardStore) DeletePrefixAtRaft ¶
func (s *ShardStore) DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt.
func (*ShardStore) DeletePrefixAtRaftAt ¶
func (s *ShardStore) DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error
DeletePrefixAtRaftAt is the raft-entry-index-aware variant. The caller's raft entry index applies only to the local group whose FSM is driving this apply; on a multi-group ShardStore, fanning the SAME index across other groups would corrupt their metaAppliedIndex. The single-group case (the common case for an FSM-local DeletePrefixAtRaft path) gets the correct bundling; the multi-group broadcast case is treated as "passive" — peer groups receive the prefix-delete without a meta-key bump (their own raft applies will catch up the index on the next mutation).
In practice the FSM call sites that issue raft-DeletePrefix operate against a single group's store; the multi-group ShardStore is the receiver only when an aggregate (admin / coordinator) path is replaying a global FLUSHALL, which is not raft-applied.
func (*ShardStore) LastAppliedIndex ¶
func (s *ShardStore) LastAppliedIndex() (uint64, bool, error)
LastAppliedIndex aggregates the durable applied-index across every shard group, returning the MIN over all groups that report one.
MIN is the right aggregator because the kvFSM is per-shard in production — each shard's FSM independently asks "is MY group's applied index at least as fresh as MY group's snapshot?" — and ShardStore is NEVER used as the FSM's f.store in production today (the FSM holds a *pebbleStore directly; ShardStore is the coordinator-facing fanout wrapper). This method exists as a defensive forward in case a future refactor uses ShardStore from the apply path; reporting MIN guarantees the cold-start skip gate would refuse to skip whenever ANY group lags, matching the conservative "over-restore beats under-restore" rule (PR #910 design §4).
(0, false, nil) when no group reports a value — strictly-additive fallback per design §4.
func (*ShardStore) LastCommitTS ¶
func (s *ShardStore) LastCommitTS() uint64
func (*ShardStore) LatestCommitTS ¶
func (*ShardStore) PutWithTTLAt ¶
func (*ShardStore) ReverseScanAt ¶
func (*ShardStore) ScanAt ¶
func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
ScanAt scans keys across shards at the given timestamp. Note: when the range spans multiple shards, each shard may have a different Raft apply position. This means the returned view is NOT a globally consistent snapshot — it is a best-effort point-in-time scan. Callers requiring cross-shard consistency should use a transaction or implement a cross-shard snapshot fence.
func (*ShardStore) SetDurableAppliedIndex ¶
func (s *ShardStore) SetDurableAppliedIndex(idx uint64) error
SetDurableAppliedIndex broadcasts the bump to every group store that exposes the writer seam.
This is purely defensive — in production today the FSM holds a *pebbleStore directly; ShardStore is never f.store. Were it ever wired through the FSM apply path, broadcasting the same idx across groups would corrupt their per-group metaAppliedIndex semantics (each group has its own raft log with its own entry numbering). For that hypothetical, the test convention from DeletePrefixAtRaftAt applies: tests MUST pass idx=0 to opt out, or not use ShardStore as the writer at all. Returns the first per-group error.
func (*ShardStore) WriteConflictCountsByPrefix ¶
func (s *ShardStore) WriteConflictCountsByPrefix() map[string]uint64
WriteConflictCountsByPrefix aggregates OCC conflict counts across every shard group owned by this ShardStore. Per-shard counts share the same "<kind>|<key_prefix>" label schema, so a simple sum gives the node-wide view. The result is always non-nil.
type ShardedCoordinator ¶
type ShardedCoordinator struct {
// contains filtered or unexported fields
}
ShardedCoordinator routes operations to shard-specific raft groups. It issues timestamps via a shared HLC and uses ShardRouter to dispatch.
func NewShardedCoordinator ¶
func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator
NewShardedCoordinator builds a coordinator for the provided shard groups. The defaultGroup is used for non-keyed leader checks.
func (*ShardedCoordinator) Clock ¶
func (c *ShardedCoordinator) Clock() *HLC
func (*ShardedCoordinator) Close ¶
func (c *ShardedCoordinator) Close() error
Close releases per-shard engine-side registrations. Idempotent.
func (*ShardedCoordinator) Dispatch ¶
func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
func (*ShardedCoordinator) EngineGroupIDForKey ¶
func (c *ShardedCoordinator) EngineGroupIDForKey(key []byte) uint64
EngineGroupIDForKey reports the Raft group ID that owns key, or 0 when the key cannot be routed. Callers that batch lease checks across many keys use it to collapse keys sharing a group into a single lease read (see GroupRoutableCoordinator). It performs no I/O — only an in-memory router lookup — so it is safe on the read hot path.
func (*ShardedCoordinator) IsLeader ¶
func (c *ShardedCoordinator) IsLeader() bool
func (*ShardedCoordinator) IsLeaderForKey ¶
func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool
func (*ShardedCoordinator) LeaseRead ¶
func (c *ShardedCoordinator) LeaseRead(ctx context.Context) (uint64, error)
LeaseRead routes through the default group's lease. See Coordinate.LeaseRead for semantics.
func (*ShardedCoordinator) LeaseReadAllGroups ¶
func (c *ShardedCoordinator) LeaseReadAllGroups(ctx context.Context) error
LeaseReadAllGroups establishes the lease freshness bound on every shard group this coordinator owns. Multi-shard reads (Scan, GSI/whole-table Query) visit all intersecting routes across all groups (see ShardStore.ScanAt), so fencing only the default group would let those reads sample a snapshot on a non-default group without the freshness bound. It fails closed on the first group that cannot confirm its lease, since a partially-fenced read is exactly the stale read this guards against. Group iteration order is unspecified; correctness does not depend on it because every group must succeed.
func (*ShardedCoordinator) LeaseReadForKey ¶
LeaseReadForKey performs the lease check on the shard group that owns key. Each group maintains its own lease since each group has independent leadership and term.
func (*ShardedCoordinator) LinearizableRead ¶
func (c *ShardedCoordinator) LinearizableRead(ctx context.Context) (uint64, error)
func (*ShardedCoordinator) LinearizableReadForKey ¶
func (*ShardedCoordinator) RaftLeader ¶
func (c *ShardedCoordinator) RaftLeader() string
func (*ShardedCoordinator) RaftLeaderForKey ¶
func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) string
func (*ShardedCoordinator) RunHLCLeaseRenewal ¶
func (c *ShardedCoordinator) RunHLCLeaseRenewal(ctx context.Context)
RunHLCLeaseRenewal periodically proposes a new physical ceiling to the default shard group's Raft cluster while this node is the leader of that group. This mirrors the single-shard Coordinate.RunHLCLeaseRenewal behaviour, ensuring the shared HLC ceiling is maintained in multi-shard deployments.
RunHLCLeaseRenewal blocks until ctx is cancelled; call it in a goroutine.
func (*ShardedCoordinator) VerifyLeader ¶
func (c *ShardedCoordinator) VerifyLeader(ctx context.Context) error
func (*ShardedCoordinator) VerifyLeaderForKey ¶
func (c *ShardedCoordinator) VerifyLeaderForKey(ctx context.Context, key []byte) error
func (*ShardedCoordinator) WithLeaseReadObserver ¶
func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) *ShardedCoordinator
WithLeaseReadObserver wires a LeaseReadObserver onto a ShardedCoordinator. Applied after construction because the NewShardedCoordinator signature is already heavily overloaded; see Coordinate.WithLeaseReadObserver for the equivalent option on the single-group coordinator, including the typed-nil guard rationale.
func (*ShardedCoordinator) WithPartitionResolver ¶
func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator
WithPartitionResolver wires a PartitionResolver onto the coordinator's underlying ShardRouter. The resolver runs before the byte-range engine on every dispatch, so partition-keyspace schemes (e.g. SQS HT-FIFO) can override the default shard layout without breaking the engine's non-overlapping-cover invariant.
Applied after construction for the same reason as the other With* options on this type — NewShardedCoordinator is already heavily overloaded. Passing a nil resolver clears any previously-installed resolver.
func (*ShardedCoordinator) WithRegistrationGate ¶
func (c *ShardedCoordinator) WithRegistrationGate(g *RegistrationGate) *ShardedCoordinator
WithRegistrationGate wires the Stage 7a first-write barrier onto the coordinator. Applied after construction for the same reason as the other With* options. A nil gate (or a gate with a nil Barrier) leaves every write ungated — the encryption-off / no-pending-registration posture.
func (*ShardedCoordinator) WithSampler ¶
func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator
WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The coordinator calls sampler.Observe at dispatch entry — once per resolved (RouteID, mutation key) pair — to feed the key visualizer heatmap (design doc §5.1). Applied after construction for the same reason as WithLeaseReadObserver: NewShardedCoordinator is already heavily overloaded.
Passing a nil interface value is supported and disables sampling (the call site guards against it). Passing a typed-nil *keyviz.MemSampler also works because Observe is nil-safe by contract.
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
func NewTransactionWithProposer ¶
func NewTransactionWithProposer(proposer raftengine.Proposer, opts ...TransactionOption) *TransactionManager
func (*TransactionManager) Abort ¶
func (t *TransactionManager) Abort(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
func (*TransactionManager) Close ¶
func (t *TransactionManager) Close()
Close signals the TransactionManager to stop and drains any pending raw commit items, sending each an error so callers are not blocked forever.
func (*TransactionManager) Commit ¶
func (t *TransactionManager) Commit(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
type TransactionOption ¶
type TransactionOption func(*TransactionManager)
func WithProposalObserver ¶
func WithProposalObserver(observer ProposalObserver) TransactionOption
WithProposalObserver returns a TransactionOption that records raft.Apply failures through the given observer, without coupling kv to a concrete monitoring backend.
type TransactionResponse ¶
type TransactionResponse struct {
CommitIndex uint64
}
type Transactional ¶
type Transactional interface {
Commit(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
Abort(ctx context.Context, reqs []*pb.Request) (*TransactionResponse, error)
}
Transactional is the kv-internal interface that fronts the raft propose path. Implementations (TransactionManager, LeaderProxy, ShardRouter, leaseRefreshingTxn) thread the caller's context end-to-end so a Redis / gRPC / S3 / SQS handler's deadline reaches Propose / VerifyLeader without being silently dropped to context.Background. See PR #748 / design doc 2026_05_10_proposed_kv_ctx_plumbing.md for the rationale; the prior signatures lived behind `verifyLeaderEngine`'s 5 s safety bound (#745), which is preserved as the no-ctx defense-in-depth fallback.
type TxnLockedError ¶
type TxnLockedError struct {
// contains filtered or unexported fields
}
func (*TxnLockedError) Error ¶
func (e *TxnLockedError) Error() string
func (*TxnLockedError) Unwrap ¶
func (e *TxnLockedError) Unwrap() error
type TxnMeta ¶
TxnMeta is embedded into transactional raft log requests via a synthetic mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store.
PrevCommitTS is the commit timestamp of a failed previous attempt of the same single-shard transaction. It is set only on a retry, and its sole purpose is to drive the one-phase idempotency dedup probe (option 2). At apply, the FSM checks whether the previous attempt's write set already landed at exactly this timestamp and, if so, no-ops the apply instead of re-applying. Because this field is the only one that requires the V2 wire format, EncodeTxnMeta keeps emitting V1 whenever PrevCommitTS is zero (every non-retry path), so the default wire format is unchanged. See docs/design/2026_05_21_proposed_txn_secondary_idempotency.md.
func DecodeTxnMeta ¶
Source Files
¶
- active_timestamp_tracker.go
- compactor.go
- coordinator.go
- fsm.go
- fsm_applied_index_iface_check.go
- fsm_encryption.go
- grpc_conn_cache.go
- hlc.go
- hlc_wall.go
- latest_commit_ts.go
- leader_proxy.go
- leader_routed_store.go
- lease_state.go
- lock_resolver.go
- raft_engine.go
- raft_payload_wrapper.go
- route_history.go
- shard_key.go
- shard_router.go
- shard_store.go
- sharded_coordinator.go
- snapshot.go
- transaction.go
- transcoder.go
- txn_codec.go
- txn_consts.go
- txn_errors.go
- txn_keys.go