concurrent

v0.0.2
Published: Oct 31, 2025 License: MIT Imports: 7 Imported by: 0

README

concurrent

Concurrent is a library of lock-free and wait-free algorithms.

Environment Requirements

  • amd64 or arm64 CPU architecture
  • Go 1.25 or later

Installation

To install the concurrent library, run the following command:

go get code.hybscloud.com/concurrent

Basic Usage

Queue (Convenience Constructor)

c, p := concurrent.NewQueue[string](256)
message := "Hello, Concurrent!"
err := concurrent.EnqueueWait(p, &message)
if err != nil {
	return err
}

result, err := concurrent.DequeueWait(c)
if err != nil {
	return err
}
println(*result)

Multi-Producer Multi-Consumer Queue

c, p := concurrent.NewMPMCQueue[int](256)
i := 100
err := p.Enqueue(&i)
if err != nil {
	return err
}

res, err := c.Dequeue()
if err != nil {
	return err
}
println(*res)

Multi-Producer Multi-Consumer Queue (Indirect)

c, p := concurrent.NewMPMCQueueIndirect(256)
index := uintptr(42)
err := p.Enqueue(index)
if err != nil {
	return err
}

value, err := c.Dequeue()
if err != nil {
	return err
}
println(value)

Spin Lock

lock := concurrent.SpinLock{} // the zero value is ready to use
lock.Lock()
...
lock.Unlock()

Spin Wait

sw := concurrent.SpinWait{} // the zero value is ready to use
sw.Once()

Next Step

Implement the sCQ lock-free FIFO queue

License

©2023 Hayabusa Cloud Co., Ltd.
#5F Eclat BLDG, 3-6-2 Shibuya, Shibuya City, Tokyo 150-0002, Japan
Released under the MIT license

Documentation

Constants

This section is empty.

Variables

var AndUint64 func(u64 *uint64, val uint64) = and64

AndUint64 atomically performs a bitwise AND of val with the value at u64 and stores the result in u64.

var CompareAndSwapUint128 func(u128 *uint64, old, new [2]uint64) bool = cas128

CompareAndSwapUint128 atomically compares the 128-bit value at u128 with old and, if they are equal, replaces it with new. It reports whether the swap happened.

var (
	// ErrTemporaryUnavailable is the error returned by Enqueue operations
	// on a full queue and by Dequeue operations on an empty queue
	ErrTemporaryUnavailable = errors.New("temporary unavailable")
)
var OrUint64 func(u64 *uint64, val uint64) = or64

OrUint64 atomically performs a bitwise OR of val with the value at u64 and stores the result in u64.
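The semantics of AndUint64 and OrUint64 can be pictured with a compare-and-swap retry loop over the standard sync/atomic package. This is only a sketch of the behaviour: the helper names andUint64 and orUint64 are hypothetical, and the package itself binds these variables to its own and64 and or64 routines rather than to a loop like this.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// andUint64 atomically replaces *addr with *addr & val.
// Hypothetical helper: it retries a CAS until no other writer interferes.
func andUint64(addr *uint64, val uint64) {
	for {
		old := atomic.LoadUint64(addr)
		if atomic.CompareAndSwapUint64(addr, old, old&val) {
			return
		}
	}
}

// orUint64 atomically replaces *addr with *addr | val (same retry scheme).
func orUint64(addr *uint64, val uint64) {
	for {
		old := atomic.LoadUint64(addr)
		if atomic.CompareAndSwapUint64(addr, old, old|val) {
			return
		}
	}
}

func main() {
	var flags uint64
	orUint64(&flags, 0b0101)    // set bits 0 and 2
	orUint64(&flags, 0b0010)    // set bit 1, flags is now 0b0111
	andUint64(&flags, 0b0110)   // clear bit 0
	fmt.Printf("%04b\n", flags) // prints "0110"
}
```

A typical use is maintaining a word of status flags shared by several goroutines without taking a lock.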

Functions

func DequeueWait

func DequeueWait[T any](c Consumer[T]) (elem *T, err error)

DequeueWait pops an item from a FIFO queue. The operation blocks until it succeeds or an error occurs.

func DoubleUint64

func DoubleUint64(first, second uint64) (u128 []uint64)

DoubleUint64 creates and returns a []uint64 of length 2 holding the given values. The address of u128 is 16-byte aligned.
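One way to obtain a 16-byte aligned pair of words in Go is to over-allocate by one element and pick the aligned offset. The sketch below assumes that approach; alignedPair and isAligned16 are hypothetical names, and the package may well allocate differently.

```go
package main

import (
	"fmt"
	"unsafe"
)

// alignedPair returns a two-element []uint64 whose first element sits on a
// 16-byte boundary. Hypothetical helper, not the package's implementation.
func alignedPair(first, second uint64) []uint64 {
	// An allocated []uint64 starts on an 8-byte boundary, so either
	// buf[0] or buf[1] is 16-byte aligned. Allocate three and choose.
	buf := make([]uint64, 3)
	off := 0
	if uintptr(unsafe.Pointer(&buf[0]))%16 != 0 {
		off = 1
	}
	pair := buf[off : off+2 : off+2] // cap is limited so append cannot reuse buf
	pair[0], pair[1] = first, second
	return pair
}

// isAligned16 reports whether the first element of p is 16-byte aligned.
func isAligned16(p []uint64) bool {
	return uintptr(unsafe.Pointer(&p[0]))%16 == 0
}

func main() {
	p := alignedPair(1, 2)
	fmt.Println(len(p), p[0], p[1], isAligned16(p))
}
```

The alignment matters because 128-bit compare-and-swap instructions on amd64 and arm64 require a 16-byte aligned operand.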

func DoubleUintPtr

func DoubleUintPtr(first, second uintptr) (dw []uintptr)

DoubleUintPtr creates and returns a []uintptr of length 2 holding the given values. The address of dw is 16-byte aligned.

func EnqueueWait

func EnqueueWait[T any](p Producer[T], elem *T) error

EnqueueWait pushes the given item to a FIFO queue. The operation blocks until it succeeds or an error occurs.
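The blocking behaviour of EnqueueWait and DequeueWait can be pictured as a retry loop around the non-blocking calls. The following is a minimal sketch under stated assumptions: errTemporaryUnavailable stands in for ErrTemporaryUnavailable, boundedQueue is a toy single-goroutine queue used only to drive the example, and the real functions may back off differently between attempts.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
)

// errTemporaryUnavailable stands in for concurrent.ErrTemporaryUnavailable.
var errTemporaryUnavailable = errors.New("temporary unavailable")

type producer[T any] interface{ Enqueue(elem *T) error }
type consumer[T any] interface{ Dequeue() (*T, error) }

// boundedQueue is a toy FIFO (not safe for concurrent use).
type boundedQueue[T any] struct {
	items []*T
	limit int
}

func (q *boundedQueue[T]) Enqueue(elem *T) error {
	if len(q.items) >= q.limit {
		return errTemporaryUnavailable // full
	}
	q.items = append(q.items, elem)
	return nil
}

func (q *boundedQueue[T]) Dequeue() (*T, error) {
	if len(q.items) == 0 {
		return nil, errTemporaryUnavailable // empty
	}
	elem := q.items[0]
	q.items = q.items[1:]
	return elem, nil
}

// enqueueWait retries while the queue reports it is temporarily full.
func enqueueWait[T any](p producer[T], elem *T) error {
	for {
		err := p.Enqueue(elem)
		if !errors.Is(err, errTemporaryUnavailable) {
			return err // nil on success, or a non-temporary error
		}
		runtime.Gosched() // let other goroutines run before retrying
	}
}

// dequeueWait retries while the queue reports it is temporarily empty.
func dequeueWait[T any](c consumer[T]) (*T, error) {
	for {
		elem, err := c.Dequeue()
		if !errors.Is(err, errTemporaryUnavailable) {
			return elem, err
		}
		runtime.Gosched()
	}
}

func main() {
	q := &boundedQueue[string]{limit: 1}
	msg := "hello"
	if err := enqueueWait[string](q, &msg); err != nil {
		panic(err)
	}
	got, err := dequeueWait[string](q)
	fmt.Println(*got, err)
}
```

Because the toy queue is single-goroutine, the example enqueues before it dequeues; with a real concurrent queue the waiting side would be released by another goroutine.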

func NewMPMCQueue

func NewMPMCQueue[T any](capacity int) (Consumer[T], Producer[T])

NewMPMCQueue creates a new multi-producer, multi-consumer FIFO queue with the given capacity.

func NewMPMCQueueIndirect

func NewMPMCQueueIndirect(capacity int) (ConsumerIndirect, ProducerIndirect)

NewMPMCQueueIndirect creates a new multi-producer, multi-consumer FIFO queue of uintptr values with the given capacity.

func NewMPSCQueue

func NewMPSCQueue[T any](capacity int) (Consumer[T], Producer[T])

NewMPSCQueue creates a new multi-producer, single-consumer FIFO queue with the given capacity.

func NewQueue

func NewQueue[T any](capacity int, opts ...func(opts *QueueOptions)) (Consumer[T], Producer[T])

NewQueue creates a new queue with the given capacity and options.

func NewSPMCQueue

func NewSPMCQueue[T any](capacity int) (Consumer[T], Producer[T])

NewSPMCQueue creates a new single-producer, multi-consumer FIFO queue with the given capacity.

func NewSPSCQueue

func NewSPSCQueue[T any](capacity int) (Consumer[T], Producer[T])

NewSPSCQueue creates a new single-producer, single-consumer FIFO queue with the given capacity.

func Pause

func Pause(cycles ...int)

Pause executes CPU pause instructions to reduce energy consumption in spin-wait loops.

Defaults to 20 cycles if not specified. Uses optimized assembly on amd64/arm64.

Usage:

Pause()     // 20 cycles (default)
Pause(1)    // 1 cycle
Pause(40)   // 40 cycles

func SetYieldDuration

func SetYieldDuration(d time.Duration)

SetYieldDuration sets the base duration unit for Yield(). Recommended: 50-250 microseconds for real-time systems, 1-4 ms for general workloads.

func Yield

func Yield(lv ...int)

Yield yields the processor by sleeping for a duration based on the backoff level lv (default: 1). Higher levels sleep longer with quadratic scaling: Yield(1), Yield(2)=4x, Yield(3)=9x, etc. For automatic adaptive backoff in tight loops, use SpinWait instead.
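The quadratic scaling described for Yield amounts to multiplying the base duration by the square of the level. A small sketch of that arithmetic follows; yieldDuration is a hypothetical helper, the 100 microsecond base is an assumed value, and clamping levels below 1 to 1 is an assumption rather than documented behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// yieldDuration returns base scaled by lv squared.
// Levels below 1 are treated as 1 (an assumption made for this sketch).
func yieldDuration(base time.Duration, lv int) time.Duration {
	if lv < 1 {
		lv = 1
	}
	return base * time.Duration(lv*lv)
}

func main() {
	base := 100 * time.Microsecond // assumed base unit
	for lv := 1; lv <= 3; lv++ {
		fmt.Println(lv, yieldDuration(base, lv))
	}
	// prints:
	// 1 100µs
	// 2 400µs
	// 3 900µs
}
```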

Types

type Closer

type Closer interface {
	// Close closes the queue.
	// Enqueue and Dequeue operations on a closed queue are undefined
	Close() error
}

Closer is the interface that wraps the Close method

type Consumer

type Consumer[T any] interface {
	// Dequeue pops an item from the FIFO queue.
	// If the queue is empty, ErrTemporaryUnavailable is returned
	Dequeue() (elem *T, err error)
}

Consumer is the interface that wraps the Dequeue method

type ConsumerIndirect

type ConsumerIndirect interface {
	Dequeue() (elem uintptr, err error)
}

ConsumerIndirect is a non-generic consumer interface that dequeues uintptr values. Use this for indirect references such as indices, handles, or other integer-like identifiers.

type MPMCQueue

type MPMCQueue[T any] struct {
	// contains filtered or unexported fields
}

MPMCQueue is a multi-producer, multi-consumer FIFO queue.

func (*MPMCQueue[T]) Dequeue

func (q *MPMCQueue[T]) Dequeue() (elem *T, err error)

Dequeue pops an item from the FIFO queue.

func (*MPMCQueue[T]) Enqueue

func (q *MPMCQueue[T]) Enqueue(elem *T) error

Enqueue pushes the given item to the FIFO queue.

type MPMCQueueIndirect

type MPMCQueueIndirect struct {
	// contains filtered or unexported fields
}

MPMCQueueIndirect is a multi-producer, multi-consumer FIFO queue of indirect references.

func (*MPMCQueueIndirect) Dequeue

func (q *MPMCQueueIndirect) Dequeue() (elem uintptr, err error)

func (*MPMCQueueIndirect) Enqueue

func (q *MPMCQueueIndirect) Enqueue(elem uintptr) error

type MPSCQueue

type MPSCQueue[T any] struct{}

MPSCQueue is a multi-producer, single-consumer FIFO queue.

type Producer

type Producer[T any] interface {
	// Enqueue pushes an item to the FIFO queue.
	// If the queue is full, ErrTemporaryUnavailable is returned
	Enqueue(elem *T) error
}

Producer is the interface that wraps the Enqueue method

type ProducerIndirect

type ProducerIndirect interface {
	Enqueue(elem uintptr) error
}

ProducerIndirect is a non-generic producer interface that enqueues uintptr values. Use this for indirect references such as indices, handles, or other integer-like identifiers.
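The indirect interfaces carry only integer handles, so the values themselves live elsewhere. The sketch below illustrates the idea with indices into a slice; a buffered channel of uintptr stands in for the indirect queue so the example stays self-contained, and roundTrip is a hypothetical helper.

```go
package main

import "fmt"

// roundTrip sends the index of every slab entry through a queue of uintptr
// handles and resolves each handle back to its value on the consumer side.
// The buffered channel is only a stand-in for an indirect queue.
func roundTrip(slab []string) []string {
	handles := make(chan uintptr, len(slab))

	// Producer side: enqueue indices, not the values themselves.
	for i := range slab {
		handles <- uintptr(i)
	}
	close(handles)

	// Consumer side: dequeue each handle and look the value up in the slab.
	out := make([]string, 0, len(slab))
	for h := range handles {
		out = append(out, slab[h])
	}
	return out
}

func main() {
	fmt.Println(roundTrip([]string{"alpha", "beta", "gamma"}))
}
```

Passing indices keeps the queue slots pointer-free, which can be useful when the payload lives in a preallocated pool.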

type QueueOptions

type QueueOptions struct {
	SingleProducer bool // TODO: implement
	SingleConsumer bool // TODO: implement
	LowContention  bool // TODO: implement
	DistinctValues bool
}

QueueOptions is a struct that contains options for creating a queue.
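NewQueue accepts its options as functions that mutate a QueueOptions value. The sketch below shows how such variadic option functions could be folded into a struct. It uses a local queueOptions mirror of the fields listed above; withDistinctValues and applyOptions are hypothetical names, not part of the package.

```go
package main

import "fmt"

// queueOptions mirrors the QueueOptions fields for this sketch.
type queueOptions struct {
	SingleProducer bool
	SingleConsumer bool
	LowContention  bool
	DistinctValues bool
}

// withDistinctValues is a hypothetical option function.
func withDistinctValues(o *queueOptions) { o.DistinctValues = true }

// applyOptions starts from the zero value and applies each option in order.
func applyOptions(opts ...func(*queueOptions)) queueOptions {
	var o queueOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", applyOptions())
	fmt.Printf("%+v\n", applyOptions(withDistinctValues))
}
```

With the package itself, an inline closure of type func(opts *QueueOptions) that sets the desired field matches the signature NewQueue expects.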

type SPMCQueue

type SPMCQueue[T any] struct{}

SPMCQueue is a single-producer, multi-consumer FIFO queue.

type SPSCQueue

type SPSCQueue[T any] struct{}

SPSCQueue is a single-producer, single-consumer FIFO queue.

type SpinLock

type SpinLock struct {
	// contains filtered or unexported fields
}

func (*SpinLock) Lock

func (sl *SpinLock) Lock()

func (*SpinLock) Unlock

func (sl *SpinLock) Unlock()
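SpinLock is not described further here, so the following is only a sketch of the general technique: a test-and-set lock built on sync/atomic whose zero value is unlocked. The type spinLock and the helper countWithLock are hypothetical, and the package's own lock may spin or back off differently.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// spinLock is a minimal test-and-set lock; the zero value is unlocked.
type spinLock struct{ held atomic.Bool }

// Lock spins until it flips held from false to true.
func (l *spinLock) Lock() {
	for !l.held.CompareAndSwap(false, true) {
		runtime.Gosched() // yield instead of burning the CPU
	}
}

// Unlock releases the lock.
func (l *spinLock) Unlock() { l.held.Store(false) }

// countWithLock increments a shared counter from several goroutines,
// using the lock to protect each increment.
func countWithLock(workers, iters int) int {
	var (
		lock  spinLock
		wg    sync.WaitGroup
		total int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				lock.Lock()
				total++
				lock.Unlock()
			}
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(countWithLock(4, 1000)) // prints 4000
}
```

Spin locks suit very short critical sections; for anything that may block or run long, sync.Mutex is usually the safer choice.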

type SpinWait

type SpinWait struct {
	// contains filtered or unexported fields
}

SpinWait is a lightweight synchronization type that you can use in low-level scenarios with lower cost. The zero value for SpinWait is ready to use.

func (*SpinWait) Once

func (s *SpinWait) Once()

Once performs a single spin

func (*SpinWait) Reset

func (s *SpinWait) Reset()

Reset resets the counter in SpinWait

func (*SpinWait) WillYield

func (s *SpinWait) WillYield() bool

WillYield reports whether the next call to Once will put the thread to sleep instead of performing a simple procyield.
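The relationship between Once, Reset, and WillYield can be sketched as a counter that switches from busy spinning to yielding after a threshold. Everything below is an assumption made for illustration: the spinWait type, the yieldThreshold value of 10, and the use of runtime.Gosched in place of a sleep are not taken from the package.

```go
package main

import (
	"fmt"
	"runtime"
)

// yieldThreshold is an assumed value; the package's threshold is not documented here.
const yieldThreshold = 10

// spinWait counts calls to Once and escalates from spinning to yielding.
type spinWait struct{ count int }

// WillYield reports whether the next Once will yield rather than spin.
func (s *spinWait) WillYield() bool { return s.count >= yieldThreshold }

// Once performs one wait step and advances the counter.
func (s *spinWait) Once() {
	if s.WillYield() {
		runtime.Gosched() // stand-in for putting the thread to sleep
	} else {
		spins := 1 << s.count // spin a little longer on each attempt
		for i := 0; i < spins; i++ {
			// busy loop standing in for CPU pause instructions
		}
	}
	s.count++
}

// Reset returns the counter to zero so the next wait starts by spinning.
func (s *spinWait) Reset() { s.count = 0 }

func main() {
	var s spinWait
	spins := 0
	for !s.WillYield() {
		s.Once()
		spins++
	}
	fmt.Println(spins) // prints 10
}
```

A caller would typically loop on its own condition, calling Once on each failed check and Reset after the condition is met.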
