workqueue

package
v0.2.0 Latest
Published: Mar 14, 2026 License: MIT Imports: 1 Imported by: 0

Documentation

Overview

Package workqueue provides a rate-limited work queue modeled after k8s.io/client-go/util/workqueue. It uses Go generics for type safety.

The core pattern is the dirty/processing dual-set dedup mechanism from Kubernetes, which ensures that:

  • Each item is processed by at most one worker at a time.
  • If an item is re-added while being processed, it will be re-queued after the current processing completes (via Done).
  • Duplicate Adds are coalesced (dirty set dedup).
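
The three guarantees above can be illustrated with a minimal sketch. This is not the package's implementation: dedupQueue and its lowercase methods are hypothetical names, and the sketch assumes a single goroutine (no locking, no blocking Get, no shutdown) so that only the set bookkeeping is visible.

```go
package main

import "fmt"

// dedupQueue is a hypothetical, single-goroutine illustration of the
// dirty/processing dual-set idea. It is not the package's real type.
type dedupQueue[T comparable] struct {
	queue      []T            // items waiting to be handed out, FIFO
	dirty      map[T]struct{} // items that still need processing
	processing map[T]struct{} // items currently handed out by get
}

func newDedupQueue[T comparable]() *dedupQueue[T] {
	return &dedupQueue[T]{
		dirty:      make(map[T]struct{}),
		processing: make(map[T]struct{}),
	}
}

// add records that item needs processing.
func (q *dedupQueue[T]) add(item T) {
	if _, ok := q.dirty[item]; ok {
		return // already marked dirty: duplicate adds coalesce
	}
	q.dirty[item] = struct{}{}
	if _, ok := q.processing[item]; ok {
		return // in flight: done will re-queue it later
	}
	q.queue = append(q.queue, item)
}

// get pops the next waiting item; ok is false if nothing is waiting.
func (q *dedupQueue[T]) get() (item T, ok bool) {
	if len(q.queue) == 0 {
		return item, false
	}
	item = q.queue[0]
	q.queue = q.queue[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item, true
}

// done marks item as finished and re-queues it if it was re-added
// while it was being processed.
func (q *dedupQueue[T]) done(item T) {
	delete(q.processing, item)
	if _, ok := q.dirty[item]; ok {
		q.queue = append(q.queue, item)
	}
}

func main() {
	q := newDedupQueue[string]()
	q.add("a")
	q.add("a")                // coalesced with the first add
	fmt.Println(len(q.queue)) // 1

	item, _ := q.get()
	q.add("a")                // re-added while in flight: held in dirty only
	fmt.Println(len(q.queue)) // 0

	q.done(item)              // re-queued now that processing finished
	fmt.Println(len(q.queue)) // 1
}
```

Because an in-flight item is kept out of the queue until done is called, two workers can never hold the same item at once, and a re-add during processing is not lost.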

Usage pattern (K8s controller style):

q := workqueue.New[string]()
defer q.ShutDown()

// Producer:
q.Add("my-key")

// Consumer (worker loop):
for {
    item, shutdown := q.Get()
    if shutdown { break }
    // process item...
    q.Done(item)
}
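
Since Done must be called for every item returned by Get, one way to structure the worker loop is to defer Done inside a per-item closure, so it also runs when the handler returns early or panics. The sketch below assumes nothing about the package beyond the Get and Done signatures shown in this documentation; getDoner, runWorker, sliceQueue, and errBad are hypothetical names, and sliceQueue is only a stand-in that makes the example runnable without importing the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errBad is a hypothetical error used only by this sketch.
var errBad = errors.New("bad item")

// getDoner is a hypothetical subset of the documented Interface[T]:
// only the two methods a worker loop needs.
type getDoner[T comparable] interface {
	Get() (item T, shutdown bool)
	Done(item T)
}

// runWorker processes items until Get reports shutdown and returns the
// number of items whose handler returned an error. Done is deferred in
// a closure so it runs for every item, even if handle panics.
func runWorker[T comparable](q getDoner[T], handle func(T) error) (failed int) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return failed
		}
		func() {
			defer q.Done(item)
			if err := handle(item); err != nil {
				failed++
			}
		}()
	}
}

// sliceQueue is a stand-in for a real queue. Unlike a real queue it
// never blocks: it reports shutdown as soon as its items run out.
type sliceQueue struct {
	items []string
	done  []string // records every Done call, in order
}

func (s *sliceQueue) Get() (string, bool) {
	if len(s.items) == 0 {
		return "", true
	}
	item := s.items[0]
	s.items = s.items[1:]
	return item, false
}

func (s *sliceQueue) Done(item string) { s.done = append(s.done, item) }

func main() {
	q := &sliceQueue{items: []string{"a", "bad", "c"}}
	failed := runWorker[string](q, func(key string) error {
		if key == "bad" {
			return errBad
		}
		return nil
	})
	fmt.Println(failed, len(q.done)) // 1 3
}
```

With a real queue, several goroutines could each run such a loop against the same queue value; whether a failed item should be re-added is a policy decision left to the caller.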

Types

type Interface

type Interface[T comparable] interface {
	// Add marks an item as needing processing. If the item is not already
	// in the dirty set, it is added to the queue. If the item is currently
	// being processed, it is added to the dirty set so it will be re-queued
	// when Done is called.
	Add(item T)

	// Len returns the current number of items in the queue.
	Len() int

	// Get blocks until an item is available, then returns it along with a
	// shutdown indicator. If shutdown is true, the caller should exit.
	// The caller MUST call Done with the item when processing is complete.
	Get() (item T, shutdown bool)

	// Done marks an item as finished processing. If the item was re-added
	// (dirty) while being processed, it is re-enqueued.
	// This MUST be called for every item returned by Get.
	Done(item T)

	// ShutDown signals the queue to shut down. All blocked Get calls will
	// return with shutdown=true. Further Add calls are ignored.
	ShutDown()

	// ShutDownWithDrain signals shutdown but continues to process existing
	// items until the queue is empty. New items added after this call are
	// accepted only if the queue hasn't fully drained yet.
	ShutDownWithDrain()

	// ShuttingDown returns true if ShutDown or ShutDownWithDrain has been called.
	ShuttingDown() bool
}

Interface defines the contract for a work queue. It is modeled after k8s.io/client-go/util/workqueue.TypedInterface[T].

func New

func New[T comparable]() Interface[T]

New creates a new work queue. This is the equivalent of k8s.io/client-go/util/workqueue.NewTyped[T]().
