Documentation
¶
Index ¶
- Constants
- type DAG
- func (d *DAG) AddNode(nodes ...DAGNoder) error
- func (d *DAG) BuildEdges() error
- func (d *DAG) Debug(format string, v ...any)
- func (d *DAG) ExportGraph() *GraphData
- func (d *DAG) GetDAGStatus() DAGStatus
- func (d *DAG) GetDependencies(nodeName string) []string
- func (d *DAG) GetNextRunnableTasks(ctx *EvalContext) []DAGNoder
- func (d *DAG) GetNode(nodeName string) DAGNoder
- func (d *DAG) GetNodeStatus(nodeName string) (NodeStatus, error)
- func (d *DAG) GetNodesByStatus(status NodeStatus) []DAGNoder
- func (d *DAG) GetProgress() (completed int, total int, percentage float64)
- func (d *DAG) GetStatusSummary() map[NodeStatus]int
- func (d *DAG) HasCanceledNodes() bool
- func (d *DAG) HasCycles() bool
- func (d *DAG) HasFailedNodes() bool
- func (d *DAG) HasStarted() bool
- func (d *DAG) IsCanceled() bool
- func (d *DAG) IsCompleted() bool
- func (d *DAG) IsFailed() bool
- func (d *DAG) IsRunning() bool
- func (d *DAG) IsSuccess() bool
- func (d *DAG) Load(data []byte) error
- func (d *DAG) PrintGraph() string
- func (d *DAG) ResetAllStatus()
- func (d *DAG) Save() ([]byte, error)
- func (d *DAG) TopologicalSort() ([]DAGNoder, error)
- func (d *DAG) UpdateNodeStatus(nodeName string, status NodeStatus) error
- func (d *DAG) ValidateAllWhenConditions() []error
- func (d *DAG) WithEvaluator(evaluator *WhenEvaluator) *DAG
- func (d *DAG) WithLogger(log *zerolog.Logger) *DAG
- func (d *DAG) WithStrict(strict bool) *DAG
- type DAGNode
- type DAGNoder
- type DAGStatus
- type DepNodeInfo
- type EvalContext
- type GraphData
- type GraphNode
- type NodeStatus
- type WhenEvaluator
Constants ¶
const MaxExpressionLength = 1000
表达式长度限制(防止 DoS 攻击)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DAG ¶
type DAG struct {
Nodes map[string]DAGNoder `json:"-"` // 节点映射(接口类型,不直接序列化)
Edges map[string][]string `json:"edges"` // 邻接表:node -> 它依赖的节点列表
Strict bool `json:"strict"` // 是否启用严格依赖校验
// contains filtered or unexported fields
}
DAG 表示有向无环图
func (*DAG) BuildEdges ¶
BuildEdges 构建边关系,并检测循环依赖 返回 error 如果检测到循环依赖
func (*DAG) ExportGraph ¶
ExportGraph 导出完整的图数据供UI展示 返回包含所有节点信息(名称、状态、依赖关系)的结构化数据 UI可以根据dependencies字段绘制DAG关系图
func (*DAG) GetDAGStatus ¶
GetDAGStatus 获取 DAG 的整体状态 返回值说明: - DAGStatusNotStarted: 未开始(所有节点都是 pending) - DAGStatusRunning: 执行中(有节点正在运行) - DAGStatusSuccess: 成功完成(所有节点都成功) - DAGStatusFailed: 失败完成(所有节点完成且有失败) - DAGStatusCanceled: 取消完成(所有节点完成且有取消,无失败) - DAGStatusPartial: 部分完成(有待执行的节点)
注意:如果有节点失败但还有节点在运行,返回 DAGStatusRunning 而不是 DAGStatusFailed
func (*DAG) GetDependencies ¶
GetDependencies 获取节点的依赖列表
func (*DAG) GetNextRunnableTasks ¶
func (d *DAG) GetNextRunnableTasks(ctx *EvalContext) []DAGNoder
GetNextRunnableTasks 获取下一批可运行的任务 返回所有依赖都已成功完成(Success/Skipped)且当前状态为Pending的任务 ctx 参数为可选的评估上下文,用于 when 条件评估
func (*DAG) GetNodeStatus ¶
func (d *DAG) GetNodeStatus(nodeName string) (NodeStatus, error)
GetNodeStatus 获取节点状态
func (*DAG) GetNodesByStatus ¶
func (d *DAG) GetNodesByStatus(status NodeStatus) []DAGNoder
GetNodesByStatus 获取指定状态的所有节点
func (*DAG) GetProgress ¶
GetProgress 获取 DAG 执行进度 返回:已完成节点数、总节点数、完成百分比
func (*DAG) GetStatusSummary ¶
func (d *DAG) GetStatusSummary() map[NodeStatus]int
GetStatusSummary 获取状态统计摘要 返回每种状态的节点数量
func (*DAG) HasCanceledNodes ¶
HasCanceledNodes 判断是否有被取消的节点(中间状态判断) 只要有一个节点被取消就返回 true,不管其他节点是否完成
func (*DAG) HasFailedNodes ¶
HasFailedNodes 判断是否有失败的节点(中间状态判断) 只要有一个节点失败就返回 true,不管其他节点是否完成
func (*DAG) HasStarted ¶
HasStarted 判断 DAG 是否已开始执行 只要有节点不是 pending 状态就返回 true
func (*DAG) IsCanceled ¶
IsCanceled 判断整个 DAG 是否被取消完成(最终状态) 当所有节点都已完成且至少有一个节点被取消(canceled)且没有失败节点时返回 true 注意:这是最终状态,不是中间状态。失败优先于取消
func (*DAG) IsCompleted ¶
IsCompleted 判断整个 DAG 是否已完成 当所有节点都处于完成状态(success/failed/skipped/canceled)时返回 true
func (*DAG) IsFailed ¶
IsFailed 判断整个 DAG 是否失败完成(最终状态) 当所有节点都已完成且至少有一个节点失败(failed)时返回 true 注意:这是最终状态,不是中间状态。如果还有节点在运行,返回 false
func (*DAG) IsSuccess ¶
IsSuccess 判断整个 DAG 是否成功完成 当所有节点都成功完成(success/skipped)时返回 true 这意味着整个流程正常执行完成,没有失败或被取消的任务
func (*DAG) PrintGraph ¶
PrintGraph 在终端以树形结构打印 DAG 按拓扑顺序分层展示,方便查看整体结构和依赖关系
func (*DAG) TopologicalSort ¶
TopologicalSort 拓扑排序实现
func (*DAG) UpdateNodeStatus ¶
func (d *DAG) UpdateNodeStatus(nodeName string, status NodeStatus) error
UpdateNodeStatus 更新节点状态
func (*DAG) ValidateAllWhenConditions ¶
ValidateAllWhenConditions 验证所有节点的 when 条件表达式
在 DAG 构建时调用此方法可以提前发现表达式语法错误,避免运行时错误。
返回:
- []error: 所有验证错误的列表,如果无错误则返回 nil 或空切片
示例:
dag.AddNode(...)
if errs := dag.ValidateAllWhenConditions(); len(errs) > 0 {
for _, err := range errs {
log.Error(err)
}
return fmt.Errorf("invalid when conditions found")
}
func (*DAG) WithEvaluator ¶
func (d *DAG) WithEvaluator(evaluator *WhenEvaluator) *DAG
WithEvaluator 设置 when 评估器
type DAGNode ¶
type DAGNode struct {
Name string `json:"name"`
Dependencies []string `json:"dependencies"` // 依赖的节点名称列表
Status NodeStatus `json:"status"`
Data any `json:"data,omitempty"` // 业务数据(可选,用于存储额外信息)
// WhenCondition 执行条件表达式,用于动态控制节点是否执行
//
// 支持的格式:
// - 内置条件: "always"(始终执行), "never"(永不执行), "on_success"(依赖成功时执行), "on_failure"(依赖失败时执行)
// - 自定义表达式: "params.env == 'prod'"(基于参数), "deps.build.status == 'success'"(基于依赖状态)
// - 复杂表达式: "params.env in ['staging', 'prod'] && params.branch == 'main'"
//
// 默认值: 空字符串等同于 "always"
//
// 可用变量:
// - params: 用户提供的参数(map[string]interface{})
// - deps: 依赖节点信息(map[string]*DepNodeInfo),包含 status 字段
// - vars: 自定义变量(map[string]interface{})
//
// 错误处理: 表达式评估错误时默认执行节点(fail-safe 原则)
//
// 示例:
// WhenCondition: "always" // 始终执行
// WhenCondition: "params.env == 'prod'" // 生产环境执行
// WhenCondition: "on_failure" // 依赖失败时执行(用于清理)
// WhenCondition: "deps.build.status == 'success' && params.deploy" // 组合条件
//
// 注意:
// 1. 需要通过 DAG.WithEvaluator() 设置评估器才能启用 when 条件功能
// 2. 未设置评估器时,所有 when 条件被忽略(向后兼容)
// 3. 建议表达式长度 < 200 字符,避免过于复杂
// 4. 避免在 when 条件中使用敏感信息(如密码、密钥)
WhenCondition string `json:"when,omitempty"`
}
DAGNode DAG节点(带状态)
func (*DAGNode) DependsNodes ¶
DependsNodes 实现 DAGNoder 接口
type DAGNoder ¶
type DAGNoder interface {
// NodeName 获取节点名称
NodeName() string
// DependsNodes 获取节点依赖的任务名称列表
DependsNodes() []string
// GetStatus 获取节点状态
GetStatus() NodeStatus
// SetStatus 设置节点状态
SetStatus(NodeStatus)
// IsCompleted 判断节点是否已完成
IsCompleted() bool
// IsSuccess 判断节点是否成功完成
IsSuccess() bool
// When 获取节点的执行条件表达式
// 返回空字符串表示无条件限制(等同于 "always")
When() string
}
DAGNoder 节点接口(由业务层实现) 完整的节点抽象,包含名称、依赖关系和运行时状态
type DAGStatus ¶
type DAGStatus string
DAGStatus DAG整体状态
const ( DAGStatusNotStarted DAGStatus = "not_started" // 未开始(所有节点都是pending) DAGStatusRunning DAGStatus = "running" // 执行中(有节点在运行) DAGStatusSuccess DAGStatus = "success" // 成功完成(所有节点都成功) DAGStatusFailed DAGStatus = "failed" // 失败完成(有节点失败) DAGStatusCanceled DAGStatus = "canceled" // 取消完成(有节点取消,无失败) DAGStatusPartial DAGStatus = "partial" // 部分完成(有完成有未完成) )
type DepNodeInfo ¶
type DepNodeInfo struct {
Status NodeStatus `json:"status"` // 节点状态(success, failed, skipped 等)
Result map[string]any `json:"result"` // 节点结果(可选,由业务层填充)
}
DepNodeInfo 依赖节点信息
type EvalContext ¶
type EvalContext struct {
Params map[string]any `json:"params"` // 流水线参数(用户输入)
Deps map[string]*DepNodeInfo `json:"deps"` // 依赖节点信息(节点名 -> 节点状态)
Vars map[string]any `json:"vars"` // 全局变量(可选)
}
EvalContext 评估上下文
包含评估 when 表达式时可访问的所有变量。
type GraphData ¶
type GraphData struct {
Nodes []GraphNode `json:"nodes"`
}
GraphData 完整的图数据(用于UI展示)
type GraphNode ¶
type GraphNode struct {
Name string `json:"name"`
Status NodeStatus `json:"status"`
Dependencies []string `json:"dependencies"`
}
GraphNode UI展示用的节点数据
type NodeStatus ¶
type NodeStatus string
NodeStatus 节点状态
const ( NodeStatusPending NodeStatus = "pending" // 等待执行 NodeStatusRunning NodeStatus = "running" // 执行中 NodeStatusSuccess NodeStatus = "success" // 执行成功 NodeStatusFailed NodeStatus = "failed" // 执行失败 NodeStatusSkipped NodeStatus = "skipped" // 跳过执行(条件不满足或手动跳过) NodeStatusCanceled NodeStatus = "canceled" // 被取消 )
func (NodeStatus) IsCompleted ¶
func (s NodeStatus) IsCompleted() bool
IsCompleted 判断节点是否已完成(成功、失败、跳过、取消都算完成)
type WhenEvaluator ¶
type WhenEvaluator struct {
// contains filtered or unexported fields
}
WhenEvaluator when 条件评估器
用于评估 DAG 节点的 when 条件表达式,支持内置条件和自定义表达式。 使用 expr-lang/expr 库提供灵活的表达式评估能力。
特性:
- 内置条件: always, never, on_success, on_failure
- 自定义表达式: 支持逻辑运算、比较、成员运算等
- 表达式缓存: 编译结果缓存提升性能
- 线程安全: 可在并发环境中使用
使用示例:
evaluator := dag.NewWhenEvaluator()
dag.WithEvaluator(evaluator)
ctx := &dag.EvalContext{
Params: map[string]interface{}{"env": "prod"},
}
result, err := evaluator.Evaluate("params.env == 'prod'", ctx)
注意事项:
- 表达式必须返回布尔值,否则报错
- 编译错误和运行时错误会返回 error
- 推荐使用 ValidateExpression 预先验证表达式
func (*WhenEvaluator) Evaluate ¶
func (e *WhenEvaluator) Evaluate(when string, ctx *EvalContext) (bool, error)
Evaluate 评估 when 条件表达式
参数:
- when: 条件表达式字符串
- ctx: 评估上下文,包含 params、deps、vars
返回:
- bool: 条件是否满足
- error: 编译或评估错误
支持的表达式:
- 内置: "always", "never", "on_success", "on_failure"
- 比较: "params.count > 10", "params.env == 'prod'"
- 逻辑: "params.a && params.b", "params.a || params.b"
- 成员: "params.env in ['dev', 'staging']"
- 访问依赖: "deps.build.status == 'success'"
示例:
result, err := evaluator.Evaluate("params.env == 'prod'", ctx)
if err != nil {
// 处理错误
}
if result {
// 条件满足
}
func (*WhenEvaluator) ValidateExpression ¶
func (e *WhenEvaluator) ValidateExpression(when string) error
ValidateExpression 验证表达式语法