sqlitestore

package module
v0.0.0-...-17eb08c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package sqlitestore provides SQLite-backed checkpoint storage for subscriptions.

SQLiteNotifier implements subscription.StoreNotifier for SQLite stores.

Two strategies are supported:

  1. Callback-based (default): The store calls Signal() after each append. This is instant and works with both :memory: and file-based databases.
  2. Polling-based: Polls PRAGMA data_version to detect external writes. Useful when multiple processes write to the same file-based database.

For single-process use (the common case), callback-based is recommended. The Store's WithNotifier option wires this up automatically.

Package sqlitestore provides a SQLite-backed EventStore implementation.

Index

Constants

View Source
const (
	// DefaultPollInterval is the default polling interval for data_version changes.
	DefaultPollInterval = 50 * time.Millisecond
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore is a durable, SQLite-backed checkpoint for event subscriptions. It tracks each consumer's last processed global sequence number, enabling crash recovery without reprocessing the entire event stream.

Thread-safe: all methods are safe for concurrent use (SQLite serializes writes).

func NewCheckpointStore

func NewCheckpointStore(db *sql.DB) (*CheckpointStore, error)

NewCheckpointStore creates a SQLite-backed checkpoint store. Creates the checkpoints table if it doesn't exist.

func (*CheckpointStore) Load

func (c *CheckpointStore) Load(ctx context.Context, consumerID string) (uint64, error)

Load returns the last processed sequence for a consumer. Returns 0 if new.

func (*CheckpointStore) Save

func (c *CheckpointStore) Save(ctx context.Context, consumerID string, sequence uint64) error

Save persists the consumer's position atomically.

type NotifierOption

type NotifierOption func(*SQLiteNotifier)

NotifierOption configures an SQLiteNotifier.

func WithPollInterval

func WithPollInterval(d time.Duration) NotifierOption

WithPollInterval sets the polling interval for data_version checks. Only relevant when Start() is called for polling mode.

type Option

type Option[E any] func(*Store[E])

Option configures a SQLite store.

func WithCodec

func WithCodec[E any](c codec.Codec) Option[E]

WithUpcasters enables event upcasting for schema evolution during Load. WithCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. Use this to plug in CBOR, msgpack, or any custom format. For multi-codec migration, use WithWriteCodec and WithCodecRegistry instead.

func WithCodecRegistry

func WithCodecRegistry[E any](r *codec.Registry) Option[E]

WithCodecRegistry sets the registry used to look up codecs when reading events. Each event stores which codec was used to serialize it; the registry maps codec names back to implementations for deserialization. If not set, a default registry with JSON, JSONiter, and CBOR is used.

func WithRegistry

func WithRegistry[E any](reg *eskit.EventRegistry) Option[E]

WithRegistry enables type registry for heterogeneous event deserialization.

func WithStoreNotifier

func WithStoreNotifier[E any](n *SQLiteNotifier) Option[E]

WithStoreNotifier wires up an SQLiteNotifier to signal after each Append. This provides instant notification without polling.

func WithUpcasters

func WithUpcasters[E any](u *eskit.UpcasterRegistry) Option[E]

func WithWriteCodec

func WithWriteCodec[E any](c codec.Codec) Option[E]

WithWriteCodec sets the codec used for writing NEW events. Existing events are read using the codec stored in each event's codec column. Must be used with WithCodecRegistry to ensure the write codec is available for reads.

type SQLiteNotifier

type SQLiteNotifier struct {
	// contains filtered or unexported fields
}

SQLiteNotifier detects database changes via callback or PRAGMA data_version polling. Safe for concurrent use.

func NewNotifier

func NewNotifier(db *sql.DB, opts ...NotifierOption) *SQLiteNotifier

NewNotifier creates an SQLiteNotifier. If db is non-nil, Start() enables PRAGMA data_version polling for external writes. Signal() can always be called directly for instant notification.

func (*SQLiteNotifier) Close

func (n *SQLiteNotifier) Close() error

Close stops the notifier and closes all listener channels.

func (*SQLiteNotifier) Notify

func (n *SQLiteNotifier) Notify(ctx context.Context) <-chan uint64

Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.

func (*SQLiteNotifier) Signal

func (n *SQLiteNotifier) Signal(sequence uint64)

Signal notifies all listeners that new events are available at the given sequence. Call this after appending events. Non-blocking.

func (*SQLiteNotifier) Start

func (n *SQLiteNotifier) Start(ctx context.Context)

Start begins polling PRAGMA data_version for external database changes. Blocks until ctx is cancelled or Close is called. Optional — not needed if all writes go through a store with WithNotifier.

func (*SQLiteNotifier) Wait

func (n *SQLiteNotifier) Wait()

Wait blocks until the notifier has fully stopped.

type SnapshotStore

type SnapshotStore[S any] struct {
	// contains filtered or unexported fields
}

SnapshotStore is a SQLite-backed eskit.SnapshotStore implementation. Stores snapshots as JSON with schema versioning and timestamps.

func NewSnapshotStore

func NewSnapshotStore[S any](db *sql.DB) (*SnapshotStore[S], error)

NewSnapshotStore creates a SQLite-backed snapshot store. Automatically creates the snapshots table with schema_version and created_at columns.

func (*SnapshotStore[S]) Invalidate

func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error

Invalidate deletes the snapshot for a single stream.

func (*SnapshotStore[S]) InvalidateAll

func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error

InvalidateAll deletes all snapshots.

func (*SnapshotStore[S]) LoadSnapshot

func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)

func (*SnapshotStore[S]) SaveSnapshot

func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error

type Store

type Store[E any] struct {
	// contains filtered or unexported fields
}

Store is a SQLite-backed event store. Events are serialized as JSON.

func New

func New[E any](dsn string, opts ...Option[E]) (*Store[E], error)

New creates a new SQLite event store. The dsn is a SQLite connection string (e.g., "file:events.db" or ":memory:" for testing).

func NewFromDB

func NewFromDB[E any](db *sql.DB, opts ...Option[E]) (*Store[E], error)

NewFromDB wraps an existing *sql.DB connection.

func (*Store[E]) Append

func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, metadata ...eskit.Metadata) ([]eskit.Event[E], error)

func (*Store[E]) AppendWithOptions

func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)

AppendWithOptions persists events with idempotency and custom timestamp support.

func (*Store[E]) Archive

func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error

Archive moves a stream to the target store and tombstones the primary.

func (*Store[E]) ArchiveStream

func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error

ArchiveStream marks a stream as archived. Future appends are rejected.

func (*Store[E]) Close

func (s *Store[E]) Close() error

Close closes the underlying database connection.

func (*Store[E]) DB

func (s *Store[E]) DB() *sql.DB

DB returns the underlying database connection for advanced use cases.

func (*Store[E]) Delete

func (s *Store[E]) Delete(ctx context.Context, streamID string) error

Delete permanently removes all events for a stream. Returns ErrStreamNotFound if stream does not exist. Also removes associated snapshots and tombstones.

func (*Store[E]) DeleteStream

func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error

DeleteStream permanently removes all events in a stream.

func (*Store[E]) IsTombstoned

func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)

IsTombstoned checks if a stream has been tombstoned. Returns nil, nil if the stream is not tombstoned.

func (*Store[E]) LatestSequence

func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)

LatestSequence returns the highest global sequence (rowid) in the store, or 0 if empty.

func (*Store[E]) Load

func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)

func (*Store[E]) LoadFrom

func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)

func (*Store[E]) LoadRaw

func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)

LoadRaw loads events without deserializing the Data field. Returns RawEvent values that can be selectively decoded on demand. This is significantly faster when you only need metadata or a subset of events.

func (*Store[E]) LoadRawWithOptions

func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)

LoadRawWithOptions loads raw events with optional filtering.

func (*Store[E]) LoadWithOptions

func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)

LoadWithOptions loads events with server-side filtering (event types, version range, limit).

func (*Store[E]) ReadFrom

func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)

ReadFrom implements GlobalReader — reads events across all streams by global sequence. Uses SQLite's rowid (aliased as id via INTEGER PRIMARY KEY AUTOINCREMENT) as global sequence.

func (*Store[E]) ReadFromWithOptions

func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)

ReadFromWithOptions reads global events with optional event type filtering.

func (*Store[E]) Restore

func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error

Restore moves an archived stream back from the source store to the primary.

func (*Store[E]) RestoreStream

func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error

RestoreStream brings an archived stream back to active state.

func (*Store[E]) StreamStatus

func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)

StreamStatus returns the current lifecycle state of a stream.

func (*Store[E]) Tombstone

func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error

Tombstone marks a stream as deleted. Future Append calls return ErrStreamDeleted. Load returns empty. The tombstone record remains for audit.

func (*Store[E]) TombstoneStream

func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error

TombstoneStream marks a stream as deleted. Future appends are rejected.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL