postgres

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package postgres provides a PostgreSQL implementation for the event store.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsUniqueViolation

func IsUniqueViolation(err error) bool

IsUniqueViolation checks if an error is a PostgreSQL unique constraint violation. This is exported for testing purposes. Driver-agnostic: works with pq, pgx, and database/sql.

Types

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store is a PostgreSQL-backed event store implementation.

func NewStore

func NewStore(config *StoreConfig) *Store

NewStore creates a new PostgreSQL event store with the given configuration.

func (*Store) Append

func (s *Store) Append(ctx context.Context, tx *sql.Tx, expectedVersion store.ExpectedVersion, events []store.Event) (store.AppendResult, error)

Append implements store.EventStore. It automatically assigns aggregate versions using the aggregate_heads table for O(1) lookup. The expectedVersion parameter controls optimistic concurrency validation. The database constraint on (aggregate_type, aggregate_id, aggregate_version) enforces optimistic concurrency as a safety net - if another transaction commits between our version check and insert, the insert will fail with a unique constraint violation.

func (*Store) GetLatestGlobalPosition

func (s *Store) GetLatestGlobalPosition(ctx context.Context, tx *sql.Tx) (int64, error)

GetLatestGlobalPosition implements store.GlobalPositionReader.

func (*Store) ReadAggregateStream

func (s *Store) ReadAggregateStream(ctx context.Context, tx *sql.Tx, aggregateType, aggregateID string, fromVersion, toVersion *int64) (store.Stream, error)

ReadAggregateStream implements store.AggregateStreamReader.

func (*Store) ReadEvents

func (s *Store) ReadEvents(ctx context.Context, tx *sql.Tx, fromPosition int64, limit int) ([]store.PersistedEvent, error)

ReadEvents implements store.EventReader.

type StoreConfig

type StoreConfig struct {
	// Logger is an optional logger for observability.
	// If nil, logging is disabled (zero overhead).
	Logger store.Logger

	// EventsTable is the name of the events table
	EventsTable string

	// AggregateHeadsTable is the name of the aggregate version tracking table
	AggregateHeadsTable string

	// NotifyChannel is the Postgres NOTIFY channel name for event append notifications.
	// When set, Append() executes pg_notify within the same transaction, so the
	// notification fires only when the transaction commits.
	// Leave empty to disable notifications.
	NotifyChannel string
}

StoreConfig contains configuration for the PostgreSQL event store. Configuration is immutable after construction.

func DefaultStoreConfig

func DefaultStoreConfig() *StoreConfig

DefaultStoreConfig returns the default configuration.

func NewStoreConfig

func NewStoreConfig(opts ...StoreOption) *StoreConfig

NewStoreConfig creates a new store configuration with functional options. It starts with the default configuration and applies the given options.

Example:

config := postgres.NewStoreConfig(
    postgres.WithLogger(myLogger),
    postgres.WithEventsTable("custom_events"),
)

type StoreOption

type StoreOption func(*StoreConfig)

StoreOption is a functional option for configuring a Store.

func WithAggregateHeadsTable

func WithAggregateHeadsTable(tableName string) StoreOption

WithAggregateHeadsTable sets a custom aggregate heads table name.

func WithEventsTable

func WithEventsTable(tableName string) StoreOption

WithEventsTable sets a custom events table name.

func WithLogger

func WithLogger(logger store.Logger) StoreOption

WithLogger sets a logger for the store.

func WithNotifyChannel

func WithNotifyChannel(channel string) StoreOption

WithNotifyChannel sets the Postgres NOTIFY channel for event append notifications. When configured, each Append() call issues pg_notify within the same transaction, so the notification fires only when the transaction commits.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL