consumer

package
v0.5.3

This package is not in the latest version of its module.

Published: Jun 27, 2024 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel

type Channel interface {
	Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) <-chan amqp.Delivery
	Cancel(consumer string, noWait bool) error
	Close() error
}

Channel is a RabbitMQ channel opened for consuming deliveries.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a RabbitMQ consumer. It automatically recreates the channel on channel errors. Reconnection uses exponential backoff.

func New

func New(channel Channel, queue string, ops ...Option) *Consumer

New creates a new RabbitMQ Consumer. By default it consumes with autoAck=false, exclusive=false, noWait=false, and empty args; pass Options to change these settings.

An empty consumer name causes the library to generate a unique identity. An empty queue name causes the broker to generate a unique name (see https://www.rabbitmq.com/queues.html#server-named-queues).

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, processor Processor) error

Start begins consuming messages and passes them to the Processor. If autoAck is not set, messages are rejected when the Processor returns an error and acknowledged otherwise. Call Stop to stop consuming.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop stops consuming, waits for all in-flight messages to be processed, and closes the channel.

type Option

type Option func(c *consumeCfg)

Option configures a RabbitMQ Consumer. For the meaning of each setting, refer to https://pkg.go.dev/github.com/rabbitmq/amqp091-go?utm_source=godoc#Channel.Consume.
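The Option type follows the functional options pattern. The sketch below re-creates that pattern locally to show how options are applied on top of defaults. The fields of consumeCfg and the helper newCfg are hypothetical, since the real struct is unexported and its layout is not documented; WithAutoAck and WithConsumerTag here are local stand-ins, not the package's implementations.

```go
package main

import "fmt"

// consumeCfg is a hypothetical stand-in for the package's unexported
// configuration struct; the real fields are not shown in the documentation.
type consumeCfg struct {
	consumerTag string
	autoAck     bool
	exclusive   bool
	noWait      bool
}

// Option mirrors the documented signature.
type Option func(c *consumeCfg)

// WithAutoAck is a local stand-in that turns on automatic acknowledgement.
func WithAutoAck() Option {
	return func(c *consumeCfg) { c.autoAck = true }
}

// WithConsumerTag is a local stand-in that sets the consumer tag.
func WithConsumerTag(tag string) Option {
	return func(c *consumeCfg) { c.consumerTag = tag }
}

// newCfg (hypothetical helper) starts from the documented defaults,
// where every flag is false, and applies the options in order.
func newCfg(ops ...Option) consumeCfg {
	var cfg consumeCfg
	for _, op := range ops {
		op(&cfg)
	}
	return cfg
}

func main() {
	cfg := newCfg(WithConsumerTag("worker-1"), WithAutoAck())
	fmt.Printf("%+v\n", cfg)
}
```

Because each option is just a function that mutates the configuration, new settings can be added without changing the signature of the constructor.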

func WithArgs

func WithArgs(args amqp.Table) Option

WithArgs sets additional arguments for consuming.

func WithAutoAck

func WithAutoAck() Option

WithAutoAck makes the server acknowledge deliveries to this consumer before writing them to the network.

func WithConsumerTag

func WithConsumerTag(tag string) Option

WithConsumerTag sets the consumer tag. If it is not set, the library generates a unique identity.

func WithExclusive

func WithExclusive() Option

WithExclusive asks the server to ensure that this is the sole consumer of the queue.

func WithNoWait

func WithNoWait() Option

WithNoWait tells the server not to wait to confirm the request and to begin deliveries immediately.

type ProcessFunc

type ProcessFunc func(ctx context.Context, deliveries <-chan amqp.Delivery) error

ProcessFunc is an adapter that allows ordinary functions to be used as a Processor.

func (ProcessFunc) Process

func (f ProcessFunc) Process(ctx context.Context, deliveries <-chan amqp.Delivery) error

Process implements Processor.

type Processor

type Processor interface {
	Process(ctx context.Context, deliveries <-chan amqp.Delivery) error
}

Processor consumes all provided deliveries.
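As a rough illustration of the Processor and ProcessFunc relationship, the sketch below rebuilds the adapter with local types and drains a channel until it is closed or the context is cancelled. The delivery type is a hypothetical stand-in for amqp.Delivery so the example compiles with the standard library alone; the lowercase processor and processFunc types and the drain and run helpers are likewise assumptions, not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// delivery is a hypothetical stand-in for amqp.Delivery.
type delivery struct{ Body []byte }

// processor and processFunc mirror the documented Processor/ProcessFunc shapes.
type processor interface {
	Process(ctx context.Context, deliveries <-chan delivery) error
}

type processFunc func(ctx context.Context, deliveries <-chan delivery) error

// Process calls f itself, which is what makes a plain function a processor.
func (f processFunc) Process(ctx context.Context, deliveries <-chan delivery) error {
	return f(ctx, deliveries)
}

// drain (hypothetical helper) handles deliveries until the channel is closed,
// the context is cancelled, or handle returns an error.
func drain(ctx context.Context, deliveries <-chan delivery, handle func(delivery) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case d, ok := <-deliveries:
			if !ok {
				return nil // channel closed: nothing left to consume
			}
			if err := handle(d); err != nil {
				return err
			}
		}
	}
}

// run feeds the given bodies through a processFunc and returns what it saw.
func run(bodies ...string) ([]string, error) {
	ch := make(chan delivery, len(bodies))
	for _, b := range bodies {
		ch <- delivery{Body: []byte(b)}
	}
	close(ch)

	var seen []string
	var p processor = processFunc(func(ctx context.Context, ds <-chan delivery) error {
		return drain(ctx, ds, func(d delivery) error {
			seen = append(seen, string(d.Body))
			return nil
		})
	})
	err := p.Process(context.Background(), ch)
	return seen, err
}

func main() {
	seen, err := run("a", "b")
	fmt.Println(seen, err)
}
```

The adapter is the same idea as http.HandlerFunc in the standard library: a function type with a method that calls itself, so callers can pass a closure wherever the interface is expected.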
