Documentation ¶
Index ¶
- func GetResourceVolumeName(resourceName string) string
- func NewWorkflowRunItem(wfr *v1alpha1.WorkflowRun) *workflowRunItem
- func NextStages(wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun) []string
- func ParseTime(t string) (time.Duration, error)
- type GCProcessor
- type LimitedQueues
- type LimitedSortedQueue
- type Node
- type Operator
- type PodBuilder
- func (m *PodBuilder) AddCoordinator() error
- func (m *PodBuilder) AddVolumeMounts() error
- func (m *PodBuilder) ApplyQuota() error
- func (m *PodBuilder) ApplyTemplate() error
- func (m *PodBuilder) ArtifactFileName(stageName, artifactName string) (string, error)
- func (m *PodBuilder) Build() (*corev1.Pod, error)
- func (m *PodBuilder) CreateEmptyDirVolume(volumeName string)
- func (m *PodBuilder) CreatePVCVolume(volumeName, pvc string) string
- func (m *PodBuilder) CreateVolumes() error
- func (m *PodBuilder) InjectEnvs() error
- func (m *PodBuilder) Prepare() error
- func (m *PodBuilder) ResolveArguments() error
- func (m *PodBuilder) ResolveInputArtifacts() error
- func (m *PodBuilder) ResolveInputResources() error
- func (m *PodBuilder) ResolveOutputResources() error
- type TimeoutProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetResourceVolumeName ¶
func GetResourceVolumeName(resourceName string) string
GetResourceVolumeName generates a volume name for a resource.
func NewWorkflowRunItem ¶
func NewWorkflowRunItem(wfr *v1alpha1.WorkflowRun) *workflowRunItem
func NextStages ¶
func NextStages(wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun) []string
NextStages determines the next stages that can start executing. It returns stages that have not started yet and whose dependencies have all finished.
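The selection rule described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `stage` type, the `started` and `finished` sets, and the `nextStages` helper are hypothetical stand-ins for the information NextStages reads from the Workflow and WorkflowRun objects.

```go
package main

import (
	"fmt"
	"sort"
)

// stage is a hypothetical, simplified stand-in for a Workflow stage entry.
type stage struct {
	Name    string
	Depends []string
}

// nextStages returns the stages that have not started yet and whose
// dependencies are all present in the finished set.
func nextStages(stages []stage, started, finished map[string]bool) []string {
	var next []string
	for _, s := range stages {
		if started[s.Name] {
			continue // already started, never returned again
		}
		ready := true
		for _, d := range s.Depends {
			if !finished[d] {
				ready = false
				break
			}
		}
		if ready {
			next = append(next, s.Name)
		}
	}
	sort.Strings(next) // stable output for the example
	return next
}

func main() {
	stages := []stage{
		{Name: "checkout"},
		{Name: "build", Depends: []string{"checkout"}},
		{Name: "test", Depends: []string{"build"}},
		{Name: "lint", Depends: []string{"checkout"}},
	}
	started := map[string]bool{"checkout": true}
	finished := map[string]bool{"checkout": true}
	fmt.Println(nextStages(stages, started, finished)) // prints [build lint]
}
```

In this sketch "test" is held back because "build" has not finished, while "build" and "lint" become runnable as soon as "checkout" is done.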
Types ¶
type GCProcessor ¶
type GCProcessor struct {
// contains filtered or unexported fields
}
GCProcessor processes garbage collection for WorkflowRun objects.
func NewGCProcessor ¶
func NewGCProcessor(client clientset.Interface, enabled bool) *GCProcessor
func (*GCProcessor) Add ¶
func (p *GCProcessor) Add(wfr *v1alpha1.WorkflowRun)
Add adds a WorkflowRun object to the GC processor. It first checks whether the WorkflowRun needs GC and, if so, performs GC on it at the appropriate time.
type LimitedQueues ¶
type LimitedQueues struct {
// Maximum queue size: the maximum number of WorkflowRuns to retain for each Workflow.
MaxQueueSize int
// Workflow queue map. It uses the Workflow name and namespace as the key, and manages a queue for each
// Workflow.
Queues map[string]*LimitedSortedQueue
// k8s client used to clean up old WorkflowRuns
Client clientset.Interface
}
LimitedQueues manages a WorkflowRun queue for each Workflow. Each queue is limited to a given maximum size; when a new WorkflowRun would push a queue over that size, the oldest one is removed.
func NewLimitedQueues ¶
func NewLimitedQueues(client clientset.Interface, maxSize int) *LimitedQueues
NewLimitedQueues creates limited queues for WorkflowRuns and starts the auto scan.
func (*LimitedQueues) AddOrRefresh ¶
func (w *LimitedQueues) AddOrRefresh(wfr *v1alpha1.WorkflowRun)
AddOrRefresh adds a WorkflowRun to its corresponding queue. If the queue size exceeds the maximum size, the oldest WorkflowRun is deleted. If the WorkflowRun already exists in the queue, its 'refresh' time field is updated.
func (*LimitedQueues) AutoScan ¶
func (w *LimitedQueues) AutoScan()
AutoScan regularly scans all WorkflowRuns in the queues and removes abnormal ones whose refresh time is old enough.
type LimitedSortedQueue ¶
type LimitedSortedQueue struct {
// contains filtered or unexported fields
}
LimitedSortedQueue is a sorted, fixed-length queue implemented as a singly linked list. Each queue has a sentinel node to simplify the implementation; it is a dummy node and is not counted in the queue size. An empty queue therefore has its head pointing to the dummy node and a queue size of 0.
func (*LimitedSortedQueue) Pop ¶
func (q *LimitedSortedQueue) Pop() *Node
Pop pops a WorkflowRun object from the queue; the oldest one is popped.
func (*LimitedSortedQueue) PushOrRefresh ¶
func (q *LimitedSortedQueue) PushOrRefresh(wfr *v1alpha1.WorkflowRun)
PushOrRefresh pushes a WorkflowRun object to the queue, inserting it at the right place to keep the queue sorted by creation time. If the object already exists in the queue, its refresh time is updated.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node represents a WorkflowRun in the queue. 'next' links to the next node in the queue, and 'refresh' records the last time this node was refreshed.
'refresh' handles a rare case: a WorkflowRun is deleted in etcd, but the workflow controller is not notified. The workflow controller resyncs with etcd regularly (every 5 minutes by default), and during a resync every WorkflowRun in the queue is refreshed. If a WorkflowRun has been deleted in etcd abnormally, it is not refreshed in the queue, so the refresh time identifies this case.
When a node has not been refreshed for a long time (for example, twice the resync period), it is removed from the queue.
type Operator ¶
type Operator interface {
// Get WorkflowRun instance.
GetWorkflowRun() *v1alpha1.WorkflowRun
// Update WorkflowRun, mainly the status.
Update() error
// Update stage status.
UpdateStageStatus(stage string, status *v1alpha1.Status)
// Update stage pod info.
UpdateStagePodInfo(stage string, podInfo *v1alpha1.PodInfo)
// Decide overall status of the WorkflowRun from stage status.
OverallStatus() (*v1alpha1.Status, error)
// Garbage collection on the WorkflowRun based on the GC policy configured
// in the Workflow Controller. Pods and data on the PV are cleaned.
// 'lastTry' indicates whether this is the last attempt to perform GC;
// if set to true, the WorkflowRun status is marked as cleaned regardless
// of whether the GC action succeeded.
GC(lastTry bool) error
// Run next stages in the Workflow and resolve overall status.
Reconcile() error
}
Operator performs operations on a WorkflowRun instance, such as updating status, running next stages, and garbage collection.
type PodBuilder ¶
type PodBuilder struct {
// contains filtered or unexported fields
}
func NewPodBuilder ¶
func NewPodBuilder(client clientset.Interface, wf *v1alpha1.Workflow, wfr *v1alpha1.WorkflowRun, stage string) *PodBuilder
func (*PodBuilder) AddCoordinator ¶
func (m *PodBuilder) AddCoordinator() error
AddCoordinator adds the coordinator container as a sidecar to the pod. The coordinator collects logs and artifacts, and notifies resource resolvers to push resources.
func (*PodBuilder) AddVolumeMounts ¶
func (m *PodBuilder) AddVolumeMounts() error
AddVolumeMounts adds the common PVC to workload containers.
func (*PodBuilder) ApplyQuota ¶
func (m *PodBuilder) ApplyQuota() error
ApplyQuota applies the default quota to every container in the pod that has no quota specified.
func (*PodBuilder) ApplyTemplate ¶
func (m *PodBuilder) ApplyTemplate() error
TODO(ChenDe): Implement stage template.
func (*PodBuilder) ArtifactFileName ¶
func (m *PodBuilder) ArtifactFileName(stageName, artifactName string) (string, error)
func (*PodBuilder) CreateEmptyDirVolume ¶
func (m *PodBuilder) CreateEmptyDirVolume(volumeName string)
CreateEmptyDirVolume creates an EmptyDir volume with the given name for the pod.
func (*PodBuilder) CreatePVCVolume ¶
func (m *PodBuilder) CreatePVCVolume(volumeName, pvc string) string
CreatePVCVolume tries to create a PVC volume for the given volume name and PVC name. If no volume exists for the PVC, a new volume is created and its name is returned. If a volume for the given PVC already exists, the name of that volume is returned; note that in this case the returned name usually differs from the provided 'volumeName' argument.
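The reuse-or-create behaviour can be sketched as below. The `volume` and `podVolumes` types and the example volume and claim names are hypothetical; the real method operates on the pod spec held by PodBuilder.

```go
package main

import "fmt"

// volume is a hypothetical, trimmed-down stand-in for a pod volume entry.
type volume struct {
	Name string
	PVC  string // claim name; empty for non-PVC volumes
}

// podVolumes is a hypothetical holder for the volumes of a pod being built.
type podVolumes struct {
	volumes []volume
}

// createPVCVolume returns the name of the volume backing pvc. It adds a new
// volume named volumeName only when no existing volume already uses the claim.
func (p *podVolumes) createPVCVolume(volumeName, pvc string) string {
	for _, v := range p.volumes {
		if v.PVC == pvc {
			return v.Name // reuse: may differ from the requested volumeName
		}
	}
	p.volumes = append(p.volumes, volume{Name: volumeName, PVC: pvc})
	return volumeName
}

func main() {
	p := &podVolumes{}
	first := p.createPVCVolume("common-volume", "example-pvc")
	second := p.createPVCVolume("artifact-volume", "example-pvc")
	fmt.Println(first, second, len(p.volumes)) // prints common-volume common-volume 1
}
```

Callers should therefore use the returned name, not the name they passed in, when adding volume mounts.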
func (*PodBuilder) CreateVolumes ¶
func (m *PodBuilder) CreateVolumes() error
func (*PodBuilder) InjectEnvs ¶
func (m *PodBuilder) InjectEnvs() error
InjectEnvs injects environment variables into containers, such as the WorkflowRun name, stage name, and namespace.
func (*PodBuilder) Prepare ¶
func (m *PodBuilder) Prepare() error
func (*PodBuilder) ResolveArguments ¶
func (m *PodBuilder) ResolveArguments() error
func (*PodBuilder) ResolveInputArtifacts ¶
func (m *PodBuilder) ResolveInputArtifacts() error
ResolveInputArtifacts mounts each input artifact from the PVC.
func (*PodBuilder) ResolveInputResources ¶
func (m *PodBuilder) ResolveInputResources() error
ResolveInputResources creates an init container for each input resource and also mounts the resources to workload containers.
func (*PodBuilder) ResolveOutputResources ¶
func (m *PodBuilder) ResolveOutputResources() error
ResolveOutputResources adds resource resolvers to the pod spec.
type TimeoutProcessor ¶
type TimeoutProcessor struct {
// contains filtered or unexported fields
}
TimeoutProcessor manages timeout of WorkflowRun.
func NewTimeoutProcessor ¶
func NewTimeoutProcessor(client clientset.Interface) *TimeoutProcessor
NewTimeoutProcessor creates a timeout processor and runs it.
func (*TimeoutProcessor) Add ¶
func (m *TimeoutProcessor) Add(wfr *v1alpha1.WorkflowRun) error
Add adds a WorkflowRun to the timeout processor.
func (*TimeoutProcessor) Run ¶
func (m *TimeoutProcessor) Run(interval time.Duration)
Run checks the timeouts of managed WorkflowRuns and processes items that have expired.