kv

package
v0.0.0-...-a779712 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2026 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const TxnMetaPrefix = txnMetaPrefix

TxnMetaPrefix is the key prefix used for transaction metadata mutations.

Variables

View Source
var (
	ErrTxnMetaMissing        = errors.New("txn meta missing")
	ErrTxnInvalidMeta        = errors.New("txn meta invalid")
	ErrTxnLocked             = errors.New("txn locked")
	ErrTxnCommitTSRequired   = errors.New("txn commit ts required")
	ErrTxnAlreadyCommitted   = errors.New("txn already committed")
	ErrTxnPrimaryKeyRequired = errors.New("txn primary key required")
)
View Source
var ErrCrossShardMutationBatchNotSupported = errors.New("cross-shard mutation batches are not supported")
View Source
var ErrCrossShardTransactionNotSupported = errors.New("cross-shard transactions are not supported")
View Source
var ErrInvalidRequest = errors.New("invalid request")
View Source
var ErrLeaderNotFound = errors.New("leader not found")
View Source
var ErrNotImplemented = errors.New("not implemented")
View Source
var ErrUnknownRequestType = errors.New("unknown request type")

Functions

func EncodeTxnMeta

func EncodeTxnMeta(m TxnMeta) []byte

func ExtractTxnUserKey

func ExtractTxnUserKey(key []byte) []byte

ExtractTxnUserKey returns the logical user key embedded in a transaction-internal key such as !txn|lock|, !txn|cmt|, or !txn|meta|. It returns nil when the key does not use a transaction-internal namespace or is malformed.

func MaxLatestCommitTS

func MaxLatestCommitTS(ctx context.Context, st store.MVCCStore, keys [][]byte) (uint64, error)

MaxLatestCommitTS returns the maximum commit timestamp for the provided keys.

Missing keys are ignored. If any LatestCommitTS lookup returns an error, the error is returned to the caller.

func NewTxnLockedError

func NewTxnLockedError(key []byte) error

func NewTxnLockedErrorWithDetail

func NewTxnLockedErrorWithDetail(key []byte, detail string) error

func TxnLockedDetails

func TxnLockedDetails(err error) ([]byte, string, bool)

Types

type ActiveTimestampToken

type ActiveTimestampToken struct {
	// contains filtered or unexported fields
}

ActiveTimestampToken releases one tracked timestamp when the owning operation completes.

func (*ActiveTimestampToken) Release

func (t *ActiveTimestampToken) Release()

type ActiveTimestampTracker

type ActiveTimestampTracker struct {
	// contains filtered or unexported fields
}

ActiveTimestampTracker tracks in-flight read or transaction timestamps that must remain readable while background compaction is running.

func NewActiveTimestampTracker

func NewActiveTimestampTracker() *ActiveTimestampTracker

func (*ActiveTimestampTracker) Oldest

func (t *ActiveTimestampTracker) Oldest() uint64

func (*ActiveTimestampTracker) Pin

type Coordinate

type Coordinate struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(txm Transactional, r *raft.Raft) *Coordinate

func (*Coordinate) Clock

func (c *Coordinate) Clock() *HLC

func (*Coordinate) Dispatch

func (c *Coordinate) Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)

func (*Coordinate) IsLeader

func (c *Coordinate) IsLeader() bool

func (*Coordinate) IsLeaderForKey

func (c *Coordinate) IsLeaderForKey(_ []byte) bool

func (*Coordinate) RaftLeader

func (c *Coordinate) RaftLeader() raft.ServerAddress

RaftLeader returns the current leader's address as known by this node.

func (*Coordinate) RaftLeaderForKey

func (c *Coordinate) RaftLeaderForKey(_ []byte) raft.ServerAddress

func (*Coordinate) VerifyLeader

func (c *Coordinate) VerifyLeader() error

func (*Coordinate) VerifyLeaderForKey

func (c *Coordinate) VerifyLeaderForKey(_ []byte) error

type CoordinateResponse

type CoordinateResponse struct {
	CommitIndex uint64
}

type Coordinator

type Coordinator interface {
	Dispatch(ctx context.Context, reqs *OperationGroup[OP]) (*CoordinateResponse, error)
	IsLeader() bool
	VerifyLeader() error
	RaftLeader() raft.ServerAddress
	IsLeaderForKey(key []byte) bool
	VerifyLeaderForKey(key []byte) error
	RaftLeaderForKey(key []byte) raft.ServerAddress
	Clock() *HLC
}

type Elem

type Elem[T OP] struct {
	Op    T
	Key   []byte
	Value []byte
}

Elem is a single operation (Put or Del) on a key, held in an OperationGroup.

type FSM

type FSM interface {
	raft.FSM
}

func NewKvFSM

func NewKvFSM(store store.MVCCStore) FSM

type FSMCompactRuntime

type FSMCompactRuntime struct {
	GroupID uint64
	Raft    RaftStatsProvider
	Store   store.MVCCStore
}

type FSMCompactor

type FSMCompactor struct {
	// contains filtered or unexported fields
}

func NewFSMCompactor

func NewFSMCompactor(runtimes []FSMCompactRuntime, opts ...FSMCompactorOption) *FSMCompactor

func (*FSMCompactor) Run

func (c *FSMCompactor) Run(ctx context.Context) error

func (*FSMCompactor) SyncOnce

func (c *FSMCompactor) SyncOnce(ctx context.Context) error

type FSMCompactorOption

type FSMCompactorOption func(*FSMCompactor)

func WithFSMCompactorActiveTimestampTracker

func WithFSMCompactorActiveTimestampTracker(tracker *ActiveTimestampTracker) FSMCompactorOption

func WithFSMCompactorInterval

func WithFSMCompactorInterval(interval time.Duration) FSMCompactorOption

func WithFSMCompactorLogger

func WithFSMCompactorLogger(logger *slog.Logger) FSMCompactorOption

func WithFSMCompactorRetentionWindow

func WithFSMCompactorRetentionWindow(window time.Duration) FSMCompactorOption

func WithFSMCompactorTimeout

func WithFSMCompactorTimeout(timeout time.Duration) FSMCompactorOption

type GRPCConnCache

type GRPCConnCache struct {
	// contains filtered or unexported fields
}

GRPCConnCache reuses gRPC connections per address. gRPC itself handles reconnection on transient failures; we only force a re-dial if the conn has already been closed (Shutdown).

func (*GRPCConnCache) Close

func (c *GRPCConnCache) Close() error

func (*GRPCConnCache) ConnFor

func (c *GRPCConnCache) ConnFor(addr raft.ServerAddress) (*grpc.ClientConn, error)

type HLC

type HLC struct {
	// contains filtered or unexported fields
}

HLC implements a simple hybrid logical clock suitable for issuing monotonically increasing timestamps across shards/raft groups.

Layout (64 bits, most significant first):

high 48 bits: wall-clock milliseconds since the Unix epoch
low 16 bits: logical counter that breaks ties when wall time does not advance

This keeps ordering stable across leaders as long as clocks are loosely synchronized; it avoids dependence on per-raft commit indices that diverge between shards.

func NewHLC

func NewHLC() *HLC

func (*HLC) Current

func (h *HLC) Current() uint64

Current returns the last issued or observed HLC value without advancing it. If no timestamp has been generated yet, it returns 0.

func (*HLC) Next

func (h *HLC) Next() uint64

Next returns the next hybrid logical timestamp.

func (*HLC) Observe

func (h *HLC) Observe(ts uint64)

Observe bumps the local clock if a higher timestamp is seen.

type LeaderProxy

type LeaderProxy struct {
	// contains filtered or unexported fields
}

LeaderProxy forwards transactional requests to the current raft leader when the local node is not the leader.

func NewLeaderProxy

func NewLeaderProxy(r *raft.Raft) *LeaderProxy

NewLeaderProxy creates a leader-aware transactional proxy for a raft group.

func (*LeaderProxy) Abort

func (p *LeaderProxy) Abort(reqs []*pb.Request) (*TransactionResponse, error)

func (*LeaderProxy) Close

func (p *LeaderProxy) Close() error

func (*LeaderProxy) Commit

func (p *LeaderProxy) Commit(reqs []*pb.Request) (*TransactionResponse, error)

type LeaderRoutedStore

type LeaderRoutedStore struct {
	// contains filtered or unexported fields
}

LeaderRoutedStore is an MVCCStore wrapper that serves reads from the local store only when leadership is verified; otherwise it proxies reads to the current leader via gRPC.

This is intended for single-raft-group deployments where the underlying store itself is not leader-aware (e.g. a plain local store.MVCCStore implementation).

Writes and maintenance operations are delegated to the local store.

func NewLeaderRoutedStore

func NewLeaderRoutedStore(local store.MVCCStore, coordinator Coordinator) *LeaderRoutedStore

func (*LeaderRoutedStore) ApplyMutations

func (s *LeaderRoutedStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error

func (*LeaderRoutedStore) Close

func (s *LeaderRoutedStore) Close() error

func (*LeaderRoutedStore) Compact

func (s *LeaderRoutedStore) Compact(ctx context.Context, minTS uint64) error

func (*LeaderRoutedStore) DeleteAt

func (s *LeaderRoutedStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error

func (*LeaderRoutedStore) ExistsAt

func (s *LeaderRoutedStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)

func (*LeaderRoutedStore) ExpireAt

func (s *LeaderRoutedStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error

func (*LeaderRoutedStore) GetAt

func (s *LeaderRoutedStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)

func (*LeaderRoutedStore) LastCommitTS

func (s *LeaderRoutedStore) LastCommitTS() uint64

func (*LeaderRoutedStore) LatestCommitTS

func (s *LeaderRoutedStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)

func (*LeaderRoutedStore) PutAt

func (s *LeaderRoutedStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*LeaderRoutedStore) PutWithTTLAt

func (s *LeaderRoutedStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*LeaderRoutedStore) Restore

func (s *LeaderRoutedStore) Restore(buf io.Reader) error

func (*LeaderRoutedStore) ReverseScanAt

func (s *LeaderRoutedStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*LeaderRoutedStore) ScanAt

func (s *LeaderRoutedStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*LeaderRoutedStore) Snapshot

func (s *LeaderRoutedStore) Snapshot() (store.Snapshot, error)

type OP

type OP int

OP is an operation type.

const (
	Put OP = iota
	Del
)

Operation types.

type OperationGroup

type OperationGroup[T OP] struct {
	Elems []*Elem[T]
	IsTxn bool
	// StartTS is a logical timestamp captured at transaction begin.
	// It is ignored for non-transactional groups.
	StartTS uint64
	// CommitTS optionally pins the transaction commit timestamp.
	// Coordinators choose one automatically when this is zero.
	CommitTS uint64
}

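OperationGroup is a group of operations dispatched together. When IsTxn is true, the operations are executed atomically as a single transaction. Non-transactional groups carry no such guarantee and may partially succeed when routed across shards.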

type RaftStatsProvider

type RaftStatsProvider interface {
	Stats() map[string]string
}

type ShardGroup

type ShardGroup struct {
	Raft  *raft.Raft
	Store store.MVCCStore
	Txn   Transactional
}

type ShardRouter

type ShardRouter struct {
	// contains filtered or unexported fields
}

ShardRouter routes requests to multiple raft groups based on key ranges.

Cross-shard transactions are not supported. They require distributed coordination (for example, 2PC) to ensure atomicity.

Non-transactional request batches may still partially succeed across shards.

func NewShardRouter

func NewShardRouter(e *distribution.Engine) *ShardRouter

NewShardRouter creates a new router.

func (*ShardRouter) Abort

func (s *ShardRouter) Abort(reqs []*pb.Request) (*TransactionResponse, error)

Abort dispatches aborts to the correct raft group.

func (*ShardRouter) Commit

func (s *ShardRouter) Commit(reqs []*pb.Request) (*TransactionResponse, error)

func (*ShardRouter) Get

func (s *ShardRouter) Get(ctx context.Context, key []byte) ([]byte, error)

Get retrieves a key routed to the correct shard.

func (*ShardRouter) Register

func (s *ShardRouter) Register(group uint64, tm Transactional, st store.MVCCStore)

Register associates a raft group ID with its transactional manager and store.

type ShardStore

type ShardStore struct {
	// contains filtered or unexported fields
}

ShardStore routes MVCC reads to shard-specific stores and proxies to leaders when needed.

func NewShardStore

func NewShardStore(engine *distribution.Engine, groups map[uint64]*ShardGroup) *ShardStore

NewShardStore creates a sharded MVCC store wrapper.

func (*ShardStore) ApplyMutations

func (s *ShardStore) ApplyMutations(ctx context.Context, mutations []*store.KVPairMutation, startTS, commitTS uint64) error

ApplyMutations applies a batch of mutations to the correct shard store.

All mutations must belong to the same shard. Cross-shard mutation batches are not supported.

func (*ShardStore) Close

func (s *ShardStore) Close() error

func (*ShardStore) Compact

func (s *ShardStore) Compact(ctx context.Context, minTS uint64) error

func (*ShardStore) DeleteAt

func (s *ShardStore) DeleteAt(ctx context.Context, key []byte, commitTS uint64) error

func (*ShardStore) ExistsAt

func (s *ShardStore) ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)

func (*ShardStore) ExpireAt

func (s *ShardStore) ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error

func (*ShardStore) GetAt

func (s *ShardStore) GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)

func (*ShardStore) LastCommitTS

func (s *ShardStore) LastCommitTS() uint64

func (*ShardStore) LatestCommitTS

func (s *ShardStore) LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)

func (*ShardStore) PutAt

func (s *ShardStore) PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*ShardStore) PutWithTTLAt

func (s *ShardStore) PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error

func (*ShardStore) Restore

func (s *ShardStore) Restore(_ io.Reader) error

func (*ShardStore) ReverseScanAt

func (s *ShardStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*ShardStore) ScanAt

func (s *ShardStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*store.KVPair, error)

func (*ShardStore) Snapshot

func (s *ShardStore) Snapshot() (store.Snapshot, error)

type ShardedCoordinator

type ShardedCoordinator struct {
	// contains filtered or unexported fields
}

ShardedCoordinator routes operations to shard-specific raft groups. It issues timestamps via a shared HLC and uses ShardRouter to dispatch.

func NewShardedCoordinator

func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator

NewShardedCoordinator builds a coordinator for the provided shard groups. The defaultGroup is used for non-keyed leader checks.

func (*ShardedCoordinator) Clock

func (c *ShardedCoordinator) Clock() *HLC

func (*ShardedCoordinator) Dispatch

func (*ShardedCoordinator) IsLeader

func (c *ShardedCoordinator) IsLeader() bool

func (*ShardedCoordinator) IsLeaderForKey

func (c *ShardedCoordinator) IsLeaderForKey(key []byte) bool

func (*ShardedCoordinator) RaftLeader

func (c *ShardedCoordinator) RaftLeader() raft.ServerAddress

func (*ShardedCoordinator) RaftLeaderForKey

func (c *ShardedCoordinator) RaftLeaderForKey(key []byte) raft.ServerAddress

func (*ShardedCoordinator) VerifyLeader

func (c *ShardedCoordinator) VerifyLeader() error

func (*ShardedCoordinator) VerifyLeaderForKey

func (c *ShardedCoordinator) VerifyLeaderForKey(key []byte) error

type TransactionManager

type TransactionManager struct {
	// contains filtered or unexported fields
}

func NewTransaction

func NewTransaction(raft *raft.Raft) *TransactionManager

func (*TransactionManager) Abort

func (t *TransactionManager) Abort(reqs []*pb.Request) (*TransactionResponse, error)

func (*TransactionManager) Close

func (t *TransactionManager) Close()

Close signals the TransactionManager to stop and drains any pending raw commit items, sending each an error so callers are not blocked forever.

func (*TransactionManager) Commit

func (t *TransactionManager) Commit(reqs []*pb.Request) (*TransactionResponse, error)

type TransactionResponse

type TransactionResponse struct {
	CommitIndex uint64
}

type Transactional

type Transactional interface {
	Commit(reqs []*pb.Request) (*TransactionResponse, error)
	Abort(reqs []*pb.Request) (*TransactionResponse, error)
}

type TxnLockedError

type TxnLockedError struct {
	// contains filtered or unexported fields
}

func (*TxnLockedError) Error

func (e *TxnLockedError) Error() string

func (*TxnLockedError) Unwrap

func (e *TxnLockedError) Unwrap() error

type TxnMeta

type TxnMeta struct {
	PrimaryKey []byte
	LockTTLms  uint64
	CommitTS   uint64
}

TxnMeta is embedded into transactional raft log requests via a synthetic mutation (key prefix "!txn|meta|"). It is not persisted in the MVCC store.

func DecodeTxnMeta

func DecodeTxnMeta(b []byte) (TxnMeta, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL