Documentation
¶
Index ¶
- Variables
- type BackpressureQueue
- func (q *BackpressureQueue[T]) Close()
- func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T]
- func (q *BackpressureQueue[T]) Enqueue(ctx context.Context, item T) error
- func (q *BackpressureQueue[T]) Len() int
- func (q *BackpressureQueue[T]) ReceiveChan() <-chan T
- func (q *BackpressureQueue[T]) TryEnqueue(item T) bool
- type CircularBuffer
- type ConcurrentQueue
- type DropCheckFunc
- type DropPredicate
- type GCQueue
- type PriorityQueue
- type PriorityQueueItem
- type REDOption
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidThresholdOrder = errors.New(
"queue: maxThreshold must be > minThreshold",
)
ErrInvalidThresholdOrder is returned by RandomEarlyDrop when maxThreshold is not strictly greater than minThreshold.
var ErrItemDropped = errors.New("item dropped by drop predicate")
ErrItemDropped is returned by Enqueue when the item is dropped by the DropPredicate. This can happen before the queue is actually full (e.g. with RED-style early drops).
var ErrNegativeMinThreshold = errors.New(
"queue: minThreshold must be >= 0",
)
ErrNegativeMinThreshold is returned by RandomEarlyDrop when minThreshold is negative.
var ErrQueueClosed = errors.New("queue closed")
ErrQueueClosed is returned by Enqueue/TryEnqueue when the queue has already been closed.
Functions ¶
This section is empty.
Types ¶
type BackpressureQueue ¶ added in v1.2.0
type BackpressureQueue[T any] struct { // contains filtered or unexported fields }
BackpressureQueue is a generic, fixed-capacity queue with predicate-based drop behavior. When full, it uses the DropPredicate to perform early drops (e.g., RED-style).
func NewBackpressureQueue ¶ added in v1.2.0
func NewBackpressureQueue[T any](capacity int, predicate DropPredicate[T]) *BackpressureQueue[T]
NewBackpressureQueue creates a new BackpressureQueue with the given capacity and drop predicate. Panics if capacity <= 0 or predicate is nil.
func (*BackpressureQueue[T]) Close ¶ added in v1.2.0
func (q *BackpressureQueue[T]) Close()
Close closes the internal channel. It is safe to call multiple times; only the first call has any effect. After Close, no more items can be enqueued. Remaining items can still be received via ReceiveChan.
func (*BackpressureQueue[T]) Dequeue ¶ added in v1.2.0
func (q *BackpressureQueue[T]) Dequeue(ctx context.Context) fn.Result[T]
Dequeue retrieves the next item from the queue, blocking until available or context done. Returns the item or an error if ctx is done before an item is available.
func (*BackpressureQueue[T]) Enqueue ¶ added in v1.2.0
func (q *BackpressureQueue[T]) Enqueue(ctx context.Context, item T) error
Enqueue attempts to add an item to the queue, respecting context cancellation. Returns ErrItemDropped if dropped, or context error if ctx is done before enqueue. Otherwise, `nil` is returned on success.
func (*BackpressureQueue[T]) Len ¶ added in v1.2.0
func (q *BackpressureQueue[T]) Len() int
Len returns the current number of items buffered in the queue.
func (*BackpressureQueue[T]) ReceiveChan ¶ added in v1.2.0
func (q *BackpressureQueue[T]) ReceiveChan() <-chan T
ReceiveChan returns the receive-only end of the internal channel, allowing callers to select on it alongside other channels (e.g., context.Done).
func (*BackpressureQueue[T]) TryEnqueue ¶ added in v1.2.0
func (q *BackpressureQueue[T]) TryEnqueue(item T) bool
TryEnqueue attempts to add an item to the queue without blocking. Returns true if successfully enqueued, false if the drop predicate rejected the item or the queue is at capacity.
type CircularBuffer ¶ added in v1.0.3
type CircularBuffer struct {
// contains filtered or unexported fields
}
CircularBuffer is a buffer which retains a set of values in memory, and overwrites the oldest item in the buffer when a new item needs to be written.
func NewCircularBuffer ¶ added in v1.0.3
func NewCircularBuffer(size int) (*CircularBuffer, error)
NewCircularBuffer returns a new circular buffer with the size provided. It will fail if a zero or negative size parameter is provided.
func (*CircularBuffer) Add ¶ added in v1.0.3
func (c *CircularBuffer) Add(item interface{})
Add adds an item to the buffer, overwriting the oldest item if the buffer is full.
func (*CircularBuffer) Latest ¶ added in v1.0.3
func (c *CircularBuffer) Latest() interface{}
Latest returns the item that was most recently added to the buffer.
func (*CircularBuffer) List ¶ added in v1.0.3
func (c *CircularBuffer) List() []interface{}
List returns a copy of the items in the buffer ordered from the oldest to newest item.
func (*CircularBuffer) Total ¶ added in v1.0.3
func (c *CircularBuffer) Total() int
Total returns the total number of items that have been added to the buffer.
type ConcurrentQueue ¶
type ConcurrentQueue struct {
// contains filtered or unexported fields
}
ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. Clients interact with the queue by pushing items into the in channel and popping items from the out channel. There is a goroutine that manages moving items from the in channel to the out channel in the correct order that must be started by calling Start().
func NewConcurrentQueue ¶
func NewConcurrentQueue(bufferSize int) *ConcurrentQueue
NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is the capacity of the output channel. When the size of the queue is below this threshold, pushes do not incur the overhead of the less efficient overflow structure.
func (*ConcurrentQueue) ChanIn ¶
func (cq *ConcurrentQueue) ChanIn() chan<- interface{}
ChanIn returns a channel that can be used to push new items into the queue.
func (*ConcurrentQueue) ChanOut ¶
func (cq *ConcurrentQueue) ChanOut() <-chan interface{}
ChanOut returns a channel that can be used to pop items from the queue.
func (*ConcurrentQueue) Start ¶
func (cq *ConcurrentQueue) Start()
Start begins a goroutine that manages moving items from the in channel to the out channel. The queue tries to move items directly to the out channel minimize overhead, but if the out channel is full it pushes items to an overflow queue. This must be called before using the queue.
func (*ConcurrentQueue) Stop ¶
func (cq *ConcurrentQueue) Stop()
Stop ends the goroutine that moves items from the in channel to the out channel. This does not clear the queue state, so the queue can be restarted without dropping items.
type DropCheckFunc ¶ added in v1.2.0
DropCheckFunc decides whether to drop an item based solely on the current queue depth. This is the natural return type for length-only strategies such as RandomEarlyDrop.
func RandomEarlyDrop ¶ added in v1.2.0
func RandomEarlyDrop(minThreshold, maxThreshold int, opts ...REDOption) (DropCheckFunc, error)
RandomEarlyDrop returns a DropCheckFunc that implements Random Early Detection (RED), inspired by TCP-RED queue management.
RED prevents sudden buffer overflows by proactively dropping packets before the queue is full. It establishes two thresholds:
- minThreshold: queue length below which no drops occur.
- maxThreshold: queue length at or above which all items are dropped.
Between these points, the drop probability p increases linearly:
p = (queueLen - minThreshold) / (maxThreshold - minThreshold)
For example, with minThreshold=15 and maxThreshold=35:
- At queueLen=15, p=0.0 (0% drop chance)
- At queueLen=25, p=0.5 (50% drop chance)
- At queueLen=35, p=1.0 (100% drop chance)
This smooth ramp helps avoid tail-drop spikes, smooths queue occupancy, and gives early back-pressure signals to senders.
type DropPredicate ¶ added in v1.2.0
DropPredicate decides whether to drop an item based on the current queue depth and the item itself. It returns true to drop, false to enqueue. Use this when the drop decision depends on the item itself; for length-only checks prefer DropCheckFunc.
func AsDropPredicate ¶ added in v1.2.0
func AsDropPredicate[T any](f DropCheckFunc) DropPredicate[T]
AsDropPredicate adapts a length-only DropCheckFunc into a DropPredicate[T], ignoring the item.
type GCQueue ¶
type GCQueue struct {
// contains filtered or unexported fields
}
GCQueue is garbage collecting queue, which dynamically grows and contracts based on load. If the queue has items which have been returned, the queue will check every gcInterval amount of time to see if any elements are eligible to be released back to the runtime. Elements that have been in the queue for a duration of least expiryInterval will be released upon the next iteration of the garbage collection, thus the maximum amount of time an element remain in the queue is expiryInterval+gcInterval. The gc ticker will be disabled after all items in the queue have been taken or released to ensure that the GCQueue becomes quiescent, and imposes minimal overhead in the steady state.
func NewGCQueue ¶
func NewGCQueue(newItem func() interface{}, returnQueueSize int,
gcInterval, expiryInterval time.Duration) *GCQueue
NewGCQueue creates a new garbage collecting queue, which dynamically grows and contracts based on load. If the queue has items which have been returned, the queue will check every gcInterval amount of time to see if any elements are eligible to be released back to the runtime. Elements that have been in the queue for a duration of least expiryInterval will be released upon the next iteration of the garbage collection, thus the maximum amount of time an element remain in the queue is expiryInterval+gcInterval. The gc ticker will be disabled after all items in the queue have been taken or released to ensure that the GCQueue becomes quiescent, and imposes minimal overhead in the steady state. The returnQueueSize parameter is used to size the maximal number of items that can be returned without being dropped during large bursts in attempts to return items to the GCQUeue.
type PriorityQueue ¶ added in v1.0.2
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue wraps a standard heap into a self contained class.
func (*PriorityQueue) Empty ¶ added in v1.0.2
func (pq *PriorityQueue) Empty() bool
Empty returns true if the queue is empty.
func (*PriorityQueue) Len ¶ added in v1.0.2
func (pq *PriorityQueue) Len() int
Len returns the length of the queue.
func (*PriorityQueue) Pop ¶ added in v1.0.2
func (pq *PriorityQueue) Pop() PriorityQueueItem
Pop removes the top most item from the queue.
func (*PriorityQueue) Push ¶ added in v1.0.2
func (pq *PriorityQueue) Push(item PriorityQueueItem)
Push adds an item to the priority queue.
func (*PriorityQueue) Top ¶ added in v1.0.2
func (pq *PriorityQueue) Top() PriorityQueueItem
Top returns the top most item from the queue without removing it.
type PriorityQueueItem ¶ added in v1.0.2
type PriorityQueueItem interface {
// Less must return true if this item is ordered before other and false
// otherwise.
Less(other PriorityQueueItem) bool
}
PriorityQueueItem is an interface that represents items in a PriorityQueue. Users of PriorityQueue will need to define a Less function such that PriorityQueue will be able to use that to build and restore an underlying heap.
type REDOption ¶ added in v1.2.0
type REDOption func(*redConfig)
REDOption is a functional option for configuring RandomEarlyDrop.
func WithRandSource ¶ added in v1.2.0
WithRandSource provides a custom random number source (a function that returns a float64 between 0.0 and 1.0).