Documentation
¶
Overview ¶
Package grmqx provides a high-level wrapper for the RabbitMQ client, offering automatic topology declaration, integration with metrics and tracing, flexible retry policies, and contextual logging.
Index ¶
- Constants
- Variables
- func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware
- func ConsumerRequestId() consumer.Middleware
- func JoinDeclarations(declarations ...topology.Declarations) topology.Declarations
- func NewResultBatchHandler(logger log.Logger, adapter batch_handler.SyncHandlerAdapter) batch_handler.Sync
- func NewResultHandler(logger log.Logger, adapter handler.SyncHandlerAdapter) handler.Sync
- func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware
- func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
- func PublisherRequestId() publisher.Middleware
- func PublisherRetry(retrier Retrier) publisher.Middleware
- func TopologyFromConsumers(consumers ...Consumer) topology.Declarations
- type BatchConsumer
- type Binding
- type Client
- func (c *Client) Close()
- func (c *Client) DeleteQueues(ctx context.Context, queueNames ...string) error
- func (c *Client) DeleteQueuesWithInspect(ctx context.Context, queueNames ...string) (map[string]error, error)
- func (c *Client) Healthcheck(ctx context.Context) error
- func (c *Client) QueueInspect(name string) (amqp091.Queue, error)
- func (c *Client) Upgrade(ctx context.Context, config Config) error
- func (c *Client) UpgradeAndServe(ctx context.Context, config Config)
- type Config
- type ConfigOption
- type Connection
- type Consumer
- type LogObserver
- func (l LogObserver) ClientError(err error)
- func (l LogObserver) ClientReady()
- func (l LogObserver) ConnectionBlocked(reason string)
- func (l LogObserver) ConnectionUnblocked()
- func (l LogObserver) ConsumerError(consumer consumer.Consumer, err error)
- func (l LogObserver) PublisherError(publisher *publisher.Publisher, err error)
- func (l LogObserver) PublisherReconnected(publisher *publisher.Publisher)
- func (l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)
- func (l LogObserver) ShutdownDone()
- func (l LogObserver) ShutdownStarted()
- type NewLogObserverFunc
- type Publisher
- type PublisherMetricStorage
- type Retrier
- type RetryConfig
- type RetryPolicy
Constants ¶
const ( // DefaultHeartbeat specifies the default heartbeat interval for RabbitMQ connections. DefaultHeartbeat = 3 * time.Second // DefaultDialTimeout specifies the default timeout for establishing connections. DefaultDialTimeout = 5 * time.Second )
Variables ¶
var ( // ErrNotExistQueue returned when trying to operate a queue that doesn't exist. ErrNotExistQueue = errors.New("queue does not exist") )
Functions ¶
func ConsumerLog ¶
func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware
ConsumerLog creates a consumer middleware that logs consumed messages. When logBody is true, the message body is included in the log output.
func ConsumerRequestId ¶
func ConsumerRequestId() consumer.Middleware
ConsumerRequestId creates a consumer middleware that extracts request IDs from message headers and propagates them in the context. If no request ID is found, a new one is generated. It also adds the request ID to the context logger.
func JoinDeclarations ¶
func JoinDeclarations(declarations ...topology.Declarations) topology.Declarations
JoinDeclarations merges multiple topology declarations into one.
func NewResultBatchHandler ¶ added in v1.59.0
func NewResultBatchHandler(logger log.Logger, adapter batch_handler.SyncHandlerAdapter) batch_handler.Sync
NewResultBatchHandler creates a ready-to-use synchronous batch RabbitMQ message handler with pre-configured tools for logging, metrics collection, and panic recovery.
func NewResultHandler ¶
NewResultHandler creates a ready-to-use synchronous RabbitMQ message handler with pre-configured tools for logging, metrics collection, tracing, and panic recovery.
func PublisherLog ¶
func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware
PublisherLog creates a publisher middleware that logs published messages. When logBody is true, the message body is included in the log output.
func PublisherMetrics ¶
func PublisherMetrics(storage PublisherMetricStorage) publisher.Middleware
PublisherMetrics creates a middleware that collects publisher metrics including message size, publication duration, and error counts.
func PublisherRequestId ¶
func PublisherRequestId() publisher.Middleware
PublisherRequestId creates a publisher middleware that generates and injects request IDs into message headers. If a request ID already exists in the context, it is used; otherwise, a new one is generated.
func PublisherRetry ¶ added in v1.50.0
func PublisherRetry(retrier Retrier) publisher.Middleware
PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging middleware to avoid duplicate logging of publication attempts.
func TopologyFromConsumers ¶
func TopologyFromConsumers(consumers ...Consumer) topology.Declarations
TopologyFromConsumers generates RabbitMQ topology declarations based on consumer configurations.
Types ¶
type BatchConsumer ¶
type BatchConsumer struct {
Queue string `validate:"required" schema:"Наименование очереди"`
Dlq bool `schema:"Создать очередь DLQ"`
BatchSize int `validate:"required" schema:"Количество сообщений в пачке"`
PurgeIntervalInMs int `validate:"required" schema:"Интервал обработки"`
DisableAutoDeclare bool `` /* 182-byte string literal not displayed */
Binding *Binding `schema:"Настройки топологии"`
RetryPolicy *RetryPolicy `schema:"Политика повторной обработки"`
QueueArgs map[string]any `schema:"Аргументы очереди"`
}
BatchConsumer represents batch consumer configuration.
func (BatchConsumer) ConsumerConfig ¶
func (b BatchConsumer) ConsumerConfig() Consumer
ConsumerConfig converts BatchConsumer to a standard Consumer configuration. Fixes Concurrency to 1 and inherits all other parameters.
func (BatchConsumer) DefaultConsumer ¶
func (b BatchConsumer) DefaultConsumer(handler batch_handler.SyncHandlerAdapter, restMiddlewares ...consumer.Middleware) consumer.Consumer
DefaultConsumer creates a batch consumer with batch message processing. The handler must implement batch_handler.SyncHandlerAdapter or be convertible to batch_handler.SyncHandlerAdapterFunc if it is a function-based handler.
type Binding ¶
type Binding struct {
Exchange string `validate:"required" schema:"Точка обмена"`
ExchangeType string `validate:"required,oneof=direct fanout topic" schema:"Тип точки обмена"`
RoutingKey string `validate:"required" schema:"Ключ маршрутизации"`
}
Binding represents topology binding configuration.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages RabbitMQ connections and the lifecycle of consumers and publishers. It supports dynamic configuration updates and is safe for concurrent use.
func (*Client) Close ¶
func (c *Client) Close()
Close terminates all connections and stops the client.
func (*Client) DeleteQueues ¶ added in v1.66.4
DeleteQueues deletes the specified queues from the broker. If an error occurs while deleting a specific queue, it is logged and processing continues for remaining queues. Returns immediately without error if no queue names are provided. The queue list should not be empty.
func (*Client) DeleteQueuesWithInspect ¶ added in v1.67.1
func (c *Client) DeleteQueuesWithInspect(ctx context.Context, queueNames ...string) (map[string]error, error)
DeleteQueuesWithInspect Delete queues by the list of names `queueNames'. Queues are inspected before they are deleted, and those with messages and/or connections will not be deleted. Returns the queue name card - error. If the queue was deleted without errors, the error value will be `nil` The queue list should not be empty.
func (*Client) Healthcheck ¶
Healthcheck verifies the ability to connect to the RabbitMQ broker. Returns an error if the client is not initialized or if the connection fails.
func (*Client) QueueInspect ¶ added in v1.67.1
QueueInspect To get information about a queue named `name'. Provides data on the number of messages in the queue and connections. The queue name must not be empty.
func (*Client) Upgrade ¶
Upgrade updates the client configuration and synchronously initializes the client, ensuring all components (consumers, publishers, declarations) are ready before returning. It blocks until the first successful session is established or an error occurs. Returns the first error encountered during session establishment, or nil on success.
type Config ¶
type Config struct {
Url string
Publishers []*publisher.Publisher
Consumers []consumer.Consumer
Declarations topology.Declarations
NewObserver NewLogObserverFunc
}
Config represents the client configuration.
func NewConfig ¶
func NewConfig(url string, opts ...ConfigOption) Config
NewConfig creates a new configuration with the specified URL and options.
type ConfigOption ¶
type ConfigOption func(c *Config)
ConfigOption is a function that modifies a Config instance.
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ConfigOption
WithConsumers sets the consumers for the configuration.
func WithDeclarations ¶
func WithDeclarations(declarations topology.Declarations) ConfigOption
WithDeclarations sets the topology declarations for the configuration.
func WithLogObserver ¶ added in v1.57.0
func WithLogObserver(newObserverFunc NewLogObserverFunc) ConfigOption
WithLogObserver sets a custom log observer factory for the configuration.
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ConfigOption
WithPublishers sets the publishers for the configuration.
type Connection ¶
type Connection struct {
Host string `validate:"required" schema:"Хост"`
Port int `validate:"required" schema:"Порт"`
Username string `schema:"Логин"`
Password string `schema:"Пароль"`
Vhost string `schema:"Виртуальный хост"`
}
Connection represents RabbitMQ connection parameters.
func (Connection) Url ¶
func (c Connection) Url() string
Url generates the connection URL for RabbitMQ.
type Consumer ¶
type Consumer struct {
Queue string `validate:"required" schema:"Наименование очереди"`
Dlq bool `schema:"Создать очередь DLQ"`
PrefetchCount int `schema:"Количество предзагруженных сообщений,по умолчанию - 1"`
Concurrency int `` /* 168-byte string literal not displayed */
DisableAutoDeclare bool `` /* 182-byte string literal not displayed */
Binding *Binding `schema:"Настройки топологии"`
RetryPolicy *RetryPolicy `schema:"Политика повторной обработки"`
QueueArgs map[string]any `schema:"Аргументы очереди"`
}
Consumer represents consumer configuration.
func (Consumer) DefaultConsumer ¶
func (c Consumer) DefaultConsumer(handler consumer.Handler, restMiddlewares ...consumer.Middleware) consumer.Consumer
DefaultConsumer creates a consumer with the specified handler and default settings. PrefetchCount and Concurrency default to 1 if not set or less than 1. Applies ConsumerRequestId middleware by default.
type LogObserver ¶
type LogObserver struct {
grmq.NoopObserver
// contains filtered or unexported fields
}
LogObserver implements grmq.Observer interface for logging RabbitMQ client events.
func NewLogObserver ¶
func NewLogObserver(ctx context.Context, logger log.Logger) LogObserver
NewLogObserver creates a new log observer instance.
func (LogObserver) ClientError ¶
func (l LogObserver) ClientError(err error)
ClientError logs a message about an unexpected client error.
func (LogObserver) ClientReady ¶
func (l LogObserver) ClientReady()
ClientReady logs a message indicating the RabbitMQ client is connected and ready.
func (LogObserver) ConnectionBlocked ¶ added in v1.48.0
func (l LogObserver) ConnectionBlocked(reason string)
ConnectionBlocked logs a message about the connection being blocked, including the reason.
func (LogObserver) ConnectionUnblocked ¶ added in v1.48.0
func (l LogObserver) ConnectionUnblocked()
ConnectionUnblocked logs a message indicating the connection has been unblocked.
func (LogObserver) ConsumerError ¶
func (l LogObserver) ConsumerError(consumer consumer.Consumer, err error)
ConsumerError logs a message about an unexpected consumer error.
func (LogObserver) PublisherError ¶
func (l LogObserver) PublisherError(publisher *publisher.Publisher, err error)
PublisherError logs a message about an unexpected publisher error.
func (LogObserver) PublisherReconnected ¶ added in v1.65.1
func (l LogObserver) PublisherReconnected(publisher *publisher.Publisher)
PublisherReconnected logs a message indicating the publisher has been reconnected.
func (LogObserver) PublishingFlow ¶
func (l LogObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)
PublishingFlow logs a message with information about the publishing flow state.
func (LogObserver) ShutdownDone ¶
func (l LogObserver) ShutdownDone()
ShutdownDone logs a message indicating the shutdown process has completed.
func (LogObserver) ShutdownStarted ¶
func (l LogObserver) ShutdownStarted()
ShutdownStarted logs a message indicating the shutdown process has begun.
type NewLogObserverFunc ¶ added in v1.57.1
NewLogObserverFunc is a factory function for creating custom log observers.
type Publisher ¶
type Publisher struct {
Exchange string `schema:"Точка обмена"`
RoutingKey string `` /* 182-byte string literal not displayed */
}
Publisher represents publisher configuration.
func (Publisher) DefaultPublisher ¶
func (p Publisher) DefaultPublisher(restMiddlewares ...publisher.Middleware) *publisher.Publisher
DefaultPublisher creates a publisher with pre-configured middleware and settings: - Persistent mode enabled - Request ID generation and header injection - Metrics and tracing integration
Optional middleware can be provided: - PublisherLog: logs published messages - PublisherRequestId: generates and injects request IDs (enabled by default) - PublisherRetry: adds retry logic on publication errors - PublisherMetrics: collects metrics (enabled by default)
type PublisherMetricStorage ¶
type PublisherMetricStorage interface {
ObservePublishDuration(exchange string, routingKey string, t time.Duration)
ObservePublishMsgSize(exchange string, routingKey string, size int)
IncPublishError(exchange string, routingKey string)
}
PublisherMetricStorage defines an interface for publisher metrics storage.
type RetryConfig ¶
type RetryConfig struct {
DelayInMs int `validate:"required" schema:"Задержка в миллисекундах"`
MaxAttempts int `validate:"required" schema:"Количество попыток,-1 = бесконечно"`
}
RetryConfig represents retry configuration for message processing.
type RetryPolicy ¶
type RetryPolicy struct {
FinallyMoveToDlq bool `` /* 173-byte string literal not displayed */
Retries []RetryConfig `schema:"Настройки"`
}
RetryPolicy represents retry policy for message processing.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package batch_handler provides batch message processing for RabbitMQ consumers.
|
Package batch_handler provides batch message processing for RabbitMQ consumers. |
|
Package handler provides synchronous message processing for RabbitMQ consumers.
|
Package handler provides synchronous message processing for RabbitMQ consumers. |