Documentation
¶
Overview ¶
Package queue provides helpers for working with client-go's `workqueues`.
`queue.OperationsContext` can be used from within a `Handler` to control the behavior of the queue that has called the handler.
The queue operations are:
- Done (stop processing the current key) - Requeue (requeue the current key) - RequeueAfter (wait for some period of time before requeuing the current key) - ReqeueueErr (record an error and requeue) - RequeueAPIError (requeue after waiting according to the priority and fairness response from the apiserver)
If calling these controls from a handler, it's important to `return` immediately so that the handler does not continue processing a key that the queue thinks has stopped.
Package queue provides helpers for working with client-go's `workqueues` and control flow handlers.
This file provides handlers that integrate with the state package to provide clean control flow patterns for controllers. Instead of manually calling queue operations and returning, controllers can now use:
return queue.Done() return queue.Requeue() return queue.RequeueAfter(5 * time.Second) return queue.RequeueErr(err)
These handlers automatically call the appropriate queue operations and terminate the handler pipeline by returning nil.
Error Propagation Pattern ¶
Errors in this system are communicated via:
**Context Cancellation**: ctx.Err() is checked by Continue() and parallel execution. When context is cancelled, the pipeline stops (unless WithErrorHandler is set).
**Explicit Error Handlers**: queue.RequeueErr(err) and queue.RequeueAPIErr(err) take an explicit error and pass it to the queue for retry logic.
**Context Values**: Steps can store errors in context using typedctx.Box or context.WithValue, then check them in subsequent steps or OnError handlers.
**Decision-based Error Handling**: Use Decision() to branch based on whether an error occurred, or handle errors inline:
riskyOperation := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step { result, err := doSomethingRisky() if err != nil { return queue.RequeueErr(err).Step().Run(ctx) // Handle error inline } // Store result and continue ctx = context.WithValue(ctx, "result", result) return state.Continue(ctx, next) })
Or with Decision for conditional logic:
Sequence(
validateInput,
Decision(
func(ctx context.Context) bool { return isValid(ctx) },
continueProcessing,
queue.RequeueErr(fmt.Errorf("validation failed")),
),
)
Example ¶
Example demonstrating the clean controller pattern this enables
package main
import (
"context"
"fmt"
"time"
"github.com/authzed/ctxkey"
"github.com/authzed/controller-idioms/queue"
"github.com/authzed/controller-idioms/queue/fake"
"github.com/authzed/controller-idioms/state"
)
func main() {
ctx := context.Background()
fakeQueue := &fake.FakeInterface{}
queueCtx := queue.NewQueueOperationsCtx()
ctxWithQueue := queueCtx.WithValue(ctx, fakeQueue)
// Simulate resource state
resourceReadyKey := ctxkey.New[bool]()
ctxWithResource := resourceReadyKey.Set(ctxWithQueue, true)
// Clean controller pipeline using queue handlers
controllerPipeline := state.Sequence(
// Set finalizer
state.Do(func(ctx context.Context) context.Context {
fmt.Println("Setting finalizer")
return ctx
}),
// Check if resource is ready to process
state.When(
func(ctx context.Context) bool {
return !resourceReadyKey.MustValue(ctx)
},
queue.RequeueAfter(30*time.Second), // Wait 30 seconds if not ready
),
// Process the resource
state.Do(func(ctx context.Context) context.Context {
fmt.Println("Processing resource")
return ctx
}),
// Validate processing completed successfully
state.When(
func(_ context.Context) bool {
// In real code, this would check if processing completed
return false // Assume success
},
queue.Requeue(),
),
// Mark as done
queue.Done(),
)
state.Run(ctxWithResource, controllerPipeline)
}
Output: Setting finalizer Processing resource
Index ¶
- func Done() state.NewStep
- func OnError(errorHandler, successHandler state.NewStep) state.NewStep
- func Requeue() state.NewStep
- func RequeueAPIErr(err error) state.NewStep
- func RequeueAfter(duration time.Duration) state.NewStep
- func RequeueErr(err error) state.NewStep
- func ShouldRetry(err error) (bool, time.Duration)
- type Interface
- type Operations
- type OperationsContext
- func (h OperationsContext) Done(ctx context.Context)
- func (h OperationsContext) Error(ctx context.Context) error
- func (h OperationsContext) Requeue(ctx context.Context)
- func (h OperationsContext) RequeueAPIErr(ctx context.Context, err error)
- func (h OperationsContext) RequeueAfter(ctx context.Context, duration time.Duration)
- func (h OperationsContext) RequeueErr(ctx context.Context, err error)
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Done ¶ added in v0.14.0
Done creates a handler that marks the current queue key as finished and terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().Done(ctx) and returning.
Usage:
pipeline := state.Sequence( validateInput, processResource, queue.Done(), // Stop here - processing complete )
func OnError ¶ added in v0.14.0
OnError creates a handler that executes different queue operations based on whether an error occurred in the context.
Usage:
pipeline := state.Sequence(
riskyOperation,
queue.OnError(
queue.RequeueErr(fmt.Errorf("operation failed")), // If error
queue.Done(), // If success
),
)
func Requeue ¶ added in v0.14.0
Requeue creates a handler that requeues the current key immediately and terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().Requeue(ctx) and returning.
Usage:
pipeline := state.Decision( resourceReady, continueProcessing, queue.Requeue(), // Not ready - try again immediately )
func RequeueAPIErr ¶ added in v0.14.0
RequeueAPIErr creates a handler that handles API errors with appropriate retry logic and terminates the pipeline. This checks if the error contains retry information from the API server and requeues accordingly, equivalent to calling queue.NewQueueOperationsCtx().RequeueAPIErr(ctx, err).
Usage - return directly from within a step when error occurs:
callKubernetesAPI := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
result, err := clientset.AppsV1().Deployments(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return queue.RequeueAPIErr(err).Step().Run(ctx) // execute inline, terminating the pipeline
}
// Store result in context and continue
ctx = context.WithValue(ctx, "deployment", result)
return state.Continue(ctx, next)
})
Note: .Step() is equivalent to calling the NewStep with nil: queue.RequeueAPIErr(err)(nil) but is more readable and explicit about the conversion.
This pattern keeps errors local to where they occur, avoiding the need to store errors in context or check them in Decision branches.
func RequeueAfter ¶ added in v0.14.0
RequeueAfter creates a handler that requeues the current key after the specified duration and terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().RequeueAfter(ctx, duration) and returning.
Usage:
pipeline := state.Decision( resourceReady, continueProcessing, queue.RequeueAfter(30 * time.Second), // Not ready - try again in 30s )
func RequeueErr ¶ added in v0.14.0
RequeueErr creates a handler that records an error and requeues the current key immediately, then terminates the pipeline. This is equivalent to calling queue.NewQueueOperationsCtx().RequeueErr(ctx, err) and returning.
Usage - return directly from within a step when error occurs:
validateInput := state.NewStepFunc(func(ctx context.Context, next state.Step) state.Step {
obj := getObjectFromContext(ctx)
if err := validate(obj); err != nil {
return queue.RequeueErr(fmt.Errorf("validation failed: %w", err)).Step().Run(ctx)
}
return state.Continue(ctx, next)
})
Or use in Decision branches when the error is known statically:
state.Decision(
inputValid,
continueProcessing,
queue.RequeueErr(fmt.Errorf("validation failed")), // Error known at composition time
)
Types ¶
type Interface ¶
type Interface interface {
Done()
RequeueAfter(duration time.Duration)
Requeue()
RequeueErr(err error)
RequeueAPIErr(err error)
Error() error
}
Interface is the standard queue control interface
type Operations ¶
type Operations struct {
// contains filtered or unexported fields
}
Operations deals with the current queue key and provides controls for requeueing or stopping reconciliation.
func NewOperations ¶
func NewOperations(done func(), requeueAfter func(time.Duration), cancel context.CancelFunc) *Operations
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
// queue has an object in it
queue.Add("current_key")
key, _ := queue.Get()
// operations are per-key
operations := NewOperations(func() {
queue.Done(key)
}, func(duration time.Duration) {
queue.AddAfter(key, duration)
}, cancel)
// typically called from a handler
handler.NewHandlerFromFunc(func(_ context.Context) {
// do some work
operations.Done()
}, "example").Handle(ctx)
fmt.Println(queue.Len())
operations.Requeue()
fmt.Println(queue.Len())
Output: 0 1
func (*Operations) Done ¶
func (c *Operations) Done()
Done marks the current key as finished. Note that processing should stop as soon as possible after calling `Done`, since marking it as done frees the queue to potentially process the same key again.
func (*Operations) Error ¶ added in v0.5.0
func (c *Operations) Error() error
Error returns the last recorded error, if any
func (*Operations) Requeue ¶
func (c *Operations) Requeue()
Requeue requeues the current key immediately.
func (*Operations) RequeueAPIErr ¶
func (c *Operations) RequeueAPIErr(err error)
RequeueAPIErr checks to see if `err` is a kube api error with retry data. If so, it requeues after the wait period, otherwise, it requeues immediately.
func (*Operations) RequeueAfter ¶
func (c *Operations) RequeueAfter(duration time.Duration)
RequeueAfter requeues the current key after duration.
func (*Operations) RequeueErr ¶
func (c *Operations) RequeueErr(err error)
RequeueErr sets err on the object and requeues the current key.
type OperationsContext ¶
OperationsContext is like Interface, but fetches the object from a context.
func NewQueueOperationsCtx ¶
func NewQueueOperationsCtx() OperationsContext
NewQueueOperationsCtx returns a new OperationsContext
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
// queue has an object in it
queue.Add("current_key")
key, _ := queue.Get()
// operations are per-key
CtxQueue := NewQueueOperationsCtx().WithValue(ctx, NewOperations(func() {
queue.Done(key)
}, func(duration time.Duration) {
queue.AddAfter(key, duration)
}, cancel))
// queue controls are passed via context
handler.NewHandlerFromFunc(func(_ context.Context) {
// do some work
CtxQueue.Done()
}, "example").Handle(ctx)
fmt.Println(queue.Len())
Output: 0
func (OperationsContext) Done ¶
func (h OperationsContext) Done(ctx context.Context)
func (OperationsContext) Error ¶ added in v0.5.0
func (h OperationsContext) Error(ctx context.Context) error
func (OperationsContext) Requeue ¶
func (h OperationsContext) Requeue(ctx context.Context)
func (OperationsContext) RequeueAPIErr ¶
func (h OperationsContext) RequeueAPIErr(ctx context.Context, err error)
func (OperationsContext) RequeueAfter ¶
func (h OperationsContext) RequeueAfter(ctx context.Context, duration time.Duration)
func (OperationsContext) RequeueErr ¶
func (h OperationsContext) RequeueErr(ctx context.Context, err error)