rabbitmq

package
v1.1.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package rabbitmq 提供RabbitMQ消息队列的Go实现

功能特性:

  • 生产者: 支持消息发布、批量发布、延时消息
  • 消费者: 支持消息消费、自动重试、死信队列
  • 安全: 支持消息加密和数字签名验证
  • 连接: 支持连接池、自动重连、健康检查
  • 监控: 提供详细的连接和队列统计信息

交换机类型常量:

  • ExchangeDirect: 直接交换机,根据路由键精确匹配
  • ExchangeTopic: 主题交换机,支持通配符匹配
  • ExchangeHeaders: 头交换机,根据消息头匹配
  • ExchangeFanout: 扇出交换机,广播到所有绑定队列

使用示例:

config := AmqpConfig{
    Host:     "localhost",
    Port:     5672,
    Username: "guest",
    Password: "guest",
}
conn, err := ConnectRabbitMQ(config)

// 使用交换机常量
option := Option{
    Exchange: "my.exchange",
    Queue:    "my.queue",
    Kind:     ExchangeDirect, // 使用常量避免拼写错误
}

Index

Constants

View Source
const (
	ExchangeDirect  = "direct"  // 直接交换机: 根据路由键精确匹配
	ExchangeTopic   = "topic"   // 主题交换机: 支持通配符匹配
	ExchangeHeaders = "headers" // 头交换机: 根据消息头匹配
	ExchangeFanout  = "fanout"  // 扇出交换机: 广播到所有绑定队列
)

交换机类型常量定义(公开常量,增强类型安全)

Variables

This section is empty.

Functions

func ConnectRabbitMQ

func ConnectRabbitMQ(conf AmqpConfig) (*amqp.Connection, error)

ConnectRabbitMQ 建立到RabbitMQ服务器的连接

使用提供的配置参数建立AMQP连接,支持高级连接配置: - 自定义心跳间隔 - 连接超时控制 - 通道数量限制 - 帧大小配置 - 连接属性标识

连接建立过程: 1. 验证并设置默认配置 2. 构建AMQP URI 3. 配置连接参数 4. 建立TCP连接并进行AMQP握手

返回:

  • *amqp.Connection: 成功建立的连接对象
  • error: 连接失败时的详细错误信息

注意: 调用者负责在适当时候关闭连接。

func GetConnectionInfo added in v1.1.0

func GetConnectionInfo(conn *amqp.Connection) (map[string]interface{}, error)

GetConnectionInfo 获取RabbitMQ连接的详细信息

返回连接的当前状态信息,便于调试和监控: - 连接是否已关闭 - 本地网络地址信息

参数:

conn: RabbitMQ连接对象,不能为nil

返回:

  • map[string]interface{}: 包含连接状态信息的字典
  • error: 获取信息失败时的错误

主要用于: - 连接健康检查 - 调试连接问题 - 监控连接状态

func PutMsgData added in v1.1.0

func PutMsgData(msg *MsgData)

PutMsgData 将MsgData实例归还到对象池

Types

type AmqpConfig

type AmqpConfig struct {
	// 字符串字段(16字节,8字节对齐)
	DsName    string // 数据源名称,用于标识不同的RabbitMQ实例
	Host      string // RabbitMQ服务器主机地址(必需)
	Username  string // RabbitMQ用户名(必需)
	Password  string // RabbitMQ密码,建议加密存储(必需)
	SecretKey string // 消息签名和加密使用的密钥
	Vhost     string // 虚拟主机路径,默认"/"

	// time.Duration字段(8字节,8字节对齐)
	Heartbeat         time.Duration // 连接心跳间隔,默认10秒
	ConnectionTimeout time.Duration // 连接超时时间,默认30秒

	// int字段(8字节,8字节对齐)
	Port       int // RabbitMQ服务器端口号,默认5672
	ChannelMax int // 最大通道数,0表示无限制
	FrameSize  int // AMQP帧大小,0表示使用服务器默认值
}

AmqpConfig RabbitMQ连接配置结构体

包含连接RabbitMQ服务器所需的所有配置参数, 支持详细的连接参数配置和安全验证。

func (*AmqpConfig) ValidateAndSetDefaults added in v1.1.0

func (conf *AmqpConfig) ValidateAndSetDefaults() error

ValidateAndSetDefaults 验证AmqpConfig配置并设置默认值

执行以下验证和设置: 1. 检查必需参数(Host、Username、Password) 2. 设置默认端口号(5672) 3. 设置默认虚拟主机("/") 4. 设置默认心跳间隔(10秒) 5. 设置默认连接超时(30秒) 6. 验证端口号范围(1-65535)

返回错误时会包含详细的错误信息,便于调试。

type Config

type Config struct {
	// amqp.Table字段(16字节,8字节对齐)
	Args amqp.Table `json:"args"` // 额外的队列声明参数,如死信队列配置

	// int字段(8字节,8字节对齐)
	PrefetchCount int `json:"prefetch_count"` // 预取消息数量,控制消费者并发处理能力
	PrefetchSize  int `json:"prefetch_size"`  // 预取消息总大小,0表示不限制

	// bool字段(1字节对齐)
	IsNack    bool `json:"is_nack"`   // 是否支持消息否定确认,用于重试机制, true是必须确认消息,false是每次消费掉数据
	Exclusive bool `json:"exclusive"` // 是否为独占队列,只允许一个消费者
	NoWait    bool `json:"no_wait"`   // 是否不等待服务器确认,异步操作

	// 结构体字段(8字节对齐)
	Option Option `json:"option"` // 消息队列基础配置选项
}

Config AMQP消费者配置参数

定义消费者连接和消费行为的所有配置选项, 包括QoS参数、确认模式、重试策略等。

func (*Config) ValidateAndSetDefaults added in v1.1.0

func (conf *Config) ValidateAndSetDefaults() error

ValidateAndSetDefaults 验证Config配置并设置默认值

执行以下验证和设置: 1. 检查必需的Option参数(Exchange、Queue) 2. 设置默认交换机类型(direct) 3. 设置默认预取参数 4. 验证交换机类型有效性 5. 验证签名类型范围(0或1)

确保消费者配置在合理范围内,避免运行时错误。

type DLX

type DLX struct {
	// 字符串字段(16字节,8字节对齐)
	DlxExchange string // 死信交换机名称
	DlxQueue    string // 死信队列名称,存储最终失败的消息
	DlkExchange string // 重试交换机名称
	DlkQueue    string // 重试队列名称,存储待重试的消息

	// 函数字段(8字节对齐)
	DlkCallFunc func(message MsgData) (MsgData, error) // 重试处理函数,返回处理后的消息或错误
}

DLX 死信队列配置

配置消息消费失败后的死信处理机制, 支持消息重试和最终失败处理。

type DLXConfig added in v1.1.0

type DLXConfig struct {
	DlxExchange string `json:"dlx_exchange"` // 死信交换机名称
	DlxQueue    string `json:"dlx_queue"`    // 死信队列名称
	DlxRouter   string `json:"dlx_router"`   // 死信路由键
}

DLXConfig 死信队列配置

当消息无法被正常消费时,会被路由到死信队列, 用于消息重试、异常处理等场景。 easyjson:json

func (DLXConfig) MarshalEasyJSON added in v1.1.0

func (v DLXConfig) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (DLXConfig) MarshalJSON added in v1.1.0

func (v DLXConfig) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*DLXConfig) UnmarshalEasyJSON added in v1.1.0

func (v *DLXConfig) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*DLXConfig) UnmarshalJSON added in v1.1.0

func (v *DLXConfig) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type MsgData

type MsgData struct {
	// 字符串字段(16字节对齐)- 按声明顺序排列以优化对齐
	Content    string `json:"co"` // 消息内容,可以是字符串或其他类型
	Nonce      string `json:"no"` // 随机数,用于防重放攻击
	Signature  string `json:"sg"` // 消息数字签名
	Expiration string `json:"ex"` // 消息过期时间(RabbitMQ格式)

	// int64字段(8字节对齐)
	Type      int64 `json:"ty"` // 消息类型标识
	Delay     int64 `json:"dy"` // 延时投递时间(秒),0表示立即投递
	Retries   int64 `json:"rt"` // 已重试次数
	CreatedAt int64 `json:"ct"` // 消息信息的创建时间戳(秒)

	// 结构体字段(8字节对齐)
	Option Option `json:"op"` // 消息队列配置选项

	// 小字段分组(1字节对齐)- 放在最后减少填充
	Priority uint8 `json:"pr"` // 消息优先级(0-255)
}

MsgData 消息数据结构体

包含完整的消息信息,支持消息加密、签名、 延时投递、重试等高级功能。 easyjson:json

func GetMsgData added in v1.1.0

func GetMsgData() *MsgData

GetMsgData 从对象池获取MsgData实例

func (MsgData) MarshalEasyJSON added in v1.1.0

func (v MsgData) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (MsgData) MarshalJSON added in v1.1.0

func (v MsgData) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*MsgData) Reset added in v1.1.0

func (m *MsgData) Reset()

Reset 重置MsgData实例到初始状态

func (*MsgData) UnmarshalEasyJSON added in v1.1.0

func (v *MsgData) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*MsgData) UnmarshalJSON added in v1.1.0

func (v *MsgData) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type Option

type Option struct {
	// 字符串字段(16字节对齐)- 按声明顺序排列以优化对齐
	Exchange string `json:"ex"` // 交换机名称
	Queue    string `json:"qe"` // 队列名称
	Kind     string `json:"kd"` // 交换机类型: direct/topic/headers/fanout
	Router   string `json:"ru"` // 路由键,用于消息路由
	SigKey   string `json:"-"`  // 签名密钥,用于消息验证和解密

	// 8字节对齐字段
	SigTyp         int           `json:"st"`                   // 签名类型: 0=明文签名, 1=AES加密签名
	ConfirmTimeout time.Duration `json:"ct"`                   // Confirm模式确认超时,默认30秒
	DLXConfig      *DLXConfig    `json:"dlx_config,omitempty"` // 死信队列配置,可选

	// 小字段分组(1字节对齐)- 放在最后减少填充
	Durable        bool `json:"du"` // 是否持久化交换机和队列
	AutoDelete     bool `json:"ad"` // 是否自动删除
	Exclusive      bool `json:"ev"` // 是否排他队列
	UseTransaction bool `json:"ut"` // 是否使用事务模式(默认true)批量发布时有效
}

Option 消息队列选项配置

定义交换机、队列和路由等消息队列基础配置, 以及消息安全相关的配置参数。 easyjson:json

func (Option) MarshalEasyJSON added in v1.1.0

func (v Option) MarshalEasyJSON(w *jwriter.Writer)

MarshalEasyJSON supports easyjson.Marshaler interface

func (Option) MarshalJSON added in v1.1.0

func (v Option) MarshalJSON() ([]byte, error)

MarshalJSON supports json.Marshaler interface

func (*Option) UnmarshalEasyJSON added in v1.1.0

func (v *Option) UnmarshalEasyJSON(l *jlexer.Lexer)

UnmarshalEasyJSON supports easyjson.Unmarshaler interface

func (*Option) UnmarshalJSON added in v1.1.0

func (v *Option) UnmarshalJSON(data []byte) error

UnmarshalJSON supports json.Unmarshaler interface

type PublishMQ

type PublishMQ struct {
	// contains filtered or unexported fields
}

type PublishManager

type PublishManager struct {
	// contains filtered or unexported fields
}

func NewPublish

func NewPublish(ds ...string) (*PublishManager, error)

func (*PublishManager) Close added in v1.1.0

func (self *PublishManager) Close() error

Close 关闭PublishManager

func (*PublishManager) Connect

func (self *PublishManager) Connect() error

Connect 建立连接

func (*PublishManager) InitConfig

func (self *PublishManager) InitConfig(confs ...AmqpConfig) error

InitConfig 初始化配置(支持多数据源配置) 通过双重检查锁定机制确保线程安全,避免重复初始化

func (*PublishManager) Publish

func (self *PublishManager) Publish(exchange, queue string, dataType int64, content interface{}) error

func (*PublishManager) PublishMsgData

func (self *PublishManager) PublishMsgData(data *MsgData) error

func (*PublishManager) Queue

func (self *PublishManager) Queue(data *MsgData) (*QueueData, error)

type PullManager

type PullManager struct {
	// contains filtered or unexported fields
}

PullManager 管理RabbitMQ消费连接和接收器

func NewPull

func NewPull(ds ...string) (*PullManager, error)

NewPull 快捷创建PullManager客户端

func (*PullManager) AddPullReceiver

func (self *PullManager) AddPullReceiver(receivers ...*PullReceiver) error

AddPullReceiver 添加接收器

func (*PullManager) Close added in v1.1.0

func (self *PullManager) Close() error

Close 关闭PullManager

func (*PullManager) Connect

func (self *PullManager) Connect() error

Connect 建立连接

func (*PullManager) HealthCheck added in v1.1.0

func (self *PullManager) HealthCheck() error

HealthCheck 健康检查 检查RabbitMQ连接和通道的健康状态

func (*PullManager) InitConfig

func (self *PullManager) InitConfig(confs ...AmqpConfig) error

InitConfig 初始化配置(支持多数据源配置) 通过双重检查锁定机制确保线程安全,避免重复初始化

type PullReceiver

type PullReceiver struct {
	Config   *Config
	Callback func(msg *MsgData) error
	Debug    bool
	// contains filtered or unexported fields
}

PullReceiver 消息接收器

func (*PullReceiver) IsHealthy added in v1.1.0

func (self *PullReceiver) IsHealthy() bool

IsHealthy 检查健康状态 返回接收器的当前健康状态

func (*PullReceiver) OnError

func (self *PullReceiver) OnError(err error)

OnError 错误处理回调 处理接收器运行过程中的错误,记录日志并触发重连

func (*PullReceiver) Stop added in v1.1.0

func (self *PullReceiver) Stop()

Stop 停止接收器 优雅地停止接收器的运行,清理资源并等待goroutine退出

type QueueData

type QueueData struct {
	Name      string `json:"name"`      // 队列名称
	Consumers int    `json:"consumers"` // 当前消费者数量
	Messages  int    `json:"messages"`  // 队列中待消费的消息数量
}

QueueData 队列状态数据

包含队列的实时状态信息,用于监控和管理。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL