redisOrderedQueue

package module
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2022 License: MIT Imports: 10 Imported by: 0

README

redis-ordered-queue-go

Redis-based priority queue with message-group partitioning and an equal-attention guarantee for each message group

What is it?

Redis-based ordered queue with support for message priority.

Quality metrics

Quality Gate Status

Code Smells

Technical Debt

Maintainability Rating

Security Rating

Bugs

Vulnerabilities

Reliability Rating

Queue guarantees

Low latency delivery

Depending on traffic and the number of consumers, it is possible to reach very low delivery latency. I observed 2-15 millisecond latencies with thousands of consumers, message groups and messages.

Equal attention to all message groups

Message groups are cycled through, each consumer looking at the next available message group in the buffer.

No conflicts between message groups

Message groups are locked for processing before a consumer can process messages associated with that message group. This ensures that when a consumer is processing messages in a message group, no other consumers can see messages in the same message group.

The drawback is that if you only have one message group in your set-up, only one consumer can be active at each moment.

At least once delivery

Redelivery of a message is attempted repeatedly until the consumer acknowledges it.

High performance, low memory footprint

The number of consumers you can run on each worker node is limited only by the number of connections your Redis server allows.

Infrastructure

The library leverages go-redis (github.com/go-redis/redis/v8) for communication with the Redis server.

Usage

package main

import (
	"github.com/go-redis/redis/v8"
	"context"
	"testing"
	"time"
	"fmt"
	"sync/atomic"
	"github.com/zavitax/redis-ordered-queue-go"
)

// main is a runnable usage example. It configures a queue client against a
// local Redis server, enqueues 1000 messages (each in its own message group),
// runs the consumers for five minutes while printing metrics once per second,
// and then shuts the consumers, the metrics loop and the client down.
//
// Every error returned by the client is reported on standard output rather
// than being ignored.
func main() {
	ctx := context.TODO()

	redisOptions := &redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	}

	// Updated only through sync/atomic: with ConsumerCount > 1 the handler
	// below is presumably invoked from several goroutines at once.
	msgCount := int64(0)

	opt := &redisOrderedQueue.Options{
		RedisOptions:           redisOptions,
		BatchSize:              10,
		GroupVisibilityTimeout: time.Second * 60,
		PollingTimeout:         time.Second * 10,
		ConsumerCount:          5000,
		RedisKeyPrefix:         "{redis-ordered-queue}",
		HandleMessage: func(ctx context.Context, data *interface{}, meta *redisOrderedQueue.MessageMetadata) error {
			numMsgs := atomic.AddInt64(&msgCount, 1)

			// Report progress once per 1000 handled messages.
			if numMsgs%1000 == 0 {
				fmt.Printf("handleMessage: %v messages processed\n", numMsgs)
			}

			// Simulate a slow handler.
			time.Sleep(time.Millisecond * 1500)

			return nil
		},
		HandleInvalidMessage: func(ctx context.Context, data *string) error {
			fmt.Printf("handleInvalidMessage: data %v\n", *data)

			return nil
		},
	}

	client, err := redisOrderedQueue.NewClient(ctx, opt)
	if err != nil {
		// There is no *testing.T in main; report the failure and bail out.
		fmt.Printf("NewClient: %v\n", err)
		return
	}

	// stop asks the metrics goroutine to exit; stopped is closed once it has,
	// so the client is never closed while a GetMetrics call is in flight.
	stop := make(chan struct{})
	stopped := make(chan struct{})

	go func() {
		defer close(stopped)

		metricsOpt := &redisOrderedQueue.GetMetricsOptions{
			TopMessageGroupsLimit: 10,
		}

		for {
			met, err := client.GetMetrics(ctx, metricsOpt)
			if err != nil {
				// met must not be dereferenced when the call failed.
				fmt.Printf("GetMetrics: %v\n", err)
			} else {
				fmt.Printf("Metrics: g: %v, msgs: %v, conc: %v, topbklg: %v, min: %v, avg: %v, max: %v, %v\n",
					met.TrackedMessageGroups,
					met.VisibleMessages,
					met.WorkingConsumers,
					met.TopMessageGroupsMessageBacklogLength,
					met.MinLatency,
					met.AvgLatency,
					met.MaxLatency,
					met)
			}

			select {
			case <-stop:
				return
			case <-time.After(time.Second * 1):
			}
		}
	}()

	for i := 0; i < 1000; i++ {
		fmt.Printf(".")

		if err := client.Send(ctx, "test message", 1, fmt.Sprintf("GO-GROUP-%v", i)); err != nil {
			fmt.Printf("Send: %v\n", err)
		}
	}

	if err := client.StartConsumers(ctx); err != nil {
		fmt.Printf("StartConsumers: %v\n", err)
	} else {
		time.Sleep(time.Second * 60 * 5)

		if err := client.StopConsumers(ctx); err != nil {
			fmt.Printf("StopConsumers: %v\n", err)
		}
	}

	// Stop the metrics loop and wait for it before closing the client.
	close(stop)
	<-stopped

	if err := client.Close(); err != nil {
		fmt.Printf("Close: %v\n", err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GetMetricsOptions

// GetMetricsOptions is the options argument accepted by
// RedisQueueClient.GetMetrics.
type GetMetricsOptions struct {
	// NOTE(review): presumably caps the number of entries returned in
	// Metrics.TopMessageGroups — confirm against the implementation.
	TopMessageGroupsLimit int
}

type MessageGroupMetrics

// MessageGroupMetrics is the element type of Metrics.TopMessageGroups; it
// pairs a message group name with a backlog figure.
//
// NOTE(review): Group is presumably the groupId passed to
// RedisQueueClient.Send and Backlog the number of messages still pending for
// that group — confirm against the implementation.
type MessageGroupMetrics struct {
	Group   string
	Backlog int64
}

type MessageMetadata

// MessageMetadata is the meta argument handed to Options.HandleMessage
// together with the message data.
type MessageMetadata struct {
	// MessageContext groups the per-message details.
	//
	// NOTE(review): Timestamp is presumably the time the message was sent and
	// Latency the delay between sending and delivery; the meaning of Producer
	// and Sequence is not visible here — confirm against the implementation.
	MessageContext struct {
		Timestamp time.Time
		Producer  int64
		Sequence  int64
		Latency   time.Duration
		// Lock has an unexported type, so code outside this package cannot
		// name that type.
		Lock      *lockHandle
	}
}

type Metrics

// Metrics is the result type of RedisQueueClient.GetMetrics.
//
// The counters are int64 values, the latency statistics are time.Duration
// values, and TopMessageGroups holds one *MessageGroupMetrics per reported
// group.
//
// NOTE(review): the exact meaning of each counter (for example buffered versus
// tracked message groups) is not visible in this listing — confirm against
// the implementation before relying on it.
type Metrics struct {
	BufferedMessageGroups                int64
	TrackedMessageGroups                 int64
	WorkingConsumers                     int64
	VisibleMessages                      int64
	InvalidMessages                      int64
	MinLatency                           time.Duration
	MaxLatency                           time.Duration
	AvgLatency                           time.Duration
	TopMessageGroups                     []*MessageGroupMetrics
	TopMessageGroupsMessageBacklogLength int64
}

type Options

// Options is the configuration argument of NewClient.
//
// NOTE(review): the field descriptions below are inferred from the field
// names and the README; confirm them against the implementation.
type Options struct {
	// RedisOptions is the go-redis connection configuration.
	RedisOptions           *redis.Options
	// BatchSize: presumably the number of messages fetched per poll — TODO confirm.
	BatchSize              int
	// GroupVisibilityTimeout: presumably how long a message group stays locked
	// for one consumer — TODO confirm.
	GroupVisibilityTimeout time.Duration
	// PollingTimeout: presumably how long a consumer waits for new work — TODO confirm.
	PollingTimeout         time.Duration
	// ConsumerCount: presumably the number of concurrent consumers started by
	// StartConsumers — TODO confirm.
	ConsumerCount          int
	// RedisKeyPrefix: presumably prepended to every Redis key the queue uses — TODO confirm.
	RedisKeyPrefix         string
	// HandleMessage is the callback that receives a message's data and its
	// MessageMetadata and returns an error.
	HandleMessage          func(ctx context.Context, data *interface{}, meta *MessageMetadata) error
	// HandleInvalidMessage is the callback that receives a pointer to a raw
	// string and returns an error; presumably used for payloads that could not
	// be decoded — TODO confirm.
	HandleInvalidMessage   func(ctx context.Context, data *string) error
}

func (*Options) Validate

func (o *Options) Validate() error

type RedisQueueClient added in v1.1.0

// RedisQueueClient is the interface returned by NewClient. It covers consumer
// lifecycle control, sending messages, reading metrics and closing the client.
type RedisQueueClient interface {
	// StartConsumers starts message consumption and returns an error on failure.
	StartConsumers(ctx context.Context) error
	// StopConsumers stops message consumption and returns an error on failure.
	StopConsumers(ctx context.Context) error
	// Close releases the client; presumably it closes the underlying Redis
	// connections — TODO confirm.
	Close() error
	// GetMetrics returns a *Metrics value for the given options, or an error.
	GetMetrics(ctx context.Context, options *GetMetricsOptions) (*Metrics, error)
	// Send enqueues data with the given priority under the message group groupId.
	// NOTE(review): the ordering implied by priority (higher first or lower
	// first) is not visible here — confirm.
	Send(ctx context.Context, data interface{}, priority int, groupId string) error
}

func NewClient

func NewClient(ctx context.Context, options *Options) (RedisQueueClient, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL