Documentation
¶
Overview ¶
Package fair implements a Hierarchical Fair Queue (HFQ), providing balanced service across a hierarchy of queues.
Example ¶
package main
import (
"fmt"
"github.com/grafana/loki/v3/pkg/engine/internal/util/queue/fair"
)
func main() {
// task is a basic representation of a task to run.
type task struct{}
var q fair.Queue[task]
// Define scopes for us to add tasks to.
_ = q.RegisterScope(fair.Scope{"tenant-a", "user-a"})
_ = q.RegisterScope(fair.Scope{"tenant-a", "user-b"})
_ = q.RegisterScope(fair.Scope{"tenant-b", "user-c"})
_ = q.RegisterScope(fair.Scope{"tenant-c", "user-d"})
// Push some tasks into the queue.
_ = q.Push(fair.Scope{"tenant-a", "user-a"}, task{}) // task-1
_ = q.Push(fair.Scope{"tenant-a", "user-a"}, task{}) // task-2
_ = q.Push(fair.Scope{"tenant-a", "user-b"}, task{}) // task-3
_ = q.Push(fair.Scope{"tenant-b", "user-c"}, task{}) // task-4
_ = q.Push(fair.Scope{"tenant-b", "user-c"}, task{}) // task-5
_ = q.Push(fair.Scope{"tenant-c", "user-d"}, task{}) // task-6
// Fairness is achieved by lowering the priority along the entire path of a
// task that was just executed.
//
// That means, for example, a task for tenant-a/user-a lowers the priority
// of all tasks for both tenant-a and tenant-a/user-a. However, this does
// not directly touch the priority of tenant-a/user-b; so the next time a
// tenant-a task is selected, it will prioritize tenant-a/user-b over
// tenant-a/user-a.
for {
_, scope, _ := q.Pop()
if scope == nil {
break
}
fmt.Println(scope)
_ = q.AdjustScope(scope, 1) // Maintain fairness
}
}
Output: tenant-a/user-a tenant-b/user-c tenant-c/user-d tenant-a/user-b tenant-b/user-c tenant-a/user-a
Index ¶
- Variables
- type Position
- type Queue
- func (q *Queue[T]) AdjustScope(scope Scope, cost int64) error
- func (q *Queue[T]) Len() int
- func (q *Queue[T]) Peek() (T, Scope)
- func (q *Queue[T]) Pop() (T, Scope, Position)
- func (q *Queue[T]) Push(scope Scope, value T) error
- func (q *Queue[T]) RegisterScope(scope Scope) error
- func (q *Queue[T]) Requeue(scope Scope, value T, pos Position) error
- func (q *Queue[T]) UnregisterScope(scope Scope) error
- type Scope
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEmptyScope is returned when an empty scope name is provided. ErrEmptyScope = errors.New("missing scope name") // ErrScopeExists is returned by [Queue.RegisterScope] when a scope is already // registered. ErrScopeExists = errors.New("scope already exists") // ErrNotFound is returned when a scope or value is not found. ErrNotFound = errors.New("not found") )
Functions ¶
This section is empty.
Types ¶
type Position ¶
type Position struct {
// contains filtered or unexported fields
}
Position is an opaque token representing a value's original queue position. It is returned by Queue.Pop and can be passed to Queue.Requeue to restore the value at its original position.
type Queue ¶
type Queue[T any] struct { // contains filtered or unexported fields }
Queue is a hierarchical fair priority queue. It is composed of a tree of [Scope]s, where each scope holds other scopes or enqueued items.
Scopes must be defined with Queue.RegisterScope before they can be used. When a scope is no longer needed, call Queue.UnregisterScope to remove it.
Scopes have a "rank" which determines their priority for Queue.Pop, with a lower rank indicating higher priority. When two ranks are equal, priority is given to the scope or value added first.
Call Queue.AdjustScope on a scope to adjust its rank after consuming a value. The rank adjustment propagates up the tree to the root scope, giving priority to other scopes, ensuring fairness across the hierarchy.
The zero value for Queue is ready for use.
func (*Queue[T]) AdjustScope ¶
AdjustScope traverses scope, modifying the rank of each layer by the given cost.
cost can be positive (to decrease the priority of scopes) or negative (to increase the priority).
AdjustScope returns an error if the scope does not exist.
func (*Queue[T]) Peek ¶
Peek returns the next value with the highest priority along with its scope. The value is not removed from the queue.
See Queue.Pop for information on how the next highest priority is determined.
If q is empty, Peek returns the zero value for T and a nil Scope.
func (*Queue[T]) Pop ¶
Pop removes and returns the next value with the highest priority along with its scope and position.
Priority is determined by traversing the scope tree, selecting the scope with the lowest rank at each traversal, followed by the lowest rank of the children of the selected scope.
Popping a value does not adjust the rank of the scope tree. To maintain fairness, call Queue.AdjustScope with the returned scope to adjust its priority based on some cost of the returned value.
The returned Position can be passed to Queue.Requeue to restore the value at its original position if the assignment fails.
If q is empty, Pop returns the zero value for T, a nil Scope, and a zero Position.
func (*Queue[T]) Push ¶
Push enqueues the value at the specified scope.
The value is assigned the minimum rank among its siblings to prevent it from jumping the line. If no siblings exist, the rank is set to 0.
Push returns ErrNotFound if the scope does not exist.
func (*Queue[T]) RegisterScope ¶
RegisterScope registers a new scope, automatically registering intermediate scopes if they do not yet exist.
RegisterScope returns an error if the full scope is already registered.
func (*Queue[T]) Requeue ¶
Requeue re-inserts a value at its original position within the scope. The pos argument must have been obtained from a previous call to Queue.Pop.
This is used to re-insert a value that was popped but needs to be retried. The value is restored at its original priority, preserving ordering relative to other values in the same scope.
Requeue returns ErrNotFound if the scope does not exist.
func (*Queue[T]) UnregisterScope ¶
UnregisterScope unregisters the provided scope. If the scope was the only child of its parent, the parent is also unregistered (recursively up to the root scope).
UnregisterScope returns ErrNotFound if the scope does not exist.