broker

package
v2.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: GPL-3.0 Imports: 4 Imported by: 0

Documentation

Overview

Package broker provides a simple in-process notification broker for broadcasting messages to multiple consumers without blocking.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker manages notification subscriptions and broadcasts messages to all subscribers. It uses non-blocking sends to ensure that slow consumers cannot block the system.

func NewBroker

func NewBroker(ctx context.Context, source <-chan models.Notification) *Broker

NewBroker creates a new notification broker that reads from the source channel and broadcasts to all subscribers.

func (*Broker) Start

func (b *Broker) Start()

Start begins the broker's main broadcast loop in a goroutine. It reads notifications from the source channel and sends them to all subscribers using non-blocking sends. When the source channel closes or context is cancelled, it closes all subscriber channels and exits.

func (*Broker) Stop

func (b *Broker) Stop()

Stop gracefully shuts down the broker by closing all subscriber channels. This should be called during service shutdown.

func (*Broker) Subscribe

func (b *Broker) Subscribe(bufferSize int) (notifChan <-chan models.Notification, id int)

Subscribe creates a new subscription and returns a channel that will receive notifications. The bufferSize determines how many notifications can be queued before sends start blocking (and eventually dropping with warnings).

Returns the notification channel and a subscription ID that can be used for unsubscribing.

func (*Broker) Unsubscribe

func (b *Broker) Unsubscribe(id int)

Unsubscribe removes a subscription and closes its channel. It's safe to call this multiple times with the same ID.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL