Documentation
¶
Overview ¶
Package kafka
@author: xwc1125 @date: 2021/3/23
Package kafka ¶
@author: xwc1125 @date: 2021/3/24
Package kafka ¶
@author: xwc1125 @date: 2021/3/19
Package kafka ¶
@author: xwc1125 @date: 2021/3/19
Index ¶
- func DefaultConfig() *sarama.Config
- type Consumer
- func (c *Consumer) Close() error
- func (c *Consumer) ClosePartition(topic string, partition int32) error
- func (c *Consumer) Consumer() sarama.Consumer
- func (c *Consumer) ConsumerGroup() sarama.ConsumerGroup
- func (c *Consumer) ConsumerGroupMessage(ctx context.Context, topic []string, result chan *Result, feedback <-chan bool) error
- func (c *Consumer) ConsumerMessage(topic string, partition int32, result chan *Result) error
- type ConsumerHandler
- type Kafka
- func (k *Kafka) NewConsumer() (*Consumer, error)
- func (k *Kafka) NewConsumerGroup() (*Consumer, error)
- func (k *Kafka) NewConsumerGroupConfig(config *sarama.Config) (*Consumer, error)
- func (k *Kafka) NewConsumerWithConfig(kafkaConfig *sarama.Config) (*Consumer, error)
- func (k *Kafka) NewProducer() (*Producer, error)
- func (k *Kafka) NewProducerWithConfig(kafkaConfig *sarama.Config) (*Producer, error)
- type KafkaConfig
- type Message
- func (m *Message) Format(s fmt.State, c rune)
- func (m *Message) MarshalJSON() ([]byte, error)
- func (m *Message) MarshalText() ([]byte, error)
- func (m *Message) String() string
- func (m *Message) TerminalString() string
- func (m *Message) UnmarshalJSON(input []byte) error
- func (m *Message) UnmarshalText(input []byte) error
- type Producer
- type Result
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Consumer ¶
// Consumer is the package's consumer handle. Per the package index its methods
// are Close, ClosePartition, Consumer, ConsumerGroup, ConsumerGroupMessage and
// ConsumerMessage; the Consumer and ConsumerGroup methods return a
// sarama.Consumer and a sarama.ConsumerGroup respectively.
type Consumer struct {
// contains filtered or unexported fields
}
Consumer ...
func (*Consumer) ClosePartition ¶
ClosePartition ...
func (*Consumer) ConsumerGroup ¶
func (c *Consumer) ConsumerGroup() sarama.ConsumerGroup
ConsumerGroup ...
func (*Consumer) ConsumerGroupMessage ¶
type ConsumerHandler ¶
// ConsumerHandler represents a Sarama consumer group consumer. It provides the
// Setup, Cleanup and ConsumeClaim methods that take sarama.ConsumerGroupSession
// and sarama.ConsumerGroupClaim arguments.
type ConsumerHandler struct {
// NOTE(review): presumably toggles log output, mirroring KafkaConfig.IsLog — confirm.
IsLog bool
// NOTE(review): presumably signalled once the handler has been set up — confirm against Setup.
Ready chan bool
// Channel carrying *Result values.
// NOTE(review): presumably one Result per consumed message — confirm against ConsumeClaim.
Result chan *Result
Feedback <-chan bool // success or failure (receive-only)
}
ConsumerHandler represents a Sarama consumer group consumer
func (*ConsumerHandler) Cleanup ¶
func (h *ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerHandler) ConsumeClaim ¶
func (h *ConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerHandler) Setup ¶
func (h *ConsumerHandler) Setup(sarama.ConsumerGroupSession) error
Setup is run at the beginning of a new session, before ConsumeClaim
type Kafka ¶
// Kafka is the factory type of this package. Per the package index it offers
// NewConsumer, NewConsumerGroup, NewConsumerGroupConfig and
// NewConsumerWithConfig (each returning *Consumer) as well as NewProducer and
// NewProducerWithConfig (each returning *Producer).
type Kafka struct {
// contains filtered or unexported fields
}
Kafka ...
func (*Kafka) NewConsumerGroup ¶
NewConsumerGroup ...
func (*Kafka) NewConsumerGroupConfig ¶
func (*Kafka) NewConsumerWithConfig ¶
NewConsumerWithConfig ...
type KafkaConfig ¶
// KafkaConfig holds the Kafka settings. Every field carries matching `json`
// and `mapstructure` tags using the same snake_case key.
type KafkaConfig struct {
IsAsync bool `json:"is_async" mapstructure:"is_async"` // whether to operate asynchronously
Addrs []string `json:"addrs" mapstructure:"addrs"` // addresses of the Kafka service
GroupId string `json:"group_id" mapstructure:"group_id"` // consumer group ID
Topic []string `json:"topic" mapstructure:"topic"` // topics
IsLog bool `json:"is_log" mapstructure:"is_log"` // whether to print logs
KafkaVersion string `json:"kafka_version" mapstructure:"kafka_version"` // Kafka version
Strategy string `json:"strategy" mapstructure:"strategy"` // consumer group rebalance strategy
SASLEnable bool `json:"sasl_enable" mapstructure:"sasl_enable"` // whether SASL is enabled
SASLUser string `json:"sasl_user" mapstructure:"sasl_user"` // SASL username
SASLPassword string `json:"sasl_password" mapstructure:"sasl_password"` // SASL password
}
KafkaConfig holds the Kafka configuration.
type Message ¶
// Message is the package's message record. Per the package index it provides
// Format, String and TerminalString for display, plus MarshalJSON/UnmarshalJSON
// and MarshalText/UnmarshalText for encoding.
// NOTE(review): the fields presumably mirror the corresponding fields of
// sarama's message types — confirm against the code that fills them in.
type Message struct {
Topic string
Key []byte
Value []byte
// Headers is currently disabled (left commented out in the declaration):
// Headers []sarama.RecordHeader
Metadata interface{}
Offset int64
Partition int32
Timestamp time.Time
}
func (*Message) MarshalJSON ¶
func (*Message) MarshalText ¶
func (*Message) TerminalString ¶
func (*Message) UnmarshalJSON ¶
func (*Message) UnmarshalText ¶
Click to show internal directories.
Click to hide internal directories.