Documentation ¶
Overview ¶
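Package kafka provides a Kafka-based message queue provider for the gorp framework. This file inlines basemq.BaseMQProvider and native.As, removing the dependency on contrib/internal so that this package can be imported as a standalone module.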
Package kafka provides the Kafka Subscriber implementation. This file implements the MessageSubscriber contract using the IBM/sarama SDK.
Package kafka provides the Kafka Publisher implementation. This file implements the MessagePublisher contract using the IBM/sarama SDK.
Package kafka provides a Kafka-based message queue provider for the gorp framework. The provider implements the MessageQueue contract on top of the IBM/sarama SDK.
Package kafka provides the Kafka Queue implementation. This file implements the core MessageQueue structure, including client and producer management.
Package kafka provides the Kafka message queue implementation for gorp. Import this package to register the Kafka provider with bootstrap; registration happens automatically in init().
Example:
import _ "github.com/ngq/gorp/contrib/messagequeue/kafka"
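The blank import works because the package registers itself in init(). The following is a minimal single-file sketch of that pattern, not the gorp bootstrap code: the registry map and the factory signature are hypothetical stand-ins for whatever bootstrap actually exposes.

```go
package main

import "fmt"

// registry is a hypothetical stand-in for the bootstrap provider registry.
var registry = map[string]func() string{}

// In the real layout this init would live in the imported package and run
// as a side effect of the blank import; it is inlined here so the sketch
// compiles as a single file.
func init() {
	registry["kafka"] = func() string { return "kafka provider" }
}

func main() {
	// By the time main runs, init has already populated the registry.
	factory, ok := registry["kafka"]
	if !ok {
		fmt.Println("kafka provider not registered")
		return
	}
	fmt.Println(factory())
}
```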
Index ¶
- Variables
- func As(source any, target any) bool
- type BaseMQProvider
- func (p *BaseMQProvider) Boot(runtimecontract.Container) error
- func (p *BaseMQProvider) DependsOn() []string
- func (p *BaseMQProvider) IsDefer() bool
- func (p *BaseMQProvider) Name() string
- func (p *BaseMQProvider) Provides() []string
- func (p *BaseMQProvider) Register(c runtimecontract.Container) error
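- type Provider
- func NewProvider() *Provider
- type Queue
- func NewQueue(cfg *integrationcontract.MessageQueueConfig) (*Queue, error)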
- func (q *Queue) As(target any) bool
- func (q *Queue) Close() error
- func (q *Queue) NativeMQClient() any
- func (q *Queue) NativeSyncProducer() sarama.SyncProducer
- func (q *Queue) Publisher() integrationcontract.MessagePublisher
- func (q *Queue) Subscriber() integrationcontract.MessageSubscriber
- func (q *Queue) Underlying() any
Constants ¶
This section is empty.
Variables ¶
var ErrConsumeNotSupported = errors.New("messagequeue.kafka: Consume not supported, use SubscribeWithGroup instead")
ErrConsumeNotSupported is returned when Consume is called on the Kafka subscriber. Consume treats a topic as a queue, but Kafka does not support direct queue consumption; use SubscribeWithGroup instead.
Functions ¶
Types ¶
type BaseMQProvider ¶
type BaseMQProvider struct {
	NameStr   string
	GetConfig func(c runtimecontract.Container) (*integrationcontract.MessageQueueConfig, error)
	NewQueue  func(cfg *integrationcontract.MessageQueueConfig) (integrationcontract.MessageQueue, error)
}
BaseMQProvider eliminates structural duplication across the MQ providers. It is inlined from contrib/internal/basemq so that this package is a standalone module.
Fields:
- NameStr: the provider's name identifier
- GetConfig: callback that obtains the MQ configuration from the container
- NewQueue: callback that creates an MQ instance from the configuration
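To illustrate how a callback-based base type removes duplication, here is a minimal sketch under stated assumptions. The container, mqConfig, and messageQueue types are simplified stand-ins for the runtimecontract and integrationcontract types, and the build helper and the "config" key are hypothetical; only the three field names come from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for runtimecontract.Container and the
// integrationcontract config and queue types.
type container map[string]any
type mqConfig struct{ Brokers []string }
type messageQueue interface{ Close() error }

// baseProvider mirrors the documented fields of BaseMQProvider.
type baseProvider struct {
	NameStr   string
	GetConfig func(c container) (*mqConfig, error)
	NewQueue  func(cfg *mqConfig) (messageQueue, error)
}

// Name returns the provider's name identifier.
func (p *baseProvider) Name() string { return p.NameStr }

// build is a hypothetical helper: it runs the two callbacks in sequence, so
// each concrete provider only has to supply the callbacks themselves.
func (p *baseProvider) build(c container) (messageQueue, error) {
	cfg, err := p.GetConfig(c)
	if err != nil {
		return nil, fmt.Errorf("%s: config: %w", p.NameStr, err)
	}
	return p.NewQueue(cfg)
}

// fakeQueue is a trivial queue used only for this sketch.
type fakeQueue struct{ brokers []string }

func (fakeQueue) Close() error { return nil }

// newFakeProvider shows what a concrete provider reduces to: a name plus
// two callbacks.
func newFakeProvider() *baseProvider {
	return &baseProvider{
		NameStr: "messagequeue.fake",
		GetConfig: func(c container) (*mqConfig, error) {
			cfg, ok := c["config"].(*mqConfig)
			if !ok {
				return nil, errors.New("config not bound")
			}
			return cfg, nil
		},
		NewQueue: func(cfg *mqConfig) (messageQueue, error) {
			return fakeQueue{brokers: cfg.Brokers}, nil
		},
	}
}

func main() {
	p := newFakeProvider()
	c := container{"config": &mqConfig{Brokers: []string{"localhost:9092"}}}
	q, err := p.build(c)
	fmt.Println(p.Name(), q != nil, err)
}
```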
func (*BaseMQProvider) Boot ¶
func (p *BaseMQProvider) Boot(runtimecontract.Container) error
Boot is a no-op: a deferred provider needs no boot step, so it returns nil.
func (*BaseMQProvider) DependsOn ¶
func (p *BaseMQProvider) DependsOn() []string
DependsOn returns the contract keys this provider depends on: ConfigKey.
func (*BaseMQProvider) IsDefer ¶
func (p *BaseMQProvider) IsDefer() bool
IsDefer returns true, indicating that the provider is initialized lazily.
func (*BaseMQProvider) Provides ¶
func (p *BaseMQProvider) Provides() []string
Provides returns the contract keys this provider supplies: MessageQueueKey, MessagePublisherKey, and MessageSubscriberKey.
func (*BaseMQProvider) Register ¶
func (p *BaseMQProvider) Register(c runtimecontract.Container) error
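Register binds MessageQueue, MessagePublisher, and MessageSubscriber into the container. Once the MQ instance has been created, a closer is registered so that its resources are closed automatically when the container is destroyed.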
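The bind-then-register-closer flow can be sketched as follows. This is an illustration only: the container type with its Bind, OnClose, and Destroy methods is hypothetical, as are the key strings, and the sketch binds the same queue value under all three keys, whereas the real provider presumably binds the results of Publisher() and Subscriber().

```go
package main

import "fmt"

// container is a hypothetical stand-in for runtimecontract.Container.
type container struct {
	bindings map[string]any
	closers  []func() error
}

func newContainer() *container {
	return &container{bindings: map[string]any{}}
}

// Bind stores a value under a contract key.
func (c *container) Bind(key string, v any) { c.bindings[key] = v }

// OnClose registers a function to run when the container is destroyed.
func (c *container) OnClose(fn func() error) { c.closers = append(c.closers, fn) }

// Destroy runs the closers in reverse registration order and returns the
// first error encountered.
func (c *container) Destroy() error {
	var first error
	for i := len(c.closers) - 1; i >= 0; i-- {
		if err := c.closers[i](); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// queue is a stand-in MQ instance that records whether it was closed.
type queue struct{ closed bool }

func (q *queue) Close() error {
	q.closed = true
	return nil
}

// register binds the queue under three hypothetical keys and registers its
// closer so that Destroy releases the resources.
func register(c *container, q *queue) {
	c.Bind("messagequeue", q)
	c.Bind("messagequeue.publisher", q)
	c.Bind("messagequeue.subscriber", q)
	c.OnClose(q.Close)
}

func main() {
	c := newContainer()
	q := &queue{}
	register(c, q)
	err := c.Destroy()
	fmt.Println(len(c.bindings), q.closed, err)
}
```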
type Provider ¶
type Provider struct {
	BaseMQProvider
}
Provider implements runtimecontract.ServiceProvider for Kafka message queue.
func NewProvider ¶
func NewProvider() *Provider
NewProvider creates a new Kafka message queue provider.
type Queue ¶
type Queue struct {
	// contains filtered or unexported fields
}
Queue implements integrationcontract.MessageQueue using the IBM/sarama SDK. It manages the sarama client, the sync producer, and consumer groups.
func NewQueue ¶
func NewQueue(cfg *integrationcontract.MessageQueueConfig) (*Queue, error)
NewQueue creates a new Kafka Queue instance. It establishes a sarama client and creates a sync producer from that client.
func (*Queue) As ¶
func (q *Queue) As(target any) bool
As attempts to cast the underlying sarama.Client to the target type. It uses internalnative.As for the type assertion.
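The Index lists a package-level As(source any, target any) bool, but its implementation is not shown in this documentation. The sketch below is one plausible way such a helper could work, modeled loosely on errors.As and using reflection; the lowercase as function, the client interface, and fakeClient are all hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// as reports whether source can be assigned to the value target points to,
// and performs the assignment if so. target must be a non-nil pointer.
// This is an assumed implementation, not the package's actual code.
func as(source any, target any) bool {
	if source == nil || target == nil {
		return false
	}
	tv := reflect.ValueOf(target)
	if tv.Kind() != reflect.Pointer || tv.IsNil() {
		return false
	}
	sv := reflect.ValueOf(source)
	elem := tv.Elem()
	if !sv.Type().AssignableTo(elem.Type()) {
		return false
	}
	elem.Set(sv)
	return true
}

// client is a hypothetical interface standing in for a native SDK client.
type client interface{ Brokers() []string }

type fakeClient struct{}

func (fakeClient) Brokers() []string { return []string{"localhost:9092"} }

func main() {
	var c client
	if as(fakeClient{}, &c) {
		fmt.Println("got client with brokers", c.Brokers())
	}

	var n int
	fmt.Println("assignable to *int:", as(fakeClient{}, &n))
}
```

Returning a bool instead of panicking lets callers probe for a native type without depending on the concrete SDK version.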
func (*Queue) Close ¶
func (q *Queue) Close() error
Close closes all Kafka resources and implements integrationcontract.MessageQueue.Close. It closes consumer groups, then producers, then the client, in that order. Non-fatal close errors are logged as warnings; the first critical error is returned.
func (*Queue) NativeMQClient ¶
func (q *Queue) NativeMQClient() any
NativeMQClient implements the NativeMQClientProvider interface. It returns the underlying sarama.Client.
func (*Queue) NativeSyncProducer ¶
func (q *Queue) NativeSyncProducer() sarama.SyncProducer
NativeSyncProducer returns the underlying sarama.SyncProducer, for callers who need to use the producer directly for advanced operations.
func (*Queue) Publisher ¶
func (q *Queue) Publisher() integrationcontract.MessagePublisher
Publisher returns a Kafka-based MessagePublisher.
func (*Queue) Subscriber ¶
func (q *Queue) Subscriber() integrationcontract.MessageSubscriber
Subscriber returns a Kafka-based MessageSubscriber.
func (*Queue) Underlying ¶
func (q *Queue) Underlying() any
Underlying returns the underlying sarama.Client for advanced usage. This gives callers access to native Kafka SDK capabilities such as custom partitioners, transactions, and admin operations.
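Because Underlying returns any, callers need a type assertion before using the native client. The sketch below shows the checked-assertion pattern with stand-ins so that it compiles without the sarama module: nativeClient models a single method that sarama.Client is known to have (Topics), while queue, fakeClient, and topicsOf are hypothetical.

```go
package main

import "fmt"

// nativeClient is a stand-in for the subset of sarama.Client used here.
type nativeClient interface {
	Topics() ([]string, error)
}

// fakeClient replaces a real broker connection for this sketch.
type fakeClient struct{}

func (fakeClient) Topics() ([]string, error) { return []string{"orders"}, nil }

// queue is a stand-in for kafka.Queue that exposes Underlying.
type queue struct{ client any }

func (q *queue) Underlying() any { return q.client }

// topicsOf asserts the underlying value to the expected interface and
// reports an error instead of panicking when the type does not match.
func topicsOf(q *queue) ([]string, error) {
	c, ok := q.Underlying().(nativeClient)
	if !ok {
		return nil, fmt.Errorf("unexpected client type %T", q.Underlying())
	}
	return c.Topics()
}

func main() {
	topics, err := topicsOf(&queue{client: fakeClient{}})
	fmt.Println(topics, err)
}
```

With the real package, the assertion target would be sarama.Client itself rather than a local interface.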