performance

package
v1.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2025 License: MIT Imports: 3 Imported by: 0

README

Performance Monitoring Package

Home  /  Performance Monitoring Package

 

The performance package provides comprehensive performance monitoring and metrics collection for RabbitMQ operations. This package is designed to give you deep observability into your RabbitMQ application's behavior and performance characteristics.

 

Features

  • Connection Tracking - Monitor connection establishment, failures, and reconnections
  • Operation Metrics - Track publish and consume operations with success/failure rates
  • Latency Monitoring - Record and calculate latency percentiles (P50, P95, P99)
  • Rate Tracking - Monitor operations per second over configurable time windows
  • Thread-Safe - All operations are safe for concurrent use
  • Lightweight - Minimal overhead suitable for production environments
  • Comprehensive Stats - Detailed statistics and metrics export

🔝 back to top

 

Quick Start

Basic Usage
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/cloudresty/go-rabbitmq"
    "github.com/cloudresty/go-rabbitmq/performance"
)

func main() {
    // Create a performance monitor
    monitor := performance.NewMonitor()

    // Create a client (assuming you have connection setup)
    client, err := rabbitmq.NewClient(
        rabbitmq.WithHosts("localhost:5672"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Record connection success
    monitor.RecordConnection(true)

    // Create a publisher
    publisher, err := client.NewPublisher()
    if err != nil {
        log.Fatal(err)
    }
    defer publisher.Close()

    // Publish with performance monitoring
    for i := 0; i < 100; i++ {
        start := time.Now()

        err := publisher.Publish(context.Background(), "test.queue", rabbitmq.Publishing{
            Body: []byte(fmt.Sprintf("Message %d", i)),
        })

        // Record the operation
        duration := time.Since(start)
        monitor.RecordPublish(err == nil, duration)

        if err != nil {
            log.Printf("Publish failed: %v", err)
        }
    }

    // Get and display statistics
    stats := monitor.GetStats()
    printStats(stats)
}

func printStats(stats performance.Stats) {
    fmt.Printf("=== Performance Statistics ===\n")
    fmt.Printf("Connections: %d total, %d reconnections\n",
        stats.ConnectionsTotal, stats.ReconnectionsTotal)
    fmt.Printf("Connected: %v\n", stats.IsConnected)

    fmt.Printf("\nPublish Operations:\n")
    fmt.Printf("  Total: %d\n", stats.PublishesTotal)
    fmt.Printf("  Success: %d (%.2f%%)\n",
        stats.PublishSuccessTotal, stats.PublishSuccessRate*100)
    fmt.Printf("  Errors: %d\n", stats.PublishErrorsTotal)
    fmt.Printf("  Rate: %.2f ops/sec\n", stats.PublishRate)

    fmt.Printf("\nLatency Percentiles:\n")
    fmt.Printf("  P50: %v\n", stats.PublishLatencyP50)
    fmt.Printf("  P95: %v\n", stats.PublishLatencyP95)
    fmt.Printf("  P99: %v\n", stats.PublishLatencyP99)
}

🔝 back to top

 

Consumer Monitoring
func monitorConsumer(monitor *performance.Monitor, consumer *rabbitmq.Consumer) {
    for {
        start := time.Now()

        // Simulate consume operation
        delivery, err := consumer.Receive(context.Background())
        duration := time.Since(start)

        if err != nil {
            monitor.RecordConsume(false, duration)
            log.Printf("Consume error: %v", err)
            continue
        }

        // Process the message
        processMessage(delivery)

        // Record successful consume
        monitor.RecordConsume(true, duration)

        // Acknowledge the message
        delivery.Ack(false)
    }
}

🔝 back to top

 

Rate Tracking

The performance monitor includes rate tracking functionality for monitoring operations per second:

monitor := performance.NewMonitor()

// Record some operations
for i := 0; i < 50; i++ {
    monitor.RecordPublish(true, time.Millisecond)
    time.Sleep(20 * time.Millisecond) // Simulate ~50 ops/sec
}

// Get current rates
publishRate := monitor.GetPublishRate()
fmt.Printf("Current publish rate: %.2f ops/sec\n", publishRate)

// Get comprehensive stats
stats := monitor.GetStats()
fmt.Printf("Publish rate from stats: %.2f ops/sec\n", stats.PublishRate)
fmt.Printf("Consume rate: %.2f ops/sec\n", stats.ConsumeRate)

🔝 back to top

 

Custom Rate Tracker

You can also use the rate tracker independently:

// Create a rate tracker with 30-second window
tracker := performance.NewRateTracker(30 * time.Second)

// Record events
for i := 0; i < 10; i++ {
    tracker.Record()
    time.Sleep(100 * time.Millisecond)
}

// Get the current rate
rate := tracker.Rate()
fmt.Printf("Rate: %.2f events/sec\n", rate)

🔝 back to top

 

Statistics and Metrics

Connection Metrics
stats := monitor.GetStats()

fmt.Printf("Connection Status:\n")
fmt.Printf("  Is Connected: %v\n", stats.IsConnected)
fmt.Printf("  Total Connections: %d\n", stats.ConnectionsTotal)
fmt.Printf("  Reconnections: %d\n", stats.ReconnectionsTotal)
fmt.Printf("  Last Connection: %v\n", stats.LastConnectionTime)
fmt.Printf("  Last Reconnection: %v\n", stats.LastReconnectionTime)

🔝 back to top

 

Operation Metrics
stats := monitor.GetStats()

fmt.Printf("Publish Metrics:\n")
fmt.Printf("  Total Operations: %d\n", stats.PublishesTotal)
fmt.Printf("  Successful: %d\n", stats.PublishSuccessTotal)
fmt.Printf("  Failed: %d\n", stats.PublishErrorsTotal)
fmt.Printf("  Success Rate: %.2f%%\n", stats.PublishSuccessRate*100)
fmt.Printf("  Operations/sec: %.2f\n", stats.PublishRate)

fmt.Printf("Consume Metrics:\n")
fmt.Printf("  Total Operations: %d\n", stats.ConsumesTotal)
fmt.Printf("  Successful: %d\n", stats.ConsumeSuccessTotal)
fmt.Printf("  Failed: %d\n", stats.ConsumeErrorsTotal)
fmt.Printf("  Success Rate: %.2f%%\n", stats.ConsumeSuccessRate*100)
fmt.Printf("  Operations/sec: %.2f\n", stats.ConsumeRate)

🔝 back to top

 

Latency Analysis
stats := monitor.GetStats()

fmt.Printf("Publish Latency Percentiles:\n")
fmt.Printf("  P50 (median): %v\n", stats.PublishLatencyP50)
fmt.Printf("  P95: %v\n", stats.PublishLatencyP95)
fmt.Printf("  P99: %v\n", stats.PublishLatencyP99)

fmt.Printf("Consume Latency Percentiles:\n")
fmt.Printf("  P50 (median): %v\n", stats.ConsumeLatencyP50)
fmt.Printf("  P95: %v\n", stats.ConsumeLatencyP95)
fmt.Printf("  P99: %v\n", stats.ConsumeLatencyP99)

🔝 back to top

 

Integration with Monitoring Systems

Prometheus Integration
import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    publishTotal = promauto.NewCounterVec(
        prometheus.CounterOpts{
            Name: "rabbitmq_publish_total",
            Help: "Total number of publish operations",
        },
        []string{"status"},
    )

    publishLatency = promauto.NewHistogramVec(
        prometheus.HistogramOpts{
            Name: "rabbitmq_publish_duration_seconds",
            Help: "Publish operation duration",
        },
        []string{},
    )
)

func recordToPrometheus(monitor *performance.Monitor) {
    stats := monitor.GetStats()

    // Update Prometheus metrics
    publishTotal.WithLabelValues("success").Add(float64(stats.PublishSuccessTotal))
    publishTotal.WithLabelValues("error").Add(float64(stats.PublishErrorsTotal))

    // Record latencies (simplified)
    if stats.PublishLatencyP50 > 0 {
        publishLatency.WithLabelValues().Observe(stats.PublishLatencyP50.Seconds())
    }
}

🔝 back to top

 

Periodic Stats Export
func startStatsExporter(monitor *performance.Monitor, interval time.Duration) {
    ticker := time.NewTicker(interval)
    go func() {
        for range ticker.C {
            stats := monitor.GetStats()

            // Export to your monitoring system
            exportToInfluxDB(stats)
            exportToDatadog(stats)
            exportToCloudWatch(stats)
        }
    }()
}

func exportToInfluxDB(stats performance.Stats) {
    // Implementation depends on your InfluxDB client
    // Example structure:
    point := map[string]interface{}{
        "connections_total":      stats.ConnectionsTotal,
        "publish_success_rate":   stats.PublishSuccessRate,
        "publish_rate":          stats.PublishRate,
        "publish_latency_p50":   stats.PublishLatencyP50.Nanoseconds(),
        "publish_latency_p95":   stats.PublishLatencyP95.Nanoseconds(),
        "consume_rate":          stats.ConsumeRate,
        "is_connected":          stats.IsConnected,
    }
    // Write point to InfluxDB...
}

🔝 back to top

 

Advanced Usage

Custom Monitoring Wrapper
type MonitoredPublisher struct {
    publisher *rabbitmq.Publisher
    monitor   *performance.Monitor
}

func NewMonitoredPublisher(publisher *rabbitmq.Publisher, monitor *performance.Monitor) *MonitoredPublisher {
    return &MonitoredPublisher{
        publisher: publisher,
        monitor:   monitor,
    }
}

func (mp *MonitoredPublisher) Publish(ctx context.Context, queue string, msg rabbitmq.Publishing) error {
    start := time.Now()
    err := mp.publisher.Publish(ctx, queue, msg)
    duration := time.Since(start)

    mp.monitor.RecordPublish(err == nil, duration)
    return err
}

🔝 back to top

 

Health Check Integration
func healthCheckHandler(monitor *performance.Monitor) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        stats := monitor.GetStats()

        // Consider unhealthy if not connected or high error rate
        if !stats.IsConnected || stats.PublishSuccessRate < 0.95 {
            w.WriteHeader(http.StatusServiceUnavailable)
            json.NewEncoder(w).Encode(map[string]interface{}{
                "status": "unhealthy",
                "stats":  stats,
            })
            return
        }

        w.WriteHeader(http.StatusOK)
        json.NewEncoder(w).Encode(map[string]interface{}{
            "status": "healthy",
            "stats":  stats,
        })
    }
}

