lfq

v0.1.3
Published: Feb 19, 2026 License: MIT Imports: 5 Imported by: 0

README

lfq

Lock-free and wait-free FIFO queue implementations for Go.

Overview

Package lfq provides bounded FIFO queues optimized for different producer/consumer patterns. Each variant uses an algorithm suited to its access pattern.

// Direct constructor (recommended for most cases)
q := lfq.NewSPSC[Event](1024)

// Builder API - auto-selects algorithm based on constraints
q := lfq.Build[Event](lfq.New(1024).SingleProducer().SingleConsumer())  // → SPSC
q := lfq.Build[Event](lfq.New(1024).SingleConsumer())                   // → MPSC
q := lfq.Build[Event](lfq.New(1024).SingleProducer())                   // → SPMC
q := lfq.Build[Event](lfq.New(1024))                                    // → MPMC

Installation

go get code.hybscloud.com/lfq

Requirements: Go 1.26+

Compiler Requirement

For better performance, compile with the intrinsics-optimized Go compiler:

# Using Makefile (recommended)
make install-compiler   # Download pre-built release (~30 seconds)
make build              # Build with intrinsics compiler
make test               # Test with intrinsics compiler

# Or build compiler from source (bleeding-edge)
make install-compiler-source

Manual installation:

# Pre-built release (recommended)
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
ARCH=$(uname -m | sed 's/x86_64/amd64/;s/aarch64/arm64/')
URL=$(curl -fsSL https://api.github.com/repos/hayabusa-cloud/go/releases/latest | grep "browser_download_url.*${OS}-${ARCH}\.tar\.gz\"" | cut -d'"' -f4)
curl -fsSL "$URL" | tar -xz -C ~/sdk
mv ~/sdk/go ~/sdk/go-atomix

# Use for building lfq-dependent code
GOROOT=~/sdk/go-atomix ~/sdk/go-atomix/bin/go build ./...

The intrinsics compiler inlines atomix operations with proper memory ordering. The standard Go compiler works for basic testing but may exhibit issues under high contention.

Queue Types

Type  Pattern                          Progress Guarantee  Use Case
SPSC  Single-Producer Single-Consumer  Wait-free           Pipeline stages, channels
MPSC  Multi-Producer Single-Consumer   Lock-free           Event aggregation, logging
SPMC  Single-Producer Multi-Consumer   Lock-free           Work distribution
MPMC  Multi-Producer Multi-Consumer    Lock-free           General purpose

Progress Guarantees

  • Wait-free: Every operation completes in bounded steps
  • Lock-free: System-wide progress guaranteed; at least one thread makes progress
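
To make the two guarantees concrete, here is a minimal sketch that uses only the standard sync/atomic package; it is not code from lfq, and the names addWaitFree, addLockFree, and count are hypothetical. It assumes hardware with a native fetch-and-add, where Add completes in one step, while the compare-and-swap loop may retry under contention.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// addWaitFree increments with a single fetch-and-add. The caller never
// retries, so the number of steps it takes is bounded.
func addWaitFree(c *atomic.Uint64) uint64 {
	return c.Add(1)
}

// addLockFree increments with a compare-and-swap retry loop. One caller
// may loop several times, but each failed CAS means another goroutine's
// CAS succeeded, so the system as a whole keeps making progress.
func addLockFree(c *atomic.Uint64) uint64 {
	for {
		old := c.Load()
		if c.CompareAndSwap(old, old+1) {
			return old + 1
		}
	}
}

// count runs `workers` goroutines that each call inc perWorker times on a
// shared counter and returns the final value.
func count(workers, perWorker int, inc func(*atomic.Uint64) uint64) uint64 {
	var c atomic.Uint64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				inc(&c)
			}
		}()
	}
	wg.Wait()
	return c.Load()
}

func main() {
	fmt.Println(count(4, 1000, addWaitFree)) // 4000
	fmt.Println(count(4, 1000, addLockFree)) // 4000
}
```

Both versions reach the same total; they differ only in how many steps an individual caller may need.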

Algorithms

SPSC: Lamport Ring Buffer

Classic bounded buffer with cached index optimization.

q := lfq.NewSPSC[int](1024)

// Producer
q.Enqueue(&value)  // Wait-free O(1)

// Consumer
elem, err := q.Dequeue()  // Wait-free O(1)
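
For readers unfamiliar with the technique, the following is a minimal sketch of a Lamport-style ring with cached indices, written against the standard library only. It is an illustration under stated assumptions, not lfq's implementation: the ring type, its fields, and errWouldBlock are hypothetical stand-ins, and it stores plain ints.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"sync/atomic"
)

// errWouldBlock is a hypothetical stand-in for lfq.ErrWouldBlock.
var errWouldBlock = errors.New("would block")

// ring is a single-producer single-consumer bounded buffer.
type ring struct {
	head       atomic.Uint64 // next position to read; written only by the consumer
	tail       atomic.Uint64 // next position to write; written only by the producer
	cachedHead uint64        // producer's private copy of head
	cachedTail uint64        // consumer's private copy of tail
	mask       uint64
	buf        []int
}

// newRing requires a power-of-two capacity so positions map to slots by masking.
func newRing(capacity int) *ring {
	if capacity < 2 || capacity&(capacity-1) != 0 {
		panic("capacity must be a power of two >= 2")
	}
	return &ring{mask: uint64(capacity - 1), buf: make([]int, capacity)}
}

// enqueue re-reads the shared head only when the cached copy says "full".
func (r *ring) enqueue(v int) error {
	t := r.tail.Load()
	if t-r.cachedHead == uint64(len(r.buf)) {
		r.cachedHead = r.head.Load()
		if t-r.cachedHead == uint64(len(r.buf)) {
			return errWouldBlock
		}
	}
	r.buf[t&r.mask] = v
	r.tail.Store(t + 1) // publish the slot to the consumer
	return nil
}

// dequeue re-reads the shared tail only when the cached copy says "empty".
func (r *ring) dequeue() (int, error) {
	h := r.head.Load()
	if h == r.cachedTail {
		r.cachedTail = r.tail.Load()
		if h == r.cachedTail {
			return 0, errWouldBlock
		}
	}
	v := r.buf[h&r.mask]
	r.head.Store(h + 1) // release the slot back to the producer
	return v, nil
}

func main() {
	r := newRing(8)
	done := make(chan int)
	go func() { // consumer
		sum := 0
		for got := 0; got < 1000; {
			v, err := r.dequeue()
			if err != nil {
				runtime.Gosched()
				continue
			}
			sum += v
			got++
		}
		done <- sum
	}()
	for i := 1; i <= 1000; i++ { // producer
		for r.enqueue(i) != nil {
			runtime.Gosched()
		}
	}
	fmt.Println(<-done) // 500500
}
```

The cached copies mean each side usually touches only its own index, which is the point of the cached index optimization.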

MPSC/SPMC/MPMC: FAA-Based (Default)

By default, multi-access queues use FAA (Fetch-And-Add) based algorithms derived from SCQ (Scalable Circular Queue). FAA blindly increments position counters, requiring 2n physical slots for capacity n, but scales better under high contention than CAS-based alternatives.

// Multiple producers, single consumer
q := lfq.NewMPSC[Event](1024)  // FAA producers, wait-free dequeue

// Single producer, multiple consumers
q := lfq.NewSPMC[Task](1024)   // Wait-free enqueue, FAA consumers

// Multiple producers and consumers
q := lfq.NewMPMC[*Request](4096)  // FAA-based SCQ algorithm

Cycle-based slot validation provides ABA safety without epoch counters or hazard pointers.
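
The idea behind cycle-based validation can be sketched as follows. This is a simplified, single-goroutine illustration and not the library's slot layout: locate, slot, and fresh are hypothetical names, and a real implementation would keep the cycle tag in an atomic word.

```go
package main

import "fmt"

// locate splits a monotonically increasing position into a slot index and
// a cycle (lap) number for a ring with `slots` physical slots.
// slots must be a power of two.
func locate(pos, slots uint64) (index, cycle uint64) {
	return pos & (slots - 1), pos / slots
}

// slot remembers the cycle in which it was last written.
type slot struct {
	cycle uint64
	value int
}

// fresh reports whether the slot content belongs to the cycle the caller
// expects. A stale operation from an earlier lap fails this check even
// though it targets the same index.
func (s slot) fresh(expected uint64) bool {
	return s.cycle == expected
}

func main() {
	const slots = 8 // 2n physical slots for a capacity of n = 4
	for _, pos := range []uint64{3, 11} {
		i, c := locate(pos, slots)
		fmt.Printf("pos=%d index=%d cycle=%d\n", pos, i, c)
	}
	s := slot{cycle: 1, value: 42}      // written during cycle 1
	fmt.Println(s.fresh(0), s.fresh(1)) // false true
}
```

Positions 3 and 11 share index 3 but differ in cycle, so a delayed operation holding the older position can be told apart without a separate epoch counter.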

Indirect/Ptr Variants: 128-bit Atomic Operations

Indirect and Ptr queue variants (non-SPSC, non-Compact) pack sequence number and value into a single 128-bit atomic. This reduces cache line contention and improves throughput under high concurrency.

// Indirect - single 128-bit atomic per operation
q := lfq.NewMPMCIndirect(4096)

// Ptr - same optimization for unsafe.Pointer
q := lfq.NewMPMCPtr(4096)

Builder API

Automatic algorithm selection based on constraints:

// SPSC - both constraints → Lamport ring
q := lfq.Build[T](lfq.New(1024).SingleProducer().SingleConsumer())

// MPSC - single consumer only
q := lfq.Build[T](lfq.New(1024).SingleConsumer())

// SPMC - single producer only
q := lfq.Build[T](lfq.New(1024).SingleProducer())

// MPMC - no constraints (default)
q := lfq.Build[T](lfq.New(1024))
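
The mapping from constraints to queue type shown above amounts to a two-flag decision. A minimal sketch, assuming a hypothetical constraints struct and selectKind function (the real Builder keeps its fields unexported):

```go
package main

import "fmt"

// constraints mirrors the two builder flags (hypothetical type).
type constraints struct {
	singleProducer bool
	singleConsumer bool
}

// selectKind returns the queue type implied by the declared constraints.
func selectKind(c constraints) string {
	switch {
	case c.singleProducer && c.singleConsumer:
		return "SPSC"
	case c.singleConsumer:
		return "MPSC"
	case c.singleProducer:
		return "SPMC"
	default:
		return "MPMC"
	}
}

func main() {
	fmt.Println(selectKind(constraints{singleProducer: true, singleConsumer: true})) // SPSC
	fmt.Println(selectKind(constraints{singleConsumer: true}))                       // MPSC
	fmt.Println(selectKind(constraints{singleProducer: true}))                       // SPMC
	fmt.Println(selectKind(constraints{}))                                           // MPMC
}
```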

Variants

Each queue type has three variants:

Variant   Element Type    Use Case
Generic   [T any]         Type-safe, general purpose
Indirect  uintptr         Index-based pools, handles
Ptr       unsafe.Pointer  Zero-copy pointer passing

// Generic
q := lfq.NewMPMC[MyStruct](1024)

// Indirect - for pool indices
q := lfq.NewMPMCIndirect(1024)
q.Enqueue(uintptr(poolIndex))

// Pointer - zero-copy
q := lfq.NewMPMCPtr(1024)
q.Enqueue(unsafe.Pointer(obj))

Compact Mode

Compact() selects CAS-based algorithms that use n physical slots (vs 2n for FAA-based default). Use when memory efficiency is more important than contention scalability:

// Compact mode - CAS-based, n slots
q := lfq.New(4096).Compact().BuildIndirect()

Mode     Algorithm  Physical Slots  Use When
Default  FAA-based  2n              High contention, scalability
Compact  CAS-based  n               Memory constrained

SPSC variants already use n slots (Lamport ring buffer) and ignore Compact(). For Indirect queues with Compact(), values are limited to 63 bits.
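
The source does not say how the remaining bit of a compact slot is used. The sketch below assumes, purely for illustration, that the top bit of a 64-bit word marks a slot as occupied, which is one common reason a payload would be capped at 63 bits. The names flagBit, fits63, pack, and unpack are hypothetical; fits63 alone may be useful as a guard before enqueueing into a compact Indirect queue.

```go
package main

import "fmt"

// flagBit is the assumed "occupied" marker in the top bit of the word.
const flagBit = uint64(1) << 63

// fits63 reports whether v can be stored in 63 bits.
func fits63(v uint64) bool {
	return v&flagBit == 0
}

// pack stores v in the low 63 bits and sets the assumed occupied flag.
// It refuses values that do not fit.
func pack(v uint64) (word uint64, ok bool) {
	if !fits63(v) {
		return 0, false
	}
	return v | flagBit, true
}

// unpack splits a word back into its value and the occupied flag.
// The zero word reads as "empty", so a stored 0 stays distinguishable.
func unpack(word uint64) (v uint64, occupied bool) {
	return word &^ flagBit, word&flagBit != 0
}

func main() {
	w, ok := pack(42)
	v, occupied := unpack(w)
	fmt.Println(ok, v, occupied) // true 42 true

	_, ok = pack(1 << 63)
	fmt.Println(ok) // false

	_, occupied = unpack(0)
	fmt.Println(occupied) // false
}
```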

Operations

Operation      Returns     Description
Enqueue(elem)  error       Add element; returns ErrWouldBlock if full
Dequeue()      (T, error)  Remove element; returns ErrWouldBlock if empty
Cap()          int         Queue capacity

Error Handling

err := q.Enqueue(&item)
if lfq.IsWouldBlock(err) {
    // Queue is full - backpressure or retry
}

elem, err := q.Dequeue()
if lfq.IsWouldBlock(err) {
    // Queue is empty - wait or poll
}

Usage Patterns

Buffer Pool

const poolSize = 1024
const bufSize = 4096

// Pre-allocate buffers
pool := make([][]byte, poolSize)
for i := range pool {
    pool[i] = make([]byte, bufSize)
}

// Free list tracks available indices
freeList := lfq.NewSPSCIndirect(poolSize)
for i := range poolSize {
    freeList.Enqueue(uintptr(i))
}

// Allocate
func Alloc() ([]byte, uintptr, bool) {
    idx, err := freeList.Dequeue()
    if err != nil {
        return nil, 0, false
    }
    return pool[idx], idx, true
}

// Free
func Free(idx uintptr) {
    freeList.Enqueue(idx)
}

Event Aggregation

type Event struct {
    Source    string
    Timestamp time.Time
    Data      any
}

// Multiple sources → Single processor
events := lfq.NewMPSC[Event](8192)

// Event sources (multiple producers)
for sensor := range slices.Values(sensors) {
    go func(s Sensor) {
        for reading := range s.Readings() {
            ev := Event{
                Source:    s.Name(),
                Timestamp: time.Now(),
                Data:      reading,
            }
            events.Enqueue(&ev)
        }
    }(sensor)
}

// Single aggregator (single consumer)
go func() {
    for {
        ev, err := events.Dequeue()
        if err == nil {
            aggregate(ev)
        }
    }
}()

Backpressure Handling

// With retry and yield
func EnqueueWithRetry(q lfq.Queue[Item], item Item, maxRetries int) bool {
    ba := iox.Backoff{}
    for range maxRetries {
        if q.Enqueue(&item) == nil {
            return true
        }
        ba.Wait() // Yield to let consumers drain
    }
    return false // Apply backpressure to caller
}

Graceful Shutdown

FAA-based queues (MPMC, SPMC, MPSC) include a threshold mechanism to prevent livelock. For graceful shutdown where producers finish before consumers, use the Drainer interface:

// Producer goroutines finish
prodWg.Wait()

// Signal no more enqueues will occur
if d, ok := q.(lfq.Drainer); ok {
    d.Drain()
}

// Consumers can now drain all remaining items
// without threshold blocking
for {
    item, err := q.Dequeue()
    if err != nil {
        break // Queue is empty
    }
    process(item)
}

Drain() is a hint — the caller must ensure no further Enqueue() calls will be made. SPSC queues do not implement Drainer as they have no threshold mechanism; the type assertion naturally handles this case.

When to Use Which Queue

┌─────────────────────────────────────────────────────────────────┐
│                    How many producers?                          │
│                                                                 │
│      ┌──────────────────┐          ┌────────────────────┐      │
│      │    One (SPSC/     │          │   Multiple (MPMC/  │      │
│      │    SPMC)          │          │   MPSC)            │      │
│      └────────┬─────────┘          └─────────┬──────────┘      │
│               │                               │                 │
│               ▼                               ▼                 │
│   ┌──────────────────┐              ┌──────────────────┐       │
│   │ One consumer?    │              │ One consumer?    │       │
│   └────────┬─────────┘              └────────┬─────────┘       │
│    Yes     │     No                  Yes     │     No          │
│     │      │      │                   │      │      │          │
│     ▼      │      ▼                   ▼      │      ▼          │
│   SPSC     │    SPMC                MPSC     │    MPMC         │
│            │                                 │                  │
└────────────┴─────────────────────────────────┴─────────────────┘

Variant Selection:
• Generic [T]     → Type-safe, copying semantics
• Indirect        → Pool indices, buffer offsets (uintptr)
• Ptr             → Zero-copy object passing (unsafe.Pointer)

Capacity

Capacity rounds up to the next power of 2:

q := lfq.NewMPMC[int](3)     // Actual capacity: 4
q := lfq.NewMPMC[int](4)     // Actual capacity: 4
q := lfq.NewMPMC[int](1000)  // Actual capacity: 1024
q := lfq.NewMPMC[int](1024)  // Actual capacity: 1024
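
The rounding rule can be reproduced with the standard math/bits package. This is a sketch of the arithmetic only; roundUpPow2 is a hypothetical name and the library's internal helper may differ.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 returns the smallest power of two that is >= n.
// It panics for n < 2, matching the documented minimum capacity.
func roundUpPow2(n int) int {
	if n < 2 {
		panic("capacity must be >= 2")
	}
	// bits.Len(n-1) is the number of bits needed to represent n-1,
	// so shifting 1 by that amount yields the next power of two.
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{3, 4, 1000, 1024} {
		fmt.Printf("requested %d -> actual %d\n", n, roundUpPow2(n))
	}
	// requested 3 -> actual 4
	// requested 4 -> actual 4
	// requested 1000 -> actual 1024
	// requested 1024 -> actual 1024
}
```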

Memory Layout

All queues use cache-line padding (64 bytes) to prevent false sharing:

type MPMC[T any] struct {
    _        [64]byte      // Padding
    tail     atomix.Uint64 // Producer index
    _        [64]byte      // Padding
    head     atomix.Uint64 // Consumer index
    _        [64]byte      // Padding
    buffer   []slot[T]
    // ...
}
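
The effect of the padding can be checked with unsafe.Offsetof. The sketch below uses the standard sync/atomic types in place of atomix and hypothetical struct names (padded, packed); it only measures field distances and assumes a 64-byte cache line.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

const cacheLine = 64 // assumed cache-line size in bytes

// padded separates the two indices by at least one cache line.
type padded struct {
	_    [cacheLine]byte
	tail atomic.Uint64
	_    [cacheLine]byte
	head atomic.Uint64
	_    [cacheLine]byte
}

// packed places the two indices next to each other.
type packed struct {
	tail atomic.Uint64
	head atomic.Uint64
}

// paddedGap returns the byte distance between tail and head in padded.
func paddedGap() uintptr {
	var p padded
	return unsafe.Offsetof(p.head) - unsafe.Offsetof(p.tail)
}

// packedGap returns the byte distance between tail and head in packed.
func packedGap() uintptr {
	var p packed
	return unsafe.Offsetof(p.head) - unsafe.Offsetof(p.tail)
}

func main() {
	fmt.Println("padded gap:", paddedGap())
	fmt.Println("packed gap:", packedGap())
}
```

Two fields whose start addresses are at least 64 bytes apart cannot fall in the same 64-byte line, so producer writes to tail do not invalidate the line that consumers read head from.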

Race Detection

Go's race detector is not designed for lock-free algorithm verification. It tracks explicit sync primitives (mutex, channels) but cannot observe happens-before relationships from atomic memory orderings.

Tests use two protection mechanisms:

  • Build tag //go:build !race excludes example files from race testing
  • Runtime check if lfq.RaceEnabled { t.Skip() } skips concurrent tests in lockfree_test.go

Run go test -race ./... for race-safe tests, or go test ./... for all tests.

Platform Support

Platform                      Status
linux/amd64                   Primary
linux/arm64                   Supported
linux/riscv64                 Supported
linux/loong64                 Supported
darwin/amd64, darwin/arm64    Supported
freebsd/amd64, freebsd/arm64  Supported

References

  • Nikolaev, R. (2019). A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue. arXiv, arXiv:1908.04511. https://arxiv.org/abs/1908.04511.
  • Lamport, L. (1974). A New Solution of Dijkstra's Concurrent Programming Problem. Communications of the ACM, 17(8), 453–455.
  • Vyukov, D. (2010). Bounded MPMC Queue. 1024cores.net. https://1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue.
  • Herlihy, M. (1991). Wait-Free Synchronization. ACM Transactions on Programming Languages and Systems, 13(1), 124–149.
  • Herlihy, M., & Wing, J. M. (1990). Linearizability: A Correctness Condition for Concurrent Objects. ACM Transactions on Programming Languages and Systems, 12(3), 463–492.
  • Michael, M. M., & Scott, M. L. (1996). Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms. In Proceedings of the 15th ACM Symposium on Principles of Distributed Computing (PODC '96), pp. 267–275.
  • Adve, S. V., & Gharachorloo, K. (1996). Shared Memory Consistency Models: A Tutorial. IEEE Computer, 29(12), 66–76.

License

MIT — see LICENSE.

©2026 Hayabusa Cloud Co., Ltd.

Documentation

Overview

Package lfq provides bounded FIFO queue implementations.

The package offers multiple queue variants optimized for different producer/consumer patterns:

  • SPSC: Single-Producer Single-Consumer
  • MPSC: Multi-Producer Single-Consumer
  • SPMC: Single-Producer Multi-Consumer
  • MPMC: Multi-Producer Multi-Consumer

Quick Start

Direct constructors (recommended for most cases):

q := lfq.NewSPSC[Event](1024)
q := lfq.NewMPMC[*Request](4096)

Builder API auto-selects algorithm based on constraints:

q := lfq.Build[Event](lfq.New(1024).SingleProducer().SingleConsumer())  // → SPSC
q := lfq.Build[Event](lfq.New(1024).SingleConsumer())                   // → MPSC
q := lfq.Build[Event](lfq.New(1024).SingleProducer())                   // → SPMC
q := lfq.Build[Event](lfq.New(1024))                                    // → MPMC

Basic Usage

All queues share the same interface for enqueueing and dequeueing:

// Create a queue
q := lfq.NewMPMC[int](1024)

// Enqueue (non-blocking)
err := q.Enqueue(new(42))
if lfq.IsWouldBlock(err) {
    // Queue is full - handle backpressure
}

// Dequeue (non-blocking)
elem, err := q.Dequeue()
if lfq.IsWouldBlock(err) {
    // Queue is empty - try again later
}

Common Patterns

Pipeline Stage (SPSC):

// Stage 1 → Queue → Stage 2
q := lfq.NewSPSC[Data](1024)

go func() { // Producer (Stage 1)
    backoff := iox.Backoff{}
    for data := range input {
        for q.Enqueue(&data) != nil {
            backoff.Wait()
        }
        backoff.Reset()
    }
}()

go func() { // Consumer (Stage 2)
    backoff := iox.Backoff{}
    for {
        data, err := q.Dequeue()
        if err != nil {
            backoff.Wait()
            continue
        }
        backoff.Reset()
        process(data)
    }
}()

Event Aggregation (MPSC):

// Multiple event sources → Single processor
q := lfq.NewMPSC[Event](4096)

// Multiple producers (event sources)
for sensor := range slices.Values(sensors) {
    go func(s Sensor) {
        for ev := range s.Events() {
            q.Enqueue(&ev)
        }
    }(sensor)
}

// Single consumer (aggregator)
go func() {
    for {
        ev, err := q.Dequeue()
        if err == nil {
            aggregate(ev)
        }
    }
}()

Work Distribution (SPMC):

// Single dispatcher → Multiple workers
q := lfq.NewSPMC[Task](1024)

// Single producer (dispatcher)
go func() {
    backoff := iox.Backoff{}
    for task := range tasks {
        for q.Enqueue(&task) != nil {
            backoff.Wait()
        }
        backoff.Reset()
    }
}()

// Multiple consumers (workers)
for range numWorkers {
    go func() {
        for {
            task, err := q.Dequeue()
            if err == nil {
                task.Execute()
            }
        }
    }()
}

Worker Pool (MPMC):

// Multiple submitters → Multiple workers
q := lfq.NewMPMC[Job](4096)

// Workers
for range numWorkers {
    go func() {
        for {
            job, err := q.Dequeue()
            if err == nil {
                job.Run()
            }
        }
    }()
}

// Submit jobs from anywhere
func Submit(j Job) error {
    return q.Enqueue(&j)
}

Queue Variants

Three queue flavors are available for different use cases:

Build[T]        - Generic type-safe queue for any type
BuildIndirect() - Queue for uintptr values (pool indices, handles)
BuildPtr()      - Queue for unsafe.Pointer (zero-copy pointer passing)

When to use Indirect:

// Buffer pool with index-based access
pool := make([][]byte, 1024)
freeList := lfq.NewSPSCIndirect(1024)

// Initialize free list with buffer indices
for i := range pool {
    pool[i] = make([]byte, 4096)
    freeList.Enqueue(uintptr(i))
}

// Allocate: get index from free list
idx, err := freeList.Dequeue()
if err == nil {
    buf := pool[idx]
    copy(buf, "payload") // Use the buffer

    // Free: return index to free list
    freeList.Enqueue(idx)
}

When to use Ptr:

// Zero-copy object passing between goroutines
q := lfq.NewMPMCPtr(1024)

// Producer creates object once
msg := &Message{Data: largePayload}
q.Enqueue(unsafe.Pointer(msg))

// Consumer receives same pointer - no copy
ptr, _ := q.Dequeue()
received := (*Message)(ptr)

Algorithm Selection

The builder selects algorithms based on constraints and Compact() hint:

Default (FAA-based, 2n slots for capacity n):

SPSC: Lamport ring buffer (n slots, already optimal)
MPSC: FAA producers, sequential consumer
SPMC: Sequential producer, FAA consumers
MPMC: FAA-based SCQ algorithm

With Compact() (CAS-based, n slots for capacity n):

SPSC: Same as default (already optimal)
MPSC: CAS producers, sequential consumer
SPMC: Sequential producer, CAS consumers
MPMC: Sequence-based algorithm

FAA (Fetch-And-Add) scales better under high contention but requires 2n physical slots. Use Compact() when memory efficiency is critical.
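
The difference between the two claiming styles can be sketched with the standard sync/atomic package. This is not lfq's code; claimFAA, claimCAS, and fill are hypothetical names. The point is that a fetch-and-add always advances the counter, even when nothing can be stored, whereas a CAS loop can check a limit before it commits.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// claimFAA reserves the next position unconditionally.
// The counter moves forward whether or not the queue has room.
func claimFAA(pos *atomic.Uint64) uint64 {
	return pos.Add(1) - 1
}

// claimCAS reserves the next position only while it is below limit.
// Contending callers may retry the CAS, but the counter never overshoots.
func claimCAS(pos *atomic.Uint64, limit uint64) (uint64, bool) {
	for {
		cur := pos.Load()
		if cur >= limit {
			return 0, false
		}
		if pos.CompareAndSwap(cur, cur+1) {
			return cur, true
		}
	}
}

// fill makes `attempts` claims with each strategy against room for `limit`
// elements and returns where each counter ends up.
func fill(attempts int, limit uint64) (faaPos, casPos uint64) {
	var faa, cas atomic.Uint64
	for i := 0; i < attempts; i++ {
		claimFAA(&faa)
		claimCAS(&cas, limit)
	}
	return faa.Load(), cas.Load()
}

func main() {
	faaPos, casPos := fill(10, 4)
	fmt.Println(faaPos, casPos) // 10 4
}
```

An FAA-based design therefore has to validate each claimed position after the fact, which is where the extra physical slots described above come in.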

Type-safe builder functions enforce constraints at compile time:

BuildSPSC[T](b) → *SPSC[T]    // Requires SP + SC
BuildMPSC[T](b) → Queue[T]   // Requires SC only
BuildSPMC[T](b) → Queue[T]   // Requires SP only
BuildMPMC[T](b) → Queue[T]   // Requires no constraints

Performance Hints

Compact() selects CAS-based algorithms with n physical slots (vs 2n for FAA-based default). Use when memory efficiency is more important than contention scalability:

// Compact mode - CAS-based, n slots (works with all queue types)
q := lfq.Build[Event](lfq.New(4096).Compact())
q := lfq.New(4096).Compact().BuildIndirect()
q := lfq.New(4096).Compact().BuildPtr()

SPSC variants already use n slots (Lamport ring buffer) and ignore Compact().

Error Handling

Queues return ErrWouldBlock when operations cannot proceed. This error is sourced from code.hybscloud.com/iox for ecosystem consistency.

// Retry loop with backoff
backoff := iox.Backoff{}
for {
    err := q.Enqueue(&item)
    if err == nil {
        backoff.Reset()
        break
    }
    if !lfq.IsWouldBlock(err) {
        return err // Unexpected error
    }
    backoff.Wait()
}

For semantic error classification (delegates to iox):

lfq.IsWouldBlock(err)  // true if queue full/empty
lfq.IsSemantic(err)    // true if control flow signal
lfq.IsNonFailure(err)  // true if nil or ErrWouldBlock

Capacity and Length

Capacity rounds up to the next power of 2:

q := lfq.NewMPMC[int](3)     // Actual capacity: 4
q := lfq.NewMPMC[int](4)     // Actual capacity: 4
q := lfq.NewMPMC[int](1000)  // Actual capacity: 1024
q := lfq.NewMPMC[int](1024)  // Actual capacity: 1024

Minimum capacity is 2 (already a power of 2). Constructors panic if capacity < 2.

Length is intentionally not provided because accurate counts in lock-free algorithms require expensive cross-core synchronization. Track counts in application logic when needed.
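
One way to track counts in application logic is a thin wrapper with an atomic counter. The sketch below is an assumption-laden illustration: the queue interface mirrors the Enqueue/Dequeue shape shown in this document, while counted, sliceQueue, newCounted, and errWouldBlock are hypothetical, and sliceQueue is a single-goroutine stand-in used only so the sketch runs without lfq.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errWouldBlock is a hypothetical stand-in for lfq.ErrWouldBlock.
var errWouldBlock = errors.New("would block")

// queue mirrors the Enqueue/Dequeue shape described in this document.
type queue[T any] interface {
	Enqueue(elem *T) error
	Dequeue() (T, error)
}

// counted wraps a queue and keeps an approximate element count.
// Under concurrency the count can briefly lag behind the queue state.
type counted[T any] struct {
	q queue[T]
	n atomic.Int64
}

func (c *counted[T]) Enqueue(elem *T) error {
	err := c.q.Enqueue(elem)
	if err == nil {
		c.n.Add(1)
	}
	return err
}

func (c *counted[T]) Dequeue() (T, error) {
	v, err := c.q.Dequeue()
	if err == nil {
		c.n.Add(-1)
	}
	return v, err
}

// Len returns the approximate number of queued elements.
func (c *counted[T]) Len() int64 { return c.n.Load() }

// sliceQueue is a trivial bounded queue for demonstration only.
type sliceQueue[T any] struct {
	items []T
	limit int
}

func (s *sliceQueue[T]) Enqueue(elem *T) error {
	if len(s.items) == s.limit {
		return errWouldBlock
	}
	s.items = append(s.items, *elem)
	return nil
}

func (s *sliceQueue[T]) Dequeue() (T, error) {
	var zero T
	if len(s.items) == 0 {
		return zero, errWouldBlock
	}
	v := s.items[0]
	s.items = s.items[1:]
	return v, nil
}

// newCounted builds a counted wrapper around the demo queue.
func newCounted[T any](limit int) *counted[T] {
	return &counted[T]{q: &sliceQueue[T]{limit: limit}}
}

func main() {
	c := newCounted[int](2)
	a, b := 1, 2
	c.Enqueue(&a)
	c.Enqueue(&b)
	fmt.Println(c.Len()) // 2
	c.Dequeue()
	fmt.Println(c.Len()) // 1
}
```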

Thread Safety

All queue operations are thread-safe within their access pattern constraints:

  • SPSC: One producer goroutine, one consumer goroutine
  • MPSC: Multiple producer goroutines, one consumer goroutine
  • SPMC: One producer goroutine, multiple consumer goroutines
  • MPMC: Multiple producer and consumer goroutines

Violating these constraints (e.g., multiple producers on SPSC) causes undefined behavior including data corruption and races.

Graceful Shutdown

FAA-based queues (MPMC, SPMC, MPSC) include a threshold mechanism to prevent livelock. This mechanism may cause Dequeue to return ErrWouldBlock even when items remain, waiting for producer activity to reset the threshold.

For graceful shutdown scenarios where producers have finished but consumers need to drain remaining items, use the Drainer interface:

// Producer goroutines finish
prodWg.Wait()

// Signal no more enqueues will occur
if d, ok := q.(lfq.Drainer); ok {
    d.Drain()
}

// Consumers can now drain all remaining items
// without threshold blocking

After Drain is called, Dequeue skips threshold checks, allowing consumers to fully drain the queue. Drain is a hint — the caller must ensure no further Enqueue calls will be made.

SPSC queues do not implement Drainer as they have no threshold mechanism. The type assertion naturally handles this case.

Race Detection

Go's race detector is not designed for lock-free algorithm verification. The race detector tracks explicit synchronization primitives (mutex, channels, WaitGroup) but cannot observe happens-before relationships established through atomic memory orderings (acquire-release semantics).

Lock-free queues use sequence numbers with acquire-release semantics to protect non-atomic data fields. These algorithms are correct, but the race detector may report false positives because it cannot track synchronization provided by atomic operations on separate variables.

For lock-free algorithm correctness verification, use:

  • Formal verification tools (TLA+, SPIN)
  • Stress testing without race detector
  • Memory model analysis

Tests incompatible with race detection are excluded via //go:build !race.

Dependencies

This package uses code.hybscloud.com/iox for semantic errors, code.hybscloud.com/atomix for atomic primitives with explicit memory ordering, and code.hybscloud.com/spin for CPU pause instructions.

Example (Backpressure)

Example_backpressure demonstrates handling backpressure with a full queue.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	// Small queue to demonstrate backpressure
	q := lfq.NewSPSC[int](3) // Requested 3, rounds up to Cap()=4

	// Fill the queue
	filled := 0
	for i := 1; i <= 10; i++ {
		err := q.Enqueue(new(i))
		if err == nil {
			filled++
		} else if lfq.IsWouldBlock(err) {
			fmt.Printf("Backpressure at item %d (queue full)\n", i)
			break
		}
	}
	fmt.Printf("Filled %d items\n", filled)

	// Drain some items to make room
	for range 2 {
		v, _ := q.Dequeue()
		fmt.Printf("Drained: %d\n", v)
	}

	// Now we can enqueue more
	if q.Enqueue(new(100)) == nil {
		fmt.Println("Enqueued 100 after draining")
	}

}
Output:

Backpressure at item 5 (queue full)
Filled 4 items
Drained: 1
Drained: 2
Enqueued 100 after draining

Example (BatchProcessing)

Example_batchProcessing demonstrates collecting items into batches.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	q := lfq.NewSPSC[int](64)

	// Single producer submits items sequentially
	for i := 1; i <= 9; i++ {
		q.Enqueue(new(i))
	}

	// Batch processing: collect up to batchSize items
	batchSize := 4
	batch := make([]int, 0, batchSize)
	batchNum := 0

	for {
		for len(batch) < batchSize {
			v, err := q.Dequeue()
			if err != nil {
				break
			}
			batch = append(batch, v)
		}

		if len(batch) == 0 {
			break
		}

		batchNum++
		fmt.Printf("Batch %d: %v\n", batchNum, batch)
		batch = batch[:0]
	}

}
Output:

Batch 1: [1 2 3 4]
Batch 2: [5 6 7 8]
Batch 3: [9]

Example (BufferPool)

Example_bufferPool demonstrates using Indirect queue for buffer pool management.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	const poolSize = 4
	const bufSize = 64

	// Create buffer pool
	pool := make([][]byte, poolSize)
	for i := range pool {
		pool[i] = make([]byte, bufSize)
	}

	// Free list tracks available buffer indices
	freeList := lfq.NewSPSCIndirect(poolSize)
	freeCount := poolSize

	// Initialize: all buffers are free
	for i := range poolSize {
		freeList.Enqueue(uintptr(i))
	}

	// Allocate a buffer
	allocate := func() ([]byte, uintptr, bool) {
		idx, err := freeList.Dequeue()
		if err != nil {
			return nil, 0, false // Pool exhausted
		}
		freeCount--
		return pool[idx], idx, true
	}

	// Release a buffer back to pool
	release := func(idx uintptr) {
		freeList.Enqueue(idx)
		freeCount++
	}

	// Usage demonstration
	fmt.Printf("Free buffers: %d\n", freeCount)

	buf1, idx1, ok := allocate()
	if ok {
		copy(buf1, "hello")
		fmt.Printf("Allocated buffer %d, free: %d\n", idx1, freeCount)
	}

	buf2, idx2, ok := allocate()
	if ok {
		copy(buf2, "world")
		fmt.Printf("Allocated buffer %d, free: %d\n", idx2, freeCount)
	}

	release(idx1)
	fmt.Printf("Released buffer %d, free: %d\n", idx1, freeCount)

	release(idx2)
	fmt.Printf("Released buffer %d, free: %d\n", idx2, freeCount)

}
Output:

Free buffers: 4
Allocated buffer 0, free: 3
Allocated buffer 1, free: 2
Released buffer 0, free: 3
Released buffer 1, free: 4

Example (CompactMode)

Example_compactMode demonstrates compact mode for memory-constrained scenarios.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	// Compact Indirect: 8 bytes per slot (vs 16 bytes standard)
	// Values limited to 63 bits
	q := lfq.New(1024).Compact().BuildIndirect()

	// Use for buffer pool indices where nil/zero is not needed
	for i := uintptr(1); i <= 5; i++ {
		q.Enqueue(i) // Values must be < (1 << 63)
	}

	fmt.Printf("Queue capacity: %d\n", q.Cap())

	// Dequeue until empty (ErrWouldBlock)
	for {
		idx, err := q.Dequeue()
		if lfq.IsWouldBlock(err) {
			break
		}
		fmt.Printf("Index: %d\n", idx)
	}

}
Output:

Queue capacity: 1024
Index: 1
Index: 2
Index: 3
Index: 4
Index: 5

Example (Pipeline)

Example_pipeline demonstrates a multi-stage pipeline using SPSC queues.

package main

import (
	"fmt"
	"sync"

	"code.hybscloud.com/iox"
	"code.hybscloud.com/lfq"
)

func main() {
	// Pipeline: Generate → Double → Print
	stage1to2 := lfq.NewSPSC[int](8) // Generate → Double
	stage2to3 := lfq.NewSPSC[int](8) // Double → Print

	var wg sync.WaitGroup
	results := make([]int, 0, 5)
	var mu sync.Mutex

	// Stage 1: Generate numbers 1-5
	wg.Add(1)
	go func() {
		defer wg.Done()
		backoff := iox.Backoff{}
		for i := 1; i <= 5; i++ {
			v := i
			for stage1to2.Enqueue(&v) != nil {
				backoff.Wait()
			}
			backoff.Reset()
		}
	}()

	// Stage 2: Double each number
	wg.Add(1)
	go func() {
		defer wg.Done()
		backoffDeq := iox.Backoff{}
		backoffEnq := iox.Backoff{}
		processed := 0
		for processed < 5 {
			v, err := stage1to2.Dequeue()
			if err != nil {
				backoffDeq.Wait()
				continue
			}
			backoffDeq.Reset()
			doubled := v * 2
			for stage2to3.Enqueue(&doubled) != nil {
				backoffEnq.Wait()
			}
			backoffEnq.Reset()
			processed++
		}
	}()

	// Stage 3: Collect results
	wg.Add(1)
	go func() {
		defer wg.Done()
		backoff := iox.Backoff{}
		for len(results) < 5 {
			v, err := stage2to3.Dequeue()
			if err != nil {
				backoff.Wait()
				continue
			}
			backoff.Reset()
			mu.Lock()
			results = append(results, v)
			mu.Unlock()
		}
	}()

	wg.Wait()

	for i, v := range results {
		fmt.Printf("Stage output %d: %d\n", i, v)
	}

}
Output:

Stage output 0: 2
Stage output 1: 4
Stage output 2: 6
Stage output 3: 8
Stage output 4: 10

Example (WorkerPool)

Example_workerPool demonstrates a worker pool pattern using MPMC.

package main

import (
	"fmt"
	"sync"

	"code.hybscloud.com/atomix"
	"code.hybscloud.com/iox"
	"code.hybscloud.com/lfq"
)

func main() {
	type Job struct {
		ID     int
		Input  int
		Result int
	}

	// Job queue and results
	jobs := lfq.NewMPMC[Job](16)
	results := make([]int, 5)
	var wg sync.WaitGroup
	var completed atomix.Int32

	// Start 3 workers
	for w := range 3 {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			backoff := iox.Backoff{}
			for completed.Load() < 5 {
				job, err := jobs.Dequeue()
				if err != nil {
					backoff.Wait()
					continue
				}
				backoff.Reset()
				// Process job: square the input
				job.Result = job.Input * job.Input
				results[job.ID] = job.Result
				completed.Add(1)
			}
		}(w)
	}

	// Submit 5 jobs
	backoff := iox.Backoff{}
	for i := range 5 {
		job := Job{ID: i, Input: i + 1}
		for jobs.Enqueue(&job) != nil {
			backoff.Wait()
		}
		backoff.Reset()
	}

	wg.Wait()

	// Print results
	for i, r := range results {
		fmt.Printf("Job %d: %d² = %d\n", i, i+1, r)
	}

}
Output:

Job 0: 1² = 1
Job 1: 2² = 4
Job 2: 3² = 9
Job 3: 4² = 16
Job 4: 5² = 25

Constants

const RaceEnabled = false

RaceEnabled is false when the race detector is not active.

Variables

var ErrWouldBlock = iox.ErrWouldBlock

ErrWouldBlock indicates the operation cannot proceed immediately.

  • For Enqueue: the queue is full (backpressure)
  • For Dequeue: the queue is empty (no data available)

ErrWouldBlock is a control flow signal, not a failure. The caller should retry the operation later (with backoff or yield) rather than propagating the error.

This is an alias for iox.ErrWouldBlock for ecosystem consistency.

Example:

backoff := iox.Backoff{}
for {
    err := q.Enqueue(&item)
    if err == nil {
        backoff.Reset()
        break
    }
    if lfq.IsWouldBlock(err) {
        backoff.Wait()  // Adaptive backpressure
        continue
    }
    return err  // Unexpected error
}

Functions

func IsNonFailure

func IsNonFailure(err error) bool

IsNonFailure reports whether err represents a non-failure condition. Returns true for nil, ErrWouldBlock, or ErrMore. Delegates to iox.IsNonFailure.

func IsSemantic

func IsSemantic(err error) bool

IsSemantic reports whether err is a control flow signal (not a failure). Delegates to iox.IsSemantic.

func IsWouldBlock

func IsWouldBlock(err error) bool

IsWouldBlock reports whether err indicates the operation would block. Delegates to iox.IsWouldBlock for wrapped error support.
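
Wrapped error support matters when a caller adds context before returning the error. The sketch below shows the general pattern with the standard errors package; errWouldBlock, isWouldBlock, and wrapped are hypothetical stand-ins, and the source does not show how iox implements its check.

```go
package main

import (
	"errors"
	"fmt"
)

// errWouldBlock is a hypothetical sentinel standing in for lfq.ErrWouldBlock.
var errWouldBlock = errors.New("would block")

// isWouldBlock matches the sentinel anywhere in the error chain.
func isWouldBlock(err error) bool {
	return errors.Is(err, errWouldBlock)
}

// wrapped returns the sentinel with added context, as a caller might.
func wrapped() error {
	return fmt.Errorf("enqueue job 7: %w", errWouldBlock)
}

func main() {
	err := wrapped()
	fmt.Println(err == errWouldBlock) // false: direct comparison misses wrapped errors
	fmt.Println(isWouldBlock(err))    // true
	fmt.Println(isWouldBlock(nil))    // false
}
```

Comparing with == only works on the bare sentinel, which is why a helper that walks the chain is preferable in retry loops.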

Example

ExampleIsWouldBlock demonstrates error handling patterns.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	q := lfq.NewSPSC[int](2) // Cap()=2

	// Fill the queue
	q.Enqueue(new(1))
	q.Enqueue(new(2))

	// Queue is full
	err := q.Enqueue(new(5))
	if lfq.IsWouldBlock(err) {
		fmt.Println("Queue full - applying backpressure")
	}

	// Drain the queue
	q.Dequeue()
	q.Dequeue()

	// Queue is empty
	_, err = q.Dequeue()
	if lfq.IsWouldBlock(err) {
		fmt.Println("Queue empty - no data available")
	}

}
Output:

Queue full - applying backpressure
Queue empty - no data available

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder provides a fluent API for configuring and creating queues. It automatically selects the algorithm based on producer/consumer constraints and performance hints.

Example:

// SPSC queue (optimal for single producer/consumer)
q := lfq.BuildSPSC[Event](lfq.New(1024).SingleProducer().SingleConsumer())

// MPMC queue (default, general purpose)
q := lfq.BuildMPMC[Request](lfq.New(4096))

// Compact indirect queue for memory efficiency
q := lfq.New(8192).Compact().BuildIndirect()

func New

func New(capacity int) *Builder

New creates a queue builder with the given capacity.

Capacity rounds up to the next power of 2. For example, capacity=4 results in actual capacity=4, capacity=1000 results in actual capacity=1024.

Panics if capacity < 2.

Example:

// Create builder, then configure and build
b := lfq.New(1024)
q := lfq.BuildSPSC[int](b.SingleProducer().SingleConsumer())

// Or chain directly
q := lfq.BuildMPMC[int](lfq.New(1024))

func (*Builder) BuildIndirect

func (b *Builder) BuildIndirect() QueueIndirect

BuildIndirect creates a QueueIndirect for uintptr values.

Algorithm selection:

  • SPSC (SingleProducer + SingleConsumer) → Lamport ring buffer
  • Compact() → CAS-based algorithms (n slots, values limited to 63 bits)
  • Default → FAA-based algorithms (2n slots)

func (*Builder) BuildIndirectMPMC

func (b *Builder) BuildIndirectMPMC() QueueIndirect

BuildIndirectMPMC creates an MPMC queue for uintptr values. Panics if builder has any constraints set.

func (*Builder) BuildIndirectMPSC

func (b *Builder) BuildIndirectMPSC() QueueIndirect

BuildIndirectMPSC creates an MPSC queue for uintptr values. Panics if builder is not configured with SingleConsumer() only.

func (*Builder) BuildIndirectSPMC

func (b *Builder) BuildIndirectSPMC() QueueIndirect

BuildIndirectSPMC creates an SPMC queue for uintptr values. Panics if builder is not configured with SingleProducer() only.

func (*Builder) BuildIndirectSPSC

func (b *Builder) BuildIndirectSPSC() *SPSCIndirect

BuildIndirectSPSC creates an SPSC queue for uintptr values.

func (*Builder) BuildPtr

func (b *Builder) BuildPtr() QueuePtr

BuildPtr creates a QueuePtr for unsafe.Pointer values.

Algorithm selection:

  1. SPSC (SingleProducer + SingleConsumer) → Lamport ring buffer
  2. Other configurations → FAA default (2n slots), CAS if Compact (n slots)

Default: FAA-based algorithms with 2n physical slots (better scalability). Compact(): CAS-based algorithms with n slots (half memory footprint).

func (*Builder) BuildPtrMPMC

func (b *Builder) BuildPtrMPMC() QueuePtr

BuildPtrMPMC creates an MPMC queue for unsafe.Pointer values. Panics if builder has any constraints set.

func (*Builder) BuildPtrMPSC

func (b *Builder) BuildPtrMPSC() QueuePtr

BuildPtrMPSC creates an MPSC queue for unsafe.Pointer values. Panics if builder is not configured with SingleConsumer() only.

func (*Builder) BuildPtrSPMC

func (b *Builder) BuildPtrSPMC() QueuePtr

BuildPtrSPMC creates an SPMC queue for unsafe.Pointer values. Panics if builder is not configured with SingleProducer() only.

func (*Builder) BuildPtrSPSC

func (b *Builder) BuildPtrSPSC() *SPSCPtr

BuildPtrSPSC creates an SPSC queue for unsafe.Pointer values. Panics if builder is not configured with SingleProducer().SingleConsumer().

func (*Builder) Compact

func (b *Builder) Compact() *Builder

Compact selects CAS-based algorithms with n physical slots instead of FAA-based algorithms with 2n slots.

Trade-off: Half memory usage, reduced scalability under high contention.

SPSC already uses n slots (Lamport ring buffer) and ignores Compact().
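
As a rough worked example of the trade-off, the slot counts and per-slot sizes quoted on this page for the uintptr MPMC variants can be multiplied out. The helper slotBytes is hypothetical, and the result ignores struct headers, padding, and any cache-line alignment, so treat the numbers as an estimate rather than the library's actual footprint.

```go
package main

import "fmt"

// slotBytes estimates slot-array memory from the figures quoted in this
// documentation. Ignoring headers, padding, and alignment is an assumption
// of this sketch.
func slotBytes(capacity, slotsPerCap, bytesPerSlot int) int {
	return capacity * slotsPerCap * bytesPerSlot
}

func main() {
	const n = 1024
	fmt.Println("MPMCIndirect (FAA, 2n x 16 B):     ", slotBytes(n, 2, 16))
	fmt.Println("MPMCIndirectSeq (CAS, n x 16 B):   ", slotBytes(n, 1, 16))
	fmt.Println("MPMCCompactIndirect (CAS, n x 8 B):", slotBytes(n, 1, 8))
}
```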

func (*Builder) SingleConsumer

func (b *Builder) SingleConsumer() *Builder

SingleConsumer declares that only one goroutine will dequeue. Enables optimized algorithms for SPSC or MPSC patterns.

func (*Builder) SingleProducer

func (b *Builder) SingleProducer() *Builder

SingleProducer declares that only one goroutine will enqueue. Enables optimized algorithms for SPSC or SPMC patterns.

type Consumer

type Consumer[T any] interface {
	// Dequeue removes and returns an element from the queue (non-blocking).
	// Returns the dequeued element on success.
	// Returns (zero-value, ErrWouldBlock) if the queue is empty.
	//
	// Thread safety depends on queue type:
	//   - SPSC: single consumer only
	//   - MPSC: single consumer only
	//   - SPMC/MPMC: multiple consumers safe
	Dequeue() (T, error)
}

Consumer is the interface for dequeueing elements.

Consumer provides non-blocking dequeue operations. The element is returned by value (copied from the queue's internal buffer). The original slot is cleared to allow garbage collection of referenced objects.

For large types (>512 bytes), consider using QueuePtr or QueueIndirect instead to avoid copy overhead.
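
The non-blocking contract can be sketched without the library. In the block below, chanQueue, errWouldBlock, consumer, and drainAll are hypothetical names: the channel-backed queue is a stand-in that only imitates the documented return values (it is not lock-free), and errWouldBlock stands in for lfq.ErrWouldBlock.

```go
package main

import (
	"errors"
	"fmt"
)

// errWouldBlock stands in for lfq.ErrWouldBlock in this sketch.
var errWouldBlock = errors.New("would block")

// consumer mirrors the documented Consumer[T] method set.
type consumer[T any] interface {
	Dequeue() (T, error)
}

// chanQueue is a hypothetical channel-backed stand-in that follows the
// documented non-blocking contract. It is not part of lfq.
type chanQueue[T any] struct{ ch chan T }

func (q *chanQueue[T]) Enqueue(elem *T) error {
	select {
	case q.ch <- *elem: // copy the pointed-to value into the buffer
		return nil
	default:
		return errWouldBlock // full
	}
}

func (q *chanQueue[T]) Dequeue() (T, error) {
	select {
	case v := <-q.ch:
		return v, nil
	default:
		var zero T
		return zero, errWouldBlock // empty
	}
}

// drainAll dequeues until the queue reports an error (empty) and returns
// the elements in the order they were received.
func drainAll[T any](q consumer[T]) []T {
	var out []T
	for {
		v, err := q.Dequeue()
		if err != nil {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	q := &chanQueue[int]{ch: make(chan int, 4)}
	for i := 1; i <= 3; i++ {
		v := i
		_ = q.Enqueue(&v)
	}
	fmt.Println(drainAll[int](q)) // [1 2 3]
}
```

A loop like drainAll treats an empty queue as "done", which is only appropriate once all producers have finished.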

type ConsumerIndirect

type ConsumerIndirect interface {
	// Dequeue removes and returns an element from the queue.
	// Returns (0, ErrWouldBlock) immediately if the queue is empty.
	Dequeue() (uintptr, error)
}

ConsumerIndirect dequeues uintptr values (non-blocking).

type ConsumerPtr

type ConsumerPtr interface {
	// Dequeue removes and returns an element from the queue.
	// Returns (nil, ErrWouldBlock) immediately if the queue is empty.
	Dequeue() (unsafe.Pointer, error)
}

ConsumerPtr dequeues unsafe.Pointer values (non-blocking).

type Drainer added in v0.1.1

type Drainer interface {
	// Drain signals that no more enqueues will occur.
	// After Drain is called, Dequeue skips threshold checks, allowing
	// consumers to drain all remaining items without producer pressure.
	//
	// Drain is a hint — the caller must ensure no further Enqueue calls
	// will be made after calling Drain.
	Drain()
}

Drainer signals that no more enqueues will occur.

FAA-based queues (MPMC, SPMC, MPSC) implement this interface. SPSC queues do not implement Drainer as they have no threshold mechanism.

Call Drain after all producers have finished to allow consumers to drain remaining items without threshold blocking.

Example:

prodWg.Wait()  // Wait for producers to finish
if d, ok := q.(lfq.Drainer); ok {
    d.Drain()
}
// Consumers can now drain all remaining items
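
The optional-interface check in the example above can be exercised in isolation. The types faaLike and spscLike are hypothetical stand-ins, and drainer simply mirrors the documented Drain method; nothing here touches the real queues.

```go
package main

import "fmt"

// drainer mirrors the documented lfq.Drainer method set.
type drainer interface{ Drain() }

// faaLike and spscLike are hypothetical stand-ins: one implements Drain,
// the other does not, as the documentation describes for SPSC queues.
type faaLike struct{ drained bool }

func (q *faaLike) Drain() { q.drained = true }

type spscLike struct{}

// drainIfSupported calls Drain when q implements it and reports whether it did.
func drainIfSupported(q any) bool {
	d, ok := q.(drainer)
	if ok {
		d.Drain()
	}
	return ok
}

func main() {
	fmt.Println(drainIfSupported(&faaLike{}))  // true
	fmt.Println(drainIfSupported(&spscLike{})) // false
}
```

Using a type assertion keeps shutdown code generic: the same path works whether or not the selected algorithm has a threshold mechanism.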

type MPMC

type MPMC[T any] struct {
	// contains filtered or unexported fields
}

MPMC is an FAA-based multi-producer multi-consumer bounded queue.

Based on the SCQ (Scalable Circular Queue) algorithm by Nikolaev (DISC 2019). Uses Fetch-And-Add to blindly increment position counters, requiring 2n physical slots for capacity n. This approach scales better under high contention than CAS-based alternatives.

Cycle-based slot validation provides ABA safety: each slot tracks which "cycle" (round) it belongs to via cycle = position / capacity.

Memory: 2n slots for capacity n (16+ bytes per slot)
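
To illustrate why a cycle tag helps with ABA, the sketch below splits a position into a slot index and a cycle number. The function locate is hypothetical, and using a single ring size for both the index and the cycle is a simplification; the library's internal layout may differ.

```go
package main

import "fmt"

// locate splits a monotonically increasing position into a slot index and a
// cycle number for a ring of size slots (size must be a power of 2).
// Using one size for both values is a simplification of this sketch.
func locate(pos, size uint64) (index, cycle uint64) {
	return pos & (size - 1), pos / size
}

func main() {
	const size = 8
	for _, pos := range []uint64{3, 11, 19} {
		i, c := locate(pos, size)
		fmt.Printf("pos=%d index=%d cycle=%d\n", pos, i, c)
	}
}
```

All three positions land on index 3 but carry cycles 0, 1, and 2, so an operation holding a stale position can be rejected by comparing its cycle with the one stored in the slot.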

func NewMPMC

func NewMPMC[T any](capacity int) *MPMC[T]

NewMPMC creates a new FAA-based MPMC queue. Capacity rounds up to the next power of 2. Physical slot count is 2n for capacity n (SCQ requirement).

Example

ExampleNewMPMC demonstrates a multi-producer multi-consumer queue.

package main

import (
	"fmt"
	"sync"

	"code.hybscloud.com/iox"
	"code.hybscloud.com/lfq"
)

func main() {
	q := lfq.NewMPMC[string](16)

	// Producers
	var wg sync.WaitGroup
	for p := range 3 {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			backoff := iox.Backoff{}
			msg := fmt.Sprintf("msg from producer %d", id)
			for q.Enqueue(&msg) != nil {
				backoff.Wait()
			}
		}(p)
	}

	// Wait for producers then consume
	wg.Wait()

	for {
		msg, err := q.Dequeue()
		if err != nil {
			break
		}
		fmt.Println(msg)
	}

}
Output (order may vary):

msg from producer 0
msg from producer 1
msg from producer 2

func (*MPMC[T]) Cap

func (q *MPMC[T]) Cap() int

Cap returns the queue capacity.

func (*MPMC[T]) Dequeue

func (q *MPMC[T]) Dequeue() (T, error)

Dequeue removes and returns an element from the queue. Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*MPMC[T]) Drain added in v0.1.1

func (q *MPMC[T]) Drain()

Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.

func (*MPMC[T]) Enqueue

func (q *MPMC[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.

func (*MPMC[T]) Init added in v0.1.3

func (q *MPMC[T]) Init(capacity int)

Init initializes a zero-value MPMC queue in place. Capacity rounds up to the next power of 2.

type MPMCCompactIndirect

type MPMCCompactIndirect struct {
	// contains filtered or unexported fields
}

MPMCCompactIndirect is a compact MPMC queue for uintptr values.

Uses round-based empty detection: empty slots store (emptyFlag | round), filled slots store the value directly. This achieves 8 bytes per slot while allowing any 63-bit value (including zero) to be enqueued.

Memory: 8 bytes per slot
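
The slot encoding described above can be sketched as plain bit operations on a 64-bit word. The names emptyFlag, emptySlot, isEmpty, and fits are hypothetical, and the sketch assumes the flag is the high bit of a 64-bit slot, as the constructor documentation suggests.

```go
package main

import "fmt"

// emptyFlag marks a slot as empty; the high bit is the one the documentation
// reserves. The helper names below are hypothetical.
const emptyFlag uint64 = 1 << 63

// emptySlot encodes an empty slot for the given round.
func emptySlot(round uint64) uint64 { return emptyFlag | round }

// isEmpty reports whether a raw slot word holds an empty marker.
func isEmpty(slot uint64) bool { return slot&emptyFlag != 0 }

// fits reports whether v can be stored directly (high bit clear).
func fits(v uint64) bool { return v&emptyFlag == 0 }

func main() {
	fmt.Println(isEmpty(emptySlot(5))) // true
	fmt.Println(isEmpty(0))            // false: zero is a valid stored value
	fmt.Println(fits(1<<63 - 1))       // true
	fmt.Println(fits(1 << 63))         // false
}
```

Because emptiness is carried by the flag bit rather than by a sentinel value, zero remains a legal element, at the cost of one bit of value range.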

func NewMPMCCompactIndirect

func NewMPMCCompactIndirect(capacity int) *MPMCCompactIndirect

NewMPMCCompactIndirect creates a new compact MPMC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).

func (*MPMCCompactIndirect) Cap

func (q *MPMCCompactIndirect) Cap() int

Cap returns the queue capacity.

func (*MPMCCompactIndirect) Dequeue

func (q *MPMCCompactIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns a value from the queue. Returns (0, ErrWouldBlock) if the queue is empty.

func (*MPMCCompactIndirect) Enqueue

func (q *MPMCCompactIndirect) Enqueue(elem uintptr) error

Enqueue adds a value to the queue. Returns ErrWouldBlock if the queue is full. Values must fit in 63 bits (high bit must be 0).

func (*MPMCCompactIndirect) Init added in v0.1.3

func (q *MPMCCompactIndirect) Init(capacity int)

Init initializes a zero-value MPMCCompactIndirect queue in place. Capacity rounds up to the next power of 2.

type MPMCIndirect

type MPMCIndirect struct {
	// contains filtered or unexported fields
}

MPMCIndirect is an FAA-based MPMC queue for uintptr values.

Uses 128-bit atomic operations to pack cycle and value into a single atomic entry, reducing atomics per operation from 2-3 to 1. Based on SCQ algorithm (Nikolaev, DISC 2019) with 2n slots for capacity n.

Entry format: [lo=cycle | hi=value]

Memory: 2n slots, 16 bytes per slot (cycle + value in single Uint128)

func NewMPMCIndirect

func NewMPMCIndirect(capacity int) *MPMCIndirect

NewMPMCIndirect creates a new FAA-based MPMC queue for uintptr values. Capacity rounds up to the next power of 2. Physical slot count is 2n for capacity n.

Example

ExampleNewMPMCIndirect demonstrates pool index passing.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	// Simulate a buffer pool
	bufferPool := make([][]byte, 4)
	for i := range bufferPool {
		bufferPool[i] = make([]byte, 1024)
	}

	// Queue passes indices, not buffers
	q := lfq.NewMPMCIndirect(8)

	// Producer "allocates" from pool by passing index
	for i := range len(bufferPool) {
		q.Enqueue(uintptr(i))
	}

	// Consumer retrieves indices and accesses pool
	for range len(bufferPool) {
		idx, _ := q.Dequeue()
		buf := bufferPool[idx]
		fmt.Printf("Got buffer %d with len %d\n", idx, len(buf))
	}

}
Output:

Got buffer 0 with len 1024
Got buffer 1 with len 1024
Got buffer 2 with len 1024
Got buffer 3 with len 1024

func (*MPMCIndirect) Cap

func (q *MPMCIndirect) Cap() int

Cap returns the queue capacity.

func (*MPMCIndirect) Dequeue

func (q *MPMCIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns an element from the queue. Returns (0, ErrWouldBlock) if the queue is empty.

func (*MPMCIndirect) Drain added in v0.1.1

func (q *MPMCIndirect) Drain()

Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.

func (*MPMCIndirect) Enqueue

func (q *MPMCIndirect) Enqueue(elem uintptr) error

Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.

func (*MPMCIndirect) Init added in v0.1.3

func (q *MPMCIndirect) Init(capacity int)

Init initializes a zero-value MPMCIndirect queue in place. Capacity rounds up to the next power of 2.

type MPMCIndirectSeq

type MPMCIndirectSeq struct {
	// contains filtered or unexported fields
}

MPMCIndirectSeq is a CAS-based MPMC queue for uintptr values.

Uses 128-bit atomic operations to pack sequence and value into a single atomic entry, reducing atomics per operation from 2-3 to 1.

This is the Compact variant using n slots. Use NewMPMCIndirect for the default FAA-based implementation with 2n slots and better scalability.

Entry format: [lo=sequence | hi=value]

Memory: n slots, 16 bytes per slot

func NewMPMCIndirectSeq

func NewMPMCIndirectSeq(capacity int) *MPMCIndirectSeq

NewMPMCIndirectSeq creates a new CAS-based MPMC queue for uintptr values. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMCIndirect for the default FAA-based implementation.

func (*MPMCIndirectSeq) Cap

func (q *MPMCIndirectSeq) Cap() int

Cap returns the queue capacity.

func (*MPMCIndirectSeq) Dequeue

func (q *MPMCIndirectSeq) Dequeue() (uintptr, error)

Dequeue removes and returns an element from the queue. Returns (0, ErrWouldBlock) if the queue is empty.

func (*MPMCIndirectSeq) Enqueue

func (q *MPMCIndirectSeq) Enqueue(elem uintptr) error

Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.

func (*MPMCIndirectSeq) Init added in v0.1.3

func (q *MPMCIndirectSeq) Init(capacity int)

Init initializes a zero-value MPMCIndirectSeq queue in place. Capacity rounds up to the next power of 2.

type MPMCPtr

type MPMCPtr struct {
	// contains filtered or unexported fields
}

MPMCPtr is an FAA-based MPMC queue for unsafe.Pointer values.

Uses 128-bit atomic operations to pack cycle and pointer into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.

Entry format: [lo=cycle | hi=pointer as uint64]

Memory: 2n slots, 16 bytes per slot

func NewMPMCPtr

func NewMPMCPtr(capacity int) *MPMCPtr

NewMPMCPtr creates a new FAA-based MPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.

Example

ExampleNewMPMCPtr demonstrates zero-copy pointer passing.

package main

import (
	"fmt"
	"slices"
	"unsafe"

	"code.hybscloud.com/lfq"
)

func main() {
	type Message struct {
		ID   int
		Data string
	}

	q := lfq.NewMPMCPtr(8)

	// Producer creates and enqueues messages
	messages := []*Message{
		{ID: 1, Data: "hello"},
		{ID: 2, Data: "world"},
	}

	for msg := range slices.Values(messages) {
		q.Enqueue(unsafe.Pointer(msg))
	}

	// Consumer receives pointers directly - no copy
	for {
		ptr, err := q.Dequeue()
		if err != nil {
			break
		}
		msg := (*Message)(ptr)
		fmt.Printf("Message %d: %s\n", msg.ID, msg.Data)
	}

}
Output:

Message 1: hello
Message 2: world

func (*MPMCPtr) Cap

func (q *MPMCPtr) Cap() int

Cap returns the queue capacity.

func (*MPMCPtr) Dequeue

func (q *MPMCPtr) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element from the queue. Returns (nil, ErrWouldBlock) if the queue is empty.

func (*MPMCPtr) Drain added in v0.1.1

func (q *MPMCPtr) Drain()

Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.

func (*MPMCPtr) Enqueue

func (q *MPMCPtr) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.

func (*MPMCPtr) Init added in v0.1.3

func (q *MPMCPtr) Init(capacity int)

Init initializes a zero-value MPMCPtr queue in place. Capacity rounds up to the next power of 2.

type MPMCPtrSeq

type MPMCPtrSeq struct {
	// contains filtered or unexported fields
}

MPMCPtrSeq is a CAS-based MPMC queue for unsafe.Pointer values.

Uses 128-bit atomic operations to pack sequence and pointer into a single atomic entry, reducing atomics per operation from 2-3 to 1.

This is the Compact variant using n slots. Use NewMPMCPtr for the default FAA-based implementation with 2n slots and better scalability.

Entry format: [lo=sequence | hi=pointer as uint64]

Memory: n slots, 16 bytes per slot

func NewMPMCPtrSeq

func NewMPMCPtrSeq(capacity int) *MPMCPtrSeq

NewMPMCPtrSeq creates a new CAS-based MPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMCPtr for the default FAA-based implementation.

func (*MPMCPtrSeq) Cap

func (q *MPMCPtrSeq) Cap() int

Cap returns the queue capacity.

func (*MPMCPtrSeq) Dequeue

func (q *MPMCPtrSeq) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element from the queue. Returns (nil, ErrWouldBlock) if the queue is empty.

func (*MPMCPtrSeq) Enqueue

func (q *MPMCPtrSeq) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.

func (*MPMCPtrSeq) Init added in v0.1.3

func (q *MPMCPtrSeq) Init(capacity int)

Init initializes a zero-value MPMCPtrSeq queue in place. Capacity rounds up to the next power of 2.

type MPMCSeq

type MPMCSeq[T any] struct {
	// contains filtered or unexported fields
}

MPMCSeq is a CAS-based multi-producer multi-consumer bounded queue.

Uses per-slot sequence numbers which provide:

  • Full ABA safety via sequence-based validation
  • Works with both distinct and non-distinct values
  • Good performance under moderate contention

This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewMPMC for the default FAA-based implementation with better scalability.

Memory: n slots (16+ bytes per slot)

func NewMPMCSeq

func NewMPMCSeq[T any](capacity int) *MPMCSeq[T]

NewMPMCSeq creates a new CAS-based MPMC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMC for the default FAA-based implementation.

func (*MPMCSeq[T]) Cap

func (q *MPMCSeq[T]) Cap() int

Cap returns the queue capacity.

func (*MPMCSeq[T]) Dequeue

func (q *MPMCSeq[T]) Dequeue() (T, error)

Dequeue removes and returns an element from the queue. Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*MPMCSeq[T]) Enqueue

func (q *MPMCSeq[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.

func (*MPMCSeq[T]) Init added in v0.1.3

func (q *MPMCSeq[T]) Init(capacity int)

Init initializes a zero-value MPMCSeq queue in place. Capacity rounds up to the next power of 2.

type MPSC

type MPSC[T any] struct {
	// contains filtered or unexported fields
}

MPSC is an FAA-based multi-producer single-consumer bounded queue.

Producers use FAA to blindly claim positions (SCQ-style), requiring 2n physical slots for capacity n.

Memory: 2n slots for capacity n (16+ bytes per slot)

Example (EventAggregation)

ExampleMPSC_eventAggregation demonstrates using MPSC for event aggregation.

package main

import (
	"fmt"
	"slices"
	"sync"

	"code.hybscloud.com/atomix"
	"code.hybscloud.com/iox"
	"code.hybscloud.com/lfq"
)

func main() {
	type Event struct {
		Source string
		Value  int
	}

	q := lfq.NewMPSC[Event](64)

	// Multiple event sources (producers)
	var wg sync.WaitGroup
	var total atomix.Int64

	for source := range slices.Values([]string{"sensor-A", "sensor-B", "sensor-C"}) {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			backoff := iox.Backoff{}
			for i := 1; i <= 3; i++ {
				ev := Event{Source: name, Value: i}
				for q.Enqueue(&ev) != nil {
					backoff.Wait()
				}
				backoff.Reset()
				total.Add(1)
			}
		}(source)
	}

	// Wait for producers
	wg.Wait()

	// Single consumer aggregates all events
	var sum int
	for {
		ev, err := q.Dequeue()
		if err != nil {
			break
		}
		sum += ev.Value
	}

	fmt.Printf("Total events: %d, Sum of values: %d\n", total.Load(), sum)

}
Output:

Total events: 9, Sum of values: 18

func NewMPSC

func NewMPSC[T any](capacity int) *MPSC[T]

NewMPSC creates a new FAA-based MPSC queue. Capacity rounds up to the next power of 2.

func (*MPSC[T]) Cap

func (q *MPSC[T]) Cap() int

Cap returns the queue capacity.

func (*MPSC[T]) Dequeue

func (q *MPSC[T]) Dequeue() (T, error)

Dequeue removes and returns an element (single consumer only). Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*MPSC[T]) Drain added in v0.1.1

func (q *MPSC[T]) Drain()

Drain signals that no more enqueues will occur. This is a hint for graceful shutdown — the caller ensures no further enqueues will be attempted after calling Drain.

func (*MPSC[T]) Enqueue

func (q *MPSC[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.

func (*MPSC[T]) Init added in v0.1.3

func (q *MPSC[T]) Init(capacity int)

Init initializes a zero-value MPSC queue in place. Capacity rounds up to the next power of 2.

type MPSCCompactIndirect

type MPSCCompactIndirect struct {
	// contains filtered or unexported fields
}

MPSCCompactIndirect is a compact MPSC queue for uintptr values.

Uses round-based empty detection. Multiple producers use CAS, single consumer reads sequentially.

Memory: 8 bytes per slot

func NewMPSCCompactIndirect

func NewMPSCCompactIndirect(capacity int) *MPSCCompactIndirect

NewMPSCCompactIndirect creates a new compact MPSC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).

func (*MPSCCompactIndirect) Cap

func (q *MPSCCompactIndirect) Cap() int

Cap returns the queue capacity.

func (*MPSCCompactIndirect) Dequeue

func (q *MPSCCompactIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns a value (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.

func (*MPSCCompactIndirect) Enqueue

func (q *MPSCCompactIndirect) Enqueue(elem uintptr) error

Enqueue adds a value (multiple producers safe). Returns ErrWouldBlock if the queue is full. Values must fit in 63 bits (high bit must be 0).

func (*MPSCCompactIndirect) Init added in v0.1.3

func (q *MPSCCompactIndirect) Init(capacity int)

Init initializes a zero-value MPSCCompactIndirect queue in place. Capacity rounds up to the next power of 2.

type MPSCIndirect

type MPSCIndirect struct {
	// contains filtered or unexported fields
}

MPSCIndirect is an FAA-based MPSC queue for uintptr values.

Uses 128-bit atomic operations to pack cycle and value into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.

Entry format: [lo=cycle | hi=value]

Memory: 2n slots, 16 bytes per slot

func NewMPSCIndirect

func NewMPSCIndirect(capacity int) *MPSCIndirect

NewMPSCIndirect creates a new FAA-based MPSC queue for uintptr values. Capacity rounds up to the next power of 2.

func (*MPSCIndirect) Cap

func (q *MPSCIndirect) Cap() int

Cap returns the queue capacity.

func (*MPSCIndirect) Dequeue

func (q *MPSCIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns an element (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.

func (*MPSCIndirect) Drain added in v0.1.1

func (q *MPSCIndirect) Drain()

Drain signals that no more enqueues will occur. This is a hint for graceful shutdown — the caller ensures no further enqueues will be attempted after calling Drain.

func (*MPSCIndirect) Enqueue

func (q *MPSCIndirect) Enqueue(elem uintptr) error

Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.

func (*MPSCIndirect) Init added in v0.1.3

func (q *MPSCIndirect) Init(capacity int)

Init initializes a zero-value MPSCIndirect queue in place. Capacity rounds up to the next power of 2.

type MPSCIndirectSeq

type MPSCIndirectSeq struct {
	// contains filtered or unexported fields
}

MPSCIndirectSeq is a multi-producer single-consumer queue for uintptr values.

Producers use CAS to claim slots. The single consumer reads sequentially.

Entry format: [lo=sequence | hi=value]

Memory: 16 bytes per slot (sequence + value in single Uint128)

func NewMPSCIndirectSeq

func NewMPSCIndirectSeq(capacity int) *MPSCIndirectSeq

NewMPSCIndirectSeq creates a new MPSC queue for uintptr values. Capacity rounds up to the next power of 2.

func (*MPSCIndirectSeq) Cap

func (q *MPSCIndirectSeq) Cap() int

Cap returns the queue capacity.

func (*MPSCIndirectSeq) Dequeue

func (q *MPSCIndirectSeq) Dequeue() (uintptr, error)

Dequeue removes and returns an element (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.

func (*MPSCIndirectSeq) Enqueue

func (q *MPSCIndirectSeq) Enqueue(elem uintptr) error

Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.

func (*MPSCIndirectSeq) Init added in v0.1.3

func (q *MPSCIndirectSeq) Init(capacity int)

Init initializes a zero-value MPSCIndirectSeq queue in place. Capacity rounds up to the next power of 2.

type MPSCPtr

type MPSCPtr struct {
	// contains filtered or unexported fields
}

MPSCPtr is an FAA-based MPSC queue for unsafe.Pointer values.

Uses 128-bit atomic operations. Based on SCQ algorithm with 2n slots.

Entry format: [lo=cycle | hi=pointer as uint64]

Memory: 2n slots, 16 bytes per slot

func NewMPSCPtr

func NewMPSCPtr(capacity int) *MPSCPtr

NewMPSCPtr creates a new FAA-based MPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.

func (*MPSCPtr) Cap

func (q *MPSCPtr) Cap() int

Cap returns the queue capacity.

func (*MPSCPtr) Dequeue

func (q *MPSCPtr) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element (single consumer only). Returns (nil, ErrWouldBlock) if the queue is empty.

func (*MPSCPtr) Drain added in v0.1.1

func (q *MPSCPtr) Drain()

Drain signals that no more enqueues will occur. This is a hint for graceful shutdown — the caller ensures no further enqueues will be attempted after calling Drain.

func (*MPSCPtr) Enqueue

func (q *MPSCPtr) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.

func (*MPSCPtr) Init added in v0.1.3

func (q *MPSCPtr) Init(capacity int)

Init initializes a zero-value MPSCPtr queue in place. Capacity rounds up to the next power of 2.

type MPSCPtrSeq

type MPSCPtrSeq struct {
	// contains filtered or unexported fields
}

MPSCPtrSeq is a multi-producer single-consumer queue for unsafe.Pointer values.

Entry format: [lo=sequence | hi=pointer as uint64]

Memory: 16 bytes per slot

func NewMPSCPtrSeq

func NewMPSCPtrSeq(capacity int) *MPSCPtrSeq

NewMPSCPtrSeq creates a new MPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.

func (*MPSCPtrSeq) Cap

func (q *MPSCPtrSeq) Cap() int

Cap returns the queue capacity.

func (*MPSCPtrSeq) Dequeue

func (q *MPSCPtrSeq) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element (single consumer only). Returns (nil, ErrWouldBlock) if the queue is empty.

func (*MPSCPtrSeq) Enqueue

func (q *MPSCPtrSeq) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element (multiple producers safe). Returns ErrWouldBlock if the queue is full.

func (*MPSCPtrSeq) Init added in v0.1.3

func (q *MPSCPtrSeq) Init(capacity int)

Init initializes a zero-value MPSCPtrSeq queue in place. Capacity rounds up to the next power of 2.

type MPSCSeq

type MPSCSeq[T any] struct {
	// contains filtered or unexported fields
}

MPSCSeq is a CAS-based multi-producer single-consumer bounded queue.

Producers use CAS to claim slots. The single consumer reads sequentially.

This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewMPSC for the default FAA-based implementation with better scalability.

Memory: n slots (16 bytes per slot)

func NewMPSCSeq

func NewMPSCSeq[T any](capacity int) *MPSCSeq[T]

NewMPSCSeq creates a new CAS-based MPSC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPSC for the default FAA-based implementation.

func (*MPSCSeq[T]) Cap

func (q *MPSCSeq[T]) Cap() int

Cap returns the queue capacity.

func (*MPSCSeq[T]) Dequeue

func (q *MPSCSeq[T]) Dequeue() (T, error)

Dequeue removes and returns an element (single consumer only). Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*MPSCSeq[T]) Enqueue

func (q *MPSCSeq[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.

func (*MPSCSeq[T]) Init added in v0.1.3

func (q *MPSCSeq[T]) Init(capacity int)

Init initializes a zero-value MPSCSeq queue in place. Capacity rounds up to the next power of 2.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options configures queue creation and algorithm selection.

type Producer

type Producer[T any] interface {
	// Enqueue adds an element to the queue (non-blocking).
	// The element is copied into the queue's internal buffer.
	// Returns nil on success, ErrWouldBlock if the queue is full.
	//
	// Thread safety depends on queue type:
	//   - SPSC: single producer only
	//   - MPSC/MPMC: multiple producers safe
	//   - SPMC: single producer only
	Enqueue(elem *T) error
}

Producer is the interface for enqueueing elements.

Producer provides non-blocking enqueue operations. The element is passed by pointer so that large structs are not copied at the call site. The queue then stores its own copy of the pointed-to value, so the original can be modified or reused after Enqueue returns.
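
The copy-on-enqueue behavior means a producer can reuse one variable for successive elements. The sketch below shows this with sliceQueue, a hypothetical single-goroutine stand-in that is not part of lfq and only imitates the documented semantics.

```go
package main

import "fmt"

// sliceQueue is a hypothetical single-goroutine stand-in, not part of lfq.
// It only illustrates the copy-on-enqueue contract described above.
type sliceQueue[T any] struct{ buf []T }

// Enqueue stores a copy of *elem, so later writes through elem are not seen.
func (q *sliceQueue[T]) Enqueue(elem *T) error {
	q.buf = append(q.buf, *elem)
	return nil
}

type event struct{ Value int }

func main() {
	q := &sliceQueue[event]{}
	ev := event{Value: 1}
	_ = q.Enqueue(&ev)
	ev.Value = 2 // reuse the same variable for the next element
	_ = q.Enqueue(&ev)
	fmt.Println(q.buf) // [{1} {2}]
}
```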

type ProducerIndirect

type ProducerIndirect interface {
	// Enqueue adds an element to the queue.
	// Returns ErrWouldBlock immediately if the queue is full.
	Enqueue(elem uintptr) error
}

ProducerIndirect enqueues uintptr values (non-blocking).

type ProducerPtr

type ProducerPtr interface {
	// Enqueue adds an element to the queue.
	// Returns ErrWouldBlock immediately if the queue is full.
	Enqueue(elem unsafe.Pointer) error
}

ProducerPtr enqueues unsafe.Pointer values (non-blocking).

type Queue

type Queue[T any] interface {
	Producer[T]
	Consumer[T]
	Cap() int
}

Queue is the combined producer-consumer interface for a FIFO queue.

Queue provides non-blocking Enqueue and Dequeue operations. Both operations return ErrWouldBlock when they cannot proceed (queue full or empty).

The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization. Track counts in application logic when needed.

Example:

q := lfq.NewMPMC[int](1024)

// Enqueue
if err := q.Enqueue(new(42)); err != nil {
    // Handle full queue
}

// Dequeue
elem, err := q.Dequeue()
if err == nil {
    fmt.Println(elem)
}
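
One way to track counts in application logic is a thin wrapper with an atomic counter. Everything below is a sketch under assumptions: counted, queue, chanQueue, and errWouldBlock are hypothetical names, the channel-backed queue is a stand-in rather than a real lfq queue, and the reported length is only approximate under concurrency.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errWouldBlock stands in for lfq.ErrWouldBlock in this sketch.
var errWouldBlock = errors.New("would block")

// queue mirrors the documented Queue[T] method set.
type queue[T any] interface {
	Enqueue(elem *T) error
	Dequeue() (T, error)
	Cap() int
}

// chanQueue is a hypothetical channel-backed stand-in, not part of lfq.
type chanQueue[T any] struct{ ch chan T }

func (q *chanQueue[T]) Cap() int { return cap(q.ch) }

func (q *chanQueue[T]) Enqueue(elem *T) error {
	select {
	case q.ch <- *elem:
		return nil
	default:
		return errWouldBlock
	}
}

func (q *chanQueue[T]) Dequeue() (T, error) {
	select {
	case v := <-q.ch:
		return v, nil
	default:
		var zero T
		return zero, errWouldBlock
	}
}

// counted is a hypothetical wrapper that tracks an approximate length.
type counted[T any] struct {
	q queue[T]
	n atomic.Int64
}

func (c *counted[T]) Enqueue(elem *T) error {
	err := c.q.Enqueue(elem)
	if err == nil {
		c.n.Add(1)
	}
	return err
}

func (c *counted[T]) Dequeue() (T, error) {
	v, err := c.q.Dequeue()
	if err == nil {
		c.n.Add(-1)
	}
	return v, err
}

// Len is approximate: the counter is updated after the queue operation, so
// concurrent readers may briefly observe a stale or even negative value.
func (c *counted[T]) Len() int64 { return c.n.Load() }

func main() {
	c := &counted[int]{q: &chanQueue[int]{ch: make(chan int, 4)}}
	for i := 0; i < 3; i++ {
		v := i
		_ = c.Enqueue(&v)
	}
	_, _ = c.Dequeue()
	fmt.Println(c.Len()) // 2
}
```

The extra atomic add on every operation is the cost that the Queue interface avoids by leaving length out.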

func Build

func Build[T any](b *Builder) Queue[T]

Build creates a Queue[T] with automatic algorithm selection.

Algorithm selection:

SingleProducer + SingleConsumer → SPSC (Lamport ring buffer)
SingleProducer only             → SPMC (FAA default, CAS if Compact)
SingleConsumer only             → MPSC (FAA default, CAS if Compact)
Neither                         → MPMC (FAA default, CAS if Compact)

Default: FAA-based algorithms with 2n physical slots (better scalability). Compact(): CAS-based algorithms with n slots (half memory footprint).

For type-safe returns with concrete types, use:

  • BuildSPSC[T](b) → *SPSC[T]
  • BuildMPSC[T](b) → *MPSC[T] (or *MPSCSeq[T] if Compact)
  • BuildSPMC[T](b) → *SPMC[T] (or *SPMCSeq[T] if Compact)
  • BuildMPMC[T](b) → *MPMC[T] (or *MPMCSeq[T] if Compact)

Example

ExampleBuild demonstrates the builder API for automatic algorithm selection.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	// SPSC - both constraints
	spsc := lfq.Build[int](lfq.New(64).SingleProducer().SingleConsumer())

	// MPSC - only single consumer constraint
	mpsc := lfq.Build[int](lfq.New(64).SingleConsumer())

	// SPMC - only single producer constraint
	spmc := lfq.Build[int](lfq.New(64).SingleProducer())

	// MPMC - no constraints
	mpmc := lfq.Build[int](lfq.New(64))

	fmt.Println("SPSC capacity:", spsc.Cap())
	fmt.Println("MPSC capacity:", mpsc.Cap())
	fmt.Println("SPMC capacity:", spmc.Cap())
	fmt.Println("MPMC capacity:", mpmc.Cap())

}
Output:

SPSC capacity: 64
MPSC capacity: 64
SPMC capacity: 64
MPMC capacity: 64
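
The selection table for Build can be restated as a small pure function. selectAlgorithm is a hypothetical helper written for illustration; it encodes only what the table above says and is not the library's implementation.

```go
package main

import "fmt"

// selectAlgorithm is a hypothetical helper that restates the documented
// selection table; it is not the library's implementation.
func selectAlgorithm(singleProducer, singleConsumer, compact bool) string {
	if singleProducer && singleConsumer {
		return "SPSC (Lamport ring buffer)" // Compact is ignored for SPSC
	}
	family := "FAA, 2n slots"
	if compact {
		family = "CAS, n slots"
	}
	switch {
	case singleProducer:
		return "SPMC (" + family + ")"
	case singleConsumer:
		return "MPSC (" + family + ")"
	default:
		return "MPMC (" + family + ")"
	}
}

func main() {
	fmt.Println(selectAlgorithm(true, true, true))   // SPSC (Lamport ring buffer)
	fmt.Println(selectAlgorithm(false, true, false)) // MPSC (FAA, 2n slots)
	fmt.Println(selectAlgorithm(false, false, true)) // MPMC (CAS, n slots)
}
```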

func BuildMPMC

func BuildMPMC[T any](b *Builder) Queue[T]

BuildMPMC creates an MPMC queue with compile-time type safety. Panics if builder has any constraints set.

func BuildMPSC

func BuildMPSC[T any](b *Builder) Queue[T]

BuildMPSC creates an MPSC queue with compile-time type safety. Panics if builder is not configured with SingleConsumer() only.

func BuildSPMC

func BuildSPMC[T any](b *Builder) Queue[T]

BuildSPMC creates an SPMC queue with compile-time type safety. Panics if builder is not configured with SingleProducer() only.

type QueueIndirect

type QueueIndirect interface {
	ProducerIndirect
	ConsumerIndirect
	Cap() int
}

QueueIndirect is the combined interface for indirect (uintptr) queues.

QueueIndirect passes indices or handles instead of full objects. This is useful for buffer pools, object pools, or any index-based data structure.

The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization.

Example (buffer pool):

pool := make([][]byte, 1024)
freeList := lfq.NewSPSCIndirect(1024)

// Initialize pool
for i := range pool {
    pool[i] = make([]byte, 4096)
    freeList.Enqueue(uintptr(i))
}

// Allocate
idx, _ := freeList.Dequeue()
buf := pool[idx]

// Free
freeList.Enqueue(idx)

type QueuePtr

type QueuePtr interface {
	ProducerPtr
	ConsumerPtr
	Cap() int
}

QueuePtr is the combined interface for unsafe.Pointer queues.

QueuePtr passes pointers directly without copying. This enables zero-copy transfer of objects between goroutines. The producer creates an object, enqueues its pointer, and the consumer receives the same pointer.

Ownership semantics: The producer transfers ownership to the consumer. After enqueueing, the producer should not access the object.

The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization.

Example:

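type Message struct {
    Data []byte
}

q := lfq.NewMPMCPtr(1024)

// Producer
msg := &Message{Data: largePayload}
if err := q.Enqueue(unsafe.Pointer(msg)); err != nil {
    // enqueue failed: msg was not transferred and is still owned here
}
// on success, ownership is transferred - do not use msg after this

// Consumer (usually a different goroutine)
ptr, err := q.Dequeue()
if err == nil {
    received := (*Message)(ptr)
    // received is now owned by the consumer
}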

type SPMC

type SPMC[T any] struct {
	// contains filtered or unexported fields
}

SPMC is a single-producer multi-consumer bounded queue based on fetch-and-add (FAA).

Consumers use FAA to claim positions blindly (SCQ-style), without first checking whether a position holds data. This requires 2n physical slots for capacity n.

Memory: 2n slots for capacity n (16+ bytes per slot)

func NewSPMC

func NewSPMC[T any](capacity int) *SPMC[T]

NewSPMC creates a new FAA-based SPMC queue. Capacity rounds up to the next power of 2.
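
The rounding rule can be illustrated with math/bits. This is a sketch, not the package's internal code: roundUpPow2 is a hypothetical helper, and its treatment of capacities below 2 is an assumption, since the documentation does not state how lfq handles them.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 is a hypothetical helper: it returns the smallest power of two
// that is >= n. For n <= 1 this sketch returns 1 (an assumption).
func roundUpPow2(n int) int {
	if n <= 1 {
		return 1
	}
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{1000, 1024, 1025} {
		fmt.Println(n, "->", roundUpPow2(n))
	}
}
```

A request for 1000 therefore yields a queue of 1024, while 1025 yields 2048. Checking Cap() after construction avoids surprises when sizing related buffers.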

func (*SPMC[T]) Cap

func (q *SPMC[T]) Cap() int

Cap returns the queue capacity.

func (*SPMC[T]) Dequeue

func (q *SPMC[T]) Dequeue() (T, error)

Dequeue removes and returns an element (multiple consumers safe). Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*SPMC[T]) Drain added in v0.1.1

func (q *SPMC[T]) Drain()

Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
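
One possible shutdown order around Drain is sketched below. The sketch does not use lfq: stubQueue is a mutex-guarded stand-in with the same method shapes, its Drain is a no-op, and errWouldBlock, run, and the done flag are hypothetical names. It only shows the sequence "finish enqueuing, call Drain, let consumers empty the queue".

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

// stubQueue is NOT lock-free; it exists only so the shutdown order can run.
type stubQueue struct {
	mu    sync.Mutex
	items []int
	limit int
}

func (q *stubQueue) Enqueue(elem *int) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == q.limit {
		return errWouldBlock
	}
	q.items = append(q.items, *elem)
	return nil
}

func (q *stubQueue) Dequeue() (int, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, errWouldBlock
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, nil
}

func (q *stubQueue) Drain() {} // no-op in this stand-in

// run produces 1..n, calls Drain, and returns the sum seen by the consumers.
func run(n, consumers int) int64 {
	q := &stubQueue{limit: 8}
	var done atomic.Bool
	var sum atomic.Int64
	var wg sync.WaitGroup
	for c := 0; c < consumers; c++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				// Read the flag before polling so a final empty result is conclusive.
				finished := done.Load()
				v, err := q.Dequeue()
				if err == nil {
					sum.Add(int64(v))
					continue
				}
				if finished {
					return
				}
				runtime.Gosched()
			}
		}()
	}
	for i := 1; i <= n; i++ {
		v := i
		for q.Enqueue(&v) != nil {
			runtime.Gosched() // queue full; let consumers run
		}
	}
	q.Drain()        // 1. announce that no more enqueues will occur
	done.Store(true) // 2. let consumers exit once they observe an empty queue
	wg.Wait()
	return sum.Load()
}

func main() { fmt.Println(run(100, 3)) }
```

The separate done flag is a design choice of this sketch; the documentation above does not say how consumers should learn that Drain was called.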

func (*SPMC[T]) Enqueue

func (q *SPMC[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue (single producer only). Returns ErrWouldBlock if the queue is full.

func (*SPMC[T]) Init added in v0.1.3

func (q *SPMC[T]) Init(capacity int)

Init initializes a zero-value SPMC queue in place. Capacity rounds up to the next power of 2.

type SPMCCompactIndirect

type SPMCCompactIndirect struct {
	// contains filtered or unexported fields
}

SPMCCompactIndirect is a compact SPMC queue for uintptr values.

Uses round-based empty detection. The single producer writes sequentially; multiple consumers claim slots with CAS.

Memory: 8 bytes per slot

func NewSPMCCompactIndirect

func NewSPMCCompactIndirect(capacity int) *SPMCCompactIndirect

NewSPMCCompactIndirect creates a new compact SPMC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
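
The 63-bit limit follows from reserving the top bit of each 8-byte slot. The sketch below shows one way a caller might validate values before enqueueing. The names emptyFlag, errTooLarge, encode, and isEmpty are hypothetical, and how lfq itself reacts to an oversized value is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// emptyFlag models the reserved high bit of a 64-bit slot word (assumed layout).
const emptyFlag uint64 = 1 << 63

// errTooLarge is a hypothetical error for values that need all 64 bits.
var errTooLarge = errors.New("value does not fit in 63 bits")

// encode checks that v leaves the high bit free and returns the slot word.
func encode(v uint64) (uint64, error) {
	if v&emptyFlag != 0 {
		return 0, errTooLarge
	}
	return v, nil
}

// isEmpty reports whether a slot word carries the empty flag.
func isEmpty(word uint64) bool { return word&emptyFlag != 0 }

func main() {
	_, err := encode(1 << 63)
	fmt.Println("1<<63 accepted:", err == nil)
	w, _ := encode(12345)
	fmt.Println("12345 stored as", w, "empty:", isEmpty(w))
}
```

Pool indices and small handles fit comfortably; the limit matters mainly when raw addresses or tagged values that use the top bit are passed.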

func (*SPMCCompactIndirect) Cap

func (q *SPMCCompactIndirect) Cap() int

Cap returns the queue capacity.

func (*SPMCCompactIndirect) Dequeue

func (q *SPMCCompactIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns a value (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.

func (*SPMCCompactIndirect) Enqueue

func (q *SPMCCompactIndirect) Enqueue(elem uintptr) error

Enqueue adds a value (single producer only). Values must fit in 63 bits. Returns ErrWouldBlock if the queue is full.

func (*SPMCCompactIndirect) Init added in v0.1.3

func (q *SPMCCompactIndirect) Init(capacity int)

Init initializes a zero-value SPMCCompactIndirect queue in place. Capacity rounds up to the next power of 2.

type SPMCIndirect

type SPMCIndirect struct {
	// contains filtered or unexported fields
}

SPMCIndirect is an FAA-based SPMC queue for uintptr values.

Uses 128-bit atomic operations to pack cycle and value into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.

Entry format: [lo=cycle | hi=value]

Memory: 2n slots, 16 bytes per slot

func NewSPMCIndirect

func NewSPMCIndirect(capacity int) *SPMCIndirect

NewSPMCIndirect creates a new FAA-based SPMC queue for uintptr values. Capacity rounds up to the next power of 2.

func (*SPMCIndirect) Cap

func (q *SPMCIndirect) Cap() int

Cap returns the queue capacity.

func (*SPMCIndirect) Dequeue

func (q *SPMCIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns an element (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.

func (*SPMCIndirect) Drain added in v0.1.1

func (q *SPMCIndirect) Drain()

Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.

func (*SPMCIndirect) Enqueue

func (q *SPMCIndirect) Enqueue(elem uintptr) error

Enqueue adds an element to the queue (single producer only). Returns ErrWouldBlock if the queue is full.

func (*SPMCIndirect) Init added in v0.1.3

func (q *SPMCIndirect) Init(capacity int)

Init initializes a zero-value SPMCIndirect queue in place. Capacity rounds up to the next power of 2.

type SPMCIndirectSeq

type SPMCIndirectSeq struct {
	// contains filtered or unexported fields
}

SPMCIndirectSeq is a single-producer multi-consumer queue for uintptr values.

The single producer writes sequentially. Consumers use CAS to claim slots.

Entry format: [lo=sequence | hi=value]

Memory: 16 bytes per slot (sequence + value in single Uint128)

func NewSPMCIndirectSeq

func NewSPMCIndirectSeq(capacity int) *SPMCIndirectSeq

NewSPMCIndirectSeq creates a new SPMC queue for uintptr values. Capacity rounds up to the next power of 2.

func (*SPMCIndirectSeq) Cap

func (q *SPMCIndirectSeq) Cap() int

Cap returns the queue capacity.

func (*SPMCIndirectSeq) Dequeue

func (q *SPMCIndirectSeq) Dequeue() (uintptr, error)

Dequeue removes and returns an element (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.

func (*SPMCIndirectSeq) Enqueue

func (q *SPMCIndirectSeq) Enqueue(elem uintptr) error

Enqueue adds an element (single producer only). Returns ErrWouldBlock if the queue is full.

func (*SPMCIndirectSeq) Init added in v0.1.3

func (q *SPMCIndirectSeq) Init(capacity int)

Init initializes a zero-value SPMCIndirectSeq queue in place. Capacity rounds up to the next power of 2.

type SPMCPtr

type SPMCPtr struct {
	// contains filtered or unexported fields
}

SPMCPtr is an FAA-based SPMC queue for unsafe.Pointer values.

Uses 128-bit atomic operations. Based on SCQ algorithm with 2n slots.

Entry format: [lo=cycle | hi=pointer as uint64]

Memory: 2n slots, 16 bytes per slot

func NewSPMCPtr

func NewSPMCPtr(capacity int) *SPMCPtr

NewSPMCPtr creates a new FAA-based SPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.

func (*SPMCPtr) Cap

func (q *SPMCPtr) Cap() int

Cap returns the queue capacity.

func (*SPMCPtr) Dequeue

func (q *SPMCPtr) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element (multiple consumers safe). Returns (nil, ErrWouldBlock) if the queue is empty.

func (*SPMCPtr) Drain added in v0.1.1

func (q *SPMCPtr) Drain()

Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.

func (*SPMCPtr) Enqueue

func (q *SPMCPtr) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element to the queue (single producer only). Returns ErrWouldBlock if the queue is full.

func (*SPMCPtr) Init added in v0.1.3

func (q *SPMCPtr) Init(capacity int)

Init initializes a zero-value SPMCPtr queue in place. Capacity rounds up to the next power of 2.

type SPMCPtrSeq

type SPMCPtrSeq struct {
	// contains filtered or unexported fields
}

SPMCPtrSeq is a single-producer multi-consumer queue for unsafe.Pointer values.

Entry format: [lo=sequence | hi=pointer as uint64]

Memory: 16 bytes per slot

func NewSPMCPtrSeq

func NewSPMCPtrSeq(capacity int) *SPMCPtrSeq

NewSPMCPtrSeq creates a new SPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.

func (*SPMCPtrSeq) Cap

func (q *SPMCPtrSeq) Cap() int

Cap returns the queue capacity.

func (*SPMCPtrSeq) Dequeue

func (q *SPMCPtrSeq) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element (multiple consumers safe). Returns (nil, ErrWouldBlock) if the queue is empty.

func (*SPMCPtrSeq) Enqueue

func (q *SPMCPtrSeq) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element (single producer only). Returns ErrWouldBlock if the queue is full.

func (*SPMCPtrSeq) Init added in v0.1.3

func (q *SPMCPtrSeq) Init(capacity int)

Init initializes a zero-value SPMCPtrSeq queue in place. Capacity rounds up to the next power of 2.

type SPMCSeq

type SPMCSeq[T any] struct {
	// contains filtered or unexported fields
}

SPMCSeq is a CAS-based single-producer multi-consumer bounded queue.

The single producer writes sequentially. Consumers use CAS to claim slots.

This is the Compact variant, which uses n slots (versus 2n for the FAA-based default). Use NewSPMC for the default FAA-based implementation, which scales better.

Memory: n slots (16 bytes per slot)
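
The n versus 2n trade-off can be put in numbers using the per-slot sizes this reference states for the uintptr variants. The arithmetic below is a rough estimate only: slotBytes is a hypothetical helper, and it ignores headers, indices, and any padding.

```go
package main

import "fmt"

// slotBytes estimates the size of the slot array alone. slotsPerItem is 2
// for the FAA layout (2n slots) and 1 for the Seq and Compact layouts.
func slotBytes(capacity, slotsPerItem, bytesPerSlot int) int {
	return capacity * slotsPerItem * bytesPerSlot
}

func main() {
	const n = 1024
	fmt.Println("SPMCIndirect        (2n x 16 B):", slotBytes(n, 2, 16))
	fmt.Println("SPMCIndirectSeq     (n x 16 B): ", slotBytes(n, 1, 16))
	fmt.Println("SPMCCompactIndirect (n x 8 B):  ", slotBytes(n, 1, 8))
}
```

At capacity 1024 the three layouts need about 32 KiB, 16 KiB, and 8 KiB of slots respectively, so the compact variants matter most when many queues are allocated.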

func NewSPMCSeq

func NewSPMCSeq[T any](capacity int) *SPMCSeq[T]

NewSPMCSeq creates a new CAS-based SPMC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewSPMC for the default FAA-based implementation.

func (*SPMCSeq[T]) Cap

func (q *SPMCSeq[T]) Cap() int

Cap returns the queue capacity.

func (*SPMCSeq[T]) Dequeue

func (q *SPMCSeq[T]) Dequeue() (T, error)

Dequeue removes and returns an element (multiple consumers safe). Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*SPMCSeq[T]) Enqueue

func (q *SPMCSeq[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue (single producer only). Returns ErrWouldBlock if the queue is full.

func (*SPMCSeq[T]) Init added in v0.1.3

func (q *SPMCSeq[T]) Init(capacity int)

Init initializes a zero-value SPMCSeq queue in place. Capacity rounds up to the next power of 2.

type SPSC

type SPSC[T any] struct {
	// contains filtered or unexported fields
}

SPSC is a single-producer single-consumer bounded queue.

Based on Lamport's ring buffer with cached index optimization. The producer caches the consumer's dequeue index, and vice versa, reducing cross-core cache line traffic.

Memory: O(capacity) with minimal per-slot overhead
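
The cached-index idea can be sketched as a small ring buffer. This is a minimal illustration, not lfq's implementation: the ring type and its field names are hypothetical, elements are plain ints, and cache-line padding is omitted for brevity.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// ring is a Lamport-style SPSC ring buffer with cached indices.
type ring struct {
	buf  []int
	mask uint64

	head atomic.Uint64 // next slot to read; written only by the consumer
	tail atomic.Uint64 // next slot to write; written only by the producer

	cachedHead uint64 // producer's last observed head
	cachedTail uint64 // consumer's last observed tail
}

// newRing creates a ring; size must be a power of two.
func newRing(size int) *ring {
	return &ring{buf: make([]int, size), mask: uint64(size - 1)}
}

// push is called by the single producer. It reloads the shared head only
// when the cached copy says the ring looks full.
func (r *ring) push(v int) bool {
	t := r.tail.Load()
	if t-r.cachedHead == uint64(len(r.buf)) {
		r.cachedHead = r.head.Load()
		if t-r.cachedHead == uint64(len(r.buf)) {
			return false // really full
		}
	}
	r.buf[t&r.mask] = v
	r.tail.Store(t + 1) // publish only after the slot is written
	return true
}

// pop is called by the single consumer. It reloads the shared tail only
// when the cached copy says the ring looks empty.
func (r *ring) pop() (int, bool) {
	h := r.head.Load()
	if h == r.cachedTail {
		r.cachedTail = r.tail.Load()
		if h == r.cachedTail {
			return 0, false // really empty
		}
	}
	v := r.buf[h&r.mask]
	r.head.Store(h + 1) // hand the slot back to the producer
	return v, true
}

// transfer pushes 1..n from a producer goroutine and pops on the caller,
// reporting whether the values arrived in FIFO order.
func transfer(n int) bool {
	r := newRing(8)
	go func() {
		for i := 1; i <= n; i++ {
			for !r.push(i) {
				runtime.Gosched() // ring full; let the consumer run
			}
		}
	}()
	for want := 1; want <= n; want++ {
		v, ok := r.pop()
		for !ok {
			runtime.Gosched()
			v, ok = r.pop()
		}
		if v != want {
			return false
		}
	}
	return true
}

func main() { fmt.Println("in order:", transfer(10000)) }
```

In the common case each side touches only its own index and its cached copy of the other, so the other core's cache line is read only when the ring appears full or empty.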

func BuildSPSC

func BuildSPSC[T any](b *Builder) *SPSC[T]

BuildSPSC creates an SPSC queue with compile-time type safety. Panics unless the builder is configured with SingleProducer().SingleConsumer().

func NewSPSC

func NewSPSC[T any](capacity int) *SPSC[T]

NewSPSC creates a new SPSC queue. Capacity rounds up to the next power of 2.

Example

ExampleNewSPSC demonstrates a basic SPSC queue for pipeline stages.

package main

import (
	"fmt"

	"code.hybscloud.com/lfq"
)

func main() {
	// Create a single-producer single-consumer queue
	q := lfq.NewSPSC[int](8)

	// Producer sends 5 values. new(expr) allocates a copy of the value (Go 1.26+).
	for i := 1; i <= 5; i++ {
		q.Enqueue(new(i * 10))
	}

	// Consumer receives values
	for range 5 {
		v, _ := q.Dequeue()
		fmt.Println(v)
	}

}
Output:

10
20
30
40
50

func (*SPSC[T]) Cap

func (q *SPSC[T]) Cap() int

Cap returns the queue capacity.

func (*SPSC[T]) Dequeue

func (q *SPSC[T]) Dequeue() (T, error)

Dequeue removes and returns an element (consumer only). Returns (zero-value, ErrWouldBlock) if the queue is empty.

func (*SPSC[T]) Enqueue

func (q *SPSC[T]) Enqueue(elem *T) error

Enqueue adds an element to the queue (producer only). Returns ErrWouldBlock if the queue is full.

func (*SPSC[T]) Init added in v0.1.3

func (q *SPSC[T]) Init(capacity int)

Init initializes a zero-value SPSC queue in place. Capacity rounds up to the next power of 2.

type SPSCIndirect

type SPSCIndirect struct {
	// contains filtered or unexported fields
}

SPSCIndirect is an SPSC queue for uintptr values.

func NewSPSCIndirect

func NewSPSCIndirect(capacity int) *SPSCIndirect

NewSPSCIndirect creates a new SPSC queue for uintptr values. Capacity rounds up to the next power of 2.

func (*SPSCIndirect) Cap

func (q *SPSCIndirect) Cap() int

Cap returns the queue capacity.

func (*SPSCIndirect) Dequeue

func (q *SPSCIndirect) Dequeue() (uintptr, error)

Dequeue removes and returns an element (consumer only). Returns (0, ErrWouldBlock) if the queue is empty.

func (*SPSCIndirect) Enqueue

func (q *SPSCIndirect) Enqueue(elem uintptr) error

Enqueue adds an element (producer only). Returns ErrWouldBlock if the queue is full.

func (*SPSCIndirect) Init added in v0.1.3

func (q *SPSCIndirect) Init(capacity int)

Init initializes a zero-value SPSCIndirect queue in place. Capacity rounds up to the next power of 2.

type SPSCPtr

type SPSCPtr struct {
	// contains filtered or unexported fields
}

SPSCPtr is an SPSC queue for unsafe.Pointer values. Useful for zero-copy pointer passing between goroutines.

func NewSPSCPtr

func NewSPSCPtr(capacity int) *SPSCPtr

NewSPSCPtr creates a new SPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.

func (*SPSCPtr) Cap

func (q *SPSCPtr) Cap() int

Cap returns the queue capacity.

func (*SPSCPtr) Dequeue

func (q *SPSCPtr) Dequeue() (unsafe.Pointer, error)

Dequeue removes and returns an element (consumer only). Returns (nil, ErrWouldBlock) if the queue is empty.

func (*SPSCPtr) Enqueue

func (q *SPSCPtr) Enqueue(elem unsafe.Pointer) error

Enqueue adds an element (producer only). Returns ErrWouldBlock if the queue is full.

func (*SPSCPtr) Init added in v0.1.3

func (q *SPSCPtr) Init(capacity int)

Init initializes a zero-value SPSCPtr queue in place. Capacity rounds up to the next power of 2.

Directories

Path            Synopsis
internal/asm    Package asm provides architecture-specific helpers for hot paths.
