Documentation
¶
Index ¶
- Constants
- func InitializeTaskExecutors(log logrus.FieldLogger, serviceHandler service.Service, cfg *config.Config, ...) map[PeriodicTaskType]PeriodicTaskExecutor
- func MergeTasksWithConfig(cfg *config.Config) map[PeriodicTaskType]PeriodicTaskMetadata
- type ChannelManager
- type ChannelManagerConfig
- type DeviceConnectionExecutor
- type DisruptionBudgetExecutor
- type EventCleanupExecutor
- type OrganizationService
- type PeriodicTaskConsumer
- type PeriodicTaskConsumerConfig
- type PeriodicTaskExecutor
- type PeriodicTaskMetadata
- type PeriodicTaskPublisher
- type PeriodicTaskPublisherConfig
- type PeriodicTaskReference
- type PeriodicTaskType
- type QueueMaintenanceExecutor
- type RepositoryTesterExecutor
- type ResourceSyncExecutor
- type RolloutDeviceSelectionExecutor
- type ScheduledTask
- type Server
- type TaskChannelManager
- type TaskHeap
Constants ¶
View Source
const (
DefaultChannelBufferSize = 10
)
View Source
const (
DefaultConsumerCount = 5
)
View Source
const (
DefaultOrgSyncInterval = 5 * time.Minute
)
View Source
const QueueMaintenanceInterval = 2 * tasks.EventProcessingTimeout
QueueMaintenanceInterval is the interval for queue maintenance tasks. It is set to 2x EventProcessingTimeout to ensure timely processing of timed-out messages.
Variables ¶
This section is empty.
Functions ¶
func InitializeTaskExecutors ¶ added in v0.10.0
func InitializeTaskExecutors(log logrus.FieldLogger, serviceHandler service.Service, cfg *config.Config, queuesProvider queues.Provider, workerClient worker_client.WorkerClient, workerMetrics *worker.WorkerCollector) map[PeriodicTaskType]PeriodicTaskExecutor
func MergeTasksWithConfig ¶ added in v1.1.0
func MergeTasksWithConfig(cfg *config.Config) map[PeriodicTaskType]PeriodicTaskMetadata
MergeTasksWithConfig merges configured task intervals with defaults. Configured intervals override defaults; unconfigured tasks use defaults.
Types ¶
type ChannelManager ¶ added in v0.10.0
type ChannelManager struct {
// contains filtered or unexported fields
}
ChannelManager manages a buffered channel for periodic task scheduling.
func NewChannelManager ¶ added in v0.10.0
func NewChannelManager(config ChannelManagerConfig) (*ChannelManager, error)
func (*ChannelManager) Close ¶ added in v0.10.0
func (cm *ChannelManager) Close()
func (*ChannelManager) PublishTask ¶ added in v0.10.0
func (cm *ChannelManager) PublishTask(ctx context.Context, taskRef PeriodicTaskReference) error
func (*ChannelManager) Tasks ¶ added in v0.10.0
func (cm *ChannelManager) Tasks() <-chan PeriodicTaskReference
type ChannelManagerConfig ¶ added in v0.10.0
type ChannelManagerConfig struct {
Log logrus.FieldLogger
ChannelBufferSize int
}
type DeviceConnectionExecutor ¶ added in v1.1.0
type DeviceConnectionExecutor struct {
// contains filtered or unexported fields
}
func (*DeviceConnectionExecutor) Execute ¶ added in v1.1.0
func (e *DeviceConnectionExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type DisruptionBudgetExecutor ¶ added in v0.10.0
type DisruptionBudgetExecutor struct {
// contains filtered or unexported fields
}
func (*DisruptionBudgetExecutor) Execute ¶ added in v0.10.0
func (e *DisruptionBudgetExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type EventCleanupExecutor ¶ added in v0.10.0
type EventCleanupExecutor struct {
// contains filtered or unexported fields
}
func (*EventCleanupExecutor) Execute ¶ added in v0.10.0
func (e *EventCleanupExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type OrganizationService ¶ added in v0.10.0
type OrganizationService interface {
ListOrganizations(ctx context.Context, params domain.ListOrganizationsParams) (*domain.OrganizationList, domain.Status)
}
type PeriodicTaskConsumer ¶ added in v0.10.0
type PeriodicTaskConsumer struct {
// contains filtered or unexported fields
}
func NewPeriodicTaskConsumer ¶ added in v0.10.0
func NewPeriodicTaskConsumer(config PeriodicTaskConsumerConfig) (*PeriodicTaskConsumer, error)
func (*PeriodicTaskConsumer) Run ¶ added in v0.10.0
func (c *PeriodicTaskConsumer) Run(ctx context.Context)
Run spins up the consumer goroutines. It blocks until the context is done.
type PeriodicTaskConsumerConfig ¶ added in v0.10.0
type PeriodicTaskConsumerConfig struct {
ChannelManager *ChannelManager
Log logrus.FieldLogger
Executors map[PeriodicTaskType]PeriodicTaskExecutor
ConsumerCount int
}
type PeriodicTaskExecutor ¶ added in v0.10.0
type PeriodicTaskMetadata ¶ added in v0.10.0
type PeriodicTaskPublisher ¶ added in v0.10.0
type PeriodicTaskPublisher struct {
// contains filtered or unexported fields
}
func NewPeriodicTaskPublisher ¶ added in v0.10.0
func NewPeriodicTaskPublisher(publisherConfig PeriodicTaskPublisherConfig) (*PeriodicTaskPublisher, error)
func (*PeriodicTaskPublisher) Run ¶ added in v0.10.0
func (p *PeriodicTaskPublisher) Run(ctx context.Context)
Run spins up the organization sync and the scheduling loop goroutines. It blocks until the context is done.
type PeriodicTaskPublisherConfig ¶ added in v0.10.0
type PeriodicTaskPublisherConfig struct {
Log logrus.FieldLogger
OrgService OrganizationService
TasksMetadata map[PeriodicTaskType]PeriodicTaskMetadata
ChannelManager TaskChannelManager
WorkerClient worker_client.WorkerClient
OrgSyncInterval time.Duration
TaskBackoff *poll.Config
}
type PeriodicTaskReference ¶ added in v0.10.0
type PeriodicTaskReference struct {
Type PeriodicTaskType
OrgID uuid.UUID
}
type PeriodicTaskType ¶ added in v0.10.0
type PeriodicTaskType string
const (
	PeriodicTaskTypeRepositoryTester       PeriodicTaskType = "repository-tester"
	PeriodicTaskTypeResourceSync           PeriodicTaskType = "resource-sync"
	PeriodicTaskTypeDeviceConnection       PeriodicTaskType = "device-connection"
	PeriodicTaskTypeRolloutDeviceSelection PeriodicTaskType = "rollout-device-selection"
	PeriodicTaskTypeDisruptionBudget       PeriodicTaskType = "disruption-budget"
	PeriodicTaskTypeEventCleanup           PeriodicTaskType = "event-cleanup"
	PeriodicTaskTypeQueueMaintenance       PeriodicTaskType = "queue-maintenance"
)
type QueueMaintenanceExecutor ¶ added in v0.10.0
type QueueMaintenanceExecutor struct {
// contains filtered or unexported fields
}
func (*QueueMaintenanceExecutor) Execute ¶ added in v0.10.0
func (e *QueueMaintenanceExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type RepositoryTesterExecutor ¶ added in v0.10.0
type RepositoryTesterExecutor struct {
// contains filtered or unexported fields
}
func (*RepositoryTesterExecutor) Execute ¶ added in v0.10.0
func (e *RepositoryTesterExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type ResourceSyncExecutor ¶ added in v0.10.0
type ResourceSyncExecutor struct {
// contains filtered or unexported fields
}
func (*ResourceSyncExecutor) Execute ¶ added in v0.10.0
func (e *ResourceSyncExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type RolloutDeviceSelectionExecutor ¶ added in v0.10.0
type RolloutDeviceSelectionExecutor struct {
// contains filtered or unexported fields
}
func (*RolloutDeviceSelectionExecutor) Execute ¶ added in v0.10.0
func (e *RolloutDeviceSelectionExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
type ScheduledTask ¶ added in v0.10.0
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
type TaskChannelManager ¶ added in v0.10.0
type TaskChannelManager interface {
PublishTask(ctx context.Context, taskRef PeriodicTaskReference) error
}
type TaskHeap ¶ added in v0.10.0
type TaskHeap []*ScheduledTask
func NewTaskHeap ¶ added in v0.10.0
func NewTaskHeap() *TaskHeap
func (*TaskHeap) Peek ¶ added in v0.10.0
func (h *TaskHeap) Peek() *ScheduledTask
Click to show internal directories.
Click to hide internal directories.