mpsc

package
v1.13.1
Published: Mar 24, 2026 License: Apache-2.0 Imports: 2 Imported by: 0

Documentation

Overview

Package mpsc provides a lock-free MPSC (multiple-producer, single-consumer) queue built on atomic linked-list operations.

Producers call Accumulator.Add concurrently to enqueue items. A single consumer iterates over them in insertion order via Accumulator.Seq. Accumulator.Close inserts a sentinel node that causes Seq to return; the Accumulator is then reusable for another Add/Close/Seq cycle.

Delivery is exactly-once: when the consumer breaks out of Seq early, the last yielded item is consumed and a subsequent Seq resumes from the next unconsumed item.


Types

type Accumulator

type Accumulator[T any] struct {
	// contains filtered or unexported fields
}

Accumulator is a lock-free MPSC (multiple-producer, single-consumer) queue that streams an unbounded set of elements to a consumer via Accumulator.Seq. Producers call Accumulator.Add concurrently; a single consumer iterates with Seq. Accumulator.Close inserts a sentinel node that causes Seq to return, after which the Accumulator can be reused for another Add/Close/Seq cycle.

Delivery is exactly-once: on early break from Seq, the last yielded item is consumed and a subsequent Seq resumes from the next unconsumed item.

func NewAccumulator

func NewAccumulator[T any]() *Accumulator[T]

NewAccumulator returns a new, empty Accumulator[T].

func (*Accumulator[T]) Add

func (a *Accumulator[T]) Add(values ...T)

Add enqueues values in the Accumulator[T] in the order they were provided. It may be called concurrently from multiple producer goroutines.

func (*Accumulator[T]) Close

func (a *Accumulator[T]) Close() error

Close signals a running Seq() iterator to terminate by atomically swapping in a sentinel terminal node. It is safe to call Close multiple times from different goroutines. Values added after Close are not visible to the current Seq() but can be consumed by calling Close again followed by a new Seq().

func (*Accumulator[T]) Seq

func (a *Accumulator[T]) Seq() iter.Seq[T]

Seq returns an iter.Seq[T] that yields elements in insertion order, blocking until new nodes appear and terminating when it encounters a sentinel terminal node produced by Close().

Delivery is exactly-once: tail advances past each yielded item immediately, so on early break the last yielded item is consumed and a subsequent Seq() resumes from the next unconsumed item.

After Seq() returns, the Accumulator can be reused: call Add to enqueue more items, Close to insert a new terminal node, and Seq again to consume them.

Only one Seq() should be active at a time. Concurrent iterators share the same tail position, so their behavior is unpredictable.
