flowx

package module
v0.0.0-...-c693505 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: MIT Imports: 16 Imported by: 0

README

中文文档

FlowX

FlowX Icon

A flexible and extensible pipeline execution library for Go, supporting multiple execution backends and DAG-based workflow orchestration.

Go Version License

Features

  • DAG-Based Workflow: Define complex pipelines using Directed Acyclic Graph (DAG) structure with Mermaid syntax
  • Cyclic Graph Support: Conditional back-edges for controlled loops with iteration counter
  • Multi-Backend Execution: Support for Local, Docker, and Kubernetes executors
  • Concurrent Execution: Independent tasks run in parallel for optimal performance
  • Conditional Edges: Dynamic execution paths with template-based condition expressions
  • Event-Driven Architecture: Monitor pipeline lifecycle through event listeners
  • Template Engine: Dynamic configuration rendering with Pongo2 templates
  • Metadata Management: Process-safe metadata storage and retrieval
  • Log Streaming: Real-time log output with customizable log pushing
  • Output Extraction: Extract structured data from command output using codec-block or regex patterns
  • Runtime Recovery: Resume pipeline execution from saved state
  • Pause & Resume: Pause running pipelines and modify graph structure during pause
  • Dynamic Graph Modification: Add/remove nodes and edges at runtime
  • Data Passing: Share data between nodes using metadata

Installation

go get github.com/LerkoX/flowx

Quick Start

package main

import (
    "context"
    "fmt"
    "github.com/LerkoX/flowx"
)

func main() {
    ctx := context.Background()

    // Create runtime
    runtime := flowx.NewRuntime(ctx)

    // Pipeline configuration
    config := `
Version: "1.0"
Name: example-pipeline

Executors:
  local:
    type: local
    config:
      shell: bash

Graph: |
  stateDiagram-v2
    [*] --> Build
    Build --> Test
    Test --> [*]

Nodes:
  Build:
    executor: local
    steps:
      - name: build
        run: echo "Building..."
  Test:
    executor: local
    steps:
      - name: test
        run: echo "Testing..."
`

    // Execute pipeline synchronously
    pipeline, err := runtime.RunSync(ctx, "pipeline-1", config, nil)
    if err != nil {
        fmt.Printf("Pipeline failed: %v\n", err)
        return
    }

    fmt.Println("Pipeline completed successfully!")
}

Output Extraction

FlowX supports extracting structured data from command output and saving it to pipeline metadata for use in subsequent nodes. Extracted data includes field descriptions (from comments) and source node tracking.

Codec-Block Extraction

Automatically recognizes and parses flowx-yaml code blocks:

Nodes:
  Build:
    executor: local
    extract:
      type: codec-block
      maxOutputSize: 1048576  # Optional, default 1MB
    steps:
      - name: build
        run: |
          echo "Building..."
          echo '```flowx-yaml'
          echo 'buildId: "12345"  # 构建ID'
          echo 'version: "1.0.0"  # 版本号'
          echo '```'

This extracts buildId and version from output with descriptions, and makes them available as {{ .Metadata.Build.buildId }} and {{ .Metadata.Build.version }}.

Regex Extraction

Extract data using regular expressions:

Nodes:
  Test:
    executor: local
    extract:
      type: regex
      patterns:
        coverage: "coverage: (\\d+\\.\\d+)%"
        tests: "(\\d+) tests? passed"
      maxOutputSize: 524288
    steps:
      - name: test
        run: go test -cover

This extracts test coverage and count from command output.

Configuration

FlowX uses YAML configuration with the following structure:

Version: "1.0"              # Configuration version
Name: my-pipeline           # Pipeline name

Metadate:                   # Metadata configuration
  type: in-config           # Store type: in-config, redis, http
  data:
    key: value

Param:                      # Pipeline parameters
  buildId: "123"
  branch: "main"

Executors:                  # Global executor definitions
  local:
    type: local
    config:
      shell: bash
      workdir: /tmp

  docker:
    type: docker
    config:
      registry: docker.io
      network: host
      volumes:
        - /var/run/docker.sock:/var/run/docker.sock

Graph: |                    # DAG definition (Mermaid stateDiagram-v2)
  stateDiagram-v2
    [*] --> Build
    Build --> Test
    Test --> Deploy
    Deploy --> [*]

Nodes:                      # Node definitions
  Build:
    executor: local
    steps:
      - name: build
        run: go build .

  Test:
    executor: docker
    image: golang:1.21
    steps:
      - name: test
        run: go test ./...

Executors

Local Executor

Executes commands on the local machine.

Executors:
  local:
    type: local
    config:
      shell: bash          # Shell to use (bash, sh, zsh)
      workdir: /tmp        # Working directory
      env:                 # Environment variables
        KEY: value
      ptimeout: "30s"      # Command timeout
      pty: true            # Enable PTY for interactive programs
Docker Executor

Executes commands inside Docker containers.

Executors:
  docker:
    type: docker
    config:
      registry: docker.io   # Image registry
      network: host         # Network mode
      workdir: /app         # Container working directory
      tty: true             # Enable TTY
      ttyWidth: 120         # TTY width
      ttyHeight: 40         # TTY height
      volumes:              # Volume mounts
        - /host/path:/container/path
      env:                  # Environment variables
        GO_VERSION: "1.21"
Kubernetes Executor

Executes commands inside Kubernetes pods.

Executors:
  k8s:
    type: k8s
    config:
      namespace: default
      serviceAccount: pipeline-sa
      podReadyTimeout: "60s"  # Pod ready timeout

Conditional Edges

Define conditional execution paths using template expressions:

Graph: |
  stateDiagram-v2
    [*] --> Build
    Build --> Deploy: {{ Param.branch == "main" }}
    Build --> Test: {{ Param.branch != "main" }}
    Test --> [*]
    Deploy --> [*]

Complex conditions are supported:

# Multiple conditions
QualityCheck --> DeployStaging: {{ QualityCheck.allTestsPassed == true and QualityCheck.codeCoverage >= 80 }}

# Nested conditions
Deploy --> Production: {{ Param.environment == "production" and ManualApproval.approved == true }}

Cyclic Graph (Loop Execution)

FlowX supports controlled loops through conditional back-edges. A conditional edge that creates a cycle is accepted as a back-edge, enabling iterative execution.

MaxLoopIterations: 5          # Safety limit (default: 100)

Graph: |
  stateDiagram-v2
    [*] --> A
    A --> B
    B --> C
    C --> A: {{ iteration < 3 }}    # Back-edge: loop 3 times
    C --> D
    D --> [*]

The iteration variable starts at 0 and increments each loop. In this example, nodes A→B→C→D execute 3 times, then the loop exits.

See Conditional Edges for details on back-edges and iteration.

Dynamic Graph Modification

FlowX supports modifying the pipeline graph at runtime. You can add/remove nodes and edges while the pipeline is paused.

Using ModifyGraph (Fine-grained Control)
err := runtime.ModifyGraph(ctx, "pipeline-id", flowx.GraphModifications{
    AddNodes: []flowx.NodeConfig{
        {
            Name: "NewNode",
            Executor: "local",
            Steps: []flowx.Step{
                {Name: "step1", Run: "echo 'New step'"},
            },
        },
    },
    AddEdges: []flowx.EdgeModification{
        {Source: "ExistingNode", Target: "NewNode", Expression: ""},
    },
    RemoveNodes: []string{"UnusedNode"},
    RemoveEdges: []flowx.EdgeID{{Source: "A", Target: "B"}},
})
Using UpdateConfig (Config-based Diff)

Update the pipeline by providing a new YAML configuration. FlowX automatically computes the difference and applies the changes:

newConfig := `
Version: "1.0"
Name: my-pipeline

Graph: |
  stateDiagram-v2
    [*] --> Build
    Build --> Deploy
    Deploy --> [*]

Nodes:
  Build:
    executor: local
    steps:
      - name: build
        run: echo "Building..."
  Deploy:
    executor: local
    steps:
      - name: deploy
        run: echo "Deploying..."
`

err := runtime.UpdateConfig(ctx, "pipeline-id", newConfig)

Rules:

  • Nodes that have already executed cannot be removed or modified
  • New nodes will execute on resume
  • Edges can only be removed if neither endpoint has executed

See doc/runtime.md for detailed API documentation.

Data Passing

Share data between nodes using metadata:

Nodes:
  Generate:
    executor: local
    steps:
      - name: generate
        run: |
          echo '```flowx-yaml'
          echo 'value: 42  # 计算结果'
          echo 'message: "hello world"  # 消息内容'
          echo '```'
    extract:
      type: codec-block

  Process:
    executor: local
    steps:
      - name: process
        run: |
          echo "Processing value: {{ .Metadata.Generate.value }}"
          echo "Message: {{ .Metadata.Generate.message }}"

Runtime Recovery

Resume pipeline execution from saved state:

Nodes:
  Build:
    executor: local
    runtime:                    # Node runtime status for recovery
      status: "SUCCESS"         # Already completed, will be skipped
      startTime: "2026-03-30T10:00:00Z"
      endTime: "2026-03-30T10:01:00Z"
      steps:
        - name: build
          status: "SUCCESS"
          output: "Build completed"
    steps:
      - name: build
        run: echo "Building..."

  Test:
    executor: local
    runtime:                    # Node runtime status for recovery
      status: "PENDING"         # Will execute
    steps:
      - name: test
        run: echo "Testing..."

Event Monitoring

Monitor pipeline execution through event listeners:

listener := flowx.NewListener()
listener.Handle(func(p flowx.Pipeline, event flowx.Event) {
    switch event {
    case flowx.PipelineInit:
        fmt.Println("Pipeline initialized")
    case flowx.PipelineStart:
        fmt.Println("Pipeline started")
    case flowx.PipelineFinish:
        fmt.Println("Pipeline finished")
    case flowx.PipelineExecutorPrepare:
        fmt.Println("Executor preparing")
    case flowx.PipelineExecutorPrepareDone:
        fmt.Println("Executor prepared")
    case flowx.PipelineNodeStart:
        fmt.Println("Node started")
    case flowx.PipelineNodeFinish:
        fmt.Println("Node completed")
    case flowx.PipelineCancelled:
        fmt.Println("Pipeline cancelled")
    case flowx.PipelineStatusUpdate:
        fmt.Println("Pipeline status updated")
    case flowx.PipelinePaused:
        fmt.Println("Pipeline paused")
    case flowx.PipelineResumed:
        fmt.Println("Pipeline resumed")
    case flowx.PipelineGraphModified:
        fmt.Println("Pipeline graph modified")
    }
})

pipeline, err := runtime.RunSync(ctx, "id", config, listener)

Available Events:

Event Description
PipelineInit Pipeline initialized
PipelineStart Pipeline started
PipelineFinish Pipeline finished (success or failure)
PipelineExecutorPrepare Node executor is being prepared
PipelineExecutorPrepareDone Node executor preparation completed
PipelineNodeStart Node execution started
PipelineNodeFinish Node execution finished
PipelineCancelled Pipeline cancelled
PipelineStatusUpdate Pipeline status changed
PipelinePaused Pipeline paused
PipelineResumed Pipeline resumed
PipelineGraphModified Pipeline graph was modified

Architecture

graph TB
    subgraph "Runtime Layer"
        RT[Runtime]
        RTI[Runtime Impl]
    end

    subgraph "Pipeline Core"
        PL[Pipeline]
        PLI[Pipeline Impl]
        NODE[Node]
        EDGE[Edge]
        EVAL[Eval Context]
    end

    subgraph "Configuration"
        CFG[Config]
        TPL[Template Engine]
    end

    subgraph "Metadata"
        MD[Metadata Store]
        EXT[Extractor]
    end

    subgraph "Executor Core"
        EP[Executor Provider]
        EI[Executor Interfaces]
        ADP[Adapter Pattern]
        BRG[Bridge Pattern]
    end

    subgraph "Executor Implementations"
        K8S[K8s Executor]
        DOCKER[Docker Executor]
        LOCAL[Local Executor]
        SSH[SSH Executor]
    end

    subgraph "Utilities"
        LOG[Logger]
        EVT[Event System]
    end

    RT --> RTI
    RTI --> PL
    PL --> PLI
    PLI --> NODE
    PLI --> EDGE
    PLI --> EVAL
    PLI --> EP
    EP --> EI
    EI --> ADP
    EI --> BRG
    ADP --> K8S
    ADP --> DOCKER
    ADP --> LOCAL
    ADP --> SSH
    BRG --> K8S
    BRG --> DOCKER
    BRG --> LOCAL
    BRG --> SSH
    RTI --> CFG
    RTI --> MD
    PLI --> MD
    CFG --> TPL
    NODE --> EXT
    PLI --> EVT
    RTI --> LOG

Examples

See examples/workflows/README.md for detailed workflow examples:

  • File Processing: Automated log archiving and cleanup
  • Data ETL: Parallel data collection and transformation
  • CI/CD Deployment: Complete deployment pipeline with quality gates
  • Weather Notification: Weather API integration with messaging

API Reference

Runtime
type Runtime interface {
    Get(id string) (Pipeline, error)                          // Get pipeline by ID
    Cancel(ctx context.Context, id string) error              // Cancel running pipeline
    RunAsync(ctx context.Context, id string, config string, listener Listener) (Pipeline, error)  // Async execution
    RunSync(ctx context.Context, id string, config string, listener Listener) (Pipeline, error)   // Sync execution
    Rm(id string)                                             // Remove pipeline record
    Done() chan struct{}                                      // Runtime completion signal
    Notify(data interface{}) error                            // Notify runtime
    Ctx() context.Context                                     // Get runtime context
    StopBackground()                                          // Stop background processing
    StartBackground()                                         // Start background processing
    SetPusher(pusher Pusher)                                  // Set log pusher
    SetTemplateEngine(engine TemplateEngine)                  // Set template engine
    GetTemplateEngine() TemplateEngine                        // Get template engine
    ExportConfig(id string) (string, error)                   // Export pipeline config
}
Pipeline
type Pipeline interface {
    Id() string                                               // Get pipeline ID
    GetGraph() Graph                                          // Get DAG graph
    SetGraph(graph Graph)                                     // Set DAG graph
    Status() string                                           // Get pipeline status
    SetMetadata(store MetadataStore)                          // Set metadata store
    Metadata() Metadata                                       // Get pipeline metadata
    Listening(listener Listener)                              // Set event listener
    Done() <-chan struct{}                                    // Pipeline completion signal
    Run(ctx context.Context) error                            // Run pipeline
    Notify()                                                  // Step notifies pipeline
    Cancel()                                                  // Cancel pipeline
    SetExecutorProvider(provider ExecutorProvider)           // Set executor provider
    SetTemplateEngine(engine TemplateEngine)                   // Set template engine
    GetTemplateEngine() TemplateEngine                        // Get template engine
    SetPusher(pusher Pusher)                                  // Set log pusher
    Pause() error                                             // Pause pipeline (waits for current level)
    Resume(ctx context.Context) error                          // Resume paused pipeline
    IsModifiable() bool                                       // Check if graph can be modified
}

Testing

go test ./...

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

MIT License - see LICENSE for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExtractExpression

func ExtractExpression(label string) string

ExtractExpression 从边标签中提取条件表达式(公共函数供测试使用) 使用模板引擎的 Validate 方法验证表达式语法 先检查是否包含模板标记 {{ 或 {%,再使用模板引擎验证

func SetPipelineParam

func SetPipelineParam(pipeline dag.Pipeline, param map[string]interface{})

SetPipelineParam 设置 pipeline 的 param 值(内部使用)

Types

type Adapter

type Adapter = executor.Adapter

Adapter 适配器接口

type Bridge

type Bridge = executor.Bridge

Bridge 桥接器接口

type Executor

type Executor = executor.Executor

Executor 执行器

type Runtime

type Runtime interface {
	//获取流水线状态
	Get(id string) (dag.Pipeline, error)
	//取消运行中的流水线
	Cancel(ctx context.Context, id string) error
	//执行异步流水线
	RunAsync(ctx context.Context, id string, config string, listener dag.Listener) (dag.Pipeline, error)
	//执行同步流水线
	RunSync(ctx context.Context, id string, config string, listener dag.Listener) (dag.Pipeline, error)
	//移除流水线记录
	Rm(id string)
	//runtime已经执行完成
	Done() chan struct{}
	//通知runtime
	Notify(data interface{}) error
	//反回runtime公共
	Ctx() context.Context
	//停止后台处理
	StopBackground()
	// 启动后台
	StartBackground()
	// 设置日志推送器
	SetPusher(pusher logger.Pusher)
	// 设置模板引擎
	SetTemplateEngine(engine template.TemplateEngine)
	// 获取模板引擎
	GetTemplateEngine() template.TemplateEngine
	// ExportConfig 导出流水线的运行时配置
	// 返回包含当前运行时状态的 YAML 格式配置字符串
	ExportConfig(id string) (string, error)
	// Pause 暂停运行中的流水线
	Pause(ctx context.Context, id string) error
	// Resume 恢复暂停或停止的流水线
	Resume(ctx context.Context, id string) error
	// ModifyGraph 对暂停或停止的流水线执行图修改(原子操作)
	ModifyGraph(ctx context.Context, id string, modifications dag.GraphModifications) error
	// UpdateConfig 通过新的 YAML 配置自动比对差异并更新流水线图
	// 已执行的节点不允许删除或替换,只允许修改尚未运行的节点
	// 除 Nodes 和 Graph 外的其他配置字段不可更新
	UpdateConfig(ctx context.Context, id string, newConfigYAML string) error
	// ListPipelines 列出所有活跃的流水线ID
	ListPipelines() []string
}

Runtime 运行时

func NewRuntime

func NewRuntime(ctx context.Context) Runtime

NewRuntime 创建新的Runtime实例

type RuntimeImpl

type RuntimeImpl struct {
	// contains filtered or unexported fields
}

RuntimeImpl Runtime接口的实现

func (*RuntimeImpl) Cancel

func (r *RuntimeImpl) Cancel(ctx context.Context, id string) error

Cancel 取消运行中的流水线

func (*RuntimeImpl) Ctx

func (r *RuntimeImpl) Ctx() context.Context

Ctx 返回runtime公共上下文

func (*RuntimeImpl) Done

func (r *RuntimeImpl) Done() chan struct{}

Done runtime已经执行完成

func (*RuntimeImpl) ExportConfig

func (r *RuntimeImpl) ExportConfig(id string) (string, error)

ExportConfig 导出流水线的运行时配置 返回包含当前运行时状态的 YAML 格式配置字符串

func (*RuntimeImpl) Get

func (r *RuntimeImpl) Get(id string) (dag.Pipeline, error)

Get 获取流水线状态

func (*RuntimeImpl) GetTemplateEngine

func (r *RuntimeImpl) GetTemplateEngine() template.TemplateEngine

GetTemplateEngine 获取当前使用的模板引擎

func (*RuntimeImpl) ListPipelines

func (r *RuntimeImpl) ListPipelines() []string

ListPipelines 列出所有活跃的流水线ID

func (*RuntimeImpl) ModifyGraph

func (r *RuntimeImpl) ModifyGraph(ctx context.Context, id string, modifications dag.GraphModifications) error

ModifyGraph 对暂停或停止的流水线执行图修改(原子操作)

func (*RuntimeImpl) Notify

func (r *RuntimeImpl) Notify(data interface{}) error

Notify 通知runtime

func (*RuntimeImpl) Pause

func (r *RuntimeImpl) Pause(ctx context.Context, id string) error

Pause 暂停运行中的流水线

func (*RuntimeImpl) Resume

func (r *RuntimeImpl) Resume(ctx context.Context, id string) error

Resume 恢复暂停或停止的流水线

func (*RuntimeImpl) Rm

func (r *RuntimeImpl) Rm(id string)

Rm 移除流水线记录

func (*RuntimeImpl) RunAsync

func (r *RuntimeImpl) RunAsync(ctx context.Context, id string, config string, listener dag.Listener) (dag.Pipeline, error)

RunAsync 执行异步流水线

func (*RuntimeImpl) RunSync

func (r *RuntimeImpl) RunSync(ctx context.Context, id string, config string, listener dag.Listener) (dag.Pipeline, error)

RunSync 执行同步流水线

func (*RuntimeImpl) SetPusher

func (r *RuntimeImpl) SetPusher(pusher logger.Pusher)

SetPusher 设置日志推送器

func (*RuntimeImpl) SetTemplateEngine

func (r *RuntimeImpl) SetTemplateEngine(engine template.TemplateEngine)

SetTemplateEngine 设置模板引擎

func (*RuntimeImpl) StartBackground

func (r *RuntimeImpl) StartBackground()

StartBackground 启动后台处理

func (*RuntimeImpl) StopBackground

func (r *RuntimeImpl) StopBackground()

StopBackground 停止后台处理

func (*RuntimeImpl) UpdateConfig

func (r *RuntimeImpl) UpdateConfig(ctx context.Context, id string, newConfigYAML string) error

UpdateConfig 通过新的 YAML 配置自动比对差异并更新流水线图 已执行的节点不允许删除或替换,只允许修改尚未运行的节点 除 Nodes 和 dag.Graph 外的其他配置字段不可更新

type StepResult

type StepResult = executor.StepResult

StepResult 步骤执行结果

Directories

Path Synopsis
examples
workflows command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL