postgres

package
v1.0.12
Published: Apr 18, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package postgres provides PostgreSQL implementations of the event store adapter and the read model repository.

Constants

const (
	AnyVersion   = adapters.AnyVersion
	NoStream     = adapters.NoStream
	StreamExists = adapters.StreamExists
)

Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.
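
To illustrate how such version constants are typically interpreted, here is a minimal, self-contained sketch of an expected-version check. The numeric values and the helper name checkExpected are assumptions made for this example; the real values are defined in the adapters package and are not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical values: the real constants live in the adapters package
// and their numeric values are not shown in this documentation.
const (
	anyVersion   int64 = -1 // skip the version check entirely
	noStream     int64 = 0  // the stream must not exist yet
	streamExists int64 = -2 // the stream must already have events
)

var errConflict = errors.New("concurrency conflict")

// checkExpected reports whether an append with the given expected version
// may proceed against a stream whose current version is current
// (0 meaning the stream has no events).
func checkExpected(current, expected int64) error {
	switch {
	case expected == anyVersion:
		return nil
	case expected == noStream && current == 0:
		return nil
	case expected == streamExists && current > 0:
		return nil
	case expected > 0 && expected == current:
		return nil
	default:
		return fmt.Errorf("expected %d, current %d: %w", expected, current, errConflict)
	}
}

func main() {
	fmt.Println(checkExpected(0, noStream)) // new stream
	fmt.Println(checkExpected(3, 3))        // versions agree
	fmt.Println(checkExpected(3, 2))        // stale writer
}
```

A caller that loses the race would usually reload the stream and retry with the fresh version.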

Variables

var (
	ErrAdapterClosed       = adapters.ErrAdapterClosed
	ErrEmptyStreamID       = adapters.ErrEmptyStreamID
	ErrNoEvents            = adapters.ErrNoEvents
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict
	ErrStreamNotFound      = adapters.ErrStreamNotFound
	ErrInvalidVersion      = adapters.ErrInvalidVersion
	ErrInvalidSchemaName   = fmt.Errorf("mink/postgres: invalid schema name")
)

Sentinel errors for the postgres adapter. All except ErrInvalidSchemaName, which is defined in this package, are aliases to the adapters package errors for compatibility with errors.Is().
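
The following sketch shows why re-exporting a sentinel by plain assignment keeps errors.Is() working: both names hold the same error value. The variables and the appendEvents helper are stand-ins invented for this example, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// baseErrConflict stands in for adapters.ErrConcurrencyConflict.
var baseErrConflict = errors.New("concurrency conflict")

// errConflict stands in for the re-exported postgres.ErrConcurrencyConflict.
// Assigning the variable copies the same error value, so both names
// compare equal.
var errConflict = baseErrConflict

// appendEvents is a hypothetical operation that fails with a wrapped sentinel.
func appendEvents() error {
	return fmt.Errorf("append to stream %q: %w", "order-1", baseErrConflict)
}

// isConflict reports whether err wraps the sentinel, checked via the alias.
func isConflict(err error) bool {
	return errors.Is(err, errConflict)
}

func main() {
	err := appendEvents()
	fmt.Println(err)
	fmt.Println(isConflict(err))
}
```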

var NewConcurrencyError = adapters.NewConcurrencyError

NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.

var NewStreamNotFoundError = adapters.NewStreamNotFoundError

NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.

Functions

This section is empty.

Types

type ColumnType

type ColumnType struct {
	Name       string
	SQLType    string
	Nullable   bool
	PrimaryKey bool
	Index      bool
	Unique     bool
	Default    string
}

ColumnType represents SQL column type information.

type ConcurrencyError

type ConcurrencyError = adapters.ConcurrencyError

ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.

type IdempotencyStore

type IdempotencyStore struct {
	// contains filtered or unexported fields
}

IdempotencyStore provides a PostgreSQL implementation of mink.IdempotencyStore.

func NewIdempotencyStore

func NewIdempotencyStore(db *sql.DB, opts ...IdempotencyStoreOption) *IdempotencyStore

NewIdempotencyStore creates a new PostgreSQL IdempotencyStore.

func NewIdempotencyStoreFromAdapter

func NewIdempotencyStoreFromAdapter(adapter *PostgresAdapter, opts ...IdempotencyStoreOption) *IdempotencyStore

NewIdempotencyStoreFromAdapter creates a new IdempotencyStore using an existing PostgresAdapter's connection.

func (*IdempotencyStore) Cleanup

func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes records older than the specified duration. Returns the number of records deleted.

func (*IdempotencyStore) Clear

func (s *IdempotencyStore) Clear(ctx context.Context) error

Clear removes all records from the store. Useful for testing.

func (*IdempotencyStore) Count

func (s *IdempotencyStore) Count(ctx context.Context) (int64, error)

Count returns the total number of records in the store. Useful for testing and monitoring.

func (*IdempotencyStore) Delete

func (s *IdempotencyStore) Delete(ctx context.Context, key string) error

Delete removes an idempotency record by key.

func (*IdempotencyStore) Exists

func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a record with the given key exists and is not expired.

func (*IdempotencyStore) Get

Get retrieves an idempotency record by key. Returns nil, nil if the record doesn't exist or is expired.

func (*IdempotencyStore) Initialize

func (s *IdempotencyStore) Initialize(ctx context.Context) error

Initialize creates the idempotency table if it doesn't exist.

func (*IdempotencyStore) Store

Store saves a new idempotency record, using upsert to handle conflicts.

type IdempotencyStoreOption

type IdempotencyStoreOption func(*IdempotencyStore)

IdempotencyStoreOption configures an IdempotencyStore.

func WithIdempotencySchema

func WithIdempotencySchema(schema string) IdempotencyStoreOption

WithIdempotencySchema sets the PostgreSQL schema for the idempotency table.

func WithIdempotencyTable

func WithIdempotencyTable(table string) IdempotencyStoreOption

WithIdempotencyTable sets the table name for idempotency records.

type IndexDef

type IndexDef struct {
	Name    string
	Columns []string
	Unique  bool
}

IndexDef represents an index definition.

type Option

type Option func(*PostgresAdapter)

Option configures a PostgresAdapter.

func WithConnectionMaxIdleTime

func WithConnectionMaxIdleTime(d time.Duration) Option

WithConnectionMaxIdleTime sets the maximum idle time for connections.

func WithConnectionMaxLifetime

func WithConnectionMaxLifetime(d time.Duration) Option

WithConnectionMaxLifetime sets the maximum connection lifetime.

func WithHealthCheck

func WithHealthCheck(interval time.Duration) Option

WithHealthCheck enables periodic connection pool health checking. The health check runs at the specified interval and validates connections.

func WithMaxConnections

func WithMaxConnections(n int) Option

WithMaxConnections sets the maximum number of open connections.

func WithMaxIdleConnections

func WithMaxIdleConnections(n int) Option

WithMaxIdleConnections sets the maximum number of idle connections.

func WithSchema

func WithSchema(schema string) Option

WithSchema sets the database schema name.
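
Option and its With... constructors follow the functional options pattern. The sketch below reproduces the pattern on a stand-in config type; the field names and the default values ("public", 10) are assumptions for illustration, since the adapter's fields are unexported and its defaults are not listed here.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the adapter's unexported settings.
type config struct {
	schema      string
	maxConns    int
	maxLifetime time.Duration
}

// option mirrors the shape of postgres.Option: a function that mutates
// the value being configured.
type option func(*config)

func withSchema(s string) option {
	return func(c *config) { c.schema = s }
}

func withMaxConnections(n int) option {
	return func(c *config) { c.maxConns = n }
}

func withConnectionMaxLifetime(d time.Duration) option {
	return func(c *config) { c.maxLifetime = d }
}

// newConfig starts from assumed defaults and applies the options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) config {
	c := config{schema: "public", maxConns: 10}
	for _, apply := range opts {
		apply(&c)
	}
	return c
}

func main() {
	c := newConfig(
		withSchema("events"),
		withMaxConnections(25),
		withConnectionMaxLifetime(30*time.Minute),
	)
	fmt.Printf("%+v\n", c)
}
```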

type OutboxStore

type OutboxStore struct {
	// contains filtered or unexported fields
}

OutboxStore provides a PostgreSQL implementation of adapters.OutboxStore.

func NewOutboxStore

func NewOutboxStore(db *sql.DB, opts ...OutboxStoreOption) *OutboxStore

NewOutboxStore creates a new PostgreSQL OutboxStore.

func NewOutboxStoreFromAdapter

func NewOutboxStoreFromAdapter(adapter *PostgresAdapter, opts ...OutboxStoreOption) *OutboxStore

NewOutboxStoreFromAdapter creates a new OutboxStore using an existing PostgresAdapter's connection.

func (*OutboxStore) Cleanup

func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes old completed messages.

func (*OutboxStore) Close

func (s *OutboxStore) Close() error

Close releases any resources (no-op as db is shared).

func (*OutboxStore) DB

func (s *OutboxStore) DB() *sql.DB

DB returns the underlying database connection (for testing).

func (*OutboxStore) FetchPending

func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)

FetchPending atomically claims up to limit pending messages for processing. Uses SELECT ... FOR UPDATE SKIP LOCKED to prevent double-claiming.
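
As a rough illustration of the claiming technique, the sketch below builds one possible PostgreSQL statement that combines an UPDATE with a SELECT ... FOR UPDATE SKIP LOCKED subquery. The table, column, and status names are assumptions; the statement actually issued by FetchPending is not shown on this page and may differ.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// claimQuery builds a statement that marks up to $1 pending rows as
// processing and returns their ids. Rows locked by a concurrent claimer
// are skipped rather than waited on. Table, column, and status names are
// assumptions made for this sketch. The table name is interpolated, so it
// must come from trusted, validated configuration.
func claimQuery(table string) string {
	return fmt.Sprintf(
		"UPDATE %[1]s SET status = 'processing' WHERE id IN ("+
			"SELECT id FROM %[1]s WHERE status = 'pending' "+
			"ORDER BY created_at LIMIT $1 FOR UPDATE SKIP LOCKED"+
			") RETURNING id", table)
}

// claim runs the statement and collects the claimed ids.
// It needs a live PostgreSQL connection, so main does not call it.
func claim(ctx context.Context, db *sql.DB, table string, limit int) ([]string, error) {
	rows, err := db.QueryContext(ctx, claimQuery(table), limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, rows.Err()
}

func main() {
	fmt.Println(claimQuery("mink.outbox"))
}
```

Because two workers running this statement concurrently lock disjoint sets of rows, neither blocks on the other and no message is claimed twice.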

func (*OutboxStore) GetDeadLetterMessages

func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)

GetDeadLetterMessages retrieves dead-lettered messages.

func (*OutboxStore) Initialize

func (s *OutboxStore) Initialize(ctx context.Context) error

Initialize creates the outbox table if it doesn't exist.

func (*OutboxStore) MarkCompleted

func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error

MarkCompleted marks messages as successfully delivered.

func (*OutboxStore) MarkFailed

func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error

MarkFailed marks a message as failed with an error description.

func (*OutboxStore) MoveToDeadLetter

func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)

MoveToDeadLetter transitions messages that exceeded per-message max_attempts or global maxAttempts to dead letter.

func (*OutboxStore) RetryFailed

func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)

RetryFailed resets eligible failed messages (below per-message max_attempts or global maxAttempts) to pending.
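
The split between RetryFailed and MoveToDeadLetter can be pictured as a partition of failed messages by attempt count. The sketch below is one plausible reading, not the package's confirmed rule: it assumes a per-message limit greater than zero takes precedence over the global limit, and that a message is dead-lettered once its attempts reach the effective limit. The message type and function names are hypothetical.

```go
package main

import "fmt"

// message is a hypothetical, trimmed-down outbox record.
type message struct {
	ID          string
	Attempts    int
	MaxAttempts int // 0 means "no per-message limit" in this sketch
}

// effectiveLimit picks the per-message limit when set, else the global one.
// This precedence is an assumption made for the example.
func effectiveLimit(m message, global int) int {
	if m.MaxAttempts > 0 {
		return m.MaxAttempts
	}
	return global
}

// partition splits failed messages into those still eligible for retry
// and those that should be dead-lettered.
func partition(failed []message, global int) (retry, dead []string) {
	for _, m := range failed {
		if m.Attempts >= effectiveLimit(m, global) {
			dead = append(dead, m.ID)
		} else {
			retry = append(retry, m.ID)
		}
	}
	return retry, dead
}

func main() {
	failed := []message{
		{ID: "a", Attempts: 1},
		{ID: "b", Attempts: 5},
		{ID: "c", Attempts: 2, MaxAttempts: 2},
		{ID: "d", Attempts: 4, MaxAttempts: 10},
	}
	retry, dead := partition(failed, 5)
	fmt.Println("retry:", retry)
	fmt.Println("dead:", dead)
}
```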

func (*OutboxStore) Schedule

func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error

Schedule stores outbox messages for later processing.

func (*OutboxStore) ScheduleInTx

func (s *OutboxStore) ScheduleInTx(ctx context.Context, tx interface{}, messages []*adapters.OutboxMessage) error

ScheduleInTx stores outbox messages within an existing database transaction.

type OutboxStoreOption

type OutboxStoreOption func(*OutboxStore)

OutboxStoreOption configures an OutboxStore.

func WithOutboxSchema

func WithOutboxSchema(schema string) OutboxStoreOption

WithOutboxSchema sets the PostgreSQL schema for the outbox table.

func WithOutboxTableName

func WithOutboxTableName(table string) OutboxStoreOption

WithOutboxTableName sets the table name for outbox records.

type PostgresAdapter

type PostgresAdapter struct {
	// contains filtered or unexported fields
}

PostgresAdapter is a PostgreSQL implementation of EventStoreAdapter.

func NewAdapter

func NewAdapter(connStr string, opts ...Option) (*PostgresAdapter, error)

NewAdapter creates a new PostgreSQL event store adapter.

func NewAdapterWithDB

func NewAdapterWithDB(db *sql.DB, opts ...Option) (*PostgresAdapter, error)

NewAdapterWithDB creates a new adapter with an existing database connection. Returns an error if the schema name is invalid.
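
Schema names end up interpolated into SQL identifiers, which is why they need validation. The exact rule the package applies is not documented here; the sketch below assumes a conservative one (an unquoted identifier of at most 63 bytes, PostgreSQL's default identifier length limit) and uses hypothetical names throughout.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// errInvalidSchemaName stands in for postgres.ErrInvalidSchemaName.
var errInvalidSchemaName = errors.New("invalid schema name")

// identPattern accepts a letter or underscore followed by up to 62 letters,
// digits, or underscores. This rule is an assumption for the sketch.
var identPattern = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]{0,62}$`)

// validateSchema rejects names that could not be used safely as an
// unquoted identifier in generated SQL.
func validateSchema(name string) error {
	if !identPattern.MatchString(name) {
		return fmt.Errorf("%w: %q", errInvalidSchemaName, name)
	}
	return nil
}

func main() {
	for _, name := range []string{"mink", "read_models", "public; DROP TABLE events", ""} {
		fmt.Printf("%q -> %v\n", name, validateSchema(name))
	}
}
```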

func (*PostgresAdapter) Append

func (a *PostgresAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)

Append stores events to the specified stream with optimistic concurrency control.

func (*PostgresAdapter) AppendWithOutbox

func (a *PostgresAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64, outboxMessages []*adapters.OutboxMessage) ([]adapters.StoredEvent, error)

AppendWithOutbox atomically appends events and schedules outbox messages in a single transaction.

func (*PostgresAdapter) CheckSchema

func (a *PostgresAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)

CheckSchema verifies the event store schema exists.

func (*PostgresAdapter) Close

func (a *PostgresAdapter) Close() error

Close releases the database connection and stops health checking.

func (*PostgresAdapter) DB

func (a *PostgresAdapter) DB() *sql.DB

DB returns the underlying database connection.

func (*PostgresAdapter) DeleteCheckpoint

func (a *PostgresAdapter) DeleteCheckpoint(ctx context.Context, projectionName string) error

DeleteCheckpoint removes the checkpoint for a projection. This implements the mink.CheckpointStore interface.

func (*PostgresAdapter) DeleteSnapshot

func (a *PostgresAdapter) DeleteSnapshot(ctx context.Context, streamID string) error

DeleteSnapshot removes the snapshot for the given stream.

func (*PostgresAdapter) ExecuteSQL

func (a *PostgresAdapter) ExecuteSQL(ctx context.Context, sql string) error

ExecuteSQL runs arbitrary SQL (for applying migrations).

func (*PostgresAdapter) GenerateSchema

func (a *PostgresAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string

GenerateSchema returns the DDL for the PostgreSQL event store schema.

func (*PostgresAdapter) GetAllCheckpoints

func (a *PostgresAdapter) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)

GetAllCheckpoints returns checkpoints for all projections. This implements the mink.CheckpointStore interface.

func (*PostgresAdapter) GetAppliedMigrations

func (a *PostgresAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)

GetAppliedMigrations returns the list of applied migration names.

func (*PostgresAdapter) GetCheckpoint

func (a *PostgresAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)

GetCheckpoint returns the last processed position for a projection.

func (*PostgresAdapter) GetDiagnosticInfo

func (a *PostgresAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)

GetDiagnosticInfo returns database version and connection status.

func (*PostgresAdapter) GetEventStoreStats

func (a *PostgresAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)

GetEventStoreStats returns aggregate statistics about the event store.

func (*PostgresAdapter) GetLastPosition

func (a *PostgresAdapter) GetLastPosition(ctx context.Context) (uint64, error)

GetLastPosition returns the global position of the last stored event.

func (*PostgresAdapter) GetProjection

func (a *PostgresAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)

GetProjection returns information about a specific projection.

func (*PostgresAdapter) GetProjectionHealth

func (a *PostgresAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)

GetProjectionHealth returns projection health status.

func (*PostgresAdapter) GetStreamEvents

func (a *PostgresAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)

GetStreamEvents returns events from a stream with pagination for CLI display.

func (*PostgresAdapter) GetStreamInfo

func (a *PostgresAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)

GetStreamInfo returns metadata about a stream.

func (*PostgresAdapter) GetTotalEventCount

func (a *PostgresAdapter) GetTotalEventCount(ctx context.Context) (int64, error)

GetTotalEventCount returns the highest global position.

func (*PostgresAdapter) Initialize

func (a *PostgresAdapter) Initialize(ctx context.Context) error

Initialize creates the required database schema and tables.

func (*PostgresAdapter) ListProjections

func (a *PostgresAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)

ListProjections returns all registered projections.

func (*PostgresAdapter) ListStreams

func (a *PostgresAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)

ListStreams returns a list of stream summaries for CLI display.

func (*PostgresAdapter) Load

func (a *PostgresAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)

Load retrieves all events from a stream starting from the specified version.

func (*PostgresAdapter) LoadFromPosition

func (a *PostgresAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)

LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.
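
A projection engine's catch-up phase can be sketched as a loop that reads batches by global position and advances a checkpoint after each batch. The in-memory store below is a stand-in for LoadFromPosition, GetCheckpoint, and SetCheckpoint; it assumes fromPosition is exclusive (events strictly after it are returned), which this page does not specify.

```go
package main

import "fmt"

type event struct {
	Position uint64
	Type     string
}

// store is an in-memory stand-in for the event log and checkpoint table.
type store struct {
	events     []event // ordered by Position
	checkpoint uint64
}

// loadFromPosition returns up to limit events with Position > from.
func (s *store) loadFromPosition(from uint64, limit int) []event {
	var out []event
	for _, e := range s.events {
		if e.Position > from && len(out) < limit {
			out = append(out, e)
		}
	}
	return out
}

// catchUp applies events in batches, saving the checkpoint after each
// batch, until a short batch signals the end of the log. It returns the
// number of events applied.
func catchUp(s *store, batch int, apply func(event)) int {
	if batch <= 0 {
		batch = 1 // guard against a loop that never sees a short batch
	}
	applied := 0
	for {
		events := s.loadFromPosition(s.checkpoint, batch)
		for _, e := range events {
			apply(e)
			applied++
		}
		if len(events) > 0 {
			s.checkpoint = events[len(events)-1].Position
		}
		if len(events) < batch {
			return applied
		}
	}
}

func main() {
	s := &store{checkpoint: 2}
	for i := uint64(1); i <= 5; i++ {
		s.events = append(s.events, event{Position: i, Type: "OrderPlaced"})
	}
	n := catchUp(s, 2, func(e event) { fmt.Println("apply", e.Position) })
	fmt.Println("applied:", n, "checkpoint:", s.checkpoint)
}
```

Saving the checkpoint per batch rather than per event trades a little replay on restart for fewer writes.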

func (*PostgresAdapter) LoadSnapshot

func (a *PostgresAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)

LoadSnapshot retrieves the latest snapshot for the given stream (SnapshotAdapter implementation).

func (*PostgresAdapter) Migrate

func (a *PostgresAdapter) Migrate(ctx context.Context) error

Migrate runs database migrations.

func (*PostgresAdapter) MigrationVersion

func (a *PostgresAdapter) MigrationVersion(ctx context.Context) (int, error)

MigrationVersion returns the current migration version.

func (*PostgresAdapter) Ping

func (a *PostgresAdapter) Ping(ctx context.Context) error

Ping checks database connectivity.

func (*PostgresAdapter) RecordMigration

func (a *PostgresAdapter) RecordMigration(ctx context.Context, name string) error

RecordMigration marks a migration as applied.

func (*PostgresAdapter) RemoveMigrationRecord

func (a *PostgresAdapter) RemoveMigrationRecord(ctx context.Context, name string) error

RemoveMigrationRecord removes a migration record (for rollback).

func (*PostgresAdapter) ResetProjectionCheckpoint

func (a *PostgresAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error

ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.

func (*PostgresAdapter) SaveSnapshot

func (a *PostgresAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error

SaveSnapshot stores a snapshot for the given stream.

func (*PostgresAdapter) Schema

func (a *PostgresAdapter) Schema() string

Schema returns the schema name.

func (*PostgresAdapter) SetCheckpoint

func (a *PostgresAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error

SetCheckpoint stores the last processed position for a projection.

func (*PostgresAdapter) SetProjectionStatus

func (a *PostgresAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error

SetProjectionStatus updates a projection's status.

func (*PostgresAdapter) Stats

func (a *PostgresAdapter) Stats() sql.DBStats

Stats returns connection pool statistics.

func (*PostgresAdapter) SubscribeAll

func (a *PostgresAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeAll subscribes to all events across all streams. Events are delivered starting from the specified global position. This uses polling-based subscription with continuous updates. Optional SubscriptionOptions can be provided to configure behavior.
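
The polling approach can be sketched with a goroutine that repeatedly asks for events after the last delivered position and forwards them on a channel. Everything below (the in-memory eventLog, the subscribe signature, the 5 ms interval) is an assumption made for illustration; the adapter's real polling interval and options come from adapters.SubscriptionOptions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type event struct{ Position uint64 }

// eventLog is an in-memory stand-in for the events table.
type eventLog struct {
	mu     sync.Mutex
	events []event
}

// add appends one event at the next global position.
func (l *eventLog) add() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, event{Position: uint64(len(l.events) + 1)})
}

// after returns a copy of all events with Position > pos.
func (l *eventLog) after(pos uint64) []event {
	l.mu.Lock()
	defer l.mu.Unlock()
	var out []event
	for _, e := range l.events {
		if e.Position > pos {
			out = append(out, e)
		}
	}
	return out
}

// subscribe polls the log every interval and forwards new events until
// ctx is cancelled, at which point the channel is closed.
func subscribe(ctx context.Context, l *eventLog, from uint64, interval time.Duration) <-chan event {
	out := make(chan event)
	go func() {
		defer close(out)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		pos := from
		for {
			for _, e := range l.after(pos) {
				select {
				case out <- e:
					pos = e.Position
				case <-ctx.Done():
					return
				}
			}
			select {
			case <-ticker.C:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demo subscribes from position 0, appends one more event while the
// subscription is live, and returns the positions it received.
func demo() []uint64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	l := &eventLog{}
	l.add()
	l.add()
	ch := subscribe(ctx, l, 0, 5*time.Millisecond)
	l.add()
	var got []uint64
	for len(got) < 3 {
		got = append(got, (<-ch).Position)
	}
	return got
}

func main() {
	fmt.Println(demo())
}
```

Cancelling the context is the only way to stop this sketch's subscription, so callers should always pair it with a deferred cancel.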

func (*PostgresAdapter) SubscribeCategory

func (a *PostgresAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeCategory subscribes to all events from streams in a category. Events are delivered starting from the specified global position with continuous polling. Optional SubscriptionOptions can be provided to configure behavior.

func (*PostgresAdapter) SubscribeStream

func (a *PostgresAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeStream subscribes to events from a specific stream. Events are delivered starting from the specified version with continuous polling. Optional SubscriptionOptions can be provided to configure behavior.

type PostgresRepository

type PostgresRepository[T any] struct {
	// contains filtered or unexported fields
}

PostgresRepository provides a PostgreSQL implementation of ReadModelRepository. It supports automatic schema migration based on struct tags.

func NewPostgresRepository

func NewPostgresRepository[T any](db *sql.DB, opts ...ReadModelOption) (*PostgresRepository[T], error)

NewPostgresRepository creates a new PostgreSQL-backed repository for read models. The type T should be a struct with mink tags for column mapping.

Supported struct tags:

  • `mink:"column_name"` - Column name (default: snake_case of field name)
  • `mink:"-"` - Skip this field
  • `mink:"column_name,pk"` - Primary key
  • `mink:"column_name,index"` - Create index
  • `mink:"column_name,unique"` - Unique constraint
  • `mink:"column_name,nullable"` - Allow NULL values
  • `mink:"column_name,default=value"` - Default value

Example:

type OrderSummary struct {
    OrderID    string    `mink:"order_id,pk"`
    CustomerID string    `mink:"customer_id,index"`
    Status     string    `mink:"status"`
    Total      float64   `mink:"total_amount"`
    CreatedAt  time.Time `mink:"created_at"`
}

repo, err := postgres.NewPostgresRepository[OrderSummary](db,
    postgres.WithReadModelSchema("projections"),
    postgres.WithTableName("order_summaries"),
)
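
To make the tag rules above concrete, here is a small sketch of how a `mink` tag could be parsed with the reflect package. It is not the package's own parser: the column type, the function names, and the snake_case conversion rule are assumptions chosen to match the documented tag forms.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"unicode"
)

// column is a hypothetical result type, loosely modeled on ColumnType.
type column struct {
	Name                                string
	PrimaryKey, Index, Unique, Nullable bool
	Default                             string
}

// snakeCase converts "OrderID" to "order_id" and "CreatedAt" to "created_at".
func snakeCase(s string) string {
	runes := []rune(s)
	var b strings.Builder
	for i, r := range runes {
		if unicode.IsUpper(r) && i > 0 {
			prev := runes[i-1]
			nextLower := i+1 < len(runes) && unicode.IsLower(runes[i+1])
			if unicode.IsLower(prev) || unicode.IsDigit(prev) || (unicode.IsUpper(prev) && nextLower) {
				b.WriteByte('_')
			}
		}
		b.WriteRune(unicode.ToLower(r))
	}
	return b.String()
}

// parseField interprets one field's mink tag. ok is false for `mink:"-"`.
func parseField(f reflect.StructField) (col column, ok bool) {
	tag := f.Tag.Get("mink")
	if tag == "-" {
		return column{}, false
	}
	parts := strings.Split(tag, ",")
	col.Name = parts[0]
	if col.Name == "" {
		col.Name = snakeCase(f.Name) // default when no name is given
	}
	for _, opt := range parts[1:] {
		switch {
		case opt == "pk":
			col.PrimaryKey = true
		case opt == "index":
			col.Index = true
		case opt == "unique":
			col.Unique = true
		case opt == "nullable":
			col.Nullable = true
		case strings.HasPrefix(opt, "default="):
			col.Default = strings.TrimPrefix(opt, "default=")
		}
	}
	return col, true
}

// columnsOf lists the columns of a struct value (not a pointer).
func columnsOf(v any) []column {
	t := reflect.TypeOf(v)
	var cols []column
	for i := 0; i < t.NumField(); i++ {
		if c, ok := parseField(t.Field(i)); ok {
			cols = append(cols, c)
		}
	}
	return cols
}

// orderSummary is a sample read model used only by this sketch.
type orderSummary struct {
	OrderID    string `mink:"order_id,pk"`
	CustomerID string `mink:"customer_id,index"`
	Status     string `mink:"status,default=pending"`
	Total      float64
	Internal   string `mink:"-"`
}

func main() {
	for _, c := range columnsOf(orderSummary{}) {
		fmt.Printf("%+v\n", c)
	}
}
```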

func (*PostgresRepository[T]) Clear

func (r *PostgresRepository[T]) Clear(ctx context.Context) error

Clear removes all read models.

func (*PostgresRepository[T]) Count

func (r *PostgresRepository[T]) Count(ctx context.Context, query mink.Query) (int64, error)

Count returns the number of read models matching the query.

func (*PostgresRepository[T]) Delete

func (r *PostgresRepository[T]) Delete(ctx context.Context, id string) error

Delete removes a read model by ID.

func (*PostgresRepository[T]) DeleteMany

func (r *PostgresRepository[T]) DeleteMany(ctx context.Context, query mink.Query) (int64, error)

DeleteMany removes all read models matching the query.

func (*PostgresRepository[T]) DropTable

func (r *PostgresRepository[T]) DropTable(ctx context.Context) error

DropTable removes the read model table (use with caution!).

func (*PostgresRepository[T]) Exists

func (r *PostgresRepository[T]) Exists(ctx context.Context, id string) (bool, error)

Exists checks if a read model with the given ID exists.

func (*PostgresRepository[T]) Find

func (r *PostgresRepository[T]) Find(ctx context.Context, query mink.Query) ([]*T, error)

Find queries read models with the given criteria.

func (*PostgresRepository[T]) FindOne

func (r *PostgresRepository[T]) FindOne(ctx context.Context, query mink.Query) (*T, error)

FindOne returns the first read model matching the query.

func (*PostgresRepository[T]) Get

func (r *PostgresRepository[T]) Get(ctx context.Context, id string) (*T, error)

Get retrieves a read model by ID.

func (*PostgresRepository[T]) GetAll

func (r *PostgresRepository[T]) GetAll(ctx context.Context) ([]*T, error)

GetAll returns all read models in the repository.

func (*PostgresRepository[T]) GetMany

func (r *PostgresRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)

GetMany retrieves multiple read models by their IDs.

func (*PostgresRepository[T]) Insert

func (r *PostgresRepository[T]) Insert(ctx context.Context, model *T) error

Insert creates a new read model.

func (*PostgresRepository[T]) Migrate

func (r *PostgresRepository[T]) Migrate(ctx context.Context) error

Migrate creates or updates the table schema.

func (*PostgresRepository[T]) Schema

func (r *PostgresRepository[T]) Schema() string

Schema returns the PostgreSQL schema name.

func (*PostgresRepository[T]) TableName

func (r *PostgresRepository[T]) TableName() string

TableName returns the fully qualified table name.

func (*PostgresRepository[T]) Update

func (r *PostgresRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error

Update modifies an existing read model.
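
The updateFn parameter suggests a read-modify-write flow: load the model, let the callback mutate it, then persist the result. The generic in-memory sketch below shows that flow under stated assumptions; in particular, returning an error for a missing ID is a guess, as the behavior of Update in that case is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("read model not found")

// memRepo is a hypothetical in-memory stand-in for PostgresRepository[T].
type memRepo[T any] struct {
	rows map[string]T
}

// update loads the model stored under id, lets updateFn mutate a copy,
// and writes the copy back.
func (r *memRepo[T]) update(id string, updateFn func(*T)) error {
	m, ok := r.rows[id]
	if !ok {
		return fmt.Errorf("%w: %s", errNotFound, id)
	}
	updateFn(&m)
	r.rows[id] = m
	return nil
}

// order is a sample read model used only by this sketch.
type order struct {
	Status string
	Total  float64
}

func main() {
	repo := &memRepo[order]{rows: map[string]order{
		"o1": {Status: "pending", Total: 42},
	}}
	err := repo.update("o1", func(o *order) { o.Status = "shipped" })
	fmt.Println(err, repo.rows["o1"].Status)
	fmt.Println(repo.update("missing", func(o *order) {}))
}
```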

func (*PostgresRepository[T]) Upsert

func (r *PostgresRepository[T]) Upsert(ctx context.Context, model *T) error

Upsert creates or updates a read model.

func (*PostgresRepository[T]) WithTx

func (r *PostgresRepository[T]) WithTx(tx *sql.Tx) *TxRepository[T]

WithTx creates a new repository instance that uses the provided transaction.

type ReadModelOption

type ReadModelOption func(*readModelConfig)

ReadModelOption configures the PostgresRepository.

func WithAutoMigrate

func WithAutoMigrate(enabled bool) ReadModelOption

WithAutoMigrate enables automatic table creation and migration. Default is true.

func WithIDField

func WithIDField(field string) ReadModelOption

WithIDField sets the field name used as the primary key. Default is "ID".

func WithReadModelSchema

func WithReadModelSchema(schema string) ReadModelOption

WithReadModelSchema sets the PostgreSQL schema for the read model table.

func WithTableName

func WithTableName(name string) ReadModelOption

WithTableName sets the table name for the read model.

type SagaStore

type SagaStore struct {
	// contains filtered or unexported fields
}

SagaStore provides a PostgreSQL implementation of mink.SagaStore.

func NewSagaStore

func NewSagaStore(db *sql.DB, opts ...SagaStoreOption) *SagaStore

NewSagaStore creates a new PostgreSQL SagaStore.

func NewSagaStoreFromAdapter

func NewSagaStoreFromAdapter(adapter *PostgresAdapter, opts ...SagaStoreOption) *SagaStore

NewSagaStoreFromAdapter creates a new SagaStore using an existing PostgresAdapter's connection.

func (*SagaStore) Cleanup

func (s *SagaStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes expired or old completed sagas.

func (*SagaStore) Close

func (s *SagaStore) Close() error

Close releases any resources (no-op for this implementation as db is shared).

func (*SagaStore) CountByStatus

func (s *SagaStore) CountByStatus(ctx context.Context) (map[mink.SagaStatus]int64, error)

CountByStatus returns the count of sagas by status.

func (*SagaStore) Delete

func (s *SagaStore) Delete(ctx context.Context, sagaID string) error

Delete removes a saga state.

func (*SagaStore) FindByCorrelationID

func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*mink.SagaState, error)

FindByCorrelationID finds a saga by its correlation ID.

func (*SagaStore) FindByType

func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...mink.SagaStatus) ([]*mink.SagaState, error)

FindByType finds all sagas of a given type with the specified statuses.

func (*SagaStore) Initialize

func (s *SagaStore) Initialize(ctx context.Context) error

Initialize creates the saga table if it doesn't exist.

func (*SagaStore) Load

func (s *SagaStore) Load(ctx context.Context, sagaID string) (*mink.SagaState, error)

Load retrieves a saga state by ID.

func (*SagaStore) Save

func (s *SagaStore) Save(ctx context.Context, state *mink.SagaState) error

Save persists a saga state with optimistic concurrency control.

Version semantics:

  • Version 0: Creates a new saga. Uses INSERT with version=1.
  • Version > 0: Updates an existing saga. Uses UPDATE with version check. If the version doesn't match, returns ErrConcurrencyConflict.

The version is incremented atomically by the database (version = version + 1) and returned via RETURNING clause. After a successful save, state.Version is updated with the new version from the database.
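
The version semantics above can be modeled in memory as follows. This is a sketch, not the store's implementation: the types are stand-ins for mink.SagaState and SagaStore, the SQL appears only in comments, and treating a duplicate insert as a concurrency conflict is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

var errConcurrencyConflict = errors.New("concurrency conflict")

// sagaState is a trimmed-down stand-in for mink.SagaState.
type sagaState struct {
	ID      string
	Status  string
	Version int64
}

// memSagaStore is an in-memory stand-in for the saga table.
type memSagaStore struct {
	rows map[string]sagaState
}

// save follows the documented rules: Version 0 inserts with version 1,
// Version > 0 updates only when the stored version matches.
func (s *memSagaStore) save(state *sagaState) error {
	if state.Version == 0 {
		// Roughly: INSERT ... VALUES (..., 1)
		if _, exists := s.rows[state.ID]; exists {
			return errConcurrencyConflict // assumed behavior for a duplicate ID
		}
		state.Version = 1
		s.rows[state.ID] = *state
		return nil
	}
	// Roughly: UPDATE ... SET version = version + 1
	//          WHERE id = $1 AND version = $2 RETURNING version
	cur, ok := s.rows[state.ID]
	if !ok || cur.Version != state.Version {
		return errConcurrencyConflict
	}
	state.Version = cur.Version + 1 // mirror the value returned by the database
	s.rows[state.ID] = *state
	return nil
}

func main() {
	store := &memSagaStore{rows: map[string]sagaState{}}

	st := &sagaState{ID: "saga-1", Status: "running"}
	fmt.Println(store.save(st), st.Version) // first save creates the row

	st.Status = "completed"
	fmt.Println(store.save(st), st.Version) // second save bumps the version

	stale := &sagaState{ID: "saga-1", Status: "failed", Version: 1}
	fmt.Println(store.save(stale)) // stale writer is rejected
}
```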

type SagaStoreOption

type SagaStoreOption func(*SagaStore)

SagaStoreOption configures a SagaStore.

func WithSagaSchema

func WithSagaSchema(schema string) SagaStoreOption

WithSagaSchema sets the PostgreSQL schema for the saga table.

func WithSagaTable

func WithSagaTable(table string) SagaStoreOption

WithSagaTable sets the table name for saga records.

type StreamNotFoundError

type StreamNotFoundError = adapters.StreamNotFoundError

StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.

type TableSchema

type TableSchema struct {
	TableName string
	Schema    string
	Columns   []ColumnType
	Indexes   []IndexDef
}

TableSchema represents the schema of a read model table.

type TxRepository

type TxRepository[T any] struct {
	// contains filtered or unexported fields
}

TxRepository wraps PostgresRepository for transaction support. It implements all ReadModelRepository methods within a transaction context.

func (*TxRepository[T]) Clear

func (tr *TxRepository[T]) Clear(ctx context.Context) error

Clear removes all read models within a transaction.

func (*TxRepository[T]) Count

func (tr *TxRepository[T]) Count(ctx context.Context, query mink.Query) (int64, error)

Count returns the number of read models matching the query within a transaction.

func (*TxRepository[T]) Delete

func (tr *TxRepository[T]) Delete(ctx context.Context, id string) error

Delete removes a read model within a transaction.

func (*TxRepository[T]) DeleteMany

func (tr *TxRepository[T]) DeleteMany(ctx context.Context, query mink.Query) (int64, error)

DeleteMany removes all read models matching the query within a transaction.

func (*TxRepository[T]) Exists

func (tr *TxRepository[T]) Exists(ctx context.Context, id string) (bool, error)

Exists checks if a read model with the given ID exists within a transaction.

func (*TxRepository[T]) Find

func (tr *TxRepository[T]) Find(ctx context.Context, query mink.Query) ([]*T, error)

Find queries read models with the given criteria within a transaction.

func (*TxRepository[T]) FindOne

func (tr *TxRepository[T]) FindOne(ctx context.Context, query mink.Query) (*T, error)

FindOne returns the first read model matching the query within a transaction.

func (*TxRepository[T]) Get

func (tr *TxRepository[T]) Get(ctx context.Context, id string) (*T, error)

Get retrieves a read model by ID within a transaction.

func (*TxRepository[T]) GetAll

func (tr *TxRepository[T]) GetAll(ctx context.Context) ([]*T, error)

GetAll returns all read models in the repository within a transaction.

func (*TxRepository[T]) GetMany

func (tr *TxRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)

GetMany retrieves multiple read models by their IDs within a transaction.

func (*TxRepository[T]) Insert

func (tr *TxRepository[T]) Insert(ctx context.Context, model *T) error

Insert creates a new read model within a transaction.

func (*TxRepository[T]) Update

func (tr *TxRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error

Update modifies an existing read model within a transaction.

func (*TxRepository[T]) Upsert

func (tr *TxRepository[T]) Upsert(ctx context.Context, model *T) error

Upsert creates or updates a read model within a transaction.
