postgresengine

package
v1.2.0-beta
Published: Aug 1, 2025 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package postgresengine provides a PostgreSQL implementation of the eventstore interface.

This package implements dynamic event streams using PostgreSQL as the storage backend, supporting multiple database adapters (pgx, sql.DB, sqlx) with atomic operations and concurrency control.

Key features:

  • Multiple database adapter support (PGX, SQL, SQLX)
  • Atomic event appending with concurrency conflict detection
  • Dynamic event stream filtering with JSON predicate support
  • Configurable table names and structured logging support
  • OpenTelemetry-compatible metrics for comprehensive observability
  • Transaction-safe operations with proper resource cleanup

Usage examples:

// Basic usage
db, _ := pgxpool.New(context.Background(), dsn)
store, _ := postgresengine.NewEventStoreFromPGXPool(db)

// With logging and metrics (production observability)
store, _ := postgresengine.NewEventStoreFromPGXPool(
	db,
	postgresengine.WithTableName("my_events"),
	postgresengine.WithLogger(logger),
	postgresengine.WithMetrics(metricsCollector),
)

events, maxSeq, _ := store.Query(ctx, filter)
err := store.Append(ctx, filter, maxSeq, newEvent)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore represents a storage mechanism for handling and querying events in an event sourcing implementation. It uses a database adapter and supports customizable logging, metrics collection, and event table configuration.

func NewEventStoreFromPGXPool

func NewEventStoreFromPGXPool(db *pgxpool.Pool, options ...Option) (EventStore, error)

NewEventStoreFromPGXPool creates a new EventStore using a pgx Pool with optional configuration.

func NewEventStoreFromSQLDB

func NewEventStoreFromSQLDB(db *sql.DB, options ...Option) (EventStore, error)

NewEventStoreFromSQLDB creates a new EventStore using a sql.DB with optional configuration.

func NewEventStoreFromSQLX

func NewEventStoreFromSQLX(db *sqlx.DB, options ...Option) (EventStore, error)

NewEventStoreFromSQLX creates a new EventStore using a sqlx.DB with optional configuration.

func (EventStore) Append

func (es EventStore) Append(
	ctx context.Context,
	filter eventstore.Filter,
	expectedMaxSequenceNumber eventstore.MaxSequenceNumberUint,
	event eventstore.StorableEvent,
	additionalEvents ...eventstore.StorableEvent,
) error

Append attempts to append one or more eventstore.StorableEvent values to the Postgres event store. It respects the concurrency constraints of this "dynamic event stream", which is defined by the provided eventstore.Filter criteria and the expected MaxSequenceNumberUint.

The provided eventstore.Filter criteria should be the same as the ones used for the Query before making the business decisions.

The insert query to append multiple events atomically is heavier than the one built to append a single event. In event-sourced applications, one command/request should typically only produce one event. Only supply multiple events if you are sure that you need to append multiple events at once!
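The query-then-append contract described above can be illustrated with a toy in-memory store. This is only a sketch of the optimistic concurrency idea, not how postgresengine is implemented: the memStore type, the errConcurrencyConflict error, and the use of an event type string in place of an eventstore.Filter are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical, simplified stand-in for eventstore.StorableEvent.
type event struct {
	Seq       uint
	EventType string
}

// errConcurrencyConflict is a hypothetical error used only in this sketch.
var errConcurrencyConflict = errors.New("concurrency conflict")

// memStore is a toy in-memory store, NOT the package's implementation.
type memStore struct {
	events []event
}

// query returns the events matching eventType (a stand-in for a filter)
// and the highest sequence number among them (0 if there are none).
func (s *memStore) query(eventType string) ([]event, uint) {
	var matched []event
	var maxSeq uint
	for _, e := range s.events {
		if e.EventType == eventType {
			matched = append(matched, e)
			maxSeq = e.Seq // events are stored in ascending sequence order
		}
	}
	return matched, maxSeq
}

// appendEvent stores a new event only if the stream selected by eventType
// has not grown since expectedMaxSeq was read.
func (s *memStore) appendEvent(eventType string, expectedMaxSeq uint) error {
	_, currentMaxSeq := s.query(eventType)
	if currentMaxSeq != expectedMaxSeq {
		return errConcurrencyConflict
	}
	s.events = append(s.events, event{Seq: uint(len(s.events)) + 1, EventType: eventType})
	return nil
}

func main() {
	store := &memStore{}
	_, maxSeq := store.query("BookLent")

	// The first writer holds an up-to-date sequence number and succeeds.
	fmt.Println(store.appendEvent("BookLent", maxSeq))
	// A second writer reusing the stale sequence number is rejected.
	fmt.Println(store.appendEvent("BookLent", maxSeq))
}
```

In this sketch a caller that hits the conflict would query again with the same filter, redo its business decision, and retry the append with the fresh sequence number.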

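func (EventStore) Query

func (es EventStore) Query(
	ctx context.Context,
	filter eventstore.Filter,
) (eventstore.StorableEvents, eventstore.MaxSequenceNumberUint, error)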

Query retrieves events from the Postgres event store based on the provided eventstore.Filter criteria and returns them as eventstore.StorableEvents as well as the MaxSequenceNumberUint for this "dynamic event stream" at the time of the query.

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger interface for SQL query logging, operational metrics, warnings, and error reporting.
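The four methods above have the same shape as those on *slog.Logger from the standard library (Go 1.21+), so a slog logger should be usable as a Logger without an adapter. The sketch below copies the interface locally so that it compiles on its own; the newInfoLogger helper is a hypothetical name, and the choice of a text handler writing to stderr is an assumption.

```go
package main

import (
	"log/slog"
	"os"
)

// Logger mirrors the interface documented above so the sketch is self-contained.
type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

// Compile-time check that *slog.Logger provides the four required methods.
var _ Logger = (*slog.Logger)(nil)

// newInfoLogger (hypothetical helper) builds a text logger that drops
// Debug messages and keeps Info, Warn, and Error.
func newInfoLogger() Logger {
	handler := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})
	return slog.New(handler)
}

func main() {
	logger := newInfoLogger()
	logger.Debug("dropped at info level", "query", "SELECT 1")
	logger.Info("events appended", "count", 1)
}
```

Setting the handler level is what decides which messages are actually written, so the same logger value can be made verbose for development or quieter for production.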

type MetricsCollector

type MetricsCollector interface {
	RecordDuration(metric string, duration time.Duration, labels map[string]string)
	IncrementCounter(metric string, labels map[string]string)
	RecordValue(metric string, value float64, labels map[string]string)
}

MetricsCollector interface for collecting EventStore performance and operational metrics.
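A small in-memory collector can be handy in tests to check that metrics are emitted at all. The following is a minimal sketch, assuming the interface shown above; memoryCollector and the metric names are hypothetical, and labels are ignored here for brevity.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// MetricsCollector mirrors the interface documented above so the sketch is self-contained.
type MetricsCollector interface {
	RecordDuration(metric string, duration time.Duration, labels map[string]string)
	IncrementCounter(metric string, labels map[string]string)
	RecordValue(metric string, value float64, labels map[string]string)
}

// memoryCollector is a hypothetical collector that keeps metrics in maps.
// It ignores labels, which a real collector would usually not do.
type memoryCollector struct {
	mu        sync.Mutex
	counters  map[string]int
	durations map[string][]time.Duration
	values    map[string]float64
}

// Compile-time check that memoryCollector satisfies the interface.
var _ MetricsCollector = (*memoryCollector)(nil)

func newMemoryCollector() *memoryCollector {
	return &memoryCollector{
		counters:  make(map[string]int),
		durations: make(map[string][]time.Duration),
		values:    make(map[string]float64),
	}
}

// RecordDuration appends the duration to the list kept for the metric.
func (c *memoryCollector) RecordDuration(metric string, duration time.Duration, _ map[string]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.durations[metric] = append(c.durations[metric], duration)
}

// IncrementCounter adds one to the counter kept for the metric.
func (c *memoryCollector) IncrementCounter(metric string, _ map[string]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counters[metric]++
}

// RecordValue stores the most recent value for the metric.
func (c *memoryCollector) RecordValue(metric string, value float64, _ map[string]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[metric] = value
}

func main() {
	c := newMemoryCollector()
	// Metric and label names are made up for this example.
	c.IncrementCounter("append_total", map[string]string{"status": "success"})
	c.RecordDuration("append_duration", 3*time.Millisecond, nil)
	fmt.Println(c.counters["append_total"], len(c.durations["append_duration"]))
}
```

The mutex is there because a store may be shared between goroutines, in which case the collector would be called concurrently.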

type Option

type Option func(*EventStore) error

Option defines a functional option for configuring EventStore.
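For readers unfamiliar with the functional options pattern, the sketch below shows the general mechanics with the same shape as Option (a function that receives a pointer to the value being configured and may return an error). The store type, the default table name, and the empty-name validation are assumptions for illustration and are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical stand-in for EventStore with one configurable field.
type store struct {
	tableName string
}

// option has the same shape as the documented Option type.
type option func(*store) error

// withTableName is a hypothetical option. Rejecting empty names is an
// assumption of this sketch, not documented behavior.
func withTableName(name string) option {
	return func(s *store) error {
		if name == "" {
			return errors.New("table name must not be empty")
		}
		s.tableName = name
		return nil
	}
}

// newStore sets defaults first, then applies each option in order and
// stops at the first option that returns an error.
func newStore(options ...option) (store, error) {
	s := store{tableName: "events"} // default name is an assumption
	for _, opt := range options {
		if err := opt(&s); err != nil {
			return store{}, err
		}
	}
	return s, nil
}

func main() {
	s, err := newStore(withTableName("my_events"))
	fmt.Println(s.tableName, err)
}
```

Because each option can return an error, a constructor built this way can reject invalid configuration up front instead of failing later during a query.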

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger for the EventStore. The logger will receive messages at different levels based on the logger's configured level:

  • Debug level: SQL queries with execution timing (development use)
  • Info level: Event counts, durations, concurrency conflicts (production-safe)
  • Warn level: Non-critical issues like cleanup failures
  • Error level: Critical failures that cause operation failures

func WithMetrics

func WithMetrics(collector MetricsCollector) Option

WithMetrics sets the metrics collector for the EventStore. The metrics collector will receive performance and operational metrics including query/append durations, event counts, concurrency conflicts, and database errors.

func WithTableName

func WithTableName(tableName string) Option

WithTableName sets the table name for the EventStore.

Directories

Path Synopsis
internal/adapters
Package adapters provides database adapter implementations for the PostgreSQL event store.
