periodic

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultChannelBufferSize = 10
)
View Source
const (
	DefaultConsumerCount = 5
)
View Source
const (
	DefaultOrgSyncInterval = 5 * time.Minute
)
View Source
const QueueMaintenanceInterval = 2 * tasks.EventProcessingTimeout

QueueMaintenanceInterval is the interval for queue maintenance tasks It's set to 2x EventProcessingTimeout to ensure timely processing of timed out messages

Variables

This section is empty.

Functions

func InitializeTaskExecutors added in v0.10.0

func InitializeTaskExecutors(log logrus.FieldLogger, serviceHandler service.Service, cfg *config.Config, queuesProvider queues.Provider, workerClient worker_client.WorkerClient, workerMetrics *worker.WorkerCollector) map[PeriodicTaskType]PeriodicTaskExecutor

func MergeTasksWithConfig added in v1.1.0

func MergeTasksWithConfig(cfg *config.Config) map[PeriodicTaskType]PeriodicTaskMetadata

MergeTasksWithConfig merges configured task intervals with defaults. Configured intervals override defaults; unconfigured tasks use defaults.

Types

type ChannelManager added in v0.10.0

type ChannelManager struct {
	// contains filtered or unexported fields
}

ChannelManager manages a buffered channel for periodic task scheduling

func NewChannelManager added in v0.10.0

func NewChannelManager(config ChannelManagerConfig) (*ChannelManager, error)

func (*ChannelManager) Close added in v0.10.0

func (cm *ChannelManager) Close()

func (*ChannelManager) PublishTask added in v0.10.0

func (cm *ChannelManager) PublishTask(ctx context.Context, taskRef PeriodicTaskReference) error

func (*ChannelManager) Tasks added in v0.10.0

func (cm *ChannelManager) Tasks() <-chan PeriodicTaskReference

type ChannelManagerConfig added in v0.10.0

type ChannelManagerConfig struct {
	Log               logrus.FieldLogger
	ChannelBufferSize int
}

type DeviceConnectionExecutor added in v1.1.0

type DeviceConnectionExecutor struct {
	// contains filtered or unexported fields
}

func (*DeviceConnectionExecutor) Execute added in v1.1.0

func (e *DeviceConnectionExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)

type DisruptionBudgetExecutor added in v0.10.0

type DisruptionBudgetExecutor struct {
	// contains filtered or unexported fields
}

func (*DisruptionBudgetExecutor) Execute added in v0.10.0

func (e *DisruptionBudgetExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)

type EventCleanupExecutor added in v0.10.0

type EventCleanupExecutor struct {
	// contains filtered or unexported fields
}

func (*EventCleanupExecutor) Execute added in v0.10.0

func (e *EventCleanupExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)

type OrganizationService added in v0.10.0

type OrganizationService interface {
	ListOrganizations(ctx context.Context, params domain.ListOrganizationsParams) (*domain.OrganizationList, domain.Status)
}

type PeriodicTaskConsumer added in v0.10.0

type PeriodicTaskConsumer struct {
	// contains filtered or unexported fields
}

func NewPeriodicTaskConsumer added in v0.10.0

func NewPeriodicTaskConsumer(config PeriodicTaskConsumerConfig) (*PeriodicTaskConsumer, error)

func (*PeriodicTaskConsumer) Run added in v0.10.0

func (c *PeriodicTaskConsumer) Run(ctx context.Context)

Run spins up the consumer goroutines. It blocks until the context is done.

type PeriodicTaskConsumerConfig added in v0.10.0

type PeriodicTaskConsumerConfig struct {
	ChannelManager *ChannelManager
	Log            logrus.FieldLogger
	Executors      map[PeriodicTaskType]PeriodicTaskExecutor
	ConsumerCount  int
}

type PeriodicTaskExecutor added in v0.10.0

type PeriodicTaskExecutor interface {
	Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)
}

type PeriodicTaskMetadata added in v0.10.0

type PeriodicTaskMetadata struct {
	Interval   time.Duration
	SystemWide bool // If true, task runs once for the entire system, not per organization
}

type PeriodicTaskPublisher added in v0.10.0

type PeriodicTaskPublisher struct {
	// contains filtered or unexported fields
}

func NewPeriodicTaskPublisher added in v0.10.0

func NewPeriodicTaskPublisher(publisherConfig PeriodicTaskPublisherConfig) (*PeriodicTaskPublisher, error)

func (*PeriodicTaskPublisher) Run added in v0.10.0

Run spins up the organization sync and the scheduling loop goroutines. It blocks until the context is done.

type PeriodicTaskPublisherConfig added in v0.10.0

type PeriodicTaskPublisherConfig struct {
	Log             logrus.FieldLogger
	OrgService      OrganizationService
	TasksMetadata   map[PeriodicTaskType]PeriodicTaskMetadata
	ChannelManager  TaskChannelManager
	WorkerClient    worker_client.WorkerClient
	OrgSyncInterval time.Duration
	TaskBackoff     *poll.Config
}

type PeriodicTaskReference added in v0.10.0

type PeriodicTaskReference struct {
	Type  PeriodicTaskType
	OrgID uuid.UUID
}

type PeriodicTaskType added in v0.10.0

type PeriodicTaskType string
const (
	PeriodicTaskTypeRepositoryTester       PeriodicTaskType = "repository-tester"
	PeriodicTaskTypeResourceSync           PeriodicTaskType = "resource-sync"
	PeriodicTaskTypeDeviceConnection       PeriodicTaskType = "device-connection"
	PeriodicTaskTypeRolloutDeviceSelection PeriodicTaskType = "rollout-device-selection"
	PeriodicTaskTypeDisruptionBudget       PeriodicTaskType = "disruption-budget"
	PeriodicTaskTypeEventCleanup           PeriodicTaskType = "event-cleanup"
	PeriodicTaskTypeQueueMaintenance       PeriodicTaskType = "queue-maintenance"
)

type QueueMaintenanceExecutor added in v0.10.0

type QueueMaintenanceExecutor struct {
	// contains filtered or unexported fields
}

func (*QueueMaintenanceExecutor) Execute added in v0.10.0

func (e *QueueMaintenanceExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)

type RepositoryTesterExecutor added in v0.10.0

type RepositoryTesterExecutor struct {
	// contains filtered or unexported fields
}

func (*RepositoryTesterExecutor) Execute added in v0.10.0

func (e *RepositoryTesterExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)

type ResourceSyncExecutor added in v0.10.0

type ResourceSyncExecutor struct {
	// contains filtered or unexported fields
}

func (*ResourceSyncExecutor) Execute added in v0.10.0

func (e *ResourceSyncExecutor) Execute(ctx context.Context, log logrus.FieldLogger, orgId uuid.UUID)

type RolloutDeviceSelectionExecutor added in v0.10.0

type RolloutDeviceSelectionExecutor struct {
	// contains filtered or unexported fields
}

func (*RolloutDeviceSelectionExecutor) Execute added in v0.10.0

type ScheduledTask added in v0.10.0

type ScheduledTask struct {
	NextRun  time.Time
	OrgID    uuid.UUID
	TaskType PeriodicTaskType
	Interval time.Duration
	Retries  int
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func New

func New(
	cfg *config.Config,
	log logrus.FieldLogger,
	store store.Store,
) *Server

New returns a new instance of a flightctl server.

func (*Server) Run

func (s *Server) Run(ctx context.Context) error

TODO: expose metrics

type TaskChannelManager added in v0.10.0

type TaskChannelManager interface {
	PublishTask(ctx context.Context, taskRef PeriodicTaskReference) error
}

type TaskHeap added in v0.10.0

type TaskHeap []*ScheduledTask

func NewTaskHeap added in v0.10.0

func NewTaskHeap() *TaskHeap

func (TaskHeap) Len added in v0.10.0

func (h TaskHeap) Len() int

func (TaskHeap) Less added in v0.10.0

func (h TaskHeap) Less(i, j int) bool

func (*TaskHeap) Peek added in v0.10.0

func (h *TaskHeap) Peek() *ScheduledTask

func (*TaskHeap) Pop added in v0.10.0

func (h *TaskHeap) Pop() interface{}

func (*TaskHeap) Push added in v0.10.0

func (h *TaskHeap) Push(x interface{})

func (TaskHeap) Swap added in v0.10.0

func (h TaskHeap) Swap(i, j int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL