postgres

package
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql. Use it with any PostgreSQL-compatible driver (lib/pq, jackc/pgx/v5/stdlib, etc.).

Index

Constants

View Source
const DefaultProcessedTable = "processed_messages"

DefaultProcessedTable is the default table name used by ProcessedStore when none is provided.

Variables

This section is empty.

Functions

func CreateProcessedTableSQL added in v0.11.1

func CreateProcessedTableSQL(tableName string) string

CreateProcessedTableSQL returns the PostgreSQL DDL for creating the processed_messages dedup table used by outbox.ProcessedStore.

metadata is JSONB so callers can attach handler-specific context (printed printer names, config snapshots, audit trails) without further migrations.

func CreateTableSQL

func CreateTableSQL(tableName string) string

CreateTableSQL returns the PostgreSQL DDL for creating the outbox messages table.

func Migrate

func Migrate(ctx context.Context, db osql.Execer, tableName string) error

Migrate creates the outbox messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to "outbox_messages" when empty (matches sql.DefaultConfig and MigrateProcessed's symmetric default).

Boot-fast path: when db also satisfies osql.DBTX (it does for *sql.DB / *sql.Tx / *sql.Conn), a single to_regclass($1) probe checks whether the canonical table already exists. The DDL itself is idempotent but CREATE ... IF NOT EXISTS still does catalog work + pg_class locking on every call, which adds up across the libraries that run on every process start. to_regclass is a plain catalog read with no locking.

Schema-monotonicity assumption: if the table exists we skip every statement, so new indexes/columns added in a later release will not be applied to existing installs by this function. When the schema evolves, ship the change through a separately named migration call (e.g. MigrateV2) and invoke both at startup.

func MigrateProcessed added in v0.11.1

func MigrateProcessed(ctx context.Context, db osql.Execer, tableName string) error

MigrateProcessed creates the processed_messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to DefaultProcessedTable when empty. Shares the to_regclass fast-path documented on Migrate; the same monotonic-schema caveat applies.

Types

type ProcessedStore added in v0.11.1

type ProcessedStore struct {
	// contains filtered or unexported fields
}

ProcessedStore implements outbox.ProcessedStore for PostgreSQL.

Unlike Store, ProcessedStore does not hold a *sql.DB — every method takes the db/tx handle so callers compose with *sql.Tx for transactional exactly-once or pass *sql.DB for the cleanup loop.

func NewProcessed added in v0.11.1

func NewProcessed(tableName string) *ProcessedStore

NewProcessed creates a ProcessedStore for PostgreSQL. tableName defaults to DefaultProcessedTable when empty.

Pass an unqualified name; set the schema via the connection's search_path. See sql.WithTableName for details.

func (*ProcessedStore) IsProcessed added in v0.11.1

func (s *ProcessedStore) IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)

IsProcessed implements outbox.ProcessedStore. db must satisfy osql.DBTX (e.g. *sql.DB or *sql.Tx).

func (*ProcessedStore) MarkProcessed added in v0.11.1

func (s *ProcessedStore) MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)

MarkProcessed implements outbox.ProcessedStore. Non-empty metadata must be valid JSON; invalid input is rejected before the INSERT runs.

func (*ProcessedStore) PurgeProcessedBefore added in v0.11.1

func (s *ProcessedStore) PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)

PurgeProcessedBefore implements outbox.ProcessedStore.

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store implements outbox.Store for PostgreSQL.

func New

func New(db *stdsql.DB, options ...osql.Option) *Store

New creates a new PostgreSQL outbox store backed by *sql.DB.

func (*Store) DeleteProcessedBefore added in v0.11.1

func (s *Store) DeleteProcessedBefore(ctx context.Context, t time.Time) (int64, error)

DeleteProcessedBefore removes processed messages older than t.

func (*Store) FetchPending

func (s *Store) FetchPending(ctx context.Context, limit int) ([]outbox.Message, error)

FetchPending atomically claims up to limit pending messages for processing using FOR UPDATE SKIP LOCKED so concurrent workers don't collide.

func (*Store) MarkFailed

func (s *Store) MarkFailed(ctx context.Context, id string, err error) error

MarkFailed increments retries and records the error. Sets status to failed when max retries reached, else back to pending.

func (*Store) MarkProcessed

func (s *Store) MarkProcessed(ctx context.Context, ids ...string) error

MarkProcessed marks messages as successfully processed.

func (*Store) Save

func (s *Store) Save(ctx context.Context, tx any, msgs ...outbox.Message) error

Save persists messages within the given transaction. tx may be a *sql.Tx, or nil to have the Store create its own transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL