messaging

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

README

Messaging - 统一消息抽象层

设计理念:提供 Watermill 级别的消息传递抽象,而非简单封装某个具体的消息中间件。NSQ、RabbitMQ 只是底层实现的 Adapter。

📚 目录


核心概念

设计目标

Messaging 包的设计目标是提供一个统一的消息传递抽象层,它关注三个核心维度:

  1. 消息中间件初始化

    • 配置管理(Options)
    • 连接建立(Connection)
    • 优雅关停(Shutdown)
  2. 消息中间件使用

    • 发布订阅模型(Publisher/Subscriber)
    • 消息抽象(Message with Metadata)
    • 确认机制(Ack/Nack)
  3. 横切关注点

    • 中间件链(日志、重试、超时、限流...)
    • 路由管理(统一注册和调度)
    • 可观测性(健康检查、指标、追踪)
关键特性
  • 统一抽象:业务代码只依赖接口,不依赖具体实现
  • 开闭原则:通过 Provider 模式轻松扩展新的消息中间件
  • 中间件支持:提供 15+ 种内置中间件,支持自定义扩展
  • 消息增强:UUID、Metadata、Ack/Nack 完整支持
  • 路由器:统一管理消息处理器,支持批量注册
  • 生产就绪:健康检查、优雅关闭、错误恢复

架构设计

六边形架构(端口-适配器模式)
┌───────────────────────────────────────────────────────────┐
│                  应用层(Business Logic)                   │
│                                                             │
│   • 只依赖 messaging.EventBus 接口                          │
│   • 使用 messaging.Message 统一消息模型                      │
│   • 通过配置切换底层实现(NSQ/RabbitMQ)                     │
└──────────────────────┬────────────────────────────────────┘
                       │
                       │ 依赖接口(Port)
                       ↓
┌───────────────────────────────────────────────────────────┐
│                messaging 包(端口层 - Port)                │
│                                                             │
│  核心抽象:                                                  │
│  ┌─────────────────────────────────────────────────────┐  │
│  │ • EventBus      - 事件总线接口                       │  │
│  │ • Publisher     - 发布者接口                         │  │
│  │ • Subscriber    - 订阅者接口                         │  │
│  │ • Message       - 消息模型(UUID/Metadata/Payload)  │  │
│  │ • Handler       - 消息处理函数                       │  │
│  │ • Middleware    - 中间件函数                         │  │
│  │ • Router        - 路由器                            │  │
│  └─────────────────────────────────────────────────────┘  │
│                                                             │
│  工厂模式:                                                  │
│  • Provider        - 提供者枚举(NSQ/RabbitMQ)           │
│  • RegisterProvider - 自动注册机制                        │
│  • NewEventBus     - 统一创建入口                         │
└──────────────────────┬────────────────────────────────────┘
                       │
          ┌────────────┴─────────────┐
          │                          │
          ↓                          ↓
┌─────────────────────┐    ┌─────────────────────┐
│  nsq 包(适配器)     │    │ rabbitmq 包(适配器) │
│                     │    │                     │
│  • Publisher        │    │  • Publisher        │
│  • Subscriber       │    │  • Subscriber       │
│  • EventBus         │    │  • EventBus         │
│                     │    │                     │
│  实现细节:          │    │  实现细节:          │
│  • NSQ 协议封装     │    │  • AMQP 协议封装    │
│  • 消息转换         │    │  • Exchange/Queue   │
│  • 自动重连         │    │  • 持久化配置       │
└─────────────────────┘    └─────────────────────┘
消息流转
发布流程:
Publisher.Publish() → Adapter 转换 → NSQ/RabbitMQ → 网络传输

订阅流程:
网络接收 → NSQ/RabbitMQ → Adapter 转换 → Middleware 链 → Handler
                                              ↓
                                         Ack/Nack

快速开始

安装
go get github.com/FangcunMount/component-base/pkg/messaging
5 分钟上手
package main

import (
    "context"
    "log"
    
    "github.com/FangcunMount/component-base/pkg/messaging"
    _ "github.com/FangcunMount/component-base/pkg/messaging/nsq"
)

func main() {
    // 1. 创建配置
    config := messaging.DefaultConfig()
    
    // 2. 创建事件总线
    bus, err := messaging.NewEventBus(config)
    if err != nil {
        log.Fatal(err)
    }
    defer bus.Close()
    
    // 3. 订阅消息
    bus.Subscriber().Subscribe("user.created", "email-service", 
        func(ctx context.Context, msg *messaging.Message) error {
            log.Printf("收到消息: %s", string(msg.Payload))
            return msg.Ack() // 确认消息
        })
    
    // 4. 发布消息
    bus.Publisher().Publish(context.Background(), 
        "user.created", []byte(`{"user_id": 123}`))
    
    select {} // 保持运行
}
切换到 RabbitMQ

只需修改一行配置:

config := messaging.DefaultConfig()
config.Provider = messaging.ProviderRabbitMQ  // 切换到 RabbitMQ
config.RabbitMQ.URL = "amqp://guest:guest@localhost:5672/"

核心组件

1. Message(消息模型)

设计理念:参考 Watermill,提供完整的消息抽象。

type Message struct {
    // 核心字段
    UUID     string            // 全局唯一标识
    Payload  []byte            // 消息负载
    Metadata map[string]string // 元数据(链路追踪、业务标识)
    
    // 运行时字段
    Attempts  uint16    // 重试次数
    Timestamp int64     // 时间戳
    Topic     string    // 主题
    Channel   string    // 通道
}

核心方法

// 创建消息
msg := messaging.NewMessage("uuid-123", payload)
msg.Metadata["trace_id"] = "trace-abc"
msg.Metadata["user_id"] = "1001"

// 确认消息(处理成功)
msg.Ack()

// 拒绝消息(触发重试)
msg.Nack()

为什么需要 Metadata?

  • 链路追踪:传递 trace_id、span_id
  • 业务标识:传递 user_id、tenant_id
  • 消息路由:传递 priority、group
  • 调试信息:传递 source、version
2. Publisher(发布者)
type Publisher interface {
    // 发布字节数组(快速)
    Publish(ctx context.Context, topic string, body []byte) error
    
    // 发布消息对象(支持 Metadata)
    PublishMessage(ctx context.Context, topic string, msg *Message) error
    
    // 关闭发布者
    Close() error
}

使用示例

// 方式 1:快速发布
publisher.Publish(ctx, "user.created", []byte(`{"id": 123}`))

// 方式 2:带 Metadata 发布
msg := messaging.NewMessage("", []byte(`{"id": 123}`))
msg.Metadata["trace_id"] = "trace-123"
publisher.PublishMessage(ctx, "user.created", msg)
3. Subscriber(订阅者)
type Subscriber interface {
    // 订阅消息
    Subscribe(topic, channel string, handler Handler) error
    
    // 订阅消息(支持中间件)
    SubscribeWithMiddleware(topic, channel string, 
        handler Handler, middlewares ...Middleware) error
    
    // 停止订阅
    Stop()
    
    // 关闭订阅者
    Close() error
}

Topic vs Channel

  • Topic:消息主题(如 user.created
  • Channel:消费者分组
    • 相同 channel:负载均衡(任务队列模式)
    • 不同 channel:广播(事件驱动模式)
// 事件驱动:每个服务使用不同的 channel
subscriber.Subscribe("user.created", "email-service", emailHandler)
subscriber.Subscribe("user.created", "stat-service", statHandler)
// → 每条消息都会被两个服务接收

// 任务队列:多个 worker 使用相同的 channel
subscriber.Subscribe("email.send", "email-workers", handler1)
subscriber.Subscribe("email.send", "email-workers", handler2)
// → 每条消息只会被一个 worker 接收
4. Router(路由器)

设计理念:统一管理消息处理器,支持中间件链。

// 创建路由器
router := bus.Router()

// 添加全局中间件(应用到所有处理器)
router.AddMiddleware(messaging.LoggerMiddleware(logger))
router.AddMiddleware(messaging.RecoverMiddleware(logger))

// 注册处理器(不带中间件)
router.AddHandler("user.created", "email-service", emailHandler)

// 注册处理器(带局部中间件)
router.AddHandlerWithMiddleware(
    "order.payment", 
    "payment-service", 
    paymentHandler,
    messaging.RetryMiddleware(3, time.Second),
    messaging.TimeoutMiddleware(5 * time.Second),
)

// 启动路由器(批量订阅)
ctx, cancel := context.WithCancel(context.Background())
go router.Run(ctx)

// 优雅关闭
router.Stop()
5. EventBus(事件总线)

设计理念:组合 Publisher、Subscriber、Router,提供完整的消息总线功能。

type EventBus interface {
    Publisher() Publisher      // 获取发布者
    Subscriber() Subscriber    // 获取订阅者
    Router() *Router          // 获取路由器
    Health() error            // 健康检查
    Close() error             // 关闭总线
}

中间件系统

设计理念

中间件是处理横切关注点的标准方式,采用洋葱模型

Request → MW1 → MW2 → MW3 → Handler → MW3 → MW2 → MW1 → Response
          ↓     ↓     ↓       ↓       ↑     ↑     ↑
        日志   重试  超时    业务    超时  重试  日志
中间件类型
type Middleware func(Handler) Handler
内置中间件(15 种)
1. 可靠性中间件
中间件 功能 使用场景
RetryMiddleware 自动重试(指数退避) 网络抖动、临时故障
TimeoutMiddleware 超时控制 防止处理时间过长
RecoverMiddleware Panic 恢复 防止单个消息崩溃整个服务
CircuitBreakerMiddleware 熔断器 防止级联故障
// 示例:组合可靠性中间件
router.AddHandlerWithMiddleware(
    "order.payment",
    "payment-service",
    handler,
    messaging.RecoverMiddleware(logger),        // 最外层:捕获 panic
    messaging.RetryMiddleware(3, time.Second),  // 重试 3 次
    messaging.TimeoutMiddleware(10*time.Second), // 超时 10 秒
)
2. 流量控制中间件
中间件 功能 使用场景
RateLimitMiddleware 限流(令牌桶) 防止系统过载
BatchMiddleware 批处理 提高吞吐量
FilterMiddleware 条件过滤 选择性处理消息
PriorityMiddleware 优先级排序 VIP 消息优先处理
// 示例:限流(每秒 100 个请求)
limiter := messaging.NewTokenBucketLimiter(100, 10*time.Millisecond)
router.AddMiddleware(messaging.RateLimitMiddleware(limiter, "drop"))

// 示例:过滤高价值订单
filterMW := messaging.FilterMiddleware(func(msg *messaging.Message) bool {
    var order Order
    json.Unmarshal(msg.Payload, &order)
    return order.Amount > 1000 // 只处理金额 > 1000 的订单
})
3. 可观测性中间件
中间件 功能 使用场景
LoggerMiddleware 日志记录 调试、审计
MetricsMiddleware 指标收集 监控、告警
TracingMiddleware 链路追踪 分布式追踪
AuditMiddleware 审计日志 合规、安全
// 示例:完整的可观测性栈
router.AddMiddleware(messaging.LoggerMiddleware(logger))
router.AddMiddleware(messaging.TracingMiddleware())
router.AddMiddleware(messaging.MetricsMiddleware(metricsCollector))
4. 数据处理中间件
中间件 功能 使用场景
DeduplicationMiddleware 消息去重 防止重复处理
TransformMiddleware 消息转换 数据格式转换
ValidationMiddleware 消息校验 数据合法性检查
自定义中间件
// 示例:自定义认证中间件
func AuthMiddleware(authService AuthService) messaging.Middleware {
    return func(next messaging.Handler) messaging.Handler {
        return func(ctx context.Context, msg *messaging.Message) error {
            // 从 Metadata 提取 token
            token := msg.Metadata["auth_token"]
            
            // 验证 token
            user, err := authService.ValidateToken(token)
            if err != nil {
                return fmt.Errorf("认证失败: %w", err)
            }
            
            // 将用户信息注入 context
            ctx = context.WithValue(ctx, "user", user)
            
            // 继续处理
            return next(ctx, msg)
        }
    }
}

配置指南

统一配置结构
type Config struct {
    Provider Provider    // nsq | rabbitmq
    NSQ      NSQConfig
    RabbitMQ RabbitMQConfig
}
NSQ 配置
config := &messaging.Config{
    Provider: messaging.ProviderNSQ,
    NSQ: messaging.NSQConfig{
        LookupdAddrs: []string{"127.0.0.1:4161"},
        NSQdAddr:     "127.0.0.1:4150",
        MaxAttempts:  5,           // 最大重试次数
        MaxInFlight:  200,         // 并发处理数
        MsgTimeout:   time.Minute, // 消息超时
    },
}

// 提示:订阅时会按 MaxInFlight 启动同等数量的并发处理协程,
// 确保该值与预期的 consumer 并发度匹配。
RabbitMQ 配置
config := &messaging.Config{
    Provider: messaging.ProviderRabbitMQ,
    RabbitMQ: messaging.RabbitMQConfig{
        URL:               "amqp://guest:guest@localhost:5672/",
        PrefetchCount:     200,   // QoS
        Durable:           true,  // 持久化
        PersistentMessages: true, // 消息持久化
        AutoReconnect:     true,  // 自动重连
    },
}
默认配置
config := messaging.DefaultConfig() // 使用 NSQ 默认配置

最佳实践

1. 消息设计

✅ 推荐

// 使用结构化的消息体
type UserCreatedEvent struct {
    UserID    int64     `json:"user_id"`
    Email     string    `json:"email"`
    CreatedAt time.Time `json:"created_at"`
}

// 发布时序列化
data, _ := json.Marshal(event)
msg := messaging.NewMessage("", data)
msg.Metadata["event_type"] = "user.created"
msg.Metadata["version"] = "v1"
publisher.PublishMessage(ctx, "user.created", msg)

❌ 不推荐

// 不要在消息中包含大量数据
// 不要使用二进制格式(除非必要)
// 不要在 Metadata 中放敏感信息
2. 错误处理
func handler(ctx context.Context, msg *messaging.Message) error {
    // 可重试错误:返回 error,触发重试
    if err := processMessage(msg); err != nil {
        return fmt.Errorf("处理失败: %w", err)
    }
    
    // 不可重试错误:记录日志,返回 nil
    if err := validateMessage(msg); err != nil {
        log.Printf("消息格式错误,跳过: %v", err)
        return nil // 不重试
    }
    
    // 成功:确认消息
    return msg.Ack()
}
3. 中间件顺序

推荐顺序(从外到内):

router.AddHandlerWithMiddleware(
    "order.payment",
    "payment-service",
    handler,
    messaging.RecoverMiddleware(logger),     // 1. 最外层:捕获 panic
    messaging.LoggerMiddleware(logger),      // 2. 日志
    messaging.TracingMiddleware(),           // 3. 链路追踪
    messaging.TimeoutMiddleware(30*time.Second), // 4. 超时控制
    messaging.RetryMiddleware(3, time.Second),   // 5. 重试
    messaging.DeduplicationMiddleware(store, time.Hour), // 6. 去重
)
4. 性能优化
// 1. 调整并发数
config.NSQ.MaxInFlight = 500 // 根据 CPU 核心数调整

// 2. 使用批量发布
bodies := [][]byte{data1, data2, data3}
publisher.(*nsq.Publisher).MultiPublish(ctx, "topic", bodies)

// 3. 启用限流(防止突发流量)
limiter := messaging.NewTokenBucketLimiter(1000, time.Millisecond)
router.AddMiddleware(messaging.RateLimitMiddleware(limiter, "wait"))

示例代码

示例 1:事件驱动架构
// 场景:用户注册后,通知多个服务
publisher.Publish(ctx, "user.created", userData)

// 邮件服务(独立 channel)
subscriber.Subscribe("user.created", "email-service", emailHandler)

// 统计服务(独立 channel)
subscriber.Subscribe("user.created", "stat-service", statHandler)

// 审计服务(独立 channel)
subscriber.Subscribe("user.created", "audit-service", auditHandler)
示例 2:任务队列
// 场景:10 个 worker 处理邮件发送任务
for i := 1; i <= 10; i++ {
    go func(workerID int) {
        // 所有 worker 使用相同 channel
        subscriber.Subscribe("email.send", "email-workers", 
            func(ctx context.Context, msg *messaging.Message) error {
                log.Printf("Worker %d 处理邮件", workerID)
                return sendEmail(msg)
            })
    }(i)
}
示例 3:中间件组合
// 场景:支付服务需要高可靠性
router := bus.Router()

// 全局中间件
router.AddMiddleware(messaging.RecoverMiddleware(logger))
router.AddMiddleware(messaging.LoggerMiddleware(logger))

// 局部中间件(只用于支付)
breaker := messaging.NewSimpleCircuitBreaker(5, 30*time.Second)
router.AddHandlerWithMiddleware(
    "order.payment",
    "payment-service",
    paymentHandler,
    messaging.CircuitBreakerMiddleware(breaker),
    messaging.RetryMiddleware(3, 2*time.Second),
    messaging.TimeoutMiddleware(15*time.Second),
)

router.Run(ctx)
示例 4:链路追踪
// 发布时注入 trace_id
msg := messaging.NewMessage("", payload)
msg.Metadata["trace_id"] = "trace-" + uuid.New().String()
msg.Metadata["span_id"] = "span-" + uuid.New().String()
publisher.PublishMessage(ctx, "user.created", msg)

// 消费时提取 trace_id
handler := func(ctx context.Context, msg *messaging.Message) error {
    traceID := msg.Metadata["trace_id"]
    spanID := msg.Metadata["span_id"]
    
    log.Printf("处理消息 [trace=%s, span=%s]", traceID, spanID)
    
    // 继续传播 trace_id
    nextMsg := messaging.NewMessage("", nextPayload)
    nextMsg.Metadata["trace_id"] = traceID
    nextMsg.Metadata["parent_span_id"] = spanID
    nextMsg.Metadata["span_id"] = "span-" + uuid.New().String()
    
    return nil
}

进阶主题

Provider 扩展

如何添加新的消息中间件(如 Kafka):

// 1. 实现 Publisher、Subscriber、EventBus 接口
// 2. 在 init 函数中注册
func init() {
    messaging.RegisterProvider(messaging.ProviderKafka, NewEventBusFromConfig)
}

// 3. 业务代码无需修改,只需切换配置
config.Provider = messaging.ProviderKafka
健康检查集成
// HTTP 健康检查接口
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
    if err := bus.Health(); err != nil {
        w.WriteHeader(http.StatusServiceUnavailable)
        json.NewEncoder(w).Encode(map[string]string{
            "status": "unhealthy",
            "error":  err.Error(),
        })
        return
    }
    
    w.WriteHeader(http.StatusOK)
    json.NewEncoder(w).Encode(map[string]string{
        "status": "healthy",
    })
})
优雅关闭
func main() {
    bus, _ := messaging.NewEventBus(config)
    defer bus.Close()
    
    // 监听退出信号
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    
    router := bus.Router()
    // ... 注册处理器
    
    ctx, cancel := context.WithCancel(context.Background())
    go router.Run(ctx)
    
    // 等待退出信号
    <-sigChan
    
    log.Println("正在优雅退出...")
    
    // 1. 停止接收新消息
    cancel()
    router.Stop()
    
    // 2. 等待正在处理的消息完成(最多 30 秒)
    time.Sleep(30 * time.Second)
    
    // 3. 关闭连接
    bus.Close()
    
    log.Println("退出完成")
}

常见问题

Q1: NSQ 和 RabbitMQ 如何选择?
特性 NSQ RabbitMQ
部署复杂度 ⭐⭐ 简单 ⭐⭐⭐ 中等
性能 ⭐⭐⭐⭐⭐ 极高 ⭐⭐⭐⭐ 高
功能丰富度 ⭐⭐⭐ 基础 ⭐⭐⭐⭐⭐ 丰富
消息持久化 ⭐⭐⭐ 有限 ⭐⭐⭐⭐⭐ 强大
适用场景 高吞吐、简单队列 复杂路由、企业级

推荐

  • 开发环境 / 简单场景:NSQ
  • 生产环境 / 复杂需求:RabbitMQ
Q2: 消息会丢失吗?

NSQ

  • 默认内存队列,重启会丢失
  • 可配置 --mem-queue-size=0 强制磁盘持久化

RabbitMQ

  • 设置 Durable: true + PersistentMessages: true 保证持久化
  • 需要手动 Ack 确认
Q3: 如何保证消息顺序?

方案 1:单 Worker(降低并发)

config.NSQ.MaxInFlight = 1 // 一次只处理一条

方案 2:分区(按 key 路由)

// 同一个 user_id 的消息发送到同一个队列
topic := fmt.Sprintf("user.%d.events", userID)
Q4: 如何处理毒消息(Poison Message)?
func handler(ctx context.Context, msg *messaging.Message) error {
    // 检查重试次数
    if msg.Attempts > 5 {
        // 发送到死信队列
        dlq.Publish(ctx, "dlq.user.created", msg.Payload)
        return nil // 不再重试
    }
    
    // 继续处理
    return processMessage(msg)
}

附录

A. 完整 API 参考

查看源码注释:

  • port.go - 核心接口定义
  • middleware.go - 所有中间件
  • router.go - 路由器实现
  • config.go - 配置结构
B. 示例代码目录
example/
├── simple/              # 基础发布订阅
├── event-driven/        # 事件驱动架构
├── task-queue/          # 任务队列模式
├── middleware/          # 中间件基础使用
├── advanced-middleware/ # 高级中间件(限流、熔断)
├── unified/             # Provider 切换演示
├── semantic/            # 语义化辅助函数
└── rabbitmq/            # RabbitMQ 特定功能
C. 性能基准
# NSQ
吞吐量:100,000 msg/s(单机)
延迟:P99 < 10ms

# RabbitMQ
吞吐量:50,000 msg/s(单机)
延迟:P99 < 50ms

贡献

欢迎提交 Issue 和 Pull Request!

许可

MIT License

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParseJSON

func ParseJSON(msg *Message, v interface{}) error

ParseJSON 解析 JSON 消息 将消息体解析为指定类型

func PublishEvent

func PublishEvent(ctx context.Context, publisher Publisher, event string, body []byte) error

PublishEvent 发布事件(语义化别名) 用于事件驱动模式,让代码意图更明确

使用示例:

messaging.PublishEvent(ctx, publisher, "user.created", data)

func PublishJSON

func PublishJSON(ctx context.Context, publisher Publisher, topic string, data interface{}) error

PublishJSON 发布 JSON 消息 将对象序列化为 JSON 后发布

func PublishTask

func PublishTask(ctx context.Context, publisher Publisher, taskType string, body []byte) error

PublishTask 发布任务(语义化别名) 用于任务队列模式,让代码意图更明确

使用示例:

messaging.PublishTask(ctx, publisher, "email.send", task)

func RegisterProvider

func RegisterProvider(provider Provider, factory EventBusFactory)

RegisterProvider 注册消息中间件提供者 这个函数由各个实现包(nsq、rabbitmq)在 init 函数中调用

示例:

func init() {
    messaging.RegisterProvider(messaging.ProviderNSQ, NewEventBusFromConfig)
}

func SubscribeEvent

func SubscribeEvent(subscriber Subscriber, event string, serviceName string, handler Handler) error

SubscribeEvent 订阅事件(事件驱动模式) 每个服务应该使用不同的 channel,以便接收所有事件

使用示例:

// 每个服务使用唯一的 channel
messaging.SubscribeEvent(subscriber, "user.created", "email-service", emailHandler)
messaging.SubscribeEvent(subscriber, "user.created", "stat-service", statHandler)

func SubscribeTask

func SubscribeTask(subscriber Subscriber, taskType string, workerGroup string, handler Handler) error

SubscribeTask 订阅任务(任务队列模式) 同一类型的 worker 应该使用相同的 workerGroup,以实现负载均衡

使用示例:

// 多个 worker 使用相同的 workerGroup
messaging.SubscribeTask(subscriber, "email.send", "email-workers", handler)
messaging.SubscribeTask(subscriber, "email.send", "email-workers", handler)

Types

type AuditLogger

type AuditLogger interface {
	// Log 记录审计日志
	Log(event string, msg *Message, err error)
}

AuditLogger 审计日志接口

type BatchProcessor

type BatchProcessor interface {
	// Add 添加消息到批次
	Add(msg *Message) error

	// Flush 刷新批次(强制处理)
	Flush() error
}

BatchProcessor 批处理器接口

type CircuitBreaker

type CircuitBreaker interface {
	// Call 执行调用(带熔断保护)
	Call(fn func() error) error

	// State 获取当前状态(closed, open, half-open)
	State() string
}

CircuitBreaker 熔断器接口

type Config

type Config struct {
	// Provider 消息中间件提供者类型(nsq, rabbitmq)
	Provider Provider `json:"provider" yaml:"provider"`

	// NSQ 配置
	NSQ NSQConfig `json:"nsq" yaml:"nsq"`

	// RabbitMQ 配置
	RabbitMQ RabbitMQConfig `json:"rabbitmq" yaml:"rabbitmq"`
}

Config 事件总线配置

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig 返回默认配置

type DeduplicationStore

type DeduplicationStore interface {
	// Exists 检查消息是否已处理
	Exists(uuid string) bool

	// Mark 标记消息已处理
	Mark(uuid string, ttl time.Duration) error
}

DeduplicationStore 去重存储接口

type EventBus

type EventBus interface {
	// Publisher 获取发布者
	Publisher() Publisher

	// Subscriber 获取订阅者
	Subscriber() Subscriber

	// Router 获取路由器(用于批量注册处理器)
	Router() *Router

	// Health 健康检查(检查连接状态)
	Health() error

	// Close 关闭事件总线(释放所有资源)
	Close() error
}

EventBus 事件总线接口 组合了发布者和订阅者,提供完整的消息总线功能

func MustNewEventBus

func MustNewEventBus(config *Config) EventBus

MustNewEventBus 创建事件总线,失败则 panic 适用于启动阶段,配置错误时快速失败

func NewEventBus

func NewEventBus(config *Config) (EventBus, error)

NewEventBus 创建事件总线 这是对外暴露的统一接口,根据配置中的 Provider 选择具体实现

使用示例:

config := &messaging.Config{
    Provider: messaging.ProviderNSQ,
    NSQ: messaging.NSQConfig{...},
}
bus, err := messaging.NewEventBus(config)

type EventBusFactory

type EventBusFactory func(config *Config) (EventBus, error)

EventBusFactory 事件总线工厂函数 接收配置,返回 EventBus 实例

type Handler

type Handler func(ctx context.Context, msg *Message) error

Handler 消息处理函数 业务层通过实现此函数来处理接收到的消息

func NewJSONHandler

func NewJSONHandler(fn func(ctx context.Context, data interface{}) error, dataType interface{}) Handler

NewJSONHandler 创建 JSON 处理器 自动解析 JSON 消息并调用业务处理函数

func NewSimpleHandler

func NewSimpleHandler(fn func([]byte) error) Handler

NewSimpleHandler 创建简单的处理器 只关心消息体,不关心其他元数据

func WrapHandler

func WrapHandler(oldHandler func(topic string, data []byte) error) Handler

WrapHandler 包装旧版本的 handler 用于兼容旧的 func(topic string, data []byte) error 签名

type Message

type Message struct {
	// UUID 全局唯一标识(每条消息都有唯一 ID)
	UUID string

	// Metadata 元数据(用于链路追踪、业务标识等)
	// 例如:trace_id, span_id, user_id, request_id
	Metadata map[string]string

	// Payload 消息负载(实际业务数据)
	Payload []byte

	// Attempts 消息重试次数
	Attempts uint16

	// Timestamp 消息时间戳(纳秒)
	Timestamp int64

	// Topic 消息主题
	Topic string

	// Channel 消息通道
	Channel string
	// contains filtered or unexported fields
}

Message 领域消息结构 参考 Watermill 设计,支持 UUID、Metadata、Ack/Nack

func NewMessage

func NewMessage(uuid string, payload []byte) *Message

NewMessage 创建新消息

func (*Message) Ack

func (m *Message) Ack() error

Ack 确认消息处理成功 调用后,消息不会被重新投递

func (*Message) Body

func (m *Message) Body() []byte

Body 向后兼容的别名(返回 Payload) Deprecated: 使用 Payload 字段代替

func (*Message) ID

func (m *Message) ID() string

ID 向后兼容的别名(返回 UUID) Deprecated: 使用 UUID 字段代替

func (*Message) Nack

func (m *Message) Nack() error

Nack 拒绝消息,触发重试 调用后,消息会被重新投递(如果未超过最大重试次数)

func (*Message) SetAckFunc

func (m *Message) SetAckFunc(ack func() error)

SetAckFunc 设置确认函数(由 Adapter 调用)

func (*Message) SetNackFunc

func (m *Message) SetNackFunc(nack func() error)

SetNackFunc 设置拒绝函数(由 Adapter 调用)

type MetricsCollector

type MetricsCollector interface {
	// IncrementProcessed 增加处理消息计数
	IncrementProcessed(topic string)

	// IncrementFailed 增加失败消息计数
	IncrementFailed(topic string)

	// RecordDuration 记录处理耗时
	RecordDuration(topic string, duration time.Duration)
}

MetricsMiddleware 指标中间件 收集消息处理的统计信息

type Middleware

type Middleware func(Handler) Handler

Middleware 中间件函数 用于在消息处理前后执行额外逻辑(日志、重试、超时、监控等)

func AuditMiddleware

func AuditMiddleware(logger AuditLogger) Middleware

AuditMiddleware 审计中间件 记录消息处理的完整审计日志

func BatchMiddleware

func BatchMiddleware(batchSize int, flushInterval time.Duration, batchHandler func([]*Message) error) Middleware

BatchMiddleware 批处理中间件 将多个消息合并处理,提高吞吐量 batchSize: 批次大小 flushInterval: 刷新间隔

func CircuitBreakerMiddleware

func CircuitBreakerMiddleware(breaker CircuitBreaker) Middleware

CircuitBreakerMiddleware 熔断器中间件 防止级联故障,当错误率超过阈值时自动熔断

func DeduplicationMiddleware

func DeduplicationMiddleware(store DeduplicationStore, ttl time.Duration) Middleware

DeduplicationMiddleware 创建去重中间件 防止重复处理相同的消息

func FilterMiddleware

func FilterMiddleware(predicate func(*Message) bool) Middleware

FilterMiddleware 条件过滤中间件 根据条件决定是否处理消息

func LoggerMiddleware

func LoggerMiddleware(logger *log.Logger) Middleware

LoggerMiddleware 日志中间件 记录消息处理的开始、结束、耗时和错误

func MetricsMiddleware

func MetricsMiddleware(collector MetricsCollector) Middleware

MetricsMiddleware 创建指标中间件

func PriorityMiddleware

func PriorityMiddleware(getPriority func(*Message) int) Middleware

PriorityMiddleware 优先级中间件 根据消息优先级排序处理

func RateLimitMiddleware

func RateLimitMiddleware(limiter RateLimiter, mode string) Middleware

RateLimitMiddleware 限流中间件 限制消息处理速率,防止系统过载 mode: "drop" 丢弃超限消息,"wait" 等待直到允许处理

func RecoverMiddleware

func RecoverMiddleware(logger *log.Logger) Middleware

RecoverMiddleware 恢复中间件 捕获 panic,防止单个消息处理失败导致整个程序崩溃

func RetryMiddleware

func RetryMiddleware(maxRetries int, delay time.Duration) Middleware

RetryMiddleware 重试中间件 在消息处理失败时自动重试 maxRetries: 最大重试次数 delay: 每次重试的延迟时间(指数退避)

func TimeoutMiddleware

func TimeoutMiddleware(timeout time.Duration) Middleware

TimeoutMiddleware 超时中间件 限制消息处理的最大时间 timeout: 超时时间

func TracingMiddleware

func TracingMiddleware() Middleware

TracingMiddleware 链路追踪中间件 自动注入和提取 trace_id、span_id

type NSQConfig

