Documentation
¶
Overview ¶
Package kafka provides a Consumer and a Producer for basic Kafka operations.
It relies on https://github.com/confluentinc/confluent-kafka-go which is a Go wrapper on top of https://github.com/edenhill/librdkafka. This provides a reliable implementation that is fully supported both by the community and by Confluent, the creators of Kafka.
Index ¶
- Constants
- Variables
- type Config
- type ConfigOpt
- type ConfluentConsumer
- func (r *ConfluentConsumer) Close() error
- func (r *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error
- func (r *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)
- func (r *ConfluentConsumer) Seek(offset int64, timeout time.Duration) error
- func (r *ConfluentConsumer) SetInitialOffset(offset int64) error
- type ConfluentProducer
- type Consumer
- type ConsumerConfig
- type OffsetAwareConsumer
- type Producer
- type ProducerConfig
Examples ¶
Constants ¶
const ( AuthTypePlain = "plain" AuthTypeSCRAM256 = "scram-sha256" AuthTypeSCRAM512 = "scram-sha512" )
Supported auth types
const ( // DefaultProducerDeliveryTimeoutMs configures `delivery.timeout.ms`. The timeout for the producer from sending a message until it is considered as delivered. // This value should always be greater than DefaultProducerBufferMaxMs. // The default value in librdkafka is `300000`, but we reduced it to `5000`. DefaultProducerDeliveryTimeoutMs = 5000 // DefaultProducerBufferMaxMs configures `queue.buffering.max.ms`. The max amount of ms the buffer will wait before sending it to Kafka. // This value should always be lower than DefaultProducerDeliveryTimeoutMs. // The default value in librdkafka is `5`. DefaultProducerBufferMaxMs = 5 // DefaultProducerBufferMaxMessages configures `queue.buffering.max.messages`. The max number of messages in buffer before sending to Kafka. // The default value in librdkafka is `100000`. DefaultProducerBufferMaxMessages = 100000 )
const DefaultLogLevel = logrus.ErrorLevel
DefaultLogLevel is the log level Kafka producers/consumers will use if none is set.
Variables ¶
var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.")
ErrSeekTimedOut is the error returned when a consumer timed out during Seek.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
Brokers []string `json:"brokers"`
Topic string `json:"topic"`
Producer ProducerConfig `json:"producer"`
Consumer ConsumerConfig `json:"consumer"`
AuthType string `json:"auth" split_words:"true"`
User string `json:"user"`
Password string `json:"password"`
CAPEMFile string `json:"ca_pem_file"`
LogLevel string `json:"log_level" split_words:"true"`
}
Config holds all the configuration for this package.
Example (Auth) ¶
_ = Config{
// Append the following to your configuration (Consumer or Producer)
AuthType: AuthTypeSCRAM256,
User: "my-user",
Password: "my-secret-password",
CAPEMFile: "/etc/certificate.pem",
}
type ConfigOpt ¶
ConfigOpt configures Kafka consumers and producers.
func WithConsumerGroupID ¶
WithConsumerGroupID sets the consumer group ID of the Consumer.
func WithLogger ¶
func WithLogger(ctx context.Context, log logrus.FieldLogger) ConfigOpt
WithLogger adds a logger to a Kafka consumer or producer.
type ConfluentConsumer ¶
type ConfluentConsumer struct {
// contains filtered or unexported fields
}
ConfluentConsumer implements Consumer interface.
func NewConsumer ¶
func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error)
NewConsumer creates a ConfluentConsumer based on config.
func (*ConfluentConsumer) Close ¶
func (r *ConfluentConsumer) Close() error
Close should be called when no more reads will be performed.
func (*ConfluentConsumer) CommitMessage ¶
func (r *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error
CommitMessage implements Consumer interface.
func (*ConfluentConsumer) FetchMessage ¶
FetchMessage implements Consumer interface.
func (*ConfluentConsumer) Seek ¶
func (r *ConfluentConsumer) Seek(offset int64, timeout time.Duration) error
Seek implements OffsetAwareConsumer interface.
func (*ConfluentConsumer) SetInitialOffset ¶
func (r *ConfluentConsumer) SetInitialOffset(offset int64) error
SetInitialOffset implements OffsetAwareConsumer interface.
type ConfluentProducer ¶
type ConfluentProducer struct {
// contains filtered or unexported fields
}
ConfluentProducer implements Producer interface.
func NewProducer ¶
func NewProducer(conf Config, opts ...ConfigOpt) (w *ConfluentProducer, err error)
NewProducer creates a ConfluentProducer based on config.
func (ConfluentProducer) Close ¶
func (w ConfluentProducer) Close() error
Close should be called when no more writes will be performed.
type Consumer ¶
type Consumer interface {
io.Closer
// FetchMessage fetches one message, if there is any available at the current offset.
FetchMessage(ctx context.Context) (*kafkalib.Message, error)
// CommitMessage commits the offset of a given message.
CommitMessage(msg *kafkalib.Message) error
}
Consumer reads messages from Kafka.
Example ¶
conf := Config{
Topic: "example-topic",
Brokers: []string{"localhost:9092"},
Consumer: ConsumerConfig{
GroupID: "example-group",
},
}
log := logrus.New()
c, err := NewConsumer(log, conf)
if err != nil {
log.Fatal(err)
}
defer c.Close()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// Consider implementing a retry mechanism.
for {
// 1. Fetch the message.
msg, err := c.FetchMessage(ctx)
if err != nil {
log.WithError(err).Fatal("error fetching message")
}
log.WithField("msg", msg.String()).Debug("Msg got fetched")
// 2. Do whatever you need to do with the msg.
// 3. Then commit the message.
if err := c.CommitMessage(msg); err != nil {
log.WithError(err).Fatal("error commiting message")
}
}
type ConsumerConfig ¶
type ConsumerConfig struct {
GroupID string `json:"group_id" split_words:"true"`
}
ConsumerConfig holds the specific configuration for a consumer.
func (ConsumerConfig) Apply ¶
func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap)
Apply applies the specific configuration for a consumer.
type OffsetAwareConsumer ¶
type OffsetAwareConsumer interface {
Consumer
// SetInitialOffset resets the current offset to the given one.
// Used for setting the initial offset a consumer should start consuming from.
// Should be called before starting to consume messages.
SetInitialOffset(offset int64) error
// Seek seeks the assigned topic partitions to the given offset.
// Seek() may only be used for partitions already being consumed.
Seek(offset int64, timeout time.Duration) error
}
OffsetAwareConsumer is a Consumer that can reset its offset.
type Producer ¶
Producer produces messages into Kafka.
Example ¶
conf := Config{
Brokers: []string{"localhost:9092"},
}
log := logrus.New()
p, err := NewProducer(conf)
if err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
topic := "example-topic"
msg := &kafkalib.Message{
TopicPartition: kafkalib.TopicPartition{Topic: &topic},
Key: []byte("example"),
Value: []byte("Hello World!"),
Timestamp: time.Now(),
}
if err := p.Produce(ctx, msg); err != nil {
log.WithError(err).Fatal("error producing message")
}
type ProducerConfig ¶
type ProducerConfig struct {
FlushPeriod time.Duration `json:"flush_period" split_words:"true"`
BatchSize int `json:"batch_size" split_words:"true"`
DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"`
}
ProducerConfig holds the specific configuration for a producer.
func (ProducerConfig) Apply ¶
func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap)
Apply applies the specific configuration for a producer.