Documentation ¶
Overview ¶
Package rabbit provides functionality for interacting with RabbitMQ.
The rabbit package offers a simplified interface for working with RabbitMQ message queues, providing connection management, message publishing, and consuming capabilities with a focus on reliability and ease of use.
Core Features:
- Robust connection management with automatic reconnection
- Simple publishing interface with error handling
- Consumer interface with automatic acknowledgment handling
- Dead letter queue support
- Integration with the Logger package for structured logging
- Distributed tracing support via message headers
Basic Usage:
import (
"github.com/Aleph-Alpha/std/v1/rabbit"
"github.com/Aleph-Alpha/std/v1/logger"
"context"
"sync"
)
// Create a new RabbitMQ client (NewClient logs a fatal error if setup fails)
client := rabbit.NewClient(rabbit.Config{
Connection: rabbit.Connection{
Host: "localhost",
Port: 5672,
User: "guest",
Password: "guest",
},
Channel: rabbit.Channel{
ExchangeName: "events",
ExchangeType: "topic",
RoutingKey: "user.created",
QueueName: "user-events",
ContentType: "application/json",
},
}, log)
defer client.GracefulShutdown()
// Publish a message
ctx := context.Background()
message := []byte(`{"id": "123", "name": "John"}`)
if err := client.Publish(ctx, message, nil); err != nil {
log.Error("Failed to publish message", err, nil)
}
// Consume messages
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
log.Info("Received message", nil, map[string]interface{}{
"body": string(msg.Body()),
})
// Process the message
// Acknowledge the message
if err := msg.AckMsg(); err != nil {
log.Error("Failed to acknowledge message", err, nil)
}
}
Distributed Tracing with Message Headers:
This package supports distributed tracing by allowing you to propagate trace context through message headers, enabling end-to-end visibility across services.
Publisher Example (sending trace context):
import (
"github.com/Aleph-Alpha/std/v1/tracer"
// other imports...
)
// Create a tracer client
tracerClient := tracer.NewClient(tracerConfig, log)
// Create a span for the operation that includes publishing
ctx, span := tracerClient.StartSpan(ctx, "process-and-publish")
defer span.End()
// Process data...
// Extract trace context as headers before publishing
traceHeaders := tracerClient.GetCarrier(ctx)
// Publish with trace headers
err = rabbitClient.Publish(ctx, message, traceHeaders)
if err != nil {
span.RecordError(err)
log.Error("Failed to publish message", err, nil)
}
Consumer Example (continuing the trace):
msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
// Extract trace headers from the message
headers := msg.Header()
// Build a per-message context that carries the incoming trace information
msgCtx := tracerClient.SetCarrierOnContext(ctx, headers)
// Create a span as a child of the incoming trace
msgCtx, span := tracerClient.StartSpan(msgCtx, "process-message")
// Add relevant attributes to the span
span.SetAttributes(map[string]interface{}{
"message.size": len(msg.Body()),
"message.type": "user.created",
})
// Process the message...
if err := processMessage(msgCtx, msg.Body()); err != nil {
// Record any errors in the span
span.RecordError(err)
msg.NackMsg(true) // Requeue for retry
span.End()
continue
}
// Acknowledge successful processing
if err := msg.AckMsg(); err != nil {
span.RecordError(err)
log.Error("Failed to acknowledge message", err, nil)
}
// End the span per message; a defer here would only run when the enclosing function returns
span.End()
}
Consuming from Dead Letter Queue:
dlqChan := client.ConsumeDLQ(ctx, wg)
for msg := range dlqChan {
log.Info("Processing failed message", nil, map[string]interface{}{
"body": string(msg.Body()),
})
// Process the failed message
// Acknowledge after processing
msg.AckMsg()
}
FX Module Integration:
This package provides an fx module for easy integration:
app := fx.New(
logger.Module,
rabbit.FXModule,
// ... other modules
)
app.Run()
Configuration:
The rabbit client can be configured via environment variables or explicitly:
RABBIT_URI=amqp://guest:guest@localhost:5672/
RABBIT_EXCHANGE_NAME=events
RABBIT_EXCHANGE_TYPE=topic
RABBIT_ROUTING_KEY=user.created
RABBIT_QUEUE_NAME=user-events
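A minimal sketch of checking these variables at startup. The helper readRabbitEnv is hypothetical and not part of the rabbit package; how the package itself maps the variables onto Config is not shown in this documentation, so the sketch only collects the values and reports which ones are unset.

```go
package main

import (
	"fmt"
	"os"
)

// rabbitEnvKeys lists the variable names shown in the configuration example.
var rabbitEnvKeys = []string{
	"RABBIT_URI",
	"RABBIT_EXCHANGE_NAME",
	"RABBIT_EXCHANGE_TYPE",
	"RABBIT_ROUTING_KEY",
	"RABBIT_QUEUE_NAME",
}

// readRabbitEnv is a hypothetical helper (not provided by the rabbit package).
// It reads each listed variable through lookup and returns the values found
// together with the names of variables that are unset or empty.
func readRabbitEnv(lookup func(string) (string, bool)) (values map[string]string, missing []string) {
	values = make(map[string]string, len(rabbitEnvKeys))
	for _, key := range rabbitEnvKeys {
		v, ok := lookup(key)
		if !ok || v == "" {
			missing = append(missing, key)
			continue
		}
		values[key] = v
	}
	return values, missing
}

func main() {
	values, missing := readRabbitEnv(os.LookupEnv)
	fmt.Println("configured:", len(values), "missing:", missing)
}
```

Passing the lookup function in makes the helper easy to exercise with a plain map in tests instead of the process environment.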
Thread Safety:
All methods on the Rabbit type are safe for concurrent use by multiple goroutines, except for GracefulShutdown(), which should only be called once.
Index ¶
- Variables
- func RegisterRabbitLifecycle(params RabbitLifecycleParams)
- type Channel
- type Config
- type Connection
- type ConsumerMessage
- type DeadLetter
- type ErrorCategory
- type Message
- type Rabbit
- func (rb *Rabbit) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
- func (rb *Rabbit) ConsumeDLQ(ctx context.Context, wg *sync.WaitGroup) <-chan Message
- func (r *Rabbit) GetErrorCategory(err error) ErrorCategory
- func (rb *Rabbit) GracefulShutdown()
- func (r *Rabbit) IsAlarmError(err error) bool
- func (r *Rabbit) IsAuthenticationError(err error) bool
- func (r *Rabbit) IsChannelError(err error) bool
- func (r *Rabbit) IsConnectionError(err error) bool
- func (r *Rabbit) IsPermanentError(err error) bool
- func (r *Rabbit) IsResourceError(err error) bool
- func (r *Rabbit) IsRetryableError(err error) bool
- func (r *Rabbit) IsTemporaryError(err error) bool
- func (rb *Rabbit) Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
- func (rb *Rabbit) RetryConnection(cfg Config)
- func (r *Rabbit) TranslateError(err error) error
- type RabbitLifecycleParams
- type RabbitParams
Constants ¶
This section is empty.
Variables ¶
var (
// ErrConnectionFailed is returned when connection to RabbitMQ cannot be established
ErrConnectionFailed = errors.New("connection failed")
// ErrConnectionLost is returned when connection to RabbitMQ is lost
ErrConnectionLost = errors.New("connection lost")
// ErrConnectionClosed is returned when connection is closed
ErrConnectionClosed = errors.New("connection closed")
// ErrChannelClosed is returned when channel is closed
ErrChannelClosed = errors.New("channel closed")
// ErrChannelException is returned when channel encounters an exception
ErrChannelException = errors.New("channel exception")
// ErrAuthenticationFailed is returned when authentication fails
ErrAuthenticationFailed = errors.New("authentication failed")
// ErrAccessDenied is returned when access is denied to a resource
ErrAccessDenied = errors.New("access denied")
// ErrInsufficientPermissions is returned when user lacks necessary permissions
ErrInsufficientPermissions = errors.New("insufficient permissions")
// ErrInvalidCredentials is returned when credentials are invalid
ErrInvalidCredentials = errors.New("invalid credentials")
// ErrExchangeNotFound is returned when exchange doesn't exist
ErrExchangeNotFound = errors.New("exchange not found")
// ErrQueueNotFound is returned when queue doesn't exist
ErrQueueNotFound = errors.New("queue not found")
// ErrQueueEmpty is returned when queue is empty
ErrQueueEmpty = errors.New("queue empty")
// ErrQueueExists is returned when queue already exists with different properties
ErrQueueExists = errors.New("queue already exists")
// ErrExchangeExists is returned when exchange already exists with different properties
ErrExchangeExists = errors.New("exchange already exists")
// ErrResourceLocked is returned when resource is locked
ErrResourceLocked = errors.New("resource locked")
// ErrPreconditionFailed is returned when precondition check fails
ErrPreconditionFailed = errors.New("precondition failed")
// ErrInvalidArgument is returned when argument is invalid
ErrInvalidArgument = errors.New("invalid argument")
// ErrInvalidFrameFormat is returned when frame format is invalid
ErrInvalidFrameFormat = errors.New("invalid frame format")
// ErrInvalidFrameSize is returned when frame size is invalid
ErrInvalidFrameSize = errors.New("invalid frame size")
// ErrFrameError is returned for frame-related errors
ErrFrameError = errors.New("frame error")
// ErrSyntaxError is returned for syntax errors in AMQP protocol
ErrSyntaxError = errors.New("syntax error")
// ErrCommandInvalid is returned when command is invalid
ErrCommandInvalid = errors.New("command invalid")
// ErrChannelError is returned for channel-related errors
ErrChannelError = errors.New("channel error")
// ErrUnexpectedFrame is returned when unexpected frame is received
ErrUnexpectedFrame = errors.New("unexpected frame")
// ErrResourceError is returned for resource-related errors
ErrResourceError = errors.New("resource error")
// ErrNotAllowed is returned when operation is not allowed
ErrNotAllowed = errors.New("not allowed")
// ErrNotImplemented is returned when feature is not implemented
ErrNotImplemented = errors.New("not implemented")
// ErrInternalError is returned for internal errors
ErrInternalError = errors.New("internal error")
// ErrTimeout is returned when operation times out
ErrTimeout = errors.New("timeout")
// ErrNetworkError is returned for network-related errors
ErrNetworkError = errors.New("network error")
// ErrTLSError is returned for TLS/SSL errors
ErrTLSError = errors.New("TLS error")
// ErrCertificateError is returned for certificate-related errors
ErrCertificateError = errors.New("certificate error")
// ErrHandshakeFailed is returned when handshake fails
ErrHandshakeFailed = errors.New("handshake failed")
// ErrProtocolError is returned for protocol-related errors
ErrProtocolError = errors.New("protocol error")
// ErrVersionMismatch is returned when version mismatch occurs
ErrVersionMismatch = errors.New("version mismatch")
// ErrServerError is returned for server-side errors
ErrServerError = errors.New("server error")
// ErrClientError is returned for client-side errors
ErrClientError = errors.New("client error")
// ErrMessageTooLarge is returned when message exceeds size limits
ErrMessageTooLarge = errors.New("message too large")
// ErrInvalidMessage is returned when message format is invalid
ErrInvalidMessage = errors.New("invalid message")
// ErrMessageNacked is returned when message is negatively acknowledged
ErrMessageNacked = errors.New("message nacked")
// ErrMessageReturned is returned when message is returned by broker
ErrMessageReturned = errors.New("message returned")
// ErrPublishFailed is returned when publish operation fails
ErrPublishFailed = errors.New("publish failed")
// ErrConsumeFailed is returned when consume operation fails
ErrConsumeFailed = errors.New("consume failed")
// ErrAckFailed is returned when acknowledge operation fails
ErrAckFailed = errors.New("acknowledge failed")
// ErrNackFailed is returned when negative acknowledge operation fails
ErrNackFailed = errors.New("negative acknowledge failed")
// ErrRejectFailed is returned when reject operation fails
ErrRejectFailed = errors.New("reject failed")
// ErrQoSFailed is returned when QoS operation fails
ErrQoSFailed = errors.New("QoS failed")
// ErrBindFailed is returned when bind operation fails
ErrBindFailed = errors.New("bind failed")
// ErrUnbindFailed is returned when unbind operation fails
ErrUnbindFailed = errors.New("unbind failed")
// ErrDeclareFailed is returned when declare operation fails
ErrDeclareFailed = errors.New("declare failed")
// ErrDeleteFailed is returned when delete operation fails
ErrDeleteFailed = errors.New("delete failed")
// ErrPurgeFailed is returned when purge operation fails
ErrPurgeFailed = errors.New("purge failed")
// ErrTransactionFailed is returned when transaction fails
ErrTransactionFailed = errors.New("transaction failed")
// ErrCancelled is returned when operation is cancelled
ErrCancelled = errors.New("operation cancelled")
// ErrShutdown is returned when system is shutting down
ErrShutdown = errors.New("shutdown")
// ErrConfigurationError is returned for configuration-related errors
ErrConfigurationError = errors.New("configuration error")
// ErrVirtualHostNotFound is returned when virtual host doesn't exist
ErrVirtualHostNotFound = errors.New("virtual host not found")
// ErrUserNotFound is returned when user doesn't exist
ErrUserNotFound = errors.New("user not found")
// ErrClusterError is returned for cluster-related errors
ErrClusterError = errors.New("cluster error")
// ErrNodeDown is returned when node is down
ErrNodeDown = errors.New("node down")
// ErrMemoryAlarm is returned when memory alarm is triggered
ErrMemoryAlarm = errors.New("memory alarm")
// ErrDiskAlarm is returned when disk alarm is triggered
ErrDiskAlarm = errors.New("disk alarm")
// ErrResourceAlarm is returned when resource alarm is triggered
ErrResourceAlarm = errors.New("resource alarm")
// ErrFlowControl is returned when flow control is active
ErrFlowControl = errors.New("flow control")
// ErrQuotaExceeded is returned when quota is exceeded
ErrQuotaExceeded = errors.New("quota exceeded")
// ErrRateLimit is returned when rate limit is exceeded
ErrRateLimit = errors.New("rate limit exceeded")
// ErrBackpressure is returned when backpressure is applied
ErrBackpressure = errors.New("backpressure")
// ErrUnknownError is returned for unknown/unhandled errors
ErrUnknownError = errors.New("unknown error")
)
Common RabbitMQ error types that can be used by consumers of this package. These provide a standardized set of errors that abstract away the underlying AMQP-specific error details.
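Because these are sentinel values created with errors.New, callers can presumably match them with errors.Is even after wrapping. The sketch below uses local stand-ins for two of the sentinels so that it compiles on its own; real code would reference rabbit.ErrQueueNotFound and rabbit.ErrTimeout. The helpers wrap and describe are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors (assumption:
// real code would use rabbit.ErrQueueNotFound and rabbit.ErrTimeout).
var (
	ErrQueueNotFound = errors.New("queue not found")
	ErrTimeout       = errors.New("timeout")
)

// wrap adds context to err while keeping the sentinel reachable via errors.Is.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

// describe is a hypothetical helper that branches on the wrapped sentinel.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrQueueNotFound):
		return "declare the queue before consuming"
	case errors.Is(err, ErrTimeout):
		return "retry later"
	default:
		return "unhandled"
	}
}

func main() {
	err := wrap("consume user-events", ErrQueueNotFound)
	fmt.Println(describe(err)) // prints "declare the queue before consuming"
}
```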
var FXModule = fx.Module("rabbit",
fx.Provide(
NewClientWithDI,
),
fx.Invoke(RegisterRabbitLifecycle),
)
FXModule is an fx.Module that provides and configures the RabbitMQ client. This module registers the RabbitMQ client with the Fx dependency injection framework, making it available to other components in the application.
The module:
1. Provides the RabbitMQ client factory function
2. Invokes the lifecycle registration to manage the client's lifecycle
Usage:
app := fx.New(
rabbit.FXModule,
// other modules...
)
Functions ¶
func RegisterRabbitLifecycle ¶
func RegisterRabbitLifecycle(params RabbitLifecycleParams)
RegisterRabbitLifecycle registers the RabbitMQ client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the RabbitMQ client, including starting the connection monitoring goroutine.
Parameters (supplied through the RabbitLifecycleParams struct):
- lc: The fx lifecycle controller
- client: The RabbitMQ client instance
- logger: Logger for recording lifecycle events
- cfg: Configuration for the RabbitMQ client
The function:
- On application start: Launches a background goroutine that monitors and maintains the RabbitMQ connection, automatically reconnecting if it fails.
- On application stop: Triggers a graceful shutdown of the RabbitMQ client, closing channels and connections cleanly.
This ensures that the RabbitMQ client remains available throughout the application's lifetime and is properly cleaned up during shutdown.
Types ¶
type Channel ¶
type Channel struct {
// ExchangeName is the name of the exchange to publish to or consume from
ExchangeName string
// ExchangeType defines the routing behavior of the exchange
// Common values: "direct", "fanout", "topic", "headers"
ExchangeType string
// RoutingKey is used for routing messages from exchanges to queues
// The meaning depends on the exchange type:
// - For direct exchanges: exact matching key
// - For topic exchanges: routing pattern with wildcards
// - For fanout exchanges: ignored
RoutingKey string
// QueueName is the name of the queue to declare or consume from
QueueName string
// DelayToReconnect is the time in milliseconds to wait between reconnection attempts
DelayToReconnect int
// PrefetchCount limits the number of unacknowledged messages that can be sent to a consumer
// A value of 0 means no limit (not recommended for production)
PrefetchCount int
// IsConsumer determines whether this client will declare exchanges and queues
// Set to true for consumers, false for publishers that use existing exchanges
IsConsumer bool
// ContentType specifies the MIME type of published messages
// Common values: "application/json", "text/plain", "application/octet-stream"
ContentType string
}
Channel contains configuration for AMQP channels, exchanges, queues, and bindings. These settings determine how messages are routed and processed.
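The wildcard behaviour of topic exchanges mentioned for RoutingKey can be illustrated with a small matcher. In AMQP topic bindings, words are separated by dots, "*" matches exactly one word, and "#" matches zero or more words. matchTopic is a hypothetical illustration; the broker performs the real matching, not this package.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether a routing key matches a topic binding pattern.
// It is an illustrative re-implementation of the broker-side rule.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// Let "#" absorb 0, 1, 2, ... leading words of the key.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(matchTopic("user.*", "user.created"))    // true
	fmt.Println(matchTopic("user.*", "user.created.v2")) // false
	fmt.Println(matchTopic("user.#", "user.created.v2")) // true
}
```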
type Config ¶
type Config struct {
// Connection contains the settings needed to establish a connection to the RabbitMQ server
Connection Connection
// Channel contains configuration for exchanges, queues, and message routing
Channel Channel
// DeadLetter contains configuration for the dead-letter exchange and queue
// used for handling failed messages
DeadLetter DeadLetter
}
Config defines the top-level configuration structure for the RabbitMQ client. It contains all the necessary configuration sections for establishing connections, setting up channels, and configuring dead-letter behavior.
type Connection ¶
type Connection struct {
// Host is the RabbitMQ server hostname or IP address
Host string
// Port is the RabbitMQ server port (typically 5672 for non-SSL, 5671 for SSL)
Port uint
// User is the RabbitMQ username for authentication
User string
// Password is the RabbitMQ password for authentication
Password string
// IsSSLEnabled determines whether to use SSL/TLS for the connection
// When true, connections will use the AMQPS protocol (AMQP over TLS)
IsSSLEnabled bool
// UseCert determines whether to use client certificate authentication
// When true, client certificates will be sent for mutual TLS authentication
UseCert bool
// CACertPath is the file path to the CA certificate for verifying the server
// Used when IsSSLEnabled is true
CACertPath string
// ClientCertPath is the file path to the client certificate
// Used when both IsSSLEnabled and UseCert are true
ClientCertPath string
// ClientKeyPath is the file path to the client certificate's private key
// Used when both IsSSLEnabled and UseCert are true
ClientKeyPath string
// ServerName is the server name to use for TLS verification
// This should match a CN or SAN in the server's certificate
ServerName string
}
Connection contains the configuration parameters needed to establish a connection to a RabbitMQ server, including authentication and TLS settings.
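For orientation, the host, port, and credential fields correspond to the parts of an AMQP URI, with the amqps scheme used for TLS. The sketch below is an assumption about how such a URI could be assembled, not the package's actual implementation; connSettings and amqpURI are hypothetical names.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// connSettings mirrors a subset of the Connection fields documented above.
type connSettings struct {
	Host         string
	Port         uint
	User         string
	Password     string
	IsSSLEnabled bool
}

// amqpURI builds an AMQP URI from the settings. url.UserPassword takes care
// of escaping reserved characters in the credentials.
func amqpURI(c connSettings) string {
	scheme := "amqp"
	if c.IsSSLEnabled {
		scheme = "amqps"
	}
	u := url.URL{
		Scheme: scheme,
		User:   url.UserPassword(c.User, c.Password),
		Host:   net.JoinHostPort(c.Host, strconv.FormatUint(uint64(c.Port), 10)),
		Path:   "/",
	}
	return u.String()
}

func main() {
	fmt.Println(amqpURI(connSettings{
		Host: "localhost", Port: 5672, User: "guest", Password: "guest",
	})) // prints "amqp://guest:guest@localhost:5672/"
}
```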
type ConsumerMessage ¶
type ConsumerMessage struct {
// contains filtered or unexported fields
}
ConsumerMessage implements the Message interface and wraps an AMQP delivery. This struct provides access to the message content and acknowledgment methods.
func (*ConsumerMessage) AckMsg ¶
func (rb *ConsumerMessage) AckMsg() error
AckMsg acknowledges the message, informing RabbitMQ that the message has been successfully processed and can be removed from the queue.
Returns an error if the acknowledgment fails.
func (*ConsumerMessage) Body ¶
func (rb *ConsumerMessage) Body() []byte
Body returns the message payload as a byte slice.
func (*ConsumerMessage) Header ¶
func (rb *ConsumerMessage) Header() map[string]interface{}
Header returns the headers associated with the message. Message headers provide metadata about the message and can contain application-specific information set by the message publisher.
Headers are a map of key-value pairs where the keys are strings and values can be of various types. Common uses for headers include:
- Message type identification
- Content format specification
- Routing information
- Tracing context propagation
- Custom application metadata
Returns:
- map[string]interface{}: A map containing the message headers
Example:
msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
// Access message headers
headers := msg.Header()
// Check for specific headers
if contentType, ok := headers["content-type"].(string); ok {
fmt.Printf("Content type: %s\n", contentType)
}
// Access trace context from headers for distributed tracing
if traceID, ok := headers["trace-id"].(string); ok {
ctx = tracer.SetTraceID(ctx, traceID)
}
// Process the message...
}
func (*ConsumerMessage) NackMsg ¶
func (rb *ConsumerMessage) NackMsg(requeue bool) error
NackMsg rejects the message. If requeue is true, the message will be returned to the queue for redelivery; otherwise, it will be discarded or sent to a dead-letter exchange if configured.
Parameters:
- requeue: Whether to requeue the message for another delivery attempt
Returns an error if the rejection fails.
type DeadLetter ¶
type DeadLetter struct {
// ExchangeName is the name of the dead-letter exchange
ExchangeName string
// QueueName is the name of the queue bound to the dead-letter exchange
QueueName string
// RoutingKey is the routing key used when dead-lettering messages
// This can be different from the original routing key
RoutingKey string
// Ttl is the time-to-live for messages in seconds
// Messages that remain in the queue longer than this TTL will be dead-lettered
// A value of 0 means no TTL (messages never expire)
Ttl int
}
DeadLetter contains configuration for dead-letter handling. Dead-letter exchanges receive messages that are rejected, expire, or exceed queue limits. This provides a mechanism for handling failed message processing.
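In RabbitMQ, dead-lettering is configured through queue arguments: x-dead-letter-exchange, x-dead-letter-routing-key, and x-message-ttl (expressed in milliseconds). The sketch below shows how the DeadLetter fields might translate into those arguments. Whether the package performs exactly this mapping is an assumption, and deadLetterArgs and the example names are hypothetical.

```go
package main

import "fmt"

// deadLetterArgs is a hypothetical helper that turns dead-letter settings into
// RabbitMQ queue arguments for the main queue.
func deadLetterArgs(exchange, routingKey string, ttlSeconds int) map[string]interface{} {
	args := map[string]interface{}{
		"x-dead-letter-exchange": exchange,
	}
	if routingKey != "" {
		args["x-dead-letter-routing-key"] = routingKey
	}
	if ttlSeconds > 0 {
		// The Ttl field is documented in seconds; RabbitMQ expects milliseconds.
		args["x-message-ttl"] = int64(ttlSeconds) * 1000
	}
	return args
}

func main() {
	args := deadLetterArgs("events.dlx", "user.created.failed", 30)
	fmt.Println(args["x-message-ttl"]) // prints 30000
}
```

A Ttl of 0 leaves x-message-ttl unset, which matches the documented meaning that messages never expire.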
type ErrorCategory ¶
type ErrorCategory int
ErrorCategory represents different categories of RabbitMQ errors
const (
CategoryUnknown ErrorCategory = iota
CategoryConnection
CategoryChannel
CategoryAuthentication
CategoryPermission
CategoryResource
CategoryMessage
CategoryProtocol
CategoryNetwork
CategoryServer
CategoryConfiguration
CategoryCluster
CategoryOperation
CategoryAlarm
CategoryTimeout
)
type Message ¶
type Message interface {
// AckMsg acknowledges the message, informing RabbitMQ that the message
// has been successfully processed and can be removed from the queue.
AckMsg() error
// NackMsg rejects the message. If requeue is true, the message will be
// returned to the queue for redelivery; otherwise, it will be discarded
// or sent to a dead-letter exchange if configured.
NackMsg(requeue bool) error
// Body returns the message payload as a byte slice.
Body() []byte
// Header returns the headers associated with the message.
// Message headers provide metadata about the message and can contain
// application-specific information set by the message publisher.
//
// Headers are a map of key-value pairs where the keys are strings
// and values can be of various types. Common uses for headers include:
// - Message type identification
// - Content format specification
// - Routing information
// - Tracing context propagation
// - Custom application metadata
//
// For distributed tracing with OpenTelemetry, headers can carry trace
// context between services, enabling end-to-end tracing across
// message-based communication.
Header() map[string]interface{}
}
Message defines the interface for consumed messages from RabbitMQ. This interface abstracts the underlying AMQP message structure and provides methods for acknowledging or rejecting messages.
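A common way to use this interface is a small wrapper that acknowledges on success and rejects on failure, requeueing only when the failure looks retryable. The sketch below is one possible shape, not something the package provides: handle, acker, and fakeMsg are hypothetical, and the retryable predicate stands in for a check such as IsRetryableError.

```go
package main

import (
	"errors"
	"fmt"
)

// acker is the subset of the Message interface this sketch needs.
type acker interface {
	AckMsg() error
	NackMsg(requeue bool) error
	Body() []byte
}

// handle runs process on the message body. On success it acks; on failure it
// nacks, requeueing only if retryable reports true, and returns the error.
func handle(msg acker, process func([]byte) error, retryable func(error) bool) error {
	if err := process(msg.Body()); err != nil {
		if nackErr := msg.NackMsg(retryable(err)); nackErr != nil {
			return errors.Join(err, nackErr)
		}
		return err
	}
	return msg.AckMsg()
}

// fakeMsg records which acknowledgment method was called (demo only).
type fakeMsg struct {
	body                    []byte
	acked, nacked, requeued bool
}

func (m *fakeMsg) AckMsg() error              { m.acked = true; return nil }
func (m *fakeMsg) NackMsg(requeue bool) error { m.nacked, m.requeued = true, requeue; return nil }
func (m *fakeMsg) Body() []byte               { return m.body }

var errDemo = errors.New("bad payload")

func main() {
	msg := &fakeMsg{body: []byte(`{"id":"123"}`)}
	err := handle(msg,
		func([]byte) error { return errDemo },
		func(error) bool { return false }, // treat the failure as permanent
	)
	fmt.Println(err, msg.nacked, msg.requeued) // prints "bad payload true false"
}
```

With a dead-letter exchange configured, the non-requeued message would be routed there instead of being dropped.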
type Rabbit ¶
type Rabbit struct {
// Channel is the main AMQP channel used for publishing and consuming messages.
// It's exposed publicly to allow direct operations when needed.
Channel *amqp.Channel
// contains filtered or unexported fields
}
Rabbit represents a client for interacting with RabbitMQ. It manages connections, channels, and provides methods for publishing and consuming messages with automatic reconnection capabilities.
func NewClient ¶
NewClient creates and initializes a new RabbitMQ client with the provided configuration. This function establishes the initial connection to RabbitMQ, sets up channels, and configures exchanges and queues as specified in the configuration.
Parameters:
- cfg: Configuration for connecting to RabbitMQ and setting up channels
- logger: Logger implementation for recording events and errors
Returns a new Rabbit client instance that is ready to use. If connection fails after all retries or channel setup fails, it will log a fatal error.
Example:
client := rabbit.NewClient(config, myLogger)
defer client.GracefulShutdown()
func NewClientWithDI ¶
func NewClientWithDI(params RabbitParams) (*Rabbit, error)
NewClientWithDI creates a new RabbitMQ client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the RabbitParams struct.
Parameters:
- params: A RabbitParams struct that contains the Config and Logger instances required to initialize the RabbitMQ client. This struct embeds fx.In to enable automatic injection of these dependencies.
Returns:
- *Rabbit: A fully initialized RabbitMQ client ready for use.
- error: Non-nil if the client could not be initialized.
Example usage with fx:
app := fx.New(
rabbit.FXModule,
fx.Provide(
func() rabbit.Config {
return loadRabbitConfig() // Your config loading function
},
func() rabbit.Logger {
return initLogger() // Your logger initialization
},
),
)
Under the hood, this function simply delegates to the standard NewClient function, making it easier to integrate with dependency injection frameworks while maintaining the same initialization logic.
func (*Rabbit) Consume ¶
func (rb *Rabbit) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
Consume starts consuming messages from the queue specified in the configuration. This method provides a channel where consumed messages will be delivered.
Parameters:
- ctx: Context for cancellation control
- wg: WaitGroup for coordinating shutdown
Returns a channel that delivers Message interfaces for each consumed message.
Example:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
// Process the message
fmt.Println("Received:", string(msg.Body()))
// Acknowledge successful processing
if err := msg.AckMsg(); err != nil {
log.Printf("Failed to ack message: %v", err)
}
}
func (*Rabbit) ConsumeDLQ ¶
func (rb *Rabbit) ConsumeDLQ(ctx context.Context, wg *sync.WaitGroup) <-chan Message
ConsumeDLQ starts consuming messages from the dead-letter queue. This method is useful for processing failed messages sent to the dead-letter queue.
Parameters:
- ctx: Context for cancellation control
- wg: WaitGroup for coordinating shutdown
Returns a channel that delivers Message interfaces for each consumed message from the dead-letter queue.
Example:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dlqChan := rabbitClient.ConsumeDLQ(ctx, wg)
for msg := range dlqChan {
// Process the failed message
fmt.Println("Failed message:", string(msg.Body()))
// Acknowledge after processing
msg.AckMsg()
}
func (*Rabbit) GetErrorCategory ¶
func (r *Rabbit) GetErrorCategory(err error) ErrorCategory
GetErrorCategory returns the category of the given error
func (*Rabbit) GracefulShutdown ¶
func (rb *Rabbit) GracefulShutdown()
GracefulShutdown closes the RabbitMQ client's connections and channels cleanly. This method ensures that all resources are properly released when the application is shutting down.
The shutdown process:
1. Signals all goroutines to stop by closing the shutdownSignal channel
2. Acquires a lock to prevent concurrent access during shutdown
3. Closes the AMQP channel if it exists
4. Closes the AMQP connection if it exists and is not already closed
Any errors during shutdown are logged but not propagated, as they typically cannot be handled at this stage of application shutdown.
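The shutdown sequence described above can be sketched with standard-library primitives. This is an illustration under assumptions, not the package's code: the client type, the io.Closer stand-ins for the AMQP channel and connection, and the sync.Once guard (which makes a repeated call a no-op instead of a double close of the signal channel) are all choices made for this example.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// client is a hypothetical stand-in for the Rabbit type.
type client struct {
	mu             sync.Mutex
	once           sync.Once
	shutdownSignal chan struct{}
	channel        io.Closer // stand-in for the AMQP channel
	conn           io.Closer // stand-in for the AMQP connection
}

func newClient(ch, conn io.Closer) *client {
	return &client{shutdownSignal: make(chan struct{}), channel: ch, conn: conn}
}

// gracefulShutdown follows the four documented steps. Errors are printed
// rather than returned, mirroring "logged but not propagated".
func (c *client) gracefulShutdown() {
	c.once.Do(func() {
		close(c.shutdownSignal) // 1. tell background goroutines to stop
		c.mu.Lock()             // 2. keep other operations out during teardown
		defer c.mu.Unlock()
		if c.channel != nil { // 3. close the channel first
			if err := c.channel.Close(); err != nil {
				fmt.Println("close channel:", err)
			}
		}
		if c.conn != nil { // 4. then close the connection
			if err := c.conn.Close(); err != nil {
				fmt.Println("close connection:", err)
			}
		}
	})
}

// recorder notes the order in which resources were closed (demo only).
type recorder struct {
	name string
	log  *[]string
}

func (r recorder) Close() error { *r.log = append(*r.log, r.name); return nil }

func main() {
	var order []string
	c := newClient(recorder{"channel", &order}, recorder{"connection", &order})
	c.gracefulShutdown()
	c.gracefulShutdown() // second call does nothing because of sync.Once
	fmt.Println(order)   // prints "[channel connection]"
}
```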
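func (*Rabbit) IsAlarmError ¶
func (r *Rabbit) IsAlarmError(err error) bool
IsAlarmError returns true if the error is alarm-related.
func (*Rabbit) IsAuthenticationError ¶
func (r *Rabbit) IsAuthenticationError(err error) bool
IsAuthenticationError returns true if the error is authentication-related.
func (*Rabbit) IsChannelError ¶
func (r *Rabbit) IsChannelError(err error) bool
IsChannelError returns true if the error is channel-related.
func (*Rabbit) IsConnectionError ¶
func (r *Rabbit) IsConnectionError(err error) bool
IsConnectionError returns true if the error is connection-related.
func (*Rabbit) IsPermanentError ¶
func (r *Rabbit) IsPermanentError(err error) bool
IsPermanentError returns true if the error is permanent and should not be retried.
func (*Rabbit) IsResourceError ¶
func (r *Rabbit) IsResourceError(err error) bool
IsResourceError returns true if the error is resource-related.
func (*Rabbit) IsRetryableError ¶
func (r *Rabbit) IsRetryableError(err error) bool
IsRetryableError returns true if the error is retryable.
func (*Rabbit) IsTemporaryError ¶
func (r *Rabbit) IsTemporaryError(err error) bool
IsTemporaryError returns true if the error is temporary.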
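These predicates lend themselves to a retry loop that gives up immediately on permanent errors. The sketch below is a hypothetical helper, not part of the package: retry takes the predicate as a function so that a method such as IsRetryableError could be passed in, and the sentinel errors are local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for a retryable and a non-retryable failure.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("permanent failure")
)

// retry calls op up to attempts times, doubling delay between tries. It stops
// early on success or when isRetryable reports false. It returns the number
// of calls made and the last error.
func retry(attempts int, delay time.Duration, isRetryable func(error) bool, op func() error) (int, error) {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 1; i <= attempts; i++ {
		if err = op(); err == nil || !isRetryable(err) {
			return i, err
		}
		if i < attempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return attempts, err
}

func main() {
	calls := 0
	n, err := retry(5, time.Millisecond,
		func(err error) bool { return errors.Is(err, errTemporary) },
		func() error {
			calls++
			if calls < 3 {
				return errTemporary
			}
			return nil
		})
	fmt.Println(n, err) // prints "3 <nil>"
}
```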
func (*Rabbit) Publish ¶
func (rb *Rabbit) Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
Publish sends a message to the RabbitMQ exchange specified in the configuration. This method is thread-safe and respects context cancellation.
Parameters:
- ctx: Context for cancellation control
- msg: Message payload as a byte slice
- headers: Optional message headers as a map of key-value pairs; can be used for metadata and distributed tracing propagation
The headers parameter is particularly useful for distributed tracing, allowing trace context to be propagated across service boundaries through message queues. When using with the tracer package, you can extract trace headers and include them in the message:
traceHeaders := tracerClient.GetCarrier(ctx)
err := rabbitClient.Publish(ctx, message, traceHeaders)
Returns an error if publishing fails or if the context is canceled.
Example:
ctx := context.Background()
message := []byte("Hello, RabbitMQ!")
// Basic publishing without headers
err := rabbitClient.Publish(ctx, message, nil)
if err != nil {
log.Printf("Failed to publish message: %v", err)
} else {
log.Println("Message published successfully")
}
Example with distributed tracing:
// Create a span for the publish operation
ctx, span := tracerClient.StartSpan(ctx, "publish-message")
defer span.End()
// Add relevant attributes to the span
span.SetAttributes(map[string]interface{}{
"message.size": len(message),
"routing.key": rabbitClient.Config().Channel.RoutingKey,
})
// Extract trace context to include in the message headers
traceHeaders := tracerClient.GetCarrier(ctx)
// Publish the message with trace headers
err := rabbitClient.Publish(ctx, message, traceHeaders)
if err != nil {
span.RecordError(err)
log.Printf("Failed to publish message: %v", err)
return err
}
log.Println("Message published successfully with trace context")
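When a caller has both trace headers and application headers, one option is to merge them into a single map before passing it to Publish. How Publish treats several header maps is not documented here, so merging first avoids relying on that behaviour. mergeHeaders is a hypothetical helper, and the header names in the example are illustrative.

```go
package main

import "fmt"

// mergeHeaders combines any number of header maps into a new map. Later maps
// win on key conflicts, and nil maps are skipped naturally because ranging
// over a nil map yields nothing.
func mergeHeaders(headers ...map[string]interface{}) map[string]interface{} {
	merged := make(map[string]interface{})
	for _, h := range headers {
		for k, v := range h {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	trace := map[string]interface{}{"trace-id": "abc123"}
	app := map[string]interface{}{"message-type": "user.created"}
	merged := mergeHeaders(trace, app, nil)
	fmt.Println(len(merged)) // prints 2
}
```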
func (*Rabbit) RetryConnection ¶
func (rb *Rabbit) RetryConnection(cfg Config)
RetryConnection continuously monitors the RabbitMQ connection and automatically re-establishes it if it fails. This method is typically run in a goroutine.
Parameters:
- cfg: Configuration for establishing new connections
The method will:
- Monitor the connection for closure events
- Attempt to reconnect when the connection is lost
- Re-establish channels and their configurations
- Continue monitoring until the shutdownSignal is received
This provides resilience against network issues and RabbitMQ server restarts.
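The reconnect part of this behaviour can be sketched as a loop that retries with a fixed delay until it succeeds or a shutdown signal arrives. This is a simplified illustration, not the package's implementation: it omits watching for connection-closure events, and reconnect, errDial, and the delay value are hypothetical (the delay stands in for DelayToReconnect, which is documented in milliseconds).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errDial = errors.New("dial failed") // stand-in for a connection error

// reconnect calls connect until it succeeds or shutdown is closed, waiting
// delay between attempts. It returns the number of attempts made and whether
// a connection was established.
func reconnect(connect func() error, delay time.Duration, shutdown <-chan struct{}) (attempts int, ok bool) {
	for {
		// Do not start another attempt once shutdown has been signalled.
		select {
		case <-shutdown:
			return attempts, false
		default:
		}
		attempts++
		if err := connect(); err == nil {
			return attempts, true
		}
		// Wait before the next attempt, but wake up early on shutdown.
		select {
		case <-shutdown:
			return attempts, false
		case <-time.After(delay):
		}
	}
}

func main() {
	shutdown := make(chan struct{})
	failures := 2
	connect := func() error {
		if failures > 0 {
			failures--
			return errDial
		}
		return nil
	}
	delayToReconnect := 10 // milliseconds, as in the Channel configuration
	n, ok := reconnect(connect, time.Duration(delayToReconnect)*time.Millisecond, shutdown)
	fmt.Println(n, ok) // prints "3 true"
}
```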
func (*Rabbit) TranslateError ¶
func (r *Rabbit) TranslateError(err error) error
TranslateError converts AMQP/RabbitMQ-specific errors into standardized application errors. This function provides abstraction from the underlying AMQP implementation details, allowing application code to handle errors in a RabbitMQ-agnostic way.
It maps common RabbitMQ errors to the standardized error types defined above. If an error doesn't match any known type, it's returned unchanged.
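One plausible way to implement such a translation is a lookup from AMQP 0-9-1 reply codes to sentinel errors. The reply codes below are taken from the AMQP specification, but the mapping table itself is an assumption, not the package's actual table. amqpError is a local stand-in for the client library's error type, and the sentinels are local copies so that the sketch compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of a few sentinel errors (real code would use the rabbit package's).
var (
	ErrAccessDenied       = errors.New("access denied")
	ErrResourceLocked     = errors.New("resource locked")
	ErrPreconditionFailed = errors.New("precondition failed")
	ErrNotAllowed         = errors.New("not allowed")
)

// amqpError is a stand-in for the AMQP client library's error type.
type amqpError struct {
	Code   int
	Reason string
}

func (e *amqpError) Error() string {
	return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
}

// byCode maps AMQP 0-9-1 reply codes to sentinels (hypothetical mapping).
var byCode = map[int]error{
	403: ErrAccessDenied,       // access-refused
	405: ErrResourceLocked,     // resource-locked
	406: ErrPreconditionFailed, // precondition-failed
	530: ErrNotAllowed,         // not-allowed
}

// translate returns the matching sentinel, or err unchanged when the error is
// not an AMQP error or its code is not in the table.
func translate(err error) error {
	var ae *amqpError
	if errors.As(err, &ae) {
		if std, ok := byCode[ae.Code]; ok {
			return std
		}
	}
	return err
}

func main() {
	err := translate(&amqpError{Code: 406, Reason: "PRECONDITION_FAILED"})
	fmt.Println(errors.Is(err, ErrPreconditionFailed)) // prints "true"
}
```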
type RabbitLifecycleParams ¶
RabbitLifecycleParams groups the dependencies needed for RabbitMQ lifecycle management
type RabbitParams ¶
RabbitParams groups the dependencies needed to create a Rabbit client