Documentation
¶
Index ¶
- Constants
- type DescribePipelineTaskRequest
- type PIPELINE_TASK_STATUS
- type PipelineSpec
- type PipelineStatus
- func (r *PipelineStatus) ExportDAGGraph() (*dag.GraphData, error)
- func (r *PipelineStatus) Failed(msg string)
- func (r *PipelineStatus) GetDAG() (*dag.DAG, error)
- func (r *PipelineStatus) GetDAGStatus() (dag.DAGStatus, error)
- func (r *PipelineStatus) GetProgress() (completed int, total int, percentage float64)
- func (r *PipelineStatus) GetStatusSummary() (map[dag.NodeStatus]int, error)
- func (r *PipelineStatus) InitDAG() error
- func (r *PipelineStatus) IsCompleted() bool
- func (r *PipelineStatus) IsSuccess() bool
- func (r *PipelineStatus) NextJobRunTasks() ([]*task.Task, error)
- func (r *PipelineStatus) PrintDAGGraph() (string, error)
- func (r *PipelineStatus) Running()
- func (r *PipelineStatus) SetEndAt(t time.Time)
- func (r *PipelineStatus) SetStartAt(t time.Time)
- func (r *PipelineStatus) Success(msg string)
- func (r *PipelineStatus) TableName() string
- func (r *PipelineStatus) UnsafeGetDAG() *dag.DAG
- type PipelineTask
- type QueryPipelineRunTask
- type RunPipelineRequest
- type Service
- type UpdatePipelineTaskStatusRequest
Constants ¶
View Source
const ( JOB_RUN_TASK_LABEL_KEY = "job_run_task_id" PIPELINE_RUN_TASK_LABEL_KEY = "pipeline_run_task_id" )
View Source
const ( INIT_CONTAINER_PARAM_LABEL_KEY = "init_container_index" CONTAINER_PARAM_LABEL_KEY = "container_index" )
View Source
const (
APP_NAME = "pipeline"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DescribePipelineTaskRequest ¶
type DescribePipelineTaskRequest struct {
PipelineTaskId string `json:"id"`
}
func NewDescribePipelineTaskRequest ¶
func NewDescribePipelineTaskRequest(pipelineTaskId string) *DescribePipelineTaskRequest
type PIPELINE_TASK_STATUS ¶
type PIPELINE_TASK_STATUS string
const ( PIPELINE_RUN_TASK_STATUS_PENDDING PIPELINE_TASK_STATUS = "" PIPELINE_RUN_TASK_STATUS_RUNNING PIPELINE_TASK_STATUS = "running" PIPELINE_RUN_TASK_STATUS_FAILED PIPELINE_TASK_STATUS = "failed" PIPELINE_RUN_TASK_STATUS_SUCCESS PIPELINE_TASK_STATUS = "success" )
type PipelineSpec ¶
type PipelineSpec struct {
// pipeline id
PipelineId string `json:"pipeline_id" validate:"required" gorm:"column:pipeline_id"`
// 试运行
DryRun bool `json:"dry_run" gorm:"column:dry_run"`
// 触发方式, 默认手工触发
TriggerMode trigger.MODE `json:"trigger_mode" gorm:"column:trigger_mode"`
// 执行人
RunBy string `json:"run_by" gorm:"column:run_by" validate:"required"`
// Pipeline 的运行时参数
RunParams []*job.RunParam `json:"run_params" gorm:"column:run_params;serializer:json;type:json"`
// 任务标签
Label map[string]string `json:"label" bson:"label" gorm:"column:label;serializer:json;type:json" description:"任务标签" optional:"true"`
// 额外的其他属性
Extras map[string]string `json:"extras" form:"extras" bson:"extras" gorm:"column:extras;type:json;serializer:json;"`
}
type PipelineStatus ¶
type PipelineStatus struct {
// 状态
Status PIPELINE_TASK_STATUS `json:"status" gorm:"column:status"`
// 任务开始时间
StartAt *time.Time `json:"start_at" gorm:"column:start_at"`
// 更新时间
UpdateAt *time.Time `json:"update_at" gorm:"column:update_at"`
// 更新人
UpdateBy string `json:"update_by" gorm:"column:update_by;type:varchar(60)"`
// 任务结束时间
EndAt *time.Time `json:"end_at" gorm:"column:end_at"`
// 状态描述
Message string `json:"message" gorm:"column:message"`
// 运行的任务信息, 单独交给 Task 服务管理
Tasks []*task.Task `json:"tasks" gorm:"-"`
// 如果有下一个需要运行的Pipeline Task 的id
NextPipelineTaskId string `json:"next_pipeline_task_id" gorm:"column:next_pipeline_task_id"`
// 运行下一个流水线失败
RunNextPipelineError string `json:"run_next_pipeline_error" gorm:"column:run_next_pipeline_error"`
// contains filtered or unexported fields
}
调度下一个需要运行的任务
func NewPipelineRunStatus ¶
func NewPipelineRunStatus() *PipelineStatus
func (*PipelineStatus) ExportDAGGraph ¶
func (r *PipelineStatus) ExportDAGGraph() (*dag.GraphData, error)
ExportDAGGraph 导出 DAG 图数据供 UI 展示 返回包含所有节点信息(名称、状态、依赖关系)的结构化数据
func (*PipelineStatus) Failed ¶
func (r *PipelineStatus) Failed(msg string)
func (*PipelineStatus) GetDAG ¶
func (r *PipelineStatus) GetDAG() (*dag.DAG, error)
GetDAG 获取 DAG 实例,如果未初始化则自动初始化
func (*PipelineStatus) GetDAGStatus ¶
func (r *PipelineStatus) GetDAGStatus() (dag.DAGStatus, error)
GetDAGStatus 获取 DAG 的整体状态
func (*PipelineStatus) GetProgress ¶
func (r *PipelineStatus) GetProgress() (completed int, total int, percentage float64)
GetProgress 获取执行进度
func (*PipelineStatus) GetStatusSummary ¶
func (r *PipelineStatus) GetStatusSummary() (map[dag.NodeStatus]int, error)
GetStatusSummary 获取状态统计摘要
func (*PipelineStatus) InitDAG ¶
func (r *PipelineStatus) InitDAG() error
InitDAG 初始化 DAG 实例 设计说明:从 Tasks 动态构建 DAG,不使用持久化 - 构建成本:O(N*D),N=任务数,D=平均依赖数,通常 <5ms - 优点:Tasks 是唯一数据源,无数据一致性问题 - 适用场景:Pipeline 不可变,任务数 <100,加载频率低
func (*PipelineStatus) IsCompleted ¶
func (r *PipelineStatus) IsCompleted() bool
IsCompleted 判断所有任务是否已完成
func (*PipelineStatus) NextJobRunTasks ¶
func (r *PipelineStatus) NextJobRunTasks() ([]*task.Task, error)
NextJobRunTasks 获取下一批可运行的任务 使用 DAG 的 GetNextRunnableTasks 方法动态计算
func (*PipelineStatus) PrintDAGGraph ¶
func (r *PipelineStatus) PrintDAGGraph() (string, error)
PrintDAGGraph 在终端以树形结构打印 DAG 按拓扑顺序分层展示,方便查看整体结构和依赖关系
func (*PipelineStatus) Running ¶
func (r *PipelineStatus) Running()
func (*PipelineStatus) SetEndAt ¶
func (r *PipelineStatus) SetEndAt(t time.Time)
func (*PipelineStatus) SetStartAt ¶
func (r *PipelineStatus) SetStartAt(t time.Time)
func (*PipelineStatus) Success ¶
func (r *PipelineStatus) Success(msg string)
func (*PipelineStatus) TableName ¶
func (r *PipelineStatus) TableName() string
func (*PipelineStatus) UnsafeGetDAG ¶
func (r *PipelineStatus) UnsafeGetDAG() *dag.DAG
UnsafeGetDAG 获取 DAG 实例(不初始化,可能返回 nil) 仅在确定 DAG 已初始化的场景使用
type PipelineTask ¶
type PipelineTask struct {
// 对象Id
Id string `json:"id" gorm:"column:id;primaryKey"`
// 创建时间
CreateAt time.Time `json:"create_at" gorm:"column:create_at"`
// 运行请求
RunPipelineRequest
}
func NewPipelineRunTask ¶
func NewPipelineRunTask(req *RunPipelineRequest) *PipelineTask
func (*PipelineTask) GetRunParamValue ¶
func (r *PipelineTask) GetRunParamValue(varName string) string
解析参数值变量, 比如 1. 其他task的输入中提取值 ${task_name.output_name} 2. pipeline RunParams中提取值 运行参数 ${var_name}
func (*PipelineTask) InjectInputParamValue ¶
func (r *PipelineTask) InjectInputParamValue(t *task.Task)
func (*PipelineTask) String ¶
func (r *PipelineTask) String() string
func (*PipelineTask) TableName ¶
func (r *PipelineTask) TableName() string
type QueryPipelineRunTask ¶
type QueryPipelineRunTask struct {
request.PageRequest
}
func NewQueryPipelineRunTask ¶
func NewQueryPipelineRunTask() *QueryPipelineRunTask
type RunPipelineRequest ¶
type RunPipelineRequest struct {
// Domain
Domain string `json:"domain" gorm:"column:domain"`
// Namespace
Namespace string `json:"namespace" gorm:"column:namespace"`
// Pipeline运行参数
PipelineSpec
// 运行后的状态
PipelineStatus
}
func NewRunPipelineRequest ¶
func NewRunPipelineRequest() *RunPipelineRequest
type Service ¶
type Service interface {
// Pipeline执行记录
QueryPipelineTask(context.Context, *QueryPipelineRunTask) (*types.Set[*PipelineTask], error)
// 查询详情
DescribePipelineTask(context.Context, *DescribePipelineTaskRequest) (*PipelineTask, error)
// 运行Pipeline
RunPipeline(context.Context, *RunPipelineRequest) (*PipelineTask, error)
// 更新Pipeline任务状态
UpdatePipelineTaskStatus(context.Context, *UpdatePipelineTaskStatusRequest) (*PipelineTask, error)
}
func GetService ¶
func GetService() Service
type UpdatePipelineTaskStatusRequest ¶
type UpdatePipelineTaskStatusRequest struct {
DescribePipelineTaskRequest
TaskId string `json:"task_id"`
}
func NewUpdatePipelineTaskStatusRequest ¶
func NewUpdatePipelineTaskStatusRequest(pipelineTaskId, taskId string) *UpdatePipelineTaskStatusRequest
Click to show internal directories.
Click to hide internal directories.