Documentation
¶
Overview ¶
Package postgres provides a PostgreSQL implementation of the event store adapter.
Package postgres provides a PostgreSQL implementation of the read model repository.
Index ¶
- Constants
- Variables
- func GenerateSchema(projectName, schemaName, tableName, snapshotTableName, outboxTableName string) string
- type AuditStore
- func (s *AuditStore) Append(ctx context.Context, entry *adapters.AuditEntry) error
- func (s *AuditStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *AuditStore) Clear(ctx context.Context) error
- func (s *AuditStore) Close() error
- func (s *AuditStore) Count(ctx context.Context, q adapters.AuditQuery) (int64, error)
- func (s *AuditStore) Find(ctx context.Context, q adapters.AuditQuery) ([]*adapters.AuditEntry, error)
- func (s *AuditStore) Initialize(ctx context.Context) error
- type AuditStoreOption
- type ColumnType
- type ConcurrencyError
- type IdempotencyStore
- func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *IdempotencyStore) Clear(ctx context.Context) error
- func (s *IdempotencyStore) Count(ctx context.Context) (int64, error)
- func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
- func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)
- func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
- func (s *IdempotencyStore) Initialize(ctx context.Context) error
- func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
- func (s *IdempotencyStore) StoreIfAbsent(ctx context.Context, record *adapters.IdempotencyRecord) (bool, error)
- type IdempotencyStoreOption
- type IndexDef
- type Option
- type OutboxStore
- func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *OutboxStore) Close() error
- func (s *OutboxStore) DB() *sql.DB
- func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) Initialize(ctx context.Context) error
- func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
- func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error
- func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) ReclaimStale(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
- func (s *OutboxStore) ScheduleInTx(ctx context.Context, tx interface{}, messages []*adapters.OutboxMessage) error
- type OutboxStoreOption
- type PostgresAdapter
- func (a *PostgresAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
- func (a *PostgresAdapter) Close() error
- func (a *PostgresAdapter) DB() *sql.DB
- func (a *PostgresAdapter) DeleteCheckpoint(ctx context.Context, projectionName string) error
- func (a *PostgresAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
- func (a *PostgresAdapter) ExecuteSQL(ctx context.Context, sql string) error
- func (a *PostgresAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
- func (a *PostgresAdapter) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
- func (a *PostgresAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
- func (a *PostgresAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
- func (a *PostgresAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
- func (a *PostgresAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
- func (a *PostgresAdapter) GetLastPosition(ctx context.Context) (uint64, error)
- func (a *PostgresAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
- func (a *PostgresAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
- func (a *PostgresAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
- func (a *PostgresAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
- func (a *PostgresAdapter) Initialize(ctx context.Context) error
- func (a *PostgresAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
- func (a *PostgresAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
- func (a *PostgresAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
- func (a *PostgresAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
- func (a *PostgresAdapter) Migrate(ctx context.Context) error
- func (a *PostgresAdapter) MigrationVersion(ctx context.Context) (int, error)
- func (a *PostgresAdapter) Ping(ctx context.Context) error
- func (a *PostgresAdapter) RecordMigration(ctx context.Context, name string) error
- func (a *PostgresAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
- func (a *PostgresAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
- func (a *PostgresAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
- func (a *PostgresAdapter) Schema() string
- func (a *PostgresAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
- func (a *PostgresAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
- func (a *PostgresAdapter) Stats() sql.DBStats
- func (a *PostgresAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
- func (a *PostgresAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, ...) (<-chan adapters.StoredEvent, error)
- func (a *PostgresAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, ...) (<-chan adapters.StoredEvent, error)
- type PostgresRepository
- func (r *PostgresRepository[T]) Clear(ctx context.Context) error
- func (r *PostgresRepository[T]) Count(ctx context.Context, query mink.Query) (int64, error)
- func (r *PostgresRepository[T]) Delete(ctx context.Context, id string) error
- func (r *PostgresRepository[T]) DeleteMany(ctx context.Context, query mink.Query) (int64, error)
- func (r *PostgresRepository[T]) DropTable(ctx context.Context) error
- func (r *PostgresRepository[T]) Exists(ctx context.Context, id string) (bool, error)
- func (r *PostgresRepository[T]) Find(ctx context.Context, query mink.Query) ([]*T, error)
- func (r *PostgresRepository[T]) FindOne(ctx context.Context, query mink.Query) (*T, error)
- func (r *PostgresRepository[T]) Get(ctx context.Context, id string) (*T, error)
- func (r *PostgresRepository[T]) GetAll(ctx context.Context) ([]*T, error)
- func (r *PostgresRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (r *PostgresRepository[T]) Insert(ctx context.Context, model *T) error
- func (r *PostgresRepository[T]) Migrate(ctx context.Context) error
- func (r *PostgresRepository[T]) Schema() string
- func (r *PostgresRepository[T]) TableName() string
- func (r *PostgresRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
- func (r *PostgresRepository[T]) Upsert(ctx context.Context, model *T) error
- func (r *PostgresRepository[T]) WithTx(tx *sql.Tx) *TxRepository[T]
- type ReadModelOption
- type SagaStore
- func (s *SagaStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *SagaStore) Close() error
- func (s *SagaStore) CountByStatus(ctx context.Context) (map[mink.SagaStatus]int64, error)
- func (s *SagaStore) Delete(ctx context.Context, sagaID string) error
- func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*mink.SagaState, error)
- func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...mink.SagaStatus) ([]*mink.SagaState, error)
- func (s *SagaStore) Initialize(ctx context.Context) error
- func (s *SagaStore) Load(ctx context.Context, sagaID string) (*mink.SagaState, error)
- func (s *SagaStore) Save(ctx context.Context, state *mink.SagaState) error
- type SagaStoreOption
- type StreamNotFoundError
- type TableSchema
- type TxRepository
- func (tr *TxRepository[T]) Clear(ctx context.Context) error
- func (tr *TxRepository[T]) Count(ctx context.Context, query mink.Query) (int64, error)
- func (tr *TxRepository[T]) Delete(ctx context.Context, id string) error
- func (tr *TxRepository[T]) DeleteMany(ctx context.Context, query mink.Query) (int64, error)
- func (tr *TxRepository[T]) Exists(ctx context.Context, id string) (bool, error)
- func (tr *TxRepository[T]) Find(ctx context.Context, query mink.Query) ([]*T, error)
- func (tr *TxRepository[T]) FindOne(ctx context.Context, query mink.Query) (*T, error)
- func (tr *TxRepository[T]) Get(ctx context.Context, id string) (*T, error)
- func (tr *TxRepository[T]) GetAll(ctx context.Context) ([]*T, error)
- func (tr *TxRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (tr *TxRepository[T]) Insert(ctx context.Context, model *T) error
- func (tr *TxRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
- func (tr *TxRepository[T]) Upsert(ctx context.Context, model *T) error
Constants ¶
const ( AnyVersion = adapters.AnyVersion NoStream = adapters.NoStream StreamExists = adapters.StreamExists )
Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.
Variables ¶
var ( ErrAdapterClosed = adapters.ErrAdapterClosed ErrEmptyStreamID = adapters.ErrEmptyStreamID ErrNoEvents = adapters.ErrNoEvents ErrConcurrencyConflict = adapters.ErrConcurrencyConflict ErrStreamNotFound = adapters.ErrStreamNotFound ErrInvalidVersion = adapters.ErrInvalidVersion ErrInvalidSchemaName = fmt.Errorf("mink/postgres: invalid schema name") )
Sentinel errors for the postgres adapter. These are aliases to the adapters package errors for compatibility with errors.Is().
var NewConcurrencyError = adapters.NewConcurrencyError
NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.
var NewStreamNotFoundError = adapters.NewStreamNotFoundError
NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.
Functions ¶
func GenerateSchema ¶ added in v1.0.17
func GenerateSchema(projectName, schemaName, tableName, snapshotTableName, outboxTableName string) string
GenerateSchema returns PostgreSQL DDL aligned with the runtime adapter schema.
Types ¶
type AuditStore ¶ added in v1.1.0
type AuditStore struct {
// contains filtered or unexported fields
}
AuditStore provides a PostgreSQL implementation of mink.AuditStore.
func NewAuditStore ¶ added in v1.1.0
func NewAuditStore(db *sql.DB, opts ...AuditStoreOption) *AuditStore
NewAuditStore creates a new PostgreSQL AuditStore.
func NewAuditStoreFromAdapter ¶ added in v1.1.0
func NewAuditStoreFromAdapter(adapter *PostgresAdapter, opts ...AuditStoreOption) *AuditStore
NewAuditStoreFromAdapter creates a new AuditStore using an existing PostgresAdapter's connection.
The returned store does not share the adapter's migrations: PostgresAdapter.Migrate does NOT create the audit table. Callers must invoke Initialize(ctx) on the returned AuditStore to create its table before use.
func (*AuditStore) Append ¶ added in v1.1.0
func (s *AuditStore) Append(ctx context.Context, entry *adapters.AuditEntry) error
Append writes a single audit entry.
func (*AuditStore) Cleanup ¶ added in v1.1.0
Cleanup removes entries with a timestamp older than olderThan ago. Returns the number of entries removed.
func (*AuditStore) Clear ¶ added in v1.1.0
func (s *AuditStore) Clear(ctx context.Context) error
Clear removes all entries from the store. Useful for testing.
func (*AuditStore) Close ¶ added in v1.1.0
func (s *AuditStore) Close() error
Close is a no-op; the store does not own the *sql.DB.
func (*AuditStore) Count ¶ added in v1.1.0
func (s *AuditStore) Count(ctx context.Context, q adapters.AuditQuery) (int64, error)
Count returns the number of entries matching the query, ignoring Limit/Offset.
func (*AuditStore) Find ¶ added in v1.1.0
func (s *AuditStore) Find(ctx context.Context, q adapters.AuditQuery) ([]*adapters.AuditEntry, error)
Find returns audit entries matching the query, honoring Order/Limit/Offset.
func (*AuditStore) Initialize ¶ added in v1.1.0
func (s *AuditStore) Initialize(ctx context.Context) error
Initialize creates the audit table if it doesn't exist.
type AuditStoreOption ¶ added in v1.1.0
type AuditStoreOption func(*AuditStore)
AuditStoreOption configures an AuditStore.
func WithAuditSchema ¶ added in v1.1.0
func WithAuditSchema(schema string) AuditStoreOption
WithAuditSchema sets the PostgreSQL schema for the audit table.
func WithAuditTable ¶ added in v1.1.0
func WithAuditTable(table string) AuditStoreOption
WithAuditTable sets the table name for audit entries.
type ColumnType ¶
type ColumnType struct {
Name string
SQLType string
Nullable bool
PrimaryKey bool
Index bool
Unique bool
Default string
}
ColumnType represents SQL column type information.
type ConcurrencyError ¶
type ConcurrencyError = adapters.ConcurrencyError
ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.
type IdempotencyStore ¶
type IdempotencyStore struct {
// contains filtered or unexported fields
}
IdempotencyStore provides a PostgreSQL implementation of mink.IdempotencyStore.
func NewIdempotencyStore ¶
func NewIdempotencyStore(db *sql.DB, opts ...IdempotencyStoreOption) *IdempotencyStore
NewIdempotencyStore creates a new PostgreSQL IdempotencyStore.
func NewIdempotencyStoreFromAdapter ¶
func NewIdempotencyStoreFromAdapter(adapter *PostgresAdapter, opts ...IdempotencyStoreOption) *IdempotencyStore
NewIdempotencyStoreFromAdapter creates a new IdempotencyStore using an existing PostgresAdapter's connection.
The returned store does not share the adapter's migrations: PostgresAdapter.Migrate does NOT create the idempotency table. Callers must invoke Initialize(ctx) on the returned IdempotencyStore to create its table before use.
func (*IdempotencyStore) Cleanup ¶
Cleanup removes records older than the specified duration. Returns the number of records deleted.
func (*IdempotencyStore) Clear ¶
func (s *IdempotencyStore) Clear(ctx context.Context) error
Clear removes all records from the store. Useful for testing.
func (*IdempotencyStore) Count ¶
func (s *IdempotencyStore) Count(ctx context.Context) (int64, error)
Count returns the total number of records in the store. Useful for testing and monitoring.
func (*IdempotencyStore) Delete ¶
func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
Delete removes an idempotency record by key.
func (*IdempotencyStore) Exists ¶
Exists checks if a record with the given key exists and is not expired.
func (*IdempotencyStore) Get ¶
func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
Get retrieves an idempotency record by key. Returns nil, nil if the record doesn't exist or is expired.
func (*IdempotencyStore) Initialize ¶
func (s *IdempotencyStore) Initialize(ctx context.Context) error
Initialize creates the idempotency table if it doesn't exist.
func (*IdempotencyStore) Store ¶
func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
Store saves a new idempotency record, using upsert to handle conflicts.
func (*IdempotencyStore) StoreIfAbsent ¶ added in v1.0.25
func (s *IdempotencyStore) StoreIfAbsent(ctx context.Context, record *adapters.IdempotencyRecord) (bool, error)
StoreIfAbsent atomically inserts the record only if no live (non-expired) record exists for its key, overwriting an expired one. Returns true if stored.
type IdempotencyStoreOption ¶
type IdempotencyStoreOption func(*IdempotencyStore)
IdempotencyStoreOption configures an IdempotencyStore
func WithIdempotencySchema ¶
func WithIdempotencySchema(schema string) IdempotencyStoreOption
WithIdempotencySchema sets the PostgreSQL schema for the idempotency table.
func WithIdempotencyTable ¶
func WithIdempotencyTable(table string) IdempotencyStoreOption
WithIdempotencyTable sets the table name for idempotency records.
type Option ¶
type Option func(*PostgresAdapter)
Option configures a PostgresAdapter.
func WithConnectionMaxIdleTime ¶
WithConnectionMaxIdleTime sets the maximum idle time for connections.
func WithConnectionMaxLifetime ¶
WithConnectionMaxLifetime sets the maximum connection lifetime.
func WithHealthCheck ¶
WithHealthCheck enables periodic connection pool health checking. The health check runs at the specified interval and validates connections.
func WithMaxConnections ¶
WithMaxConnections sets the maximum number of open connections.
func WithMaxIdleConnections ¶
WithMaxIdleConnections sets the maximum number of idle connections.
type OutboxStore ¶
type OutboxStore struct {
// contains filtered or unexported fields
}
OutboxStore provides a PostgreSQL implementation of adapters.OutboxStore.
func NewOutboxStore ¶
func NewOutboxStore(db *sql.DB, opts ...OutboxStoreOption) *OutboxStore
NewOutboxStore creates a new PostgreSQL OutboxStore.
func NewOutboxStoreFromAdapter ¶
func NewOutboxStoreFromAdapter(adapter *PostgresAdapter, opts ...OutboxStoreOption) *OutboxStore
NewOutboxStoreFromAdapter creates a new OutboxStore using an existing PostgresAdapter's connection.
The returned store does not share the adapter's migrations: PostgresAdapter.Migrate does NOT create the outbox table. Callers must invoke Initialize(ctx) on the returned OutboxStore to create its table before use.
func (*OutboxStore) Close ¶
func (s *OutboxStore) Close() error
Close releases any resources (no-op as db is shared).
func (*OutboxStore) DB ¶
func (s *OutboxStore) DB() *sql.DB
DB returns the underlying database connection (for testing).
func (*OutboxStore) FetchPending ¶
func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
FetchPending atomically claims up to limit pending messages for processing. Uses SELECT ... FOR UPDATE SKIP LOCKED to prevent double-claiming.
func (*OutboxStore) GetDeadLetterMessages ¶
func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
GetDeadLetterMessages retrieves dead-lettered messages.
func (*OutboxStore) Initialize ¶
func (s *OutboxStore) Initialize(ctx context.Context) error
Initialize creates the outbox table if it doesn't exist.
func (*OutboxStore) MarkCompleted ¶
func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
MarkCompleted marks messages as successfully delivered.
func (*OutboxStore) MarkFailed ¶
MarkFailed marks a message as failed with an error description.
func (*OutboxStore) MoveToDeadLetter ¶
MoveToDeadLetter transitions messages that exceeded per-message max_attempts or global maxAttempts to dead letter.
func (*OutboxStore) ReclaimStale ¶ added in v1.0.25
ReclaimStale resets messages stuck in OutboxProcessing whose last attempt is older than olderThan back to pending, recovering messages orphaned when a processor crashes between claiming and marking a message.
func (*OutboxStore) RetryFailed ¶
RetryFailed resets eligible failed messages (below per-message max_attempts or global maxAttempts) to pending.
func (*OutboxStore) Schedule ¶
func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
Schedule stores outbox messages for later processing.
func (*OutboxStore) ScheduleInTx ¶
func (s *OutboxStore) ScheduleInTx(ctx context.Context, tx interface{}, messages []*adapters.OutboxMessage) error
ScheduleInTx stores outbox messages within an existing database transaction.
type OutboxStoreOption ¶
type OutboxStoreOption func(*OutboxStore)
OutboxStoreOption configures an OutboxStore.
func WithOutboxSchema ¶
func WithOutboxSchema(schema string) OutboxStoreOption
WithOutboxSchema sets the PostgreSQL schema for the outbox table.
func WithOutboxTableName ¶
func WithOutboxTableName(table string) OutboxStoreOption
WithOutboxTableName sets the table name for outbox records.
type PostgresAdapter ¶
type PostgresAdapter struct {
// contains filtered or unexported fields
}
PostgresAdapter is a PostgreSQL implementation of EventStoreAdapter.
func NewAdapter ¶
func NewAdapter(connStr string, opts ...Option) (*PostgresAdapter, error)
NewAdapter creates a new PostgreSQL event store adapter.
func NewAdapterWithDB ¶
func NewAdapterWithDB(db *sql.DB, opts ...Option) (*PostgresAdapter, error)
NewAdapterWithDB creates a new adapter with an existing database connection. Returns an error if the schema name is invalid.
func (*PostgresAdapter) Append ¶
func (a *PostgresAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)
Append stores events to the specified stream with optimistic concurrency control.
func (*PostgresAdapter) AppendWithOutbox ¶
func (a *PostgresAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64, outbox adapters.OutboxStore, outboxMessages []*adapters.OutboxMessage) ([]adapters.StoredEvent, error)
AppendWithOutbox atomically appends events and schedules outbox messages in a single transaction. The messages are scheduled into the provided outbox store (the one the caller configured) via its ScheduleInTx, so the atomic path writes to the same table the caller reads from. The store must be a *postgres.OutboxStore so it can enrol in this adapter's transaction; a nil or non-PostgreSQL store with messages present is an error (it could not participate in the transaction).
func (*PostgresAdapter) CheckSchema ¶
func (a *PostgresAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
CheckSchema verifies the event store schema exists.
func (*PostgresAdapter) Close ¶
func (a *PostgresAdapter) Close() error
Close releases the database connection and stops health checking.
func (*PostgresAdapter) DB ¶
func (a *PostgresAdapter) DB() *sql.DB
DB returns the underlying database connection.
func (*PostgresAdapter) DeleteCheckpoint ¶
func (a *PostgresAdapter) DeleteCheckpoint(ctx context.Context, projectionName string) error
DeleteCheckpoint removes the checkpoint for a projection. This implements mink.CheckpointStore interface.
func (*PostgresAdapter) DeleteSnapshot ¶
func (a *PostgresAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
DeleteSnapshot removes the snapshot for the given stream.
func (*PostgresAdapter) ExecuteSQL ¶
func (a *PostgresAdapter) ExecuteSQL(ctx context.Context, sql string) error
ExecuteSQL runs arbitrary SQL (for applying migrations).
func (*PostgresAdapter) GenerateSchema ¶
func (a *PostgresAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
GenerateSchema returns the DDL for the PostgreSQL event store schema.
func (*PostgresAdapter) GetAllCheckpoints ¶
GetAllCheckpoints returns checkpoints for all projections. This implements mink.CheckpointStore interface.
func (*PostgresAdapter) GetAppliedMigrations ¶
func (a *PostgresAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
GetAppliedMigrations returns the list of applied migration names.
func (*PostgresAdapter) GetCheckpoint ¶
GetCheckpoint returns the last processed position for a projection.
func (*PostgresAdapter) GetDiagnosticInfo ¶
func (a *PostgresAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
GetDiagnosticInfo returns database version and connection status.
func (*PostgresAdapter) GetEventStoreStats ¶
func (a *PostgresAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
GetEventStoreStats returns aggregate statistics about the event store.
func (*PostgresAdapter) GetLastPosition ¶
func (a *PostgresAdapter) GetLastPosition(ctx context.Context) (uint64, error)
GetLastPosition returns the global position of the last stored event.
func (*PostgresAdapter) GetProjection ¶
func (a *PostgresAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
GetProjection returns information about a specific projection.
func (*PostgresAdapter) GetProjectionHealth ¶
func (a *PostgresAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
GetProjectionHealth returns projection health status.
func (*PostgresAdapter) GetStreamEvents ¶
func (a *PostgresAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
GetStreamEvents returns events from a stream with pagination for CLI display.
func (*PostgresAdapter) GetStreamInfo ¶
func (a *PostgresAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
GetStreamInfo returns metadata about a stream.
func (*PostgresAdapter) GetTotalEventCount ¶
func (a *PostgresAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
GetTotalEventCount returns the highest global position.
func (*PostgresAdapter) Initialize ¶
func (a *PostgresAdapter) Initialize(ctx context.Context) error
Initialize creates the required database schema and tables.
func (*PostgresAdapter) ListProjections ¶
func (a *PostgresAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
ListProjections returns all registered projections.
func (*PostgresAdapter) ListStreams ¶
func (a *PostgresAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
ListStreams returns a list of stream summaries for CLI display.
func (*PostgresAdapter) Load ¶
func (a *PostgresAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
Load retrieves all events from a stream starting from the specified version.
func (*PostgresAdapter) LoadFromPosition ¶
func (a *PostgresAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.
func (*PostgresAdapter) LoadSnapshot ¶
func (a *PostgresAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
LoadSnapshot retrieves the latest snapshot for the given stream (SnapshotAdapter implementation).
func (*PostgresAdapter) Migrate ¶
func (a *PostgresAdapter) Migrate(ctx context.Context) error
Migrate runs database migrations.
It creates the core event-store objects only: the schema plus the streams, events, snapshots, and checkpoints tables. It deliberately does NOT create the outbox, idempotency, or saga tables. Callers that use the NewOutboxStoreFromAdapter, NewIdempotencyStoreFromAdapter, or NewSagaStoreFromAdapter sub-stores must call that store's Initialize(ctx) to create its own table.
func (*PostgresAdapter) MigrationVersion ¶
func (a *PostgresAdapter) MigrationVersion(ctx context.Context) (int, error)
MigrationVersion returns the current migration version.
func (*PostgresAdapter) Ping ¶
func (a *PostgresAdapter) Ping(ctx context.Context) error
Ping checks database connectivity.
func (*PostgresAdapter) RecordMigration ¶
func (a *PostgresAdapter) RecordMigration(ctx context.Context, name string) error
RecordMigration marks a migration as applied.
func (*PostgresAdapter) RemoveMigrationRecord ¶
func (a *PostgresAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
RemoveMigrationRecord removes a migration record (for rollback).
func (*PostgresAdapter) ResetProjectionCheckpoint ¶
func (a *PostgresAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.
func (*PostgresAdapter) SaveSnapshot ¶
func (a *PostgresAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
SaveSnapshot stores a snapshot for the given stream.
func (*PostgresAdapter) Schema ¶
func (a *PostgresAdapter) Schema() string
Schema returns the schema name.
func (*PostgresAdapter) SetCheckpoint ¶
func (a *PostgresAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
SetCheckpoint stores the last processed position for a projection.
func (*PostgresAdapter) SetProjectionStatus ¶
func (a *PostgresAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
SetProjectionStatus updates a projection's status.
func (*PostgresAdapter) Stats ¶
func (a *PostgresAdapter) Stats() sql.DBStats
Stats returns connection pool statistics.
func (*PostgresAdapter) SubscribeAll ¶
func (a *PostgresAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeAll subscribes to all events across all streams. Events are delivered starting from the specified global position. This uses polling-based subscription with continuous updates. Optional SubscriptionOptions can be provided to configure behavior.
func (*PostgresAdapter) SubscribeCategory ¶
func (a *PostgresAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeCategory subscribes to all events from streams in a category. Events are delivered starting from the specified global position with continuous polling. Optional SubscriptionOptions can be provided to configure behavior.
func (*PostgresAdapter) SubscribeStream ¶
func (a *PostgresAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeStream subscribes to events from a specific stream. Events are delivered starting from the specified version with continuous polling. Optional SubscriptionOptions can be provided to configure behavior.
type PostgresRepository ¶
type PostgresRepository[T any] struct { // contains filtered or unexported fields }
PostgresRepository provides a PostgreSQL implementation of ReadModelRepository. It supports automatic schema migration based on struct tags.
func NewPostgresRepository ¶
func NewPostgresRepository[T any](db *sql.DB, opts ...ReadModelOption) (*PostgresRepository[T], error)
NewPostgresRepository creates a new PostgreSQL-backed repository for read models. The type T should be a struct with mink tags for column mapping.
Supported struct tags:
- `mink:"column_name"` - Column name (default: snake_case of field name)
- `mink:"-"` - Skip this field
- `mink:"column_name,pk"` - Primary key
- `mink:"column_name,index"` - Create index
- `mink:"column_name,unique"` - Unique constraint
- `mink:"column_name,nullable"` - Allow NULL values
- `mink:"column_name,default=value"` - Default value
Example:
type OrderSummary struct {
OrderID string `mink:"order_id,pk"`
CustomerID string `mink:"customer_id,index"`
Status string `mink:"status"`
Total float64 `mink:"total_amount"`
CreatedAt time.Time `mink:"created_at"`
}
repo, err := postgres.NewPostgresRepository[OrderSummary](db,
postgres.WithReadModelSchema("projections"),
postgres.WithTableName("order_summaries"),
)
Auto-migration caveat: by default (WithAutoMigrate is true) this constructor runs blocking schema DDL — CREATE SCHEMA/TABLE/INDEX and ALTER TABLE — using context.Background(), so it cannot be cancelled or deadline-bounded by the caller. Production callers that need a cancellable/bounded migration should either use NewPostgresRepositoryContext to thread a context through, or disable auto-migration with WithAutoMigrate(false) and call Migrate(ctx) explicitly with their own context.
func NewPostgresRepositoryContext ¶ added in v1.0.25
func NewPostgresRepositoryContext[T any](ctx context.Context, db *sql.DB, opts ...ReadModelOption) (*PostgresRepository[T], error)
NewPostgresRepositoryContext is like NewPostgresRepository but threads the provided context into the auto-migration step (when WithAutoMigrate is enabled, the default). This lets callers bound or cancel the blocking schema DDL that runs at construction. The context is only used for migration; once the repository is returned, per-operation methods take their own context.
func (*PostgresRepository[T]) Clear ¶
func (r *PostgresRepository[T]) Clear(ctx context.Context) error
Clear removes all read models.
func (*PostgresRepository[T]) Delete ¶
func (r *PostgresRepository[T]) Delete(ctx context.Context, id string) error
Delete removes a read model by ID.
func (*PostgresRepository[T]) DeleteMany ¶
DeleteMany removes all read models matching the query.
func (*PostgresRepository[T]) DropTable ¶
func (r *PostgresRepository[T]) DropTable(ctx context.Context) error
DropTable removes the read model table (use with caution!).
func (*PostgresRepository[T]) Get ¶
func (r *PostgresRepository[T]) Get(ctx context.Context, id string) (*T, error)
Get retrieves a read model by ID.
func (*PostgresRepository[T]) GetAll ¶
func (r *PostgresRepository[T]) GetAll(ctx context.Context) ([]*T, error)
GetAll returns all read models in the repository.
func (*PostgresRepository[T]) GetMany ¶
func (r *PostgresRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
GetMany retrieves multiple read models by their IDs.
func (*PostgresRepository[T]) Insert ¶
func (r *PostgresRepository[T]) Insert(ctx context.Context, model *T) error
Insert creates a new read model.
func (*PostgresRepository[T]) Migrate ¶
func (r *PostgresRepository[T]) Migrate(ctx context.Context) error
Migrate creates or updates the table schema.
func (*PostgresRepository[T]) Schema ¶
func (r *PostgresRepository[T]) Schema() string
Schema returns the PostgreSQL schema name.
func (*PostgresRepository[T]) TableName ¶
func (r *PostgresRepository[T]) TableName() string
TableName returns the fully qualified table name.
func (*PostgresRepository[T]) Update ¶
func (r *PostgresRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
Update modifies an existing read model.
func (*PostgresRepository[T]) Upsert ¶
func (r *PostgresRepository[T]) Upsert(ctx context.Context, model *T) error
Upsert creates or updates a read model.
func (*PostgresRepository[T]) WithTx ¶
func (r *PostgresRepository[T]) WithTx(tx *sql.Tx) *TxRepository[T]
WithTx creates a new repository instance that uses the provided transaction.
type ReadModelOption ¶
type ReadModelOption func(*readModelConfig)
ReadModelOption configures the PostgresRepository.
func WithAutoMigrate ¶
func WithAutoMigrate(enabled bool) ReadModelOption
WithAutoMigrate enables automatic table creation and migration. Default is true.
func WithIDField ¶
func WithIDField(field string) ReadModelOption
WithIDField sets the field name used as the primary key. Default is "ID".
func WithReadModelSchema ¶
func WithReadModelSchema(schema string) ReadModelOption
WithReadModelSchema sets the PostgreSQL schema for the read model table.
func WithTableName ¶
func WithTableName(name string) ReadModelOption
WithTableName sets the table name for the read model.
type SagaStore ¶
type SagaStore struct {
// contains filtered or unexported fields
}
SagaStore provides a PostgreSQL implementation of mink.SagaStore.
func NewSagaStore ¶
func NewSagaStore(db *sql.DB, opts ...SagaStoreOption) *SagaStore
NewSagaStore creates a new PostgreSQL SagaStore.
func NewSagaStoreFromAdapter ¶
func NewSagaStoreFromAdapter(adapter *PostgresAdapter, opts ...SagaStoreOption) *SagaStore
NewSagaStoreFromAdapter creates a new SagaStore using an existing PostgresAdapter's connection.
The returned store does not share the adapter's migrations: PostgresAdapter.Migrate does NOT create the saga table. Callers must invoke Initialize(ctx) on the returned SagaStore to create its table before use.
func (*SagaStore) Close ¶
Close releases any resources (no-op for this implementation as db is shared).
func (*SagaStore) CountByStatus ¶
CountByStatus returns the count of sagas by status.
func (*SagaStore) FindByCorrelationID ¶
func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*mink.SagaState, error)
FindByCorrelationID finds a saga by its correlation ID.
func (*SagaStore) FindByType ¶
func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...mink.SagaStatus) ([]*mink.SagaState, error)
FindByType finds all sagas of a given type with the specified statuses.
func (*SagaStore) Initialize ¶
Initialize creates the saga table if it doesn't exist.
func (*SagaStore) Save ¶
Save persists a saga state with optimistic concurrency control.
Version semantics:
- Version 0: Creates a new saga. Uses INSERT with version=1.
- Version > 0: Updates an existing saga. Uses UPDATE with version check. If the version doesn't match, returns ErrConcurrencyConflict.
The version is incremented atomically by the database (version = version + 1) and returned via RETURNING clause. After a successful save, state.Version is updated with the new version from the database.
Note on Version 0 with an existing ID: because the INSERT uses ON CONFLICT (id) DO UPDATE ... WHERE version = $13 and $13 is the supplied Version, calling Save with Version 0 against a saga ID that already exists (whose stored version is >= 1) matches no row in the UPDATE and surfaces as mink.ErrConcurrencyConflict — it is NOT treated as a clean insert. Use the loaded state's Version (>= 1) to update an existing saga.
type SagaStoreOption ¶
type SagaStoreOption func(*SagaStore)
SagaStoreOption configures a SagaStore.
func WithSagaSchema ¶
func WithSagaSchema(schema string) SagaStoreOption
WithSagaSchema sets the PostgreSQL schema for the saga table.
func WithSagaTable ¶
func WithSagaTable(table string) SagaStoreOption
WithSagaTable sets the table name for saga records.
type StreamNotFoundError ¶
type StreamNotFoundError = adapters.StreamNotFoundError
StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.
type TableSchema ¶
type TableSchema struct {
TableName string
Schema string
Columns []ColumnType
Indexes []IndexDef
}
TableSchema represents the schema of a read model table.
type TxRepository ¶
type TxRepository[T any] struct { // contains filtered or unexported fields }
TxRepository wraps PostgresRepository for transaction support. It implements all ReadModelRepository methods within a transaction context.
func (*TxRepository[T]) Clear ¶
func (tr *TxRepository[T]) Clear(ctx context.Context) error
Clear removes all read models within a transaction.
func (*TxRepository[T]) Count ¶
Count returns the number of read models matching the query within a transaction.
func (*TxRepository[T]) Delete ¶
func (tr *TxRepository[T]) Delete(ctx context.Context, id string) error
Delete removes a read model within a transaction.
func (*TxRepository[T]) DeleteMany ¶
DeleteMany removes all read models matching the query within a transaction.
func (*TxRepository[T]) Exists ¶
Exists checks if a read model with the given ID exists within a transaction.
func (*TxRepository[T]) Find ¶
Find queries read models with the given criteria within a transaction.
func (*TxRepository[T]) FindOne ¶
FindOne returns the first read model matching the query within a transaction.
func (*TxRepository[T]) Get ¶
func (tr *TxRepository[T]) Get(ctx context.Context, id string) (*T, error)
Get retrieves a read model by ID within a transaction.
func (*TxRepository[T]) GetAll ¶
func (tr *TxRepository[T]) GetAll(ctx context.Context) ([]*T, error)
GetAll returns all read models in the repository within a transaction.
func (*TxRepository[T]) GetMany ¶
func (tr *TxRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
GetMany retrieves multiple read models by their IDs within a transaction.
func (*TxRepository[T]) Insert ¶
func (tr *TxRepository[T]) Insert(ctx context.Context, model *T) error
Insert creates a new read model within a transaction.