Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Compression ¶
type Compression string
const ( // 不压缩, 最快,但占用较多带宽, 低延迟应用 CompessionNone Compression = "none" // 压缩率高,但 CPu 开销大 日志/历史数据存储 CompessionGzip Compression = "gzip" // 速度块,CPU占用低,但压缩率一般,实时消息/高吞吐 CompessionSnappy Compression = "snappy" // 比 Snappy 更快,压缩率中等, 高吞吐/低延迟 CompessionLz4 Compression = "lz4" )
type Config ¶
type Config struct {
Addrs []string `mapstructure:"addrs"` // Kafka服务器地址列表
// 生产者配置
Producer ProducerConfig `mapstructure:"producer"`
// 消费者配置
Consumer ConsumerConfig `mapstructure:"consumer"`
}
Config Kafka服务器配置
type ConsumeHandlerFunc ¶
type ConsumeHandlerFunc func(sarama.ConsumerGroupSession, sarama.ConsumerGroupClaim) error
type ConsumerConfig ¶
type ConsumerConfig struct {
GroupID string `mapstructure:"group_id"` // 消费者组ID
GroupRebalance GroupRebalance `mapstructure:"group_rebalance" validate:"omitempty,oneof=range roundrobin sticky"` // 消费者组重新分配策略(当有消费者退出时) 默认 GroupRebalanceRange
OffsetInitial Offset `mapstructure:"offset_initial" validate:"omitempty,oneof=oldest newest"` // 初始offset 默认 OffsetNewest
// 最长等待时间,当 MinBytes 还未满足时,最多等待多久再返回数据 默认 250ms
MaxWaitTime types.Duration `mapstructure:"max_wait_time"` // 最大等待时间
// 最小拉取字节数,如果消息小于该值,Kafka 可能会等待 默认 1 B
MinBytes types.ByteSize `mapstructure:"min_bytes"`
// 最大拉取字节数, 一次 fetch 请求最多拉取多少数据 默认 1MB // 最小获取字节数
MaxBytes types.ByteSize `mapstructure:"max_bytes"` // 最大获取字节数
}
ConsumerConfig 消费者配置
type ConsumerInfo ¶
type ConsumerInfo struct {
Handler ConsumeHandlerFunc
Mode ConsumerMode
}
ConsumerInfo 消费者处理器信息
type ConsumerMode ¶
type ConsumerMode int
ConsumerMode 消费模式
const ( // ModeMutex 互斥模式:消息只被一个消费者消费 ModeMutex ConsumerMode = iota // ModeBroadcast 广播模式:所有消费者都能收到消息 ModeBroadcast )
type GroupRebalance ¶
type GroupRebalance string
const ( // GroupRebalanceRange 顺序消费:Range(适合处理有序数据,但可能不均衡) GroupRebalanceRange GroupRebalance = sarama.RangeBalanceStrategyName // GroupRebalanceRoundRobin 负载均衡:RoundRobin(确保消费者负载均衡) GroupRebalanceRoundRobin GroupRebalance = sarama.RoundRobinBalanceStrategyName // GroupRebalanceSticky 生产环境:Sticky(最稳定,减少 Kafka 重新平衡的影响) GroupRebalanceSticky GroupRebalance = sarama.StickyBalanceStrategyName )
type ProducerConfig ¶
type ProducerConfig struct {
// 重试配置
Retry RetryConfig `mapstructure:"retry"`
// 发送配置
RequiredAcks RequiredAcks `mapstructure:"required_acks" validate:"omitempty,oneof=none local all"` // 需要的ack数量
Timeout types.Duration `mapstructure:"timeout"` // 发送超时时间
Compression Compression `mapstructure:"compression" validate:"omitempty,oneof=none gzip snappy lz4"` // 压缩算法
}
ProducerConfig 生产者配置
type ProducerOpt ¶
type ProducerOpt func(o *ProducerOpts)
func WithKey ¶
func WithKey(key string) ProducerOpt
WithKey 影响消息的分区(相同key在同一分区 相同 key 的消息进入相同分区,保证消息顺序 默认情况下,Kafka 使用 key 的 哈希值 决定消息进入哪个分区 如果 key 为空,Kafka 会使用轮询(Round-Robin)方式选择分区
func WithPartition ¶
func WithPartition(partition int32) ProducerOpt
WithPartition 决定消息存储在那个分区 Kafka 存储数据的基本单位,每个 Topic 由多个 分区(Partition) 组成 影响 Kafka 的并行消费(多个分区可同时被不同消费者处理) 保证有序性(同一分区的消息按写入顺序消费)
func WithTimestamp ¶
func WithTimestamp(timestamp time.Time) ProducerOpt
WithTimestamp 消息产生时间 数据恢复(回溯特定时间点的数据)
type ProducerOpts ¶
type ProducerOpts struct {
// contains filtered or unexported fields
}
type ProducerServer ¶
type ProducerServer struct {
// contains filtered or unexported fields
}
func (*ProducerServer) SendMessage ¶
func (p *ProducerServer) SendMessage(ctx context.Context, topic string, value []byte, opts ...ProducerOpt) error
SendMessage 发送消息到指定主题
type RequiredAcks ¶
type RequiredAcks string
const ( // 消息可靠性较高,数据不会丢失,延迟较高,吞吐量较低 RequiredAcksAll RequiredAcks = "all" // 所有节点确认 // 性能较好,如果 Leader 崩溃,消息可能会丢失 RequiredAcksLocal RequiredAcks = "local" // Leader节点确认 // 最高吞吐量,最低延迟,如果 kafka 崩溃,消息可能会丢失 RequiredAcksNone RequiredAcks = "none" // 无需确认 )
type RetryConfig ¶
type RetryConfig struct {
Max int `mapstructure:"max" validate:"gte=0"` // 最大重试次数
Backoff types.Duration `mapstructure:"backoff"` // 重试间隔
}
RetryConfig 重试配置
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server 实现 server.Server 接口的 Kafka 服务器
type Subscriber ¶
type Subscriber struct {
*ConsumerInfo
Topic string
}
Click to show internal directories.
Click to hide internal directories.