Documentation
¶
Index ¶
- Constants
- Variables
- func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c *v1.Container, ...) error
- func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, ...) (string, error)
- func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.Parameters, ...) error
- func AddFlyteCustomizationsToContainerWithPodTemplate(ctx context.Context, parameters template.Parameters, ...) error
- func AddRequiredNodeSelectorRequirements(base *v1.Affinity, new ...v1.NodeSelectorRequirement)
- func AddTolerationsForExtendedResources(podSpec *v1.PodSpec) *v1.PodSpec
- func ApplyContainerImageOverride(podSpec *v1.PodSpec, containerImage string, primaryContainerName string)
- func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, ...) (*v1.PodSpec, *metav1.ObjectMeta, error)
- func ApplyGPUNodeSelectors(podSpec *v1.PodSpec, gpuAccelerator *core.GPUAccelerator)
- func ApplyInterruptibleNodeAffinity(interruptible bool, podSpec *v1.PodSpec)
- func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity)
- func ApplyK8sResourceOverrides(teMetadata pluginmachinery_core.TaskExecutionMetadata, ...) v1.ResourceRequirements
- func ApplyPodTemplateOverride(objectMeta metav1.ObjectMeta, podTemplate *core.K8SPod) (*v1.PodSpec, metav1.ObjectMeta, error)
- func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements, assignIfUnset bool) v1.ResourceRequirements
- func ApplySharedMemory(podSpec *v1.PodSpec, primaryContainerName string, ...) error
- func BuildIdentityPod() *v1.Pod
- func BuildRawContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error)
- func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, string, error)
- func CalculateStorageSize(requirements *v1.ResourceRequirements) *resource.Quantity
- func CopilotCommandArgs(storageConfig *storage.Config) []string
- func DataVolume(name string, size *resource.Quantity) v1.Volume
- func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, envFroms []v1.EnvFromSource, ...) ([]v1.EnvVar, []v1.EnvFromSource)
- func DemystifyFailure(ctx context.Context, status v1.PodStatus, info pluginsCore.TaskInfo, ...) (pluginsCore.PhaseInfo, error)
- func DemystifyPending(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error)
- func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error)
- func DeterminePrimaryContainerPhase(ctx context.Context, primaryContainerName string, ...) pluginsCore.PhaseInfo
- func DownloadCommandArgs(fromInputsPath, outputPrefix storage.DataReference, toLocalPath string, ...) ([]string, error)
- func ExtractContainerResourcesFromPodTemplate(podTemplate *v1.PodTemplate, containerName string, initContainers bool) v1.ResourceRequirements
- func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []string, ...) (v1.Container, error)
- func GetContainer(podSpec *v1.PodSpec, name string) (*v1.Container, error)
- func GetContextEnvVars(ownerCtx context.Context) []v1.EnvVar
- func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1.EnvVar
- func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time
- func GetLiteralOffloadingEnvVars() []v1.EnvVar
- func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string
- func GetPodTemplateUpdatesHandler(store *PodTemplateStore) cache.ResourceEventHandler
- func GetPodTolerations(interruptible bool, resourceRequirements ...v1.ResourceRequirements) []v1.Toleration
- func GetReportedAt(pod *v1.Pod) metav1.Time
- func GetServiceAccountNameFromTaskExecutionMetadata(taskExecutionMetadata pluginmachinery_core.TaskExecutionMetadata) string
- func MergeBasePodSpecOntoTemplate(templatePodSpec *v1.PodSpec, basePodSpec *v1.PodSpec, ...) (*v1.PodSpec, error)
- func MergeOverlayPodSpecOntoBase(basePodSpec *v1.PodSpec, overlayPodSpec *v1.PodSpec) (*v1.PodSpec, error)
- func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements)
- func MergeResourcesIfMissing(in v1.ResourceRequirements, out *v1.ResourceRequirements)
- func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, ...) (*v1.PodSpec, *metav1.ObjectMeta, error)
- func NewPluginTaskExecutionContext(tc pluginsCore.TaskExecutionContext, ...) pluginsCore.TaskExecutionContext
- func SanitizeGPUResourceRequirements(resources *v1.ResourceRequirements)
- func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, ...) ([]string, error)
- func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error)
- func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar
- func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, string, error)
- func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceList, error)
- func ToK8sResourceRequirements(resources *core.Resources) (*v1.ResourceRequirements, error)
- func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, ...)
- type PluginTaskExecutionContextOption
- type PodTemplateStore
- type ResourceCustomizationMode
- type ResourceRequirement
Constants ¶
const (
	FlyteInternalWorkerNameEnvVarKey        = "_F_WN"  // "FLYTE_INTERNAL_WORKER_NAME"
	FlyteInternalDistErrorStrategyEnvVarKey = "_F_DES" // "FLYTE_INTERNAL_DIST_ERROR_STRATEGY"
)
const Interrupted = "Interrupted"
const OOMKilled = "OOMKilled"
const PodKind = "pod"
const PrimaryContainerKey = "primary_container_name"
const PrimaryContainerNotFound = "PrimaryContainerNotFound"
const ResourceNvidiaGPU = "nvidia.com/gpu"
ResourceNvidiaGPU is the name of the Nvidia GPU resource. Copied from: k8s.io/autoscaler/cluster-autoscaler/utils/gpu/gpu.go
const ResourceRDMAInfiniband = "rdma/infiniband"
const SIGKILL = 137
Variables ¶
var DefaultPodTemplateStore = NewPodTemplateStore()
Functions ¶
func AddCoPilotToContainer ¶
func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c *v1.Container, iFace *core.TypedInterface, pilot *core.DataLoadingConfig) error
func AddCoPilotToPod ¶
func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilotPod *v1.PodSpec, iFace *core.TypedInterface, taskExecMetadata core2.TaskExecutionMetadata, inputPaths io.InputFilePaths, outputPaths io.OutputFilePaths, pilot *core.DataLoadingConfig) (string, error)
func AddFlyteCustomizationsToContainer ¶
func AddFlyteCustomizationsToContainer(ctx context.Context, parameters template.Parameters, mode ResourceCustomizationMode, container *v1.Container) error
AddFlyteCustomizationsToContainer takes a container definition which specifies how to run a Flyte task and fills in templated command and argument values, updates resources and decorates environment variables with platform and task-specific customizations.
func AddFlyteCustomizationsToContainerWithPodTemplate ¶ added in v1.16.0
func AddFlyteCustomizationsToContainerWithPodTemplate(ctx context.Context, parameters template.Parameters, mode ResourceCustomizationMode, container *v1.Container, podTemplateResources *v1.ResourceRequirements) error
AddFlyteCustomizationsToContainerWithPodTemplate is the enhanced version of AddFlyteCustomizationsToContainer that accepts pod template resources for proper resource priority handling.
func AddRequiredNodeSelectorRequirements ¶ added in v1.9.20
func AddRequiredNodeSelectorRequirements(base *v1.Affinity, new ...v1.NodeSelectorRequirement)
AddRequiredNodeSelectorRequirements adds the provided v1.NodeSelectorRequirement objects to an existing v1.Affinity object. If there are no existing required node selectors, the new v1.NodeSelectorRequirement will be added as-is. However, if there are existing required node selectors, we iterate over all existing node selector terms and append the node selector requirement. Note that multiple node selector terms are OR'd, and match expressions within a single node selector term are AND'd during scheduling. See: https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#node-affinity
func AddTolerationsForExtendedResources ¶ added in v1.14.0
func ApplyContainerImageOverride ¶ added in v1.11.0
func ApplyFlytePodConfiguration ¶
func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error)
ApplyFlytePodConfiguration updates the PodSpec and ObjectMeta with various Flyte configuration. This includes applying default k8s configuration, applying overrides (resources etc.), injecting copilot containers, and merging with the configuration PodTemplate (if exists).
func ApplyGPUNodeSelectors ¶ added in v1.9.20
func ApplyGPUNodeSelectors(podSpec *v1.PodSpec, gpuAccelerator *core.GPUAccelerator)
func ApplyInterruptibleNodeAffinity ¶
ApplyInterruptibleNodeAffinity configures the node-affinity for the pod using the configuration specified.
func ApplyInterruptibleNodeSelectorRequirement ¶
ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified.
func ApplyK8sResourceOverrides ¶ added in v1.15.1
func ApplyK8sResourceOverrides(teMetadata pluginmachinery_core.TaskExecutionMetadata, resources *v1.ResourceRequirements) v1.ResourceRequirements
ApplyK8sResourceOverrides ensures that both resource requests and limits are set. This is required because we run user executions in namespaces bound with a project quota and the Kubernetes scheduler will reject requests omitting these. This function is called by plugins that don't necessarily construct a default flyte container (container and k8s pod tasks) and therefore don't already receive the ApplyResourceOverrides treatment and subsequent validation which handles adding sensible defaults for requests and limits.
func ApplyPodTemplateOverride ¶ added in v1.15.0
func ApplyPodTemplateOverride(objectMeta metav1.ObjectMeta, podTemplate *core.K8SPod) (*v1.PodSpec, metav1.ObjectMeta, error)
func ApplyResourceOverrides ¶
func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements, assignIfUnset bool) v1.ResourceRequirements
ApplyResourceOverrides handles resource resolution, allocation and validation. Primarily, it ensures that container resources do not exceed defined platformResource limits and in the case of assignIfUnset, ensures that limits and requests are sensibly set for resources of all types.
func ApplySharedMemory ¶ added in v1.15.0
func BuildIdentityPod ¶
func BuildRawContainer ¶
func BuildRawContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error)
BuildRawContainer constructs a Container based on the definition passed by the TaskExecutionContext.
func BuildRawPod ¶
func BuildRawPod(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, string, error)
BuildRawPod constructs a PodSpec and ObjectMeta based on the definition passed by the TaskExecutionContext. This definition does not include any configuration injected by Flyte.
func CalculateStorageSize ¶
func CalculateStorageSize(requirements *v1.ResourceRequirements) *resource.Quantity
func CopilotCommandArgs ¶
func DecorateEnvVars ¶
func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, envFroms []v1.EnvFromSource, taskEnvironmentVariables map[string]string, id pluginsCore.TaskExecutionID, consoleURL string) ([]v1.EnvVar, []v1.EnvFromSource)
func DemystifyFailure ¶
func DemystifyFailure(ctx context.Context, status v1.PodStatus, info pluginsCore.TaskInfo, primaryContainerName string) (pluginsCore.PhaseInfo, error)
DemystifyFailure resolves the various Kubernetes pod failure modes to determine the most appropriate course of action
func DemystifyPending ¶
func DemystifyPending(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error)
DemystifyPending is one of the core functions that helps FlytePropeller determine whether a pending pod is indeed pending or is actually stuck in an unrecoverable state. In such a case the pod should be marked as dead and the task should be retried. This has to be handled, sadly, because K8s is still largely designed for long-running services that should recover from failures, whereas Flyte pods are completely automated and should either run or fail. Important considerations: Pending status in a Pod can occur for various reasons and can sometimes signal a problem. Case I: Pending because the image pull is failing and it is backing off.
This could be transient. So we can actually rely on the failure reason. The failure transitions from ErrImagePull -> ImagePullBackoff
Case II: Not enough resources are available. This is tricky. It could be that the total number of
resources requested is beyond the capability of the system. For this we rely on configuration and hence input gates: we should not let through bad requests that ask for a large number of resources. If such a request does make it through, we will fail after a timeout.
func DemystifySuccess ¶
func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error)
func DeterminePrimaryContainerPhase ¶
func DeterminePrimaryContainerPhase(ctx context.Context, primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo
DeterminePrimaryContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object corresponding to the phase of the primaryContainer which is identified using the provided name. This is useful in case of sidecars or pod jobs, where Flyte will monitor successful exit of a single container.
func DownloadCommandArgs ¶
func DownloadCommandArgs(fromInputsPath, outputPrefix storage.DataReference, toLocalPath string, format core.DataLoadingConfig_LiteralMapFormat, inputInterface *core.VariableMap) ([]string, error)
func ExtractContainerResourcesFromPodTemplate ¶ added in v1.16.0
func ExtractContainerResourcesFromPodTemplate(podTemplate *v1.PodTemplate, containerName string, initContainers bool) v1.ResourceRequirements
ExtractContainerResourcesFromPodTemplate extracts container resources from a pod template for a specific container. It returns the resources of the specified container if container names match or if PodTemplate contains a "primary"/"primary-init" or "default"/"default-init" container, or an empty ResourceRequirements if not found. This function supports both regular containers and init containers.
func FlyteCoPilotContainer ¶
func FlyteCoPilotContainer(name string, cfg config.FlyteCoPilotConfig, args []string, volumeMounts ...v1.VolumeMount) (v1.Container, error)
func GetContainer ¶ added in v1.9.18
func GetExecutionEnvVars ¶
func GetExecutionEnvVars(id pluginsCore.TaskExecutionID, consoleURL string) []v1.EnvVar
func GetLiteralOffloadingEnvVars ¶ added in v1.14.0
func GetMessageAfterGracePeriod ¶ added in v1.10.7
func GetPodTemplateUpdatesHandler ¶
func GetPodTemplateUpdatesHandler(store *PodTemplateStore) cache.ResourceEventHandler
GetPodTemplateUpdatesHandler returns a new ResourceEventHandler which adds / removes PodTemplates to / from the provided PodTemplateStore.
func GetPodTolerations ¶
func GetPodTolerations(interruptible bool, resourceRequirements ...v1.ResourceRequirements) []v1.Toleration
func GetServiceAccountNameFromTaskExecutionMetadata ¶
func GetServiceAccountNameFromTaskExecutionMetadata(taskExecutionMetadata pluginmachinery_core.TaskExecutionMetadata) string
func MergeBasePodSpecOntoTemplate ¶ added in v1.15.1
func MergeBasePodSpecOntoTemplate(templatePodSpec *v1.PodSpec, basePodSpec *v1.PodSpec, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, error)
MergeBasePodSpecOntoTemplate merges a base pod spec onto a template pod spec. The template pod spec has some magic values that allow users to specify templates that target all containers and primary containers. Aside from magic values this method will merge containers that have matching names.
func MergeOverlayPodSpecOntoBase ¶ added in v1.15.1
func MergeOverlayPodSpecOntoBase(basePodSpec *v1.PodSpec, overlayPodSpec *v1.PodSpec) (*v1.PodSpec, error)
MergeOverlayPodSpecOntoBase merges a customized pod spec onto a base pod spec. At a container level it will merge containers that have matching names.
func MergeResources ¶
func MergeResources(in v1.ResourceRequirements, out *v1.ResourceRequirements)
func MergeResourcesIfMissing ¶ added in v1.16.0
func MergeResourcesIfMissing(in v1.ResourceRequirements, out *v1.ResourceRequirements)
MergeResourcesIfMissing merges resources from 'in' to 'out', but only for resources that are not already set in 'out'. This is used for pod template resource merging where we want to preserve higher priority resources.
func MergeWithBasePodTemplate ¶
func MergeWithBasePodTemplate(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, podSpec *v1.PodSpec, objectMeta *metav1.ObjectMeta, primaryContainerName string, primaryInitContainerName string) (*v1.PodSpec, *metav1.ObjectMeta, error)
MergeWithBasePodTemplate attempts to merge the provided PodSpec and ObjectMeta with the configuration PodTemplate for this task.
func NewPluginTaskExecutionContext ¶ added in v1.10.6
func NewPluginTaskExecutionContext(tc pluginsCore.TaskExecutionContext, options ...PluginTaskExecutionContextOption) pluginsCore.TaskExecutionContext
func SanitizeGPUResourceRequirements ¶ added in v1.12.0
func SanitizeGPUResourceRequirements(resources *v1.ResourceRequirements)
SanitizeGPUResourceRequirements converts GPU resource requirements named 'gpu' to the recognized 'nvidia.com/gpu' identifier.
func SidecarCommandArgs ¶
func SidecarCommandArgs(fromLocalPath string, outputPrefix, rawOutputPath storage.DataReference, uploadTimeout time.Duration, iface *core.TypedInterface) ([]string, error)
func ToK8sContainer ¶
func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error)
ToK8sContainer builds a Container based on the definition passed by the TaskExecutionContext. This involves applying all Flyte configuration including k8s plugins and resource requests.
func ToK8sEnvVar ¶
func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar
func ToK8sPodSpec ¶
func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, string, error)
ToK8sPodSpec builds a PodSpec and ObjectMeta based on the definition passed by the TaskExecutionContext. This involves parsing the raw PodSpec definition and applying all Flyte configuration options.
func ToK8sResourceList ¶
func ToK8sResourceList(resources []*core.Resources_ResourceEntry) (v1.ResourceList, error)
TODO we should modify the container resources to contain a map of enum values? Also we should probably create tolerations / taints, but we could do that as a post process
func ToK8sResourceRequirements ¶
func ToK8sResourceRequirements(resources *core.Resources) (*v1.ResourceRequirements, error)
func UpdatePod ¶
func UpdatePod(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, resourceRequirements []v1.ResourceRequirements, podSpec *v1.PodSpec)
UpdatePod updates the base pod spec used to execute tasks. This is configured with plugins and task metadata-specific options
Types ¶
type PluginTaskExecutionContextOption ¶ added in v1.10.6
type PluginTaskExecutionContextOption func(*pluginTaskExecutionContext)
func WithExtendedResources ¶ added in v1.10.6
func WithExtendedResources(er *core.ExtendedResources) PluginTaskExecutionContextOption
func WithInterruptible ¶ added in v1.10.6
func WithInterruptible(v bool) PluginTaskExecutionContextOption
func WithResources ¶ added in v1.10.6
func WithResources(r *v1.ResourceRequirements) PluginTaskExecutionContextOption
type PodTemplateStore ¶
PodTemplateStore maintains a thread-safe mapping of active PodTemplates with their associated namespaces.
func NewPodTemplateStore ¶
func NewPodTemplateStore() PodTemplateStore
NewPodTemplateStore initializes a new PodTemplateStore
func (*PodTemplateStore) Delete ¶
func (p *PodTemplateStore) Delete(podTemplate *v1.PodTemplate)
Delete removes the specified PodTemplate from the store.
func (*PodTemplateStore) LoadOrDefault ¶
func (p *PodTemplateStore) LoadOrDefault(namespace string, podTemplateName string) *v1.PodTemplate
LoadOrDefault returns the PodTemplate with the specified name in the given namespace. If one does not exist it attempts to retrieve the one associated with the defaultNamespace. Returns a deep copy of the cached PodTemplate to prevent state pollution between workflows.
func (*PodTemplateStore) SetDefaultNamespace ¶
func (p *PodTemplateStore) SetDefaultNamespace(namespace string)
SetDefaultNamespace sets the default namespace for the PodTemplateStore.
func (*PodTemplateStore) Store ¶
func (p *PodTemplateStore) Store(podTemplate *v1.PodTemplate)
Store loads the specified PodTemplate into the store.
type ResourceCustomizationMode ¶
type ResourceCustomizationMode int
const (
	// ResourceCustomizationModeAssignResources is used for container tasks where resources are validated and assigned if necessary.
	ResourceCustomizationModeAssignResources ResourceCustomizationMode = iota
	// ResourceCustomizationModeMergeExistingResources is used for primary containers in pod tasks where container requests and limits are
	// merged, validated and assigned if necessary.
	ResourceCustomizationModeMergeExistingResources
	// ResourceCustomizationModeEnsureExistingResourcesInRange is used for secondary containers in pod tasks where requests and limits are only
	// adjusted if needed (downwards).
	ResourceCustomizationModeEnsureExistingResourcesInRange
)
func ResourceCustomizationModeString ¶
func ResourceCustomizationModeString(s string) (ResourceCustomizationMode, error)
ResourceCustomizationModeString retrieves an enum value from the enum constant's string name. Returns an error if the param is not part of the enum.
func ResourceCustomizationModeValues ¶
func ResourceCustomizationModeValues() []ResourceCustomizationMode
ResourceCustomizationModeValues returns all values of the enum
func (ResourceCustomizationMode) IsAResourceCustomizationMode ¶
func (i ResourceCustomizationMode) IsAResourceCustomizationMode() bool
IsAResourceCustomizationMode returns "true" if the value is listed in the enum definition. "false" otherwise
func (ResourceCustomizationMode) String ¶
func (i ResourceCustomizationMode) String() string
type ResourceRequirement ¶
func AdjustOrDefaultResource ¶
func AdjustOrDefaultResource(request, limit, platformDefault, platformLimit resource.Quantity) ResourceRequirement
AdjustOrDefaultResource validates resources conform to platform limits and assigns defaults for Request and Limit values by using the Request when the Limit is unset, and vice versa.