workflowrun

package
v0.9.3-beta
Published: Mar 10, 2019 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func GCPodName added in v0.9.3

func GCPodName(wfr string) string

GCPodName generates a name for the GC pod.

func GetExecutionContext added in v0.9.3

func GetExecutionContext(wfr *v1alpha1.WorkflowRun) *v1alpha1.ExecutionContext

GetExecutionContext gets the execution context from the WorkflowRun. If none is found, it uses the default context in the workflow controller configuration.

func GetResourceVolumeName

func GetResourceVolumeName(resourceName string) string

GetResourceVolumeName generates a volume name for a resource.

func InputContainerName

func InputContainerName(index int) string

InputContainerName generates a container name for an input resolver container.

func NextStages

func NextStages(wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun) []string

NextStages determines the stages that can be started next. It returns the stages that have not started yet and whose dependent stages have all finished.
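The selection rule described above can be illustrated with a small, self-contained sketch. The `stage` type and the `started`/`finished` maps are hypothetical simplifications, not the `v1alpha1` types, and the function is not the package's implementation.

```go
package main

import "fmt"

// stage is a hypothetical, simplified stand-in for a Workflow stage entry.
type stage struct {
	name    string
	depends []string
}

// nextStages returns the stages that have not started yet and whose
// dependencies have all finished. Both maps are keyed by stage name.
func nextStages(stages []stage, started, finished map[string]bool) []string {
	var next []string
	for _, s := range stages {
		if started[s.name] {
			continue // already running or done
		}
		ready := true
		for _, d := range s.depends {
			if !finished[d] {
				ready = false
				break
			}
		}
		if ready {
			next = append(next, s.name)
		}
	}
	return next
}

func main() {
	stages := []stage{
		{name: "checkout"},
		{name: "build", depends: []string{"checkout"}},
		{name: "test", depends: []string{"build"}},
		{name: "lint", depends: []string{"checkout"}},
	}
	started := map[string]bool{"checkout": true}
	finished := map[string]bool{"checkout": true}
	fmt.Println(nextStages(stages, started, finished)) // [build lint]
}
```

Here "test" is held back because "build" has not finished, while "build" and "lint" only depend on the finished "checkout" stage.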

func OutputContainerName

func OutputContainerName(index int) string

OutputContainerName generates a container name for an output resolver container.

func ParseTime

func ParseTime(t string) (time.Duration, error)

ParseTime parses a time string such as '30min' or '2h30m' into a time.Duration.
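As a rough illustration of this kind of parsing, the sketch below normalizes the 'min' unit and defers to the standard library's `time.ParseDuration`. The helper name `parseDuration` and the normalization step are assumptions for illustration; the package's actual ParseTime may accept different formats.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// parseDuration is a hypothetical helper, not the package's ParseTime.
// It rewrites the "min" unit to Go's "m" and then uses time.ParseDuration.
func parseDuration(s string) (time.Duration, error) {
	normalized := strings.ReplaceAll(strings.TrimSpace(s), "min", "m")
	return time.ParseDuration(normalized)
}

func main() {
	for _, in := range []string{"30min", "2h30m"} {
		d, err := parseDuration(in)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(in, "=", d.Minutes(), "minutes")
	}
}
```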

func PodName

func PodName(wf, stg string) string

PodName generates a pod name from the Workflow name and the Stage name.

func ResolveRefStringValue

func ResolveRefStringValue(ref string, client clientset.Interface) (string, error)

ResolveRefStringValue resolves the given secret ref value. If the value is not a ref, the original value is returned. A ref value has the format '$.<ns>.<secret>/<jsonpath>/...' and refers to a value in a secret.

Types

type GCProcessor

type GCProcessor struct {
	// contains filtered or unexported fields
}

GCProcessor processes garbage collection for WorkflowRun objects.

func NewGCProcessor

func NewGCProcessor(client clientset.Interface, enabled bool) *GCProcessor

NewGCProcessor creates a new GC processor.

func (*GCProcessor) Add

func (p *GCProcessor) Add(wfr *v1alpha1.WorkflowRun)

Add adds a WorkflowRun object to the GC processor. It first checks whether the WorkflowRun needs GC and, if so, performs GC on it at the appropriate time.

func (*GCProcessor) Enable

func (p *GCProcessor) Enable()

Enable enables the processor and starts it.

type LimitedQueues

type LimitedQueues struct {
	// Maximum queue size. It indicates the maximum number of WorkflowRuns to retain for each Workflow.
	MaxQueueSize int
	// Workflow queue map. It uses the Workflow name and namespace as the key, and manages a queue
	// for each Workflow.
	Queues map[string]*LimitedSortedQueue
	// k8s client used to clean old WorkflowRuns
	Client clientset.Interface
}

LimitedQueues manages a WorkflowRun queue for each Workflow. Each queue is limited to a given maximum size; when a new WorkflowRun would push the queue past that size, the oldest one is removed.

func NewLimitedQueues

func NewLimitedQueues(client clientset.Interface, maxSize int) *LimitedQueues

NewLimitedQueues creates limited queues for WorkflowRuns and starts the auto scan.

func (*LimitedQueues) AddOrRefresh

func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun)

AddOrRefresh adds a WorkflowRun to its corresponding queue. If the queue size exceeds the maximum size, the oldest WorkflowRun is deleted. If the WorkflowRun already exists in the queue, its 'refresh' time field is updated.

func (*LimitedQueues) AutoScan

func (w *LimitedQueues) AutoScan()

AutoScan regularly scans all WorkflowRuns in the queues and removes abnormal ones whose refresh time is old enough.

func (*LimitedQueues) Refresh added in v0.9.3

func (w *LimitedQueues) Refresh(wfr *v1alpha1.WorkflowRun)

Refresh refreshes the WorkflowRun in the queue by updating its refresh time.

type LimitedSortedQueue

type LimitedSortedQueue struct {
	// contains filtered or unexported fields
}

LimitedSortedQueue is a sorted, fixed-length queue implemented as a singly linked list. Each queue has a sentinel node to simplify the implementation; it is a dummy node and is not counted in the queue size. An empty queue therefore has its head pointing to the dummy node and a queue size of 0.
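The structure described above can be sketched as follows. This is a minimal illustration under stated assumptions: the `node` and `sortedQueue` types, their fields, and the choice to evict on overflow inside `push` are all hypothetical and do not reflect the package's unexported fields.

```go
package main

import "fmt"

// node is a hypothetical queue entry; created is a Unix timestamp in seconds.
type node struct {
	name    string
	created int64
	next    *node
}

// sortedQueue keeps nodes in ascending creation time behind a sentinel head.
type sortedQueue struct {
	head *node // sentinel; holds no data and is not counted in size
	size int
	max  int
}

func newSortedQueue(max int) *sortedQueue {
	return &sortedQueue{head: &node{}, max: max}
}

// push inserts at the sorted position and returns the evicted oldest node
// when the queue grows past its maximum size, or nil otherwise.
func (q *sortedQueue) push(name string, created int64) *node {
	prev := q.head
	for prev.next != nil && prev.next.created <= created {
		prev = prev.next
	}
	prev.next = &node{name: name, created: created, next: prev.next}
	q.size++
	if q.size > q.max {
		return q.pop()
	}
	return nil
}

// pop removes and returns the oldest node, or nil if the queue is empty.
func (q *sortedQueue) pop() *node {
	oldest := q.head.next
	if oldest == nil {
		return nil
	}
	q.head.next = oldest.next
	oldest.next = nil
	q.size--
	return oldest
}

func main() {
	q := newSortedQueue(2)
	q.push("run-b", 200)
	q.push("run-a", 100)
	evicted := q.push("run-c", 300)
	fmt.Println(evicted.name, q.size) // run-a 2
}
```

Because the sentinel is always present, insertion and removal at the front need no special case for an empty list.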

func NewQueue

func NewQueue(key string, max int) *LimitedSortedQueue

NewQueue creates a limited sorted queue.

func (*LimitedSortedQueue) Pop

func (q *LimitedSortedQueue) Pop() *Node

Pop pops a WorkflowRun object from the queue. The oldest one is popped.

func (*LimitedSortedQueue) PushOrRefresh

func (q *LimitedSortedQueue) PushOrRefresh(wfr *v1alpha1.WorkflowRun)

PushOrRefresh pushes a WorkflowRun object onto the queue, inserting it at the right place to keep the queue sorted by creation time. If the object already exists in the queue, its refresh time is updated.

func (*LimitedSortedQueue) Refresh added in v0.9.3

func (q *LimitedSortedQueue) Refresh(wfr *v1alpha1.WorkflowRun) bool

Refresh updates the refresh time of a WorkflowRun in the queue. It returns true if the WorkflowRun is found in the queue and updated successfully, and false otherwise.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents a WorkflowRun in the queue. 'next' links to the next node in the queue, and 'refresh' records the last time this node was refreshed.

'refresh' is used to deal with a rare case: a WorkflowRun is deleted in etcd, but the workflow controller is not notified. The workflow controller resyncs with etcd regularly (every 5 minutes by default), and during a resync every WorkflowRun in the queue is refreshed. If a WorkflowRun was deleted in etcd abnormally, it is not refreshed in the queue, so this case can be detected from the refresh time.

When a node is found that hasn't been refreshed for a long time (for example, twice the resync period), it is removed from the queue.
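The stale-node check described above can be sketched as a single pass over a linked list with a sentinel head. The `entry` type, the `removeStale` function, and the concrete threshold used in `main` are assumptions for illustration, not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical queue node carrying its last refresh time.
type entry struct {
	name    string
	refresh time.Time
	next    *entry
}

// removeStale unlinks every entry whose last refresh is older than maxAge
// relative to now, and returns the names of the removed entries.
// head is a sentinel node and is never removed.
func removeStale(head *entry, now time.Time, maxAge time.Duration) []string {
	var removed []string
	prev := head
	for prev.next != nil {
		if now.Sub(prev.next.refresh) > maxAge {
			removed = append(removed, prev.next.name)
			prev.next = prev.next.next
			continue
		}
		prev = prev.next
	}
	return removed
}

func main() {
	resync := 5 * time.Minute // default resync period mentioned above
	now := time.Now()
	stale := &entry{name: "run-deleted", refresh: now.Add(-11 * time.Minute)}
	fresh := &entry{name: "run-alive", refresh: now.Add(-1 * time.Minute), next: stale}
	head := &entry{next: fresh}
	fmt.Println(removeStale(head, now, 2*resync)) // [run-deleted]
}
```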

type Operator

type Operator interface {
	// Get WorkflowRun instance.
	GetWorkflowRun() *v1alpha1.WorkflowRun
	// Update WorkflowRun, mainly the status.
	Update() error
	// Update stage status.
	UpdateStageStatus(stage string, status *v1alpha1.Status)
	// Update stage pod info.
	UpdateStagePodInfo(stage string, podInfo *v1alpha1.PodInfo)
	// Decide overall status of the WorkflowRun from stage status.
	OverallStatus() (*v1alpha1.Status, error)
	// Garbage collection on the WorkflowRun based on GC policy configured
	// in Workflow Controller. Pod and data on PV would be cleaned.
	// - 'lastTry' indicates whether this is the last attempt to perform GC.
	// If set to true, the WorkflowRun status will be marked as cleaned regardless
	// of whether the GC action succeeded.
	// - 'wfrDeletion' indicates whether the GC is performed because the WorkflowRun was deleted.
	GC(lastTry, wfrDeletion bool) error
	// Run next stages in the Workflow and resolve overall status.
	Reconcile() error
}

Operator is used to perform operations on a WorkflowRun instance, such as updating status, running next stages, and garbage collection.

func NewOperator

func NewOperator(client clientset.Interface, wfr interface{}, namespace string) (Operator, error)

NewOperator creates a new operator.

type PodBuilder

type PodBuilder struct {
	// contains filtered or unexported fields
}

PodBuilder is a builder used to build the pod for a stage.

func NewPodBuilder

func NewPodBuilder(client clientset.Interface, wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun, stage string) *PodBuilder

NewPodBuilder creates a new pod builder.

func (*PodBuilder) AddCoordinator

func (m *PodBuilder) AddCoordinator() error

AddCoordinator adds the coordinator container as a sidecar to the pod. The coordinator is used to collect logs and artifacts, and to notify resource resolvers to push resources.

func (*PodBuilder) AddVolumeMounts

func (m *PodBuilder) AddVolumeMounts() error

AddVolumeMounts adds the common PVC to workload containers.

func (*PodBuilder) ApplyResourceRequirements

func (m *PodBuilder) ApplyResourceRequirements() error

ApplyResourceRequirements applies resource requirements to containers in the pod. Resource requirements can be specified in three places, listed in descending order of priority:

- In the Stage spec
- In the Workflow spec
- In the Workflow Controller configuration, as default values

So requirements set in the Stage spec have the highest priority.
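The priority order can be illustrated with a short fallback chain. The `requirements` type and `effectiveRequirements` function are hypothetical, and the sketch assumes the highest-priority setting replaces lower ones wholesale; whether the package merges individual fields instead is not stated here.

```go
package main

import "fmt"

// requirements is a hypothetical, simplified stand-in for container
// resource requirements.
type requirements struct {
	cpu    string
	memory string
}

// effectiveRequirements picks the first configured value in priority order:
// stage spec, then workflow spec, then controller defaults.
func effectiveRequirements(stage, workflow *requirements, defaults requirements) requirements {
	if stage != nil {
		return *stage
	}
	if workflow != nil {
		return *workflow
	}
	return defaults
}

func main() {
	defaults := requirements{cpu: "100m", memory: "128Mi"}
	workflow := &requirements{cpu: "500m", memory: "1Gi"}

	// No stage-level setting: the workflow-level setting wins over defaults.
	r := effectiveRequirements(nil, workflow, defaults)
	fmt.Println(r.cpu, r.memory) // 500m 1Gi
}
```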

func (*PodBuilder) ApplyServiceAccount

func (m *PodBuilder) ApplyServiceAccount() error

ApplyServiceAccount applies the service account to the pod.

func (*PodBuilder) ArtifactFileName

func (m *PodBuilder) ArtifactFileName(stageName, artifactName string) (string, error)

ArtifactFileName gets artifact file name from artifacts path.

func (*PodBuilder) Build

func (m *PodBuilder) Build() (*corev1.Pod, error)

Build ...

func (*PodBuilder) CreateEmptyDirVolume

func (m *PodBuilder) CreateEmptyDirVolume(volumeName string)

CreateEmptyDirVolume creates an EmptyDir volume for the pod with the given name.

func (*PodBuilder) CreatePVCVolume

func (m *PodBuilder) CreatePVCVolume(volumeName, pvc string) string

CreatePVCVolume tries to create a PVC volume for the given volume name and PVC name. If no volume exists for the PVC, a new volume is created and its name is returned. If a volume for the given PVC already exists, the name of that volume is returned; note that in this case the returned name is usually different from the provided 'volumeName' argument.
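The reuse behavior described above can be sketched with simplified types. `volume`, `podSpec`, and `ensurePVCVolume` are hypothetical names used only for illustration; the real method works on a Kubernetes pod spec.

```go
package main

import "fmt"

// volume is a hypothetical, simplified stand-in for a pod volume backed by a PVC.
type volume struct {
	name string
	pvc  string
}

// podSpec is a hypothetical, simplified stand-in for the pod being built.
type podSpec struct {
	volumes []volume
}

// ensurePVCVolume returns the name of the volume that mounts the given PVC.
// If a volume for the PVC already exists, its name is returned and volumeName
// is ignored; otherwise a new volume named volumeName is added.
func (p *podSpec) ensurePVCVolume(volumeName, pvc string) string {
	for _, v := range p.volumes {
		if v.pvc == pvc {
			return v.name
		}
	}
	p.volumes = append(p.volumes, volume{name: volumeName, pvc: pvc})
	return volumeName
}

func main() {
	pod := &podSpec{}
	fmt.Println(pod.ensurePVCVolume("common-volume", "shared-pvc")) // common-volume
	// Same PVC, different requested name: the existing volume is reused.
	fmt.Println(pod.ensurePVCVolume("artifact-volume", "shared-pvc")) // common-volume
}
```

Callers should therefore use the returned name, not the requested one, when adding volume mounts.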

func (*PodBuilder) CreateVolumes

func (m *PodBuilder) CreateVolumes() error

CreateVolumes ...

func (*PodBuilder) InjectEnvs

func (m *PodBuilder) InjectEnvs() error

InjectEnvs injects environment variables into containers, such as the WorkflowRun name, stage name, and namespace.

func (*PodBuilder) Prepare

func (m *PodBuilder) Prepare() error

Prepare ...

func (*PodBuilder) ResolveArguments

func (m *PodBuilder) ResolveArguments() error

ResolveArguments ...

func (*PodBuilder) ResolveInputArtifacts

func (m *PodBuilder) ResolveInputArtifacts() error

ResolveInputArtifacts mounts each input artifact from the PVC.

func (*PodBuilder) ResolveInputResources

func (m *PodBuilder) ResolveInputResources() error

ResolveInputResources creates an init container for each input resource and also mounts the resources to workload containers.

func (*PodBuilder) ResolveOutputResources

func (m *PodBuilder) ResolveOutputResources() error

ResolveOutputResources adds resource resolvers to the pod spec.

type SecretRefValue

type SecretRefValue struct {
	// Namespace of the secret
	Namespace string
	// Name of the secret
	Secret string
	// JSON paths in the secret that refer to the value. If more than one
	// path is provided, the value resolved for the previous path is
	// regarded as marshaled JSON and used as the data source for the
	// following one.
	Jsonpaths []string
}

SecretRefValue represents a value in a secret. It's defined by a secret and json paths.

func NewSecretRefValue

func NewSecretRefValue() *SecretRefValue

NewSecretRefValue creates a secret reference value.

func (*SecretRefValue) Parse

func (r *SecretRefValue) Parse(ref string) error

Parse parses a given ref. The reference value specifies a JSON path in a secret. The format of the reference is:

$.<namespace>.<secret-name>/<jsonpath>/<jsonpath>

For example, given this secret (named 'secret' in namespace 'ns'):

{
  "apiVersion": "v1",
  "data": {
    "key": "KEY",
    "json": "{\"user\":{\"id\": \"111\"}}"
  },
  "kind": "Secret",
  "type": "Opaque",
  "metadata": {
    ...
  }
}

the references resolve as follows:

$.ns.secret/data.key --> KEY
$.ns.secret/data.json/user.id --> 111
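The reference format can be split into its parts with plain string operations. The sketch below is an illustration only: `secretRef` and `parseSecretRef` are hypothetical names, and the assumption that the namespace ends at the first dot (so that secret names may contain dots) is mine, not something the documentation states.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// secretRef is a hypothetical mirror of the parsed reference parts.
type secretRef struct {
	namespace string
	secret    string
	jsonpaths []string
}

// parseSecretRef splits "$.<namespace>.<secret-name>/<jsonpath>/..." into parts.
// Assumption: the namespace ends at the first dot after the "$." prefix.
func parseSecretRef(ref string) (*secretRef, error) {
	if !strings.HasPrefix(ref, "$.") {
		return nil, errors.New("not a secret reference")
	}
	segments := strings.Split(strings.TrimPrefix(ref, "$."), "/")
	if len(segments) < 2 {
		return nil, errors.New("reference has no json path")
	}
	nsAndName := strings.SplitN(segments[0], ".", 2)
	if len(nsAndName) != 2 || nsAndName[0] == "" || nsAndName[1] == "" {
		return nil, errors.New("reference must name a namespace and a secret")
	}
	return &secretRef{
		namespace: nsAndName[0],
		secret:    nsAndName[1],
		jsonpaths: segments[1:],
	}, nil
}

func main() {
	ref, err := parseSecretRef("$.ns.secret/data.json/user.id")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ref.namespace, ref.secret, ref.jsonpaths) // ns secret [data.json user.id]
}
```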

func (*SecretRefValue) Resolve

func (r *SecretRefValue) Resolve(client clientset.Interface) (interface{}, error)

Resolve resolves the secret ref and gets the real value.
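The chained lookup, in which each resolved value is treated as marshaled JSON for the next path, can be sketched with `encoding/json`. This is a simplified illustration, not the package's Resolve: `lookup` and `resolveChain` are hypothetical helpers, paths are treated as dot-separated object keys only, and the secret is passed in as a JSON string instead of being fetched through a Kubernetes client.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// lookup walks a dot-separated path through nested JSON objects.
func lookup(data map[string]interface{}, path string) (interface{}, error) {
	var current interface{} = data
	for _, key := range strings.Split(path, ".") {
		obj, ok := current.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("cannot descend into %q", key)
		}
		current, ok = obj[key]
		if !ok {
			return nil, fmt.Errorf("key %q not found", key)
		}
	}
	return current, nil
}

// resolveChain applies each path in turn. The value found for one path must
// be a JSON string, which is unmarshaled and used as the source for the next.
func resolveChain(doc string, paths []string) (interface{}, error) {
	var value interface{} = doc
	for _, p := range paths {
		s, ok := value.(string)
		if !ok {
			return nil, fmt.Errorf("value before path %q is not a JSON string", p)
		}
		var obj map[string]interface{}
		if err := json.Unmarshal([]byte(s), &obj); err != nil {
			return nil, err
		}
		v, err := lookup(obj, p)
		if err != nil {
			return nil, err
		}
		value = v
	}
	return value, nil
}

func main() {
	secret := `{"data":{"key":"KEY","json":"{\"user\":{\"id\": \"111\"}}"}}`
	v, err := resolveChain(secret, []string{"data.json", "user.id"})
	fmt.Println(v, err) // 111 <nil>
}
```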

type TimeoutProcessor

type TimeoutProcessor struct {
	// contains filtered or unexported fields
}

TimeoutProcessor manages timeout of WorkflowRun.

func NewTimeoutProcessor

func NewTimeoutProcessor(client clientset.Interface) *TimeoutProcessor

NewTimeoutProcessor creates a timeout manager and runs it.

func (*TimeoutProcessor) Add

Add adds a WorkflowRun to the timeout manager.

func (*TimeoutProcessor) Run

func (m *TimeoutProcessor) Run(interval time.Duration)

Run checks the timeouts of managed WorkflowRuns and processes those that have expired.
