Documentation
Overview
Package outbox implements the transactional outbox pattern for reliably publishing domain events to a message broker.
A domain write and the events it emits are inserted in ONE database transaction (WithinTran); a background Relay then drains pending events to a broker.Publisher with at-least-once delivery. As a result, an event is never lost while the broker is briefly unavailable, and it is never published unless its domain write commits.
Design goals
The domain layer stays oblivious to persistence and transport. Producers publish a plain typed value through a tx-bound Publisher; which broker topic it lands on, and over which transport, are wiring concerns resolved by a Registry — never named in domain code. Routing is transport-neutral (Topic + optional Key), so the same event row can ship over RabbitMQ, Kafka, NATS, or a WebSocket gateway by swapping the broker.Publisher behind the Relay.
Components
The moving parts map onto the worker package:
- Relay = worker.Processor[Event]: LeasePending (Source) -> broker.Publish (Handler) -> MarkSent/MarkFailed (Sink), run as a worker.Loop.
- Sweeper = worker.Loop reclaiming leases abandoned by crashed relays.
- Cleaner = worker.Loop deleting terminal rows past a retention window.
Producer side: a Registry maps each event's Go type to a route; Publisher (obtained from WithinTran or Bind) records typed values into the outbox in the caller's transaction; PublishOption overrides the route per call.
The Relay paces adaptively (worker.NewPacedLoop): a full batch means a backlog likely remains, so it drains again at once; an empty batch idles for PollInterval (optionally backing off to MaxPollInterval).
Metrics are optional and registered on the shared registry (see the metrics package): NewMetrics builds relay/sweeper/cleaner throughput counters (servicekit_outbox_*), wired via the *Metrics field on RelayConfig/SweeperConfig/CleanerConfig; NewBacklogCollector exposes backlog gauges (pending/in_flight/failed/oldest_pending_seconds) by querying a StatsReader on each scrape.
Event lifecycle (FSM)
Insert
  │
  ▼
pending ──Lease──► in_flight ──MarkSent──► sent (terminal)
  ▲                  │
  │ MarkFailed/Sweep │ MarkFailed (attempts >= max)
  └──────────────────┴──────────► failed (terminal)
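The diagram's edges can be written down as a lookup table, which makes the terminal states explicit. The transitions map and allowed helper are illustrative only. The PG store enforces these moves in the WHERE clauses of its UPDATE statements (for example MarkSent's status='in_flight' predicate) rather than through a Go-side table.

```go
package main

// Status values mirror the package's Status* constants.
const (
	StatusPending  = "pending"
	StatusInFlight = "in_flight"
	StatusSent     = "sent"
	StatusFailed   = "failed"
)

// transitions lists the edges of the lifecycle diagram, keyed by source
// status; each value names the operation that performs the edge.
var transitions = map[string]map[string]string{
	StatusPending: {StatusInFlight: "Lease"},
	StatusInFlight: {
		StatusSent:    "MarkSent",
		StatusPending: "MarkFailed (budget left) or Sweep",
		StatusFailed:  "MarkFailed (budget exhausted)",
	},
	// sent and failed are terminal: they have no outgoing edges.
}

// allowed reports whether the lifecycle permits moving from -> to.
func allowed(from, to string) bool {
	_, ok := transitions[from][to] // indexing a missing (nil) inner map is safe
	return ok
}
```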
At-least-once: if a relay dies after publishing but before MarkSent, the Sweeper returns the row to pending and it is republished; consumers dedupe on the CloudEvents id. All timestamps are computed Go-side (time.Now().UTC()) and bound as query parameters — no NOW()/CURRENT_TIMESTAMP in SQL — so the clock is injectable in tests.
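Because a row can be republished, consumers need an idempotency guard keyed on the CloudEvents id. The following is a minimal sketch that assumes the consumer can read that id from each message. The deduper type is hypothetical and keeps ids in memory. A production consumer would persist them (ideally in the same transaction as its own side effects) and expire old entries.

```go
package main

import "sync"

// deduper is a hypothetical consumer-side guard that remembers the
// CloudEvents ids it has already processed successfully.
type deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newDeduper() *deduper { return &deduper{seen: make(map[string]struct{})} }

// handle runs fn unless id was already processed. The id is recorded only
// after fn succeeds, so a failed attempt is retried on redelivery. Holding
// the lock across fn serializes handling, which is acceptable for a sketch.
func (d *deduper) handle(id string, fn func() error) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, done := d.seen[id]; done {
		return nil // duplicate delivery: already processed
	}
	if err := fn(); err != nil {
		return err // not recorded, so a redelivery will retry it
	}
	d.seen[id] = struct{}{}
	return nil
}
```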
Tracing: Publish captures the W3C trace context from the publishing ctx into the event's headers (otel.Carrier), so the relay carries it onto the broker message and a consumer can continue the producer's trace with otel.ExtractFromCarrier. With no active trace context the headers stay empty.
Wiring (once, at startup)
Register each event type, then build the store and the workers:
reg := outbox.NewRegistry()
outbox.Register[widget.Created](reg, "widget.created", "widgets",
	outbox.WithKey("created"))
store := outbox.NewPG(log, db, outbox.Options{}) // Postgres-backed Store
group.Add(outbox.NewRelay(log, store, publisher, outbox.RelayConfig{}))
group.Add(outbox.NewSweeper(log, store, outbox.SweeperConfig{}))
group.Add(outbox.NewCleaner(log, store, outbox.CleanerConfig{}))
Producing events
Inside WithinTran the domain writes and publishes in the same transaction; any error rolls back both. The closure receives tx, so the application can bind its own store or repository to that transaction (e.g. repo.WithTx(tx)), and a tx-bound Publisher:
err := outbox.WithinTran(ctx, log, db, store, reg,
	func(tx *sqlx.Tx, pub outbox.Publisher) error {
		if err := repo.WithTx(tx).Create(ctx, w); err != nil { // domain write
			return err
		}
		return pub.Publish(ctx, widget.Created{ID: w.ID.String()}) // domain event
	})
One type, many events (PublishOption)
When one payload type fans out to several CloudEvents types chosen at runtime — e.g. a promocode whose discount kind decides the event — register a default route and override it per publish with As (and optionally OnTopic / WithRouteKey). The routing-by-condition stays domain logic:
// Register[PushPayload](reg, "promo.created", "push-gateway", WithKey("push-gateway.send"))
for _, batch := range batches { // fan-out: N events, one transaction
	if err := pub.Publish(ctx, PushPayload{...}, outbox.As(eventType)); err != nil {
		return err // any failure rolls back the whole unit of work
	}
}
Options
- Register options: WithKey, WithContentType (default application/json), WithMarshaler (default encoding/json).
- Publish options: As (override CloudEvents type), OnTopic (override topic), WithRouteKey (override key).
- Store options (Options): Backoff (retry schedule for failed-but-retryable events). The table is fixed (outbox_events).
- Worker configs: RelayConfig (PollInterval/MaxPollInterval/BatchSize/PublishTimeout/Metrics), SweeperConfig (Interval/LeaseTimeout/BatchSize/Metrics), CleanerConfig (Interval/Retention/Metrics).
- Metrics: NewMetrics(reg) (throughput counters), NewBacklogCollector(sr, log) (backlog gauges), WithScrapeTimeout.
Escape hatch
For a fully dynamic event built without the Registry, construct it with NewEvent and insert it on the transaction directly:
ev, _ := outbox.NewEvent(eventType, topic, key, "application/json", payload, nil)
err := store.WithTx(tx).Insert(ctx, ev)
Publisher (this package) writes events into the outbox table; it is distinct from broker.Publisher, which delivers an already-stored event to the wire.
Index
- Constants
- Variables
- func NewCleaner(log *logger.Logger, store Store, cfg CleanerConfig) *worker.Loop
- func NewRelay(log *logger.Logger, store Store, pub broker.Publisher, cfg RelayConfig) *worker.Loop
- func NewSweeper(log *logger.Logger, store Store, cfg SweeperConfig) *worker.Loop
- func Register[T any](r *Registry, eventType, topic string, opts ...RouteOption)
- func Schema() string
- func WithinTran(ctx context.Context, log *logger.Logger, db *sqlx.DB, store Store, ...) error
- type BacklogCollector
- type BacklogOption
- type CleanerConfig
- type Event
- type Marshaler
- type Metrics
- type Options
- type PG
- func (s *PG) Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
- func (s *PG) EnsureSchema(ctx context.Context) error
- func (s *PG) Insert(ctx context.Context, events ...Event) error
- func (s *PG) LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)
- func (s *PG) MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error
- func (s *PG) MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error
- func (s *PG) Stats(ctx context.Context, now time.Time) (Stats, error)
- func (s *PG) SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)
- func (s *PG) WithTx(tx sqlx.ExtContext) Store
- type PublishFunc
- type PublishOption
- type Publisher
- type Registry
- type RelayConfig
- type RouteOption
- type Stats
- type StatsReader
- type Store
- type SweeperConfig
Constants
const (
	StatusPending  = "pending"   // awaits the relay
	StatusInFlight = "in_flight" // leased by a relay, publish in progress
	StatusSent     = "sent"      // terminal: published successfully
	StatusFailed   = "failed"    // terminal: retry budget exhausted
)
Event statuses stored in the status column.
const DefaultMaxAttempts = 10
DefaultMaxAttempts is the per-row retry budget assigned by NewEvent.
Variables
var ErrLeaseLost = errors.New("outbox: lease lost")
ErrLeaseLost is returned by MarkSent/MarkFailed when the row's lease no longer matches the caller's lease id — the sweeper or another relay reclaimed it. The mark is a no-op; the row will be reprocessed under its new lease.
Functions
func NewCleaner
func NewCleaner(log *logger.Logger, store Store, cfg CleanerConfig) *worker.Loop
NewCleaner builds the retention worker as a worker.Loop. It deletes terminal rows past the retention window to keep the table small.
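func NewRelay
func NewRelay(log *logger.Logger, store Store, pub broker.Publisher, cfg RelayConfig) *worker.Loop
NewRelay builds the relay as a worker.Loop. It wires the outbox FSM onto a worker.Processor: LeasePending is the Source, the broker.Publisher is the Handler, and MarkSent/MarkFailed are the Sink. For more throughput, run several relays (in one process via worker.Group, or across replicas); because LeasePending claims rows with FOR UPDATE SKIP LOCKED, concurrent relays split the work instead of leasing the same rows.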
The loop paces adaptively (NewPacedLoop): a full batch means a backlog likely remains, so it drains again at once; an empty batch idles for PollInterval (optionally backing off to MaxPollInterval). Delivery is at-least-once: if the process dies after a successful publish but before MarkSent, the sweeper returns the row to pending and the relay republishes it. Consumers dedupe on the CloudEvents id.
func NewSweeper
func NewSweeper(log *logger.Logger, store Store, cfg SweeperConfig) *worker.Loop
NewSweeper builds the lease-reclaim worker as a worker.Loop. It returns in_flight rows abandoned by a crashed relay back to pending.
func Register (added in v0.4.0)
func Register[T any](r *Registry, eventType, topic string, opts ...RouteOption)
Register maps the Go type T to a CloudEvents type name and a transport topic. A pointer type registers its element type, so emitting either T or *T resolves the same route.
It panics on a wiring mistake (an empty event type or topic, or a type registered twice), so such mistakes surface at startup rather than at publish time.
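Keying routes by reflect.Type is one way to get the pointer behaviour and fail-fast panics described above. This is an illustrative model with hypothetical names (registry, register, lookup) and a route reduced to event type and topic. The package's Registry also carries the key, content type, and marshaler, and its internals may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// route is a hypothetical, trimmed-down registered route.
type route struct{ eventType, topic string }

// registry keys routes by the Go type of the event value.
type registry struct{ routes map[reflect.Type]route }

func newRegistry() *registry { return &registry{routes: make(map[reflect.Type]route)} }

// baseType strips one pointer level so T and *T share a route.
func baseType(t reflect.Type) reflect.Type {
	if t.Kind() == reflect.Pointer {
		return t.Elem()
	}
	return t
}

// register panics on wiring mistakes so they surface at startup.
func register[T any](r *registry, eventType, topic string) {
	if eventType == "" || topic == "" {
		panic("outbox: empty event type or topic")
	}
	t := baseType(reflect.TypeOf((*T)(nil)).Elem()) // T itself, even for interfaces
	if _, dup := r.routes[t]; dup {
		panic(fmt.Sprintf("outbox: %v registered twice", t))
	}
	r.routes[t] = route{eventType: eventType, topic: topic}
}

// lookup resolves the route for a published value of type T or *T.
func (r *registry) lookup(v any) (route, bool) {
	if v == nil {
		return route{}, false
	}
	rt, ok := r.routes[baseType(reflect.TypeOf(v))]
	return rt, ok
}
```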
func Schema
func Schema() string
Schema returns the DDL creating the outbox_events table and its pending-scan index. Apply it in a migration, or call PG.EnsureSchema in tests.
func WithinTran
func WithinTran(ctx context.Context, log *logger.Logger, db *sqlx.DB, store Store, reg *Registry, fn PublishFunc) error
WithinTran runs fn inside one SQL transaction with a tx-bound Publisher. Events published through pub are inserted in the SAME transaction, so domain writes and their events commit atomically — the core guarantee of the transactional outbox: an event is persisted if and only if the domain change that produced it commits. Any error (from fn or an insert) rolls everything back.
reg resolves each published value's route (topic/key/content type). This is a thin wrapper over sqldb.WithinTran; the domain never threads tx through its event code.
Types
type BacklogCollector (added in v0.5.0)
type BacklogCollector struct {
// contains filtered or unexported fields
}
BacklogCollector is a prometheus.Collector that, on each scrape, reads the outbox backlog via a StatsReader and reports it as gauges:
servicekit_outbox_pending: events awaiting the relay
servicekit_outbox_in_flight: events leased, publish in progress
servicekit_outbox_failed: terminal failures awaiting attention
servicekit_outbox_oldest_pending_seconds: age of the oldest pending event
It queries the store on every scrape, and each replica that registers it adds its own query per scrape, so choose a scrape interval the database can comfortably absorb across all replicas. A query error is logged and that scrape emits no outbox gauges rather than stale values. Register it on the shared registry like any other collector.
func NewBacklogCollector (added in v0.5.0)
func NewBacklogCollector(sr StatsReader, log *logger.Logger, opts ...BacklogOption) *BacklogCollector
NewBacklogCollector builds a backlog collector over sr. log may be nil.
func (*BacklogCollector) Collect (added in v0.5.0)
func (c *BacklogCollector) Collect(ch chan<- prometheus.Metric)
Collect implements prometheus.Collector: it queries the backlog and emits the gauges. On a query error it logs and emits nothing for this scrape.
func (*BacklogCollector) Describe (added in v0.5.0)
func (c *BacklogCollector) Describe(ch chan<- *prometheus.Desc)
Describe implements prometheus.Collector.
type BacklogOption (added in v0.5.0)
type BacklogOption func(*BacklogCollector)
BacklogOption customizes a BacklogCollector.
func WithScrapeTimeout (added in v0.5.0)
func WithScrapeTimeout(d time.Duration) BacklogOption
WithScrapeTimeout bounds the stats query run on each scrape (default 2s).
type CleanerConfig
type CleanerConfig struct {
Name string // default "outbox-cleaner"
// Interval between cleanups (default 1h).
Interval time.Duration
// Retention: sent/failed rows older than this are deleted (default 24h).
Retention time.Duration
// Metrics, when set, counts deleted rows.
Metrics *Metrics
}
CleanerConfig configures the Cleaner worker.
type Event
type Event struct {
ID uuid.UUID
Type string
ContentType string
Topic string
Key string
Payload []byte
Headers []byte // JSON object of transport headers ("{}" when none)
Status string
Attempts int
MaxAttempts int
LastError string
NextAttemptAt time.Time
CreatedAt time.Time
SentAt time.Time
LeasedAt time.Time
LeaseID uuid.UUID
}
Event is an outbox record as seen by callers — a pure core type with no db tags or Null wrappers (the PG store owns that mapping). Payload is the business data only; the broker.Publisher wraps it in a CloudEvents envelope.
Routing is transport-neutral: Topic is the logical destination and Key an optional routing/ordering key. Each broker maps them to its own concepts (RabbitMQ exchange+routing key, Kafka topic+partition key, NATS subject), so the same event row can be delivered over any transport.
func NewEvent
func NewEvent(eventType, topic, key, contentType string, payload []byte, headers map[string]any) (Event, error)
NewEvent builds a pending Event ready for Insert, stamping a fresh id, UTC timestamps, attempts=0, and max_attempts=DefaultMaxAttempts. topic is the logical destination (required); key is the optional routing/ordering key. headers may be nil (normalized to "{}"). payload must already be serialized by the caller.
Most producers don't call NewEvent directly — they Publish a typed value through a tx-bound Publisher (see Bind / WithinTran), which resolves the route from a Registry. NewEvent is the low-level escape hatch for building a row with a dynamic topic.
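NewEvent stores nil headers as the JSON object "{}". This detail matters if you build the Headers bytes yourself: encoding/json marshals a nil map as null, so the empty case needs explicit handling. The hypothetical normalizeHeaders helper illustrates that rule; it is not the package's code.

```go
package main

import "encoding/json"

// normalizeHeaders is a hypothetical helper: nil or empty headers become the
// JSON object "{}" instead of the "null" that json.Marshal would produce for
// a nil map; anything else is marshaled as-is (map keys come out sorted).
func normalizeHeaders(h map[string]any) ([]byte, error) {
	if len(h) == 0 {
		return []byte("{}"), nil
	}
	return json.Marshal(h)
}
```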
type Marshaler (added in v0.4.0)
Marshaler serializes a domain event value into the bytes stored as an event's payload. The default is encoding/json.Marshal; override per type with WithMarshaler (e.g. for protobuf).
type Metrics (added in v0.5.0)
type Metrics struct {
// contains filtered or unexported fields
}
Metrics holds the outbox's own throughput counters, owned by this package and registered on the shared registry passed to NewMetrics. Wire it into RelayConfig/SweeperConfig/CleanerConfig to record relay, sweeper, and cleaner activity; a nil *Metrics is a safe no-op, so metrics stay optional.
Backlog gauges (depth, age) are separate — they need a query, so they live in the BacklogCollector rather than being incremented inline here.
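The nil-safe behaviour of *Metrics follows a common Go idiom: methods check for a nil receiver, so call sites need no guard. The following sketch uses hypothetical names (metrics, incPublished, relayConfig, recordPublish) and an atomic counter in place of the Prometheus collectors that the real type registers.

```go
package main

import "sync/atomic"

// metrics is a hypothetical stand-in for *outbox.Metrics.
type metrics struct {
	published atomic.Int64
}

// incPublished is safe on a nil receiver, so an unset *metrics simply
// records nothing.
func (m *metrics) incPublished() {
	if m == nil {
		return
	}
	m.published.Add(1)
}

// relayConfig mirrors the optional Metrics field on RelayConfig.
type relayConfig struct {
	Metrics *metrics
}

// recordPublish calls the counter unconditionally; no nil check is needed.
func recordPublish(cfg relayConfig) {
	cfg.Metrics.incPublished()
}
```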
func NewMetrics (added in v0.5.0)
func NewMetrics(reg prometheus.Registerer) *Metrics
NewMetrics builds the outbox counters and registers them on reg (idempotently, via metrics.Register). reg may be nil to disable them.
type Options
type Options struct {
// Backoff schedules retries (next_attempt_at) for failed-but-retryable
// events. Defaults to base 2s, factor 2, max 5m.
Backoff worker.Backoff
}
Options configures a PG store.
type PG
type PG struct {
// contains filtered or unexported fields
}
PG is the Postgres-backed implementation of Store (and StatsReader). Every query is an inline const right next to the method that runs it — no string-templated query cache, no fmt.Sprintf — so the SQL reads as SQL. The backing table is fixed (outbox_events); status values are bound as named params from the Status* constants so the SQL and Go agree on one source of truth.
All statements go through the sqldb named-query helpers for consistent tracing and error wrapping. The Store holds its handle as sqlx.ExtContext, so the same type works against either a *sqlx.DB (pool-bound) or a *sqlx.Tx (transaction-bound, via WithTx) — that is how Insert joins the caller's domain transaction in WithinTran.
PG is safe for concurrent use by multiple relay/sweeper replicas: LeasePending and SweepExpiredLeases claim rows with FOR UPDATE SKIP LOCKED.
func NewPG
NewPG builds a Postgres outbox store. The table is created by a migration (or EnsureSchema in tests), not here.
func (*PG) Cleanup
func (s *PG) Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
Cleanup deletes terminal (sent/failed) rows created before now-retention. Returns the number of rows deleted.
func (*PG) EnsureSchema
func (s *PG) EnsureSchema(ctx context.Context) error
EnsureSchema creates the outbox table and indexes if absent.
func (*PG) Insert
func (s *PG) Insert(ctx context.Context, events ...Event) error
Insert writes pending events. Bound to the caller's transaction when the store was derived via WithTx.
func (*PG) LeasePending
func (s *PG) LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)
LeasePending atomically claims up to limit pending events whose next_attempt_at <= now and marks them in_flight under a fresh lease id. A single CTE + UPDATE … RETURNING does selection and lease assignment in one round-trip; FOR UPDATE SKIP LOCKED lets relay replicas split the work. The whole batch shares one lease id (one per relay tick).
func (*PG) MarkFailed
func (s *PG) MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error
MarkFailed transitions in_flight -> pending (with backoff) when the retry budget remains, or -> failed once attempts+1 reaches max_attempts. Guarded by the same lease predicate as MarkSent.
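func (*PG) MarkSent
func (s *PG) MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error
MarkSent transitions in_flight -> sent in a single guarded UPDATE. The lease_id + status='in_flight' predicate is the lease guard: once a sweeper has reclaimed the row (clearing its lease_id) or another relay has re-leased it under a fresh lease id, the UPDATE matches no row and MarkSent returns ErrLeaseLost.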
func (*PG) Stats (added in v0.5.0)
func (s *PG) Stats(ctx context.Context, now time.Time) (Stats, error)
Stats returns a backlog snapshot (pending/in_flight/failed counts and the age of the oldest pending row) in one aggregate query. Used by BacklogCollector to expose backlog gauges.
func (*PG) SweepExpiredLeases
func (s *PG) SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)
SweepExpiredLeases returns in_flight rows whose lease is older than leaseTimeout back to pending so a crashed relay's work is retried. attempts is left unchanged — a lease expiry means the worker died, not that the broker rejected the message. Returns the number reclaimed.
type PublishFunc (added in v0.4.0)
PublishFunc is the caller's transactional closure. It receives the transaction handle for domain writes and a tx-bound Publisher for emitting events. Returning an error rolls back everything — domain writes and the events recorded through pub together.
The domain code inside fn never sees pub's mechanics: it calls pub.Publish(ctx, SomeEvent{...}) with a plain typed value. The tx handle is here so an application can bind its own store to the transaction (e.g. store.WithTx(tx)); domain methods themselves take the bound store, not tx.
type PublishOption (added in v0.4.0)
type PublishOption func(*route)
PublishOption overrides the registered route for a single Publish. The Go type still resolves marshaling and the route defaults from the Registry; these options replace individual fields for this call only.
func As (added in v0.4.0)
func As(eventType string) PublishOption
As overrides the CloudEvents type for this publish. Use it when one payload type maps to several event types selected by runtime conditions (e.g. a promocode whose discount kind decides amount/percentage/category.created).
func OnTopic (added in v0.4.0)
func OnTopic(topic string) PublishOption
OnTopic overrides the destination topic for this publish.
func WithRouteKey (added in v0.4.0)
func WithRouteKey(key string) PublishOption
WithRouteKey overrides the routing/ordering key for this publish.
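PublishOption is the functional-options pattern applied to a per-call copy of the registered route. The following sketch uses hypothetical lowercase names (route, as, onTopic, withRouteKey, resolve) that mirror As, OnTopic, and WithRouteKey. The package's route type is unexported, so its actual fields are assumptions.

```go
package main

// route is a hypothetical subset of a registered route.
type route struct{ eventType, topic, key string }

// publishOption overrides one field of the route for a single publish.
type publishOption func(*route)

func as(eventType string) publishOption {
	return func(r *route) { r.eventType = eventType }
}

func onTopic(topic string) publishOption {
	return func(r *route) { r.topic = topic }
}

func withRouteKey(key string) publishOption {
	return func(r *route) { r.key = key }
}

// resolve copies the registered default and applies opts to the copy, so
// overrides never leak into the registry or into later publishes.
func resolve(registered route, opts ...publishOption) route {
	r := registered // value copy
	for _, opt := range opts {
		opt(&r)
	}
	return r
}
```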
type Publisher (added in v0.4.0)
type Publisher interface {
// Publish records event in the outbox within the bound transaction. event's
// Go type must be registered; its route (topic, key, content type) and
// marshaling are resolved from the Registry. Returns an error if the type is
// unregistered or marshaling/insert fails.
//
// opts override the registered route per call. This is how one Go payload
// type fans out to several CloudEvents types chosen at runtime: register the
// type once with a default route, then pass As(...) (and optionally
// OnTopic/WithRouteKey) to pick the concrete event for this publish.
Publish(ctx context.Context, event any, opts ...PublishOption) error
}
Publisher records domain events into the outbox. The domain depends only on this interface: it publishes a plain typed value and knows nothing about the transaction, the database, or the transport. A tx-bound Publisher is provided to the WithinTran closure (or built directly with Bind), so the recorded event commits atomically with the domain writes in the same transaction.
This is distinct from broker.Publisher: that one delivers an already-stored event to the wire (used by the Relay); this one writes the event into the outbox table inside the producer's transaction.
func Bind (added in v0.4.0)
func Bind(tx sqlx.ExtContext, store Store, reg *Registry) Publisher
Bind returns a Publisher whose Publish writes events into the outbox on tx, resolving each event's route from reg. Events recorded through it commit atomically with the other writes in tx.
Usually you don't call Bind directly — WithinTran builds a bound Publisher and passes it to your closure. Bind is exposed for callers that manage the transaction themselves.
type Registry (added in v0.4.0)
type Registry struct {
// contains filtered or unexported fields
}
Registry maps a domain event's Go type to its transport route (event type name, topic, key, content type, and how to marshal it). It lets the domain publish a plain typed value while the routing — which broker topic, which key — stays a wiring concern: the domain never names a topic or exchange.
Populate it once at startup with Register, then hand it to WithinTran / Bind. Registry is safe for concurrent reads after setup.
func NewRegistry (added in v0.4.0)
func NewRegistry() *Registry
NewRegistry returns an empty Registry.
type RelayConfig
type RelayConfig struct {
// Name labels the worker (default "outbox-relay").
Name string
// PollInterval is the idle tick rate (default 2s) — the wait after a tick
// that found no work. The relay paces adaptively: after a full batch it
// drains again immediately, so a backlog clears without waiting a full
// interval. It also runs an immediate first tick on startup.
PollInterval time.Duration
// MaxPollInterval, when greater than PollInterval, makes the idle wait back
// off geometrically up to this cap over consecutive empty ticks (a quiet
// relay polls less often). 0 keeps the idle wait at PollInterval.
MaxPollInterval time.Duration
// BatchSize is the max events leased per tick (default 100).
BatchSize int
// PublishTimeout bounds each publish + its follow-up mark (default 5s).
PublishTimeout time.Duration
// Metrics, when set, records relay throughput (published/failed/latency).
Metrics *Metrics
}
RelayConfig configures the Relay worker.
type RouteOption (added in v0.4.0)
type RouteOption func(*route)
RouteOption customizes a registration.
func WithContentType (added in v0.4.0)
func WithContentType(ct string) RouteOption
WithContentType overrides the payload MIME type (default "application/json").
func WithKey (added in v0.4.0)
func WithKey(key string) RouteOption
WithKey sets the routing/ordering key for the event type (RabbitMQ routing key, Kafka partition key, ...). Optional; defaults to empty.
func WithMarshaler (added in v0.4.0)
func WithMarshaler(m Marshaler) RouteOption
WithMarshaler overrides how the event value is serialized (default JSON).
type Stats (added in v0.5.0)
type Stats struct {
Pending int64
InFlight int64
Failed int64
OldestPendingAge time.Duration // age of the oldest pending row; 0 when none
}
Stats is a point-in-time snapshot of the outbox table used for backlog metrics: how much work is waiting and how old the oldest waiting event is.
type StatsReader (added in v0.5.0)
StatsReader reports outbox backlog stats. *PG implements it; it is a separate capability from Store so the core port stays minimal and backlog metrics stay opt-in.
type Store
type Store interface {
// WithTx returns a sibling Store whose queries run on tx. Used by WithinTran
// to insert events in the caller's domain transaction.
WithTx(tx sqlx.ExtContext) Store
// Insert writes pending events. Bound to a transaction when the Store was
// derived via WithTx.
Insert(ctx context.Context, events ...Event) error
// LeasePending atomically claims up to limit pending events whose
// next_attempt_at <= now, marks them in_flight with a fresh lease id, and
// returns them (FOR UPDATE SKIP LOCKED, so relay replicas split work).
LeasePending(ctx context.Context, now time.Time, limit int) ([]Event, error)
// MarkSent transitions in_flight -> sent. Guarded by leaseID; returns
// ErrLeaseLost on a lease mismatch.
MarkSent(ctx context.Context, ev Event, leaseID uuid.UUID, now time.Time) error
// MarkFailed transitions in_flight -> pending (attempts+1 < max, with
// next_attempt_at advanced by backoff) or -> failed (attempts+1 >= max).
// Guarded by leaseID; returns ErrLeaseLost on a lease mismatch.
MarkFailed(ctx context.Context, ev Event, leaseID uuid.UUID, errMsg string, now time.Time) error
// SweepExpiredLeases returns in_flight rows whose lease is older than
// leaseTimeout back to pending (a crashed-relay signal — attempts is NOT
// incremented). Returns the number reclaimed.
SweepExpiredLeases(ctx context.Context, leaseTimeout time.Duration, now time.Time, limit int) (int64, error)
// Cleanup deletes sent/failed rows created before now-retention. Returns the
// number of rows deleted.
Cleanup(ctx context.Context, retention time.Duration, now time.Time) (int64, error)
}
Store is the persistence port for the outbox table. The Postgres implementation is *PG.
type SweeperConfig
type SweeperConfig struct {
Name string // default "outbox-sweeper"
// Interval between sweeps (default 30s).
Interval time.Duration
// LeaseTimeout: in_flight rows leased longer than this are reclaimed
// (default 1m). Set comfortably above RelayConfig.PublishTimeout.
LeaseTimeout time.Duration
// BatchSize bounds rows reclaimed per sweep (default 100).
BatchSize int
// Metrics, when set, counts reclaimed leases.
Metrics *Metrics
}
SweeperConfig configures the Sweeper worker.