Documentation
¶
Index ¶
- Constants
- Variables
- func GetInputArtifactEnvName(atfName string) string
- func GetOutputArtifactEnvName(atfName string) string
- func InvalidParamTypeError(param interface{}, expected string) error
- func MismatchRegexError(param, regex string) error
- func StringsContain(items []string, item string) bool
- func UnsupportedDictParamTypeError(unsupportedType string, paramName string, param interface{}) error
- func UnsupportedParamTypeError(param interface{}, paramName string) error
- func UnsupportedPathParamError(param interface{}, paramName string) error
- type BaseJob
- type BaseWorkflow
- type CacheCalculator
- type DictParam
- type Job
- type LocalJob
- type PaddleFlowJob
- func (pfj *PaddleFlowJob) Cancelled() bool
- func (pfj *PaddleFlowJob) Check() (schema.JobStatus, error)
- func (pfj *PaddleFlowJob) Failed() bool
- func (pfj *PaddleFlowJob) Job() BaseJob
- func (pfj *PaddleFlowJob) NotEnded() bool
- func (pfj *PaddleFlowJob) Skipped() bool
- func (pfj *PaddleFlowJob) Start() (string, error)
- func (pfj *PaddleFlowJob) Started() bool
- func (pfj *PaddleFlowJob) Stop() error
- func (pfj *PaddleFlowJob) Succeeded() bool
- func (pfj *PaddleFlowJob) Terminated() bool
- func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[string]string, ...) error
- func (pfj *PaddleFlowJob) Validate() error
- func (pfj *PaddleFlowJob) Watch(ch chan WorkflowEvent) error
- type ResourceHandler
- type StatusToSteps
- type Step
- type StepParamChecker
- type StepParamSolver
- type WfEventType
- type WfEventValue
- type Workflow
- type WorkflowCallbacks
- type WorkflowEvent
- type WorkflowRuntime
- func (wfr *WorkflowRuntime) DecConcurrentJobs(slots int)
- func (wfr *WorkflowRuntime) IncConcurrentJobs(slots int)
- func (wfr *WorkflowRuntime) IsCompleted() bool
- func (wfr *WorkflowRuntime) Listen()
- func (wfr *WorkflowRuntime) ProcessFailureOptions(event WorkflowEvent)
- func (wfr *WorkflowRuntime) ProcessFailureOptionsWithContinue(step *Step)
- func (wfr *WorkflowRuntime) ProcessFailureOptionsWithFailFast(step *Step)
- func (wfr *WorkflowRuntime) Restart() error
- func (wfr *WorkflowRuntime) Start() error
- func (wfr *WorkflowRuntime) Status() string
- func (wfr *WorkflowRuntime) Stop(force bool) error
Constants ¶
const ( // 事件类型 WfEventNoraml WfEventType = "Normal" // 正常事件类型,包括状态更新等 WfEventWarn WfEventType = "Warnning" // 警告事件类型,不影响业务运行,但需要关注 WfEventError WfEventType = "Error" // 异常事件类型,影响业务运行,例如其他模块函数返回异常 // 事件值 WfEventJobUpdate WfEventValue = "JobUpdate" WfEventRunUpdate WfEventValue = "RunUpdate" WfEventJobSubmitErr WfEventValue = "JobSubmitErr" WfEventJobWatchErr WfEventValue = "JobWatchErr" WfEventJobStopErr WfEventValue = "JobStopErr" )
Variables ¶
var NewStep = func(name string, wfr *WorkflowRuntime, info *schema.WorkflowSourceStep, disabled bool, nodeType NodeType) (*Step, error) { jobName := fmt.Sprintf("%s-%s", wfr.wf.RunID, name) job := NewPaddleFlowJob(jobName, info.DockerEnv, info.Deps) st := &Step{ name: name, wfr: wfr, info: info, disabled: disabled, ready: make(chan bool, 1), cancel: make(chan bool, 1), done: false, executed: false, submitted: false, job: job, nodeType: nodeType, } st.getLogger().Debugf("step[%s] of runid[%s] before starting job: param[%s], env[%s], command[%s], artifacts[%s], deps[%s]", st.name, st.wfr.wf.RunID, st.info.Parameters, st.info.Env, st.info.Command, st.info.Artifacts, st.info.Deps) return st, nil }
Functions ¶
func GetInputArtifactEnvName ¶
func InvalidParamTypeError ¶
func MismatchRegexError ¶
func StringsContain ¶
Types ¶
type BaseJob ¶
type BaseJob struct {
Id string `json:"jobID"`
Name string `json:"name"` // step名字,不同run的不同step,必须拥有不同名字
Command string `json:"command"` // 区别于step,是替换后的,可以直接运行
Parameters map[string]string `json:"parameters"` // 区别于step,是替换后的,可以直接运行
Artifacts schema.Artifacts `json:"artifacts"` // 区别于step,是替换后的,可以直接运行
Env map[string]string `json:"env"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Status schema.JobStatus `json:"status"`
Deps string `json:"deps"`
Message string `json:"message"`
}
func NewBaseJob ¶
type BaseWorkflow ¶
type BaseWorkflow struct {
Name string `json:"name,omitempty"`
RunID string `json:"runId,omitempty"`
Desc string `json:"desc,omitempty"`
Entry string `json:"entry,omitempty"`
Params map[string]interface{} `json:"params,omitempty"`
Extra map[string]string `json:"extra,omitempty"` // 可以存放一些ID,fsId,userId等
Source schema.WorkflowSource `json:"-"` // Yaml string
// contains filtered or unexported fields
}
func NewBaseWorkflow ¶
func NewBaseWorkflow(wfSource schema.WorkflowSource, runID, entry string, params map[string]interface{}, extra map[string]string) BaseWorkflow
type CacheCalculator ¶
type CacheCalculator interface {
// 计算第一层 fingerprint
CalculateFirstFingerprint() (fingerprint string, err error)
// 计算 第二层 fingerprint
CalculateSecondFingerprint() (fingerprint string, err error)
// contains filtered or unexported methods
}
func NewAggressiveCacheCalculator ¶
func NewAggressiveCacheCalculator(step Step, cacheConfig schema.Cache) (CacheCalculator, error)
TODO
func NewCacheCalculator ¶
func NewCacheCalculator(step Step, cacheConfig schema.Cache) (CacheCalculator, error)
调用方应该保证在启用了 cache 功能的情况下才会调用NewCacheCalculator
func NewConservativeCacheCalculator ¶
func NewConservativeCacheCalculator(step Step, cacheConfig schema.Cache) (CacheCalculator, error)
调用方应该保证在启用了 cache 功能的情况下才会调用NewConservativeCacheCalculator
type Job ¶
type Job interface {
Job() BaseJob
Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts) error
Validate() error
Start() (string, error)
Stop() error
Check() (schema.JobStatus, error)
Watch(chan WorkflowEvent) error
Started() bool
Succeeded() bool
Failed() bool
Terminated() bool
Skipped() bool
NotEnded() bool
}
type LocalJob ¶
---------------------------------------------------------------------------- Local Process Job ----------------------------------------------------------------------------
type PaddleFlowJob ¶
----------------------------------------------------------------------------
K8S Job
----------------------------------------------------------------------------
func NewPaddleFlowJob ¶
func NewPaddleFlowJob(name, image, deps string) *PaddleFlowJob
func (*PaddleFlowJob) Cancelled ¶
func (pfj *PaddleFlowJob) Cancelled() bool
func (*PaddleFlowJob) Failed ¶
func (pfj *PaddleFlowJob) Failed() bool
func (*PaddleFlowJob) Job ¶
func (pfj *PaddleFlowJob) Job() BaseJob
func (*PaddleFlowJob) NotEnded ¶
func (pfj *PaddleFlowJob) NotEnded() bool
func (*PaddleFlowJob) Skipped ¶
func (pfj *PaddleFlowJob) Skipped() bool
func (*PaddleFlowJob) Started ¶
func (pfj *PaddleFlowJob) Started() bool
func (*PaddleFlowJob) Succeeded ¶
func (pfj *PaddleFlowJob) Succeeded() bool
func (*PaddleFlowJob) Terminated ¶
func (pfj *PaddleFlowJob) Terminated() bool
func (*PaddleFlowJob) Update ¶
func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts) error
发起作业接口
func (*PaddleFlowJob) Watch ¶
func (pfj *PaddleFlowJob) Watch(ch chan WorkflowEvent) error
同步watch作业接口
type ResourceHandler ¶
type ResourceHandler struct {
// contains filtered or unexported fields
}
* resourceHandler
func NewResourceHandler ¶
func (*ResourceHandler) ClearResource ¶
func (resourceHandler *ResourceHandler) ClearResource() error
type StatusToSteps ¶
type StatusToSteps struct {
SucceededSteps map[string]*Step
FailedSteps map[string]*Step
TerminatedSteps map[string]*Step
CanelledSteps map[string]*Step
SkippedSteps map[string]*Step
// runtime 已经向 step的 ready channel 发送了数据, 同时 Step 尚未处于终态
SubmittedSteps map[string]*Step
// runtime 还未向 step 的 ready channel 发送数据, 同时 Step 也未处于终态
UnsubmittedSteps map[string]*Step
}
func NewStatusToSteps ¶
func NewStatusToSteps() StatusToSteps
type StepParamChecker ¶
type StepParamChecker struct {
// contains filtered or unexported fields
}
func (*StepParamChecker) Check ¶
func (s *StepParamChecker) Check(currentStep string) error
type StepParamSolver ¶
type StepParamSolver struct {
// contains filtered or unexported fields
}
* StepParamSolver * 用于校验以及替换所有WorkflowSourceStep中的参数,分四种情况: * 1. 只校验不替换 * 2. 替换参数,用于计算Cache fingerprint * 3. cache命中后,利用cache替换参数 * 4. 替换参数,用于job运行
func NewStepParamSolver ¶
type WfEventType ¶
type WfEventType string
type WfEventValue ¶
type WfEventValue string
type Workflow ¶
type Workflow struct {
BaseWorkflow
// contains filtered or unexported fields
}
func NewWorkflow ¶
func NewWorkflow(wfSource schema.WorkflowSource, runID, entry string, params map[string]interface{}, extra map[string]string, callbacks WorkflowCallbacks) (*Workflow, error)
实例化一个Workflow,并返回
func (*Workflow) Restart ¶
func (wf *Workflow) Restart()
Restart 从 DB 中恢复重启 workflow Restart 调用逻辑:1. NewWorkflow 2. SetWorkflowRuntime 3. Restart
func (*Workflow) SetWorkflowRuntime ¶
func (wf *Workflow) SetWorkflowRuntime(runtime schema.RuntimeView, postProcess schema.PostProcessView) error
set workflow runtime when server resuming
type WorkflowCallbacks ¶
type WorkflowCallbacks struct {
GetJobCb func(runID string, stepName string) (schema.JobView, error)
UpdateRunCb func(string, interface{}) bool
LogCacheCb func(req schema.LogRunCacheRequest) (string, error)
ListCacheCb func(firstFp, fsID, step, yamlPath string) ([]models.RunCache, error)
LogArtifactCb func(req schema.LogRunArtifactRequest) error
}
type WorkflowEvent ¶
type WorkflowEvent struct {
Type WfEventType
Event WfEventValue
Message string
Extra map[string]interface{}
}
func NewWorkflowEvent ¶
func NewWorkflowEvent(e WfEventValue, msg string, extra map[string]interface{}) *WorkflowEvent
实例化
type WorkflowRuntime ¶
type WorkflowRuntime struct {
// contains filtered or unexported fields
}
工作流运行时
func NewWorkflowRuntime ¶
func NewWorkflowRuntime(wf *Workflow, parallelism int) *WorkflowRuntime
TODO: 将创建 Step 的逻辑迁移至此处,以合理的设置 step 的ctx,和 nodeType 等属性
func (*WorkflowRuntime) DecConcurrentJobs ¶
func (wfr *WorkflowRuntime) DecConcurrentJobs(slots int)
减少多个并行Job
func (*WorkflowRuntime) IncConcurrentJobs ¶
func (wfr *WorkflowRuntime) IncConcurrentJobs(slots int)
增加多个并行Job
func (*WorkflowRuntime) IsCompleted ¶
func (wfr *WorkflowRuntime) IsCompleted() bool
func (*WorkflowRuntime) Listen ¶
func (wfr *WorkflowRuntime) Listen()
func (*WorkflowRuntime) ProcessFailureOptions ¶
func (wfr *WorkflowRuntime) ProcessFailureOptions(event WorkflowEvent)
func (*WorkflowRuntime) ProcessFailureOptionsWithContinue ¶
func (wfr *WorkflowRuntime) ProcessFailureOptionsWithContinue(step *Step)
func (*WorkflowRuntime) ProcessFailureOptionsWithFailFast ¶
func (wfr *WorkflowRuntime) ProcessFailureOptionsWithFailFast(step *Step)
func (*WorkflowRuntime) Status ¶
func (wfr *WorkflowRuntime) Status() string
func (*WorkflowRuntime) Stop ¶
func (wfr *WorkflowRuntime) Stop(force bool) error
Stop 停止 Workflow do not call ctx_cancel(), which will be called when all steps has terminated eventually. 这里不通过 cancel channel 去取消 Step 的原因是防止有多个地方向通过一个 channel 传递东西,防止runtime hang 住