Documentation ¶
Overview ¶
Package mpmc provides a bounded, multiple-producer, multiple-consumer queue with optional automatic buffer growth.
Queue is safe for concurrent use by any number of senders and receivers. It uses a ring buffer guarded by a read-write lock, with channel-based signaling that blocks senders when the buffer is full and receivers when it is empty.
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidCapacity = errors.New("capacity must be a power of two >= 2")
ErrInvalidCapacity indicates that a capacity value is invalid.
Functions ¶
This section is empty.
Types ¶
type Queue ¶
type Queue[T any] struct {
	// contains filtered or unexported fields
}
Queue is a bounded, concurrency-safe FIFO buffer based on Dmitry Vyukov's MPMC algorithm. Multiple goroutines may call Send and Recv concurrently under a shared read lock, claiming positions via atomic CAS on head and tail and coordinating through per-slot sequence counters. A write lock is only acquired for buffer extension, Grow, and Close.
Send blocks when the buffer is full; Recv blocks when it is empty. When extensions are configured (see NewQueue), the buffer doubles automatically on a blocking Send, up to the configured limit. A negative extensions value allows unlimited growth.
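The extension rule above can be sketched as a small decision function. This is an illustration only: nextCapacity and its parameters are hypothetical names, not part of the package, and the real implementation may track extensions differently.

```go
package main

import "fmt"

// nextCapacity is a hypothetical helper. Given the current capacity, the
// number of extensions already used, and the configured limit, it reports
// the capacity after one more doubling and whether doubling is allowed.
// A negative limit means unlimited growth, as documented for NewQueue.
func nextCapacity(capacity, used, limit int) (int, bool) {
	if limit >= 0 && used >= limit {
		return capacity, false // limit reached: the sender would block instead
	}
	return capacity * 2, true
}

func main() {
	capacity, used := 4, 0
	for {
		next, ok := nextCapacity(capacity, used, 2)
		if !ok {
			break
		}
		capacity, used = next, used+1
		fmt.Println("extended to", capacity)
	}
}
```

With an initial capacity of 4 and a limit of 2, the loop doubles twice and then stops.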
After Queue.Close is called, Send returns false and any blocked goroutines are woken. Items enqueued before Close can still be drained by subsequent Recv calls; Recv returns false only once the buffer is empty.
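The drain-after-close behavior resembles that of a closed buffered Go channel. The sketch below uses a plain channel as an analogy, not the Queue type itself; drain is a hypothetical helper.

```go
package main

import "fmt"

// drain receives until the channel reports that it is closed and empty.
// Items buffered before close are still delivered, in FIFO order.
func drain(ch <-chan int) []int {
	var out []int
	for {
		v, ok := <-ch
		if !ok {
			return out // closed and fully drained
		}
		out = append(out, v)
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	close(ch) // no further sends are allowed, but buffered items remain

	fmt.Println(drain(ch)) // prints [1 2]
}
```

One difference from the analogy: sending on a closed channel panics, whereas Send on a closed Queue is documented to return false.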
func NewQueue ¶
NewQueue returns a Queue with the given initial capacity. The capacity must be a power of two >= 2 so that index masking can use bitwise AND. A minimum of 2 is required because the per-slot sequence counter cannot disambiguate the readable and writable states when capacity is 1. The extensions parameter limits how many times the buffer may double when full; a negative value allows unlimited growth.
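A minimal sketch of the capacity check and the index masking it enables. The names validateCapacity and slotIndex are hypothetical and only illustrate the rule stated above; the package's internal helpers may look different.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidCapacity mirrors the message of the documented ErrInvalidCapacity.
var errInvalidCapacity = errors.New("capacity must be a power of two >= 2")

// validateCapacity accepts only powers of two that are at least 2.
// A power of two has exactly one bit set, so n&(n-1) clears it to zero.
func validateCapacity(n int) error {
	if n < 2 || n&(n-1) != 0 {
		return errInvalidCapacity
	}
	return nil
}

// slotIndex maps an ever-increasing position onto a ring whose capacity is
// a power of two. The bitwise AND is equivalent to pos % capacity.
func slotIndex(pos, capacity uint64) uint64 {
	return pos & (capacity - 1)
}

func main() {
	for _, n := range []int{1, 2, 3, 8} {
		fmt.Println(n, validateCapacity(n) == nil)
	}
	fmt.Println(slotIndex(10, 8)) // prints 2, the same as 10 % 8
}
```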
Each slot's sequence counter is initialized to its index, which is required by the Vyukov algorithm to distinguish writable from readable positions on the first pass through the ring.
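The sequence-counter scheme can be sketched as a non-blocking ring. This is a simplified rendering of Vyukov's bounded MPMC queue under stated assumptions: it omits the locking, blocking, growth, and Close logic described for Queue, and the names ring, slot, tryEnqueue, and tryDequeue are hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type slot[T any] struct {
	seq atomic.Uint64 // equals pos when writable, pos+1 when readable
	val T
}

type ring[T any] struct {
	mask  uint64
	slots []slot[T]
	head  atomic.Uint64 // next position to dequeue
	tail  atomic.Uint64 // next position to enqueue
}

// newRing assumes capacity is a power of two >= 2.
func newRing[T any](capacity uint64) *ring[T] {
	r := &ring[T]{mask: capacity - 1, slots: make([]slot[T], capacity)}
	for i := range r.slots {
		r.slots[i].seq.Store(uint64(i)) // slot i starts writable for position i
	}
	return r
}

func (r *ring[T]) tryEnqueue(v T) bool {
	for {
		pos := r.tail.Load()
		s := &r.slots[pos&r.mask]
		diff := int64(s.seq.Load()) - int64(pos)
		switch {
		case diff == 0: // slot is writable for this position
			if r.tail.CompareAndSwap(pos, pos+1) {
				s.val = v
				s.seq.Store(pos + 1) // publish: now readable
				return true
			}
		case diff < 0: // slot still holds an unread item: ring is full
			return false
		}
		// Otherwise another producer claimed pos first; reload and retry.
	}
}

func (r *ring[T]) tryDequeue() (T, bool) {
	var zero T
	for {
		pos := r.head.Load()
		s := &r.slots[pos&r.mask]
		diff := int64(s.seq.Load()) - int64(pos+1)
		switch {
		case diff == 0: // slot is readable for this position
			if r.head.CompareAndSwap(pos, pos+1) {
				v := s.val
				s.val = zero
				s.seq.Store(pos + r.mask + 1) // writable again on the next lap
				return v, true
			}
		case diff < 0: // nothing published here yet: ring is empty
			return zero, false
		}
	}
}

func main() {
	r := newRing[int](2)
	fmt.Println(r.tryEnqueue(1), r.tryEnqueue(2), r.tryEnqueue(3)) // true true false
	v, ok := r.tryDequeue()
	fmt.Println(v, ok) // 1 true
}
```

With a capacity of 1 the mask is 0, so after the first enqueue the single slot's counter equals both "readable for position 0" and "writable for position 1", which is the ambiguity that motivates the minimum of 2.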
func (*Queue[T]) Capacity ¶
Capacity returns the current buffer capacity. This may increase over time if extensions are configured.
func (*Queue[T]) Close ¶
func (p *Queue[T]) Close()
Close shuts down the queue. Blocked Send and Recv calls are woken and return false. It is safe to call Close multiple times.
func (*Queue[T]) Grow ¶
Grow increases the buffer capacity to at least n items, bypassing the extensions limit. The value of n must be a power of two; otherwise ErrInvalidCapacity is returned.
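One way to picture a grow operation on a ring buffer is to copy the live items, in FIFO order, into a larger backing array. The sketch below uses a plain single-threaded ring with hypothetical fields; it ignores the sequence counters and locking that Queue uses, and it assumes a request no larger than the current capacity is a no-op.

```go
package main

import (
	"errors"
	"fmt"
)

var errInvalidCapacity = errors.New("capacity must be a power of two >= 2")

// ring is a simplified, non-concurrent ring buffer used only for illustration.
type ring struct {
	buf   []int
	head  int // index of the oldest item
	count int // number of stored items
}

// grow reallocates the buffer to n slots, unwrapping items so that the
// oldest one lands at index 0.
func (r *ring) grow(n int) error {
	if n <= 0 || n&(n-1) != 0 {
		return errInvalidCapacity
	}
	if n <= len(r.buf) {
		return nil // assumption: already large enough, nothing to do
	}
	next := make([]int, n)
	mask := len(r.buf) - 1
	for i := 0; i < r.count; i++ {
		next[i] = r.buf[(r.head+i)&mask]
	}
	r.buf, r.head = next, 0
	return nil
}

func main() {
	// Items 10, 20, 30 wrap around the end of a 4-slot buffer.
	r := &ring{buf: []int{20, 30, 0, 10}, head: 3, count: 3}
	if err := r.grow(8); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(r.buf), r.buf[:r.count]) // prints 8 [10 20 30]
	fmt.Println(r.grow(6) != nil)            // prints true: 6 is not a power of two
}
```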
func (*Queue[T]) Recv ¶
Recv dequeues and returns the next item, blocking if the buffer is empty. It returns false if the queue has been closed and fully drained, or if ctx is cancelled.
func (*Queue[T]) Send ¶
Send enqueues item, blocking if the buffer is full. It returns true on success, or false if the queue has been closed or ctx is cancelled.
On the fast path (slot available, CAS wins) concurrent senders proceed without mutual exclusion under the shared read lock. When the buffer is full, Send either doubles the buffer (if extensions remain) or parks on the full channel until a Recv frees a slot.