kafkax

package v1.67.1
Published: Apr 23, 2026 License: MIT Imports: 23 Imported by: 0

README

Package kafkax

Package kafkax provides a high-level abstraction for working with Apache Kafka, including producers, consumers, authentication, TLS, and middleware for logging, metrics, and error handling. It supports dynamic configuration updates without stopping the service.

Types

Client

The central client for managing Kafka producers and consumers.

Methods:

New(logger log.Logger) *Client

Creates a new client with the given logger.

(c *Client) UpgradeAndServe(ctx context.Context, config Config)

Updates the configuration and restarts connections:

  • Stops old connections
  • Initializes new producers/consumers
  • Starts message processing

(c *Client) Healthcheck(ctx context.Context) error

Checks the availability of all producers and consumers.

(c *Client) Close()

Stops all connections.

PublisherConfig

Publisher configuration for sending messages.

Methods:

DefaultPublisher(logCtx context.Context, logger log.Logger, restMiddlewares ...publisher.Middleware) *publisher.Publisher

Creates a publisher with default settings:

  • Send timeout: 10 s
  • Batch size: 64 MB
  • Middleware for metrics and requestId

ConsumerConfig

Consumer configuration for reading messages.

Methods:

DefaultConsumer(logCtx context.Context, logger log.Logger, handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer

Creates a consumer with default settings:

  • Connection timeout: 5 s
  • Commit interval: 1 s
  • Middleware for logging and requestId

LogObserver

An implementation of the kafkax.Observer interface that logs Kafka client events.

Methods:

NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

Observer constructor.

(l LogObserver) ClientReady()

Logs a message indicating that the Kafka client is ready.

(l LogObserver) ClientError(err error)

Logs a Kafka client error message.

(l LogObserver) ShutdownStarted()

Logs a message indicating that the Kafka client has started shutting down.

(l LogObserver) ShutdownDone()

Logs a message indicating that the Kafka client has finished shutting down.

Functions

NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

Creates a message handler with:

  • Logging
  • Metrics
  • Synchronous processing support
  • Panic recovery

PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

Middleware that logs information about published messages. Logs the message body if logBody is true.

ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

Middleware that logs information about received messages. Logs the message body if logBody is true.

PublisherRetry(retrier Retrier) publisher.Middleware

Middleware that retries sending messages on errors. Accepts a Retrier implementation containing the retry backoff logic.

PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

Middleware that collects the following publisher metrics:

  • Message publish duration
  • Published message sizes
  • Error count

PublisherRequestId() publisher.Middleware

Middleware that adds the requestId from the context to the publisher's message headers. Automatically generates a requestId if the context does not contain one.

ConsumerRequestId() consumer.Middleware

Middleware that adds the requestId from received message headers to the context. Automatically generates a requestId if the headers do not contain one.

Usage

Consumer & publisher
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/kafkax/handler"
	"github.com/txix-open/isp-kit/kafkax"
	"github.com/txix-open/isp-kit/kafkax/consumer"
	log2 "github.com/txix-open/isp-kit/log"
)

type noopHandler struct{}

func (h noopHandler) Handle(ctx context.Context, d *consumer.Delivery) handler.Result {
	// business logic goes here
	return handler.Commit()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	publisherCfg := kafkax.PublisherConfig{
		Addresses:             []string{"localhost:9092"},
		Topic:                 "topic-2",
		BatchSizePerPartition: 1,
		Auth: &kafkax.Auth{
			Username: "test",
			Password: "test",
		},
	}
	consumerCfg := kafkax.ConsumerConfig{
		Addresses:   []string{"localhost:9092"},
		Topic:       "topic-1",
		GroupId:     "test",
		Concurrency: 1,
		Auth: &kafkax.Auth{
			Username: "test",
			Password: "test",
		},
	}

	cons := consumerCfg.DefaultConsumer(
		context.Background(),
		logger,
		handler.NewSync(logger, noopHandler{}),
	)
	pub := publisherCfg.DefaultPublisher(context.Background(), logger)

	cli := kafkax.New(logger)
	cli.UpgradeAndServe(context.Background(), kafkax.NewConfig(
		kafkax.WithConsumers(cons),
		kafkax.WithPublishers(pub),
	))
}

Documentation

Overview

Package kafkax provides a high-level abstraction over Apache Kafka for publishing and consuming messages. It builds on top of the franz-go client library and includes built-in support for metrics, logging, middlewares, and graceful shutdown.

The package supports both synchronous publishing and concurrent consuming with configurable concurrency levels, automatic offset committing, and middleware chains for cross-cutting concerns like logging, metrics, and request IDs.

Index

Constants

View Source
const (
	AuthTypePlain   = "SASL/PLAIN"
	AuthTypeSCRAM   = "SASL/SCRAM"
	ScramTypeSHA256 = "SHA256"
	ScramTypeSHA512 = "SHA512"
)

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

ConsumerLog creates a middleware that logs consume operations. When logBody is true, the message body is included in the log output.

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

ConsumerRequestId creates a middleware that extracts request IDs from Kafka message headers and adds them to the context. If no request ID is found, a new one is generated.

func GetHeaderValue

func GetHeaderValue(headers []kgo.RecordHeader, key string) string

GetHeaderValue retrieves the value of a header with the specified key from the provided headers slice. Returns an empty string if the key is not found.

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

NewResultHandler creates a new synchronous message handler with default middlewares including logging, metrics, and panic recovery. The provided adapter implements the business logic for handling messages.

func PlainAuth

func PlainAuth(auth *Auth) sasl.Mechanism

PlainAuth creates a SASL/PLAIN authentication mechanism from the provided credentials. Returns nil if auth is nil.

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

PublisherLog creates a middleware that logs publish operations. When logBody is true, the message body is included in the log output.

func PublisherMetrics

func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

PublisherMetrics creates a middleware that records metrics for message publishing operations, including duration, message size, and error counts.

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

PublisherRequestId creates a middleware that propagates request IDs to Kafka message headers. If no request ID is present in the context, a new one is generated.

func PublisherRetry added in v1.50.0

func PublisherRetry(retrier Retrier) publisher.Middleware

PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging, to avoid duplicate logging of publication attempts.

func ScramAuth added in v1.47.0

func ScramAuth(auth *Auth) (sasl.Mechanism, error)

ScramAuth creates a SASL/SCRAM authentication mechanism from the provided credentials. Returns an error if ScramType is not configured or is invalid.

Types

type Auth

type Auth struct {
	Mechanism *string `` /* 173-byte string literal not displayed */
	ScramType *string `` /* 187-byte string literal not displayed */
	Username  string  `validate:"required" schema:"Логин"`
	Password  string  `validate:"required" schema:"Пароль"`
}

Auth holds authentication configuration for connecting to Kafka.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages the lifecycle of Kafka publishers and consumers. It supports dynamic reconfiguration through UpgradeAndServe and provides health checking and graceful shutdown capabilities.

Client is safe for concurrent use.

func New

func New(logger log.Logger) *Client

New creates a new Client instance with the provided logger.

func (*Client) Close

func (c *Client) Close()

Close gracefully shuts down the client and releases all resources. It is safe to call Close multiple times.

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

Healthcheck verifies the health of all consumers and publishers. Returns an error if any component is unhealthy.

func (*Client) Shutdown added in v1.39.3

func (c *Client) Shutdown()

Shutdown gracefully stops all consumers and publishers, waiting for pending operations to complete.

func (*Client) UpgradeAndServe

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

UpgradeAndServe initializes or reinitializes Kafka publishers and consumers based on the provided configuration. If the new configuration is identical to the previous one, initialization is skipped. Existing consumers and publishers are gracefully shut down before new ones are started.

type Config

type Config struct {
	Publishers []*publisher.Publisher
	Consumers  []consumer.Consumer
}

Config holds the configuration for Kafka publishers and consumers.

func NewConfig

func NewConfig(opts ...ConfigOption) Config

NewConfig creates a new Config instance using the provided options.

type ConfigOption

type ConfigOption func(c *Config)

ConfigOption is a function that configures a Config instance.

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ConfigOption

WithConsumers sets the consumers for the Config.

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

WithPublishers sets the publishers for the Config.

type ConsumerConfig

type ConsumerConfig struct {
	Addresses         []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"`
	Topic             string   `validate:"required" schema:"Топик"`
	GroupId           string   `validate:"required" schema:"Идентификатор консьюмера"`
	Concurrency       int      `schema:"Кол-во обработчиков, по умолчанию 1"`
	MaxBatchSizeMb    int32    `` /* 133-byte string literal not displayed */
	CommitIntervalSec *int     `` /* 141-byte string literal not displayed */
	Auth              *Auth    `schema:"Параметры аутентификации"`
	TLS               *TLS     `schema:"Данные для установки TLS-соединения"`
	DialTimeoutMs     *int     `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
	MetricConsumerId  *string  `` /* 150-byte string literal not displayed */
}

ConsumerConfig holds configuration for a Kafka consumer.

func (ConsumerConfig) DefaultConsumer

func (c ConsumerConfig) DefaultConsumer(
	logCtx context.Context,
	logger log.Logger,
	handler consumer.Handler,
	restMiddlewares ...consumer.Middleware,
) consumer.Consumer

DefaultConsumer creates a new consumer with default configuration, including built-in request ID middleware and log observer. Additional middlewares can be provided via restMiddlewares.

func (ConsumerConfig) GetCommitInterval added in v1.37.1

func (c ConsumerConfig) GetCommitInterval() time.Duration

GetCommitInterval returns the auto-commit interval duration. Returns 1 second by default if not configured.

func (ConsumerConfig) GetDialTimeout added in v1.41.0

func (c ConsumerConfig) GetDialTimeout() time.Duration

GetDialTimeout returns the dial timeout duration. Returns 5 seconds by default if not configured.

func (ConsumerConfig) GetMaxBatchSizeMb added in v1.37.1

func (c ConsumerConfig) GetMaxBatchSizeMb() int32

GetMaxBatchSizeMb returns the maximum batch size in MB. Returns 64MB by default if not configured or set to a non-positive value.

type LogObserver

type LogObserver struct {
	NoopObserver
	// contains filtered or unexported fields
}

LogObserver is an Observer implementation that logs lifecycle events to the provided logger.

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

NewLogObserver creates a new LogObserver with the provided context and logger.

func (LogObserver) ClientError

func (l LogObserver) ClientError(err error)

ClientError logs an unexpected client error.

func (LogObserver) ClientReady

func (l LogObserver) ClientReady()

ClientReady logs that the Kafka client has successfully connected.

func (LogObserver) ShutdownDone

func (l LogObserver) ShutdownDone()

ShutdownDone logs that the shutdown process has completed.

func (LogObserver) ShutdownStarted

func (l LogObserver) ShutdownStarted()

ShutdownStarted logs that the shutdown process has begun.

type Logger added in v1.64.1

type Logger struct {
	// contains filtered or unexported fields
}

Logger is an adapter that bridges franz-go's logging interface to the application's logger. It prefixes log messages with a configurable name and maps franz-go log levels to the application's log levels.

func NewLogger added in v1.64.1

func NewLogger(ctx context.Context, name string, level kgo.LogLevel, logger log.Logger) *Logger

NewLogger creates a new Logger instance with the provided configuration.

func (*Logger) Level added in v1.64.1

func (l *Logger) Level() kgo.LogLevel

Level returns the logger's configured log level.

func (*Logger) Log added in v1.64.1

func (l *Logger) Log(level kgo.LogLevel, msg string, keyvals ...any)

Log writes a log message at the specified level. The message is prefixed with the logger's name and forwarded to the underlying logger.

type NoopObserver

type NoopObserver struct{}

NoopObserver is a no-op implementation of the Observer interface that ignores all events.

func (NoopObserver) ClientError

func (n NoopObserver) ClientError(err error)

ClientError does nothing.

func (NoopObserver) ClientReady

func (n NoopObserver) ClientReady()

ClientReady does nothing.

func (NoopObserver) ShutdownDone

func (n NoopObserver) ShutdownDone()

ShutdownDone does nothing.

func (NoopObserver) ShutdownStarted

func (n NoopObserver) ShutdownStarted()

ShutdownStarted does nothing.

type Observer

type Observer interface {
	ClientReady()
	ClientError(err error)
	ShutdownStarted()
	ShutdownDone()
}

Observer defines an interface for receiving lifecycle events from the Kafka client, such as connection status and shutdown notifications.

type PublisherConfig

type PublisherConfig struct {
	Addresses                  []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"`
	Topic                      string   `` /* 160-byte string literal not displayed */
	MaxMsgSizeMbPerPartition   int32    `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"`
	BatchSizePerPartition      int      `` /* 142-byte string literal not displayed */
	BatchTimeoutPerPartitionMs *int     `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"`
	WriteTimeoutSec            *int     `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"`
	RequiredAckLevel           int      `` /* 179-byte string literal not displayed */
	Auth                       *Auth    `schema:"Параметры аутентификации"`
	TLS                        *TLS     `schema:"Данные для установки TLS-соединения"`
	DialTimeoutMs              *int     `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
	MetricPublisherId          *string  `` /* 148-byte string literal not displayed */
}

PublisherConfig holds configuration for a Kafka publisher.

func (PublisherConfig) DefaultPublisher

func (p PublisherConfig) DefaultPublisher(
	logCtx context.Context,
	logger log.Logger,
	restMiddlewares ...publisher.Middleware,
) *publisher.Publisher

DefaultPublisher creates a new publisher with default configuration, including built-in metrics and request ID middlewares. Additional middlewares can be provided via restMiddlewares.

func (PublisherConfig) GetBatchSizePerPartition added in v1.38.1

func (p PublisherConfig) GetBatchSizePerPartition() int

GetBatchSizePerPartition returns the batch size per partition. Returns 10 by default if not configured or set to a non-positive value.

func (PublisherConfig) GetBatchTimeoutPerPartition added in v1.38.1

func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration

GetBatchTimeoutPerPartition returns the batch timeout duration per partition. Returns 500ms by default if not configured.

func (PublisherConfig) GetDialTimeout added in v1.41.0

func (p PublisherConfig) GetDialTimeout() time.Duration

GetDialTimeout returns the dial timeout duration. Returns 5 seconds by default if not configured.

func (PublisherConfig) GetMaxMessageSizePerPartition added in v1.38.1

func (p PublisherConfig) GetMaxMessageSizePerPartition() int32

GetMaxMessageSizePerPartition returns the maximum message size per partition in bytes. Returns 64MB by default if not configured or set to a non-positive value.

func (PublisherConfig) GetRequiredAckLevel

func (p PublisherConfig) GetRequiredAckLevel() kgo.Acks

GetRequiredAckLevel returns the required acknowledgment level for message production. Returns LeaderAck() for level 1, AllISRAcks() for level -1, and NoAck() for all other values.

func (PublisherConfig) GetWriteTimeout added in v1.37.1

func (p PublisherConfig) GetWriteTimeout() time.Duration

GetWriteTimeout returns the write timeout duration. Returns 10 seconds by default if not configured.

type PublisherMetricStorage

type PublisherMetricStorage interface {
	ObservePublishDuration(topic string, t time.Duration)
	ObservePublishMsgSize(topic string, size int)
	IncPublishError(topic string)
}

PublisherMetricStorage defines the interface for publisher metrics storage.

type Retrier added in v1.50.0

type Retrier interface {
	Do(ctx context.Context, f func() error) error
}

Retrier defines an interface for retry logic implementations.

type TLS added in v1.47.0

type TLS struct {
	RootCA             *string `schema:"Корневой сертификат"`
	PrivateKey         *string `schema:"Закрытый ключ"`
	Certificate        *string `schema:"Сертификат клиента"`
	InsecureSkipVerify bool    `schema:"Пропуск проверки сертификатов"`
}

TLS holds TLS configuration for secure connections to Kafka.

Directories

Path Synopsis
Package consumer provides a high-level abstraction for consuming messages from Apache Kafka topics.
Package handler provides synchronous message processing with support for retry logic, commit handling, and middleware chains for cross-cutting concerns like logging, metrics, and panic recovery.
Package publisher provides a high-level abstraction for publishing messages to Apache Kafka topics.
