Documentation
Overview ¶
Package postgres is the Postgres/GORM implementation of outbox.Repository. Producer services wire this; the dispatcher package consumes the interface, not the concrete type.
Schema: each producer service ships a migration that creates its own local outbox table (one per service, so write contention stays bounded). The canonical shape is documented in outbox/postgres/schema.sql; copy it and change only the schema and table names. The table name is passed to New() so that a service can scope its outbox to its own schema (e.g. workspace.outbox, accounting.outbox).
Index ¶
- type Repository
- func New(db *gormdb.DBClient, table string) *Repository
- func (r *Repository) Claim(ctx context.Context, batch int) ([]outbox.Message, error)
- func (r *Repository) Enqueue(ctx context.Context, drafts ...outbox.MessageDraft) error
- func (r *Repository) MarkDone(ctx context.Context, id string) error
- func (r *Repository) MarkFailed(ctx context.Context, id string, handlerErr error, retryAt time.Time) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository is the GORM-backed implementation of outbox.Repository. Construct one per outbox table.
func New ¶
func New(db *gormdb.DBClient, table string) *Repository
New returns a Repository scoped to a fully-qualified table name like "workspace.outbox" or "accounting.outbox". The table is expected to match the shape in schema.sql.
func (*Repository) Claim ¶
func (r *Repository) Claim(ctx context.Context, batch int) ([]outbox.Message, error)
Claim grabs up to `batch` pending rows whose retry_at has elapsed. FOR UPDATE SKIP LOCKED ensures that concurrent drainer replicas see disjoint batches. The claim runs as a single transaction (SELECT + UPDATE); the caller then processes each message and calls MarkDone or MarkFailed separately. The attempts++ inside the claim doubles as a "claimed" marker: rows claimed by a drainer that then crashes sit until retry_at elapses, then back off naturally via the Dispatcher's backoff math.
func (*Repository) Enqueue ¶
func (r *Repository) Enqueue(ctx context.Context, drafts ...outbox.MessageDraft) error
Enqueue inserts the drafts using whatever *gorm.DB sits on the ctx; kit's Transactioner stores the transaction handle there. Inside a transaction, the outbox INSERT commits atomically with the producer's writes.
func (*Repository) MarkDone ¶
func (r *Repository) MarkDone(ctx context.Context, id string) error
MarkDone deletes the row. Keeping done rows around inflates the table and slows Claim's WHERE; for audit-style traceability use the audit service, not the outbox itself.
func (*Repository) MarkFailed ¶
func (r *Repository) MarkFailed(ctx context.Context, id string, handlerErr error, retryAt time.Time) error
MarkFailed updates retry_at and last_error. The Dispatcher computes retryAt (exponential backoff) and decides when to dead-letter; this repository just persists what it is told.