outbox

package
v0.3.0
Published: Jun 22, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package outbox implements the transactional outbox pattern for reliably publishing domain events to a message broker. A domain write and the events it emits are inserted in ONE database transaction (WithinTran); a background Relay then drains pending events to the broker with at-least-once delivery, so an event is never lost if the broker is briefly down and never published without its domain write committing.

The pieces map onto the worker package:

  • Relay = worker.Processor[Event]: LeasePending (Source) -> Publish (Handler) -> MarkSent/MarkFailed (Sink), run as a worker.Loop.
  • Sweeper = worker.Loop reclaiming leases abandoned by crashed relays.
  • Cleaner = worker.Loop deleting terminal rows past a retention window.

Event lifecycle (FSM):

 Insert
   │
   ▼
pending ──Lease──► in_flight ──MarkSent──► sent     (terminal)
   ▲                  │
   │ MarkFailed/Sweep │ MarkFailed (attempts >= max)
   └──────────────────┴──────────► failed           (terminal)
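
The diagram can be read as a small transition function. The following is a minimal sketch, not the package's code: the Op type and the next function are hypothetical names introduced for illustration, and the retry check follows the Store documentation (attempts+1 compared against max).

```go
package main

import "fmt"

// Status values mirror the documented constants.
const (
	StatusPending  = "pending"
	StatusInFlight = "in_flight"
	StatusSent     = "sent"
	StatusFailed   = "failed"
)

// Op is a hypothetical label for an edge in the lifecycle diagram.
type Op string

const (
	OpLease      Op = "Lease"
	OpMarkSent   Op = "MarkSent"
	OpMarkFailed Op = "MarkFailed"
	OpSweep      Op = "Sweep"
)

// next returns the status reached by applying op. attempts is the count
// before the op; MarkFailed counts the failed try, so it compares
// attempts+1 against maxAttempts. Sweep does not consume an attempt.
func next(status string, op Op, attempts, maxAttempts int) (string, error) {
	switch {
	case status == StatusPending && op == OpLease:
		return StatusInFlight, nil
	case status == StatusInFlight && op == OpMarkSent:
		return StatusSent, nil
	case status == StatusInFlight && op == OpSweep:
		return StatusPending, nil
	case status == StatusInFlight && op == OpMarkFailed:
		if attempts+1 >= maxAttempts {
			return StatusFailed, nil
		}
		return StatusPending, nil
	}
	// sent and failed are terminal: no op leaves them.
	return status, fmt.Errorf("illegal transition: %s on %s", op, status)
}

func main() {
	s, _ := next(StatusPending, OpLease, 0, 10)
	fmt.Println(s) // in_flight
	s, _ = next(StatusInFlight, OpMarkFailed, 9, 10)
	fmt.Println(s) // failed
	_, err := next(StatusSent, OpLease, 0, 10)
	fmt.Println(err != nil) // true
}
```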

All timestamps are computed Go-side (time.Now().UTC()) and bound as query parameters — no NOW()/CURRENT_TIMESTAMP in SQL — so the clock is injectable.

Constants

const (
	StatusPending  = "pending"   // awaits the relay
	StatusInFlight = "in_flight" // leased by a relay, publish in progress
	StatusSent     = "sent"      // terminal: published successfully
	StatusFailed   = "failed"    // terminal: retry budget exhausted
)

Event statuses stored in the status column.

const DefaultMaxAttempts = 10

DefaultMaxAttempts is the per-row retry budget assigned by NewEvent.

Variables

var ErrLeaseLost = errors.New("outbox: lease lost")

ErrLeaseLost is returned by MarkSent/MarkFailed when the row's lease no longer matches the caller's lease id — the sweeper or another relay reclaimed it. The mark is a no-op; the row will be reprocessed under its new lease.

Functions

func NewCleaner

func NewCleaner(log *logger.Logger, store Store, cfg CleanerConfig) *worker.Loop

NewCleaner builds the retention worker as a worker.Loop. It deletes terminal rows past the retention window to keep the table small.

func NewRelay

func NewRelay(log *logger.Logger, store Store, pub broker.Publisher, cfg RelayConfig) *worker.Loop

NewRelay builds the relay as a worker.Loop. It wires the outbox FSM onto a worker.Processor: LeasePending is the Source, the broker.Publisher is the Handler, and MarkSent/MarkFailed are the Sink. For throughput, run several relays, either in one process via worker.Group or across replicas; LeasePending claims rows with FOR UPDATE SKIP LOCKED, so concurrent relays split the pending rows instead of blocking on each other.

Delivery is at-least-once: if the process dies after a successful publish but before MarkSent, the sweeper returns the row to pending and the relay republishes it. Consumers must therefore deduplicate on the CloudEvents id.
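
On the consumer side, deduplication by event id can be as simple as remembering which ids were already handled. The following is a minimal in-memory sketch; Deduper is a hypothetical type, and a real consumer would most likely persist seen ids (for example in the same transaction as its own side effects) so that they survive restarts.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers event ids that were already handled.
// In-memory only: this is an illustration, not a durable solution.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime records id and reports whether it had not been seen before.
func (d *Deduper) FirstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	handled := 0
	// "evt-1" arrives twice, as it would after a republish.
	for _, id := range []string{"evt-1", "evt-2", "evt-1"} {
		if d.FirstTime(id) {
			handled++
		}
	}
	fmt.Println(handled) // 2
}
```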

func NewSweeper

func NewSweeper(log *logger.Logger, store Store, cfg SweeperConfig) *worker.Loop

NewSweeper builds the lease-reclaim worker as a worker.Loop. It returns in_flight rows abandoned by a crashed relay back to pending.

func Schema

func Schema(table string) string

Schema returns the DDL creating the outbox table and its pending-scan index.

func WithinTran

func WithinTran(ctx context.Context, log *logger.Logger, db *sqlx.DB, store Store, fn TxFunc) error

WithinTran runs fn inside one SQL transaction and, when fn returns a non-empty event slice, inserts those events in the SAME transaction via store.WithTx. Domain writes and the outbox insert therefore commit atomically — the core guarantee of the transactional outbox: an event is persisted if and only if the domain change that produced it commits.

This is a thin wrapper over sqldb.WithinTran; callers never thread tx through their event-building code.

Types

type CleanerConfig

type CleanerConfig struct {
	Name string // default "outbox-cleaner"
	// Interval between cleanups (default 1h).
	Interval time.Duration
	// Retention: sent/failed rows older than this are deleted (default 24h).
	Retention time.Duration
}

CleanerConfig configures the Cleaner worker.

type Event

type Event struct {
	ID            uuid.UUID
	Type          string
	ContentType   string
	Exchange      string
	RoutingKey    string
	Payload       []byte
	Headers       []byte // JSON object of transport headers ("{}" when none)
	Status        string
	Attempts      int
	MaxAttempts   int
	LastError     string
	NextAttemptAt time.Time
	CreatedAt     time.Time
	SentAt        time.Time
	LeasedAt      time.Time
	LeaseID       uuid.UUID
}

Event is an outbox record as seen by callers — a pure core type with no db tags or Null wrappers (the PG store owns that mapping). Payload is the business data only; the broker.Publisher wraps it in a CloudEvents envelope.

func NewEvent

func NewEvent(eventType, exchange, routingKey, contentType string, payload []byte, headers map[string]any) (Event, error)

NewEvent builds a pending Event ready for Insert, stamping a fresh id, UTC timestamps, attempts=0, and max_attempts=DefaultMaxAttempts. headers may be nil (normalized to "{}"). payload must already be serialized by the caller.
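
The nil-to-"{}" normalization matters because encoding/json marshals a nil map as null, not as an empty object. A minimal sketch of such a normalization follows; normalizeHeaders is a hypothetical helper, and treating an empty non-nil map the same way is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalizeHeaders encodes headers as a JSON object. A nil or empty map
// becomes "{}" so the stored value is always a JSON object, never null.
func normalizeHeaders(h map[string]any) ([]byte, error) {
	if len(h) == 0 {
		return []byte("{}"), nil
	}
	return json.Marshal(h)
}

func main() {
	b, _ := normalizeHeaders(nil)
	fmt.Println(string(b)) // {}

	b, _ = normalizeHeaders(map[string]any{"tenant": "acme"})
	fmt.Println(string(b)) // {"tenant":"acme"}
}
```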

type Options

type Options struct {
	// Table is the backing table name (default "outbox_events").
	Table string
	// Backoff schedules retries (next_attempt_at) for failed-but-retryable
	// events. Defaults to base 2s, factor 2, max 5m.
	Backoff worker.Backoff
}

Options configures a PG store.

type PG

type PG struct {
	// contains filtered or unexported fields
}

PG is the Postgres-backed outbox Store. It is safe for concurrent use by multiple relay/sweeper replicas; LeasePending and SweepExpiredLeases claim rows with FOR UPDATE SKIP LOCKED.

func NewPG

func NewPG(log *logger.Logger, db *sqlx.DB, opts Options) *PG

NewPG builds a Postgres outbox store. The table is created by a migration (or EnsureSchema in tests), not here.

func (*PG) Cleanup

func (s *PG) Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)

Cleanup deletes sent/failed rows created before now-retention and returns the number of rows deleted.

func (*PG) EnsureSchema

func (s *PG) EnsureSchema(ctx context.Context) error

EnsureSchema creates the outbox table and indexes if absent.

func (*PG) Insert

func (s *PG) Insert(ctx context.Context, events ...Event) error

Insert writes pending events.

func (*PG) LeasePending

func (s *PG) LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)

LeasePending claims up to limit pending events whose next_attempt_at <= now, marks them in_flight under a fresh lease id, and returns them.

func (*PG) MarkFailed

func (s *PG) MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error

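MarkFailed transitions in_flight -> pending (with next_attempt_at advanced by backoff) while retries remain, or -> failed once the retry budget is exhausted. It returns ErrLeaseLost on a lease mismatch.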

func (*PG) MarkSent

func (s *PG) MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error

MarkSent transitions in_flight -> sent. It returns ErrLeaseLost on a lease mismatch.

func (*PG) SweepExpiredLeases

func (s *PG) SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)

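SweepExpiredLeases returns in_flight rows whose lease is older than leaseTimeout back to pending, without incrementing attempts, and returns the number reclaimed.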

func (*PG) WithTx

func (s *PG) WithTx(tx sqlx.ExtContext) Store

WithTx returns a sibling store whose queries run on tx.

type RelayConfig

type RelayConfig struct {
	// Name labels the worker (default "outbox-relay").
	Name string
	// PollInterval is the lease tick rate (default 2s). The relay also runs an
	// immediate first tick so a fresh process drains a backlog without waiting.
	PollInterval time.Duration
	// BatchSize is the max events leased per tick (default 100).
	BatchSize int
	// PublishTimeout bounds each publish + its follow-up mark (default 5s).
	PublishTimeout time.Duration
}

RelayConfig configures the Relay worker.

type Store

type Store interface {
	// WithTx returns a sibling Store whose queries run on tx. Used by WithinTran
	// to insert events in the caller's domain transaction.
	WithTx(tx sqlx.ExtContext) Store

	// Insert writes pending events. Bound to a transaction when the Store was
	// derived via WithTx.
	Insert(ctx context.Context, events ...Event) error

	// LeasePending atomically claims up to limit pending events whose
	// next_attempt_at <= now, marks them in_flight with a fresh lease id, and
	// returns them (FOR UPDATE SKIP LOCKED, so relay replicas split work).
	LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)

	// MarkSent transitions in_flight -> sent. Guarded by leaseID; returns
	// ErrLeaseLost on a lease mismatch.
	MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error

	// MarkFailed transitions in_flight -> pending (attempts+1 < max, with
	// next_attempt_at advanced by backoff) or -> failed (attempts+1 >= max).
	// Guarded by leaseID; returns ErrLeaseLost on a lease mismatch.
	MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error

	// SweepExpiredLeases returns in_flight rows whose lease is older than
	// leaseTimeout back to pending (a crashed-relay signal — attempts is NOT
	// incremented). Returns the number reclaimed.
	SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)

	// Cleanup deletes sent/failed rows created before now-retention. Returns the
	// number of rows deleted.
	Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
}

Store is the persistence port for the outbox table. The Postgres implementation is *PG.

type SweeperConfig

type SweeperConfig struct {
	Name string // default "outbox-sweeper"
	// Interval between sweeps (default 30s).
	Interval time.Duration
	// LeaseTimeout: in_flight rows leased longer than this are reclaimed
	// (default 1m). Set comfortably above RelayConfig.PublishTimeout.
	LeaseTimeout time.Duration
	// BatchSize bounds rows reclaimed per sweep (default 100).
	BatchSize int
}

SweeperConfig configures the Sweeper worker.

type TxFunc

type TxFunc func(tx *sqlx.Tx) ([]Event, error)

TxFunc is the caller's transactional closure. It receives the transaction handle for domain writes and returns the events to emit. Returning an error rolls back everything (domain writes and outbox insert together).
