sqlflow

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2026 License: MIT Imports: 15 Imported by: 0

README

sqlflow

A SQLite-backed storage layer for Go. It wraps SQLite in WAL mode with separate read/write connections, serialised writes with exponential-backoff retries, and an optional per-key connection pool backed by a Ristretto cache.

sqlflow is driver-agnostic: it imports only database/sql and works with any SQLite driver you choose. At-rest encryption is supported via SQLCipher when using the mattn driver.

All database access goes through Read and Write methods, which manage the transaction for you, so you never touch a raw connection directly.

This package works nicely with sqlc.dev, which creates named queries as methods to a type that wraps database/sql.{DB,Tx} connections.

Table of contents

Installation

go get github.com/avalonbits/sqlflow

Then pick a driver sub-package (see Driver selection below).

Contributing

See CONTRIBUTING.md.

Driver selection

sqlflow ships three driver sub-packages. Import the one you want — it registers the underlying SQLite driver and provides the ready-to-use Option in a single import, with no separate blank-import or WithDriver call needed:

Sub-package Driver CGo Encryption
github.com/avalonbits/sqlflow/drivers/mattn mattn/go-sqlite3 yes yes (SQLCipher fork)
github.com/avalonbits/sqlflow/drivers/modernc modernc.org/sqlite no no
github.com/avalonbits/sqlflow/drivers/ncruces ncruces/go-sqlite3 no no
// mattn (CGo, supports encryption):
import "github.com/avalonbits/sqlflow/drivers/mattn"
db, err := sqlflow.OpenDB(path, querier, mattn.Driver, ...)

// modernc (pure Go):
import "github.com/avalonbits/sqlflow/drivers/modernc"
db, err := sqlflow.OpenDB(path, querier, modernc.Driver, ...)

// ncruces (WebAssembly):
import "github.com/avalonbits/sqlflow/drivers/ncruces"
db, err := sqlflow.OpenDB(path, querier, ncruces.Driver, ...)

[!NOTE] mattn and ncruces both register as "sqlite3" — they cannot coexist in the same binary. modernc registers as "sqlite" and can coexist with ncruces.

Usage

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"testing/fstest"

	"github.com/avalonbits/sqlflow"
	"github.com/avalonbits/sqlflow/drivers/mattn" // or modernc / ncruces
	"github.com/avalonbits/sqlflow/migrators"
)

func main() {
	path := "/tmp/plain.db"
	os.Remove(path)

	db, err := sqlflow.OpenDB(
		// the path to your database file.
		path,
		// A Querier function — sqlflow calls it with the open transaction.
		newKV,
		// Select the driver.
		mattn.Driver,
		// migrators.Goose applies migrations on open.
		migrators.Goose(migrations),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	// Write starts an immediate (exclusive) transaction and runs your func
	// inside it. Blocks if another write is in progress.
	err = db.Write(ctx, func(s *kvStore) error {
		return s.Set(ctx, "hello", "world")
	})
	if err != nil {
		log.Fatal(err)
	}

	var val string

	// Read starts a deferred transaction and can run concurrently with other
	// Read calls (but not with a Write).
	err = db.Read(ctx, func(s *kvStore) error {
		var err error
		val, err = s.Get(ctx, "hello")
		return err
	})
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(val) // world
}

// migrations is an in-memory goose migration set. In production use
// //go:embed with fs.Sub, or os.DirFS, to point at real .sql files.
var migrations = fstest.MapFS{
	"001_init.sql": {
		Data: []byte(`
            -- +goose Up
            CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, val TEXT NOT NULL);

            -- +goose Down
            DROP TABLE kv;
        `),
	},
}

// kvStore wraps a DBTX to provide typed query methods for the kv table.
type kvStore struct{ db sqlflow.DBTX }

// newKV is a sqlflow.Querier: sqlflow calls it with the transaction's
// connection so every method on kvStore automatically runs within that
// transaction — no connection is ever passed around manually.
func newKV(db sqlflow.DBTX) *kvStore { return &kvStore{db: db} }

func (s *kvStore) Set(ctx context.Context, key, val string) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO kv(key,val) VALUES(?,?) ON CONFLICT(key) DO UPDATE SET val=excluded.val`,
		key, val)
	return err
}

func (s *kvStore) Get(ctx context.Context, key string) (string, error) {
	var val string
	err := s.db.QueryRowContext(ctx, `SELECT val FROM kv WHERE key=?`, key).Scan(&val)
	return val, err
}

In the typical case where you are working with a single database file, calling sqlflow.OpenDB with the path and Querier factory is analogous to sql.Open(...) with an extra factory function.

[!NOTE] Pass the plain file path — not a DSN URI. Paths containing file: or ? are rejected with an error. Use WithDSNParams or WithPragma to set connection parameters (see Connection parameters).

sqlflow.DB is generic over your Querier type — DB[kvStore] in this example — which is why the closure passed to Read and Write receives a concrete *kvStore rather than an interface. The type is fixed at construction time via the factory function, so no type assertions are needed.

From then on you call db.Read(ctx, func(q *kvStore) error { ... }) with your database code inside the querier closure for concurrent read operations or use Write when you want to perform write operations.

While there is nothing preventing you from doing write operations within the Read closure, you should avoid it: the read connection uses deferred transactions, so a write inside a Read closure can conflict with an active Write and return SQLITE_BUSY. All Write calls are serialized with each other — only one runs at a time — but concurrent Reads are always allowed, even while a Write is in progress.

Pool usage

When each user (or tenant) needs their own isolated database file, use NewPool instead of OpenDB. The pool opens databases lazily on first access and keeps them in a Ristretto cache. Call SetInactivityTimeout to start a background reaper that evicts databases that idle longer than the given duration.

Options work exactly the same way as with OpenDB — pass them as the trailing variadic arguments. The options are applied to every database the pool opens, so migrators.Goose(fsys) will run migrations on each user's database the first time it is accessed.

pool, err := sqlflow.NewPool(
    dir,                         // directory where per-user .db files are stored
    newKV,                       // same Querier factory as OpenDB
    1_000,                       // max cached open databases
    mattn.Driver,                // select driver
    migrators.Goose(migrations), // options — same as OpenDB
)
if err != nil {
    log.Fatal(err)
}
pool.SetInactivityTimeout(5 * time.Minute) // evict after 5 min idle
defer pool.Close()

ctx := context.Background()

// Read and Write take an extra key argument that selects the database.
if err := pool.Write(ctx, "alice", func(s *kvStore) error {
    return s.Set(ctx, "hello", "world")
}); err != nil {
    log.Fatal(err)
}

var val string
if err := pool.Read(ctx, "alice", func(s *kvStore) error {
    var err error
    val, err = s.Get(ctx, "hello")
    return err
}); err != nil {
    log.Fatal(err)
}

fmt.Println(val) // world

The only difference from the single-database case is the key argument ("alice" above). Everything else — the Querier type, the closure shape, the Read/Write semantics — is identical.

Encryption

sqlflow supports at-rest encryption through SQLCipher, a SQLite extension that encrypts the entire database file with AES-256.

Encryption requires mattn.Driver and the jgiannuzzi/go-sqlite3 fork (which bundles SQLCipher). Add the replace directive to your go.mod:

replace github.com/mattn/go-sqlite3 => github.com/jgiannuzzi/go-sqlite3 v1.14.35-0.20260227142656-2c447b9a2806

Then use OpenEncryptedDB (single database) or NewEncryptedPool (per-key pool). Both accept a 32-byte key; sqlflow passes it to the driver via DSN parameters at open time.

import "github.com/avalonbits/sqlflow/drivers/mattn" // must use the jgiannuzzi fork

key := make([]byte, 32) // fill with your 32-byte key

db, err := sqlflow.OpenEncryptedDB(
    path, querier, key,
    mattn.Driver,
    migrators.Goose(fsys),
)

Calling OpenEncryptedDB or NewEncryptedPool with modernc.Driver or ncruces.Driver returns sqlflow.ErrEncryptionNotSupported immediately.

Using with sqlc

sqlc generates type-safe Go query functions from SQL. It's a natural fit for sqlflow: sqlc produces a New(db DBTX) *Queries constructor and a DBTX interface that sqlflow accepts directly, so there is no adapter code to write.

Configure sqlc

A minimal sqlc.yaml for a SQLite project:

version: "2"
sql:
  - engine: "sqlite"
    queries: "queries.sql"
    schema:  "migrations/"
    gen:
      go:
        package:     "store"
        out:         "store"
        sql_package: "database/sql"

[!NOTE] Set sql_package: "database/sql" so sqlc generates a DBTX interface backed by the standard library — this is what sqlflow's DBTX is compatible with.

Write your queries
-- queries.sql

-- name: GetUser :one
SELECT id, name FROM users WHERE id = ?;

-- name: CreateUser :exec
INSERT INTO users (id, name) VALUES (?, ?);

Run sqlc generate after editing .sql files to keep the generated code in sync.

Wire it to sqlflow

Pass the generated store.New function directly as the Querier — sqlflow infers all type parameters from it:

import (
    "github.com/avalonbits/sqlflow"
    "github.com/avalonbits/sqlflow/drivers/mattn"
    "github.com/avalonbits/sqlflow/migrators"
    "myapp/store"
)

db, err := sqlflow.OpenDB(
    "/var/data/app.db",
    store.New,                   // sqlc-generated constructor, no wrapper needed
    mattn.Driver,
    migrators.Goose(migrationsFS),
)

db is a *sqlflow.DB[store.Queries, store.DBTX]. Inside Read and Write closures the *store.Queries accessor gives you fully type-safe calls:

err = db.Write(ctx, func(q *store.Queries) error {
    return q.CreateUser(ctx, store.CreateUserParams{ID: 1, Name: "Alice"})
})

err = db.Read(ctx, func(q *store.Queries) error {
    user, err := q.GetUser(ctx, 1)
    fmt.Println(user.Name) // Alice
    return err
})

Concepts

See CONCEPTS.md for a detailed explanation of Read/Write, Querier, Migrations, Options, Connection parameters, and Testing.

License

MIT — see LICENSE.

Documentation

Overview

Package sqlflow provides a SQLite-backed storage layer built on top of database/sql. It wraps SQLite in WAL mode with separate read and write connections, serialised writes, and exponential-backoff retry logic.

The two main abstractions are:

  • DB[Queries, D]: a single SQLite database whose per-transaction accessor is Queries. Use OpenDB to open (or create) a file, or TestDB for an in-memory database in tests.

  • Pool[Queries, D]: a per-key connection pool where each key (e.g. a user ID) maps to its own SQLite file on disk. Connections are cached in a ristretto TinyLFU cache and closed gracefully when evicted. Use NewPool to create one, or TestPool in tests.

All database access goes through Read and Write methods, that manage the transaction for the callers.

Both types have encrypted variants: use OpenEncryptedDB and NewEncryptedPool instead of their plain counterparts. Encryption requires the jgiannuzzi fork of mattn/go-sqlite3 with SQLCipher support and the mattn driver sub-package.

sqlflow has no built-in driver dependency. Import one of the driver sub-packages to register a SQLite driver and receive a ready-to-use Option:

import "github.com/avalonbits/sqlflow/drivers/mattn"   // mattn (CGo, encryption)
import "github.com/avalonbits/sqlflow/drivers/modernc" // modernc (pure Go)
import "github.com/avalonbits/sqlflow/drivers/ncruces" // ncruces (WebAssembly)

A driver option is required. Use one of the drivers/* sub-packages which register the driver and provide a ready-to-use Option in a single import.

Migrations are decoupled from the core: pass migrators.Goose(fsys) as an Option to run goose-based schema migrations on open, or implement your own OnOpen hook for any other migration tool.

Index

Constants

This section is empty.

Variables

View Source
var ErrEncryptionNotSupported = errors.New(
	"encryption requires a SQLCipher-enabled driver; " +
		"use the jgiannuzzi fork of mattn/go-sqlite3 with WithDriver(sqlflow.MattnDriver)",
)

ErrEncryptionNotSupported is returned by OpenEncryptedDB and NewEncryptedPool when the active driver does not support SQLCipher encryption. Use the jgiannuzzi fork of mattn/go-sqlite3 and pass WithDriver(MattnDriver).

View Source
var ErrKeyNotAvailable = errors.New("data key not available")

ErrKeyNotAvailable is returned by an encrypted Pool when the data key for a user is not in the in-memory key store.

Functions

func NoRows

func NoRows(err error) bool

NoRows reports whether err is a sql.ErrNoRows "not found" result. Use this instead of errors.Is(err, sql.ErrNoRows) for readability at call sites.

Types

type DB

type DB[Queries any, D DBTX] struct {
	// contains filtered or unexported fields
}

DB is a SQLite database handle parameterised by a per-transaction accessor type Queries. It maintains two underlying sql.DB connections:

  • wrdb: a single write connection (MaxOpenConns=1) with _txlock=immediate, serialised by a mutex so that only one writer can hold the SQLite WAL write lock at a time.
  • rddb: an unbounded pool of read connections with _txlock=deferred, allowing concurrent readers to proceed without blocking writers.

Every operation runs inside a transaction. Write calls retry on transient SQLite busy errors using exponential backoff; Read calls retry indefinitely until the context is cancelled.

func OpenDB

func OpenDB[Queries any, D DBTX](dbName string, querier Querier[Queries, D], opts ...Option) (*DB[Queries, D], error)

OpenDB opens (or creates) the SQLite database at dbName and returns an open DB. Pass migrators.Goose(fsys) as an option to run schema migrations.

func OpenEncryptedDB

func OpenEncryptedDB[Queries any, D DBTX](dbName string, querier Querier[Queries, D], key []byte, opts ...Option) (*DB[Queries, D], error)

OpenEncryptedDB opens (or creates) the SQLCipher-encrypted SQLite database at dbName and returns an open DB. Pass migrators.Goose(fsys) as an option to run schema migrations.

Encryption requires a SQLCipher-enabled driver. Use the jgiannuzzi fork of mattn/go-sqlite3 with WithDriver(MattnDriver) (the default). Other drivers return ErrEncryptionNotSupported.

func TestDB

func TestDB[Queries any, D DBTX](querier Querier[Queries, D], opts ...Option) *DB[Queries, D]

TestDB creates an in-memory SQLite database and returns a DB ready for use in tests. Pass migrators.Goose(fsys) as an option to apply schema migrations.

Panics on any error so test setup stays concise.

func (*DB[Queries, D]) Checkpoint

func (db *DB[Queries, D]) Checkpoint(ctx context.Context) error

Checkpoint runs PRAGMA wal_checkpoint(TRUNCATE) under the write mutex. WAL frames are moved into the main database file and, if all readers are done, the WAL file is reset to zero size.

func (*DB[Queries, D]) Close

func (db *DB[Queries, D]) Close() error

Close calls the OnClose hook (if any) exactly once, then closes both the read and write database connections. It waits for any in-flight operations to complete before returning.

func (*DB[Queries, D]) Read

func (db *DB[Queries, D]) Read(ctx context.Context, f func(*Queries) error) error

Read executes f inside a read-only deferred transaction. It retries on transient SQLite busy errors using exponential backoff until ctx is cancelled. Errors returned by f are treated as permanent and not retried.

func (*DB[Queries, D]) Write

func (db *DB[Queries, D]) Write(ctx context.Context, f func(*Queries) error) error

Write executes f inside an immediate (exclusive) transaction under the write mutex. It retries on transient SQLite busy errors up to backoffRetries times with exponential backoff. Errors returned by f are treated as permanent and cause an immediate rollback with no retry.

type DBTX

type DBTX interface {
	ExecContext(context.Context, string, ...any) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	QueryContext(context.Context, string, ...any) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...any) *sql.Row
}

DBTX is the interface satisfied by both *sql.DB and *sql.Tx, allowing the same accessor type to be used within or outside a transaction.

type Option added in v0.2.0

type Option struct {
	// contains filtered or unexported fields
}

Option carries configuration for a DB or Pool instance. Construct one with OnOpen, OnClose, WithDSNParams, WithPragma, or WithDriver; passing no options is always valid.

func OnClose added in v0.2.0

func OnClose(fn func(path string, db *sql.DB)) Option

OnClose registers fn to be called after both database connections are closed. fn receives the database file path and the (now-closed) write connection. Multiple OnClose options run in registration order. fn fires at most once even if Close is called multiple times. Use OnClose to release resources tied to this DB's lifetime (e.g. lock files).

func OnOpen added in v0.2.0

func OnOpen(fn func(path string, db *sql.DB) error) Option

OnOpen registers fn to be called with the database file path and the live write connection once all connections are established and the DB is ready for use. If fn returns a non-nil error, the connections are closed and the error is propagated from the constructor.

Multiple OnOpen options run in registration order. For in-memory databases created by TestDB the path is ":memory:".

func WithDSNParams added in v0.7.0

func WithDSNParams(params string) Option

WithDSNParams adds extra SQLite DSN parameters to every connection opened by this DB or Pool. The _txlock and _journal parameters are always controlled by sqlflow and cannot be overridden. All other parameters — including _sync, _busy_timeout, and _cache_size — take the value from the last WithDSNParams option that sets them, overriding sqlflow's defaults. The _key and _cipher parameters are silently ignored; use OpenEncryptedDB or NewEncryptedPool for encrypted databases.

params is a URL query string, e.g. "_foreign_keys=1&_cache_size=50000". This option is driver-specific: mattn uses flat params (_foreign_keys=1) while modernc and ncruces use _pragma format (_pragma=foreign_keys(1)). Use WithPragma for a portable alternative that works with all drivers.

func WithDriver added in v1.0.0

func WithDriver(d drivers.Config) Option

WithDriver selects the SQLite driver used by this DB or Pool. A driver option is required. Prefer the driver sub-packages (drivers/mattn, drivers/modernc, drivers/ncruces) which register the driver and provide a ready-to-use Option in a single import.

func WithPragma added in v1.0.0

func WithPragma(name, value string) Option

WithPragma sets a SQLite PRAGMA on every connection opened by this DB or Pool. It works with all supported drivers regardless of their DSN format — mattn renders it as _name=value; modernc and ncruces render it as _pragma=name(value).

The journal_mode PRAGMA is always controlled by sqlflow and cannot be overridden via WithPragma.

type Pool

type Pool[Queries any, D DBTX] struct {
	// contains filtered or unexported fields
}

Pool is a per-key connection pool backed by a ristretto cache with TinyLFU eviction. Each key (e.g. user ID) gets its own SQLite database file under dir. When the cache evicts an entry, its DB is closed only after all in-flight operations finish (reference-counted via poolEntry).

func NewEncryptedPool

func NewEncryptedPool[Queries any, D DBTX](
	dir string, querier Querier[Queries, D], maxCached int64,
	keyProvider func(string) ([]byte, bool), opts ...Option,
) (*Pool[Queries, D], error)

NewEncryptedPool creates a Pool where each database is encrypted with SQLCipher. keyProvider is called with the pool key (e.g. user ID) each time a database is opened; it must return the 32-byte encryption key and true, or false if the key is unavailable (causing Read/Write to return ErrKeyNotAvailable). opts are applied to every database opened by the pool. Call SetInactivityTimeout to enable the background eviction reaper.

Encryption requires a SQLCipher-enabled driver. Use the jgiannuzzi fork of mattn/go-sqlite3 with the drivers/mattn sub-package. Other drivers return ErrEncryptionNotSupported when the first database is opened.

func NewPool

func NewPool[Queries any, D DBTX](
	dir string, querier Querier[Queries, D], maxCached int64, opts ...Option,
) (*Pool[Queries, D], error)

NewPool creates a plain (unencrypted) Pool backed by on-disk SQLite databases. maxCached controls the maximum number of open databases kept in the cache (minimum 1000). opts are applied to every database opened by the pool. Call SetInactivityTimeout to enable the background eviction reaper.

func TestPool

func TestPool[Queries any, D DBTX](dir string, querier Querier[Queries, D], opts ...Option) *Pool[Queries, D]

TestPool returns a plain pool backed by dir for tests. Panics on error, matching the TestDB convention.

func (*Pool[Queries, D]) Close

func (p *Pool[Queries, D]) Close() error

Close stops the inactivity reaper and closes all cached databases. sql.DB.Close waits for in-flight operations to finish, so this blocks until everything drains.

func (*Pool[Queries, D]) Evict

func (p *Pool[Queries, D]) Evict(userID string)

Evict immediately removes the pool entry for userID from the cache, closing the database once all in-flight operations finish. No-op if the entry is not cached.

func (*Pool[Queries, D]) ListKeys

func (p *Pool[Queries, D]) ListKeys() ([]string, error)

ListKeys returns the key (user ID) for every database file in the pool directory. The returned slice is sorted by filesystem order.

func (*Pool[Queries, D]) Read

func (p *Pool[Queries, D]) Read(ctx context.Context, key string, f func(*Queries) error) error

Read acquires the database for key and executes f inside a read-only deferred transaction. The pool entry's reference count is held for the duration so the database is not closed while f is running.

func (*Pool[Queries, D]) SetInactivityTimeout added in v0.6.0

func (p *Pool[Queries, D]) SetInactivityTimeout(d time.Duration)

SetInactivityTimeout starts a background reaper that evicts pool entries that have been idle for longer than d. Calling it again cancels the previous reaper and starts a new one with the updated duration. Pass 0 to stop the reaper without starting a new one.

func (*Pool[Queries, D]) Wait

func (p *Pool[Queries, D]) Wait()

Wait blocks until all pending cache evictions have been processed.

func (*Pool[Queries, D]) Write

func (p *Pool[Queries, D]) Write(ctx context.Context, key string, f func(*Queries) error) error

Write acquires the database for key and executes f inside an immediate (exclusive) transaction. The pool entry's reference count is held for the duration so the database is not closed while f is running.

type Querier

type Querier[Queries any, D DBTX] func(tx D) *Queries

Querier is a function that builds a per-transaction accessor of type Queries from a D. It is called once per transaction inside Read and Write.

D is the concrete DBTX type accepted by the accessor constructor — typically sqlflow.DBTX for hand-written code, or the package-local DBTX generated by sqlc. Using the package-local type lets you pass sqlc-generated New functions directly without any adapter wrapper.

Directories

Path Synopsis
Package drivers defines the Config type that sqlflow uses to build connection strings and detect permanent errors for a specific SQLite driver.
Package drivers defines the Config type that sqlflow uses to build connection strings and detect permanent errors for a specific SQLite driver.
mattn
Package mattn registers the github.com/mattn/go-sqlite3 SQLite driver and exports a ready-to-use sqlflow.Option that selects it.
Package mattn registers the github.com/mattn/go-sqlite3 SQLite driver and exports a ready-to-use sqlflow.Option that selects it.
modernc
Package modernc registers the modernc.org/sqlite SQLite driver (pure Go, no CGo) and exports a ready-to-use sqlflow.Option that selects it.
Package modernc registers the modernc.org/sqlite SQLite driver (pure Go, no CGo) and exports a ready-to-use sqlflow.Option that selects it.
ncruces
Package ncruces registers the github.com/ncruces/go-sqlite3 SQLite driver (WebAssembly, no CGo) and exports a ready-to-use sqlflow.Option that selects it.
Package ncruces registers the github.com/ncruces/go-sqlite3 SQLite driver (WebAssembly, no CGo) and exports a ready-to-use sqlflow.Option that selects it.
Package migrators provides migration helpers for sqlflow databases.
Package migrators provides migration helpers for sqlflow databases.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL