worker

package
v0.28.0-rc1
Warning

This package is not in the latest version of its module.

Published: Dec 6, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncRequestProcessWorker

type AsyncRequestProcessWorker struct {
	// contains filtered or unexported fields
}

AsyncRequestProcessWorker is the worker to process async requests.

func New

New creates an AsyncRequestProcessWorker server instance.

func (*AsyncRequestProcessWorker) Start

Start starts the worker's message loop, which processes messages from a queue concurrently. For each message it handles deduplication, updates the resource and operation status, and runs the operation. It returns an error if it fails to start the dequeuer.
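The loop described above can be pictured with a small, self-contained sketch. It is not the package's implementation: the message type, runLoop, and processAll are hypothetical names, a Go channel stands in for the real queue client, and an in-memory set of seen IDs stands in for deduplication. Status updates are left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a hypothetical queue message; the real message type is not shown in this page.
type message struct {
	ID string
}

// runLoop receives from msgs until the channel is closed or ctx is cancelled.
// At most maxConcurrency handlers run at once, and a message whose ID was
// already seen is skipped (a simple stand-in for deduplication).
// It returns the number of messages handed to handle.
func runLoop(ctx context.Context, msgs <-chan message, maxConcurrency int, handle func(message)) int {
	sem := make(chan struct{}, maxConcurrency) // counting semaphore
	seen := make(map[string]bool)              // only touched by this goroutine
	var wg sync.WaitGroup
	defer wg.Wait() // let in-flight handlers finish before returning

	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case m, ok := <-msgs:
			if !ok {
				return handled
			}
			if seen[m.ID] {
				continue // duplicate: skip it
			}
			seen[m.ID] = true
			handled++
			sem <- struct{}{} // blocks while maxConcurrency handlers are busy
			wg.Add(1)
			go func(m message) {
				defer wg.Done()
				defer func() { <-sem }()
				handle(m)
			}(m)
		}
	}
}

// processAll feeds ids through runLoop with a no-op handler and reports
// how many distinct messages were handled.
func processAll(ids []string, maxConcurrency int) int {
	msgs := make(chan message, len(ids))
	for _, id := range ids {
		msgs <- message{ID: id}
	}
	close(msgs)
	return runLoop(context.Background(), msgs, maxConcurrency, func(message) {})
}

func main() {
	fmt.Println(processAll([]string{"a", "b", "a", "c"}, 2)) // prints 3
}
```

The buffered channel used as a semaphore bounds concurrency in the same spirit as the MaxOperationConcurrency option, though how the real worker enforces that limit is not stated here.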

type ControllerFactoryFunc

type ControllerFactoryFunc func(opts ctrl.Options) (ctrl.Controller, error)

type ControllerRegistry

type ControllerRegistry struct {
	// contains filtered or unexported fields
}

ControllerRegistry is a registry for async controllers.

func NewControllerRegistry

func NewControllerRegistry(sp dataprovider.DataStorageProvider) *ControllerRegistry

NewControllerRegistry creates a ControllerRegistry instance.

func (*ControllerRegistry) Get

func (h *ControllerRegistry) Get(operationType v1.OperationType) ctrl.Controller

Get returns the async controller instance registered for the given operation type.

func (*ControllerRegistry) Register

func (h *ControllerRegistry) Register(ctx context.Context, resourceType string, method v1.OperationMethod, factoryFn ControllerFactoryFunc, opts ctrl.Options) error

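Register registers a controller for the given resource type and operation method. The controller is created by calling factoryFn with opts.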
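A minimal sketch of the register-then-look-up pattern suggested by these signatures follows. Everything in it is a local stand-in rather than the package's code: operationType, controller, options, and factoryFunc imitate v1.OperationType, ctrl.Controller, ctrl.Options, and ControllerFactoryFunc, the context parameter is omitted, and the choice of a mutex-guarded map is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// operationType is a hypothetical stand-in for v1.OperationType.
type operationType struct {
	Type   string
	Method string
}

// controller, options and factoryFunc are hypothetical stand-ins for
// ctrl.Controller, ctrl.Options and ControllerFactoryFunc.
type controller interface {
	Run() string
}

type options struct {
	ResourceType string
}

type factoryFunc func(opts options) (controller, error)

// registry maps an operation type to the controller that handles it.
type registry struct {
	mu    sync.RWMutex
	ctrls map[operationType]controller
}

func newRegistry() *registry {
	return &registry{ctrls: make(map[operationType]controller)}
}

// register builds a controller with fn and stores it under (resourceType, method).
func (r *registry) register(resourceType, method string, fn factoryFunc, opts options) error {
	opts.ResourceType = resourceType
	c, err := fn(opts)
	if err != nil {
		return fmt.Errorf("create controller for %s %s: %w", method, resourceType, err)
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ctrls[operationType{Type: resourceType, Method: method}] = c
	return nil
}

// get returns the registered controller, or nil when none is registered.
func (r *registry) get(op operationType) controller {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.ctrls[op]
}

// echoController is a trivial controller used only for this example.
type echoController struct{ name string }

func (c echoController) Run() string { return "ran " + c.name }

// newEcho is an example factory; it rejects an empty resource type.
func newEcho(opts options) (controller, error) {
	if opts.ResourceType == "" {
		return nil, errors.New("resource type is required")
	}
	return echoController{name: opts.ResourceType}, nil
}

func main() {
	r := newRegistry()
	if err := r.register("Example/widgets", "PUT", newEcho, options{}); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	fmt.Println(r.get(operationType{Type: "Example/widgets", Method: "PUT"}).Run())     // ran Example/widgets
	fmt.Println(r.get(operationType{Type: "Example/widgets", Method: "DELETE"}) == nil) // true
}
```

Passing a factory instead of a ready-made controller lets the registry supply shared dependencies through the options at registration time. In the sketch that is only the resource type.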

type Options

type Options struct {
	// MaxOperationConcurrency is the maximum concurrency to process async request operation.
	MaxOperationConcurrency int

	// MaxOperationRetryCount is the maximum retry count to process async request operation.
	MaxOperationRetryCount int

	// MessageExtendMargin is the margin duration for clock skew before extending message lock.
	MessageExtendMargin time.Duration

	// MinMessageLockDuration is the minimum message lock duration.
	MinMessageLockDuration time.Duration

	// DeduplicationDuration is the duration for the deduplication detection.
	DeduplicationDuration time.Duration

	// DequeueIntervalDuration is the duration for the dequeue interval.
	DequeueIntervalDuration time.Duration
}

Options configures AsyncRequestProcessWorker.
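How the worker combines MessageExtendMargin and MinMessageLockDuration is not spelled out in this page. The sketch below shows one plausible reading, stated as an assumption: a lock shorter than the minimum is treated as the minimum, and the lock is renewed one margin before it would expire. The options struct is a local mirror of two documented fields, extendAfter is a hypothetical helper, and the durations are illustrative values, not the package's defaults.

```go
package main

import (
	"fmt"
	"time"
)

// options mirrors two of the documented fields for illustration only.
type options struct {
	MessageExtendMargin    time.Duration
	MinMessageLockDuration time.Duration
}

// extendAfter returns how long to wait before renewing a lock of the given
// duration. Assumption: the lock is clamped up to MinMessageLockDuration and
// the margin is subtracted to leave room for clock skew.
func extendAfter(o options, lock time.Duration) time.Duration {
	if lock < o.MinMessageLockDuration {
		lock = o.MinMessageLockDuration
	}
	wait := lock - o.MessageExtendMargin
	if wait < 0 {
		wait = 0 // margin exceeds the lock: renew immediately
	}
	return wait
}

func main() {
	o := options{
		MessageExtendMargin:    5 * time.Second,  // illustrative value
		MinMessageLockDuration: 30 * time.Second, // illustrative value
	}
	fmt.Println(extendAfter(o, 10*time.Second)) // 25s (lock clamped to 30s)
	fmt.Println(extendAfter(o, 60*time.Second)) // 55s
}
```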

type Service

type Service struct {
	// ProviderName is the name of provider namespace.
	ProviderName string
	// Options is the server hosting options.
	Options hostoptions.HostOptions
	// StorageProvider is the provider of storage client.
	StorageProvider dataprovider.DataStorageProvider
	// OperationStatusManager is the manager of the operation status.
	OperationStatusManager manager.StatusManager
	// Controllers is the registry of the async operation controllers.
	Controllers *ControllerRegistry
	// RequestQueue is the queue client for async operation request message.
	RequestQueue queue.Client
}

Service is the base worker service implementation to initialize and start worker.

func (*Service) Init

func (s *Service) Init(ctx context.Context) error

Init initializes the worker service: it sets up the StorageProvider, RequestQueue, OperationStatusManager, Controllers, and KubeClient, and returns an error if any of these steps fails.

func (*Service) Start

func (s *Service) Start(ctx context.Context, opt Options) error

Start creates and starts a worker, and logs any errors that occur while starting the worker.
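The Init and Start methods suggest a two-step lifecycle: initialize dependencies, then start the worker only if initialization succeeded. The sketch below illustrates that ordering with hypothetical names (service, run, fakeService, startedAfterInit). The interface drops the Options argument that the real Start takes, and the caller-side handling is an assumption rather than the package's own hosting code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// service is a hypothetical interface covering the two lifecycle methods.
// The real Start also takes an Options value, omitted here for brevity.
type service interface {
	Init(ctx context.Context) error
	Start(ctx context.Context) error
}

// run initializes svc and starts it only when initialization succeeds.
func run(ctx context.Context, svc service) error {
	if err := svc.Init(ctx); err != nil {
		return fmt.Errorf("init: %w", err)
	}
	if err := svc.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	return nil
}

// fakeService records whether Start was reached.
type fakeService struct {
	initErr error
	started bool
}

func (f *fakeService) Init(context.Context) error { return f.initErr }

func (f *fakeService) Start(context.Context) error {
	f.started = true
	return nil
}

// startedAfterInit reports whether Start ran, optionally forcing Init to fail.
func startedAfterInit(initFails bool) bool {
	svc := &fakeService{}
	if initFails {
		svc.initErr = errors.New("storage unavailable")
	}
	if err := run(context.Background(), svc); err != nil {
		fmt.Println("worker did not start:", err)
	}
	return svc.started
}

func main() {
	fmt.Println(startedAfterInit(false)) // true
	fmt.Println(startedAfterInit(true))  // prints the wrapped init error, then false
}
```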
