pipeline

package
v0.14.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Workflow event constants: the WfEventType values classify an event by
// severity, the WfEventValue values name what the event is about.
//
// NOTE(review): the identifier WfEventNoraml and the string value "Warnning"
// are misspelled. They are left unchanged here because the identifier is
// exported and the string is a runtime value.
const (
	// Event types.
	WfEventNoraml WfEventType = "Normal"   // normal event type, including status updates and the like
	WfEventWarn   WfEventType = "Warnning" // warning event type; does not affect the business flow but deserves attention
	WfEventError  WfEventType = "Error"    // error event type; affects the business flow, e.g. a function in another module returned an error

	// Event values.
	WfEventJobUpdate    WfEventValue = "JobUpdate"
	WfEventRunUpdate    WfEventValue = "RunUpdate"
	WfEventJobSubmitErr WfEventValue = "JobSubmitErr"
	WfEventJobWatchErr  WfEventValue = "JobWatchErr"
	WfEventJobStopErr   WfEventValue = "JobStopErr"
)

Variables

View Source
// NewStep builds the Step called name that belongs to the workflow runtime wfr
// and is described by the source definition info.
//
// A PaddleFlowJob named "<runID>-<name>" is created for the step from
// info.DockerEnv and info.Deps. The ready and cancel channels each get a
// buffer of one, and the done/executed/submitted flags start out false.
// As written, the returned error is always nil.
//
// wfr, wfr.wf and info are dereferenced without a nil check, so callers must
// pass non-nil values.
//
// NewStep is a package-level func variable rather than a plain function
// (presumably so it can be replaced in tests; confirm against the callers).
var NewStep = func(name string, wfr *WorkflowRuntime, info *schema.WorkflowSourceStep,
	disabled bool, nodeType NodeType) (*Step, error) {

	// The job name combines the run ID with the step name.
	runID := wfr.wf.RunID
	pfJob := NewPaddleFlowJob(fmt.Sprintf("%s-%s", runID, name), info.DockerEnv, info.Deps)

	// done, executed and submitted are intentionally left at their zero
	// value (false).
	step := &Step{
		name:     name,
		nodeType: nodeType,
		disabled: disabled,
		wfr:      wfr,
		info:     info,
		job:      pfJob,
		ready:    make(chan bool, 1),
		cancel:   make(chan bool, 1),
	}

	step.getLogger().Debugf(
		"step[%s] of runid[%s] before starting job: param[%s], env[%s], command[%s], artifacts[%s], deps[%s]",
		name, runID, info.Parameters, info.Env, info.Command, info.Artifacts, info.Deps)

	return step, nil
}

Functions

func GetInputArtifactEnvName

func GetInputArtifactEnvName(atfName string) string

func GetOutputArtifactEnvName

func GetOutputArtifactEnvName(atfName string) string

func InvalidParamTypeError

func InvalidParamTypeError(param interface{}, expected string) error

func MismatchRegexError

func MismatchRegexError(param, regex string) error

func StringsContain

func StringsContain(items []string, item string) bool

func UnsupportedDictParamTypeError

func UnsupportedDictParamTypeError(unsupportedType string, paramName string, param interface{}) error

func UnsupportedParamTypeError

func UnsupportedParamTypeError(param interface{}, paramName string) error

func UnsupportedPathParamError

func UnsupportedPathParamError(param interface{}, paramName string) error

Types

type BaseJob

// BaseJob holds the fields shared by the job types in this package; it is
// embedded by LocalJob and PaddleFlowJob. All exported fields are serialized
// to JSON under the names given in their tags.
type BaseJob struct {
	Id         string            `json:"jobID"`
	Name       string            `json:"name"`       // step name; different steps of different runs must have different names
	Command    string            `json:"command"`    // unlike the step's value, this is the substituted form and can be run directly
	Parameters map[string]string `json:"parameters"` // unlike the step's value, this is the substituted form and can be run directly
	Artifacts  schema.Artifacts  `json:"artifacts"`  // unlike the step's value, this is the substituted form and can be run directly
	Env        map[string]string `json:"env"`
	StartTime  string            `json:"startTime"`
	EndTime    string            `json:"endTime"`
	Status     schema.JobStatus  `json:"status"`
	Deps       string            `json:"deps"`
	Message    string            `json:"message"`
}

func NewBaseJob

func NewBaseJob(name, deps string) *BaseJob

type BaseWorkflow

// BaseWorkflow carries the descriptive data of a workflow and is embedded by
// Workflow. Every field except Source is serialized to JSON and omitted when
// empty; Source is excluded from JSON entirely.
type BaseWorkflow struct {
	Name   string                 `json:"name,omitempty"`
	RunID  string                 `json:"runId,omitempty"`
	Desc   string                 `json:"desc,omitempty"`
	Entry  string                 `json:"entry,omitempty"`
	Params map[string]interface{} `json:"params,omitempty"`
	Extra  map[string]string      `json:"extra,omitempty"` // can hold assorted IDs such as fsId, userId, etc.
	Source schema.WorkflowSource  `json:"-"`               // Yaml string
	// contains filtered or unexported fields
}

func NewBaseWorkflow

func NewBaseWorkflow(wfSource schema.WorkflowSource, runID, entry string, params map[string]interface{}, extra map[string]string) BaseWorkflow

type CacheCalculator

// CacheCalculator computes the two levels of cache fingerprint. Instances are
// obtained from NewCacheCalculator, NewConservativeCacheCalculator or
// NewAggressiveCacheCalculator.
type CacheCalculator interface {

	// CalculateFirstFingerprint computes the first-level fingerprint.
	CalculateFirstFingerprint() (fingerprint string, err error)

	// CalculateSecondFingerprint computes the second-level fingerprint.
	CalculateSecondFingerprint() (fingerprint string, err error)
	// contains filtered or unexported methods
}

func NewAggressiveCacheCalculator

func NewAggressiveCacheCalculator(step Step, cacheConfig schema.Cache) (CacheCalculator, error)

TODO

func NewCacheCalculator

func NewCacheCalculator(step Step, cacheConfig schema.Cache) (CacheCalculator, error)

调用方应该保证在启用了 cache 功能的情况下才会调用NewCacheCalculator

func NewConservativeCacheCalculator

func NewConservativeCacheCalculator(step Step, cacheConfig schema.Cache) (CacheCalculator, error)

调用方应该保证在启用了 cache 功能的情况下才会调用NewConservativeCacheCalculator

type DictParam

// DictParam pairs a type name with a default value. It is populated through
// its From method.
// NOTE(review): presumably this models a parameter declared in dictionary
// form in the workflow source; confirm against From.
type DictParam struct {
	Type    string
	Default interface{}
}

func (*DictParam) From

func (p *DictParam) From(origin interface{}) error

type Job

// Job is the set of operations a job exposes: reading its base data, updating
// and validating its definition, starting, stopping, checking and watching
// it, and querying which state it is in.
type Job interface {
	// Job returns a BaseJob value for this job.
	Job() BaseJob
	// Update receives a command, parameters, environment variables and
	// artifacts (presumably the substituted values to run with; confirm
	// against the implementations).
	Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts) error
	// Validate reports an error when the job is not valid.
	Validate() error
	// Start returns a string and an error (presumably the string is the ID of
	// the started job; confirm against the implementations).
	Start() (string, error)
	// Stop stops the job.
	Stop() error
	// Check returns the job status.
	Check() (schema.JobStatus, error)
	// Watch takes a channel of WorkflowEvent (presumably events are reported
	// on it while the job is observed; confirm against the implementations).
	Watch(chan WorkflowEvent) error
	// The remaining methods are boolean state predicates.
	Started() bool
	Succeeded() bool
	Failed() bool
	Terminated() bool
	Skipped() bool
	NotEnded() bool
}

type LocalJob

// LocalJob is a local process job: it embeds BaseJob and adds a Pid string
// (presumably the ID of the local process; confirm where it is set).
type LocalJob struct {
	BaseJob
	Pid string
}

---------------------------------------------------------------------------- Local Process Job ----------------------------------------------------------------------------

type PaddleFlowJob

// PaddleFlowJob embeds BaseJob and adds an Image string. It is created by
// NewPaddleFlowJob from a name, an image and a deps string.
type PaddleFlowJob struct {
	BaseJob
	Image string
}

----------------------------------------------------------------------------

K8S Job

----------------------------------------------------------------------------

func NewPaddleFlowJob

func NewPaddleFlowJob(name, image, deps string) *PaddleFlowJob

func (*PaddleFlowJob) Cancelled

func (pfj *PaddleFlowJob) Cancelled() bool

func (*PaddleFlowJob) Check

func (pfj *PaddleFlowJob) Check() (schema.JobStatus, error)

查作业状态接口

func (*PaddleFlowJob) Failed

func (pfj *PaddleFlowJob) Failed() bool

func (*PaddleFlowJob) Job

func (pfj *PaddleFlowJob) Job() BaseJob

func (*PaddleFlowJob) NotEnded

func (pfj *PaddleFlowJob) NotEnded() bool

func (*PaddleFlowJob) Skipped

func (pfj *PaddleFlowJob) Skipped() bool

func (*PaddleFlowJob) Start

func (pfj *PaddleFlowJob) Start() (string, error)

发起作业接口

func (*PaddleFlowJob) Started

func (pfj *PaddleFlowJob) Started() bool

func (*PaddleFlowJob) Stop

func (pfj *PaddleFlowJob) Stop() error

停止作业接口

func (*PaddleFlowJob) Succeeded

func (pfj *PaddleFlowJob) Succeeded() bool

func (*PaddleFlowJob) Terminated

func (pfj *PaddleFlowJob) Terminated() bool

func (*PaddleFlowJob) Update

func (pfj *PaddleFlowJob) Update(cmd string, params map[string]string, envs map[string]string, artifacts *schema.Artifacts) error

更新作业接口

func (*PaddleFlowJob) Validate

func (pfj *PaddleFlowJob) Validate() error

校验job参数

func (*PaddleFlowJob) Watch

func (pfj *PaddleFlowJob) Watch(ch chan WorkflowEvent) error

同步watch作业接口

type ResourceHandler

// ResourceHandler is built by NewResourceHandler from a run ID, an fs ID and
// a logger; its ClearResource method returns an error on failure.
type ResourceHandler struct {
	// contains filtered or unexported fields
}

* resourceHandler

func NewResourceHandler

func NewResourceHandler(runID string, fsID string, logger *log.Entry) (ResourceHandler, error)

func (*ResourceHandler) ClearResource

func (resourceHandler *ResourceHandler) ClearResource() error

type StatusToSteps

// StatusToSteps groups steps by status. Each field is a map from string keys
// to *Step (presumably keyed by step name; confirm against the code that
// fills the maps).
//
// NOTE(review): the field name CanelledSteps is misspelled ("Cancelled"); it
// is exported, so it is left unchanged here.
type StatusToSteps struct {
	SucceededSteps  map[string]*Step
	FailedSteps     map[string]*Step
	TerminatedSteps map[string]*Step
	CanelledSteps   map[string]*Step
	SkippedSteps    map[string]*Step

	// The runtime has already sent data on the step's ready channel, and the
	// step is not yet in a terminal state.
	SubmittedSteps map[string]*Step

	// The runtime has not yet sent data on the step's ready channel, and the
	// step is not in a terminal state either.
	UnsubmittedSteps map[string]*Step
}

func NewStatusToSteps

func NewStatusToSteps() StatusToSteps

type Step

// Step is the value constructed by NewStep; it exposes Execute and Watch.
// CacheRunID is its only exported field (presumably the ID of the run whose
// cache this step uses; confirm where it is assigned).
type Step struct {
	CacheRunID string
	// contains filtered or unexported fields
}

func (*Step) Execute

func (st *Step) Execute()

步骤执行

func (*Step) Watch

func (st *Step) Watch()

步骤监控

type StepParamChecker

// StepParamChecker exposes a Check method that takes the name of the current
// step and returns an error.
type StepParamChecker struct {
	// contains filtered or unexported fields
}

func (*StepParamChecker) Check

func (s *StepParamChecker) Check(currentStep string) error

type StepParamSolver

// StepParamSolver validates and substitutes the parameters of every
// WorkflowSourceStep. It is created by NewStepParamSolver and driven through
// its Solve method.
type StepParamSolver struct {
	// contains filtered or unexported fields
}

StepParamSolver 用于校验以及替换所有 WorkflowSourceStep 中的参数,分四种情况:1. 只校验不替换;2. 替换参数,用于计算 Cache fingerprint;3. cache 命中后,利用 cache 替换参数;4. 替换参数,用于 job 运行。

func NewStepParamSolver

func NewStepParamSolver(
	steps map[string]*schema.WorkflowSourceStep,
	sysParams map[string]string,
	jobs map[string]Job,
	forCacheFingerprint bool,
	pplName string,
	runID string,
	fsID string,
	logger *log.Entry) StepParamSolver

func (*StepParamSolver) Solve

func (s *StepParamSolver) Solve(currentStep string, cacheOutputArtifacts map[string]string) error

type WfEventType

// WfEventType is the string type of the event-type constants
// (WfEventNoraml, WfEventWarn, WfEventError).
type WfEventType string

type WfEventValue

// WfEventValue is the string type of the event-value constants
// (WfEventJobUpdate, WfEventRunUpdate, WfEventJobSubmitErr, ...).
type WfEventValue string

type Workflow

// Workflow embeds BaseWorkflow and is created by NewWorkflow. It exposes
// Start, Stop, Restart, Status and SetWorkflowRuntime.
type Workflow struct {
	BaseWorkflow
	// contains filtered or unexported fields
}

func NewWorkflow

func NewWorkflow(wfSource schema.WorkflowSource, runID, entry string, params map[string]interface{}, extra map[string]string,
	callbacks WorkflowCallbacks) (*Workflow, error)

实例化一个Workflow,并返回

func (*Workflow) Restart

func (wf *Workflow) Restart()

Restart 从 DB 中恢复重启 workflow。Restart 调用逻辑:1. NewWorkflow;2. SetWorkflowRuntime;3. Restart。

func (*Workflow) SetWorkflowRuntime

func (wf *Workflow) SetWorkflowRuntime(runtime schema.RuntimeView, postProcess schema.PostProcessView) error

set workflow runtime when server resuming

func (*Workflow) Start

func (wf *Workflow) Start()

Start to run a workflow

func (*Workflow) Status

func (wf *Workflow) Status() string

func (*Workflow) Stop

func (wf *Workflow) Stop(force bool)

Stop a workflow

type WorkflowCallbacks

// WorkflowCallbacks groups the callback functions that are handed to
// NewWorkflow. Each field is a plain function value.
type WorkflowCallbacks struct {
	// GetJobCb takes a run ID and a step name and returns a job view.
	GetJobCb func(runID string, stepName string) (schema.JobView, error)
	// UpdateRunCb takes a string and an arbitrary value and returns a bool
	// (presumably a run ID, a payload and a success flag; confirm against the
	// caller).
	UpdateRunCb func(string, interface{}) bool
	// LogCacheCb takes a LogRunCacheRequest and returns a string and an error.
	LogCacheCb func(req schema.LogRunCacheRequest) (string, error)
	// ListCacheCb takes a first fingerprint, an fs ID, a step and a yaml path
	// and returns the matching RunCache records.
	ListCacheCb func(firstFp, fsID, step, yamlPath string) ([]models.RunCache, error)
	// LogArtifactCb takes a LogRunArtifactRequest and returns an error.
	LogArtifactCb func(req schema.LogRunArtifactRequest) error
}

type WorkflowEvent

// WorkflowEvent is the event value passed around the workflow code, for
// example over the channel given to Job.Watch and to
// WorkflowRuntime.ProcessFailureOptions.
type WorkflowEvent struct {
	Type    WfEventType            // event type (see the WfEventType constants)
	Event   WfEventValue           // event value (see the WfEventValue constants)
	Message string                 // free-form message text
	Extra   map[string]interface{} // additional data attached to the event
}

func NewWorkflowEvent

func NewWorkflowEvent(e WfEventValue, msg string, extra map[string]interface{}) *WorkflowEvent

实例化

type WorkflowRuntime

// WorkflowRuntime is the workflow runtime. It is created by
// NewWorkflowRuntime from a Workflow and a parallelism value.
type WorkflowRuntime struct {
	// contains filtered or unexported fields
}

工作流运行时

func NewWorkflowRuntime

func NewWorkflowRuntime(wf *Workflow, parallelism int) *WorkflowRuntime

TODO: 将创建 Step 的逻辑迁移至此处,以合理的设置 step 的ctx,和 nodeType 等属性

func (*WorkflowRuntime) DecConcurrentJobs

func (wfr *WorkflowRuntime) DecConcurrentJobs(slots int)

减少多个并行Job

func (*WorkflowRuntime) IncConcurrentJobs

func (wfr *WorkflowRuntime) IncConcurrentJobs(slots int)

增加多个并行Job

func (*WorkflowRuntime) IsCompleted

func (wfr *WorkflowRuntime) IsCompleted() bool

func (*WorkflowRuntime) Listen

func (wfr *WorkflowRuntime) Listen()

func (*WorkflowRuntime) ProcessFailureOptions

func (wfr *WorkflowRuntime) ProcessFailureOptions(event WorkflowEvent)

func (*WorkflowRuntime) ProcessFailureOptionsWithContinue

func (wfr *WorkflowRuntime) ProcessFailureOptionsWithContinue(step *Step)

func (*WorkflowRuntime) ProcessFailureOptionsWithFailFast

func (wfr *WorkflowRuntime) ProcessFailureOptionsWithFailFast(step *Step)

func (*WorkflowRuntime) Restart

func (wfr *WorkflowRuntime) Restart() error

Restart 从 DB 中恢复重启

func (*WorkflowRuntime) Start

func (wfr *WorkflowRuntime) Start() error

运行

func (*WorkflowRuntime) Status

func (wfr *WorkflowRuntime) Status() string

func (*WorkflowRuntime) Stop

func (wfr *WorkflowRuntime) Stop(force bool) error

Stop 停止 Workflow。Do not call ctx_cancel() here; it will be called eventually, once all steps have terminated. 这里不通过 cancel channel 去取消 Step 的原因是防止有多个地方向同一个 channel 传递东西,防止 runtime hang 住。

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL