Documentation ¶
Overview ¶
Package hashdb implements HashDB, an mmap-backed hash index with a slab value log.
The recommended entrypoints are Open and OpenWithShards, which return *HashDB: a thread-safe sharded store (formerly "gomap_distributed").
For single-shard usage (not thread-safe), open DB directly or use OpenSingle.
HashDB acquires an exclusive process lock on the database directory. If the directory is already open in another process, Open/OpenWithShards/OpenSingle return ErrLocked.
Note on durability: HashDB is tuned for performance and uses an append-only slab value log.
- Put/Delete are not guaranteed durable (no fsync).
- PutSync/DeleteSync fsync the slab value log so the operation survives a crash/power loss.
- ApplyBatch applies a batch atomically in-process; ApplyBatchSync is the durable, crash-atomic variant.
- Export/Restore provide a simple snapshot mechanism built on ForEach + ApplyBatchSync. Snapshot iteration order is arbitrary.
The mmap index files are treated as a derived cache; after an unclean shutdown HashDB rebuilds the index by scanning the slab log (and truncates torn tail records).
Note on sharded caching: The sharded *HashDB implementation uses a per-shard write-back cache. By default the cache has no WAL (pending writes are volatile until flushed). For advanced usage, a per-shard cache WAL can be enabled via OpenWithOptions/OpenWithShardsAndOptions.
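A minimal sketch of these durability tiers, assuming the `hashdb` import alias used in the Open example below (`dir` is an existing directory; error handling elided):

db, err := hashdb.Open(dir) // sharded, thread-safe store
if err != nil {
    panic(err)
}
defer db.Close()

// Fast path: buffered writes, not guaranteed durable on crash/power loss.
_ = db.Put([]byte("k1"), []byte("v1"))
_ = db.Delete([]byte("old"))

// Durable path: fsyncs the slab value log before returning.
_ = db.PutSync([]byte("k2"), []byte("v2"))

// Flush shard write-back caches before exit or before reopening the store.
_ = db.Flush()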
Index ¶
- Constants
- Variables
- type Batch
- type BatchOp
- type BatchOpType
- type BatchWriter
- type CacheKV
- type CacheWALFsyncPolicy
- type CacheWALOptions
- type CachedDB
- func (c *CachedDB) Add(key []byte, value []byte) error
- func (c *CachedDB) AddMany(items []Item) error
- func (c *CachedDB) ApplyBatch(ops []BatchOp) error
- func (c *CachedDB) ApplyBatchSync(ops []BatchOp) error
- func (c *CachedDB) Clear() error
- func (c *CachedDB) Close() error
- func (c *CachedDB) Compact() error
- func (c *CachedDB) Delete(key []byte) error
- func (c *CachedDB) DeleteSync(key []byte) error
- func (c *CachedDB) Flush() error
- func (c *CachedDB) Get(key []byte) ([]byte, error)
- func (c *CachedDB) Has(key []byte) (bool, error)
- func (c *CachedDB) NewBatch() *Batch
- func (c *CachedDB) Put(key []byte, value []byte) error
- func (c *CachedDB) PutMany(items []Item) error
- func (c *CachedDB) PutSync(key []byte, value []byte) error
- func (c *CachedDB) SetCompression(enabled bool)
- func (c *CachedDB) Stats() Stats
- func (c *CachedDB) Sync() error
- func (c *CachedDB) Update(key []byte, callback func([]byte) ([]byte, error)) error
- type CachedDBOptions
- type CachedHashmap
- type DB
- func (h *DB) Add(key []byte, value []byte) error
- func (h *DB) AddMany(items []Item) error
- func (h *DB) ApplyBatch(ops []BatchOp) error
- func (h *DB) ApplyBatchSync(ops []BatchOp) error
- func (h *DB) Clear() error
- func (h *DB) Close() error
- func (h *DB) Compact() error
- func (h *DB) Delete(key []byte) error
- func (h *DB) DeleteSync(key []byte) error
- func (h *DB) Export(w io.Writer) error
- func (h *DB) ForEach(fn func(key, value []byte) error) error
- func (h *DB) Get(key []byte) ([]byte, error)
- func (h *DB) Has(key []byte) (bool, error)
- func (h *DB) New(folder string) error
- func (h *DB) NewBatch() *Batch
- func (h *DB) Open(folder string) error
- func (h *DB) Put(key []byte, value []byte) error
- func (h *DB) PutMany(items []Item) error
- func (h *DB) PutSync(key []byte, value []byte) error
- func (h *DB) ReadBytes(offset SlabOffset, n int64) ([]byte, error)
- func (h *DB) Recover() error
- func (h *DB) Restore(r io.Reader) error
- func (h *DB) SetCompression(enabled bool)
- func (h *DB) SetIndexMemoryPolicy(policy IndexMemoryPolicy)
- func (h *DB) SetResizeThreshold(percent uint64)
- func (h *DB) Stats() Stats
- func (h *DB) Sync() error
- func (h *DB) Update(key []byte, callback func([]byte) ([]byte, error)) error
- type Hash
- type HashDB
- func (h *HashDB) Add(key []byte, value []byte) error
- func (h *HashDB) AddMany(items []Item) error
- func (h *HashDB) ApplyBatch(ops []BatchOp) error
- func (h *HashDB) ApplyBatchSync(ops []BatchOp) error
- func (h *HashDB) Clear() error
- func (h *HashDB) Close() error
- func (h *HashDB) Compact() error
- func (h *HashDB) Delete(key []byte) error
- func (h *HashDB) DeleteSync(key []byte) error
- func (h *HashDB) Export(w io.Writer) error
- func (h *HashDB) Flush() error
- func (h *HashDB) ForEach(fn func(key, value []byte) error) error
- func (h *HashDB) Get(key []byte) ([]byte, error)
- func (h *HashDB) GetMany(keys [][]byte) ([][]byte, []error)
- func (h *HashDB) Has(key []byte) (bool, error)
- func (h *HashDB) New(folder string) error
- func (h *HashDB) NewBatch() *Batch
- func (h *HashDB) NewWithShards(folder string, numShards int) (err error)
- func (h *HashDB) NewWithShardsAndOptions(folder string, numShards int, opts HashDBOptions) (err error)
- func (h *HashDB) Open(folder string) error
- func (h *HashDB) OpenWithShards(folder string, numShards int) error
- func (h *HashDB) Put(key []byte, value []byte) error
- func (h *HashDB) PutMany(items []Item) error
- func (h *HashDB) PutSync(key []byte, value []byte) error
- func (h *HashDB) Restore(r io.Reader) error
- func (h *HashDB) SetCompression(enabled bool)
- func (h *HashDB) Stats() Stats
- func (h *HashDB) Sync() error
- func (h *HashDB) Update(key []byte, callback func([]byte) ([]byte, error)) error
- type HashDBOptions
- type Hashmap
- type HashmapDistributed
- type IndexMemoryPolicy
- type Item
- type KVStore
- type Key
- type ShardedDB
- type SlabOffset
- type Stats
Examples ¶
- Open
- DB.Export
- HashDB.Export
Constants ¶
const (
    // SegmentBits encodes the slab segment ID width within a SlabOffset.
    SegmentBits = 16
    // OffsetBits encodes the byte-offset width within a SlabOffset.
    OffsetBits = 48
)
const DefaultCapacity uint64 = 32 * 1024
DefaultCapacity is used when no capacity metadata exists on disk.
const FlagCompressed = 0x80
FlagCompressed marks slab records with s2-compressed payloads.
const (
// FlagControl marks slab records used for batch begin/commit markers.
FlagControl = 0x40
)
Variables ¶
var DefaultIndexMemoryPolicy = IndexMemoryPolicy{
    LockControls:       true,
    LockControlsStrict: false,
    AdviseKeysWillNeed: true,
    AdviseKeysRandom:   true,
}
DefaultIndexMemoryPolicy is the default memory pinning/advice configuration.
var (
    // ErrLocked indicates the database directory is already opened by another process.
    ErrLocked = lockfile.ErrLocked
)
var ErrSnapshotCorrupt = errors.New("hashdb: snapshot corrupt")
ErrSnapshotCorrupt indicates a snapshot stream failed validation.
var MaxSegmentSize int64 = 64 * 1024 * 1024 // 64MB
MaxSegmentSize controls the maximum bytes per slab segment.
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch buffers mutations and applies them via ApplyBatch / ApplyBatchSync.
Notes:
- For the sharded HashDB type, commits are atomic per shard but not across shards.
- Keys and values are copied into the batch so callers may reuse buffers safely.
func (*Batch) CommitSync ¶
CommitSync applies the batch and forces durability.
type BatchOp ¶
type BatchOp struct {
Type BatchOpType
Key []byte
Value []byte // only for BatchOpPut
}
BatchOp is a single mutation applied by ApplyBatch/ApplyBatchSync.
type BatchOpType ¶
type BatchOpType uint8
BatchOpType is the type of an operation in a batch.
const (
    // BatchOpPut represents a put/update operation in a batch.
    BatchOpPut BatchOpType = iota
    // BatchOpDelete represents a delete operation in a batch.
    BatchOpDelete
)
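As an illustration, a hedged sketch that builds a slice of operations and commits it durably via ApplyBatchSync (`db` is an open store; error handling elided):

ops := []hashdb.BatchOp{
    {Type: hashdb.BatchOpPut, Key: []byte("user:1"), Value: []byte("alice")},
    {Type: hashdb.BatchOpPut, Key: []byte("user:2"), Value: []byte("bob")},
    {Type: hashdb.BatchOpDelete, Key: []byte("user:0")}, // Value is unused for deletes
}
// Durable, crash-atomic commit (atomic per shard on the sharded *HashDB).
if err := db.ApplyBatchSync(ops); err != nil {
    panic(err)
}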
type BatchWriter ¶
type BatchWriter struct {
// contains filtered or unexported fields
}
BatchWriter buffers writes and flushes them to the underlying HashDB using PutMany to reduce syscall and hashing overhead.
Note: keys are copied into an internal arena so callers may safely reuse key buffers between Add() calls (common in hot loops). Values are not copied; the caller must not mutate the value slice until it has been flushed (explicitly via Flush() or implicitly by reaching the limit).
func NewBatchWriter ¶
func NewBatchWriter(store *HashDB, limit int) *BatchWriter
NewBatchWriter creates a new BatchWriter with the given flush threshold. A zero or negative limit defaults to 1024 items.
func (*BatchWriter) Add ¶
func (b *BatchWriter) Add(key, value []byte) error
Add buffers a key/value. It flushes automatically when the buffer reaches the limit.
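A short usage sketch (`store` is an open *HashDB; the trailing Flush call follows the buffer-reuse note above, and its exact signature is an assumption here):

bw := hashdb.NewBatchWriter(store, 4096) // flush every 4096 buffered items
for i := 0; i < 10_000; i++ {
    key := []byte(fmt.Sprintf("key-%d", i))
    val := []byte(fmt.Sprintf("val-%d", i)) // fresh slice per Add: values are not copied
    if err := bw.Add(key, val); err != nil {
        panic(err)
    }
}
if err := bw.Flush(); err != nil { // flush the remainder below the limit
    panic(err)
}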
type CacheKV ¶
type CacheKV struct {
// contains filtered or unexported fields
}
CacheKV buffers writes and flushes them to the underlying store. WARNING: by default there is no WAL; pending writes are lost on crash before flush.
func NewCacheKV ¶
NewCacheKV wraps a KVStore with a write-back cache. flushInterval <=0 disables timer flushes.
func NewCacheKVWithWAL ¶
func NewCacheKVWithWAL(backend KVStore, maxEntries, maxBytes int, flushInterval time.Duration, walPath string, walOpts CacheWALOptions) (*CacheKV, error)
NewCacheKVWithWAL wraps a KVStore with a write-back cache and an optional WAL. When WAL is enabled, pending writes can be recovered after a crash (depending on fsync policy).
func (*CacheKV) Close ¶
Close flushes remaining writes, stops the flush loop and background workers, and closes the cache WAL if present.
func (*CacheKV) Flush ¶
Flush writes queued cache entries to the backend.
type CacheWALFsyncPolicy ¶
type CacheWALFsyncPolicy uint8
CacheWALFsyncPolicy controls fsync behavior for the cache WAL.
const (
    // CacheWALDisabled disables the cache WAL entirely.
    CacheWALDisabled CacheWALFsyncPolicy = iota
    // CacheWALFsyncOnSync fsyncs the WAL only when SyncWAL is called.
    CacheWALFsyncOnSync
    // CacheWALFsyncAlways fsyncs the WAL after each append.
    CacheWALFsyncAlways
)
type CacheWALOptions ¶
type CacheWALOptions struct {
FsyncPolicy CacheWALFsyncPolicy
}
CacheWALOptions configures the optional cache WAL.
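A hedged sketch of enabling the cache WAL when wrapping a KVStore with NewCacheKVWithWAL (the `backend`, `dir`, and threshold values are placeholders):

walOpts := hashdb.CacheWALOptions{FsyncPolicy: hashdb.CacheWALFsyncAlways}
cache, err := hashdb.NewCacheKVWithWAL(
    backend,     // any KVStore implementation
    100_000,     // maxEntries flush threshold
    64<<20,      // maxBytes flush threshold
    time.Second, // flushInterval; <=0 disables timer flushes
    filepath.Join(dir, "cache.wal"), // walPath
    walOpts,
)
if err != nil {
    panic(err)
}
defer cache.Close()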
type CachedDB ¶
type CachedDB struct {
// contains filtered or unexported fields
}
CachedDB wraps a single DB with a write-back cache. By default there is no WAL; cached writes are volatile until flushed. A WAL can be enabled via NewCachedDBWithOptions.
func NewCachedDB ¶
func NewCachedDB(folder string, maxEntries, maxBytes int, flushInterval time.Duration) (*CachedDB, error)
NewCachedDB initializes a new cached DB at the given folder. maxEntries/maxBytes control flush thresholds; flushInterval <=0 disables ticker flush.
func NewCachedDBWithOptions ¶
func NewCachedDBWithOptions(folder string, maxEntries, maxBytes int, flushInterval time.Duration, opts CachedDBOptions) (*CachedDB, error)
NewCachedDBWithOptions opens a cached DB with explicit caching and WAL options.
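For example, a sketch that opens a CachedDB with a cache WAL enabled (the thresholds and fsync policy here are illustrative, not recommendations):

opts := hashdb.CachedDBOptions{
    CacheWAL: hashdb.CacheWALOptions{FsyncPolicy: hashdb.CacheWALFsyncOnSync},
}
cdb, err := hashdb.NewCachedDBWithOptions(dir, 50_000, 32<<20, 500*time.Millisecond, opts)
if err != nil {
    panic(err)
}
defer cdb.Close()

_ = cdb.Put([]byte("k"), []byte("v"))       // buffered in the write-back cache
_ = cdb.PutSync([]byte("k2"), []byte("v2")) // flushes the cache, then durable backend write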
func (*CachedDB) ApplyBatch ¶
ApplyBatch applies a set of operations at the cache layer. Pending cache writes are volatile until flushed; use ApplyBatchSync for a durable commit.
func (*CachedDB) ApplyBatchSync ¶
ApplyBatchSync flushes the write-back cache and then performs a durable batch commit on the backend DB.
func (*CachedDB) DeleteSync ¶
DeleteSync flushes the write-back cache and then performs a durable delete on the backend.
func (*CachedDB) PutSync ¶
PutSync flushes the write-back cache and then performs a durable write to the backend. Without a WAL, the cache itself is volatile; PutSync is the supported durability path.
func (*CachedDB) SetCompression ¶
SetCompression enables or disables backend compression for values.
type CachedDBOptions ¶
type CachedDBOptions struct {
CacheWAL CacheWALOptions
// IndexMemoryPolicy applies to the underlying on-disk DB's mmap index.
// This must be set before opening the DB.
IndexMemoryPolicy IndexMemoryPolicy
IndexMemoryPolicySet bool
}
CachedDBOptions configures CachedDB behavior.
type CachedHashmap ¶
type CachedHashmap = CachedDB
CachedHashmap is kept as a compatibility alias for older code. New code should use CachedDB.
func NewCachedHashmap ¶
func NewCachedHashmap(folder string, maxEntries, maxBytes int, flushInterval time.Duration) (*CachedHashmap, error)
NewCachedHashmap initializes a new cached DB at the given folder (compatibility wrapper).
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is a single-shard HashDB instance.
It is not safe for concurrent use; prefer HashDB for most applications.
func OpenSingle ¶
OpenSingle opens (or creates) a single-shard DB rooted at dir. The single-shard DB is not thread-safe; prefer Open/OpenWithShards in most cases.
func (*DB) ApplyBatch ¶
ApplyBatch applies a set of operations atomically (all-or-nothing) in the in-memory index. It is not guaranteed durable on power loss; use ApplyBatchSync for a durable commit.
func (*DB) ApplyBatchSync ¶
ApplyBatchSync applies a set of operations atomically and fsyncs the slab value log so the full batch survives a crash/power loss.
func (*DB) Compact ¶
Compact rewrites the database to reclaim space from deleted/updated keys. It creates a new copy of the database and swaps it in.
func (*DB) DeleteSync ¶
DeleteSync removes a key and fsyncs the value log so the delete survives a crash.
Note: HashDB durability is implemented via the slab value log + crash recovery. The mmap index is treated as a derived cache and may be rebuilt on next open.
func (*DB) Export ¶
Export writes a snapshot of all live key/value pairs to w.
The iteration order is arbitrary and not stable across runs. Use TreeDB if you need ordered iteration.
Example ¶
srcDir, err := os.MkdirTemp("", "hashdb-src-*")
if err != nil {
    panic(err)
}
defer os.RemoveAll(srcDir)

src, err := OpenSingle(srcDir)
if err != nil {
    panic(err)
}
defer src.Close()

if err := src.PutSync([]byte("a"), []byte("1")); err != nil {
    panic(err)
}

var buf bytes.Buffer
if err := src.Export(&buf); err != nil {
    panic(err)
}

dstDir, err := os.MkdirTemp("", "hashdb-dst-*")
if err != nil {
    panic(err)
}
defer os.RemoveAll(dstDir)

dst, err := OpenSingle(dstDir)
if err != nil {
    panic(err)
}
defer dst.Close()

if err := dst.Restore(bytes.NewReader(buf.Bytes())); err != nil {
    panic(err)
}

v, err := dst.Get([]byte("a"))
if err != nil {
    panic(err)
}
fmt.Println(string(v))
Output: 1
func (*DB) ForEach ¶
ForEach calls fn for every live key/value pair in the DB. The iteration order is arbitrary.
DB is not goroutine-safe; the caller must not mutate the DB concurrently.
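For instance, a minimal iteration sketch (returning a non-nil error from fn is assumed to stop the iteration and be returned by ForEach):

count := 0
err := db.ForEach(func(key, value []byte) error {
    count++
    return nil
})
if err != nil {
    panic(err)
}
fmt.Println("live pairs:", count)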
func (*DB) Get ¶
Get retrieves the value for a given key. It returns nil, nil if the key is not found.
func (*DB) PutSync ¶
PutSync inserts a key/value pair and fsyncs the value log so the write survives a crash.
Note: HashDB durability is implemented via the slab value log + crash recovery. The mmap index is treated as a derived cache and may be rebuilt on next open.
func (*DB) ReadBytes ¶
func (h *DB) ReadBytes(offset SlabOffset, n int64) ([]byte, error)
ReadBytes reads raw bytes from the slab at the given offset.
func (*DB) Recover ¶
Recover rebuilds the hash index from the slab value log. It scans the entire slab file and replays its operations.
func (*DB) Restore ¶
Restore reads a snapshot produced by Export and writes it into this DB.
Restore uses ApplyBatchSync and is intended for durable, repeatable restores. Existing keys are overwritten.
func (*DB) SetCompression ¶
SetCompression enables or disables value compression. Default is true.
func (*DB) SetIndexMemoryPolicy ¶
func (h *DB) SetIndexMemoryPolicy(policy IndexMemoryPolicy)
SetIndexMemoryPolicy overrides the default policy. Call before Open.
func (*DB) SetResizeThreshold ¶
SetResizeThreshold sets the load factor percentage at which the hashmap resizes. For example, 65 means resize when Count/Capacity > 0.65. Values <= 0 reset to the default of 65.
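A hedged tuning sketch using both setters on a single-shard DB opened via its Open method (the zero-value-then-Open pattern and calling SetResizeThreshold after Open are assumptions based on the method set above):

var db hashdb.DB

// The index memory policy must be set before Open.
db.SetIndexMemoryPolicy(hashdb.IndexMemoryPolicy{
    LockControls:       true,
    AdviseKeysWillNeed: true,
    AdviseKeysRandom:   true,
})

if err := db.Open(dir); err != nil {
    panic(err)
}
defer db.Close()

// Resize when Count/Capacity exceeds 0.70 instead of the default 0.65.
db.SetResizeThreshold(70)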
type HashDB ¶
type HashDB struct {
// contains filtered or unexported fields
}
HashDB is the primary, thread-safe HashDB implementation.
This is what older code called `gomap_distributed`: a sharded store backed by multiple underlying on-disk DB instances to maximize concurrency.
func Open ¶
Open opens (or creates) the primary HashDB store rooted at dir.
This is the sharded/distributed engine (formerly "gomap_distributed").
Example ¶
package main

import (
    "fmt"
    "os"

    hashdb "github.com/snissn/gomap/HashDB"
)

func main() {
    dir, err := os.MkdirTemp("", "hashdb-example-")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(dir)

    db, err := hashdb.Open(dir)
    if err != nil {
        panic(err)
    }
    defer db.Close()

    if err := db.Put([]byte("k"), []byte("v")); err != nil {
        panic(err)
    }

    val, err := db.Get([]byte("k"))
    if err != nil {
        panic(err)
    }
    fmt.Println(string(val))
}
Output: v
func OpenWithOptions ¶
func OpenWithOptions(dir string, opts HashDBOptions) (*HashDB, error)
OpenWithOptions opens the primary HashDB store with additional options. Options are currently intended for advanced durability/perf tuning and may evolve.
func OpenWithShards ¶
OpenWithShards opens the primary HashDB store with an explicit shard count.
func OpenWithShardsAndOptions ¶
func OpenWithShardsAndOptions(dir string, numShards int, opts HashDBOptions) (*HashDB, error)
OpenWithShardsAndOptions opens the primary HashDB store with an explicit shard count and options.
func (*HashDB) ApplyBatch ¶
ApplyBatch applies a set of operations by grouping them per shard. It is atomic per shard, but not atomic across shards.
func (*HashDB) ApplyBatchSync ¶
ApplyBatchSync applies a set of operations durably by grouping them per shard. It is atomic per shard, but not atomic across shards.
func (*HashDB) Close ¶
Close flushes and closes all shards. It is not safe to call Close concurrently with other operations.
func (*HashDB) DeleteSync ¶
DeleteSync performs a durable delete. See CachedDB.DeleteSync for details.
func (*HashDB) Export ¶
Export writes a snapshot of all live key/value pairs in the sharded store to w.
The iteration order is arbitrary and not stable across runs.
Example ¶
srcDir, err := os.MkdirTemp("", "hashdb-sharded-src-*")
if err != nil {
    panic(err)
}
defer os.RemoveAll(srcDir)

src, err := OpenWithShards(srcDir, 8)
if err != nil {
    panic(err)
}
defer src.Close()

if err := src.PutSync([]byte("a"), []byte("1")); err != nil {
    panic(err)
}

var buf bytes.Buffer
if err := src.Export(&buf); err != nil {
    panic(err)
}

dstDir, err := os.MkdirTemp("", "hashdb-sharded-dst-*")
if err != nil {
    panic(err)
}
defer os.RemoveAll(dstDir)

dst, err := OpenWithShards(dstDir, 8)
if err != nil {
    panic(err)
}
defer dst.Close()

if err := dst.Restore(bytes.NewReader(buf.Bytes())); err != nil {
    panic(err)
}

v, err := dst.Get([]byte("a"))
if err != nil {
    panic(err)
}
fmt.Println(string(v))
Output: 1
func (*HashDB) Flush ¶
Flush forces all shard-level write-back caches to flush pending writes. This is important before process exit or reopening the same on-disk store to ensure durability of recent writes.
func (*HashDB) ForEach ¶
ForEach calls fn for every live key/value pair in the sharded store. The iteration order is arbitrary.
ForEach takes an exclusive snapshot of the store:
- blocks concurrent writers,
- flushes shard write-back caches to the backend DBs,
- and then iterates backend state.
func (*HashDB) Get ¶
Get retrieves the value for a given key. It returns nil if the key does not exist.
func (*HashDB) GetMany ¶
GetMany retrieves values for multiple keys efficiently by grouping them per shard. It returns a slice of values aligned with the input keys slice; missing keys map to nil. Errors are returned per key; nil error means the operation for that key succeeded (even if value is nil).
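A small sketch of consuming the per-key results (values and errors are aligned with the input keys):

keys := [][]byte{[]byte("a"), []byte("b"), []byte("missing")}
vals, errs := db.GetMany(keys)
for i, k := range keys {
    switch {
    case errs[i] != nil:
        fmt.Printf("%s: error: %v\n", k, errs[i])
    case vals[i] == nil:
        fmt.Printf("%s: not found\n", k)
    default:
        fmt.Printf("%s = %s\n", k, vals[i])
    }
}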
func (*HashDB) Has ¶
Has reports whether key exists.
Note: For the sharded HashDB type, Has is implemented as Get+nil check and may read the value. Use TreeDB if you need ordered iteration or richer read APIs.
func (*HashDB) New ¶
New initializes the sharded store with storage in the specified folder. It creates sub-directories for each partition.
func (*HashDB) NewWithShards ¶
NewWithShards initializes the sharded store with a specific number of shards.
func (*HashDB) NewWithShardsAndOptions ¶
func (h *HashDB) NewWithShardsAndOptions(folder string, numShards int, opts HashDBOptions) (err error)
NewWithShardsAndOptions initializes the sharded store with explicit options.
func (*HashDB) OpenWithShards ¶
OpenWithShards is a compatibility wrapper for older code.
func (*HashDB) PutMany ¶
PutMany inserts multiple key-value pairs efficiently. It buckets items by shard and performs parallel insertion.
func (*HashDB) Restore ¶
Restore reads a snapshot produced by Export and writes it into this sharded store.
func (*HashDB) SetCompression ¶
SetCompression enables or disables value compression on all shards. It should typically be called during initialization before serving traffic.
type HashDBOptions ¶
type HashDBOptions struct {
CacheWAL CacheWALOptions
// IndexMemoryPolicy controls memory pinning/advice for the swiss-table index
// maps of each shard's backend DB.
IndexMemoryPolicy IndexMemoryPolicy
IndexMemoryPolicySet bool
}
HashDBOptions configures sharded HashDB behavior.
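For example, a sketch combining a per-shard cache WAL with an explicit index memory policy (field values are illustrative; the meaning of IndexMemoryPolicySet as an "explicitly set" flag is an assumption):

opts := hashdb.HashDBOptions{
    CacheWAL: hashdb.CacheWALOptions{FsyncPolicy: hashdb.CacheWALFsyncAlways},
    IndexMemoryPolicy: hashdb.IndexMemoryPolicy{
        LockControls:     true,
        AdviseKeysRandom: true,
    },
    IndexMemoryPolicySet: true, // assumed to mark IndexMemoryPolicy as explicitly provided
}
db, err := hashdb.OpenWithShardsAndOptions(dir, 16, opts)
if err != nil {
    panic(err)
}
defer db.Close()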
type Hashmap ¶
type Hashmap = DB
Hashmap is kept as a compatibility alias for older code. New code should use DB.
type HashmapDistributed ¶
type HashmapDistributed = HashDB
HashmapDistributed is kept as a compatibility alias for older code.
type IndexMemoryPolicy ¶
type IndexMemoryPolicy struct {
LockControls bool
LockControlsStrict bool
AdviseKeysWillNeed bool
AdviseKeysRandom bool
}
IndexMemoryPolicy configures memory pinning/advice for the on-disk hash index.
Controls (SwissHash control bytes) are small and benefit from being pinned to RAM. Keys are much larger and are treated as best-effort OS hints.
type KVStore ¶
type KVStore interface {
Get(key []byte) ([]byte, error)
Put(key, value []byte) error
Delete(key []byte) error
}
KVStore is the minimal interface implemented by backends used here.
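Judging by the method signatures listed above, both *DB and *HashDB appear to satisfy KVStore; a compile-time assertion sketch (treat this as an assumption rather than a documented guarantee):

var (
    _ hashdb.KVStore = (*hashdb.DB)(nil)
    _ hashdb.KVStore = (*hashdb.HashDB)(nil)
)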
type Key ¶
type Key struct {
// contains filtered or unexported fields
}
Key holds metadata for a stored key in the hash index.
type SlabOffset ¶
type SlabOffset uint64
SlabOffset encodes a segment ID and byte offset within a slab file.
const Tombstone SlabOffset = 0xFFFFFFFFFFFFFFFF
Tombstone is a sentinel slab offset representing a deleted key.
Source Files ¶
- applybatch.go
- batch.go
- batcher.go
- cache_wal.go
- cached_db.go
- cachekv.go
- compression.go
- db.go
- doc.go
- durability.go
- errors.go
- fadvise_linux.go
- foreach.go
- getmany.go
- has.go
- hashindex.go
- index_memory.go
- memlock_unix.go
- metadata_state.go
- mmap.go
- open.go
- public_batch.go
- readat_linux.go
- recovery.go
- replacefile_unix.go
- resize.go
- sharded_db.go
- slab.go
- slab_ro_mmap.go
- snapshot.go
- sync.go
- syncdir_unix.go
- types.go
Directories ¶

| Path | Synopsis |
|---|---|
| cmd | |
| benchmarkmain | command |
| loadfactorbench | command |
| resizebench | command |
| shardbench | command |
| internal | |