kafkax

package
v1.64.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2026 License: MIT Imports: 23 Imported by: 0

README

Package kafkax

Пакет kafkax предоставляет высокоуровневую абстракцию для работы с Apache Kafka, включая продюсеры, консьюмеры, аутентификацию, TLS, middleware для логирования, метрик и обработки ошибок. Поддерживает динамическое обновление конфигурации без остановки сервиса.

Types

Client

Центральный клиент для управления продюсерами и консьюмерами Kafka.

Methods:

New(logger log.Logger) *Client

Создать новый клиент с логгером.

(c *Client) UpgradeAndServe(ctx context.Context, config Config)

Обновить конфигурацию и перезапустить подключения:

  • Останавливает старые соединения
  • Инициализирует новые продюсеры/консьюмеры
  • Запускает обработку сообщений
(c *Client) Healthcheck(ctx context.Context) error

Проверить доступность всех продюсеров и консьюмеров.

(c *Client) Close()

Остановить все соединения.

PublisherConfig

Конфигурация паблишера для отправки сообщений.

Methods:

DefaultPublisher(logCtx context.Context, logger log.Logger, restMiddlewares ...publisher.Middleware) *publisher.Publisher

Создать паблишера с настройками по умолчанию:

  • Таймаут отправки: 10 сек
  • Размер батча: 64 МБ
  • Middleware для метрик и requestId
ConsumerConfig

Конфигурация консумера для чтения сообщений.

Methods:

DefaultConsumer(logCtx context.Context, logger log.Logger, handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer

Создать консьюмер с настройками по умолчанию:

  • Таймаут подключения: 5 сек
  • Интервал коммита: 1 сек
  • Middleware для логирования и requestId
LogObserver

Реализация интерфейса kafkax.Observer для логирования событий Kafka-клиента.

Methods:

NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

Конструктор обсервера.

(l LogObserver) ClientReady()

Залогировать сообщение о готовности Kafka-клиента.

(l LogObserver) ClientError(err error)

Залогировать сообщение об ошибке Kafka-клиента.

(l LogObserver) ShutdownStarted()

Залогировать сообщение о начале процесса завершения работы Kafka-клиента.

(l LogObserver) ShutdownDone()

Залогировать сообщение об окончании процесса завершения работы Kafka-клиента.

Functions

NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

Создать обработчик сообщений с:

  • Логированием
  • Метриками
  • Поддержкой синхронной обработки
  • Восстановлением при панике
PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

Middleware для логирования информации о публикуемых сообщениях. Логирует тело сообщения, если logBody = true

ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

Middleware для логирования информации о получаемых сообщениях. Логирует тело сообщения, если logBody = true

PublisherRetry(retrier Retrier) publisher.Middleware

Middleware для повторной отправки сообщений при ошибках. Принимает реализацию интерфейса Retrier с логикой выдержки ретрая.

PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

Middleware для сбора следующих метрик паблишера:

  • Время публикации сообщений
  • Размеры публикуемых сообщений
  • Количество ошибок
PublisherRequestId() publisher.Middleware

Middleware, добавляющая в заголовки сообщений паблишера requestId из контекста. Автоматически генерирует requestId, если в контексте его нет.

ConsumerRequestId() consumer.Middleware

Middleware, добавляющая в контекст requestId из заголовков полученных сообщений. Автоматически генерирует requestId, если в заголовках его нет.

Usage

Consumer & publisher
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/kafkax/handler"
	"github.com/txix-open/isp-kit/kafkax"
	"github.com/txix-open/isp-kit/kafkax/consumer"
	log2 "github.com/txix-open/isp-kit/log"
)

type noopHandler struct{}

func (h noopHandler) Handle(ctx context.Context, d *consumer.Delivery) handler.Result {
	/* put here business logic */
	return handler.Commit()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	publisherCfg := kafkax.PublisherConfig{
		Addresses:             []string{"localhost:9092"},
		Topic:                 "topic-2",
		BatchSizePerPartition: 1,
		Auth: &kafkax.Auth{
			Username: "test",
			Password: "test",
		},
	}
	consumerCfg := kafkax.ConsumerConfig{
		Addresses:   []string{"localhost:9092"},
		Topic:       "topic-1",
		GroupId:     "test",
		Concurrency: 1,
		Auth: &kafkax.Auth{
			Username: "test",
			Password: "test",
		},
	}

	consumer := consumerCfg.DefaultConsumer(
		context.Background(),
		logger,
		handler.NewSync(logger, noopHandler{}),
	)
	publisher := publisherCfg.DefaultPublisher(context.Background(), logger)

	cli := kafkax.New(logger)
	cli.UpgradeAndServe(context.Background(), kafkax.NewConfig(
		kafkax.WithConsumers(consumer),
		kafkax.WithPublishers(publisher),
	))
}

Documentation

Overview

nolint:ireturn,lll,unparam,gosec

Index

Constants

View Source
const (
	AuthTypePlain   = "SASL/PLAIN"
	AuthTypeSCRAM   = "SASL/SCRAM"
	ScramTypeSHA256 = "SHA256"
	ScramTypeSHA512 = "SHA512"
)

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

func GetHeaderValue

func GetHeaderValue(headers []kgo.RecordHeader, key string) string

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

func PlainAuth

func PlainAuth(auth *Auth) sasl.Mechanism

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

func PublisherMetrics

func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

func PublisherRetry added in v1.50.0

func PublisherRetry(retrier Retrier) publisher.Middleware

PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging, to avoid duplicate logging of publication attempts.

func ScramAuth added in v1.47.0

func ScramAuth(auth *Auth) (sasl.Mechanism, error)

Types

type Auth

type Auth struct {
	Mechanism *string `` /* 173-byte string literal not displayed */
	ScramType *string `` /* 187-byte string literal not displayed */
	Username  string  `validate:"required" schema:"Логин"`
	Password  string  `validate:"required" schema:"Пароль"`
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(logger log.Logger) *Client

func (*Client) Close

func (c *Client) Close()

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

func (*Client) Shutdown added in v1.39.3

func (c *Client) Shutdown()

func (*Client) UpgradeAndServe

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

type Config

type Config struct {
	Publishers []*publisher.Publisher
	Consumers  []consumer.Consumer
}

func NewConfig

func NewConfig(opts ...ConfigOption) Config

type ConfigOption

type ConfigOption func(c *Config)

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ConfigOption

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

type ConsumerConfig

type ConsumerConfig struct {
	Addresses         []string `validate:"required" schema:"Список адресов брокеров для чтения сообщений"`
	Topic             string   `validate:"required" schema:"Топик"`
	GroupId           string   `validate:"required" schema:"Идентификатор консьюмера"`
	Concurrency       int      `schema:"Кол-во обработчиков, по умолчанию 1"`
	MaxBatchSizeMb    int32    `` /* 133-byte string literal not displayed */
	CommitIntervalSec *int     `` /* 141-byte string literal not displayed */
	Auth              *Auth    `schema:"Параметры аутентификации"`
	TLS               *TLS     `schema:"Данные для установки TLS-соединения"`
	DialTimeoutMs     *int     `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
	MetricConsumerId  *string  `` /* 150-byte string literal not displayed */
}

func (ConsumerConfig) DefaultConsumer

func (c ConsumerConfig) DefaultConsumer(
	logCtx context.Context,
	logger log.Logger,
	handler consumer.Handler,
	restMiddlewares ...consumer.Middleware,
) consumer.Consumer

func (ConsumerConfig) GetCommitInterval added in v1.37.1

func (c ConsumerConfig) GetCommitInterval() time.Duration

func (ConsumerConfig) GetDialTimeout added in v1.41.0

func (c ConsumerConfig) GetDialTimeout() time.Duration

func (ConsumerConfig) GetMaxBatchSizeMb added in v1.37.1

func (c ConsumerConfig) GetMaxBatchSizeMb() int32

type LogObserver

type LogObserver struct {
	NoopObserver
	// contains filtered or unexported fields
}

nolint:containedctx

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

func (LogObserver) ClientError

func (l LogObserver) ClientError(err error)

func (LogObserver) ClientReady

func (l LogObserver) ClientReady()

func (LogObserver) ShutdownDone

func (l LogObserver) ShutdownDone()

func (LogObserver) ShutdownStarted

func (l LogObserver) ShutdownStarted()

type Logger added in v1.64.1

type Logger struct {
	// contains filtered or unexported fields
}

func NewLogger added in v1.64.1

func NewLogger(ctx context.Context, name string, level kgo.LogLevel, logger log.Logger) *Logger

func (*Logger) Level added in v1.64.1

func (l *Logger) Level() kgo.LogLevel

func (*Logger) Log added in v1.64.1

func (l *Logger) Log(level kgo.LogLevel, msg string, keyvals ...any)

type NoopObserver

type NoopObserver struct{}

func (NoopObserver) ClientError

func (n NoopObserver) ClientError(err error)

func (NoopObserver) ClientReady

func (n NoopObserver) ClientReady()

func (NoopObserver) ShutdownDone

func (n NoopObserver) ShutdownDone()

func (NoopObserver) ShutdownStarted

func (n NoopObserver) ShutdownStarted()

type Observer

type Observer interface {
	ClientReady()
	ClientError(err error)
	ShutdownStarted()
	ShutdownDone()
}

type PublisherConfig

type PublisherConfig struct {
	Addresses                  []string `validate:"required" schema:"Список адресов брокеров для отправки сообщений"`
	Topic                      string   `` /* 160-byte string literal not displayed */
	MaxMsgSizeMbPerPartition   int32    `schema:"Максимальный размер сообщений в Мб, по умолчанию 64 Мб"`
	BatchSizePerPartition      int      `` /* 142-byte string literal not displayed */
	BatchTimeoutPerPartitionMs *int     `schema:"Периодичность записи батчей в кафку в мс, по умолчанию 500 мс"`
	WriteTimeoutSec            *int     `schema:"Таймаут отправки сообщений, по умолчанию 10 секунд"`
	RequiredAckLevel           int      `` /* 179-byte string literal not displayed */
	Auth                       *Auth    `schema:"Параметры аутентификации"`
	TLS                        *TLS     `schema:"Данные для установки TLS-соединения"`
	DialTimeoutMs              *int     `schema:"Таймаут установки соединения, по умолчанию 5 секунд"`
	MetricPublisherId          *string  `` /* 148-byte string literal not displayed */
}

func (PublisherConfig) DefaultPublisher

func (p PublisherConfig) DefaultPublisher(
	logCtx context.Context,
	logger log.Logger,
	restMiddlewares ...publisher.Middleware,
) *publisher.Publisher

func (PublisherConfig) GetBatchSizePerPartition added in v1.38.1

func (p PublisherConfig) GetBatchSizePerPartition() int

func (PublisherConfig) GetBatchTimeoutPerPartition added in v1.38.1

func (p PublisherConfig) GetBatchTimeoutPerPartition() time.Duration

func (PublisherConfig) GetDialTimeout added in v1.41.0

func (p PublisherConfig) GetDialTimeout() time.Duration

func (PublisherConfig) GetMaxMessageSizePerPartition added in v1.38.1

func (p PublisherConfig) GetMaxMessageSizePerPartition() int32

func (PublisherConfig) GetRequiredAckLevel

func (p PublisherConfig) GetRequiredAckLevel() kgo.Acks

func (PublisherConfig) GetWriteTimeout added in v1.37.1

func (p PublisherConfig) GetWriteTimeout() time.Duration

type PublisherMetricStorage

type PublisherMetricStorage interface {
	ObservePublishDuration(topic string, t time.Duration)
	ObservePublishMsgSize(topic string, size int)
	IncPublishError(topic string)
}

type Retrier added in v1.50.0

type Retrier interface {
	Do(ctx context.Context, f func() error) error
}

type TLS added in v1.47.0

type TLS struct {
	RootCA             *string `schema:"Корневой сертификат"`
	PrivateKey         *string `schema:"Закрытый ключ"`
	Certificate        *string `schema:"Сертификат клиента"`
	InsecureSkipVerify bool    `schema:"Пропуск проверки сертификатов"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL