store

package
v0.0.0-...-ebdf8eb
Warning

This package is not in the latest version of its module.

Published: Jun 18, 2026 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

const (
	HashMetaPrefix      = "!hs|meta|"
	HashFieldPrefix     = "!hs|fld|"
	HashMetaDeltaPrefix = "!hs|meta|d|"
)

Hash wide-column key layout:

Base Metadata: !hs|meta|<userKeyLen(4)><userKey>               → [Len(8)]
Field Key:     !hs|fld|<userKeyLen(4)><userKey><fieldName>     → field value bytes
Delta Key:     !hs|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)> → [LenDelta(8)]
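The field-key layout above can be sketched as a length-prefixed encoding. The 4-byte length prefix is what makes user keys that contain `|` unambiguous. This is a minimal illustration that assumes userKeyLen is a big-endian uint32; the layout gives only its width. `hashFieldKey` and `splitHashFieldKey` are hypothetical helpers, not the package's HashFieldKey / ExtractHashUserKeyFromField.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"strings"
)

const hashFieldPrefix = "!hs|fld|"

// hashFieldKey builds !hs|fld|<userKeyLen(4)><userKey><fieldName>.
// Assumption: userKeyLen is encoded as a big-endian uint32.
func hashFieldKey(userKey, field []byte) []byte {
	k := make([]byte, 0, len(hashFieldPrefix)+4+len(userKey)+len(field))
	k = append(k, hashFieldPrefix...)
	k = binary.BigEndian.AppendUint32(k, uint32(len(userKey)))
	k = append(k, userKey...)
	return append(k, field...)
}

// splitHashFieldKey reverses hashFieldKey. ok is false for keys that are not
// hash field keys or whose length prefix runs past the end of the key.
func splitHashFieldKey(key []byte) (userKey, field []byte, ok bool) {
	if !strings.HasPrefix(string(key), hashFieldPrefix) {
		return nil, nil, false
	}
	rest := key[len(hashFieldPrefix):]
	if len(rest) < 4 {
		return nil, nil, false
	}
	n := uint64(binary.BigEndian.Uint32(rest))
	if uint64(len(rest)-4) < n {
		return nil, nil, false
	}
	return rest[4 : 4+n], rest[4+n:], true
}

func main() {
	k := hashFieldKey([]byte("user:1"), []byte("name"))
	uk, f, ok := splitHashFieldKey(k)
	fmt.Printf("%q %q %q %v\n", k, uk, f, ok)
}
```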
const (
	// ListMetaDeltaPrefix is the prefix for all list metadata delta keys.
	// Layout: !lst|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)>
	ListMetaDeltaPrefix = "!lst|meta|d|"

	// ListClaimPrefix is the prefix for list claim keys used by POP operations.
	// Layout: !lst|claim|<userKeyLen(4)><userKey><seq(8-byte sortable)>
	ListClaimPrefix = "!lst|claim|"

	// MaxDeltaScanLimit is the hard limit on delta scan results per read.
	// If a key accumulates more than this many uncompacted deltas, reads
	// return ErrDeltaScanTruncated until background compaction catches up.
	// Set high enough to tolerate burst traffic between compaction ticks
	// (default 30 s) without sacrificing read availability on hot keys.
	MaxDeltaScanLimit = 1024
)

Delta/Claim key constants.
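The MaxDeltaScanLimit read rule can be sketched as folding the uncompacted deltas onto the base length and failing closed once too many are pending. This is a hypothetical illustration. `resolveLen` and the local `errDeltaScanTruncated` only mirror the documented behaviour of MaxDeltaScanLimit and ErrDeltaScanTruncated. How the real store scans and compacts deltas is not shown.

```go
package main

import (
	"errors"
	"fmt"
)

const maxDeltaScanLimit = 1024 // mirrors MaxDeltaScanLimit

// errDeltaScanTruncated is a local stand-in for the store's ErrDeltaScanTruncated.
var errDeltaScanTruncated = errors.New("delta scan truncated")

// resolveLen folds signed length deltas onto the base metadata length.
// Rather than return a length computed from a partial scan, it refuses to
// answer once more than limit deltas are pending.
func resolveLen(base int64, deltas []int64, limit int) (int64, error) {
	if len(deltas) > limit {
		return 0, errDeltaScanTruncated
	}
	n := base
	for _, d := range deltas {
		n += d
	}
	return n, nil
}

func isTruncated(err error) bool { return errors.Is(err, errDeltaScanTruncated) }

func main() {
	n, err := resolveLen(10, []int64{2, -1, 3}, maxDeltaScanLimit)
	fmt.Println(n, err) // 14 <nil>
}
```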

const (
	ListMetaPrefix = "!lst|meta|"
	ListItemPrefix = "!lst|itm|"
)
const (
	SetMetaPrefix      = "!st|meta|"
	SetMemberPrefix    = "!st|mem|"
	SetMetaDeltaPrefix = "!st|meta|d|"
)

Set wide-column key layout:

Base Metadata: !st|meta|<userKeyLen(4)><userKey>                 → [Len(8)]
Member Key:    !st|mem|<userKeyLen(4)><userKey><member>           → (empty value)
Delta Key:     !st|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)> → [LenDelta(8)]
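In the delta-key layout, commitTS and seqInTxn follow the user key as fixed-width suffixes. As a result, a prefix scan over one collection's delta keys visits them in (commitTS, seqInTxn) order, provided both integers are big-endian. The byte order is an assumption here; the layout states only the widths. `setMetaDeltaKey` and `sortedDeltaOrder` are hypothetical helpers, not the package's SetMetaDeltaKey.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sort"
)

const setMetaDeltaPrefix = "!st|meta|d|"

// setMetaDeltaKey builds !st|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)>.
// Assumption: all integers are big-endian, so byte order equals numeric order.
func setMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte {
	k := []byte(setMetaDeltaPrefix)
	k = binary.BigEndian.AppendUint32(k, uint32(len(userKey)))
	k = append(k, userKey...)
	k = binary.BigEndian.AppendUint64(k, commitTS)
	return binary.BigEndian.AppendUint32(k, seqInTxn)
}

// sortedDeltaOrder returns the (commitTS, seqInTxn) pairs in the order a
// lexicographic key scan would visit their delta keys.
func sortedDeltaOrder(userKey []byte, ts []uint64, seq []uint32) [][2]uint64 {
	keys := make([][]byte, len(ts))
	for i := range ts {
		keys[i] = setMetaDeltaKey(userKey, ts[i], seq[i])
	}
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
	suffix := len(setMetaDeltaPrefix) + 4 + len(userKey)
	out := make([][2]uint64, len(keys))
	for i, k := range keys {
		out[i] = [2]uint64{binary.BigEndian.Uint64(k[suffix:]), uint64(binary.BigEndian.Uint32(k[suffix+8:]))}
	}
	return out
}

func main() {
	fmt.Println(sortedDeltaOrder([]byte("s"), []uint64{300, 5, 300}, []uint32{1, 9, 0})) // [[5 9] [300 0] [300 1]]
}
```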
const (
	StreamMetaPrefix  = "!stream|meta|"
	StreamEntryPrefix = "!stream|entry|"

	// StreamIDBytes is the fixed size of the binary StreamID suffix on an entry key:
	// 8 bytes big-endian ms || 8 bytes big-endian seq. Big-endian so lex order
	// over the raw key bytes matches the (ms, seq) numeric order used by XADD / XRANGE.
	StreamIDBytes = 16
)

Stream wide-column key layout:

Meta:  !stream|meta|<userKeyLen(4)><userKey>        → [Len(8)][LastMs(8)][LastSeq(8)]
Entry: !stream|entry|<userKeyLen(4)><userKey><StreamID(16)>
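The 16-byte StreamID suffix can be sketched with encoding/binary: 8 big-endian bytes of ms followed by 8 big-endian bytes of seq, as StreamIDBytes describes. `encodeStreamID`, `decodeStreamID` and `idLess` are local re-implementations for illustration, not the package's EncodeStreamID / DecodeStreamID. The comparison shows why big-endian matters: raw byte order then agrees with (ms, seq) order.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const streamIDBytes = 16 // mirrors StreamIDBytes

// encodeStreamID writes ms || seq as 16 big-endian bytes.
func encodeStreamID(ms, seq uint64) []byte {
	b := make([]byte, streamIDBytes)
	binary.BigEndian.PutUint64(b[:8], ms)
	binary.BigEndian.PutUint64(b[8:], seq)
	return b
}

// decodeStreamID parses a 16-byte ID; ok is false on a length mismatch.
func decodeStreamID(b []byte) (ms, seq uint64, ok bool) {
	if len(b) != streamIDBytes {
		return 0, 0, false
	}
	return binary.BigEndian.Uint64(b[:8]), binary.BigEndian.Uint64(b[8:]), true
}

// idLess compares IDs the way a scan over raw entry keys orders them.
func idLess(a, b []byte) bool { return bytes.Compare(a, b) < 0 }

func main() {
	a := encodeStreamID(1700000000000, 5)
	b := encodeStreamID(1700000000001, 0)
	fmt.Println(idLess(a, b)) // true
}
```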
const (
	WriteConflictKindWrite = "write"
	WriteConflictKindRead  = "read"
)

Write-conflict classification labels. These are emitted as the "key_prefix" Prometheus label on elastickv_store_write_conflict_total so operators can tell WHICH namespace is producing OCC conflicts. The label set is deliberately bounded and stable — adding a new bucket is a conscious schema change, not a cardinality surprise.

Buckets are aligned with the internal key prefixes declared across the codebase:

!txn|lock|, !txn|int|, !txn|cmt|, !txn|rb|  (kv/txn_keys.go)
!redis|str|, !redis|ttl|, !redis|hll|       (adapter/redis_compat_types.go)
!hs|, !st|, !zs|, !lst|                     (store/*_helpers.go)
!ddb|                                       (kv/shard_key.go)
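A bounded classifier of the kind described above can be sketched as a fixed prefix table with a catch-all bucket, so an unexpected key never creates a new label value. The table reuses the prefixes listed above as label values, and the "other" fallback is an assumption. The real bucket names are not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// keyClasses is a hypothetical bucket table built from the prefixes listed
// above. A fixed slice keeps the Prometheus label set bounded and stable.
var keyClasses = []string{
	"!txn|lock|", "!txn|int|", "!txn|cmt|", "!txn|rb|",
	"!redis|str|", "!redis|ttl|", "!redis|hll|",
	"!hs|", "!st|", "!zs|", "!lst|",
	"!ddb|",
}

// otherClass is an assumed catch-all for keys that match no known prefix.
const otherClass = "other"

// classifyKey maps a conflicting key to one label from a fixed set.
func classifyKey(key []byte) string {
	for _, p := range keyClasses {
		if strings.HasPrefix(string(key), p) {
			return p
		}
	}
	return otherClass
}

func main() {
	fmt.Println(classifyKey([]byte("!zs|mem|\x00\x00\x00\x01k"))) // !zs|
}
```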
const (
	ZSetMetaPrefix      = "!zs|meta|"
	ZSetMemberPrefix    = "!zs|mem|"
	ZSetScorePrefix     = "!zs|scr|"
	ZSetMetaDeltaPrefix = "!zs|meta|d|"
)

ZSet wide-column key layout:

Base Metadata: !zs|meta|<userKeyLen(4)><userKey>                               → [Len(8)]
Member Key:    !zs|mem|<userKeyLen(4)><userKey><member>                         → [Score(8)] IEEE 754
Score Index:   !zs|scr|<userKeyLen(4)><userKey><sortableScore(8)><member>       → (empty)
Delta Key:     !zs|meta|d|<userKeyLen(4)><userKey><commitTS(8)><seqInTxn(4)>   → [LenDelta(8)]

Variables

var ErrEncryptedReadIntegrity = errors.New("store: encrypted value failed integrity check (GCM tag mismatch); refusing to surface plaintext")

ErrEncryptedReadIntegrity wraps encryption.ErrIntegrity for storage-layer callers (Get / scan / iterator). Per design §4.1, callers MUST treat this as a typed read error and never silently zero the value or skip the row.

Callers can disambiguate it from any other read error with errors.Is.

var ErrEncryptedValueReservedState = errors.New("store: value header carries reserved encryption_state; binary too old to read this entry")

ErrEncryptedValueReservedState indicates decodeValue saw an encryption_state value (0b10 or 0b11) that the current build does not know how to interpret. Per design §7.1, this is a fail-closed trip-wire so an old binary cannot silently treat a future-version encrypted entry as cleartext bytes.

var ErrExpired = errors.New("expired")
var ErrInvalidChecksum = errors.New("invalid checksum")
var ErrKeyNotFound = errors.New("not found")
var ErrNotSupported = errors.New("not supported")
var ErrReadTSCompacted = errors.New("read timestamp has been compacted")
var ErrSnapshotKeyTooLarge = errors.New("mvcc snapshot key too large")
var ErrSnapshotVersionCountTooLarge = errors.New("mvcc snapshot version count too large")
var ErrUnknownOp = errors.New("unknown op")
var ErrUnsupportedStoreForWriterRegistry = errors.New("store: WriterRegistryFor requires a Pebble-backed MVCCStore")

ErrUnsupportedStoreForWriterRegistry is returned by WriterRegistryFor when the supplied MVCCStore is not backed by the in-tree Pebble implementation. Test fakes and alternate backends are detected at construction time so the failure mode is "binary refuses to start" rather than "FSM apply panics inside Raft loop".

var ErrValueTooLarge = errors.New("value too large")
var ErrWriteConflict = errors.New("write conflict")
var ErrWriterNotRegistered = errors.New("store: storage envelope active but writer not yet registered; refusing to emit nonce before registration")

ErrWriterNotRegistered is returned by the direct write path when the §7.1 storage envelope is active but this process load has not yet confirmed its §4.1 writer registration for the active storage DEK (Stage 7a-2). It is a transient, retryable condition: the caller should retry once registration commits (the barrier closes). Callers disambiguate it with errors.Is so a fail-closed pre-registration write is never mistaken for a permanent storage fault.
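Because this condition is transient, a direct-write caller can retry it and treat every other error as final. A minimal sketch follows. `putWithRetry`, `simulate`, the attempt budget and the linear backoff are assumptions for illustration. `errWriterNotRegistered` is a local stand-in for the store's sentinel, matched with errors.Is as the doc comment recommends.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errWriterNotRegistered is a local stand-in for store.ErrWriterNotRegistered.
var errWriterNotRegistered = errors.New("writer not yet registered")

func isNotRegistered(err error) bool { return errors.Is(err, errWriterNotRegistered) }

// putWithRetry makes at least one attempt and retries write only while it
// fails with the transient registration error. Any other error, an exhausted
// budget, or context cancellation ends the loop.
func putWithRetry(ctx context.Context, attempts int, backoff time.Duration, write func() error) error {
	for i := 0; ; i++ {
		err := write()
		if err == nil || !isNotRegistered(err) || i+1 >= attempts {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff * time.Duration(i+1)): // linear backoff
		}
	}
}

// simulate runs putWithRetry against a write that fails `failures` times
// with the wrapped transient error and then succeeds.
func simulate(attempts, failures int) (calls int, err error) {
	err = putWithRetry(context.Background(), attempts, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return fmt.Errorf("put: %w", errWriterNotRegistered)
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(simulate(5, 2)) // 3 <nil>
}
```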

var Tombstone = []byte{0x00}

Functions

func DecodeSortableFloat64

func DecodeSortableFloat64(b [8]byte) float64

DecodeSortableFloat64 decodes a sortable 8-byte representation back to float64.

func DecodeStreamID

func DecodeStreamID(b []byte) (ms, seq uint64, ok bool)

DecodeStreamID parses a 16-byte big-endian ms||seq tuple. It returns ok=false if the slice is not exactly 16 bytes long.

func DecodeWriteConflictLabel

func DecodeWriteConflictLabel(label string) (kind, keyClass string, ok bool)

DecodeWriteConflictLabel splits a flat label back into (kind, keyClass). Returns ok=false for malformed input so collector code can skip rather than emit bogus series.

func EncodeSortableFloat64

func EncodeSortableFloat64(f float64) [8]byte

EncodeSortableFloat64 encodes a float64 into a sortable 8-byte representation. For positive values, it XORs the sign bit so they sort above all negative values; for negative values, it XORs all bits to reverse their order. The resulting byte sequence sorts in numeric order under standard byte comparison.
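The transform can be sketched with math.Float64bits, written big-endian so that byte comparison follows numeric order. `encodeSortable` and `decodeSortable` are local re-implementations for illustration, not the package's functions. This sketch tests the sign bit directly and does not address NaN ordering.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math"
)

const signBit = uint64(1) << 63

// encodeSortable flips the sign bit of values whose sign bit is clear and
// flips all bits of negative values, then writes the result big-endian.
func encodeSortable(f float64) [8]byte {
	u := math.Float64bits(f)
	if u&signBit == 0 {
		u ^= signBit
	} else {
		u = ^u
	}
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], u)
	return b
}

// decodeSortable inverts encodeSortable. A set top bit means the original
// sign bit was clear, so only the sign bit is flipped back.
func decodeSortable(b [8]byte) float64 {
	u := binary.BigEndian.Uint64(b[:])
	if u&signBit != 0 {
		u ^= signBit
	} else {
		u = ^u
	}
	return math.Float64frombits(u)
}

// sortsBefore reports whether a's encoding sorts before b's under byte comparison.
func sortsBefore(a, b float64) bool {
	x, y := encodeSortable(a), encodeSortable(b)
	return bytes.Compare(x[:], y[:]) < 0
}

func main() {
	fmt.Println(sortsBefore(-2.5, -1), sortsBefore(-1, 0), sortsBefore(0, 3)) // true true true
}
```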

func EncodeStreamID

func EncodeStreamID(ms, seq uint64) []byte

EncodeStreamID writes a StreamID as 16 big-endian bytes: ms || seq.

func EncodeWriteConflictLabel

func EncodeWriteConflictLabel(kind, keyClass string) string

EncodeWriteConflictLabel joins a (kind, keyClass) tuple into the flat map key used by WriteConflictCountsByPrefix snapshots. Exposed so the monitoring collector can build the same string without re-implementing the convention.
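Assuming the "<kind>|<key_prefix>" map-key format documented on MVCCStore.WriteConflictCountsByPrefix, the encode/decode pair can be sketched as follows. Splitting at the first '|' assumes that kind never contains '|' (it is "read" or "write"), which keeps key classes such as "!hs|" intact. `encodeLabel` and `decodeLabel` are hypothetical, and the package's own validation rules may differ.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	kindWrite = "write" // mirrors WriteConflictKindWrite
	kindRead  = "read"  // mirrors WriteConflictKindRead
)

// encodeLabel joins kind and keyClass into the flat "<kind>|<key_prefix>" form.
func encodeLabel(kind, keyClass string) string {
	return kind + "|" + keyClass
}

// decodeLabel splits at the first '|' and rejects unknown kinds or an empty
// key class, so a collector can skip malformed entries.
func decodeLabel(label string) (kind, keyClass string, ok bool) {
	k, c, found := strings.Cut(label, "|")
	if !found || c == "" || (k != kindWrite && k != kindRead) {
		return "", "", false
	}
	return k, c, true
}

func main() {
	fmt.Println(decodeLabel(encodeLabel(kindWrite, "!hs|"))) // write !hs| true
}
```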

func ExtractHashFieldName

func ExtractHashFieldName(key, userKey []byte) []byte

ExtractHashFieldName extracts the field name from a hash field key.

func ExtractHashUserKeyFromDelta

func ExtractHashUserKeyFromDelta(key []byte) []byte

ExtractHashUserKeyFromDelta extracts the logical user key from a hash delta key.

func ExtractHashUserKeyFromField

func ExtractHashUserKeyFromField(key []byte) []byte

ExtractHashUserKeyFromField extracts the logical user key from a hash field key.

func ExtractHashUserKeyFromMeta

func ExtractHashUserKeyFromMeta(key []byte) []byte

ExtractHashUserKeyFromMeta extracts the logical user key from a hash meta key.

func ExtractListItemSeq

func ExtractListItemSeq(itemKey, userKey []byte) (int64, bool)

ExtractListItemSeq extracts the sequence number encoded in a list item key. It returns (seq, true) on success and (0, false) if itemKey is not a valid item key for userKey.

func ExtractListUserKey

func ExtractListUserKey(key []byte) []byte

ExtractListUserKey returns the logical user key from a list meta or item key. If the key is not a list key, it returns nil.

func ExtractListUserKeyFromClaim

func ExtractListUserKeyFromClaim(key []byte) []byte

ExtractListUserKeyFromClaim extracts the logical user key from a list claim key.

func ExtractListUserKeyFromDelta

func ExtractListUserKeyFromDelta(key []byte) []byte

ExtractListUserKeyFromDelta extracts the logical user key from a list delta key.

func ExtractSetMemberName

func ExtractSetMemberName(key, userKey []byte) []byte

ExtractSetMemberName extracts the member name from a set member key.

func ExtractSetUserKeyFromDelta

func ExtractSetUserKeyFromDelta(key []byte) []byte

ExtractSetUserKeyFromDelta extracts the logical user key from a set delta key.

func ExtractSetUserKeyFromMember

func ExtractSetUserKeyFromMember(key []byte) []byte

ExtractSetUserKeyFromMember extracts the logical user key from a set member key.

func ExtractSetUserKeyFromMeta

func ExtractSetUserKeyFromMeta(key []byte) []byte

ExtractSetUserKeyFromMeta extracts the logical user key from a set meta key.

func ExtractStreamEntryID

func ExtractStreamEntryID(entryKey, userKey []byte) (ms, seq uint64, ok bool)

ExtractStreamEntryID extracts (ms, seq) from a stream entry key. Returns false if the key is not a valid entry key for the given userKey.

func ExtractStreamUserKeyFromEntry

func ExtractStreamUserKeyFromEntry(key []byte) []byte

ExtractStreamUserKeyFromEntry extracts the logical user key from a stream entry key.

See ExtractStreamUserKeyFromMeta for the rationale of the uint64 bounds check; the entry variant additionally has to account for the trailing StreamIDBytes (16 bytes) suffix.

func ExtractStreamUserKeyFromMeta

func ExtractStreamUserKeyFromMeta(key []byte) []byte

ExtractStreamUserKeyFromMeta extracts the logical user key from a stream meta key.

The bounds check is done in uint64 so that a corrupted length prefix near math.MaxUint32 cannot make uint32(wideColKeyLenSize)+ukLen wrap around and wrongly pass the check, which would then panic on the trimmed[lo:hi] slice in the implementation.
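The overflow concern can be illustrated with a hypothetical length-prefixed parser. If the bound were computed in uint32, a corrupted prefix near math.MaxUint32 would wrap to a small value and pass the check. Widening to uint64 first makes the comparison exact. `userKeyAfterPrefix`, `wrapsInUint32` and the big-endian length prefix are assumptions for illustration; the real ExtractStreamUserKeyFromMeta is not shown.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

const lenSize = 4 // width of the <userKeyLen(4)> field

// userKeyAfterPrefix returns the user key that follows prefix in key, or nil
// if key is malformed. The bound is checked in uint64, so a corrupted length
// near math.MaxUint32 cannot wrap around and reach the slice expression.
func userKeyAfterPrefix(key []byte, prefix string) []byte {
	if len(key) < len(prefix)+lenSize || string(key[:len(prefix)]) != prefix {
		return nil
	}
	trimmed := key[len(prefix):]
	ukLen := uint64(binary.BigEndian.Uint32(trimmed[:lenSize]))
	if uint64(lenSize)+ukLen > uint64(len(trimmed)) {
		return nil
	}
	return trimmed[lenSize : lenSize+ukLen]
}

// wrapsInUint32 shows the failure mode the uint64 check avoids: the sum
// overflows and becomes smaller than ukLen itself.
func wrapsInUint32(ukLen uint32) bool {
	return uint32(lenSize)+ukLen < ukLen
}

func main() {
	key := append([]byte("!stream|meta|"), 0, 0, 0, 2, 'k', 'v')
	fmt.Printf("%q\n", userKeyAfterPrefix(key, "!stream|meta|")) // "kv"
	fmt.Println(wrapsInUint32(math.MaxUint32 - 1))              // true
}
```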

func ExtractZSetMemberName

func ExtractZSetMemberName(key, userKey []byte) []byte

ExtractZSetMemberName extracts the member name from a zset member key.

func ExtractZSetScoreAndMember

func ExtractZSetScoreAndMember(key, userKey []byte) (score float64, member []byte, ok bool)

ExtractZSetScoreAndMember extracts the score and member name from a zset score index key.

func ExtractZSetUserKeyFromDelta

func ExtractZSetUserKeyFromDelta(key []byte) []byte

ExtractZSetUserKeyFromDelta extracts the logical user key from a zset delta key.

func ExtractZSetUserKeyFromMember

func ExtractZSetUserKeyFromMember(key []byte) []byte

ExtractZSetUserKeyFromMember extracts the logical user key from a zset member key.

func ExtractZSetUserKeyFromMeta

func ExtractZSetUserKeyFromMeta(key []byte) []byte

ExtractZSetUserKeyFromMeta extracts the logical user key from a zset meta key.

func ExtractZSetUserKeyFromScore

func ExtractZSetUserKeyFromScore(key []byte) []byte

ExtractZSetUserKeyFromScore extracts the logical user key from a zset score index key.

func HashFieldKey

func HashFieldKey(userKey, fieldName []byte) []byte

HashFieldKey builds the per-field key for a hash field.

func HashFieldScanPrefix

func HashFieldScanPrefix(userKey []byte) []byte

HashFieldScanPrefix returns the prefix to scan all fields of a hash.

func HashMetaDeltaKey

func HashMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

HashMetaDeltaKey builds the delta key for a hash metadata change.

func HashMetaDeltaScanPrefix

func HashMetaDeltaScanPrefix(userKey []byte) []byte

HashMetaDeltaScanPrefix returns the prefix to scan all delta keys for a hash.

func HashMetaKey

func HashMetaKey(userKey []byte) []byte

HashMetaKey builds the metadata key for a hash.

func IsHashFieldKey

func IsHashFieldKey(key []byte) bool

IsHashFieldKey reports whether the key is a hash field key.

func IsHashMetaDeltaKey

func IsHashMetaDeltaKey(key []byte) bool

IsHashMetaDeltaKey reports whether the key is a hash metadata delta key.

func IsHashMetaKey

func IsHashMetaKey(key []byte) bool

IsHashMetaKey reports whether the key is a hash metadata key.

func IsListClaimKey

func IsListClaimKey(key []byte) bool

IsListClaimKey reports whether the key is a list claim key.

func IsListItemKey

func IsListItemKey(key []byte) bool

func IsListMetaDeltaKey

func IsListMetaDeltaKey(key []byte) bool

IsListMetaDeltaKey reports whether the key is a list metadata delta key.

func IsListMetaKey

func IsListMetaKey(key []byte) bool

IsListMetaKey reports whether the key is a list metadata key. It is one of the helpers exported for use by other packages (e.g., the Redis adapter).

func IsSetMemberKey

func IsSetMemberKey(key []byte) bool

IsSetMemberKey reports whether the key is a set member key.

func IsSetMetaDeltaKey

func IsSetMetaDeltaKey(key []byte) bool

IsSetMetaDeltaKey reports whether the key is a set metadata delta key.

func IsSetMetaKey

func IsSetMetaKey(key []byte) bool

IsSetMetaKey reports whether the key is a set metadata key.

func IsStreamEntryKey

func IsStreamEntryKey(key []byte) bool

IsStreamEntryKey reports whether the key is a stream entry key.

func IsStreamMetaKey

func IsStreamMetaKey(key []byte) bool

IsStreamMetaKey reports whether the key is a stream metadata key.

func IsZSetMemberKey

func IsZSetMemberKey(key []byte) bool

IsZSetMemberKey reports whether the key is a sorted set member key.

func IsZSetMetaDeltaKey

func IsZSetMetaDeltaKey(key []byte) bool

IsZSetMetaDeltaKey reports whether the key is a sorted set metadata delta key.

func IsZSetMetaKey

func IsZSetMetaKey(key []byte) bool

IsZSetMetaKey reports whether the key is a sorted set metadata key.

func IsZSetScoreKey

func IsZSetScoreKey(key []byte) bool

IsZSetScoreKey reports whether the key is a sorted set score index key.

func ListClaimKey

func ListClaimKey(userKey []byte, seq int64) []byte

ListClaimKey builds the claim key for a list item at the given sequence number.

func ListClaimScanPrefix

func ListClaimScanPrefix(userKey []byte) []byte

ListClaimScanPrefix returns the prefix used to scan all claim keys for a userKey.

func ListItemKey

func ListItemKey(userKey []byte, seq int64) []byte

ListItemKey builds the item key for a user key and sequence number.

func ListMetaDeltaKey

func ListMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

ListMetaDeltaKey builds the delta key for a list: prefix + 4-byte userKeyLen + userKey + 8-byte commitTS + 4-byte seqInTxn.

func ListMetaDeltaScanPrefix

func ListMetaDeltaScanPrefix(userKey []byte) []byte

ListMetaDeltaScanPrefix returns the prefix used to scan all delta keys for a userKey.

func ListMetaKey

func ListMetaKey(userKey []byte) []byte

ListMetaKey builds the metadata key for a user key.

func MarshalHashMeta

func MarshalHashMeta(m HashMeta) []byte

MarshalHashMeta encodes HashMeta into a fixed 8-byte binary format.

func MarshalHashMetaDelta

func MarshalHashMetaDelta(d HashMetaDelta) []byte

MarshalHashMetaDelta encodes HashMetaDelta into a fixed 8-byte binary format.

func MarshalListMeta

func MarshalListMeta(meta ListMeta) ([]byte, error)

MarshalListMeta encodes ListMeta into a fixed 24-byte binary format.

func MarshalListMetaDelta

func MarshalListMetaDelta(d ListMetaDelta) []byte

MarshalListMetaDelta encodes a ListMetaDelta into a fixed 16-byte binary format.

func MarshalSetMeta

func MarshalSetMeta(m SetMeta) []byte

MarshalSetMeta encodes SetMeta into a fixed 8-byte binary format.

func MarshalSetMetaDelta

func MarshalSetMetaDelta(d SetMetaDelta) []byte

MarshalSetMetaDelta encodes SetMetaDelta into a fixed 8-byte binary format.

func MarshalStreamMeta

func MarshalStreamMeta(m StreamMeta) ([]byte, error)

MarshalStreamMeta encodes StreamMeta into a fixed 24-byte binary format.

func MarshalZSetMeta

func MarshalZSetMeta(m ZSetMeta) []byte

MarshalZSetMeta encodes ZSetMeta into a fixed 8-byte binary format.

func MarshalZSetMetaDelta

func MarshalZSetMetaDelta(d ZSetMetaDelta) []byte

MarshalZSetMetaDelta encodes ZSetMetaDelta into a fixed 8-byte binary format.

func MarshalZSetScore

func MarshalZSetScore(score float64) []byte

MarshalZSetScore encodes a float64 score in IEEE 754 big-endian format.

func NewWriteConflictError

func NewWriteConflictError(key []byte) error

func PrefixScanEnd

func PrefixScanEnd(prefix []byte) []byte

PrefixScanEnd returns the exclusive end key for a prefix scan. It increments the last byte of the prefix; if overflow occurs (all 0xFF), it returns a nil slice which callers must interpret as "scan to end of keyspace".
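A common way to compute such an exclusive end key is sketched below. It assumes that trailing 0xFF bytes are dropped and the increment carries into the previous byte; the description above only spells out the all-0xFF case. `prefixEnd` is a local illustration, not the package's PrefixScanEnd.

```go
package main

import "fmt"

// prefixEnd returns the smallest key that sorts after every key starting
// with prefix, or nil when prefix is empty or all 0xFF (scan to the end).
func prefixEnd(prefix []byte) []byte {
	end := append([]byte(nil), prefix...) // copy; never mutate the caller's slice
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] < 0xFF {
			end[i]++
			return end[:i+1] // drop the trailing 0xFF bytes that carried
		}
	}
	return nil
}

func main() {
	fmt.Printf("%q\n", prefixEnd([]byte("!hs|fld|"))) // "!hs|fld}"
	fmt.Printf("%q\n", prefixEnd([]byte{'a', 0xFF}))  // "b"
	fmt.Println(prefixEnd([]byte{0xFF, 0xFF}) == nil) // true
}
```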

func SetMemberKey

func SetMemberKey(userKey, member []byte) []byte

SetMemberKey builds the per-member key for a set member.

func SetMemberScanPrefix

func SetMemberScanPrefix(userKey []byte) []byte

SetMemberScanPrefix returns the prefix to scan all members of a set.

func SetMetaDeltaKey

func SetMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

SetMetaDeltaKey builds the delta key for a set metadata change.

func SetMetaDeltaScanPrefix

func SetMetaDeltaScanPrefix(userKey []byte) []byte

SetMetaDeltaScanPrefix returns the prefix to scan all delta keys for a set.

func SetMetaKey

func SetMetaKey(userKey []byte) []byte

SetMetaKey builds the metadata key for a set.

func StreamEntryKey

func StreamEntryKey(userKey []byte, ms, seq uint64) []byte

StreamEntryKey builds the per-entry key for a stream.

func StreamEntryScanPrefix

func StreamEntryScanPrefix(userKey []byte) []byte

StreamEntryScanPrefix returns the common prefix for every entry belonging to userKey, used as the [start, end) prefix for XRANGE / XREAD scans.

func StreamMetaKey

func StreamMetaKey(userKey []byte) []byte

StreamMetaKey builds the per-stream metadata key.

func UnmarshalZSetScore

func UnmarshalZSetScore(b []byte) (float64, error)

UnmarshalZSetScore decodes a float64 score from IEEE 754 big-endian format.

func WriteConflictKey

func WriteConflictKey(err error) ([]byte, bool)

func WriterRegistryFor

func WriterRegistryFor(s MVCCStore) (encryption.WriterRegistryStore, error)

WriterRegistryFor returns an encryption.WriterRegistryStore backed by the Pebble DB underlying the supplied MVCCStore. Returns ErrUnsupportedStoreForWriterRegistry if the store is not a Pebble-backed MVCCStore — callers should treat this as a startup-fatal misconfiguration (the only in-tree non-Pebble MVCCStore is the in-memory test fake, which has no encryption requirements).

func ZSetMemberKey

func ZSetMemberKey(userKey, member []byte) []byte

ZSetMemberKey builds the per-member key storing the score for a sorted set.

func ZSetMemberScanPrefix

func ZSetMemberScanPrefix(userKey []byte) []byte

ZSetMemberScanPrefix returns the prefix to scan all members of a sorted set.

func ZSetMetaDeltaKey

func ZSetMetaDeltaKey(userKey []byte, commitTS uint64, seqInTxn uint32) []byte

ZSetMetaDeltaKey builds the delta key for a sorted set metadata change.

func ZSetMetaDeltaScanPrefix

func ZSetMetaDeltaScanPrefix(userKey []byte) []byte

ZSetMetaDeltaScanPrefix returns the prefix to scan all delta keys for a sorted set.

func ZSetMetaKey

func ZSetMetaKey(userKey []byte) []byte

ZSetMetaKey builds the metadata key for a sorted set.

func ZSetScoreKey

func ZSetScoreKey(userKey []byte, score float64, member []byte) []byte

ZSetScoreKey builds the score index key for a sorted set entry. Layout: !zs|scr|<userKeyLen(4)><userKey><sortableScore(8)><member>

func ZSetScoreRangeScanPrefix

func ZSetScoreRangeScanPrefix(userKey []byte, score float64) []byte

ZSetScoreRangeScanPrefix returns the score-index prefix for userKey positioned at score; callers use it to build the bounds for scanning scores in [minScore, maxScore].

func ZSetScoreScanPrefix

func ZSetScoreScanPrefix(userKey []byte) []byte

ZSetScoreScanPrefix returns the prefix to scan all score index keys for a sorted set.

Types

type ActiveStorageKeyID

type ActiveStorageKeyID func() (uint32, bool)

ActiveStorageKeyID reports the currently-active storage DEK identifier. The bool is false when no storage DEK is active (i.e. the cluster has not run Phase 1 of the §7.1 rollout yet) — in that case the storage layer writes cleartext as if no cipher were configured. Stage 5/6 wires this from the sidecar's Active.Storage slot; Stage 2 takes it as a closure so test code can flip it independently.

type CounterNonceFactory

type CounterNonceFactory struct {
	// contains filtered or unexported fields
}

CounterNonceFactory is a test-only NonceFactory that produces the design §4.1 deterministic nonce shape (`node_id ‖ local_epoch ‖ write_count`) without the writer-registry round-trip Stage 7 brings. Production wiring uses the registry-backed factory; this implementation is only safe for tests where the caller controls every node_id / local_epoch combination.

Exposed (vs. living in a *_test.go file) so the encryption integration tests in other packages can build on the same implementation without re-deriving the byte layout. It is nevertheless test-grade — the doc comment on NonceFactory emphasises that production callers MUST guarantee (node_id, local_epoch, write_count) uniqueness.

func NewCounterNonceFactory

func NewCounterNonceFactory(nodeID, localEpoch uint16) *CounterNonceFactory

NewCounterNonceFactory constructs a CounterNonceFactory pinned to the given (nodeID, localEpoch). write_count starts at 0 and monotonically increments on every Next().

func (*CounterNonceFactory) Next

Next produces the next 12-byte nonce. Layout matches design §4.1:

bytes 0-1   node_id     (big-endian uint16)
bytes 2-3   local_epoch (big-endian uint16)
bytes 4-11  write_count (big-endian uint64)

Big-endian is chosen so a hex dump of consecutive nonces is human-readable as a counter; the AAD does NOT include the nonce bytes (the cipher composes the nonce into AES-GCM directly), so the byte order is internal to the factory.
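The nonce layout above can be sketched with encoding/binary. `counterNonce` is a hypothetical stand-in for CounterNonceFactory that uses a local nonceSize constant of 12 in place of encryption.NonceSize. Like the test-grade factory, it does nothing to guarantee cross-node uniqueness. Refusing to wrap write_count is an assumption about how counter exhaustion should be handled.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"sync"
)

const nonceSize = 12 // AES-GCM nonce size used by the storage envelope

// counterNonce yields node_id(2) || local_epoch(2) || write_count(8), big-endian.
type counterNonce struct {
	mu         sync.Mutex
	nodeID     uint16
	localEpoch uint16
	writeCount uint64
}

var errNonceExhausted = errors.New("write_count exhausted")

// Next returns the next nonce and advances write_count. It refuses to wrap,
// because a repeated nonce under the same key breaks AES-GCM.
func (c *counterNonce) Next() ([nonceSize]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var n [nonceSize]byte
	if c.writeCount == math.MaxUint64 {
		return n, errNonceExhausted
	}
	binary.BigEndian.PutUint16(n[0:2], c.nodeID)
	binary.BigEndian.PutUint16(n[2:4], c.localEpoch)
	binary.BigEndian.PutUint64(n[4:12], c.writeCount)
	c.writeCount++
	return n, nil
}

func main() {
	f := &counterNonce{nodeID: 1, localEpoch: 2}
	a, _ := f.Next()
	b, _ := f.Next()
	fmt.Printf("%x\n%x\n", a, b) // 000100020000000000000000, then ...0001
}
```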

type HashMeta

type HashMeta struct {
	Len int64
}

HashMeta is the base metadata for a hash collection.

func UnmarshalHashMeta

func UnmarshalHashMeta(b []byte) (HashMeta, error)

UnmarshalHashMeta decodes HashMeta from the fixed 8-byte binary format.

type HashMetaDelta

type HashMetaDelta struct {
	LenDelta int64
}

HashMetaDelta holds a signed change in field count.

func UnmarshalHashMetaDelta

func UnmarshalHashMetaDelta(b []byte) (HashMetaDelta, error)

UnmarshalHashMetaDelta decodes HashMetaDelta from the fixed 8-byte binary format.

type HybridClock

type HybridClock interface {
	Now() uint64
}

HybridClock provides monotonically increasing timestamps (HLC).

type KVPair

type KVPair struct {
	Key   []byte
	Value []byte
}

type KVPairMutation

type KVPairMutation struct {
	Op    OpType
	Key   []byte
	Value []byte
	// ExpireAt is an HLC timestamp; 0 means no TTL.
	ExpireAt uint64
}

KVPairMutation is a small helper struct for MVCC mutation application.

type ListMeta

type ListMeta struct {
	Head int64 `json:"h"`
	Tail int64 `json:"t"`
	Len  int64 `json:"l"`
}

func UnmarshalListMeta

func UnmarshalListMeta(b []byte) (ListMeta, error)

UnmarshalListMeta decodes ListMeta from the fixed 24-byte binary format.

type ListMetaDelta

type ListMetaDelta struct {
	HeadDelta int64
	LenDelta  int64
}

ListMetaDelta holds the signed deltas applied by a single PUSH/POP operation.

func UnmarshalListMetaDelta

func UnmarshalListMetaDelta(b []byte) (ListMetaDelta, error)

UnmarshalListMetaDelta decodes a ListMetaDelta from the fixed 16-byte binary format.

type MVCCStore

type MVCCStore interface {
	// GetAt returns the newest version whose commit timestamp is <= ts.
	GetAt(ctx context.Context, key []byte, ts uint64) ([]byte, error)
	// ExistsAt reports whether a visible, non-tombstone version exists at ts.
	ExistsAt(ctx context.Context, key []byte, ts uint64) (bool, error)
	// ScanAt returns versions visible at the given timestamp.
	ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error)
	// ReverseScanAt returns visible versions in descending key order for keys in [start, end).
	ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error)
	// PutAt commits a value at the provided commit timestamp and optional expireAt.
	PutAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error
	// DeleteAt commits a tombstone at the provided commit timestamp.
	DeleteAt(ctx context.Context, key []byte, commitTS uint64) error
	// PutWithTTLAt stores a value with a precomputed expireAt (HLC) at the given commit timestamp.
	PutWithTTLAt(ctx context.Context, key []byte, value []byte, commitTS uint64, expireAt uint64) error
	// ExpireAt sets/renews TTL using a precomputed expireAt (HLC) at the given commit timestamp.
	ExpireAt(ctx context.Context, key []byte, expireAt uint64, commitTS uint64) error
	// LatestCommitTS returns the commit timestamp of the newest version.
	// The boolean reports whether the key has any version.
	LatestCommitTS(ctx context.Context, key []byte) (uint64, bool, error)
	// CommittedVersionAt reports whether a committed version stamped
	// EXACTLY commitTS exists for key. Unlike GetAt (newest version
	// <= ts) this is an exact-timestamp existence check, used by the
	// one-phase transaction idempotency probe to ask "did the previous
	// attempt — which committed at this exact commit_ts — land?". Because
	// commit timestamps are issued by the strictly-monotonic, unique HLC
	// (Clock().Next()), a version at an exact commitTS on a given key can
	// only have come from the transaction that was assigned that timestamp,
	// so an exact hit unambiguously identifies that attempt. A tombstone
	// counts as a landed version (the attempt committed a delete). See
	// docs/design/2026_05_21_proposed_txn_secondary_idempotency.md.
	CommittedVersionAt(ctx context.Context, key []byte, commitTS uint64) (bool, error)
	// ApplyMutations atomically validates and appends the provided mutations.
	// It must return ErrWriteConflict if any mutation key or any read key has
	// a newer commit timestamp than startTS. readKeys carries the transaction's
	// read set for read-write conflict detection; pass nil when no read set
	// validation is needed.
	//
	// Isolation guarantees vary by transaction topology:
	//
	//   Single-shard transactions: readKeys are included in the Raft log entry
	//   and validated atomically under the FSM's applyMu lock alongside
	//   write-write conflict detection. The adapter's pre-Raft validateReadSet
	//   call is kept as a fast-fail optimization but the FSM check is
	//   authoritative. No TOCTOU window; full SSI.
	//
	//   Multi-shard (2PC) write shards: readKeys are included in the
	//   PREPARE Raft entry and validated atomically under the FSM's applyMu
	//   lock. No TOCTOU window; full SSI.
	//
	//   Multi-shard (2PC) read-only shards: validated via a linearizable
	//   read barrier followed by LatestCommitTS outside the FSM lock. A
	//   small TOCTOU window exists between the barrier and the check.
	ApplyMutations(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
	// ApplyMutationsRaft is the raft-apply variant of ApplyMutations. It
	// carries identical MVCC semantics but is governed by the FSM-commit
	// sync-mode knob (ELASTICKV_FSM_SYNC_MODE). Callers MUST only use this
	// when the write is part of a raft-log apply — the raft WAL is the
	// durability backstop that makes an un-fsynced Pebble write safe.
	//
	// Direct (non-raft) callers (catalog bootstrap, admin snapshots,
	// migrations, tests) must use ApplyMutations, which is always
	// pebble.Sync and therefore safe without raft-log replay.
	ApplyMutationsRaft(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS uint64) error
	// ApplyMutationsRaftAt is ApplyMutationsRaft with the raft entry
	// index threaded through. The leaf bundles metaAppliedIndex in the
	// same pebble.Batch as the data mutation so a successful Apply
	// implies LastAppliedIndex >= appliedIndex; the cold-start
	// snapshot-restore skip gate uses this invariant (PR #910 / B2).
	// appliedIndex==0 is treated as "no index, do not bump the meta
	// key", matching ApplyMutationsRaft semantics for callers that have
	// not yet been wired to the raftengine.ApplyIndexAware seam.
	ApplyMutationsRaftAt(ctx context.Context, mutations []*KVPairMutation, readKeys [][]byte, startTS, commitTS, appliedIndex uint64) error
	// DeletePrefixAt atomically deletes all visible (non-tombstone, non-expired)
	// keys matching prefix at commitTS by writing tombstone versions. An empty
	// prefix means "all keys". Keys matching excludePrefix are preserved.
	// No conflict checking is performed; this is intended for bulk operations
	// such as FLUSHALL where the caller knows no conflict check is needed.
	DeletePrefixAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
	// DeletePrefixAtRaft is the raft-apply variant of DeletePrefixAt with
	// the same durability contract as ApplyMutationsRaft.
	DeletePrefixAtRaft(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS uint64) error
	// DeletePrefixAtRaftAt is DeletePrefixAtRaft with the raft entry
	// index threaded through. handleDelPrefix builds an independent
	// pebble.Batch separate from applyMutationsWithOpts; this overload
	// bundles metaAppliedIndex in that batch so DEL_PREFIX entries
	// also advance the meta key. PR #910 design §2 "why both leaves".
	DeletePrefixAtRaftAt(ctx context.Context, prefix []byte, excludePrefix []byte, commitTS, appliedIndex uint64) error
	// LastCommitTS returns the highest commit timestamp applied on this node.
	LastCommitTS() uint64
	// WriteConflictCountsByPrefix returns a snapshot of the MVCC
	// write-conflict counters keyed by "<kind>|<key_prefix>" where
	// kind is "read" or "write" and key_prefix is a bounded
	// classification of the conflicting key. The map is a copy; the
	// caller may mutate it freely. Implementations that do not track
	// conflicts may return an empty (non-nil) map.
	WriteConflictCountsByPrefix() map[string]uint64
	// Compact removes versions older than minTS that are no longer needed.
	Compact(ctx context.Context, minTS uint64) error
	Snapshot() (Snapshot, error)
	Restore(buf io.Reader) error
	Close() error
}

MVCCStore extends Store with multi-version concurrency control helpers. The interface is timestamp-explicit; callers must supply the snapshot or commit timestamp for every operation.
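The conflict rule in the ApplyMutations contract can be sketched against a toy map of latest commit timestamps. Under that rule, a transaction is rejected if any mutation key or read key has a commit newer than startTS. `memStore`, `apply` and `isConflict` are hypothetical. Raft, MVCC version retention and TTLs are omitted. Validation and apply share one lock, which loosely mirrors the single-shard "validated atomically under applyMu" guarantee.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errWriteConflict = errors.New("write conflict") // stands in for ErrWriteConflict

// memStore tracks only the latest commit timestamp per key.
type memStore struct {
	mu     sync.Mutex
	latest map[string]uint64
}

// apply validates write keys and read keys against startTS, then stamps every
// write key with commitTS. Check and write happen under one lock, so there is
// no window between them.
func (s *memStore) apply(writes, reads [][]byte, startTS, commitTS uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, set := range [][][]byte{writes, reads} {
		for _, k := range set {
			if ts, ok := s.latest[string(k)]; ok && ts > startTS {
				return fmt.Errorf("%w: key %q", errWriteConflict, k)
			}
		}
	}
	for _, k := range writes {
		s.latest[string(k)] = commitTS
	}
	return nil
}

func isConflict(err error) bool { return errors.Is(err, errWriteConflict) }

func main() {
	s := &memStore{latest: map[string]uint64{}}
	fmt.Println(s.apply([][]byte{[]byte("a")}, nil, 10, 20)) // <nil>
	// A transaction that started at 15 read "a", which then committed at 20.
	fmt.Println(isConflict(s.apply([][]byte{[]byte("b")}, [][]byte{[]byte("a")}, 15, 30))) // true
}
```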

func NewMVCCStore

func NewMVCCStore(opts ...MVCCStoreOption) MVCCStore

NewMVCCStore creates a new MVCC-enabled in-memory store.

func NewPebbleStore

func NewPebbleStore(dir string, opts ...PebbleStoreOption) (MVCCStore, error)

NewPebbleStore creates a new Pebble-backed MVCC store.

type MVCCStoreOption

type MVCCStoreOption func(*mvccStore)

MVCCStoreOption configures the MVCCStore.

func WithLogger

func WithLogger(l *slog.Logger) MVCCStoreOption

WithLogger sets a custom logger for the store.

type NonceFactory

type NonceFactory interface {
	Next() ([encryption.NonceSize]byte, error)
}

NonceFactory produces unique 12-byte AES-GCM nonces for the storage envelope (§4.1). The factory is responsible for the cluster-wide uniqueness invariant across `(node_id, local_epoch, write_count)` — the storage layer just calls Next() and uses what comes back.

Stage 7 of the encryption rollout will replace the in-tree reference implementation (CounterNonceFactory) with a writer-registry-backed factory that guarantees uniqueness across voters, learners, and historical replicas. The interface stays the same; only the construction changes. Implementations MUST NOT return the same nonce twice under the same DEK — AES-GCM nonce reuse is catastrophic (see encryption.Cipher doc).

type OpType

type OpType int

OpType describes a mutation kind.

const (
	OpTypePut OpType = iota
	OpTypeDelete
)

type PebbleStoreOption

type PebbleStoreOption func(*pebbleStore)

PebbleStoreOption configures the PebbleStore.

func WithEncryption

func WithEncryption(cipher *encryption.Cipher, nf NonceFactory, activeKeyID ActiveStorageKeyID) PebbleStoreOption

WithEncryption configures the pebble-backed store to wrap every committed value in the §4.1 storage envelope.

All three arguments must be non-nil. activeKeyID is called on every Put — when it returns ok=false the store writes cleartext (encryption_state = 0b00) even though a cipher is wired, matching the §7.1 Phase 0 / Phase 1 split where capability is provisioned before activation. Reads that observe encryption_state = 0b01 always go through the cipher regardless of activeKeyID, so a cluster mid-cutover stays readable.

Calling WithEncryption with any nil argument is a no-op (the store stays in legacy cleartext-only mode). This keeps the option backwards-compatible with every existing NewPebbleStore caller and keeps the Stage 2 wiring trivially reversible.
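The "any nil argument is a no-op" rule fits the usual Go functional-option pattern. The sketch below uses hypothetical stand-in types (`cipherStub`, `nonceFactory`, `activeKeyID`) because the real `*encryption.Cipher` and `ActiveStorageKeyID` signatures are not shown here; in particular, the `(uint32, bool)` return of `activeKeyID` is an assumption.

```go
package main

import "fmt"

// Hypothetical stand-ins for *encryption.Cipher, NonceFactory and
// ActiveStorageKeyID; the real signatures may differ.
type cipherStub struct{}

type nonceFactory interface {
	Next() ([12]byte, error)
}

type activeKeyID func() (keyID uint32, ok bool)

type store struct {
	cipher *cipherStub
	nonces nonceFactory
	active activeKeyID
}

type option func(*store)

// withEncryption follows the documented contract: if any argument is nil
// the option does nothing and the store stays cleartext-only.
func withEncryption(c *cipherStub, nf nonceFactory, id activeKeyID) option {
	return func(s *store) {
		// Caveat: a typed nil pointer stored in nf is a non-nil interface
		// value and would slip past this check.
		if c == nil || nf == nil || id == nil {
			return
		}
		s.cipher, s.nonces, s.active = c, nf, id
	}
}

func newStore(opts ...option) *store {
	s := &store{}
	for _, o := range opts {
		o(s)
	}
	return s
}

// stubNonces only satisfies the interface; it is never called here.
type stubNonces struct{}

func (stubNonces) Next() ([12]byte, error) { return [12]byte{}, nil }

func main() {
	keyID := func() (uint32, bool) { return 1, true }
	legacy := newStore(withEncryption(&cipherStub{}, nil, keyID))
	wired := newStore(withEncryption(&cipherStub{}, stubNonces{}, keyID))
	fmt.Println(legacy.cipher == nil, wired.cipher != nil) // true true
}
```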

func WithPebbleLogger

func WithPebbleLogger(l *slog.Logger) PebbleStoreOption

WithPebbleLogger sets a custom logger.

func WithStorageEnvelopeGate

func WithStorageEnvelopeGate(active StorageEnvelopeActive) PebbleStoreOption

WithStorageEnvelopeGate wires the Stage 6D-5 §6.2 cutover gate in front of the existing envelope-emit path. After this option, encryptForKey writes cleartext when active() returns false even if a cipher and active DEK are present — exactly the §7.1 Phase 0 / Phase 1 split described in the parent encryption design doc.

Passing nil is a no-op: the store stays in the pre-6D-5 "encrypt whenever activeKeyID returns a DEK" posture used by existing test fixtures and by production deployments that have not yet run the EnableStorageEnvelope RPC (lands in 6D-6). Operators must thread this option in alongside WithEncryption to actually opt into the cutover semantics.

The gate is consulted on every Put. Reads never consult it: on-disk versions with `encryption_state == 0b01` always go through the cipher regardless of the gate, so a cluster mid-cutover (some versions cleartext, others encrypted) stays readable.

func WithStorageRegistrationGate

func WithStorageRegistrationGate(registered StorageRegistered) PebbleStoreOption

WithStorageRegistrationGate wires the Stage 7a-2 §4.1 registration gate in front of the envelope-emit path on the DIRECT write path only. After this option, when the envelope would encrypt (cipher + active DEK + StorageEnvelopeActive all true) but registered() reports false, the direct path (PutAt / ExpireAt / ApplyMutations) returns ErrWriterNotRegistered instead of emitting a nonce. The FSM-apply path (ApplyMutationsRaft) passes gateRegistration=false into encryptForKey and is never gated — see design §1: replicated apply must stay deterministic and may run before this node's own registration entry commits, so fail-closing it would halt the apply loop (and deadlock a node whose storage entry is ordered before its registration entry).

Passing nil is a no-op: the store keeps the pre-7a-2 posture where the direct path emits envelopes as soon as the cutover gate is open, used by existing fixtures and by deployments that have not wired the writer registry. Operators thread cache.Registered in via main.go.

type RetentionController

type RetentionController interface {
	MinRetainedTS() uint64
	SetMinRetainedTS(ts uint64)
}

RetentionController exposes the minimum timestamp still retained by a store after MVCC compaction. Reads older than this watermark may fail with ErrReadTSCompacted.
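Below is a minimal sketch of a RetentionController together with the read-side check it enables. The type `retention`, the helper `checkReadTS`, and the stand-in `errReadTSCompacted` are hypothetical. Refusing to lower the watermark is a design assumption: once Compact has removed versions, advertising an older watermark would promise reads that can no longer be served.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errReadTSCompacted stands in for the package's ErrReadTSCompacted.
var errReadTSCompacted = errors.New("read timestamp already compacted")

// retention is a hypothetical RetentionController backed by an atomic.
type retention struct {
	min atomic.Uint64
}

func (r *retention) MinRetainedTS() uint64 { return r.min.Load() }

// SetMinRetainedTS only ever raises the watermark (an assumption: versions
// removed by compaction cannot come back).
func (r *retention) SetMinRetainedTS(ts uint64) {
	for {
		cur := r.min.Load()
		if ts <= cur || r.min.CompareAndSwap(cur, ts) {
			return
		}
	}
}

// checkReadTS rejects snapshot reads strictly older than the watermark.
func checkReadTS(r *retention, readTS uint64) error {
	if readTS < r.MinRetainedTS() {
		return errReadTSCompacted
	}
	return nil
}

func main() {
	var r retention
	r.SetMinRetainedTS(100)
	r.SetMinRetainedTS(50) // ignored: would move the watermark backwards
	fmt.Println(r.MinRetainedTS(), checkReadTS(&r, 99), checkReadTS(&r, 100))
}
```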

type SetMeta

type SetMeta struct {
	Len int64
}

SetMeta is the base metadata for a set collection.

func UnmarshalSetMeta

func UnmarshalSetMeta(b []byte) (SetMeta, error)

UnmarshalSetMeta decodes SetMeta from the fixed 8-byte binary format.

type SetMetaDelta

type SetMetaDelta struct {
	LenDelta int64
}

SetMetaDelta holds a signed change in member count.

func UnmarshalSetMetaDelta

func UnmarshalSetMetaDelta(b []byte) (SetMetaDelta, error)

UnmarshalSetMetaDelta decodes SetMetaDelta from the fixed 8-byte binary format.
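SetMeta holds a base Len, and each SetMetaDelta holds a signed change, so a reader can derive the current cardinality by folding uncompacted deltas into the base. The sketch below assumes big-endian two's-complement encoding for both 8-byte values; the docs only state a fixed 8-byte format. `decodeInt64`, `encodeInt64`, and `effectiveLen` are hypothetical helpers.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeInt64 reads one fixed 8-byte field. Big-endian two's complement is
// an assumption; the package docs only promise a fixed 8-byte format.
func decodeInt64(b []byte) (int64, error) {
	if len(b) != 8 {
		return 0, fmt.Errorf("want 8 bytes, got %d", len(b))
	}
	return int64(binary.BigEndian.Uint64(b)), nil
}

func encodeInt64(v int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(v))
	return b
}

// effectiveLen folds uncompacted LenDelta values into the base Len.
func effectiveLen(base []byte, deltas [][]byte) (int64, error) {
	n, err := decodeInt64(base)
	if err != nil {
		return 0, fmt.Errorf("base meta: %w", err)
	}
	for i, d := range deltas {
		delta, err := decodeInt64(d)
		if err != nil {
			return 0, fmt.Errorf("delta %d: %w", i, err)
		}
		n += delta
	}
	return n, nil
}

func main() {
	// Two members added, then one removed, on a set whose base Len is 5.
	n, err := effectiveLen(encodeInt64(5), [][]byte{encodeInt64(2), encodeInt64(-1)})
	fmt.Println(n, err) // 6 <nil>
}
```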

type Snapshot

type Snapshot interface {
	io.WriterTo
	io.Closer
}

Snapshot streams a consistent point-in-time store image to a writer. Implementations may back this with a temp file or an engine-native snapshot.

type StorageEnvelopeActive

type StorageEnvelopeActive func() bool

StorageEnvelopeActive reports whether the §7.1 Phase 1 cutover has fired on this node (the Stage 6D-4 sidecar field of the same name, surfaced to the storage layer via WithStorageEnvelopeGate). A return value of false forces every Put to write cleartext even when a cipher + active DEK are wired; true allows the existing activeStorageKeyID-driven envelope emit to proceed.

The contract intentionally separates "encryption is provisioned" (ActiveStorageKeyID has a DEK to use) from "encryption is activated for new writes" (the cutover has fired). The two signals diverge during the §7.1 Phase 0 → Phase 1 window: every node in the cluster has the DEK installed, but the cluster has not yet quiesced the cutover Raft entry that flips the bit on every replica simultaneously.

Reads ignore this gate — once an on-disk version's encryption_state bit is 0b01, the read path always invokes the cipher to unwrap regardless of the gate's current value. A cluster that flipped from active back to inactive (not a path the design supports) would still decrypt old envelopes correctly.
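The read side therefore dispatches only on the stored encryption_state bits, never on the gate. The sketch below assumes a one-byte header whose low two bits carry encryption_state; that layout, `readValue`, and the `flip` stand-in for the cipher are all hypothetical, and `flip` is not encryption.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	stateCleartext = 0b00
	stateEnvelope  = 0b01
)

// readValue dispatches on the stored encryption_state bits only. The
// one-byte header with the state in its low two bits is a hypothetical
// layout. No cutover gate is consulted, so values written before and
// after the cutover both remain readable.
func readValue(stored []byte, decrypt func([]byte) ([]byte, error)) ([]byte, error) {
	if len(stored) == 0 {
		return nil, errors.New("empty stored value")
	}
	switch state := stored[0] & 0b11; state {
	case stateCleartext:
		return stored[1:], nil
	case stateEnvelope:
		return decrypt(stored[1:])
	default:
		return nil, fmt.Errorf("unsupported encryption_state %#b", state)
	}
}

func main() {
	// flip is a toy stand-in for the cipher; it is NOT encryption.
	flip := func(b []byte) ([]byte, error) {
		out := make([]byte, len(b))
		for i, c := range b {
			out[i] = c ^ 0xFF
		}
		return out, nil
	}
	plain, _ := readValue([]byte{stateCleartext, 'h', 'i'}, flip)
	sealed, _ := readValue([]byte{stateEnvelope, 'h' ^ 0xFF, 'i' ^ 0xFF}, flip)
	fmt.Println(string(plain), string(sealed)) // hi hi
}
```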

type StorageRegistered

type StorageRegistered func() bool

StorageRegistered reports whether this process load's §4.1 writer registration has committed for the currently-active storage DEK (Stage 7a-2). It gates only the DIRECT write path: when it returns false while the envelope would otherwise encrypt, encryptForKey refuses to emit a nonce and returns ErrWriterNotRegistered so the self-originated caller (catalog bootstrap Save, admin snapshot, migration) retries until registration lands rather than emitting an envelope ahead of its writer-registry row.

The FSM-apply path never consults it — see WithStorageRegistrationGate.

type StreamMeta

type StreamMeta struct {
	Length  int64
	LastMs  uint64
	LastSeq uint64
}

StreamMeta is the per-stream metadata. Length is authoritative for XLEN; LastMs/LastSeq track the highest ID ever appended so XADD '*' stays strictly monotonic even after XTRIM removes the current tail.

func UnmarshalStreamMeta

func UnmarshalStreamMeta(b []byte) (StreamMeta, error)

UnmarshalStreamMeta decodes StreamMeta from the fixed 24-byte binary format.

type VersionedValue

type VersionedValue struct {
	TS        uint64
	Value     []byte
	Tombstone bool
	ExpireAt  uint64 // HLC timestamp; 0 means no TTL
}

VersionedValue represents a single committed version in MVCC storage.
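A snapshot read at `readTS` sees the newest version whose TS is at or below `readTS`; a tombstone or an expired TTL on that version makes the key absent rather than exposing an older value. The sketch below assumes versions are sorted newest first and treats `ExpireAt <= readTS` as expired; both are assumptions, and `visibleAt` is a hypothetical helper.

```go
package main

import "fmt"

// versionedValue mirrors the exported VersionedValue fields.
type versionedValue struct {
	TS        uint64
	Value     []byte
	Tombstone bool
	ExpireAt  uint64 // 0 means no TTL
}

// visibleAt returns the value a snapshot read at readTS would see.
// Assumptions: versions are sorted newest first, and a version whose
// ExpireAt is at or before readTS counts as expired.
func visibleAt(versions []versionedValue, readTS uint64) ([]byte, bool) {
	for _, v := range versions {
		if v.TS > readTS {
			continue // committed after the snapshot; invisible
		}
		if v.Tombstone {
			return nil, false
		}
		if v.ExpireAt != 0 && v.ExpireAt <= readTS {
			return nil, false
		}
		return v.Value, true
	}
	return nil, false
}

func main() {
	versions := []versionedValue{
		{TS: 30, Tombstone: true},
		{TS: 20, Value: []byte("b"), ExpireAt: 25},
		{TS: 10, Value: []byte("a")},
	}
	for _, ts := range []uint64{5, 15, 22, 26, 35} {
		v, ok := visibleAt(versions, ts)
		fmt.Printf("ts=%d value=%q ok=%v\n", ts, v, ok)
	}
}
```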

type WriteConflictError

type WriteConflictError struct {
	// contains filtered or unexported fields
}

func (*WriteConflictError) Error

func (e *WriteConflictError) Error() string

func (*WriteConflictError) Unwrap

func (e *WriteConflictError) Unwrap() error

type ZSetMeta

type ZSetMeta struct {
	Len int64
}

ZSetMeta is the base metadata for a sorted set collection.

func UnmarshalZSetMeta

func UnmarshalZSetMeta(b []byte) (ZSetMeta, error)

UnmarshalZSetMeta decodes ZSetMeta from the fixed 8-byte binary format.

type ZSetMetaDelta

type ZSetMetaDelta struct {
	LenDelta int64
}

ZSetMetaDelta holds a signed change in member count.

func UnmarshalZSetMetaDelta

func UnmarshalZSetMetaDelta(b []byte) (ZSetMetaDelta, error)

UnmarshalZSetMetaDelta decodes ZSetMetaDelta from the fixed 8-byte binary format.
