stompx

package
v1.64.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: MIT Imports: 11 Imported by: 0

README

Package stompx

Пакет stompx предоставляет высокоуровневую обёртку над STOMP-протоколом, реализуя удобные инструменты для создания потребителей и издателей сообщений, с поддержкой middleware, логирования, повторных попыток и управления группой потребителей и издателей.

Types

Config

Конфигурация stompx-клиента.

Fields:

Consumers []*consumer.Watcher

Массив потребителей.

Publishers []*publisher.Publisher

Массив издателей.

Methods:

NewConfig(opts ...ConfigOption) Config

Создаёт конфигурацию клиента с указанными опциями.

ConfigOption

Функция, применяющая опции к Config.

Functions:

WithConsumers(consumers ...*consumer.Watcher) ConfigOption

Добавляет клиенты потребителей в конфигурацию клиента.

WithPublishers(publishers ...*publisher.Publisher) ConfigOption

Добавляет клиенты издателей в конфигурацию клиента.

ConsumerConfig

Конфигурация потребителя сообщений.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

Concurrency int

Количество обработчиков (по умолчанию 1).

PrefetchCount int

Количество предзагруженных сообщений.

Username string

Имя пользователя.

Password string

Пароль.

ConnHeaders map[string]string

Дополнительные заголовки подключения.

PublisherConfig

Конфигурация издателя сообщений.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

Username string

Имя пользователя.

Password string

Пароль.

ConnHeaders map[string]string

Дополнительные заголовки подключения.

Client

Клиент, включающий в себя группу потребителей и издателей, способный обновлять соединения и перезапускаться при изменении конфигурации.

Methods:

New(logger log.Logger) *Client

Создать новый клиент с логгером.

(g *Client) Upgrade(ctx context.Context, config Config) error

Обновить конфигурацию, синхронно инициализировать клиент с гарантией готовности всех компонентов:

  • Блокировка и ожидание первой успешно установленной сессии.
  • Запуск всех потребителей, инициализация всех издателей.
  • Вернет первую возникшую ошибку во время открытия первой сессии или nil.
(g *Client) UpgradeAndServe(ctx context.Context, config Config) error

Обновить конфигурацию и перезапустить подключения:

  • Останавливает старые соединения,
  • Инициализирует новые потребители/издатели,
  • Запускает обработку сообщений потребителями.
(g *Client) Close() error

Завершает все активные подключения.

LogObserver

Наблюдатель за событиями жизненного цикла потребителя (ошибки, запуск, остановка).

Methods:

NewLogObserver(logger log.Logger) LogObserver

Создаёт наблюдателя за событиями с указанным логером.

Error(c Consumer, err error)

Логирует ошибку потребителя.

CloseStart(c Consumer) / CloseDone(c Consumer)

Логируют процесс остановки.

BeginConsuming(c Consumer)

Логирует начало потребления сообщений.

Functions

DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config

Создаёт конфигурацию потребителя с поддержкой логирования, middleware и подключения по заданным параметрам.

DefaultPublisher(cfg PublisherConfig, restMiddlewares ...publisher.Middleware) *publisher.Publisher

Создаёт издателя сообщений с middleware и настройками подключения.

NewResultHandler(logger log.Logger, adapter handler.HandlerAdapter) handler.ResultHandler

Создаёт обработчик результата с логированием и восстановлением сервиса при панике.

Middleware

PublisherPersistent

Добавляет заголовок persistent=true ко всем исходящим сообщениям.

PublisherLog

Логирует отправку сообщений.

PublisherRequestId

Добавляет Request-Id в заголовки сообщений.

PublisherRetry

Повторяет публикацию при ошибках с использованием заданного Retrier.

ConsumerLog

Логирует входящие сообщения.

ConsumerRequestId

Извлекает или генерирует Request-Id и сохраняет его в контексте запроса.

Usage

Default usage flow
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/shutdown"
	"github.com/txix-open/isp-kit/log"
	"github.com/txix-open/isp-kit/requestid"
	"github.com/txix-open/isp-kit/stompx"
	"github.com/txix-open/isp-kit/stompx/consumer"
	"github.com/txix-open/isp-kit/stompx/publisher"
)

