Documentation ¶
Overview ¶
Package postgres provides a PostgreSQL-based persistent event store.
PostgresEventStore is suitable for production deployments that require durability, concurrency, and advanced querying. It implements the synckit.EventStore interface using PostgreSQL as the underlying database. Use NewWithConnectionString() to create a store from a connection string, or New() to create one from a Config.
See also:
- README: https://github.com/c0deZ3R0/go-sync-kit#readme
- Architecture overview: https://github.com/c0deZ3R0/go-sync-kit/blob/main/docs/overview.md
Package postgres provides a PostgreSQL implementation of the go-sync-kit EventStore with real-time LISTEN/NOTIFY capabilities for event streaming.
Index ¶
- Variables
- func ParseVersion(s string) (cursor.IntegerCursor, error)
- type Config
- type EventHandler
- type NotificationListener
- func (nl *NotificationListener) Close() error
- func (nl *NotificationListener) GetActiveChannels() []string
- func (nl *NotificationListener) IsConnected() bool
- func (nl *NotificationListener) Start(ctx context.Context) error
- func (nl *NotificationListener) SubscribeToAll(handler EventHandler) error
- func (nl *NotificationListener) SubscribeToEventType(eventType string, handler EventHandler) error
- func (nl *NotificationListener) SubscribeToStream(aggregateID string, handler EventHandler) error
- func (nl *NotificationListener) UnsubscribeFromAll() error
- func (nl *NotificationListener) UnsubscribeFromStream(aggregateID string) error
- func (nl *NotificationListener) WithNotificationTimeout(timeout time.Duration) *NotificationListener
- func (nl *NotificationListener) WithReconnectSettings(interval time.Duration, maxAttempts int) *NotificationListener
- type NotificationPayload
- type PostgresEventStore
- func (s *PostgresEventStore) Close() error
- func (s *PostgresEventStore) LatestVersion(ctx context.Context) (synckit.Version, error)
- func (s *PostgresEventStore) Load(ctx context.Context, since synckit.Version, filters ...synckit.Filter) ([]synckit.EventWithVersion, error)
- func (s *PostgresEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version, ...) ([]synckit.EventWithVersion, error)
- func (s *PostgresEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)
- func (s *PostgresEventStore) Stats() sql.DBStats
- func (s *PostgresEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error
- func (s *PostgresEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error
- type RealtimeEventStore
- func (rs *RealtimeEventStore) GetActiveSubscriptions() []string
- func (rs *RealtimeEventStore) IsListenerConnected() bool
- func (rs *RealtimeEventStore) SubscribeToAll(ctx context.Context, handler func(NotificationPayload) error) error
- func (rs *RealtimeEventStore) SubscribeToEventType(ctx context.Context, eventType string, handler func(NotificationPayload) error) error
- func (rs *RealtimeEventStore) SubscribeToStream(ctx context.Context, aggregateID string, ...) error
- type StoredEvent
- type SubscriptionManager
Constants ¶
This section is empty.
Variables ¶
var (
    ErrIncompatibleVersion = errors.New("incompatible version type: expected cursor.IntegerCursor")
    ErrEventNotFound       = errors.New("event not found")
    ErrStoreClosed         = errors.New("store is closed")
    ErrInvalidConnection   = errors.New("invalid database connection")
)
Custom errors that let callers distinguish failure modes programmatically.
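Sentinel errors like these are usually matched with errors.Is rather than compared by string. The following is a minimal sketch of that pattern. It redeclares ErrStoreClosed locally so that it compiles on its own, and storeEvent is a hypothetical stand-in for a failing store call. Whether the package wraps its sentinels with extra context is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrStoreClosed mirrors the sentinel of the same name documented above.
// It is redeclared here only so the sketch is self-contained.
var ErrStoreClosed = errors.New("store is closed")

// storeEvent is a hypothetical stand-in for a store call that fails and
// wraps the sentinel with additional context.
func storeEvent() error {
	return fmt.Errorf("store event: %w", ErrStoreClosed)
}

// isClosed reports whether err is, or wraps, ErrStoreClosed.
func isClosed(err error) bool {
	return errors.Is(err, ErrStoreClosed)
}

func main() {
	err := storeEvent()
	if isClosed(err) {
		fmt.Println("store is closed, not retrying:", err)
	}
}
```

Because errors.Is walks the wrap chain, the check keeps working if intermediate layers add context with %w.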
Functions ¶
func ParseVersion ¶
func ParseVersion(s string) (cursor.IntegerCursor, error)
ParseVersion parses a version string into a cursor.IntegerCursor. This is useful for HTTP transport and other external integrations.
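As an illustration of the kind of conversion involved, the sketch below parses a decimal string into an integer cursor. IntegerCursor and its Seq field are hypothetical stand-ins for cursor.IntegerCursor, and the accepted string format (unsigned base-10) is an assumption, not the package's confirmed behavior.

```go
package main

import (
	"fmt"
	"strconv"
)

// IntegerCursor is a hypothetical stand-in for cursor.IntegerCursor.
type IntegerCursor struct {
	Seq uint64
}

// parseVersion converts a decimal string such as "42" into an IntegerCursor.
// Strings that are not unsigned base-10 integers produce an error.
func parseVersion(s string) (IntegerCursor, error) {
	seq, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return IntegerCursor{}, fmt.Errorf("invalid version %q: %w", s, err)
	}
	return IntegerCursor{Seq: seq}, nil
}

func main() {
	// For example, a "since" query parameter received over HTTP.
	c, err := parseVersion("42")
	fmt.Println(c.Seq, err)

	_, err = parseVersion("abc")
	fmt.Println(err != nil)
}
```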
Types ¶
type Config ¶
type Config struct {
// ConnectionString is the PostgreSQL connection string.
// Example: "postgres://user:password@localhost/dbname?sslmode=require"
ConnectionString string
// Logger is an optional logger for logging internal operations and errors.
// If nil, logging is disabled by default (logs to io.Discard).
Logger *log.Logger
// TableName is the name of the table to store events.
// Defaults to "events" if empty.
TableName string
// Connection pool settings for production workloads.
// Defaults: MaxOpen=25, MaxIdle=10, Lifetime=1h, IdleTime=15m
MaxOpenConns int // Default: 25 - Maximum number of open connections
MaxIdleConns int // Default: 10 - Maximum number of idle connections
ConnMaxLifetime time.Duration // Default: 1h - Maximum lifetime of connections
ConnMaxIdleTime time.Duration // Default: 15m - Maximum idle time before closing
// LISTEN/NOTIFY settings for real-time capabilities
NotificationTimeout time.Duration // Default: 30s - Timeout for waiting on notifications
ReconnectInterval time.Duration // Default: 5s - Interval between reconnection attempts
MaxReconnectAttempts int // Default: 10 - Maximum reconnection attempts before giving up
// Performance tuning
BatchSize int // Default: 1000 - Batch size for bulk operations
EnablePreparedStmts bool // Default: true - Enable prepared statements for better performance
// Monitoring and observability
EnableMetrics bool // Default: false - Enable metrics collection
}
Config holds configuration options for the PostgresEventStore.
Production-ready defaults are applied by DefaultConfig() including:
- Connection pool with 25 max open, 10 max idle connections
- Connection lifetimes of 1 hour max, 15 minutes max idle
- LISTEN/NOTIFY timeout of 30 seconds
- Reconnection with exponential backoff
func DefaultConfig ¶
DefaultConfig returns a Config with production-ready defaults for PostgreSQL.
Default settings include:
- Connection pool: 25 max open, 10 max idle connections
- Connection lifetime: 1 hour max, 15 minutes max idle
- LISTEN/NOTIFY timeout: 30 seconds
- Table name: "events"
- Logging disabled (to io.Discard)
- Prepared statements enabled
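The defaults listed above can be pictured as a fill-in step over zero-valued fields. This is only a sketch: poolConfig is a hypothetical subset of Config, and withDefaults is an illustrative helper, not a function exported by the package.

```go
package main

import (
	"fmt"
	"time"
)

// poolConfig is a hypothetical subset of the Config fields documented above.
type poolConfig struct {
	TableName           string
	MaxOpenConns        int
	MaxIdleConns        int
	ConnMaxLifetime     time.Duration
	ConnMaxIdleTime     time.Duration
	NotificationTimeout time.Duration
}

// withDefaults returns a copy of c in which every zero-valued field is
// replaced by the documented default. Explicitly set fields are kept.
func withDefaults(c poolConfig) poolConfig {
	if c.TableName == "" {
		c.TableName = "events"
	}
	if c.MaxOpenConns == 0 {
		c.MaxOpenConns = 25
	}
	if c.MaxIdleConns == 0 {
		c.MaxIdleConns = 10
	}
	if c.ConnMaxLifetime == 0 {
		c.ConnMaxLifetime = time.Hour
	}
	if c.ConnMaxIdleTime == 0 {
		c.ConnMaxIdleTime = 15 * time.Minute
	}
	if c.NotificationTimeout == 0 {
		c.NotificationTimeout = 30 * time.Second
	}
	return c
}

func main() {
	// Override only the pool size; everything else falls back to defaults.
	cfg := withDefaults(poolConfig{MaxOpenConns: 50})
	fmt.Printf("%+v\n", cfg)
}
```

One limitation of this zero-value approach is that a caller cannot explicitly request 0 for a field, which is one reason to start from DefaultConfig() and override individual fields instead.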
type EventHandler ¶
type EventHandler func(payload NotificationPayload) error
EventHandler is a function type for handling incoming event notifications
type NotificationListener ¶
type NotificationListener struct {
// contains filtered or unexported fields
}
NotificationListener manages PostgreSQL LISTEN/NOTIFY connections for real-time event streaming
func NewNotificationListener ¶
func NewNotificationListener(connectionString string, logger *log.Logger) (*NotificationListener, error)
NewNotificationListener creates a new PostgreSQL notification listener
func (*NotificationListener) Close ¶
func (nl *NotificationListener) Close() error
Close shuts down the notification listener
func (*NotificationListener) GetActiveChannels ¶
func (nl *NotificationListener) GetActiveChannels() []string
GetActiveChannels returns a list of currently subscribed channels
func (*NotificationListener) IsConnected ¶
func (nl *NotificationListener) IsConnected() bool
IsConnected returns true if the listener is connected to PostgreSQL
func (*NotificationListener) Start ¶
func (nl *NotificationListener) Start(ctx context.Context) error
Start begins listening for notifications
func (*NotificationListener) SubscribeToAll ¶
func (nl *NotificationListener) SubscribeToAll(handler EventHandler) error
SubscribeToAll subscribes to all events via the global channel
func (*NotificationListener) SubscribeToEventType ¶
func (nl *NotificationListener) SubscribeToEventType(eventType string, handler EventHandler) error
SubscribeToEventType subscribes to events of a specific type across all streams. This is implemented by subscribing to the global channel and filtering by event type.
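Filtering on top of a global subscription can be sketched as a handler wrapper. The types below are trimmed local copies of NotificationPayload and EventHandler, and filterByType is a hypothetical helper. The package's internal implementation may differ.

```go
package main

import "fmt"

// NotificationPayload is a trimmed local copy of the documented struct.
type NotificationPayload struct {
	AggregateID string
	EventType   string
}

// EventHandler matches the documented handler signature.
type EventHandler func(payload NotificationPayload) error

// filterByType wraps handler so that it only receives payloads whose
// EventType equals eventType. All other payloads are silently skipped.
func filterByType(eventType string, handler EventHandler) EventHandler {
	return func(p NotificationPayload) error {
		if p.EventType != eventType {
			return nil
		}
		return handler(p)
	}
}

func main() {
	var seen []string
	h := filterByType("UserCreated", func(p NotificationPayload) error {
		seen = append(seen, p.AggregateID)
		return nil
	})

	// Simulate two notifications arriving on the global channel.
	_ = h(NotificationPayload{AggregateID: "user-1", EventType: "UserCreated"})
	_ = h(NotificationPayload{AggregateID: "order-7", EventType: "OrderPlaced"})

	fmt.Println(seen)
}
```

A consequence of this design is that every event still reaches the listener, so filtering saves handler work but not notification traffic.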
func (*NotificationListener) SubscribeToStream ¶
func (nl *NotificationListener) SubscribeToStream(aggregateID string, handler EventHandler) error
SubscribeToStream subscribes to events for a specific aggregate stream
func (*NotificationListener) UnsubscribeFromAll ¶
func (nl *NotificationListener) UnsubscribeFromAll() error
UnsubscribeFromAll unsubscribes from the global events channel
func (*NotificationListener) UnsubscribeFromStream ¶
func (nl *NotificationListener) UnsubscribeFromStream(aggregateID string) error
UnsubscribeFromStream unsubscribes from a specific aggregate stream
func (*NotificationListener) WithNotificationTimeout ¶
func (nl *NotificationListener) WithNotificationTimeout(timeout time.Duration) *NotificationListener
WithNotificationTimeout allows customizing the notification timeout
func (*NotificationListener) WithReconnectSettings ¶
func (nl *NotificationListener) WithReconnectSettings(interval time.Duration, maxAttempts int) *NotificationListener
WithReconnectSettings allows customizing reconnection behavior
type NotificationPayload ¶
type NotificationPayload struct {
Version int64 `json:"version"`
ID string `json:"id"`
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
StreamName string `json:"stream_name,omitempty"` // Only present in global notifications
CreatedAt time.Time `json:"created_at"`
}
NotificationPayload represents the structure of notification payloads
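Given the JSON tags above, a notification body can be decoded with encoding/json. The sketch below copies the struct locally and uses a made-up payload. It assumes created_at arrives as an RFC 3339 timestamp, which is what time.Time expects when unmarshaling.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// NotificationPayload is a local copy of the struct documented above.
type NotificationPayload struct {
	Version     int64     `json:"version"`
	ID          string    `json:"id"`
	AggregateID string    `json:"aggregate_id"`
	EventType   string    `json:"event_type"`
	StreamName  string    `json:"stream_name,omitempty"`
	CreatedAt   time.Time `json:"created_at"`
}

// decodePayload parses the JSON text carried by a NOTIFY message.
func decodePayload(raw string) (NotificationPayload, error) {
	var p NotificationPayload
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		return NotificationPayload{}, fmt.Errorf("decode notification: %w", err)
	}
	return p, nil
}

func main() {
	// Illustrative payload; the field values are invented for this example.
	raw := `{"version":7,"id":"evt-1","aggregate_id":"user-1","event_type":"UserCreated","created_at":"2024-01-02T03:04:05Z"}`

	p, err := decodePayload(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// StreamName stays empty because the sample omits stream_name.
	fmt.Println(p.Version, p.AggregateID, p.EventType, p.StreamName == "")
}
```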
type PostgresEventStore ¶
type PostgresEventStore struct {
// contains filtered or unexported fields
}
PostgresEventStore implements the synckit.EventStore interface for PostgreSQL with additional LISTEN/NOTIFY capabilities for real-time event streaming.
func New ¶
func New(config *Config) (*PostgresEventStore, error)
New creates a new PostgresEventStore from a Config. If config is nil, returns an error.
func NewWithConnectionString ¶
func NewWithConnectionString(connectionString string) (*PostgresEventStore, error)
NewWithConnectionString is a convenience constructor that creates a PostgresEventStore from a connection string.
func (*PostgresEventStore) Close ¶
func (s *PostgresEventStore) Close() error
Close closes the database connection and stops the notification listener.
func (*PostgresEventStore) LatestVersion ¶
func (s *PostgresEventStore) LatestVersion(ctx context.Context) (synckit.Version, error)
LatestVersion returns the highest version number in the store.
func (*PostgresEventStore) Load ¶
func (s *PostgresEventStore) Load(ctx context.Context, since synckit.Version, filters ...synckit.Filter) ([]synckit.EventWithVersion, error)
Load retrieves all events since a given version with optional filters.
func (*PostgresEventStore) LoadByAggregate ¶
func (s *PostgresEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version, filters ...synckit.Filter) ([]synckit.EventWithVersion, error)
LoadByAggregate retrieves events for a specific aggregate since a given version with optional filters.
func (*PostgresEventStore) ParseVersion ¶
func (s *PostgresEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)
ParseVersion converts a string representation into a cursor.IntegerCursor. This allows external integrations to handle PostgreSQL's integer versioning gracefully.
func (*PostgresEventStore) Stats ¶
func (s *PostgresEventStore) Stats() sql.DBStats
Stats returns database statistics for monitoring
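Since Stats returns a standard sql.DBStats value, its fields can feed simple health checks. The sketch below derives a pool utilization ratio. poolUtilization and sampleStats are hypothetical helpers, and the sample numbers are invented. In practice the value would come from a call to Stats().

```go
package main

import (
	"database/sql"
	"fmt"
)

// poolUtilization returns the fraction of the connection limit currently
// in use. It returns 0 when no limit is set, because database/sql reports
// MaxOpenConnections as 0 for an unlimited pool.
func poolUtilization(s sql.DBStats) float64 {
	if s.MaxOpenConnections <= 0 {
		return 0
	}
	return float64(s.InUse) / float64(s.MaxOpenConnections)
}

// sampleStats builds a hand-made DBStats value for demonstration purposes.
func sampleStats() sql.DBStats {
	return sql.DBStats{MaxOpenConnections: 25, OpenConnections: 8, InUse: 5, Idle: 3}
}

func main() {
	s := sampleStats()
	fmt.Printf("in use: %d, idle: %d, utilization: %.2f\n", s.InUse, s.Idle, poolUtilization(s))
}
```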
func (*PostgresEventStore) Store ¶
func (s *PostgresEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error
Store saves an event to the PostgreSQL database. The version parameter is ignored as PostgreSQL uses BIGSERIAL for auto-incrementing versions. Upon successful insert, this triggers PostgreSQL LISTEN/NOTIFY for real-time subscribers.
func (*PostgresEventStore) StoreBatch ¶
func (s *PostgresEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error
StoreBatch stores multiple events in a single transaction for better performance.
type RealtimeEventStore ¶
type RealtimeEventStore struct {
*PostgresEventStore
}
RealtimeEventStore extends the PostgresEventStore with real-time subscription capabilities
func NewRealtimeEventStore ¶
func NewRealtimeEventStore(config *Config) (*RealtimeEventStore, error)
NewRealtimeEventStore creates a new PostgresEventStore with real-time capabilities
func (*RealtimeEventStore) GetActiveSubscriptions ¶
func (rs *RealtimeEventStore) GetActiveSubscriptions() []string
GetActiveSubscriptions returns information about active subscriptions
func (*RealtimeEventStore) IsListenerConnected ¶
func (rs *RealtimeEventStore) IsListenerConnected() bool
IsListenerConnected returns true if the notification listener is connected
func (*RealtimeEventStore) SubscribeToAll ¶
func (rs *RealtimeEventStore) SubscribeToAll(ctx context.Context, handler func(NotificationPayload) error) error
SubscribeToAll subscribes to all real-time events
func (*RealtimeEventStore) SubscribeToEventType ¶
func (rs *RealtimeEventStore) SubscribeToEventType(ctx context.Context, eventType string, handler func(NotificationPayload) error) error
SubscribeToEventType subscribes to events of a specific type
func (*RealtimeEventStore) SubscribeToStream ¶
func (rs *RealtimeEventStore) SubscribeToStream(ctx context.Context, aggregateID string, handler func(NotificationPayload) error) error
SubscribeToStream subscribes to real-time events for a specific aggregate
type StoredEvent ¶
type StoredEvent struct {
// contains filtered or unexported fields
}
StoredEvent is a concrete implementation of synckit.Event used for retrieving events from the database. It holds data and metadata as JSONB.
func (*StoredEvent) AggregateID ¶
func (e *StoredEvent) AggregateID() string
func (*StoredEvent) Data ¶
func (e *StoredEvent) Data() interface{}
func (*StoredEvent) ID ¶
func (e *StoredEvent) ID() string
func (*StoredEvent) Metadata ¶
func (e *StoredEvent) Metadata() map[string]interface{}
func (*StoredEvent) Type ¶
func (e *StoredEvent) Type() string
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager manages subscriptions to PostgreSQL LISTEN/NOTIFY channels
func NewSubscriptionManager ¶
func NewSubscriptionManager() *SubscriptionManager
NewSubscriptionManager creates a new subscription manager
func (*SubscriptionManager) GetChannels ¶
func (sm *SubscriptionManager) GetChannels() []string
GetChannels returns all subscribed channels
func (*SubscriptionManager) HandleNotification ¶
func (sm *SubscriptionManager) HandleNotification(channel string, payload string) error
HandleNotification processes an incoming notification
func (*SubscriptionManager) Subscribe ¶
func (sm *SubscriptionManager) Subscribe(channel string, handler EventHandler)
Subscribe adds a handler for a specific channel
func (*SubscriptionManager) Unsubscribe ¶
func (sm *SubscriptionManager) Unsubscribe(channel string)
Unsubscribe removes handlers for a specific channel
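To show how the Subscribe, Unsubscribe, and HandleNotification methods could fit together, here is a simplified sketch of a channel-to-handlers registry. The manager type, the trimmed payload struct, and the channel name "events_global" are all hypothetical. The real SubscriptionManager may handle errors and concurrency differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// NotificationPayload is a trimmed local copy of the documented struct.
type NotificationPayload struct {
	Version   int64  `json:"version"`
	EventType string `json:"event_type"`
}

// EventHandler matches the documented handler signature.
type EventHandler func(payload NotificationPayload) error

// manager is a hypothetical, simplified take on SubscriptionManager.
type manager struct {
	mu       sync.RWMutex
	handlers map[string][]EventHandler
}

func newManager() *manager {
	return &manager{handlers: make(map[string][]EventHandler)}
}

// Subscribe adds a handler for channel.
func (m *manager) Subscribe(channel string, h EventHandler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[channel] = append(m.handlers[channel], h)
}

// Unsubscribe removes all handlers for channel.
func (m *manager) Unsubscribe(channel string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.handlers, channel)
}

// HandleNotification decodes payload and passes it to every handler
// registered on channel. It stops at, and returns, the first handler error.
func (m *manager) HandleNotification(channel, payload string) error {
	var p NotificationPayload
	if err := json.Unmarshal([]byte(payload), &p); err != nil {
		return fmt.Errorf("decode payload on %q: %w", channel, err)
	}
	// Copy the slice so handlers run without holding the lock.
	m.mu.RLock()
	hs := append([]EventHandler(nil), m.handlers[channel]...)
	m.mu.RUnlock()
	for _, h := range hs {
		if err := h(p); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	m := newManager()
	m.Subscribe("events_global", func(p NotificationPayload) error {
		fmt.Println("got", p.EventType, "at version", p.Version)
		return nil
	})
	_ = m.HandleNotification("events_global", `{"version":3,"event_type":"UserCreated"}`)

	// After Unsubscribe, notifications on the channel reach no handler.
	m.Unsubscribe("events_global")
	_ = m.HandleNotification("events_global", `{"version":4,"event_type":"UserDeleted"}`)
}
```

Copying the handler slice before invoking it lets a handler call Subscribe or Unsubscribe without deadlocking on the mutex.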