kafka

package
v2.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

KafkaMessageTypeEnum noinspection all

Functions

func ConsumerGroupHandler

func ConsumerGroupHandler(client sarama.ConsumerGroup, topics []string, ctx context.Context, handler *ConsumerGroupHander)

ConsumerGroupHandler is a general-purpose handler for consuming messages as part of a consumer group.

Types

type ConsumerConfig

// ConsumerConfig holds the options passed to NewConsumerHandler: which topic
// to consume, where to start, and how retries are paced.
type ConsumerConfig struct {
	Topic    string        // topic to consume
	Offset   int64         // start offset: -1 = OffsetNewest, -2 = OffsetOldest
	Interval time.Duration // partition refresh interval

	// Automatic retry settings.
	MaxRetry   int           // maximum number of retries
	MinBackoff time.Duration // minimum backoff duration
	MaxBackoff time.Duration // maximum backoff duration
}

type ConsumerGroupHander

// ConsumerGroupHander is the consumer-group handler. It declares Setup,
// Cleanup and ConsumeClaim methods, which are the method names of sarama's
// ConsumerGroupHandler interface.
// NOTE(review): the method signatures are not shown here — confirm it
// satisfies that interface.
type ConsumerGroupHander struct {
	// Msg is a channel of *sarama.ConsumerMessage values.
	// Presumably ConsumeClaim forwards claimed messages here — TODO confirm.
	Msg chan *sarama.ConsumerMessage
}

ConsumerGroupHander is the consumer-group handler.

func (*ConsumerGroupHander) Cleanup

func (*ConsumerGroupHander) ConsumeClaim

func (*ConsumerGroupHander) Setup

type ConsumerHandler

// ConsumerHandler is the value returned by NewConsumerHandler. Its exported
// methods are Run, Pause, Resume and IsPause; all of its fields are unexported.
type ConsumerHandler struct {
	// contains filtered or unexported fields
}

func NewConsumerHandler

func NewConsumerHandler(ctx context.Context, consumer sarama.Consumer, opt *ConsumerConfig) (*ConsumerHandler, error)

func (*ConsumerHandler) IsPause

func (c *ConsumerHandler) IsPause() bool

func (*ConsumerHandler) Pause

func (c *ConsumerHandler) Pause()

Pause pauses consumption.

func (*ConsumerHandler) Resume

func (c *ConsumerHandler) Resume()

Resume resumes consumption.

func (*ConsumerHandler) Run

func (c *ConsumerHandler) Run(onMessage onMessageFunc)

type KafkaConfig

// KafkaConfig is the Kafka connection configuration. Every field carries
// yaml, json and ini struct tags. Its methods build sarama clients
// (NewConsumerClient, NewConsumerGroupClient, NewProducerSyncProducer,
// NewProducerAsyncProducer), and its Scan and Value methods have the
// signatures of sql.Scanner and driver.Valuer.
//
// Field notes that cannot be confirmed from the declaration alone:
//   - Addrs: presumably the broker addresses — TODO confirm.
//   - Sasl, User, Password, Mechanism: presumably SASL authentication
//     settings — TODO confirm which mechanisms are accepted.
//
// NOTE(review): the original comment on Offset described -2 as "from the
// last"; ConsumerConfig.Offset documents -2 as OffsetOldest, so the comment
// below follows that convention — confirm against the implementation.
type KafkaConfig struct {
	Addrs       []string      `yaml:"addrs" json:"addrs" ini:"addrs,omitempty"`
	Version     string        `yaml:"version" json:"version" ini:"version"` // Kafka version
	Sasl        bool          `yaml:"sasl" json:"sasl" ini:"sasl"`
	User        string        `yaml:"user" json:"user" ini:"user"`
	Password    string        `yaml:"password" json:"password" ini:"password"`
	Mechanism   string        `yaml:"mechanism" json:"mechanism" ini:"mechanism"`
	Offset      int64         `yaml:"offset" json:"offset" ini:"offset"`                // start offset: -1 = newest (default per the original note), -2 = oldest
	MaxRetry    int           `yaml:"max_retry" json:"max_retry" ini:"max_retry"`       // on produce failure, retries 3 times by default (per the original note)
	Timeout     time.Duration `json:"timeout" yaml:"timeout" ini:"timeout"`             // timeout duration
	Compression bool          `json:"compression" yaml:"compression" ini:"compression"` // whether to enable compression when sending messages
	// Kafka usage here has no complex business logic, so the settings below can be used.
	ProducerMessage ProducerMessage `json:"producer_message" yaml:"producer_message" ini:"producer_message"`
	// Producer configuration. NOTE(review): this heading appears to belong above ProducerMessage.
	// Consumer configuration.
	GroupName string `json:"group_name" yaml:"group_name" ini:"group_name"`
}

noinspection all

func (KafkaConfig) GormDBDataType

func (KafkaConfig) GormDBDataType(db *gorm.DB, field *schema.Field) string

noinspection all

func (KafkaConfig) GormDataType

func (kc KafkaConfig) GormDataType() string

noinspection all

func (*KafkaConfig) NewConsumerClient

func (kc *KafkaConfig) NewConsumerClient() (sarama.Consumer, error)

NewConsumerClient creates a consumer client.

func (*KafkaConfig) NewConsumerGroupClient

func (kc *KafkaConfig) NewConsumerGroupClient() (sarama.ConsumerGroup, error)

NewConsumerGroupClient creates a consumer-group client.

func (*KafkaConfig) NewProducerAsyncProducer

func (kc *KafkaConfig) NewProducerAsyncProducer() (sarama.AsyncProducer, error)

NewProducerAsyncProducer creates an asynchronous producer client.

func (*KafkaConfig) NewProducerSyncProducer

func (kc *KafkaConfig) NewProducerSyncProducer() (sarama.SyncProducer, error)

NewProducerSyncProducer creates a synchronous producer client.

func (*KafkaConfig) RemovePasswd

func (kc *KafkaConfig) RemovePasswd()

noinspection all

func (*KafkaConfig) Scan

func (kc *KafkaConfig) Scan(val interface{}) error

noinspection all

func (*KafkaConfig) SetInfo

func (kc *KafkaConfig) SetInfo(args ...any)

noinspection all

func (*KafkaConfig) Valid

func (kc *KafkaConfig) Valid() error

noinspection all

func (KafkaConfig) Value

func (kc KafkaConfig) Value() (driver.Value, error)

noinspection all

type ProducerMessage

// ProducerMessage is the type of the KafkaConfig.ProducerMessage field.
// Every field carries json, yaml and ini struct tags.
// NOTE(review): Topic, Key and Header presumably supply the topic, key and
// headers of produced messages — confirm against the producer code.
type ProducerMessage struct {
	Topic  string            `json:"topic" yaml:"topic" ini:"topic"`
	Key    string            `json:"key" yaml:"key" ini:"key"`
	Header map[string]string `json:"header" yaml:"header" ini:"header"`
	Role   string            `json:"role" yaml:"role" ini:"role"` // sync producer or async producer; the accepted string values are not shown here
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL