messagequeue

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2026 License: MIT Imports: 7 Imported by: 0

README

contrib/messagequeue

该目录承接 gorp 的消息队列后端实现,支持多 MQ 后端统一抽象与下探原生 SDK。

当前状态

Provider 状态 SDK
redis ✅ 已完成 github.com/redis/go-redis/v9
kafka ✅ 已完成 github.com/IBM/sarama
rabbitmq ✅ 已完成 github.com/rabbitmq/amqp091-go
rocketmq ✅ 已完成 github.com/apache/rocketmq-client-go/v2

下探机制

所有 provider 都支持下探原生 SDK,允许用户使用高级特性:

// Redis 下探
if client, ok := messagequeue.NativeRedisClient(mq); ok {
    client.XAdd(ctx, &redis.XAddArgs{Stream: "events", Values: data})
    client.Eval(ctx, luaScript, keys, args)
}

// Kafka 下探
if client, ok := messagequeue.NativeKafkaClient(mq); ok {
    topics, _ := client.Topics()
    admin, _ := sarama.NewClusterAdminFromClient(client)
}

// RabbitMQ 下探
if conn, ok := messagequeue.NativeRabbitMQConnection(mq); ok {
    ch, _ := conn.Channel()
    ch.Confirm(false) // 启用 publisher confirms
}

// RocketMQ 下探
if producer, ok := messagequeue.NativeRocketMQProducer(pub); ok {
    msg.WithShardingKey("order-123") // 顺序消息
}

统一契约

所有 provider 实现统一契约:

  • MessageQueue:组合 Publisher + Subscriber
  • MessagePublisher:Publish / PublishWithDelay / PublishWithPriority / Send
  • MessageSubscriber:Subscribe / SubscribeWithGroup / Consume / Unsubscribe

各 Provider 特性

Redis
  • 轻量级,适合简单场景
  • 支持 Pub/Sub、Stream、延迟队列(ZSet)
  • 下探可用:pipeline、Lua 脚本、事务
Kafka
  • 高吞吐量,适合大规模场景
  • 支持 Consumer Group
  • 延迟消息不支持(需外置调度)
  • 下探可用:事务、压缩、自定义分区器
RabbitMQ
  • 功能丰富,支持 Exchange/Queue 模型
  • 支持 x-delayed-message 插件或死信队列延迟
  • 支持优先级队列(需声明 x-max-priority)
  • 下探可用:publisher confirms、事务、自定义 Channel
RocketMQ
  • 支持固定延迟等级(1-18)
  • 支持顺序消息、事务消息
  • 下探可用:分片键、批量发送、Tag 过滤

配置示例

message_queue:
  # Redis
  redis:
    addr: "localhost:6379"
    password: ""
    db: 0

  # Kafka
  kafka:
    brokers:
      - "localhost:9092"
    group_id: "my-group"
    client_id: "my-service"
    version: "2.8.0"

  # RabbitMQ
  rabbitmq:
    url: "amqp://guest:guest@localhost:5672/"
    exchange: "my-exchange"
    exchange_type: "topic"

  # RocketMQ
  rocketmq:
    namesrv_addr: "localhost:9876"
    group_name: "my-group"

文件结构

contrib/messagequeue/
├── native.go              # 统一下探辅助函数
├── README.md              # 本文档
│
├── redis/
│   ├── provider.go        # Redis Provider
│   ├── provider_test.go
│   ├── behavior_test.go
│   └── README.md
│
├── kafka/
│   ├── provider.go        # Kafka Provider
│   ├── config.go          # Sarama 配置构建
│   ├── provider_test.go
│   ├── behavior_test.go
│   └── README.md
│
├── rabbitmq/
│   ├── provider.go        # RabbitMQ Provider
│   ├── provider_test.go
│   ├── behavior_test.go
│   └── README.md
│
└── rocketmq/
│   ├── provider.go        # RocketMQ Provider
│   ├── provider_test.go
│   ├── behavior_test.go
│   └── README.md

P0 约束

  • 所有 provider 必须实现 NativeMQClientProvider 可选接口
  • 所有 provider 必须提供 Underlying() + As() 方法
  • 所有 provider 必须有 provider_test.go + behavior_test.go 测试
  • 文档必须包含 SDK 选择说明、配置示例、下探示例

Documentation

Overview

Package messagequeue provides shared metrics for MQ implementations. This file implements Prometheus metrics for message queue operations.

消息队列包提供 MQ 实现的共享指标。 本文件实现消息队列操作的 Prometheus 指标。

Package messagequeue provides helper functions for accessing native MQ clients. These functions allow "MQ-first" users to use native SDK capabilities while remaining within the framework's governance boundary.

本包提供访问原生 MQ 客户端的辅助函数。 这些函数允许"MQ-first"用户使用原生 SDK 能力, 同时保持在框架的治理边界内。

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NativeKafkaClient

func NativeKafkaClient(mq integrationcontract.MessageQueue) (sarama.Client, bool)

NativeKafkaClient extracts the underlying sarama.Client from a MessageQueue. Returns the client and true if the MQ implementation is Kafka-backed. Returns nil and false otherwise.

NativeKafkaClient 从 MessageQueue 中提取底层 sarama.Client。 如果 MQ 实现基于 Kafka,返回 client 和 true。 否则返回 nil 和 false。

Example:

mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
if client, ok := messagequeue.NativeKafkaClient(mq); ok {
    // 使用 Sarama SDK 高级特性
    topics, _ := client.Topics()
    partitions, _ := client.Partitions("my-topic")
}

func NativeKafkaConsumerGroup

func NativeKafkaConsumerGroup(sub integrationcontract.MessageSubscriber) (sarama.ConsumerGroup, bool)

NativeKafkaConsumerGroup extracts sarama.ConsumerGroup from MessageSubscriber. Returns the consumer group and true if the subscriber implementation is Kafka-backed.

NativeKafkaConsumerGroup 从 MessageSubscriber 中提取 sarama.ConsumerGroup。 如果订阅者实现基于 Kafka,返回 consumerGroup 和 true。

func NativeKafkaProducer

func NativeKafkaProducer(pub integrationcontract.MessagePublisher) (sarama.SyncProducer, bool)

NativeKafkaProducer extracts sarama.SyncProducer from MessagePublisher. Returns the producer and true if the publisher implementation is Kafka-backed.

NativeKafkaProducer 从 MessagePublisher 中提取 sarama.SyncProducer。 如果发布者实现基于 Kafka,返回 producer 和 true。

func NativeRabbitMQChannel

func NativeRabbitMQChannel(pubOrSub any) (*amqp.Channel, bool)

NativeRabbitMQChannel extracts *amqp.Channel from MessagePublisher or MessageSubscriber. Returns the channel and true if the implementation is RabbitMQ-backed.

NativeRabbitMQChannel 从 MessagePublisher 或 MessageSubscriber 中提取 *amqp.Channel。 如果实现基于 RabbitMQ,返回 channel 和 true。

func NativeRabbitMQConnection

func NativeRabbitMQConnection(mq integrationcontract.MessageQueue) (*amqp.Connection, bool)

NativeRabbitMQConnection extracts the underlying *amqp.Connection from a MessageQueue. Returns the connection and true if the MQ implementation is RabbitMQ-backed. Returns nil and false otherwise.

NativeRabbitMQConnection 从 MessageQueue 中提取底层 *amqp.Connection。 如果 MQ 实现基于 RabbitMQ,返回 connection 和 true。 否则返回 nil 和 false。

Example:

mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
if conn, ok := messagequeue.NativeRabbitMQConnection(mq); ok {
    // 创建新 Channel 使用高级特性
    ch, _ := conn.Channel()
    ch.Confirm(false) // 启用 publisher confirms
}

func NativeRedisClient

func NativeRedisClient(mq integrationcontract.MessageQueue) (*redis.Client, bool)

NativeRedisClient extracts the underlying *redis.Client from a MessageQueue. Returns the client and true if the MQ implementation is Redis-backed. Returns nil and false otherwise.

NativeRedisClient 从 MessageQueue 中提取底层 *redis.Client。 如果 MQ 实现基于 Redis,返回 client 和 true。 否则返回 nil 和 false。

Example:

mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
if client, ok := messagequeue.NativeRedisClient(mq); ok {
    // 使用 Redis SDK 高级特性
    client.XAdd(ctx, &redis.XAddArgs{Stream: "events", Values: data})
    client.Eval(ctx, luaScript, keys, args)
}

func NativeRedisPubSub

func NativeRedisPubSub(sub integrationcontract.MessageSubscriber) (*redis.PubSub, bool)

NativeRedisPubSub extracts *redis.PubSub from MessageSubscriber. Returns the pubsub and true if the subscriber implementation is Redis-backed.

NativeRedisPubSub 从 MessageSubscriber 中提取 *redis.PubSub。 如果订阅者实现基于 Redis,返回 pubsub 和 true。

func NativeRedisPublisher

func NativeRedisPublisher(pub integrationcontract.MessagePublisher) (*redis.Client, bool)

NativeRedisPublisher extracts *redis.Client from MessagePublisher. Returns the client and true if the publisher implementation is Redis-backed.

NativeRedisPublisher 从 MessagePublisher 中提取 *redis.Client。 如果发布者实现基于 Redis,返回 client 和 true。

func NativeRocketMQProducer

func NativeRocketMQProducer(mqOrPub any) (rocketmq.Producer, bool)

NativeRocketMQProducer extracts rocketmq.Producer from MessageQueue or MessagePublisher. Returns the producer and true if the implementation is RocketMQ-backed.

NativeRocketMQProducer 从 MessageQueue 或 MessagePublisher 中提取 rocketmq.Producer。 如果实现基于 RocketMQ,返回 producer 和 true。

Note: The returned type is rocketmq.Producer interface from rocketmq-client-go.

Types

type MetricsRecorder

type MetricsRecorder struct {
	// contains filtered or unexported fields
}

MetricsRecorder records Prometheus metrics for message queue operations. This provides a reusable metrics layer for all MQ provider implementations.

MetricsRecorder 记录消息队列操作的 Prometheus 指标。 这为所有 MQ provider 实现提供可复用的指标层。

func NewMetricsRecorder

func NewMetricsRecorder() *MetricsRecorder

NewMetricsRecorder creates a new MQ metrics recorder. Metrics are registered with the default Prometheus registry.

NewMetricsRecorder 创建新的 MQ 指标记录器。 指标注册到默认 Prometheus 注册表。

func (*MetricsRecorder) OnConsume

func (m *MetricsRecorder) OnConsume(topic string, status string, messageSize int, latencySeconds float64)

OnConsume records a consume operation. status should be "success" or "error".

OnConsume 记录消费操作。 status 应为 "success" 或 "error"。

func (*MetricsRecorder) OnDeadLetter

func (m *MetricsRecorder) OnDeadLetter(topic string)

OnDeadLetter records a message sent to dead letter queue.

OnDeadLetter 记录发送到死信队列的消息。

func (*MetricsRecorder) OnPublish

func (m *MetricsRecorder) OnPublish(topic string, status string, messageSize int, latencySeconds float64)

OnPublish records a publish operation. status should be "success" or "error".

OnPublish 记录发布操作。 status 应为 "success" 或 "error"。

func (*MetricsRecorder) OnRetry

func (m *MetricsRecorder) OnRetry(topic string, operation string)

OnRetry records a retry operation. operation should be "publish" or "consume".

OnRetry 记录重试操作。 operation 应为 "publish" 或 "consume"。

func (*MetricsRecorder) OnSubscribe

func (m *MetricsRecorder) OnSubscribe(topic string)

OnSubscribe records a new subscription.

OnSubscribe 记录新订阅。

func (*MetricsRecorder) OnUnsubscribe

func (m *MetricsRecorder) OnUnsubscribe(topic string)

OnUnsubscribe records an unsubscribe event.

OnUnsubscribe 记录取消订阅事件。

Directories

Path Synopsis
kafka module
redis module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL