rabbitmq

package module
v1.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2025 License: MIT Imports: 12 Imported by: 0

README

Go RabbitMQ

Home  /

 

A modular, production-ready Go library for RabbitMQ with pluggable architecture. Built on the contract-implementation pattern, where core interfaces live in the root package and concrete implementations are provided by specialized sub-packages. This design enables maximum flexibility, testability, and extensibility for enterprise messaging solutions.

 

Go Reference Go Tests Go Report Card GitHub Tag License

 

Table of Contents

🔝 back to top

 

Requirements

  • Go 1.24+ (recommended)
  • RabbitMQ 4.0+ (recommended)

🔝 back to top

 

Installation

go get github.com/cloudresty/go-rabbitmq

🔝 back to top

 

Pluggable Sub-Packages

Each sub-package implements core interfaces defined in the root package, enabling you to mix and match features as needed:

Package Purpose Key Features
compression/ Message compression Gzip, Zlib with configurable thresholds
encryption/ Message encryption AES-256-GCM with secure key management
pool/ Connection pooling Round-robin, health monitoring, auto-repair
performance/ Metrics & monitoring Latency tracking, rate monitoring, statistics
saga/ Distributed transactions Orchestration engine, compensation, atomic state
streams/ RabbitMQ Streams High-throughput, durable, ordered messaging
shutdown/ Graceful shutdown Signal handling, resource cleanup, timeouts
protobuf/ Protocol Buffers Type-safe serialization, message routing

🔝 back to top

 

Key Features

Contract-Implementation Architecture
  • Core Interfaces: All contracts defined in the root package
  • Pluggable Implementations: Concrete implementations in specialized sub-packages
  • Mix & Match: Combine any features - encryption + compression + pooling + streams
  • Testing: Easy mocking of interfaces for comprehensive unit testing

🔝 back to top

 

Production-Ready Features
  • Topology Auto-Healing: Automatic topology validation and recreation enabled by default
  • Connection Pooling: Distribute load across multiple connections with health monitoring
  • Message Encryption: AES-256-GCM encryption with secure key management
  • Compression: Gzip/Zlib compression with configurable thresholds
  • Saga Pattern: Distributed transaction orchestration with automatic compensation
  • Streams Support: High-throughput RabbitMQ Streams for event sourcing
  • Performance Monitoring: Latency tracking, rate monitoring, comprehensive metrics

🔝 back to top

 

Developer Experience
  • ULID Message IDs: 6x faster than UUIDs, database-optimized, lexicographically sortable
  • Auto-Reconnection: Intelligent retry with configurable exponential backoff
  • Graceful Shutdown: Signal handling with proper resource cleanup and timeouts
  • Comprehensive Documentation: Each sub-package has detailed README with examples

🔝 back to top

 

Simple Queue Configuration

This library provides a straightforward approach to queue configuration with user control over topology:

 

Quorum Queues by Default
  • High Availability: Built-in replication across cluster nodes
  • Data Safety: No message loss during node failures
  • Poison Message Protection: Automatic delivery limits prevent infinite redelivery loops
  • Better Performance: Optimized for throughput in clustered environments

🔝 back to top

 

Dead Letter Configuration
  • Manual Configuration: Full control over dead letter exchange and routing configuration
  • Flexible Setup: Configure dead letter handling exactly as needed for your topology
  • Error Handling: Failed messages routed according to your dead letter configuration

🔝 back to top

 

Topology Auto-Healing (Enabled by Default)
  • Automatic Validation: Every publish/consume operation validates topology exists
  • Auto-Recreation: Missing exchanges, queues, and bindings are automatically recreated
  • Background Monitoring: Periodic validation every 30 seconds (default, customizable)
  • Zero Configuration: Works out of the box - no setup required

🔝 back to top

 

Easy Customization
// Default: Auto-healing quorum queue (production-ready!)
client, _ := rabbitmq.NewClient(rabbitmq.FromEnv())
admin := client.Admin()
queue, _ := admin.DeclareQueue(ctx, "orders")  // Automatically tracked & protected

// Custom quorum settings
queue, _ := admin.DeclareQueue(ctx, "payments",
    rabbitmq.WithQuorumGroupSize(5),       // Custom cluster size
    rabbitmq.WithDeliveryLimit(3),         // Max retry attempts
)

// With dead letter configuration  
queue, _ := admin.DeclareQueue(ctx, "processing",
    rabbitmq.WithDeadLetter("errors.dlx", "failed"), // Manual DLX setup
)

// Advanced users can opt-out if needed
client, _ := rabbitmq.NewClient(
    rabbitmq.FromEnv(),
    rabbitmq.WithoutTopologyValidation(),            // Disable auto-healing
    rabbitmq.WithoutTopologyAutoRecreation(),        // Keep validation, disable auto-recreation
    rabbitmq.WithoutTopologyBackgroundValidation(),  // Disable background monitoring only
)

// Or customize the background validation interval
client, _ := rabbitmq.NewClient(
    rabbitmq.FromEnv(),
    rabbitmq.WithTopologyValidationInterval(10*time.Second), // Custom interval
)

// Legacy compatibility (opt-in)
queue, _ := admin.DeclareQueue(ctx, "legacy",
    rabbitmq.WithClassicQueue(),           // Classic queue type
)

Benefits: Get enterprise-grade reliability and availability with simple, user-controlled configuration.

🔝 back to top

 

Quick Start

package main

import (
    "context"
    "log"

    "github.com/cloudresty/go-rabbitmq"
)

func main() {
    // Create a basic client (topology auto-healing enabled by default!)
    client, err := rabbitmq.NewClient(
        rabbitmq.WithHosts("localhost:5672"),
        rabbitmq.WithCredentials("guest", "guest"),
        rabbitmq.WithConnectionName("my-service"),
    )
    if err != nil {
        log.Fatal("Failed to create client:", err)
    }
    defer client.Close()

    // Declare a queue - automatically tracked and protected from deletion!
    admin := client.Admin()
    queue, err := admin.DeclareQueue(context.Background(), "user-events")
    if err != nil {
        log.Fatal("Failed to declare queue:", err)
    }
    log.Printf("Created auto-healing production queue: %s", queue.Name)

    // Create a publisher
    publisher, err := client.NewPublisher(
        rabbitmq.WithDefaultExchange("events"),
    )
    if err != nil {
        log.Fatal("Failed to create publisher:", err)
    }
    defer publisher.Close()

    // Publish a message with auto-generated ULID
    message := rabbitmq.NewMessage([]byte(`{"event": "user_signup", "user_id": 123}`))
    err = publisher.Publish(context.Background(), "events", "user.created", message)
    if err != nil {
        log.Fatal("Failed to publish:", err)
    }

    // Create a consumer
    consumer, err := client.NewConsumer()
    if err != nil {
        log.Fatal("Failed to create consumer:", err)
    }
    defer consumer.Close()

    // Consume messages
    err = consumer.Consume(context.Background(), "user-events", func(ctx context.Context, delivery *rabbitmq.Delivery) error {
        log.Printf("Received: %s (ID: %s)", delivery.Body, delivery.MessageId)
        return nil
    })
    if err != nil {
        log.Fatal("Failed to consume:", err)
    }
}

🔝 back to top

 

Production Usage

Environment-Based Configuration
# Set environment variables for deployment
export RABBITMQ_HOST=rabbitmq.production.com
export RABBITMQ_USERNAME=myservice
export RABBITMQ_PASSWORD=securepassword
export RABBITMQ_VHOST=/production
export RABBITMQ_CONNECTION_NAME=order-service

🔝 back to top

 

High-Availability Setup
// Production-ready client with HA
client, err := rabbitmq.NewClient(
    rabbitmq.WithHosts("rabbit1:5672", "rabbit2:5672", "rabbit3:5672"),
    rabbitmq.WithCredentials("user", "pass"),
    rabbitmq.WithVHost("/production"),
    rabbitmq.WithConnectionName("order-service"),
    rabbitmq.WithReconnectPolicy(&rabbitmq.ExponentialBackoff{
        InitialDelay: 1 * time.Second,
        MaxDelay:     30 * time.Second,
        Multiplier:   2.0,
        MaxAttempts:  10,
    }),
)

// Connection pooling for high throughput
pool, err := pool.New(20,
    pool.WithClientOptions(/* client options */),
    pool.WithHealthCheck(30 * time.Second),
    pool.WithAutoRepair(true),
)

// Performance monitoring
monitor := performance.NewMonitor()

// Graceful shutdown management
shutdownManager := shutdown.NewManager()
shutdownManager.RegisterComponents(publisher, consumer, pool)
shutdownManager.SetupSignalHandler()

🔝 back to top

 

Advanced Examples

Production Defaults Demo

See examples/production-defaults/ for a comprehensive demonstration of the production-ready defaults including quorum queues, dead letter configuration, and customization options.

Topology Auto-Healing Demo

See examples/topology-features/ for a comprehensive demonstration of topology validation, auto-recreation, environment configuration, and opt-out options.

Complete Feature Integration
package main

import (
    "context"
    "log"

    "github.com/cloudresty/go-rabbitmq"
    "github.com/cloudresty/go-rabbitmq/compression"
    "github.com/cloudresty/go-rabbitmq/encryption"
    "github.com/cloudresty/go-rabbitmq/pool"
    "github.com/cloudresty/go-rabbitmq/performance"
    "github.com/cloudresty/go-rabbitmq/shutdown"
)

func main() {
    // Create connection pool for high-throughput
    connectionPool, err := pool.New(10,
        pool.WithClientOptions(
            rabbitmq.WithHosts("localhost:5672"),
            rabbitmq.WithCredentials("guest", "guest"),
        ),
        pool.WithHealthCheck(30*time.Second),
        pool.WithAutoRepair(true),
    )
    if err != nil {
        log.Fatal("Failed to create pool:", err)
    }
    defer connectionPool.Close()

    // Get client from pool
    client, err := connectionPool.Get()
    if err != nil {
        log.Fatal("No healthy connections:", err)
    }

    // Create pluggable features
    compressor := compression.NewGzip()
    encryptor, _ := encryption.NewAESGCM([]byte("your-32-byte-encryption-key-here"))
    monitor := performance.NewMonitor()

    // Create publisher with all features
    publisher, err := client.NewPublisher(
        rabbitmq.WithCompression(compressor),
        rabbitmq.WithEncryption(encryptor),
        rabbitmq.WithCompressionThreshold(100),
        rabbitmq.WithConfirmation(5*time.Second),
    )
    if err != nil {
        log.Fatal("Failed to create publisher:", err)
    }
    defer publisher.Close()

    // Publish with monitoring
    message := rabbitmq.NewMessage([]byte(`{"large": "payload with lots of data..."}`))

    start := time.Now()
    err = publisher.Publish(context.Background(), "events", "data.processed", message)
    duration := time.Since(start)

    // Record performance metrics
    monitor.RecordPublish(err == nil, duration)

    // Setup graceful shutdown
    shutdownManager := shutdown.NewManager()
    shutdownManager.RegisterComponents(publisher, consumer, connectionPool)

    // Handle shutdown signals
    shutdownManager.SetupSignalHandler()
    shutdownManager.Wait() // Blocks until SIGINT/SIGTERM
}

🔝 back to top

 

Saga Pattern for Distributed Transactions
package main

import (
    "context"
    "log"

    "github.com/cloudresty/go-rabbitmq"
    "github.com/cloudresty/go-rabbitmq/saga"
)

func main() {
    client, _ := rabbitmq.NewClient(rabbitmq.WithHosts("localhost:5672"))
    store := saga.NewInMemoryStore()

    // Define step and compensation handlers
    stepHandlers := map[string]saga.StepHandler{
        "create_order":    createOrderHandler,
        "reserve_inventory": reserveInventoryHandler,
        "charge_payment":  chargePaymentHandler,
    }

    compensationHandlers := map[string]saga.CompensationHandler{
        "cancel_order":      cancelOrderHandler,
        "release_inventory": releaseInventoryHandler,
        "refund_payment":    refundPaymentHandler,
    }

    // Create saga manager with orchestration engine
    manager, err := saga.NewManager(client, store, saga.Config{
        SagaExchange:         "sagas",
        StepQueue:           "saga.steps",
        CompensateQueue:     "saga.compensate",
        StepHandlers:        stepHandlers,
        CompensationHandlers: compensationHandlers,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close()

    // Start orchestration engine
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        if err := manager.Run(ctx); err != nil {
            log.Printf("Orchestration engine error: %v", err)
        }
    }()

    // Define distributed transaction steps
    steps := []saga.Step{
        {Name: "create_order", Action: "create_order", Compensation: "cancel_order"},
        {Name: "reserve_inventory", Action: "reserve_inventory", Compensation: "release_inventory"},
        {Name: "charge_payment", Action: "charge_payment", Compensation: "refund_payment"},
    }

    // Start saga (engine will orchestrate automatically)
    s, err := manager.Start(ctx, "order_processing", steps, map[string]any{
        "customer_id": "cust-123",
        "order_total": 99.99,
    })
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("Started saga: %s", s.ID)
}

// Implement your step and compensation handlers...
func createOrderHandler(ctx context.Context, s *saga.Saga, step *saga.Step) error {
    // Business logic for order creation
    return nil
}

🔝 back to top

 

High-Performance Streams
package main

import (
    "context"
    "log"
    "time"

    "github.com/cloudresty/go-rabbitmq"
    "github.com/cloudresty/go-rabbitmq/streams"
)

func main() {
    client, _ := rabbitmq.NewClient(rabbitmq.WithHosts("localhost:5672"))
    streamsHandler := streams.NewHandler(client)

    // Create high-throughput stream
    config := rabbitmq.StreamConfig{
        MaxAge:            24 * time.Hour,
        MaxLengthMessages: 10_000_000,
        MaxLengthBytes:    10 * 1024 * 1024 * 1024, // 10GB
        InitialClusterSize: 3, // High availability
    }

    err := streamsHandler.CreateStream(context.Background(), "events.stream", config)
    if err != nil {
        log.Printf("Stream creation: %v", err)
    }

    // High-speed publishing
    for i := 0; i < 100000; i++ {
        message := rabbitmq.NewMessage([]byte(fmt.Sprintf("Event %d: %s", i, time.Now())))
        err := streamsHandler.PublishToStream(context.Background(), "events.stream", message)
        if err != nil {
            log.Printf("Failed to publish: %v", err)
            break
        }
    }

    // Consume from stream
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    err = streamsHandler.ConsumeFromStream(ctx, "events.stream", func(ctx context.Context, delivery *rabbitmq.Delivery) error {
        log.Printf("Stream message: %s", delivery.Body)
        return nil
    })
}

🔝 back to top

 

Documentation

Document Description
Sub-Package READMEs Detailed documentation for each pluggable feature
compression/ Message compression with Gzip and Zlib
encryption/ AES-256-GCM message encryption
performance/ Metrics collection and monitoring
pool/ Connection pooling with health monitoring
protobuf/ Protocol Buffers integration
saga/ Distributed transaction orchestration
shutdown/ Graceful shutdown management
streams/ High-throughput RabbitMQ Streams
Additional Docs
API Reference Complete function reference and usage patterns
Environment Variables List of environment variables for configuration
Environment Variables Complete environment variable reference and usage
Production Features Auto-reconnection, graceful shutdown, HA queues
Examples Working examples for each feature
ULID Message IDs Using ULIDs for message IDs in RabbitMQ
Usage Patterns Common patterns for using the library effectively

🔝 back to top

 

Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for your changes
  4. Ensure all tests pass
  5. Submit a pull request

🔝 back to top

 

Security

If you discover a security vulnerability, please report it via email to security@cloudresty.com.

🔝 back to top

 

License

This project is licensed under the MIT License - see the LICENSE.txt file for details.

🔝 back to top

 


 

An open source project brought to you by the Cloudresty team.

Website  |  LinkedIn  |  BlueSky  |  GitHub  |  Docker Hub

 

Documentation

Index

Constants

View Source
const (
	ContentTypeJSON = "application/json"
	ContentTypeText = "text/plain"
)

ContentType constants

Variables

View Source
var NoRetry = NoRetryPolicy{}

NoRetry is a global instance of NoRetryPolicy

Functions

func IsConnectionError added in v1.1.2

func IsConnectionError(err error) bool

IsConnectionError checks if an error is connection-related

func IsRetryableError added in v1.1.2

func IsRetryableError(err error) bool

IsRetryableError checks if an error is retryable

Types

type AccessPolicy added in v1.1.2

type AccessPolicy struct {
	AllowedExchanges []string
	AllowedQueues    []string
	AllowedRoutes    []string
	ReadOnly         bool
}

AccessPolicy defines role-based access control

type AdminService added in v1.1.2

type AdminService struct {
	// contains filtered or unexported fields
}

AdminService handles topology management operations

func (*AdminService) BindExchange added in v1.1.2

func (a *AdminService) BindExchange(ctx context.Context, destination, source, routingKey string, args Table) error

BindExchange binds an exchange to another exchange

func (*AdminService) BindQueue added in v1.1.2

func (a *AdminService) BindQueue(ctx context.Context, queue, exchange, routingKey string, opts ...BindingOption) error

BindQueue binds a queue to an exchange with optional binding options

func (*AdminService) DeclareClassicQueueWithDLQ added in v1.1.3

func (a *AdminService) DeclareClassicQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)

DeclareClassicQueueWithDLQ declares a classic queue with automatic DLX/DLQ creation

func (*AdminService) DeclareExchange added in v1.1.2

func (a *AdminService) DeclareExchange(ctx context.Context, name string, kind ExchangeType, opts ...ExchangeOption) error

DeclareExchange declares an exchange with the given options

func (*AdminService) DeclareHAQueueWithDLQ added in v1.1.3

func (a *AdminService) DeclareHAQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)

DeclareHAQueueWithDLQ declares an HA classic queue with automatic DLX/DLQ creation

func (*AdminService) DeclareQueue added in v1.1.2

func (a *AdminService) DeclareQueue(ctx context.Context, name string, opts ...QueueOption) (*Queue, error)

DeclareQueue declares a queue with the given options (Quorum queue by default)

func (*AdminService) DeclareQueueWithDLQ added in v1.1.3

func (a *AdminService) DeclareQueueWithDLQ(ctx context.Context, config QueueConfig) (*Queue, error)

DeclareQueueWithDLQ declares a queue using QueueConfig (simplified - no auto-creation)

func (*AdminService) DeclareQuorumQueue added in v1.1.2

func (a *AdminService) DeclareQuorumQueue(ctx context.Context, name string, opts ...QuorumQueueOption) (*Queue, error)

DeclareQuorumQueue declares a quorum queue with the given options

func (*AdminService) DeclareQuorumQueueWithDLQ added in v1.1.3

func (a *AdminService) DeclareQuorumQueueWithDLQ(ctx context.Context, name string, opts ...func(*QueueConfig)) (*Queue, error)

DeclareQuorumQueueWithDLQ declares a quorum queue with automatic DLX/DLQ creation

func (*AdminService) DeclareTopology added in v1.1.2

func (a *AdminService) DeclareTopology(ctx context.Context, topology *Topology) error

DeclareTopology declares a complete topology from a definition

func (*AdminService) DeleteExchange added in v1.1.2

func (a *AdminService) DeleteExchange(ctx context.Context, name string, opts ...DeleteExchangeOption) error

DeleteExchange deletes an exchange

func (*AdminService) DeleteQueue added in v1.1.2

func (a *AdminService) DeleteQueue(ctx context.Context, name string, opts ...DeleteQueueOption) error

DeleteQueue deletes a queue

func (*AdminService) ExchangeExists added in v1.2.0

func (a *AdminService) ExchangeExists(ctx context.Context, name string) (bool, error)

ExchangeExists checks if an exchange exists using passive declaration

func (*AdminService) ExchangeInfo added in v1.1.2

func (a *AdminService) ExchangeInfo(ctx context.Context, name string) (*ExchangeInfo, error)

ExchangeInfo returns detailed information about an exchange

func (*AdminService) InspectQueue added in v1.1.2

func (a *AdminService) InspectQueue(ctx context.Context, name string) (*QueueInfo, error)

InspectQueue returns detailed information about a queue

func (*AdminService) PurgeQueue added in v1.1.2

func (a *AdminService) PurgeQueue(ctx context.Context, name string) (int, error)

PurgeQueue purges all messages from a queue

func (*AdminService) QueueExists added in v1.2.0

func (a *AdminService) QueueExists(ctx context.Context, name string) (bool, error)

QueueExists checks if a queue exists using passive declaration

func (*AdminService) QueueInfo added in v1.1.2

func (a *AdminService) QueueInfo(ctx context.Context, name string) (*QueueInfo, error)

QueueInfo is an alias for InspectQueue to match the API documentation

func (*AdminService) SetupTopology added in v1.1.2

func (a *AdminService) SetupTopology(ctx context.Context, exchanges []ExchangeConfig, queues []QueueConfig, bindings []BindingConfig) error

SetupTopology sets up complete topology configuration with exchanges, queues, and bindings

func (*AdminService) UnbindQueue added in v1.1.2

func (a *AdminService) UnbindQueue(ctx context.Context, queue, exchange, routingKey string, opts ...BindingOption) error

UnbindQueue unbinds a queue from an exchange with optional binding options

type AuditLogger added in v1.1.2

type AuditLogger interface {
	LogConnection(clientID, username, host string)
	LogPublish(clientID, exchange, routingKey, messageID string)
	LogConsume(clientID, queue, messageID string)
	LogTopologyChange(clientID, operation, resource string)
}

AuditLogger interface for audit logging

func NewNopAuditLogger added in v1.1.2

func NewNopAuditLogger() AuditLogger

type BindingConfig

type BindingConfig struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
	NoWait       bool
	Arguments    map[string]any
}

BindingConfig holds configuration for binding a queue to an exchange

type BindingDeclaration added in v1.1.2

type BindingDeclaration struct {
	Queue      string         `json:"queue" yaml:"queue"`
	Exchange   string         `json:"exchange" yaml:"exchange"`
	RoutingKey string         `json:"routing_key" yaml:"routing_key"`
	Arguments  map[string]any `json:"arguments,omitempty" yaml:"arguments,omitempty"`
}

BindingDeclaration represents a binding declaration

type BindingOption added in v1.1.2

type BindingOption func(*bindingConfig)

BindingOption represents a functional option for binding configuration

func WithBindingArguments added in v1.1.2

func WithBindingArguments(args Table) BindingOption

WithBindingArguments sets arguments for the binding

func WithBindingHeaders added in v1.1.2

func WithBindingHeaders(headers map[string]any) BindingOption

WithBindingHeaders sets headers for the binding

func WithBindingNoWait added in v1.1.2

func WithBindingNoWait() BindingOption

WithBindingNoWait makes the binding operation not wait for server response

type Client added in v1.1.2

type Client struct {
	// contains filtered or unexported fields
}

Client represents the main RabbitMQ client with unified architecture

func NewClient added in v1.1.2

func NewClient(opts ...Option) (*Client, error)

NewClient creates a new RabbitMQ client with the specified options

func (*Client) Admin added in v1.1.2

func (c *Client) Admin() *AdminService

Service accessors

func (*Client) Close added in v1.1.2

func (c *Client) Close() error

func (*Client) ConnectionName added in v1.1.2

func (c *Client) ConnectionName() string

func (*Client) CreateChannel added in v1.1.2

func (c *Client) CreateChannel() (*amqp.Channel, error)

CreateChannel creates a new AMQP channel for advanced use cases This method is provided for implementing custom messaging patterns such as streams, sagas, or other experimental features

func (*Client) NewConsumer added in v1.1.2

func (c *Client) NewConsumer(opts ...ConsumerOption) (*Consumer, error)

NewConsumer creates a new consumer from the client

func (*Client) NewPublisher added in v1.1.2

func (c *Client) NewPublisher(opts ...PublisherOption) (*Publisher, error)

NewPublisher creates a new publisher from the client

func (*Client) Ping added in v1.1.2

func (c *Client) Ping(ctx context.Context) error

Health and connectivity methods

func (*Client) TopologyRegistry added in v1.2.0

func (c *Client) TopologyRegistry() *TopologyRegistry

func (*Client) TopologyValidator added in v1.2.0

func (c *Client) TopologyValidator() *TopologyValidator

TopologyValidator returns the topology validator for manual validation operations

func (*Client) URL added in v1.1.2

func (c *Client) URL() string

Connection information

type Closable

type Closable interface {
	Close() error
}

Closable interface for components that can be gracefully closed.

type ConnectionPooler added in v1.1.2

type ConnectionPooler interface {
	Get() (*Client, error)
	Close() error
	Stats() PoolStats
	Size() int
	HealthyCount() int
}

ConnectionPooler defines the interface for connection pool management. This is the core contract that all connection pool implementations must satisfy. For concrete implementations, use the pool sub-package.

func NewNopConnectionPooler added in v1.1.2

func NewNopConnectionPooler() ConnectionPooler

NewNopConnectionPooler creates a new no-op connection pooler. For advanced connection pooling capabilities, use the pool sub-package.

type ConsumeOption added in v1.1.2

type ConsumeOption func(*consumeConfig)

ConsumeOption represents a functional option for consumption behavior

func WithConsumeRetryPolicy added in v1.1.2

func WithConsumeRetryPolicy(policy RetryPolicy) ConsumeOption

WithConsumeRetryPolicy sets the retry policy for consumption

func WithDeadLetterPolicy added in v1.1.2

func WithDeadLetterPolicy(policy DeadLetterPolicy) ConsumeOption

WithDeadLetterPolicy sets the dead letter policy

func WithRejectRequeue added in v1.1.2

func WithRejectRequeue() ConsumeOption

WithRejectRequeue configures message rejection behavior

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles message consumption operations

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer and closes its channel

func (*Consumer) Consume

func (c *Consumer) Consume(ctx context.Context, queue string, handler MessageHandler, opts ...ConsumeOption) error

Consume starts consuming messages from the specified queue with optional settings

type ConsumerOption added in v1.1.2

type ConsumerOption func(*consumerConfig)

ConsumerOption represents a functional option for consumer configuration

func WithAutoAck added in v1.1.2

func WithAutoAck() ConsumerOption

WithAutoAck enables automatic acknowledgment

func WithConcurrency added in v1.1.2

func WithConcurrency(workers int) ConsumerOption

WithConcurrency sets the number of concurrent message processors

func WithConsumerCompression added in v1.1.2

func WithConsumerCompression(compressor MessageCompressor) ConsumerOption

WithConsumerCompression sets the compression for the consumer (for decompression)

func WithConsumerEncryption added in v1.1.2

func WithConsumerEncryption(encryptor MessageEncryptor) ConsumerOption

WithConsumerEncryption sets the encryption for the consumer (for decryption)

func WithConsumerRetryPolicy added in v1.1.2

func WithConsumerRetryPolicy(policy RetryPolicy) ConsumerOption

WithConsumerRetryPolicy sets the retry policy for message processing failures

func WithConsumerSerialization added in v1.1.2

func WithConsumerSerialization(serializer MessageSerializer) ConsumerOption

WithConsumerSerialization sets the serializer for the consumer (for deserialization)

func WithConsumerTag added in v1.1.2

func WithConsumerTag(tag string) ConsumerOption

WithConsumerTag sets the consumer tag

func WithExclusiveConsumer added in v1.1.2

func WithExclusiveConsumer() ConsumerOption

WithExclusiveConsumer makes the consumer exclusive

func WithMessageTimeout added in v1.1.2

func WithMessageTimeout(timeout time.Duration) ConsumerOption

WithMessageTimeout sets the timeout for processing each message

func WithNoLocal added in v1.1.2

func WithNoLocal() ConsumerOption

WithNoLocal prevents delivery of messages published on this connection

func WithNoWait added in v1.1.2

func WithNoWait() ConsumerOption

WithNoWait makes the consume operation not wait for server response

func WithPrefetchCount added in v1.1.2

func WithPrefetchCount(count int) ConsumerOption

WithPrefetchCount sets the prefetch count

func WithPrefetchSize added in v1.1.2

func WithPrefetchSize(size int) ConsumerOption

WithPrefetchSize sets the prefetch size in bytes

type DeadLetterAfterRetries added in v1.1.2

type DeadLetterAfterRetries struct{}

DeadLetterAfterRetries sends messages to DLQ after max retry attempts

func (*DeadLetterAfterRetries) ShouldDeadLetter added in v1.1.2

func (d *DeadLetterAfterRetries) ShouldDeadLetter(delivery *Delivery, attempts int) bool

type DeadLetterPolicy added in v1.1.2

type DeadLetterPolicy interface {
	ShouldDeadLetter(delivery *Delivery, attempts int) bool
}

DeadLetterPolicy defines what to do with messages after retries are exhausted

type DeleteExchangeOption added in v1.1.2

type DeleteExchangeOption func(*deleteExchangeConfig)

DeleteExchangeOption represents a functional option for exchange deletion

type DeleteQueueOption added in v1.1.2

type DeleteQueueOption func(*deleteQueueConfig)

DeleteQueueOption represents a functional option for queue deletion

type Delivery added in v1.1.2

type Delivery struct {
	amqp.Delivery

	// Additional metadata
	ReceivedAt time.Time
}

Delivery wraps amqp.Delivery with additional helper methods

func (*Delivery) Ack added in v1.1.2

func (d *Delivery) Ack() error

Ack acknowledges the message

func (*Delivery) GetMessage added in v1.1.2

func (d *Delivery) GetMessage() *Message

GetMessage converts the delivery to a Message struct

func (*Delivery) IsRedelivered added in v1.1.2

func (d *Delivery) IsRedelivered() bool

IsRedelivered returns true if the message was redelivered

func (*Delivery) MessageID added in v1.1.2

func (d *Delivery) MessageID() string

MessageID returns the message ID

func (*Delivery) Nack added in v1.1.2

func (d *Delivery) Nack(requeue bool) error

Nack negatively acknowledges the message with requeue option

func (*Delivery) Reject added in v1.1.2

func (d *Delivery) Reject(requeue bool) error

Reject rejects the message with requeue option

func (*Delivery) Timestamp added in v1.1.2

func (d *Delivery) Timestamp() time.Time

Timestamp returns the message timestamp

type DeliveryInfo

type DeliveryInfo struct {
	MessageCount uint32
	Exchange     string
	RoutingKey   string
	Redelivered  bool
	DeliveryTag  uint64
	// Message metadata from AMQP properties
	MessageID     string
	CorrelationID string
	ReplyTo       string
	Type          string
	AppID         string
	UserID        string
	Timestamp     time.Time
	ContentType   string
	Priority      uint8
	Headers       map[string]any
}

DeliveryInfo contains information about a delivered message

func ExtractDeliveryInfo

func ExtractDeliveryInfo(delivery *amqp.Delivery) DeliveryInfo

ExtractDeliveryInfo extracts delivery information from an AMQP delivery

type EnvConfig added in v1.1.0

type EnvConfig struct {
	// Connection basics
	Username string   `env:"RABBITMQ_USERNAME,default=guest"`
	Password string   `env:"RABBITMQ_PASSWORD,default=guest"`
	Hosts    []string `env:"RABBITMQ_HOSTS,default=localhost:5672"`
	VHost    string   `env:"RABBITMQ_VHOST,default=/"`

	// Protocol and security
	Protocol    string `env:"RABBITMQ_PROTOCOL,default=amqp"` // amqp or amqps
	TLSEnabled  bool   `env:"RABBITMQ_TLS_ENABLED,default=false"`
	TLSInsecure bool   `env:"RABBITMQ_TLS_INSECURE,default=false"` // Skip cert verification

	// HTTP Management API
	HTTPProtocol string `env:"RABBITMQ_HTTP_PROTOCOL,default=http"` // http or https
	HTTPPort     int    `env:"RABBITMQ_HTTP_PORT,default=15672"`
	HTTPHost     string `env:"RABBITMQ_HTTP_HOST,default=localhost"` // Separate host for HTTP API

	// Connection behavior
	ConnectionName string        `env:"RABBITMQ_CONNECTION_NAME,default=go-rabbitmq"`
	Heartbeat      time.Duration `env:"RABBITMQ_HEARTBEAT,default=10s"`
	RetryAttempts  int           `env:"RABBITMQ_RETRY_ATTEMPTS,default=5"`
	RetryDelay     time.Duration `env:"RABBITMQ_RETRY_DELAY,default=2s"`

	// Timeouts
	DialTimeout    time.Duration `env:"RABBITMQ_DIAL_TIMEOUT,default=30s"`
	ChannelTimeout time.Duration `env:"RABBITMQ_CHANNEL_TIMEOUT,default=10s"`

	// Auto-reconnection
	AutoReconnect        bool          `env:"RABBITMQ_AUTO_RECONNECT,default=true"`
	ReconnectDelay       time.Duration `env:"RABBITMQ_RECONNECT_DELAY,default=5s"`
	MaxReconnectAttempts int           `env:"RABBITMQ_MAX_RECONNECT_ATTEMPTS,default=0"` // 0 = unlimited

	// Publisher settings
	PublisherConfirmationTimeout time.Duration `env:"RABBITMQ_PUBLISHER_CONFIRMATION_TIMEOUT,default=5s"`
	PublisherShutdownTimeout     time.Duration `env:"RABBITMQ_PUBLISHER_SHUTDOWN_TIMEOUT,default=15s"`
	PublisherPersistent          bool          `env:"RABBITMQ_PUBLISHER_PERSISTENT,default=true"`

	// Consumer settings
	ConsumerPrefetchCount   int           `env:"RABBITMQ_CONSUMER_PREFETCH_COUNT,default=1"`
	ConsumerAutoAck         bool          `env:"RABBITMQ_CONSUMER_AUTO_ACK,default=false"`
	ConsumerMessageTimeout  time.Duration `env:"RABBITMQ_CONSUMER_MESSAGE_TIMEOUT,default=5m"`
	ConsumerShutdownTimeout time.Duration `env:"RABBITMQ_CONSUMER_SHUTDOWN_TIMEOUT,default=30s"`

	// Topology validation and auto-healing (enabled by default for production reliability)
	TopologyValidation           bool          `env:"RABBITMQ_TOPOLOGY_VALIDATION,default=true"`
	TopologyAutoRecreation       bool          `env:"RABBITMQ_TOPOLOGY_AUTO_RECREATION,default=true"`
	TopologyBackgroundValidation bool          `env:"RABBITMQ_TOPOLOGY_BACKGROUND_VALIDATION,default=true"`
	TopologyValidationInterval   time.Duration `env:"RABBITMQ_TOPOLOGY_VALIDATION_INTERVAL,default=30s"`
}

EnvConfig holds all RabbitMQ configuration that can be loaded from environment variables

func (*EnvConfig) BuildAMQPURL added in v1.1.0

func (e *EnvConfig) BuildAMQPURL() string

BuildAMQPURL constructs the AMQP connection URL from environment configuration For multiple hosts, returns the first host URL (AMQP library handles failover)

func (*EnvConfig) BuildAMQPURLs added in v1.1.2

func (e *EnvConfig) BuildAMQPURLs() []string

BuildAMQPURLs constructs all AMQP connection URLs for failover support

func (*EnvConfig) BuildHTTPURL added in v1.1.0

func (e *EnvConfig) BuildHTTPURL() string

BuildHTTPURL constructs the HTTP management API URL from environment configuration

type Error

type Error struct {
	Type    string
	Message string
	Cause   error
}

Error types for better error handling

func NewConnectionError

func NewConnectionError(message string, cause error) *Error

NewConnectionError creates a new connection error

func NewConsumeError

func NewConsumeError(message string, cause error) *Error

NewConsumeError creates a new consume error

func NewPublishError

func NewPublishError(message string, cause error) *Error

NewPublishError creates a new publish error

func (*Error) Error

func (e *Error) Error() string

func (*Error) Unwrap

func (e *Error) Unwrap() error

type ExchangeConfig

type ExchangeConfig struct {
	Name       string
	Type       ExchangeType
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Arguments  map[string]any
}

ExchangeConfig holds configuration for declaring an exchange

type ExchangeDeclaration added in v1.1.2

type ExchangeDeclaration struct {
	Name       string         `json:"name" yaml:"name"`
	Type       ExchangeType   `json:"type" yaml:"type"`
	Durable    bool           `json:"durable" yaml:"durable"`
	AutoDelete bool           `json:"auto_delete" yaml:"auto_delete"`
	Internal   bool           `json:"internal" yaml:"internal"`
	Arguments  map[string]any `json:"arguments,omitempty" yaml:"arguments,omitempty"`
}

ExchangeDeclaration represents an exchange declaration

type ExchangeInfo added in v1.1.2

type ExchangeInfo struct {
	Name       string
	Type       string
	Durable    bool
	AutoDelete bool
	Internal   bool
	Arguments  Table
	VHost      string
}

ExchangeInfo represents detailed exchange information

type ExchangeOption added in v1.1.2

type ExchangeOption func(*exchangeConfig)

ExchangeOption represents a functional option for exchange configuration

func WithExchangeArguments added in v1.1.2

func WithExchangeArguments(args Table) ExchangeOption

WithExchangeArguments sets exchange arguments

func WithExchangeAutoDelete added in v1.1.2

func WithExchangeAutoDelete() ExchangeOption

WithExchangeAutoDelete makes the exchange auto-delete

func WithExchangeDurable added in v1.1.2

func WithExchangeDurable() ExchangeOption

WithExchangeDurable makes the exchange durable

func WithExchangeInternal added in v1.1.2

func WithExchangeInternal() ExchangeOption

WithExchangeInternal makes the exchange internal

type ExchangeType

type ExchangeType string

ExchangeType represents different types of exchanges

const (
	ExchangeTypeDirect  ExchangeType = "direct"
	ExchangeTypeFanout  ExchangeType = "fanout"
	ExchangeTypeTopic   ExchangeType = "topic"
	ExchangeTypeHeaders ExchangeType = "headers"
)

type ExponentialBackoff added in v1.1.2

type ExponentialBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
	MaxAttempts  int
}

ExponentialBackoff implements exponential backoff reconnection policy

func (*ExponentialBackoff) NextDelay added in v1.1.2

func (e *ExponentialBackoff) NextDelay(attempt int) time.Duration

func (*ExponentialBackoff) ShouldRetry added in v1.1.2

func (e *ExponentialBackoff) ShouldRetry(attempt int, err error) bool

type ExponentialBackoffPolicy added in v1.1.2

type ExponentialBackoffPolicy struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	MaxAttempts  int
	Multiplier   float64
}

ExponentialBackoffPolicy implements exponential backoff retry

func (ExponentialBackoffPolicy) NextDelay added in v1.1.2

func (e ExponentialBackoffPolicy) NextDelay(attempt int) time.Duration

func (ExponentialBackoffPolicy) ShouldRetry added in v1.1.2

func (e ExponentialBackoffPolicy) ShouldRetry(attempt int, err error) bool

type FixedDelay added in v1.1.2

type FixedDelay struct {
	Delay       time.Duration
	MaxAttempts int
}

FixedDelay implements fixed delay reconnection policy

func (*FixedDelay) NextDelay added in v1.1.2

func (f *FixedDelay) NextDelay(attempt int) time.Duration

func (*FixedDelay) ShouldRetry added in v1.1.2

func (f *FixedDelay) ShouldRetry(attempt int, err error) bool

type GracefulShutdown added in v1.1.2

type GracefulShutdown interface {
	RegisterComponent(component Closable) error
	ShutdownGracefully(ctx context.Context) error
	SetupSignalHandling() <-chan struct{}
	IsShutdownComplete() bool
}

GracefulShutdown defines the interface for graceful shutdown coordination. This is the core contract that all shutdown implementations must satisfy. For concrete implementations, use the shutdown sub-package.

func NewNopGracefulShutdown added in v1.1.2

func NewNopGracefulShutdown() GracefulShutdown

NewNopGracefulShutdown creates a new no-op graceful shutdown handler. For advanced shutdown coordination capabilities, use the shutdown sub-package.

type LinearBackoff added in v1.1.2

type LinearBackoff struct {
	Delay       time.Duration
	MaxAttempts int
}

LinearBackoff implements linear backoff reconnection policy

func (*LinearBackoff) NextDelay added in v1.1.2

func (l *LinearBackoff) NextDelay(attempt int) time.Duration

func (*LinearBackoff) ShouldRetry added in v1.1.2

func (l *LinearBackoff) ShouldRetry(attempt int, err error) bool

type Logger added in v1.1.2

type Logger interface {
	// Debug logs a debug message with optional structured fields
	Debug(msg string, fields ...any)

	// Info logs an informational message with optional structured fields
	Info(msg string, fields ...any)

	// Warn logs a warning message with optional structured fields
	Warn(msg string, fields ...any)

	// Error logs an error message with optional structured fields
	Error(msg string, fields ...any)
}

Logger interface for structured logging Users can implement this interface to integrate their preferred logging solution.

func NewNopLogger added in v1.1.2

func NewNopLogger() Logger

NewNopLogger creates a new no-operation logger

type Message

type Message struct {
	Body        []byte
	ContentType string
	Headers     map[string]any
	Exchange    string
	RoutingKey  string
	Persistent  bool
	// Message identification and tracing
	MessageID     string // Unique message identifier (auto-generated if empty)
	CorrelationID string // Correlation ID for request-response patterns
	ReplyTo       string // Reply queue for RPC patterns
	// Message metadata
	Type   string // Message type/schema identifier
	AppID  string // Application ID that originated the message
	UserID string // User ID (if authenticated)
	// Timing and expiration
	Timestamp  int64  // Unix timestamp when message was created
	Expiration string // Message expiration (in milliseconds as string)
	// Message priority (0-255, higher = more priority)
	Priority uint8
}

Message represents a message with metadata

func FromAMQPDelivery added in v1.1.2

func FromAMQPDelivery(delivery amqp.Delivery) *Message

FromAMQPDelivery creates a Message from amqp.Delivery (for consumer API)

func NewJSONMessage

func NewJSONMessage(v interface{}) (*Message, error)

NewJSONMessage creates a new Message for JSON content by marshaling the provided value

func NewMessage

func NewMessage(body []byte) *Message

NewMessage creates a new Message with auto-generated ID and timestamp

func NewMessageWithID

func NewMessageWithID(body []byte, messageID string) *Message

NewMessageWithID creates a new Message with a specific ID

func NewTextMessage

func NewTextMessage(body []byte) *Message

NewTextMessage creates a new Message for plain text content

func (*Message) Clone added in v1.1.2

func (m *Message) Clone() *Message

Clone creates a deep copy of the message

func (*Message) ToAMQPPublishing added in v1.1.2

func (m *Message) ToAMQPPublishing() amqp.Publishing

ToAMQPPublishing converts the message to amqp.Publishing for the new API

func (*Message) ToPublishing

func (m *Message) ToPublishing() amqp.Publishing

ToPublishing converts a Message to amqp.Publishing

func (*Message) Validate added in v1.1.2

func (m *Message) Validate() error

Validate checks if the message is valid for publishing

func (*Message) WithAppID

func (m *Message) WithAppID(appID string) *Message

WithAppID sets the application ID

func (*Message) WithContentType added in v1.1.2

func (m *Message) WithContentType(contentType string) *Message

WithContentType sets the content type for the message

func (*Message) WithCorrelationID

func (m *Message) WithCorrelationID(correlationID string) *Message

WithCorrelationID sets the correlation ID for request-response patterns

func (*Message) WithExpiration

func (m *Message) WithExpiration(expiration time.Duration) *Message

WithExpiration sets message expiration in duration

func (*Message) WithHeader

func (m *Message) WithHeader(key string, value any) *Message

WithHeader adds a custom header to the message

func (*Message) WithHeaders

func (m *Message) WithHeaders(headers map[string]any) *Message

WithHeaders adds multiple custom headers to the message

func (*Message) WithMessageID added in v1.1.2

func (m *Message) WithMessageID(id string) *Message

WithMessageID sets the message ID

func (*Message) WithPersistent added in v1.1.2

func (m *Message) WithPersistent() *Message

WithPersistent sets the message to be persistent

func (*Message) WithPriority

func (m *Message) WithPriority(priority uint8) *Message

WithPriority sets message priority (0-255, higher = more priority)

func (*Message) WithReplyTo

func (m *Message) WithReplyTo(replyTo string) *Message

WithReplyTo sets the reply queue for RPC patterns

func (*Message) WithTimestamp added in v1.1.2

func (m *Message) WithTimestamp(t time.Time) *Message

WithTimestamp sets the message timestamp

func (*Message) WithTransient added in v1.1.2

func (m *Message) WithTransient() *Message

WithTransient sets the message to be transient (non-persistent)

func (*Message) WithType

func (m *Message) WithType(messageType string) *Message

WithType sets the message type/schema identifier

func (*Message) WithUserID

func (m *Message) WithUserID(userID string) *Message

WithUserID sets the user ID (if authenticated)

type MessageCompressor added in v1.1.2

type MessageCompressor interface {
	Compress(data []byte) ([]byte, error)
	Decompress(data []byte) ([]byte, error)
	Algorithm() string
	Threshold() int // Minimum size to trigger compression
}

MessageCompressor defines the interface for message compression/decompression. This is the core contract that all compression implementations must satisfy. For concrete implementations, use the compression sub-package.

func NewNopCompressor added in v1.1.2

func NewNopCompressor() MessageCompressor

NewNopCompressor creates a new no-op compressor. For advanced compression capabilities, use the compression sub-package.

type MessageEncryptor added in v1.1.2

type MessageEncryptor interface {
	Encrypt(data []byte) ([]byte, error)
	Decrypt(data []byte) ([]byte, error)
	Algorithm() string
}

MessageEncryptor defines the interface for message encryption/decryption

func NewNopEncryptor added in v1.1.2

func NewNopEncryptor() MessageEncryptor

NewNopEncryptor creates a new no-op encryptor. For advanced encryption capabilities, use the encryption sub-package.

type MessageHandler

type MessageHandler func(ctx context.Context, delivery *Delivery) error

MessageHandler is the function signature for handling consumed messages

type MessageSerializer added in v1.1.2

type MessageSerializer interface {
	Serialize(msg any) ([]byte, error)
	Deserialize(data []byte, target any) error
	ContentType() string
}

MessageSerializer defines the interface for message serialization/deserialization. This is the core contract that all serialization implementations must satisfy. For concrete implementations, use the protobuf sub-package or other serialization packages.

func NewNopSerializer added in v1.1.2

func NewNopSerializer() MessageSerializer

NewNopSerializer creates a new no-op serializer. For advanced serialization capabilities, use the protobuf or other serialization sub-packages.

type MetricsCollector added in v1.1.2

type MetricsCollector interface {
	// Connection metrics
	RecordConnection(connectionName string)
	RecordConnectionAttempt(success bool, duration time.Duration)
	RecordReconnection(attempt int)

	// Publishing metrics
	RecordPublish(exchange, routingKey string, messageSize int, duration time.Duration)
	RecordPublishConfirmation(success bool, duration time.Duration)

	// Consumption metrics
	RecordConsume(queue string, messageSize int, duration time.Duration)
	RecordMessageReceived(queue string)
	RecordMessageProcessed(queue string, success bool, duration time.Duration)
	RecordMessageRequeued(queue string)

	// Health and error metrics
	RecordHealthCheck(success bool, duration time.Duration)
	RecordError(operation string, err error)
}

MetricsCollector interface for collecting metrics

func NewNopMetrics added in v1.1.2

func NewNopMetrics() MetricsCollector

type NoRetryPolicy added in v1.1.2

type NoRetryPolicy struct{}

NoRetryPolicy never retries operations

func (NoRetryPolicy) NextDelay added in v1.1.2

func (n NoRetryPolicy) NextDelay(attempt int) time.Duration

func (NoRetryPolicy) ShouldRetry added in v1.1.2

func (n NoRetryPolicy) ShouldRetry(attempt int, err error) bool

type NopAuditLogger added in v1.1.2

type NopAuditLogger struct{}

NopAuditLogger is a no-operation audit logger

func (*NopAuditLogger) LogConnection added in v1.1.2

func (n *NopAuditLogger) LogConnection(clientID, username, host string)

func (*NopAuditLogger) LogConsume added in v1.1.2

func (n *NopAuditLogger) LogConsume(clientID, queue, messageID string)

func (*NopAuditLogger) LogPublish added in v1.1.2

func (n *NopAuditLogger) LogPublish(clientID, exchange, routingKey, messageID string)

func (*NopAuditLogger) LogTopologyChange added in v1.1.2

func (n *NopAuditLogger) LogTopologyChange(clientID, operation, resource string)

type NopLogger added in v1.1.2

type NopLogger struct{}

NopLogger is a no-operation logger that produces no output. This is used as the default logger when no logger is provided.

func (*NopLogger) Debug added in v1.1.2

func (n *NopLogger) Debug(msg string, fields ...any)

Debug implements Logger.Debug with no operation

func (*NopLogger) Error added in v1.1.2

func (n *NopLogger) Error(msg string, fields ...any)

Error implements Logger.Error with no operation

func (*NopLogger) Info added in v1.1.2

func (n *NopLogger) Info(msg string, fields ...any)

Info implements Logger.Info with no operation

func (*NopLogger) Warn added in v1.1.2

func (n *NopLogger) Warn(msg string, fields ...any)

Warn implements Logger.Warn with no operation

type NopMetrics added in v1.1.2

type NopMetrics struct{}

NopMetrics is a no-operation metrics collector

func (*NopMetrics) RecordConnection added in v1.1.2

func (n *NopMetrics) RecordConnection(connectionName string)

func (*NopMetrics) RecordConnectionAttempt added in v1.1.2

func (n *NopMetrics) RecordConnectionAttempt(success bool, duration time.Duration)

func (*NopMetrics) RecordConsume added in v1.1.2

func (n *NopMetrics) RecordConsume(queue string, messageSize int, duration time.Duration)

func (*NopMetrics) RecordError added in v1.1.2

func (n *NopMetrics) RecordError(operation string, err error)

func (*NopMetrics) RecordHealthCheck added in v1.1.2

func (n *NopMetrics) RecordHealthCheck(success bool, duration time.Duration)

func (*NopMetrics) RecordMessageProcessed added in v1.1.2

func (n *NopMetrics) RecordMessageProcessed(queue string, success bool, duration time.Duration)

func (*NopMetrics) RecordMessageReceived added in v1.1.2

func (n *NopMetrics) RecordMessageReceived(queue string)

func (*NopMetrics) RecordMessageRequeued added in v1.1.2

func (n *NopMetrics) RecordMessageRequeued(queue string)

func (*NopMetrics) RecordPublish added in v1.1.2

func (n *NopMetrics) RecordPublish(exchange, routingKey string, messageSize int, duration time.Duration)

func (*NopMetrics) RecordPublishConfirmation added in v1.1.2

func (n *NopMetrics) RecordPublishConfirmation(success bool, duration time.Duration)

func (*NopMetrics) RecordReconnection added in v1.1.2

func (n *NopMetrics) RecordReconnection(attempt int)

type NopSpan added in v1.1.2

type NopSpan struct{}

NopSpan is a no-operation span

func (*NopSpan) End added in v1.1.2

func (n *NopSpan) End()

func (*NopSpan) SetAttribute added in v1.1.2

func (n *NopSpan) SetAttribute(key string, value any)

func (*NopSpan) SetStatus added in v1.1.2

func (n *NopSpan) SetStatus(code SpanStatusCode, description string)

type NopTracer added in v1.1.2

type NopTracer struct{}

NopTracer is a no-operation tracer

func (*NopTracer) StartSpan added in v1.1.2

func (n *NopTracer) StartSpan(ctx context.Context, operation string) (context.Context, Span)

type Option added in v1.1.2

type Option func(*clientConfig) error

Option represents a functional option for configuring the Client

func FromEnv added in v1.1.2

func FromEnv() Option

FromEnv creates a client option that loads configuration from environment variables

func FromEnvWithPrefix added in v1.1.2

func FromEnvWithPrefix(prefix string) Option

FromEnvWithPrefix creates a client option that loads configuration from environment variables with a custom prefix

func WithAccessPolicy added in v1.1.2

func WithAccessPolicy(policy *AccessPolicy) Option

WithAccessPolicy sets the access control policy

func WithAuditLogging added in v1.1.2

func WithAuditLogging(logger AuditLogger) Option

WithAuditLogging sets the audit logger

func WithAutoReconnect added in v1.1.2

func WithAutoReconnect(enabled bool) Option

WithAutoReconnect enables or disables automatic reconnection

func WithChannelTimeout added in v1.1.2

func WithChannelTimeout(timeout time.Duration) Option

WithChannelTimeout sets the channel operation timeout

func WithConnectionName added in v1.1.2

func WithConnectionName(name string) Option

WithConnectionName sets the connection name

func WithConnectionPooler added in v1.1.2

func WithConnectionPooler(pooler ConnectionPooler) Option

WithConnectionPooler sets the connection pooler

func WithCredentials added in v1.1.2

func WithCredentials(username, password string) Option

WithCredentials sets the username and password

func WithDialTimeout added in v1.1.2

func WithDialTimeout(timeout time.Duration) Option

WithDialTimeout sets the connection dial timeout

func WithGracefulShutdown added in v1.1.2

func WithGracefulShutdown(shutdown GracefulShutdown) Option

WithGracefulShutdown sets the graceful shutdown handler

func WithHeartbeat added in v1.1.2

func WithHeartbeat(duration time.Duration) Option

WithHeartbeat sets the heartbeat interval

func WithHosts added in v1.1.2

func WithHosts(hosts ...string) Option

WithHosts sets multiple RabbitMQ hosts for failover support Each host should include port (e.g., "host1:5672,host2:5673") If no port is specified, defaults to 5672

func WithInsecureTLS added in v1.1.2

func WithInsecureTLS() Option

WithInsecureTLS enables TLS with certificate verification disabled

func WithLogger added in v1.1.2

func WithLogger(logger Logger) Option

WithLogger sets the logger

func WithMaxReconnectAttempts added in v1.1.2

func WithMaxReconnectAttempts(attempts int) Option

WithMaxReconnectAttempts sets the maximum number of reconnection attempts

func WithMetrics added in v1.1.2

func WithMetrics(metrics MetricsCollector) Option

WithMetrics sets the metrics collector

func WithPerformanceMonitoring added in v1.1.2

func WithPerformanceMonitoring(monitor PerformanceMonitor) Option

WithPerformanceMonitoring sets the performance monitor

func WithReconnectDelay added in v1.1.2

func WithReconnectDelay(delay time.Duration) Option

WithReconnectDelay sets the delay between reconnection attempts

func WithReconnectPolicy added in v1.1.2

func WithReconnectPolicy(policy ReconnectPolicy) Option

WithReconnectPolicy sets the reconnection policy

func WithSagaOrchestrator added in v1.1.2

func WithSagaOrchestrator(orchestrator SagaOrchestrator) Option

WithSagaOrchestrator sets the saga orchestrator

func WithStreamHandler added in v1.1.2

func WithStreamHandler(handler StreamHandler) Option

WithStreamHandler sets the stream handler

func WithTLS added in v1.1.2

func WithTLS(tlsConfig *tls.Config) Option

WithTLS sets the TLS configuration

func WithTopologyAutoRecreation added in v1.2.0

func WithTopologyAutoRecreation() Option

WithTopologyAutoRecreation enables automatic recreation of missing topology Requires TopologyValidation to be enabled

func WithTopologyBackgroundValidation added in v1.2.0

func WithTopologyBackgroundValidation(interval time.Duration) Option

WithTopologyBackgroundValidation enables background topology validation with a custom interval. Background validation is enabled by default (30s interval). Requires TopologyValidation to be enabled

func WithTopologyValidation added in v1.2.0

func WithTopologyValidation() Option

WithTopologyValidation enables topology validation When enabled, the client will track declared topology and validate it before operations

func WithTopologyValidationInterval added in v1.2.0

func WithTopologyValidationInterval(interval time.Duration) Option

WithTopologyValidationInterval sets the background validation interval This is a convenience method that automatically enables background validation

func WithTracing added in v1.1.2

func WithTracing(tracer Tracer) Option

WithTracing sets the tracer

func WithVHost added in v1.1.2

func WithVHost(vhost string) Option

WithVHost sets the virtual host

func WithoutTopologyAutoRecreation added in v1.2.0

func WithoutTopologyAutoRecreation() Option

WithoutTopologyAutoRecreation disables automatic recreation while keeping validation Topology will be validated but not automatically recreated if missing

func WithoutTopologyBackgroundValidation added in v1.2.0

func WithoutTopologyBackgroundValidation() Option

WithoutTopologyBackgroundValidation disables background validation while keeping on-demand validation Topology will be validated before operations but not periodically validated in the background

func WithoutTopologyValidation added in v1.2.0

func WithoutTopologyValidation() Option

WithoutTopologyValidation disables topology validation This disables all topology tracking, validation, and auto-recreation

type PerformanceMonitor added in v1.1.2

type PerformanceMonitor interface {
	// RecordPublish records a publish operation with its success status and duration
	RecordPublish(success bool, duration time.Duration)

	// RecordConsume records a consume operation with its success status and duration
	RecordConsume(success bool, duration time.Duration)
}

PerformanceMonitor provides performance monitoring capabilities for RabbitMQ operations. This is the core contract that all performance monitoring implementations must satisfy. For concrete implementations, use the performance sub-package.

func NewNopPerformanceMonitor added in v1.1.2

func NewNopPerformanceMonitor() PerformanceMonitor

NewNopPerformanceMonitor creates a new no-op performance monitor. For advanced monitoring capabilities, use the performance sub-package.

type PoolStats added in v1.1.2

type PoolStats struct {
	TotalConnections   int64
	HealthyConnections int64
	FailedConnections  int64
	RepairAttempts     int64
	LastHealthCheck    time.Time
}

PoolStats provides statistics about a connection pool.

type PublishRequest added in v1.1.2

type PublishRequest struct {
	Exchange   string
	RoutingKey string
	Message    *Message
}

PublishRequest represents a single publish operation

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher handles message publishing operations

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher and its channel

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, exchange, routingKey string, message *Message) error

Publish publishes a message to the specified exchange and routing key

func (*Publisher) PublishBatch added in v1.1.2

func (p *Publisher) PublishBatch(ctx context.Context, messages []PublishRequest) error

PublishBatch publishes multiple messages in a batch

type PublisherOption added in v1.1.2

type PublisherOption func(*publisherConfig)

PublisherOption represents a functional option for publisher configuration

func WithCompression added in v1.1.2

func WithCompression(compressor MessageCompressor) PublisherOption

WithCompression sets the message compressor for the publisher

func WithCompressionThreshold added in v1.1.2

func WithCompressionThreshold(threshold int) PublisherOption

WithCompressionThreshold sets the compression threshold for the publisher

func WithConfirmation added in v1.1.2

func WithConfirmation(timeout time.Duration) PublisherOption

WithConfirmation enables publish confirmations for the publisher. The provided timeout is used to wait for each acknowledgement. This makes the publisher reliable but slower as each Publish call will block until the broker confirms the message or the timeout expires.

func WithDefaultExchange added in v1.1.2

func WithDefaultExchange(exchange string) PublisherOption

WithDefaultExchange sets the default exchange for publishing

func WithEncryption added in v1.1.2

func WithEncryption(encryptor MessageEncryptor) PublisherOption

WithEncryption sets the message encryptor for the publisher

func WithImmediate added in v1.1.2

func WithImmediate() PublisherOption

WithImmediate enables immediate publishing

func WithMandatory added in v1.1.2

func WithMandatory() PublisherOption

WithMandatory enables mandatory publishing

func WithPersistent added in v1.1.2

func WithPersistent() PublisherOption

WithPersistent makes all messages persistent by default

func WithRetryPolicy added in v1.1.2

func WithRetryPolicy(policy RetryPolicy) PublisherOption

WithRetryPolicy sets the retry policy for failed publishes

func WithSerializer added in v1.1.2

func WithSerializer(serializer MessageSerializer) PublisherOption

WithSerializer sets the message serializer for the publisher

type Queue added in v1.1.2

type Queue struct {
	Name      string
	Messages  int
	Consumers int
}

Queue represents a declared queue

type QueueConfig

type QueueConfig struct {
	Name       string
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	NoWait     bool
	Arguments  map[string]any
	// Cluster-aware settings
	QueueType            QueueType // Queue type: classic, quorum, stream
	HighAvailability     bool      // Enable HA for classic queues
	ReplicationFactor    int       // Replication factor for quorum queues (default: 3)
	MaxLength            int       // Maximum queue length (0 = unlimited)
	MaxLengthBytes       int       // Maximum queue size in bytes (0 = unlimited)
	MessageTTL           int       // Message TTL in milliseconds (0 = no TTL)
	DeadLetterExchange   string    // Dead letter exchange name
	DeadLetterRoutingKey string    // Dead letter routing key
}

QueueConfig holds configuration for declaring a queue

func DefaultClassicQueueConfig

func DefaultClassicQueueConfig(name string) QueueConfig

DefaultClassicQueueConfig returns a basic durable classic queue configuration

func DefaultHAQueueConfig

func DefaultHAQueueConfig(name string) QueueConfig

DefaultHAQueueConfig returns a production-ready HA classic queue configuration

func DefaultQuorumQueueConfig

func DefaultQuorumQueueConfig(name string) QueueConfig

DefaultQuorumQueueConfig returns a production-ready quorum queue configuration

func (*QueueConfig) ToArguments

func (q *QueueConfig) ToArguments() map[string]any

ToArguments converts the QueueConfig to RabbitMQ queue arguments

func (*QueueConfig) WithCustomDeadLetter

func (q *QueueConfig) WithCustomDeadLetter(dlxName, routingKey string) *QueueConfig

WithCustomDeadLetter configures a custom dead letter exchange

func (*QueueConfig) WithoutDeadLetter

func (q *QueueConfig) WithoutDeadLetter() *QueueConfig

WithoutDeadLetter clears any dead letter configuration

type QueueDeclaration added in v1.1.2

type QueueDeclaration struct {
	Name       string         `json:"name" yaml:"name"`
	Durable    bool           `json:"durable" yaml:"durable"`
	AutoDelete bool           `json:"auto_delete" yaml:"auto_delete"`
	Exclusive  bool           `json:"exclusive" yaml:"exclusive"`
	Arguments  map[string]any `json:"arguments,omitempty" yaml:"arguments,omitempty"`
	TTL        time.Duration  `json:"ttl,omitempty" yaml:"ttl,omitempty"`
	DeadLetter string         `json:"dead_letter,omitempty" yaml:"dead_letter,omitempty"`
}

QueueDeclaration represents a queue declaration

type QueueInfo added in v1.1.2

type QueueInfo struct {
	Name       string
	Messages   int
	Consumers  int
	Memory     int64
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Arguments  Table
	VHost      string
	Node       string
}

QueueInfo represents detailed queue information

type QueueOption added in v1.1.2

type QueueOption func(*queueConfig)

QueueOption represents a functional option for queue configuration

func WithArguments added in v1.1.2

func WithArguments(args Table) QueueOption

WithArguments sets custom queue arguments

func WithAutoDelete added in v1.1.2

func WithAutoDelete() QueueOption

WithAutoDelete makes the queue auto-delete when no longer used

func WithClassicQueue added in v1.1.3

func WithClassicQueue() QueueOption

WithClassicQueue forces the queue to be a classic queue instead of the default quorum queue

func WithDeadLetter added in v1.1.2

func WithDeadLetter(exchange, routingKey string) QueueOption

WithDeadLetter configures dead letter exchange and routing key

func WithDeadLetterTTL added in v1.1.2

func WithDeadLetterTTL(ttl time.Duration) QueueOption

WithDeadLetterTTL sets the TTL for messages in dead letter queue

func WithDeliveryLimit added in v1.1.3

func WithDeliveryLimit(limit int) QueueOption

WithDeliveryLimit sets the delivery limit for quorum queues (when using DeclareQueue with default quorum)

func WithDurable added in v1.1.2

func WithDurable() QueueOption

WithDurable makes the queue durable

func WithExclusiveQueue added in v1.1.2

func WithExclusiveQueue() QueueOption

WithExclusiveQueue makes the queue exclusive to this connection

func WithMaxLength added in v1.1.2

func WithMaxLength(length int64) QueueOption

WithMaxLength sets the maximum queue length

func WithMaxLengthBytes added in v1.1.2

func WithMaxLengthBytes(bytes int64) QueueOption

WithMaxLengthBytes sets the maximum queue size in bytes

func WithQuorumGroupSize added in v1.1.3

func WithQuorumGroupSize(size int) QueueOption

WithQuorumGroupSize sets the initial group size for quorum queues (when using DeclareQueue with default quorum)

func WithTTL added in v1.1.2

func WithTTL(ttl time.Duration) QueueOption

WithTTL sets the message TTL for the queue

func WithoutDLQ added in v1.1.3

func WithoutDLQ() QueueOption

WithoutDLQ clears any dead letter configuration

type QueueType

type QueueType string

QueueType represents the type of queue

const (
	QueueTypeClassic QueueType = "classic"
	QueueTypeQuorum  QueueType = "quorum"
	QueueTypeStream  QueueType = "stream"
)

type QuorumQueueOption added in v1.1.2

type QuorumQueueOption func(*quorumQueueConfig)

QuorumQueueOption represents a functional option for quorum queue configuration

func WithInitialGroupSize added in v1.1.2

func WithInitialGroupSize(size int) QuorumQueueOption

WithInitialGroupSize sets the initial group size for quorum queues

func WithQuorumDeadLetter added in v1.2.0

func WithQuorumDeadLetter(exchange, routingKey string) QuorumQueueOption

WithQuorumDeadLetter sets the dead letter exchange and routing key for quorum queues

func WithQuorumDeliveryLimit added in v1.1.2

func WithQuorumDeliveryLimit(limit int) QuorumQueueOption

WithQuorumDeliveryLimit sets the delivery limit for quorum queues

func WithoutQuorumDLQ added in v1.1.3

func WithoutQuorumDLQ() QuorumQueueOption

WithoutQuorumDLQ clears any dead letter configuration for quorum queues

type ReconnectPolicy added in v1.1.2

type ReconnectPolicy interface {
	ShouldRetry(attempt int, err error) bool
	NextDelay(attempt int) time.Duration
}

ReconnectPolicy defines the interface for reconnection strategies

func DefaultReconnectPolicy added in v1.1.2

func DefaultReconnectPolicy() ReconnectPolicy

DefaultReconnectPolicy returns a sensible default reconnection policy

func ProductionReconnectPolicy added in v1.1.2

func ProductionReconnectPolicy() ReconnectPolicy

ProductionReconnectPolicy returns a production-ready reconnection policy

func TestingReconnectPolicy added in v1.1.2

func TestingReconnectPolicy() ReconnectPolicy

TestingReconnectPolicy returns a fast reconnection policy for testing

type RejectError added in v1.1.2

type RejectError struct {
	Requeue bool
	Cause   error
}

RejectError allows controlling message rejection behavior

func (*RejectError) Error added in v1.1.2

func (r *RejectError) Error() string

func (*RejectError) Unwrap added in v1.1.2

func (r *RejectError) Unwrap() error

type RetryPolicy added in v1.1.2

type RetryPolicy interface {
	ShouldRetry(attempt int, err error) bool
	NextDelay(attempt int) time.Duration
}

RetryPolicy defines retry behavior for operations

type SagaOrchestrator added in v1.1.2

type SagaOrchestrator interface {
	StartSaga(ctx context.Context, sagaName string, steps []SagaStep, context map[string]any) (string, error)
	CompensateSaga(ctx context.Context, sagaID string) error
	GetSagaStatus(ctx context.Context, sagaID string) (SagaStatus, error)
}

SagaOrchestrator defines the interface for saga pattern orchestration. This is the core contract that all saga implementations must satisfy. For concrete implementations, use the saga sub-package.

func NewNopSagaOrchestrator added in v1.1.2

func NewNopSagaOrchestrator() SagaOrchestrator

NewNopSagaOrchestrator creates a new no-op saga orchestrator. For advanced saga orchestration capabilities, use the saga sub-package.

type SagaStatus added in v1.1.2

type SagaStatus struct {
	ID    string
	State string
	Steps []SagaStepStatus
}

SagaStatus represents the current status of a saga.

type SagaStep added in v1.1.2

type SagaStep struct {
	Name         string
	Action       string
	Compensation string
}

SagaStep represents a step in a saga workflow.

type SagaStepStatus added in v1.1.2

type SagaStepStatus struct {
	Name      string
	State     string
	Error     string
	Output    map[string]any
	Timestamp time.Time
}

SagaStepStatus represents the status of a saga step.

type Span added in v1.1.2

type Span interface {
	SetAttribute(key string, value any)
	SetStatus(code SpanStatusCode, description string)
	End()
}

Span interface for tracing spans

type SpanStatusCode added in v1.1.2

type SpanStatusCode int

SpanStatusCode represents the status of a span

const (
	SpanStatusUnset SpanStatusCode = iota
	SpanStatusOK
	SpanStatusError
)

type StreamConfig added in v1.1.2

type StreamConfig struct {
	MaxAge              time.Duration
	MaxLengthMessages   int
	MaxLengthBytes      int
	MaxSegmentSizeBytes int
	InitialClusterSize  int
}

StreamConfig holds configuration for stream creation.

type StreamHandler added in v1.1.2

type StreamHandler interface {
	PublishToStream(ctx context.Context, streamName string, message *Message) error
	ConsumeFromStream(ctx context.Context, streamName string, handler StreamMessageHandler) error
	CreateStream(ctx context.Context, streamName string, config StreamConfig) error
	DeleteStream(ctx context.Context, streamName string) error
}

StreamHandler defines the interface for RabbitMQ streams operations. This is the core contract that all stream implementations must satisfy. For concrete implementations, use the streams sub-package.

func NewNopStreamHandler added in v1.1.2

func NewNopStreamHandler() StreamHandler

NewNopStreamHandler creates a new no-op stream handler. For advanced streaming capabilities, use the streams sub-package.

type StreamMessageHandler added in v1.1.2

type StreamMessageHandler func(ctx context.Context, delivery *Delivery) error

StreamMessageHandler defines the function signature for processing stream messages

type Table added in v1.1.2

type Table map[string]any

Table represents AMQP table type for arguments

type Topology added in v1.1.2

type Topology struct {
	Exchanges []ExchangeDeclaration `json:"exchanges,omitempty" yaml:"exchanges,omitempty"`
	Queues    []QueueDeclaration    `json:"queues,omitempty" yaml:"queues,omitempty"`
	Bindings  []BindingDeclaration  `json:"bindings,omitempty" yaml:"bindings,omitempty"`
}

Topology represents a complete topology definition

type TopologyRegistry added in v1.2.0

type TopologyRegistry struct {
	// contains filtered or unexported fields
}

TopologyRegistry tracks declared topology for validation and auto-recreation

func NewTopologyRegistry added in v1.2.0

func NewTopologyRegistry() *TopologyRegistry

NewTopologyRegistry creates a new topology registry

func (*TopologyRegistry) Clear added in v1.2.0

func (r *TopologyRegistry) Clear()

Clear removes all registered topology

func (*TopologyRegistry) GetBinding added in v1.2.0

func (r *TopologyRegistry) GetBinding(queue, exchange, routingKey string) (BindingConfig, bool)

GetBinding retrieves a registered binding configuration

func (*TopologyRegistry) GetExchange added in v1.2.0

func (r *TopologyRegistry) GetExchange(name string) (ExchangeConfig, bool)

GetExchange retrieves a registered exchange configuration

func (*TopologyRegistry) GetQueue added in v1.2.0

func (r *TopologyRegistry) GetQueue(name string) (QueueConfig, bool)

GetQueue retrieves a registered queue configuration

func (*TopologyRegistry) ListBindings added in v1.2.0

func (r *TopologyRegistry) ListBindings() []BindingConfig

ListBindings returns all registered bindings

func (*TopologyRegistry) ListExchanges added in v1.2.0

func (r *TopologyRegistry) ListExchanges() []ExchangeConfig

ListExchanges returns all registered exchanges

func (*TopologyRegistry) ListQueues added in v1.2.0

func (r *TopologyRegistry) ListQueues() []QueueConfig

ListQueues returns all registered queues

func (*TopologyRegistry) RegisterBinding added in v1.2.0

func (r *TopologyRegistry) RegisterBinding(config BindingConfig)

RegisterBinding records a binding declaration for tracking

func (*TopologyRegistry) RegisterExchange added in v1.2.0

func (r *TopologyRegistry) RegisterExchange(config ExchangeConfig)

RegisterExchange records an exchange declaration for tracking

func (*TopologyRegistry) RegisterQueue added in v1.2.0

func (r *TopologyRegistry) RegisterQueue(config QueueConfig)

RegisterQueue records a queue declaration for tracking

type TopologyValidator added in v1.2.0

type TopologyValidator struct {
	// contains filtered or unexported fields
}

TopologyValidator handles topology validation and auto-recreation

func NewTopologyValidator added in v1.2.0

func NewTopologyValidator(client *Client, admin *AdminService, registry *TopologyRegistry) *TopologyValidator

NewTopologyValidator creates a new topology validator

func (*TopologyValidator) Disable added in v1.2.0

func (v *TopologyValidator) Disable()

Disable disables topology validation

func (*TopologyValidator) Enable added in v1.2.0

func (v *TopologyValidator) Enable()

Enable enables topology validation

func (*TopologyValidator) EnableAutoRecreate added in v1.2.0

func (v *TopologyValidator) EnableAutoRecreate()

EnableAutoRecreate enables automatic recreation of missing topology

func (*TopologyValidator) EnableBackgroundValidation added in v1.2.0

func (v *TopologyValidator) EnableBackgroundValidation(interval time.Duration)

EnableBackgroundValidation starts background topology validation

func (*TopologyValidator) IsAutoRecreateEnabled added in v1.2.0

func (v *TopologyValidator) IsAutoRecreateEnabled() bool

IsAutoRecreateEnabled returns whether auto-recreation is enabled

func (*TopologyValidator) IsEnabled added in v1.2.0

func (v *TopologyValidator) IsEnabled() bool

IsEnabled returns whether topology validation is enabled

func (*TopologyValidator) ValidateCompleteTopology added in v1.2.5

func (v *TopologyValidator) ValidateCompleteTopology() error

ValidateCompleteTopology performs a comprehensive validation of all registered topology This method can be called on-demand to ensure entire topology is consistent

func (*TopologyValidator) ValidateExchange added in v1.2.0

func (v *TopologyValidator) ValidateExchange(name string) error

ValidateExchange checks if an exchange exists and optionally recreates it

func (*TopologyValidator) ValidateQueue added in v1.2.0

func (v *TopologyValidator) ValidateQueue(name string) error

ValidateQueue checks if a queue exists and optionally recreates it

type Tracer added in v1.1.2

type Tracer interface {
	StartSpan(ctx context.Context, operation string) (context.Context, Span)
}

Tracer interface for distributed tracing

func NewNopTracer added in v1.1.2

func NewNopTracer() Tracer

Directories

Path Synopsis
Package compression provides message compression/decompression functionality for go-rabbitmq.
Package compression provides message compression/decompression functionality for go-rabbitmq.
Package encryption provides message encryption/decryption capabilities for the go-rabbitmq library.
Package encryption provides message encryption/decryption capabilities for the go-rabbitmq library.
examples
pool-features command
streams-unified command
Package performance provides detailed performance monitoring and metrics collection for RabbitMQ operations.
Package performance provides detailed performance monitoring and metrics collection for RabbitMQ operations.
Package pool provides connection pooling functionality for high-throughput RabbitMQ applications.
Package pool provides connection pooling functionality for high-throughput RabbitMQ applications.
Package protobuf provides support for Protocol Buffer message serialization and routing for the RabbitMQ library.
Package protobuf provides support for Protocol Buffer message serialization and routing for the RabbitMQ library.
Package saga provides distributed transaction support using the Saga pattern.
Package saga provides distributed transaction support using the Saga pattern.
Package shutdown provides coordinated graceful shutdown management for RabbitMQ components and other closable resources.
Package shutdown provides coordinated graceful shutdown management for RabbitMQ components and other closable resources.
Package streams provides RabbitMQ streams functionality for high-throughput scenarios.
Package streams provides RabbitMQ streams functionality for high-throughput scenarios.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL