Documentation
¶
Index ¶
- Constants
- Variables
- func IsConnectionError(err error) bool
- func IsRetryableError(err error) bool
- type AccessPolicy
- type AdminService
- func (a *AdminService) BindExchange(ctx context.Context, destination, source, routingKey string, args Table) error
- func (a *AdminService) BindQueue(ctx context.Context, queue, exchange, routingKey string, opts ...BindingOption) error
- func (a *AdminService) DeclareClassicQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)
- func (a *AdminService) DeclareExchange(ctx context.Context, name string, kind ExchangeType, opts ...ExchangeOption) error
- func (a *AdminService) DeclareHAQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)
- func (a *AdminService) DeclareQueue(ctx context.Context, name string, opts ...QueueOption) (*Queue, error)
- func (a *AdminService) DeclareQueueWithDLQ(ctx context.Context, config QueueConfig) (*Queue, error)
- func (a *AdminService) DeclareQuorumQueue(ctx context.Context, name string, opts ...QuorumQueueOption) (*Queue, error)
- func (a *AdminService) DeclareQuorumQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)
- func (a *AdminService) DeclareTopology(ctx context.Context, topology *Topology) error
- func (a *AdminService) DeleteExchange(ctx context.Context, name string, opts ...DeleteExchangeOption) error
- func (a *AdminService) DeleteQueue(ctx context.Context, name string, opts ...DeleteQueueOption) error
- func (a *AdminService) ExchangeExists(ctx context.Context, name string) (bool, error)
- func (a *AdminService) ExchangeInfo(ctx context.Context, name string) (*ExchangeInfo, error)
- func (a *AdminService) InspectQueue(ctx context.Context, name string) (*QueueInfo, error)
- func (a *AdminService) PurgeQueue(ctx context.Context, name string) (int, error)
- func (a *AdminService) QueueExists(ctx context.Context, name string) (bool, error)
- func (a *AdminService) QueueInfo(ctx context.Context, name string) (*QueueInfo, error)
- func (a *AdminService) SetupTopology(ctx context.Context, exchanges []ExchangeConfig, queues []QueueConfig, ...) error
- func (a *AdminService) UnbindQueue(ctx context.Context, queue, exchange, routingKey string, opts ...BindingOption) error
- type AuditLogger
- type BindingConfig
- type BindingDeclaration
- type BindingOption
- type Client
- func (c *Client) Admin() *AdminService
- func (c *Client) Close() error
- func (c *Client) ConnectionName() string
- func (c *Client) CreateChannel() (*amqp.Channel, error)
- func (c *Client) NewConsumer(opts ...ConsumerOption) (*Consumer, error)
- func (c *Client) NewPublisher(opts ...PublisherOption) (*Publisher, error)
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) TopologyRegistry() *TopologyRegistry
- func (c *Client) TopologyValidator() *TopologyValidator
- func (c *Client) URL() string
- type Closable
- type ConnectionPooler
- type ConsumeOption
- type Consumer
- type ConsumerOption
- func WithAutoAck() ConsumerOption
- func WithConcurrency(workers int) ConsumerOption
- func WithConsumerCompression(compressor MessageCompressor) ConsumerOption
- func WithConsumerEncryption(encryptor MessageEncryptor) ConsumerOption
- func WithConsumerRetryPolicy(policy RetryPolicy) ConsumerOption
- func WithConsumerSerialization(serializer MessageSerializer) ConsumerOption
- func WithConsumerTag(tag string) ConsumerOption
- func WithExclusiveConsumer() ConsumerOption
- func WithMessageTimeout(timeout time.Duration) ConsumerOption
- func WithNoLocal() ConsumerOption
- func WithNoWait() ConsumerOption
- func WithPrefetchCount(count int) ConsumerOption
- func WithPrefetchSize(size int) ConsumerOption
- type DeadLetterAfterRetries
- type DeadLetterPolicy
- type DeleteExchangeOption
- type DeleteQueueOption
- type Delivery
- type DeliveryInfo
- type EnvConfig
- type Error
- type ExchangeConfig
- type ExchangeDeclaration
- type ExchangeInfo
- type ExchangeOption
- type ExchangeType
- type ExponentialBackoff
- type ExponentialBackoffPolicy
- type FixedDelay
- type GracefulShutdown
- type LinearBackoff
- type Logger
- type Message
- func (m *Message) Clone() *Message
- func (m *Message) ToAMQPPublishing() amqp.Publishing
- func (m *Message) ToPublishing() amqp.Publishing
- func (m *Message) Validate() error
- func (m *Message) WithAppID(appID string) *Message
- func (m *Message) WithContentType(contentType string) *Message
- func (m *Message) WithCorrelationID(correlationID string) *Message
- func (m *Message) WithExpiration(expiration time.Duration) *Message
- func (m *Message) WithHeader(key string, value any) *Message
- func (m *Message) WithHeaders(headers map[string]any) *Message
- func (m *Message) WithMessageID(id string) *Message
- func (m *Message) WithPersistent() *Message
- func (m *Message) WithPriority(priority uint8) *Message
- func (m *Message) WithReplyTo(replyTo string) *Message
- func (m *Message) WithTimestamp(t time.Time) *Message
- func (m *Message) WithTransient() *Message
- func (m *Message) WithType(messageType string) *Message
- func (m *Message) WithUserID(userID string) *Message
- type MessageCompressor
- type MessageEncryptor
- type MessageHandler
- type MessageSerializer
- type MetricsCollector
- type NoRetryPolicy
- type NopAuditLogger
- func (n *NopAuditLogger) LogConnection(clientID, username, host string)
- func (n *NopAuditLogger) LogConsume(clientID, queue, messageID string)
- func (n *NopAuditLogger) LogPublish(clientID, exchange, routingKey, messageID string)
- func (n *NopAuditLogger) LogTopologyChange(clientID, operation, resource string)
- type NopLogger
- type NopMetrics
- func (n *NopMetrics) RecordConnection(connectionName string)
- func (n *NopMetrics) RecordConnectionAttempt(success bool, duration time.Duration)
- func (n *NopMetrics) RecordConsume(queue string, messageSize int, duration time.Duration)
- func (n *NopMetrics) RecordError(operation string, err error)
- func (n *NopMetrics) RecordHealthCheck(success bool, duration time.Duration)
- func (n *NopMetrics) RecordMessageProcessed(queue string, success bool, duration time.Duration)
- func (n *NopMetrics) RecordMessageReceived(queue string)
- func (n *NopMetrics) RecordMessageRequeued(queue string)
- func (n *NopMetrics) RecordPublish(exchange, routingKey string, messageSize int, duration time.Duration)
- func (n *NopMetrics) RecordPublishConfirmation(success bool, duration time.Duration)
- func (n *NopMetrics) RecordReconnection(attempt int)
- type NopSpan
- type NopTracer
- type Option
- func FromEnv() Option
- func FromEnvWithPrefix(prefix string) Option
- func WithAccessPolicy(policy *AccessPolicy) Option
- func WithAuditLogging(logger AuditLogger) Option
- func WithAutoReconnect(enabled bool) Option
- func WithChannelTimeout(timeout time.Duration) Option
- func WithConnectionName(name string) Option
- func WithConnectionPooler(pooler ConnectionPooler) Option
- func WithCredentials(username, password string) Option
- func WithDialTimeout(timeout time.Duration) Option
- func WithGracefulShutdown(shutdown GracefulShutdown) Option
- func WithHeartbeat(duration time.Duration) Option
- func WithHosts(hosts ...string) Option
- func WithInsecureTLS() Option
- func WithLogger(logger Logger) Option
- func WithMaxReconnectAttempts(attempts int) Option
- func WithMetrics(metrics MetricsCollector) Option
- func WithPerformanceMonitoring(monitor PerformanceMonitor) Option
- func WithReconnectDelay(delay time.Duration) Option
- func WithReconnectPolicy(policy ReconnectPolicy) Option
- func WithSagaOrchestrator(orchestrator SagaOrchestrator) Option
- func WithStreamHandler(handler StreamHandler) Option
- func WithTLS(tlsConfig *tls.Config) Option
- func WithTopologyAutoRecreation() Option
- func WithTopologyBackgroundValidation(interval time.Duration) Option
- func WithTopologyValidation() Option
- func WithTopologyValidationInterval(interval time.Duration) Option
- func WithTracing(tracer Tracer) Option
- func WithVHost(vhost string) Option
- func WithoutTopologyAutoRecreation() Option
- func WithoutTopologyBackgroundValidation() Option
- func WithoutTopologyValidation() Option
- type PerformanceMonitor
- type PoolStats
- type PublishRequest
- type Publisher
- type PublisherOption
- func WithCompression(compressor MessageCompressor) PublisherOption
- func WithCompressionThreshold(threshold int) PublisherOption
- func WithConfirmation(timeout time.Duration) PublisherOption
- func WithDefaultExchange(exchange string) PublisherOption
- func WithEncryption(encryptor MessageEncryptor) PublisherOption
- func WithImmediate() PublisherOption
- func WithMandatory() PublisherOption
- func WithPersistent() PublisherOption
- func WithRetryPolicy(policy RetryPolicy) PublisherOption
- func WithSerializer(serializer MessageSerializer) PublisherOption
- type Queue
- type QueueConfig
- type QueueDeclaration
- type QueueInfo
- type QueueOption
- func WithArguments(args Table) QueueOption
- func WithAutoDelete() QueueOption
- func WithClassicQueue() QueueOption
- func WithDeadLetter(exchange, routingKey string) QueueOption
- func WithDeadLetterTTL(ttl time.Duration) QueueOption
- func WithDeliveryLimit(limit int) QueueOption
- func WithDurable() QueueOption
- func WithExclusiveQueue() QueueOption
- func WithMaxLength(length int64) QueueOption
- func WithMaxLengthBytes(bytes int64) QueueOption
- func WithQuorumGroupSize(size int) QueueOption
- func WithTTL(ttl time.Duration) QueueOption
- func WithoutDLQ() QueueOption
- type QueueType
- type QuorumQueueOption
- type ReconnectPolicy
- type RejectError
- type RetryPolicy
- type SagaOrchestrator
- type SagaStatus
- type SagaStep
- type SagaStepStatus
- type Span
- type SpanStatusCode
- type StreamConfig
- type StreamHandler
- type StreamMessageHandler
- type Table
- type Topology
- type TopologyRegistry
- func (r *TopologyRegistry) Clear()
- func (r *TopologyRegistry) GetBinding(queue, exchange, routingKey string) (BindingConfig, bool)
- func (r *TopologyRegistry) GetExchange(name string) (ExchangeConfig, bool)
- func (r *TopologyRegistry) GetQueue(name string) (QueueConfig, bool)
- func (r *TopologyRegistry) ListBindings() []BindingConfig
- func (r *TopologyRegistry) ListExchanges() []ExchangeConfig
- func (r *TopologyRegistry) ListQueues() []QueueConfig
- func (r *TopologyRegistry) RegisterBinding(config BindingConfig)
- func (r *TopologyRegistry) RegisterExchange(config ExchangeConfig)
- func (r *TopologyRegistry) RegisterQueue(config QueueConfig)
- type TopologyValidator
- func (v *TopologyValidator) Disable()
- func (v *TopologyValidator) Enable()
- func (v *TopologyValidator) EnableAutoRecreate()
- func (v *TopologyValidator) EnableBackgroundValidation(interval time.Duration)
- func (v *TopologyValidator) IsAutoRecreateEnabled() bool
- func (v *TopologyValidator) IsEnabled() bool
- func (v *TopologyValidator) ValidateCompleteTopology() error
- func (v *TopologyValidator) ValidateExchange(name string) error
- func (v *TopologyValidator) ValidateQueue(name string) error
- type Tracer
Constants ¶
const ( ContentTypeJSON = "application/json" ContentTypeText = "text/plain" )
ContentType constants
Variables ¶
var NoRetry = NoRetryPolicy{}
NoRetry is a global instance of NoRetryPolicy
Functions ¶
func IsConnectionError ¶ added in v1.1.2
IsConnectionError checks if an error is connection-related
func IsRetryableError ¶ added in v1.1.2
IsRetryableError checks if an error is retryable
Types ¶
type AccessPolicy ¶ added in v1.1.2
type AccessPolicy struct {
AllowedExchanges []string
AllowedQueues []string
AllowedRoutes []string
ReadOnly bool
}
AccessPolicy defines role-based access control
type AdminService ¶ added in v1.1.2
type AdminService struct {
// contains filtered or unexported fields
}
AdminService handles topology management operations
func (*AdminService) BindExchange ¶ added in v1.1.2
func (a *AdminService) BindExchange(ctx context.Context, destination, source, routingKey string, args Table) error
BindExchange binds an exchange to another exchange
func (*AdminService) BindQueue ¶ added in v1.1.2
func (a *AdminService) BindQueue(ctx context.Context, queue, exchange, routingKey string, opts ...BindingOption) error
BindQueue binds a queue to an exchange with optional binding options
func (*AdminService) DeclareClassicQueueWithDLQ ¶ added in v1.1.3
func (a *AdminService) DeclareClassicQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)
DeclareClassicQueueWithDLQ declares a classic queue with automatic DLX/DLQ creation
func (*AdminService) DeclareExchange ¶ added in v1.1.2
func (a *AdminService) DeclareExchange(ctx context.Context, name string, kind ExchangeType, opts ...ExchangeOption) error
DeclareExchange declares an exchange with the given options
func (*AdminService) DeclareHAQueueWithDLQ ¶ added in v1.1.3
func (a *AdminService) DeclareHAQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)
DeclareHAQueueWithDLQ declares an HA classic queue with automatic DLX/DLQ creation
func (*AdminService) DeclareQueue ¶ added in v1.1.2
func (a *AdminService) DeclareQueue(ctx context.Context, name string, opts ...QueueOption) (*Queue, error)
DeclareQueue declares a queue with the given options (Quorum queue by default)
func (*AdminService) DeclareQueueWithDLQ ¶ added in v1.1.3
func (a *AdminService) DeclareQueueWithDLQ(ctx context.Context, config QueueConfig) (*Queue, error)
DeclareQueueWithDLQ declares a queue using QueueConfig (simplified - no auto-creation)
func (*AdminService) DeclareQuorumQueue ¶ added in v1.1.2
func (a *AdminService) DeclareQuorumQueue(ctx context.Context, name string, opts ...QuorumQueueOption) (*Queue, error)
DeclareQuorumQueue declares a quorum queue with the given options
func (*AdminService) DeclareQuorumQueueWithDLQ ¶ added in v1.1.3
func (a *AdminService) DeclareQuorumQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)
DeclareQuorumQueueWithDLQ declares a quorum queue with automatic DLX/DLQ creation
func (*AdminService) DeclareTopology ¶ added in v1.1.2
func (a *AdminService) DeclareTopology(ctx context.Context, topology *Topology) error
DeclareTopology declares a complete topology from a definition
func (*AdminService) DeleteExchange ¶ added in v1.1.2
func (a *AdminService) DeleteExchange(ctx context.Context, name string, opts ...DeleteExchangeOption) error
DeleteExchange deletes an exchange
func (*AdminService) DeleteQueue ¶ added in v1.1.2
func (a *AdminService) DeleteQueue(ctx context.Context, name string, opts ...DeleteQueueOption) error
DeleteQueue deletes a queue
func (*AdminService) ExchangeExists ¶ added in v1.2.0
ExchangeExists checks if an exchange exists using passive declaration
func (*AdminService) ExchangeInfo ¶ added in v1.1.2
func (a *AdminService) ExchangeInfo(ctx context.Context, name string) (*ExchangeInfo, error)
ExchangeInfo returns detailed information about an exchange
func (*AdminService) InspectQueue ¶ added in v1.1.2
InspectQueue returns detailed information about a queue
func (*AdminService) PurgeQueue ¶ added in v1.1.2
PurgeQueue purges all messages from a queue
func (*AdminService) QueueExists ¶ added in v1.2.0
QueueExists checks if a queue exists using passive declaration
func (*AdminService) QueueInfo ¶ added in v1.1.2
QueueInfo is an alias for InspectQueue to match the API documentation
func (*AdminService) SetupTopology ¶ added in v1.1.2
func (a *AdminService) SetupTopology(ctx context.Context, exchanges []ExchangeConfig, queues []QueueConfig, bindings []BindingConfig) error
SetupTopology sets up complete topology configuration with exchanges, queues, and bindings
func (*AdminService) UnbindQueue ¶ added in v1.1.2
func (a *AdminService) UnbindQueue(ctx context.Context, queue, exchange, routingKey string, opts ...BindingOption) error
UnbindQueue unbinds a queue from an exchange with optional binding options
type AuditLogger ¶ added in v1.1.2
type AuditLogger interface {
LogConnection(clientID, username, host string)
LogPublish(clientID, exchange, routingKey, messageID string)
LogConsume(clientID, queue, messageID string)
LogTopologyChange(clientID, operation, resource string)
}
AuditLogger interface for audit logging
func NewNopAuditLogger ¶ added in v1.1.2
func NewNopAuditLogger() AuditLogger
type BindingConfig ¶
type BindingConfig struct {
QueueName string
ExchangeName string
RoutingKey string
NoWait bool
Arguments map[string]any
}
BindingConfig holds configuration for binding a queue to an exchange
type BindingDeclaration ¶ added in v1.1.2
type BindingDeclaration struct {
Queue string `json:"queue" yaml:"queue"`
Exchange string `json:"exchange" yaml:"exchange"`
RoutingKey string `json:"routing_key" yaml:"routing_key"`
Arguments map[string]any `json:"arguments,omitempty" yaml:"arguments,omitempty"`
}
BindingDeclaration represents a binding declaration
type BindingOption ¶ added in v1.1.2
type BindingOption func(*bindingConfig)
BindingOption represents a functional option for binding configuration
func WithBindingArguments ¶ added in v1.1.2
func WithBindingArguments(args Table) BindingOption
WithBindingArguments sets arguments for the binding
func WithBindingHeaders ¶ added in v1.1.2
func WithBindingHeaders(headers map[string]any) BindingOption
WithBindingHeaders sets headers for the binding
func WithBindingNoWait ¶ added in v1.1.2
func WithBindingNoWait() BindingOption
WithBindingNoWait makes the binding operation not wait for server response
type Client ¶ added in v1.1.2
type Client struct {
// contains filtered or unexported fields
}
Client represents the main RabbitMQ client with unified architecture
func (*Client) ConnectionName ¶ added in v1.1.2
func (*Client) CreateChannel ¶ added in v1.1.2
CreateChannel creates a new AMQP channel for advanced use cases This method is provided for implementing custom messaging patterns such as streams, sagas, or other experimental features
func (*Client) NewConsumer ¶ added in v1.1.2
func (c *Client) NewConsumer(opts ...ConsumerOption) (*Consumer, error)
NewConsumer creates a new consumer from the client
func (*Client) NewPublisher ¶ added in v1.1.2
func (c *Client) NewPublisher(opts ...PublisherOption) (*Publisher, error)
NewPublisher creates a new publisher from the client
func (*Client) TopologyRegistry ¶ added in v1.2.0
func (c *Client) TopologyRegistry() *TopologyRegistry
func (*Client) TopologyValidator ¶ added in v1.2.0
func (c *Client) TopologyValidator() *TopologyValidator
TopologyValidator returns the topology validator for manual validation operations
type Closable ¶
type Closable interface {
Close() error
}
Closable interface for components that can be gracefully closed.
type ConnectionPooler ¶ added in v1.1.2
type ConnectionPooler interface {
Get() (*Client, error)
Close() error
Stats() PoolStats
Size() int
HealthyCount() int
}
ConnectionPooler defines the interface for connection pool management. This is the core contract that all connection pool implementations must satisfy. For concrete implementations, use the pool sub-package.
func NewNopConnectionPooler ¶ added in v1.1.2
func NewNopConnectionPooler() ConnectionPooler
NewNopConnectionPooler creates a new no-op connection pooler. For advanced connection pooling capabilities, use the pool sub-package.
type ConsumeOption ¶ added in v1.1.2
type ConsumeOption func(*consumeConfig)
ConsumeOption represents a functional option for consumption behavior
func WithConsumeRetryPolicy ¶ added in v1.1.2
func WithConsumeRetryPolicy(policy RetryPolicy) ConsumeOption
WithConsumeRetryPolicy sets the retry policy for consumption
func WithDeadLetterPolicy ¶ added in v1.1.2
func WithDeadLetterPolicy(policy DeadLetterPolicy) ConsumeOption
WithDeadLetterPolicy sets the dead letter policy
func WithRejectRequeue ¶ added in v1.1.2
func WithRejectRequeue() ConsumeOption
WithRejectRequeue configures message rejection behavior
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer handles message consumption operations
func (*Consumer) Consume ¶
func (c *Consumer) Consume(ctx context.Context, queue string, handler MessageHandler, opts ...ConsumeOption) error
Consume starts consuming messages from the specified queue with optional settings
type ConsumerOption ¶ added in v1.1.2
type ConsumerOption func(*consumerConfig)
ConsumerOption represents a functional option for consumer configuration
func WithAutoAck ¶ added in v1.1.2
func WithAutoAck() ConsumerOption
WithAutoAck enables automatic acknowledgment
func WithConcurrency ¶ added in v1.1.2
func WithConcurrency(workers int) ConsumerOption
WithConcurrency sets the number of concurrent message processors
func WithConsumerCompression ¶ added in v1.1.2
func WithConsumerCompression(compressor MessageCompressor) ConsumerOption
WithConsumerCompression sets the compression for the consumer (for decompression)
func WithConsumerEncryption ¶ added in v1.1.2
func WithConsumerEncryption(encryptor MessageEncryptor) ConsumerOption
WithConsumerEncryption sets the encryption for the consumer (for decryption)
func WithConsumerRetryPolicy ¶ added in v1.1.2
func WithConsumerRetryPolicy(policy RetryPolicy) ConsumerOption
WithConsumerRetryPolicy sets the retry policy for message processing failures
func WithConsumerSerialization ¶ added in v1.1.2
func WithConsumerSerialization(serializer MessageSerializer) ConsumerOption
WithConsumerSerialization sets the serializer for the consumer (for deserialization)
func WithConsumerTag ¶ added in v1.1.2
func WithConsumerTag(tag string) ConsumerOption
WithConsumerTag sets the consumer tag
func WithExclusiveConsumer ¶ added in v1.1.2
func WithExclusiveConsumer() ConsumerOption
WithExclusiveConsumer makes the consumer exclusive
func WithMessageTimeout ¶ added in v1.1.2
func WithMessageTimeout(timeout time.Duration) ConsumerOption
WithMessageTimeout sets the timeout for processing each message
func WithNoLocal ¶ added in v1.1.2
func WithNoLocal() ConsumerOption
WithNoLocal prevents delivery of messages published on this connection
func WithNoWait ¶ added in v1.1.2
func WithNoWait() ConsumerOption
WithNoWait makes the consume operation not wait for server response
func WithPrefetchCount ¶ added in v1.1.2
func WithPrefetchCount(count int) ConsumerOption
WithPrefetchCount sets the prefetch count
func WithPrefetchSize ¶ added in v1.1.2
func WithPrefetchSize(size int) ConsumerOption
WithPrefetchSize sets the prefetch size in bytes
type DeadLetterAfterRetries ¶ added in v1.1.2
type DeadLetterAfterRetries struct{}
DeadLetterAfterRetries sends messages to DLQ after max retry attempts
func (*DeadLetterAfterRetries) ShouldDeadLetter ¶ added in v1.1.2
func (d *DeadLetterAfterRetries) ShouldDeadLetter(delivery *Delivery, attempts int) bool
type DeadLetterPolicy ¶ added in v1.1.2
DeadLetterPolicy defines what to do with messages after retries are exhausted
type DeleteExchangeOption ¶ added in v1.1.2
type DeleteExchangeOption func(*deleteExchangeConfig)
DeleteExchangeOption represents a functional option for exchange deletion
type DeleteQueueOption ¶ added in v1.1.2
type DeleteQueueOption func(*deleteQueueConfig)
DeleteQueueOption represents a functional option for queue deletion
type Delivery ¶ added in v1.1.2
Delivery wraps amqp.Delivery with additional helper methods
func (*Delivery) GetMessage ¶ added in v1.1.2
GetMessage converts the delivery to a Message struct
func (*Delivery) IsRedelivered ¶ added in v1.1.2
IsRedelivered returns true if the message was redelivered
func (*Delivery) Nack ¶ added in v1.1.2
Nack negatively acknowledges the message with requeue option
type DeliveryInfo ¶
type DeliveryInfo struct {
MessageCount uint32
Exchange string
RoutingKey string
Redelivered bool
DeliveryTag uint64
// Message metadata from AMQP properties
MessageID string
CorrelationID string
ReplyTo string
Type string
AppID string
UserID string
Timestamp time.Time
ContentType string
Priority uint8
Headers map[string]any
}
DeliveryInfo contains information about a delivered message
func ExtractDeliveryInfo ¶
func ExtractDeliveryInfo(delivery *amqp.Delivery) DeliveryInfo
ExtractDeliveryInfo extracts delivery information from an AMQP delivery
type EnvConfig ¶ added in v1.1.0
type EnvConfig struct {
// Connection basics
Username string `env:"RABBITMQ_USERNAME,default=guest"`
Password string `env:"RABBITMQ_PASSWORD,default=guest"`
Hosts []string `env:"RABBITMQ_HOSTS,default=localhost:5672"`
VHost string `env:"RABBITMQ_VHOST,default=/"`
// Protocol and security
Protocol string `env:"RABBITMQ_PROTOCOL,default=amqp"` // amqp or amqps
TLSEnabled bool `env:"RABBITMQ_TLS_ENABLED,default=false"`
TLSInsecure bool `env:"RABBITMQ_TLS_INSECURE,default=false"` // Skip cert verification
// HTTP Management API
HTTPProtocol string `env:"RABBITMQ_HTTP_PROTOCOL,default=http"` // http or https
HTTPPort int `env:"RABBITMQ_HTTP_PORT,default=15672"`
HTTPHost string `env:"RABBITMQ_HTTP_HOST,default=localhost"` // Separate host for HTTP API
// Connection behavior
ConnectionName string `env:"RABBITMQ_CONNECTION_NAME,default=go-rabbitmq"`
Heartbeat time.Duration `env:"RABBITMQ_HEARTBEAT,default=10s"`
RetryAttempts int `env:"RABBITMQ_RETRY_ATTEMPTS,default=5"`
RetryDelay time.Duration `env:"RABBITMQ_RETRY_DELAY,default=2s"`
// Timeouts
DialTimeout time.Duration `env:"RABBITMQ_DIAL_TIMEOUT,default=30s"`
ChannelTimeout time.Duration `env:"RABBITMQ_CHANNEL_TIMEOUT,default=10s"`
// Auto-reconnection
AutoReconnect bool `env:"RABBITMQ_AUTO_RECONNECT,default=true"`
ReconnectDelay time.Duration `env:"RABBITMQ_RECONNECT_DELAY,default=5s"`
MaxReconnectAttempts int `env:"RABBITMQ_MAX_RECONNECT_ATTEMPTS,default=0"` // 0 = unlimited
// Publisher settings
PublisherConfirmationTimeout time.Duration `env:"RABBITMQ_PUBLISHER_CONFIRMATION_TIMEOUT,default=5s"`
PublisherShutdownTimeout time.Duration `env:"RABBITMQ_PUBLISHER_SHUTDOWN_TIMEOUT,default=15s"`
PublisherPersistent bool `env:"RABBITMQ_PUBLISHER_PERSISTENT,default=true"`
// Consumer settings
ConsumerPrefetchCount int `env:"RABBITMQ_CONSUMER_PREFETCH_COUNT,default=1"`
ConsumerAutoAck bool `env:"RABBITMQ_CONSUMER_AUTO_ACK,default=false"`
ConsumerMessageTimeout time.Duration `env:"RABBITMQ_CONSUMER_MESSAGE_TIMEOUT,default=5m"`
ConsumerShutdownTimeout time.Duration `env:"RABBITMQ_CONSUMER_SHUTDOWN_TIMEOUT,default=30s"`
// Topology validation and auto-healing (enabled by default for production reliability)
TopologyValidation bool `env:"RABBITMQ_TOPOLOGY_VALIDATION,default=true"`
TopologyAutoRecreation bool `env:"RABBITMQ_TOPOLOGY_AUTO_RECREATION,default=true"`
TopologyBackgroundValidation bool `env:"RABBITMQ_TOPOLOGY_BACKGROUND_VALIDATION,default=true"`
TopologyValidationInterval time.Duration `env:"RABBITMQ_TOPOLOGY_VALIDATION_INTERVAL,default=30s"`
}
EnvConfig holds all RabbitMQ configuration that can be loaded from environment variables
func (*EnvConfig) BuildAMQPURL ¶ added in v1.1.0
BuildAMQPURL constructs the AMQP connection URL from environment configuration For multiple hosts, returns the first host URL (AMQP library handles failover)
func (*EnvConfig) BuildAMQPURLs ¶ added in v1.1.2
BuildAMQPURLs constructs all AMQP connection URLs for failover support
func (*EnvConfig) BuildHTTPURL ¶ added in v1.1.0
BuildHTTPURL constructs the HTTP management API URL from environment configuration
type Error ¶
Error types for better error handling
func NewConnectionError ¶
NewConnectionError creates a new connection error
func NewConsumeError ¶
NewConsumeError creates a new consume error
func NewPublishError ¶
NewPublishError creates a new publish error
type ExchangeConfig ¶
type ExchangeConfig struct {
Name string
Type ExchangeType
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Arguments map[string]any
}
ExchangeConfig holds configuration for declaring an exchange
type ExchangeDeclaration ¶ added in v1.1.2
type ExchangeDeclaration struct {
Name string `json:"name" yaml:"name"`
Type ExchangeType `json:"type" yaml:"type"`
Durable bool `json:"durable" yaml:"durable"`
AutoDelete bool `json:"auto_delete" yaml:"auto_delete"`
Internal bool `json:"internal" yaml:"internal"`
Arguments map[string]any `json:"arguments,omitempty" yaml:"arguments,omitempty"`
}
ExchangeDeclaration represents an exchange declaration
type ExchangeInfo ¶ added in v1.1.2
type ExchangeInfo struct {
Name string
Type string
Durable bool
AutoDelete bool
Internal bool
Arguments Table
VHost string
}
ExchangeInfo represents detailed exchange information
type ExchangeOption ¶ added in v1.1.2
type ExchangeOption func(*exchangeConfig)
ExchangeOption represents a functional option for exchange configuration
func WithExchangeArguments ¶ added in v1.1.2
func WithExchangeArguments(args Table) ExchangeOption
WithExchangeArguments sets exchange arguments
func WithExchangeAutoDelete ¶ added in v1.1.2
func WithExchangeAutoDelete() ExchangeOption
WithExchangeAutoDelete makes the exchange auto-delete
func WithExchangeDurable ¶ added in v1.1.2
func WithExchangeDurable() ExchangeOption
WithExchangeDurable makes the exchange durable
func WithExchangeInternal ¶ added in v1.1.2
func WithExchangeInternal() ExchangeOption
WithExchangeInternal makes the exchange internal
type ExchangeType ¶
type ExchangeType string
ExchangeType represents different types of exchanges
const ( ExchangeTypeDirect ExchangeType = "direct" ExchangeTypeFanout ExchangeType = "fanout" ExchangeTypeTopic ExchangeType = "topic" ExchangeTypeHeaders ExchangeType = "headers" )
type ExponentialBackoff ¶ added in v1.1.2
type ExponentialBackoff struct {
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
MaxAttempts int
}
ExponentialBackoff implements exponential backoff reconnection policy
func (*ExponentialBackoff) NextDelay ¶ added in v1.1.2
func (e *ExponentialBackoff) NextDelay(attempt int) time.Duration
func (*ExponentialBackoff) ShouldRetry ¶ added in v1.1.2
func (e *ExponentialBackoff) ShouldRetry(attempt int, err error) bool
type ExponentialBackoffPolicy ¶ added in v1.1.2
type ExponentialBackoffPolicy struct {
InitialDelay time.Duration
MaxDelay time.Duration
MaxAttempts int
Multiplier float64
}
ExponentialBackoffPolicy implements exponential backoff retry
func (ExponentialBackoffPolicy) NextDelay ¶ added in v1.1.2
func (e ExponentialBackoffPolicy) NextDelay(attempt int) time.Duration
func (ExponentialBackoffPolicy) ShouldRetry ¶ added in v1.1.2
func (e ExponentialBackoffPolicy) ShouldRetry(attempt int, err error) bool
type FixedDelay ¶ added in v1.1.2
FixedDelay implements fixed delay reconnection policy
func (*FixedDelay) NextDelay ¶ added in v1.1.2
func (f *FixedDelay) NextDelay(attempt int) time.Duration
func (*FixedDelay) ShouldRetry ¶ added in v1.1.2
func (f *FixedDelay) ShouldRetry(attempt int, err error) bool
type GracefulShutdown ¶ added in v1.1.2
type GracefulShutdown interface {
RegisterComponent(component Closable) error
ShutdownGracefully(ctx context.Context) error
SetupSignalHandling() <-chan struct{}
IsShutdownComplete() bool
}
GracefulShutdown defines the interface for graceful shutdown coordination. This is the core contract that all shutdown implementations must satisfy. For concrete implementations, use the shutdown sub-package.
func NewNopGracefulShutdown ¶ added in v1.1.2
func NewNopGracefulShutdown() GracefulShutdown
NewNopGracefulShutdown creates a new no-op graceful shutdown handler. For advanced shutdown coordination capabilities, use the shutdown sub-package.
type LinearBackoff ¶ added in v1.1.2
LinearBackoff implements linear backoff reconnection policy
func (*LinearBackoff) NextDelay ¶ added in v1.1.2
func (l *LinearBackoff) NextDelay(attempt int) time.Duration
func (*LinearBackoff) ShouldRetry ¶ added in v1.1.2
func (l *LinearBackoff) ShouldRetry(attempt int, err error) bool
type Logger ¶ added in v1.1.2
type Logger interface {
// Debug logs a debug message with optional structured fields
Debug(msg string, fields ...any)
// Info logs an informational message with optional structured fields
Info(msg string, fields ...any)
// Warn logs a warning message with optional structured fields
Warn(msg string, fields ...any)
// Error logs an error message with optional structured fields
Error(msg string, fields ...any)
}
Logger interface for structured logging Users can implement this interface to integrate their preferred logging solution.
func NewNopLogger ¶ added in v1.1.2
func NewNopLogger() Logger
NewNopLogger creates a new no-operation logger
type Message ¶
type Message struct {
Body []byte
ContentType string
Headers map[string]any
Exchange string
RoutingKey string
Persistent bool
// Message identification and tracing
MessageID string // Unique message identifier (auto-generated if empty)
CorrelationID string // Correlation ID for request-response patterns
ReplyTo string // Reply queue for RPC patterns
// Message metadata
Type string // Message type/schema identifier
AppID string // Application ID that originated the message
UserID string // User ID (if authenticated)
// Timing and expiration
Timestamp int64 // Unix timestamp when message was created
Expiration string // Message expiration (in milliseconds as string)
// Message priority (0-255, higher = more priority)
Priority uint8
}
Message represents a message with metadata
func FromAMQPDelivery ¶ added in v1.1.2
FromAMQPDelivery creates a Message from amqp.Delivery (for consumer API)
func NewJSONMessage ¶
NewJSONMessage creates a new Message for JSON content by marshaling the provided value
func NewMessage ¶
NewMessage creates a new Message with auto-generated ID and timestamp
func NewMessageWithID ¶
NewMessageWithID creates a new Message with a specific ID
func NewTextMessage ¶
NewTextMessage creates a new Message for plain text content
func (*Message) ToAMQPPublishing ¶ added in v1.1.2
func (m *Message) ToAMQPPublishing() amqp.Publishing
ToAMQPPublishing converts the message to amqp.Publishing for the new API
func (*Message) ToPublishing ¶
func (m *Message) ToPublishing() amqp.Publishing
ToPublishing converts a Message to amqp.Publishing
func (*Message) WithContentType ¶ added in v1.1.2
WithContentType sets the content type for the message
func (*Message) WithCorrelationID ¶
WithCorrelationID sets the correlation ID for request-response patterns
func (*Message) WithExpiration ¶
WithExpiration sets message expiration in duration
func (*Message) WithHeader ¶
WithHeader adds a custom header to the message
func (*Message) WithHeaders ¶
WithHeaders adds multiple custom headers to the message
func (*Message) WithMessageID ¶ added in v1.1.2
WithMessageID sets the message ID
func (*Message) WithPersistent ¶ added in v1.1.2
WithPersistent sets the message to be persistent
func (*Message) WithPriority ¶
WithPriority sets message priority (0-255, higher = more priority)
func (*Message) WithReplyTo ¶
WithReplyTo sets the reply queue for RPC patterns
func (*Message) WithTimestamp ¶ added in v1.1.2
WithTimestamp sets the message timestamp
func (*Message) WithTransient ¶ added in v1.1.2
WithTransient sets the message to be transient (non-persistent)
func (*Message) WithUserID ¶
WithUserID sets the user ID (if authenticated)
type MessageCompressor ¶ added in v1.1.2
type MessageCompressor interface {
Compress(data []byte) ([]byte, error)
Decompress(data []byte) ([]byte, error)
Algorithm() string
Threshold() int // Minimum size to trigger compression
}
MessageCompressor defines the interface for message compression/decompression. This is the core contract that all compression implementations must satisfy. For concrete implementations, use the compression sub-package.
func NewNopCompressor ¶ added in v1.1.2
func NewNopCompressor() MessageCompressor
NewNopCompressor creates a new no-op compressor. For advanced compression capabilities, use the compression sub-package.
type MessageEncryptor ¶ added in v1.1.2
type MessageEncryptor interface {
Encrypt(data []byte) ([]byte, error)
Decrypt(data []byte) ([]byte, error)
Algorithm() string
}
MessageEncryptor defines the interface for message encryption/decryption
func NewNopEncryptor ¶ added in v1.1.2
func NewNopEncryptor() MessageEncryptor
NewNopEncryptor creates a new no-op encryptor. For advanced encryption capabilities, use the encryption sub-package.
type MessageHandler ¶
MessageHandler is the function signature for handling consumed messages
type MessageSerializer ¶ added in v1.1.2
type MessageSerializer interface {
Serialize(msg any) ([]byte, error)
Deserialize(data []byte, target any) error
ContentType() string
}
MessageSerializer defines the interface for message serialization/deserialization. This is the core contract that all serialization implementations must satisfy. For concrete implementations, use the protobuf sub-package or other serialization packages.
func NewNopSerializer ¶ added in v1.1.2
func NewNopSerializer() MessageSerializer
NewNopSerializer creates a new no-op serializer. For advanced serialization capabilities, use the protobuf or other serialization sub-packages.
type MetricsCollector ¶ added in v1.1.2
type MetricsCollector interface {
// Connection metrics
RecordConnection(connectionName string)
RecordConnectionAttempt(success bool, duration time.Duration)
RecordReconnection(attempt int)
// Publishing metrics
RecordPublish(exchange, routingKey string, messageSize int, duration time.Duration)
RecordPublishConfirmation(success bool, duration time.Duration)
// Consumption metrics
RecordConsume(queue string, messageSize int, duration time.Duration)
RecordMessageReceived(queue string)
RecordMessageProcessed(queue string, success bool, duration time.Duration)
RecordMessageRequeued(queue string)
// Health and error metrics
RecordHealthCheck(success bool, duration time.Duration)
RecordError(operation string, err error)
}
MetricsCollector interface for collecting metrics
func NewNopMetrics ¶ added in v1.1.2
func NewNopMetrics() MetricsCollector
type NoRetryPolicy ¶ added in v1.1.2
type NoRetryPolicy struct{}
NoRetryPolicy never retries operations
func (NoRetryPolicy) NextDelay ¶ added in v1.1.2
func (n NoRetryPolicy) NextDelay(attempt int) time.Duration
func (NoRetryPolicy) ShouldRetry ¶ added in v1.1.2
func (n NoRetryPolicy) ShouldRetry(attempt int, err error) bool
type NopAuditLogger ¶ added in v1.1.2
type NopAuditLogger struct{}
NopAuditLogger is a no-operation audit logger
func (*NopAuditLogger) LogConnection ¶ added in v1.1.2
func (n *NopAuditLogger) LogConnection(clientID, username, host string)
func (*NopAuditLogger) LogConsume ¶ added in v1.1.2
func (n *NopAuditLogger) LogConsume(clientID, queue, messageID string)
func (*NopAuditLogger) LogPublish ¶ added in v1.1.2
func (n *NopAuditLogger) LogPublish(clientID, exchange, routingKey, messageID string)
func (*NopAuditLogger) LogTopologyChange ¶ added in v1.1.2
func (n *NopAuditLogger) LogTopologyChange(clientID, operation, resource string)
type NopLogger ¶ added in v1.1.2
type NopLogger struct{}
NopLogger is a no-operation logger that produces no output. This is used as the default logger when no logger is provided.
type NopMetrics ¶ added in v1.1.2
type NopMetrics struct{}
NopMetrics is a no-operation metrics collector
func (*NopMetrics) RecordConnection ¶ added in v1.1.2
func (n *NopMetrics) RecordConnection(connectionName string)
func (*NopMetrics) RecordConnectionAttempt ¶ added in v1.1.2
func (n *NopMetrics) RecordConnectionAttempt(success bool, duration time.Duration)
func (*NopMetrics) RecordConsume ¶ added in v1.1.2
func (n *NopMetrics) RecordConsume(queue string, messageSize int, duration time.Duration)
func (*NopMetrics) RecordError ¶ added in v1.1.2
func (n *NopMetrics) RecordError(operation string, err error)
func (*NopMetrics) RecordHealthCheck ¶ added in v1.1.2
func (n *NopMetrics) RecordHealthCheck(success bool, duration time.Duration)
func (*NopMetrics) RecordMessageProcessed ¶ added in v1.1.2
func (n *NopMetrics) RecordMessageProcessed(queue string, success bool, duration time.Duration)
func (*NopMetrics) RecordMessageReceived ¶ added in v1.1.2
func (n *NopMetrics) RecordMessageReceived(queue string)
func (*NopMetrics) RecordMessageRequeued ¶ added in v1.1.2
func (n *NopMetrics) RecordMessageRequeued(queue string)
func (*NopMetrics) RecordPublish ¶ added in v1.1.2
func (n *NopMetrics) RecordPublish(exchange, routingKey string, messageSize int, duration time.Duration)
func (*NopMetrics) RecordPublishConfirmation ¶ added in v1.1.2
func (n *NopMetrics) RecordPublishConfirmation(success bool, duration time.Duration)
func (*NopMetrics) RecordReconnection ¶ added in v1.1.2
func (n *NopMetrics) RecordReconnection(attempt int)
type NopSpan ¶ added in v1.1.2
type NopSpan struct{}
NopSpan is a no-operation span
func (*NopSpan) SetAttribute ¶ added in v1.1.2
func (*NopSpan) SetStatus ¶ added in v1.1.2
func (n *NopSpan) SetStatus(code SpanStatusCode, description string)
type Option ¶ added in v1.1.2
type Option func(*clientConfig) error
Option represents a functional option for configuring the Client
func FromEnv ¶ added in v1.1.2
func FromEnv() Option
FromEnv creates a client option that loads configuration from environment variables
func FromEnvWithPrefix ¶ added in v1.1.2
FromEnvWithPrefix creates a client option that loads configuration from environment variables with a custom prefix
func WithAccessPolicy ¶ added in v1.1.2
func WithAccessPolicy(policy *AccessPolicy) Option
WithAccessPolicy sets the access control policy
func WithAuditLogging ¶ added in v1.1.2
func WithAuditLogging(logger AuditLogger) Option
WithAuditLogging sets the audit logger
func WithAutoReconnect ¶ added in v1.1.2
WithAutoReconnect enables or disables automatic reconnection
func WithChannelTimeout ¶ added in v1.1.2
WithChannelTimeout sets the channel operation timeout
func WithConnectionName ¶ added in v1.1.2
WithConnectionName sets the connection name
func WithConnectionPooler ¶ added in v1.1.2
func WithConnectionPooler(pooler ConnectionPooler) Option
WithConnectionPooler sets the connection pooler
func WithCredentials ¶ added in v1.1.2
WithCredentials sets the username and password
func WithDialTimeout ¶ added in v1.1.2
WithDialTimeout sets the connection dial timeout
func WithGracefulShutdown ¶ added in v1.1.2
func WithGracefulShutdown(shutdown GracefulShutdown) Option
WithGracefulShutdown sets the graceful shutdown handler
func WithHeartbeat ¶ added in v1.1.2
WithHeartbeat sets the heartbeat interval
func WithHosts ¶ added in v1.1.2
WithHosts sets multiple RabbitMQ hosts for failover support Each host should include port (e.g., "host1:5672,host2:5673") If no port is specified, defaults to 5672
func WithInsecureTLS ¶ added in v1.1.2
func WithInsecureTLS() Option
WithInsecureTLS enables TLS with certificate verification disabled
func WithMaxReconnectAttempts ¶ added in v1.1.2
WithMaxReconnectAttempts sets the maximum number of reconnection attempts
func WithMetrics ¶ added in v1.1.2
func WithMetrics(metrics MetricsCollector) Option
WithMetrics sets the metrics collector
func WithPerformanceMonitoring ¶ added in v1.1.2
func WithPerformanceMonitoring(monitor PerformanceMonitor) Option
WithPerformanceMonitoring sets the performance monitor
func WithReconnectDelay ¶ added in v1.1.2
WithReconnectDelay sets the delay between reconnection attempts
func WithReconnectPolicy ¶ added in v1.1.2
func WithReconnectPolicy(policy ReconnectPolicy) Option
WithReconnectPolicy sets the reconnection policy
func WithSagaOrchestrator ¶ added in v1.1.2
func WithSagaOrchestrator(orchestrator SagaOrchestrator) Option
WithSagaOrchestrator sets the saga orchestrator
func WithStreamHandler ¶ added in v1.1.2
func WithStreamHandler(handler StreamHandler) Option
WithStreamHandler sets the stream handler
func WithTopologyAutoRecreation ¶ added in v1.2.0
func WithTopologyAutoRecreation() Option
WithTopologyAutoRecreation enables automatic recreation of missing topology Requires TopologyValidation to be enabled
func WithTopologyBackgroundValidation ¶ added in v1.2.0
WithTopologyBackgroundValidation enables background topology validation with a custom interval. Background validation is enabled by default (30s interval). Requires TopologyValidation to be enabled
func WithTopologyValidation ¶ added in v1.2.0
func WithTopologyValidation() Option
WithTopologyValidation enables topology validation When enabled, the client will track declared topology and validate it before operations
func WithTopologyValidationInterval ¶ added in v1.2.0
WithTopologyValidationInterval sets the background validation interval This is a convenience method that automatically enables background validation
func WithTracing ¶ added in v1.1.2
WithTracing sets the tracer
func WithoutTopologyAutoRecreation ¶ added in v1.2.0
func WithoutTopologyAutoRecreation() Option
WithoutTopologyAutoRecreation disables automatic recreation while keeping validation Topology will be validated but not automatically recreated if missing
func WithoutTopologyBackgroundValidation ¶ added in v1.2.0
func WithoutTopologyBackgroundValidation() Option
WithoutTopologyBackgroundValidation disables background validation while keeping on-demand validation Topology will be validated before operations but not periodically validated in the background
func WithoutTopologyValidation ¶ added in v1.2.0
func WithoutTopologyValidation() Option
WithoutTopologyValidation disables topology validation This disables all topology tracking, validation, and auto-recreation
type PerformanceMonitor ¶ added in v1.1.2
type PerformanceMonitor interface {
// RecordPublish records a publish operation with its success status and duration
RecordPublish(success bool, duration time.Duration)
// RecordConsume records a consume operation with its success status and duration
RecordConsume(success bool, duration time.Duration)
}
PerformanceMonitor provides performance monitoring capabilities for RabbitMQ operations. This is the core contract that all performance monitoring implementations must satisfy. For concrete implementations, use the performance sub-package.
func NewNopPerformanceMonitor ¶ added in v1.1.2
func NewNopPerformanceMonitor() PerformanceMonitor
NewNopPerformanceMonitor creates a new no-op performance monitor. For advanced monitoring capabilities, use the performance sub-package.
type PoolStats ¶ added in v1.1.2
type PoolStats struct {
TotalConnections int64
HealthyConnections int64
FailedConnections int64
RepairAttempts int64
LastHealthCheck time.Time
}
PoolStats provides statistics about a connection pool.
type PublishRequest ¶ added in v1.1.2
PublishRequest represents a single publish operation
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher handles message publishing operations
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, exchange, routingKey string, message *Message) error
Publish publishes a message to the specified exchange and routing key
func (*Publisher) PublishBatch ¶ added in v1.1.2
func (p *Publisher) PublishBatch(ctx context.Context, messages []PublishRequest) error
PublishBatch publishes multiple messages in a batch
type PublisherOption ¶ added in v1.1.2
type PublisherOption func(*publisherConfig)
PublisherOption represents a functional option for publisher configuration
func WithCompression ¶ added in v1.1.2
func WithCompression(compressor MessageCompressor) PublisherOption
WithCompression sets the message compressor for the publisher
func WithCompressionThreshold ¶ added in v1.1.2
func WithCompressionThreshold(threshold int) PublisherOption
WithCompressionThreshold sets the compression threshold for the publisher
func WithConfirmation ¶ added in v1.1.2
func WithConfirmation(timeout time.Duration) PublisherOption
WithConfirmation enables publish confirmations for the publisher. The provided timeout is used to wait for each acknowledgement. This makes the publisher reliable but slower as each Publish call will block until the broker confirms the message or the timeout expires.
func WithDefaultExchange ¶ added in v1.1.2
func WithDefaultExchange(exchange string) PublisherOption
WithDefaultExchange sets the default exchange for publishing
func WithEncryption ¶ added in v1.1.2
func WithEncryption(encryptor MessageEncryptor) PublisherOption
WithEncryption sets the message encryptor for the publisher
func WithImmediate ¶ added in v1.1.2
func WithImmediate() PublisherOption
WithImmediate enables immediate publishing
func WithMandatory ¶ added in v1.1.2
func WithMandatory() PublisherOption
WithMandatory enables mandatory publishing
func WithPersistent ¶ added in v1.1.2
func WithPersistent() PublisherOption
WithPersistent makes all messages persistent by default
func WithRetryPolicy ¶ added in v1.1.2
func WithRetryPolicy(policy RetryPolicy) PublisherOption
WithRetryPolicy sets the retry policy for failed publishes
func WithSerializer ¶ added in v1.1.2
func WithSerializer(serializer MessageSerializer) PublisherOption
WithSerializer sets the message serializer for the publisher
type QueueConfig ¶
type QueueConfig struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Arguments map[string]any
// Cluster-aware settings
QueueType QueueType // Queue type: classic, quorum, stream
HighAvailability bool // Enable HA for classic queues
ReplicationFactor int // Replication factor for quorum queues (default: 3)
MaxLength int // Maximum queue length (0 = unlimited)
MaxLengthBytes int // Maximum queue size in bytes (0 = unlimited)
MessageTTL int // Message TTL in milliseconds (0 = no TTL)
DeadLetterExchange string // Dead letter exchange name
DeadLetterRoutingKey string // Dead letter routing key
}
QueueConfig holds configuration for declaring a queue
func DefaultClassicQueueConfig ¶
func DefaultClassicQueueConfig(name string) QueueConfig
DefaultClassicQueueConfig returns a basic durable classic queue configuration
func DefaultHAQueueConfig ¶
func DefaultHAQueueConfig(name string) QueueConfig
DefaultHAQueueConfig returns a production-ready HA classic queue configuration
func DefaultQuorumQueueConfig ¶
func DefaultQuorumQueueConfig(name string) QueueConfig
DefaultQuorumQueueConfig returns a production-ready quorum queue configuration
func (*QueueConfig) ToArguments ¶
func (q *QueueConfig) ToArguments() map[string]any
ToArguments converts the QueueConfig to RabbitMQ queue arguments
func (*QueueConfig) WithCustomDeadLetter ¶
func (q *QueueConfig) WithCustomDeadLetter(dlxName, routingKey string) *QueueConfig
WithCustomDeadLetter configures a custom dead letter exchange
func (*QueueConfig) WithoutDeadLetter ¶
func (q *QueueConfig) WithoutDeadLetter() *QueueConfig
WithoutDeadLetter clears any dead letter configuration
type QueueDeclaration ¶ added in v1.1.2
type QueueDeclaration struct {
Name string `json:"name" yaml:"name"`
Durable bool `json:"durable" yaml:"durable"`
AutoDelete bool `json:"auto_delete" yaml:"auto_delete"`
Exclusive bool `json:"exclusive" yaml:"exclusive"`
Arguments map[string]any `json:"arguments,omitempty" yaml:"arguments,omitempty"`
TTL time.Duration `json:"ttl,omitempty" yaml:"ttl,omitempty"`
DeadLetter string `json:"dead_letter,omitempty" yaml:"dead_letter,omitempty"`
}
QueueDeclaration represents a queue declaration
type QueueInfo ¶ added in v1.1.2
type QueueInfo struct {
Name string
Messages int
Consumers int
Memory int64
Durable bool
AutoDelete bool
Exclusive bool
Arguments Table
VHost string
Node string
}
QueueInfo represents detailed queue information
type QueueOption ¶ added in v1.1.2
type QueueOption func(*queueConfig)
QueueOption represents a functional option for queue configuration
func WithArguments ¶ added in v1.1.2
func WithArguments(args Table) QueueOption
WithArguments sets custom queue arguments
func WithAutoDelete ¶ added in v1.1.2
func WithAutoDelete() QueueOption
WithAutoDelete makes the queue auto-delete when no longer used
func WithClassicQueue ¶ added in v1.1.3
func WithClassicQueue() QueueOption
WithClassicQueue forces the queue to be a classic queue instead of the default quorum queue
func WithDeadLetter ¶ added in v1.1.2
func WithDeadLetter(exchange, routingKey string) QueueOption
WithDeadLetter configures dead letter exchange and routing key
func WithDeadLetterTTL ¶ added in v1.1.2
func WithDeadLetterTTL(ttl time.Duration) QueueOption
WithDeadLetterTTL sets the TTL for messages in dead letter queue
func WithDeliveryLimit ¶ added in v1.1.3
func WithDeliveryLimit(limit int) QueueOption
WithDeliveryLimit sets the delivery limit for quorum queues (when using DeclareQueue with default quorum)
func WithDurable ¶ added in v1.1.2
func WithDurable() QueueOption
WithDurable makes the queue durable
func WithExclusiveQueue ¶ added in v1.1.2
func WithExclusiveQueue() QueueOption
WithExclusiveQueue makes the queue exclusive to this connection
func WithMaxLength ¶ added in v1.1.2
func WithMaxLength(length int64) QueueOption
WithMaxLength sets the maximum queue length
func WithMaxLengthBytes ¶ added in v1.1.2
func WithMaxLengthBytes(bytes int64) QueueOption
WithMaxLengthBytes sets the maximum queue size in bytes
func WithQuorumGroupSize ¶ added in v1.1.3
func WithQuorumGroupSize(size int) QueueOption
WithQuorumGroupSize sets the initial group size for quorum queues (when using DeclareQueue with default quorum)
func WithTTL ¶ added in v1.1.2
func WithTTL(ttl time.Duration) QueueOption
WithTTL sets the message TTL for the queue
func WithoutDLQ ¶ added in v1.1.3
func WithoutDLQ() QueueOption
WithoutDLQ clears any dead letter configuration
type QuorumQueueOption ¶ added in v1.1.2
type QuorumQueueOption func(*quorumQueueConfig)
QuorumQueueOption represents a functional option for quorum queue configuration
func WithInitialGroupSize ¶ added in v1.1.2
func WithInitialGroupSize(size int) QuorumQueueOption
WithInitialGroupSize sets the initial group size for quorum queues
func WithQuorumDeadLetter ¶ added in v1.2.0
func WithQuorumDeadLetter(exchange, routingKey string) QuorumQueueOption
WithQuorumDeadLetter sets the dead letter exchange and routing key for quorum queues
func WithQuorumDeliveryLimit ¶ added in v1.1.2
func WithQuorumDeliveryLimit(limit int) QuorumQueueOption
WithQuorumDeliveryLimit sets the delivery limit for quorum queues
func WithoutQuorumDLQ ¶ added in v1.1.3
func WithoutQuorumDLQ() QuorumQueueOption
WithoutQuorumDLQ clears any dead letter configuration for quorum queues
type ReconnectPolicy ¶ added in v1.1.2
type ReconnectPolicy interface {
ShouldRetry(attempt int, err error) bool
NextDelay(attempt int) time.Duration
}
ReconnectPolicy defines the interface for reconnection strategies
func DefaultReconnectPolicy ¶ added in v1.1.2
func DefaultReconnectPolicy() ReconnectPolicy
DefaultReconnectPolicy returns a sensible default reconnection policy
func ProductionReconnectPolicy ¶ added in v1.1.2
func ProductionReconnectPolicy() ReconnectPolicy
ProductionReconnectPolicy returns a production-ready reconnection policy
func TestingReconnectPolicy ¶ added in v1.1.2
func TestingReconnectPolicy() ReconnectPolicy
TestingReconnectPolicy returns a fast reconnection policy for testing
type RejectError ¶ added in v1.1.2
RejectError allows controlling message rejection behavior
func (*RejectError) Error ¶ added in v1.1.2
func (r *RejectError) Error() string
func (*RejectError) Unwrap ¶ added in v1.1.2
func (r *RejectError) Unwrap() error
type RetryPolicy ¶ added in v1.1.2
type RetryPolicy interface {
ShouldRetry(attempt int, err error) bool
NextDelay(attempt int) time.Duration
}
RetryPolicy defines retry behavior for operations
type SagaOrchestrator ¶ added in v1.1.2
type SagaOrchestrator interface {
StartSaga(ctx context.Context, sagaName string, steps []SagaStep, context map[string]any) (string, error)
CompensateSaga(ctx context.Context, sagaID string) error
GetSagaStatus(ctx context.Context, sagaID string) (SagaStatus, error)
}
SagaOrchestrator defines the interface for saga pattern orchestration. This is the core contract that all saga implementations must satisfy. For concrete implementations, use the saga sub-package.
func NewNopSagaOrchestrator ¶ added in v1.1.2
func NewNopSagaOrchestrator() SagaOrchestrator
NewNopSagaOrchestrator creates a new no-op saga orchestrator. For advanced saga orchestration capabilities, use the saga sub-package.
type SagaStatus ¶ added in v1.1.2
type SagaStatus struct {
ID string
State string
Steps []SagaStepStatus
}
SagaStatus represents the current status of a saga.
type SagaStepStatus ¶ added in v1.1.2
type SagaStepStatus struct {
Name string
State string
Error string
Output map[string]any
Timestamp time.Time
}
SagaStepStatus represents the status of a saga step.
type Span ¶ added in v1.1.2
type Span interface {
SetAttribute(key string, value any)
SetStatus(code SpanStatusCode, description string)
End()
}
Span interface for tracing spans
type SpanStatusCode ¶ added in v1.1.2
type SpanStatusCode int
SpanStatusCode represents the status of a span
const ( SpanStatusUnset SpanStatusCode = iota SpanStatusOK SpanStatusError )
type StreamConfig ¶ added in v1.1.2
type StreamConfig struct {
MaxAge time.Duration
MaxLengthMessages int
MaxLengthBytes int
MaxSegmentSizeBytes int
InitialClusterSize int
}
StreamConfig holds configuration for stream creation.
type StreamHandler ¶ added in v1.1.2
type StreamHandler interface {
PublishToStream(ctx context.Context, streamName string, message *Message) error
ConsumeFromStream(ctx context.Context, streamName string, handler StreamMessageHandler) error
CreateStream(ctx context.Context, streamName string, config StreamConfig) error
DeleteStream(ctx context.Context, streamName string) error
}
StreamHandler defines the interface for RabbitMQ streams operations. This is the core contract that all stream implementations must satisfy. For concrete implementations, use the streams sub-package.
func NewNopStreamHandler ¶ added in v1.1.2
func NewNopStreamHandler() StreamHandler
NewNopStreamHandler creates a new no-op stream handler. For advanced streaming capabilities, use the streams sub-package.
type StreamMessageHandler ¶ added in v1.1.2
StreamMessageHandler defines the function signature for processing stream messages
type Topology ¶ added in v1.1.2
type Topology struct {
Exchanges []ExchangeDeclaration `json:"exchanges,omitempty" yaml:"exchanges,omitempty"`
Queues []QueueDeclaration `json:"queues,omitempty" yaml:"queues,omitempty"`
Bindings []BindingDeclaration `json:"bindings,omitempty" yaml:"bindings,omitempty"`
}
Topology represents a complete topology definition
type TopologyRegistry ¶ added in v1.2.0
type TopologyRegistry struct {
// contains filtered or unexported fields
}
TopologyRegistry tracks declared topology for validation and auto-recreation
func NewTopologyRegistry ¶ added in v1.2.0
func NewTopologyRegistry() *TopologyRegistry
NewTopologyRegistry creates a new topology registry
func (*TopologyRegistry) Clear ¶ added in v1.2.0
func (r *TopologyRegistry) Clear()
Clear removes all registered topology
func (*TopologyRegistry) GetBinding ¶ added in v1.2.0
func (r *TopologyRegistry) GetBinding(queue, exchange, routingKey string) (BindingConfig, bool)
GetBinding retrieves a registered binding configuration
func (*TopologyRegistry) GetExchange ¶ added in v1.2.0
func (r *TopologyRegistry) GetExchange(name string) (ExchangeConfig, bool)
GetExchange retrieves a registered exchange configuration
func (*TopologyRegistry) GetQueue ¶ added in v1.2.0
func (r *TopologyRegistry) GetQueue(name string) (QueueConfig, bool)
GetQueue retrieves a registered queue configuration
func (*TopologyRegistry) ListBindings ¶ added in v1.2.0
func (r *TopologyRegistry) ListBindings() []BindingConfig
ListBindings returns all registered bindings
func (*TopologyRegistry) ListExchanges ¶ added in v1.2.0
func (r *TopologyRegistry) ListExchanges() []ExchangeConfig
ListExchanges returns all registered exchanges
func (*TopologyRegistry) ListQueues ¶ added in v1.2.0
func (r *TopologyRegistry) ListQueues() []QueueConfig
ListQueues returns all registered queues
func (*TopologyRegistry) RegisterBinding ¶ added in v1.2.0
func (r *TopologyRegistry) RegisterBinding(config BindingConfig)
RegisterBinding records a binding declaration for tracking
func (*TopologyRegistry) RegisterExchange ¶ added in v1.2.0
func (r *TopologyRegistry) RegisterExchange(config ExchangeConfig)
RegisterExchange records an exchange declaration for tracking
func (*TopologyRegistry) RegisterQueue ¶ added in v1.2.0
func (r *TopologyRegistry) RegisterQueue(config QueueConfig)
RegisterQueue records a queue declaration for tracking
type TopologyValidator ¶ added in v1.2.0
type TopologyValidator struct {
// contains filtered or unexported fields
}
TopologyValidator handles topology validation and auto-recreation
func NewTopologyValidator ¶ added in v1.2.0
func NewTopologyValidator(client *Client, admin *AdminService, registry *TopologyRegistry) *TopologyValidator
NewTopologyValidator creates a new topology validator
func (*TopologyValidator) Disable ¶ added in v1.2.0
func (v *TopologyValidator) Disable()
Disable disables topology validation
func (*TopologyValidator) Enable ¶ added in v1.2.0
func (v *TopologyValidator) Enable()
Enable enables topology validation
func (*TopologyValidator) EnableAutoRecreate ¶ added in v1.2.0
func (v *TopologyValidator) EnableAutoRecreate()
EnableAutoRecreate enables automatic recreation of missing topology
func (*TopologyValidator) EnableBackgroundValidation ¶ added in v1.2.0
func (v *TopologyValidator) EnableBackgroundValidation(interval time.Duration)
EnableBackgroundValidation starts background topology validation
func (*TopologyValidator) IsAutoRecreateEnabled ¶ added in v1.2.0
func (v *TopologyValidator) IsAutoRecreateEnabled() bool
IsAutoRecreateEnabled returns whether auto-recreation is enabled
func (*TopologyValidator) IsEnabled ¶ added in v1.2.0
func (v *TopologyValidator) IsEnabled() bool
IsEnabled returns whether topology validation is enabled
func (*TopologyValidator) ValidateCompleteTopology ¶ added in v1.2.5
func (v *TopologyValidator) ValidateCompleteTopology() error
ValidateCompleteTopology performs a comprehensive validation of all registered topology This method can be called on-demand to ensure entire topology is consistent
func (*TopologyValidator) ValidateExchange ¶ added in v1.2.0
func (v *TopologyValidator) ValidateExchange(name string) error
ValidateExchange checks if an exchange exists and optionally recreates it
func (*TopologyValidator) ValidateQueue ¶ added in v1.2.0
func (v *TopologyValidator) ValidateQueue(name string) error
ValidateQueue checks if a queue exists and optionally recreates it
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package compression provides message compression/decompression functionality for go-rabbitmq.
|
Package compression provides message compression/decompression functionality for go-rabbitmq. |
|
Package encryption provides message encryption/decryption capabilities for the go-rabbitmq library.
|
Package encryption provides message encryption/decryption capabilities for the go-rabbitmq library. |
|
examples
|
|
|
advanced-performance
command
|
|
|
compression-features
command
|
|
|
encryption-features
command
|
|
|
pool-features
command
|
|
|
production-defaults
command
|
|
|
protobuf-features
command
|
|
|
streams-unified
command
|
|
|
topology-features
command
|
|
|
Package performance provides detailed performance monitoring and metrics collection for RabbitMQ operations.
|
Package performance provides detailed performance monitoring and metrics collection for RabbitMQ operations. |
|
Package pool provides connection pooling functionality for high-throughput RabbitMQ applications.
|
Package pool provides connection pooling functionality for high-throughput RabbitMQ applications. |
|
Package protobuf provides support for Protocol Buffer message serialization and routing for the RabbitMQ library.
|
Package protobuf provides support for Protocol Buffer message serialization and routing for the RabbitMQ library. |
|
Package saga provides distributed transaction support using the Saga pattern.
|
Package saga provides distributed transaction support using the Saga pattern. |
|
Package shutdown provides coordinated graceful shutdown management for RabbitMQ components and other closable resources.
|
Package shutdown provides coordinated graceful shutdown management for RabbitMQ components and other closable resources. |
|
Package streams provides RabbitMQ streams functionality for high-throughput scenarios.
|
Package streams provides RabbitMQ streams functionality for high-throughput scenarios. |