Documentation
¶
Overview ¶
Package outbox implements the transactional outbox pattern for paper-board services. Service code writes events into the outbox table within the same Postgres transaction that mutates business state; an in-process drain goroutine delivers unpublished rows to Redis Streams asynchronously.
Usage ¶
pool, _ := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
pub, err := outbox.NewPublisher(pool, outbox.Config{
Schema: "identity",
SourceService: "identity",
Stream: "paperboard.identity",
RedisAddr: "redis-master.paper-board:6379",
})
if err != nil { ... }
// In application startup:
go pub.Start(ctx)
// Inside a business transaction:
tx, _ := pool.Begin(ctx)
// ... mutate business tables ...
err = pub.Publish(ctx, tx, outbox.Event{
EventType: "identity.user.created",
Stream: "paperboard.identity",
Payload: payloadJSON,
TraceID: traceID,
SchemaVersion: 1,
OccurredAt: time.Now().UTC(),
})
tx.Commit(ctx)
The drain goroutine picks up pending rows every DrainInterval (default 200ms) and publishes them to Redis Streams. On persistent Redis failure (MaxAttempts reached) the row is promoted to dead-letter status.
Migration helper ¶
Before constructing a Publisher, run the migration helper once per service schema (idempotent, safe to run multiple times):
helper := outbox.NewMigrationHelper("identity")
if err := helper(ctx, pool); err != nil { ... }
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct {
// Schema is the owning service's Postgres schema, e.g. "identity".
Schema string
// SourceService matches Schema in Wave 1; baked into Envelope.source_service.
SourceService string
// Stream is the single Redis stream name per emitter, e.g. "paperboard.identity".
Stream string
// RedisAddr is the Redis server address, e.g. "redis-master.paper-board:6379".
RedisAddr string
// DrainInterval is how often the drain goroutine polls for pending rows.
// Default: 200ms.
DrainInterval time.Duration
// DrainBatchSize is the max rows fetched per drain tick.
// Default: 100.
DrainBatchSize int
// MaxAttempts is the outbox publish retry budget before dead-lettering.
// Default: 5.
MaxAttempts int
// BackoffSchedule is per-attempt wait times. len must equal MaxAttempts.
// Default: [1s, 2s, 4s, 8s, 16s].
BackoffSchedule []time.Duration
// DeliveredTTL is how long to keep delivered rows before cleanup.
// Default: 7 days.
DeliveredTTL time.Duration
// DeadTTL is how long to keep dead rows before cleanup.
// Default: 30 days.
DeadTTL time.Duration
// CleanupInterval is how often the cleanup goroutine runs.
// Default: 1 minute.
CleanupInterval time.Duration
// FlushTimeout is the max wait on Stop before giving up.
// Default: 5s.
FlushTimeout time.Duration
// Logger is injected; uses slog.Default() if nil.
Logger *slog.Logger
}
Config holds all configuration for the outbox publisher.
type Event ¶
type Event struct {
// EventID is the dedupe key. If zero-valued, the publisher stamps a fresh UUID.
EventID uuid.UUID
// EventType uses dotted convention: "identity.user.created".
EventType string
// Stream is the Redis stream name, e.g. "paperboard.identity".
Stream string
// Payload is protojson-encoded bytes of one of the events/v1 payload messages.
Payload []byte
// TraceID is the OpenTelemetry trace id propagated from the caller context.
TraceID string
// OrgID is the tenancy scope; empty allowed only for identity.user.created.
OrgID string
// SchemaVersion is the payload schema version (1 in Wave 1).
SchemaVersion int32
// OccurredAt is the service clock in UTC.
OccurredAt time.Time
}
Event is the payload appended to the outbox by service code.
type Publisher ¶
type Publisher interface {
// Publish writes the event into the outbox table within the caller's tx.
// Must be called inside the same pgx.Tx that mutated business state.
// Returns immediately after row insert; actual Redis delivery is async.
Publish(ctx context.Context, tx pgx.Tx, e Event) error
// Start launches the drain + cleanup goroutines.
// Blocks until ctx is cancelled; safe to call only once.
Start(ctx context.Context) error
// Stop signals both goroutines to exit cleanly.
// Blocks until in-flight publish + cleanup pass complete (bounded by FlushTimeout).
Stop(ctx context.Context) error
}
Publisher writes events into the outbox and drains to Redis in the background.