Documentation
Overview ¶
Package kafka provides a Consumer and a Producer for basic Kafka operations.
It relies on https://github.com/confluentinc/confluent-kafka-go, a Go wrapper on top of https://github.com/edenhill/librdkafka. This provides a reliable implementation, fully supported both by the community and by Confluent, the company founded by the creators of Kafka.
Index ¶
- Constants
- Variables
- type Config
- type ConfigOpt
- type ConfluentConsumer
- func (r *ConfluentConsumer) Close() error
- func (r *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error
- func (r *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)
- func (r *ConfluentConsumer) Seek(offset int64, timeout time.Duration) error
- func (r *ConfluentConsumer) SetInitialOffset(offset int64) error
- type ConfluentProducer
- type Consumer
- type ConsumerConfig
- type OffsetAwareConsumer
- type Producer
- type ProducerConfig
Examples ¶
- Config (Auth)
- Consumer
- Producer
Constants ¶
const (
	AuthTypePlain    = "plain"
	AuthTypeSCRAM256 = "scram-sha256"
	AuthTypeSCRAM512 = "scram-sha512"
)
Supported auth types
const (
	// DefaultProducerDeliveryTimeoutMs configures `delivery.timeout.ms`. The timeout for the producer
	// from sending a message until it is considered delivered.
	// This value should always be greater than DefaultProducerBufferMaxMs.
	// The default value in librdkafka is `300000`, but we reduced it to `5000`.
	DefaultProducerDeliveryTimeoutMs = 5000

	// DefaultProducerBufferMaxMs configures `queue.buffering.max.ms`. The maximum number of milliseconds
	// the buffer will wait before sending its contents to Kafka.
	// This value should always be lower than DefaultProducerDeliveryTimeoutMs.
	// The default value in librdkafka is `5`.
	DefaultProducerBufferMaxMs = 5

	// DefaultProducerBufferMaxMessages configures `queue.buffering.max.messages`. The maximum number of
	// messages held in the buffer before sending to Kafka.
	// The default value in librdkafka is `100000`.
	DefaultProducerBufferMaxMessages = 100000
)
const DefaultLogLevel = logrus.ErrorLevel
DefaultLogLevel is the log level Kafka producers/consumers will use if none is set.
Variables ¶
var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.")
ErrSeekTimedOut is the error returned when a consumer timed out during Seek.
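Callers can match this sentinel with errors.Is and retry. A minimal sketch, assuming a *ConfluentConsumer c and a logrus logger log; the retry policy and timeouts are illustrative, not part of the package:

if err := c.Seek(42, 5*time.Second); err != nil {
	if errors.Is(err, ErrSeekTimedOut) {
		// Retry once with a longer timeout; adjust to your needs.
		err = c.Seek(42, 30*time.Second)
	}
	if err != nil {
		log.WithError(err).Fatal("seek failed")
	}
}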
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Brokers   []string       `json:"brokers"`
	Topic     string         `json:"topic"`
	Producer  ProducerConfig `json:"producer"`
	Consumer  ConsumerConfig `json:"consumer"`
	AuthType  string         `json:"auth" split_words:"true"`
	User      string         `json:"user"`
	Password  string         `json:"password"`
	CAPEMFile string         `json:"ca_pem_file"`
	LogLevel  string         `json:"log_level" split_words:"true"`
}
Config holds all the configuration for this package.
Example (Auth) ¶
_ = Config{
	// Append the following to your configuration (Consumer or Producer)
	AuthType:  AuthTypeSCRAM256,
	User:      "my-user",
	Password:  "my-secret-password",
	CAPEMFile: "/etc/certificate.pem",
}
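The split_words struct tags suggest the config is meant to be loaded from the environment with github.com/kelseyhightower/envconfig. A minimal sketch, assuming that loader and a hypothetical KAFKA prefix (the exact environment variable names are assumptions derived from the tags):

var conf Config
if err := envconfig.Process("KAFKA", &conf); err != nil {
	log.Fatal(err)
}
// e.g. KAFKA_AUTH_TYPE=scram-sha256 KAFKA_USER=my-user KAFKA_LOG_LEVEL=debug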
type ConfigOpt ¶
ConfigOpt configures Kafka consumers and producers.
func WithConsumerGroupID ¶
func WithConsumerGroupID(groupID string) ConfigOpt
WithConsumerGroupID sets the consumer group ID on a Consumer.
func WithLogger ¶
func WithLogger(ctx context.Context, log logrus.FieldLogger) ConfigOpt
WithLogger adds a logger to a Kafka consumer or producer.
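For example, since NewProducer takes no logger argument, WithLogger is the way to attach one. A usage sketch, assuming conf is a valid Config:

log := logrus.New()
p, err := NewProducer(conf, WithLogger(context.Background(), log))
if err != nil {
	log.Fatal(err)
}
defer p.Close()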
type ConfluentConsumer ¶
type ConfluentConsumer struct {
// contains filtered or unexported fields
}
ConfluentConsumer implements the Consumer interface.
func NewConsumer ¶
func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error)
NewConsumer creates a ConfluentConsumer based on config.
func (*ConfluentConsumer) Close ¶
func (r *ConfluentConsumer) Close() error
Close should be called when no more reads will be performed.
func (*ConfluentConsumer) CommitMessage ¶
func (r *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error
CommitMessage implements the Consumer interface.
func (*ConfluentConsumer) FetchMessage ¶
func (r *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)
FetchMessage implements the Consumer interface.
func (*ConfluentConsumer) Seek ¶
func (r *ConfluentConsumer) Seek(offset int64, timeout time.Duration) error
Seek implements the OffsetAwareConsumer interface.
func (*ConfluentConsumer) SetInitialOffset ¶
func (r *ConfluentConsumer) SetInitialOffset(offset int64) error
SetInitialOffset implements the OffsetAwareConsumer interface.
type ConfluentProducer ¶
type ConfluentProducer struct {
// contains filtered or unexported fields
}
ConfluentProducer implements the Producer interface.
func NewProducer ¶
func NewProducer(conf Config, opts ...ConfigOpt) (w *ConfluentProducer, err error)
NewProducer creates a ConfluentProducer based on config.
func (ConfluentProducer) Close ¶
func (w ConfluentProducer) Close() error
Close should be called when no more writes will be performed.
type Consumer ¶
type Consumer interface {
	io.Closer

	// FetchMessage fetches one message, if there is any available at the current offset.
	FetchMessage(ctx context.Context) (*kafkalib.Message, error)

	// CommitMessage commits the offset of a given message.
	CommitMessage(msg *kafkalib.Message) error
}
Consumer reads messages from Kafka.
Example ¶
conf := Config{
	Topic:   "example-topic",
	Brokers: []string{"localhost:9092"},
	Consumer: ConsumerConfig{
		GroupID: "example-group",
	},
}

log := logrus.New()

c, err := NewConsumer(log, conf)
if err != nil {
	log.Fatal(err)
}
defer c.Close()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Consider implementing a retry mechanism.
for {
	// 1. Fetch the message.
	msg, err := c.FetchMessage(ctx)
	if err != nil {
		log.WithError(err).Fatal("error fetching message")
	}
	log.WithField("msg", msg.String()).Debug("message fetched")

	// 2. Do whatever you need to do with the msg.

	// 3. Then commit the message.
	if err := c.CommitMessage(msg); err != nil {
		log.WithError(err).Fatal("error committing message")
	}
}
type ConsumerConfig ¶
type ConsumerConfig struct {
GroupID string `json:"group_id" split_words:"true"`
}
ConsumerConfig holds the specific configuration for a consumer.
func (ConsumerConfig) Apply ¶
func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap)
Apply applies the specific configuration for a consumer.
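Apply is presumably invoked for you by NewConsumer; if you build a kafkalib.ConfigMap yourself, it can also be called directly. A sketch; exactly which keys it sets is an assumption (presumably at least the group ID):

kafkaConf := &kafkalib.ConfigMap{}
ConsumerConfig{GroupID: "example-group"}.Apply(kafkaConf)
// kafkaConf now carries the consumer-specific settings (presumably group.id).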
type OffsetAwareConsumer ¶
type OffsetAwareConsumer interface {
	Consumer

	// SetInitialOffset resets the current offset to the given one.
	// Used for setting the initial offset a consumer should start consuming from.
	// Should be called before consuming any messages.
	SetInitialOffset(offset int64) error

	// Seek seeks the assigned topic partitions to the given offset.
	// Seek() may only be used for partitions already being consumed.
	Seek(offset int64, timeout time.Duration) error
}
OffsetAwareConsumer is a Consumer that can reset its offset.
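A minimal sketch of starting from a fixed offset, reusing conf, log, and ctx as set up in the Consumer example above; the offset value and error handling are illustrative:

c, err := NewConsumer(log, conf)
if err != nil {
	log.Fatal(err)
}
defer c.Close()

// Must happen before the first FetchMessage.
if err := c.SetInitialOffset(42); err != nil {
	log.Fatal(err)
}

msg, err := c.FetchMessage(ctx)
if err != nil {
	log.WithError(err).Fatal("error fetching message")
}
_ = msg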
type Producer ¶
Producer produces messages into Kafka.
Example ¶
conf := Config{
	Brokers: []string{"localhost:9092"},
}

log := logrus.New()

p, err := NewProducer(conf)
if err != nil {
	log.Fatal(err)
}

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

topic := "example-topic"
msg := &kafkalib.Message{
	TopicPartition: kafkalib.TopicPartition{Topic: &topic},
	Key:            []byte("example"),
	Value:          []byte("Hello World!"),
	Timestamp:      time.Now(),
}

if err := p.Produce(ctx, msg); err != nil {
	log.WithError(err).Fatal("error producing message")
}
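Note that the example never closes the producer; in real code, defer p.Close() after a successful NewProducer so it is shut down cleanly once no more writes will be performed.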
type ProducerConfig ¶
type ProducerConfig struct {
	FlushPeriod     time.Duration `json:"flush_period" split_words:"true"`
	BatchSize       int           `json:"batch_size" split_words:"true"`
	DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"`
}
ProducerConfig holds the specific configuration for a producer.
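A sketch of overriding the buffering defaults; the mapping of these fields to the librdkafka settings listed under Constants (delivery.timeout.ms, queue.buffering.max.ms, queue.buffering.max.messages) is an assumption based on their names, and the values are illustrative:

conf := Config{
	Brokers: []string{"localhost:9092"},
	Producer: ProducerConfig{
		FlushPeriod:     500 * time.Millisecond, // presumably queue.buffering.max.ms
		BatchSize:       1000,                   // presumably queue.buffering.max.messages
		DeliveryTimeout: 10 * time.Second,       // presumably delivery.timeout.ms; keep above FlushPeriod
	},
}
p, err := NewProducer(conf)
if err != nil {
	log.Fatal(err)
}
defer p.Close()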
func (ProducerConfig) Apply ¶
func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap)
Apply applies the specific configuration for a producer.