outbox

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package outbox is the transactional-outbox primitive that lets a service emit at-least-once messages atomically with its local database writes. The producer Enqueues a Message inside the same gorm transaction as its INSERT/UPDATE; a separate process (Drainer for Cloud Run Jobs, Worker for long-running dev / VMs) later claims the rows, dispatches to a registered Handler, and marks them done/failed.

Why exist: distributed writes that span "local DB + remote gRPC" can't be wrapped in a single transaction. Without the outbox a service either makes the gRPC call inside a tx (rolled back on failure but the remote side already saw it) or outside (and risks orphaning the local row if the gRPC fails). The outbox table is the durable handoff: write intent + row atomically, dispatch later with retries.

Handlers MUST be idempotent. The drainer guarantees at-least-once delivery (replay on failure, on crash recovery, on concurrent drainer races). Use UNIQUE constraints on the remote side or check- before-write inside the handler.

Cloud Run shape: ship a `cmd/outbox-drainer/` binary alongside `cmd/server/` and trigger it via Cloud Scheduler every 30s–1min. The Drainer makes one pass and exits — no goroutines, no CPU-on instances. For local dev under `make dev`, the Worker keeps a long-running loop in the same process.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// BatchSize caps how many messages a single Claim returns. Default 50.
	BatchSize int
	// MaxAttempts: after this many failures a row stays put with the
	// last error and isn't reclaimed. Default 10.
	MaxAttempts int
	// BaseBackoff is the unit of exponential backoff: retry_at = now +
	// BaseBackoff * 2^(attempts-1), capped at 1h. Default 5s.
	BaseBackoff time.Duration
	// PollInterval is how often the Worker loop polls. Drainer ignores
	// this (single pass). Default 5s.
	PollInterval time.Duration
}

Config is the shared knobs Worker + Drainer expose.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is the shared core that turns a batch of Messages into Handler invocations. Worker (long-running loop) and Drainer (single pass) both delegate here so retry/backoff/error handling stays in one place.

type Drainer

type Drainer struct {
	*Dispatcher
	// contains filtered or unexported fields
}

Drainer is the Cloud Run Job shape: one pass over the outbox, then exits. Returns when there's nothing left to claim OR the batch came back empty. Idempotent and re-runnable: Cloud Scheduler triggers every N seconds; each invocation drains whatever's pending.

func NewDrainer

func NewDrainer(repo Repository, handlers map[string]Handler, monitor monitoring.Monitor, cfg Config) *Drainer

func (*Drainer) Drain

func (d *Drainer) Drain(ctx context.Context) (int, error)

Drain processes batches until Claim returns empty or ctx is done. Returns the total messages successfully handled across all batches in this invocation.

type Handler

type Handler interface {
	Handle(ctx context.Context, msg Message) error
}

Handler processes one Message. Returning an error → the row is retried. Returning nil → marked done. Handlers MUST be idempotent.

type HandlerFunc

type HandlerFunc func(ctx context.Context, msg Message) error

HandlerFunc adapts an ordinary function to Handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, msg Message) error

type Message

type Message struct {
	ID        string
	Kind      string
	Payload   json.RawMessage
	CreatedAt time.Time
	Attempts  int
	LastError string
}

Message is one outbox row. Kind selects a Handler; Payload is the handler-specific shape encoded as JSON so the table doesn't depend on producer/consumer types.

type MessageDraft

type MessageDraft struct {
	Kind    string
	Payload json.RawMessage
}

MessageDraft is what callers pass to Enqueue. The repository assigns the id, created_at, attempts=0.

func NewDraft

func NewDraft(kind string, payload any) (MessageDraft, error)

NewDraft is a small constructor that JSON-encodes the payload so call sites don't have to import encoding/json.

type Repository

type Repository interface {
	Enqueue(ctx context.Context, drafts ...MessageDraft) error
	// Claim grabs up to `batch` rows whose retry_at <= NOW(), tags
	// them as in-flight (FOR UPDATE SKIP LOCKED in postgres), and
	// returns them. Concurrent drainer replicas don't step on each
	// other.
	Claim(ctx context.Context, batch int) ([]Message, error)
	MarkDone(ctx context.Context, id string) error
	// MarkFailed bumps attempts, stores the error, and pushes retry_at
	// forward via exponential backoff. After maxAttempts the row is
	// moved to dead-letter status (caller queries those separately).
	MarkFailed(ctx context.Context, id string, handlerErr error, retryAt time.Time) error
}

Repository is the persistence contract. Implementations:

  • postgres.Repository (this package) — gorm/Postgres-backed.
  • outboxtest.InMemory — for unit tests.

Enqueue MUST run inside the producer's transaction so the outbox row commits atomically with the producer's INSERT. The kit persistence Transactioner ctx-propagates this automatically — pass the txCtx the Transactioner gives you.

type Worker

type Worker struct {
	*Dispatcher
	// contains filtered or unexported fields
}

Worker keeps a long-running loop that polls the outbox at PollInterval. Use this in local dev (`make dev`) or on a process that gets always-on CPU. On Cloud Run with default CPU throttling, prefer Drainer + Cloud Scheduler.

func NewWorker

func NewWorker(repo Repository, handlers map[string]Handler, monitor monitoring.Monitor, cfg Config) *Worker

func (*Worker) Run

func (w *Worker) Run(ctx context.Context) error

Run blocks until ctx is cancelled. Drains one batch per PollInterval; if a batch returns < BatchSize messages, sleeps the full interval; if it returns a full batch, immediately polls again (catch-up mode).

Directories

Path Synopsis
Package postgres is the Postgres/GORM implementation of outbox.Repository.
Package postgres is the Postgres/GORM implementation of outbox.Repository.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL