sqlview

package
v0.0.0-...-0b3a9fd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package sqlview provides SQL-backed StateView projections with transactional event processing.

sqlview bridges the gap between eskit's StateView pattern and SQL databases. Each projection's Evolve function receives a *sql.Tx, ensuring that read model updates are atomic. Setup and Reset functions receive a *sql.DB for DDL operations.

Level 1: Simple (in-memory)

Use eskit.StateView directly with closures over your data structures. Good for prototyping, tests, and small datasets.

Level 2: SQL (this package)

Use sqlview.Config to define projections that write to SQL tables. sqlview.From() wraps the config into an eskit.StateView with TX management. Good for production: queryable, survives restarts, rebuildable.

Atomic Checkpoint

For non-idempotent projections, set CheckpointInTx: true on the Config. This saves the subscription checkpoint within the same transaction as Evolve, preventing double processing on crash recovery.

Example

var BalanceView = sqlview.Config[domain.Event]{
    Name:       "account-balance",
    EventTypes: []string{"MoneyDeposited", "MoneyWithdrawn"},
    Setup: func(ctx context.Context, db *sql.DB) error {
        _, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS balances (
            account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL DEFAULT 0)`)
        return err
    },
    Evolve: func(ctx context.Context, tx *sql.Tx, event eskit.Event[domain.Event]) error {
        switch e := event.Data.(type) {
        case domain.MoneyDeposited:
            _, err := tx.ExecContext(ctx, `INSERT INTO balances ...`, ...)
            return err
        }
        return nil
    },
}

view := sqlview.From(db, BalanceView)
sub := subscription.FromStateView(view, reader, checkpoint)

Zero-Downtime Rebuild

Use sqlview.Rebuild to rebuild a projection without downtime. It creates a shadow table, replays all events into it, then atomically swaps the tables:

err := sqlview.Rebuild(ctx, db, sqlview.RebuildConfig[domain.Event]{
    Live:       BalanceView,
    Shadow:     BalanceViewRebuild, // same logic, targets shadow table
    Table:      "balances",
    Reader:     reader,
    Checkpoint: checkpoint,
})

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func From

func From[E any](db *sql.DB, cfg Config[E]) eskit.StateView[E]

From creates a StateView from a SQL config. The returned StateView's Evolve wraps each call in a database transaction.

func Rebuild

func Rebuild[E any](ctx context.Context, db *sql.DB, cfg RebuildConfig[E]) error

Rebuild performs a zero-downtime projection rebuild using shadow tables.

It creates a shadow table, replays all events into it, then atomically swaps the shadow table for the live table using ALTER TABLE RENAME. Readers see consistent data throughout the rebuild — either the old complete table or the new complete table, never partial state.

If the rebuild fails or is cancelled, the live table is untouched. The shadow table can be dropped manually and the rebuild retried.

Example:

err := sqlview.Rebuild(ctx, db, sqlview.RebuildConfig[domain.Event]{
    Live:   projections.AccountList,
    Shadow: projections.AccountListRebuild, // same logic, targets "account_list_rebuild"
    Table:  "account_list",
    Reader: store,
    Checkpoint: checkpointStore,
})

Types

type Config

type Config[E any] struct {
	// Name identifies this projection (used for checkpoint tracking).
	Name string

	// EventTypes lists the event types this view cares about.
	EventTypes []string

	// Setup initializes the read model (e.g., CREATE TABLE IF NOT EXISTS).
	// Called once when the subscription starts. Optional.
	Setup func(ctx context.Context, db *sql.DB) error

	// Reset clears the read model for rebuild (e.g., DELETE FROM table).
	// Called before replaying from position 0. Optional.
	Reset func(ctx context.Context, db *sql.DB) error

	// Evolve applies a single event to the read model within a transaction.
	// The transaction is committed automatically if Evolve returns nil.
	Evolve func(ctx context.Context, tx *sql.Tx, event eskit.Event[E]) error

	// CheckpointInTx saves the subscription checkpoint within the same
	// transaction as Evolve, making them atomic. This prevents double
	// processing of non-idempotent side effects on crash recovery.
	// Requires a "checkpoints" table with (name TEXT PRIMARY KEY, position INTEGER, updated_at DATETIME).
	CheckpointInTx bool

	// OnChange is called after Evolve's transaction commits successfully.
	// Use this to notify SSE handlers or watchers that the projection changed.
	// Receives the notification key (see GetOnChangeKey) and EventType strings.
	// Must be non-blocking — slow consumers should be dropped, not waited on.
	OnChange func(streamID string, eventType string)

	// GetOnChangeKey, if set, derives the notification key from the event.
	// The returned string is passed to OnChange as the streamID parameter,
	// enabling entity-level ChangeNotifier.Watch subscriptions.
	// If nil, defaults to event.StreamID.
	GetOnChangeKey func(event eskit.Event[E]) string
}

Config defines a SQL-backed projection.

Each field maps to the corresponding StateView field, but with SQL-specific signatures: Setup and Reset receive *sql.DB, Evolve receives *sql.Tx.

type RebuildConfig

type RebuildConfig[E any] struct {
	// Live is the currently-active projection config.
	// Only Live.Name is used (to identify the checkpoint to update on swap).
	Live Config[E]

	// Shadow is the rebuild projection config.
	// Shadow.Setup must create the shadow table (e.g., CREATE TABLE order_list_rebuild ...).
	// Shadow.Evolve must write to the shadow table.
	// Shadow.Name should differ from Live.Name (e.g., "order-list-rebuild").
	Shadow Config[E]

	// Table is the live table name (e.g., "order_list").
	Table string

	// ShadowTable is the shadow table name. Defaults to "{Table}_rebuild" if empty.
	ShadowTable string

	// Reader provides access to the global event stream.
	Reader subscription.GlobalReader[E]

	// Checkpoint tracks consumer positions.
	Checkpoint subscription.Checkpoint

	// BatchSize is events per poll during catch-up. Default: 100.
	BatchSize int

	// Logger for operational logging. Optional.
	Logger *slog.Logger
}

RebuildConfig configures a zero-downtime projection rebuild using shadow tables.

The rebuild process:

  1. Creates a shadow table by calling Shadow.Setup
  2. Replays all events from position 0 into the shadow table via Shadow.Evolve
  3. Once caught up, atomically swaps the shadow table for the live table
  4. Readers see consistent data throughout — either old or new, never partial

The Shadow config should target a different table name than the live projection (e.g., "order_list_rebuild" instead of "order_list"). The swap renames the tables so the shadow becomes the live table.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL