Documentation
¶
Overview ¶
Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql. Use it with any PostgreSQL-compatible driver (lib/pq, jackc/pgx/v5/stdlib, etc.).
Index ¶
- Constants
- func CreateProcessedTableSQL(tableName string) string
- func CreateTableSQL(tableName string) string
- func Migrate(ctx context.Context, db osql.Execer, tableName string) error
- func MigrateProcessed(ctx context.Context, db osql.Execer, tableName string) error
- type ProcessedStore
- func (s *ProcessedStore) IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)
- func (s *ProcessedStore) MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)
- func (s *ProcessedStore) PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
- type Store
- func (s *Store) DeleteProcessedBefore(ctx context.Context, t time.Time) (int64, error)
- func (s *Store) FetchPending(ctx context.Context, limit int) ([]outbox.Message, error)
- func (s *Store) MarkFailed(ctx context.Context, id string, err error) error
- func (s *Store) MarkProcessed(ctx context.Context, ids ...string) error
- func (s *Store) Save(ctx context.Context, tx any, msgs ...outbox.Message) error
Constants ¶
const DefaultProcessedTable = "processed_messages"
DefaultProcessedTable is the default table name used by ProcessedStore when none is provided.
Variables ¶
This section is empty.
Functions ¶
func CreateProcessedTableSQL ¶ added in v0.11.1
CreateProcessedTableSQL returns the PostgreSQL DDL for creating the processed_messages dedup table used by outbox.ProcessedStore.
metadata is JSONB so callers can attach handler-specific context (printed printer names, config snapshots, audit trails) without further migrations.
func CreateTableSQL ¶
CreateTableSQL returns the PostgreSQL DDL for creating the outbox messages table.
func Migrate ¶
Migrate creates the outbox messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to "outbox_messages" when empty (matches sql.DefaultConfig and MigrateProcessed's symmetric default).
Boot-fast path: when db also satisfies osql.DBTX (it does for *sql.DB / *sql.Tx / *sql.Conn), a single to_regclass($1) probe checks whether the canonical table already exists. The DDL itself is idempotent but CREATE ... IF NOT EXISTS still does catalog work + pg_class locking on every call, which adds up across the libraries that run on every process start. to_regclass is a plain catalog read with no locking.
Schema-monotonicity assumption: if the table exists we skip every statement, so new indexes/columns added in a later release will not be applied to existing installs by this function. When the schema evolves, ship the change through a separately named migration call (e.g. MigrateV2) and invoke both at startup.
func MigrateProcessed ¶ added in v0.11.1
MigrateProcessed creates the processed_messages table if it does not exist. db can be a *sql.DB, *sql.Tx, *sql.Conn or any other type implementing osql.Execer. tableName defaults to DefaultProcessedTable when empty. Shares the to_regclass fast-path documented on Migrate; the same monotonic-schema caveat applies.
Types ¶
type ProcessedStore ¶ added in v0.11.1
type ProcessedStore struct {
// contains filtered or unexported fields
}
ProcessedStore implements outbox.ProcessedStore for PostgreSQL.
Unlike Store, ProcessedStore does not hold a *sql.DB — every method takes the db/tx handle so callers compose with *sql.Tx for transactional exactly-once or pass *sql.DB for the cleanup loop.
func NewProcessed ¶ added in v0.11.1
func NewProcessed(tableName string) *ProcessedStore
NewProcessed creates a ProcessedStore for PostgreSQL. tableName defaults to DefaultProcessedTable when empty.
Pass an unqualified name; set the schema via the connection's search_path. See sql.WithTableName for details.
func (*ProcessedStore) IsProcessed ¶ added in v0.11.1
func (s *ProcessedStore) IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)
IsProcessed implements outbox.ProcessedStore. db must satisfy osql.DBTX (e.g. *sql.DB or *sql.Tx).
func (*ProcessedStore) MarkProcessed ¶ added in v0.11.1
func (s *ProcessedStore) MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (bool, error)
MarkProcessed implements outbox.ProcessedStore. Non-empty metadata must be valid JSON; invalid input is rejected before the INSERT runs.
func (*ProcessedStore) PurgeProcessedBefore ¶ added in v0.11.1
func (s *ProcessedStore) PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
PurgeProcessedBefore implements outbox.ProcessedStore.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements outbox.Store for PostgreSQL.
func (*Store) DeleteProcessedBefore ¶ added in v0.11.1
DeleteProcessedBefore removes processed messages older than t.
func (*Store) FetchPending ¶
FetchPending atomically claims up to limit pending messages for processing using FOR UPDATE SKIP LOCKED so concurrent workers don't collide.
func (*Store) MarkFailed ¶
MarkFailed increments retries and records the error. Sets status to failed when max retries reached, else back to pending.
func (*Store) MarkProcessed ¶
MarkProcessed marks messages as successfully processed.