store

package
v0.0.0-...-07ff07d Latest
Warning

This package is not in the latest version of its module.

Published: May 8, 2026 | License: AGPL-3.0 | Imports: 20 | Imported by: 0

Documentation

Constants

const (
	HashMetaPrefix      = "!hs|meta|"
	HashFieldPrefix     = "!hs|fld|"
	HashMetaDeltaPrefix = "!hs|meta|d|"
)

Hash wide-column key layout:

Base Metadata: !hs|meta|<userKeyLen(4)><userKey>               → [Len(8)]
Field Key:     !hs|fld|<userKeyLen(4)><userKey><fieldName>     → field value bytes
Delta Key:     !hs|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)> → [LenDelta(8)]

const (
	// ListMetaDeltaPrefix is the prefix for all list metadata delta keys.
	// Layout: !lst|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)>
	ListMetaDeltaPrefix = "!lst|meta|d|"

	// ListClaimPrefix is the prefix for list claim keys used by POP operations.
	// Layout: !lst|claim|<userKeyLen(4)><userKey><seq(8-byte sortable)>
	ListClaimPrefix = "!lst|claim|"

	// MaxDeltaScanLimit is the hard limit on delta scan results per read.
	// If a key accumulates more than this many uncompacted deltas, reads
	// return ErrDeltaScanTruncated until background compaction catches up.
	// Set high enough to tolerate burst traffic between compaction ticks
	// (default 30 s) without sacrificing read availability on hot keys.
	MaxDeltaScanLimit = 1024
)

Delta/Claim key constants.
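
As a rough illustration of how a reader might fold uncompacted deltas into the base metadata while honoring MaxDeltaScanLimit, consider the sketch below. The function effectiveLen, the local error value, and the in-memory delta slice are assumptions made for illustration; the package's actual read path is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// maxDeltaScanLimit mirrors MaxDeltaScanLimit from the constants above.
const maxDeltaScanLimit = 1024

// errDeltaScanTruncated is a local stand-in for the ErrDeltaScanTruncated
// error named in the MaxDeltaScanLimit comment.
var errDeltaScanTruncated = errors.New("delta scan truncated")

// effectiveLen (hypothetical) adds every uncompacted LenDelta to the base
// length. It refuses to answer when more deltas exist than the scan limit
// allows, because summing a truncated scan would give a wrong total.
func effectiveLen(baseLen int64, deltas []int64) (int64, error) {
	if len(deltas) > maxDeltaScanLimit {
		return 0, errDeltaScanTruncated
	}
	total := baseLen
	for _, d := range deltas {
		total += d
	}
	return total, nil
}

func main() {
	n, err := effectiveLen(10, []int64{+2, -1, +3})
	fmt.Println(n, err) // 14 <nil>
}
```

Failing the read rather than returning a partial sum keeps the length exact; the cost is temporary unavailability on a hot key until compaction merges the deltas into the base metadata.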

const (
	ListMetaPrefix = "!lst|meta|"
	ListItemPrefix = "!lst|itm|"
)

const (
	SetMetaPrefix      = "!st|meta|"
	SetMemberPrefix    = "!st|mem|"
	SetMetaDeltaPrefix = "!st|meta|d|"
)

Set wide-column key layout:

Base Metadata: !st|meta|<userKeyLen(4)><userKey>                 → [Len(8)]
Member Key:    !st|mem|<userKeyLen(4)><userKey><member>           → (empty value)
Delta Key:     !st|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)> → [LenDelta(8)]

const (
	StreamMetaPrefix  = "!stream|meta|"
	StreamEntryPrefix = "!stream|entry|"

	// StreamIDBytes is the fixed size of the binary StreamID suffix on an entry key:
	// 8 bytes big-endian ms || 8 bytes big-endian seq. Big-endian so lex order
	// over the raw key bytes matches the (ms, seq) numeric order used by XADD / XRANGE.
	StreamIDBytes = 16
)

Stream wide-column key layout:

Meta:  !stream|meta|<userKeyLen(4)><userKey>        → [Len(8)][LastMs(8)][LastSeq(8)]
Entry: !stream|entry|<userKeyLen(4)><userKey><StreamID(16)>

const (
	WriteConflictKindWrite = "write"
	WriteConflictKindRead  = "read"
)

Write-conflict classification labels. These are emitted as the "key_prefix" Prometheus label on elastickv_store_write_conflict_total so operators can tell WHICH namespace is producing OCC conflicts. The label set is deliberately bounded and stable — adding a new bucket is a conscious schema change, not a cardinality surprise.

Buckets are aligned with the internal key prefixes declared across the codebase:

!txn|lock|, !txn|int|, !txn|cmt|, !txn|rb|  (kv/txn_keys.go)
!redis|str|, !redis|ttl|, !redis|hll|       (adapter/redis_compat_types.go)
!hs|, !st|, !zs|, !lst|                     (store/*_helpers.go)
!ddb|                                       (kv/shard_key.go)

const (
	ZSetMetaPrefix      = "!zs|meta|"
	ZSetMemberPrefix    = "!zs|mem|"
	ZSetScorePrefix     = "!zs|scr|"
	ZSetMetaDeltaPrefix = "!zs|meta|d|"
)

ZSet wide-column key layout:

Base Metadata: !zs|meta|<userKeyLen(4)><userKey>                               → [Len(8)]
Member Key:    !zs|mem|<userKeyLen(4)><userKey><member>                         → [Score(8)] IEEE 754
Score Index:   !zs|scr|<userKeyLen(4)><userKey><sortableScore(8)><member>       → (empty)
Delta Key:     !zs|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)>   → [LenDelta(8)]

Variables

var ErrExpired = errors.New("expired")
var ErrInvalidChecksum = errors.New("invalid checksum")
var ErrKeyNotFound = errors.New("not found")
var ErrNotSupported = errors.New("not supported")
var ErrReadTSCompacted = errors.New("read timestamp has been compacted")
var ErrSnapshotKeyTooLarge = errors.New("mvcc snapshot key too large")
var ErrSnapshotVersionCountTooLarge = errors.New("mvcc snapshot version count too large")
var ErrUnknownOp = errors.New("unknown op")
var ErrValueTooLarge = errors.New("value too large")
var ErrWriteConflict = errors.New("write conflict")
var Tombstone = []byte{0x00}

Functions

func DecodeSortableFloat64

func DecodeSortableFloat64(b [8]byte) float64

DecodeSortableFloat64 decodes a sortable 8-byte representation back to float64.

func DecodeStreamID

func DecodeStreamID(b []byte) (ms, seq uint64, ok bool)

DecodeStreamID parses a 16-byte big-endian ms||seq tuple. Returns false if the slice length is wrong.

func DecodeWriteConflictLabel

func DecodeWriteConflictLabel(label string) (kind, keyClass string, ok bool)

DecodeWriteConflictLabel splits a flat label back into (kind, keyClass). Returns ok=false for malformed input so collector code can skip rather than emit bogus series.

func EncodeSortableFloat64

func EncodeSortableFloat64(f float64) [8]byte

EncodeSortableFloat64 encodes a float64 into a sortable 8-byte representation. For non-negative floats, it flips the sign bit so they sort above negative values. For negative floats, it flips all bits so that larger magnitudes sort first. The resulting byte sequence sorts in numeric order under standard byte comparison.
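
The bit manipulation described above can be sketched as follows. This is a standalone illustration, not the package's source: the lowercase function names are hypothetical, and big-endian output is an assumption (it is what makes byte-wise comparison line up with numeric order).

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

const signBit = uint64(1) << 63

// encodeSortable maps a float64 to 8 bytes whose lexicographic order
// matches numeric order.
func encodeSortable(f float64) [8]byte {
	bits := math.Float64bits(f)
	if bits&signBit != 0 {
		bits = ^bits // negative: flip every bit so larger magnitudes sort first
	} else {
		bits ^= signBit // non-negative: set the top bit to sort above negatives
	}
	var out [8]byte
	binary.BigEndian.PutUint64(out[:], bits)
	return out
}

// decodeSortable reverses encodeSortable.
func decodeSortable(b [8]byte) float64 {
	bits := binary.BigEndian.Uint64(b[:])
	if bits&signBit != 0 {
		bits ^= signBit // top bit set: the original was non-negative
	} else {
		bits = ^bits // top bit clear: the original was negative
	}
	return math.Float64frombits(bits)
}

// sortsBefore reports whether a orders strictly before b under byte comparison.
func sortsBefore(a, b float64) bool {
	x, y := encodeSortable(a), encodeSortable(b)
	return bytes.Compare(x[:], y[:]) < 0
}

func main() {
	fmt.Println(sortsBefore(-2.5, -1), sortsBefore(-1, 0), sortsBefore(0, 3.25)) // true true true
	fmt.Println(decodeSortable(encodeSortable(-2.5)))                            // -2.5
}
```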

func EncodeStreamID

func EncodeStreamID(ms, seq uint64) []byte

EncodeStreamID writes a StreamID as 16 big-endian bytes: ms || seq.
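
A minimal sketch of the 16-byte ms || seq encoding and its inverse, written against the standard library only. The lowercase names are hypothetical stand-ins for EncodeStreamID and DecodeStreamID.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// streamIDBytes mirrors StreamIDBytes from the constants section.
const streamIDBytes = 16

// encodeStreamID writes ms and seq as two big-endian uint64 values.
func encodeStreamID(ms, seq uint64) []byte {
	b := make([]byte, streamIDBytes)
	binary.BigEndian.PutUint64(b[:8], ms)
	binary.BigEndian.PutUint64(b[8:], seq)
	return b
}

// decodeStreamID reverses encodeStreamID and rejects slices of the wrong length.
func decodeStreamID(b []byte) (ms, seq uint64, ok bool) {
	if len(b) != streamIDBytes {
		return 0, 0, false
	}
	return binary.BigEndian.Uint64(b[:8]), binary.BigEndian.Uint64(b[8:]), true
}

func main() {
	id := encodeStreamID(1700000000000, 7)
	ms, seq, ok := decodeStreamID(id)
	fmt.Println(ms, seq, ok) // 1700000000000 7 true
}
```

Because both halves are big-endian, comparing two encoded IDs byte by byte gives the same result as comparing (ms, seq) numerically, which is what range scans over entry keys rely on.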

func EncodeWriteConflictLabel

func EncodeWriteConflictLabel(kind, keyClass string) string

EncodeWriteConflictLabel joins a (kind, keyClass) tuple into the flat map key used by WriteConflictCountsByPrefix snapshots. Exposed so the monitoring collector can build the same string without re-implementing the convention.
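
To make the flat-label convention concrete, here is a hedged sketch. It assumes the "<kind>|<key_prefix>" form described on WriteConflictCountsByPrefix and splits on the first "|" so that a key class containing "|" survives the round trip; the real functions may validate differently.

```go
package main

import (
	"fmt"
	"strings"
)

// encodeLabel (hypothetical) joins kind and keyClass with a single "|".
func encodeLabel(kind, keyClass string) string {
	return kind + "|" + keyClass
}

// decodeLabel (hypothetical) splits on the FIRST "|" only, so a key class
// such as "!hs|" is returned intact. Unknown kinds are treated as malformed.
func decodeLabel(label string) (kind, keyClass string, ok bool) {
	k, c, found := strings.Cut(label, "|")
	if !found || c == "" {
		return "", "", false
	}
	if k != "write" && k != "read" {
		return "", "", false
	}
	return k, c, true
}

func main() {
	label := encodeLabel("write", "!hs|")
	fmt.Println(label) // write|!hs|
	kind, class, ok := decodeLabel(label)
	fmt.Println(kind, class, ok) // write !hs| true
}
```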

func ExtractHashFieldName

func ExtractHashFieldName(key, userKey []byte) []byte

ExtractHashFieldName extracts the field name from a hash field key.

func ExtractHashUserKeyFromDelta

func ExtractHashUserKeyFromDelta(key []byte) []byte

ExtractHashUserKeyFromDelta extracts the logical user key from a hash delta key.

func ExtractHashUserKeyFromField

func ExtractHashUserKeyFromField(key []byte) []byte

ExtractHashUserKeyFromField extracts the logical user key from a hash field key.

func ExtractHashUserKeyFromMeta

func ExtractHashUserKeyFromMeta(key []byte) []byte

ExtractHashUserKeyFromMeta extracts the logical user key from a hash meta key.

func ExtractListItemSeq

func ExtractListItemSeq(itemKey, userKey []byte) (int64, bool)

ExtractListItemSeq extracts the sequence number encoded in a list item key. Returns (seq, true) on success; (0, false) if key is not a valid item key for userKey.

func ExtractListUserKey

func ExtractListUserKey(key []byte) []byte

ExtractListUserKey returns the logical user key from a list meta or item key. If the key is not a list key, it returns nil.

func ExtractListUserKeyFromClaim

func ExtractListUserKeyFromClaim(key []byte) []byte

ExtractListUserKeyFromClaim extracts the logical user key from a list claim key.

func ExtractListUserKeyFromDelta

func ExtractListUserKeyFromDelta(key []byte) []byte

ExtractListUserKeyFromDelta extracts the logical user key from a list delta key.

func ExtractSetMemberName

func ExtractSetMemberName(key, userKey []byte) []byte

ExtractSetMemberName extracts the member name from a set member key.

func ExtractSetUserKeyFromDelta

func ExtractSetUserKeyFromDelta(key []byte) []byte

ExtractSetUserKeyFromDelta extracts the logical user key from a set delta key.

func ExtractSetUserKeyFromMember

func ExtractSetUserKeyFromMember(key []byte) []byte

ExtractSetUserKeyFromMember extracts the logical user key from a set member key.

func ExtractSetUserKeyFromMeta

func ExtractSetUserKeyFromMeta(key []byte) []byte

ExtractSetUserKeyFromMeta extracts the logical user key from a set meta key.

func ExtractStreamEntryID

func ExtractStreamEntryID(entryKey, userKey []byte) (ms, seq uint64, ok bool)

ExtractStreamEntryID extracts (ms, seq) from a stream entry key. Returns false if the key is not a valid entry key for the given userKey.

func ExtractStreamUserKeyFromEntry

func ExtractStreamUserKeyFromEntry(key []byte) []byte

ExtractStreamUserKeyFromEntry extracts the logical user key from a stream entry key.

See ExtractStreamUserKeyFromMeta for the rationale of the uint64 bounds check; the entry variant additionally has to account for the trailing StreamIDBytes (16 bytes) suffix.

func ExtractStreamUserKeyFromMeta

func ExtractStreamUserKeyFromMeta(key []byte) []byte

ExtractStreamUserKeyFromMeta extracts the logical user key from a stream meta key.

The bounds check is done in uint64 so that a corrupted length prefix near math.MaxUint32 cannot wrap the sum uint32(wideColKeyLenSize)+ukLen and slip past the check, which would then cause a panic in the trimmed[lo:hi] slice expression.

func ExtractZSetMemberName

func ExtractZSetMemberName(key, userKey []byte) []byte

ExtractZSetMemberName extracts the member name from a zset member key.

func ExtractZSetScoreAndMember

func ExtractZSetScoreAndMember(key, userKey []byte) (score float64, member []byte, ok bool)

ExtractZSetScoreAndMember extracts the score and member name from a zset score index key.

func ExtractZSetUserKeyFromDelta

func ExtractZSetUserKeyFromDelta(key []byte) []byte

ExtractZSetUserKeyFromDelta extracts the logical user key from a zset delta key.

func ExtractZSetUserKeyFromMember

func ExtractZSetUserKeyFromMember(key []byte) []byte

ExtractZSetUserKeyFromMember extracts the logical user key from a zset member key.

func ExtractZSetUserKeyFromMeta

func ExtractZSetUserKeyFromMeta(key []byte) []byte

ExtractZSetUserKeyFromMeta extracts the logical user key from a zset meta key.

func ExtractZSetUserKeyFromScore

func ExtractZSetUserKeyFromScore(key []byte) []byte

ExtractZSetUserKeyFromScore extracts the logical user key from a zset score index key.

func HashFieldKey

func HashFieldKey(userKey, fieldName []byte) []byte

HashFieldKey builds the per-field key for a hash field.
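
The field key layout from the constants section (prefix, 4-byte user key length, user key, field name) can be sketched as below. The lowercase helpers are hypothetical, and big-endian encoding of the length prefix is an assumption; the documentation only states that the length occupies 4 bytes.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const hashFieldPrefix = "!hs|fld|"

// hashFieldKey builds prefix + userKeyLen(4) + userKey + fieldName.
func hashFieldKey(userKey, field []byte) []byte {
	var lenBuf [4]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(userKey)))

	key := make([]byte, 0, len(hashFieldPrefix)+len(lenBuf)+len(userKey)+len(field))
	key = append(key, hashFieldPrefix...)
	key = append(key, lenBuf[:]...)
	key = append(key, userKey...)
	return append(key, field...)
}

// extractFieldName returns the field name, or nil when key does not belong
// to userKey.
func extractFieldName(key, userKey []byte) []byte {
	scanPrefix := hashFieldKey(userKey, nil)
	if !bytes.HasPrefix(key, scanPrefix) {
		return nil
	}
	return key[len(scanPrefix):]
}

func main() {
	key := hashFieldKey([]byte("user:1"), []byte("name"))
	fmt.Printf("%s\n", extractFieldName(key, []byte("user:1"))) // name

	// The length prefix keeps ("ab","c") and ("a","bc") from colliding.
	fmt.Println(bytes.Equal(
		hashFieldKey([]byte("ab"), []byte("c")),
		hashFieldKey([]byte("a"), []byte("bc")),
	)) // false
}
```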

func HashFieldScanPrefix

func HashFieldScanPrefix(userKey []byte) []byte

HashFieldScanPrefix returns the prefix to scan all fields of a hash.

func HashMetaDeltaKey

func HashMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

HashMetaDeltaKey builds the delta key for a hash metadata change.

func HashMetaDeltaScanPrefix

func HashMetaDeltaScanPrefix(userKey []byte) []byte

HashMetaDeltaScanPrefix returns the prefix to scan all delta keys for a hash.

func HashMetaKey

func HashMetaKey(userKey []byte) []byte

HashMetaKey builds the metadata key for a hash.

func IsHashFieldKey

func IsHashFieldKey(key []byte) bool

IsHashFieldKey reports whether the key is a hash field key.

func IsHashMetaDeltaKey

func IsHashMetaDeltaKey(key []byte) bool

IsHashMetaDeltaKey reports whether the key is a hash metadata delta key.

func IsHashMetaKey

func IsHashMetaKey(key []byte) bool

IsHashMetaKey reports whether the key is a hash metadata key.

func IsListClaimKey

func IsListClaimKey(key []byte) bool

IsListClaimKey reports whether the key is a list claim key.

func IsListItemKey

func IsListItemKey(key []byte) bool

IsListItemKey reports whether the key is a list item key.

func IsListMetaDeltaKey

func IsListMetaDeltaKey(key []byte) bool

IsListMetaDeltaKey reports whether the key is a list metadata delta key.

func IsListMetaKey

func IsListMetaKey(key []byte) bool

IsListMetaKey reports whether the key is a list metadata key. It is exported as a helper for other packages (e.g., the Redis adapter).

func IsSetMemberKey

func IsSetMemberKey(key []byte) bool

IsSetMemberKey reports whether the key is a set member key.

func IsSetMetaDeltaKey

func IsSetMetaDeltaKey(key []byte) bool

IsSetMetaDeltaKey reports whether the key is a set metadata delta key.

func IsSetMetaKey

func IsSetMetaKey(key []byte) bool

IsSetMetaKey reports whether the key is a set metadata key.

func IsStreamEntryKey

func IsStreamEntryKey(key []byte) bool

IsStreamEntryKey reports whether the key is a stream entry key.

func IsStreamMetaKey

func IsStreamMetaKey(key []byte) bool

IsStreamMetaKey reports whether the key is a stream metadata key.

func IsZSetMemberKey

func IsZSetMemberKey(key []byte) bool

IsZSetMemberKey reports whether the key is a sorted set member key.

func IsZSetMetaDeltaKey

func IsZSetMetaDeltaKey(key []byte) bool

IsZSetMetaDeltaKey reports whether the key is a sorted set metadata delta key.

func IsZSetMetaKey

func IsZSetMetaKey(key []byte) bool

IsZSetMetaKey reports whether the key is a sorted set metadata key.

func IsZSetScoreKey

func IsZSetScoreKey(key []byte) bool

IsZSetScoreKey reports whether the key is a sorted set score index key.

func ListClaimKey

func ListClaimKey(userKey []byte, seq int64) []byte

ListClaimKey builds the claim key for a list item at the given sequence number.

func ListClaimScanPrefix

func ListClaimScanPrefix(userKey []byte) []byte

ListClaimScanPrefix returns the prefix used to scan all claim keys for a userKey.

func ListItemKey

func ListItemKey(userKey []byte, seq int64) []byte

ListItemKey builds the item key for a user key and sequence number.

func ListMetaDeltaKey

func ListMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

ListMetaDeltaKey builds the delta key for a list: prefix + 4-byte userKeyLen + userKey + 8-byte commitTS + 4-byte seqInTxn.
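
A sketch of that layout, useful mainly to show why a prefix scan returns deltas in commit order. The function name is a hypothetical stand-in, and big-endian encoding of all three integers is an assumption not stated in the documentation.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const listMetaDeltaPrefix = "!lst|meta|d|"

// listMetaDeltaKey builds prefix + userKeyLen(4) + userKey + commitTS(8) + seqInTxn(4).
func listMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte {
	var lenBuf, seqBuf [4]byte
	var tsBuf [8]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(userKey)))
	binary.BigEndian.PutUint64(tsBuf[:], commitTS)
	binary.BigEndian.PutUint32(seqBuf[:], seqInTxn)

	key := make([]byte, 0, len(listMetaDeltaPrefix)+4+len(userKey)+8+4)
	key = append(key, listMetaDeltaPrefix...)
	key = append(key, lenBuf[:]...)
	key = append(key, userKey...)
	key = append(key, tsBuf[:]...)
	return append(key, seqBuf[:]...)
}

func main() {
	uk := []byte("queue")
	first := listMetaDeltaKey(uk, 5, 1)
	second := listMetaDeltaKey(uk, 5, 2)
	third := listMetaDeltaKey(uk, 6, 0)
	// Keys order by (commitTS, seqInTxn) under plain byte comparison.
	fmt.Println(bytes.Compare(first, second) < 0, bytes.Compare(second, third) < 0) // true true
}
```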

func ListMetaDeltaScanPrefix

func ListMetaDeltaScanPrefix(userKey []byte) []byte

ListMetaDeltaScanPrefix returns the prefix used to scan all delta keys for a userKey.

func ListMetaKey

func ListMetaKey(userKey []byte) []byte

ListMetaKey builds the metadata key for a user key.

func MarshalHashMeta

func MarshalHashMeta(m HashMeta) []byte

MarshalHashMeta encodes HashMeta into a fixed 8-byte binary format.

func MarshalHashMetaDelta

func MarshalHashMetaDelta(d HashMetaDelta) []byte

MarshalHashMetaDelta encodes HashMetaDelta into a fixed 8-byte binary format.

func MarshalListMeta

func MarshalListMeta(meta ListMeta) ([]byte, error)

MarshalListMeta encodes ListMeta into a fixed 24-byte binary format.

func MarshalListMetaDelta

func MarshalListMetaDelta(d ListMetaDelta) []byte

MarshalListMetaDelta encodes a ListMetaDelta into a fixed 16-byte binary format.

func MarshalSetMeta

func MarshalSetMeta(m SetMeta) []byte

MarshalSetMeta encodes SetMeta into a fixed 8-byte binary format.

func MarshalSetMetaDelta

func MarshalSetMetaDelta(d SetMetaDelta) []byte

MarshalSetMetaDelta encodes SetMetaDelta into a fixed 8-byte binary format.

func MarshalStreamMeta

func MarshalStreamMeta(m StreamMeta) ([]byte, error)

MarshalStreamMeta encodes StreamMeta into a fixed 24-byte binary format.

func MarshalZSetMeta

func MarshalZSetMeta(m ZSetMeta) []byte

MarshalZSetMeta encodes ZSetMeta into a fixed 8-byte binary format.

func MarshalZSetMetaDelta

func MarshalZSetMetaDelta(d ZSetMetaDelta) []byte

MarshalZSetMetaDelta encodes ZSetMetaDelta into a fixed 8-byte binary format.

func MarshalZSetScore

func MarshalZSetScore(score float64) []byte

MarshalZSetScore encodes a float64 score in IEEE 754 big-endian format.

func NewWriteConflictError

func NewWriteConflictError(key []byte) error

func PrefixScanEnd

func PrefixScanEnd(prefix []byte) []byte

PrefixScanEnd returns the exclusive end key for a prefix scan. It increments the last byte of the prefix; if overflow occurs (all 0xFF), it returns a nil slice which callers must interpret as "scan to end of keyspace".
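
One common way to compute such an exclusive upper bound is sketched below. The handling of trailing 0xFF bytes (drop them, then increment the last byte that can still grow) is an assumption about how overflow is treated; only the all-0xFF case returning nil is stated in the documentation.

```go
package main

import "fmt"

// prefixScanEnd (hypothetical) returns the smallest key that is greater than
// every key starting with prefix, or nil when no such key exists.
func prefixScanEnd(prefix []byte) []byte {
	end := make([]byte, len(prefix))
	copy(end, prefix) // never mutate the caller's slice
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xFF {
			end[i]++
			return end[:i+1] // bytes after i were all 0xFF and are dropped
		}
	}
	return nil // empty or all-0xFF prefix: scan to the end of the keyspace
}

func main() {
	fmt.Printf("%q\n", prefixScanEnd([]byte("!hs|fld|")))    // "!hs|fld}"
	fmt.Println(prefixScanEnd([]byte{0xFF, 0xFF}) == nil) // true
}
```

A scan over [prefix, prefixScanEnd(prefix)) then covers exactly the keys that share the prefix.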

func SetMemberKey

func SetMemberKey(userKey, member []byte) []byte

SetMemberKey builds the per-member key for a set member.

func SetMemberScanPrefix

func SetMemberScanPrefix(userKey []byte) []byte

SetMemberScanPrefix returns the prefix to scan all members of a set.

func SetMetaDeltaKey

func SetMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

SetMetaDeltaKey builds the delta key for a set metadata change.

func SetMetaDeltaScanPrefix

func SetMetaDeltaScanPrefix(userKey []byte) []byte

SetMetaDeltaScanPrefix returns the prefix to scan all delta keys for a set.

func SetMetaKey

func SetMetaKey(userKey []byte) []byte

SetMetaKey builds the metadata key for a set.

func StreamEntryKey

func StreamEntryKey(userKey []byte, ms, seq uint64) []byte

StreamEntryKey builds the per-entry key for a stream.

func StreamEntryScanPrefix

func StreamEntryScanPrefix(userKey []byte) []byte

StreamEntryScanPrefix returns the common prefix for every entry belonging to userKey, used as the [start, end) prefix for XRANGE / XREAD scans.

func StreamMetaKey

func StreamMetaKey(userKey []byte) []byte

StreamMetaKey builds the per-stream metadata key.

func UnmarshalZSetScore

func UnmarshalZSetScore(b []byte) (float64, error)

UnmarshalZSetScore decodes a float64 score from IEEE 754 big-endian format.

func WriteConflictKey

func WriteConflictKey(err error) ([]byte, bool)

func ZSetMemberKey

func ZSetMemberKey(userKey, member []byte) []byte

ZSetMemberKey builds the per-member key storing the score for a sorted set.

func ZSetMemberScanPrefix

func ZSetMemberScanPrefix(userKey []byte) []byte

ZSetMemberScanPrefix returns the prefix to scan all members of a sorted set.

func ZSetMetaDeltaKey

func ZSetMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

ZSetMetaDeltaKey builds the delta key for a sorted set metadata change.

func ZSetMetaDeltaScanPrefix

func ZSetMetaDeltaScanPrefix(userKey []byte) []byte

ZSetMetaDeltaScanPrefix returns the prefix to scan all delta keys for a sorted set.

func ZSetMetaKey

func ZSetMetaKey(userKey []byte) []byte

ZSetMetaKey builds the metadata key for a sorted set.

func ZSetScoreKey

func ZSetScoreKey(userKey []byte, score float64, member []byte) []byte

ZSetScoreKey builds the score index key for a sorted set entry. Layout: !zs|scr|<userKeyLen(4)><userKey><sortableScore(8)><member>

func ZSetScoreRangeScanPrefix

func ZSetScoreRangeScanPrefix(userKey []byte, score float64) []byte

ZSetScoreRangeScanPrefix returns the score index prefix for userKey at the given score, for use as a bound when scanning scores in a range [minScore, maxScore].

func ZSetScoreScanPrefix

func ZSetScoreScanPrefix(userKey []byte) []byte

ZSetScoreScanPrefix returns the prefix to scan all score index keys for a sorted set.

Types

type HashMeta

type HashMeta struct {
	Len int64
}

HashMeta is the base metadata for a hash collection.

func UnmarshalHashMeta

func UnmarshalHashMeta(b []byte) (HashMeta, error)

UnmarshalHashMeta decodes HashMeta from the fixed 8-byte binary format.
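
For orientation, a fixed 8-byte encoding of a single int64 might look like the sketch below. The byte order is an assumption (big-endian is used here), and the lowercase names are hypothetical; the documentation only guarantees the 8-byte size.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// hashMeta mirrors the exported HashMeta struct.
type hashMeta struct {
	Len int64
}

// marshalHashMeta writes Len as 8 bytes.
func marshalHashMeta(m hashMeta) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(m.Len))
	return b
}

// unmarshalHashMeta rejects any input that is not exactly 8 bytes long.
func unmarshalHashMeta(b []byte) (hashMeta, error) {
	if len(b) != 8 {
		return hashMeta{}, fmt.Errorf("hash meta: want 8 bytes, got %d", len(b))
	}
	return hashMeta{Len: int64(binary.BigEndian.Uint64(b))}, nil
}

func main() {
	m, err := unmarshalHashMeta(marshalHashMeta(hashMeta{Len: 42}))
	fmt.Println(m.Len, err) // 42 <nil>
}
```

The same shape would carry a signed LenDelta unchanged, since converting between int64 and uint64 preserves the two's-complement bit pattern.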

type HashMetaDelta

type HashMetaDelta struct {
	LenDelta int64
}

HashMetaDelta holds a signed change in field count.

func UnmarshalHashMetaDelta

func UnmarshalHashMetaDelta(b []byte) (HashMetaDelta, error)

UnmarshalHashMetaDelta decodes HashMetaDelta from the fixed 8-byte binary format.

type HybridClock

type HybridClock interface {
	Now() uint64
}

HybridClock provides monotonically increasing timestamps (HLC).

type KVPair

type KVPair struct {
	Key   []byte
	Value []byte
}

type KVPairMutation

type KVPairMutation struct {
	Op    OpType
	Key   []byte
	Value []byte
	// ExpireAt is an HLC timestamp; 0 means no TTL.
	ExpireAt uint64
}

KVPairMutation is a small helper struct for MVCC mutation application.

type ListMeta

type ListMeta struct {
	Head int64 `json:"h"`
	Tail int64 `json:"t"`
	Len  int64 `json:"l"`
}

func UnmarshalListMeta

func UnmarshalListMeta(b []byte) (ListMeta, error)

UnmarshalListMeta decodes ListMeta from the fixed 24-byte binary format.

type ListMetaDelta

type ListMetaDelta struct {
	HeadDelta int64
	LenDelta  int64
}

ListMetaDelta holds the signed deltas applied by a single PUSH/POP operation.

func UnmarshalListMetaDelta

func UnmarshalListMetaDelta(b []byte) (ListMetaDelta, error)

UnmarshalListMetaDelta decodes a ListMetaDelta from the fixed 16-byte binary format.

type MVCCStore

type MVCCStore interface {
	// GetAt returns the newest version whose commit timestamp is <= ts.
	GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
	// ExistsAt reports whether a visible, non-tombstone version exists at ts.
	ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
	// ScanAt returns versions visible at the given timestamp.
	ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error)
	// ReverseScanAt returns visible versions in descending key order for keys in [start, end).
	ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error)
	// PutAt commits a value at the provided commit timestamp and optional expireAt.
	PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error
	// DeleteAt commits a tombstone at the provided commit timestamp.
	DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
	// PutWithTTLAt stores a value with a precomputed expireAt (HLC) at the given commit timestamp.
	PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error
	// ExpireAt sets/renews TTL using a precomputed expireAt (HLC) at the given commit timestamp.
	ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
	// LatestCommitTS returns the commit timestamp of the newest version.
	// The boolean reports whether the key has any version.
	LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
	// ApplyMutations atomically validates and appends the provided mutations.
	// It must return ErrWriteConflict if any mutation key or any read key has
	// a newer commit timestamp than startTS. readKeys carries the transaction's
	// read set for read-write conflict detection; pass nil when no read set
	// validation is needed.
	//
	// Isolation guarantees vary by transaction topology:
	//
	//   Single-shard transactions: readKeys are included in the Raft log entry
	//   and validated atomically under the FSM's applyMu lock alongside
	//   write-write conflict detection. The adapter's pre-Raft validateReadSet
	//   call is kept as a fast-fail optimization but the FSM check is
	//   authoritative. No TOCTOU window; full SSI.
	//
	//   Multi-shard (2PC) write shards: readKeys are included in the
	//   PREPARE Raft entry and validated atomically under the FSM's applyMu
	//   lock. No TOCTOU window; full SSI.
	//
	//   Multi-shard (2PC) read-only shards: validated via a linearizable
	//   read barrier followed by LatestCommitTS outside the FSM lock. A
	//   small TOCTOU window exists between the barrier and the check.
	ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
	// ApplyMutationsRaft is the raft-apply variant of ApplyMutations. It
	// carries identical MVCC semantics but is governed by the FSM-commit
	// sync-mode knob (ELASTICKV_FSM_SYNC_MODE). Callers MUST only use this
	// when the write is part of a raft-log apply — the raft WAL is the
	// durability backstop that makes an un-fsynced Pebble write safe.
	//
	// Direct (non-raft) callers (catalog bootstrap, admin snapshots,
	// migrations, tests) must use ApplyMutations, which is always
	// pebble.Sync and therefore safe without raft-log replay.
	ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
	// DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired)
	// keys matching prefix at commitTS by writing tombstone versions. An empty
	// prefix means "all keys". Keys matching excludePrefix are preserved.
	// No conflict checking is performed; this is intended for bulk operations
	// such as FLUSHALL where the caller knows no conflict check is needed.
	DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
	// DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt with
	// the same durability contract as ApplyMutationsRaft.
	DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
	// LastCommitTS returns the highest commit timestamp applied on this node.
	LastCommitTS() uint64
	// WriteConflictCountsByPrefix returns a snapshot of the MVCC
	// write-conflict counters keyed by "<kind>|<key_prefix>" where
	// kind is "read" or "write" and key_prefix is a bounded
	// classification of the conflicting key. The map is a copy; the
	// caller may mutate it freely. Implementations that do not track
	// conflicts may return an empty (non-nil) map.
	WriteConflictCountsByPrefix() map[string]uint64
	// Compact removes versions older than minTS that are no longer needed.
	Compact(ctx context.Context, minTS uint64) error
	Snapshot() (Snapshot, error)
	Restore(buf io.Reader) error
	Close() error
}

MVCCStore extends Store with multi-version concurrency control helpers. The interface is timestamp-explicit; callers must supply the snapshot or commit timestamp for every operation.
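
The two core rules in the interface comments (GetAt returns the newest version with commit timestamp <= ts, and a write conflicts when a key has a version newer than startTS) can be illustrated with a toy in-memory model. Everything here is hypothetical and greatly simplified: no TTL, no locking, no Raft, and the type and method names are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

var (
	errKeyNotFound   = errors.New("not found")
	errWriteConflict = errors.New("write conflict")
)

type version struct {
	ts        uint64
	value     string
	tombstone bool
}

// toyStore keeps, per key, versions sorted by ascending commit timestamp.
type toyStore struct {
	versions map[string][]version
}

func newToyStore() *toyStore {
	return &toyStore{versions: make(map[string][]version)}
}

func (s *toyStore) write(key string, v version) {
	vs := append(s.versions[key], v)
	sort.SliceStable(vs, func(i, j int) bool { return vs[i].ts < vs[j].ts })
	s.versions[key] = vs
}

// getAt returns the newest version whose commit timestamp is <= ts.
func (s *toyStore) getAt(key string, ts uint64) (string, error) {
	vs := s.versions[key]
	// i is the index of the first version that is too new to be visible.
	i := sort.Search(len(vs), func(i int) bool { return vs[i].ts > ts })
	if i == 0 || vs[i-1].tombstone {
		return "", errKeyNotFound
	}
	return vs[i-1].value, nil
}

// checkConflicts fails if any key has a version committed after startTS.
func (s *toyStore) checkConflicts(keys []string, startTS uint64) error {
	for _, k := range keys {
		vs := s.versions[k]
		if n := len(vs); n > 0 && vs[n-1].ts > startTS {
			return errWriteConflict
		}
	}
	return nil
}

func main() {
	s := newToyStore()
	s.write("k", version{ts: 10, value: "v1"})
	s.write("k", version{ts: 20, value: "v2"})
	s.write("k", version{ts: 30, tombstone: true})

	fmt.Println(s.getAt("k", 15))                    // v1 <nil>
	fmt.Println(s.getAt("k", 35))                    //  not found
	fmt.Println(s.checkConflicts([]string{"k"}, 25)) // write conflict
}
```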

func NewMVCCStore

func NewMVCCStore(opts ...MVCCStoreOption) MVCCStore

NewMVCCStore creates a new MVCC-enabled in-memory store.

func NewPebbleStore

func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error)

NewPebbleStore creates a new Pebble-backed MVCC store.

type MVCCStoreOption

type MVCCStoreOption func(*mvccStore)

MVCCStoreOption configures the MVCCStore.

func WithLogger

func WithLogger(l *slog.Logger) MVCCStoreOption

WithLogger sets a custom logger for the store.

type OpType

type OpType int

OpType describes a mutation kind.

const (
	OpTypePut OpType = iota
	OpTypeDelete
)

type PebbleStoreOption

type PebbleStoreOption func(*pebbleStore)

PebbleStoreOption configures the PebbleStore.

func WithPebbleLogger

func WithPebbleLogger(l *slog.Logger) PebbleStoreOption

WithPebbleLogger sets a custom logger.

type RetentionController

type RetentionController interface {
	MinRetainedTS() uint64
	SetMinRetainedTS(ts uint64)
}

RetentionController exposes the minimum timestamp still retained by a store after MVCC compaction. Reads older than this watermark may fail with ErrReadTSCompacted.

type SetMeta

type SetMeta struct {
	Len int64
}

SetMeta is the base metadata for a set collection.

func UnmarshalSetMeta

func UnmarshalSetMeta(b []byte) (SetMeta, error)

UnmarshalSetMeta decodes SetMeta from the fixed 8-byte binary format.

type SetMetaDelta

type SetMetaDelta struct {
	LenDelta int64
}

SetMetaDelta holds a signed change in member count.

func UnmarshalSetMetaDelta

func UnmarshalSetMetaDelta(b []byte) (SetMetaDelta, error)

UnmarshalSetMetaDelta decodes SetMetaDelta from the fixed 8-byte binary format.

type Snapshot

type Snapshot interface {
	io.WriterTo
	io.Closer
}

Snapshot streams a consistent point-in-time store image to a writer. Implementations may back this with a temp file or an engine-native snapshot.

type StreamMeta

type StreamMeta struct {
	Length  int64
	LastMs  uint64
	LastSeq uint64
}

StreamMeta is the per-stream metadata. Length is authoritative for XLEN; LastMs/LastSeq track the highest ID ever appended so XADD '*' stays strictly monotonic even after XTRIM removes the current tail.
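
The monotonicity argument can be made concrete with a small sketch of auto-ID selection. The function nextAutoID is hypothetical and follows the usual Redis rule for XADD '*'; the package's own ID allocation is not shown in this documentation.

```go
package main

import "fmt"

// streamMeta mirrors the exported StreamMeta struct.
type streamMeta struct {
	Length  int64
	LastMs  uint64
	LastSeq uint64
}

// nextAutoID (hypothetical) picks an ID strictly greater than the last one
// ever appended, even if the wall clock stalls or moves backwards.
func nextAutoID(m streamMeta, nowMs uint64) (ms, seq uint64) {
	if nowMs > m.LastMs {
		return nowMs, 0
	}
	if m.LastSeq == ^uint64(0) {
		// Sequence space exhausted for this millisecond: move to the next one.
		return m.LastMs + 1, 0
	}
	return m.LastMs, m.LastSeq + 1
}

func main() {
	// Length is 0 after a trim, but LastMs/LastSeq still remember the old tail.
	m := streamMeta{Length: 0, LastMs: 100, LastSeq: 3}
	fmt.Println(nextAutoID(m, 99))  // 100 4
	fmt.Println(nextAutoID(m, 101)) // 101 0
}
```

Deriving the next ID from the stored high-water mark rather than from the surviving entries is what keeps IDs from being reused after a trim empties the stream.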

func UnmarshalStreamMeta

func UnmarshalStreamMeta(b []byte) (StreamMeta, error)

UnmarshalStreamMeta decodes StreamMeta from the fixed 24-byte binary format.

type VersionedValue

type VersionedValue struct {
	TS        uint64
	Value     []byte
	Tombstone bool
	ExpireAt  uint64 // HLC timestamp; 0 means no TTL
}

VersionedValue represents a single committed version in MVCC storage.

type WriteConflictError

type WriteConflictError struct {
	// contains filtered or unexported fields
}

func (*WriteConflictError) Error

func (e *WriteConflictError) Error() string

func (*WriteConflictError) Unwrap

func (e *WriteConflictError) Unwrap() error
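
Since the struct's fields are unexported and the related functions carry no documentation, the following is only a guess at how the pieces could fit together. It assumes the error records the conflicting key and that Unwrap returns the ErrWriteConflict sentinel, so callers can use both errors.Is and a key lookup. All lowercase names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var errWriteConflict = errors.New("write conflict")

// writeConflictError (hypothetical) carries the key that caused the conflict.
type writeConflictError struct {
	key []byte
}

func (e *writeConflictError) Error() string {
	return fmt.Sprintf("write conflict on key %q", e.key)
}

// Unwrap exposes the sentinel so errors.Is(err, errWriteConflict) succeeds.
func (e *writeConflictError) Unwrap() error { return errWriteConflict }

func newWriteConflictError(key []byte) error {
	// Copy the key so later mutation by the caller cannot change the error.
	return &writeConflictError{key: append([]byte(nil), key...)}
}

// writeConflictKey returns the conflicting key if err wraps a writeConflictError.
func writeConflictKey(err error) ([]byte, bool) {
	var wce *writeConflictError
	if errors.As(err, &wce) {
		return wce.key, true
	}
	return nil, false
}

// isWriteConflict and wrap are small helpers used by the demonstration.
func isWriteConflict(err error) bool { return errors.Is(err, errWriteConflict) }
func wrap(err error) error           { return fmt.Errorf("apply mutations: %w", err) }

func main() {
	err := wrap(newWriteConflictError([]byte("k1")))
	key, ok := writeConflictKey(err)
	fmt.Println(isWriteConflict(err), string(key), ok) // true k1 true
}
```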

type ZSetMeta

type ZSetMeta struct {
	Len int64
}

ZSetMeta is the base metadata for a sorted set collection.

func UnmarshalZSetMeta

func UnmarshalZSetMeta(b []byte) (ZSetMeta, error)

UnmarshalZSetMeta decodes ZSetMeta from the fixed 8-byte binary format.

type ZSetMetaDelta

type ZSetMetaDelta struct {
	LenDelta int64
}

ZSetMetaDelta holds a signed change in member count.

func UnmarshalZSetMetaDelta

func UnmarshalZSetMetaDelta(b []byte) (ZSetMetaDelta, error)

UnmarshalZSetMetaDelta decodes ZSetMetaDelta from the fixed 8-byte binary format.
