Documentation ¶
Index ¶
- type ConnectionFactory
- type ConnectionPool
- type Consumer
- type DelayQueue
- func (dq *DelayQueue) Close() error
- func (dq *DelayQueue) Pop(ctx context.Context) (*message.Message, error)
- func (dq *DelayQueue) Push(ctx context.Context, msg *message.Message, delay time.Duration) error
- func (dq *DelayQueue) Remove(ctx context.Context, msgID string) error
- func (dq *DelayQueue) Size(ctx context.Context) (int64, error)
- type KeyGenerator
- func (kg *KeyGenerator) ConsumerTag(topic string) string
- func (kg *KeyGenerator) DeadLetterExchangeName() string
- func (kg *KeyGenerator) DeadLetterQueueName(topic string) string
- func (kg *KeyGenerator) DelayExchangeName() string
- func (kg *KeyGenerator) DelayQueueName(topic string) string
- func (kg *KeyGenerator) ExchangeName() string
- func (kg *KeyGenerator) QueueName(topic string) string
- func (kg *KeyGenerator) RoutingKey(topic string) string
- type Producer
- type RabbitMQ
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConnectionFactory ¶
type ConnectionFactory struct {
// contains filtered or unexported fields
}
ConnectionFactory RabbitMQ连接工厂
func NewConnectionFactory ¶
func NewConnectionFactory(config config.RabbitMQConfig) *ConnectionFactory
NewConnectionFactory 创建连接工厂
func (*ConnectionFactory) CreateConnectionPool ¶
func (f *ConnectionFactory) CreateConnectionPool(observer observability.Observer, recorder *observability.MetricsRecorder) (*ConnectionPool, error)
CreateConnectionPool 创建连接池
type ConnectionPool ¶
type ConnectionPool struct {
// contains filtered or unexported fields
}
ConnectionPool RabbitMQ连接池
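func (*ConnectionPool) GetChannel ¶
GetChannel 获取通道 (note: the signature line for this method is missing from this listing; presumably it hands out the *amqp.Channel that ReturnChannel later takes back — confirm against the package source)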
func (*ConnectionPool) HealthCheck ¶
func (p *ConnectionPool) HealthCheck(ctx context.Context) error
HealthCheck 健康检查
func (*ConnectionPool) ReturnChannel ¶
func (p *ConnectionPool) ReturnChannel(ch *amqp.Channel)
ReturnChannel 归还通道
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer RabbitMQ消费者
func NewRabbitConsumer ¶
func NewRabbitConsumer(pool *ConnectionPool, observer observability.Observer, recorder *observability.MetricsRecorder, config config.RabbitMQConfig, ser serializer.Serializer, keyGen *KeyGenerator, messagePool *pool.MessagePool, byteBufferPool *pool.ByteBufferPool) *Consumer
NewRabbitConsumer 创建RabbitMQ消费者
type DelayQueue ¶
type DelayQueue struct {
// contains filtered or unexported fields
}
DelayQueue RabbitMQ延时队列实现 - 使用x-delayed-message插件
func NewRabbitDelayQueue ¶
func NewRabbitDelayQueue(pool *ConnectionPool, observer observability.Observer, recorder *observability.MetricsRecorder, config config.RabbitMQConfig, ser serializer.Serializer, keyGen *KeyGenerator) *DelayQueue
NewRabbitDelayQueue 创建RabbitMQ延时队列
type KeyGenerator ¶
type KeyGenerator struct {
// contains filtered or unexported fields
}
KeyGenerator RabbitMQ键名生成器
func (*KeyGenerator) ConsumerTag ¶
func (kg *KeyGenerator) ConsumerTag(topic string) string
ConsumerTag 生成消费者标签
func (*KeyGenerator) DeadLetterExchangeName ¶
func (kg *KeyGenerator) DeadLetterExchangeName() string
DeadLetterExchangeName 生成死信交换机名称
func (*KeyGenerator) DeadLetterQueueName ¶
func (kg *KeyGenerator) DeadLetterQueueName(topic string) string
DeadLetterQueueName 生成死信队列名称
func (*KeyGenerator) DelayExchangeName ¶
func (kg *KeyGenerator) DelayExchangeName() string
DelayExchangeName 生成延时交换机名称
func (*KeyGenerator) DelayQueueName ¶
func (kg *KeyGenerator) DelayQueueName(topic string) string
DelayQueueName 生成延时队列名称
func (*KeyGenerator) ExchangeName ¶
func (kg *KeyGenerator) ExchangeName() string
ExchangeName 生成交换机名称
func (*KeyGenerator) QueueName ¶
func (kg *KeyGenerator) QueueName(topic string) string
QueueName 生成队列名称
func (*KeyGenerator) RoutingKey ¶
func (kg *KeyGenerator) RoutingKey(topic string) string
RoutingKey 生成路由键
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer RabbitMQ生产者
func NewRabbitProducer ¶
func NewRabbitProducer(pool *ConnectionPool, observer observability.Observer, recorder *observability.MetricsRecorder, config config.RabbitMQConfig, ser serializer.Serializer, keyGen *KeyGenerator, messagePool *pool.MessagePool, byteBufferPool *pool.ByteBufferPool) *Producer
NewRabbitProducer 创建RabbitMQ生产者
type RabbitMQ ¶
type RabbitMQ struct {
// contains filtered or unexported fields
}
RabbitMQ RabbitMQ消息队列实现
func NewRabbitMQ ¶
func NewRabbitMQ(cfg config.RabbitMQConfig, observer observability.Observer, keyPrefix string) (*RabbitMQ, error)
NewRabbitMQ 创建RabbitMQ消息队列
Click to show internal directories.
Click to hide internal directories.