topics

package
v0.5.0
Published: Jul 23, 2025 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Example
package main

import (
	"fmt"
	"sync"

	"github.com/PowerDNS/lightningstream/utils/topics"
)

func main() {
	t := topics.New[int]()
	// No listeners yet to receive this
	t.Publish(1)

	// But you can access the last value
	if last, ok := t.Last(); ok {
		fmt.Printf("last=%v\n", last)
	}

	// Needed for test serialization
	var wg sync.WaitGroup
	wg.Add(1)

	// Add a subscriber
	sub := t.Subscribe(false)
	go func() {
		defer wg.Done()
		ch := sub.Channel()
		for {
			m, ok := <-ch
			if !ok {
				fmt.Println("channel closed")
				return // channel closed
			}
			fmt.Printf("received=%d\n", m)
		}
	}()

	t.Publish(2)
	t.Publish(3)

	// Close subscription
	sub.Close()
	wg.Wait()

	// Never received
	t.Publish(4)

	// But you can access the last value
	if last, ok := t.Last(); ok {
		fmt.Printf("last=%v\n", last)
	}

}
Output:

last=1
received=2
received=3
channel closed
last=4

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Subscription

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

Subscription is a handle to a Topic subscription. When a publisher writes to a Topic, the write BLOCKS until every subscriber has received the message. A Subscription MUST therefore always be closed with Close() once it is no longer used.
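The blocking behaviour described above matches what a send on an unbuffered Go channel does. The following is a minimal standalone sketch of that semantics using plain channels only. The publish helper is hypothetical and is not the package's implementation.

```go
package main

import "fmt"

// publish is a hypothetical stand-in for a topic's publish step:
// it sends v to every subscriber channel in turn.
func publish[T any](subs []chan T, v T) {
	for _, ch := range subs {
		ch <- v // blocks until this subscriber receives
	}
}

func main() {
	sub := make(chan int) // unbuffered, like a default subscription
	done := make(chan struct{})

	go func() {
		publish([]chan int{sub}, 1)
		close(done) // reached only after the subscriber has received
	}()

	// Nothing has been received yet, so publish cannot have returned.
	select {
	case <-done:
		fmt.Println("publish returned early")
	default:
		fmt.Println("publish still blocked")
	}

	fmt.Println("received", <-sub)
	<-done
	fmt.Println("publish returned")
}
```

A subscriber that stops reading without closing its subscription would leave the publisher stuck on the send, which is why closing unused subscriptions matters.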

func (*Subscription[T]) Channel

func (s *Subscription[T]) Channel() <-chan T

Channel returns the channel that can be used to receive values from this subscription.

func (*Subscription[T]) Close

func (s *Subscription[T]) Close()

Close terminates this subscription. The Topic will close the channel. Close can safely be called multiple times, even from different goroutines.
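One common way to make a close operation safe to repeat from several goroutines is sync.Once. The sketch below shows that pattern in isolation. The subscription type is a hypothetical stand-in, and whether the package uses sync.Once internally is an assumption, not something this documentation states.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in, not the package's type.
type subscription struct {
	ch   chan int
	once sync.Once
}

// Close closes the channel exactly once, however often it is called.
func (s *subscription) Close() {
	s.once.Do(func() { close(s.ch) })
}

func main() {
	s := &subscription{ch: make(chan int)}

	// Call Close concurrently from several goroutines.
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Close()
		}()
	}
	wg.Wait()

	_, ok := <-s.ch
	fmt.Println("channel open:", ok) // channel open: false
}
```

Without the guard, a second close of the same channel would panic.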

func (*Subscription[T]) Next

func (s *Subscription[T]) Next(ctx context.Context) (value T, err error)

Next is a convenience method to read the next value. It blocks until the next value is available or the context is canceled. It returns an error if the context was canceled or the channel was closed.

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic is a single topic that subscribers can Subscribe() to.

func New

func New[T any]() *Topic[T]

New returns a new Topic.

func NewWithInitial

func NewWithInitial[T any](v T) *Topic[T]

NewWithInitial returns a new Topic that is pre-seeded with a last value.

func (*Topic[T]) Handle

func (t *Topic[T]) Handle(ctx context.Context, cb func(T) error) error

Handle makes it easy to consume a topic with a simple handler func. It returns only when the callback returns an error or the context is canceled.

func (*Topic[T]) Last

func (t *Topic[T]) Last() (value T, ok bool)

Last returns the last published value, if available.

func (*Topic[T]) Publish

func (t *Topic[T]) Publish(v T)

Publish publishes a new value to all subscribers.

func (*Topic[T]) Subscribe

func (t *Topic[T]) Subscribe(sendLast bool) *Subscription[T]

Subscribe creates a new Subscription. By default, the subscription's channel is unbuffered. If sendLast is set:

- The last value, if any, is sent immediately.
- The channel is buffered with size 1.
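The two channel configurations described above can be illustrated with plain channels. This is a sketch of the documented semantics only. The subscribe function and its last parameter are hypothetical and do not reflect how the package stores its last value.

```go
package main

import "fmt"

// subscribe is a hypothetical stand-in that returns a channel shaped
// like the documented ones. last is nil when nothing was published.
func subscribe[T any](last *T, sendLast bool) <-chan T {
	if !sendLast {
		return make(chan T) // unbuffered by default
	}
	ch := make(chan T, 1) // buffer of 1 holds the last value
	if last != nil {
		ch <- *last // does not block thanks to the buffer
	}
	return ch
}

func main() {
	last := 42

	// sendLast set: the last value is ready to read at once.
	ch := subscribe(&last, true)
	size := cap(ch)
	v := <-ch
	fmt.Println(size, v) // 1 42

	// sendLast unset: unbuffered, nothing pending.
	plain := subscribe(&last, false)
	fmt.Println(cap(plain), len(plain)) // 0 0

	// sendLast set but nothing published yet: buffered and empty.
	empty := subscribe[int](nil, true)
	fmt.Println(cap(empty), len(empty)) // 1 0
}
```

The buffer of size 1 is what lets the last value be delivered at subscribe time without a receiver already waiting.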
