queue

package
Version: v0.14.0
Published: Apr 24, 2026 License: Apache-2.0 Imports: 9 Imported by: 4

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddCountsRoute

func AddCountsRoute(lggr logr.Logger, mux *http.ServeMux, q CountReader)

Types

type Count added in v0.8.0

type Count struct {
	Concurrency  int   `json:"Concurrency"`
	RequestCount int64 `json:"RequestCount"`
}

Count is a snapshot of the HTTP pending request concurrency and the raw monotonic request counter, as reported by an interceptor pod.

type CountReader

type CountReader interface {
	// Current returns the current counts of pending requests.
	Current() (Counts, error)
}

CountReader represents the size of a virtual HTTP queue, possibly distributed across multiple HTTP server processes. It can only access the current size of the queue, not any other information about requests.

It is concurrency safe.

type Counter

type Counter interface {
	CountReader
	// Increase increases the queue size by delta for the given host.
	Increase(host string, delta int) error
	// Decrease decreases the queue size by delta for the given host.
	Decrease(host string, delta int) error
	// EnsureKey ensures that host is represented in this counter.
	EnsureKey(host string)
	// RemoveKey tries to remove the given host and its
	// associated counts from the queue. Returns true if it existed,
	// false otherwise.
	RemoveKey(host string) bool
}

Counter represents a virtual HTTP queue, possibly distributed across multiple HTTP server processes. It can only increase or decrease the size of the queue or read the current size of the queue, but not read or modify any other information about it.

Both the mutation and read functionality are concurrency safe, but the read functionality is point-in-time only.
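One common way to use a Counter is to mark a request as pending while its handler runs. The sketch below assumes that pattern. The names `hostCounter`, `mapCounter`, `countPending`, and `observe` are hypothetical and introduced only for illustration, and the mutex-guarded map merely stands in for a real Counter implementation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// hostCounter is the subset of the documented Counter interface used here.
type hostCounter interface {
	Increase(host string, delta int) error
	Decrease(host string, delta int) error
}

// mapCounter is a hypothetical mutex-guarded stand-in for a real Counter.
type mapCounter struct {
	mu      sync.Mutex
	pending map[string]int
}

func newMapCounter() *mapCounter { return &mapCounter{pending: map[string]int{}} }

func (m *mapCounter) Increase(host string, delta int) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending[host] += delta
	return nil
}

func (m *mapCounter) Decrease(host string, delta int) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.pending[host] -= delta
	return nil
}

func (m *mapCounter) get(host string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.pending[host]
}

// countPending marks a request as pending for its host while next runs.
func countPending(q hostCounter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if err := q.Increase(r.Host, 1); err != nil {
			http.Error(w, "queue unavailable", http.StatusInternalServerError)
			return
		}
		defer func() { _ = q.Decrease(r.Host, 1) }()
		next.ServeHTTP(w, r)
	})
}

// observe sends one request for host through the middleware and reports the
// pending count seen inside the handler and after the handler has returned.
func observe(host string) (during, after int) {
	q := newMapCounter()
	h := countPending(q, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		during = q.get(r.Host)
	}))
	req := httptest.NewRequest(http.MethodGet, "http://"+host+"/", nil)
	h.ServeHTTP(httptest.NewRecorder(), req)
	return during, q.get(host)
}

func main() {
	fmt.Println(observe("a.example.com"))
}
```

The deferred Decrease keeps the count balanced even if the wrapped handler panics or returns early.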

type Counts

type Counts map[string]Count

Counts is a snapshot of the HTTP pending request counts for each host.
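Because Count carries JSON tags, a Counts value can be serialized as a JSON object keyed by host. The following is a minimal sketch with local copies of the two types so that it compiles on its own; `roundTrip` and `totalConcurrency` are hypothetical helpers, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Count and Counts are local copies of the documented types.
type Count struct {
	Concurrency  int   `json:"Concurrency"`
	RequestCount int64 `json:"RequestCount"`
}

type Counts map[string]Count

// totalConcurrency sums the pending-request concurrency across all hosts.
func totalConcurrency(c Counts) int {
	total := 0
	for _, v := range c {
		total += v.Concurrency
	}
	return total
}

// roundTrip encodes c as JSON and decodes it back into a new Counts value.
func roundTrip(c Counts) (Counts, error) {
	raw, err := json.Marshal(c)
	if err != nil {
		return nil, err
	}
	var out Counts
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	snapshot := Counts{
		"a.example.com": {Concurrency: 3, RequestCount: 120},
		"b.example.com": {Concurrency: 0, RequestCount: 7},
	}
	out, err := roundTrip(snapshot)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(out), totalConcurrency(out))
}
```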

func GetCounts

func GetCounts(
	httpCl *http.Client,
	interceptorURL url.URL,
) (Counts, error)

GetCounts issues an RPC to get the queue counts from the given interceptorURL. Note that interceptorURL should not end with a "/" and should not include a path.
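To illustrate why the base URL must carry no path, here is a rough sketch of a client with the same shape as GetCounts. It is not the package's implementation: `fetchCounts` and `fetchFromTestServer` are hypothetical, and the `/queue` route and JSON body are assumptions, since the source does not state the real path or wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
)

// Count and Counts are local copies of the documented types.
type Count struct {
	Concurrency  int   `json:"Concurrency"`
	RequestCount int64 `json:"RequestCount"`
}

type Counts map[string]Count

// countsPath is an assumed route; the real path is not given in the source.
const countsPath = "/queue"

// fetchCounts is a hypothetical stand-in for GetCounts. It sets the path
// itself, which is why the caller's URL should not include one.
func fetchCounts(httpCl *http.Client, base url.URL) (Counts, error) {
	base.Path = countsPath // base is a copy, so the caller's URL is unchanged
	resp, err := httpCl.Get(base.String())
	if err != nil {
		return nil, fmt.Errorf("requesting counts: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	var counts Counts
	if err := json.NewDecoder(resp.Body).Decode(&counts); err != nil {
		return nil, fmt.Errorf("decoding counts: %w", err)
	}
	return counts, nil
}

// fetchFromTestServer serves snapshot from a local test server and reads it
// back through fetchCounts.
func fetchFromTestServer(snapshot Counts) (Counts, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path != countsPath {
			http.NotFound(w, r)
			return
		}
		_ = json.NewEncoder(w).Encode(snapshot)
	}))
	defer srv.Close()

	u, err := url.Parse(srv.URL) // scheme://host:port with no path
	if err != nil {
		return nil, err
	}
	return fetchCounts(srv.Client(), *u)
}

func main() {
	counts, err := fetchFromTestServer(Counts{"a.example.com": {Concurrency: 2, RequestCount: 40}})
	if err != nil {
		panic(err)
	}
	fmt.Println(counts["a.example.com"].Concurrency)
}
```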

type FakeCountReader

type FakeCountReader struct {
	// contains filtered or unexported fields
}

func (*FakeCountReader) Current

func (f *FakeCountReader) Current() (Counts, error)

type FakeCounter

type FakeCounter struct {
	RetMap        Counts
	ResizedCh     chan HostAndCount
	ResizeTimeout time.Duration
	// contains filtered or unexported fields
}

func NewFakeCounter

func NewFakeCounter() *FakeCounter

NewFakeCounter creates a FakeCounter with an unbuffered channel. Use this when tests need to synchronize on queue events.

func NewFakeCounterBuffered added in v0.12.2

func NewFakeCounterBuffered() *FakeCounter

NewFakeCounterBuffered creates a FakeCounter with a buffered channel. Use this when tests don't need to synchronize on queue events. The buffered channel prevents blocking on Increase/Decrease calls.
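The difference between the two constructors comes down to channel semantics. The sketch below is a stripped-down illustration rather than the real FakeCounter: `notifyingCounter`, `newNotifyingCounter`, `sendTimeout`, and `errNoReceiver` are hypothetical, and the timeout-on-send behaviour is an assumption suggested by the ResizeTimeout field.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// HostAndCount is a local copy of the documented type.
type HostAndCount struct {
	Host  string
	Count int
}

// sendTimeout bounds how long Increase waits for the event to be accepted.
const sendTimeout = 50 * time.Millisecond

var errNoReceiver = errors.New("timed out waiting for a receiver")

// notifyingCounter is a hypothetical fake that publishes every Increase.
type notifyingCounter struct {
	resizedCh chan HostAndCount
	timeout   time.Duration
}

// newNotifyingCounter uses an unbuffered channel when buffer is 0.
func newNotifyingCounter(buffer int) *notifyingCounter {
	return &notifyingCounter{
		resizedCh: make(chan HostAndCount, buffer),
		timeout:   sendTimeout,
	}
}

// Increase publishes the event and gives up after the timeout.
func (c *notifyingCounter) Increase(host string, delta int) error {
	select {
	case c.resizedCh <- HostAndCount{Host: host, Count: delta}:
		return nil
	case <-time.After(c.timeout):
		return errNoReceiver
	}
}

func main() {
	// Unbuffered: Increase completes only once the test receives the event,
	// which lets the test synchronize on it.
	unbuffered := newNotifyingCounter(0)
	done := make(chan error, 1)
	go func() { done <- unbuffered.Increase("a.example.com", 1) }()
	ev := <-unbuffered.resizedCh
	fmt.Println(ev.Host, ev.Count, <-done)

	// Buffered: Increase returns immediately while buffer space remains.
	buffered := newNotifyingCounter(8)
	err := buffered.Increase("a.example.com", 1)
	fmt.Println(err, len(buffered.resizedCh))
}
```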

func (*FakeCounter) Current

func (f *FakeCounter) Current() (Counts, error)

func (*FakeCounter) Decrease added in v0.8.0

func (f *FakeCounter) Decrease(host string, i int) error

func (*FakeCounter) EnsureKey added in v0.8.0

func (f *FakeCounter) EnsureKey(host string)

func (*FakeCounter) Increase added in v0.8.0

func (f *FakeCounter) Increase(host string, i int) error

func (*FakeCounter) RemoveKey added in v0.8.0

func (f *FakeCounter) RemoveKey(host string) bool

type HostAndCount

type HostAndCount struct {
	Host  string
	Count int
}

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory is a Counter implementation that holds the HTTP queue in memory only. Always use NewMemory to create one of these.

Hot-path guarantees:

  • Increase: one sync.Map load + two atomic ops (no global lock).
  • Decrease: one sync.Map load + one atomic CAS (no global lock).
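The hot-path description above can be sketched with a sync.Map of per-host atomic counters. This is an illustration under stated assumptions, not the package's code: `memoryQueue`, `hostCounts`, and the helper methods are hypothetical, returning an error for an unregistered host is an assumption, and the clamp is written as a compare-and-swap loop that usually succeeds on the first attempt but may retry under contention.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hostCounts holds the two per-host counters.
type hostCounts struct {
	concurrency  atomic.Int64 // pending requests, never below zero
	requestCount atomic.Int64 // monotonic total of requests seen
}

// memoryQueue is a hypothetical sketch of a lock-free in-memory counter.
type memoryQueue struct {
	hosts sync.Map // host string -> *hostCounts
}

func (q *memoryQueue) ensureKey(host string) {
	q.hosts.LoadOrStore(host, &hostCounts{})
}

func (q *memoryQueue) load(host string) (*hostCounts, error) {
	v, ok := q.hosts.Load(host)
	if !ok {
		// Assumption: unknown hosts are rejected rather than created.
		return nil, fmt.Errorf("host %q is not registered", host)
	}
	return v.(*hostCounts), nil
}

// increase performs one map load and two atomic adds.
func (q *memoryQueue) increase(host string, delta int) error {
	hc, err := q.load(host)
	if err != nil {
		return err
	}
	hc.concurrency.Add(int64(delta))
	hc.requestCount.Add(int64(delta))
	return nil
}

// decrease performs one map load and a CAS that clamps the result at zero.
func (q *memoryQueue) decrease(host string, delta int) error {
	hc, err := q.load(host)
	if err != nil {
		return err
	}
	for {
		cur := hc.concurrency.Load()
		next := cur - int64(delta)
		if next < 0 {
			next = 0
		}
		if hc.concurrency.CompareAndSwap(cur, next) {
			return nil
		}
	}
}

// snapshot returns the current concurrency and request count for host.
func (q *memoryQueue) snapshot(host string) (int64, int64) {
	hc, err := q.load(host)
	if err != nil {
		return 0, 0
	}
	return hc.concurrency.Load(), hc.requestCount.Load()
}

// increaseConcurrently calls increase(host, 1) from n goroutines and waits.
func increaseConcurrently(q *memoryQueue, host string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = q.increase(host, 1)
		}()
	}
	wg.Wait()
}

func main() {
	q := &memoryQueue{}
	q.ensureKey("a.example.com")
	increaseConcurrently(q, "a.example.com", 100)
	_ = q.decrease("a.example.com", 250) // clamps concurrency at zero
	fmt.Println(q.snapshot("a.example.com"))
}
```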

func NewMemory

func NewMemory() *Memory

NewMemory creates a new, empty in-memory queue.

func (*Memory) Current

func (r *Memory) Current() (Counts, error)

Current returns the current size of the queue.

func (*Memory) Decrease added in v0.8.0

func (r *Memory) Decrease(host string, delta int) error

Decrease atomically decrements the concurrency counter for host, clamped to zero.

func (*Memory) EnsureKey added in v0.8.0

func (r *Memory) EnsureKey(host string)

EnsureKey ensures that host is represented in this counter.

func (*Memory) Increase added in v0.8.0

func (r *Memory) Increase(host string, delta int) error

Increase atomically increments the concurrency counter and the monotonic request counter for host.

func (*Memory) RemoveKey added in v0.8.0

func (r *Memory) RemoveKey(host string) bool

RemoveKey tries to remove the given host and its associated counts from the queue. Returns true if it existed, false otherwise.

type RequestsBuckets added in v0.8.0

type RequestsBuckets struct {
	// contains filtered or unexported fields
}

RequestsBuckets keeps time-ordered buckets of recorded values in a ring buffer that covers a fixed window.

func NewRequestsBuckets added in v0.8.0

func NewRequestsBuckets(window, granularity time.Duration) *RequestsBuckets

NewRequestsBuckets generates a new RequestsBuckets with the given window and granularity.

func (*RequestsBuckets) Granularity added in v0.14.0

func (t *RequestsBuckets) Granularity() time.Duration

Granularity returns the per-bucket time granularity.

func (*RequestsBuckets) IsEmpty added in v0.8.0

func (t *RequestsBuckets) IsEmpty(now time.Time) bool

IsEmpty returns true if no data has been recorded for the `window` period.

func (*RequestsBuckets) Record added in v0.8.0

func (t *RequestsBuckets) Record(now time.Time, value int)

Record adds a value with an associated time to the correct bucket. If this record would introduce a gap in the data, any intervening times between the last write and this one will be recorded as zero. If an entire window length has expired without data, the firstWrite time is reset, meaning the WindowAverage will be of a partial window until enough data is received to fill it again.

func (*RequestsBuckets) Window added in v0.14.0

func (t *RequestsBuckets) Window() time.Duration

Window returns the total time window of this ring buffer.

func (*RequestsBuckets) WindowAverage added in v0.8.0

func (t *RequestsBuckets) WindowAverage(now time.Time) float64

WindowAverage returns the average bucket value over the window.

If the first write was less than the window length ago, an average is returned over the partial window. For example, if firstWrite was 6 seconds ago, the average will be over these 6 seconds worth of buckets, even if the window is 60s. If a window passes with no data being received, the first write time is reset so this behaviour takes effect again.

Similarly, if we have not received recent data, the average is based on a partial window. For example, if the window is 60 seconds but we last received data 10 seconds ago, the window average will be the average over the first 50 seconds.

In other cases, for example if there are gaps in the data shorter than the window length, the missing data is assumed to be 0 and the average is over the whole window length inclusive of the missing data.
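The three averaging cases described above can be made concrete with a simplified ring buffer. This is only a sketch of the described behaviour, not the package's implementation: `buckets` and its methods are hypothetical, timestamps are assumed to be non-decreasing, and the exact bucket boundaries of the real type may differ.

```go
package main

import (
	"fmt"
	"time"
)

// buckets is a hypothetical, simplified time-bucketed ring buffer.
type buckets struct {
	values      []float64
	granularity time.Duration
	first, last int64 // bucket numbers of the first and the latest write
	written     bool
}

func newBuckets(window, granularity time.Duration) *buckets {
	return &buckets{
		values:      make([]float64, int(window/granularity)),
		granularity: granularity,
	}
}

func (b *buckets) bucketOf(t time.Time) int64 {
	return t.UnixNano() / int64(b.granularity)
}

// isEmpty reports whether a whole window has passed without any write.
func (b *buckets) isEmpty(now time.Time) bool {
	return !b.written || b.bucketOf(now)-b.last >= int64(len(b.values))
}

// record adds value to the bucket for now, zeroing any skipped buckets and
// resetting the first-write marker after a full window without data.
func (b *buckets) record(now time.Time, value int) {
	cur, n := b.bucketOf(now), int64(len(b.values))
	if b.isEmpty(now) {
		for i := range b.values {
			b.values[i] = 0
		}
		b.first, b.last, b.written = cur, cur, true
	}
	for i := b.last + 1; i <= cur; i++ {
		b.values[i%n] = 0 // gap, or reuse of an expired slot
	}
	if cur > b.last {
		b.last = cur
	}
	b.values[cur%n] += float64(value)
}

// windowAverage averages from the later of window start and first write up
// to the last write, so trailing buckets without data are left out.
func (b *buckets) windowAverage(now time.Time) float64 {
	if b.isEmpty(now) {
		return 0
	}
	cur, n := b.bucketOf(now), int64(len(b.values))
	start := cur - n + 1
	if b.first > start {
		start = b.first
	}
	sum := 0.0
	for i := start; i <= b.last; i++ {
		sum += b.values[i%n]
	}
	return sum / float64(b.last-start+1)
}

// base is an arbitrary fixed instant so the example is deterministic.
var base = time.Unix(1_700_000_000, 0)

// at returns the instant sec seconds after base.
func at(sec int) time.Time { return base.Add(time.Duration(sec) * time.Second) }

// fiveSecondWindow builds a 5s window with 1s buckets.
func fiveSecondWindow() *buckets { return newBuckets(5*time.Second, time.Second) }

func main() {
	b := fiveSecondWindow()
	b.record(at(0), 3)
	b.record(at(2), 9)                  // second 1 is a gap and counts as zero
	fmt.Println(b.windowAverage(at(2))) // partial window since the first write
	fmt.Println(b.windowAverage(at(5))) // no recent data: seconds 1 and 2 only
	fmt.Println(b.isEmpty(at(7)))       // a full window has passed without data
}
```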
