Documentation ¶
Overview ¶
Package postgres provides PostgreSQL implementations of the event store adapter and the read model repository, along with PostgreSQL-backed idempotency, outbox, and saga stores.
Index ¶
- Constants
- Variables
- type ColumnType
- type ConcurrencyError
- type IdempotencyStore
- func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *IdempotencyStore) Clear(ctx context.Context) error
- func (s *IdempotencyStore) Count(ctx context.Context) (int64, error)
- func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
- func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)
- func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
- func (s *IdempotencyStore) Initialize(ctx context.Context) error
- func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
- type IdempotencyStoreOption
- type IndexDef
- type Option
- type OutboxStore
- func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *OutboxStore) Close() error
- func (s *OutboxStore) DB() *sql.DB
- func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) Initialize(ctx context.Context) error
- func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
- func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error
- func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
- func (s *OutboxStore) ScheduleInTx(ctx context.Context, tx interface{}, messages []*adapters.OutboxMessage) error
- type OutboxStoreOption
- type PostgresAdapter
- func (a *PostgresAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
- func (a *PostgresAdapter) Close() error
- func (a *PostgresAdapter) DB() *sql.DB
- func (a *PostgresAdapter) DeleteCheckpoint(ctx context.Context, projectionName string) error
- func (a *PostgresAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
- func (a *PostgresAdapter) ExecuteSQL(ctx context.Context, sql string) error
- func (a *PostgresAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
- func (a *PostgresAdapter) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
- func (a *PostgresAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
- func (a *PostgresAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
- func (a *PostgresAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
- func (a *PostgresAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
- func (a *PostgresAdapter) GetLastPosition(ctx context.Context) (uint64, error)
- func (a *PostgresAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
- func (a *PostgresAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
- func (a *PostgresAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
- func (a *PostgresAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
- func (a *PostgresAdapter) Initialize(ctx context.Context) error
- func (a *PostgresAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
- func (a *PostgresAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
- func (a *PostgresAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
- func (a *PostgresAdapter) Migrate(ctx context.Context) error
- func (a *PostgresAdapter) MigrationVersion(ctx context.Context) (int, error)
- func (a *PostgresAdapter) Ping(ctx context.Context) error
- func (a *PostgresAdapter) RecordMigration(ctx context.Context, name string) error
- func (a *PostgresAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
- func (a *PostgresAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
- func (a *PostgresAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
- func (a *PostgresAdapter) Schema() string
- func (a *PostgresAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
- func (a *PostgresAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
- func (a *PostgresAdapter) Stats() sql.DBStats
- func (a *PostgresAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
- func (a *PostgresAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, ...) (<-chan adapters.StoredEvent, error)
- func (a *PostgresAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, ...) (<-chan adapters.StoredEvent, error)
- type PostgresRepository
- func (r *PostgresRepository[T]) Clear(ctx context.Context) error
- func (r *PostgresRepository[T]) Count(ctx context.Context, query mink.Query) (int64, error)
- func (r *PostgresRepository[T]) Delete(ctx context.Context, id string) error
- func (r *PostgresRepository[T]) DeleteMany(ctx context.Context, query mink.Query) (int64, error)
- func (r *PostgresRepository[T]) DropTable(ctx context.Context) error
- func (r *PostgresRepository[T]) Exists(ctx context.Context, id string) (bool, error)
- func (r *PostgresRepository[T]) Find(ctx context.Context, query mink.Query) ([]*T, error)
- func (r *PostgresRepository[T]) FindOne(ctx context.Context, query mink.Query) (*T, error)
- func (r *PostgresRepository[T]) Get(ctx context.Context, id string) (*T, error)
- func (r *PostgresRepository[T]) GetAll(ctx context.Context) ([]*T, error)
- func (r *PostgresRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (r *PostgresRepository[T]) Insert(ctx context.Context, model *T) error
- func (r *PostgresRepository[T]) Migrate(ctx context.Context) error
- func (r *PostgresRepository[T]) Schema() string
- func (r *PostgresRepository[T]) TableName() string
- func (r *PostgresRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
- func (r *PostgresRepository[T]) Upsert(ctx context.Context, model *T) error
- func (r *PostgresRepository[T]) WithTx(tx *sql.Tx) *TxRepository[T]
- type ReadModelOption
- type SagaStore
- func (s *SagaStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *SagaStore) Close() error
- func (s *SagaStore) CountByStatus(ctx context.Context) (map[mink.SagaStatus]int64, error)
- func (s *SagaStore) Delete(ctx context.Context, sagaID string) error
- func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*mink.SagaState, error)
- func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...mink.SagaStatus) ([]*mink.SagaState, error)
- func (s *SagaStore) Initialize(ctx context.Context) error
- func (s *SagaStore) Load(ctx context.Context, sagaID string) (*mink.SagaState, error)
- func (s *SagaStore) Save(ctx context.Context, state *mink.SagaState) error
- type SagaStoreOption
- type StreamNotFoundError
- type TableSchema
- type TxRepository
- func (tr *TxRepository[T]) Clear(ctx context.Context) error
- func (tr *TxRepository[T]) Count(ctx context.Context, query mink.Query) (int64, error)
- func (tr *TxRepository[T]) Delete(ctx context.Context, id string) error
- func (tr *TxRepository[T]) DeleteMany(ctx context.Context, query mink.Query) (int64, error)
- func (tr *TxRepository[T]) Exists(ctx context.Context, id string) (bool, error)
- func (tr *TxRepository[T]) Find(ctx context.Context, query mink.Query) ([]*T, error)
- func (tr *TxRepository[T]) FindOne(ctx context.Context, query mink.Query) (*T, error)
- func (tr *TxRepository[T]) Get(ctx context.Context, id string) (*T, error)
- func (tr *TxRepository[T]) GetAll(ctx context.Context) ([]*T, error)
- func (tr *TxRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (tr *TxRepository[T]) Insert(ctx context.Context, model *T) error
- func (tr *TxRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
- func (tr *TxRepository[T]) Upsert(ctx context.Context, model *T) error
Constants ¶
const (
	AnyVersion   = adapters.AnyVersion
	NoStream     = adapters.NoStream
	StreamExists = adapters.StreamExists
)
Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.
Variables ¶
var (
	ErrAdapterClosed       = adapters.ErrAdapterClosed
	ErrEmptyStreamID       = adapters.ErrEmptyStreamID
	ErrNoEvents            = adapters.ErrNoEvents
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict
	ErrStreamNotFound      = adapters.ErrStreamNotFound
	ErrInvalidVersion      = adapters.ErrInvalidVersion
	ErrInvalidSchemaName   = fmt.Errorf("mink/postgres: invalid schema name")
)
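Sentinel errors for the postgres adapter. With the exception of ErrInvalidSchemaName, which is defined in this package, these are aliases of the adapters package errors, so errors.Is() matches them against errors returned by either package.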
var NewConcurrencyError = adapters.NewConcurrencyError
NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.
var NewStreamNotFoundError = adapters.NewStreamNotFoundError
NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.
Functions ¶
This section is empty.
Types ¶
type ColumnType ¶
type ColumnType struct {
Name string
SQLType string
Nullable bool
PrimaryKey bool
Index bool
Unique bool
Default string
}
ColumnType represents SQL column type information.
type ConcurrencyError ¶
type ConcurrencyError = adapters.ConcurrencyError
ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.
type IdempotencyStore ¶
type IdempotencyStore struct {
// contains filtered or unexported fields
}
IdempotencyStore provides a PostgreSQL implementation of mink.IdempotencyStore.
func NewIdempotencyStore ¶
func NewIdempotencyStore(db *sql.DB, opts ...IdempotencyStoreOption) *IdempotencyStore
NewIdempotencyStore creates a new PostgreSQL IdempotencyStore.
func NewIdempotencyStoreFromAdapter ¶
func NewIdempotencyStoreFromAdapter(adapter *PostgresAdapter, opts ...IdempotencyStoreOption) *IdempotencyStore
NewIdempotencyStoreFromAdapter creates a new IdempotencyStore using an existing PostgresAdapter's connection.
func (*IdempotencyStore) Cleanup ¶
Cleanup removes records older than the specified duration. Returns the number of records deleted.
func (*IdempotencyStore) Clear ¶
func (s *IdempotencyStore) Clear(ctx context.Context) error
Clear removes all records from the store. Useful for testing.
func (*IdempotencyStore) Count ¶
func (s *IdempotencyStore) Count(ctx context.Context) (int64, error)
Count returns the total number of records in the store. Useful for testing and monitoring.
func (*IdempotencyStore) Delete ¶
func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
Delete removes an idempotency record by key.
func (*IdempotencyStore) Exists ¶
Exists checks if a record with the given key exists and is not expired.
func (*IdempotencyStore) Get ¶
func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
Get retrieves an idempotency record by key. Returns nil, nil if the record doesn't exist or is expired.
func (*IdempotencyStore) Initialize ¶
func (s *IdempotencyStore) Initialize(ctx context.Context) error
Initialize creates the idempotency table if it doesn't exist.
func (*IdempotencyStore) Store ¶
func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
Store saves a new idempotency record, using upsert to handle conflicts.
type IdempotencyStoreOption ¶
type IdempotencyStoreOption func(*IdempotencyStore)
IdempotencyStoreOption configures an IdempotencyStore.
func WithIdempotencySchema ¶
func WithIdempotencySchema(schema string) IdempotencyStoreOption
WithIdempotencySchema sets the PostgreSQL schema for the idempotency table.
func WithIdempotencyTable ¶
func WithIdempotencyTable(table string) IdempotencyStoreOption
WithIdempotencyTable sets the table name for idempotency records.
type Option ¶
type Option func(*PostgresAdapter)
Option configures a PostgresAdapter.
func WithConnectionMaxIdleTime ¶
WithConnectionMaxIdleTime sets the maximum idle time for connections.
func WithConnectionMaxLifetime ¶
WithConnectionMaxLifetime sets the maximum connection lifetime.
func WithHealthCheck ¶
WithHealthCheck enables periodic connection pool health checking. The health check runs at the specified interval and validates connections.
func WithMaxConnections ¶
WithMaxConnections sets the maximum number of open connections.
func WithMaxIdleConnections ¶
WithMaxIdleConnections sets the maximum number of idle connections.
type OutboxStore ¶
type OutboxStore struct {
// contains filtered or unexported fields
}
OutboxStore provides a PostgreSQL implementation of adapters.OutboxStore.
func NewOutboxStore ¶
func NewOutboxStore(db *sql.DB, opts ...OutboxStoreOption) *OutboxStore
NewOutboxStore creates a new PostgreSQL OutboxStore.
func NewOutboxStoreFromAdapter ¶
func NewOutboxStoreFromAdapter(adapter *PostgresAdapter, opts ...OutboxStoreOption) *OutboxStore
NewOutboxStoreFromAdapter creates a new OutboxStore using an existing PostgresAdapter's connection.
func (*OutboxStore) Close ¶
func (s *OutboxStore) Close() error
Close releases any resources (no-op as db is shared).
func (*OutboxStore) DB ¶
func (s *OutboxStore) DB() *sql.DB
DB returns the underlying database connection (for testing).
func (*OutboxStore) FetchPending ¶
func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
FetchPending atomically claims up to limit pending messages for processing. Uses SELECT ... FOR UPDATE SKIP LOCKED to prevent double-claiming.
func (*OutboxStore) GetDeadLetterMessages ¶
func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
GetDeadLetterMessages retrieves dead-lettered messages.
func (*OutboxStore) Initialize ¶
func (s *OutboxStore) Initialize(ctx context.Context) error
Initialize creates the outbox table if it doesn't exist.
func (*OutboxStore) MarkCompleted ¶
func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
MarkCompleted marks messages as successfully delivered.
func (*OutboxStore) MarkFailed ¶
MarkFailed marks a message as failed with an error description.
func (*OutboxStore) MoveToDeadLetter ¶
MoveToDeadLetter transitions to dead letter any message that has exceeded its per-message max_attempts or the global maxAttempts.
func (*OutboxStore) RetryFailed ¶
RetryFailed resets eligible failed messages (below per-message max_attempts or global maxAttempts) to pending.
func (*OutboxStore) Schedule ¶
func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
Schedule stores outbox messages for later processing.
func (*OutboxStore) ScheduleInTx ¶
func (s *OutboxStore) ScheduleInTx(ctx context.Context, tx interface{}, messages []*adapters.OutboxMessage) error
ScheduleInTx stores outbox messages within an existing database transaction.
type OutboxStoreOption ¶
type OutboxStoreOption func(*OutboxStore)
OutboxStoreOption configures an OutboxStore.
func WithOutboxSchema ¶
func WithOutboxSchema(schema string) OutboxStoreOption
WithOutboxSchema sets the PostgreSQL schema for the outbox table.
func WithOutboxTableName ¶
func WithOutboxTableName(table string) OutboxStoreOption
WithOutboxTableName sets the table name for outbox records.
type PostgresAdapter ¶
type PostgresAdapter struct {
// contains filtered or unexported fields
}
PostgresAdapter is a PostgreSQL implementation of EventStoreAdapter.
func NewAdapter ¶
func NewAdapter(connStr string, opts ...Option) (*PostgresAdapter, error)
NewAdapter creates a new PostgreSQL event store adapter.
func NewAdapterWithDB ¶
func NewAdapterWithDB(db *sql.DB, opts ...Option) (*PostgresAdapter, error)
NewAdapterWithDB creates a new adapter with an existing database connection. Returns an error if the schema name is invalid.
func (*PostgresAdapter) Append ¶
func (a *PostgresAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)
Append stores events to the specified stream with optimistic concurrency control.
func (*PostgresAdapter) AppendWithOutbox ¶
func (a *PostgresAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64, outboxMessages []*adapters.OutboxMessage) ([]adapters.StoredEvent, error)
AppendWithOutbox atomically appends events and schedules outbox messages in a single transaction.
func (*PostgresAdapter) CheckSchema ¶
func (a *PostgresAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
CheckSchema verifies the event store schema exists.
func (*PostgresAdapter) Close ¶
func (a *PostgresAdapter) Close() error
Close releases the database connection and stops health checking.
func (*PostgresAdapter) DB ¶
func (a *PostgresAdapter) DB() *sql.DB
DB returns the underlying database connection.
func (*PostgresAdapter) DeleteCheckpoint ¶
func (a *PostgresAdapter) DeleteCheckpoint(ctx context.Context, projectionName string) error
DeleteCheckpoint removes the checkpoint for a projection. This implements mink.CheckpointStore interface.
func (*PostgresAdapter) DeleteSnapshot ¶
func (a *PostgresAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
DeleteSnapshot removes the snapshot for the given stream.
func (*PostgresAdapter) ExecuteSQL ¶
func (a *PostgresAdapter) ExecuteSQL(ctx context.Context, sql string) error
ExecuteSQL runs arbitrary SQL (for applying migrations).
func (*PostgresAdapter) GenerateSchema ¶
func (a *PostgresAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
GenerateSchema returns the DDL for the PostgreSQL event store schema.
func (*PostgresAdapter) GetAllCheckpoints ¶
GetAllCheckpoints returns checkpoints for all projections. This implements mink.CheckpointStore interface.
func (*PostgresAdapter) GetAppliedMigrations ¶
func (a *PostgresAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
GetAppliedMigrations returns the list of applied migration names.
func (*PostgresAdapter) GetCheckpoint ¶
GetCheckpoint returns the last processed position for a projection.
func (*PostgresAdapter) GetDiagnosticInfo ¶
func (a *PostgresAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
GetDiagnosticInfo returns database version and connection status.
func (*PostgresAdapter) GetEventStoreStats ¶
func (a *PostgresAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
GetEventStoreStats returns aggregate statistics about the event store.
func (*PostgresAdapter) GetLastPosition ¶
func (a *PostgresAdapter) GetLastPosition(ctx context.Context) (uint64, error)
GetLastPosition returns the global position of the last stored event.
func (*PostgresAdapter) GetProjection ¶
func (a *PostgresAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
GetProjection returns information about a specific projection.
func (*PostgresAdapter) GetProjectionHealth ¶
func (a *PostgresAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
GetProjectionHealth returns projection health status.
func (*PostgresAdapter) GetStreamEvents ¶
func (a *PostgresAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
GetStreamEvents returns events from a stream with pagination for CLI display.
func (*PostgresAdapter) GetStreamInfo ¶
func (a *PostgresAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
GetStreamInfo returns metadata about a stream.
func (*PostgresAdapter) GetTotalEventCount ¶
func (a *PostgresAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
GetTotalEventCount returns the highest global position.
func (*PostgresAdapter) Initialize ¶
func (a *PostgresAdapter) Initialize(ctx context.Context) error
Initialize creates the required database schema and tables.
func (*PostgresAdapter) ListProjections ¶
func (a *PostgresAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
ListProjections returns all registered projections.
func (*PostgresAdapter) ListStreams ¶
func (a *PostgresAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
ListStreams returns a list of stream summaries for CLI display.
func (*PostgresAdapter) Load ¶
func (a *PostgresAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
Load retrieves all events from a stream starting from the specified version.
func (*PostgresAdapter) LoadFromPosition ¶
func (a *PostgresAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.
func (*PostgresAdapter) LoadSnapshot ¶
func (a *PostgresAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
LoadSnapshot retrieves the latest snapshot for the given stream (SnapshotAdapter implementation).
func (*PostgresAdapter) Migrate ¶
func (a *PostgresAdapter) Migrate(ctx context.Context) error
Migrate runs database migrations.
func (*PostgresAdapter) MigrationVersion ¶
func (a *PostgresAdapter) MigrationVersion(ctx context.Context) (int, error)
MigrationVersion returns the current migration version.
func (*PostgresAdapter) Ping ¶
func (a *PostgresAdapter) Ping(ctx context.Context) error
Ping checks database connectivity.
func (*PostgresAdapter) RecordMigration ¶
func (a *PostgresAdapter) RecordMigration(ctx context.Context, name string) error
RecordMigration marks a migration as applied.
func (*PostgresAdapter) RemoveMigrationRecord ¶
func (a *PostgresAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
RemoveMigrationRecord removes a migration record (for rollback).
func (*PostgresAdapter) ResetProjectionCheckpoint ¶
func (a *PostgresAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.
func (*PostgresAdapter) SaveSnapshot ¶
func (a *PostgresAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
SaveSnapshot stores a snapshot for the given stream.
func (*PostgresAdapter) Schema ¶
func (a *PostgresAdapter) Schema() string
Schema returns the schema name.
func (*PostgresAdapter) SetCheckpoint ¶
func (a *PostgresAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
SetCheckpoint stores the last processed position for a projection.
func (*PostgresAdapter) SetProjectionStatus ¶
func (a *PostgresAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
SetProjectionStatus updates a projection's status.
func (*PostgresAdapter) Stats ¶
func (a *PostgresAdapter) Stats() sql.DBStats
Stats returns connection pool statistics.
func (*PostgresAdapter) SubscribeAll ¶
func (a *PostgresAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeAll subscribes to all events across all streams. Events are delivered starting from the specified global position. This uses polling-based subscription with continuous updates. Optional SubscriptionOptions can be provided to configure behavior.
func (*PostgresAdapter) SubscribeCategory ¶
func (a *PostgresAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeCategory subscribes to all events from streams in a category. Events are delivered starting from the specified global position with continuous polling. Optional SubscriptionOptions can be provided to configure behavior.
func (*PostgresAdapter) SubscribeStream ¶
func (a *PostgresAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeStream subscribes to events from a specific stream. Events are delivered starting from the specified version with continuous polling. Optional SubscriptionOptions can be provided to configure behavior.
type PostgresRepository ¶
type PostgresRepository[T any] struct {
	// contains filtered or unexported fields
}
PostgresRepository provides a PostgreSQL implementation of ReadModelRepository. It supports automatic schema migration based on struct tags.
func NewPostgresRepository ¶
func NewPostgresRepository[T any](db *sql.DB, opts ...ReadModelOption) (*PostgresRepository[T], error)
NewPostgresRepository creates a new PostgreSQL-backed repository for read models. The type T should be a struct with mink tags for column mapping.
Supported struct tags:
- `mink:"column_name"` - Column name (default: snake_case of field name)
- `mink:"-"` - Skip this field
- `mink:"column_name,pk"` - Primary key
- `mink:"column_name,index"` - Create index
- `mink:"column_name,unique"` - Unique constraint
- `mink:"column_name,nullable"` - Allow NULL values
- `mink:"column_name,default=value"` - Default value
Example:
    type OrderSummary struct {
        OrderID    string    `mink:"order_id,pk"`
        CustomerID string    `mink:"customer_id,index"`
        Status     string    `mink:"status"`
        Total      float64   `mink:"total_amount"`
        CreatedAt  time.Time `mink:"created_at"`
    }
    repo, err := postgres.NewPostgresRepository[OrderSummary](db,
        postgres.WithReadModelSchema("projections"),
        postgres.WithTableName("order_summaries"),
    )
func (*PostgresRepository[T]) Clear ¶
func (r *PostgresRepository[T]) Clear(ctx context.Context) error
Clear removes all read models.
func (*PostgresRepository[T]) Delete ¶
func (r *PostgresRepository[T]) Delete(ctx context.Context, id string) error
Delete removes a read model by ID.
func (*PostgresRepository[T]) DeleteMany ¶
DeleteMany removes all read models matching the query.
func (*PostgresRepository[T]) DropTable ¶
func (r *PostgresRepository[T]) DropTable(ctx context.Context) error
DropTable removes the read model table (use with caution!).
func (*PostgresRepository[T]) Get ¶
func (r *PostgresRepository[T]) Get(ctx context.Context, id string) (*T, error)
Get retrieves a read model by ID.
func (*PostgresRepository[T]) GetAll ¶
func (r *PostgresRepository[T]) GetAll(ctx context.Context) ([]*T, error)
GetAll returns all read models in the repository.
func (*PostgresRepository[T]) GetMany ¶
func (r *PostgresRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
GetMany retrieves multiple read models by their IDs.
func (*PostgresRepository[T]) Insert ¶
func (r *PostgresRepository[T]) Insert(ctx context.Context, model *T) error
Insert creates a new read model.
func (*PostgresRepository[T]) Migrate ¶
func (r *PostgresRepository[T]) Migrate(ctx context.Context) error
Migrate creates or updates the table schema.
func (*PostgresRepository[T]) Schema ¶
func (r *PostgresRepository[T]) Schema() string
Schema returns the PostgreSQL schema name.
func (*PostgresRepository[T]) TableName ¶
func (r *PostgresRepository[T]) TableName() string
TableName returns the fully qualified table name.
func (*PostgresRepository[T]) Update ¶
func (r *PostgresRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
Update modifies an existing read model.
func (*PostgresRepository[T]) Upsert ¶
func (r *PostgresRepository[T]) Upsert(ctx context.Context, model *T) error
Upsert creates or updates a read model.
func (*PostgresRepository[T]) WithTx ¶
func (r *PostgresRepository[T]) WithTx(tx *sql.Tx) *TxRepository[T]
WithTx creates a new repository instance that uses the provided transaction.
type ReadModelOption ¶
type ReadModelOption func(*readModelConfig)
ReadModelOption configures the PostgresRepository.
func WithAutoMigrate ¶
func WithAutoMigrate(enabled bool) ReadModelOption
WithAutoMigrate enables automatic table creation and migration. Default is true.
func WithIDField ¶
func WithIDField(field string) ReadModelOption
WithIDField sets the field name used as the primary key. Default is "ID".
func WithReadModelSchema ¶
func WithReadModelSchema(schema string) ReadModelOption
WithReadModelSchema sets the PostgreSQL schema for the read model table.
func WithTableName ¶
func WithTableName(name string) ReadModelOption
WithTableName sets the table name for the read model.
type SagaStore ¶
type SagaStore struct {
// contains filtered or unexported fields
}
SagaStore provides a PostgreSQL implementation of mink.SagaStore.
func NewSagaStore ¶
func NewSagaStore(db *sql.DB, opts ...SagaStoreOption) *SagaStore
NewSagaStore creates a new PostgreSQL SagaStore.
func NewSagaStoreFromAdapter ¶
func NewSagaStoreFromAdapter(adapter *PostgresAdapter, opts ...SagaStoreOption) *SagaStore
NewSagaStoreFromAdapter creates a new SagaStore using an existing PostgresAdapter's connection.
func (*SagaStore) Close ¶
Close releases any resources (no-op for this implementation as db is shared).
func (*SagaStore) CountByStatus ¶
CountByStatus returns the count of sagas by status.
func (*SagaStore) FindByCorrelationID ¶
func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*mink.SagaState, error)
FindByCorrelationID finds a saga by its correlation ID.
func (*SagaStore) FindByType ¶
func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...mink.SagaStatus) ([]*mink.SagaState, error)
FindByType finds all sagas of a given type with the specified statuses.
func (*SagaStore) Initialize ¶
Initialize creates the saga table if it doesn't exist.
func (*SagaStore) Save ¶
Save persists a saga state with optimistic concurrency control.
Version semantics:
- Version 0: Creates a new saga. Uses INSERT with version=1.
- Version > 0: Updates an existing saga. Uses UPDATE with version check. If the version doesn't match, returns ErrConcurrencyConflict.
The version is incremented atomically by the database (version = version + 1) and returned via RETURNING clause. After a successful save, state.Version is updated with the new version from the database.
type SagaStoreOption ¶
type SagaStoreOption func(*SagaStore)
SagaStoreOption configures a SagaStore.
func WithSagaSchema ¶
func WithSagaSchema(schema string) SagaStoreOption
WithSagaSchema sets the PostgreSQL schema for the saga table.
func WithSagaTable ¶
func WithSagaTable(table string) SagaStoreOption
WithSagaTable sets the table name for saga records.
type StreamNotFoundError ¶
type StreamNotFoundError = adapters.StreamNotFoundError
StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.
type TableSchema ¶
type TableSchema struct {
TableName string
Schema string
Columns []ColumnType
Indexes []IndexDef
}
TableSchema represents the schema of a read model table.
type TxRepository ¶
type TxRepository[T any] struct {
	// contains filtered or unexported fields
}
TxRepository wraps PostgresRepository for transaction support. It implements all ReadModelRepository methods within a transaction context.
func (*TxRepository[T]) Clear ¶
func (tr *TxRepository[T]) Clear(ctx context.Context) error
Clear removes all read models within a transaction.
func (*TxRepository[T]) Count ¶
Count returns the number of read models matching the query within a transaction.
func (*TxRepository[T]) Delete ¶
func (tr *TxRepository[T]) Delete(ctx context.Context, id string) error
Delete removes a read model within a transaction.
func (*TxRepository[T]) DeleteMany ¶
DeleteMany removes all read models matching the query within a transaction.
func (*TxRepository[T]) Exists ¶
Exists checks if a read model with the given ID exists within a transaction.
func (*TxRepository[T]) Find ¶
Find queries read models with the given criteria within a transaction.
func (*TxRepository[T]) FindOne ¶
FindOne returns the first read model matching the query within a transaction.
func (*TxRepository[T]) Get ¶
func (tr *TxRepository[T]) Get(ctx context.Context, id string) (*T, error)
Get retrieves a read model by ID within a transaction.
func (*TxRepository[T]) GetAll ¶
func (tr *TxRepository[T]) GetAll(ctx context.Context) ([]*T, error)
GetAll returns all read models in the repository within a transaction.
func (*TxRepository[T]) GetMany ¶
func (tr *TxRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
GetMany retrieves multiple read models by their IDs within a transaction.
func (*TxRepository[T]) Insert ¶
func (tr *TxRepository[T]) Insert(ctx context.Context, model *T) error
Insert creates a new read model within a transaction.