mq

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2026 License: MIT Imports: 15 Imported by: 0

README

mq - Genesis 消息队列组件

Go Reference

mq 是 Genesis 业务层的消息队列抽象组件,提供统一的发布订阅 API,支持 NATS Core、NATS JetStream、Redis Stream 等多种后端实现。

设计理念

  • 简单优于复杂:核心接口精简,通过 Option 扩展能力
  • 显式优于隐式:不做自动注入,用户完全掌控消息流
  • 可扩展性:Transport 接口设计兼顾未来 Kafka 等重量级 MQ

支持的后端

驱动 说明 持久化 消息确认 队列组
nats_core NATS Core(高性能)
nats_jetstream NATS JetStream
redis_stream Redis Stream
kafka Kafka(预留) - - -

特性对比

特性 NATS Core JetStream Redis Stream
持久化
消息确认 (Ack)
消息拒绝 (Nak) ❌*
队列组
批量消费
最大在途限制
持久化订阅

*Redis Stream 无原生 Nak,消息留在 Pending 列表可被 XCLAIM

快速开始

安装
go get github.com/ceyewan/genesis/mq
NATS JetStream 示例
package main

import (
    "context"

    "github.com/ceyewan/genesis/connector"
    "github.com/ceyewan/genesis/mq"
)

func main() {
    ctx := context.Background()

    // 1. 创建 NATS 连接
    natsConn, _ := connector.NewNATS(&connector.NATSConfig{
        URLs: []string{"nats://localhost:4222"},
    })
    _ = natsConn.Connect(ctx)
    defer natsConn.Close()

    // 2. 创建 MQ 实例
    mq, _ := mq.New(&mq.Config{
        Driver: mq.DriverNATSJetStream,
        JetStream: &mq.JetStreamConfig{
            AutoCreateStream: true,
        },
    }, mq.WithNATSConnector(natsConn))
    defer mq.Close()

    // 3. 订阅消息
    sub, _ := mq.Subscribe(ctx, "orders.created", func(msg mq.Message) error {
        // 处理消息,返回 nil 自动 Ack
        return processOrder(msg.Data())
    }, mq.WithQueueGroup("order-workers"))

    // 4. 发布消息
    _ = mq.Publish(ctx, "orders.created", []byte(`{"id": 123}`),
        mq.WithHeader("trace-id", "abc123"))

    // 5. 等待订阅结束
    <-sub.Done()
}

func processOrder(data []byte) error {
    // 业务逻辑
    return nil
}
Redis Stream 示例
// 创建 Redis 连接
redisConn, _ := connector.NewRedis(&connector.RedisConfig{
    Addr: "localhost:6379",
})
_ = redisConn.Connect(ctx)
defer redisConn.Close()

// 创建 MQ 实例
mq, _ := mq.New(&mq.Config{
    Driver: mq.DriverRedisStream,
    RedisStream: &mq.RedisStreamConfig{
        MaxLen: 10000,
    },
}, mq.WithRedisConnector(redisConn))
defer mq.Close()

// 订阅(Consumer Group 模式)
mq.Subscribe(ctx, "events", handler,
    mq.WithQueueGroup("event-processors"),
    mq.WithDurable("worker-1"),
    mq.WithBatchSize(50),
)

核心接口

MQ

消息队列核心接口,提供发布订阅能力。

type MQ interface {
    // 发布消息
    Publish(ctx context.Context, topic string, data []byte, opts ...PublishOption) error

    // 订阅消息
    Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscription, error)

    // 关闭客户端
    Close() error
}
Message

消息接口,提供统一的数据访问和确认机制。

type Message interface {
    Context() context.Context  // 获取处理上下文
    Topic() string              // 获取主题
    Data() []byte               // 获取消息体
    Headers() Headers           // 获取消息头
    Ack() error                 // 确认消息
    Nak() error                 // 拒绝消息
    ID() string                 // 获取消息ID
}
Handler

消息处理函数,只接收 Message 参数,通过 msg.Context() 获取上下文。

type Handler func(msg Message) error

设计说明:去掉冗余的 ctx 参数,避免 ctx 和 msg.Context() 同时存在造成的困惑。

Subscription

订阅句柄,用于管理订阅生命周期。

type Subscription interface {
    Unsubscribe() error           // 取消订阅
    Done() <-chan struct{}        // 订阅结束时关闭
}

配置选项

创建 MQ
mq, err := mq.New(&mq.Config{
    Driver: mq.DriverNATSJetStream,
    JetStream: &mq.JetStreamConfig{
        AutoCreateStream: true,
        StreamPrefix:     "S-",
    },
},
    mq.WithLogger(logger),      // 注入日志器
    mq.WithMeter(meter),        // 注入指标收集器
    mq.WithNATSConnector(conn), // 注入连接器
)
发布选项
mq.Publish(ctx, "topic", data,
    mq.WithHeaders(mq.Headers{"trace-id": "abc123"}),  // 设置消息头
    mq.WithHeader("key", "value"),                      // 设置单个消息头
    mq.WithKey("routing-key"),                          // 设置路由键(预留)
)
订阅选项
mq.Subscribe(ctx, "topic", handler,
    mq.WithQueueGroup("workers"),     // 队列组(负载均衡)
    mq.WithManualAck(),               // 关闭自动确认,手动调用 msg.Ack()
    mq.WithAutoAck(),                 // 开启自动确认(默认)
    mq.WithDurable("durable-1"),      // 持久化订阅名(JetStream/Redis)
    mq.WithBatchSize(50),             // 批量拉取大小(默认 10)
    mq.WithMaxInflight(100),          // 最大在途消息数(JetStream)
    mq.WithBufferSize(100),           // 内部缓冲区大小(默认 100)
)

中间件

mq 提供中间件机制增强 Handler,支持链式组合。

内置中间件
// 重试中间件
retryHandler := mq.WithRetry(mq.DefaultRetryConfig, logger)(handler)

// 日志中间件
loggedHandler := mq.WithLogging(logger)(handler)

// Panic 恢复中间件
safeHandler := mq.WithRecover(logger)(handler)
链式组合

执行顺序:第一个中间件最先执行,最后一个最接近原始 Handler。

handler = mq.Chain(
    mq.WithRecover(logger),            // 最外层:捕获 panic
    mq.WithLogging(logger),            // 中间层:记录日志
    mq.WithRetry(mq.DefaultRetryConfig, logger), // 内层:重试
)(handler)

// 执行顺序:WithRecover -> WithLogging -> WithRetry -> handler
重试配置
type RetryConfig struct {
    MaxRetries     int           // 最大重试次数(不含首次)
    InitialBackoff time.Duration // 初始退避时间
    MaxBackoff     time.Duration // 最大退避时间
    Multiplier     float64       // 退避倍数
}

// 默认配置
var DefaultRetryConfig = RetryConfig{
    MaxRetries:     3,
    InitialBackoff: 100 * time.Millisecond,
    MaxBackoff:     5 * time.Second,
    Multiplier:     2.0,
}

指标

指标名 类型 描述
mq.publish.total Counter 发布消息总数
mq.publish.duration Histogram 发布延迟 (秒)
mq.consume.total Counter 消费消息总数
mq.handle.duration Histogram 处理耗时 (秒)

标签:topicstatusdriver

错误处理

预定义错误常量:

var (
    ErrClosed              = xerrors.New("mq: client closed")
    ErrInvalidConfig       = xerrors.New("mq: invalid config")
    ErrNotSupported        = xerrors.New("mq: operation not supported by this driver")
    ErrSubscriptionClosed  = xerrors.New("mq: subscription closed")
    ErrPanicRecovered      = xerrors.New("mq: handler panic recovered")
)

能力检查

不同后端能力差异较大,可根据配置时的 Driver 判断:

if cfg.Driver == mq.DriverNATSCore {
    // NATS Core 不支持持久化,不要用于关键业务
}

if cfg.Driver == mq.DriverNATSJetStream {
    // JetStream 支持 Ack/Nak,可配合 WithManualAck 使用
}

配置详情

JetStream 配置
type JetStreamConfig struct {
    AutoCreateStream bool   // 是否自动创建 Stream
    StreamPrefix     string // Stream 名称前缀,默认 "S-"
}
Redis Stream 配置
type RedisStreamConfig struct {
    MaxLen      int64  // Stream 最大长度,0 表示不限制
    Approximate bool   // 是否使用近似裁剪(性能更好)
}

最佳实践

1. 手动确认模式
mq.Subscribe(ctx, "topic", func(msg mq.Message) error {
    if err := process(msg.Data()); err != nil {
        msg.Nak() // 拒绝消息,触发重投
        return err
    }
    msg.Ack() // 确认消息
    return nil
}, mq.WithManualAck())
2. 中间件组合
// 推荐顺序:Recover -> Logging -> Retry -> Handler
handler = mq.Chain(
    mq.WithRecover(logger),
    mq.WithLogging(logger),
    mq.WithRetry(mq.DefaultRetryConfig, logger),
)(businessHandler)
3. 上下文传递
handler := func(msg mq.Message) error {
    ctx := msg.Context() // 获取订阅时的上下文

    // 业务逻辑使用 msg.Context(),而非 Subscribe 时的 ctx
    return processOrder(ctx, msg.Data())
}

测试

# 运行单元测试
go test ./mq/...

# 运行集成测试(需要本地环境)
make up
go test ./mq/... -tags=integration
make down

未来规划

  • Kafka 支持
  • 死信队列实现
  • 延迟消息支持
  • 事务消息(Kafka)

Documentation

Overview

Package mq 提供消息队列组件,支持 NATS Core, JetStream, Redis Stream 等多种后端。

MQ 组件是 Genesis 微服务组件库的消息中间件抽象层,提供统一的发布-订阅语义。 设计原则:

  • 简单优于复杂:核心接口精简,通过 Option 扩展能力
  • 显式优于隐式:不做自动注入,用户完全掌控消息流
  • 可扩展性:Transport 接口设计兼顾未来 Kafka 等重量级 MQ

Index

Constants

View Source
const (
	// MetricPublishTotal 发布消息总数
	MetricPublishTotal = "mq.publish.total"

	// MetricPublishDuration 发布延迟(秒)
	MetricPublishDuration = "mq.publish.duration"

	// MetricConsumeTotal 消费消息总数
	MetricConsumeTotal = "mq.consume.total"

	// MetricHandleDuration 消息处理耗时(秒)
	MetricHandleDuration = "mq.handle.duration"
)

指标名称常量

View Source
const (
	// LabelTopic 主题标签
	LabelTopic = "topic"

	// LabelStatus 状态标签(success/error)
	LabelStatus = "status"

	// LabelDriver 驱动标签
	LabelDriver = "driver"
)

标签名称常量

Variables

View Source
var (
	// ErrClosed MQ 已关闭
	ErrClosed = xerrors.New("mq: client closed")

	// ErrInvalidConfig 配置无效
	ErrInvalidConfig = xerrors.New("mq: invalid config")

	// ErrNotSupported 操作不支持
	ErrNotSupported = xerrors.New("mq: operation not supported by this driver")

	// ErrSubscriptionClosed 订阅已关闭
	ErrSubscriptionClosed = xerrors.New("mq: subscription closed")

	// ErrPanicRecovered Handler panic 已恢复
	ErrPanicRecovered = xerrors.New("mq: handler panic recovered")
)

预定义错误

View Source
var CapabilitiesKafka = Capabilities{
	Persistence:     true,
	ExactlyOnce:     true,
	Nak:             false,
	DeadLetter:      true,
	QueueGroup:      true,
	OrderedDelivery: true,
	BatchConsume:    true,
	DelayedMessage:  false,
}

CapabilitiesKafka Kafka 的能力描述(预留)

View Source
var CapabilitiesNATSCore = Capabilities{
	Persistence:     false,
	ExactlyOnce:     false,
	Nak:             false,
	DeadLetter:      false,
	QueueGroup:      true,
	OrderedDelivery: false,
	BatchConsume:    false,
	DelayedMessage:  false,
}

CapabilitiesNATSCore NATS Core 的能力描述

View Source
var CapabilitiesNATSJetStream = Capabilities{
	Persistence:     true,
	ExactlyOnce:     true,
	Nak:             true,
	DeadLetter:      false,
	QueueGroup:      true,
	OrderedDelivery: true,
	BatchConsume:    true,
	DelayedMessage:  false,
}

CapabilitiesNATSJetStream NATS JetStream 的能力描述

View Source
var CapabilitiesRedisStream = Capabilities{
	Persistence:     true,
	ExactlyOnce:     false,
	Nak:             false,
	DeadLetter:      false,
	QueueGroup:      true,
	OrderedDelivery: true,
	BatchConsume:    true,
	DelayedMessage:  false,
}

CapabilitiesRedisStream Redis Stream 的能力描述

View Source
var DefaultRetryConfig = RetryConfig{
	MaxRetries:     3,
	InitialBackoff: 100 * time.Millisecond,
	MaxBackoff:     5 * time.Second,
	Multiplier:     2.0,
}

DefaultRetryConfig 默认重试配置

Functions

This section is empty.

Types

type Capabilities

type Capabilities struct {
	// Persistence 是否支持消息持久化
	Persistence bool

	// ExactlyOnce 是否支持精确一次语义
	ExactlyOnce bool

	// Nak 是否支持消息拒绝重投
	Nak bool

	// DeadLetter 是否支持死信队列
	DeadLetter bool

	// QueueGroup 是否支持队列组(负载均衡)
	QueueGroup bool

	// OrderedDelivery 是否保证顺序投递
	OrderedDelivery bool

	// BatchConsume 是否支持批量消费
	BatchConsume bool

	// DelayedMessage 是否支持延迟消息
	DelayedMessage bool
}

Capabilities 描述 Transport 支持的能力

不同后端能力差异较大,通过此结构暴露给上层:

  • NATS Core: 最简单,无持久化
  • NATS JetStream: 持久化、重投、死信队列
  • Redis Stream: 持久化、Consumer Group
  • Kafka (未来): 持久化、分区、事务

type Config

type Config struct {
	// Driver 底层驱动类型
	// 必填,可选值:nats_core, nats_jetstream, redis_stream
	Driver Driver `json:"driver" yaml:"driver"`

	// JetStream JetStream 特有配置(仅 DriverNATSJetStream 时生效)
	JetStream *JetStreamConfig `json:"jetstream,omitempty" yaml:"jetstream,omitempty"`

	// RedisStream Redis Stream 特有配置(仅 DriverRedisStream 时生效)
	RedisStream *RedisStreamConfig `json:"redis_stream,omitempty" yaml:"redis_stream,omitempty"`
}

Config MQ 配置

type DeadLetterConfig

type DeadLetterConfig struct {
	// MaxRetries 最大重试次数,超过后进入死信队列
	MaxRetries int

	// Topic 死信队列主题
	Topic string
}

DeadLetterConfig 死信队列配置

注意:当前为预留配置,各驱动暂未实现。 未来实现计划:

  • JetStream: 利用 RedeliveryPolicy + 自定义逻辑
  • Redis Stream: 基于 Pending 列表 + XCLAIM 实现
  • Kafka: 发送到 error topic

type Driver

type Driver string

Driver 驱动类型

const (
	// DriverNATSCore NATS Core 驱动(高性能,无持久化)
	DriverNATSCore Driver = "nats_core"

	// DriverNATSJetStream NATS JetStream 驱动(持久化,支持重投)
	DriverNATSJetStream Driver = "nats_jetstream"

	// DriverRedisStream Redis Stream 驱动(持久化队列)
	DriverRedisStream Driver = "redis_stream"

	// DriverKafka Kafka 驱动(预留,未实现)
	DriverKafka Driver = "kafka"
)

type Handler

type Handler func(msg Message) error

Handler 消息处理函数

设计说明:只接收 Message 参数,通过 msg.Context() 获取上下文, 避免 ctx 和 msg.Context() 同时存在造成的困惑。

返回值:

  • nil: 处理成功,AutoAck 模式下自动确认
  • error: 处理失败,AutoAck 模式下自动 Nak(如后端支持)

重要提示(JetStream 用户必读):

  • JetStream 的 Nak 会触发消息重新投递
  • 如果返回 error 是非暂时性的(如数据格式错误),消息会无限重投
  • 解决方案: 1. 使用 WithManualAck 手动控制 Ack/Nak 2. 对于不可恢复的错误,也调用 Ack() 并记录日志 3. 结合 WithRetry 中间件在应用层重试,失败后仍 Ack

type Headers

type Headers map[string]string

Headers 消息元数据(键值对)

用于传递 trace、业务标签等元信息。 注意:MQ 不做自动注入/提取,业务自行处理。

func (Headers) Clone

func (h Headers) Clone() Headers

Clone 返回 Headers 的深拷贝

func (Headers) Get

func (h Headers) Get(key string) string

Get 获取指定 key 的值,不存在返回空字符串

func (Headers) Set

func (h Headers) Set(key, value string)

Set 设置键值对

type JetStreamConfig

type JetStreamConfig struct {
	// AutoCreateStream 是否自动创建 Stream(如果不存在)
	// 生产环境建议关闭,通过运维手动创建
	AutoCreateStream bool `json:"auto_create_stream" yaml:"auto_create_stream"`

	// StreamPrefix Stream 名称前缀,默认 "S-"
	StreamPrefix string `json:"stream_prefix" yaml:"stream_prefix"`
}

JetStreamConfig JetStream 特有配置

type MQ

type MQ interface {
	// Publish 发布消息到指定主题
	//
	// 参数:
	//   - ctx: 上下文,用于超时控制和取消
	//   - topic: 消息主题(NATS subject / Redis stream key)
	//   - data: 消息体
	//   - opts: 发布选项(Headers 等)
	Publish(ctx context.Context, topic string, data []byte, opts ...PublishOption) error

	// Subscribe 订阅主题并处理消息
	//
	// Handler 签名:func(msg Message) error
	// 通过 msg.Context() 获取上下文,避免参数冗余。
	//
	// 参数:
	//   - ctx: 订阅生命周期上下文,取消时自动停止订阅
	//   - topic: 订阅主题
	//   - handler: 消息处理函数
	//   - opts: 订阅选项(QueueGroup、AutoAck 等)
	Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscription, error)

	// Close 关闭 MQ 客户端
	// 注意:底层连接由 Connector 管理,此方法仅释放 MQ 内部资源
	Close() error
}

MQ 消息队列核心接口

提供统一的发布订阅能力,屏蔽底层实现差异。 支持的后端:NATS Core、NATS JetStream、Redis Stream

func New

func New(cfg *Config, opts ...Option) (MQ, error)

New 创建 MQ 实例

根据 Config.Driver 选择底层 Transport 实现。 必需依赖通过 Option 注入:

  • NATS 系列: WithNATSConnector
  • Redis Stream: WithRedisConnector

示例:

mq, err := mq.New(&mq.Config{
    Driver: mq.DriverNATSJetStream,
}, mq.WithNATSConnector(natsConn), mq.WithLogger(logger))

type Message

type Message interface {
	// Context 获取消息处理上下文
	//
	// 该上下文继承自 Subscribe 调用时的 ctx,可用于:
	//   - 超时控制
	//   - 取消传播
	//   - 传递 trace 信息(需业务自行注入)
	Context() context.Context

	// Topic 获取消息主题
	Topic() string

	// Data 获取消息体(原始字节)
	Data() []byte

	// Headers 获取消息头(返回副本)
	Headers() Headers

	// Ack 确认消息处理成功
	//
	// 不同后端行为:
	//   - NATS Core: 无操作(无持久化语义)
	//   - NATS JetStream: 发送 Ack 到服务端
	//   - Redis Stream: Consumer Group 模式下执行 XACK
	//   - Kafka (未来): 提交 offset
	Ack() error

	// Nak 拒绝消息,请求重投
	//
	// 不同后端行为:
	//   - NATS Core: 无操作
	//   - NATS JetStream: 发送 Nak,触发重投
	//   - Redis Stream: 无原生支持,消息留在 Pending 列表
	//   - Kafka (未来): 不提交 offset,触发 rebalance 后重投
	//
	// 注意:部分后端不支持真正的 Nak,错误处理需结合业务设计。
	Nak() error

	// ID 获取消息唯一标识(可选)
	//
	// 不同后端返回值:
	//   - NATS Core: 空字符串
	//   - NATS JetStream: Stream sequence
	//   - Redis Stream: 消息 ID (如 "1234567890-0")
	ID() string
}

Message 消息接口

封装底层消息细节,提供统一的数据访问和确认机制。 不同后端的 Ack/Nak 行为有差异,详见各方法注释。

type Middleware

type Middleware func(Handler) Handler

Middleware Handler 中间件

中间件模式允许在不修改业务逻辑的情况下增强 Handler 能力。 常见用途:重试、日志、链路追踪、指标等。

func Chain

func Chain(middlewares ...Middleware) Middleware

Chain 将多个中间件串联成一个

执行顺序:第一个中间件最先执行,最后一个最接近原始 Handler。

示例:

handler = mq.Chain(logging, retry, tracing)(handler)
// 执行顺序:logging -> retry -> tracing -> handler

func WithLogging

func WithLogging(logger clog.Logger) Middleware

WithLogging 创建日志中间件

记录每条消息的处理情况,包括:topic、消息 ID、处理耗时、错误信息。

func WithRecover

func WithRecover(logger clog.Logger) Middleware

WithRecover 创建 panic 恢复中间件

捕获 Handler 中的 panic,转换为错误返回,避免整个消费者崩溃。

func WithRetry

func WithRetry(cfg RetryConfig, logger clog.Logger) Middleware

WithRetry 创建重试中间件

在应用层实现重试逻辑(不依赖 MQ 后端的重投机制)。 适用于幂等操作或可安全重试的场景。

注意:

  • 重试发生在单次消息处理内,不影响 MQ 层面的 Ack/Nak
  • 如果所有重试都失败,最终错误会返回给上层(可能触发 Nak)

示例:

handler := mq.WithRetry(mq.DefaultRetryConfig, logger)(myHandler)

type Option

type Option func(*options)

Option MQ 配置选项

func WithLogger

func WithLogger(l clog.Logger) Option

WithLogger 注入日志记录器

func WithMeter

func WithMeter(m metrics.Meter) Option

WithMeter 注入指标收集器

func WithNATSConnector

func WithNATSConnector(conn connector.NATSConnector) Option

WithNATSConnector 注入 NATS 连接器(用于 NATS Core / JetStream)

func WithRedisConnector

func WithRedisConnector(conn connector.RedisConnector) Option

WithRedisConnector 注入 Redis 连接器(用于 Redis Stream)

type PublishOption

type PublishOption func(*publishOptions)

PublishOption 发布选项

func WithHeader

func WithHeader(key, value string) PublishOption

WithHeader 设置单个消息头

func WithHeaders

func WithHeaders(h Headers) PublishOption

WithHeaders 设置消息头

示例:

mq.Publish(ctx, "topic", data, mq.WithHeaders(mq.Headers{
    "trace-id": "abc123",
}))

func WithKey

func WithKey(key string) PublishOption

WithKey 设置消息 Key(用于分区路由)

注意:当前仅预留,NATS/Redis 不使用此选项。 Kafka 场景下用于保证相同 Key 的消息路由到同一分区。

type RedisStreamConfig

type RedisStreamConfig struct {
	// MaxLen Stream 最大长度,0 表示不限制
	// 超过后自动裁剪旧消息
	MaxLen int64 `json:"max_len" yaml:"max_len"`

	// Approximate 是否使用近似裁剪(MAXLEN ~)
	// 开启后性能更好,但长度控制不精确
	Approximate bool `json:"approximate" yaml:"approximate"`
}

RedisStreamConfig Redis Stream 特有配置

type RetryConfig

type RetryConfig struct {
	// MaxRetries 最大重试次数(不含首次执行)
	MaxRetries int

	// InitialBackoff 初始退避时间
	InitialBackoff time.Duration

	// MaxBackoff 最大退避时间
	MaxBackoff time.Duration

	// Multiplier 退避倍数
	Multiplier float64
}

RetryConfig 重试配置

type SubscribeOption

type SubscribeOption func(*subscribeOptions)

SubscribeOption 订阅选项

func WithAutoAck

func WithAutoAck() SubscribeOption

WithAutoAck 开启自动确认(默认行为)

func WithBatchSize

func WithBatchSize(size int) SubscribeOption

WithBatchSize 设置批量拉取大小

影响单次拉取的消息数量,适当增大可提升吞吐量。 默认值:10

func WithBufferSize

func WithBufferSize(size int) SubscribeOption

WithBufferSize 设置内部缓冲区大小

默认值:100

func WithDeadLetter

func WithDeadLetter(maxRetries int, topic string) SubscribeOption

WithDeadLetter 设置死信队列配置

注意:当前为预留配置,各驱动暂未实现。 调用此选项不会报错,但不会生效。

func WithDurable

func WithDurable(name string) SubscribeOption

WithDurable 设置持久化订阅名称

持久化订阅会记录消费进度,重启后继续消费。 仅 JetStream / Redis Stream 有效。

func WithManualAck

func WithManualAck() SubscribeOption

WithManualAck 关闭自动确认

启用后需要在 Handler 中手动调用 msg.Ack() 或 msg.Nak()。 适用于需要精确控制确认时机的场景。

重要说明(JetStream):

  • 调用 msg.Nak() 会触发消息重新投递
  • 对于无法恢复的错误,应该 Ack() 而非 Nak(),避免无限循环
  • 建议配合 WithRetry 中间件在应用层重试

func WithMaxInflight

func WithMaxInflight(n int) SubscribeOption

WithMaxInflight 设置最大在途消息数

限制未确认消息的数量,用于背压控制。 仅 JetStream 有效(对应 MaxAckPending)。

func WithQueueGroup

func WithQueueGroup(name string) SubscribeOption

WithQueueGroup 设置队列组(用于负载均衡)

同一队列组内的消费者竞争消费消息,实现负载均衡。 不同队列组独立消费,实现广播。

对应关系:

  • NATS: Queue Subscribe
  • Redis Stream: Consumer Group
  • Kafka (未来): Consumer Group

type Subscription

type Subscription interface {
	// Unsubscribe 取消订阅
	//
	// 调用后停止接收新消息。
	// 注意:不保证等待当前 Handler 完成,具体行为依赖后端实现。
	Unsubscribe() error

	// Done 返回一个 channel,订阅结束时关闭
	//
	// 可用于等待订阅完全停止:
	//   <-sub.Done()
	//   sub.Unsubscribe()
	Done() <-chan struct{}
}

Subscription 订阅句柄

用于管理订阅的生命周期。

type Transport

type Transport interface {
	// Publish 发布消息
	//
	// 实现要求:
	//   - 支持 Headers 透传
	//   - 错误应包含足够上下文信息
	Publish(ctx context.Context, topic string, data []byte, opts publishOptions) error

	// Subscribe 订阅消息
	//
	// 实现要求:
	//   - 将 subscribeCtx 传递给 Message.Context()
	//   - 根据 opts.AutoAck 决定是否自动确认
	//   - 支持 QueueGroup 负载均衡
	Subscribe(subscribeCtx context.Context, topic string, handler Handler, opts subscribeOptions) (Subscription, error)

	// Close 关闭 Transport
	//
	// 注意:底层连接由 Connector 管理,此方法仅释放 Transport 内部资源。
	Close() error

	// Capabilities 返回该 Transport 支持的能力
	//
	// 用于运行时能力检查,避免用户使用不支持的功能。
	Capabilities() Capabilities
}

Transport 底层传输层接口(内部使用)

定义了 MQ 后端必须实现的核心能力。 设计考量:

  • 接口精简,只包含必需方法
  • 扩展能力通过 Option 传递,不污染接口签名
  • 为未来 Kafka 等重量级 MQ 预留扩展空间

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL