config

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchConfig

type BatchConfig struct {
	UsePipelining  bool // Use pipelining (send all, then wait confirms). Default: true
	FailFast       bool // Stop on first error. Default: false (collect all errors)
	MaxConcurrency int  // Max concurrent workers for async. Default: 0 (unlimited)
}

batchConfig holds configuration for batch publish operations.

func DefaultBatchConfig

func DefaultBatchConfig() BatchConfig

defaultBatchConfig returns the default batch configuration.

type BatchOption

type BatchOption func(*BatchConfig)

BatchOption is a functional option for configuring batch publish operations.

func WithFailFast

func WithFailFast(enabled bool) BatchOption

WithFailFast enables or disables fail-fast mode for batch publishes.

When enabled, the operation stops at the first error encountered. When disabled (default), all messages are attempted and all errors are collected.

func WithMaxConcurrency

func WithMaxConcurrency(n int) BatchOption

WithMaxConcurrency sets the maximum number of concurrent workers for async batch publishes.

Default: 0 (unlimited) Recommended: 50-100 for most use cases

func WithPipelining

func WithPipelining(enabled bool) BatchOption

WithPipelining enables or disables pipelining for batch publishes.

When enabled (default), all messages are sent first without waiting for confirmations, then all confirmations are collected. This is much faster for large batches (5-10x improvement).

When disabled, messages are published one at a time, waiting for each confirmation before sending the next (legacy behavior).

type Config

type Config struct {
	// Required fields
	URI string // RabbitMQ connection URI (e.g., "amqp://user:pass@host:5672/")

	// Optional fields with defaults
	ReconnectDelay    time.Duration // Delay between reconnection attempts (default: 5s)
	Timeout           time.Duration // Default timeout for operations (default: 10s)
	PrefetchCount     int           // Number of messages to prefetch (default: 10)
	MaxRetries        int           // Maximum number of retries for failed messages (default: 3)
	PublisherConfirms bool          // Enable publisher confirms for guaranteed delivery (default: false)
	ConfirmTimeout    time.Duration // Timeout for waiting publisher confirms (default: 5s)
	Logger            Logger        // Logger for internal logging (default: DefaultLogger)

	// Circuit Breaker configuration
	CircuitBreakerEnabled          bool          // Enable circuit breaker for consumers (default: false)
	CircuitBreakerMaxFailures      int           // Max failures before opening circuit (default: 5)
	CircuitBreakerResetTimeout     time.Duration // Time to wait before attempting half-open (default: 60s)
	CircuitBreakerHalfOpenRequests int           // Max requests in half-open state (default: 3)

	// Dead Letter Queue configuration
	DLQEnabled bool      // Enable automatic DLQ setup (default: false)
	DLQConfig  DLQConfig // DLQ configuration options

	// Topology configuration
	Exchanges []ExchangeConfig // Exchanges to declare on connect
	Queues    []QueueConfig    // Queues to declare and bind on connect
}

Config holds the configuration for the RabbitMQ client.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

Default values:

  • ReconnectDelay: 5 seconds
  • Timeout: 10 seconds
  • PrefetchCount: 10
  • MaxRetries: 3
  • Logger: nil (will be set to DefaultLogger if not provided)
  • CircuitBreakerEnabled: false
  • CircuitBreakerMaxFailures: 5
  • CircuitBreakerResetTimeout: 60 seconds
  • CircuitBreakerHalfOpenRequests: 3
  • Exchanges: empty
  • Queues: empty

Note: If Logger is nil, the broker will automatically use NewDefaultLogger().

func (*Config) Validate

func (c *Config) Validate() error

validate checks if the configuration is valid.

type DLQConfig

type DLQConfig struct {
	// ExchangeName is the name of the Dead Letter Exchange.
	// Default: "dlx.exchange"
	ExchangeName string

	// QueuePrefix is the prefix to use for DLQ queue names.
	// The full DLQ name will be: QueuePrefix + original queue name
	// Default: "dlq."
	QueuePrefix string

	// ExchangeType is the type of the Dead Letter Exchange.
	// Default: "direct"
	ExchangeType string

	// Durable specifies if the DLX and DLQ should be durable.
	// Default: true
	Durable bool

	// AutoDelete specifies if the DLX and DLQ should be auto-deleted.
	// Default: false
	AutoDelete bool
}

DLQConfig holds the configuration for Dead Letter Queue (DLQ) setup.

When enabled, the library will automatically:

  • Create a Dead Letter Exchange (DLX)
  • Create a Dead Letter Queue (DLQ) with naming pattern "dlq.{queueName}"
  • Configure main queues with x-dead-letter-exchange and x-dead-letter-routing-key
  • Route failed messages (after max retries) to DLQ for analysis

Example:

cfg := config.DefaultConfig()
cfg.DLQEnabled = true
cfg.DLQConfig = config.DLQConfig{
    ExchangeName: "my.dlx",
    QueuePrefix:  "dlq.",
}

func DefaultDLQConfig

func DefaultDLQConfig() DLQConfig

DefaultDLQConfig returns a DLQConfig with sensible defaults.

func (*DLQConfig) GetDLQName

func (c *DLQConfig) GetDLQName(queueName string) string

GetDLQName returns the DLQ queue name for a given queue.

Example:

dlqConfig := config.DefaultDLQConfig()
dlqName := dlqConfig.GetDLQName("orders.queue")
// Returns: "dlq.orders.queue"

func (*DLQConfig) GetDLXRoutingKey

func (c *DLQConfig) GetDLXRoutingKey(queueName string) string

GetDLXRoutingKey returns the routing key for DLX bindings.

By default, uses the original queue name as the routing key.

type ExchangeConfig

type ExchangeConfig struct {
	// Name is the name of the exchange
	Name string

	// Type is the exchange type: "direct", "fanout", "topic", or "headers"
	Type string

	// Durable indicates if the exchange should survive broker restart
	// Default: true
	Durable bool

	// AutoDelete indicates if the exchange should be deleted when no longer used
	// Default: false
	AutoDelete bool

	// Internal indicates if the exchange is internal (cannot be published to directly)
	// Default: false
	Internal bool

	// Args are optional exchange arguments
	Args map[string]interface{}
}

ExchangeConfig defines configuration for a RabbitMQ exchange.

type Logger

type Logger interface {
	Info(ctx context.Context, msg string, fields map[string]any)
	Error(ctx context.Context, msg string, fields map[string]any)
	Warn(ctx context.Context, msg string, fields map[string]any)
	Debug(ctx context.Context, msg string, fields map[string]any)
	Close() error
}

Logger is the interface that any logger implementation must satisfy. This is re-declared here to avoid import cycles with the rabbitmq package. The actual public interface is defined in the rabbitmq package root.

type Option

type Option func(*Config)

Option is a functional option for configuring the client.

func WithCircuitBreaker

func WithCircuitBreaker(enabled bool) Option

WithCircuitBreaker enables or disables the circuit breaker for consumers.

When enabled, the consumer will automatically stop processing messages when error rates are too high, protecting against cascading failures.

The circuit breaker has three states:

  • Closed: Normal operation, all messages processed
  • Open: Too many failures, messages are rejected (nacked without requeue)
  • Half-Open: Testing recovery, limited messages allowed

Example:

eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithCircuitBreaker(true),
    config.WithCircuitBreakerMaxFailures(10),
    config.WithCircuitBreakerResetTimeout(2 * time.Minute),
)

Default: false (disabled)

func WithCircuitBreakerHalfOpenRequests

func WithCircuitBreakerHalfOpenRequests(n int) Option

WithCircuitBreakerHalfOpenRequests sets the number of requests to allow in half-open state before deciding whether to close or re-open the circuit.

Default: 3

func WithCircuitBreakerMaxFailures

func WithCircuitBreakerMaxFailures(n int) Option

WithCircuitBreakerMaxFailures sets the maximum number of consecutive failures before opening the circuit.

Default: 5

func WithCircuitBreakerResetTimeout

func WithCircuitBreakerResetTimeout(d time.Duration) Option

WithCircuitBreakerResetTimeout sets how long to wait in open state before attempting to transition to half-open.

Default: 60 seconds

func WithConfirmTimeout

func WithConfirmTimeout(timeout time.Duration) Option

WithConfirmTimeout sets the timeout for waiting publisher confirms.

This is only used when PublisherConfirms is enabled. Default: 5 seconds

func WithCustomDLQ

func WithCustomDLQ(dlqConfig DLQConfig) Option

WithCustomDLQ enables DLQ with custom configuration.

This allows full control over DLX naming, queue prefix, exchange type, etc.

Example:

eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithCustomDLQ(config.DLQConfig{
        ExchangeName: "my.custom.dlx",
        QueuePrefix:  "failed.",
        ExchangeType: "topic",
        Durable:      true,
    }),
)

func WithDLQ

func WithDLQ(enabled bool) Option

WithDLQ enables automatic Dead Letter Queue (DLQ) setup with default configuration.

When enabled, the library will automatically:

  • Create a Dead Letter Exchange (DLX) named "dlx.exchange"
  • Create DLQ queues with prefix "dlq." for each main queue
  • Configure main queues to route failed messages to DLQ

Example:

eventBus, _ := rabbitmq.NewEventBus(
    config.DefaultConfig(),
    config.WithDLQ(true),
)

Default: false (disabled)

func WithDLQExchange

func WithDLQExchange(exchangeName string) Option

WithDLQExchange sets the Dead Letter Exchange name.

Default: "dlx.exchange"

func WithDLQPrefix

func WithDLQPrefix(prefix string) Option

WithDLQPrefix sets the prefix for DLQ queue names.

Default: "dlq."

func WithExchanges

func WithExchanges(exchanges []ExchangeConfig) Option

WithExchanges sets the exchanges to declare on connection.

These exchanges will be automatically created when the client connects. If the exchange already exists, it will be validated against the config.

func WithLogger

func WithLogger(log Logger) Option

WithLogger sets a custom logger for the RabbitMQ client.

This allows you to inject your own logger implementation (zap, logrus, zerolog, etc.) instead of using the default logger.

Example with custom logger:

type MyLogger struct{}

func (l *MyLogger) Info(ctx context.Context, msg string, fields map[string]any) {
    // Your logging implementation
}
// ... implement other methods

client, _ := broker.New(
    config.DefaultConfig(),
    config.WithLogger(&MyLogger{}),
)

Default: DefaultLogger (writes to stdout with timestamps) Use rabbitmq.NewDefaultLogger() to create the default logger.

func WithMaxRetries

func WithMaxRetries(retries int) Option

WithMaxRetries sets the maximum number of retries for failed messages.

When a message handler returns an error, the message will be retried up to MaxRetries times before being discarded or sent to a DLQ. Default: 3

func WithPrefetchCount

func WithPrefetchCount(count int) Option

WithPrefetchCount sets the number of messages to prefetch per consumer.

This controls how many unacknowledged messages can be delivered to a consumer. Default: 10

func WithPublisherConfirms

func WithPublisherConfirms(enabled bool) Option

WithPublisherConfirms enables or disables publisher confirms.

When enabled, the client will wait for RabbitMQ to confirm that messages have been received and persisted. This provides guaranteed delivery but has a performance impact.

Recommended for production environments where message loss is not acceptable. Default: false

func WithQueues

func WithQueues(queues []QueueConfig) Option

WithQueues sets the queues to declare and bind on connection.

These queues will be automatically created and bound to their respective exchanges when the client connects.

func WithReconnectDelay

func WithReconnectDelay(delay time.Duration) Option

WithReconnectDelay sets the delay between reconnection attempts.

Default: 5 seconds

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout sets the default timeout for operations.

Default: 10 seconds

func WithURI

func WithURI(uri string) Option

WithURI sets the RabbitMQ connection URI.

Example: "amqp://guest:guest@localhost:5672/"

type QueueConfig

type QueueConfig struct {
	// Name is the name of the queue
	Name string

	// Exchange is the name of the exchange to bind this queue to (optional)
	Exchange string

	// RoutingKeys are the routing keys to use for binding (optional)
	// Multiple routing keys will create multiple bindings
	RoutingKeys []string

	// Durable indicates if the queue should survive broker restart
	// Default: true
	Durable bool

	// AutoDelete indicates if the queue should be deleted when no longer used
	// Default: false
	AutoDelete bool

	// Exclusive indicates if the queue is exclusive to this connection
	// Default: false
	Exclusive bool

	// Args are optional queue arguments for advanced features.
	// Common arguments:
	//   - "x-dead-letter-exchange": DLX name for failed messages
	//   - "x-dead-letter-routing-key": routing key for DLX
	//   - "x-message-ttl": message TTL in milliseconds (int32)
	//   - "x-max-length": maximum number of messages in queue (int32)
	//   - "x-max-priority": maximum priority level (int32, typically 1-10)
	Args map[string]any
}

QueueConfig defines configuration for a RabbitMQ queue with bindings.

func CreateDLQQueue

func CreateDLQQueue(mainQueue QueueConfig, dlqConfig DLQConfig) QueueConfig

CreateDLQQueue creates a DLQ queue configuration for a given main queue.

The DLQ queue will have the same durability settings as the main queue.

Example:

dlqConfig := config.DefaultDLQConfig()
mainQueue := config.QueueConfig{Name: "orders.queue", Durable: true}
dlqQueue := config.CreateDLQQueue(mainQueue, dlqConfig)
// Returns: QueueConfig{Name: "dlq.orders.queue", Durable: true, ...}

func (*QueueConfig) WithDLX

func (q *QueueConfig) WithDLX(dlxExchange, routingKey string) *QueueConfig

WithDLX configures a queue to use Dead Letter Exchange.

This helper automatically sets the x-dead-letter-exchange and x-dead-letter-routing-key arguments for the queue.

The method is idempotent - if DLX is already configured, it won't overwrite.

Example:

queue := config.QueueConfig{
    Name:    "orders.queue",
    Durable: true,
}
queue.WithDLX("dlx.exchange", "orders.queue")

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL