README
¶
Load Balancing Example
This example demonstrates how multiple consumers work together to process streams in a load-balanced fashion using the Redis Stream Client Go library.
What This Example Shows
- Multiple consumers processing streams concurrently
- Load balancing of stream assignments across consumers
- Producer generating continuous stream of messages
- Statistics and monitoring of consumer performance
- Automatic claiming of streams from failed consumers
Architecture
Producer → LBS (Load Balancer Stream) → Consumer 1
→ Consumer 2
→ Consumer 3
→ ...
The Load Balancer Stream (LBS) distributes incoming stream assignments to available consumers in a round-robin fashion.
Prerequisites
- Redis server running on
localhost:6379 - Go 1.22 or later
- Multiple terminal windows for running multiple consumers
Running the Example
Step 1: Start Redis Server
# Using Docker
docker run -d -p 6379:6379 redis:7.2.3
# Or use your local Redis installation
redis-server
Step 2: Start Multiple Consumers
Open multiple terminal windows and run:
Terminal 1 (Consumer 1):
export POD_NAME=consumer-1
cd examples/load-balancing
go run main.go
Terminal 2 (Consumer 2):
export POD_NAME=consumer-2
cd examples/load-balancing
go run main.go
Terminal 3 (Consumer 3):
export POD_NAME=consumer-3
cd examples/load-balancing
go run main.go
Step 3: Start the Producer
Terminal 4 (Producer):
cd examples/load-balancing
go run main.go producer
Expected Output
Consumer Output:
2024/01/15 10:30:00 Starting consumer: consumer-1
2024/01/15 10:30:00 Created client with ID: redis-consumer-consumer-1
2024/01/15 10:30:00 🎉 [consumer-1] New stream assigned
2024/01/15 10:30:00 🔄 [consumer-1] Processing stream: order-stream-0
2024/01/15 10:30:00 📋 [consumer-1] Stream details: map[amount:100 created_at:2024-01-15T10:30:00Z customer_id:customer-0 order_id:order-0 priority:low status:pending]
2024/01/15 10:30:04 ✅ [consumer-1] Completed processing stream: order-stream-0 (took 4s)
2024/01/15 10:30:10 📊 [consumer-1] Statistics: Processed 1 streams
Producer Output:
2024/01/15 10:30:05 🏭 Starting producer...
2024/01/15 10:30:05 📤 Produced message 0: order-stream-0
2024/01/15 10:30:05 📤 Produced message 1: order-stream-1
2024/01/15 10:30:05 📤 Produced message 2: order-stream-2
2024/01/15 10:30:05 📤 Produced message 3: order-stream-3
2024/01/15 10:30:05 📤 Produced message 4: order-stream-4
Key Features Demonstrated
1. Load Balancing
- Streams are distributed among consumers in round-robin fashion
- Each consumer processes different streams
- No single consumer gets overwhelmed
2. Priority Processing
The example includes three priority levels:
- High priority: 1 second processing time
- Normal priority: 2 seconds processing time
- Low priority: 4 seconds processing time
3. Statistics Monitoring
Each consumer reports:
- Total streams processed
- Recent activity (last 10 seconds)
- Processing times and completion status
4. Failure Recovery
If you kill a consumer (Ctrl+C), other consumers will automatically claim its unprocessed streams.
Testing Scenarios
Scenario 1: Basic Load Balancing
- Start 3 consumers
- Start the producer
- Observe how streams are distributed among consumers
Scenario 2: Consumer Failure
- Start 3 consumers and producer
- Kill one consumer (Ctrl+C)
- Observe other consumers claiming the failed consumer's streams
Scenario 3: Dynamic Scaling
- Start with 1 consumer and producer
- Add more consumers while producer is running
- Observe load redistribution
Scenario 4: High Load Testing
- Start multiple consumers
- Modify producer to generate more messages
- Monitor consumer statistics
Code Walkthrough
Producer Logic
// Create messages with different priorities
lbsMessage := notifs.LBSMessage{
DataStreamName: fmt.Sprintf("order-stream-%d", messageID),
Info: map[string]interface{}{
"order_id": fmt.Sprintf("order-%d", messageID),
"priority": []string{"low", "normal", "high"}[messageID%3],
// ... other fields
},
}
// Add to LBS
redisClient.XAdd(ctx, &redis.XAddArgs{
Stream: "load-balance-demo-input",
Values: map[string]interface{}{
"lbs-input": string(messageData),
},
})
Consumer Processing
// Handle new stream assignment
case notifs.StreamAdded:
go handleStreamAdded(ctx, client, notification.Payload.(string))
// Handle claiming from failed consumer
case notifs.StreamExpired:
client.Claim(ctx, notification.Payload.(string))
go handleClaimedStream(ctx, client, notification.Payload.(string))
Statistics Tracking
// Track processed streams
var processedStreams sync.Map
var streamCount int32
// Update counters
processedStreams.Store(streamName, time.Now())
atomic.AddInt32(&streamCount, 1)
Monitoring and Debugging
Redis CLI Commands
# Check LBS stream
redis-cli XINFO STREAM load-balance-demo-input
# Check consumer group
redis-cli XINFO GROUPS load-balance-demo-input
# Check pending messages
redis-cli XPENDING load-balance-demo-input load-balance-demo-group
# Monitor keyspace notifications
redis-cli --csv psubscribe '__keyevent@0__:expired'
Environment Variables
# Set unique consumer IDs
export POD_NAME=consumer-$(hostname)-$(date +%s)
# Enable debug logging
export DEBUG=1
Performance Tuning
Consumer Optimization
- Adjust processing times based on your use case
- Implement proper error handling and retries
- Use connection pooling for database operations
Producer Optimization
- Batch message production
- Use pipelining for better throughput
- Monitor Redis memory usage
Redis Configuration
# Increase memory if needed
redis-cli CONFIG SET maxmemory 1gb
# Optimize for streams
redis-cli CONFIG SET stream-node-max-entries 1000
Troubleshooting
No Load Balancing
- Ensure all consumers use the same service name
- Check that consumers have unique POD_NAME values
- Verify Redis keyspace notifications are enabled
Messages Not Processing
- Check Redis connection and authentication
- Verify LBS stream exists:
redis-cli XLEN load-balance-demo-input - Check for errors in consumer logs
High Memory Usage
- Monitor stream lengths:
redis-cli XLEN <stream-name> - Implement proper message acknowledgment
- Consider stream trimming for old messages
Next Steps
After understanding load balancing, check out:
- Failure Recovery Example - Advanced failure handling
- Basic Usage Example - Fundamental concepts
Documentation
¶
There is no documentation for this package.
Click to show internal directories.
Click to hide internal directories.