grmqx

package
v1.67.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2026 License: MIT Imports: 21 Imported by: 3

README

Package grmqx

Пакет grmqx предназначен для работы с брокером сообщений RabbitMQ, предоставляющий высокоуровневую абстракцию над grmq с дополнительными возможностями:

  • Автоматическое объявление топологии
  • Пакетная обработка сообщений
  • Интеграция с метриками и трейсингом
  • Гибкая система повторов и автогенерация DLQ
  • Контекстное логирование
  • Поддержка аргументов очередей (x-single-active-consumer и др.)

Types

Client

Структура Client управляет подключением к RabbitMQ и жизненным циклом обработчиков.

Methods:

New(logger log.Logger) *Client

Конструктор клиента RabbitMQ.

(c *Client) Upgrade(ctx context.Context, config Config) error

Обновить конфигурацию, синхронно инициализировать клиент с гарантией готовности всех компонентов:

  • Блокировка и ожидание первого успешно установленной сессии.
  • Запуск всех консумеров, инициализация всех паблишеров и применение всех объявлений.
  • Вернет первую возникшую ошибку во время открытия первой сессии или nil
(c *Client) UpgradeAndServe(ctx context.Context, config Config)

Аналогично методу Upgrade обновляет конфигурацию, но не ждет первой успешно установленной сессии. Передает в Observer возникшие ошибки и ретраит.

(c *Client) Healthcheck(ctx context.Context) error

Проверить возможность подключения к брокеру.

(c *Client) Close()

Закрыть соединения и остановить клиент.

(c *Client) DeleteQueues(ctx context.Context, queueNames ...string) error

Удалить очереди брокера, указанные в массиве. При ошибке удаления конкретной очереди, происходит логирование ошибки и процесс удаления других очередей, при наличии таковых, продолжается. Если переданный массив имён пуст, то процесс сразу прекращается без ошибок.

Connection

Конфигурация параметров подключения.

Methods:

(c Connection) Url() string

Получить URL подключения к RabbitMQ.

Publisher

Конфигурация параметров паблишера.

Methods:

(p Publisher) DefaultPublisher(restMiddlewares ...publisher.Middleware) *publisher.Publisher

Создать паблишера с предустановленными middleware и настройками:

  • PersistentMode.
  • Генерация и добавление в заголовки requestId.
  • Метрики и трейсинг.

Опциональные middleware:

  • PublisherLog(logger log.Logger, logBody bool) publisher.Middleware – логировать публикуемые сообщения.
  • PublisherRequestId() publisher.Middleware – генерация и добавление в заголовки requestId (установленно по-умолчанию).
  • PublisherRetry(retrier Retrier) publisher.Middleware – добавить ретраи при возникновении ошибок публикации при помощи объекта, реализующего интерфейс Retrier.
  • PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware – добавить метрики при помощи объекта, реализующего интерфейс PublisherMetricStorage (установленно по-умолчанию).
Consumer

Конфигурация параметров консумера.

Methods:

(c Consumer) DefaultConsumer(handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer

Создать консумера с обработчиком сообщений, реализующим интерфейс consumer.Handler, и базовыми настройками.

Опциональные middleware:

  • ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware – логирование информации о получаемых сообщениях; можно включить/выключить логирование тела сообщения.
  • ConsumerRequestId() consumer.Middleware – получить requestId из заголовка и сохранить его в контексте (установленно по-умолчанию).
BatchConsumer

Конфигурация параметров батч-консумера.

Methods:

(b BatchConsumer) ConsumerConfig() Consumer

Конвертирует BatchConsumer в стандартную конфигурацию Consumer. Фиксирует Concurrency = 1 и наследует все остальные параметры.

(b BatchConsumer) DefaultConsumer(handler batch_handler.SyncHandlerAdapter, restMiddlewares ...consumer.Middleware) consumer.Consumer

Создать батч-консумера с пакетной обработкой сообщений. Обработчик сообщений должен реализовывать интерфейс batch_handler.SyncHandlerAdapter или быть преобразованным к batch_handler.SyncHandlerAdapterFunc, если это функция-обработчик.

LogObserver

Реализация интерфейса grmq.Observer для логирования событий RabbitMQ-клиента.

Methods:

NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

Конструктор обсервера.

(l LogObserver) ClientReady()

Залогировать сообщение о готовности RabbitMQ-клиента.

(l LogObserver) ClientError(err error)

Залогировать сообщение об ошибке RabbitMQ-клиента.

(l LogObserver) ConsumerError(consumer consumer.Consumer, err error)

Залогировать сообщение об ошибке консумера.

(l LogObserver) PublisherError(publisher *publisher.Publisher, err error)

Залогировать сообщение об ошибке паблишера.

(l LogObserver) ShutdownStarted()

Залогировать сообщение о начале процесса завершения работы RabbitMQ-клиента.

(l LogObserver) ShutdownDone()

Залогировать сообщение об окончании процесса завершения работы RabbitMQ-клиента.

(l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)

Залогировать сообщение с информацией о потоке публикации.

(l LogObserver) ConnectionBlocked(reason string)

Залогировать сообщение о блокировке соединения с указанием причины.

(l LogObserver) ConnectionUnblocked()

Залогировать сообщение о разблокировке соединения.

Functions

TopologyFromConsumers(consumers ...Consumer) topology.Declarations

Сгенерировать декларации топологии RabbitMQ на основе конфигураций консумеров.

JoinDeclarations(declarations ...topology.Declarations) topology.Declarations

Объединить несколько деклараций топологии в одну..

NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

Создает готовый синхронный обработчик сообщений RabbitMQ с предустановленными инструментами для:

  • Логирования
  • Сбора метрик
  • Трейсинга
  • Восстановления при панике
NewResultBatchHandler(logger log.Logger, adapter batch_handler.SyncHandlerAdapter) batch_handler.Sync

Создает готовый синхронный пакетный обработчик сообщений RabbitMQ с предустановленными инструментами для:

  • Логирования
  • Сбора метрик
  • Восстановления при панике

Usage

Consumer & publisher
package main

import (
	"context"
	"log"

	"github.com/txix-open/grmq/consumer"
	"github.com/txix-open/isp-kit/grmqx"
	"github.com/txix-open/isp-kit/grmqx/handler"
	log2 "github.com/txix-open/isp-kit/log"
)

type noopHandler struct{}

func (h noopHandler) Handle(ctx context.Context, delivery *consumer.Delivery) handler.Result {
	/* put here business logic */
	return handler.Ack()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	rmqCli := grmqx.New(logger)
	conn := grmqx.Connection{
		Host:     "test",
		Port:     5672,
		Username: "test",
		Password: "test",
		Vhost:    "/",
	}
	publisherCfg := grmqx.Publisher{
		Exchange:   "",
		RoutingKey: "queue-2",
	}
	consumerCfg := grmqx.Consumer{Queue: "queue"}

	/* create consumer & publisher from configs */
	consumer := consumerCfg.DefaultConsumer(
		grmqx.NewResultHandler(logger, noopHandler{}),
		grmqx.ConsumerLog(logger, true),
	)
	publisher := publisherCfg.DefaultPublisher()
	err = rmqCli.Upgrade(context.Background(), grmqx.NewConfig(
		conn.Url(),
		grmqx.WithConsumers(consumer),
		grmqx.WithPublishers(publisher),
		grmqx.WithDeclarations(grmqx.TopologyFromConsumers(consumerCfg)),
	))
	if err != nil {
		log.Fatal(err)
	}
}

Batch consumer
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/grmqx"
	"github.com/txix-open/isp-kit/grmqx/batch_handler"
	log2 "github.com/txix-open/isp-kit/log"
)

type batchHandler struct{}

func (h batchHandler) Handle(batch batch_handler.BatchItems) {
	/* put here business logic */
	batch.AckAll()
}

func main() {
	logger, err := log2.New()
	if err != nil {
		log.Fatal(err)
	}

	rmqCli := grmqx.New(logger)
	conn := grmqx.Connection{
		Host:     "test",
		Port:     5672,
		Username: "test",
		Password: "test",
		Vhost:    "/",
	}

	consumerCfg := grmqx.BatchConsumer{
		Queue:             "queue-1",
		BatchSize:         100,
		PurgeIntervalInMs: 60000,
	}
	consumer := consumerCfg.DefaultConsumer(
		batchHandler{},
		grmqx.ConsumerLog(logger, true),
	)

	err = rmqCli.Upgrade(context.Background(), grmqx.NewConfig(
		conn.Url(),
		grmqx.WithConsumers(consumer),
		grmqx.WithDeclarations(grmqx.TopologyFromConsumers(
			consumerCfg.ConsumerConfig(),
		)),
	))
	if err != nil {
		log.Fatal(err)
	}
}

Documentation

Overview

Package grmqx provides a high-level wrapper for the RabbitMQ client, offering automatic topology declaration, integration with metrics and tracing, flexible retry policies, and contextual logging.

Index

Constants

View Source
const (
	// DefaultHeartbeat specifies the default heartbeat interval for RabbitMQ connections.
	DefaultHeartbeat = 3 * time.Second
	// DefaultDialTimeout specifies the default timeout for establishing connections.
	DefaultDialTimeout = 5 * time.Second
)

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

ConsumerLog creates a consumer middleware that logs consumed messages. When logBody is true, the message body is included in the log output.

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

ConsumerRequestId creates a consumer middleware that extracts request IDs from message headers and propagates them in the context. If no request ID is found, a new one is generated. It also adds the request ID to the context logger.

func JoinDeclarations

func JoinDeclarations(declarations ...topology.Declarations) topology.Declarations

JoinDeclarations merges multiple topology declarations into one.

func NewResultBatchHandler added in v1.59.0

func NewResultBatchHandler(logger log.Logger, adapter batch_handler.SyncHandlerAdapter) batch_handler.Sync

NewResultBatchHandler creates a ready-to-use synchronous batch RabbitMQ message handler with pre-configured tools for logging, metrics collection, and panic recovery.

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync

NewResultHandler creates a ready-to-use synchronous RabbitMQ message handler with pre-configured tools for logging, metrics collection, tracing, and panic recovery.

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

PublisherLog creates a publisher middleware that logs published messages. When logBody is true, the message body is included in the log output.

func PublisherMetrics

func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware

PublisherMetrics creates a middleware that collects publisher metrics including message size, publication duration, and error counts.

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

PublisherRequestId creates a publisher middleware that generates and injects request IDs into message headers. If a request ID already exists in the context, it is used; otherwise, a new one is generated.

func PublisherRetry added in v1.50.0

func PublisherRetry(retrier Retrier) publisher.Middleware

PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging middleware to avoid duplicate logging of publication attempts.

func TopologyFromConsumers

func TopologyFromConsumers(consumers ...Consumer) topology.Declarations

TopologyFromConsumers generates RabbitMQ topology declarations based on consumer configurations.

Types

type BatchConsumer

type BatchConsumer struct {
	Queue              string         `validate:"required" schema:"Наименование очереди"`
	Dlq                bool           `schema:"Создать очередь DLQ"`
	BatchSize          int            `validate:"required" schema:"Количество сообщений в пачке"`
	PurgeIntervalInMs  int            `validate:"required" schema:"Интервал обработки"`
	DisableAutoDeclare bool           `` /* 182-byte string literal not displayed */
	Binding            *Binding       `schema:"Настройки топологии"`
	RetryPolicy        *RetryPolicy   `schema:"Политика повторной обработки"`
	QueueArgs          map[string]any `schema:"Аргументы очереди"`
}

BatchConsumer represents batch consumer configuration.

func (BatchConsumer) ConsumerConfig

func (b BatchConsumer) ConsumerConfig() Consumer

ConsumerConfig converts BatchConsumer to a standard Consumer configuration. Fixes Concurrency to 1 and inherits all other parameters.

func (BatchConsumer) DefaultConsumer

func (b BatchConsumer) DefaultConsumer(handler batch_handler.SyncHandlerAdapter, restMiddlewares ...consumer.Middleware) consumer.Consumer

DefaultConsumer creates a batch consumer with batch message processing. The handler must implement batch_handler.SyncHandlerAdapter or be convertible to batch_handler.SyncHandlerAdapterFunc if it is a function-based handler.

type Binding

type Binding struct {
	Exchange     string `validate:"required" schema:"Точка обмена"`
	ExchangeType string `validate:"required,oneof=direct fanout topic" schema:"Тип точки обмена"`
	RoutingKey   string `validate:"required" schema:"Ключ маршрутизации"`
}

Binding represents topology binding configuration.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client manages RabbitMQ connections and the lifecycle of consumers and publishers. It supports dynamic configuration updates and is safe for concurrent use.

func New

func New(logger log.Logger) *Client

New creates a new RabbitMQ client instance.

func (*Client) Close

func (c *Client) Close()

Close terminates all connections and stops the client.

func (*Client) DeleteQueues added in v1.66.4

func (c *Client) DeleteQueues(ctx context.Context, queueNames ...string) error

DeleteQueues deletes the specified queues from the broker. If an error occurs while deleting a specific queue, it is logged and processing continues for remaining queues. Returns immediately without error if no queue names are provided.

func (*Client) Healthcheck

func (c *Client) Healthcheck(ctx context.Context) error

Healthcheck verifies the ability to connect to the RabbitMQ broker. Returns an error if the client is not initialized or if the connection fails.

func (*Client) Upgrade

func (c *Client) Upgrade(ctx context.Context, config Config) error

Upgrade updates the client configuration and synchronously initializes the client, ensuring all components (consumers, publishers, declarations) are ready before returning. It blocks until the first successful session is established or an error occurs. Returns the first error encountered during session establishment, or nil on success.

func (*Client) UpgradeAndServe

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

UpgradeAndServe updates the client configuration similarly to Upgrade, but does not wait for the first successful session. Errors are passed to the Observer for handling with retries.

type Config

type Config struct {
	Url          string
	Publishers   []*publisher.Publisher
	Consumers    []consumer.Consumer
	Declarations topology.Declarations
	NewObserver  NewLogObserverFunc
}

Config represents the client configuration.

func NewConfig

func NewConfig(url string, opts ...ConfigOption) Config

NewConfig creates a new configuration with the specified URL and options.

type ConfigOption

type ConfigOption func(c *Config)

ConfigOption is a function that modifies a Config instance.

func WithConsumers

func WithConsumers(consumers ...consumer.Consumer) ConfigOption

WithConsumers sets the consumers for the configuration.

func WithDeclarations

func WithDeclarations(declarations topology.Declarations) ConfigOption

WithDeclarations sets the topology declarations for the configuration.

func WithLogObserver added in v1.57.0

func WithLogObserver(newObserverFunc NewLogObserverFunc) ConfigOption

WithLogObserver sets a custom log observer factory for the configuration.

func WithPublishers

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

WithPublishers sets the publishers for the configuration.

type Connection

type Connection struct {
	Host     string `validate:"required" schema:"Хост"`
	Port     int    `validate:"required" schema:"Порт"`
	Username string `schema:"Логин"`
	Password string `schema:"Пароль"`
	Vhost    string `schema:"Виртуальный хост"`
}

Connection represents RabbitMQ connection parameters.

func (Connection) Url

func (c Connection) Url() string

Url generates the connection URL for RabbitMQ.

type Consumer

type Consumer struct {
	Queue              string         `validate:"required" schema:"Наименование очереди"`
	Dlq                bool           `schema:"Создать очередь DLQ"`
	PrefetchCount      int            `schema:"Количество предзагруженных сообщений,по умолчанию - 1"`
	Concurrency        int            `` /* 168-byte string literal not displayed */
	DisableAutoDeclare bool           `` /* 182-byte string literal not displayed */
	Binding            *Binding       `schema:"Настройки топологии"`
	RetryPolicy        *RetryPolicy   `schema:"Политика повторной обработки"`
	QueueArgs          map[string]any `schema:"Аргументы очереди"`
}

Consumer represents consumer configuration.

func (Consumer) DefaultConsumer

