Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RunProcessedCleanup ¶ added in v0.11.1
func RunProcessedCleanup( ctx context.Context, store ProcessedStore, db any, retention, interval time.Duration, )
RunProcessedCleanup blocks until ctx is canceled, periodically purging rows older than retention. Run as a goroutine.
A non-positive retention or interval disables the loop (returns immediately, after logging at V(1) so misconfiguration is observable). Errors from PurgeProcessedBefore are logged via logr and do not stop the loop.
Types ¶
type Config ¶
type Config struct {
PollInterval time.Duration
BatchSize int
// Retention is the age at which processed messages become eligible for
// deletion. Zero (the default) disables cleanup.
Retention time.Duration
// CleanupInterval is how often the relay sweeps for expired messages
// when Retention > 0. Defaults to one hour.
CleanupInterval time.Duration
}
Config holds the configuration for the Relay.
type Message ¶
type Message struct {
ID string
Topic string
Key string
Payload []byte
Headers map[string]string
Status MessageStatus
Retries int
LastError string
CreatedAt time.Time
ProcessedAt time.Time
}
Message represents an outbox message.
type MessageStatus ¶
type MessageStatus string
MessageStatus represents the status of an outbox message.
const ( StatusPending MessageStatus = "pending" StatusProcessing MessageStatus = "processing" StatusProcessed MessageStatus = "processed" StatusFailed MessageStatus = "failed" )
type Option ¶
type Option interface {
Apply(*Config)
}
An Option configures a Relay instance.
func WithBatchSize ¶
WithBatchSize sets the maximum number of messages fetched per poll cycle.
func WithCleanupInterval ¶ added in v0.11.1
WithCleanupInterval sets how often the relay deletes expired processed messages when retention is enabled. A non-positive value is ignored.
func WithPollInterval ¶
WithPollInterval sets how often the relay polls for pending messages.
func WithRetention ¶ added in v0.11.1
WithRetention enables periodic deletion of processed messages older than d. A non-positive value disables cleanup.
type OptionFunc ¶
type OptionFunc func(*Config)
OptionFunc is a function that configures a Relay config.
type ProcessedStore ¶ added in v0.11.1
type ProcessedStore interface {
// IsProcessed reports whether (group, msgID) is already recorded.
IsProcessed(ctx context.Context, db any, group, msgID string) (bool, error)
// MarkProcessed records (group, msgID). Returns isNew=true on first
// insert; false if a row already existed (treat as "skip — already
// done"). Concurrent inserts are absorbed via ON CONFLICT DO NOTHING.
//
// metadata may be nil. When non-nil it must be valid JSON; the
// implementation validates and returns an error otherwise. Free-form
// so callers can attach handler-specific context (printed printer
// names, snapshot of config, audit trail) without further migrations.
MarkProcessed(ctx context.Context, db any, group, msgID string, metadata json.RawMessage) (isNew bool, err error)
// PurgeProcessedBefore deletes rows whose processed_at is strictly
// before t. Match the outbox retention so the dedup window stays at
// least as wide as the message redelivery window.
PurgeProcessedBefore(ctx context.Context, db any, t time.Time) (int64, error)
}
ProcessedStore is the consumer-side mirror of Store: at-least-once stream delivery means a handler can run twice on the same message; ProcessedStore records (group, message_id) so handlers short-circuit on redelivery.
db is driver-specific: pass *sql.DB / *sql.Tx for the database/sql backends, pgx.Tx / pgxpool.Pool for a pgx backend. Each implementation type-asserts the supported handles. Pass a transaction handle to compose IsProcessed and MarkProcessed inside a domain transaction (transactional exactly-once); pass the pool/DB handle for read-only checks and the cleanup loop.
Pattern for transactional exactly-once:
tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()
isNew, err := store.MarkProcessed(ctx, tx, group, msg.ID, nil)
if err != nil { return err }
if !isNew { return nil }
// ... domain ops in tx ...
return tx.Commit()
On rollback the mark is released; on commit it persists with the domain writes atomically.
type Relay ¶
type Relay struct {
// contains filtered or unexported fields
}
Relay polls the Store for pending messages and forwards them to a Publisher.
type Store ¶
type Store interface {
// Save persists messages within the given transaction.
// tx can be pgx.Tx, *sql.Tx, or nil (Store creates its own transaction).
Save(ctx context.Context, tx any, msgs ...Message) error
// FetchPending atomically claims up to limit pending messages for processing.
FetchPending(ctx context.Context, limit int) ([]Message, error)
// MarkProcessed marks messages as successfully processed.
MarkProcessed(ctx context.Context, ids ...string) error
// MarkFailed increments retries and records the error.
// Sets status to failed when max retries reached, else back to pending.
MarkFailed(ctx context.Context, id string, err error) error
// DeleteProcessedBefore removes messages with status 'processed' whose
// processed_at is strictly before t. Returns the number of rows deleted.
DeleteProcessedBefore(ctx context.Context, t time.Time) (int64, error)
}
Store is the interface for outbox message persistence.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package migrate dispatches outbox-table migrations to the right dialect based on a driver name string.
|
Package migrate dispatches outbox-table migrations to the right dialect based on a driver name string. |
|
Package sql provides shared building blocks for database/sql-based outbox.Store implementations.
|
Package sql provides shared building blocks for database/sql-based outbox.Store implementations. |
|
postgres
Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql.
|
Package postgres provides a PostgreSQL implementation of outbox.Store using database/sql. |
|
sqlite
Package sqlite provides a SQLite implementation of outbox.Store using database/sql.
|
Package sqlite provides a SQLite implementation of outbox.Store using database/sql. |
|
Package streambridge adapts an outbox.Publisher to a libs stream.Service, so the libs Relay can drain the outbox into a stream broker without any project-specific glue.
|
Package streambridge adapts an outbox.Publisher to a libs stream.Service, so the libs Relay can drain the outbox into a stream broker without any project-specific glue. |