Version: v0.3.5
Published: Feb 28, 2026 License: LGPL-2.1, LGPL-2.1-or-later

Redis Stream Client Go - Examples

This directory contains practical examples demonstrating how to use the Redis Stream Client Go library in various scenarios. These examples are for demonstration purposes only.

Examples Overview

1. Basic Usage

Simple example showing how to set up a client, initialize it, and handle basic stream operations.

2. Load Balancing

Demonstrates how multiple consumers work together to process streams in a load-balanced fashion.

Prerequisites

Before running the examples, ensure you have:

  1. Go 1.22 or later installed
  2. Redis server running (version 6.0+)
  3. One of the following environment variables set (used to create a unique consumer ID):
    export POD_NAME=example-consumer-1
    # OR
    export POD_IP=127.0.0.1
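
The examples rely on POD_NAME or POD_IP to identify each consumer. As a rough sketch of that kind of startup check, the helper below returns whichever variable is set. The name `consumerIDFromEnv` and the precedence of POD_NAME over POD_IP are assumptions made for illustration; the library's own lookup may behave differently.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// consumerIDFromEnv is a hypothetical helper, not part of the library.
// It returns POD_NAME if set, otherwise POD_IP, and an error if both are
// empty. The lookup function is injected so the logic can be exercised
// without touching the real environment.
func consumerIDFromEnv(lookup func(string) string) (string, error) {
	if name := lookup("POD_NAME"); name != "" {
		return name, nil
	}
	if ip := lookup("POD_IP"); ip != "" {
		return ip, nil
	}
	return "", errors.New("POD_NAME or POD_IP not set")
}

func main() {
	id, err := consumerIDFromEnv(os.Getenv)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("consumer ID:", id)
}
```

Failing fast on a missing identifier keeps two consumers from silently sharing an empty ID.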
    

Running Examples

Each example directory contains:

  • main.go - The main example code
  • README.md - Specific instructions for that example
  • go.mod - Go module file (if needed)

Quick Start

  1. Start Redis server:

    # Using Docker
    docker run -d -p 6379:6379 redis:7.2.3
    
    # Or using local installation
    redis-server
    
  2. Navigate to an example:

    cd examples/basic-usage
    
  3. Set environment variables:

    export POD_NAME=example-consumer-$(date +%s)
    
  4. Run the example:

    go run main.go
    

Common Configuration

All examples use similar Redis configuration:

// Redis client setup
redisClient := redis.NewUniversalClient(&redis.UniversalOptions{
    Addrs: []string{"localhost:6379"},
    DB:    0,
})

// Enable keyspace notifications for expired events
if err := redisClient.ConfigSet(ctx, "notify-keyspace-events", "Ex").Err(); err != nil {
    // handle error
}

// Create Redis Stream Client
client, err := impl.NewRedisStreamClient(redisClient, "my-service")
if err != nil {
   // handle error
}

Understanding the Flow

1. Initialization

outputChan, err := client.Init(ctx)
if err != nil {
    slog.Error("Failed to initialize client", "error", err)
    os.Exit(1)
}

2. Message Processing

The library uses an internal NotificationBroker that safely delivers notifications from multiple concurrent sources (LBS reader, keyspace listener, key extenders) to a single output channel.

for notification := range outputChan {
    switch notification.Type {
    case notifs.StreamAdded:
        // New stream assigned to this consumer
        handleNewStream(notification.Payload)
    case notifs.StreamExpired:
        // Another consumer failed, claim their stream
        client.Claim(ctx, notification.Payload)
    case notifs.StreamDisowned:
        // This consumer lost ownership
        handleStreamLoss()
    case notifs.StreamTerminated:
        // Notification channel is closing
        // Check notification.AdditionalInfo["info"] for reason
        slog.Info("Channel closing", "reason", notification.AdditionalInfo["info"])
    }
}
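
To try out handler logic without a Redis server, the loop can be modeled with local stand-in types. The sketch below is only an approximation: `notification`, `notificationType`, and `trackOwnership` are hypothetical names, and it assumes the payload identifies the stream, which the real library types may not guarantee.

```go
package main

import "fmt"

// The types below are local stand-ins for illustration only; the real
// notification types live in the library and may have different fields.
type notificationType int

const (
	streamAdded notificationType = iota
	streamExpired
	streamDisowned
	streamTerminated
)

type notification struct {
	Type    notificationType
	Payload string // assumed here to identify the stream
}

// trackOwnership consumes notifications until the channel closes and
// returns the set of streams this consumer still owns at that point.
func trackOwnership(in <-chan notification) map[string]bool {
	owned := make(map[string]bool)
	for n := range in {
		switch n.Type {
		case streamAdded:
			owned[n.Payload] = true
		case streamExpired:
			// A real consumer would attempt a claim here; this sketch
			// simply records the claim as successful.
			owned[n.Payload] = true
		case streamDisowned:
			delete(owned, n.Payload)
		case streamTerminated:
			// The channel is about to close; the range loop ends on its own.
		}
	}
	return owned
}

func main() {
	in := make(chan notification, 4)
	in <- notification{Type: streamAdded, Payload: "stream-a"}
	in <- notification{Type: streamExpired, Payload: "stream-b"}
	in <- notification{Type: streamDisowned, Payload: "stream-a"}
	in <- notification{Type: streamTerminated}
	close(in)
	fmt.Println("streams still owned:", len(trackOwnership(in)))
}
```

Ranging over the channel, rather than returning on the terminated notification, lets the loop finish naturally once the channel is closed.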
3. Cleanup

defer client.Done()

The Done() method ensures all pending notifications are drained before the output channel is closed.

Architecture: NotificationBroker

The library uses an internal NotificationBroker to manage concurrent notification sources:

┌─────────────────────┐     ┌─────────────────────┐
│  Key Extenders      │────▶│                     │
│  (one per stream)   │     │                     │
└─────────────────────┘     │                     │
                            │  NotificationBroker │────▶ outputChan ────▶ Your App
┌─────────────────────┐     │                     │
│  Keyspace Listener  │────▶│  Thread-safe        │
│  (Redis pub/sub)    │     │  Graceful shutdown  │
└─────────────────────┘     │  No send panics     │
                            │                     │
┌─────────────────────┐     │                     │
│  LBS Stream Reader  │────▶│                     │
└─────────────────────┘     └─────────────────────┘

This ensures:

  • Thread-safe writes to the output channel
  • No panics when sending to a closed channel during shutdown
  • Graceful draining of pending notifications
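
One way these three guarantees can be obtained with the standard library alone is sketched below. This is a simplified stand-in, not the library's actual NotificationBroker: the `broker` type, its fields, and its `Send`/`Close` methods are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a simplified stand-in for the library's internal
// NotificationBroker; the real implementation may differ.
type broker struct {
	mu     sync.RWMutex
	once   sync.Once
	closed bool
	done   chan struct{}
	out    chan string
}

func newBroker(buffer int) *broker {
	return &broker{done: make(chan struct{}), out: make(chan string, buffer)}
}

// Send delivers msg unless the broker is shutting down. It reports whether
// the message was accepted and never sends on a closed channel.
func (b *broker) Send(msg string) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return false
	}
	select {
	case b.out <- msg:
		return true
	case <-b.done:
		return false
	}
}

// Close stops accepting messages and closes the output channel. Messages
// already buffered stay readable, so the consumer can drain them.
func (b *broker) Close() {
	b.once.Do(func() {
		close(b.done) // unblock senders waiting on a full buffer
		b.mu.Lock()   // wait for in-flight sends to return
		defer b.mu.Unlock()
		b.closed = true
		close(b.out)
	})
}

func main() {
	b := newBroker(16)
	var wg sync.WaitGroup
	for _, src := range []string{"lbs-reader", "keyspace-listener", "key-extender"} {
		wg.Add(1)
		go func(src string) {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				b.Send(fmt.Sprintf("%s-%d", src, i))
			}
		}(src)
	}
	wg.Wait()
	b.Close()

	count := 0
	for range b.out {
		count++
	}
	fmt.Println("drained:", count, "late send accepted:", b.Send("late"))
}
```

In this sketch the output channel is closed only while the write lock is held, so no sender can be in the middle of a send at that moment, and the `done` channel keeps a blocked sender from stalling shutdown.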

Testing with Multiple Consumers

To test load balancing and failure recovery:

  1. Terminal 1:

    export POD_NAME=consumer-1
    cd examples/load-balancing
    go run main.go
    
  2. Terminal 2:

    export POD_NAME=consumer-2
    cd examples/load-balancing
    go run main.go
    
  3. Terminal 3 (Producer):

    cd examples/load-balancing
    go run main.go producer
    

Troubleshooting

Common Issues

  1. "POD_NAME or POD_IP not set"

    • Set one of the required environment variables
    • These are used to create unique consumer IDs
  2. Redis connection errors

    • Ensure Redis server is running on localhost:6379
    • Check Redis server logs for any issues
  3. No messages received

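    • Verify keyspace notifications are enabled: CONFIG GET notify-keyspace-events
    • The returned value should contain both E (keyevent notifications) and x (expired events); Redis may print the flags in a different order, such as xE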
  4. Tests failing

    • Ensure no other Redis clients are using the same consumer group
    • Clear Redis data: FLUSHALL (this deletes every key in every database, so use it only on a disposable instance)
  5. Output channel closed unexpectedly

    • Check for StreamTerminated notifications which contain the reason
    • Common causes: context cancellation, Redis connection errors
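
The keyspace-notification check in item 3 can also be automated. The sketch below tests a notify-keyspace-events value for the flags needed to receive expired-key events. It assumes the library listens on the keyevent channel, which is inferred from the "Ex" setting used in the examples; `expiredKeyeventsEnabled` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"strings"
)

// expiredKeyeventsEnabled reports whether a notify-keyspace-events value
// enables keyevent notifications for expired keys: it needs "E" (keyevent
// channel) plus either "x" (expired events) or "A" (an alias that
// includes "x"). Flag order does not matter.
func expiredKeyeventsEnabled(flags string) bool {
	return strings.Contains(flags, "E") && strings.ContainsAny(flags, "xA")
}

func main() {
	for _, flags := range []string{"Ex", "xE", "AKE", "Kx", ""} {
		fmt.Printf("%q -> %v\n", flags, expiredKeyeventsEnabled(flags))
	}
}
```

The input would be the string returned by CONFIG GET notify-keyspace-events, so a consumer could log a warning at startup when the check fails.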
Debug Mode

Enable debug logging in examples:

import (
    "log/slog"
    "os"
)

// Add at the beginning of main()
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
    Level: slog.LevelDebug,
})))

Contributing

When adding new examples:

  1. Create a new directory under examples/
  2. Include a README.md with specific instructions
  3. Add the example to this main README
  4. Ensure the example is self-contained and well-documented
  5. Test the example thoroughly
  6. Handle all notification types including StreamTerminated

Support

For questions about the examples:
