messaging

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2025 License: MIT Imports: 28 Imported by: 0

Documentation

Overview

Package messaging provides a unified interface for message queue operations. It abstracts the underlying messaging implementation to allow for easy testing and future extensibility.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StartConsumeSpan added in v0.12.0

func StartConsumeSpan(ctx context.Context, delivery *amqp.Delivery, queueName string) (context.Context, trace.Span)

StartConsumeSpan creates an OpenTelemetry span for message consumption. It extracts the trace context from the delivery headers and creates a child span. This should be called by consumers when processing messages. The returned context should be used for downstream operations, and the span must be ended when done.

Types

type AMQPClient

type AMQPClient interface {
	Client

	// PublishToExchange publishes a message to a specific exchange with routing key.
	PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error

	// ConsumeFromQueue consumes messages from a queue with specific options.
	ConsumeFromQueue(ctx context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)

	// DeclareQueue declares a queue with the given parameters.
	DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error

	// DeclareExchange declares an exchange with the given parameters.
	DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error

	// BindQueue binds a queue to an exchange with a routing key.
	BindQueue(queue, exchange, routingKey string, noWait bool) error
}

AMQPClient extends the basic Client interface with AMQP-specific functionality. This allows for more advanced AMQP features while maintaining the simple interface.

type AMQPClientImpl

type AMQPClientImpl struct {
	// contains filtered or unexported fields
}

AMQPClientImpl provides an AMQP implementation of the messaging client interface. It includes automatic reconnection, retry logic, and AMQP-specific features.

func NewAMQPClient

func NewAMQPClient(brokerURL string, log logger.Logger) *AMQPClientImpl

NewAMQPClient creates a new AMQP client instance. It automatically attempts to connect to the broker and handles reconnections.

func (*AMQPClientImpl) BindQueue

func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error

BindQueue binds a queue to an exchange with a routing key.

func (*AMQPClientImpl) Close

func (c *AMQPClientImpl) Close() error

Close gracefully shuts down the AMQP client.

func (*AMQPClientImpl) Consume

func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)

Consume starts consuming messages from the specified destination (queue name).

func (*AMQPClientImpl) ConsumeFromQueue

func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)

ConsumeFromQueue consumes messages from a queue with specific options.

func (*AMQPClientImpl) DeclareExchange

func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error

DeclareExchange declares an exchange with the given parameters.

func (*AMQPClientImpl) DeclareQueue

func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error

DeclareQueue declares a queue with the given parameters.

func (*AMQPClientImpl) IsReady

func (c *AMQPClientImpl) IsReady() bool

IsReady returns true if the client is connected and ready to send/receive messages.

func (*AMQPClientImpl) Publish

func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error

Publish sends a message to the specified destination (queue name). Uses default exchange ("") and destination as routing key.

func (*AMQPClientImpl) PublishToExchange

func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error

PublishToExchange publishes a message to a specific exchange with routing key.

type BindingDeclaration

type BindingDeclaration struct {
	Queue      string         // Queue name
	Exchange   string         // Exchange name
	RoutingKey string         // Routing key pattern
	NoWait     bool           // Do not wait for server confirmation
	Args       map[string]any // Additional arguments
}

BindingDeclaration defines a queue-to-exchange binding

func NewBinding added in v0.14.1

func NewBinding(queue, exchange, routingKey string) *BindingDeclaration

NewBinding creates a binding declaration between a queue and exchange.

Parameters:

  • queue: Queue name to bind
  • exchange: Exchange name to bind to
  • routingKey: Routing key pattern (e.g., "order.*", "user.created")

type Client

type Client interface {
	// Publish sends a message to the specified destination.
	// destination can be a queue name, exchange, or topic depending on the implementation.
	// Returns an error if the publish operation fails.
	Publish(ctx context.Context, destination string, data []byte) error

	// Consume starts consuming messages from the specified destination.
	// Returns a channel that delivers messages and an error if consumption setup fails.
	// Messages should be acknowledged by the consumer.
	Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)

	// Close gracefully shuts down the messaging client.
	// It should clean up all connections and resources.
	Close() error

	// IsReady returns true if the client is connected and ready to send/receive messages.
	IsReady() bool
}

Client defines the interface for messaging operations. It provides a simple API for publishing and consuming messages while hiding the complexity of connection management, retries, and protocol-specific details.

type ClientFactory added in v0.9.0

type ClientFactory func(string, logger.Logger) AMQPClient

ClientFactory creates AMQP clients from URLs

type ConsumeOptions

type ConsumeOptions struct {
	Queue         string // Queue name to consume from
	Consumer      string // Consumer tag
	AutoAck       bool   // Auto-acknowledge messages
	Exclusive     bool   // Exclusive consumer
	NoLocal       bool   // No-local flag
	NoWait        bool   // No-wait flag
	PrefetchCount int    // RabbitMQ prefetch count (0 or negative = default to 1)
}

ConsumeOptions contains options for consuming messages with AMQP-specific features.

type ConsumerDeclaration

type ConsumerDeclaration struct {
	Queue         string         // Queue to consume from
	Consumer      string         // Consumer tag
	AutoAck       bool           // Automatically acknowledge messages
	Exclusive     bool           // Exclusive consumer
	NoLocal       bool           // Do not deliver to the connection that published
	NoWait        bool           // Do not wait for server confirmation
	EventType     string         // Event type identifier
	Description   string         // Human-readable description
	Handler       MessageHandler // Message handler (optional for documentation-only declarations)
	Workers       int            // Number of concurrent workers (0 = auto-scale to NumCPU*4, >0 = explicit)
	PrefetchCount int            // RabbitMQ prefetch count (0 = auto-scale to Workers*10, capped at 500)
}

ConsumerDeclaration defines what a module consumes and how to handle messages

func NewConsumer added in v0.14.1

func NewConsumer(opts *ConsumerOptions) *ConsumerDeclaration

NewConsumer creates a consumer declaration from options.

type ConsumerOptions added in v0.14.1

type ConsumerOptions struct {
	Queue         string         // Queue name to consume from
	Consumer      string         // Consumer tag
	EventType     string         // Event type identifier
	Description   string         // Human-readable description
	Handler       MessageHandler // Message handler (optional for documentation-only declarations)
	AutoAck       bool           // Automatically acknowledge messages (default: false)
	Exclusive     bool           // Exclusive consumer (default: false)
	NoLocal       bool           // Don't deliver to the connection that published (default: false)
	Workers       int            // Number of concurrent workers (0 = auto-scale to NumCPU*4, >0 = explicit)
	PrefetchCount int            // RabbitMQ prefetch count (0 = auto-scale to Workers*10, capped at 500)
}

ConsumerOptions contains configuration for creating a consumer declaration.

type DeclarationStats added in v0.9.0

type DeclarationStats struct {
	Exchanges  int
	Queues     int
	Bindings   int
	Publishers int
	Consumers  int
}

Stats returns statistics about the declarations.

type Declarations added in v0.9.0

type Declarations struct {
	Exchanges  map[string]*ExchangeDeclaration
	Queues     map[string]*QueueDeclaration
	Bindings   []*BindingDeclaration
	Publishers []*PublisherDeclaration
	// contains filtered or unexported fields
}

Declarations stores messaging infrastructure declarations made by modules at startup. This is a pure data structure (no client dependencies) that can be validated once and replayed to multiple per-tenant registries.

func NewDeclarations added in v0.9.0

func NewDeclarations() *Declarations

NewDeclarations creates a new empty declarations store.

func (*Declarations) Clone added in v0.9.0

func (d *Declarations) Clone() *Declarations

Clone creates a deep copy of the declarations. This is useful for creating per-tenant copies during replay.

func (*Declarations) Consumers added in v0.9.0

func (d *Declarations) Consumers() []*ConsumerDeclaration

Consumers returns all consumer declarations in registration order. Used internally by ReplayToRegistry and for observability/metrics.

func (*Declarations) DeclareBinding added in v0.14.1

func (d *Declarations) DeclareBinding(queue, exchange, routingKey string) *BindingDeclaration

DeclareBinding creates and registers a binding in one step. Returns the created binding declaration for reference.

func (*Declarations) DeclareConsumer added in v0.14.1

func (d *Declarations) DeclareConsumer(opts *ConsumerOptions, queue *QueueDeclaration) *ConsumerDeclaration

DeclareConsumer creates and registers a consumer in one step.

If queue is non-nil and not already registered, it will be automatically registered. This hybrid approach allows consumers to optionally declare their dependencies.

Usage:

  • Pass nil if queue is already registered separately
  • Pass queue declaration to auto-register (convenience for simple cases)

func (*Declarations) DeclarePublisher added in v0.14.1

func (d *Declarations) DeclarePublisher(opts *PublisherOptions, exchange *ExchangeDeclaration) *PublisherDeclaration

DeclarePublisher creates and registers a publisher in one step.

If exchange is non-nil and not already registered, it will be automatically registered. This hybrid approach allows publishers to optionally declare their dependencies.

Usage:

  • Pass nil if exchange is already registered separately
  • Pass exchange declaration to auto-register (convenience for simple cases)

func (*Declarations) DeclareQueue added in v0.14.1

func (d *Declarations) DeclareQueue(name string) *QueueDeclaration

DeclareQueue creates and registers a queue in one step. Returns the created queue declaration for reference.

func (*Declarations) DeclareTopicExchange added in v0.14.1

func (d *Declarations) DeclareTopicExchange(name string) *ExchangeDeclaration

DeclareTopicExchange creates and registers a topic exchange in one step. Returns the created exchange declaration for reference.

func (*Declarations) Hash added in v0.15.0

func (d *Declarations) Hash() uint64

Hash generates a deterministic hash of all declarations. This is used by Manager to detect duplicate replay attempts (idempotency). The hash is stable across multiple calls and does not include handler functions.

func (*Declarations) RegisterBinding added in v0.9.0

func (d *Declarations) RegisterBinding(b *BindingDeclaration)

RegisterBinding adds a binding declaration to the store.

func (*Declarations) RegisterConsumer added in v0.9.0

func (d *Declarations) RegisterConsumer(c *ConsumerDeclaration)

RegisterConsumer adds a consumer declaration to the store. Panics if a consumer with the same queue+consumer+event_type already exists.

func (*Declarations) RegisterExchange added in v0.9.0

func (d *Declarations) RegisterExchange(e *ExchangeDeclaration)

RegisterExchange adds an exchange declaration to the store.

func (*Declarations) RegisterPublisher added in v0.9.0

func (d *Declarations) RegisterPublisher(p *PublisherDeclaration)

RegisterPublisher adds a publisher declaration to the store.

func (*Declarations) RegisterQueue added in v0.9.0

func (d *Declarations) RegisterQueue(q *QueueDeclaration)

RegisterQueue adds a queue declaration to the store.

func (*Declarations) ReplayToRegistry added in v0.9.0

func (d *Declarations) ReplayToRegistry(reg RegistryInterface) error

ReplayToRegistry applies all declarations to a runtime registry. The order is important: exchanges first, then queues, then bindings, then publishers/consumers.

func (*Declarations) Stats added in v0.9.0

func (d *Declarations) Stats() DeclarationStats

Stats returns counts of each declaration type.

func (*Declarations) Validate added in v0.9.0

func (d *Declarations) Validate() error

Validate checks the integrity of all declarations. It ensures that references between declarations are valid.

type ExchangeDeclaration

type ExchangeDeclaration struct {
	Name       string         // Exchange name
	Type       string         // Exchange type (direct, topic, fanout, headers)
	Durable    bool           // Survive server restart
	AutoDelete bool           // Delete when no longer used
	Internal   bool           // Internal exchange
	NoWait     bool           // Do not wait for server confirmation
	Args       map[string]any // Additional arguments
}

ExchangeDeclaration defines an exchange to be declared

func NewTopicExchange added in v0.14.1

func NewTopicExchange(name string) *ExchangeDeclaration

NewTopicExchange creates a topic exchange with production-safe defaults. Topic exchanges route messages based on routing key patterns (e.g., "order.*", "user.#").

Production defaults:

  • Durable: true (survives broker restart)
  • AutoDelete: false (won't delete when unused)
  • Internal: false (can be published to directly)
  • NoWait: false (waits for broker confirmation)

type Manager added in v0.9.0

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages AMQP clients by string keys with different lifecycle strategies. Publishers are cached with idle eviction (can be recreated easily). Consumers are long-lived (must stay alive to receive messages). The manager is key-agnostic - it doesn't know about tenants, just manages named clients.

func NewMessagingManager added in v0.9.0

func NewMessagingManager(resourceSource TenantMessagingResourceSource, log logger.Logger, opts ManagerOptions, clientFactory ClientFactory) *Manager

NewMessagingManager creates a new messaging manager

func (*Manager) Close added in v0.9.0

func (m *Manager) Close() error

Close closes all clients and stops cleanup

func (*Manager) EnsureConsumers added in v0.9.0

func (m *Manager) EnsureConsumers(ctx context.Context, key string, decls *Declarations) error

EnsureConsumers creates and starts consumers for the given key using the provided declarations. This should be called once per key to set up long-lived consumers. Subsequent calls for the same key are idempotent.

func (*Manager) Publisher added in v0.19.0

func (m *Manager) Publisher(ctx context.Context, key string) (AMQPClient, error)

Publisher returns a publisher client for the given key. Publishers are cached with LRU eviction and lazy initialization.

func (*Manager) StartCleanup added in v0.9.0

func (m *Manager) StartCleanup(interval time.Duration)

StartCleanup starts the background cleanup routine for idle publishers

func (*Manager) Stats added in v0.9.0

func (m *Manager) Stats() map[string]any

Stats returns statistics about the messaging manager

func (*Manager) StopCleanup added in v0.9.0

func (m *Manager) StopCleanup()

StopCleanup stops the background cleanup routine

type ManagerOptions added in v0.9.0

type ManagerOptions struct {
	MaxPublishers int           // Maximum number of publisher clients to keep cached
	IdleTTL       time.Duration // Time after which idle publishers are evicted
}

ManagerOptions configures the MessagingManager

type MessageHandler

type MessageHandler interface {
	// Handle processes a message and returns an error if processing fails.
	// If an error is returned, the message will be negatively acknowledged (nack).
	// If no error is returned, the message will be acknowledged (ack).
	// The delivery is passed by pointer for performance reasons.
	Handle(ctx context.Context, delivery *amqp.Delivery) error

	// EventType returns the event type this handler can process.
	// This is used for routing messages to the correct handler.
	EventType() string
}

MessageHandler defines the interface for processing consumed messages. Handlers should implement this interface to process specific message types.

type PublishOptions

type PublishOptions struct {
	Exchange   string         // AMQP exchange name
	RoutingKey string         // AMQP routing key
	Headers    map[string]any // Message headers
	Mandatory  bool           // AMQP mandatory flag
	Immediate  bool           // AMQP immediate flag
}

PublishOptions contains options for publishing messages with AMQP-specific features.

type PublisherDeclaration

type PublisherDeclaration struct {
	Exchange    string         // Target exchange
	RoutingKey  string         // Default routing key
	EventType   string         // Event type identifier
	Description string         // Human-readable description
	Mandatory   bool           // Message must be routed to a queue
	Immediate   bool           // Message must be delivered immediately
	Headers     map[string]any // Default headers
}

PublisherDeclaration defines what a module publishes

func NewPublisher added in v0.14.1

func NewPublisher(opts *PublisherOptions) *PublisherDeclaration

NewPublisher creates a publisher declaration from options. If Headers is nil, an empty map is created.

type PublisherOptions added in v0.14.1

type PublisherOptions struct {
	Exchange    string         // Target exchange name
	RoutingKey  string         // Routing key for messages
	EventType   string         // Event type identifier
	Description string         // Human-readable description
	Headers     map[string]any // Default headers (optional)
	Mandatory   bool           // Message must be routed to a queue (default: false)
	Immediate   bool           // Message must be delivered immediately (default: false)
}

PublisherOptions contains configuration for creating a publisher declaration.

type QueueDeclaration

type QueueDeclaration struct {
	Name       string         // Queue name
	Durable    bool           // Survive server restart
	AutoDelete bool           // Delete when no consumers
	Exclusive  bool           // Only accessible by declaring connection
	NoWait     bool           // Do not wait for server confirmation
	Args       map[string]any // Additional arguments
}

QueueDeclaration defines a queue to be declared

func NewQueue added in v0.14.1

func NewQueue(name string) *QueueDeclaration

NewQueue creates a queue with production-safe defaults.

Production defaults:

  • Durable: true (survives broker restart)
  • AutoDelete: false (won't delete when consumers disconnect)
  • Exclusive: false (can be accessed by multiple connections)
  • NoWait: false (waits for broker confirmation)

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages messaging infrastructure declarations across modules. It ensures queues, exchanges, and bindings are properly declared before use. It also manages consumer lifecycle and handles message routing to handlers.

func NewRegistry

func NewRegistry(client AMQPClient, log logger.Logger) *Registry

NewRegistry creates a new messaging registry

func (*Registry) Bindings added in v0.19.0

func (r *Registry) Bindings() []*BindingDeclaration

Bindings returns all registered bindings (for testing/monitoring)

func (*Registry) Consumers added in v0.19.0

func (r *Registry) Consumers() []*ConsumerDeclaration

Consumers returns all registered consumers (for documentation/monitoring)

func (*Registry) DeclareInfrastructure

func (r *Registry) DeclareInfrastructure(ctx context.Context) error

DeclareInfrastructure declares all registered messaging infrastructure

func (*Registry) Exchanges added in v0.19.0

func (r *Registry) Exchanges() map[string]*ExchangeDeclaration

Exchanges returns all registered exchanges (for testing/monitoring)

func (*Registry) Publishers added in v0.19.0

func (r *Registry) Publishers() []*PublisherDeclaration

Publishers returns all registered publishers (for documentation/monitoring)

func (*Registry) Queues added in v0.19.0

func (r *Registry) Queues() map[string]*QueueDeclaration

Queues returns all registered queues (for testing/monitoring)

func (*Registry) RegisterBinding

func (r *Registry) RegisterBinding(declaration *BindingDeclaration)

RegisterBinding registers a binding for declaration

func (*Registry) RegisterConsumer

func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)

RegisterConsumer registers a consumer declaration

func (*Registry) RegisterExchange

func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)

RegisterExchange registers an exchange for declaration

func (*Registry) RegisterPublisher

func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)

RegisterPublisher registers a publisher declaration

func (*Registry) RegisterQueue

func (r *Registry) RegisterQueue(declaration *QueueDeclaration)

RegisterQueue registers a queue for declaration

func (*Registry) StartConsumers

func (r *Registry) StartConsumers(ctx context.Context) error

StartConsumers starts all registered consumers with handlers. This should be called after DeclareInfrastructure and before starting the main application.

func (*Registry) StopConsumers

func (r *Registry) StopConsumers()

StopConsumers gracefully stops all running consumers.

func (*Registry) ValidateConsumer

func (r *Registry) ValidateConsumer(queue string) bool

ValidateConsumer checks if a consumer is registered for the given queue

func (*Registry) ValidatePublisher

func (r *Registry) ValidatePublisher(exchange, routingKey string) bool

ValidatePublisher checks if a publisher is registered for the given exchange/routing key

type RegistryInterface added in v0.8.0

type RegistryInterface interface {
	// Registration methods
	RegisterExchange(declaration *ExchangeDeclaration)
	RegisterQueue(declaration *QueueDeclaration)
	RegisterBinding(declaration *BindingDeclaration)
	RegisterPublisher(declaration *PublisherDeclaration)
	RegisterConsumer(declaration *ConsumerDeclaration)

	// Infrastructure lifecycle
	DeclareInfrastructure(ctx context.Context) error
	StartConsumers(ctx context.Context) error
	StopConsumers()

	// Accessor methods for testing/monitoring
	Exchanges() map[string]*ExchangeDeclaration
	Queues() map[string]*QueueDeclaration
	Bindings() []*BindingDeclaration
	Publishers() []*PublisherDeclaration
	Consumers() []*ConsumerDeclaration

	// Validation methods
	ValidatePublisher(exchange, routingKey string) bool
	ValidateConsumer(queue string) bool
}

RegistryInterface defines the contract for messaging infrastructure management. This interface allows for easy mocking and testing of messaging infrastructure.

type TenantMessagingResourceSource added in v0.9.0

type TenantMessagingResourceSource interface {
	// BrokerURL returns the AMQP broker URL for the given key.
	// For single-tenant apps, key will be "". For multi-tenant, key will be the tenant ID.
	BrokerURL(ctx context.Context, key string) (string, error)
}

TenantMessagingResourceSource provides per-key AMQP configurations. This interface abstracts where tenant-specific messaging configs come from.

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL