kafka

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2025 License: BSD-3-Clause Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Compression

type Compression string
const (
	// 不压缩, 最快,但占用较多带宽, 低延迟应用
	CompessionNone Compression = "none"
	// 压缩率高,但 CPu 开销大 日志/历史数据存储
	CompessionGzip Compression = "gzip"
	// 速度块,CPU占用低,但压缩率一般,实时消息/高吞吐
	CompessionSnappy Compression = "snappy"
	// 比 Snappy 更快,压缩率中等, 高吞吐/低延迟
	CompessionLz4 Compression = "lz4"
)

type Config

type Config struct {
	Addrs []string `mapstructure:"addrs"` // Kafka服务器地址列表

	// 生产者配置
	Producer ProducerConfig `mapstructure:"producer"`
	// 消费者配置
	Consumer ConsumerConfig `mapstructure:"consumer"`
}

Config Kafka服务器配置

type ConsumerConfig

type ConsumerConfig struct {
	GroupID        string         `mapstructure:"group_id"`                                                           // 消费者组ID
	GroupRebalance GroupRebalance `mapstructure:"group_rebalance" validate:"omitempty,oneof=range roundrobin sticky"` // 消费者组重新分配策略(当有消费者退出时) 默认 GroupRebalanceRange
	OffsetInitial  Offset         `mapstructure:"offset_initial" validate:"omitempty,oneof=oldest newest"`            // 初始offset 默认 OffsetNewest
	// 最长等待时间,当 MinBytes 还未满足时,最多等待多久再返回数据 默认 250ms
	MaxWaitTime types.Duration `mapstructure:"max_wait_time"` // 最大等待时间
	// 最小拉取字节数,如果消息小于该值,Kafka 可能会等待 默认 1 B
	MinBytes types.ByteSize `mapstructure:"min_bytes"`
	// 最大拉取字节数, 一次 fetch 请求最多拉取多少数据  默认 1MB                                                        // 最小获取字节数
	MaxBytes types.ByteSize `mapstructure:"max_bytes"` // 最大获取字节数
}

ConsumerConfig 消费者配置

type ConsumerInfo

type ConsumerInfo struct {
	Handler ConsumeHandlerFunc
	Mode    ConsumerMode
}

ConsumerInfo 消费者处理器信息

type ConsumerMode

type ConsumerMode int

ConsumerMode 消费模式

const (
	// ModeMutex 互斥模式:消息只被一个消费者消费
	ModeMutex ConsumerMode = iota
	// ModeBroadcast 广播模式:所有消费者都能收到消息
	ModeBroadcast
)

type GroupRebalance

type GroupRebalance string
const (
	// GroupRebalanceRange 顺序消费:Range(适合处理有序数据,但可能不均衡)
	GroupRebalanceRange GroupRebalance = sarama.RangeBalanceStrategyName
	// GroupRebalanceRoundRobin 负载均衡:RoundRobin(确保消费者负载均衡)
	GroupRebalanceRoundRobin GroupRebalance = sarama.RoundRobinBalanceStrategyName
	// GroupRebalanceSticky 生产环境:Sticky(最稳定,减少 Kafka 重新平衡的影响)
	GroupRebalanceSticky GroupRebalance = sarama.StickyBalanceStrategyName
)

type Offset

type Offset string
const (
	OffsetOldest Offset = "oldest"
	OffsetNewest Offset = "newest"
)

type ProducerConfig

type ProducerConfig struct {
	// 重试配置
	Retry RetryConfig `mapstructure:"retry"`

	// 发送配置
	RequiredAcks RequiredAcks   `mapstructure:"required_acks" validate:"omitempty,oneof=none local all"`     // 需要的ack数量
	Timeout      types.Duration `mapstructure:"timeout"`                                                     // 发送超时时间
	Compression  Compression    `mapstructure:"compression" validate:"omitempty,oneof=none gzip snappy lz4"` // 压缩算法
}

ProducerConfig 生产者配置

type ProducerOpt

type ProducerOpt func(o *ProducerOpts)

func WithHeaders

func WithHeaders(headers map[string]string) ProducerOpt

WithHeaders 元数据(追踪,认证等)

func WithKey

func WithKey(key string) ProducerOpt

WithKey 影响消息的分区(相同key在同一分区 相同 key 的消息进入相同分区,保证消息顺序 默认情况下,Kafka 使用 key 的 哈希值 决定消息进入哪个分区 如果 key 为空,Kafka 会使用轮询(Round-Robin)方式选择分区

func WithOffset

func WithOffset(offset int64) ProducerOpt

WithOffset 影响消息的分区唯一标识 kafka 中的消息

func WithPartition

func WithPartition(partition int32) ProducerOpt

WithPartition 决定消息存储在那个分区 Kafka 存储数据的基本单位,每个 Topic 由多个 分区(Partition) 组成 影响 Kafka 的并行消费(多个分区可同时被不同消费者处理) 保证有序性(同一分区的消息按写入顺序消费)

func WithTimestamp

func WithTimestamp(timestamp time.Time) ProducerOpt

WithTimestamp 消息产生时间 数据恢复(回溯特定时间点的数据)

type ProducerOpts

type ProducerOpts struct {
	// contains filtered or unexported fields
}

type ProducerServer

type ProducerServer struct {
	// contains filtered or unexported fields
}

func (*ProducerServer) SendMessage

func (p *ProducerServer) SendMessage(ctx context.Context, topic string, value []byte, opts ...ProducerOpt) error

SendMessage 发送消息到指定主题

type RequiredAcks

type RequiredAcks string
const (
	// 消息可靠性较高,数据不会丢失,延迟较高,吞吐量较低
	RequiredAcksAll RequiredAcks = "all" // 所有节点确认
	// 性能较好,如果 Leader 崩溃,消息可能会丢失
	RequiredAcksLocal RequiredAcks = "local" // Leader节点确认
	// 最高吞吐量,最低延迟,如果 kafka 崩溃,消息可能会丢失
	RequiredAcksNone RequiredAcks = "none" // 无需确认
)

type RetryConfig

type RetryConfig struct {
	Max     int            `mapstructure:"max" validate:"gte=0"` // 最大重试次数
	Backoff types.Duration `mapstructure:"backoff"`              // 重试间隔
}

RetryConfig 重试配置

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server 实现 server.Server 接口的 Kafka 服务器

func New

func New(appName string, config Config, subs ...Subscriber) *Server

New 创建新的Kafka服务器实例

func (*Server) Get

func (s *Server) Get() any

Get 获取组件实例

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start 启动Kafka服务器

func (*Server) Stop

func (s *Server) Stop(ctx context.Context) error

Stop 停止Kafka服务器

type Subscriber

type Subscriber struct {
	*ConsumerInfo
	Topic string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL