Documentation ¶
Overview ¶
Package scheduler provides intelligent deployment scheduling for the control plane.
Index ¶
- Variables
- func BuildDeployCommand(deployment *models.Deployment) *pb.DeploymentCommand
- func CalculateCapacityScore(node *models.Node) float64
- func CheckDependenciesRunning(deployments []*models.Deployment, dependsOn []string) bool
- func CheckDeploymentTimeout(deployment *models.Deployment, timeout time.Duration) bool
- func CreateRollbackDeployment(source *models.Deployment, newVersion int, newID string) *models.Deployment
- func HasSufficientResources(node *models.Node, spec *models.ResourceSpec) bool
- func IsRetryableError(err error) bool
- func IsStale(lastHeartbeat time.Time, threshold time.Duration) bool
- func MarkUnhealthyIfStale(ctx context.Context, nodeStore store.NodeStore, nodeID string, ...) (bool, error)
- func NodeHasClosure(node *models.Node, storePath string) bool
- func SelectBestNode(nodes []*models.Node, deployment *models.Deployment) *models.Node
- func ValidateRollbackDeployment(source, rollback *models.Deployment) bool
- func WaitForHealthyWithConfig(ctx context.Context, checkFunc func() (bool, error), timeout time.Duration, ...) (bool, int, error)
- type AgentClient
- type CaddyRoutingUpdater
- type CommandSender
- type ContainerManager
- type EnvMergerInterface
- type GRPCAgentClient
- type GRPCAgentClientConfig
- type HealthMonitor
- type NodeHealthInfo
- type ResourceAvailability
- type ResourceRequirements
- type RollbackResult
- type RoutingConfig
- type RoutingUpdater
- type Scheduler
- func (s *Scheduler) AreDependenciesRunning(ctx context.Context, deployment *models.Deployment) (bool, error)
- func (s *Scheduler) GetHealthThreshold() time.Duration
- func (s *Scheduler) IsNodeHealthy(node *models.Node) bool
- func (s *Scheduler) Reschedule(ctx context.Context, nodeID string) error
- func (s *Scheduler) Schedule(ctx context.Context, deployment *models.Deployment) (*models.Node, error)
- func (s *Scheduler) ScheduleAndAssign(ctx context.Context, deployment *models.Deployment) error
- func (s *Scheduler) SetEnvMerger(envMerger EnvMergerInterface)
- type ZeroDowntimeDeployer
- func (d *ZeroDowntimeDeployer) DeployWithZeroDowntime(ctx context.Context, deployment *models.Deployment, appName string) error
- func (d *ZeroDowntimeDeployer) Rollback(ctx context.Context, appID, serviceName string, targetDeploymentID string) (*models.Deployment, error)
- func (d *ZeroDowntimeDeployer) RollbackToLatestSuccessful(ctx context.Context, appID, serviceName string) (*models.Deployment, error)
- type ZeroDowntimeDeployerConfig
Constants ¶
This section is empty.
Variables ¶
var (
	ErrHealthCheckFailed    = errors.New("health check failed for new container")
	ErrRoutingUpdateFailed  = errors.New("failed to update routing to new container")
	ErrNoArtifact           = errors.New("deployment has no artifact")
	ErrNoPreviousDeployment = errors.New("no previous successful deployment found for rollback")
	ErrDeploymentNotFound   = errors.New("deployment not found")
)
Common errors for zero-downtime deployment.
var (
	ErrNoHealthyNodes         = errors.New("no healthy nodes available")
	ErrInsufficientResources  = errors.New("no nodes with sufficient resources")
	ErrDependenciesNotRunning = errors.New("service dependencies are not running")
	ErrDeploymentQueued       = errors.New("deployment queued waiting for available nodes")
	ErrDeploymentTimeout      = errors.New("deployment timed out waiting for scheduling")
)
Common errors returned by the scheduler.
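Callers would typically branch on these sentinel values with errors.Is. The sketch below redeclares two of them locally so that it compiles on its own. The classify helper and the idea that the scheduler wraps its errors with %w are assumptions, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two sentinel errors from the Variables section, so the
// sketch is self-contained. Real code would reference scheduler.ErrXxx.
var (
	ErrNoHealthyNodes   = errors.New("no healthy nodes available")
	ErrDeploymentQueued = errors.New("deployment queued waiting for available nodes")
)

// classify is a hypothetical helper that maps a scheduling error to an action.
func classify(err error) string {
	switch {
	case err == nil:
		return "scheduled"
	case errors.Is(err, ErrDeploymentQueued), errors.Is(err, ErrNoHealthyNodes):
		return "retry-later"
	default:
		return "fail"
	}
}

// wrappedNoNodes simulates a caller-side wrapped error; wrapping with %w
// keeps the sentinel visible to errors.Is.
func wrappedNoNodes() error {
	return fmt.Errorf("schedule web: %w", ErrNoHealthyNodes)
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(wrappedNoNodes()))
}
```

Comparing with errors.Is rather than == keeps the check working if an intermediate layer adds context to the error.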
Functions ¶
func BuildDeployCommand ¶
func BuildDeployCommand(deployment *models.Deployment) *pb.DeploymentCommand
BuildDeployCommand is exported for testing purposes. It creates a DeploymentCommand from a Deployment model.
func CalculateCapacityScore ¶
func CalculateCapacityScore(node *models.Node) float64
CalculateCapacityScore calculates a capacity score for a node. Higher scores indicate more available capacity. The score is a weighted combination of CPU and memory availability.
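A weighted combination of CPU and memory availability could look like the following minimal sketch. The nodeResources type, its field names, and the equal 0.5/0.5 weights are all assumptions made for illustration. The package's actual weights and the layout of models.Node are not shown in this documentation.

```go
package main

import "fmt"

// nodeResources is a hypothetical stand-in for the resource fields of models.Node.
type nodeResources struct {
	CPUTotal, CPUAvailable       float64 // cores
	MemoryTotal, MemoryAvailable int64   // bytes
}

// Assumed weights; the package's actual weighting is not documented here.
const (
	cpuWeight    = 0.5
	memoryWeight = 0.5
)

// capacityScore returns the weighted fraction of free CPU and memory.
// Higher means more free capacity. Nodes with no recorded totals score 0.
func capacityScore(n nodeResources) float64 {
	if n.CPUTotal <= 0 || n.MemoryTotal <= 0 {
		return 0
	}
	cpuFree := n.CPUAvailable / n.CPUTotal
	memFree := float64(n.MemoryAvailable) / float64(n.MemoryTotal)
	return cpuWeight*cpuFree + memoryWeight*memFree
}

// idleNode and busyNode are illustrative fixtures.
func idleNode() nodeResources {
	return nodeResources{CPUTotal: 4, CPUAvailable: 4, MemoryTotal: 8 << 30, MemoryAvailable: 8 << 30}
}

func busyNode() nodeResources {
	return nodeResources{CPUTotal: 4, CPUAvailable: 1, MemoryTotal: 8 << 30, MemoryAvailable: 2 << 30}
}

func main() {
	fmt.Println(capacityScore(idleNode()) > capacityScore(busyNode()))
}
```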
func CheckDependenciesRunning ¶
func CheckDependenciesRunning(deployments []*models.Deployment, dependsOn []string) bool
CheckDependenciesRunning is a pure function that checks if all dependencies are satisfied given a list of deployments and the required dependencies. This is useful for testing without needing a store.
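The check can be sketched as a set lookup over running services. The deployment type, its fields, and the "running" status string below are hypothetical stand-ins for models.Deployment.

```go
package main

import "fmt"

// deployment is a hypothetical stand-in for models.Deployment.
type deployment struct {
	ServiceName string
	Status      string
}

// dependenciesRunning reports whether every name in dependsOn has at least
// one deployment in the (assumed) "running" status.
func dependenciesRunning(deployments []deployment, dependsOn []string) bool {
	running := make(map[string]bool, len(deployments))
	for _, d := range deployments {
		if d.Status == "running" {
			running[d.ServiceName] = true
		}
	}
	for _, name := range dependsOn {
		if !running[name] {
			return false
		}
	}
	return true
}

// sampleDeployments is an illustrative fixture.
func sampleDeployments() []deployment {
	return []deployment{
		{ServiceName: "db", Status: "running"},
		{ServiceName: "cache", Status: "failed"},
	}
}

func main() {
	fmt.Println(dependenciesRunning(sampleDeployments(), []string{"db"}))
	fmt.Println(dependenciesRunning(sampleDeployments(), []string{"db", "cache"}))
}
```

An empty dependsOn list is treated as satisfied in this sketch, since there is nothing to wait for.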
func CheckDeploymentTimeout ¶
func CheckDeploymentTimeout(deployment *models.Deployment, timeout time.Duration) bool
CheckDeploymentTimeout checks if a deployment has exceeded the timeout. Returns true if the deployment should be marked as failed. **Validates: Requirements 16.3**
func CreateRollbackDeployment ¶
func CreateRollbackDeployment(source *models.Deployment, newVersion int, newID string) *models.Deployment
CreateRollbackDeployment is a pure function that creates a rollback deployment from a source deployment. This is useful for testing. **Validates: Requirements 10.5, 20.3, 20.4**
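As a rough illustration of creating and validating a rollback, the sketch below copies a source deployment and assigns a new ID and version. The deployment type, its field names, the "pending" initial status, and the exact validation rules are assumptions. The properties that the package actually checks are defined by Requirements 10.5, 20.3, and 20.4, which are not reproduced here.

```go
package main

import "fmt"

// deployment is a hypothetical stand-in for models.Deployment.
type deployment struct {
	ID          string
	AppID       string
	ServiceName string
	Artifact    string
	Version     int
	Status      string
}

// newRollback copies source (keeping app, service, and artifact) and assigns
// the new identity. The "pending" status is an assumed starting state.
func newRollback(source deployment, newVersion int, newID string) deployment {
	rb := source
	rb.ID = newID
	rb.Version = newVersion
	rb.Status = "pending"
	return rb
}

// validRollback checks the properties this sketch assumes a rollback must have:
// same app, service, and artifact as the source, but a different ID.
func validRollback(source, rb deployment) bool {
	return rb.AppID == source.AppID &&
		rb.ServiceName == source.ServiceName &&
		rb.Artifact == source.Artifact &&
		rb.ID != source.ID
}

// sampleSource is an illustrative fixture.
func sampleSource() deployment {
	return deployment{ID: "d1", AppID: "app", ServiceName: "web", Artifact: "artifact-v1", Version: 1, Status: "running"}
}

func main() {
	rb := newRollback(sampleSource(), 3, "d3")
	fmt.Println(validRollback(sampleSource(), rb))
}
```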
func HasSufficientResources ¶
func HasSufficientResources(node *models.Node, spec *models.ResourceSpec) bool
HasSufficientResources checks if a node has enough resources for a given spec.
func IsRetryableError ¶
func IsRetryableError(err error) bool
IsRetryableError is exported for testing purposes.
func MarkUnhealthyIfStale ¶
func MarkUnhealthyIfStale(ctx context.Context, nodeStore store.NodeStore, nodeID string, lastHeartbeat time.Time, threshold time.Duration) (bool, error)
MarkUnhealthyIfStale marks a node as unhealthy if its heartbeat is stale. Returns true if the node was marked unhealthy.
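The staleness test behind this function can be sketched as a comparison between the heartbeat age and the threshold. The isStaleAt helper below takes the current time as a parameter so that it is deterministic. That parameter, the 30-second threshold, and the fixed base time are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// threshold and base are illustrative values, not package defaults.
const threshold = 30 * time.Second

var base = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

// isStaleAt reports whether lastHeartbeat is older than threshold at time now.
func isStaleAt(lastHeartbeat, now time.Time, threshold time.Duration) bool {
	return now.Sub(lastHeartbeat) > threshold
}

func main() {
	fmt.Println(isStaleAt(base, base.Add(10*time.Second), threshold))
	fmt.Println(isStaleAt(base, base.Add(45*time.Second), threshold))
}
```

A heartbeat exactly at the threshold is treated as fresh in this sketch. Whether the package uses a strict or inclusive comparison is not stated.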
func NodeHasClosure ¶
func NodeHasClosure(node *models.Node, storePath string) bool
NodeHasClosure checks if a node has a specific store path cached.
func SelectBestNode ¶
func SelectBestNode(nodes []*models.Node, deployment *models.Deployment) *models.Node
SelectBestNode selects the best node from a list based on deployment requirements. This is a convenience function that combines all placement strategies.
func ValidateRollbackDeployment ¶
func ValidateRollbackDeployment(source, rollback *models.Deployment) bool
ValidateRollbackDeployment validates that a rollback deployment was created correctly. Returns true if the rollback deployment has the correct properties. **Validates: Requirements 10.5, 20.3, 20.4**
func WaitForHealthyWithConfig ¶
func WaitForHealthyWithConfig(ctx context.Context, checkFunc func() (bool, error), timeout time.Duration, interval time.Duration) (bool, int, error)
WaitForHealthyWithConfig is a pure function for testing health check waiting logic. It returns true if the container becomes healthy within the given parameters.
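A polling loop with this shape can be sketched as follows. The meaning of the int result is not documented, so the sketch assumes it is the number of check attempts. Treating a check error as "not healthy yet" is also an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForHealthy polls check until it reports healthy, the timeout elapses,
// or ctx is cancelled. The int result is the number of attempts made
// (an assumption about the package's int return value).
func waitForHealthy(ctx context.Context, check func() (bool, error), timeout, interval time.Duration) (bool, int, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	attempts := 0
	for {
		attempts++
		// Check errors are treated as "not healthy yet" in this sketch.
		if ok, err := check(); err == nil && ok {
			return true, attempts, nil
		}
		select {
		case <-ctx.Done():
			return false, attempts, ctx.Err()
		case <-ticker.C:
		}
	}
}

// healthyAfter returns a check that first succeeds on its nth call.
func healthyAfter(n int) func() (bool, error) {
	calls := 0
	return func() (bool, error) {
		calls++
		return calls >= n, nil
	}
}

// demoWait runs the loop against a check that turns healthy on the third call.
func demoWait() (bool, int, error) {
	return waitForHealthy(context.Background(), healthyAfter(3), time.Second, 10*time.Millisecond)
}

// demoTimeout runs the loop against a check that never turns healthy.
func demoTimeout() (bool, int, error) {
	never := func() (bool, error) { return false, nil }
	return waitForHealthy(context.Background(), never, 50*time.Millisecond, 10*time.Millisecond)
}

func main() {
	fmt.Println(demoWait())
}
```

Passing the check as a plain function keeps the loop testable without a real container runtime, which matches the stated purpose of the exported function.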
Types ¶
type AgentClient ¶
type AgentClient interface {
Deploy(ctx context.Context, nodeID string, deployment *models.Deployment) error
Stop(ctx context.Context, nodeID string, deploymentID string) error
}
AgentClient defines the interface for communicating with node agents.
type CaddyRoutingUpdater ¶
type CaddyRoutingUpdater struct {
// contains filtered or unexported fields
}
CaddyRoutingUpdater implements RoutingUpdater for Caddy reverse proxy. It updates Caddy's configuration to route traffic to the new container. **Validates: Requirements 10.2, 10.3**
func NewCaddyRoutingUpdater ¶
func NewCaddyRoutingUpdater(caddyAPIURL string, logger *slog.Logger) *CaddyRoutingUpdater
NewCaddyRoutingUpdater creates a new CaddyRoutingUpdater.
func (*CaddyRoutingUpdater) UpdateRouting ¶
func (c *CaddyRoutingUpdater) UpdateRouting(ctx context.Context, serviceName, newContainerName string, port int) (string, error)
UpdateRouting updates Caddy configuration to route to the new container. Returns the old container name that was previously configured. **Validates: Requirements 10.2, 10.3**
type CommandSender ¶
type CommandSender interface {
SendCommand(ctx context.Context, nodeID string, cmd *pb.DeploymentCommand) error
}
CommandSender defines the interface for sending commands to nodes. This is implemented by NodeManager.
type ContainerManager ¶
type ContainerManager interface {
// StartContainer starts a new container with the given name and deployment config.
StartContainer(ctx context.Context, nodeID, containerName string, deployment *models.Deployment) error
// StopContainer stops a container by name.
StopContainer(ctx context.Context, nodeID, containerName string) error
// CheckHealth checks if a container is healthy.
CheckHealth(ctx context.Context, nodeID, containerName string, healthCheck *models.HealthCheckConfig) (bool, error)
}
ContainerManager defines the interface for container lifecycle operations.
type EnvMergerInterface ¶
type EnvMergerInterface interface {
MergeForDeployment(ctx context.Context, appID, serviceName string, serviceEnvVars map[string]string) (map[string]string, error)
}
EnvMergerInterface defines the interface for merging environment variables. **Validates: Requirements 6.1, 6.2, 6.3**
type GRPCAgentClient ¶
type GRPCAgentClient struct {
// contains filtered or unexported fields
}
GRPCAgentClient implements the AgentClient interface using gRPC via NodeManager. It sends deployment commands through the persistent WatchCommands stream. Requirements: 3.1, 11.1, 11.2, 11.3
func NewGRPCAgentClient ¶
func NewGRPCAgentClient(sender CommandSender, cfg *GRPCAgentClientConfig) *GRPCAgentClient
NewGRPCAgentClient creates a new GRPCAgentClient.
func (*GRPCAgentClient) Deploy ¶
func (c *GRPCAgentClient) Deploy(ctx context.Context, nodeID string, deployment *models.Deployment) error
Deploy sends a deployment command to the specified node via gRPC. Requirements: 3.1, 3.2, 11.1, 11.2, 11.3
type GRPCAgentClientConfig ¶
type GRPCAgentClientConfig struct {
// CommandTimeout is the deadline for command acknowledgment (default: 10s)
CommandTimeout time.Duration
// MaxRetries is the maximum number of retry attempts (default: 3)
MaxRetries int
}
GRPCAgentClientConfig holds configuration for the GRPCAgentClient.
func DefaultGRPCAgentClientConfig ¶
func DefaultGRPCAgentClientConfig() *GRPCAgentClientConfig
DefaultGRPCAgentClientConfig returns default configuration values.
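To show how the two documented settings might interact, here is a minimal retry loop that gives each attempt its own deadline. It uses the defaults stated in the field comments (10s, 3). The clientConfig type, the sendWithRetry helper, and the reading of MaxRetries as "retries after the first attempt" are assumptions. The sketch also omits any backoff the real client may apply.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// clientConfig mirrors the two documented fields of GRPCAgentClientConfig.
type clientConfig struct {
	CommandTimeout time.Duration
	MaxRetries     int
}

// defaultConfig uses the defaults stated in the field comments.
func defaultConfig() clientConfig {
	return clientConfig{CommandTimeout: 10 * time.Second, MaxRetries: 3}
}

// sendWithRetry calls send up to 1+MaxRetries times, each with its own
// CommandTimeout deadline, and returns the number of attempts made.
func sendWithRetry(ctx context.Context, cfg clientConfig, send func(context.Context) error) (int, error) {
	maxAttempts := cfg.MaxRetries + 1
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		attemptCtx, cancel := context.WithTimeout(ctx, cfg.CommandTimeout)
		lastErr = send(attemptCtx)
		cancel()
		if lastErr == nil {
			return attempt, nil
		}
		// Stop early if the caller's context is done.
		if ctx.Err() != nil {
			return attempt, ctx.Err()
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// flakySend fails its first n calls, then succeeds.
func flakySend(n int) func(context.Context) error {
	calls := 0
	return func(context.Context) error {
		calls++
		if calls <= n {
			return errors.New("transient send failure")
		}
		return nil
	}
}

// demoRetry runs sendWithRetry with default settings against a flaky sender.
func demoRetry(failures int) (int, error) {
	return sendWithRetry(context.Background(), defaultConfig(), flakySend(failures))
}

func main() {
	fmt.Println(demoRetry(2))
}
```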
type HealthMonitor ¶
type HealthMonitor struct {
// contains filtered or unexported fields
}
HealthMonitor periodically checks node health and triggers rescheduling for unhealthy nodes. It also watches for nodes becoming healthy and processes pending deployments. **Validates: Requirements 16.2**
func NewHealthMonitor ¶
func NewHealthMonitor(s store.Store, scheduler *Scheduler, healthThreshold, checkInterval time.Duration, logger *slog.Logger) *HealthMonitor
NewHealthMonitor creates a new HealthMonitor instance.
func NewHealthMonitorWithTimeout ¶
func NewHealthMonitorWithTimeout(s store.Store, scheduler *Scheduler, healthThreshold, checkInterval, deploymentTimeout time.Duration, logger *slog.Logger) *HealthMonitor
NewHealthMonitorWithTimeout creates a new HealthMonitor instance with a custom deployment timeout. **Validates: Requirements 16.3**
func (*HealthMonitor) CheckNodeHealth ¶
func (h *HealthMonitor) CheckNodeHealth(node *NodeHealthInfo) bool
CheckNodeHealth checks if a specific node should be marked as unhealthy. Returns true if the node is stale (heartbeat older than threshold).
type NodeHealthInfo ¶
NodeHealthInfo contains the information needed to check node health.
type ResourceAvailability ¶
ResourceAvailability tracks the total available resources across all healthy nodes. **Validates: Requirements 16.4**
func CalculateTotalResources ¶
func CalculateTotalResources(nodes []*models.Node, healthThreshold time.Duration) ResourceAvailability
CalculateTotalResources calculates the total available resources across all healthy nodes. **Validates: Requirements 16.4**
func (*ResourceAvailability) HasResourcesForSpec ¶
func (r *ResourceAvailability) HasResourcesForSpec(spec *models.ResourceSpec) bool
HasResourcesForSpec checks if the total available resources can accommodate a deployment. **Validates: Requirements 16.4**
type ResourceRequirements ¶
type ResourceRequirements struct {
CPU float64 // CPU cores required
Memory int64 // Memory in bytes required
}
ResourceRequirements defines the CPU and memory requirements.
func GetResourceRequirements ¶
func GetResourceRequirements(spec *models.ResourceSpec) ResourceRequirements
GetResourceRequirements returns the resource requirements from a ResourceSpec. If spec is nil, returns default requirements (0.5 CPU, 512MB).
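The nil-spec fallback can be sketched as below. The resourceSpec type and its field names are hypothetical stand-ins for models.ResourceSpec, and "512MB" is read as 512 MiB, which is an assumption.

```go
package main

import "fmt"

// resourceSpec is a hypothetical stand-in for models.ResourceSpec.
type resourceSpec struct {
	CPU    float64
	Memory int64
}

// resourceRequirements mirrors the documented ResourceRequirements struct.
type resourceRequirements struct {
	CPU    float64 // CPU cores required
	Memory int64   // Memory in bytes required
}

// requirementsFor applies the documented defaults (0.5 CPU, 512MB) when spec
// is nil. 512MB is taken as 512 MiB here, which is an assumption.
func requirementsFor(spec *resourceSpec) resourceRequirements {
	if spec == nil {
		return resourceRequirements{CPU: 0.5, Memory: 512 << 20}
	}
	return resourceRequirements{CPU: spec.CPU, Memory: spec.Memory}
}

func main() {
	fmt.Println(requirementsFor(nil))
	fmt.Println(requirementsFor(&resourceSpec{CPU: 2, Memory: 1 << 30}))
}
```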
type RollbackResult ¶
type RollbackResult struct {
NewDeployment *models.Deployment `json:"new_deployment"`
SourceDeployment *models.Deployment `json:"source_deployment"`
Success bool `json:"success"`
Message string `json:"message,omitempty"`
}
RollbackResult represents the result of a rollback operation.
type RoutingConfig ¶
type RoutingConfig struct {
ServiceName string `json:"service_name"`
ContainerName string `json:"container_name"`
Port int `json:"port"`
Domain string `json:"domain,omitempty"`
}
RoutingConfig represents the routing configuration for a service.
func BuildRoutingConfig ¶
func BuildRoutingConfig(deployment *models.Deployment, appName string) *RoutingConfig
BuildRoutingConfig creates a RoutingConfig from a deployment.
type RoutingUpdater ¶
type RoutingUpdater interface {
// UpdateRouting updates the routing configuration to point to the new container.
// It returns the old container name that was previously routed to.
UpdateRouting(ctx context.Context, serviceName, newContainerName string, port int) (oldContainerName string, err error)
}
RoutingUpdater defines the interface for updating routing configuration. This is typically implemented by a Caddy configuration manager.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler determines optimal node placement for deployments.
func NewScheduler ¶
func NewScheduler(s store.Store, agentClient AgentClient, cfg *config.SchedulerConfig, logger *slog.Logger) *Scheduler
NewScheduler creates a new Scheduler instance.
func (*Scheduler) AreDependenciesRunning ¶
func (s *Scheduler) AreDependenciesRunning(ctx context.Context, deployment *models.Deployment) (bool, error)
AreDependenciesRunning checks if all service dependencies for a deployment are running. It looks for deployments of the same app with the dependent service names that are in running state.
func (*Scheduler) GetHealthThreshold ¶
func (s *Scheduler) GetHealthThreshold() time.Duration
GetHealthThreshold returns the configured health threshold duration.
func (*Scheduler) IsNodeHealthy ¶
func (s *Scheduler) IsNodeHealthy(node *models.Node) bool
IsNodeHealthy checks if a node is considered healthy based on its heartbeat.
func (*Scheduler) Reschedule ¶
func (s *Scheduler) Reschedule(ctx context.Context, nodeID string) error
Reschedule moves all deployments from a node to other healthy nodes.
func (*Scheduler) Schedule ¶
func (s *Scheduler) Schedule(ctx context.Context, deployment *models.Deployment) (*models.Node, error)
Schedule assigns a deployment to an appropriate node. It filters nodes by health, resources, and cache locality, then selects the best candidate.
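The filter-then-select flow described above might be sketched as follows. The node type, its fields, the example store path, and the tie-breaking rule (cache hit first, then most free CPU) are assumptions. The package's actual ranking is not documented here.

```go
package main

import "fmt"

// node is a hypothetical stand-in for models.Node.
type node struct {
	ID        string
	Healthy   bool
	FreeCPU   float64
	FreeMemMB int64
	Cached    map[string]bool // store paths already present on the node
}

// demoPath is an illustrative store path, not a real artifact.
const demoPath = "/store/example-app"

// pickNode drops unhealthy or undersized nodes, then prefers nodes that
// already cache storePath, breaking ties by free CPU. It returns nil if no
// node qualifies.
func pickNode(nodes []node, cpu float64, memMB int64, storePath string) *node {
	var best *node
	for i := range nodes {
		n := &nodes[i]
		if !n.Healthy || n.FreeCPU < cpu || n.FreeMemMB < memMB {
			continue
		}
		if best == nil || better(n, best, storePath) {
			best = n
		}
	}
	return best
}

// better reports whether a should be preferred over b.
func better(a, b *node, storePath string) bool {
	if a.Cached[storePath] != b.Cached[storePath] {
		return a.Cached[storePath]
	}
	return a.FreeCPU > b.FreeCPU
}

// demoNodes is an illustrative fixture.
func demoNodes() []node {
	return []node{
		{ID: "a", Healthy: true, FreeCPU: 8, FreeMemMB: 4096},
		{ID: "b", Healthy: true, FreeCPU: 2, FreeMemMB: 2048, Cached: map[string]bool{demoPath: true}},
		{ID: "c", Healthy: false, FreeCPU: 16, FreeMemMB: 8192, Cached: map[string]bool{demoPath: true}},
	}
}

func main() {
	if n := pickNode(demoNodes(), 1, 512, demoPath); n != nil {
		fmt.Println(n.ID)
	}
}
```

In this sketch a cache hit outranks spare capacity, on the assumption that avoiding an artifact transfer matters more than headroom. A real scheduler may weigh the two differently.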
func (*Scheduler) ScheduleAndAssign ¶
func (s *Scheduler) ScheduleAndAssign(ctx context.Context, deployment *models.Deployment) error
ScheduleAndAssign schedules a deployment and updates the deployment record with the placement. If no healthy nodes are available, the deployment remains in "built" status (queued). **Validates: Requirements 16.1, 6.2**
func (*Scheduler) SetEnvMerger ¶
func (s *Scheduler) SetEnvMerger(envMerger EnvMergerInterface)
SetEnvMerger sets the environment merger for the scheduler. This allows merging app-level secrets with service-level env vars before deployment. **Validates: Requirements 6.1, 6.2, 6.3**
type ZeroDowntimeDeployer ¶
type ZeroDowntimeDeployer struct {
// contains filtered or unexported fields
}
ZeroDowntimeDeployer handles zero-downtime deployments using a blue-green pattern. **Validates: Requirements 10.1, 10.2, 10.3, 10.4**
func NewZeroDowntimeDeployer ¶
func NewZeroDowntimeDeployer(s store.Store, containerManager ContainerManager, routingUpdater RoutingUpdater, cfg *ZeroDowntimeDeployerConfig, logger *slog.Logger) *ZeroDowntimeDeployer
NewZeroDowntimeDeployer creates a new ZeroDowntimeDeployer.
func (*ZeroDowntimeDeployer) DeployWithZeroDowntime ¶
func (d *ZeroDowntimeDeployer) DeployWithZeroDowntime(ctx context.Context, deployment *models.Deployment, appName string) error
DeployWithZeroDowntime performs a blue-green style deployment. It starts the new container, waits for health checks, updates routing, and then stops the old container. **Validates: Requirements 10.1, 10.2, 10.3, 10.4**
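The four steps described above (start, health check, routing switch, stop old) can be sketched with plain function values in place of the ContainerManager and RoutingUpdater interfaces. The steps type, the container names, and the cleanup behavior on failure are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the documented sentinel so the sketch is self-contained.
var errHealthCheckFailed = errors.New("health check failed for new container")

// steps bundles the four operations as plain functions (a hypothetical shape;
// the package uses the ContainerManager and RoutingUpdater interfaces).
type steps struct {
	start   func(name string) error
	healthy func(name string) bool
	route   func(name string) (old string, err error)
	stop    func(name string) error
}

// blueGreen runs: start new -> verify health -> switch routing -> stop old.
// If the new container is unhealthy, it is stopped and routing is untouched,
// so the old container keeps serving traffic (assumed behavior).
func blueGreen(s steps, newName string) error {
	if err := s.start(newName); err != nil {
		return fmt.Errorf("start %s: %w", newName, err)
	}
	if !s.healthy(newName) {
		_ = s.stop(newName) // best-effort cleanup
		return errHealthCheckFailed
	}
	old, err := s.route(newName)
	if err != nil {
		_ = s.stop(newName)
		return fmt.Errorf("route to %s: %w", newName, err)
	}
	if old != "" && old != newName {
		return s.stop(old)
	}
	return nil
}

// recordingSteps returns steps that append each call to calls.
func recordingSteps(calls *[]string, healthy bool) steps {
	return steps{
		start:   func(n string) error { *calls = append(*calls, "start "+n); return nil },
		healthy: func(n string) bool { *calls = append(*calls, "health "+n); return healthy },
		route:   func(n string) (string, error) { *calls = append(*calls, "route "+n); return "web-blue", nil },
		stop:    func(n string) error { *calls = append(*calls, "stop "+n); return nil },
	}
}

// demoDeploy runs one deployment and returns the recorded call order.
func demoDeploy(healthy bool) ([]string, error) {
	var calls []string
	err := blueGreen(recordingSteps(&calls, healthy), "web-green")
	return calls, err
}

func main() {
	calls, err := demoDeploy(true)
	fmt.Println(calls, err)
}
```

Stopping the old container only after routing has switched is what keeps traffic flowing throughout. Reversing those two steps would open a window with no healthy backend.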
func (*ZeroDowntimeDeployer) Rollback ¶
func (d *ZeroDowntimeDeployer) Rollback(ctx context.Context, appID, serviceName string, targetDeploymentID string) (*models.Deployment, error)
Rollback creates a new deployment using a previous successful deployment's artifact. It finds the specified deployment, creates a new deployment with the same artifact, and returns the new deployment. **Validates: Requirements 10.5, 20.3, 20.4**
func (*ZeroDowntimeDeployer) RollbackToLatestSuccessful ¶
func (d *ZeroDowntimeDeployer) RollbackToLatestSuccessful(ctx context.Context, appID, serviceName string) (*models.Deployment, error)
RollbackToLatestSuccessful creates a new deployment using the latest successful deployment's artifact. **Validates: Requirements 10.5**
type ZeroDowntimeDeployerConfig ¶
type ZeroDowntimeDeployerConfig struct {
// HealthTimeout is the maximum time to wait for health checks to pass.
HealthTimeout time.Duration
// HealthInterval is the interval between health check attempts.
HealthInterval time.Duration
}
ZeroDowntimeDeployerConfig holds configuration for the ZeroDowntimeDeployer.
func DefaultZeroDowntimeDeployerConfig ¶
func DefaultZeroDowntimeDeployerConfig() *ZeroDowntimeDeployerConfig
DefaultZeroDowntimeDeployerConfig returns default configuration values.