dag

package
v0.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const MaxExpressionLength = 1000

表达式长度限制(防止 DoS 攻击)

Variables

This section is empty.

Functions

This section is empty.

Types

type DAG

type DAG struct {
	Nodes map[string]DAGNoder `json:"-"`     // 节点映射(接口类型,不直接序列化)
	Edges map[string][]string `json:"edges"` // 邻接表:node -> 它依赖的节点列表

	Strict bool `json:"strict"` // 是否启用严格依赖校验
	// contains filtered or unexported fields
}

DAG 表示有向无环图

func LoadFromJSON

func LoadFromJSON(data []byte) (*DAG, error)

LoadFromJSON 静态方法:从JSON创建新的DAG实例

func NewDAG

func NewDAG() *DAG

NewDAG 创建新的DAG

func (*DAG) AddNode

func (d *DAG) AddNode(nodes ...DAGNoder) error

AddNode 添加节点

func (*DAG) BuildEdges

func (d *DAG) BuildEdges() error

BuildEdges 构建边关系,并检测循环依赖 返回 error 如果检测到循环依赖

func (*DAG) Debug

func (d *DAG) Debug(format string, v ...any)

func (*DAG) ExportGraph

func (d *DAG) ExportGraph() *GraphData

ExportGraph 导出完整的图数据供UI展示 返回包含所有节点信息(名称、状态、依赖关系)的结构化数据 UI可以根据dependencies字段绘制DAG关系图

func (*DAG) GetDAGStatus

func (d *DAG) GetDAGStatus() DAGStatus

GetDAGStatus 获取 DAG 的整体状态 返回值说明: - DAGStatusNotStarted: 未开始(所有节点都是 pending) - DAGStatusRunning: 执行中(有节点正在运行) - DAGStatusSuccess: 成功完成(所有节点都成功) - DAGStatusFailed: 失败完成(所有节点完成且有失败) - DAGStatusCanceled: 取消完成(所有节点完成且有取消,无失败) - DAGStatusPartial: 部分完成(有待执行的节点)

注意:如果有节点失败但还有节点在运行,返回 DAGStatusRunning 而不是 DAGStatusFailed

func (*DAG) GetDependencies

func (d *DAG) GetDependencies(nodeName string) []string

GetDependencies 获取节点的依赖列表

func (*DAG) GetNextRunnableTasks

func (d *DAG) GetNextRunnableTasks(ctx *EvalContext) []DAGNoder

GetNextRunnableTasks 获取下一批可运行的任务 返回所有依赖都已成功完成(Success/Skipped)且当前状态为Pending的任务 ctx 参数为可选的评估上下文,用于 when 条件评估

func (*DAG) GetNode

func (d *DAG) GetNode(nodeName string) DAGNoder

GetNode 获取节点

func (*DAG) GetNodeStatus

func (d *DAG) GetNodeStatus(nodeName string) (NodeStatus, error)

GetNodeStatus 获取节点状态

func (*DAG) GetNodesByStatus

func (d *DAG) GetNodesByStatus(status NodeStatus) []DAGNoder

GetNodesByStatus 获取指定状态的所有节点

func (*DAG) GetProgress

func (d *DAG) GetProgress() (completed int, total int, percentage float64)

GetProgress 获取 DAG 执行进度 返回:已完成节点数、总节点数、完成百分比

func (*DAG) GetStatusSummary

func (d *DAG) GetStatusSummary() map[NodeStatus]int

GetStatusSummary 获取状态统计摘要 返回每种状态的节点数量

func (*DAG) HasCanceledNodes

func (d *DAG) HasCanceledNodes() bool

HasCanceledNodes 判断是否有被取消的节点(中间状态判断) 只要有一个节点被取消就返回 true,不管其他节点是否完成

func (*DAG) HasCycles

func (d *DAG) HasCycles() bool

HasCycles 检查图中是否有环

func (*DAG) HasFailedNodes

func (d *DAG) HasFailedNodes() bool

HasFailedNodes 判断是否有失败的节点(中间状态判断) 只要有一个节点失败就返回 true,不管其他节点是否完成

func (*DAG) HasStarted

func (d *DAG) HasStarted() bool

HasStarted 判断 DAG 是否已开始执行 只要有节点不是 pending 状态就返回 true

func (*DAG) IsCanceled

func (d *DAG) IsCanceled() bool

IsCanceled 判断整个 DAG 是否被取消完成(最终状态) 当所有节点都已完成且至少有一个节点被取消(canceled)且没有失败节点时返回 true 注意:这是最终状态,不是中间状态。失败优先于取消

func (*DAG) IsCompleted

func (d *DAG) IsCompleted() bool

IsCompleted 判断整个 DAG 是否已完成 当所有节点都处于完成状态(success/failed/skipped/canceled)时返回 true

func (*DAG) IsFailed

func (d *DAG) IsFailed() bool

IsFailed 判断整个 DAG 是否失败完成(最终状态) 当所有节点都已完成且至少有一个节点失败(failed)时返回 true 注意:这是最终状态,不是中间状态。如果还有节点在运行,返回 false

func (*DAG) IsRunning

func (d *DAG) IsRunning() bool

IsRunning 判断 DAG 是否正在执行中 只要有一个节点处于 running 状态就返回 true

func (*DAG) IsSuccess

func (d *DAG) IsSuccess() bool

IsSuccess 判断整个 DAG 是否成功完成 当所有节点都成功完成(success/skipped)时返回 true 这意味着整个流程正常执行完成,没有失败或被取消的任务

func (*DAG) Load

func (d *DAG) Load(data []byte) error

Load 从JSON数据恢复DAG

func (*DAG) PrintGraph

func (d *DAG) PrintGraph() string

PrintGraph 在终端以树形结构打印 DAG 按拓扑顺序分层展示,方便查看整体结构和依赖关系

func (*DAG) ResetAllStatus

func (d *DAG) ResetAllStatus()

ResetAllStatus 重置所有节点状态为Pending

func (*DAG) Save

func (d *DAG) Save() ([]byte, error)

Save 序列化DAG为JSON(用于存储到数据库)

func (*DAG) TopologicalSort

func (d *DAG) TopologicalSort() ([]DAGNoder, error)

TopologicalSort 拓扑排序实现

func (*DAG) UpdateNodeStatus

func (d *DAG) UpdateNodeStatus(nodeName string, status NodeStatus) error

UpdateNodeStatus 更新节点状态

func (*DAG) ValidateAllWhenConditions

func (d *DAG) ValidateAllWhenConditions() []error

ValidateAllWhenConditions 验证所有节点的 when 条件表达式

在 DAG 构建时调用此方法可以提前发现表达式语法错误,避免运行时错误。

返回:

  • []error: 所有验证错误的列表,如果无错误则返回 nil 或空切片

示例:

dag.AddNode(...)
if errs := dag.ValidateAllWhenConditions(); len(errs) > 0 {
    for _, err := range errs {
        log.Error(err)
    }
    return fmt.Errorf("invalid when conditions found")
}

func (*DAG) WithEvaluator

func (d *DAG) WithEvaluator(evaluator *WhenEvaluator) *DAG

WithEvaluator 设置 when 评估器

func (*DAG) WithLogger

func (d *DAG) WithLogger(log *zerolog.Logger) *DAG

func (*DAG) WithStrict

func (d *DAG) WithStrict(strict bool) *DAG

WithStrict 设置是否启用严格依赖校验

type DAGNode

type DAGNode struct {
	Name         string     `json:"name"`
	Dependencies []string   `json:"dependencies"` // 依赖的节点名称列表
	Status       NodeStatus `json:"status"`
	Data         any        `json:"data,omitempty"` // 业务数据(可选,用于存储额外信息)

	// WhenCondition 执行条件表达式,用于动态控制节点是否执行
	//
	// 支持的格式:
	//   - 内置条件: "always"(始终执行), "never"(永不执行), "on_success"(依赖成功时执行), "on_failure"(依赖失败时执行)
	//   - 自定义表达式: "params.env == 'prod'"(基于参数), "deps.build.status == 'success'"(基于依赖状态)
	//   - 复杂表达式: "params.env in ['staging', 'prod'] && params.branch == 'main'"
	//
	// 默认值: 空字符串等同于 "always"
	//
	// 可用变量:
	//   - params: 用户提供的参数(map[string]interface{})
	//   - deps: 依赖节点信息(map[string]*DepNodeInfo),包含 status 字段
	//   - vars: 自定义变量(map[string]interface{})
	//
	// 错误处理: 表达式评估错误时默认执行节点(fail-safe 原则)
	//
	// 示例:
	//   WhenCondition: "always"                                    // 始终执行
	//   WhenCondition: "params.env == 'prod'"                      // 生产环境执行
	//   WhenCondition: "on_failure"                                // 依赖失败时执行(用于清理)
	//   WhenCondition: "deps.build.status == 'success' && params.deploy"  // 组合条件
	//
	// 注意:
	//   1. 需要通过 DAG.WithEvaluator() 设置评估器才能启用 when 条件功能
	//   2. 未设置评估器时,所有 when 条件被忽略(向后兼容)
	//   3. 建议表达式长度 < 200 字符,避免过于复杂
	//   4. 避免在 when 条件中使用敏感信息(如密码、密钥)
	WhenCondition string `json:"when,omitempty"`
}

DAGNode DAG节点(带状态)

func (*DAGNode) DependsNodes

func (n *DAGNode) DependsNodes() []string

DependsNodes 实现 DAGNoder 接口

func (*DAGNode) GetStatus

func (n *DAGNode) GetStatus() NodeStatus

GetStatus 实现 DAGNoder 接口

func (*DAGNode) IsCompleted

func (n *DAGNode) IsCompleted() bool

IsCompleted 实现 DAGNoder 接口

func (*DAGNode) IsSuccess

func (n *DAGNode) IsSuccess() bool

IsSuccess 实现 DAGNoder 接口

func (*DAGNode) NodeName

func (n *DAGNode) NodeName() string

NodeName 实现 DAGNoder 接口

func (*DAGNode) SetStatus

func (n *DAGNode) SetStatus(status NodeStatus)

SetStatus 实现 DAGNoder 接口

func (*DAGNode) When

func (n *DAGNode) When() string

When 实现 DAGNoder 接口

type DAGNoder

type DAGNoder interface {
	// NodeName 获取节点名称
	NodeName() string
	// DependsNodes 获取节点依赖的任务名称列表
	DependsNodes() []string
	// GetStatus 获取节点状态
	GetStatus() NodeStatus
	// SetStatus 设置节点状态
	SetStatus(NodeStatus)
	// IsCompleted 判断节点是否已完成
	IsCompleted() bool
	// IsSuccess 判断节点是否成功完成
	IsSuccess() bool
	// When 获取节点的执行条件表达式
	// 返回空字符串表示无条件限制(等同于 "always")
	When() string
}

DAGNoder 节点接口(由业务层实现) 完整的节点抽象,包含名称、依赖关系和运行时状态

type DAGStatus

type DAGStatus string

DAGStatus DAG整体状态

const (
	DAGStatusNotStarted DAGStatus = "not_started" // 未开始(所有节点都是pending)
	DAGStatusRunning    DAGStatus = "running"     // 执行中(有节点在运行)
	DAGStatusSuccess    DAGStatus = "success"     // 成功完成(所有节点都成功)
	DAGStatusFailed     DAGStatus = "failed"      // 失败完成(有节点失败)
	DAGStatusCanceled   DAGStatus = "canceled"    // 取消完成(有节点取消,无失败)
	DAGStatusPartial    DAGStatus = "partial"     // 部分完成(有完成有未完成)
)

type DepNodeInfo

type DepNodeInfo struct {
	Status NodeStatus     `json:"status"` // 节点状态(success, failed, skipped 等)
	Result map[string]any `json:"result"` // 节点结果(可选,由业务层填充)
}

DepNodeInfo 依赖节点信息

type EvalContext

type EvalContext struct {
	Params map[string]any          `json:"params"` // 流水线参数(用户输入)
	Deps   map[string]*DepNodeInfo `json:"deps"`   // 依赖节点信息(节点名 -> 节点状态)
	Vars   map[string]any          `json:"vars"`   // 全局变量(可选)
}

EvalContext 评估上下文

包含评估 when 表达式时可访问的所有变量。

type GraphData

type GraphData struct {
	Nodes []GraphNode `json:"nodes"`
}

GraphData 完整的图数据(用于UI展示)

type GraphNode

type GraphNode struct {
	Name         string     `json:"name"`
	Status       NodeStatus `json:"status"`
	Dependencies []string   `json:"dependencies"`
}

GraphNode UI展示用的节点数据

type NodeStatus

type NodeStatus string

NodeStatus 节点状态

const (
	NodeStatusPending  NodeStatus = "pending"  // 等待执行
	NodeStatusRunning  NodeStatus = "running"  // 执行中
	NodeStatusSuccess  NodeStatus = "success"  // 执行成功
	NodeStatusFailed   NodeStatus = "failed"   // 执行失败
	NodeStatusSkipped  NodeStatus = "skipped"  // 跳过执行(条件不满足或手动跳过)
	NodeStatusCanceled NodeStatus = "canceled" // 被取消
)

func (NodeStatus) IsCompleted

func (s NodeStatus) IsCompleted() bool

IsCompleted 判断节点是否已完成(成功、失败、跳过、取消都算完成)

func (NodeStatus) IsFinal

func (s NodeStatus) IsFinal() bool

IsFinal 判断是否为最终状态(不会再变化)

func (NodeStatus) IsSuccess

func (s NodeStatus) IsSuccess() bool

IsSuccess 判断节点是否成功完成

type WhenEvaluator

type WhenEvaluator struct {
	// contains filtered or unexported fields
}

WhenEvaluator when 条件评估器

用于评估 DAG 节点的 when 条件表达式,支持内置条件和自定义表达式。 使用 expr-lang/expr 库提供灵活的表达式评估能力。

特性:

  • 内置条件: always, never, on_success, on_failure
  • 自定义表达式: 支持逻辑运算、比较、成员运算等
  • 表达式缓存: 编译结果缓存提升性能
  • 线程安全: 可在并发环境中使用

使用示例:

evaluator := dag.NewWhenEvaluator()
dag.WithEvaluator(evaluator)

ctx := &dag.EvalContext{
    Params: map[string]interface{}{"env": "prod"},
}
result, err := evaluator.Evaluate("params.env == 'prod'", ctx)

注意事项:

  • 表达式必须返回布尔值,否则报错
  • 编译错误和运行时错误会返回 error
  • 推荐使用 ValidateExpression 预先验证表达式

func NewWhenEvaluator

func NewWhenEvaluator() *WhenEvaluator

NewWhenEvaluator 创建新的 when 评估器

func (*WhenEvaluator) Evaluate

func (e *WhenEvaluator) Evaluate(when string, ctx *EvalContext) (bool, error)

Evaluate 评估 when 条件表达式

参数:

  • when: 条件表达式字符串
  • ctx: 评估上下文,包含 params、deps、vars

返回:

  • bool: 条件是否满足
  • error: 编译或评估错误

支持的表达式:

  • 内置: "always", "never", "on_success", "on_failure"
  • 比较: "params.count > 10", "params.env == 'prod'"
  • 逻辑: "params.a && params.b", "params.a || params.b"
  • 成员: "params.env in ['dev', 'staging']"
  • 访问依赖: "deps.build.status == 'success'"

示例:

result, err := evaluator.Evaluate("params.env == 'prod'", ctx)
if err != nil {
    // 处理错误
}
if result {
    // 条件满足
}

func (*WhenEvaluator) ValidateExpression

func (e *WhenEvaluator) ValidateExpression(when string) error

ValidateExpression 验证表达式语法

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL