worker

package
v1.14.2

This package is not in the latest version of its module.
Published: Feb 25, 2025 License: Apache-2.0 Imports: 16 Imported by: 15

Documentation

Constants

This section is empty.

Variables

var ErrAllWorkersAreTerminated = errors.New("All workers are terminated")
var ErrNoAvailableWorkers = errors.New("No available workers")
var WorkerFactorySingleton = Factory{}

WorkerFactorySingleton is the global singleton Factory instance.

Functions

This section is empty.

Types

type Allocator

type Allocator interface {

	// Allocate allocates a worker
	Allocate(timeout time.Duration) (*Worker, error)

	// Release releases a worker
	Release(worker *Worker)

	// Shareable returns true if several goroutines can share this allocator
	Shareable() bool

	// GetWorkers gets direct access to all workers for things like management / housekeeping
	GetWorkers() []*Worker

	// GetNumWorkersAvailable gets number of workers available in the allocator
	GetNumWorkersAvailable() int

	// GetStatistics returns worker allocator statistics
	GetStatistics() *AllocatorStatistics

	// SignalDraining signals all workers to drain events
	SignalDraining() error

	// SignalContinue signals all workers to continue event processing
	SignalContinue() error

	// SignalTermination signals all workers to terminate
	SignalTermination() error

	// IsTerminated returns true if all workers are terminated
	IsTerminated() bool
}

func NewFixedPoolWorkerAllocator

func NewFixedPoolWorkerAllocator(parentLogger logger.Logger, workers []*Worker) (Allocator, error)

func NewSingletonWorkerAllocator

func NewSingletonWorkerAllocator(parentLogger logger.Logger, worker *Worker) (Allocator, error)

type AllocatorStatistics

type AllocatorStatistics struct {
	WorkerAllocationCount                       uint64
	WorkerAllocationSuccessImmediateTotal       uint64
	WorkerAllocationSuccessAfterWaitTotal       uint64
	WorkerAllocationTimeoutTotal                uint64
	WorkerAllocationWaitDurationMilliSecondsSum uint64
	WorkerAllocationWorkersAvailablePercentage  uint64
}
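As a rough illustration of how such counters might be consumed, the sketch below subtracts two snapshots and derives an average wait time. The `allocatorStatistics` type copies only a subset of the fields, and `delta` and `averageWaitMillis` are hypothetical helpers. The sketch assumes, based on the field names alone, that the copied fields are cumulative counters and that the wait-duration sum covers allocations that succeeded after waiting.

```go
package main

import "fmt"

// allocatorStatistics copies a subset of the AllocatorStatistics fields so
// that the sketch compiles on its own.
type allocatorStatistics struct {
	WorkerAllocationCount                       uint64
	WorkerAllocationSuccessAfterWaitTotal       uint64
	WorkerAllocationTimeoutTotal                uint64
	WorkerAllocationWaitDurationMilliSecondsSum uint64
}

// delta is a hypothetical helper: it subtracts an earlier snapshot from a
// later one, assuming every field is a monotonically increasing counter.
func delta(prev, curr allocatorStatistics) allocatorStatistics {
	return allocatorStatistics{
		WorkerAllocationCount:                       curr.WorkerAllocationCount - prev.WorkerAllocationCount,
		WorkerAllocationSuccessAfterWaitTotal:       curr.WorkerAllocationSuccessAfterWaitTotal - prev.WorkerAllocationSuccessAfterWaitTotal,
		WorkerAllocationTimeoutTotal:                curr.WorkerAllocationTimeoutTotal - prev.WorkerAllocationTimeoutTotal,
		WorkerAllocationWaitDurationMilliSecondsSum: curr.WorkerAllocationWaitDurationMilliSecondsSum - prev.WorkerAllocationWaitDurationMilliSecondsSum,
	}
}

// averageWaitMillis divides the wait-duration sum by the number of
// allocations that succeeded after waiting (an assumed interpretation).
func averageWaitMillis(s allocatorStatistics) float64 {
	if s.WorkerAllocationSuccessAfterWaitTotal == 0 {
		return 0
	}
	return float64(s.WorkerAllocationWaitDurationMilliSecondsSum) /
		float64(s.WorkerAllocationSuccessAfterWaitTotal)
}

func main() {
	prev := allocatorStatistics{
		WorkerAllocationCount:                       10,
		WorkerAllocationSuccessAfterWaitTotal:       2,
		WorkerAllocationWaitDurationMilliSecondsSum: 40,
	}
	curr := allocatorStatistics{
		WorkerAllocationCount:                       30,
		WorkerAllocationSuccessAfterWaitTotal:       6,
		WorkerAllocationTimeoutTotal:                1,
		WorkerAllocationWaitDurationMilliSecondsSum: 140,
	}

	d := delta(prev, curr)
	fmt.Printf("allocations: %d, timeouts: %d, average wait: %.1f ms\n",
		d.WorkerAllocationCount, d.WorkerAllocationTimeoutTotal, averageWaitMillis(d))
}
```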

func (*AllocatorStatistics) DiffFrom

type AllocatorSyncMap

type AllocatorSyncMap struct {
	// contains filtered or unexported fields
}

func NewAllocatorSyncMap

func NewAllocatorSyncMap() *AllocatorSyncMap

func (*AllocatorSyncMap) Delete

func (w *AllocatorSyncMap) Delete(key string)

Delete deletes the worker allocator stored for the key

func (*AllocatorSyncMap) Keys

func (w *AllocatorSyncMap) Keys() []string

Keys returns all allocator keys

func (*AllocatorSyncMap) Load

func (w *AllocatorSyncMap) Load(key string) (Allocator, bool)

Load returns the worker allocator stored in the map for a key

func (*AllocatorSyncMap) LoadOrStore

func (w *AllocatorSyncMap) LoadOrStore(key string,
	workerAllocatorCreator func() (Allocator, error)) (Allocator, error)

LoadOrStore tries to load an existing worker allocator by key. If none exists, it creates one, stores it, and returns it. If the key is empty, it always creates and returns a new worker allocator.
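The load-or-create behavior, including the empty-key rule, could look roughly like the sketch below. It uses a mutex-guarded map; the `allocator`, `allocatorMap`, and `newAllocatorMap` names are hypothetical stand-ins, and the real AllocatorSyncMap may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// allocator is a hypothetical stand-in for the Allocator interface.
type allocator struct{ id int }

// allocatorMap is an illustrative keyed store guarded by a mutex.
type allocatorMap struct {
	mu sync.Mutex
	m  map[string]*allocator
}

func newAllocatorMap() *allocatorMap {
	return &allocatorMap{m: map[string]*allocator{}}
}

// LoadOrStore returns the allocator stored under key, creating and storing
// one if it is missing. An empty key always yields a fresh, unstored allocator.
func (am *allocatorMap) LoadOrStore(key string,
	create func() (*allocator, error)) (*allocator, error) {

	if key == "" {
		return create()
	}

	// The creator runs under the lock, so at most one allocator is built per key.
	am.mu.Lock()
	defer am.mu.Unlock()

	if existing, ok := am.m[key]; ok {
		return existing, nil
	}
	created, err := create()
	if err != nil {
		return nil, err
	}
	am.m[key] = created
	return created, nil
}

func main() {
	store := newAllocatorMap()
	calls := 0
	create := func() (*allocator, error) {
		calls++
		return &allocator{id: calls}, nil
	}

	first, _ := store.LoadOrStore("shared", create)
	second, _ := store.LoadOrStore("shared", create)
	fmt.Println("same allocator for same key:", first == second)

	anonA, _ := store.LoadOrStore("", create)
	anonB, _ := store.LoadOrStore("", create)
	fmt.Println("distinct allocators for empty key:", anonA != anonB)
	fmt.Println("creator calls:", calls)
}
```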

func (*AllocatorSyncMap) Range

func (w *AllocatorSyncMap) Range(handler func(string, Allocator) bool)

Range calls handler for each worker allocator in the map. If handler returns false, iteration stops.

func (*AllocatorSyncMap) Store

func (w *AllocatorSyncMap) Store(key string, value Allocator)

Store sets the worker allocator per key

type Factory

type Factory struct{}

func (*Factory) CreateFixedPoolWorkerAllocator

func (waf *Factory) CreateFixedPoolWorkerAllocator(logger logger.Logger,
	numWorkers int,
	runtimeConfiguration *runtime.Configuration) (Allocator, error)

func (*Factory) CreateSingletonPoolWorkerAllocator

func (waf *Factory) CreateSingletonPoolWorkerAllocator(logger logger.Logger,
	runtimeConfiguration *runtime.Configuration) (Allocator, error)

type Statistics

type Statistics struct {
	EventsHandledSuccess uint64
	EventsHandledError   uint64
}
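For housekeeping, per-worker counters like these are often summed into a single view. The sketch below is one way to do that without modifying the inputs; the `statistics` type mirrors the fields above, and `total` and `errorRate` are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// statistics mirrors the fields of Statistics so the sketch is self-contained.
type statistics struct {
	EventsHandledSuccess uint64
	EventsHandledError   uint64
}

// total sums the counters of several workers into a fresh value and
// leaves the inputs untouched. Nil entries are skipped.
func total(perWorker []*statistics) statistics {
	var sum statistics
	for _, s := range perWorker {
		if s == nil {
			continue
		}
		sum.EventsHandledSuccess += s.EventsHandledSuccess
		sum.EventsHandledError += s.EventsHandledError
	}
	return sum
}

// errorRate returns the fraction of handled events that ended in an error,
// or 0 when no events were handled.
func errorRate(s statistics) float64 {
	handled := s.EventsHandledSuccess + s.EventsHandledError
	if handled == 0 {
		return 0
	}
	return float64(s.EventsHandledError) / float64(handled)
}

func main() {
	perWorker := []*statistics{
		{EventsHandledSuccess: 3, EventsHandledError: 1},
		{EventsHandledSuccess: 3, EventsHandledError: 1},
	}
	sum := total(perWorker)
	fmt.Printf("success=%d error=%d rate=%.2f\n",
		sum.EventsHandledSuccess, sum.EventsHandledError, errorRate(sum))
}
```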

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker holds all the required state and context to handle a single request

func NewWorker

func NewWorker(parentLogger logger.Logger,
	index int,
	runtime runtime.Runtime) (*Worker, error)

NewWorker creates a new worker

func (*Worker) Continue

func (w *Worker) Continue() error

func (*Worker) Drain

func (w *Worker) Drain() error

func (*Worker) GetBinaryCloudEvent

func (w *Worker) GetBinaryCloudEvent() *cloudevent.Binary

GetBinaryCloudEvent returns a binary cloud event

func (*Worker) GetEventTime

func (w *Worker) GetEventTime() *time.Time

GetEventTime returns the current event time, or nil if the worker is not handling an event

func (*Worker) GetIndex

func (w *Worker) GetIndex() int

GetIndex returns the index of the worker, as specified during creation

func (*Worker) GetRuntime

func (w *Worker) GetRuntime() runtime.Runtime

GetRuntime returns the runtime of the worker, as specified during creation

func (*Worker) GetStatistics

func (w *Worker) GetStatistics() *Statistics

GetStatistics returns a pointer to the statistics object. This must not be modified by the reader

func (*Worker) GetStatus

func (w *Worker) GetStatus() status.Status

GetStatus returns the status of the worker, as updated by the runtime

func (*Worker) GetStructuredCloudEvent

func (w *Worker) GetStructuredCloudEvent() *cloudevent.Structured

GetStructuredCloudEvent returns a structured cloud event

func (*Worker) ProcessEvent

func (w *Worker) ProcessEvent(event nuclio.Event, functionLogger logger.Logger) (interface{}, error)

ProcessEvent sends the event to the associated runtime

func (*Worker) ProcessEventBatch

func (w *Worker) ProcessEventBatch(batch []nuclio.Event) ([]*runtime.ResponseWithErrors, error)

func (*Worker) ResetEventTime

func (w *Worker) ResetEventTime()

ResetEventTime resets the event time

func (*Worker) Restart

func (w *Worker) Restart() error

Restart restarts the worker

func (*Worker) Stop

func (w *Worker) Stop() error

Stop stops the worker and associated runtime

func (*Worker) Subscribe

Subscribe subscribes to a control message kind

func (*Worker) SupportsRestart

func (w *Worker) SupportsRestart() bool

SupportsRestart returns true if the underlying runtime supports restart

func (*Worker) Terminate

func (w *Worker) Terminate() error

func (*Worker) Unsubscribe

Unsubscribe unsubscribes from a control message kind
