Documentation
¶
Index ¶
- func NewConsumerGroup(opts ...KafkaConfigOption) (sarama.ConsumerGroup, func(), error)
- func NewProducer(opts ...KafkaConfigOption) (sarama.AsyncProducer, func(), error)
- type KafkaConfig
- type KafkaConfigOption
- func WithBootstrapServers(bootstrapServers string) KafkaConfigOption
- func WithDialTimeout(t uint32) KafkaConfigOption
- func WithGroupID(groupID string) KafkaConfigOption
- func WithReadTimeout(t uint32) KafkaConfigOption
- func WithSASLMechanism(mechanism string) KafkaConfigOption
- func WithSASLPassword(password string) KafkaConfigOption
- func WithSASLUsername(username string) KafkaConfigOption
- func WithWriteTimeout(t uint32) KafkaConfigOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewConsumerGroup ¶
func NewConsumerGroup(opts ...KafkaConfigOption) (sarama.ConsumerGroup, func(), error)
func NewProducer ¶
func NewProducer(opts ...KafkaConfigOption) (sarama.AsyncProducer, func(), error)
Types ¶
type KafkaConfig ¶
// KafkaConfig holds the Kafka connection configuration.
type KafkaConfig struct {
// BootstrapServers is the list of Kafka broker addresses.
BootstrapServers []string `json:"bootstrap_servers" yaml:"bootstrap_servers" env:"DEFAULT_KAFKA_BOOTSTRAP_SERVERS"`
// GroupId is the Kafka consumer group ID.
GroupId string `json:"group_id" yaml:"group_id" env:"DEFAULT_KAFKA_GROUP_ID"`
// SASLUsername is the SASL user name (only needed when SASL authentication is used).
SASLUsername string `json:"sasl_username" yaml:"sasl_username" env:"DEFAULT_KAFKA_SASL_USERNAME"`
// SASLPassword is the SASL password (only needed when SASL authentication is used).
SASLPassword string `json:"sasl_password" yaml:"sasl_password" env:"DEFAULT_KAFKA_SASL_PASSWORD"`
// SASLMechanism is the SASL authentication mechanism (e.g. PLAIN).
SASLMechanism string `json:"sasl_mechanism" yaml:"sasl_mechanism" env:"DEFAULT_KAFKA_SASL_MECHANISM"`
// MaxOpenRequests was originally described as the "maximum request idle time, in seconds".
// NOTE(review): the field name suggests a limit on the number of in-flight requests
// rather than a duration — confirm against the code that consumes this field.
MaxOpenRequests uint32 `json:"max_open_requests" yaml:"max_open_requests" env:"DEFAULT_KAFKA_MAX_OPEN_REQUESTS"`
// DialTimeout is the maximum connection timeout, in seconds.
// NOTE(review): the serialized key is "dial_read_timeout" (env DEFAULT_KAFKA_DIAL_READ_TIMEOUT),
// unlike the sibling fields whose keys mirror the field name — confirm whether this is intentional
// before changing it, since existing configuration files depend on the current key.
DialTimeout uint32 `json:"dial_read_timeout" yaml:"dial_read_timeout" env:"DEFAULT_KAFKA_DIAL_READ_TIMEOUT"`
// ReadTimeout is the maximum read timeout, in seconds.
ReadTimeout uint32 `json:"read_timeout" yaml:"read_timeout" env:"DEFAULT_KAFKA_READ_TIMEOUT"`
// WriteTimeout is the maximum write timeout, in seconds.
WriteTimeout uint32 `json:"write_timeout" yaml:"write_timeout" env:"DEFAULT_KAFKA_WRITE_TIMEOUT"`
// TracerProvider is the OpenTelemetry tracer provider; excluded from JSON.
TracerProvider trace.TracerProvider `json:"-"`
// MeterProvider is the OpenTelemetry meter provider; excluded from JSON.
MeterProvider metric.MeterProvider `json:"-"`
// Propagator is the OpenTelemetry text-map propagator; excluded from JSON.
Propagator propagation.TextMapPropagator `json:"-"`
}
KafkaConfig 包含 Kafka 连接配置
type KafkaConfigOption ¶
type KafkaConfigOption func(c *KafkaConfig)
KafkaConfigOption 用于配置 Kafka 配置项的函数类型
func WithBootstrapServers ¶
func WithBootstrapServers(bootstrapServers string) KafkaConfigOption
WithBootstrapServers 配置 Kafka BootstrapServer 地址列表
func WithDialTimeout ¶
func WithDialTimeout(t uint32) KafkaConfigOption
WithDialTimeout 配置 Kafka 最大连接超时时间
func WithGroupID ¶
func WithGroupID(groupID string) KafkaConfigOption
WithGroupID 配置 Kafka Consumer Group ID
func WithReadTimeout ¶
func WithReadTimeout(t uint32) KafkaConfigOption
WithReadTimeout 配置 Kafka 最大读取超时时间
func WithSASLMechanism ¶
func WithSASLMechanism(mechanism string) KafkaConfigOption
WithSASLMechanism 配置 SASL 认证机制
func WithSASLPassword ¶
func WithSASLPassword(password string) KafkaConfigOption
WithSASLPassword 配置 SASL 密码
func WithSASLUsername ¶
func WithSASLUsername(username string) KafkaConfigOption
WithSASLUsername 配置 SASL 用户名
func WithWriteTimeout ¶
func WithWriteTimeout(t uint32) KafkaConfigOption
WithWriteTimeout 配置 Kafka 最大写入超时时间
Click to show internal directories.
Click to hide internal directories.