Documentation
¶
Index ¶
- type Option
- type PendingMesssage
- type RedisStreamMessageQueue
- func (r *RedisStreamMessageQueue) Ack(ctx context.Context, group, messageId string) error
- func (r *RedisStreamMessageQueue) Add(ctx context.Context, message *contracts.Message) error
- func (r *RedisStreamMessageQueue) Consume(ctx context.Context, batchSize int, blockDuration time.Duration, ...)
- func (r *RedisStreamMessageQueue) Delete(ctx context.Context, id string) error
- func (r *RedisStreamMessageQueue) GetMetrics(ctx context.Context) (map[string]interface{}, error)
- func (r *RedisStreamMessageQueue) HeartBeat(ctx context.Context, group, consumerName, messageID string) error
- func (r *RedisStreamMessageQueue) Len() (int64, error)
- func (r *RedisStreamMessageQueue) Purge(ctx context.Context) error
- func (r *RedisStreamMessageQueue) Receive(ctx context.Context, blockDuration time.Duration, batchSize int, ...) ([]contracts.Message, error)
- func (r *RedisStreamMessageQueue) RequireHeartHeartBeat() bool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶ added in v0.4.1
type Option func(*RedisStreamMessageQueue)
func WithDeleteOnAck ¶ added in v0.4.1
func WithPrefix ¶ added in v0.4.1
func WithReClaimDelay ¶ added in v0.4.1
func WithRedisVersion ¶ added in v0.4.1
type PendingMesssage ¶
type RedisStreamMessageQueue ¶
type RedisStreamMessageQueue struct {
// contains filtered or unexported fields
}
RedisStreamMessageQueue represents a message queue implemented using Redis streams. Messages require heartbeats to prevent reclaiming by other consumers in Redis Streams. (Do not forget to call HeartBeat function, while using Fetch) It supports features like pending message processing, metrics collection, and automatic message acknowledgment and deletion after consumption.
func NewRedisStreamMessageQueueWithOptions ¶ added in v0.4.1
func NewRedisStreamMessageQueueWithOptions(redisClient *redis.Client, options ...Option) *RedisStreamMessageQueue
NewRedisStreamMessageQueueWithOptions creates a new RedisStreamMessageQueue with the provided Redis client and optional configuration. The function initializes the queue with the following default values:
- Stream: "default:queue" (prefix: "default", queue: "queue")
- DeleteOnAck: true (messages are automatically deleted after acknowledgment)
- ReClaimDelay: 5 minutes (delay before reclaiming unacknowledged messages)
- Metrics: A RedisRing is created with a default prefix of "default:metrics:queue"
Use the provided Option functions (e.g., WithPrefix, WithQueue, WithReClaimDelay, WithDeleteOnAck, WithFetchMethod) to override these defaults and customize the behavior of the queue.
Example:
redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
queue := NewRedisStreamMessageQueueWithOptions(
redisClient,
WithPrefix("myapp"),
WithQueue("myqueue"),
WithReClaimDelay(10*time.Minute),
WithDeleteOnAck(false)
)
func (*RedisStreamMessageQueue) Ack ¶
func (r *RedisStreamMessageQueue) Ack(ctx context.Context, group, messageId string) error
Ack acknowledges the processing of a specific message by its ID.
func (*RedisStreamMessageQueue) Consume ¶
func (r *RedisStreamMessageQueue) Consume(ctx context.Context, batchSize int, blockDuration time.Duration, group, consumerName string, errorChannel chan error, consumer contracts.StreamConsumeFunc)
Consume starts consuming messages from the Redis stream using the provided consumer function. This method manages incoming and pending messages, ensuring their processing or reclamation.
func (*RedisStreamMessageQueue) Delete ¶
func (r *RedisStreamMessageQueue) Delete(ctx context.Context, id string) error
Delete removes a specific message from the Redis stream by its ID.
func (*RedisStreamMessageQueue) GetMetrics ¶
func (r *RedisStreamMessageQueue) GetMetrics(ctx context.Context) (map[string]interface{}, error)
GetMetrics retrieves queue-related metrics, including queue size and message claim delays.
func (*RedisStreamMessageQueue) HeartBeat ¶ added in v0.4.0
func (r *RedisStreamMessageQueue) HeartBeat(ctx context.Context, group, consumerName, messageID string) error
func (*RedisStreamMessageQueue) Len ¶
func (r *RedisStreamMessageQueue) Len() (int64, error)
Len returns the total number of messages in the Redis stream.
func (*RedisStreamMessageQueue) Purge ¶
func (r *RedisStreamMessageQueue) Purge(ctx context.Context) error
Purge deletes all messages from the Redis stream.
func (*RedisStreamMessageQueue) Receive ¶
func (r *RedisStreamMessageQueue) Receive(ctx context.Context, blockDuration time.Duration, batchSize int, group, consumerName string) ([]contracts.Message, error)
Fetch retrieves messages from the Redis stream. It first checks for pending messages and reclaims them if they are idle beyond the reClaimDelay. If no pending messages are available, it fetches new messages from the stream. This method ensures efficient processing of messages by prioritizing pending ones before consuming new messages. Messages fetched using this function may require acknowledgment or deletion based on the queue's configuration.
func (*RedisStreamMessageQueue) RequireHeartHeartBeat ¶ added in v0.4.0
func (r *RedisStreamMessageQueue) RequireHeartHeartBeat() bool