func (c Consumer) DefaultConsumer(handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer

DefaultConsumer creates a consumer with the specified handler and default settings. PrefetchCount and Concurrency default to 1 if not set or less than 1. Applies ConsumerRequestId middleware by default.

type LogObserver

type LogObserver struct {
	grmq.NoopObserver
	// contains filtered or unexported fields
}

LogObserver implements grmq.Observer interface for logging RabbitMQ client events.

func NewLogObserver

func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver

NewLogObserver creates a new log observer instance.

func (LogObserver) ClientError

func (l LogObserver) ClientError(err error)

ClientError logs a message about an unexpected client error.

func (LogObserver) ClientReady

func (l LogObserver) ClientReady()

ClientReady logs a message indicating the RabbitMQ client is connected and ready.

func (LogObserver) ConnectionBlocked added in v1.48.0

func (l LogObserver) ConnectionBlocked(reason string)

ConnectionBlocked logs a message about the connection being blocked, including the reason.

func (LogObserver) ConnectionUnblocked added in v1.48.0

func (l LogObserver) ConnectionUnblocked()

ConnectionUnblocked logs a message indicating the connection has been unblocked.

func (LogObserver) ConsumerError

func (l LogObserver) ConsumerError(consumer consumer.Consumer, err error)

ConsumerError logs a message about an unexpected consumer error.

func (LogObserver) PublisherError

func (l LogObserver) PublisherError(publisher *publisher.Publisher, err error)

PublisherError logs a message about an unexpected publisher error.

func (LogObserver) PublisherReconnected added in v1.65.1

func (l LogObserver) PublisherReconnected(publisher *publisher.Publisher)

PublisherReconnected logs a message indicating the publisher has been reconnected.

func (LogObserver) PublishingFlow

func (l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)

PublishingFlow logs a message with information about the publishing flow state.

func (LogObserver) ShutdownDone

func (l LogObserver) ShutdownDone()

ShutdownDone logs a message indicating the shutdown process has completed.

func (LogObserver) ShutdownStarted

func (l LogObserver) ShutdownStarted()

ShutdownStarted logs a message indicating the shutdown process has begun.

type NewLogObserverFunc added in v1.57.1

type NewLogObserverFunc func(ctx context.Context, logger log.Logger) grmq.Observer

NewLogObserverFunc is a factory function for creating custom log observers.

type Publisher

type Publisher struct {
	Exchange   string `schema:"Точка обмена"`
	RoutingKey string `` /* 182-byte string literal not displayed */
}

Publisher represents publisher configuration.

func (Publisher) DefaultPublisher

func (p Publisher) DefaultPublisher(restMiddlewares ...publisher.Middleware) *publisher.Publisher

DefaultPublisher creates a publisher with pre-configured middleware and settings: - Persistent mode enabled - Request ID generation and header injection - Metrics and tracing integration

Optional middleware can be provided: - PublisherLog: logs published messages - PublisherRequestId: generates and injects request IDs (enabled by default) - PublisherRetry: adds retry logic on publication errors - PublisherMetrics: collects metrics (enabled by default)

type PublisherMetricStorage

type PublisherMetricStorage interface {
	ObservePublishDuration(exchange string, routingKey string, t time.Duration)
	ObservePublishMsgSize(exchange string, routingKey string, size int)
	IncPublishError(exchange string, routingKey string)
}

PublisherMetricStorage defines an interface for publisher metrics storage.

type Retrier added in v1.50.0

type Retrier interface {
	Do(ctx context.Context, f func() error) error
}

Retrier defines an interface for retry logic.

type RetryConfig

type RetryConfig struct {
	DelayInMs   int `validate:"required" schema:"Задержка в миллисекундах"`
	MaxAttempts int `validate:"required" schema:"Количество попыток,-1 = бесконечно"`
}

RetryConfig represents retry configuration for message processing.

type RetryPolicy

type RetryPolicy struct {
	FinallyMoveToDlq bool          `` /* 173-byte string literal not displayed */
	Retries          []RetryConfig `schema:"Настройки"`
}

RetryPolicy represents retry policy for message processing.

Directories

Path Synopsis
Package batch_handler provides batch message processing for RabbitMQ consumers.
Package batch_handler provides batch message processing for RabbitMQ consumers.
Package handler provides synchronous message processing for RabbitMQ consumers.
Package handler provides synchronous message processing for RabbitMQ consumers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL