Documentation
Overview ¶
Package outbox implements the transactional outbox pattern for reliably publishing domain events to a message broker. A domain write and the events it emits are inserted in ONE database transaction (WithinTran); a background Relay then drains pending events to the broker with at-least-once delivery, so an event is never lost if the broker is briefly down and never published without its domain write committing.
The pieces map onto the worker package:
- Relay = worker.Processor[Event]: LeasePending (Source) -> Publish (Handler) -> MarkSent/MarkFailed (Sink), run as a worker.Loop.
- Sweeper = worker.Loop reclaiming leases abandoned by crashed relays.
- Cleaner = worker.Loop deleting terminal rows past a retention window.
Event lifecycle (FSM):
    Insert
      │
      ▼
   pending ──Lease──► in_flight ──MarkSent──► sent (terminal)
      ▲                   │
      │ MarkFailed/Sweep  │ MarkFailed (attempts >= max)
      └───────────────────┴──────────► failed (terminal)
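The lifecycle can be read as a small transition function. The following is a minimal sketch, not the package's implementation: the op type, its constants, and next are hypothetical names introduced for illustration, and the attempts+1 >= max rule is taken from the Store.MarkFailed comment.

```go
package main

import "fmt"

// Status values mirror the documented constants.
const (
	StatusPending  = "pending"
	StatusInFlight = "in_flight"
	StatusSent     = "sent"
	StatusFailed   = "failed"
)

// op is a hypothetical label for an edge in the lifecycle diagram.
type op string

const (
	opLease      op = "Lease"
	opMarkSent   op = "MarkSent"
	opMarkFailed op = "MarkFailed"
	opSweep      op = "Sweep"
)

// next returns the status after applying o to a row in the given status.
// attempts is the row's attempt count before this call.
func next(status string, o op, attempts, maxAttempts int) (string, error) {
	switch {
	case status == StatusPending && o == opLease:
		return StatusInFlight, nil
	case status == StatusInFlight && o == opMarkSent:
		return StatusSent, nil
	case status == StatusInFlight && o == opMarkFailed:
		if attempts+1 >= maxAttempts {
			return StatusFailed, nil // retry budget exhausted
		}
		return StatusPending, nil // retried later
	case status == StatusInFlight && o == opSweep:
		return StatusPending, nil // lease abandoned by a crashed relay
	}
	return status, fmt.Errorf("illegal transition: %s on %s", o, status)
}

func main() {
	s, _ := next(StatusPending, opLease, 0, 10)
	fmt.Println(s) // in_flight
	s, _ = next(s, opMarkFailed, 9, 10)
	fmt.Println(s) // failed
}
```

Terminal states (sent, failed) accept no further operations in this sketch, which matches the diagram.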
All timestamps are computed Go-side (time.Now().UTC()) and bound as query parameters — no NOW()/CURRENT_TIMESTAMP in SQL — so the clock is injectable.
Index ¶
- Constants
- Variables
- func NewCleaner(log *logger.Logger, store Store, cfg CleanerConfig) *worker.Loop
- func NewRelay(log *logger.Logger, store Store, pub broker.Publisher, cfg RelayConfig) *worker.Loop
- func NewSweeper(log *logger.Logger, store Store, cfg SweeperConfig) *worker.Loop
- func Schema(table string) string
- func WithinTran(ctx context.Context, log *logger.Logger, db *sqlx.DB, store Store, fn TxFunc) error
- type CleanerConfig
- type Event
- type Options
- type PG
- func (s *PG) Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
- func (s *PG) EnsureSchema(ctx context.Context) error
- func (s *PG) Insert(ctx context.Context, events ...Event) error
- func (s *PG) LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)
- func (s *PG) MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error
- func (s *PG) MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error
- func (s *PG) SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)
- func (s *PG) WithTx(tx sqlx.ExtContext) Store
- type RelayConfig
- type Store
- type SweeperConfig
- type TxFunc
Constants ¶
const (
    StatusPending  = "pending"   // awaits the relay
    StatusInFlight = "in_flight" // leased by a relay, publish in progress
    StatusSent     = "sent"      // terminal: published successfully
    StatusFailed   = "failed"    // terminal: retry budget exhausted
)
Event statuses stored in the status column.
const DefaultMaxAttempts = 10
DefaultMaxAttempts is the per-row retry budget assigned by NewEvent.
Variables ¶
var ErrLeaseLost = errors.New("outbox: lease lost")
ErrLeaseLost is returned by MarkSent/MarkFailed when the row's lease no longer matches the caller's lease id — the sweeper or another relay reclaimed it. The mark is a no-op; the row will be reprocessed under its new lease.
Functions ¶
func NewCleaner ¶
func NewCleaner(log *logger.Logger, store Store, cfg CleanerConfig) *worker.Loop
NewCleaner builds the retention worker as a worker.Loop. It deletes terminal rows past the retention window to keep the table small.
func NewRelay ¶
func NewRelay(log *logger.Logger, store Store, pub broker.Publisher, cfg RelayConfig) *worker.Loop
NewRelay builds the relay as a worker.Loop. It wires the outbox FSM onto a worker.Processor: LeasePending is the Source, the broker.Publisher is the Handler, and MarkSent/MarkFailed are the Sink. Run several relays (here via worker.Group, or across replicas) for throughput; FOR UPDATE SKIP LOCKED splits the work between them.
Delivery is at-least-once: if the process dies after a successful publish but before MarkSent, the sweeper returns the row to pending and the relay republishes it. Consumers dedupe on the CloudEvents id.
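On the consumer side, deduplication by event id can be sketched as below. This is an illustration only: the deduper type is hypothetical, and the in-memory map is an assumption made for brevity (a real consumer would more likely persist seen ids alongside its own writes).

```go
package main

import (
	"fmt"
	"sync"
)

// deduper remembers event ids it has already handled.
// The in-memory map is an assumption for this sketch.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper {
	return &deduper{seen: make(map[string]struct{})}
}

// handle runs fn only the first time id is seen and reports whether it ran.
func (d *deduper) handle(id string, fn func()) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false // redelivery: skip
	}
	d.seen[id] = struct{}{}
	fn()
	return true
}

func main() {
	d := newDeduper()
	processed := 0
	// "evt-1" arrives twice, as it would after a relay crash and republish.
	for _, id := range []string{"evt-1", "evt-1", "evt-2"} {
		d.handle(id, func() { processed++ })
	}
	fmt.Println(processed) // 2
}
```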
func NewSweeper ¶
func NewSweeper(log *logger.Logger, store Store, cfg SweeperConfig) *worker.Loop
NewSweeper builds the lease-reclaim worker as a worker.Loop. It returns in_flight rows abandoned by a crashed relay back to pending.
func WithinTran ¶
func WithinTran(ctx context.Context, log *logger.Logger, db *sqlx.DB, store Store, fn TxFunc) error
WithinTran runs fn inside one SQL transaction and, when fn returns a non-empty event slice, inserts those events in the SAME transaction via store.WithTx. Domain writes and the outbox insert therefore commit atomically. This is the core guarantee of the transactional outbox: an event is persisted if and only if the domain change that produced it commits.
This is a thin wrapper over sqldb.WithinTran; callers never thread tx through their event-building code.
Types ¶
type CleanerConfig ¶
type CleanerConfig struct {
    Name string // default "outbox-cleaner"
    // Interval between cleanups (default 1h).
    Interval time.Duration
    // Retention: sent/failed rows older than this are deleted (default 24h).
    Retention time.Duration
}
CleanerConfig configures the Cleaner worker.
type Event ¶
type Event struct {
    ID            uuid.UUID
    Type          string
    ContentType   string
    Exchange      string
    RoutingKey    string
    Payload       []byte
    Headers       []byte // JSON object of transport headers ("{}" when none)
    Status        string
    Attempts      int
    MaxAttempts   int
    LastError     string
    NextAttemptAt time.Time
    CreatedAt     time.Time
    SentAt        time.Time
    LeasedAt      time.Time
    LeaseID       uuid.UUID
}
Event is an outbox record as seen by callers — a pure core type with no db tags or Null wrappers (the PG store owns that mapping). Payload is the business data only; the broker.Publisher wraps it in a CloudEvents envelope.
func NewEvent ¶
func NewEvent(eventType, exchange, routingKey, contentType string, payload []byte, headers map[string]any) (Event, error)
NewEvent builds a pending Event ready for Insert, stamping a fresh id, UTC timestamps, attempts=0, and max_attempts=DefaultMaxAttempts. headers may be nil (normalized to "{}"). payload must already be serialized by the caller.
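The nil-headers rule matters because encoding/json marshals a nil map as null rather than {}. A minimal sketch of that normalization; encodeHeaders is a hypothetical helper, not a function exported by this package:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeHeaders is a hypothetical helper mirroring the documented behavior:
// nil or empty headers become the JSON object "{}".
func encodeHeaders(h map[string]any) ([]byte, error) {
	if len(h) == 0 {
		return []byte("{}"), nil
	}
	return json.Marshal(h)
}

func main() {
	b, _ := encodeHeaders(nil)
	fmt.Println(string(b)) // {}
	b, _ = encodeHeaders(map[string]any{"tenant": "acme"})
	fmt.Println(string(b)) // {"tenant":"acme"}
}
```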
type Options ¶
type Options struct {
    // Table is the backing table name (default "outbox_events").
    Table string
    // Backoff schedules retries (next_attempt_at) for failed-but-retryable
    // events. Defaults to base 2s, factor 2, max 5m.
    Backoff worker.Backoff
}
Options configures a PG store.
type PG ¶
type PG struct {
    // contains filtered or unexported fields
}
PG is the Postgres-backed outbox Store. It is safe for concurrent use by multiple relay/sweeper replicas; LeasePending and SweepExpiredLeases claim rows with FOR UPDATE SKIP LOCKED.
func NewPG ¶
NewPG builds a Postgres outbox store. The table is created by a migration (or EnsureSchema in tests), not here.
func (*PG) EnsureSchema ¶
func (s *PG) EnsureSchema(ctx context.Context) error
EnsureSchema creates the outbox table and indexes if absent.
func (*PG) LeasePending ¶
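func (s *PG) LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)
LeasePending claims up to limit ready events, that is, pending events whose next_attempt_at <= now, and marks them in_flight under a fresh lease id.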
func (*PG) MarkFailed ¶
func (s *PG) MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error
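MarkFailed transitions in_flight -> pending (attempts+1 < max, with next_attempt_at advanced by backoff) or -> failed (attempts+1 >= max). It is guarded by leaseID and returns ErrLeaseLost on a lease mismatch.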
type RelayConfig ¶
type RelayConfig struct {
    // Name labels the worker (default "outbox-relay").
    Name string
    // PollInterval is the lease tick rate (default 2s). The relay also runs an
    // immediate first tick so a fresh process drains a backlog without waiting.
    PollInterval time.Duration
    // BatchSize is the max events leased per tick (default 100).
    BatchSize int
    // PublishTimeout bounds each publish + its follow-up mark (default 5s).
    PublishTimeout time.Duration
}
RelayConfig configures the Relay worker.
type Store ¶
type Store interface {
    // WithTx returns a sibling Store whose queries run on tx. Used by WithinTran
    // to insert events in the caller's domain transaction.
    WithTx(tx sqlx.ExtContext) Store
    // Insert writes pending events. Bound to a transaction when the Store was
    // derived via WithTx.
    Insert(ctx context.Context, events ...Event) error
    // LeasePending atomically claims up to limit pending events whose
    // next_attempt_at <= now, marks them in_flight with a fresh lease id, and
    // returns them (FOR UPDATE SKIP LOCKED, so relay replicas split work).
    LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)
    // MarkSent transitions in_flight -> sent. Guarded by leaseID; returns
    // ErrLeaseLost on a lease mismatch.
    MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error
    // MarkFailed transitions in_flight -> pending (attempts+1 < max, with
    // next_attempt_at advanced by backoff) or -> failed (attempts+1 >= max).
    // Guarded by leaseID; returns ErrLeaseLost on a lease mismatch.
    MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error
    // SweepExpiredLeases returns in_flight rows whose lease is older than
    // leaseTimeout back to pending (a crashed-relay signal; attempts is NOT
    // incremented). Returns the number reclaimed.
    SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)
    // Cleanup deletes sent/failed rows created before now-retention. Returns the
    // number of rows deleted.
    Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
}
Store is the persistence port for the outbox table. The Postgres implementation is *PG.
type SweeperConfig ¶
type SweeperConfig struct {
    Name string // default "outbox-sweeper"
    // Interval between sweeps (default 30s).
    Interval time.Duration
    // LeaseTimeout: in_flight rows leased longer than this are reclaimed
    // (default 1m). Set comfortably above RelayConfig.PublishTimeout.
    LeaseTimeout time.Duration
    // BatchSize bounds rows reclaimed per sweep (default 100).
    BatchSize int
}
SweeperConfig configures the Sweeper worker.