common

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2025 License: Apache-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// Fixed string identifiers in the Kaiwo domain.
//
// Finalizer is a finalizer name under kaiwo.silogen.ai. KaiwoQueueConfigName
// is the literal "kaiwo"; presumably the name of the KaiwoQueueConfig object
// that the controller looks up — TODO confirm against the callers.
const (
	Finalizer            = "kaiwo.silogen.ai/finalizer"
	KaiwoQueueConfigName = "kaiwo"
)
View Source
// Label keys and related string values.
//
// The Kaiwo*Label keys are all built by appending a suffix ("/user", "/name",
// "/type", "/run-id", "/managed") to KaiwoLabelBase. The remaining keys live
// in the kueue.x-k8s.io domain. CPUOnly is the literal "cpu-only"; where it is
// used is not visible in this listing.
//
// NOTE(review): WorkloaddPriorityClassLabel is spelled with a double "d".
// Renaming it would change the exported API, so the identifier is kept as is.
const (
	KaiwoLabelBase              = "kaiwo.silogen.ai"
	KaiwoUserLabel              = KaiwoLabelBase + "/user"
	KaiwoNameLabel              = KaiwoLabelBase + "/name"
	KaiwoTypeLabel              = KaiwoLabelBase + "/type"
	KaiwoRunIdLabel             = KaiwoLabelBase + "/run-id"
	KaiwoManagedLabel           = KaiwoLabelBase + "/managed"
	QueueLabel                  = "kueue.x-k8s.io/queue-name"
	WorkloaddPriorityClassLabel = "kueue.x-k8s.io/priority-class"
	CPUOnly                     = "cpu-only"
	KueueRequiredTopologyKey    = "kueue.x-k8s.io/podset-required-topology"
	KueuePreferredTopologyKey   = "kueue.x-k8s.io/podset-preferred-topology"
)
View Source
// Preemption-related condition strings.
//
// PreemptableConditionType is the plain string "Preemptable". The
// PreemptReason* constants are typed PreemptReason values; presumably they are
// used as the reason on a condition of that type — TODO confirm.
const (
	PreemptableConditionType                         string        = "Preemptable"
	PreemptReasonDurationNotExceeded                 PreemptReason = "DurationNotExceeded"
	PreemptReasonDurationExceeded                    PreemptReason = "DurationExceeded"
	PreemptReasonDurationExceededWithActiveGpuDemand PreemptReason = "DurationExceededWithActiveGpuDemand"
)
View Source
// Postfix strings for the two storage kinds, "data" and "hf".
// NOTE(review): presumably appended to a workload name to derive storage
// object names, with "hf" standing for Hugging Face — confirm against usage.
const (
	DataStoragePostfix = "data"
	HfStoragePostfix   = "hf"
)
View Source
// KaiwoDownloadTypeLabelValue is the literal "downloader". Judging by its
// name it is a value for the Kaiwo type label — TODO confirm.
const (
	KaiwoDownloadTypeLabelValue = "downloader"
)
View Source
// PvcReadyConditionType is the condition type string "PvcReady".
const PvcReadyConditionType = "PvcReady"
View Source
// WorkloadEarlyTerminationConditionType is the condition type string
// "WorkloadTerminatedEarly".
const WorkloadEarlyTerminationConditionType = "WorkloadTerminatedEarly"

Variables

View Source
// Default node label keys and the default topology name.
//
// These are declared as package-level variables rather than constants, so
// they can be reassigned at runtime. DefaultTopologyHostLabel is the only key
// outside the "kaiwo/" prefix; it is "kubernetes.io/hostname".
var (
	DefaultNodePoolLabel      = "kaiwo/nodepool"
	DefaultKaiwoWorkerLabel   = "kaiwo/worker"
	GPUModelLabel             = "kaiwo/gpu-model"
	DefaultTopologyBlockLabel = "kaiwo/topology-block"
	DefaultTopologyRackLabel  = "kaiwo/topology-rack"
	DefaultTopologyHostLabel  = "kubernetes.io/hostname"
	DefaultTopologyName       = "default-topology"
)
View Source
// DefaultMemory and DefaultCPU are resource quantities parsed from the
// literals "16Gi" and "2" when the package is initialised.
// NOTE(review): where these defaults are applied is not visible here.
var (
	DefaultMemory = resource.MustParse("16Gi")
	DefaultCPU    = resource.MustParse("2")
)
View Source
// DefaultClusterQueueName is set once, at package initialisation, from
// baseutils.GetEnv with the key DEFAULT_CLUSTER_QUEUE_NAME and "kaiwo" as the
// second argument (presumably the fallback when the variable is unset —
// TODO confirm against baseutils.GetEnv).
var DefaultClusterQueueName = baseutils.GetEnv("DEFAULT_CLUSTER_QUEUE_NAME", "kaiwo")
View Source
// DefaultRequeueDuration is a two-second duration. It is a variable, not a
// constant, so it can be overridden at runtime.
var DefaultRequeueDuration = 2 * time.Second

Functions

func AddEntrypoint added in v0.1.6

func AddEntrypoint(entrypoint string, podTemplateSpec *corev1.PodTemplateSpec) error

AddEntrypoint updates the entrypoint command in the PodTemplateSpec.

func AreAnyPodsRunning added in v0.1.6

func AreAnyPodsRunning(ctx context.Context, k8sClient client.Client, namespace string, matchingLabels client.MatchingLabels) (bool, error)

AreAnyPodsRunning checks if any pods are running that match a given namespace and labels

func CheckKaiwoWorkloadShouldBeTerminatedForUnderutilization added in v0.1.6

func CheckKaiwoWorkloadShouldBeTerminatedForUnderutilization(ctx context.Context, workload KaiwoWorkload) (bool, string)

CheckKaiwoWorkloadShouldBeTerminatedForUnderutilization checks if the Kaiwo workload should be terminated due to resource underutilization

func CleanupExpiredWorkloads added in v0.1.6

func CleanupExpiredWorkloads(ctx context.Context, k8sClient client.Client) (bool, error)

func ClusterHasGpuDemand added in v0.1.6

func ClusterHasGpuDemand(ctx context.Context, k8sClient client.Client, clusterQueue string, gpuVendor string, config KaiwoConfigContext) (bool, error)

func ConditionsEqual added in v0.1.6

func ConditionsEqual(a, b []metav1.Condition) bool

ConditionsEqual checks if two sets of conditions are the same, ignoring the LastTransitionTime

func CopyLabels

func CopyLabels(kaiwoLabels map[string]string, objectMeta *v1.ObjectMeta)

CopyLabels copies labels from kaiwoLabels to objectMeta.Labels, skipping keys that already exist

func CreateResourceRequirements added in v0.1.6

func CreateResourceRequirements(config KaiwoConfigContext, resourceConfig ResourceConfig, rayhead bool) corev1.ResourceRequirements

CreateResourceRequirements converts the scheduling config into ResourceRequirements that can be used to modify the workload containers

func DeleteUnderlyingResources added in v0.1.6

func DeleteUnderlyingResources(ctx context.Context, uid types.UID, name string, namespace string, k8sClient client.Client) error

DeleteUnderlyingResources deletes all the underlying resources that a workload owns

func EnsureConfig added in v0.1.6

func EnsureConfig(ctx context.Context, k8sClient client.Client) error

EnsureConfig ensures that a Kaiwo configuration exists within the cluster. It provides a way to either create one automatically, or to give some time for another system to create one while the controller is starting.

func EnsureLocalQueue added in v0.1.6

func EnsureLocalQueue(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme, clusterQueueName string, name string, namespace string) error

EnsureLocalQueue makes sure that a local queue exists. If it doesn't, it attempts to create it. Creation will fail if the referenced ClusterQueue does not exist within the KaiwoQueueConfig, or if the referenced ClusterQueue lists namespaces and the provided one does not match any of them.

func GetClusterQueueName added in v0.1.6

func GetClusterQueueName(ctx context.Context, workload KaiwoWorkload) string

func GetContextWithConfig added in v0.1.6

func GetContextWithConfig(ctx context.Context, k8sClient client.Client) (context.Context, error)

GetContextWithConfig fetches the latest config and attaches it to a given context

func GetKueueWorkload added in v0.1.6

func GetKueueWorkload(ctx context.Context, k8sClient client.Client, namespace string, uid string) (*kueuev1beta1.Workload, error)

func GetPodTemplate added in v0.1.6

func GetPodTemplate(config KaiwoConfigContext, dshmSize resource.Quantity, dangerous bool, workloadContainerName string) corev1.PodTemplateSpec

func GetPreemptableCondition added in v0.1.6

func GetPreemptableCondition(ctx context.Context, handler KaiwoWorkload) *v1.Condition

GetPreemptableCondition gets the correct preemptable condition

func GetRemainingTimeBeforeBecomingPreemptable added in v0.1.6

func GetRemainingTimeBeforeBecomingPreemptable(handler KaiwoWorkload) *time.Duration

func GetSchedulableCondition added in v0.1.6

func GetSchedulableCondition(ctx context.Context, clusterCtx ClusterContext, workload KaiwoWorkload) metav1.Condition

func GetWorkloadPods added in v0.1.6

func GetWorkloadPods(ctx context.Context, k8sClient client.Client, workload KaiwoWorkload) ([]corev1.Pod, error)

func GetWorkloadServices added in v0.1.6

func GetWorkloadServices(ctx context.Context, k8sClient client.Client, workload KaiwoWorkload) ([]corev1.Service, error)

func IsAdmitted added in v0.1.6

func IsAdmitted(ctx context.Context, k8sClient client.Client, workload WorkloadReconciler) (bool, error)

IsAdmitted checks if a workload is fully admitted by Kueue

func ObserveBatchJob added in v0.1.6

func ObserveBatchJob(job *batchv1.Job, previousStatus kaiwo.WorkloadStatus) (kaiwo.WorkloadStatus, error)

func ObserveOverallStatus added in v0.1.6

func ObserveOverallStatus(ctx context.Context, k8sClient client.Client, reconcilers []ResourceReconciler, previousWorkloadStatus kaiwo.WorkloadStatus) (*kaiwo.WorkloadStatus, []metav1.Condition, error)

ObserveOverallStatus observes the overall status from a list of reconcilers, gathering any conditions they report as well

func SetEarlyTermination added in v0.1.6

func SetEarlyTermination(ctx context.Context, workload KaiwoWorkload, reason string, message string)

SetEarlyTermination flags a workload for early termination by (1) setting the status to TERMINATING and (2) creating the WorkloadTerminatedEarly condition, but keeping its status as False (in order to record the reason).

func SetEarlyTerminationIfLowUtilizationThresholdExceeded added in v0.1.6

func SetEarlyTerminationIfLowUtilizationThresholdExceeded(ctx context.Context, workload KaiwoWorkload) bool

func SetKaiwoSystemLabels

func SetKaiwoSystemLabels(kaiwoLabelContext KaiwoLabelContext, objectMeta *v1.ObjectMeta)

SetKaiwoSystemLabels sets the Kaiwo system labels on an ObjectMeta instance

func ShouldPreempt added in v0.1.6

func ShouldPreempt(ctx context.Context, k8sClient client.Client, handler WorkloadReconciler) (bool, error)

ShouldPreempt checks if a workload should be preempted or not based on expired duration and current queue GPU demand

func ShouldRequeueAfter added in v0.1.6

func ShouldRequeueAfter(workload KaiwoWorkload) (bool, *time.Duration)

func TerminateWorkload added in v0.1.6

func TerminateWorkload(
	ctx context.Context,
	k8sClient client.Client,
	recorder record.EventRecorder,
	handler WorkloadReconciler,
) error

TerminateWorkload terminates a given workload by deleting all the child objects, setting an early termination condition, and emitting an event.

func ToPascalCase added in v0.1.6

func ToPascalCase(s string) string

ToPascalCase transforms a string like "hello there" into "HelloThere".

func UpdateLabels added in v0.1.6

func UpdateLabels(workload KaiwoWorkload, objectMeta *v1.ObjectMeta)

UpdateLabels propagates the Kaiwo workload's labels to the objectMeta, and sets the Kaiwo system labels

func UpdatePodSpec added in v0.1.6

func UpdatePodSpec(config KaiwoConfigContext, workload KaiwoWorkload, resourceConfig ResourceConfig, template *corev1.PodTemplateSpec, rayHead bool)

func UpdatePodSpecWithGPUModelAffinity added in v0.1.6

func UpdatePodSpecWithGPUModelAffinity(template *corev1.PodTemplateSpec, gpuModels []string, gpuModelLabel string)

Types

type ClusterContext added in v0.1.6

// ClusterContext bundles three pieces of cluster state: a list of nodes,
// aggregated GPU statistics, and a KaiwoQueueConfig.
type ClusterContext struct {
	Nodes            []v1.Node
	GpuStats         GPUStats
	KaiwoQueueConfig v1alpha1.KaiwoQueueConfig
}

ClusterContext provides context of the cluster and its resources to help build downstream objects

func GetClusterContext added in v0.1.6

func GetClusterContext(ctx context.Context, k8sClient client.Client, workload KaiwoWorkload) (*ClusterContext, error)

GetClusterContext gathers the cluster context that is relevant for a particular workload

type DownloadConfigMapReconciler added in v0.1.6

// DownloadConfigMapReconciler carries the object key and storage spec for a
// download config map. Its methods (BuildDesired, GetInitializedObject,
// MutateActual, ObserveStatus) appear to match the ResourceReconciler
// method set — TODO confirm the WorkloadStatus types are the same.
type DownloadConfigMapReconciler struct {
	ObjectKey   client.ObjectKey
	StorageSpec v1alpha1.StorageSpec
}

func NewDownloadConfigMapReconciler added in v0.1.6

func NewDownloadConfigMapReconciler(objectKey client.ObjectKey, storageSpec v1alpha1.StorageSpec) *DownloadConfigMapReconciler

func (*DownloadConfigMapReconciler) BuildDesired added in v0.1.6

func (r *DownloadConfigMapReconciler) BuildDesired(ctx context.Context, clusterCtx ClusterContext) (client.Object, error)

func (*DownloadConfigMapReconciler) GetInitializedObject added in v0.1.6

func (r *DownloadConfigMapReconciler) GetInitializedObject() client.Object

func (*DownloadConfigMapReconciler) MutateActual added in v0.1.6

func (r *DownloadConfigMapReconciler) MutateActual(ctx context.Context, clusterCtx ClusterContext, actual client.Object) error

func (*DownloadConfigMapReconciler) ObserveStatus added in v0.1.6

func (r *DownloadConfigMapReconciler) ObserveStatus(ctx context.Context, k8sClient client.Client, obj client.Object, previousWorkloadStatus v1alpha1.WorkloadStatus) (*v1alpha1.WorkloadStatus, []metav1.Condition, error)

type DownloadJobReconciler added in v0.1.6

// DownloadJobReconciler carries the object key, storage spec and user-supplied
// environment variables for a download job. Its methods (BuildDesired,
// GetInitializedObject, MutateActual, ObserveStatus) appear to match the
// ResourceReconciler method set — TODO confirm the WorkloadStatus types are
// the same.
type DownloadJobReconciler struct {
	ObjectKey   client.ObjectKey
	StorageSpec v1alpha1.StorageSpec
	UserEnvVars []corev1.EnvVar
}

func NewDownloadJobReconciler added in v0.1.6

func NewDownloadJobReconciler(objectKey client.ObjectKey, storageSpec v1alpha1.StorageSpec, userEnvVars []corev1.EnvVar) *DownloadJobReconciler

func (*DownloadJobReconciler) BuildDesired added in v0.1.6

func (r *DownloadJobReconciler) BuildDesired(ctx context.Context, clusterCtx ClusterContext) (client.Object, error)

func (*DownloadJobReconciler) GetInitializedObject added in v0.1.6

func (r *DownloadJobReconciler) GetInitializedObject() client.Object

func (*DownloadJobReconciler) MutateActual added in v0.1.6

func (r *DownloadJobReconciler) MutateActual(ctx context.Context, clusterCtx ClusterContext, actual client.Object) error

func (*DownloadJobReconciler) ObserveStatus added in v0.1.6

func (r *DownloadJobReconciler) ObserveStatus(ctx context.Context, k8sClient client.Client, obj client.Object, previousWorkloadStatus v1alpha1.WorkloadStatus) (*v1alpha1.WorkloadStatus, []metav1.Condition, error)

type DownloadJobSucceededReason added in v0.1.6

// DownloadJobSucceededReason is a string type with four declared values:
// JobFailed, JobInProgress, JobCompleted and JobPending.
type DownloadJobSucceededReason string
const (
	// DownloadJobSucceededConditionType is an untyped string constant; the
	// typed reasons below are presumably reported on a condition of this
	// type — TODO confirm.
	DownloadJobSucceededConditionType = "DownloadJobSucceeded"

	DownloadJobFailed     DownloadJobSucceededReason = "JobFailed"
	DownloadJobInProgress DownloadJobSucceededReason = "JobInProgress"
	DownloadJobCompleted  DownloadJobSucceededReason = "JobCompleted"
	DownloadJobPending    DownloadJobSucceededReason = "JobPending"
)

type GPUStats added in v0.1.6

// GPUStats summarises GPU counts over a set of matching nodes: cluster-wide
// totals, per-node minimums, and a per-node map. Several fields depend on a
// useAvailability flag, as described in the field comments.
type GPUStats struct {
	// TotalClusterGPUs is the total number of GPUs (regardless of current allocation) across all matching nodes.
	TotalClusterGPUs int

	// TotalAllocatableGPUs is the sum of currently allocatable GPUs across all matching nodes. Only populated when useAvailability=true.
	TotalAllocatableGPUs int

	// MinAllocatableGPUsPerNode is the smallest number of allocatable GPUs observed on any single matching node. Only meaningful if useAvailability=true.
	MinAllocatableGPUsPerNode int

	// MinGPUsPerNode is the smallest total GPU count (ignoring allocation) observed on any single matching node.
	MinGPUsPerNode int

	// NodeGPUMap maps each node's name to its GPU count. If useAvailability=false this is the total GPUs per node, otherwise it is the allocatable GPUs per node.
	NodeGPUMap map[string]int
}

type GroupReconciler added in v0.1.6

// GroupReconciler groups related resource reconcilers and exposes a combined
// status for them.
type GroupReconciler interface {
	// GetResourceReconcilers returns the resource reconcilers that belong to this group
	GetResourceReconcilers() []ResourceReconciler

	// ObserveStatus gives the current status and conditions of this reconciler group
	ObserveStatus(ctx context.Context, k8sClient client.Client, previousWorkloadStatus kaiwo.WorkloadStatus) (kaiwo.WorkloadStatus, []metav1.Condition, error)
}

GroupReconciler provides a way to link related reconcilers and observe their status

type KaiwoConfigContext added in v0.1.6

// KaiwoConfigContext wraps the Kaiwo config spec. The spec is embedded, so its
// fields and methods are promoted onto KaiwoConfigContext.
type KaiwoConfigContext struct {
	configapi.KaiwoConfigSpec
}

KaiwoConfigContext holds the config that is relevant for a particular context. In the future it can be extended with namespace-specific configuration as well.

func ConfigFromContext added in v0.1.6

func ConfigFromContext(ctx context.Context) KaiwoConfigContext

ConfigFromContext extracts the KaiwoConfigContext from ctx (or panics if it is missing).

type KaiwoLabelContext

// KaiwoLabelContext holds five string values: user, name, type, run id and
// managed. The field names correspond to the suffixes of the Kaiwo label keys
// ("/user", "/name", "/type", "/run-id", "/managed"); presumably each field is
// the value written under the matching key — TODO confirm.
type KaiwoLabelContext struct {
	User    string
	Name    string
	Type    string
	RunId   string
	Managed string
}

func GetKaiwoLabelContext

func GetKaiwoLabelContext(k KaiwoWorkload) KaiwoLabelContext

type KaiwoWorkload

// KaiwoWorkload gives uniform access to a Kaiwo workload: the underlying
// object, its common spec (returned by value) and its common status
// (returned as a pointer).
type KaiwoWorkload interface {
	// GetKaiwoWorkloadObject returns the underlying Kaiwo workload object (KaiwoJob or KaiwoService)
	GetKaiwoWorkloadObject() client.Object

	// GetCommonSpec returns the common spec object
	GetCommonSpec() kaiwo.CommonMetaSpec

	// GetCommonStatusSpec returns the common status object
	GetCommonStatusSpec() *kaiwo.CommonStatusSpec
}

KaiwoWorkload is a generic interface for any Kaiwo workload

type PreemptReason added in v0.1.6

// PreemptReason is the string type of the PreemptReason* constants
// (DurationNotExceeded, DurationExceeded, DurationExceededWithActiveGpuDemand).
type PreemptReason string

type PvcReconciler added in v0.1.6

// PvcReconciler holds the parameters for one PVC: its object key, the
// requested amount, the storage class name and the access mode.
// Amount is a parsed resource.Quantity here, whereas NewPvcReconciler takes
// the amount as a string.
type PvcReconciler struct {
	ObjectKey        client.ObjectKey
	Amount           resource.Quantity
	StorageClassName string
	AccessMode       corev1.PersistentVolumeAccessMode
}

func NewPvcReconciler added in v0.1.6

func NewPvcReconciler(objectKey client.ObjectKey, amount string, storageClassName string, accessMode corev1.PersistentVolumeAccessMode) *PvcReconciler

func (*PvcReconciler) BuildDesired added in v0.1.6

func (r *PvcReconciler) BuildDesired(ctx context.Context, clusterCtx ClusterContext) (client.Object, error)

func (*PvcReconciler) GetInitializedObject added in v0.1.6

func (r *PvcReconciler) GetInitializedObject() client.Object

func (*PvcReconciler) MutateActual added in v0.1.6

func (r *PvcReconciler) MutateActual(ctx context.Context, clusterCtx ClusterContext, actual client.Object) error

func (*PvcReconciler) ObserveStatus added in v0.1.6

func (r *PvcReconciler) ObserveStatus(ctx context.Context, k8sClient client.Client, obj client.Object, previousWorkloadStatus kaiwo.WorkloadStatus) (*kaiwo.WorkloadStatus, []metav1.Condition, error)

type Reconciler

// Reconciler bundles what its Reconcile method works with: a Kubernetes
// client, an event recorder, a scheme, the workload and storage handlers, and
// the cluster context. StorageHandler is a pointer and, per its field
// comment, optional.
type Reconciler struct {
	Client   client.Client
	Recorder record.EventRecorder
	Scheme   *runtime.Scheme

	// WorkloadHandler is the handler for the actual workload
	WorkloadHandler WorkloadHandler

	// StorageHandler is the handler for the optional storage component (PVCs and download job)
	StorageHandler *StorageHandler

	ClusterContext ClusterContext
}

func (*Reconciler) Reconcile

func (wr *Reconciler) Reconcile(ctx context.Context) (ctrl.Result, error)

Reconcile serves as the central reconciliation function for all Kaiwo workloads. It is broken into the following steps: (1) observe the workload status from the cluster; (2) if the status or conditions have changed, update the status and requeue; (3) if the status is active, ensure all remote resources match the desired state.

type ResourceConfig added in v0.1.6

// ResourceConfig is the result type of CalculateResourceConfig. It records a
// total GPU count, a replica count, a per-replica GPU count and a GPU vendor
// string, plus optional default resource requirements (pointer field).
// NOTE(review): the relationship between TotalGpus, Replicas and
// GpusPerReplica is not visible in this listing.
type ResourceConfig struct {
	TotalGpus      int
	Replicas       int
	GpusPerReplica int
	GpuVendor      string

	DefaultResources *v1.ResourceRequirements
}

func CalculateResourceConfig added in v0.1.6

func CalculateResourceConfig(
	ctx context.Context,
	clusterCtx ClusterContext,
	workload KaiwoWorkload,
	useAvailability bool,
) ResourceConfig

CalculateResourceConfig calculates the workload scheduling config based on the requested GPUs and available cluster resources

type ResourceReconciler

// ResourceReconciler describes one reconciled object in four operations:
// produce an initialized empty object, build the desired state, mutate an
// existing object towards that state, and observe the object's status.
type ResourceReconciler interface {
	// GetInitializedObject returns an object that has the correct TypeMeta and ObjectMeta attributes set
	GetInitializedObject() client.Object

	// BuildDesired builds the desired downstream workload object state
	BuildDesired(ctx context.Context, clusterCtx ClusterContext) (client.Object, error)

	// MutateActual provides a way to mutate an existing object to match the desired state
	MutateActual(ctx context.Context, clusterCtx ClusterContext, actual client.Object) error

	// ObserveStatus returns the current status of the object.
	// The object is guaranteed to have existed at least right before this method is called, so
	// a not-nil check is not required.
	ObserveStatus(ctx context.Context, k8sClient client.Client, obj client.Object, previousWorkloadStatus kaiwo.WorkloadStatus) (*kaiwo.WorkloadStatus, []metav1.Condition, error)
}

ResourceReconciler is an interface for building and updating Kubernetes resources

type ResourceUnderutilizationStatus added in v0.1.6

// ResourceUnderutilizationStatus is a string type with two declared values:
// GpuUtilizationNormal and GpuUtilizationLow.
type ResourceUnderutilizationStatus string
const (
	// KaiwoResourceUtilizationType is an untyped string constant with the
	// value "ResourceUnderutilization".
	KaiwoResourceUtilizationType                                = "ResourceUnderutilization"
	GpuResourceUtilizationNormal ResourceUnderutilizationStatus = "GpuUtilizationNormal"
	GpuResourceUtilizationLow    ResourceUnderutilizationStatus = "GpuUtilizationLow"
)

type SchedulingReason added in v0.1.6

// SchedulingReason is a string type with one "Schedulable" value and four
// Unschedulable* values (two GPU-related, two queue-related).
type SchedulingReason string
const (
	// SchedulableType is an untyped string constant. It has the same text as
	// the typed Schedulable reason below.
	SchedulableType = "Schedulable"

	Schedulable SchedulingReason = "Schedulable"

	UnschedulableNoGPUs           SchedulingReason = "NoGPUs"
	UnschedulableInsufficientGPUs SchedulingReason = "InsufficientGPUs"

	UnschedulableWrongQueueNamespace  SchedulingReason = "WrongQueueNamespace"
	UnschedulableClusterQueueNotFound SchedulingReason = "ClusterQueueNotFound"
)

type StorageHandler added in v0.1.6

// StorageHandler holds an object key and a common meta spec. It is built by
// NewStorageHandler from a WorkloadHandler, and has GetResourceReconcilers
// and ObserveStatus methods on a value receiver.
type StorageHandler struct {
	ObjectKey      client.ObjectKey
	CommonMetaSpec v1alpha1.CommonMetaSpec
}

StorageHandler handles the storage-related reconcilers (PVCs and download config map / job)

func NewStorageHandler added in v0.1.6

func NewStorageHandler(workloadHandler WorkloadHandler) *StorageHandler

func (StorageHandler) GetResourceReconcilers added in v0.1.6

func (h StorageHandler) GetResourceReconcilers() []ResourceReconciler

func (StorageHandler) ObserveStatus added in v0.1.6

func (h StorageHandler) ObserveStatus(ctx context.Context, k8sClient client.Client, previousWorkloadStatus v1alpha1.WorkloadStatus) (v1alpha1.WorkloadStatus, []metav1.Condition, error)

type WorkloadAdmittedReason added in v0.1.6

// WorkloadAdmittedReason is a string type with two declared values: Admitted
// and Pending.
type WorkloadAdmittedReason string
const (
	// WorkloadAdmittedType is an untyped string constant. It has the same
	// text as the typed WorkloadAdmitted reason below.
	WorkloadAdmittedType = "Admitted"

	WorkloadAdmitted WorkloadAdmittedReason = "Admitted"
	WorkloadPending  WorkloadAdmittedReason = "Pending"
)

type WorkloadHandler added in v0.1.6

// WorkloadHandler wraps a single WorkloadReconciler. It has
// GetResourceReconcilers and ObserveStatus methods on a value receiver.
type WorkloadHandler struct {
	Workload WorkloadReconciler
}

WorkloadHandler handles the workload-related reconciler(s)

func (WorkloadHandler) GetResourceReconcilers added in v0.1.6

func (h WorkloadHandler) GetResourceReconcilers() []ResourceReconciler

func (WorkloadHandler) ObserveStatus added in v0.1.6

func (h WorkloadHandler) ObserveStatus(ctx context.Context, k8sClient client.Client, previousWorkloadStatus v1alpha1.WorkloadStatus) (v1alpha1.WorkloadStatus, []metav1.Condition, error)

type WorkloadReconciler added in v0.1.6

// WorkloadReconciler combines ResourceReconciler and KaiwoWorkload (both
// embedded) and adds a lookup of the associated Kueue Workloads.
type WorkloadReconciler interface {
	ResourceReconciler
	KaiwoWorkload

	// GetKueueWorkloads returns a list of the Kueue Workloads that govern the admission of this workload
	GetKueueWorkloads(ctx context.Context, k8sClient client.Client) ([]kueuev1beta1.Workload, error)
}

WorkloadReconciler extends ResourceReconciler with workload-specific methods

type WorkloadTerminationReason added in v0.1.6

// WorkloadTerminationReason is a string type.
// NOTE(review): no values of this type are declared in this listing.
type WorkloadTerminationReason string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL