rabbitmq

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConnectionFactory

type ConnectionFactory struct {
	// contains filtered or unexported fields
}

ConnectionFactory RabbitMQ连接工厂

func NewConnectionFactory

func NewConnectionFactory(config config.RabbitMQConfig) *ConnectionFactory

NewConnectionFactory 创建连接工厂

func (*ConnectionFactory) CreateConnectionPool

func (f *ConnectionFactory) CreateConnectionPool(observer observability.Observer, recorder *observability.MetricsRecorder) (*ConnectionPool, error)

CreateConnectionPool 创建连接池

type ConnectionPool

type ConnectionPool struct {
	// contains filtered or unexported fields
}

ConnectionPool RabbitMQ连接池

func (*ConnectionPool) Close

func (p *ConnectionPool) Close() error

Close 关闭连接池

func (*ConnectionPool) GetChannel

func (p *ConnectionPool) GetChannel(ctx context.Context) (*amqp.Channel, error)

GetChannel 获取通道

func (*ConnectionPool) HealthCheck

func (p *ConnectionPool) HealthCheck(ctx context.Context) error

HealthCheck 健康检查

func (*ConnectionPool) ReturnChannel

func (p *ConnectionPool) ReturnChannel(ch *amqp.Channel)

ReturnChannel 归还通道

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer RabbitMQ消费者

func NewRabbitConsumer

func NewRabbitConsumer(
	pool *ConnectionPool,
	observer observability.Observer,
	recorder *observability.MetricsRecorder,
	config config.RabbitMQConfig,
	ser serializer.Serializer,
	keyGen *KeyGenerator,
	messagePool *pool.MessagePool,
	byteBufferPool *pool.ByteBufferPool,
) *Consumer

NewRabbitConsumer 创建RabbitMQ消费者

func (*Consumer) Close

func (c *Consumer) Close() error

Close 关闭消费者

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(ctx context.Context, topic string, handler message.Handler) error

Subscribe 订阅消息

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe(topic string) error

Unsubscribe 取消订阅

type DelayQueue

type DelayQueue struct {
	// contains filtered or unexported fields
}

DelayQueue RabbitMQ延时队列实现 - 使用x-delayed-message插件

func NewRabbitDelayQueue

func NewRabbitDelayQueue(
	pool *ConnectionPool,
	observer observability.Observer,
	recorder *observability.MetricsRecorder,
	config config.RabbitMQConfig,
	ser serializer.Serializer,
	keyGen *KeyGenerator,
) *DelayQueue

NewRabbitDelayQueue 创建RabbitMQ延时队列

func (*DelayQueue) Close

func (dq *DelayQueue) Close() error

Close 关闭延时队列

func (*DelayQueue) Pop

func (dq *DelayQueue) Pop(ctx context.Context) (*message.Message, error)

Pop 弹出到期消息 (RabbitMQ通过x-delayed-message插件自动处理)

func (*DelayQueue) Push

func (dq *DelayQueue) Push(ctx context.Context, msg *message.Message, delay time.Duration) error

Push 推送延时消息

func (*DelayQueue) Remove

func (dq *DelayQueue) Remove(ctx context.Context, msgID string) error

Remove 移除消息

func (*DelayQueue) Size

func (dq *DelayQueue) Size(ctx context.Context) (int64, error)

Size 获取队列大小

type KeyGenerator

type KeyGenerator struct {
	// contains filtered or unexported fields
}

KeyGenerator RabbitMQ键名生成器

func NewKeyGenerator

func NewKeyGenerator(keyPrefix string) *KeyGenerator

NewKeyGenerator 创建键名生成器

func (*KeyGenerator) ConsumerTag

func (kg *KeyGenerator) ConsumerTag(topic string) string

ConsumerTag 生成消费者标签

func (*KeyGenerator) DeadLetterExchangeName

func (kg *KeyGenerator) DeadLetterExchangeName() string

DeadLetterExchangeName 生成死信交换机名称

func (*KeyGenerator) DeadLetterQueueName

func (kg *KeyGenerator) DeadLetterQueueName(topic string) string

DeadLetterQueueName 生成死信队列名称

func (*KeyGenerator) DelayExchangeName

func (kg *KeyGenerator) DelayExchangeName() string

DelayExchangeName 生成延时交换机名称

func (*KeyGenerator) DelayQueueName

func (kg *KeyGenerator) DelayQueueName(topic string) string

DelayQueueName 生成延时队列名称

func (*KeyGenerator) ExchangeName

func (kg *KeyGenerator) ExchangeName() string

ExchangeName 生成交换机名称

func (*KeyGenerator) QueueName

func (kg *KeyGenerator) QueueName(topic string) string

QueueName 生成队列名称

func (*KeyGenerator) RoutingKey

func (kg *KeyGenerator) RoutingKey(topic string) string

RoutingKey 生成路由键

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer RabbitMQ生产者

func NewRabbitProducer

func NewRabbitProducer(
	pool *ConnectionPool,
	observer observability.Observer,
	recorder *observability.MetricsRecorder,
	config config.RabbitMQConfig,
	ser serializer.Serializer,
	keyGen *KeyGenerator,
	messagePool *pool.MessagePool,
	byteBufferPool *pool.ByteBufferPool,
) *Producer

NewRabbitProducer 创建RabbitMQ生产者

func (*Producer) Close

func (p *Producer) Close() error

Close 关闭生产者

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, msg *message.Message) error

Send 发送消息

func (*Producer) SendBatch

func (p *Producer) SendBatch(ctx context.Context, messages []*message.Message) error

SendBatch 批量发送消息

func (*Producer) SendDelay

func (p *Producer) SendDelay(ctx context.Context, msg *message.Message, delay time.Duration) error

SendDelay 发送延时消息 - 使用x-delayed-message插件

type RabbitMQ

type RabbitMQ struct {
	// contains filtered or unexported fields
}

RabbitMQ RabbitMQ消息队列实现

func NewRabbitMQ

func NewRabbitMQ(cfg config.RabbitMQConfig, observer observability.Observer, keyPrefix string) (*RabbitMQ, error)

NewRabbitMQ 创建RabbitMQ消息队列

func (*RabbitMQ) Close

func (r *RabbitMQ) Close() error

Close 关闭连接

func (*RabbitMQ) Consumer

func (r *RabbitMQ) Consumer() contract.Consumer

Consumer 获取消费者

func (*RabbitMQ) DelayQueue

func (r *RabbitMQ) DelayQueue() contract.DelayQueue

DelayQueue 获取延时队列

func (*RabbitMQ) HealthCheck

func (r *RabbitMQ) HealthCheck() error

HealthCheck 健康检查

func (*RabbitMQ) Producer

func (r *RabbitMQ) Producer() contract.Producer

Producer 获取生产者

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL