lib

package
v0.0.0-...-0e8ba2f Latest
Warning

This package is not in the latest version of its module.

Published: Apr 3, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	// This error is returned from the step at any time when the step should be skipped.
	ErrStepSkipped = errors.New("step skipped")
)
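A sentinel error like this is normally checked with errors.Is so that wrapped errors still match. The following is a minimal sketch of that pattern. It declares its own errStepSkipped so it compiles without the package, and runStep and classify are hypothetical helpers, not part of this API.

```go
package main

import (
	"errors"
	"fmt"
)

// errStepSkipped stands in for the package's ErrStepSkipped sentinel so that
// this sketch compiles on its own.
var errStepSkipped = errors.New("step skipped")

// runStep is a hypothetical step that skips itself when there are no hosts.
// It wraps the sentinel to add context while keeping it detectable.
func runStep(hosts []string) (int, error) {
	if len(hosts) == 0 {
		return 0, fmt.Errorf("no hosts in request: %w", errStepSkipped)
	}
	return len(hosts), nil
}

// classify shows how a caller might tell a skip apart from a real failure.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errStepSkipped):
		return "skipped"
	default:
		return "failed"
	}
}

func main() {
	_, err := runStep(nil)
	fmt.Println(classify(err)) // prints "skipped"
	_, err = runStep([]string{"host-1"})
	fmt.Println(classify(err)) // prints "ok"
}
```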

Functions

func MinMaxScale

func MinMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64

Min-max scale a value between lower and upper bounds and apply the given activation. Note: the resulting value is clamped between the activation bounds.
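The description above can be read as a linear map from the input range onto the activation range, followed by a clamp. The sketch below is an illustrative re-implementation under that reading, not the package source. The handling of a zero-width input range is an assumption.

```go
package main

import "fmt"

// minMaxScale is an illustrative re-implementation, not the package source.
func minMaxScale(value, lowerBound, upperBound, activationLowerBound, activationUpperBound float64) float64 {
	// Assumption: a zero-width input range maps to the lower activation bound.
	if upperBound == lowerBound {
		return activationLowerBound
	}
	// Position of value within [lowerBound, upperBound] as a fraction.
	normalized := (value - lowerBound) / (upperBound - lowerBound)
	// Map that fraction linearly onto the activation range.
	scaled := activationLowerBound + normalized*(activationUpperBound-activationLowerBound)
	// Clamp to the activation bounds, whichever order they are given in.
	lo, hi := activationLowerBound, activationUpperBound
	if lo > hi {
		lo, hi = hi, lo
	}
	if scaled < lo {
		return lo
	}
	if scaled > hi {
		return hi
	}
	return scaled
}

func main() {
	fmt.Println(minMaxScale(50, 0, 100, 0, 1))  // 0.5
	fmt.Println(minMaxScale(150, 0, 100, 0, 1)) // 1, clamped to the upper activation bound
	fmt.Println(minMaxScale(25, 0, 100, -1, 1)) // -0.5
}
```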

Types

type APIMonitor

type APIMonitor struct {
	// A histogram to measure how long the API requests take to run.
	ApiRequestsTimer *prometheus.HistogramVec
}

Collection of Prometheus metrics to monitor API requests.

func NewSchedulerMonitor

func NewSchedulerMonitor() APIMonitor

Create a new scheduler monitor and register the necessary Prometheus metrics.

func (*APIMonitor) Callback

func (m *APIMonitor) Callback(w http.ResponseWriter, r *http.Request, pattern string) MonitoredCallback

func (*APIMonitor) Collect

func (m *APIMonitor) Collect(ch chan<- prometheus.Metric)

func (*APIMonitor) Describe

func (m *APIMonitor) Describe(ch chan<- *prometheus.Desc)

type ActivationFunction

type ActivationFunction struct{}

Mixin that can be embedded in a step to provide some activation function tooling.

func (*ActivationFunction) Apply

func (m *ActivationFunction) Apply(in, activations map[string]float64, multiplier float64) map[string]float64

Apply the activation function to the weights map. All hosts that are not in the activations map are removed.

func (*ActivationFunction) NoEffect

func (m *ActivationFunction) NoEffect() float64

Get activations that will have no effect on the host.

func (*ActivationFunction) Norm

func (m *ActivationFunction) Norm(activation float64) float64

Normalize a single value using the activation function.
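This page does not say which activation function the mixin uses. The sketch below assumes tanh purely for illustration, with 0 as the "no effect" activation, to show how Apply, NoEffect, and Norm could fit together. All three lowercase functions are stand-ins written for this example.

```go
package main

import (
	"fmt"
	"math"
)

// norm squashes a raw activation into (-1, 1).
// Assumption: tanh; the real activation function is not documented here.
func norm(activation float64) float64 { return math.Tanh(activation) }

// noEffect is the activation that leaves a weight unchanged, since tanh(0) == 0.
func noEffect() float64 { return 0 }

// apply adds multiplier * norm(activation) to each incoming weight.
// Hosts that are missing from activations are dropped from the output.
func apply(in, activations map[string]float64, multiplier float64) map[string]float64 {
	out := make(map[string]float64, len(activations))
	for host, weight := range in {
		activation, ok := activations[host]
		if !ok {
			continue // Not in the activations map: the host is removed.
		}
		out[host] = weight + multiplier*norm(activation)
	}
	return out
}

func main() {
	in := map[string]float64{"host-1": 1, "host-2": 2, "host-3": 3}
	activations := map[string]float64{"host-1": noEffect(), "host-2": 0.5}
	out := apply(in, activations, 1)
	// host-3 is gone and host-1 keeps its original weight.
	fmt.Println(len(out), out["host-1"])
}
```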

type BaseDetector

type BaseDetector[Opts DetectionStepOpts] struct {
	// Options to pass via yaml to this step.
	Options Opts
	// The kubernetes client to use.
	Client client.Client
}

Common base for all descheduler steps that provides some functionality that would otherwise be duplicated across all steps.

func (*BaseDetector[Opts]) CheckKnowledges

func (d *BaseDetector[Opts]) CheckKnowledges(ctx context.Context, kns ...corev1.ObjectReference) error

Check if all knowledges are ready, and if not, return an error indicating why not.

func (*BaseDetector[Opts]) Init

func (d *BaseDetector[Opts]) Init(ctx context.Context, client client.Client, step v1alpha1.DetectorSpec) error

Init the step.

func (*BaseDetector[Opts]) Validate

func (d *BaseDetector[Opts]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate that the given config is valid for this step. This is used in the pipeline validation to check if the pipeline configuration is valid without actually initializing the step.

type BaseFilter

type BaseFilter[RequestType FilterWeigherPipelineRequest, Opts FilterWeigherPipelineStepOpts] struct {
	BaseFilterWeigherPipelineStep[RequestType, Opts]
}

Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.

func (*BaseFilter[RequestType, Opts]) Init

func (s *BaseFilter[RequestType, Opts]) Init(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error

Init the filter with the database and options.

type BaseFilterWeigherPipelineStep

type BaseFilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest, Opts FilterWeigherPipelineStepOpts] struct {
	// Options to pass via yaml to this step.
	Options Opts
	// The activation function to use.
	ActivationFunction
	// The kubernetes client to use.
	Client client.Client
}

Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.

func (*BaseFilterWeigherPipelineStep[RequestType, Opts]) CheckKnowledges

func (d *BaseFilterWeigherPipelineStep[RequestType, Opts]) CheckKnowledges(ctx context.Context, kns ...corev1.ObjectReference) error

Check if all knowledges are ready, and if not, return an error indicating why not.

func (*BaseFilterWeigherPipelineStep[RequestType, Opts]) IncludeAllHostsFromRequest

func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) IncludeAllHostsFromRequest(request RequestType) *FilterWeigherPipelineStepResult

Get a default result (no action) for the input weight keys given in the request. Use this to initialize the result before applying filtering/weighing logic.
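As a rough illustration of the "initialize, then modify" pattern described above, the sketch below builds a default result that keeps every host and then adjusts it. The stepResult and stepStatistics types are local stand-ins for the package types, and using 0 as the no-effect activation is an assumption.

```go
package main

import "fmt"

// stepStatistics and stepResult are local stand-ins for
// FilterWeigherPipelineStepStatistics and FilterWeigherPipelineStepResult.
type stepStatistics struct {
	Unit  string
	Hosts map[string]float64
}

type stepResult struct {
	Activations map[string]float64
	Statistics  map[string]stepStatistics
}

// includeAllHosts returns a result that keeps every host.
// Assumption: an activation of 0 means "no effect".
func includeAllHosts(hosts []string) *stepResult {
	activations := make(map[string]float64, len(hosts))
	for _, host := range hosts {
		activations[host] = 0
	}
	return &stepResult{
		Activations: activations,
		Statistics:  map[string]stepStatistics{},
	}
}

func main() {
	result := includeAllHosts([]string{"host-1", "host-2", "host-3"})
	// A filter would remove hosts from the default result.
	delete(result.Activations, "host-2")
	// A weigher would overwrite activation values instead.
	result.Activations["host-3"] = 0.5
	fmt.Println(len(result.Activations)) // 2
}
```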

func (*BaseFilterWeigherPipelineStep[RequestType, Opts]) Init

func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) Init(ctx context.Context, client client.Client, params v1alpha1.Parameters) error

Init the step with the database and options.

func (*BaseFilterWeigherPipelineStep[RequestType, Opts]) PrepareStats

func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) PrepareStats(request RequestType, unit string) FilterWeigherPipelineStepStatistics

Get default statistics for the input weight keys given in the request.

func (*BaseFilterWeigherPipelineStep[RequestType, Opts]) Validate

func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate that the given config is valid for this step. This is used in the pipeline validation to check if the pipeline configuration is valid without actually initializing the step.

type BasePipelineController

type BasePipelineController[PipelineType any] struct {
	// Initialized pipelines by their name.
	Pipelines map[string]PipelineType
	// The configured pipelines by their name.
	PipelineConfigs map[string]v1alpha1.Pipeline
	// Delegate to create pipelines.
	Initializer PipelineInitializer[PipelineType]
	// Kubernetes client to manage/fetch resources.
	client.Client
	// The scheduling domain to scope resources to.
	SchedulingDomain v1alpha1.SchedulingDomain
	// Manager for creating, updating, and deleting History CRDs.
	HistoryManager HistoryClient
}

Base controller for decision pipelines.

func (*BasePipelineController[PipelineType]) HandleKnowledgeCreated

func (c *BasePipelineController[PipelineType]) HandleKnowledgeCreated(
	ctx context.Context,
	evt event.CreateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a knowledge watch to handle created knowledges.

This handler will re-evaluate all pipelines depending on the knowledge.

func (*BasePipelineController[PipelineType]) HandleKnowledgeDeleted

func (c *BasePipelineController[PipelineType]) HandleKnowledgeDeleted(
	ctx context.Context,
	evt event.DeleteEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a knowledge watch to handle deleted knowledges.

This handler will re-evaluate all pipelines depending on the knowledge.

func (*BasePipelineController[PipelineType]) HandleKnowledgeUpdated

func (c *BasePipelineController[PipelineType]) HandleKnowledgeUpdated(
	ctx context.Context,
	evt event.UpdateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a knowledge watch to handle updated knowledges.

This handler will re-evaluate all pipelines depending on the knowledge.

func (*BasePipelineController[PipelineType]) HandlePipelineCreated

func (c *BasePipelineController[PipelineType]) HandlePipelineCreated(
	ctx context.Context,
	evt event.CreateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a pipeline watch to handle created pipelines.

This handler will initialize new pipelines as needed and put them into the pipeline map.

func (*BasePipelineController[PipelineType]) HandlePipelineDeleted

func (c *BasePipelineController[PipelineType]) HandlePipelineDeleted(
	ctx context.Context,
	evt event.DeleteEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a pipeline watch to handle deleted pipelines.

This handler will remove pipelines from the pipeline map.

func (*BasePipelineController[PipelineType]) HandlePipelineUpdated

func (c *BasePipelineController[PipelineType]) HandlePipelineUpdated(
	ctx context.Context,
	evt event.UpdateEvent,
	queue workqueue.TypedRateLimitingInterface[reconcile.Request],
)

Handler bound to a pipeline watch to handle updated pipelines.

This handler will initialize new pipelines as needed and put them into the pipeline map.

func (*BasePipelineController[PipelineType]) InitAllPipelines

func (c *BasePipelineController[PipelineType]) InitAllPipelines(ctx context.Context) error

Handle the startup of the manager by initializing the pipeline map.

type BaseWeigher

type BaseWeigher[RequestType FilterWeigherPipelineRequest, Opts FilterWeigherPipelineStepOpts] struct {
	BaseFilterWeigherPipelineStep[RequestType, Opts]
}

Common base for all steps that provides some functionality that would otherwise be duplicated across all steps.

func (*BaseWeigher[RequestType, Opts]) Init

func (s *BaseWeigher[RequestType, Opts]) Init(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error

Init the weigher with the database and options.

func (*BaseWeigher[RequestType, Opts]) Validate

func (s *BaseWeigher[RequestType, Opts]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate the weigher.

type Detection

type Detection interface {
	// Get the ID of the detected resource.
	GetResource() string
	// Get the host on which this resource is currently located.
	GetHost() string
	// Get the reason for the detection.
	GetReason() string
	// Set the reason for the detection.
	WithReason(reason string) Detection
}
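A minimal sketch of a type that satisfies this interface follows. The detection interface is redeclared locally so the example compiles on its own, and vmDetection is a hypothetical implementation. It uses a value receiver so that WithReason returns a modified copy and leaves the original untouched, which is one reasonable reading of the builder-style method name.

```go
package main

import "fmt"

// detection mirrors the Detection interface shown above.
type detection interface {
	GetResource() string
	GetHost() string
	GetReason() string
	WithReason(reason string) detection
}

// vmDetection is a hypothetical detection of a VM on a host.
type vmDetection struct {
	vmID, host, reason string
}

func (d vmDetection) GetResource() string { return d.vmID }
func (d vmDetection) GetHost() string     { return d.host }
func (d vmDetection) GetReason() string   { return d.reason }

// WithReason returns a modified copy. Because of the value receiver,
// the detection it was called on is left unchanged.
func (d vmDetection) WithReason(reason string) detection {
	d.reason = reason
	return d
}

func main() {
	original := vmDetection{vmID: "vm-1", host: "host-1"}
	updated := original.WithReason("host overloaded")
	fmt.Println(original.GetReason() == "", updated.GetReason())
}
```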

type DetectionStepOpts

type DetectionStepOpts interface {
	// Validate the options for this step.
	Validate() error
}

Interface to which step options must conform.
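Any struct with a Validate method satisfies this interface. The sketch below shows what a step's options might look like. highUsageOpts and its ThresholdPercent field are hypothetical, and stepOpts is a local copy of the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// stepOpts mirrors DetectionStepOpts so the sketch compiles on its own.
type stepOpts interface {
	Validate() error
}

// highUsageOpts is a hypothetical options struct for a detector step.
type highUsageOpts struct {
	// Usage in percent above which a host counts as overloaded.
	ThresholdPercent float64
}

// Validate rejects thresholds outside (0, 100].
func (o highUsageOpts) Validate() error {
	if o.ThresholdPercent <= 0 || o.ThresholdPercent > 100 {
		return errors.New("thresholdPercent must be in (0, 100]")
	}
	return nil
}

func main() {
	var opts stepOpts = highUsageOpts{ThresholdPercent: 120}
	fmt.Println(opts.Validate() != nil) // true: 120 is out of range
}
```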

type Detector

type Detector[DetectionType Detection] interface {
	// Detect resources such as VMs on their current hosts that should be
	// considered for descheduling.
	Run() ([]DetectionType, error)

	// Initialize the step.
	Init(ctx context.Context, client client.Client, step v1alpha1.DetectorSpec) error

	// Validate that the given config is valid for this step. This is used in
	// the pipeline validation to check if the pipeline configuration is valid
	// without actually initializing the step.
	Validate(ctx context.Context, params v1alpha1.Parameters) error
}

type DetectorCycleBreaker

type DetectorCycleBreaker[DetectionType Detection] interface {
	// Filter descheduling decisions to avoid cycles.
	Filter(ctx context.Context, decisions []DetectionType) ([]DetectionType, error)
}

type DetectorMonitor

type DetectorMonitor[DetectionType Detection] struct {
	// contains filtered or unexported fields
}

func (DetectorMonitor[DetectionType]) Init

func (m DetectorMonitor[DetectionType]) Init(
	ctx context.Context, client client.Client, step v1alpha1.DetectorSpec,
) error

Initialize the step with the database and options.

func (DetectorMonitor[DetectionType]) Run

func (m DetectorMonitor[DetectionType]) Run() ([]DetectionType, error)

Run the step and measure its execution time.

func (DetectorMonitor[DetectionType]) Validate

func (m DetectorMonitor[DetectionType]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate the wrapped step configuration.

type DetectorPipeline

type DetectorPipeline[DetectionType Detection] struct {
	// Kubernetes client to create descheduling resources.
	client.Client
	// Cycle detector to avoid cycles in descheduling.
	Breaker DetectorCycleBreaker[DetectionType]
	// Monitor to use for tracking the pipeline.
	Monitor DetectorPipelineMonitor
	// contains filtered or unexported fields
}

func (*DetectorPipeline[DetectionType]) Combine

func (p *DetectorPipeline[DetectionType]) Combine(decisionsByStep map[string][]DetectionType) []DetectionType

Combine the decisions made by each step into a single list of resources to deschedule.

func (*DetectorPipeline[DetectionType]) Init

func (p *DetectorPipeline[DetectionType]) Init(
	ctx context.Context,
	confedSteps []v1alpha1.DetectorSpec,
	supportedSteps map[string]Detector[DetectionType],
) (unknownDetectors []string, detectorErrs map[string]error)

func (*DetectorPipeline[DetectionType]) Run

func (p *DetectorPipeline[DetectionType]) Run() map[string][]DetectionType

Execute the descheduler steps in parallel and collect the decisions made by each step.
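One common way to run steps in parallel and collect per-step results is a goroutine per step with a mutex-guarded map. The sketch below shows that pattern with simplified stand-in types. Detections are plain strings here, and leaving a failing step out of the result map is an assumption about error handling, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBroken is an example error used by the demo detector below.
var errBroken = errors.New("detector failed")

// detector is a pared-down stand-in for Detector: only Run is modeled.
type detector interface {
	Run() ([]string, error)
}

// staticDetector is a hypothetical detector that returns fixed results.
type staticDetector struct {
	found []string
	err   error
}

func (d staticDetector) Run() ([]string, error) { return d.found, d.err }

// runAll runs every detector in its own goroutine and gathers the results
// by step name. Assumption: a failing step is left out of the result map.
func runAll(steps map[string]detector) map[string][]string {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[string][]string, len(steps))
	)
	for name, step := range steps {
		wg.Add(1)
		go func(name string, step detector) {
			defer wg.Done()
			found, err := step.Run()
			if err != nil {
				return
			}
			// The map is shared between goroutines, so guard the write.
			mu.Lock()
			defer mu.Unlock()
			results[name] = found
		}(name, step)
	}
	wg.Wait()
	return results
}

func main() {
	results := runAll(map[string]detector{
		"high-usage": staticDetector{found: []string{"vm-1", "vm-2"}},
		"broken":     staticDetector{err: errBroken},
	})
	fmt.Println(len(results), results["high-usage"])
}
```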

type DetectorPipelineMonitor

type DetectorPipelineMonitor struct {

	// The name of the pipeline being monitored.
	PipelineName string
	// contains filtered or unexported fields
}

func NewDetectorPipelineMonitor

func NewDetectorPipelineMonitor() DetectorPipelineMonitor

func (*DetectorPipelineMonitor) Collect

func (m *DetectorPipelineMonitor) Collect(ch chan<- prometheus.Metric)

func (*DetectorPipelineMonitor) Describe

func (m *DetectorPipelineMonitor) Describe(ch chan<- *prometheus.Desc)

func (DetectorPipelineMonitor) SubPipeline

Get a copied pipeline monitor with the name set, after binding the metrics.

type EmptyDetectionStepOpts

type EmptyDetectionStepOpts struct{}

Empty step opts conforming to the DetectionStepOpts interface (validation always succeeds).

func (EmptyDetectionStepOpts) Validate

func (EmptyDetectionStepOpts) Validate() error

type EmptyFilterWeigherPipelineStepOpts

type EmptyFilterWeigherPipelineStepOpts struct{}

Empty step opts conforming to the FilterWeigherPipelineStepOpts interface (validation always succeeds).

func (EmptyFilterWeigherPipelineStepOpts) Validate

type Filter

type Filter[RequestType FilterWeigherPipelineRequest] interface {
	FilterWeigherPipelineStep[RequestType]

	// Configure the filter and initialize things like a database connection.
	Init(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error

	// Validate the given config parameters for this filter.
	Validate(ctx context.Context, params v1alpha1.Parameters) error
}

Interface for a filter as part of the scheduling pipeline.

type FilterMonitor

type FilterMonitor[RequestType FilterWeigherPipelineRequest] struct {
	// contains filtered or unexported fields
}

Wraps a scheduler filter to monitor its execution.

func (*FilterMonitor[RequestType]) Init

func (fm *FilterMonitor[RequestType]) Init(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error

Initialize the wrapped filter.

func (*FilterMonitor[RequestType]) Run

func (fm *FilterMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)

Run the filter and observe its execution.

func (*FilterMonitor[RequestType]) Validate

func (fm *FilterMonitor[RequestType]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate the wrapped filter.

type FilterValidator

type FilterValidator[RequestType FilterWeigherPipelineRequest] struct {
	// The wrapped filter to validate.
	Filter Filter[RequestType]
}

Wrapper for scheduler steps that validates them before/after execution.

func (*FilterValidator[RequestType]) Init

func (s *FilterValidator[RequestType]) Init(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error

Initialize the wrapped filter with the database and options.

func (*FilterValidator[RequestType]) Run

func (s *FilterValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)

Run the filter and validate what happens.

func (*FilterValidator[RequestType]) Validate

func (s *FilterValidator[RequestType]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate the wrapped filter.

type FilterWeigherPipeline

type FilterWeigherPipeline[RequestType FilterWeigherPipelineRequest] interface {
	// Run the scheduling pipeline with the given request.
	Run(request RequestType) (v1alpha1.DecisionResult, error)
}

type FilterWeigherPipelineMonitor

type FilterWeigherPipelineMonitor struct {
	// The pipeline name is used to differentiate between different pipelines.
	PipelineName string
	// contains filtered or unexported fields
}

Collection of Prometheus metrics to monitor the scheduler pipeline.

func NewPipelineMonitor

func NewPipelineMonitor() FilterWeigherPipelineMonitor

Create a new scheduler monitor and register the necessary Prometheus metrics.

func (*FilterWeigherPipelineMonitor) Collect

func (m *FilterWeigherPipelineMonitor) Collect(ch chan<- prometheus.Metric)

func (*FilterWeigherPipelineMonitor) Describe

func (m *FilterWeigherPipelineMonitor) Describe(ch chan<- *prometheus.Desc)

func (FilterWeigherPipelineMonitor) SubPipeline

Get a copied pipeline monitor with the name set, after binding the metrics.

type FilterWeigherPipelineRequest

type FilterWeigherPipelineRequest interface {
	// Get the hosts that went into the pipeline.
	GetHosts() []string
	// This function can be used by the pipeline to obtain a mutated version
	// of the request with only the given hosts remaining. This is helpful
	// for steps that filter out hosts. Hosts not included in the map
	// are considered as filtered out, and won't be reconsidered in later steps.
	// This function should also update the weights of the remaining hosts
	// accordingly, so that the weights map always corresponds to the hosts
	// that are currently in the request.
	Filter(includedHosts map[string]float64) FilterWeigherPipelineRequest
	// Get the weights for the hosts.
	GetWeights() map[string]float64
	// Get logging args to be used in the step's trace log.
	// Usually, this will be the request context including the request ID.
	GetTraceLogArgs() []slog.Attr
}
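To make the Filter contract above more concrete, here is a sketch of a request type that implements the four methods. The interface is redeclared locally, vmRequest is hypothetical, and the sketch assumes that "updating the weights" means dropping the entries of removed hosts. The original request is never mutated.

```go
package main

import (
	"fmt"
	"log/slog"
)

// pipelineRequest mirrors FilterWeigherPipelineRequest.
type pipelineRequest interface {
	GetHosts() []string
	Filter(includedHosts map[string]float64) pipelineRequest
	GetWeights() map[string]float64
	GetTraceLogArgs() []slog.Attr
}

// vmRequest is a hypothetical request for placing a VM.
type vmRequest struct {
	requestID string
	hosts     []string
	weights   map[string]float64
}

func (r vmRequest) GetHosts() []string             { return r.hosts }
func (r vmRequest) GetWeights() map[string]float64 { return r.weights }
func (r vmRequest) GetTraceLogArgs() []slog.Attr {
	return []slog.Attr{slog.String("requestID", r.requestID)}
}

// Filter returns a copy that keeps only the included hosts, in their
// original order. Assumption: weights of removed hosts are simply dropped.
func (r vmRequest) Filter(includedHosts map[string]float64) pipelineRequest {
	out := vmRequest{requestID: r.requestID, weights: map[string]float64{}}
	for _, host := range r.hosts {
		if _, ok := includedHosts[host]; !ok {
			continue
		}
		out.hosts = append(out.hosts, host)
		out.weights[host] = r.weights[host]
	}
	return out
}

func main() {
	req := vmRequest{
		requestID: "req-1",
		hosts:     []string{"host-1", "host-2", "host-3"},
		weights:   map[string]float64{"host-1": 1, "host-2": 2, "host-3": 3},
	}
	filtered := req.Filter(map[string]float64{"host-1": 0, "host-3": 0})
	fmt.Println(filtered.GetHosts(), len(filtered.GetWeights()), len(req.GetHosts()))
}
```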

type FilterWeigherPipelineStep

type FilterWeigherPipelineStep[RequestType FilterWeigherPipelineRequest] interface {
	// Run this step in the scheduling pipeline.
	//
	// The request is immutable and modifications are stored in the result.
	// This allows steps to be run in parallel (e.g. weighers) without passing
	// mutable state around.
	//
	// All hosts that should not be filtered out must be included in the returned
	// map of activations. I.e., filters implementing this interface should
	// remove activations by omitting them from the returned map.
	//
	// Weighers implementing this interface should adjust activation
	// values in the returned map, including all hosts from the request.
	//
	// A traceLog is provided that contains the global request id and should
	// be used to log the step's execution.
	Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)
}

Steps can be chained together to form a scheduling pipeline.
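The difference between the two kinds of step can be sketched as follows: a filter leaves hosts out of the returned activations, while a weigher returns every host with an adjusted value. All types here are simplified local stand-ins, both steps are hypothetical, and using 0 as the no-effect activation is an assumption.

```go
package main

import (
	"fmt"
	"io"
	"log/slog"
)

// stepResult is a pared-down stand-in for FilterWeigherPipelineStepResult.
type stepResult struct {
	Activations map[string]float64
}

// request is a hypothetical request carrying hosts and their free RAM.
type request struct {
	hosts     []string
	freeRAMGB map[string]float64
}

// minRAMFilter drops hosts by omitting them from the activations map.
type minRAMFilter struct{ minGB float64 }

func (f minRAMFilter) Run(traceLog *slog.Logger, req request) (*stepResult, error) {
	result := &stepResult{Activations: map[string]float64{}}
	for _, host := range req.hosts {
		if req.freeRAMGB[host] < f.minGB {
			traceLog.Info("filtering out host", "host", host)
			continue
		}
		result.Activations[host] = 0 // Assumption: 0 means "no effect".
	}
	return result, nil
}

// freeRAMWeigher keeps every host and only adjusts activation values.
type freeRAMWeigher struct{ maxGB float64 }

func (w freeRAMWeigher) Run(traceLog *slog.Logger, req request) (*stepResult, error) {
	result := &stepResult{Activations: map[string]float64{}}
	for _, host := range req.hosts {
		result.Activations[host] = req.freeRAMGB[host] / w.maxGB
	}
	return result, nil
}

// discardLogger returns a logger that drops all output, for the demo.
func discardLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(io.Discard, nil))
}

func main() {
	req := request{
		hosts:     []string{"host-1", "host-2"},
		freeRAMGB: map[string]float64{"host-1": 4, "host-2": 16},
	}
	filtered, _ := minRAMFilter{minGB: 8}.Run(discardLogger(), req)
	weighed, _ := freeRAMWeigher{maxGB: 32}.Run(discardLogger(), req)
	fmt.Println(len(filtered.Activations), len(weighed.Activations))
}
```

Neither step touches the request itself, which is what allows steps such as weighers to run in parallel.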

type FilterWeigherPipelineStepMonitor

type FilterWeigherPipelineStepMonitor[RequestType FilterWeigherPipelineRequest] struct {
	// Mixin that can be embedded in a step to provide some activation function tooling.
	ActivationFunction
	// contains filtered or unexported fields
}

Wraps a scheduler step to monitor its execution.

func (*FilterWeigherPipelineStepMonitor[RequestType]) RunWrapped

func (s *FilterWeigherPipelineStepMonitor[RequestType]) RunWrapped(
	traceLog *slog.Logger,
	request RequestType,
	step FilterWeigherPipelineStep[RequestType],
) (*FilterWeigherPipelineStepResult, error)

Run the step and observe its execution.

type FilterWeigherPipelineStepOpts

type FilterWeigherPipelineStepOpts interface {
	// Validate the options for this step.
	Validate() error
}

Interface to which step options must conform.

type FilterWeigherPipelineStepResult

type FilterWeigherPipelineStepResult struct {
	// The activations calculated by this step.
	Activations map[string]float64

	// Step statistics like:
	//
	//	{
	//	  "max cpu contention": {
	//	     "unit": "cpu contention [%]",
	//	     "hosts": { "host 1": 10, "host 2": 10 }
	//	   },
	//	  "noisy projects": {
	//	     "unit": "vms of this project running on host [#]",
	//	     "hosts": { "host 1": 1, "host 2": 0 }
	//	   }
	//	}
	//
	// These statistics are used to display the step's effect on the hosts.
	// For example: max cpu contention: before [ 100%, 50%, 40% ], after [ 40%, 50%, 100% ]
	Statistics map[string]FilterWeigherPipelineStepStatistics
}

type FilterWeigherPipelineStepStatistics

type FilterWeigherPipelineStepStatistics struct {
	// The unit of the statistic.
	Unit string
	// The hosts and their values.
	Hosts map[string]float64
}

type HistoryClient

type HistoryClient struct {
	Client   client.Client
	Recorder events.EventRecorder
}

HistoryClient manages History CRDs for scheduling decisions. It holds the Kubernetes client and event recorder so callers don't have to pass them on every invocation.

func (*HistoryClient) CreateOrUpdateHistory

func (h *HistoryClient) CreateOrUpdateHistory(
	ctx context.Context,
	decision *v1alpha1.Decision,
	az *string,
	pipelineErr error,
) error

CreateOrUpdateHistory creates or updates a History CRD for the given decision. It is called after every pipeline run, whether the run succeeded or failed. The pipelineErr parameter is used to generate a meaningful explanation when the pipeline fails. If a non-nil Recorder is set, a Kubernetes Event is emitted on the History object to persist the scheduling decision for the short term.

func (*HistoryClient) Delete

func (h *HistoryClient) Delete(
	ctx context.Context,
	schedulingDomain v1alpha1.SchedulingDomain,
	resourceID string,
) error

Delete deletes the History CRD associated with the given scheduling domain and resource ID. It is a no-op if the History CRD does not exist.

type MonitoredCallback

type MonitoredCallback struct {
	// contains filtered or unexported fields
}

Helper to respond to the request with the given code and error. Adds monitoring for the time it took to handle the request.

func (MonitoredCallback) Respond

func (c MonitoredCallback) Respond(logger *slog.Logger, code int, err error, text string)

Respond to the request with the given code and error. Also log the time it took to handle the request.
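The general idea of a callback that responds and records the elapsed time can be sketched with the standard library alone. The version below is an assumption-laden simplification: it omits the logger argument, replaces the Prometheus histogram with a plain observe function, and every name in it is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// monitoredCallback remembers when handling started and where to report to.
type monitoredCallback struct {
	w       http.ResponseWriter
	start   time.Time
	observe func(code int, elapsed time.Duration)
}

// newCallback captures the start time at the beginning of the request.
func newCallback(w http.ResponseWriter, observe func(int, time.Duration)) monitoredCallback {
	return monitoredCallback{w: w, start: time.Now(), observe: observe}
}

// respond reports the elapsed time, then writes either an error response
// or a bare status code.
func (c monitoredCallback) respond(code int, err error, text string) {
	c.observe(code, time.Since(c.start))
	if err != nil {
		http.Error(c.w, text, code)
		return
	}
	c.w.WriteHeader(code)
}

// demo runs one failing request against an in-memory recorder and returns
// the HTTP status written and the status code passed to the observer.
func demo() (status, observed int) {
	rec := httptest.NewRecorder()
	cb := newCallback(rec, func(code int, _ time.Duration) { observed = code })
	cb.respond(http.StatusBadRequest, errors.New("bad input"), "invalid request body")
	return rec.Code, observed
}

func main() {
	status, observed := demo()
	fmt.Println(status, observed) // 400 400
}
```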

type PipelineAdmissionWebhook

type PipelineAdmissionWebhook struct {
	// The scheduling domain this webhook handles (e.g., nova, cinder, manila).
	SchedulingDomain v1alpha1.SchedulingDomain
	// ValidatableFilters maps filter names to validatable filter instances.
	ValidatableFilters map[string]Validatable
	// ValidatableWeighers maps weigher names to validatable weigher instances.
	ValidatableWeighers map[string]Validatable
	// ValidatableDetectors maps detector names to validatable detector instances.
	ValidatableDetectors map[string]Validatable
}

PipelineAdmissionWebhook validates Pipeline resources for a specific scheduling domain. It checks that all configured steps (filters, weighers, detectors) exist in the provided indexes and that their parameters are valid.
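The two checks described above (the step exists in the index, and its parameters are valid) can be sketched as a loop over the configured steps. The types below are simplified stand-ins: parameters replaces v1alpha1.Parameters, whose shape is not shown on this page, the context argument is omitted, and stepSpec and thresholdStep are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// parameters is a stand-in for v1alpha1.Parameters.
type parameters map[string]string

// validatable mirrors Validatable, minus the context argument.
type validatable interface {
	Validate(params parameters) error
}

// stepSpec is a hypothetical configured step: a name plus its parameters.
type stepSpec struct {
	Name   string
	Params parameters
}

// thresholdStep is a hypothetical step that requires a "threshold" parameter.
type thresholdStep struct{}

func (thresholdStep) Validate(params parameters) error {
	if params["threshold"] == "" {
		return errors.New("missing parameter: threshold")
	}
	return nil
}

// validateSteps checks each configured step against the index and collects
// every problem found instead of stopping at the first one.
func validateSteps(index map[string]validatable, steps []stepSpec) error {
	var errs []error
	for _, step := range steps {
		impl, ok := index[step.Name]
		if !ok {
			errs = append(errs, fmt.Errorf("unknown step %q", step.Name))
			continue
		}
		if err := impl.Validate(step.Params); err != nil {
			errs = append(errs, fmt.Errorf("step %q: %w", step.Name, err))
		}
	}
	// errors.Join returns nil when there is nothing to report.
	return errors.Join(errs...)
}

func main() {
	index := map[string]validatable{"threshold": thresholdStep{}}
	err := validateSteps(index, []stepSpec{
		{Name: "threshold", Params: parameters{"threshold": "0.8"}},
		{Name: "typo"},
	})
	fmt.Println(err)
}
```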

func (*PipelineAdmissionWebhook) SetupWebhookWithManager

func (w *PipelineAdmissionWebhook) SetupWebhookWithManager(mgr ctrl.Manager) error

SetupWebhookWithManager sets up the validating webhook for Pipeline resources.

func (*PipelineAdmissionWebhook) ValidateCreate

func (w *PipelineAdmissionWebhook) ValidateCreate(
	ctx context.Context,
	pipeline *v1alpha1.Pipeline,
) (admission.Warnings, error)

ValidateCreate implements admission.Validator.

func (*PipelineAdmissionWebhook) ValidateDelete

func (w *PipelineAdmissionWebhook) ValidateDelete(
	ctx context.Context,
	pipeline *v1alpha1.Pipeline,
) (admission.Warnings, error)

ValidateDelete implements admission.Validator.

func (*PipelineAdmissionWebhook) ValidateUpdate

func (w *PipelineAdmissionWebhook) ValidateUpdate(
	ctx context.Context,
	oldPipeline, newPipeline *v1alpha1.Pipeline,
) (admission.Warnings, error)

ValidateUpdate implements admission.Validator.

type PipelineInitResult

type PipelineInitResult[PipelineType any] struct {
	// The pipeline, if successfully created.
	Pipeline PipelineType

	// Errors for filters, if any, by their name.
	FilterErrors map[string]error
	// Unknown filters that were referenced but not found in the index, by their name.
	UnknownFilters []string
	// Errors for weighers, if any, by their name.
	WeigherErrors map[string]error
	// Unknown weighers that were referenced but not found in the index, by their name.
	UnknownWeighers []string
	// Errors for detectors, if any, by their name.
	DetectorErrors map[string]error
	// Unknown detectors that were referenced but not found in the index, by their name.
	UnknownDetectors []string
}

Result returned by the InitPipeline interface method.

func InitNewFilterWeigherPipeline

func InitNewFilterWeigherPipeline[RequestType FilterWeigherPipelineRequest](
	ctx context.Context,
	client client.Client,
	name string,
	supportedFilters map[string]func() Filter[RequestType],
	confedFilters []v1alpha1.FilterSpec,
	supportedWeighers map[string]func() Weigher[RequestType],
	confedWeighers []v1alpha1.WeigherSpec,
	monitor FilterWeigherPipelineMonitor,
) PipelineInitResult[FilterWeigherPipeline[RequestType]]

Create a new pipeline with filters and weighers contained in the configuration.

type PipelineInitializer

type PipelineInitializer[PipelineType any] interface {
	// Initialize a new pipeline with the given steps.
	//
	// This method is delegated to the parent controller, when a pipeline needs
	// to be newly initialized or re-initialized to update it in the pipeline
	// map.
	InitPipeline(ctx context.Context, p v1alpha1.Pipeline) PipelineInitResult[PipelineType]

	// Get the accepted pipeline type for this controller.
	//
	// This is used to filter pipelines when listing existing pipelines on
	// startup or when reacting to pipeline events.
	PipelineType() v1alpha1.PipelineType
}

The base pipeline controller will delegate some methods to the parent controller struct. The parent controller only needs to conform to this interface and set the delegate field accordingly.
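The delegate arrangement described above can be sketched as follows: a parent controller embeds the base controller, implements the initializer interface itself, and points the delegate field back at itself. Everything here is a simplified stand-in. The real interface takes a context and a v1alpha1.Pipeline and returns a PipelineInitResult, and novaController is hypothetical.

```go
package main

import "fmt"

// initializer is a pared-down stand-in for PipelineInitializer.
type initializer[P any] interface {
	InitPipeline(name string) (P, error)
}

// baseController is a pared-down stand-in for BasePipelineController.
type baseController[P any] struct {
	Pipelines   map[string]P
	Initializer initializer[P]
}

// handleCreated asks the delegate to build the pipeline and stores it
// in the pipeline map under its name.
func (c *baseController[P]) handleCreated(name string) error {
	p, err := c.Initializer.InitPipeline(name)
	if err != nil {
		return err
	}
	c.Pipelines[name] = p
	return nil
}

// novaController is a hypothetical parent controller. It embeds the base
// controller and implements the delegate interface itself.
type novaController struct {
	baseController[string]
}

func (c *novaController) InitPipeline(name string) (string, error) {
	return "pipeline:" + name, nil
}

func newNovaController() *novaController {
	c := &novaController{}
	c.Pipelines = map[string]string{}
	c.Initializer = c // Point the delegate field at the parent controller.
	return c
}

func main() {
	c := newNovaController()
	if err := c.handleCreated("default"); err != nil {
		panic(err)
	}
	fmt.Println(c.Pipelines["default"]) // prints "pipeline:default"
}
```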

type Validatable

type Validatable interface {
	// Validate checks if the given parameters are valid for this step.
	Validate(ctx context.Context, params v1alpha1.Parameters) error
}

Validatable is implemented by all pipeline steps (filters, weighers, detectors). It allows validation of step parameters without full initialization.

type Weigher

type Weigher[RequestType FilterWeigherPipelineRequest] interface {
	FilterWeigherPipelineStep[RequestType]

	// Configure the step and initialize things like a database connection.
	Init(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error

	// Validate the given config parameters for this weigher.
	Validate(ctx context.Context, params v1alpha1.Parameters) error
}

Interface for a weigher as part of the scheduling pipeline.

type WeigherMonitor

type WeigherMonitor[RequestType FilterWeigherPipelineRequest] struct {
	// contains filtered or unexported fields
}

Wraps a scheduler weigher to monitor its execution.

func (*WeigherMonitor[RequestType]) Init

func (wm *WeigherMonitor[RequestType]) Init(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error

Initialize the wrapped weigher.

func (*WeigherMonitor[RequestType]) Run

func (wm *WeigherMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)

Run the weigher and observe its execution.

func (*WeigherMonitor[RequestType]) Validate

func (wm *WeigherMonitor[RequestType]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate the wrapped weigher.

type WeigherValidator

type WeigherValidator[RequestType FilterWeigherPipelineRequest] struct {
	// The wrapped weigher to validate.
	Weigher Weigher[RequestType]
}

Wrapper for scheduler steps that validates them before/after execution.

func (*WeigherValidator[RequestType]) Init

func (s *WeigherValidator[RequestType]) Init(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error

Initialize the wrapped weigher with the database and options.

func (*WeigherValidator[RequestType]) Run

func (s *WeigherValidator[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error)

Run the weigher and validate what happens.

func (*WeigherValidator[RequestType]) Validate

func (s *WeigherValidator[RequestType]) Validate(ctx context.Context, params v1alpha1.Parameters) error

Validate the wrapped weigher.
