Documentation
¶
Overview ¶
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
This package implements dynamic event streams using PostgreSQL as the storage backend, supporting multiple database adapters (pgx, sql.DB, sqlx) with atomic operations and concurrency control.
Key features:
- Multiple database adapter support (PGX, SQL, SQLX)
- Atomic event appending with concurrency conflict detection
- Dynamic event stream filtering with JSON predicate support
- Configurable table names and structured logging support
- OpenTelemetry-compatible metrics and distributed tracing for comprehensive observability
- Transaction-safe operations with proper resource cleanup
Usage examples:
// Basic usage db, _ := pgxpool.New(context.Background(), dsn) store, _ := postgresengine.NewEventStoreFromPGXPool(db) // With logging, metrics, and tracing (production observability) store, _ := postgresengine.NewEventStoreFromPGXPool( db, postgresengine.WithTableName("my_events"), postgresengine.WithLogger(logger), postgresengine.WithMetrics(metricsCollector), postgresengine.WithTracing(tracingCollector), ) events, maxSeq, _ := store.Query(ctx, filter) err := store.Append(ctx, filter, maxSeq, newEvent)
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ContextualLogger ¶
type ContextualLogger = eventstore.ContextualLogger
ContextualLogger is an alias for eventstore.ContextualLogger for convenience when using postgresengine. It provides methods for context-aware logging with automatic trace correlation.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore represents a storage mechanism for handling and querying events in an event sourcing implementation. It leverages a database adapter and supports customizable logging, metricsCollector collection, tracing, and event table configuration.
func NewEventStoreFromPGXPool ¶
func NewEventStoreFromPGXPool(db *pgxpool.Pool, options ...Option) (*EventStore, error)
NewEventStoreFromPGXPool creates a new EventStore using a pgx Pool with optional configuration.
func NewEventStoreFromSQLDB ¶
func NewEventStoreFromSQLDB(db *sql.DB, options ...Option) (*EventStore, error)
NewEventStoreFromSQLDB creates a new EventStore using a sql.DB with optional configuration.
func NewEventStoreFromSQLX ¶
func NewEventStoreFromSQLX(db *sqlx.DB, options ...Option) (*EventStore, error)
NewEventStoreFromSQLX creates a new EventStore using a sqlx.DB with optional configuration.
func (*EventStore) Append ¶
func (es *EventStore) Append( ctx context.Context, filter eventstore.Filter, expectedMaxSequenceNumber eventstore.MaxSequenceNumberUint, storableEvents ...eventstore.StorableEvent, ) error
Append attempts to append one or multiple eventstore.StorableEvent(s) onto the Postgres event store respecting concurrency constraints for this "dynamic event stream" based on the provided eventstore.Filter criteria and the expected MaxSequenceNumberUint.
The provided eventstore.Filter criteria should be the same as the ones used for the Query before making the business decisions.
The insert query to append multiple events atomically is heavier than the one built to append a single event. In event-sourced applications, one command/request should typically only produce one event. Only supply multiple events if you are sure that you need to append multiple events at once!
func (*EventStore) Query ¶
func (es *EventStore) Query(ctx context.Context, filter eventstore.Filter) ( eventstore.StorableEvents, eventstore.MaxSequenceNumberUint, error, )
Query retrieves events from the Postgres event store based on the provided eventstore.Filter criteria and returns them as eventstore.StorableEvents as well as the MaxSequenceNumberUint for this "dynamic event stream" at the time of the query.
type Logger ¶
type Logger = eventstore.Logger
Logger is an alias for eventstore.Logger for convenience when using postgresengine. It provides methods for SQL query logging, operational metrics, warnings, and error reporting.
type MetricsCollector ¶
type MetricsCollector = eventstore.MetricsCollector
MetricsCollector is an alias for eventstore.MetricsCollector for convenience when using postgresengine. It provides methods for collecting EventStore performance and operational metrics.
type Option ¶
type Option func(*EventStore) error
Option defines a functional option for configuring EventStore.
func WithContextualLogger ¶
func WithContextualLogger(logger eventstore.ContextualLogger) Option
WithContextualLogger sets the contextual logger for the EventStore. The contextual logger will receive log messages with context information including automatic trace/span correlation when tracing is enabled, enabling unified observability.
func WithLogger ¶
func WithLogger(logger eventstore.Logger) Option
WithLogger sets the logger for the EventStore. The logger will receive messages at different levels based on the logger's configured level:
Debug level: SQL queries with execution timing (development use) Info level: Event counts, durations, concurrency conflicts (production-safe) Warn level: Non-critical issues like cleanup failures Error level: Critical failures that cause operation failures.
func WithMetrics ¶
func WithMetrics(collector eventstore.MetricsCollector) Option
WithMetrics sets the metricsCollector collector for the EventStore. The metricsCollector collector will receive performance and operational metricsCollector including query/append durations, event counts, concurrency conflicts, and database errors.
func WithTableName ¶
WithTableName sets the table name for the EventStore.
func WithTracing ¶
func WithTracing(collector eventstore.TracingCollector) Option
WithTracing sets the tracing collector for the EventStore. The tracing collector will receive distributed tracing information including span creation for query/append operations, context propagation, and error tracking.
type SpanContext ¶
type SpanContext = eventstore.SpanContext
SpanContext is an alias for eventstore.SpanContext for convenience when using postgresengine. It represents an active tracing span that can be finished and updated with attributes.
type TracingCollector ¶
type TracingCollector = eventstore.TracingCollector
TracingCollector is an alias for eventstore.TracingCollector for convenience when using postgresengine. It provides methods for collecting distributed tracing information from EventStore operations.