outbox

package
v0.27.0
Warning: This package is not in the latest version of its module.
Published: Mar 17, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package outbox provides a transactional outbox pattern for reliable event publishing.

The transactional outbox solves the dual-write problem in microservices: events are written to an outbox table in the SAME database transaction as business data, then reliably delivered to the message broker by a background relay.

This guarantees at-least-once delivery: events are never lost even if the broker is temporarily unavailable. Consumers MUST be idempotent — use the x-outbox-event-id header for deduplication.
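
Because delivery is at-least-once, a consumer can see the same event twice. The sketch below shows one way to deduplicate on the x-outbox-event-id header. The Deduper type and its in-memory map are hypothetical illustrations, not part of this package; a real consumer would likely keep seen IDs in durable storage.

```go
package main

import (
	"fmt"
	"sync"
)

// HeaderEventID is the deduplication header named in the package overview.
const HeaderEventID = "x-outbox-event-id"

// Deduper is a hypothetical in-memory set of already-processed event IDs.
// It is lost on restart, so it only illustrates the idea.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// Handle runs process only for event IDs that have not been processed yet.
// It reports whether process ran successfully. The lock is held while
// process runs, which serializes handlers but keeps the sketch simple.
func (d *Deduper) Handle(headers map[string]any, process func() error) (bool, error) {
	id, ok := headers[HeaderEventID].(string)
	if !ok || id == "" {
		return false, fmt.Errorf("missing %s header", HeaderEventID)
	}
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[id]; dup {
		return false, nil // duplicate delivery: acknowledge without reprocessing
	}
	if err := process(); err != nil {
		return false, err // ID is not recorded, so a redelivery is retried
	}
	d.seen[id] = struct{}{}
	return true, nil
}

func main() {
	d := NewDeduper()
	headers := map[string]any{HeaderEventID: "evt-1"}
	for i := 0; i < 2; i++ {
		processed, err := d.Handle(headers, func() error { return nil })
		fmt.Println(processed, err)
	}
}
```

The ID is recorded only after process succeeds, so a failed attempt stays eligible for redelivery.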

Usage:

func (m *Module) Init(deps *app.ModuleDeps) error {
    m.outbox = deps.Outbox
    return nil
}

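func (s *Service) CreateOrder(ctx context.Context, order Order) error {
    tx, err := db.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx) // undoes the transaction on any early return

    // The business write and the outbox write share one transaction.
    // Check the errors from both calls before committing (omitted here for brevity).
    tx.Exec(ctx, "INSERT INTO orders ...", args...)
    s.outbox.Publish(ctx, tx, &app.OutboxEvent{
        EventType:   "order.created",
        AggregateID: "order-123",
        Payload:     payload,
        Exchange:    "order.events",
    })
    return tx.Commit(ctx)
}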

Constants

const (
	StatusPending   = "pending"
	StatusPublished = "published"
	StatusFailed    = "failed"
)

Event status constants.

const DefaultTableName = "gobricks_outbox"

DefaultTableName is the default outbox table name.

Variables

This section is empty.

Functions

This section is empty.

Types

type Cleanup

type Cleanup struct {
	// contains filtered or unexported fields
}

Cleanup is a scheduler.Executor that removes published events older than the configured retention period.

Runs daily at 04:00 by default (registered via scheduler.DailyAt).

func (*Cleanup) Execute

func (c *Cleanup) Execute(ctx scheduler.JobContext) error

type Module

type Module struct {
	// contains filtered or unexported fields
}

Module implements the GoBricks Module interface for transactional outbox. It provides reliable event publishing by writing events to a database table within the caller's transaction, then publishing them to the message broker via a background relay job.

The module is registered like any other GoBricks module:

fw.RegisterModules(
    scheduler.NewModule(),  // Required: relay runs as a scheduled job
    outbox.NewModule(),     // Outbox module
    &myapp.OrderModule{},
)

func NewModule

func NewModule() *Module

NewModule creates a new Module instance.

func (*Module) Init

func (m *Module) Init(deps *app.ModuleDeps) error

Init implements app.Module. Stores dependencies, creates the vendor-specific store, and initializes the publisher.

func (*Module) Name

func (m *Module) Name() string

Name implements app.Module.

func (*Module) OutboxPublisher

func (m *Module) OutboxPublisher() app.OutboxPublisher

OutboxPublisher implements app.OutboxProvider — returns the Publisher for ModuleDeps wiring.

func (*Module) RegisterJobs

func (m *Module) RegisterJobs(registrar app.JobRegistrar) error

RegisterJobs implements app.JobProvider. Registers the relay and cleanup jobs with the scheduler.

func (*Module) Shutdown

func (m *Module) Shutdown() error

Shutdown implements app.Module.

type Record

type Record struct {
	ID          string     // UUID, generated on insert
	EventType   string     // Event type for routing
	AggregateID string     // Aggregate identifier for correlation
	Payload     []byte     // JSON-encoded event payload
	Headers     []byte     // JSON-encoded AMQP headers (nullable)
	Exchange    string     // Target AMQP exchange
	RoutingKey  string     // AMQP routing key
	Status      string     // "pending", "published", or "failed"
	RetryCount  int        // Number of publish attempts
	Error       string     // Last error message (empty on success)
	CreatedAt   time.Time  // When the event was created
	PublishedAt *time.Time // When the event was successfully published (nil if pending)
}

Record represents a single row in the outbox table. Records are created by Publisher.Publish() and consumed by the relay job.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay is a scheduler.Executor that polls for pending outbox events and publishes them to the message broker via existing AMQP infrastructure.

The relay runs as a scheduled job (registered via scheduler.FixedRate), which gives it overlap prevention, panic recovery, and OTel metrics for free.

func (*Relay) Execute

func (r *Relay) Execute(ctx scheduler.JobContext) error

type Store

type Store interface {
	// Insert writes an event row to the outbox table within the given transaction.
	Insert(ctx context.Context, tx dbtypes.Tx, record *Record) error

	// FetchPending retrieves up to batchSize unpublished events ordered by creation time.
	// Events with retry count exceeding maxRetries are skipped.
	FetchPending(ctx context.Context, db dbtypes.Interface, batchSize, maxRetries int) ([]Record, error)

	// MarkPublished updates the event status to published with a timestamp.
	MarkPublished(ctx context.Context, db dbtypes.Interface, eventID string) error

	// MarkFailed increments retry count and records the error.
	MarkFailed(ctx context.Context, db dbtypes.Interface, eventID, errMsg string) error

	// DeletePublished removes events that were published before the given time.
	// Returns the number of rows deleted.
	DeletePublished(ctx context.Context, db dbtypes.Interface, before time.Time) (int64, error)

	// CreateTable creates the outbox table if it does not exist.
	// Used for auto-migration when outbox.auto_create_table is true.
	CreateTable(ctx context.Context, db dbtypes.Interface) error
}

Store abstracts outbox table operations for vendor-agnostic SQL. Implementations exist for PostgreSQL and Oracle with vendor-specific placeholder styles and DDL.
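
To illustrate what "vendor-specific placeholder styles" means in practice: PostgreSQL uses $1, $2, ... for positional parameters, while Oracle accepts :1, :2, .... The sketch below builds an INSERT statement for either style. The function names, the vendor strings, and the column list are assumptions for the example and do not reflect the package's actual SQL.

```go
package main

import (
	"fmt"
	"strings"
)

// placeholder returns the n-th positional bind marker for a vendor.
func placeholder(vendor string, n int) (string, error) {
	switch vendor {
	case "postgresql":
		return fmt.Sprintf("$%d", n), nil
	case "oracle":
		return fmt.Sprintf(":%d", n), nil
	default:
		return "", fmt.Errorf("unsupported vendor %q", vendor)
	}
}

// insertSQL builds a parameterized INSERT for the given table and columns.
// The table name is interpolated, so it must be validated beforehand.
func insertSQL(vendor, table string, columns []string) (string, error) {
	marks := make([]string, len(columns))
	for i := range columns {
		m, err := placeholder(vendor, i+1)
		if err != nil {
			return "", err
		}
		marks[i] = m
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
		table, strings.Join(columns, ", "), strings.Join(marks, ", ")), nil
}

func main() {
	cols := []string{"id", "event_type", "payload"} // hypothetical column names
	for _, vendor := range []string{"postgresql", "oracle"} {
		q, err := insertSQL(vendor, "gobricks_outbox", cols)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(q)
	}
}
```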

func NewOracleStore

func NewOracleStore(tableName string) (Store, error)

NewOracleStore creates a new Oracle outbox store. Returns an error if the table name contains invalid identifier characters.

func NewPostgresStore

func NewPostgresStore(tableName string) (Store, error)

NewPostgresStore creates a new PostgreSQL outbox store. Returns an error if the table name contains invalid identifier characters.
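
Table names cannot be passed as bind parameters, so they end up interpolated into SQL text, which is presumably why both constructors validate them. A minimal sketch of such a check follows; the exact rule (ASCII letters, digits, and underscores, not starting with a digit) is an assumption, and the package's real validation may differ.

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe is an assumed rule for a safe SQL identifier.
var identRe = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_]*$`)

// validateTableName rejects names that could alter the SQL statement
// when interpolated into it.
func validateTableName(name string) error {
	if !identRe.MatchString(name) {
		return fmt.Errorf("invalid table name %q", name)
	}
	return nil
}

func main() {
	for _, name := range []string{"gobricks_outbox", "outbox; DROP TABLE orders"} {
		fmt.Printf("%q: %v\n", name, validateTableName(name))
	}
}
```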

Directories

Path Synopsis
Package testing provides test utilities for the transactional outbox pattern.
