Documentation
¶
Index ¶
- func ParseJSON(msg *Message, v interface{}) error
- func PublishEvent(ctx context.Context, publisher Publisher, event string, body []byte) error
- func PublishJSON(ctx context.Context, publisher Publisher, topic string, data interface{}) error
- func PublishTask(ctx context.Context, publisher Publisher, taskType string, body []byte) error
- func RegisterProvider(provider Provider, factory EventBusFactory)
- func SubscribeEvent(subscriber Subscriber, event string, serviceName string, handler Handler) error
- func SubscribeTask(subscriber Subscriber, taskType string, workerGroup string, handler Handler) error
- type AuditLogger
- type BatchProcessor
- type CircuitBreaker
- type Config
- type DeduplicationStore
- type EventBus
- type EventBusFactory
- type Handler
- type Message
- type MetricsCollector
- type Middleware
- func AuditMiddleware(logger AuditLogger) Middleware
- func BatchMiddleware(batchSize int, flushInterval time.Duration, ...) Middleware
- func CircuitBreakerMiddleware(breaker CircuitBreaker) Middleware
- func DeduplicationMiddleware(store DeduplicationStore, ttl time.Duration) Middleware
- func FilterMiddleware(predicate func(*Message) bool) Middleware
- func LoggerMiddleware(logger *log.Logger) Middleware
- func MetricsMiddleware(collector MetricsCollector) Middleware
- func PriorityMiddleware(getPriority func(*Message) int) Middleware
- func RateLimitMiddleware(limiter RateLimiter, mode string) Middleware
- func RecoverMiddleware(logger *log.Logger) Middleware
- func RetryMiddleware(maxRetries int, delay time.Duration) Middleware
- func TimeoutMiddleware(timeout time.Duration) Middleware
- func TracingMiddleware() Middleware
- type NSQConfig
- type Provider
- type Publisher
- type RabbitMQConfig
- type RateLimiter
- type Router
- type SimpleCircuitBreaker
- type Subscriber
- type TaskPublisher
- type TaskSubscriber
- type TokenBucketLimiter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func PublishEvent ¶
PublishEvent 发布事件(语义化别名) 用于事件驱动模式,让代码意图更明确
使用示例:
messaging.PublishEvent(ctx, publisher, "user.created", data)
func PublishJSON ¶
PublishJSON 发布 JSON 消息 将对象序列化为 JSON 后发布
func PublishTask ¶
PublishTask 发布任务(语义化别名) 用于任务队列模式,让代码意图更明确
使用示例:
messaging.PublishTask(ctx, publisher, "email.send", task)
func RegisterProvider ¶
func RegisterProvider(provider Provider, factory EventBusFactory)
RegisterProvider 注册消息中间件提供者 这个函数由各个实现包(nsq、rabbitmq)在 init 函数中调用
示例:
func init() {
messaging.RegisterProvider(messaging.ProviderNSQ, NewEventBusFromConfig)
}
func SubscribeEvent ¶
func SubscribeEvent(subscriber Subscriber, event string, serviceName string, handler Handler) error
SubscribeEvent 订阅事件(事件驱动模式) 每个服务应该使用不同的 channel,以便接收所有事件
使用示例:
// 每个服务使用唯一的 channel messaging.SubscribeEvent(subscriber, "user.created", "email-service", emailHandler) messaging.SubscribeEvent(subscriber, "user.created", "stat-service", statHandler)
func SubscribeTask ¶
func SubscribeTask(subscriber Subscriber, taskType string, workerGroup string, handler Handler) error
SubscribeTask 订阅任务(任务队列模式) 同一类型的 worker 应该使用相同的 workerGroup,以实现负载均衡
使用示例:
// 多个 worker 使用相同的 workerGroup messaging.SubscribeTask(subscriber, "email.send", "email-workers", handler) messaging.SubscribeTask(subscriber, "email.send", "email-workers", handler)
Types ¶
type AuditLogger ¶
AuditLogger 审计日志接口
type BatchProcessor ¶
type BatchProcessor interface {
// Add 添加消息到批次
Add(msg *Message) error
// Flush 刷新批次(强制处理)
Flush() error
}
BatchProcessor 批处理器接口
type CircuitBreaker ¶
type CircuitBreaker interface {
// Call 执行调用(带熔断保护)
Call(fn func() error) error
// State 获取当前状态(closed, open, half-open)
State() string
}
CircuitBreaker 熔断器接口
type Config ¶
type Config struct {
// Provider 消息中间件提供者类型(nsq, rabbitmq)
Provider Provider `json:"provider" yaml:"provider"`
// NSQ 配置
NSQ NSQConfig `json:"nsq" yaml:"nsq"`
// RabbitMQ 配置
RabbitMQ RabbitMQConfig `json:"rabbitmq" yaml:"rabbitmq"`
}
Config 事件总线配置
type DeduplicationStore ¶
type DeduplicationStore interface {
// Exists 检查消息是否已处理
Exists(uuid string) bool
// Mark 标记消息已处理
Mark(uuid string, ttl time.Duration) error
}
DeduplicationStore 去重存储接口
type EventBus ¶
type EventBus interface {
// Publisher 获取发布者
Publisher() Publisher
// Subscriber 获取订阅者
Subscriber() Subscriber
// Router 获取路由器(用于批量注册处理器)
Router() *Router
// Health 健康检查(检查连接状态)
Health() error
// Close 关闭事件总线(释放所有资源)
Close() error
}
EventBus 事件总线接口 组合了发布者和订阅者,提供完整的消息总线功能
func MustNewEventBus ¶
MustNewEventBus 创建事件总线,失败则 panic 适用于启动阶段,配置错误时快速失败
func NewEventBus ¶
NewEventBus 创建事件总线 这是对外暴露的统一接口,根据配置中的 Provider 选择具体实现
使用示例:
config := &messaging.Config{
Provider: messaging.ProviderNSQ,
NSQ: messaging.NSQConfig{...},
}
bus, err := messaging.NewEventBus(config)
type EventBusFactory ¶
EventBusFactory 事件总线工厂函数 接收配置,返回 EventBus 实例
type Handler ¶
Handler 消息处理函数 业务层通过实现此函数来处理接收到的消息
func NewJSONHandler ¶
func NewJSONHandler(fn func(ctx context.Context, data interface{}) error, dataType interface{}) Handler
NewJSONHandler 创建 JSON 处理器 自动解析 JSON 消息并调用业务处理函数
func NewSimpleHandler ¶
NewSimpleHandler 创建简单的处理器 只关心消息体,不关心其他元数据
type Message ¶
type Message struct {
// UUID 全局唯一标识(每条消息都有唯一 ID)
UUID string
// Metadata 元数据(用于链路追踪、业务标识等)
// 例如:trace_id, span_id, user_id, request_id
Metadata map[string]string
// Payload 消息负载(实际业务数据)
Payload []byte
// Attempts 消息重试次数
Attempts uint16
// Timestamp 消息时间戳(纳秒)
Timestamp int64
// Topic 消息主题
Topic string
// Channel 消息通道
Channel string
// contains filtered or unexported fields
}
Message 领域消息结构 参考 Watermill 设计,支持 UUID、Metadata、Ack/Nack
func (*Message) SetAckFunc ¶
SetAckFunc 设置确认函数(由 Adapter 调用)
func (*Message) SetNackFunc ¶
SetNackFunc 设置拒绝函数(由 Adapter 调用)
type MetricsCollector ¶
type MetricsCollector interface {
// IncrementProcessed 增加处理消息计数
IncrementProcessed(topic string)
// IncrementFailed 增加失败消息计数
IncrementFailed(topic string)
// RecordDuration 记录处理耗时
RecordDuration(topic string, duration time.Duration)
}
MetricsMiddleware 指标中间件 收集消息处理的统计信息
type Middleware ¶
Middleware 中间件函数 用于在消息处理前后执行额外逻辑(日志、重试、超时、监控等)
func AuditMiddleware ¶
func AuditMiddleware(logger AuditLogger) Middleware
AuditMiddleware 审计中间件 记录消息处理的完整审计日志
func BatchMiddleware ¶
func BatchMiddleware(batchSize int, flushInterval time.Duration, batchHandler func([]*Message) error) Middleware
BatchMiddleware 批处理中间件 将多个消息合并处理,提高吞吐量 batchSize: 批次大小 flushInterval: 刷新间隔
func CircuitBreakerMiddleware ¶
func CircuitBreakerMiddleware(breaker CircuitBreaker) Middleware
CircuitBreakerMiddleware 熔断器中间件 防止级联故障,当错误率超过阈值时自动熔断
func DeduplicationMiddleware ¶
func DeduplicationMiddleware(store DeduplicationStore, ttl time.Duration) Middleware
DeduplicationMiddleware 创建去重中间件 防止重复处理相同的消息
func FilterMiddleware ¶
func FilterMiddleware(predicate func(*Message) bool) Middleware
FilterMiddleware 条件过滤中间件 根据条件决定是否处理消息
func LoggerMiddleware ¶
func LoggerMiddleware(logger *log.Logger) Middleware
LoggerMiddleware 日志中间件 记录消息处理的开始、结束、耗时和错误
func MetricsMiddleware ¶
func MetricsMiddleware(collector MetricsCollector) Middleware
MetricsMiddleware 创建指标中间件
func PriorityMiddleware ¶
func PriorityMiddleware(getPriority func(*Message) int) Middleware
PriorityMiddleware 优先级中间件 根据消息优先级排序处理
func RateLimitMiddleware ¶
func RateLimitMiddleware(limiter RateLimiter, mode string) Middleware
RateLimitMiddleware 限流中间件 限制消息处理速率,防止系统过载 mode: "drop" 丢弃超限消息,"wait" 等待直到允许处理
func RecoverMiddleware ¶
func RecoverMiddleware(logger *log.Logger) Middleware
RecoverMiddleware 恢复中间件 捕获 panic,防止单个消息处理失败导致整个程序崩溃
func RetryMiddleware ¶
func RetryMiddleware(maxRetries int, delay time.Duration) Middleware
RetryMiddleware 重试中间件 在消息处理失败时自动重试 maxRetries: 最大重试次数 delay: 每次重试的延迟时间(指数退避)
func TimeoutMiddleware ¶
func TimeoutMiddleware(timeout time.Duration) Middleware
TimeoutMiddleware 超时中间件 限制消息处理的最大时间 timeout: 超时时间
func TracingMiddleware ¶
func TracingMiddleware() Middleware
TracingMiddleware 链路追踪中间件 自动注入和提取 trace_id、span_id
type NSQConfig ¶
type NSQConfig struct {
// NSQLookupd 地址列表
LookupdAddrs []string `json:"lookupd_addrs" yaml:"lookupd_addrs"`
// NSQd 地址(用于发布)
NSQdAddr string `json:"nsqd_addr" yaml:"nsqd_addr"`
// 最大消息重试次数
MaxAttempts uint16 `json:"max_attempts" yaml:"max_attempts"`
// 最大消息处理时间
MaxInFlight int `json:"max_in_flight" yaml:"max_in_flight"`
// 消息超时时间
MsgTimeout time.Duration `json:"msg_timeout" yaml:"msg_timeout"`
// 重新入队延迟
RequeueDelay time.Duration `json:"requeue_delay" yaml:"requeue_delay"`
// 拨号超时时间
DialTimeout time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
// 读超时时间
ReadTimeout time.Duration `json:"read_timeout" yaml:"read_timeout"`
// 写超时时间
WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
}
NSQConfig NSQ 配置
type Provider ¶
type Provider string
Provider 消息中间件提供者类型
func GetRegisteredProviders ¶
func GetRegisteredProviders() []Provider
GetRegisteredProviders 获取已注册的提供者列表
type Publisher ¶
type Publisher interface {
// Publish 发布消息到指定主题
// topic: 主题名称
// body: 消息体(字节数组)
Publish(ctx context.Context, topic string, body []byte) error
// PublishMessage 发布消息对象(支持 Metadata)
PublishMessage(ctx context.Context, topic string, msg *Message) error
// Close 关闭发布者,释放资源
Close() error
}
Publisher 消息发布者接口 领域层/应用层眼中的发布接口,与具体消息中间件解耦
type RabbitMQConfig ¶
type RabbitMQConfig struct {
// RabbitMQ 连接 URL(推荐方式)
// 格式:amqp://username:password@host:port/vhost
// 例如:amqp://guest:guest@localhost:5672/
// 注意:如果设置了 URL,下面的独立配置项会被忽略
URL string `json:"url" yaml:"url"`
// Host 主机地址
Host string `json:"host" yaml:"host"`
// Port 端口号(默认 5672)
Port int `json:"port" yaml:"port"`
// Username 用户名(默认 guest)
Username string `json:"username" yaml:"username"`
// Password 密码(默认 guest)
Password string `json:"password" yaml:"password"`
// VHost 虚拟主机(默认 /)
// VHost 相当于命名空间,用于隔离不同应用的消息
VHost string `json:"vhost" yaml:"vhost"`
// MaxChannels 最大 channel 数量(默认 100)
// Channel 是轻量级的连接,用于发送和接收消息
MaxChannels int `json:"max_channels" yaml:"max_channels"`
// PrefetchCount 预取数量(默认 200)
// 消费者一次最多预取多少条未确认的消息
// 值越大,吞吐量越高,但内存占用也越大
PrefetchCount int `json:"prefetch_count" yaml:"prefetch_count"`
// PrefetchSize 预取大小(默认 0,不限制)
// 消费者一次最多预取多少字节的未确认消息
PrefetchSize int `json:"prefetch_size" yaml:"prefetch_size"`
// Durable 是否持久化 exchange 和 queue(默认 true)
// true: RabbitMQ 重启后,exchange 和 queue 不会丢失
Durable bool `json:"durable" yaml:"durable"`
// PersistentMessages 消息是否持久化(默认 true)
// true: 消息会写入磁盘,RabbitMQ 重启后消息不会丢失
PersistentMessages bool `json:"persistent_messages" yaml:"persistent_messages"`
// ConnectionTimeout 连接超时时间(默认 10s)
ConnectionTimeout time.Duration `json:"connection_timeout" yaml:"connection_timeout"`
// HeartbeatInterval 心跳间隔(默认 10s)
// 用于检测连接是否存活
HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat_interval"`
// AutoReconnect 是否自动重连(默认 true)
AutoReconnect bool `json:"auto_reconnect" yaml:"auto_reconnect"`
// ReconnectDelay 重连延迟(默认 5s)
ReconnectDelay time.Duration `json:"reconnect_delay" yaml:"reconnect_delay"`
// MaxReconnectAttempts 最大重连次数(默认 0,无限重试)
MaxReconnectAttempts int `json:"max_reconnect_attempts" yaml:"max_reconnect_attempts"`
// ExchangeType Exchange 类型(默认 fanout)
// fanout: 广播,发送给所有绑定的队列
// direct: 直接路由,根据 routing key 精确匹配
// topic: 主题路由,支持通配符(* 和 #)
// headers: 根据消息头路由
ExchangeType string `json:"exchange_type" yaml:"exchange_type"`
// AutoDelete 是否自动删除(默认 false)
// true: 当没有消费者时,自动删除 exchange 和 queue
AutoDelete bool `json:"auto_delete" yaml:"auto_delete"`
// Exclusive 是否独占(默认 false)
// true: 队列只能被当前连接使用,连接断开后队列自动删除
Exclusive bool `json:"exclusive" yaml:"exclusive"`
}
RabbitMQConfig RabbitMQ 配置
func DefaultRabbitMQConfig ¶
func DefaultRabbitMQConfig() RabbitMQConfig
DefaultRabbitMQConfig 返回默认 RabbitMQ 配置
func (*RabbitMQConfig) BuildURL ¶
func (c *RabbitMQConfig) BuildURL() string
BuildURL 构建 RabbitMQ 连接 URL 如果已经设置了 URL,直接返回;否则根据独立配置项构建
type RateLimiter ¶
type RateLimiter interface {
// Allow 检查是否允许处理
Allow() bool
// Wait 等待直到允许处理
Wait(ctx context.Context) error
}
RateLimiter 限流器接口
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router 消息路由器 用于批量注册消息处理器,支持中间件链
func (*Router) AddHandler ¶
AddHandler 注册消息处理器 topic: 主题名称 channel: 通道名称 handler: 消息处理函数
func (*Router) AddHandlerWithMiddleware ¶
func (r *Router) AddHandlerWithMiddleware(topic, channel string, handler Handler, middlewares ...Middleware)
AddHandlerWithMiddleware 注册消息处理器(支持中间件) topic: 主题名称 channel: 通道名称 handler: 消息处理函数 middlewares: 中间件列表(按顺序执行)
func (*Router) AddMiddleware ¶
func (r *Router) AddMiddleware(mw Middleware)
AddMiddleware 添加全局中间件 全局中间件会应用到所有处理器
type SimpleCircuitBreaker ¶
type SimpleCircuitBreaker struct {
// contains filtered or unexported fields
}
SimpleCircuitBreaker 简单熔断器实现
func NewSimpleCircuitBreaker ¶
func NewSimpleCircuitBreaker(maxFailures int, timeout time.Duration) *SimpleCircuitBreaker
NewSimpleCircuitBreaker 创建简单熔断器 maxFailures: 最大失败次数(超过则熔断) timeout: 熔断持续时间(之后尝试恢复)
func (*SimpleCircuitBreaker) Call ¶
func (cb *SimpleCircuitBreaker) Call(fn func() error) error
Call 执行调用(带熔断保护)
type Subscriber ¶
type Subscriber interface {
// Subscribe 订阅指定主题的消息
// topic: 主题名称
// channel: 通道名称(NSQ 中的 channel 概念,用于负载均衡)
// handler: 消息处理函数
Subscribe(topic, channel string, handler Handler) error
// SubscribeWithMiddleware 订阅消息(支持中间件)
SubscribeWithMiddleware(topic, channel string, handler Handler, middlewares ...Middleware) error
// Stop 停止订阅
Stop()
// Close 关闭订阅者,释放资源
Close() error
}
Subscriber 消息订阅者接口 领域层/应用层眼中的订阅接口
type TaskPublisher ¶
type TaskPublisher struct {
// contains filtered or unexported fields
}
TaskPublisher 任务发布者(语义化包装) 这不是新接口,只是对 Publisher 的语义化封装
func NewTaskPublisher ¶
func NewTaskPublisher(publisher Publisher) *TaskPublisher
NewTaskPublisher 创建任务发布者
type TaskSubscriber ¶
type TaskSubscriber struct {
// contains filtered or unexported fields
}
TaskSubscriber 任务订阅者(语义化包装) 这不是新接口,只是对 Subscriber 的语义化封装
func NewTaskSubscriber ¶
func NewTaskSubscriber(subscriber Subscriber, workerGroup string) *TaskSubscriber
NewTaskSubscriber 创建任务订阅者 workerGroup: 工作组名称,同组的 worker 会负载均衡处理任务
type TokenBucketLimiter ¶
type TokenBucketLimiter struct {
// contains filtered or unexported fields
}
TokenBucketLimiter 令牌桶限流器(简单实现)
func NewTokenBucketLimiter ¶
func NewTokenBucketLimiter(capacity int, rate time.Duration) *TokenBucketLimiter
NewTokenBucketLimiter 创建令牌桶限流器 capacity: 桶容量 rate: 令牌生成速率
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
example
|
|
|
01-quickstart
command
Package main 演示 messaging 包的最简单用法 5 分钟快速入门:发布和订阅消息
|
Package main 演示 messaging 包的最简单用法 5 分钟快速入门:发布和订阅消息 |
|
02-message
command
Package main 演示 Message 消息模型的完整功能 学习 UUID、Metadata、Payload、Ack/Nack
|
Package main 演示 Message 消息模型的完整功能 学习 UUID、Metadata、Payload、Ack/Nack |
|
03-publisher
command
Package main 演示 Publisher 的使用 发布消息、批量发布、发布选项
|
Package main 演示 Publisher 的使用 发布消息、批量发布、发布选项 |
|
04-subscriber
command
Package main 演示 Subscriber 的使用 订阅消息、多订阅者、订阅选项
|
Package main 演示 Subscriber 的使用 订阅消息、多订阅者、订阅选项 |
|
05-event-driven
command
Package main 演示事件驱动架构(Event-Driven Architecture) 一个事件,多个服务订阅(广播模式)
|
Package main 演示事件驱动架构(Event-Driven Architecture) 一个事件,多个服务订阅(广播模式) |
|
06-task-queue
command
Package main 演示任务队列模式(Task Queue Pattern) 多个 Worker 负载均衡处理任务
|
Package main 演示任务队列模式(Task Queue Pattern) 多个 Worker 负载均衡处理任务 |
|
07-router
command
Package main 演示 Router 的使用 统一路由管理、全局中间件、局部中间件
|
Package main 演示 Router 的使用 统一路由管理、全局中间件、局部中间件 |
|
08-middleware-basic
command
Package main 演示基础中间件的使用 Logger、Retry、Timeout、Recover
|
Package main 演示基础中间件的使用 Logger、Retry、Timeout、Recover |
|
09-middleware-advanced
command
Package main 演示高级中间件的使用 RateLimit、CircuitBreaker、Filter、Priority、Deduplication
|
Package main 演示高级中间件的使用 RateLimit、CircuitBreaker、Filter、Priority、Deduplication |
|
10-middleware-custom
command
Package main 演示如何编写自定义中间件 包括:认证中间件、审计中间件、批处理中间件
|
Package main 演示如何编写自定义中间件 包括:认证中间件、审计中间件、批处理中间件 |
|
11-observability
command
Package main 演示可观测性实践 Metrics、Tracing、Health Check
|
Package main 演示可观测性实践 Metrics、Tracing、Health Check |
|
12-reliability
command
Package main 演示可靠性保障实践 错误处理、重试策略、熔断降级、消息幂等性
|
Package main 演示可靠性保障实践 错误处理、重试策略、熔断降级、消息幂等性 |
|
13-graceful-shutdown
command
Package main 演示优雅关闭 信号处理、资源清理、未完成任务处理
|
Package main 演示优雅关闭 信号处理、资源清理、未完成任务处理 |
|
14-multi-provider
command
Package main 演示如何在不同消息中间件之间切换 包括:NSQ、RabbitMQ 的切换和混合使用
|
Package main 演示如何在不同消息中间件之间切换 包括:NSQ、RabbitMQ 的切换和混合使用 |
|
15-performance
command
Package main 演示性能优化和压测 并发处理、批量操作、性能监控
|
Package main 演示性能优化和压测 并发处理、批量操作、性能监控 |
|
16-complete-app
command
Package main 完整的生产级应用示例 综合展示:配置管理、中间件、可观测性、错误处理、优雅关闭
|
Package main 完整的生产级应用示例 综合展示:配置管理、中间件、可观测性、错误处理、优雅关闭 |