Documentation
¶
Overview ¶
Package messaging provides a unified interface for message queue operations. It abstracts the underlying messaging implementation to allow for easy testing and future extensibility.
Index ¶
- func StartConsumeSpan(ctx context.Context, delivery *amqp.Delivery, queueName string) (context.Context, trace.Span)
- type AMQPClient
- type AMQPClientImpl
- func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error
- func (c *AMQPClientImpl) Close() error
- func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
- func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
- func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
- func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
- func (c *AMQPClientImpl) IsReady() bool
- func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error
- func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
- type BindingDeclaration
- type Client
- type ClientFactory
- type ConsumeOptions
- type ConsumerDeclaration
- type ConsumerOptions
- type DeclarationStats
- type Declarations
- func (d *Declarations) Clone() *Declarations
- func (d *Declarations) Consumers() []*ConsumerDeclaration
- func (d *Declarations) DeclareBinding(queue, exchange, routingKey string) *BindingDeclaration
- func (d *Declarations) DeclareConsumer(opts *ConsumerOptions, queue *QueueDeclaration) *ConsumerDeclaration
- func (d *Declarations) DeclarePublisher(opts *PublisherOptions, exchange *ExchangeDeclaration) *PublisherDeclaration
- func (d *Declarations) DeclareQueue(name string) *QueueDeclaration
- func (d *Declarations) DeclareTopicExchange(name string) *ExchangeDeclaration
- func (d *Declarations) Hash() uint64
- func (d *Declarations) RegisterBinding(b *BindingDeclaration)
- func (d *Declarations) RegisterConsumer(c *ConsumerDeclaration)
- func (d *Declarations) RegisterExchange(e *ExchangeDeclaration)
- func (d *Declarations) RegisterPublisher(p *PublisherDeclaration)
- func (d *Declarations) RegisterQueue(q *QueueDeclaration)
- func (d *Declarations) ReplayToRegistry(reg RegistryInterface) error
- func (d *Declarations) Stats() DeclarationStats
- func (d *Declarations) Validate() error
- type ExchangeDeclaration
- type Manager
- func (m *Manager) Close() error
- func (m *Manager) EnsureConsumers(ctx context.Context, key string, decls *Declarations) error
- func (m *Manager) Publisher(ctx context.Context, key string) (AMQPClient, error)
- func (m *Manager) StartCleanup(interval time.Duration)
- func (m *Manager) Stats() map[string]any
- func (m *Manager) StopCleanup()
- type ManagerOptions
- type MessageHandler
- type PublishOptions
- type PublisherDeclaration
- type PublisherOptions
- type QueueDeclaration
- type Registry
- func (r *Registry) Bindings() []*BindingDeclaration
- func (r *Registry) Consumers() []*ConsumerDeclaration
- func (r *Registry) DeclareInfrastructure(ctx context.Context) error
- func (r *Registry) Exchanges() map[string]*ExchangeDeclaration
- func (r *Registry) Publishers() []*PublisherDeclaration
- func (r *Registry) Queues() map[string]*QueueDeclaration
- func (r *Registry) RegisterBinding(declaration *BindingDeclaration)
- func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)
- func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)
- func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)
- func (r *Registry) RegisterQueue(declaration *QueueDeclaration)
- func (r *Registry) StartConsumers(ctx context.Context) error
- func (r *Registry) StopConsumers()
- func (r *Registry) ValidateConsumer(queue string) bool
- func (r *Registry) ValidatePublisher(exchange, routingKey string) bool
- type RegistryInterface
- type TenantMessagingResourceSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StartConsumeSpan ¶ added in v0.12.0
func StartConsumeSpan(ctx context.Context, delivery *amqp.Delivery, queueName string) (context.Context, trace.Span)
StartConsumeSpan creates an OpenTelemetry span for message consumption. It extracts the trace context from the delivery headers and creates a child span. This should be called by consumers when processing messages. The returned context should be used for downstream operations, and the span must be ended when done.
Types ¶
type AMQPClient ¶
type AMQPClient interface {
Client
// PublishToExchange publishes a message to a specific exchange with routing key.
PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
// ConsumeFromQueue consumes messages from a queue with specific options.
ConsumeFromQueue(ctx context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
// DeclareQueue declares a queue with the given parameters.
DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
// DeclareExchange declares an exchange with the given parameters.
DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
// BindQueue binds a queue to an exchange with a routing key.
BindQueue(queue, exchange, routingKey string, noWait bool) error
}
AMQPClient extends the basic Client interface with AMQP-specific functionality. This allows for more advanced AMQP features while maintaining the simple interface.
type AMQPClientImpl ¶
type AMQPClientImpl struct {
// contains filtered or unexported fields
}
AMQPClientImpl provides an AMQP implementation of the messaging client interface. It includes automatic reconnection, retry logic, and AMQP-specific features.
func NewAMQPClient ¶
func NewAMQPClient(brokerURL string, log logger.Logger) *AMQPClientImpl
NewAMQPClient creates a new AMQP client instance. It automatically attempts to connect to the broker and handles reconnections.
func (*AMQPClientImpl) BindQueue ¶
func (c *AMQPClientImpl) BindQueue(queue, exchange, routingKey string, noWait bool) error
BindQueue binds a queue to an exchange with a routing key.
func (*AMQPClientImpl) Close ¶
func (c *AMQPClientImpl) Close() error
Close gracefully shuts down the AMQP client.
func (*AMQPClientImpl) Consume ¶
func (c *AMQPClientImpl) Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
Consume starts consuming messages from the specified destination (queue name).
func (*AMQPClientImpl) ConsumeFromQueue ¶
func (c *AMQPClientImpl) ConsumeFromQueue(_ context.Context, options ConsumeOptions) (<-chan amqp.Delivery, error)
ConsumeFromQueue consumes messages from a queue with specific options.
func (*AMQPClientImpl) DeclareExchange ¶
func (c *AMQPClientImpl) DeclareExchange(name, kind string, durable, autoDelete, internal, noWait bool) error
DeclareExchange declares an exchange with the given parameters.
func (*AMQPClientImpl) DeclareQueue ¶
func (c *AMQPClientImpl) DeclareQueue(name string, durable, autoDelete, exclusive, noWait bool) error
DeclareQueue declares a queue with the given parameters.
func (*AMQPClientImpl) IsReady ¶
func (c *AMQPClientImpl) IsReady() bool
IsReady returns true if the client is connected and ready to send/receive messages.
func (*AMQPClientImpl) Publish ¶
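func (c *AMQPClientImpl) Publish(ctx context.Context, destination string, data []byte) error
Publish sends a message to the specified destination (queue name). It uses the default exchange ("") with destination as the routing key.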
func (*AMQPClientImpl) PublishToExchange ¶
func (c *AMQPClientImpl) PublishToExchange(ctx context.Context, options PublishOptions, data []byte) error
PublishToExchange publishes a message to a specific exchange with routing key.
type BindingDeclaration ¶
type BindingDeclaration struct {
Queue string // Queue name
Exchange string // Exchange name
RoutingKey string // Routing key pattern
NoWait bool // Do not wait for server confirmation
Args map[string]any // Additional arguments
}
BindingDeclaration defines a queue-to-exchange binding
func NewBinding ¶ added in v0.14.1
func NewBinding(queue, exchange, routingKey string) *BindingDeclaration
NewBinding creates a binding declaration between a queue and exchange.
Parameters:
- queue: Queue name to bind
- exchange: Exchange name to bind to
- routingKey: Routing key pattern (e.g., "order.*", "user.created")
type Client ¶
type Client interface {
// Publish sends a message to the specified destination.
// destination can be a queue name, exchange, or topic depending on the implementation.
// Returns an error if the publish operation fails.
Publish(ctx context.Context, destination string, data []byte) error
// Consume starts consuming messages from the specified destination.
// Returns a channel that delivers messages and an error if consumption setup fails.
// Messages should be acknowledged by the consumer.
Consume(ctx context.Context, destination string) (<-chan amqp.Delivery, error)
// Close gracefully shuts down the messaging client.
// It should clean up all connections and resources.
Close() error
// IsReady returns true if the client is connected and ready to send/receive messages.
IsReady() bool
}
Client defines the interface for messaging operations. It provides a simple API for publishing and consuming messages while hiding the complexity of connection management, retries, and protocol-specific details.
type ClientFactory ¶ added in v0.9.0
type ClientFactory func(string, logger.Logger) AMQPClient
ClientFactory creates AMQP clients from URLs
type ConsumeOptions ¶
type ConsumeOptions struct {
Queue string // Queue name to consume from
Consumer string // Consumer tag
AutoAck bool // Auto-acknowledge messages
Exclusive bool // Exclusive consumer
NoLocal bool // No-local flag
NoWait bool // No-wait flag
PrefetchCount int // RabbitMQ prefetch count (0 or negative = default to 1)
}
ConsumeOptions contains options for consuming messages with AMQP-specific features.
type ConsumerDeclaration ¶
type ConsumerDeclaration struct {
Queue string // Queue to consume from
Consumer string // Consumer tag
AutoAck bool // Automatically acknowledge messages
Exclusive bool // Exclusive consumer
NoLocal bool // Do not deliver to the connection that published
NoWait bool // Do not wait for server confirmation
EventType string // Event type identifier
Description string // Human-readable description
Handler MessageHandler // Message handler (optional for documentation-only declarations)
Workers int // Number of concurrent workers (0 = auto-scale to NumCPU*4, >0 = explicit)
PrefetchCount int // RabbitMQ prefetch count (0 = auto-scale to Workers*10, capped at 500)
}
ConsumerDeclaration defines what a module consumes and how to handle messages
func NewConsumer ¶ added in v0.14.1
func NewConsumer(opts *ConsumerOptions) *ConsumerDeclaration
NewConsumer creates a consumer declaration from options.
type ConsumerOptions ¶ added in v0.14.1
type ConsumerOptions struct {
Queue string // Queue name to consume from
Consumer string // Consumer tag
EventType string // Event type identifier
Description string // Human-readable description
Handler MessageHandler // Message handler (optional for documentation-only declarations)
AutoAck bool // Automatically acknowledge messages (default: false)
Exclusive bool // Exclusive consumer (default: false)
NoLocal bool // Don't deliver to the connection that published (default: false)
Workers int // Number of concurrent workers (0 = auto-scale to NumCPU*4, >0 = explicit)
PrefetchCount int // RabbitMQ prefetch count (0 = auto-scale to Workers*10, capped at 500)
}
ConsumerOptions contains configuration for creating a consumer declaration.
type DeclarationStats ¶ added in v0.9.0
DeclarationStats holds counts of each declaration type, as returned by (*Declarations).Stats.
type Declarations ¶ added in v0.9.0
type Declarations struct {
Exchanges map[string]*ExchangeDeclaration
Queues map[string]*QueueDeclaration
Bindings []*BindingDeclaration
Publishers []*PublisherDeclaration
// contains filtered or unexported fields
}
Declarations stores messaging infrastructure declarations made by modules at startup. This is a pure data structure (no client dependencies) that can be validated once and replayed to multiple per-tenant registries.
func NewDeclarations ¶ added in v0.9.0
func NewDeclarations() *Declarations
NewDeclarations creates a new empty declarations store.
func (*Declarations) Clone ¶ added in v0.9.0
func (d *Declarations) Clone() *Declarations
Clone creates a deep copy of the declarations. This is useful for creating per-tenant copies during replay.
func (*Declarations) Consumers ¶ added in v0.9.0
func (d *Declarations) Consumers() []*ConsumerDeclaration
Consumers returns all consumer declarations in registration order. Used internally by ReplayToRegistry and for observability/metrics.
func (*Declarations) DeclareBinding ¶ added in v0.14.1
func (d *Declarations) DeclareBinding(queue, exchange, routingKey string) *BindingDeclaration
DeclareBinding creates and registers a binding in one step. Returns the created binding declaration for reference.
func (*Declarations) DeclareConsumer ¶ added in v0.14.1
func (d *Declarations) DeclareConsumer(opts *ConsumerOptions, queue *QueueDeclaration) *ConsumerDeclaration
DeclareConsumer creates and registers a consumer in one step.
If queue is non-nil and not already registered, it will be automatically registered. This hybrid approach allows consumers to optionally declare their dependencies.
Usage:
- Pass nil if queue is already registered separately
- Pass queue declaration to auto-register (convenience for simple cases)
func (*Declarations) DeclarePublisher ¶ added in v0.14.1
func (d *Declarations) DeclarePublisher(opts *PublisherOptions, exchange *ExchangeDeclaration) *PublisherDeclaration
DeclarePublisher creates and registers a publisher in one step.
If exchange is non-nil and not already registered, it will be automatically registered. This hybrid approach allows publishers to optionally declare their dependencies.
Usage:
- Pass nil if exchange is already registered separately
- Pass exchange declaration to auto-register (convenience for simple cases)
func (*Declarations) DeclareQueue ¶ added in v0.14.1
func (d *Declarations) DeclareQueue(name string) *QueueDeclaration
DeclareQueue creates and registers a queue in one step. Returns the created queue declaration for reference.
func (*Declarations) DeclareTopicExchange ¶ added in v0.14.1
func (d *Declarations) DeclareTopicExchange(name string) *ExchangeDeclaration
DeclareTopicExchange creates and registers a topic exchange in one step. Returns the created exchange declaration for reference.
func (*Declarations) Hash ¶ added in v0.15.0
func (d *Declarations) Hash() uint64
Hash generates a deterministic hash of all declarations. This is used by Manager to detect duplicate replay attempts (idempotency). The hash is stable across multiple calls and does not include handler functions.
func (*Declarations) RegisterBinding ¶ added in v0.9.0
func (d *Declarations) RegisterBinding(b *BindingDeclaration)
RegisterBinding adds a binding declaration to the store.
func (*Declarations) RegisterConsumer ¶ added in v0.9.0
func (d *Declarations) RegisterConsumer(c *ConsumerDeclaration)
RegisterConsumer adds a consumer declaration to the store. Panics if a consumer with the same queue+consumer+event_type already exists.
func (*Declarations) RegisterExchange ¶ added in v0.9.0
func (d *Declarations) RegisterExchange(e *ExchangeDeclaration)
RegisterExchange adds an exchange declaration to the store.
func (*Declarations) RegisterPublisher ¶ added in v0.9.0
func (d *Declarations) RegisterPublisher(p *PublisherDeclaration)
RegisterPublisher adds a publisher declaration to the store.
func (*Declarations) RegisterQueue ¶ added in v0.9.0
func (d *Declarations) RegisterQueue(q *QueueDeclaration)
RegisterQueue adds a queue declaration to the store.
func (*Declarations) ReplayToRegistry ¶ added in v0.9.0
func (d *Declarations) ReplayToRegistry(reg RegistryInterface) error
ReplayToRegistry applies all declarations to a runtime registry. The order is important: exchanges first, then queues, then bindings, then publishers/consumers.
func (*Declarations) Stats ¶ added in v0.9.0
func (d *Declarations) Stats() DeclarationStats
Stats returns counts of each declaration type.
func (*Declarations) Validate ¶ added in v0.9.0
func (d *Declarations) Validate() error
Validate checks the integrity of all declarations. It ensures that references between declarations are valid.
type ExchangeDeclaration ¶
type ExchangeDeclaration struct {
Name string // Exchange name
Type string // Exchange type (direct, topic, fanout, headers)
Durable bool // Survive server restart
AutoDelete bool // Delete when no longer used
Internal bool // Internal exchange
NoWait bool // Do not wait for server confirmation
Args map[string]any // Additional arguments
}
ExchangeDeclaration defines an exchange to be declared
func NewTopicExchange ¶ added in v0.14.1
func NewTopicExchange(name string) *ExchangeDeclaration
NewTopicExchange creates a topic exchange with production-safe defaults. Topic exchanges route messages based on routing key patterns (e.g., "order.*", "user.#").
Production defaults:
- Durable: true (survives broker restart)
- AutoDelete: false (won't delete when unused)
- Internal: false (can be published to directly)
- NoWait: false (waits for broker confirmation)
type Manager ¶ added in v0.9.0
type Manager struct {
// contains filtered or unexported fields
}
Manager manages AMQP clients keyed by string, applying a different lifecycle strategy to each role. Publishers are cached and evicted when idle, since they can be recreated easily. Consumers are long-lived, since they must stay alive to receive messages. The manager is key-agnostic: it knows nothing about tenants and simply manages named clients.
func NewMessagingManager ¶ added in v0.9.0
func NewMessagingManager(resourceSource TenantMessagingResourceSource, log logger.Logger, opts ManagerOptions, clientFactory ClientFactory) *Manager
NewMessagingManager creates a new messaging manager
func (*Manager) EnsureConsumers ¶ added in v0.9.0
func (m *Manager) EnsureConsumers(ctx context.Context, key string, decls *Declarations) error
EnsureConsumers creates and starts consumers for the given key using the provided declarations. This should be called once per key to set up long-lived consumers. Subsequent calls for the same key are idempotent.
func (*Manager) Publisher ¶ added in v0.19.0
func (m *Manager) Publisher(ctx context.Context, key string) (AMQPClient, error)
Publisher returns a publisher client for the given key. Publishers are cached with LRU eviction and lazy initialization.
func (*Manager) StartCleanup ¶ added in v0.9.0
func (m *Manager) StartCleanup(interval time.Duration)
StartCleanup starts the background cleanup routine for idle publishers.
func (*Manager) StopCleanup ¶ added in v0.9.0
func (m *Manager) StopCleanup()
StopCleanup stops the background cleanup routine
type ManagerOptions ¶ added in v0.9.0
type ManagerOptions struct {
MaxPublishers int // Maximum number of publisher clients to keep cached
IdleTTL time.Duration // Time after which idle publishers are evicted
}
ManagerOptions configures a Manager created by NewMessagingManager.
type MessageHandler ¶
type MessageHandler interface {
// Handle processes a message and returns an error if processing fails.
// If an error is returned, the message will be negatively acknowledged (nack).
// If no error is returned, the message will be acknowledged (ack).
// The delivery is passed by pointer for performance reasons.
Handle(ctx context.Context, delivery *amqp.Delivery) error
// EventType returns the event type this handler can process.
// This is used for routing messages to the correct handler.
EventType() string
}
MessageHandler defines the interface for processing consumed messages. Handlers should implement this interface to process specific message types.
type PublishOptions ¶
type PublishOptions struct {
Exchange string // AMQP exchange name
RoutingKey string // AMQP routing key
Headers map[string]any // Message headers
Mandatory bool // AMQP mandatory flag
Immediate bool // AMQP immediate flag
}
PublishOptions contains options for publishing messages with AMQP-specific features.
type PublisherDeclaration ¶
type PublisherDeclaration struct {
Exchange string // Target exchange
RoutingKey string // Default routing key
EventType string // Event type identifier
Description string // Human-readable description
Mandatory bool // Message must be routed to a queue
Immediate bool // Message must be delivered immediately
Headers map[string]any // Default headers
}
PublisherDeclaration defines what a module publishes
func NewPublisher ¶ added in v0.14.1
func NewPublisher(opts *PublisherOptions) *PublisherDeclaration
NewPublisher creates a publisher declaration from options. If Headers is nil, an empty map is created.
type PublisherOptions ¶ added in v0.14.1
type PublisherOptions struct {
Exchange string // Target exchange name
RoutingKey string // Routing key for messages
EventType string // Event type identifier
Description string // Human-readable description
Headers map[string]any // Default headers (optional)
Mandatory bool // Message must be routed to a queue (default: false)
Immediate bool // Message must be delivered immediately (default: false)
}
PublisherOptions contains configuration for creating a publisher declaration.
type QueueDeclaration ¶
type QueueDeclaration struct {
Name string // Queue name
Durable bool // Survive server restart
AutoDelete bool // Delete when no consumers
Exclusive bool // Only accessible by declaring connection
NoWait bool // Do not wait for server confirmation
Args map[string]any // Additional arguments
}
QueueDeclaration defines a queue to be declared
func NewQueue ¶ added in v0.14.1
func NewQueue(name string) *QueueDeclaration
NewQueue creates a queue with production-safe defaults.
Production defaults:
- Durable: true (survives broker restart)
- AutoDelete: false (won't delete when consumers disconnect)
- Exclusive: false (can be accessed by multiple connections)
- NoWait: false (waits for broker confirmation)
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry manages messaging infrastructure declarations across modules. It ensures queues, exchanges, and bindings are properly declared before use. It also manages consumer lifecycle and handles message routing to handlers.
func NewRegistry ¶
func NewRegistry(client AMQPClient, log logger.Logger) *Registry
NewRegistry creates a new messaging registry
func (*Registry) Bindings ¶ added in v0.19.0
func (r *Registry) Bindings() []*BindingDeclaration
Bindings returns all registered bindings (for testing/monitoring)
func (*Registry) Consumers ¶ added in v0.19.0
func (r *Registry) Consumers() []*ConsumerDeclaration
Consumers returns all registered consumers (for documentation/monitoring)
func (*Registry) DeclareInfrastructure ¶
func (r *Registry) DeclareInfrastructure(ctx context.Context) error
DeclareInfrastructure declares all registered messaging infrastructure.
func (*Registry) Exchanges ¶ added in v0.19.0
func (r *Registry) Exchanges() map[string]*ExchangeDeclaration
Exchanges returns all registered exchanges (for testing/monitoring)
func (*Registry) Publishers ¶ added in v0.19.0
func (r *Registry) Publishers() []*PublisherDeclaration
Publishers returns all registered publishers (for documentation/monitoring)
func (*Registry) Queues ¶ added in v0.19.0
func (r *Registry) Queues() map[string]*QueueDeclaration
Queues returns all registered queues (for testing/monitoring)
func (*Registry) RegisterBinding ¶
func (r *Registry) RegisterBinding(declaration *BindingDeclaration)
RegisterBinding registers a binding for declaration
func (*Registry) RegisterConsumer ¶
func (r *Registry) RegisterConsumer(declaration *ConsumerDeclaration)
RegisterConsumer registers a consumer declaration
func (*Registry) RegisterExchange ¶
func (r *Registry) RegisterExchange(declaration *ExchangeDeclaration)
RegisterExchange registers an exchange for declaration
func (*Registry) RegisterPublisher ¶
func (r *Registry) RegisterPublisher(declaration *PublisherDeclaration)
RegisterPublisher registers a publisher declaration
func (*Registry) RegisterQueue ¶
func (r *Registry) RegisterQueue(declaration *QueueDeclaration)
RegisterQueue registers a queue for declaration
func (*Registry) StartConsumers ¶
func (r *Registry) StartConsumers(ctx context.Context) error
StartConsumers starts all registered consumers with handlers. This should be called after DeclareInfrastructure and before starting the main application.
func (*Registry) StopConsumers ¶
func (r *Registry) StopConsumers()
StopConsumers gracefully stops all running consumers.
func (*Registry) ValidateConsumer ¶
func (r *Registry) ValidateConsumer(queue string) bool
ValidateConsumer checks if a consumer is registered for the given queue.
func (*Registry) ValidatePublisher ¶
func (r *Registry) ValidatePublisher(exchange, routingKey string) bool
ValidatePublisher checks if a publisher is registered for the given exchange/routing key.
type RegistryInterface ¶ added in v0.8.0
type RegistryInterface interface {
// Registration methods
RegisterExchange(declaration *ExchangeDeclaration)
RegisterQueue(declaration *QueueDeclaration)
RegisterBinding(declaration *BindingDeclaration)
RegisterPublisher(declaration *PublisherDeclaration)
RegisterConsumer(declaration *ConsumerDeclaration)
// Infrastructure lifecycle
DeclareInfrastructure(ctx context.Context) error
StartConsumers(ctx context.Context) error
StopConsumers()
// Accessor methods for testing/monitoring
Exchanges() map[string]*ExchangeDeclaration
Queues() map[string]*QueueDeclaration
Bindings() []*BindingDeclaration
Publishers() []*PublisherDeclaration
Consumers() []*ConsumerDeclaration
// Validation methods
ValidatePublisher(exchange, routingKey string) bool
ValidateConsumer(queue string) bool
}
RegistryInterface defines the contract for messaging infrastructure management. This interface allows for easy mocking and testing of messaging infrastructure.
type TenantMessagingResourceSource ¶ added in v0.9.0
type TenantMessagingResourceSource interface {
// BrokerURL returns the AMQP broker URL for the given key.
// For single-tenant apps, key will be "". For multi-tenant, key will be the tenant ID.
BrokerURL(ctx context.Context, key string) (string, error)
}
TenantMessagingResourceSource provides per-key AMQP configurations. This interface abstracts where tenant-specific messaging configs come from.