outbox

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Published: Dec 21, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package outbox provides an implementation of the transactional outbox pattern for reliable event delivery.

Constants

This section is empty.

Variables

This section is empty.

Functions

func SendEventTransactional

func SendEventTransactional[T any](
	ctx WorkflowContext,
	eventType string,
	eventSource string,
	data T,
	opts ...SendEventOption,
) error

SendEventTransactional sends an event through the transactional outbox. The event is stored in the database within the current transaction (if any) and will be delivered asynchronously by the outbox relayer.

This ensures that the event is sent only if the activity/transaction commits. Because the relayer may retry delivery, consumers should be idempotent; combined with idempotent consumers, this gives effectively exactly-once processing.
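The commit-or-nothing behavior described above can be illustrated with a toy in-memory model. This is a minimal sketch, not the package's implementation: `store`, `tx`, `inTx`, and `placeOrder` are hypothetical names, and a real setup would use a database transaction instead of buffered slices.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a hypothetical stand-in for a stored outbox row.
type event struct {
	Type   string
	Source string
	Data   string
}

// store is a toy "database" holding a business table and an outbox table.
type store struct {
	orders []string
	outbox []event
}

// tx buffers writes to both tables until the transaction commits.
type tx struct {
	orders []string
	outbox []event
}

// inTx runs fn inside a toy transaction. Buffered writes are applied
// only when fn returns nil; on error they are dropped (rollback).
func (s *store) inTx(fn func(*tx) error) error {
	t := &tx{}
	if err := fn(t); err != nil {
		return err
	}
	s.orders = append(s.orders, t.orders...)
	s.outbox = append(s.outbox, t.outbox...)
	return nil
}

// placeOrder writes the business row and the outbox event in the same
// transaction, so either both are persisted or neither is.
func placeOrder(s *store, id string, fail bool) error {
	return s.inTx(func(t *tx) error {
		t.orders = append(t.orders, id)
		t.outbox = append(t.outbox, event{
			Type:   "order.created",
			Source: "order-service",
			Data:   id,
		})
		if fail {
			return errors.New("activity failed")
		}
		return nil
	})
}

func main() {
	s := &store{}
	_ = placeOrder(s, "order-1", false) // commits: order and event are stored
	_ = placeOrder(s, "order-2", true)  // rolls back: neither is stored
	fmt.Println(len(s.orders), len(s.outbox)) // prints "1 1"
}
```

Only events from committed transactions reach the outbox table, which is what lets a separate relayer deliver them later without sending events for work that was rolled back.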

Types

type EventSender

type EventSender func(ctx context.Context, event *storage.OutboxEvent) error

EventSender is a function that sends an event to an external system. The default implementation uses a CloudEvents HTTP client.

type Relayer

type Relayer struct {
	// contains filtered or unexported fields
}

Relayer handles background delivery of outbox events.

func NewRelayer

func NewRelayer(s storage.Storage, config RelayerConfig) *Relayer

NewRelayer creates a new outbox relayer.

func (*Relayer) CleanupOldEvents

func (r *Relayer) CleanupOldEvents(ctx context.Context, olderThan time.Duration) error

CleanupOldEvents removes sent events older than olderThan from the outbox.

func (*Relayer) RelayOnce

func (r *Relayer) RelayOnce(ctx context.Context) (int, error)

RelayOnce processes pending events once (useful for testing). Returns the number of events processed and any error.

func (*Relayer) Start

func (r *Relayer) Start(ctx context.Context)

Start starts the background relayer.

func (*Relayer) Stop

func (r *Relayer) Stop()

Stop stops the background relayer gracefully.

type RelayerConfig

type RelayerConfig struct {
	// TargetURL is the CloudEvents endpoint to send events to.
	TargetURL string
	// PollInterval is how often to check for pending events.
	// Default: 1 second.
	PollInterval time.Duration
	// BatchSize is the maximum number of events to process per poll.
	// Default: 100.
	BatchSize int
	// MaxRetries is the maximum number of delivery attempts.
	// Default: 5.
	MaxRetries int
	// MaxBackoff is the maximum backoff duration when no events are pending.
	// Default: 30 seconds.
	MaxBackoff time.Duration
	// CustomSender is an optional custom event sender.
	// If nil, the default CloudEvents HTTP sender is used.
	CustomSender EventSender
	// WakeEvent is an optional channel to wake the relayer on NOTIFY events.
	// When provided, the relayer will immediately check for pending events
	// when a message is received on this channel.
	WakeEvent <-chan struct{}
}

RelayerConfig configures the outbox relayer.

type SendEventOption

type SendEventOption func(*SendEventOptions)

SendEventOption is a functional option for SendEventTransactional.

func WithContentType

func WithContentType(contentType string) SendEventOption

WithContentType sets the content type (MIME type) of the event data.

func WithEventID

func WithEventID(id string) SendEventOption

WithEventID sets a custom event ID.

type SendEventOptions

type SendEventOptions struct {
	// EventID is the CloudEvents ID. If empty, a UUID will be generated.
	EventID string
	// ContentType is the MIME type of the data. Defaults to "application/json".
	ContentType string
}

SendEventOptions configures event sending.
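The functional-options pattern used by SendEventOption can be sketched as follows. The type and function signatures mirror the documentation, but the function bodies and the `resolve` helper are assumptions about how such options are typically applied. UUID generation for an empty EventID is omitted here.

```go
package main

import "fmt"

// SendEventOptions mirrors the documented struct.
type SendEventOptions struct {
	EventID     string
	ContentType string
}

// SendEventOption mirrors the documented functional option type.
type SendEventOption func(*SendEventOptions)

// WithContentType has the documented signature; the body is an assumption.
func WithContentType(contentType string) SendEventOption {
	return func(o *SendEventOptions) { o.ContentType = contentType }
}

// WithEventID has the documented signature; the body is an assumption.
func WithEventID(id string) SendEventOption {
	return func(o *SendEventOptions) { o.EventID = id }
}

// resolve is a hypothetical helper: it starts from the documented default
// content type and applies each option in order, so later options win.
func resolve(opts ...SendEventOption) SendEventOptions {
	o := SendEventOptions{ContentType: "application/json"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(WithEventID("order-42-created"), WithContentType("application/xml"))
	fmt.Println(o.EventID, o.ContentType) // prints "order-42-created application/xml"
}
```

Passing a deterministic event ID, as in this example, is one way to let idempotent consumers deduplicate retried deliveries.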

type WorkflowContext

type WorkflowContext interface {
	Context() interface{ Done() <-chan struct{} }
	InstanceID() string
	Storage() storage.Storage
}

WorkflowContext is the subset of the workflow context interface that outbox operations need. It is defined here to avoid circular imports with the main romancy package.
