pipeline

package
v0.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: MIT Imports: 13 Imported by: 0

README

Runtime 模块说明

📋 模块概述

Runtime 模块是 MFlow 流水线的运行时引擎,负责流水线的执行调度、状态管理和任务编排。它基于 DAG(有向无环图)实现了灵活的任务依赖管理和并行执行能力。

🎯 核心功能

1. 流水线执行管理
  • 触发执行:根据 Pipeline 定义创建运行实例并启动执行
  • 异步调度:后台异步执行流水线任务,不阻塞 API 响应
  • 状态跟踪:实时记录流水线和任务的执行状态
2. 任务编排调度
  • DAG 解析:自动解析任务依赖关系,构建执行拓扑
  • 动态调度:使用 GetNextRunnableTasks 动态获取下一批可运行任务
  • 顺序保证:确保依赖任务先于被依赖任务执行
  • 状态感知:基于任务实时状态智能调度,支持并行和串行混合执行
3. 并发控制
  • 分布式锁:使用分布式锁防止流水线任务重复调度
  • 幂等性保护:多次调用不会重复执行同一流水线实例
  • 并发安全:支持跨进程/实例的并发执行
4. 参数传递
  • 上游输出注入:自动将上游任务的输出作为下游任务的输入
  • 运行时参数:支持流水线级别的运行参数传递
  • 变量解析:支持 ${task_name.output_key}${var_name} 两种变量格式

🏗️ 架构设计

┌─────────────────────────────────────────────────────────┐
│                    Runtime Service                       │
├─────────────────────────────────────────────────────────┤
│  - RunPipeline          触发流水线执行                   │
│  - QueryPipelineTask    查询执行记录                     │
│  - DescribePipelineTask 查询执行详情                     │
│  - UpdatePipelineTaskStatus 更新状态并继续调度           │
└──────────────┬──────────────────────────────────────────┘
               │
       ┌───────┴────────┐
       │                │
   ┌───▼────┐      ┌────▼────┐
   │  DAG   │      │  Lock   │
   │ Engine │      │ Manager │
   └───┬────┘      └────┬────┘
       │                │
   ┌───▼────────────────▼─────┐
   │   Task Scheduler         │
   │  - Stage 分组             │
   │  - 并行调度               │
   │  - 失败处理               │
   └──────────────────────────┘

📊 核心数据结构

PipelineTask(流水线运行实例)
type PipelineTask struct {
    Id         string                // 运行实例 ID
    CreateAt   time.Time            // 创建时间
    PipelineId string               // 所属流水线 ID
    Status     PIPELINE_TASK_STATUS // 运行状态
    StartAt    *time.Time           // 开始时间
    EndAt      *time.Time           // 结束时间
    Message    string               // 状态消息
    Tasks      []*task.Task         // 包含的所有任务
    RunParams  []*job.RunParam      // 运行时参数
}
PipelineStatus(执行状态)
状态 说明
PENDDING 等待执行
RUNNING 执行中
SUCCESS 执行成功
FAILED 执行失败

🔄 执行流程

1. 启动流水线 (RunPipeline)
用户触发
    ↓
查询 Pipeline 定义
    ↓
创建 PipelineTask 实例
    ↓
创建所有子任务 (Pendding)
    ↓
初始化 DAG Stage
    ↓
保存到数据库 (Running)
    ↓
启动后台 goroutine
    ↓
立即返回实例信息

关键设计点:

  • 使用独立的 background context,避免请求结束时任务被取消
  • 异步执行,快速响应用户请求
  • 继承原 context 的日志信息,保持链路追踪
2. 任务调度循环 (runPipelineTask)
┌─> 获取分布式锁
│       ↓
│   检查流水线状态(已结束则退出)
│       ↓
│   获取下一批可运行任务
│       ↓
│   ┌─ 有任务 ──> 注入参数 -> 触发执行
│   │
│   └─ 无任务 ──┬─ Stage运行中 -> 等待
│               │
│               └─ 所有完成 ──┬─ 有失败 -> Failed
│                             │
│                             └─ 全成功 -> Success
└─────────────┘
    ↓
释放锁
    ↓
持久化状态

关键机制:

  • 分布式锁:30秒超时,防止重复调度
  • 阶段等待:当前 Stage 未完成时不调度下一 Stage
  • 失败处理:可配置是否忽略任务失败继续执行
3. DAG Stage 划分

示例:

tasks:
  - name: A              # Stage 0
  - name: B              # Stage 0 (无依赖,与 A 并行)
  - name: C              # Stage 1 (依赖 A)
    depends: [A]
  - name: D              # Stage 1 (依赖 B,与 C 并行)
    depends: [B]
  - name: E              # Stage 2 (依赖 C 和 D)
    depends: [C, D]

执行顺序:

Stage 0: [A, B]     <- 并行执行
    ↓
Stage 1: [C, D]     <- 等待 Stage 0 完成后并行执行
    ↓
Stage 2: [E]        <- 等待 Stage 1 完成后执行

🔧 核心方法说明

RunPipeline

功能:触发流水线执行
特点

  • 创建流水线运行实例和所有子任务
  • 后台异步执行,立即返回
  • 自动进行 DAG 环检测
UpdatePipelineTaskStatus

功能:更新流水线状态并继续调度
调用时机

  • 子任务状态变更时触发
  • 外部手动触发继续执行

特点

  • 检查流水线是否已结束,避免重复调度
  • 异步执行,快速响应
  • 依赖分布式锁保证幂等性
runPipelineTask (内部方法)

功能:核心调度逻辑
职责

  • 获取下一批可执行任务
  • 注入运行参数
  • 触发任务执行
  • 修改流水线状态(内存)

设计原则

  • 只修改内存状态,不直接持久化
  • 由外部调用者统一持久化,保证一致性
  • 使用分布式锁保护整个调度过程

🔒 并发安全机制

1. 分布式锁
lock := lock.L().New(pipelineTask.Id, 30*time.Second)
err := lock.TryLock(ctx)
defer lock.UnLock(ctx)

作用:

  • 防止同一个流水线实例被多个调度器同时执行
  • 支持跨进程/实例部署
  • 30秒超时自动释放,防止死锁
2. 幂等性检查
// 已结束的流水线不再调度
if pipelineTask.Status == FAILED || pipelineTask.Status == SUCCESS {
    return
}
3. Context 管理
// 使用独立的 background context
bgCtx, cancel := context.WithTimeout(context.Background(), 24*time.Hour)
defer cancel()

// 继承日志信息
bgCtx = log.InjectCtx(bgCtx, log.FromCtx(ctx).Logger)

📝 参数解析规则

1. 任务输出引用
格式:${task_name.output_key}
示例:${build.image_tag}
说明:引用名为 build 的任务的 image_tag 输出
2. 流水线参数引用
格式:${var_name}
示例:${branch}
说明:引用流水线运行时传入的 branch 参数
3. 解析优先级
  1. 优先匹配任务输出 (task_name.output_key)
  2. 其次匹配流水线参数 (var_name)
  3. 未匹配返回空字符串

🎨 使用示例

创建并运行流水线
// 构造运行请求
req := &runtime.RunPipelineRequest{
    PipelineId: "pipeline-123",
    RunBy:      "admin",
    RunParams: []*job.RunParam{
        {Name: "branch", Value: "main"},
        {Name: "env", Value: "prod"},
    },
}

// 触发执行
task, err := runtime.GetService().RunPipeline(ctx, req)
if err != nil {
    return err
}

// 立即返回运行实例
fmt.Println("Pipeline Task ID:", task.Id)
fmt.Println("Status:", task.Status) // "running"
查询执行状态
// 查询详情
req := runtime.NewDescribePipelineTaskRequest("task-id")
task, err := runtime.GetService().DescribePipelineTask(ctx, req)

// 查看整体状态
fmt.Println("Status:", task.Status)
fmt.Println("Message:", task.Message)

// 查看各个任务状态
for _, t := range task.Tasks {
    fmt.Printf("Task %s: %s\n", t.Name, t.Status)
}
触发状态更新
// 当子任务完成后,触发流水线继续调度
req := runtime.NewUpdatePipelineTaskStatusRequest(
    "pipeline-task-id",
    "task-id",
)
task, err := runtime.GetService().UpdatePipelineTaskStatus(ctx, req)

⚠️ 注意事项

  1. Context 生命周期:后台 goroutine 使用独立的 context,避免请求取消导致任务中断
  2. 锁超时设置:分布式锁超时时间应大于单个调度周期,建议 30 秒以上
  3. DAG 环检测:在 InitDAG 时会自动检测循环依赖,如有环则返回错误
  4. 失败处理:任务可配置 ignore_error 忽略失败继续执行
  5. 状态持久化:调度逻辑只修改内存状态,由外部统一持久化保证一致性
  6. 动态调度:每次调用 NextJobRunTasks 时会基于最新状态动态计算可运行任务

🚀 性能优化

依赖满足的任务自动并行执行,最大化资源利用 2. 异步设计:API 调用立即返回,不阻塞用户请求 3. 增量调度:基于 DAG 状态只调度下一批可执行任务,无需全量扫描 4. 锁粒度:锁粒度为单个流水线实例,不同实例互不影响 5. 智能调度GetNextRunnableTasks 方法自动识别可并行执行的任务 4. 锁粒度:锁粒度为单个流水线实例,不同实例互不影响

📚 相关模块

  • Pipeline:流水线定义和模板管理
  • Task:任务执行和状态管理
  • Job:任务类型和执行器实现
  • Trigger:流水线触发器和事件管理

Documentation

Index

Constants

View Source
const (
	JOB_RUN_TASK_LABEL_KEY      = "job_run_task_id"
	PIPELINE_RUN_TASK_LABEL_KEY = "pipeline_run_task_id"
)
View Source
const (
	INIT_CONTAINER_PARAM_LABEL_KEY = "init_container_index"
	CONTAINER_PARAM_LABEL_KEY      = "container_index"
)
View Source
const (
	APP_NAME = "pipeline"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DescribePipelineTaskRequest

type DescribePipelineTaskRequest struct {
	PipelineTaskId string `json:"id"`
}

func NewDescribePipelineTaskRequest

func NewDescribePipelineTaskRequest(pipelineTaskId string) *DescribePipelineTaskRequest

type PIPELINE_TASK_STATUS

type PIPELINE_TASK_STATUS string
const (
	PIPELINE_RUN_TASK_STATUS_PENDDING PIPELINE_TASK_STATUS = ""
	PIPELINE_RUN_TASK_STATUS_RUNNING  PIPELINE_TASK_STATUS = "running"
	PIPELINE_RUN_TASK_STATUS_FAILED   PIPELINE_TASK_STATUS = "failed"
	PIPELINE_RUN_TASK_STATUS_SUCCESS  PIPELINE_TASK_STATUS = "success"
)

type PipelineSpec

type PipelineSpec struct {
	// pipeline id
	PipelineId string `json:"pipeline_id" validate:"required" gorm:"column:pipeline_id"`
	// 试运行
	DryRun bool `json:"dry_run" gorm:"column:dry_run"`
	// 触发方式, 默认手工触发
	TriggerMode trigger.MODE `json:"trigger_mode" gorm:"column:trigger_mode"`
	// 执行人
	RunBy string `json:"run_by"  gorm:"column:run_by" validate:"required"`
	// Pipeline 的运行时参数
	RunParams []*job.RunParam `json:"run_params" gorm:"column:run_params;serializer:json;type:json"`
	// 任务标签
	Label map[string]string `json:"label" bson:"label" gorm:"column:label;serializer:json;type:json" description:"任务标签" optional:"true"`
	// 额外的其他属性
	Extras map[string]string `json:"extras" form:"extras" bson:"extras" gorm:"column:extras;type:json;serializer:json;"`
}

type PipelineStatus

type PipelineStatus struct {
	// 状态
	Status PIPELINE_TASK_STATUS `json:"status" gorm:"column:status"`
	// 任务开始时间
	StartAt *time.Time `json:"start_at" gorm:"column:start_at"`
	// 更新时间
	UpdateAt *time.Time `json:"update_at" gorm:"column:update_at"`
	// 更新人
	UpdateBy string `json:"update_by" gorm:"column:update_by;type:varchar(60)"`
	// 任务结束时间
	EndAt *time.Time `json:"end_at" gorm:"column:end_at"`
	// 状态描述
	Message string `json:"message" gorm:"column:message"`
	// 运行的任务信息, 单独交给 Task 服务管理
	Tasks []*task.Task `json:"tasks" gorm:"-"`
	// 如果有下一个需要运行的Pipeline Task 的id
	NextPipelineTaskId string `json:"next_pipeline_task_id" gorm:"column:next_pipeline_task_id"`
	// 运行下一个流水线失败
	RunNextPipelineError string `json:"run_next_pipeline_error"  gorm:"column:run_next_pipeline_error"`
	// contains filtered or unexported fields
}

调度下一个需要运行的任务

func NewPipelineRunStatus

func NewPipelineRunStatus() *PipelineStatus

func (*PipelineStatus) ExportDAGGraph

func (r *PipelineStatus) ExportDAGGraph() (*dag.GraphData, error)

ExportDAGGraph 导出 DAG 图数据供 UI 展示 返回包含所有节点信息(名称、状态、依赖关系)的结构化数据

func (*PipelineStatus) Failed

func (r *PipelineStatus) Failed(msg string)

func (*PipelineStatus) GetDAG

func (r *PipelineStatus) GetDAG() (*dag.DAG, error)

GetDAG 获取 DAG 实例,如果未初始化则自动初始化

func (*PipelineStatus) GetDAGStatus

func (r *PipelineStatus) GetDAGStatus() (dag.DAGStatus, error)

GetDAGStatus 获取 DAG 的整体状态

func (*PipelineStatus) GetProgress

func (r *PipelineStatus) GetProgress() (completed int, total int, percentage float64)

GetProgress 获取执行进度

func (*PipelineStatus) GetStatusSummary

func (r *PipelineStatus) GetStatusSummary() (map[dag.NodeStatus]int, error)

GetStatusSummary 获取状态统计摘要

func (*PipelineStatus) InitDAG

func (r *PipelineStatus) InitDAG() error

InitDAG 初始化 DAG 实例 设计说明:从 Tasks 动态构建 DAG,不使用持久化 - 构建成本:O(N*D),N=任务数,D=平均依赖数,通常 <5ms - 优点:Tasks 是唯一数据源,无数据一致性问题 - 适用场景:Pipeline 不可变,任务数 <100,加载频率低

func (*PipelineStatus) IsCompleted

func (r *PipelineStatus) IsCompleted() bool

IsCompleted 判断所有任务是否已完成

func (*PipelineStatus) IsSuccess

func (r *PipelineStatus) IsSuccess() bool

IsSuccess 判断所有任务是否成功完成

func (*PipelineStatus) NextJobRunTasks

func (r *PipelineStatus) NextJobRunTasks() ([]*task.Task, error)

NextJobRunTasks 获取下一批可运行的任务 使用 DAG 的 GetNextRunnableTasks 方法动态计算

func (*PipelineStatus) PrintDAGGraph

func (r *PipelineStatus) PrintDAGGraph() (string, error)

PrintDAGGraph 在终端以树形结构打印 DAG 按拓扑顺序分层展示,方便查看整体结构和依赖关系

func (*PipelineStatus) Running

func (r *PipelineStatus) Running()

func (*PipelineStatus) SetEndAt

func (r *PipelineStatus) SetEndAt(t time.Time)

func (*PipelineStatus) SetStartAt

func (r *PipelineStatus) SetStartAt(t time.Time)

func (*PipelineStatus) Success

func (r *PipelineStatus) Success(msg string)

func (*PipelineStatus) TableName

func (r *PipelineStatus) TableName() string

func (*PipelineStatus) UnsafeGetDAG

func (r *PipelineStatus) UnsafeGetDAG() *dag.DAG

UnsafeGetDAG 获取 DAG 实例(不初始化,可能返回 nil) 仅在确定 DAG 已初始化的场景使用

type PipelineTask

type PipelineTask struct {
	// 对象Id
	Id string `json:"id" gorm:"column:id;primaryKey"`
	// 创建时间
	CreateAt time.Time `json:"create_at" gorm:"column:create_at"`
	// 运行请求
	RunPipelineRequest
}

func NewPipelineRunTask

func NewPipelineRunTask(req *RunPipelineRequest) *PipelineTask

func (*PipelineTask) GetRunParamValue

func (r *PipelineTask) GetRunParamValue(varName string) string

解析参数值变量, 比如 1. 其他task的输入中提取值 ${task_name.output_name} 2. pipeline RunParams中提取值 运行参数 ${var_name}

func (*PipelineTask) InjectInputParamValue

func (r *PipelineTask) InjectInputParamValue(t *task.Task)

func (*PipelineTask) String

func (r *PipelineTask) String() string

func (*PipelineTask) TableName

func (r *PipelineTask) TableName() string

type QueryPipelineRunTask

type QueryPipelineRunTask struct {
	request.PageRequest
}

func NewQueryPipelineRunTask

func NewQueryPipelineRunTask() *QueryPipelineRunTask

type RunPipelineRequest

type RunPipelineRequest struct {
	// Domain
	Domain string `json:"domain" gorm:"column:domain"`
	// Namespace
	Namespace string `json:"namespace" gorm:"column:namespace"`
	// Pipeline运行参数
	PipelineSpec
	// 运行后的状态
	PipelineStatus
}

func NewRunPipelineRequest

func NewRunPipelineRequest() *RunPipelineRequest

type Service

type Service interface {
	// Pipeline执行记录
	QueryPipelineTask(context.Context, *QueryPipelineRunTask) (*types.Set[*PipelineTask], error)
	// 查询详情
	DescribePipelineTask(context.Context, *DescribePipelineTaskRequest) (*PipelineTask, error)

	// 运行Pipeline
	RunPipeline(context.Context, *RunPipelineRequest) (*PipelineTask, error)
	// 更新Pipeline任务状态
	UpdatePipelineTaskStatus(context.Context, *UpdatePipelineTaskStatusRequest) (*PipelineTask, error)
}

func GetService

func GetService() Service

type UpdatePipelineTaskStatusRequest

type UpdatePipelineTaskStatusRequest struct {
	DescribePipelineTaskRequest
	TaskId string `json:"task_id"`
}

func NewUpdatePipelineTaskStatusRequest

func NewUpdatePipelineTaskStatusRequest(pipelineTaskId, taskId string) *UpdatePipelineTaskStatusRequest

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL