kv

package module
v1.14.3
Warning

This package is not in the latest version of its module.

Published: Jul 28, 2025 License: BSD-2-Clause Imports: 16 Imported by: 14

README

KV - Key-Value Store Abstraction Library

A Go library providing unified interfaces for key-value stores. This library abstracts different KV database implementations (BadgerDB, BoltDB, in-memory) behind common interfaces, enabling easy switching between backends without code changes.

Features

  • Unified Interface: Common API for different key-value stores
  • Generic Type Safety: Full Go 1.18+ generics support for type-safe operations
  • Transaction Support: Read and write transactions with proper error handling
  • Bucket Management: Organized data storage with bucket abstraction
  • Metrics Integration: Built-in Prometheus metrics support
  • Relation Store: Bidirectional 1:N relationship management
  • Benchmarking: Performance testing framework included

Quick Start

Installation
go get github.com/bborbe/kv
Basic Usage
package main

import (
    "context"
    "fmt"
    "github.com/bborbe/kv"
    "github.com/bborbe/badgerkv" // or boltkv, memorykv
)

type User struct {
    ID    string `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    ctx := context.Background()
    
    // Create database instance (example with badgerkv)
    db, err := badgerkv.Open("/tmp/mydb")
    if err != nil {
        panic(err)
    }
    defer db.Close()
    
    // Create a type-safe store
    userStore := kv.NewStore[string, User](db, kv.BucketName("users"))
    
    // Add a user
    user := User{ID: "123", Name: "Alice", Email: "alice@example.com"}
    if err := userStore.Add(ctx, user.ID, user); err != nil {
        panic(err)
    }
    
    // Get a user
    retrievedUser, err := userStore.Get(ctx, "123")
    if err != nil {
        panic(err)
    }
    fmt.Printf("User: %+v\n", *retrievedUser)
    
    // Check if user exists
    exists, err := userStore.Exists(ctx, "123")
    if err != nil {
        panic(err)
    }
    fmt.Printf("User exists: %v\n", exists)
}
Using Transactions
// Manual transaction control
err := db.Update(ctx, func(ctx context.Context, tx kv.Tx) error {
    // Tx methods take the context as their first argument
    bucket, err := tx.CreateBucket(ctx, kv.BucketName("users"))
    if err != nil {
        return err
    }

    userData, err := json.Marshal(user)
    if err != nil {
        return err
    }
    return bucket.Put(ctx, []byte("123"), userData)
})
Iterating Over Data
// Stream all users
userCh := make(chan User, 10)
go func() {
    defer close(userCh)
    if err := userStore.Stream(ctx, userCh); err != nil {
        fmt.Printf("stream failed: %v\n", err)
    }
}()

for user := range userCh {
    fmt.Printf("User: %+v\n", user)
}

// Or map over all users
userStore.Map(ctx, func(ctx context.Context, key string, user User) error {
    fmt.Printf("Key: %s, User: %+v\n", key, user)
    return nil
})

Architecture

Interface Hierarchy

The library uses a layered interface approach:

  1. DB Interface - Database lifecycle management

    • Update() - Write transactions
    • View() - Read-only transactions
    • Sync(), Close(), Remove() - Database management
  2. Transaction Interface - Bucket operations within transactions

    • Bucket(), CreateBucket(), CreateBucketIfNotExists(), DeleteBucket() - Bucket management
    • ListBucketNames() - Bucket enumeration
  3. Bucket Interface - Key-value operations

    • Put(), Get(), Delete() - Basic operations
    • Iterator(), IteratorReverse() - Iteration support
  4. Generic Store Layer - Type-safe operations

    • Uses Go generics: Store[KEY ~[]byte | ~string, OBJECT any]
    • Automatic JSON marshaling/unmarshaling
    • Composed interfaces: StoreAdder, StoreGetter, StoreRemover, etc.
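
To illustrate the generic store layer described above, here is a minimal sketch of a type-safe wrapper that JSON-encodes values over a byte-oriented bucket. The `bucket` map, the `store` struct, and `errNotFound` are local stand-ins invented for this example. They only mirror the shape of the `Store[KEY, OBJECT]` constraint and are not the library's implementation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// bucket is a hypothetical byte-oriented stand-in, not the library's Bucket.
type bucket map[string][]byte

// errNotFound is a local sentinel used only in this sketch.
var errNotFound = errors.New("key not found")

// store is a toy type-safe layer using the same type constraints as Store.
type store[KEY ~[]byte | ~string, OBJECT any] struct {
	b bucket
}

// Add marshals the object to JSON and writes it under the key.
func (s store[KEY, OBJECT]) Add(key KEY, object OBJECT) error {
	data, err := json.Marshal(object)
	if err != nil {
		return fmt.Errorf("marshal failed: %w", err)
	}
	s.b[string(key)] = data
	return nil
}

// Get reads the bytes for the key and unmarshals them into a new OBJECT.
func (s store[KEY, OBJECT]) Get(key KEY) (*OBJECT, error) {
	data, ok := s.b[string(key)]
	if !ok {
		return nil, errNotFound
	}
	var object OBJECT
	if err := json.Unmarshal(data, &object); err != nil {
		return nil, fmt.Errorf("unmarshal failed: %w", err)
	}
	return &object, nil
}

type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func main() {
	users := store[string, User]{b: bucket{}}
	if err := users.Add("123", User{ID: "123", Name: "Alice"}); err != nil {
		panic(err)
	}
	u, err := users.Get("123")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *u)
}
```

The caller never touches `[]byte` or `json` directly; the key and object types are fixed once, when the store is created.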
Advanced Features
Metrics Wrapper

Monitor your database operations with Prometheus metrics:

// Wrap your DB with metrics (signature in this version: NewDBWithMetrics(db DB, metrics Metrics))
metricsDB := kv.NewDBWithMetrics(db, kv.NewMetrics())
Relation Store

Manage bidirectional 1:N relationships:

relationStore := kv.NewRelationStore[string, string](db, "user_groups")

// Add relationships (Add takes a slice of related IDs)
relationStore.Add(ctx, "user123", []string{"group456", "group789"})

// Query relationships
groups, err := relationStore.RelatedIDs(ctx, "user123") // Returns: ["group456", "group789"]
users, err := relationStore.IDs(ctx, "group456") // Returns: ["user123"]

Implementations

This library defines interfaces implemented by three concrete packages:

  • badgerkv - BadgerDB implementation (LSM-tree, high performance)
  • boltkv - BoltDB implementation (B+ tree, ACID compliance)
  • memorykv - In-memory implementation (testing/development)
Switching Implementations
// BadgerDB for high-performance scenarios
db, err := badgerkv.Open("/path/to/badger")

// BoltDB for ACID compliance
db, err := boltkv.Open("/path/to/bolt.db")

// In-memory for testing
db := memorykv.New()

Testing

The library includes comprehensive test suites that can be reused for implementations:

import "github.com/bborbe/kv"

// Use the basic test suite for your implementation
var _ = Describe("MyKV", func() {
    // mykvProvider is your own type implementing kv.Provider
    kv.BasicTestSuite(mykvProvider{})
})

Development

Prerequisites
  • Go 1.18+ (for generics support)
Commands
# Full pre-commit pipeline
make precommit

# Run tests
make test

# Format code
make format

# Generate mocks
make generate

# Check code quality
make check

License

This project is licensed under the BSD-2-Clause license. See the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

var BucketAlreadyExistsError = errors.New("bucket already exists")
var BucketNotFoundError = errors.New("bucket not found")
var KeyNotFoundError = errors.New("key not found")

KeyNotFoundError is currently not returned by the package, but can be used as a common error.

var TransactionAlreadyOpenError = errors.New("transaction already open")
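
Because these are sentinel values created with `errors.New`, callers would typically test for them with `errors.Is`, which also matches when the error has been wrapped with extra context. The sketch below redeclares one sentinel locally so it can run on its own. `openBucket` is a hypothetical helper, and whether a given backend wraps these errors is an assumption, not something the package documents.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in with the same message as the package's BucketNotFoundError.
var BucketNotFoundError = errors.New("bucket not found")

// openBucket is a hypothetical helper that wraps the sentinel with context.
func openBucket(existing map[string]bool, name string) error {
	if !existing[name] {
		return fmt.Errorf("open bucket %q: %w", name, BucketNotFoundError)
	}
	return nil
}

func main() {
	err := openBucket(map[string]bool{"users": true}, "orders")
	// errors.Is unwraps the chain, so the wrapped sentinel still matches.
	if errors.Is(err, BucketNotFoundError) {
		fmt.Println("bucket missing, create it first")
	}
}
```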

Functions

func BasicTestSuite

func BasicTestSuite(provider Provider)

func BucketTestSuite

func BucketTestSuite(provider Provider)

func Count

func Count(ctx context.Context, bucket Bucket) (int64, error)

func ForEach

func ForEach(
	ctx context.Context,
	bucket Bucket,
	fn func(item Item) error,
) error

func IteratorTestSuite

func IteratorTestSuite(provider Provider)

func NewResetBucketHandler

func NewResetBucketHandler(db DB, cancel context.CancelFunc) http.Handler

NewResetBucketHandler returns an http.Handler that allows deleting a bucket.

func NewResetHandler

func NewResetHandler(db DB, cancel context.CancelFunc) http.Handler

NewResetHandler returns an http.Handler that allows deleting the complete database.

func RelationStoreTestSuite

func RelationStoreTestSuite(provider Provider)

Types

type Bucket

type Bucket interface {
	Put(ctx context.Context, key []byte, value []byte) error
	Get(ctx context.Context, bytes []byte) (Item, error)
	Delete(ctx context.Context, bytes []byte) error
	Iterator() Iterator
	IteratorReverse() Iterator
}

type BucketName

type BucketName []byte

func BucketFromStrings

func BucketFromStrings(values ...string) BucketName

func NewBucketName

func NewBucketName(name string) BucketName

func (BucketName) Bytes

func (b BucketName) Bytes() []byte

func (BucketName) Equal

func (b BucketName) Equal(value BucketName) bool

func (BucketName) String

func (b BucketName) String() string

type BucketNames

type BucketNames []BucketName

func (BucketNames) Contains

func (t BucketNames) Contains(value BucketName) bool

type DB

type DB interface {
	// Update opens a write transaction
	Update(ctx context.Context, fn func(ctx context.Context, tx Tx) error) error

	// View opens a read only transaction
	View(ctx context.Context, fn func(ctx context.Context, tx Tx) error) error

	// Sync database to disk
	Sync() error

	// Close database
	Close() error

	// Remove database files from disk
	Remove() error
}

func NewDBWithMetrics added in v1.13.0

func NewDBWithMetrics(
	db DB,
	metrics Metrics,
) DB

type FuncTx added in v1.14.0

type FuncTx func(ctx context.Context, tx Tx) error

FuncTx adapts a plain function so that it can be used wherever a RunnableTx is expected.

func (FuncTx) Run added in v1.14.0

func (r FuncTx) Run(ctx context.Context, tx Tx) error

Run calls the underlying function.

type Item

type Item interface {
	Exists() bool
	Key() []byte
	Value(fn func(val []byte) error) error
}

func NewByteItem

func NewByteItem(
	key []byte,
	value []byte,
) Item

type Iterator

type Iterator interface {
	Close()
	Item() Item
	Next()
	Valid() bool
	Rewind()
	Seek(key []byte)
}
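
The method set suggests a cursor-style loop: position with `Rewind` or `Seek`, then advance with `Next` while `Valid` holds. The following sketch shows that loop over a toy slice-backed iterator. `sliceIterator` and `collect` are invented for illustration, the type exposes `Key()` directly instead of `Item()` to stay short, and the "first key at or after the target" behaviour of `Seek` is an assumption rather than documented semantics.

```go
package main

import (
	"fmt"
	"sort"
)

// sliceIterator is a toy iterator over sorted keys. It borrows the method
// names of the Iterator interface but is not the library's implementation.
type sliceIterator struct {
	keys []string
	pos  int
}

func (it *sliceIterator) Rewind()     { it.pos = 0 }
func (it *sliceIterator) Valid() bool { return it.pos < len(it.keys) }
func (it *sliceIterator) Next()       { it.pos++ }
func (it *sliceIterator) Key() []byte { return []byte(it.keys[it.pos]) }
func (it *sliceIterator) Close()      {}

// Seek moves to the first key that is >= the given key (assumed semantics).
func (it *sliceIterator) Seek(key []byte) {
	it.pos = sort.SearchStrings(it.keys, string(key))
}

// collect walks from the iterator's current position to the end.
func collect(it *sliceIterator) []string {
	var out []string
	for ; it.Valid(); it.Next() {
		out = append(out, string(it.Key()))
	}
	return out
}

func main() {
	it := &sliceIterator{keys: []string{"a", "b", "d"}}
	defer it.Close()

	it.Rewind()
	fmt.Println(collect(it)) // full scan in key order

	it.Seek([]byte("c"))
	fmt.Println(collect(it)) // keys at or after "c"
}
```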

type Key

type Key []byte

func (Key) Bytes

func (f Key) Bytes() []byte

func (Key) String

func (f Key) String() string

type Metrics added in v1.13.0

type Metrics interface {
	DbUpdateInc()
	DbViewInc()
}

func NewMetrics added in v1.13.0

func NewMetrics() Metrics

type Provider

type Provider interface {
	Get(ctx context.Context) (DB, error)
}

type ProviderFunc

type ProviderFunc func(ctx context.Context) (DB, error)

func (ProviderFunc) Get

func (p ProviderFunc) Get(ctx context.Context) (DB, error)
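
ProviderFunc follows the same function-adapter idea as `http.HandlerFunc`: a plain function value satisfies the Provider interface. The sketch below reproduces the pattern with local stand-in types so it runs without the library. The reduced `DB` interface, `fakeDB`, and `openTestDB` are hypothetical, and the body of `Get` (calling the function itself) is an assumption about how such an adapter is usually written.

```go
package main

import (
	"context"
	"fmt"
)

// DB is a reduced stand-in for the package's DB interface.
type DB interface {
	Close() error
}

// Provider and ProviderFunc mirror the signatures listed above.
type Provider interface {
	Get(ctx context.Context) (DB, error)
}

type ProviderFunc func(ctx context.Context) (DB, error)

// Get calls the function itself (assumed implementation).
func (p ProviderFunc) Get(ctx context.Context) (DB, error) { return p(ctx) }

// fakeDB is a hypothetical no-op database used only in this sketch.
type fakeDB struct{ name string }

func (f fakeDB) Close() error { return nil }

// openTestDB builds a Provider from a closure and asks it for a DB.
func openTestDB() (DB, error) {
	var provider Provider = ProviderFunc(func(ctx context.Context) (DB, error) {
		return fakeDB{name: "test"}, nil
	})
	return provider.Get(context.Background())
}

func main() {
	db, err := openTestDB()
	if err != nil {
		panic(err)
	}
	defer db.Close()
	fmt.Printf("%+v\n", db)
}
```

A closure like this is convenient for the test suites, which accept a Provider and could obtain a fresh database from it.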

type RelationStore

type RelationStore[ID ~[]byte | ~string, RelatedID ~[]byte | ~string] interface {
	// Add the given relationIDs to ID
	Add(ctx context.Context, id ID, relatedIds []RelatedID) error
	// Replace all relations of id with the given
	Replace(ctx context.Context, id ID, relatedIds []RelatedID) error
	// Remove all relation from ID to the given
	Remove(ctx context.Context, id ID, relatedIds []RelatedID) error
	// Delete ID and all relations
	Delete(ctx context.Context, id ID) error
	// RelatedIDs return all relation of ID
	RelatedIDs(ctx context.Context, id ID) ([]RelatedID, error)
	// IDs return all ids of RelatedID
	IDs(ctx context.Context, relatedId RelatedID) ([]ID, error)
	// StreamIDs return all existing IDs
	StreamIDs(ctx context.Context, ch chan<- ID) error
	// StreamRelatedIDs return all existing relationIDs
	StreamRelatedIDs(ctx context.Context, ch chan<- RelatedID) error
	// MapIDRelations maps all entry to the given func
	MapIDRelations(ctx context.Context, fn func(ctx context.Context, key ID, relatedIDs []RelatedID) error) error
	// MapRelationIDs maps all entry to the given func
	MapRelationIDs(ctx context.Context, fn func(ctx context.Context, key RelatedID, ids []ID) error) error
	// Invert returns the same store with flipped ID <-> RelationID
	Invert() RelationStore[RelatedID, ID]
}

RelationStore implements a forward and backward ID lookup for a 1:N relation.
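
A forward and backward lookup can be pictured as two indexes that are updated together. The sketch below is a toy in-memory version with string IDs only. The `relations` type and its helpers are invented for this example and say nothing about how the library lays out its buckets.

```go
package main

import (
	"fmt"
	"sort"
)

// relations is a toy in-memory index, not the library's storage layout.
type relations struct {
	forward  map[string]map[string]struct{} // id -> related ids
	backward map[string]map[string]struct{} // related id -> ids
}

func newRelations() *relations {
	return &relations{
		forward:  map[string]map[string]struct{}{},
		backward: map[string]map[string]struct{}{},
	}
}

// link records value under key in the given index.
func link(index map[string]map[string]struct{}, key, value string) {
	if index[key] == nil {
		index[key] = map[string]struct{}{}
	}
	index[key][value] = struct{}{}
}

// Add writes both directions so either side can be looked up directly.
func (r *relations) Add(id string, relatedIDs []string) {
	for _, related := range relatedIDs {
		link(r.forward, id, related)
		link(r.backward, related, id)
	}
}

// sortedKeys returns the set members in a stable order.
func sortedKeys(set map[string]struct{}) []string {
	keys := make([]string, 0, len(set))
	for k := range set {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func (r *relations) RelatedIDs(id string) []string { return sortedKeys(r.forward[id]) }
func (r *relations) IDs(relatedID string) []string { return sortedKeys(r.backward[relatedID]) }

func main() {
	r := newRelations()
	r.Add("user123", []string{"group456", "group789"})
	r.Add("user999", []string{"group456"})
	fmt.Println(r.RelatedIDs("user123")) // [group456 group789]
	fmt.Println(r.IDs("group456"))       // [user123 user999]
}
```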

func NewRelationStore

func NewRelationStore[ID ~[]byte | ~string, RelatedID ~[]byte | ~string](db DB, name string) RelationStore[ID, RelatedID]

func NewRelationStoreFromRelationStoreTx added in v1.12.0

func NewRelationStoreFromRelationStoreTx[ID ~[]byte | ~string, RelatedID ~[]byte | ~string](
	db DB,
	storeTx RelationStoreTx[ID, RelatedID],
) RelationStore[ID, RelatedID]

type RelationStoreString

type RelationStoreString RelationStore[string, string]

type RelationStoreTx

type RelationStoreTx[ID ~[]byte | ~string, RelatedID ~[]byte | ~string] interface {
	// Add the given relationIDs to ID
	Add(ctx context.Context, tx Tx, id ID, relatedIds []RelatedID) error
	// Replace all relations of id with the given
	Replace(ctx context.Context, tx Tx, id ID, relatedIds []RelatedID) error
	// Remove all relation from ID to the given
	Remove(ctx context.Context, tx Tx, id ID, relatedIds []RelatedID) error
	// Delete ID and all relations
	Delete(ctx context.Context, tx Tx, id ID) error
	// RelatedIDs return all relation of ID
	RelatedIDs(ctx context.Context, tx Tx, id ID) ([]RelatedID, error)
	// IDs return all ids of RelatedID
	IDs(ctx context.Context, tx Tx, relatedId RelatedID) ([]ID, error)
	// StreamIDs return all existing IDs
	StreamIDs(ctx context.Context, tx Tx, ch chan<- ID) error
	// StreamRelatedIDs return all existing relationIDs
	StreamRelatedIDs(ctx context.Context, tx Tx, ch chan<- RelatedID) error
	// Invert returns the same store with flipped ID <-> RelationID
	Invert() RelationStoreTx[RelatedID, ID]
	// MapIDRelations maps all entry to the given func
	MapIDRelations(ctx context.Context, tx Tx, fn func(ctx context.Context, key ID, relatedIDs []RelatedID) error) error
	// MapRelationIDs maps all entry to the given func
	MapRelationIDs(ctx context.Context, tx Tx, fn func(ctx context.Context, key RelatedID, ids []ID) error) error
}

func NewRelationStoreTx

func NewRelationStoreTx[ID ~[]byte | ~string, RelatedID ~[]byte | ~string](name string) RelationStoreTx[ID, RelatedID]

func NewRelationStoreTxWithBucket added in v1.12.0

func NewRelationStoreTxWithBucket[ID ~[]byte | ~string, RelatedID ~[]byte | ~string](
	idRelationBucket StoreTx[ID, []RelatedID],
	relationIdBucket StoreTx[RelatedID, []ID],
) RelationStoreTx[ID, RelatedID]

type RelationStoreTxString

type RelationStoreTxString RelationStoreTx[string, string]

type RunnableTx added in v1.14.0

type RunnableTx interface {
	Run(ctx context.Context, tx Tx) error
}

RunnableTx interface

type Store

type Store[KEY ~[]byte | ~string, OBJECT any] interface {
	StoreAdder[KEY, OBJECT]
	StoreRemover[KEY]
	StoreGetter[KEY, OBJECT]
	StoreMapper[KEY, OBJECT]
	StoreExists[KEY, OBJECT]
	StoreStream[KEY, OBJECT]
}

func NewStore

func NewStore[KEY ~[]byte | ~string, OBJECT any](db DB, bucketName BucketName) Store[KEY, OBJECT]

NewStore returns a Store

func NewStoreFromTx added in v1.13.1

func NewStoreFromTx[KEY ~[]byte | ~string, OBJECT any](db DB, storeTx StoreTx[KEY, OBJECT]) Store[KEY, OBJECT]

NewStoreFromTx returns a Store from an existing StoreTx.

type StoreAdder

type StoreAdder[KEY ~[]byte | ~string, OBJECT any] interface {
	Add(ctx context.Context, key KEY, object OBJECT) error
}

type StoreAdderTx

type StoreAdderTx[KEY ~[]byte | ~string, OBJECT any] interface {
	Add(ctx context.Context, tx Tx, key KEY, object OBJECT) error
}

type StoreExists

type StoreExists[KEY ~[]byte | ~string, OBJECT any] interface {
	Exists(ctx context.Context, key KEY) (bool, error)
}

type StoreExistsTx

type StoreExistsTx[KEY ~[]byte | ~string, OBJECT any] interface {
	Exists(ctx context.Context, tx Tx, key KEY) (bool, error)
}

type StoreGetter

type StoreGetter[KEY ~[]byte | ~string, OBJECT any] interface {
	Get(ctx context.Context, key KEY) (*OBJECT, error)
}

type StoreGetterTx

type StoreGetterTx[KEY ~[]byte | ~string, OBJECT any] interface {
	Get(ctx context.Context, tx Tx, key KEY) (*OBJECT, error)
}

type StoreMapper

type StoreMapper[KEY ~[]byte | ~string, OBJECT any] interface {
	Map(ctx context.Context, fn func(ctx context.Context, key KEY, object OBJECT) error) error
}

type StoreMapperTx

type StoreMapperTx[KEY ~[]byte | ~string, OBJECT any] interface {
	Map(ctx context.Context, tx Tx, fn func(ctx context.Context, key KEY, object OBJECT) error) error
}

type StoreRemover

type StoreRemover[KEY ~[]byte | ~string] interface {
	Remove(ctx context.Context, key KEY) error
}

type StoreRemoverTx

type StoreRemoverTx[KEY ~[]byte | ~string] interface {
	Remove(ctx context.Context, tx Tx, key KEY) error
}

type StoreStream

type StoreStream[KEY ~[]byte | ~string, OBJECT any] interface {
	Stream(ctx context.Context, ch chan<- OBJECT) error
}

type StoreStreamTx

type StoreStreamTx[KEY ~[]byte | ~string, OBJECT any] interface {
	Stream(ctx context.Context, tx Tx, ch chan<- OBJECT) error
}

type StoreTx

type StoreTx[KEY ~[]byte | ~string, OBJECT any] interface {
	StoreAdderTx[KEY, OBJECT]
	StoreRemoverTx[KEY]
	StoreGetterTx[KEY, OBJECT]
	StoreMapperTx[KEY, OBJECT]
	StoreExistsTx[KEY, OBJECT]
	StoreStreamTx[KEY, OBJECT]
}

func NewStoreTx

func NewStoreTx[KEY ~[]byte | ~string, OBJECT any](bucketName BucketName) StoreTx[KEY, OBJECT]

type Tx

type Tx interface {
	Bucket(ctx context.Context, name BucketName) (Bucket, error)
	CreateBucket(ctx context.Context, name BucketName) (Bucket, error)
	CreateBucketIfNotExists(ctx context.Context, name BucketName) (Bucket, error)
	DeleteBucket(ctx context.Context, name BucketName) error
	ListBucketNames(ctx context.Context) (BucketNames, error)
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.
