consumer

package
v1.67.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2026 License: MIT Imports: 8 Imported by: 0

README

Package consumer

Пакет consumer предназначен для организации надёжного потребления сообщений из очереди через протокол STOMP с автоматическим переподключением, поддержкой конкурентной обработки и отслеживанием состояния.

Types

Config

Структура Config содержит конфигурацию подключения и работы потребителя: адрес сервера, очередь, параметры подключения и подписки, конкуренция, middlewares, обработчик сообщений и наблюдатель событий.

Fields:

Address string

Адрес брокера (обязательное).

Queue string

Имя очереди (обязательное).

ConnOpts []ConnOption

Параметры подключения

Concurrency int

Количество обработчиков (по умолчанию 1).

Middlewares []Middleware

Список мидлвар.

SubscriptionOpts []SubscriptionOption

Параметры подписки.

Observer Observer

Наблюдатель за событиями жизненного цикла потребителя (ошибки, запуск, остановка). Реализация интерфейса Observer.

Consumer

Структура Consumer инкапсулирует логику подключения к STOMP-серверу, подписки на очередь и конкурентной обработки поступающих сообщений с подтверждениями (ack/nack).

Methods:

(c *Consumer) Run() error

Запускает обработку сообщений с учётом заданной конкуренции. Блокирующая операция, возвращает ошибку при неустранимой ошибке подключения или подписки.

(c *Consumer) Close() error

Выполняет корректное завершение работы, включая отписку от очереди, ожидание завершения обработки сообщений и отключение от сервера.

Delivery

Структура Delivery представляет собой сообщение, доставленное из очереди, с возможностью подтверждения успешной обработки (Ack) или отказа (Nack).

Methods:

(d *Delivery) Ack() error

Подтверждает успешную обработку сообщения.

(d *Delivery) Nack() error

Отмечает сообщение как неуспешно обработанное.

Observer

Интерфейс Observer для слежения за ЖЦ потребителя.

Methods:

Error(c *Consumer, err error)

Уведомление о возникшей ошибке.

CloseStart(c *Consumer)

Уведомление о начале закрытия потребителя.

CloseDone(c *Consumer)

Уведомление о завершении закрытия потребителя.

BeginConsuming(c *Consumer)

Уведомление о начале работы потребителя.

Handler

Интерфейс Handler описывает обработчик сообщений из очереди.

Middleware

Тип Middleware — функция, оборачивающая обработчик для расширения функциональности (логирование, ретрай, и т.п.).

Watcher

Структура Watcher реализует высокоуровневый наблюдатель за процессом потребления сообщений. Отвечает за управление жизненным циклом потребителя, автоматический повтор подключения и обработку ошибок.

Methods:

(w *Watcher) Run(ctx context.Context) error

Запускает процесс наблюдения и потребления сообщений с ожиданием первой сессии. Блокирующая операция, возвращает ошибку при неудачном первом подключении или завершении контекста.

(w *Watcher) Serve(ctx context.Context)

Запускает процесс наблюдения в отдельной горутине. Не блокирует вызывающий поток.

(w *Watcher) Shutdown()

Выполняет корректное завершение работы Watcher, останавливая внутренние горутины.

(w *Watcher) Healthcheck(ctx context.Context) error

Проверить работоспособность соединения потребителя. Возвращает ошибку при проблемах с подключением.

Usage

Default usage flow
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/txix-open/isp-kit/consumer"
)

func main() {
	handler := consumer.HandlerFunc(func(delivery consumer.Delivery) error {
		log.Printf("received message: %s", delivery.Body())
		// Обработка сообщения...
		return delivery.Ack()
	})

	cfg := consumer.NewConfig(
		"tcp://localhost:61613",
		"/queue/example",
		handler,
		consumer.WithConcurrency(5),
	)

	watcher := consumer.NewWatcher(cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		if err := watcher.Run(ctx); err != nil {
			log.Fatalf("consumer watcher error: %v", err)
		}
	}()

	// Ожидаем системных сигналов для завершения
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Println("shutting down consumer...")
	watcher.Shutdown()
	log.Println("consumer stopped")
}

Documentation

Overview

Package consumer provides functionality for consuming messages from a STOMP broker.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDeliveryAlreadyHandled = errors.New("delivery already handled")
)

ErrDeliveryAlreadyHandled is returned when attempting to acknowledge a delivery that has already been processed.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Address is the broker address.
	Address string
	// Queue is the queue name to consume from.
	Queue string
	// ConnOpts are connection options.
	ConnOpts []ConnOption
	// Concurrency is the number of concurrent workers.
	Concurrency int
	// Middlewares are middleware functions applied to the handler.
	Middlewares []Middleware
	// SubscriptionOpts are subscription options.
	SubscriptionOpts []SubscriptionOption
	// Observer is the lifecycle observer.
	Observer Observer
	// ReconnectTimeout is the delay before reconnecting.
	ReconnectTimeout time.Duration
	// contains filtered or unexported fields
}

Config holds the configuration for a message consumer.

func NewConfig

func NewConfig(address string, queue string, handler Handler, opts ...Option) Config

NewConfig creates a new consumer configuration with the provided options.

func (Config) String

func (c Config) String() string

String returns a string representation of the configuration.

type ConnOption

type ConnOption = func(*stomp.Conn) error

ConnOption is a function type for configuring STOMP connections.

type Consumer

type Consumer struct {
	Config
	// contains filtered or unexported fields
}

Consumer manages a connection to a STOMP broker and processes messages from a queue.

func New

func New(config Config) (*Consumer, error)

New creates a new Consumer with the provided configuration.

func (*Consumer) Close

func (c *Consumer) Close() error

Close gracefully shuts down the consumer, disconnecting from the broker.

func (*Consumer) Run

func (c *Consumer) Run() error

Run starts the consumer and begins processing messages. It blocks until an error occurs or the consumer is closed.

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

Delivery represents a message delivery that can be acknowledged or negatively acknowledged.

func NewDelivery

func NewDelivery(donner Donner, conn *stomp.Conn, source *stomp.Message) *Delivery

NewDelivery creates a new delivery instance.

func (*Delivery) Ack

func (d *Delivery) Ack() error

Ack acknowledges the message delivery. Returns ErrDeliveryAlreadyHandled if the delivery has already been processed.

func (*Delivery) Nack

func (d *Delivery) Nack() error

Nack negatively acknowledges the message delivery, causing it to be requeued. Returns ErrDeliveryAlreadyHandled if the delivery has already been processed.

func (*Delivery) Source

func (d *Delivery) Source() *stomp.Message

Source returns the underlying STOMP message.

type Donner

type Donner interface {
	Done()
}

Donner defines an interface for signaling completion.

type Handler

type Handler interface {
	Handle(ctx context.Context, delivery *Delivery)
}

Handler defines the interface for processing message deliveries.

type HandlerFunc

type HandlerFunc func(ctx context.Context, delivery *Delivery)

HandlerFunc is an adapter type that allows using functions as handlers.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, delivery *Delivery)

Handle calls the underlying function.

type Middleware

type Middleware func(next Handler) Handler

Middleware is a function that wraps a Handler with additional functionality.

type NoopObserver

type NoopObserver struct {
}

NoopObserver is a no-op implementation of the Observer interface.

func (NoopObserver) BeginConsuming

func (n NoopObserver) BeginConsuming(c *Consumer)

BeginConsuming performs no action.

func (NoopObserver) CloseDone

func (n NoopObserver) CloseDone(c *Consumer)

CloseDone performs no action.

func (NoopObserver) CloseStart

func (n NoopObserver) CloseStart(c *Consumer)

CloseStart performs no action.

func (NoopObserver) Error

func (n NoopObserver) Error(c *Consumer, err error)

Error performs no action.

type Observer

type Observer interface {
	// Error is called when a consumer encounters an error.
	Error(c *Consumer, err error)
	// CloseStart is called when shutdown begins.
	CloseStart(c *Consumer)
	// CloseDone is called when shutdown completes.
	CloseDone(c *Consumer)
	// BeginConsuming is called when message consumption starts.
	BeginConsuming(c *Consumer)
}

Observer defines the interface for observing consumer lifecycle events.

type Option

type Option func(c *Config)

Option is a function that applies configuration options to a Config.

func WithConcurrency

func WithConcurrency(concurrency int) Option

WithConcurrency sets the number of concurrent workers.

func WithConnectionOptions

func WithConnectionOptions(connOpts ...ConnOption) Option

WithConnectionOptions adds connection options to the configuration.

func WithMiddlewares

func WithMiddlewares(middlewares ...Middleware) Option

WithMiddlewares adds middleware functions to the configuration.

func WithObserver

func WithObserver(observer Observer) Option

WithObserver sets the lifecycle observer.

func WithSubscriptionOptions

func WithSubscriptionOptions(subOpts ...SubscriptionOption) Option

WithSubscriptionOptions adds subscription options to the configuration.

type SubscriptionOption

type SubscriptionOption = func(*frame.Frame) error

SubscriptionOption is a function type for configuring subscriptions.

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

Watcher manages the lifecycle of a consumer with automatic reconnection support.

func NewWatcher

func NewWatcher(config Config) *Watcher

NewWatcher creates a new Watcher with the provided configuration.

func (*Watcher) Healthcheck added in v1.58.0

func (w *Watcher) Healthcheck(ctx context.Context) error

Healthcheck returns an error if the watcher is not receiving messages.

func (*Watcher) Run

func (w *Watcher) Run(ctx context.Context) error

Run starts the consumer and blocks until it is ready or an error occurs. It returns the first error encountered during session initialization.

func (*Watcher) Serve added in v1.41.0

func (w *Watcher) Serve(ctx context.Context)

Serve starts the consumer and returns immediately, allowing message processing to continue.

func (*Watcher) Shutdown

func (w *Watcher) Shutdown()

Shutdown performs a graceful shutdown, waiting for all operations to complete.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL