Overview ¶
Package mpsc provides a lock-free MPSC (multiple-producer, single-consumer) queue built on atomic linked-list operations.
Producers call Accumulator.Send concurrently to enqueue items. A single consumer iterates over them in insertion order via Accumulator.Seq. Accumulator.Close must be called after all producers have completed to signal the end of the stream and wake the consumer.
Types ¶
type Accumulator ¶
type Accumulator[T any] struct {
	// contains filtered or unexported fields
}
Accumulator is a lock-free MPSC (multiple-producer, single-consumer) queue that streams an unbounded set of elements to a consumer via Accumulator.Seq. Producers call Accumulator.Send concurrently; a single consumer iterates with Seq.
Accumulator.Close must be called only after all producers have completed their Send calls. It inserts a sentinel node that causes Seq to return. The Accumulator is single-use and cannot be reused after Close.
Delivery is exactly-once: each item is yielded one time only. If the consumer breaks out of Seq early, the last yielded item counts as consumed and is not yielded again.
func NewAccumulator ¶
func NewAccumulator[T any]() *Accumulator[T]
NewAccumulator returns a new, empty Accumulator ready for use.
func (*Accumulator[T]) Close ¶
func (a *Accumulator[T]) Close() error
Close signals a running Seq() iterator to terminate by atomically swapping in a sentinel terminal node and closing an internal done channel. Close must be called only after all producers have completed their Send calls.
It is safe to call Close multiple times from different goroutines; only the first call has any effect.
func (*Accumulator[T]) Recv ¶ added in v1.14.0
func (a *Accumulator[T]) Recv(ctx context.Context) (T, bool)
Recv returns the next value from the queue, blocking until one is available. The boolean result is false when the Accumulator has been closed and drained, or when ctx is cancelled.
func (*Accumulator[T]) Send ¶ added in v1.14.0
func (a *Accumulator[T]) Send(value T) bool
Send enqueues value on the Accumulator. It is safe to call Send concurrently. After Close has been called, Send always returns false.
func (*Accumulator[T]) Seq ¶
func (a *Accumulator[T]) Seq(ctx context.Context) iter.Seq[T]
Seq returns an iter.Seq[T] that yields elements in insertion order. The iterator blocks until new nodes appear and terminates when it encounters the sentinel terminal node inserted by Close.
Delivery is exactly-once: tail advances past each yielded item immediately, so on early break the last yielded item is consumed.
Only one Seq iterator should be active at a time. Concurrent iterators share the same tail position, so running more than one at once leads to unpredictable behavior.