workflow

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2025 License: MIT Imports: 0 Imported by: 0

README

Simple Workflow

Go Version License Go Report Card

一个轻量级、易用的 Go 工作流编排引擎,支持复杂的任务流程管理和持久化。

✨ 特性

  • 🚀 简单易用:清晰的 API 设计,快速上手
  • 🔄 灵活编排:支持顺序、分支、并行等多种工作流模式
  • 异步支持:内置异步任务和状态检查机制
  • 💾 数据持久化:支持 GORM,可使用 MySQL、PostgreSQL、SQLite 等数据库
  • 🔒 并发安全:支持本地锁和分布式锁(Redis)
  • 📊 状态管理:完善的工作流和任务状态跟踪
  • 🔌 易于集成:零依赖核心,可与任何 Go 项目集成

📦 模块说明

公共 API(推荐使用)✅
# 稳定的公共 API,可安全导入
go get github.com/blingmoon/simple-workflow/workflow

推荐导入

  • github.com/blingmoon/simple-workflow/workflow - 核心工作流引擎
内部模块(受保护)🔒

项目使用 Go 的 internal 机制保护内部实现:

  • 无法导入: github.com/blingmoon/simple-workflow/internal/tests - 内部测试模块
  • 无法导入: github.com/blingmoon/simple-workflow/internal/examples - 内部示例模块

说明

  • Go 编译器会阻止外部项目导入 internal/ 下的包(编译错误:use of internal package not allowed
  • 示例代码位于 internal/examples/,可以复制到你的项目中使用
  • 测试代码位于 internal/tests/,仅用于项目内部质量保证

📦 安装

go get github.com/blingmoon/simple-workflow

🚀 快速开始

1. 初始化工作流服务
package main

import (
    "github.com/blingmoon/simple-workflow/workflow"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

func main() {
    // 1. 初始化数据库
    db, err := gorm.Open(sqlite.Open("workflow.db"), &gorm.Config{})
    if err != nil {
        panic(err)
    }
    
    // 2. 自动迁移表结构
    db.AutoMigrate(&workflow.WorkflowInstancePo{}, &workflow.WorkflowTaskInstancePo{})
    
    // 3. 创建工作流服务
    workflowRepo := workflow.NewWorkflowRepo(db)
    workflowLock := workflow.NewLocalWorkflowLock()  // 本地锁,或使用 Redis 分布式锁
    workflowService := workflow.NewWorkflowService(workflowRepo, workflowLock)
}
2. 定义工作流配置
import (
    "context"
    "encoding/json"
)

// 定义工作流结构:提交 -> 审核 -> 批准
workflowConfigJSON := `{
    "id": "approval_workflow",
    "name": "审批工作流",
    "nodes": [
        {
            "id": "submit",
            "name": "提交申请",
            "next_nodes": ["review"]
        },
        {
            "id": "review",
            "name": "审核",
            "next_nodes": ["approve"]
        },
        {
            "id": "approve",
            "name": "批准",
            "next_nodes": []
        }
    ]
}`

// 加载配置
workflowConfig := &workflow.WorkflowConfig{}
json.Unmarshal([]byte(workflowConfigJSON), workflowConfig)
workflow.LoadWorkflowConfig(workflowConfig)
3. 注册任务处理器
// 注册"提交"任务
workflow.RegisterWorkflowTask("approval_workflow", "submit", 
    workflow.NewNormalTaskWorker(
        // Run 函数:同步执行
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            // 读取数据
            orderID, _ := nodeContext.GetString("workflow_context", "order_id")
            
            // 处理业务逻辑
            // ...
            
            // 写入数据
            nodeContext.Set([]string{"submit_time"}, time.Now().Unix())
            nodeContext.Set([]string{"status"}, "submitted")
            
            return nil
        },
        // AsynchronousWaitCheck 函数:异步检查(可选,传 nil 表示同步任务)
        nil,
    ),
)

// 注册"审核"任务(带异步检查)
workflow.RegisterWorkflowTask("approval_workflow", "review",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            // 启动审核流程
            nodeContext.Set([]string{"review_status"}, "pending")
            return nil
        },
        // 异步检查:等待外部审核结果
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            status, _ := nodeContext.GetString("review_status")
            if status != "approved" {
                // 返回错误表示还未完成,工作流会稍后重试
                return errors.New("waiting for approval")
            }
            return nil  // 返回 nil 表示异步任务完成
        },
    ),
)

// 注册"批准"任务
workflow.RegisterWorkflowTask("approval_workflow", "approve",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            nodeContext.Set([]string{"final_status"}, "approved")
            return nil
        },
        nil,
    ),
)
4. 创建和运行工作流实例
// 创建工作流实例
workflowInstance, err := workflowService.CreateWorkflow(context.Background(), 
    &workflow.CreateWorkflowReq{
        WorkflowType: "approval_workflow",
        BusinessID:   "ORDER-001",
        Context: map[string]any{
            "order_id": "ORDER-001",
            "amount":   1000.00,
            "user_id":  "user123",
        },
    },
)

// 运行工作流
err = workflowService.RunWorkflow(context.Background(), workflowInstance.ID)
5. 处理异步任务

对于包含异步任务的工作流,需要定期调用 RunWorkflow 来检查异步任务状态:

// 第一次运行:执行所有同步任务,启动异步任务
workflowService.RunWorkflow(ctx, workflowInstance.ID)

// 等待一段时间后,再次运行以检查异步任务
time.Sleep(5 * time.Second)
workflowService.RunWorkflow(ctx, workflowInstance.ID)

// 可以在定时任务或消息队列中定期调用

📖 核心概念

工作流配置

工作流由多个任务节点组成,每个节点可以指定下一个要执行的节点:

type WorkflowConfig struct {
    ID    string                   `json:"id"`     // 工作流类型 ID
    Name  string                   `json:"name"`   // 工作流名称
    Nodes []*NodeDefinitionConfig  `json:"nodes"`  // 任务节点列表
}

type NodeDefinitionConfig struct {
    ID            string   `json:"id"`              // 节点 ID
    Name          string   `json:"name"`            // 节点名称
    NextNodes     []string `json:"next_nodes"`      // 下一个节点列表(空表示结束)
    FailMaxCount  int      `json:"fail_max_count"`  // 最大失败次数
    MaxWaitTimeTs int64    `json:"max_wait_time_ts"` // 最大等待时间(秒)
}
任务处理器

任务处理器包含两个函数:

  1. Run 函数(必需):同步执行任务逻辑
  2. AsynchronousWaitCheck 函数(可选):异步检查任务是否完成
type WorkflowTaskNodeWorker interface {
    Run(ctx context.Context, nodeContext *JSONContext) error
    AsynchronousWaitCheck(ctx context.Context, nodeContext *JSONContext) error
}
JSONContext 数据操作

JSONContext 提供了便捷的方法来读写任务上下文数据:

// 写入数据
nodeContext.Set([]string{"user", "name"}, "Alice")
nodeContext.Set([]string{"status"}, "completed")

// 读取数据
name, ok := nodeContext.GetString("user", "name")
timestamp, ok := nodeContext.GetInt64("created_at")
success, ok := nodeContext.GetBool("is_success")

// 访问工作流全局上下文
orderID, ok := nodeContext.GetString("workflow_context", "order_id")
特殊错误类型

工作流引擎提供了一些特殊的错误类型,用于精确控制工作流的执行行为:

1. ErrorWorkflowTaskInstanceNotReady(任务未准备好)

用途:表示任务当前阶段还没有准备好,需要稍后重试。

场景

  • 等待外部审核结果
  • 等待第三方 API 响应
  • 等待定时任务触发

示例

workflow.RegisterWorkflowTask("approval_workflow", "review",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            // 发起审核请求
            nodeContext.Set([]string{"review_status"}, "pending")
            return nil
        },
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            // 异步检查审核结果
            status, _ := nodeContext.GetString("review_status")
            if status == "pending" {
                // 返回此错误,工作流会保持任务为 pending 状态,稍后重试
                return workflow.ErrorWorkflowTaskInstanceNotReady
            }
            return nil  // 审核完成,继续执行
        },
    ),
)

行为

  • 任务保持 pending 状态
  • 工作流继续运行,但不会推进此任务
  • 下次运行工作流时会重新检查
2. ErrorWorkflowTaskFailedWithContinue(失败但继续)

用途:任务失败,但不影响工作流继续执行,可以当作另一种形式的"完成"。

场景

  • 非关键的通知任务失败(如发送邮件)
  • 可选的数据采集任务
  • 降级场景处理

示例

workflow.RegisterWorkflowTask("order_workflow", "send_notification",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            // 尝试发送通知
            err := sendEmail(...)
            if err != nil {
                // 发送失败,但不影响订单流程
                nodeContext.Set([]string{"notification_sent"}, false)
                return workflow.ErrorWorkflowTaskFailedWithContinue
            }
            nodeContext.Set([]string{"notification_sent"}, true)
            return nil
        },
        nil,
    ),
)

行为

  • 任务标记为完成(虽然失败了)
  • 工作流继续执行后续节点
  • 可以通过上下文查看任务实际执行结果
3. ErrWorkflowTaskFailedWithFailed(失败并终止)

用途:任务失败,整个工作流应该终止,状态变为 failed

场景

  • 关键参数缺失或无效
  • 不可恢复的业务错误
  • 数据一致性检查失败

示例

workflow.RegisterWorkflowTask("payment_workflow", "validate_account",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            accountID, ok := nodeContext.GetString("workflow_context", "account_id")
            if !ok || accountID == "" {
                // 账户ID缺失,无法继续,终止工作流
                return workflow.ErrWorkflowTaskFailedWithFailed
            }
            
            // 验证账户
            if !isValidAccount(accountID) {
                // 账户无效,终止工作流
                return workflow.ErrWorkflowTaskFailedWithFailed
            }
            
            return nil
        },
        nil,
    ),
)

行为

  • 任务标记为 failed
  • 工作流状态变为 failed
  • 停止执行后续节点
  • 需要人工介入或重启工作流
4. ErrWorkBussinessCriticalError(业务严重错误)

用途:用于标识需要人工介入的严重业务错误,通常用于报警和监控。

场景

  • 数据不一致
  • 重要业务流程异常
  • 需要立即处理的错误

示例

import "github.com/pkg/errors"

workflow.RegisterWorkflowTask("reconciliation_workflow", "check_balance",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            expected := getExpectedBalance()
            actual := getActualBalance()
            
            if expected != actual {
                // 余额不一致,严重错误,需要报警
                return errors.Wrapf(
                    workflow.ErrWorkBussinessCriticalError,
                    "balance mismatch: expected=%f, actual=%f",
                    expected, actual,
                )
            }
            return nil
        },
        nil,
    ),
)

行为

  • 任务失败
  • 日志记录为 ERROR 级别
  • 触发报警系统(需要在调度系统中配置)
  • 可以通过 errors.Is() 识别此类错误
5. ErrWorkBussinessWarningError(业务警告错误)

用途:用于标识需要关注但不严重的业务错误,记录为警告级别。

场景

  • 降级服务使用
  • 性能指标异常
  • 非关键功能异常

示例

workflow.RegisterWorkflowTask("order_workflow", "calculate_discount",
    workflow.NewNormalTaskWorker(
        func(ctx context.Context, nodeContext *workflow.JSONContext) error {
            discount, err := getDiscountFromService()
            if err != nil {
                // 折扣服务失败,使用默认值,记录警告
                discount = 0.0
                nodeContext.Set([]string{"discount"}, discount)
                return errors.Wrapf(
                    workflow.ErrWorkBussinessWarningError,
                    "discount service failed, using default: %v",
                    err,
                )
            }
            nodeContext.Set([]string{"discount"}, discount)
            return nil
        },
        nil,
    ),
)

行为

  • 任务失败(或根据配置决定)
  • 日志记录为 WARN 级别
  • 可用于监控趋势
  • 不会触发紧急报警
错误处理最佳实践
// 1. 检查特定错误类型
if errors.Is(err, workflow.ErrorWorkflowTaskInstanceNotReady) {
    // 任务未准备好,继续等待
}

// 2. 包装业务错误
if criticalError {
    return errors.Wrapf(
        workflow.ErrWorkBussinessCriticalError,
        "详细错误信息: %v", err,
    )
}

// 3. 根据场景选择合适的错误
func handleTask(ctx context.Context, nodeContext *workflow.JSONContext) error {
    if missingRequiredData {
        // 缺少必需数据,终止工作流
        return workflow.ErrWorkflowTaskFailedWithFailed
    }
    
    if externalServiceNotReady {
        // 外部服务未准备好,稍后重试
        return workflow.ErrorWorkflowTaskInstanceNotReady
    }
    
    if optionalFeatureFailed {
        // 可选功能失败,继续执行
        return workflow.ErrorWorkflowTaskFailedWithContinue
    }
    
    return nil  // 成功
}

🗄️ 数据持久化

使用不同的数据库
// SQLite
import "gorm.io/driver/sqlite"
db, _ := gorm.Open(sqlite.Open("workflow.db"), &gorm.Config{})

// MySQL
import "gorm.io/driver/mysql"
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4"
db, _ := gorm.Open(mysql.Open(dsn), &gorm.Config{})

// PostgreSQL
import "gorm.io/driver/postgres"
dsn := "host=localhost user=gorm password=gorm dbname=gorm port=9920"
db, _ := gorm.Open(postgres.Open(dsn), &gorm.Config{})
数据表结构

工作流引擎会自动创建两张表:

  • workflow_instance:工作流实例表
  • task_instance:任务实例表

🔒 并发控制

本地锁(单机)
workflowLock := workflow.NewLocalWorkflowLock()
Redis 分布式锁(多机)
import "github.com/redis/go-redis/v9"

redisClient := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})

workflowLock := workflow.NewRedisWorkflowLock(redisClient)

📊 工作流状态

工作流实例状态
  • init - 初始化
  • running - 运行中
  • completed - 已完成
  • failed - 失败
  • cancelled - 已取消
任务节点状态
  • running - 运行中
  • pending - 等待中(异步任务)
  • finishing - 完成中
  • completed - 已完成
  • failed - 失败
  • cancelled - 已取消

🎯 高级功能

重启失败的任务
// 重启指定任务节点
err := workflowService.RestartWorkflowNode(ctx, &workflow.RestartWorkflowNodeParams{
    WorkflowInstanceID: instanceID,
    TaskType:          "review",
    IsAsynchronous:    false,  // 是否异步重启
})

// 重启整个工作流实例
err := workflowService.RestartWorkflowInstance(ctx, &workflow.RestartWorkflowParams{
    WorkflowInstanceID: instanceID,
    Context:           newContext,  // 可选:更新上下文
    IsRun:             true,        // 是否立即运行
})
添加外部事件
// 为任务节点添加外部事件(如审核结果)
err := workflowService.AddNodeExternalEvent(ctx, &workflow.AddNodeExternalEventParams{
    WorkflowInstanceID: instanceID,
    TaskType:          "review",
    NodeEvent: &workflow.NodeEvent{
        EventTs:      time.Now().Unix(),
        EventContent: map[string]any{"result": "approved"},
    },
})
查询工作流状态
// 查询工作流实例详情
details, err := workflowService.QueryWorkflowInstanceDetail(ctx, 
    &workflow.QueryWorkflowInstanceParams{
        WorkflowInstanceID: &instanceID,
    },
)

// 统计工作流实例数量
count, err := workflowService.CountWorkflowInstance(ctx, 
    &workflow.QueryWorkflowInstanceParams{
        WorkflowType: "approval_workflow",
        Status:       "running",
    },
)

📚 完整示例

查看 examples/with-sqlite 目录获取完整的可运行示例:

# 运行完整示例
cd examples/with-sqlite
go run main.go

# 运行测试
go test -v

示例包含:

  • ✅ 基础工作流创建和执行
  • ✅ 异步任务处理
  • ✅ 多分支工作流
  • ✅ 批量创建工作流实例
  • ✅ 数据持久化和查询

🧪 测试

# 运行所有测试
go test ./...

# 查看测试覆盖率
go test -cover ./...

# 运行特定包的测试
go test ./workflow -v

🤝 贡献

欢迎提交 Issue 和 Pull Request!

📄 许可证

本项目采用 MIT 许可证 - 详见 LICENSE 文件。

🔗 相关链接

Documentation

Overview

Package workflow 提供工作流编排功能。

这个包可以帮助你构建和管理任务流程。

基础使用示例:

package main

import (
    workflow "github.com/blingmoon/simple-workflow"
)

func main() {
    wf := workflow.New("my-workflow")
    if err := wf.Run(); err != nil {
        panic(err)
    }
}

更多示例和文档请访问: https://github.com/blingmoon/simple-workflow

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL