Documentation
¶
Overview ¶
Package mq 提供消息队列组件,支持 NATS Core, JetStream, Redis Stream 等多种后端。
MQ 组件是 Genesis 微服务组件库的消息中间件抽象层,提供统一的发布-订阅语义。 设计原则:
- 简单优于复杂:核心接口精简,通过 Option 扩展能力
- 显式优于隐式:不做自动注入,用户完全掌控消息流
- 可扩展性:Transport 接口设计兼顾未来 Kafka 等重量级 MQ
Index ¶
- Constants
- Variables
- type Capabilities
- type Config
- type DeadLetterConfig
- type Driver
- type Handler
- type Headers
- type JetStreamConfig
- type MQ
- type Message
- type Middleware
- type Option
- type PublishOption
- type RedisStreamConfig
- type RetryConfig
- type SubscribeOption
- func WithAutoAck() SubscribeOption
- func WithBatchSize(size int) SubscribeOption
- func WithBufferSize(size int) SubscribeOption
- func WithDeadLetter(maxRetries int, topic string) SubscribeOption
- func WithDurable(name string) SubscribeOption
- func WithManualAck() SubscribeOption
- func WithMaxInflight(n int) SubscribeOption
- func WithQueueGroup(name string) SubscribeOption
- type Subscription
- type Transport
Constants ¶
const ( // MetricPublishTotal 发布消息总数 MetricPublishTotal = "mq.publish.total" // MetricPublishDuration 发布延迟(秒) MetricPublishDuration = "mq.publish.duration" // MetricConsumeTotal 消费消息总数 MetricConsumeTotal = "mq.consume.total" // MetricHandleDuration 消息处理耗时(秒) MetricHandleDuration = "mq.handle.duration" )
指标名称常量
const ( // LabelTopic 主题标签 LabelTopic = "topic" // LabelStatus 状态标签(success/error) LabelStatus = "status" // LabelDriver 驱动标签 LabelDriver = "driver" )
标签名称常量
Variables ¶
var ( // ErrClosed MQ 已关闭 ErrClosed = xerrors.New("mq: client closed") // ErrInvalidConfig 配置无效 ErrInvalidConfig = xerrors.New("mq: invalid config") // ErrNotSupported 操作不支持 ErrNotSupported = xerrors.New("mq: operation not supported by this driver") // ErrSubscriptionClosed 订阅已关闭 ErrSubscriptionClosed = xerrors.New("mq: subscription closed") // ErrPanicRecovered Handler panic 已恢复 ErrPanicRecovered = xerrors.New("mq: handler panic recovered") )
预定义错误
var CapabilitiesKafka = Capabilities{ Persistence: true, ExactlyOnce: true, Nak: false, DeadLetter: true, QueueGroup: true, OrderedDelivery: true, BatchConsume: true, DelayedMessage: false, }
CapabilitiesKafka Kafka 的能力描述(预留)
var CapabilitiesNATSCore = Capabilities{ Persistence: false, ExactlyOnce: false, Nak: false, DeadLetter: false, QueueGroup: true, OrderedDelivery: false, BatchConsume: false, DelayedMessage: false, }
CapabilitiesNATSCore NATS Core 的能力描述
var CapabilitiesNATSJetStream = Capabilities{ Persistence: true, ExactlyOnce: true, Nak: true, DeadLetter: false, QueueGroup: true, OrderedDelivery: true, BatchConsume: true, DelayedMessage: false, }
CapabilitiesNATSJetStream NATS JetStream 的能力描述
var CapabilitiesRedisStream = Capabilities{ Persistence: true, ExactlyOnce: false, Nak: false, DeadLetter: false, QueueGroup: true, OrderedDelivery: true, BatchConsume: true, DelayedMessage: false, }
CapabilitiesRedisStream Redis Stream 的能力描述
var DefaultRetryConfig = RetryConfig{ MaxRetries: 3, InitialBackoff: 100 * time.Millisecond, MaxBackoff: 5 * time.Second, Multiplier: 2.0, }
DefaultRetryConfig 默认重试配置
Functions ¶
This section is empty.
Types ¶
type Capabilities ¶
type Capabilities struct {
// Persistence 是否支持消息持久化
Persistence bool
// ExactlyOnce 是否支持精确一次语义
ExactlyOnce bool
// Nak 是否支持消息拒绝重投
Nak bool
// DeadLetter 是否支持死信队列
DeadLetter bool
// QueueGroup 是否支持队列组(负载均衡)
QueueGroup bool
// OrderedDelivery 是否保证顺序投递
OrderedDelivery bool
// BatchConsume 是否支持批量消费
BatchConsume bool
// DelayedMessage 是否支持延迟消息
DelayedMessage bool
}
Capabilities 描述 Transport 支持的能力
不同后端能力差异较大,通过此结构暴露给上层:
- NATS Core: 最简单,无持久化
- NATS JetStream: 持久化、重投、死信队列
- Redis Stream: 持久化、Consumer Group
- Kafka (未来): 持久化、分区、事务
type Config ¶
type Config struct {
// Driver 底层驱动类型
// 必填,可选值:nats_core, nats_jetstream, redis_stream
Driver Driver `json:"driver" yaml:"driver"`
// JetStream JetStream 特有配置(仅 DriverNATSJetStream 时生效)
JetStream *JetStreamConfig `json:"jetstream,omitempty" yaml:"jetstream,omitempty"`
// RedisStream Redis Stream 特有配置(仅 DriverRedisStream 时生效)
RedisStream *RedisStreamConfig `json:"redis_stream,omitempty" yaml:"redis_stream,omitempty"`
}
Config MQ 配置
type DeadLetterConfig ¶
type DeadLetterConfig struct {
// MaxRetries 最大重试次数,超过后进入死信队列
MaxRetries int
// Topic 死信队列主题
Topic string
}
DeadLetterConfig 死信队列配置
注意:当前为预留配置,各驱动暂未实现。 未来实现计划:
- JetStream: 利用 RedeliveryPolicy + 自定义逻辑
- Redis Stream: 基于 Pending 列表 + XCLAIM 实现
- Kafka: 发送到 error topic
type Driver ¶
type Driver string
Driver 驱动类型
const ( // DriverNATSCore NATS Core 驱动(高性能,无持久化) DriverNATSCore Driver = "nats_core" // DriverNATSJetStream NATS JetStream 驱动(持久化,支持重投) DriverNATSJetStream Driver = "nats_jetstream" // DriverRedisStream Redis Stream 驱动(持久化队列) DriverRedisStream Driver = "redis_stream" // DriverKafka Kafka 驱动(预留,未实现) DriverKafka Driver = "kafka" )
type Handler ¶
Handler 消息处理函数
设计说明:只接收 Message 参数,通过 msg.Context() 获取上下文, 避免 ctx 和 msg.Context() 同时存在造成的困惑。
返回值:
- nil: 处理成功,AutoAck 模式下自动确认
- error: 处理失败,AutoAck 模式下自动 Nak(如后端支持)
重要提示(JetStream 用户必读):
- JetStream 的 Nak 会触发消息重新投递
- 如果返回 error 是非暂时性的(如数据格式错误),消息会无限重投
- 解决方案: 1. 使用 WithManualAck 手动控制 Ack/Nak 2. 对于不可恢复的错误,也调用 Ack() 并记录日志 3. 结合 WithRetry 中间件在应用层重试,失败后仍 Ack
type JetStreamConfig ¶
type JetStreamConfig struct {
// AutoCreateStream 是否自动创建 Stream(如果不存在)
// 生产环境建议关闭,通过运维手动创建
AutoCreateStream bool `json:"auto_create_stream" yaml:"auto_create_stream"`
// StreamPrefix Stream 名称前缀,默认 "S-"
StreamPrefix string `json:"stream_prefix" yaml:"stream_prefix"`
}
JetStreamConfig JetStream 特有配置
type MQ ¶
type MQ interface {
// Publish 发布消息到指定主题
//
// 参数:
// - ctx: 上下文,用于超时控制和取消
// - topic: 消息主题(NATS subject / Redis stream key)
// - data: 消息体
// - opts: 发布选项(Headers 等)
Publish(ctx context.Context, topic string, data []byte, opts ...PublishOption) error
// Subscribe 订阅主题并处理消息
//
// Handler 签名:func(msg Message) error
// 通过 msg.Context() 获取上下文,避免参数冗余。
//
// 参数:
// - ctx: 订阅生命周期上下文,取消时自动停止订阅
// - topic: 订阅主题
// - handler: 消息处理函数
// - opts: 订阅选项(QueueGroup、AutoAck 等)
Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscription, error)
// Close 关闭 MQ 客户端
// 注意:底层连接由 Connector 管理,此方法仅释放 MQ 内部资源
Close() error
}
MQ 消息队列核心接口
提供统一的发布订阅能力,屏蔽底层实现差异。 支持的后端:NATS Core、NATS JetStream、Redis Stream
type Message ¶
type Message interface {
// Context 获取消息处理上下文
//
// 该上下文继承自 Subscribe 调用时的 ctx,可用于:
// - 超时控制
// - 取消传播
// - 传递 trace 信息(需业务自行注入)
Context() context.Context
// Topic 获取消息主题
Topic() string
// Data 获取消息体(原始字节)
Data() []byte
// Headers 获取消息头(返回副本)
Headers() Headers
// Ack 确认消息处理成功
//
// 不同后端行为:
// - NATS Core: 无操作(无持久化语义)
// - NATS JetStream: 发送 Ack 到服务端
// - Redis Stream: Consumer Group 模式下执行 XACK
// - Kafka (未来): 提交 offset
Ack() error
// Nak 拒绝消息,请求重投
//
// 不同后端行为:
// - NATS Core: 无操作
// - NATS JetStream: 发送 Nak,触发重投
// - Redis Stream: 无原生支持,消息留在 Pending 列表
// - Kafka (未来): 不提交 offset,触发 rebalance 后重投
//
// 注意:部分后端不支持真正的 Nak,错误处理需结合业务设计。
Nak() error
// ID 获取消息唯一标识(可选)
//
// 不同后端返回值:
// - NATS Core: 空字符串
// - NATS JetStream: Stream sequence
// - Redis Stream: 消息 ID (如 "1234567890-0")
ID() string
}
Message 消息接口
封装底层消息细节,提供统一的数据访问和确认机制。 不同后端的 Ack/Nak 行为有差异,详见各方法注释。
type Middleware ¶
Middleware Handler 中间件
中间件模式允许在不修改业务逻辑的情况下增强 Handler 能力。 常见用途:重试、日志、链路追踪、指标等。
func Chain ¶
func Chain(middlewares ...Middleware) Middleware
Chain 将多个中间件串联成一个
执行顺序:第一个中间件最先执行,最后一个最接近原始 Handler。
示例:
handler = mq.Chain(logging, retry, tracing)(handler) // 执行顺序:logging -> retry -> tracing -> handler
func WithLogging ¶
func WithLogging(logger clog.Logger) Middleware
WithLogging 创建日志中间件
记录每条消息的处理情况,包括:topic、消息 ID、处理耗时、错误信息。
func WithRecover ¶
func WithRecover(logger clog.Logger) Middleware
WithRecover 创建 panic 恢复中间件
捕获 Handler 中的 panic,转换为错误返回,避免整个消费者崩溃。
func WithRetry ¶
func WithRetry(cfg RetryConfig, logger clog.Logger) Middleware
WithRetry 创建重试中间件
在应用层实现重试逻辑(不依赖 MQ 后端的重投机制)。 适用于幂等操作或可安全重试的场景。
注意:
- 重试发生在单次消息处理内,不影响 MQ 层面的 Ack/Nak
- 如果所有重试都失败,最终错误会返回给上层(可能触发 Nak)
示例:
handler := mq.WithRetry(mq.DefaultRetryConfig, logger)(myHandler)
type Option ¶
type Option func(*options)
Option MQ 配置选项
func WithNATSConnector ¶
func WithNATSConnector(conn connector.NATSConnector) Option
WithNATSConnector 注入 NATS 连接器(用于 NATS Core / JetStream)
func WithRedisConnector ¶
func WithRedisConnector(conn connector.RedisConnector) Option
WithRedisConnector 注入 Redis 连接器(用于 Redis Stream)
type PublishOption ¶
type PublishOption func(*publishOptions)
PublishOption 发布选项
func WithHeaders ¶
func WithHeaders(h Headers) PublishOption
WithHeaders 设置消息头
示例:
mq.Publish(ctx, "topic", data, mq.WithHeaders(mq.Headers{
"trace-id": "abc123",
}))
func WithKey ¶
func WithKey(key string) PublishOption
WithKey 设置消息 Key(用于分区路由)
注意:当前仅预留,NATS/Redis 不使用此选项。 Kafka 场景下用于保证相同 Key 的消息路由到同一分区。
type RedisStreamConfig ¶
type RedisStreamConfig struct {
// MaxLen Stream 最大长度,0 表示不限制
// 超过后自动裁剪旧消息
MaxLen int64 `json:"max_len" yaml:"max_len"`
// Approximate 是否使用近似裁剪(MAXLEN ~)
// 开启后性能更好,但长度控制不精确
Approximate bool `json:"approximate" yaml:"approximate"`
}
RedisStreamConfig Redis Stream 特有配置
type RetryConfig ¶
type RetryConfig struct {
// MaxRetries 最大重试次数(不含首次执行)
MaxRetries int
// InitialBackoff 初始退避时间
InitialBackoff time.Duration
// MaxBackoff 最大退避时间
MaxBackoff time.Duration
// Multiplier 退避倍数
Multiplier float64
}
RetryConfig 重试配置
type SubscribeOption ¶
type SubscribeOption func(*subscribeOptions)
SubscribeOption 订阅选项
func WithBatchSize ¶
func WithBatchSize(size int) SubscribeOption
WithBatchSize 设置批量拉取大小
影响单次拉取的消息数量,适当增大可提升吞吐量。 默认值:10
func WithBufferSize ¶
func WithBufferSize(size int) SubscribeOption
WithBufferSize 设置内部缓冲区大小
默认值:100
func WithDeadLetter ¶
func WithDeadLetter(maxRetries int, topic string) SubscribeOption
WithDeadLetter 设置死信队列配置
注意:当前为预留配置,各驱动暂未实现。 调用此选项不会报错,但不会生效。
func WithDurable ¶
func WithDurable(name string) SubscribeOption
WithDurable 设置持久化订阅名称
持久化订阅会记录消费进度,重启后继续消费。 仅 JetStream / Redis Stream 有效。
func WithManualAck ¶
func WithManualAck() SubscribeOption
WithManualAck 关闭自动确认
启用后需要在 Handler 中手动调用 msg.Ack() 或 msg.Nak()。 适用于需要精确控制确认时机的场景。
重要说明(JetStream):
- 调用 msg.Nak() 会触发消息重新投递
- 对于无法恢复的错误,应该 Ack() 而非 Nak(),避免无限循环
- 建议配合 WithRetry 中间件在应用层重试
func WithMaxInflight ¶
func WithMaxInflight(n int) SubscribeOption
WithMaxInflight 设置最大在途消息数
限制未确认消息的数量,用于背压控制。 仅 JetStream 有效(对应 MaxAckPending)。
func WithQueueGroup ¶
func WithQueueGroup(name string) SubscribeOption
WithQueueGroup 设置队列组(用于负载均衡)
同一队列组内的消费者竞争消费消息,实现负载均衡。 不同队列组独立消费,实现广播。
对应关系:
- NATS: Queue Subscribe
- Redis Stream: Consumer Group
- Kafka (未来): Consumer Group
type Subscription ¶
type Subscription interface {
// Unsubscribe 取消订阅
//
// 调用后停止接收新消息。
// 注意:不保证等待当前 Handler 完成,具体行为依赖后端实现。
Unsubscribe() error
// Done 返回一个 channel,订阅结束时关闭
//
// 可用于等待订阅完全停止:
// <-sub.Done()
// sub.Unsubscribe()
Done() <-chan struct{}
}
Subscription 订阅句柄
用于管理订阅的生命周期。
type Transport ¶
type Transport interface {
// Publish 发布消息
//
// 实现要求:
// - 支持 Headers 透传
// - 错误应包含足够上下文信息
Publish(ctx context.Context, topic string, data []byte, opts publishOptions) error
// Subscribe 订阅消息
//
// 实现要求:
// - 将 subscribeCtx 传递给 Message.Context()
// - 根据 opts.AutoAck 决定是否自动确认
// - 支持 QueueGroup 负载均衡
Subscribe(subscribeCtx context.Context, topic string, handler Handler, opts subscribeOptions) (Subscription, error)
// Close 关闭 Transport
//
// 注意:底层连接由 Connector 管理,此方法仅释放 Transport 内部资源。
Close() error
// Capabilities 返回该 Transport 支持的能力
//
// 用于运行时能力检查,避免用户使用不支持的功能。
Capabilities() Capabilities
}
Transport 底层传输层接口(内部使用)
定义了 MQ 后端必须实现的核心能力。 设计考量:
- 接口精简,只包含必需方法
- 扩展能力通过 Option 传递,不污染接口签名
- 为未来 Kafka 等重量级 MQ 预留扩展空间