scheduler

Version: v0.0.12

Note: this package is not in the latest version of its module.
Published: Jan 25, 2026 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Overview

Package scheduler provides intelligent deployment scheduling for the control plane.

Constants

This section is empty.

Variables

var (
	ErrHealthCheckFailed    = errors.New("health check failed for new container")
	ErrRoutingUpdateFailed  = errors.New("failed to update routing to new container")
	ErrNoArtifact           = errors.New("deployment has no artifact")
	ErrNoPreviousDeployment = errors.New("no previous successful deployment found for rollback")
	ErrDeploymentNotFound   = errors.New("deployment not found")
)

Common errors for zero-downtime deployment.

var (
	ErrNoHealthyNodes         = errors.New("no healthy nodes available")
	ErrInsufficientResources  = errors.New("no nodes with sufficient resources")
	ErrDependenciesNotRunning = errors.New("service dependencies are not running")
	ErrDeploymentQueued       = errors.New("deployment queued waiting for available nodes")
	ErrDeploymentTimeout      = errors.New("deployment timed out waiting for scheduling")
)

Common errors returned by the scheduler.

Functions

func BuildDeployCommand

func BuildDeployCommand(deployment *models.Deployment) *pb.DeploymentCommand

BuildDeployCommand is exported for testing purposes. It creates a DeploymentCommand from a Deployment model.

func CalculateCapacityScore

func CalculateCapacityScore(node *models.Node) float64

CalculateCapacityScore calculates a capacity score for a node. Higher scores indicate more available capacity. The score is a weighted combination of CPU and memory availability.
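A weighted combination of this kind could look roughly like the sketch below. The node type, its field names, and the equal 0.5 weights are all assumptions made for illustration; the package does not document the weights it uses.

```go
package main

// node is a hypothetical stand-in for models.Node; real field names may differ.
type node struct {
	CPUTotal, CPUAvailable       float64 // cores
	MemoryTotal, MemoryAvailable int64   // bytes
}

// Assumed weights; the actual values used by the package are not documented.
const (
	cpuWeight    = 0.5
	memoryWeight = 0.5
)

// capacityScore returns a value in [0, 1] as long as available capacity does
// not exceed total capacity. Higher means more free capacity.
func capacityScore(n node) float64 {
	var cpuFrac, memFrac float64
	if n.CPUTotal > 0 {
		cpuFrac = n.CPUAvailable / n.CPUTotal
	}
	if n.MemoryTotal > 0 {
		memFrac = float64(n.MemoryAvailable) / float64(n.MemoryTotal)
	}
	return cpuWeight*cpuFrac + memoryWeight*memFrac
}
```

Normalising each resource to a fraction of its total keeps CPU cores and memory bytes on the same scale before they are combined.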

func CheckDependenciesRunning

func CheckDependenciesRunning(deployments []*models.Deployment, dependsOn []string) bool

CheckDependenciesRunning is a pure function that checks if all dependencies are satisfied given a list of deployments and the required dependencies. This is useful for testing without needing a store.
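The check described above can be sketched as a set lookup. The deployment type and the "running" status string are assumptions here; the real function takes []*models.Deployment.

```go
package main

// deployment is a hypothetical stand-in for models.Deployment.
type deployment struct {
	ServiceName string
	Status      string
}

// dependenciesRunning reports whether every name in dependsOn has at least
// one deployment whose status is "running" (an assumed status value).
func dependenciesRunning(deployments []deployment, dependsOn []string) bool {
	running := make(map[string]bool, len(deployments))
	for _, d := range deployments {
		if d.Status == "running" {
			running[d.ServiceName] = true
		}
	}
	for _, name := range dependsOn {
		if !running[name] {
			return false
		}
	}
	return true
}
```

With an empty dependsOn list the sketch returns true, which matches the usual reading that a service without dependencies is never blocked.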

func CheckDeploymentTimeout

func CheckDeploymentTimeout(deployment *models.Deployment, timeout time.Duration) bool

CheckDeploymentTimeout checks if a deployment has exceeded the timeout. Returns true if the deployment should be marked as failed. **Validates: Requirements 16.3**

func CreateRollbackDeployment

func CreateRollbackDeployment(source *models.Deployment, newVersion int, newID string) *models.Deployment

CreateRollbackDeployment is a pure function that creates a rollback deployment from a source deployment. This is useful for testing. **Validates: Requirements 10.5, 20.3, 20.4**
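As a rough illustration of what such a pure function might do, the sketch below copies the source deployment and overrides its identity. The deployment type, its fields, and the initial "built" status are assumptions; the requirements referenced above are not reproduced in this documentation.

```go
package main

// deployment is a hypothetical stand-in for models.Deployment.
type deployment struct {
	ID, AppID, ServiceName, Artifact, Status string
	Version                                  int
}

// rollbackFrom returns a new deployment that reuses the source artifact under
// a new ID and version. Starting in "built" status is an assumption.
func rollbackFrom(source deployment, newVersion int, newID string) deployment {
	rb := source // value copy; the source is left untouched
	rb.ID = newID
	rb.Version = newVersion
	rb.Status = "built"
	return rb
}
```

Because the function has no store or clock dependency, a test can compare its output directly against the source record.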

func HasSufficientResources

func HasSufficientResources(node *models.Node, spec *models.ResourceSpec) bool

HasSufficientResources checks if a node has enough resources for a given spec.

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError reports whether err is considered retryable. It is exported for testing purposes.

func IsStale

func IsStale(lastHeartbeat time.Time, threshold time.Duration) bool

IsStale checks if a node's heartbeat is older than the given threshold.

func MarkUnhealthyIfStale

func MarkUnhealthyIfStale(ctx context.Context, nodeStore store.NodeStore, nodeID string, lastHeartbeat time.Time, threshold time.Duration) (bool, error)

MarkUnhealthyIfStale marks a node as unhealthy if its heartbeat is stale. Returns true if the node was marked unhealthy.

func NodeHasClosure

func NodeHasClosure(node *models.Node, storePath string) bool

NodeHasClosure checks if a node has a specific store path cached.

func SelectBestNode

func SelectBestNode(nodes []*models.Node, deployment *models.Deployment) *models.Node

SelectBestNode selects the best node from a list based on deployment requirements. This is a convenience function that combines all placement strategies.

func ValidateRollbackDeployment

func ValidateRollbackDeployment(source, rollback *models.Deployment) bool

ValidateRollbackDeployment validates that a rollback deployment was created correctly. Returns true if the rollback deployment has the correct properties. **Validates: Requirements 10.5, 20.3, 20.4**

func WaitForHealthyWithConfig

func WaitForHealthyWithConfig(
	ctx context.Context,
	checkFunc func() (bool, error),
	timeout time.Duration,
	interval time.Duration,
) (bool, int, error)

WaitForHealthyWithConfig is a pure function for testing health check waiting logic. It calls checkFunc repeatedly, waiting interval between attempts, and returns true if the container becomes healthy before timeout elapses.

Types

type AgentClient

type AgentClient interface {
	Deploy(ctx context.Context, nodeID string, deployment *models.Deployment) error
	Stop(ctx context.Context, nodeID string, deploymentID string) error
}

AgentClient defines the interface for communicating with node agents.

type CaddyRoutingUpdater

type CaddyRoutingUpdater struct {
	// contains filtered or unexported fields
}

CaddyRoutingUpdater implements RoutingUpdater for Caddy reverse proxy. It updates Caddy's configuration to route traffic to the new container. **Validates: Requirements 10.2, 10.3**

func NewCaddyRoutingUpdater

func NewCaddyRoutingUpdater(caddyAPIURL string, logger *slog.Logger) *CaddyRoutingUpdater

NewCaddyRoutingUpdater creates a new CaddyRoutingUpdater.

func (*CaddyRoutingUpdater) UpdateRouting

func (c *CaddyRoutingUpdater) UpdateRouting(ctx context.Context, serviceName, newContainerName string, port int) (string, error)

UpdateRouting updates Caddy configuration to route to the new container. Returns the old container name that was previously configured. **Validates: Requirements 10.2, 10.3**

type CommandSender

type CommandSender interface {
	SendCommand(ctx context.Context, nodeID string, cmd *pb.DeploymentCommand) error
}

CommandSender defines the interface for sending commands to nodes. This is implemented by NodeManager.

type ContainerManager

type ContainerManager interface {
	// StartContainer starts a new container with the given name and deployment config.
	StartContainer(ctx context.Context, nodeID, containerName string, deployment *models.Deployment) error
	// StopContainer stops a container by name.
	StopContainer(ctx context.Context, nodeID, containerName string) error
	// CheckHealth checks if a container is healthy.
	CheckHealth(ctx context.Context, nodeID, containerName string, healthCheck *models.HealthCheckConfig) (bool, error)
}

ContainerManager defines the interface for container lifecycle operations.

type EnvMergerInterface

type EnvMergerInterface interface {
	MergeForDeployment(ctx context.Context, appID, serviceName string, serviceEnvVars map[string]string) (map[string]string, error)
}

EnvMergerInterface defines the interface for merging environment variables. **Validates: Requirements 6.1, 6.2, 6.3**

type GRPCAgentClient

type GRPCAgentClient struct {
	// contains filtered or unexported fields
}

GRPCAgentClient implements the AgentClient interface using gRPC via NodeManager. It sends deployment commands through the persistent WatchCommands stream. Requirements: 3.1, 11.1, 11.2, 11.3

func NewGRPCAgentClient

func NewGRPCAgentClient(sender CommandSender, cfg *GRPCAgentClientConfig) *GRPCAgentClient

NewGRPCAgentClient creates a new GRPCAgentClient.

func (*GRPCAgentClient) Deploy

func (c *GRPCAgentClient) Deploy(ctx context.Context, nodeID string, deployment *models.Deployment) error

Deploy sends a deployment command to the specified node via gRPC. Requirements: 3.1, 3.2, 11.1, 11.2, 11.3

func (*GRPCAgentClient) Stop

func (c *GRPCAgentClient) Stop(ctx context.Context, nodeID string, deploymentID string) error

Stop sends a stop command to the specified node via gRPC. Requirements: 3.3

type GRPCAgentClientConfig

type GRPCAgentClientConfig struct {
	// CommandTimeout is the deadline for command acknowledgment (default: 10s)
	CommandTimeout time.Duration
	// MaxRetries is the maximum number of retry attempts (default: 3)
	MaxRetries int
}

GRPCAgentClientConfig holds configuration for the GRPCAgentClient.

func DefaultGRPCAgentClientConfig

func DefaultGRPCAgentClientConfig() *GRPCAgentClientConfig

DefaultGRPCAgentClientConfig returns default configuration values.

type HealthMonitor

type HealthMonitor struct {
	// contains filtered or unexported fields
}

HealthMonitor periodically checks node health and triggers rescheduling for unhealthy nodes. It also watches for nodes becoming healthy and processes pending deployments. **Validates: Requirements 16.2**

func NewHealthMonitor

func NewHealthMonitor(s store.Store, scheduler *Scheduler, healthThreshold, checkInterval time.Duration, logger *slog.Logger) *HealthMonitor

NewHealthMonitor creates a new HealthMonitor instance.

func NewHealthMonitorWithTimeout

func NewHealthMonitorWithTimeout(s store.Store, scheduler *Scheduler, healthThreshold, checkInterval, deploymentTimeout time.Duration, logger *slog.Logger) *HealthMonitor

NewHealthMonitorWithTimeout creates a new HealthMonitor instance with a custom deployment timeout. **Validates: Requirements 16.3**

func (*HealthMonitor) CheckNodeHealth

func (h *HealthMonitor) CheckNodeHealth(node *NodeHealthInfo) bool

CheckNodeHealth checks if a specific node should be marked as unhealthy. Returns true if the node is stale (heartbeat older than threshold).

func (*HealthMonitor) Start

func (h *HealthMonitor) Start(ctx context.Context) error

Start begins the periodic health check loop.

func (*HealthMonitor) Stop

func (h *HealthMonitor) Stop()

Stop stops the health monitor.

type NodeHealthInfo

type NodeHealthInfo struct {
	ID            string
	Healthy       bool
	LastHeartbeat time.Time
}

NodeHealthInfo contains the information needed to check node health.

type ResourceAvailability

type ResourceAvailability struct {
	CPUAvailable    float64
	MemoryAvailable int64
}

ResourceAvailability tracks the total available resources across all healthy nodes. **Validates: Requirements 16.4**

func CalculateTotalResources

func CalculateTotalResources(nodes []*models.Node, healthThreshold time.Duration) ResourceAvailability

CalculateTotalResources calculates the total available resources across all healthy nodes. **Validates: Requirements 16.4**

func (*ResourceAvailability) HasResourcesForSpec

func (r *ResourceAvailability) HasResourcesForSpec(spec *models.ResourceSpec) bool

HasResourcesForSpec checks if the total available resources can accommodate a deployment. **Validates: Requirements 16.4**

type ResourceRequirements

type ResourceRequirements struct {
	CPU    float64 // CPU cores required
	Memory int64   // Memory in bytes required
}

ResourceRequirements defines the CPU and memory requirements.

func GetResourceRequirements

func GetResourceRequirements(spec *models.ResourceSpec) ResourceRequirements

GetResourceRequirements returns the resource requirements from a ResourceSpec. If spec is nil, returns default requirements (0.5 CPU, 512MB).
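The nil-spec default can be sketched as follows, together with a fit check in the spirit of HasSufficientResources. The resourceSpec and requirements types are stand-ins, and reading "512MB" as 512 MiB (512 * 1024 * 1024 bytes) is an assumption.

```go
package main

// resourceSpec is a hypothetical stand-in for models.ResourceSpec.
type resourceSpec struct {
	CPU    float64 // cores
	Memory int64   // bytes
}

// requirements mirrors the documented ResourceRequirements fields.
type requirements struct {
	CPU    float64
	Memory int64
}

const (
	defaultCPU          = 0.5
	defaultMemory int64 = 512 * 1024 * 1024 // assumed to mean 512 MiB
)

// requirementsFor returns the spec's values, or the defaults when spec is nil.
func requirementsFor(spec *resourceSpec) requirements {
	if spec == nil {
		return requirements{CPU: defaultCPU, Memory: defaultMemory}
	}
	return requirements{CPU: spec.CPU, Memory: spec.Memory}
}

// fits reports whether the available CPU and memory cover the requirements.
func fits(cpuAvailable float64, memAvailable int64, spec *resourceSpec) bool {
	req := requirementsFor(spec)
	return cpuAvailable >= req.CPU && memAvailable >= req.Memory
}
```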

type RollbackResult

type RollbackResult struct {
	NewDeployment    *models.Deployment `json:"new_deployment"`
	SourceDeployment *models.Deployment `json:"source_deployment"`
	Success          bool               `json:"success"`
	Message          string             `json:"message,omitempty"`
}

RollbackResult represents the result of a rollback operation.

type RoutingConfig

type RoutingConfig struct {
	ServiceName   string `json:"service_name"`
	ContainerName string `json:"container_name"`
	Port          int    `json:"port"`
	Domain        string `json:"domain,omitempty"`
}

RoutingConfig represents the routing configuration for a service.

func BuildRoutingConfig

func BuildRoutingConfig(deployment *models.Deployment, appName string) *RoutingConfig

BuildRoutingConfig creates a RoutingConfig from a deployment.

type RoutingUpdater

type RoutingUpdater interface {
	// UpdateRouting updates the routing configuration to point to the new container.
	// It returns the old container name that was previously routed to.
	UpdateRouting(ctx context.Context, serviceName, newContainerName string, port int) (oldContainerName string, err error)
}

RoutingUpdater defines the interface for updating routing configuration. This is typically implemented by a Caddy configuration manager.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler determines optimal node placement for deployments.

func NewScheduler

func NewScheduler(s store.Store, agentClient AgentClient, cfg *config.SchedulerConfig, logger *slog.Logger) *Scheduler

NewScheduler creates a new Scheduler instance.

func (*Scheduler) AreDependenciesRunning

func (s *Scheduler) AreDependenciesRunning(ctx context.Context, deployment *models.Deployment) (bool, error)

AreDependenciesRunning checks if all service dependencies for a deployment are running. It looks for deployments of the same app with the dependent service names that are in running state.

func (*Scheduler) GetHealthThreshold

func (s *Scheduler) GetHealthThreshold() time.Duration

GetHealthThreshold returns the configured health threshold duration.

func (*Scheduler) IsNodeHealthy

func (s *Scheduler) IsNodeHealthy(node *models.Node) bool

IsNodeHealthy checks if a node is considered healthy based on its heartbeat.

func (*Scheduler) Reschedule

func (s *Scheduler) Reschedule(ctx context.Context, nodeID string) error

Reschedule moves all deployments from a node to other healthy nodes.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context, deployment *models.Deployment) (*models.Node, error)

Schedule assigns a deployment to an appropriate node. It filters nodes by health, resources, and cache locality, then selects the best candidate.

func (*Scheduler) ScheduleAndAssign

func (s *Scheduler) ScheduleAndAssign(ctx context.Context, deployment *models.Deployment) error

ScheduleAndAssign schedules a deployment and updates the deployment record with the placement. If no healthy nodes are available, the deployment remains in "built" status (queued). **Validates: Requirements 16.1, 6.2**

func (*Scheduler) SetEnvMerger

func (s *Scheduler) SetEnvMerger(envMerger EnvMergerInterface)

SetEnvMerger sets the environment merger for the scheduler. This allows merging app-level secrets with service-level env vars before deployment. **Validates: Requirements 6.1, 6.2, 6.3**

type ZeroDowntimeDeployer

type ZeroDowntimeDeployer struct {
	// contains filtered or unexported fields
}

ZeroDowntimeDeployer handles zero-downtime deployments using a blue-green pattern. **Validates: Requirements 10.1, 10.2, 10.3, 10.4**

func NewZeroDowntimeDeployer

func NewZeroDowntimeDeployer(
	s store.Store,
	containerManager ContainerManager,
	routingUpdater RoutingUpdater,
	cfg *ZeroDowntimeDeployerConfig,
	logger *slog.Logger,
) *ZeroDowntimeDeployer

NewZeroDowntimeDeployer creates a new ZeroDowntimeDeployer.

func (*ZeroDowntimeDeployer) DeployWithZeroDowntime

func (d *ZeroDowntimeDeployer) DeployWithZeroDowntime(ctx context.Context, deployment *models.Deployment, appName string) error

DeployWithZeroDowntime performs a blue-green style deployment. It starts the new container, waits for health checks, updates routing, and then stops the old container. **Validates: Requirements 10.1, 10.2, 10.3, 10.4**

func (*ZeroDowntimeDeployer) Rollback

func (d *ZeroDowntimeDeployer) Rollback(ctx context.Context, appID, serviceName string, targetDeploymentID string) (*models.Deployment, error)

Rollback creates a new deployment using a previous successful deployment's artifact. It finds the specified deployment, creates a new deployment with the same artifact, and returns the new deployment. **Validates: Requirements 10.5, 20.3, 20.4**

func (*ZeroDowntimeDeployer) RollbackToLatestSuccessful

func (d *ZeroDowntimeDeployer) RollbackToLatestSuccessful(ctx context.Context, appID, serviceName string) (*models.Deployment, error)

RollbackToLatestSuccessful creates a new deployment using the latest successful deployment's artifact. **Validates: Requirements 10.5**

type ZeroDowntimeDeployerConfig

type ZeroDowntimeDeployerConfig struct {
	// HealthTimeout is the maximum time to wait for health checks to pass.
	HealthTimeout time.Duration
	// HealthInterval is the interval between health check attempts.
	HealthInterval time.Duration
}

ZeroDowntimeDeployerConfig holds configuration for the ZeroDowntimeDeployer.

func DefaultZeroDowntimeDeployerConfig

func DefaultZeroDowntimeDeployerConfig() *ZeroDowntimeDeployerConfig

DefaultZeroDowntimeDeployerConfig returns default configuration values.
