redisSyncFanoutQueue

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2022 License: MIT Imports: 11 Imported by: 0

README

redis-sync-fanout-queue-go

What is it?

Priority queue with synchronous fanout delivery for each room based on Redis.

This queue is special because of the following properties:

  1. It delivers each message sent to a room to all subscribers of that room.
  2. It does not deliver the next message until all subscribers of the room ACKnowledge the last message.
  3. It is based entirely on Redis primitives.
  4. Out-of-band messages are also available. They are immediately delivered to all subscribers with no regard to ACKs.
  5. Subscribers can be Sync = true (blocking, thus requiring an ACK) or Sync = false (non-blocking, thus not requiring an ACK).

This allows building distributed systems where edges process messages in a coordinated lock-step with each other.

Queue guarantees

Low latency delivery

Delivery is based on Redis PUBSUB. It is possible to reach very low latencies.

Synchronized Fanout

All synchronous clients must ACKnowledge processing of a message before any other client can see the next message.

At most once delivery

There are no message redelivery attempts built in. Either you get it or you do not.

High performance, low memory footprint

Most of the heavy lifting is done in Redis.

Infrastructure

The library leverages go-redis (github.com/go-redis/redis/v8) for communication with the Redis server.

Usage

package main

import (
	"github.com/go-redis/redis/v8"
	"context"
	"time"
	"github.com/zavitax/redis-sync-fanout-queue-go"
	"fmt"
)

// Fixtures shared by the usage example below.
var (
	// testMessageContent is the payload sent through the queue.
	testMessageContent = "test message content"

	// testRoomId names the room the example subscribes to and sends into.
	testRoomId = "GO-ROOM-TEST"

	// redisOptions targets a Redis server on 127.0.0.1:6379, with an
	// empty password and database 0.
	redisOptions = &redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
		DB:       0,
	}
)

// createQueueOptions returns queue options for the example: the shared
// redisOptions, a 15 second client timeout, Sync enabled, and a Redis key
// prefix derived from testId so that each test id gets its own prefix.
func createQueueOptions(testId string) *redisSyncFanoutQueue.Options {
	keyPrefix := fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", testId)

	return &redisSyncFanoutQueue.Options{
		RedisOptions:   redisOptions,
		ClientTimeout:  15 * time.Second,
		RedisKeyPrefix: keyPrefix,
		Sync:           true,
	}
}

// createQueueClient constructs a queue client from options by calling
// redisSyncFanoutQueue.NewClient with a placeholder (TODO) context, and
// returns that call's client and error unchanged.
func createQueueClient(options *redisSyncFanoutQueue.Options) (redisSyncFanoutQueue.RedisQueueClient, error) {
	ctx := context.TODO()

	return redisSyncFanoutQueue.NewClient(ctx, options)
}

// Main demonstrates a full send/receive round trip: it creates a client,
// subscribes to testRoomId, sends one message, waits up to ten seconds for
// that message to arrive, then unsubscribes. The client is closed on return.
//
// Main has no error return, so every failure simply ends the example early.
func Main() {
	options := createQueueOptions(
		"TestSendReceive",
	)

	client, err := createQueueClient(options)
	if err != nil {
		return
	}

	defer client.Close()

	// Buffered by one so the handler can signal the first delivery without
	// ever blocking, even if nobody is waiting any more.
	received := make(chan struct{}, 1)

	err = client.Subscribe(context.TODO(), testRoomId, func(ctx context.Context, msg *redisSyncFanoutQueue.Message) error {
		// Data is a pointer to the payload; dereference it so the payload
		// is printed rather than the pointer's address.
		var payload interface{}
		if msg.Data != nil {
			payload = *msg.Data
		}

		fmt.Printf("Received: %v", payload)

		// Non-blocking signal: only the first delivery needs to wake Main.
		select {
		case received <- struct{}{}:
		default:
		}

		// Surface an acknowledgement failure to the caller of the handler
		// instead of silently dropping it.
		return msg.Ack(ctx)
	})
	if err != nil {
		return
	}

	// Only wait for a delivery when the send itself succeeded.
	if err = client.Send(context.TODO(), testRoomId, testMessageContent, 1); err == nil {
		select {
		case <-received:
		case <-time.After(time.Second * 10):
		}
	}

	// Unsubscribe whether or not the send succeeded. Its error cannot be
	// reported from here (Main returns nothing), so it is deliberately dropped.
	_ = client.Unsubscribe(context.TODO(), testRoomId)
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckMessageFunc

// AckMessageFunc is the type of Message.Ack. The usage example calls it as
// msg.Ack(ctx) from inside the message handler to acknowledge the message.
type AckMessageFunc func(ctx context.Context) error

type GetMetricsOptions

// GetMetricsOptions is the options argument of RedisQueueClient.GetMetrics.
type GetMetricsOptions struct {
	// NOTE(review): presumably caps the number of entries returned in
	// Metrics.TopRooms — confirm against the implementation.
	TopRoomsLimit int
}

type HandleMessageFunc

// HandleMessageFunc is the per-message callback passed to
// RedisQueueClient.Subscribe. It receives a context and the delivered
// Message. The usage example acknowledges the message and returns nil; what
// the client does with a non-nil error is not shown here.
type HandleMessageFunc func(ctx context.Context, msg *Message) error

type HandleRoomEjectedFunc

// HandleRoomEjectedFunc is the callback type of Options.HandleRoomEjected.
// It receives a context and a pointer to a room name.
// NOTE(review): the name suggests it fires when this client is ejected from
// a room; the trigger condition is not visible here — confirm.
type HandleRoomEjectedFunc func(ctx context.Context, room *string) error

type Message

// Message is what a HandleMessageFunc receives for each delivery, and what
// RedisQueueClient.Peek returns.
type Message struct {
	// Data points to the message payload. Dereference it to read the
	// payload; formatting the pointer itself prints an address.
	Data *interface{}
	// Ack acknowledges this message; the usage example calls msg.Ack(ctx)
	// from the handler.
	Ack AckMessageFunc
	// MessageContext carries delivery metadata.
	// NOTE(review): field meanings below are inferred from their names and
	// types only — confirm against the implementation.
	MessageContext struct {
		Timestamp time.Time     // presumably when the message was produced
		Producer  string        // presumably an identifier of the sending client
		Sequence  int64         // presumably a per-producer or per-room sequence number
		Latency   time.Duration // presumably the send-to-receive delay
	}
}

type Metrics

// Metrics is the result type of RedisQueueClient.GetMetrics.
// NOTE(review): the field descriptions below are inferred from names and
// types only; how each value is computed is not visible here — confirm.
type Metrics struct {
	// Presumably the number of rooms known to the queue.
	KnownRoomsCount int64

	// Presumably per-client counters: rooms this client is subscribed to,
	// messages it has received, and messages it could not process as valid.
	SubscribedRoomsCount  int
	ReceivedMessagesCount int64
	InvalidMessagesCount  int64

	// Presumably the minimum, maximum and average observed message latency.
	MinLatency time.Duration
	MaxLatency time.Duration
	AvgLatency time.Duration

	// TopRooms holds per-room metrics entries (see RoomMetrics).
	// Presumably limited by GetMetricsOptions.TopRoomsLimit, with
	// TopRoomsPendingMessagesBacklogLength aggregating their backlogs.
	TopRooms                             []*RoomMetrics
	TopRoomsPendingMessagesBacklogLength int64
}

type Options

// Options configures a client created with NewClient.
type Options struct {
	// RedisOptions is the go-redis connection configuration; the usage
	// example sets Addr, Password and DB.
	RedisOptions *redis.Options
	// ClientTimeout is set to 15 seconds in the usage example.
	// NOTE(review): exact timeout semantics are not visible here — confirm.
	ClientTimeout time.Duration
	// Sync selects a synchronous subscriber. Per the README, all
	// synchronous clients must acknowledge a message before any other
	// client can see the next one.
	Sync bool
	// RedisKeyPrefix is set to "{test-redis-sync-fanout-queue}::<id>" in the
	// usage example; presumably prepended to the Redis keys the client
	// uses — confirm.
	RedisKeyPrefix string
	// HandleMessage HandleMessageFunc
	// HandleRoomEjected is an optional-looking callback receiving a room
	// name; when it is invoked is not visible here — confirm.
	HandleRoomEjected HandleRoomEjectedFunc
}

func (*Options) Validate

func (o *Options) Validate() error

type RedisQueueClient

// RedisQueueClient is the client interface returned by NewClient.
type RedisQueueClient interface {
	// Subscribe registers msgHandlerFunc to receive messages sent to room.
	Subscribe(ctx context.Context, room string, msgHandlerFunc HandleMessageFunc) error
	// Unsubscribe ends this client's subscription to room.
	Unsubscribe(ctx context.Context, room string) error
	// Close releases the client; the usage example defers it right after
	// a successful NewClient.
	Close() error
	// NOTE(review): presumably a keep-alive signal related to
	// Options.ClientTimeout — not visible here, confirm.
	Pong(ctx context.Context) error
	// GetMetrics returns a Metrics snapshot shaped by options.
	GetMetrics(ctx context.Context, options *GetMetricsOptions) (*Metrics, error)
	// Send enqueues data for room with the given priority. Per the README,
	// the next message is not delivered until all subscribers of the room
	// acknowledge the last one.
	Send(ctx context.Context, room string, data interface{}, priority int) error
	// SendOutOfBand sends data to room out of band. Per the README, such
	// messages are delivered immediately to all subscribers with no regard
	// to ACKs.
	SendOutOfBand(ctx context.Context, room string, data interface{}) error
	// Peek returns messages for room selected by offset and limit.
	// NOTE(review): presumably reads pending messages without consuming
	// them — confirm.
	Peek(ctx context.Context, room string, offset int, limit int) ([]*Message, error)
}

func NewClient

func NewClient(ctx context.Context, options *Options) (RedisQueueClient, error)

type RoomMetrics

// RoomMetrics is the element type of Metrics.TopRooms: one entry per room.
type RoomMetrics struct {
	// Room is the room name.
	Room string
	// NOTE(review): presumably the number of messages queued for the room
	// and not yet delivered — confirm.
	PendingMessagesBacklogLength int64
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL