queue

package
v1.15.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculateRetryDelay

func CalculateRetryDelay(retryCount int, config RetryConfig) time.Duration

CalculateRetryDelay calculates delay with exponential backoff

func GetRetryCount

func GetRetryCount(msg amqp.Delivery) int

GetRetryCount extracts the retry count from message headers

func ScheduleRetry

func ScheduleRetry(
	channel *amqp.Channel,
	config *Config,
	body []byte,
	headers amqp.Table,
	delay time.Duration,
)

ScheduleRetry schedules a message for retry with delay

Types

type Config

type Config struct {
	ExchangeName    string
	ExchangeType    string // "direct", "topic", "fanout", "headers" - defaults to "direct"
	QueueName       string
	RoutingKey      string
	DLXExchangeName string
	DLQName         string
	DLQRoutingKey   string
}

Config holds the configuration for RabbitMQ queues and exchanges

func (*Config) GetQueueArguments

func (qc *Config) GetQueueArguments() amqp.Table

GetQueueArguments returns the queue arguments for both main queue and DLQ

func (*Config) SetupAllQueues

func (qc *Config) SetupAllQueues(ch *amqp.Channel) error

SetupAllQueues sets up all exchanges and queues

func (*Config) SetupDeadLetterExchange

func (qc *Config) SetupDeadLetterExchange(ch *amqp.Channel) error

SetupDeadLetterExchange declares the dead letter exchange

func (*Config) SetupDeadLetterQueue

func (qc *Config) SetupDeadLetterQueue(ch *amqp.Channel) error

SetupDeadLetterQueue declares the dead letter queue

func (*Config) SetupExchange

func (qc *Config) SetupExchange(ch *amqp.Channel) error

SetupExchange declares the main exchange

func (*Config) SetupMainQueue

func (qc *Config) SetupMainQueue(ch *amqp.Channel) error

SetupMainQueue declares the main queue with DLQ configuration

type ConnectionConfig

type ConnectionConfig struct {
	Host     string
	Port     string
	Username string
	Password string
	VHost    string
}

ConnectionConfig holds RabbitMQ connection details

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a RabbitMQ consumer with automatic reconnection

func NewConsumer

func NewConsumer(connConfig ConnectionConfig, queueConfig *Config, retryConfig RetryConfig, handler MessageHandler) (*Consumer, error)

NewConsumer creates a new RabbitMQ consumer with automatic reconnection

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the consumer and its connections

func (*Consumer) IsConnected

func (c *Consumer) IsConnected() bool

IsConnected returns true if the consumer has a valid connection

func (*Consumer) StartConsuming

func (c *Consumer) StartConsuming() error

StartConsuming starts consuming messages from the queue

type MessageHandler

type MessageHandler func(msg amqp.Delivery) error

MessageHandler is a function that processes a message It should return an error if processing failed and retry is needed

type Producer added in v1.7.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a RabbitMQ producer with automatic reconnection

func NewProducer added in v1.7.0

func NewProducer(connConfig ConnectionConfig, queueConfig *Config) (*Producer, error)

NewProducer creates a new RabbitMQ producer with automatic reconnection

func (*Producer) Close added in v1.7.0

func (p *Producer) Close() error

Close closes the producer and its connections

func (*Producer) IsConnected added in v1.7.0

func (p *Producer) IsConnected() bool

IsConnected returns true if the producer has a valid connection

func (*Producer) Publish added in v1.7.0

func (p *Producer) Publish(ctx context.Context, body []byte, headers amqp.Table) error

Publish publishes a message to the queue

func (*Producer) PublishWithRoutingKey added in v1.12.0

func (p *Producer) PublishWithRoutingKey(ctx context.Context, body []byte, headers amqp.Table, routingKey string) error

PublishWithRoutingKey publishes a message to the queue with a custom routing key

type RetryConfig

type RetryConfig struct {
	MaxRetries     int
	RetryDelayBase int
	MaxRetryDelay  int
}

RetryConfig holds retry configuration

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL