sqlflow

v0.1.0
Published: Mar 6, 2026 License: MIT Imports: 16 Imported by: 0

README

sqlflow

A SQLite-backed storage layer for Go. It wraps SQLite in WAL mode with separate read/write connections, serialised writes with exponential-backoff retries, and an optional per-key connection pool backed by a TinyLFU cache. At-rest encryption is supported via SQLCipher.

All database access goes through Read and Write — the core abstraction. They manage transactions automatically so you never touch a raw connection directly.

Installation

go get github.com/avalonbits/sqlflow

Because sqlflow uses cgo (via go-sqlite3), you need a C compiler available at build time.

Encryption

sqlflow supports at-rest encryption through SQLCipher, a SQLite extension that encrypts the entire database file with AES-256.

To enable it, replace the standard go-sqlite3 driver with the jgiannuzzi/go-sqlite3 fork in your go.mod:

replace github.com/mattn/go-sqlite3 => github.com/jgiannuzzi/go-sqlite3 v1.14.35-0.20260227142656-2c447b9a2806

Then use GetEncryptedDB / OpenEncryptedDB (single database) or pass a keyProvider to NewEncryptedPool (per-key pool). Both accept a 32-byte key; sqlflow passes it to the driver via DSN parameters at open time.

Concepts

Read and Write

Read and Write are the core of sqlflow. Every database interaction goes through one of them — there is no way to obtain a raw connection or run a query outside a managed transaction. This is deliberate: the API makes correct transaction handling the only path forward.

// DB
func (db *DB[Q])  Read (ctx context.Context,             f func(*Q) error) error
func (db *DB[Q])  Write(ctx context.Context,             f func(*Q) error) error

// Pool
func (p *Pool[Q]) Read (ctx context.Context, key string, f func(*Q) error) error
func (p *Pool[Q]) Write(ctx context.Context, key string, f func(*Q) error) error

Both methods accept a closure f that receives a *Q — your typed query accessor — already bound to an open transaction. You call your query methods on it; sqlflow commits on success or rolls back on any error, automatically, with no extra code on your part. You cannot accidentally run a query outside a transaction, mix transactional and non-transactional calls, or forget to commit.

Read opens a deferred (read-only) transaction on a shared connection pool, so multiple goroutines may call it concurrently without blocking each other. Transient busy errors are retried with exponential backoff until ctx is cancelled.

Write opens an immediate (exclusive) transaction on the single write connection, serialised by an internal mutex so only one writer runs at a time per database. Transient busy errors are retried up to five times with exponential backoff. Errors returned by f are treated as permanent: the transaction rolls back immediately with no retry, and the original error is returned to the caller unchanged.

For Pool, the key argument (e.g. a user ID) selects which database to operate on; everything else is identical.

Querier

A Querier[Q] is a constructor function func(tx DBTX) *Q that builds your per-transaction accessor. If you use sqlc, pass db.New directly; otherwise write a thin wrapper.

type Queries struct{ db sqlflow.DBTX }

func New(tx sqlflow.DBTX) *Queries { return &Queries{db: tx} }

var querier sqlflow.Querier[Queries] = New
Migrations

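sqlflow uses goose for migrations. Every constructor that creates a database (GetDB, NewPool, …) runs all pending migrations automatically before returning. The exceptions are OpenDB and OpenEncryptedDB, which skip migrations when the file already exists, and NewEncryptedPool, which migrates each database lazily on first open, once its key is available. Migrations are supplied as an fs.FS whose root contains the *.sql files directly, with no subdirectory.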

// Option 1: embedded at compile time. The //go:embed directive must sit at
// package level; fs.Sub re-roots the FS so that its root IS the migrations dir.
//go:embed migrations
var migrationsFS embed.FS

func embeddedMigrations() (fs.FS, error) {
	return fs.Sub(migrationsFS, "migrations")
}

// Option 2: read directly from disk at runtime.
func diskMigrations() fs.FS {
	return os.DirFS("/path/to/migrations")
}

// Option 3: in memory, for tests and examples.
var memMigrations = fstest.MapFS{
	"001_init.sql": {Data: []byte(`-- +goose Up
CREATE TABLE ...
-- +goose Down
DROP TABLE ...`)},
}
Single database — DB[Q]

GetDB creates the file and any parent directories, runs all pending goose migrations, then opens separate read and write connections in WAL mode. Use OpenDB on the hot path to skip migrations when the file already exists.

Per-key connection pool — Pool[Q]

Pool manages a collection of SQLite databases — one per key (e.g. one per user). Databases are opened lazily and kept in a TinyLFU cache; evicted databases are closed only after all in-flight operations finish.
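The rule that an evicted database is closed only after in-flight operations finish is essentially reference counting. The minimal sketch below assumes a mutex-guarded counter plus an evicted flag. The entry type and its methods are hypothetical; sqlflow's unexported poolEntry may work differently:

```go
package main

import (
	"fmt"
	"sync"
)

// entry guards one open database; closeFn stands in for (*sql.DB).Close.
// Hypothetical type, not sqlflow's poolEntry.
type entry struct {
	mu      sync.Mutex
	refs    int
	evicted bool
	closed  bool
	closeFn func()
}

// acquire registers an in-flight operation; it fails once the entry is evicted.
func (e *entry) acquire() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.evicted {
		return false // caller should open a fresh entry instead
	}
	e.refs++
	return true
}

// release ends an operation and closes the DB if it was the last one after eviction.
func (e *entry) release() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.refs--
	e.maybeCloseLocked()
}

// evict marks the entry as removed from the cache; closing waits for refs == 0.
func (e *entry) evict() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.evicted = true
	e.maybeCloseLocked()
}

// maybeCloseLocked closes exactly once; the caller must hold e.mu.
func (e *entry) maybeCloseLocked() {
	if e.evicted && e.refs == 0 && !e.closed {
		e.closed = true
		e.closeFn()
	}
}

func main() {
	e := &entry{closeFn: func() { fmt.Println("closed") }}
	e.acquire()              // an operation starts
	e.evict()                // cache evicts mid-operation: nothing printed yet
	e.release()              // last operation ends: prints "closed"
	fmt.Println(e.acquire()) // false: evicted entries accept no new work
}
```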

Use NewEncryptedPool to enable per-key encryption; it requires a keyProvider function. If the key for a given user is unavailable, Read/Write return sqlflow.ErrKeyNotAvailable.

Testing

TestDB creates an in-memory database and TestPool creates a pool backed by the directory you pass (t.TempDir() is a natural choice); both panic on error, keeping test setup concise:

db   := sqlflow.TestDB(fsys, querier)
pool := sqlflow.TestPool(t.TempDir(), fsys, querier)

Examples

1. Single database — plain
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"testing/fstest"

	"github.com/avalonbits/sqlflow"
)

func main() {
	path := "/tmp/plain.db"
	os.Remove(path)

	db, err := sqlflow.GetDB(path, migrations, newKV)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	if err := db.Write(ctx, func(s *kvStore) error {
		return s.Set(ctx, "hello", "world")
	}); err != nil {
		log.Fatal(err)
	}

	var val string
	if err := db.Read(ctx, func(s *kvStore) error {
		var err error
		val, err = s.Get(ctx, "hello")
		return err
	}); err != nil {
		log.Fatal(err)
	}

	fmt.Println(val) // world
}

// migrations is an in-memory goose migration set. In production use
// //go:embed with fs.Sub, or os.DirFS, to point at real .sql files.
var migrations = fstest.MapFS{
	"001_init.sql": {Data: []byte(`-- +goose Up
CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, val TEXT NOT NULL);
-- +goose Down
DROP TABLE kv;`)},
}

// kvStore wraps a DBTX to provide typed query methods for the kv table.
type kvStore struct{ db sqlflow.DBTX }

// newKV is a sqlflow.Querier: sqlflow calls it with the transaction's
// connection so every method on kvStore automatically runs within that
// transaction — no connection is ever passed around manually.
func newKV(db sqlflow.DBTX) *kvStore { return &kvStore{db: db} }

func (s *kvStore) Set(ctx context.Context, key, val string) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO kv(key,val) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET val=excluded.val`,
		key, val)
	return err
}

func (s *kvStore) Get(ctx context.Context, key string) (string, error) {
	var val string
	err := s.db.QueryRowContext(ctx, `SELECT val FROM kv WHERE key=?`, key).Scan(&val)
	return val, err
}
2. Single database — encrypted
package main

import (
	"context"
	"crypto/rand"
	"fmt"
	"log"
	"os"
	"testing/fstest"

	"github.com/avalonbits/sqlflow"
)

// go.mod must contain:
// replace github.com/mattn/go-sqlite3 => github.com/jgiannuzzi/go-sqlite3 v1.14.35-0.20260227142656-2c447b9a2806

func main() {
	path := "/tmp/encrypted.db"
	os.Remove(path)

	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		log.Fatal(err)
	}

	db, err := sqlflow.GetEncryptedDB(path, migrations, newKV, key)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	if err := db.Write(ctx, func(s *kvStore) error {
		return s.Set(ctx, "secret", "value")
	}); err != nil {
		log.Fatal(err)
	}

	var val string
	if err := db.Read(ctx, func(s *kvStore) error {
		var err error
		val, err = s.Get(ctx, "secret")
		return err
	}); err != nil {
		log.Fatal(err)
	}

	fmt.Println(val) // value
}

// migrations is an in-memory goose migration set. In production use
// //go:embed with fs.Sub, or os.DirFS, to point at real .sql files.
var migrations = fstest.MapFS{
	"001_init.sql": {Data: []byte(`-- +goose Up
CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, val TEXT NOT NULL);
-- +goose Down
DROP TABLE kv;`)},
}

// kvStore wraps a DBTX to provide typed query methods for the kv table.
type kvStore struct{ db sqlflow.DBTX }

// newKV is a sqlflow.Querier: sqlflow calls it with the transaction's
// connection so every method on kvStore automatically runs within that
// transaction — no connection is ever passed around manually.
func newKV(db sqlflow.DBTX) *kvStore { return &kvStore{db: db} }

func (s *kvStore) Set(ctx context.Context, key, val string) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO kv(key,val) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET val=excluded.val`,
		key, val)
	return err
}

func (s *kvStore) Get(ctx context.Context, key string) (string, error) {
	var val string
	err := s.db.QueryRowContext(ctx, `SELECT val FROM kv WHERE key=?`, key).Scan(&val)
	return val, err
}
3. Connection pool — plain
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"testing/fstest"
	"time"

	"github.com/avalonbits/sqlflow"
)

func main() {
	dir := "/tmp/pool-plain"
	os.RemoveAll(dir)

	pool, err := sqlflow.NewPool(
		dir,
		migrations,
		newKV,
		1_000,         // max cached open databases
		5*time.Minute, // evict after 5 min idle
	)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	ctx := context.Background()
	users := []string{"alice", "bob", "carol"}

	// Each user gets their own isolated database file.
	for _, user := range users {
		user := user
		if err := pool.Write(ctx, user, func(s *kvStore) error {
			return s.Set(ctx, "greeting", "hello "+user)
		}); err != nil {
			log.Fatal(err)
		}
	}

	for _, user := range users {
		user := user
		var val string
		if err := pool.Read(ctx, user, func(s *kvStore) error {
			var err error
			val, err = s.Get(ctx, "greeting")
			return err
		}); err != nil {
			log.Fatal(err)
		}
		fmt.Println(user, "→", val)
	}
	// alice → hello alice
	// bob   → hello bob
	// carol → hello carol
}

// migrations is an in-memory goose migration set. In production use
// //go:embed with fs.Sub, or os.DirFS, to point at real .sql files.
var migrations = fstest.MapFS{
	"001_init.sql": {Data: []byte(`-- +goose Up
CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, val TEXT NOT NULL);
-- +goose Down
DROP TABLE kv;`)},
}

// kvStore wraps a DBTX to provide typed query methods for the kv table.
type kvStore struct{ db sqlflow.DBTX }

// newKV is a sqlflow.Querier: sqlflow calls it with the transaction's
// connection so every method on kvStore automatically runs within that
// transaction — no connection is ever passed around manually.
func newKV(db sqlflow.DBTX) *kvStore { return &kvStore{db: db} }

func (s *kvStore) Set(ctx context.Context, key, val string) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO kv(key,val) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET val=excluded.val`,
		key, val)
	return err
}

func (s *kvStore) Get(ctx context.Context, key string) (string, error) {
	var val string
	err := s.db.QueryRowContext(ctx, `SELECT val FROM kv WHERE key=?`, key).Scan(&val)
	return val, err
}
4. Connection pool — encrypted
package main

import (
	"context"
	"crypto/rand"
	"fmt"
	"log"
	"os"
	"sync"
	"testing/fstest"
	"time"

	"github.com/avalonbits/sqlflow"
)

// go.mod must contain:
// replace github.com/mattn/go-sqlite3 => github.com/jgiannuzzi/go-sqlite3 v1.14.35-0.20260227142656-2c447b9a2806

func main() {
	dir := "/tmp/pool-encrypted"
	os.RemoveAll(dir)

	store := &keyStore{keys: make(map[string][]byte)}

	pool, err := sqlflow.NewEncryptedPool(
		dir,
		migrations,
		newKV,
		1_000,
		store.Get,     // keyProvider — called per DB open
		5*time.Minute,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	ctx := context.Background()

	// Simulate users logging in — each gets a unique 32-byte key.
	users := []string{"alice", "bob", "carol"}
	for _, user := range users {
		key := make([]byte, 32)
		if _, err := rand.Read(key); err != nil {
			log.Fatal(err)
		}
		store.Set(user, key)
	}

	for _, user := range users {
		user := user
		if err := pool.Write(ctx, user, func(s *kvStore) error {
			return s.Set(ctx, "secret", "data for "+user)
		}); err != nil {
			log.Fatal(err)
		}
	}

	for _, user := range users {
		user := user
		var val string
		if err := pool.Read(ctx, user, func(s *kvStore) error {
			var err error
			val, err = s.Get(ctx, "secret")
			return err
		}); err != nil {
			log.Fatal(err)
		}
		fmt.Println(user, "→", val)
	}
	// alice → data for alice
	// bob   → data for bob
	// carol → data for carol
}

// migrations is an in-memory goose migration set. In production use
// //go:embed with fs.Sub, or os.DirFS, to point at real .sql files.
var migrations = fstest.MapFS{
	"001_init.sql": {Data: []byte(`-- +goose Up
CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, val TEXT NOT NULL);
-- +goose Down
DROP TABLE kv;`)},
}

// kvStore wraps a DBTX to provide typed query methods for the kv table.
type kvStore struct{ db sqlflow.DBTX }

// newKV is a sqlflow.Querier: sqlflow calls it with the transaction's
// connection so every method on kvStore automatically runs within that
// transaction — no connection is ever passed around manually.
func newKV(db sqlflow.DBTX) *kvStore { return &kvStore{db: db} }

func (s *kvStore) Set(ctx context.Context, key, val string) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO kv(key,val) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET val=excluded.val`,
		key, val)
	return err
}

func (s *kvStore) Get(ctx context.Context, key string) (string, error) {
	var val string
	err := s.db.QueryRowContext(ctx, `SELECT val FROM kv WHERE key=?`, key).Scan(&val)
	return val, err
}

// keyStore simulates a session store that holds per-user encryption keys.
type keyStore struct {
	mu   sync.Mutex
	keys map[string][]byte
}

func (ks *keyStore) Set(userID string, key []byte) {
	ks.mu.Lock()
	defer ks.mu.Unlock()
	ks.keys[userID] = key
}

func (ks *keyStore) Get(userID string) ([]byte, bool) {
	ks.mu.Lock()
	defer ks.mu.Unlock()
	k, ok := ks.keys[userID]
	return k, ok
}

License

MIT — see LICENSE.

Documentation

Overview

Package sqlflow provides a SQLite-backed storage layer built on top of database/sql. It wraps SQLite in WAL mode with separate read and write connections, serialised writes, and exponential-backoff retry logic.

The two main abstractions are:

  • DB[Queries]: a single SQLite database whose per-transaction accessor is Queries. Use GetDB to open or create (and migrate) a database file, OpenDB to open an existing file without running migrations, or TestDB for an in-memory database in tests.

  • Pool[Queries]: a per-key connection pool where each key (e.g. a user ID) maps to its own SQLite file on disk. Connections are cached in a ristretto TinyLFU cache and closed gracefully when evicted. Use NewPool to create one, or TestPool in tests.

All database access goes through the Read and Write methods, which manage the transaction for the caller.

Both types have encrypted variants: use GetEncryptedDB/OpenEncryptedDB and NewEncryptedPool instead of their plain counterparts. The jgiannuzzi fork of go-sqlite3 applies PRAGMA key via the DSN before any other pragmas.

Migrations are handled by goose. Pass an fs.FS whose root contains the *.sql migration files directly (no subdirectory), for example an embed.FS re-rooted with fs.Sub, or os.DirFS.

Constants

This section is empty.

Variables

var ErrKeyNotAvailable = errors.New("data key not available")

ErrKeyNotAvailable is returned by an encrypted Pool when the data key for a user is not in the in-memory key store.

Functions

func NoRows

func NoRows(err error) bool

NoRows reports whether err is a sql.ErrNoRows "not found" result. Use this instead of errors.Is(err, sql.ErrNoRows) for readability at call sites.

Types

type DB

type DB[Queries any] struct {
	// contains filtered or unexported fields
}

DB is a SQLite database handle parameterised by a per-transaction accessor type Queries. It maintains two underlying sql.DB connections:

  • wrdb: a single write connection (MaxOpenConns=1) with _txlock=immediate, serialised by a mutex so that only one writer can hold the SQLite WAL write lock at a time.
  • rddb: an unbounded pool of read connections with _txlock=deferred, allowing concurrent readers to proceed without blocking writers.

Every operation runs inside a transaction. Write calls retry on transient SQLite busy errors using exponential backoff; Read calls retry indefinitely until the context is cancelled.
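The two-handle layout can be illustrated by the DSNs each handle might use. This is a sketch assuming mattn/go-sqlite3's `file:` DSN syntax with its `_journal_mode` and `_txlock` query parameters. The dsnFor helper is hypothetical, and sqlflow's actual DSNs (other pragmas, encryption parameters) are not shown in this documentation:

```go
package main

import (
	"fmt"
	"net/url"
)

// dsnFor builds a go-sqlite3 style DSN for path with the given transaction
// lock mode. Hypothetical helper; the parameter set is an assumption.
// path is not escaped, so it must not contain '?' or '#'.
func dsnFor(path, txlock string) string {
	q := url.Values{}
	q.Set("_journal_mode", "WAL") // WAL lets readers run alongside the single writer
	q.Set("_txlock", txlock)      // BEGIN IMMEDIATE vs BEGIN DEFERRED
	return "file:" + path + "?" + q.Encode()
}

func main() {
	// Write handle: sql.Open("sqlite3", dsn), then SetMaxOpenConns(1).
	fmt.Println(dsnFor("/tmp/app.db", "immediate")) // file:/tmp/app.db?_journal_mode=WAL&_txlock=immediate
	// Read handle: an ordinary, unbounded pool.
	fmt.Println(dsnFor("/tmp/app.db", "deferred")) // file:/tmp/app.db?_journal_mode=WAL&_txlock=deferred
}
```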

func GetDB

func GetDB[Queries any](dbName string, fsys fs.FS, querier Querier[Queries]) (*DB[Queries], error)

GetDB opens (or creates) the SQLite database at dbName, runs all pending migrations from fsys, and returns an open DB. fsys must contain the *.sql migration files at its root.

func GetEncryptedDB

func GetEncryptedDB[Queries any](dbName string, fsys fs.FS, querier Querier[Queries], key []byte) (*DB[Queries], error)

GetEncryptedDB opens (or creates) the SQLCipher-encrypted SQLite database at dbName, runs all pending migrations from fsys, and returns an open DB.

func OpenDB

func OpenDB[Queries any](dbName string, fsys fs.FS, querier Querier[Queries]) (*DB[Queries], error)

OpenDB opens an existing database without running migrations. If the file does not exist yet, it falls back to GetDB (which creates and migrates it). Use this on the hot path when migrations have already been applied (e.g. via MigrateAll at startup).

func OpenEncryptedDB

func OpenEncryptedDB[Queries any](dbName string, fsys fs.FS, querier Querier[Queries], key []byte) (*DB[Queries], error)

OpenEncryptedDB opens an existing SQLCipher-encrypted database without running migrations. If the file does not exist yet, it falls back to GetEncryptedDB (which creates and migrates it).

func TestDB

func TestDB[Queries any](fsys fs.FS, querier Querier[Queries]) *DB[Queries]

TestDB creates an in-memory SQLite database, runs migrations from fsys, and returns a DB ready for use in tests.

Panics on any error so test setup stays concise. fsys must contain the *.sql migration files at its root.

func (*DB[Queries]) Checkpoint

func (db *DB[Queries]) Checkpoint(ctx context.Context) error

Checkpoint runs PRAGMA wal_checkpoint(TRUNCATE) under the write mutex. WAL frames are moved into the main database file and, if all readers are done, the WAL file is reset to zero size.

func (*DB[Queries]) Close

func (db *DB[Queries]) Close() error

Close closes both the read and write database connections. It waits for any in-flight operations to complete before returning.

func (*DB[Queries]) Read

func (db *DB[Queries]) Read(ctx context.Context, f func(*Queries) error) error

Read executes f inside a read-only deferred transaction. It retries on transient SQLite busy errors using exponential backoff until ctx is cancelled. Errors returned by f are treated as permanent and not retried.

func (*DB[Queries]) Write

func (db *DB[Queries]) Write(ctx context.Context, f func(*Queries) error) error

Write executes f inside an immediate (exclusive) transaction under the write mutex. It retries on transient SQLite busy errors up to backoffRetries times with exponential backoff. Errors returned by f are treated as permanent and cause an immediate rollback with no retry.

type DBTX

type DBTX interface {
	ExecContext(context.Context, string, ...any) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	QueryContext(context.Context, string, ...any) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...any) *sql.Row
}

DBTX is the interface satisfied by both *sql.DB and *sql.Tx, allowing the same accessor type to be used within or outside a transaction.

type Pool

type Pool[Queries any] struct {
	// contains filtered or unexported fields
}

Pool is a per-key connection pool backed by a ristretto cache with TinyLFU eviction. Each key (e.g. user ID) gets its own SQLite database file under dir. When the cache evicts an entry, its DB is closed only after all in-flight operations finish (reference-counted via poolEntry).

func NewEncryptedPool

func NewEncryptedPool[Queries any](
	dir string, fsys fs.FS, querier Querier[Queries], maxCached int64,
	keyProvider func(string) ([]byte, bool),
	inactivityTimeout time.Duration,
) (*Pool[Queries], error)

NewEncryptedPool creates a Pool where each database is encrypted with SQLCipher. keyProvider is called with the pool key (e.g. user ID) each time a database is opened; it must return the 32-byte encryption key and true, or false if the key is unavailable (causing Read/Write to return ErrKeyNotAvailable). Migration for encrypted databases is lazy: it runs on first open when the data key is available.

func NewPool

func NewPool[Queries any](
	dir string, fsys fs.FS, querier Querier[Queries], maxCached int64,
	inactivityTimeout time.Duration,
) (*Pool[Queries], error)

NewPool creates a plain (unencrypted) Pool backed by on-disk SQLite databases. fsys must contain the *.sql migration files at its root. maxCached controls the maximum number of open databases kept in the cache (minimum 1000). inactivityTimeout, if > 0, starts a background reaper that evicts entries idle for longer than the timeout; pass 0 to disable.

func TestPool

func TestPool[Queries any](dir string, fsys fs.FS, querier Querier[Queries]) *Pool[Queries]

TestPool returns a plain pool backed by dir for tests. Panics on error, matching the TestDB convention. fsys must contain the *.sql migration files at its root.

func (*Pool[Queries]) Close

func (p *Pool[Queries]) Close() error

Close stops the inactivity reaper and closes all cached databases. sql.DB.Close waits for in-flight operations to finish, so this blocks until everything drains.

func (*Pool[Queries]) Evict

func (p *Pool[Queries]) Evict(userID string)

Evict immediately removes the pool entry for userID from the cache, closing the database once all in-flight operations finish. No-op if the entry is not cached.

func (*Pool[Queries]) ListKeys

func (p *Pool[Queries]) ListKeys() ([]string, error)

ListKeys returns the key (user ID) for every database file in the pool directory. The slice is returned in the order in which the filesystem lists the files.

func (*Pool[Queries]) MigrateAll

func (p *Pool[Queries]) MigrateAll() error

MigrateAll opens every *.db file under dir, runs pending migrations on it, and closes it again. If a keyProvider is configured, migration is skipped (lazy per-DB migration happens in getOrCreate when the data key is available).

func (*Pool[Queries]) Read

func (p *Pool[Queries]) Read(ctx context.Context, key string, f func(*Queries) error) error

Read acquires the database for key and executes f inside a read-only deferred transaction. The pool entry's reference count is held for the duration so the database is not closed while f is running.

func (*Pool[Queries]) Wait

func (p *Pool[Queries]) Wait()

Wait blocks until all pending cache evictions have been processed.

func (*Pool[Queries]) Write

func (p *Pool[Queries]) Write(ctx context.Context, key string, f func(*Queries) error) error

Write acquires the database for key and executes f inside an immediate (exclusive) transaction. The pool entry's reference count is held for the duration so the database is not closed while f is running.

type Querier

type Querier[Queries any] func(tx DBTX) *Queries

Querier is a function that builds a per-transaction accessor of type Queries from a DBTX. It is called once per transaction inside Read and Write.
