memdb

package module
Published: Aug 27, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

README

This project is a modified version of rosedblabs/rosedb and includes the modifications listed in the NOTICE file.

What is MemDB?

MemDB is short for Embedded in-Memory Database. It is a lightweight, fast, and reliable KV storage engine based on the Bitcask storage model.

The design of the Bitcask storage model is mainly inspired by log-structured file systems and the merging of log files.

Design Overview

MemDB stores data in write-ahead log (WAL) files: append-only files with a block-based structure.

wal: https://github.com/rosedblabs/wal

Key Features

Strengths

Low read/write latency. This comes from the append-only design of the Bitcask storage model, which takes full advantage of sequential I/O.
High throughput, even with completely unordered data. Data written to MemDB does not need to be sorted on disk; Bitcask's log-structured file design reduces disk head movement during writes.
Handles datasets larger than memory with stable performance. Data access in MemDB is a direct lookup in an in-memory index structure, so lookups stay efficient even when the dataset is very large.
A single disk I/O can retrieve any key-value pair. MemDB's in-memory index points directly to the on-disk location of the data, so reading a value never needs multiple disk seeks, and sometimes needs none at all, thanks to the operating system's file system cache and the WAL's block cache.
Fast, stable performance. A MemDB write needs at most one seek to the tail of the currently open file, followed by an append; the in-memory index is updated afterwards. This flow is independent of how much data the database holds, so performance stays stable.
Fast crash recovery. Crash recovery with MemDB is easy and fast because MemDB files are append-only and written once. Recovery only needs to scan the records and verify their CRCs to ensure the data is consistent.
Simple backups. In most systems backups can be very complicated. MemDB simplifies this through its append-only, write-once on-disk format: any tool that archives or copies files in disk-block order will back up or copy a MemDB database correctly.
Batch operations are atomic, consistent, and durable. New writes in a batch are buffered in memory until the batch is committed. If the batch commits successfully, all writes in it are persisted to disk; if it fails, all of them are discarded. In other words, all writes in a batch either succeed together or fail together.
Forward and reverse iterators. MemDB provides iterators that can traverse forward or backward and can start at any position in the database. An iterator can scan all key-value pairs or only a range of them; it takes position information from the index and then reads the data directly from disk, so iteration is very fast.
Watch support. MemDB supports watching the database: when a key in the DB changes, you receive an event notification.
Key expiration (TTL). MemDB supports setting an expiration time on a key; once it expires, the key is deleted automatically.
Custom ordering. MemDB supports custom key ordering at runtime, without affecting how data is stored.

Weaknesses

All keys must be kept in memory. MemDB always keeps all keys in memory, which means your system must have enough memory to hold all of them.

Quick Start

Basic Operations

package main

import "github.com/hupeh/memdb"

func main() {
	// specify the options
	options := memdb.DefaultOptions
	options.DirPath = "/tmp/memdb_basic"

	// open the database
	db, err := memdb.Open(options)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = db.Close()
	}()

	// put a key-value pair
	err = db.Put([]byte("name"), []byte("memdb"))
	if err != nil {
		panic(err)
	}

	// get a key-value pair
	val, err := db.Get([]byte("name"))
	if err != nil {
		panic(err)
	}
	println(string(val))

	// delete a key-value pair
	err = db.Delete([]byte("name"))
	if err != nil {
		panic(err)
	}
}

Batch Operations

	// create a batch
	batch := db.NewBatch(memdb.DefaultBatchOptions)

	// put a key-value pair
	_ = batch.Put([]byte("name"), []byte("memdb"))

	// get a key-value pair
	val, _ := batch.Get([]byte("name"))
	println(string(val))

	// delete a key-value pair
	_ = batch.Delete([]byte("name"))

	// commit the batch
	_ = batch.Commit()
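
TTL

A minimal sketch of the TTL API (PutWithTTL, TTL, Persist), assuming db has been opened as in Basic Operations and that the time package is imported; errors are ignored here the same way as in the batch example.

	// put a key-value pair that expires after five minutes
	_ = db.PutWithTTL([]byte("session"), []byte("token"), 5*time.Minute)

	// read the remaining ttl of the key
	ttl, _ := db.TTL([]byte("session"))
	println(ttl.String())

	// remove the ttl so the key no longer expires
	_ = db.Persist([]byte("session"))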

The complete code can be found in the examples directory.

Documentation

Index

Constants

const (
	B  = 1
	KB = 1024 * B
	MB = 1024 * KB
	GB = 1024 * MB
)

Variables

var (
	ErrKeyIsEmpty      = errors.New("the key is empty")
	ErrKeyNotFound     = errors.New("key not found in database")
	ErrDatabaseIsUsing = errors.New("the database directory is used by another process")
	ErrReadOnlyBatch   = errors.New("the batch is read only")
	ErrBatchCommitted  = errors.New("the batch is committed")
	ErrBatchRollbacked = errors.New("the batch is rollbacked")
	ErrDBClosed        = errors.New("the database is closed")
	ErrMergeRunning    = errors.New("the merge operation is running")
	ErrWatchDisabled   = errors.New("the watch is disabled")
)
var DefaultBatchOptions = BatchOptions{
	Sync:     true,
	ReadOnly: false,
}
var DefaultIteratorOptions = IteratorOptions{
	Prefix:          nil,
	Reverse:         false,
	ContinueOnError: false,
}
var DefaultOptions = Options{
	DirPath:           tempDBDir(),
	SegmentSize:       1 * GB,
	Sync:              false,
	BytesPerSync:      0,
	WatchQueueSize:    0,
	AutoMergeCronExpr: "",
	LessFunc:          nil,
}

Functions

This section is empty.

Types

type BTree

type BTree struct {
	// contains filtered or unexported fields
}

BTree is a memory-based btree implementation of the Index interface. It is a wrapper around the google/btree package: github.com/google/btree

func (*BTree) Ascend

func (mt *BTree) Ascend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

Ascend iterates over items in ascending order and invokes the handler function for each item. If the handler function returns false, iteration stops.

func (*BTree) AscendGreaterOrEqual

func (mt *BTree) AscendGreaterOrEqual(key []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

AscendGreaterOrEqual iterates in ascending order, starting from key >= given key, invoking handleFn. Stops if handleFn returns false.

func (*BTree) AscendRange

func (mt *BTree) AscendRange(startKey, endKey []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

AscendRange iterates in ascending order within [startKey, endKey], invoking handleFn. Stops if handleFn returns false.

func (*BTree) Delete

func (mt *BTree) Delete(key []byte) (*wal.ChunkPosition, bool)

Delete the index of the key.

func (*BTree) Descend

func (mt *BTree) Descend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

Descend iterates over items in descending order and invokes the handler function for each item. If the handler function returns false, iteration stops.

func (*BTree) DescendLessOrEqual

func (mt *BTree) DescendLessOrEqual(key []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

DescendLessOrEqual iterates in descending order, starting from key <= given key, invoking handleFn. Stops if handleFn returns false.

func (*BTree) DescendRange

func (mt *BTree) DescendRange(startKey, endKey []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))

DescendRange iterates in descending order within [startKey, endKey], invoking handleFn. Stops if handleFn returns false.

func (*BTree) Get

func (mt *BTree) Get(key []byte) *wal.ChunkPosition

Get the position of the key in the index.

func (*BTree) Iterator

func (mt *BTree) Iterator(reverse bool) *BTreeIterator

Iterator returns an index iterator.

func (*BTree) Put

func (mt *BTree) Put(key []byte, position *wal.ChunkPosition) *wal.ChunkPosition

Put key and position into the index.

func (*BTree) Size

func (mt *BTree) Size() int

Size represents the number of keys in the index.

type BTreeIterator

type BTreeIterator struct {
	// contains filtered or unexported fields
}

BTreeIterator represents a B-tree index iterator

func (*BTreeIterator) Close

func (it *BTreeIterator) Close()

Close releases the resources associated with the iterator.

func (*BTreeIterator) Key

func (it *BTreeIterator) Key() []byte

Key returns the key of the current element.

func (*BTreeIterator) Next

func (it *BTreeIterator) Next()

Next moves the cursor to the next element.

func (*BTreeIterator) Rewind

func (it *BTreeIterator) Rewind()

Rewind resets the iterator to its initial position.

func (*BTreeIterator) Seek

func (it *BTreeIterator) Seek(key []byte)

Seek positions the cursor to the element with the specified key.

func (*BTreeIterator) Valid

func (it *BTreeIterator) Valid() bool

Valid checks if the iterator is still valid for reading.

func (*BTreeIterator) Value

func (it *BTreeIterator) Value() *wal.ChunkPosition

Value returns the value (chunk position) of the current element.

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch represents a set of batch operations on the database. If readonly is true, you can only get data from the batch with the Get method; an error will be returned if you try to use the Put or Delete methods.

If readonly is false, you can use the Put and Delete methods to write data to the batch. The data will be written to the database permanently after you call the Commit method.

NB: There can be only one write batch and multiple read-only batches at the same time, and DB methods must not be used before the batch is committed or rolled back. So a typical usage of Batch is like:

	batch := db.NewBatch(memdb.DefaultBatchOptions)
	batch.Put/batch.Get (and other methods)
	/* 1. a new write batch is not allowed */
	/* 2. invoking DB methods is not allowed, like db.Put */
	batch.Commit() or batch.Rollback()

Batch is not a transaction; it does not guarantee isolation. But it can guarantee atomicity, consistency, and durability (if the Sync option is true).

You must call the Commit or Rollback method after using the batch, otherwise the DB will remain locked in an unexpected way.
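
A minimal sketch of this pattern with error handling, assuming db is an open *DB inside a function that returns an error; the keys are illustrative only.

	batch := db.NewBatch(memdb.DefaultBatchOptions)
	if err := batch.Put([]byte("k1"), []byte("v1")); err != nil {
		// discard the buffered writes and release the lock on failure
		_ = batch.Rollback()
		return err
	}
	// persist all buffered writes atomically
	return batch.Commit()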

func (*Batch) Commit

func (b *Batch) Commit() error

Commit commits the batch. If the batch is read-only or empty, it will return directly.

It will iterate the pendingWrites and write the data to the database, then write a record to indicate the end of the batch to guarantee atomicity. Finally, it will write the index.

func (*Batch) Delete

func (b *Batch) Delete(key []byte) error

Delete marks a key for deletion in the batch.

func (*Batch) Exist

func (b *Batch) Exist(key []byte) (bool, error)

Exist checks if the key exists in the database.

func (*Batch) Expire

func (b *Batch) Expire(key []byte, ttl time.Duration) error

Expire sets the ttl of the key.

func (*Batch) Get

func (b *Batch) Get(key []byte) ([]byte, error)

Get retrieves the value associated with a given key from the batch.

func (*Batch) Persist

func (b *Batch) Persist(key []byte) error

Persist removes the ttl of the key.

func (*Batch) Put

func (b *Batch) Put(key []byte, value []byte) error

Put adds a key-value pair to the batch for writing.

func (*Batch) PutWithTTL

func (b *Batch) PutWithTTL(key []byte, value []byte, ttl time.Duration) error

PutWithTTL adds a key-value pair with ttl to the batch for writing.

func (*Batch) Rollback

func (b *Batch) Rollback() error

Rollback discards an uncommitted batch instance. The discard operation will clear the buffered data and release the lock.

func (*Batch) TTL

func (b *Batch) TTL(key []byte) (time.Duration, error)

TTL returns the ttl of the key.

type BatchOptions

type BatchOptions struct {
	// Sync has the same semantics as Options.Sync.
	Sync bool
	// ReadOnly specifies whether the batch is read only.
	ReadOnly bool
}

BatchOptions specifies the options for creating a batch.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB represents a MemDB database instance. It is built on the bitcask model, which is a log-structured storage. It uses a WAL to write data, and uses an in-memory index to store the key and the position of the data in the WAL; the index is rebuilt when the database is opened.

The main advantage of MemDB is that it is very fast to write, read, and delete data, because it only needs one disk IO to complete a single operation.

But since all keys and their positions (the index) must be stored in memory, the total data size is limited by the memory size.

So if your memory can hold almost all of the keys, MemDB is the perfect storage engine for you.

func Open

func Open(options Options) (*DB, error)

Open a database with the specified options. If the database directory does not exist, it will be created automatically.

Multiple processes can not use the same database directory at the same time, otherwise it will return ErrDatabaseIsUsing.

It will open the wal files in the database directory and load the index from them. Return the DB instance, or an error if any.

func (*DB) Ascend

func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error))

Ascend calls handleFn for each key/value pair in the db in ascending order.

func (*DB) AscendGreaterOrEqual

func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error))

AscendGreaterOrEqual calls handleFn for each key/value pair in the db with keys greater than or equal to the given key.

func (*DB) AscendKeys

func (db *DB) AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) error

AscendKeys calls handleFn for each key in the db in ascending order. Since the expiry time is stored in the value, set the filterExpired parameter to true if you want to filter out expired keys; this affects performance, because the value of each key must be read to determine whether it has expired.
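
For illustration, a sketch that collects all non-expired keys, assuming db is an open *DB; a nil pattern is passed here to avoid assuming any particular pattern syntax.

	var keys [][]byte
	err := db.AscendKeys(nil, true, func(k []byte) (bool, error) {
		// copy the key so it can safely outlive the callback (a conservative assumption)
		keys = append(keys, append([]byte(nil), k...))
		return true, nil // keep iterating
	})
	if err != nil {
		panic(err)
	}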

func (*DB) AscendKeysRange

func (db *DB) AscendKeysRange(startKey, endKey, pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) error

AscendKeysRange calls handleFn for keys within a range in the db in ascending order. Since the expiry time is stored in the value, set the filterExpired parameter to true if you want to filter out expired keys; this affects performance, because the value of each key must be read to determine whether it has expired.

func (*DB) AscendRange

func (db *DB) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error))

AscendRange calls handleFn for each key/value pair in the db within the range [startKey, endKey] in ascending order.
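
A sketch of a bounded ascending scan, assuming db is an open *DB; the key range is purely illustrative.

	db.AscendRange([]byte("user:000"), []byte("user:999"), func(k, v []byte) (bool, error) {
		println(string(k), "=", string(v))
		// returning false would stop the iteration early
		return true, nil
	})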

func (*DB) Close

func (db *DB) Close() error

Close the database, close all data files, and release the file lock. Set the closed flag to true. The DB instance cannot be used after closing.

func (*DB) Delete

func (db *DB) Delete(key []byte) error

Delete the specified key from the database. Actually, it will open a new batch and commit it. You can think of it as a batch with only one Delete operation.

func (*DB) DeleteExpiredKeys

func (db *DB) DeleteExpiredKeys(timeout time.Duration) error

DeleteExpiredKeys scans the entire index in ascending order to delete expired keys. It is a time-consuming operation, so we need to specify a timeout to prevent the DB from being unavailable for a long time.
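
A sketch of a cleanup call, assuming db is an open *DB and the time package is imported; the one-second budget is an arbitrary choice.

	// give the cleanup at most one second so the DB does not stall for long
	if err := db.DeleteExpiredKeys(time.Second); err != nil {
		panic(err)
	}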

func (*DB) Descend

func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error))

Descend calls handleFn for each key/value pair in the db in descending order.

func (*DB) DescendKeys

func (db *DB) DescendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) error

DescendKeys calls handleFn for each key in the db in descending order. Since the expiry time is stored in the value, set the filterExpired parameter to true if you want to filter out expired keys; this affects performance, because the value of each key must be read to determine whether it has expired.

func (*DB) DescendKeysRange

func (db *DB) DescendKeysRange(startKey, endKey, pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) error

DescendKeysRange calls handleFn for keys within a range in the db in descending order. Since the expiry time is stored in the value, set the filterExpired parameter to true if you want to filter out expired keys; this affects performance, because the value of each key must be read to determine whether it has expired.

func (*DB) DescendLessOrEqual

func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error))

DescendLessOrEqual calls handleFn for each key/value pair in the db with keys less than or equal to the given key.

func (*DB) DescendRange

func (db *DB) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error))

DescendRange calls handleFn for each key/value pair in the db within the range [startKey, endKey] in descending order.

func (*DB) Exist

func (db *DB) Exist(key []byte) (bool, error)

Exist checks if the specified key exists in the database. Actually, it will open a new batch and commit it. You can think of it as a batch with only one Exist operation.

func (*DB) Expire

func (db *DB) Expire(key []byte, ttl time.Duration) error

Expire sets the ttl of the key.

func (*DB) Get

func (db *DB) Get(key []byte) ([]byte, error)

Get the value of the specified key from the database. Actually, it will open a new batch and commit it. You can think of it as a batch with only one Get operation.

func (*DB) Merge

func (db *DB) Merge(reopenAfterDone bool) error

Merge merges all the data files in the database. It will iterate all the data files, find the valid data, and rewrite the data to the new data file.

A merge may be a very time-consuming operation when the database is large, so it is recommended to perform it when the database is idle.

If reopenAfterDone is true, the original file will be replaced by the merge file, and db's index will be rebuilt after the merge completes.
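
A sketch of an explicit merge, assuming db is an open *DB and the database is currently idle.

	// rewrite only the valid data into new files and rebuild the index afterwards
	if err := db.Merge(true); err != nil {
		panic(err)
	}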

func (*DB) NewBatch

func (db *DB) NewBatch(options BatchOptions) *Batch

NewBatch creates a new Batch instance.

func (*DB) NewIterator

func (db *DB) NewIterator(opts IteratorOptions) *Iterator

NewIterator initializes and returns a new database iterator with the specified options. The iterator is automatically positioned at the first valid entry.
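
A minimal iteration sketch using the default options, assuming db is an open *DB.

	it := db.NewIterator(memdb.DefaultIteratorOptions)
	defer it.Close()

	// the iterator starts at the first valid entry, so no Rewind is needed here
	for ; it.Valid(); it.Next() {
		item := it.Item()
		println(string(item.Key), "=", string(item.Value))
	}
	if err := it.Err(); err != nil {
		panic(err)
	}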

func (*DB) Persist

func (db *DB) Persist(key []byte) error

Persist removes the ttl of the key. If the key does not exist or has expired, it will return ErrKeyNotFound.

func (*DB) Put

func (db *DB) Put(key []byte, value []byte) error

Put a key-value pair into the database. Actually, it will open a new batch and commit it. You can think of it as a batch with only one Put operation.

func (*DB) PutWithTTL

func (db *DB) PutWithTTL(key []byte, value []byte, ttl time.Duration) error

PutWithTTL puts a key-value pair into the database with a ttl. Actually, it will open a new batch and commit it. You can think of it as a batch with only one PutWithTTL operation.

func (*DB) Stat

func (db *DB) Stat() *Stat

Stat returns the statistics of the database.
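
A sketch of reading the statistics, assuming db is an open *DB.

	stat := db.Stat()
	println("keys:", stat.KeysNum)
	println("disk size (bytes):", stat.DiskSize)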

func (*DB) Sync

func (db *DB) Sync() error

Sync all data files to the underlying storage.

func (*DB) TTL

func (db *DB) TTL(key []byte) (time.Duration, error)

TTL gets the ttl of the key.

func (*DB) Watch

func (db *DB) Watch() (<-chan *Event, error)
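
A usage sketch for Watch, based on the Event and WatchActionType definitions below; it assumes the database was opened with Options.WatchQueueSize greater than zero (otherwise ErrWatchDisabled would be expected).

	// events is a receive-only channel of change notifications
	events, err := db.Watch()
	if err != nil {
		panic(err)
	}
	go func() {
		for event := range events {
			switch event.Action {
			case memdb.WatchActionPut:
				println("put:", string(event.Key))
			case memdb.WatchActionDelete:
				println("delete:", string(event.Key))
			}
		}
	}()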

type Event

type Event struct {
	Action  WatchActionType
	Key     []byte
	Value   []byte
	BatchId uint64
}

Event is the event that occurs when the database is modified. It is used to notify watchers of changes to the database.

type IndexRecord

type IndexRecord struct {
	// contains filtered or unexported fields
}

IndexRecord is the index record of the key. It contains the key, the record type, and the position of the record in the WAL. It is only used at startup to rebuild the index.

type Item

type Item struct {
	Key   []byte
	Value []byte
}

Item represents a key-value pair in the database.

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

Iterator represents a database-level iterator that provides methods to traverse over the key/value pairs in the database. It wraps the index iterator and adds functionality to retrieve the actual values from the database.

func (*Iterator) Close

func (it *Iterator) Close()

Close releases all resources associated with the iterator.

func (*Iterator) Err

func (it *Iterator) Err() error

Err returns the last error encountered during iteration.

func (*Iterator) Item

func (it *Iterator) Item() *Item

Item retrieves the current key-value pair as an Item.

func (*Iterator) Next

func (it *Iterator) Next()

Next advances the iterator to the next valid entry in the database.

func (*Iterator) Rewind

func (it *Iterator) Rewind()

Rewind repositions the iterator to its initial state based on the iteration order. After repositioning, it automatically skips any invalid or expired entries.

func (*Iterator) Seek

func (it *Iterator) Seek(key []byte)

Seek positions the iterator at a specific key in the database. After seeking, it automatically skips any invalid or expired entries.

func (*Iterator) Valid

func (it *Iterator) Valid() bool

Valid checks if the iterator is currently positioned at a valid entry.

type IteratorOptions

type IteratorOptions struct {
	// Prefix specifies a key prefix for filtering. If set, the iterator will only
	// traverse keys that start with this prefix. Default is empty (no filtering).
	Prefix []byte

	// Reverse determines the traversal order. If true, the iterator will traverse
	// in descending order. Default is false (ascending order).
	Reverse bool

	// ContinueOnError determines how the iterator handles errors during iteration.
	// If true, the iterator will log errors and continue to the next entry.
	// If false, the iterator will stop and become invalid when an error occurs.
	ContinueOnError bool
}

IteratorOptions defines configuration options for creating a new iterator.
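
A configuration sketch, assuming db is an open *DB; the prefix value is illustrative.

	opts := memdb.IteratorOptions{
		Prefix:          []byte("user:"), // only traverse keys starting with "user:"
		Reverse:         true,            // descending order
		ContinueOnError: true,            // log errors and keep iterating
	}
	it := db.NewIterator(opts)
	defer it.Close()
	for ; it.Valid(); it.Next() {
		println(string(it.Item().Key))
	}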

type LogRecord

type LogRecord struct {
	Key     []byte
	Value   []byte
	Type    LogRecordType
	BatchId uint64
	Expire  int64
}

LogRecord is the log record of the key/value pair. It contains the key, the value, the record type, and the batch id. It will be encoded to a byte slice and written to the WAL.

func (*LogRecord) IsExpired

func (lr *LogRecord) IsExpired(now int64) bool

IsExpired checks whether the log record is expired.

type LogRecordType

type LogRecordType = byte

LogRecordType is the type of the log record.

const (
	// LogRecordNormal is the normal log record type.
	LogRecordNormal LogRecordType = iota
	// LogRecordDeleted is the deleted log record type.
	LogRecordDeleted
	// LogRecordBatchFinished is the batch finished log record type.
	LogRecordBatchFinished
)

type Options

type Options struct {
	// DirPath specifies the directory path where the WAL segment files will be stored.
	DirPath string

	// SegmentSize specifies the maximum size of each segment file in bytes.
	SegmentSize int64

	// Sync is whether to synchronize writes through os buffer cache and down onto the actual disk.
	// Setting sync is required for durability of a single write operation, but also results in slower writes.
	//
	// If false, and the machine crashes, then some recent writes may be lost.
	// Note that if it is just the process that crashes (machine does not) then no writes will be lost.
	//
	// In other words, Sync being false has the same semantics as a write
	// system call. Sync being true means write followed by fsync.
	Sync bool

	// BytesPerSync specifies the number of bytes to write before calling fsync.
	BytesPerSync uint32

	// WatchQueueSize the cache length of the watch queue.
	// If the size is greater than 0, the watch feature is enabled.
	WatchQueueSize uint64

	// AutoMergeCronExpr enables automatic merging.
	// An auto merge is triggered whenever the cron expression is satisfied.
	// The expression follows the standard cron format,
	// e.g. "0 0 * * *" means merge at 00:00:00 every day.
	// An optional seconds field is also supported; with it enabled, the
	// expression looks like "0/10 * * * * *" (every 10 seconds).
	// When auto merge is enabled, the db will be closed and reopened after the merge is done.
	// Do not set this schedule too frequently, as it will affect performance.
	// Refer to https://en.wikipedia.org/wiki/Cron
	AutoMergeCronExpr string

	// LessFunc is used for custom index sorting
	LessFunc func(key1, key2 []byte) bool
}

Options specifies the options for opening a database.
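
A configuration sketch based on the fields above, assuming the bytes package is imported; the specific values are illustrative assumptions, not recommendations.

	options := memdb.DefaultOptions
	options.DirPath = "/tmp/memdb_custom"   // hypothetical directory
	options.Sync = true                     // fsync after every write for durability
	options.WatchQueueSize = 1024           // > 0 enables the watch feature
	options.AutoMergeCronExpr = "0 0 * * *" // merge at 00:00:00 every day
	// custom index ordering: descending lexicographic order
	options.LessFunc = func(key1, key2 []byte) bool {
		return bytes.Compare(key1, key2) > 0
	}
	db, err := memdb.Open(options)
	if err != nil {
		panic(err)
	}
	defer db.Close()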

type Stat

type Stat struct {
	// Total number of keys
	KeysNum int
	// Total disk size of database directory
	DiskSize int64
}

Stat represents the statistics of the database.

type WatchActionType

type WatchActionType = byte
const (
	WatchActionPut WatchActionType = iota
	WatchActionDelete
)

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher temporarily stores event information as it is generated, until it is synchronized to the DB's watch channel.

If the queue overflows, the oldest event is removed, even if it hasn't been read yet.

func NewWatcher

func NewWatcher(capacity uint64) *Watcher

Directories

Path Synopsis
examples
basic command
batch command
iterate command
merge command
ttl command
watch command
