Documentation
¶
Index ¶
Constants ¶
const HeaderDestination = "destination"
HeaderDestination is the conventional Message.Headers key used by WithHeaders/WithDestination to select a destination publisher. Producers set this header at write time so the router can dispatch without inspecting the topic name.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Message ¶
type Message struct {
ID string
Topic string
Key string
Payload []byte
Headers map[string]string
Status MessageStatus
Retries int
LastError string
CreatedAt time.Time
ProcessedAt time.Time
}
Message represents an outbox message.
type MessageStatus ¶
type MessageStatus string
MessageStatus represents the status of an outbox message.
const ( StatusPending MessageStatus = "pending" StatusProcessing MessageStatus = "processing" StatusProcessed MessageStatus = "processed" StatusFailed MessageStatus = "failed" )
type Option ¶
type Option interface {
Apply(*Config)
}
An Option configures a Relay instance.
func WithBatchSize ¶
WithBatchSize sets the maximum number of messages fetched per poll cycle.
func WithPollInterval ¶
WithPollInterval sets how often the relay polls for pending messages.
type OptionFunc ¶
type OptionFunc func(*Config)
OptionFunc is a function that configures a Relay config.
type PubSubAdapter ¶
type PubSubAdapter struct {
// contains filtered or unexported fields
}
PubSubAdapter wraps a pubsub.Publisher to satisfy the outbox.Publisher interface.
func NewPubSubAdapter ¶
func NewPubSubAdapter(p pubsub.Publisher, opts ...pubsub.PublishOption) *PubSubAdapter
NewPubSubAdapter creates a new PubSubAdapter.
type QueueAdapter ¶ added in v0.9.1
type QueueAdapter struct {
// contains filtered or unexported fields
}
QueueAdapter wraps a queue.Enqueuer to satisfy the outbox.Publisher interface. Use this when you want the outbox relay to deliver messages as work items processed by exactly one consumer (competing consumers), instead of broadcasting them to all subscribers via pubsub.
func NewQueueAdapter ¶ added in v0.9.1
func NewQueueAdapter(e queue.Enqueuer, opts ...queue.EnqueueOption) *QueueAdapter
NewQueueAdapter creates a new QueueAdapter. opts are applied to every Enqueue call.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay polls the Store for pending messages and forwards them to a Publisher.
type RouteFunc ¶ added in v0.9.1
RouteFunc selects a Publisher for a given outbox Message. Return nil to indicate "no match" — the router will try the next configured rule, or the fallback if no rule matches.
type RouterOption ¶ added in v0.9.1
type RouterOption interface {
Apply(*RouterPublisher)
}
RouterOption configures a RouterPublisher.
func WithDestination ¶ added in v0.9.1
func WithDestination(byName map[string]Publisher) RouterOption
WithDestination is a shorthand for WithHeaders(HeaderDestination, byName). This is the recommended pattern for multi-destination outboxes: producers set Headers["destination"] at write time and the router dispatches by looking it up in the supplied map.
func WithFallback ¶ added in v0.9.1
func WithFallback(p Publisher) RouterOption
WithFallback sets the Publisher used when no rule matches. Without a fallback, unmatched messages cause Publish to return an error and the relay will retry them on the next poll.
func WithHeaders ¶ added in v0.9.1
func WithHeaders(key string, byName map[string]Publisher) RouterOption
WithHeaders appends a rule that dispatches based on a Message Header value. The router looks up m.Headers[key] in byName and returns the matching Publisher; on miss the rule returns nil so the next rule (or fallback) is tried.
func WithRoute ¶ added in v0.9.1
func WithRoute(f RouteFunc) RouterOption
WithRoute appends an arbitrary RouteFunc. Use this for custom dispatch logic the built-in helpers do not cover.
type RouterOptionFunc ¶ added in v0.9.1
type RouterOptionFunc func(*RouterPublisher)
RouterOptionFunc adapts a function to RouterOption.
func (RouterOptionFunc) Apply ¶ added in v0.9.1
func (f RouterOptionFunc) Apply(r *RouterPublisher)
Apply calls f(r).
type RouterPublisher ¶ added in v0.9.1
type RouterPublisher struct {
// contains filtered or unexported fields
}
RouterPublisher dispatches each outbox message to a Publisher chosen by caller-supplied rules. Rules are evaluated in registration order; the first rule whose RouteFunc returns a non-nil Publisher wins. If no rule matches, the fallback is used; if no fallback is configured, Publish returns an error so misrouted messages stay in the outbox for investigation rather than being silently dropped.
Use NewRouter with functional options to compose:
router := outbox.NewRouter(
outbox.WithDestination(map[string]outbox.Publisher{
"queue": workQueueAdapter,
}),
outbox.WithFallback(eventsAdapter),
)
func NewRouter ¶ added in v0.9.1
func NewRouter(opts ...RouterOption) *RouterPublisher
NewRouter creates a RouterPublisher from the given options. With no options the router rejects every message; supply at least one rule (WithRoute / WithHeaders / WithDestination) and/or a fallback (WithFallback).
func (*RouterPublisher) Publish ¶ added in v0.9.1
func (r *RouterPublisher) Publish(ctx context.Context, msgs ...Message) error
Publish dispatches each message individually so a single batch can land in multiple backends. The first dispatch error is returned and stops the loop; already-published messages are not rolled back. The relay's per-message MarkProcessed/MarkFailed bookkeeping (see Relay.poll) handles partial success at the message level — failed messages stay in the outbox for retry on the next poll.
type Store ¶
type Store interface {
// Save persists messages within the given transaction.
// tx can be pgx.Tx, *sql.Tx, or nil (Store creates its own transaction).
Save(ctx context.Context, tx any, msgs ...Message) error
// FetchPending atomically claims up to limit pending messages for processing.
FetchPending(ctx context.Context, limit int) ([]Message, error)
// MarkProcessed marks messages as successfully processed.
MarkProcessed(ctx context.Context, ids ...string) error
// MarkFailed increments retries and records the error.
// Sets status to failed when max retries reached, else back to pending.
MarkFailed(ctx context.Context, id string, err error) error
}
Store is the interface for outbox message persistence.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package sql provides shared building blocks for database/sql-based outbox.Store implementations.
|
Package sql provides shared building blocks for database/sql-based outbox.Store implementations. |
|
postgres
Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql.
|
Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql. |
|
sqlite
Package sqlite provides a SQLite implementation of outbox.Store using database/sql.
|
Package sqlite provides a SQLite implementation of outbox.Store using database/sql. |