Documentation
¶
Overview ¶
Package messaging provides a unified interface for message queue operations. It abstracts the underlying messaging implementation to allow for easy testing and future extensibility.
Index ¶
- type AMQPClient
- type AMQPClientImpl
- func NewAMQPClient(brokerURL string, log logger.Logger) *AMQPClientImpl
- func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error
- func (c *AMQPClientImpl) Close() error
- func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
- func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
- func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
- func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
- func (c *AMQPClientImpl) IsReady() bool
- func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error
- func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
- type BindingDeclaration
- type Client
- type ConsumeOptions
- type ConsumerDeclaration
- type ExchangeDeclaration
- type MessageHandler
- type PublishOptions
- type PublisherDeclaration
- type QueueDeclaration
- type Registry
- func NewRegistry(client AMQPClient, log logger.Logger) *Registry
- func (r *Registry) DeclareInfrastructure(ctx context.Context) error
- func (r *Registry) GetConsumers() []*ConsumerDeclaration
- func (r *Registry) GetPublishers() []*PublisherDeclaration
- func (r *Registry) RegisterBinding(declaration *BindingDeclaration)
- func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)
- func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)
- func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)
- func (r *Registry) RegisterQueue(declaration *QueueDeclaration)
- func (r *Registry) StartConsumers(ctx context.Context) error
- func (r *Registry) StopConsumers()
- func (r *Registry) ValidateConsumer(queue string) bool
- func (r *Registry) ValidatePublisher(exchange, routingKey string) bool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPClient ¶
type AMQPClient interface {
Client
// PublishToExchange publishes a message to a specific exchange with routing key.
PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
// ConsumeFromQueue consumes messages from a queue with specific options.
ConsumeFromQueue(ctx context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
// DeclareQueue declares a queue with the given parameters.
DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
// DeclareExchange declares an exchange with the given parameters.
DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
// BindQueue binds a queue to an exchange with a routing key.
BindQueue(queue, exchange, routingKey string, noWait bool) error
}
AMQPClient extends the basic Client interface with AMQP-specific functionality. This allows for more advanced AMQP features while maintaining the simple interface.
type AMQPClientImpl ¶
type AMQPClientImpl struct {
// contains filtered or unexported fields
}
AMQPClientImpl provides an AMQP implementation of the messaging client interface. It includes automatic reconnection, retry logic, and AMQP-specific features.
func NewAMQPClient ¶
func NewAMQPClient(brokerURL string, log logger.Logger) *AMQPClientImpl
NewAMQPClient creates a new AMQP client instance. It automatically attempts to connect to the broker and handles reconnections.
func (*AMQPClientImpl) BindQueue ¶
func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error
BindQueue binds a queue to an exchange with a routing key.
func (*AMQPClientImpl) Close ¶
func (c *AMQPClientImpl) Close() error
Close gracefully shuts down the AMQP client.
func (*AMQPClientImpl) Consume ¶
func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
Consume starts consuming messages from the specified destination (queue name).
func (*AMQPClientImpl) ConsumeFromQueue ¶
func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
ConsumeFromQueue consumes messages from a queue with specific options.
func (*AMQPClientImpl) DeclareExchange ¶
func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
DeclareExchange declares an exchange with the given parameters.
func (*AMQPClientImpl) DeclareQueue ¶
func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
DeclareQueue declares a queue with the given parameters.
func (*AMQPClientImpl) IsReady ¶
func (c *AMQPClientImpl) IsReady() bool
IsReady returns true if the client is connected and ready to send/receive messages.
func (*AMQPClientImpl) Publish ¶
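func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error
Publish sends a message to the specified destination (queue name). It uses the default exchange ("") with the destination as the routing key.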
func (*AMQPClientImpl) PublishToExchange ¶
func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
PublishToExchange publishes a message to a specific exchange with routing key.
type BindingDeclaration ¶
type BindingDeclaration struct {
Queue string // Queue name
Exchange string // Exchange name
RoutingKey string // Routing key pattern
NoWait bool // Do not wait for server confirmation
Args map[string]any // Additional arguments
}
BindingDeclaration defines a queue-to-exchange binding.
type Client ¶
type Client interface {
// Publish sends a message to the specified destination.
// destination can be a queue name, exchange, or topic depending on the implementation.
// Returns an error if the publish operation fails.
Publish(ctx context.Context, destination string, data []byte) error
// Consume starts consuming messages from the specified destination.
// Returns a channel that delivers messages and an error if consumption setup fails.
// Messages should be acknowledged by the consumer.
Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
// Close gracefully shuts down the messaging client.
// It should clean up all connections and resources.
Close() error
// IsReady returns true if the client is connected and ready to send/receive messages.
IsReady() bool
}
Client defines the interface for messaging operations. It provides a simple API for publishing and consuming messages while hiding the complexity of connection management, retries, and protocol-specific details.
type ConsumeOptions ¶
type ConsumeOptions struct {
Queue string // Queue name to consume from
Consumer string // Consumer tag
AutoAck bool // Auto-acknowledge messages
Exclusive bool // Exclusive consumer
NoLocal bool // No-local flag
NoWait bool // No-wait flag
}
ConsumeOptions contains options for consuming messages with AMQP-specific features.
type ConsumerDeclaration ¶
type ConsumerDeclaration struct {
Queue string // Queue to consume from
Consumer string // Consumer tag
AutoAck bool // Automatically acknowledge messages
Exclusive bool // Exclusive consumer
NoLocal bool // Do not deliver to the connection that published
NoWait bool // Do not wait for server confirmation
EventType string // Event type identifier
Description string // Human-readable description
Handler MessageHandler // Message handler (optional for documentation-only declarations)
}
ConsumerDeclaration defines what a module consumes and how to handle messages.
type ExchangeDeclaration ¶
type ExchangeDeclaration struct {
Name string // Exchange name
Type string // Exchange type (direct, topic, fanout, headers)
Durable bool // Survive server restart
AutoDelete bool // Delete when no longer used
Internal bool // Internal exchange
NoWait bool // Do not wait for server confirmation
Args map[string]any // Additional arguments
}
ExchangeDeclaration defines an exchange to be declared.
type MessageHandler ¶
type MessageHandler interface {
// Handle processes a message and returns an error if processing fails.
// If an error is returned, the message will be negatively acknowledged (nack).
// If no error is returned, the message will be acknowledged (ack).
// The delivery is passed by pointer for performance reasons.
Handle(ctx context.Context, delivery *amqp.Delivery) error
// EventType returns the event type this handler can process.
// This is used for routing messages to the correct handler.
EventType() string
}
MessageHandler defines the interface for processing consumed messages. Handlers should implement this interface to process specific message types.
type PublishOptions ¶
type PublishOptions struct {
Exchange string // AMQP exchange name
RoutingKey string // AMQP routing key
Headers map[string]any // Message headers
Mandatory bool // AMQP mandatory flag
Immediate bool // AMQP immediate flag
}
PublishOptions contains options for publishing messages with AMQP-specific features.
type PublisherDeclaration ¶
type PublisherDeclaration struct {
Exchange string // Target exchange
RoutingKey string // Default routing key
EventType string // Event type identifier
Description string // Human-readable description
Mandatory bool // Message must be routed to a queue
Immediate bool // Message must be delivered immediately
Headers map[string]any // Default headers
}
PublisherDeclaration defines what a module publishes.
type QueueDeclaration ¶
type QueueDeclaration struct {
Name string // Queue name
Durable bool // Survive server restart
AutoDelete bool // Delete when no consumers
Exclusive bool // Only accessible by declaring connection
NoWait bool // Do not wait for server confirmation
Args map[string]any // Additional arguments
}
QueueDeclaration defines a queue to be declared.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages messaging infrastructure declarations across modules. It ensures queues, exchanges, and bindings are properly declared before use. It also manages consumer lifecycle and handles message routing to handlers.
func NewRegistry ¶
func NewRegistry(client AMQPClient, log logger.Logger) *Registry
NewRegistry creates a new messaging registry.
func (*Registry) DeclareInfrastructure ¶
func (r *Registry) DeclareInfrastructure(ctx context.Context) error
DeclareInfrastructure declares all registered messaging infrastructure.
func (*Registry) GetConsumers ¶
func (r *Registry) GetConsumers() []*ConsumerDeclaration
GetConsumers returns all registered consumers (for documentation/monitoring).
func (*Registry) GetPublishers ¶
func (r *Registry) GetPublishers() []*PublisherDeclaration
GetPublishers returns all registered publishers (for documentation/monitoring).
func (*Registry) RegisterBinding ¶
func (r *Registry) RegisterBinding(declaration *BindingDeclaration)
RegisterBinding registers a queue-to-exchange binding to be declared.
func (*Registry) RegisterConsumer ¶
func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)
RegisterConsumer registers a consumer declaration.
func (*Registry) RegisterExchange ¶
func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)
RegisterExchange registers an exchange to be declared.
func (*Registry) RegisterPublisher ¶
func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)
RegisterPublisher registers a publisher declaration.
func (*Registry) RegisterQueue ¶
func (r *Registry) RegisterQueue(declaration *QueueDeclaration)
RegisterQueue registers a queue to be declared.
func (*Registry) StartConsumers ¶
func (r *Registry) StartConsumers(ctx context.Context) error
StartConsumers starts all registered consumers with handlers. This should be called after DeclareInfrastructure and before starting the main application.
func (*Registry) StopConsumers ¶
func (r *Registry) StopConsumers()
StopConsumers gracefully stops all running consumers.
func (*Registry) ValidateConsumer ¶
func (r *Registry) ValidateConsumer(queue string) bool
ValidateConsumer checks if a consumer is registered for the given queue.
func (*Registry) ValidatePublisher ¶
func (r *Registry) ValidatePublisher(exchange, routingKey string) bool
ValidatePublisher checks if a publisher is registered for the given exchange and routing key.