postgres

package
v0.24.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2025 License: MIT Imports: 18 Imported by: 0

README

PostgreSQL EventStore with LISTEN/NOTIFY

This package provides a PostgreSQL implementation of the go-sync-kit EventStore with real-time notification capabilities using PostgreSQL's LISTEN/NOTIFY mechanism.

Features

  • ACID Transactions: Full transactional support with PostgreSQL's ACID guarantees
  • Real-time Notifications: Instant event notifications via PostgreSQL LISTEN/NOTIFY
  • Stream-based Subscriptions: Subscribe to events for specific aggregates
  • Global Event Subscriptions: Subscribe to all events across all streams
  • Event Type Filtering: Subscribe to events of specific types
  • Connection Recovery: Automatic reconnection with exponential backoff
  • High Performance: Connection pooling and prepared statements
  • Production Ready: Comprehensive error handling, logging, and monitoring

Quick Start

1. Start PostgreSQL with Docker
# Start PostgreSQL using the provided Docker Compose
docker-compose -f docker-compose.test.yml up -d postgres

# Or run PostgreSQL manually
docker run --name postgres-eventstore \
  -e POSTGRES_DB=eventstore \
  -e POSTGRES_USER=user \
  -e POSTGRES_PASSWORD=password \
  -p 5432:5432 -d postgres:15
2. Basic Usage
package main

import (
    "context"
    "log"
    "os"
    
    "github.com/c0deZ3R0/go-sync-kit/storage/postgres"
    "github.com/c0deZ3R0/go-sync-kit/synckit"
)

func main() {
    // Create configuration
    config := postgres.DefaultConfig("postgres://user:password@localhost/eventstore?sslmode=disable")
    config.Logger = log.New(os.Stdout, "[EventStore] ", log.LstdFlags)
    
    // Create EventStore
    store, err := postgres.New(config)
    if err != nil {
        log.Fatal("Failed to create store:", err)
    }
    defer store.Close()
    
    ctx := context.Background()
    
    // Store an event
    event := &MyEvent{
        id:          "event-1",
        eventType:   "UserCreated",
        aggregateID: "user-123",
        data:        map[string]string{"name": "John Doe"},
    }
    
    err = store.Store(ctx, event, nil)
    if err != nil {
        log.Fatal("Failed to store event:", err)
    }
    
    // Load events
    events, err := store.Load(ctx, cursor.IntegerCursor{Seq: 0})
    if err != nil {
        log.Fatal("Failed to load events:", err)
    }
    
    log.Printf("Loaded %d events", len(events))
}
3. Real-time Subscriptions
package main

import (
    "context"
    "log"
    "os"
    
    "github.com/c0deZ3R0/go-sync-kit/storage/postgres"
)

func main() {
    config := postgres.DefaultConfig("postgres://user:password@localhost/eventstore?sslmode=disable")
    store, err := postgres.NewRealtimeEventStore(config)
    if err != nil {
        log.Fatal("Failed to create realtime store:", err)
    }
    defer store.Close()
    
    ctx := context.Background()
    
    // Subscribe to events for a specific aggregate
    err = store.SubscribeToStream(ctx, "user-123", func(payload postgres.NotificationPayload) error {
        log.Printf("New event for user-123: %s (%s)", payload.EventType, payload.ID)
        return nil
    })
    if err != nil {
        log.Fatal("Failed to subscribe:", err)
    }
    
    // Subscribe to all events
    err = store.SubscribeToAll(ctx, func(payload postgres.NotificationPayload) error {
        log.Printf("Global event: %s for %s", payload.EventType, payload.AggregateID)
        return nil
    })
    if err != nil {
        log.Fatal("Failed to subscribe to all:", err)
    }
    
    // Keep the program running to receive notifications
    select {}
}

Configuration

The Config struct provides comprehensive configuration options:

config := &postgres.Config{
    ConnectionString: "postgres://user:password@localhost/eventstore?sslmode=disable",
    
    // Connection Pool Settings
    MaxOpenConns:    25,              // Maximum number of open connections
    MaxIdleConns:    10,              // Maximum number of idle connections
    ConnMaxLifetime: time.Hour,       // Maximum connection lifetime
    ConnMaxIdleTime: 15 * time.Minute, // Maximum idle time before closing
    
    // LISTEN/NOTIFY Settings
    NotificationTimeout:    30 * time.Second, // Timeout for waiting on notifications
    ReconnectInterval:      5 * time.Second,  // Interval between reconnection attempts
    MaxReconnectAttempts:   10,               // Maximum reconnection attempts
    
    // Performance Settings
    BatchSize:           1000, // Batch size for bulk operations
    EnablePreparedStmts: true, // Enable prepared statements
    
    // Monitoring
    Logger:        logger,     // Custom logger instance
    EnableMetrics: true,       // Enable metrics collection
}

Database Schema

The PostgreSQL EventStore uses the following optimized schema:

CREATE TABLE events (
    version         BIGSERIAL PRIMARY KEY,           -- Auto-incrementing version
    id              TEXT NOT NULL UNIQUE,            -- Event ID
    aggregate_id    TEXT NOT NULL,                   -- Aggregate identifier
    event_type      TEXT NOT NULL,                   -- Event type
    data            JSONB,                          -- Event payload (JSONB for querying)
    metadata        JSONB,                          -- Event metadata (JSONB for querying)
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    stream_name     TEXT GENERATED ALWAYS AS ('stream_' || aggregate_id) STORED
);

-- Optimized indexes
CREATE INDEX idx_events_aggregate_id ON events (aggregate_id);
CREATE INDEX idx_events_version ON events (version);
CREATE INDEX idx_events_created_at ON events (created_at);
CREATE INDEX idx_events_stream_name ON events (stream_name);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_data_gin ON events USING GIN (data);
CREATE INDEX idx_events_metadata_gin ON events USING GIN (metadata);

LISTEN/NOTIFY Channels

The implementation automatically creates and manages the following notification channels:

  • Stream Channels: stream_{aggregate_id} - Notifications for specific aggregates
  • Global Channel: events_global - Notifications for all events
Notification Payload

Each notification contains a JSON payload with event information:

{
    "version": 12345,
    "id": "event-abc-123",
    "aggregate_id": "user-456",
    "event_type": "UserUpdated",
    "stream_name": "stream_user-456",
    "created_at": "2024-01-15T10:30:00Z"
}

Testing

Prerequisites

Start the test PostgreSQL instance:

docker-compose -f docker-compose.test.yml up -d postgres
Running Tests
# Run all tests
go test ./storage/postgres/...

# Run only unit tests (skip integration tests)
go test -short ./storage/postgres/...

# Run with verbose output
go test -v ./storage/postgres/...

# Run benchmarks
go test -bench=. ./storage/postgres/...

# Set custom connection string
POSTGRES_TEST_CONNECTION="postgres://user:pass@host/db" go test ./storage/postgres/...
Integration Tests

The package includes comprehensive integration tests that:

  • Test basic EventStore operations (Store, Load, LoadByAggregate)
  • Verify real-time notifications work correctly
  • Test connection recovery scenarios
  • Benchmark performance under load

Performance Considerations

Connection Pooling

The EventStore uses connection pooling for optimal performance:

config := postgres.DefaultConfig(connectionString)
config.MaxOpenConns = 25    // Adjust based on your workload
config.MaxIdleConns = 10    // Keep some connections ready
config.ConnMaxLifetime = time.Hour
config.ConnMaxIdleTime = 15 * time.Minute
Prepared Statements

Prepared statements are enabled by default for better performance:

config.EnablePreparedStmts = true
Batch Operations

For high-throughput scenarios, use batch operations:

events := []synckit.EventWithVersion{...}
err := store.StoreBatch(ctx, events)
Indexing

The schema includes optimized indexes for common query patterns:

  • B-tree indexes for exact lookups and range queries
  • GIN indexes for JSONB data and metadata querying

Monitoring and Observability

Database Statistics

Monitor connection pool statistics:

stats := store.Stats()
fmt.Printf("Open connections: %d\n", stats.OpenConnections)
fmt.Printf("In use: %d\n", stats.InUse)
fmt.Printf("Idle: %d\n", stats.Idle)
Logging

Configure comprehensive logging:

logger := log.New(os.Stdout, "[PostgresEventStore] ", log.LstdFlags|log.Lshortfile)
config.Logger = logger
Health Checks

Check listener connectivity:

if store.IsListenerConnected() {
    log.Println("LISTEN/NOTIFY is connected")
} else {
    log.Println("LISTEN/NOTIFY is disconnected")
}

Error Handling

The implementation provides detailed error types:

  • ErrStoreClosed: Store has been closed
  • ErrIncompatibleVersion: Version type mismatch
  • ErrEventNotFound: Event not found
  • ErrInvalidConnection: Database connection issues

All errors are wrapped with context using the syncErrors package.

Production Deployment

Connection String

Use a production-grade connection string with appropriate settings:

postgres://user:password@host:5432/database?sslmode=require&connect_timeout=10&statement_timeout=30000&idle_in_transaction_session_timeout=60000
Security
  • Enable SSL/TLS encryption (sslmode=require)
  • Use connection pooling appropriately
  • Set up proper database permissions
  • Consider using connection string from environment variables
High Availability
  • Use PostgreSQL with replication for high availability
  • Configure proper backup strategies
  • Monitor database health and connection pool metrics
  • Set up alerts for notification listener disconnections
Scaling
  • Partition the events table for very large datasets
  • Use read replicas for read-heavy workloads
  • Consider horizontal scaling with multiple EventStore instances
  • Monitor and tune PostgreSQL configuration for your workload

Migration from SQLite

To migrate from the SQLite EventStore:

  1. Export data from SQLite
  2. Transform to PostgreSQL format
  3. Import into PostgreSQL
  4. Update application configuration
  5. Test thoroughly with both stores

A migration utility will be provided in future releases.

Troubleshooting

Common Issues
  1. Connection refused: Ensure PostgreSQL is running and accessible
  2. Permission denied: Check database user permissions
  3. Notifications not received: Verify LISTEN/NOTIFY setup and firewall settings
  4. High connection count: Tune connection pool settings
  5. Slow queries: Check indexes and query plans
Debug Logging

Enable detailed logging to troubleshoot issues:

logger := log.New(os.Stdout, "[DEBUG] ", log.LstdFlags|log.Lshortfile)
config.Logger = logger
Connection Testing

Test database connectivity:

psql -h localhost -U user -d eventstore -c "SELECT version();"

Contributing

Contributions are welcome! Please:

  1. Add tests for new features
  2. Update documentation
  3. Follow existing code style
  4. Test with real PostgreSQL instances

License

This implementation is part of the go-sync-kit project and follows the same license terms.

Documentation

Overview

Package postgres provides a PostgreSQL-based persistent event store.

PostgresEventStore is suitable for production deployments requiring durability, concurrency, and advanced querying. It implements the synckit.EventStore interface using PostgreSQL as the underlying database. Use NewWithDataSource() to create a store from a connection string.

See also:

Package postgres provides a PostgreSQL implementation of the go-sync-kit EventStore with real-time LISTEN/NOTIFY capabilities for event streaming.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrIncompatibleVersion = errors.New("incompatible version type: expected cursor.IntegerCursor")
	ErrEventNotFound       = errors.New("event not found")
	ErrStoreClosed         = errors.New("store is closed")
	ErrInvalidConnection   = errors.New("invalid database connection")
)

Custom errors for better error handling

Functions

func ParseVersion

func ParseVersion(s string) (cursor.IntegerCursor, error)

ParseVersion parses a version string into a cursor.IntegerCursor. This is useful for HTTP transport and other external integrations.

Types

type Config

type Config struct {
	// ConnectionString is the PostgreSQL connection string.
	// Example: "postgres://user:password@localhost/dbname?sslmode=require"
	ConnectionString string

	// Logger is an optional logger for logging internal operations and errors.
	// If nil, logging is disabled by default (logs to io.Discard).
	Logger *log.Logger

	// TableName is the name of the table to store events.
	// Defaults to "events" if empty.
	TableName string

	// Connection pool settings for production workloads.
	// Defaults: MaxOpen=25, MaxIdle=10, Lifetime=1h, IdleTime=15m
	MaxOpenConns    int           // Default: 25 - Maximum number of open connections
	MaxIdleConns    int           // Default: 10 - Maximum number of idle connections
	ConnMaxLifetime time.Duration // Default: 1h - Maximum lifetime of connections
	ConnMaxIdleTime time.Duration // Default: 15m - Maximum idle time before closing

	// LISTEN/NOTIFY settings for real-time capabilities
	NotificationTimeout  time.Duration // Default: 30s - Timeout for waiting on notifications
	ReconnectInterval    time.Duration // Default: 5s - Interval between reconnection attempts
	MaxReconnectAttempts int           // Default: 10 - Maximum reconnection attempts before giving up

	// Performance tuning
	BatchSize           int  // Default: 1000 - Batch size for bulk operations
	EnablePreparedStmts bool // Default: true - Enable prepared statements for better performance

	// Monitoring and observability
	EnableMetrics bool // Default: false - Enable metrics collection
}

Config holds configuration options for the PostgresEventStore.

Production-ready defaults are applied by DefaultConfig() including:

  • Connection pool with 25 max open, 10 max idle connections
  • Connection lifetimes of 1 hour max, 15 minutes max idle
  • LISTEN/NOTIFY timeout of 30 seconds
  • Reconnection with exponential backoff

func DefaultConfig

func DefaultConfig(connectionString string) *Config

DefaultConfig returns a Config with production-ready defaults for PostgreSQL.

Default settings include:

  • Connection pool: 25 max open, 10 max idle connections
  • Connection lifetime: 1 hour max, 15 minutes max idle
  • LISTEN/NOTIFY timeout: 30 seconds
  • Table name: "events"
  • Logging disabled (to io.Discard)
  • Prepared statements enabled

type EventHandler

type EventHandler func(payload NotificationPayload) error

EventHandler is a function type for handling incoming event notifications

type NotificationListener

type NotificationListener struct {
	// contains filtered or unexported fields
}

NotificationListener manages PostgreSQL LISTEN/NOTIFY connections for real-time event streaming

func NewNotificationListener

func NewNotificationListener(connectionString string, logger *log.Logger) (*NotificationListener, error)

NewNotificationListener creates a new PostgreSQL notification listener

func (*NotificationListener) Close

func (nl *NotificationListener) Close() error

Close shuts down the notification listener

func (*NotificationListener) GetActiveChannels

func (nl *NotificationListener) GetActiveChannels() []string

GetActiveChannels returns a list of currently subscribed channels

func (*NotificationListener) IsConnected

func (nl *NotificationListener) IsConnected() bool

IsConnected returns true if the listener is connected to PostgreSQL

func (*NotificationListener) Start

func (nl *NotificationListener) Start(ctx context.Context) error

Start begins listening for notifications

func (*NotificationListener) SubscribeToAll

func (nl *NotificationListener) SubscribeToAll(handler EventHandler) error

SubscribeToAll subscribes to all events via the global channel

func (*NotificationListener) SubscribeToEventType

func (nl *NotificationListener) SubscribeToEventType(eventType string, handler EventHandler) error

SubscribeToEventType subscribes to events of a specific type across all streams This is implemented by subscribing to the global channel and filtering by event type

func (*NotificationListener) SubscribeToStream

func (nl *NotificationListener) SubscribeToStream(aggregateID string, handler EventHandler) error

SubscribeToStream subscribes to events for a specific aggregate stream

func (*NotificationListener) UnsubscribeFromAll

func (nl *NotificationListener) UnsubscribeFromAll() error

UnsubscribeFromAll unsubscribes from the global events channel

func (*NotificationListener) UnsubscribeFromStream

func (nl *NotificationListener) UnsubscribeFromStream(aggregateID string) error

UnsubscribeFromStream unsubscribes from a specific aggregate stream

func (*NotificationListener) WithNotificationTimeout

func (nl *NotificationListener) WithNotificationTimeout(timeout time.Duration) *NotificationListener

WithNotificationTimeout allows customizing the notification timeout

func (*NotificationListener) WithReconnectSettings

func (nl *NotificationListener) WithReconnectSettings(interval time.Duration, maxAttempts int) *NotificationListener

WithReconnectSettings allows customizing reconnection behavior

type NotificationPayload

type NotificationPayload struct {
	Version     int64     `json:"version"`
	ID          string    `json:"id"`
	AggregateID string    `json:"aggregate_id"`
	EventType   string    `json:"event_type"`
	StreamName  string    `json:"stream_name,omitempty"` // Only present in global notifications
	CreatedAt   time.Time `json:"created_at"`
}

NotificationPayload represents the structure of notification payloads

type PostgresEventStore

type PostgresEventStore struct {
	// contains filtered or unexported fields
}

PostgresEventStore implements the synckit.EventStore interface for PostgreSQL with additional LISTEN/NOTIFY capabilities for real-time event streaming.

func New

func New(config *Config) (*PostgresEventStore, error)

New creates a new PostgresEventStore from a Config. If config is nil, returns an error.

func NewWithConnectionString

func NewWithConnectionString(connectionString string) (*PostgresEventStore, error)

NewWithConnectionString is a convenience constructor

func (*PostgresEventStore) Close

func (s *PostgresEventStore) Close() error

Close closes the database connection and stops the notification listener.

func (*PostgresEventStore) LatestVersion

func (s *PostgresEventStore) LatestVersion(ctx context.Context) (synckit.Version, error)

LatestVersion returns the highest version number in the store.

func (*PostgresEventStore) Load

Load retrieves all events since a given version with optional filters.

func (*PostgresEventStore) LoadByAggregate

func (s *PostgresEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version, filters ...synckit.Filter) ([]synckit.EventWithVersion, error)

LoadByAggregate retrieves events for a specific aggregate since a given version with optional filters.

func (*PostgresEventStore) ParseVersion

func (s *PostgresEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)

ParseVersion converts a string representation into a cursor.IntegerCursor. This allows external integrations to handle PostgreSQL's integer versioning gracefully.

func (*PostgresEventStore) Stats

func (s *PostgresEventStore) Stats() sql.DBStats

Stats returns database statistics for monitoring

func (*PostgresEventStore) Store

func (s *PostgresEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error

Store saves an event to the PostgreSQL database. The version parameter is ignored as PostgreSQL uses BIGSERIAL for auto-incrementing versions. Upon successful insert, this triggers PostgreSQL LISTEN/NOTIFY for real-time subscribers.

func (*PostgresEventStore) StoreBatch

func (s *PostgresEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error

StoreBatch stores multiple events in a single transaction for better performance.

type RealtimeEventStore

type RealtimeEventStore struct {
	*PostgresEventStore
}

RealtimeEventStore extends the PostgresEventStore with real-time subscription capabilities

func NewRealtimeEventStore

func NewRealtimeEventStore(config *Config) (*RealtimeEventStore, error)

NewRealtimeEventStore creates a new PostgresEventStore with real-time capabilities

func (*RealtimeEventStore) GetActiveSubscriptions

func (rs *RealtimeEventStore) GetActiveSubscriptions() []string

GetActiveSubscriptions returns information about active subscriptions

func (*RealtimeEventStore) IsListenerConnected

func (rs *RealtimeEventStore) IsListenerConnected() bool

IsListenerConnected returns true if the notification listener is connected

func (*RealtimeEventStore) SubscribeToAll

func (rs *RealtimeEventStore) SubscribeToAll(ctx context.Context, handler func(NotificationPayload) error) error

SubscribeToAll subscribes to all real-time events

func (*RealtimeEventStore) SubscribeToEventType

func (rs *RealtimeEventStore) SubscribeToEventType(ctx context.Context, eventType string, handler func(NotificationPayload) error) error

SubscribeToEventType subscribes to events of a specific type

func (*RealtimeEventStore) SubscribeToStream

func (rs *RealtimeEventStore) SubscribeToStream(ctx context.Context, aggregateID string, handler func(NotificationPayload) error) error

SubscribeToStream subscribes to real-time events for a specific aggregate

type StoredEvent

type StoredEvent struct {
	// contains filtered or unexported fields
}

StoredEvent is a concrete implementation of synckit.Event used for retrieving events from the database. It holds data and metadata as JSONB.

func (*StoredEvent) AggregateID

func (e *StoredEvent) AggregateID() string

func (*StoredEvent) Data

func (e *StoredEvent) Data() interface{}

func (*StoredEvent) ID

func (e *StoredEvent) ID() string

func (*StoredEvent) Metadata

func (e *StoredEvent) Metadata() map[string]interface{}

func (*StoredEvent) Type

func (e *StoredEvent) Type() string

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager manages subscriptions to PostgreSQL LISTEN/NOTIFY channels

func NewSubscriptionManager

func NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new subscription manager

func (*SubscriptionManager) GetChannels

func (sm *SubscriptionManager) GetChannels() []string

GetChannels returns all subscribed channels

func (*SubscriptionManager) HandleNotification

func (sm *SubscriptionManager) HandleNotification(channel string, payload string) error

HandleNotification processes an incoming notification

func (*SubscriptionManager) Subscribe

func (sm *SubscriptionManager) Subscribe(channel string, handler EventHandler)

Subscribe adds a handler for a specific channel

func (*SubscriptionManager) Unsubscribe

func (sm *SubscriptionManager) Unsubscribe(channel string)

Unsubscribe removes handlers for a specific channel

Directories

Path Synopsis
Package main demonstrates usage of the postgres event store.
Package main demonstrates usage of the postgres event store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL