kafka

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package kafka

@author: xwc1125 @date: 2021/3/23

Package kafka

@author: xwc1125 @date: 2021/3/24

Package kafka

@author: xwc1125 @date: 2021/3/19

Package kafka

@author: xwc1125 @date: 2021/3/19

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultConfig

func DefaultConfig() *sarama.Config

DefaultConfig ...

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer ...

func (*Consumer) Close

func (c *Consumer) Close() error

Close ...

func (*Consumer) ClosePartition

func (c *Consumer) ClosePartition(topic string, partition int32) error

ClosePartition ...

func (*Consumer) Consumer

func (c *Consumer) Consumer() sarama.Consumer

Consumer ...

func (*Consumer) ConsumerGroup

func (c *Consumer) ConsumerGroup() sarama.ConsumerGroup

ConsumerGroup ...

func (*Consumer) ConsumerGroupMessage

func (c *Consumer) ConsumerGroupMessage(ctx context.Context, topic []string, result chan *Result, feedback <-chan bool) error

func (*Consumer) ConsumerMessage

func (c *Consumer) ConsumerMessage(topic string, partition int32, result chan *Result) error

ConsumerMessage ...

type ConsumerHandler

type ConsumerHandler struct {
	IsLog    bool
	Ready    chan bool
	Result   chan *Result
	Feedback <-chan bool // 成功还是失败
}

ConsumerHandler represents a Sarama consumer group consumer

func (*ConsumerHandler) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*ConsumerHandler) ConsumeClaim

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*ConsumerHandler) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka ...

func New

func New(config *KafkaConfig) (*Kafka, error)

New ...

func (*Kafka) NewConsumer

func (k *Kafka) NewConsumer() (*Consumer, error)

NewConsumer ...

func (*Kafka) NewConsumerGroup

func (k *Kafka) NewConsumerGroup() (*Consumer, error)

NewConsumerWithGroup ...

func (*Kafka) NewConsumerGroupConfig

func (k *Kafka) NewConsumerGroupConfig(config *sarama.Config) (*Consumer, error)

func (*Kafka) NewConsumerWithConfig

func (k *Kafka) NewConsumerWithConfig(kafkaConfig *sarama.Config) (*Consumer, error)

NewConsumerWithConfig ...

func (*Kafka) NewProducer

func (k *Kafka) NewProducer() (*Producer, error)

NewProducer ...

func (*Kafka) NewProducerWithConfig

func (k *Kafka) NewProducerWithConfig(kafkaConfig *sarama.Config) (*Producer, error)

NewProducerWithConfig ...

type KafkaConfig

type KafkaConfig struct {
	IsAsync      bool     `json:"is_async" mapstructure:"is_async"`           // 是否为异步
	Addrs        []string `json:"addrs" mapstructure:"addrs"`                 // kafka服务的地址
	GroupId      string   `json:"group_id" mapstructure:"group_id"`           // groupId
	Topic        []string `json:"topic" mapstructure:"topic"`                 // topic
	IsLog        bool     `json:"is_log" mapstructure:"is_log"`               // 是否打印日志
	KafkaVersion string   `json:"kafka_version" mapstructure:"kafka_version"` // kafka版本
	Strategy     string   `json:"strategy" mapstructure:"strategy"`           // group Rebalance策略
	SASLEnable   bool     `json:"sasl_enable" mapstructure:"sasl_enable"`     // 是否开启SASL
	SASLUser     string   `json:"sasl_user" mapstructure:"sasl_user"`         // SASL用户名
	SASLPassword string   `json:"sasl_password" mapstructure:"sasl_password"` // SASL密码
}

KafkaConfig kafka配置

type Message

type Message struct {
	Topic string
	Key   []byte
	Value []byte
	// Headers   []sarama.RecordHeader
	Metadata  interface{}
	Offset    int64
	Partition int32
	Timestamp time.Time
}

func (*Message) Format

func (m *Message) Format(s fmt.State, c rune)

func (*Message) MarshalJSON

func (m *Message) MarshalJSON() ([]byte, error)

func (*Message) MarshalText

func (m *Message) MarshalText() ([]byte, error)

func (*Message) String

func (m *Message) String() string

func (*Message) TerminalString

func (m *Message) TerminalString() string

func (*Message) UnmarshalJSON

func (m *Message) UnmarshalJSON(input []byte) error

func (*Message) UnmarshalText

func (m *Message) UnmarshalText(input []byte) error

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer ...

func (*Producer) Close

func (p *Producer) Close() error

Close ...

func (*Producer) ProducerMessage

func (p *Producer) ProducerMessage(topic, key, value string, partition int32, result chan *Result) error

ProducerMessage ...

type Result

type Result struct {
	Error error
	Data  Message
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL