package fair

Version: v3.7.0
Published: Mar 26, 2026 | License: AGPL-3.0 | Imports: 4 | Imported by: 0

Documentation

Overview

Package fair implements a Hierarchical Fair Queue (HFQ), providing balanced service across a hierarchy of queues.

Example
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/engine/internal/util/queue/fair"
)

func main() {
	// task is a basic representation of a task to run.
	type task struct{}

	var q fair.Queue[task]

	// Define scopes for us to add tasks to.
	_ = q.RegisterScope(fair.Scope{"tenant-a", "user-a"})
	_ = q.RegisterScope(fair.Scope{"tenant-a", "user-b"})
	_ = q.RegisterScope(fair.Scope{"tenant-b", "user-c"})
	_ = q.RegisterScope(fair.Scope{"tenant-c", "user-d"})

	// Push some tasks into the queue.
	_ = q.Push(fair.Scope{"tenant-a", "user-a"}, task{}) // task-1
	_ = q.Push(fair.Scope{"tenant-a", "user-a"}, task{}) // task-2
	_ = q.Push(fair.Scope{"tenant-a", "user-b"}, task{}) // task-3
	_ = q.Push(fair.Scope{"tenant-b", "user-c"}, task{}) // task-4
	_ = q.Push(fair.Scope{"tenant-b", "user-c"}, task{}) // task-5
	_ = q.Push(fair.Scope{"tenant-c", "user-d"}, task{}) // task-6

	// Fairness is achieved by lowering the priority along the entire path of a
	// task that was just executed.
	//
	// That means, for example, a task for tenant-a/user-a lowers the priority
	// of all tasks for both tenant-a and tenant-a/user-a. However, this does
	// not directly touch the priority of tenant-a/user-b; so the next time a
	// tenant-a task is selected, it will prioritize tenant-a/user-b over
	// tenant-a/user-a.
	for {
		_, scope, _ := q.Pop()
		if scope == nil {
			break
		}
		fmt.Println(scope)
		_ = q.AdjustScope(scope, 1) // Maintain fairness
	}
}
Output:
tenant-a/user-a
tenant-b/user-c
tenant-c/user-d
tenant-a/user-b
tenant-b/user-c
tenant-a/user-a

Constants

This section is empty.

Variables

var (
	// ErrEmptyScope is returned when an empty scope name is provided.
	ErrEmptyScope = errors.New("missing scope name")

	// ErrScopeExists is returned by [Queue.RegisterScope] when a scope is already
	// registered.
	ErrScopeExists = errors.New("scope already exists")

	// ErrNotFound is returned when a scope or value is not found.
	ErrNotFound = errors.New("not found")
)

Functions

This section is empty.

Types

type Position

type Position struct {
	// contains filtered or unexported fields
}

Position is an opaque token representing a value's original queue position. It is returned by Queue.Pop and can be passed to Queue.Requeue to restore the value at its original position.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue is a hierarchical fair priority queue. It is composed of a tree of Scopes, where each scope holds other scopes or enqueued items.

Scopes must be defined with Queue.RegisterScope before they can be used. When a scope is no longer needed, call Queue.UnregisterScope to remove it.

Scopes have a "rank" which determines their priority for Queue.Pop, with a lower rank indicating higher priority. When two ranks are equal, priority is given to the scope or value added first.

Call Queue.AdjustScope on a scope to adjust its rank after consuming a value. The rank adjustment propagates up the tree to the root scope, giving priority to other scopes, ensuring fairness across the hierarchy.

The zero value for Queue is ready for use.

func (*Queue[T]) AdjustScope

func (q *Queue[T]) AdjustScope(scope Scope, cost int64) error

AdjustScope walks the path given by scope and adds cost to the rank of every scope along that path.

cost can be positive (to decrease the priority of scopes) or negative (to increase the priority).

AdjustScope returns an error if the scope does not exist.

func (*Queue[T]) Len

func (q *Queue[T]) Len() int

Len returns the number of items in the queue.

func (*Queue[T]) Peek

func (q *Queue[T]) Peek() (T, Scope)

Peek returns the highest-priority value along with its scope, without removing the value from the queue.

See Queue.Pop for information on how priority is determined.

If q is empty, Peek returns the zero value for T and a nil Scope.

func (*Queue[T]) Pop

func (q *Queue[T]) Pop() (T, Scope, Position)

Pop removes and returns the highest-priority value, along with its scope and position.

Priority is determined by traversing the scope tree from the root: at each level, the scope with the lowest rank is selected, and the same rule is then applied to the children of the selected scope.

Popping a value does not adjust the rank of the scope tree. To maintain fairness, call Queue.AdjustScope with the returned scope to adjust its priority based on some cost of the returned value.

The returned Position can be passed to Queue.Requeue to restore the value at its original position if the value cannot be handled and needs to be retried.

If q is empty, Pop returns the zero value for T, a nil Scope, and a zero Position.

func (*Queue[T]) Push

func (q *Queue[T]) Push(scope Scope, value T) error

Push enqueues the value at the specified scope.

The value is assigned the minimum rank among its siblings to prevent it from jumping the line. If no siblings exist, the rank is set to 0.

Push returns ErrNotFound if the scope does not exist.
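
The rank rule for newly pushed values can be written out as a small function. This is a sketch of the documented rule only; initialRank is a hypothetical helper and the package's internal bookkeeping may look different.

```go
package main

import "fmt"

// initialRank returns the rank a newly pushed value would receive under the
// documented rule: the minimum rank among its siblings, or 0 if it has none.
func initialRank(siblingRanks []int64) int64 {
	if len(siblingRanks) == 0 {
		return 0
	}
	lowest := siblingRanks[0]
	for _, r := range siblingRanks[1:] {
		if r < lowest {
			lowest = r
		}
	}
	return lowest
}

func main() {
	fmt.Println(initialRank(nil))               // no siblings
	fmt.Println(initialRank([]int64{4, 2, 7})) // ties with the best sibling
}
```

Starting at the lowest sibling rank instead of 0 means the new value ties with the current front-runner rather than beating it, and ties go to whichever was added first.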

func (*Queue[T]) RegisterScope

func (q *Queue[T]) RegisterScope(scope Scope) error

RegisterScope registers a new scope, automatically registering intermediate scopes if they do not yet exist.

RegisterScope returns an error if the full scope is already registered.

func (*Queue[T]) Requeue

func (q *Queue[T]) Requeue(scope Scope, value T, pos Position) error

Requeue re-inserts a value at its original position within the scope. The pos argument must have been obtained from a previous call to Queue.Pop.

This is used to re-insert a value that was popped but needs to be retried. The value is restored at its original priority, preserving ordering relative to other values in the same scope.

Requeue returns ErrNotFound if the scope does not exist.

func (*Queue[T]) UnregisterScope

func (q *Queue[T]) UnregisterScope(scope Scope) error

UnregisterScope unregisters the provided scope. If the scope was the only child of its parent, the parent is also unregistered (recursively up to the root scope).

UnregisterScope returns ErrNotFound if the scope does not exist.

type Scope

type Scope []string

A Scope denotes a named area within a Queue where items may be queued.

func (Scope) Name

func (s Scope) Name() string

Name returns the local name of the scope (last element).

func (Scope) String

func (s Scope) String() string

String returns the full name of the scope, with each element separated by a slash.
