batch_handler

package
v1.67.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 7 Imported by: 0

README

Package batch_handler

Пакет batch_handler предоставляет инструменты для пакетной обработки сообщений RabbitMQ, включая базовые обработчики, middleware и управление результатами.

Types

Handler

Обработчик для пакетной обработки сообщений из очереди RabbitMQ.

Methods:

New(adapter SyncHandlerAdapter, purgeInterval time.Duration, maxSize int) *Handler

Конструктор обработчика. Принимает адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик. Также принимает интервал очистки очереди и максимальный ее размер.

(r *Handler) Handle(ctx context.Context, delivery *consumer.Delivery)

Добавить сообщение в текущий батч.

(r *Handler) Close()

Завершить работу обработчика сообщений.

Sync

Структура Sync реализует пакетный обработчик сообщений с поддержкой middleware.

Methods:

NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик.

Опциональные middleware:

  • Metrics(metricStorage ConsumerMetricStorage) Middleware – middleware для сбора метрик, регистрирующая время обработки, размер сообщения, статусы (Ack/Retry/DLQ). Принимает на вход хранилище метрик, реализующее интерфейс ConsumerMetricStorage.
  • Log(logger log.Logger) Middleware – логирования событий обработки.
  • Recovery(logger log.Logger) Middleware – предотвращает падение сервиса при панике в обработчике, логируя ее в ошибку.
(r Sync) Handle(items []Item)

Выполняет обработку пакета сообщений и применяет установленный результат (Ack/Retry/MoveToDlq) для каждого из них. Логирует ошибки при выполнении операций с брокером.

Result

Структура Result содержит результат обработки сообщения.

BatchItem

Структура BatchItem содержит обрабатываемое сообщение из принятого на обработку пакета, позволяет устанавливать и управлять результатом обработки сообщения.

(b *BatchItem) Ack()

Установка результата сообщения как успешно обработанного.

(b *BatchItem) MoveToDlq(err error)

Установка результата сообщения как требующего повторной обработки, с логированием ошибки.

(b *BatchItem) Retry(err error)

Установка результата сообщения как неуспешно обработанного, с отправкой в DLQ и логированием ошибки.

BatchItem

Структура BatchItems содержит принятый на обработку пакет сообщений, позволяет устанавливать и управлять результатом обработки всех сообщений пакета.

(bs BatchItems) AckAll()

Установка результата сообщений как успешно обработанных.

(bs BatchItems) MoveToDlqAll(err error)

Установка результата сообщений как требующих повторной обработки, с логированием ошибки.

(bs BatchItems) RetryAll(err error)

Установка результата сообщений как неуспешно обработанных, с отправкой в DLQ и логированием ошибки.

Usage

Custom adapter
package main

import (
	"log"

	"github.com/txix-open/grmq/consumer"
	"github.com/txix-open/isp-kit/grmqx"
	"github.com/txix-open/isp-kit/grmqx/batch_handler"
	log2 "github.com/txix-open/isp-kit/log"
)

type customHandler struct{}

func (h customHandler) Handle(batch batch_handler.BatchItems) {
	for _, item := range batch {
		/* put here business logic */
		item.Ack()
		/*
			if err != nil {
			    item.MoveToDlq(err)
			}
		*/
	}
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	var (
		metricStorage = NewMetricStorage() /* ConsumerMetricStorage interface implementation */
		adapter       customHandler
	)
	handler := batch_handler.NewSync(logger, adapter, []batch_handler.Middleware{
		batch_handler.Metrics(metricStorage),
		batch_handler.Log(logger),
		batch_handler.Recovery(logger),
	}...)

	/* handler's call for example */
	batch := make([]*batch_handler.BatchItem, 0) /* placeholder for example */
	handler.Handle(batch)
}

Documentation

Overview

Package batch_handler provides batch message processing for RabbitMQ consumers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchItem

type BatchItem struct {
	// Context contains the request context.
	Context context.Context
	// Delivery contains the raw message delivery from the consumer.
	Delivery *consumer.Delivery
	// Result stores the processing outcome.
	Result Result
}

BatchItem represents a single message in a batch with its processing result.

func (*BatchItem) Ack

func (b *BatchItem) Ack()

Ack sets the result to indicate successful message acknowledgment.

func (*BatchItem) MoveToDlq

func (b *BatchItem) MoveToDlq(err error)

MoveToDlq sets the result to indicate the message should be moved to the DLQ.

func (*BatchItem) Retry

func (b *BatchItem) Retry(err error)

Retry sets the result to indicate the message should be retried.

type BatchItems

type BatchItems []*BatchItem

BatchItems is a slice of batch items.

func (BatchItems) AckAll

func (bs BatchItems) AckAll()

AckAll sets all items to be acknowledged.

func (BatchItems) MoveToDlqAll

func (bs BatchItems) MoveToDlqAll(err error)

MoveToDlqAll sets all items to be moved to the DLQ.

func (BatchItems) RetryAll

func (bs BatchItems) RetryAll(err error)

RetryAll sets all items to be retried.

type ConsumerMetricStorage

type ConsumerMetricStorage interface {
	ObserveConsumeDuration(exchange string, routingKey string, t time.Duration)
	ObserveConsumeMsgSize(exchange string, routingKey string, size int)
	IncDlqCount(exchange string, routingKey string)
	IncSuccessCount(exchange string, routingKey string)
	IncRetryCount(exchange string, routingKey string)
}

ConsumerMetricStorage defines an interface for consumer metrics storage.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler accumulates messages and processes them in batches. It triggers processing when the batch reaches maxSize or when the purgeInterval elapses.

func New

func New(adapter SyncHandlerAdapter, purgeInterval time.Duration, maxSize int) *Handler

New creates a new batch handler with the specified adapter, purge interval, and max batch size.

func (*Handler) Close

func (r *Handler) Close()

Close stops the batch handler and prevents further message processing.

func (*Handler) Handle

func (r *Handler) Handle(ctx context.Context, delivery *consumer.Delivery)

Handle adds a message to the batch and triggers processing if needed. If the handler is closed, the message is negatively acknowledged.

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

Middleware is a function that wraps a SyncHandlerAdapter.

func Log

func Log(logger log.Logger) Middleware

Log creates a middleware that logs batch message processing results with appropriate log levels based on the outcome (Ack, Retry, or MoveToDlq).

func Metrics

func Metrics(metricStorage ConsumerMetricStorage) Middleware

Metrics creates a middleware that collects consumer metrics for batch processing, including message processing duration, message size, and result counts (success, retry, DLQ).

func Recovery

func Recovery(logger log.Logger) Middleware

Recovery creates a middleware that recovers from panics during batch message processing. On panic, the error is logged but message acknowledgment state remains unchanged.

type Result

type Result struct {
	// Ack indicates the message should be acknowledged (successfully processed).
	Ack bool
	// Retry indicates the message should be retried using the retry policy.
	Retry bool
	// MoveToDlq indicates the message should be moved to the dead letter queue.
	MoveToDlq bool
	// Err contains the error that occurred during processing.
	Err error
}

Result represents the outcome of batch message processing.

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

Sync wraps a handler with middleware and manages batch message acknowledgment.

func NewSync

func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

NewSync creates a new Sync handler with the specified logger, adapter, and middleware. Middleware functions are applied in reverse order (last to first).

func (Sync) Handle

func (r Sync) Handle(batch BatchItems)

Handle processes a batch of messages and performs the appropriate action for each message based on its Result (Ack, Retry, or MoveToDlq).

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(batch BatchItems)
}

SyncHandlerAdapter defines the interface for synchronous batch message handlers.

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(batch BatchItems)

SyncHandlerAdapterFunc is an adapter that allows using functions as SyncHandlerAdapters.

func (SyncHandlerAdapterFunc) Handle

func (a SyncHandlerAdapterFunc) Handle(batch BatchItems)

Handle handles a batch of messages using the function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL