mpmc

package
v1.15.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 27, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package mpmc provides a bounded, multiple-producer, multiple-consumer queue with optional automatic buffer growth.

Queue is safe for concurrent use by any number of senders and receivers. It uses a ring buffer whose slots are claimed with atomic operations under a shared read lock, plus channel-based signaling to block senders when the buffer is full and receivers when it is empty.


Constants

This section is empty.

Variables

var ErrInvalidCapacity = errors.New("capacity must be a power of two >= 2")

ErrInvalidCapacity indicates that a capacity value is invalid.

Functions

This section is empty.

Types

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue is a bounded, concurrency-safe FIFO buffer based on Dmitry Vyukov's MPMC algorithm. Multiple goroutines may call Send and Recv concurrently under a shared read lock, claiming positions via atomic CAS on head and tail and coordinating through per-slot sequence counters. A write lock is only acquired for buffer extension, Grow, and Close.

Send blocks when the buffer is full; Recv blocks when it is empty. When extensions are configured (see NewQueue), a Send that would otherwise block first doubles the buffer, until the configured number of extensions has been used. A negative extensions value allows unlimited growth.

After Queue.Close is called, Send returns false and any blocked goroutines are woken. Items enqueued before Close can still be drained by subsequent Recv calls; Recv returns false only once the queue is both closed and empty.
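The extension budget described above can be modelled in a few lines. This is a minimal sketch only: tryExtend is a hypothetical helper, not part of the package, and it assumes that a negative budget is never decremented.

```go
package main

import "fmt"

// tryExtend is a hypothetical helper that models the extension budget.
// remaining < 0 means unlimited growth; remaining == 0 means the budget is spent.
// It returns the new capacity, the new budget, and whether the buffer grew.
func tryExtend(capacity, remaining int) (newCap, newRemaining int, ok bool) {
	if remaining == 0 {
		return capacity, remaining, false // no extensions left: the sender must block
	}
	if remaining > 0 {
		remaining-- // a finite budget is consumed one doubling at a time
	}
	return capacity * 2, remaining, true
}

func main() {
	capacity, remaining := 4, 2
	for {
		next, left, ok := tryExtend(capacity, remaining)
		if !ok {
			fmt.Println("budget spent at capacity", capacity)
			return
		}
		fmt.Println("grew from", capacity, "to", next)
		capacity, remaining = next, left
	}
}
```

Doubling keeps the capacity a power of two, so the index mask stays valid after every extension.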

func MustQueue

func MustQueue[T any](capacity int, extensions int) *Queue[T]

MustQueue is like NewQueue but panics if the capacity is invalid.

func NewQueue

func NewQueue[T any](capacity int, extensions int) (*Queue[T], error)

NewQueue returns a Queue with the given initial capacity. The capacity must be a power of two >= 2 so that index masking can use bitwise AND. A minimum of 2 is required because the per-slot sequence counter cannot disambiguate the readable and writable states when capacity is 1. The extensions parameter limits how many times the buffer may double when full; a negative value allows unlimited growth.
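The capacity rule and the masking trick it enables can be illustrated as follows. The names validCapacity and slotIndex are hypothetical and exist only for this sketch; the package's own validation is not shown in this documentation.

```go
package main

import "fmt"

// validCapacity reports whether n is a power of two that is at least 2.
// A power of two has exactly one bit set, so n&(n-1) clears it to zero.
func validCapacity(n int) bool {
	return n >= 2 && n&(n-1) == 0
}

// slotIndex maps an ever-increasing position onto a ring of the given
// power-of-two capacity. The bitwise AND is equivalent to pos % capacity.
func slotIndex(pos uint64, capacity int) int {
	return int(pos & uint64(capacity-1))
}

func main() {
	for _, n := range []int{1, 2, 6, 8} {
		fmt.Println(n, validCapacity(n))
	}
	// Position 9 in a ring of 8 slots wraps around to slot 1.
	fmt.Println(slotIndex(9, 8))
}
```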

Each slot's sequence counter is initialized to its index, which is required by the Vyukov algorithm to distinguish writable from readable positions on the first pass through the ring.
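To make the role of the sequence counters concrete, here is a minimal non-blocking sketch of a Vyukov-style bounded queue. It is not the package's implementation: the ring type, trySend, and tryRecv are hypothetical names, and the sketch omits the read-write lock, blocking, growth, and Close.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// slot pairs a value with the sequence counter that encodes its state.
type slot[T any] struct {
	seq atomic.Uint64
	val T
}

// ring is a hypothetical, non-blocking stand-in used only for illustration.
type ring[T any] struct {
	mask  uint64
	slots []slot[T]
	head  atomic.Uint64 // next position to dequeue
	tail  atomic.Uint64 // next position to enqueue
}

// newRing assumes capacity is a power of two >= 2.
func newRing[T any](capacity int) *ring[T] {
	r := &ring[T]{mask: uint64(capacity - 1), slots: make([]slot[T], capacity)}
	for i := range r.slots {
		r.slots[i].seq.Store(uint64(i)) // seq == index: writable on the first lap
	}
	return r
}

// trySend enqueues v, or returns false if the ring is full.
func (r *ring[T]) trySend(v T) bool {
	for {
		pos := r.tail.Load()
		s := &r.slots[pos&r.mask]
		switch diff := int64(s.seq.Load()) - int64(pos); {
		case diff == 0: // slot is writable for this position
			if r.tail.CompareAndSwap(pos, pos+1) {
				s.val = v
				s.seq.Store(pos + 1) // publish: slot is now readable
				return true
			}
		case diff < 0: // slot still holds an unread item from the previous lap
			return false
		}
		// Otherwise another sender won the race; reload tail and retry.
	}
}

// tryRecv dequeues the oldest item, or returns false if the ring is empty.
func (r *ring[T]) tryRecv() (T, bool) {
	var zero T
	for {
		pos := r.head.Load()
		s := &r.slots[pos&r.mask]
		switch diff := int64(s.seq.Load()) - int64(pos+1); {
		case diff == 0: // slot is readable for this position
			if r.head.CompareAndSwap(pos, pos+1) {
				v := s.val
				s.val = zero
				s.seq.Store(pos + uint64(len(r.slots))) // writable on the next lap
				return v, true
			}
		case diff < 0: // nothing has been published here yet
			return zero, false
		}
	}
}

func main() {
	r := newRing[string](2)
	fmt.Println(r.trySend("a"), r.trySend("b"), r.trySend("c")) // the third send finds the ring full
	v, ok := r.tryRecv()
	fmt.Println(v, ok)
}
```

With a single slot, the "readable at position 0" value (seq == 1) and the "writable at position 1" value (seq == 1) coincide, which is the ambiguity that motivates the minimum capacity of 2.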

func (*Queue[T]) Capacity

func (p *Queue[T]) Capacity() int

Capacity returns the current buffer capacity. This may increase over time if extensions are configured.

func (*Queue[T]) Close

func (p *Queue[T]) Close()

Close shuts down the queue. Blocked Send and Recv calls are woken and return false. It is safe to call Close multiple times.

func (*Queue[T]) Grow

func (p *Queue[T]) Grow(n int) error

Grow increases the buffer capacity to at least n items, bypassing the extensions limit. The value of n must be a power of two; otherwise ErrInvalidCapacity is returned.

func (*Queue[T]) Recv

func (p *Queue[T]) Recv(ctx context.Context) (T, bool)

Recv dequeues and returns the next item, blocking if the buffer is empty. The boolean result is false if the queue has been closed and fully drained, or if ctx is cancelled.

func (*Queue[T]) Send

func (p *Queue[T]) Send(ctx context.Context, item T) bool

Send enqueues item, blocking if the buffer is full. It returns true on success, or false if the queue has been closed or ctx is cancelled.

On the fast path (slot available, CAS wins) concurrent senders proceed without mutual exclusion under the shared read lock. When the buffer is full, Send either doubles the buffer (if extensions remain) or parks on the full channel until a Recv frees a slot.

func (*Queue[T]) Seq

func (p *Queue[T]) Seq(ctx context.Context) iter.Seq[T]

Seq returns an iterator that yields values as they are received from the queue. Breaking out of the loop closes the queue, which also terminates any other active Seq iterators on the same queue.

func (*Queue[T]) Size

func (p *Queue[T]) Size() int

Size returns the approximate number of items currently buffered. The result may be stale by the time the caller acts on it.
