Versions in this module Expand all Collapse all v0 v0.9.0 Sep 7, 2023 v0.8.0 Apr 17, 2023 Changes in this version + var ErrTriggerWorkerNotFound = fmt.Errorf("trigger worker not found") + type Config struct + CheckInterval time.Duration + DisconnectCleanTime time.Duration + HeartbeatTimeout time.Duration + LostHeartbeatTime time.Duration + StartSubscriptionDuration time.Duration + StartWorkerDuration time.Duration + WaitRunningTimeout time.Duration + type Manager interface + AddTriggerWorker func(ctx context.Context, addr string) error + GetActiveRunningTriggerWorker func() []metadata.TriggerWorkerInfo + GetTriggerWorker func(addr string) TriggerWorker + Init func(ctx context.Context) error + RemoveTriggerWorker func(ctx context.Context, addr string) + Start func() + Stop func() + UpdateTriggerWorkerInfo func(ctx context.Context, addr string) error + func NewTriggerWorkerManager(config Config, storage storage.TriggerWorkerStorage, ...) Manager + type MockManager struct + func NewMockManager(ctrl *gomock.Controller) *MockManager + func (m *MockManager) AddTriggerWorker(ctx context.Context, addr string) error + func (m *MockManager) EXPECT() *MockManagerMockRecorder + func (m *MockManager) GetActiveRunningTriggerWorker() []metadata.TriggerWorkerInfo + func (m *MockManager) GetTriggerWorker(addr string) TriggerWorker + func (m *MockManager) Init(ctx context.Context) error + func (m *MockManager) RemoveTriggerWorker(ctx context.Context, addr string) + func (m *MockManager) Start() + func (m *MockManager) Stop() + func (m *MockManager) UpdateTriggerWorkerInfo(ctx context.Context, addr string) error + type MockManagerMockRecorder struct + func (mr *MockManagerMockRecorder) AddTriggerWorker(ctx, addr interface{}) *gomock.Call + func (mr *MockManagerMockRecorder) GetActiveRunningTriggerWorker() *gomock.Call + func (mr *MockManagerMockRecorder) GetTriggerWorker(addr interface{}) *gomock.Call + func (mr *MockManagerMockRecorder) Init(ctx interface{}) *gomock.Call + func (mr *MockManagerMockRecorder) RemoveTriggerWorker(ctx, addr interface{}) *gomock.Call + func (mr *MockManagerMockRecorder) Start() *gomock.Call + func (mr *MockManagerMockRecorder) Stop() *gomock.Call + func (mr *MockManagerMockRecorder) UpdateTriggerWorkerInfo(ctx, addr interface{}) *gomock.Call + type MockTriggerWorker struct + func NewMockTriggerWorker(ctrl *gomock.Controller) *MockTriggerWorker + func (m *MockTriggerWorker) AssignSubscription(id vanus.ID) + func (m *MockTriggerWorker) Close() error + func (m *MockTriggerWorker) EXPECT() *MockTriggerWorkerMockRecorder + func (m *MockTriggerWorker) GetAddr() string + func (m *MockTriggerWorker) GetAssignedSubscriptions() []vanus.ID + func (m *MockTriggerWorker) GetHeartbeatTime() time.Time + func (m *MockTriggerWorker) GetInfo() metadata.TriggerWorkerInfo + func (m *MockTriggerWorker) GetPendingTime() time.Time + func (m *MockTriggerWorker) GetPhase() metadata.TriggerWorkerPhase + func (m *MockTriggerWorker) IsActive() bool + func (m *MockTriggerWorker) Polish() + func (m *MockTriggerWorker) RemoteStart(ctx context.Context) error + func (m *MockTriggerWorker) RemoteStop(ctx context.Context) error + func (m *MockTriggerWorker) Reset() + func (m *MockTriggerWorker) SetPhase(arg0 metadata.TriggerWorkerPhase) + func (m *MockTriggerWorker) Start(ctx context.Context) error + func (m *MockTriggerWorker) UnAssignSubscription(id vanus.ID) error + type MockTriggerWorkerMockRecorder struct + func (mr *MockTriggerWorkerMockRecorder) AssignSubscription(id interface{}) *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) Close() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) GetAddr() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) GetAssignedSubscriptions() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) GetHeartbeatTime() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) GetInfo() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) GetPendingTime() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) GetPhase() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) IsActive() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) Polish() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) RemoteStart(ctx interface{}) *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) RemoteStop(ctx interface{}) *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) Reset() *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) SetPhase(arg0 interface{}) *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) Start(ctx interface{}) *gomock.Call + func (mr *MockTriggerWorkerMockRecorder) UnAssignSubscription(id interface{}) *gomock.Call + type OnTriggerWorkerRemoveSubscription func(ctx context.Context, subId vanus.ID, addr string) error + type RandomPolicy struct + func (r *RandomPolicy) Acquire(_ context.Context, workers []metadata.TriggerWorkerInfo) metadata.TriggerWorkerInfo + type RoundRobinPolicy struct + func (rr *RoundRobinPolicy) Acquire(_ context.Context, workers []metadata.TriggerWorkerInfo) metadata.TriggerWorkerInfo + type SubscriptionScheduler struct + func NewSubscriptionScheduler(workerManager Manager, subscriptionManager subscription.Manager) *SubscriptionScheduler + func (s *SubscriptionScheduler) EnqueueNormalSubscription(id vanus.ID) + func (s *SubscriptionScheduler) EnqueueSubscription(id vanus.ID) + func (s *SubscriptionScheduler) Run() + func (s *SubscriptionScheduler) Stop() + type TriggerWorker interface + AssignSubscription func(id vanus.ID) + Close func() error + GetAddr func() string + GetAssignedSubscriptions func() []vanus.ID + GetHeartbeatTime func() time.Time + GetInfo func() metadata.TriggerWorkerInfo + GetPendingTime func() time.Time + GetPhase func() metadata.TriggerWorkerPhase + IsActive func() bool + Polish func() + RemoteStart func(ctx context.Context) error + RemoteStop func(ctx context.Context) error + Reset func() + SetPhase func(metadata.TriggerWorkerPhase) + Start func(ctx context.Context) error + UnAssignSubscription func(id vanus.ID) error + func NewTriggerWorker(twInfo *metadata.TriggerWorkerInfo, subscriptionManager subscription.Manager) TriggerWorker + func NewTriggerWorkerByAddr(addr string, subscriptionManager subscription.Manager) TriggerWorker + type TriggerWorkerPolicy interface + Acquire func(context.Context, []metadata.TriggerWorkerInfo) metadata.TriggerWorkerInfo