pubsub

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2021 License: CC0-1.0 Imports: 2 Imported by: 1

README

pubsub

Embeddable Lightweight Pub/Sub in Go

Motivation

After using several Pub/Sub systems in writing production grade softwares for sometime, I decided to write one very simple, embeddable, light-weight Pub/Sub system using only native Go functionalities i.e. Go routines, Go channels.

Actually, Go channels are MPSC i.e. multiple producers can push onto same channel, but there's only one consumer. You're very much free to use multiple consumers on single channel, but they will start competing for messages being published on channel i.e. all consumers won't see all messages published on channel.

Good thing is that Go channels are concurrent-safe. So I considered extending it to make in-application communication more flexible. Below is what provided by this embeddable piece of software.

✌️ Producer Consumer
Single
Multiple

Design

architecture

Stress Testing

Stress testing using pubsub was done for following message passing patterns, where every message was of size 1024 bytes & I attempted to calculate time spent for producing & consuming data under various configuration.

If you may be interested in taking a look at stress testing examples

spsc

spmc

mpsc

mpmc


One generic simulation with N -parties & rolling average of data transferred is present here

generic_simulation


Usage

First create a Go project with GOMOD support.

# Assuming you're on UNIX flavoured OS

cd
mkdir test_pubsub

cd test_pubsub
go mod init github.com/<username>/test_pubsub

touch main.go

Now add github.com/itzmeanjan/pubsub as your project dependency

go get github.com/itzmeanjan/pubsub # v0.1.1 latest

And follow full example.


Important

A few things you should care about when using pubsub in your application

  • Though Go channels are heavily used in this package, you're never supposed to be interacting with them directly. Abstracted, easy to use methods are made available for you. DON'T INTERACT WITH CHANNELS DIRECTLY
  • When creating new subscriber, it'll be allocated with one unique ID. ID generation logic is very simple. BUT MAKE SURE YOU NEVER MANIPULATE IT
last_id = 1 // initial value
next_id = last_id + 1
last_id = next_id
  • If you're a publisher, you should concern yourself with either of

    • PubSub.Publish(...) [ non-blocking ]
    • or PubSub.BPublish(...) [ blocking ]
  • If you're a subscriber, you should first subscribe to N-many topics, using PubSub.Subscribe(...). You can start reading using

    • Subscriber.Next() [ non-blocking ]
    • Subscriber.BNext(...) [ blocking ]
    • Subscriber.AddSubscription(...) [ add more subscriptions on-the-fly ]
    • Subscriber.Unsubscribe(...) [ cancel some topic subscriptions ]
    • Subscriber.UnsubscribeAll(...) [ cancel all topic subscriptions ]
    • Subscriber.Close() [ when you don't want to use this subscriber anymore ]
  • You're good to invoke 👆 methods from N-many go routines on same Pub/Sub System which you started using PubSub.Start(...). It needs to be first created

broker := PubSub.New()

And all set 🚀


You can check package documentation here

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	Topics []string
	Data   []byte
}

Message - Publisher showing intent of publishing arbitrary byte slice to topics

type PubSub

type PubSub struct {
	Alive            bool
	Index            uint64
	MessageChan      chan *PublishRequest
	SubscriberIdChan chan chan uint64
	SubscribeChan    chan *SubscriptionRequest
	UnsubscribeChan  chan *UnsubscriptionRequest
	Subscribers      map[string]map[uint64]chan *PublishedMessage
}

PubSub - Pub/Sub Server i.e. holds which clients are subscribed to what topics, manages publishing messages to correct topics, handles (un-)subscription requests

In other words state manager of Pub/Sub system

func New

func New() *PubSub

New - Create a new Pub/Sub hub, using which messages can be routed to various topics

func (*PubSub) AddSubscription

func (p *PubSub) AddSubscription(subscriber *Subscriber, topics ...string) (bool, uint64)

AddSubscription - Use existing subscriber client to subscribe to more topics

Response will only be negative if Pub/Sub system has stopped running

func (*PubSub) BPublish added in v0.1.2

func (p *PubSub) BPublish(msg *Message, delay time.Duration) (bool, uint64)

BPublish - Publish message to N-many topics and block for at max `delay` if any subscriber of any of those topics are not having enough buffer space

Please note, hub attempts to send message on subscriber channel if finds lack of space, wait for `delay` & retries. This time too if it fails to find enough space, it'll return back immediately.

func (*PubSub) Publish

func (p *PubSub) Publish(msg *Message) (bool, uint64)

Publish - Publish message to N-many topics, receives how many of subscribers are receiving ( will receive ) copy of this message

Response will only be negative if Pub/Sub system has stopped running

func (*PubSub) Start

func (p *PubSub) Start(ctx context.Context)

Start - Handles request from publishers & subscribers, so that message publishing can be abstracted

Consider running it as a go routine

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(cap uint64, topics ...string) *Subscriber

Subscribe - Subscribes to topics for first time, new client gets created

Use this client to add more subscriptions to topics/ unsubscribe from topics/ receive published messages etc.

Response will only be nil if Pub/Sub system has stopped running

func (*PubSub) Unsubscribe

func (p *PubSub) Unsubscribe(subscriber *Subscriber, topics ...string) (bool, uint64)

Unsubscribe - Unsubscribes from topics for specified subscriber client

Response will only be negative if Pub/Sub system has stopped running

func (*PubSub) UnsubscribeAll

func (p *PubSub) UnsubscribeAll(subscriber *Subscriber) (bool, uint64)

UnsubscribeAll - All current active subscriptions get unsubscribed from

Response will only be negative if Pub/Sub system has stopped running

type PublishRequest

type PublishRequest struct {
	Message      *Message
	BlockFor     time.Duration
	ResponseChan chan uint64
}

PublishRequest - Publisher will show interest of publication using this form, while receiving how many subscribers it published to

type PublishedMessage

type PublishedMessage struct {
	Topic string
	Data  []byte
}

PublishedMessage - Once a message is published on a topic, subscriber to receive it in this form

type Subscriber

type Subscriber struct {
	Id      uint64
	Channel chan *PublishedMessage
	Topics  map[string]bool
}

Subscriber - Uniquely identifiable subscriber with multiple subscribed topics from where it wishes to listen from over single channel

func (*Subscriber) AddSubscription

func (s *Subscriber) AddSubscription(pubsub *PubSub, topics ...string) (bool, uint64)

AddSubscription - Subscribe to topics using existing pub/sub client

func (*Subscriber) BNext

func (s *Subscriber) BNext(delay time.Duration) *PublishedMessage

BNext - Read from channel, if something is available immediately otherwise wait for specified delay & attempt to read again where this step is non-blocking

func (*Subscriber) Close

func (s *Subscriber) Close() bool

Close - Destroys subscriber

func (*Subscriber) Next

func (s *Subscriber) Next() *PublishedMessage

Next - Read from channel if anything is immediately available otherwise just return i.e. it's non-blocking op

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe(pubsub *PubSub, topics ...string) (bool, uint64)

Unsubscribe - Unsubscribe from topics, if subscribed to them using this client

func (*Subscriber) UnsubscribeAll

func (s *Subscriber) UnsubscribeAll(pubsub *PubSub) (bool, uint64)

UnsubcribeAll - Unsubscribes from all topics this client is currently subscribed to

type SubscriptionRequest

type SubscriptionRequest struct {
	Subscriber   *Subscriber
	ResponseChan chan uint64
}

SubscriptionRequest - Subscriber to send topic subscription request in this form, will also receive how many topics were successfully subscribed to

type UnsubscriptionRequest

type UnsubscriptionRequest struct {
	Id           uint64
	Topics       []string
	ResponseChan chan uint64
}

UnsubscriptionRequest - Topic unsubscription request to be sent in this form will also receive how many of them were successfully unsubscribed from

Directories

Path Synopsis
stress
generic command
mpmc command
mpsc command
spmc command
spsc command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL