relay

package v1.0.0

Published: May 3, 2026 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package relay contains the core domain logic and orchestration for the Outbox Relay.

This package serves as the heart of the system, defining the fundamental interfaces and data models that allow the relay to remain agnostic of specific database or message broker implementations.

The primary component is the Engine, which coordinates the lifecycle of an event: claiming it from Storage, attempting delivery via a Publisher, and handling failures through a RetryPolicy.

Core Abstractions:

  • Engine: Orchestrates the polling loop and background maintenance tasks.
  • Storage: Defines how events are persisted, claimed, and updated in the database.
  • Publisher: Defines how events are delivered to external systems.
  • Event: The central data structure carrying the payload and delivery metadata.

By adhering to the Transactional Outbox pattern, this package ensures reliable message delivery with at-least-once guarantees.

Index

Constants

This section is empty.

Variables

var ErrPublisherPaused = errors.New("publisher is paused")

ErrPublisherPaused is returned when the engine cannot proceed because the publisher (e.g., Kafka, NATS, Redis) is currently unreachable or down.

Functions

func CreateNoopTelemetry

func CreateNoopTelemetry() (telemetry.Telemetry, error)

CreateNoopTelemetry initializes a Telemetry container with discarded logs, no-op metrics, and no-op traces. Useful for isolating logic in unit tests.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine coordinates the movement of events from Storage to Publisher. It manages the polling loop, background maintenance tasks like lease reaping, and ensures that events are processed according to the configured batching and retry policies.

func NewEngine

func NewEngine(
	storage Storage,
	publisher Publisher,
	params EngineParams,
	tel telemetry.Telemetry,
) (*Engine, error)

NewEngine initializes and returns a new Engine instance. It sets up the internal state, pre-allocates memory buffers for batching, and ensures that a unique RelayID is assigned if one is not provided in the parameters.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start initiates the relay's operational loops in the background. It launches three concurrent processes:

 1. A metrics watcher that periodically updates backlog statistics.
 2. A lease reaper that recovers "stuck" events from crashed instances.
 3. The main event processing loop that moves messages from storage to the publisher.

It blocks until the context is cancelled or a critical error occurs.

func (*Engine) Stop

func (e *Engine) Stop(ctx context.Context) error

Stop performs a graceful shutdown of the Engine. It closes the underlying storage and publisher connections to ensure no data loss.

type EngineParams

type EngineParams struct {
	RelayID                       string
	Interval                      time.Duration
	BatchSize                     int
	LeaseTimeout                  time.Duration
	ReapBatchSize                 int
	PublisherConnectRetryInterval time.Duration
	HealthCheckInterval           time.Duration
	RetryPolicy                   RetryPolicy
	EnableBatchPublish            bool
	EnableStats                   bool
}

EngineParams encapsulates the relay engine's identity and operational tuning: all the parameters required to initialize and configure its behavior.

type Event

type Event struct {
	// ID is the unique identifier for the event (UUID v4/v7 recommended).
	ID uuid.UUID `db:"event_id"      json:"id"`
	// Type defines the subject/topic the message should be published to.
	Type string `db:"event_type"    json:"type"`
	// PartitionKey is used for load balancing on the broker side, if supported.
	PartitionKey *string `db:"partition_key" json:"partition_key"`
	// Payload is the raw message body.
	Payload []byte `db:"payload"       json:"payload"`
	// Headers is a JSON blob containing custom message attributes/headers.
	Headers json.RawMessage `db:"headers"       json:"headers"`

	// Attempts tracks the number of times this event has been tried for delivery.
	Attempts int `db:"attempts" json:"attempts"`

	// CreatedAt is the timestamp when the event was first inserted into the database.
	CreatedAt time.Time `db:"created_at" json:"created_at"`
}

Event represents a single unit of work from the outbox table. It contains the message payload, metadata for routing, and delivery tracking information.

func (Event) GetPartitionKey

func (e Event) GetPartitionKey() string

GetPartitionKey safely dereferences the optional PartitionKey. It returns the value if present, or an empty string if the value was NULL in the database. This prevents nil-pointer panics when publishers access the key.

type EventStatus

type EventStatus string

EventStatus represents the lifecycle stage of an event in the outbox.

const (
	// EventStatusPending indicates the event is ready to be picked up by a relay instance.
	EventStatusPending EventStatus = "PENDING"
	// EventStatusDelivering indicates the event is currently locked and being
	// processed by a relay instance.
	EventStatusDelivering EventStatus = "DELIVERING"
	// EventStatusDelivered indicates the event was successfully published to the message broker.
	EventStatusDelivered EventStatus = "DELIVERED"
	// EventStatusDead indicates the event failed delivery attempts beyond the
	// retry limit and is quarantined.
	EventStatusDead EventStatus = "DEAD"
)
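The status comments above imply a lifecycle, which can be made explicit as a transition table. Note this table is an inference from the documentation, not part of the package API: DELIVERING moves back to PENDING on a retryable failure or lease reap, and both DELIVERED and DEAD are terminal.

```go
package main

import "fmt"

type EventStatus string

const (
	EventStatusPending    EventStatus = "PENDING"
	EventStatusDelivering EventStatus = "DELIVERING"
	EventStatusDelivered  EventStatus = "DELIVERED"
	EventStatusDead       EventStatus = "DEAD"
)

// validNext encodes the lifecycle implied by the status docs: a claimed
// event moves to DELIVERING, then to DELIVERED on success, back to PENDING
// for a retry (including lease reaping), or to DEAD once retries are
// exhausted. Terminal states have no entries.
var validNext = map[EventStatus][]EventStatus{
	EventStatusPending:    {EventStatusDelivering},
	EventStatusDelivering: {EventStatusDelivered, EventStatusPending, EventStatusDead},
}

func canTransition(from, to EventStatus) bool {
	for _, s := range validNext[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(EventStatusPending, EventStatusDelivering)) // true
	fmt.Println(canTransition(EventStatusDelivered, EventStatusPending)) // false: DELIVERED is terminal
}
```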

type ExponentialBackoff

type ExponentialBackoff struct {
	// MaxAttempts is the hard limit for delivery attempts.
	MaxAttempts int
	// BaseDelay is the initial backoff duration (e.g., 1s).
	BaseDelay time.Duration
	// MaxDelay is the maximum duration any single backoff can reach.
	MaxDelay time.Duration
	// Jitter is a factor (0.0 to 1.0) used to randomize the backoff interval.
	Jitter float64
}

ExponentialBackoff implements a binary exponential backoff strategy with randomized jitter. This is the recommended policy for high-throughput production systems to avoid overwhelming downstream brokers after a failure.

func (ExponentialBackoff) NextBackoff

func (p ExponentialBackoff) NextBackoff(attempts int) (time.Duration, bool)

NextBackoff calculates the next delay using the formula: BaseDelay * 2^(attempts-1). It ensures the delay does not exceed MaxDelay and applies a random jitter to stagger retries across multiple relay instances.

type FailedEvent

type FailedEvent struct {
	// ID of the event that failed.
	ID uuid.UUID `json:"id"           db:"id"`
	// NewStatus is the state the event should transition to (PENDING or DEAD).
	NewStatus EventStatus `json:"new_status"   db:"status"`
	// AvailableAt is the time when the event becomes eligible for retry.
	AvailableAt time.Time `json:"available_at" db:"available_at"`
	// Attempts is the incremented count of delivery tries.
	Attempts int `json:"attempts"     db:"attempts"`
	// LastError captures the error message from the publisher for diagnostics.
	LastError string `json:"last_error"   db:"last_error"`
}

FailedEvent is a container used to report processing failures back to the storage layer. It includes the updated status and scheduling information for the next retry.

type MockPublisher

type MockPublisher struct {
	mock.Mock
}

MockPublisher is a test double that implements the Publisher interface. It is used to verify that events are correctly dispatched to messaging systems.

func (*MockPublisher) Close

func (m *MockPublisher) Close(ctx context.Context) error

Close mocks the graceful shutdown of publisher resources.

func (*MockPublisher) Connect

func (m *MockPublisher) Connect(ctx context.Context) error

Connect mocks the connection establishment to the messaging backend.

func (*MockPublisher) Ping

func (m *MockPublisher) Ping(ctx context.Context) error

Ping mocks the connectivity check for the messaging backend.

func (*MockPublisher) Publish

func (m *MockPublisher) Publish(ctx context.Context, event Event) error

Publish mocks the dispatching of a single event to the target messaging system.

type MockStorage

type MockStorage struct {
	mock.Mock
}

MockStorage is a test double that implements the Storage interface. It allows for fine-grained control and assertions over database interactions.

func (*MockStorage) ClaimBatch

func (m *MockStorage) ClaimBatch(
	ctx context.Context,
	relayID string,
	size int,
	buffer []Event,
) ([]Event, error)

ClaimBatch mocks the retrieval and locking of a batch of events.

func (*MockStorage) Close

func (m *MockStorage) Close(ctx context.Context) error

Close mocks the graceful shutdown of storage resources.

func (*MockStorage) GetStats

func (m *MockStorage) GetStats(ctx context.Context) (Stats, error)

GetStats mocks the retrieval of operational metrics from the storage layer.

func (*MockStorage) MarkDeliveredBatch

func (m *MockStorage) MarkDeliveredBatch(
	ctx context.Context,
	ids []uuid.UUID,
	relayID string,
) error

MarkDeliveredBatch mocks the finalization of successfully processed events.

func (*MockStorage) MarkFailedBatch

func (m *MockStorage) MarkFailedBatch(
	ctx context.Context,
	failed []FailedEvent,
	relayID string,
) error

MarkFailedBatch mocks the recording of processing failures and retry metadata.

func (*MockStorage) Ping

func (m *MockStorage) Ping(ctx context.Context) error

Ping mocks the connectivity check for the storage backend.

func (*MockStorage) Prune

func (m *MockStorage) Prune(
	ctx context.Context,
	opts PruneOptions,
) (PruneResult, error)

Prune mocks the archival or deletion of old processed events.

func (*MockStorage) ReapExpiredLeases

func (m *MockStorage) ReapExpiredLeases(
	ctx context.Context,
	leaseTimeout time.Duration,
	limit int,
) (int64, error)

ReapExpiredLeases mocks the recovery of events held by inactive relay instances.

type PruneOptions

type PruneOptions struct {
	// DeliveredAge defines the duration threshold for DELIVERED events.
	// The string must follow the format "[number][unit]" where unit is:
	// 'd' for days, 'h' for hours, or 'm' for minutes (e.g., "7d", "24h", "60m").
	// An empty string or "0" indicates that no pruning should be performed
	// for this status.
	DeliveredAge string

	// DeadAge defines the duration threshold for DEAD events.
	// Follows the same format as DeliveredAge (e.g., "30d").
	// Use this to clear out "quarantined" events after a period of time.
	DeadAge string

	// DryRun, if true, instructs the storage implementation to calculate
	// and return the count of rows that meet the criteria without
	// actually performing the deletion.
	DryRun bool
}

PruneOptions defines the criteria for cleaning up old records.

type PruneResult

type PruneResult struct {
	// DeliveredDeleted is the total number of events with status 'DELIVERED'
	// that were successfully removed from the storage.
	DeliveredDeleted int64

	// DeadDeleted is the total number of events with status 'DEAD'
	// that were successfully removed from the storage.
	DeadDeleted int64
}

PruneResult provides feedback on the cleanup operation, returning the number of records affected by the maintenance task.

type PublishError

type PublishError struct {
	// Err is the underlying error returned by the transport client.
	Err error
	// IsRetryable indicates if the failure is transient (e.g., network timeout)
	// or permanent (e.g., authentication failure, invalid subject).
	// This field determines whether the Engine will retry the event or move it to 'DEAD' status.
	IsRetryable bool
	// Code is a machine-readable string used for categorizing errors in metrics and logs.
	Code string // e.g., "BROKER_NACK", "AUTH_EXPIRED", "VALIDATION_ERROR"
}

PublishError is a specialized error type used by Publisher implementations to communicate the nature of a failure back to the Engine.

func (*PublishError) Error

func (e *PublishError) Error() string

func (*PublishError) Unwrap

func (e *PublishError) Unwrap() error

type Publisher

type Publisher interface {

	// Connect establishes the initial connection to the message broker.
	// It should perform handshakes, authenticate, and initialize necessary
	// resources (like NATS JetStream contexts or Kafka writers).
	Connect(ctx context.Context) error

	// Publish sends a single event to the downstream system.
	// It should block until the broker provides a delivery acknowledgment
	// or the provided context is cancelled.
	Publish(ctx context.Context, event Event) error

	// Close performs a graceful shutdown of the publisher, ensuring any
	// buffered data is flushed and network resources are released.
	Close(ctx context.Context) error

	// Ping verifies the connectivity to the message broker (e.g., Kafka, NATS).
	// It ensures the publisher is authenticated and capable of sending messages.
	Ping(ctx context.Context) error
}

Publisher is the common interface for all egress transports. It abstracts the specifics of message brokers (Kafka, NATS, Redis, etc.), providing a unified contract for the Relay Engine.

type RetryPolicy

type RetryPolicy interface {
	// NextBackoff returns the duration to wait before the next attempt and
	// a boolean indicating whether the retry limit has been reached.
	NextBackoff(attempts int) (time.Duration, bool)
}

RetryPolicy defines the contract for calculating backoff durations between event delivery attempts. It allows the relay engine to be flexible with different retry strategies (e.g., constant, linear, or exponential).

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an HTTP server that exposes administrative and observability endpoints. It serves as a diagnostic window into the relay's operation, providing metrics and health status.

func NewServer

func NewServer(
	ctx context.Context,
	s Storage,
	p Publisher,
	addr string,
	logger *zap.Logger,
) *Server

NewServer creates an instrumented HTTP server. It uses otelhttp to automatically generate trace spans for every incoming request.

func (*Server) Start

func (s *Server) Start(ctx context.Context) (err error)

Start runs the HTTP server. It blocks until the provided context is canceled or the underlying listener returns an error. When the context is canceled, it performs a graceful shutdown with a 5-second timeout.

type State

type State int64

State represents the current operational state of the engine. It is reported via the StateGauge to provide observability into whether the engine is healthy, throttled, or failing.

const (
	// StateActive: Everything is fine. The engine is polling.
	StateActive State = 1
	// StatePaused: The publisher (Kafka/NATS) is down. We are standing by.
	StatePaused State = 2
	// StateError: A critical error occurred (like a DB connection failure).
	StateError State = 3
)

Engine state constants reported via the StateGauge.

type Stats

type Stats struct {
	// PendingCount is the total number of events currently in 'PENDING' status.
	PendingCount int64 `json:"pending_count"`
	// RetryingCount is the number of events in 'PENDING' status that have
	// failed at least once (attempts > 0).
	RetryingCount int64 `json:"retrying_count"`
	// OldestAgeSec is the age in seconds of the oldest event waiting to be processed.
	OldestAgeSec int64 `json:"oldest_age_sec"`
}

Stats represents a snapshot of the outbox table's current state.

type Storage

type Storage interface {
	// ClaimBatch identifies and locks a set of pending events for a specific relay instance.
	// It transitions events to the 'DELIVERING' status and associates them with the relayID.
	// The 'buffer' parameter allows for reusing a slice to minimize allocations.
	ClaimBatch(
		ctx context.Context,
		relayID string,
		batchSize int,
		buffer []Event,
	) ([]Event, error)

	// MarkDeliveredBatch moves events to the final 'DELIVERED' state.
	// It must verify that the events are still locked by the provided relayID
	// to prevent race conditions with the lease reaper.
	MarkDeliveredBatch(ctx context.Context, ids []uuid.UUID, relayID string) error

	// MarkFailedBatch handles events that encountered errors during publishing.
	// It updates event metadata (attempts, last_error) and determines if the event
	// should be retried (PENDING) or quarantined (DEAD).
	MarkFailedBatch(ctx context.Context, failures []FailedEvent, relayID string) error

	// ReapExpiredLeases identifies events stuck in the 'DELIVERING' state past their
	// lease duration and resets them to 'PENDING', allowing other instances to pick them up.
	ReapExpiredLeases(ctx context.Context, leaseTimeout time.Duration, limit int) (int64, error)

	// GetStats retrieves high-level operational metrics about the outbox table,
	// such as the current backlog size and the age of the oldest pending message.
	GetStats(ctx context.Context) (Stats, error)

	// Prune removes old DELIVERED and DEAD events from storage to maintain performance.
	// This is typically called by the CLI or a background maintenance job.
	Prune(ctx context.Context, opts PruneOptions) (PruneResult, error)

	// Close releases any resources held by the storage implementation, such as
	// database connection pools.
	Close(ctx context.Context) error

	// Ping verifies the connectivity to the underlying database. It should
	// return an error if the storage backend is unreachable or misconfigured.
	Ping(ctx context.Context) error
}

Storage defines the contract for how the Relay reads and updates events. Implementations are responsible for managing the persistence of outbox events.
