Documentation
¶
Index ¶
- Constants
- func Close()
- func SetupConsumer(opt *ConsumerConfig) (err error)
- func SetupProducer(opt *ProducerConfig) (err error)
- type ByteEncoder
- type Consumer
- type ConsumerConfig
- type ConsumerError
- type ConsumerErrorHandler
- type ConsumerMessage
- type ConsumerMessageHandler
- type Producer
- type ProducerConfig
- type ProducerError
- type ProducerErrorHandler
- type ProducerMessage
- type ProducerMessageHandler
- type StringEncoder
Constants ¶
View Source
const ( ACK_BEFORE_AUTO = 0 ACK_AFTER_NOERROR = 1 ACK_AFTER_NOMATTER = 2 )
Variables ¶
This section is empty.
Functions ¶
func SetupConsumer ¶ added in v1.1.19
func SetupConsumer(opt *ConsumerConfig) (err error)
func SetupProducer ¶ added in v1.1.19
func SetupProducer(opt *ProducerConfig) (err error)
Types ¶
type ByteEncoder ¶ added in v1.1.19
type ByteEncoder = sarama.ByteEncoder
type Consumer ¶ added in v1.1.19
type Consumer interface {
Close() error
// blocking to consume the messages
Consume(topics string, mh ConsumerMessageHandler, eh ConsumerErrorHandler) error
ConsumeM(topics []string, mh ConsumerMessageHandler, eh ConsumerErrorHandler) error
}
func GetConsumer ¶ added in v1.1.19
type ConsumerConfig ¶ added in v1.1.19
type ConsumerConfig struct {
Key string
Address []string // kafka地址
Group string // groupId
Offset int64
Ack int // ack类型
User string //username and password for SASL/PLAIN or SASL/SCRAM authentication
Password string
DialTimeout time.Duration // How long to wait for the initial connection.
ReadTimeout time.Duration // How long to wait for a response.
WriteTimeout time.Duration // How long to wait for a transmit.
KeepAlive time.Duration
Version *sarama.KafkaVersion // kafka version
}
type ConsumerError ¶ added in v1.1.19
type ConsumerError = sarama.ConsumerError
type ConsumerErrorHandler ¶ added in v1.1.19
type ConsumerErrorHandler func(err error)
type ConsumerMessage ¶ added in v1.1.19
type ConsumerMessage = sarama.ConsumerMessage
type ConsumerMessageHandler ¶ added in v1.1.19
type ConsumerMessageHandler func(msg *ConsumerMessage) error
type Producer ¶ added in v1.1.19
type Producer interface {
Close() error
Produce(msgs ...*ProducerMessage) error
AsyncHandle(mh ProducerMessageHandler, eh ProducerErrorHandler) // 必须设置 asyncReturnSuccess 或 asyncReturnError
}
func GetProducer ¶ added in v1.1.19
type ProducerConfig ¶ added in v1.1.19
type ProducerError ¶ added in v1.1.19
type ProducerError = sarama.ProducerError
type ProducerErrorHandler ¶ added in v1.1.19
type ProducerErrorHandler func(err *ProducerError)
type ProducerMessage ¶ added in v1.1.19
type ProducerMessage = sarama.ProducerMessage
type ProducerMessageHandler ¶ added in v1.1.19
type ProducerMessageHandler func(msg *ProducerMessage)
type StringEncoder ¶ added in v1.1.19
type StringEncoder = sarama.StringEncoder
Click to show internal directories.
Click to hide internal directories.