Documentation
¶
Overview ¶
Package rabbitmq 提供RabbitMQ消息队列的Go实现
功能特性:
- 生产者: 支持消息发布、批量发布、延时消息
- 消费者: 支持消息消费、自动重试、死信队列
- 安全: 支持消息加密和数字签名验证
- 连接: 支持连接池、自动重连、健康检查
- 监控: 提供详细的连接和队列统计信息
交换机类型常量:
- ExchangeDirect: 直接交换机,根据路由键精确匹配
- ExchangeTopic: 主题交换机,支持通配符匹配
- ExchangeHeaders: 头交换机,根据消息头匹配
- ExchangeFanout: 扇出交换机,广播到所有绑定队列
使用示例:
config := AmqpConfig{
Host: "localhost",
Port: 5672,
Username: "guest",
Password: "guest",
}
conn, err := ConnectRabbitMQ(config)
// 使用交换机常量
option := Option{
Exchange: "my.exchange",
Queue: "my.queue",
Kind: ExchangeDirect, // 使用常量避免拼写错误
}
Index ¶
- Constants
- func ConnectRabbitMQ(conf AmqpConfig) (*amqp.Connection, error)
- func GetConnectionInfo(conn *amqp.Connection) (map[string]interface{}, error)
- func PutMsgData(msg *MsgData)
- type AmqpConfig
- type Config
- type DLX
- type DLXConfig
- type MsgData
- type Option
- type PublishMQ
- type PublishManager
- func (self *PublishManager) Close() error
- func (self *PublishManager) Connect() error
- func (self *PublishManager) InitConfig(confs ...AmqpConfig) error
- func (self *PublishManager) Publish(exchange, queue string, dataType int64, content interface{}) error
- func (self *PublishManager) PublishMsgData(data *MsgData) error
- func (self *PublishManager) Queue(data *MsgData) (*QueueData, error)
- type PullManager
- type PullReceiver
- type QueueData
Constants ¶
const ( ExchangeDirect = "direct" // 直接交换机: 根据路由键精确匹配 ExchangeTopic = "topic" // 主题交换机: 支持通配符匹配 ExchangeHeaders = "headers" // 头交换机: 根据消息头匹配 ExchangeFanout = "fanout" // 扇出交换机: 广播到所有绑定队列 )
交换机类型常量定义(公开常量,增强类型安全)
Variables ¶
This section is empty.
Functions ¶
func ConnectRabbitMQ ¶
func ConnectRabbitMQ(conf AmqpConfig) (*amqp.Connection, error)
ConnectRabbitMQ 建立到RabbitMQ服务器的连接
使用提供的配置参数建立AMQP连接,支持高级连接配置: - 自定义心跳间隔 - 连接超时控制 - 通道数量限制 - 帧大小配置 - 连接属性标识
连接建立过程: 1. 验证并设置默认配置 2. 构建AMQP URI 3. 配置连接参数 4. 建立TCP连接并进行AMQP握手
返回:
- *amqp.Connection: 成功建立的连接对象
- error: 连接失败时的详细错误信息
注意: 调用者负责在适当时候关闭连接。
func GetConnectionInfo ¶ added in v1.1.0
func GetConnectionInfo(conn *amqp.Connection) (map[string]interface{}, error)
GetConnectionInfo 获取RabbitMQ连接的详细信息
返回连接的当前状态信息,便于调试和监控: - 连接是否已关闭 - 本地网络地址信息
参数:
conn: RabbitMQ连接对象,不能为nil
返回:
- map[string]interface{}: 包含连接状态信息的字典
- error: 获取信息失败时的错误
主要用于: - 连接健康检查 - 调试连接问题 - 监控连接状态
Types ¶
type AmqpConfig ¶
type AmqpConfig struct {
// 字符串字段(16字节,8字节对齐)
DsName string // 数据源名称,用于标识不同的RabbitMQ实例
Host string // RabbitMQ服务器主机地址(必需)
Username string // RabbitMQ用户名(必需)
Password string // RabbitMQ密码,建议加密存储(必需)
SecretKey string // 消息签名和加密使用的密钥
Vhost string // 虚拟主机路径,默认"/"
// time.Duration字段(8字节,8字节对齐)
Heartbeat time.Duration // 连接心跳间隔,默认10秒
ConnectionTimeout time.Duration // 连接超时时间,默认30秒
// int字段(8字节,8字节对齐)
Port int // RabbitMQ服务器端口号,默认5672
ChannelMax int // 最大通道数,0表示无限制
FrameSize int // AMQP帧大小,0表示使用服务器默认值
}
AmqpConfig RabbitMQ连接配置结构体
包含连接RabbitMQ服务器所需的所有配置参数, 支持详细的连接参数配置和安全验证。
func (*AmqpConfig) ValidateAndSetDefaults ¶ added in v1.1.0
func (conf *AmqpConfig) ValidateAndSetDefaults() error
ValidateAndSetDefaults 验证AmqpConfig配置并设置默认值
执行以下验证和设置: 1. 检查必需参数(Host、Username、Password) 2. 设置默认端口号(5672) 3. 设置默认虚拟主机("/") 4. 设置默认心跳间隔(10秒) 5. 设置默认连接超时(30秒) 6. 验证端口号范围(1-65535)
返回错误时会包含详细的错误信息,便于调试。
type Config ¶
type Config struct {
// amqp.Table字段(16字节,8字节对齐)
Args amqp.Table `json:"args"` // 额外的队列声明参数,如死信队列配置
// int字段(8字节,8字节对齐)
PrefetchCount int `json:"prefetch_count"` // 预取消息数量,控制消费者并发处理能力
PrefetchSize int `json:"prefetch_size"` // 预取消息总大小,0表示不限制
// bool字段(1字节对齐)
IsNack bool `json:"is_nack"` // 是否支持消息否定确认,用于重试机制, true是必须确认消息,false是每次消费掉数据
Exclusive bool `json:"exclusive"` // 是否为独占队列,只允许一个消费者
NoWait bool `json:"no_wait"` // 是否不等待服务器确认,异步操作
// 结构体字段(8字节对齐)
Option Option `json:"option"` // 消息队列基础配置选项
}
Config AMQP消费者配置参数
定义消费者连接和消费行为的所有配置选项, 包括QoS参数、确认模式、重试策略等。
func (*Config) ValidateAndSetDefaults ¶ added in v1.1.0
ValidateAndSetDefaults 验证Config配置并设置默认值
执行以下验证和设置: 1. 检查必需的Option参数(Exchange、Queue) 2. 设置默认交换机类型(direct) 3. 设置默认预取参数 4. 验证交换机类型有效性 5. 验证签名类型范围(0或1)
确保消费者配置在合理范围内,避免运行时错误。
type DLX ¶
type DLX struct {
// 字符串字段(16字节,8字节对齐)
DlxExchange string // 死信交换机名称
DlxQueue string // 死信队列名称,存储最终失败的消息
DlkExchange string // 重试交换机名称
DlkQueue string // 重试队列名称,存储待重试的消息
// 函数字段(8字节对齐)
DlkCallFunc func(message MsgData) (MsgData, error) // 重试处理函数,返回处理后的消息或错误
}
DLX 死信队列配置
配置消息消费失败后的死信处理机制, 支持消息重试和最终失败处理。
type DLXConfig ¶ added in v1.1.0
type DLXConfig struct {
DlxExchange string `json:"dlx_exchange"` // 死信交换机名称
DlxQueue string `json:"dlx_queue"` // 死信队列名称
DlxRouter string `json:"dlx_router"` // 死信路由键
}
DLXConfig 死信队列配置
当消息无法被正常消费时,会被路由到死信队列, 用于消息重试、异常处理等场景。 easyjson:json
func (DLXConfig) MarshalEasyJSON ¶ added in v1.1.0
MarshalEasyJSON supports easyjson.Marshaler interface
func (DLXConfig) MarshalJSON ¶ added in v1.1.0
MarshalJSON supports json.Marshaler interface
func (*DLXConfig) UnmarshalEasyJSON ¶ added in v1.1.0
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*DLXConfig) UnmarshalJSON ¶ added in v1.1.0
UnmarshalJSON supports json.Unmarshaler interface
type MsgData ¶
type MsgData struct {
// 字符串字段(16字节对齐)- 按声明顺序排列以优化对齐
Content string `json:"co"` // 消息内容,可以是字符串或其他类型
Nonce string `json:"no"` // 随机数,用于防重放攻击
Signature string `json:"sg"` // 消息数字签名
Expiration string `json:"ex"` // 消息过期时间(RabbitMQ格式)
// int64字段(8字节对齐)
Type int64 `json:"ty"` // 消息类型标识
Delay int64 `json:"dy"` // 延时投递时间(秒),0表示立即投递
Retries int64 `json:"rt"` // 已重试次数
CreatedAt int64 `json:"ct"` // 消息信息的创建时间戳(秒)
// 结构体字段(8字节对齐)
Option Option `json:"op"` // 消息队列配置选项
// 小字段分组(1字节对齐)- 放在最后减少填充
Priority uint8 `json:"pr"` // 消息优先级(0-255)
}
MsgData 消息数据结构体
包含完整的消息信息,支持消息加密、签名、 延时投递、重试等高级功能。 easyjson:json
func (MsgData) MarshalEasyJSON ¶ added in v1.1.0
MarshalEasyJSON supports easyjson.Marshaler interface
func (MsgData) MarshalJSON ¶ added in v1.1.0
MarshalJSON supports json.Marshaler interface
func (*MsgData) UnmarshalEasyJSON ¶ added in v1.1.0
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*MsgData) UnmarshalJSON ¶ added in v1.1.0
UnmarshalJSON supports json.Unmarshaler interface
type Option ¶
type Option struct {
// 字符串字段(16字节对齐)- 按声明顺序排列以优化对齐
Exchange string `json:"ex"` // 交换机名称
Queue string `json:"qe"` // 队列名称
Kind string `json:"kd"` // 交换机类型: direct/topic/headers/fanout
Router string `json:"ru"` // 路由键,用于消息路由
SigKey string `json:"-"` // 签名密钥,用于消息验证和解密
// 8字节对齐字段
SigTyp int `json:"st"` // 签名类型: 0=明文签名, 1=AES加密签名
ConfirmTimeout time.Duration `json:"ct"` // Confirm模式确认超时,默认30秒
DLXConfig *DLXConfig `json:"dlx_config,omitempty"` // 死信队列配置,可选
// 小字段分组(1字节对齐)- 放在最后减少填充
Durable bool `json:"du"` // 是否持久化交换机和队列
AutoDelete bool `json:"ad"` // 是否自动删除
Exclusive bool `json:"ev"` // 是否排他队列
UseTransaction bool `json:"ut"` // 是否使用事务模式(默认true)批量发布时有效
}
Option 消息队列选项配置
定义交换机、队列和路由等消息队列基础配置, 以及消息安全相关的配置参数。 easyjson:json
func (Option) MarshalEasyJSON ¶ added in v1.1.0
MarshalEasyJSON supports easyjson.Marshaler interface
func (Option) MarshalJSON ¶ added in v1.1.0
MarshalJSON supports json.Marshaler interface
func (*Option) UnmarshalEasyJSON ¶ added in v1.1.0
UnmarshalEasyJSON supports easyjson.Unmarshaler interface
func (*Option) UnmarshalJSON ¶ added in v1.1.0
UnmarshalJSON supports json.Unmarshaler interface
type PublishManager ¶
type PublishManager struct {
// contains filtered or unexported fields
}
func NewPublish ¶
func NewPublish(ds ...string) (*PublishManager, error)
func (*PublishManager) Close ¶ added in v1.1.0
func (self *PublishManager) Close() error
Close 关闭PublishManager
func (*PublishManager) InitConfig ¶
func (self *PublishManager) InitConfig(confs ...AmqpConfig) error
InitConfig 初始化配置(支持多数据源配置) 通过双重检查锁定机制确保线程安全,避免重复初始化
func (*PublishManager) Publish ¶
func (self *PublishManager) Publish(exchange, queue string, dataType int64, content interface{}) error
func (*PublishManager) PublishMsgData ¶
func (self *PublishManager) PublishMsgData(data *MsgData) error
type PullManager ¶
type PullManager struct {
// contains filtered or unexported fields
}
PullManager 管理RabbitMQ消费连接和接收器
func (*PullManager) AddPullReceiver ¶
func (self *PullManager) AddPullReceiver(receivers ...*PullReceiver) error
AddPullReceiver 添加接收器
func (*PullManager) Close ¶ added in v1.1.0
func (self *PullManager) Close() error
Close 关闭PullManager
func (*PullManager) HealthCheck ¶ added in v1.1.0
func (self *PullManager) HealthCheck() error
HealthCheck 健康检查 检查RabbitMQ连接和通道的健康状态
func (*PullManager) InitConfig ¶
func (self *PullManager) InitConfig(confs ...AmqpConfig) error
InitConfig 初始化配置(支持多数据源配置) 通过双重检查锁定机制确保线程安全,避免重复初始化
type PullReceiver ¶
type PullReceiver struct {
Config *Config
Callback func(msg *MsgData) error
Debug bool
// contains filtered or unexported fields
}
PullReceiver 消息接收器
func (*PullReceiver) IsHealthy ¶ added in v1.1.0
func (self *PullReceiver) IsHealthy() bool
IsHealthy 检查健康状态 返回接收器的当前健康状态
func (*PullReceiver) OnError ¶
func (self *PullReceiver) OnError(err error)
OnError 错误处理回调 处理接收器运行过程中的错误,记录日志并触发重连
func (*PullReceiver) Stop ¶ added in v1.1.0
func (self *PullReceiver) Stop()
Stop 停止接收器 优雅地停止接收器的运行,清理资源并等待goroutine退出