Documentation
¶
Index ¶
- Constants
- func ConsumerMessageToMap(m *sarama.ConsumerMessage) map[string]interface{}
- type ClientConfig
- type Config
- type ConsumerGroup
- func (s *ConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error
- func (s *ConsumerGroup) Close() error
- func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (s *ConsumerGroup) SetHandler(h Handler)
- func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error
- func (s *ConsumerGroup) Start()
- type ConsumerGroupConfig
- type Handler
- type Message
- type Producer
- type ProducerConfig
- type RetryConfig
Constants ¶
View Source
const (
PackageNameProducer = "fkafka.producer"
PackageNameConsumerGroup = "fkafka.consumerGroup"
)
View Source
const (
CodeOK = "OK"
CodeError = "Error"
)
Variables ¶
This section is empty.
Functions ¶
func ConsumerMessageToMap ¶
func ConsumerMessageToMap(m *sarama.ConsumerMessage) map[string]interface{}
Types ¶
type ClientConfig ¶
type Config ¶
type Config struct {
Debug bool
EnableAccessInterceptorReq bool // whether to log published messages; enabled by default
EnableAccessInterceptorRes bool // whether to log messages consumed by the consumer; enabled by default
ClientConfig ClientConfig
ProducerConfig ProducerConfig
ConsumerGroupConfigs map[string]ConsumerGroupConfig
}
func DefaultConfig ¶
func DefaultConfig() *Config
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(name string, config *Config, groupConfig *ConsumerGroupConfig) (*ConsumerGroup, error)
func (*ConsumerGroup) Cleanup ¶
func (s *ConsumerGroup) Cleanup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroup) Close ¶
func (s *ConsumerGroup) Close() error
func (*ConsumerGroup) ConsumeClaim ¶
func (s *ConsumerGroup) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*ConsumerGroup) SetHandler ¶
func (s *ConsumerGroup) SetHandler(h Handler)
func (*ConsumerGroup) Setup ¶
func (s *ConsumerGroup) Setup(session sarama.ConsumerGroupSession) error
func (*ConsumerGroup) Start ¶
func (s *ConsumerGroup) Start()
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct {
Topics []string
GroupID string
InitialOffset string // initial offset: oldest / newest; defaults to oldest
RetryConfig RetryConfig
}
type Message ¶
type Message struct {
Topic string
Key []byte
Value []byte
// The headers are key-value pairs that are transparently passed
// by Kafka between producers and consumers.
Headers []sarama.RecordHeader
// This field is used to hold arbitrary data you wish to include, so it
// will be available when receiving on the Successes and Errors channels.
// Sarama completely ignores this field and is only to be used for
// pass-through data.
Metadata interface{}
}
Message is the counterpart of sarama.ProducerMessage, used for publishing to Kafka.
type ProducerConfig ¶
type ProducerConfig struct {
MaxMessageBytes int
}
type RetryConfig ¶
type RetryConfig struct {
MaxRetries int64 // number of times the consumer retries consuming a message; defaults to 0 (no retry)
}
Click to show internal directories.
Click to hide internal directories.