Documentation
Overview ¶
Package outbox implements the transactional outbox pattern for reliable event delivery.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SendEventTransactional ¶
func SendEventTransactional[T any](
	ctx WorkflowContext,
	eventType string,
	eventSource string,
	data T,
	opts ...SendEventOption,
) error
SendEventTransactional sends an event through the transactional outbox. The event is stored in the database within the current transaction (if any) and will be delivered asynchronously by the outbox relayer.
This ensures that the event is sent only if the activity/transaction commits. Because the relayer may retry a delivery, an event can arrive more than once; combined with idempotent consumers, this gives effectively exactly-once processing.
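The commit-or-nothing behaviour described above can be illustrated with a small in-memory model. This is only a sketch of the pattern, not the package's implementation: `memStore`, `tx`, `stageEvent`, and `runActivity` are hypothetical names, and the real package writes to its `storage.Storage` backend instead.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// outboxRow is a hypothetical stand-in for a stored outbox record.
type outboxRow struct {
	EventType   string
	EventSource string
	Data        []byte
}

// memStore is a toy store: rows staged in a transaction become visible
// to a relayer only after the transaction commits.
type memStore struct {
	committed []outboxRow
}

// tx stages rows until commit or rollback.
type tx struct {
	store  *memStore
	staged []outboxRow
}

func (s *memStore) begin() *tx { return &tx{store: s} }

func (t *tx) commit() {
	t.store.committed = append(t.store.committed, t.staged...)
	t.staged = nil
}

func (t *tx) rollback() { t.staged = nil }

// stageEvent mimics the "store the event inside the transaction" step.
func stageEvent[T any](t *tx, eventType, eventSource string, data T) error {
	payload, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshal event data: %w", err)
	}
	t.staged = append(t.staged, outboxRow{
		EventType:   eventType,
		EventSource: eventSource,
		Data:        payload,
	})
	return nil
}

// runActivity stages an event and commits only if the business step succeeds.
func runActivity(s *memStore, fail bool) error {
	t := s.begin()
	if err := stageEvent(t, "order.created", "orders", map[string]int{"id": 42}); err != nil {
		t.rollback()
		return err
	}
	if fail {
		t.rollback()
		return errors.New("business step failed")
	}
	t.commit()
	return nil
}

func main() {
	s := &memStore{}
	_ = runActivity(s, true)  // rolled back: nothing for a relayer to deliver
	_ = runActivity(s, false) // committed: one row ready for delivery
	fmt.Println(len(s.committed)) // prints 1
}
```

The failed activity leaves no trace in the outbox, so a relayer reading only committed rows never sends an event for work that did not happen.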
Types ¶
type EventSender ¶
type EventSender func(ctx context.Context, event *storage.OutboxEvent) error
EventSender is a function that sends an event to an external system. The default implementation uses a CloudEvents HTTP client.
type Relayer ¶
type Relayer struct {
// contains filtered or unexported fields
}
Relayer handles background delivery of outbox events.
func NewRelayer ¶
func NewRelayer(s storage.Storage, config RelayerConfig) *Relayer
NewRelayer creates a new outbox relayer.
func (*Relayer) CleanupOldEvents ¶
CleanupOldEvents removes old sent events from the outbox.
func (*Relayer) RelayOnce ¶
RelayOnce processes pending events once (useful for testing). Returns the number of events processed and any error.
type RelayerConfig ¶
type RelayerConfig struct {
// TargetURL is the CloudEvents endpoint to send events to.
TargetURL string
// PollInterval is how often to check for pending events.
// Default: 1 second.
PollInterval time.Duration
// BatchSize is the maximum number of events to process per poll.
// Default: 100.
BatchSize int
// MaxRetries is the maximum number of delivery attempts.
// Default: 5.
MaxRetries int
// MaxBackoff is the maximum backoff duration when no events are pending.
// Default: 30 seconds.
MaxBackoff time.Duration
// CustomSender is an optional custom event sender.
// If nil, the default CloudEvents HTTP sender is used.
CustomSender EventSender
// WakeEvent is an optional channel to wake the relayer on NOTIFY events.
// When provided, the relayer will immediately check for pending events
// when a message is received on this channel.
WakeEvent <-chan struct{}
}
RelayerConfig configures the outbox relayer.
type SendEventOption ¶
type SendEventOption func(*SendEventOptions)
SendEventOption is a functional option for SendEventTransactional.
func WithContentType ¶
func WithContentType(contentType string) SendEventOption
WithContentType sets the MIME content type of the event data, overriding the default "application/json".
type SendEventOptions ¶
type SendEventOptions struct {
// EventID is the CloudEvents ID. If empty, a UUID will be generated.
EventID string
// ContentType is the MIME type of the data. Defaults to "application/json".
ContentType string
}
SendEventOptions configures event sending.
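The functional-option types above typically work together as follows. The types and `WithContentType` mirror the documented declarations, while `resolveOptions` is a hypothetical helper showing one plausible way the options could be applied on top of the documented default; EventID generation is left out of this sketch.

```go
package main

import "fmt"

// SendEventOptions mirrors the documented struct.
type SendEventOptions struct {
	EventID     string
	ContentType string
}

// SendEventOption mirrors the documented functional option type.
type SendEventOption func(*SendEventOptions)

// WithContentType mirrors the documented option constructor.
func WithContentType(contentType string) SendEventOption {
	return func(o *SendEventOptions) { o.ContentType = contentType }
}

// resolveOptions is a hypothetical helper: it starts from the documented
// default content type and applies each option in order.
func resolveOptions(opts ...SendEventOption) SendEventOptions {
	o := SendEventOptions{ContentType: "application/json"}
	for _, opt := range opts {
		if opt != nil {
			opt(&o)
		}
	}
	return o
}

func main() {
	fmt.Println(resolveOptions().ContentType)                                   // prints application/json
	fmt.Println(resolveOptions(WithContentType("application/xml")).ContentType) // prints application/xml
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.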
type WorkflowContext ¶
type WorkflowContext interface {
Context() interface{ Done() <-chan struct{} }
InstanceID() string
Storage() storage.Storage
}
WorkflowContext is the minimal interface that outbox operations require. Defining it here avoids a circular import with the main romancy package.