Documentation
¶
Overview ¶
Package broadcast implements a fan-out event stream with overwrite-on-full semantics. Multiple producers can call Send concurrently and multiple consumers can Subscribe. Every consumer receives every message in order unless it has fallen behind far enough that its next sequence has been overwritten; in that case the consumer receives a lag notification (LaggedError) indicating where to resume. Notes:
- Producers are serialised by a global mutex (b.mu), so producer-side throughput is effectively single-writer (MPMC-safe but not truly parallel on Send).
- The ring is bounded and overwriting; backpressure to producers does not apply, except that producers can stall when wrapping to a slot currently being read.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // Returned when the broadcast is closed and no more sends are possible. ErrClosed = errors.New("broadcast channel closed") // Returned when the requested sequence is ahead of the latest published (not yet available). ErrFutureSeq = errors.New("requested sequence is not published yet") // Returned when the requested sequence is 0. Which is indicates unitialized sequence in this // system. ErrInvalidSequence = errors.New("invalid sequence: 0") ErrNoEvent = errors.New("no event present") ErrNoLag = errors.New("no lag info present") )
Functions ¶
This section is empty.
Types ¶
type Broadcast ¶
type Broadcast[T any] struct { // contains filtered or unexported fields }
Broadcast is the fan-out ring buffer. Fields: - buffer: ring buffer of slots. - mu: global mutex serialising all Send calls (MPMC-safe, single-writer). - tail: next sequence to assign (number of messages published so far). - capacity: ring capacity (power of two, rounded up from the requested value). - mask: capacity-1 for index masking. - ctx: broadcast lifetime context; when canceled, Send returns ErrClosed. - cancel: function to cancel ctx
func New ¶
New constructs a Broadcast with a ring of at least the given capacity. - The actual capacity is rounded up to the next power of two. - Each slot’s condition variable is initialised to use the slot’s mutex. - The initial tail is 0 (no messages published).
func (*Broadcast[T]) Close ¶
func (b *Broadcast[T]) Close()
Close cancels the Broadcast context (preventing further successful Send calls). It then broadcasts on the cond of the slot at index (tail & mask) to wake any waiters on that slot. Subscribers only waits for tail thus only broadcasting to tail slot should wake all waiting goroutines. Closes under global lock to avoid race with producers in order to preserve the integrity of single slot broadcast closing.
func (*Broadcast[T]) Send ¶
Send publishes msg to the ring. Behaviour:
- Returns ErrClosed if the Broadcast context is already canceled.
- Serialises concurrent producers via b.mu.
- Writes msg to slot at index (tail & mask), sets slot.seq to tail+1, and broadcasts on that slot’s condition variable to wake any waiters on that slot.
- Increments tail by 1.
- No backpressure: Send does not wait for readers (aside from brief acquisition of the per-slot lock).
func (*Broadcast[T]) Subscribe ¶
func (b *Broadcast[T]) Subscribe() *Subscription[T]
Subscribe creates a new subscription that starts from the current tail (i.e., it will receive the next message published). The returned Subscription spawns an internal delivery goroutine. To stop receiving, call Unsubscribe; the out channel is closed on termination.
type EventOrLag ¶
type EventOrLag[T any] struct { // contains filtered or unexported fields }
EventOrLag is a tagged union:
- If isLag is false, event holds a regular message.
- If isLag is true, lag holds the lag notification.
func (*EventOrLag[T]) Event ¶
func (e *EventOrLag[T]) Event() (T, error)
Event returns the event value or an error if none.
func (*EventOrLag[T]) IsEvent ¶
func (e *EventOrLag[T]) IsEvent() bool
IsEvent returns true if the EventOrLag contains a regular Event.
func (*EventOrLag[T]) IsLag ¶
func (e *EventOrLag[T]) IsLag() bool
IsLag returns true if the EventOrLag contains a lag notification.
func (*EventOrLag[T]) Lag ¶
func (e *EventOrLag[T]) Lag() (LaggedError, error)
Lag returns the lag info or an error if none.
type LaggedError ¶
type LaggedError struct {
MissedSeq uint64 // The sequence the subscriber attempted to read
NextSeq uint64 // The oldest available sequence in the buffer (where subscriber resumes)
}
LaggedError indicates the requested sequence was overwritten by newer writes. - MissedSeq: the requested sequence that was lost. - NextSeq: the oldest sequence still available (resume point).
func (*LaggedError) Error ¶
func (e *LaggedError) Error() string
type Subscription ¶
type Subscription[T any] struct { // contains filtered or unexported fields }
Subscription represents a single consumer. Fields: - bcast: back reference to broadcast - nextSeq: next sequence number this subscriber expects to receive. - out: user-facing channel delivering EventOrLag values. - done: internal stop signal for this subscription.
Lifecycle:
- Created by Broadcast.Subscribe, which spawns a delivery goroutine (run).
- Call Unsubscribe to stop; it requests the goroutine to exit promptly and wakes it if blocked, then the goroutine closes out upon return.
- If Broadcast.Close is called, subscriptions will only exit once they can observe the closed context in their run loop, after draining the channel.
func (*Subscription[T]) Recv ¶
func (sub *Subscription[T]) Recv() <-chan EventOrLag[T]
Recv returns the user-facing channel for this subscription.
func (*Subscription[T]) Unsubscribe ¶
func (sub *Subscription[T]) Unsubscribe()
Unsubscribe terminates the subscribers delivery goroutine.