pubsub

package module
v0.1.5
Published: May 9, 2021 | License: CC0-1.0 | Imports: 5 | Imported by: 1

README

pubsub

Embeddable Lightweight Pub/Sub in Go

Motivation

After using several Pub/Sub systems while writing production-grade software for some time, I decided to write a very simple, embeddable, lightweight Pub/Sub system using only native Go features, i.e. goroutines, channels and streaming I/O.

In effect, Go channels behave as MPSC queues, i.e. multiple producers can push onto the same channel, but each message reaches only one consumer. You're free to attach multiple consumers to a single channel, but they will compete for messages, so no consumer is guaranteed to see every message published on the channel.
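The competing-consumer behaviour described above can be observed with plain channels. The following is a minimal sketch using only the standard library; the function name `competingConsumers` is made up for illustration and is not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// competingConsumers sends n messages on one channel and lets `consumers`
// goroutines receive from it. It returns how many messages each consumer got.
// (Hypothetical helper, for illustration only.)
func competingConsumers(n, consumers int) []int {
	ch := make(chan int)
	counts := make([]int, consumers)

	var wg sync.WaitGroup
	for id := 0; id < consumers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Every value sent on ch is received by exactly one goroutine.
			for range ch {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(id)
	}

	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch) // ends every consumer's range loop
	wg.Wait()
	return counts
}

func main() {
	counts := competingConsumers(100, 4)
	total := 0
	for _, c := range counts {
		total += c
	}
	// The split between consumers varies from run to run; only the total is fixed.
	fmt.Println("per consumer:", counts, "total:", total)
}
```

The per-consumer counts always add up to the number of messages sent, which is why a single channel cannot broadcast on its own.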

The good thing is that Go channels are concurrent-safe. So I considered extending them to make in-application communication more flexible, while leveraging Go's streaming I/O for fast copying of messages from the publisher stream to N consumer streams. Below is what this embeddable piece of software provides.

✌️          Producer    Consumer
Single      yes         yes
Multiple    yes         yes
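To make the multiple-consumer idea concrete, here is a minimal fan-out sketch in which every subscriber of a topic gets its own copy of a message. It is not this package's implementation: the `hub` type, its methods, and the choice to skip subscribers whose buffer is full are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical, simplified broker: topic name -> subscriber channels.
type hub struct {
	mu   sync.RWMutex
	subs map[string][]chan []byte
}

func newHub() *hub {
	return &hub{subs: make(map[string][]chan []byte)}
}

// subscribe registers a new buffered channel for topic and returns it.
func (h *hub) subscribe(topic string, capacity int) <-chan []byte {
	ch := make(chan []byte, capacity)
	h.mu.Lock()
	h.subs[topic] = append(h.subs[topic], ch)
	h.mu.Unlock()
	return ch
}

// publish copies data to every subscriber of topic and reports how many
// subscribers received it. A subscriber whose buffer is full is skipped,
// which is a choice made for this sketch only.
func (h *hub) publish(topic string, data []byte) int {
	h.mu.RLock()
	defer h.mu.RUnlock()

	delivered := 0
	for _, ch := range h.subs[topic] {
		msg := append([]byte(nil), data...) // private copy per subscriber
		select {
		case ch <- msg:
			delivered++
		default: // buffer full, skip this subscriber
		}
	}
	return delivered
}

func main() {
	h := newHub()
	a := h.subscribe("topic_1", 4)
	b := h.subscribe("topic_1", 4)

	n := h.publish("topic_1", []byte("hello"))
	fmt.Println(n, string(<-a), string(<-b)) // prints "2 hello hello"
}
```

Giving each subscriber a private channel is what turns the competing-consumer model into broadcast: the hub, not the channel, decides who sees a message.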

Design

[Figure: architecture]

Stress Testing

For stress testing the system, I wrote a configurable program that makes it easy to run tests with various combinations of CLI arguments.

Run it with

go run stress/main.go -help
go run stress/main.go

[Figure: stress testing result]

Usage

First create a Go project with Go modules support.

# Assuming you're on UNIX flavoured OS

cd
mkdir test_pubsub

cd test_pubsub
go mod init github.com/<username>/test_pubsub

touch main.go

Now add github.com/itzmeanjan/pubsub as a dependency of your project:

go get github.com/itzmeanjan/pubsub # v0.1.5 latest

Then follow the full example.


Important

If you're planning to use pubsub in your application:

  • First start the pub/sub broker using
broker := pubsub.New(ctx)

// concurrent-safe utility method
if !broker.IsAlive() {
	return
}

// Start using broker 👇
  • If you're a publisher, the only method you need is PubSub.Publish(...)
msg := pubsub.Message{
    Topics: []pubsub.String{pubsub.String("topic_1")},
    Data:   []byte("hello"),
}
ok, publishedTo := broker.Publish(&msg) // concurrent-safe
  • If you're a subscriber, you should first subscribe to N-many topics, using PubSub.Subscribe(...).
subscriber := broker.Subscribe(ctx, 16, []string{"topic_1"}...)
  • You can start consuming messages using Subscriber.Next(...)
for {
    msg := subscriber.Next()
    if msg == nil {
        // nothing buffered yet, try again
        continue
    }
    // consume msg.Topic and msg.Data here
}
  • Add more subscriptions on-the-fly using Subscriber.AddSubscription(...)
ok, subTo := subscriber.AddSubscription([]string{"topic_2"}...)
  • Unsubscribe from specific topics using Subscriber.Unsubscribe(...)
ok, unsubFrom := subscriber.Unsubscribe([]string{"topic_1"}...)
  • Unsubscribe from all topics using Subscriber.UnsubscribeAll(...)
ok, unsubFrom := subscriber.UnsubscribeAll()
  • Cancel the context when you're done using the subscriber ( or maybe the broker itself, but please be careful ❗️ )
cancel()
<-time.After(time.Duration(100) * time.Microsecond)
// all good

And all set 🚀


You can check package documentation here

Documentation


Types

type Binary added in v0.1.4

type Binary []byte

Binary - Actual message content to be published

func (Binary) Bytes added in v0.1.4

func (b Binary) Bytes() []byte

func (*Binary) ReadFrom added in v0.1.4

func (b *Binary) ReadFrom(r io.Reader) (int64, error)

ReadFrom - Reads length prefixed content from given stream

func (Binary) String added in v0.1.4

func (b Binary) String() string

func (Binary) WriteTo added in v0.1.4

func (b Binary) WriteTo(w io.Writer) (int64, error)

WriteTo - Writes length prefixed content into given stream

type Message

type Message struct {
	Topics []String
	Data   Binary
}

Message - A publisher's request to publish an arbitrary byte slice to one or more topics

func (*Message) ReadFrom added in v0.1.4

func (m *Message) ReadFrom(r io.Reader) (int64, error)

ReadFrom - Read from byte stream into structured message

func (*Message) WriteTo added in v0.1.4

func (m *Message) WriteTo(w io.Writer) (int64, error)

WriteTo - Writes byte serialised content into given stream

type PubSub

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub - Pub/Sub server, i.e. it tracks which clients are subscribed to which topics, routes published messages to the correct topics, and handles (un-)subscription requests

In other words, the state manager of the Pub/Sub broker

func New

func New(ctx context.Context) *PubSub

New - Create a new Pub/Sub hub, using which messages can be routed to various topics

func (*PubSub) IsAlive added in v0.1.5

func (p *PubSub) IsAlive() bool

IsAlive - Check whether Hub is still alive or not [ concurrent-safe, good for external usage ]

func (*PubSub) Publish

func (p *PubSub) Publish(msg *Message) (bool, uint64)

Publish - Send message publishing request to N-topics in concurrent-safe manner

func (*PubSub) Subscribe

func (p *PubSub) Subscribe(ctx context.Context, cap int, topics ...string) *Subscriber

Subscribe - Create new subscriber instance with initial buffer capacity, listening for messages published on N-topics initially.

More topics can be subscribed to later using returned subscriber instance.

type PublishedMessage

type PublishedMessage struct {
	Topic String
	Data  Binary
}

PublishedMessage - Subscriber will receive message for consumption in this form

func (*PublishedMessage) ReadFrom added in v0.1.4

func (p *PublishedMessage) ReadFrom(r io.Reader) (int64, error)

ReadFrom - Read from byte stream into structured message

func (*PublishedMessage) WriteTo added in v0.1.4

func (p *PublishedMessage) WriteTo(w io.Writer) (int64, error)

WriteTo - Writes byte serialised content into given stream

type String added in v0.1.4

type String string

String - Message topic to be written into stream in this form

func (String) Bytes added in v0.1.4

func (s String) Bytes() []byte

func (*String) ReadFrom added in v0.1.4

func (s *String) ReadFrom(r io.Reader) (int64, error)

ReadFrom - Length prefixed byte content ( topic ) is read from stream

func (String) String added in v0.1.4

func (s String) String() string

func (String) WriteTo added in v0.1.4

func (s String) WriteTo(w io.Writer) (int64, error)

WriteTo - Length prefixed byte content ( topic ) is written into stream

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber - Uniquely identifiable subscriber, holding the set of topics it wishes to listen to over a ping channel

func (*Subscriber) AddSubscription

func (s *Subscriber) AddSubscription(topics ...string) (bool, uint64)

AddSubscription - Add subscriptions to more topics on-the-fly

func (*Subscriber) Next

func (s *Subscriber) Next() *PublishedMessage

Next - Attempt to consume oldest message living in buffer, by popping it out, in concurrent-safe manner
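Popping the oldest buffered message in a concurrent-safe way can be illustrated with a mutex-guarded FIFO. This is only a sketch of the general technique: the `buffer` type and its methods are hypothetical, and the package's internal buffer may be built differently.

```go
package main

import (
	"fmt"
	"sync"
)

// buffer is a hypothetical FIFO guarded by a mutex.
type buffer struct {
	mu    sync.Mutex
	items [][]byte
}

// push appends an item at the tail.
func (b *buffer) push(item []byte) {
	b.mu.Lock()
	b.items = append(b.items, item)
	b.mu.Unlock()
}

// next pops the oldest item, or returns nil when the buffer is empty.
func (b *buffer) next() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.items) == 0 {
		return nil
	}
	item := b.items[0]
	b.items[0] = nil // drop the reference so it can be collected
	b.items = b.items[1:]
	return item
}

func main() {
	var b buffer
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b.push([]byte(fmt.Sprintf("msg-%d", i)))
		}(i)
	}
	wg.Wait()

	for item := b.next(); item != nil; item = b.next() {
		fmt.Println(string(item)) // order depends on goroutine scheduling
	}
	fmt.Println(b.next() == nil) // prints "true": the buffer is now empty
}
```

Returning nil for an empty buffer keeps the call non-blocking, which matches the nil check in the consumer loop of the README.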

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe(topics ...string) (bool, uint64)

Unsubscribe - Unsubscribe from specified subscribed topics

func (*Subscriber) UnsubscribeAll

func (s *Subscriber) UnsubscribeAll() (bool, uint64)

UnsubscribeAll - Unsubscribe from all active subscribed topics
