Documentation
¶
Index ¶
- func ApplyScaleValuesToLivePipeline(ctx context.Context, pipelineDef *unstructured.Unstructured, ...) error
- func ApplyScaleValuesToPipelineDefinition(ctx context.Context, pipelineDef *unstructured.Unstructured, ...) error
- func CanPipelineIngestData(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)
- func CheckIfPipelineWontPause(ctx context.Context, pipeline *unstructured.Unstructured, ...) bool
- func CheckMonoVertexPhase(ctx context.Context, monovertex *unstructured.Unstructured, ...) bool
- func CheckPipelineDrained(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)
- func CheckPipelineLiveObservedGeneration(ctx context.Context, pipeline *unstructured.Unstructured) (bool, int64, int64, error)
- func CheckPipelineObservedGeneration(ctx context.Context, pipeline *unstructured.Unstructured) (bool, int64, int64, error)
- func CheckPipelinePhase(ctx context.Context, pipeline *unstructured.Unstructured, ...) bool
- func CheckPipelineScaledToZero(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)
- func EnsurePipelineScaledToZero(ctx context.Context, pipeline *unstructured.Unstructured, c client.Client) error
- func ExtractScaleMinMax(object map[string]any, pathToScale []string) (*apiv1.ScaleDefinition, error)
- func GetISBServiceChildResourceHealth(conditions []metav1.Condition) (metav1.ConditionStatus, string)
- func GetISBSvcStatefulSetFromK8s(ctx context.Context, c client.Client, isbsvc *unstructured.Unstructured, ...) (*appsv1.StatefulSet, error)
- func GetMonoVertexDesiredPhase(monovertex *unstructured.Unstructured) (string, error)
- func GetPipelineDesiredPhase(pipeline *unstructured.Unstructured) (string, error)
- func GetPipelineISBSVCName(pipeline *unstructured.Unstructured) (string, error)
- func GetPipelineSpecFromRollout(pipelineName string, pipelineRollout *apiv1.PipelineRollout) (map[string]interface{}, error)
- func GetPipelineVertexDefinitions(pipeline *unstructured.Unstructured) ([]interface{}, error)
- func GetPipelineVertices(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) (map[string]*unstructured.Unstructured, error)
- func GetPipelinesForRollout(ctx context.Context, c client.Client, pipelineRollout *apiv1.PipelineRollout, ...) (unstructured.UnstructuredList, error)
- func GetRolloutForPipeline(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) (*apiv1.PipelineRollout, error)
- func GetScaleValuesFromPipelineDefinition(ctx context.Context, pipelineDef *unstructured.Unstructured) ([]apiv1.VertexScaleDefinition, error)
- func GetVertexFromPipelineSpecMap(pipelineSpec map[string]interface{}, vertexName string) (map[string]interface{}, bool, error)
- func IsPipelinePausedOrWontPause(ctx context.Context, pipeline *unstructured.Unstructured, ...) (bool, error)
- func MinimizePipelineVertexReplicas(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) error
- func PipelineWithDesiredPhase(pipeline *unstructured.Unstructured, phase string) error
- func PipelineWithISBServiceName(pipeline *unstructured.Unstructured, isbsvcName string) error
- func PipelineWithoutDesiredPhase(pipeline *unstructured.Unstructured)
- func PipelineWithoutScaleFields(pipeline map[string]interface{}) error
- func ScalePipelineDefSourceVerticesToZero(ctx context.Context, pipelineDef *unstructured.Unstructured) error
- func ScalePipelineVerticesToZero(ctx context.Context, pipelineDef *unstructured.Unstructured, c client.Client) error
- type AbstractVertex
- type Lifecycle
- type MonoVertexStatus
- type PipelineSpec
- type PipelineStatus
- type Scale
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ApplyScaleValuesToLivePipeline ¶ added in v0.28.0
func ApplyScaleValuesToLivePipeline( ctx context.Context, pipelineDef *unstructured.Unstructured, vertexScaleDefinitions []apiv1.VertexScaleDefinition, c client.Client) error
Apply the scale values to the running pipeline as patches. Note that vertexScaleDefinitions are not required to be in order and can also be a partial set.
func ApplyScaleValuesToPipelineDefinition ¶ added in v0.28.0
func ApplyScaleValuesToPipelineDefinition( ctx context.Context, pipelineDef *unstructured.Unstructured, vertexScaleDefinitions []apiv1.VertexScaleDefinition) error
func CanPipelineIngestData ¶ added in v0.28.0
func CanPipelineIngestData(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)
CanPipelineIngestData checks if the pipeline is set to desiredPhase=Running (or unset) and if a source vertex has scale.max>0
func CheckIfPipelineWontPause ¶
func CheckIfPipelineWontPause(ctx context.Context, pipeline *unstructured.Unstructured, pipelineRollout *apiv1.PipelineRollout) bool
func CheckMonoVertexPhase ¶ added in v0.26.0
func CheckMonoVertexPhase(ctx context.Context, monovertex *unstructured.Unstructured, phase numaflowv1.PipelinePhase) bool
func CheckPipelineDrained ¶ added in v0.10.0
func CheckPipelineDrained(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)
func CheckPipelineLiveObservedGeneration ¶ added in v0.28.0
func CheckPipelineLiveObservedGeneration(ctx context.Context, pipeline *unstructured.Unstructured) (bool, int64, int64, error)
CheckPipelineLiveObservedGeneration verifies that the observedGeneration is not less than the generation, meaning it's been reconciled by Numaflow since being updated
func CheckPipelineObservedGeneration ¶ added in v0.28.0
func CheckPipelineObservedGeneration(ctx context.Context, pipeline *unstructured.Unstructured) (bool, int64, int64, error)
CheckPipelineObservedGeneration verifies that the observedGeneration is not less than the generation, meaning it's been reconciled by Numaflow since being updated
func CheckPipelinePhase ¶
func CheckPipelinePhase(ctx context.Context, pipeline *unstructured.Unstructured, phase numaflowv1.PipelinePhase) bool
func CheckPipelineScaledToZero ¶ added in v0.28.0
func CheckPipelineScaledToZero(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)
determine if all Pipeline Vertices have max=0
func EnsurePipelineScaledToZero ¶ added in v0.28.0
func EnsurePipelineScaledToZero(ctx context.Context, pipeline *unstructured.Unstructured, c client.Client) error
ensure all Pipeline Vertices have max=0
func ExtractScaleMinMax ¶ added in v0.28.0
func GetISBServiceChildResourceHealth ¶
func GetISBServiceChildResourceHealth(conditions []metav1.Condition) (metav1.ConditionStatus, string)
func GetISBSvcStatefulSetFromK8s ¶
func GetISBSvcStatefulSetFromK8s(ctx context.Context, c client.Client, isbsvc *unstructured.Unstructured, checkLive bool) (*appsv1.StatefulSet, error)
Each ISBService has one underlying StatefulSet: find it. Depending on the value of "checkLive", either check the K8S API directly or go to the informer cache.
func GetMonoVertexDesiredPhase ¶ added in v0.26.0
func GetMonoVertexDesiredPhase(monovertex *unstructured.Unstructured) (string, error)
func GetPipelineDesiredPhase ¶ added in v0.10.0
func GetPipelineDesiredPhase(pipeline *unstructured.Unstructured) (string, error)
func GetPipelineISBSVCName ¶ added in v0.11.0
func GetPipelineISBSVCName(pipeline *unstructured.Unstructured) (string, error)
func GetPipelineSpecFromRollout ¶ added in v0.28.0
func GetPipelineSpecFromRollout( pipelineName string, pipelineRollout *apiv1.PipelineRollout, ) (map[string]interface{}, error)
func GetPipelineVertexDefinitions ¶ added in v0.26.0
func GetPipelineVertexDefinitions(pipeline *unstructured.Unstructured) ([]interface{}, error)
func GetPipelineVertices ¶ added in v0.26.0
func GetPipelineVertices(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) (map[string]*unstructured.Unstructured, error)
Find all the Pipeline Vertices in K8S using the Pipeline's definition: return a map of vertex name to the resource found. For any Vertices that can't be found, return an entry mapped to nil.
func GetPipelinesForRollout ¶ added in v0.13.0
func GetPipelinesForRollout(ctx context.Context, c client.Client, pipelineRollout *apiv1.PipelineRollout, live bool) (unstructured.UnstructuredList, error)
func GetRolloutForPipeline ¶ added in v0.10.0
func GetRolloutForPipeline(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) (*apiv1.PipelineRollout, error)
func GetScaleValuesFromPipelineDefinition ¶ added in v0.28.0
func GetScaleValuesFromPipelineDefinition(ctx context.Context, pipelineDef *unstructured.Unstructured) ( []apiv1.VertexScaleDefinition, error)
For each Vertex, get the definition of the Scale. Returns the scale definitions as a slice of apiv1.VertexScaleDefinition.
func GetVertexFromPipelineSpecMap ¶ added in v0.28.0
func IsPipelinePausedOrWontPause ¶
func IsPipelinePausedOrWontPause(ctx context.Context, pipeline *unstructured.Unstructured, pipelineRollout *apiv1.PipelineRollout, requireDrained bool) (bool, error)
The pipeline must be one of the following:
- Paused
- Failed (contract with Numaflow is that unpausable Pipelines are "Failed" pipelines)
- Owned by a PipelineRollout parent that is annotated to allow data loss
func MinimizePipelineVertexReplicas ¶ added in v0.26.0
func MinimizePipelineVertexReplicas(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) error
MinimizePipelineVertexReplicas clears out the `replicas` field from each Vertex of a Pipeline, which has the effect in Numaflow of resetting to "scale.min" value
func PipelineWithDesiredPhase ¶ added in v0.11.0
func PipelineWithDesiredPhase(pipeline *unstructured.Unstructured, phase string) error
func PipelineWithISBServiceName ¶ added in v0.11.0
func PipelineWithISBServiceName(pipeline *unstructured.Unstructured, isbsvcName string) error
func PipelineWithoutDesiredPhase ¶ added in v0.11.0
func PipelineWithoutDesiredPhase(pipeline *unstructured.Unstructured)
Remove the 'lifecycle.desiredPhase' key/value pair from the spec; also remove 'lifecycle' if it's an empty map.
func PipelineWithoutScaleFields ¶ added in v0.28.0
extract fields from the Pipeline within "scale" that are manipulated by the platform
func ScalePipelineDefSourceVerticesToZero ¶ added in v0.28.0
func ScalePipelineDefSourceVerticesToZero( ctx context.Context, pipelineDef *unstructured.Unstructured, ) error
scale any Source Vertices in the Pipeline definition to min=max=0
func ScalePipelineVerticesToZero ¶ added in v0.28.0
func ScalePipelineVerticesToZero( ctx context.Context, pipelineDef *unstructured.Unstructured, c client.Client, ) error
Types ¶
type AbstractVertex ¶ added in v0.20.0
AbstractVertex keeps track of the minimum number of fields we need to know about in Numaflow's AbstractVertex, which are presumed not to change from version to version
type Lifecycle ¶
type Lifecycle struct {
// DesiredPhase used to bring the pipeline from current phase to desired phase
// +kubebuilder:default=Running
// +optional
DesiredPhase string `json:"desiredPhase,omitempty"`
}
type MonoVertexStatus ¶
type MonoVertexStatus = kubernetes.GenericStatus
func ParseMonoVertexStatus ¶
func ParseMonoVertexStatus(monoVertex *unstructured.Unstructured) (MonoVertexStatus, error)
type PipelineSpec ¶
type PipelineSpec struct {
InterStepBufferServiceName string `json:"interStepBufferServiceName"`
Lifecycle Lifecycle `json:"lifecycle,omitempty"`
Vertices []AbstractVertex `json:"vertices,omitempty"`
}
PipelineSpec keeps track of the minimum number of fields we need to know about in Numaflow's PipelineSpec, which are presumed not to change from version to version
func (PipelineSpec) GetISBSvcName ¶
func (pipeline PipelineSpec) GetISBSvcName() string
type PipelineStatus ¶
type PipelineStatus struct {
Phase numaflowv1.PipelinePhase `json:"phase,omitempty"`
Conditions []metav1.Condition `json:"conditions,omitempty"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
DrainedOnPause bool `json:"drainedOnPause,omitempty" protobuf:"bytes,12,opt,name=drainedOnPause"`
}
PipelineStatus keeps track of the minimum number of fields we need to know about in Numaflow's PipelineStatus, which are presumed not to change from version to version
func ParsePipelineStatus ¶
func ParsePipelineStatus(pipeline *unstructured.Unstructured) (PipelineStatus, error)
type Scale ¶ added in v0.20.0
type Scale struct {
// Minimum replicas.
Min *int32 `json:"min,omitempty"`
// Maximum replicas.
Max *int32 `json:"max,omitempty"`
}
Scale keeps track of the minimum number of fields we need to know about in Numaflow's Scale struct, which are presumed not to change from version to version