dispatch

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package dispatch implements the transactional outbox dispatcher.

The dispatcher polls unpublished outbox rows, publishes them to a message broker via the Publisher interface, and marks each row published only after the broker acknowledges receipt. Delivery is at-least-once: if the process crashes between a successful broker publish and the MarkPublished write, the row will be claimed again on the next poll and re-published.

Consumer-side idempotency is assumed. The OutboxEvent.ID field is stable across retries and can serve as a deduplication key.

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidConfig = errors.New("dispatch: invalid configuration")

ErrInvalidConfig is returned by NewDispatcher when a configuration value violates a constraint.

View Source
var ErrNilArg = errors.New("dispatch: required argument is nil")

ErrNilArg is returned by NewDispatcher when a required argument is nil.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher polls the outbox for unpublished rows, publishes them to the configured broker, and marks each row published only after the broker acknowledges receipt.

Delivery semantics are at-least-once: if the process crashes between a successful broker publish and the MarkPublished write, the row will be re-claimed on the next poll and re-published. Consumer-side idempotency is assumed.

func NewDispatcher

func NewDispatcher(repo DispatcherRepo, publisher Publisher, cfg DispatcherConfig) (*Dispatcher, error)

NewDispatcher constructs a Dispatcher. All arguments must be non-nil and DispatcherConfig.BatchSize and DispatcherConfig.PollInterval must be positive.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run starts the dispatch loop and blocks until ctx is cancelled. It returns when the context is done; any cleanup (e.g. broker connection close) is the caller's responsibility.

Run is safe to call from a goroutine. The typical pattern:

go dispatcher.Run(ctx)

type DispatcherConfig

type DispatcherConfig struct {
	// BatchSize is the maximum number of outbox rows claimed and processed per
	// poll cycle. Must be greater than zero.
	BatchSize int

	// PollInterval is the duration the dispatcher sleeps between poll cycles
	// when the queue is empty. Must be greater than zero.
	PollInterval time.Duration

	// MaxBackoff is the upper bound for the exponential sleep when consecutive
	// poll cycles encounter errors. The dispatcher resets to PollInterval on
	// any successful empty-queue or publish cycle.
	MaxBackoff time.Duration
}

DispatcherConfig controls the runtime behaviour of the Dispatcher.

func DefaultDispatcherConfig

func DefaultDispatcherConfig() DispatcherConfig

DefaultDispatcherConfig returns sensible production defaults.

type DispatcherRepo

type DispatcherRepo interface {
	// ClaimUnpublished returns up to limit unpublished outbox rows ordered by
	// created_at ASC, id ASC. The implementation uses SELECT FOR UPDATE SKIP
	// LOCKED inside a short-lived transaction to prevent concurrent dispatchers
	// from claiming the same rows simultaneously.
	//
	// Rows returned are not mutated by the claim; they remain unpublished in
	// the database until MarkPublished is called for each one.
	ClaimUnpublished(ctx context.Context, limit int) ([]*outbox.OutboxEvent, error)

	// MarkPublished sets published_at to now for the given event ID.
	// Returns an error if the event does not exist or the update fails.
	MarkPublished(ctx context.Context, id string) error
}

DispatcherRepo is the persistence interface required by the Dispatcher. It is a strict subset of outbox.Repository extended with claiming semantics.

type Header struct {
	Key   string
	Value []byte
}

Header is a single key/value pair attached to a Message.

type Message

type Message struct {
	// Topic is the logical destination (maps to a Kafka topic, SNS topic, etc.)
	Topic string

	// Key is the optional partition / ordering key. May be empty.
	Key []byte

	// Value is the JSON-encoded event payload. Always valid JSON.
	Value []byte

	// Headers carry event metadata without requiring payload deserialization.
	Headers []Header
}

Message is the broker-agnostic envelope that Publisher implementations receive. All fields are populated from the outbox row before publishing.

type Publisher

type Publisher interface {
	// Publish sends msg to the broker. Returns nil only when the broker has
	// acknowledged receipt. Any returned error means the message may or may
	// not have been delivered; the caller should treat the event as
	// unpublished and retry on the next poll cycle.
	Publish(ctx context.Context, msg Message) error
}

Publisher delivers a single message to a message broker and returns only after the broker has acknowledged receipt.

Implementations must be safe for concurrent use. Publish must be idempotent or the caller must accept that duplicate messages may be produced.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL