Documentation
¶
Overview ¶
Package outbox is the transactional-outbox primitive that lets a service emit at-least-once messages atomically with its local database writes. The producer Enqueues a Message inside the same gorm transaction as its INSERT/UPDATE; a separate process (Drainer for Cloud Run Jobs, Worker for long-running dev / VMs) later claims the rows, dispatches to a registered Handler, and marks them done/failed.
Why exist: distributed writes that span "local DB + remote gRPC" can't be wrapped in a single transaction. Without the outbox a service either makes the gRPC call inside a tx (rolled back on failure but the remote side already saw it) or outside (and risks orphaning the local row if the gRPC fails). The outbox table is the durable handoff: write intent + row atomically, dispatch later with retries.
Handlers MUST be idempotent. The drainer guarantees at-least-once delivery (replay on failure, on crash recovery, on concurrent drainer races). Use UNIQUE constraints on the remote side or check- before-write inside the handler.
Cloud Run shape: ship a `cmd/outbox-drainer/` binary alongside `cmd/server/` and trigger it via Cloud Scheduler every 30s–1min. The Drainer makes one pass and exits — no goroutines, no CPU-on instances. For local dev under `make dev`, the Worker keeps a long-running loop in the same process.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// BatchSize caps how many messages a single Claim returns. Default 50.
BatchSize int
// MaxAttempts: after this many failures a row stays put with the
// last error and isn't reclaimed. Default 10.
MaxAttempts int
// BaseBackoff is the unit of exponential backoff: retry_at = now +
// BaseBackoff * 2^(attempts-1), capped at 1h. Default 5s.
BaseBackoff time.Duration
// PollInterval is how often the Worker loop polls. Drainer ignores
// this (single pass). Default 5s.
PollInterval time.Duration
}
Config is the shared knobs Worker + Drainer expose.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher is the shared core that turns a batch of Messages into Handler invocations. Worker (long-running loop) and Drainer (single pass) both delegate here so retry/backoff/error handling stays in one place.
type Drainer ¶
type Drainer struct {
*Dispatcher
// contains filtered or unexported fields
}
Drainer is the Cloud Run Job shape: one pass over the outbox, then exits. Returns when there's nothing left to claim OR the batch came back empty. Idempotent and re-runnable: Cloud Scheduler triggers every N seconds; each invocation drains whatever's pending.
func NewDrainer ¶
func NewDrainer(repo Repository, handlers map[string]Handler, monitor monitoring.Monitor, cfg Config) *Drainer
type Handler ¶
Handler processes one Message. Returning an error → the row is retried. Returning nil → marked done. Handlers MUST be idempotent.
type HandlerFunc ¶
HandlerFunc adapts an ordinary function to Handler.
type Message ¶
type Message struct {
ID string
Kind string
Payload json.RawMessage
CreatedAt time.Time
Attempts int
LastError string
}
Message is one outbox row. Kind selects a Handler; Payload is the handler-specific shape encoded as JSON so the table doesn't depend on producer/consumer types.
type MessageDraft ¶
type MessageDraft struct {
Kind string
Payload json.RawMessage
}
MessageDraft is what callers pass to Enqueue. The repository assigns the id, created_at, attempts=0.
type Repository ¶
type Repository interface {
Enqueue(ctx context.Context, drafts ...MessageDraft) error
// Claim grabs up to `batch` rows whose retry_at <= NOW(), tags
// them as in-flight (FOR UPDATE SKIP LOCKED in postgres), and
// returns them. Concurrent drainer replicas don't step on each
// other.
Claim(ctx context.Context, batch int) ([]Message, error)
MarkDone(ctx context.Context, id string) error
// MarkFailed bumps attempts, stores the error, and pushes retry_at
// forward via exponential backoff. After maxAttempts the row is
// moved to dead-letter status (caller queries those separately).
MarkFailed(ctx context.Context, id string, handlerErr error, retryAt time.Time) error
}
Repository is the persistence contract. Implementations:
- postgres.Repository (this package) — gorm/Postgres-backed.
- outboxtest.InMemory — for unit tests.
Enqueue MUST run inside the producer's transaction so the outbox row commits atomically with the producer's INSERT. The kit persistence Transactioner ctx-propagates this automatically — pass the txCtx the Transactioner gives you.
type Worker ¶
type Worker struct {
*Dispatcher
// contains filtered or unexported fields
}
Worker keeps a long-running loop that polls the outbox at PollInterval. Use this in local dev (`make dev`) or on a process that gets always-on CPU. On Cloud Run with default CPU throttling, prefer Drainer + Cloud Scheduler.
func NewWorker ¶
func NewWorker(repo Repository, handlers map[string]Handler, monitor monitoring.Monitor, cfg Config) *Worker