handler

package
v1.59.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2025 License: MIT Imports: 5 Imported by: 0

README

Package handler

Пакет handler предоставляет инструменты для обработки сообщений RabbitMQ, включая базовые обработчики, middleware и управление результатами.

Types

Sync

Структура Sync реализует обработчик сообщений с поддержкой middleware.

Methods:

NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик.

Опциональные middleware:

  • Metrics(metricStorage ConsumerMetricStorage) Middleware – middleware для сбора метрик, регистрирующая время обработки, размер сообщения, статусы (Ack/Requeue/Retry/DLQ). Принимает на вход хранилище метрик, реализующее интерфейс ConsumerMetricStorage.
  • Log(logger log.Logger) Middleware – логирования событий обработки.
  • Recovery() Middleware – предотвращает падение сервиса при панике в обработчике, преобразуя ее в ошибку.
(r Sync) Handle(ctx context.Context, delivery *consumer.Delivery)

Выполняет обработку сообщения и применяет результат (Ack/Requeue/Retry/MoveToDlq). Логирует ошибки при выполнении операций с брокером.

Usage

Custom adapter
package main

import (
	"context"
	"log"

	"github.com/txix-open/grmq/consumer"
	"github.com/txix-open/isp-kit/grmqx/handler"
	log2 "github.com/txix-open/isp-kit/log"
)

type customHandler struct{}

func (h customHandler) Handle(ctx context.Context, delivery *consumer.Delivery) handler.Result {
	/* put here business logic */
	return handler.Ack()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	var (
		metricStorage = NewMetricStorage() /* ConsumerMetricStorage interface implementation */
		adapter       customHandler
	)
	syncHandler := handler.NewSync(logger, adapter, []handler.Middleware{
		handler.Metrics(metricStorage),
		handler.Log(logger),
	}...)

	/* handler's call for example */
	delivery := new(consumer.Delivery) /* placeholder for example */
	syncHandler.Handle(context.Background(), delivery)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerMetricStorage

type ConsumerMetricStorage interface {
	ObserveConsumeDuration(exchange string, routingKey string, t time.Duration)
	ObserveConsumeMsgSize(exchange string, routingKey string, size int)
	IncRequeueCount(exchange string, routingKey string)
	IncDlqCount(exchange string, routingKey string)
	IncSuccessCount(exchange string, routingKey string)
	IncRetryCount(exchange string, routingKey string)
}

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

func Log

func Log(logger log.Logger) Middleware

func Metrics

func Metrics(metricStorage ConsumerMetricStorage) Middleware

func Recovery added in v1.53.0

func Recovery() Middleware

nolint:nonamedreturns

type Result

type Result struct {
	Ack            bool
	Requeue        bool
	RequeueTimeout time.Duration
	Retry          bool
	MoveToDlq      bool
	Err            error
}

func Ack

func Ack() Result

func MoveToDlq

func MoveToDlq(err error) Result

MoveToDlq if there is no DLQ, the message will be dropped

func Requeue

func Requeue(after time.Duration, err error) Result

Requeue Deprecated: use Retry and RetryPolicy instead

func Retry

func Retry(err error) Result

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

func NewSync

func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

func (Sync) Handle

func (r Sync) Handle(ctx context.Context, delivery *consumer.Delivery)

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *consumer.Delivery) Result
}

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(ctx context.Context, delivery *consumer.Delivery) Result

func (SyncHandlerAdapterFunc) Handle

func (a SyncHandlerAdapterFunc) Handle(ctx context.Context, delivery *consumer.Delivery) Result

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL