Documentation
¶
Index ¶
- Variables
- type Allocator
- type AllocatorStatistics
- type AllocatorSyncMap
- func (w *AllocatorSyncMap) Delete(key string)
- func (w *AllocatorSyncMap) Keys() []string
- func (w *AllocatorSyncMap) Load(key string) (Allocator, bool)
- func (w *AllocatorSyncMap) LoadOrStore(key string, workerAllocatorCreator func() (Allocator, error)) (Allocator, error)
- func (w *AllocatorSyncMap) Range(handler func(string, Allocator) bool)
- func (w *AllocatorSyncMap) Store(key string, value Allocator)
- type Factory
- type Statistics
- type Worker
- func (w *Worker) Continue() error
- func (w *Worker) Drain() error
- func (w *Worker) GetBinaryCloudEvent() *cloudevent.Binary
- func (w *Worker) GetEventTime() *time.Time
- func (w *Worker) GetIndex() int
- func (w *Worker) GetRuntime() runtime.Runtime
- func (w *Worker) GetStatistics() *Statistics
- func (w *Worker) GetStatus() status.Status
- func (w *Worker) GetStructuredCloudEvent() *cloudevent.Structured
- func (w *Worker) ProcessEvent(event nuclio.Event, functionLogger logger.Logger) (interface{}, error)
- func (w *Worker) ProcessEventBatch(batch []nuclio.Event) ([]*runtime.ResponseWithErrors, error)
- func (w *Worker) ResetEventTime()
- func (w *Worker) Restart() error
- func (w *Worker) Stop() error
- func (w *Worker) Subscribe(kind controlcommunication.ControlMessageKind, ...) error
- func (w *Worker) SupportsRestart() bool
- func (w *Worker) Terminate() error
- func (w *Worker) Unsubscribe(kind controlcommunication.ControlMessageKind, ...) error
Constants ¶
This section is empty.
Variables ¶
var ErrAllWorkersAreTerminated = errors.New("All workers are terminated")
var ErrNoAvailableWorkers = errors.New("No available workers")
var WorkerFactorySingleton = Factory{}
global singleton
Functions ¶
This section is empty.
Types ¶
type Allocator ¶
type Allocator interface {
// Allocate allocates a worker
Allocate(timeout time.Duration) (*Worker, error)
// Release releases a worker
Release(worker *Worker)
Shareable() bool
// GetWorkers gets direct access to all workers for things like management / housekeeping
GetWorkers() []*Worker
// GetNumWorkersAvailable gets number of workers available in the allocator
GetNumWorkersAvailable() int
// GetStatistics returns worker allocator statistics
GetStatistics() *AllocatorStatistics
// SignalDraining signals all workers to drain events
SignalDraining() error
// SignalContinue signals all workers to continue event processing
SignalContinue() error
// SignalTermination signals all workers to terminate
SignalTermination() error
// IsTerminated returns true if all workers are terminated
IsTerminated() bool
}
type AllocatorStatistics ¶
type AllocatorStatistics struct {
WorkerAllocationCount uint64
WorkerAllocationSuccessImmediateTotal uint64
WorkerAllocationSuccessAfterWaitTotal uint64
WorkerAllocationTimeoutTotal uint64
WorkerAllocationWaitDurationMilliSecondsSum uint64
WorkerAllocationWorkersAvailablePercentage uint64
}
func (*AllocatorStatistics) DiffFrom ¶
func (s *AllocatorStatistics) DiffFrom(prev *AllocatorStatistics) AllocatorStatistics
type AllocatorSyncMap ¶
type AllocatorSyncMap struct {
// contains filtered or unexported fields
}
func NewAllocatorSyncMap ¶
func NewAllocatorSyncMap() *AllocatorSyncMap
func (*AllocatorSyncMap) Delete ¶
func (w *AllocatorSyncMap) Delete(key string)
Delete delete worker allocator
func (*AllocatorSyncMap) Keys ¶
func (w *AllocatorSyncMap) Keys() []string
Keys returns all allocator keys
func (*AllocatorSyncMap) Load ¶
func (w *AllocatorSyncMap) Load(key string) (Allocator, bool)
Load returns the worker allocator stored in the map for a key
func (*AllocatorSyncMap) LoadOrStore ¶
func (w *AllocatorSyncMap) LoadOrStore(key string, workerAllocatorCreator func() (Allocator, error)) (Allocator, error)
LoadOrStore tries to load exiting worker by key, if not existing - creates one and returns it if key is empty - always create and return a new worker allocator
func (*AllocatorSyncMap) Range ¶
func (w *AllocatorSyncMap) Range(handler func(string, Allocator) bool)
Range calls handler for each worker allocator in map if handler returns false, iteration is stopped
func (*AllocatorSyncMap) Store ¶
func (w *AllocatorSyncMap) Store(key string, value Allocator)
Store sets the worker allocator per key
type Factory ¶
type Factory struct{}
func (*Factory) CreateFixedPoolWorkerAllocator ¶
func (*Factory) CreateSingletonPoolWorkerAllocator ¶
type Statistics ¶
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker holds all the required state and context to handle a single request
func (*Worker) GetBinaryCloudEvent ¶
func (w *Worker) GetBinaryCloudEvent() *cloudevent.Binary
GetBinaryCloudEvent return a binary cloud event
func (*Worker) GetEventTime ¶
GetEventTime return current event time, nil if we're not handling event
func (*Worker) GetRuntime ¶
GetRuntime returns the runtime of the worker, as specified during creation
func (*Worker) GetStatistics ¶
func (w *Worker) GetStatistics() *Statistics
GetStatistics returns a pointer to the statistics object. This must not be modified by the reader
func (*Worker) GetStructuredCloudEvent ¶
func (w *Worker) GetStructuredCloudEvent() *cloudevent.Structured
GetStructuredCloudEvent return a structued clould event
func (*Worker) ProcessEvent ¶
func (w *Worker) ProcessEvent(event nuclio.Event, functionLogger logger.Logger) (interface{}, error)
ProcessEvent sends the event to the associated runtime
func (*Worker) ProcessEventBatch ¶
func (*Worker) ResetEventTime ¶
func (w *Worker) ResetEventTime()
ResetEventTime resets the event time
func (*Worker) Subscribe ¶
func (w *Worker) Subscribe(kind controlcommunication.ControlMessageKind, channel chan *controlcommunication.ControlMessage) error
Subscribe subscribes to a control message kind
func (*Worker) SupportsRestart ¶
SupportsRestart returns true if the underlying runtime supports restart
func (*Worker) Unsubscribe ¶
func (w *Worker) Unsubscribe(kind controlcommunication.ControlMessageKind, channel chan *controlcommunication.ControlMessage) error
Unsubscribe unsubscribes from a control message kind