kafka

package
v0.39.0
Published: Aug 12, 2020 | License: MIT | Imports: 9 | Imported by: 0

README

github.com/netlify/netlify-commons/kafka

Package kafka provides a Consumer and a Producer for basic Kafka operations.

It relies on https://github.com/confluentinc/confluent-kafka-go, which is a Go wrapper on top of https://github.com/edenhill/librdkafka. This provides a reliable implementation that is supported both by the community and by Confluent, the company founded by the original creators of Kafka.

Docs

The generated godoc documentation, including some examples, is available on pkg.go.dev.

TODO

  • Support standalone consumers, not only consumers that are members of a consumer group.
  • Support seeking by timestamp (only offset is supported)
  • Integration tests

Documentation

Constants

const (
	AuthTypePlain    = "plain"
	AuthTypeSCRAM256 = "scram-sha256"
	AuthTypeSCRAM512 = "scram-sha512"
)

Supported auth types
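To illustrate how these auth type strings could be translated into a librdkafka SASL setting, here is a minimal sketch. The helper `saslMechanism` is hypothetical and not part of this package, the constants are local copies of the ones above, and the mapping itself is an assumption about how the values are consumed.

```go
package main

import "fmt"

// Local copies of the package constants shown above.
const (
	AuthTypePlain    = "plain"
	AuthTypeSCRAM256 = "scram-sha256"
	AuthTypeSCRAM512 = "scram-sha512"
)

// saslMechanism is a hypothetical helper (not part of the package). It maps an
// auth type to the mechanism name librdkafka accepts in `sasl.mechanism`.
func saslMechanism(authType string) (string, error) {
	switch authType {
	case AuthTypePlain:
		return "PLAIN", nil
	case AuthTypeSCRAM256:
		return "SCRAM-SHA-256", nil
	case AuthTypeSCRAM512:
		return "SCRAM-SHA-512", nil
	default:
		return "", fmt.Errorf("unsupported auth type %q", authType)
	}
}
```

Returning an error for unknown values keeps a typo in the configuration from silently falling back to an unauthenticated connection.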

const (
	// DefaultProducerDeliveryTimeoutMs configures `delivery.timeout.ms`: the maximum time, counted from when the producer sends a message, until that message is considered delivered.
	// This value should always be greater than DefaultProducerBufferMaxMs.
	// The default value in librdkafka is `300000`, but we reduced it to `5000`.
	DefaultProducerDeliveryTimeoutMs = 5000

	// DefaultProducerBufferMaxMs configures `queue.buffering.max.ms`: the maximum time in ms that messages wait in the buffer before being sent to Kafka.
	// This value should always be lower than DefaultProducerDeliveryTimeoutMs.
	// The default value in librdkafka is `5`.
	DefaultProducerBufferMaxMs = 5

	// DefaultProducerBufferMaxMessages configures `queue.buffering.max.messages`. The max number of messages in buffer before sending to Kafka.
	// The default value in librdkafka is `100000`.
	DefaultProducerBufferMaxMessages = 100000
)

const DefaultLogLevel = logrus.ErrorLevel

DefaultLogLevel is the log level Kafka producers/consumers will use if none is set.
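The comments on the producer defaults above state an ordering constraint: the buffering window must be lower than the delivery timeout. A minimal sketch of a check for that constraint follows. The helper `checkProducerTimings` is hypothetical and not provided by this package; the constants are local copies.

```go
package main

import "fmt"

// Local copies of the producer defaults shown above.
const (
	DefaultProducerDeliveryTimeoutMs = 5000
	DefaultProducerBufferMaxMs       = 5
)

// checkProducerTimings is a hypothetical helper (not part of the package).
// It rejects settings where the buffering window is not strictly lower than
// the delivery timeout, since such messages could expire while still buffered.
func checkProducerTimings(deliveryTimeoutMs, bufferMaxMs int) error {
	if bufferMaxMs >= deliveryTimeoutMs {
		return fmt.Errorf(
			"queue.buffering.max.ms (%d) must be lower than delivery.timeout.ms (%d)",
			bufferMaxMs, deliveryTimeoutMs,
		)
	}
	return nil
}
```

With the defaults (5000 ms and 5 ms) the check passes; swapping the two values makes it fail.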

Variables

var ErrSeekTimedOut = errors.New("Kafka Seek timed out. Please try again.")

ErrSeekTimedOut is the error returned when a consumer timed out during Seek.

Functions

This section is empty.

Types

type Config

type Config struct {
	Brokers   []string       `json:"brokers"`
	Topic     string         `json:"topic"`
	Producer  ProducerConfig `json:"producer"`
	Consumer  ConsumerConfig `json:"consumer"`
	AuthType  string         `json:"auth" split_words:"true"`
	User      string         `json:"user"`
	Password  string         `json:"password"`
	CAPEMFile string         `json:"ca_pem_file"`
	LogLevel  string         `json:"log_level" split_words:"true"`
}

Config holds all the configuration for this package.

Example (Auth)
_ = Config{
	// Append the following to your configuration (Consumer or Producer)
	AuthType:  AuthTypeSCRAM256,
	User:      "my-user",
	Password:  "my-secret-password",
	CAPEMFile: "/etc/certificate.pem",
}

type ConfigOpt

type ConfigOpt func(c *kafkalib.ConfigMap)

ConfigOpt configures Kafka consumers and producers.

func WithConsumerGroupID

func WithConsumerGroupID(groupID string) ConfigOpt

WithConsumerGroupID sets the consumer group ID of the Consumer.

func WithLogger

func WithLogger(ctx context.Context, log logrus.FieldLogger) ConfigOpt

WithLogger adds a logger to a Kafka consumer or producer.
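ConfigOpt follows the functional options pattern: each option is a function that mutates the configuration map before the client is created. The sketch below shows the pattern in isolation. `configMap` is a stand-in for `kafkalib.ConfigMap`, and `withGroupID`, `withProperty`, and `applyOpts` are hypothetical names; that a group ID option writes librdkafka's `group.id` property is an assumption, not something this documentation confirms.

```go
package main

// configMap stands in for kafkalib.ConfigMap (a map of librdkafka properties).
type configMap map[string]interface{}

// configOpt mirrors the shape of ConfigOpt.
type configOpt func(c *configMap)

// withGroupID is a hypothetical option. ASSUMPTION: the consumer group ID is
// stored under librdkafka's `group.id` property.
func withGroupID(groupID string) configOpt {
	return func(c *configMap) {
		(*c)["group.id"] = groupID
	}
}

// withProperty is a hypothetical generic option that sets any property.
func withProperty(key string, value interface{}) configOpt {
	return func(c *configMap) {
		(*c)[key] = value
	}
}

// applyOpts runs the options in order, so later options override earlier ones.
func applyOpts(c *configMap, opts ...configOpt) {
	for _, opt := range opts {
		opt(c)
	}
}
```

Because options are applied in order, a caller-supplied option can override a value derived from Config, which is the usual reason for exposing this pattern.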

type ConfluentConsumer

type ConfluentConsumer struct {
	// contains filtered or unexported fields
}

ConfluentConsumer implements Consumer interface.

func NewConsumer

func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error)

NewConsumer creates a ConfluentConsumer based on config.

func (*ConfluentConsumer) Close

func (r *ConfluentConsumer) Close() error

Close should be called when no more reads will be performed.

func (*ConfluentConsumer) CommitMessage

func (r *ConfluentConsumer) CommitMessage(msg *kafkalib.Message) error

CommitMessage implements Consumer interface.

func (*ConfluentConsumer) FetchMessage

func (r *ConfluentConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error)

FetchMessage implements Consumer interface.

func (*ConfluentConsumer) Seek

func (r *ConfluentConsumer) Seek(offset int64, timeout time.Duration) error

Seek implements OffsetAwareConsumer interface.

func (*ConfluentConsumer) SetInitialOffset

func (r *ConfluentConsumer) SetInitialOffset(offset int64) error

SetInitialOffset implements OffsetAwareConsumer interface.

type ConfluentProducer

type ConfluentProducer struct {
	// contains filtered or unexported fields
}

ConfluentProducer implements Producer interface.

func NewProducer

func NewProducer(conf Config, opts ...ConfigOpt) (w *ConfluentProducer, err error)

NewProducer creates a ConfluentProducer based on config.

func (ConfluentProducer) Close

func (w ConfluentProducer) Close() error

Close should be called when no more writes will be performed.

func (ConfluentProducer) Produce

func (w ConfluentProducer) Produce(ctx context.Context, msgs ...*kafkalib.Message) error

Produce produces messages into Kafka.

type Consumer

type Consumer interface {
	io.Closer

	// FetchMessage fetches one message, if there is any available at the current offset.
	FetchMessage(ctx context.Context) (*kafkalib.Message, error)

	// CommitMessage commits the offset of a given message.
	CommitMessage(msg *kafkalib.Message) error
}

Consumer reads messages from Kafka.

Example
conf := Config{
	Topic:   "example-topic",
	Brokers: []string{"localhost:9092"},
	Consumer: ConsumerConfig{
		GroupID: "example-group",
	},
}

log := logrus.New()
c, err := NewConsumer(log, conf)
if err != nil {
	log.Fatal(err)
}

defer c.Close()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Consider implementing a retry mechanism.
for {
	// 1. Fetch the message.
	msg, err := c.FetchMessage(ctx)
	if err != nil {
		log.WithError(err).Fatal("error fetching message")
	}

	log.WithField("msg", msg.String()).Debug("Msg got fetched")

	// 2. Do whatever you need to do with the msg.

	// 3. Then commit the message.
	if err := c.CommitMessage(msg); err != nil {
		log.WithError(err).Fatal("error committing message")
	}
}

type ConsumerConfig

type ConsumerConfig struct {
	GroupID string `json:"group_id" split_words:"true"`
}

ConsumerConfig holds the specific configuration for a consumer.

func (ConsumerConfig) Apply

func (c ConsumerConfig) Apply(kafkaConf *kafkalib.ConfigMap)

Apply applies the specific configuration for a consumer.

type OffsetAwareConsumer

type OffsetAwareConsumer interface {
	Consumer

	// SetInitialOffset resets the current offset to the given one.
	// Used for setting the initial offset a consumer should start consuming from.
	// Should be called before starting to consume messages.
	SetInitialOffset(offset int64) error

	// Seek seeks the assigned topic partitions to the given offset.
	// Seek() may only be used for partitions already being consumed.
	Seek(offset int64, timeout time.Duration) error
}

OffsetAwareConsumer is a Consumer that can reset its offset.
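Because OffsetAwareConsumer embeds Consumer, code that only holds a Consumer can test at runtime whether offset control is available. The following sketch shows that type assertion. The interfaces are stand-ins trimmed to the methods needed, and `startFrom`, `basicConsumer`, and `seekableConsumer` are hypothetical names used only for illustration.

```go
package main

import "errors"

// consumer and offsetAwareConsumer are stand-ins for the package interfaces,
// trimmed to the methods this sketch needs.
type consumer interface {
	Close() error
}

type offsetAwareConsumer interface {
	consumer
	SetInitialOffset(offset int64) error
}

var errNotOffsetAware = errors.New("consumer cannot set an initial offset")

// startFrom is a hypothetical helper. It sets the initial offset when the
// consumer supports it and reports a clear error otherwise. Per the interface
// comment, it should run before any message is consumed.
func startFrom(c consumer, offset int64) error {
	oc, ok := c.(offsetAwareConsumer)
	if !ok {
		return errNotOffsetAware
	}
	return oc.SetInitialOffset(offset)
}

// basicConsumer and seekableConsumer are fakes used to exercise startFrom.
type basicConsumer struct{}

func (basicConsumer) Close() error { return nil }

type seekableConsumer struct {
	basicConsumer
	initial int64
}

func (s *seekableConsumer) SetInitialOffset(offset int64) error {
	s.initial = offset
	return nil
}
```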

type Producer

type Producer interface {
	io.Closer
	Produce(ctx context.Context, msgs ...*kafkalib.Message) error
}

Producer produces messages into Kafka.

Example
conf := Config{
	Brokers: []string{"localhost:9092"},
}

log := logrus.New()
p, err := NewProducer(conf)
if err != nil {
	log.Fatal(err)
}
// Close the producer once no more writes will be performed.
defer p.Close()
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

topic := "example-topic"
msg := &kafkalib.Message{
	TopicPartition: kafkalib.TopicPartition{Topic: &topic},
	Key:            []byte("example"),
	Value:          []byte("Hello World!"),
	Timestamp:      time.Now(),
}
if err := p.Produce(ctx, msg); err != nil {
	log.WithError(err).Fatal("error producing message")
}

type ProducerConfig

type ProducerConfig struct {
	FlushPeriod     time.Duration `json:"flush_period" split_words:"true"`
	BatchSize       int           `json:"batch_size" split_words:"true"`
	DeliveryTimeout time.Duration `json:"delivery_timeout" split_words:"true"`
}

ProducerConfig holds the specific configuration for a producer.

func (ProducerConfig) Apply

func (c ProducerConfig) Apply(kafkaConf *kafkalib.ConfigMap)

Apply applies the specific configuration for a producer.
