Documentation
¶
Index ¶
- type PoolScaler
- type PoolScalerConfig
- type Scheduler
- func (s *Scheduler) DeregisterWorker(ctx context.Context, workerId string) error
- func (s *Scheduler) GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)
- func (s *Scheduler) GetPool(poolName string) (*WorkerPoolController, error)
- func (s *Scheduler) GetPools() map[string]*WorkerPoolController
- func (s *Scheduler) GetTaskQueue(poolName string) repository.TaskQueue
- func (s *Scheduler) GetWorker(ctx context.Context, workerId string) (*types.Worker, error)
- func (s *Scheduler) GetWorkers(ctx context.Context) ([]*types.Worker, error)
- func (s *Scheduler) RegisterWorker(ctx context.Context, worker *types.Worker) error
- func (s *Scheduler) Start() error
- func (s *Scheduler) Stop() error
- func (s *Scheduler) TaskQueue() repository.TaskQueue
- func (s *Scheduler) UpdateWorkerStatus(ctx context.Context, workerId string, status types.WorkerStatus) error
- func (s *Scheduler) WorkerHeartbeat(ctx context.Context, workerId string) error
- func (s *Scheduler) WorkerRepo() repository.WorkerRepository
- type WorkerPoolController
- func (p *WorkerPoolController) Config() types.WorkerPoolConfig
- func (p *WorkerPoolController) Context() context.Context
- func (p *WorkerPoolController) GetWorkers() ([]*types.Worker, error)
- func (p *WorkerPoolController) Name() string
- func (p *WorkerPoolController) OnWorkerDeregistered(workerId string)
- func (p *WorkerPoolController) OnWorkerRegistered(workerId string)
- func (p *WorkerPoolController) State() (*types.WorkerPoolState, error)
- func (p *WorkerPoolController) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type PoolScaler ¶
type PoolScaler struct {
// contains filtered or unexported fields
}
PoolScaler monitors queue depth and scales a Kubernetes Deployment accordingly.
func NewPoolScaler ¶
func NewPoolScaler(ctx context.Context, config PoolScalerConfig, taskQueue repository.TaskQueue) (*PoolScaler, error)
NewPoolScaler creates a new pool scaler
func (*PoolScaler) EnsureDeployment ¶
func (s *PoolScaler) EnsureDeployment() error
EnsureDeployment checks if the worker deployment exists and creates/updates it. Skips the update when the config hash is unchanged (common case). Retries on conflict since the scaler may update replicas between our GET and UPDATE.
func (*PoolScaler) GetStatus ¶
func (s *PoolScaler) GetStatus(ctx context.Context) (*types.PoolScalerStatus, error)
GetStatus returns the current scaling status
type PoolScalerConfig ¶
type PoolScalerConfig struct {
PoolName string
DeploymentName string
Namespace string
MinReplicas int32
MaxReplicas int32
ScaleDownDelay time.Duration
ScalingInterval time.Duration
// Worker resources
WorkerImage string
WorkerCpu string
WorkerMemory string
// Gateway connection
GatewayServiceName string
GatewayPort int
GatewayExternalGRPCAddr string
UseGatewayServiceHostname bool
// Pod-level worker settings
WorkerServiceAccountName string
WorkerHostNetwork bool
WorkerImagePullSecrets []string
RuntimeClassName string
NodeSelector map[string]string
// Worker authentication token
WorkerToken string
// Full app config - passed to workers as CONFIG_JSON
AppConfig types.AppConfig
}
PoolScalerConfig defines the configuration for a pool scaler
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler manages worker registration, heartbeats, and pool scaling
func NewScheduler ¶
func NewScheduler(ctx context.Context, config types.AppConfig, redisClient *common.RedisClient, backendRepo repository.BackendRepository) (*Scheduler, error)
NewScheduler creates a new Scheduler instance
func (*Scheduler) DeregisterWorker ¶
func (s *Scheduler) DeregisterWorker(ctx context.Context, workerId string) error
DeregisterWorker removes a worker from the scheduler
func (*Scheduler) GetAvailableWorkers ¶
func (s *Scheduler) GetAvailableWorkers(ctx context.Context) ([]*types.Worker, error)
GetAvailableWorkers returns workers that are available
func (*Scheduler) GetPool ¶
func (s *Scheduler) GetPool(poolName string) (*WorkerPoolController, error)
GetPool returns a pool controller by name
func (*Scheduler) GetPools ¶
func (s *Scheduler) GetPools() map[string]*WorkerPoolController
GetPools returns all pool controllers
func (*Scheduler) GetTaskQueue ¶
func (s *Scheduler) GetTaskQueue(poolName string) repository.TaskQueue
GetTaskQueue returns the task queue for a specific pool
func (*Scheduler) GetWorkers ¶
func (s *Scheduler) GetWorkers(ctx context.Context) ([]*types.Worker, error)
GetWorkers returns all registered workers
func (*Scheduler) RegisterWorker ¶
func (s *Scheduler) RegisterWorker(ctx context.Context, worker *types.Worker) error
RegisterWorker registers a new worker with the scheduler
func (*Scheduler) TaskQueue ¶
func (s *Scheduler) TaskQueue() repository.TaskQueue
TaskQueue returns the task queue (for submitting tasks)
func (*Scheduler) UpdateWorkerStatus ¶
func (s *Scheduler) UpdateWorkerStatus(ctx context.Context, workerId string, status types.WorkerStatus) error
UpdateWorkerStatus updates a worker's status
func (*Scheduler) WorkerHeartbeat ¶
func (s *Scheduler) WorkerHeartbeat(ctx context.Context, workerId string) error
WorkerHeartbeat updates a worker's last seen timestamp
func (*Scheduler) WorkerRepo ¶
func (s *Scheduler) WorkerRepo() repository.WorkerRepository
WorkerRepo returns the worker repository (for service layer access)
type WorkerPoolController ¶
type WorkerPoolController struct {
// contains filtered or unexported fields
}
WorkerPoolController manages a pool of workers. In the queue-based model, this is primarily for visibility/monitoring. Actual scaling is handled by PoolScaler based on queue depth.
func NewWorkerPoolController ¶
func NewWorkerPoolController( ctx context.Context, name string, poolConfig types.WorkerPoolConfig, appConfig types.AppConfig, workerRepo repository.WorkerRepository, ) *WorkerPoolController
NewWorkerPoolController creates a new worker pool controller
func (*WorkerPoolController) Config ¶
func (p *WorkerPoolController) Config() types.WorkerPoolConfig
Config returns the pool configuration
func (*WorkerPoolController) Context ¶
func (p *WorkerPoolController) Context() context.Context
Context returns the pool's context
func (*WorkerPoolController) GetWorkers ¶
func (p *WorkerPoolController) GetWorkers() ([]*types.Worker, error)
GetWorkers returns all workers in this pool
func (*WorkerPoolController) Name ¶
func (p *WorkerPoolController) Name() string
Name returns the pool name
func (*WorkerPoolController) OnWorkerDeregistered ¶
func (p *WorkerPoolController) OnWorkerDeregistered(workerId string)
OnWorkerDeregistered is called when a worker deregisters
func (*WorkerPoolController) OnWorkerRegistered ¶
func (p *WorkerPoolController) OnWorkerRegistered(workerId string)
OnWorkerRegistered is called when a worker registers with the gateway
func (*WorkerPoolController) State ¶
func (p *WorkerPoolController) State() (*types.WorkerPoolState, error)
State returns the current state of the pool
func (*WorkerPoolController) Stop ¶
func (p *WorkerPoolController) Stop()
Stop stops the pool controller