Documentation
¶
Overview ¶
Package messagequeue provides shared metrics for MQ implementations. This file implements Prometheus metrics for message queue operations.
消息队列包提供 MQ 实现的共享指标。 本文件实现消息队列操作的 Prometheus 指标。
Package messagequeue provides helper functions for accessing native MQ clients. These functions allow "MQ-first" users to use native SDK capabilities while remaining within the framework's governance boundary.
本包提供访问原生 MQ 客户端的辅助函数。 这些函数允许"MQ-first"用户使用原生 SDK 能力, 同时保持在框架的治理边界内。
Index ¶
- func NativeKafkaClient(mq integrationcontract.MessageQueue) (sarama.Client, bool)
- func NativeKafkaConsumerGroup(sub integrationcontract.MessageSubscriber) (sarama.ConsumerGroup, bool)
- func NativeKafkaProducer(pub integrationcontract.MessagePublisher) (sarama.SyncProducer, bool)
- func NativeRabbitMQChannel(pubOrSub any) (*amqp.Channel, bool)
- func NativeRabbitMQConnection(mq integrationcontract.MessageQueue) (*amqp.Connection, bool)
- func NativeRedisClient(mq integrationcontract.MessageQueue) (*redis.Client, bool)
- func NativeRedisPubSub(sub integrationcontract.MessageSubscriber) (*redis.PubSub, bool)
- func NativeRedisPublisher(pub integrationcontract.MessagePublisher) (*redis.Client, bool)
- func NativeRocketMQProducer(mqOrPub any) (rocketmq.Producer, bool)
- type MetricsRecorder
- func (m *MetricsRecorder) OnConsume(topic string, status string, messageSize int, latencySeconds float64)
- func (m *MetricsRecorder) OnDeadLetter(topic string)
- func (m *MetricsRecorder) OnPublish(topic string, status string, messageSize int, latencySeconds float64)
- func (m *MetricsRecorder) OnRetry(topic string, operation string)
- func (m *MetricsRecorder) OnSubscribe(topic string)
- func (m *MetricsRecorder) OnUnsubscribe(topic string)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NativeKafkaClient ¶
func NativeKafkaClient(mq integrationcontract.MessageQueue) (sarama.Client, bool)
NativeKafkaClient extracts the underlying sarama.Client from a MessageQueue. Returns the client and true if the MQ implementation is Kafka-backed. Returns nil and false otherwise.
NativeKafkaClient 从 MessageQueue 中提取底层 sarama.Client。 如果 MQ 实现基于 Kafka,返回 client 和 true。 否则返回 nil 和 false。
Example:
mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
if client, ok := messagequeue.NativeKafkaClient(mq); ok {
// 使用 Sarama SDK 高级特性
topics, _ := client.Topics()
partitions, _ := client.Partitions("my-topic")
}
func NativeKafkaConsumerGroup ¶
func NativeKafkaConsumerGroup(sub integrationcontract.MessageSubscriber) (sarama.ConsumerGroup, bool)
NativeKafkaConsumerGroup extracts sarama.ConsumerGroup from MessageSubscriber. Returns the consumer group and true if the subscriber implementation is Kafka-backed.
NativeKafkaConsumerGroup 从 MessageSubscriber 中提取 sarama.ConsumerGroup。 如果订阅者实现基于 Kafka,返回 consumerGroup 和 true。
func NativeKafkaProducer ¶
func NativeKafkaProducer(pub integrationcontract.MessagePublisher) (sarama.SyncProducer, bool)
NativeKafkaProducer extracts sarama.SyncProducer from MessagePublisher. Returns the producer and true if the publisher implementation is Kafka-backed.
NativeKafkaProducer 从 MessagePublisher 中提取 sarama.SyncProducer。 如果发布者实现基于 Kafka,返回 producer 和 true。
func NativeRabbitMQChannel ¶
NativeRabbitMQChannel extracts *amqp.Channel from MessagePublisher or MessageSubscriber. Returns the channel and true if the implementation is RabbitMQ-backed.
NativeRabbitMQChannel 从 MessagePublisher 或 MessageSubscriber 中提取 *amqp.Channel。 如果实现基于 RabbitMQ,返回 channel 和 true。
func NativeRabbitMQConnection ¶
func NativeRabbitMQConnection(mq integrationcontract.MessageQueue) (*amqp.Connection, bool)
NativeRabbitMQConnection extracts the underlying *amqp.Connection from a MessageQueue. Returns the connection and true if the MQ implementation is RabbitMQ-backed. Returns nil and false otherwise.
NativeRabbitMQConnection 从 MessageQueue 中提取底层 *amqp.Connection。 如果 MQ 实现基于 RabbitMQ,返回 connection 和 true。 否则返回 nil 和 false。
Example:
mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
if conn, ok := messagequeue.NativeRabbitMQConnection(mq); ok {
// 创建新 Channel 使用高级特性
ch, _ := conn.Channel()
ch.Confirm(false) // 启用 publisher confirms
}
func NativeRedisClient ¶
func NativeRedisClient(mq integrationcontract.MessageQueue) (*redis.Client, bool)
NativeRedisClient extracts the underlying *redis.Client from a MessageQueue. Returns the client and true if the MQ implementation is Redis-backed. Returns nil and false otherwise.
NativeRedisClient 从 MessageQueue 中提取底层 *redis.Client。 如果 MQ 实现基于 Redis,返回 client 和 true。 否则返回 nil 和 false。
Example:
mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
if client, ok := messagequeue.NativeRedisClient(mq); ok {
// 使用 Redis SDK 高级特性
client.XAdd(ctx, &redis.XAddArgs{Stream: "events", Values: data})
client.Eval(ctx, luaScript, keys, args)
}
func NativeRedisPubSub ¶
func NativeRedisPubSub(sub integrationcontract.MessageSubscriber) (*redis.PubSub, bool)
NativeRedisPubSub extracts *redis.PubSub from MessageSubscriber. Returns the pubsub and true if the subscriber implementation is Redis-backed.
NativeRedisPubSub 从 MessageSubscriber 中提取 *redis.PubSub。 如果订阅者实现基于 Redis,返回 pubsub 和 true。
func NativeRedisPublisher ¶
func NativeRedisPublisher(pub integrationcontract.MessagePublisher) (*redis.Client, bool)
NativeRedisPublisher extracts *redis.Client from MessagePublisher. Returns the client and true if the publisher implementation is Redis-backed.
NativeRedisPublisher 从 MessagePublisher 中提取 *redis.Client。 如果发布者实现基于 Redis,返回 client 和 true。
func NativeRocketMQProducer ¶
NativeRocketMQProducer extracts rocketmq.Producer from MessageQueue or MessagePublisher. Returns the producer and true if the implementation is RocketMQ-backed.
NativeRocketMQProducer 从 MessageQueue 或 MessagePublisher 中提取 rocketmq.Producer。 如果实现基于 RocketMQ,返回 producer 和 true。
Note: The returned type is rocketmq.Producer interface from rocketmq-client-go.
Types ¶
type MetricsRecorder ¶
type MetricsRecorder struct {
// contains filtered or unexported fields
}
MetricsRecorder records Prometheus metrics for message queue operations. This provides a reusable metrics layer for all MQ provider implementations.
MetricsRecorder 记录消息队列操作的 Prometheus 指标。 这为所有 MQ provider 实现提供可复用的指标层。
func NewMetricsRecorder ¶
func NewMetricsRecorder() *MetricsRecorder
NewMetricsRecorder creates a new MQ metrics recorder. Metrics are registered with the default Prometheus registry.
NewMetricsRecorder 创建新的 MQ 指标记录器。 指标注册到默认 Prometheus 注册表。
func (*MetricsRecorder) OnConsume ¶
func (m *MetricsRecorder) OnConsume(topic string, status string, messageSize int, latencySeconds float64)
OnConsume records a consume operation. status should be "success" or "error".
OnConsume 记录消费操作。 status 应为 "success" 或 "error"。
func (*MetricsRecorder) OnDeadLetter ¶
func (m *MetricsRecorder) OnDeadLetter(topic string)
OnDeadLetter records a message sent to dead letter queue.
OnDeadLetter 记录发送到死信队列的消息。
func (*MetricsRecorder) OnPublish ¶
func (m *MetricsRecorder) OnPublish(topic string, status string, messageSize int, latencySeconds float64)
OnPublish records a publish operation. status should be "success" or "error".
OnPublish 记录发布操作。 status 应为 "success" 或 "error"。
func (*MetricsRecorder) OnRetry ¶
func (m *MetricsRecorder) OnRetry(topic string, operation string)
OnRetry records a retry operation. operation should be "publish" or "consume".
OnRetry 记录重试操作。 operation 应为 "publish" 或 "consume"。
func (*MetricsRecorder) OnSubscribe ¶
func (m *MetricsRecorder) OnSubscribe(topic string)
OnSubscribe records a new subscription.
OnSubscribe 记录新订阅。
func (*MetricsRecorder) OnUnsubscribe ¶
func (m *MetricsRecorder) OnUnsubscribe(topic string)
OnUnsubscribe records an unsubscribe event.
OnUnsubscribe 记录取消订阅事件。