gorabbit

Version: v0.3.0
Published: Oct 1, 2023 License: Apache-2.0 Imports: 4 Imported by: 0

README

GoRabbit

GoRabbit provides additional capabilities for the official RabbitMQ Go client, amqp091-go.

Connection re-dialing

Adds re-dialing when the connection is closed, so you can reliably open a new channel:

conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := conn.Channel() // use as regular connection, but it will re-dial if connection is closed 
ch.Publish(...)

See full example at examples/connection.
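
To make the re-dialing idea concrete, here is a minimal standard-library sketch of a wrapper that dials again when its cached connection reports itself closed. It is not GoRabbit's implementation: `Conn`, `Redialer`, `NewRedialer`, `Get` and `fakeConn` are hypothetical names invented for this illustration, and the only behaviour borrowed from a real client is that a connection can be asked whether it is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a minimal stand-in for a broker connection (hypothetical).
type Conn interface {
	IsClosed() bool
}

// Redialer caches a connection and replaces it once it is closed.
type Redialer struct {
	mu   sync.Mutex
	dial func() (Conn, error)
	conn Conn
}

// NewRedialer stores the dial function; nothing is dialed until Get is called.
func NewRedialer(dial func() (Conn, error)) *Redialer {
	return &Redialer{dial: dial}
}

// Get returns the cached connection if it is still open, otherwise it dials again.
func (r *Redialer) Get() (Conn, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.conn != nil && !r.conn.IsClosed() {
		return r.conn, nil
	}
	c, err := r.dial()
	if err != nil {
		return nil, fmt.Errorf("re-dial: %w", err)
	}
	r.conn = c
	return c, nil
}

// fakeConn is a test double whose closed flag can be flipped by hand.
type fakeConn struct{ closed bool }

func (f *fakeConn) IsClosed() bool { return f.closed }

func main() {
	dials := 0
	r := NewRedialer(func() (Conn, error) {
		dials++
		return &fakeConn{}, nil
	})

	c, _ := r.Get()
	c.(*fakeConn).closed = true // simulate a dropped connection
	_, _ = r.Get()              // triggers a second dial

	fmt.Println("dials:", dials)
}
```

A production version would typically also wait between attempts instead of dialing again immediately.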

Channel re-opening

Adds channel re-opening on top of a plain channel, so you can reliably consume or publish even if something goes wrong on the network.

conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := channel.New(conn) // add re-opening capabilities
// get constant flow of deliveries, channel will be re-opened if something goes south
deliveries := ch.Consume("example-queue", "", false, false, false, false, nil)
for d := range deliveries {
    // process deliveries
}
// or publish reliably
ch.PublishWithContext(ctx, "example-exchange", "", false, false, amqp.Publishing{})

See full example at examples/channel.

Consumers

GoRabbit provides a consumer that simplifies creating a RabbitMQ consumer. It aims to provide sane defaults while remaining fully configurable:

conn, _ := connection.Dial("amqp://localhost:5672")
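ch, _ := channel.New(conn)
// pass the re-opening channel created above, not the channel package
consumer := gorabbit.NewConsumer(ch, "example-queue")
// ProcessFunc adapts a plain function to the Processor interface
consumer.Start(context.Background(), gorabbit.ProcessFunc(func(ctx context.Context, deliveries <-chan amqp.Delivery) error {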
    for d := range deliveries {
        // process deliveries
    }
    return nil
}))

See full example at examples/consumer.

To simplify this further, GoRabbit provides processing building blocks: ByOne and InBatches. All you need to write is a function that handles received messages. If it returns an error, the processor automatically NACKs the message; on success it ACKs it.

One

The One processor reads messages one by one and passes each to a Transaction. It ACKs the message on success and NACKs it on error.

Example of usage is in examples/one.

One Middlewares

Plug in any middleware by passing it to process.ByOne:

process.ByOne(myHandler, true, middleware.NewErrorLogging(zerolog.New(os.Stderr))) 

Or implement your own using a simple API:

type Middleware func(DeliveryHandler) DeliveryHandler

Check out more examples at process/middleware/one.go.

Batch

If you want to process messages in batches, use the Batch processor.

The Batch processor reads a batch of messages from the broker and passes them to a BatchTransaction. Your code is expected to return exactly one error value per message, in the same order (len(messages) == len(errors)).

The consumer ACKs each message that succeeded and NACKs each message that returned an error.

Example of usage is in examples/batch.
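
The one-error-per-message contract can be sketched with the standard library alone. In this illustration `Message` stands in for amqp.Delivery, and `HandleBatch` and `Settle` are hypothetical names. The sketch shows only how a slice of errors lines up with a slice of messages, not how the library settles deliveries internally.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a stand-in for amqp.Delivery.
type Message struct {
	Body []byte
}

// HandleBatch is a hypothetical batch handler: errs[i] is the outcome
// for msgs[i], and a nil entry means that message succeeded.
func HandleBatch(msgs []Message) []error {
	errs := make([]error, len(msgs))
	for i, m := range msgs {
		if len(m.Body) == 0 {
			errs[i] = errors.New("empty body")
		}
	}
	return errs
}

// Settle turns per-message errors into "ack" or "nack" decisions and
// rejects a result whose length does not match the batch.
func Settle(msgs []Message, errs []error) ([]string, error) {
	if len(msgs) != len(errs) {
		return nil, fmt.Errorf("got %d errors for %d messages", len(errs), len(msgs))
	}
	decisions := make([]string, len(msgs))
	for i, err := range errs {
		if err != nil {
			decisions[i] = "nack"
		} else {
			decisions[i] = "ack"
		}
	}
	return decisions, nil
}

func main() {
	msgs := []Message{{Body: []byte("a")}, {}, {Body: []byte("c")}}
	decisions, err := Settle(msgs, HandleBatch(msgs))
	fmt.Println(decisions, err)
}
```

Allocating the error slice with `make([]error, len(msgs))` up front keeps the lengths equal even when every message succeeds.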

Batch Middlewares

Plug in any middleware by passing it to process.InBatches:

process.InBatches(myHandler, true, middleware.NewBatchErrorLogging(zerolog.New(os.Stderr))) 

Or implement your own using a simple API:

type BatchMiddleware func(BatchDeliveryHandler) BatchDeliveryHandler

Check out more examples at process/middleware/batch.go.

Testing

GoRabbit comes with testing helpers in the form of a stretchr/testify/suite testing suite with a prepared RabbitMQ connection, channel or topology.

type TestSuite struct {
  rabbittest.ChannelSuite
}

func (s *TestSuite) TestMyFunction() {
  s.Channel.Publish(...)
}

Set the RABBITMQ_URL environment variable and run the tests:

RABBITMQ_URL=amqp://localhost:5672 go test -v  ./...

Documentation

Types

type Channel

type Channel interface {
	Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) <-chan amqp.Delivery
	Cancel(consumer string, noWait bool) error
	Close() error
}

Channel is a RabbitMQ channel opened for consuming deliveries.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a RabbitMQ consumer. It automatically recreates the channel on channel errors. Reconnection is done with exponential backoff.

func NewConsumer

func NewConsumer(ch Channel, queue string, ops ...Option) Consumer

NewConsumer creates a new RabbitMQ Consumer. The default configuration creates a durable queue and is ready for competing consumers.

An empty consumer name causes the library to generate a unique identity. An empty queue name causes the broker to generate a unique name (see https://www.rabbitmq.com/queues.html#server-named-queues).

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, processor Processor) error

Start begins consuming messages and passes them to the Processor. Unless autoAck is set, messages are rejected if the Processor returns an error and acknowledged otherwise. Call Stop to stop consuming.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop stops consuming, waits for all in-flight messages to be processed and closes the channel.

type Option

type Option func(c *consumeCfg)

Option configures a RabbitMQ Consumer. For the meaning of each setting, refer to https://pkg.go.dev/github.com/rabbitmq/amqp091-go?utm_source=godoc#Channel.Consume.
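
Option follows the functional options pattern. The sketch below shows how such options are usually applied. The fields of `consumeCfg` are unexported and not documented here, so the fields shown, together with `WithTag`, `WithAutoAck` and `newCfg`, are hypothetical stand-ins rather than the library's own code.

```go
package main

import "fmt"

// consumeCfg mirrors the idea of the library's unexported config;
// these particular fields are assumptions made for the sketch.
type consumeCfg struct {
	tag       string
	autoAck   bool
	exclusive bool
}

// Option has the same shape as the Option type documented above.
type Option func(c *consumeCfg)

// WithTag is a hypothetical option that sets the consumer tag.
func WithTag(tag string) Option {
	return func(c *consumeCfg) { c.tag = tag }
}

// WithAutoAck is a hypothetical option that enables auto-acknowledgement.
func WithAutoAck() Option {
	return func(c *consumeCfg) { c.autoAck = true }
}

// newCfg starts from zero-value defaults and applies the options in order,
// so a later option overrides an earlier one that touches the same field.
func newCfg(ops ...Option) consumeCfg {
	cfg := consumeCfg{}
	for _, op := range ops {
		op(&cfg)
	}
	return cfg
}

func main() {
	cfg := newCfg(WithTag("worker-1"), WithAutoAck())
	fmt.Printf("%+v\n", cfg)
}
```

The benefit of this design is that NewConsumer can grow new settings without changing its signature or breaking existing callers.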

func WithConsumeArgs

func WithConsumeArgs(args amqp.Table) Option

WithConsumeArgs sets additional arguments for consuming.

func WithConsumeAutoAck

func WithConsumeAutoAck() Option

WithConsumeAutoAck sets the server to acknowledge deliveries to this consumer prior to writing the delivery to the network.

func WithConsumeExclusive

func WithConsumeExclusive() Option

WithConsumeExclusive sets the server to ensure that this is the sole consumer from this queue.

func WithConsumeNoWait

func WithConsumeNoWait() Option

WithConsumeNoWait sets the server to not wait to confirm the request and immediately begin deliveries.

func WithConsumerTag

func WithConsumerTag(tag string) Option

WithConsumerTag sets the consumer tag. If it is not set, the library generates a unique identity.

type ProcessFunc

type ProcessFunc func(ctx context.Context, deliveries <-chan amqp.Delivery) error

The ProcessFunc type is an adapter that allows ordinary functions to be used as a Processor.

func (ProcessFunc) Process

func (f ProcessFunc) Process(ctx context.Context, deliveries <-chan amqp.Delivery) error

Process implements Processor.

type Processor

type Processor interface {
	Process(ctx context.Context, deliveries <-chan amqp.Delivery) error
}

Processor consumes all provided deliveries.

Directories

Path | Synopsis
examples/batch | command
examples/channel | command
examples/connection | command
examples/consumer | command
examples/one |
Package rabbittest provides "stretchr/testify/suite" testing suite with prepared RabbitMQ connection, channel or topology.
