Streams Package
The streams package provides RabbitMQ Streams functionality for high-throughput messaging scenarios. RabbitMQ Streams, introduced in RabbitMQ 3.9, are a persistent, replicated data structure that offers exceptional performance for event streaming, time-series data, and other cases where message order and durability are critical.
Features
- High-Throughput Publishing: Optimized for high-volume message publishing scenarios
- Durable Message Storage: Persistent, replicated storage with configurable retention policies
- Stream Configuration: Full control over stream behavior (max age, size limits, clustering)
- Consumer Offset Management: Built-in support for consumer positioning and replay
- Auto-Creation: Automatic stream creation with sensible defaults when needed
- Pluggable Interface: Clean contract-implementation pattern for easy testing and extensibility
🔝 back to top
Installation
go get github.com/cloudresty/go-rabbitmq/streams
🔝 back to top
Quick Start
Basic Stream Usage
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/cloudresty/go-rabbitmq"
	"github.com/cloudresty/go-rabbitmq/streams"
)

func main() {
	// Create RabbitMQ client
	client, err := rabbitmq.NewClient(
		rabbitmq.WithHosts("localhost:5672"),
		rabbitmq.WithCredentials("guest", "guest"),
	)
	if err != nil {
		log.Fatal("Failed to create client:", err)
	}
	defer client.Close()

	// Create streams handler
	streamsHandler := streams.NewHandler(client)

	// Create a stream with configuration
	streamConfig := rabbitmq.StreamConfig{
		MaxAge:            24 * time.Hour,     // Retain for 24 hours
		MaxLengthMessages: 1_000_000,          // Max 1M messages
		MaxLengthBytes:    1024 * 1024 * 1024, // Max 1GB storage
	}
	err = streamsHandler.CreateStream(context.Background(), "events.stream", streamConfig)
	if err != nil {
		log.Printf("Stream creation: %v", err)
	}

	// Publish messages
	for i := 0; i < 100; i++ {
		message := rabbitmq.NewMessage([]byte(fmt.Sprintf("Event %d", i)))
		message.MessageID = fmt.Sprintf("msg-%d", i)

		err = streamsHandler.PublishToStream(context.Background(), "events.stream", message)
		if err != nil {
			log.Printf("Failed to publish: %v", err)
		}
	}

	// Consume messages
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	err = streamsHandler.ConsumeFromStream(ctx, "events.stream", func(ctx context.Context, delivery *rabbitmq.Delivery) error {
		fmt.Printf("Received: %s\n", delivery.Body)
		return nil
	})
	if err != nil {
		log.Printf("Consumption ended: %v", err)
	}
}
🔝 back to top
Advanced Usage
Stream Configuration Options
The StreamConfig struct provides comprehensive control over stream behavior:
streamConfig := rabbitmq.StreamConfig{
	// Retention by time
	MaxAge: 7 * 24 * time.Hour, // Retain for 7 days

	// Retention by message count
	MaxLengthMessages: 10_000_000, // Max 10M messages

	// Retention by size
	MaxLengthBytes: 10 * 1024 * 1024 * 1024, // Max 10GB

	// Segment configuration (advanced)
	MaxSegmentSizeBytes: 500 * 1024 * 1024, // 500MB segments

	// Clustering
	InitialClusterSize: 3, // Replicate across 3 nodes
}

err := streamsHandler.CreateStream(ctx, "high-volume.stream", streamConfig)
🔝 back to top
Publishing with Message Metadata
// Create message with comprehensive metadata
message := rabbitmq.NewMessage([]byte(`{"event": "user_signup", "user_id": 12345}`))
message.ContentType = "application/json"
message.MessageID = "signup-12345"
message.Headers = map[string]interface{}{
	"event_type":     "user_action",
	"source":         "web_app",
	"version":        "1.0",
	"correlation_id": "abc-123-def",
}

err := streamsHandler.PublishToStream(ctx, "user.events", message)
if err != nil {
	log.Printf("Failed to publish event: %v", err)
}
🔝 back to top
Consumer with Error Handling
messageHandler := func(ctx context.Context, delivery *rabbitmq.Delivery) error {
	// Extract message metadata
	eventType := delivery.Headers["event_type"]
	correlationID := delivery.Headers["correlation_id"]

	log.Printf("Processing event: %s (correlation: %v)", eventType, correlationID)

	// Process the message
	var event UserEvent
	if err := json.Unmarshal(delivery.Body, &event); err != nil {
		log.Printf("Failed to unmarshal event: %v", err)
		return err // This will cause consumption to stop
	}

	// Business logic
	if err := processUserEvent(event); err != nil {
		log.Printf("Failed to process event: %v", err)
		return err
	}

	log.Printf("Successfully processed event: %s", delivery.MessageId)
	return nil
}

// Start consuming with proper error handling
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := streamsHandler.ConsumeFromStream(ctx, "user.events", messageHandler)
if err != nil {
	if err == context.Canceled {
		log.Println("Consumption cancelled")
	} else {
		log.Printf("Consumption error: %v", err)
	}
}
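The handler above assumes an application-defined UserEvent type and processUserEvent function. A minimal sketch of both, matching the JSON payload shown earlier, might look like this:

// UserEvent mirrors the JSON payload published to the stream.
type UserEvent struct {
	Event  string `json:"event"`
	UserID int64  `json:"user_id"`
}

func processUserEvent(event UserEvent) error {
	// Application business logic goes here; returning an error stops consumption.
	log.Printf("handling %q for user %d", event.Event, event.UserID)
	return nil
}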
🔝 back to top
Stream Management Operations
// Create multiple streams with different configurations
streamConfigs := map[string]rabbitmq.StreamConfig{
	"events.critical": {
		MaxAge:             30 * 24 * time.Hour, // 30 days
		MaxLengthMessages:  50_000_000,
		InitialClusterSize: 5, // High availability
	},
	"events.logs": {
		MaxAge:         3 * 24 * time.Hour, // 3 days
		MaxLengthBytes: 1024 * 1024 * 1024, // 1GB
	},
	"events.metrics": {
		MaxLengthMessages: 1_000_000, // Rolling window
	},
}

for streamName, config := range streamConfigs {
	err := streamsHandler.CreateStream(ctx, streamName, config)
	if err != nil {
		log.Printf("Failed to create stream %s: %v", streamName, err)
	}
}

// Clean up streams when no longer needed
defer func() {
	for streamName := range streamConfigs {
		if err := streamsHandler.DeleteStream(ctx, streamName); err != nil {
			log.Printf("Failed to delete stream %s: %v", streamName, err)
		}
	}
}()
🔝 back to top
Integration with Client
Using with Client Options
// Create client with streams handler
client, err := rabbitmq.NewClient(
	rabbitmq.WithHosts("localhost:5672"),
	rabbitmq.WithCredentials("guest", "guest"),
	rabbitmq.WithStreamHandler(streams.NewHandler(nil)), // Note: pass nil; the client will be set internally
)
if err != nil {
	log.Fatal("Failed to create client:", err)
}

// The streams handler can then be accessed through the client
// if you need to use it directly.
🔝 back to top
Manual Handler Creation
// Create client first
client, err := rabbitmq.NewClient(
	rabbitmq.WithHosts("localhost:5672"),
	rabbitmq.WithCredentials("guest", "guest"),
)
if err != nil {
	log.Fatal("Failed to create client:", err)
}

// Create streams handler manually
streamsHandler := streams.NewHandler(client)

// Use the handler for all stream operations
err = streamsHandler.CreateStream(ctx, "my.stream", rabbitmq.StreamConfig{
	MaxAge: 24 * time.Hour,
})
🔝 back to top
High-Throughput Publishing
// For maximum throughput, consider batching
messages := make([]*rabbitmq.Message, 100)
for i := 0; i < 100; i++ {
messages[i] = rabbitmq.NewMessage([]byte(fmt.Sprintf("batch-message-%d", i)))
}
// Publish in batch (conceptual - implement based on your needs)
for _, message := range messages {
if err := streamsHandler.PublishToStream(ctx, "high-volume.stream", message); err != nil {
log.Printf("Failed to publish: %v", err)
break
}
}
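If sequential publishing becomes the bottleneck, a bounded-concurrency variant is one option. This is a sketch only: the worker count is illustrative, and it assumes PublishToStream is safe to call concurrently, which you should verify for your handler.

const workers = 8 // illustrative concurrency limit
sem := make(chan struct{}, workers)
var wg sync.WaitGroup

for _, message := range messages {
	message := message // capture loop variable
	sem <- struct{}{}  // acquire a slot
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() { <-sem }() // release the slot
		// Assumes PublishToStream is safe for concurrent use.
		if err := streamsHandler.PublishToStream(ctx, "high-volume.stream", message); err != nil {
			log.Printf("Failed to publish: %v", err)
		}
	}()
}
wg.Wait()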
🔝 back to top
Resource Management
// Always use contexts with timeouts for one-off operations
opCtx, opCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer opCancel()

// For long-running consumers, use a cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Handle graceful shutdown
go func() {
	<-shutdownSignal // application-provided channel closed on shutdown
	cancel()         // This will stop the consumer
}()

err := streamsHandler.ConsumeFromStream(ctx, "stream", messageHandler)
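Instead of wiring shutdownSignal by hand, the standard library's signal.NotifyContext ties cancellation directly to OS signals:

// Cancelled automatically on SIGINT or SIGTERM (requires the os/signal and syscall imports).
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

err := streamsHandler.ConsumeFromStream(ctx, "stream", messageHandler)
if err != nil && ctx.Err() == nil {
	log.Printf("Consumption error: %v", err)
}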
🔝 back to top
Error Handling
Common Error Scenarios
// Handle stream creation errors
err := streamsHandler.CreateStream(ctx, "my.stream", config)
if err != nil {
if strings.Contains(err.Error(), "already exists") {
log.Println("Stream already exists, continuing...")
} else {
log.Fatalf("Failed to create stream: %v", err)
}
}
// Handle publishing errors
err = streamsHandler.PublishToStream(ctx, "my.stream", message)
if err != nil {
if strings.Contains(err.Error(), "connection closed") {
log.Println("Connection lost, implementing retry logic...")
// Implement retry with backoff
} else {
log.Printf("Publish failed: %v", err)
}
}
// Handle consumption errors
err = streamsHandler.ConsumeFromStream(ctx, "my.stream", func(ctx context.Context, delivery *rabbitmq.Delivery) error {
// Return errors to stop consumption
if criticalError := processMessage(delivery); criticalError != nil {
return criticalError // This will stop the consumer
}
return nil // Continue consuming
})
if err != nil && err != context.Canceled {
log.Printf("Consumption error: %v", err)
}
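The retry-with-backoff mentioned above might look like the following sketch; the attempt count and delays are illustrative, and the closure keeps it independent of the handler's concrete type:

publishWithRetry := func(ctx context.Context, publish func() error) error {
	backoff := 100 * time.Millisecond
	var err error
	for attempt := 1; attempt <= 5; attempt++ { // illustrative attempt limit
		if err = publish(); err == nil {
			return nil
		}
		log.Printf("Publish attempt %d failed: %v", attempt, err)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff *= 2 // exponential backoff
		}
	}
	return err
}

err = publishWithRetry(ctx, func() error {
	return streamsHandler.PublishToStream(ctx, "my.stream", message)
})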
🔝 back to top
Best Practices
Stream Design
- Naming Convention: Use hierarchical names like domain.entity.events
- Configuration: Set appropriate retention policies based on your use case
- Partitioning: Consider using multiple streams for different event types
- Monitoring: Implement proper logging and metrics collection
🔝 back to top
Message Design
- Content Type: Always set appropriate content type (e.g., "application/json")
- Message ID: Use unique, meaningful message IDs for tracking (they are automatically preserved through streams via a backup header mechanism)
- Headers: Include relevant metadata in headers for routing and filtering
- Size: Keep messages reasonably sized; use external storage for large payloads (see the sketch below)
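For large payloads, a common approach is the claim-check pattern: store the payload elsewhere and publish only a reference to it. The uploadPayload helper below is hypothetical; substitute your own storage client.

// uploadPayload is a hypothetical helper that stores the payload externally
// and returns a retrievable reference (for example, an object-store key or URL).
ref, err := uploadPayload(ctx, largePayload)
if err != nil {
	log.Printf("Failed to store payload: %v", err)
} else {
	// Publish a small message that carries only the reference.
	message := rabbitmq.NewMessage([]byte(ref))
	message.ContentType = "text/plain"
	message.Headers = map[string]interface{}{
		"payload_ref":  ref,
		"payload_size": len(largePayload),
	}
	if err := streamsHandler.PublishToStream(ctx, "user.events", message); err != nil {
		log.Printf("Failed to publish reference: %v", err)
	}
}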
🔝 back to top
Consumer Design
- Error Handling: Implement proper error handling and recovery
- Idempotency: Design message handlers to be idempotent (see the sketch after this list)
- Timeouts: Use appropriate context timeouts
- Graceful Shutdown: Handle shutdown signals properly
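A minimal in-memory idempotency sketch keyed on the message ID is shown below; a production deployment would more likely track processed IDs in a shared store such as a database or cache.

var (
	mu        sync.Mutex
	processed = make(map[string]struct{})
)

messageHandler := func(ctx context.Context, delivery *rabbitmq.Delivery) error {
	mu.Lock()
	_, seen := processed[delivery.MessageId]
	if !seen {
		processed[delivery.MessageId] = struct{}{}
	}
	mu.Unlock()

	if seen {
		return nil // duplicate delivery; skip without error
	}
	return processMessage(delivery) // application-defined processing
}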
🔝 back to top
Production Considerations
- High Availability: Use InitialClusterSize > 1 for critical streams
- Retention: Set appropriate MaxAge, MaxLengthMessages, and MaxLengthBytes limits
- Monitoring: Monitor stream metrics and consumer lag
- Backpressure: Implement backpressure handling for high-volume scenarios (see the sketch after this list)
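One simple backpressure scheme is a bounded buffer between producers and a single publisher goroutine; producers block when the buffer is full. The buffer size is illustrative.

// Producers block once 1,000 messages are waiting to be published.
pending := make(chan *rabbitmq.Message, 1000)

// A single goroutine drains the buffer and publishes to the stream.
go func() {
	for message := range pending {
		if err := streamsHandler.PublishToStream(ctx, "high-volume.stream", message); err != nil {
			log.Printf("Failed to publish: %v", err)
		}
	}
}()

// Sending blocks when the buffer is full, which applies backpressure upstream.
pending <- rabbitmq.NewMessage([]byte(`{"event": "example"}`))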
🔝 back to top
Examples
For complete working examples, see:
🔝 back to top
An open source project brought to you by the Cloudresty team.
Website | LinkedIn | BlueSky | GitHub | Docker Hub