Documentation
¶
Overview ¶
Example ¶
package main
import (
"fmt"
"sync"
"github.com/PowerDNS/lightningstream/utils/topics"
)
func main() {
t := topics.New[int]()
// No listeners yet to receive this
t.Publish(1)
// But you can access the last value
if last, ok := t.Last(); ok {
fmt.Printf("last=%v\n", last)
}
// Needed for test serialization
var wg sync.WaitGroup
wg.Add(1)
// Add a subscriber
sub := t.Subscribe(false)
go func() {
defer wg.Done()
ch := sub.Channel()
for {
m, ok := <-ch
if !ok {
fmt.Println("channel closed")
return // channel closed
}
fmt.Printf("received=%d\n", m)
}
}()
t.Publish(2)
t.Publish(3)
// Close subscription
sub.Close()
wg.Wait()
// Never received
t.Publish(4)
// But you can access the last value
if last, ok := t.Last(); ok {
fmt.Printf("last=%v\n", last)
}
}
Output: last=1 received=2 received=3 channel closed last=4
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Subscription ¶
type Subscription[T any] struct { // contains filtered or unexported fields }
Subscription is the reference to a Topic subscription. When the publisher writes to a Topic, it will BLOCK until all subscribers have received the message. Subscription MUST always be closed with Close() when no longer used.
func (*Subscription[T]) Channel ¶
func (s *Subscription[T]) Channel() <-chan T
Channel returns the chan that can be used to receive values from this subscription.
func (*Subscription[T]) Close ¶
func (s *Subscription[T]) Close()
Close terminates this subscription. The Topic will close the channel. Close can safely be called multiple times, even from different goroutines.
type Topic ¶
type Topic[T any] struct { // contains filtered or unexported fields }
Topic is a single topic that subscribers can Subscribe() to
func NewWithInitial ¶
NewWithInitial returns a new Topic that is pre-seeded with a last value.
func (*Topic[T]) Handle ¶
Handle makes it easy to consume a topic with a simple handler func. This function only returns when the callback returns an error or the context is canceled.
func (*Topic[T]) Publish ¶
func (t *Topic[T]) Publish(v T)
Publish publishes a new value to all subscribers
func (*Topic[T]) Subscribe ¶
func (t *Topic[T]) Subscribe(sendLast bool) *Subscription[T]
Subscribe creates a new Subscription. By default, this is an unbuffered channel. If sendLast is set: - We will immediately send the last value, if any. - The channel will be a buffered one with size 1.