Documentation ¶
Overview ¶
Package rabbit provides functionality for interacting with RabbitMQ.
The rabbit package offers a simplified interface for working with RabbitMQ message queues, providing connection management, message publishing, and consuming capabilities with a focus on reliability and ease of use.
Architecture ¶
This package follows the "accept interfaces, return structs" design pattern:
- Client interface: Defines the contract for RabbitMQ operations
- RabbitClient struct: Concrete implementation of the Client interface
- Message interface: Defines the contract for consumed messages
- NewClient constructor: Returns *RabbitClient (concrete type)
- FX module: Provides both *RabbitClient and Client interface for dependency injection
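The practical benefit of this pattern is that application code can depend on a narrow interface and be exercised without a broker. The following is a minimal sketch, not part of the package: `Publisher`, `fakePublisher`, and `NotifyUserCreated` are hypothetical names, and only the `Publish` signature is taken from the Client interface documented below.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher is a hypothetical, narrowed view of rabbit.Client that contains
// only the Publish method (signature copied from the Client interface).
type Publisher interface {
	Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
}

// fakePublisher is an in-memory test double that records every payload.
type fakePublisher struct {
	sent [][]byte
}

func (f *fakePublisher) Publish(_ context.Context, msg []byte, _ ...map[string]interface{}) error {
	f.sent = append(f.sent, msg)
	return nil
}

// NotifyUserCreated accepts the interface, so it works with the real client
// or with the fake above.
func NotifyUserCreated(ctx context.Context, p Publisher, userID string) error {
	payload := fmt.Sprintf(`{"id": %q}`, userID)
	return p.Publish(ctx, []byte(payload))
}

func main() {
	fake := &fakePublisher{}
	if err := NotifyUserCreated(context.Background(), fake, "123"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(string(fake.sent[0]))
}
```

Because *RabbitClient has a Publish method with the same signature, it should satisfy such a narrowed interface without an adapter.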
Core Features:
- Robust connection management with automatic reconnection
- Simple publishing interface with error handling
- Consumer interface with automatic acknowledgment handling
- Dead letter queue support
- Optional observability hooks for metrics and tracing
- Optional context-aware logging for lifecycle events
- Distributed tracing support via message headers
Direct Usage (Without FX) ¶
For simple applications or tests, create a client directly:
import (
	"github.com/Aleph-Alpha/std/v1/rabbit"
	"context"
	"sync"
)
// Create a new RabbitMQ client (returns concrete *RabbitClient)
client, err := rabbit.NewClient(rabbit.Config{
	Connection: rabbit.Connection{
		Host:     "localhost",
		Port:     5672,
		User:     "guest",
		Password: "guest",
	},
	Channel: rabbit.Channel{
		ExchangeName: "events",
		ExchangeType: "topic",
		RoutingKey:   "user.created",
		QueueName:    "user-events",
		IsConsumer:   true,
	},
})
if err != nil {
	return err
}
defer client.GracefulShutdown()
// Optionally attach logger and observer
client = client.
	WithLogger(myLogger).
	WithObserver(myObserver)
// Publish a message
ctx := context.Background()
message := []byte(`{"id": "123", "name": "John"}`)
if err := client.Publish(ctx, message); err != nil {
	return err
}
Builder Pattern for Optional Dependencies ¶
The client supports optional dependencies via builder methods:
import (
"github.com/Aleph-Alpha/std/v1/rabbit"
"github.com/Aleph-Alpha/std/v1/logger"
"github.com/Aleph-Alpha/std/v1/observability"
)
// Create client with optional logger and observer
client, err := rabbit.NewClient(config)
if err != nil {
return err
}
// Attach optional dependencies using builder pattern
client = client.
WithLogger(loggerInstance). // Optional: for lifecycle logging
WithObserver(observerInstance) // Optional: for metrics/tracing
defer client.GracefulShutdown()
FX Module Integration ¶
For production applications using Uber's fx, use the FXModule which provides both the concrete type and interface, with automatic injection of optional dependencies:
import (
	"context"
	"github.com/Aleph-Alpha/std/v1/rabbit"
	"github.com/Aleph-Alpha/std/v1/logger"
	"github.com/Aleph-Alpha/std/v1/observability"
	"go.uber.org/fx"
)
app := fx.New(
	logger.FXModule, // Optional: provides std logger
	rabbit.FXModule, // Provides *RabbitClient and rabbit.Client interface
	fx.Provide(
		func() rabbit.Config {
			return rabbit.Config{
				Connection: rabbit.Connection{
					Host:     "localhost",
					Port:     5672,
					User:     "guest",
					Password: "guest",
				},
				Channel: rabbit.Channel{
					ExchangeName: "events",
					QueueName:    "user-events",
					IsConsumer:   true,
				},
			}
		},
		// Optional: provide observer for metrics
		func(metrics *prometheus.Metrics) observability.Observer {
			return NewObserverAdapter(metrics)
		},
	),
	fx.Invoke(func(client *rabbit.RabbitClient) error {
		// Logger and Observer are automatically injected if provided
		ctx := context.Background()
		// Returning the error lets fx abort startup if the publish fails
		return client.Publish(ctx, []byte("message"))
	}),
)
app.Run()
The FX module automatically injects optional dependencies:
- Logger (rabbit.Logger): If provided via fx, automatically attached
- Observer (observability.Observer): If provided via fx, automatically attached
Observability ¶
The package supports optional observability hooks for tracking operations. When an observer is attached, it will be notified of all publish and consume operations with detailed context.
Observer Integration:
import (
"github.com/Aleph-Alpha/std/v1/observability"
"github.com/Aleph-Alpha/std/v1/rabbit"
)
// Create an observer (typically wraps your metrics system)
type MetricsObserver struct {
	metrics *prometheus.Metrics
}
func (o *MetricsObserver) ObserveOperation(ctx observability.OperationContext) {
	// Track metrics based on the operation
	switch ctx.Operation {
	case "produce":
		o.metrics.MessageQueue.MessagesPublished.
			WithLabelValues(ctx.Resource, ctx.SubResource, errorStatus(ctx.Error)).
			Inc()
	case "consume":
		o.metrics.MessageQueue.MessagesConsumed.
			WithLabelValues(ctx.Resource, errorStatus(ctx.Error)).
			Inc()
	}
}
// Attach observer to client
client = client.WithObserver(&MetricsObserver{metrics: promMetrics})
Observer receives the following context for each operation:
Publish operations:
- Component: "rabbit"
- Operation: "produce"
- Resource: exchange name
- SubResource: routing key
- Duration: time taken to publish
- Error: any error that occurred
- Size: message size in bytes
Consume operations:
- Component: "rabbit"
- Operation: "consume"
- Resource: queue name
- SubResource: ""
- Duration: time taken to receive message
- Error: nil (errors in message processing are not tracked here)
- Size: message size in bytes
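To make the shape of these callbacks concrete, here is a minimal sketch of an observer that counts operations in memory. It uses a local `OperationContext` stand-in whose field names follow the lists above; the field types, the `errorStatus` helper, and `CountingObserver` are assumptions made for illustration and are not taken from the observability package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// OperationContext is a local stand-in for observability.OperationContext.
// Field names follow the documented lists; the field types are assumptions.
type OperationContext struct {
	Component   string
	Operation   string
	Resource    string
	SubResource string
	Duration    time.Duration
	Error       error
	Size        int64
}

// errorStatus is a hypothetical helper that maps an error to a metric label.
func errorStatus(err error) string {
	if err != nil {
		return "error"
	}
	return "success"
}

// CountingObserver tallies operations per "operation|resource|status" key.
// It is not safe for concurrent use; a real observer would need a mutex
// or atomic counters.
type CountingObserver struct {
	counts map[string]int
}

func NewCountingObserver() *CountingObserver {
	return &CountingObserver{counts: make(map[string]int)}
}

func (o *CountingObserver) ObserveOperation(op OperationContext) {
	key := op.Operation + "|" + op.Resource + "|" + errorStatus(op.Error)
	o.counts[key]++
}

func main() {
	obs := NewCountingObserver()
	obs.ObserveOperation(OperationContext{
		Component: "rabbit", Operation: "produce",
		Resource: "events", SubResource: "user.created", Size: 29,
	})
	obs.ObserveOperation(OperationContext{
		Component: "rabbit", Operation: "produce",
		Resource: "events", Error: errors.New("publish failed"),
	})
	fmt.Println(obs.counts["produce|events|success"], obs.counts["produce|events|error"])
}
```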
Logging ¶
The package supports optional context-aware logging for lifecycle events and background operations. When a logger is attached, it will be used for:
- Connection lifecycle events (connected, disconnected, reconnecting)
- Consumer lifecycle events (started, stopped, shutdown)
- Background reconnection attempts and errors
Logger Integration:
import (
"github.com/Aleph-Alpha/std/v1/rabbit"
"github.com/Aleph-Alpha/std/v1/logger"
)
// Create logger instance
loggerClient, err := logger.NewLogger(loggerConfig)
if err != nil {
return err
}
// Attach logger to client
client = client.WithLogger(loggerClient)
The logger interface matches std/v1/logger for seamless integration:
type Logger interface {
InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}
Logging is designed to be minimal and non-intrusive:
- Errors that are returned to the caller are NOT logged (avoid duplicate logs)
- Only background operations and lifecycle events are logged
- Context is propagated for distributed tracing
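If the std logger is not available, any type with these three methods can be attached. The sketch below adapts the standard library logger; `StdLogger` and `demoLogLine` are hypothetical names, the interface is re-declared locally so the example compiles on its own, and the output format is an arbitrary choice.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
)

// Logger mirrors the three-method interface listed above.
type Logger interface {
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

// StdLogger is a hypothetical adapter that writes to a *log.Logger.
type StdLogger struct {
	out *log.Logger
}

// write formats one line: level, message, optional error, optional fields.
func (l *StdLogger) write(level, msg string, err error, fields []map[string]interface{}) {
	line := level + " " + msg
	if err != nil {
		line += " err=" + err.Error()
	}
	for _, f := range fields {
		line += fmt.Sprintf(" fields=%v", f)
	}
	l.out.Print(line)
}

func (l *StdLogger) InfoWithContext(_ context.Context, msg string, err error, fields ...map[string]interface{}) {
	l.write("INFO", msg, err, fields)
}

func (l *StdLogger) WarnWithContext(_ context.Context, msg string, err error, fields ...map[string]interface{}) {
	l.write("WARN", msg, err, fields)
}

func (l *StdLogger) ErrorWithContext(_ context.Context, msg string, err error, fields ...map[string]interface{}) {
	l.write("ERROR", msg, err, fields)
}

// Compile-time check that StdLogger satisfies the interface.
var _ Logger = (*StdLogger)(nil)

// demoLogLine logs one lifecycle event into a buffer and returns the text.
func demoLogLine() string {
	var buf bytes.Buffer
	l := &StdLogger{out: log.New(&buf, "", 0)}
	l.InfoWithContext(context.Background(), "connected", nil, map[string]interface{}{"host": "localhost"})
	return buf.String()
}

func main() {
	fmt.Print(demoLogLine())
}
```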
Type Aliases in Consumer Code ¶
To simplify your code and make it message-broker-agnostic, use type aliases:
package myapp
import (
	"context"
	stdRabbit "github.com/Aleph-Alpha/std/v1/rabbit"
)
// Use type alias to reference std's interface
type RabbitClient = stdRabbit.Client
type RabbitMessage = stdRabbit.Message
// Now use RabbitClient throughout your codebase
func MyFunction(ctx context.Context, client RabbitClient) error {
	return client.Publish(ctx, []byte("message"))
}
This eliminates the need for adapters and allows you to switch implementations by only changing the alias definition.
Message Consumption ¶
// Consume messages
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
// Process the message
log.Info("Received message", nil, map[string]interface{}{
"body": string(msg.Body()),
})
// Acknowledge the message
if err := msg.AckMsg(); err != nil {
log.Error("Failed to acknowledge message", err, nil)
}
}
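The loop above always acknowledges. When processing can fail, the ack/nack decision is usually factored into a small handler. The following is a sketch under stated assumptions: the `Message` interface is re-declared locally from the package documentation, while `fakeMessage`, `validate`, and `handle` are hypothetical names used only to make the example runnable without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors the package's Message interface.
type Message interface {
	AckMsg() error
	NackMsg(requeue bool) error
	Body() []byte
	Header() map[string]interface{}
}

// fakeMessage is a hypothetical in-memory message that records its outcome.
type fakeMessage struct {
	body   []byte
	result string
}

func (m *fakeMessage) AckMsg() error {
	m.result = "acked"
	return nil
}

func (m *fakeMessage) NackMsg(requeue bool) error {
	if requeue {
		m.result = "requeued"
	} else {
		m.result = "rejected"
	}
	return nil
}

func (m *fakeMessage) Body() []byte                   { return m.body }
func (m *fakeMessage) Header() map[string]interface{} { return nil }

var errEmpty = errors.New("empty body")

// validate is a stand-in for real message processing.
func validate(body []byte) error {
	if len(body) == 0 {
		return errEmpty
	}
	return nil
}

// handle acknowledges on success and rejects without requeue on failure,
// so the broker can dead-letter the message if a DLQ is configured.
func handle(msg Message, process func([]byte) error) error {
	if err := process(msg.Body()); err != nil {
		if nackErr := msg.NackMsg(false); nackErr != nil {
			return fmt.Errorf("%w (nack also failed: %v)", err, nackErr)
		}
		return err
	}
	return msg.AckMsg()
}

func main() {
	good := &fakeMessage{body: []byte(`{"id": "123"}`)}
	bad := &fakeMessage{}
	fmt.Println(handle(good, validate), good.result)
	fmt.Println(handle(bad, validate), bad.result)
}
```

Whether to requeue or reject on failure is a policy choice; requeueing a message that can never succeed will redeliver it indefinitely.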
Distributed Tracing with Message Headers ¶
This package supports distributed tracing by allowing you to propagate trace context through message headers, enabling end-to-end visibility across services.
Publisher Example (sending trace context):
import (
"github.com/Aleph-Alpha/std/v1/tracer"
// other imports...
)
// Create a tracer client
tracerClient := tracer.NewClient(tracerConfig, log)
// Create a span for the operation that includes publishing
ctx, span := tracerClient.StartSpan(ctx, "process-and-publish")
defer span.End()
// Process data...
// Extract trace context as headers before publishing
traceHeaders := tracerClient.GetCarrier(ctx)
// Publish with trace headers
err = rabbitClient.Publish(ctx, message, traceHeaders)
if err != nil {
span.RecordError(err)
log.Error("Failed to publish message", err, nil)
}
Consumer Example (continuing the trace):
msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
	// Extract trace headers from the message
	headers := msg.Header()
	// Create a per-message context with the trace information
	msgCtx := tracerClient.SetCarrierOnContext(ctx, headers)
	// Create a span as a child of the incoming trace;
	// pass msgCtx to any downstream calls that should join the trace
	msgCtx, span := tracerClient.StartSpan(msgCtx, "process-message")
	// Add relevant attributes to the span
	span.SetAttributes(map[string]interface{}{
		"message.size": len(msg.Body()),
		"message.type": "user.created",
	})
	// Process the message...
	if err := processMessage(msg.Body()); err != nil {
		// Record any errors in the span
		span.RecordError(err)
		if nackErr := msg.NackMsg(true); nackErr != nil { // Requeue for retry
			log.Error("Failed to nack message", nackErr, nil)
		}
		// End the span explicitly: a defer inside the loop would not run
		// until the surrounding function returns
		span.End()
		continue
	}
	// Acknowledge successful processing
	if err := msg.AckMsg(); err != nil {
		span.RecordError(err)
		log.Error("Failed to acknowledge message", err, nil)
	}
	span.End()
}
Consuming from Dead Letter Queue:
dlqChan := client.ConsumeDLQ(ctx, wg)
for msg := range dlqChan {
log.Info("Processing failed message", nil, map[string]interface{}{
"body": string(msg.Body()),
})
// Process the failed message
// Acknowledge after processing
msg.AckMsg()
}
FX Module Integration:
This package provides an fx module for easy integration:
app := fx.New(
	logger.FXModule,
	rabbit.FXModule,
	// ... other modules
)
app.Run()
Configuration:
The rabbit client can be configured via environment variables or explicitly:
RABBIT_URI=amqp://guest:guest@localhost:5672/
RABBIT_EXCHANGE_NAME=events
RABBIT_EXCHANGE_TYPE=topic
RABBIT_ROUTING_KEY=user.created
RABBIT_QUEUE_NAME=user-events
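How the package itself maps these variables onto Config is not shown here. As an illustration only, the sketch below splits an AMQP URI such as RABBIT_URI into connection fields with the standard library; the local `Connection` struct holds a subset of the documented fields, and `parseRabbitURI` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// Connection is a local stand-in holding a subset of the package's
// Connection fields.
type Connection struct {
	Host         string
	Port         uint
	User         string
	Password     string
	IsSSLEnabled bool
}

// parseRabbitURI splits an amqp:// or amqps:// URI into connection fields.
// Missing ports default to 5672 (plain) or 5671 (TLS).
func parseRabbitURI(raw string) (Connection, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return Connection{}, fmt.Errorf("parse URI: %w", err)
	}
	if u.Scheme != "amqp" && u.Scheme != "amqps" {
		return Connection{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	conn := Connection{Host: u.Hostname(), IsSSLEnabled: u.Scheme == "amqps", Port: 5672}
	if conn.IsSSLEnabled {
		conn.Port = 5671
	}
	if u.User != nil {
		conn.User = u.User.Username()
		conn.Password, _ = u.User.Password()
	}
	if p := u.Port(); p != "" {
		n, err := strconv.ParseUint(p, 10, 16)
		if err != nil {
			return Connection{}, fmt.Errorf("invalid port %q: %w", p, err)
		}
		conn.Port = uint(n)
	}
	return conn, nil
}

func main() {
	conn, err := parseRabbitURI("amqp://guest:guest@localhost:5672/")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(conn.Host, conn.Port, conn.User, conn.IsSSLEnabled)
}
```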
Thread Safety:
All methods on the RabbitClient type are safe for concurrent use by multiple goroutines, except for GracefulShutdown(), which should only be called once.
Index ¶
- Variables
- func RegisterRabbitLifecycle(params RabbitLifecycleParams)
- type Channel
- type Client
- type Config
- type Connection
- type ConsumerMessage
- type DeadLetter
- type ErrorCategory
- type Logger
- type Message
- type RabbitClient
- func (rb *RabbitClient) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
- func (rb *RabbitClient) ConsumeDLQ(ctx context.Context, wg *sync.WaitGroup) <-chan Message
- func (rb *RabbitClient) GetChannel() *amqp.Channel
- func (r *RabbitClient) GetErrorCategory(err error) ErrorCategory
- func (rb *RabbitClient) GracefulShutdown()
- func (r *RabbitClient) IsAlarmError(err error) bool
- func (r *RabbitClient) IsAuthenticationError(err error) bool
- func (r *RabbitClient) IsChannelError(err error) bool
- func (r *RabbitClient) IsConnectionError(err error) bool
- func (r *RabbitClient) IsPermanentError(err error) bool
- func (r *RabbitClient) IsResourceError(err error) bool
- func (r *RabbitClient) IsRetryableError(err error) bool
- func (r *RabbitClient) IsTemporaryError(err error) bool
- func (rb *RabbitClient) Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
- func (rb *RabbitClient) RetryConnection(cfg Config)
- func (r *RabbitClient) TranslateError(err error) error
- func (rb *RabbitClient) WithLogger(logger Logger) *RabbitClient
- func (rb *RabbitClient) WithObserver(observer observability.Observer) *RabbitClient
- type RabbitLifecycleParams
- type RabbitParams
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrConnectionFailed is returned when connection to RabbitMQ cannot be established
	ErrConnectionFailed = errors.New("connection failed")
	// ErrConnectionLost is returned when connection to RabbitMQ is lost
	ErrConnectionLost = errors.New("connection lost")
	// ErrConnectionClosed is returned when connection is closed
	ErrConnectionClosed = errors.New("connection closed")
	// ErrChannelClosed is returned when channel is closed
	ErrChannelClosed = errors.New("channel closed")
	// ErrChannelException is returned when channel encounters an exception
	ErrChannelException = errors.New("channel exception")
	// ErrAuthenticationFailed is returned when authentication fails
	ErrAuthenticationFailed = errors.New("authentication failed")
	// ErrAccessDenied is returned when access is denied to a resource
	ErrAccessDenied = errors.New("access denied")
	// ErrInsufficientPermissions is returned when user lacks necessary permissions
	ErrInsufficientPermissions = errors.New("insufficient permissions")
	// ErrInvalidCredentials is returned when credentials are invalid
	ErrInvalidCredentials = errors.New("invalid credentials")
	// ErrExchangeNotFound is returned when exchange doesn't exist
	ErrExchangeNotFound = errors.New("exchange not found")
	// ErrQueueNotFound is returned when queue doesn't exist
	ErrQueueNotFound = errors.New("queue not found")
	// ErrQueueEmpty is returned when queue is empty
	ErrQueueEmpty = errors.New("queue empty")
	// ErrQueueExists is returned when queue already exists with different properties
	ErrQueueExists = errors.New("queue already exists")
	// ErrExchangeExists is returned when exchange already exists with different properties
	ErrExchangeExists = errors.New("exchange already exists")
	// ErrResourceLocked is returned when resource is locked
	ErrResourceLocked = errors.New("resource locked")
	// ErrPreconditionFailed is returned when precondition check fails
	ErrPreconditionFailed = errors.New("precondition failed")
	// ErrInvalidArgument is returned when argument is invalid
	ErrInvalidArgument = errors.New("invalid argument")
	// ErrInvalidFrameFormat is returned when frame format is invalid
	ErrInvalidFrameFormat = errors.New("invalid frame format")
	// ErrInvalidFrameSize is returned when frame size is invalid
	ErrInvalidFrameSize = errors.New("invalid frame size")
	// ErrFrameError is returned for frame-related errors
	ErrFrameError = errors.New("frame error")
	// ErrSyntaxError is returned for syntax errors in AMQP protocol
	ErrSyntaxError = errors.New("syntax error")
	// ErrCommandInvalid is returned when command is invalid
	ErrCommandInvalid = errors.New("command invalid")
	// ErrChannelError is returned for channel-related errors
	ErrChannelError = errors.New("channel error")
	// ErrUnexpectedFrame is returned when unexpected frame is received
	ErrUnexpectedFrame = errors.New("unexpected frame")
	// ErrResourceError is returned for resource-related errors
	ErrResourceError = errors.New("resource error")
	// ErrNotAllowed is returned when operation is not allowed
	ErrNotAllowed = errors.New("not allowed")
	// ErrNotImplemented is returned when feature is not implemented
	ErrNotImplemented = errors.New("not implemented")
	// ErrInternalError is returned for internal errors
	ErrInternalError = errors.New("internal error")
	// ErrTimeout is returned when operation times out
	ErrTimeout = errors.New("timeout")
	// ErrNetworkError is returned for network-related errors
	ErrNetworkError = errors.New("network error")
	// ErrTLSError is returned for TLS/SSL errors
	ErrTLSError = errors.New("TLS error")
	// ErrCertificateError is returned for certificate-related errors
	ErrCertificateError = errors.New("certificate error")
	// ErrHandshakeFailed is returned when handshake fails
	ErrHandshakeFailed = errors.New("handshake failed")
	// ErrProtocolError is returned for protocol-related errors
	ErrProtocolError = errors.New("protocol error")
	// ErrVersionMismatch is returned when version mismatch occurs
	ErrVersionMismatch = errors.New("version mismatch")
	// ErrServerError is returned for server-side errors
	ErrServerError = errors.New("server error")
	// ErrClientError is returned for client-side errors
	ErrClientError = errors.New("client error")
	// ErrMessageTooLarge is returned when message exceeds size limits
	ErrMessageTooLarge = errors.New("message too large")
	// ErrInvalidMessage is returned when message format is invalid
	ErrInvalidMessage = errors.New("invalid message")
	// ErrMessageNacked is returned when message is negatively acknowledged
	ErrMessageNacked = errors.New("message nacked")
	// ErrMessageReturned is returned when message is returned by broker
	ErrMessageReturned = errors.New("message returned")
	// ErrPublishFailed is returned when publish operation fails
	ErrPublishFailed = errors.New("publish failed")
	// ErrConsumeFailed is returned when consume operation fails
	ErrConsumeFailed = errors.New("consume failed")
	// ErrAckFailed is returned when acknowledge operation fails
	ErrAckFailed = errors.New("acknowledge failed")
	// ErrNackFailed is returned when negative acknowledge operation fails
	ErrNackFailed = errors.New("negative acknowledge failed")
	// ErrRejectFailed is returned when reject operation fails
	ErrRejectFailed = errors.New("reject failed")
	// ErrQoSFailed is returned when QoS operation fails
	ErrQoSFailed = errors.New("QoS failed")
	// ErrBindFailed is returned when bind operation fails
	ErrBindFailed = errors.New("bind failed")
	// ErrUnbindFailed is returned when unbind operation fails
	ErrUnbindFailed = errors.New("unbind failed")
	// ErrDeclareFailed is returned when declare operation fails
	ErrDeclareFailed = errors.New("declare failed")
	// ErrDeleteFailed is returned when delete operation fails
	ErrDeleteFailed = errors.New("delete failed")
	// ErrPurgeFailed is returned when purge operation fails
	ErrPurgeFailed = errors.New("purge failed")
	// ErrTransactionFailed is returned when transaction fails
	ErrTransactionFailed = errors.New("transaction failed")
	// ErrCancelled is returned when operation is cancelled
	ErrCancelled = errors.New("operation cancelled")
	// ErrShutdown is returned when system is shutting down
	ErrShutdown = errors.New("shutdown")
	// ErrConfigurationError is returned for configuration-related errors
	ErrConfigurationError = errors.New("configuration error")
	// ErrVirtualHostNotFound is returned when virtual host doesn't exist
	ErrVirtualHostNotFound = errors.New("virtual host not found")
	// ErrUserNotFound is returned when user doesn't exist
	ErrUserNotFound = errors.New("user not found")
	// ErrClusterError is returned for cluster-related errors
	ErrClusterError = errors.New("cluster error")
	// ErrNodeDown is returned when node is down
	ErrNodeDown = errors.New("node down")
	// ErrMemoryAlarm is returned when memory alarm is triggered
	ErrMemoryAlarm = errors.New("memory alarm")
	// ErrDiskAlarm is returned when disk alarm is triggered
	ErrDiskAlarm = errors.New("disk alarm")
	// ErrResourceAlarm is returned when resource alarm is triggered
	ErrResourceAlarm = errors.New("resource alarm")
	// ErrFlowControl is returned when flow control is active
	ErrFlowControl = errors.New("flow control")
	// ErrQuotaExceeded is returned when quota is exceeded
	ErrQuotaExceeded = errors.New("quota exceeded")
	// ErrRateLimit is returned when rate limit is exceeded
	ErrRateLimit = errors.New("rate limit exceeded")
	// ErrBackpressure is returned when backpressure is applied
	ErrBackpressure = errors.New("backpressure")
	// ErrUnknownError is returned for unknown/unhandled errors
	ErrUnknownError = errors.New("unknown error")
)
Common RabbitMQ error types that can be used by consumers of this package. These provide a standardized set of errors that abstract away the underlying AMQP-specific error details.
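Sentinel errors like these are normally matched with errors.Is, which also sees through wrapping added with %w. The sketch below declares local stand-ins for three of the variables so that it compiles on its own; `shouldRetry` and `wrap` are hypothetical helpers, and the choice of which errors count as retryable is an assumption rather than the package's own policy (see IsRetryableError for that).

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for three of the package's sentinel errors.
var (
	ErrConnectionLost       = errors.New("connection lost")
	ErrTimeout              = errors.New("timeout")
	ErrAuthenticationFailed = errors.New("authentication failed")
)

// wrap adds operation context while keeping the sentinel matchable.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

// shouldRetry is a hypothetical policy: transient failures are retried,
// everything else (including nil) is not.
func shouldRetry(err error) bool {
	switch {
	case errors.Is(err, ErrConnectionLost), errors.Is(err, ErrTimeout):
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldRetry(wrap("publish", ErrTimeout)))
	fmt.Println(shouldRetry(wrap("dial", ErrAuthenticationFailed)))
}
```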
var FXModule = fx.Module("rabbit",
	fx.Provide(
		NewClientWithDI,
		fx.Annotate(
			func(r *RabbitClient) Client { return r },
			fx.As(new(Client)),
		),
	),
	fx.Invoke(RegisterRabbitLifecycle),
)
FXModule is an fx.Module that provides and configures the RabbitMQ client. This module registers the RabbitMQ client with the Fx dependency injection framework, making it available to other components in the application.
The module provides:
1. *RabbitClient (concrete type) for direct use
2. Client interface for dependency injection
3. Lifecycle management for graceful startup and shutdown
Usage:
app := fx.New(
rabbit.FXModule,
// other modules...
)
Functions ¶
func RegisterRabbitLifecycle ¶
func RegisterRabbitLifecycle(params RabbitLifecycleParams)
RegisterRabbitLifecycle registers the RabbitMQ client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the RabbitMQ client, including starting the connection monitoring goroutine.
Parameters (supplied through the RabbitLifecycleParams struct rather than as separate arguments):
- lc: The fx lifecycle controller
- client: The RabbitMQ client instance
- logger: Logger for recording lifecycle events
- cfg: Configuration for the RabbitMQ client
The function:
- On application start: Launches a background goroutine that monitors and maintains the RabbitMQ connection, automatically reconnecting if it fails.
- On application stop: Triggers a graceful shutdown of the RabbitMQ client, closing channels and connections cleanly.
This ensures that the RabbitMQ client remains available throughout the application's lifetime and is properly cleaned up during shutdown.
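The start/stop behaviour described above amounts to a retry loop that is stopped through a context. The following is a generic sketch of that idea using only the standard library; `reconnect`, `flakyDial`, and `demoReconnect` are hypothetical names, and the package's actual RetryConnection logic may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// reconnect calls dial until it succeeds or ctx is cancelled, waiting delay
// between attempts. It returns the number of attempts made.
func reconnect(ctx context.Context, delay time.Duration, dial func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts, nil
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(delay):
		}
	}
}

// flakyDial returns a dial function that fails the first n calls.
func flakyDial(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}
}

// demoReconnect runs the loop against a dialer that fails a fixed number
// of times, bounded by a one-second timeout.
func demoReconnect(failures int) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return reconnect(ctx, time.Millisecond, flakyDial(failures))
}

func main() {
	attempts, err := demoReconnect(2)
	fmt.Println(attempts, err)
}
```

In an fx application the context would be cancelled from the stop hook, which is what lets the background goroutine exit during shutdown.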
Types ¶
type Channel ¶
type Channel struct {
// ExchangeName is the name of the exchange to publish to or consume from
ExchangeName string
// ExchangeType defines the routing behavior of the exchange
// Common values: "direct", "fanout", "topic", "headers"
ExchangeType string
// RoutingKey is used for routing messages from exchanges to queues
// The meaning depends on the exchange type:
// - For direct exchanges: exact matching key
// - For topic exchanges: routing pattern with wildcards
// - For fanout exchanges: ignored
RoutingKey string
// QueueName is the name of the queue to declare or consume from
QueueName string
// DelayToReconnect is the time in milliseconds to wait between reconnection attempts
DelayToReconnect int
// PrefetchCount limits the number of unacknowledged messages that can be sent to a consumer
// A value of 0 means no limit (not recommended for production)
PrefetchCount int
// IsConsumer determines whether this client will declare exchanges and queues
// Set to true for consumers, false for publishers that use existing exchanges
IsConsumer bool
// ContentType specifies the MIME type of published messages
// Common values: "application/json", "text/plain", "application/octet-stream"
ContentType string
}
Channel contains configuration for AMQP channels, exchanges, queues, and bindings. These settings determine how messages are routed and processed.
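For topic exchanges, the wildcard rules are that `*` matches exactly one dot-separated word and `#` matches zero or more words. The broker performs this matching; the sketch below only illustrates the rules so that a RoutingKey pattern can be sanity-checked locally. `topicMatch` and `matchWords` are hypothetical helpers, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether a routing key matches a topic binding pattern.
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

// matchWords compares pattern words against key words recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// '#' may absorb zero or more words: try every possible split.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// '*' consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(topicMatch("user.*", "user.created"))
	fmt.Println(topicMatch("user.*", "user.created.v1"))
	fmt.Println(topicMatch("user.#", "user.created.v1"))
}
```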
type Client ¶ added in v0.12.0
type Client interface {
// Publish sends a message to RabbitMQ with optional headers.
// The message is sent using the configured exchange and routing key.
Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
// Consume starts consuming messages from the main queue.
// Returns a channel that delivers consumed messages.
Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
// ConsumeDLQ starts consuming messages from the dead letter queue (DLQ).
// This allows processing of messages that failed in the main queue.
ConsumeDLQ(ctx context.Context, wg *sync.WaitGroup) <-chan Message
// RetryConnection monitors the connection and automatically reconnects on failure.
// This method should be run in a goroutine.
RetryConnection(cfg Config)
// GracefulShutdown closes all RabbitMQ connections and channels cleanly.
GracefulShutdown()
// GetChannel returns the underlying AMQP channel for direct operations when needed.
GetChannel() *amqp.Channel
}
Client provides a high-level interface for interacting with RabbitMQ. It abstracts connection management, channel operations, and message publishing/consuming.
This interface is implemented by the concrete *RabbitClient type.
type Config ¶
type Config struct {
// Connection contains the settings needed to establish a connection to the RabbitMQ server
Connection Connection
// Channel contains configuration for exchanges, queues, and message routing
Channel Channel
// DeadLetter contains configuration for the dead-letter exchange and queue
// used for handling failed messages
DeadLetter DeadLetter
}
Config defines the top-level configuration structure for the RabbitMQ client. It contains all the necessary configuration sections for establishing connections, setting up channels, and configuring dead-letter behavior.
type Connection ¶
type Connection struct {
// Host is the RabbitMQ server hostname or IP address
Host string
// Port is the RabbitMQ server port (typically 5672 for non-SSL, 5671 for SSL)
Port uint
// User is the RabbitMQ username for authentication
User string
// Password is the RabbitMQ password for authentication
Password string
// IsSSLEnabled determines whether to use SSL/TLS for the connection
// When true, connections will use the AMQPs protocol
IsSSLEnabled bool
// UseCert determines whether to use client certificate authentication
// When true, client certificates will be sent for mutual TLS authentication
UseCert bool
// CACertPath is the file path to the CA certificate for verifying the server
// Used when IsSSLEnabled is true
CACertPath string
// ClientCertPath is the file path to the client certificate
// Used when both IsSSLEnabled and UseCert are true
ClientCertPath string
// ClientKeyPath is the file path to the client certificate's private key
// Used when both IsSSLEnabled and UseCert are true
ClientKeyPath string
// ServerName is the server name to use for TLS verification
// This should match a CN or SAN in the server's certificate
ServerName string
}
Connection contains the configuration parameters needed to establish a connection to a RabbitMQ server, including authentication and TLS settings.
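To show how the TLS-related fields relate to one another, here is a sketch that turns them into a standard library tls.Config. It is an assumption about one reasonable mapping, not the package's implementation: the local `Connection` struct copies only the TLS fields, and `buildTLSConfig` is a hypothetical helper.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
)

// Connection is a local stand-in with the TLS-related fields only.
type Connection struct {
	IsSSLEnabled   bool
	UseCert        bool
	CACertPath     string
	ClientCertPath string
	ClientKeyPath  string
	ServerName     string
}

// buildTLSConfig returns nil when TLS is disabled. Otherwise it sets the
// server name, loads the CA bundle if a path is given, and loads the client
// key pair when UseCert is true.
func buildTLSConfig(c Connection) (*tls.Config, error) {
	if !c.IsSSLEnabled {
		return nil, nil
	}
	cfg := &tls.Config{ServerName: c.ServerName, MinVersion: tls.VersionTLS12}
	if c.CACertPath != "" {
		pem, err := os.ReadFile(c.CACertPath)
		if err != nil {
			return nil, fmt.Errorf("read CA certificate: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, fmt.Errorf("no certificates found in %s", c.CACertPath)
		}
		cfg.RootCAs = pool
	}
	if c.UseCert {
		cert, err := tls.LoadX509KeyPair(c.ClientCertPath, c.ClientKeyPath)
		if err != nil {
			return nil, fmt.Errorf("load client certificate: %w", err)
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLSConfig(Connection{IsSSLEnabled: true, ServerName: "rabbit.internal"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.ServerName)
}
```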
type ConsumerMessage ¶
type ConsumerMessage struct {
// contains filtered or unexported fields
}
ConsumerMessage implements the Message interface and wraps an AMQP delivery. This struct provides access to the message content and acknowledgment methods.
func (*ConsumerMessage) AckMsg ¶
func (rb *ConsumerMessage) AckMsg() error
AckMsg acknowledges the message, informing RabbitMQ that the message has been successfully processed and can be removed from the queue.
Returns an error if the acknowledgment fails.
func (*ConsumerMessage) Body ¶
func (rb *ConsumerMessage) Body() []byte
Body returns the message payload as a byte slice.
func (*ConsumerMessage) Header ¶
func (rb *ConsumerMessage) Header() map[string]interface{}
Header returns the headers associated with the message. Message headers provide metadata about the message and can contain application-specific information set by the message publisher.
Headers are a map of key-value pairs where the keys are strings and values can be of various types. Common uses for headers include:
- Message type identification
- Content format specification
- Routing information
- Tracing context propagation
- Custom application metadata
Returns:
- map[string]interface{}: A map containing the message headers
Example:
msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
// Access message headers
headers := msg.Header()
// Check for specific headers
if contentType, ok := headers["content-type"].(string); ok {
fmt.Printf("Content type: %s\n", contentType)
}
// Access trace context from headers for distributed tracing
if traceID, ok := headers["trace-id"].(string); ok {
ctx = tracer.SetTraceID(ctx, traceID)
}
// Process the message...
}
func (*ConsumerMessage) NackMsg ¶
func (rb *ConsumerMessage) NackMsg(requeue bool) error
NackMsg rejects the message. If requeue is true, the message will be returned to the queue for redelivery; otherwise, it will be discarded or sent to a dead-letter exchange if configured.
Parameters:
- requeue: Whether to requeue the message for another delivery attempt
Returns an error if the rejection fails.
type DeadLetter ¶
type DeadLetter struct {
// ExchangeName is the name of the dead-letter exchange
ExchangeName string
// QueueName is the name of the queue bound to the dead-letter exchange
QueueName string
// RoutingKey is the routing key used when dead-lettering messages
// This can be different from the original routing key
RoutingKey string
// Ttl is the time-to-live for messages in seconds
// Messages that remain in the queue longer than this TTL will be dead-lettered
// A value of 0 means no TTL (messages never expire)
Ttl int
}
DeadLetter contains configuration for dead-letter handling. Dead-letter exchanges receive messages that are rejected, expire, or exceed queue limits. This provides a mechanism for handling failed message processing.
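In RabbitMQ, dead-lettering is configured through optional queue arguments: `x-dead-letter-exchange`, `x-dead-letter-routing-key`, and `x-message-ttl` (the last one in milliseconds). The sketch below shows how the DeadLetter fields could be translated into such an argument table. Whether the package declares its queues exactly this way is an assumption; `queueArgs` is a hypothetical helper and the struct is re-declared locally.

```go
package main

import "fmt"

// DeadLetter is a local copy of the documented configuration struct.
type DeadLetter struct {
	ExchangeName string
	QueueName    string
	RoutingKey   string
	Ttl          int
}

// queueArgs builds the optional arguments for declaring the main queue so
// that rejected or expired messages are routed to the dead-letter exchange.
func queueArgs(dl DeadLetter) map[string]interface{} {
	args := map[string]interface{}{}
	if dl.ExchangeName != "" {
		args["x-dead-letter-exchange"] = dl.ExchangeName
	}
	if dl.RoutingKey != "" {
		args["x-dead-letter-routing-key"] = dl.RoutingKey
	}
	if dl.Ttl > 0 {
		// RabbitMQ expects x-message-ttl in milliseconds, while Ttl is
		// documented in seconds.
		args["x-message-ttl"] = int64(dl.Ttl) * 1000
	}
	return args
}

func main() {
	args := queueArgs(DeadLetter{
		ExchangeName: "events.dlx",
		QueueName:    "user-events.dlq",
		RoutingKey:   "user.created.dead",
		Ttl:          30,
	})
	fmt.Println(args["x-dead-letter-exchange"], args["x-message-ttl"])
}
```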
type ErrorCategory ¶
type ErrorCategory int
ErrorCategory represents different categories of RabbitMQ errors
const (
	CategoryUnknown ErrorCategory = iota
	CategoryConnection
	CategoryChannel
	CategoryAuthentication
	CategoryPermission
	CategoryResource
	CategoryMessage
	CategoryProtocol
	CategoryNetwork
	CategoryServer
	CategoryConfiguration
	CategoryCluster
	CategoryOperation
	CategoryAlarm
	CategoryTimeout
)
type Logger ¶ added in v0.13.0
type Logger interface {
// InfoWithContext logs an informational message with trace context.
InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
// WarnWithContext logs a warning message with trace context.
WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
// ErrorWithContext logs an error message with trace context.
ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}
Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.
type Message ¶
type Message interface {
// AckMsg acknowledges the message, removing it from the queue.
AckMsg() error
// NackMsg negatively acknowledges the message.
// If requeue is true, the message is requeued; otherwise it goes to DLQ.
NackMsg(requeue bool) error
// Body returns the message payload as a byte slice.
Body() []byte
// Header returns the message headers.
Header() map[string]interface{}
}
Message represents a consumed message from RabbitMQ. It provides methods for acknowledging, rejecting, and accessing message data.
type RabbitClient ¶ added in v0.12.0
type RabbitClient struct {
// Channel is the main AMQP channel used for publishing and consuming messages.
// It's exposed publicly to allow direct operations when needed.
Channel *amqp.Channel
// contains filtered or unexported fields
}
RabbitClient represents a client for interacting with RabbitMQ. It manages connections and channels, and provides methods for publishing and consuming messages with automatic reconnection capabilities.
func NewClient ¶
func NewClient(config Config) (*RabbitClient, error)
NewClient creates and initializes a new RabbitMQ client with the provided configuration. This function establishes the initial connection to RabbitMQ, sets up channels, and configures exchanges and queues as specified in the configuration.
Parameters:
- config: Configuration for connecting to RabbitMQ and setting up channels
Returns a new RabbitClient instance that is ready to use, or an error if the connection fails after all retries or channel setup fails. A logger and an observer are optional and are attached afterwards with WithLogger and WithObserver.
Example:
client, err := rabbit.NewClient(config)
if err != nil {
log.Fatal(err)
}
defer client.GracefulShutdown()
func NewClientWithDI ¶
func NewClientWithDI(params RabbitParams) (*RabbitClient, error)
NewClientWithDI creates a new RabbitMQ client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the RabbitParams struct.
Parameters:
- params: A RabbitParams struct that contains the required Config and, optionally, a Logger and an Observer. The struct embeds fx.In to enable automatic injection of these dependencies.
Returns:
- *RabbitClient: A fully initialized RabbitMQ client ready for use.
Example usage with fx:
app := fx.New(
rabbit.FXModule,
logger.FXModule, // Optional: provides logger
fx.Provide(
func() rabbit.Config {
return loadRabbitConfig() // Your config loading function
},
func(metrics *prometheus.Metrics) observability.Observer {
return &MyObserver{metrics: metrics} // Optional observer
},
),
)
Under the hood, this function creates the client and injects the optional logger and observer before returning.
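The wiring described above can be sketched without fx. This is a minimal illustration only: the Logger and Observer interfaces, the client type, and the params struct below are hypothetical stand-ins, and the real RabbitParams relies on fx.In and `optional:"true"` tags rather than explicit nil checks.

```go
package main

import "fmt"

// Logger and Observer are hypothetical single-method interfaces for this
// sketch; the package's real interfaces may look different.
type Logger interface{ Info(msg string) }
type Observer interface{ Observe(op string) }

// printLogger is a trivial Logger used only in the demo.
type printLogger struct{}

func (printLogger) Info(msg string) { fmt.Println("INFO:", msg) }

// client stands in for *RabbitClient.
type client struct {
	logger   Logger
	observer Observer
}

// Builder-style setters that return the client for chaining.
func (c *client) withLogger(l Logger) *client     { c.logger = l; return c }
func (c *client) withObserver(o Observer) *client { c.observer = o; return c }

// params mirrors the shape of RabbitParams without the fx.In embedding.
type params struct {
	Logger   Logger   // nil when no logger was provided
	Observer Observer // nil when no observer was provided
}

// newClientWithDI creates the client, then attaches only the optional
// dependencies that were actually supplied.
func newClientWithDI(p params) *client {
	c := &client{}
	if p.Logger != nil {
		c = c.withLogger(p.Logger)
	}
	if p.Observer != nil {
		c = c.withObserver(p.Observer)
	}
	return c
}

// wiringDemo reports which optional dependencies ended up attached
// when only a logger is provided.
func wiringDemo() (hasLogger, hasObserver bool) {
	c := newClientWithDI(params{Logger: printLogger{}})
	return c.logger != nil, c.observer != nil
}

func main() {
	hasLogger, hasObserver := wiringDemo()
	fmt.Println("logger attached:", hasLogger, "observer attached:", hasObserver)
}
```

Keeping the optional dependencies behind nil checks means a client built without a logger or observer behaves the same as one built with NewClient alone.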
func (*RabbitClient) Consume ¶ added in v0.12.0
Consume starts consuming messages from the queue specified in the configuration. This method provides a channel where consumed messages will be delivered.
Parameters:
- ctx: Context for cancellation control
- wg: WaitGroup for coordinating shutdown
Returns a channel that delivers Message interfaces for each consumed message.
Example:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
// Process the message
fmt.Println("Received:", string(msg.Body()))
// Acknowledge successful processing
if err := msg.AckMsg(); err != nil {
log.Printf("Failed to ack message: %v", err)
}
}
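The range loop above runs until the message channel is closed. Where the caller also wants to stop as soon as the context is canceled, a select-based loop is one option. The sketch below is self-contained and makes several assumptions: Message is a local stand-in that declares only the Body and AckMsg methods used in the example, and fakeMsg, drain, and drainDemo are hypothetical names that are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a local stand-in for the package's Message interface,
// limited to the two methods used in the example above.
type Message interface {
	Body() []byte
	AckMsg() error
}

// fakeMsg is a hypothetical in-memory message that counts acknowledgments.
type fakeMsg struct {
	body  []byte
	acked *int
}

func (m fakeMsg) Body() []byte  { return m.body }
func (m fakeMsg) AckMsg() error { *m.acked++; return nil }

// drain handles messages until the channel closes or ctx is canceled.
// A message is acknowledged only after handle succeeds. It returns the
// number of messages that were handled and acknowledged.
func drain(ctx context.Context, msgs <-chan Message, handle func([]byte) error) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case msg, ok := <-msgs:
			if !ok {
				return handled // channel closed by the producer
			}
			if err := handle(msg.Body()); err != nil {
				fmt.Println("handler failed, not acking:", err)
				continue
			}
			if err := msg.AckMsg(); err != nil {
				fmt.Println("ack failed:", err)
				continue
			}
			handled++
		}
	}
}

// drainDemo feeds two messages through drain and reports the counts.
func drainDemo() (handled, acked int) {
	msgs := make(chan Message, 2)
	msgs <- fakeMsg{body: []byte("first"), acked: &acked}
	msgs <- fakeMsg{body: []byte("second"), acked: &acked}
	close(msgs)
	handled = drain(context.Background(), msgs, func(b []byte) error {
		fmt.Println("Received:", string(b))
		return nil
	})
	return handled, acked
}

func main() {
	handled, acked := drainDemo()
	fmt.Println("handled:", handled, "acked:", acked)
}
```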
func (*RabbitClient) ConsumeDLQ ¶ added in v0.12.0
ConsumeDLQ starts consuming messages from the dead-letter queue. This method is useful for processing failed messages sent to the dead-letter queue.
Parameters:
- ctx: Context for cancellation control
- wg: WaitGroup for coordinating shutdown
Returns a channel that delivers Message interfaces for each consumed message from the dead-letter queue.
Example:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dlqChan := rabbitClient.ConsumeDLQ(ctx, wg)
for msg := range dlqChan {
// Process the failed message
fmt.Println("Failed message:", string(msg.Body()))
// Acknowledge after processing and report a failed ack
if err := msg.AckMsg(); err != nil {
log.Printf("Failed to ack DLQ message: %v", err)
}
}
func (*RabbitClient) GetChannel ¶ added in v0.12.0
func (rb *RabbitClient) GetChannel() *amqp.Channel
GetChannel returns the underlying AMQP channel for direct operations when needed. This allows advanced users to access RabbitMQ-specific functionality.
func (*RabbitClient) GetErrorCategory ¶ added in v0.12.0
func (r *RabbitClient) GetErrorCategory(err error) ErrorCategory
GetErrorCategory returns the category of the given error
func (*RabbitClient) GracefulShutdown ¶ added in v0.12.0
func (rb *RabbitClient) GracefulShutdown()
GracefulShutdown closes the RabbitMQ client's connections and channels cleanly. This method ensures that all resources are properly released when the application is shutting down.
The shutdown process:
1. Signals all goroutines to stop by closing the shutdownSignal channel
2. Acquires a lock to prevent concurrent access during shutdown
3. Closes the AMQP channel if it exists
4. Closes the AMQP connection if it exists and is not already closed
Any errors during shutdown are logged but not propagated, as they typically cannot be handled at this stage of application shutdown.
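The shutdown sequence described above can be sketched with standard-library types. This is an illustration under stated assumptions, not the package's code: io.Closer stands in for the AMQP channel and connection, fakeCloser and the client struct are hypothetical, and the sync.Once guard is an addition of this sketch (the documentation does not say whether GracefulShutdown may be called more than once).

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// fakeCloser is a hypothetical resource whose Close returns a fixed error.
type fakeCloser struct{ err error }

func (f *fakeCloser) Close() error { return f.err }

// client is a stand-in for the real client; only shutdown state is modeled.
type client struct {
	mu             sync.Mutex
	once           sync.Once
	shutdownSignal chan struct{}
	channel        io.Closer // stands in for the AMQP channel
	conn           io.Closer // stands in for the AMQP connection
	logged         []string  // collected instead of written to a logger
}

func (c *client) gracefulShutdown() {
	// 1. Signal background goroutines to stop (guarded so a second call
	//    does not panic on an already closed channel).
	c.once.Do(func() { close(c.shutdownSignal) })

	// 2. Prevent concurrent access while resources are released.
	c.mu.Lock()
	defer c.mu.Unlock()

	// 3. Close the channel if it exists; errors are recorded, not returned.
	if c.channel != nil {
		if err := c.channel.Close(); err != nil {
			c.logged = append(c.logged, "channel close: "+err.Error())
		}
		c.channel = nil
	}

	// 4. Close the connection if it exists.
	if c.conn != nil {
		if err := c.conn.Close(); err != nil {
			c.logged = append(c.logged, "connection close: "+err.Error())
		}
		c.conn = nil
	}
}

// shutdownDemo shuts down twice and reports whether the signal fired and
// how many close errors were recorded.
func shutdownDemo() (signaled bool, loggedErrors int) {
	c := &client{
		shutdownSignal: make(chan struct{}),
		channel:        &fakeCloser{err: errors.New("channel already closed")},
		conn:           &fakeCloser{},
	}
	c.gracefulShutdown()
	c.gracefulShutdown() // nothing left to close on the second call
	select {
	case <-c.shutdownSignal:
		signaled = true
	default:
	}
	return signaled, len(c.logged)
}

func main() {
	signaled, loggedErrors := shutdownDemo()
	fmt.Println("signaled:", signaled, "logged errors:", loggedErrors)
}
```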
func (*RabbitClient) IsAlarmError ¶ added in v0.12.0
func (r *RabbitClient) IsAlarmError(err error) bool
IsAlarmError returns true if the error is alarm-related
func (*RabbitClient) IsAuthenticationError ¶ added in v0.12.0
func (r *RabbitClient) IsAuthenticationError(err error) bool
IsAuthenticationError returns true if the error is authentication-related
func (*RabbitClient) IsChannelError ¶ added in v0.12.0
func (r *RabbitClient) IsChannelError(err error) bool
IsChannelError returns true if the error is channel-related
func (*RabbitClient) IsConnectionError ¶ added in v0.12.0
func (r *RabbitClient) IsConnectionError(err error) bool
IsConnectionError returns true if the error is connection-related
func (*RabbitClient) IsPermanentError ¶ added in v0.12.0
func (r *RabbitClient) IsPermanentError(err error) bool
IsPermanentError returns true if the error is permanent and should not be retried
func (*RabbitClient) IsResourceError ¶ added in v0.12.0
func (r *RabbitClient) IsResourceError(err error) bool
IsResourceError returns true if the error is resource-related
func (*RabbitClient) IsRetryableError ¶ added in v0.12.0
func (r *RabbitClient) IsRetryableError(err error) bool
IsRetryableError returns true if the error is retryable
func (*RabbitClient) IsTemporaryError ¶ added in v0.12.0
func (r *RabbitClient) IsTemporaryError(err error) bool
IsTemporaryError returns true if the error is temporary
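The classification helpers above lend themselves to retry decisions. The following sketch shows one way a caller might combine IsPermanentError and IsRetryableError with exponential backoff. The classifier interface copies only those two documented method signatures; fakeClassifier, the sentinel errors, and the retry helper are hypothetical and exist only so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// classifier lists the two documented helpers this sketch relies on.
type classifier interface {
	IsRetryableError(err error) bool
	IsPermanentError(err error) bool
}

// Hypothetical sentinel errors used to drive the fake classifier.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

type fakeClassifier struct{}

func (fakeClassifier) IsRetryableError(err error) bool { return errors.Is(err, errTemporary) }
func (fakeClassifier) IsPermanentError(err error) bool { return errors.Is(err, errPermanent) }

// retry runs op up to attempts times. It stops early on success, on a
// permanent error, or on any error that is not classified as retryable.
// The delay doubles after every failed attempt.
func retry(c classifier, attempts int, backoff time.Duration, op func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if c.IsPermanentError(err) || !c.IsRetryableError(err) {
			return err
		}
		if i < attempts-1 {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return err
}

// retryDemo exercises one operation that recovers on its third call and
// one that fails permanently.
func retryDemo() (tempCalls, permCalls int, tempOK, permFailed bool) {
	c := fakeClassifier{}
	err := retry(c, 5, time.Millisecond, func() error {
		tempCalls++
		if tempCalls < 3 {
			return errTemporary
		}
		return nil
	})
	tempOK = err == nil

	err = retry(c, 5, time.Millisecond, func() error {
		permCalls++
		return errPermanent
	})
	permFailed = errors.Is(err, errPermanent)
	return
}

func main() {
	tempCalls, permCalls, tempOK, permFailed := retryDemo()
	fmt.Println(tempCalls, permCalls, tempOK, permFailed)
}
```

With a real client, op would typically wrap a Publish call and the client itself would be passed as the classifier.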
func (*RabbitClient) Publish ¶ added in v0.12.0
func (rb *RabbitClient) Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
Publish sends a message to the RabbitMQ exchange specified in the configuration. This method is thread-safe and respects context cancellation.
Parameters:
- ctx: Context for cancellation control
- msg: Message payload as a byte slice
- headers: Optional message headers as a map of key-value pairs; can be used for metadata and distributed tracing propagation
The headers parameter is particularly useful for distributed tracing, because it allows trace context to be propagated across service boundaries through message queues. When used with the tracer package, you can extract trace headers and include them in the message:
traceHeaders := tracerClient.GetCarrier(ctx)
err := rabbitClient.Publish(ctx, message, traceHeaders)
Returns an error if publishing fails or if the context is canceled.
Example:
ctx := context.Background()
message := []byte("Hello, RabbitMQ!")
// Basic publishing without headers (the variadic headers argument can be omitted)
err := rabbitClient.Publish(ctx, message)
if err != nil {
log.Printf("Failed to publish message: %v", err)
} else {
log.Println("Message published successfully")
}
Example with distributed tracing:
// Create a span for the publish operation
ctx, span := tracer.StartSpan(ctx, "publish-message")
defer span.End()
// Add relevant attributes to the span
span.SetAttributes(map[string]interface{}{
"message.size": len(message),
"routing.key": rabbitClient.Config().Channel.RoutingKey,
})
// Extract trace context to include in the message headers
traceHeaders := tracerClient.GetCarrier(ctx)
// Publish the message with trace headers
err := rabbitClient.Publish(ctx, message, traceHeaders)
if err != nil {
span.RecordError(err)
log.Printf("Failed to publish message: %v", err)
return err
}
log.Println("Message published successfully with trace context")
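When trace headers need to travel alongside application metadata, one option is to merge both into a single map before calling Publish. The helper below is a hypothetical sketch and not part of the package; the header keys and values are illustrative, and how Publish treats more than one headers argument is not stated in this documentation.

```go
package main

import "fmt"

// mergeHeaders combines any number of header maps into a new map.
// Later maps win on key conflicts; nil maps are skipped naturally
// because ranging over a nil map does nothing.
func mergeHeaders(maps ...map[string]interface{}) map[string]interface{} {
	merged := make(map[string]interface{})
	for _, m := range maps {
		for k, v := range m {
			merged[k] = v
		}
	}
	return merged
}

// mergeDemo merges illustrative trace headers with application metadata.
func mergeDemo() (count int, tenant string) {
	trace := map[string]interface{}{"traceparent": "00-example-trace-id"}
	app := map[string]interface{}{"tenant": "acme", "schema-version": 2}
	merged := mergeHeaders(trace, nil, app)
	tenant, _ = merged["tenant"].(string)
	return len(merged), tenant
}

func main() {
	count, tenant := mergeDemo()
	fmt.Println("headers:", count, "tenant:", tenant)
}
```

The merged map could then be passed as the single headers argument, for example rabbitClient.Publish(ctx, message, merged).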
func (*RabbitClient) RetryConnection ¶ added in v0.12.0
func (rb *RabbitClient) RetryConnection(cfg Config)
RetryConnection continuously monitors the RabbitMQ connection and automatically re-establishes it if it fails. This method is typically run in a goroutine.
Parameters:
- cfg: Configuration for establishing new connections
The method will:
- Monitor the connection for closure events
- Attempt to reconnect when the connection is lost
- Re-establish channels and their configurations
- Continue monitoring until the shutdownSignal is received
This provides resilience against network issues and RabbitMQ server restarts.
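The monitoring behavior listed above can be pictured as a loop that waits for either a closure event or the shutdown signal. The sketch below is a simplified model under stated assumptions: the closed channel stands in for whatever closure notification the client listens to, dial stands in for reconnecting and re-establishing channels, and the fixed retry delay is a simplification. None of these names come from the package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// monitor waits for closure events and redials until it succeeds or the
// shutdown signal fires. It returns the number of successful reconnects.
func monitor(shutdown <-chan struct{}, closed <-chan error, dial func() error, delay time.Duration) int {
	reconnects := 0
	for {
		select {
		case <-shutdown:
			return reconnects
		case err, ok := <-closed:
			if !ok {
				return reconnects // no more closure notifications
			}
			fmt.Println("connection lost:", err)
			// Keep dialing until it works, but stay responsive to shutdown.
			for dial() != nil {
				select {
				case <-shutdown:
					return reconnects
				case <-time.After(delay):
				}
			}
			reconnects++
		}
	}
}

// monitorDemo simulates one connection loss followed by two failed dials
// and a successful third one, after which shutdown is requested.
func monitorDemo() (reconnects, dialAttempts int) {
	shutdown := make(chan struct{})
	closed := make(chan error, 1)
	closed <- errors.New("connection reset")

	dial := func() error {
		dialAttempts++
		if dialAttempts < 3 {
			return errors.New("broker unavailable")
		}
		close(shutdown) // end the demo once the connection is back
		return nil
	}

	reconnects = monitor(shutdown, closed, dial, time.Millisecond)
	return reconnects, dialAttempts
}

func main() {
	reconnects, dialAttempts := monitorDemo()
	fmt.Println("reconnects:", reconnects, "dial attempts:", dialAttempts)
}
```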
func (*RabbitClient) TranslateError ¶ added in v0.12.0
func (r *RabbitClient) TranslateError(err error) error
TranslateError converts AMQP/RabbitMQ-specific errors into standardized application errors. This function provides abstraction from the underlying AMQP implementation details, allowing application code to handle errors in a RabbitMQ-agnostic way.
It maps common RabbitMQ errors to the standardized error types defined above. If an error doesn't match any known type, it's returned unchanged.
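The general idea of translating broker errors can be sketched with errors.As and wrapped sentinel errors. Everything named below is an assumption made for illustration: amqpError is a local stand-in for a broker error that carries an AMQP reply code, and errAccessRefused and errNotFound are hypothetical sentinels rather than the package's standardized error types. The reply codes 403 (access-refused) and 404 (not-found) are taken from AMQP 0-9-1.

```go
package main

import (
	"errors"
	"fmt"
)

// amqpError is a local stand-in for a broker error with an AMQP reply code.
type amqpError struct {
	Code   int
	Reason string
}

func (e *amqpError) Error() string { return fmt.Sprintf("amqp %d: %s", e.Code, e.Reason) }

// Hypothetical application-level sentinel errors.
var (
	errAccessRefused = errors.New("access refused")
	errNotFound      = errors.New("resource not found")
)

// translate maps known reply codes to sentinel errors and returns any
// other error unchanged, mirroring the behavior described above.
func translate(err error) error {
	var ae *amqpError
	if !errors.As(err, &ae) {
		return err
	}
	switch ae.Code {
	case 403:
		return fmt.Errorf("%w: %s", errAccessRefused, ae.Reason)
	case 404:
		return fmt.Errorf("%w: %s", errNotFound, ae.Reason)
	default:
		return err
	}
}

// translateDemo checks one mapped error and one pass-through error.
func translateDemo() (mapped, unchanged bool) {
	wrapped := fmt.Errorf("declare queue: %w", &amqpError{Code: 404, Reason: "no queue 'user-events'"})
	mapped = errors.Is(translate(wrapped), errNotFound)

	other := errors.New("disk full")
	unchanged = translate(other) == other
	return
}

func main() {
	mapped, unchanged := translateDemo()
	fmt.Println("mapped:", mapped, "unchanged:", unchanged)
}
```

Callers can then branch with errors.Is on the sentinel values instead of inspecting broker-specific codes.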
func (*RabbitClient) WithLogger ¶ added in v0.13.0
func (rb *RabbitClient) WithLogger(logger Logger) *RabbitClient
WithLogger attaches a logger to the RabbitMQ client for internal logging. This method uses the builder pattern and returns the client for method chaining.
The logger will be used for lifecycle events, background worker logs, and cleanup errors. This is particularly useful for debugging and monitoring reconnection behavior.
This is useful for non-FX usage where you want to enable logging after creating the client. When using FX, the logger is automatically injected via NewClientWithDI.
Example:
client, err := rabbit.NewClient(config)
if err != nil {
return err
}
client = client.WithLogger(myLogger)
defer client.GracefulShutdown()
func (*RabbitClient) WithObserver ¶ added in v0.13.0
func (rb *RabbitClient) WithObserver(observer observability.Observer) *RabbitClient
WithObserver attaches an observer to the RabbitMQ client for observability hooks. This method uses the builder pattern and returns the client for method chaining.
The observer will be notified of all publish and consume operations, allowing external systems to track metrics, traces, or other observability data.
This is useful for non-FX usage where you want to attach an observer after creating the client. When using FX, the observer is automatically injected via NewClientWithDI.
Example:
client, err := rabbit.NewClient(config)
if err != nil {
return err
}
client = client.WithObserver(myObserver)
defer client.GracefulShutdown()
type RabbitLifecycleParams ¶
type RabbitLifecycleParams struct {
fx.In
Lifecycle fx.Lifecycle
Client *RabbitClient
Config Config
}
RabbitLifecycleParams groups the dependencies needed for RabbitMQ lifecycle management
type RabbitParams ¶
type RabbitParams struct {
fx.In
Config Config
Logger Logger `optional:"true"`
Observer observability.Observer `optional:"true"`
}
RabbitParams groups the dependencies needed to create a Rabbit client