kafka

package module
v0.3.0

Published: Jun 25, 2025 License: MIT

README

blugnu/kafka

making Kafka less Kafka-esque

Features

  • Discoverable Configuration: Provides option functions for configuring Kafka clients, with separation of general, consumer-specific, and producer-specific configuration

  • Reduced Boilerplate: Provides a complete implementation of a Kafka consumer with a single function call, handling all the boilerplate for you, including offset commits, signal handling, and graceful shutdown

  • Producer Retries: Provides a producer implementation that will retry sending messages to Kafka in the event of a timeout (configurable timeout and MaxRetries)

  • Mock Producer: Provides a mock producer implementation that can be used for testing that applications produce expected messages

Installation

go get github.com/blugnu/kafka

Usage

  • Establish a base configuration (e.g. bootstrap servers)
  • Configure a consumer and/or producer
  • Start the consumer

Example

package main

import (
  "context"
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"

  "github.com/blugnu/kafka"
)

func HandleEvent(ctx context.Context, msg *kafka.Message) error {
  fmt.Printf("received message: %s\n", string(msg.Value))
  return nil
}

func main() {
  // derive a context that is cancelled on SIGINT or SIGTERM
  ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  defer stop()

  // initialise a base configuration
  cfg, err := kafka.NewConfig(
    kafka.BootstrapServers("localhost:9092"),
  )
  if err != nil {
    log.Fatal("error creating config:", err)
  }

  // configure a consumer
  consumer, err := kafka.NewConsumer(cfg,
    kafka.ConsumerGroupID("my-group"),
    kafka.TopicHandler("event", kafka.HandlerFunc(HandleEvent)),
  )
  if err != nil {
    log.Fatal("error creating consumer:", err)
  }

  // start the consumer
  if err := consumer.Start(ctx); err != nil {
    log.Fatal(err)
  }

  // block until the consumer stops
  if err := consumer.Wait(); err != nil {
    log.Fatal(err)
  }
}

Logging

To avoid importing a specific logging library or imposing a log format on applications, logs are written using internal log hooks. These are set to no-op by default.

To enable logging you must call the kafka.EnableLogs function, providing functions to log entries at different levels as required. The levels supported are those at which Kafka logs messages: DEBUG, INFO and ERROR.

The logging function at each level is passed a context.Context, the log message, and a kafka.LogInfo struct containing additional information about the log entry.

For example, the following might be used to initialise a blugnu/ulog context logger and enable logging of ERROR level Kafka logs to that logger; logs at all other levels are left as no-ops:

func logger(ctx context.Context) (context.Context, ulog.Logger, func()) {
    kafka.EnableLogs(&kafka.Loggers{
        Error: func(ctx context.Context, msg string, info kafka.LogInfo) {
            log := ulog.FromContext(ctx)
            if info.Consumer != nil {
                log = log.WithField("consumer", *info.Consumer)
            }
            if info.Topic != nil {
                log = log.WithField("topic", *info.Topic)
            }
            log.Error(msg)
        },
    })

    log, cfn, err := ulog.NewLogger(
        ulog.WithLevel(ulog.DebugLevel),
    )
    if err != nil {
        log.Fatal(fmt.Errorf("error initialising logger: %v", err))
    }

    return ulog.ContextWithLogger(ctx, log), log, cfn
}

Default Logging

A nil argument may be passed to EnableLogs to enable default logging, which will write simple text logs using the standard log package.

Default logging is also emitted if a zero-value &Loggers{} is passed to EnableLogs, i.e. with all of its functions set to nil.

Note: Default logging is not recommended for production use.
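
For example:

	kafka.EnableLogs(nil) // enable default logging via the standard log package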

Documentation

Constants

const (
	OffsetResetEarliest = OffsetReset("earliest")
	OffsetResetLatest   = OffsetReset("latest")
	OffsetResetNone     = OffsetReset("none")
)

const PartitionAny = kafka.PartitionAny

PartitionAny is an alias for the confluent kafka.PartitionAny constant.

const ReadTimeoutNever = time.Duration(-1)

ReadTimeoutNever is a special value for the ReadTimeout option that indicates the consumer should never time out when reading messages.

Use this with caution; a consumer that never times out may be ejected from the consumer group if it does not receive a message within the configured session timeout.
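
For example, a consumer configured to block indefinitely when reading (a sketch; the options other than ReadTimeout are illustrative):

	consumer, err := kafka.NewConsumer(cfg,
		kafka.ConsumerGroupID("my-group"),
		kafka.ReadTimeout(kafka.ReadTimeoutNever),
		kafka.TopicHandler("event", kafka.HandlerFunc(HandleEvent)),
	)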

Variables

var (
	ErrConsumerError        = errors.New("consumer error")
	ErrConsumerNotRunning   = errors.New("consumer is not running")
	ErrConsumerNotStarted   = errors.New("consumer has not been started")
	ErrInvalidOperation     = errors.New("invalid operation")
	ErrInvalidReadTimeout   = errors.New("invalid read timeout (must be kafka.ReadTimeoutNever or >= 0ms)")
	ErrInvalidSeekTimeout   = errors.New("invalid seek timeout (must be > 0ms)")
	ErrNoHandler            = errors.New("no handler for topic")
	ErrNoHandlersConfigured = errors.New("no handlers configured")
	ErrReprocessMessage     = errors.New("message will be reprocessed")
	ErrRetryLimitReached    = errors.New("retry limit reached")
	ErrTimeout              = errors.New("time out")
)

Functions

func EnableLogs

func EnableLogs(fns *Loggers)

EnableLogs sets the loggers to the functions provided in the supplied Loggers struct. If any of the loggers are nil, the default loggers are used for those log levels.

Default Loggers

If a nil Loggers is specified, or if ALL of the loggers are nil, default loggers will be configured that log to the standard logger using log.Println. The default log output consists of a KAFKA:<LEVEL> prefix, followed by the log message and the LogInfo struct, rendered using its .String() method, e.g.:

KAFKA:ERROR commit failed consumer=group-id topic=message-topic partition=1 offset=2 key=[key-value] headers={key1:[value1] key2:[value2]} timestamp=2010-09-08T07:06:05Z error="error"

Example

This example logs Info and Error logs to the standard logger. The Debug logger is set to a no-op function.

EnableLogs(&Loggers{
	Info: func(ctx context.Context, s string, i LogInfo) {
		log.Println("INFO", s, i.String())
	},
	Error: func(ctx context.Context, s string, i LogInfo) {
		log.Println("ERROR", s, i.String())
	},
})

Note that logging the LogInfo using the .String() method is not recommended; loggers should use the LogInfo struct to extract the information required for logging and render that to the application log in the appropriate form. e.g. using slog fields.
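For example, a minimal sketch using the standard library's log/slog package (the attribute names are illustrative):

	EnableLogs(&Loggers{
		Error: func(ctx context.Context, msg string, info LogInfo) {
			// extract only the fields needed and log them as structured attributes
			attrs := []any{}
			if info.Consumer != nil {
				attrs = append(attrs, slog.String("consumer", *info.Consumer))
			}
			if info.Topic != nil {
				attrs = append(attrs, slog.String("topic", *info.Topic))
			}
			if info.Error != nil {
				attrs = append(attrs, slog.String("error", info.Error.Error()))
			}
			slog.ErrorContext(ctx, msg, attrs...)
		},
	})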

func Headers added in v0.2.0

func Headers(msg *kafka.Message) headers

Headers returns the headers of the specified message type-cast to a more convenient type for working with headers.

The returned value supports methods for querying the headers:

  • Get(key string) []byte
  • GetString(key string) string
  • HasKey(key string) bool
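
For example (a sketch; the header key is illustrative):

	hdrs := kafka.Headers(msg)
	if hdrs.HasKey("content-type") {
		fmt.Println("content-type:", hdrs.GetString("content-type"))
	}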

func IsKafkaError added in v0.3.0

func IsKafkaError(err error, code kafka.ErrorCode) bool

IsKafkaError checks whether the error is a kafka.Error and has the specified code. It returns true if the error is a kafka.Error and its code matches the provided code.
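
For example, a sketch assuming the ErrorCode constants exported by the underlying confluent-kafka-go package (the exact import path and version are assumptions):

	import ckafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"

	if kafka.IsKafkaError(err, ckafka.ErrTimedOut) {
		// handle a broker timeout distinctly from other failures
	}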

func NewMockProducer

func NewMockProducer[T comparable](ctx context.Context) (Producer, MockProducer[T])

NewMockProducer creates a new mock producer that can be used in tests. The function returns a Producer interface to be injected into dependents that require a Producer, and a MockProducer interface, used by a test to set and test expectations.
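
For example, a test sketch (the topic name and the code under test are illustrative):

	func TestProducesOrderDone(t *testing.T) {
		ctx := context.Background()
		producer, mock := kafka.NewMockProducer[string](ctx)
		mock.Expect("order-done")

		// ...inject producer into the code under test and exercise it...

		if err := mock.ExpectationsWereMet(); err != nil {
			t.Error(err)
		}
	}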

func UnmarshalJSON

func UnmarshalJSON[T any](msg *kafka.Message) (*T, error)

UnmarshalJSON unmarshals the value of a kafka message into a new instance of the specified type. If the unmarshalling fails, the function returns an error.
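
For example, a sketch using a hypothetical OrderEvent payload type:

	type OrderEvent struct {
		ID string `json:"id"`
	}

	func HandleOrder(ctx context.Context, msg *kafka.Message) error {
		event, err := kafka.UnmarshalJSON[OrderEvent](msg)
		if err != nil {
			return err
		}
		fmt.Println("received order:", event.ID)
		return nil
	}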

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

func NewConfig

func NewConfig(opts ...ConfigOption) (*Config, error)

NewConfig creates a new Config and applies the specified options.

type ConfigMap

type ConfigMap = kafka.ConfigMap

ConfigMap is an alias for the confluent kafka.ConfigMap type.

type ConfigOption

type ConfigOption func(*Config) error

func BootstrapServers

func BootstrapServers(servers ...string) ConfigOption

BootstrapServers sets the Kafka broker addresses.

Sets the 'bootstrap.servers' key in the config map.

The addresses should be a comma-separated list of host:port pairs.

func Broker

func Broker(name string, port int) ConfigOption

Broker provides an alternative way to set a broker host name and address.

Sets the 'bootstrap.servers' key in the config map.

Broker() is equivalent to using BootstrapServers() with a single server and will overwrite any existing value for 'bootstrap.servers'.

func ConfigKey

func ConfigKey(key string, value any) ConfigOption

ConfigKey sets a key-value pair in the ConfigMap contained by the Config.

This option may be used to set any key-value pair in the ConfigMap for which there is no specific option function provided.
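
For example (the client.id key is illustrative; any configuration key supported by the underlying client may be set this way):

	cfg, err := kafka.NewConfig(
		kafka.BootstrapServers("localhost:9092"),
		kafka.ConfigKey("client.id", "my-service"),
	)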

func Cypher

func Cypher(handler CypherHandler) ConfigOption

Cypher identifies a handler for both encrypting and decrypting messages.

This option provides configuration for the Consumer and Producer implementation provided by this package; it does not affect the config map.

The handler must implement the CypherHandler interface:

type CypherHandler interface {
	Decrypt(context.Context, *kafka.Message) error
	Encrypt(context.Context, *kafka.Message) error
}

A consumer created with this config will use the Cypher.Decrypt method to decrypt messages before passing them to the appropriate topic handler.

A producer created with this config will use the Cypher.Encrypt method to encrypt messages before sending them to the Kafka broker.
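
As an illustration only, a minimal sketch of a CypherHandler applying AES-GCM to the message value (the aesCypher type is hypothetical; key management and AEAD construction are omitted):

	type aesCypher struct {
		aead cipher.AEAD // e.g. obtained from cipher.NewGCM
	}

	func (c aesCypher) Encrypt(ctx context.Context, msg *kafka.Message) error {
		// prepend a random nonce to the sealed value
		nonce := make([]byte, c.aead.NonceSize())
		if _, err := rand.Read(nonce); err != nil {
			return err
		}
		msg.Value = c.aead.Seal(nonce, nonce, msg.Value, nil)
		return nil
	}

	func (c aesCypher) Decrypt(ctx context.Context, msg *kafka.Message) error {
		n := c.aead.NonceSize()
		if len(msg.Value) < n {
			return errors.New("ciphertext too short")
		}
		plain, err := c.aead.Open(nil, msg.Value[:n], msg.Value[n:], nil)
		if err != nil {
			return err
		}
		msg.Value = plain
		return nil
	}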

Separate Configuration

Consumer and Producer specific options can also be used to configure encryption and decryption separately:

kafka.NewConsumer(ctx, cfg, kafka.MessageDecryption(decryptFn))
kafka.NewProducer(ctx, cfg, kafka.MessageEncryption(encryptFn))

Where decryptFn and encryptFn are functions with the signature required by the CypherHandler interface methods: func(context.Context, *kafka.Message) error

This may be preferred if an application only incorporates a Consumer or Producer, but not both or where an application produces different messages with differing encryption needs:

pBlue, err := kafka.NewProducer(ctx, cfg, kafka.MessageEncryption(blue))
pGreen, err := kafka.NewProducer(ctx, cfg, kafka.MessageEncryption(green))

func SASL

func SASL(mechanisms, username, password string) ConfigOption

SASL sets the SASL mechanisms, username, and password for the Kafka client.

Sets the following keys in the config map:

  - sasl.mechanisms
  - sasl.username
  - sasl.password

The mechanisms should be a comma-separated list of mechanisms to use for authentication. The username and password are the credentials to use for authentication.

func SSLCALocation

func SSLCALocation(filepath string) ConfigOption

SSLCALocation sets the file path to the CA certificate for SSL connections.

Sets the 'ssl.ca.location' key in the config map.

func SecurityProtocol

func SecurityProtocol(protocol string) ConfigOption

SecurityProtocol sets the security protocol for the Kafka client.

Sets the 'security.protocol' key in the config map.

The protocol should be one of the following values:

  - PLAINTEXT
  - SSL
  - SASL_PLAINTEXT
  - SASL_SSL

type ConfigurationError

type ConfigurationError struct {
	// contains filtered or unexported fields
}

ConfigurationError is an error that indicates a configuration error

func (ConfigurationError) Error

func (e ConfigurationError) Error() string

Error implements the error interface for a ConfigurationError

func (ConfigurationError) Is

func (e ConfigurationError) Is(target error) bool

Is determines whether the error matches some target. The target is a match if it is a ConfigurationError and:

  • target.error is nil, or
  • target.error matches the wrapped error

func (ConfigurationError) Unwrap

func (e ConfigurationError) Unwrap() error

Unwrap returns the wrapped error

type Consumer

type Consumer interface {
	IsRunning() error
	Start(ctx context.Context) error
	Wait() error
}

func NewConsumer

func NewConsumer(config *Config, opts ...ConsumerOption) (Consumer, error)

NewConsumer creates a new Kafka consumer with the specified configuration and options. The consumer must be started before it can receive messages.

type ConsumerOption

type ConsumerOption func(*Config, *consumer) error

func AutoOffsetReset

func AutoOffsetReset(opt OffsetReset) ConsumerOption

AutoOffsetReset sets the auto.offset.reset configuration option for a consumer. This option determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because that data has been deleted).

Valid values are:

OffsetResetEarliest (earliest)  // the consumer will resume reading
                                // messages from the earliest available
                                // unread message for the consumer group

OffsetResetLatest   (latest)    // the consumer will resume reading
                                // messages from the latest available
                                // message, skipping any messages
                                // produced while the consumer group was
                                // not active

OffsetResetNone     (none)      // the consumer will throw an error if
                                // no offset is found for the consumer
                                // group

func ConsumerGroupID

func ConsumerGroupID(id string) ConsumerOption

ConsumerGroupID establishes the ID of the consumer group this consumer will join.

func MessageDecryption

func MessageDecryption(fn CypherFunc) ConsumerOption

MessageDecryption sets a function the consumer will use to decrypt messages before passing them to the handler for that message's topic.

For applications involving both consumers and producers the base configuration kafka.Cypher may be specified which provides both encryption and decryption functions.

func ReadTimeout

func ReadTimeout(timeout time.Duration) ConsumerOption

ReadTimeout sets the read timeout for the consumer. If not set, a default of 1s is used.

func SeekTimeout added in v0.3.0

func SeekTimeout(timeout time.Duration) ConsumerOption

SeekTimeout sets the timeout for seek operations on the consumer. If not set, a default of 100ms is used.

func TopicHandler

func TopicHandler[T comparable](topic T, h Handler) ConsumerOption

TopicHandler registers a handler for a specific topic.

func TopicHandlers

func TopicHandlers[T comparable](handlers map[T]Handler) ConsumerOption

TopicHandlers registers multiple handlers for specific topics.
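
For example (topic names and handler functions are illustrative):

	consumer, err := kafka.NewConsumer(cfg,
		kafka.ConsumerGroupID("my-group"),
		kafka.TopicHandlers(map[string]kafka.Handler{
			"orders":   kafka.HandlerFunc(HandleOrder),
			"payments": kafka.HandlerFunc(HandlePayment),
		}),
	)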

type ConsumerPanicError added in v0.3.0

type ConsumerPanicError struct {
	Recovered any
}

ConsumerPanicError is an error that indicates a consumer panic

func (ConsumerPanicError) Error added in v0.3.0

func (e ConsumerPanicError) Error() string

Error implements the error interface for a ConsumerPanicError

func (ConsumerPanicError) Is added in v0.3.0

func (e ConsumerPanicError) Is(target error) bool

Is determines whether the error matches some target. The target is a match if it is a ConsumerPanicError and:

  • has a nil Recovered, or
  • has a non-nil Recovered which equals the target.Recovered

type CypherFunc

type CypherFunc func(context.Context, *kafka.Message) error

CypherFunc is the signature for a function that can be used to encrypt or decrypt a message, as required by the kafka.MessageDecryption or kafka.MessageEncryption Consumer and Producer options.

type CypherHandler

type CypherHandler interface {
	Decrypt(ctx context.Context, msg *kafka.Message) error
	Encrypt(ctx context.Context, msg *kafka.Message) error
}

CypherHandler is an interface that must be implemented by a handler provided to a Config for the encryption and decryption of messages.

type Handler

type Handler interface {
	HandleMessage(ctx context.Context, msg *Message) error
}

func If added in v0.2.0

func If(cond func(*Message) bool, h Handler) Handler

If returns a Handler that only calls the given Handler if the given condition function returns true for the message. If the condition function returns false, the message is ignored and the Handler is not called.
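
For example (a sketch; the condition and handler are illustrative):

	// only messages that carry a key are handled; others are ignored
	kafka.TopicHandler("orders", kafka.If(
		func(msg *kafka.Message) bool { return len(msg.Key) > 0 },
		kafka.HandlerFunc(HandleOrder),
	))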

type HandlerFunc

type HandlerFunc func(context.Context, *Message) error

HandlerFunc is the function signature for a handler function.

For simple handlers you may choose to implement a function with this signature and cast it as this type for use as a Handler (modelled on the http.HandlerFunc type):

kafka.TopicHandler("my-topic", kafka.HandlerFunc(func(ctx context.Context, msg *kafka.Message) error {
    // handle the message
    return nil
}))

For more complex handlers you may choose to implement the Handler interface on a struct type that holds additional state or dependencies:

type MyHandler struct {
    db *sql.DB
    p  kafka.Producer
}

func (h *MyHandler) HandleMessage(ctx context.Context, msg *kafka.Message) error {
    // handle the message using h.db
    // ...
    // then produce a message to the "done" topic
    done := kafka.NewMessage("my-topic-done",
        message.Key(msg.Key),
    )
    _, err := h.p.MustProduce(ctx, done)
    return err
}

kafka.TopicHandler("my-topic", &MyHandler{
    db: theDB,
    p:  theProducer,
})

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(ctx context.Context, msg *Message) error

HandleMessage implements the Handler interface by calling the HandlerFunc. If the HandlerFunc is nil, this method does nothing (except return nil).

type Header = kafka.Header

Header is an alias for the confluent kafka.Header type.

type LogInfo

type LogInfo struct {
	// Consumer is the consumer group that the log event is related to
	Consumer *string `json:"consumer,omitempty"`

	// Error is an error that the log event is related to
	Error error `json:"-"`

	// Recovered is a value that has been recovered from a panic
	Recovered *any `json:"-"`

	// Headers is a map of headers from a message
	Headers map[string][]byte `json:"headers,omitempty"`

	// Key is the key of a message
	Key []byte `json:"key,omitempty"`

	// Offset is the offset of a message
	Offset *kafka.Offset `json:"offset,omitempty"`

	// Partition is the partition of a message or otherwise related to the log event
	// (e.g. when logging a producer error)
	Partition *int32 `json:"partition,omitempty"`

	// Topic is the topic of a message or otherwise related to the log event
	// (e.g. when logging a producer error)
	Topic *string `json:"topic,omitempty"`

	// Topics identifies 1 or more topics when a log event relates to those topics.
	// It should not be used to identify a single topic when a log event relates to
	// a message; the Topic field should be used instead.
	Topics *[]string `json:"topics,omitempty"`

	// Timestamp is the timestamp of a message
	Timestamp *time.Time `json:"timestamp,omitempty"`

	// Reason is a reason that the log event is being logged
	Reason *string `json:"reason,omitempty"`
}

LogInfo contains information about a logging event; it is used to pass information to logger functions configured by an application using this module.

LogInfo implements the fmt.Stringer interface and provides a string representation of the log event. However, this string representation is intended to provide a human-readable representation of the log event to support unit tests and debugging; it is not intended to be used for log serialization which should be implemented by the logger functions configured by the application.

func (LogInfo) String

func (info LogInfo) String() string

String implements the fmt.Stringer interface and returns a string representation of the LogInfo. This implementation is provided for convenience (primarily in unit test output) and is not intended to be used for log serialization.

Details

The string representation is a space-separated list of key=<value> pairs where the key is the name of the field and the value is the string representation of the field's value.

The string representation of the value of each field is determined by the type of the field:

[]byte              // converted to string, enclosed by [square brackets]
identifier          // string (consumer, topic fields)
string              // quoted string (including error field)
int32               // string representation of the integer
kafka.Offset        // string representation of the offset
time.Time           // RFC3339 representation of the timestamp
any                 // %v representation of the value (recovered field)

Message headers are represented as a map of key-value pairs enclosed by {curly braces} where the key is the header key and the value is the []byte string representation of the header value.

type Loggers

type Loggers struct {
	Debug func(context.Context, string, LogInfo)
	Info  func(context.Context, string, LogInfo)
	Error func(context.Context, string, LogInfo)
}

Loggers is a struct that contains functions for logging debug, info and error messages. The functions are expected to log the message and the LogInfo provided.

type Message

type Message = kafka.Message

Message is an alias for the confluent kafka.Message type.

func CreateMessage added in v0.2.0

func CreateMessage(opts ...MessageOption) (Message, error)

CreateMessage creates a new message configured with the specified options; the topic is set using the message.Topic option. Any error that occurs while creating the message is returned with a zero-value message.

Parameters

The topic is specified using the message.Topic option, which is generic with a type parameter T; the topic type must support conversion to a string using fmt.Sprintf("%v").

Options

Message options are applied in the order they are provided. Available options are:

message.Copy(msg *Message)            // copies the Key and Value from a specified message
message.Header(key, value string)     // adds a header to the message
message.Headers(map[string]string)    // adds headers to the message
message.Key(key []byte)               // sets the message key
message.Partition(partition int32)    // sets the message partition (will use kafka.PartitionAny by default)
message.Topic[T](topic T)             // sets the message topic
message.JSON(value any)               // sets the message value as a JSON representation of the given value
message.String(value string)          // sets the message value to the specified string
message.Value(value []byte)           // sets the message value to a copy of the specified byte slice
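
For example (a sketch; it assumes the options shown above are provided by a message subpackage imported as message, and order is a value to serialize):

	msg, err := kafka.CreateMessage(
		message.Topic("orders"),
		message.Key([]byte("order-123")),
		message.JSON(order),
	)
	if err != nil {
		// handle the error
	}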

func NewMessage

func NewMessage[T comparable](topic T, opts ...MessageOption) Message

NewMessage returns a new kafka.Message configured with a specified topic and any of the supported options, as required.

Errors

If an error occurs while creating the message the function will panic. If you need to handle the error use the CreateMessage function.

Parameters

The function is generic with a type parameter T that specifies the type of the topic; the topic type must support conversion to a string using fmt.Sprintf("%v").

Options

Message options are applied in the order they are provided. Available options are:

message.Copy(msg *Message)            // copies the Key and Value from a specified message
message.Header(key, value string)     // adds a header to the message
message.Headers(map[string]string)    // adds headers to the message
message.Key(key []byte)               // sets the message key
message.Partition(partition int32)    // sets the message partition (will use kafka.PartitionAny by default)
message.Topic[T](topic T)             // sets the message topic
message.JSON(value any)               // sets the message value as a JSON representation of the given value
message.String(value string)          // sets the message value to the specified string
message.Value(value []byte)           // sets the message value to a copy of the specified byte slice

Example

msg := NewMessage("topic",
	message.Header("key", "value"),
	message.JSON(model),
)

type MessageOption added in v0.2.0

type MessageOption func(*Message) error

MessageOption is a function type that applies an option to a Message.

type MockProducer

type MockProducer[T comparable] interface {
	Expect(topic T) *mock.Expectation
	ExpectationsWereMet() error
	Reset()
}

type OffsetReset

type OffsetReset string

type Producer

type Producer interface {
	Err() error
	MustProduce(ctx context.Context, msg kafka.Message) (*TopicPartition, error)
}

func NewProducer

func NewProducer(ctx context.Context, config *Config, opts ...ProducerOption) (Producer, error)

NewProducer creates a new Producer with the given configuration and options.

type ProducerOption

type ProducerOption func(*Config, *producer) error

func DeliveryTimeout

func DeliveryTimeout(timeout time.Duration) ProducerOption

DeliveryTimeout configures the producer to wait a specified duration for a message to be delivered before failing. If not configured, the default timeout is determined by the broker (typically 30 seconds).

This configuration option works in conjunction with the MaxRetries option.

DeliveryTimeout and MaxRetries

If a delivery timeout occurs, the producer will retry producing the message up to the configured number of MaxRetries. A timeout error will only be returned by a producer if a message is not delivered within the specified timeout period on the initial production and each retry attempt.

e.g. DeliveryTimeout -> 1 second, MaxRetries -> 2 will return a timeout error after 3 seconds (three 1-second attempts: the initial attempt plus 2 retries).
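
For example:

	producer, err := kafka.NewProducer(ctx, cfg,
		kafka.DeliveryTimeout(1*time.Second),
		kafka.MaxRetries(2),
	)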

func MaxRetries added in v0.2.0

func MaxRetries(retries int) ProducerOption

MaxRetries configures the producer to retry producing messages a specified number of times before failing.

The producer will only retry messages that fail to produce due to a timeout. All other delivery errors are returned immediately with no retries.

The default number of retries is 2 (3 attempts in total).

func MessageEncryption

func MessageEncryption(fn CypherFunc) ProducerOption

MessageEncryption configures the producer to encrypt messages before producing them, using a specified function.

Messages are encrypted after any configured message pipeline but before being produced.

func MessagePipeline

func MessagePipeline(pipeline ProducerPipeline) ProducerOption

MessagePipeline configures the producer to use a specified pipeline for processing messages before production and after delivery, consisting of two handlers:

BeforeProduction   // called before a message is produced; this handler
                   // is called before any encryption handler;

AfterDelivery      // called after a message has been delivered (or
                   // delivery has failed).

The BeforeProduction handler is called with the message to be produced. If the BeforeProduction handler returns an error, the message is not produced.

The AfterDelivery handler is called with the message that was produced and the delivery error, if any.

The MessagePipeline is intended to be used to introduce cross-cutting concerns into message production, such as logging and trace instrumentation. Message encryption should be configured using the MessageEncryption option.
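
For example, a sketch that adds a header before production and logs delivery failures (the correlationID helper is hypothetical):

	producer, err := kafka.NewProducer(ctx, cfg, kafka.MessagePipeline(kafka.ProducerPipeline{
		BeforeProduction: kafka.HandlerFunc(func(ctx context.Context, msg *kafka.Message) error {
			// hypothetical: derive a correlation id from the context
			msg.Headers = append(msg.Headers, kafka.Header{
				Key:   "correlation-id",
				Value: []byte(correlationID(ctx)),
			})
			return nil
		}),
		AfterDelivery: func(ctx context.Context, msg *kafka.Message, err error) {
			if err != nil {
				log.Printf("delivery failed: %v", err)
			}
		},
	}))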

type ProducerPipeline

type ProducerPipeline struct {
	BeforeProduction Handler
	AfterDelivery    func(context.Context, *Message, error)
}

type TopicPartition

type TopicPartition = kafka.TopicPartition

TopicPartition is an alias for the confluent kafka.TopicPartition type.

type UnexpectedDeliveryEvent

type UnexpectedDeliveryEvent struct {
	// contains filtered or unexported fields
}

UnexpectedDeliveryEvent is an error returned when a delivery event was not of an expected type (Message or Event)

func (UnexpectedDeliveryEvent) Error

func (err UnexpectedDeliveryEvent) Error() string

Error implements the error interface for an UnexpectedDeliveryEvent

func (UnexpectedDeliveryEvent) Is

func (err UnexpectedDeliveryEvent) Is(target error) bool

Is determines whether the error matches some target. The target is a match if it is an UnexpectedDeliveryEvent and:

  • has a nil event, or
  • has the same event

Directories

Path Synopsis
internal
