Redis Stream Client Go - Examples
This directory contains practical examples demonstrating how to use the Redis Stream Client Go library in various scenarios. They are intended for demonstration purposes only.
Examples Overview
1. Basic Usage
Simple example showing how to set up a client, initialize it, and handle basic stream operations.
2. Load Balancing
Demonstrates how multiple consumers work together to process streams in a load-balanced fashion.
Prerequisites
Before running the examples, ensure you have:
- Go 1.22 or later installed
- Redis server running (version 6.0+)
- Environment variables set:
export POD_NAME=example-consumer-1
# OR
export POD_IP=127.0.0.1
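A small pre-flight check can catch a missing identity variable before the client is created. The sketch below is illustrative only: `resolveConsumerName` is a hypothetical helper, and the precedence it applies (`POD_NAME` first, then `POD_IP`) is an assumption rather than confirmed library behavior.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// resolveConsumerName returns POD_NAME if it is set, otherwise POD_IP.
// Hypothetical helper: the precedence is an assumption made for this sketch.
// The lookup function is injected so the logic can be exercised without
// touching the real process environment.
func resolveConsumerName(getenv func(string) string) (string, error) {
	if name := getenv("POD_NAME"); name != "" {
		return name, nil
	}
	if ip := getenv("POD_IP"); ip != "" {
		return ip, nil
	}
	return "", errors.New("POD_NAME or POD_IP not set")
}

func main() {
	name, err := resolveConsumerName(os.Getenv)
	if err != nil {
		fmt.Println("pre-flight check failed:", err)
		return
	}
	fmt.Println("consumer identity:", name)
}
```

Running this before an example makes it obvious whether the shell session has the variables exported.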
Running Examples
Each example directory contains:
- main.go: The main example code
- README.md: Specific instructions for that example
- go.mod: Go module file (if needed)
Quick Start
- Start Redis server:
# Using Docker
docker run -d -p 6379:6379 redis:7.2.3
# Or using local installation
redis-server
- Navigate to an example:
cd examples/basic-usage
- Set environment variables:
export POD_NAME=example-consumer-$(date +%s)
- Run the example:
go run main.go
Common Configuration
All examples use similar Redis configuration:
// Redis client setup
redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
	Addrs: []string{"localhost:6379"},
	DB:    0,
})
// Enable keyspace notifications for expired events
if err := redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex").Err(); err != nil {
	// handle error
}
// Create Redis Stream Client
client, err := impl.NewRedisStreamClient(redisClient, "my-service")
if err != nil {
	// handle error
}
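The `notify-keyspace-events` value is a set of single-character flags, so a server that reports something other than `Ex` may still be configured correctly. The sketch below shows one way to interpret the string returned by `CONFIG GET notify-keyspace-events`. `expiredEventsEnabled` is a hypothetical helper written for this README, not part of the library. It relies on the Redis flag meanings: `E` enables keyevent notifications, `x` enables expired events, and `A` is an alias that includes `x`.

```go
package main

import (
	"fmt"
	"strings"
)

// expiredEventsEnabled reports whether a notify-keyspace-events value
// publishes key-expiry events on the keyevent channel.
// Hypothetical helper for illustration only.
func expiredEventsEnabled(flags string) bool {
	keyevent := strings.Contains(flags, "E")
	// "A" is an alias for a group of event classes that includes "x".
	expired := strings.Contains(flags, "x") || strings.Contains(flags, "A")
	return keyevent && expired
}

func main() {
	for _, flags := range []string{"Ex", "AKE", "Kx", ""} {
		fmt.Printf("%q -> %v\n", flags, expiredEventsEnabled(flags))
	}
}
```

Flag order does not matter, which is why the check uses substring tests rather than comparing against the literal `Ex`.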
Understanding the Flow
1. Initialization
outputChan, err := client.Init(ctx)
if err != nil {
	slog.Error("Failed to initialize client", "error", err)
	os.Exit(1)
}
2. Message Processing
The library uses an internal NotificationBroker that safely delivers notifications from multiple concurrent sources (LBS reader, keyspace listener, key extenders) to a single output channel.
for notification := range outputChan {
	switch notification.Type {
	case notifs.StreamAdded:
		// New stream assigned to this consumer
		handleNewStream(notification.Payload)
	case notifs.StreamExpired:
		// Another consumer failed, claim their stream
		client.Claim(ctx, notification.Payload)
	case notifs.StreamDisowned:
		// This consumer lost ownership
		handleStreamLoss()
	case notifs.StreamTerminated:
		// Notification channel is closing
		// Check notification.AdditionalInfo["info"] for reason
		slog.Info("Channel closing", "reason", notification.AdditionalInfo["info"])
	}
}
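Handlers such as `handleNewStream` and `handleStreamLoss` usually need to remember which streams this consumer currently owns. The following is a minimal sketch of such bookkeeping, not code from the library. The `ownedStreams` type and its methods are hypothetical, and identifying a stream by a plain string is an assumption, since the shape of `notification.Payload` is not shown here.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// ownedStreams tracks the streams this consumer is processing.
// Hypothetical type; stream IDs as strings are an assumption.
type ownedStreams struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newOwnedStreams() *ownedStreams {
	return &ownedStreams{ids: make(map[string]struct{})}
}

// add records a stream, for example after StreamAdded or a successful claim.
// It returns false if the stream was already tracked.
func (o *ownedStreams) add(id string) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if _, ok := o.ids[id]; ok {
		return false
	}
	o.ids[id] = struct{}{}
	return true
}

// remove forgets a stream, for example after StreamDisowned.
// It returns false if the stream was not tracked.
func (o *ownedStreams) remove(id string) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if _, ok := o.ids[id]; !ok {
		return false
	}
	delete(o.ids, id)
	return true
}

// list returns the tracked stream IDs in sorted order.
func (o *ownedStreams) list() []string {
	o.mu.Lock()
	defer o.mu.Unlock()
	out := make([]string, 0, len(o.ids))
	for id := range o.ids {
		out = append(out, id)
	}
	sort.Strings(out)
	return out
}

func main() {
	owned := newOwnedStreams()
	owned.add("stream-a")
	owned.add("stream-b")
	owned.remove("stream-a")
	fmt.Println(owned.list())
}
```

The mutex matters only if the handlers run on goroutines other than the one ranging over `outputChan`. A single-goroutine loop could use a plain map.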
3. Cleanup
defer client.Done()
The Done() method ensures all pending notifications are drained before the output channel is closed.
Architecture: NotificationBroker
The library uses an internal NotificationBroker to manage concurrent notification sources:
┌─────────────────────┐     ┌─────────────────────┐
│ Key Extenders       │────▶│                     │
│ (one per stream)    │     │                     │
└─────────────────────┘     │                     │
                            │ NotificationBroker  │────▶ outputChan ────▶ Your App
┌─────────────────────┐     │                     │
│ Keyspace Listener   │────▶│ Thread-safe         │
│ (Redis pub/sub)     │     │ Graceful shutdown   │
└─────────────────────┘     │ No send panics      │
                            │                     │
┌─────────────────────┐     │                     │
│ LBS Stream Reader   │────▶│                     │
└─────────────────────┘     └─────────────────────┘
This ensures:
- Thread-safe writes to the output channel
- No panics when sending to a closed channel during shutdown
- Graceful draining of pending notifications
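The three guarantees above can be illustrated with a small fan-in type built from the standard library. This is a sketch of the general technique under stated assumptions, not the library's actual NotificationBroker. The `broker` type, its methods, and the use of plain strings as messages are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// broker fans in messages from many goroutines to one output channel.
// Hypothetical type for illustration only.
type broker struct {
	mu     sync.RWMutex
	closed bool
	out    chan string
}

func newBroker(buffer int) *broker {
	return &broker{out: make(chan string, buffer)}
}

// send delivers msg unless the broker is closed. It returns false when the
// message is dropped. Holding the read lock while sending guarantees that
// close cannot run in the middle of a send, so a send never hits a closed
// channel. The consumer must keep reading until the channel is closed,
// otherwise a blocked sender would also block close.
func (b *broker) send(msg string) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return false
	}
	b.out <- msg
	return true
}

// close waits for in-flight sends, then closes the channel exactly once.
// Messages already buffered stay readable, so the consumer can drain them.
func (b *broker) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	b.closed = true
	close(b.out)
}

func main() {
	b := newBroker(4)

	var producers sync.WaitGroup
	for _, src := range []string{"extender", "keyspace", "lbs"} {
		producers.Add(1)
		go func(name string) {
			defer producers.Done()
			for i := 0; i < 3; i++ {
				b.send(fmt.Sprintf("%s-%d", name, i))
			}
		}(src)
	}

	received := make(chan int)
	go func() {
		n := 0
		for range b.out {
			n++
		}
		received <- n
	}()

	producers.Wait()
	b.close()
	fmt.Println("delivered:", <-received)
	fmt.Println("send after close accepted:", b.send("late"))
}
```

The design trades a little lock contention for simplicity: every producer pays for a read lock, but shutdown needs no coordination beyond calling `close` once.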
Testing with Multiple Consumers
To test load balancing and failure recovery:
- Terminal 1:
export POD_NAME=consumer-1
cd examples/load-balancing
go run main.go
- Terminal 2:
export POD_NAME=consumer-2
cd examples/load-balancing
go run main.go
- Terminal 3 (Producer):
cd examples/load-balancing
go run main.go producer
Troubleshooting
Common Issues
- "POD_NAME or POD_IP not set"
  - Set one of the required environment variables
  - These are used to create unique consumer IDs
- Redis connection errors
  - Ensure Redis server is running on localhost:6379
  - Check Redis server logs for any issues
- No messages received
  - Verify keyspace notifications are enabled: CONFIG GET notify-keyspace-events
  - Should return Ex or a similar pattern
- Tests failing
  - Ensure no other Redis clients are using the same consumer group
  - Clear Redis data: FLUSHALL (this deletes every key in every database, so use it only on a disposable instance)
- Output channel closed unexpectedly
  - Check for StreamTerminated notifications, which contain the reason
  - Common causes: context cancellation, Redis connection errors
Debug Mode
Enable debug logging in examples:
import (
	"log/slog"
	"os"
)
// Add at the beginning of main()
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
	Level: slog.LevelDebug,
})))
Contributing
When adding new examples:
- Create a new directory under examples/
- Include a README.md with specific instructions
- Add the example to this main README
- Ensure the example is self-contained and well-documented
- Test the example thoroughly
- Handle all notification types including StreamTerminated
Support
For questions about the examples:
- Check the main README
- Review CONTRIBUTING.md
- Open an issue on GitHub