consumer

package
v1.52.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2025 License: MIT Imports: 10 Imported by: 0

README

Package consumer

Пакет consumer предоставляет реализацию консумера Apache Kafka с поддержкой параллельной обработки сообщений, middleware, метрик и наблюдения за состоянием. Интегрируется с пакетом kafkax для полноценной работы с Kafka.

Types

Consumer

Основная структура для чтения и обработки сообщений из Kafka. Поддерживает:

  • Параллельную обработку (конкурентность)
  • Middleware-цепочки
  • Сбор метрик

Methods:

New(reader *kafka.Reader, handler Handler, concurrency int, metrics *Metrics, opts ...Option) *Consumer

Создать нового консумера из низкоуровневого ридера из библиотеки kafka-go с указанным обработчиком сообщений Handler.

Основные опции:

  • WithMiddlewares(mws ...Middleware) Option – добавить middleware в цепочку обработки получаемых сообщений.
  • WithObserver(observer Observer) Option – добавить реализацию интерфейса Observer.
(c *Consumer) Run(ctx context.Context)

Запустить чтение сообщений из Kafka и их обработку.

(c *Consumer) Close() error

Остановить консумера, завершить все активные обработки.

(c *Consumer) Healthcheck(ctx context.Context) error

Проверить активность консумера. Возвращает ошибку, если консумер не может получать сообщения.

Delivery

Структура, представляющая полученное сообщение Kafka. Обеспечивает безопасное управление подтверждением (commit) сообщения.

Methods:

(d *Delivery) Commit(ctx context.Context) error

Подтвердить успешную обработку сообщения. Должен вызываться только один раз за сообщение.

(d *Delivery) Source() *kafka.Message

Получить исходное сообщение Kafka (топик, партиция, ключ, значение).

(d *Delivery) Done()

Отметить завершение обработки (используется для синхронизации).

(d *Delivery) ConsumerGroupId() string

Получить groupId консумера.

Metrics

Структура для сбора и отправки метрик консумера в Prometheus.

Methods:

NewMetrics(sendMetricPeriod time.Duration, reader *kafka.Reader, consumerId string) *Metrics

Создает сборщик метрик из низкоуровневого ридера Kafka.

(m *Metrics) Send(stats kafka.ReaderStats)

Единожды отправить метрики.

(m *Metrics) Run()

Запустить периодическую отправку метрик.

LogObserver

Реализация интерфейса consumer.Observer для логирования событий консумера.

Methods:

NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

Конструктор обсервера.

(l LogObserver) ConsumerError(err error)

Залогировать сообщение об ошибке консумера.

(l LogObserver) BeginConsuming()

Залогировать сообщение о начале получения данных от консумера.

(l LogObserver) CloseStart()

Залогировать сообщение о начале процесса завершения работы консумера.

(l LogObserver) CloseDone()

Залогировать сообщение об окончании процесса завершения работы консумера.

Usage

Default usage flow
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/kafkax"
	"github.com/txix-open/isp-kit/kafkax/consumer"
	log2 "github.com/txix-open/isp-kit/log"
)

func noopHandlerFn(ctx context.Context, delivery *consumer.Delivery) {
	/* put here business logic */
	_ = delivery.Commit(ctx)
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "test",
		GroupID: "test",
	})

	observer := consumer.NewLogObserver(context.Background(), logger)
	consumer := consumer.New(
		reader,
		consumer.HandlerFunc(noopHandlerFn),
		3,   /* concurrency */
		nil, /* metrics */
		consumer.WithMiddlewares(kafkax.ConsumerLog(logger, true)),
		consumer.WithObserver(observer),
	)

	consumer.Run(context.Background())
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDeliveryAlreadyHandled = errors.New("delivery already handled")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func New

func New(reader *kafka.Reader, handler Handler, concurrency int, metrics *Metrics, opts ...Option) *Consumer

func (*Consumer) Close

func (c *Consumer) Close() error

func (*Consumer) Healthcheck

func (c *Consumer) Healthcheck(ctx context.Context) error

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context)

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

func NewDelivery

func NewDelivery(donner Donner, reader *kafka.Reader, source *kafka.Message, consumerGroupId string) *Delivery

func (*Delivery) Commit

func (d *Delivery) Commit(ctx context.Context) error

func (*Delivery) ConsumerGroupId

func (d *Delivery) ConsumerGroupId() string

func (*Delivery) Done

func (d *Delivery) Done()

func (*Delivery) Source

func (d *Delivery) Source() *kafka.Message

type Donner

type Donner interface {
	Done()
}

type Handler

type Handler interface {
	Handle(ctx context.Context, delivery *Delivery)
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, delivery *Delivery)

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, delivery *Delivery)

type LogObserver

type LogObserver struct {
	NoopObserver
	// contains filtered or unexported fields
}

nolint:containedctx

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

func (LogObserver) BeginConsuming

func (l LogObserver) BeginConsuming()

func (LogObserver) CloseDone

func (l LogObserver) CloseDone()

func (LogObserver) CloseStart

func (l LogObserver) CloseStart()

func (LogObserver) ConsumerError

func (l LogObserver) ConsumerError(err error)

type MetricStorage added in v1.47.0

type MetricStorage interface {
	ObserveConsumerDials(dials int64)
	ObserveConsumerFetches(fetches int64)
	ObserveConsumerMessages(messages int64)
	ObserveConsumerMessageBytes(messageBytes int64)
	ObserveConsumerRebalances(rebalances int64)
	ObserveConsumerTimeouts(timeouts int64)
	ObserveConsumerError(errors int64)

	ObserveConsumerDialTime(dialTime kafka.DurationStats)
	ObserveConsumerReadTime(readTime kafka.DurationStats)
	ObserveConsumerWaitTime(waitTime kafka.DurationStats)
	ObserveConsumerFetchSize(fetchSize kafka.SummaryStats)
	ObserveConsumerFetchBytes(fetchBytes kafka.SummaryStats)

	ObserveConsumerOffset(offset int64)
	ObserveConsumerLag(lag int64)
	ObserveConsumerQueueLength(queueLength int64)
	ObserveConsumerQueueCapacity(queueCapacity int64)
}

nolint:interfacebloat

type Metrics added in v1.47.0

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics added in v1.47.0

func NewMetrics(sendMetricPeriod time.Duration, reader *kafka.Reader, consumerId string) *Metrics

func (*Metrics) Close added in v1.47.0

func (m *Metrics) Close()

func (*Metrics) Run added in v1.47.0

func (m *Metrics) Run()

func (*Metrics) Send added in v1.47.0

func (m *Metrics) Send(stats kafka.ReaderStats)

type Middleware

type Middleware func(next Handler) Handler

type NoopObserver

type NoopObserver struct{}

func (NoopObserver) BeginConsuming

func (n NoopObserver) BeginConsuming()

func (NoopObserver) CloseDone

func (n NoopObserver) CloseDone()

func (NoopObserver) CloseStart

func (n NoopObserver) CloseStart()

func (NoopObserver) ConsumerError

func (n NoopObserver) ConsumerError(err error)

type Observer

type Observer interface {
	ConsumerError(err error)
	BeginConsuming()
	CloseStart()
	CloseDone()
}

type Option

type Option func(p *Consumer)

func WithMiddlewares

func WithMiddlewares(mws ...Middleware) Option

func WithObserver

func WithObserver(observer Observer) Option

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL