Documentation
Overview ¶
Package pubsub provides a publisher-subscriber (PubSub) model in which multiple producers publish to multiple consumers. A consumer subscribes by specifying a topic pattern (wildcards are supported) that selects the topics it wants to receive messages from. A producer can only write to a single, specific topic, so the relationship from a topic to its consumers is one-to-many.
For example, two consumers where one subscribes to “cosmosA-events-eventA” and another subscribes to “cosmosA-events-*” will both receive the message that was published by producers writing to “cosmosA-events-eventA”.
A consumer can start subscribing to events before there is any publisher, and will start to receive data once a publisher is created and publishes to that topic.
Constants ¶
This section is empty.
Variables ¶
var (
	ErrEmptyTopic   = errors.New("topic cannot be empty")
	ErrInvalidTopic = errors.New("invalid topic")
)
Topic errors
Functions ¶
func MatchTopic ¶
MatchTopic returns true if a given pattern matches the provided topic and returns false otherwise. The pattern may contain arbitrary wildcards '*'. It is assumed the topic is valid.
Example:
	ok := MatchTopic("a.b.c", "a.b.c") // ok: true
	ok = MatchTopic("a.b.c", "a.*.c")  // ok: true
	ok = MatchTopic("a.b.c", "a.*.*")  // ok: true
	ok = MatchTopic("a.b.c", "c.b.a")  // ok: false
	ok = MatchTopic("a.b.c", "a.*")    // ok: false
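The examples above are consistent with segment-by-segment matching, where each '*' stands for exactly one segment. The following is a minimal sketch of that idea, not the package's implementation: `matchTopic` and `isDelim` are hypothetical names, and the sketch assumes that the three documented delimiters are interchangeable when splitting.

```go
package main

import (
	"fmt"
	"strings"
)

// isDelim reports whether r is one of the delimiters named in the ValidTopic
// documentation. Treating them as interchangeable is an assumption.
func isDelim(r rune) bool {
	return r == '/' || r == '.' || r == '-'
}

// matchTopic is a hypothetical stand-in for MatchTopic. It splits both
// arguments into segments and requires each pattern segment to be either "*"
// or identical to the topic segment at the same position.
func matchTopic(topic, pattern string) bool {
	topicParts := strings.FieldsFunc(topic, isDelim)
	patternParts := strings.FieldsFunc(pattern, isDelim)
	if len(topicParts) != len(patternParts) {
		return false
	}
	for i, p := range patternParts {
		if p != "*" && p != topicParts[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchTopic("a.b.c", "a.*.c"))                            // true
	fmt.Println(matchTopic("a.b.c", "a.*"))                              // false
	fmt.Println(matchTopic("cosmosA-events-eventA", "cosmosA-events-*")) // true
}
```

Comparing segment counts first is what makes "a.*" fail against "a.b.c": under this reading a wildcard never spans more than one segment.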
func ValidTopic ¶
ValidTopic returns an error if a topic is invalid and nil otherwise. A valid topic consists of alphanumeric segments of arbitrary length separated by one of the valid delimiters: '/', '.', or '-'.
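One way to express this rule is a regular expression. The sketch below is an illustration under stated assumptions rather than the package's code: `validTopic`, `topicRe`, and the lowercase error variables are hypothetical, "alphanumeric" is taken to mean ASCII letters and digits, and empty segments (such as "a..b") are assumed to be invalid.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// Local copies of the documented error values, so the sketch is self-contained.
var (
	errEmptyTopic   = errors.New("topic cannot be empty")
	errInvalidTopic = errors.New("invalid topic")
)

// topicRe accepts one or more ASCII alphanumeric segments separated by a
// single '/', '.', or '-'. This pattern is an assumption about the rule.
var topicRe = regexp.MustCompile(`^[A-Za-z0-9]+([/.-][A-Za-z0-9]+)*$`)

// validTopic is a hypothetical stand-in for ValidTopic.
func validTopic(topic string) error {
	if topic == "" {
		return errEmptyTopic
	}
	if !topicRe.MatchString(topic) {
		return errInvalidTopic
	}
	return nil
}

func main() {
	for _, topic := range []string{"cosmosA-events-eventA", "a/b.c", "", "a..b", "a.*"} {
		fmt.Printf("%q -> %v\n", topic, validTopic(topic))
	}
}
```

Note that "a.*" is rejected here: in this sketch wildcards belong to subscription patterns, not to topics.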
Types ¶
type BaseProducer ¶
type BaseProducer struct {
// contains filtered or unexported fields
}
BaseProducer implements the Producer interface by implementing basic publishing capabilities on a single topic.
func (*BaseProducer) Publish ¶
func (bp *BaseProducer) Publish(msg Message) error
Publish will attempt to publish the provided Message. It will return an error if the publisher's internal queue is full.
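A non-blocking publish of this kind is commonly built on a buffered channel and a `select` with a `default` case. The sketch below shows that technique under assumptions: `queueProducer`, `newQueueProducer`, `errQueueFull`, and the capacity parameter are hypothetical, and the source does not state how BaseProducer's queue is actually implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// message mirrors the documented Message type (an alias for an interface).
type message interface{}

// errQueueFull is a hypothetical error; the real error value is not documented.
var errQueueFull = errors.New("producer queue is full")

// queueProducer is a hypothetical producer backed by a bounded queue.
type queueProducer struct {
	queue chan message
}

func newQueueProducer(capacity int) *queueProducer {
	return &queueProducer{queue: make(chan message, capacity)}
}

// publish enqueues msg without blocking and reports an error when the
// buffered channel has no free slot.
func (p *queueProducer) publish(msg message) error {
	select {
	case p.queue <- msg:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	p := newQueueProducer(1)
	fmt.Println(p.publish("first"))  // <nil>
	fmt.Println(p.publish("second")) // producer queue is full
}
```

Returning an error instead of blocking keeps a slow consumer from stalling the publisher; the caller decides whether to retry or drop the message.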
func (*BaseProducer) TotalSubscriptions ¶
func (bp *BaseProducer) TotalSubscriptions() int
TotalSubscriptions returns the total number of subscriptions the producer currently has.
type BasePubSub ¶
type BasePubSub struct {
// contains filtered or unexported fields
}
BasePubSub implements a simple PubSub model. Internally it maintains one BaseProducer per unique topic and one BaseConsumer per BaseProducer, so the relationship between BaseProducer and BaseConsumer is one-to-one. Each internal BaseConsumer maintains an arbitrary set of subscriptions whose topic patterns match its topic.
func (*BasePubSub) RegisterProducer ¶
func (bps *BasePubSub) RegisterProducer(topic string, producer Producer) error
RegisterProducer attempts to register a Producer for a given topic. A BaseConsumer is created for each Producer and starts to listen for new Messages. The Producer must be of type BaseProducer. An error is returned if the topic is invalid or if a Producer is already registered for that topic.
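The registration flow described here (reject duplicates, then start a per-producer consumer) can be sketched as follows. Everything in the block is hypothetical: `registry`, `register`, `wait`, and the `deliver` callback are illustration-only names, topic validation is reduced to an empty-string check, and the real BasePubSub may be organized differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type message interface{}

var (
	errEmptyTopic        = errors.New("topic cannot be empty")
	errAlreadyRegistered = errors.New("producer already registered for topic") // hypothetical
)

// registry is a hypothetical stand-in that tracks one producer per topic.
type registry struct {
	mu     sync.Mutex
	topics map[string]bool
	wg     sync.WaitGroup
}

func newRegistry() *registry {
	return &registry{topics: make(map[string]bool)}
}

// register records the topic and starts one consumer goroutine that drains
// the producer's queue until it is closed, passing each message to deliver.
func (r *registry) register(topic string, queue <-chan message, deliver func(message)) error {
	if topic == "" {
		return errEmptyTopic
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.topics[topic] {
		return errAlreadyRegistered
	}
	r.topics[topic] = true
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		for msg := range queue {
			deliver(msg)
		}
	}()
	return nil
}

// wait blocks until every consumer goroutine has exited.
func (r *registry) wait() { r.wg.Wait() }

func main() {
	r := newRegistry()
	queue := make(chan message, 2)
	var got []message
	fmt.Println(r.register("a.b", queue, func(m message) { got = append(got, m) }))
	fmt.Println(r.register("a.b", queue, func(message) {})) // duplicate topic
	queue <- "hello"
	close(queue)
	r.wait()
	fmt.Println(got)
}
```

Holding the mutex across the duplicate check and the map write keeps two concurrent registrations for the same topic from both succeeding.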
func (*BasePubSub) Subscribe ¶
func (bps *BasePubSub) Subscribe(pattern string) <-chan Message
Subscribe creates and returns a new subscription (a read-only Message channel) for a topic pattern. It finds all existing topics that match the pattern, and the subscription receives Messages from the Producer associated with each of them. If no topics exist for that pattern, the subscription will never receive any Messages even if a matching topic is created later.
Note that there is no guarantee about when the subscription will be added to any given producer, so the client must not rely on timing. This keeps Subscribe from taking an abnormal amount of time when adding subscriptions.
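The lookup step in Subscribe (attach one channel to every existing topic that matches the pattern) can be sketched as below. This is a simplified, single-goroutine illustration: `hub`, `addTopic`, `subscribe`, and `publish` are hypothetical names, the subscription is attached synchronously rather than at an unspecified later time, the channel buffer size of 8 is arbitrary, and the matcher passed in `main` is a placeholder prefix matcher, not MatchTopic.

```go
package main

import (
	"fmt"
	"strings"
)

type message interface{}

// hub is a hypothetical stand-in for a PubSub. It maps each topic to the
// subscription channels attached to it.
type hub struct {
	match  func(topic, pattern string) bool
	topics map[string][]chan message
}

func newHub(match func(topic, pattern string) bool) *hub {
	return &hub{match: match, topics: make(map[string][]chan message)}
}

// addTopic records a topic that has no subscriptions yet.
func (h *hub) addTopic(topic string) {
	if _, ok := h.topics[topic]; !ok {
		h.topics[topic] = nil
	}
}

// subscribe attaches one new channel to every topic that exists right now
// and matches pattern. Topics added later are not considered.
func (h *hub) subscribe(pattern string) <-chan message {
	sub := make(chan message, 8)
	for topic := range h.topics {
		if h.match(topic, pattern) {
			h.topics[topic] = append(h.topics[topic], sub)
		}
	}
	return sub
}

// publish delivers msg to each subscription on topic, dropping the message
// for any subscription whose buffer is full.
func (h *hub) publish(topic string, msg message) {
	for _, sub := range h.topics[topic] {
		select {
		case sub <- msg:
		default:
		}
	}
}

func main() {
	// Placeholder matcher: a trailing "*" matches any suffix.
	match := func(topic, pattern string) bool {
		if strings.HasSuffix(pattern, "*") {
			return strings.HasPrefix(topic, strings.TrimSuffix(pattern, "*"))
		}
		return topic == pattern
	}
	h := newHub(match)
	h.addTopic("cosmosA-events-eventA")
	exact := h.subscribe("cosmosA-events-eventA")
	wild := h.subscribe("cosmosA-events-*")
	h.publish("cosmosA-events-eventA", "hello")
	fmt.Println(<-exact, <-wild) // hello hello
}
```

Because a single channel is shared across all matching topics, a wildcard subscriber reads messages from several producers through one receive loop.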
type Message ¶
Message defines a type alias for an interface and represents arbitrary messages that a Producer may publish and a Consumer may receive.
type Producer ¶
Producer defines a contract which a producer in a pubsub model must implement.
func NewBaseProducer ¶
type PubSub ¶
type PubSub interface {
	RegisterProducer(topic string, producer Producer) error
	// TODO: consider returning a concrete type for richer functionality
	Subscribe(topicPattern string) <-chan Message
}
PubSub defines a minimal interface for a publisher-subscriber (PubSub) model. A PubSub must be able to register arbitrary Producers, each of which publishes messages on a unique topic.
Clients must be able to subscribe to topics using a topic pattern, which may contain wildcards. Each subscription must return a channel from which clients can read.
func NewBasePubSub ¶
func NewBasePubSub() PubSub