type NSQConfig struct {
	// NSQLookupd 地址列表
	LookupdAddrs []string `json:"lookupd_addrs" yaml:"lookupd_addrs"`

	// NSQd 地址(用于发布)
	NSQdAddr string `json:"nsqd_addr" yaml:"nsqd_addr"`

	// 最大消息重试次数
	MaxAttempts uint16 `json:"max_attempts" yaml:"max_attempts"`

	// 最大消息处理时间
	MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`

	// 消息超时时间
	MsgTimeout time.Duration `json:"msg_timeout" yaml:"msg_timeout"`

	// 重新入队延迟
	RequeueDelay time.Duration `json:"requeue_delay" yaml:"requeue_delay"`

	// 拨号超时时间
	DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout"`

	// 读超时时间
	ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`

	// 写超时时间
	WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
}

NSQConfig NSQ 配置

func DefaultNSQConfig

func DefaultNSQConfig() NSQConfig

DefaultNSQConfig 返回默认 NSQ 配置

type Provider

type Provider string

Provider 消息中间件提供者类型

const (
	// ProviderNSQ NSQ 消息队列
	ProviderNSQ Provider = "nsq"

	// ProviderRabbitMQ RabbitMQ 消息队列
	ProviderRabbitMQ Provider = "rabbitmq"
)

func GetRegisteredProviders

func GetRegisteredProviders() []Provider

GetRegisteredProviders 获取已注册的提供者列表

type Publisher

type Publisher interface {
	// Publish 发布消息到指定主题
	// topic: 主题名称
	// body: 消息体(字节数组)
	Publish(ctx context.Context, topic string, body []byte) error

	// PublishMessage 发布消息对象(支持 Metadata)
	PublishMessage(ctx context.Context, topic string, msg *Message) error

	// Close 关闭发布者,释放资源
	Close() error
}

Publisher 消息发布者接口 领域层/应用层眼中的发布接口,与具体消息中间件解耦

type RabbitMQConfig

type RabbitMQConfig struct {
	// RabbitMQ 连接 URL(推荐方式)
	// 格式:amqp://username:password@host:port/vhost
	// 例如:amqp://guest:guest@localhost:5672/
	// 注意:如果设置了 URL,下面的独立配置项会被忽略
	URL string `json:"url" yaml:"url"`

	// Host 主机地址
	Host string `json:"host" yaml:"host"`

	// Port 端口号(默认 5672)
	Port int `json:"port" yaml:"port"`

	// Username 用户名(默认 guest)
	Username string `json:"username" yaml:"username"`

	// Password 密码(默认 guest)
	Password string `json:"password" yaml:"password"`

	// VHost 虚拟主机(默认 /)
	// VHost 相当于命名空间,用于隔离不同应用的消息
	VHost string `json:"vhost" yaml:"vhost"`

	// MaxChannels 最大 channel 数量(默认 100)
	// Channel 是轻量级的连接,用于发送和接收消息
	MaxChannels int `json:"max_channels" yaml:"max_channels"`

	// PrefetchCount 预取数量(默认 200)
	// 消费者一次最多预取多少条未确认的消息
	// 值越大,吞吐量越高,但内存占用也越大
	PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"`

	// PrefetchSize 预取大小(默认 0,不限制)
	// 消费者一次最多预取多少字节的未确认消息
	PrefetchSize int `json:"prefetch_size" yaml:"prefetch_size"`

	// Durable 是否持久化 exchange 和 queue(默认 true)
	// true: RabbitMQ 重启后,exchange 和 queue 不会丢失
	Durable bool `json:"durable" yaml:"durable"`

	// PersistentMessages 消息是否持久化(默认 true)
	// true: 消息会写入磁盘,RabbitMQ 重启后消息不会丢失
	PersistentMessages bool `json:"persistent_messages" yaml:"persistent_messages"`

	// ConnectionTimeout 连接超时时间(默认 10s)
	ConnectionTimeout time.Duration `json:"connection_timeout" yaml:"connection_timeout"`

	// HeartbeatInterval 心跳间隔(默认 10s)
	// 用于检测连接是否存活
	HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`

	// AutoReconnect 是否自动重连(默认 true)
	AutoReconnect bool `json:"auto_reconnect" yaml:"auto_reconnect"`

	// ReconnectDelay 重连延迟(默认 5s)
	ReconnectDelay time.Duration `json:"reconnect_delay" yaml:"reconnect_delay"`

	// MaxReconnectAttempts 最大重连次数(默认 0,无限重试)
	MaxReconnectAttempts int `json:"max_reconnect_attempts" yaml:"max_reconnect_attempts"`

	// ExchangeType Exchange 类型(默认 fanout)
	// fanout: 广播,发送给所有绑定的队列
	// direct: 直接路由,根据 routing key 精确匹配
	// topic: 主题路由,支持通配符(* 和 #)
	// headers: 根据消息头路由
	ExchangeType string `json:"exchange_type" yaml:"exchange_type"`

	// AutoDelete 是否自动删除(默认 false)
	// true: 当没有消费者时,自动删除 exchange 和 queue
	AutoDelete bool `json:"auto_delete" yaml:"auto_delete"`

	// Exclusive 是否独占(默认 false)
	// true: 队列只能被当前连接使用,连接断开后队列自动删除
	Exclusive bool `json:"exclusive" yaml:"exclusive"`
}

RabbitMQConfig RabbitMQ 配置

func DefaultRabbitMQConfig

func DefaultRabbitMQConfig() RabbitMQConfig

DefaultRabbitMQConfig 返回默认 RabbitMQ 配置

func (*RabbitMQConfig) BuildURL

func (c *RabbitMQConfig) BuildURL() string

BuildURL 构建 RabbitMQ 连接 URL 如果已经设置了 URL,直接返回;否则根据独立配置项构建

type RateLimiter

type RateLimiter interface {
	// Allow 检查是否允许处理
	Allow() bool

	// Wait 等待直到允许处理
	Wait(ctx context.Context) error
}

RateLimiter 限流器接口

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router 消息路由器 用于批量注册消息处理器,支持中间件链

func NewRouter

func NewRouter(subscriber Subscriber) *Router

NewRouter 创建路由器

func (*Router) AddHandler

func (r *Router) AddHandler(topic, channel string, handler Handler)

AddHandler 注册消息处理器 topic: 主题名称 channel: 通道名称 handler: 消息处理函数

func (*Router) AddHandlerWithMiddleware

func (r *Router) AddHandlerWithMiddleware(topic, channel string, handler Handler, middlewares ...Middleware)

AddHandlerWithMiddleware 注册消息处理器(支持中间件) topic: 主题名称 channel: 通道名称 handler: 消息处理函数 middlewares: 中间件列表(按顺序执行)

func (*Router) AddMiddleware

func (r *Router) AddMiddleware(mw Middleware)

AddMiddleware 添加全局中间件 全局中间件会应用到所有处理器

func (*Router) Run

func (r *Router) Run(ctx context.Context) error

Run 启动路由器 将所有注册的处理器订阅到消息中间件

func (*Router) Stop

func (r *Router) Stop()

Stop 停止路由器

type SimpleCircuitBreaker

type SimpleCircuitBreaker struct {
	// contains filtered or unexported fields
}

SimpleCircuitBreaker 简单熔断器实现

func NewSimpleCircuitBreaker

func NewSimpleCircuitBreaker(maxFailures int, timeout time.Duration) *SimpleCircuitBreaker

NewSimpleCircuitBreaker 创建简单熔断器 maxFailures: 最大失败次数(超过则熔断) timeout: 熔断持续时间(之后尝试恢复)

func (*SimpleCircuitBreaker) Call

func (cb *SimpleCircuitBreaker) Call(fn func() error) error

Call 执行调用(带熔断保护)

func (*SimpleCircuitBreaker) State

func (cb *SimpleCircuitBreaker) State() string

State 获取当前状态

type Subscriber

type Subscriber interface {
	// Subscribe 订阅指定主题的消息
	// topic: 主题名称
	// channel: 通道名称(NSQ 中的 channel 概念,用于负载均衡)
	// handler: 消息处理函数
	Subscribe(topic, channel string, handler Handler) error

	// SubscribeWithMiddleware 订阅消息(支持中间件)
	SubscribeWithMiddleware(topic, channel string, handler Handler, middlewares ...Middleware) error

	// Stop 停止订阅
	Stop()

	// Close 关闭订阅者,释放资源
	Close() error
}

Subscriber 消息订阅者接口 领域层/应用层眼中的订阅接口

type TaskPublisher

type TaskPublisher struct {
	// contains filtered or unexported fields
}

TaskPublisher 任务发布者(语义化包装) 这不是新接口,只是对 Publisher 的语义化封装

func NewTaskPublisher

func NewTaskPublisher(publisher Publisher) *TaskPublisher

NewTaskPublisher 创建任务发布者

func (*TaskPublisher) Close

func (tp *TaskPublisher) Close() error

Close 关闭

func (*TaskPublisher) Publish

func (tp *TaskPublisher) Publish(ctx context.Context, taskType string, body []byte) error

Publish 发布任务

type TaskSubscriber

type TaskSubscriber struct {
	// contains filtered or unexported fields
}

TaskSubscriber 任务订阅者(语义化包装) 这不是新接口,只是对 Subscriber 的语义化封装

func NewTaskSubscriber

func NewTaskSubscriber(subscriber Subscriber, workerGroup string) *TaskSubscriber

NewTaskSubscriber 创建任务订阅者 workerGroup: 工作组名称,同组的 worker 会负载均衡处理任务

func (*TaskSubscriber) Close

func (ts *TaskSubscriber) Close() error

Close 关闭

func (*TaskSubscriber) Stop

func (ts *TaskSubscriber) Stop()

Stop 停止订阅

func (*TaskSubscriber) Subscribe

func (ts *TaskSubscriber) Subscribe(taskType string, handler Handler) error

Subscribe 订阅任务

type TokenBucketLimiter

type TokenBucketLimiter struct {
	// contains filtered or unexported fields
}

TokenBucketLimiter 令牌桶限流器(简单实现)

func NewTokenBucketLimiter

func NewTokenBucketLimiter(capacity int, rate time.Duration) *TokenBucketLimiter

NewTokenBucketLimiter 创建令牌桶限流器 capacity: 桶容量 rate: 令牌生成速率

func (*TokenBucketLimiter) Allow

func (l *TokenBucketLimiter) Allow() bool

Allow 检查是否允许处理

func (*TokenBucketLimiter) Wait

func (l *TokenBucketLimiter) Wait(ctx context.Context) error

Wait 等待直到允许处理

Directories

Path Synopsis
example
01-quickstart command
Package main 演示 messaging 包的最简单用法 5 分钟快速入门:发布和订阅消息
Package main 演示 messaging 包的最简单用法 5 分钟快速入门:发布和订阅消息
02-message command
Package main 演示 Message 消息模型的完整功能 学习 UUID、Metadata、Payload、Ack/Nack
Package main 演示 Message 消息模型的完整功能 学习 UUID、Metadata、Payload、Ack/Nack
03-publisher command
Package main 演示 Publisher 的使用 发布消息、批量发布、发布选项
Package main 演示 Publisher 的使用 发布消息、批量发布、发布选项
04-subscriber command
Package main 演示 Subscriber 的使用 订阅消息、多订阅者、订阅选项
Package main 演示 Subscriber 的使用 订阅消息、多订阅者、订阅选项
05-event-driven command
Package main 演示事件驱动架构(Event-Driven Architecture) 一个事件,多个服务订阅(广播模式)
Package main 演示事件驱动架构(Event-Driven Architecture) 一个事件,多个服务订阅(广播模式)
06-task-queue command
Package main 演示任务队列模式(Task Queue Pattern) 多个 Worker 负载均衡处理任务
Package main 演示任务队列模式(Task Queue Pattern) 多个 Worker 负载均衡处理任务
07-router command
Package main 演示 Router 的使用 统一路由管理、全局中间件、局部中间件
Package main 演示 Router 的使用 统一路由管理、全局中间件、局部中间件
08-middleware-basic command
Package main 演示基础中间件的使用 Logger、Retry、Timeout、Recover
Package main 演示基础中间件的使用 Logger、Retry、Timeout、Recover
09-middleware-advanced command
Package main 演示高级中间件的使用 RateLimit、CircuitBreaker、Filter、Priority、Deduplication
Package main 演示高级中间件的使用 RateLimit、CircuitBreaker、Filter、Priority、Deduplication
10-middleware-custom command
Package main 演示如何编写自定义中间件 包括:认证中间件、审计中间件、批处理中间件
Package main 演示如何编写自定义中间件 包括:认证中间件、审计中间件、批处理中间件
11-observability command
Package main 演示可观测性实践 Metrics、Tracing、Health Check
Package main 演示可观测性实践 Metrics、Tracing、Health Check
12-reliability command
Package main 演示可靠性保障实践 错误处理、重试策略、熔断降级、消息幂等性
Package main 演示可靠性保障实践 错误处理、重试策略、熔断降级、消息幂等性
13-graceful-shutdown command
Package main 演示优雅关闭 信号处理、资源清理、未完成任务处理
Package main 演示优雅关闭 信号处理、资源清理、未完成任务处理
14-multi-provider command
Package main 演示如何在不同消息中间件之间切换 包括:NSQ、RabbitMQ 的切换和混合使用
Package main 演示如何在不同消息中间件之间切换 包括:NSQ、RabbitMQ 的切换和混合使用
15-performance command
Package main 演示性能优化和压测 并发处理、批量操作、性能监控
Package main 演示性能优化和压测 并发处理、批量操作、性能监控
16-complete-app command
Package main 完整的生产级应用示例 综合展示:配置管理、中间件、可观测性、错误处理、优雅关闭
Package main 完整的生产级应用示例 综合展示:配置管理、中间件、可观测性、错误处理、优雅关闭

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL