pubsub

package module
v0.0.0-...-8fdf53d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 14, 2019 License: CC0-1.0 Imports: 6 Imported by: 0

README

PubSub Challenge

GolangCI GoDoc Go Report Card

Problem

Create a new publisher-subscriber (PubSub) that allows multiple producers publishing to multiple consumers. The consumers can subscribe to messages by specifying a topic pattern (supporting wildcards) that it wants to receive messages from, and producers can only write to a specific topic (i.e one to many).

For example, two consumers where one subscribes to cosmosA-events-eventA and another subscribes to cosmosA-events-* will both receive the message that was published by producers writing to cosmosA-events-eventA.

A consumer can start subscribing to events before there is any publisher, and will start to receive data once a publisher is created and publishes to that topic.

Solution

There exists a PubSub implementation, BasePubSub. The BasePubSub allows for any number of producers to be registered. Each producer may publish messages to a single unique topic. Internally, each producer maintains a list of subscribers (i.e. one to many). Clients may then subscribe to messages using a topic pattern. For each matching topic, the subscription will be added to the producer's list of subscriptions and those messages will be sent out on each subscription channel (which is returned to the client).

Note, the BaseProducer type allows for buffered publishing. If the buffer/queue is full, the producer will error on Publish.

e.g.

+------------------+             +----------------------+
| producer (a.b.c) +------------>+ subscription (*.*.c) |
+------------------+             +----------------------+
                         +-------^
                         |
                         |
+------------------+     |       +----------------------+
| producer (x.y.c) +-----+------>+ subscription (x.y.*) |
+------------------+             +----------------------+


+--------------------+           +----------------------+
| producer (foo/bar) +---------->+ subscription (foo/*) |
+--------------------+           +----------------------+


                                 +----------------------+
                                 + subscription (*/baz) |
                                 +----------------------+

Here we have three producers, where a.b.c and foo/bar both have a single subscription and x.y.c has two subscriptions.

Note, producers can be registered and added to the BasePubSub server after a matching subscription(s) already exists. These subscriptions can either be tied to existing producers or be "idle" -- subscriptions which currently do not have any matching producers. In such a case, the new producer will have the matching subscription(s) added to it's list of subscriptions.

e.g.

When adding a new producer foo/baz, it'll match the already existing subscription foo/* which the producer foo/bar has and against the "idle" subscription */baz.

...

+--------------------+           +----------------------+
| producer (foo/bar) +---------->+ subscription (foo/*) |
+--------------------+           +----------------------+
                           +-----^
                           |
+--------------------+     |      +----------------------+
| producer (foo/baz) +-----+----->+ subscription (*/baz) |
+--------------------+            +----------------------+
Potential Improvements
  • Consider returning a richer concrete type for Subscribe (e.g the ability to close).
  • Consider using a modified radix trie to improve topic matching when the number of producers is significantly large.

Assumptions

Valid topics consist of arbitrary-length alphanumeric characters separated by a valid deliminator: /,.,-.

Documentation

Overview

Create a new publisher-subscriber (PubSub) that allows multiple producers publishing to multiple consumers. The consumers can subscribe to messages by specifying a topic pattern (supporting wildcards) that it wants to receive messages from, and producers can only write to a specific topic (i.e one to many).

For example, two consumers where one subscribes to “cosmosA-events-eventA” and another subscribes to “cosmosA-events-*” will both receive the message that was published by producers writing to “cosmosA-events-eventA”.

A consumer can start subscribing to events before there is any publisher, and will start to receive data once a publisher is created and publishes to that topic.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyTopic   = errors.New("topic cannot be empty")
	ErrInvalidTopic = errors.New("invalid topic")
)

Topic errors

Functions

func MatchTopic

func MatchTopic(topic, pattern string) bool

MatchTopic returns true if a given pattern matches the provided topic and returns false otherwise. The pattern may contain arbitrary wildcards '*'. It is assumed the topic is valid.

Example: ok := MatchTopic("a.b.c", "a.b.c") // ok: true ok := MatchTopic("a.b.c", "a.*.c") // ok: true ok := MatchTopic("a.b.c", "a.*.*") // ok: true ok := MatchTopic("a.b.c", "c.b.a") // ok: false ok := MatchTopic("a.b.c", "a.*") // ok: false

func ValidTopic

func ValidTopic(topic string) error

ValidTopic returns an error if a topic is invalid and nil otherwise. A valid topic consists of arbitrary-length alphanumeric characters separated by a valid deliminator: /,.,-

Types

type BaseProducer

type BaseProducer struct {
	// contains filtered or unexported fields
}

BaseProducer implements the Producer interface by implementing basic publishing capabilities on a single topic.

func (*BaseProducer) Publish

func (bp *BaseProducer) Publish(msg Message) error

Publish will attempt to publish the provided Message. It will return an error if the publisher's internal queue is full.

func (*BaseProducer) TotalSubscriptions

func (bp *BaseProducer) TotalSubscriptions() int

TotalSubscriptions returns the total number of subscriptions the producer currently has.

type BasePubSub

type BasePubSub struct {
	// contains filtered or unexported fields
}

BasePubSub implements a simple PubSub model where internally it maintains a set of BaseProducers per topic where each topic is unique and a set of BaseConsumers such that the relationship between BaseProducer and BaseConsumer is one-to-one. Each internal BaseConsumer maintains an arbitrary set of subscriptions for a given topic pattern.

func (*BasePubSub) RegisterProducer

func (bps *BasePubSub) RegisterProducer(topic string, producer Producer) error

RegisterProducer attempts to register a Producer for a given topic. A BaseConsumer is created for each Producer and starts to listen for new Messages. The Producer must be of type BaseProducer. An error will be returned if the topic is invalid or if a Producer already registered for that topic.

func (*BasePubSub) Subscribe

func (bps *BasePubSub) Subscribe(pattern string) <-chan Message

Subscribe will create and return a new subscription (read-only Message channel) for a topic pattern. It will find all matching topics (if any exist) where the subscription will receive Messages from each associated Producer. If no topics exist for that pattern, the subscription will never receive any Messages even if a matching topic is created later.

Note, there is no guarantee when the subscription will be added to any given producer so the client must not rely on timing. This allows Subscribe to not take abnormal amount of time when adding subscriptions.

type Message

type Message interface {
	fmt.Stringer
}

Message defines a type alias for an interface and represents arbitrary messages that a Publisher may publish and a Consumer may receive.

type Producer

type Producer interface {
	Publish(msg Message) error
	TotalSubscriptions() int
}

Producer defines a contract which a producer in a pubsub model must implement.

func NewBaseProducer

func NewBaseProducer(topic string, capacity uint) Producer

type PubSub

type PubSub interface {
	RegisterProducer(topic string, producer Producer) error

	// TODO: consider returning a concrete type for richer functionality
	Subscribe(topicPattern string) <-chan Message
}

PubSub defines a minimal interface for a publisher-subscriber (PubSub) model. A PubSub must be able to registry arbitrary Producers which each Producer publishes messages on a unique topic.

Clients must be able to subscribe to topics using a topic pattern where the pattern may contain wildcards. Subscriptions must return a channel from which clients can read from.

func NewBasePubSub

func NewBasePubSub() PubSub

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL