grmq

v1.11.1
Published: Apr 29, 2026 License: MIT Imports: 10 Imported by: 1

README

GRMQ

Go Rabbit MQ

What are the typical use cases for a RabbitMQ broker?

  • We create a durable topology (exchanges, queues, bindings).
  • We start consuming from queues (commonly in several goroutines with a prefetch count) and use a DLQ to set aside poison messages.
  • If we can't handle a message right now (for instance, because some external service is unavailable), we retry a bit later.
  • If something happens to the connection, we expect to re-establish it and continue working transparently.
  • We shut down gracefully to reduce the probability of message duplication.

All of these common use cases are implemented in the package.

A high-level wrapper for amqp091-go, inspired by the http package and cony.

Features

  • re-connection support
  • graceful shutdown support
  • flexible context.Context-based API
  • middlewares for publishers and consumers
  • DLQ declaration out of the box
  • flexible retries

Complete Example

type LogObserver struct {
	grmq.NoopObserver
}

func (o LogObserver) ClientError(err error) {
	log.Printf("rmq client error: %v", err)
}

func (o LogObserver) ConsumerError(consumer consumer.Consumer, err error) {
	log.Printf("unexpected consumer error (queue=%s): %v", consumer.Queue, err)
}

func main() {
	url := amqpUrl()

	pub := publisher.New(
		"exchange",
		"test",
		publisher.WithMiddlewares(publisher.PersistentMode()),
	)

	simpleHandler := consumer.HandlerFunc(func(ctx context.Context, delivery *consumer.Delivery) {
		log.Printf("message body: %s, routing key: %s", delivery.Source().Body, delivery.Source().RoutingKey)
		err := delivery.Ack()
		if err != nil {
			panic(err)
		}
	})
	simpleConsumer := consumer.New(
		simpleHandler,
		"queue",
		consumer.WithConcurrency(32),   //default 1
		consumer.WithPrefetchCount(32), //default 1
	)

	retryPolicy := retry.NewPolicy(
		true, //move to dlq after last failed try
		retry.WithDelay(500*time.Millisecond, 1),
		retry.WithDelay(1*time.Second, 1),
		retry.WithDelay(2*time.Second, 1),
	)
	retryHandler := consumer.HandlerFunc(func(ctx context.Context, delivery *consumer.Delivery) {
		log.Printf("message body: %s, routing key: %s", delivery.Source().Body, delivery.Source().RoutingKey)
		err := delivery.Retry()
		if err != nil {
			panic(err)
		}
	})
	retryConsumer := consumer.New(
		retryHandler,
		"retryQueue",
		consumer.WithRetryPolicy(retryPolicy),
	)

	cli := grmq.New(
		url,
		grmq.WithPublishers(pub),
		grmq.WithConsumers(simpleConsumer, retryConsumer),
		grmq.WithTopologyBuilding(
			topology.WithQueue("queue", topology.WithDLQ(true)),
			// you MUST declare the queue with the same retry policy that its consumer uses
			topology.WithQueue("retryQueue", topology.WithRetryPolicy(retryPolicy)),
			topology.WithDirectExchange("exchange"),
			topology.WithBinding("exchange", "queue", "test"),
		),
		grmq.WithReconnectTimeout(3*time.Second), //default 1s
		grmq.WithObserver(LogObserver{}),
	)
	// Run connects, declares the topology, and initializes publishers and consumers.
	// It returns the first error that occurs, or nil.
	// Alternatively, cli.Serve(context.Background()) does the same work without blocking.
	err := cli.Run(context.Background())
	if err != nil {
		panic(err)
	}

	err = pub.Publish(context.Background(), &amqp091.Publishing{Body: []byte("hello world")})
	if err != nil {
		panic(err)
	}

	// any publisher can send a message to any exchange; "" is the default exchange
	err = pub.PublishTo(context.Background(), "", "retryQueue", &amqp091.Publishing{Body: []byte("retry me")})
	if err != nil {
		panic(err)
	}

	time.Sleep(10 * time.Second)

	cli.Shutdown()
}

Retries

This is a fairly new feature, implemented in 1.4.0. Before using it, you should understand how it works under the hood: it combines two mechanisms, DLQ and TTL.

Let's say we use the policy below for the queue test:

retryPolicy := retry.NewPolicy(
	true,
	retry.WithDelay(500*time.Millisecond, 1),
	retry.WithDelay(1*time.Second, 1),
	retry.WithDelay(2*time.Second, 1), 
)

This configuration will create

  • exchange with name default-dead-letter
  • 4 extra queues
    • test.DLQ
    • test.retry.500
    • test.retry.1000
    • test.retry.2000
  • each retry queue has an x-message-ttl property equal to its delay
  • each retry queue has a DLX that routes expired messages back to the original queue test
  • consumer.Delivery.Retry() picks a suitable retry queue based on the grmq-retry-count header (0 by default), increments that header, publishes the message directly to the retry queue with confirmation, and then manually acknowledges the delivery
  • if there is no suitable retry option and moveToDql is true, it moves the message to test.DLQ
  • otherwise, it acknowledges the delivery
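
The routing decision in the list above can be illustrated with a small standalone sketch. It does not use grmq itself: the step type, examplePolicy, and nextDestination are hypothetical names introduced here, and the assumption that steps are consumed in order (each for its configured number of attempts) is an interpretation of "suitable queue", not something the package documentation spells out. The queue names follow the test.retry.<milliseconds> and test.DLQ pattern shown above.

```go
package main

import (
	"fmt"
	"time"
)

// step is a local stand-in for one retry.WithDelay(delay, maxAttempts) entry.
type step struct {
	delay       time.Duration
	maxAttempts int
}

// examplePolicy mirrors the three-step policy from the text.
func examplePolicy() []step {
	return []step{
		{500 * time.Millisecond, 1},
		{1 * time.Second, 1},
		{2 * time.Second, 1},
	}
}

// nextDestination models where a message goes after it has already been
// retried retryCount times (the value of the grmq-retry-count header).
// It returns the target queue name, or "" when the message is simply acked.
// Assumption: steps are used in order, each for maxAttempts tries.
func nextDestination(queue string, steps []step, moveToDLQ bool, retryCount int) string {
	remaining := retryCount
	for _, s := range steps {
		if remaining < s.maxAttempts {
			return fmt.Sprintf("%s.retry.%d", queue, s.delay.Milliseconds())
		}
		remaining -= s.maxAttempts
	}
	if moveToDLQ {
		return queue + ".DLQ"
	}
	return ""
}

func main() {
	for count := 0; count <= 3; count++ {
		dest := nextDestination("test", examplePolicy(), true, count)
		fmt.Printf("retry-count=%d -> %s\n", count, dest)
	}
}
```

Under these assumptions a message visits test.retry.500, test.retry.1000, and test.retry.2000 once each and then lands in test.DLQ.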

Recommendation: before changing the retry policy for a queue, ensure there are no messages left in its retry queues.

Don't forget to delete the old retry queues afterwards.

State and road map

  • the package is used in production (reconnection works reliably)

Documentation

Overview

Package grmq provides a high-level client for interacting with RabbitMQ, supporting publishers, consumers, automatic reconnection, and topology management.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages connections to a RabbitMQ broker, handling publishers, consumers, and topology declarations with automatic reconnection support.

func New

func New(url string, options ...ClientOption) *Client

New creates a new Client instance with the specified RabbitMQ URL and optional configuration. Default settings include a 10-second heartbeat and 30-second dial timeout.

func (*Client) Run

func (s *Client) Run(ctx context.Context) error

Run blocks until a successful RabbitMQ session is established. It ensures all topology declarations are applied, all publishers are initialized, and all consumers are running. Returns the first error encountered during session initialization, or nil if successful.

func (*Client) Serve

func (s *Client) Serve(ctx context.Context)

Serve starts the client without blocking for the first successful session. Unlike Run, it immediately returns and relies on the Observer to handle errors and manage reconnection attempts.
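
The difference between Run and Serve can be pictured with a standalone sketch. miniClient, its fields, and runWith are hypothetical stand-ins, not grmq types, and the sketch makes a single connection attempt instead of the reconnection loop the real client maintains. It only shows the shape of the contract: Serve hands work to a goroutine and reports failures through a callback, while Run additionally waits for the first outcome.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errUnavailable = errors.New("broker unavailable")

// miniClient is a hypothetical stand-in for a client, not grmq.Client.
type miniClient struct {
	connect func(ctx context.Context) error // establishes a session
	onError func(err error)                 // plays the role of Observer.ClientError
	first   chan error                      // outcome of the first session attempt
}

func newMiniClient(connect func(context.Context) error, onError func(error)) *miniClient {
	return &miniClient{connect: connect, onError: onError, first: make(chan error, 1)}
}

// Serve returns immediately. The session attempt runs in the background and
// failures are visible only through the onError callback.
func (c *miniClient) Serve(ctx context.Context) {
	go func() {
		err := c.connect(ctx)
		if err != nil {
			c.onError(err)
		}
		c.first <- err // buffered, so this never blocks
	}()
}

// Run starts the same background work and then waits for the first outcome.
func (c *miniClient) Run(ctx context.Context) error {
	c.Serve(ctx)
	select {
	case err := <-c.first:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWith runs a client whose connect function always returns result.
func runWith(result error) error {
	c := newMiniClient(
		func(context.Context) error { return result },
		func(error) {}, // errors are ignored in this helper
	)
	return c.Run(context.Background())
}

func main() {
	fmt.Println("Run, healthy broker:", runWith(nil))
	fmt.Println("Run, broken broker:", runWith(errUnavailable))
}
```

In practice this suggests using Run when startup should fail fast on a broken configuration, and Serve when the application should start regardless and rely on the Observer for visibility.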

func (*Client) Shutdown

func (s *Client) Shutdown()

Shutdown performs a graceful shutdown of the client, closing all publishers, consumers, and the underlying connection.

func (*Client) UnsafeConnection added in v1.11.0

func (s *Client) UnsafeConnection() *amqp.Connection

UnsafeConnection returns the underlying *amqp.Connection for the active session. Returns nil if no session is currently connected.

Warning: This method provides direct access to the RabbitMQ connection and should be used with caution.

type ClientOption

type ClientOption func(c *Client)

ClientOption is a function that configures a Client instance.
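
ClientOption follows Go's functional options pattern. The sketch below reproduces that pattern with local, hypothetical types (settings, option, withReconnectTimeout, withConsumers) so that it runs without the package. It is not grmq's implementation; the 1-second default is taken from the comment in the README example.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a local stand-in for the fields a client might hold.
type settings struct {
	url              string
	reconnectTimeout time.Duration
	consumers        []string
}

// option mirrors the shape of ClientOption: a function that mutates the target.
type option func(s *settings)

func withReconnectTimeout(d time.Duration) option {
	return func(s *settings) { s.reconnectTimeout = d }
}

func withConsumers(names ...string) option {
	return func(s *settings) { s.consumers = names }
}

// newSettings sets defaults first, then applies options in order,
// so later options override earlier ones.
func newSettings(url string, opts ...option) *settings {
	s := &settings{url: url, reconnectTimeout: time.Second}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newSettings(
		"amqp://localhost",
		withReconnectTimeout(3*time.Second),
		withConsumers("queue", "retryQueue"),
	)
	fmt.Println(s.url, s.reconnectTimeout, s.consumers)
}
```

The pattern keeps the constructor signature stable: new settings become new option functions rather than new parameters.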

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ClientOption

WithConsumers sets the consumers for the client.

func WithDeclarations

func WithDeclarations(declarations topology.Declarations) ClientOption

WithDeclarations sets the topology declarations for the client.

func WithDialConfig

func WithDialConfig(config DialConfig) ClientOption

WithDialConfig sets the dial configuration for connecting to RabbitMQ.

func WithObserver

func WithObserver(observer Observer) ClientOption

WithObserver sets the observer for monitoring client events.

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ClientOption

WithPublishers sets the publishers for the client.

func WithReconnectTimeout

func WithReconnectTimeout(timeout time.Duration) ClientOption

WithReconnectTimeout sets the timeout between reconnection attempts.
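
To make the role of this setting concrete, here is a generic reconnect loop with a fixed pause between attempts. Whether grmq's internal loop looks like this is an assumption; dialUntilConnected, flakyDial, and attemptsNeeded are hypothetical helpers that only illustrate what "timeout between reconnection attempts" means.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialUntilConnected calls dial until it succeeds or ctx is cancelled,
// waiting for pause after every failed attempt.
// It returns the number of attempts that were made.
func dialUntilConnected(ctx context.Context, pause time.Duration, dial func() error) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts, nil
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(pause):
			// pause elapsed, try again
		}
	}
}

// flakyDial returns a dial function that fails the first `failures` calls.
func flakyDial(failures int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
}

// attemptsNeeded reports how many attempts a dialer with the given number
// of initial failures needs, using a short pause to keep the demo fast.
func attemptsNeeded(failures int) int {
	n, _ := dialUntilConnected(context.Background(), 5*time.Millisecond, flakyDial(failures))
	return n
}

func main() {
	fmt.Println("attempts:", attemptsNeeded(2))
}
```

A longer pause reduces pressure on a broker that is restarting, at the cost of a slower recovery once it is back.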

func WithTopologyBuilding

func WithTopologyBuilding(options ...topology.DeclarationsOption) ClientOption

WithTopologyBuilding creates and sets topology declarations using the provided options.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps a consumer instance with connection management and retry support.

func NewConsumer

func NewConsumer(cfg consumer.Consumer, ch *amqp.Channel, retryPub *Publisher, observer Observer) *Consumer

NewConsumer creates a new Consumer instance with the specified configuration and optional retry support.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer, waits for all in-flight deliveries to complete, and closes the channel.

func (*Consumer) Run

func (c *Consumer) Run() error

Run starts the consumer and begins processing messages. Configures QoS if prefetch count is set, and spawns worker goroutines based on the configured concurrency level.
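
The combination of "spawn worker goroutines" in Run and "wait for all in-flight deliveries" in Close is a classic worker pool with a sync.WaitGroup. The sketch below shows that pattern with a plain channel of strings in place of an AMQP delivery channel. pool, startPool, shutdown, and processAll are hypothetical names, and this is not the package's actual code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a local stand-in for a consumer with a fixed number of workers.
type pool struct {
	handled    int64 // updated atomically; kept first for 64-bit alignment
	deliveries chan string
	wg         sync.WaitGroup
}

// startPool spawns `concurrency` workers that all read from one channel.
func startPool(concurrency int, handle func(msg string)) *pool {
	p := &pool{deliveries: make(chan string)}
	for i := 0; i < concurrency; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for msg := range p.deliveries {
				handle(msg)
				atomic.AddInt64(&p.handled, 1)
			}
		}()
	}
	return p
}

// shutdown stops accepting deliveries and waits until every worker has
// finished the message it was processing.
func (p *pool) shutdown() int64 {
	close(p.deliveries)
	p.wg.Wait()
	return atomic.LoadInt64(&p.handled)
}

// processAll pushes n messages through a pool and returns how many were handled.
func processAll(concurrency, n int) int64 {
	p := startPool(concurrency, func(string) {})
	for i := 0; i < n; i++ {
		p.deliveries <- fmt.Sprintf("message %d", i)
	}
	return p.shutdown()
}

func main() {
	fmt.Println("handled:", processAll(4, 100))
}
```

Waiting for the workers before closing the channel to the broker is what lets every in-flight delivery be acknowledged, which in turn reduces redelivery after a restart.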

type Declarator

type Declarator struct {
	// contains filtered or unexported fields
}

Declarator manages RabbitMQ topology declarations (exchanges, queues, bindings).

func NewDeclarator

func NewDeclarator(cfg topology.Declarations, ch *amqp.Channel) *Declarator

NewDeclarator creates a new Declarator with the specified topology configuration.

func (*Declarator) Close

func (c *Declarator) Close() error

Close closes the declarator's channel.

func (*Declarator) Run

func (c *Declarator) Run() error

Run declares all exchanges, queues, and bindings in the configured topology. Returns an error if any declaration fails.

type DialConfig

type DialConfig struct {
	amqp.Config
	DialTimeout time.Duration
}

DialConfig wraps amqp.Config with additional dial timeout configuration.

type NoopObserver

type NoopObserver struct {
}

NoopObserver provides a default no-op implementation of the Observer interface. Use it when you don't need to observe client events. You can also embed NoopObserver in your own observer struct and override only the methods you care about.

func (NoopObserver) ClientError

func (n NoopObserver) ClientError(err error)

func (NoopObserver) ClientReady

func (n NoopObserver) ClientReady()

func (NoopObserver) ConnectionBlocked added in v1.8.0

func (n NoopObserver) ConnectionBlocked(reason string)

func (NoopObserver) ConnectionUnblocked added in v1.8.0

func (n NoopObserver) ConnectionUnblocked()

func (NoopObserver) ConsumerError

func (n NoopObserver) ConsumerError(consumer consumer.Consumer, err error)

func (NoopObserver) PublisherError

func (n NoopObserver) PublisherError(publisher *publisher.Publisher, err error)

func (NoopObserver) PublisherReconnected added in v1.10.0

func (n NoopObserver) PublisherReconnected(publisher *publisher.Publisher)

func (NoopObserver) PublishingFlow

func (n NoopObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)

func (NoopObserver) ShutdownDone

func (n NoopObserver) ShutdownDone()

func (NoopObserver) ShutdownStarted

func (n NoopObserver) ShutdownStarted()

type Observer

type Observer interface {
	ClientReady()
	ClientError(err error)
	ConsumerError(consumer consumer.Consumer, err error)
	PublisherError(publisher *publisher.Publisher, err error)
	PublishingFlow(publisher *publisher.Publisher, flow bool)
	PublisherReconnected(publisher *publisher.Publisher)
	ConnectionBlocked(reason string)
	ConnectionUnblocked()
	ShutdownStarted()
	ShutdownDone()
}

Observer defines an interface for monitoring client lifecycle events and errors. Implement this interface to receive notifications about connection state, publisher/consumer errors, and shutdown events.
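
Because the interface has many methods, embedding a no-op implementation and overriding a subset is the convenient way to satisfy it, as the README's LogObserver does. The standalone sketch below demonstrates the embedding mechanics with a shortened local interface. observer, noopObserver, errorCollector, notifyAll, and collect are hypothetical stand-ins so that the example runs without the package.

```go
package main

import (
	"errors"
	"fmt"
)

// observer is a shortened local stand-in for the Observer interface.
type observer interface {
	ClientReady()
	ClientError(err error)
	ShutdownDone()
}

// noopObserver implements every method with an empty body.
type noopObserver struct{}

func (noopObserver) ClientReady()          {}
func (noopObserver) ClientError(err error) {}
func (noopObserver) ShutdownDone()         {}

// errorCollector embeds noopObserver and overrides only ClientError.
// The remaining methods are promoted from the embedded struct.
type errorCollector struct {
	noopObserver
	errs []string
}

func (c *errorCollector) ClientError(err error) {
	c.errs = append(c.errs, err.Error())
}

// notifyAll simulates a client reporting lifecycle events to obs.
func notifyAll(obs observer) {
	obs.ClientReady()
	obs.ClientError(errors.New("connection reset"))
	obs.ShutdownDone()
}

// collect runs the simulated lifecycle and returns the recorded errors.
func collect() []string {
	c := &errorCollector{}
	notifyAll(c)
	return c.errs
}

func main() {
	fmt.Println(collect())
}
```

A useful side effect of embedding is forward compatibility: if the interface gains a method that the no-op type also implements, existing custom observers keep compiling.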

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher wraps a publisher instance with connection management, confirmation support, and automatic reconnection on channel failures.

Reconnection Behavior: The publisher automatically reconnects when a precondition failed error occurs (e.g., exchange or queue declaration mismatch). Reconnection creates a new channel, resets confirmation mode, and re-registers the round tripper.

Note: Reconnection is attempted only once per error. If reconnection fails, the publisher stops processing and reports the error via the Observer.

func NewPublisher

func NewPublisher(publisher *publisher2.Publisher, ch *amqp.Channel, observer Observer, conn *amqp.Connection) *Publisher

NewPublisher creates a new Publisher instance with the specified configuration.

func (*Publisher) Close

func (p *Publisher) Close() error

Close terminates the publisher's channel connection.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error

Publish sends a message to the specified exchange and routing key without waiting for confirmation. Returns an error if the publish operation fails.

func (*Publisher) PublishWithConfirmation

func (p *Publisher) PublishWithConfirmation(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error

PublishWithConfirmation sends a message and waits for broker confirmation. Enables publisher confirms mode on first call. Returns an error if the message is not acknowledged by the broker or if the publish operation fails.

func (*Publisher) Run

func (p *Publisher) Run() error

Run initializes the publisher's event watchers and registers it with the underlying publisher. Must be called before using the publisher.

This method starts a background watcher that monitors for channel errors and flow control events. If a precondition failed error occurs (e.g., due to topology changes), the watcher triggers automatic reconnection. The watcher runs until the channel is closed or an unrecoverable error occurs.

Directories

Path       Synopsis
consumer   Package consumer provides a high-level consumer for receiving messages from RabbitMQ with support for concurrency, middleware, and retry policies.
publisher  Package publisher provides a high-level publisher for sending messages to RabbitMQ with support for middleware and confirmation modes.
retry      Package retry provides retry policies for failed message processing in consumers.
topology   Package topology provides types and functions for defining RabbitMQ topology including exchanges, queues, and bindings.