amqpx

package module
v0.10.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 9, 2025 License: MIT Imports: 10 Imported by: 0

README

amqpx

amqpx is a robust and easy to use wrapper for github.com/rabbitmq/amqp091-go.

Core features

  • connection & session (channel) pooling
  • reconnect handling
  • batch processing
  • pause/resume consumers
  • clean shutdown handling
  • sane defaults
  • resilience & robustness over performance by default (publisher & subscriber acks)
  • every default can be changed to your liking

This library is highly inspired by https://github.com/houseofcat/turbocookedrabbit

Requirements:

Getting started

go get github.com/jxsl13/amqpx@latest
Example
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"

	"github.com/jxsl13/amqpx"
	"github.com/jxsl13/amqpx/logging"
	"github.com/jxsl13/amqpx/pool"
	"github.com/jxsl13/amqpx/types"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	amqpx.RegisterTopologyCreator(func(ctx context.Context, t *pool.Topologer) error {
		// error handling omitted for brevity
		_ = t.ExchangeDeclare(ctx, "example-exchange", "topic") // durable exchange by default
		_, _ = t.QueueDeclare(ctx, "example-queue")             // durable quorum queue by default
		_ = t.QueueBind(ctx, "example-queue", "route.name.v1.event", "example-exchange")
		return nil
	})
	amqpx.RegisterTopologyDeleter(func(ctx context.Context, t *pool.Topologer) error {
		// error handling omitted for brevity
		_, _ = t.QueueDelete(ctx, "example-queue")
		_ = t.ExchangeDelete(ctx, "example-exchange")
		return nil
	})

	amqpx.RegisterHandler("example-queue", func(ctx context.Context, msg types.Delivery) error {
		fmt.Println("received message:", string(msg.Body))
		fmt.Println("canceling context")
		cancel()

		// return error for nack + requeue
		return nil
	})

	_ = amqpx.Start(
		ctx,
		amqpx.NewURL("localhost", 5672, "admin", "password"), // or amqp://username:password@localhost:5672
		amqpx.WithLogger(logging.NewNoOpLogger()),            // provide a logger that implements the logging.Logger interface
	)
	defer amqpx.Close()

	_ = amqpx.Publish(ctx, "example-exchange", "route.name.v1.event", types.Publishing{
		ContentType: "application/json",
		Body:        []byte("my test event"),
	})

	<-ctx.Done()
}

Example with optional parameters
package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"

	"github.com/jxsl13/amqpx"
	"github.com/jxsl13/amqpx/logging"
	"github.com/jxsl13/amqpx/pool"
	"github.com/jxsl13/amqpx/types"
)

func SomeConsumer(cancel func()) pool.HandlerFunc {
	return func(ctx context.Context, msg types.Delivery) error {
		fmt.Println("received message:", string(msg.Body))
		fmt.Println("canceling context")
		cancel()

		// return error for nack + requeue
		return nil
	}
}

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	amqpx.RegisterTopologyCreator(func(ctx context.Context, t *pool.Topologer) error {
		// error handling omitted for brevity

		_ = t.ExchangeDeclare(ctx, "example-exchange", "topic",
			types.ExchangeDeclareOptions{
				Durable: true,
			},
		)
		_, _ = t.QueueDeclare(ctx, "example-queue",
			types.QueueDeclareOptions{
				Durable: true,
				Args:    types.QuorumQueue,
			},
		)
		_ = t.QueueBind(ctx, "example-queue", "route.name.v1.event", "example-exchange")
		return nil
	})
	amqpx.RegisterTopologyDeleter(func(ctx context.Context, t *pool.Topologer) error {
		// error handling omitted for brevity
		_, _ = t.QueueDelete(ctx, "example-queue")
		_ = t.ExchangeDelete(ctx, "example-exchange")
		return nil
	})

	amqpx.RegisterHandler("example-queue",
		SomeConsumer(cancel),
		types.ConsumeOptions{
			ConsumerTag: "example-queue-consumer",
			Exclusive:   true,
		},
	)

	_ = amqpx.Start(
		ctx,
		amqpx.NewURL("localhost", 5672, "admin", "password"), // or amqp://username:password@localhost:5672
		amqpx.WithLogger(logging.NewNoOpLogger()),            // provide a logger that implements the logging.Logger interface (logrus adapter is provided)
	)
	defer amqpx.Close()

	_ = amqpx.Publish(ctx, "example-exchange", "route.name.v1.event", types.Publishing{
		ContentType: "application/json",
		Body:        []byte("my test event"),
	})

	<-ctx.Done()
}

Types

The amqpx package provides a single type which incorporates everything needed for consuming and publishing messages.

The pool package provides all of the implementation details.

amqpx.AMQPX

The AMQPX struct consists of at least one connection pool, a Publisher, a Subscriber and a Topologer. Upon Start(..) and upon Close(), a Topologer is created which creates or destroys the topology based on one or more functions that were registered via RegisterTopologyCreator or RegisterTopologyDeleter. After the topology has been created, a Publisher is instantiated from a publisher connection and session Pool. The Publisher can be used to publish messages to specific exchanges with a given routing key. If you register an event handler function via RegisterHandler or RegisterBatchHandler, another connection and session Pool is created, which is then used to instantiate a Subscriber. The Subscriber communicates via one or more separate TCP connections in order to prevent interference between the Publisher and the Subscriber (TCP pushback).

The amqpx package defines a global variable that allows the package amqpx to be used like the AMQPX object.

pool.Topologer

The Topologer allows you to create, delete, bind or unbind exchanges and queues.

pool.Publisher

The Publisher allows you to publish individual events or messages to exchanges with a given routing key.

pool.Subscriber

The Subscriber allows you to register event handler functions that consume messages from individual queues. A Subscriber must be started via Start() in order for it to create the consumer goroutines that process events from broker queues.

Development

All tests can be run in parallel, but parallel testing is currently disabled in the CI pipeline because the GitHub runners started to behave erratically under that load.

Test flags you might want to add:

go test -v -race -count=1 ./...
  • -v: see test logs
  • -race: detect data races
  • -count=1: do not cache test results

Starting the tests:

go test -v -race -count=1 ./...
Test environment
  • Requires Docker (including the docker compose subcommand)

Starting the test environment:

make environment
# or
docker compose up -d

The test environment looks like this:

Web interfaces:

127.0.0.1:5670 	-> rabbitmq-broken:5672 	# out of memory rabbitmq
127.0.0.1:5671 	-> rabbitmq:5672 			# healthy rabbitmq connection which is never disconnected


127.0.0.1:5672	-> toxiproxy:5672	-> rabbitmq:5672	# connection which is disconnected by toxiproxy
127.0.0.1:5673	-> toxiproxy:5673	-> rabbitmq:5672	# connection which is disconnected by toxiproxy
127.0.0.1:5674	-> toxiproxy:5674	-> rabbitmq:5672	# connection which is disconnected by toxiproxy
...
127.0.0.1:5771	-> toxiproxy:5771	-> rabbitmq:5672	# connection which is disconnected by toxiproxy

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Close

func Close() error

func Get

func Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)

Get is only supposed to be used for testing. Do not use Get for polling any broker queues.

func NewURL

func NewURL(hostname string, port int, username, password string, vhost ...string) string

NewURL creates a new connection string for the NewSessionFactory.

  • hostname: e.g. localhost
  • port: e.g. 5672
  • username: e.g. username
  • password: e.g. password
  • vhost: e.g. "" or "/"
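To make the resulting connection string more concrete, here is a minimal standard-library sketch that assembles a URL of the form amqp://username:password@hostname:port. The helper buildAMQPURL is a hypothetical name and is not the library's NewURL. It leaves out the optional vhost and it percent-encodes special characters in the credentials, which may or may not match what NewURL does.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// buildAMQPURL is a hypothetical helper that assembles an AMQP connection
// string from its parts. It is an illustration, not the library's NewURL.
func buildAMQPURL(hostname string, port int, username, password string) string {
	u := url.URL{
		Scheme: "amqp",
		// UserPassword percent-encodes characters such as '@' in credentials.
		User: url.UserPassword(username, password),
		Host: net.JoinHostPort(hostname, strconv.Itoa(port)),
	}
	return u.String()
}

func main() {
	fmt.Println(buildAMQPURL("localhost", 5672, "admin", "password"))
	// A password containing '@' is escaped so the URL stays parseable.
	fmt.Println(buildAMQPURL("localhost", 5672, "admin", "p@ss"))
}
```

Building the string through net/url rather than fmt.Sprintf keeps credentials with reserved characters from being misread as part of the host.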

func Publish

func Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error

Publish a message to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.

func RegisterBatchHandler added in v0.7.0

func RegisterBatchHandler(queue string, handlerFunc pool.BatchHandlerFunc, option ...pool.BatchHandlerOption) *pool.BatchHandler

RegisterBatchHandler registers a handler function for a specific queue that processes batches. The consumer can be set to a unique consumer name (if left empty, a unique name will be generated).

func RegisterHandler

func RegisterHandler(queue string, handlerFunc pool.HandlerFunc, option ...types.ConsumeOptions) *pool.Handler

RegisterHandler registers a handler function for a specific queue. The consumer can be set to a unique consumer name (if left empty, a unique name will be generated). The returned handler can be used to pause message processing and to resume paused processing. Processing must have been started with Start before it can be paused or resumed.

func RegisterTopologyCreator

func RegisterTopologyCreator(topology TopologyFunc)

RegisterTopologyCreator registers a topology creating function that is called upon Start. The creation of topologies is the first step before any publisher or subscriber is started.

func RegisterTopologyDeleter

func RegisterTopologyDeleter(finalizer TopologyFunc)

RegisterTopologyDeleter registers a topology finalizer that is executed at the end of amqpx.Close().

func Reset

func Reset() error

Reset closes the current package and resets its state to what it was before it was initialized and started.

func Start

func Start(ctx context.Context, connectUrl string, options ...Option) (err error)

Start starts the subscriber and publisher pools. In case no handlers were registered, no subscriber pool will be started. connectUrl has the form: amqp://username:password@localhost:5672. pubSessions is the number of pooled sessions (channels) for the publisher. options are optional pool connection options. They might also contain publisher-specific settings like publish confirmations or a custom context which can signal an application shutdown. This custom context does not replace the Close() call. Always defer a Close() call. Start is a non-blocking operation. The startup context may differ from the cancellation context provided via the options.

Types

type AMQPX added in v0.3.0

type AMQPX struct {
	// contains filtered or unexported fields
}

func New added in v0.3.0

func New() *AMQPX

func (*AMQPX) Close added in v0.3.0

func (a *AMQPX) Close() error

func (*AMQPX) Get added in v0.3.0

func (a *AMQPX) Get(ctx context.Context, queue string, autoAck bool) (msg types.Delivery, ok bool, err error)

Get is only supposed to be used for testing. Do not use Get for polling any broker queues.

func (*AMQPX) Publish added in v0.3.0

func (a *AMQPX) Publish(ctx context.Context, exchange string, routingKey string, msg types.Publishing) error

Publish a message to a specific exchange with a given routingKey. You may set exchange to "" and routingKey to your queue name in order to publish directly to a queue.

func (*AMQPX) RegisterBatchHandler added in v0.6.0

func (a *AMQPX) RegisterBatchHandler(queue string, handlerFunc pool.BatchHandlerFunc, option ...pool.BatchHandlerOption) *pool.BatchHandler

RegisterBatchHandler registers a handler function for a specific queue that processes batches. The consumer can be set to a unique consumer name (if left empty, a unique name will be generated).

func (*AMQPX) RegisterHandler added in v0.3.0

func (a *AMQPX) RegisterHandler(queue string, handlerFunc pool.HandlerFunc, option ...types.ConsumeOptions) *pool.Handler

RegisterHandler registers a handler function for a specific queue. The consumer can be set to a unique consumer name (if left empty, a unique name will be generated).

func (*AMQPX) RegisterTopologyCreator added in v0.3.0

func (a *AMQPX) RegisterTopologyCreator(topology TopologyFunc)

RegisterTopologyCreator registers a topology creating function that is called upon Start. The creation of topologies is the first step before any publisher or subscriber is started.

func (*AMQPX) RegisterTopologyDeleter added in v0.3.0

func (a *AMQPX) RegisterTopologyDeleter(finalizer TopologyFunc)

RegisterTopologyDeleter registers a topology finalizer that is executed at the end of amqpx.Close().

func (*AMQPX) Reset added in v0.3.0

func (a *AMQPX) Reset() error

Reset closes the current package and resets its state to what it was before it was initialized and started.

func (*AMQPX) Start added in v0.3.0

func (a *AMQPX) Start(ctx context.Context, connectUrl string, options ...Option) (err error)

Start starts the subscriber and publisher pools. In case no handlers were registered, no subscriber pool will be started. connectUrl has the form: amqp://username:password@localhost:5672. pubSessions is the number of pooled sessions (channels) for the publisher. options are optional pool connection options. They might also contain publisher-specific settings like publish confirmations or a custom context which can signal an application shutdown. This custom context does not replace the Close() call. Always defer a Close() call. Start is a non-blocking operation.

type Option

type Option func(*option)

func WithBufferCapacity added in v0.8.0

func WithBufferCapacity(capacity int) Option

WithBufferCapacity allows you to configure the size of the confirmation, error & blocker buffers of all sessions.

func WithCloseTimeout added in v0.7.0

func WithCloseTimeout(timeout time.Duration) Option

WithCloseTimeout affects the duration that the topology deleter functions are allowed to take for deleting topologies. This timeout is especially interesting for containerized environments where containers may potentially be killed after a specific timeout, so we want to cancel deletion operations before that hard kill comes into play.

func WithConfirms

func WithConfirms(requirePublishConfirms bool) Option

WithConfirms requires all messages from sessions to be acked. This affects publishers.

func WithConnectionTimeout

func WithConnectionTimeout(timeout time.Duration) Option

WithConnectionTimeout allows you to set a custom connection timeout that MUST be >= 1 * time.Second.

func WithHeartbeatInterval

func WithHeartbeatInterval(interval time.Duration) Option

WithHeartbeatInterval allows you to set a custom heartbeat interval that MUST be >= 1 * time.Second.

func WithLogger

func WithLogger(logger logging.Logger) Option

WithLogger allows you to set a custom logger for the connection AND session pool.

func WithName

func WithName(name string) Option

WithName gives all of your pooled connections a prefix name

func WithPoolOption

func WithPoolOption(po pool.Option) Option

WithPoolOption is a function that allows you to directly manipulate the options of the underlying pool. DO NOT USE this option unless you have read enough of the source code to understand which configuration options influence which behavior. This might make sense if you want to change the pool name prefix or suffix.

func WithPublisherConnections

func WithPublisherConnections(connections int) Option

WithPublisherConnections defines the number of TCP connections of the publisher.

func WithPublisherSessions

func WithPublisherSessions(sessions int) Option

WithPublisherSessions defines the number of multiplexed sessions for all connections. Meaning, if you have one connection and two sessions, that connection has two sessions. If you have two connections and two sessions, every connection gets one session. Sessions are assigned to connections in a round robin manner.
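The round robin assignment described above can be sketched as a small counting function. sessionsPerConnection is a hypothetical helper written for this illustration. It only models how many sessions end up on each connection and does not reflect the pool's actual code.

```go
package main

import "fmt"

// sessionsPerConnection is a hypothetical helper that hands out sessions to
// connections in round robin order and returns the session count of each
// connection. It returns nil if there is no connection to assign to.
func sessionsPerConnection(connections, sessions int) []int {
	if connections < 1 {
		return nil
	}
	counts := make([]int, connections)
	for s := 0; s < sessions; s++ {
		counts[s%connections]++ // session s goes to connection s mod connections
	}
	return counts
}

func main() {
	fmt.Println(sessionsPerConnection(1, 2)) // one connection carries both sessions
	fmt.Println(sessionsPerConnection(2, 2)) // one session per connection
	fmt.Println(sessionsPerConnection(2, 3)) // the first connection gets the extra session
}
```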

func WithSubscriberConnections

func WithSubscriberConnections(connections int) Option

WithSubscriberConnections defines the number of connections that all of the consumer sessions share. Meaning, if you have registered 10 handlers and define 5 connections, every connection has two sessions that are multiplexed over it. If you have 1 connection, all consumers will derive sessions from that connection in order to consume from the specified queue. You cannot have less than one connection, nor can you have more connections than handlers, as there can at most be one (TCP) connection with one session per handler.
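The bounds described above (at least one connection, at most one connection per handler) can be illustrated as follows. Both functions are hypothetical. In particular, clamping an out-of-range value is an assumption made for this sketch, as the library might just as well reject such a value.

```go
package main

import "fmt"

// clampConnections is a hypothetical helper that forces the requested number
// of subscriber connections into the range [1, handlers]. Without handlers
// there is nothing to consume, so it returns 0.
func clampConnections(requested, handlers int) int {
	switch {
	case handlers < 1:
		return 0
	case requested < 1:
		return 1
	case requested > handlers:
		return handlers
	default:
		return requested
	}
}

// handlersPerConnection spreads handler indices over the effective number of
// connections in round robin order.
func handlersPerConnection(requested, handlers int) [][]int {
	n := clampConnections(requested, handlers)
	groups := make([][]int, n)
	for h := 0; h < handlers; h++ {
		groups[h%n] = append(groups[h%n], h)
	}
	return groups
}

func main() {
	// 10 handlers over 5 connections: two handler sessions per connection.
	for conn, hs := range handlersPerConnection(5, 10) {
		fmt.Printf("connection %d serves handlers %v\n", conn, hs)
	}
	// More connections requested than handlers: one connection per handler.
	fmt.Println(clampConnections(20, 10))
}
```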

func WithTLS

func WithTLS(config *tls.Config) Option

WithTLS allows you to configure TLS connectivity.

type TopologyFunc

type TopologyFunc func(context.Context, *pool.Topologer) error

Directories

Path Synopsis
internal
