batch_handler

package
v1.64.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2026 License: MIT Imports: 7 Imported by: 0

README

Package batch_handler

Пакет batch_handler предоставляет инструменты для пакетной обработки сообщений RabbitMQ, включая базовые обработчики, middleware и управление результатами.

Types

Handler

Обработчик для пакетной обработки сообщений из очереди RabbitMQ.

Methods:

New(adapter SyncHandlerAdapter, purgeInterval time.Duration, maxSize int) *Handler

Конструктор обработчика. Принимает адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик. Также принимает интервал очистки очереди и максимальный ее размер.

(r *Handler) Handle(ctx context.Context, delivery *consumer.Delivery)

Добавить сообщение в текущий батч.

(r *Handler) Close()

Завершить работу обработчика сообщений.

Sync

Структура Sync реализует пакетный обработчик сообщений с поддержкой middleware.

Methods:

NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик.

Опциональные middleware:

  • Metrics(metricStorage ConsumerMetricStorage) Middleware – middleware для сбора метрик, регистрирующая время обработки, размер сообщения, статусы (Ack/Retry/DLQ). Принимает на вход хранилище метрик, реализующее интерфейс ConsumerMetricStorage.
  • Log(logger log.Logger) Middleware – логирования событий обработки.
  • Recovery(logger log.Logger) Middleware – предотвращает падение сервиса при панике в обработчике, логируя ее в ошибку.
(r Sync) Handle(items []Item)

Выполняет обработку пакета сообщений и применяет установленный результат (Ack/Retry/MoveToDlq) для каждого из них. Логирует ошибки при выполнении операций с брокером.

Result

Структура Result содержит результат обработки сообщения.

BatchItem

Структура BatchItem содержит обрабатываемое сообщение из принятого на обработку пакета, позволяет устанавливать и управлять результатом обработки сообщения.

(b *BatchItem) Ack()

Установка результата сообщения как успешно обработанного.

(b *BatchItem) MoveToDlq(err error)

Установка результата сообщения как требующего повторной обработки, с логированием ошибки.

(b *BatchItem) Retry(err error)

Установка результата сообщения как неуспешно обработанного, с отправкой в DLQ и логированием ошибки.

BatchItem

Структура BatchItems содержит принятый на обработку пакет сообщений, позволяет устанавливать и управлять результатом обработки всех сообщений пакета.

(bs BatchItems) AckAll()

Установка результата сообщений как успешно обработанных.

(bs BatchItems) MoveToDlqAll(err error)

Установка результата сообщений как требующих повторной обработки, с логированием ошибки.

(bs BatchItems) RetryAll(err error)

Установка результата сообщений как неуспешно обработанных, с отправкой в DLQ и логированием ошибки.

Usage

Custom adapter
package main

import (
	"log"

	"github.com/txix-open/grmq/consumer"
	"github.com/txix-open/isp-kit/grmqx"
	"github.com/txix-open/isp-kit/grmqx/batch_handler"
	log2 "github.com/txix-open/isp-kit/log"
)

type customHandler struct{}

func (h customHandler) Handle(batch batch_handler.BatchItems) {
	for _, item := range batch {
		/* put here business logic */
		item.Ack()
		/*
			if err != nil {
			    item.MoveToDlq(err)
			}
		*/
	}
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	var (
		metricStorage = NewMetricStorage() /* ConsumerMetricStorage interface implementation */
		adapter       customHandler
	)
	handler := batch_handler.NewSync(logger, adapter, []batch_handler.Middleware{
		batch_handler.Metrics(metricStorage),
		batch_handler.Log(logger),
		batch_handler.Recovery(logger),
	}...)

	/* handler's call for example */
	batch := make([]*batch_handler.BatchItem, 0) /* placeholder for example */
	handler.Handle(batch)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchItem

type BatchItem struct {
	Context  context.Context
	Delivery *consumer.Delivery
	Result   Result
}

nolint:containedctx

func (*BatchItem) Ack

func (b *BatchItem) Ack()

func (*BatchItem) MoveToDlq

func (b *BatchItem) MoveToDlq(err error)

func (*BatchItem) Retry

func (b *BatchItem) Retry(err error)

type BatchItems

type BatchItems []*BatchItem

func (BatchItems) AckAll

func (bs BatchItems) AckAll()

func (BatchItems) MoveToDlqAll

func (bs BatchItems) MoveToDlqAll(err error)

func (BatchItems) RetryAll

func (bs BatchItems) RetryAll(err error)

type ConsumerMetricStorage

type ConsumerMetricStorage interface {
	ObserveConsumeDuration(exchange string, routingKey string, t time.Duration)
	ObserveConsumeMsgSize(exchange string, routingKey string, size int)
	IncDlqCount(exchange string, routingKey string)
	IncSuccessCount(exchange string, routingKey string)
	IncRetryCount(exchange string, routingKey string)
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func New

func New(adapter SyncHandlerAdapter, purgeInterval time.Duration, maxSize int) *Handler

func (*Handler) Close

func (r *Handler) Close()

func (*Handler) Handle

func (r *Handler) Handle(ctx context.Context, delivery *consumer.Delivery)

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

func Log

func Log(logger log.Logger) Middleware

func Metrics

func Metrics(metricStorage ConsumerMetricStorage) Middleware

func Recovery

func Recovery(logger log.Logger) Middleware

nolint:nonamedreturns

type Result

type Result struct {
	Ack       bool
	Retry     bool
	MoveToDlq bool
	Err       error
}

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

func NewSync

func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

func (Sync) Handle

func (r Sync) Handle(batch BatchItems)

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(batch BatchItems)
}

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(batch BatchItems)

func (SyncHandlerAdapterFunc) Handle

func (a SyncHandlerAdapterFunc) Handle(batch BatchItems)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL