consumer

package
v1.11.1 Latest
Published: Apr 29, 2026 License: MIT Imports: 7 Imported by: 5

Documentation

Overview

Package consumer provides a high-level consumer for receiving messages from RabbitMQ with support for concurrency, middleware, and retry policies.

Constants

This section is empty.

Variables

var (
	// ErrDeliveryAlreadyHandled is returned when attempting to handle an already processed delivery.
	ErrDeliveryAlreadyHandled = errors.New("delivery already handled")
)

Functions

This section is empty.

Types

type Closer

type Closer interface {
	Close()
}

Closer defines an interface for closing resources.

type Consumer

type Consumer struct {
	Queue         string
	Name          string
	Concurrency   int
	PrefetchCount int
	Middlewares   []Middleware
	RetryPolicy   *retry.Policy
	Closer        Closer
	// contains filtered or unexported fields
}

Consumer represents a message consumer with configurable queue, concurrency, and middleware support.

func New

func New(handler Handler, queue string, opts ...Option) Consumer

New creates a new Consumer with the specified handler and queue. If no name is provided, a unique name based on os.Args[0] is generated automatically. Optional configuration can be supplied through Option functions.
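
The option pattern described above can be sketched as follows. This is a minimal, self-contained illustration and not the package's implementation: the types are local stand-ins covering a subset of the documented fields, the handler argument is omitted for brevity, and the default values (including deriving the name from os.Args[0] with filepath.Base) are assumptions.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// Consumer is a local stand-in with a subset of the documented fields.
type Consumer struct {
	Queue         string
	Name          string
	Concurrency   int
	PrefetchCount int
}

// Option mirrors the documented signature: func(c *Consumer).
type Option func(c *Consumer)

func WithName(name string) Option      { return func(c *Consumer) { c.Name = name } }
func WithConcurrency(n int) Option     { return func(c *Consumer) { c.Concurrency = n } }
func WithPrefetchCount(n int) Option   { return func(c *Consumer) { c.PrefetchCount = n } }

// New fills in defaults and then applies each option in order.
// The defaults below are assumptions made for this sketch.
func New(queue string, opts ...Option) Consumer {
	c := Consumer{
		Queue:         queue,
		Name:          filepath.Base(os.Args[0]), // assumed default name
		Concurrency:   1,                         // assumed default
		PrefetchCount: 1,                         // assumed default
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := New("orders", WithName("orders-worker"), WithConcurrency(4))
	fmt.Println(c.Queue, c.Name, c.Concurrency, c.PrefetchCount)
	// prints: orders orders-worker 4 1
}
```

Because each option is a plain function over *Consumer, later options override earlier ones, and callers only mention the settings they want to change.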

func (*Consumer) Handler

func (c *Consumer) Handler() Handler

Handler returns the configured message handler.

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

Delivery wraps an AMQP delivery with acknowledgment and retry capabilities.

func NewDelivery

func NewDelivery(donner Donner, source *amqp.Delivery, retrier *retry.Retryer) *Delivery

NewDelivery creates a new Delivery instance with the specified configuration.

func (*Delivery) Ack

func (d *Delivery) Ack() error

Ack acknowledges the delivery, marking it as successfully processed and removing it from the queue. The delivery must be acknowledged exactly once; subsequent calls return ErrDeliveryAlreadyHandled. Returns an error if the broker fails to acknowledge the delivery (e.g., connection issues).

After Ack returns (either successfully or with an error), the delivery is considered handled and no further acknowledgment operations should be performed on it.

func (*Delivery) Nack

func (d *Delivery) Nack(requeue bool) error

Nack negatively acknowledges the delivery, indicating that processing failed. The requeue parameter controls whether the broker returns the message to the queue (true) or removes it from the queue (false), in which case it is dead-lettered if the queue has a dead-letter exchange configured. Returns an error if the delivery has already been handled or if the broker rejects the nack.

After Nack returns (either successfully or with an error), the delivery is considered handled and no further acknowledgment operations should be performed on it.
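
The handle-once rule shared by Ack and Nack can be illustrated with a small guard. This is a sketch under stated assumptions, not the library's code: the acker interface, the fakeBroker type, and the use of an atomic flag are all hypothetical choices made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var ErrDeliveryAlreadyHandled = errors.New("delivery already handled")

// acker is a hypothetical stand-in for the broker-side operations.
type acker interface {
	Ack() error
	Nack(requeue bool) error
}

// fakeBroker counts calls so the example can show what reached the broker.
type fakeBroker struct{ acks, nacks int }

func (b *fakeBroker) Ack() error              { b.acks++; return nil }
func (b *fakeBroker) Nack(requeue bool) error { b.nacks++; return nil }

// Delivery is a local stand-in; the handled flag is an assumed mechanism.
type Delivery struct {
	handled int32
	broker  acker
}

// markHandled reports true only for the first caller.
func (d *Delivery) markHandled() bool {
	return atomic.CompareAndSwapInt32(&d.handled, 0, 1)
}

func (d *Delivery) Ack() error {
	if !d.markHandled() {
		return ErrDeliveryAlreadyHandled
	}
	// The delivery counts as handled even if the broker call fails.
	return d.broker.Ack()
}

func (d *Delivery) Nack(requeue bool) error {
	if !d.markHandled() {
		return ErrDeliveryAlreadyHandled
	}
	return d.broker.Nack(requeue)
}

func main() {
	b := &fakeBroker{}
	d := &Delivery{broker: b}
	fmt.Println(d.Ack())         // <nil>
	fmt.Println(d.Nack(true))    // delivery already handled
	fmt.Println(b.acks, b.nacks) // 1 0
}
```

The flag is set before the broker call, which matches the documented behavior that a delivery is considered handled whether the call succeeds or returns an error.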

func (*Delivery) Retry

func (d *Delivery) Retry() error

Retry attempts to redeliver the message according to the configured retry policy. If a retry policy is configured, the message is published to a retry queue with a delay. If no retry policy is configured, the message is requeued via Nack.

Returns ErrDeliveryAlreadyHandled if the delivery has already been processed. Returns an error if the retry operation fails (e.g., publishing to retry queue fails).

Note: When Retry returns an error, the message has NOT been acknowledged and will need to be handled again by the caller.
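
One way a handler might choose between Ack, Nack, and Retry is sketched below. Everything here is illustrative: the delivery interface, fakeDelivery, the two sentinel errors, and the settle helper are hypothetical names, and falling back to Nack(true) after a failed Retry is an assumption based on the note above that the message remains unacknowledged in that case.

```go
package main

import (
	"errors"
	"fmt"
)

// delivery is a hypothetical local interface listing the three documented methods.
type delivery interface {
	Ack() error
	Nack(requeue bool) error
	Retry() error
}

// Hypothetical sentinel errors used to classify processing failures.
var (
	errPermanent = errors.New("permanent failure")
	errTransient = errors.New("transient failure")
)

// settle maps a processing result to one acknowledgment outcome and
// returns a label for the outcome plus any error from the delivery call.
func settle(d delivery, procErr error) (string, error) {
	switch {
	case procErr == nil:
		return "ack", d.Ack()
	case errors.Is(procErr, errPermanent):
		// Retrying cannot help, so do not requeue.
		return "nack", d.Nack(false)
	default:
		if err := d.Retry(); err != nil {
			// Assumption: a failed Retry leaves the delivery unhandled,
			// so requeue it instead of leaving it unacknowledged.
			return "nack-requeue", d.Nack(true)
		}
		return "retry", nil
	}
}

// fakeDelivery records calls; retryErr simulates a failing retry publish.
type fakeDelivery struct {
	retryErr error
	calls    []string
}

func (f *fakeDelivery) Ack() error { f.calls = append(f.calls, "ack"); return nil }
func (f *fakeDelivery) Nack(requeue bool) error {
	f.calls = append(f.calls, fmt.Sprintf("nack(%t)", requeue))
	return nil
}
func (f *fakeDelivery) Retry() error { f.calls = append(f.calls, "retry"); return f.retryErr }

func main() {
	for _, procErr := range []error{nil, errPermanent, errTransient} {
		outcome, err := settle(&fakeDelivery{}, procErr)
		fmt.Println(outcome, err)
	}
}
```

Each simulated delivery is settled exactly once, in line with the handle-once rule documented for Ack and Nack.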

func (*Delivery) Source

func (d *Delivery) Source() *amqp.Delivery

Source returns the underlying AMQP delivery.

type Donner

type Donner interface {
	Done()
}

Donner defines an interface for signaling completion of delivery processing.

type Handler

type Handler interface {
	Handle(ctx context.Context, delivery *Delivery)
}

Handler defines an interface for processing message deliveries.

type HandlerFunc

type HandlerFunc func(ctx context.Context, delivery *Delivery)

HandlerFunc is an adapter that allows regular functions to be used as Handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, delivery *Delivery)

Handle implements the Handler interface.
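
The adapter follows the same idea as http.HandlerFunc in the standard library. The sketch below redeclares Handler and HandlerFunc locally so that it compiles on its own; the Delivery stand-in with a Body field and the collector and dispatch helpers are hypothetical and exist only for the illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Delivery is a simplified stand-in; the real type has unexported fields.
type Delivery struct{ Body string }

type Handler interface {
	Handle(ctx context.Context, delivery *Delivery)
}

// HandlerFunc lets a plain function satisfy Handler.
type HandlerFunc func(ctx context.Context, delivery *Delivery)

func (f HandlerFunc) Handle(ctx context.Context, delivery *Delivery) { f(ctx, delivery) }

// collector returns a Handler built from a closure that records bodies.
func collector(seen *[]string) Handler {
	return HandlerFunc(func(ctx context.Context, d *Delivery) {
		*seen = append(*seen, d.Body)
	})
}

// dispatch is a hypothetical helper that feeds bodies to a handler.
func dispatch(h Handler, bodies ...string) {
	for _, b := range bodies {
		h.Handle(context.Background(), &Delivery{Body: b})
	}
}

func main() {
	var seen []string
	dispatch(collector(&seen), "a", "b")
	fmt.Println(seen) // [a b]
}
```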

type Middleware

type Middleware func(next Handler) Handler

Middleware is a function that wraps a Handler, enabling request processing chains.

type Option

type Option func(c *Consumer)

Option is a function that configures a Consumer instance.

func WithCloser

func WithCloser(closer Closer) Option

WithCloser sets the resource closer for the consumer.

func WithConcurrency

func WithConcurrency(concurrency int) Option

WithConcurrency sets the number of concurrent workers processing messages.

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) Option

WithMiddlewares sets the middleware chain for the consumer. Middlewares are executed in the order they are provided.
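
A common way to obtain this ordering is to wrap the handler from the last middleware to the first, so that the first middleware provided ends up outermost. The sketch below assumes that interpretation; the chain, trace, and run helpers are hypothetical, and the types are local stand-ins for the documented ones.

```go
package main

import (
	"context"
	"fmt"
)

type Delivery struct{} // simplified stand-in

type Handler interface {
	Handle(ctx context.Context, delivery *Delivery)
}

type HandlerFunc func(ctx context.Context, delivery *Delivery)

func (f HandlerFunc) Handle(ctx context.Context, delivery *Delivery) { f(ctx, delivery) }

type Middleware func(next Handler) Handler

// chain wraps h in reverse so that mws[0] is the outermost layer
// and therefore runs first.
func chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// trace records when a middleware is entered and left.
func trace(name string, log *[]string) Middleware {
	return func(next Handler) Handler {
		return HandlerFunc(func(ctx context.Context, d *Delivery) {
			*log = append(*log, name+":before")
			next.Handle(ctx, d)
			*log = append(*log, name+":after")
		})
	}
}

// run builds a chain of trace middlewares, invokes it once,
// and returns the recorded call order.
func run(names ...string) []string {
	var log []string
	mws := make([]Middleware, 0, len(names))
	for _, n := range names {
		mws = append(mws, trace(n, &log))
	}
	final := HandlerFunc(func(ctx context.Context, d *Delivery) {
		log = append(log, "handler")
	})
	chain(final, mws...).Handle(context.Background(), &Delivery{})
	return log
}

func main() {
	fmt.Println(run("first", "second"))
	// prints: [first:before second:before handler second:after first:after]
}
```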

func WithName

func WithName(name string) Option

WithName sets the consumer name.

func WithPrefetchCount

func WithPrefetchCount(prefetchCount int) Option

WithPrefetchCount sets the prefetch count: the maximum number of delivered but not yet acknowledged messages that may be outstanding at once.

func WithRetryPolicy

func WithRetryPolicy(policy retry.Policy) Option

WithRetryPolicy configures the retry policy for failed message processing.
