pubsub

package module
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Apr 28, 2021 License: CC0-1.0 Imports: 2 Imported by: 1

README

pubsub

Embeddable Lightweight Pub/Sub in Go

Motivation

After using several Pub/Sub systems while writing production software for some time, I decided to write a very simple, embeddable, lightweight Pub/Sub system using only native Go features, i.e. goroutines and channels.

I found that Go channels behave like MPSC queues, i.e. multiple producers can push onto the same channel, but each message reaches only one consumer. You are free to attach multiple consumers to a single channel, but they will compete for the messages published on it rather than each receiving a copy.
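The competition between consumers can be seen in a minimal sketch that uses only the standard library. The helper name `countDeliveries` is made up for this illustration and is not part of this package. However the messages get split between the consumers, the per-consumer counts always add up to the number of messages sent, because no message is delivered twice.

```go
package main

import (
	"fmt"
	"sync"
)

// countDeliveries (hypothetical helper) sends n messages on one channel
// that is read by `consumers` goroutines, and returns how many messages
// each consumer received.
func countDeliveries(n, consumers int) []int {
	ch := make(chan int)
	counts := make([]int, consumers)
	var wg sync.WaitGroup

	for i := 0; i < consumers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range ch {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(i)
	}

	for m := 0; m < n; m++ {
		ch <- m
	}
	close(ch) // lets every consumer's range loop finish
	wg.Wait()
	return counts
}

func main() {
	counts := countDeliveries(100, 3)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The split between consumers varies from run to run; the total does not.
	fmt.Println("per-consumer:", counts, "total:", total)
}
```

A Pub/Sub layer on top of channels exists to change exactly this: every subscriber of a topic should get its own copy.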

The good thing is that Go channels are safe for concurrent use, so I considered extending them to make in-application communication more flexible. Below is what this embeddable piece of software provides.

✌️ Producers and consumers: single or multiple of each

Design

[Figure: architecture]

Usage

Here's an example

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	Topics []string
	Data   []byte
}

Message - A publisher's request to publish an arbitrary byte slice to one or more topics

type PubSub

type PubSub struct {
	Alive            bool
	Index            uint64
	MessageChan      chan *PublishRequest
	SubscriberIdChan chan chan uint64
	SubscribeChan    chan *SubscriptionRequest
	UnsubscribeChan  chan *UnsubscriptionRequest
	Subscribers      map[string]map[uint64]chan *PublishedMessage
}

PubSub - Pub/Sub server, i.e. it tracks which clients are subscribed to which topics, routes published messages to the correct topics, and handles subscription and unsubscription requests

In other words, the state manager of the Pub/Sub system
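To make the shape of that state concrete, here is a minimal sketch of a topic-to-subscriber routing table with the same layout as the `Subscribers` field (topic, then subscriber id, then channel). The `hub` and `published` types and their methods are hypothetical stand-ins written for this illustration, not this package's implementation. The sketch assumes a full subscriber buffer means the message is dropped, and it is not safe for concurrent use on its own, so a single goroutine should own it.

```go
package main

import "fmt"

// published mirrors the shape of PublishedMessage: one topic, one payload.
type published struct {
	Topic string
	Data  []byte
}

// hub is a hypothetical, stripped-down state manager:
// topic -> subscriber id -> delivery channel.
type hub struct {
	subscribers map[string]map[uint64]chan *published
}

func newHub() *hub {
	return &hub{subscribers: make(map[string]map[uint64]chan *published)}
}

// subscribe registers ch under every listed topic for the given id.
func (h *hub) subscribe(id uint64, ch chan *published, topics ...string) {
	for _, t := range topics {
		if h.subscribers[t] == nil {
			h.subscribers[t] = make(map[uint64]chan *published)
		}
		h.subscribers[t][id] = ch
	}
}

// publish sends one copy per (topic, subscriber) pair and returns how
// many copies were queued.
func (h *hub) publish(data []byte, topics ...string) uint64 {
	var delivered uint64
	for _, t := range topics {
		for _, ch := range h.subscribers[t] {
			select {
			case ch <- &published{Topic: t, Data: data}:
				delivered++
			default: // buffer full: dropped (an assumption of this sketch)
			}
		}
	}
	return delivered
}

func main() {
	h := newHub()
	a := make(chan *published, 4)
	b := make(chan *published, 4)
	h.subscribe(1, a, "news", "sports")
	h.subscribe(2, b, "news")

	fmt.Println(h.publish([]byte("hello"), "news", "sports")) // 3
	fmt.Println(len(a), len(b))                               // 2 1
}
```

Subscriber 1 gets two copies because it listens on both topics over a single channel, which is why each copy carries its own `Topic`.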

func New

func New() *PubSub

New - Creates a new Pub/Sub hub, through which messages can be routed to various topics

func (*PubSub) AddSubscription

func (p *PubSub) AddSubscription(subscriber *Subscriber, topics ...string) (bool, uint64)

AddSubscription - Subscribes an existing subscriber client to more topics

The boolean result is false only if the Pub/Sub system has stopped running

func (*PubSub) Publish

func (p *PubSub) Publish(msg *Message) (bool, uint64)

Publish - Publishes a message to any number of topics and returns how many subscribers are receiving (or will receive) a copy of it

The boolean result is false only if the Pub/Sub system has stopped running

func (*PubSub) Start

func (p *PubSub) Start(ctx context.Context)

Start - Handles requests from publishers and subscribers, so that message publishing can be abstracted

Consider running it as a goroutine
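The general pattern of a serving loop that runs in its own goroutine, answers requests over channels, and stops when its context is cancelled can be sketched as follows. The `request` type, `serve`, and `demo` are hypothetical names for this illustration and do not reproduce the body of `Start`; the sketch only shows why one goroutine owning the state removes the need for locks.

```go
package main

import (
	"context"
	"fmt"
)

// request is a hypothetical message to the serving loop: a value to add
// and a channel on which the running total is sent back.
type request struct {
	delta uint64
	reply chan uint64
}

// serve owns `total` exclusively, so no mutex is needed.
// It returns once ctx is cancelled.
func serve(ctx context.Context, requests <-chan request) {
	var total uint64
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-requests:
			total += req.delta
			req.reply <- total
		}
	}
}

// demo starts the loop in a goroutine, sends two requests, then stops it.
func demo() []uint64 {
	ctx, cancel := context.WithCancel(context.Background())
	requests := make(chan request)
	done := make(chan struct{})

	go func() {
		defer close(done)
		serve(ctx, requests)
	}()

	reply := make(chan uint64, 1)
	var totals []uint64
	for _, d := range []uint64{2, 3} {
		requests <- request{delta: d, reply: reply}
		totals = append(totals, <-reply)
	}

	cancel() // ask the loop to stop
	<-done   // wait until it has actually returned
	return totals
}

func main() {
	fmt.Println(demo()) // [2 5]
}
```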

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(cap uint64, topics ...string) *Subscriber

Subscribe - Subscribes to topics for the first time, creating a new client

Use this client to add more subscriptions, unsubscribe from topics, receive published messages, etc.

The response is nil only if the Pub/Sub system has stopped running

func (*PubSub) Unsubscribe

func (p *PubSub) Unsubscribe(subscriber *Subscriber, topics ...string) (bool, uint64)

Unsubscribe - Unsubscribes the specified subscriber client from the given topics

The boolean result is false only if the Pub/Sub system has stopped running

func (*PubSub) UnsubscribeAll

func (p *PubSub) UnsubscribeAll(subscriber *Subscriber) (bool, uint64)

UnsubscribeAll - Unsubscribes the subscriber from all of its currently active subscriptions

The boolean result is false only if the Pub/Sub system has stopped running

type PublishRequest

type PublishRequest struct {
	Message      *Message
	ResponseChan chan uint64
}

PublishRequest - The form in which a publisher submits a message for publication, while receiving how many subscribers it was published to

type PublishedMessage

type PublishedMessage struct {
	Topic string
	Data  []byte
}

PublishedMessage - The form in which a subscriber receives a message once it has been published on a topic

type Subscriber

type Subscriber struct {
	Id      uint64
	Channel chan *PublishedMessage
	Topics  map[string]bool
}

Subscriber - Uniquely identifiable subscriber that listens to all of its subscribed topics over a single channel

func (*Subscriber) AddSubscription

func (s *Subscriber) AddSubscription(pubsub *PubSub, topics ...string) (bool, uint64)

AddSubscription - Subscribe to topics using existing pub/sub client

func (*Subscriber) BNext

func (s *Subscriber) BNext(delay time.Duration) *PublishedMessage

BNext - Reads from the channel if something is immediately available; otherwise waits for the specified delay and attempts to read again, where this second attempt is non-blocking

func (*Subscriber) Close

func (s *Subscriber) Close() bool

Close - Destroys subscriber

func (*Subscriber) Next

func (s *Subscriber) Next() *PublishedMessage

Next - Reads from the channel if anything is immediately available, otherwise returns right away, i.e. it is a non-blocking operation
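The two read styles described for Next and BNext can be sketched with `select` and a `default` case. The lowercase `next` and `bnext` functions and the `published` type below are hypothetical stand-ins for this illustration. They follow the wording of the descriptions (return nil when nothing is queued; sleep, then retry once) and are not copied from this package.

```go
package main

import (
	"fmt"
	"time"
)

// published mirrors the shape of PublishedMessage.
type published struct {
	Topic string
	Data  []byte
}

// next returns a queued message, or nil if none is immediately
// available. It never blocks.
func next(ch <-chan *published) *published {
	select {
	case msg := <-ch:
		return msg
	default:
		return nil
	}
}

// bnext tries once; if nothing is queued it sleeps for delay and then
// makes one more non-blocking attempt.
func bnext(ch <-chan *published, delay time.Duration) *published {
	if msg := next(ch); msg != nil {
		return msg
	}
	time.Sleep(delay)
	return next(ch)
}

func main() {
	ch := make(chan *published, 1)
	fmt.Println(next(ch) == nil) // true: nothing queued, returns at once

	ch <- &published{Topic: "news", Data: []byte("hi")}
	fmt.Println(next(ch).Topic) // news

	fmt.Println(bnext(ch, 10*time.Millisecond) == nil) // true: still empty after the delay
}
```

Callers of either style have to check for nil before using the result, since an empty queue is a normal outcome rather than an error.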

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe(pubsub *PubSub, topics ...string) (bool, uint64)

Unsubscribe - Unsubscribe from topics, if subscribed to them using this client

func (*Subscriber) UnsubscribeAll

func (s *Subscriber) UnsubscribeAll(pubsub *PubSub) (bool, uint64)

UnsubscribeAll - Unsubscribes from all topics this client is currently subscribed to

type SubscriptionRequest

type SubscriptionRequest struct {
	Subscriber   *Subscriber
	ResponseChan chan uint64
}

SubscriptionRequest - The form in which a subscriber sends a topic subscription request; it also receives how many topics were successfully subscribed to

type UnsubscriptionRequest

type UnsubscriptionRequest struct {
	Id           uint64
	Topics       []string
	ResponseChan chan uint64
}

UnsubscriptionRequest - The form in which a topic unsubscription request is sent; it also receives how many topics were successfully unsubscribed from
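One plausible way to arrive at the "how many topics" counts is to keep a `map[string]bool` set per subscriber, like the `Topics` field, and count only real changes. The helpers `addTopics` and `removeTopics` are hypothetical names for this illustration. The sketch assumes that subscribing to a topic twice, or unsubscribing from a topic that was never subscribed to, is not counted, which the documentation above does not confirm.

```go
package main

import "fmt"

// addTopics records topics in the set and returns how many were newly
// added (topics already present are not counted).
func addTopics(set map[string]bool, topics ...string) uint64 {
	var added uint64
	for _, t := range topics {
		if !set[t] {
			set[t] = true
			added++
		}
	}
	return added
}

// removeTopics deletes topics from the set and returns how many were
// actually present before removal.
func removeTopics(set map[string]bool, topics ...string) uint64 {
	var removed uint64
	for _, t := range topics {
		if set[t] {
			delete(set, t)
			removed++
		}
	}
	return removed
}

func main() {
	set := map[string]bool{}
	fmt.Println(addTopics(set, "a", "b"))    // 2
	fmt.Println(addTopics(set, "b", "c"))    // 1: "b" was already present
	fmt.Println(removeTopics(set, "a", "z")) // 1: "z" was never subscribed
	fmt.Println(len(set))                    // 2
}
```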