func main() {
	logger := log.New()

	// Создаём обработчик сообщений
	handler := stompx.NewResultHandler(logger, stompx.HandlerFunc(func(ctx context.Context, msg []byte) error {
		logger.Info("message received", log.StringType("body", string(msg)))
		return nil
	}))

	// Конфигурация потребителя
	consumerCfg := stompx.ConsumerConfig{
		Address:  "tcp://localhost:61613",
		Queue:    "/queue/example",
		Username: "admin",
		Password: "admin",
	}

	publisherCfg := stompx.PublisherConfig{
		Address:  "tcp://localhost:61613",
		Queue:    "/queue/example",
		Username: "admin",
		Password: "admin",
	}

	consumerCli := consumer.NewWatcher(stompx.DefaultConsumer(consumerCfg, handler, logger))
	publisherCli := stompx.DefaultPublisher(publisherCfg)

	// Создаём конфигурацию
	config := stompx.NewConfig(
		stompx.WithConsumers(consumerCli),
		stompx.WithPublishers(publisherCli))

	// Создаём клиент
	сli := stompx.NewClient(logger)

	// Обработка завершения приложения
	shutdown.On(func() {
		logger.Info("shutting down...")
		_ = сli.Close()
		logger.Info("shutdown completed")
	})

	// Запускаем клиент
	сli.UpgradeAndServe(context.Background(), config)
	...
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

func DefaultConsumer

func DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config

func DefaultPublisher

func DefaultPublisher(cfg PublisherConfig, restMiddlewares ...publisher.Middleware) *publisher.Publisher

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.HandlerAdapter) handler.ResultHandler

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

func PublisherPersistent

func PublisherPersistent() publisher.Middleware

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

func PublisherRetry added in v1.50.0

func PublisherRetry(retrier Retrier) publisher.Middleware

PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging, to avoid duplicate logging of publication attempts.

Types

type Client added in v1.58.0

type Client struct {
	// contains filtered or unexported fields
}

func New added in v1.58.0

func New(logger log.Logger) *Client

func (*Client) Close added in v1.58.0

func (c *Client) Close() error

func (*Client) Shutdown added in v1.58.0

func (c *Client) Shutdown()

func (*Client) Upgrade added in v1.58.0

func (c *Client) Upgrade(ctx context.Context, config Config) error

func (*Client) UpgradeAndServe added in v1.58.0

func (c *Client) UpgradeAndServe(ctx context.Context, config Config)

type Config added in v1.58.0

type Config struct {
	Consumers  []*consumer.Watcher
	Publishers []*publisher.Publisher
}

func NewConfig added in v1.58.0

func NewConfig(opts ...ConfigOption) Config

type ConfigOption added in v1.58.0

type ConfigOption func(c *Config)

func WithConsumers added in v1.58.0

func WithConsumers(consumers ...*consumer.Watcher) ConfigOption

func WithPublishers added in v1.58.0

func WithPublishers(publishers ...*publisher.Publisher) ConfigOption

type ConsumerConfig

type ConsumerConfig struct {
	Address       string            `validate:"required" schema:"Адрес брокера"`
	Queue         string            `validate:"required" schema:"Очередь"`
	Concurrency   int               `schema:"Кол-во обработчиков,по умолчанию 1"`
	PrefetchCount int               `schema:"Кол-во предзагруженных сообщений,по умолчанию не используется"`
	Username      string            `schema:"Имя пользователя"`
	Password      string            `schema:"Пароль"`
	ConnHeaders   map[string]string `schema:"Дополнительные параметры подключения"`
}

type LogObserver

type LogObserver struct {
	// contains filtered or unexported fields
}

func NewLogObserver

func NewLogObserver(logger log.Logger) LogObserver

func (LogObserver) BeginConsuming

func (l LogObserver) BeginConsuming(c *consumer.Consumer)

func (LogObserver) CloseDone

func (l LogObserver) CloseDone(c *consumer.Consumer)

func (LogObserver) CloseStart

func (l LogObserver) CloseStart(c *consumer.Consumer)

func (LogObserver) Error

func (l LogObserver) Error(c *consumer.Consumer, err error)

type PublisherConfig

type PublisherConfig struct {
	Address     string            `validate:"required" schema:"Адрес брокера"`
	Queue       string            `validate:"required" schema:"Очередь"`
	Username    string            `schema:"Имя пользователя"`
	Password    string            `schema:"Пароль"`
	ConnHeaders map[string]string `schema:"Дополнительные параметры подключения"`
}

type Retrier added in v1.50.0

type Retrier interface {
	Do(ctx context.Context, f func() error) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL