broadcast

package
v0.15.13 Latest
Published: Nov 28, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package broadcast implements a fan-out event stream with overwrite-on-full semantics. Multiple producers can call Send concurrently, and multiple consumers can Subscribe. Every consumer receives every message in order unless it has fallen so far behind that its next sequence has been overwritten; in that case the consumer receives a lag notification (LaggedError) indicating where to resume.

Notes:

  • Producers are serialised by a global mutex (b.mu), so producer-side throughput is effectively single-writer (MPMC-safe but not truly parallel on Send).
  • The ring is bounded and overwriting; backpressure to producers does not apply, except that producers can stall when wrapping to a slot currently being read.

Constants

This section is empty.

Variables

var (
	// Returned when the broadcast is closed and no more sends are possible.
	ErrClosed = errors.New("broadcast channel closed")
	// Returned when the requested sequence is ahead of the latest published (not yet available).
	ErrFutureSeq = errors.New("requested sequence is not published yet")
	// Returned when the requested sequence is 0, which indicates an uninitialized sequence in
	// this system.
	ErrInvalidSequence = errors.New("invalid sequence: 0")
	ErrNoEvent         = errors.New("no event present")
	ErrNoLag           = errors.New("no lag info present")
)

Functions

This section is empty.

Types

type Broadcast

type Broadcast[T any] struct {
	// contains filtered or unexported fields
}

Broadcast is the fan-out ring buffer. Fields:

  • buffer: ring buffer of slots.
  • mu: global mutex serialising all Send calls (MPMC-safe, single-writer).
  • tail: number of messages published so far; the next message is assigned sequence tail+1.
  • capacity: ring capacity (power of two, rounded up from the requested value).
  • mask: capacity-1 for index masking.
  • ctx: broadcast lifetime context; when canceled, Send returns ErrClosed.
  • cancel: function to cancel ctx.

func New

func New[T any](capacity uint64) *Broadcast[T]

New constructs a Broadcast with a ring of at least the given capacity.

  • The actual capacity is rounded up to the next power of two.
  • Each slot’s condition variable is initialised to use the slot’s mutex.
  • The initial tail is 0 (no messages published).
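The rounding and masking described for New can be illustrated in isolation. This is a minimal sketch, not the package's own code: roundUpPow2 and slotIndex are hypothetical helper names, and the use of math/bits is an assumption about one reasonable way to do the rounding.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 returns the smallest power of two that is >= n.
// Hypothetical helper: the package's real rounding code is not shown in its docs.
// Values above 1<<63 cannot be represented and would wrap to 0 here.
func roundUpPow2(n uint64) uint64 {
	if n <= 1 {
		return 1
	}
	return uint64(1) << uint(bits.Len64(n-1))
}

// slotIndex maps a 0-based position onto the ring using mask = capacity-1.
// This only works because capacity is a power of two.
func slotIndex(pos, capacity uint64) uint64 {
	return pos & (capacity - 1)
}

func main() {
	for _, n := range []uint64{1, 5, 8, 1000} {
		c := roundUpPow2(n)
		fmt.Printf("requested=%d capacity=%d mask=%d\n", n, c, c-1)
	}
	// Position 9 in a ring of 8 slots wraps around to index 1.
	fmt.Println(slotIndex(9, 8))
}
```

Masking with capacity-1 replaces a modulo operation, which is the usual reason for insisting on a power-of-two capacity.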

func (*Broadcast[T]) Close

func (b *Broadcast[T]) Close()

Close cancels the Broadcast context, preventing further successful Send calls. It then broadcasts on the condition variable of the slot at index (tail & mask) to wake any waiters on that slot. Subscribers only ever wait on the tail slot, so broadcasting on that single slot wakes every waiting goroutine. Close runs under the global lock to avoid racing with producers, which keeps the single-slot broadcast sufficient.
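The cancel-then-broadcast pattern can be sketched with a single slot. The gate type, waitFor, and close below are hypothetical names for illustration only; the real Broadcast has many slots and a global mutex that this sketch omits. The sketch assumes waiters re-check the context after every wake-up.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// gate is a hypothetical single-slot stand-in for the tail slot of the ring.
type gate struct {
	mu     sync.Mutex
	cond   *sync.Cond
	seq    uint64 // highest sequence published into this slot
	ctx    context.Context
	cancel context.CancelFunc
}

func newGate() *gate {
	g := &gate{}
	g.cond = sync.NewCond(&g.mu)
	g.ctx, g.cancel = context.WithCancel(context.Background())
	return g
}

// waitFor blocks until sequence want is published or the gate is closed.
// It reports whether the sequence was actually published.
func (g *gate) waitFor(want uint64) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	for g.seq < want && g.ctx.Err() == nil {
		g.cond.Wait()
	}
	return g.seq >= want
}

// close cancels the context first, then broadcasts while holding the slot
// lock so a waiter cannot miss the wake-up between its check and its Wait.
func (g *gate) close() {
	g.cancel()
	g.mu.Lock()
	g.cond.Broadcast()
	g.mu.Unlock()
}

func main() {
	g := newGate()
	var wg sync.WaitGroup
	results := make(chan bool, 3)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- g.waitFor(1)
		}()
	}
	g.close()
	wg.Wait()
	close(results)
	for published := range results {
		fmt.Println("woken, published:", published)
	}
}
```

Taking the slot lock around Broadcast is a choice of this sketch; whether the package does the same is not stated in its documentation.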

func (*Broadcast[T]) Send

func (b *Broadcast[T]) Send(msg *T) error

Send publishes msg to the ring. Behaviour:

  • Returns ErrClosed if the Broadcast context is already canceled.
  • Serialises concurrent producers via b.mu.
  • Writes msg to slot at index (tail & mask), sets slot.seq to tail+1, and broadcasts on that slot’s condition variable to wake any waiters on that slot.
  • Increments tail by 1.
  • No backpressure: Send does not wait for readers (aside from brief acquisition of the per-slot lock).
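The Send steps listed above can be sketched with a miniature ring. Everything here (ring, slot, newRing, send, errClosed) is a hypothetical stand-in written from the documented behaviour, not the package's source; in particular, the order of the closed check relative to taking the global lock is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("broadcast channel closed")

// slot holds one message; seq is 1-based and 0 means "never written".
type slot[T any] struct {
	mu   sync.Mutex
	cond *sync.Cond
	seq  uint64
	msg  *T
}

type ring[T any] struct {
	mu     sync.Mutex // serialises all producers
	buffer []slot[T]
	tail   uint64 // messages published so far
	mask   uint64 // capacity-1
	ctx    context.Context
	cancel context.CancelFunc
}

// newRing assumes capacity is already a power of two.
func newRing[T any](capacity uint64) *ring[T] {
	ctx, cancel := context.WithCancel(context.Background())
	r := &ring[T]{buffer: make([]slot[T], capacity), mask: capacity - 1, ctx: ctx, cancel: cancel}
	for i := range r.buffer {
		r.buffer[i].cond = sync.NewCond(&r.buffer[i].mu)
	}
	return r
}

func (r *ring[T]) send(msg *T) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.ctx.Err() != nil {
		return errClosed
	}
	s := &r.buffer[r.tail&r.mask]
	s.mu.Lock() // brief per-slot lock; the only point where a reader can delay a producer
	s.msg = msg
	s.seq = r.tail + 1
	s.mu.Unlock()
	s.cond.Broadcast() // wake any waiters on this slot
	r.tail++
	return nil
}

func main() {
	r := newRing[string](4)
	for _, w := range []string{"a", "b", "c", "d", "e"} {
		w := w // take the address of a per-iteration copy
		if err := r.send(&w); err != nil {
			fmt.Println("send failed:", err)
		}
	}
	first := &r.buffer[0]
	// The fifth message wrapped around and overwrote slot 0.
	fmt.Println(r.tail, first.seq, *first.msg) // 5 5 e
	r.cancel()
	fmt.Println(r.send(nil)) // broadcast channel closed
}
```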

func (*Broadcast[T]) Subscribe

func (b *Broadcast[T]) Subscribe() *Subscription[T]

Subscribe creates a new subscription that starts from the current tail (i.e., it will receive the next message published). The returned Subscription spawns an internal delivery goroutine. To stop receiving, call Unsubscribe; the out channel is closed on termination.

type EventOrLag

type EventOrLag[T any] struct {
	// contains filtered or unexported fields
}

EventOrLag is a tagged union:

  • If isLag is false, event holds a regular message.
  • If isLag is true, lag holds the lag notification.

func (*EventOrLag[T]) Event

func (e *EventOrLag[T]) Event() (T, error)

Event returns the event value or an error if none.

func (*EventOrLag[T]) IsEvent

func (e *EventOrLag[T]) IsEvent() bool

IsEvent returns true if the EventOrLag contains a regular Event.

func (*EventOrLag[T]) IsLag

func (e *EventOrLag[T]) IsLag() bool

IsLag returns true if the EventOrLag contains a lag notification.

func (*EventOrLag[T]) Lag

func (e *EventOrLag[T]) Lag() (LaggedError, error)

Lag returns the lag info or an error if none.
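The tagged-union shape and its accessors can be sketched as follows. The lowercase types eventOrLag and lagInfo, and the describe helper, are hypothetical; the sketch also assumes the accessors return errors equivalent to ErrNoEvent and ErrNoLag when the wrong variant is requested, which the documentation suggests but does not state.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errNoEvent = errors.New("no event present")
	errNoLag   = errors.New("no lag info present")
)

// lagInfo is a hypothetical stand-in for LaggedError.
type lagInfo struct {
	MissedSeq uint64
	NextSeq   uint64
}

// eventOrLag holds either a regular event or a lag notification.
type eventOrLag[T any] struct {
	event T
	lag   lagInfo
	isLag bool
}

func (e *eventOrLag[T]) IsEvent() bool { return !e.isLag }
func (e *eventOrLag[T]) IsLag() bool   { return e.isLag }

// Event returns the event, or errNoEvent if this value carries a lag.
func (e *eventOrLag[T]) Event() (T, error) {
	if e.isLag {
		var zero T
		return zero, errNoEvent
	}
	return e.event, nil
}

// Lag returns the lag info, or errNoLag if this value carries an event.
func (e *eventOrLag[T]) Lag() (lagInfo, error) {
	if !e.isLag {
		return lagInfo{}, errNoLag
	}
	return e.lag, nil
}

// describe shows the branch-on-tag pattern a consumer would use.
func describe[T any](e eventOrLag[T]) string {
	if e.IsLag() {
		l, _ := e.Lag()
		return fmt.Sprintf("lagged: missed %d, resume at %d", l.MissedSeq, l.NextSeq)
	}
	v, _ := e.Event()
	return fmt.Sprintf("event: %v", v)
}

func main() {
	fmt.Println(describe(eventOrLag[string]{event: "hi"}))
	fmt.Println(describe(eventOrLag[string]{isLag: true, lag: lagInfo{MissedSeq: 3, NextSeq: 7}}))
}
```

Checking IsLag before calling an accessor keeps the error return as a safety net rather than the normal control path.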

type LaggedError

type LaggedError struct {
	MissedSeq uint64 // The sequence the subscriber attempted to read
	NextSeq   uint64 // The oldest available sequence in the buffer (where subscriber resumes)
}

LaggedError indicates the requested sequence was overwritten by newer writes.

  • MissedSeq: the requested sequence that was lost.
  • NextSeq: the oldest sequence still available (resume point).
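How a requested sequence might be classified against the ring state can be worked through with plain arithmetic. This is a sketch under stated assumptions: sequences are 1-based, tail counts published messages, and the oldest retained sequence is tail-capacity+1 once the ring has wrapped. The classify function and the lowercase error names are hypothetical and mirror the exported ones only by analogy.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errFutureSeq       = errors.New("requested sequence is not published yet")
	errInvalidSequence = errors.New("invalid sequence: 0")
)

// laggedError is a hypothetical stand-in for LaggedError.
type laggedError struct {
	MissedSeq uint64
	NextSeq   uint64
}

func (e *laggedError) Error() string {
	return fmt.Sprintf("lagged: missed seq %d, next available %d", e.MissedSeq, e.NextSeq)
}

// classify reports whether seq can be read from a ring that has published
// tail messages into capacity slots. A nil result means seq is readable.
func classify(seq, tail, capacity uint64) error {
	if seq == 0 {
		return errInvalidSequence
	}
	if seq > tail {
		return errFutureSeq
	}
	oldest := uint64(1)
	if tail > capacity {
		oldest = tail - capacity + 1 // everything older has been overwritten
	}
	if seq < oldest {
		return &laggedError{MissedSeq: seq, NextSeq: oldest}
	}
	return nil
}

func main() {
	// With 10 messages published into 4 slots, sequences 7..10 remain.
	for _, seq := range []uint64{0, 3, 8, 12} {
		err := classify(seq, 10, 4)
		var lag *laggedError
		switch {
		case errors.As(err, &lag):
			fmt.Printf("seq %d: lagged, resume at %d\n", seq, lag.NextSeq)
		case err != nil:
			fmt.Printf("seq %d: %v\n", seq, err)
		default:
			fmt.Printf("seq %d: readable\n", seq)
		}
	}
}
```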

func (*LaggedError) Error

func (e *LaggedError) Error() string

type Subscription

type Subscription[T any] struct {
	// contains filtered or unexported fields
}

Subscription represents a single consumer. Fields:

  • bcast: back reference to the Broadcast.
  • nextSeq: next sequence number this subscriber expects to receive.
  • out: user-facing channel delivering EventOrLag values.
  • done: internal stop signal for this subscription.

Lifecycle:

  • Created by Broadcast.Subscribe, which spawns a delivery goroutine (run).
  • Call Unsubscribe to stop; it requests the goroutine to exit promptly and wakes it if blocked, then the goroutine closes out upon return.
  • If Broadcast.Close is called, subscriptions will only exit once they can observe the closed context in their run loop, after draining the channel.
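The lifecycle above (a delivery goroutine that stops on a done signal and closes out when it returns) can be sketched with channels alone. This is not the package's implementation: the sketch swaps the ring and its condition variables for a plain source channel, and the subscription type, subscribe function, and sync.Once guard are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// subscription is a hypothetical stand-in that reads from a plain channel
// instead of a ring buffer.
type subscription struct {
	out  chan int
	done chan struct{}
	once sync.Once
}

func subscribe(src <-chan int) *subscription {
	s := &subscription{out: make(chan int), done: make(chan struct{})}
	go s.run(src)
	return s
}

// run forwards values until done is closed or src ends, then closes out.
func (s *subscription) run(src <-chan int) {
	defer close(s.out)
	for {
		select {
		case <-s.done:
			return
		case v, ok := <-src:
			if !ok {
				return
			}
			select {
			case s.out <- v:
			case <-s.done: // stop even if the consumer is not reading
				return
			}
		}
	}
}

func (s *subscription) Recv() <-chan int { return s.out }

// Unsubscribe is safe to call more than once in this sketch.
func (s *subscription) Unsubscribe() { s.once.Do(func() { close(s.done) }) }

func main() {
	src := make(chan int, 2)
	src <- 1
	src <- 2
	s := subscribe(src)
	fmt.Println(<-s.Recv(), <-s.Recv()) // 1 2
	s.Unsubscribe()
	_, ok := <-s.Recv()
	fmt.Println("open:", ok) // open: false
}
```

A consumer that ranges over Recv() exits its loop naturally once the goroutine closes the channel.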

func (*Subscription[T]) Recv

func (sub *Subscription[T]) Recv() <-chan EventOrLag[T]

Recv returns the user-facing channel for this subscription.

func (*Subscription[T]) Unsubscribe

func (sub *Subscription[T]) Unsubscribe()

Unsubscribe terminates the subscriber's delivery goroutine.
