Documentation
¶
Overview ¶
Package outbox implements the Transactional Outbox pattern: events are stored in the caller's DB transaction and a relay ships them to NATS.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var RelayModule = fx.Options( fx.Provide(NewRelay), fx.Invoke(func(lc fx.Lifecycle, r *Relay) { lc.Append(fx.Hook{OnStart: r.Start, OnStop: r.Stop}) }), )
RelayModule provides the Relay and registers its lifecycle hooks. Include only in long-running processes (worker, e2e tests). Integration tests must NOT include this — the relay polls the DB in a background goroutine and canceling its context during test teardown corrupts the shared connection. Requires an outbox.Config in the graph, supplied by the service.
var StorageModule = fx.Options( fx.Provide( NewStorage, func(p *Storage) Outbox { return p }, ), )
StorageModule provides the Storage and binds it as the Outbox port. Include in every app mode — server, worker, console — so use cases can always write to the outbox transactionally.
Functions ¶
This section is empty.
Types ¶
type Config ¶
Config carries the relay's publishing identity. SubjectPrefix prefixes every published subject; Source identifies the publishing service in the event envelope (typically the app slug).
type Event ¶
type Event struct {
ID int64 `gorm:"column:id;type:bigserial;primaryKey;autoIncrement"`
MessageID uuid.UUID `gorm:"column:message_id;type:char(36);not null;uniqueIndex"`
Subject string `gorm:"column:subject;type:varchar(255);not null"`
Version string `gorm:"column:version;type:varchar(20);not null"`
Payload datatypes.JSON `gorm:"column:payload;type:jsonb;not null"`
OccurredAt time.Time `gorm:"column:occurred_at;type:timestamp;not null"`
PublishedAt *time.Time `gorm:"column:published_at;type:timestamp"`
Error *string `gorm:"column:error;type:text"`
FailedAt *time.Time `gorm:"column:failed_at;type:timestamp"`
}
Event is one row in the outbox_events table. It holds a pending integration event until the Relay publishes it to the broker and marks it published.
FailedAt is set only for permanent failures (e.g. marshal errors). Transient broker failures never touch the row — the event is simply retried on the next poll tick. Error records the reason for ops inspection and replay.
type Outbox ¶
type Outbox interface {
Store(ctx context.Context, e OutboxEvent) error
}
Outbox stores integration events to the outbox for at-least-once delivery to the central message bus.
type OutboxEvent ¶
OutboxEvent is the constraint for publishable integration events.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay polls the outbox for unpublished rows, publishes each to NATS JetStream, then marks the row published. It runs as a single goroutine; no per-row concurrency keeps order-per-subject roughly stable.
Loop behavior:
- Empty outbox → sleep pollInterval, then poll again.
- Full batch → loop back immediately (more events likely queued).
- Broker/DB error → sleep errorBackoff, then retry.
Two failure modes for individual rows:
- Permanent (marshal failure): quarantined immediately, does not block others.
- Transient (NATS unavailable): batch stopped, retried after errorBackoff.