Documentation
¶
Index ¶
- Variables
- func ConsumerGroupHandler(client sarama.ConsumerGroup, topics []string, ctx context.Context, ...)
- type ConsumerConfig
- type ConsumerGroupHander
- type ConsumerHandler
- type KafkaConfig
- func (KafkaConfig) GormDBDataType(db *gorm.DB, field *schema.Field) string
- func (kc KafkaConfig) GormDataType() string
- func (kc *KafkaConfig) NewConsumerClient() (sarama.Consumer, error)
- func (kc *KafkaConfig) NewConsumerGroupClient() (sarama.ConsumerGroup, error)
- func (kc *KafkaConfig) NewProducerAsyncProducer() (sarama.AsyncProducer, error)
- func (kc *KafkaConfig) NewProducerSyncProducer() (sarama.SyncProducer, error)
- func (kc *KafkaConfig) RemovePasswd()
- func (kc *KafkaConfig) Scan(val interface{}) error
- func (kc *KafkaConfig) SetInfo(args ...any)
- func (kc *KafkaConfig) Valid() error
- func (kc KafkaConfig) Value() (driver.Value, error)
- type ProducerMessage
Constants ¶
This section is empty.
Variables ¶
View Source
// KafkaMessageTypeEnum maps each listed sarama SASL mechanism constant to itself.
// NOTE(review): despite the "MessageType" name, every entry is a sarama.SASLType* value.
var KafkaMessageTypeEnum = map[string]string{ sarama.SASLTypePlaintext: sarama.SASLTypePlaintext, sarama.SASLTypeSCRAMSHA256: sarama.SASLTypeSCRAMSHA256, sarama.SASLTypeSCRAMSHA512: sarama.SASLTypeSCRAMSHA512, sarama.SASLTypeGSSAPI: sarama.SASLTypeGSSAPI, sarama.SASLTypeOAuth: sarama.SASLTypeOAuth, }
KafkaMessageTypeEnum maps each listed SASL mechanism name (plaintext, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAuth) to itself.
Functions ¶
func ConsumerGroupHandler ¶
func ConsumerGroupHandler(client sarama.ConsumerGroup, topics []string, ctx context.Context, handler *ConsumerGroupHander)
ConsumerGroupHandler is the generic handler for consumer-group consumption.
Types ¶
type ConsumerConfig ¶
type ConsumerGroupHander ¶
// ConsumerGroupHander is the consumer-group handler type.
type ConsumerGroupHander struct {
// Msg is a channel of *sarama.ConsumerMessage values.
// NOTE(review): presumably ConsumeClaim delivers consumed messages here — TODO confirm.
Msg chan *sarama.ConsumerMessage
}
ConsumerGroupHander is the consumer-group handler.
func (*ConsumerGroupHander) Cleanup ¶
func (h *ConsumerGroupHander) Cleanup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroupHander) ConsumeClaim ¶
func (h *ConsumerGroupHander) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroupHander) Setup ¶
func (h *ConsumerGroupHander) Setup(session sarama.ConsumerGroupSession) error
type ConsumerHandler ¶
type ConsumerHandler struct {
// contains filtered or unexported fields
}
func NewConsumerHandler ¶
func NewConsumerHandler(ctx context.Context, consumer sarama.Consumer, opt *ConsumerConfig) (*ConsumerHandler, error)
func (*ConsumerHandler) IsPause ¶
func (c *ConsumerHandler) IsPause() bool
func (*ConsumerHandler) Run ¶
func (c *ConsumerHandler) Run(onMessage onMessageFunc)
type KafkaConfig ¶
// KafkaConfig groups the broker addresses, SASL credentials, and producer and
// consumer settings for Kafka.
type KafkaConfig struct {
Addrs []string `yaml:"addrs" json:"addrs" ini:"addrs,omitempty"`
Version string `yaml:"version" json:"version" ini:"version"` // Kafka version
Sasl bool `yaml:"sasl" json:"sasl" ini:"sasl"`
User string `yaml:"user" json:"user" ini:"user"`
Password string `yaml:"password" json:"password" ini:"password"`
Mechanism string `yaml:"mechanism" json:"mechanism" ini:"mechanism"`
Offset int64 `yaml:"offset" json:"offset" ini:"offset"` // consumes from the newest offset by default; the original note lists -1 and -2 (presumably sarama.OffsetNewest / sarama.OffsetOldest) — TODO confirm
MaxRetry int `yaml:"max_retry" json:"max_retry" ini:"max_retry"` // retries when producing a message fails; defaults to 3
Timeout time.Duration `json:"timeout" yaml:"timeout" ini:"timeout"` // timeout
Compression bool `json:"compression" yaml:"compression" ini:"compression"` // whether to compress messages when sending
// Kafka usage here involves no complex business logic, so the settings below can be used.
ProducerMessage ProducerMessage `json:"producer_message" yaml:"producer_message" ini:"producer_message"`
// Producer settings.
// Consumer settings.
GroupName string `json:"group_name" yaml:"group_name" ini:"group_name"`
}
KafkaConfig holds the broker addresses, SASL credentials, and producer/consumer settings for Kafka.
func (KafkaConfig) GormDBDataType ¶
GormDBDataType returns the database column type name for this field, given the GORM DB handle and schema field.
func (*KafkaConfig) NewConsumerClient ¶
func (kc *KafkaConfig) NewConsumerClient() (sarama.Consumer, error)
NewConsumerClient creates a consumer client.
func (*KafkaConfig) NewConsumerGroupClient ¶
func (kc *KafkaConfig) NewConsumerGroupClient() (sarama.ConsumerGroup, error)
NewConsumerGroupClient creates a consumer-group client.
func (*KafkaConfig) NewProducerAsyncProducer ¶
func (kc *KafkaConfig) NewProducerAsyncProducer() (sarama.AsyncProducer, error)
NewProducerAsyncProducer creates an asynchronous producer client.
func (*KafkaConfig) NewProducerSyncProducer ¶
func (kc *KafkaConfig) NewProducerSyncProducer() (sarama.SyncProducer, error)
NewProducerSyncProducer creates a synchronous producer client.
Click to show internal directories.
Click to hide internal directories.