Documentation ¶
Overview ¶
Package pubsub implements a simple multi-topic pub-sub library.
A topic can have any number of subscribers. All subscribers receive messages published on the topic.
Example ¶
package main

import (
	"fmt"

	"github.com/cskr/pubsub/v2"
)

const topic = "topic"

func main() {
	ps := pubsub.New[string, string](0)
	ch := ps.Sub(topic)
	go publish(ps)

	for i := 1; ; i++ {
		if i == 5 {
			// See the documentation of Unsub for why it is called in a new
			// goroutine.
			go ps.Unsub(ch, topic)
		}

		if msg, ok := <-ch; ok {
			fmt.Printf("Received %s, %d times.\n", msg, i)
		} else {
			break
		}
	}
}

func publish(ps *pubsub.PubSub[string, string]) {
	for {
		ps.Pub("message", topic)
	}
}
Index ¶
- type PubSub
- func (ps *PubSub[T, M]) AddSub(ch chan M, topics ...T)
- func (ps *PubSub[T, M]) AddSubOnceEach(ch chan M, topics ...T)
- func (ps *PubSub[T, M]) Close(topics ...T)
- func (ps *PubSub[T, M]) Pub(msg M, topics ...T)
- func (ps *PubSub[T, M]) Shutdown()
- func (ps *PubSub[T, M]) Sub(topics ...T) chan M
- func (ps *PubSub[T, M]) SubOnce(topics ...T) chan M
- func (ps *PubSub[T, M]) SubOnceEach(topics ...T) chan M
- func (ps *PubSub[T, M]) TryPub(msg M, topics ...T)
- func (ps *PubSub[T, M]) Unsub(ch chan M, topics ...T)
Types ¶
type PubSub ¶
type PubSub[T comparable, M any] struct {
	// contains filtered or unexported fields
}
PubSub is a collection of topics.
func New ¶
func New[T comparable, M any](capacity int) *PubSub[T, M]
New creates a new PubSub and starts a goroutine for handling operations. Sub and SubOnce will create channels with the given capacity.
func (*PubSub[T, M]) AddSub ¶
func (ps *PubSub[T, M]) AddSub(ch chan M, topics ...T)
AddSub adds subscriptions to an existing channel.
func (*PubSub[T, M]) AddSubOnceEach ¶
func (ps *PubSub[T, M]) AddSubOnceEach(ch chan M, topics ...T)
AddSubOnceEach adds subscriptions to an existing channel with SubOnceEach behavior.
func (*PubSub[T, M]) Close ¶
func (ps *PubSub[T, M]) Close(topics ...T)
Close closes all channels currently subscribed to the specified topics. If a channel is also subscribed to topics that are not specified, it is not closed.
func (*PubSub[T, M]) Pub ¶
func (ps *PubSub[T, M]) Pub(msg M, topics ...T)
Pub publishes the given message to all subscribers of the specified topics.
func (*PubSub[T, M]) Shutdown ¶
func (ps *PubSub[T, M]) Shutdown()
Shutdown closes all subscribed channels and terminates the goroutine.
func (*PubSub[T, M]) Sub ¶
func (ps *PubSub[T, M]) Sub(topics ...T) chan M
Sub returns a channel from which messages published on the specified topics can be received.
func (*PubSub[T, M]) SubOnce ¶
func (ps *PubSub[T, M]) SubOnce(topics ...T) chan M
SubOnce is similar to Sub, but only the first message published on any of the specified topics after subscription can be received.
func (*PubSub[T, M]) SubOnceEach ¶
func (ps *PubSub[T, M]) SubOnceEach(topics ...T) chan M
SubOnceEach returns a channel on which callers receive, at most, one message for each topic.
func (*PubSub[T, M]) TryPub ¶
func (ps *PubSub[T, M]) TryPub(msg M, topics ...T)
TryPub publishes the given message to all subscribers of the specified topics, delivering it only where there is buffer space to accept it without blocking.
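A common Go idiom for this kind of "send only if there is room" behavior is a `select` with a `default` case. The sketch below shows that general technique on a plain buffered channel. It is an assumption about how such a send can be written, not a copy of the library's code, and `trySend` is a hypothetical helper.

```go
package main

import "fmt"

// trySend is a hypothetical helper. It delivers msg only if ch can accept
// it immediately and reports whether the message was delivered.
func trySend[M any](ch chan M, msg M) bool {
	select {
	case ch <- msg:
		return true
	default:
		// The buffer is full and no receiver is waiting: drop the message.
		return false
	}
}

func main() {
	ch := make(chan string, 1) // capacity 1, like pubsub.New[...](1) would request

	fmt.Println(trySend(ch, "first"))  // true: the buffer had space
	fmt.Println(trySend(ch, "second")) // false: the buffer is full
	fmt.Println(<-ch)                  // first
}
```

With a capacity of 0, a send like this succeeds only when a receiver is already waiting on the channel.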
func (*PubSub[T, M]) Unsub ¶
func (ps *PubSub[T, M]) Unsub(ch chan M, topics ...T)
Unsub unsubscribes the given channel from the specified topics. If no topic is specified, it is unsubscribed from all topics.
Unsub must be called from a goroutine other than the subscriber's. The subscriber must keep receiving from the channel until it is closed; otherwise a deadlock can result.
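The reason for this rule can be shown with a toy model. As the documentation of New states, a single goroutine handles all operations. The sketch assumes that this goroutine processes operations one at a time and blocks while delivering a message. The `op` type and the `hub` and `consume` functions are hypothetical stand-ins, not the library's internals.

```go
package main

import "fmt"

// op is a hypothetical operation: publish msg, or unsubscribe if unsub is set.
type op struct {
	unsub bool
	msg   string
}

// hub is a toy stand-in for the operation-handling goroutine. It serves a
// single subscriber channel and processes operations strictly in order.
func hub(ops <-chan op, sub chan string) {
	subscribed := true
	for o := range ops {
		switch {
		case o.unsub && subscribed:
			close(sub)
			subscribed = false
		case !o.unsub && subscribed:
			sub <- o.msg // blocks until the subscriber receives
		}
	}
}

// consume receives until sub is closed and returns the message count.
// After limit messages it requests unsubscription from a new goroutine.
func consume(ops chan<- op, sub <-chan string, limit int) int {
	n := 0
	for range sub {
		n++
		if n == limit {
			// Sending inline could deadlock: the hub may be blocked
			// delivering to sub while this goroutine waits on ops.
			go func() { ops <- op{unsub: true} }()
		}
	}
	return n
}

func main() {
	ops := make(chan op)
	sub := make(chan string)
	go hub(ops, sub)
	go func() { // publisher
		for {
			ops <- op{msg: "message"}
		}
	}()

	n := consume(ops, sub, 5)
	fmt.Println("drained after at least 5 messages:", n >= 5)
}
```

Because the subscriber keeps receiving while the request is pending, the hub is never stuck on a delivery and eventually closes the channel, which ends the loop.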