postgres

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package postgres is the Postgres/GORM implementation of outbox.Repository. Producer services wire this; the dispatcher package consumes the interface, not the concrete type.

Schema: each producer service ships a migration creating its local outbox table (one per service so write contention stays bounded). The canonical shape is documented in outbox/postgres/schema.sql — copy it verbatim and adjust schema/table names. The table name is passed to New() so a service can scope its outbox to its own schema (e.g. workspace.outbox, accounting.outbox).

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository is the gorm-backed implementation. Construct one per outbox table.

func New

func New(db *gormdb.DBClient, table string) *Repository

New returns a Repository scoped to a fully-qualified table name like "workspace.outbox" or "accounting.outbox". The table is expected to match the shape in schema.sql.

func (*Repository) Claim

func (r *Repository) Claim(ctx context.Context, batch int) ([]outbox.Message, error)

Claim grabs up to `batch` pending rows whose retry_at has elapsed. FOR UPDATE SKIP LOCKED ensures concurrent drainer replicas see disjoint batches. The claim is one transaction (SELECT + UPDATE); the caller then processes each message and calls MarkDone / MarkFailed separately. The attempts++ inside the claim doubles as a "claimed" marker — a crashed drainer's rows sit until retry_at elapses, then naturally back off via the Dispatcher's backoff math.

func (*Repository) Enqueue

func (r *Repository) Enqueue(ctx context.Context, drafts ...outbox.MessageDraft) error

Enqueue inserts the drafts using whatever *gorm.DB sits on the ctx — kit's Transactioner stores the txn handle there. Inside a tx, the outbox INSERT commits atomically with the producer's writes.

func (*Repository) MarkDone

func (r *Repository) MarkDone(ctx context.Context, id string) error

MarkDone deletes the row. Keeping done rows around inflates the table and slows Claim's WHERE; for audit-style traceability use the audit service, not the outbox itself.

func (*Repository) MarkFailed

func (r *Repository) MarkFailed(ctx context.Context, id string, handlerErr error, retryAt time.Time) error

MarkFailed updates retry_at + last_error. The Dispatcher computes retryAt (exponential backoff) and decides when to dead-letter; this repo just persists what it's told.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL