reflector

package
v2.30.0 Latest
Warning

This package is not in the latest version of its module.
Published: Jan 28, 2026 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultWatchErrorHandler

func DefaultWatchErrorHandler(ctx context.Context, r *Reflector, err error)

DefaultWatchErrorHandler is the default implementation of WatchErrorHandlerWithContext.

func New

func New(c *Config) cache.Controller

New makes a new Controller from the given Config. KCP modification: This controller uses our forked Reflector with cluster-aware key function support. Returns cache.Controller so it can be used as a drop-in replacement.

Types

type Config

type Config struct {
	// The queue for your objects - has to be a DeltaFIFO due to
	// assumptions in the implementation. Your Process() function
	// should accept the output of this Queue's Pop() method.
	cache.Queue

	// Something that can list and watch your objects.
	cache.ListerWatcher

	// Process can process a popped Deltas.
	Process cache.ProcessFunc

	// ObjectType is an example object of the type this controller is
	// expected to handle.
	ObjectType runtime.Object

	// ObjectDescription is the description to use when logging type-specific information about this controller.
	ObjectDescription string

	// FullResyncPeriod is the period at which ShouldResync is considered.
	FullResyncPeriod time.Duration

	// MinWatchTimeout, if set, defines the minimum timeout for watch requests sent
	// to kube-apiserver. However, values lower than 5m will not be honored, to avoid
	// a negative performance impact on the control plane.
	// Optional - if unset, a default value of 5m will be used.
	MinWatchTimeout time.Duration

	// ShouldResync is periodically used by the reflector to determine
	// whether to Resync the Queue. If ShouldResync is `nil` or
	// returns true, it means the reflector should proceed with the
	// resync.
	ShouldResync cache.ShouldResyncFunc

	// Called whenever the ListAndWatch drops the connection with an error.
	WatchErrorHandler cache.WatchErrorHandler

	// Called whenever the ListAndWatch drops the connection with an error
	// and WatchErrorHandler is not set.
	WatchErrorHandlerWithContext cache.WatchErrorHandlerWithContext

	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	WatchListPageSize int64

	// KCP modification: KeyFunction is the function used to generate keys for objects
	// in the reflector's temporary store during WatchList operations.
	// This is critical for multi-cluster setups where objects from different clusters
	// may have the same namespace/name but different cluster names.
	KeyFunction cache.KeyFunc
}

Config contains all the settings for one of these low-level controllers. KCP modification: Added KeyFunction field for cluster-aware key generation.
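To illustrate why a cluster-aware KeyFunction matters, here is a minimal standalone sketch. It does not use this package or client-go: the `object` type, both key helpers, and the `|` separator are hypothetical and chosen only for the illustration. It shows two objects with the same namespace/name colliding under a plain key but staying distinct once the cluster name is part of the key.

```go
package main

import "fmt"

// object is a hypothetical stand-in for the metadata of a Kubernetes object.
type object struct {
	Cluster   string
	Namespace string
	Name      string
}

// namespaceKey builds a plain namespace/name key and ignores the cluster.
func namespaceKey(o object) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// clusterAwareKey prefixes the key with the cluster name.
// The "|" separator is an assumption made for this sketch.
func clusterAwareKey(o object) string {
	return o.Cluster + "|" + namespaceKey(o)
}

func main() {
	a := object{Cluster: "cluster-a", Namespace: "default", Name: "web"}
	b := object{Cluster: "cluster-b", Namespace: "default", Name: "web"}

	plain := map[string]object{}
	aware := map[string]object{}
	for _, o := range []object{a, b} {
		plain[namespaceKey(o)] = o    // the second object overwrites the first
		aware[clusterAwareKey(o)] = o // both objects are kept
	}
	fmt.Println(len(plain), len(aware))
}
```

With a key that ignores the cluster, a temporary store would silently drop one of the two objects, which is the situation the KeyFunction field is meant to avoid.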

type Reflector

type Reflector struct {

	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
	ShouldResync func() bool
	// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
	MaxInternalErrorRetryDuration time.Duration
	// contains filtered or unexported fields
}

Reflector watches a specified resource and causes all changes to be reflected in the given store. KCP modification: Added keyFunction field for cluster-aware key generation.
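The WatchListPageSize defaulting rule described in the field comment can be sketched as a small pure function. This is only an illustration of the rule as documented, not the package's code: `effectivePageSize` is a hypothetical helper, and the value used for `defaultPageSize` (standing in for pager.PageSize) is an assumption.

```go
package main

import "fmt"

// defaultPageSize stands in for pager.PageSize; the value is an assumption.
const defaultPageSize int64 = 500

// effectivePageSize returns the chunk size to request for a list call.
// A result of 0 means pagination is turned off.
func effectivePageSize(configured int64, resourceVersion string) int64 {
	if configured != 0 {
		return configured // an explicit WatchListPageSize always wins
	}
	if resourceVersion == "" || resourceVersion == "0" {
		return defaultPageSize // consistent read, or arbitrarily old data accepted
	}
	return 0 // specific resource version: no pagination, watch cache can serve it
}

func main() {
	for _, rv := range []string{"", "0", "12345"} {
		fmt.Printf("rv=%q pageSize=%d\n", rv, effectivePageSize(0, rv))
	}
}
```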

func NewNamedReflector

func NewNamedReflector(name string, lw cache.ListerWatcher, expectedType interface{}, store cache.ReflectorStore, resyncPeriod time.Duration) *Reflector

NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further information.

func NewReflector

func NewReflector(lw cache.ListerWatcher, expectedType interface{}, store cache.ReflectorStore, resyncPeriod time.Duration) *Reflector

NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack that is outside this package. See NewReflectorWithOptions for further information.

func NewReflectorWithOptions

func NewReflectorWithOptions(lw cache.ListerWatcher, expectedType interface{}, store cache.ReflectorStore, options ReflectorOptions) *Reflector

NewReflectorWithOptions creates a new Reflector object which will keep the given store up to date with the server's contents for the given resource. Reflector promises to only put things in the store that have the type of expectedType, unless expectedType is nil. If options.ResyncPeriod is non-zero, the reflector periodically consults its ShouldResync function to determine whether to invoke the Store's Resync operation; `ShouldResync==nil` means always "yes". This lets you use reflectors to periodically reprocess everything as well as incrementally process the things that change.

func (*Reflector) LastSyncResourceVersion

func (r *Reflector) LastSyncResourceVersion() string

LastSyncResourceVersion is the resource version observed during the last sync with the underlying store. The returned value is not synchronized with access to the underlying store and is not thread-safe.

func (*Reflector) ListAndWatch

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error

ListAndWatch first lists all items and gets the resource version at the moment of the call, then uses that resource version to watch. It returns an error if ListAndWatch didn't even try to initialize the watch.

Contextual logging: ListAndWatchWithContext should be used instead of ListAndWatch in code which supports contextual logging.

func (*Reflector) ListAndWatchWithContext

func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error

ListAndWatchWithContext first lists all items and gets the resource version at the moment of the call, then uses that resource version to watch. It returns an error if ListAndWatchWithContext didn't even try to initialize the watch.

func (*Reflector) Name

func (r *Reflector) Name() string

func (*Reflector) Run

func (r *Reflector) Run(stopCh <-chan struct{})

Run repeatedly uses the reflector's ListAndWatch to fetch all the objects and subsequent deltas. Run will exit when stopCh is closed.

Contextual logging: RunWithContext should be used instead of Run in code which supports contextual logging.

func (*Reflector) RunWithContext

func (r *Reflector) RunWithContext(ctx context.Context)

RunWithContext repeatedly uses the reflector's ListAndWatch to fetch all the objects and subsequent deltas. RunWithContext will exit when the context is canceled.

func (*Reflector) TypeDescription

func (r *Reflector) TypeDescription() string

type ReflectorOptions

type ReflectorOptions struct {
	// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
	// in the call stack that is outside this package.
	Name string

	// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
	// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
	// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
	// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
	// Go type of expectedType.
	TypeDescription string

	// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
	// (do not resync).
	ResyncPeriod time.Duration

	// MinWatchTimeout, if non-zero, defines the minimum timeout for watch requests sent to kube-apiserver.
	// However, values lower than 5m will not be honored, to avoid a negative performance impact on the control plane.
	MinWatchTimeout time.Duration

	// Clock allows tests to control time. If unset, defaults to clock.RealClock{}.
	Clock clock.Clock

	// KCP modification: KeyFunction is used to generate keys for objects in the temporary store
	// during WatchList operations. If unset, defaults to DeletionHandlingMetaClusterNamespaceKeyFunc
	// for cluster-aware key generation.
	KeyFunction cache.KeyFunc
}

ReflectorOptions configures a Reflector. KCP modification: Added KeyFunction field.

type VeryShortWatchError

type VeryShortWatchError struct {
	// Name of the Reflector
	Name string
}

VeryShortWatchError is returned when the watch result channel is closed within one second, without having sent any events.

func (*VeryShortWatchError) Error

func (e *VeryShortWatchError) Error() string

Error implements the error interface

type WatchErrorHandlerWithContext

type WatchErrorHandlerWithContext func(ctx context.Context, r *Reflector, err error)

WatchErrorHandlerWithContext is called whenever ListAndWatch drops the connection with an error. After calling this handler, the informer will back off and retry.
