Documentation
¶
Overview ¶
Package sqlite provides a SQLite adapter for event sourcing.
Index ¶
- Variables
- func IsUniqueViolation(err error) bool
- type Processor
- type Store
- func (s *Store) Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, ...) (es.AppendResult, error)
- func (s *Store) GetCheckpoint(ctx context.Context, tx es.DBTX, projectionName string) (int64, error)
- func (s *Store) ReadAggregateStream(ctx context.Context, tx es.DBTX, ...) (es.Stream, error)
- func (s *Store) ReadEvents(ctx context.Context, tx es.DBTX, fromPosition int64, limit int) ([]es.PersistedEvent, error)
- func (s *Store) UpdateCheckpoint(ctx context.Context, tx es.DBTX, projectionName string, position int64) error
- type StoreConfig
- type StoreOption
Constants ¶
This section is empty.
Variables ¶
var ( // ErrProjectionStopped indicates the projection was stopped due to an error. ErrProjectionStopped = errors.New("projection stopped") )
Functions ¶
func IsUniqueViolation ¶
IsUniqueViolation checks if an error is a SQLite unique constraint violation. This is exported for testing purposes.
Types ¶
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor processes events for projections using SQLite for checkpointing. This is the SQLite-specific implementation that manages transactions internally.
func NewProcessor ¶
func NewProcessor(db *sql.DB, store *Store, config *projection.ProcessorConfig) *Processor
NewProcessor creates a new SQLite projection processor. The processor manages SQL transactions internally and coordinates checkpointing with event processing.
func (*Processor) Run ¶
func (p *Processor) Run(ctx context.Context, proj projection.Projection) error
Run processes events for the given projection until the context is canceled. It reads events in batches, applies partition and aggregate type filters, and updates checkpoints. Returns an error if the projection handler fails.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a SQLite-backed event store implementation.
func NewStore ¶
func NewStore(config StoreConfig) *Store
NewStore creates a new SQLite event store with the given configuration.
func (*Store) Append ¶
func (s *Store) Append(ctx context.Context, tx es.DBTX, expectedVersion es.ExpectedVersion, events []es.Event) (es.AppendResult, error)
Append implements store.EventStore. It automatically assigns aggregate versions using the aggregate_heads table for O(1) lookup. The expectedVersion parameter controls optimistic concurrency validation. The database constraint on (aggregate_type, aggregate_id, aggregate_version) enforces optimistic concurrency as a safety net - if another transaction commits between our version check and insert, the insert will fail with a unique constraint violation.
func (*Store) GetCheckpoint ¶
func (s *Store) GetCheckpoint(ctx context.Context, tx es.DBTX, projectionName string) (int64, error)
GetCheckpoint implements store.CheckpointStore.
func (*Store) ReadAggregateStream ¶
func (s *Store) ReadAggregateStream(ctx context.Context, tx es.DBTX, boundedContext, aggregateType, aggregateID string, fromVersion, toVersion *int64) (es.Stream, error)
ReadAggregateStream implements store.AggregateStreamReader.
type StoreConfig ¶
type StoreConfig struct {
// Logger is an optional logger for observability.
// If nil, logging is disabled (zero overhead).
Logger es.Logger
// EventsTable is the name of the events table
EventsTable string
// CheckpointsTable is the name of the projection checkpoints table
CheckpointsTable string
// AggregateHeadsTable is the name of the aggregate version tracking table
AggregateHeadsTable string
}
StoreConfig contains configuration for the SQLite event store. Configuration is immutable after construction.
func DefaultStoreConfig ¶
func DefaultStoreConfig() StoreConfig
DefaultStoreConfig returns the default configuration.
func NewStoreConfig ¶
func NewStoreConfig(opts ...StoreOption) StoreConfig
NewStoreConfig creates a new store configuration with functional options. It starts with the default configuration and applies the given options.
Example:
config := sqlite.NewStoreConfig(
sqlite.WithLogger(myLogger),
sqlite.WithEventsTable("custom_events"),
)
type StoreOption ¶
type StoreOption func(*StoreConfig)
StoreOption is a functional option for configuring a Store.
func WithAggregateHeadsTable ¶
func WithAggregateHeadsTable(tableName string) StoreOption
WithAggregateHeadsTable sets a custom aggregate heads table name.
func WithCheckpointsTable ¶
func WithCheckpointsTable(tableName string) StoreOption
WithCheckpointsTable sets a custom projection checkpoints table name.
func WithEventsTable ¶
func WithEventsTable(tableName string) StoreOption
WithEventsTable sets a custom events table name.
func WithLogger ¶
func WithLogger(logger es.Logger) StoreOption
WithLogger sets a logger for the store.