Documentation ¶
Index ¶
- type BatchConfig
- type BatchOption
- type Config
- type DLQConfig
- type ExchangeConfig
- type Logger
- type Option
- func WithCircuitBreaker(enabled bool) Option
- func WithCircuitBreakerHalfOpenRequests(n int) Option
- func WithCircuitBreakerMaxFailures(n int) Option
- func WithCircuitBreakerResetTimeout(d time.Duration) Option
- func WithConfirmTimeout(timeout time.Duration) Option
- func WithCustomDLQ(dlqConfig DLQConfig) Option
- func WithDLQ(enabled bool) Option
- func WithDLQExchange(exchangeName string) Option
- func WithDLQPrefix(prefix string) Option
- func WithExchanges(exchanges []ExchangeConfig) Option
- func WithLogger(log Logger) Option
- func WithMaxRetries(retries int) Option
- func WithPrefetchCount(count int) Option
- func WithPublisherConfirms(enabled bool) Option
- func WithQueues(queues []QueueConfig) Option
- func WithReconnectDelay(delay time.Duration) Option
- func WithTimeout(timeout time.Duration) Option
- func WithURI(uri string) Option
- type QueueConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchConfig ¶
type BatchConfig struct {
UsePipelining bool // Use pipelining (send all, then wait confirms). Default: true
FailFast bool // Stop on first error. Default: false (collect all errors)
MaxConcurrency int // Max concurrent workers for async. Default: 0 (unlimited)
}
BatchConfig holds configuration for batch publish operations.
func DefaultBatchConfig ¶
func DefaultBatchConfig() BatchConfig
DefaultBatchConfig returns the default batch configuration.
type BatchOption ¶
type BatchOption func(*BatchConfig)
BatchOption is a functional option for configuring batch publish operations.
func WithFailFast ¶
func WithFailFast(enabled bool) BatchOption
WithFailFast enables or disables fail-fast mode for batch publishes.
When enabled, the operation stops at the first error encountered. When disabled (default), all messages are attempted and all errors are collected.
func WithMaxConcurrency ¶
func WithMaxConcurrency(n int) BatchOption
WithMaxConcurrency sets the maximum number of concurrent workers for async batch publishes.
Default: 0 (unlimited).
Recommended: 50-100 for most use cases.
func WithPipelining ¶
func WithPipelining(enabled bool) BatchOption
WithPipelining enables or disables pipelining for batch publishes.
When enabled (default), all messages are sent first without waiting for confirmations, then all confirmations are collected. This is much faster for large batches (5-10x improvement).
When disabled, messages are published one at a time, waiting for each confirmation before sending the next (legacy behavior).
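The batch options compose in the usual functional-options way. The sketch below redeclares the documented types locally so that it compiles on its own; the option bodies and the resolveBatchConfig helper are assumptions, written only from the signatures and defaults listed above.

```go
package main

// BatchConfig is a local copy of the documented struct.
type BatchConfig struct {
	UsePipelining  bool
	FailFast       bool
	MaxConcurrency int
}

// BatchOption mutates a BatchConfig in place.
type BatchOption func(*BatchConfig)

// DefaultBatchConfig uses the defaults stated in the field comments.
func DefaultBatchConfig() BatchConfig {
	return BatchConfig{UsePipelining: true, FailFast: false, MaxConcurrency: 0}
}

func WithFailFast(enabled bool) BatchOption {
	return func(c *BatchConfig) { c.FailFast = enabled }
}

func WithMaxConcurrency(n int) BatchOption {
	return func(c *BatchConfig) { c.MaxConcurrency = n }
}

func WithPipelining(enabled bool) BatchOption {
	return func(c *BatchConfig) { c.UsePipelining = enabled }
}

// resolveBatchConfig is a hypothetical helper: start from the defaults,
// then apply each option in order, so later options win.
func resolveBatchConfig(opts ...BatchOption) BatchConfig {
	cfg := DefaultBatchConfig()
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}
```

Calling resolveBatchConfig(WithFailFast(true), WithMaxConcurrency(50)) would leave pipelining at its default and change only the two named fields.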
type Config ¶
type Config struct {
// Required fields
URI string // RabbitMQ connection URI (e.g., "amqp://user:pass@host:5672/")
// Optional fields with defaults
ReconnectDelay time.Duration // Delay between reconnection attempts (default: 5s)
Timeout time.Duration // Default timeout for operations (default: 10s)
PrefetchCount int // Number of messages to prefetch (default: 10)
MaxRetries int // Maximum number of retries for failed messages (default: 3)
PublisherConfirms bool // Enable publisher confirms for guaranteed delivery (default: false)
ConfirmTimeout time.Duration // Timeout for waiting publisher confirms (default: 5s)
Logger Logger // Logger for internal logging (default: DefaultLogger)
// Circuit Breaker configuration
CircuitBreakerEnabled bool // Enable circuit breaker for consumers (default: false)
CircuitBreakerMaxFailures int // Max failures before opening circuit (default: 5)
CircuitBreakerResetTimeout time.Duration // Time to wait before attempting half-open (default: 60s)
CircuitBreakerHalfOpenRequests int // Max requests in half-open state (default: 3)
// Dead Letter Queue configuration
DLQEnabled bool // Enable automatic DLQ setup (default: false)
DLQConfig DLQConfig // DLQ configuration options
// Topology configuration
Exchanges []ExchangeConfig // Exchanges to declare on connect
Queues []QueueConfig // Queues to declare and bind on connect
}
Config holds the configuration for the RabbitMQ client.
func DefaultConfig ¶
func DefaultConfig() Config
DefaultConfig returns a Config with sensible defaults.
Default values:
- ReconnectDelay: 5 seconds
- Timeout: 10 seconds
- PrefetchCount: 10
- MaxRetries: 3
- PublisherConfirms: false
- ConfirmTimeout: 5 seconds
- Logger: nil (will be set to DefaultLogger if not provided)
- CircuitBreakerEnabled: false
- CircuitBreakerMaxFailures: 5
- CircuitBreakerResetTimeout: 60 seconds
- CircuitBreakerHalfOpenRequests: 3
- DLQEnabled: false
- Exchanges: empty
- Queues: empty
Note: If Logger is nil, the broker will automatically use NewDefaultLogger().
type DLQConfig ¶
type DLQConfig struct {
// ExchangeName is the name of the Dead Letter Exchange.
// Default: "dlx.exchange"
ExchangeName string
// QueuePrefix is the prefix to use for DLQ queue names.
// The full DLQ name will be: QueuePrefix + original queue name
// Default: "dlq."
QueuePrefix string
// ExchangeType is the type of the Dead Letter Exchange.
// Default: "direct"
ExchangeType string
// Durable specifies if the DLX and DLQ should be durable.
// Default: true
Durable bool
// AutoDelete specifies if the DLX and DLQ should be auto-deleted.
// Default: false
AutoDelete bool
}
DLQConfig holds the configuration for Dead Letter Queue (DLQ) setup.
When enabled, the library will automatically:
- Create a Dead Letter Exchange (DLX)
- Create a Dead Letter Queue (DLQ) for each main queue, named QueuePrefix + queue name ("dlq.{queueName}" by default)
- Configure main queues with the x-dead-letter-exchange and x-dead-letter-routing-key arguments
- Route failed messages (after max retries) to the DLQ for analysis
Example:
cfg := config.DefaultConfig()
cfg.DLQEnabled = true
cfg.DLQConfig = config.DLQConfig{
	ExchangeName: "my.dlx",
	QueuePrefix:  "dlq.",
}
func DefaultDLQConfig ¶
func DefaultDLQConfig() DLQConfig
DefaultDLQConfig returns a DLQConfig with sensible defaults.
func (*DLQConfig) GetDLQName ¶
GetDLQName returns the DLQ queue name for a given queue.
Example:
dlqConfig := config.DefaultDLQConfig()
dlqName := dlqConfig.GetDLQName("orders.queue")
// Returns: "dlq.orders.queue"
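The naming rule stated for QueuePrefix (prefix plus original queue name) can be sketched as below. The struct is a trimmed local copy with only the field the sketch needs, and the method body is an assumption based on that rule rather than the library's source.

```go
package main

// DLQConfig is a trimmed local copy holding only the field used here.
type DLQConfig struct {
	QueuePrefix string
}

// GetDLQName follows the documented rule: QueuePrefix + original queue name.
// The body is an assumption; only the rule itself comes from the docs.
func (c *DLQConfig) GetDLQName(queueName string) string {
	return c.QueuePrefix + queueName
}
```

Under this rule a custom prefix such as "failed." would map "orders.queue" to "failed.orders.queue".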
func (*DLQConfig) GetDLXRoutingKey ¶
GetDLXRoutingKey returns the routing key for DLX bindings.
By default, it uses the original queue name as the routing key.
type ExchangeConfig ¶
type ExchangeConfig struct {
// Name is the name of the exchange
Name string
// Type is the exchange type: "direct", "fanout", "topic", or "headers"
Type string
// Durable indicates if the exchange should survive broker restart
// Default: true
Durable bool
// AutoDelete indicates if the exchange should be deleted when no longer used
// Default: false
AutoDelete bool
// Internal indicates if the exchange is internal (cannot be published to directly)
// Default: false
Internal bool
// Args are optional exchange arguments
Args map[string]interface{}
}
ExchangeConfig defines configuration for a RabbitMQ exchange.
type Logger ¶
type Logger interface {
Info(ctx context.Context, msg string, fields map[string]any)
Error(ctx context.Context, msg string, fields map[string]any)
Warn(ctx context.Context, msg string, fields map[string]any)
Debug(ctx context.Context, msg string, fields map[string]any)
Close() error
}
Logger is the interface that any logger implementation must satisfy. This is re-declared here to avoid import cycles with the rabbitmq package. The actual public interface is defined in the rabbitmq package root.
type Option ¶
type Option func(*Config)
Option is a functional option for configuring the client.
func WithCircuitBreaker ¶
func WithCircuitBreaker(enabled bool) Option
WithCircuitBreaker enables or disables the circuit breaker for consumers.
When enabled, the consumer automatically stops processing messages after too many consecutive failures, protecting against cascading failures.
The circuit breaker has three states:
- Closed: Normal operation, all messages processed
- Open: Too many failures, messages are rejected (nacked without requeue)
- Half-Open: Testing recovery, limited messages allowed
Example:
eventBus, _ := rabbitmq.NewEventBus(
	config.DefaultConfig(),
	config.WithCircuitBreaker(true),
	config.WithCircuitBreakerMaxFailures(10),
	config.WithCircuitBreakerResetTimeout(2 * time.Minute),
)
Default: false (disabled)
func WithCircuitBreakerHalfOpenRequests ¶
func WithCircuitBreakerHalfOpenRequests(n int) Option
WithCircuitBreakerHalfOpenRequests sets the number of requests to allow in half-open state before deciding whether to close or re-open the circuit.
Default: 3
func WithCircuitBreakerMaxFailures ¶
func WithCircuitBreakerMaxFailures(n int) Option
WithCircuitBreakerMaxFailures sets the maximum number of consecutive failures before opening the circuit.
Default: 5
func WithCircuitBreakerResetTimeout ¶
func WithCircuitBreakerResetTimeout(d time.Duration) Option
WithCircuitBreakerResetTimeout sets how long to wait in open state before attempting to transition to half-open.
Default: 60 seconds
func WithConfirmTimeout ¶
func WithConfirmTimeout(timeout time.Duration) Option
WithConfirmTimeout sets the timeout for waiting publisher confirms.
This is only used when PublisherConfirms is enabled.
Default: 5 seconds
func WithCustomDLQ ¶
func WithCustomDLQ(dlqConfig DLQConfig) Option
WithCustomDLQ enables DLQ with custom configuration.
This allows full control over DLX naming, queue prefix, exchange type, etc.
Example:
eventBus, _ := rabbitmq.NewEventBus(
	config.DefaultConfig(),
	config.WithCustomDLQ(config.DLQConfig{
		ExchangeName: "my.custom.dlx",
		QueuePrefix:  "failed.",
		ExchangeType: "topic",
		Durable:      true,
	}),
)
func WithDLQ ¶
func WithDLQ(enabled bool) Option
WithDLQ enables automatic Dead Letter Queue (DLQ) setup with default configuration.
When enabled, the library will automatically:
- Create a Dead Letter Exchange (DLX) named "dlx.exchange"
- Create DLQ queues with prefix "dlq." for each main queue
- Configure main queues to route failed messages to DLQ
Example:
eventBus, _ := rabbitmq.NewEventBus(
	config.DefaultConfig(),
	config.WithDLQ(true),
)
Default: false (disabled)
func WithExchanges ¶
func WithExchanges(exchanges []ExchangeConfig) Option
WithExchanges sets the exchanges to declare on connection.
These exchanges will be automatically created when the client connects. If the exchange already exists, it will be validated against the config.
func WithLogger ¶
func WithLogger(log Logger) Option
WithLogger sets a custom logger for the RabbitMQ client.
This allows you to inject your own logger implementation (zap, logrus, zerolog, etc.) instead of using the default logger.
Example with custom logger:
type MyLogger struct{}

func (l *MyLogger) Info(ctx context.Context, msg string, fields map[string]any) {
	// Your logging implementation
}

// ... implement Error, Warn, Debug, and Close the same way

client, _ := broker.New(
	config.DefaultConfig(),
	config.WithLogger(&MyLogger{}),
)
Default: DefaultLogger (writes to stdout with timestamps). Use rabbitmq.NewDefaultLogger() to create the default logger.
func WithMaxRetries ¶
func WithMaxRetries(retries int) Option
WithMaxRetries sets the maximum number of retries for failed messages.
When a message handler returns an error, the message is retried up to MaxRetries times before being discarded or sent to a DLQ.
Default: 3
func WithPrefetchCount ¶
func WithPrefetchCount(count int) Option
WithPrefetchCount sets the number of messages to prefetch per consumer.
This caps how many unacknowledged messages the broker delivers to a consumer at any one time.
Default: 10
func WithPublisherConfirms ¶
func WithPublisherConfirms(enabled bool) Option
WithPublisherConfirms enables or disables publisher confirms.
When enabled, the client waits for RabbitMQ to confirm that it has accepted each published message. This guards against messages being lost silently between the publisher and the broker, at some cost in throughput.
Recommended for production environments where message loss is not acceptable.
Default: false
func WithQueues ¶
func WithQueues(queues []QueueConfig) Option
WithQueues sets the queues to declare and bind on connection.
These queues will be automatically created and bound to their respective exchanges when the client connects.
func WithReconnectDelay ¶
func WithReconnectDelay(delay time.Duration) Option
WithReconnectDelay sets the delay between reconnection attempts.
Default: 5 seconds
type QueueConfig ¶
type QueueConfig struct {
// Name is the name of the queue
Name string
// Exchange is the name of the exchange to bind this queue to (optional)
Exchange string
// RoutingKeys are the routing keys to use for binding (optional)
// Multiple routing keys will create multiple bindings
RoutingKeys []string
// Durable indicates if the queue should survive broker restart
// Default: true
Durable bool
// AutoDelete indicates if the queue should be deleted when no longer used
// Default: false
AutoDelete bool
// Exclusive indicates if the queue is exclusive to this connection
// Default: false
Exclusive bool
// Args are optional queue arguments for advanced features.
// Common arguments:
// - "x-dead-letter-exchange": DLX name for failed messages
// - "x-dead-letter-routing-key": routing key for DLX
// - "x-message-ttl": message TTL in milliseconds (int32)
// - "x-max-length": maximum number of messages in queue (int32)
// - "x-max-priority": maximum priority level (int32, typically 1-10)
Args map[string]any
}
QueueConfig defines configuration for a RabbitMQ queue with bindings.
func CreateDLQQueue ¶
func CreateDLQQueue(mainQueue QueueConfig, dlqConfig DLQConfig) QueueConfig
CreateDLQQueue creates a DLQ queue configuration for a given main queue.
The DLQ queue will have the same durability settings as the main queue.
Example:
dlqConfig := config.DefaultDLQConfig()
mainQueue := config.QueueConfig{Name: "orders.queue", Durable: true}
dlqQueue := config.CreateDLQQueue(mainQueue, dlqConfig)
// dlqQueue is QueueConfig{Name: "dlq.orders.queue", Durable: true, ...}
func (*QueueConfig) WithDLX ¶
func (q *QueueConfig) WithDLX(dlxExchange, routingKey string) *QueueConfig
WithDLX configures a queue to use Dead Letter Exchange.
This helper automatically sets the x-dead-letter-exchange and x-dead-letter-routing-key arguments for the queue.
The method is idempotent: if a DLX is already configured, the existing setting is not overwritten.
Example:
queue := config.QueueConfig{
	Name:    "orders.queue",
	Durable: true,
}
queue.WithDLX("dlx.exchange", "orders.queue")
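The idempotent behaviour can be sketched as below. The struct is a trimmed local copy, and the sketch assumes that "already configured" means the x-dead-letter-exchange argument is present; the library may use a different check.

```go
package main

// QueueConfig is a trimmed local copy holding only the fields used here.
type QueueConfig struct {
	Name string
	Args map[string]any
}

// WithDLX sets the two dead-letter arguments unless a DLX is already set.
// Assumption: the presence of "x-dead-letter-exchange" marks the queue as
// already configured.
func (q *QueueConfig) WithDLX(dlxExchange, routingKey string) *QueueConfig {
	if q.Args == nil {
		q.Args = make(map[string]any)
	}
	if _, ok := q.Args["x-dead-letter-exchange"]; ok {
		return q // keep the existing DLX settings
	}
	q.Args["x-dead-letter-exchange"] = dlxExchange
	q.Args["x-dead-letter-routing-key"] = routingKey
	return q
}
```

Returning the receiver allows chaining, and a second call with different values leaves the first configuration in place.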