kafka

package module
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2026 License: MIT Imports: 14 Imported by: 0

README

contrib/messagequeue/kafka

Kafka 消息队列 provider,使用 IBM/sarama SDK 实现。

SDK

使用 github.com/IBM/sarama(原 Shopify/sarama):

  • 纯 Go 实现,无 C 绑定
  • 成熟稳定,广泛使用
  • 支持 Kafka 0.10+ 特性

配置

message_queue:
  kafka:
    brokers:
      - localhost:9092
    group_id: my-consumer-group
    client_id: my-service
    version: "2.8.0"
    compression: gzip      # none, gzip, snappy, lz4, zstd
    partitioner: hash      # hash, random, round-robin
    required_acks: -1      # 0=NoResponse, 1=Leader, -1=All
    max_message_bytes: 1000000
    flush_frequency_ms: 100
    enable_tls: false

使用

// 标准抽象路径
mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)
mq.Publisher().Publish(ctx, "topic", message)

// 消费者组订阅
subscriber := mq.Subscriber()
unsubscribe, err := subscriber.SubscribeWithGroup(ctx, "topic", "my-group", handler)

下探原生 SDK

mq := c.MustMake(integrationcontract.MessageQueueKey).(integrationcontract.MessageQueue)

// 获取 sarama.Client
if client, ok := messagequeue.NativeKafkaClient(mq); ok {
    // 使用 Sarama 高级特性
    topics, _ := client.Topics()
    partitions, _ := client.Partitions("my-topic")
    
    // 创建 admin client
    admin, _ := sarama.NewClusterAdminFromClient(client)
    admin.CreateTopic("new-topic", &sarama.TopicDetail{}, false)
}

// 获取 SyncProducer
pub := mq.Publisher()
if producer, ok := messagequeue.NativeKafkaProducer(pub); ok {
    // 使用 producer 高级特性
    msg := &sarama.ProducerMessage{
        Topic: "topic",
        Key:   sarama.StringEncoder("key"),
        Value: sarama.ByteEncoder(message),
    }
    partition, offset, err := producer.SendMessage(msg)
}

特性适配

契约方法 Kafka 实现 说明
Publish SendMessage 同步发送
PublishWithDelay 返回错误 Kafka 不支持,建议外置调度
PublishWithPriority 分区路由 通过 partition key 实现
Send SendMessage 视 queue 为 topic
Subscribe ConsumerGroup 自动创建唯一组名
SubscribeWithGroup ConsumerGroup 推荐方式
Consume 返回错误 不支持,使用 SubscribeWithGroup

注意事项

  1. 延迟消息:Kafka 不原生支持延迟消息,建议使用 Redis 延迟队列 + 定时投递
  2. 消费者组:推荐使用 SubscribeWithGroup,保证消息可靠消费
  3. 消息顺序:同一 partition 内保证顺序,使用 partition key 控制
  4. 事务:通过下探获取 client 后可使用 Kafka 事务特性

Documentation

Overview

Package kafka 提供 gorp 框架基于 Kafka 的消息队列 provider。 本文件内联了 basemq.BaseMQProvider 和 native.As,消除对 contrib/internal 的依赖, 使本包成为可独立引用的模块。

Package kafka provides Kafka Subscriber implementation. This file implements the MessageSubscriber contract using IBM/sarama SDK.

本包提供 Kafka Subscriber 实现。 本文件使用 IBM/sarama SDK 实现 MessageSubscriber 契约。

Package kafka provides Kafka Publisher implementation. This file implements the MessagePublisher contract using IBM/sarama SDK.

本包提供 Kafka Publisher 实现。 本文件使用 IBM/sarama SDK 实现 MessagePublisher 契约。

Package kafka provides Kafka-based message queue provider for the gorp framework. This provider implements MessageQueue contract with IBM/sarama SDK integration.

本包提供 gorp 框架基于 Kafka 的消息队列 provider。 本 provider 实现 MessageQueue 契约,集成 IBM/sarama SDK。

Package kafka provides Kafka Queue implementation. This file implements the MessageQueue core structure with client and producer management.

本包提供 Kafka Queue 实现。 本文件实现 MessageQueue 核心结构,包含 client 和 producer 管理。

Package kafka provides kafka message queue implementation for gorp. Import this package to register the kafka provider with bootstrap.

kafka 消息队列 Provider,通过 init() 自动注册到 bootstrap。

Example:

import _ "github.com/ngq/gorp/contrib/messagequeue/kafka"

Index

Constants

This section is empty.

Variables

View Source
var ErrConsumeNotSupported = errors.New("messagequeue.kafka: Consume not supported, use SubscribeWithGroup instead")

Consume consumes messages from a topic (treated as queue). Kafka does not support direct queue consumption, use SubscribeWithGroup instead. Returns error indicating unsupported operation.

ErrConsumeNotSupported is returned when Consume is called on Kafka subscriber. Kafka does not support direct queue consumption, use SubscribeWithGroup instead.

Functions

func As

func As(source any, target any) bool

As 尝试通过 reflect 将 source 转换为 target 指向的类型。 支持直接赋值、接口实现和类型转换三种路径。 当 target 为 nil、非指针、nil 指针或不可设置时返回 false。

Types

type BaseMQProvider

BaseMQProvider 消除各 MQ provider 之间的结构重复。 内联自 contrib/internal/basemq,使本包成为独立模块。

各字段含义:

  • NameStr: provider 名称标识
  • GetConfig: 从容器获取 MQ 配置的回调
  • NewQueue: 根据配置创建 MQ 实例的回调

func (*BaseMQProvider) Boot

Boot 延迟 provider 无需 boot 操作,直接返回 nil。

func (*BaseMQProvider) DependsOn

func (p *BaseMQProvider) DependsOn() []string

DependsOn 返回该 provider 依赖的契约键列表:ConfigKey。

func (*BaseMQProvider) IsDefer

func (p *BaseMQProvider) IsDefer() bool

IsDefer 返回 true,表示 provider 延迟初始化。

func (*BaseMQProvider) Name

func (p *BaseMQProvider) Name() string

Name 返回 provider 名称标识。

func (*BaseMQProvider) Provides

func (p *BaseMQProvider) Provides() []string

Provides 返回该 provider 提供的契约键列表: MessageQueueKey、MessagePublisherKey、MessageSubscriberKey。

func (*BaseMQProvider) Register

Register 将 MessageQueue、MessagePublisher、MessageSubscriber 绑定到容器。 创建 MQ 实例后会注册 closer,在容器销毁时自动关闭资源。

type Provider

type Provider struct {
	BaseMQProvider
}

Provider implements runtimecontract.ServiceProvider for Kafka message queue.

func NewProvider

func NewProvider() *Provider

NewProvider creates a new Kafka message queue provider.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements integrationcontract.MessageQueue using IBM/sarama SDK. Manages sarama client, sync producer, and consumer groups.

Queue 使用 IBM/sarama SDK 实现 integrationcontract.MessageQueue。 管理 sarama client、sync producer 和 consumer groups。

func NewQueue

NewQueue creates a new Kafka Queue instance. Establishes sarama client and creates sync producer from client.

NewQueue 创建新的 Kafka Queue 实例。 建立 sarama client 并从 client 创建 sync producer。

func (*Queue) As

func (q *Queue) As(target any) bool

As attempts to cast the underlying sarama.Client to the target type. Uses internalnative.As for type assertion.

As 尝试将底层 sarama.Client 转换为目标类型。 使用 internalnative.As 进行类型断言。

func (*Queue) Close

func (q *Queue) Close() error

Close closes all Kafka resources. Implements integrationcontract.MessageQueue.Close. Closes consumer groups, producers, and client in order. Logs warnings for non-fatal close errors and returns the first critical error.

Close 关闭所有 Kafka 资源。 实现 integrationcontract.MessageQueue.Close。 按顺序关闭 consumer groups、producers 和 client。 对非致命关闭错误记录警告,返回第一个关键错误。

func (*Queue) NativeMQClient

func (q *Queue) NativeMQClient() any

NativeMQClient implements NativeMQClientProvider interface. Returns the underlying sarama.Client.

NativeMQClient 实现 NativeMQClientProvider 接口。 返回底层 sarama.Client。

func (*Queue) NativeSyncProducer

func (q *Queue) NativeSyncProducer() sarama.SyncProducer

NativeSyncProducer returns the underlying sarama.SyncProducer. Useful for users who want to use producer directly for advanced operations.

NativeSyncProducer 返回底层 sarama.SyncProducer。 用于用户直接使用 producer 进行高级操作。

func (*Queue) Publisher

Publisher returns a Kafka-based MessagePublisher.

Publisher 返回基于 Kafka 的 MessagePublisher。

func (*Queue) Subscriber

Subscriber returns a Kafka-based MessageSubscriber.

Subscriber 返回基于 Kafka 的 MessageSubscriber。

func (*Queue) Underlying

func (q *Queue) Underlying() any

Underlying returns the underlying sarama.Client for advanced usage. This allows users to access native Kafka SDK capabilities such as custom partitioners, transactions, admin operations, etc.

Underlying 返回底层 sarama.Client 供高级使用。 这允许用户访问原生 Kafka SDK 能力,如自定义分区器、事务、管理操作等。

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL