workqueue

Version: v0.0.7
Published: Jul 23, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package workqueue provides a simple queue that can be used to rate limit or retry the processing of requests. This package is modeled on https://github.com/kubernetes/client-go/blob/master/util/workqueue; more RateLimiter implementations can be found in the original package. rate_limiters.go provides the rate limiter implementations for workqueue.

Types

type BucketRateLimiter

type BucketRateLimiter[T comparable] struct {
	*rate.Limiter
}

BucketRateLimiter adapts a standard token bucket (the embedded *rate.Limiter) to the RateLimiter API.

func (*BucketRateLimiter[T]) Forget

func (r *BucketRateLimiter[T]) Forget(_ context.Context, _ T)

Forget is a no-op for the bucket rate limiter.

func (*BucketRateLimiter[T]) Retries

func (r *BucketRateLimiter[T]) Retries(_ context.Context, _ T) int

Retries returns 0 as the number of retries for the bucket rate limiter.

func (*BucketRateLimiter[T]) Take

func (r *BucketRateLimiter[T]) Take(_ context.Context, _ T) bool

Take gets an item and decides whether it should run now or not. Use this method if you intend to drop or skip events that exceed the rate.

func (*BucketRateLimiter[T]) When

func (r *BucketRateLimiter[T]) When(_ context.Context, _ T) time.Duration

When returns the delay for a reservation for a token from the bucket.

type ItemExponentialFailureRateLimiter

type ItemExponentialFailureRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}

ItemExponentialFailureRateLimiter applies a simple baseDelay*2^<num-failures> limit. Dealing with max failures and expiration is up to the caller.

func (*ItemExponentialFailureRateLimiter[T]) Forget

func (r *ItemExponentialFailureRateLimiter[T]) Forget(_ context.Context, item T)

Forget removes an item from the failure map.

func (*ItemExponentialFailureRateLimiter[T]) Retries

func (r *ItemExponentialFailureRateLimiter[T]) Retries(_ context.Context, item T) int

Retries returns the number of times an item has been requeued.

func (*ItemExponentialFailureRateLimiter[T]) Take

func (r *ItemExponentialFailureRateLimiter[T]) Take(_ context.Context, item T) bool

Take gets an item and decides whether it should run now or not. Use this method if you intend to drop or skip events that exceed the rate.

func (*ItemExponentialFailureRateLimiter[T]) When

func (r *ItemExponentialFailureRateLimiter[T]) When(_ context.Context, item T) time.Duration

When calculates the delay for an item based on the exponential backoff algorithm.
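
The baseDelay*2^<num-failures> rule can be sketched in isolation as follows. This is a minimal illustration, not the package's implementation: backoffDelay is a hypothetical helper, and capping the result at maxDelay is an assumption based on the constructor's maxDelay parameter.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay is a hypothetical helper, not part of the package API.
// It returns baseDelay * 2^failures, capped at maxDelay (assumed behavior).
func backoffDelay(baseDelay, maxDelay time.Duration, failures int) time.Duration {
	// Compute in float64 so a large exponent cannot overflow int64.
	backoff := float64(baseDelay.Nanoseconds()) * math.Pow(2, float64(failures))
	if backoff > float64(maxDelay.Nanoseconds()) {
		return maxDelay
	}
	return time.Duration(backoff)
}

func main() {
	base, limit := 5*time.Millisecond, 1*time.Second
	// Print the delay that each successive failure would produce.
	for failures := 0; failures < 10; failures++ {
		fmt.Println(failures, backoffDelay(base, limit, failures))
	}
}
```

Doing the multiplication in floating point keeps the sketch safe for very large failure counts, where an integer shift would overflow before the cap is applied.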

type ItemFastSlowRateLimiter

type ItemFastSlowRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}

ItemFastSlowRateLimiter retries quickly for a certain number of attempts, then retries slowly after that.

func (*ItemFastSlowRateLimiter[T]) Forget

func (r *ItemFastSlowRateLimiter[T]) Forget(_ context.Context, item T)

Forget removes an item from the failure map.

func (*ItemFastSlowRateLimiter[T]) Retries

func (r *ItemFastSlowRateLimiter[T]) Retries(_ context.Context, item T) int

Retries returns the number of times an item has been requeued.

func (*ItemFastSlowRateLimiter[T]) Take

func (r *ItemFastSlowRateLimiter[T]) Take(_ context.Context, item T) bool

Take gets an item and decides whether it should run now or not. Use this method if you intend to drop or skip events that exceed the rate.

func (*ItemFastSlowRateLimiter[T]) When

func (r *ItemFastSlowRateLimiter[T]) When(_ context.Context, item T) time.Duration

When calculates the delay for an item based on whether it has exceeded the maximum number of fast attempts.
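
The fast-then-slow decision can be sketched as below. The fastSlow type and its when method are hypothetical stand-ins, the sketch is not safe for concurrent use, and treating the first maxFastAttempts calls (inclusive) as fast is an assumption about the boundary.

```go
package main

import (
	"fmt"
	"time"
)

// fastSlow is a hypothetical, non-thread-safe stand-in for the package's limiter.
type fastSlow[T comparable] struct {
	failures        map[T]int
	fastDelay       time.Duration
	slowDelay       time.Duration
	maxFastAttempts int
}

// when records one more attempt for item and returns the delay for it.
// The first maxFastAttempts attempts get fastDelay; later ones get slowDelay.
func (r *fastSlow[T]) when(item T) time.Duration {
	r.failures[item]++
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}

func main() {
	r := &fastSlow[string]{
		failures:        map[string]int{},
		fastDelay:       10 * time.Millisecond,
		slowDelay:       time.Second,
		maxFastAttempts: 2,
	}
	// Two fast retries, then slow retries.
	for i := 0; i < 4; i++ {
		fmt.Println(r.when("job-1"))
	}
}
```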

type ItemRedisTokenRateLimiter

type ItemRedisTokenRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}

ItemRedisTokenRateLimiter is a rate limiter that uses a token bucket stored in Redis to rate limit items.

func NewItemRedisTokenRateLimiter

func NewItemRedisTokenRateLimiter[T comparable](rdb redis.UniversalClient, key string, r, b int, maxDelay time.Duration) *ItemRedisTokenRateLimiter[T]

NewItemRedisTokenRateLimiter creates a new ItemRedisTokenRateLimiter.

func (*ItemRedisTokenRateLimiter[T]) Forget

func (r *ItemRedisTokenRateLimiter[T]) Forget(ctx context.Context, _ T)

Forget removes an item from the token bucket in Redis.

func (*ItemRedisTokenRateLimiter[T]) Retries

func (r *ItemRedisTokenRateLimiter[T]) Retries(_ context.Context, _ T) int

Retries returns 0 as the number of retries for the token bucket in Redis.

func (*ItemRedisTokenRateLimiter[T]) Take

func (r *ItemRedisTokenRateLimiter[T]) Take(ctx context.Context, item T) bool

Take gets an item and decides whether it should run now or not. Use this method if you intend to drop or skip events that exceed the rate.

func (*ItemRedisTokenRateLimiter[T]) When

func (r *ItemRedisTokenRateLimiter[T]) When(ctx context.Context, item T) time.Duration

When calculates the delay for an item based on the token bucket in Redis.

type MaxOfRateLimiter

type MaxOfRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}

MaxOfRateLimiter calls every RateLimiter and returns the worst-case response. When used with a token bucket limiter, the capacity could apparently be exceeded in cases where particular items were separately delayed for a longer time.

func (*MaxOfRateLimiter[T]) Forget

func (r *MaxOfRateLimiter[T]) Forget(ctx context.Context, item T)

Forget calls the Forget method on all the limiters for an item.

func (*MaxOfRateLimiter[T]) Retries

func (r *MaxOfRateLimiter[T]) Retries(ctx context.Context, item T) int

Retries returns the maximum number of retries among all the limiters for an item.

func (*MaxOfRateLimiter[T]) Take

func (r *MaxOfRateLimiter[T]) Take(ctx context.Context, item T) bool

Take gets an item and decides whether it should run now or not. Use this method if you intend to drop or skip events that exceed the rate.

func (*MaxOfRateLimiter[T]) When

func (r *MaxOfRateLimiter[T]) When(ctx context.Context, item T) time.Duration

When calculates the maximum delay among all the limiters for an item.
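
The "maximum delay among all the limiters" rule can be sketched with plain functions standing in for each limiter's When method. The names fixed and maxDelay are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// fixed returns a hypothetical When-style function that always answers d.
func fixed[T comparable](d time.Duration) func(T) time.Duration {
	return func(T) time.Duration { return d }
}

// maxDelay asks every limiter for a delay and returns the longest one.
// With no limiters it returns 0.
func maxDelay[T comparable](item T, limiters ...func(T) time.Duration) time.Duration {
	var longest time.Duration
	for _, when := range limiters {
		if d := when(item); d > longest {
			longest = d
		}
	}
	return longest
}

func main() {
	perItem := fixed[string](40 * time.Millisecond)
	overall := fixed[string](100 * time.Millisecond)
	fmt.Println(maxDelay("job-1", perItem, overall)) // prints 100ms
}
```

Note that every limiter is consulted on each call, so limiters that count attempts all advance their state even when their answer is not the one returned.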

type RateLimiter

type RateLimiter[T comparable] interface {
	// Take gets an item and gets to decide whether it should run now or not,
	// use this method if you intend to drop / skip events that exceed the rate.
	Take(ctx context.Context, item T) bool
	// When gets an item and gets to decide how long that item should wait,
	// use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
	When(ctx context.Context, item T) time.Duration
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(ctx context.Context, item T)
	// Retries returns back how many failures the item has had
	Retries(ctx context.Context, item T) int
}

RateLimiter is an interface that knows how to limit the rate at which something is processed. It provides methods to decide whether an item should run now, to decide how long an item should wait, to stop tracking an item, and to get the number of failures an item has had.
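
The difference between dropping with Take and waiting with When can be sketched as follows. The interface is copied locally so the example compiles on its own; countingLimiter and process are hypothetical and are not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// RateLimiter mirrors the interface documented above so the sketch is self-contained.
type RateLimiter[T comparable] interface {
	Take(ctx context.Context, item T) bool
	When(ctx context.Context, item T) time.Duration
	Forget(ctx context.Context, item T)
	Retries(ctx context.Context, item T) int
}

// countingLimiter is a hypothetical limiter: each item gets `free` immediate
// requests, and every later request is refused (Take) or delayed (When).
type countingLimiter[T comparable] struct {
	seen  map[T]int
	free  int
	delay time.Duration
}

func (c *countingLimiter[T]) Take(_ context.Context, item T) bool {
	c.seen[item]++
	return c.seen[item] <= c.free
}

func (c *countingLimiter[T]) When(_ context.Context, item T) time.Duration {
	c.seen[item]++
	if c.seen[item] <= c.free {
		return 0
	}
	return c.delay
}

func (c *countingLimiter[T]) Forget(_ context.Context, item T) { delete(c.seen, item) }

func (c *countingLimiter[T]) Retries(_ context.Context, item T) int { return c.seen[item] }

// process drops the item when Take refuses it and reports whether it ran.
// A caller that prefers to wait would sleep for rl.When(ctx, item) instead.
func process[T comparable](ctx context.Context, rl RateLimiter[T], item T) bool {
	if !rl.Take(ctx, item) {
		return false // dropped: over the rate
	}
	// ... handle the item here ...
	return true
}

func main() {
	ctx := context.Background()
	var rl RateLimiter[string] = &countingLimiter[string]{
		seen: map[string]int{}, free: 1, delay: 50 * time.Millisecond,
	}
	fmt.Println(process(ctx, rl, "job-1")) // first request runs
	fmt.Println(process(ctx, rl, "job-1")) // second request is dropped
	fmt.Println(rl.When(ctx, "job-1"))     // a waiting caller would be delayed
	rl.Forget(ctx, "job-1")                // stop tracking the item
	fmt.Println(rl.Retries(ctx, "job-1"))
}
```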

func NewBucketRateLimiter

func NewBucketRateLimiter[T comparable](l *rate.Limiter) RateLimiter[T]

NewBucketRateLimiter creates a new BucketRateLimiter that wraps the given *rate.Limiter.

func NewItemExponentialFailureRateLimiter

func NewItemExponentialFailureRateLimiter[T comparable](baseDelay, maxDelay time.Duration) RateLimiter[T]

NewItemExponentialFailureRateLimiter creates a new ItemExponentialFailureRateLimiter with the specified base and max delays.

func NewItemFastSlowRateLimiter

func NewItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter[T]

NewItemFastSlowRateLimiter creates a new ItemFastSlowRateLimiter with the specified fast and slow delays and the maximum number of fast attempts.

func NewMaxOfRateLimiter

func NewMaxOfRateLimiter[T comparable](limiters ...RateLimiter[T]) RateLimiter[T]

NewMaxOfRateLimiter creates a new MaxOfRateLimiter with the specified limiters.
