stompx

package
v1.67.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 11 Imported by: 0

README

Package stompx

Пакет stompx предоставляет высокоуровневую обёртку над STOMP-протоколом, реализуя удобные инструменты для создания потребителей и издателей сообщений, с поддержкой middleware, логирования, повторных попыток и управления группой потребителей и издателей.

Types

Config

Конфигурация stompx-клиента.

Fields:

Consumers []*consumer.Watcher

Массив потребителей.

Publishers []*publisher.Publisher

Массив издателей.

Methods:

NewConfig(opts ...ConfigOption) Config

Создаёт конфигурацию клиента с указанными опциями.

ConfigOption

Функция, применяющая опции к Config.

Functions:

WithConsumers(consumers ...*consumer.Watcher) ConfigOption

Добавляет клиенты потребителей в конфигурацию клиента.

WithPublishers(publishers ...*publisher.Publisher) ConfigOption

Добавляет клиенты издателей в конфигурацию клиента.

ConsumerConfig

Конфигурация потребителя сообщений.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

Concurrency int

Количество обработчиков (по умолчанию 1).

PrefetchCount int

Количество предзагруженных сообщений.

Username string

Имя пользователя.

Password string

Пароль.

ConnHeaders map[string]string

Дополнительные заголовки подключения.

PublisherConfig

Конфигурация издателя сообщений.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

Username string

Имя пользователя.

Password string

Пароль.

ConnHeaders map[string]string

Дополнительные заголовки подключения.

Client

Клиент, включающий в себя группу потребителей и издателей, способный обновлять соединения и перезапускаться при изменении конфигурации.

Methods:

New(logger log.Logger) *Client

Создать новый клиент с логгером.

(g *Client) Upgrade(ctx context.Context, config Config) error

Обновить конфигурацию, синхронно инициализировать клиент с гарантией готовности всех компонентов:

  • Блокировка и ожидание первой успешно установленной сессии.
  • Запуск всех потребителей, инициализация всех издателей.
  • Вернет первую возникшую ошибку во время открытия первой сессии или nil.
(g *Client) UpgradeAndServe(ctx context.Context, config Config) error

Обновить конфигурацию и перезапустить подключения:

  • Останавливает старые соединения,
  • Инициализирует новые потребители/издатели,
  • Запускает обработку сообщений потребителями.
(g *Client) Close() error

Завершает все активные подключения.

LogObserver

Наблюдатель за событиями жизненного цикла потребителя (ошибки, запуск, остановка).

Methods:

NewLogObserver(logger log.Logger) LogObserver

Создаёт наблюдателя за событиями с указанным логером.

Error(c Consumer, err error)

Логирует ошибку потребителя.

CloseStart(c Consumer) / CloseDone(c Consumer)

Логируют процесс остановки.

BeginConsuming(c Consumer)

Логирует начало потребления сообщений.

Functions

DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config

Создаёт конфигурацию потребителя с поддержкой логирования, middleware и подключения по заданным параметрам.

DefaultPublisher(cfg PublisherConfig, restMiddlewares ...publisher.Middleware) *publisher.Publisher

Создаёт издателя сообщений с middleware и настройками подключения.

NewResultHandler(logger log.Logger, adapter handler.HandlerAdapter) handler.ResultHandler

Создаёт обработчик результата с логированием и восстановлением сервиса при панике.

Middleware

PublisherPersistent

Добавляет заголовок persistent=true ко всем исходящим сообщениям.

PublisherLog

Логирует отправку сообщений.

PublisherRequestId

Добавляет Request-Id в заголовки сообщений.

PublisherRetry

Повторяет публикацию при ошибках с использованием заданного Retrier.

ConsumerLog

Логирует входящие сообщения.

ConsumerRequestId

Извлекает или генерирует Request-Id и сохраняет его в контексте запроса.

Usage

Default usage flow
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/shutdown"
	"github.com/txix-open/isp-kit/log"
	"github.com/txix-open/isp-kit/requestid"
	"github.com/txix-open/isp-kit/stompx"
	"github.com/txix-open/isp-kit/stompx/consumer"
	"github.com/txix-open/isp-kit/stompx/publisher"
)

func main() {
	logger := log.New()

	// Создаём обработчик сообщений
	handler := stompx.NewResultHandler(logger, stompx.HandlerFunc(func(ctx context.Context, msg []byte) error {
		logger.Info("message received", log.StringType("body", string(msg)))
		return nil
	}))

	// Конфигурация потребителя
	consumerCfg := stompx.ConsumerConfig{
		Address:  "tcp://localhost:61613",
		Queue:    "/queue/example",
		Username: "admin",
		Password: "admin",
	}

	publisherCfg := stompx.PublisherConfig{
		Address:  "tcp://localhost:61613",
		Queue:    "/queue/example",
		Username: "admin",
		Password: "admin",
	}

	consumerCli := consumer.NewWatcher(stompx.DefaultConsumer(consumerCfg, handler, logger))
	publisherCli := stompx.DefaultPublisher(publisherCfg)

	// Создаём конфигурацию
	config := stompx.NewConfig(
		stompx.WithConsumers(consumerCli),
		stompx.WithPublishers(publisherCli))

	// Создаём клиент
	сli := stompx.NewClient(logger)

	// Обработка завершения приложения
	shutdown.On(func() {
		logger.Info("shutting down...")
		_ = сli.Close()
		logger.Info("shutdown completed")
	})

	// Запускаем клиент
	сli.UpgradeAndServe(context.Background(), config)
	...
}

Documentation

Overview

Package stompx provides a high-level wrapper over the STOMP protocol, implementing convenient tools for creating message consumers and publishers, with support for middleware, logging, retries, and management of consumer and publisher groups.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

ConsumerLog logs incoming message consumption.

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

ConsumerRequestId extracts or generates a Request-Id and saves it in the request context.

func DefaultConsumer

func DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config

DefaultConsumer creates a consumer configuration with logging, middleware, and connection support based on the provided parameters.

func DefaultPublisher

func DefaultPublisher(cfg PublisherConfig, restMiddlewares ...publisher.Middleware) *publisher.Publisher

DefaultPublisher creates a message publisher with middleware and connection settings.

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.HandlerAdapter) handler.ResultHandler

NewResultHandler creates a result handler with logging and panic recovery.

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

PublisherLog logs message publishing events.

func PublisherPersistent

func PublisherPersistent() publisher.Middleware

PublisherPersistent adds a `persistent=true` header to all outgoing messages.

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

PublisherRequestId adds a Request-Id to message headers.

func PublisherRetry added in v1.50.0

func PublisherRetry(retrier Retrier) publisher.Middleware

PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging middleware to avoid duplicate logging of publication attempts.

Types

type Client added in v1.58.0

type Client struct {
	// contains filtered or unexported fields
}

Client manages a group of consumers and publishers, capable of updating connections and restarting when configuration changes.

func New added in v1.58.0

func New(logger log.Logger) *Client

New creates a new Client with the provided logger.

func (*Client) Close added in v1.58.0

func (c *Client) Close() error

Close terminates all active connections.

func (*Client) Shutdown added in v1.58.0

func (c *Client) Shutdown()

Shutdown gracefully shuts down all consumers and publishers.

func (*Client) Upgrade added in v1.58.0

func (c *Client) Upgrade(ctx context.Context, config Config) error

Upgrade updates the configuration and synchronously initializes the client with a guarantee that all components are ready. It returns the first error encountered during initialization, or nil if successful.

func (*Client) UpgradeAndServe added in v1.58.0

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

UpgradeAndServe updates the configuration and restarts connections. It stops old connections, initializes new consumers and publishers, and starts message processing. This method blocks and serves indefinitely.

type Config added in v1.58.0

type Config struct {
	// Consumers contains the list of message consumers.
	Consumers []*consumer.Watcher
	// Publishers contains the list of message publishers.
	Publishers []*publisher.Publisher
}

Config represents the configuration for a stompx client.

func NewConfig added in v1.58.0

func NewConfig(opts ...ConfigOption) Config

NewConfig creates a new Config with the provided options.

type ConfigOption added in v1.58.0

type ConfigOption func(c *Config)

ConfigOption is a function that applies options to a Config.

func WithConsumers added in v1.58.0

func WithConsumers(consumers ...*consumer.Watcher) ConfigOption

WithConsumers adds consumer clients to the client configuration.

func WithPublishers added in v1.58.0

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

WithPublishers adds publisher clients to the client configuration.

type ConsumerConfig

type ConsumerConfig struct {
	// Address is the broker address (required).
	Address string `validate:"required" schema:"Адрес брокера"`
	// Queue is the queue name (required).
	Queue string `validate:"required" schema:"Очередь"`
	// Concurrency is the number of handlers (default 1).
	Concurrency int `schema:"Кол-во обработчиков,по умолчанию 1"`
	// PrefetchCount is the number of preloaded messages.
	PrefetchCount int `schema:"Кол-во предзагруженных сообщений,по умолчанию не используется"`
	// Username is the username for authentication.
	Username string `schema:"Имя пользователя"`
	// Password is the password for authentication.
	Password string `schema:"Пароль"`
	// ConnHeaders are additional connection headers.
	ConnHeaders map[string]string `schema:"Дополнительные параметры подключения"`
}

ConsumerConfig holds the configuration for a message consumer.

type LogObserver

type LogObserver struct {
	// contains filtered or unexported fields
}

LogObserver observes events in the consumer lifecycle (errors, startup, shutdown).

func NewLogObserver

func NewLogObserver(logger log.Logger) LogObserver

NewLogObserver creates an observer for lifecycle events with the specified logger.

func (LogObserver) BeginConsuming

func (l LogObserver) BeginConsuming(c *consumer.Consumer)

BeginConsuming logs the start of message consumption.

func (LogObserver) CloseDone

func (l LogObserver) CloseDone(c *consumer.Consumer)

CloseDone logs the completion of the shutdown process.

func (LogObserver) CloseStart

func (l LogObserver) CloseStart(c *consumer.Consumer)

CloseStart logs the start of the shutdown process.

func (LogObserver) Error

func (l LogObserver) Error(c *consumer.Consumer, err error)

Error logs consumer errors.

type PublisherConfig

type PublisherConfig struct {
	// Address is the broker address (required).
	Address string `validate:"required" schema:"Адрес брокера"`
	// Queue is the queue name (required).
	Queue string `validate:"required" schema:"Очередь"`
	// Username is the username for authentication.
	Username string `schema:"Имя пользователя"`
	// Password is the password for authentication.
	Password string `schema:"Пароль"`
	// ConnHeaders are additional connection headers.
	ConnHeaders map[string]string `schema:"Дополнительные параметры подключения"`
}

PublisherConfig holds the configuration for a message publisher.

type Retrier added in v1.50.0

type Retrier interface {
	Do(ctx context.Context, f func() error) error
}

Retrier defines an interface for retrying operations.

Directories

Path Synopsis
Package consumer provides functionality for consuming messages from a STOMP broker.
Package consumer provides functionality for consuming messages from a STOMP broker.
Package handler provides functionality for processing STOMP messages and handling results.
Package handler provides functionality for processing STOMP messages and handling results.
Package publisher provides functionality for publishing messages to a STOMP broker.
Package publisher provides functionality for publishing messages to a STOMP broker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL