Documentation ¶
Overview ¶
Package postgres provides a PostgreSQL implementation for the event store.
Index ¶
- func IsUniqueViolation(err error) bool
- type Store
- func (s *Store) Append(ctx context.Context, tx *sql.Tx, expectedVersion store.ExpectedVersion, ...) (store.AppendResult, error)
- func (s *Store) GetLatestGlobalPosition(ctx context.Context, tx *sql.Tx) (int64, error)
- func (s *Store) ReadAggregateStream(ctx context.Context, tx *sql.Tx, aggregateType, aggregateID string, ...) (store.Stream, error)
- func (s *Store) ReadEvents(ctx context.Context, tx *sql.Tx, fromPosition int64, limit int) ([]store.PersistedEvent, error)
- type StoreConfig
- type StoreOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsUniqueViolation ¶
IsUniqueViolation reports whether err is a PostgreSQL unique constraint violation. It is exported for testing purposes and is driver-agnostic: it works with pq, pgx, and database/sql.
Types ¶
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store is a PostgreSQL-backed event store implementation.
func NewStore ¶
func NewStore(config *StoreConfig) *Store
NewStore creates a new PostgreSQL event store with the given configuration.
func (*Store) Append ¶
func (s *Store) Append(ctx context.Context, tx *sql.Tx, expectedVersion store.ExpectedVersion, events []store.Event) (store.AppendResult, error)
Append implements store.EventStore. It assigns aggregate versions automatically, using the aggregate_heads table for an O(1) lookup of the current version. The expectedVersion parameter controls optimistic concurrency validation. As a safety net, the database constraint on (aggregate_type, aggregate_id, aggregate_version) also enforces optimistic concurrency: if another transaction commits between the version check and the insert, the insert fails with a unique constraint violation.
func (*Store) GetLatestGlobalPosition ¶
GetLatestGlobalPosition implements store.GlobalPositionReader.
type StoreConfig ¶
type StoreConfig struct {
	// Logger is an optional logger for observability.
	// If nil, logging is disabled (zero overhead).
	Logger store.Logger
	// EventsTable is the name of the events table.
	EventsTable string
	// AggregateHeadsTable is the name of the aggregate version tracking table.
	AggregateHeadsTable string
	// NotifyChannel is the Postgres NOTIFY channel name for event append notifications.
	// When set, Append() executes pg_notify within the same transaction, so the
	// notification fires only when the transaction commits.
	// Leave empty to disable notifications.
	NotifyChannel string
}
StoreConfig contains configuration for the PostgreSQL event store. Configuration is immutable after construction.
func DefaultStoreConfig ¶
func DefaultStoreConfig() *StoreConfig
DefaultStoreConfig returns the default configuration.
func NewStoreConfig ¶
func NewStoreConfig(opts ...StoreOption) *StoreConfig
NewStoreConfig creates a new store configuration with functional options. It starts with the default configuration and applies the given options.
Example:
config := postgres.NewStoreConfig(
	postgres.WithLogger(myLogger),
	postgres.WithEventsTable("custom_events"),
)
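For readers unfamiliar with the functional options pattern, the following self-contained sketch shows the mechanism the description above implies: start from defaults, then apply each option in order. The sketchConfig and sketchOption types and the default table names "events" and "aggregate_heads" are hypothetical; the package's real defaults are not stated here.

```go
package main

import "fmt"

// sketchConfig is a hypothetical, reduced mirror of StoreConfig.
type sketchConfig struct {
	EventsTable         string
	AggregateHeadsTable string
	NotifyChannel       string
}

// sketchOption mutates a config, in the style of StoreOption.
type sketchOption func(*sketchConfig)

// withEventsTable overrides the events table name.
func withEventsTable(name string) sketchOption {
	return func(c *sketchConfig) { c.EventsTable = name }
}

// withNotifyChannel sets the notification channel name.
func withNotifyChannel(channel string) sketchOption {
	return func(c *sketchConfig) { c.NotifyChannel = channel }
}

// newSketchConfig starts from assumed defaults and applies options in order.
func newSketchConfig(opts ...sketchOption) *sketchConfig {
	c := &sketchConfig{
		EventsTable:         "events",          // assumed default
		AggregateHeadsTable: "aggregate_heads", // assumed default
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newSketchConfig(
		withEventsTable("custom_events"),
		withNotifyChannel("events_appended"),
	)
	fmt.Printf("%+v\n", *c)
}
```

Fields that no option touches keep their defaults, so callers specify only what differs.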
type StoreOption ¶
type StoreOption func(*StoreConfig)
StoreOption is a functional option for configuring a Store.
func WithAggregateHeadsTable ¶
func WithAggregateHeadsTable(tableName string) StoreOption
WithAggregateHeadsTable sets a custom aggregate heads table name.
func WithEventsTable ¶
func WithEventsTable(tableName string) StoreOption
WithEventsTable sets a custom events table name.
func WithLogger ¶
func WithLogger(logger store.Logger) StoreOption
WithLogger sets a logger for the store.
func WithNotifyChannel ¶
func WithNotifyChannel(channel string) StoreOption
WithNotifyChannel sets the Postgres NOTIFY channel for event append notifications. When configured, each Append() call issues pg_notify within the same transaction, so the notification fires only when the transaction commits.