handler

package
v1.67.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: MIT Imports: 5 Imported by: 0

README

Package handler

Пакет handler предоставляет инструменты для обработки сообщений RabbitMQ, включая базовые обработчики, middleware и управление результатами.

Types

Sync

Структура Sync реализует обработчик сообщений с поддержкой middleware.

Methods:

NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

Конструктор синхронного обработчика, принимающий на вход адаптер бизнес-логики, который должен реализовывать интерфейс SyncHandlerAdapter или быть преобразованным к SyncHandlerAdapterFunc, если это функция-обработчик.

Опциональные middleware:

  • Metrics(metricStorage ConsumerMetricStorage) Middleware – middleware для сбора метрик, регистрирующая время обработки, размер сообщения, статусы (Ack/Requeue/Retry/DLQ). Принимает на вход хранилище метрик, реализующее интерфейс ConsumerMetricStorage.
  • Log(logger log.Logger) Middleware – логирования событий обработки.
  • Recovery() Middleware – предотвращает падение сервиса при панике в обработчике, преобразуя ее в ошибку.
(r Sync) Handle(ctx context.Context, delivery *consumer.Delivery)

Выполняет обработку сообщения и применяет результат (Ack/Requeue/Retry/MoveToDlq). Логирует ошибки при выполнении операций с брокером.

Usage

Custom adapter
package main

import (
	"context"
	"log"

	"github.com/txix-open/grmq/consumer"
	"github.com/txix-open/isp-kit/grmqx/handler"
	log2 "github.com/txix-open/isp-kit/log"
)

type customHandler struct{}

func (h customHandler) Handle(ctx context.Context, delivery *consumer.Delivery) handler.Result {
	/* put here business logic */
	return handler.Ack()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	var (
		metricStorage = NewMetricStorage() /* ConsumerMetricStorage interface implementation */
		adapter       customHandler
	)
	syncHandler := handler.NewSync(logger, adapter, []handler.Middleware{
		handler.Metrics(metricStorage),
		handler.Log(logger),
	}...)

	/* handler's call for example */
	delivery := new(consumer.Delivery) /* placeholder for example */
	syncHandler.Handle(context.Background(), delivery)
}

Documentation

Overview

Package handler provides synchronous message processing for RabbitMQ consumers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerMetricStorage

type ConsumerMetricStorage interface {
	ObserveConsumeDuration(exchange string, routingKey string, t time.Duration)
	ObserveConsumeMsgSize(exchange string, routingKey string, size int)
	IncRequeueCount(exchange string, routingKey string)
	IncDlqCount(exchange string, routingKey string)
	IncSuccessCount(exchange string, routingKey string)
	IncRetryCount(exchange string, routingKey string)
}

ConsumerMetricStorage defines an interface for consumer metrics storage.

type Middleware

type Middleware func(next SyncHandlerAdapter) SyncHandlerAdapter

Middleware is a function that wraps a SyncHandlerAdapter.

func Log

func Log(logger log.Logger) Middleware

Log creates a middleware that logs message processing results with appropriate log levels based on the outcome (Ack, Requeue, Retry, or MoveToDlq).

func Metrics

func Metrics(metricStorage ConsumerMetricStorage) Middleware

Metrics creates a middleware that collects consumer metrics including message processing duration, message size, and result counts (success, requeue, retry, DLQ).

func Recovery added in v1.53.0

func Recovery() Middleware

Recovery creates a middleware that recovers from panics during message processing. On panic, the message is moved to the DLQ and the error is recorded.

type Result

type Result struct {
	// Ack indicates the message should be acknowledged (successfully processed).
	Ack bool
	// Requeue indicates the message should be requeued after a delay.
	Requeue bool
	// RequeueTimeout specifies the delay before requeuing the message.
	RequeueTimeout time.Duration
	// Retry indicates the message should be retried using the retry policy.
	Retry bool
	// MoveToDlq indicates the message should be moved to the dead letter queue.
	MoveToDlq bool
	// Err contains the error that occurred during processing.
	Err error
}

Result represents the outcome of message processing.

func Ack

func Ack() Result

Ack creates a Result indicating successful message processing.

func MoveToDlq

func MoveToDlq(err error) Result

MoveToDlq creates a Result indicating the message should be moved to the dead letter queue. If no DLQ is configured, the message will be dropped.

func Requeue deprecated

func Requeue(after time.Duration, err error) Result

Requeue creates a Result indicating the message should be requeued after a delay.

Deprecated: use Retry and RetryPolicy instead.

func Retry

func Retry(err error) Result

Retry creates a Result indicating the message should be retried.

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

Sync wraps a handler with middleware and manages message acknowledgment.

func NewSync

func NewSync(logger log.Logger, adapter SyncHandlerAdapter, middlewares ...Middleware) Sync

NewSync creates a new Sync handler with the specified logger, adapter, and middleware. Middleware functions are applied in reverse order (last to first).

func (Sync) Handle

func (r Sync) Handle(ctx context.Context, delivery *consumer.Delivery)

Handle processes a message delivery and performs the appropriate action based on the Result (Ack, Requeue, Retry, or MoveToDlq).

type SyncHandlerAdapter

type SyncHandlerAdapter interface {
	Handle(ctx context.Context, delivery *consumer.Delivery) Result
}

SyncHandlerAdapter defines the interface for synchronous message handlers.

type SyncHandlerAdapterFunc

type SyncHandlerAdapterFunc func(ctx context.Context, delivery *consumer.Delivery) Result

SyncHandlerAdapterFunc is an adapter that allows using functions as SyncHandlerAdapters.

func (SyncHandlerAdapterFunc) Handle

func (a SyncHandlerAdapterFunc) Handle(ctx context.Context, delivery *consumer.Delivery) Result

Handle handles a message delivery using the function.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL