rabbit

package
v1.3.0
Published: Jan 12, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package rabbit provides functionality for interacting with RabbitMQ.

The rabbit package offers a simplified interface for working with RabbitMQ message queues, providing connection management, message publishing, and consuming capabilities with a focus on reliability and ease of use.

Architecture

This package follows the "accept interfaces, return structs" design pattern:

  • Client interface: Defines the contract for RabbitMQ operations
  • RabbitClient struct: Concrete implementation of the Client interface
  • Message interface: Defines the contract for consumed messages
  • NewClient constructor: Returns *RabbitClient (concrete type)
  • FX module: Provides both *RabbitClient and Client interface for dependency injection

Core Features:

  • Robust connection management with automatic reconnection
  • Simple publishing interface with error handling
  • Consumer interface with automatic acknowledgment handling
  • Dead letter queue support
  • Optional observability hooks for metrics and tracing
  • Optional context-aware logging for lifecycle events
  • Distributed tracing support via message headers

Direct Usage (Without FX)

For simple applications or tests, create a client directly:

import (
	"github.com/Aleph-Alpha/std/v1/rabbit"
	"context"
	"sync"
)

// Create a new RabbitMQ client (returns concrete *RabbitClient)
client, err := rabbit.NewClient(rabbit.Config{
	Connection: rabbit.Connection{
		Host: "localhost",
		Port: 5672,
		User: "guest",
		Password: "guest",
	},
	Channel: rabbit.Channel{
		ExchangeName: "events",
		ExchangeType: "topic",
		RoutingKey:   "user.created",
		QueueName:    "user-events",
		IsConsumer:   true,
	},
})
if err != nil {
	return err
}
defer client.GracefulShutdown()

// Optionally attach logger and observer
client = client.
	WithLogger(myLogger).
	WithObserver(myObserver)

// Publish a message
ctx := context.Background()
message := []byte(`{"id": "123", "name": "John"}`)
if err := client.Publish(ctx, message); err != nil {
	return err
}

Builder Pattern for Optional Dependencies

The client supports optional dependencies via builder methods:

import (
	"github.com/Aleph-Alpha/std/v1/rabbit"
	"github.com/Aleph-Alpha/std/v1/logger"
	"github.com/Aleph-Alpha/std/v1/observability"
)

// Create client with optional logger and observer
client, err := rabbit.NewClient(config)
if err != nil {
	return err
}

// Attach optional dependencies using builder pattern
client = client.
	WithLogger(loggerInstance).    // Optional: for lifecycle logging
	WithObserver(observerInstance)  // Optional: for metrics/tracing

defer client.GracefulShutdown()

FX Module Integration

For production applications using Uber's fx, use the FXModule which provides both the concrete type and interface, with automatic injection of optional dependencies:

import (
	"context"

	"github.com/Aleph-Alpha/std/v1/rabbit"
	"github.com/Aleph-Alpha/std/v1/logger"
	"github.com/Aleph-Alpha/std/v1/observability"
	"go.uber.org/fx"
)

app := fx.New(
	logger.FXModule,      // Optional: provides std logger
	rabbit.FXModule,      // Provides *RabbitClient and rabbit.Client interface
	fx.Provide(
		func() rabbit.Config {
			return rabbit.Config{
				Connection: rabbit.Connection{
					Host: "localhost",
					Port: 5672,
					User: "guest",
					Password: "guest",
				},
				Channel: rabbit.Channel{
					ExchangeName: "events",
					QueueName:    "user-events",
					IsConsumer:   true,
				},
			}
		},
		// Optional: provide observer for metrics
		func(metrics *prometheus.Metrics) observability.Observer {
			return NewObserverAdapter(metrics)
		},
	),
	fx.Invoke(func(client *rabbit.RabbitClient) {
		// Logger and Observer are automatically injected if provided
		ctx := context.Background()
		client.Publish(ctx, []byte("message"))
	}),
)
app.Run()

The FX module automatically injects optional dependencies:

  • Logger (rabbit.Logger): If provided via fx, automatically attached
  • Observer (observability.Observer): If provided via fx, automatically attached

Observability

The package supports optional observability hooks for tracking operations. When an observer is attached, it will be notified of all publish and consume operations with detailed context.

Observer Integration:

import (
	"github.com/Aleph-Alpha/std/v1/observability"
	"github.com/Aleph-Alpha/std/v1/rabbit"
)

// Create an observer (typically wraps your metrics system)
type MetricsObserver struct {
	metrics *prometheus.Metrics
}

func (o *MetricsObserver) ObserveOperation(ctx observability.OperationContext) {
	// Track metrics based on the operation
	switch ctx.Operation {
	case "produce":
		o.metrics.MessageQueue.MessagesPublished.
			WithLabelValues(ctx.Resource, ctx.SubResource, errorStatus(ctx.Error)).
			Inc()
	case "consume":
		o.metrics.MessageQueue.MessagesConsumed.
			WithLabelValues(ctx.Resource, errorStatus(ctx.Error)).
			Inc()
	}
}

// Attach observer to client
client = client.WithObserver(&MetricsObserver{metrics: promMetrics})

The observer receives the following context for each operation:

Publish operations:

  • Component: "rabbit"
  • Operation: "produce"
  • Resource: exchange name
  • SubResource: routing key
  • Duration: time taken to publish
  • Error: any error that occurred
  • Size: message size in bytes

Consume operations:

  • Component: "rabbit"
  • Operation: "consume"
  • Resource: queue name
  • SubResource: ""
  • Duration: time taken to receive message
  • Error: nil (errors in message processing are not tracked here)
  • Size: message size in bytes
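The fields listed above are enough to build a small in-memory observer, which can be handy in tests. The following is a minimal sketch, not the package's implementation: OperationContext here is a local stand-in for observability.OperationContext, its field types (for example Size as int64) are assumptions, and CountingObserver is a hypothetical name.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// OperationContext is a local stand-in for observability.OperationContext.
// Field names follow the documented list; the exact types are assumptions.
type OperationContext struct {
	Component   string
	Operation   string
	Resource    string
	SubResource string
	Duration    time.Duration
	Error       error
	Size        int64
}

// CountingObserver (hypothetical) tallies operations by name and outcome.
type CountingObserver struct {
	mu     sync.Mutex
	counts map[string]int
}

// NewCountingObserver returns an observer with an empty tally.
func NewCountingObserver() *CountingObserver {
	return &CountingObserver{counts: make(map[string]int)}
}

// ObserveOperation records one operation under "<operation>/<status>".
func (o *CountingObserver) ObserveOperation(oc OperationContext) {
	status := "ok"
	if oc.Error != nil {
		status = "error"
	}
	o.mu.Lock()
	defer o.mu.Unlock()
	o.counts[oc.Operation+"/"+status]++
}

// Count returns how many operations were recorded for the given key.
func (o *CountingObserver) Count(key string) int {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.counts[key]
}

func main() {
	obs := NewCountingObserver()
	obs.ObserveOperation(OperationContext{
		Component: "rabbit", Operation: "produce",
		Resource: "events", SubResource: "user.created",
		Duration: 2 * time.Millisecond, Size: 29,
	})
	obs.ObserveOperation(OperationContext{
		Component: "rabbit", Operation: "produce",
		Resource: "events", Error: errors.New("publish failed"),
	})
	obs.ObserveOperation(OperationContext{
		Component: "rabbit", Operation: "consume",
		Resource: "user-events", Size: 29,
	})
	fmt.Println(obs.Count("produce/ok"), obs.Count("produce/error"), obs.Count("consume/ok"))
}
```

The mutex matters because publish and consume notifications may arrive from different goroutines.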

Logging

The package supports optional context-aware logging for lifecycle events and background operations. When a logger is attached, it will be used for:

  • Connection lifecycle events (connected, disconnected, reconnecting)
  • Consumer lifecycle events (started, stopped, shutdown)
  • Background reconnection attempts and errors

Logger Integration:

import (
	"github.com/Aleph-Alpha/std/v1/rabbit"
	"github.com/Aleph-Alpha/std/v1/logger"
)

// Create logger instance
loggerClient, err := logger.NewLogger(loggerConfig)
if err != nil {
	return err
}

// Attach logger to client
client = client.WithLogger(loggerClient)

The logger interface matches std/v1/logger for seamless integration:

type Logger interface {
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

Logging is designed to be minimal and non-intrusive:

  • Errors that are returned to the caller are NOT logged (avoid duplicate logs)
  • Only background operations and lifecycle events are logged
  • Context is propagated for distributed tracing
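Any type with these three methods can serve as the logger. As a rough sketch, the adapter below wraps the standard library's *log.Logger; the Logger interface is repeated locally so the example compiles on its own, and StdLogger and renderWarn are hypothetical names introduced for illustration.

```go
package main

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log"
)

// Logger repeats the three-method interface shown above.
type Logger interface {
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

// StdLogger (hypothetical) adapts a standard library *log.Logger.
type StdLogger struct {
	out *log.Logger
}

// write renders one line: level, message, optional error, then fields.
func (l *StdLogger) write(level, msg string, err error, fields []map[string]interface{}) {
	line := level + " " + msg
	if err != nil {
		line += " err=" + err.Error()
	}
	for _, f := range fields {
		for k, v := range f {
			line += fmt.Sprintf(" %s=%v", k, v)
		}
	}
	l.out.Print(line)
}

func (l *StdLogger) InfoWithContext(_ context.Context, msg string, err error, fields ...map[string]interface{}) {
	l.write("INFO", msg, err, fields)
}

func (l *StdLogger) WarnWithContext(_ context.Context, msg string, err error, fields ...map[string]interface{}) {
	l.write("WARN", msg, err, fields)
}

func (l *StdLogger) ErrorWithContext(_ context.Context, msg string, err error, fields ...map[string]interface{}) {
	l.write("ERROR", msg, err, fields)
}

// Compile-time check that StdLogger satisfies Logger.
var _ Logger = (*StdLogger)(nil)

// renderWarn logs one warning into a buffer and returns the rendered text.
func renderWarn() string {
	var buf bytes.Buffer
	var l Logger = &StdLogger{out: log.New(&buf, "", 0)}
	l.WarnWithContext(context.Background(), "reconnecting",
		errors.New("connection lost"), map[string]interface{}{"attempt": 3})
	return buf.String()
}

func main() {
	fmt.Print(renderWarn())
}
```

This adapter ignores the context; a real one would typically pull trace identifiers out of it.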

Type Aliases in Consumer Code

To simplify your code and make it message-broker-agnostic, use type aliases:

package myapp

import (
	"context"

	stdRabbit "github.com/Aleph-Alpha/std/v1/rabbit"
)

// Use type aliases to reference std's interfaces
type RabbitClient = stdRabbit.Client
type RabbitMessage = stdRabbit.Message

// Now use RabbitClient throughout your codebase
func MyFunction(ctx context.Context, client RabbitClient) error {
	return client.Publish(ctx, []byte("message"))
}

This eliminates the need for adapters and allows you to switch implementations by only changing the alias definition.
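Depending on an interface also makes application code easy to test without a broker. The sketch below assumes the application only needs Publish, so it declares a narrow Publisher interface with the same method signature as Client.Publish. FakePublisher, NotifyUserCreated, and sentAfterNotify are hypothetical names used only for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Publisher is the narrow slice of the Client interface this code needs.
// The method signature matches Client.Publish.
type Publisher interface {
	Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error
}

// FakePublisher (hypothetical) records published messages in memory.
type FakePublisher struct {
	mu   sync.Mutex
	Sent [][]byte
}

// Publish stores a copy of the message instead of sending it to a broker.
func (f *FakePublisher) Publish(_ context.Context, msg []byte, _ ...map[string]interface{}) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.Sent = append(f.Sent, append([]byte(nil), msg...))
	return nil
}

// NotifyUserCreated stands in for application code that publishes an event.
func NotifyUserCreated(ctx context.Context, p Publisher, userID string) error {
	return p.Publish(ctx, []byte(fmt.Sprintf(`{"id": %q}`, userID)))
}

// sentAfterNotify runs the application code against the fake and
// returns the payload that was published.
func sentAfterNotify(userID string) (string, error) {
	fake := &FakePublisher{}
	if err := NotifyUserCreated(context.Background(), fake, userID); err != nil {
		return "", err
	}
	return string(fake.Sent[0]), nil
}

func main() {
	body, err := sentAfterNotify("123")
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(body)
}
```

Because *RabbitClient has a Publish method with this signature, it should satisfy Publisher as well, so production code can pass the real client where tests pass the fake.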

Message Consumption

// Consume messages
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
	// Process the message
	log.Info("Received message", nil, map[string]interface{}{
		"body": string(msg.Body()),
	})

	// Acknowledge the message
	if err := msg.AckMsg(); err != nil {
		log.Error("Failed to acknowledge message", err, nil)
	}
}
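The loop above acknowledges every message. A common variation is to acknowledge on success and negatively acknowledge on failure. Here is a minimal sketch of that pattern using an in-memory channel: the Message interface is repeated locally, while fakeMessage, drain, and demo are hypothetical helpers, and the choice to nack without requeue is an assumption about the desired policy.

```go
package main

import (
	"errors"
	"fmt"
)

// Message repeats the package's Message interface so the sketch is self-contained.
type Message interface {
	AckMsg() error
	NackMsg(requeue bool) error
	Body() []byte
	Header() map[string]interface{}
}

// fakeMessage (hypothetical) is an in-memory Message that records how it was settled.
type fakeMessage struct {
	body    []byte
	settled string
}

func (m *fakeMessage) AckMsg() error {
	m.settled = "ack"
	return nil
}

func (m *fakeMessage) NackMsg(requeue bool) error {
	m.settled = "nack"
	if requeue {
		m.settled = "nack-requeue"
	}
	return nil
}

func (m *fakeMessage) Body() []byte                   { return m.body }
func (m *fakeMessage) Header() map[string]interface{} { return nil }

// drain handles every message until the channel closes: it acks on success
// and nacks without requeue on failure, returning how many of each it saw.
func drain(msgs <-chan Message, handle func([]byte) error) (acked, nacked int) {
	for msg := range msgs {
		if err := handle(msg.Body()); err != nil {
			_ = msg.NackMsg(false)
			nacked++
			continue
		}
		if err := msg.AckMsg(); err == nil {
			acked++
		}
	}
	return acked, nacked
}

// demo feeds one valid and one empty message through drain.
func demo() (int, int) {
	ch := make(chan Message, 2)
	ch <- &fakeMessage{body: []byte("ok")}
	ch <- &fakeMessage{body: []byte("")}
	close(ch)
	return drain(ch, func(b []byte) error {
		if len(b) == 0 {
			return errors.New("empty payload")
		}
		return nil
	})
}

func main() {
	acked, nacked := demo()
	fmt.Println(acked, nacked)
}
```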

Distributed Tracing with Message Headers

This package supports distributed tracing by allowing you to propagate trace context through message headers, enabling end-to-end visibility across services.

Publisher Example (sending trace context):

import (
	"github.com/Aleph-Alpha/std/v1/tracer"
	// other imports...
)

// Create a tracer client
tracerClient := tracer.NewClient(tracerConfig, log)

// Create a span for the operation that includes publishing
ctx, span := tracerClient.StartSpan(ctx, "process-and-publish")
defer span.End()

// Process data...

// Extract trace context as headers before publishing
traceHeaders := tracerClient.GetCarrier(ctx)

// Publish with trace headers
err = rabbitClient.Publish(ctx, message, traceHeaders)
if err != nil {
	span.RecordError(err)
	log.Error("Failed to publish message", err, nil)
}

Consumer Example (continuing the trace):

msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
	// Extract trace headers from the message
	headers := msg.Header()

	// Derive a per-message context that carries the incoming trace information
	msgCtx := tracerClient.SetCarrierOnContext(ctx, headers)

	// Create a span as a child of the incoming trace
	msgCtx, span := tracerClient.StartSpan(msgCtx, "process-message")

	// Add relevant attributes to the span
	span.SetAttributes(map[string]interface{}{
		"message.size": len(msg.Body()),
		"message.type": "user.created",
	})

	// Process the message...

	if err := processMessage(msg.Body()); err != nil {
		// Record any errors in the span
		span.RecordError(err)
		msg.NackMsg(true) // Requeue for retry
		span.End()
		continue
	}

	// Acknowledge successful processing
	if err := msg.AckMsg(); err != nil {
		span.RecordError(err)
		log.Error("Failed to acknowledge message", err, nil)
	}

	// End the span per message; a defer inside the loop would only run
	// when the enclosing function returns
	span.End()
}

Consuming from Dead Letter Queue:

dlqChan := client.ConsumeDLQ(ctx, wg)
for msg := range dlqChan {
	log.Info("Processing failed message", nil, map[string]interface{}{
		"body": string(msg.Body()),
	})

	// Process the failed message

	// Acknowledge after processing
	msg.AckMsg()
}

FX Module Integration:

This package provides a fx module for easy integration:

app := fx.New(
	logger.FXModule,
	rabbit.FXModule,
	// ... other modules
)
app.Run()

Configuration:

The rabbit client can be configured via environment variables or explicitly:

RABBIT_URI=amqp://guest:guest@localhost:5672/
RABBIT_EXCHANGE_NAME=events
RABBIT_EXCHANGE_TYPE=topic
RABBIT_ROUTING_KEY=user.created
RABBIT_QUEUE_NAME=user-events

Thread Safety:

All methods on the RabbitClient type are safe for concurrent use by multiple goroutines, except for GracefulShutdown(), which should only be called once.
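When several goroutines might trigger shutdown, one way to honor the call-once rule is to guard the call with sync.Once. The sketch below is an illustration only: OnceCloser, shutdowner, countingClient, and shutdownCalls are hypothetical names, and the fake stands in for a real client.

```go
package main

import (
	"fmt"
	"sync"
)

// shutdowner is the single method this sketch needs from the client.
type shutdowner interface {
	GracefulShutdown()
}

// OnceCloser (hypothetical) makes shutdown idempotent and safe to
// trigger from several goroutines.
type OnceCloser struct {
	once   sync.Once
	client shutdowner
}

// Shutdown forwards to GracefulShutdown at most once.
func (c *OnceCloser) Shutdown() {
	c.once.Do(c.client.GracefulShutdown)
}

// countingClient is a fake that counts how often it was shut down.
type countingClient struct {
	calls int
}

func (c *countingClient) GracefulShutdown() { c.calls++ }

// shutdownCalls triggers Shutdown from n goroutines and reports how many
// times the underlying client was actually shut down.
func shutdownCalls(n int) int {
	fake := &countingClient{}
	closer := &OnceCloser{client: fake}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			closer.Shutdown()
		}()
	}
	wg.Wait()
	return fake.calls
}

func main() {
	fmt.Println(shutdownCalls(8))
}
```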

Index

Constants

This section is empty.

Variables

var (
	// ErrConnectionFailed is returned when connection to RabbitMQ cannot be established
	ErrConnectionFailed = errors.New("connection failed")

	// ErrConnectionLost is returned when connection to RabbitMQ is lost
	ErrConnectionLost = errors.New("connection lost")

	// ErrConnectionClosed is returned when connection is closed
	ErrConnectionClosed = errors.New("connection closed")

	// ErrChannelClosed is returned when channel is closed
	ErrChannelClosed = errors.New("channel closed")

	// ErrChannelException is returned when channel encounters an exception
	ErrChannelException = errors.New("channel exception")

	// ErrAuthenticationFailed is returned when authentication fails
	ErrAuthenticationFailed = errors.New("authentication failed")

	// ErrAccessDenied is returned when access is denied to a resource
	ErrAccessDenied = errors.New("access denied")

	// ErrInsufficientPermissions is returned when user lacks necessary permissions
	ErrInsufficientPermissions = errors.New("insufficient permissions")

	// ErrInvalidCredentials is returned when credentials are invalid
	ErrInvalidCredentials = errors.New("invalid credentials")

	// ErrExchangeNotFound is returned when exchange doesn't exist
	ErrExchangeNotFound = errors.New("exchange not found")

	// ErrQueueNotFound is returned when queue doesn't exist
	ErrQueueNotFound = errors.New("queue not found")

	// ErrQueueEmpty is returned when queue is empty
	ErrQueueEmpty = errors.New("queue empty")

	// ErrQueueExists is returned when queue already exists with different properties
	ErrQueueExists = errors.New("queue already exists")

	// ErrExchangeExists is returned when exchange already exists with different properties
	ErrExchangeExists = errors.New("exchange already exists")

	// ErrResourceLocked is returned when resource is locked
	ErrResourceLocked = errors.New("resource locked")

	// ErrPreconditionFailed is returned when precondition check fails
	ErrPreconditionFailed = errors.New("precondition failed")

	// ErrInvalidArgument is returned when argument is invalid
	ErrInvalidArgument = errors.New("invalid argument")

	// ErrInvalidFrameFormat is returned when frame format is invalid
	ErrInvalidFrameFormat = errors.New("invalid frame format")

	// ErrInvalidFrameSize is returned when frame size is invalid
	ErrInvalidFrameSize = errors.New("invalid frame size")

	// ErrFrameError is returned for frame-related errors
	ErrFrameError = errors.New("frame error")

	// ErrSyntaxError is returned for syntax errors in AMQP protocol
	ErrSyntaxError = errors.New("syntax error")

	// ErrCommandInvalid is returned when command is invalid
	ErrCommandInvalid = errors.New("command invalid")

	// ErrChannelError is returned for channel-related errors
	ErrChannelError = errors.New("channel error")

	// ErrUnexpectedFrame is returned when unexpected frame is received
	ErrUnexpectedFrame = errors.New("unexpected frame")

	// ErrResourceError is returned for resource-related errors
	ErrResourceError = errors.New("resource error")

	// ErrNotAllowed is returned when operation is not allowed
	ErrNotAllowed = errors.New("not allowed")

	// ErrNotImplemented is returned when feature is not implemented
	ErrNotImplemented = errors.New("not implemented")

	// ErrInternalError is returned for internal errors
	ErrInternalError = errors.New("internal error")

	// ErrTimeout is returned when operation times out
	ErrTimeout = errors.New("timeout")

	// ErrNetworkError is returned for network-related errors
	ErrNetworkError = errors.New("network error")

	// ErrTLSError is returned for TLS/SSL errors
	ErrTLSError = errors.New("TLS error")

	// ErrCertificateError is returned for certificate-related errors
	ErrCertificateError = errors.New("certificate error")

	// ErrHandshakeFailed is returned when handshake fails
	ErrHandshakeFailed = errors.New("handshake failed")

	// ErrProtocolError is returned for protocol-related errors
	ErrProtocolError = errors.New("protocol error")

	// ErrVersionMismatch is returned when version mismatch occurs
	ErrVersionMismatch = errors.New("version mismatch")

	// ErrServerError is returned for server-side errors
	ErrServerError = errors.New("server error")

	// ErrClientError is returned for client-side errors
	ErrClientError = errors.New("client error")

	// ErrMessageTooLarge is returned when message exceeds size limits
	ErrMessageTooLarge = errors.New("message too large")

	// ErrInvalidMessage is returned when message format is invalid
	ErrInvalidMessage = errors.New("invalid message")

	// ErrMessageNacked is returned when message is negatively acknowledged
	ErrMessageNacked = errors.New("message nacked")

	// ErrMessageReturned is returned when message is returned by broker
	ErrMessageReturned = errors.New("message returned")

	// ErrPublishFailed is returned when publish operation fails
	ErrPublishFailed = errors.New("publish failed")

	// ErrConsumeFailed is returned when consume operation fails
	ErrConsumeFailed = errors.New("consume failed")

	// ErrAckFailed is returned when acknowledge operation fails
	ErrAckFailed = errors.New("acknowledge failed")

	// ErrNackFailed is returned when negative acknowledge operation fails
	ErrNackFailed = errors.New("negative acknowledge failed")

	// ErrRejectFailed is returned when reject operation fails
	ErrRejectFailed = errors.New("reject failed")

	// ErrQoSFailed is returned when QoS operation fails
	ErrQoSFailed = errors.New("QoS failed")

	// ErrBindFailed is returned when bind operation fails
	ErrBindFailed = errors.New("bind failed")

	// ErrUnbindFailed is returned when unbind operation fails
	ErrUnbindFailed = errors.New("unbind failed")

	// ErrDeclareFailed is returned when declare operation fails
	ErrDeclareFailed = errors.New("declare failed")

	// ErrDeleteFailed is returned when delete operation fails
	ErrDeleteFailed = errors.New("delete failed")

	// ErrPurgeFailed is returned when purge operation fails
	ErrPurgeFailed = errors.New("purge failed")

	// ErrTransactionFailed is returned when transaction fails
	ErrTransactionFailed = errors.New("transaction failed")

	// ErrCancelled is returned when operation is cancelled
	ErrCancelled = errors.New("operation cancelled")

	// ErrShutdown is returned when system is shutting down
	ErrShutdown = errors.New("shutdown")

	// ErrConfigurationError is returned for configuration-related errors
	ErrConfigurationError = errors.New("configuration error")

	// ErrVirtualHostNotFound is returned when virtual host doesn't exist
	ErrVirtualHostNotFound = errors.New("virtual host not found")

	// ErrUserNotFound is returned when user doesn't exist
	ErrUserNotFound = errors.New("user not found")

	// ErrClusterError is returned for cluster-related errors
	ErrClusterError = errors.New("cluster error")

	// ErrNodeDown is returned when node is down
	ErrNodeDown = errors.New("node down")

	// ErrMemoryAlarm is returned when memory alarm is triggered
	ErrMemoryAlarm = errors.New("memory alarm")

	// ErrDiskAlarm is returned when disk alarm is triggered
	ErrDiskAlarm = errors.New("disk alarm")

	// ErrResourceAlarm is returned when resource alarm is triggered
	ErrResourceAlarm = errors.New("resource alarm")

	// ErrFlowControl is returned when flow control is active
	ErrFlowControl = errors.New("flow control")

	// ErrQuotaExceeded is returned when quota is exceeded
	ErrQuotaExceeded = errors.New("quota exceeded")

	// ErrRateLimit is returned when rate limit is exceeded
	ErrRateLimit = errors.New("rate limit exceeded")

	// ErrBackpressure is returned when backpressure is applied
	ErrBackpressure = errors.New("backpressure")

	// ErrUnknownError is returned for unknown/unhandled errors
	ErrUnknownError = errors.New("unknown error")
)

Common RabbitMQ error types that can be used by consumers of this package. These provide a standardized set of errors that abstract away the underlying AMQP-specific error details.
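Sentinel errors like these are normally matched with errors.Is, which also sees through wrapping done with %w. The sketch below uses local stand-ins for two of the variables. Whether the package wraps its returned errors so that errors.Is matches is an assumption, and shouldRetry is a hypothetical policy, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for two of the package's sentinel errors.
var (
	ErrConnectionLost = errors.New("connection lost")
	ErrInvalidMessage = errors.New("invalid message")
)

// shouldRetry (hypothetical policy) reports whether an error looks transient.
func shouldRetry(err error) bool {
	switch {
	case errors.Is(err, ErrConnectionLost):
		return true // the connection may come back
	case errors.Is(err, ErrInvalidMessage):
		return false // retrying the same payload cannot succeed
	default:
		return false
	}
}

// demo classifies a wrapped transient error and a permanent one.
func demo() (bool, bool) {
	wrapped := fmt.Errorf("publish user.created: %w", ErrConnectionLost)
	return shouldRetry(wrapped), shouldRetry(ErrInvalidMessage)
}

func main() {
	transient, permanent := demo()
	fmt.Println(transient, permanent)
}
```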

var FXModule = fx.Module("rabbit",
	fx.Provide(
		NewClientWithDI,

		fx.Annotate(
			func(r *RabbitClient) Client { return r },
			fx.As(new(Client)),
		),
	),
	fx.Invoke(RegisterRabbitLifecycle),
)

FXModule is an fx.Module that provides and configures the RabbitMQ client. This module registers the RabbitMQ client with the Fx dependency injection framework, making it available to other components in the application.

The module provides:

  1. *RabbitClient (concrete type) for direct use
  2. Client interface for dependency injection
  3. Lifecycle management for graceful startup and shutdown

Usage:

app := fx.New(
    rabbit.FXModule,
    // other modules...
)

Functions

func RegisterRabbitLifecycle

func RegisterRabbitLifecycle(params RabbitLifecycleParams)

RegisterRabbitLifecycle registers the RabbitMQ client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the RabbitMQ client, including starting the connection monitoring goroutine.

Parameters (supplied through the params struct):

  • lc: The fx lifecycle controller
  • client: The RabbitMQ client instance
  • logger: Logger for recording lifecycle events
  • cfg: Configuration for the RabbitMQ client

The function:

  1. On application start: Launches a background goroutine that monitors and maintains the RabbitMQ connection, automatically reconnecting if it fails.
  2. On application stop: Triggers a graceful shutdown of the RabbitMQ client, closing channels and connections cleanly.

This ensures that the RabbitMQ client remains available throughout the application's lifetime and is properly cleaned up during shutdown.

Types

type Channel

type Channel struct {
	// ExchangeName is the name of the exchange to publish to or consume from
	ExchangeName string

	// ExchangeType defines the routing behavior of the exchange
	// Common values: "direct", "fanout", "topic", "headers"
	ExchangeType string

	// RoutingKey is used for routing messages from exchanges to queues
	// The meaning depends on the exchange type:
	// - For direct exchanges: exact matching key
	// - For topic exchanges: routing pattern with wildcards
	// - For fanout exchanges: ignored
	RoutingKey string

	// QueueName is the name of the queue to declare or consume from
	QueueName string

	// DelayToReconnect is the time in milliseconds to wait between reconnection attempts
	DelayToReconnect int

	// PrefetchCount limits the number of unacknowledged messages that can be sent to a consumer
	// A value of 0 means no limit (not recommended for production)
	PrefetchCount int

	// IsConsumer determines whether this client will declare exchanges and queues
	// Set to true for consumers, false for publishers that use existing exchanges
	IsConsumer bool

	// ContentType specifies the MIME type of published messages
	// Common values: "application/json", "text/plain", "application/octet-stream"
	ContentType string
}

Channel contains configuration for AMQP channels, exchanges, queues, and bindings. These settings determine how messages are routed and processed.
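For topic exchanges, the broker matches routing keys against binding patterns word by word: words are separated by dots, "*" stands for exactly one word, and "#" stands for zero or more words. The matcher below is a small sketch of those rules for reasoning about RoutingKey values. The matching itself happens inside RabbitMQ, not in this package, and topicMatch and matchWords are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether routingKey matches a topic binding pattern.
func topicMatch(pattern, routingKey string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(routingKey, "."))
}

// matchWords compares pattern words against key words recursively.
func matchWords(pat, key []string) bool {
	if len(pat) == 0 {
		return len(key) == 0
	}
	switch pat[0] {
	case "#":
		// "#" may absorb zero or more words: try every split point.
		for i := 0; i <= len(key); i++ {
			if matchWords(pat[1:], key[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" must consume exactly one word.
		return len(key) > 0 && matchWords(pat[1:], key[1:])
	default:
		// A literal word must match exactly.
		return len(key) > 0 && pat[0] == key[0] && matchWords(pat[1:], key[1:])
	}
}

func main() {
	fmt.Println(topicMatch("user.*", "user.created"))
	fmt.Println(topicMatch("user.*", "user.created.v2"))
	fmt.Println(topicMatch("user.#", "user.created.v2"))
}
```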

type Client added in v0.12.0

type Client interface {

	// Publish sends a message to RabbitMQ with optional headers.
	// The message is sent using the configured exchange and routing key.
	Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error

	// Consume starts consuming messages from the main queue.
	// Returns a channel that delivers consumed messages.
	Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message

	// ConsumeDLQ starts consuming messages from the dead letter queue (DLQ).
	// This allows processing of messages that failed in the main queue.
	ConsumeDLQ(ctx context.Context, wg *sync.WaitGroup) <-chan Message

	// RetryConnection monitors the connection and automatically reconnects on failure.
	// This method should be run in a goroutine.
	RetryConnection(cfg Config)

	// GracefulShutdown closes all RabbitMQ connections and channels cleanly.
	GracefulShutdown()

	// GetChannel returns the underlying AMQP channel for direct operations when needed.
	GetChannel() *amqp.Channel
}

Client provides a high-level interface for interacting with RabbitMQ. It abstracts connection management, channel operations, and message publishing/consuming.

This interface is implemented by the concrete *RabbitClient type.

type Config

type Config struct {
	// Connection contains the settings needed to establish a connection to the RabbitMQ server
	Connection Connection

	// Channel contains configuration for exchanges, queues, and message routing
	Channel Channel

	// DeadLetter contains configuration for the dead-letter exchange and queue
	// used for handling failed messages
	DeadLetter DeadLetter
}

Config defines the top-level configuration structure for the RabbitMQ client. It contains all the necessary configuration sections for establishing connections, setting up channels, and configuring dead-letter behavior.

type Connection

type Connection struct {
	// Host is the RabbitMQ server hostname or IP address
	Host string

	// Port is the RabbitMQ server port (typically 5672 for non-SSL, 5671 for SSL)
	Port uint

	// User is the RabbitMQ username for authentication
	User string

	// Password is the RabbitMQ password for authentication
	Password string

	// IsSSLEnabled determines whether to use SSL/TLS for the connection
	// When true, connections will use the AMQPs protocol
	IsSSLEnabled bool

	// UseCert determines whether to use client certificate authentication
	// When true, client certificates will be sent for mutual TLS authentication
	UseCert bool

	// CACertPath is the file path to the CA certificate for verifying the server
	// Used when IsSSLEnabled is true
	CACertPath string

	// ClientCertPath is the file path to the client certificate
	// Used when both IsSSLEnabled and UseCert are true
	ClientCertPath string

	// ClientKeyPath is the file path to the client certificate's private key
	// Used when both IsSSLEnabled and UseCert are true
	ClientKeyPath string

	// ServerName is the server name to use for TLS verification
	// This should match a CN or SAN in the server's certificate
	ServerName string
}

Connection contains the configuration parameters needed to establish a connection to a RabbitMQ server, including authentication and TLS settings.
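To see how these fields relate to an AMQP URI, here is a rough sketch that renders a trimmed copy of the struct as a URI using the standard library. How the package itself assembles its connection string is not shown in this documentation, so amqpURI is a hypothetical helper, and the virtual host is deliberately left out.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// Connection is a trimmed local copy containing only the fields used here.
type Connection struct {
	Host         string
	Port         uint
	User         string
	Password     string
	IsSSLEnabled bool
}

// amqpURI (hypothetical) renders the settings as an AMQP URI.
// The scheme switches to "amqps" when TLS is enabled.
func amqpURI(c Connection) string {
	scheme := "amqp"
	if c.IsSSLEnabled {
		scheme = "amqps"
	}
	u := url.URL{
		Scheme: scheme,
		// UserPassword percent-encodes reserved characters in the credentials.
		User: url.UserPassword(c.User, c.Password),
		Host: net.JoinHostPort(c.Host, strconv.FormatUint(uint64(c.Port), 10)),
	}
	return u.String()
}

func main() {
	fmt.Println(amqpURI(Connection{
		Host: "localhost", Port: 5672, User: "guest", Password: "guest",
	}))
	fmt.Println(amqpURI(Connection{
		Host: "mq.internal", Port: 5671, User: "svc", Password: "secret", IsSSLEnabled: true,
	}))
}
```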

type ConsumerMessage

type ConsumerMessage struct {
	// contains filtered or unexported fields
}

ConsumerMessage implements the Message interface and wraps an AMQP delivery. This struct provides access to the message content and acknowledgment methods.

func (*ConsumerMessage) AckMsg

func (rb *ConsumerMessage) AckMsg() error

AckMsg acknowledges the message, informing RabbitMQ that the message has been successfully processed and can be removed from the queue.

Returns an error if the acknowledgment fails.

func (*ConsumerMessage) Body

func (rb *ConsumerMessage) Body() []byte

Body returns the message payload as a byte slice.

func (*ConsumerMessage) Header

func (rb *ConsumerMessage) Header() map[string]interface{}

Header returns the headers associated with the message. Message headers provide metadata about the message and can contain application-specific information set by the message publisher.

Headers are a map of key-value pairs where the keys are strings and values can be of various types. Common uses for headers include:

  • Message type identification
  • Content format specification
  • Routing information
  • Tracing context propagation
  • Custom application metadata

Returns:

  • map[string]interface{}: A map containing the message headers

Example:

msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
    // Access message headers
    headers := msg.Header()

    // Check for specific headers
    if contentType, ok := headers["content-type"].(string); ok {
        fmt.Printf("Content type: %s\n", contentType)
    }

    // Access trace context from headers for distributed tracing
    if traceID, ok := headers["trace-id"].(string); ok {
        ctx = tracer.SetTraceID(ctx, traceID)
    }

    // Process the message...
}

func (*ConsumerMessage) NackMsg

func (rb *ConsumerMessage) NackMsg(requeue bool) error

NackMsg rejects the message. If requeue is true, the message will be returned to the queue for redelivery; otherwise, it will be discarded or sent to a dead-letter exchange if configured.

Parameters:

  • requeue: Whether to requeue the message for another delivery attempt

Returns an error if the rejection fails.

type DeadLetter

type DeadLetter struct {
	// ExchangeName is the name of the dead-letter exchange
	ExchangeName string

	// QueueName is the name of the queue bound to the dead-letter exchange
	QueueName string

	// RoutingKey is the routing key used when dead-lettering messages
	// This can be different from the original routing key
	RoutingKey string

	// Ttl is the time-to-live for messages in seconds
	// Messages that remain in the queue longer than this TTL will be dead-lettered
	// A value of 0 means no TTL (messages never expire)
	Ttl int
}

DeadLetter contains configuration for dead-letter handling. Dead-letter exchanges receive messages that are rejected, expire, or exceed queue limits. This provides a mechanism for handling failed message processing.
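In RabbitMQ, dead-lettering is configured through optional queue arguments: x-dead-letter-exchange, x-dead-letter-routing-key, and x-message-ttl (in milliseconds). The sketch below shows how the fields above could map onto those arguments, including the seconds-to-milliseconds conversion. It is an assumption about the mapping, not the package's actual code, and queueArgs is a hypothetical helper.

```go
package main

import "fmt"

// DeadLetter is a local copy of the struct documented above.
type DeadLetter struct {
	ExchangeName string
	QueueName    string
	RoutingKey   string
	Ttl          int
}

// queueArgs (hypothetical) builds the optional arguments for declaring
// the main queue so that expired or rejected messages are dead-lettered.
func queueArgs(dl DeadLetter) map[string]interface{} {
	args := map[string]interface{}{}
	if dl.ExchangeName != "" {
		args["x-dead-letter-exchange"] = dl.ExchangeName
	}
	if dl.RoutingKey != "" {
		args["x-dead-letter-routing-key"] = dl.RoutingKey
	}
	if dl.Ttl > 0 {
		// RabbitMQ expects x-message-ttl in milliseconds; Ttl is documented in seconds.
		args["x-message-ttl"] = int64(dl.Ttl) * 1000
	}
	return args
}

func main() {
	args := queueArgs(DeadLetter{
		ExchangeName: "events.dlx",
		QueueName:    "user-events.dlq",
		RoutingKey:   "user.created.dead",
		Ttl:          30,
	})
	fmt.Println(args["x-dead-letter-exchange"], args["x-dead-letter-routing-key"], args["x-message-ttl"])
}
```

A Ttl of 0 leaves x-message-ttl unset, which matches the documented meaning that messages never expire.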

type ErrorCategory

type ErrorCategory int

ErrorCategory represents different categories of RabbitMQ errors

const (
	CategoryUnknown ErrorCategory = iota
	CategoryConnection
	CategoryChannel
	CategoryAuthentication
	CategoryPermission
	CategoryResource
	CategoryMessage
	CategoryProtocol
	CategoryNetwork
	CategoryServer
	CategoryConfiguration
	CategoryCluster
	CategoryOperation
	CategoryAlarm
	CategoryTimeout
)

type Logger added in v0.13.0

type Logger interface {
	// InfoWithContext logs an informational message with trace context.
	InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

	// WarnWithContext logs a warning message with trace context.
	WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})

	// ErrorWithContext logs an error message with trace context.
	ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}

Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.

type Message

type Message interface {
	// AckMsg acknowledges the message, removing it from the queue.
	AckMsg() error

	// NackMsg negatively acknowledges the message.
	// If requeue is true, the message is requeued; otherwise it goes to DLQ.
	NackMsg(requeue bool) error

	// Body returns the message payload as a byte slice.
	Body() []byte

	// Header returns the message headers.
	Header() map[string]interface{}
}

Message represents a consumed message from RabbitMQ. It provides methods for acknowledging, rejecting, and accessing message data.

type RabbitClient added in v0.12.0

type RabbitClient struct {

	// Channel is the main AMQP channel used for publishing and consuming messages.
	// It's exposed publicly to allow direct operations when needed.
	Channel *amqp.Channel
	// contains filtered or unexported fields
}

RabbitClient represents a client for interacting with RabbitMQ. It manages connections, channels, and provides methods for publishing and consuming messages with automatic reconnection capabilities.

func NewClient

func NewClient(config Config) (*RabbitClient, error)

NewClient creates and initializes a new RabbitMQ client with the provided configuration. This function establishes the initial connection to RabbitMQ, sets up channels, and configures exchanges and queues as specified in the configuration.

Parameters:

  • config: Configuration for connecting to RabbitMQ and setting up channels

Returns a new RabbitClient instance that is ready to use. If connection fails after all retries or channel setup fails, it will return an error.

Example:

client, err := rabbit.NewClient(config)
if err != nil {
	log.Fatal(err)
}
defer client.GracefulShutdown()

func NewClientWithDI

func NewClientWithDI(params RabbitParams) (*RabbitClient, error)

NewClientWithDI creates a new RabbitMQ client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the RabbitParams struct.

Parameters:

  • params: A RabbitParams struct that contains the Config instance and, optionally, the Logger and Observer instances used to initialize the RabbitMQ client. This struct embeds fx.In to enable automatic injection of these dependencies.

Returns:

  • *RabbitClient: A fully initialized RabbitMQ client ready for use.
  • error: Non-nil if the client could not be created.

Example usage with fx:

app := fx.New(
    rabbit.FXModule,
    logger.FXModule,  // Optional: provides logger
    fx.Provide(
        func() rabbit.Config {
            return loadRabbitConfig() // Your config loading function
        },
        func(metrics *prometheus.Metrics) observability.Observer {
            return &MyObserver{metrics: metrics}  // Optional observer
        },
    ),
)

Under the hood, this function creates the client and injects the optional logger and observer before returning.

func (*RabbitClient) Consume added in v0.12.0

func (rb *RabbitClient) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message

Consume starts consuming messages from the queue specified in the configuration. This method provides a channel where consumed messages will be delivered.

Parameters:

  • ctx: Context for cancellation control
  • wg: WaitGroup for coordinating shutdown

Returns a channel that delivers Message interfaces for each consumed message.

Example:

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

msgChan := rabbitClient.Consume(ctx, wg)
for msg := range msgChan {
    // Process the message
    fmt.Println("Received:", string(msg.Body()))

    // Acknowledge successful processing
    if err := msg.AckMsg(); err != nil {
        log.Printf("Failed to ack message: %v", err)
    }
}

func (*RabbitClient) ConsumeDLQ added in v0.12.0

func (rb *RabbitClient) ConsumeDLQ(ctx context.Context, wg *sync.WaitGroup) <-chan Message

ConsumeDLQ starts consuming messages from the dead-letter queue. This method is useful for processing failed messages sent to the dead-letter queue.

Parameters:

  • ctx: Context for cancellation control
  • wg: WaitGroup for coordinating shutdown

Returns a channel that delivers Message interfaces for each consumed message from the dead-letter queue.

Example:

wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dlqChan := rabbitClient.ConsumeDLQ(ctx, wg)
for msg := range dlqChan {
    // Process the failed message
    fmt.Println("Failed message:", string(msg.Body()))

    // Acknowledge after processing
    if err := msg.AckMsg(); err != nil {
        log.Printf("Failed to ack DLQ message: %v", err)
    }
}

func (*RabbitClient) GetChannel added in v0.12.0

func (rb *RabbitClient) GetChannel() *amqp.Channel

GetChannel returns the underlying AMQP channel for direct operations when needed. This allows advanced users to access RabbitMQ-specific functionality.

func (*RabbitClient) GetErrorCategory added in v0.12.0

func (r *RabbitClient) GetErrorCategory(err error) ErrorCategory

GetErrorCategory returns the category of the given error

func (*RabbitClient) GracefulShutdown added in v0.12.0

func (rb *RabbitClient) GracefulShutdown()

GracefulShutdown closes the RabbitMQ client's connections and channels cleanly. This method ensures that all resources are properly released when the application is shutting down.

The shutdown process:

  1. Signals all goroutines to stop by closing the shutdownSignal channel
  2. Acquires a lock to prevent concurrent access during shutdown
  3. Closes the AMQP channel if it exists
  4. Closes the AMQP connection if it exists and is not already closed

Any errors during shutdown are logged but not propagated, as they typically cannot be handled at this stage of application shutdown.

func (*RabbitClient) IsAlarmError added in v0.12.0

func (r *RabbitClient) IsAlarmError(err error) bool

IsAlarmError returns true if the error is alarm-related

func (*RabbitClient) IsAuthenticationError added in v0.12.0

func (r *RabbitClient) IsAuthenticationError(err error) bool

IsAuthenticationError returns true if the error is authentication-related

func (*RabbitClient) IsChannelError added in v0.12.0

func (r *RabbitClient) IsChannelError(err error) bool

IsChannelError returns true if the error is channel-related

func (*RabbitClient) IsConnectionError added in v0.12.0

func (r *RabbitClient) IsConnectionError(err error) bool

IsConnectionError returns true if the error is connection-related

func (*RabbitClient) IsPermanentError added in v0.12.0

func (r *RabbitClient) IsPermanentError(err error) bool

IsPermanentError returns true if the error is permanent and should not be retried

func (*RabbitClient) IsResourceError added in v0.12.0

func (r *RabbitClient) IsResourceError(err error) bool

IsResourceError returns true if the error is resource-related

func (*RabbitClient) IsRetryableError added in v0.12.0

func (r *RabbitClient) IsRetryableError(err error) bool

IsRetryableError returns true if the error is retryable

func (*RabbitClient) IsTemporaryError added in v0.12.0

func (r *RabbitClient) IsTemporaryError(err error) bool

IsTemporaryError returns true if the error is temporary

func (*RabbitClient) Publish added in v0.12.0

func (rb *RabbitClient) Publish(ctx context.Context, msg []byte, headers ...map[string]interface{}) error

Publish sends a message to the RabbitMQ exchange specified in the configuration. This method is thread-safe and respects context cancellation.

Parameters:

  • ctx: Context for cancellation control
  • msg: Message payload as a byte slice
  • headers: Optional message headers as a map of key-value pairs; can be used for metadata and distributed tracing propagation

The headers parameter is particularly useful for distributed tracing: it lets trace context propagate across service boundaries through message queues. When used with the tracer package, you can extract trace headers and include them in the message:

traceHeaders := tracerClient.GetCarrier(ctx)
err := rabbitClient.Publish(ctx, message, traceHeaders)

Returns an error if publishing fails or if the context is canceled.

Example:

ctx := context.Background()
message := []byte("Hello, RabbitMQ!")

// Basic publishing without headers
err := rabbitClient.Publish(ctx, message) // headers is variadic and can be omitted
if err != nil {
    log.Printf("Failed to publish message: %v", err)
} else {
    log.Println("Message published successfully")
}

Example with distributed tracing:

// Create a span for the publish operation
ctx, span := tracer.StartSpan(ctx, "publish-message")
defer span.End()

// Add relevant attributes to the span
span.SetAttributes(map[string]interface{}{
    "message.size": len(message),
    "routing.key": rabbitClient.Config().Channel.RoutingKey,
})

// Extract trace context to include in the message headers
traceHeaders := tracerClient.GetCarrier(ctx)

// Publish the message with trace headers
err := rabbitClient.Publish(ctx, message, traceHeaders)
if err != nil {
    span.RecordError(err)
    log.Printf("Failed to publish message: %v", err)
    return err
}

log.Println("Message published successfully with trace context")

func (*RabbitClient) RetryConnection added in v0.12.0

func (rb *RabbitClient) RetryConnection(cfg Config)

RetryConnection continuously monitors the RabbitMQ connection and automatically re-establishes it if it fails. This method is typically run in a goroutine.

Parameters:

  • cfg: Configuration for establishing new connections

The method will:

  • Monitor the connection for closure events
  • Attempt to reconnect when the connection is lost
  • Re-establish channels and their configurations
  • Continue monitoring until the shutdownSignal is received

This provides resilience against network issues and RabbitMQ server restarts.

func (*RabbitClient) TranslateError added in v0.12.0

func (r *RabbitClient) TranslateError(err error) error

TranslateError converts AMQP/RabbitMQ-specific errors into standardized application errors. This function provides abstraction from the underlying AMQP implementation details, allowing application code to handle errors in a RabbitMQ-agnostic way.

It maps common RabbitMQ errors to the standardized error types defined above. If an error doesn't match any known type, it's returned unchanged.

func (*RabbitClient) WithLogger added in v0.13.0

func (rb *RabbitClient) WithLogger(logger Logger) *RabbitClient

WithLogger attaches a logger to the RabbitMQ client for internal logging. This method uses the builder pattern and returns the client for method chaining.

The logger will be used for lifecycle events, background worker logs, and cleanup errors. This is particularly useful for debugging and monitoring reconnection behavior.

This is useful for non-FX usage where you want to enable logging after creating the client. When using FX, the logger is automatically injected via NewClientWithDI.

Example:

client, err := rabbit.NewClient(config)
if err != nil {
    return err
}
client = client.WithLogger(myLogger)
defer client.GracefulShutdown()

func (*RabbitClient) WithObserver added in v0.13.0

func (rb *RabbitClient) WithObserver(observer observability.Observer) *RabbitClient

WithObserver attaches an observer to the RabbitMQ client for observability hooks. This method uses the builder pattern and returns the client for method chaining.

The observer will be notified of all publish and consume operations, allowing external systems to track metrics, traces, or other observability data.

This is useful for non-FX usage where you want to attach an observer after creating the client. When using FX, the observer is automatically injected via NewClientWithDI.

Example:

client, err := rabbit.NewClient(config)
if err != nil {
    return err
}
client = client.WithObserver(myObserver)
defer client.GracefulShutdown()

type RabbitLifecycleParams

type RabbitLifecycleParams struct {
	fx.In

	Lifecycle fx.Lifecycle
	Client    *RabbitClient
	Config    Config
}

RabbitLifecycleParams groups the dependencies needed for RabbitMQ lifecycle management

type RabbitParams

type RabbitParams struct {
	fx.In

	Config   Config
	Logger   Logger                 `optional:"true"`
	Observer observability.Observer `optional:"true"`
}

RabbitParams groups the dependencies needed to create a Rabbit client
