Documentation ¶
Overview ¶
Package grmq provides a high-level client for interacting with RabbitMQ, supporting publishers, consumers, automatic reconnection, and topology management.
Index ¶
- type Client
- type ClientOption
- func WithConsumers(consumers ...consumer.Consumer) ClientOption
- func WithDeclarations(declarations topology.Declarations) ClientOption
- func WithDialConfig(config DialConfig) ClientOption
- func WithObserver(observer Observer) ClientOption
- func WithPublishers(publishers ...*publisher.Publisher) ClientOption
- func WithReconnectTimeout(timeout time.Duration) ClientOption
- func WithTopologyBuilding(options ...topology.DeclarationsOption) ClientOption
- type Consumer
- type Declarator
- type DialConfig
- type NoopObserver
- func (n NoopObserver) ClientError(err error)
- func (n NoopObserver) ClientReady()
- func (n NoopObserver) ConnectionBlocked(reason string)
- func (n NoopObserver) ConnectionUnblocked()
- func (n NoopObserver) ConsumerError(consumer consumer.Consumer, err error)
- func (n NoopObserver) PublisherError(publisher *publisher.Publisher, err error)
- func (n NoopObserver) PublisherReconnected(publisher *publisher.Publisher)
- func (n NoopObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)
- func (n NoopObserver) ShutdownDone()
- func (n NoopObserver) ShutdownStarted()
- type Observer
- type Publisher
- func (p *Publisher) Close() error
- func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error
- func (p *Publisher) PublishWithConfirmation(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error
- func (p *Publisher) Run() error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client manages connections to a RabbitMQ broker, handling publishers, consumers, and topology declarations with automatic reconnection support.
func New ¶
func New(url string, options ...ClientOption) *Client
New creates a new Client instance with the specified RabbitMQ URL and optional configuration. Default settings include a 10-second heartbeat and 30-second dial timeout.
func (*Client) Run ¶
Run blocks until a successful RabbitMQ session is established. It ensures all topology declarations are applied, all publishers are initialized, and all consumers are running. Returns the first error encountered during session initialization, or nil if successful.
func (*Client) Serve ¶
Serve starts the client without waiting for the first successful session. Unlike Run, it returns immediately and relies on the Observer to handle errors and manage reconnection attempts.
func (*Client) Shutdown ¶
func (s *Client) Shutdown()
Shutdown performs a graceful shutdown of the client, closing all publishers, consumers, and the underlying connection.
func (*Client) UnsafeConnection ¶ added in v1.11.0
func (s *Client) UnsafeConnection() *amqp.Connection
UnsafeConnection returns the underlying *amqp.Connection for the active session. Returns nil if no session is currently connected.
Warning: This method provides direct access to the RabbitMQ connection and should be used with caution.
type ClientOption ¶
type ClientOption func(c *Client)
ClientOption is a function that configures a Client instance.
func WithConsumers ¶
func WithConsumers(consumers ...consumer.Consumer) ClientOption
WithConsumers sets the consumers for the client.
func WithDeclarations ¶
func WithDeclarations(declarations topology.Declarations) ClientOption
WithDeclarations sets the topology declarations for the client.
func WithDialConfig ¶
func WithDialConfig(config DialConfig) ClientOption
WithDialConfig sets the dial configuration for connecting to RabbitMQ.
func WithObserver ¶
func WithObserver(observer Observer) ClientOption
WithObserver sets the observer for monitoring client events.
func WithPublishers ¶
func WithPublishers(publishers ...*publisher.Publisher) ClientOption
WithPublishers sets the publishers for the client.
func WithReconnectTimeout ¶
func WithReconnectTimeout(timeout time.Duration) ClientOption
WithReconnectTimeout sets the timeout between reconnection attempts.
func WithTopologyBuilding ¶
func WithTopologyBuilding(options ...topology.DeclarationsOption) ClientOption
WithTopologyBuilding creates and sets topology declarations using the provided options.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps a consumer instance with connection management and retry support.
func NewConsumer ¶
func NewConsumer(cfg consumer.Consumer, ch *amqp.Channel, retryPub *Publisher, observer Observer) *Consumer
NewConsumer creates a new Consumer instance with the specified configuration and optional retry support.
type Declarator ¶
type Declarator struct {
// contains filtered or unexported fields
}
Declarator manages RabbitMQ topology declarations (exchanges, queues, bindings).
func NewDeclarator ¶
func NewDeclarator(cfg topology.Declarations, ch *amqp.Channel) *Declarator
NewDeclarator creates a new Declarator with the specified topology configuration.
func (*Declarator) Close ¶
func (c *Declarator) Close() error
Close closes the declarator's channel.
func (*Declarator) Run ¶
func (c *Declarator) Run() error
Run declares all exchanges, queues, and bindings in the configured topology. Returns an error if any declaration fails.
type DialConfig ¶
DialConfig wraps amqp.Config with additional dial timeout configuration.
type NoopObserver ¶
type NoopObserver struct {
}
NoopObserver provides a default no-op implementation of the Observer interface. Use this when you don't need to observe client events. You can also embed NoopObserver in your own observer struct and override only the methods you need.
func (NoopObserver) ClientError ¶
func (n NoopObserver) ClientError(err error)
func (NoopObserver) ClientReady ¶
func (n NoopObserver) ClientReady()
func (NoopObserver) ConnectionBlocked ¶ added in v1.8.0
func (n NoopObserver) ConnectionBlocked(reason string)
func (NoopObserver) ConnectionUnblocked ¶ added in v1.8.0
func (n NoopObserver) ConnectionUnblocked()
func (NoopObserver) ConsumerError ¶
func (n NoopObserver) ConsumerError(consumer consumer.Consumer, err error)
func (NoopObserver) PublisherError ¶
func (n NoopObserver) PublisherError(publisher *publisher.Publisher, err error)
func (NoopObserver) PublisherReconnected ¶ added in v1.10.0
func (n NoopObserver) PublisherReconnected(publisher *publisher.Publisher)
func (NoopObserver) PublishingFlow ¶
func (n NoopObserver) PublishingFlow(publisher *publisher.Publisher, flow bool)
func (NoopObserver) ShutdownDone ¶
func (n NoopObserver) ShutdownDone()
func (NoopObserver) ShutdownStarted ¶
func (n NoopObserver) ShutdownStarted()
type Observer ¶
type Observer interface {
ClientReady()
ClientError(err error)
ConsumerError(consumer consumer.Consumer, err error)
PublisherError(publisher *publisher.Publisher, err error)
PublishingFlow(publisher *publisher.Publisher, flow bool)
PublisherReconnected(publisher *publisher.Publisher)
ConnectionBlocked(reason string)
ConnectionUnblocked()
ShutdownStarted()
ShutdownDone()
}
Observer defines an interface for monitoring client lifecycle events and errors. Implement this interface to receive notifications about connection state, publisher/consumer errors, and shutdown events.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher wraps a publisher instance with connection management, confirmation support, and automatic reconnection on channel failures.
Reconnection Behavior: The publisher automatically reconnects when a precondition-failed error occurs (AMQP reply code 406, e.g., an exchange or queue declaration mismatch). Reconnection creates a new channel, resets confirmation mode, and re-registers the round tripper.
Note: Reconnection is attempted only once per error. If reconnection fails, the publisher stops processing and reports the error via the Observer.
func NewPublisher ¶
func NewPublisher(publisher *publisher2.Publisher, ch *amqp.Channel, observer Observer, conn *amqp.Connection) *Publisher
NewPublisher creates a new Publisher instance with the specified configuration.
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error
Publish sends a message to the specified exchange and routing key without waiting for confirmation. Returns an error if the publish operation fails.
func (*Publisher) PublishWithConfirmation ¶
func (p *Publisher) PublishWithConfirmation(ctx context.Context, exchange string, routingKey string, msg *amqp.Publishing) error
PublishWithConfirmation sends a message and waits for broker confirmation. Enables publisher confirms mode on first call. Returns an error if the message is not acknowledged by the broker or if the publish operation fails.
func (*Publisher) Run ¶
func (p *Publisher) Run() error
Run initializes the publisher's event watchers and registers it with the underlying publisher. Must be called before using the publisher.
This method starts a background watcher that monitors for channel errors and flow control events. If a precondition-failed error occurs (e.g., due to topology changes), the watcher triggers automatic reconnection. The watcher runs until the channel is closed or an unrecoverable error occurs.
Directories ¶
| Package | Synopsis |
|---|---|
| consumer | Package consumer provides a high-level consumer for receiving messages from RabbitMQ with support for concurrency, middleware, and retry policies. |
| publisher | Package publisher provides a high-level publisher for sending messages to RabbitMQ with support for middleware and confirmation modes. |
| retry | Package retry provides retry policies for failed message processing in consumers. |
| topology | Package topology provides types and functions for defining RabbitMQ topology including exchanges, queues, and bindings. |