consumer

package
v1.67.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: MIT Imports: 7 Imported by: 0

README

Package consumer

Пакет consumer предоставляет реализацию консумера Apache Kafka с поддержкой параллельной обработки сообщений, middleware, метрик и наблюдения за состоянием. Интегрируется с пакетом kafkax для полноценной работы с Kafka.

Types

Consumer

Основная структура для чтения и обработки сообщений из Kafka. Поддерживает:

  • Параллельную обработку (конкурентность)
  • Middleware-цепочки

Methods:

New(client *kgo.Client, consumerGroupId string, handler Handler, concurrency int, opts ...Option) *Consumer

Создать нового консумера из клиента из библиотеки franz-go с указанным обработчиком сообщений Handler.

Основные опции:

  • WithMiddlewares(mws ...Middleware) Option – добавить middleware в цепочку обработки получаемых сообщений.
  • WithObserver(observer Observer) Option – добавить реализацию интерфейса Observer.
(c *Consumer) Run(ctx context.Context)

Запустить чтение сообщений из Kafka и их обработку.

(c *Consumer) Close() error

Остановить консумера, завершить все активные обработки.

(c *Consumer) Healthcheck(ctx context.Context) error

Проверить активность консумера. Возвращает ошибку, если консумер не может получать сообщения.

Delivery

Структура, представляющая полученное сообщение Kafka. Обеспечивает безопасное управление подтверждением (commit) сообщения.

Methods:

(d *Delivery) Commit(ctx context.Context) error

Подтвердить успешную обработку сообщения. Должен вызываться только один раз за сообщение.

(d *Delivery) Source() *kgo.Record

Получить исходное сообщение Kafka (топик, партиция, ключ, значение).

(d *Delivery) Done()

Отметить завершение обработки (используется для синхронизации).

(d *Delivery) ConsumerGroupId() string

Получить groupId консумера.

LogObserver

Реализация интерфейса consumer.Observer для логирования событий консумера.

Methods:

NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

Конструктор обсервера.

(l LogObserver) ConsumerError(err error)

Залогировать сообщение об ошибке консумера.

(l LogObserver) BeginConsuming()

Залогировать сообщение о начале получения данных от консумера.

(l LogObserver) CloseStart()

Залогировать сообщение о начале процесса завершения работы консумера.

(l LogObserver) CloseDone()

Залогировать сообщение об окончании процесса завершения работы консумера.

Usage

Default usage flow
package main

import (
	"context"
	"github.com/twmb/franz-go/pkg/kgo"
	"log"

	"github.com/txix-open/isp-kit/kafkax"
	"github.com/txix-open/isp-kit/kafkax/consumer"
	log2 "github.com/txix-open/isp-kit/log"
)

func noopHandlerFn(ctx context.Context, delivery *consumer.Delivery) {
	/* put here business logic */
	_ = delivery.Commit(ctx)
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("test"),
		kgo.ConsumeTopics("test"))

	observer := consumer.NewLogObserver(context.Background(), logger)
	consumer := consumer.New(
		client,
		"test",
		consumer.HandlerFunc(noopHandlerFn),
		3,   /* concurrency */
		nil, /* metrics */
		consumer.WithMiddlewares(kafkax.ConsumerLog(logger, true)),
		consumer.WithObserver(observer),
	)

	consumer.Run(context.Background())
}

Documentation

Overview

Package consumer provides a high-level abstraction for consuming messages from Apache Kafka topics. It wraps the franz-go client and supports concurrent message processing with middleware chains for cross-cutting concerns like logging, metrics, and request IDs.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrDeliveryAlreadyHandled is returned when attempting to commit a delivery
	// that has already been handled.
	ErrDeliveryAlreadyHandled = errors.New("delivery already handled")
)

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles consuming messages from Kafka topics with configurable concurrency and middleware support. It manages offset committing and provides lifecycle hooks through the observer pattern.

func New

func New(client *kgo.Client, consumerGroupId string, handler Handler, concurrency int, opts ...Option) *Consumer

New creates a new Consumer instance with the provided Kafka client, consumer group ID, handler, and concurrency level. Options can be used to configure middlewares and the observer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close gracefully shuts down the consumer, waits for pending message processing to complete, and releases the underlying Kafka client connection.

func (*Consumer) Healthcheck

func (c *Consumer) Healthcheck(ctx context.Context) error

Healthcheck returns nil if the consumer is healthy and able to fetch messages, or an error if it has encountered issues.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context)

Run starts the consumer and begins processing messages. It returns immediately and runs message processing in a separate goroutine.

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

Delivery represents a consumed message from Kafka. It provides methods to access the message source, commit offsets, and signal completion.

func NewDelivery

func NewDelivery(donner Donner, client *kgo.Client, source *kgo.Record, consumerGroupId string) *Delivery

NewDelivery creates a new Delivery instance with the provided configuration.

func (*Delivery) Commit

func (d *Delivery) Commit(ctx context.Context) error

Commit commits the message offset to Kafka and signals completion. Returns an error if the delivery has already been handled. Only one of Commit or Done should be called per delivery.

func (*Delivery) ConsumerGroupId

func (d *Delivery) ConsumerGroupId() string

ConsumerGroupId returns the consumer group ID associated with this delivery.

func (*Delivery) Done

func (d *Delivery) Done()

Done signals that message processing is complete without committing the offset. This is typically used when the message should be skipped or reprocessed later.

func (*Delivery) Source

func (d *Delivery) Source() *kgo.Record

Source returns the underlying Kafka record containing the consumed message.

type Donner

type Donner interface {
	Done()
}

Donner defines an interface for signaling when message processing is complete.

type Handler

type Handler interface {
	Handle(ctx context.Context, delivery *Delivery)
}

Handler defines the interface for processing consumed messages.

type HandlerFunc

type HandlerFunc func(ctx context.Context, delivery *Delivery)

HandlerFunc is an adapter that allows a function to be used as a Handler.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, delivery *Delivery)

Handle implements the Handler interface by calling the underlying function.

type LogObserver

type LogObserver struct {
	NoopObserver
	// contains filtered or unexported fields
}

LogObserver is an Observer implementation that logs consumer lifecycle events to the provided logger.

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

NewLogObserver creates a new LogObserver with the provided context and logger.

func (LogObserver) BeginConsuming

func (l LogObserver) BeginConsuming()

BeginConsuming logs that the consumer has started processing messages.

func (LogObserver) CloseDone

func (l LogObserver) CloseDone()

CloseDone logs that the consumer shutdown process has completed.

func (LogObserver) CloseStart

func (l LogObserver) CloseStart()

CloseStart logs that the consumer shutdown process has begun.

func (LogObserver) ConsumerError

func (l LogObserver) ConsumerError(err error)

ConsumerError logs an unexpected consumer error.

type Middleware

type Middleware func(next Handler) Handler

Middleware is a function that wraps a Handler to add cross-cutting functionality such as logging, metrics, or request ID propagation.

type NoopObserver

type NoopObserver struct{}

NoopObserver is a no-op implementation of the Observer interface that ignores all events.

func (NoopObserver) BeginConsuming

func (n NoopObserver) BeginConsuming()

BeginConsuming does nothing.

func (NoopObserver) CloseDone

func (n NoopObserver) CloseDone()

CloseDone does nothing.

func (NoopObserver) CloseStart

func (n NoopObserver) CloseStart()

CloseStart does nothing.

func (NoopObserver) ConsumerError

func (n NoopObserver) ConsumerError(err error)

ConsumerError does nothing.

type Observer

type Observer interface {
	ConsumerError(err error)
	BeginConsuming()
	CloseStart()
	CloseDone()
}

Observer defines an interface for receiving lifecycle events from the consumer, such as error notifications and shutdown progress.

type Option

type Option func(p *Consumer)

Option is a function that configures a Consumer instance.

func WithMiddlewares

func WithMiddlewares(mws ...Middleware) Option

WithMiddlewares configures the consumer with the provided middlewares. Middlewares are applied in the order they are provided.

func WithObserver

func WithObserver(observer Observer) Option

WithObserver configures the consumer with the provided observer for lifecycle event notifications.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL