numaflowtypes

package
v0.28.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyScaleValuesToLivePipeline added in v0.28.0

func ApplyScaleValuesToLivePipeline(
	ctx context.Context, pipelineDef *unstructured.Unstructured, vertexScaleDefinitions []apiv1.VertexScaleDefinition, c client.Client) error

apply the scale values to the running pipeline as patches note that vertexScaleDefinitions are not required to be in order and also can be a partial set

func ApplyScaleValuesToPipelineDefinition added in v0.28.0

func ApplyScaleValuesToPipelineDefinition(
	ctx context.Context, pipelineDef *unstructured.Unstructured, vertexScaleDefinitions []apiv1.VertexScaleDefinition) error

func CanPipelineIngestData added in v0.28.0

func CanPipelineIngestData(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)

CanPipelineIngestData checks if the pipeline is set to desiredPhase=Running(or unset) plus if a source vertex has scale.max>0

func CheckIfPipelineWontPause

func CheckIfPipelineWontPause(ctx context.Context, pipeline *unstructured.Unstructured, pipelineRollout *apiv1.PipelineRollout) bool

func CheckMonoVertexPhase added in v0.26.0

func CheckMonoVertexPhase(ctx context.Context, monovertex *unstructured.Unstructured, phase numaflowv1.PipelinePhase) bool

func CheckPipelineDrained added in v0.10.0

func CheckPipelineDrained(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)

func CheckPipelineLiveObservedGeneration added in v0.28.0

func CheckPipelineLiveObservedGeneration(ctx context.Context, pipeline *unstructured.Unstructured) (bool, int64, int64, error)

CheckPipelineLiveObservedGeneration verifies that the observedGeneration is not less than the generation, meaning it's been reconciled by Numaflow since being updated

func CheckPipelineObservedGeneration added in v0.28.0

func CheckPipelineObservedGeneration(ctx context.Context, pipeline *unstructured.Unstructured) (bool, int64, int64, error)

CheckPipelineObservedGeneration verifies that the observedGeneration is not less than the generation, meaning it's been reconciled by Numaflow since being updated

func CheckPipelinePhase

func CheckPipelinePhase(ctx context.Context, pipeline *unstructured.Unstructured, phase numaflowv1.PipelinePhase) bool

func CheckPipelineScaledToZero added in v0.28.0

func CheckPipelineScaledToZero(ctx context.Context, pipeline *unstructured.Unstructured) (bool, error)

determine if all Pipeline Vertices have max=0

func EnsurePipelineScaledToZero added in v0.28.0

func EnsurePipelineScaledToZero(ctx context.Context, pipeline *unstructured.Unstructured, c client.Client) error

ensure all Pipeline Vertices have max=0

func ExtractScaleMinMax added in v0.28.0

func ExtractScaleMinMax(object map[string]any, pathToScale []string) (*apiv1.ScaleDefinition, error)

func GetISBServiceChildResourceHealth

func GetISBServiceChildResourceHealth(conditions []metav1.Condition) (metav1.ConditionStatus, string)

func GetISBSvcStatefulSetFromK8s

func GetISBSvcStatefulSetFromK8s(ctx context.Context, c client.Client, isbsvc *unstructured.Unstructured, checkLive bool) (*appsv1.StatefulSet, error)

Each ISBService has one underlying StatefulSet Find it Depending on value "checkLive", either check K8S API directly or go to informer cache

func GetMonoVertexDesiredPhase added in v0.26.0

func GetMonoVertexDesiredPhase(monovertex *unstructured.Unstructured) (string, error)

func GetPipelineDesiredPhase added in v0.10.0

func GetPipelineDesiredPhase(pipeline *unstructured.Unstructured) (string, error)

func GetPipelineISBSVCName added in v0.11.0

func GetPipelineISBSVCName(pipeline *unstructured.Unstructured) (string, error)

func GetPipelineSpecFromRollout added in v0.28.0

func GetPipelineSpecFromRollout(
	pipelineName string,
	pipelineRollout *apiv1.PipelineRollout,
) (map[string]interface{}, error)

func GetPipelineVertexDefinitions added in v0.26.0

func GetPipelineVertexDefinitions(pipeline *unstructured.Unstructured) ([]interface{}, error)

func GetPipelineVertices added in v0.26.0

func GetPipelineVertices(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) (map[string]*unstructured.Unstructured, error)

find all the Pipeline Vertices in K8S using the Pipeline's definition: return a map of vertex name to resource found for any Vertices that can't be found, return an entry mapped to nil

func GetPipelinesForRollout added in v0.13.0

func GetPipelinesForRollout(ctx context.Context, c client.Client, pipelineRollout *apiv1.PipelineRollout, live bool) (unstructured.UnstructuredList, error)

func GetRolloutForPipeline added in v0.10.0

func GetRolloutForPipeline(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) (*apiv1.PipelineRollout, error)

func GetScaleValuesFromPipelineDefinition added in v0.28.0

func GetScaleValuesFromPipelineDefinition(ctx context.Context, pipelineDef *unstructured.Unstructured) (
	[]apiv1.VertexScaleDefinition, error)

for each Vertex, get the definition of the Scale return map of Vertex name to scale definition

func GetVertexFromPipelineSpecMap added in v0.28.0

func GetVertexFromPipelineSpecMap(
	pipelineSpec map[string]interface{},
	vertexName string,
) (map[string]interface{}, bool, error)

func IsPipelinePausedOrWontPause

func IsPipelinePausedOrWontPause(ctx context.Context, pipeline *unstructured.Unstructured, pipelineRollout *apiv1.PipelineRollout, requireDrained bool) (bool, error)

either pipeline must be:

  • Paused
  • Failed (contract with Numaflow is that unpausible Pipelines are "Failed" pipelines)
  • PipelineRollout parent Annotated to allow data loss

func MinimizePipelineVertexReplicas added in v0.26.0

func MinimizePipelineVertexReplicas(ctx context.Context, c client.Client, pipeline *unstructured.Unstructured) error

MinimizePipelineVertexReplicas clears out the `replicas` field from each Vertex of a Pipeline, which has the effect in Numaflow of resetting to "scale.min" value

func PipelineWithDesiredPhase added in v0.11.0

func PipelineWithDesiredPhase(pipeline *unstructured.Unstructured, phase string) error

func PipelineWithISBServiceName added in v0.11.0

func PipelineWithISBServiceName(pipeline *unstructured.Unstructured, isbsvcName string) error

func PipelineWithoutDesiredPhase added in v0.11.0

func PipelineWithoutDesiredPhase(pipeline *unstructured.Unstructured)

remove 'lifecycle.desiredPhase' key/value pair from spec also remove 'lifecycle' if it's an empty map

func PipelineWithoutScaleFields added in v0.28.0

func PipelineWithoutScaleFields(pipeline map[string]interface{}) error

extract fields from the Pipeline within "scale" that are manipulated by the platform

func ScalePipelineDefSourceVerticesToZero added in v0.28.0

func ScalePipelineDefSourceVerticesToZero(
	ctx context.Context,
	pipelineDef *unstructured.Unstructured,
) error

scale any Source Vertices in the Pipeline definition to min=max=0

func ScalePipelineVerticesToZero added in v0.28.0

func ScalePipelineVerticesToZero(
	ctx context.Context,
	pipelineDef *unstructured.Unstructured,
	c client.Client,
) error

Types

type AbstractVertex added in v0.20.0

type AbstractVertex struct {
	Name  string `json:"name"`
	Scale Scale  `json:"scale,omitempty"`
}

AbstractVertex keeps track of minimum number of fields we need to know about in Numaflow's AbstractVertex, which are presumed not to change from version to version

type Lifecycle

type Lifecycle struct {
	// DesiredPhase used to bring the pipeline from current phase to desired phase
	// +kubebuilder:default=Running
	// +optional
	DesiredPhase string `json:"desiredPhase,omitempty"`
}

type MonoVertexStatus

type MonoVertexStatus = kubernetes.GenericStatus

func ParseMonoVertexStatus

func ParseMonoVertexStatus(monoVertex *unstructured.Unstructured) (MonoVertexStatus, error)

type PipelineSpec

type PipelineSpec struct {
	InterStepBufferServiceName string           `json:"interStepBufferServiceName"`
	Lifecycle                  Lifecycle        `json:"lifecycle,omitempty"`
	Vertices                   []AbstractVertex `json:"vertices,omitempty"`
}

PipelineSpec keeps track of minimum number of fields we need to know about in Numaflow's PipelineSpec, which are presumed not to change from version to version

func (PipelineSpec) GetISBSvcName

func (pipeline PipelineSpec) GetISBSvcName() string

type PipelineStatus

type PipelineStatus struct {
	Phase              numaflowv1.PipelinePhase `json:"phase,omitempty"`
	Conditions         []metav1.Condition       `json:"conditions,omitempty"`
	ObservedGeneration int64                    `json:"observedGeneration,omitempty"`
	DrainedOnPause     bool                     `json:"drainedOnPause,omitempty" protobuf:"bytes,12,opt,name=drainedOnPause"`
}

PipelineStatus keeps track of minimum number of fields we need to know about in Numaflow's PipelineStatus, which are presumed not to change from version to version

func ParsePipelineStatus

func ParsePipelineStatus(pipeline *unstructured.Unstructured) (PipelineStatus, error)

type Scale added in v0.20.0

type Scale struct {
	// Minimum replicas.
	Min *int32 `json:"min,omitempty"`
	// Maximum replicas.
	Max *int32 `json:"max,omitempty"`
}

Scale keeps track of minimum number of fields we need to know about in Numaflow's Scale struct, which are presumed not to change from version to version

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL