Documentation
¶
Overview ¶
Package postgres provides a PostgreSQL implementation of the go-sync-kit EventStore with real-time LISTEN/NOTIFY capabilities for event streaming.
Index ¶
- Variables
- func ParseVersion(s string) (cursor.IntegerCursor, error)
- type Config
- type EventHandler
- type NotificationListener
- func (nl *NotificationListener) Close() error
- func (nl *NotificationListener) GetActiveChannels() []string
- func (nl *NotificationListener) IsConnected() bool
- func (nl *NotificationListener) Start(ctx context.Context) error
- func (nl *NotificationListener) SubscribeToAll(handler EventHandler) error
- func (nl *NotificationListener) SubscribeToEventType(eventType string, handler EventHandler) error
- func (nl *NotificationListener) SubscribeToStream(aggregateID string, handler EventHandler) error
- func (nl *NotificationListener) UnsubscribeFromAll() error
- func (nl *NotificationListener) UnsubscribeFromStream(aggregateID string) error
- func (nl *NotificationListener) WithNotificationTimeout(timeout time.Duration) *NotificationListener
- func (nl *NotificationListener) WithReconnectSettings(interval time.Duration, maxAttempts int) *NotificationListener
- type NotificationPayload
- type PostgresEventStore
- func (s *PostgresEventStore) Close() error
- func (s *PostgresEventStore) LatestVersion(ctx context.Context) (synckit.Version, error)
- func (s *PostgresEventStore) Load(ctx context.Context, since synckit.Version) ([]synckit.EventWithVersion, error)
- func (s *PostgresEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version) ([]synckit.EventWithVersion, error)
- func (s *PostgresEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)
- func (s *PostgresEventStore) Stats() sql.DBStats
- func (s *PostgresEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error
- func (s *PostgresEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error
- type RealtimeEventStore
- func (rs *RealtimeEventStore) GetActiveSubscriptions() []string
- func (rs *RealtimeEventStore) IsListenerConnected() bool
- func (rs *RealtimeEventStore) SubscribeToAll(ctx context.Context, handler func(NotificationPayload) error) error
- func (rs *RealtimeEventStore) SubscribeToEventType(ctx context.Context, eventType string, handler func(NotificationPayload) error) error
- func (rs *RealtimeEventStore) SubscribeToStream(ctx context.Context, aggregateID string, ...) error
- type StoredEvent
- type SubscriptionManager
Constants ¶
This section is empty.
Variables ¶
var ( ErrIncompatibleVersion = errors.New("incompatible version type: expected cursor.IntegerCursor") ErrEventNotFound = errors.New("event not found") ErrStoreClosed = errors.New("store is closed") ErrInvalidConnection = errors.New("invalid database connection") )
Custom errors for better error handling
Functions ¶
func ParseVersion ¶
func ParseVersion(s string) (cursor.IntegerCursor, error)
ParseVersion parses a version string into a cursor.IntegerCursor. This is useful for HTTP transport and other external integrations.
Types ¶
type Config ¶
type Config struct {
// ConnectionString is the PostgreSQL connection string.
// Example: "postgres://user:password@localhost/dbname?sslmode=require"
ConnectionString string
// Logger is an optional logger for logging internal operations and errors.
// If nil, logging is disabled by default (logs to io.Discard).
Logger *log.Logger
// TableName is the name of the table to store events.
// Defaults to "events" if empty.
TableName string
// Connection pool settings for production workloads.
// Defaults: MaxOpen=25, MaxIdle=10, Lifetime=1h, IdleTime=15m
MaxOpenConns int // Default: 25 - Maximum number of open connections
MaxIdleConns int // Default: 10 - Maximum number of idle connections
ConnMaxLifetime time.Duration // Default: 1h - Maximum lifetime of connections
ConnMaxIdleTime time.Duration // Default: 15m - Maximum idle time before closing
// LISTEN/NOTIFY settings for real-time capabilities
NotificationTimeout time.Duration // Default: 30s - Timeout for waiting on notifications
ReconnectInterval time.Duration // Default: 5s - Interval between reconnection attempts
MaxReconnectAttempts int // Default: 10 - Maximum reconnection attempts before giving up
// Performance tuning
BatchSize int // Default: 1000 - Batch size for bulk operations
EnablePreparedStmts bool // Default: true - Enable prepared statements for better performance
// Monitoring and observability
EnableMetrics bool // Default: false - Enable metrics collection
}
Config holds configuration options for the PostgresEventStore.
Production-ready defaults are applied by DefaultConfig() including:
- Connection pool with 25 max open, 10 max idle connections
- Connection lifetimes of 1 hour max, 15 minutes max idle
- LISTEN/NOTIFY timeout of 30 seconds
- Reconnection with exponential backoff
func DefaultConfig ¶
DefaultConfig returns a Config with production-ready defaults for PostgreSQL.
Default settings include:
- Connection pool: 25 max open, 10 max idle connections
- Connection lifetime: 1 hour max, 15 minutes max idle
- LISTEN/NOTIFY timeout: 30 seconds
- Table name: "events"
- Logging disabled (to io.Discard)
- Prepared statements enabled
type EventHandler ¶
type EventHandler func(payload NotificationPayload) error
EventHandler is a function type for handling incoming event notifications
type NotificationListener ¶
type NotificationListener struct {
// contains filtered or unexported fields
}
NotificationListener manages PostgreSQL LISTEN/NOTIFY connections for real-time event streaming
func NewNotificationListener ¶
func NewNotificationListener(connectionString string, logger *log.Logger) (*NotificationListener, error)
NewNotificationListener creates a new PostgreSQL notification listener
func (*NotificationListener) Close ¶
func (nl *NotificationListener) Close() error
Close shuts down the notification listener
func (*NotificationListener) GetActiveChannels ¶
func (nl *NotificationListener) GetActiveChannels() []string
GetActiveChannels returns a list of currently subscribed channels
func (*NotificationListener) IsConnected ¶
func (nl *NotificationListener) IsConnected() bool
IsConnected returns true if the listener is connected to PostgreSQL
func (*NotificationListener) Start ¶
func (nl *NotificationListener) Start(ctx context.Context) error
Start begins listening for notifications
func (*NotificationListener) SubscribeToAll ¶
func (nl *NotificationListener) SubscribeToAll(handler EventHandler) error
SubscribeToAll subscribes to all events via the global channel
func (*NotificationListener) SubscribeToEventType ¶
func (nl *NotificationListener) SubscribeToEventType(eventType string, handler EventHandler) error
SubscribeToEventType subscribes to events of a specific type across all streams This is implemented by subscribing to the global channel and filtering by event type
func (*NotificationListener) SubscribeToStream ¶
func (nl *NotificationListener) SubscribeToStream(aggregateID string, handler EventHandler) error
SubscribeToStream subscribes to events for a specific aggregate stream
func (*NotificationListener) UnsubscribeFromAll ¶
func (nl *NotificationListener) UnsubscribeFromAll() error
UnsubscribeFromAll unsubscribes from the global events channel
func (*NotificationListener) UnsubscribeFromStream ¶
func (nl *NotificationListener) UnsubscribeFromStream(aggregateID string) error
UnsubscribeFromStream unsubscribes from a specific aggregate stream
func (*NotificationListener) WithNotificationTimeout ¶
func (nl *NotificationListener) WithNotificationTimeout(timeout time.Duration) *NotificationListener
WithNotificationTimeout allows customizing the notification timeout
func (*NotificationListener) WithReconnectSettings ¶
func (nl *NotificationListener) WithReconnectSettings(interval time.Duration, maxAttempts int) *NotificationListener
WithReconnectSettings allows customizing reconnection behavior
type NotificationPayload ¶
type NotificationPayload struct {
Version int64 `json:"version"`
ID string `json:"id"`
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
StreamName string `json:"stream_name,omitempty"` // Only present in global notifications
CreatedAt time.Time `json:"created_at"`
}
NotificationPayload represents the structure of notification payloads
type PostgresEventStore ¶
type PostgresEventStore struct {
// contains filtered or unexported fields
}
PostgresEventStore implements the synckit.EventStore interface for PostgreSQL with additional LISTEN/NOTIFY capabilities for real-time event streaming.
func New ¶
func New(config *Config) (*PostgresEventStore, error)
New creates a new PostgresEventStore from a Config. If config is nil, returns an error.
func NewWithConnectionString ¶
func NewWithConnectionString(connectionString string) (*PostgresEventStore, error)
NewWithConnectionString is a convenience constructor
func (*PostgresEventStore) Close ¶
func (s *PostgresEventStore) Close() error
Close closes the database connection and stops the notification listener.
func (*PostgresEventStore) LatestVersion ¶
LatestVersion returns the highest version number in the store.
func (*PostgresEventStore) Load ¶
func (s *PostgresEventStore) Load(ctx context.Context, since synckit.Version) ([]synckit.EventWithVersion, error)
Load retrieves all events since a given version.
func (*PostgresEventStore) LoadByAggregate ¶
func (s *PostgresEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version) ([]synckit.EventWithVersion, error)
LoadByAggregate retrieves events for a specific aggregate since a given version.
func (*PostgresEventStore) ParseVersion ¶
func (s *PostgresEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)
ParseVersion converts a string representation into a cursor.IntegerCursor. This allows external integrations to handle PostgreSQL's integer versioning gracefully.
func (*PostgresEventStore) Stats ¶
func (s *PostgresEventStore) Stats() sql.DBStats
Stats returns database statistics for monitoring
func (*PostgresEventStore) Store ¶
func (s *PostgresEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error
Store saves an event to the PostgreSQL database. The version parameter is ignored as PostgreSQL uses BIGSERIAL for auto-incrementing versions. Upon successful insert, this triggers PostgreSQL LISTEN/NOTIFY for real-time subscribers.
func (*PostgresEventStore) StoreBatch ¶
func (s *PostgresEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error
StoreBatch stores multiple events in a single transaction for better performance.
type RealtimeEventStore ¶
type RealtimeEventStore struct {
*PostgresEventStore
}
RealtimeEventStore extends the PostgresEventStore with real-time subscription capabilities
func NewRealtimeEventStore ¶
func NewRealtimeEventStore(config *Config) (*RealtimeEventStore, error)
NewRealtimeEventStore creates a new PostgresEventStore with real-time capabilities
func (*RealtimeEventStore) GetActiveSubscriptions ¶
func (rs *RealtimeEventStore) GetActiveSubscriptions() []string
GetActiveSubscriptions returns information about active subscriptions
func (*RealtimeEventStore) IsListenerConnected ¶
func (rs *RealtimeEventStore) IsListenerConnected() bool
IsListenerConnected returns true if the notification listener is connected
func (*RealtimeEventStore) SubscribeToAll ¶
func (rs *RealtimeEventStore) SubscribeToAll(ctx context.Context, handler func(NotificationPayload) error) error
SubscribeToAll subscribes to all real-time events
func (*RealtimeEventStore) SubscribeToEventType ¶
func (rs *RealtimeEventStore) SubscribeToEventType(ctx context.Context, eventType string, handler func(NotificationPayload) error) error
SubscribeToEventType subscribes to events of a specific type
func (*RealtimeEventStore) SubscribeToStream ¶
func (rs *RealtimeEventStore) SubscribeToStream(ctx context.Context, aggregateID string, handler func(NotificationPayload) error) error
SubscribeToStream subscribes to real-time events for a specific aggregate
type StoredEvent ¶
type StoredEvent struct {
// contains filtered or unexported fields
}
StoredEvent is a concrete implementation of synckit.Event used for retrieving events from the database. It holds data and metadata as JSONB.
func (*StoredEvent) AggregateID ¶
func (e *StoredEvent) AggregateID() string
func (*StoredEvent) Data ¶
func (e *StoredEvent) Data() interface{}
func (*StoredEvent) ID ¶
func (e *StoredEvent) ID() string
func (*StoredEvent) Metadata ¶
func (e *StoredEvent) Metadata() map[string]interface{}
func (*StoredEvent) Type ¶
func (e *StoredEvent) Type() string
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager manages subscriptions to PostgreSQL LISTEN/NOTIFY channels
func NewSubscriptionManager ¶
func NewSubscriptionManager() *SubscriptionManager
NewSubscriptionManager creates a new subscription manager
func (*SubscriptionManager) GetChannels ¶
func (sm *SubscriptionManager) GetChannels() []string
GetChannels returns all subscribed channels
func (*SubscriptionManager) HandleNotification ¶
func (sm *SubscriptionManager) HandleNotification(channel string, payload string) error
HandleNotification processes an incoming notification
func (*SubscriptionManager) Subscribe ¶
func (sm *SubscriptionManager) Subscribe(channel string, handler EventHandler)
Subscribe adds a handler for a specific channel
func (*SubscriptionManager) Unsubscribe ¶
func (sm *SubscriptionManager) Unsubscribe(channel string)
Unsubscribe removes handlers for a specific channel