outbox

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package outbox implements the transactional outbox pattern for paper-board services. Service code writes events into the outbox table within the same Postgres transaction that mutates business state; an in-process drain goroutine delivers unpublished rows to Redis Streams asynchronously.

Usage

pool, _ := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
pub, err := outbox.NewPublisher(pool, outbox.Config{
    Schema:        "identity",
    SourceService: "identity",
    Stream:        "paperboard.identity",
    RedisAddr:     "redis-master.paper-board:6379",
})
if err != nil { ... }

// In application startup:
go pub.Start(ctx)

// Inside a business transaction:
tx, _ := pool.Begin(ctx)
// ... mutate business tables ...
err = pub.Publish(ctx, tx, outbox.Event{
    EventType:     "identity.user.created",
    Stream:        "paperboard.identity",
    Payload:       payloadJSON,
    TraceID:       traceID,
    SchemaVersion: 1,
    OccurredAt:    time.Now().UTC(),
})
tx.Commit(ctx)

The drain goroutine picks up pending rows every DrainInterval (default 200ms) and publishes them to Redis Streams. On persistent Redis failure (MaxAttempts reached) the row is promoted to dead-letter status.

Migration helper

Before constructing a Publisher, run the migration helper once per service schema (idempotent, safe to run multiple times):

helper := outbox.NewMigrationHelper("identity")
if err := helper(ctx, pool); err != nil { ... }

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMigrationHelper

func NewMigrationHelper(schema string) func(ctx context.Context, pool *pgxpool.Pool) error

NewMigrationHelper returns an idempotent function that creates the outbox_events table in the given schema. Safe to call multiple times (all DDL uses IF NOT EXISTS guards).

Types

type Config

type Config struct {
	// Schema is the owning service's Postgres schema, e.g. "identity".
	Schema string
	// SourceService matches Schema in Wave 1; baked into Envelope.source_service.
	SourceService string
	// Stream is the single Redis stream name per emitter, e.g. "paperboard.identity".
	Stream string
	// RedisAddr is the Redis server address, e.g. "redis-master.paper-board:6379".
	RedisAddr string
	// DrainInterval is how often the drain goroutine polls for pending rows.
	// Default: 200ms.
	DrainInterval time.Duration
	// DrainBatchSize is the max rows fetched per drain tick.
	// Default: 100.
	DrainBatchSize int
	// MaxAttempts is the outbox publish retry budget before dead-lettering.
	// Default: 5.
	MaxAttempts int
	// BackoffSchedule is per-attempt wait times. len must equal MaxAttempts.
	// Default: [1s, 2s, 4s, 8s, 16s].
	BackoffSchedule []time.Duration
	// DeliveredTTL is how long to keep delivered rows before cleanup.
	// Default: 7 days.
	DeliveredTTL time.Duration
	// DeadTTL is how long to keep dead rows before cleanup.
	// Default: 30 days.
	DeadTTL time.Duration
	// CleanupInterval is how often the cleanup goroutine runs.
	// Default: 1 minute.
	CleanupInterval time.Duration
	// FlushTimeout is the max wait on Stop before giving up.
	// Default: 5s.
	FlushTimeout time.Duration
	// Logger is injected; uses slog.Default() if nil.
	Logger *slog.Logger
}

Config holds all configuration for the outbox publisher.

type Event

type Event struct {
	// EventID is the dedupe key. If zero-valued, the publisher stamps a fresh UUID.
	EventID uuid.UUID
	// EventType uses dotted convention: "identity.user.created".
	EventType string
	// Stream is the Redis stream name, e.g. "paperboard.identity".
	Stream string
	// Payload is protojson-encoded bytes of one of the events/v1 payload messages.
	Payload []byte
	// TraceID is the OpenTelemetry trace id propagated from the caller context.
	TraceID string
	// OrgID is the tenancy scope; empty allowed only for identity.user.created.
	OrgID string
	// SchemaVersion is the payload schema version (1 in Wave 1).
	SchemaVersion int32
	// OccurredAt is the service clock in UTC.
	OccurredAt time.Time
}

Event is the payload appended to the outbox by service code.

type Publisher

type Publisher interface {
	// Publish writes the event into the outbox table within the caller's tx.
	// Must be called inside the same pgx.Tx that mutated business state.
	// Returns immediately after row insert; actual Redis delivery is async.
	Publish(ctx context.Context, tx pgx.Tx, e Event) error

	// Start launches the drain + cleanup goroutines.
	// Blocks until ctx is cancelled; safe to call only once.
	Start(ctx context.Context) error

	// Stop signals both goroutines to exit cleanly.
	// Blocks until in-flight publish + cleanup pass complete (bounded by FlushTimeout).
	Stop(ctx context.Context) error
}

Publisher writes events into the outbox and drains to Redis in the background.

func NewPublisher

func NewPublisher(pool *pgxpool.Pool, cfg Config) (Publisher, error)

NewPublisher creates a Publisher backed by the given pool and config.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL