stompx

package
v1.55.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2025 License: MIT Imports: 10 Imported by: 0

README

Package stompx

Пакет stompx предоставляет высокоуровневую обёртку над STOMP-протоколом, реализуя удобные инструменты для создания потребителей и издателей сообщений, с поддержкой middleware, логирования, повторных попыток и управления группой потребителей.

Types

ConsumerConfig

Конфигурация потребителя сообщений.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

Concurrency int

Количество обработчиков (по умолчанию 1).

PrefetchCount int

Количество предзагруженных сообщений.

Username string

Имя пользователя.

Password string

Пароль.

ConnHeaders map[string]string

Дополнительные заголовки подключения.

PublisherConfig

Конфигурация издателя сообщений.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

Username string

Имя пользователя.

Password string

Пароль.

ConnHeaders map[string]string

Дополнительные заголовки подключения.

ConsumerGroup

Группа потребителей, способная обновляться и перезапускаться при изменении конфигурации.

Methods:

(g *ConsumerGroup) Upgrade(ctx context.Context, consumers ...Consumer) error

Применяет новую конфигурацию без запуска.

(g *ConsumerGroup) UpgradeAndServe(ctx context.Context, consumers ...Consumer) error

Применяет и запускает новых потребителей.

(g *ConsumerGroup) Close() error

Завершает все активные подключения.

LogObserver

Наблюдатель за событиями жизненного цикла потребителя (ошибки, запуск, остановка).

Methods:

NewLogObserver(logger log.Logger) LogObserver

Создаёт наблюдателя за событиями с указанным логером.

Error(c Consumer, err error)

Логирует ошибку потребителя.

CloseStart(c Consumer) / CloseDone(c Consumer)

Логируют процесс остановки.

BeginConsuming(c Consumer)

Логирует начало потребления сообщений.

Functions

DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config

Создаёт конфигурацию потребителя с поддержкой логирования, middleware и подключения по заданным параметрам.

DefaultPublisher(cfg PublisherConfig, restMiddlewares ...publisher.Middleware) *publisher.Publisher

Создаёт издателя сообщений с middleware и настройками подключения.

NewResultHandler(logger log.Logger, adapter handler.HandlerAdapter) handler.ResultHandler

Создаёт обработчик результата с логированием и восстановлением сервиса при панике.

Middleware

PublisherPersistent

Добавляет заголовок persistent=true ко всем исходящим сообщениям.

PublisherLog

Логирует отправку сообщений.

PublisherRequestId

Добавляет Request-Id в заголовки сообщений.

PublisherRetry

Повторяет публикацию при ошибках с использованием заданного Retrier.

ConsumerLog

Логирует входящие сообщения.

ConsumerRequestId

Извлекает или генерирует Request-Id и сохраняет его в контексте запроса.

Usage

Default usage flow
package main

import (
	"context"
	"log"

	"github.com/txix-open/isp-kit/app"
	"github.com/txix-open/isp-kit/shutdown"
	"github.com/txix-open/isp-kit/log"
	"github.com/txix-open/isp-kit/requestid"
	"github.com/txix-open/isp-kit/stompx"
)

func main() {
	logger := log.New()

	// Создаём обработчик сообщений
	handler := stompx.NewResultHandler(logger, stompx.HandlerFunc(func(ctx context.Context, msg []byte) error {
		logger.Info("message received", log.StringType("body", string(msg)))
		return nil
	}))

	// Конфигурация потребителя
	consumerCfg := stompx.ConsumerConfig{
		Address:  "tcp://localhost:61613",
		Queue:    "/queue/example",
		Username: "admin",
		Password: "admin",
	}

	consumer, err := stompx.DefaultConsumer(consumerCfg, handler, logger)
	if err != nil {
		log.Fatal(err)
	}

	// Создаём группу потребителей
	group := stompx.NewConsumerGroup(logger)

	// Обработка завершения приложения
	shutdown.On(func() {
		logger.Info("shutting down...")
		_ = group.Close()
		logger.Info("shutdown completed")
	})

	// Запускаем потребителей
	err = group.UpgradeAndServe(context.Background(), consumer)
	if err != nil {
		logger.Fatal("failed to start consumer group", log.String("error", err.Error()))
	}
	...
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConsumerLog

func ConsumerLog(logger log.Logger, logBody bool) consumer.Middleware

func ConsumerRequestId

func ConsumerRequestId() consumer.Middleware

func DefaultConsumer

func DefaultConsumer(cfg ConsumerConfig, handler consumer.Handler, logger log.Logger, restMiddlewares ...consumer.Middleware) consumer.Config

func DefaultPublisher

func DefaultPublisher(cfg PublisherConfig, restMiddlewares ...publisher.Middleware) *publisher.Publisher

func NewResultHandler

func NewResultHandler(logger log.Logger, adapter handler.HandlerAdapter) handler.ResultHandler

func PublisherLog

func PublisherLog(logger log.Logger, logBody bool) publisher.Middleware

func PublisherPersistent

func PublisherPersistent() publisher.Middleware

func PublisherRequestId

func PublisherRequestId() publisher.Middleware

func PublisherRetry added in v1.50.0

func PublisherRetry(retrier Retrier) publisher.Middleware

PublisherRetry creates a middleware for retrying message publications. It is recommended to use this middleware after logging, to avoid duplicate logging of publication attempts.

Types

type ConsumerConfig

type ConsumerConfig struct {
	Address       string            `validate:"required" schema:"Адрес брокера"`
	Queue         string            `validate:"required" schema:"Очередь"`
	Concurrency   int               `schema:"Кол-во обработчиков,по умолчанию 1"`
	PrefetchCount int               `schema:"Кол-во предзагруженных сообщений,по умолчанию не используется"`
	Username      string            `schema:"Имя пользователя"`
	Password      string            `schema:"Пароль"`
	ConnHeaders   map[string]string `schema:"Дополнительные параметры подключения"`
}

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

func NewConsumerGroup

func NewConsumerGroup(logger log.Logger) *ConsumerGroup

func (*ConsumerGroup) Close

func (g *ConsumerGroup) Close() error

func (*ConsumerGroup) Upgrade

func (g *ConsumerGroup) Upgrade(ctx context.Context, consumers ...consumer.Config) error

func (*ConsumerGroup) UpgradeAndServe added in v1.41.0

func (g *ConsumerGroup) UpgradeAndServe(ctx context.Context, consumers ...consumer.Config)

type LogObserver

type LogObserver struct {
	// contains filtered or unexported fields
}

func NewLogObserver

func NewLogObserver(logger log.Logger) LogObserver

func (LogObserver) BeginConsuming

func (l LogObserver) BeginConsuming(c *consumer.Consumer)

func (LogObserver) CloseDone

func (l LogObserver) CloseDone(c *consumer.Consumer)

func (LogObserver) CloseStart

func (l LogObserver) CloseStart(c *consumer.Consumer)

func (LogObserver) Error

func (l LogObserver) Error(c *consumer.Consumer, err error)

type PublisherConfig

type PublisherConfig struct {
	Address     string            `validate:"required" schema:"Адрес брокера"`
	Queue       string            `validate:"required" schema:"Очередь"`
	Username    string            `schema:"Имя пользователя"`
	Password    string            `schema:"Пароль"`
	ConnHeaders map[string]string `schema:"Дополнительные параметры подключения"`
}

type Retrier added in v1.50.0

type Retrier interface {
	Do(ctx context.Context, f func() error) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL