outbox

package
v0.5.0
Published: Jun 23, 2026 | License: MIT | Imports: 17 | Imported by: 0

Documentation

Overview

Package outbox implements the transactional outbox pattern for reliably publishing domain events to a message broker.

A domain write and the events it emits are inserted in ONE database transaction (WithinTran); a background Relay then drains pending events to a broker.Publisher with at-least-once delivery. As a result, an event is never lost if the broker is briefly down, and never published unless its domain write commits.

Design goals

The domain layer stays oblivious to persistence and transport. Producers publish a plain typed value through a tx-bound Publisher; which broker topic it lands on, and over which transport, are wiring concerns resolved by a Registry — never named in domain code. Routing is transport-neutral (Topic + optional Key), so the same event row can ship over RabbitMQ, Kafka, NATS, or a WebSocket gateway by swapping the broker.Publisher behind the Relay.

Components

The moving parts map onto the worker package:

  • Relay = worker.Processor[Event]: LeasePending (Source) -> broker.Publish (Handler) -> MarkSent/MarkFailed (Sink), run as a worker.Loop.
  • Sweeper = worker.Loop reclaiming leases abandoned by crashed relays.
  • Cleaner = worker.Loop deleting terminal rows past a retention window.

Producer side: a Registry maps each event's Go type to a route; Publisher (obtained from WithinTran or Bind) records typed values into the outbox in the caller's transaction; PublishOption overrides the route per call.

The Relay paces adaptively (worker.NewPacedLoop): a full batch means a backlog likely remains, so it drains again at once; an empty batch idles for PollInterval (optionally backing off to MaxPollInterval).

Metrics are optional and registered on the shared registry (see the metrics package): NewMetrics builds relay/sweeper/cleaner throughput counters (servicekit_outbox_*), wired via the *Metrics field on RelayConfig/SweeperConfig/CleanerConfig; NewBacklogCollector exposes backlog gauges (pending/in_flight/failed/oldest_pending_seconds) by querying a StatsReader on each scrape.

Event lifecycle (FSM)

 Insert
   │
   ▼
pending ──Lease──► in_flight ──MarkSent──► sent     (terminal)
   ▲                   │
   │ MarkFailed/Sweep  │ MarkFailed (attempts >= max)
   └───────────────────┴──────────► failed           (terminal)

At-least-once: if a relay dies after publishing but before MarkSent, the Sweeper returns the row to pending and it is republished; consumers dedupe on the CloudEvents id. All timestamps are computed Go-side (time.Now().UTC()) and bound as query parameters — no NOW()/CURRENT_TIMESTAMP in SQL — so the clock is injectable in tests.
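The lifecycle above can be modelled as a small in-memory state machine. This is a minimal sketch, not the package's implementation: the row type and its lease, markSent, markFailed, and sweep methods are hypothetical stand-ins for the Store operations, which the real store performs in SQL.

```go
package main

import (
	"errors"
	"fmt"
)

// Status values mirror the package's Status* constants.
const (
	pending  = "pending"
	inFlight = "in_flight"
	sent     = "sent"
	failed   = "failed"
)

// errLeaseLost stands in for outbox.ErrLeaseLost.
var errLeaseLost = errors.New("lease lost")

// row is a hypothetical in-memory stand-in for one outbox_events row.
type row struct {
	status      string
	attempts    int
	maxAttempts int
	leaseID     string // empty when the row is not leased
}

// lease claims a pending row: pending -> in_flight.
func (r *row) lease(id string) bool {
	if r.status != pending {
		return false
	}
	r.status, r.leaseID = inFlight, id
	return true
}

// guard reports whether the caller still holds the lease.
func (r *row) guard(id string) error {
	if r.status != inFlight || r.leaseID != id {
		return errLeaseLost
	}
	return nil
}

// markSent: in_flight -> sent (terminal).
func (r *row) markSent(id string) error {
	if err := r.guard(id); err != nil {
		return err
	}
	r.status, r.leaseID = sent, ""
	return nil
}

// markFailed: in_flight -> pending while budget remains, else -> failed.
func (r *row) markFailed(id string) error {
	if err := r.guard(id); err != nil {
		return err
	}
	r.attempts++
	r.leaseID = ""
	if r.attempts >= r.maxAttempts {
		r.status = failed
	} else {
		r.status = pending
	}
	return nil
}

// sweep reclaims an abandoned lease: in_flight -> pending, attempts unchanged.
func (r *row) sweep() {
	if r.status == inFlight {
		r.status, r.leaseID = pending, ""
	}
}

func main() {
	r := &row{status: pending, maxAttempts: 2}
	r.lease("a")
	r.sweep()    // relay "a" is presumed dead; the row is pending again
	r.lease("b") // another relay picks it up
	fmt.Println(r.markSent("a")) // stale lease is rejected
	err := r.markSent("b")
	fmt.Println(err, r.status)
}
```

The stale markSent call is the at-least-once case: relay "a" may already have published, and the row is published again under lease "b".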

Tracing: Publish captures the W3C trace context from the publishing ctx into the event's headers (otel.Carrier), so the relay carries it onto the broker message and a consumer can continue the producer's trace with otel.ExtractFromCarrier. With no active trace context the headers stay empty.

Wiring (once, at startup)

Register each event type, then build the store and the workers:

reg := outbox.NewRegistry()
outbox.Register[widget.Created](reg, "widget.created", "widgets",
    outbox.WithKey("created"))

store := outbox.NewPG(log, db, outbox.Options{})       // Postgres-backed Store
group.Add(outbox.NewRelay(log, store, publisher, outbox.RelayConfig{}))
group.Add(outbox.NewSweeper(log, store, outbox.SweeperConfig{}))
group.Add(outbox.NewCleaner(log, store, outbox.CleanerConfig{}))

Producing events

Inside WithinTran the domain writes and publishes in the same transaction; any error rolls back both. The closure receives tx, so the application can bind its own repository to the transaction (repo.WithTx(tx) below), and a tx-bound Publisher:

err := outbox.WithinTran(ctx, log, db, store, reg,
    func(tx *sqlx.Tx, pub outbox.Publisher) error {
        if err := repo.WithTx(tx).Create(ctx, w); err != nil { // domain write
            return err
        }
        return pub.Publish(ctx, widget.Created{ID: w.ID.String()}) // domain event
    })

One type, many events (PublishOption)

When one payload type fans out to several CloudEvents types chosen at runtime — e.g. a promocode whose discount kind decides the event — register a default route and override it per publish with As (and optionally OnTopic / WithRouteKey). The routing-by-condition stays domain logic:

// outbox.Register[PushPayload](reg, "promo.created", "push-gateway", outbox.WithKey("push-gateway.send"))
for _, batch := range batches { // fan-out: N events, one transaction
    if err := pub.Publish(ctx, PushPayload{...}, outbox.As(eventType)); err != nil {
        return err // any failure rolls back the whole unit of work
    }
}

Options

  • Register options: WithKey, WithContentType (default application/json), WithMarshaler (default encoding/json).
  • Publish options: As (override CloudEvents type), OnTopic (override topic), WithRouteKey (override key).
  • Store options (Options): Backoff (retry schedule for failed-but-retryable events). The table is fixed (outbox_events).
  • Worker configs: RelayConfig (PollInterval/MaxPollInterval/BatchSize/PublishTimeout/Metrics), SweeperConfig (Interval/LeaseTimeout/BatchSize/Metrics), CleanerConfig (Interval/Retention/Metrics).
  • Metrics: NewMetrics(reg) (throughput counters), NewBacklogCollector(sr, log) (backlog gauges), WithScrapeTimeout.

Escape hatch

For a fully dynamic event built without the Registry, construct it with NewEvent and insert it on the transaction directly:

ev, err := outbox.NewEvent(eventType, topic, key, "application/json", payload, nil)
if err == nil {
    err = store.WithTx(tx).Insert(ctx, ev) // joins the caller's transaction
}

Publisher (this package) writes events into the outbox table; it is distinct from broker.Publisher, which delivers an already-stored event to the wire.

Constants

const (
	StatusPending  = "pending"   // awaits the relay
	StatusInFlight = "in_flight" // leased by a relay, publish in progress
	StatusSent     = "sent"      // terminal: published successfully
	StatusFailed   = "failed"    // terminal: retry budget exhausted
)

Event statuses stored in the status column.

const DefaultMaxAttempts = 10

DefaultMaxAttempts is the per-row retry budget assigned by NewEvent.

Variables

var ErrLeaseLost = errors.New("outbox: lease lost")

ErrLeaseLost is returned by MarkSent/MarkFailed when the row's lease no longer matches the caller's lease id — the sweeper or another relay reclaimed it. The mark is a no-op; the row will be reprocessed under its new lease.
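A caller of MarkSent/MarkFailed will usually want to tell a lost lease apart from a real failure. The following is a minimal sketch of that distinction; errLeaseLost stands in for outbox.ErrLeaseLost and classifyMark is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errLeaseLost stands in for outbox.ErrLeaseLost in this sketch.
var errLeaseLost = errors.New("outbox: lease lost")

// classifyMark is a hypothetical helper: a lost lease is benign (the row is
// reprocessed under its new lease), any other error is a real failure.
func classifyMark(err error) string {
	switch {
	case err == nil:
		return "marked"
	case errors.Is(err, errLeaseLost):
		return "skipped: lease lost"
	default:
		return "error: " + err.Error()
	}
}

func main() {
	// errors.Is also matches when the store wraps the sentinel.
	wrapped := fmt.Errorf("mark sent: %w", errLeaseLost)
	fmt.Println(classifyMark(nil))
	fmt.Println(classifyMark(wrapped))
	fmt.Println(classifyMark(errors.New("connection reset")))
}
```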

Functions

func NewCleaner

func NewCleaner(log *logger.Logger, store Store, cfg CleanerConfig) *worker.Loop

NewCleaner builds the retention worker as a worker.Loop. It deletes terminal rows past the retention window to keep the table small.

func NewRelay

func NewRelay(log *logger.Logger, store Store, pub broker.Publisher, cfg RelayConfig) *worker.Loop

NewRelay builds the relay as a worker.Loop. It wires the outbox FSM onto a worker.Processor: LeasePending is the Source, the broker.Publisher is the Handler, and MarkSent/MarkFailed are the Sink. Run several relays (here via worker.Group, or across replicas) for throughput; LeasePending claims rows with FOR UPDATE SKIP LOCKED, so concurrent relays split the work instead of contending for the same rows.

The loop paces adaptively (NewPacedLoop): a full batch means a backlog likely remains, so it drains again at once; an empty batch idles for PollInterval (optionally backing off to MaxPollInterval). Delivery is at-least-once: if the process dies after a successful publish but before MarkSent, the sweeper returns the row to pending and the relay republishes it. Consumers dedupe on the CloudEvents id.
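Because delivery is at-least-once, the consumer side has to tolerate a redelivered event. A minimal sketch of deduplication keyed by CloudEvents id follows; the deduper type and handle function are hypothetical, and a real consumer would persist or bound the set of seen ids rather than keep an unbounded map.

```go
package main

import (
	"fmt"
	"sync"
)

// deduper is a hypothetical consumer-side guard keyed by CloudEvents id.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper { return &deduper{seen: make(map[string]struct{})} }

// firstTime reports whether id has not been seen before, and records it.
func (d *deduper) firstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[id]; dup {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

// handle applies the side effect only for the first delivery of an id.
func handle(d *deduper, id string, apply func()) {
	if d.firstTime(id) {
		apply()
	}
}

func main() {
	d := newDeduper()
	applied := 0
	for _, id := range []string{"evt-1", "evt-1", "evt-2"} { // evt-1 is redelivered
		handle(d, id, func() { applied++ })
	}
	fmt.Println(applied)
}
```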

func NewSweeper

func NewSweeper(log *logger.Logger, store Store, cfg SweeperConfig) *worker.Loop

NewSweeper builds the lease-reclaim worker as a worker.Loop. It returns in_flight rows abandoned by a crashed relay back to pending.

func Register added in v0.4.0

func Register[T any](r *Registry, eventType, topic string, opts ...RouteOption)

Register maps the Go type T to a CloudEvents type name and a transport topic. A pointer type registers its element type, so emitting either T or *T resolves the same route.

It panics on a wiring mistake (an empty name or topic, or a duplicate type) so that such errors surface at startup rather than at publish time.
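One way to make T and *T resolve to the same route is to key the registry by the element type. The sketch below illustrates that idea with reflect; registry, route, register, and lookup are hypothetical stand-ins, and the package's actual internals may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// route and registry are hypothetical stand-ins for the package's
// unexported route type and its Registry.
type route struct{ eventType, topic string }

type registry struct{ routes map[reflect.Type]route }

func newRegistry() *registry { return &registry{routes: map[reflect.Type]route{}} }

// elem strips one level of pointer so T and *T share a key.
func elem(t reflect.Type) reflect.Type {
	if t != nil && t.Kind() == reflect.Pointer {
		return t.Elem()
	}
	return t
}

// register maps T to a route and panics on wiring mistakes.
func register[T any](r *registry, eventType, topic string) {
	if eventType == "" || topic == "" {
		panic("registry: empty event type or topic")
	}
	t := elem(reflect.TypeOf((*T)(nil)).Elem())
	if _, dup := r.routes[t]; dup {
		panic("registry: duplicate type " + t.String())
	}
	r.routes[t] = route{eventType, topic}
}

// lookup resolves the route for a value of type T or *T.
func (r *registry) lookup(v any) (route, bool) {
	rt, ok := r.routes[elem(reflect.TypeOf(v))]
	return rt, ok
}

// created is a hypothetical domain event.
type created struct{ ID string }

func main() {
	reg := newRegistry()
	register[created](reg, "widget.created", "widgets")
	byValue, _ := reg.lookup(created{ID: "1"})
	byPointer, _ := reg.lookup(&created{ID: "1"})
	fmt.Println(byValue == byPointer)
}
```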

func Schema

func Schema() string

Schema returns the DDL creating the outbox_events table and its pending-scan index. Apply it in a migration, or call PG.EnsureSchema in tests.

func WithinTran

func WithinTran(ctx context.Context, log *logger.Logger, db *sqlx.DB, store Store, reg *Registry, fn PublishFunc) error

WithinTran runs fn inside one SQL transaction with a tx-bound Publisher. Events published through pub are inserted in the SAME transaction, so domain writes and their events commit atomically — the core guarantee of the transactional outbox: an event is persisted if and only if the domain change that produced it commits. Any error (from fn or an insert) rolls everything back.

reg resolves each published value's route (topic/key/content type). This is a thin wrapper over sqldb.WithinTran; the domain never threads tx through its event code.
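The atomicity guarantee can be illustrated without a database. The sketch below is a toy model only: memDB, memTx, and withinTran are hypothetical, and they mimic commit/rollback visibility by staging writes and applying them only when the closure succeeds.

```go
package main

import (
	"errors"
	"fmt"
)

// memDB and memTx are hypothetical in-memory stand-ins for the database and
// an open transaction; they only model commit/rollback visibility.
type memDB struct{ widgets, events []string }
type memTx struct{ widgets, events []string }

var errPublish = errors.New("publish failed")

// withinTran stages writes on a memTx and applies them to db only if fn
// succeeds, mirroring "any error rolls everything back".
func withinTran(db *memDB, fn func(tx *memTx) error) error {
	tx := &memTx{}
	if err := fn(tx); err != nil {
		return err // rollback: staged rows are discarded
	}
	db.widgets = append(db.widgets, tx.widgets...)
	db.events = append(db.events, tx.events...)
	return nil
}

func main() {
	db := &memDB{}
	_ = withinTran(db, func(tx *memTx) error {
		tx.widgets = append(tx.widgets, "w1")              // domain write
		tx.events = append(tx.events, "widget.created:w1") // domain event
		return nil
	})
	err := withinTran(db, func(tx *memTx) error {
		tx.widgets = append(tx.widgets, "w2")
		return errPublish // the event insert fails, so w2 must not persist
	})
	fmt.Println(err, len(db.widgets), len(db.events))
}
```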

Types

type BacklogCollector added in v0.5.0

type BacklogCollector struct {
	// contains filtered or unexported fields
}

BacklogCollector is a prometheus.Collector that, on each scrape, reads the outbox backlog via a StatsReader and reports it as gauges:

servicekit_outbox_pending             — events awaiting the relay
servicekit_outbox_in_flight           — events leased, publish in progress
servicekit_outbox_failed              — terminal failures awaiting attention
servicekit_outbox_oldest_pending_seconds — age of the oldest pending event

It queries the store on every scrape, so database load grows with scrape frequency and with the number of replicas exporting the collector; pick the scrape interval accordingly. A query error is logged and that scrape emits no outbox gauges (rather than stale values). Register it on the shared registry like any collector.

func NewBacklogCollector added in v0.5.0

func NewBacklogCollector(sr StatsReader, log *logger.Logger, opts ...BacklogOption) *BacklogCollector

NewBacklogCollector builds a backlog collector over sr. log may be nil.

func (*BacklogCollector) Collect added in v0.5.0

func (c *BacklogCollector) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector: it queries the backlog and emits the gauges. On a query error it logs and emits nothing for this scrape.

func (*BacklogCollector) Describe added in v0.5.0

func (c *BacklogCollector) Describe(ch chan<- *prometheus.Desc)

Describe implements prometheus.Collector.

type BacklogOption added in v0.5.0

type BacklogOption func(*BacklogCollector)

BacklogOption customizes a BacklogCollector.

func WithScrapeTimeout added in v0.5.0

func WithScrapeTimeout(d time.Duration) BacklogOption

WithScrapeTimeout bounds the stats query run on each scrape (default 2s).

type CleanerConfig

type CleanerConfig struct {
	Name string // default "outbox-cleaner"
	// Interval between cleanups (default 1h).
	Interval time.Duration
	// Retention: sent/failed rows older than this are deleted (default 24h).
	Retention time.Duration
	// Metrics, when set, counts deleted rows.
	Metrics *Metrics
}

CleanerConfig configures the Cleaner worker.

type Event

type Event struct {
	ID            uuid.UUID
	Type          string
	ContentType   string
	Topic         string
	Key           string
	Payload       []byte
	Headers       []byte // JSON object of transport headers ("{}" when none)
	Status        string
	Attempts      int
	MaxAttempts   int
	LastError     string
	NextAttemptAt time.Time
	CreatedAt     time.Time
	SentAt        time.Time
	LeasedAt      time.Time
	LeaseID       uuid.UUID
}

Event is an outbox record as seen by callers — a pure core type with no db tags or Null wrappers (the PG store owns that mapping). Payload is the business data only; the broker.Publisher wraps it in a CloudEvents envelope.

Routing is transport-neutral: Topic is the logical destination and Key an optional routing/ordering key. Each broker maps them to its own concepts (RabbitMQ exchange+routing key, Kafka topic+partition key, NATS subject), so the same event row can be delivered over any transport.

func NewEvent

func NewEvent(eventType, topic, key, contentType string, payload []byte, headers map[string]any) (Event, error)

NewEvent builds a pending Event ready for Insert, stamping a fresh id, UTC timestamps, attempts=0, and max_attempts=DefaultMaxAttempts. topic is the logical destination (required); key is the optional routing/ordering key. headers may be nil (normalized to "{}"). payload must already be serialized by the caller.
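The nil-to-"{}" normalization matters because encoding/json serializes a nil map as null rather than as an empty object. A minimal sketch of such a normalization, assuming a hypothetical encodeHeaders helper (the package's own code is not shown on this page):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeHeaders is a hypothetical helper: it serializes transport headers to
// a JSON object, mapping nil to "{}" (json.Marshal alone would emit "null").
func encodeHeaders(h map[string]any) ([]byte, error) {
	if h == nil {
		return []byte("{}"), nil
	}
	return json.Marshal(h)
}

func main() {
	none, _ := encodeHeaders(nil)
	some, _ := encodeHeaders(map[string]any{"tenant": "acme"})
	fmt.Println(string(none), string(some))
}
```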

Most producers don't call NewEvent directly — they Publish a typed value through a tx-bound Publisher (see Bind / WithinTran), which resolves the route from a Registry. NewEvent is the low-level escape hatch for building a row with a dynamic topic.

type Marshaler added in v0.4.0

type Marshaler func(v any) ([]byte, error)

Marshaler serializes a domain event value into the bytes stored as an event's payload. The default is encoding/json.Marshal; override per type with WithMarshaler (e.g. for protobuf).
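As an illustration of a non-JSON Marshaler that needs no third-party library, the sketch below delegates to encoding.BinaryMarshaler. The Marshaler type is redeclared locally to keep the example self-contained; binaryMarshaler and ping are hypothetical. Such a function would be passed to WithMarshaler, presumably together with a matching WithContentType.

```go
package main

import (
	"encoding"
	"fmt"
)

// Marshaler mirrors the package's Marshaler signature.
type Marshaler func(v any) ([]byte, error)

// binaryMarshaler serializes values that implement encoding.BinaryMarshaler
// and rejects everything else.
func binaryMarshaler(v any) ([]byte, error) {
	bm, ok := v.(encoding.BinaryMarshaler)
	if !ok {
		return nil, fmt.Errorf("marshal %T: not an encoding.BinaryMarshaler", v)
	}
	return bm.MarshalBinary()
}

var _ Marshaler = binaryMarshaler // compile-time signature check

// ping is a hypothetical event with a one-byte wire format.
type ping struct{ Seq byte }

func (p ping) MarshalBinary() ([]byte, error) { return []byte{p.Seq}, nil }

func main() {
	b, err := binaryMarshaler(ping{Seq: 7})
	fmt.Println(b, err)
}
```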

type Metrics added in v0.5.0

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics are the outbox's own throughput counters, owned by this package and registered on the shared registry passed to NewMetrics. Wire it into RelayConfig/SweeperConfig/CleanerConfig to record relay activity; a nil *Metrics is a safe no-op, so metrics stay optional.

Backlog gauges (depth, age) are separate — they need a query, so they live in the BacklogCollector rather than being incremented inline here.

func NewMetrics added in v0.5.0

func NewMetrics(reg prometheus.Registerer) *Metrics

NewMetrics builds the outbox counters and registers them on reg (idempotently, via metrics.Register). reg may be nil to disable them.

type Options

type Options struct {
	// Backoff schedules retries (next_attempt_at) for failed-but-retryable
	// events. Defaults to base 2s, factor 2, max 5m.
	Backoff worker.Backoff
}

Options configures a PG store.
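To make the default schedule concrete, the sketch below computes delays for base 2s, factor 2, max 5m. It assumes a plain exponential schedule without jitter; the fields and formula of worker.Backoff are not shown on this page, so the delay function is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

const (
	base    = 2 * time.Second // documented default base
	ceiling = 5 * time.Minute // documented default max
)

// delay returns the wait before retry number attempt (1-based):
// base * factor^(attempt-1), capped at ceiling. No jitter is applied.
func delay(attempt int, factor float64) time.Duration {
	d := float64(base)
	for i := 1; i < attempt; i++ {
		d *= factor
		if d >= float64(ceiling) {
			return ceiling
		}
	}
	return time.Duration(d)
}

func main() {
	for attempt := 1; attempt <= 9; attempt++ {
		fmt.Println(attempt, delay(attempt, 2))
	}
}
```

Under these assumptions the delay doubles from 2s and reaches the 5m cap on the ninth retry.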

type PG

type PG struct {
	// contains filtered or unexported fields
}

PG is the Postgres-backed implementation of Store (and StatsReader). Every query is an inline const right next to the method that runs it — no string-templated query cache, no fmt.Sprintf — so the SQL reads as SQL. The backing table is fixed (outbox_events); status values are bound as named params from the Status* constants so the SQL and Go agree on one source of truth.

All statements go through the sqldb named-query helpers for consistent tracing and error wrapping. The Store holds its handle as sqlx.ExtContext, so the same type works against either a *sqlx.DB (pool-bound) or a *sqlx.Tx (transaction-bound, via WithTx) — that is how Insert joins the caller's domain transaction in WithinTran.

PG is safe for concurrent use by multiple relay/sweeper replicas: LeasePending and SweepExpiredLeases claim rows with FOR UPDATE SKIP LOCKED.

func NewPG

func NewPG(log *logger.Logger, db *sqlx.DB, opts Options) *PG

NewPG builds a Postgres outbox store. The table is created by a migration (or EnsureSchema in tests), not here.

func (*PG) Cleanup

func (s *PG) Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)

Cleanup deletes terminal (sent/failed) rows created before now-retention. Returns the number of rows deleted.

func (*PG) EnsureSchema

func (s *PG) EnsureSchema(ctx context.Context) error

EnsureSchema creates the outbox table and indexes if absent.

func (*PG) Insert

func (s *PG) Insert(ctx context.Context, events ...Event) error

Insert writes pending events. Bound to the caller's transaction when the store was derived via WithTx.

func (*PG) LeasePending

func (s *PG) LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)

LeasePending atomically claims up to limit pending events whose next_attempt_at <= now and marks them in_flight under a fresh lease id. A single CTE + UPDATE … RETURNING does selection and lease assignment in one round-trip; FOR UPDATE SKIP LOCKED lets relay replicas split the work. The whole batch shares one lease id (one per relay tick).

func (*PG) MarkFailed

func (s *PG) MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error

MarkFailed transitions in_flight -> pending (with backoff) when the retry budget remains, or -> failed once attempts+1 reaches max_attempts. Guarded by the same lease predicate as MarkSent.

func (*PG) MarkSent

func (s *PG) MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error

MarkSent transitions in_flight -> sent in a single guarded UPDATE. The lease_id + status='in_flight' predicate is the lease guard: a sweeper or parallel relay that reclaimed the row clears its lease_id, so the UPDATE matches no row and we return ErrLeaseLost.

func (*PG) Stats added in v0.5.0

func (s *PG) Stats(ctx context.Context, now time.Time) (Stats, error)

Stats returns a backlog snapshot (pending/in_flight/failed counts and the age of the oldest pending row) in one aggregate query. Used by BacklogCollector to expose backlog gauges.

func (*PG) SweepExpiredLeases

func (s *PG) SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)

SweepExpiredLeases returns in_flight rows whose lease is older than leaseTimeout back to pending so a crashed relay's work is retried. attempts is left unchanged — a lease expiry means the worker died, not that the broker rejected the message. Returns the number reclaimed.
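The expiry test reduces to a comparison against an injected clock. A minimal sketch of that predicate in Go follows; leaseExpired is hypothetical (the real check runs in SQL), and it takes now as a parameter in the same way the Store methods do, so a test can use a fixed instant.

```go
package main

import (
	"fmt"
	"time"
)

const leaseTimeout = time.Minute // SweeperConfig.LeaseTimeout default

// t0 is a fixed instant so the example needs no real clock.
var t0 = time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)

// leaseExpired reports whether a lease taken at leasedAt is older than
// timeout as of now.
func leaseExpired(leasedAt, now time.Time, timeout time.Duration) bool {
	return leasedAt.Before(now.Add(-timeout))
}

func main() {
	fmt.Println(leaseExpired(t0, t0.Add(30*time.Second), leaseTimeout)) // lease still fresh
	fmt.Println(leaseExpired(t0, t0.Add(2*time.Minute), leaseTimeout))  // lease abandoned
}
```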

func (*PG) WithTx

func (s *PG) WithTx(tx sqlx.ExtContext) Store

WithTx returns a sibling store whose queries run on tx.

type PublishFunc added in v0.4.0

type PublishFunc func(tx *sqlx.Tx, pub Publisher) error

PublishFunc is the caller's transactional closure. It receives the transaction handle for domain writes and a tx-bound Publisher for emitting events. Returning an error rolls back everything — domain writes and the events recorded through pub together.

The domain code inside fn never sees pub's mechanics: it calls pub.Publish(ctx, SomeEvent{...}) with a plain typed value. The tx handle is here so an application can bind its own store to the transaction (e.g. store.WithTx(tx)); domain methods themselves take the bound store, not tx.

type PublishOption added in v0.4.0

type PublishOption func(*route)

PublishOption overrides the registered route for a single Publish. The Go type still resolves marshaling and the route defaults from the Registry; these options replace individual fields for this call only.
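The override mechanics follow the functional-options pattern. The sketch below shows one way it could work: the route fields, the lowercase option constructors, resolve, and the event type name "promo.percentage.created" are all hypothetical, chosen only to mirror As, OnTopic, and WithRouteKey.

```go
package main

import "fmt"

// route is a hypothetical stand-in for the package's unexported route type.
type route struct{ eventType, topic, key string }

type publishOption func(*route)

func as(eventType string) publishOption  { return func(r *route) { r.eventType = eventType } }
func onTopic(topic string) publishOption { return func(r *route) { r.topic = topic } }
func withRouteKey(key string) publishOption {
	return func(r *route) { r.key = key }
}

// resolve applies per-call overrides to a copy of the registered default, so
// the registry entry itself is never mutated.
func resolve(def route, opts ...publishOption) route {
	for _, o := range opts {
		o(&def) // def was passed by value, so this edits the copy
	}
	return def
}

func main() {
	def := route{"promo.created", "push-gateway", "push-gateway.send"}
	got := resolve(def, as("promo.percentage.created"))
	fmt.Println(got.eventType, got.topic, got.key)
	fmt.Println(def.eventType) // the default is unchanged
}
```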

func As added in v0.4.0

func As(eventType string) PublishOption

As overrides the CloudEvents type for this publish. Use it when one payload type maps to several event types selected by runtime conditions (e.g. a promocode whose discount kind decides amount/percentage/category.created).

func OnTopic added in v0.4.0

func OnTopic(topic string) PublishOption

OnTopic overrides the destination topic for this publish.

func WithRouteKey added in v0.4.0

func WithRouteKey(key string) PublishOption

WithRouteKey overrides the routing/ordering key for this publish.

type Publisher added in v0.4.0

type Publisher interface {
	// Publish records event in the outbox within the bound transaction. event's
	// Go type must be registered; its route (topic, key, content type) and
	// marshaling are resolved from the Registry. Returns an error if the type is
	// unregistered or marshaling/insert fails.
	//
	// opts override the registered route per call. This is how one Go payload
	// type fans out to several CloudEvents types chosen at runtime: register the
	// type once with a default route, then pass As(...) (and optionally
	// OnTopic/WithRouteKey) to pick the concrete event for this publish.
	Publish(ctx context.Context, event any, opts ...PublishOption) error
}

Publisher records domain events into the outbox. The domain depends only on this interface: it publishes a plain typed value and knows nothing about the transaction, the database, or the transport. A tx-bound Publisher is provided to the WithinTran closure (or built directly with Bind), so the recorded event commits atomically with the domain writes in the same transaction.

This is distinct from broker.Publisher: that one delivers an already-stored event to the wire (used by the Relay); this one writes the event into the outbox table inside the producer's transaction.

func Bind added in v0.4.0

func Bind(tx sqlx.ExtContext, store Store, reg *Registry) Publisher

Bind returns a Publisher whose Publish writes events into the outbox on tx, resolving each event's route from reg. Events recorded through it commit atomically with the other writes in tx.

Usually you don't call Bind directly — WithinTran builds a bound Publisher and passes it to your closure. Bind is exposed for callers that manage the transaction themselves.

type Registry added in v0.4.0

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps a domain event's Go type to its transport route (event type name, topic, key, content type, and how to marshal it). It lets the domain publish a plain typed value while the routing — which broker topic, which key — stays a wiring concern: the domain never names a topic or exchange.

Populate it once at startup with Register, then hand it to WithinTran / Bind. Registry is safe for concurrent reads after setup.

func NewRegistry added in v0.4.0

func NewRegistry() *Registry

NewRegistry returns an empty Registry.

type RelayConfig

type RelayConfig struct {
	// Name labels the worker (default "outbox-relay").
	Name string
	// PollInterval is the idle tick rate (default 2s) — the wait after a tick
	// that found no work. The relay paces adaptively: after a full batch it
	// drains again immediately, so a backlog clears without waiting a full
	// interval. It also runs an immediate first tick on startup.
	PollInterval time.Duration
	// MaxPollInterval, when greater than PollInterval, makes the idle wait back
	// off geometrically up to this cap over consecutive empty ticks (a quiet
	// relay polls less often). 0 keeps the idle wait at PollInterval.
	MaxPollInterval time.Duration
	// BatchSize is the max events leased per tick (default 100).
	BatchSize int
	// PublishTimeout bounds each publish + its follow-up mark (default 5s).
	PublishTimeout time.Duration
	// Metrics, when set, records relay throughput (published/failed/latency).
	Metrics *Metrics
}

RelayConfig configures the Relay worker.
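The pacing rules can be summarized as a function from the last batch size to the next wait. The sketch below is an assumption-laden illustration: nextWait is hypothetical, the doubling factor is a guess (the doc only says "geometrically"), the 30s cap is an example value, and treating a partial batch like a plain PollInterval wait is not specified by the page.

```go
package main

import (
	"fmt"
	"time"
)

const (
	poll    = 2 * time.Second  // RelayConfig.PollInterval default
	maxPoll = 30 * time.Second // example MaxPollInterval, not a default
	batch   = 100              // RelayConfig.BatchSize default
)

// nextWait returns the pause before the next tick. got is the number of
// events leased by the last tick; emptyTicks counts consecutive empty ticks,
// including the current one.
func nextWait(got, emptyTicks int) time.Duration {
	switch {
	case got >= batch:
		return 0 // full batch: a backlog likely remains, drain again at once
	case got > 0 || maxPoll <= poll:
		return poll // partial batch (assumed), or idle backoff disabled
	}
	wait := poll
	for i := 1; i < emptyTicks && wait < maxPoll; i++ {
		wait *= 2 // assumed geometric factor
	}
	if wait > maxPoll {
		wait = maxPoll
	}
	return wait
}

func main() {
	fmt.Println(nextWait(100, 0)) // full batch
	for empty := 1; empty <= 6; empty++ {
		fmt.Println(empty, nextWait(0, empty))
	}
}
```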

type RouteOption added in v0.4.0

type RouteOption func(*route)

RouteOption customizes a registration.

func WithContentType added in v0.4.0

func WithContentType(ct string) RouteOption

WithContentType overrides the payload MIME type (default "application/json").

func WithKey added in v0.4.0

func WithKey(key string) RouteOption

WithKey sets the routing/ordering key for the event type (RabbitMQ routing key, Kafka partition key, ...). Optional; defaults to empty.

func WithMarshaler added in v0.4.0

func WithMarshaler(m Marshaler) RouteOption

WithMarshaler overrides how the event value is serialized (default JSON).

type Stats added in v0.5.0

type Stats struct {
	Pending          int64
	InFlight         int64
	Failed           int64
	OldestPendingAge time.Duration // age of the oldest pending row; 0 when none
}

Stats is a point-in-time snapshot of the outbox table used for backlog metrics: how much work is waiting and how old the oldest waiting event is.

type StatsReader added in v0.5.0

type StatsReader interface {
	Stats(ctx context.Context, now time.Time) (Stats, error)
}

StatsReader reports outbox backlog stats. *PG implements it; it is a separate capability from Store so the core port stays minimal and backlog metrics stay opt-in.

type Store

type Store interface {
	// WithTx returns a sibling Store whose queries run on tx. Used by WithinTran
	// to insert events in the caller's domain transaction.
	WithTx(tx sqlx.ExtContext) Store

	// Insert writes pending events. Bound to a transaction when the Store was
	// derived via WithTx.
	Insert(ctx context.Context, events ...Event) error

	// LeasePending atomically claims up to limit pending events whose
	// next_attempt_at <= now, marks them in_flight with a fresh lease id, and
	// returns them (FOR UPDATE SKIP LOCKED, so relay replicas split work).
	LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)

	// MarkSent transitions in_flight -> sent. Guarded by leaseID; returns
	// ErrLeaseLost on a lease mismatch.
	MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error

	// MarkFailed transitions in_flight -> pending (attempts+1 < max, with
	// next_attempt_at advanced by backoff) or -> failed (attempts+1 >= max).
	// Guarded by leaseID; returns ErrLeaseLost on a lease mismatch.
	MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error

	// SweepExpiredLeases returns in_flight rows whose lease is older than
	// leaseTimeout back to pending (a crashed-relay signal — attempts is NOT
	// incremented). Returns the number reclaimed.
	SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)

	// Cleanup deletes sent/failed rows created before now-retention. Returns the
	// number of rows deleted.
	Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
}

Store is the persistence port for the outbox table. The Postgres implementation is *PG.

type SweeperConfig

type SweeperConfig struct {
	Name string // default "outbox-sweeper"
	// Interval between sweeps (default 30s).
	Interval time.Duration
	// LeaseTimeout: in_flight rows leased longer than this are reclaimed
	// (default 1m). Set comfortably above RelayConfig.PublishTimeout.
	LeaseTimeout time.Duration
	// BatchSize bounds rows reclaimed per sweep (default 100).
	BatchSize int
	// Metrics, when set, counts reclaimed leases.
	Metrics *Metrics
}

SweeperConfig configures the Sweeper worker.
