Documentation
¶
Index ¶
- type ApprovalConfig
- type ApprovalRunner
- type BaseRunner
- func (b *BaseRunner) GetResult() *core.Result
- func (b *BaseRunner) GetStatus() core.Status
- func (b *BaseRunner) GetWorkingDirectory() (string, error)
- func (b *BaseRunner) InitBase()
- func (b *BaseRunner) Lock()
- func (b *BaseRunner) RLock()
- func (b *BaseRunner) RUnlock()
- func (b *BaseRunner) SendLog(logChan chan<- string, message string)
- func (b *BaseRunner) SetApiserver(apiserver core.Apiserver)
- func (b *BaseRunner) Unlock()
- type CommandRunner
- func (r *CommandRunner) Cleanup() error
- func (r *CommandRunner) Execute(ctx context.Context, logChan chan<- string) (*core.Result, error)
- func (r *CommandRunner) GetResult() *core.Result
- func (r *CommandRunner) GetStatus() core.Status
- func (r *CommandRunner) Kill() error
- func (r *CommandRunner) ParseArgs(task *core.Task) error
- func (r *CommandRunner) Stop() error
- type CommandSecurity
- type ContainerConfig
- type ContainerRunner
- type DatabaseConfig
- type DatabaseRunner
- type EmailSender
- type FeishuBotSender
- type FileConfig
- type FileRunner
- func (r *FileRunner) Cleanup() error
- func (r *FileRunner) Execute(ctx context.Context, logChan chan<- string) (*core.Result, error)
- func (r *FileRunner) Kill() error
- func (r *FileRunner) ParseArgs(task *core.Task) error
- func (r *FileRunner) SetTask(task *core.Task) error
- func (r *FileRunner) Stop() error
- type GitConfig
- type GitResult
- type GitRunner
- type HTTPAuth
- type HTTPConfig
- type HTTPResponse
- type HTTPRunner
- type MessageConfig
- type MessageRunner
- func (r *MessageRunner) Cleanup() error
- func (r *MessageRunner) Execute(ctx context.Context, logChan chan<- string) (*core.Result, error)
- func (r *MessageRunner) GetStatus() core.Status
- func (r *MessageRunner) Kill() error
- func (r *MessageRunner) ParseArgs(task *core.Task) error
- func (r *MessageRunner) SetApiserver(apiserver core.Apiserver)
- func (r *MessageRunner) Stop() error
- type MessageSender
- type ScriptConfig
- type ScriptRunner
- func (r *ScriptRunner) Cleanup() error
- func (r *ScriptRunner) Execute(ctx context.Context, logChan chan<- string) (*core.Result, error)
- func (r *ScriptRunner) GetResult() *core.Result
- func (r *ScriptRunner) GetStatus() core.Status
- func (r *ScriptRunner) Kill() error
- func (r *ScriptRunner) ParseArgs(task *core.Task) error
- func (r *ScriptRunner) Stop() error
- type SecurityConfig
- type WechatWorkBotSender
- type WechatWorkSender
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ApprovalConfig ¶
type ApprovalConfig struct {
Title string `json:"title"` // 审批标题(必填)
Content string `json:"content"` // 审批内容(支持Markdown)
Context string `json:"context"` // 审批上下文(JSON格式,包含相关数据)
UserIDs []string `json:"user_ids"` // 审批人用户ID列表
AIAgentIDs []string `json:"ai_agent_ids"` // AI审批实体ID列表
RequireAll bool `json:"require_all"` // 是否需要所有人都审批(默认false,任意一人即可)
Timeout int `json:"timeout"` // 审批超时时间(秒,可选,默认使用Task.Timeout,若Task.Timeout也为空则为3600)
Metadata string `json:"metadata"` // 扩展元数据(JSON格式)
}
ApprovalConfig 审批配置
type ApprovalRunner ¶
type ApprovalRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
ApprovalRunner 审批 Runner
用于在Workflow中创建审批步骤,支持: - 人工审批(指定user_ids) - AI自动审批(指定ai_agent_ids) - 混合审批(同时指定人工和AI) - 超时处理
工作流程: 1. 解析任务参数 2. 调用APIServer创建Approval对象 3. 将审批ID写入Task.Output 4. 设置Task状态为Running 5. 快速返回(非阻塞) 6. 等待审批人或AI通过API操作审批 7. 审批完成后,通过API更新Task状态
func NewApprovalRunner ¶
func NewApprovalRunner() *ApprovalRunner
NewApprovalRunner 创建新的 ApprovalRunner
func (*ApprovalRunner) Execute ¶
Execute 执行审批任务
核心逻辑: 1. 调用APIServer创建Approval对象 2. 获取审批ID 3. 构造输出(包含审批ID) 4. 快速返回Success
注意:此方法不会阻塞等待审批结果! 审批完成后,需要通过Approval API更新Task状态,触发Workflow继续执行。
type BaseRunner ¶
type BaseRunner struct {
// 🔥 公共字段 - 子类可直接访问
Task *core.Task // 任务对象
Status core.Status // 当前状态
Result *core.Result // 执行结果
Apiserver core.Apiserver // API Server 客户端
Ctx context.Context // 执行上下文
Cancel context.CancelFunc // 取消函数
StartTime time.Time // 开始时间
// contains filtered or unexported fields
}
BaseRunner 基础 Runner 结构
设计原则:
- 公共字段:大写导出,子类可以直接访问(符合 Go 嵌入的惯用法)
- 私有字段:小写私有,只有 mutex 保持私有以保证并发安全
- 辅助方法:提供复杂操作的辅助方法(如 GetWorkingDirectory)
提供所有 Runner 的公共功能:
- 任务对象管理
- 状态管理
- 结果管理
- 并发安全(读写锁)
- API Server 客户端注入
- Context 管理(取消和超时)
- 执行时间追踪
- 工作目录获取
- 日志发送
func (*BaseRunner) GetResult ¶
func (b *BaseRunner) GetResult() *core.Result
GetResult 获取执行结果(保留此方法以符合接口)
func (*BaseRunner) GetStatus ¶
func (b *BaseRunner) GetStatus() core.Status
GetStatus 获取当前状态(保留此方法以符合接口)
func (*BaseRunner) GetWorkingDirectory ¶
func (b *BaseRunner) GetWorkingDirectory() (string, error)
GetWorkingDirectory 获取任务工作目录
自动处理:
- 从 task.Metadata 中读取自定义工作目录
- 如果没有配置,使用默认目录
- 自动去除路径两边的空格
- 自动创建目录(如果不存在)
- 验证路径是否为目录
返回:
- string: 工作目录路径
- error: 如果创建目录失败或路径不是目录
func (*BaseRunner) InitBase ¶
func (b *BaseRunner) InitBase()
InitBase 初始化 BaseRunner
子类 Runner 应在构造函数中调用此方法
func (*BaseRunner) SendLog ¶
func (b *BaseRunner) SendLog(logChan chan<- string, message string)
SendLog 发送日志到 channel(非阻塞)
使用 select 实现非阻塞发送,避免 channel 满时阻塞
func (*BaseRunner) SetApiserver ¶
func (b *BaseRunner) SetApiserver(apiserver core.Apiserver)
SetApiserver 注入 API Server 客户端(保留此方法以符合接口)
type CommandRunner ¶
type CommandRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
CommandRunner 命令执行器
用于执行系统命令和脚本的Runner实现 使用 bash -c 执行完整的命令字符串,支持复杂的shell命令 包括管道、重定向、逻辑操作符等
func (*CommandRunner) GetStatus ¶
func (r *CommandRunner) GetStatus() core.Status
GetStatus 获取当前执行状态
type CommandSecurity ¶
type CommandSecurity struct {
// contains filtered or unexported fields
}
CommandSecurity 命令安全检查器
func (*CommandSecurity) ValidateCommand ¶
func (cs *CommandSecurity) ValidateCommand(command string, args []string) error
ValidateCommand 验证命令是否安全
type ContainerConfig ¶
type ContainerConfig struct {
Action string `json:"action"` // build/run/push/pull/stop/restart/remove/logs/exec/inspect/prune/stats
// ========== 运行时配置 ==========
Runtime string `json:"runtime"` // docker/containerd (默认 docker)
Connection string `json:"connection"` // unix/tcp (默认 unix)
// Unix Socket 连接(本地)
Socket string `json:"socket"` // /var/run/docker.sock 或 /run/containerd/containerd.sock
// TCP 连接(远程,仅 Docker)
Host string `json:"host"` // docker-proxy:2376
TLS bool `json:"tls"` // 是否启用 TLS
TLSCred string `json:"tls_cred"` // TLS 凭证 ID(key_value 类型)
// containerd 特定
Namespace string `json:"namespace"` // containerd namespace(默认 default)
BuildkitAddr string `json:"buildkit_addr"` // buildkit 地址(镜像构建)
// ========== build 字段 ==========
Dockerfile string `json:"dockerfile"` // Dockerfile 路径
Context string `json:"context"` // 构建上下文
Tags []string `json:"tags"` // 镜像标签
BuildArgs map[string]string `json:"build_args"` // 构建参数
NoCache bool `json:"no_cache"` // 不使用缓存
Pull bool `json:"pull"` // 拉取最新基础镜像
// ========== run 字段 ==========
Image string `json:"image"` // 镜像名称
Name string `json:"name"` // 容器名称
Ports []string `json:"ports"` // 端口映射 ["80:80", "443:443"]
Volumes []string `json:"volumes"` // 卷挂载 ["/host:/container"]
Env []string `json:"env"` // 环境变量 ["KEY=VALUE"]
Network string `json:"network"` // 网络
Restart string `json:"restart"` // 重启策略
Detach bool `json:"detach"` // 后台运行
Remove bool `json:"remove"` // 退出后删除
Command []string `json:"command"` // 覆盖 CMD
Entrypoint []string `json:"entrypoint"` // 覆盖 ENTRYPOINT
// ========== push/pull 字段 ==========
Registry string `json:"registry"` // 镜像仓库
RegistryCred string `json:"registry_cred"` // Registry 凭证 ID(username_password)
TagLatest bool `json:"tag_latest"` // 同时推送 latest 标签
// ========== stop/start/restart 字段 ==========
Container string `json:"container"` // 容器名或 ID
Timeout int `json:"timeout"` // 停止超时(秒)
// ========== remove 字段 ==========
Force bool `json:"force"` // 强制删除
RemoveVolumes bool `json:"remove_volumes"` // 删除关联卷
// ========== prune 字段 ==========
Type string `json:"type"` // image/container/volume/network/all
Filters map[string]string `json:"filters"` // dangling=true, until=24h
// ========== logs 字段 ==========
Lines int `json:"lines"` // 显示行数
Follow bool `json:"follow"` // 持续输出
Timestamps bool `json:"timestamps"` // 显示时间戳
Tail int `json:"tail"` // 从末尾开始
// ========== exec 字段 ==========
ExecCommand []string `json:"exec_command"` // 要执行的命令
Interactive bool `json:"interactive"` // 交互模式
TTY bool `json:"tty"` // 分配 TTY
// ========== inspect 字段 ==========
CheckHealth bool `json:"check_health"` // 检查健康状态
}
ContainerConfig 容器操作配置(运行时无关,参考 skaffold)
type ContainerRunner ¶
type ContainerRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
ContainerRunner 容器执行器(运行时无关,参考 skaffold)
支持两种容器运行时: - Docker: 开发环境、容器化 Worker(支持 Unix Socket + TCP Remote) - containerd: 生产环境、Kubernetes 节点(仅 Unix Socket)
核心功能: - 镜像管理:build, pull, push, tag, remove - 容器管理:run, stop, start, restart, remove - 容器操作:logs, exec, inspect, stats - 系统维护:prune (image/container/volume/network)
连接模式: - Unix Socket: 本地高性能连接 - TCP Remote: 容器化 Worker(类似 Jenkins Docker Plugin)
func NewContainerRunner ¶
func NewContainerRunner() *ContainerRunner
NewContainerRunner 创建新的 ContainerRunner
type DatabaseConfig ¶
type DatabaseConfig struct {
DBType string `json:"db_type"` // 数据库类型:mysql/postgresql/redis
CredentialID string `json:"credential_id"` // 凭证ID(username_password类型)
Host string `json:"host"` // 主机地址
Port int `json:"port"` // 端口
Database string `json:"database"` // 数据库名(MySQL/PostgreSQL)或 DB序号(Redis,0-15)
SQL string `json:"sql"` // SQL语句(MySQL/PostgreSQL)
Command string `json:"command"` // Redis命令(Redis专用)
Args []string `json:"args"` // Redis命令参数(Redis专用)
Params []interface{} `json:"params"` // SQL参数(可选,用于参数化查询)
MaxRows int `json:"max_rows"` // 最大返回/导出行数,默认10000
ExportExcel bool `json:"export_excel"` // 是否导出Excel(仅SELECT有效)
}
DatabaseConfig 数据库配置
type DatabaseRunner ¶
type DatabaseRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
DatabaseRunner 数据库执行器
支持 MySQL、PostgreSQL、Redis 数据库操作 核心功能: - SQL 类型智能识别(DQL/DML/DDL/MAINTENANCE) - Redis 常见命令支持(GET/SET/HGETALL/KEYS/SCAN/DEL等) - Excel 自动导出(SELECT 查询结果) - Task Output 机制(供下游任务使用)
func NewDatabaseRunner ¶
func NewDatabaseRunner() *DatabaseRunner
NewDatabaseRunner 创建新的 DatabaseRunner
type EmailSender ¶
type EmailSender struct{}
EmailSender 邮件发送器
使用 SMTP 协议发送邮件,支持: - TLS 加密连接 - HTML 和纯文本内容 - 多个收件人
func (*EmailSender) Send ¶
func (s *EmailSender) Send(ctx context.Context, cred *core.Credential, config MessageConfig, logChan chan<- string) (*core.Result, error)
Send 发送邮件
type FeishuBotSender ¶
type FeishuBotSender struct {
// contains filtered or unexported fields
}
FeishuBotSender 飞书群机器人发送器
用于通过飞书群机器人 Webhook 发送消息 API文档: https://open.feishu.cn/document/ukTMukTMukTM/ucTM5YjL3ETO24yNxkjN
func (*FeishuBotSender) Send ¶
func (s *FeishuBotSender) Send(ctx context.Context, cred *core.Credential, config MessageConfig, logChan chan<- string) (*core.Result, error)
Send 发送飞书群机器人消息
type FileConfig ¶
type FileConfig struct {
Action string `json:"action"` // cleanup/backup/compress/stat
// 远程连接(可选,为空则本地操作)
Host string `json:"host"` // 远程主机地址
Port int `json:"port"` // SSH 端口(默认 22)
Credential string `json:"credential"` // 凭证 ID
Username string `json:"username"` // SSH 用户名(默认 root)
// 通用字段
Path string `json:"path"` // 目标路径
Pattern string `json:"pattern"` // 文件匹配模式
Recursive bool `json:"recursive"` // 递归子目录
// cleanup 专用
OlderThan string `json:"older_than"` // 7d, 30d, 90d
LargerThan string `json:"larger_than"` // 100M, 1G
DryRun bool `json:"dry_run"` // 试运行模式
Exclude []string `json:"exclude"` // 排除路径
// backup 专用
Source string `json:"source"` // 源路径
Target string `json:"target"` // 目标路径
Compress bool `json:"compress"` // 是否压缩
Incremental bool `json:"incremental"` // 增量备份
KeepDays int `json:"keep_days"` // 保留天数
// compress 专用
Format string `json:"format"` // tar.gz, zip
RemoveSource bool `json:"remove_source"` // 压缩后删除源
Level int `json:"level"` // 压缩级别
// stat 专用
SortBy string `json:"sort_by"` // size/time/name
Limit int `json:"limit"` // 返回数量
}
FileConfig 文件操作配置
type FileRunner ¶
type FileRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
FileRunner 文件操作执行器
支持本地和远程文件操作(通过纯 SSH 命令) 核心功能: - cleanup:文件清理(支持时间、大小筛选、DryRun) - backup:文件备份(支持压缩、增量) - compress:文件压缩(tar.gz/zip) - stat:文件统计(磁盘占用分析)
远程操作:纯 SSH 命令,无需 SFTP
func (*FileRunner) ParseArgs ¶
func (r *FileRunner) ParseArgs(task *core.Task) error
ParseArgs 解析任务参数
type GitConfig ¶
type GitConfig struct {
// URL 仓库地址(必填)
// 支持 SSH: git@github.com:user/repo.git
// 支持 HTTPS: https://github.com/user/repo.git
URL string `json:"url"`
// Branch 分支名(可选,默认 main)
Branch string `json:"branch"`
// Credential 凭证 ID(必填)
// 支持类型:ssh_private_key, username_password, api_token
Credential string `json:"credential"`
// Clean 清空模式(可选,默认 false)
// false: 智能 sync(不存在就 clone,存在就 pull)✅ 推荐
// true: 强制重来(删除后 clone)⚠️ 慎用
Clean bool `json:"clean"`
}
GitConfig Git 操作配置(极简版)
type GitResult ¶
type GitResult struct {
// Action 操作类型(clone/pull)
Action string `json:"action"`
// Repository 仓库信息
Repository string `json:"repository"` // 仓库 URL
Branch string `json:"branch"` // 分支名
Commit string `json:"commit"` // 当前提交哈希
// ChangedFiles 变更统计(pull 时有效)
ChangedFiles int `json:"changed_files,omitempty"` // 变更文件数
Insertions int `json:"insertions,omitempty"` // 新增行数
Deletions int `json:"deletions,omitempty"` // 删除行数
// ExecuteInfo 执行信息
WorkDir string `json:"work_dir"` // 工作目录
Duration float64 `json:"duration"` // 执行时长(秒)
Timestamp string `json:"timestamp"` // 执行时间
}
GitResult Git 操作结果
type GitRunner ¶
type GitRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
GitRunner Git 操作执行器
type HTTPConfig ¶
type HTTPConfig struct {
URL string `json:"url"` // 必填:请求URL
Method string `json:"method"` // 选填:请求方法,默认GET
Headers map[string]string `json:"headers"` // 选填:请求头
Query map[string]string `json:"query"` // 选填:URL参数
Body interface{} `json:"body"` // 选填:请求体(仅JSON对象)
ExpectedStatus []int `json:"expected_status"` // 选填:预期状态码,默认[200]
}
HTTPConfig HTTP请求配置(v2.0 简化版) 注意:超时和重试使用 Task 的配置,不在这里定义
type HTTPResponse ¶
HTTPResponse HTTP响应
type HTTPRunner ¶
type HTTPRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
HTTPRunner HTTP请求执行器(v2.0 简化版)
type MessageConfig ¶
type MessageConfig struct {
Type string `json:"type"` // 消息类型:email/wechat_work/wechat_work_bot/feishu_bot
CredentialID string `json:"credential_id"` // 凭证ID(引用凭证管理)
To []string `json:"to"` // 接收人列表(邮件地址或用户ID)
Subject string `json:"subject"` // 邮件主题(email专用)
Content string `json:"content"` // 消息内容
ContentType string `json:"content_type"` // 内容类型:text/markdown/html(默认text)
// 企业微信应用专用
ToUser string `json:"to_user,omitempty"` // 成员ID列表(用|分隔)
ToParty string `json:"to_party,omitempty"` // 部门ID列表(用|分隔)
ToTag string `json:"to_tag,omitempty"` // 标签ID列表(用|分隔)
// 机器人 @人 专用
AtMobiles []string `json:"at_mobiles,omitempty"` // @的手机号列表
AtUserIds []string `json:"at_user_ids,omitempty"` // @的用户ID列表
IsAtAll bool `json:"is_at_all,omitempty"` // 是否@所有人
}
MessageConfig 消息配置
type MessageRunner ¶
type MessageRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
MessageRunner 消息发送 Runner
支持多种消息发送渠道: - email: SMTP 邮件 - wechat_work: 企业微信应用消息 - wechat_work_bot: 企业微信群机器人 - feishu_bot: 飞书群机器人
func (*MessageRunner) ParseArgs ¶
func (r *MessageRunner) ParseArgs(task *core.Task) error
ParseArgs 解析任务参数
func (*MessageRunner) SetApiserver ¶
func (r *MessageRunner) SetApiserver(apiserver core.Apiserver)
SetApiserver 设置API Server客户端(依赖注入)
type MessageSender ¶
type MessageSender interface {
// Send 发送消息
//
// 参数:
// - ctx: 上下文(支持超时和取消)
// - cred: 凭证信息(已解密)
// - config: 消息配置
// - logChan: 日志通道
//
// 返回:
// - *core.Result: 执行结果
// - error: 错误信息
Send(ctx context.Context, cred *core.Credential, config MessageConfig, logChan chan<- string) (*core.Result, error)
}
MessageSender 消息发送器接口
不同的消息类型实现各自的发送逻辑
type ScriptConfig ¶
type ScriptConfig struct {
Language string `json:"language"` // 必填:python/nodejs/shell
Type string `json:"type"` // 必填:file(文件)或 inline(内联)
File string `json:"file"` // type=file时必填:脚本文件绝对路径
Code string `json:"code"` // type=inline时必填:脚本内容
Args []string `json:"args"` // 选填:脚本参数
Interpreter string `json:"interpreter"` // 选填:自定义解释器路径,留空则使用默认
}
ScriptConfig 脚本配置(v1.0 - 标准版)
支持文件模式和内联模式,但不支持依赖管理(保持简单)
type ScriptRunner ¶
type ScriptRunner struct {
BaseRunner // 🔥 嵌入基类
// contains filtered or unexported fields
}
ScriptRunner 脚本执行器(v1.0 - 标准版)
用于执行 Python、Node.js、Shell 等脚本语言
核心特性:
- 支持文件模式:执行已存在的脚本文件
- 支持内联模式:将代码保存为临时文件执行
- 支持参数传递:通过命令行参数传递
- 支持环境变量:从 metadata 注入
- 支持工作目录:从 metadata 设置
- 不支持依赖管理(v1.0),可在 setup 中处理
安全措施:
- 文件路径白名单验证(配置中设置)
- 内联代码长度限制(10KB)
- 临时文件自动清理
- 超时控制(使用 Task 的 Timeout)
type SecurityConfig ¶
type SecurityConfig struct {
// 是否启用安全检查
Enabled bool `json:"enabled"`
// 允许的命令白名单(如果为空则允许所有)
AllowedCommands []string `json:"allowed_commands"`
// 禁止的命令黑名单
BlockedCommands []string `json:"blocked_commands"`
// 禁止的命令模式(正则表达式)
BlockedPatterns []string `json:"blocked_patterns"`
// 禁止的路径模式
BlockedPaths []string `json:"blocked_paths"`
// 最大参数长度
MaxArgsLength int `json:"max_args_length"`
}
SecurityConfig 安全配置
func DefaultSecurityConfig ¶
func DefaultSecurityConfig() *SecurityConfig
DefaultSecurityConfig 默认安全配置
type WechatWorkBotSender ¶
type WechatWorkBotSender struct {
// contains filtered or unexported fields
}
WechatWorkBotSender 企业微信群机器人发送器
用于通过群机器人 Webhook 发送消息 API文档: https://developer.work.weixin.qq.com/document/path/91770
func (*WechatWorkBotSender) Send ¶
func (s *WechatWorkBotSender) Send(ctx context.Context, cred *core.Credential, config MessageConfig, logChan chan<- string) (*core.Result, error)
Send 发送企业微信群机器人消息
type WechatWorkSender ¶
type WechatWorkSender struct {
// contains filtered or unexported fields
}
WechatWorkSender 企业微信应用消息发送器
用于通过企业微信应用发送消息到成员/部门/标签 API文档: https://developer.work.weixin.qq.com/document/path/90236
func (*WechatWorkSender) Send ¶
func (s *WechatWorkSender) Send(ctx context.Context, cred *core.Credential, config MessageConfig, logChan chan<- string) (*core.Result, error)
Send 发送企业微信应用消息