Overview ¶
Package lfq provides bounded FIFO queue implementations.
The package offers multiple queue variants optimized for different producer/consumer patterns:
- SPSC: Single-Producer Single-Consumer
- MPSC: Multi-Producer Single-Consumer
- SPMC: Single-Producer Multi-Consumer
- MPMC: Multi-Producer Multi-Consumer
Quick Start ¶
Direct constructors (recommended for most cases):
q := lfq.NewSPSC[Event](1024)
q := lfq.NewMPMC[*Request](4096)
Builder API auto-selects algorithm based on constraints:
q := lfq.Build[Event](lfq.New(1024).SingleProducer().SingleConsumer()) // → SPSC
q := lfq.Build[Event](lfq.New(1024).SingleConsumer()) // → MPSC
q := lfq.Build[Event](lfq.New(1024).SingleProducer()) // → SPMC
q := lfq.Build[Event](lfq.New(1024)) // → MPMC
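The mapping from constraints to variant can be summarized as a small decision function. This is only an illustrative sketch: `selectVariant` is a hypothetical name that is not part of lfq, and it models nothing beyond the four cases listed in the comments above.

```go
package main

import "fmt"

// selectVariant is a hypothetical helper (not part of lfq). It mirrors the
// documented mapping from producer/consumer constraints to a queue variant.
func selectVariant(singleProducer, singleConsumer bool) string {
	switch {
	case singleProducer && singleConsumer:
		return "SPSC"
	case singleConsumer:
		return "MPSC"
	case singleProducer:
		return "SPMC"
	default:
		return "MPMC"
	}
}

func main() {
	fmt.Println(selectVariant(true, true))   // SPSC
	fmt.Println(selectVariant(false, true))  // MPSC
	fmt.Println(selectVariant(true, false))  // SPMC
	fmt.Println(selectVariant(false, false)) // MPMC
}
```

Declaring a constraint is a promise about how the queue will be used; the builder has no way to check it at run time.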
Basic Usage ¶
All queues share the same interface for enqueueing and dequeueing:
// Create a queue
q := lfq.NewMPMC[int](1024)
// Enqueue (non-blocking)
err := q.Enqueue(new(42))
if lfq.IsWouldBlock(err) {
// Queue is full - handle backpressure
}
// Dequeue (non-blocking)
elem, err := q.Dequeue()
if lfq.IsWouldBlock(err) {
// Queue is empty - try again later
}
Common Patterns ¶
Pipeline Stage (SPSC):
// Stage 1 → Queue → Stage 2
q := lfq.NewSPSC[Data](1024)
go func() { // Producer (Stage 1)
backoff := iox.Backoff{}
for data := range input {
for q.Enqueue(&data) != nil {
backoff.Wait()
}
backoff.Reset()
}
}()
go func() { // Consumer (Stage 2)
backoff := iox.Backoff{}
for {
data, err := q.Dequeue()
if err != nil {
backoff.Wait()
continue
}
backoff.Reset()
process(data)
}
}()
Event Aggregation (MPSC):
// Multiple event sources → Single processor
q := lfq.NewMPSC[Event](4096)
// Multiple producers (event sources)
for sensor := range slices.Values(sensors) {
go func(s Sensor) {
for ev := range s.Events() {
q.Enqueue(&ev)
}
}(sensor)
}
// Single consumer (aggregator)
go func() {
for {
ev, err := q.Dequeue()
if err == nil {
aggregate(ev)
}
}
}()
Work Distribution (SPMC):
// Single dispatcher → Multiple workers
q := lfq.NewSPMC[Task](1024)
// Single producer (dispatcher)
go func() {
backoff := iox.Backoff{}
for task := range tasks {
for q.Enqueue(&task) != nil {
backoff.Wait()
}
backoff.Reset()
}
}()
// Multiple consumers (workers)
for range numWorkers {
go func() {
for {
task, err := q.Dequeue()
if err == nil {
task.Execute()
}
}
}()
}
Worker Pool (MPMC):
// Multiple submitters → Multiple workers
q := lfq.NewMPMC[Job](4096)
// Workers
for range numWorkers {
go func() {
for {
job, err := q.Dequeue()
if err == nil {
job.Run()
}
}
}()
}
// Submit jobs from anywhere
func Submit(j Job) error {
return q.Enqueue(&j)
}
Queue Variants ¶
Three queue flavors are available for different use cases:
Build[T] - Generic type-safe queue for any type
BuildIndirect() - Queue for uintptr values (pool indices, handles)
BuildPtr() - Queue for unsafe.Pointer (zero-copy pointer passing)
When to use Indirect:
// Buffer pool with index-based access
pool := make([][]byte, 1024)
freeList := lfq.NewSPSCIndirect(1024)
// Initialize free list with buffer indices
for i := range pool {
pool[i] = make([]byte, 4096)
freeList.Enqueue(uintptr(i))
}
// Allocate: get index from free list
idx, err := freeList.Dequeue()
if err == nil {
	buf := pool[idx]
	// ... use buf ...
}
// Free: return index to free list
freeList.Enqueue(idx)
When to use Ptr:
// Zero-copy object passing between goroutines
q := lfq.NewMPMCPtr(1024)
// Producer creates object once
msg := &Message{Data: largePayload}
q.Enqueue(unsafe.Pointer(msg))
// Consumer receives same pointer - no copy
ptr, _ := q.Dequeue()
received := (*Message)(ptr) // distinct name: msg is already declared above
Algorithm Selection ¶
The builder selects algorithms based on constraints and Compact() hint:
Default (FAA-based, 2n slots for capacity n):
SPSC: Lamport ring buffer (n slots, already optimal)
MPSC: FAA producers, sequential consumer
SPMC: Sequential producer, FAA consumers
MPMC: FAA-based SCQ algorithm
With Compact() (CAS-based, n slots for capacity n):
SPSC: Same as default (already optimal)
MPSC: CAS producers, sequential consumer
SPMC: Sequential producer, CAS consumers
MPMC: Sequence-based algorithm
FAA (Fetch-And-Add) scales better under high contention but requires 2n physical slots. Use Compact() when memory efficiency is critical.
Type-safe builder functions enforce constraints at compile time:
BuildSPSC[T](b) → *SPSC[T] // Requires SP + SC
BuildMPSC[T](b) → Queue[T] // Requires SC only
BuildSPMC[T](b) → Queue[T] // Requires SP only
BuildMPMC[T](b) → Queue[T] // Requires no constraints
Performance Hints ¶
Compact() selects CAS-based algorithms with n physical slots (vs 2n for FAA-based default). Use when memory efficiency is more important than contention scalability:
// Compact mode - CAS-based, n slots (works with all queue types)
q := lfq.Build[Event](lfq.New(4096).Compact())
q := lfq.New(4096).Compact().BuildIndirect()
q := lfq.New(4096).Compact().BuildPtr()
SPSC variants already use n slots (Lamport ring buffer) and ignore Compact().
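For readers unfamiliar with the Lamport ring buffer, here is a minimal sketch of the idea using the standard `sync/atomic` package. It is an illustration under simplifying assumptions (int elements, a power-of-two capacity supplied by the caller), not the code lfq ships; `spscRing` and `sumThroughRing` are hypothetical names.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// spscRing is a Lamport-style ring: n slots hold up to n elements.
// Only the producer writes tail; only the consumer writes head.
type spscRing struct {
	buf  []int
	mask uint64
	head atomic.Uint64 // next position to read
	tail atomic.Uint64 // next position to write
}

// newSPSCRing assumes capacity is a power of two.
func newSPSCRing(capacity int) *spscRing {
	return &spscRing{buf: make([]int, capacity), mask: uint64(capacity - 1)}
}

// enqueue must be called from the single producer goroutine.
func (r *spscRing) enqueue(v int) bool {
	tail := r.tail.Load()
	if tail-r.head.Load() == uint64(len(r.buf)) {
		return false // full
	}
	r.buf[tail&r.mask] = v
	r.tail.Store(tail + 1) // publish the slot to the consumer
	return true
}

// dequeue must be called from the single consumer goroutine.
func (r *spscRing) dequeue() (int, bool) {
	head := r.head.Load()
	if head == r.tail.Load() {
		return 0, false // empty
	}
	v := r.buf[head&r.mask]
	r.head.Store(head + 1) // release the slot back to the producer
	return v, true
}

// sumThroughRing sends 1..n through the ring and returns the received sum.
func sumThroughRing(n int) int {
	r := newSPSCRing(8)
	go func() {
		for i := 1; i <= n; i++ {
			for !r.enqueue(i) {
				runtime.Gosched()
			}
		}
	}()
	sum := 0
	for received := 0; received < n; {
		v, ok := r.dequeue()
		if !ok {
			runtime.Gosched()
			continue
		}
		sum += v
		received++
	}
	return sum
}

func main() {
	fmt.Println(sumThroughRing(1000)) // 500500
}
```

Because each index has exactly one writer, no slot needs its own sequence number, which is why n physical slots suffice and Compact() has nothing to shrink.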
Error Handling ¶
Queues return ErrWouldBlock when operations cannot proceed. This error is sourced from code.hybscloud.com/iox for ecosystem consistency.
// Retry loop with backoff
backoff := iox.Backoff{}
for {
err := q.Enqueue(&item)
if err == nil {
backoff.Reset()
break
}
if !lfq.IsWouldBlock(err) {
return err // Unexpected error
}
backoff.Wait()
}
For semantic error classification (delegates to iox):
lfq.IsWouldBlock(err) // true if queue full/empty
lfq.IsSemantic(err) // true if control flow signal
lfq.IsNonFailure(err) // true if nil or ErrWouldBlock
Capacity and Length ¶
Capacity rounds up to the next power of 2:
q := lfq.NewMPMC[int](3) // Actual capacity: 4
q := lfq.NewMPMC[int](4) // Actual capacity: 4
q := lfq.NewMPMC[int](1000) // Actual capacity: 1024
q := lfq.NewMPMC[int](1024) // Actual capacity: 1024
Minimum capacity is 2 (already a power of 2). Queue construction panics if capacity < 2.
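The rounding rule can be reproduced with the standard `math/bits` package. The helper below is a sketch of the documented behavior, not lfq's code; `roundUpPow2` is a hypothetical name.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 is a hypothetical helper (not part of lfq) that returns the
// smallest power of two >= n, panicking for n < 2 as the docs describe.
func roundUpPow2(n int) int {
	if n < 2 {
		panic("capacity must be >= 2")
	}
	// bits.Len(x) is the number of bits needed to represent x, so
	// 1 << bits.Len(n-1) is the first power of two that is >= n.
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{2, 3, 4, 1000, 1024} {
		fmt.Printf("%d -> %d\n", n, roundUpPow2(n))
	}
	// 2 -> 2
	// 3 -> 4
	// 4 -> 4
	// 1000 -> 1024
	// 1024 -> 1024
}
```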
Length is intentionally not provided because accurate counts in lock-free algorithms require expensive cross-core synchronization. Track counts in application logic when needed.
Thread Safety ¶
All queue operations are thread-safe within their access pattern constraints:
- SPSC: One producer goroutine, one consumer goroutine
- MPSC: Multiple producer goroutines, one consumer goroutine
- SPMC: One producer goroutine, multiple consumer goroutines
- MPMC: Multiple producer and consumer goroutines
Violating these constraints (e.g., multiple producers on SPSC) causes undefined behavior including data corruption and races.
Graceful Shutdown ¶
FAA-based queues (MPMC, SPMC, MPSC) include a threshold mechanism to prevent livelock. This mechanism may cause Dequeue to return ErrWouldBlock even when items remain, waiting for producer activity to reset the threshold.
For graceful shutdown scenarios where producers have finished but consumers need to drain remaining items, use the Drainer interface:
// Producer goroutines finish
prodWg.Wait()
// Signal no more enqueues will occur
if d, ok := q.(lfq.Drainer); ok {
d.Drain()
}
// Consumers can now drain all remaining items
// without threshold blocking
After Drain is called, Dequeue skips threshold checks, allowing consumers to fully drain the queue. Drain is a hint — the caller must ensure no further Enqueue calls will be made.
SPSC queues do not implement Drainer as they have no threshold mechanism. The type assertion naturally handles this case.
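The type assertion works because Drainer is an optional interface: a queue either has a Drain method or it does not, and the assertion reports which. The sketch below shows the pattern with local stand-in types so that it runs without lfq; `drainer`, `withThreshold`, `noThreshold`, and `drainIfSupported` are hypothetical names.

```go
package main

import "fmt"

// drainer is a local stand-in with the same shape as lfq.Drainer.
type drainer interface{ Drain() }

// withThreshold stands in for a queue that has a threshold mechanism.
type withThreshold struct{ draining bool }

func (q *withThreshold) Drain() { q.draining = true }

// noThreshold stands in for a queue without one (such as SPSC).
type noThreshold struct{}

// drainIfSupported calls Drain when q implements drainer and reports
// whether it did so.
func drainIfSupported(q any) bool {
	if d, ok := q.(drainer); ok {
		d.Drain()
		return true
	}
	return false
}

func main() {
	fmt.Println(drainIfSupported(&withThreshold{})) // true
	fmt.Println(drainIfSupported(&noThreshold{}))   // false
}
```

Shutdown code written this way does not need to know which queue variant the builder selected.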
Race Detection ¶
Go's race detector is not designed for lock-free algorithm verification. The race detector tracks explicit synchronization primitives (mutex, channels, WaitGroup) but cannot observe happens-before relationships established through atomic memory orderings (acquire-release semantics).
Lock-free queues use sequence numbers with acquire-release semantics to protect non-atomic data fields. These algorithms are correct, but the race detector may report false positives because it cannot track synchronization provided by atomic operations on separate variables.
For lock-free algorithm correctness verification, use:
- Formal verification tools (TLA+, SPIN)
- Stress testing without race detector
- Memory model analysis
Tests incompatible with race detection are excluded via //go:build !race.
Dependencies ¶
This package uses code.hybscloud.com/iox for semantic errors, code.hybscloud.com/atomix for atomic primitives with explicit memory ordering, and code.hybscloud.com/spin for CPU pause instructions.
Example (Backpressure) ¶
Example_backpressure demonstrates handling backpressure with a full queue.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Small queue to demonstrate backpressure
q := lfq.NewSPSC[int](3) // Rounds up to Cap()=4
// Fill the queue
filled := 0
for i := 1; i <= 10; i++ {
err := q.Enqueue(new(i))
if err == nil {
filled++
} else if lfq.IsWouldBlock(err) {
fmt.Printf("Backpressure at item %d (queue full)\n", i)
break
}
}
fmt.Printf("Filled %d items\n", filled)
// Drain some items to make room
for range 2 {
v, _ := q.Dequeue()
fmt.Printf("Drained: %d\n", v)
}
// Now we can enqueue more
if q.Enqueue(new(100)) == nil {
fmt.Println("Enqueued 100 after draining")
}
}
Output:
Backpressure at item 5 (queue full)
Filled 4 items
Drained: 1
Drained: 2
Enqueued 100 after draining
Example (BatchProcessing) ¶
Example_batchProcessing demonstrates collecting items into batches.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
q := lfq.NewSPSC[int](64)
// Single producer submits items sequentially
for i := 1; i <= 9; i++ {
q.Enqueue(new(i))
}
// Batch processing: collect up to batchSize items
batchSize := 4
batch := make([]int, 0, batchSize)
batchNum := 0
for {
for len(batch) < batchSize {
v, err := q.Dequeue()
if err != nil {
break
}
batch = append(batch, v)
}
if len(batch) == 0 {
break
}
batchNum++
fmt.Printf("Batch %d: %v\n", batchNum, batch)
batch = batch[:0]
}
}
Output:
Batch 1: [1 2 3 4]
Batch 2: [5 6 7 8]
Batch 3: [9]
Example (BufferPool) ¶
Example_bufferPool demonstrates using Indirect queue for buffer pool management.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
const poolSize = 4
const bufSize = 64
// Create buffer pool
pool := make([][]byte, poolSize)
for i := range pool {
pool[i] = make([]byte, bufSize)
}
// Free list tracks available buffer indices
freeList := lfq.NewSPSCIndirect(poolSize)
freeCount := poolSize
// Initialize: all buffers are free
for i := range poolSize {
freeList.Enqueue(uintptr(i))
}
// Allocate a buffer
allocate := func() ([]byte, uintptr, bool) {
idx, err := freeList.Dequeue()
if err != nil {
return nil, 0, false // Pool exhausted
}
freeCount--
return pool[idx], idx, true
}
// Release a buffer back to pool
release := func(idx uintptr) {
freeList.Enqueue(idx)
freeCount++
}
// Usage demonstration
fmt.Printf("Free buffers: %d\n", freeCount)
buf1, idx1, ok := allocate()
if ok {
copy(buf1, "hello")
fmt.Printf("Allocated buffer %d, free: %d\n", idx1, freeCount)
}
buf2, idx2, ok := allocate()
if ok {
copy(buf2, "world")
fmt.Printf("Allocated buffer %d, free: %d\n", idx2, freeCount)
}
release(idx1)
fmt.Printf("Released buffer %d, free: %d\n", idx1, freeCount)
release(idx2)
fmt.Printf("Released buffer %d, free: %d\n", idx2, freeCount)
}
Output:
Free buffers: 4
Allocated buffer 0, free: 3
Allocated buffer 1, free: 2
Released buffer 0, free: 3
Released buffer 1, free: 4
Example (CompactMode) ¶
Example_compactMode demonstrates compact mode for memory-constrained scenarios.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Compact Indirect: 8 bytes per slot (vs 16 bytes standard)
// Values limited to 63 bits
q := lfq.New(1024).Compact().BuildIndirect()
// Use for buffer pool indices where nil/zero is not needed
for i := uintptr(1); i <= 5; i++ {
q.Enqueue(i) // Values must be < (1 << 63)
}
fmt.Printf("Queue capacity: %d\n", q.Cap())
// Dequeue until empty (ErrWouldBlock)
for {
idx, err := q.Dequeue()
if lfq.IsWouldBlock(err) {
break
}
fmt.Printf("Index: %d\n", idx)
}
}
Output:
Queue capacity: 1024
Index: 1
Index: 2
Index: 3
Index: 4
Index: 5
Example (Pipeline) ¶
Example_pipeline demonstrates a multi-stage pipeline using SPSC queues.
package main
import (
"fmt"
"sync"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
// Pipeline: Generate → Double → Print
stage1to2 := lfq.NewSPSC[int](8) // Generate → Double
stage2to3 := lfq.NewSPSC[int](8) // Double → Print
var wg sync.WaitGroup
results := make([]int, 0, 5)
var mu sync.Mutex
// Stage 1: Generate numbers 1-5
wg.Add(1)
go func() {
defer wg.Done()
backoff := iox.Backoff{}
for i := 1; i <= 5; i++ {
v := i
for stage1to2.Enqueue(&v) != nil {
backoff.Wait()
}
backoff.Reset()
}
}()
// Stage 2: Double each number
wg.Add(1)
go func() {
defer wg.Done()
backoffDeq := iox.Backoff{}
backoffEnq := iox.Backoff{}
processed := 0
for processed < 5 {
v, err := stage1to2.Dequeue()
if err != nil {
backoffDeq.Wait()
continue
}
backoffDeq.Reset()
doubled := v * 2
for stage2to3.Enqueue(&doubled) != nil {
backoffEnq.Wait()
}
backoffEnq.Reset()
processed++
}
}()
// Stage 3: Collect results
wg.Add(1)
go func() {
defer wg.Done()
backoff := iox.Backoff{}
for len(results) < 5 {
v, err := stage2to3.Dequeue()
if err != nil {
backoff.Wait()
continue
}
backoff.Reset()
mu.Lock()
results = append(results, v)
mu.Unlock()
}
}()
wg.Wait()
for i, v := range results {
fmt.Printf("Stage output %d: %d\n", i, v)
}
}
Output:
Stage output 0: 2
Stage output 1: 4
Stage output 2: 6
Stage output 3: 8
Stage output 4: 10
Example (WorkerPool) ¶
Example_workerPool demonstrates a worker pool pattern using MPMC.
package main
import (
"fmt"
"sync"
"code.hybscloud.com/atomix"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
type Job struct {
ID int
Input int
Result int
}
// Job queue and results
jobs := lfq.NewMPMC[Job](16)
results := make([]int, 5)
var wg sync.WaitGroup
var completed atomix.Int32
// Start 3 workers
for w := range 3 {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
backoff := iox.Backoff{}
for completed.Load() < 5 {
job, err := jobs.Dequeue()
if err != nil {
backoff.Wait()
continue
}
backoff.Reset()
// Process job: square the input
job.Result = job.Input * job.Input
results[job.ID] = job.Result
completed.Add(1)
}
}(w)
}
// Submit 5 jobs
backoff := iox.Backoff{}
for i := range 5 {
job := Job{ID: i, Input: i + 1}
for jobs.Enqueue(&job) != nil {
backoff.Wait()
}
backoff.Reset()
}
wg.Wait()
// Print results
for i, r := range results {
fmt.Printf("Job %d: %d² = %d\n", i, i+1, r)
}
}
Output:
Job 0: 1² = 1
Job 1: 2² = 4
Job 2: 3² = 9
Job 3: 4² = 16
Job 4: 5² = 25
Index ¶
- Constants
- Variables
- func IsNonFailure(err error) bool
- func IsSemantic(err error) bool
- func IsWouldBlock(err error) bool
- type Builder
- func (b *Builder) BuildIndirect() QueueIndirect
- func (b *Builder) BuildIndirectMPMC() QueueIndirect
- func (b *Builder) BuildIndirectMPSC() QueueIndirect
- func (b *Builder) BuildIndirectSPMC() QueueIndirect
- func (b *Builder) BuildIndirectSPSC() *SPSCIndirect
- func (b *Builder) BuildPtr() QueuePtr
- func (b *Builder) BuildPtrMPMC() QueuePtr
- func (b *Builder) BuildPtrMPSC() QueuePtr
- func (b *Builder) BuildPtrSPMC() QueuePtr
- func (b *Builder) BuildPtrSPSC() *SPSCPtr
- func (b *Builder) Compact() *Builder
- func (b *Builder) SingleConsumer() *Builder
- func (b *Builder) SingleProducer() *Builder
- type Consumer
- type ConsumerIndirect
- type ConsumerPtr
- type Drainer
- type MPMC
- type MPMCCompactIndirect
- type MPMCIndirect
- type MPMCIndirectSeq
- type MPMCPtr
- type MPMCPtrSeq
- type MPMCSeq
- type MPSC
- type MPSCCompactIndirect
- type MPSCIndirect
- type MPSCIndirectSeq
- type MPSCPtr
- type MPSCPtrSeq
- type MPSCSeq
- type Options
- type Producer
- type ProducerIndirect
- type ProducerPtr
- type Queue
- type QueueIndirect
- type QueuePtr
- type SPMC
- type SPMCCompactIndirect
- type SPMCIndirect
- type SPMCIndirectSeq
- type SPMCPtr
- type SPMCPtrSeq
- type SPMCSeq
- type SPSC
- type SPSCIndirect
- type SPSCPtr
Constants ¶
const RaceEnabled = false
RaceEnabled is false when the race detector is not active.
Variables ¶
var ErrWouldBlock = iox.ErrWouldBlock
ErrWouldBlock indicates the operation cannot proceed immediately.
For Enqueue: the queue is full (backpressure)
For Dequeue: the queue is empty (no data available)
ErrWouldBlock is a control flow signal, not a failure. The caller should retry the operation later (with backoff or yield) rather than propagating the error.
This is an alias for iox.ErrWouldBlock for ecosystem consistency.
Example:
backoff := iox.Backoff{}
for {
err := q.Enqueue(&item)
if err == nil {
backoff.Reset()
break
}
if lfq.IsWouldBlock(err) {
backoff.Wait() // Adaptive backpressure
continue
}
return err // Unexpected error
}
Functions ¶
func IsNonFailure ¶
IsNonFailure reports whether err represents a non-failure condition. Returns true for nil, ErrWouldBlock, or ErrMore. Delegates to iox.IsNonFailure.
func IsSemantic ¶
IsSemantic reports whether err is a control flow signal (not a failure). Delegates to iox.IsSemantic.
func IsWouldBlock ¶
IsWouldBlock reports whether err indicates the operation would block. Delegates to iox.IsWouldBlock for wrapped error support.
Example ¶
ExampleIsWouldBlock demonstrates error handling patterns.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
q := lfq.NewSPSC[int](2) // Cap()=2
// Fill the queue
q.Enqueue(new(1))
q.Enqueue(new(2))
// Queue is full
err := q.Enqueue(new(5))
if lfq.IsWouldBlock(err) {
fmt.Println("Queue full - applying backpressure")
}
// Drain the queue
q.Dequeue()
q.Dequeue()
// Queue is empty
_, err = q.Dequeue()
if lfq.IsWouldBlock(err) {
fmt.Println("Queue empty - no data available")
}
}
Output:
Queue full - applying backpressure
Queue empty - no data available
Types ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder provides a fluent API for configuring and creating queues. It automatically selects the algorithm based on producer/consumer constraints and performance hints.
Example:
// SPSC queue (optimal for single producer/consumer)
q := lfq.BuildSPSC[Event](lfq.New(1024).SingleProducer().SingleConsumer())
// MPMC queue (default, general purpose)
q := lfq.BuildMPMC[Request](lfq.New(4096))
// Compact indirect queue for memory efficiency
q := lfq.New(8192).Compact().BuildIndirect()
func New ¶
New creates a queue builder with the given capacity.
Capacity rounds up to the next power of 2. For example, capacity=4 results in actual capacity=4, capacity=1000 results in actual capacity=1024.
Panics if capacity < 2.
Example:
// Create builder, then configure and build
b := lfq.New(1024)
q := lfq.BuildSPSC[int](b.SingleProducer().SingleConsumer())
// Or chain directly
q := lfq.BuildMPMC[int](lfq.New(1024))
func (*Builder) BuildIndirect ¶
func (b *Builder) BuildIndirect() QueueIndirect
BuildIndirect creates a QueueIndirect for uintptr values.
Algorithm selection:
- SPSC (SingleProducer + SingleConsumer) → Lamport ring buffer
- Compact() → CAS-based algorithms (n slots, values limited to 63 bits)
- Default → FAA-based algorithms (2n slots)
func (*Builder) BuildIndirectMPMC ¶
func (b *Builder) BuildIndirectMPMC() QueueIndirect
BuildIndirectMPMC creates an MPMC queue for uintptr values. Panics if builder has any constraints set.
func (*Builder) BuildIndirectMPSC ¶
func (b *Builder) BuildIndirectMPSC() QueueIndirect
BuildIndirectMPSC creates an MPSC queue for uintptr values. Panics if builder is not configured with SingleConsumer() only.
func (*Builder) BuildIndirectSPMC ¶
func (b *Builder) BuildIndirectSPMC() QueueIndirect
BuildIndirectSPMC creates an SPMC queue for uintptr values. Panics if builder is not configured with SingleProducer() only.
func (*Builder) BuildIndirectSPSC ¶
func (b *Builder) BuildIndirectSPSC() *SPSCIndirect
BuildIndirectSPSC creates an SPSC queue for uintptr values.
func (*Builder) BuildPtr ¶
BuildPtr creates a QueuePtr for unsafe.Pointer values.
Algorithm selection:
- SPSC (SingleProducer + SingleConsumer) → Lamport ring buffer
- Other configurations → FAA default (2n slots), CAS if Compact (n slots)
Default: FAA-based algorithms with 2n physical slots (better scalability). Compact(): CAS-based algorithms with n slots (half memory footprint).
func (*Builder) BuildPtrMPMC ¶
BuildPtrMPMC creates an MPMC queue for unsafe.Pointer values. Panics if builder has any constraints set.
func (*Builder) BuildPtrMPSC ¶
BuildPtrMPSC creates an MPSC queue for unsafe.Pointer values. Panics if builder is not configured with SingleConsumer() only.
func (*Builder) BuildPtrSPMC ¶
BuildPtrSPMC creates an SPMC queue for unsafe.Pointer values. Panics if builder is not configured with SingleProducer() only.
func (*Builder) BuildPtrSPSC ¶
BuildPtrSPSC creates an SPSC queue for unsafe.Pointer values. Panics if builder is not configured with SingleProducer().SingleConsumer().
func (*Builder) Compact ¶
Compact selects CAS-based algorithms with n physical slots instead of FAA-based algorithms with 2n slots.
Trade-off: Half memory usage, reduced scalability under high contention.
SPSC already uses n slots (Lamport ring buffer) and ignores Compact().
func (*Builder) SingleConsumer ¶
SingleConsumer declares that only one goroutine will dequeue. Enables optimized algorithms for SPSC or MPSC patterns.
func (*Builder) SingleProducer ¶
SingleProducer declares that only one goroutine will enqueue. Enables optimized algorithms for SPSC or SPMC patterns.
type Consumer ¶
type Consumer[T any] interface {
	// Dequeue removes and returns an element from the queue (non-blocking).
	// Returns the dequeued element on success.
	// Returns (zero-value, ErrWouldBlock) if the queue is empty.
	//
	// Thread safety depends on queue type:
	//   - SPSC: single consumer only
	//   - MPSC: single consumer only
	//   - SPMC/MPMC: multiple consumers safe
	Dequeue() (T, error)
}
Consumer is the interface for dequeueing elements.
Consumer provides non-blocking dequeue operations. The element is returned by value (copied from the queue's internal buffer). The original slot is cleared to allow garbage collection of referenced objects.
For large types (>512 bytes), consider using QueuePtr or QueueIndirect instead to avoid copy overhead.
type ConsumerIndirect ¶
type ConsumerIndirect interface {
// Dequeue removes and returns an element from the queue.
// Returns (0, ErrWouldBlock) immediately if the queue is empty.
Dequeue() (uintptr, error)
}
ConsumerIndirect dequeues uintptr values (non-blocking).
type ConsumerPtr ¶
type ConsumerPtr interface {
// Dequeue removes and returns an element from the queue.
// Returns (nil, ErrWouldBlock) immediately if the queue is empty.
Dequeue() (unsafe.Pointer, error)
}
ConsumerPtr dequeues unsafe.Pointer values (non-blocking).
type Drainer ¶ added in v0.1.1
type Drainer interface {
// Drain signals that no more enqueues will occur.
// After Drain is called, Dequeue skips threshold checks, allowing
// consumers to drain all remaining items without producer pressure.
//
// Drain is a hint — the caller must ensure no further Enqueue calls
// will be made after calling Drain.
Drain()
}
Drainer signals that no more enqueues will occur.
FAA-based queues (MPMC, SPMC, MPSC) implement this interface. SPSC queues do not implement Drainer as they have no threshold mechanism.
Call Drain after all producers have finished to allow consumers to drain remaining items without threshold blocking.
Example:
prodWg.Wait() // Wait for producers to finish
if d, ok := q.(lfq.Drainer); ok {
d.Drain()
}
// Consumers can now drain all remaining items
type MPMC ¶
type MPMC[T any] struct {
// contains filtered or unexported fields
}
MPMC is an FAA-based multi-producer multi-consumer bounded queue.
Based on the SCQ (Scalable Circular Queue) algorithm by Nikolaev (DISC 2019). Uses Fetch-And-Add to blindly increment position counters, requiring 2n physical slots for capacity n. This approach scales better under high contention compared to CAS-based alternatives.
Cycle-based slot validation provides ABA safety: each slot tracks which "cycle" (round) it belongs to via cycle = position / capacity.
Memory: 2n slots for capacity n (16+ bytes per slot)
func NewMPMC ¶
NewMPMC creates a new FAA-based MPMC queue. Capacity rounds up to the next power of 2. Physical slot count is 2n for capacity n (SCQ requirement).
Example ¶
ExampleNewMPMC demonstrates a multi-producer multi-consumer queue.
package main
import (
"fmt"
"sync"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
q := lfq.NewMPMC[string](16)
// Producers
var wg sync.WaitGroup
for p := range 3 {
wg.Add(1)
go func(id int) {
defer wg.Done()
backoff := iox.Backoff{}
msg := fmt.Sprintf("msg from producer %d", id)
for q.Enqueue(&msg) != nil {
backoff.Wait()
}
}(p)
}
// Wait for producers then consume
wg.Wait()
for {
msg, err := q.Dequeue()
if err != nil {
break
}
fmt.Println(msg)
}
}
Output:
msg from producer 0
msg from producer 1
msg from producer 2
func (*MPMC[T]) Dequeue ¶
Dequeue removes and returns an element from the queue. Returns (zero-value, ErrWouldBlock) if the queue is empty.
func (*MPMC[T]) Drain ¶ added in v0.1.1
func (q *MPMC[T]) Drain()
Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
type MPMCCompactIndirect ¶
type MPMCCompactIndirect struct {
// contains filtered or unexported fields
}
MPMCCompactIndirect is a compact MPMC queue for uintptr values.
Uses round-based empty detection: empty slots store (emptyFlag | round), filled slots store the value directly. This achieves 8 bytes per slot while allowing any 63-bit value (including zero) to be enqueued.
Memory: 8 bytes per slot
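The slot encoding described above can be sketched with plain bit operations. This illustrates the documented layout only (high bit as the empty flag, lower 63 bits for either the round or the value); it is not lfq's code, and `emptyFlag`, `emptyWord`, `isEmpty`, and `fitsCompact` are hypothetical names.

```go
package main

import "fmt"

// emptyFlag is the reserved high bit that marks a slot as empty.
const emptyFlag = uint64(1) << 63

// emptyWord encodes an empty slot that belongs to the given round.
func emptyWord(round uint64) uint64 {
	return emptyFlag | (round &^ emptyFlag)
}

// isEmpty reports whether a slot word carries the empty flag.
func isEmpty(word uint64) bool {
	return word&emptyFlag != 0
}

// fitsCompact reports whether v can be stored directly in a slot,
// that is, whether its high bit is clear.
func fitsCompact(v uint64) bool {
	return v&emptyFlag == 0
}

func main() {
	fmt.Println(isEmpty(emptyWord(0)))  // true
	fmt.Println(isEmpty(0))             // false: zero is a storable value
	fmt.Println(fitsCompact(1<<63 - 1)) // true: largest 63-bit value
	fmt.Println(fitsCompact(1 << 63))   // false: collides with the flag
}
```

Because emptiness lives in the flag bit and not in a reserved sentinel value, zero remains a legal element, which matches the claim that any 63-bit value can be enqueued.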
func NewMPMCCompactIndirect ¶
func NewMPMCCompactIndirect(capacity int) *MPMCCompactIndirect
NewMPMCCompactIndirect creates a new compact MPMC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
func (*MPMCCompactIndirect) Cap ¶
func (q *MPMCCompactIndirect) Cap() int
Cap returns the queue capacity.
func (*MPMCCompactIndirect) Dequeue ¶
func (q *MPMCCompactIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns a value from the queue. Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPMCCompactIndirect) Enqueue ¶
func (q *MPMCCompactIndirect) Enqueue(elem uintptr) error
Enqueue adds a value to the queue. Returns ErrWouldBlock if the queue is full. Values must fit in 63 bits (high bit must be 0).
func (*MPMCCompactIndirect) Init ¶ added in v0.1.3
func (q *MPMCCompactIndirect) Init(capacity int)
Init initializes a zero-value MPMCCompactIndirect queue in place. Capacity rounds up to the next power of 2.
type MPMCIndirect ¶
type MPMCIndirect struct {
// contains filtered or unexported fields
}
MPMCIndirect is an FAA-based MPMC queue for uintptr values.
Uses 128-bit atomic operations to pack cycle and value into a single atomic entry, reducing atomics per operation from 2-3 to 1. Based on SCQ algorithm (Nikolaev, DISC 2019) with 2n slots for capacity n.
Entry format: [lo=cycle | hi=value]
Memory: 2n slots, 16 bytes per slot (cycle + value in single Uint128)
func NewMPMCIndirect ¶
func NewMPMCIndirect(capacity int) *MPMCIndirect
NewMPMCIndirect creates a new FAA-based MPMC queue for uintptr values. Capacity rounds up to the next power of 2. Physical slot count is 2n for capacity n.
Example ¶
ExampleNewMPMCIndirect demonstrates pool index passing.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Simulate a buffer pool
bufferPool := make([][]byte, 4)
for i := range bufferPool {
bufferPool[i] = make([]byte, 1024)
}
// Queue passes indices, not buffers
q := lfq.NewMPMCIndirect(8)
// Producer "allocates" from pool by passing index
for i := range len(bufferPool) {
q.Enqueue(uintptr(i))
}
// Consumer retrieves indices and accesses pool
for range len(bufferPool) {
idx, _ := q.Dequeue()
buf := bufferPool[idx]
fmt.Printf("Got buffer %d with len %d\n", idx, len(buf))
}
}
Output:
Got buffer 0 with len 1024
Got buffer 1 with len 1024
Got buffer 2 with len 1024
Got buffer 3 with len 1024
func (*MPMCIndirect) Dequeue ¶
func (q *MPMCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element from the queue. Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPMCIndirect) Drain ¶ added in v0.1.1
func (q *MPMCIndirect) Drain()
Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
func (*MPMCIndirect) Enqueue ¶
func (q *MPMCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.
func (*MPMCIndirect) Init ¶ added in v0.1.3
func (q *MPMCIndirect) Init(capacity int)
Init initializes a zero-value MPMCIndirect queue in place. Capacity rounds up to the next power of 2.
type MPMCIndirectSeq ¶
type MPMCIndirectSeq struct {
// contains filtered or unexported fields
}
MPMCIndirectSeq is a CAS-based MPMC queue for uintptr values.
Uses 128-bit atomic operations to pack sequence and value into a single atomic entry, reducing atomics per operation from 2-3 to 1.
This is the Compact variant using n slots. Use NewMPMCIndirect for the default FAA-based implementation with 2n slots and better scalability.
Entry format: [lo=sequence | hi=value]
Memory: n slots, 16 bytes per slot
func NewMPMCIndirectSeq ¶
func NewMPMCIndirectSeq(capacity int) *MPMCIndirectSeq
NewMPMCIndirectSeq creates a new CAS-based MPMC queue for uintptr values. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMCIndirect for the default FAA-based implementation.
func (*MPMCIndirectSeq) Dequeue ¶
func (q *MPMCIndirectSeq) Dequeue() (uintptr, error)
Dequeue removes and returns an element from the queue. Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPMCIndirectSeq) Enqueue ¶
func (q *MPMCIndirectSeq) Enqueue(elem uintptr) error
Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.
func (*MPMCIndirectSeq) Init ¶ added in v0.1.3
func (q *MPMCIndirectSeq) Init(capacity int)
Init initializes a zero-value MPMCIndirectSeq queue in place. Capacity rounds up to the next power of 2.
type MPMCPtr ¶
type MPMCPtr struct {
// contains filtered or unexported fields
}
MPMCPtr is an FAA-based MPMC queue for unsafe.Pointer values.
Uses 128-bit atomic operations to pack cycle and pointer into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.
Entry format: [lo=cycle | hi=pointer as uint64]
Memory: 2n slots, 16 bytes per slot
func NewMPMCPtr ¶
NewMPMCPtr creates a new FAA-based MPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
Example ¶
ExampleNewMPMCPtr demonstrates zero-copy pointer passing.
package main
import (
"fmt"
"slices"
"unsafe"
"code.hybscloud.com/lfq"
)
func main() {
type Message struct {
ID int
Data string
}
q := lfq.NewMPMCPtr(8)
// Producer creates and enqueues messages
messages := []*Message{
{ID: 1, Data: "hello"},
{ID: 2, Data: "world"},
}
for msg := range slices.Values(messages) {
q.Enqueue(unsafe.Pointer(msg))
}
// Consumer receives pointers directly - no copy
for {
ptr, err := q.Dequeue()
if err != nil {
break
}
msg := (*Message)(ptr)
fmt.Printf("Message %d: %s\n", msg.ID, msg.Data)
}
}
Output:
Message 1: hello
Message 2: world
func (*MPMCPtr) Dequeue ¶
Dequeue removes and returns an element from the queue. Returns (nil, ErrWouldBlock) if the queue is empty.
func (*MPMCPtr) Drain ¶ added in v0.1.1
func (q *MPMCPtr) Drain()
Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
type MPMCPtrSeq ¶
type MPMCPtrSeq struct {
// contains filtered or unexported fields
}
MPMCPtrSeq is a CAS-based MPMC queue for unsafe.Pointer values.
Uses 128-bit atomic operations to pack sequence and pointer into a single atomic entry, reducing atomics per operation from 2-3 to 1.
This is the Compact variant using n slots. Use NewMPMCPtr for the default FAA-based implementation with 2n slots and better scalability.
Entry format: [lo=sequence | hi=pointer as uint64]
Memory: n slots, 16 bytes per slot
func NewMPMCPtrSeq ¶
func NewMPMCPtrSeq(capacity int) *MPMCPtrSeq
NewMPMCPtrSeq creates a new CAS-based MPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMCPtr for the default FAA-based implementation.
func (*MPMCPtrSeq) Dequeue ¶
func (q *MPMCPtrSeq) Dequeue() (unsafe.Pointer, error)
Dequeue removes and returns an element from the queue. Returns (nil, ErrWouldBlock) if the queue is empty.
func (*MPMCPtrSeq) Enqueue ¶
func (q *MPMCPtrSeq) Enqueue(elem unsafe.Pointer) error
Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.
func (*MPMCPtrSeq) Init ¶ added in v0.1.3
func (q *MPMCPtrSeq) Init(capacity int)
Init initializes a zero-value MPMCPtrSeq queue in place. Capacity rounds up to the next power of 2.
type MPMCSeq ¶
type MPMCSeq[T any] struct {
// contains filtered or unexported fields
}
MPMCSeq is a CAS-based multi-producer multi-consumer bounded queue.
Uses per-slot sequence numbers which provide:
- Full ABA safety via sequence-based validation
- Works with both distinct and non-distinct values
- Good performance under moderate contention
This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewMPMC for the default FAA-based implementation with better scalability.
Memory: n slots (16+ bytes per slot)
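To make the idea of per-slot sequence numbers concrete, here is a minimal sketch of a well-known bounded-queue design in which each slot carries a sequence counter that tells producers and consumers whose turn it is. It is not the package's implementation: seqQueue, errWouldBlock, and the field names are assumptions, and the sketch requires a power-of-two capacity of at least 2.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

type slot[T any] struct {
	seq atomic.Uint64 // equals pos when free for pos, pos+1 when filled
	val T
}

type seqQueue[T any] struct {
	mask  uint64
	slots []slot[T]
	head  atomic.Uint64 // next position to dequeue
	tail  atomic.Uint64 // next position to enqueue
}

// newSeqQueue expects capPow2 to be a power of two and at least 2.
func newSeqQueue[T any](capPow2 int) *seqQueue[T] {
	q := &seqQueue[T]{mask: uint64(capPow2 - 1), slots: make([]slot[T], capPow2)}
	for i := range q.slots {
		q.slots[i].seq.Store(uint64(i))
	}
	return q
}

func (q *seqQueue[T]) Enqueue(v T) error {
	for {
		pos := q.tail.Load()
		s := &q.slots[pos&q.mask]
		diff := int64(s.seq.Load() - pos)
		switch {
		case diff == 0: // slot is free for this lap: try to claim it
			if q.tail.CompareAndSwap(pos, pos+1) {
				s.val = v
				s.seq.Store(pos + 1) // publish to consumers
				return nil
			}
		case diff < 0: // slot still holds an element from the previous lap
			return errWouldBlock
		}
		// diff > 0 or lost the CAS: another producer advanced, retry
	}
}

func (q *seqQueue[T]) Dequeue() (T, error) {
	var zero T
	for {
		pos := q.head.Load()
		s := &q.slots[pos&q.mask]
		diff := int64(s.seq.Load() - (pos + 1))
		switch {
		case diff == 0: // slot is filled for this lap: try to claim it
			if q.head.CompareAndSwap(pos, pos+1) {
				v := s.val
				s.val = zero
				s.seq.Store(pos + q.mask + 1) // free the slot for the next lap
				return v, nil
			}
		case diff < 0: // nothing published at this position yet
			return zero, errWouldBlock
		}
	}
}

func main() {
	q := newSeqQueue[string](4)
	for _, s := range []string{"a", "b", "c", "d", "e"} {
		if err := q.Enqueue(s); err != nil {
			fmt.Println("enqueue", s, "->", err)
		}
	}
	for {
		v, err := q.Dequeue()
		if err != nil {
			break
		}
		fmt.Println(v)
	}
}
```

Because the sequence number advances by the ring size on every lap, a stale observer cannot mistake a reused slot for the one it originally saw, which is the ABA protection the list above refers to.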
func NewMPMCSeq ¶
NewMPMCSeq creates a new CAS-based MPMC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMC for the default FAA-based implementation.
func (*MPMCSeq[T]) Dequeue ¶
Dequeue removes and returns an element from the queue. Returns (zero-value, ErrWouldBlock) if the queue is empty.
type MPSC ¶
type MPSC[T any] struct {
// contains filtered or unexported fields
}
MPSC is an FAA-based multi-producer single-consumer bounded queue.
Producers use FAA to blindly claim positions (SCQ-style), requiring 2n physical slots for capacity n.
Memory: 2n slots for capacity n (16+ bytes per slot)
Example (EventAggregation) ¶
ExampleMPSC_eventAggregation demonstrates using MPSC for event aggregation.
package main
import (
"fmt"
"slices"
"sync"
"code.hybscloud.com/atomix"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
type Event struct {
Source string
Value int
}
q := lfq.NewMPSC[Event](64)
// Multiple event sources (producers)
var wg sync.WaitGroup
var total atomix.Int64
for source := range slices.Values([]string{"sensor-A", "sensor-B", "sensor-C"}) {
wg.Add(1)
go func(name string) {
defer wg.Done()
backoff := iox.Backoff{}
for i := 1; i <= 3; i++ {
ev := Event{Source: name, Value: i}
for q.Enqueue(&ev) != nil {
backoff.Wait()
}
backoff.Reset()
total.Add(1)
}
}(source)
}
// Wait for producers
wg.Wait()
// Single consumer aggregates all events
var sum int
for {
ev, err := q.Dequeue()
if err != nil {
break
}
sum += ev.Value
}
fmt.Printf("Total events: %d, Sum of values: %d\n", total.Load(), sum)
}
Output: Total events: 9, Sum of values: 18
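The retry loop in the example relies on iox.Backoff, whose policy is not described on this page. As a rough illustration of the Wait/Reset pattern only, the sketch below uses a hypothetical exponential backoff built on the standard library; the type, its fields, and the doubling policy are all assumptions and may differ from what iox.Backoff actually does.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errBusy = errors.New("would block") // stand-in for a full-queue error

// backoff is a hypothetical stand-in for iox.Backoff.
type backoff struct {
	min, max time.Duration
	cur      time.Duration
}

// Wait sleeps for the current delay, then doubles it up to max.
func (b *backoff) Wait() {
	if b.cur == 0 {
		b.cur = b.min
	}
	time.Sleep(b.cur)
	b.cur *= 2
	if b.cur > b.max {
		b.cur = b.max
	}
}

// Reset returns to the shortest delay after a successful operation.
func (b *backoff) Reset() { b.cur = 0 }

// retry calls op until it succeeds, backing off between attempts,
// and reports how many attempts were needed.
func retry(op func() error, b *backoff) int {
	attempts := 1
	for op() != nil {
		b.Wait()
		attempts++
	}
	b.Reset()
	return attempts
}

func main() {
	calls := 0
	op := func() error {
		calls++
		if calls < 3 {
			return errBusy // simulate a full queue twice
		}
		return nil
	}
	b := &backoff{min: time.Microsecond, max: time.Millisecond}
	fmt.Println("attempts:", retry(op, b))
}
```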
func NewMPSC ¶
NewMPSC creates a new FAA-based MPSC queue. Capacity rounds up to the next power of 2.
func (*MPSC[T]) Dequeue ¶
Dequeue removes and returns an element (single consumer only). Returns (zero-value, ErrWouldBlock) if the queue is empty.
func (*MPSC[T]) Drain ¶ added in v0.1.1
func (q *MPSC[T]) Drain()
Drain signals that no more enqueues will occur. This is a hint for graceful shutdown: the caller must ensure that no further enqueues are attempted after calling Drain.
type MPSCCompactIndirect ¶
type MPSCCompactIndirect struct {
// contains filtered or unexported fields
}
MPSCCompactIndirect is a compact MPSC queue for uintptr values.
Uses round-based empty detection. Multiple producers use CAS, single consumer reads sequentially.
Memory: 8 bytes per slot
func NewMPSCCompactIndirect ¶
func NewMPSCCompactIndirect(capacity int) *MPSCCompactIndirect
NewMPSCCompactIndirect creates a new compact MPSC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
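Because the high bit is reserved, callers may want to validate values before enqueueing them. The sketch below shows one way to do that with plain bit operations. The names emptyFlag, checkValue, and isEmpty are hypothetical, uint64 is used in place of uintptr so the example behaves the same on every platform, and how the package itself reacts to an out-of-range value is not stated on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// emptyFlag is a hypothetical name for the reserved high bit.
const emptyFlag = uint64(1) << 63

var errTooLarge = errors.New("value does not fit in 63 bits")

// checkValue rejects values that would collide with the reserved bit.
func checkValue(v uint64) error {
	if v&emptyFlag != 0 {
		return errTooLarge
	}
	return nil
}

// isEmpty reports whether a raw slot word carries the empty flag.
func isEmpty(word uint64) bool { return word&emptyFlag != 0 }

func main() {
	fmt.Println("42 ok:", checkValue(42) == nil)
	fmt.Println("high bit set ok:", checkValue(1<<63|42) == nil)
	fmt.Println("empty marker detected:", isEmpty(emptyFlag))
}
```

Indices into a pool or small handles fit comfortably in 63 bits; raw hashes or tagged values with the top bit set would not.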
func (*MPSCCompactIndirect) Cap ¶
func (q *MPSCCompactIndirect) Cap() int
Cap returns the queue capacity.
func (*MPSCCompactIndirect) Dequeue ¶
func (q *MPSCCompactIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns a value (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPSCCompactIndirect) Enqueue ¶
func (q *MPSCCompactIndirect) Enqueue(elem uintptr) error
Enqueue adds a value (multiple producers safe). Values must fit in 63 bits.
func (*MPSCCompactIndirect) Init ¶ added in v0.1.3
func (q *MPSCCompactIndirect) Init(capacity int)
Init initializes a zero-value MPSCCompactIndirect queue in place. Capacity rounds up to the next power of 2.
type MPSCIndirect ¶
type MPSCIndirect struct {
// contains filtered or unexported fields
}
MPSCIndirect is an FAA-based MPSC queue for uintptr values.
Uses 128-bit atomic operations to pack cycle and value into a single atomic entry. Based on the SCQ algorithm with 2n slots for capacity n.
Entry format: [lo=cycle | hi=value]
Memory: 2n slots, 16 bytes per slot
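One common way for ring algorithms to relate an FAA-claimed position to a slot and a cycle is to take the position modulo the ring size for the slot and divide by the ring size for the cycle. The sketch below assumes that scheme for a ring of 2n slots; locate is a hypothetical helper, and the package's actual mapping (including any cache-friendly index remapping) is not documented here.

```go
package main

import "fmt"

// locate splits a claimed position into a slot index and a cycle
// for a ring of 2n slots. n must be a power of two.
func locate(pos, n uint64) (slot, cycle uint64) {
	slots := 2 * n
	return pos & (slots - 1), pos / slots
}

func main() {
	const n = 4 // logical capacity, so 8 physical slots
	for _, pos := range []uint64{0, 7, 8, 9, 17} {
		slot, cycle := locate(pos, n)
		fmt.Printf("pos=%d slot=%d cycle=%d\n", pos, slot, cycle)
	}
}
```

Positions 1, 9, and 17 all land in slot 1 but carry cycles 0, 1, and 2, which is how a cycle stored next to the value lets an operation tell a current entry from a stale one.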
func NewMPSCIndirect ¶
func NewMPSCIndirect(capacity int) *MPSCIndirect
NewMPSCIndirect creates a new FAA-based MPSC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*MPSCIndirect) Dequeue ¶
func (q *MPSCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPSCIndirect) Drain ¶ added in v0.1.1
func (q *MPSCIndirect) Drain()
Drain signals that no more enqueues will occur. This is a hint for graceful shutdown: the caller must ensure that no further enqueues are attempted after calling Drain.
func (*MPSCIndirect) Enqueue ¶
func (q *MPSCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.
func (*MPSCIndirect) Init ¶ added in v0.1.3
func (q *MPSCIndirect) Init(capacity int)
Init initializes a zero-value MPSCIndirect queue in place. Capacity rounds up to the next power of 2.
type MPSCIndirectSeq ¶
type MPSCIndirectSeq struct {
// contains filtered or unexported fields
}
MPSCIndirectSeq is a multi-producer single-consumer queue for uintptr values.
Producers use CAS to claim slots. The single consumer reads sequentially.
Entry format: [lo=sequence | hi=value]
Memory: 16 bytes per slot (sequence + value in a single Uint128)
func NewMPSCIndirectSeq ¶
func NewMPSCIndirectSeq(capacity int) *MPSCIndirectSeq
NewMPSCIndirectSeq creates a new MPSC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*MPSCIndirectSeq) Dequeue ¶
func (q *MPSCIndirectSeq) Dequeue() (uintptr, error)
Dequeue removes and returns an element (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPSCIndirectSeq) Enqueue ¶
func (q *MPSCIndirectSeq) Enqueue(elem uintptr) error
Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.
func (*MPSCIndirectSeq) Init ¶ added in v0.1.3
func (q *MPSCIndirectSeq) Init(capacity int)
Init initializes a zero-value MPSCIndirectSeq queue in place. Capacity rounds up to the next power of 2.
type MPSCPtr ¶
type MPSCPtr struct {
// contains filtered or unexported fields
}
MPSCPtr is an FAA-based MPSC queue for unsafe.Pointer values.
Uses 128-bit atomic operations. Based on the SCQ algorithm with 2n slots.
Entry format: [lo=cycle | hi=pointer as uint64]
Memory: 2n slots, 16 bytes per slot
func NewMPSCPtr ¶
NewMPSCPtr creates a new FAA-based MPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
func (*MPSCPtr) Dequeue ¶
Dequeue removes and returns an element (single consumer only). Returns (nil, ErrWouldBlock) if the queue is empty.
func (*MPSCPtr) Drain ¶ added in v0.1.1
func (q *MPSCPtr) Drain()
Drain signals that no more enqueues will occur. This is a hint for graceful shutdown: the caller must ensure that no further enqueues are attempted after calling Drain.
type MPSCPtrSeq ¶
type MPSCPtrSeq struct {
// contains filtered or unexported fields
}
MPSCPtrSeq is a multi-producer single-consumer queue for unsafe.Pointer values.
Entry format: [lo=sequence | hi=pointer as uint64]
Memory: 16 bytes per slot
func NewMPSCPtrSeq ¶
func NewMPSCPtrSeq(capacity int) *MPSCPtrSeq
NewMPSCPtrSeq creates a new MPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
func (*MPSCPtrSeq) Dequeue ¶
func (q *MPSCPtrSeq) Dequeue() (unsafe.Pointer, error)
Dequeue removes and returns an element (single consumer only). Returns (nil, ErrWouldBlock) if the queue is empty.
func (*MPSCPtrSeq) Enqueue ¶
func (q *MPSCPtrSeq) Enqueue(elem unsafe.Pointer) error
Enqueue adds an element (multiple producers safe). Returns ErrWouldBlock if the queue is full.
func (*MPSCPtrSeq) Init ¶ added in v0.1.3
func (q *MPSCPtrSeq) Init(capacity int)
Init initializes a zero-value MPSCPtrSeq queue in place. Capacity rounds up to the next power of 2.
type MPSCSeq ¶
type MPSCSeq[T any] struct {
// contains filtered or unexported fields
}
MPSCSeq is a CAS-based multi-producer single-consumer bounded queue.
Producers use CAS to claim slots. The single consumer reads sequentially.
This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewMPSC for the default FAA-based implementation with better scalability.
Memory: n slots (16 bytes per slot)
func NewMPSCSeq ¶
NewMPSCSeq creates a new CAS-based MPSC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPSC for the default FAA-based implementation.
func (*MPSCSeq[T]) Dequeue ¶
Dequeue removes and returns an element (single consumer only). Returns (zero-value, ErrWouldBlock) if the queue is empty.
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options configures queue creation and algorithm selection.
type Producer ¶
type Producer[T any] interface {
// Enqueue adds an element to the queue (non-blocking).
// The element is copied into the queue's internal buffer.
// Returns nil on success, ErrWouldBlock if the queue is full.
//
// Thread safety depends on queue type:
// - SPSC: single producer only
// - MPSC/MPMC: multiple producers safe
// - SPMC: single producer only
Enqueue(elem *T) error
}
Producer is the interface for enqueueing elements.
Producer provides non-blocking enqueue operations. The element is passed by pointer to avoid copying large structs. The queue stores a copy of the pointed-to value, so the original can be modified after Enqueue returns.
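The copy-on-enqueue behavior described above means a single variable can be reused across calls. The sketch below demonstrates only that property with a hypothetical, non-concurrent stand-in (copyQueue); it is not a lock-free queue and does not use the lfq package.

```go
package main

import "fmt"

type Event struct {
	Source string
	Value  int
}

// copyQueue is a hypothetical stand-in that mimics only the documented
// copy-on-enqueue behavior of Producer[T]. It is not safe for concurrent use.
type copyQueue[T any] struct{ buf []T }

// Enqueue stores a copy of *elem, not the pointer itself.
func (q *copyQueue[T]) Enqueue(elem *T) error {
	q.buf = append(q.buf, *elem)
	return nil
}

// Dequeue returns the oldest stored copy; ok is false when empty.
func (q *copyQueue[T]) Dequeue() (v T, ok bool) {
	if len(q.buf) == 0 {
		return v, false
	}
	v, q.buf = q.buf[0], q.buf[1:]
	return v, true
}

func main() {
	q := &copyQueue[Event]{}
	ev := Event{Source: "sensor-A"}
	for i := 1; i <= 3; i++ {
		ev.Value = i // reuse one variable for every enqueue
		q.Enqueue(&ev)
	}
	ev.Value = -1 // later mutation does not affect stored copies
	for {
		got, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println(got.Source, got.Value)
	}
}
```

Note that the copy is shallow: if T contains slices, maps, or pointers, the stored copy still shares that underlying data with the original.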
type ProducerIndirect ¶
type ProducerIndirect interface {
// Enqueue adds an element to the queue.
// Returns ErrWouldBlock immediately if the queue is full.
Enqueue(elem uintptr) error
}
ProducerIndirect enqueues uintptr values (non-blocking).
type ProducerPtr ¶
type ProducerPtr interface {
// Enqueue adds an element to the queue.
// Returns ErrWouldBlock immediately if the queue is full.
Enqueue(elem unsafe.Pointer) error
}
ProducerPtr enqueues unsafe.Pointer values (non-blocking).
type Queue ¶
Queue is the combined producer-consumer interface for a FIFO queue.
Queue provides non-blocking Enqueue and Dequeue operations. Both operations return ErrWouldBlock when they cannot proceed (queue full or empty).
The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization. Track counts in application logic when needed.
Example:
q := lfq.NewMPMC[int](1024)
// Enqueue
if err := q.Enqueue(new(42)); err != nil {
// Handle full queue
}
// Dequeue
elem, err := q.Dequeue()
if err == nil {
fmt.Println(elem)
}
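The advice to track counts in application logic can be sketched as a thin wrapper that maintains its own atomic counter. The example below uses a buffered channel as a stand-in for a bounded queue so that it runs without the lfq package; countedQueue and its methods are hypothetical names. Under concurrency the count is only approximate, because the counter is updated after the queue operation completes.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// countedQueue pairs a bounded queue with an application-level counter.
type countedQueue struct {
	ch    chan int // stand-in for a bounded lfq queue
	count atomic.Int64
}

func newCountedQueue(capacity int) *countedQueue {
	return &countedQueue{ch: make(chan int, capacity)}
}

// Enqueue is non-blocking and bumps the counter only on success.
func (q *countedQueue) Enqueue(v int) bool {
	select {
	case q.ch <- v:
		q.count.Add(1)
		return true
	default:
		return false // full
	}
}

// Dequeue is non-blocking and decrements the counter only on success.
func (q *countedQueue) Dequeue() (int, bool) {
	select {
	case v := <-q.ch:
		q.count.Add(-1)
		return v, true
	default:
		return 0, false // empty
	}
}

// Len returns the approximate number of queued elements.
func (q *countedQueue) Len() int64 { return q.count.Load() }

func main() {
	q := newCountedQueue(2)
	fmt.Println(q.Enqueue(1), q.Enqueue(2), q.Enqueue(3))
	fmt.Println("len:", q.Len())
	q.Dequeue()
	fmt.Println("len:", q.Len())
}
```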
func Build ¶
Build creates a Queue[T] with automatic algorithm selection.
Algorithm selection:
SingleProducer + SingleConsumer → SPSC (Lamport ring buffer)
SingleProducer only → SPMC (FAA default, CAS if Compact)
SingleConsumer only → MPSC (FAA default, CAS if Compact)
Neither → MPMC (FAA default, CAS if Compact)
Default: FAA-based algorithms with 2n physical slots (better scalability).
Compact(): CAS-based algorithms with n slots (half memory footprint).
For type-safe returns with concrete types, use:
- BuildSPSC[T](b) → *SPSC[T]
- BuildMPSC[T](b) → *MPSC[T] (or *MPSCSeq[T] if Compact)
- BuildSPMC[T](b) → *SPMC[T] (or *SPMCSeq[T] if Compact)
- BuildMPMC[T](b) → *MPMC[T] (or *MPMCSeq[T] if Compact)
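The selection rules above amount to a small decision table. The sketch below restates them as a function that returns the name of the concrete type that would be chosen. The constraints struct and selectAlgorithm are hypothetical and exist only to illustrate the table; the real builder works on Options and returns queue values, not strings.

```go
package main

import "fmt"

// constraints is a hypothetical mirror of the builder settings.
type constraints struct {
	singleProducer bool
	singleConsumer bool
	compact        bool
}

// selectAlgorithm restates the documented selection table.
func selectAlgorithm(c constraints) string {
	var name string
	switch {
	case c.singleProducer && c.singleConsumer:
		return "SPSC" // the table lists no Compact variant for SPSC
	case c.singleProducer:
		name = "SPMC"
	case c.singleConsumer:
		name = "MPSC"
	default:
		name = "MPMC"
	}
	if c.compact {
		return name + "Seq" // CAS-based variant with n slots
	}
	return name // FAA-based default with 2n slots
}

func main() {
	fmt.Println(selectAlgorithm(constraints{singleProducer: true, singleConsumer: true}))
	fmt.Println(selectAlgorithm(constraints{singleConsumer: true}))
	fmt.Println(selectAlgorithm(constraints{singleProducer: true, compact: true}))
	fmt.Println(selectAlgorithm(constraints{}))
}
```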
Example ¶
ExampleBuild demonstrates the builder API for automatic algorithm selection.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// SPSC - both constraints
spsc := lfq.Build[int](lfq.New(64).SingleProducer().SingleConsumer())
// MPSC - only single consumer constraint
mpsc := lfq.Build[int](lfq.New(64).SingleConsumer())
// SPMC - only single producer constraint
spmc := lfq.Build[int](lfq.New(64).SingleProducer())
// MPMC - no constraints
mpmc := lfq.Build[int](lfq.New(64))
fmt.Println("SPSC capacity:", spsc.Cap())
fmt.Println("MPSC capacity:", mpsc.Cap())
fmt.Println("SPMC capacity:", spmc.Cap())
fmt.Println("MPMC capacity:", mpmc.Cap())
}
Output:
SPSC capacity: 64
MPSC capacity: 64
SPMC capacity: 64
MPMC capacity: 64
func BuildMPMC ¶
BuildMPMC creates an MPMC queue with compile-time type safety. Panics if the builder has any constraints set.
type QueueIndirect ¶
type QueueIndirect interface {
ProducerIndirect
ConsumerIndirect
Cap() int
}
QueueIndirect is the combined interface for indirect (uintptr) queues.
QueueIndirect passes indices or handles instead of full objects. This is useful for buffer pools, object pools, or any index-based data structure.
The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization.
Example (buffer pool):
pool := make([][]byte, 1024)
freeList := lfq.NewSPSCIndirect(1024)
// Initialize pool
for i := range pool {
pool[i] = make([]byte, 4096)
freeList.Enqueue(uintptr(i))
}
// Allocate
idx, _ := freeList.Dequeue()
buf := pool[idx]
// Free
freeList.Enqueue(idx)
type QueuePtr ¶
type QueuePtr interface {
ProducerPtr
ConsumerPtr
Cap() int
}
QueuePtr is the combined interface for unsafe.Pointer queues.
QueuePtr passes pointers directly without copying. This enables zero-copy transfer of objects between goroutines. The producer creates an object, enqueues its pointer, and the consumer receives the same pointer.
Ownership semantics: The producer transfers ownership to the consumer. After enqueueing, the producer should not access the object.
The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization.
Example:
type Message struct {
Data []byte
}
q := lfq.NewMPMCPtr(1024)
// Producer
msg := &Message{Data: largePayload}
q.Enqueue(unsafe.Pointer(msg))
// msg ownership transferred - do not use msg after this
// Consumer
ptr, _ := q.Dequeue()
received := (*Message)(ptr)
// received is now owned by consumer
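The zero-copy claim can be checked directly: the pointer the consumer receives is the same address the producer enqueued. The sketch below shows the round trip through unsafe.Pointer using a buffered channel as a stand-in for a QueuePtr, so it runs without the lfq package; handOff is a hypothetical helper.

```go
package main

import (
	"fmt"
	"unsafe"
)

type Message struct{ Data []byte }

// handOff sends m through a buffered channel of unsafe.Pointer, a
// stand-in for a QueuePtr, and returns the pointer the consumer sees.
func handOff(m *Message) *Message {
	q := make(chan unsafe.Pointer, 1)
	q <- unsafe.Pointer(m)  // producer: ownership leaves here
	return (*Message)(<-q) // consumer: same object, no copy
}

func main() {
	orig := &Message{Data: []byte("payload")}
	got := handOff(orig)
	fmt.Println("same object:", got == orig)
	fmt.Println(string(got.Data))
}
```

Since both sides end up holding the same address, the ownership rule is what prevents data races: nothing in the type system stops the producer from touching the object after enqueueing it.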
type SPMC ¶
type SPMC[T any] struct {
// contains filtered or unexported fields
}
SPMC is an FAA-based single-producer multi-consumer bounded queue.
Consumers use FAA to blindly claim positions (SCQ-style), requiring 2n physical slots for capacity n.
Memory: 2n slots for capacity n (16+ bytes per slot)
func NewSPMC ¶
NewSPMC creates a new FAA-based SPMC queue. Capacity rounds up to the next power of 2.
func (*SPMC[T]) Dequeue ¶
Dequeue removes and returns an element (multiple consumers safe). Returns (zero-value, ErrWouldBlock) if the queue is empty.
func (*SPMC[T]) Drain ¶ added in v0.1.1
func (q *SPMC[T]) Drain()
Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
type SPMCCompactIndirect ¶
type SPMCCompactIndirect struct {
// contains filtered or unexported fields
}
SPMCCompactIndirect is a compact SPMC queue for uintptr values.
Uses round-based empty detection. Single producer writes sequentially, multiple consumers use CAS.
Memory: 8 bytes per slot
func NewSPMCCompactIndirect ¶
func NewSPMCCompactIndirect(capacity int) *SPMCCompactIndirect
NewSPMCCompactIndirect creates a new compact SPMC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
func (*SPMCCompactIndirect) Cap ¶
func (q *SPMCCompactIndirect) Cap() int
Cap returns the queue capacity.
func (*SPMCCompactIndirect) Dequeue ¶
func (q *SPMCCompactIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns a value (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.
func (*SPMCCompactIndirect) Enqueue ¶
func (q *SPMCCompactIndirect) Enqueue(elem uintptr) error
Enqueue adds a value (single producer only). Values must fit in 63 bits. Returns ErrWouldBlock if the queue is full.
func (*SPMCCompactIndirect) Init ¶ added in v0.1.3
func (q *SPMCCompactIndirect) Init(capacity int)
Init initializes a zero-value SPMCCompactIndirect queue in place. Capacity rounds up to the next power of 2.
type SPMCIndirect ¶
type SPMCIndirect struct {
// contains filtered or unexported fields
}
SPMCIndirect is an FAA-based SPMC queue for uintptr values.
Uses 128-bit atomic operations to pack cycle and value into a single atomic entry. Based on the SCQ algorithm with 2n slots for capacity n.
Entry format: [lo=cycle | hi=value]
Memory: 2n slots, 16 bytes per slot
func NewSPMCIndirect ¶
func NewSPMCIndirect(capacity int) *SPMCIndirect
NewSPMCIndirect creates a new FAA-based SPMC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*SPMCIndirect) Dequeue ¶
func (q *SPMCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.
func (*SPMCIndirect) Drain ¶ added in v0.1.1
func (q *SPMCIndirect) Drain()
Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
func (*SPMCIndirect) Enqueue ¶
func (q *SPMCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element to the queue (single producer only). Returns ErrWouldBlock if the queue is full.
func (*SPMCIndirect) Init ¶ added in v0.1.3
func (q *SPMCIndirect) Init(capacity int)
Init initializes a zero-value SPMCIndirect queue in place. Capacity rounds up to the next power of 2.
type SPMCIndirectSeq ¶
type SPMCIndirectSeq struct {
// contains filtered or unexported fields
}
SPMCIndirectSeq is a single-producer multi-consumer queue for uintptr values.
The single producer writes sequentially. Consumers use CAS to claim slots.
Entry format: [lo=sequence | hi=value]
Memory: 16 bytes per slot (sequence + value in a single Uint128)
func NewSPMCIndirectSeq ¶
func NewSPMCIndirectSeq(capacity int) *SPMCIndirectSeq
NewSPMCIndirectSeq creates a new SPMC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*SPMCIndirectSeq) Dequeue ¶
func (q *SPMCIndirectSeq) Dequeue() (uintptr, error)
Dequeue removes and returns an element (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.
func (*SPMCIndirectSeq) Enqueue ¶
func (q *SPMCIndirectSeq) Enqueue(elem uintptr) error
Enqueue adds an element (single producer only). Returns ErrWouldBlock if the queue is full.
func (*SPMCIndirectSeq) Init ¶ added in v0.1.3
func (q *SPMCIndirectSeq) Init(capacity int)
Init initializes a zero-value SPMCIndirectSeq queue in place. Capacity rounds up to the next power of 2.
type SPMCPtr ¶
type SPMCPtr struct {
// contains filtered or unexported fields
}
SPMCPtr is an FAA-based SPMC queue for unsafe.Pointer values.
Uses 128-bit atomic operations. Based on the SCQ algorithm with 2n slots.
Entry format: [lo=cycle | hi=pointer as uint64]
Memory: 2n slots, 16 bytes per slot
func NewSPMCPtr ¶
NewSPMCPtr creates a new FAA-based SPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
func (*SPMCPtr) Dequeue ¶
Dequeue removes and returns an element (multiple consumers safe). Returns (nil, ErrWouldBlock) if the queue is empty.
func (*SPMCPtr) Drain ¶ added in v0.1.1
func (q *SPMCPtr) Drain()
Drain signals that no more enqueues will occur. After Drain is called, Dequeue skips the threshold check to allow consumers to drain all remaining items without producer pressure.
type SPMCPtrSeq ¶
type SPMCPtrSeq struct {
// contains filtered or unexported fields
}
SPMCPtrSeq is a single-producer multi-consumer queue for unsafe.Pointer values.
Entry format: [lo=sequence | hi=pointer as uint64]
Memory: 16 bytes per slot
func NewSPMCPtrSeq ¶
func NewSPMCPtrSeq(capacity int) *SPMCPtrSeq
NewSPMCPtrSeq creates a new SPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
func (*SPMCPtrSeq) Dequeue ¶
func (q *SPMCPtrSeq) Dequeue() (unsafe.Pointer, error)
Dequeue removes and returns an element (multiple consumers safe). Returns (nil, ErrWouldBlock) if the queue is empty.
func (*SPMCPtrSeq) Enqueue ¶
func (q *SPMCPtrSeq) Enqueue(elem unsafe.Pointer) error
Enqueue adds an element (single producer only). Returns ErrWouldBlock if the queue is full.
func (*SPMCPtrSeq) Init ¶ added in v0.1.3
func (q *SPMCPtrSeq) Init(capacity int)
Init initializes a zero-value SPMCPtrSeq queue in place. Capacity rounds up to the next power of 2.
type SPMCSeq ¶
type SPMCSeq[T any] struct {
// contains filtered or unexported fields
}
SPMCSeq is a CAS-based single-producer multi-consumer bounded queue.
The single producer writes sequentially. Consumers use CAS to claim slots.
This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewSPMC for the default FAA-based implementation with better scalability.
Memory: n slots (16 bytes per slot)
func NewSPMCSeq ¶
NewSPMCSeq creates a new CAS-based SPMC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewSPMC for the default FAA-based implementation.
func (*SPMCSeq[T]) Dequeue ¶
Dequeue removes and returns an element (multiple consumers safe). Returns (zero-value, ErrWouldBlock) if the queue is empty.
type SPSC ¶
type SPSC[T any] struct {
// contains filtered or unexported fields
}
SPSC is a single-producer single-consumer bounded queue.
Based on Lamport's ring buffer with cached index optimization. The producer caches the consumer's dequeue index, and vice versa, reducing cross-core cache line traffic.
Memory: O(capacity) with minimal per-slot overhead
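The cached index optimization can be illustrated with a small ring buffer in which each side keeps a private copy of the other side's index and refreshes it only when the queue looks full (producer) or empty (consumer). This is a minimal sketch under stated assumptions, not the package's code: the spsc type and its fields are hypothetical, capacity must be a power of two, and the cache-line padding a production implementation would likely add is omitted.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

type spsc[T any] struct {
	buf        []T
	mask       uint64
	head       atomic.Uint64 // next slot to read; written by the consumer
	tail       atomic.Uint64 // next slot to write; written by the producer
	cachedHead uint64        // producer's private copy of head
	cachedTail uint64        // consumer's private copy of tail
}

func newSPSC[T any](capPow2 int) *spsc[T] {
	return &spsc[T]{buf: make([]T, capPow2), mask: uint64(capPow2 - 1)}
}

// Enqueue must be called from the single producer goroutine only.
func (q *spsc[T]) Enqueue(v T) bool {
	t := q.tail.Load()
	if t-q.cachedHead == uint64(len(q.buf)) {
		q.cachedHead = q.head.Load() // refresh only when the queue looks full
		if t-q.cachedHead == uint64(len(q.buf)) {
			return false
		}
	}
	q.buf[t&q.mask] = v
	q.tail.Store(t + 1) // publish the element
	return true
}

// Dequeue must be called from the single consumer goroutine only.
func (q *spsc[T]) Dequeue() (T, bool) {
	var zero T
	h := q.head.Load()
	if h == q.cachedTail {
		q.cachedTail = q.tail.Load() // refresh only when the queue looks empty
		if h == q.cachedTail {
			return zero, false
		}
	}
	v := q.buf[h&q.mask]
	q.head.Store(h + 1) // release the slot
	return v, true
}

// transferSum pushes 1..n through the queue from one goroutine and
// sums the values received on the calling goroutine.
func transferSum(n int) int {
	q := newSPSC[int](8)
	go func() {
		for i := 1; i <= n; i++ {
			for !q.Enqueue(i) {
				runtime.Gosched()
			}
		}
	}()
	sum := 0
	for received := 0; received < n; {
		v, ok := q.Dequeue()
		if !ok {
			runtime.Gosched()
			continue
		}
		sum += v
		received++
	}
	return sum
}

func main() {
	fmt.Println(transferSum(1000))
}
```

In the common case each operation touches only its own index and its cached copy, so the other core's cache line is read only when the cached value is no longer enough to make progress.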
func BuildSPSC ¶
BuildSPSC creates an SPSC queue with compile-time type safety. Panics if the builder is not configured with SingleProducer().SingleConsumer().
func NewSPSC ¶
NewSPSC creates a new SPSC queue. Capacity rounds up to the next power of 2.
Example ¶
ExampleNewSPSC demonstrates a basic SPSC queue for pipeline stages.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Create a single-producer single-consumer queue
q := lfq.NewSPSC[int](8)
// Producer sends 5 values
for i := 1; i <= 5; i++ {
q.Enqueue(new(i * 10))
}
// Consumer receives values
for range 5 {
v, _ := q.Dequeue()
fmt.Println(v)
}
}
Output:
10
20
30
40
50
func (*SPSC[T]) Dequeue ¶
Dequeue removes and returns an element (consumer only). Returns (zero-value, ErrWouldBlock) if the queue is empty.
type SPSCIndirect ¶
type SPSCIndirect struct {
// contains filtered or unexported fields
}
SPSCIndirect is an SPSC queue for uintptr values.
func NewSPSCIndirect ¶
func NewSPSCIndirect(capacity int) *SPSCIndirect
NewSPSCIndirect creates a new SPSC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*SPSCIndirect) Dequeue ¶
func (q *SPSCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element (consumer only).
func (*SPSCIndirect) Enqueue ¶
func (q *SPSCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element (producer only).
func (*SPSCIndirect) Init ¶ added in v0.1.3
func (q *SPSCIndirect) Init(capacity int)
Init initializes a zero-value SPSCIndirect queue in place. Capacity rounds up to the next power of 2.
type SPSCPtr ¶
type SPSCPtr struct {
// contains filtered or unexported fields
}
SPSCPtr is an SPSC queue for unsafe.Pointer values. Useful for zero-copy pointer passing between goroutines.
func NewSPSCPtr ¶
NewSPSCPtr creates a new SPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.