flytek8s

package
v2.0.13
Warning

This package is not in the latest version of its module.

Published: Apr 17, 2026 License: Apache-2.0, Apache-2.0 Imports: 36 Imported by: 0

Documentation


Constants

const FlyteEnableVscode = "_F_E_VS"
const GpuPartitionSlicesLabel = "platform.union.ai/gpu-partition-slices"

GpuPartitionSlicesLabel is applied to pods that request a MIG GPU partition. The value is the number of compute slices consumed (out of 7 total), parsed from the "Xg.Ygb" partition size format. Downstream consumers (e.g. billing) use this to compute fractional GPU usage as slices/7.
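The following is a minimal sketch of how the label value and the fractional usage might be derived. It assumes the slice count is the integer before "g" in the partition size (for example, 3 in "3g.20gb"). The helpers parsePartitionSlices and fractionalGPUUsage are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// totalMIGSlices mirrors the "out of 7 total" compute slices described above.
const totalMIGSlices = 7

// parsePartitionSlices is a hypothetical helper that extracts X from an
// "Xg.Ygb" partition size string (for example "3g.20gb" -> 3).
func parsePartitionSlices(partition string) (int, error) {
	gpuPart, _, found := strings.Cut(partition, ".")
	if !found || !strings.HasSuffix(gpuPart, "g") {
		return 0, fmt.Errorf("unexpected partition format %q", partition)
	}
	slices, err := strconv.Atoi(strings.TrimSuffix(gpuPart, "g"))
	if err != nil || slices < 1 || slices > totalMIGSlices {
		return 0, fmt.Errorf("invalid slice count in %q", partition)
	}
	return slices, nil
}

// fractionalGPUUsage computes slices/7, as a downstream consumer might.
func fractionalGPUUsage(slices int) float64 {
	return float64(slices) / totalMIGSlices
}

func main() {
	slices, err := parsePartitionSlices("3g.20gb")
	if err != nil {
		panic(err)
	}
	fmt.Printf("label value=%d usage=%.3f\n", slices, fractionalGPUUsage(slices))
}
```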

const Interrupted = "Interrupted"
const OOMKilled = "OOMKilled"
const PodKind = "pod"
const PrimaryContainerKey = "primary_container_name"
const PrimaryContainerNotFound = "PrimaryContainerNotFound"
const ResourceNvidiaGPU = "nvidia.com/gpu"

ResourceNvidiaGPU is the name of the NVIDIA GPU resource. Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go

const SIGKILL = 137

Variables

This section is empty.

Functions

func AddCoPilotToPod

func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) error

func AddFlyteCustomizationsToContainer

func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.Parameters,
	mode ResourceCustomizationMode, container *v1.Container, extendedResources *core.ExtendedResources) error

AddFlyteCustomizationsToContainer takes a container definition which specifies how to run a Flyte task and fills in templated command and argument values, updates resources and decorates environment variables with platform and task-specific customizations.

func AddPreferredNodeSelectorRequirements

func AddPreferredNodeSelectorRequirements(base *v1.Affinity, weight int32, new ...v1.NodeSelectorRequirement)

AddPreferredNodeSelectorRequirements appends the provided v1.NodeSelectorRequirement objects to an existing v1.Affinity object's list of preferred scheduling terms. See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity-weight for how weights are used during scheduling.
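The following is a minimal sketch of the append described above. It uses simplified stand-ins for the k8s.io/api/core/v1 affinity types so that it runs with the standard library only. It also assumes that each requirement becomes its own weighted preferred term, which is one plausible reading of "appends" rather than a confirmed detail of this package.

```go
package main

import "fmt"

// Simplified stand-ins for k8s.io/api/core/v1 types (only the fields this sketch needs).
type NodeSelectorRequirement struct {
	Key      string
	Operator string
	Values   []string
}

type NodeSelectorTerm struct{ MatchExpressions []NodeSelectorRequirement }

type PreferredSchedulingTerm struct {
	Weight     int32
	Preference NodeSelectorTerm
}

type NodeAffinity struct {
	PreferredDuringSchedulingIgnoredDuringExecution []PreferredSchedulingTerm
}

type Affinity struct{ NodeAffinity *NodeAffinity }

// addPreferred appends one preferred term with the given weight per requirement,
// creating the NodeAffinity if it does not exist yet.
func addPreferred(base *Affinity, weight int32, reqs ...NodeSelectorRequirement) {
	if base.NodeAffinity == nil {
		base.NodeAffinity = &NodeAffinity{}
	}
	na := base.NodeAffinity
	for _, r := range reqs {
		na.PreferredDuringSchedulingIgnoredDuringExecution = append(
			na.PreferredDuringSchedulingIgnoredDuringExecution,
			PreferredSchedulingTerm{
				Weight:     weight,
				Preference: NodeSelectorTerm{MatchExpressions: []NodeSelectorRequirement{r}},
			},
		)
	}
}

func main() {
	aff := &Affinity{}
	addPreferred(aff, 10, NodeSelectorRequirement{Key: "zone", Operator: "In", Values: []string{"us-east-1a"}})
	fmt.Println(len(aff.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution))
}
```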

func AddRequiredNodeSelectorRequirements

func AddRequiredNodeSelectorRequirements(base *v1.Affinity, new ...v1.NodeSelectorRequirement)

AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement objects to an existing v1.Affinity object. If there are no existing required node selectors, the new v1.NodeSelectorRequirement will be added as-is. However, if there are existing required node selectors, we iterate over all existing node selector terms and append the node selector requirement. Note that multiple node selector terms are OR'd, and match expressions within a single node selector term are AND'd during scheduling. See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity
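The OR/AND semantics are why a new requirement must be appended to every existing term. If it were added to only one term, the other OR'd alternatives would still let the pod land on nodes lacking it. The following is a minimal sketch using simplified stand-in types rather than k8s.io/api/core/v1. The matches evaluator is a hypothetical helper that handles only the "In" operator.

```go
package main

import "fmt"

// Simplified stand-ins for k8s.io/api/core/v1 node affinity types.
type NodeSelectorRequirement struct {
	Key      string
	Operator string // only "In" is evaluated by matches below
	Values   []string
}

type NodeSelectorTerm struct{ MatchExpressions []NodeSelectorRequirement }

type NodeSelector struct{ NodeSelectorTerms []NodeSelectorTerm }

type NodeAffinity struct {
	RequiredDuringSchedulingIgnoredDuringExecution *NodeSelector
}

type Affinity struct{ NodeAffinity *NodeAffinity }

// addRequired adds reqs as one new term when no terms exist; otherwise it
// appends reqs to every existing term, so each OR'd alternative requires them.
func addRequired(base *Affinity, reqs ...NodeSelectorRequirement) {
	if base.NodeAffinity == nil {
		base.NodeAffinity = &NodeAffinity{}
	}
	na := base.NodeAffinity
	if na.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		na.RequiredDuringSchedulingIgnoredDuringExecution = &NodeSelector{}
	}
	sel := na.RequiredDuringSchedulingIgnoredDuringExecution
	if len(sel.NodeSelectorTerms) == 0 {
		sel.NodeSelectorTerms = []NodeSelectorTerm{{MatchExpressions: append([]NodeSelectorRequirement(nil), reqs...)}}
		return
	}
	for i := range sel.NodeSelectorTerms {
		term := &sel.NodeSelectorTerms[i]
		term.MatchExpressions = append(term.MatchExpressions, reqs...)
	}
}

// matches ORs the terms and ANDs the expressions within each term.
func matches(sel *NodeSelector, labels map[string]string) bool {
	for _, term := range sel.NodeSelectorTerms {
		if termMatches(term, labels) {
			return true
		}
	}
	return false
}

func termMatches(term NodeSelectorTerm, labels map[string]string) bool {
	if len(term.MatchExpressions) == 0 {
		return false // an empty term matches no nodes
	}
	for _, req := range term.MatchExpressions {
		value, ok := labels[req.Key]
		if !ok || req.Operator != "In" || !contains(req.Values, value) {
			return false
		}
	}
	return true
}

func contains(values []string, v string) bool {
	for _, x := range values {
		if x == v {
			return true
		}
	}
	return false
}

func main() {
	aff := &Affinity{NodeAffinity: &NodeAffinity{RequiredDuringSchedulingIgnoredDuringExecution: &NodeSelector{
		NodeSelectorTerms: []NodeSelectorTerm{
			{MatchExpressions: []NodeSelectorRequirement{{Key: "zone", Operator: "In", Values: []string{"a"}}}},
			{MatchExpressions: []NodeSelectorRequirement{{Key: "zone", Operator: "In", Values: []string{"b"}}}},
		},
	}}}
	addRequired(aff, NodeSelectorRequirement{Key: "gpu", Operator: "In", Values: []string{"true"}})
	sel := aff.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
	fmt.Println(matches(sel, map[string]string{"zone": "b", "gpu": "true"})) // true
	fmt.Println(matches(sel, map[string]string{"zone": "b"}))                // false
}
```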

func AddTolerationsForExtendedResources

func AddTolerationsForExtendedResources(podSpec *v1.PodSpec) *v1.PodSpec

func ApplyContainerImageOverride

func ApplyContainerImageOverride(podSpec *v1.PodSpec, containerImage string, primaryContainerName string)

func ApplyExtendedResourcesOverrides

func ApplyExtendedResourcesOverrides(base, overrides *core.ExtendedResources) *core.ExtendedResources

ApplyExtendedResourcesOverrides performs a specialized merge of overrides into a base *core.ExtendedResources object. A nested merge is not always the intended behavior, so each field is handled separately.
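The following is a minimal sketch of per-field overriding. It assumes that any field set on the overrides replaces the corresponding base field wholesale. The stand-in types below only approximate core.ExtendedResources and are hypothetical. The example shows one reason a nested merge could be unwanted: a partition size chosen for one device should probably not carry over to a different device.

```go
package main

import "fmt"

// Hypothetical stand-ins for core.ExtendedResources and its fields.
type GPUAccelerator struct {
	Device        string
	PartitionSize string
}

type SharedMemory struct {
	MountName string
	SizeLimit string
}

type ExtendedResources struct {
	GpuAccelerator *GPUAccelerator
	SharedMemory   *SharedMemory
}

// applyExtendedResourcesOverrides returns a new object in which every field
// set on overrides replaces the corresponding base field as a whole instead of
// being deep-merged into it. Neither input struct is mutated.
func applyExtendedResourcesOverrides(base, overrides *ExtendedResources) *ExtendedResources {
	result := &ExtendedResources{}
	if base != nil {
		*result = *base // shallow copy: field pointers are shared with base
	}
	if overrides == nil {
		return result
	}
	if overrides.GpuAccelerator != nil {
		result.GpuAccelerator = overrides.GpuAccelerator
	}
	if overrides.SharedMemory != nil {
		result.SharedMemory = overrides.SharedMemory
	}
	return result
}

func main() {
	base := &ExtendedResources{
		GpuAccelerator: &GPUAccelerator{Device: "A100", PartitionSize: "1g.5gb"},
		SharedMemory:   &SharedMemory{MountName: "shm", SizeLimit: "1Gi"},
	}
	// Overriding only the device drops the base partition size, because the
	// accelerator is replaced as a whole rather than merged field by field.
	overrides := &ExtendedResources{GpuAccelerator: &GPUAccelerator{Device: "H100"}}
	merged := applyExtendedResourcesOverrides(base, overrides)
	fmt.Println(merged.GpuAccelerator.Device, merged.GpuAccelerator.PartitionSize == "", merged.SharedMemory.SizeLimit)
}
```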

func ApplyFlytePodConfiguration

func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error)

ApplyFlytePodConfiguration updates the PodSpec and ObjectMeta with Flyte configuration. This includes applying the default k8s configuration, applying overrides (resources, etc.), injecting copilot containers, and merging with the configuration PodTemplate (if one exists).

func ApplyGPUNodeSelectors

func ApplyGPUNodeSelectors(podSpec *v1.PodSpec, gpuAccelerator *core.GPUAccelerator)

func ApplyInterruptibleNodeAffinity

func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec)

ApplyInterruptibleNodeAffinity configures the node-affinity for the pod using the configuration specified.

func ApplyInterruptibleNodeSelectorRequirement

func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity)

ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified.

func ApplyK8sResourceOverrides

func ApplyK8sResourceOverrides(teMetadata pluginmachinery_core.TaskExecutionMetadata, resources *v1.ResourceRequirements) v1.ResourceRequirements

ApplyK8sResourceOverrides ensures that both resource requests and limits are set. This is required because user executions run in namespaces bound by a project quota, and Kubernetes rejects pods that omit these values in such namespaces. This function is called by plugins that don't necessarily construct a default Flyte container (container and k8s pod tasks). Those plugins therefore don't already receive the ApplyResourceOverrides treatment and its subsequent validation, which add sensible defaults for requests and limits.

func ApplyPodTemplateOverride

func ApplyPodTemplateOverride(objectMeta metav1.ObjectMeta, podTemplate *core.K8SPod) (*v1.PodSpec, metav1.ObjectMeta, error)

func ApplyResourceOverrides

func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements, assignIfUnset bool) v1.ResourceRequirements

ApplyResourceOverrides handles resource resolution, allocation, and validation. Primarily, it ensures that container resources do not exceed the limits defined in platformResources. When assignIfUnset is true, it also ensures that limits and requests are sensibly set for resources of all types.

func ApplySharedMemory

func ApplySharedMemory(podSpec *v1.PodSpec, primaryContainerName string, SharedMemory *core.SharedMemory) error

func BuildIdentityPod

func BuildIdentityPod() *v1.Pod

func BuildPodLogContext

func BuildPodLogContext(pod *v1.Pod) *core.PodLogContext

func BuildRawContainer

func BuildRawContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error)

BuildRawContainer constructs a Container based on the definition passed by the TaskExecutionContext.

func BuildRawPod

BuildRawPod constructs a PodSpec and ObjectMeta based on the definition passed by the TaskExecutionContext. This definition does not include any configuration injected by Flyte.

func CalculateStorageSize

func CalculateStorageSize(requirements *v1.ResourceRequirements) *resource.Quantity

func CopilotCommandArgs

func CopilotCommandArgs(storageConfig *storage.Config) []string

func DataVolume

func DataVolume(name string, size *resource.Quantity) v1.Volume

func DecorateEnvVars

func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, envFroms []v1.EnvFromSource, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource)

func DemystifyFailedOrPendingPod

func DemystifyFailedOrPendingPod(
	ctx context.Context,
	pluginContext k8s.PluginContext,
	info pluginsCore.TaskInfo,
	namespace string,
	podName string,
	primaryContainerName string,
) (pluginsCore.PhaseInfo, error)

DemystifyFailedOrPendingPod inspects the status of a failed or pending pod and attempts to determine the reason for the failure or pending state. It currently handles only pods in the Failed or Pending phase. This is useful for surfacing the specific error from a pod belonging to a rayjob, sparkjob, etc. For example, if the pod is in the Failed phase due to an image pull error, this function can return a more specific error message indicating that the image could not be pulled. Similarly, if the pod is in the Pending phase due to insufficient resources, it can return an error message indicating that the pod could not be scheduled due to a lack of resources.

func DemystifyFailure

func DemystifyFailure(ctx context.Context, status v1.PodStatus, info pluginsCore.TaskInfo, primaryContainerName string) (pluginsCore.PhaseInfo, error)

DemystifyFailure resolves the various Kubernetes pod failure modes to determine the most appropriate course of action.

func DemystifyPending

func DemystifyPending(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error)

DemystifyPending is one of the core functions that help FlytePropeller determine whether a pending pod is genuinely pending or is stuck in an unrecoverable state. In the latter case, the pod should be marked as dead and the task should be retried. This unfortunately has to be handled explicitly. Kubernetes is still largely designed for long-running services that are expected to recover from failures, whereas Flyte pods are fully automated and should either run or fail.

Important considerations: a pod can be Pending for various reasons, and sometimes the Pending status signals a problem.

Case I: Pending because the image pull is failing and Kubernetes is backing off.

This could be transient, so we can rely on the failure reason.
The failure transitions from ErrImagePull -> ImagePullBackOff.

Case II: Not enough resources are available. This is tricky, because the total amount of resources requested could be beyond the capacity of the system. For this we rely on configuration, and hence on input gates: bad requests that ask for a large amount of resources should not be allowed through.
If such a request does make it through, we fail after a timeout.
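The two cases can be sketched as a simple time-based decision. The code below follows them only loosely and is not this package's actual logic. The outcome type, the classifyPending helper, and both threshold values are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// outcome is a hypothetical simplification of the phase DemystifyPending returns.
type outcome string

const (
	stillPending     outcome = "Pending"
	retryableFailure outcome = "RetryableFailure"
)

// Illustrative thresholds (hypothetical values).
const (
	defaultImagePullGrace = 3 * time.Minute
	defaultPendingTimeout = 15 * time.Minute
)

// classifyPending decides whether a Pending pod should keep waiting or be
// failed so the task can be retried. waitingReason is the primary container's
// waiting reason, or "" if none is reported (for example, while unschedulable).
func classifyPending(waitingReason string, pendingFor, imagePullGrace, pendingTimeout time.Duration) outcome {
	switch waitingReason {
	case "ErrImagePull", "ImagePullBackOff":
		// Case I: image pull failures may be transient, so tolerate them for a
		// grace period before giving up.
		if pendingFor > imagePullGrace {
			return retryableFailure
		}
		return stillPending
	}
	// Case II and anything else, such as insufficient resources: wait until
	// the overall pending timeout expires.
	if pendingFor > pendingTimeout {
		return retryableFailure
	}
	return stillPending
}

func main() {
	fmt.Println(classifyPending("ImagePullBackOff", 5*time.Minute, defaultImagePullGrace, defaultPendingTimeout))
	fmt.Println(classifyPending("", 5*time.Minute, defaultImagePullGrace, defaultPendingTimeout))
}
```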

func DemystifySuccess

func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error)

func DeterminePrimaryContainerPhase

func DeterminePrimaryContainerPhase(ctx context.Context, primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo

DeterminePrimaryContainerPhase, given the statuses of all containers, returns a pluginsCore.PhaseInfo object corresponding to the phase of the primary container, which is identified by the provided name. This is useful for sidecars or pod jobs, where Flyte monitors the successful exit of a single container.
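The following is a minimal sketch of the idea: only the primary container's status decides the phase, and sidecars are ignored. The stand-in types are hypothetical simplifications of v1.ContainerStatus and pluginsCore.PhaseInfo. The mapping of a non-zero exit to a retryable failure, and of a missing container to a permanent one, is assumed for illustration.

```go
package main

import "fmt"

// Values copied from this package's constants.
const (
	OOMKilled                = "OOMKilled"
	PrimaryContainerNotFound = "PrimaryContainerNotFound"
)

// Hypothetical, simplified stand-ins for v1.ContainerStatus and the plugin phase.
type TerminatedState struct {
	ExitCode int32
	Reason   string // e.g. "Completed", "Error", "OOMKilled"
}

type ContainerStatus struct {
	Name       string
	Terminated *TerminatedState // nil while the container has not terminated
}

type Phase string

const (
	PhaseRunning          Phase = "Running"
	PhaseSuccess          Phase = "Success"
	PhaseRetryableFailure Phase = "RetryableFailure"
	PhasePermanentFailure Phase = "PermanentFailure"
)

// primaryContainerPhase derives the phase from the primary container alone,
// returning the phase and a reason code.
func primaryContainerPhase(primary string, statuses []ContainerStatus) (Phase, string) {
	for _, s := range statuses {
		if s.Name != primary {
			continue
		}
		switch {
		case s.Terminated == nil:
			return PhaseRunning, ""
		case s.Terminated.ExitCode == 0:
			return PhaseSuccess, ""
		default:
			return PhaseRetryableFailure, s.Terminated.Reason
		}
	}
	return PhasePermanentFailure, PrimaryContainerNotFound
}

func main() {
	statuses := []ContainerStatus{
		{Name: "sidecar"}, // not terminated; does not affect the result
		{Name: "main", Terminated: &TerminatedState{ExitCode: 137, Reason: OOMKilled}},
	}
	fmt.Println(primaryContainerPhase("main", statuses))
}
```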

func DownloadCommandArgs

func DownloadCommandArgs(fromInputsPath, outputPrefix storage.DataReference, toLocalPath string, format core.DataLoadingConfig_LiteralMapFormat, inputInterface *core.VariableMap) ([]string, error)

func FlyteCoPilotContainer

func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []string, volumeMounts ...v1.VolumeMount) (v1.Container, error)

func GetContainer

func GetContainer(podSpec *v1.PodSpec, name string) (*v1.Container, error)

func GetContextEnvVars

func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar

func GetExecutionEnvVars

func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1.EnvVar

func GetLastTransitionOccurredAt

func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time

func GetLiteralOffloadingEnvVars

func GetLiteralOffloadingEnvVars() []v1.EnvVar

func GetMessageAfterGracePeriod

func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string

func GetNormalizedAcceleratorDevice

func GetNormalizedAcceleratorDevice(device string) string

GetNormalizedAcceleratorDevice returns the normalized name for the given device. This should map to the node label that the corresponding nodes are provisioned with. It falls back to the original device name if the device is not configured.
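The lookup-with-fallback behavior can be sketched as a map lookup. The mapping and its label values below are hypothetical. In the package, the mapping comes from configuration that is not shown here. Whether the real lookup normalizes case is not stated, so this sketch matches names exactly.

```go
package main

import "fmt"

// acceleratorDevices is a hypothetical configuration that maps user-facing
// device names to the node label values nodes are provisioned with.
var acceleratorDevices = map[string]string{
	"A100": "nvidia-tesla-a100",
	"T4":   "nvidia-tesla-t4",
}

// normalizedAcceleratorDevice returns the configured label value for device,
// or device itself when no mapping is configured.
func normalizedAcceleratorDevice(device string) string {
	if normalized, ok := acceleratorDevices[device]; ok {
		return normalized
	}
	return device
}

func main() {
	fmt.Println(normalizedAcceleratorDevice("A100")) // nvidia-tesla-a100
	fmt.Println(normalizedAcceleratorDevice("L4"))   // L4 (not configured)
}
```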

func GetPodTemplateUpdatesHandler

func GetPodTemplateUpdatesHandler(store *PodTemplateStore) cache.ResourceEventHandler

GetPodTemplateUpdatesHandler returns a new ResourceEventHandler which adds / removes PodTemplates to / from the provided PodTemplateStore.

func GetPodTolerations

func GetPodTolerations(interruptible bool, resourceRequirements ...v1.ResourceRequirements) []v1.Toleration

func GetPrimaryContainerName

func GetPrimaryContainerName(pod *v1.Pod) string

func GetReportedAt

func GetReportedAt(pod *v1.Pod) metav1.Time

func GetServiceAccountNameFromTaskExecutionMetadata

func GetServiceAccountNameFromTaskExecutionMetadata(taskExecutionMetadata pluginmachinery_core.TaskExecutionMetadata) string

func IsVscodeEnabled

func IsVscodeEnabled(ctx context.Context, envVar []v1.EnvVar) bool

func MergeBasePodSpecOntoTemplate

func MergeBasePodSpecOntoTemplate(templatePodSpec *v1.PodSpec, basePodSpec *v1.PodSpec, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, error)

MergeBasePodSpecOntoTemplate merges a base pod spec onto a template pod spec. The template pod spec has some magic values that allow users to specify templates that target all containers and primary containers. Aside from magic values this method will merge containers that have matching names.
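The following is a minimal sketch of how magic template names might be layered onto a container. It assumes the magic names are "default" (all containers) and "primary" (primary container only), applied in the order default, primary, same-named template, then the base container itself. The Container type and both helpers are hypothetical simplifications; a real merge covers every v1.Container field.

```go
package main

import "fmt"

// Assumed magic container names in a template (illustrative).
const (
	defaultTemplateName = "default" // applies to every container
	primaryTemplateName = "primary" // applies only to the primary container
)

// Container is a hypothetical stand-in for v1.Container with two mergeable fields.
type Container struct {
	Name  string
	Image string
	Env   map[string]string
}

// overlay copies non-empty fields of src onto dst; src wins on conflicts.
func overlay(dst *Container, src Container) {
	if src.Image != "" {
		dst.Image = src.Image
	}
	for k, v := range src.Env {
		if dst.Env == nil {
			dst.Env = map[string]string{}
		}
		dst.Env[k] = v
	}
}

// mergeOntoTemplate layers the "default" template, the "primary" template
// (primary container only), a template with the same name, and finally the
// base container itself.
func mergeOntoTemplate(templates []Container, base Container, primaryName string) Container {
	byName := map[string]Container{}
	for _, t := range templates {
		byName[t.Name] = t
	}
	layers := []string{defaultTemplateName}
	if base.Name == primaryName {
		layers = append(layers, primaryTemplateName)
	}
	layers = append(layers, base.Name)

	result := Container{Name: base.Name}
	for _, name := range layers {
		if t, ok := byName[name]; ok {
			overlay(&result, t)
		}
	}
	overlay(&result, base)
	return result
}

func main() {
	templates := []Container{
		{Name: defaultTemplateName, Image: "base:1", Env: map[string]string{"LOG": "info"}},
		{Name: primaryTemplateName, Env: map[string]string{"ROLE": "primary"}},
	}
	primary := mergeOntoTemplate(templates, Container{Name: "task", Image: "user:2"}, "task")
	sidecar := mergeOntoTemplate(templates, Container{Name: "proxy"}, "task")
	fmt.Println(primary.Image, primary.Env["LOG"], primary.Env["ROLE"]) // user:2 info primary
	fmt.Println(sidecar.Image, sidecar.Env["ROLE"] == "")              // base:1 true
}
```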

func MergeOverlayPodSpecOntoBase

func MergeOverlayPodSpecOntoBase(basePodSpec *v1.PodSpec, overlayPodSpec *v1.PodSpec) (*v1.PodSpec, error)

MergeOverlayPodSpecOntoBase merges a customized pod spec onto a base pod spec. At a container level it will merge containers that have matching names.

func MergeResources

func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements)

func MergeWithBasePodTemplate

func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionContext,
	podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName, primaryInitContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error)

MergeWithBasePodTemplate attempts to merge the provided PodSpec and ObjectMeta with the configuration PodTemplate for this task.

func SanitizeGPUResourceRequirements

func SanitizeGPUResourceRequirements(resources *v1.ResourceRequirements, accelerator *core.GPUAccelerator)

SanitizeGPUResourceRequirements converts generic 'gpu' resource requirements to the desired accelerator resource name.
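The following is a minimal sketch of the renaming step. It uses integer quantities in place of resource.Quantity and hypothetical stand-in types. Taking the target resource name as a plain string, and defaulting it to ResourceNvidiaGPU when empty, are assumptions about how the accelerator's resource name is chosen.

```go
package main

import "fmt"

// ResourceNvidiaGPU matches this package's constant of the same name.
const ResourceNvidiaGPU = "nvidia.com/gpu"

// genericGPU is the accelerator-agnostic resource name a task might request.
const genericGPU = "gpu"

// Hypothetical stand-ins: quantities are plain integers instead of resource.Quantity.
type ResourceList map[string]int64

type ResourceRequirements struct {
	Requests ResourceList
	Limits   ResourceList
}

// sanitizeGPU renames a generic "gpu" entry in requests and limits to the
// accelerator-specific resource name (defaulting to nvidia.com/gpu here).
func sanitizeGPU(res *ResourceRequirements, acceleratorResource string) {
	if acceleratorResource == "" {
		acceleratorResource = ResourceNvidiaGPU
	}
	for _, list := range []ResourceList{res.Requests, res.Limits} {
		if qty, ok := list[genericGPU]; ok {
			delete(list, genericGPU)
			list[acceleratorResource] = qty
		}
	}
}

func main() {
	res := &ResourceRequirements{
		Requests: ResourceList{genericGPU: 1, "cpu": 2},
		Limits:   ResourceList{genericGPU: 1},
	}
	sanitizeGPU(res, "")
	fmt.Println(res.Requests, res.Limits)
}
```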

func SidecarCommandArgs

func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, uploadTimeout time.Duration, iface *core.TypedInterface) ([]string, error)

func ToK8sContainer

func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error)

ToK8sContainer builds a Container based on the definition passed by the TaskExecutionContext. This involves applying all Flyte configuration including k8s plugins and resource requests.

func ToK8sEnvVar

func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar

func ToK8sPodSpec

ToK8sPodSpec builds a PodSpec and ObjectMeta based on the definition passed by the TaskExecutionContext. This involves parsing the raw PodSpec definition and applying all Flyte configuration options.

func ToK8sResourceList

func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceList, error)

TODO: Should the container resources be modified to contain a map of enum values? We should probably also create tolerations/taints, but that could be done as a post-processing step.

func ToK8sResourceRequirements

func ToK8sResourceRequirements(resources *core.Resources) (*v1.ResourceRequirements, error)

func UpdatePod

func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata,
	resourceRequirements []v1.ResourceRequirements, podSpec *v1.PodSpec)

UpdatePod updates the base pod spec used to execute tasks, applying plugin configuration and task-metadata-specific options.

Types

type NodeExecutionK8sReader

type NodeExecutionK8sReader struct {
	// contains filtered or unexported fields
}

func (NodeExecutionK8sReader) Get

func (NodeExecutionK8sReader) List

type PluginTaskExecutionContextOption

type PluginTaskExecutionContextOption func(*pluginTaskExecutionContext)

type PodTemplateStore

type PodTemplateStore struct {
	*sync.Map
	// contains filtered or unexported fields
}

PodTemplateStore maintains a thread-safe mapping of active PodTemplates with their associated namespaces.

var DefaultPodTemplateStore PodTemplateStore = NewPodTemplateStore()

func NewPodTemplateStore

func NewPodTemplateStore() PodTemplateStore

NewPodTemplateStore initializes a new PodTemplateStore.

func (*PodTemplateStore) Delete

func (p *PodTemplateStore) Delete(podTemplate *v1.PodTemplate)

Delete removes the specified PodTemplate from the store.

func (*PodTemplateStore) LoadOrDefault

func (p *PodTemplateStore) LoadOrDefault(namespace string, podTemplateName string) *v1.PodTemplate

LoadOrDefault returns the PodTemplate with the specified name in the given namespace. If one does not exist, it attempts to retrieve the one associated with the default namespace.

func (*PodTemplateStore) SetDefaultNamespace

func (p *PodTemplateStore) SetDefaultNamespace(namespace string)

SetDefaultNamespace sets the default namespace for the PodTemplateStore.

func (*PodTemplateStore) Store

func (p *PodTemplateStore) Store(podTemplate *v1.PodTemplate)

Store loads the specified PodTemplate into the store.
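The following is a minimal sketch of a store with this behavior. It assumes a two-level layout (template name to a per-namespace sync.Map) and uses a hypothetical PodTemplate stand-in. The templateStore type is not this package's PodTemplateStore. In this sketch the default namespace is not synchronized and is meant to be set once at startup.

```go
package main

import (
	"fmt"
	"sync"
)

// PodTemplate is a hypothetical stand-in for v1.PodTemplate.
type PodTemplate struct {
	Name      string
	Namespace string
}

// templateStore maps template name -> *sync.Map (namespace -> *PodTemplate).
type templateStore struct {
	*sync.Map
	defaultNamespace string // not synchronized; set once during startup
}

func newTemplateStore() templateStore {
	return templateStore{Map: &sync.Map{}}
}

func (p *templateStore) SetDefaultNamespace(namespace string) {
	p.defaultNamespace = namespace
}

func (p *templateStore) Store(t *PodTemplate) {
	inner, _ := p.LoadOrStore(t.Name, &sync.Map{})
	inner.(*sync.Map).Store(t.Namespace, t)
}

func (p *templateStore) Delete(t *PodTemplate) {
	if inner, ok := p.Load(t.Name); ok {
		inner.(*sync.Map).Delete(t.Namespace)
	}
}

// LoadOrDefault prefers the template in namespace and falls back to the one
// in the default namespace; it returns nil if neither exists.
func (p *templateStore) LoadOrDefault(namespace, name string) *PodTemplate {
	inner, ok := p.Load(name)
	if !ok {
		return nil
	}
	m := inner.(*sync.Map)
	if t, ok := m.Load(namespace); ok {
		return t.(*PodTemplate)
	}
	if t, ok := m.Load(p.defaultNamespace); ok {
		return t.(*PodTemplate)
	}
	return nil
}

func main() {
	store := newTemplateStore()
	store.SetDefaultNamespace("flyte")
	store.Store(&PodTemplate{Name: "gpu-template", Namespace: "flyte"})
	store.Store(&PodTemplate{Name: "gpu-template", Namespace: "team-a"})

	fmt.Println(store.LoadOrDefault("team-a", "gpu-template").Namespace) // team-a
	fmt.Println(store.LoadOrDefault("team-b", "gpu-template").Namespace) // flyte
	fmt.Println(store.LoadOrDefault("team-b", "missing") == nil)        // true
}
```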

type ResourceCustomizationMode

type ResourceCustomizationMode int
const (
	// ResourceCustomizationModeAssignResources is used for container tasks where resources are validated and assigned if necessary.
	ResourceCustomizationModeAssignResources ResourceCustomizationMode = iota
	// ResourceCustomizationModeMergeExistingResources is used for primary containers in pod tasks where container requests and limits are
	// merged, validated and assigned if necessary.
	ResourceCustomizationModeMergeExistingResources
	// ResourceCustomizationModeEnsureExistingResourcesInRange is used for secondary containers in pod tasks where requests and limits are only
	// adjusted if needed (downwards).
	ResourceCustomizationModeEnsureExistingResourcesInRange
)

func ResourceCustomizationModeString

func ResourceCustomizationModeString(s string) (ResourceCustomizationMode, error)

ResourceCustomizationModeString retrieves an enum value from the enum constant's string name. It returns an error if the string is not part of the enum.

func ResourceCustomizationModeValues

func ResourceCustomizationModeValues() []ResourceCustomizationMode

ResourceCustomizationModeValues returns all values of the enum.

func (ResourceCustomizationMode) IsAResourceCustomizationMode

func (i ResourceCustomizationMode) IsAResourceCustomizationMode() bool

IsAResourceCustomizationMode returns true if the value is listed in the enum definition, and false otherwise.

func (ResourceCustomizationMode) String

func (i ResourceCustomizationMode) String() string

type ResourceRequirement

type ResourceRequirement struct {
	Request resource.Quantity
	Limit   resource.Quantity
}

func AdjustOrDefaultResource

func AdjustOrDefaultResource(request, limit, platformDefault, platformLimit resource.Quantity) ResourceRequirement

AdjustOrDefaultResource validates resources conform to platform limits and assigns defaults for Request and Limit values by using the Request when the Limit is unset, and vice versa.
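The following is a minimal sketch of the defaulting and validation rules. It uses int64 quantities (0 meaning "unset") in place of resource.Quantity. Capping values at the platform limit, rather than rejecting them, and then keeping request <= limit are assumptions about how "conform to platform limits" is enforced.

```go
package main

import "fmt"

// resourceRequirement mirrors ResourceRequirement with int64 quantities.
type resourceRequirement struct {
	Request int64
	Limit   int64
}

func min64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

// adjustOrDefault fills a missing request from the limit (or the platform
// default if both are unset), fills a missing limit from the request, then
// caps both at platformLimit (0 = no cap) and keeps request <= limit.
func adjustOrDefault(request, limit, platformDefault, platformLimit int64) resourceRequirement {
	if request == 0 {
		if limit != 0 {
			request = limit
		} else {
			request = platformDefault
		}
	}
	if limit == 0 {
		limit = request
	}
	if platformLimit > 0 {
		request = min64(request, platformLimit)
		limit = min64(limit, platformLimit)
	}
	if request > limit {
		request = limit
	}
	return resourceRequirement{Request: request, Limit: limit}
}

func main() {
	// CPU in millicores: a 3-core request, platform default 0.5 cores, cap 2 cores.
	fmt.Println(adjustOrDefault(3000, 0, 500, 2000)) // {2000 2000}
}
```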

Directories

Path Synopsis
Package config contains configuration for the flytek8s module - which is global configuration for all Flyte K8s interactions.