🔝 back to top

 

Helper Methods

The monitor provides several convenience methods:

monitor := performance.NewMonitor()

// Connection state
isConnected := monitor.IsConnected()

// Current rates
publishRate := monitor.GetPublishRate()
consumeRate := monitor.GetConsumeRate()

// Total operations across publish and consume
totalOps := monitor.GetTotalOperations()

// Overall success rate
successRate := monitor.GetSuccessRate()

// Reset all counters
monitor.Reset()

🔝 back to top

 

Best Practices

1. Monitor Setup
// Create one monitor per client/connection
client, _ := rabbitmq.NewClient(opts...)
monitor := performance.NewMonitor()

// Record connection status
monitor.RecordConnection(true)
2. Operation Recording
// Always record operations, both success and failure
start := time.Now()
err := operation()
duration := time.Since(start)
monitor.RecordPublish(err == nil, duration)
3. Periodic Stats Collection
// Export stats every 30 seconds
ticker := time.NewTicker(30 * time.Second)
go func() {
    for range ticker.C {
        stats := monitor.GetStats()
        logStats(stats)
    }
}()
4. Resource Management
// Reset counters periodically to prevent memory growth
if time.Since(lastReset) > time.Hour {
    monitor.Reset()
    lastReset = time.Now()
}

🔝 back to top

 

Configuration Reference

Stats Structure
Field Type Description
ConnectionsTotal uint64 Total connection attempts
ReconnectionsTotal uint64 Total reconnection attempts
IsConnected bool Current connection status
LastConnectionTime time.Time Time of last connection
LastReconnectionTime time.Time Time of last reconnection
PublishesTotal uint64 Total publish operations
PublishSuccessTotal uint64 Successful publish operations
PublishErrorsTotal uint64 Failed publish operations
PublishSuccessRate float64 Publish success rate (0.0-1.0)
PublishRate float64 Publish operations per second
ConsumesTotal uint64 Total consume operations
ConsumeSuccessTotal uint64 Successful consume operations
ConsumeErrorsTotal uint64 Failed consume operations
ConsumeSuccessRate float64 Consume success rate (0.0-1.0)
ConsumeRate float64 Consume operations per second
PublishLatencyP50 time.Duration 50th percentile publish latency
PublishLatencyP95 time.Duration 95th percentile publish latency
PublishLatencyP99 time.Duration 99th percentile publish latency
ConsumeLatencyP50 time.Duration 50th percentile consume latency
ConsumeLatencyP95 time.Duration 95th percentile consume latency
ConsumeLatencyP99 time.Duration 99th percentile consume latency

🔝 back to top

 

Performance Considerations

  • Memory Usage: The monitor keeps the last 1000 latency measurements for each operation type
  • Rate Tracking: Uses a 1-minute sliding window by default
  • Thread Safety: All operations use atomic operations or mutexes for safety
  • Overhead: Minimal performance impact - typically < 1% overhead

🔝 back to top

 

Testing

The performance package includes comprehensive tests:

# Run tests
go test ./performance

# Run benchmarks
go test -bench=. ./performance

# Run with race detection
go test -race ./performance

🔝 back to top

 

The performance monitor provides thread-safe operation tracking and comprehensive metrics collection.

🔝 back to top

 


 

An open source project brought to you by the Cloudresty team.

Website  |  LinkedIn  |  BlueSky  |  GitHub  |  Docker Hub

 

Documentation

Overview

Package performance provides detailed performance monitoring and metrics collection for RabbitMQ operations.

This package offers comprehensive monitoring capabilities including:

  • Connection tracking and health monitoring
  • Publish and consume operation metrics
  • Latency percentile calculations
  • Rate tracking over time windows
  • Success/failure rate monitoring

The performance monitor is designed to be lightweight and thread-safe, suitable for production environments where detailed observability is required.

Example usage:

import "github.com/cloudresty/go-rabbitmq/performance"

// Create a performance monitor
monitor := performance.NewMonitor()

// Use it with the main client
client, err := rabbitmq.NewClient(
	rabbitmq.WithPerformanceMonitoring(monitor),
)

// Record operations as they occur (usually done internally by the client)
start := time.Now()
err := publisher.Publish(ctx, queue, msg)
monitor.RecordPublish(err == nil, time.Since(start))

// Get detailed statistics
stats := monitor.GetStats()
fmt.Printf("Publish success rate: %.2f%%\n", stats.PublishSuccessRate*100)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor provides detailed performance monitoring capabilities

func NewMonitor

func NewMonitor() *Monitor

NewMonitor creates a new performance monitor

func (*Monitor) GetConsumeRate

func (p *Monitor) GetConsumeRate() float64

GetConsumeRate returns the current consume rate (operations per second)

func (*Monitor) GetPublishRate

func (p *Monitor) GetPublishRate() float64

GetPublishRate returns the current publish rate (operations per second)

func (*Monitor) GetStats

func (p *Monitor) GetStats() Stats

GetStats returns current performance statistics

func (*Monitor) GetSuccessRate

func (p *Monitor) GetSuccessRate() float64

GetSuccessRate returns the overall success rate across all operations

func (*Monitor) GetTotalOperations

func (p *Monitor) GetTotalOperations() uint64

GetTotalOperations returns the total number of operations recorded

func (*Monitor) IsConnected

func (p *Monitor) IsConnected() bool

IsConnected returns whether the monitored connection is currently connected

func (*Monitor) RecordConnection

func (p *Monitor) RecordConnection(success bool)

RecordConnection records a connection event

func (*Monitor) RecordConsume

func (p *Monitor) RecordConsume(success bool, duration time.Duration)

RecordConsume records a consume operation

func (*Monitor) RecordPublish

func (p *Monitor) RecordPublish(success bool, duration time.Duration)

RecordPublish records a publish operation

func (*Monitor) RecordReconnection

func (p *Monitor) RecordReconnection()

RecordReconnection records a reconnection event

func (*Monitor) Reset

func (p *Monitor) Reset()

Reset resets all performance counters

type RateTracker

type RateTracker struct {
	// contains filtered or unexported fields
}

RateTracker tracks rates over time windows

func NewRateTracker

func NewRateTracker(window time.Duration) *RateTracker

NewRateTracker creates a new rate tracker with the specified window

func (*RateTracker) Rate

func (r *RateTracker) Rate() float64

Rate returns the current rate (events per second)

func (*RateTracker) Record

func (r *RateTracker) Record()

Record records an event

type Stats

type Stats struct {
	// Connection stats
	ConnectionsTotal     uint64
	ReconnectionsTotal   uint64
	IsConnected          bool
	LastConnectionTime   time.Time
	LastReconnectionTime time.Time

	// Publish stats
	PublishesTotal      uint64
	PublishSuccessTotal uint64
	PublishErrorsTotal  uint64
	PublishSuccessRate  float64
	PublishRate         float64 // per second

	// Consume stats
	ConsumesTotal       uint64
	ConsumeSuccessTotal uint64
	ConsumeErrorsTotal  uint64
	ConsumeSuccessRate  float64
	ConsumeRate         float64 // per second

	// Latency stats (simplified percentiles)
	PublishLatencyP50 time.Duration
	PublishLatencyP95 time.Duration
	PublishLatencyP99 time.Duration
	ConsumeLatencyP50 time.Duration
	ConsumeLatencyP95 time.Duration
	ConsumeLatencyP99 time.Duration
}

Stats represents current performance statistics

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL