outbox

package
v0.1.2 Latest
Warning: this package is not in the latest version of its module.
Published: Jun 14, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package outbox implements the Transactional Outbox pattern: events are stored in the caller's DB transaction and a relay ships them to NATS.

Constants

This section is empty.

Variables

var RelayModule = fx.Options(
	fx.Provide(NewRelay),
	fx.Invoke(func(lc fx.Lifecycle, r *Relay) {
		lc.Append(fx.Hook{OnStart: r.Start, OnStop: r.Stop})
	}),
)

RelayModule provides the Relay and registers its lifecycle hooks. Include it only in long-running processes (worker, e2e tests). Integration tests must NOT include it: the relay polls the DB in a background goroutine, and canceling its context during test teardown corrupts the shared connection. It requires an outbox.Config in the graph, supplied by the service.

var StorageModule = fx.Options(
	fx.Provide(
		NewStorage,
		func(p *Storage) Outbox { return p },
	),
)

StorageModule provides the Storage and binds it as the Outbox port. Include in every app mode — server, worker, console — so use cases can always write to the outbox transactionally.

Functions

This section is empty.

Types

type Config

type Config struct {
	SubjectPrefix string
	Source        string
}

Config carries the relay's publishing identity. SubjectPrefix prefixes every published subject; Source identifies the publishing service in the event envelope (typically the app slug).

type Event

type Event struct {
	ID          int64          `gorm:"column:id;type:bigserial;primaryKey;autoIncrement"`
	MessageID   uuid.UUID      `gorm:"column:message_id;type:char(36);not null;uniqueIndex"`
	Subject     string         `gorm:"column:subject;type:varchar(255);not null"`
	Version     string         `gorm:"column:version;type:varchar(20);not null"`
	Payload     datatypes.JSON `gorm:"column:payload;type:jsonb;not null"`
	OccurredAt  time.Time      `gorm:"column:occurred_at;type:timestamp;not null"`
	PublishedAt *time.Time     `gorm:"column:published_at;type:timestamp"`
	Error       *string        `gorm:"column:error;type:text"`
	FailedAt    *time.Time     `gorm:"column:failed_at;type:timestamp"`
}

Event is one row in the outbox_events table. It holds a pending integration event until the Relay publishes it to the broker and marks it published.

FailedAt is set only for permanent failures (e.g. marshal errors). Transient broker failures never touch the row; the event is simply retried on a later poll, after the relay's error backoff. Error records the failure reason for ops inspection and replay.

func (*Event) TableName

func (*Event) TableName() string

type Outbox

type Outbox interface {
	Store(ctx context.Context, e OutboxEvent) error
}

Outbox stores integration events in the outbox for at-least-once delivery to the central message bus.

type OutboxEvent

type OutboxEvent interface {
	Subject() string
	Version() string
}

OutboxEvent is the constraint for publishable integration events.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay polls the outbox for unpublished rows, publishes each to NATS JetStream, then marks the row published. It runs as a single goroutine; the absence of per-row concurrency keeps per-subject ordering roughly stable.

Loop behavior:

  • Empty outbox → sleep pollInterval, then poll again.
  • Full batch → loop back immediately (more events likely queued).
  • Broker/DB error → sleep errorBackoff, then retry.

Two failure modes for individual rows:

  • Permanent (marshal failure): quarantined immediately, does not block others.
  • Transient (NATS unavailable): batch stopped, retried after errorBackoff.

func NewRelay

func NewRelay(
	l logger.Logger,
	c clock.Clock,
	db *gorm.DB,
	js natslib.JetStreamContext,
	cfg Config,
) (*Relay, error)

func (*Relay) Start

func (r *Relay) Start(ctx context.Context) error

Start kicks off the polling goroutine. The startup ctx is checked for early cancellation only — the loop uses the relay's own lifetime context so it is not bounded by the short-lived fx startup context. Idempotent.

func (*Relay) Stop

func (r *Relay) Stop(ctx context.Context) error

Stop cancels the relay's context, causing the loop to exit, then waits for the goroutine to finish.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage implements the Outbox port by writing each event to the outbox_events table in the caller's transaction (Transactional Outbox). The Relay picks up unpublished rows and ships them to the central event bus.

func NewStorage

func NewStorage(c clock.Clock, db *gorm.DB) *Storage

func (*Storage) Store

func (p *Storage) Store(ctx context.Context, e OutboxEvent) error
