pgoutbox

Version: v0.1.2
Published: Jun 23, 2026 | License: MIT | Imports: 14 | Imported by: 0

README

pgoutbox - a transactional outbox for pgx

pgoutbox implements a simple transactional outbox for pgx. Messages are written to a Postgres table with AddMessages, inside the caller's transaction, and later flushed to a destination with ProcessMessages.

Here's an example of flushing messages on topic1 by simply printing them to the console:

type printFlusher struct{}

func (printFlusher) Flush(_ context.Context, msgs []*sqlc.Message) error {
	for _, m := range msgs {
		fmt.Printf("  flushed id=%d topic=%s payload=%s\n", m.ID, m.Topic, string(m.Payload))
	}
	return nil
}

// pool is an existing *pgxpool.Pool.
outbox, err := pgoutbox.NewOutbox(pool)
if err != nil {
    panic(err)
}

// Route messages on "topic1" to printFlusher.
outbox.AddFlusher("topic1", printFlusher{})

Then, within a transaction, messages can be added via:

if err := outbox.AddMessages(ctx, tx, "topic1", msgs); err != nil {
    panic(err)
}

And messages can be flushed after the transaction commits using:

if _, err := outbox.ProcessMessages(ctx, "topic1"); err != nil {
    panic(err)
}
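
ProcessMessages handles one batch per call, so a caller that wants to empty a topic has to call it in a loop. The sketch below shows one way to do that. It assumes that an empty batch means nothing is pending, which the docs do not state. drainTopic and fakeProcess are hypothetical helpers, not part of pgoutbox, and the fake stands in for the real outbox so the example runs without a database.

```go
package main

import (
	"context"
	"fmt"
)

// drainTopic calls process until it returns an empty batch or an error and
// reports how many messages were flushed in total.
//
// Assumption: an empty batch means nothing is pending for the topic right
// now. The pgoutbox docs do not spell this out.
func drainTopic[M any](
	ctx context.Context,
	topic string,
	process func(context.Context, string) ([]M, error),
) (int, error) {
	total := 0
	for {
		// Stop promptly if the caller cancelled the context.
		if err := ctx.Err(); err != nil {
			return total, err
		}
		batch, err := process(ctx, topic)
		if err != nil {
			return total, fmt.Errorf("process %q: %w", topic, err)
		}
		if len(batch) == 0 {
			return total, nil
		}
		total += len(batch)
	}
}

// fakeProcess is a hypothetical stand-in for outbox.ProcessMessages that
// hands out pending items in batches of at most size.
func fakeProcess(pending []string, size int) func(context.Context, string) ([]string, error) {
	return func(context.Context, string) ([]string, error) {
		n := size
		if n > len(pending) {
			n = len(pending)
		}
		batch := pending[:n]
		pending = pending[n:]
		return batch, nil
	}
}

// drainDemo drains five fake messages in batches of two.
func drainDemo() (int, error) {
	pending := []string{"a", "b", "c", "d", "e"}
	return drainTopic(context.Background(), "topic1", fakeProcess(pending, 2))
}

func main() {
	n, err := drainDemo()
	fmt.Println(n, err) // 5 <nil>
}
```

Because drainTopic is generic over the message type, passing the method value outbox.ProcessMessages as the process argument should type-check against the documented signature.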

Schema

By default, NewOutbox runs migrations and creates the outbox table as outbox.messages. The schema can be overridden via:

outbox, err := pgoutbox.NewOutbox(pool, pgoutbox.WithSchema("my_schema"))

If you'd rather run migrations yourself (for example, as part of a separate release step), disable the auto-migration and invoke Migrate explicitly:

outbox, err := pgoutbox.NewOutbox(pool,
    pgoutbox.WithSchema("my_schema"),
    pgoutbox.WithAutoMigrate(false),
)
if err != nil {
    panic(err)
}

// Pass the same schema to Migrate that the outbox was configured with.
if err := pgoutbox.Migrate(ctx, pool, pgoutbox.WithSchema("my_schema")); err != nil {
    panic(err)
}

Multiple topics and flushers

To route messages to multiple destinations, register one flusher per topic:

outbox.AddFlusher("orders", ordersFlusher{})
outbox.AddFlusher("shipments", shipmentsFlusher{})

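// within a single transaction, write to whichever topics you need
tx, err := pool.Begin(ctx)
if err != nil {
    panic(err)
}
// In pgx, calling Rollback after a successful Commit is safe, so it can be deferred.
defer tx.Rollback(ctx)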

if err := outbox.AddMessages(ctx, tx, "orders", orderMsgs); err != nil {
    panic(err)
}
if err := outbox.AddMessages(ctx, tx, "shipments", shipmentMsgs); err != nil {
    panic(err)
}

if err := tx.Commit(ctx); err != nil {
    panic(err)
}

// each topic is drained independently by its registered flusher
if _, err := outbox.ProcessMessages(ctx, "orders"); err != nil {
    panic(err)
}
if _, err := outbox.ProcessMessages(ctx, "shipments"); err != nil {
    panic(err)
}
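
When several topics are processed in one pass, a failure on one topic need not stop the others. The sketch below is one possible way to handle that: it visits each topic in turn and joins the errors at the end. processAll is a hypothetical helper, not part of pgoutbox, and the demo uses a fake process function in place of a real outbox.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// processAll calls process once per topic, in order, and keeps going when a
// topic fails. It returns the number of messages flushed per topic and all
// errors joined into one.
func processAll[M any](
	ctx context.Context,
	process func(context.Context, string) ([]M, error),
	topics ...string,
) (map[string]int, error) {
	counts := make(map[string]int, len(topics))
	var errs []error
	for _, topic := range topics {
		msgs, err := process(ctx, topic)
		if err != nil {
			errs = append(errs, fmt.Errorf("topic %q: %w", topic, err))
			continue
		}
		counts[topic] = len(msgs)
	}
	// errors.Join returns nil when errs is empty.
	return counts, errors.Join(errs...)
}

// processAllDemo uses a fake process function: "orders" flushes two messages
// and "shipments" fails.
func processAllDemo() (map[string]int, error) {
	fake := func(_ context.Context, topic string) ([]string, error) {
		if topic == "shipments" {
			return nil, errors.New("destination unavailable")
		}
		return []string{"m1", "m2"}, nil
	}
	return processAll(context.Background(), fake, "orders", "shipments")
}

func main() {
	counts, err := processAllDemo()
	fmt.Println(counts["orders"], err)
}
```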

Benchmarks

To run the benchmarks locally, for example writing and flushing 100k messages:

go test -bench=. -benchtime=100000x

On a MacBook with an Apple M3 Max, this yields roughly 8,492 msgs/sec:

$ go test -bench=. -benchtime=100000x
goos: darwin
goarch: arm64
pkg: github.com/hatchet-dev/pgoutbox
cpu: Apple M3 Max
BenchmarkOutbox_WriteAndPublishThroughput-14              100000            117757 ns/op              8492 msgs/sec
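
The msgs/sec figure is consistent with the ns/op figure if each benchmark iteration handles one message, which the output suggests but does not state. A small sketch of the conversion, where msgsPerSec is a hypothetical helper:

```go
package main

import (
	"fmt"
	"time"
)

// msgsPerSec converts a per-operation duration into operations per second.
// It assumes one message per benchmark iteration.
func msgsPerSec(perOp time.Duration) float64 {
	return float64(time.Second) / float64(perOp)
}

func main() {
	perOp := 117757 * time.Nanosecond // ns/op from the run above
	fmt.Printf("%.0f msgs/sec\n", msgsPerSec(perOp)) // 8492 msgs/sec
}
```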

Documentation

Functions

func Migrate

func Migrate(ctx context.Context, pool *pgxpool.Pool, opts ...OutboxOpt) error

Migrate runs the embedded pgoutbox migrations against the given pool. It is the explicit alternative to NewOutbox's auto-migration: callers that want to control when DDL runs (separate startup phase, release pipeline, etc.) should construct the outbox with WithAutoMigrate(false) and invoke Migrate themselves.

Only WithSchema is consulted from opts; other options are accepted for API symmetry but ignored.

Types

type Flusher

type Flusher interface {
	Flush(ctx context.Context, msgs []*sqlc.Message) error
}
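
Since ProcessMessages only deletes a batch when the flush succeeds, returning an error from Flush is how a flusher signals that the batch was not delivered. The sketch below illustrates that contract with stand-in types so it runs on its own. Message mirrors only the fields the README example prints (the int64 ID is an assumption), and FlusherFunc and sendEach are hypothetical helpers, not part of pgoutbox.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a stand-in for sqlc.Message with only the fields the README
// example prints. The real struct may differ; the int64 ID is an assumption.
type Message struct {
	ID      int64
	Topic   string
	Payload []byte
}

// Flusher mirrors the documented interface, using the stand-in Message.
type Flusher interface {
	Flush(ctx context.Context, msgs []*Message) error
}

// FlusherFunc is a hypothetical adapter that lets a plain function act as a
// Flusher, in the style of http.HandlerFunc.
type FlusherFunc func(ctx context.Context, msgs []*Message) error

// Flush calls the underlying function.
func (f FlusherFunc) Flush(ctx context.Context, msgs []*Message) error {
	return f(ctx, msgs)
}

// sendEach returns a Flusher that forwards messages one at a time and stops
// at the first failure, returning a wrapped error.
func sendEach(send func(context.Context, *Message) error) Flusher {
	return FlusherFunc(func(ctx context.Context, msgs []*Message) error {
		for _, m := range msgs {
			if err := send(ctx, m); err != nil {
				return fmt.Errorf("send message %d: %w", m.ID, err)
			}
		}
		return nil
	})
}

// flushDemo flushes three messages through a sender that rejects ID 2.
func flushDemo() (sent int, err error) {
	f := sendEach(func(_ context.Context, m *Message) error {
		if m.ID == 2 {
			return errors.New("destination unavailable")
		}
		sent++
		return nil
	})
	msgs := []*Message{{ID: 1}, {ID: 2}, {ID: 3}}
	err = f.Flush(context.Background(), msgs)
	return sent, err
}

func main() {
	sent, err := flushDemo()
	fmt.Println(sent, err)
}
```

Note that in this sketch the first message has already been sent when the error is returned. A destination that cannot tolerate seeing a message twice would need its own deduplication, for example keyed on the message ID.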

type MessageOpts

type MessageOpts struct {
	Payload []byte
}

type Outbox

type Outbox interface {
	AddFlusher(topic string, flusher Flusher)

	AddMessages(ctx context.Context, tx pgx.Tx, topic string, msgs []MessageOpts) error

	// ProcessMessages grabs a batch of messages for the given topic, flushes them using the registered Flusher for that
	// topic, and deletes them from the outbox if the flush is successful.
	ProcessMessages(ctx context.Context, topic string) ([]*sqlc.Message, error)
}
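
The ProcessMessages contract described in the comment above (take a batch, flush it, delete only on success) can be illustrated with a toy in-memory model. This is not how pgoutbox is implemented. memOutbox and its fields are hypothetical, and the handling of a topic with no registered flusher is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a minimal stand-in for an outbox row.
type message struct {
	ID      int
	Payload string
}

// memOutbox is a toy in-memory model of the documented behavior.
type memOutbox struct {
	batchSize int
	pending   map[string][]message
	flushers  map[string]func([]message) error
}

// processMessages takes up to batchSize pending messages for topic, flushes
// them, and removes them from pending only if the flush succeeded.
func (o *memOutbox) processMessages(topic string) ([]message, error) {
	flush, ok := o.flushers[topic]
	if !ok {
		// Assumption: the real library's behavior here is not documented.
		return nil, fmt.Errorf("no flusher registered for topic %q", topic)
	}
	queue := o.pending[topic]
	n := o.batchSize
	if n > len(queue) {
		n = len(queue)
	}
	if n == 0 {
		return nil, nil
	}
	batch := queue[:n]
	if err := flush(batch); err != nil {
		return nil, err // flush failed, so nothing is deleted
	}
	o.pending[topic] = queue[n:]
	return batch, nil
}

// modelDemo runs one failing and one successful pass over three messages
// with a batch size of two, returning the pending count after each pass.
func modelDemo() (afterFail, afterSuccess int) {
	fail := true
	o := &memOutbox{
		batchSize: 2,
		pending: map[string][]message{
			"orders": {{ID: 1, Payload: "a"}, {ID: 2, Payload: "b"}, {ID: 3, Payload: "c"}},
		},
		flushers: map[string]func([]message) error{
			"orders": func([]message) error {
				if fail {
					return errors.New("destination unavailable")
				}
				return nil
			},
		},
	}
	o.processMessages("orders") // flush fails, batch stays pending
	afterFail = len(o.pending["orders"])
	fail = false
	o.processMessages("orders") // flush succeeds, batch is removed
	afterSuccess = len(o.pending["orders"])
	return afterFail, afterSuccess
}

func main() {
	fmt.Println(modelDemo()) // 3 1
}
```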

func NewOutbox

func NewOutbox(pool *pgxpool.Pool, fs ...OutboxOpt) (Outbox, error)

type OutboxOpt

type OutboxOpt func(*outboxImplOpts)
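
OutboxOpt follows Go's functional options pattern: each option is a function that mutates an options struct before the outbox is built. The sketch below shows the general shape of the pattern. The opts struct, its field names, and the default batch size of 100 are assumptions for illustration, since outboxImplOpts is unexported and its contents are not documented.

```go
package main

import "fmt"

// opts is a hypothetical stand-in for the unexported outboxImplOpts.
// Field names and the batch size default are assumptions.
type opts struct {
	schema      string
	autoMigrate bool
	batchSize   int32
}

// opt has the same shape as the documented OutboxOpt.
type opt func(*opts)

func withSchema(s string) opt          { return func(o *opts) { o.schema = s } }
func withAutoMigrate(enabled bool) opt { return func(o *opts) { o.autoMigrate = enabled } }

// resolve starts from defaults and applies each option in order, so later
// options win over earlier ones.
func resolve(fs ...opt) opts {
	o := opts{schema: "outbox", autoMigrate: true, batchSize: 100}
	for _, f := range fs {
		f(&o)
	}
	return o
}

func main() {
	o := resolve(withSchema("my_schema"), withAutoMigrate(false))
	fmt.Printf("%+v\n", o) // {schema:my_schema autoMigrate:false batchSize:100}
}
```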

func WithAutoMigrate

func WithAutoMigrate(enabled bool) OutboxOpt

WithAutoMigrate controls whether NewOutbox runs the embedded migrations on construction. Defaults to true. Set to false when the caller wants to run migrations explicitly via Migrate (for example, in a separate startup phase or release pipeline).

func WithBatchSize

func WithBatchSize(n int) OutboxOpt

WithBatchSize sets the maximum number of messages ProcessMessages will acquire and hand to the Flusher per call. Must be > 0. The size must fit in an int32; sizes above math.MaxInt32 fall back to the default.
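
Because an oversized value silently falls back to the default, a caller that reads the batch size from configuration may want to validate it before passing it on. A minimal sketch of such a caller-side check follows. checkBatchSize is a hypothetical helper, not part of pgoutbox, and how the library itself reacts to a non-positive value is not documented.

```go
package main

import (
	"fmt"
	"math"
)

// checkBatchSize reports an error if n falls outside the documented range
// for WithBatchSize: greater than zero and at most math.MaxInt32.
func checkBatchSize(n int) error {
	if n <= 0 {
		return fmt.Errorf("batch size must be > 0, got %d", n)
	}
	if int64(n) > math.MaxInt32 {
		return fmt.Errorf("batch size %d exceeds math.MaxInt32", n)
	}
	return nil
}

func main() {
	for _, n := range []int{0, 500} {
		fmt.Println(n, checkBatchSize(n))
	}
}
```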

func WithSchema

func WithSchema(searchPath string) OutboxOpt

type TxFlusher

type TxFlusher interface {
	Flusher

	FlushWithTx(ctx context.Context, tx pgx.Tx, msgs []*sqlc.Message) error
}

TxFlusher is an optional extension of Flusher. When a registered flusher implements it, ProcessMessages calls FlushWithTx instead of Flush, handing over the same pgx.Tx it used to acquire the messages. This lets the flusher run its own writes in the same transaction that deletes the flushed messages, so the flush and the delete commit (or roll back) atomically.
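
Optional interfaces like this are usually detected with a type assertion. The sketch below shows that dispatch with stand-in types so it compiles without pgx or pgoutbox. Tx, Message, and dispatch are hypothetical, and the library's actual internals may differ.

```go
package main

import (
	"context"
	"fmt"
)

// Message and Tx are stand-ins for sqlc.Message and pgx.Tx.
type Message struct{ ID int64 }
type Tx interface{}

// Flusher and TxFlusher mirror the documented interfaces.
type Flusher interface {
	Flush(ctx context.Context, msgs []*Message) error
}

type TxFlusher interface {
	Flusher
	FlushWithTx(ctx context.Context, tx Tx, msgs []*Message) error
}

// dispatch prefers FlushWithTx when the flusher implements TxFlusher and
// falls back to Flush otherwise.
func dispatch(ctx context.Context, f Flusher, tx Tx, msgs []*Message) error {
	if tf, ok := f.(TxFlusher); ok {
		return tf.FlushWithTx(ctx, tx, msgs)
	}
	return f.Flush(ctx, msgs)
}

// plain implements only Flusher and records which method was called.
type plain struct{ calls *[]string }

func (p plain) Flush(context.Context, []*Message) error {
	*p.calls = append(*p.calls, "Flush")
	return nil
}

// transactional embeds plain and adds FlushWithTx, so it is a TxFlusher.
type transactional struct{ plain }

func (t transactional) FlushWithTx(context.Context, Tx, []*Message) error {
	*t.calls = append(*t.calls, "FlushWithTx")
	return nil
}

// dispatchDemo dispatches to one flusher of each kind and returns the
// method names that were invoked, in order.
func dispatchDemo() []string {
	var calls []string
	ctx := context.Background()
	dispatch(ctx, plain{&calls}, nil, nil)
	dispatch(ctx, transactional{plain{&calls}}, nil, nil)
	return calls
}

func main() {
	fmt.Println(dispatchDemo()) // [Flush FlushWithTx]
}
```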

Directories

Path              Synopsis
internal/harness  Package harness provides reusable scaffolding for the pgoutbox e2e tests.
