Documentation ¶
Overview ¶
Package consumer provides functionality for consuming messages from a STOMP broker.
Constants ¶
This section is empty.
Variables ¶
var (
	ErrDeliveryAlreadyHandled = errors.New("delivery already handled")
)
ErrDeliveryAlreadyHandled is returned when attempting to acknowledge a delivery that has already been processed.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	// Address is the broker address.
	Address string
	// Queue is the queue name to consume from.
	Queue string
	// ConnOpts are connection options.
	ConnOpts []ConnOption
	// Concurrency is the number of concurrent workers.
	Concurrency int
	// Middlewares are middleware functions applied to the handler.
	Middlewares []Middleware
	// SubscriptionOpts are subscription options.
	SubscriptionOpts []SubscriptionOption
	// Observer is the lifecycle observer.
	Observer Observer
	// ReconnectTimeout is the delay before reconnecting.
	ReconnectTimeout time.Duration
	// contains filtered or unexported fields
}
Config holds the configuration for a message consumer.
type ConnOption ¶
ConnOption is a function type for configuring STOMP connections.
type Consumer ¶
type Consumer struct {
	Config
	// contains filtered or unexported fields
}
Consumer manages a connection to a STOMP broker and processes messages from a queue.
type Delivery ¶
type Delivery struct {
	// contains filtered or unexported fields
}
Delivery represents a message delivery that can be acknowledged or negatively acknowledged.
func NewDelivery ¶
NewDelivery creates a new delivery instance.
func (*Delivery) Ack ¶
Ack acknowledges the message delivery. It returns ErrDeliveryAlreadyHandled if the delivery has already been handled.
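The once-only behaviour described for Ack can be illustrated with a minimal sketch. The `delivery` type and `errAlreadyHandled` variable below are local stand-ins, not the package's own types, and the real Ack signature and implementation are not shown on this page; the sketch only assumes that a second acknowledgement reports the sentinel error.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errAlreadyHandled is a local stand-in for consumer.ErrDeliveryAlreadyHandled.
var errAlreadyHandled = errors.New("delivery already handled")

// delivery is a hypothetical stand-in for consumer.Delivery. The real type's
// fields are unexported, so this layout is an assumption.
type delivery struct {
	handled atomic.Bool
}

// Ack marks the delivery as handled exactly once, even under concurrent calls.
func (d *delivery) Ack() error {
	if !d.handled.CompareAndSwap(false, true) {
		return errAlreadyHandled
	}
	// A real implementation would send the acknowledgement to the broker here.
	return nil
}

func main() {
	d := &delivery{}
	fmt.Println(d.Ack()) // <nil>

	// Callers can detect a repeated acknowledgement with errors.Is.
	err := d.Ack()
	fmt.Println(errors.Is(err, errAlreadyHandled)) // true
}
```

Comparing with errors.Is rather than `==` keeps the check working if the error is later wrapped with additional context.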
type Donner ¶
type Donner interface {
	Done()
}
Donner defines an interface for signaling completion.
type HandlerFunc ¶
HandlerFunc is an adapter that allows ordinary functions to be used as handlers.
type Middleware ¶
Middleware is a function that wraps a Handler with additional functionality.
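As a rough illustration of handler wrapping, the sketch below chains two middlewares around a handler. The `handler` and `middleware` types, the `logging` middleware, and the `chain` helper are all hypothetical: the package's real Handler and Middleware signatures are not shown on this page, and the order in which the package applies Config.Middlewares is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a hypothetical handler shape, not the package's Handler type.
type handler func(body string) error

// middleware wraps a handler with additional behaviour.
type middleware func(next handler) handler

// logging is a hypothetical middleware that records entry and exit.
func logging(name string, log *[]string) middleware {
	return func(next handler) handler {
		return func(body string) error {
			*log = append(*log, name+" in")
			err := next(body)
			*log = append(*log, name+" out")
			return err
		}
	}
}

// chain wraps h so that the first middleware listed runs outermost.
func chain(h handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	var log []string
	h := chain(func(body string) error {
		log = append(log, "handle "+body)
		return nil
	}, logging("a", &log), logging("b", &log))

	_ = h("msg")
	fmt.Println(strings.Join(log, ", "))
	// a in, b in, handle msg, b out, a out
}
```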
type NoopObserver ¶
type NoopObserver struct{}
NoopObserver is a no-op implementation of the Observer interface.
func (NoopObserver) BeginConsuming ¶
func (n NoopObserver) BeginConsuming(c *Consumer)
BeginConsuming performs no action.
func (NoopObserver) CloseDone ¶
func (n NoopObserver) CloseDone(c *Consumer)
CloseDone performs no action.
func (NoopObserver) CloseStart ¶
func (n NoopObserver) CloseStart(c *Consumer)
CloseStart performs no action.
func (NoopObserver) Error ¶
func (n NoopObserver) Error(c *Consumer, err error)
Error performs no action.
type Observer ¶
type Observer interface {
	// Error is called when a consumer encounters an error.
	Error(c *Consumer, err error)
	// CloseStart is called when shutdown begins.
	CloseStart(c *Consumer)
	// CloseDone is called when shutdown completes.
	CloseDone(c *Consumer)
	// BeginConsuming is called when message consumption starts.
	BeginConsuming(c *Consumer)
}
Observer defines the interface for observing consumer lifecycle events.
type Option ¶
type Option func(c *Config)
Option is a function that modifies a Config.
func WithConcurrency ¶
WithConcurrency sets the number of concurrent workers.
func WithConnectionOptions ¶
func WithConnectionOptions(connOpts ...ConnOption) Option
WithConnectionOptions adds connection options to the configuration.
func WithMiddlewares ¶
func WithMiddlewares(middlewares ...Middleware) Option
WithMiddlewares adds middleware functions to the configuration.
func WithObserver ¶
WithObserver sets the lifecycle observer.
func WithSubscriptionOptions ¶
func WithSubscriptionOptions(subOpts ...SubscriptionOption) Option
WithSubscriptionOptions adds subscription options to the configuration.
type SubscriptionOption ¶
SubscriptionOption is a function type for configuring subscriptions.
type Watcher ¶
type Watcher struct {
	// contains filtered or unexported fields
}
Watcher manages the lifecycle of a consumer with automatic reconnection support.
func NewWatcher ¶
NewWatcher creates a new Watcher with the provided configuration.
func (*Watcher) Healthcheck ¶ added in v1.58.0
Healthcheck returns an error if the watcher is not receiving messages.
func (*Watcher) Run ¶
Run starts the consumer and blocks until it is ready or an error occurs. It returns the first error encountered during session initialization.