Versions in this module Expand all Collapse all v1 v1.14.2 Feb 25, 2025 Changes in this version + const DefaultWorkerAvailabilityTimeoutMilliseconds + const NumWorkersLimit + var RegistrySingleton = Registry + type AbstractTrigger struct + Batcher *Batcher + Class string + FunctionName string + ID string + Kind string + Logger logger.Logger + Name string + Namespace string + ProjectName string + Statistics Statistics + Trigger Trigger + WorkerAllocator worker.Allocator + func NewAbstractTrigger(logger logger.Logger, allocator worker.Allocator, configuration *Configuration, ...) (AbstractTrigger, error) + func (at *AbstractTrigger) AllocateWorkerAndSubmitEvent(event nuclio.Event, functionLogger logger.Logger, timeout time.Duration) (response interface{}, submitError error, processError error) + func (at *AbstractTrigger) AllocateWorkerAndSubmitEvents(events []nuclio.Event, functionLogger logger.Logger, timeout time.Duration) (responses []interface{}, submitError error, processErrors []error) + func (at *AbstractTrigger) GetClass() string + func (at *AbstractTrigger) GetFunctionName() string + func (at *AbstractTrigger) GetID() string + func (at *AbstractTrigger) GetKind() string + func (at *AbstractTrigger) GetName() string + func (at *AbstractTrigger) GetNamespace() string + func (at *AbstractTrigger) GetProjectName() string + func (at *AbstractTrigger) GetStatistics() *Statistics + func (at *AbstractTrigger) GetWorkers() []*worker.Worker + func (at *AbstractTrigger) HandleSubmitPanic(workerInstance *worker.Worker, submitError *error) + func (at *AbstractTrigger) Initialize() error + func (at *AbstractTrigger) PostBatchHooks(batch []nuclio.Event, workerInstance *worker.Worker) + func (at *AbstractTrigger) PreBatchHooks(batch []nuclio.Event, workerInstance *worker.Worker) + func (at *AbstractTrigger) Restart() error + func (at *AbstractTrigger) SignalWorkersToContinue() error + func (at *AbstractTrigger) SignalWorkersToDrain() error + func (at *AbstractTrigger) SignalWorkersToTerminate() error + func (at *AbstractTrigger) StartBatcher(batchTimeout time.Duration, workerAvailabilityTimeout time.Duration) + func (at *AbstractTrigger) SubmitBatchAndSendResponses(batch []nuclio.Event, responseChans map[string]*common.ChannelWithRecover, ...) + func (at *AbstractTrigger) SubmitEventToBatch(event nuclio.Event) (chan interface{}, context.CancelFunc) + func (at *AbstractTrigger) SubmitEventToWorker(functionLogger logger.Logger, workerInstance *worker.Worker, ...) (response interface{}, processError error) + func (at *AbstractTrigger) SubscribeToControlMessageKind(kind controlcommunication.ControlMessageKind, ...) error + func (at *AbstractTrigger) TimeoutWorker(worker *worker.Worker) error + func (at *AbstractTrigger) UnsubscribeFromControlMessageKind(kind controlcommunication.ControlMessageKind, ...) error + func (at *AbstractTrigger) UpdateStatistics(success bool, times uint64) + type AnnotationConfigField struct + Key string + ValueBool *bool + ValueInt *int + ValueListString []string + ValueString *string + ValueUInt64 *uint64 + type BatchedEventWithResponse struct + type Batcher struct + Logger logger.Logger + func NewBatcher(logger logger.Logger, batchSize int) *Batcher + func (b *Batcher) Add(event nuclio.Event, responseChan *common.ChannelWithRecover) + func (b *Batcher) WaitForBatch(batchTimeout time.Duration) ([]nuclio.Event, map[string]*common.ChannelWithRecover) + type Configuration struct + ID string + RuntimeConfiguration *runtime.Configuration + func NewConfiguration(id string, triggerConfiguration *functionconfig.Trigger, ...) (*Configuration, error) + func (c *Configuration) ParseDurationOrDefault(durationConfigField *DurationConfigField) error + func (c *Configuration) PopulateConfigurationFromAnnotations(annotationConfigFields []AnnotationConfigField) error + func (c *Configuration) PopulateExplicitAckMode(logger logger.Logger, explicitAckModeValue string, ...) error + func (c *Configuration) ResolveWorkerAllocationMode(modeFromAttributes, modeFromAnnotation partitionworker.AllocationMode) partitionworker.AllocationMode + type Creator interface + Create func(logger.Logger, string, *functionconfig.Trigger, *runtime.Configuration, ...) (Trigger, error) + type DurationConfigField struct + Default time.Duration + Field *time.Duration + Name string + Value string + type Factory struct + func (f *Factory) GetWorkerAllocator(workerAllocatorName string, namedWorkerAllocators *worker.AllocatorSyncMap, ...) (worker.Allocator, error) + type Registry struct + func (r *Registry) NewTrigger(logger logger.Logger, kind string, name string, ...) (Trigger, error) + type Secret struct + Contents string + type Statistics struct + EventsHandledFailureTotal uint64 + EventsHandledSuccessTotal uint64 + WorkerAllocatorStatistics worker.AllocatorStatistics + func (s *Statistics) DiffFrom(prev *Statistics) Statistics + type Trigger interface + GetClass func() string + GetConfig func() map[string]interface{} + GetFunctionName func() string + GetID func() string + GetKind func() string + GetName func() string + GetNamespace func() string + GetProjectName func() string + GetStatistics func() *Statistics + GetWorkers func() []*worker.Worker + Initialize func() error + PostBatchHooks func(batch []nuclio.Event, workerInstance *worker.Worker) + PreBatchHooks func(batch []nuclio.Event, workerInstance *worker.Worker) + SignalWorkersToContinue func() error + SignalWorkersToDrain func() error + SignalWorkersToTerminate func() error + Start func(checkpoint functionconfig.Checkpoint) error + Stop func(force bool) (functionconfig.Checkpoint, error) + TimeoutWorker func(worker *worker.Worker) error