pubsub

Version: v0.1.3
Warning

This package is not in the latest version of its module.

Published: May 6, 2021 License: CC0-1.0 Imports: 2 Imported by: 1

README

pubsub

Embeddable Lightweight Pub/Sub in Go

Motivation

After using several Pub/Sub systems while writing production-grade software for some time, I decided to write a very simple, embeddable, lightweight Pub/Sub system using only native Go features, i.e. goroutines and channels.

For broadcast purposes, Go channels effectively behave as MPSC queues: multiple producers can push onto the same channel, but each message is delivered to only one consumer. You are free to attach multiple consumers to a single channel, but they will compete for the messages published on it, i.e. not every consumer sees every message.
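The competition between consumers can be seen with the standard library alone. The following is a minimal sketch, not code from this package; `competingConsumers` is a hypothetical helper that counts how many messages each reader of one shared channel receives.

```go
package main

import (
	"fmt"
	"sync"
)

// competingConsumers (hypothetical helper) sends `messages` integers on one
// channel read by `consumers` goroutines and returns how many messages each
// consumer received.
func competingConsumers(consumers, messages int) []int {
	ch := make(chan int)
	counts := make([]int, consumers)

	var wg sync.WaitGroup
	for i := 0; i < consumers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Every value sent on ch is handed to exactly one receiver.
			for range ch {
				counts[id]++ // each goroutine writes only its own element
			}
		}(i)
	}

	for m := 0; m < messages; m++ {
		ch <- m
	}
	close(ch)
	wg.Wait()

	return counts
}

func main() {
	counts := competingConsumers(3, 9)
	total := 0
	for id, n := range counts {
		fmt.Printf("consumer %d received %d message(s)\n", id, n)
		total += n
	}
	// The split between consumers varies from run to run, but the total is
	// always 9 and never 27: no consumer sees every message.
	fmt.Println("total deliveries:", total)
}
```

A Pub/Sub hub avoids this by giving every subscriber its own channel and sending each message once per subscriber.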

The good thing is that Go channels are concurrency-safe, so I considered extending them to make in-application communication more flexible. Below is what this embeddable piece of software provides.

✌️        Producer  Consumer
Single    ✅        ✅
Multiple  ✅        ✅

Design

[Figure: architecture]

Stress Testing

Stress testing of pubsub was done for the following message passing patterns. Every message was 1024 bytes in size, and I measured the time spent producing and consuming data under various configurations.

If you are interested, take a look at the stress testing examples:

SPSC  SPMC  MPSC  MPMC
[Figures: spsc, spmc, mpsc, mpmc]

A generic simulation with N parties, reporting a rolling average of data transferred, is also available:

SAFETY mode enabled (slower): [Figure: generic_safe_simulation]
SAFETY mode disabled (faster): [Figure: generic_unsafe_simulation]

Usage

First, create a Go project with Go modules support.

# Assuming you're on UNIX flavoured OS

cd
mkdir test_pubsub

cd test_pubsub
go mod init github.com/<username>/test_pubsub

touch main.go

Now add github.com/itzmeanjan/pubsub as your project dependency:

go get github.com/itzmeanjan/pubsub@v0.1.3 # the version documented here

Then follow the full example.


Important

A few things you should care about when using pubsub in your application:

  • Though Go channels are heavily used in this package, you are never supposed to interact with them directly. Abstracted, easy-to-use methods are provided for you instead. DON'T INTERACT WITH CHANNELS DIRECTLY. IF YOU DO, PLEASE BE CAREFUL.
  • When a new subscriber is created, it is allocated a unique ID. The ID generation logic is very simple, BUT MAKE SURE YOU NEVER MANIPULATE IT:
last_id = 1 // initial value
next_id = last_id + 1
last_id = next_id
  • If you're a publisher, you should concern yourself with either of

    • PubSub.Publish(...) [ non-blocking ]
    • or PubSub.BPublish(...) [ blocking ]
  • If you're a subscriber, you should first subscribe to N-many topics using PubSub.Subscribe(...). You can then read messages and manage your subscriptions using

    • Subscriber.Next() [ non-blocking ]
    • Subscriber.BNext(...) [ blocking ]
    • Subscriber.AddSubscription(...) [ add more subscriptions on-the-fly ]
    • Subscriber.Unsubscribe(...) [ cancel some topic subscriptions ]
    • Subscriber.UnsubscribeAll(...) [ cancel all topic subscriptions ]
    • Subscriber.Close() [ when you don't want to use this subscriber anymore ]
  • You can safely invoke the 👆 methods from N-many goroutines on the same Pub/Sub system, which you start using PubSub.Start(...). The hub needs to be created first:

broker := pubsub.New()
  • If you use pubsub with default settings, you won't get the highest possible hub performance. You can enable that explicitly (at your own risk) by invoking PubSub.AllowUnsafe(), which stops the most expensive thing the hub does during message passing, i.e. copying messages for each subscriber. It makes the whole system faster, but at the cost of risk.

Let's assume you publish a message to N parties and then modify the same message slice you used for publishing. As you've disabled the safety lock, the hub didn't copy the message for each subscriber; it just passed a reference to that same slice to all parties. Each of them might now see an inconsistent view of the message. If you're sure you're not making any changes to the same message slice from either the publisher or the subscriber side, you can disable the SAFETY lock and get far better performance from the Pub/Sub system.

  • After disabling the SAFETY lock, you might want to enable it again at runtime, which can be done in a concurrency-safe manner by invoking PubSub.OnlySafe().

🔥 Disabling safety mode brings you a ~63.17% performance improvement, but be careful 🔥
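The risk of passing one slice to every subscriber can be reproduced without the hub. This is a minimal sketch using only the standard library; `deliver` is a hypothetical stand-in for the copy or no-copy decision and is not the package's implementation.

```go
package main

import "fmt"

// deliver (hypothetical helper) hands data to n subscribers. With safe == true
// each subscriber gets its own copy; otherwise all of them share the caller's
// backing array.
func deliver(data []byte, n int, safe bool) [][]byte {
	out := make([][]byte, n)
	for i := range out {
		if safe {
			cp := make([]byte, len(data))
			copy(cp, data)
			out[i] = cp
		} else {
			out[i] = data // same backing array for everyone
		}
	}
	return out
}

func main() {
	msg := []byte("hello")
	copied := deliver(msg, 2, true)
	shared := deliver(msg, 2, false)

	msg[0] = 'j' // the publisher reuses its buffer after publishing

	fmt.Println(string(copied[0]), string(shared[0])) // hello jello
}
```

The copy costs one allocation per subscriber per message, which is the work the hub skips when safety mode is disabled.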

And all set 🚀


You can check package documentation here

Documentation

Types

type Message

type Message struct {
	Topics []string
	Data   []byte
}

Message - A publisher's intent to publish an arbitrary byte slice to one or more topics

type PubSub

type PubSub struct {
	SafetyMode       bool
	Alive            bool
	Index            uint64
	MessageChan      chan *PublishRequest
	SubscriberIdChan chan chan uint64
	SubscribeChan    chan *SubscriptionRequest
	UnsubscribeChan  chan *UnsubscriptionRequest
	SafetyChan       chan *SafetyMode
	Subscribers      map[string]map[uint64]chan *PublishedMessage
}

PubSub - Pub/Sub server, i.e. it holds which clients are subscribed to which topics, manages publishing messages to the correct topics, and handles (un-)subscription requests

In other words, it is the state manager of the Pub/Sub system
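The Subscribers field maps topic to subscriber ID to that subscriber's channel. How such a nested map can route messages is sketched below using only the standard library. The `registry` type and its methods are hypothetical, the payload is simplified to a string, and dropping a message when a subscriber's buffer is full is an assumption of this sketch rather than confirmed hub behaviour.

```go
package main

import "fmt"

// registry (hypothetical) maps topic -> subscriber ID -> subscriber channel,
// mirroring the shape of the Subscribers field.
type registry map[string]map[uint64]chan string

// subscribe registers ch under id for every given topic.
func (r registry) subscribe(id uint64, ch chan string, topics ...string) {
	for _, t := range topics {
		if r[t] == nil {
			r[t] = make(map[uint64]chan string)
		}
		r[t][id] = ch
	}
}

// unsubscribe removes id from the given topics; unknown topics are ignored.
func (r registry) unsubscribe(id uint64, topics ...string) {
	for _, t := range topics {
		delete(r[t], id)
	}
}

// publish attempts a non-blocking send to every subscriber of every topic
// and returns how many sends succeeded.
func (r registry) publish(data string, topics ...string) uint64 {
	var delivered uint64
	for _, t := range topics {
		for _, ch := range r[t] {
			select {
			case ch <- data:
				delivered++
			default: // assumption: a full buffer means the message is skipped
			}
		}
	}
	return delivered
}

func main() {
	r := registry{}
	a, b := make(chan string, 4), make(chan string, 4)
	r.subscribe(1, a, "news", "sport")
	r.subscribe(2, b, "news")

	fmt.Println(r.publish("hi", "news"))    // 2
	fmt.Println(r.publish("goal", "sport")) // 1
	r.unsubscribe(1, "news")
	fmt.Println(r.publish("bye", "news", "chess")) // 1
}
```

Keying the inner map by subscriber ID makes unsubscription a single delete per topic.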

func New

func New() *PubSub

New - Create a new Pub/Sub hub, using which messages can be routed to various topics

func (*PubSub) AddSubscription

func (p *PubSub) AddSubscription(subscriber *Subscriber, topics ...string) (bool, uint64)

AddSubscription - Use existing subscriber client to subscribe to more topics

Response will only be negative if Pub/Sub system has stopped running

func (*PubSub) AllowUnsafe added in v0.1.3

func (p *PubSub) AllowUnsafe() bool

AllowUnsafe - The hub allows you to pass a slice of bytes to N-many topic subscribers, and as slices are references, if any of those subscribers (or even the publisher itself) mutates the slice, the change is visible to all parties, which might not always be desirable.

But if you're sure that won't cause any problem for you, you can at your own risk disable SAFETY lock

If disabled, the hub no longer attempts to copy the slice for each topic subscriber; it simply passes it along. As this means the hub does less work, it is able to process more data than ever, FASTer ⭐️

❗️ But remember this might bring problems for you

func (*PubSub) BPublish added in v0.1.2

func (p *PubSub) BPublish(msg *Message, delay time.Duration) (bool, uint64)

BPublish - Publish message to N-many topics and block for at most `delay` if any subscriber of any of those topics does not have enough buffer space

Please note: the hub attempts to send the message on the subscriber's channel; if it finds a lack of space, it waits for `delay` and retries. If this second attempt also fails to find enough space, it returns immediately.
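The try, wait, retry-once behaviour described above can be expressed with a `select` that has a `default` branch. This is a minimal sketch under that reading of the description; `trySend`, `sendWithRetry` and `demoDelay` are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// demoDelay is an arbitrary value chosen for this sketch.
const demoDelay = 100 * time.Millisecond

// trySend (hypothetical) performs a non-blocking send and reports success.
func trySend(ch chan<- []byte, data []byte) bool {
	select {
	case ch <- data:
		return true
	default: // buffer full (or no receiver ready)
		return false
	}
}

// sendWithRetry (hypothetical) tries once and, if there is no space, waits
// for delay and tries exactly one more time without blocking.
func sendWithRetry(ch chan<- []byte, data []byte, delay time.Duration) bool {
	if trySend(ch, data) {
		return true
	}
	time.Sleep(delay)
	return trySend(ch, data)
}

func main() {
	ch := make(chan []byte, 1)
	ch <- []byte("first") // the buffer is now full

	// Nobody drains the channel, so both attempts fail.
	fmt.Println(sendWithRetry(ch, []byte("second"), demoDelay)) // false

	// A consumer frees the slot while the sender is waiting, so the retry
	// normally finds space.
	go func() {
		time.Sleep(demoDelay / 10)
		<-ch
	}()
	fmt.Println(sendWithRetry(ch, []byte("third"), demoDelay))
}
```

The caller is blocked for at most one `delay`, regardless of how slow the subscriber is.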

func (*PubSub) OnlySafe added in v0.1.3

func (p *PubSub) OnlySafe() bool

OnlySafe - You'll probably never need this method unless you've explicitly disabled the safety lock by invoking `AllowUnsafe` ( 👆)

But if you have, and at runtime you need to enable safety mode again, you can call this method. All published messages are then copied for each subscriber, which makes operations slower than in SAFETY-lock-disabled mode

func (*PubSub) Publish

func (p *PubSub) Publish(msg *Message) (bool, uint64)

Publish - Publish message to N-many topics; returns how many subscribers are receiving (will receive) a copy of this message

Response will only be negative if Pub/Sub system has stopped running

func (*PubSub) Start

func (p *PubSub) Start(ctx context.Context)

Start - Handles requests from publishers and subscribers, so that message publishing can be abstracted

Consider running it as a goroutine
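A loop of this kind typically owns all state and serves requests over channels until its context is cancelled. The sketch below illustrates that pattern with the README's ID rule (last_id starts at 1, next_id = last_id + 1) and a `chan chan uint64` request channel like SubscriberIdChan. The `hub` type and its methods are hypothetical and are not the package's code.

```go
package main

import (
	"context"
	"fmt"
)

// hub (hypothetical) is a stripped-down state owner, not the PubSub type.
type hub struct {
	lastID uint64
	idReqs chan chan uint64 // each request carries its own reply channel
}

// start serves ID requests until ctx is cancelled. Only this goroutine
// touches lastID, so no mutex is needed.
func (h *hub) start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case reply := <-h.idReqs:
			h.lastID++ // next_id = last_id + 1; last_id = next_id
			reply <- h.lastID
		}
	}
}

// nextID asks the hub goroutine for a fresh ID and waits for the answer.
func (h *hub) nextID() uint64 {
	reply := make(chan uint64)
	h.idReqs <- reply
	return <-reply
}

// newRunningHub creates a hub, starts it as a goroutine and returns a
// function that stops it.
func newRunningHub() (*hub, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	h := &hub{lastID: 1, idReqs: make(chan chan uint64)}
	go h.start(ctx)
	return h, cancel
}

func main() {
	h, stop := newRunningHub()
	defer stop()

	fmt.Println(h.nextID(), h.nextID(), h.nextID()) // 2 3 4
}
```

Because every change goes through the single loop, callers on any number of goroutines never touch the counter themselves, which is also why manipulating IDs by hand is discouraged.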

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(cap uint64, topics ...string) *Subscriber

Subscribe - Subscribes to topics for first time, new client gets created

Use this client to add more subscriptions to topics/ unsubscribe from topics/ receive published messages etc.

Response will only be nil if Pub/Sub system has stopped running

func (*PubSub) Unsubscribe

func (p *PubSub) Unsubscribe(subscriber *Subscriber, topics ...string) (bool, uint64)

Unsubscribe - Unsubscribes from topics for specified subscriber client

Response will only be negative if Pub/Sub system has stopped running

func (*PubSub) UnsubscribeAll

func (p *PubSub) UnsubscribeAll(subscriber *Subscriber) (bool, uint64)

UnsubscribeAll - All current active subscriptions get unsubscribed from

Response will only be negative if Pub/Sub system has stopped running

type PublishRequest

type PublishRequest struct {
	Message      *Message
	BlockFor     time.Duration
	ResponseChan chan uint64
}

PublishRequest - A publisher expresses its intent to publish using this form, and receives back how many subscribers the message was published to

type PublishedMessage

type PublishedMessage struct {
	Topic string
	Data  []byte
}

PublishedMessage - Once a message is published on a topic, subscribers receive it in this form

type SafetyMode added in v0.1.3

type SafetyMode struct {
	Enable       bool
	ResponseChan chan bool
}

SafetyMode - Requests to enable/disable the SAFETY lock are sent to the hub in this form

type Subscriber

type Subscriber struct {
	Id      uint64
	Channel chan *PublishedMessage
	Topics  map[string]bool
}

Subscriber - Uniquely identifiable subscriber that listens to multiple subscribed topics over a single channel

func (*Subscriber) AddSubscription

func (s *Subscriber) AddSubscription(pubsub *PubSub, topics ...string) (bool, uint64)

AddSubscription - Subscribe to topics using existing pub/sub client

func (*Subscriber) BNext

func (s *Subscriber) BNext(delay time.Duration) *PublishedMessage

BNext - Read from channel if something is immediately available; otherwise wait for the specified delay and attempt to read again, where this second attempt is non-blocking

func (*Subscriber) Close

func (s *Subscriber) Close() bool

Close - Destroys subscriber

func (*Subscriber) Next

func (s *Subscriber) Next() *PublishedMessage

Next - Read from channel if anything is immediately available; otherwise just return, i.e. it's a non-blocking op

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe(pubsub *PubSub, topics ...string) (bool, uint64)

Unsubscribe - Unsubscribe from topics, if subscribed to them using this client

func (*Subscriber) UnsubscribeAll

func (s *Subscriber) UnsubscribeAll(pubsub *PubSub) (bool, uint64)

UnsubscribeAll - Unsubscribes from all topics this client is currently subscribed to

type SubscriptionRequest

type SubscriptionRequest struct {
	Subscriber   *Subscriber
	ResponseChan chan uint64
}

SubscriptionRequest - A subscriber sends its topic subscription request in this form, and receives back how many topics were successfully subscribed to

type UnsubscriptionRequest

type UnsubscriptionRequest struct {
	Id           uint64
	Topics       []string
	ResponseChan chan uint64
}

UnsubscriptionRequest - A topic unsubscription request is sent in this form; the sender receives back how many topics were successfully unsubscribed from

Directories

Path
stress/generic (command)
stress/mpmc (command)
stress/mpsc (command)
stress/spmc (command)
stress/spsc (command)
