kafka

package
v0.4.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewConsumerGroup

func NewConsumerGroup(opts ...KafkaConfigOption) (sarama.ConsumerGroup, func(), error)

func NewProducer

func NewProducer(opts ...KafkaConfigOption) (sarama.AsyncProducer, func(), error)

Types

type KafkaConfig

// KafkaConfig holds the Kafka connection configuration.
type KafkaConfig struct {
	// BootstrapServers is the list of Kafka broker addresses.
	BootstrapServers []string `json:"bootstrap_servers" yaml:"bootstrap_servers" env:"DEFAULT_KAFKA_BOOTSTRAP_SERVERS"`

	// GroupId is the Kafka consumer group ID.
	GroupId string `json:"group_id" yaml:"group_id" env:"DEFAULT_KAFKA_GROUP_ID"`

	// SASLUsername is the SASL user name (only needed when SASL authentication is used).
	SASLUsername string `json:"sasl_username" yaml:"sasl_username" env:"DEFAULT_KAFKA_SASL_USERNAME"`

	// SASLPassword is the SASL password (only needed when SASL authentication is used).
	SASLPassword string `json:"sasl_password" yaml:"sasl_password" env:"DEFAULT_KAFKA_SASL_PASSWORD"`

	// SASLMechanism is the SASL authentication mechanism (e.g. PLAIN).
	SASLMechanism string `json:"sasl_mechanism" yaml:"sasl_mechanism" env:"DEFAULT_KAFKA_SASL_MECHANISM"`

	// MaxOpenRequests was originally documented as the "maximum request idle
	// time, in seconds".
	// NOTE(review): that description does not match the field name, which
	// suggests a limit on the number of in-flight requests — confirm the
	// intended meaning against the constructor code.
	MaxOpenRequests uint32 `json:"max_open_requests" yaml:"max_open_requests" env:"DEFAULT_KAFKA_MAX_OPEN_REQUESTS"`

	// DialTimeout is the maximum connection timeout, in seconds.
	// NOTE(review): the json/yaml/env keys are spelled "dial_read_timeout"
	// rather than "dial_timeout" — confirm whether this is intentional.
	DialTimeout uint32 `json:"dial_read_timeout" yaml:"dial_read_timeout" env:"DEFAULT_KAFKA_DIAL_READ_TIMEOUT"`

	// ReadTimeout is the maximum read timeout, in seconds.
	ReadTimeout uint32 `json:"read_timeout" yaml:"read_timeout" env:"DEFAULT_KAFKA_READ_TIMEOUT"`

	// WriteTimeout is the maximum write timeout, in seconds.
	WriteTimeout uint32 `json:"write_timeout" yaml:"write_timeout" env:"DEFAULT_KAFKA_WRITE_TIMEOUT"`

	// TracerProvider is the OpenTelemetry tracer provider.
	TracerProvider trace.TracerProvider `json:"-"`

	// MeterProvider is the OpenTelemetry meter provider.
	MeterProvider metric.MeterProvider `json:"-"`

	// Propagator is the OpenTelemetry text-map propagator.
	Propagator propagation.TextMapPropagator `json:"-"`
}

KafkaConfig 包含 Kafka 连接配置

type KafkaConfigOption

// KafkaConfigOption is a function type used to set options on a KafkaConfig.
type KafkaConfigOption func(c *KafkaConfig)

KafkaConfigOption 用于配置 Kafka 配置项的函数类型

func WithBootstrapServers

func WithBootstrapServers(bootstrapServers string) KafkaConfigOption

WithBootstrapServers 配置 Kafka BootstrapServer 地址列表

func WithDialTimeout

func WithDialTimeout(t uint32) KafkaConfigOption

WithDialTimeout 配置 Kafka 最大连接超时时间

func WithGroupID

func WithGroupID(groupID string) KafkaConfigOption

WithGroupID 配置 Kafka Consumer Group ID

func WithReadTimeout

func WithReadTimeout(t uint32) KafkaConfigOption

WithReadTimeout 配置 Kafka 最大读取超时时间

func WithSASLMechanism

func WithSASLMechanism(mechanism string) KafkaConfigOption

WithSASLMechanism 配置 SASL 认证机制

func WithSASLPassword

func WithSASLPassword(password string) KafkaConfigOption

WithSASLPassword 配置 SASL 密码

func WithSASLUsername

func WithSASLUsername(username string) KafkaConfigOption

WithSASLUsername 配置 SASL 用户名

func WithWriteTimeout

func WithWriteTimeout(t uint32) KafkaConfigOption

WithWriteTimeout 配置 Kafka 最大写入超时时间

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL