postgres

package
v0.21.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2025 License: MIT Imports: 18 Imported by: 0

README

PostgreSQL EventStore with LISTEN/NOTIFY

This package provides a PostgreSQL implementation of the go-sync-kit EventStore with real-time notification capabilities using PostgreSQL's LISTEN/NOTIFY mechanism.

Features

  • ACID Transactions: Full transactional support with PostgreSQL's ACID guarantees
  • Real-time Notifications: Instant event notifications via PostgreSQL LISTEN/NOTIFY
  • Stream-based Subscriptions: Subscribe to events for specific aggregates
  • Global Event Subscriptions: Subscribe to all events across all streams
  • Event Type Filtering: Subscribe to events of specific types
  • Connection Recovery: Automatic reconnection with exponential backoff
  • High Performance: Connection pooling and prepared statements
  • Production Ready: Comprehensive error handling, logging, and monitoring

Quick Start

1. Start PostgreSQL with Docker
# Start PostgreSQL using the provided Docker Compose
docker-compose -f docker-compose.test.yml up -d postgres

# Or run PostgreSQL manually
docker run --name postgres-eventstore \
  -e POSTGRES_DB=eventstore \
  -e POSTGRES_USER=user \
  -e POSTGRES_PASSWORD=password \
  -p 5432:5432 -d postgres:15
2. Basic Usage
package main

import (
    "context"
    "log"
    "os"
    
    "github.com/c0deZ3R0/go-sync-kit/storage/postgres"
    "github.com/c0deZ3R0/go-sync-kit/synckit"
)

func main() {
    // Create configuration
    config := postgres.DefaultConfig("postgres://user:password@localhost/eventstore?sslmode=disable")
    config.Logger = log.New(os.Stdout, "[EventStore] ", log.LstdFlags)
    
    // Create EventStore
    store, err := postgres.New(config)
    if err != nil {
        log.Fatal("Failed to create store:", err)
    }
    defer store.Close()
    
    ctx := context.Background()
    
    // Store an event
    event := &MyEvent{
        id:          "event-1",
        eventType:   "UserCreated",
        aggregateID: "user-123",
        data:        map[string]string{"name": "John Doe"},
    }
    
    err = store.Store(ctx, event, nil)
    if err != nil {
        log.Fatal("Failed to store event:", err)
    }
    
    // Load events
    events, err := store.Load(ctx, cursor.IntegerCursor{Seq: 0})
    if err != nil {
        log.Fatal("Failed to load events:", err)
    }
    
    log.Printf("Loaded %d events", len(events))
}
3. Real-time Subscriptions
package main

import (
    "context"
    "log"
    "os"
    
    "github.com/c0deZ3R0/go-sync-kit/storage/postgres"
)

func main() {
    config := postgres.DefaultConfig("postgres://user:password@localhost/eventstore?sslmode=disable")
    store, err := postgres.NewRealtimeEventStore(config)
    if err != nil {
        log.Fatal("Failed to create realtime store:", err)
    }
    defer store.Close()
    
    ctx := context.Background()
    
    // Subscribe to events for a specific aggregate
    err = store.SubscribeToStream(ctx, "user-123", func(payload postgres.NotificationPayload) error {
        log.Printf("New event for user-123: %s (%s)", payload.EventType, payload.ID)
        return nil
    })
    if err != nil {
        log.Fatal("Failed to subscribe:", err)
    }
    
    // Subscribe to all events
    err = store.SubscribeToAll(ctx, func(payload postgres.NotificationPayload) error {
        log.Printf("Global event: %s for %s", payload.EventType, payload.AggregateID)
        return nil
    })
    if err != nil {
        log.Fatal("Failed to subscribe to all:", err)
    }
    
    // Keep the program running to receive notifications
    select {}
}

Configuration

The Config struct provides comprehensive configuration options:

config := &postgres.Config{
    ConnectionString: "postgres://user:password@localhost/eventstore?sslmode=disable",
    
    // Connection Pool Settings
    MaxOpenConns:    25,              // Maximum number of open connections
    MaxIdleConns:    10,              // Maximum number of idle connections
    ConnMaxLifetime: time.Hour,       // Maximum connection lifetime
    ConnMaxIdleTime: 15 * time.Minute, // Maximum idle time before closing
    
    // LISTEN/NOTIFY Settings
    NotificationTimeout:    30 * time.Second, // Timeout for waiting on notifications
    ReconnectInterval:      5 * time.Second,  // Interval between reconnection attempts
    MaxReconnectAttempts:   10,               // Maximum reconnection attempts
    
    // Performance Settings
    BatchSize:           1000, // Batch size for bulk operations
    EnablePreparedStmts: true, // Enable prepared statements
    
    // Monitoring
    Logger:        logger,     // Custom logger instance
    EnableMetrics: true,       // Enable metrics collection
}

Database Schema

The PostgreSQL EventStore uses the following optimized schema:

CREATE TABLE events (
    version         BIGSERIAL PRIMARY KEY,           -- Auto-incrementing version
    id              TEXT NOT NULL UNIQUE,            -- Event ID
    aggregate_id    TEXT NOT NULL,                   -- Aggregate identifier
    event_type      TEXT NOT NULL,                   -- Event type
    data            JSONB,                          -- Event payload (JSONB for querying)
    metadata        JSONB,                          -- Event metadata (JSONB for querying)
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    stream_name     TEXT GENERATED ALWAYS AS ('stream_' || aggregate_id) STORED
);

-- Optimized indexes
CREATE INDEX idx_events_aggregate_id ON events (aggregate_id);
CREATE INDEX idx_events_version ON events (version);
CREATE INDEX idx_events_created_at ON events (created_at);
CREATE INDEX idx_events_stream_name ON events (stream_name);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_data_gin ON events USING GIN (data);
CREATE INDEX idx_events_metadata_gin ON events USING GIN (metadata);

LISTEN/NOTIFY Channels

The implementation automatically creates and manages the following notification channels:

  • Stream Channels: stream_{aggregate_id} - Notifications for specific aggregates
  • Global Channel: events_global - Notifications for all events
Notification Payload

Each notification contains a JSON payload with event information:

{
    "version": 12345,
    "id": "event-abc-123",
    "aggregate_id": "user-456",
    "event_type": "UserUpdated",
    "stream_name": "stream_user-456",
    "created_at": "2024-01-15T10:30:00Z"
}

Testing

Prerequisites

Start the test PostgreSQL instance:

docker-compose -f docker-compose.test.yml up -d postgres
Running Tests
# Run all tests
go test ./storage/postgres/...

# Run only unit tests (skip integration tests)
go test -short ./storage/postgres/...

# Run with verbose output
go test -v ./storage/postgres/...

# Run benchmarks
go test -bench=. ./storage/postgres/...

# Set custom connection string
POSTGRES_TEST_CONNECTION="postgres://user:pass@host/db" go test ./storage/postgres/...
Integration Tests

The package includes comprehensive integration tests that:

  • Test basic EventStore operations (Store, Load, LoadByAggregate)
  • Verify real-time notifications work correctly
  • Test connection recovery scenarios
  • Benchmark performance under load

Performance Considerations

Connection Pooling

The EventStore uses connection pooling for optimal performance:

config := postgres.DefaultConfig(connectionString)
config.MaxOpenConns = 25    // Adjust based on your workload
config.MaxIdleConns = 10    // Keep some connections ready
config.ConnMaxLifetime = time.Hour
config.ConnMaxIdleTime = 15 * time.Minute
Prepared Statements

Prepared statements are enabled by default for better performance:

config.EnablePreparedStmts = true
Batch Operations

For high-throughput scenarios, use batch operations:

events := []synckit.EventWithVersion{...}
err := store.StoreBatch(ctx, events)
Indexing

The schema includes optimized indexes for common query patterns:

  • B-tree indexes for exact lookups and range queries
  • GIN indexes for JSONB data and metadata querying

Monitoring and Observability

Database Statistics

Monitor connection pool statistics:

stats := store.Stats()
fmt.Printf("Open connections: %d\n", stats.OpenConnections)
fmt.Printf("In use: %d\n", stats.InUse)
fmt.Printf("Idle: %d\n", stats.Idle)
Logging

Configure comprehensive logging:

logger := log.New(os.Stdout, "[PostgresEventStore] ", log.LstdFlags|log.Lshortfile)
config.Logger = logger
Health Checks

Check listener connectivity:

if store.IsListenerConnected() {
    log.Println("LISTEN/NOTIFY is connected")
} else {
    log.Println("LISTEN/NOTIFY is disconnected")
}

Error Handling

The implementation provides detailed error types:

  • ErrStoreClosed: Store has been closed
  • ErrIncompatibleVersion: Version type mismatch
  • ErrEventNotFound: Event not found
  • ErrInvalidConnection: Database connection issues

All errors are wrapped with context using the syncErrors package.

Production Deployment

Connection String

Use a production-grade connection string with appropriate settings:

postgres://user:password@host:5432/database?sslmode=require&connect_timeout=10&statement_timeout=30000&idle_in_transaction_session_timeout=60000
Security
  • Enable SSL/TLS encryption (sslmode=require)
  • Use connection pooling appropriately
  • Set up proper database permissions
  • Consider using connection string from environment variables
High Availability
  • Use PostgreSQL with replication for high availability
  • Configure proper backup strategies
  • Monitor database health and connection pool metrics
  • Set up alerts for notification listener disconnections
Scaling
  • Partition the events table for very large datasets
  • Use read replicas for read-heavy workloads
  • Consider horizontal scaling with multiple EventStore instances
  • Monitor and tune PostgreSQL configuration for your workload

Migration from SQLite

To migrate from the SQLite EventStore:

  1. Export data from SQLite
  2. Transform to PostgreSQL format
  3. Import into PostgreSQL
  4. Update application configuration
  5. Test thoroughly with both stores

A migration utility will be provided in future releases.

Troubleshooting

Common Issues
  1. Connection refused: Ensure PostgreSQL is running and accessible
  2. Permission denied: Check database user permissions
  3. Notifications not received: Verify LISTEN/NOTIFY setup and firewall settings
  4. High connection count: Tune connection pool settings
  5. Slow queries: Check indexes and query plans
Debug Logging

Enable detailed logging to troubleshoot issues:

logger := log.New(os.Stdout, "[DEBUG] ", log.LstdFlags|log.Lshortfile)
config.Logger = logger
Connection Testing

Test database connectivity:

psql -h localhost -U user -d eventstore -c "SELECT version();"

Contributing

Contributions are welcome! Please:

  1. Add tests for new features
  2. Update documentation
  3. Follow existing code style
  4. Test with real PostgreSQL instances

License

This implementation is part of the go-sync-kit project and follows the same license terms.

Documentation

Overview

Package postgres provides a PostgreSQL implementation of the go-sync-kit EventStore with real-time LISTEN/NOTIFY capabilities for event streaming.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrIncompatibleVersion = errors.New("incompatible version type: expected cursor.IntegerCursor")
	ErrEventNotFound       = errors.New("event not found")
	ErrStoreClosed         = errors.New("store is closed")
	ErrInvalidConnection   = errors.New("invalid database connection")
)

Custom errors for better error handling

Functions

func ParseVersion

func ParseVersion(s string) (cursor.IntegerCursor, error)

ParseVersion parses a version string into a cursor.IntegerCursor. This is useful for HTTP transport and other external integrations.

Types

type Config

type Config struct {
	// ConnectionString is the PostgreSQL connection string.
	// Example: "postgres://user:password@localhost/dbname?sslmode=require"
	ConnectionString string

	// Logger is an optional logger for logging internal operations and errors.
	// If nil, logging is disabled by default (logs to io.Discard).
	Logger *log.Logger

	// TableName is the name of the table to store events.
	// Defaults to "events" if empty.
	TableName string

	// Connection pool settings for production workloads.
	// Defaults: MaxOpen=25, MaxIdle=10, Lifetime=1h, IdleTime=15m
	MaxOpenConns    int           // Default: 25 - Maximum number of open connections
	MaxIdleConns    int           // Default: 10 - Maximum number of idle connections
	ConnMaxLifetime time.Duration // Default: 1h - Maximum lifetime of connections
	ConnMaxIdleTime time.Duration // Default: 15m - Maximum idle time before closing

	// LISTEN/NOTIFY settings for real-time capabilities
	NotificationTimeout  time.Duration // Default: 30s - Timeout for waiting on notifications
	ReconnectInterval    time.Duration // Default: 5s - Interval between reconnection attempts
	MaxReconnectAttempts int           // Default: 10 - Maximum reconnection attempts before giving up

	// Performance tuning
	BatchSize           int  // Default: 1000 - Batch size for bulk operations
	EnablePreparedStmts bool // Default: true - Enable prepared statements for better performance

	// Monitoring and observability
	EnableMetrics bool // Default: false - Enable metrics collection
}

Config holds configuration options for the PostgresEventStore.

Production-ready defaults are applied by DefaultConfig() including:

  • Connection pool with 25 max open, 10 max idle connections
  • Connection lifetimes of 1 hour max, 15 minutes max idle
  • LISTEN/NOTIFY timeout of 30 seconds
  • Reconnection with exponential backoff

func DefaultConfig

func DefaultConfig(connectionString string) *Config

DefaultConfig returns a Config with production-ready defaults for PostgreSQL.

Default settings include:

  • Connection pool: 25 max open, 10 max idle connections
  • Connection lifetime: 1 hour max, 15 minutes max idle
  • LISTEN/NOTIFY timeout: 30 seconds
  • Table name: "events"
  • Logging disabled (to io.Discard)
  • Prepared statements enabled

type EventHandler

type EventHandler func(payload NotificationPayload) error

EventHandler is a function type for handling incoming event notifications

type NotificationListener

type NotificationListener struct {
	// contains filtered or unexported fields
}

NotificationListener manages PostgreSQL LISTEN/NOTIFY connections for real-time event streaming

func NewNotificationListener

func NewNotificationListener(connectionString string, logger *log.Logger) (*NotificationListener, error)

NewNotificationListener creates a new PostgreSQL notification listener

func (*NotificationListener) Close

func (nl *NotificationListener) Close() error

Close shuts down the notification listener

func (*NotificationListener) GetActiveChannels

func (nl *NotificationListener) GetActiveChannels() []string

GetActiveChannels returns a list of currently subscribed channels

func (*NotificationListener) IsConnected

func (nl *NotificationListener) IsConnected() bool

IsConnected returns true if the listener is connected to PostgreSQL

func (*NotificationListener) Start

func (nl *NotificationListener) Start(ctx context.Context) error

Start begins listening for notifications

func (*NotificationListener) SubscribeToAll

func (nl *NotificationListener) SubscribeToAll(handler EventHandler) error

SubscribeToAll subscribes to all events via the global channel

func (*NotificationListener) SubscribeToEventType

func (nl *NotificationListener) SubscribeToEventType(eventType string, handler EventHandler) error

SubscribeToEventType subscribes to events of a specific type across all streams This is implemented by subscribing to the global channel and filtering by event type

func (*NotificationListener) SubscribeToStream

func (nl *NotificationListener) SubscribeToStream(aggregateID string, handler EventHandler) error

SubscribeToStream subscribes to events for a specific aggregate stream

func (*NotificationListener) UnsubscribeFromAll

func (nl *NotificationListener) UnsubscribeFromAll() error

UnsubscribeFromAll unsubscribes from the global events channel

func (*NotificationListener) UnsubscribeFromStream

func (nl *NotificationListener) UnsubscribeFromStream(aggregateID string) error

UnsubscribeFromStream unsubscribes from a specific aggregate stream

func (*NotificationListener) WithNotificationTimeout

func (nl *NotificationListener) WithNotificationTimeout(timeout time.Duration) *NotificationListener

WithNotificationTimeout allows customizing the notification timeout

func (*NotificationListener) WithReconnectSettings

func (nl *NotificationListener) WithReconnectSettings(interval time.Duration, maxAttempts int) *NotificationListener

WithReconnectSettings allows customizing reconnection behavior

type NotificationPayload

type NotificationPayload struct {
	Version     int64     `json:"version"`
	ID          string    `json:"id"`
	AggregateID string    `json:"aggregate_id"`
	EventType   string    `json:"event_type"`
	StreamName  string    `json:"stream_name,omitempty"` // Only present in global notifications
	CreatedAt   time.Time `json:"created_at"`
}

NotificationPayload represents the structure of notification payloads

type PostgresEventStore

type PostgresEventStore struct {
	// contains filtered or unexported fields
}

PostgresEventStore implements the synckit.EventStore interface for PostgreSQL with additional LISTEN/NOTIFY capabilities for real-time event streaming.

func New

func New(config *Config) (*PostgresEventStore, error)

New creates a new PostgresEventStore from a Config. If config is nil, returns an error.

func NewWithConnectionString

func NewWithConnectionString(connectionString string) (*PostgresEventStore, error)

NewWithConnectionString is a convenience constructor

func (*PostgresEventStore) Close

func (s *PostgresEventStore) Close() error

Close closes the database connection and stops the notification listener.

func (*PostgresEventStore) LatestVersion

func (s *PostgresEventStore) LatestVersion(ctx context.Context) (synckit.Version, error)

LatestVersion returns the highest version number in the store.

func (*PostgresEventStore) Load

Load retrieves all events since a given version.

func (*PostgresEventStore) LoadByAggregate

func (s *PostgresEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version) ([]synckit.EventWithVersion, error)

LoadByAggregate retrieves events for a specific aggregate since a given version.

func (*PostgresEventStore) ParseVersion

func (s *PostgresEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)

ParseVersion converts a string representation into a cursor.IntegerCursor. This allows external integrations to handle PostgreSQL's integer versioning gracefully.

func (*PostgresEventStore) Stats

func (s *PostgresEventStore) Stats() sql.DBStats

Stats returns database statistics for monitoring

func (*PostgresEventStore) Store

func (s *PostgresEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error

Store saves an event to the PostgreSQL database. The version parameter is ignored as PostgreSQL uses BIGSERIAL for auto-incrementing versions. Upon successful insert, this triggers PostgreSQL LISTEN/NOTIFY for real-time subscribers.

func (*PostgresEventStore) StoreBatch

func (s *PostgresEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error

StoreBatch stores multiple events in a single transaction for better performance.

type RealtimeEventStore

type RealtimeEventStore struct {
	*PostgresEventStore
}

RealtimeEventStore extends the PostgresEventStore with real-time subscription capabilities

func NewRealtimeEventStore

func NewRealtimeEventStore(config *Config) (*RealtimeEventStore, error)

NewRealtimeEventStore creates a new PostgresEventStore with real-time capabilities

func (*RealtimeEventStore) GetActiveSubscriptions

func (rs *RealtimeEventStore) GetActiveSubscriptions() []string

GetActiveSubscriptions returns information about active subscriptions

func (*RealtimeEventStore) IsListenerConnected

func (rs *RealtimeEventStore) IsListenerConnected() bool

IsListenerConnected returns true if the notification listener is connected

func (*RealtimeEventStore) SubscribeToAll

func (rs *RealtimeEventStore) SubscribeToAll(ctx context.Context, handler func(NotificationPayload) error) error

SubscribeToAll subscribes to all real-time events

func (*RealtimeEventStore) SubscribeToEventType

func (rs *RealtimeEventStore) SubscribeToEventType(ctx context.Context, eventType string, handler func(NotificationPayload) error) error

SubscribeToEventType subscribes to events of a specific type

func (*RealtimeEventStore) SubscribeToStream

func (rs *RealtimeEventStore) SubscribeToStream(ctx context.Context, aggregateID string, handler func(NotificationPayload) error) error

SubscribeToStream subscribes to real-time events for a specific aggregate

type StoredEvent

type StoredEvent struct {
	// contains filtered or unexported fields
}

StoredEvent is a concrete implementation of synckit.Event used for retrieving events from the database. It holds data and metadata as JSONB.

func (*StoredEvent) AggregateID

func (e *StoredEvent) AggregateID() string

func (*StoredEvent) Data

func (e *StoredEvent) Data() interface{}

func (*StoredEvent) ID

func (e *StoredEvent) ID() string

func (*StoredEvent) Metadata

func (e *StoredEvent) Metadata() map[string]interface{}

func (*StoredEvent) Type

func (e *StoredEvent) Type() string

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager manages subscriptions to PostgreSQL LISTEN/NOTIFY channels

func NewSubscriptionManager

func NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new subscription manager

func (*SubscriptionManager) GetChannels

func (sm *SubscriptionManager) GetChannels() []string

GetChannels returns all subscribed channels

func (*SubscriptionManager) HandleNotification

func (sm *SubscriptionManager) HandleNotification(channel string, payload string) error

HandleNotification processes an incoming notification

func (*SubscriptionManager) Subscribe

func (sm *SubscriptionManager) Subscribe(channel string, handler EventHandler)

Subscribe adds a handler for a specific channel

func (*SubscriptionManager) Unsubscribe

func (sm *SubscriptionManager) Unsubscribe(channel string)

Unsubscribe removes handlers for a specific channel

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL