pebbledb

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2025 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
// Key-layout constants and batching/concurrency tunables for the pebbledb
// state store.
const (
	// VersionSize is presumably the width in bytes of an encoded version
	// (an int64 occupies 8 bytes) — TODO confirm against MVCCEncode.
	VersionSize = 8

	// Key layout. PrefixStore is the literal that StorePrefixTpl starts with,
	// and LenPrefixStore is its length in bytes (len("s/k:") == 4); the two
	// must be kept in sync by hand. StorePrefixTpl and HashTpl are fmt-style
	// templates; their placeholders are described in the trailing comments.
	PrefixStore    = "s/k:"
	LenPrefixStore = 4
	StorePrefixTpl = "s/k:%s/"          // s/k:<storeKey>/
	HashTpl        = "s/_hash:%s:%d-%d" // "s/_hash:<storeKey>:%d-%d"

	// TODO: Make configurable
	// NOTE(review): presumably the thresholds at which the import, prune and
	// delete paths commit their pending batch; whether the unit is keys or
	// bytes is not visible here — confirm against the callers.
	ImportCommitBatchSize = 10000
	PruneCommitBatchSize  = 50
	DeleteCommitBatchSize = 50

	// Number of workers to use for hash computation
	HashComputationWorkers = 10
)

Variables

View Source
// MVCCComparer is the pebble.Comparer used for MVCC-encoded (versioned) keys.
// Each callback that needs the user key obtains it via SplitMVCCKey and falls
// back to a conservative result when the input does not split as an MVCC key.
var MVCCComparer = &pebble.Comparer{
	Name:    "ss_pebbledb_comparator",
	Compare: MVCCKeyCompare,

	// Equal is expressed through the same ordering as Compare.
	Equal: func(a, b []byte) bool {
		return MVCCKeyCompare(a, b) == 0
	},

	// AbbreviatedKey abbreviates only the user-key portion; keys that do not
	// split as MVCC keys abbreviate to 0.
	AbbreviatedKey: func(k []byte) uint64 {
		if userKey, _, ok := SplitMVCCKey(k); ok {
			return pebble.DefaultComparer.AbbreviatedKey(userKey)
		}
		return 0
	},

	// Separator appends to dst a key that separates a from b. Whenever no
	// shorter separator can be derived from the user keys, a itself is used.
	Separator: func(dst, a, b []byte) []byte {
		lowKey, _, lowOK := SplitMVCCKey(a)
		if !lowOK {
			return append(dst, a...)
		}

		highKey, _, highOK := SplitMVCCKey(b)
		if !highOK || bytes.Equal(lowKey, highKey) {
			// Either b is not an MVCC key or both share the same user key.
			return append(dst, a...)
		}

		base := len(dst)
		dst = pebble.DefaultComparer.Separator(dst, lowKey, highKey)

		if bytes.Equal(lowKey, dst[base:]) {
			// The default separator is just a's user key again, so discard it
			// and emit the full original key instead.
			return append(dst[:base], a...)
		}

		// A different user key was produced; terminate it with a single zero
		// byte.
		return append(dst, 0)
	},

	// ImmediateSuccessor appends a followed by a single zero byte to dst.
	ImmediateSuccessor: func(dst, a []byte) []byte {
		dst = append(dst, a...)
		return append(dst, 0)
	},

	// Successor mirrors Separator but has no upper bound to respect.
	Successor: func(dst, a []byte) []byte {
		userKey, _, ok := SplitMVCCKey(a)
		if !ok {
			return append(dst, a...)
		}

		base := len(dst)
		dst = pebble.DefaultComparer.Successor(dst, userKey)

		if bytes.Equal(userKey, dst[base:]) {
			// No shorter successor exists; fall back to the original key.
			return append(dst[:base], a...)
		}

		return append(dst, 0)
	},

	FormatKey: func(k []byte) fmt.Formatter {
		return mvccKeyFormatter{key: k}
	},

	// Split reports the length of the prefix: the user key plus one extra
	// byte, or the whole input when it does not split as an MVCC key.
	Split: func(k []byte) int {
		if userKey, _, ok := SplitMVCCKey(k); ok {
			return len(userKey) + 1
		}
		return len(k)
	},
}

MVCCComparer is a PebbleDB Comparer with encoding and decoding routines for MVCC control, used to compare and store versioned keys.

Note: This Comparer implementation is largely based on PebbleDB's internal MVCC example, which can be found here: https://github.com/cockroachdb/pebble/blob/master/cmd/pebble/mvcc.go

Functions

func MVCCEncode

func MVCCEncode(key []byte, version int64) (dst []byte)

MVCCEncode encodes key and version into an MVCC key of the form: <key>\x00[<version>]<#version-bytes>

func MVCCKeyCompare

func MVCCKeyCompare(a, b []byte) int

MVCCKeyCompare compares two MVCC keys.

func SplitMVCCKey

func SplitMVCCKey(mvccKey []byte) (key, version []byte, ok bool)

SplitMVCCKey accepts an MVCC key and returns the "user" key, the MVCC version, and a boolean indicating if the provided key is an MVCC key.

Note: internally, we must make a copy of the provided mvccKey argument, because it typically comes from the Key() method and is therefore not safe to retain or modify.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

func NewBatch

func NewBatch(storage *pebble.DB, version int64) (*Batch, error)

func (*Batch) Delete

func (b *Batch) Delete(storeKey string, key []byte) error

func (*Batch) HardDelete

func (b *Batch) HardDelete(storeKey string, key []byte) error

HardDelete physically removes the key by encoding it with the batch’s version and calling the underlying pebble.Batch.Delete.

func (*Batch) Reset

func (b *Batch) Reset()

func (*Batch) Set

func (b *Batch) Set(storeKey string, key, value []byte) error

func (*Batch) Size

func (b *Batch) Size() int

func (*Batch) Write

func (b *Batch) Write() (err error)

type Database

type Database struct {
	// contains filtered or unexported fields
}

func New

func New(dataDir string, config config.StateStoreConfig) (*Database, error)

func NewWithDB

func NewWithDB(storage *pebble.DB) *Database

func (*Database) ApplyChangeset

func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error

func (*Database) ApplyChangesetAsync

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error

func (*Database) Close

func (db *Database) Close() error

func (*Database) DeleteKeysAtVersion

func (db *Database) DeleteKeysAtVersion(module string, version int64) error

func (*Database) Get

func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byte, error)

func (*Database) GetEarliestVersion

func (db *Database) GetEarliestVersion() (int64, error)

func (*Database) GetLastRangeHashed

func (db *Database) GetLastRangeHashed() (int64, error)

GetLastRangeHashed returns the highest block that has been fully hashed in ranges.

func (*Database) GetLatestMigratedKey

func (db *Database) GetLatestMigratedKey() ([]byte, error)

GetLatestMigratedKey retrieves the latest key processed during migration.

func (*Database) GetLatestMigratedModule

func (db *Database) GetLatestMigratedModule() (string, error)

GetLatestMigratedModule retrieves the latest module processed during migration.

func (*Database) GetLatestVersion

func (db *Database) GetLatestVersion() (int64, error)

func (*Database) Has

func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error)

func (*Database) Import

func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error

Import loads the initial version of the state in parallel with numWorkers goroutines. TODO: Potentially add retries instead of panics.

func (*Database) Iterator

func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error)

func (*Database) Prune

func (db *Database) Prune(version int64) error

Prune attempts to prune all versions up to and including the current version. It gets the range of keys, then manually iterates over them and deletes them. We add a heuristic to skip over a module's keys during pruning if it hasn't been updated since the last time pruning occurred. NOTE: There is a rare case when a module's keys are skipped during pruning even though it has been updated. This occurs when that module's keys are updated in between pruning runs and the node is restarted afterwards. This is not a large issue, given that the next time that module is updated, it will be properly pruned thereafter.

func (*Database) RawImport

func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error

func (*Database) RawIterate

func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error)

RawIterate iterates over all keys and values for a store

func (*Database) ReverseIterator

func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error)

func (*Database) SetEarliestVersion

func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error

func (*Database) SetLastRangeHashed

func (db *Database) SetLastRangeHashed(latestHashed int64) error

func (*Database) SetLatestMigratedKey

func (db *Database) SetLatestMigratedKey(key []byte) error

SetLatestMigratedKey sets the latest key processed during migration.

func (*Database) SetLatestMigratedModule

func (db *Database) SetLatestMigratedModule(module string) error

SetLatestMigratedModule sets the latest module processed during migration.

func (*Database) SetLatestVersion

func (db *Database) SetLatestVersion(version int64) error

func (*Database) WriteBlockRangeHash

func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error

WriteBlockRangeHash writes a hash for a range of blocks to the database

type RawBatch

type RawBatch struct {
	// contains filtered or unexported fields
}

RawBatch is a batch for writing key-value pairs in any order of version.

func NewRawBatch

func NewRawBatch(storage *pebble.DB) (*RawBatch, error)

func (*RawBatch) Delete

func (b *RawBatch) Delete(storeKey string, key []byte, version int64) error

func (*RawBatch) Reset

func (b *RawBatch) Reset()

func (*RawBatch) Set

func (b *RawBatch) Set(storeKey string, key, value []byte, version int64) error

func (*RawBatch) Size

func (b *RawBatch) Size() int

func (*RawBatch) Write

func (b *RawBatch) Write() (err error)

type VersionedChangesets

// VersionedChangesets bundles a slice of named changesets with the single
// version they belong to.
// NOTE(review): presumably the unit of work queued by ApplyChangesetAsync —
// confirm against its implementation.
type VersionedChangesets struct {
	Version    int64
	Changesets []*proto.NamedChangeSet
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL