queue

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2026 License: Apache-2.0 Imports: 7 Imported by: 1

Documentation

Overview

Package queue provides helpers for working with client-go's `workqueues`.

`queue.OperationsContext` can be used from within a `Handler` to control the behavior of the queue that has called the handler.

The queue operations are:

- Done (stop processing the current key) - Requeue (requeue the current key) - RequeueAfter (wait for some period of time before requeuing the current key) - ReqeueueErr (record an error and requeue) - RequeueAPIError (requeue after waiting according to the priority and fairness response from the apiserver)

If calling these controls from a handler, it's important to `return` immediately so that the handler does not continue processing a key that the queue thinks has stopped.

Package queue provides helpers for working with client-go's `workqueues` and control flow handlers.

This file provides handlers that integrate with the state package to provide clean control flow patterns for controllers. Instead of manually calling queue operations and returning, controllers can now use:

return queue.Done()
return queue.Requeue()
return queue.RequeueAfter(5 * time.Second)
return queue.RequeueErr(err)

These handlers automatically call the appropriate queue operations and terminate the handler pipeline by returning nil.

Error Propagation Pattern

Errors in this system are communicated via:

  1. **Context Cancellation**: ctx.Err() is checked by Continue() and parallel execution. When context is cancelled, the pipeline stops (unless WithErrorHandler is set).

  2. **Explicit Error Handlers**: queue.RequeueErr(err) and queue.RequeueAPIErr(err) take an explicit error and pass it to the queue for retry logic.

  3. **Context Values**: Steps can store errors in context using typedctx.Box or context.WithValue, then check them in subsequent steps or OnError handlers.

  4. **Decision-based Error Handling**: Use Decision() to branch based on whether an error occurred, or handle errors inline:

    riskyOperation := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step { result, err := doSomethingRisky() if err != nil { return queue.RequeueErr(err).Step().Run(ctx) // Handle error inline } // Store result and continue ctx = context.WithValue(ctx, "result", result) return state.Continue(ctx, next) })

Or with Decision for conditional logic:

Sequence(
    validateInput,
    Decision(
        func(ctx context.Context) bool { return isValid(ctx) },
        continueProcessing,
        queue.RequeueErr(fmt.Errorf("validation failed")),
    ),
)
Example

Example demonstrating the clean controller pattern this enables

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/authzed/ctxkey"

	"github.com/authzed/controller-idioms/queue"
	"github.com/authzed/controller-idioms/queue/fake"
	"github.com/authzed/controller-idioms/state"
)

func main() {
	ctx := context.Background()
	fakeQueue := &fake.FakeInterface{}

	queueCtx := queue.NewQueueOperationsCtx()
	ctxWithQueue := queueCtx.WithValue(ctx, fakeQueue)

	// Simulate resource state
	resourceReadyKey := ctxkey.New[bool]()
	ctxWithResource := resourceReadyKey.Set(ctxWithQueue, true)

	// Clean controller pipeline using queue handlers
	controllerPipeline := state.Sequence(
		// Set finalizer
		state.Do(func(ctx context.Context) context.Context {
			fmt.Println("Setting finalizer")
			return ctx
		}),

		// Check if resource is ready to process
		state.When(
			func(ctx context.Context) bool {
				return !resourceReadyKey.MustValue(ctx)
			},
			queue.RequeueAfter(30*time.Second), // Wait 30 seconds if not ready
		),

		// Process the resource
		state.Do(func(ctx context.Context) context.Context {
			fmt.Println("Processing resource")
			return ctx
		}),

		// Validate processing completed successfully
		state.When(
			func(_ context.Context) bool {
				// In real code, this would check if processing completed
				return false // Assume success
			},
			queue.Requeue(),
		),

		// Mark as done
		queue.Done(),
	)

	state.Run(ctxWithResource, controllerPipeline)

}
Output:
Setting finalizer
Processing resource

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Done added in v0.14.0

func Done() state.NewStep

Done creates a handler that marks the current queue key as finished and terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().Done(ctx) and returning.

Usage:

pipeline := state.Sequence(
  validateInput,
  processResource,
  queue.Done(), // Stop here - processing complete
)

func OnError added in v0.14.0

func OnError(errorHandler, successHandler state.NewStep) state.NewStep

OnError creates a handler that executes different queue operations based on whether an error occurred in the context.

Usage:

pipeline := state.Sequence(
  riskyOperation,
  queue.OnError(
    queue.RequeueErr(fmt.Errorf("operation failed")), // If error
    queue.Done(), // If success
  ),
)

func Requeue added in v0.14.0

func Requeue() state.NewStep

Requeue creates a handler that requeues the current key immediately and terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().Requeue(ctx) and returning.

Usage:

pipeline := state.Decision(
  resourceReady,
  continueProcessing,
  queue.Requeue(), // Not ready - try again immediately
)

func RequeueAPIErr added in v0.14.0

func RequeueAPIErr(err error) state.NewStep

RequeueAPIErr creates a handler that handles API errors with appropriate retry logic and terminates the pipeline. This checks if the error contains retry information from the API server and requeues accordingly, equivalent to calling queue.NewQueueOperationsCtx().RequeueAPIErr(ctx, err).

Usage - return directly from within a step when error occurs:

callKubernetesAPI := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
    result, err := clientset.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
    if err != nil {
        return queue.RequeueAPIErr(err).Step().Run(ctx) // execute inline, terminating the pipeline
    }
    // Store result in context and continue
    ctx = context.WithValue(ctx, "deployment", result)
    return state.Continue(ctx, next)
})

Note: .Step() is equivalent to calling the NewStep with nil: queue.RequeueAPIErr(err)(nil) but is more readable and explicit about the conversion.

This pattern keeps errors local to where they occur, avoiding the need to store errors in context or check them in Decision branches.

func RequeueAfter added in v0.14.0

func RequeueAfter(duration time.Duration) state.NewStep

RequeueAfter creates a handler that requeues the current key after the specified duration and terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().RequeueAfter(ctx, duration) and returning.

Usage:

pipeline := state.Decision(
  resourceReady,
  continueProcessing,
  queue.RequeueAfter(30 * time.Second), // Not ready - try again in 30s
)

func RequeueErr added in v0.14.0

func RequeueErr(err error) state.NewStep

RequeueErr creates a handler that records an error and requeues the current key immediately, then terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().RequeueErr(ctx, err) and returning.

Usage - return directly from within a step when error occurs:

validateInput := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
    obj := getObjectFromContext(ctx)
    if err := validate(obj); err != nil {
        return queue.RequeueErr(fmt.Errorf("validation failed: %w", err)).Step().Run(ctx)
    }
    return state.Continue(ctx, next)
})

Or use in Decision branches when the error is known statically:

state.Decision(
    inputValid,
    continueProcessing,
    queue.RequeueErr(fmt.Errorf("validation failed")), // Error known at composition time
)

func ShouldRetry

func ShouldRetry(err error) (bool, time.Duration)

ShouldRetry returns true if the error is transient. It returns a delay if the server suggested one.

Types

type Interface

type Interface interface {
	Done()
	RequeueAfter(duration time.Duration)
	Requeue()
	RequeueErr(err error)
	RequeueAPIErr(err error)
	Error() error
}

Interface is the standard queue control interface

type Operations

type Operations struct {
	// contains filtered or unexported fields
}

Operations deals with the current queue key and provides controls for requeueing or stopping reconciliation.

func NewOperations

func NewOperations(done func(), requeueAfter func(time.Duration), cancel context.CancelFunc) *Operations
Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

// queue has an object in it
queue.Add("current_key")
key, _ := queue.Get()

// operations are per-key
operations := NewOperations(func() {
	queue.Done(key)
}, func(duration time.Duration) {
	queue.AddAfter(key, duration)
}, cancel)

// typically called from a handler
handler.NewHandlerFromFunc(func(_ context.Context) {
	// do some work
	operations.Done()
}, "example").Handle(ctx)
fmt.Println(queue.Len())

operations.Requeue()
fmt.Println(queue.Len())
Output:
0
1

func (*Operations) Done

func (c *Operations) Done()

Done marks the current key as finished. Note that processing should stop as soon as possible after calling `Done`, since marking it as done frees the queue to potentially process the same key again.

func (*Operations) Error added in v0.5.0

func (c *Operations) Error() error

Error returns the last recorded error, if any

func (*Operations) Requeue

func (c *Operations) Requeue()

Requeue requeues the current key immediately.

func (*Operations) RequeueAPIErr

func (c *Operations) RequeueAPIErr(err error)

RequeueAPIErr checks to see if `err` is a kube api error with retry data. If so, it requeues after the wait period, otherwise, it requeues immediately.

func (*Operations) RequeueAfter

func (c *Operations) RequeueAfter(duration time.Duration)

RequeueAfter requeues the current key after duration.

func (*Operations) RequeueErr

func (c *Operations) RequeueErr(err error)

RequeueErr sets err on the object and requeues the current key.

type OperationsContext

type OperationsContext struct {
	*typedctx.Key[Interface]
}

OperationsContext is like Interface, but fetches the object from a context.

func NewQueueOperationsCtx

func NewQueueOperationsCtx() OperationsContext

NewQueueOperationsCtx returns a new OperationsContext

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())

// queue has an object in it
queue.Add("current_key")

key, _ := queue.Get()

// operations are per-key
CtxQueue := NewQueueOperationsCtx().WithValue(ctx, NewOperations(func() {
	queue.Done(key)
}, func(duration time.Duration) {
	queue.AddAfter(key, duration)
}, cancel))

// queue controls are passed via context
handler.NewHandlerFromFunc(func(_ context.Context) {
	// do some work
	CtxQueue.Done()
}, "example").Handle(ctx)

fmt.Println(queue.Len())
Output:
0

func (OperationsContext) Done

func (h OperationsContext) Done(ctx context.Context)

func (OperationsContext) Error added in v0.5.0

func (h OperationsContext) Error(ctx context.Context) error

func (OperationsContext) Requeue

func (h OperationsContext) Requeue(ctx context.Context)

func (OperationsContext) RequeueAPIErr

func (h OperationsContext) RequeueAPIErr(ctx context.Context, err error)

func (OperationsContext) RequeueAfter

func (h OperationsContext) RequeueAfter(ctx context.Context, duration time.Duration)

func (OperationsContext) RequeueErr

func (h OperationsContext) RequeueErr(ctx context.Context, err error)

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL