Documentation
¶
Overview ¶
Package sqlitestore provides SQLite-backed checkpoint storage for subscriptions.
SQLiteNotifier implements subscription.StoreNotifier for SQLite stores.
Two strategies are supported:
- Callback-based (default): The store calls Signal() after each append. This is instant and works with both :memory: and file-based databases.
- Polling-based: Polls PRAGMA data_version to detect external writes. Useful when multiple processes write to the same file-based database.
For single-process use (the common case), callback-based is recommended. The Store's WithNotifier option wires this up automatically.
Package sqlitestore provides a SQLite-backed EventStore implementation.
Index ¶
- Constants
- type CheckpointStore
- type NotifierOption
- type Option
- func WithCodec[E any](c codec.Codec) Option[E]
- func WithCodecRegistry[E any](r *codec.Registry) Option[E]
- func WithRegistry[E any](reg *eskit.EventRegistry) Option[E]
- func WithStoreNotifier[E any](n *SQLiteNotifier) Option[E]
- func WithUpcasters[E any](u *eskit.UpcasterRegistry) Option[E]
- func WithWriteCodec[E any](c codec.Codec) Option[E]
- type SQLiteNotifier
- type SnapshotStore
- func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error
- func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error
- func (s *SnapshotStore[S]) LoadSnapshot(ctx context.Context, streamID string) (*eskit.Snapshot[S], error)
- func (s *SnapshotStore[S]) SaveSnapshot(ctx context.Context, snapshot eskit.Snapshot[S]) error
- type Store
- func (s *Store[E]) Append(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, ...) ([]eskit.Event[E], error)
- func (s *Store[E]) Archive(ctx context.Context, streamID string, target eskit.EventStore[E]) error
- func (s *Store[E]) ArchiveStream(ctx context.Context, streamID string) error
- func (s *Store[E]) Close() error
- func (s *Store[E]) DB() *sql.DB
- func (s *Store[E]) Delete(ctx context.Context, streamID string) error
- func (s *Store[E]) DeleteStream(ctx context.Context, streamID string) error
- func (s *Store[E]) IsTombstoned(ctx context.Context, streamID string) (*eskit.Tombstone, error)
- func (s *Store[E]) LatestSequence(ctx context.Context) (uint64, error)
- func (s *Store[E]) Load(ctx context.Context, streamID string) ([]eskit.Event[E], error)
- func (s *Store[E]) LoadFrom(ctx context.Context, streamID string, fromVersion int) ([]eskit.Event[E], error)
- func (s *Store[E]) LoadRaw(ctx context.Context, streamID string) ([]*eskit.RawEvent, error)
- func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)
- func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)
- func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
- func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)
- func (s *Store[E]) Restore(ctx context.Context, streamID string, source eskit.EventStore[E]) error
- func (s *Store[E]) RestoreStream(ctx context.Context, streamID string) error
- func (s *Store[E]) StreamStatus(ctx context.Context, streamID string) (eskit.StreamState, error)
- func (s *Store[E]) Tombstone(ctx context.Context, streamID string, reason string) error
- func (s *Store[E]) TombstoneStream(ctx context.Context, streamID string) error
Constants ¶
const ( // DefaultPollInterval is the default polling interval for data_version changes. DefaultPollInterval = 50 * time.Millisecond )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CheckpointStore ¶
type CheckpointStore struct {
// contains filtered or unexported fields
}
CheckpointStore is a durable, SQLite-backed checkpoint for event subscriptions. It tracks each consumer's last processed global sequence number, enabling crash recovery without reprocessing the entire event stream.
Thread-safe: all methods are safe for concurrent use (SQLite serializes writes).
func NewCheckpointStore ¶
func NewCheckpointStore(db *sql.DB) (*CheckpointStore, error)
NewCheckpointStore creates a SQLite-backed checkpoint store. Creates the checkpoints table if it doesn't exist.
type NotifierOption ¶
type NotifierOption func(*SQLiteNotifier)
NotifierOption configures an SQLiteNotifier.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) NotifierOption
WithPollInterval sets the polling interval for data_version checks. Only relevant when Start() is called for polling mode.
type Option ¶
Option configures a SQLite store.
func WithCodec ¶
WithUpcasters enables event upcasting for schema evolution during Load. WithCodec sets a custom codec for event serialization and registers it for reads. By default, events are serialized as JSON using encoding/json. Use this to plug in CBOR, msgpack, or any custom format. For multi-codec migration, use WithWriteCodec and WithCodecRegistry instead.
func WithCodecRegistry ¶
WithCodecRegistry sets the registry used to look up codecs when reading events. Each event stores which codec was used to serialize it; the registry maps codec names back to implementations for deserialization. If not set, a default registry with JSON, JSONiter, and CBOR is used.
func WithRegistry ¶
func WithRegistry[E any](reg *eskit.EventRegistry) Option[E]
WithRegistry enables type registry for heterogeneous event deserialization.
func WithStoreNotifier ¶
func WithStoreNotifier[E any](n *SQLiteNotifier) Option[E]
WithStoreNotifier wires up an SQLiteNotifier to signal after each Append. This provides instant notification without polling.
func WithUpcasters ¶
func WithUpcasters[E any](u *eskit.UpcasterRegistry) Option[E]
type SQLiteNotifier ¶
type SQLiteNotifier struct {
// contains filtered or unexported fields
}
SQLiteNotifier detects database changes via callback or PRAGMA data_version polling. Safe for concurrent use.
func NewNotifier ¶
func NewNotifier(db *sql.DB, opts ...NotifierOption) *SQLiteNotifier
NewNotifier creates an SQLiteNotifier. If db is non-nil, Start() enables PRAGMA data_version polling for external writes. Signal() can always be called directly for instant notification.
func (*SQLiteNotifier) Close ¶
func (n *SQLiteNotifier) Close() error
Close stops the notifier and closes all listener channels.
func (*SQLiteNotifier) Notify ¶
func (n *SQLiteNotifier) Notify(ctx context.Context) <-chan uint64
Notify returns a channel that receives the latest global sequence when new events are appended. Implements subscription.StoreNotifier.
func (*SQLiteNotifier) Signal ¶
func (n *SQLiteNotifier) Signal(sequence uint64)
Signal notifies all listeners that new events are available at the given sequence. Call this after appending events. Non-blocking.
func (*SQLiteNotifier) Start ¶
func (n *SQLiteNotifier) Start(ctx context.Context)
Start begins polling PRAGMA data_version for external database changes. Blocks until ctx is cancelled or Close is called. Optional — not needed if all writes go through a store with WithNotifier.
func (*SQLiteNotifier) Wait ¶
func (n *SQLiteNotifier) Wait()
Wait blocks until the notifier has fully stopped.
type SnapshotStore ¶
type SnapshotStore[S any] struct { // contains filtered or unexported fields }
SnapshotStore is a SQLite-backed eskit.SnapshotStore implementation. Stores snapshots as JSON with schema versioning and timestamps.
func NewSnapshotStore ¶
func NewSnapshotStore[S any](db *sql.DB) (*SnapshotStore[S], error)
NewSnapshotStore creates a SQLite-backed snapshot store. Automatically creates the snapshots table with schema_version and created_at columns.
func (*SnapshotStore[S]) Invalidate ¶
func (s *SnapshotStore[S]) Invalidate(ctx context.Context, streamID string) error
Invalidate deletes the snapshot for a single stream.
func (*SnapshotStore[S]) InvalidateAll ¶
func (s *SnapshotStore[S]) InvalidateAll(ctx context.Context) error
InvalidateAll deletes all snapshots.
func (*SnapshotStore[S]) LoadSnapshot ¶
func (*SnapshotStore[S]) SaveSnapshot ¶
type Store ¶
type Store[E any] struct { // contains filtered or unexported fields }
Store is a SQLite-backed event store. Events are serialized as JSON.
func New ¶
New creates a new SQLite event store. The dsn is a SQLite connection string (e.g., "file:events.db" or ":memory:" for testing).
func (*Store[E]) AppendWithOptions ¶
func (s *Store[E]) AppendWithOptions(ctx context.Context, streamID string, expectedVersion int, events []E, opts eskit.AppendOptions) ([]eskit.Event[E], error)
AppendWithOptions persists events with idempotency and custom timestamp support.
func (*Store[E]) ArchiveStream ¶
ArchiveStream marks a stream as archived. Future appends are rejected.
func (*Store[E]) Delete ¶
Delete permanently removes all events for a stream. Returns ErrStreamNotFound if stream does not exist. Also removes associated snapshots and tombstones.
func (*Store[E]) DeleteStream ¶
DeleteStream permanently removes all events in a stream.
func (*Store[E]) IsTombstoned ¶
IsTombstoned checks if a stream has been tombstoned. Returns nil, nil if the stream is not tombstoned.
func (*Store[E]) LatestSequence ¶
LatestSequence returns the highest global sequence (rowid) in the store, or 0 if empty.
func (*Store[E]) LoadRaw ¶
LoadRaw loads events without deserializing the Data field. Returns RawEvent values that can be selectively decoded on demand. This is significantly faster when you only need metadata or a subset of events.
func (*Store[E]) LoadRawWithOptions ¶
func (s *Store[E]) LoadRawWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]*eskit.RawEvent, error)
LoadRawWithOptions loads raw events with optional filtering.
func (*Store[E]) LoadWithOptions ¶
func (s *Store[E]) LoadWithOptions(ctx context.Context, streamID string, opts eskit.LoadOptions) ([]eskit.Event[E], error)
LoadWithOptions loads events with server-side filtering (event types, version range, limit).
func (*Store[E]) ReadFrom ¶
func (s *Store[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]eskit.Event[E], error)
ReadFrom implements GlobalReader — reads events across all streams by global sequence. Uses SQLite's rowid (aliased as id via INTEGER PRIMARY KEY AUTOINCREMENT) as global sequence.
func (*Store[E]) ReadFromWithOptions ¶
func (s *Store[E]) ReadFromWithOptions(ctx context.Context, fromSequence uint64, limit int, opts eskit.LoadOptions) ([]eskit.Event[E], error)
ReadFromWithOptions reads global events with optional event type filtering.
func (*Store[E]) Restore ¶
Restore moves an archived stream back from the source store to the primary.
func (*Store[E]) RestoreStream ¶
RestoreStream brings an archived stream back to active state.
func (*Store[E]) StreamStatus ¶
StreamStatus returns the current lifecycle state of a stream.