Documentation
¶
Index ¶
- Constants
- Variables
- func EncodeTxnMeta(m TxnMeta) []byte
- func ExtractTxnUserKey(key []byte) []byte
- func MaxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error)
- func NewTxnLockedError(key []byte) error
- func NewTxnLockedErrorWithDetail(key []byte, detail string) error
- func TxnLockedDetails(err error) ([]byte, string, bool)
- type ActiveTimestampToken
- type ActiveTimestampTracker
- type Coordinate
- func (c *Coordinate) Clock() *HLC
- func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
- func (c *Coordinate) IsLeader() bool
- func (c *Coordinate) IsLeaderForKey(_ []byte) bool
- func (c *Coordinate) RaftLeader() raft.ServerAddress
- func (c *Coordinate) RaftLeaderForKey(_ []byte) raft.ServerAddress
- func (c *Coordinate) VerifyLeader() error
- func (c *Coordinate) VerifyLeaderForKey(_ []byte) error
- type CoordinateResponse
- type Coordinator
- type Elem
- type FSM
- type FSMCompactRuntime
- type FSMCompactor
- type FSMCompactorOption
- func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption
- func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption
- func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption
- func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption
- func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption
- type GRPCConnCache
- type HLC
- type LeaderProxy
- type LeaderRoutedStore
- func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, ...) error
- func (s *LeaderRoutedStore) Close() error
- func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error
- func (s *LeaderRoutedStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
- func (s *LeaderRoutedStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
- func (s *LeaderRoutedStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
- func (s *LeaderRoutedStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
- func (s *LeaderRoutedStore) LastCommitTS() uint64
- func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
- func (s *LeaderRoutedStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *LeaderRoutedStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *LeaderRoutedStore) Restore(buf io.Reader) error
- func (s *LeaderRoutedStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *LeaderRoutedStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *LeaderRoutedStore) Snapshot() (store.Snapshot, error)
- type OP
- type OperationGroup
- type RaftStatsProvider
- type ShardGroup
- type ShardRouter
- func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error)
- func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error)
- func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error)
- func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)
- type ShardStore
- func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, ...) error
- func (s *ShardStore) Close() error
- func (s *ShardStore) Compact(ctx context.Context, minTS uint64) error
- func (s *ShardStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
- func (s *ShardStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
- func (s *ShardStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
- func (s *ShardStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
- func (s *ShardStore) LastCommitTS() uint64
- func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
- func (s *ShardStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *ShardStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, ...) error
- func (s *ShardStore) Restore(_ io.Reader) error
- func (s *ShardStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)
- func (s *ShardStore) Snapshot() (store.Snapshot, error)
- type ShardedCoordinator
- func (c *ShardedCoordinator) Clock() *HLC
- func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
- func (c *ShardedCoordinator) IsLeader() bool
- func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool
- func (c *ShardedCoordinator) RaftLeader() raft.ServerAddress
- func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) raft.ServerAddress
- func (c *ShardedCoordinator) VerifyLeader() error
- func (c *ShardedCoordinator) VerifyLeaderForKey(key []byte) error
- type TransactionManager
- type TransactionResponse
- type Transactional
- type TxnLockedError
- type TxnMeta
Constants ¶
const TxnMetaPrefix = txnMetaPrefix
TxnMetaPrefix is the key prefix used for transaction metadata mutations.
Variables ¶
var (
ErrTxnMetaMissing = errors.New("txn meta missing")
ErrTxnInvalidMeta = errors.New("txn meta invalid")
ErrTxnLocked = errors.New("txn locked")
ErrTxnCommitTSRequired = errors.New("txn commit ts required")
ErrTxnAlreadyCommitted = errors.New("txn already committed")
ErrTxnPrimaryKeyRequired = errors.New("txn primary key required")
)
var ErrCrossShardMutationBatchNotSupported = errors.New("cross-shard mutation batches are not supported")
var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported")
var ErrInvalidRequest = errors.New("invalid request")
var ErrLeaderNotFound = errors.New("leader not found")
var ErrNotImplemented = errors.New("not implemented")
var ErrUnknownRequestType = errors.New("unknown request type")
Functions ¶
func EncodeTxnMeta ¶
func ExtractTxnUserKey ¶
ExtractTxnUserKey returns the logical user key embedded in a transaction-internal key such as !txn|lock|, !txn|cmt|, or !txn|meta|. It returns nil when the key does not use a transaction-internal namespace or is malformed.
func MaxLatestCommitTS ¶
MaxLatestCommitTS returns the maximum commit timestamp for the provided keys.
Missing keys are ignored. If any LatestCommitTS lookup returns an error, the error is returned to the caller.
func NewTxnLockedError ¶
Types ¶
type ActiveTimestampToken ¶
type ActiveTimestampToken struct {
// contains filtered or unexported fields
}
ActiveTimestampToken releases one tracked timestamp when the owning operation completes.
func (*ActiveTimestampToken) Release ¶
func (t *ActiveTimestampToken) Release()
type ActiveTimestampTracker ¶
type ActiveTimestampTracker struct {
// contains filtered or unexported fields
}
ActiveTimestampTracker tracks in-flight read or transaction timestamps that must remain readable while background compaction is running.
func NewActiveTimestampTracker ¶
func NewActiveTimestampTracker() *ActiveTimestampTracker
func (*ActiveTimestampTracker) Oldest ¶
func (t *ActiveTimestampTracker) Oldest() uint64
func (*ActiveTimestampTracker) Pin ¶
func (t *ActiveTimestampTracker) Pin(ts uint64) *ActiveTimestampToken
type Coordinate ¶
type Coordinate struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(txm Transactional, r *raft.Raft) *Coordinate
func (*Coordinate) Clock ¶
func (c *Coordinate) Clock() *HLC
func (*Coordinate) Dispatch ¶
func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
func (*Coordinate) IsLeader ¶
func (c *Coordinate) IsLeader() bool
func (*Coordinate) IsLeaderForKey ¶
func (c *Coordinate) IsLeaderForKey(_ []byte) bool
func (*Coordinate) RaftLeader ¶
func (c *Coordinate) RaftLeader() raft.ServerAddress
RaftLeader returns the current leader's address as known by this node.
func (*Coordinate) RaftLeaderForKey ¶
func (c *Coordinate) RaftLeaderForKey(_ []byte) raft.ServerAddress
func (*Coordinate) VerifyLeader ¶
func (c *Coordinate) VerifyLeader() error
func (*Coordinate) VerifyLeaderForKey ¶
func (c *Coordinate) VerifyLeaderForKey(_ []byte) error
type CoordinateResponse ¶
type CoordinateResponse struct {
CommitIndex uint64
}
type Coordinator ¶
type Coordinator interface {
Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
IsLeader() bool
VerifyLeader() error
RaftLeader() raft.ServerAddress
IsLeaderForKey(key []byte) bool
VerifyLeaderForKey(key []byte) error
RaftLeaderForKey(key []byte) raft.ServerAddress
Clock() *HLC
}
type FSMCompactRuntime ¶
type FSMCompactRuntime struct {
GroupID uint64
Raft RaftStatsProvider
Store store.MVCCStore
}
type FSMCompactor ¶
type FSMCompactor struct {
// contains filtered or unexported fields
}
func NewFSMCompactor ¶
func NewFSMCompactor(runtimes []FSMCompactRuntime, opts ...FSMCompactorOption) *FSMCompactor
type FSMCompactorOption ¶
type FSMCompactorOption func(*FSMCompactor)
func WithFSMCompactorActiveTimestampTracker ¶
func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption
func WithFSMCompactorInterval ¶
func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption
func WithFSMCompactorLogger ¶
func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption
func WithFSMCompactorRetentionWindow ¶
func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption
func WithFSMCompactorTimeout ¶
func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption
type GRPCConnCache ¶
type GRPCConnCache struct {
// contains filtered or unexported fields
}
GRPCConnCache reuses gRPC connections per address. gRPC itself handles reconnection on transient failures; we only force a re-dial if the conn has already been closed (Shutdown).
func (*GRPCConnCache) Close ¶
func (c *GRPCConnCache) Close() error
func (*GRPCConnCache) ConnFor ¶
func (c *GRPCConnCache) ConnFor(addr raft.ServerAddress) (*grpc.ClientConn, error)
type HLC ¶
type HLC struct {
// contains filtered or unexported fields
}
HLC implements a simple hybrid logical clock suitable for issuing monotonically increasing timestamps across shards/raft groups.
Layout (ms logical):
high 48 bits: wall clock milliseconds since Unix epoch
low 16 bits: logical counter to break ties when wall time does not advance
This keeps ordering stable across leaders as long as clocks are loosely synchronized; it avoids dependence on per-raft commit indices that diverge between shards.
type LeaderProxy ¶
type LeaderProxy struct {
// contains filtered or unexported fields
}
LeaderProxy forwards transactional requests to the current raft leader when the local node is not the leader.
func NewLeaderProxy ¶
func NewLeaderProxy(r *raft.Raft) *LeaderProxy
NewLeaderProxy creates a leader-aware transactional proxy for a raft group.
func (*LeaderProxy) Abort ¶
func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error)
func (*LeaderProxy) Close ¶
func (p *LeaderProxy) Close() error
func (*LeaderProxy) Commit ¶
func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error)
type LeaderRoutedStore ¶
type LeaderRoutedStore struct {
// contains filtered or unexported fields
}
LeaderRoutedStore is an MVCCStore wrapper that serves reads from the local store only when leadership is verified; otherwise it proxies reads to the current leader via gRPC.
This is intended for single-raft-group deployments where the underlying store itself is not leader-aware (e.g. *store.MVCCStore).
Writes and maintenance operations are delegated to the local store.
func NewLeaderRoutedStore ¶
func NewLeaderRoutedStore(local store.MVCCStore, coordinator Coordinator) *LeaderRoutedStore
func (*LeaderRoutedStore) ApplyMutations ¶
func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error
func (*LeaderRoutedStore) Close ¶
func (s *LeaderRoutedStore) Close() error
func (*LeaderRoutedStore) Compact ¶
func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error
func (*LeaderRoutedStore) LastCommitTS ¶
func (s *LeaderRoutedStore) LastCommitTS() uint64
func (*LeaderRoutedStore) LatestCommitTS ¶
func (*LeaderRoutedStore) PutWithTTLAt ¶
func (*LeaderRoutedStore) ReverseScanAt ¶
type OperationGroup ¶
type OperationGroup[T OP] struct {
Elems []*Elem[T]
IsTxn bool
// StartTS is a logical timestamp captured at transaction begin.
// It is ignored for non-transactional groups.
StartTS uint64
// CommitTS optionally pins the transaction commit timestamp.
// Coordinators choose one automatically when this is zero.
CommitTS uint64
}
OperationGroup is a group of operations that should be executed atomically.
type RaftStatsProvider ¶
type ShardGroup ¶
type ShardGroup struct {
Raft *raft.Raft
Store store.MVCCStore
Txn Transactional
}
type ShardRouter ¶
type ShardRouter struct {
// contains filtered or unexported fields
}
ShardRouter routes requests to multiple raft groups based on key ranges.
Cross-shard transactions are not supported. They require distributed coordination (for example, 2PC) to ensure atomicity.
Non-transactional request batches may still partially succeed across shards.
func NewShardRouter ¶
func NewShardRouter(e *distribution.Engine) *ShardRouter
NewShardRouter creates a new router.
func (*ShardRouter) Abort ¶
func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error)
Abort dispatches aborts to the correct raft group.
func (*ShardRouter) Commit ¶
func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error)
func (*ShardRouter) Register ¶
func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)
Register associates a raft group ID with its transactional manager and store.
type ShardStore ¶
type ShardStore struct {
// contains filtered or unexported fields
}
ShardStore routes MVCC reads to shard-specific stores and proxies to leaders when needed.
func NewShardStore ¶
func NewShardStore(engine *distribution.Engine, groups map[uint64]*ShardGroup) *ShardStore
NewShardStore creates a sharded MVCC store wrapper.
func (*ShardStore) ApplyMutations ¶
func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error
ApplyMutations applies a batch of mutations to the correct shard store.
All mutations must belong to the same shard. Cross-shard mutation batches are not supported.
func (*ShardStore) Close ¶
func (s *ShardStore) Close() error
func (*ShardStore) LastCommitTS ¶
func (s *ShardStore) LastCommitTS() uint64
func (*ShardStore) LatestCommitTS ¶
func (*ShardStore) PutWithTTLAt ¶
func (*ShardStore) ReverseScanAt ¶
type ShardedCoordinator ¶
type ShardedCoordinator struct {
// contains filtered or unexported fields
}
ShardedCoordinator routes operations to shard-specific raft groups. It issues timestamps via a shared HLC and uses ShardRouter to dispatch.
func NewShardedCoordinator ¶
func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator
NewShardedCoordinator builds a coordinator for the provided shard groups. The defaultGroup is used for non-keyed leader checks.
func (*ShardedCoordinator) Clock ¶
func (c *ShardedCoordinator) Clock() *HLC
func (*ShardedCoordinator) Dispatch ¶
func (c *ShardedCoordinator) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
func (*ShardedCoordinator) IsLeader ¶
func (c *ShardedCoordinator) IsLeader() bool
func (*ShardedCoordinator) IsLeaderForKey ¶
func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool
func (*ShardedCoordinator) RaftLeader ¶
func (c *ShardedCoordinator) RaftLeader() raft.ServerAddress
func (*ShardedCoordinator) RaftLeaderForKey ¶
func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) raft.ServerAddress
func (*ShardedCoordinator) VerifyLeader ¶
func (c *ShardedCoordinator) VerifyLeader() error
func (*ShardedCoordinator) VerifyLeaderForKey ¶
func (c *ShardedCoordinator) VerifyLeaderForKey(key []byte) error
type TransactionManager ¶
type TransactionManager struct {
// contains filtered or unexported fields
}
func NewTransaction ¶
func NewTransaction(raft *raft.Raft) *TransactionManager
func (*TransactionManager) Abort ¶
func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, error)
func (*TransactionManager) Close ¶
func (t *TransactionManager) Close()
Close signals the TransactionManager to stop and drains any pending raw commit items, sending each an error so callers are not blocked forever.
func (*TransactionManager) Commit ¶
func (t *TransactionManager) Commit(reqs []*pb.Request) (*TransactionResponse, error)
type TransactionResponse ¶
type TransactionResponse struct {
CommitIndex uint64
}
type Transactional ¶
type Transactional interface {
Commit(reqs []*pb.Request) (*TransactionResponse, error)
Abort(reqs []*pb.Request) (*TransactionResponse, error)
}
type TxnLockedError ¶
type TxnLockedError struct {
// contains filtered or unexported fields
}
func (*TxnLockedError) Error ¶
func (e *TxnLockedError) Error() string
func (*TxnLockedError) Unwrap ¶
func (e *TxnLockedError) Unwrap() error
type TxnMeta ¶
TxnMeta is embedded into transactional raft log requests via a synthetic mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store.
func DecodeTxnMeta ¶
Source Files
¶
- active_timestamp_tracker.go
- compactor.go
- coordinator.go
- fsm.go
- grpc_conn_cache.go
- hlc.go
- hlc_wall.go
- latest_commit_ts.go
- leader_proxy.go
- leader_routed_store.go
- raft_leader.go
- shard_key.go
- shard_router.go
- shard_store.go
- sharded_coordinator.go
- snapshot.go
- transaction.go
- transcoder.go
- txn_codec.go
- txn_consts.go
- txn_errors.go
- txn_keys.go