outbox

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	PollInterval time.Duration
	BatchSize    int
}

Config holds the configuration for the Relay.

type Message

type Message struct {
	ID          string
	Topic       string
	Key         string
	Payload     []byte
	Headers     map[string]string
	Status      MessageStatus
	Retries     int
	LastError   string
	CreatedAt   time.Time
	ProcessedAt time.Time
}

Message represents an outbox message.

type MessageStatus

type MessageStatus string

MessageStatus represents the status of an outbox message.

const (
	StatusPending    MessageStatus = "pending"
	StatusProcessing MessageStatus = "processing"
	StatusProcessed  MessageStatus = "processed"
	StatusFailed     MessageStatus = "failed"
)

type Option

type Option interface {
	Apply(*Config)
}

An Option configures a Relay instance.

func WithBatchSize

func WithBatchSize(n int) Option

WithBatchSize sets the maximum number of messages fetched per poll cycle.

func WithPollInterval

func WithPollInterval(d time.Duration) Option

WithPollInterval sets how often the relay polls for pending messages.

type OptionFunc

type OptionFunc func(*Config)

OptionFunc is a function that configures a Relay config.

func (OptionFunc) Apply

func (f OptionFunc) Apply(config *Config)

Apply calls f(config).

type PubSubAdapter

type PubSubAdapter struct {
	// contains filtered or unexported fields
}

PubSubAdapter wraps a pubsub.Publisher to satisfy the outbox.Publisher interface.

func NewPubSubAdapter

func NewPubSubAdapter(p pubsub.Publisher, opts ...pubsub.PublishOption) *PubSubAdapter

NewPubSubAdapter creates a new PubSubAdapter.

func (*PubSubAdapter) Publish

func (a *PubSubAdapter) Publish(ctx context.Context, msgs ...Message) error

Publish forwards outbox messages to the underlying pubsub.Publisher.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, msgs ...Message) error
}

Publisher is the interface for publishing outbox messages.

type Relay

type Relay struct {
	// contains filtered or unexported fields
}

Relay polls the Store for pending messages and forwards them to a Publisher.

func NewRelay

func NewRelay(store Store, pub Publisher, options ...Option) *Relay

NewRelay creates a new Relay.

func (*Relay) Start

func (r *Relay) Start(ctx context.Context)

Start begins polling for pending messages. Safe to call multiple times.

func (*Relay) Stop

func (r *Relay) Stop()

Stop cancels the relay and waits for it to finish.

type Store

type Store interface {
	// Save persists messages within the given transaction.
	// tx can be pgx.Tx, *sql.Tx, or nil (Store creates its own transaction).
	Save(ctx context.Context, tx any, msgs ...Message) error

	// FetchPending atomically claims up to limit pending messages for processing.
	FetchPending(ctx context.Context, limit int) ([]Message, error)

	// MarkProcessed marks messages as successfully processed.
	MarkProcessed(ctx context.Context, ids ...string) error

	// MarkFailed increments retries and records the error.
	// Sets status to failed when max retries reached, else back to pending.
	MarkFailed(ctx context.Context, id string, err error) error
}

Store is the interface for outbox message persistence.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL