sqlite

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package sqlite provides a SQLite adapter for event sourcing.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrProjectionStopped indicates the projection was stopped due to an error.
	ErrProjectionStopped = errors.New("projection stopped")
)

Functions

func IsUniqueViolation

func IsUniqueViolation(err error) bool

IsUniqueViolation checks if an error is a SQLite unique constraint violation. This is exported for testing purposes.

Types

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor processes events for projections using SQLite for checkpointing. This is the SQLite-specific implementation that manages transactions internally.

func NewProcessor

func NewProcessor(db *sql.DB, store *Store, config *projection.ProcessorConfig) *Processor

NewProcessor creates a new SQLite projection processor. The processor manages SQL transactions internally and coordinates checkpointing with event processing.

func (*Processor) Run

func (p *Processor) Run(ctx context.Context, proj projection.Projection) error

Run processes events for the given projection until the context is canceled. It reads events in batches, applies partition and aggregate type filters, and updates checkpoints. Returns an error if the projection handler fails.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a SQLite-backed event store implementation.

func NewStore

func NewStore(config StoreConfig) *Store

NewStore creates a new SQLite event store with the given configuration.

func (*Store) Append

func (s *Store) Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)

Append implements store.EventStore. It automatically assigns aggregate versions using the aggregate_heads table for O(1) lookup. The expectedVersion parameter controls optimistic concurrency validation. The database constraint on (aggregate_type, aggregate_id, aggregate_version) enforces optimistic concurrency as a safety net - if another transaction commits between our version check and insert, the insert will fail with a unique constraint violation.

func (*Store) GetCheckpoint

func (s *Store) GetCheckpoint(ctx context.Context, tx es.DBTX, projectionName string) (int64, error)

GetCheckpoint implements store.CheckpointStore.

func (*Store) ReadAggregateStream

func (s *Store) ReadAggregateStream(ctx context.Context, tx es.DBTX, boundedContext, aggregateType, aggregateID string, fromVersion, toVersion *int64) (es.Stream, error)

ReadAggregateStream implements store.AggregateStreamReader.

func (*Store) ReadEvents

func (s *Store) ReadEvents(ctx context.Context, tx es.DBTX, fromPosition int64, limit int) ([]es.PersistedEvent, error)

ReadEvents implements store.EventReader.

func (*Store) UpdateCheckpoint

func (s *Store) UpdateCheckpoint(ctx context.Context, tx es.DBTX, projectionName string, position int64) error

UpdateCheckpoint implements store.CheckpointStore.

type StoreConfig

type StoreConfig struct {
	// Logger is an optional logger for observability.
	// If nil, logging is disabled (zero overhead).
	Logger es.Logger

	// EventsTable is the name of the events table
	EventsTable string

	// CheckpointsTable is the name of the projection checkpoints table
	CheckpointsTable string

	// AggregateHeadsTable is the name of the aggregate version tracking table
	AggregateHeadsTable string
}

StoreConfig contains configuration for the SQLite event store. Configuration is immutable after construction.

func DefaultStoreConfig

func DefaultStoreConfig() StoreConfig

DefaultStoreConfig returns the default configuration.

func NewStoreConfig

func NewStoreConfig(opts ...StoreOption) StoreConfig

NewStoreConfig creates a new store configuration with functional options. It starts with the default configuration and applies the given options.

Example:

config := sqlite.NewStoreConfig(
    sqlite.WithLogger(myLogger),
    sqlite.WithEventsTable("custom_events"),
)

type StoreOption

type StoreOption func(*StoreConfig)

StoreOption is a functional option for configuring a Store.

func WithAggregateHeadsTable

func WithAggregateHeadsTable(tableName string) StoreOption

WithAggregateHeadsTable sets a custom aggregate heads table name.

func WithCheckpointsTable

func WithCheckpointsTable(tableName string) StoreOption

WithCheckpointsTable sets a custom projection checkpoints table name.

func WithEventsTable

func WithEventsTable(tableName string) StoreOption

WithEventsTable sets a custom events table name.

func WithLogger

func WithLogger(logger es.Logger) StoreOption

WithLogger sets a logger for the store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL