redisstream

package
v1.0.0 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 15, 2025 License: GPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option added in v0.4.1

type Option func(*RedisStreamMessageQueue)

func WithDeleteOnAck added in v0.4.1

func WithDeleteOnAck(deleteOnAck bool) Option

func WithPrefix added in v0.4.1

func WithPrefix(prefix string) Option

func WithQueue added in v0.4.1

func WithQueue(queue string) Option

func WithReClaimDelay added in v0.4.1

func WithReClaimDelay(reClaimDelay time.Duration) Option

func WithRedisVersion added in v0.4.1

func WithRedisVersion(version string) Option

type PendingMesssage

type PendingMesssage struct {
	ID         string
	Idle       time.Duration
	RetryCount int64
}

type RedisStreamMessageQueue

type RedisStreamMessageQueue struct {
	// contains filtered or unexported fields
}

RedisStreamMessageQueue represents a message queue implemented using Redis streams. Messages require heartbeats so that other consumers do not reclaim them while they are still being processed, so remember to call HeartBeat for messages obtained through Receive. The queue supports pending message processing, metrics collection, and automatic message acknowledgment and deletion after consumption.
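The heartbeat requirement can be illustrated with a minimal sketch that sends a heartbeat on a ticker while a message is being processed. It uses only the standard library. The heartBeater interface, processWithHeartBeat, fakeQueue, and the 5 ms interval are hypothetical names and values chosen for illustration; only the HeartBeat method signature is taken from this page.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// heartBeater is a hypothetical interface; its single method mirrors the
// HeartBeat signature documented for RedisStreamMessageQueue.
type heartBeater interface {
	HeartBeat(ctx context.Context, group, consumerName, messageID string) error
}

// processWithHeartBeat runs work and sends a heartbeat every interval until
// work returns or ctx is cancelled.
func processWithHeartBeat(ctx context.Context, q heartBeater, group, consumerName, messageID string,
	interval time.Duration, work func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Heartbeat errors are ignored in this sketch; real code
				// would probably log them or abort the work.
				_ = q.HeartBeat(ctx, group, consumerName, messageID)
			}
		}
	}()

	err := work(ctx)
	cancel() // stop the heartbeat goroutine
	<-done   // wait until it has exited
	return err
}

// fakeQueue is a test double that only counts heartbeats.
type fakeQueue struct{ beats atomic.Int64 }

func (f *fakeQueue) HeartBeat(ctx context.Context, group, consumerName, messageID string) error {
	f.beats.Add(1)
	return nil
}

// demoHeartBeat simulates 60 ms of work with a 5 ms heartbeat interval.
func demoHeartBeat() (int64, error) {
	q := &fakeQueue{}
	err := processWithHeartBeat(context.Background(), q, "workers", "worker-1", "1-0",
		5*time.Millisecond, func(ctx context.Context) error {
			time.Sleep(60 * time.Millisecond)
			return nil
		})
	return q.beats.Load(), err
}

func main() {
	beats, err := demoHeartBeat()
	fmt.Println(beats > 0, err)
}
```

In real use the interval would presumably be chosen well below the configured reClaimDelay, so that a message in progress never looks idle to other consumers.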

func NewRedisStreamMessageQueue

func NewRedisStreamMessageQueue(redisClient *redis.Client, prefix, queue string, reClaimDelay time.Duration, deleteOnAck bool) *RedisStreamMessageQueue

func NewRedisStreamMessageQueueWithOptions added in v0.4.1

func NewRedisStreamMessageQueueWithOptions(redisClient *redis.Client, options ...Option) *RedisStreamMessageQueue

NewRedisStreamMessageQueueWithOptions creates a new RedisStreamMessageQueue with the provided Redis client and optional configuration. The function initializes the queue with the following default values:

  • Stream: "default:queue" (prefix: "default", queue: "queue")
  • DeleteOnAck: true (messages are automatically deleted after acknowledgment)
  • ReClaimDelay: 5 minutes (delay before reclaiming unacknowledged messages)
  • Metrics: A RedisRing is created with a default prefix of "default:metrics:queue"

Use the provided Option functions (e.g., WithPrefix, WithQueue, WithReClaimDelay, WithDeleteOnAck) to override these defaults and customize the behavior of the queue.

Example:

redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
queue := NewRedisStreamMessageQueueWithOptions(
    redisClient,
    WithPrefix("myapp"),
    WithQueue("myqueue"),
    WithReClaimDelay(10*time.Minute),
    WithDeleteOnAck(false), // Go requires this trailing comma before a closing parenthesis on its own line
)
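How the defaults and Option functions interact can be sketched with the functional options pattern. This is a standalone illustration, not the package's implementation: queueConfig, its fields, the lowercase option helpers, and streamKey are hypothetical. Joining prefix and queue with a colon is an assumption inferred from the documented default "default:queue".

```go
package main

import (
	"fmt"
	"time"
)

// queueConfig is a hypothetical stand-in for the unexported fields of
// RedisStreamMessageQueue; the real field names are not documented.
type queueConfig struct {
	prefix       string
	queue        string
	reClaimDelay time.Duration
	deleteOnAck  bool
}

// option has the same shape as the documented Option type.
type option func(*queueConfig)

func withPrefix(prefix string) option         { return func(c *queueConfig) { c.prefix = prefix } }
func withQueue(queue string) option           { return func(c *queueConfig) { c.queue = queue } }
func withReClaimDelay(d time.Duration) option { return func(c *queueConfig) { c.reClaimDelay = d } }
func withDeleteOnAck(b bool) option           { return func(c *queueConfig) { c.deleteOnAck = b } }

// newQueueConfig starts from the documented defaults and applies each
// option in order, so later options win over earlier ones.
func newQueueConfig(opts ...option) *queueConfig {
	c := &queueConfig{
		prefix:       "default",
		queue:        "queue",
		reClaimDelay: 5 * time.Minute,
		deleteOnAck:  true,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// streamKey joins prefix and queue with a colon (assumption, see lead-in).
func (c *queueConfig) streamKey() string { return c.prefix + ":" + c.queue }

func main() {
	fmt.Println(newQueueConfig().streamKey()) // default:queue

	custom := newQueueConfig(withPrefix("myapp"), withQueue("myqueue"), withDeleteOnAck(false))
	fmt.Println(custom.streamKey(), custom.reClaimDelay, custom.deleteOnAck) // myapp:myqueue 5m0s false
}
```

Options that are not passed keep their defaults, which is why reClaimDelay stays at 5 minutes in the second call.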

func (*RedisStreamMessageQueue) Ack

func (r *RedisStreamMessageQueue) Ack(ctx context.Context, group, messageId string) error

Ack acknowledges the processing of a specific message by its ID.

func (*RedisStreamMessageQueue) Add

Add adds a message to the queue and returns the message ID.

func (*RedisStreamMessageQueue) Consume

func (r *RedisStreamMessageQueue) Consume(ctx context.Context,
	batchSize int, blockDuration time.Duration,
	group, consumerName string,
	errorChannel chan error,
	consumer contracts.StreamConsumeFunc)

Consume starts consuming messages from the Redis stream using the provided consumer function. This method manages incoming and pending messages, ensuring their processing or reclamation.
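Because Consume takes an errorChannel, a caller will most likely want to drain that channel concurrently so that error reporting never stalls. The sketch below shows one way to do that with a stand-in function. fakeConsume, consumeAndCollect, and demoConsume are hypothetical, and the blocking and channel-closing behaviour of the real Consume is not documented on this page, so treat those as assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// fakeConsume is a hypothetical stand-in for Consume: it reports n failures
// on errorChannel and then returns.
func fakeConsume(ctx context.Context, errorChannel chan error, n int) {
	for i := 0; i < n; i++ {
		select {
		case errorChannel <- errors.New("processing failed"):
		case <-ctx.Done():
			return
		}
	}
}

// consumeAndCollect runs the consumer while a separate goroutine drains
// errorChannel, so sends on the unbuffered channel never block for long.
func consumeAndCollect(ctx context.Context, n int) []error {
	errorChannel := make(chan error)

	var (
		collected []error
		wg        sync.WaitGroup
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range errorChannel {
			collected = append(collected, err)
		}
	}()

	fakeConsume(ctx, errorChannel, n)

	// Closing here is safe only because the stand-in no longer sends after
	// it returns; that is an assumption about the real consumer.
	close(errorChannel)
	wg.Wait()
	return collected
}

// demoConsume returns how many errors were collected from three failures.
func demoConsume() int {
	return len(consumeAndCollect(context.Background(), 3))
}

func main() {
	fmt.Println(demoConsume()) // 3
}
```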

func (*RedisStreamMessageQueue) Delete

func (r *RedisStreamMessageQueue) Delete(ctx context.Context, id string) error

Delete removes a specific message from the Redis stream by its ID.

func (*RedisStreamMessageQueue) GetMetrics

func (r *RedisStreamMessageQueue) GetMetrics(ctx context.Context) (map[string]interface{}, error)

GetMetrics retrieves queue-related metrics, including queue size and message claim delays.
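Since GetMetrics returns a map[string]interface{}, callers need a type assertion or type switch to read numeric values. The helper below is a minimal sketch of that. metricAsInt64 is a hypothetical helper, and the key "queue_size" is an invented sample, because the actual key names are not listed on this page.

```go
package main

import "fmt"

// metricAsInt64 reads a numeric metric from the map regardless of which
// concrete number type it holds. It reports false for missing keys and
// non-numeric values.
func metricAsInt64(metrics map[string]interface{}, key string) (int64, bool) {
	switch v := metrics[key].(type) {
	case int64:
		return v, true
	case int:
		return int64(v), true
	case float64:
		return int64(v), true
	default:
		return 0, false
	}
}

// demoMetrics uses a hypothetical sample map; the keys are not the
// documented output of GetMetrics.
func demoMetrics() (int64, bool, bool) {
	metrics := map[string]interface{}{"queue_size": int64(42), "note": "text"}
	size, ok := metricAsInt64(metrics, "queue_size")
	_, okNote := metricAsInt64(metrics, "note")
	return size, ok, okNote
}

func main() {
	fmt.Println(demoMetrics()) // 42 true false
}
```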

func (*RedisStreamMessageQueue) HeartBeat added in v0.4.0

func (r *RedisStreamMessageQueue) HeartBeat(ctx context.Context, group, consumerName, messageID string) error

func (*RedisStreamMessageQueue) Len

func (r *RedisStreamMessageQueue) Len() (int64, error)

Len returns the total number of messages in the Redis stream.

func (*RedisStreamMessageQueue) Purge

Purge deletes all messages from the Redis stream.

func (*RedisStreamMessageQueue) Receive

func (r *RedisStreamMessageQueue) Receive(ctx context.Context,
	blockDuration time.Duration, batchSize int, group, consumerName string) ([]contracts.Message, error)

Receive retrieves messages from the Redis stream. It first checks for pending messages and reclaims those that have been idle longer than reClaimDelay. If no pending messages are available, it fetches new messages from the stream, so pending messages always take priority over new ones. Messages returned by this method may require acknowledgment or deletion, depending on the queue's configuration.
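The reclaim rule described above can be sketched as a simple filter over pending entries. This is an illustration under assumptions, not the package's code: pendingMessage mirrors the fields of the documented PendingMesssage struct, reclaimableIDs is a hypothetical helper, and the strict "greater than" comparison is one reading of "idle beyond the reClaimDelay".

```go
package main

import (
	"fmt"
	"time"
)

// pendingMessage mirrors the fields of the documented PendingMesssage struct.
type pendingMessage struct {
	ID         string
	Idle       time.Duration
	RetryCount int64
}

// reclaimableIDs returns the IDs of pending messages that have been idle
// strictly longer than reClaimDelay.
func reclaimableIDs(pending []pendingMessage, reClaimDelay time.Duration) []string {
	var ids []string
	for _, p := range pending {
		if p.Idle > reClaimDelay {
			ids = append(ids, p.ID)
		}
	}
	return ids
}

// demoReclaim applies a 5 minute delay to three sample entries.
func demoReclaim() []string {
	pending := []pendingMessage{
		{ID: "1-0", Idle: 7 * time.Minute, RetryCount: 2},
		{ID: "2-0", Idle: 30 * time.Second, RetryCount: 1},
		{ID: "3-0", Idle: 5 * time.Minute, RetryCount: 1}, // exactly at the delay: not reclaimed
	}
	return reclaimableIDs(pending, 5*time.Minute)
}

func main() {
	fmt.Println(demoReclaim()) // [1-0]
}
```

A consumer that keeps sending heartbeats keeps a message's idle time low, which is what prevents it from appearing in a list like this.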

func (*RedisStreamMessageQueue) RequireHeartHeartBeat added in v0.4.0

func (r *RedisStreamMessageQueue) RequireHeartHeartBeat() bool
