outbox

package
v0.11.1
Published: May 18, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunProcessedCleanup added in v0.11.1

func RunProcessedCleanup(
	ctx context.Context,
	store ProcessedStore,
	db any,
	retention, interval time.Duration,
)

RunProcessedCleanup blocks until ctx is canceled, periodically purging rows older than retention. Run as a goroutine.

A non-positive retention or interval disables the loop (returns immediately, after logging at V(1) so misconfiguration is observable). Errors from PurgeProcessedBefore are logged via logr and do not stop the loop.
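The behaviour described above can be sketched as a ticker loop. This is a hypothetical re-creation rather than the package's source: `purger`, `runCleanup`, `countingStore`, and `demoCleanup` are names invented for the illustration, and `fmt.Println` stands in for the logr calls.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// purger is a local stand-in for the one ProcessedStore method the loop needs.
type purger interface {
	PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
}

// runCleanup is an assumed shape for the documented behaviour: return at once
// on non-positive settings, otherwise purge on every tick until ctx is
// canceled, and keep looping when a purge fails.
func runCleanup(ctx context.Context, store purger, db any, retention, interval time.Duration) {
	if retention <= 0 || interval <= 0 {
		return // the documented function also logs at V(1) here
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			if _, err := store.PurgeProcessedBefore(ctx, db, now.Add(-retention)); err != nil {
				fmt.Println("purge failed:", err) // logged only; the loop continues
			}
		}
	}
}

// countingStore counts purge calls so the demo can observe the loop.
type countingStore struct{ calls atomic.Int64 }

func (s *countingStore) PurgeProcessedBefore(context.Context, any, time.Time) (int64, error) {
	s.calls.Add(1)
	return 0, nil
}

// demoCleanup runs the loop briefly and reports how many sweeps happened.
func demoCleanup() int64 {
	ctx, cancel := context.WithCancel(context.Background())
	store := &countingStore{}

	// Zero retention disables the loop, so this call returns immediately.
	runCleanup(ctx, store, nil, 0, time.Millisecond)

	done := make(chan struct{})
	go func() {
		defer close(done)
		runCleanup(ctx, store, nil, time.Hour, 5*time.Millisecond)
	}()
	time.Sleep(60 * time.Millisecond)
	cancel()
	<-done // the loop exits once ctx is canceled
	return store.calls.Load()
}

func main() {
	fmt.Println("sweeps:", demoCleanup())
}
```

Because the function blocks, a caller would normally start it with `go` and stop it by canceling the context, as `demoCleanup` does.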

Types

type Config

type Config struct {
	PollInterval time.Duration
	BatchSize    int
	// Retention is the age at which processed messages become eligible for
	// deletion. Zero (the default) disables cleanup.
	Retention time.Duration
	// CleanupInterval is how often the relay sweeps for expired messages
	// when Retention > 0. Defaults to one hour.
	CleanupInterval time.Duration
}

Config holds the configuration for the Relay.

type Message

type Message struct {
	ID          string
	Topic       string
	Key         string
	Payload     []byte
	Headers     map[string]string
	Status      MessageStatus
	Retries     int
	LastError   string
	CreatedAt   time.Time
	ProcessedAt time.Time
}

Message represents an outbox message.

type MessageStatus

type MessageStatus string

MessageStatus represents the status of an outbox message.

const (
	StatusPending    MessageStatus = "pending"
	StatusProcessing MessageStatus = "processing"
	StatusProcessed  MessageStatus = "processed"
	StatusFailed     MessageStatus = "failed"
)

type Option

type Option interface {
	Apply(*Config)
}

An Option configures a Relay instance.

func WithBatchSize

func WithBatchSize(n int) Option

WithBatchSize sets the maximum number of messages fetched per poll cycle.

func WithCleanupInterval added in v0.11.1

func WithCleanupInterval(d time.Duration) Option

WithCleanupInterval sets how often the relay deletes expired processed messages when retention is enabled. A non-positive value is ignored.

func WithPollInterval

func WithPollInterval(d time.Duration) Option

WithPollInterval sets how often the relay polls for pending messages.

func WithRetention added in v0.11.1

func WithRetention(d time.Duration) Option

WithRetention enables periodic deletion of processed messages older than d. A non-positive value disables cleanup.

type OptionFunc

type OptionFunc func(*Config)

OptionFunc is a function that configures a Relay config.

func (OptionFunc) Apply

func (f OptionFunc) Apply(config *Config)

Apply calls f(config).
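The option types above follow the functional-options pattern. The sketch below copies the documented declarations of Config, Option, and OptionFunc and shows how options might be applied over defaults. The bodies of `withRetention` and `withCleanupInterval` and the helper `buildConfig` are assumptions written to match the documented rules for non-positive values; only the one-hour CleanupInterval default comes from the documentation, the other defaults are placeholders.

```go
package main

import (
	"fmt"
	"time"
)

// Config, Option and OptionFunc mirror the declarations documented above.
type Config struct {
	PollInterval    time.Duration
	BatchSize       int
	Retention       time.Duration
	CleanupInterval time.Duration
}

type Option interface{ Apply(*Config) }

type OptionFunc func(*Config)

func (f OptionFunc) Apply(c *Config) { f(c) }

// withRetention is an assumed implementation: a non-positive value
// leaves Retention at zero, which disables cleanup.
func withRetention(d time.Duration) Option {
	return OptionFunc(func(c *Config) {
		if d < 0 {
			d = 0
		}
		c.Retention = d
	})
}

// withCleanupInterval is an assumed implementation: a non-positive
// value is ignored, so the existing setting survives.
func withCleanupInterval(d time.Duration) Option {
	return OptionFunc(func(c *Config) {
		if d > 0 {
			c.CleanupInterval = d
		}
	})
}

// buildConfig applies options in order over a set of defaults.
// PollInterval and BatchSize defaults here are placeholders.
func buildConfig(opts ...Option) Config {
	cfg := Config{
		PollInterval:    time.Second,
		BatchSize:       100,
		CleanupInterval: time.Hour,
	}
	for _, o := range opts {
		o.Apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(withRetention(24*time.Hour), withCleanupInterval(-1))
	fmt.Println(cfg.Retention, cfg.CleanupInterval)
}
```

Wrapping a closure in OptionFunc is what lets a plain function satisfy the Option interface, so callers can also write one-off options inline.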

type ProcessedStore added in v0.11.1

type ProcessedStore interface {
	// IsProcessed reports whether (group, msgID) is already recorded.
	IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)

	// MarkProcessed records (group, msgID). Returns isNew=true on first
	// insert; false if a row already existed (treat as "skip — already
	// done"). Concurrent inserts are absorbed via ON CONFLICT DO NOTHING.
	//
	// metadata may be nil. When non-nil it must be valid JSON; the
	// implementation validates and returns an error otherwise. Free-form
	// so callers can attach handler-specific context (printed printer
	// names, snapshot of config, audit trail) without further migrations.
	MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (isNew bool, err error)

	// PurgeProcessedBefore deletes rows whose processed_at is strictly
	// before t. Match the outbox retention so the dedup window stays at
	// least as wide as the message redelivery window.
	PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
}

ProcessedStore is the consumer-side mirror of Store. Because stream delivery is at-least-once, a handler can run twice on the same message; ProcessedStore records (group, message_id) so that handlers can short-circuit on redelivery.

db is driver-specific: pass *sql.DB / *sql.Tx for the database/sql backends, pgx.Tx / pgxpool.Pool for a pgx backend. Each implementation type-asserts the supported handles. Pass a transaction handle to compose IsProcessed and MarkProcessed inside a domain transaction (transactional exactly-once); pass the pool/DB handle for read-only checks and the cleanup loop.

Pattern for transactional exactly-once:

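tx, err := db.BeginTx(ctx, nil)
if err != nil { return err }
defer tx.Rollback() // harmless after a successful Commit
isNew, err := store.MarkProcessed(ctx, tx, group, msg.ID, nil)
if err != nil { return err }
if !isNew { return nil } // already handled; the deferred Rollback ends the tx
// ... domain ops in tx ...
return tx.Commit()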

On rollback the mark is released; on commit it persists with the domain writes atomically.
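To see the dedup contract without a database, the following sketch uses a hypothetical in-memory type with the same method set as ProcessedStore. `memProcessed`, `handleOnce`, and `demoRedelivery` are invented for the illustration. The sketch ignores the db argument, so it shows only the first-insert-wins behaviour and the JSON check on metadata, not the transactional composition.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"
)

// memProcessed is a hypothetical in-memory stand-in for a ProcessedStore.
type memProcessed struct {
	mu   sync.Mutex
	rows map[[2]string]time.Time // (group, msgID) -> processed_at
}

func newMemProcessed() *memProcessed {
	return &memProcessed{rows: make(map[[2]string]time.Time)}
}

func (s *memProcessed) IsProcessed(_ context.Context, _ any, group, msgID string) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.rows[[2]string{group, msgID}]
	return ok, nil
}

// MarkProcessed reports isNew=true only for the first insert of a key.
func (s *memProcessed) MarkProcessed(_ context.Context, _ any, group, msgID string, metadata json.RawMessage) (bool, error) {
	if metadata != nil && !json.Valid(metadata) {
		return false, errors.New("metadata is not valid JSON")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	key := [2]string{group, msgID}
	if _, exists := s.rows[key]; exists {
		return false, nil
	}
	s.rows[key] = time.Now()
	return true, nil
}

// PurgeProcessedBefore deletes rows recorded strictly before t.
func (s *memProcessed) PurgeProcessedBefore(_ context.Context, _ any, t time.Time) (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var n int64
	for k, at := range s.rows {
		if at.Before(t) {
			delete(s.rows, k)
			n++
		}
	}
	return n, nil
}

// handleOnce runs work only the first time (group, msgID) is seen.
func handleOnce(ctx context.Context, s *memProcessed, group, msgID string, work func()) error {
	isNew, err := s.MarkProcessed(ctx, nil, group, msgID, nil)
	if err != nil || !isNew {
		return err // nil when the message was already handled
	}
	work()
	return nil
}

// demoRedelivery delivers the same message three times and counts handler runs.
func demoRedelivery() int {
	s, runs := newMemProcessed(), 0
	for i := 0; i < 3; i++ {
		_ = handleOnce(context.Background(), s, "billing", "msg-1", func() { runs++ })
	}
	return runs
}

func main() {
	fmt.Println("handler runs:", demoRedelivery())
}
```

With a real backend the mark and the domain writes would share one transaction handle, which is what makes a failed handler retryable; the in-memory version has no rollback and would keep the mark.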

type Publisher

type Publisher interface {
	Publish(ctx context.Context, msgs ...Message) error
}

Publisher is the interface for publishing outbox messages.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay polls the Store for pending messages and forwards them to a Publisher.

func NewRelay

func NewRelay(store Store, pub Publisher, options ...Option) *Relay

NewRelay creates a new Relay.

func (*Relay) Start

func (r *Relay) Start(ctx context.Context)

Start begins polling for pending messages. Safe to call multiple times. When Retention is set, a background cleanup goroutine is also started.

func (*Relay) Stop

func (r *Relay) Stop()

Stop cancels the relay and waits for it to finish.
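The Relay's internals are not shown on this page. As a rough mental model only, one poll cycle could look like the sketch below: claim a batch, publish each message, then record the outcome. `source`, `sink`, `pollOnce`, and the fakes are hypothetical names, and publishing message by message (rather than as one batch) is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message keeps only the fields this sketch touches.
type Message struct {
	ID    string
	Topic string
}

// source and sink are narrowed, hypothetical versions of Store and Publisher.
type source interface {
	FetchPending(ctx context.Context, limit int) ([]Message, error)
	MarkProcessed(ctx context.Context, ids ...string) error
	MarkFailed(ctx context.Context, id string, err error) error
}

type sink interface {
	Publish(ctx context.Context, msgs ...Message) error
}

// pollOnce claims up to batchSize messages and publishes them one at a time,
// so a single rejected message does not fail the rest of the batch.
func pollOnce(ctx context.Context, st source, pub sink, batchSize int) (int, error) {
	msgs, err := st.FetchPending(ctx, batchSize)
	if err != nil {
		return 0, err
	}
	sent := 0
	for _, m := range msgs {
		if perr := pub.Publish(ctx, m); perr != nil {
			if ferr := st.MarkFailed(ctx, m.ID, perr); ferr != nil {
				return sent, ferr
			}
			continue
		}
		if merr := st.MarkProcessed(ctx, m.ID); merr != nil {
			return sent, merr
		}
		sent++
	}
	return sent, nil
}

// fakeStore hands out pending messages in order and records outcomes.
type fakeStore struct {
	pending           []Message
	processed, failed []string
}

func (f *fakeStore) FetchPending(_ context.Context, limit int) ([]Message, error) {
	if limit > len(f.pending) {
		limit = len(f.pending)
	}
	batch := f.pending[:limit]
	f.pending = f.pending[limit:]
	return batch, nil
}

func (f *fakeStore) MarkProcessed(_ context.Context, ids ...string) error {
	f.processed = append(f.processed, ids...)
	return nil
}

func (f *fakeStore) MarkFailed(_ context.Context, id string, _ error) error {
	f.failed = append(f.failed, id)
	return nil
}

// flakyPublisher rejects any message on the "bad" topic.
type flakyPublisher struct{}

func (flakyPublisher) Publish(_ context.Context, msgs ...Message) error {
	for _, m := range msgs {
		if m.Topic == "bad" {
			return errors.New("broker rejected " + m.ID)
		}
	}
	return nil
}

// demoPoll runs one cycle with a batch size of 2 over three queued messages.
func demoPoll() (sent, failed, left int) {
	st := &fakeStore{pending: []Message{
		{ID: "1", Topic: "ok"}, {ID: "2", Topic: "bad"}, {ID: "3", Topic: "ok"},
	}}
	sent, _ = pollOnce(context.Background(), st, flakyPublisher{}, 2)
	return sent, len(st.failed), len(st.pending)
}

func main() {
	fmt.Println(demoPoll())
}
```

In this model the batch size bounds how much work one cycle claims, which corresponds to the role WithBatchSize is documented to play.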

type Store

type Store interface {
	// Save persists messages within the given transaction.
	// tx can be pgx.Tx, *sql.Tx, or nil (Store creates its own transaction).
	Save(ctx context.Context, tx any, msgs ...Message) error

	// FetchPending atomically claims up to limit pending messages for processing.
	FetchPending(ctx context.Context, limit int) ([]Message, error)

	// MarkProcessed marks messages as successfully processed.
	MarkProcessed(ctx context.Context, ids ...string) error

	// MarkFailed increments retries and records the error.
	// Sets status to failed when max retries reached, else back to pending.
	MarkFailed(ctx context.Context, id string, err error) error

	// DeleteProcessedBefore removes messages with status 'processed' whose
	// processed_at is strictly before t. Returns the number of rows deleted.
	DeleteProcessedBefore(ctx context.Context, t time.Time) (int64, error)
}

Store is the interface for outbox message persistence.
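The status transitions implied by FetchPending and MarkFailed can be modelled in memory. The sketch below is hypothetical and covers only the claim and retry paths: `memStore`, its `maxRetries` field, and `demoRetries` are invented names, and the retry limit in particular is an assumed setting that the interface above does not expose.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type MessageStatus string

const (
	StatusPending    MessageStatus = "pending"
	StatusProcessing MessageStatus = "processing"
	StatusProcessed  MessageStatus = "processed"
	StatusFailed     MessageStatus = "failed"
)

// Message keeps only the fields this sketch touches.
type Message struct {
	ID        string
	Status    MessageStatus
	Retries   int
	LastError string
}

// memStore is a hypothetical in-memory model of the claim/retry part of Store.
type memStore struct {
	mu         sync.Mutex
	maxRetries int // assumed knob, not part of the documented interface
	msgs       []*Message
}

// FetchPending claims messages by flipping pending -> processing while holding
// the lock, so two concurrent callers never receive the same message.
func (s *memStore) FetchPending(_ context.Context, limit int) ([]Message, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []Message
	for _, m := range s.msgs {
		if len(out) >= limit {
			break
		}
		if m.Status == StatusPending {
			m.Status = StatusProcessing
			out = append(out, *m)
		}
	}
	return out, nil
}

// MarkFailed increments retries and records the error. The message returns to
// pending until the retry limit is reached, then it is parked as failed.
func (s *memStore) MarkFailed(_ context.Context, id string, err error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, m := range s.msgs {
		if m.ID != id {
			continue
		}
		m.Retries++
		m.LastError = err.Error()
		m.Status = StatusPending
		if m.Retries >= s.maxRetries {
			m.Status = StatusFailed
		}
	}
	return nil
}

// demoRetries keeps failing one message until no poll returns it any more.
func demoRetries() (MessageStatus, int) {
	ctx := context.Background()
	s := &memStore{
		maxRetries: 2,
		msgs:       []*Message{{ID: "a", Status: StatusPending}},
	}
	for {
		batch, _ := s.FetchPending(ctx, 10)
		if len(batch) == 0 {
			break
		}
		_ = s.MarkFailed(ctx, batch[0].ID, errors.New("broker down"))
	}
	return s.msgs[0].Status, s.msgs[0].Retries
}

func main() {
	status, retries := demoRetries()
	fmt.Println(status, retries)
}
```

A SQL-backed implementation would need a database-level mechanism to get the same atomic claim; the mutex here only stands in for that guarantee.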

Directories

Path Synopsis
Package migrate dispatches outbox-table migrations to the right dialect based on a driver name string.
sql
Package sql provides shared building blocks for database/sql-based outbox.Store implementations.
postgres
Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql.
sqlite
Package sqlite provides a SQLite implementation of outbox.Store using database/sql.
Package streambridge adapts an outbox.Publisher to a libs stream.Service, so the libs Relay can drain the outbox into a stream broker without any project-specific glue.
