Documentation
Functions ¶
func RequeueAfter ¶
RequeueAfter constructs a RequeueAfterError with the given duration setting.
func RequeueNow ¶
func RequeueNow() error
RequeueNow constructs a RequeueAfterError that reschedules the Request immediately.
Types ¶
type Controller ¶
type Controller interface {
// Run begins the Controller's main processing loop. When the given
// context is canceled, the Controller stops processing any remaining work.
// The Run function should only ever be called once.
Run(ctx context.Context) error
// Subscribe tells the controller to subscribe to updates for config entries based
// on the given request. Optional transformation functions can also be passed in
// to Subscribe, allowing a controller to map a config entry to a different type of
// request under the hood (e.g. watching a dependency and triggering a Reconcile on
// the dependent resource). This should only ever be called prior to calling Run.
Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller
// WithBackoff changes the base and maximum backoff values for the Controller's
// Request retry rate limiter. This should only ever be called prior to
// calling Run.
WithBackoff(base, max time.Duration) Controller
// WithLogger sets the logger for the controller. It should be called prior to
// calling Run.
WithLogger(logger hclog.Logger) Controller
// WithWorkers sets the number of worker goroutines used to process the queue.
// This defaults to 1 goroutine.
WithWorkers(i int) Controller
// WithQueueFactory allows a Controller to replace its underlying work queue
// implementation. This is most useful for testing. This should only ever be called
// prior to calling Run.
WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
// AddTrigger allows for triggering a reconciliation request when a
// triggering function returns. When the passed-in context is canceled,
// the trigger must return.
AddTrigger(request Request, trigger func(ctx context.Context) error)
// RemoveTrigger removes the triggering function associated with the Request object
RemoveTrigger(request Request)
// Enqueue adds all of the given requests into the work queue.
Enqueue(requests ...Request)
}
Controller subscribes to a set of watched resources from the state store and delegates processing them to a given Reconciler. If a Reconciler errors while processing a Request, then the Controller handles rescheduling the Request to be re-processed.
func New ¶
func New(publisher state.EventPublisher, reconciler Reconciler) Controller
New returns a new Controller associated with the given state store event publisher and reconciler.
type DeferQueue ¶
type DeferQueue interface {
// Defer defers processing a Request until a given time. When
// that time is reached, the request will be processed by the
// callback passed to Process. If the given context
// is canceled, the item is not deferred.
Defer(ctx context.Context, item Request, until time.Time)
// Process processes all items in the defer queue with the
// given callback, blocking until the given context is canceled.
// Callers should only ever call Process once, likely in a
// long-lived goroutine.
Process(ctx context.Context, callback func(item Request))
}
DeferQueue is a generic priority queue implementation that allows for deferring and later processing Requests.
func NewDeferQueue ¶
func NewDeferQueue(tick time.Duration) DeferQueue
NewDeferQueue returns a priority queue for deferred Requests.
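To make the idea of a priority queue for deferred Requests concrete, the following is a minimal sketch of ordering items by due time with `container/heap`. It is not the package's implementation: `request`, `deferred`, `deferHeap`, `deferUntil`, `popDue`, and `at` are hypothetical names, and the blocking, tick-driven Process loop is left out so the example stays deterministic.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// request is a local stand-in for the package's Request type.
type request struct{ Kind, Name string }

// deferred pairs a request with the time at which it becomes due.
type deferred struct {
	item  request
	until time.Time
}

// deferHeap is a min-heap of deferred items ordered by due time.
type deferHeap []deferred

func (h deferHeap) Len() int           { return len(h) }
func (h deferHeap) Less(i, j int) bool { return h[i].until.Before(h[j].until) }
func (h deferHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *deferHeap) Push(x any)        { *h = append(*h, x.(deferred)) }
func (h *deferHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// deferUntil schedules item to become due at the given time.
func (h *deferHeap) deferUntil(item request, until time.Time) {
	heap.Push(h, deferred{item: item, until: until})
}

// popDue removes and returns every item due at or before now, earliest
// first. A Process loop could call something like this on each tick.
func popDue(h *deferHeap, now time.Time) []request {
	var due []request
	for h.Len() > 0 && !(*h)[0].until.After(now) {
		due = append(due, heap.Pop(h).(deferred).item)
	}
	return due
}

// at returns a fixed instant, seconds after the Unix epoch, for the demo.
func at(seconds int) time.Time { return time.Unix(int64(seconds), 0) }

func main() {
	h := &deferHeap{}
	h.deferUntil(request{Kind: "kind", Name: "late"}, at(10))
	h.deferUntil(request{Kind: "kind", Name: "early"}, at(1))
	fmt.Println(popDue(h, at(5)))  // only "early" is due
	fmt.Println(popDue(h, at(15))) // now "late" is due
}
```

A heap keeps the earliest deadline at the root, so each tick only needs to inspect the front of the queue rather than scan every deferred item.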
type Limiter ¶
type Limiter interface {
// NextRetry returns the remaining time until the queue should
// reprocess a Request.
NextRetry(request Request) time.Duration
// Forget causes the Limiter to reset the backoff for the Request.
Forget(request Request)
}
Limiter is an interface for a rate limiter that can limit the number of retries processed in the work queue.
func NewRateLimiter ¶
NewRateLimiter returns a Limiter that does per-item exponential backoff.
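As an illustration of per-item exponential backoff, here is a small sketch of a limiter with the same two methods as the Limiter interface. It is an assumption about how such a limiter could work, not the code behind NewRateLimiter: `request`, `backoffLimiter`, and `newBackoffLimiter` are hypothetical, and the doubling-with-cap policy is chosen only for the example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// request is a local stand-in for the package's Request type.
type request struct{ Kind, Name string }

// backoffLimiter is a hypothetical limiter that tracks failures per request.
type backoffLimiter struct {
	mu       sync.Mutex
	base     time.Duration
	max      time.Duration
	failures map[request]int
}

func newBackoffLimiter(base, max time.Duration) *backoffLimiter {
	return &backoffLimiter{base: base, max: max, failures: make(map[request]int)}
}

// NextRetry returns base doubled once per previously recorded failure,
// capped at max, and then records one more failure for the request.
func (l *backoffLimiter) NextRetry(r request) time.Duration {
	l.mu.Lock()
	defer l.mu.Unlock()

	n := l.failures[r]
	l.failures[r] = n + 1

	delay := l.base
	for i := 0; i < n; i++ {
		delay *= 2
		if delay >= l.max {
			return l.max // stop early so the delay cannot overflow
		}
	}
	if delay > l.max {
		return l.max
	}
	return delay
}

// Forget resets the backoff for the request.
func (l *backoffLimiter) Forget(r request) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.failures, r)
}

func main() {
	l := newBackoffLimiter(100*time.Millisecond, time.Second)
	r := request{Kind: "service-defaults", Name: "web"}
	for i := 0; i < 6; i++ {
		fmt.Println(l.NextRetry(r)) // 100ms, 200ms, 400ms, 800ms, 1s, 1s
	}
	l.Forget(r)
	fmt.Println(l.NextRetry(r)) // back to 100ms
}
```

Because the failure count is keyed by request, one repeatedly failing config entry backs off without slowing retries for any other entry.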
type Reconciler ¶
type Reconciler interface {
// Reconcile performs a reconciliation on the config entry referred to by the Request.
// The Controller will requeue the Request to be processed again if the returned error is non-nil.
// If no error is returned, the Request will be removed from the working queue.
Reconcile(context.Context, Request) error
}
Reconciler is the main implementation interface for Controllers. A Reconciler receives any change notifications for config entries that the controller is subscribed to and processes them with its Reconcile function.
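The contract above (non-nil error means retry, nil means done) can be sketched with a toy implementation. Everything here is a stand-in: `request` and `reconciler` mirror the documented types locally, and `flakyReconciler` and `reconcileOnce` are hypothetical names used only for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// request is a local stand-in for the package's Request type.
type request struct{ Kind, Name string }

// reconciler mirrors the shape of the documented Reconciler interface.
type reconciler interface {
	Reconcile(context.Context, request) error
}

// flakyReconciler is a hypothetical Reconciler that fails the first attempt
// for each request and succeeds afterwards. It is not safe for concurrent
// use, so it would need a mutex if run with more than one worker.
type flakyReconciler struct {
	attempts map[request]int
}

func newFlakyReconciler() *flakyReconciler {
	return &flakyReconciler{attempts: make(map[request]int)}
}

func (f *flakyReconciler) Reconcile(ctx context.Context, r request) error {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return err
	}
	f.attempts[r]++
	if f.attempts[r] == 1 {
		// A non-nil error asks the Controller to requeue the Request.
		return errors.New("transient failure")
	}
	// A nil error lets the Request leave the queue.
	return nil
}

// reconcileOnce runs a single attempt with a background context.
func reconcileOnce(rec reconciler, r request) error {
	return rec.Reconcile(context.Background(), r)
}

func main() {
	rec := newFlakyReconciler()
	req := request{Kind: "service-defaults", Name: "web"}
	fmt.Println(reconcileOnce(rec, req)) // first attempt fails
	fmt.Println(reconcileOnce(rec, req)) // second attempt succeeds
}
```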
type Request ¶
type Request struct {
Kind string
Name string
Meta *acl.EnterpriseMeta
}
Request contains the information necessary to reconcile a config entry. This includes only the information required to uniquely identify the config entry.
type RequeueAfterError ¶
RequeueAfterError is an error that allows a Reconciler to override the exponential backoff behavior of the Controller. Rather than applying the backoff algorithm, returning a RequeueAfterError causes the Controller to reschedule the Request at a given time in the future.
func (RequeueAfterError) Error ¶
func (r RequeueAfterError) Error() string
Error implements the error interface.
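One way a controller could distinguish a requested delay from an ordinary failure is with `errors.As`. The sketch below assumes the error type is backed by a `time.Duration`, which this page does not state. `requeueAfterError`, `requeueAfter`, `requeueNow`, `errPlain`, and `nextAction` are hypothetical local stand-ins rather than the package's own code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// requeueAfterError is a local stand-in for RequeueAfterError. Backing it
// with a time.Duration is an assumption made for this sketch.
type requeueAfterError time.Duration

func (r requeueAfterError) Error() string {
	return fmt.Sprintf("requeue after %s", time.Duration(r))
}

// requeueAfter and requeueNow imitate the documented constructors.
func requeueAfter(d time.Duration) error { return requeueAfterError(d) }
func requeueNow() error                  { return requeueAfterError(0) }

// errPlain represents any ordinary reconcile failure.
var errPlain = errors.New("reconcile failed")

// nextAction is a hypothetical helper describing how a Request would be
// rescheduled for a given Reconcile result.
func nextAction(err error) string {
	if err == nil {
		return "forget"
	}
	var requeue requeueAfterError
	if errors.As(err, &requeue) {
		// The Reconciler asked for an explicit delay, so skip the backoff.
		return fmt.Sprintf("add after %s", time.Duration(requeue))
	}
	// Any other error falls back to exponential backoff.
	return "add rate limited"
}

func main() {
	fmt.Println(nextAction(nil))
	fmt.Println(nextAction(errPlain))
	fmt.Println(nextAction(requeueAfter(30 * time.Second)))
	// errors.As also finds the value when it has been wrapped.
	fmt.Println(nextAction(fmt.Errorf("wrapped: %w", requeueNow())))
}
```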
type Transformer ¶
type Transformer func(entry structs.ConfigEntry) []Request
Transformer is a function that takes one type of config entry that has changed and transforms that into a set of reconciliation requests to enqueue.
type WorkQueue ¶
type WorkQueue interface {
// Get retrieves the next Request in the queue, blocking until a Request is
// available. If shutdown is true, then the queue is shutting down and should
// no longer be used by the caller.
Get() (item Request, shutdown bool)
// Add immediately adds a Request to the work queue.
Add(item Request)
// AddAfter adds a Request to the work queue after a given amount of time.
AddAfter(item Request, duration time.Duration)
// AddRateLimited adds a Request to the work queue after the amount of time
// specified by applying the queue's rate limiter.
AddRateLimited(item Request)
// Forget signals the queue to reset the rate-limiting for the given Request.
Forget(item Request)
// Done tells the work queue that the Request has been successfully processed
// and can be deleted from the queue.
Done(item Request)
}
WorkQueue is an interface for a work queue with semantics to help with retries and rate limiting.
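The methods above suggest a worker loop along the following lines. This is a sketch under assumptions, not the Controller's actual loop: `workQueue` mirrors the documented interface locally, `recordingQueue` is a hypothetical test double that only records calls, and calling Done after every attempt (successful or not) is a convention borrowed from common work-queue designs rather than something this page specifies.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// request is a local stand-in for the package's Request type.
type request struct{ Kind, Name string }

// workQueue mirrors the shape of the documented WorkQueue interface.
type workQueue interface {
	Get() (item request, shutdown bool)
	Add(item request)
	AddAfter(item request, duration time.Duration)
	AddRateLimited(item request)
	Forget(item request)
	Done(item request)
}

// recordingQueue is a hypothetical test double. It hands out each queued
// item once, reports shutdown when empty, and records retry-related calls
// instead of re-adding items.
type recordingQueue struct {
	items []request
	calls []string
}

func (q *recordingQueue) Get() (request, bool) {
	if len(q.items) == 0 {
		return request{}, true
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, false
}
func (q *recordingQueue) Add(item request) { q.items = append(q.items, item) }
func (q *recordingQueue) AddAfter(item request, d time.Duration) {
	q.calls = append(q.calls, "AddAfter "+item.Name)
}
func (q *recordingQueue) AddRateLimited(item request) {
	q.calls = append(q.calls, "AddRateLimited "+item.Name)
}
func (q *recordingQueue) Forget(item request) { q.calls = append(q.calls, "Forget "+item.Name) }
func (q *recordingQueue) Done(item request)   { q.calls = append(q.calls, "Done "+item.Name) }

// drain processes items until the queue reports shutdown. Failures are
// re-added with rate limiting; successes reset the item's backoff.
func drain(ctx context.Context, q workQueue, reconcile func(context.Context, request) error) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		if err := reconcile(ctx, item); err != nil {
			q.AddRateLimited(item)
		} else {
			q.Forget(item)
		}
		q.Done(item)
	}
}

// demo runs drain over one succeeding and one failing request.
func demo() []string {
	q := &recordingQueue{items: []request{{Kind: "k", Name: "ok"}, {Kind: "k", Name: "bad"}}}
	drain(context.Background(), q, func(_ context.Context, r request) error {
		if r.Name == "bad" {
			return errors.New("reconcile failed")
		}
		return nil
	})
	return q.calls
}

func main() {
	for _, call := range demo() {
		fmt.Println(call)
	}
}
```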