redisSyncFanoutQueue

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2022 License: MIT Imports: 13 Imported by: 0

README

redis-sync-fanout-queue-go

What is it?

Priority queue with synchronous fanout delivery for each room based on Redis.

This queue is special by several key properties:

  1. It delivers each message sent to a room to all subscribers of that room.
  2. It does not deliver the next message until all subscribers of the room ACKnowledge the last message.
  3. It is based entirely on Redis primitives.
  4. Out-of-band messages are also available. They are immediately delivered to all subscribers with no regard to ACKs.
  5. Subscribers can be Sync = true (blocking, thus requiring an ACK) or Sync = false (non-blocking, thus requiring an ACK).
  6. Supports sharded Redis clusters out-of-the-box

This allows building distributed systems where edges process messages in a coordinated lock-step with each other.

Queue guarantees

Low latency delivery

Delivery is based on Redis PUBSUB. It is possible to reach very low latencies.

Synchronized Fanout

All synchronous clients must ACKnowledge processing of a message before any other client can see the next message.

At most once delivery

There are no message redelivery attempts built in. Either you get it or you do not.

High performance, low memory footprint

Most of the heavy lifting is done in Redis.

Infrastructure

The library leverages ioredis for communication with the Redis server.

Usage

Simple use example
package main

import (
	"github.com/go-redis/redis/v8"
	"context"
	"time"
	"github.com/zavitax/redis-sync-fanout-queue-go"
	"fmt"
)

var testMessageContent = "test message content"
var testRoomId = "GO-ROOM-TEST"

var redisOptions = &redis.Options{
	Addr: "127.0.0.1:6379",
	Password: "",
	DB: 0,
};

func createQueueOptions (
	testId string,
) (*redisSyncFanoutQueue.Options) {
	result := &redisSyncFanoutQueue.Options{
		RedisOptions: redisOptions,
		ClientTimeout: time.Second * 15,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", testId),
		Sync: true,
	}

	return result
}

func createQueueClient (options *redisSyncFanoutQueue.Options) (redisSyncFanoutQueue.RedisQueueClient, error) {
	return redisSyncFanoutQueue.NewClient(context.TODO(), options);
}

func Main () {
	var minReceivedMsgCount = int64(1)
	var receivedMsgCount int64

	options := createQueueOptions(
		"TestSendReceive",
	)

	client, err := createQueueClient(options)

	if (err != nil) { return }

	defer client.Close()

	err = client.Subscribe(context.TODO(), testRoomId, func (ctx context.Context, msg *redisSyncFanoutQueue.Message) (error) {
		fmt.Printf("Received: %v", msg.Data)

		msg.Ack(ctx)

		return nil
	})

	if (err != nil) { return }

	client.Send(context.TODO(), testRoomId, testMessageContent, 1);

	for i := 0; i < 10 && receivedMsgCount < minReceivedMsgCount; i++ {
		time.Sleep(time.Second * 1)
	}

	err = client.Unsubscribe(context.TODO(), testRoomId)

	if (err != nil) { return }
}
Sharded use example

With a simple change (replacing NewClient with NewShardedClient and passing an instance of RedisQueueShardsProvider created by a call to NewRedisClusterShardProvider) you are fully set-up to utilize a Redis cluster.

This is based on appending a ::{slot-SHARD_ID} suffix to Options.RedisKeyPrefix, telling Redis "hey, assign a slot based on the "{slot-SHARD_ID}" string".

Shard IDs are calculated on a per-room basis.

package main

import (
	"github.com/go-redis/redis/v8"
	"context"
	"time"
	"github.com/zavitax/redis-sync-fanout-queue-go"
	"fmt"
)

var testMessageContent = "test message content"
var testRoomId = "GO-ROOM-TEST"

var redisOptions = &redis.Options{
	Addr: "127.0.0.1:6379",
	Password: "",
	DB: 0,
};

func createQueueOptions (
	testId string,
) (*redisSyncFanoutQueue.Options) {
	result := &redisSyncFanoutQueue.Options{
		RedisOptions: redisOptions,
		ClientTimeout: time.Second * 15,
		RedisKeyPrefix: fmt.Sprintf("test-redis-sync-fanout-queue-sharded::%v", testId),
		Sync: true,
	}

	return result
}

func createQueueClient (options *redisSyncFanoutQueue.Options) (redisSyncFanoutQueue.RedisQueueClient, error) {
	if shardProvider, err := redisSyncFanoutQueue.NewRedisClusterShardProvider(
		context.TODO(),
		options,
		100,
	); err != nil {
		return nil, err
	} else {
		return redisSyncFanoutQueue.NewShardedClient(context.TODO(), shardProvider)
	}
}

func Main () {
	var minReceivedMsgCount = int64(1)
	var receivedMsgCount int64

	options := createQueueOptions(
		"TestSendReceive",
	)

	client, err := createQueueClient(options)

	if (err != nil) { return }

	defer client.Close()

	err = client.Subscribe(context.TODO(), testRoomId, func (ctx context.Context, msg *redisSyncFanoutQueue.Message) (error) {
		fmt.Printf("Received: %v", msg.Data)

		msg.Ack(ctx)

		return nil
	})

	if (err != nil) { return }

	client.Send(context.TODO(), testRoomId, testMessageContent, 1);

	for i := 0; i < 10 && receivedMsgCount < minReceivedMsgCount; i++ {
		time.Sleep(time.Second * 1)
	}

	err = client.Unsubscribe(context.TODO(), testRoomId)

	if (err != nil) { return }
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckMessageFunc

type AckMessageFunc func(ctx context.Context) error

type GetMetricsOptions

type GetMetricsOptions struct {
	TopRoomsLimit int
}

type HandleMessageFunc

type HandleMessageFunc func(ctx context.Context, msg *Message) error

type HandleRoomEjectedFunc

type HandleRoomEjectedFunc func(ctx context.Context, room *string) error

type Message

type Message struct {
	Data           *interface{}
	Ack            AckMessageFunc
	MessageContext struct {
		Timestamp time.Time
		Producer  string
		Sequence  int64
		Latency   time.Duration
		Room      string
	}
}

type Metrics

type Metrics struct {
	KnownRoomsCount int64

	SubscribedRoomsCount  int
	ReceivedMessagesCount int64
	InvalidMessagesCount  int64

	MinLatency time.Duration
	MaxLatency time.Duration
	AvgLatency time.Duration

	TopRooms                             []*RoomMetrics
	TopRoomsPendingMessagesBacklogLength int64
}

type Options

type Options struct {
	RedisOptions   *redis.Options
	ClientTimeout  time.Duration
	Sync           bool
	RedisKeyPrefix string
	// HandleMessage HandleMessageFunc
	HandleRoomEjected HandleRoomEjectedFunc
}

func (*Options) Validate

func (o *Options) Validate() error

type RedisQueueClient

type RedisQueueClient interface {
	Subscribe(ctx context.Context, room string, msgHandlerFunc HandleMessageFunc) error
	Unsubscribe(ctx context.Context, room string) error
	Close() error
	GetMetrics(ctx context.Context, options *GetMetricsOptions) (*Metrics, error)
	Send(ctx context.Context, room string, data interface{}, priority int) error
	SendOutOfBand(ctx context.Context, room string, data interface{}) error
	Peek(ctx context.Context, room string, offset int, limit int) ([]*Message, error)
}

func NewClient

func NewClient(ctx context.Context, options *Options) (RedisQueueClient, error)

func NewShardedClient added in v0.2.0

func NewShardedClient(ctx context.Context, shardProvider RedisQueueShardsProvider) (RedisQueueClient, error)

type RedisQueueShardsProvider added in v0.2.0

type RedisQueueShardsProvider interface {
	GetTotalShardsCount(ctx context.Context) (uint32, error)
	GetRoomShardId(ctx context.Context, room string) (uint32, error)
	GetShardOptions(ctx context.Context, shardId uint32) (*Options, error)
}

func NewRedisClusterShardProvider added in v0.2.0

func NewRedisClusterShardProvider(ctx context.Context, options *Options, totalShardsCount uint32) (RedisQueueShardsProvider, error)

type RoomMetrics

type RoomMetrics struct {
	Room                         string
	PendingMessagesBacklogLength int64
}

type ShardingOptions added in v0.2.0

type ShardingOptions struct {
	TotalShardsCount int
}

func (*ShardingOptions) Validate added in v0.2.0

func (o *ShardingOptions) Validate() error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL