 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Overview ¶
Package workqueue provides a simple queue that supports the following features:
- Fair: items processed in the order in which they are added.
- Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
- Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
- Shutdown notifications.
Index ¶
- func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc)
- func SetProvider(metricsProvider MetricsProvider)
- type BucketRateLimiter
- type CounterMetric
- type DelayingInterface
- type DoWorkPieceFunc
- type GaugeMetric
- type Interface
- type ItemExponentialFailureRateLimiter
- type ItemFastSlowRateLimiter
- type MaxOfRateLimiter
- type MetricsProvider
- type RateLimiter
- func DefaultControllerRateLimiter() RateLimiter
- func DefaultItemBasedRateLimiter() RateLimiter
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
- func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
 
- type RateLimitingInterface
- type SummaryMetric
- type TimedWorkQueue
- type TimedWorkQueueItem
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Parallelize ¶ added in v1.3.0
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc)
Parallelize is a very simple framework that allow for parallelizing N independent pieces of work.
func SetProvider ¶ added in v1.5.0
func SetProvider(metricsProvider MetricsProvider)
SetProvider sets the metrics provider of the metricsFactory.
Types ¶
type BucketRateLimiter ¶ added in v1.3.0
BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
func (*BucketRateLimiter) Forget ¶ added in v1.3.0
func (r *BucketRateLimiter) Forget(item interface{})
func (*BucketRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *BucketRateLimiter) NumRequeues(item interface{}) int
func (*BucketRateLimiter) When ¶ added in v1.3.0
func (r *BucketRateLimiter) When(item interface{}) time.Duration
type CounterMetric ¶ added in v1.5.0
type CounterMetric interface {
	Inc()
}
    CounterMetric represents a single numerical value that only ever goes up.
type DelayingInterface ¶ added in v1.3.0
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}
    DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
func NewDelayingQueue ¶ added in v1.3.0
func NewDelayingQueue() DelayingInterface
NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewNamedDelayingQueue ¶ added in v1.4.1
func NewNamedDelayingQueue(name string) DelayingInterface
type DoWorkPieceFunc ¶ added in v1.3.0
type DoWorkPieceFunc func(piece int)
type GaugeMetric ¶ added in v1.5.0
type GaugeMetric interface {
	Inc()
	Dec()
}
    GaugeMetric represents a single numerical value that can arbitrarily go up and down.
type ItemExponentialFailureRateLimiter ¶ added in v1.3.0
type ItemExponentialFailureRateLimiter struct {
	// contains filtered or unexported fields
}
    ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit dealing with max failures and expiration are up to the caller
func (*ItemExponentialFailureRateLimiter) Forget ¶ added in v1.3.0
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})
func (*ItemExponentialFailureRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int
func (*ItemExponentialFailureRateLimiter) When ¶ added in v1.3.0
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
type ItemFastSlowRateLimiter ¶ added in v1.3.0
type ItemFastSlowRateLimiter struct {
	// contains filtered or unexported fields
}
    ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
func (*ItemFastSlowRateLimiter) Forget ¶ added in v1.3.0
func (r *ItemFastSlowRateLimiter) Forget(item interface{})
func (*ItemFastSlowRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int
func (*ItemFastSlowRateLimiter) When ¶ added in v1.3.0
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration
type MaxOfRateLimiter ¶ added in v1.3.0
type MaxOfRateLimiter struct {
	// contains filtered or unexported fields
}
    MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items were separately delayed a longer time.
func (*MaxOfRateLimiter) Forget ¶ added in v1.3.0
func (r *MaxOfRateLimiter) Forget(item interface{})
func (*MaxOfRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int
func (*MaxOfRateLimiter) When ¶ added in v1.3.0
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration
type MetricsProvider ¶ added in v1.5.0
type MetricsProvider interface {
	NewDepthMetric(name string) GaugeMetric
	NewAddsMetric(name string) CounterMetric
	NewLatencyMetric(name string) SummaryMetric
	NewWorkDurationMetric(name string) SummaryMetric
	NewRetriesMetric(name string) CounterMetric
}
    MetricsProvider generates various metrics used by the queue.
type RateLimiter ¶ added in v1.3.0
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}
    func DefaultControllerRateLimiter ¶ added in v1.3.0
func DefaultControllerRateLimiter() RateLimiter
DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
func DefaultItemBasedRateLimiter ¶ added in v1.3.0
func DefaultItemBasedRateLimiter() RateLimiter
func NewItemExponentialFailureRateLimiter ¶ added in v1.3.0
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
func NewItemFastSlowRateLimiter ¶ added in v1.3.0
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
func NewMaxOfRateLimiter ¶ added in v1.3.0
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
type RateLimitingInterface ¶ added in v1.3.0
type RateLimitingInterface interface {
	DelayingInterface
	// AddRateLimited adds an item to the workqueue after the rate limiter says its ok
	AddRateLimited(item interface{})
	// Forget indicates that an item is finished being retried.  Doesn't matter whether its for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})
	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}
    RateLimitingInterface is an interface that rate limits items being added to the queue.
func NewNamedRateLimitingQueue ¶ added in v1.4.1
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
func NewRateLimitingQueue ¶ added in v1.3.0
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface
NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability Remember to call Forget! If you don't, you may end up tracking failures forever.
type SummaryMetric ¶ added in v1.5.0
type SummaryMetric interface {
	Observe(float64)
}
    SummaryMetric captures individual observations.
type TimedWorkQueue ¶ added in v1.4.1
type TimedWorkQueue struct {
	*Type
}
    func NewTimedWorkQueue ¶ added in v1.4.1
func NewTimedWorkQueue() *TimedWorkQueue
func (TimedWorkQueue) Add ¶ added in v1.4.1
func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem)
Add adds the obj along with the current timestamp to the queue.
func (TimedWorkQueue) Done ¶ added in v1.4.1
func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error
func (TimedWorkQueue) Get ¶ added in v1.4.1
func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool)
Get gets the obj along with its timestamp from the queue.
type TimedWorkQueueItem ¶ added in v1.4.1
type Type ¶
type Type struct {
	// contains filtered or unexported fields
}
    Type is a work queue (see the package comment).
func (*Type) Done ¶
func (q *Type) Done(item interface{})
Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.
func (*Type) Get ¶
Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
func (*Type) Len ¶ added in v0.17.0
Len returns the current queue length, for informational purposes only. You shouldn't e.g. gate a call to Add() or Get() on Len() being a particular value, that can't be synchronized properly.
func (*Type) ShutDown ¶
func (q *Type) ShutDown()
ShutDown will cause q to ignore all new items added to it. As soon as the worker goroutines have drained the existing items in the queue, they will be instructed to exit.