Documentation ¶
Overview ¶
Package lfq provides bounded FIFO queue implementations.
The package offers multiple queue variants optimized for different producer/consumer patterns:
- SPSC: Single-Producer Single-Consumer
- MPSC: Multi-Producer Single-Consumer
- SPMC: Single-Producer Multi-Consumer
- MPMC: Multi-Producer Multi-Consumer
Quick Start ¶
Direct constructors (recommended for most cases):
q := lfq.NewSPSC[Event](1024)
q := lfq.NewMPMC[*Request](4096)
Builder API auto-selects algorithm based on constraints:
q := lfq.Build[Event](lfq.New(1024).SingleProducer().SingleConsumer()) // → SPSC
q := lfq.Build[Event](lfq.New(1024).SingleConsumer()) // → MPSC
q := lfq.Build[Event](lfq.New(1024).SingleProducer()) // → SPMC
q := lfq.Build[Event](lfq.New(1024)) // → MPMC
Basic Usage ¶
All queues share the same interface for enqueueing and dequeueing:
// Create a queue
q := lfq.NewMPMC[int](1024)
// Enqueue (non-blocking)
value := 42
err := q.Enqueue(&value)
if lfq.IsWouldBlock(err) {
// Queue is full - handle backpressure
}
// Dequeue (non-blocking)
elem, err := q.Dequeue()
if lfq.IsWouldBlock(err) {
// Queue is empty - try again later
}
Common Patterns ¶
Pipeline Stage (SPSC):
// Stage 1 → Queue → Stage 2
q := lfq.NewSPSC[Data](1024)
go func() { // Producer (Stage 1)
backoff := iox.Backoff{}
for data := range input {
for q.Enqueue(&data) != nil {
backoff.Wait()
}
backoff.Reset()
}
}()
go func() { // Consumer (Stage 2)
backoff := iox.Backoff{}
for {
data, err := q.Dequeue()
if err != nil {
backoff.Wait()
continue
}
backoff.Reset()
process(data)
}
}()
Event Aggregation (MPSC):
// Multiple event sources → Single processor
q := lfq.NewMPSC[Event](4096)
// Multiple producers (event sources)
for sensor := range slices.Values(sensors) {
go func(s Sensor) {
for ev := range s.Events() {
q.Enqueue(&ev)
}
}(sensor)
}
// Single consumer (aggregator)
go func() {
for {
ev, err := q.Dequeue()
if err == nil {
aggregate(ev)
}
}
}()
Work Distribution (SPMC):
// Single dispatcher → Multiple workers
q := lfq.NewSPMC[Task](1024)
// Single producer (dispatcher)
go func() {
backoff := iox.Backoff{}
for task := range tasks {
for q.Enqueue(&task) != nil {
backoff.Wait()
}
backoff.Reset()
}
}()
// Multiple consumers (workers)
for range numWorkers {
go func() {
for {
task, err := q.Dequeue()
if err == nil {
task.Execute()
}
}
}()
}
Worker Pool (MPMC):
// Multiple submitters → Multiple workers
q := lfq.NewMPMC[Job](4096)
// Workers
for range numWorkers {
go func() {
for {
job, err := q.Dequeue()
if err == nil {
job.Run()
}
}
}()
}
// Submit jobs from anywhere
func Submit(j Job) error {
return q.Enqueue(&j)
}
Queue Variants ¶
Three queue flavors are available for different use cases:
Build[T] - Generic type-safe queue for any type
BuildIndirect() - Queue for uintptr values (pool indices, handles)
BuildPtr() - Queue for unsafe.Pointer (zero-copy pointer passing)
When to use Indirect:
// Buffer pool with index-based access
pool := make([][]byte, 1024)
freeList := lfq.NewSPSCIndirect(1024)
// Initialize free list with buffer indices
for i := range pool {
pool[i] = make([]byte, 4096)
freeList.Enqueue(uintptr(i))
}
// Allocate: get index from free list
idx, err := freeList.Dequeue()
if err != nil {
return // Free list is empty: pool exhausted
}
buf := pool[idx]
// Free: return index to free list
freeList.Enqueue(idx)
When to use Ptr:
// Zero-copy object passing between goroutines
q := lfq.NewMPMCPtr(1024)
// Producer creates object once
msg := &Message{Data: largePayload}
q.Enqueue(unsafe.Pointer(msg))
// Consumer receives same pointer - no copy
ptr, _ := q.Dequeue()
received := (*Message)(ptr)
Algorithm Selection ¶
The builder selects algorithms based on constraints and Compact() hint:
Default (FAA-based, 2n slots for capacity n):
SPSC: Lamport ring buffer (n slots, already optimal)
MPSC: FAA producers, sequential consumer
SPMC: Sequential producer, FAA consumers
MPMC: FAA-based SCQ algorithm
With Compact() (CAS-based, n slots for capacity n):
SPSC: Same as default (already optimal)
MPSC: CAS producers, sequential consumer
SPMC: Sequential producer, CAS consumers
MPMC: Sequence-based algorithm
FAA (Fetch-And-Add) scales better under high contention but requires 2n physical slots. Use Compact() when memory efficiency is critical.
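The memory trade-off can be estimated with simple arithmetic. The sketch below is illustrative only: slotMemory is a hypothetical helper (not part of lfq), it counts the slot array alone and ignores headers, padding, and position counters, and it assumes the 16 bytes per slot that the type documentation gives for the Indirect variants.

```go
package main

import "fmt"

// slotMemory estimates the size of the slot array in bytes.
// Hypothetical helper: it ignores queue headers, padding, and counters.
// slotsPerElem is 2 for the FAA-based default and 1 for Compact().
func slotMemory(capacity, slotsPerElem, bytesPerSlot int) int {
	return capacity * slotsPerElem * bytesPerSlot
}

func main() {
	const capacity = 4096
	const bytesPerSlot = 16 // assumed slot size, taken from the Indirect type docs

	fmt.Println("default (2n slots):", slotMemory(capacity, 2, bytesPerSlot)) // 131072
	fmt.Println("compact (n slots): ", slotMemory(capacity, 1, bytesPerSlot)) // 65536
}
```

Under these assumptions, Compact() halves the slot array for the same logical capacity.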
Type-safe builder functions enforce constraints at compile time:
BuildSPSC[T](b) → *SPSC[T] // Requires SP + SC
BuildMPSC[T](b) → Queue[T] // Requires SC only
BuildSPMC[T](b) → Queue[T] // Requires SP only
BuildMPMC[T](b) → Queue[T] // Requires no constraints
Performance Hints ¶
Compact() selects CAS-based algorithms with n physical slots (vs 2n for FAA-based default). Use when memory efficiency is more important than contention scalability:
// Compact mode - CAS-based, n slots (works with all queue types)
q := lfq.Build[Event](lfq.New(4096).Compact())
q := lfq.New(4096).Compact().BuildIndirect()
q := lfq.New(4096).Compact().BuildPtr()
SPSC variants already use n slots (Lamport ring buffer) and ignore Compact().
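For readers unfamiliar with the Lamport ring buffer, here is a minimal sketch of the idea using only the standard library. It is not lfq's implementation: the ring type, its methods, and errWouldBlock are hypothetical stand-ins, and it relies on Go's sequentially consistent sync/atomic operations rather than the explicit memory orderings the package describes.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errWouldBlock is a stand-in for lfq.ErrWouldBlock in this sketch.
var errWouldBlock = errors.New("would block")

// ring is an illustrative Lamport-style SPSC ring buffer with n slots.
type ring struct {
	buf  []int
	mask uint64
	head atomic.Uint64 // next position to read; written only by the consumer
	tail atomic.Uint64 // next position to write; written only by the producer
}

// newRing expects capacity to be a power of 2 so that masking works.
func newRing(capacity int) *ring {
	return &ring{buf: make([]int, capacity), mask: uint64(capacity - 1)}
}

// enqueue must be called from a single producer goroutine.
func (r *ring) enqueue(v int) error {
	t := r.tail.Load()
	if t-r.head.Load() == uint64(len(r.buf)) {
		return errWouldBlock // full
	}
	r.buf[t&r.mask] = v
	r.tail.Store(t + 1) // publish the slot to the consumer
	return nil
}

// dequeue must be called from a single consumer goroutine.
func (r *ring) dequeue() (int, error) {
	h := r.head.Load()
	if h == r.tail.Load() {
		return 0, errWouldBlock // empty
	}
	v := r.buf[h&r.mask]
	r.head.Store(h + 1) // release the slot back to the producer
	return v, nil
}

func main() {
	r := newRing(4)
	for i := 1; i <= 5; i++ {
		if err := r.enqueue(i); err != nil {
			fmt.Printf("enqueue %d: %v\n", i, err)
		}
	}
	v, _ := r.dequeue()
	fmt.Println("dequeued", v)
}
```

Because each index has exactly one writer, no CAS or FAA is needed, which is why n slots suffice for capacity n.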
Error Handling ¶
Queues return ErrWouldBlock when operations cannot proceed. This error is sourced from code.hybscloud.com/iox for ecosystem consistency.
// Retry loop with backoff
backoff := iox.Backoff{}
for {
err := q.Enqueue(&item)
if err == nil {
backoff.Reset()
break
}
if !lfq.IsWouldBlock(err) {
return err // Unexpected error
}
backoff.Wait()
}
For semantic error classification (delegates to iox):
lfq.IsWouldBlock(err) // true if queue full/empty
lfq.IsSemantic(err) // true if control flow signal
lfq.IsNonFailure(err) // true if nil or ErrWouldBlock
Capacity and Length ¶
Capacity rounds up to the next power of 2:
q := lfq.NewMPMC[int](3) // Actual capacity: 4
q := lfq.NewMPMC[int](4) // Actual capacity: 4
q := lfq.NewMPMC[int](1000) // Actual capacity: 1024
q := lfq.NewMPMC[int](1024) // Actual capacity: 1024
Minimum capacity is 2 (already a power of 2). Constructors panic if capacity < 2.
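The rounding rule can be reproduced with math/bits. This is a sketch of the documented behavior, not the package's own code; roundUpPow2 is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 returns the smallest power of 2 that is >= n.
// Hypothetical helper: it panics for n < 2, mirroring the documented rule.
func roundUpPow2(n int) int {
	if n < 2 {
		panic("capacity must be >= 2")
	}
	// bits.Len(x) is the number of bits needed to represent x,
	// so 1 << bits.Len(n-1) is the next power of 2 at or above n.
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{2, 3, 4, 1000, 1024} {
		fmt.Printf("requested %d -> actual %d\n", n, roundUpPow2(n))
	}
}
```

A power-of-2 capacity lets a queue map positions to slots with a bit mask instead of a modulo.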
Length is intentionally not provided because accurate counts in lock-free algorithms require expensive cross-core synchronization. Track counts in application logic when needed.
Thread Safety ¶
All queue operations are thread-safe within their access pattern constraints:
- SPSC: One producer goroutine, one consumer goroutine
- MPSC: Multiple producer goroutines, one consumer goroutine
- SPMC: One producer goroutine, multiple consumer goroutines
- MPMC: Multiple producer and consumer goroutines
Violating these constraints (e.g., multiple producers on SPSC) causes undefined behavior including data corruption and races.
Race Detection ¶
Go's race detector is not designed for lock-free algorithm verification. The race detector tracks explicit synchronization primitives (mutex, channels, WaitGroup) but cannot observe happens-before relationships established through atomic memory orderings (acquire-release semantics).
Lock-free queues use sequence numbers with acquire-release semantics to protect non-atomic data fields. These algorithms are correct, but the race detector may report false positives because it cannot track synchronization provided by atomic operations on separate variables.
For lock-free algorithm correctness verification, use:
- Formal verification tools (TLA+, SPIN)
- Stress testing without race detector
- Memory model analysis
Tests incompatible with race detection are excluded via //go:build !race.
Dependencies ¶
This package uses code.hybscloud.com/iox for semantic errors, code.hybscloud.com/atomix for atomic primitives with explicit memory ordering, and code.hybscloud.com/spin for CPU pause instructions.
Example (Backpressure) ¶
Example_backpressure demonstrates handling backpressure with a full queue.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Small queue to demonstrate backpressure
q := lfq.NewSPSC[int](3) // Cap()=4 with new semantics
// Fill the queue
filled := 0
for i := 1; i <= 10; i++ {
v := i
err := q.Enqueue(&v)
if err == nil {
filled++
} else if lfq.IsWouldBlock(err) {
fmt.Printf("Backpressure at item %d (queue full)\n", i)
break
}
}
fmt.Printf("Filled %d items\n", filled)
// Drain some items to make room
for range 2 {
v, _ := q.Dequeue()
fmt.Printf("Drained: %d\n", v)
}
// Now we can enqueue more
v := 100
if q.Enqueue(&v) == nil {
fmt.Println("Enqueued 100 after draining")
}
}
Output:
Backpressure at item 5 (queue full)
Filled 4 items
Drained: 1
Drained: 2
Enqueued 100 after draining
Example (BatchProcessing) ¶
Example_batchProcessing demonstrates collecting items into batches.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
q := lfq.NewSPSC[int](64)
// Single producer submits items sequentially
for i := 1; i <= 9; i++ {
v := i
q.Enqueue(&v)
}
// Batch processing: collect up to batchSize items
batchSize := 4
batch := make([]int, 0, batchSize)
batchNum := 0
for {
for len(batch) < batchSize {
v, err := q.Dequeue()
if err != nil {
break
}
batch = append(batch, v)
}
if len(batch) == 0 {
break
}
batchNum++
fmt.Printf("Batch %d: %v\n", batchNum, batch)
batch = batch[:0]
}
}
Output:
Batch 1: [1 2 3 4]
Batch 2: [5 6 7 8]
Batch 3: [9]
Example (BufferPool) ¶
Example_bufferPool demonstrates using Indirect queue for buffer pool management.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
const poolSize = 4
const bufSize = 64
// Create buffer pool
pool := make([][]byte, poolSize)
for i := range pool {
pool[i] = make([]byte, bufSize)
}
// Free list tracks available buffer indices
freeList := lfq.NewSPSCIndirect(poolSize)
freeCount := poolSize
// Initialize: all buffers are free
for i := range poolSize {
freeList.Enqueue(uintptr(i))
}
// Allocate a buffer
allocate := func() ([]byte, uintptr, bool) {
idx, err := freeList.Dequeue()
if err != nil {
return nil, 0, false // Pool exhausted
}
freeCount--
return pool[idx], idx, true
}
// Release a buffer back to pool
release := func(idx uintptr) {
freeList.Enqueue(idx)
freeCount++
}
// Usage demonstration
fmt.Printf("Free buffers: %d\n", freeCount)
buf1, idx1, ok := allocate()
if ok {
copy(buf1, "hello")
fmt.Printf("Allocated buffer %d, free: %d\n", idx1, freeCount)
}
buf2, idx2, ok := allocate()
if ok {
copy(buf2, "world")
fmt.Printf("Allocated buffer %d, free: %d\n", idx2, freeCount)
}
release(idx1)
fmt.Printf("Released buffer %d, free: %d\n", idx1, freeCount)
release(idx2)
fmt.Printf("Released buffer %d, free: %d\n", idx2, freeCount)
}
Output:
Free buffers: 4
Allocated buffer 0, free: 3
Allocated buffer 1, free: 2
Released buffer 0, free: 3
Released buffer 1, free: 4
Example (CompactMode) ¶
Example_compactMode demonstrates compact mode for memory-constrained scenarios.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Compact Indirect: 8 bytes per slot (vs 16 bytes standard)
// Values limited to 63 bits
q := lfq.New(1024).Compact().BuildIndirect()
// Use for buffer pool indices where nil/zero is not needed
for i := uintptr(1); i <= 5; i++ {
q.Enqueue(i) // Values must be < (1 << 63)
}
fmt.Printf("Queue capacity: %d\n", q.Cap())
// Dequeue until empty (ErrWouldBlock)
for {
idx, err := q.Dequeue()
if lfq.IsWouldBlock(err) {
break
}
fmt.Printf("Index: %d\n", idx)
}
}
Output:
Queue capacity: 1024
Index: 1
Index: 2
Index: 3
Index: 4
Index: 5
Example (Pipeline) ¶
Example_pipeline demonstrates a multi-stage pipeline using SPSC queues.
package main
import (
"fmt"
"sync"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
// Pipeline: Generate → Double → Print
stage1to2 := lfq.NewSPSC[int](8) // Generate → Double
stage2to3 := lfq.NewSPSC[int](8) // Double → Print
var wg sync.WaitGroup
results := make([]int, 0, 5)
var mu sync.Mutex
// Stage 1: Generate numbers 1-5
wg.Add(1)
go func() {
defer wg.Done()
backoff := iox.Backoff{}
for i := 1; i <= 5; i++ {
v := i
for stage1to2.Enqueue(&v) != nil {
backoff.Wait()
}
backoff.Reset()
}
}()
// Stage 2: Double each number
wg.Add(1)
go func() {
defer wg.Done()
backoffDeq := iox.Backoff{}
backoffEnq := iox.Backoff{}
processed := 0
for processed < 5 {
v, err := stage1to2.Dequeue()
if err != nil {
backoffDeq.Wait()
continue
}
backoffDeq.Reset()
doubled := v * 2
for stage2to3.Enqueue(&doubled) != nil {
backoffEnq.Wait()
}
backoffEnq.Reset()
processed++
}
}()
// Stage 3: Collect results
wg.Add(1)
go func() {
defer wg.Done()
backoff := iox.Backoff{}
for len(results) < 5 {
v, err := stage2to3.Dequeue()
if err != nil {
backoff.Wait()
continue
}
backoff.Reset()
mu.Lock()
results = append(results, v)
mu.Unlock()
}
}()
wg.Wait()
for i, v := range results {
fmt.Printf("Stage output %d: %d\n", i, v)
}
}
Output:
Stage output 0: 2
Stage output 1: 4
Stage output 2: 6
Stage output 3: 8
Stage output 4: 10
Example (WorkerPool) ¶
Example_workerPool demonstrates a worker pool pattern using MPMC.
package main
import (
"fmt"
"sync"
"code.hybscloud.com/atomix"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
type Job struct {
ID int
Input int
Result int
}
// Job queue and results
jobs := lfq.NewMPMC[Job](16)
results := make([]int, 5)
var wg sync.WaitGroup
var completed atomix.Int32
// Start 3 workers
for w := range 3 {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
backoff := iox.Backoff{}
for completed.Load() < 5 {
job, err := jobs.Dequeue()
if err != nil {
backoff.Wait()
continue
}
backoff.Reset()
// Process job: square the input
job.Result = job.Input * job.Input
results[job.ID] = job.Result
completed.Add(1)
}
}(w)
}
// Submit 5 jobs
backoff := iox.Backoff{}
for i := range 5 {
job := Job{ID: i, Input: i + 1}
for jobs.Enqueue(&job) != nil {
backoff.Wait()
}
backoff.Reset()
}
wg.Wait()
// Print results
for i, r := range results {
fmt.Printf("Job %d: %d² = %d\n", i, i+1, r)
}
}
Output:
Job 0: 1² = 1
Job 1: 2² = 4
Job 2: 3² = 9
Job 3: 4² = 16
Job 4: 5² = 25
Index ¶
- Constants
- Variables
- func IsNonFailure(err error) bool
- func IsSemantic(err error) bool
- func IsWouldBlock(err error) bool
- type Builder
- func (b *Builder) BuildIndirect() QueueIndirect
- func (b *Builder) BuildIndirectMPMC() QueueIndirect
- func (b *Builder) BuildIndirectMPSC() QueueIndirect
- func (b *Builder) BuildIndirectSPMC() QueueIndirect
- func (b *Builder) BuildIndirectSPSC() *SPSCIndirect
- func (b *Builder) BuildPtr() QueuePtr
- func (b *Builder) BuildPtrMPMC() QueuePtr
- func (b *Builder) BuildPtrMPSC() QueuePtr
- func (b *Builder) BuildPtrSPMC() QueuePtr
- func (b *Builder) BuildPtrSPSC() *SPSCPtr
- func (b *Builder) Compact() *Builder
- func (b *Builder) SingleConsumer() *Builder
- func (b *Builder) SingleProducer() *Builder
- type Consumer
- type ConsumerIndirect
- type ConsumerPtr
- type MPMC
- type MPMCCompactIndirect
- type MPMCIndirect
- type MPMCIndirectSeq
- type MPMCPtr
- type MPMCPtrSeq
- type MPMCSeq
- type MPSC
- type MPSCCompactIndirect
- type MPSCIndirect
- type MPSCIndirectSeq
- type MPSCPtr
- type MPSCPtrSeq
- type MPSCSeq
- type Options
- type Producer
- type ProducerIndirect
- type ProducerPtr
- type Queue
- type QueueIndirect
- type QueuePtr
- type SPMC
- type SPMCCompactIndirect
- type SPMCIndirect
- type SPMCIndirectSeq
- type SPMCPtr
- type SPMCPtrSeq
- type SPMCSeq
- type SPSC
- type SPSCIndirect
- type SPSCPtr
Constants ¶
const RaceEnabled = false
RaceEnabled is false when the race detector is not active.
Variables ¶
var ErrWouldBlock = iox.ErrWouldBlock
ErrWouldBlock indicates the operation cannot proceed immediately.
For Enqueue: the queue is full (backpressure)
For Dequeue: the queue is empty (no data available)
ErrWouldBlock is a control flow signal, not a failure. The caller should retry the operation later (with backoff or yield) rather than propagating the error.
This is an alias for iox.ErrWouldBlock for ecosystem consistency.
Example:
backoff := iox.Backoff{}
for {
err := q.Enqueue(&item)
if err == nil {
backoff.Reset()
break
}
if lfq.IsWouldBlock(err) {
backoff.Wait() // Adaptive backpressure
continue
}
return err // Unexpected error
}
Functions ¶
func IsNonFailure ¶
IsNonFailure reports whether err represents a non-failure condition. Returns true for nil, ErrWouldBlock, or ErrMore. Delegates to iox.IsNonFailure.
func IsSemantic ¶
IsSemantic reports whether err is a control flow signal (not a failure). Delegates to iox.IsSemantic.
func IsWouldBlock ¶
IsWouldBlock reports whether err indicates the operation would block. Delegates to iox.IsWouldBlock for wrapped error support.
Example ¶
ExampleIsWouldBlock demonstrates error handling patterns.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
q := lfq.NewSPSC[int](2) // Cap()=2
// Fill the queue
one, two := 1, 2
q.Enqueue(&one)
q.Enqueue(&two)
// Queue is full
five := 5
err := q.Enqueue(&five)
if lfq.IsWouldBlock(err) {
fmt.Println("Queue full - applying backpressure")
}
// Drain the queue
q.Dequeue()
q.Dequeue()
// Queue is empty
_, err = q.Dequeue()
if lfq.IsWouldBlock(err) {
fmt.Println("Queue empty - no data available")
}
}
Output:
Queue full - applying backpressure
Queue empty - no data available
Types ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder provides a fluent API for configuring and creating queues. It automatically selects the algorithm based on producer/consumer constraints and performance hints.
Example:
// SPSC queue (optimal for single producer/consumer)
q := lfq.BuildSPSC[Event](lfq.New(1024).SingleProducer().SingleConsumer())
// MPMC queue (default, general purpose)
q := lfq.BuildMPMC[Request](lfq.New(4096))
// Compact indirect queue for memory efficiency
q := lfq.New(8192).Compact().BuildIndirect()
func New ¶
New creates a queue builder with the given capacity.
Capacity rounds up to the next power of 2. For example, capacity=4 results in actual capacity=4, capacity=1000 results in actual capacity=1024.
Panics if capacity < 2.
Example:
// Create builder, then configure and build
b := lfq.New(1024)
q := lfq.BuildSPSC[int](b.SingleProducer().SingleConsumer())
// Or chain directly
q := lfq.BuildMPMC[int](lfq.New(1024))
func (*Builder) BuildIndirect ¶
func (b *Builder) BuildIndirect() QueueIndirect
BuildIndirect creates a QueueIndirect for uintptr values.
Algorithm selection:
- SPSC (SingleProducer + SingleConsumer) → Lamport ring buffer
- Compact() → CAS-based algorithms (n slots, values limited to 63 bits)
- Default → FAA-based algorithms (2n slots)
func (*Builder) BuildIndirectMPMC ¶
func (b *Builder) BuildIndirectMPMC() QueueIndirect
BuildIndirectMPMC creates an MPMC queue for uintptr values. Panics if builder has any constraints set.
func (*Builder) BuildIndirectMPSC ¶
func (b *Builder) BuildIndirectMPSC() QueueIndirect
BuildIndirectMPSC creates an MPSC queue for uintptr values. Panics if builder is not configured with SingleConsumer() only.
func (*Builder) BuildIndirectSPMC ¶
func (b *Builder) BuildIndirectSPMC() QueueIndirect
BuildIndirectSPMC creates an SPMC queue for uintptr values. Panics if builder is not configured with SingleProducer() only.
func (*Builder) BuildIndirectSPSC ¶
func (b *Builder) BuildIndirectSPSC() *SPSCIndirect
BuildIndirectSPSC creates an SPSC queue for uintptr values.
func (*Builder) BuildPtr ¶
BuildPtr creates a QueuePtr for unsafe.Pointer values.
Algorithm selection:
- SPSC (SingleProducer + SingleConsumer) → Lamport ring buffer
- Other configurations → FAA default (2n slots), CAS if Compact (n slots)
Default: FAA-based algorithms with 2n physical slots (better scalability). Compact(): CAS-based algorithms with n slots (half memory footprint).
func (*Builder) BuildPtrMPMC ¶
BuildPtrMPMC creates an MPMC queue for unsafe.Pointer values. Panics if builder has any constraints set.
func (*Builder) BuildPtrMPSC ¶
BuildPtrMPSC creates an MPSC queue for unsafe.Pointer values. Panics if builder is not configured with SingleConsumer() only.
func (*Builder) BuildPtrSPMC ¶
BuildPtrSPMC creates an SPMC queue for unsafe.Pointer values. Panics if builder is not configured with SingleProducer() only.
func (*Builder) BuildPtrSPSC ¶
BuildPtrSPSC creates an SPSC queue for unsafe.Pointer values. Panics if builder is not configured with SingleProducer().SingleConsumer().
func (*Builder) Compact ¶
Compact selects CAS-based algorithms with n physical slots instead of FAA-based algorithms with 2n slots.
Trade-off: Half memory usage, reduced scalability under high contention.
SPSC already uses n slots (Lamport ring buffer) and ignores Compact().
func (*Builder) SingleConsumer ¶
SingleConsumer declares that only one goroutine will dequeue. Enables optimized algorithms for SPSC or MPSC patterns.
func (*Builder) SingleProducer ¶
SingleProducer declares that only one goroutine will enqueue. Enables optimized algorithms for SPSC or SPMC patterns.
type Consumer ¶
type Consumer[T any] interface {
// Dequeue removes and returns an element from the queue (non-blocking).
// Returns the dequeued element on success.
// Returns (zero-value, ErrWouldBlock) if the queue is empty.
//
// Thread safety depends on queue type:
// - SPSC: single consumer only
// - MPSC: single consumer only
// - SPMC/MPMC: multiple consumers safe
Dequeue() (T, error)
}
Consumer is the interface for dequeueing elements.
Consumer provides non-blocking dequeue operations. The element is returned by value (copied from the queue's internal buffer). The original slot is cleared to allow garbage collection of referenced objects.
For large types (>512 bytes), consider using QueuePtr or QueueIndirect instead to avoid copy overhead.
type ConsumerIndirect ¶
type ConsumerIndirect interface {
// Dequeue removes and returns an element from the queue.
// Returns (0, ErrWouldBlock) immediately if the queue is empty.
Dequeue() (uintptr, error)
}
ConsumerIndirect dequeues uintptr values (non-blocking).
type ConsumerPtr ¶
type ConsumerPtr interface {
// Dequeue removes and returns an element from the queue.
// Returns (nil, ErrWouldBlock) immediately if the queue is empty.
Dequeue() (unsafe.Pointer, error)
}
ConsumerPtr dequeues unsafe.Pointer values (non-blocking).
type MPMC ¶
type MPMC[T any] struct {
// contains filtered or unexported fields
}
MPMC is an FAA-based multi-producer multi-consumer bounded queue.
Based on the SCQ (Scalable Circular Queue) algorithm by Nikolaev (DISC 2019). Uses Fetch-And-Add to blindly increment position counters, requiring 2n physical slots for capacity n. This approach scales better under high contention compared to CAS-based alternatives.
Cycle-based slot validation provides ABA safety: each slot tracks which "cycle" (round) it belongs to via cycle = position / capacity.
Memory: 2n slots for capacity n (16+ bytes per slot)
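To make the cycle relation concrete, the sketch below computes the cycle for a few positions. It is illustrative only: cycleOf and slotOf are hypothetical helpers, and the index mapping (position masked by capacity) is a simplifying assumption that does not reflect the 2n-slot layout the real queue uses.

```go
package main

import "fmt"

// cycleOf follows the documented relation cycle = position / capacity.
func cycleOf(pos, capacity uint64) uint64 {
	return pos / capacity
}

// slotOf maps a position onto a slot index.
// Assumption for illustration: capacity is a power of 2 and the index
// wraps at capacity; the actual queue uses 2n physical slots.
func slotOf(pos, capacity uint64) uint64 {
	return pos & (capacity - 1)
}

func main() {
	const capacity = 4
	for _, pos := range []uint64{0, 3, 4, 9} {
		fmt.Printf("pos=%d slot=%d cycle=%d\n",
			pos, slotOf(pos, capacity), cycleOf(pos, capacity))
	}
}
```

Positions 0 and 4 land on the same slot but carry different cycles, so an operation holding a stale position can tell that the slot has since been reused.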
func NewMPMC ¶
NewMPMC creates a new FAA-based MPMC queue. Capacity rounds up to the next power of 2. Physical slot count is 2n for capacity n (SCQ requirement).
Example ¶
ExampleNewMPMC demonstrates a multi-producer multi-consumer queue.
package main
import (
"fmt"
"sync"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
q := lfq.NewMPMC[string](16)
// Producers
var wg sync.WaitGroup
for p := range 3 {
wg.Add(1)
go func(id int) {
defer wg.Done()
backoff := iox.Backoff{}
msg := fmt.Sprintf("msg from producer %d", id)
for q.Enqueue(&msg) != nil {
backoff.Wait()
}
}(p)
}
// Wait for producers then consume
wg.Wait()
for {
msg, err := q.Dequeue()
if err != nil {
break
}
fmt.Println(msg)
}
}
Output:
msg from producer 0
msg from producer 1
msg from producer 2
type MPMCCompactIndirect ¶
type MPMCCompactIndirect struct {
// contains filtered or unexported fields
}
MPMCCompactIndirect is a compact MPMC queue for uintptr values.
Uses round-based empty detection: empty slots store (emptyFlag | round), filled slots store the value directly. This achieves 8 bytes per slot while allowing any 63-bit value (including zero) to be enqueued.
Memory: 8 bytes per slot
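The slot encoding described above can be sketched in a few lines. This is an assumption-laden illustration, not the package's code: emptyFlag is taken to be the high bit (as the constructor documentation suggests), and emptySlot, isEmpty, and fitsIn63Bits are hypothetical helpers.

```go
package main

import "fmt"

// emptyFlag marks a slot as empty; the high bit is reserved for it.
const emptyFlag = uint64(1) << 63

// emptySlot encodes an empty slot tagged with its round number.
func emptySlot(round uint64) uint64 {
	return emptyFlag | (round &^ emptyFlag)
}

// isEmpty reports whether a slot word holds an empty marker.
func isEmpty(slot uint64) bool {
	return slot&emptyFlag != 0
}

// fitsIn63Bits reports whether v can be stored without colliding with the flag.
func fitsIn63Bits(v uint64) bool {
	return v&emptyFlag == 0
}

func main() {
	fmt.Println(isEmpty(emptySlot(7)))   // true
	fmt.Println(isEmpty(0))              // false: zero is an ordinary value
	fmt.Println(fitsIn63Bits(1<<63 - 1)) // true
	fmt.Println(fitsIn63Bits(1 << 63))   // false
}
```

Since emptiness lives in the flag bit rather than in a reserved value, zero remains enqueueable while each slot stays a single 8-byte word.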
func NewMPMCCompactIndirect ¶
func NewMPMCCompactIndirect(capacity int) *MPMCCompactIndirect
NewMPMCCompactIndirect creates a new compact MPMC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
func (*MPMCCompactIndirect) Cap ¶
func (q *MPMCCompactIndirect) Cap() int
Cap returns the queue capacity.
func (*MPMCCompactIndirect) Dequeue ¶
func (q *MPMCCompactIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns a value from the queue. Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPMCCompactIndirect) Enqueue ¶
func (q *MPMCCompactIndirect) Enqueue(elem uintptr) error
Enqueue adds a value to the queue. Returns ErrWouldBlock if the queue is full. Values must fit in 63 bits (high bit must be 0).
type MPMCIndirect ¶
type MPMCIndirect struct {
// contains filtered or unexported fields
}
MPMCIndirect is an FAA-based MPMC queue for uintptr values.
Uses 128-bit atomic operations to pack cycle and value into a single atomic entry, reducing atomics per operation from 2-3 to 1. Based on SCQ algorithm (Nikolaev, DISC 2019) with 2n slots for capacity n.
Entry format: [lo=cycle | hi=value]
Memory: 2n slots, 16 bytes per slot (cycle + value in single Uint128)
func NewMPMCIndirect ¶
func NewMPMCIndirect(capacity int) *MPMCIndirect
NewMPMCIndirect creates a new FAA-based MPMC queue for uintptr values. Capacity rounds up to the next power of 2. Physical slot count is 2n for capacity n.
Example ¶
ExampleNewMPMCIndirect demonstrates pool index passing.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Simulate a buffer pool
bufferPool := make([][]byte, 4)
for i := range bufferPool {
bufferPool[i] = make([]byte, 1024)
}
// Queue passes indices, not buffers
q := lfq.NewMPMCIndirect(8)
// Producer "allocates" from pool by passing index
for i := range len(bufferPool) {
q.Enqueue(uintptr(i))
}
// Consumer retrieves indices and accesses pool
for range len(bufferPool) {
idx, _ := q.Dequeue()
buf := bufferPool[idx]
fmt.Printf("Got buffer %d with len %d\n", idx, len(buf))
}
}
Output:
Got buffer 0 with len 1024
Got buffer 1 with len 1024
Got buffer 2 with len 1024
Got buffer 3 with len 1024
func (*MPMCIndirect) Dequeue ¶
func (q *MPMCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element from the queue. Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPMCIndirect) Enqueue ¶
func (q *MPMCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.
type MPMCIndirectSeq ¶
type MPMCIndirectSeq struct {
// contains filtered or unexported fields
}
MPMCIndirectSeq is a CAS-based MPMC queue for uintptr values.
Uses 128-bit atomic operations to pack sequence and value into a single atomic entry, reducing atomics per operation from 2-3 to 1.
This is the Compact variant using n slots. Use NewMPMCIndirect for the default FAA-based implementation with 2n slots and better scalability.
Entry format: [lo=sequence | hi=value]
Memory: n slots, 16 bytes per slot
func NewMPMCIndirectSeq ¶
func NewMPMCIndirectSeq(capacity int) *MPMCIndirectSeq
NewMPMCIndirectSeq creates a new CAS-based MPMC queue for uintptr values. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMCIndirect for the default FAA-based implementation.
func (*MPMCIndirectSeq) Dequeue ¶
func (q *MPMCIndirectSeq) Dequeue() (uintptr, error)
Dequeue removes and returns an element from the queue. Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPMCIndirectSeq) Enqueue ¶
func (q *MPMCIndirectSeq) Enqueue(elem uintptr) error
Enqueue adds an element to the queue. Returns ErrWouldBlock if the queue is full.
type MPMCPtr ¶
type MPMCPtr struct {
// contains filtered or unexported fields
}
MPMCPtr is an FAA-based MPMC queue for unsafe.Pointer values.
Uses 128-bit atomic operations to pack cycle and pointer into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.
Entry format: [lo=cycle | hi=pointer as uint64]
Memory: 2n slots, 16 bytes per slot
func NewMPMCPtr ¶
NewMPMCPtr creates a new FAA-based MPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
Example ¶
ExampleNewMPMCPtr demonstrates zero-copy pointer passing.
package main
import (
"fmt"
"slices"
"unsafe"
"code.hybscloud.com/lfq"
)
func main() {
type Message struct {
ID int
Data string
}
q := lfq.NewMPMCPtr(8)
// Producer creates and enqueues messages
messages := []*Message{
{ID: 1, Data: "hello"},
{ID: 2, Data: "world"},
}
for msg := range slices.Values(messages) {
q.Enqueue(unsafe.Pointer(msg))
}
// Consumer receives pointers directly - no copy
for {
ptr, err := q.Dequeue()
if err != nil {
break
}
msg := (*Message)(ptr)
fmt.Printf("Message %d: %s\n", msg.ID, msg.Data)
}
}
Output:
Message 1: hello
Message 2: world
type MPMCPtrSeq ¶
type MPMCPtrSeq struct {
// contains filtered or unexported fields
}
MPMCPtrSeq is a CAS-based MPMC queue for unsafe.Pointer values.
Uses 128-bit atomic operations to pack sequence and pointer into a single atomic entry, reducing atomics per operation from 2-3 to 1.
This is the Compact variant using n slots. Use NewMPMCPtr for the default FAA-based implementation with 2n slots and better scalability.
Entry format: [lo=sequence | hi=pointer as uint64]
Memory: n slots, 16 bytes per slot
func NewMPMCPtrSeq ¶
func NewMPMCPtrSeq(capacity int) *MPMCPtrSeq
NewMPMCPtrSeq creates a new CAS-based MPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMCPtr for the default FAA-based implementation.
type MPMCSeq ¶
type MPMCSeq[T any] struct {
// contains filtered or unexported fields
}
MPMCSeq is a CAS-based multi-producer multi-consumer bounded queue.
Uses per-slot sequence numbers which provide:
- Full ABA safety via sequence-based validation
- Works with both distinct and non-distinct values
- Good performance under moderate contention
This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewMPMC for the default FAA-based implementation with better scalability.
Memory: n slots (16+ bytes per slot)
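Per-slot sequence validation is easiest to see in code. The sketch below follows the widely known bounded MPMC design attributed to Dmitry Vyukov; whether MPMCSeq matches it in detail is an assumption. All names (seqQueue, slot, errWouldBlock) are hypothetical, and it uses the standard library's sequentially consistent atomics.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

type slot struct {
	seq atomic.Uint64 // tells producers and consumers whose turn it is
	val int
}

// seqQueue is an illustrative CAS-based bounded MPMC queue with n slots.
type seqQueue struct {
	slots []slot
	mask  uint64
	enq   atomic.Uint64
	deq   atomic.Uint64
}

// newSeqQueue expects capacity to be a power of 2 and at least 2.
func newSeqQueue(capacity int) *seqQueue {
	q := &seqQueue{slots: make([]slot, capacity), mask: uint64(capacity - 1)}
	for i := range q.slots {
		q.slots[i].seq.Store(uint64(i)) // slot i is ready for position i
	}
	return q
}

func (q *seqQueue) enqueue(v int) error {
	for {
		pos := q.enq.Load()
		s := &q.slots[pos&q.mask]
		diff := int64(s.seq.Load()) - int64(pos)
		switch {
		case diff == 0: // slot is free for this position: try to claim it
			if q.enq.CompareAndSwap(pos, pos+1) {
				s.val = v
				s.seq.Store(pos + 1) // publish to consumers
				return nil
			}
		case diff < 0: // slot still holds an unconsumed element
			return errWouldBlock
		}
		// Otherwise another producer advanced enq first; retry.
	}
}

func (q *seqQueue) dequeue() (int, error) {
	for {
		pos := q.deq.Load()
		s := &q.slots[pos&q.mask]
		diff := int64(s.seq.Load()) - int64(pos+1)
		switch {
		case diff == 0: // slot holds the element for this position
			if q.deq.CompareAndSwap(pos, pos+1) {
				v := s.val
				s.seq.Store(pos + uint64(len(q.slots))) // free for next lap
				return v, nil
			}
		case diff < 0: // nothing published here yet
			return 0, errWouldBlock
		}
		// Otherwise another consumer advanced deq first; retry.
	}
}

func main() {
	q := newSeqQueue(2)
	fmt.Println(q.enqueue(1), q.enqueue(2), q.enqueue(3))
	v, _ := q.dequeue()
	fmt.Println("dequeued", v)
}
```

The sequence number grows by the capacity on every lap, so a delayed thread holding an old position fails its comparison instead of overwriting a newer element; this is the ABA protection the list above refers to.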
func NewMPMCSeq ¶
NewMPMCSeq creates a new CAS-based MPMC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPMC for the default FAA-based implementation.
type MPSC ¶
type MPSC[T any] struct {
// contains filtered or unexported fields
}
MPSC is an FAA-based multi-producer single-consumer bounded queue.
Producers use FAA to blindly claim positions (SCQ-style), requiring 2n physical slots for capacity n.
Memory: 2n slots for capacity n (16+ bytes per slot)
Example (EventAggregation) ¶
ExampleMPSC_eventAggregation demonstrates using MPSC for event aggregation.
package main
import (
"fmt"
"slices"
"sync"
"code.hybscloud.com/atomix"
"code.hybscloud.com/iox"
"code.hybscloud.com/lfq"
)
func main() {
type Event struct {
Source string
Value int
}
q := lfq.NewMPSC[Event](64)
// Multiple event sources (producers)
var wg sync.WaitGroup
var total atomix.Int64
for source := range slices.Values([]string{"sensor-A", "sensor-B", "sensor-C"}) {
wg.Add(1)
go func(name string) {
defer wg.Done()
backoff := iox.Backoff{}
for i := 1; i <= 3; i++ {
ev := Event{Source: name, Value: i}
for q.Enqueue(&ev) != nil {
backoff.Wait()
}
backoff.Reset()
total.Add(1)
}
}(source)
}
// Wait for producers
wg.Wait()
// Single consumer aggregates all events
var sum int
for {
ev, err := q.Dequeue()
if err != nil {
break
}
sum += ev.Value
}
fmt.Printf("Total events: %d, Sum of values: %d\n", total.Load(), sum)
}
Output:
Total events: 9, Sum of values: 18
func NewMPSC ¶
NewMPSC creates a new FAA-based MPSC queue. Capacity rounds up to the next power of 2.
type MPSCCompactIndirect ¶
type MPSCCompactIndirect struct {
// contains filtered or unexported fields
}
MPSCCompactIndirect is a compact MPSC queue for uintptr values.
Uses round-based empty detection. Multiple producers use CAS, single consumer reads sequentially.
Memory: 8 bytes per slot
func NewMPSCCompactIndirect ¶
func NewMPSCCompactIndirect(capacity int) *MPSCCompactIndirect
NewMPSCCompactIndirect creates a new compact MPSC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
func (*MPSCCompactIndirect) Cap ¶
func (q *MPSCCompactIndirect) Cap() int
Cap returns the queue capacity.
func (*MPSCCompactIndirect) Dequeue ¶
func (q *MPSCCompactIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns a value (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPSCCompactIndirect) Enqueue ¶
func (q *MPSCCompactIndirect) Enqueue(elem uintptr) error
Enqueue adds a value (multiple producers safe). Values must fit in 63 bits. Returns ErrWouldBlock if the queue is full.
type MPSCIndirect ¶
type MPSCIndirect struct {
// contains filtered or unexported fields
}
MPSCIndirect is an FAA-based MPSC queue for uintptr values.
Uses 128-bit atomic operations to pack cycle and value into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.
Entry format: [lo=cycle | hi=value]
Memory: 2n slots, 16 bytes per slot
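The entry layout can be pictured as a pair of 64-bit words. The sketch below assumes the common ring arithmetic in which a claimed position maps to a slot index (position modulo slot count) and a cycle (position divided by slot count). The documentation does not state how lfq derives the cycle, so treat `locate` and `makeEntry` as hypothetical illustrations rather than the library's logic.

```go
package main

import "fmt"

// entry mirrors the documented layout: lo holds the cycle, hi holds the value.
type entry struct {
	lo uint64 // cycle
	hi uint64 // value
}

// locate maps a claimed position to a slot index and a cycle number.
// slots must be a power of two.
func locate(pos, slots uint64) (idx, cycle uint64) {
	return pos & (slots - 1), pos / slots
}

// makeEntry builds the entry a producer would publish for position pos,
// together with the slot index it belongs to.
func makeEntry(pos, slots, value uint64) (uint64, entry) {
	idx, cycle := locate(pos, slots)
	return idx, entry{lo: cycle, hi: value}
}

func main() {
	// Capacity 4 implies 8 physical slots under the 2n rule.
	idx, e := makeEntry(11, 8, 42)
	fmt.Println("slot", idx, "cycle", e.lo, "value", e.hi)
}
```

Keeping the cycle and the value in one 16-byte entry is what lets a single 128-bit atomic operation publish or inspect both together.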
func NewMPSCIndirect ¶
func NewMPSCIndirect(capacity int) *MPSCIndirect
NewMPSCIndirect creates a new FAA-based MPSC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*MPSCIndirect) Dequeue ¶
func (q *MPSCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPSCIndirect) Enqueue ¶
func (q *MPSCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.
type MPSCIndirectSeq ¶
type MPSCIndirectSeq struct {
// contains filtered or unexported fields
}
MPSCIndirectSeq is a multi-producer single-consumer queue for uintptr values.
Producers use CAS to claim slots. The single consumer reads sequentially.
Entry format: [lo=sequence | hi=value]
Memory: 16 bytes per slot (sequence + value in single Uint128)
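The CAS-claim, sequential-read pattern described above can be sketched with a per-slot sequence number, in the style of the well-known bounded sequence queue. This is not lfq's code: it keeps the sequence and value in separate fields instead of one Uint128, requires a power-of-two capacity, and uses hypothetical names (`seqQueue`, `errWouldBlock`).

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

type seqSlot struct {
	seq atomic.Uint64
	val uintptr
}

type seqQueue struct {
	slots []seqSlot
	mask  uint64
	tail  atomic.Uint64 // next position producers try to claim
	head  uint64        // touched only by the single consumer
}

// newSeqQueue creates a queue; capacity must be a power of two.
func newSeqQueue(capacity uint64) *seqQueue {
	q := &seqQueue{slots: make([]seqSlot, capacity), mask: capacity - 1}
	for i := range q.slots {
		q.slots[i].seq.Store(uint64(i)) // slot i is free for position i
	}
	return q
}

// Enqueue is safe for multiple producers.
func (q *seqQueue) Enqueue(v uintptr) error {
	for {
		pos := q.tail.Load()
		s := &q.slots[pos&q.mask]
		seq := s.seq.Load()
		switch {
		case seq == pos: // slot is free for this position: try to claim it
			if q.tail.CompareAndSwap(pos, pos+1) {
				s.val = v
				s.seq.Store(pos + 1) // publish to the consumer
				return nil
			}
		case seq < pos: // slot still holds an unread element: queue is full
			return errWouldBlock
		}
		// Lost the race or saw a stale tail: retry.
	}
}

// Dequeue must be called from a single consumer only.
func (q *seqQueue) Dequeue() (uintptr, error) {
	s := &q.slots[q.head&q.mask]
	if s.seq.Load() != q.head+1 {
		return 0, errWouldBlock // nothing published at head yet
	}
	v := s.val
	s.seq.Store(q.head + uint64(len(q.slots))) // free the slot for the next lap
	q.head++
	return v, nil
}

func main() {
	q := newSeqQueue(2)
	fmt.Println(q.Enqueue(7), q.Enqueue(9), q.Enqueue(11))
	fmt.Println(q.Dequeue())
}
```

Only the producers contend (on `tail`); the consumer advances a plain local index, which is why the single-consumer restriction keeps the dequeue path cheap.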
func NewMPSCIndirectSeq ¶
func NewMPSCIndirectSeq(capacity int) *MPSCIndirectSeq
NewMPSCIndirectSeq creates a new MPSC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*MPSCIndirectSeq) Dequeue ¶
func (q *MPSCIndirectSeq) Dequeue() (uintptr, error)
Dequeue removes and returns an element (single consumer only). Returns (0, ErrWouldBlock) if the queue is empty.
func (*MPSCIndirectSeq) Enqueue ¶
func (q *MPSCIndirectSeq) Enqueue(elem uintptr) error
Enqueue adds an element to the queue (multiple producers safe). Returns ErrWouldBlock if the queue is full.
type MPSCPtr ¶
type MPSCPtr struct {
// contains filtered or unexported fields
}
MPSCPtr is an FAA-based MPSC queue for unsafe.Pointer values.
Uses 128-bit atomic operations. Based on SCQ algorithm with 2n slots.
Entry format: [lo=cycle | hi=pointer as uint64]
Memory: 2n slots, 16 bytes per slot
func NewMPSCPtr ¶
NewMPSCPtr creates a new FAA-based MPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
type MPSCPtrSeq ¶
type MPSCPtrSeq struct {
// contains filtered or unexported fields
}
MPSCPtrSeq is a multi-producer single-consumer queue for unsafe.Pointer values.
Entry format: [lo=sequence | hi=pointer as uint64]
Memory: 16 bytes per slot
func NewMPSCPtrSeq ¶
func NewMPSCPtrSeq(capacity int) *MPSCPtrSeq
NewMPSCPtrSeq creates a new MPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
type MPSCSeq ¶
type MPSCSeq[T any] struct {
// contains filtered or unexported fields
}
MPSCSeq is a CAS-based multi-producer single-consumer bounded queue.
Producers use CAS to claim slots. The single consumer reads sequentially.
This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewMPSC for the default FAA-based implementation with better scalability.
Memory: n slots (16 bytes per slot)
func NewMPSCSeq ¶
NewMPSCSeq creates a new CAS-based MPSC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewMPSC for the default FAA-based implementation.
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options configures queue creation and algorithm selection.
type Producer ¶
type Producer[T any] interface {
// Enqueue adds an element to the queue (non-blocking).
// The element is copied into the queue's internal buffer.
// Returns nil on success, ErrWouldBlock if the queue is full.
//
// Thread safety depends on queue type:
// - SPSC: single producer only
// - MPSC/MPMC: multiple producers safe
// - SPMC: single producer only
Enqueue(elem *T) error
}
Producer is the interface for enqueueing elements.
Producer provides non-blocking enqueue operations. The element is passed by pointer so that a large struct is not copied an extra time at the call site. The queue stores its own copy of the pointed-to value, so the caller may modify or reuse the original as soon as Enqueue returns.
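The copy-on-enqueue behaviour means a single variable can be reused across calls. The sketch below demonstrates the semantics with a toy, non-concurrent stand-in (`copyingQueue`, a hypothetical type backed by a slice); it is not a lock-free queue and only mirrors the documented Enqueue signature.

```go
package main

import "fmt"

type event struct{ Value int }

// copyingQueue is a toy stand-in that stores a copy of *elem, as the
// Producer documentation describes. It is not safe for concurrent use.
type copyingQueue[T any] struct{ items []T }

func (q *copyingQueue[T]) Enqueue(elem *T) error {
	q.items = append(q.items, *elem) // copy the pointed-to value
	return nil
}

// enqueueReusing enqueues n events while reusing one variable, then
// overwrites that variable to show the stored copies are unaffected.
func enqueueReusing(n int) []event {
	q := &copyingQueue[event]{}
	var ev event
	for i := 1; i <= n; i++ {
		ev.Value = i
		_ = q.Enqueue(&ev)
	}
	ev.Value = -1 // does not change anything already enqueued
	return q.items
}

func main() {
	fmt.Println(enqueueReusing(3))
}
```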
type ProducerIndirect ¶
type ProducerIndirect interface {
// Enqueue adds an element to the queue.
// Returns ErrWouldBlock immediately if the queue is full.
Enqueue(elem uintptr) error
}
ProducerIndirect enqueues uintptr values (non-blocking).
type ProducerPtr ¶
type ProducerPtr interface {
// Enqueue adds an element to the queue.
// Returns ErrWouldBlock immediately if the queue is full.
Enqueue(elem unsafe.Pointer) error
}
ProducerPtr enqueues unsafe.Pointer values (non-blocking).
type Queue ¶
Queue is the combined producer-consumer interface for a FIFO queue.
Queue provides non-blocking Enqueue and Dequeue operations. Both operations return ErrWouldBlock when they cannot proceed (queue full or empty).
The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization. Track counts in application logic when needed.
Example:
q := lfq.NewMPMC[int](1024)
// Enqueue
val := 42
if err := q.Enqueue(&val); err != nil {
// Handle full queue
}
// Dequeue
elem, err := q.Dequeue()
if err == nil {
fmt.Println(elem)
}
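The advice to track counts in application logic can be followed with a thin wrapper around any queue. The sketch below is one possible approach, not part of lfq: `queue`, `chanQueue`, `counted`, and `errWouldBlock` are hypothetical names, and a buffered channel stands in for a real queue so the example runs with the standard library alone.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

// queue mirrors the documented Enqueue/Dequeue shape.
type queue[T any] interface {
	Enqueue(elem *T) error
	Dequeue() (T, error)
}

// chanQueue is a stand-in queue backed by a buffered channel.
type chanQueue[T any] struct{ ch chan T }

func (q chanQueue[T]) Enqueue(elem *T) error {
	select {
	case q.ch <- *elem:
		return nil
	default:
		return errWouldBlock
	}
}

func (q chanQueue[T]) Dequeue() (T, error) {
	select {
	case v := <-q.ch:
		return v, nil
	default:
		var zero T
		return zero, errWouldBlock
	}
}

// counted wraps a queue and keeps an approximate element count.
type counted[T any] struct {
	q queue[T]
	n atomic.Int64
}

func (c *counted[T]) Enqueue(elem *T) error {
	err := c.q.Enqueue(elem)
	if err == nil {
		c.n.Add(1)
	}
	return err
}

func (c *counted[T]) Dequeue() (T, error) {
	v, err := c.q.Dequeue()
	if err == nil {
		c.n.Add(-1)
	}
	return v, err
}

// Len returns the tracked count. Under concurrency it can lag the true
// contents and may briefly read low, since the counter is updated after
// the queue operation completes.
func (c *counted[T]) Len() int64 { return c.n.Load() }

func main() {
	c := &counted[int]{q: chanQueue[int]{ch: make(chan int, 2)}}
	v := 42
	_ = c.Enqueue(&v)
	_ = c.Enqueue(&v)
	fmt.Println(c.Len())
}
```

The counter adds one shared atomic write per operation, which is exactly the cross-core cost the interface avoids by default, so it is best applied only where a count is actually needed.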
func Build ¶
Build creates a Queue[T] with automatic algorithm selection.
Algorithm selection:
SingleProducer + SingleConsumer → SPSC (Lamport ring buffer)
SingleProducer only → SPMC (FAA default, CAS if Compact)
SingleConsumer only → MPSC (FAA default, CAS if Compact)
Neither → MPMC (FAA default, CAS if Compact)
Default: FAA-based algorithms with 2n physical slots (better scalability).
Compact(): CAS-based algorithms with n slots (half memory footprint).
For type-safe returns with concrete types, use:
- BuildSPSC[T](b) → *SPSC[T]
- BuildMPSC[T](b) → *MPSC[T] (or *MPSCSeq[T] if Compact)
- BuildSPMC[T](b) → *SPMC[T] (or *SPMCSeq[T] if Compact)
- BuildMPMC[T](b) → *MPMC[T] (or *MPMCSeq[T] if Compact)
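The selection rules above amount to a small decision table. The function below restates them as code for illustration; `selectAlgorithm` is a hypothetical helper that returns the documented type names as strings and does not call into lfq.

```go
package main

import "fmt"

// selectAlgorithm returns the name of the queue type that the documented
// rules pick for a given combination of builder constraints.
func selectAlgorithm(singleProducer, singleConsumer, compact bool) string {
	var name string
	switch {
	case singleProducer && singleConsumer:
		return "SPSC" // the table lists no Compact variant for SPSC
	case singleProducer:
		name = "SPMC"
	case singleConsumer:
		name = "MPSC"
	default:
		name = "MPMC"
	}
	if compact {
		return name + "Seq" // CAS-based variant with n slots
	}
	return name // FAA-based default with 2n slots
}

func main() {
	fmt.Println(selectAlgorithm(true, true, false))
	fmt.Println(selectAlgorithm(false, true, false))
	fmt.Println(selectAlgorithm(false, true, true))
	fmt.Println(selectAlgorithm(false, false, true))
}
```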
Example ¶
ExampleBuild demonstrates the builder API for automatic algorithm selection.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// SPSC - both constraints
spsc := lfq.Build[int](lfq.New(64).SingleProducer().SingleConsumer())
// MPSC - only single consumer constraint
mpsc := lfq.Build[int](lfq.New(64).SingleConsumer())
// SPMC - only single producer constraint
spmc := lfq.Build[int](lfq.New(64).SingleProducer())
// MPMC - no constraints
mpmc := lfq.Build[int](lfq.New(64))
fmt.Println("SPSC capacity:", spsc.Cap())
fmt.Println("MPSC capacity:", mpsc.Cap())
fmt.Println("SPMC capacity:", spmc.Cap())
fmt.Println("MPMC capacity:", mpmc.Cap())
}
Output:
SPSC capacity: 64
MPSC capacity: 64
SPMC capacity: 64
MPMC capacity: 64
func BuildMPMC ¶
BuildMPMC creates an MPMC queue with compile-time type safety. Panics if builder has any constraints set.
type QueueIndirect ¶
type QueueIndirect interface {
ProducerIndirect
ConsumerIndirect
Cap() int
}
QueueIndirect is the combined interface for indirect (uintptr) queues.
QueueIndirect passes indices or handles instead of full objects. This is useful for buffer pools, object pools, or any index-based data structure.
The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization.
Example (buffer pool):
pool := make([][]byte, 1024)
freeList := lfq.NewSPSCIndirect(1024)
// Initialize pool
for i := range pool {
pool[i] = make([]byte, 4096)
freeList.Enqueue(uintptr(i))
}
// Allocate
idx, _ := freeList.Dequeue()
buf := pool[idx]
// Free
freeList.Enqueue(idx)
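The buffer pool example discards the error from Dequeue, so an exhausted pool would silently hand out index 0. A more defensive variant might look like the sketch below. It assumes a stand-in free list backed by a channel (`freeList`, `bufferPool`, and `errWouldBlock` are hypothetical names) so that it runs with the standard library alone; with lfq the free list would be an indirect queue instead.

```go
package main

import (
	"errors"
	"fmt"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

// freeList is a stand-in for an indirect (uintptr) queue.
type freeList struct{ ch chan uintptr }

func (f freeList) Enqueue(i uintptr) error {
	select {
	case f.ch <- i:
		return nil
	default:
		return errWouldBlock
	}
}

func (f freeList) Dequeue() (uintptr, error) {
	select {
	case i := <-f.ch:
		return i, nil
	default:
		return 0, errWouldBlock
	}
}

type bufferPool struct {
	bufs [][]byte
	free freeList
}

// newBufferPool allocates n buffers of the given size and marks all as free.
func newBufferPool(n, size int) *bufferPool {
	p := &bufferPool{bufs: make([][]byte, n), free: freeList{ch: make(chan uintptr, n)}}
	for i := range p.bufs {
		p.bufs[i] = make([]byte, size)
		_ = p.free.Enqueue(uintptr(i)) // cannot fail: the list holds n entries
	}
	return p
}

// Get returns a free buffer and its index, or ok=false if none is available.
func (p *bufferPool) Get() (uintptr, []byte, bool) {
	i, err := p.free.Dequeue()
	if err != nil {
		return 0, nil, false
	}
	return i, p.bufs[i], true
}

// Put returns the buffer with index i to the pool.
func (p *bufferPool) Put(i uintptr) { _ = p.free.Enqueue(i) }

func main() {
	p := newBufferPool(2, 4096)
	i, buf, ok := p.Get()
	fmt.Println(i, len(buf), ok)
	p.Put(i)
}
```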
type QueuePtr ¶
type QueuePtr interface {
ProducerPtr
ConsumerPtr
Cap() int
}
QueuePtr is the combined interface for unsafe.Pointer queues.
QueuePtr passes pointers directly without copying. This enables zero-copy transfer of objects between goroutines. The producer creates an object, enqueues its pointer, and the consumer receives the same pointer.
Ownership semantics: The producer transfers ownership to the consumer. After enqueueing, the producer should not access the object.
The interface intentionally excludes length because accurate counts in lock-free algorithms require expensive cross-core synchronization.
Example:
type Message struct {
Data []byte
}
q := lfq.NewMPMCPtr(1024)
// Producer
msg := &Message{Data: largePayload}
q.Enqueue(unsafe.Pointer(msg))
// msg ownership transferred - do not use msg after this
// Consumer (runs in a different goroutine)
ptr, _ := q.Dequeue()
received := (*Message)(ptr)
// received is now owned by the consumer
type SPMC ¶
type SPMC[T any] struct {
// contains filtered or unexported fields
}
SPMC is an FAA-based single-producer multi-consumer bounded queue.
Consumers use FAA to blindly claim positions (SCQ-style), requiring 2n physical slots for capacity n.
Memory: 2n slots for capacity n (16+ bytes per slot)
func NewSPMC ¶
NewSPMC creates a new FAA-based SPMC queue. Capacity rounds up to the next power of 2.
type SPMCCompactIndirect ¶
type SPMCCompactIndirect struct {
// contains filtered or unexported fields
}
SPMCCompactIndirect is a compact SPMC queue for uintptr values.
Uses round-based empty detection. Single producer writes sequentially, multiple consumers use CAS.
Memory: 8 bytes per slot
func NewSPMCCompactIndirect ¶
func NewSPMCCompactIndirect(capacity int) *SPMCCompactIndirect
NewSPMCCompactIndirect creates a new compact SPMC queue. Capacity rounds up to the next power of 2. Values are limited to 63 bits (high bit reserved for empty flag).
func (*SPMCCompactIndirect) Cap ¶
func (q *SPMCCompactIndirect) Cap() int
Cap returns queue capacity.
func (*SPMCCompactIndirect) Dequeue ¶
func (q *SPMCCompactIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns a value (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.
func (*SPMCCompactIndirect) Enqueue ¶
func (q *SPMCCompactIndirect) Enqueue(elem uintptr) error
Enqueue adds a value (single producer only). Values must fit in 63 bits. Returns ErrWouldBlock if the queue is full.
type SPMCIndirect ¶
type SPMCIndirect struct {
// contains filtered or unexported fields
}
SPMCIndirect is an FAA-based SPMC queue for uintptr values.
Uses 128-bit atomic operations to pack cycle and value into a single atomic entry. Based on SCQ algorithm with 2n slots for capacity n.
Entry format: [lo=cycle | hi=value]
Memory: 2n slots, 16 bytes per slot
func NewSPMCIndirect ¶
func NewSPMCIndirect(capacity int) *SPMCIndirect
NewSPMCIndirect creates a new FAA-based SPMC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*SPMCIndirect) Dequeue ¶
func (q *SPMCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.
func (*SPMCIndirect) Enqueue ¶
func (q *SPMCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element to the queue (single producer only). Returns ErrWouldBlock if the queue is full.
type SPMCIndirectSeq ¶
type SPMCIndirectSeq struct {
// contains filtered or unexported fields
}
SPMCIndirectSeq is a single-producer multi-consumer queue for uintptr values.
The single producer writes sequentially. Consumers use CAS to claim slots.
Entry format: [lo=sequence | hi=value]
Memory: 16 bytes per slot (sequence + value in single Uint128)
func NewSPMCIndirectSeq ¶
func NewSPMCIndirectSeq(capacity int) *SPMCIndirectSeq
NewSPMCIndirectSeq creates a new SPMC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*SPMCIndirectSeq) Dequeue ¶
func (q *SPMCIndirectSeq) Dequeue() (uintptr, error)
Dequeue removes and returns an element (multiple consumers safe). Returns (0, ErrWouldBlock) if the queue is empty.
func (*SPMCIndirectSeq) Enqueue ¶
func (q *SPMCIndirectSeq) Enqueue(elem uintptr) error
Enqueue adds an element (single producer only). Returns ErrWouldBlock if the queue is full.
type SPMCPtr ¶
type SPMCPtr struct {
// contains filtered or unexported fields
}
SPMCPtr is an FAA-based SPMC queue for unsafe.Pointer values.
Uses 128-bit atomic operations. Based on SCQ algorithm with 2n slots.
Entry format: [lo=cycle | hi=pointer as uint64]
Memory: 2n slots, 16 bytes per slot
func NewSPMCPtr ¶
NewSPMCPtr creates a new FAA-based SPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
type SPMCPtrSeq ¶
type SPMCPtrSeq struct {
// contains filtered or unexported fields
}
SPMCPtrSeq is a single-producer multi-consumer queue for unsafe.Pointer values.
Entry format: [lo=sequence | hi=pointer as uint64]
Memory: 16 bytes per slot
func NewSPMCPtrSeq ¶
func NewSPMCPtrSeq(capacity int) *SPMCPtrSeq
NewSPMCPtrSeq creates a new SPMC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.
type SPMCSeq ¶
type SPMCSeq[T any] struct {
// contains filtered or unexported fields
}
SPMCSeq is a CAS-based single-producer multi-consumer bounded queue.
The single producer writes sequentially. Consumers use CAS to claim slots.
This is the Compact variant using n slots (vs 2n for FAA-based default). Use NewSPMC for the default FAA-based implementation with better scalability.
Memory: n slots (16 bytes per slot)
func NewSPMCSeq ¶
NewSPMCSeq creates a new CAS-based SPMC queue. Capacity rounds up to the next power of 2. This is the Compact variant. Use NewSPMC for the default FAA-based implementation.
type SPSC ¶
type SPSC[T any] struct {
// contains filtered or unexported fields
}
SPSC is a single-producer single-consumer bounded queue.
Based on Lamport's ring buffer with cached index optimization. The producer caches the consumer's dequeue index, and vice versa, reducing cross-core cache line traffic.
Memory: O(capacity) with minimal per-slot overhead
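The cached-index idea can be sketched as follows: each side keeps a private copy of the other side's index and reloads the shared atomic only when the queue appears full (producer) or empty (consumer). This is a simplified illustration, not lfq's implementation. It omits cache-line padding, requires a power-of-two capacity, and uses hypothetical names (`ring`, `errWouldBlock`).

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errWouldBlock = errors.New("would block") // stand-in for lfq.ErrWouldBlock

type ring[T any] struct {
	buf  []T
	mask uint64

	head atomic.Uint64 // next index to read; written only by the consumer
	tail atomic.Uint64 // next index to write; written only by the producer

	cachedHead uint64 // producer's last-seen head
	cachedTail uint64 // consumer's last-seen tail
}

// newRing creates a ring; capacity must be a power of two.
func newRing[T any](capacity uint64) *ring[T] {
	return &ring[T]{buf: make([]T, capacity), mask: capacity - 1}
}

// Enqueue must be called from the single producer only.
func (r *ring[T]) Enqueue(elem *T) error {
	tail := r.tail.Load()
	if tail-r.cachedHead == uint64(len(r.buf)) {
		r.cachedHead = r.head.Load() // refresh only when the ring looks full
		if tail-r.cachedHead == uint64(len(r.buf)) {
			return errWouldBlock
		}
	}
	r.buf[tail&r.mask] = *elem
	r.tail.Store(tail + 1) // publish the element
	return nil
}

// Dequeue must be called from the single consumer only.
func (r *ring[T]) Dequeue() (T, error) {
	head := r.head.Load()
	if head == r.cachedTail {
		r.cachedTail = r.tail.Load() // refresh only when the ring looks empty
		if head == r.cachedTail {
			var zero T
			return zero, errWouldBlock
		}
	}
	v := r.buf[head&r.mask]
	r.head.Store(head + 1) // release the slot
	return v, nil
}

func main() {
	r := newRing[int](2)
	a, b := 10, 20
	fmt.Println(r.Enqueue(&a), r.Enqueue(&b))
	fmt.Println(r.Dequeue())
}
```

In the common case neither side reads the index owned by the other core, which is where the reduction in cross-core cache line traffic comes from.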
func BuildSPSC ¶
BuildSPSC creates an SPSC queue with compile-time type safety. Panics if builder is not configured with SingleProducer().SingleConsumer().
func NewSPSC ¶
NewSPSC creates a new SPSC queue. Capacity rounds up to the next power of 2.
Example ¶
ExampleNewSPSC demonstrates a basic SPSC queue for pipeline stages.
package main
import (
"fmt"
"code.hybscloud.com/lfq"
)
func main() {
// Create a single-producer single-consumer queue
q := lfq.NewSPSC[int](8)
// Producer sends 5 values
for i := 1; i <= 5; i++ {
v := i * 10
q.Enqueue(&v)
}
// Consumer receives values
for range 5 {
v, _ := q.Dequeue()
fmt.Println(v)
}
}
Output:
10
20
30
40
50
type SPSCIndirect ¶
type SPSCIndirect struct {
// contains filtered or unexported fields
}
SPSCIndirect is an SPSC queue for uintptr values.
func NewSPSCIndirect ¶
func NewSPSCIndirect(capacity int) *SPSCIndirect
NewSPSCIndirect creates a new SPSC queue for uintptr values. Capacity rounds up to the next power of 2.
func (*SPSCIndirect) Dequeue ¶
func (q *SPSCIndirect) Dequeue() (uintptr, error)
Dequeue removes and returns an element (consumer only).
func (*SPSCIndirect) Enqueue ¶
func (q *SPSCIndirect) Enqueue(elem uintptr) error
Enqueue adds an element (producer only).
type SPSCPtr ¶
type SPSCPtr struct {
// contains filtered or unexported fields
}
SPSCPtr is an SPSC queue for unsafe.Pointer values. Useful for zero-copy pointer passing between goroutines.
func NewSPSCPtr ¶
NewSPSCPtr creates a new SPSC queue for unsafe.Pointer values. Capacity rounds up to the next power of 2.