Documentation
¶
Overview ¶
Package sqlview provides SQL-backed StateView projections with transactional event processing.
sqlview bridges the gap between eskit's StateView pattern and SQL databases. Each projection's Evolve function receives a *sql.Tx, ensuring that read model updates are atomic. Setup and Reset functions receive a *sql.DB for DDL operations.
Level 1: Simple (in-memory) ¶
Use eskit.StateView directly with closures over your data structures. Good for prototyping, tests, and small datasets.
Level 2: SQL (this package) ¶
Use sqlview.Config to define projections that write to SQL tables. sqlview.From() wraps the config into an eskit.StateView with TX management. Good for production: queryable, survives restarts, rebuildable.
Atomic Checkpoint ¶
For non-idempotent projections, set CheckpointInTx: true on the Config. This saves the subscription checkpoint within the same transaction as Evolve, preventing double processing on crash recovery.
Example ¶
var BalanceView = sqlview.Config[domain.Event]{
Name: "account-balance",
EventTypes: []string{"MoneyDeposited", "MoneyWithdrawn"},
Setup: func(ctx context.Context, db *sql.DB) error {
_, err := db.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS balances (
account_id TEXT PRIMARY KEY, balance INTEGER NOT NULL DEFAULT 0)`)
return err
},
Evolve: func(ctx context.Context, tx *sql.Tx, event eskit.Event[domain.Event]) error {
switch e := event.Data.(type) {
case domain.MoneyDeposited:
_, err := tx.ExecContext(ctx, `INSERT INTO balances ...`, ...)
return err
}
return nil
},
}
view := sqlview.From(db, BalanceView)
sub := subscription.FromStateView(view, reader, checkpoint)
Zero-Downtime Rebuild ¶
Use sqlview.Rebuild to rebuild a projection without downtime. It creates a shadow table, replays all events into it, then atomically swaps the tables:
err := sqlview.Rebuild(ctx, db, sqlview.RebuildConfig[domain.Event]{
Live: BalanceView,
Shadow: BalanceViewRebuild, // same logic, targets shadow table
Table: "balances",
Reader: reader,
Checkpoint: checkpoint,
})
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func From ¶
From creates a StateView from a SQL config. The returned StateView's Evolve wraps each call in a database transaction.
func Rebuild ¶
Rebuild performs a zero-downtime projection rebuild using shadow tables.
It creates a shadow table, replays all events into it, then atomically swaps the shadow table for the live table using ALTER TABLE RENAME. Readers see consistent data throughout the rebuild — either the old complete table or the new complete table, never partial state.
If the rebuild fails or is cancelled, the live table is untouched. The shadow table can be dropped manually and the rebuild retried.
Example:
err := sqlview.Rebuild(ctx, db, sqlview.RebuildConfig[domain.Event]{
Live: projections.AccountList,
Shadow: projections.AccountListRebuild, // same logic, targets "account_list_rebuild"
Table: "account_list",
Reader: store,
Checkpoint: checkpointStore,
})
Types ¶
type Config ¶
type Config[E any] struct { // Name identifies this projection (used for checkpoint tracking). Name string // EventTypes lists the event types this view cares about. EventTypes []string // Setup initializes the read model (e.g., CREATE TABLE IF NOT EXISTS). // Called once when the subscription starts. Optional. Setup func(ctx context.Context, db *sql.DB) error // Reset clears the read model for rebuild (e.g., DELETE FROM table). // Called before replaying from position 0. Optional. Reset func(ctx context.Context, db *sql.DB) error // Evolve applies a single event to the read model within a transaction. // The transaction is committed automatically if Evolve returns nil. Evolve func(ctx context.Context, tx *sql.Tx, event eskit.Event[E]) error // CheckpointInTx saves the subscription checkpoint within the same // transaction as Evolve, making them atomic. This prevents double // processing of non-idempotent side effects on crash recovery. // Requires a "checkpoints" table with (name TEXT PRIMARY KEY, position INTEGER, updated_at DATETIME). CheckpointInTx bool // OnChange is called after Evolve's transaction commits successfully. // Use this to notify SSE handlers or watchers that the projection changed. // Receives the notification key (see GetOnChangeKey) and EventType strings. // Must be non-blocking — slow consumers should be dropped, not waited on. OnChange func(streamID string, eventType string) // GetOnChangeKey, if set, derives the notification key from the event. // The returned string is passed to OnChange as the streamID parameter, // enabling entity-level ChangeNotifier.Watch subscriptions. // If nil, defaults to event.StreamID. GetOnChangeKey func(event eskit.Event[E]) string }
Config defines a SQL-backed projection.
Each field maps to the corresponding StateView field, but with SQL-specific signatures: Setup and Reset receive *sql.DB, Evolve receives *sql.Tx.
type RebuildConfig ¶
type RebuildConfig[E any] struct { // Live is the currently-active projection config. // Only Live.Name is used (to identify the checkpoint to update on swap). Live Config[E] // Shadow is the rebuild projection config. // Shadow.Setup must create the shadow table (e.g., CREATE TABLE order_list_rebuild ...). // Shadow.Evolve must write to the shadow table. // Shadow.Name should differ from Live.Name (e.g., "order-list-rebuild"). Shadow Config[E] // Table is the live table name (e.g., "order_list"). Table string // ShadowTable is the shadow table name. Defaults to "{Table}_rebuild" if empty. ShadowTable string // Reader provides access to the global event stream. Reader subscription.GlobalReader[E] // Checkpoint tracks consumer positions. Checkpoint subscription.Checkpoint // BatchSize is events per poll during catch-up. Default: 100. BatchSize int // Logger for operational logging. Optional. Logger *slog.Logger }
RebuildConfig configures a zero-downtime projection rebuild using shadow tables.
The rebuild process:
- Creates a shadow table by calling Shadow.Setup
- Replays all events from position 0 into the shadow table via Shadow.Evolve
- Once caught up, atomically swaps the shadow table for the live table
- Readers see consistent data throughout — either old or new, never partial
The Shadow config should target a different table name than the live projection (e.g., "order_list_rebuild" instead of "order_list"). The swap renames the tables so the shadow becomes the live table.