redstream

Version: v1.1.3
Published: Jan 19, 2025 License: MIT Imports: 17 Imported by: 0

README

Redis Stream

A Go library for robust Redis Streams consumption with:

  1. Consumer Groups for once-per-message processing.
  2. Redsync locks to prevent duplicates across multiple workers.
  3. Auto-reclaim (XAUTOCLAIM) to recover stuck pending messages.
  4. Optional ephemeral “publish lock” to skip concurrent duplicates.
  5. Exponential backoff for repeated failures.
  6. Dead Letter Queue (DLQ) support for over-limit messages.
  7. Universal Client for single-node, cluster, or sentinel setups.
  8. Concurrency Limit to throttle simultaneous message handling.

Installation

go get github.com/smarter-day/redstream

Quick Usage Example

Below is a simplified demonstration:

package main

import (
    "context"
    "log"

    "github.com/redis/go-redis/v9"
    "github.com/smarter-day/redstream"
)

func main() {
    ctx := context.Background()

    cfg := redstream.Config{
        StreamName:         "myStream",
        GroupName:          "myGroup",
        ConsumerName:       "myConsumer",
        EnableReclaim:      true,
        ReclaimStr:         "5s",
        ReclaimCount:       10,
        MaxReclaimAttempts: 3,
        // Control concurrency (optional). If > 0, RedStream limits simultaneous handling.
        MaxConcurrency: 5,
        // Optional DLQ handler for messages that exceed attempts
        DLQHandler: func(ctx context.Context, msg *redis.XMessage) error {
            log.Printf("DLQ got message ID=%s\n", msg.ID)
            // e.g. write to a separate "errors" stream
            return nil
        },
    }

    // Provide *redis.UniversalOptions for single node, cluster, or sentinel
    universalOpts := &redis.UniversalOptions{Addrs: []string{"localhost:6379"}}

    stream := redstream.New(universalOpts, cfg)

    // Register a handler that processes messages
    stream.RegisterHandler(func(ctx context.Context, msg map[string]string) error {
        // Return an error to test retries or DLQ.
        return nil
    })

    if err := stream.StartConsumer(ctx); err != nil {
        log.Fatal("Cannot start consumer:", err)
    }

    // Publish
    msgID, err := stream.Publish(ctx, map[string]string{"foo": "bar"})
    if err != nil {
        log.Println("Publish error:", err)
    } else {
        log.Println("Published with ID:", msgID)
    }

    // Block forever
    select {}
}

High-Level Flow

  1. Publish: By default calls XADD. If DropConcurrentDuplicates=true, it uses a brief Redsync lock to skip duplicates that arrive almost simultaneously.
  2. Consume: Worker(s) call XREADGROUP. Each message is protected by a Redsync lock so only one worker processes it. Success => XACK + XDEL; failure => attempt tracking & exponential backoff.
  3. Reclaim: If EnableReclaim=true, a background loop calls XAUTOCLAIM to rescue stuck messages. Each is retried until MaxReclaimAttempts.
  4. Concurrency Limit (optional): If MaxConcurrency > 0, the library throttles simultaneous message processing in each worker, reducing the risk of goroutine overload.
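The Configuration section describes the concurrency limit as channel-based. A minimal sketch of that general pattern, using a buffered channel as a counting semaphore, might look like the following. The name runLimited and the peak counter are illustrative assumptions, not part of the redstream API, and the library's internal implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited (hypothetical helper) runs every job in its own goroutine but
// lets at most maxConcurrency of them execute at once. It returns the highest
// number of jobs observed running simultaneously.
func runLimited(maxConcurrency int, jobs []func()) int {
	if maxConcurrency <= 0 {
		// Assumption for this sketch: a non-positive limit means "no limit".
		maxConcurrency = len(jobs)
	}
	sem := make(chan struct{}, maxConcurrency) // buffered channel as semaphore
	var wg sync.WaitGroup
	var running, peak int64

	for _, job := range jobs {
		wg.Add(1)
		sem <- struct{}{} // blocks while maxConcurrency jobs are in flight
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot after the job finishes

			n := atomic.AddInt64(&running, 1)
			for { // record the highest concurrency seen so far
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt64(&running, -1)
		}(job)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&peak))
}

func main() {
	jobs := make([]func(), 20)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runLimited(5, jobs))
}
```

Because a slot is acquired before the goroutine starts and released only after the job returns, the observed peak can never exceed the configured limit.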

Configuration

Configure via redstream.Config plus a *redis.UniversalOptions. Important fields include:

Field                        Default                Description
StreamName                   required               Redis stream name.
GroupName                    required               Consumer group name (auto-created at 0-0 if missing).
ConsumerName                 auto-generated         Must be unique across consumers.
LockExpiryStr                "10s"                  Redsync lock expiration for message-level locks.
LockExtendStr                half of LockExpiryStr  Interval to keep the lock alive via ExtendContext.
BlockDurationStr             "5s"                   How long XREADGROUP waits if no new messages arrive.
ReclaimStr                   "5s"                   Frequency of auto-reclaim if EnableReclaim=true.
EnableReclaim                false                  Whether to run reclaim logic.
CleanOldReclaimDuration      "1h"                   Age threshold for removing stale reclaim records.
MaxReclaimAttempts           3                      Once a message exceeds this many attempts, it is removed or passed to the DLQ handler.
ReclaimCount                 10                     Number of messages to handle per XAUTOCLAIM pass.
ReclaimMaxExponentialFactor  3                      Maximum exponent for backoff. With the default of 3, the multiplier is min(2^attempts, 2^3), so at most 8.
DLQHandler                   nil                    Optional function for messages beyond MaxReclaimAttempts.
IgnoreDLQHandlerErrors       false                  If true, remove the message even if the DLQ handler fails. Otherwise, keep it pending.
DropConcurrentDuplicates     false                  If true, Publish(...) takes an ephemeral lock to skip near-simultaneous duplicates.
MaxConcurrency               10                     If > 0, the library uses a channel-based limit for concurrency in processMessage.
UseRedisIdAsUniqueID         false                  If true, use the redis.XMessage ID as the unique ID for duplicate detection. Otherwise, compute a SHA-256 hash of the entire message payload.

(Your *redis.UniversalOptions can specify single node, cluster, or sentinel.)
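The ReclaimMaxExponentialFactor setting implies a capped exponential multiplier. The sketch below works through that arithmetic. The function name backoffDelay is hypothetical, and multiplying a 5s base interval is an assumption made for illustration; the table does not say which interval the library scales.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay (hypothetical helper) returns base * 2^min(attempts, maxExponent).
// It assumes a small maxExponent, so the multiplication does not overflow.
func backoffDelay(base time.Duration, attempts, maxExponent int) time.Duration {
	if attempts < 0 {
		attempts = 0
	}
	if maxExponent < 0 {
		maxExponent = 0
	}
	exp := attempts
	if exp > maxExponent {
		exp = maxExponent // cap the exponent, e.g. 3 gives a multiplier of at most 8
	}
	return base * time.Duration(int64(1)<<uint(exp))
}

func main() {
	// Assumed base of 5s with the documented default cap of 3.
	for attempts := 0; attempts <= 5; attempts++ {
		fmt.Printf("attempts=%d delay=%s\n", attempts, backoffDelay(5*time.Second, attempts, 3))
	}
}
```

With a cap of 3 the multiplier stops growing at 8, so under these assumptions a 5s base never waits longer than 40s.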


Deeper Explanation & Best Practices

  • Universal Client:
    redis.NewUniversalClient(...) lets you toggle between single-node, cluster, or sentinel seamlessly.
  • Consumer Groups & PEL:
    Redis Streams track unacknowledged messages in a Pending Entries List (PEL). If a consumer crashes, messages remain pending.
  • Redsync Locking:
    Ensures only one worker (even in a large pool) processes each message. This helps if XAUTOCLAIM reassigns the same message to multiple consumers.
  • Exponential Backoff:
    On repeated failures, we store the next earliest time to reprocess each message. If that time is in the future, we skip reprocessing for now.
  • Dead Letter Queue:
    After MaxReclaimAttempts, messages are removed or passed to DLQHandler. If IgnoreDLQHandlerErrors=false, we keep them in pending if the DLQ fails.
  • Concurrency Limiting:
    By using MaxConcurrency, each consumer instance only handles up to N messages simultaneously. This helps if you have a CPU- or I/O-intensive handler, preventing goroutine overload.
  • Cleanup:
    Reclaim attempts are stored in a Redis hash. Stale entries older than CleanOldReclaimDuration are removed automatically.
  • Performance Tuning:
    Adjust intervals (LockExpiryStr, ReclaimStr, etc.) to match your throughput. For large streams, ensure your concurrency and backoff are tuned to avoid overload or excessive retries.
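The Dead Letter Queue rule above combines three inputs: the attempt count, whether a DLQ handler is set, and IgnoreDLQHandlerErrors. A small sketch of that decision, under stated assumptions, is shown here. The names decide, outcome, and errDLQDown are hypothetical, and treating "exceeds" as attempts > MaxReclaimAttempts is an assumption about the comparison the library uses.

```go
package main

import (
	"errors"
	"fmt"
)

type outcome int

const (
	retryLater    outcome = iota // under the limit: leave pending for another reclaim pass
	removeMessage                // over the limit and safe to remove from the stream
	keepPending                  // over the limit, but the DLQ handler failed
)

// errDLQDown is a stand-in error used only by this example.
var errDLQDown = errors.New("dlq unavailable")

// decide (hypothetical helper) mirrors the documented rule: once attempts
// exceed maxAttempts the message goes to the DLQ handler if one is set;
// a failing handler keeps the message pending unless errors are ignored.
func decide(attempts, maxAttempts int, dlq func() error, ignoreDLQErrors bool) outcome {
	if attempts <= maxAttempts {
		return retryLater
	}
	if dlq == nil {
		return removeMessage
	}
	if err := dlq(); err != nil && !ignoreDLQErrors {
		return keepPending
	}
	return removeMessage
}

func main() {
	names := []string{"retryLater", "removeMessage", "keepPending"}
	failing := func() error { return errDLQDown }

	fmt.Println(names[decide(2, 3, nil, false)])
	fmt.Println(names[decide(4, 3, failing, false)])
	fmt.Println(names[decide(4, 3, failing, true)])
}
```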

License

MIT License – open for adaptation & improvement.

Issues and contributions are always welcome!

Documentation

Index

Constants

const (
	RedisKeysPrefix = "redstream"
)

Variables

var (
	Validate *validator.Validate
)

Functions

func CreateGroupIfNotExists

func CreateGroupIfNotExists(client redis.Cmdable, stream, group string) error

CreateGroupIfNotExists attempts to create a consumer group for a Redis stream if it doesn't already exist. It uses the XGroupCreateMkStream command, which creates both the stream and the group if they don't exist.

Parameters:

  • client: A redis.Cmdable (for example, a *redis.Client) used to execute Redis commands.
  • stream: The name of the Redis stream to create or use.
  • group: The name of the consumer group to create.

Returns:

  • error: nil if the group was created successfully or already exists, otherwise returns an error.
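One common way to get "nil if the group already exists" behavior is to treat the Redis BUSYGROUP reply as success. The sketch below shows only that error-filtering step, using the standard library. The helper name ignoreBusyGroup and the sample errors are hypothetical, and whether redstream implements the check this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Sample errors used only by this example. Redis replies with a BUSYGROUP
// error when XGROUP CREATE targets a group that already exists.
var (
	errBusy  = errors.New("BUSYGROUP Consumer Group name already exists")
	errOther = errors.New("dial tcp: connection refused")
)

// ignoreBusyGroup (hypothetical helper) returns nil when err only reports
// that the consumer group already exists, and passes other errors through.
func ignoreBusyGroup(err error) error {
	if err != nil && strings.Contains(err.Error(), "BUSYGROUP") {
		return nil
	}
	return err
}

func main() {
	fmt.Println(ignoreBusyGroup(errBusy))  // group exists: treated as success
	fmt.Println(ignoreBusyGroup(errOther)) // real failure: returned unchanged
}
```

In real code the input would be the error returned by the group-creation call on the Redis client.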

func ParseDurationOrDefault

func ParseDurationOrDefault(s *string, def time.Duration) time.Duration

ParseDurationOrDefault attempts to parse a string into a time.Duration. If parsing fails or results in a non-positive duration, it returns the default value.

Parameters:

  • s: A pointer to a string representation of a duration (e.g., "5s", "1m30s").
  • def: The default duration to return if parsing fails or results in a non-positive value.

Returns:

A time.Duration parsed from the input string, or the default value if parsing fails.
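The documented behavior can be reproduced with time.ParseDuration from the standard library. This is a sketch under that assumption, not the library's source; the lowercase name parseDurationOrDefault is hypothetical, and returning the default for a nil pointer is an assumption the documentation does not state.

```go
package main

import (
	"fmt"
	"time"
)

// parseDurationOrDefault (hypothetical re-implementation) returns def when
// s is nil, cannot be parsed, or parses to a non-positive duration.
func parseDurationOrDefault(s *string, def time.Duration) time.Duration {
	if s == nil {
		return def // assumption: nil is treated like an unparsable value
	}
	d, err := time.ParseDuration(*s)
	if err != nil || d <= 0 {
		return def
	}
	return d
}

func main() {
	good, bad, negative := "1m30s", "soon", "-5s"
	def := 10 * time.Second

	fmt.Println(parseDurationOrDefault(&good, def))     // parsed value
	fmt.Println(parseDurationOrDefault(&bad, def))      // falls back to def
	fmt.Println(parseDurationOrDefault(&negative, def)) // falls back to def
	fmt.Println(parseDurationOrDefault(nil, def))       // falls back to def
}
```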

func UniqueConsumerName

func UniqueConsumerName(base string) string

UniqueConsumerName generates a unique consumer name for use in Redis streams. It combines a base name with the hostname, process ID, and a random suffix to ensure uniqueness across different instances and executions.

Parameters:

  • base: A string that serves as the prefix for the generated consumer name.

Returns:

A string representing a unique consumer name in the format:
"<base>-<hostname>-<pid>-<random_suffix>"
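A name in the documented format can be assembled from the standard library alone. The sketch below is one way to do it; the function name uniqueConsumerName, the 4-byte hex suffix, and the fallbacks are assumptions, since the documentation does not describe how the library derives the random suffix.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
	"strconv"
	"time"
)

// uniqueConsumerName (hypothetical re-implementation) builds
// "<base>-<hostname>-<pid>-<random_suffix>".
func uniqueConsumerName(base string) string {
	host, err := os.Hostname()
	if err != nil || host == "" {
		host = "unknown-host" // assumption: placeholder when the hostname is unavailable
	}

	var suffix string
	buf := make([]byte, 4)
	if _, err := rand.Read(buf); err == nil {
		suffix = hex.EncodeToString(buf) // 8 hex characters
	} else {
		// Fallback for the rare case where the system random source fails.
		suffix = strconv.FormatInt(time.Now().UnixNano(), 16)
	}

	return fmt.Sprintf("%s-%s-%d-%s", base, host, os.Getpid(), suffix)
}

func main() {
	fmt.Println(uniqueConsumerName("worker"))
	fmt.Println(uniqueConsumerName("worker"))
}
```

The hostname and PID distinguish machines and processes, while the random suffix separates several consumers started inside the same process.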

func ValidateDuration

func ValidateDuration(fl validator.FieldLevel) bool

ValidateDuration checks if a given field can be parsed as a valid duration.

It takes a validator.FieldLevel as an argument, which provides access to the field being validated and its metadata.

Parameters:

  • fl: A validator.FieldLevel interface that allows access to the field being validated, as well as the struct it belongs to.

Returns:

  • bool: true if the field can be parsed as a valid duration, false otherwise.

Types

type Config

type Config struct {
	StreamName   string `validate:"required"`
	GroupName    string `validate:"required"`
	ConsumerName string `validate:"required"`

	MaxConcurrency      int
	UseDistributedLock  bool // if false => skip the redsync-based lock
	NoProgressThreshold int  // after how many consecutive zero-reclaims do we call readPendingMessagesOnce

	LockExpiryStr    string `validate:"duration"`
	LockExtendStr    string `validate:"duration"`
	BlockDurationStr string `validate:"duration"`

	EnableReclaim               bool
	ReclaimStr                  string `validate:"duration"`
	CleanOldReclaimDuration     string `validate:"duration"`
	MaxReclaimAttempts          int
	ReclaimCount                int64
	ReclaimMaxExponentialFactor int

	DLQHandler               DLQHandlerFn
	IgnoreDLQHandlerErrors   bool
	DropConcurrentDuplicates bool

	ProcessedIdsMaxAgeStr string `validate:"duration"`
	UseRedisIdAsUniqueID  bool

	StaleConsumerIdleThresholdStr string `validate:"duration"`
	EnableStaleConsumerCleanup    bool

	EnableAutoRejoinOnRemoved  bool
	AutoRejoinCheckIntervalStr string `validate:"duration"`
}

Config represents the configuration for the RedStream.

type DLQHandlerFn

type DLQHandlerFn func(ctx context.Context, msg *redis.XMessage) error

type IRedStream

type IRedStream interface {
	StartConsumer(ctx context.Context) error
	RegisterHandler(handler func(ctx context.Context, msg map[string]string) error)
	Publish(ctx context.Context, data any) (string, error)
	HealthCheck(ctx context.Context) error

	UseDebug(fn LogFn)
	UseInfo(fn LogFn)
	UseError(fn LogFn)
}

func New

func New(redisOptions *redis.UniversalOptions, cfg Config) IRedStream

New creates and initializes a new IRedStream instance.

It sets up the Redis client, configures distributed locking if enabled, parses and validates configuration values, creates the consumer group if it doesn't exist, and registers Lua scripts.

Parameters:

  • redisOptions: A pointer to redis.UniversalOptions containing Redis connection settings.
  • cfg: A Config struct containing the configuration for the RedStream.

Returns:

  • An IRedStream interface implementation. If there's an error during initialization, it returns nil and logs a fatal error.

type LogFn

type LogFn func(ctx context.Context, args ...interface{}) error
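A function of this shape can wrap almost any logger. The sketch below redeclares LogFn locally so it compiles without the library and adapts a plain line-writing callback. The helper newLogFn and the level prefix are hypothetical; with the real package, the returned value would presumably be passed to UseDebug, UseInfo, or UseError.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// LogFn is redeclared here with the same signature as redstream.LogFn so the
// example is self-contained.
type LogFn func(ctx context.Context, args ...interface{}) error

// newLogFn (hypothetical helper) returns a LogFn that joins its arguments
// with spaces, prefixes them with level, and hands the line to write.
func newLogFn(level string, write func(line string)) LogFn {
	return func(ctx context.Context, args ...interface{}) error {
		msg := strings.TrimSuffix(fmt.Sprintln(args...), "\n")
		write(level + " " + msg)
		return nil
	}
}

func main() {
	info := newLogFn("INFO", func(line string) { fmt.Println(line) })
	_ = info(context.Background(), "consumer started, stream:", "myStream")
}
```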

type RedisStream

type RedisStream struct {
	Client redis.UniversalClient
	Rs     *redsync.Redsync
	Cfg    Config

	LockExpiry              time.Duration
	LockExtend              time.Duration
	BlockDuration           time.Duration
	ReclaimInterval         time.Duration
	CleanOldReclaimDuration time.Duration
	ProcessedIdsMaxAge      time.Duration
	// contains filtered or unexported fields
}

RedisStream is the main struct implementing IRedStream.

func (*RedisStream) HealthCheck

func (r *RedisStream) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the Redis connection by sending a PING command. It verifies if the Redis server is responsive and the connection is active.

Parameters:

  • ctx: A context.Context for handling cancellation and timeouts.

Returns:

  • error: An error if the PING command fails or the connection is not healthy, nil if the health check is successful.
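Because HealthCheck takes a context, a caller can bound how long the probe may take. The sketch below shows that pattern against a small local interface with the same method signature. The names healthChecker, checkWithTimeout, and fakeStream are hypothetical; fakeStream only simulates a slow or fast server so the example runs without Redis.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// healthChecker matches the HealthCheck method of IRedStream.
type healthChecker interface {
	HealthCheck(ctx context.Context) error
}

// checkWithTimeout (hypothetical helper) runs the health check with a deadline.
func checkWithTimeout(hc healthChecker, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return hc.HealthCheck(ctx)
}

// fakeStream stands in for a real stream and answers after delay.
type fakeStream struct{ delay time.Duration }

func (f fakeStream) HealthCheck(ctx context.Context) error {
	select {
	case <-time.After(f.delay):
		return nil
	case <-ctx.Done():
		return ctx.Err() // the deadline passed before the reply arrived
	}
}

func main() {
	fmt.Println("fast:", checkWithTimeout(fakeStream{delay: time.Millisecond}, time.Second))
	fmt.Println("slow:", checkWithTimeout(fakeStream{delay: time.Second}, 10*time.Millisecond))
}
```

A readiness or liveness endpoint could call a helper like this and report the service as unhealthy whenever it returns a non-nil error.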

func (*RedisStream) Publish

func (r *RedisStream) Publish(ctx context.Context, data any) (string, error)

Publish adds a new message to the Redis stream. It optionally checks for and drops concurrent duplicates.

The function marshals the provided data into JSON, optionally acquires a lock to prevent concurrent duplicates, and then adds the message to the stream using Redis XADD command.

Parameters:

  • ctx: A context.Context for handling cancellation and timeouts.
  • data: Any data type that can be marshaled into JSON to be published as a message.

Returns:

  • string: The ID of the published message in the Redis stream. Empty if skipped due to duplication.
  • error: An error if any step in the publishing process fails, nil otherwise.
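Publish marshals the data to JSON, and the Configuration section mentions a SHA-256 hash of the payload for duplicate detection. The sketch below shows how a duplicate-detection key could be derived from those two steps. The function payloadKey and the ":publish:" key layout are assumptions for illustration; only the "redstream" prefix comes from the RedisKeysPrefix constant.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// payloadKey (hypothetical helper) marshals data to JSON and returns
// "<prefix>:publish:<sha256 hex>". The key layout is an assumption.
func payloadKey(prefix string, data any) (string, error) {
	raw, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(raw)
	return prefix + ":publish:" + hex.EncodeToString(sum[:]), nil
}

func main() {
	key, err := payloadKey("redstream", map[string]string{"foo": "bar"})
	if err != nil {
		fmt.Println("marshal error:", err)
		return
	}
	fmt.Println(key)
}
```

encoding/json writes map keys in sorted order, so two maps with the same entries produce the same key regardless of insertion order. Struct payloads hash according to their field order.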

func (*RedisStream) RegisterHandler

func (r *RedisStream) RegisterHandler(handler func(ctx context.Context, msg map[string]string) error)

RegisterHandler sets the handler for new messages.

func (*RedisStream) StartConsumer

func (r *RedisStream) StartConsumer(ctx context.Context) error

StartConsumer initiates the consumer process for the Redis stream. It starts goroutines for listening to new messages, reclaiming pending messages (if enabled), and performing periodic cleanup of processed message IDs.

This function sets up the necessary channels and goroutines to handle message processing concurrency, new message consumption, and maintenance tasks.

Parameters:

  • ctx: A context.Context for handling cancellation and timeouts.

Returns:

  • error: An error if the handler function is not set, nil otherwise.

func (*RedisStream) UseDebug

func (r *RedisStream) UseDebug(fn LogFn)

func (*RedisStream) UseError

func (r *RedisStream) UseError(fn LogFn)

func (*RedisStream) UseInfo

func (r *RedisStream) UseInfo(fn LogFn)

Directories

Path Synopsis
Package mocks is a generated GoMock package.
