Documentation
¶
Index ¶
- Variables
- func MinMaxScale(...) float64
- type APIMonitor
- type ActivationFunction
- type BasePipelineController
- func (c *BasePipelineController[PipelineType]) HandleKnowledgeCreated(ctx context.Context, evt event.CreateEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandleKnowledgeDeleted(ctx context.Context, evt event.DeleteEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandleKnowledgeUpdated(ctx context.Context, evt event.UpdateEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandlePipelineCreated(ctx context.Context, evt event.CreateEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandlePipelineDeleted(ctx context.Context, evt event.DeleteEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandlePipelineUpdated(ctx context.Context, evt event.UpdateEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandleStepCreated(ctx context.Context, evt event.CreateEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandleStepDeleted(ctx context.Context, evt event.DeleteEvent, ...)
- func (c *BasePipelineController[PipelineType]) HandleStepUpdated(ctx context.Context, evt event.UpdateEvent, ...)
- func (c *BasePipelineController[PipelineType]) InitAllPipelines(ctx context.Context) error
- type BasePipelineControllerDelegate
- type BaseStep
- type EmptyStepOpts
- type MonitoredCallback
- type Pipeline
- type PipelineMonitor
- type PipelineRequest
- type Premodifier
- type Step
- type StepMonitor
- type StepOpts
- type StepResult
- type StepStatistics
- type StepValidator
- type StepWrapper
Constants ¶
This section is empty.
Variables ¶
var ( // This error is returned from the step at any time when the step should be skipped. ErrStepSkipped = errors.New("step skipped") )
Functions ¶
func MinMaxScale ¶
func MinMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64
Min-max scale a value between lower and upper bounds and apply the given activation. Note: the resulting value is clamped between the activation bounds.
Types ¶
type APIMonitor ¶
type APIMonitor struct {
// A histogram to measure how long the API requests take to run.
ApiRequestsTimer *prometheus.HistogramVec
}
Collection of Prometheus metrics to monitor scheduler pipeline
func NewSchedulerMonitor ¶
func NewSchedulerMonitor() APIMonitor
Create a new scheduler monitor and register the necessary Prometheus metrics.
func (*APIMonitor) Callback ¶
func (m *APIMonitor) Callback(w http.ResponseWriter, r *http.Request, pattern string) MonitoredCallback
func (*APIMonitor) Collect ¶
func (m *APIMonitor) Collect(ch chan<- prometheus.Metric)
func (*APIMonitor) Describe ¶
func (m *APIMonitor) Describe(ch chan<- *prometheus.Desc)
type ActivationFunction ¶
type ActivationFunction struct{}
Mixin that can be embedded in a step to provide some activation function tooling.
func (*ActivationFunction) Apply ¶
func (m *ActivationFunction) Apply(in, activations map[string]float64) map[string]float64
Apply the activation function to the weights map. All hosts that are not in the activations map are removed.
func (*ActivationFunction) NoEffect ¶
func (m *ActivationFunction) NoEffect() float64
Get activations that will have no effect on the host.
func (*ActivationFunction) Norm ¶
func (m *ActivationFunction) Norm(activation float64) float64
Normalize a single value using the activation function.
type BasePipelineController ¶
type BasePipelineController[PipelineType any] struct { // Available pipelines by their name. Pipelines map[string]PipelineType // Delegate to create pipelines. Delegate BasePipelineControllerDelegate[PipelineType] // Kubernetes client to manage/fetch resources. client.Client // The name of the operator to scope resources to. OperatorName string }
Base controller for decision pipelines.
func (*BasePipelineController[PipelineType]) HandleKnowledgeCreated ¶
func (c *BasePipelineController[PipelineType]) HandleKnowledgeCreated( ctx context.Context, evt event.CreateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a knowledge watch to handle created knowledges.
This handler will re-evaluate all steps depending on the knowledge.
func (*BasePipelineController[PipelineType]) HandleKnowledgeDeleted ¶
func (c *BasePipelineController[PipelineType]) HandleKnowledgeDeleted( ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a knowledge watch to handle deleted knowledges.
This handler will re-evaluate all steps depending on the knowledge.
func (*BasePipelineController[PipelineType]) HandleKnowledgeUpdated ¶
func (c *BasePipelineController[PipelineType]) HandleKnowledgeUpdated( ctx context.Context, evt event.UpdateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a knowledge watch to handle updated knowledges.
This handler will re-evaluate all steps depending on the knowledge.
func (*BasePipelineController[PipelineType]) HandlePipelineCreated ¶
func (c *BasePipelineController[PipelineType]) HandlePipelineCreated( ctx context.Context, evt event.CreateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a pipeline watch to handle created pipelines.
This handler will initialize new pipelines as needed and put them into the pipeline map.
func (*BasePipelineController[PipelineType]) HandlePipelineDeleted ¶
func (c *BasePipelineController[PipelineType]) HandlePipelineDeleted( ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a pipeline watch to handle deleted pipelines.
This handler will remove pipelines from the pipeline map.
func (*BasePipelineController[PipelineType]) HandlePipelineUpdated ¶
func (c *BasePipelineController[PipelineType]) HandlePipelineUpdated( ctx context.Context, evt event.UpdateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a pipeline watch to handle updated pipelines.
This handler will initialize new pipelines as needed and put them into the pipeline map.
func (*BasePipelineController[PipelineType]) HandleStepCreated ¶
func (c *BasePipelineController[PipelineType]) HandleStepCreated( ctx context.Context, evt event.CreateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a step watch to handle created steps.
This handler will look at the underlying resources of the step and check if they are ready. It will then re-evaluate all pipelines depending on the step.
func (*BasePipelineController[PipelineType]) HandleStepDeleted ¶
func (c *BasePipelineController[PipelineType]) HandleStepDeleted( ctx context.Context, evt event.DeleteEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a step watch to handle deleted steps.
This handler will re-evaluate all pipelines depending on the step.
func (*BasePipelineController[PipelineType]) HandleStepUpdated ¶
func (c *BasePipelineController[PipelineType]) HandleStepUpdated( ctx context.Context, evt event.UpdateEvent, queue workqueue.TypedRateLimitingInterface[reconcile.Request], )
Handler bound to a step watch to handle updated steps.
This handler will look at the underlying resources of the step and check if they are ready. It will then re-evaluate all pipelines depending on the step.
func (*BasePipelineController[PipelineType]) InitAllPipelines ¶
func (c *BasePipelineController[PipelineType]) InitAllPipelines(ctx context.Context) error
Handle the startup of the manager by initializing the pipeline map.
type BasePipelineControllerDelegate ¶
type BasePipelineControllerDelegate[PipelineType any] interface { // Initialize a new pipeline with the given steps. // // This method is delegated to the parent controller, when a pipeline needs // to be newly initialized or re-initialized to update it in the pipeline // map. InitPipeline(steps []v1alpha1.Step) (PipelineType, error) }
The base pipeline controller will delegate some methods to the parent controller struct. The parent controller only needs to conform to this interface and set the delegate field accordingly.
type BaseStep ¶
type BaseStep[RequestType PipelineRequest, Opts StepOpts] struct { // Options to pass via yaml to this step. conf.JsonOpts[Opts] // The activation function to use. ActivationFunction // Database connection. DB db.DB // The alias of this step, if any. Alias string }
Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.
func (*BaseStep[RequestType, Opts]) PrepareResult ¶
func (s *BaseStep[RequestType, Opts]) PrepareResult(request RequestType) *StepResult
Get a default result (no action) for the input weight keys given in the request.
func (*BaseStep[RequestType, Opts]) PrepareStats ¶
func (s *BaseStep[RequestType, Opts]) PrepareStats(request PipelineRequest, unit string) StepStatistics
Get default statistics for the input weight keys given in the request.
type EmptyStepOpts ¶
type EmptyStepOpts struct{}
Empty step opts conforming to the StepOpts interface (validation always succeeds).
func (EmptyStepOpts) Validate ¶
func (EmptyStepOpts) Validate() error
type MonitoredCallback ¶
type MonitoredCallback struct {
// contains filtered or unexported fields
}
Helper to respond to the request with the given code and error. Adds monitoring for the time it took to handle the request.
type Pipeline ¶
type Pipeline[RequestType PipelineRequest] interface { // Run the scheduling pipeline with the given request. Run(request RequestType) (v1alpha1.DecisionResult, error) }
func NewPipeline ¶
func NewPipeline[RequestType PipelineRequest]( supportedSteps map[string]func() Step[RequestType], confedSteps []v1alpha1.Step, stepWrappers []StepWrapper[RequestType], database db.DB, monitor PipelineMonitor, ) (Pipeline[RequestType], error)
Create a new pipeline with steps contained in the configuration.
type PipelineMonitor ¶
type PipelineMonitor struct {
// The pipeline name is used to differentiate between different pipelines.
PipelineName string
// contains filtered or unexported fields
}
Collection of Prometheus metrics to monitor scheduler pipeline
func NewPipelineMonitor ¶
func NewPipelineMonitor(registry *monitoring.Registry) PipelineMonitor
Create a new scheduler monitor and register the necessary Prometheus metrics.
func (PipelineMonitor) SubPipeline ¶
func (m PipelineMonitor) SubPipeline(name string) PipelineMonitor
Get a copied pipeline monitor with the name set, after binding the metrics.
type PipelineRequest ¶
type PipelineRequest interface {
// Get the subjects that went in the pipeline.
GetSubjects() []string
// Get the weights for the subjects.
GetWeights() map[string]float64
// Get logging args to be used in the step's trace log.
// Usually, this will be the request context including the request ID.
GetTraceLogArgs() []slog.Attr
}
type Premodifier ¶
type Premodifier[RequestType PipelineRequest] interface { // Modify the request before it is sent to the pipeline. ModifyRequest(request *RequestType) error }
type Step ¶
type Step[RequestType PipelineRequest] interface { // Configure the step with a database and options. Init(db db.DB, opts conf.RawOpts) error // Run this step of the scheduling pipeline. // Return a map of keys to activation values. Important: keys that are // not in the map are considered as filtered out. // Provide a traceLog that contains the global request id and should // be used to log the step's execution. Run(traceLog *slog.Logger, request RequestType) (*StepResult, error) // Get the name of this step. // The name is used to identify the step in metrics, config, logs, and more. // Should be something like: "my_cool_scheduler_step". GetName() string }
Interface for a scheduler step.
type StepMonitor ¶
type StepMonitor[RequestType PipelineRequest] struct { // Mixin that can be embedded in a step to provide some activation function tooling. ActivationFunction // The wrapped scheduler step to monitor. Step Step[RequestType] // contains filtered or unexported fields }
Wraps a scheduler step to monitor its execution.
func MonitorStep ¶
func MonitorStep[RequestType PipelineRequest](step Step[RequestType], m PipelineMonitor) *StepMonitor[RequestType]
Schedule using the wrapped step and measure the time it takes.
func (*StepMonitor[RequestType]) GetName ¶
func (s *StepMonitor[RequestType]) GetName() string
Get the name of the wrapped step.
func (*StepMonitor[RequestType]) Run ¶
func (s *StepMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
Run the step and observe its execution.
type StepOpts ¶
type StepOpts interface {
// Validate the options for this step.
Validate() error
}
Interface to which step options must conform.
type StepResult ¶
type StepResult struct {
// The activations calculated by this step.
Activations map[string]float64
// Step statistics like:
//
// {
// "max cpu contention": {
// "unit": "cpu contention [%]",
// "hosts": { "host 1": 10, "host 2": 10 }
// },
// "noisy projects": {
// "unit": "vms of this project running on host [#]",
// "hosts": { "host 1": 1, "host 2": 0 }
// }
// }
//
// These statistics are used to display the step's effect on the hosts.
// For example: max cpu contention: before [ 100%, 50%, 40% ], after [ 40%, 50%, 100% ]
Statistics map[string]StepStatistics
}
type StepStatistics ¶
type StepValidator ¶
type StepValidator[RequestType PipelineRequest] struct { // The wrapped step to validate. Step Step[RequestType] // By default, we execute all validations. However, through the config, // we can also disable some validations if necessary. DisabledValidations v1alpha1.DisabledValidationsSpec }
Wrapper for scheduler steps that validates them before/after execution.
func ValidateStep ¶
func ValidateStep[RequestType PipelineRequest](step Step[RequestType], disabledValidations v1alpha1.DisabledValidationsSpec) *StepValidator[RequestType]
Validate the wrapped step with the database and options.
func (*StepValidator[RequestType]) GetName ¶
func (s *StepValidator[RequestType]) GetName() string
Get the name of the wrapped step.
func (*StepValidator[RequestType]) Init ¶
Initialize the wrapped step with the database and options.
func (*StepValidator[RequestType]) Run ¶
func (s *StepValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
Run the step and validate what happens.