gorabbit

package module
v0.6.2

Warning: this package is not in the latest version of its module.
Published: Aug 15, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

README


GoRabbit

GoRabbit provides additional capabilities for the official RabbitMQ Go client, amqp091-go.

Quick start example:

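consumer, err := gorabbit.NewConsumer("amqp://localhost:5672", "my-queue")
if err != nil {
    log.Fatal(err)
}
// handleMessage is called for every message; returning an error NACKs the message.
handleMessage := func(ctx context.Context, message []byte) error {
    log.Println(string(message)) // print the body as text rather than as a byte slice
    return nil
}
consumer.Start(context.Background(), consume.ByOne(handleMessage, false))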

This creates a consumer that connects to RabbitMQ on localhost, consumes messages one by one from "my-queue", and passes them to the handleMessage function.

It re-dials if the connection breaks and re-opens the channel if it gets closed.

You can use the prepared consumers and publishers, or use only connection re-dialing and channel re-opening.

Consumer

GoRabbit provides a consumer that simplifies creating a RabbitMQ consumer. It aims to provide sane defaults while remaining fully configurable:

cons, _ := gorabbit.NewConsumer("amqp://localhost:5672", "example-queue")
cons.Start(context.Background(), consumer.ProcessorFunc(func(ctx context.Context, deliveries <-chan amqp.Delivery) error {
    for d := range deliveries {
        // process deliveries
    }
    return nil
}))

See full example at examples/consumer.

To simplify it even more, GoRabbit provides consume building blocks: ByOne and InBatches. All you need to write is a function that handles received messages: if the function returns an error, the message is automatically NACKed; otherwise it is ACKed.

One

The One consume reads messages one by one and passes each of them to a Transaction. It ACKs a message on success and NACKs it on error.

Example of usage is in examples/one.
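
The ACK-on-success, NACK-on-error rule can be sketched without a broker. This is a minimal illustration, not GoRabbit's implementation: the Transaction shape is taken from the quick start handler, while Acknowledger, processOne, the requeue flag, recorder and outcome are hypothetical names introduced only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Transaction mirrors the handler shape used in the quick start example.
type Transaction func(ctx context.Context, message []byte) error

// Acknowledger is a hypothetical stand-in for the ack/nack side of a delivery.
type Acknowledger interface {
	Ack() error
	Nack(requeue bool) error
}

// processOne runs tx for a single message and acknowledges it based on the result.
func processOne(ctx context.Context, body []byte, a Acknowledger, tx Transaction, requeue bool) error {
	if err := tx(ctx, body); err != nil {
		// The handler failed: reject the message and report the handler error.
		if nackErr := a.Nack(requeue); nackErr != nil {
			return fmt.Errorf("nack after %v: %w", err, nackErr)
		}
		return err
	}
	return a.Ack()
}

// recorder is a test double that remembers the last acknowledgement it received.
type recorder struct{ last string }

func (r *recorder) Ack() error { r.last = "ack"; return nil }

func (r *recorder) Nack(requeue bool) error {
	r.last = fmt.Sprintf("nack(requeue=%t)", requeue)
	return nil
}

// outcome processes one message with a handler that fails on demand
// and reports what would have been sent to the broker.
func outcome(fail bool) string {
	r := &recorder{}
	tx := func(ctx context.Context, message []byte) error {
		if fail {
			return errors.New("boom")
		}
		return nil
	}
	_ = processOne(context.Background(), []byte("hello"), r, tx, false)
	return r.last
}

func main() {
	fmt.Println(outcome(false)) // ack
	fmt.Println(outcome(true))  // nack(requeue=false)
}
```

Keeping the acknowledgement decision outside the handler is what lets the handler stay a plain function that only returns an error.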

One Middlewares

Plug in any middleware by passing it to consume.ByOne:

consume.ByOne(tx, false, one.NewDeliveryLogging(zerolog.New(os.Stdout)))

Or implement your own with a simple API:

type Middleware func(DeliveryHandler) DeliveryHandler

Check out more examples at consume/middleware/one.go.
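
As an illustration of the Middleware signature above, here is a self-contained sketch of a custom middleware that turns a panic in the handler into an error. The definition of DeliveryHandler is not shown in this README, so the shape used below is an assumption, and recoverPanics, chain and handle are hypothetical helpers, not part of the library.

```go
package main

import (
	"context"
	"fmt"
)

// DeliveryHandler is an assumed handler shape; the library's real type may differ.
type DeliveryHandler func(ctx context.Context, body []byte) error

// Middleware matches the signature shown above.
type Middleware func(DeliveryHandler) DeliveryHandler

// recoverPanics converts a panic in the wrapped handler into a returned error.
func recoverPanics() Middleware {
	return func(next DeliveryHandler) DeliveryHandler {
		return func(ctx context.Context, body []byte) (err error) {
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("handler panicked: %v", r)
				}
			}()
			return next(ctx, body)
		}
	}
}

// chain applies middlewares so that the first one listed is the outermost.
func chain(h DeliveryHandler, mws ...Middleware) DeliveryHandler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// handle runs a handler that panics on an empty body through the chain
// and returns the resulting error text ("" on success).
func handle(body string) string {
	h := chain(func(ctx context.Context, b []byte) error {
		if len(b) == 0 {
			panic("empty body")
		}
		return nil
	}, recoverPanics())
	if err := h(context.Background(), []byte(body)); err != nil {
		return err.Error()
	}
	return ""
}

func main() {
	fmt.Printf("%q\n", handle("hello")) // ""
	fmt.Printf("%q\n", handle(""))      // "handler panicked: empty body"
}
```

Because a middleware only wraps one handler in another, the returned error flows back to whatever decides between ACK and NACK.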

Batch

If you want to process messages in batches, Batch consume is here for you.

The batch processor reads a batch of messages from the broker and passes them to a BatchTransaction. Your code is expected to return exactly one error value for each passed message (len(messages) == len(errors)).

The batch processor ACKs each message on success and NACKs it on error.

Example of usage is in examples/batch.
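
The one-error-per-message contract can be sketched as follows. The BatchTransaction shape is an assumption made for this example (the README does not show its definition), with a nil entry standing for success; storeNonEmpty, decisions and demo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// BatchTransaction is an assumed shape: one error slot per message, nil meaning success.
type BatchTransaction func(ctx context.Context, messages [][]byte) []error

// storeNonEmpty "processes" each message and fails only the empty ones.
func storeNonEmpty(ctx context.Context, messages [][]byte) []error {
	errs := make([]error, len(messages)) // same length as the input, all nil
	for i, m := range messages {
		if len(m) == 0 {
			errs[i] = errors.New("empty message")
		}
	}
	return errs
}

// decisions translates the per-message errors into ack/nack decisions.
func decisions(tx BatchTransaction, messages [][]byte) []string {
	errs := tx(context.Background(), messages)
	out := make([]string, len(errs))
	for i, err := range errs {
		if err != nil {
			out[i] = "nack"
		} else {
			out[i] = "ack"
		}
	}
	return out
}

// demo runs a three-message batch whose second message is empty.
func demo() []string {
	return decisions(storeNonEmpty, [][]byte{[]byte("a"), nil, []byte("c")})
}

func main() {
	fmt.Println(demo()) // [ack nack ack]
}
```

Allocating the error slice with len(messages) up front is a simple way to keep the lengths equal by construction.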

Batch Middlewares

Plug in any middleware by passing it to consume.InBatches:

consume.InBatches(100, time.Second, tx, false, batch.NewDeliveryLogging(zerolog.New(os.Stdout)))

Or implement your own with a simple API:

type BatchMiddleware func(BatchDeliveryHandler) BatchDeliveryHandler

Check out more examples at consume/middleware/batch.go.
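
One possible custom batch middleware is a guard for the len(messages) == len(errors) contract. The sketch below assumes a shape for BatchDeliveryHandler, which this README does not define; enforceLength and failed are hypothetical helpers, and failing the whole batch on a mismatch is a design choice of this example, not documented library behavior.

```go
package main

import (
	"context"
	"fmt"
)

// BatchDeliveryHandler is an assumed handler shape; the library's real type may differ.
type BatchDeliveryHandler func(ctx context.Context, bodies [][]byte) []error

// BatchMiddleware matches the signature shown above.
type BatchMiddleware func(BatchDeliveryHandler) BatchDeliveryHandler

// enforceLength fails every message of a batch when the wrapped handler
// returns a number of errors different from the number of messages.
func enforceLength() BatchMiddleware {
	return func(next BatchDeliveryHandler) BatchDeliveryHandler {
		return func(ctx context.Context, bodies [][]byte) []error {
			errs := next(ctx, bodies)
			if len(errs) == len(bodies) {
				return errs
			}
			// Contract violated: do not guess which message each error belongs to.
			mismatch := fmt.Errorf("handler returned %d errors for %d messages", len(errs), len(bodies))
			out := make([]error, len(bodies))
			for i := range out {
				out[i] = mismatch
			}
			return out
		}
	}
}

// failed wraps a handler that returns `returned` nil errors and counts
// how many messages of a batch of size `batch` end up failed.
func failed(returned, batch int) int {
	h := enforceLength()(func(ctx context.Context, bodies [][]byte) []error {
		return make([]error, returned)
	})
	n := 0
	for _, err := range h(context.Background(), make([][]byte, batch)) {
		if err != nil {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println(failed(3, 3), failed(1, 3)) // 0 3
}
```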

Publisher

GoRabbit provides a publisher that simplifies creating a RabbitMQ publisher. It aims to provide sane defaults while remaining fully configurable:

pub, _ := gorabbit.NewPublisher("amqp://localhost:5672", "example-exchange")
pub.Publish(context.Background(), "example-key", []byte("hello world!"))

See full example at examples/publisher.

Connection re-dialing

Add re-dialing for when the connection gets closed:

conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := conn.Channel() // use as regular connection, but it will re-dial if connection is closed 
ch.Publish(...)

See full example at examples/connection.
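
To make the idea of re-dialing concrete, here is a generic retry loop with exponential backoff. It is a sketch under stated assumptions, not the library's code: dialFunc, redial and attemptsUntil are hypothetical names, and the initial and maximum delays are arbitrary values chosen so the example finishes quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialFunc is a hypothetical stand-in for whatever opens the connection.
type dialFunc func() error

// redial retries dial until it succeeds or ctx is done,
// doubling the wait after each failure up to maxDelay.
func redial(ctx context.Context, dial dialFunc, initial, maxDelay time.Duration) (attempts int, err error) {
	delay := initial
	for {
		attempts++
		if err = dial(); err == nil {
			return attempts, nil
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(delay):
		}
		if delay *= 2; delay > maxDelay {
			delay = maxDelay
		}
	}
}

// attemptsUntil simulates a broker that comes back after `failures` failed dials
// and returns how many dial attempts were needed.
func attemptsUntil(failures int) int {
	calls := 0
	dial := func() error {
		calls++
		if calls <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
	n, _ := redial(context.Background(), dial, time.Millisecond, 8*time.Millisecond)
	return n
}

func main() {
	fmt.Println(attemptsUntil(3)) // 4
}
```

Capping the delay keeps the recovery time bounded once the broker is reachable again, and the context lets the caller abandon the loop.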

Channel re-opening

Add channel re-opening capabilities on top of a plain channel, so you can reliably consume or publish even if something bad happens over the network:

conn, _ := connection.Dial("amqp://localhost:5672")
ch, _ := conn.Channel(conn) // open channel with re-opening capabilities
// get constant flow of deliveries, channel will be re-opened if something goes south
for delivery := range ch.Consume("example-queue", "", false, false, false, false, nil) {
    // process deliveries
}
// or publish reliably
ch.PublishWithContext(ctx, "example-exchange", "", false, false, amqp.Publishing{})

See full example at examples/channel.
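
The "constant flow of deliveries" idea can be illustrated with plain Go channels: whenever the current stream closes, a new one is opened and forwarded into the same output channel. This is only a sketch of the concept; resubscribe, open and collect are hypothetical names, strings stand in for deliveries, and a real implementation would back off and retry instead of giving up when opening fails.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// resubscribe obtains a fresh stream from open whenever the previous one
// closes and forwards every message into a single output channel.
func resubscribe(ctx context.Context, open func() (<-chan string, error)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			src, err := open()
			if err != nil {
				return // simplification: stop instead of retrying with backoff
			}
			for msg := range src {
				select {
				case out <- msg:
				case <-ctx.Done():
					return
				}
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()
	return out
}

// collect simulates two short-lived streams followed by a permanent failure
// and returns everything the consumer received, joined by commas.
func collect() string {
	batches := [][]string{{"a", "b"}, {"c"}}
	open := func() (<-chan string, error) {
		if len(batches) == 0 {
			return nil, errors.New("broker gone for good")
		}
		ch := make(chan string, len(batches[0]))
		for _, m := range batches[0] {
			ch <- m
		}
		close(ch) // the stream "breaks" once its messages are drained
		batches = batches[1:]
		return ch, nil
	}
	var got []string
	for m := range resubscribe(context.Background(), open) {
		got = append(got, m)
	}
	return strings.Join(got, ",")
}

func main() {
	fmt.Println(collect()) // a,b,c
}
```

The consumer sees one uninterrupted channel even though the underlying stream was replaced in between.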

Testing

GoRabbit comes with testing helpers in the form of a stretchr/testify/suite testing suite, with a prepared RabbitMQ connection, channel, or topology.

type TestSuite struct {
  rabbittest.ChannelSuite
}

func (s *TestSuite) TestMyFunction() {
  s.Channel.Publish(...)
}

Set the RABBITMQ_URL environment variable and run the tests:

RABBITMQ_URL=amqp://localhost:5672 go test -v ./...

Documentation

Functions

func NewConsumer

func NewConsumer(url string, queue string, ops ...consumer.Option) (*consumer.Consumer, error)

NewConsumer creates a new RabbitMQ consumer that consumes from the given queue. It automatically re-opens the channel on channel errors. Reconnection is done with exponential backoff.

func NewPublisher added in v0.5.0

func NewPublisher(url string, exchange string, mws ...publisher.Middleware) (publisher.Publisher, error)

NewPublisher creates a new RabbitMQ publisher that publishes to the given exchange. It automatically re-opens the channel on channel errors. Reconnection is done with exponential backoff.


Directories

Path Synopsis
one
examples
batch command
channel command
connection command
consumer command
one
publisher command
Package rabbittest provides "stretchr/testify/suite" testing suite with prepared RabbitMQ connection, channel or topology.
v2
