Documentation
¶
Overview ¶
Package outbox provides a transactional outbox pattern for reliable event publishing.
The transactional outbox solves the dual-write problem in microservices: events are written to an outbox table in the SAME database transaction as business data, then reliably delivered to the message broker by a background relay.
This guarantees at-least-once delivery: events are never lost even if the broker is temporarily unavailable. Consumers MUST be idempotent — use the x-outbox-event-id header for deduplication.
Usage:
func (m *Module) Init(deps *app.ModuleDeps) error {
m.outbox = deps.Outbox
return nil
}
func (s *Service) CreateOrder(ctx context.Context, order Order) error {
tx, err := db.Begin(ctx)
if err != nil { return err }
defer tx.Rollback(ctx)
tx.Exec(ctx, "INSERT INTO orders ...", args...)
s.outbox.Publish(ctx, tx, &app.OutboxEvent{
EventType: "order.created",
AggregateID: "order-123",
Payload: payload,
Exchange: "order.events",
})
return tx.Commit(ctx)
}
Index ¶
Constants ¶
const ( StatusPending = "pending" StatusPublished = "published" StatusFailed = "failed" )
Event status constants.
const DefaultTableName = "gobricks_outbox"
DefaultTableName is the default outbox table name.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Cleanup ¶
type Cleanup struct {
// contains filtered or unexported fields
}
Cleanup is a scheduler.Executor that removes published events older than the configured retention period.
Runs daily at 04:00 by default (registered via scheduler.DailyAt).
type Module ¶
type Module struct {
// contains filtered or unexported fields
}
Module implements the GoBricks Module interface for transactional outbox. It provides reliable event publishing by writing events to a database table within the caller's transaction, then publishing them to the message broker via a background relay job.
The module is registered like any other GoBricks module:
fw.RegisterModules(
scheduler.NewModule(), // Required: relay runs as a scheduled job
outbox.NewModule(), // Outbox module
&myapp.OrderModule{},
)
func (*Module) Init ¶
func (m *Module) Init(deps *app.ModuleDeps) error
Init implements app.Module. Stores dependencies, creates the vendor-specific store, and initializes the publisher.
func (*Module) OutboxPublisher ¶
func (m *Module) OutboxPublisher() app.OutboxPublisher
OutboxPublisher implements app.OutboxProvider — returns the Publisher for ModuleDeps wiring.
func (*Module) RegisterJobs ¶
func (m *Module) RegisterJobs(registrar app.JobRegistrar) error
RegisterJobs implements app.JobProvider. Registers the relay and cleanup jobs with the scheduler.
type Record ¶
type Record struct {
ID string // UUID, generated on insert
EventType string // Event type for routing
AggregateID string // Aggregate identifier for correlation
Payload []byte // JSON-encoded event payload
Headers []byte // JSON-encoded AMQP headers (nullable)
Exchange string // Target AMQP exchange
RoutingKey string // AMQP routing key
Status string // "pending", "published", or "failed"
RetryCount int // Number of publish attempts
Error string // Last error message (empty on success)
CreatedAt time.Time // When the event was created
PublishedAt *time.Time // When the event was successfully published (nil if pending)
}
Record represents a single row in the outbox table. Records are created by Publisher.Publish() and consumed by the relay job.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay is a scheduler.Executor that polls for pending outbox events and publishes them to the message broker via existing AMQP infrastructure.
The relay runs as a scheduled job (registered via scheduler.FixedRate), getting overlapping prevention, panic recovery, and OTel metrics for free.
type Store ¶
type Store interface {
// Insert writes an event row to the outbox table within the given transaction.
Insert(ctx context.Context, tx dbtypes.Tx, record *Record) error
// FetchPending retrieves up to batchSize unpublished events ordered by creation time.
// Events with retry count exceeding maxRetries are skipped.
FetchPending(ctx context.Context, db dbtypes.Interface, batchSize, maxRetries int) ([]Record, error)
// MarkPublished updates the event status to published with a timestamp.
MarkPublished(ctx context.Context, db dbtypes.Interface, eventID string) error
// MarkFailed increments retry count and records the error.
MarkFailed(ctx context.Context, db dbtypes.Interface, eventID, errMsg string) error
// DeletePublished removes events that were published before the given time.
// Returns the number of rows deleted.
DeletePublished(ctx context.Context, db dbtypes.Interface, before time.Time) (int64, error)
// CreateTable creates the outbox table if it does not exist.
// Used for auto-migration when outbox.auto_create_table is true.
CreateTable(ctx context.Context, db dbtypes.Interface) error
}
Store abstracts outbox table operations for vendor-agnostic SQL. Implementations exist for PostgreSQL and Oracle with vendor-specific placeholder styles and DDL.
func NewOracleStore ¶
NewOracleStore creates a new Oracle outbox store. Returns an error if the table name contains invalid identifier characters.
func NewPostgresStore ¶
NewPostgresStore creates a new PostgreSQL outbox store. Returns an error if the table name contains invalid identifier characters.