lib

package
v0.0.0-...-d378d34 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 30, 2025 | License: Apache-2.0 | Imports: 26 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrStepSkipped is returned by a step whenever the step should be skipped.
	ErrStepSkipped = errors.New("step skipped")
)
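
A caller would typically tell a skipped step apart from a real failure with errors.Is. The following is a minimal sketch under assumptions: the sentinel is re-declared locally so the file compiles on its own, and runStep and isSkipped are hypothetical helpers that are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for lib.ErrStepSkipped so the sketch compiles on its own.
var ErrStepSkipped = errors.New("step skipped")

// runStep is a hypothetical step body that skips itself when disabled.
func runStep(enabled bool) (map[string]float64, error) {
	if !enabled {
		// Wrapping with %w keeps the sentinel detectable via errors.Is.
		return nil, fmt.Errorf("step disabled by config: %w", ErrStepSkipped)
	}
	return map[string]float64{"host1": 0.5}, nil
}

// isSkipped reports whether err signals a skipped step rather than a failure.
func isSkipped(err error) bool {
	return errors.Is(err, ErrStepSkipped)
}

func main() {
	_, err := runStep(false)
	fmt.Println("skipped:", isSkipped(err))
}
```

Comparing with errors.Is instead of == keeps the check working when a step wraps the sentinel with extra context.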

Functions

func MinMaxScale

func MinMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64

Min-max scale a value from the range [lowerBound, upperBound] into the activation range [activationLowerBound, activationUpperBound]. Note: the resulting value is clamped between the activation bounds.
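
The scaling can be sketched as a linear mapping followed by a clamp. This is a hypothetical re-implementation for illustration, not the package's code: the linear interpolation, the handling of a degenerate input range, and the tolerance for inverted activation bounds are all assumptions.

```go
package main

import (
	"fmt"
	"math"
)

// minMaxScale is a hypothetical re-implementation, not the package's code.
// It maps value linearly from [lowerBound, upperBound] onto
// [activationLowerBound, activationUpperBound] and clamps the result.
func minMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64 {
	if lowerBound == upperBound {
		// Assumption: a degenerate input range yields the lower activation bound.
		return activationLowerBound
	}
	normalized := (value - lowerBound) / (upperBound - lowerBound)
	scaled := activationLowerBound + normalized*(activationUpperBound-activationLowerBound)
	// Clamp to the activation bounds; min/max also cover inverted bounds.
	lo := math.Min(activationLowerBound, activationUpperBound)
	hi := math.Max(activationLowerBound, activationUpperBound)
	return math.Max(lo, math.Min(hi, scaled))
}

func main() {
	fmt.Println(minMaxScale(50, 0, 100, 0, 1))  // inside the input range
	fmt.Println(minMaxScale(150, 0, 100, 0, 1)) // above the input range, clamped
}
```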

Types

type APIMonitor

type APIMonitor struct {
	// A histogram to measure how long the API requests take to run.
	ApiRequestsTimer *prometheus.HistogramVec
}

Collection of Prometheus metrics to monitor the scheduler API requests.

func NewSchedulerMonitor

func NewSchedulerMonitor() APIMonitor

Create a new scheduler monitor and register the necessary Prometheus metrics.

func (*APIMonitor) Callback

func (m *APIMonitor) Callback(w http.ResponseWriter, r *http.Request, pattern string) MonitoredCallback

func (*APIMonitor) Collect

func (m *APIMonitor) Collect(ch chan<- prometheus.Metric)

func (*APIMonitor) Describe

func (m *APIMonitor) Describe(ch chan<- *prometheus.Desc)

type ActivationFunction

type ActivationFunction struct{}

Mixin that can be embedded in a step to provide some activation function tooling.

func (*ActivationFunction) Apply

func (m *ActivationFunction) Apply(in, activations map[string]float64) map[string]float64

Apply the activation function to the weights map. All hosts that are not in the activations map are removed.

func (*ActivationFunction) NoEffect

func (m *ActivationFunction) NoEffect() float64

Get the activation value that will have no effect on a host.

func (*ActivationFunction) Norm

func (m *ActivationFunction) Norm(activation float64) float64

Normalize a single value using the activation function.
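
How Apply, NoEffect, and Norm fit together can be sketched as follows. The concrete function is an assumption: this page does not say which activation function is used, so the sketch picks tanh and adds the normalized activation to the incoming weight. The type activationFunction is a local stand-in.

```go
package main

import (
	"fmt"
	"math"
)

// activationFunction is a local stand-in for lib.ActivationFunction.
type activationFunction struct{}

// NoEffect returns the activation that leaves a weight unchanged.
// Assumption: with tanh that value is 0, because tanh(0) == 0.
func (activationFunction) NoEffect() float64 { return 0 }

// Norm squashes an activation into (-1, 1). Assumption: tanh is used.
func (activationFunction) Norm(activation float64) float64 {
	return math.Tanh(activation)
}

// Apply adds the normalized activation to each input weight and drops
// every host that has no entry in the activations map.
func (a activationFunction) Apply(in, activations map[string]float64) map[string]float64 {
	out := make(map[string]float64, len(activations))
	for host, weight := range in {
		activation, ok := activations[host]
		if !ok {
			continue // host was filtered out by the step
		}
		out[host] = weight + a.Norm(activation)
	}
	return out
}

func main() {
	var a activationFunction
	in := map[string]float64{"host1": 1, "host2": 2, "host3": 3}
	activations := map[string]float64{"host1": a.NoEffect(), "host2": 0.5}
	fmt.Println(a.Apply(in, activations))
}
```

In this sketch host3 disappears from the output because it has no activation, while host1 keeps its weight because it received the no-effect value.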

type BasePipelineController

type BasePipelineController[PipelineType any] struct {
	// Available pipelines by their name.
	Pipelines map[string]PipelineType
	// Delegate to create pipelines.
	Delegate BasePipelineControllerDelegate[PipelineType]
	// Kubernetes client to manage/fetch resources.
	client.Client
	// The name of the operator to scope resources to.
	OperatorName string
}

Base controller for decision pipelines.

func (*BasePipelineController[PipelineType]) HandleKnowledgeCreated

func (c *BasePipelineController[PipelineType]) HandleKnowledgeCreated(
	ctx context.Context,
	evt event.CreateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a knowledge watch to handle created knowledge resources.

This handler will re-evaluate all steps depending on the knowledge.

func (*BasePipelineController[PipelineType]) HandleKnowledgeDeleted

func (c *BasePipelineController[PipelineType]) HandleKnowledgeDeleted(
	ctx context.Context,
	evt event.DeleteEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a knowledge watch to handle deleted knowledge resources.

This handler will re-evaluate all steps depending on the knowledge.

func (*BasePipelineController[PipelineType]) HandleKnowledgeUpdated

func (c *BasePipelineController[PipelineType]) HandleKnowledgeUpdated(
	ctx context.Context,
	evt event.UpdateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a knowledge watch to handle updated knowledge resources.

This handler will re-evaluate all steps depending on the knowledge.

func (*BasePipelineController[PipelineType]) HandlePipelineCreated

func (c *BasePipelineController[PipelineType]) HandlePipelineCreated(
	ctx context.Context,
	evt event.CreateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a pipeline watch to handle created pipelines.

This handler will initialize new pipelines as needed and put them into the pipeline map.

func (*BasePipelineController[PipelineType]) HandlePipelineDeleted

func (c *BasePipelineController[PipelineType]) HandlePipelineDeleted(
	ctx context.Context,
	evt event.DeleteEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a pipeline watch to handle deleted pipelines.

This handler will remove pipelines from the pipeline map.

func (*BasePipelineController[PipelineType]) HandlePipelineUpdated

func (c *BasePipelineController[PipelineType]) HandlePipelineUpdated(
	ctx context.Context,
	evt event.UpdateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a pipeline watch to handle updated pipelines.

This handler will initialize new pipelines as needed and put them into the pipeline map.

func (*BasePipelineController[PipelineType]) HandleStepCreated

func (c *BasePipelineController[PipelineType]) HandleStepCreated(
	ctx context.Context,
	evt event.CreateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a step watch to handle created steps.

This handler will look at the underlying resources of the step and check if they are ready. It will then re-evaluate all pipelines depending on the step.

func (*BasePipelineController[PipelineType]) HandleStepDeleted

func (c *BasePipelineController[PipelineType]) HandleStepDeleted(
	ctx context.Context,
	evt event.DeleteEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a step watch to handle deleted steps.

This handler will re-evaluate all pipelines depending on the step.

func (*BasePipelineController[PipelineType]) HandleStepUpdated

func (c *BasePipelineController[PipelineType]) HandleStepUpdated(
	ctx context.Context,
	evt event.UpdateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a step watch to handle updated steps.

This handler will look at the underlying resources of the step and check if they are ready. It will then re-evaluate all pipelines depending on the step.

func (*BasePipelineController[PipelineType]) InitAllPipelines

func (c *BasePipelineController[PipelineType]) InitAllPipelines(ctx context.Context) error

Handle the startup of the manager by initializing the pipeline map.

type BasePipelineControllerDelegate

type BasePipelineControllerDelegate[PipelineType any] interface {
	// Initialize a new pipeline with the given steps.
	//
	// This method is delegated to the parent controller, when a pipeline needs
	// to be newly initialized or re-initialized to update it in the pipeline
	// map.
	InitPipeline(steps []v1alpha1.Step) (PipelineType, error)
}

The base pipeline controller will delegate some methods to the parent controller struct. The parent controller only needs to conform to this interface and set the delegate field accordingly.
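
The delegate wiring can be sketched with reduced stand-in types. Everything here is an assumption made for illustration: step replaces v1alpha1.Step, baseController keeps only the Pipelines and Delegate fields, and initPipeline, stepNames, and newParentController are hypothetical names.

```go
package main

import "fmt"

// step is a local stand-in for v1alpha1.Step.
type step struct{ Name string }

// delegate mirrors the shape of BasePipelineControllerDelegate.
type delegate[PipelineType any] interface {
	InitPipeline(steps []step) (PipelineType, error)
}

// baseController is a reduced stand-in for BasePipelineController.
type baseController[PipelineType any] struct {
	Pipelines map[string]PipelineType
	Delegate  delegate[PipelineType]
}

// initPipeline is a hypothetical helper: it asks the delegate to build the
// pipeline and stores the result in the pipeline map under the given name.
func (c *baseController[PipelineType]) initPipeline(name string, steps []step) error {
	p, err := c.Delegate.InitPipeline(steps)
	if err != nil {
		return fmt.Errorf("init pipeline %q: %w", name, err)
	}
	c.Pipelines[name] = p
	return nil
}

// stepNames is a toy pipeline type: just the ordered step names.
type stepNames []string

// parentController embeds the base controller and acts as its delegate.
type parentController struct {
	baseController[stepNames]
}

// InitPipeline satisfies the delegate interface for the toy pipeline type.
func (p *parentController) InitPipeline(steps []step) (stepNames, error) {
	names := make(stepNames, 0, len(steps))
	for _, s := range steps {
		names = append(names, s.Name)
	}
	return names, nil
}

// newParentController wires the parent in as the delegate of its own base.
func newParentController() *parentController {
	p := &parentController{}
	p.Pipelines = map[string]stepNames{}
	p.Delegate = p
	return p
}

func main() {
	p := newParentController()
	if err := p.initPipeline("default", []step{{Name: "a"}, {Name: "b"}}); err != nil {
		panic(err)
	}
	fmt.Println(p.Pipelines["default"])
}
```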

type BaseStep

type BaseStep[RequestType PipelineRequest, Opts StepOpts] struct {
	// Options to pass via yaml to this step.
	conf.JsonOpts[Opts]
	// The activation function to use.
	ActivationFunction
	// Database connection.
	DB db.DB
	// The alias of this step, if any.
	Alias string
}

Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.

func (*BaseStep[RequestType, Opts]) Init

func (s *BaseStep[RequestType, Opts]) Init(db db.DB, opts conf.RawOpts) error

Init the step with the database and options.

func (*BaseStep[RequestType, Opts]) PrepareResult

func (s *BaseStep[RequestType, Opts]) PrepareResult(request RequestType) *StepResult

Get a default result (no action) for the input weight keys given in the request.

func (*BaseStep[RequestType, Opts]) PrepareStats

func (s *BaseStep[RequestType, Opts]) PrepareStats(request PipelineRequest, unit string) StepStatistics

Get default statistics for the input weight keys given in the request.
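
The two Prepare helpers can be pictured as follows. This is a sketch under assumptions, not the package's implementation: the types are local stand-ins, the helpers take the weights map directly instead of a request, and 0 is assumed to be the activation that means "no action".

```go
package main

import "fmt"

// Local stand-ins for lib.StepStatistics and lib.StepResult.
type stepStatistics struct {
	Unit     string
	Subjects map[string]float64
}

type stepResult struct {
	Activations map[string]float64
	Statistics  map[string]stepStatistics
}

// prepareResult returns a result with one activation per weight key.
// Assumption: an activation of 0 leaves the subject's weight untouched.
func prepareResult(weights map[string]float64) *stepResult {
	activations := make(map[string]float64, len(weights))
	for subject := range weights {
		activations[subject] = 0
	}
	return &stepResult{
		Activations: activations,
		Statistics:  map[string]stepStatistics{},
	}
}

// prepareStats returns empty statistics sized for the given weight keys.
func prepareStats(weights map[string]float64, unit string) stepStatistics {
	return stepStatistics{
		Unit:     unit,
		Subjects: make(map[string]float64, len(weights)),
	}
}

func main() {
	weights := map[string]float64{"host1": 1.0, "host2": 0.5}
	result := prepareResult(weights)
	stats := prepareStats(weights, "cpu contention [%]")
	stats.Subjects["host1"] = 10
	result.Statistics["max cpu contention"] = stats
	fmt.Println(len(result.Activations), result.Statistics["max cpu contention"].Unit)
}
```

Starting from a default result means a step only has to overwrite the activations of the subjects it actually wants to influence.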

type EmptyStepOpts

type EmptyStepOpts struct{}

Empty step opts conforming to the StepOpts interface (validation always succeeds).

func (EmptyStepOpts) Validate

func (EmptyStepOpts) Validate() error

type MonitoredCallback

type MonitoredCallback struct {
	// contains filtered or unexported fields
}

Helper to respond to the request with the given code and error. Adds monitoring for the time it took to handle the request.

func (MonitoredCallback) Respond

func (c MonitoredCallback) Respond(code int, err error, text string)

Respond to the request with the given code and error. Also log the time it took to handle the request.
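
A callback of this kind can be sketched with the standard library alone. The struct fields, the observe hook, and the exact response behavior below are guesses made for illustration; the real type keeps its fields unexported and reports to a Prometheus histogram. respondOnce and errBoom are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// errBoom is a sample error used by the demonstration below.
var errBoom = errors.New("boom")

// monitoredCallback is a local stand-in for lib.MonitoredCallback.
type monitoredCallback struct {
	w       http.ResponseWriter
	start   time.Time
	observe func(code int, elapsed time.Duration)
}

// Respond records how long the request took, then writes the response.
// Assumption: errors are reported with http.Error using the given text.
func (c monitoredCallback) Respond(code int, err error, text string) {
	c.observe(code, time.Since(c.start))
	if err != nil {
		http.Error(c.w, text, code)
		return
	}
	c.w.WriteHeader(code)
}

// respondOnce runs one Respond call against an in-memory recorder and
// returns the written status code and the code seen by the observer.
func respondOnce(code int, err error, text string) (status, observed int) {
	rec := httptest.NewRecorder()
	cb := monitoredCallback{
		w:       rec,
		start:   time.Now(),
		observe: func(c int, _ time.Duration) { observed = c },
	}
	cb.Respond(code, err, text)
	return rec.Code, observed
}

func main() {
	status, observed := respondOnce(http.StatusInternalServerError, errBoom, "pipeline failed")
	fmt.Println(status, observed)
}
```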

type Pipeline

type Pipeline[RequestType PipelineRequest] interface {
	// Run the scheduling pipeline with the given request.
	Run(request RequestType) (v1alpha1.DecisionResult, error)
}

func NewPipeline

func NewPipeline[RequestType PipelineRequest](
	supportedSteps map[string]func() Step[RequestType],
	confedSteps []v1alpha1.Step,
	stepWrappers []StepWrapper[RequestType],
	database db.DB,
	monitor PipelineMonitor,
) (Pipeline[RequestType], error)

Create a new pipeline with steps contained in the configuration.

type PipelineMonitor

type PipelineMonitor struct {
	// The pipeline name is used to differentiate between different pipelines.
	PipelineName string
	// contains filtered or unexported fields
}

Collection of Prometheus metrics to monitor the scheduler pipeline.

func NewPipelineMonitor

func NewPipelineMonitor(registry *monitoring.Registry) PipelineMonitor

Create a new pipeline monitor and register the necessary Prometheus metrics.

func (PipelineMonitor) SubPipeline

func (m PipelineMonitor) SubPipeline(name string) PipelineMonitor

Get a copied pipeline monitor with the name set, after binding the metrics.

type PipelineRequest

type PipelineRequest interface {
	// Get the subjects that went into the pipeline.
	GetSubjects() []string
	// Get the weights for the subjects.
	GetWeights() map[string]float64
	// Get logging args to be used in the step's trace log.
	// Usually, this will be the request context including the request ID.
	GetTraceLogArgs() []slog.Attr
}
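
A request type satisfying this interface might look like the sketch below. The interface is repeated locally so the file compiles on its own; hostRequest and its fields are hypothetical and only serve as an example.

```go
package main

import (
	"fmt"
	"log/slog"
)

// pipelineRequest repeats the PipelineRequest method set locally.
type pipelineRequest interface {
	GetSubjects() []string
	GetWeights() map[string]float64
	GetTraceLogArgs() []slog.Attr
}

// hostRequest is a hypothetical request carrying candidate hosts.
type hostRequest struct {
	RequestID string
	Hosts     []string
	Weights   map[string]float64
}

func (r hostRequest) GetSubjects() []string          { return r.Hosts }
func (r hostRequest) GetWeights() map[string]float64 { return r.Weights }

// GetTraceLogArgs attaches the request ID so step logs can be correlated.
func (r hostRequest) GetTraceLogArgs() []slog.Attr {
	return []slog.Attr{slog.String("requestID", r.RequestID)}
}

// Compile-time check that hostRequest satisfies the interface.
var _ pipelineRequest = hostRequest{}

func main() {
	r := hostRequest{
		RequestID: "req-1",
		Hosts:     []string{"host1", "host2"},
		Weights:   map[string]float64{"host1": 1, "host2": 0.5},
	}
	fmt.Println(r.GetSubjects(), r.GetTraceLogArgs())
}
```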

type Premodifier

type Premodifier[RequestType PipelineRequest] interface {
	// Modify the request before it is sent to the pipeline.
	ModifyRequest(request *RequestType) error
}

type Step

type Step[RequestType PipelineRequest] interface {
	// Configure the step with a database and options.
	Init(db db.DB, opts conf.RawOpts) error
	// Run this step of the scheduling pipeline.
	// Return a map of keys to activation values. Important: keys that are
	// not in the map are considered as filtered out.
	// Provide a traceLog that contains the global request id and should
	// be used to log the step's execution.
	Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)
	// Get the name of this step.
	// The name is used to identify the step in metrics, config, logs, and more.
	// Should be something like: "my_cool_scheduler_step".
	GetName() string
}

Interface for a scheduler step.
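
A step that filters out a host can be sketched like this. The sketch makes several assumptions: the interface is reduced to Run and GetName (Init is omitted because db.DB and conf.RawOpts live in other packages), stepResult and hostRequest are local stand-ins, avoidHost and discardLogger are hypothetical, and 0 is assumed to be the activation with no effect.

```go
package main

import (
	"fmt"
	"io"
	"log/slog"
)

// stepResult is a reduced stand-in for lib.StepResult.
type stepResult struct {
	Activations map[string]float64
}

// hostRequest is a hypothetical request type.
type hostRequest struct {
	Hosts []string
}

// step mirrors the Run and GetName methods of Step.
type step[RequestType any] interface {
	Run(traceLog *slog.Logger, request RequestType) (*stepResult, error)
	GetName() string
}

// avoidHost is a hypothetical step that filters out a single host.
type avoidHost struct{ Host string }

func (s avoidHost) GetName() string { return "avoid_host" }

// Run leaves the unwanted host out of the activations map, which marks it
// as filtered out, and gives every other host a neutral activation.
func (s avoidHost) Run(traceLog *slog.Logger, request hostRequest) (*stepResult, error) {
	result := &stepResult{Activations: map[string]float64{}}
	for _, host := range request.Hosts {
		if host == s.Host {
			traceLog.Info("filtering out host", "host", host)
			continue
		}
		result.Activations[host] = 0 // assumption: 0 means "no effect"
	}
	return result, nil
}

// Compile-time check that avoidHost satisfies the reduced interface.
var _ step[hostRequest] = avoidHost{}

// discardLogger returns a logger that drops all output.
func discardLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(io.Discard, nil))
}

func main() {
	s := avoidHost{Host: "host2"}
	result, err := s.Run(discardLogger(), hostRequest{Hosts: []string{"host1", "host2"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(s.GetName(), result.Activations)
}
```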

type StepMonitor

type StepMonitor[RequestType PipelineRequest] struct {
	// Mixin that can be embedded in a step to provide some activation function tooling.
	ActivationFunction

	// The wrapped scheduler step to monitor.
	Step Step[RequestType]
	// contains filtered or unexported fields
}

Wraps a scheduler step to monitor its execution.

func MonitorStep

func MonitorStep[RequestType PipelineRequest](step Step[RequestType], m PipelineMonitor) *StepMonitor[RequestType]

Wrap the given step in a StepMonitor that runs the step and measures the time it takes, reporting to the given pipeline monitor.

func (*StepMonitor[RequestType]) GetName

func (s *StepMonitor[RequestType]) GetName() string

Get the name of the wrapped step.

func (*StepMonitor[RequestType]) Init

func (s *StepMonitor[RequestType]) Init(db db.DB, opts conf.RawOpts) error

Initialize the wrapped step with the database and options.

func (*StepMonitor[RequestType]) Run

func (s *StepMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)

Run the step and observe its execution.
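
The monitoring wrapper follows the decorator pattern, which can be sketched without Prometheus. The simplified step interface, the observe hook, and the recorder, timedStep, and noop types are all hypothetical; the real StepMonitor has a different Run signature and reports to the pipeline monitor's metrics.

```go
package main

import (
	"fmt"
	"time"
)

// step is a simplified stand-in for lib.Step.
type step interface {
	GetName() string
	Run(hosts []string) (map[string]float64, error)
}

// observation is one recorded step execution.
type observation struct {
	Name    string
	Elapsed time.Duration
}

// recorder collects observations in memory instead of a metrics backend.
type recorder struct{ seen []observation }

func (r *recorder) observe(name string, elapsed time.Duration) {
	r.seen = append(r.seen, observation{Name: name, Elapsed: elapsed})
}

// timedStep decorates a step and reports how long each Run took.
type timedStep struct {
	step
	observe func(name string, elapsed time.Duration)
}

// Run delegates to the wrapped step and observes the elapsed time.
func (t timedStep) Run(hosts []string) (map[string]float64, error) {
	start := time.Now()
	activations, err := t.step.Run(hosts)
	t.observe(t.GetName(), time.Since(start))
	return activations, err
}

// noop is a trivial step that gives every host a zero activation.
type noop struct{}

func (noop) GetName() string { return "noop" }

func (noop) Run(hosts []string) (map[string]float64, error) {
	out := make(map[string]float64, len(hosts))
	for _, h := range hosts {
		out[h] = 0
	}
	return out, nil
}

func main() {
	rec := &recorder{}
	s := timedStep{step: noop{}, observe: rec.observe}
	activations, err := s.Run([]string{"host1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(activations, rec.seen[0].Name)
}
```

Because the wrapper embeds the step, GetName is forwarded unchanged and callers can treat the monitored step like any other.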

type StepOpts

type StepOpts interface {
	// Validate the options for this step.
	Validate() error
}

Interface to which step options must conform.
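
An options struct conforming to this interface might look like the sketch below. scaleOpts, its fields, and its validation rule are hypothetical; the interface is repeated locally so the file compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// stepOpts repeats the StepOpts method set locally.
type stepOpts interface {
	Validate() error
}

// scaleOpts is a hypothetical options struct for a scaling step.
type scaleOpts struct {
	LowerBound float64 `json:"lowerBound"`
	UpperBound float64 `json:"upperBound"`
}

// Validate rejects empty or inverted ranges before the step runs.
func (o scaleOpts) Validate() error {
	if o.LowerBound >= o.UpperBound {
		return errors.New("lowerBound must be smaller than upperBound")
	}
	return nil
}

// Compile-time check that scaleOpts satisfies the interface.
var _ stepOpts = scaleOpts{}

func main() {
	fmt.Println(scaleOpts{LowerBound: 0, UpperBound: 100}.Validate())
	fmt.Println(scaleOpts{LowerBound: 5, UpperBound: 5}.Validate())
}
```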

type StepResult

type StepResult struct {
	// The activations calculated by this step.
	Activations map[string]float64

	// Step statistics like:
	//
	//	{
	//	  "max cpu contention": {
	//	     "unit": "cpu contention [%]",
	//	     "hosts": { "host 1": 10, "host 2": 10 }
	//	   },
	//	  "noisy projects": {
	//	     "unit": "vms of this project running on host [#]",
	//	     "hosts": { "host 1": 1, "host 2": 0 }
	//	   }
	//	}
	//
	// These statistics are used to display the step's effect on the hosts.
	// For example: max cpu contention: before [ 100%, 50%, 40% ], after [ 40%, 50%, 100% ]
	Statistics map[string]StepStatistics
}

type StepStatistics

type StepStatistics struct {
	// The unit of the statistic.
	Unit string
	// The subjects and their values.
	Subjects map[string]float64
}

type StepValidator

type StepValidator[RequestType PipelineRequest] struct {
	// The wrapped step to validate.
	Step Step[RequestType]
	// By default, we execute all validations. However, through the config,
	// we can also disable some validations if necessary.
	DisabledValidations v1alpha1.DisabledValidationsSpec
}

Wrapper for scheduler steps that validates them before/after execution.

func ValidateStep

func ValidateStep[RequestType PipelineRequest](step Step[RequestType], disabledValidations v1alpha1.DisabledValidationsSpec) *StepValidator[RequestType]

Wrap the given step in a StepValidator that validates it before/after execution, skipping the given disabled validations.

func (*StepValidator[RequestType]) GetName

func (s *StepValidator[RequestType]) GetName() string

Get the name of the wrapped step.

func (*StepValidator[RequestType]) Init

func (s *StepValidator[RequestType]) Init(db db.DB, opts libconf.RawOpts) error

Initialize the wrapped step with the database and options.

func (*StepValidator[RequestType]) Run

func (s *StepValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*StepResult, error)

Run the step and validate what happens.

type StepWrapper

type StepWrapper[RequestType PipelineRequest] func(Step[RequestType], v1alpha1.Step) (Step[RequestType], error)
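
A list of step wrappers can be applied by folding over it, as in the sketch below. The order of application and all names here are assumptions: step and stepConf are reduced stand-ins for Step and v1alpha1.Step, and tagged, tagWith, tags, and wrapAll are hypothetical helpers.

```go
package main

import "fmt"

// step is a reduced stand-in for lib.Step.
type step interface{ GetName() string }

// stepConf is a local stand-in for v1alpha1.Step.
type stepConf struct{ Name string }

// stepWrapper mirrors the shape of StepWrapper without generics.
type stepWrapper func(step, stepConf) (step, error)

// namedStep is a trivial step used for the demonstration.
type namedStep struct{ name string }

func (s namedStep) GetName() string { return s.name }

// tagged is a hypothetical decorator that remembers which wrapper made it.
type tagged struct {
	step
	tag string
}

// tagWith returns a wrapper that decorates a step with the given tag.
func tagWith(tag string) stepWrapper {
	return func(s step, _ stepConf) (step, error) {
		return tagged{step: s, tag: tag}, nil
	}
}

// tags lists the applied tags from the outermost decorator inwards.
func tags(s step) []string {
	var out []string
	for {
		t, ok := s.(tagged)
		if !ok {
			return out
		}
		out = append(out, t.tag)
		s = t.step
	}
}

// wrapAll applies the wrappers in order, so the last one ends up outermost.
func wrapAll(s step, conf stepConf, wrappers []stepWrapper) (step, error) {
	for _, wrap := range wrappers {
		wrapped, err := wrap(s, conf)
		if err != nil {
			return nil, fmt.Errorf("wrap step %q: %w", conf.Name, err)
		}
		s = wrapped
	}
	return s, nil
}

func main() {
	wrappers := []stepWrapper{tagWith("validate"), tagWith("monitor")}
	s, err := wrapAll(namedStep{name: "my_step"}, stepConf{Name: "my_step"}, wrappers)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.GetName(), tags(s))
}
```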
