redisSyncFanoutQueue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2023 License: MIT Imports: 11 Imported by: 0

README

redis-sync-fanout-queue-go

What is it?

Priority queue with synchronous fanout delivery for each room based on Redis.

This queue is special by several key properties:

  1. It delivers each message sent to a room to all subscribers of that room.
  2. It does not deliver the next message until all subscribers of the room ACKnowledge the last message.
  3. It is based entirely on Redis primitives.
  4. Out-of-band messages are also available. They are immediately delivered to all subscribers with no regard to ACKs.

This allows building distributed systems where edges process messages in a coordinated lock-step with each other.

Queue guarantees

Synchronized Fanout

All clients must ACKnowledge processing of a message before any other client can see the next message, unless the message is sent Out-of-band.

At most once delivery

There are no message redelivery attempts built in. Either you get it or you do not.

(*) In case communication breaks, client must stop sending Ping() requests (preferably send an Unsubscribe() request), obtain a new Client ID, and Subscribe() after communication has been re-established.

High performance, low memory footprint

Most of the heavy lifting is done in Redis.

Infrastructure

The library leverages ioredis for communication with the Redis server.

Usage

Publishing messages
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/go-redis/redis/v8"

	redisSyncFanoutQueue "github.com/zavitax/redis-sync-fanout-queue-go"
)

var testMessageContent = "test message content"
var testRoomId = "GO-ROOM-TEST"

var redisOptions = &redis.Options{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       0,
}

func createApiOptions() *redisSyncFanoutQueue.ApiOptions {
	result := &redisSyncFanoutQueue.ApiOptions{
		RedisOptions:   redisOptions,
		ClientTimeout:  time.Second * 15,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createWorkerOptions() *redisSyncFanoutQueue.WorkerOptions {
	result := &redisSyncFanoutQueue.WorkerOptions{
		RedisOptions:   redisOptions,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createApiClient(options *redisSyncFanoutQueue.ApiOptions) (redisSyncFanoutQueue.RedisQueueApiClient, error) {
	return redisSyncFanoutQueue.NewApiClient(context.TODO(), options)
}

func pub() {
	doneC := make(chan os.Signal, 1)
	signal.Notify(doneC)

	client, _ := createApiClient(createApiOptions())

	i := 0
	ticker := time.NewTicker(time.Second)
	done := false
	for !done {
		select {
		case <-doneC:
			done = true
		case <-ticker.C:
			i++

			client.Send(context.TODO(), "MSG_PRODUCER_ID", testRoomId, testMessageContent, 1)

			fmt.Printf("Send: %v\r", i)
		}
	}
	ticker.Stop()

	client.Close()
}

func main() {
	pub()
}
Subscribing and maintaining a client connection
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/go-redis/redis/v8"

	redisSyncFanoutQueue "github.com/zavitax/redis-sync-fanout-queue-go"
)

var testMessageContent = "test message content"
var testRoomId = "GO-ROOM-TEST"

var redisOptions = &redis.Options{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       0,
}

func createApiOptions() *redisSyncFanoutQueue.ApiOptions {
	result := &redisSyncFanoutQueue.ApiOptions{
		RedisOptions:   redisOptions,
		ClientTimeout:  time.Second * 15,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createWorkerOptions() *redisSyncFanoutQueue.WorkerOptions {
	result := &redisSyncFanoutQueue.WorkerOptions{
		RedisOptions:   redisOptions,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createApiClient(options *redisSyncFanoutQueue.ApiOptions) (redisSyncFanoutQueue.RedisQueueApiClient, error) {
	return redisSyncFanoutQueue.NewApiClient(context.TODO(), options)
}

func sub(clientsCount int) {
	doneC := make(chan os.Signal, 1)
	signal.Notify(doneC)

	client, _ := createApiClient(createApiOptions())

	clientIds := []string{}
	for i := 0; i < clientsCount; i++ {
		clientId, _ := client.CreateClientID(context.Background())
		client.Subscribe(context.TODO(), clientId, testRoomId)
		clientIds = append(clientIds, clientId)
	}

	ticker := time.NewTicker(time.Second)
	done := false
	for !done {
		select {
		case <-doneC:
			done = true
		case <-ticker.C:
			for _, clientId := range clientIds {
				// Must call periodically for each Client ID & Room ID comibnation to keep
				// Client ID alive.
				client.Ping(context.Background(), clientId, testRoomId)
			}

			metrics, _ := client.GetMetrics(context.TODO(), &redisSyncFanoutQueue.GetApiMetricsOptions{
				TopRoomsLimit: 10,
			})

			fmt.Printf("Metrics: %v\n", metrics)
		}
	}
	ticker.Stop()

	for _, clientId := range clientIds {
		client.Unsubscribe(context.Background(), clientId, testRoomId)
	}

	client.Close()
}

func main() {
	sub(1)
}
Delivering and ACKnowledging messages (worker)
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/go-redis/redis/v8"

	redisSyncFanoutQueue "github.com/zavitax/redis-sync-fanout-queue-go"
)

var testMessageContent = "test message content"
var testRoomId = "GO-ROOM-TEST"

var redisOptions = &redis.Options{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       0,
}

func createApiOptions() *redisSyncFanoutQueue.ApiOptions {
	result := &redisSyncFanoutQueue.ApiOptions{
		RedisOptions:   redisOptions,
		ClientTimeout:  time.Second * 15,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createWorkerOptions() *redisSyncFanoutQueue.WorkerOptions {
	result := &redisSyncFanoutQueue.WorkerOptions{
		RedisOptions:   redisOptions,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createApiClient(options *redisSyncFanoutQueue.ApiOptions) (redisSyncFanoutQueue.RedisQueueApiClient, error) {
	return redisSyncFanoutQueue.NewApiClient(context.TODO(), options)
}

func worker() {
	doneC := make(chan os.Signal, 1)
	signal.Notify(doneC)

	// Used for ACKs
	client, _ := createApiClient(createApiOptions())

	wo := createWorkerOptions()
	wo.HandleRoomClientTimeout = func(ctx context.Context, clientId *string, roomId *string) error {
		return nil
	}
	wo.HandleMessage = func(ctx context.Context, clientId *string, msg *redisSyncFanoutQueue.Message) error {
		if msg.Data == nil {
			fmt.Printf("Received nil data\n")
			return nil
		}

		strData := (*msg.Data).(string)
		if strData != testMessageContent {
			fmt.Printf("Expected '%v' but received '%v'\n", testMessageContent, strData)
			return nil
		}

		fmt.Printf("Received: client[%s] room[%s]: %v\n", *clientId, msg.Room, strData)

		if msg.AckToken != nil {
			//time.Sleep(time.Second * 3)
			// This should be called by the actual client, when it's done processing the message
			client.AckMessage(ctx, *clientId, msg.AckToken)
		}

		return nil
	}

	worker, _ := createWorkerClient(wo)

	ticker := time.NewTicker(time.Second)
	done := false
	for !done {
		select {
		case <-doneC:
			done = true
		case <-ticker.C:
			metrics, _ := worker.GetMetrics(context.TODO(), &redisSyncFanoutQueue.GetWorkerMetricsOptions{})

			fmt.Printf("Metrics: %v\n", metrics)
		}
	}
	ticker.Stop()

	worker.Close()
	client.Close()
}

func main() {
	worker()
}
Peeking at room's queue head
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"

	"github.com/go-redis/redis/v8"

	redisSyncFanoutQueue "github.com/zavitax/redis-sync-fanout-queue-go"
)

var testMessageContent = "test message content"
var testRoomId = "GO-ROOM-TEST"

var redisOptions = &redis.Options{
	Addr:     "127.0.0.1:6379",
	Password: "",
	DB:       0,
}

func createApiOptions() *redisSyncFanoutQueue.ApiOptions {
	result := &redisSyncFanoutQueue.ApiOptions{
		RedisOptions:   redisOptions,
		ClientTimeout:  time.Second * 15,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createWorkerOptions() *redisSyncFanoutQueue.WorkerOptions {
	result := &redisSyncFanoutQueue.WorkerOptions{
		RedisOptions:   redisOptions,
		RedisKeyPrefix: fmt.Sprintf("{test-redis-sync-fanout-queue}::%v", "test"),
	}

	return result
}

func createApiClient(options *redisSyncFanoutQueue.ApiOptions) (redisSyncFanoutQueue.RedisQueueApiClient, error) {
	return redisSyncFanoutQueue.NewApiClient(context.TODO(), options)
}

func peek() {
	doneC := make(chan os.Signal, 1)
	signal.Notify(doneC)

	client, _ := createApiClient(createApiOptions())

	ticker := time.NewTicker(time.Second)
	done := false
	for !done {
		select {
		case <-doneC:
			done = true
		case <-ticker.C:
			if msgs, err := client.Peek(context.TODO(), testRoomId, 0, 10); err != nil {
				panic(err)
			} else {
				for index, msg := range msgs {
					strData := (*msg.Data).(string)

					fmt.Printf("Peek: %d: %v\n", index, strData)
				}
			}
		}
	}
	ticker.Stop()

	client.Close()
}

func main() {
	peek()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ApiMetrics added in v1.0.0

type ApiMetrics struct {
	KnownRoomsCount int64

	TopRooms                             []*RoomMetrics
	TopRoomsPendingMessagesBacklogLength int64
}

type ApiOptions added in v1.0.0

type ApiOptions struct {
	RedisOptions   *redis.Options
	ClientTimeout  time.Duration
	RedisKeyPrefix string
}

func (*ApiOptions) Validate added in v1.0.0

func (o *ApiOptions) Validate() error

type GetApiMetricsOptions added in v1.0.0

type GetApiMetricsOptions struct {
	TopRoomsLimit int
}

type GetWorkerMetricsOptions added in v1.0.0

type GetWorkerMetricsOptions struct {
}

type HandleMessageFunc

type HandleMessageFunc func(ctx context.Context, clientId *string, msg *Message) error

type HandleRoomClientTimeoutFunc added in v1.0.0

type HandleRoomClientTimeoutFunc func(ctx context.Context, clientId, room *string) error

type Message

type Message struct {
	Data           *interface{}
	AckToken       *string
	Room           string
	MessageContext struct {
		Timestamp time.Time
		Producer  string
		Latency   time.Duration
	}
}

type RedisQueueApiClient added in v1.0.0

type RedisQueueApiClient interface {
	CreateClientID(ctx context.Context) (string, error)
	Close() error
	Send(ctx context.Context, producerId string, room string, data interface{}, priority int) error
	SendOutOfBand(ctx context.Context, producerId string, room string, data interface{}) error
	Peek(ctx context.Context, room string, offset int, limit int) ([]*Message, error)
	GetMetrics(ctx context.Context, options *GetApiMetricsOptions) (*ApiMetrics, error)
	Ping(ctx context.Context, clientId string) error
	AckMessage(ctx context.Context, clientId string, ackToken *string) error
	Subscribe(ctx context.Context, clientId string, room string) error
	Unsubscribe(ctx context.Context, clientId string, room string) error
}

func NewApiClient added in v1.0.0

func NewApiClient(ctx context.Context, options *ApiOptions) (RedisQueueApiClient, error)

type RedisQueueWorkerClient added in v1.0.0

type RedisQueueWorkerClient interface {
	Close() error
	GetMetrics(ctx context.Context, options *GetWorkerMetricsOptions) (*WorkerMetrics, error)
}

func NewWorkerClient added in v1.0.0

func NewWorkerClient(ctx context.Context, options *WorkerOptions) (RedisQueueWorkerClient, error)

type RoomMetrics

type RoomMetrics struct {
	Room                         string
	PendingMessagesBacklogLength int64
}

type WorkerMetrics added in v1.0.0

type WorkerMetrics struct {
	ReceivedMessagesCount int64
	InvalidMessagesCount  int64

	MinLatency time.Duration
	MaxLatency time.Duration
	AvgLatency time.Duration
}

type WorkerOptions added in v1.0.0

type WorkerOptions struct {
	RedisOptions            *redis.Options
	RedisKeyPrefix          string
	HandleMessage           HandleMessageFunc
	HandleRoomClientTimeout HandleRoomClientTimeoutFunc
}

func (*WorkerOptions) Validate added in v1.0.0

func (o *WorkerOptions) Validate() error

Directories

Path Synopsis
cmd
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL