Interactive demo: https://g.co/gemini/share/999b256cffd3
redis-stream-client-go

A Redis-streams-based client that can recover from failures. This library is built on go-redis and redsync.
Redis streams are awesome! Typically, data is written at one end and consumed at the other.

When one or more consumers fail (crash, or get stuck for an abnormally long time), Redis streams let you recover with XCLAIM (and XAUTOCLAIM). This assumes that your consumers are stateful, i.e. each one knows its own identity via a dedicated machine name or IP. With XPENDING and XCLAIM, a consumer can then recover from a failed or stuck state.

However, this approach falls short on two counts:
- Recovery depends on how soon the crashed consumer can come back up and claim. This usually takes only a few seconds, but startup logic can make it much longer.
- When a consumer gets stuck (a GC pause or a similar stop-the-world event), processing of its stream stalls.
In both situations, other consumers may be waiting and available to claim the stream and continue processing in real time. However, because Redis streams are pull-based, those consumers have no way of knowing that they need to.
This library aims to provide two such constructs built on top of redis' own data structures:
- Inform other consumers that a consumer is dead or stuck via key space notifications.
- Provide API to claim the stream being processed.

In addition, for easier management, the library provides a load balancer stream (LBS), built on Redis streams and consumer groups, which distributes incoming streams (not stream data!) among the existing consumers in round-robin fashion.
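
To make the round-robin idea concrete, here is a minimal sketch of the intended outcome only. In the library, the distribution is performed by Redis consumer groups on the LBS; the helper `assignRoundRobin` below is hypothetical and not part of the library's API.

```go
package main

// assignRoundRobin models how stream names (not stream data) could be
// spread across consumers: the i-th stream goes to consumer i mod N.
// Hypothetical helper for illustration; the library relies on Redis
// consumer groups rather than a function like this.
func assignRoundRobin(streams, consumers []string) map[string][]string {
	assignment := make(map[string][]string, len(consumers))
	if len(consumers) == 0 {
		// Nothing to assign to; return an empty assignment.
		return assignment
	}
	for i, s := range streams {
		c := consumers[i%len(consumers)]
		assignment[c] = append(assignment[c], s)
	}
	return assignment
}
```

With streams `s1`, `s2`, `s3` and consumers `a`, `b`, this assigns `s1` and `s3` to `a`, and `s2` to `b`.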

usage
Just import the library:
go get github.com/handcoding-labs/redis-stream-client-go
Create the client:
import rsc "github.com/handcoding-labs/redis-stream-client-go/impl"
client := rsc.NewRedisStreamClient(<go redis client>, <service_name>)
Initialize the client. The returned channel carries LBS and key space notifications, which tell you which data streams to read and which have expired, respectively:
outputChan, err := client.Init(ctx)
There are currently three types of notifications sent on outputChan:
StreamAdded - When a new stream gets added to LBS. You should take the stream and start reading your data from it using standard XREAD or XREADGROUP commands as applicable.
StreamExpired - When a client's ownership of stream expires and it relinquishes the lock. This is sent when key space notification arrives on stream expiry. Other clients should process this and take ownership of the stream by using Claim API.
StreamDisowned - When a client gets stuck (not crashed) and thus automatically relinquishes ownership, another active client will claim it. When the old client comes back, it will fail to extend the lock and thus will be informed that it now doesn't own the stream. The old client should gracefully exit by calling Done API.
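
The three notification types suggest a simple dispatch loop. The sketch below uses local stand-in types (`notification`, `notificationType`, `handlers`), because the library's actual notification type and field names are not shown here; treat every name in it as an assumption. In real code the channel would be the `outputChan` returned by `Init`, and the handlers would read the stream, call `Claim`, and call `Done`.

```go
package main

// notificationType and notification are local stand-ins for whatever
// the library sends on outputChan; the real names may differ.
type notificationType int

const (
	streamAdded notificationType = iota
	streamExpired
	streamDisowned
)

type notification struct {
	kind    notificationType
	payload string
}

// handlers groups the three reactions described above.
type handlers struct {
	read  func(stream string)  // start XREAD/XREADGROUP on the new stream
	claim func(payload string) // try to take over an expired stream
	stop  func()               // exit gracefully, e.g. by calling Done
}

// dispatch consumes notifications until the channel is closed or the
// client learns that it has been disowned.
func dispatch(ch <-chan notification, h handlers) {
	for n := range ch {
		switch n.kind {
		case streamAdded:
			h.read(n.payload)
		case streamExpired:
			h.claim(n.payload)
		case streamDisowned:
			h.stop()
			return
		}
	}
}
```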
claiming
When you receive a StreamExpired notification, you can claim the expired stream using the mutex key from the notification payload:
err := client.Claim(ctx, <ksp_notification_payload>)
The <ksp_notification_payload> is in the format data_stream_name:message_id_in_lbs. An error from Claim indicates that the client could not claim the stream because another client claimed it first.
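
If you need the two parts of the payload separately (for logging, say), the format can be split as sketched below. `parseClaimPayload` is a hypothetical helper, not a library function. It assumes the message ID contains no colon (Redis stream IDs have the form `<ms>-<seq>`) while the stream name might, so it splits at the last colon.

```go
package main

import (
	"errors"
	"strings"
)

// parseClaimPayload splits "data_stream_name:message_id_in_lbs" at the
// last colon, so stream names that themselves contain colons survive.
// Hypothetical helper for illustration only.
func parseClaimPayload(payload string) (stream, msgID string, err error) {
	i := strings.LastIndex(payload, ":")
	if i <= 0 || i == len(payload)-1 {
		return "", "", errors.New("payload must look like data_stream_name:message_id_in_lbs")
	}
	return payload[:i], payload[i+1:], nil
}
```

Claim itself takes the full payload, so this split is only useful on the caller's side.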
stream lifecycle management
The library provides granular control over stream lifecycle:
processing individual streams
After processing is done for a specific data stream, call DoneStream to mark the end of processing for that particular stream:
err := client.DoneStream(ctx, <data_stream_name>)
This method:
- Unlocks the distributed lock for the stream
- Acknowledges the message in the LBS stream
- Cleans up internal state for that specific stream
client shutdown
When the client is shutting down completely, call Done to clean up all streams handled by the client:
err := client.Done()
This method calls DoneStream for all active streams and then performs additional cleanup like closing channels and canceling contexts.
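
The relationship between the two calls can be pictured with a small registry. This is a sketch of the pattern only, not the library's implementation: `streamRegistry` and its methods are hypothetical, the per-stream cleanup is reduced to a callback, and collecting errors with `errors.Join` (Go 1.20+) is an assumption.

```go
package main

import (
	"errors"
	"sync"
)

// streamRegistry tracks active streams and their cleanup steps.
// Hypothetical stand-in for the client's internal state.
type streamRegistry struct {
	mu     sync.Mutex
	active map[string]func() error // stream name -> per-stream cleanup
}

func newStreamRegistry() *streamRegistry {
	return &streamRegistry{active: make(map[string]func() error)}
}

func (r *streamRegistry) add(name string, cleanup func() error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.active[name] = cleanup
}

// doneStream removes one stream and runs its cleanup (in the library:
// unlock, acknowledge in the LBS, drop internal state).
func (r *streamRegistry) doneStream(name string) error {
	r.mu.Lock()
	cleanup, ok := r.active[name]
	delete(r.active, name)
	r.mu.Unlock()
	if !ok {
		return errors.New("stream not active: " + name)
	}
	return cleanup()
}

// done finishes every remaining stream and reports all failures.
func (r *streamRegistry) done() error {
	r.mu.Lock()
	names := make([]string, 0, len(r.active))
	for n := range r.active {
		names = append(names, n)
	}
	r.mu.Unlock()

	var errs []error
	for _, n := range names {
		if err := r.doneStream(n); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when nothing failed
}
```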
Method ID() can be used to obtain client ID for logging purposes:
client.ID()