Documentation
¶
Index ¶
- Constants
- func CheckReg(str, pattern string) bool
- func CheckScalarResource(res string) error
- func ConcatenatePVCName(fsID string) string
- func ConcatenatePVName(namespace, fsID string) string
- func GetBindSource(fsID string) string
- func ID(userName, fsName string) string
- func IsEmptyResource(resourceInfo ResourceInfo) bool
- func IsImmutableJobStatus(status JobStatus) bool
- func IsValidFsMetaDriver(metaDriver string) bool
- func ProcessStepCacheByMap(cache *Cache, globalCacheMap map[string]interface{}, ...) error
- func ProcessStepFsMount(fsMountList *[]FsMount, globalFsMountList []interface{}) error
- func RunYaml2Map(runYaml []byte) (map[string]interface{}, error)
- func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error
- func ValidateResourceItem(res string) error
- func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
- type ActionType
- type Artifacts
- type Cache
- type ClientOptions
- type Cluster
- type Component
- type ComponentView
- type Conf
- func (c *Conf) Framework() Framework
- func (c *Conf) GetAllFileSystem() []FileSystem
- func (c *Conf) GetAnnotations() map[string]string
- func (c *Conf) GetArgs() []string
- func (c *Conf) GetClusterID() string
- func (c *Conf) GetCommand() string
- func (c *Conf) GetEnv() map[string]string
- func (c *Conf) GetEnvSubset(prefix string) map[string]string
- func (c *Conf) GetEnvValue(key string) string
- func (c *Conf) GetExtraFS() []FileSystem
- func (c *Conf) GetFS() string
- func (c *Conf) GetFileSystem() FileSystem
- func (c *Conf) GetFlavour() string
- func (c *Conf) GetImage() string
- func (c *Conf) GetJobExecutorReplicas() string
- func (c *Conf) GetJobMode() string
- func (c *Conf) GetJobReplicas() string
- func (c *Conf) GetLabels() map[string]string
- func (c *Conf) GetName() string
- func (c *Conf) GetNamespace() string
- func (c *Conf) GetPSCommand() string
- func (c *Conf) GetPSFlavour() string
- func (c *Conf) GetPSReplicas() string
- func (c *Conf) GetPriority() string
- func (c *Conf) GetQueueID() string
- func (c *Conf) GetQueueName() string
- func (c *Conf) GetRestartPolicy() string
- func (c *Conf) GetUserName() string
- func (c *Conf) GetWorkerCommand() string
- func (c *Conf) GetWorkerFlavour() string
- func (c *Conf) GetWorkerReplicas() string
- func (c *Conf) GetYamlPath() string
- func (c *Conf) SetAnnotations(k, v string)
- func (c *Conf) SetClusterID(id string)
- func (c *Conf) SetEnv(name, value string)
- func (c *Conf) SetFS(fsID string)
- func (c *Conf) SetFlavour(flavourKey string)
- func (c *Conf) SetLabels(k, v string)
- func (c *Conf) SetNamespace(ns string)
- func (c *Conf) SetPSFlavour(flavourKey string)
- func (c *Conf) SetPriority(pc string)
- func (c *Conf) SetQueueID(id string)
- func (c *Conf) SetQueueName(queueName string)
- func (c *Conf) SetUserName(userName string)
- func (c *Conf) SetWorkerFlavour(flavourKey string)
- func (c *Conf) Type() JobType
- type DagView
- func (d DagView) GetComponentName() string
- func (d DagView) GetDeps() string
- func (d DagView) GetEndTime() string
- func (d DagView) GetMsg() string
- func (d DagView) GetName() string
- func (d DagView) GetParentDagID() string
- func (d DagView) GetSeq() int
- func (d DagView) GetStartTime() string
- func (d DagView) GetStatus() JobStatus
- func (d *DagView) SetDeps(deps string)
- type FailureOptions
- type FileSystem
- type Flavour
- type Framework
- type FrameworkVersion
- type FsMount
- type FsOptions
- type FsScope
- type JobLogInfo
- type JobLogRequest
- type JobStatus
- type JobType
- type JobView
- func (j JobView) GetComponentName() string
- func (j JobView) GetDeps() string
- func (j JobView) GetEndTime() string
- func (j JobView) GetMsg() string
- func (j JobView) GetName() string
- func (j JobView) GetParentDagID() string
- func (j JobView) GetSeq() int
- func (j JobView) GetStartTime() string
- func (j JobView) GetStatus() JobStatus
- func (j *JobView) SetDeps(deps string)
- type LogInfo
- type LogRunArtifactRequest
- type LogRunCacheRequest
- type Member
- type MemberRole
- type NodeQuotaInfo
- type PFJobConf
- type Parser
- func (p *Parser) IsDag(comp map[string]interface{}) bool
- func (p *Parser) ParseCache(cacheMap map[string]interface{}, cache *Cache) error
- func (p *Parser) ParseComponents(entryPoints map[string]interface{}) (map[string]Component, error)
- func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error
- func (p *Parser) ParseFsMount(fsMap map[string]interface{}, fs *FsMount) error
- func (p *Parser) ParseFsOptions(fsMap map[string]interface{}, fs *FsOptions) error
- func (p *Parser) ParseFsScope(fsMap map[string]interface{}, fs *FsScope) error
- func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error
- func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error
- func (p *Parser) TransJsonMap2Yaml(jsonMap map[string]interface{}) error
- type PostProcessView
- type QuotaSummary
- type Reference
- type ResourceInfo
- type ResourceName
- type RunOptions
- type RuntimeView
- type ScalarResourcesType
- type TaskLogInfo
- type TaskStatus
- type WorkflowSource
- func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)
- func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)
- func (wfs *WorkflowSource) GetDisabled() []string
- func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)
- func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)
- func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string, ...) error
- func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)
- func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error
- type WorkflowSourceDag
- func (d *WorkflowSourceDag) DeepCopy() Component
- func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetArtifacts() Artifacts
- func (d *WorkflowSourceDag) GetCondition() string
- func (d *WorkflowSourceDag) GetDeps() []string
- func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetLoopArgument() interface{}
- func (d *WorkflowSourceDag) GetLoopArgumentLength() int
- func (d *WorkflowSourceDag) GetName() string
- func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)
- func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)
- func (d *WorkflowSourceDag) GetParameters() map[string]interface{}
- func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)
- func (d *WorkflowSourceDag) GetType() string
- func (d *WorkflowSourceDag) InitInputArtifacts()
- func (d *WorkflowSourceDag) InitOutputArtifacts()
- func (d *WorkflowSourceDag) InitParameters()
- func (d *WorkflowSourceDag) UpdateCondition(condition string)
- func (d *WorkflowSourceDag) UpdateDeps(deps string)
- func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})
- func (d *WorkflowSourceDag) UpdateName(name string)
- type WorkflowSourceStep
- func (s *WorkflowSourceStep) DeepCopy() Component
- func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetArtifacts() Artifacts
- func (s *WorkflowSourceStep) GetCondition() string
- func (s *WorkflowSourceStep) GetDeps() []string
- func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetLoopArgument() interface{}
- func (s *WorkflowSourceStep) GetLoopArgumentLength() int
- func (s *WorkflowSourceStep) GetName() string
- func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)
- func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)
- func (s *WorkflowSourceStep) GetParameters() map[string]interface{}
- func (s *WorkflowSourceStep) GetType() string
- func (s *WorkflowSourceStep) InitInputArtifacts()
- func (s *WorkflowSourceStep) InitOutputArtifacts()
- func (s *WorkflowSourceStep) InitParameters()
- func (s *WorkflowSourceStep) UpdateCondition(condition string)
- func (s *WorkflowSourceStep) UpdateDeps(deps string)
- func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})
- func (s *WorkflowSourceStep) UpdateName(name string)
Constants ¶
const ( LocalType = "Local" KubernetesType = "Kubernetes" )
const ( PFSTypeLocal = "local" PVNameTemplate = "pfs-$(pfs.fs.id)-$(namespace)-pv" PVCNameTemplate = "pfs-$(pfs.fs.id)-pvc" FSIDFormat = "$(pfs.fs.id)" NameSpaceFormat = "$(namespace)" PFSID = "pfs.fs.id" PFSInfo = "pfs.fs.info" PFSCache = "pfs.fs.cache" PFSServer = "pfs.server" PFSClusterID = "pfs.cluster.id" FusePodMntDir = "/home/paddleflow/mnt" FsMetaMemory = "mem" FsMetaDisk = "disk" FuseKeyFsInfo = "fs-info" LabelKeyFsID = "fsID" LabelKeyCacheID = "cacheID" LabelKeyNodeName = "nodename" LabelKeyUsedSize = "usedSize" AnnotationKeyCacheDir = "cacheDir" AnnotationKeyMTime = "modifiedTime" AnnotationKeyMountPrefix = "mount-" EnvKeyMountPodName = "POD_NAME" EnvKeyNamespace = "NAMESPACE" MountPodNamespace = "paddleflow" )
const ( EnvJobType = "PF_JOB_TYPE" EnvJobQueueName = "PF_JOB_QUEUE_NAME" EnvJobQueueID = "PF_JOB_QUEUE_ID" EnvJobClusterName = "PF_JOB_CLUSTER_NAME" EnvJobClusterID = "PF_JOB_CLUSTER_ID" EnvJobNamespace = "PF_JOB_NAMESPACE" EnvJobUserName = "PF_USER_NAME" EnvJobFsID = "PF_FS_ID" EnvJobPVCName = "PF_JOB_PVC_NAME" EnvJobPriority = "PF_JOB_PRIORITY" EnvJobMode = "PF_JOB_MODE" EnvJobFramework = "PF_JOB_FRAMEWORK" // EnvJobYamlPath Additional configuration for a specific job EnvJobYamlPath = "PF_JOB_YAML_PATH" EnvIsCustomYaml = "PF_IS_CUSTOM_YAML" // EnvJobWorkDir The working directory of the job, `null` means command without a working directory EnvJobWorkDir = "PF_WORK_DIR" EnvMountPath = "PF_MOUNT_PATH" EnvJobRestartPolicy = "PF_JOB_RESTART_POLICY" // EnvJobModePS env EnvJobModePS = "PS" EnvJobPSPort = "PF_JOB_PS_PORT" EnvJobPServerReplicas = "PF_JOB_PSERVER_REPLICAS" EnvJobPServerFlavour = "PF_JOB_PSERVER_FLAVOUR" EnvJobPServerCommand = "PF_JOB_PSERVER_COMMAND" EnvJobWorkerReplicas = "PF_JOB_WORKER_REPLICAS" EnvJobWorkerFlavour = "PF_JOB_WORKER_FLAVOUR" EnvJobWorkerCommand = "PF_JOB_WORKER_COMMAND" // EnvJobModeCollective env EnvJobModeCollective = "Collective" EnvJobReplicas = "PF_JOB_REPLICAS" EnvJobFlavour = "PF_JOB_FLAVOUR" // EnvJobModePod env reuse EnvJobReplicas and EnvJobFlavour EnvJobModePod = "Pod" // spark job env EnvJobSparkMainFile = "PF_JOB_SPARK_MAIN_FILE" EnvJobSparkMainClass = "PF_JOB_SPARK_MAIN_CLASS" EnvJobSparkArguments = "PF_JOB_SPARK_ARGUMENTS" EnvJobDriverFlavour = "PF_JOB_DRIVER_FLAVOUR" EnvJobExecutorReplicas = "PF_JOB_EXECUTOR_REPLICAS" EnvJobExecutorFlavour = "PF_JOB_EXECUTOR_FLAVOUR" // TODO move to framework TypeVcJob JobType = "vcjob" TypeSparkJob JobType = "spark" TypePaddleJob JobType = "paddlejob" TypePodJob JobType = "pod" StatusJobInit JobStatus = "init" StatusJobPending JobStatus = "pending" StatusJobRunning JobStatus = "running" StatusJobFailed JobStatus = "failed" StatusJobSucceeded JobStatus = "succeeded" StatusJobTerminating 
JobStatus = "terminating" StatusJobTerminated JobStatus = "terminated" StatusJobCancelled JobStatus = "cancelled" StatusJobSkipped JobStatus = "skipped" StatusTaskPending TaskStatus = "pending" StatusTaskRunning TaskStatus = "running" StatusTaskSucceeded TaskStatus = "succeeded" StatusTaskFailed TaskStatus = "failed" RoleMaster MemberRole = "master" RoleWorker MemberRole = "worker" RoleDriver MemberRole = "driver" RoleExecutor MemberRole = "executor" RolePServer MemberRole = "pserver" RolePWorker MemberRole = "pworker" TypeSingle JobType = "single" TypeDistributed JobType = "distributed" TypeWorkflow JobType = "workflow" FrameworkSpark Framework = "spark" FrameworkMPI Framework = "mpi" FrameworkTF Framework = "tensorflow" FrameworkPytorch Framework = "pytorch" FrameworkPaddle Framework = "paddle" FrameworkMXNet Framework = "mxnet" FrameworkRay Framework = "ray" FrameworkStandalone Framework = "standalone" ListenerTypeJob = "job" ListenerTypeTask = "task" ListenerTypeQueue = "queue" // job priority EnvJobVeryLowPriority = "VERY_LOW" EnvJobLowPriority = "LOW" EnvJobNormalPriority = "NORMAL" EnvJobHighPriority = "HIGH" EnvJobVeryHighPriority = "VERY_HIGH" // priority class PriorityClassVeryLow = "very-low" PriorityClassLow = "low" PriorityClassNormal = "normal" PriorityClassHigh = "high" PriorityClassVeryHigh = "very-high" JobOwnerLabel = "owner" JobOwnerValue = "paddleflow" JobIDLabel = "paddleflow-job-id" JobTTLSeconds = "padleflow/job-ttl-seconds" JobLabelFramework = "paddleflow-job-framework" VolcanoJobNameLabel = "volcano.sh/job-name" QueueLabelKey = "volcano.sh/queue-name" SparkAPPJobNameLabel = "sparkoperator.k8s.io/app-name" JobPrefix = "job" DefaultSchedulerName = "volcano" DefaultFSMountPath = "/home/paddleflow/storage/mnt" // EnvPaddleParaJob defines env for Paddle Para Job EnvPaddleParaJob = "PF_PADDLE_PARA_JOB" EnvPaddleParaPriority = "PF_PADDLE_PARA_PRIORITY" EnvPaddleParaConfigHostFile = "PF_PADDLE_PARA_CONFIG_FILE" // PaddleParaVolumeName defines 
config for Paddle Para Pod PaddleParaVolumeName = "paddle-para-conf-volume" PaddleParaAnnotationKeyJobName = "paddle-para/job-name" PaddleParaAnnotationKeyPriority = "paddle-para/priority" PaddleParaEnvJobName = "FLAGS_job_name" PaddleParaEnvGPUConfigFile = "GPU_CONFIG_FILE" PaddleParaGPUConfigFilePath = "/opt/paddle/para/gpu_config.json" // RayJob keywords EnvRayJobEntryPoint = "RAY_JOB_ENTRY_POINT" EnvRayJobRuntimeEnv = "RAY_JOB_RUNTIME_ENV" EnvRayJobEnableAutoScaling = "RAY_JOB_ENABLE_AUTOSCALING" EnvRayJobAutoScalingMode = "RAY_JOB_AUTOSCALING_MODE" EnvRayJobAutoScalingTimeout = "RAY_JOB_AUTOSCALING_IDLE_TIMEOUT" EnvRayJobHeaderFlavour = "RAY_JOB_HEADER_FLAVOUR" EnvRayJobHeaderImage = "RAY_JOB_HEADER_IMAGE" EnvRayJobHeaderPriority = "RAY_JOB_HEADER_PRIORITY" EnvRayJobHeaderPreStop = "RAY_JOB_HEADER_PRE_STOP" EnvRayJobHeaderStartParamsPrefix = "RAY_JOB_HEADER_START_PARAMS_" EnvRayJobWorkerGroupName = "RAY_JOB_WORKER_GROUP_NAME" EnvRayJobWorkerFlavour = "RAY_JOB_WORKER_FLAVOUR" EnvRayJobWorkerImage = "RAY_JOB_WORKER_IMAGE" EnvRayJobWorkerPriority = "RAY_JOB_WORKER_PRIORITY" EnvRayJobWorkerReplicas = "RAY_JOB_WORKER_REPLICAS" EnvRayJobWorkerMinReplicas = "RAY_JOB_WORKER_MIN_REPLICAS" EnvRayJobWorkerMaxReplicas = "RAY_JOB_WORKER_MAX_REPLICAS" EnvRayJobWorkerStartParamsPrefix = "RAY_JOB_WORKER_START_PARAMS_" )
const ( StatusQueueCreating = "creating" StatusQueueOpen = "open" StatusQueueUpdating = "updating" StatusQueueClosing = "closing" StatusQueueClosed = "closed" TypeElasticQuota = "elasticQuota" TypeVolcanoCapabilityQuota = "volcanoCapabilityQuota" )
const ( ArtifactTypeInput = "input" ArtifactTypeOutput = "output" EntryPointsStr = "entry_points" CacheAttributeEnable = "enable" CacheAttributeMaxExpiredTime = "max_expired_time" CacheAttributeFsScope = "fs_scope" FailureStrategyFailFast = "fail_fast" FailureStrategyContinue = "continue" EnvDockerEnv = "dockerEnv" FsPrefix = "fs-" CompTypeComponents = "components" CompTypeEntryPoints = "entryPoints" CompTypePostProcess = "postProcess" )
Variables ¶
This section is empty.
Functions ¶
func CheckScalarResource ¶
func ConcatenatePVCName ¶ added in v0.14.3
func ConcatenatePVName ¶ added in v0.14.3
func GetBindSource ¶
func IsEmptyResource ¶
func IsEmptyResource(resourceInfo ResourceInfo) bool
IsEmptyResource returns true when cpu or mem is nil.
func IsImmutableJobStatus ¶
func IsValidFsMetaDriver ¶
func ProcessStepCacheByMap ¶ added in v0.14.3
func ProcessStepFsMount ¶ added in v0.14.3
func RunYaml2Map ¶ added in v0.14.3
将yaml解析为map
func ValidateResource ¶ added in v0.14.3
func ValidateResource(resourceInfo ResourceInfo, scalarResourcesType []string) error
ValidateResource validates the resource info.
func ValidateResourceItem ¶ added in v0.14.3
ValidateResourceItem checks a resource value for cpu or memory.
func ValidateScalarResourceInfo ¶
func ValidateScalarResourceInfo(scalarResources ScalarResourcesType, scalarResourcesType []string) error
ValidateScalarResourceInfo validates the scalar resource info.
Types ¶
type ActionType ¶
type ActionType string
const ( Create ActionType = "create" Update ActionType = "update" Delete ActionType = "delete" Terminate ActionType = "terminate" )
type Artifacts ¶
type Artifacts struct {
Input map[string]string `yaml:"input" json:"input"`
Output map[string]string `yaml:"output" json:"output"`
}
func (*Artifacts) ValidateOutputMapByList ¶
type ClientOptions ¶
ClientOptions used to build rest config.
type Cluster ¶
type Cluster struct {
ID string
Name string
Type string
// ClientOpt defines client config for cluster
ClientOpt ClientOptions
}
type Component ¶ added in v0.14.3
type Component interface {
GetDeps() []string
GetArtifacts() Artifacts
GetArtifactPath(artName string) (string, error)
GetInputArtifactPath(artName string) (string, error)
GetOutputArtifactPath(artName string) (string, error)
GetParameters() map[string]interface{}
GetParameterValue(paramName string) (interface{}, error)
GetCondition() string
GetLoopArgument() interface{}
GetLoopArgumentLength() int
GetType() string
GetName() string
// 下面几个Update 函数在进行模板替换的时候会用到
UpdateCondition(string)
UpdateLoopArguemt(interface{})
UpdateName(name string)
UpdateDeps(deps string)
InitInputArtifacts()
InitOutputArtifacts()
InitParameters()
// 用于 deepCopy, 避免复用时出现问题
DeepCopy() Component
}
Component包括Dag和Step,有Struct WorkflowSourceStep 和 WorkflowSourceDag实现了该接口
type ComponentView ¶ added in v0.14.3
type Conf ¶
type Conf struct {
Name string `json:"name"`
// 存储资源
FileSystem FileSystem `json:"fs,omitempty"`
ExtraFileSystem []FileSystem `json:"extraFS,omitempty"`
// 计算资源
Flavour Flavour `json:"flavour,omitempty"`
Priority string `json:"priority"`
ClusterID string `json:"clusterID"`
QueueID string `json:"queueID"`
QueueName string `json:"queueName,omitempty"`
// 运行时需要的参数
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Env map[string]string `json:"env,omitempty"`
Command string `json:"command,omitempty"`
Image string `json:"image"`
Port int `json:"port,omitempty"`
Args []string `json:"args,omitempty"`
}
func (*Conf) GetAllFileSystem ¶ added in v0.14.3
func (c *Conf) GetAllFileSystem() []FileSystem
GetAllFileSystem combines FileSystem and ExtraFileSystem into a single slice.
func (*Conf) GetAnnotations ¶ added in v0.14.5
func (*Conf) GetClusterID ¶
func (*Conf) GetCommand ¶
func (*Conf) GetEnvSubset ¶ added in v0.14.5
func (*Conf) GetEnvValue ¶ added in v0.14.5
func (*Conf) GetExtraFS ¶ added in v0.14.3
func (c *Conf) GetExtraFS() []FileSystem
func (*Conf) GetFileSystem ¶ added in v0.14.3
func (c *Conf) GetFileSystem() FileSystem
func (*Conf) GetFlavour ¶
func (*Conf) GetJobExecutorReplicas ¶
func (*Conf) GetJobMode ¶
func (*Conf) GetJobReplicas ¶
func (*Conf) GetNamespace ¶
func (*Conf) GetPSCommand ¶
func (*Conf) GetPSFlavour ¶
func (*Conf) GetPSReplicas ¶
func (*Conf) GetPriority ¶
func (*Conf) GetQueueID ¶
func (*Conf) GetQueueName ¶
func (*Conf) GetRestartPolicy ¶ added in v0.14.5
func (*Conf) GetUserName ¶
func (*Conf) GetWorkerCommand ¶
func (*Conf) GetWorkerFlavour ¶
func (*Conf) GetWorkerReplicas ¶
func (*Conf) GetYamlPath ¶
func (*Conf) SetAnnotations ¶
func (*Conf) SetClusterID ¶
func (*Conf) SetFlavour ¶
func (*Conf) SetNamespace ¶
func (*Conf) SetPSFlavour ¶
func (*Conf) SetPriority ¶
func (*Conf) SetQueueID ¶
func (*Conf) SetQueueName ¶
SetQueueName sets the queue name.
func (*Conf) SetUserName ¶
func (*Conf) SetWorkerFlavour ¶
type DagView ¶ added in v0.14.3
type DagView struct {
PK int64 `json:"-"`
DagID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"`
DagName string `json:"dagName"`
ParentDagID string `json:"parentDagID"`
LoopSeq int `json:"-"`
Deps string `json:"deps"`
Parameters map[string]string `json:"parameters"`
Artifacts Artifacts `json:"artifacts"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Status JobStatus `json:"status"`
Message string `json:"message"`
EntryPoints map[string][]ComponentView `json:"entryPoints"`
}
func (DagView) GetComponentName ¶ added in v0.14.3
func (DagView) GetEndTime ¶ added in v0.14.5
func (DagView) GetParentDagID ¶ added in v0.14.3
func (DagView) GetStartTime ¶ added in v0.14.5
type FailureOptions ¶
type FailureOptions struct {
Strategy string `yaml:"strategy" json:"strategy"`
}
type FileSystem ¶
type FileSystem struct {
ID string `json:"id,omitempty"`
Name string `json:"name"`
Type string `json:"type"`
HostPath string `json:"hostPath,omitempty"`
MountPath string `json:"mountPath,omitempty"`
SubPath string `json:"subPath,omitempty"`
ReadOnly bool `json:"readOnly,omitempty"`
}
FileSystem describes a PaddleFlow file system.
type Flavour ¶
type Flavour struct {
ResourceInfo `yaml:",inline"`
Name string `json:"name" yaml:"name"`
}
Flavour is a set of resources that can be used to run a job.
type FrameworkVersion ¶ added in v0.14.5
type FrameworkVersion struct {
Framework string `json:"framework"`
APIVersion string `json:"apiVersion"`
}
func NewFrameworkVersion ¶ added in v0.14.5
func NewFrameworkVersion(framework, apiVersion string) FrameworkVersion
func (*FrameworkVersion) String ¶ added in v0.14.5
func (f *FrameworkVersion) String() string
type JobLogInfo ¶
type JobLogInfo struct {
JobID string `json:"jobID"`
TaskList []TaskLogInfo `json:"taskList"`
}
type JobLogRequest ¶
type JobView ¶
type JobView struct {
PK int64 `json:"-"`
JobID string `json:"jobID"`
Name string `json:"name"`
Type string `json:"type"`
StepName string `json:"stepName"`
ParentDagID string `json:"parentDagID"`
LoopSeq int `json:"-"`
Command string `json:"command"`
Parameters map[string]string `json:"parameters"`
Env map[string]string `json:"env"`
ExtraFS []FsMount `json:"extraFS"`
StartTime string `json:"startTime"`
EndTime string `json:"endTime"`
Status JobStatus `json:"status"`
Deps string `json:"deps"`
DockerEnv string `json:"dockerEnv"`
Artifacts Artifacts `json:"artifacts"`
Cache Cache `json:"cache"`
JobMessage string `json:"jobMessage"`
CacheRunID string `json:"cacheRunID"`
CacheJobID string `json:"cacheJobID"`
}
JobView is the view of job info returned to the user, while Job is for the pipeline and job engine to process.
func (JobView) GetComponentName ¶ added in v0.14.3
func (JobView) GetEndTime ¶ added in v0.14.5
func (JobView) GetParentDagID ¶ added in v0.14.3
func (JobView) GetStartTime ¶ added in v0.14.5
type LogRunArtifactRequest ¶
type LogRunArtifactRequest struct {
Md5 string `json:"md5"`
RunID string `json:"runID"`
FsID string `json:"fsID"`
FsName string `json:"fsname"`
UserName string `json:"username"`
ArtifactPath string `json:"artifactPath"`
Step string `json:"step"`
JobID string `json:"jobID"`
Type string `json:"type"`
ArtifactName string `json:"artifactName"`
Meta string `json:"meta"`
}
type LogRunCacheRequest ¶
type LogRunCacheRequest struct {
FirstFp string `json:"firstFp"`
SecondFp string `json:"secondFp"`
Source string `json:"source"`
RunID string `json:"runID"`
Step string `json:"step"`
JobID string `json:"jobID"`
FsID string `json:"fsID"`
FsName string `json:"fsname"`
UserName string `json:"username"`
ExpiredTime string `json:"expiredTime"`
Strategy string `json:"strategy"`
}
type Member ¶ added in v0.14.5
type Member struct {
ID string `json:"id"`
Replicas int `json:"replicas"`
Role MemberRole `json:"role"`
Conf `json:",inline"`
}
type MemberRole ¶
type MemberRole string
type NodeQuotaInfo ¶
type PFJobConf ¶
type PFJobConf interface {
GetName() string
GetEnv() map[string]string
GetEnvValue(key string) string
GetEnvSubset(prefix string) map[string]string
GetCommand() string
GetImage() string
GetFileSystem() FileSystem
GetExtraFS() []FileSystem
GetArgs() []string
GetPriority() string
SetPriority(string)
GetQueueName() string
GetQueueID() string
GetClusterID() string
GetUserName() string
// Deprecated
GetFS() string
SetFS(string)
GetYamlPath() string
GetNamespace() string
GetJobMode() string
GetFlavour() string
GetPSFlavour() string
GetWorkerFlavour() string
SetQueueID(string)
SetClusterID(string)
SetNamespace(string)
SetEnv(string, string)
SetLabels(string, string)
SetAnnotations(string, string)
Type() JobType
Framework() Framework
}
type Parser ¶ added in v0.14.3
type Parser struct {
}
func (*Parser) ParseCache ¶ added in v0.14.3
func (*Parser) ParseComponents ¶ added in v0.14.3
func (*Parser) ParseDag ¶ added in v0.14.3
func (p *Parser) ParseDag(params map[string]interface{}, dagComp *WorkflowSourceDag) error
该函数用于给生成给WorkflowSourceDag的各个字段赋值,但不会进行默认值填充,不会进行全局参数对局部参数的替换
func (*Parser) ParseFsMount ¶ added in v0.14.3
func (*Parser) ParseFsOptions ¶ added in v0.14.3
func (*Parser) ParseFsScope ¶ added in v0.14.3
func (*Parser) ParseStep ¶ added in v0.14.3
func (p *Parser) ParseStep(params map[string]interface{}, step *WorkflowSourceStep) error
func (*Parser) ParseWorkflowSource ¶ added in v0.14.3
func (p *Parser) ParseWorkflowSource(bodyMap map[string]interface{}, wfs *WorkflowSource) error
该函数将请求体解析成WorkflowSource, 该函数未完成全局替换操作
func (*Parser) TransJsonMap2Yaml ¶ added in v0.14.3
type PostProcessView ¶
type QuotaSummary ¶
type Reference ¶ added in v0.14.3
type Reference struct {
Component string `yaml:"component" json:"component"`
}
type ResourceInfo ¶
type ResourceInfo struct {
CPU string `json:"cpu" yaml:"cpu"`
Mem string `json:"mem" yaml:"mem"`
ScalarResources ScalarResourcesType `json:"scalarResources,omitempty" yaml:"scalarResources,omitempty"`
}
ResourceInfo is a struct that contains the information of a resource.
func (ResourceInfo) ToMap ¶ added in v0.14.3
func (r ResourceInfo) ToMap() map[string]string
type ResourceName ¶
type ResourceName string
ResourceName is the name identifying various resources in a ResourceList.
type RunOptions ¶ added in v0.14.3
type RuntimeView ¶
type RuntimeView map[string][]ComponentView
RuntimeView is the view of a run returned to the user, while workflowRuntime is for the pipeline engine to process.
func (*RuntimeView) UnmarshalJSON ¶ added in v0.14.5
func (rv *RuntimeView) UnmarshalJSON(data []byte) error
type ScalarResourcesType ¶
type ScalarResourcesType map[ResourceName]string
ScalarResourcesType is the type of scalar resources
type TaskLogInfo ¶
type TaskStatus ¶
type TaskStatus string
type WorkflowSource ¶
type WorkflowSource struct {
Name string `yaml:"name" json:"name"`
DockerEnv string `yaml:"docker_env" json:"dockerEnv"`
EntryPoints WorkflowSourceDag `yaml:"entry_points" json:"entryPoints"`
Components map[string]Component `yaml:"components" json:"components"`
Cache Cache `yaml:"cache" json:"cache"`
Parallelism int `yaml:"parallelism" json:"parallelism"`
Disabled string `yaml:"disabled" json:"disabled"`
FailureOptions FailureOptions `yaml:"failure_options" json:"failureOptions"`
PostProcess map[string]*WorkflowSourceStep `yaml:"post_process" json:"postProcess"`
FsOptions FsOptions `yaml:"fs_options" json:"fsOptions"`
}
func GetWorkflowSource ¶ added in v0.14.3
func GetWorkflowSource(runYaml []byte) (WorkflowSource, error)
该函数除了将yaml解析为wfs,还进行了全局参数替换操作
func GetWorkflowSourceByMap ¶ added in v0.14.3
func GetWorkflowSourceByMap(yamlMap map[string]interface{}) (WorkflowSource, error)
由Map解析得到一个Wfs,该Map中的key需要是下划线格式
func (*WorkflowSource) GetComponentByFullName ¶ added in v0.14.3
func (wfs *WorkflowSource) GetComponentByFullName(fullName string) (Component, error)
func (*WorkflowSource) GetCompsMapAndRelName ¶ added in v0.14.3
func (wfs *WorkflowSource) GetCompsMapAndRelName(components map[string]Component, absoluteName string) (map[string]Component, string, bool)
递归的检查Absolute Name对应的Component是否存在,并返回该Comp的所有同级别节点,和它的Relative Name
func (*WorkflowSource) GetDisabled ¶
func (wfs *WorkflowSource) GetDisabled() []string
func (*WorkflowSource) GetFsMounts ¶ added in v0.14.3
func (wfs *WorkflowSource) GetFsMounts() ([]FsMount, error)
func (*WorkflowSource) IsDisabled ¶
func (wfs *WorkflowSource) IsDisabled(componentName string) (bool, error)
func (*WorkflowSource) ProcessRuntimeComponents ¶ added in v0.14.3
func (wfs *WorkflowSource) ProcessRuntimeComponents(components map[string]Component, componentType string, yamlMap map[string]interface{}, componentsMap map[string]interface{}) error
对Step的DockerEnv、Cache进行全局替换
func (*WorkflowSource) TransToRunYamlRaw ¶ added in v0.14.3
func (wfs *WorkflowSource) TransToRunYamlRaw() (runYamlRaw string, err error)
func (*WorkflowSource) UnmarshalJSON ¶ added in v0.14.5
func (wfs *WorkflowSource) UnmarshalJSON(data []byte) error
type WorkflowSourceDag ¶ added in v0.14.3
type WorkflowSourceDag struct {
Name string `yaml:"-" json:"name"`
Type string `yaml:"-" json:"type"`
LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"`
Condition string `yaml:"condition" json:"condition"`
Parameters map[string]interface{} `yaml:"parameters" json:"parameters"`
Deps string `yaml:"deps" json:"deps"`
Artifacts Artifacts `yaml:"artifacts" json:"artifacts"`
EntryPoints map[string]Component `yaml:"entry_points" json:"entryPoints"`
}
func (*WorkflowSourceDag) DeepCopy ¶ added in v0.14.3
func (d *WorkflowSourceDag) DeepCopy() Component
func (*WorkflowSourceDag) GetArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetArtifactPath(artName string) (string, error)
获取 artifact 的路径
func (*WorkflowSourceDag) GetArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetArtifacts() Artifacts
func (*WorkflowSourceDag) GetCondition ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetCondition() string
func (*WorkflowSourceDag) GetDeps ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetDeps() []string
func (*WorkflowSourceDag) GetInputArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetInputArtifactPath(artName string) (string, error)
获取 输入artifact的存储路径
func (*WorkflowSourceDag) GetLoopArgument ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetLoopArgument() interface{}
func (*WorkflowSourceDag) GetLoopArgumentLength ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetLoopArgumentLength() int
func (*WorkflowSourceDag) GetName ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetName() string
func (*WorkflowSourceDag) GetOutputArtifactPath ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetOutputArtifactPath(artName string) (string, error)
获取输出artifact的存储路径
func (*WorkflowSourceDag) GetParameterValue ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetParameterValue(paramName string) (interface{}, error)
获取指定 parameter 的值
func (*WorkflowSourceDag) GetParameters ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetParameters() map[string]interface{}
func (*WorkflowSourceDag) GetSubComponet ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetSubComponet(subComponentName string) (Component, bool)
func (*WorkflowSourceDag) GetType ¶ added in v0.14.3
func (d *WorkflowSourceDag) GetType() string
func (*WorkflowSourceDag) InitInputArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitInputArtifacts()
func (*WorkflowSourceDag) InitOutputArtifacts ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitOutputArtifacts()
func (*WorkflowSourceDag) InitParameters ¶ added in v0.14.3
func (d *WorkflowSourceDag) InitParameters()
func (*WorkflowSourceDag) UpdateCondition ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateCondition(condition string)
func (*WorkflowSourceDag) UpdateDeps ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateDeps(deps string)
func (*WorkflowSourceDag) UpdateLoopArguemt ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateLoopArguemt(loopArgument interface{})
func (*WorkflowSourceDag) UpdateName ¶ added in v0.14.3
func (d *WorkflowSourceDag) UpdateName(name string)
type WorkflowSourceStep ¶
type WorkflowSourceStep struct {
Name string `yaml:"-" json:"name"`
LoopArgument interface{} `yaml:"loop_argument" json:"loopArgument"`
Condition string `yaml:"condition" json:"condition"`
Parameters map[string]interface{} `yaml:"parameters" json:"parameters"`
Command string `yaml:"command" json:"command"`
Deps string `yaml:"deps" json:"deps"`
Artifacts Artifacts `yaml:"artifacts" json:"artifacts"`
Env map[string]string `yaml:"env" json:"env"`
DockerEnv string `yaml:"docker_env" json:"dockerEnv"`
Cache Cache `yaml:"cache" json:"cache"`
Reference Reference `yaml:"reference" json:"reference"`
ExtraFS []FsMount `yaml:"extra_fs" json:"extraFS"`
}
func (*WorkflowSourceStep) DeepCopy ¶ added in v0.14.3
func (s *WorkflowSourceStep) DeepCopy() Component
func (*WorkflowSourceStep) GetArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetArtifactPath(artName string) (string, error)
获取 artifact 的路径
func (*WorkflowSourceStep) GetArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetArtifacts() Artifacts
func (*WorkflowSourceStep) GetCondition ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetCondition() string
func (*WorkflowSourceStep) GetDeps ¶
func (s *WorkflowSourceStep) GetDeps() []string
func (*WorkflowSourceStep) GetInputArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetInputArtifactPath(artName string) (string, error)
获取 输入artifact的存储路径
func (*WorkflowSourceStep) GetLoopArgument ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetLoopArgument() interface{}
func (*WorkflowSourceStep) GetLoopArgumentLength ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetLoopArgumentLength() int
func (*WorkflowSourceStep) GetName ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetName() string
func (*WorkflowSourceStep) GetOutputArtifactPath ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetOutputArtifactPath(artName string) (string, error)
获取输出artifact的存储路径
func (*WorkflowSourceStep) GetParameterValue ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetParameterValue(paramName string) (interface{}, error)
获取指定 parameter 的值
func (*WorkflowSourceStep) GetParameters ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetParameters() map[string]interface{}
func (*WorkflowSourceStep) GetType ¶ added in v0.14.3
func (s *WorkflowSourceStep) GetType() string
func (*WorkflowSourceStep) InitInputArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitInputArtifacts()
func (*WorkflowSourceStep) InitOutputArtifacts ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitOutputArtifacts()
func (*WorkflowSourceStep) InitParameters ¶ added in v0.14.3
func (s *WorkflowSourceStep) InitParameters()
func (*WorkflowSourceStep) UpdateCondition ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateCondition(condition string)
func (*WorkflowSourceStep) UpdateDeps ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateDeps(deps string)
func (*WorkflowSourceStep) UpdateLoopArguemt ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateLoopArguemt(loopArgument interface{})
func (*WorkflowSourceStep) UpdateName ¶ added in v0.14.3
func (s *WorkflowSourceStep) UpdateName(name string)