Documentation ¶
Overview ¶
Package mpsc provides a lock-free MPSC (multiple-producer, single-consumer) queue built on atomic linked-list operations.
Producers call Accumulator.Add concurrently to enqueue items. A single consumer iterates over them in insertion order via Accumulator.Seq. Accumulator.Close inserts a sentinel node that causes Seq to return; the Accumulator is then reusable for another Add/Close/Seq cycle.
Delivery is exactly-once: when the consumer breaks out of Seq early, the last yielded item is consumed and a subsequent Seq resumes from the next unconsumed item.
Types ¶
type Accumulator ¶
type Accumulator[T any] struct {
	// contains filtered or unexported fields
}
Accumulator is a lock-free MPSC (multiple-producer, single-consumer) queue that streams an unbounded sequence of elements to a consumer via Accumulator.Seq. Producers call Accumulator.Add concurrently; a single consumer iterates with Accumulator.Seq. Accumulator.Close inserts a sentinel node that causes Seq to return, after which the Accumulator can be reused for another Add/Close/Seq cycle.
Delivery is exactly-once: on early break from Seq, the last yielded item is consumed and a subsequent Seq resumes from the next unconsumed item.
func NewAccumulator ¶
func NewAccumulator[T any]() *Accumulator[T]
func (*Accumulator[T]) Add ¶
func (a *Accumulator[T]) Add(values ...T)
Add appends values to the Accumulator in the order they are provided. It may be called concurrently from multiple producers.
func (*Accumulator[T]) Close ¶
func (a *Accumulator[T]) Close() error
Close signals a running Seq() iterator to terminate by atomically swapping in a sentinel terminal node. It is safe to call Close multiple times from different goroutines. Values added after Close are not visible to the current Seq(); to consume them, call Close again and then start a new Seq().
func (*Accumulator[T]) Seq ¶
func (a *Accumulator[T]) Seq() iter.Seq[T]
Seq returns an iter.Seq[T] that yields elements in insertion order. The iterator blocks while no new nodes are available and terminates when it encounters a sentinel terminal node produced by Close().
Delivery is exactly-once: tail advances past each yielded item immediately, so on early break the last yielded item is consumed and a subsequent Seq() resumes from the next unconsumed item.
After Seq() returns, the Accumulator can be reused: call Add to enqueue more items, Close to insert a new terminal node, and Seq again to consume them.
Only one Seq() should be active at a time: concurrent iterators share the same tail position, so the resulting behavior is unpredictable.