scheduler

package
v0.1.147 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PoolScaler

type PoolScaler struct {
	// contains filtered or unexported fields
}

PoolScaler monitors queue depth and scales a Kubernetes Deployment.

func NewPoolScaler

func NewPoolScaler(ctx context.Context, config PoolScalerConfig, taskQueue repository.TaskQueue) (*PoolScaler, error)

NewPoolScaler creates a new pool scaler

func (*PoolScaler) EnsureDeployment

func (s *PoolScaler) EnsureDeployment() error

EnsureDeployment checks whether the worker deployment exists and creates or updates it. It skips the update when the config hash is unchanged (the common case), and it retries on conflict because the scaler may update replicas between our GET and UPDATE.

func (*PoolScaler) GetStatus

func (s *PoolScaler) GetStatus(ctx context.Context) (*types.PoolScalerStatus, error)

GetStatus returns the current scaling status

func (*PoolScaler) Start

func (s *PoolScaler) Start()

Start begins the scaling loop

func (*PoolScaler) Stop

func (s *PoolScaler) Stop()

Stop stops the scaler

type PoolScalerConfig

type PoolScalerConfig struct {
	PoolName        string
	DeploymentName  string
	Namespace       string
	MinReplicas     int32
	MaxReplicas     int32
	ScaleDownDelay  time.Duration
	ScalingInterval time.Duration

	// Worker resources
	WorkerImage  string
	WorkerCpu    string
	WorkerMemory string

	// Gateway connection
	GatewayServiceName        string
	GatewayPort               int
	GatewayExternalGRPCAddr   string
	UseGatewayServiceHostname bool

	// Pod-level worker settings
	WorkerServiceAccountName string
	WorkerHostNetwork        bool
	WorkerImagePullSecrets   []string
	RuntimeClassName         string
	NodeSelector             map[string]string

	// Worker authentication token
	WorkerToken string

	// Full app config - passed to workers as CONFIG_JSON
	AppConfig types.AppConfig
}

PoolScalerConfig defines the configuration for a pool scaler

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler manages worker registration, heartbeats, and pool scaling

func NewScheduler

func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, backendRepo repository.BackendRepository) (*Scheduler, error)

NewScheduler creates a new Scheduler instance

func (*Scheduler) DeregisterWorker

func (s *Scheduler) DeregisterWorker(ctx context.Context, workerId string) error

DeregisterWorker removes a worker from the scheduler

func (*Scheduler) GetAvailableWorkers

func (s *Scheduler) GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)

GetAvailableWorkers returns workers that are available

func (*Scheduler) GetPool

func (s *Scheduler) GetPool(poolName string) (*WorkerPoolController, error)

GetPool returns a pool controller by name

func (*Scheduler) GetPools

func (s *Scheduler) GetPools() map[string]*WorkerPoolController

GetPools returns all pool controllers

func (*Scheduler) GetTaskQueue

func (s *Scheduler) GetTaskQueue(poolName string) repository.TaskQueue

GetTaskQueue returns the task queue for a specific pool

func (*Scheduler) GetWorker

func (s *Scheduler) GetWorker(ctx context.Context, workerId string) (*types.Worker, error)

GetWorker returns a specific worker

func (*Scheduler) GetWorkers

func (s *Scheduler) GetWorkers(ctx context.Context) ([]*types.Worker, error)

GetWorkers returns all registered workers

func (*Scheduler) RegisterWorker

func (s *Scheduler) RegisterWorker(ctx context.Context, worker *types.Worker) error

RegisterWorker registers a new worker with the scheduler

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start begins the scheduler's background processes

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop gracefully stops the scheduler

func (*Scheduler) TaskQueue

func (s *Scheduler) TaskQueue() repository.TaskQueue

TaskQueue returns the task queue (for submitting tasks)

func (*Scheduler) UpdateWorkerStatus

func (s *Scheduler) UpdateWorkerStatus(ctx context.Context, workerId string, status types.WorkerStatus) error

UpdateWorkerStatus updates a worker's status

func (*Scheduler) WorkerHeartbeat

func (s *Scheduler) WorkerHeartbeat(ctx context.Context, workerId string) error

WorkerHeartbeat updates a worker's last seen timestamp

func (*Scheduler) WorkerRepo

func (s *Scheduler) WorkerRepo() repository.WorkerRepository

WorkerRepo returns the worker repository (for service layer access)

type WorkerPoolController

type WorkerPoolController struct {
	// contains filtered or unexported fields
}

WorkerPoolController manages a pool of workers. In the queue-based model, this is primarily for visibility/monitoring. Actual scaling is handled by PoolScaler based on queue depth.

func NewWorkerPoolController

func NewWorkerPoolController(
	ctx context.Context,
	name string,
	poolConfig types.WorkerPoolConfig,
	appConfig types.AppConfig,
	workerRepo repository.WorkerRepository,
) *WorkerPoolController

NewWorkerPoolController creates a new worker pool controller

func (*WorkerPoolController) Config

Config returns the pool configuration

func (*WorkerPoolController) Context

func (p *WorkerPoolController) Context() context.Context

Context returns the pool's context

func (*WorkerPoolController) GetWorkers

func (p *WorkerPoolController) GetWorkers() ([]*types.Worker, error)

GetWorkers returns all workers in this pool

func (*WorkerPoolController) Name

func (p *WorkerPoolController) Name() string

Name returns the pool name

func (*WorkerPoolController) OnWorkerDeregistered

func (p *WorkerPoolController) OnWorkerDeregistered(workerId string)

OnWorkerDeregistered is called when a worker deregisters

func (*WorkerPoolController) OnWorkerRegistered

func (p *WorkerPoolController) OnWorkerRegistered(workerId string)

OnWorkerRegistered is called when a worker registers with the gateway

func (*WorkerPoolController) State

State returns the current state of the pool

func (*WorkerPoolController) Stop

func (p *WorkerPoolController) Stop()

Stop stops the pool controller

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL