gotaskflow

package module
v1.0.3
Published: Mar 27, 2025 License: Apache-2.0 Imports: 14 Imported by: 1

README

Go-Taskflow


A general-purpose task-parallel programming framework for Go, inspired by taskflow-cpp. It builds on Go's native concurrency features and simplicity, and is suited to managing complex dependencies among concurrent tasks.

Features

  • High extensibility: Easily extend the framework to adapt to specific use cases.

  • Go's native concurrency model: Leverages goroutines to manage concurrent task execution.

  • User-friendly programming interface: Simplifies complex task dependency management in Go.

  • Static/Subflow/Conditional/Cyclic tasking: Define static tasks, condition nodes, nested subflows, and cyclic flows to improve modularity and programmability.

  • Priority task scheduling: Assign priorities to tasks; higher-priority tasks are scheduled first.

  • Built-in visualization and profiling tools: Generate visual representations of task graphs and profile task execution with the integrated tools, which makes debugging and optimization easier.

Use Cases

  • Data Pipeline: Orchestrate data processing stages that have complex dependencies.

  • AI Agent Workflow Automation: Define and run AI Agent automation workflows where tasks have a clear sequence and dependency structure.

  • Parallel Graph Tasking: Execute graph-based tasks concurrently to make full use of CPU resources.

Example

Install the latest version: go get -u github.com/noneback/go-taskflow

package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"slices"
	"strconv"
	"sync"

	gtf "github.com/noneback/go-taskflow"
)

// mergeInto merges the sorted slices dest and src into a new sorted slice.
func mergeInto(dest, src []int) []int {
	size := len(dest) + len(src)
	tmp := make([]int, 0, size)
	i, j := 0, 0
	for i < len(dest) && j < len(src) {
		if dest[i] < src[j] {
			tmp = append(tmp, dest[i])
			i++
		} else {
			tmp = append(tmp, src[j])
			j++
		}
	}

	if i < len(dest) {
		tmp = append(tmp, dest[i:]...)
	} else {
		tmp = append(tmp, src[j:]...)
	}

	return tmp
}
func main() {
	size := 100
	randomArr := make([][]int, 10)
	sortedArr := make([]int, 0, 10*size)
	mutex := &sync.Mutex{}

	for i := 0; i < 10; i++ {
		for j := 0; j < size; j++ {
			randomArr[i] = append(randomArr[i], rand.Int())
		}
	}

	sortTasks := make([]*gtf.Task, 10)
	tf := gtf.NewTaskFlow("merge sort")
	// done runs after every sort task and checks the merged result.
	done := tf.NewTask("Done", func() {
		if !slices.IsSorted(sortedArr) {
			log.Fatal("Failed")
		}
		fmt.Println("Sorted")
		fmt.Println(sortedArr[:1000])
	})

	for i := 0; i < 10; i++ {
		i := i // per-iteration copy; needed before Go 1.22, where closures share the loop variable
		sortTasks[i] = tf.NewTask("sort_"+strconv.Itoa(i), func() {
			arr := randomArr[i]
			slices.Sort(arr)
			// sortedArr is shared by all sort tasks, so merge under the lock.
			mutex.Lock()
			defer mutex.Unlock()
			sortedArr = mergeInto(sortedArr, arr)
		})

	}
	done.Succeed(sortTasks...)

	executor := gtf.NewExecutor(1000)

	executor.Run(tf).Wait()

	if err := tf.Dump(os.Stdout); err != nil {
		log.Fatal("V->", err)
	}

	if err := executor.Profile(os.Stdout); err != nil {
		log.Fatal("P->", err)
	}

}
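
The mergeInto helper carries the only non-trivial logic in the example, so it can be useful to check it in isolation. The following is a minimal standalone sketch that copies the helper (with the tail handling written as two unconditional appends, which is equivalent) and runs it on two small hand-picked inputs; the input values are illustrative only.

```go
package main

import "fmt"

// mergeInto merges two sorted slices into a new sorted slice
// (same logic as the helper in the example).
func mergeInto(dest, src []int) []int {
	tmp := make([]int, 0, len(dest)+len(src))
	i, j := 0, 0
	for i < len(dest) && j < len(src) {
		if dest[i] < src[j] {
			tmp = append(tmp, dest[i])
			i++
		} else {
			tmp = append(tmp, src[j])
			j++
		}
	}
	// At most one of the two tails is non-empty at this point.
	tmp = append(tmp, dest[i:]...)
	tmp = append(tmp, src[j:]...)
	return tmp
}

func main() {
	fmt.Println(mergeInto([]int{1, 4, 9}, []int{2, 3, 10})) // [1 2 3 4 9 10]

	// Merging into an empty destination returns a copy of src.
	fmt.Println(mergeInto(nil, []int{5, 6})) // [5 6]
}
```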

More code examples are available in the examples directory.

Benchmark

We provide a basic benchmark to give a rough estimate of performance. However, most realistic workloads are I/O-bound, so the benchmark results do not reflect their performance accurately. Don't take the numbers too seriously.

If you really care about CPU performance, we strongly recommend taskflow-cpp.

goos: linux
goarch: amd64
pkg: github.com/noneback/go-taskflow/benchmark
cpu: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz
BenchmarkC32-4             17964             68105 ns/op            7368 B/op        226 allocs/op
BenchmarkS32-4              5848            195952 ns/op            6907 B/op        255 allocs/op
BenchmarkC6-4              53138             22913 ns/op            1296 B/op         46 allocs/op
BenchmarkC8x8-4             6099            194579 ns/op           16956 B/op        503 allocs/op
PASS
ok      github.com/noneback/go-taskflow/benchmark       5.802s

Understanding Condition Tasks Correctly

The condition node is special in taskflow-cpp: it takes part not only in conditional control flow but also in looping.

This repo keeps almost the same behavior. Read ConditionTasking to avoid common pitfalls.

Error Handling in go-taskflow

Errors in Go are values. It is the user's job to handle them correctly.
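
Because a task is a plain func() with no return value, an error has to leave the task through a closure. The sketch below shows one possible way to do that. errCollector, wrap, and errStage2 are hypothetical names introduced for illustration and are not part of go-taskflow; the goroutine loop in main only stands in for an executor.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errStage2 is an illustrative error value.
var errStage2 = errors.New("stage 2 failed")

// errCollector is a hypothetical helper (not part of go-taskflow) that
// gathers errors from task closures running on different goroutines.
type errCollector struct {
	mu   sync.Mutex
	errs []error
}

// wrap adapts a function returning an error to the func() shape that
// NewTask accepts, recording any non-nil error.
func (c *errCollector) wrap(f func() error) func() {
	return func() {
		if err := f(); err != nil {
			c.mu.Lock()
			defer c.mu.Unlock()
			c.errs = append(c.errs, err)
		}
	}
}

// Err returns all recorded errors joined together, or nil if there were none.
func (c *errCollector) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return errors.Join(c.errs...)
}

func main() {
	var c errCollector
	tasks := []func(){
		c.wrap(func() error { return nil }),
		c.wrap(func() error { return errStage2 }),
	}

	// Stand-in for an executor: run every task on its own goroutine.
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(run func()) {
			defer wg.Done()
			run()
		}(t)
	}
	wg.Wait()

	fmt.Println(c.Err()) // stage 2 failed
}
```

With the real library, the wrapped closures would be passed to NewTask, and Err would be checked after the executor's Wait returns.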

Only unrecovered panics need to be handled by the framework. Currently, when one occurs, the whole parent graph is canceled and the remaining tasks are left undone. This behavior may change in the future. If you have suggestions, feel free to let me know.

If you prefer not to interrupt the whole taskflow when a panic occurs, you can handle panics yourself when registering tasks. For example:

tf.NewTask("not interrupt", func() {
	defer func() {
		if r := recover(); r != nil {
			// deal with it.
		}
	}()
	// user functions.
})
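
If many tasks need the same treatment, the recover boilerplate can be factored into a wrapper. This is a minimal sketch; recoverable and its onPanic callback are hypothetical names, not part of go-taskflow.

```go
package main

import "fmt"

// recoverable is a hypothetical helper (not part of go-taskflow). It wraps f
// so that a panic is passed to onPanic instead of escaping the task.
func recoverable(f func(), onPanic func(recovered any)) func() {
	return func() {
		defer func() {
			if r := recover(); r != nil {
				onPanic(r)
			}
		}()
		f()
	}
}

func main() {
	task := recoverable(
		func() { panic("bad input") },
		func(r any) { fmt.Println("recovered:", r) },
	)
	task() // prints "recovered: bad input"
	fmt.Println("still running")
}
```

Since the wrapper returns a func(), its result can be passed directly as the second argument of NewTask.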

How to visualize a taskflow

if err := tf.Dump(os.Stdout); err != nil {
	log.Fatal(err)
}

tf.Dump generates raw text in dot format. Use dot to render it as an SVG graph.
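
The rendering step can also be driven from Go. The sketch below assumes the Graphviz dot binary is installed and on PATH; dotCommand and the output file name taskflow.svg are hypothetical, and the graph text is a placeholder rather than real Dump output.

```go
package main

import (
	"fmt"
	"io"
	"os/exec"
	"strings"
)

// dotCommand builds a Graphviz invocation that reads dot source from src
// and writes an SVG to outPath. It assumes the dot binary is on PATH.
func dotCommand(src io.Reader, outPath string) *exec.Cmd {
	cmd := exec.Command("dot", "-Tsvg", "-o", outPath)
	cmd.Stdin = src
	return cmd
}

func main() {
	// Placeholder graph; in practice src would hold the text written by tf.Dump.
	src := strings.NewReader("digraph G { A -> B; }")
	cmd := dotCommand(src, "taskflow.svg")
	fmt.Println(cmd.Args) // [dot -Tsvg -o taskflow.svg]

	if err := cmd.Run(); err != nil {
		fmt.Println("dot failed (is Graphviz installed?):", err)
	}
}
```

To use it with a real taskflow, write the Dump output into a bytes.Buffer and pass that buffer as src.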


How to profile a taskflow

if err := executor.Profile(os.Stdout); err != nil {
	log.Fatal(err)
}

Profile generates raw text in flamegraph format. Use flamegraph to render it as a flamegraph SVG.



Documentation


Constants

const (
	HIGH = TaskPriority(iota + 1)
	NORMAL
	LOW
)

Variables

var ErrVisualizerNotSupport = errors.New("visualization not support")

Functions

This section is empty.

Types

type Condition added in v0.0.5

type Condition struct {
	// contains filtered or unexported fields
}

Condition Wrapper

type Executor

type Executor interface {
	Wait()                     // Wait blocks until all tasks have finished
	Profile(w io.Writer) error // Profile writes flame graph raw text into w
	Run(tf *TaskFlow) Executor // Run starts scheduling and executing the taskflow
}

Executor schedules and executes a taskflow

func NewExecutor

func NewExecutor(concurrency uint) Executor

NewExecutor returns an Executor with the specified maximum goroutine concurrency. A value larger than runtime.NumCPU is recommended, and it **MUST** be larger than the number of subflows.
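
One way to satisfy both bounds is to compute the value instead of hard-coding it. The sketch below is only an illustration: pickConcurrency is a hypothetical helper, not part of go-taskflow, and the caller is assumed to know how many subflows the graph contains.

```go
package main

import (
	"fmt"
	"runtime"
)

// pickConcurrency is a hypothetical helper (not part of go-taskflow) that
// returns a value larger than both runtime.NumCPU() and numSubflows.
func pickConcurrency(numSubflows uint) uint {
	n := uint(runtime.NumCPU())
	if numSubflows > n {
		n = numSubflows
	}
	return n + 1 // strictly greater than both bounds
}

func main() {
	fmt.Println(pickConcurrency(64) > 64) // true
}
```

The result has type uint, so it can be passed straight to NewExecutor.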

type Static

type Static struct {
	// contains filtered or unexported fields
}

Static Wrapper

type Subflow

type Subflow struct {
	// contains filtered or unexported fields
}

Subflow Wrapper

func (*Subflow) NewCondition added in v0.1.2

func (sf *Subflow) NewCondition(name string, predict func() uint) *Task

NewCondition returns a condition task. The predict func return value determines its successor.

func (*Subflow) NewSubflow added in v0.1.2

func (sf *Subflow) NewSubflow(name string, f func(sf *Subflow)) *Task

NewSubflow returns a subflow task

func (*Subflow) NewTask added in v0.1.2

func (sf *Subflow) NewTask(name string, f func()) *Task

NewTask returns a static task

type Task

type Task struct {
	// contains filtered or unexported fields
}

Basic component of Taskflow

func (*Task) Name

func (t *Task) Name() string

func (*Task) Precede

func (t *Task) Precede(tasks ...*Task)

Precede: every task in tasks depends on *this*. In addition, for a condition task the order of tasks matters: the predict result is used as an index into tasks, starting at 0.
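
The index rule for condition tasks can be modelled in a few lines of plain Go. This is a simplified model, not the library's implementation: pickSuccessor and the task names are hypothetical, and the treatment of an out-of-range result is an assumption made for the sketch.

```go
package main

import "fmt"

// pickSuccessor models the rule described above; it is not the library's
// implementation. successors is the list passed to Precede, in order.
func pickSuccessor(successors []string, predict func() uint) (string, bool) {
	idx := predict()
	if idx >= uint(len(successors)) {
		// Assumption of this sketch: an out-of-range result selects nothing.
		return "", false
	}
	return successors[idx], true
}

func main() {
	// Corresponds to cond.Precede(retry, commit, abort).
	successors := []string{"retry", "commit", "abort"}
	next, ok := pickSuccessor(successors, func() uint { return 1 })
	fmt.Println(next, ok) // commit true
}
```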

func (*Task) Priority added in v0.0.9

func (t *Task) Priority(p TaskPriority) *Task

Priority sets the task's scheduling priority. Note that because tasks run concurrently on goroutines, this only guarantees the order in which tasks are scheduled, not the order in which they execute.
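
The difference between scheduling order and execution order is easier to see with a toy model. The sketch below mirrors the package constants locally (HIGH is iota + 1, so HIGH=1, NORMAL=2, LOW=3) and orders a set of ready tasks by priority. readyTask and dispatchOrder are hypothetical, and that smaller values are dispatched first is an assumption of this model, not a description of the library's scheduler.

```go
package main

import (
	"fmt"
	"sort"
)

// Local mirror of the package constants: HIGH=1, NORMAL=2, LOW=3.
type TaskPriority uint

const (
	HIGH = TaskPriority(iota + 1)
	NORMAL
	LOW
)

type readyTask struct {
	name     string
	priority TaskPriority
}

// dispatchOrder is a simplified model, not the library's scheduler: it orders
// ready tasks so that smaller priority values (HIGH) are handed out first.
func dispatchOrder(ready []readyTask) []string {
	sorted := append([]readyTask(nil), ready...)
	sort.SliceStable(sorted, func(a, b int) bool {
		return sorted[a].priority < sorted[b].priority
	})
	names := make([]string, len(sorted))
	for i, t := range sorted {
		names[i] = t.name
	}
	return names
}

func main() {
	order := dispatchOrder([]readyTask{
		{"cleanup", LOW},
		{"ingest", HIGH},
		{"report", NORMAL},
	})
	fmt.Println(order) // [ingest report cleanup]
}
```

Even in this model, the order only says which task is handed to a goroutine first; once several goroutines are running, nothing forces "ingest" to finish, or even start its body, before "report".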

func (*Task) Succeed

func (t *Task) Succeed(tasks ...*Task)

Succeed: *this* depends on every task in tasks
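
Precede and Succeed describe the same kind of edge from opposite ends. The toy graph below illustrates the direction of each call and derives one valid sequential order with Kahn's algorithm. It is only a model: the graph type and the task names are hypothetical, and this is not how go-taskflow stores or runs tasks.

```go
package main

import "fmt"

// graph is a toy dependency graph used only to illustrate the direction of
// Precede and Succeed.
type graph struct {
	order []string            // insertion order, for deterministic output
	next  map[string][]string // edges: task -> tasks that must run after it
	deps  map[string]int      // number of dependencies per task
}

func newGraph() *graph {
	return &graph{next: map[string][]string{}, deps: map[string]int{}}
}

func (g *graph) add(name string) {
	if _, ok := g.deps[name]; !ok {
		g.deps[name] = 0
		g.order = append(g.order, name)
	}
}

// precede: every task in after depends on t.
func (g *graph) precede(t string, after ...string) {
	g.add(t)
	for _, a := range after {
		g.add(a)
		g.next[t] = append(g.next[t], a)
		g.deps[a]++
	}
}

// succeed: t depends on every task in before (the mirror image of precede).
func (g *graph) succeed(t string, before ...string) {
	for _, b := range before {
		g.precede(b, t)
	}
}

// run returns one valid sequential execution order (Kahn's algorithm).
func (g *graph) run() []string {
	deps := map[string]int{}
	var queue, out []string
	for _, n := range g.order {
		deps[n] = g.deps[n]
		if deps[n] == 0 {
			queue = append(queue, n)
		}
	}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		out = append(out, n)
		for _, m := range g.next[n] {
			if deps[m]--; deps[m] == 0 {
				queue = append(queue, m)
			}
		}
	}
	return out
}

func main() {
	g := newGraph()
	g.succeed("done", "sort_0", "sort_1") // done waits for both sorts
	g.precede("load", "sort_0", "sort_1") // both sorts wait for load
	fmt.Println(g.run())                  // [load sort_0 sort_1 done]
}
```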

type TaskFlow

type TaskFlow struct {
	// contains filtered or unexported fields
}

TaskFlow represents a series of tasks

func NewTaskFlow

func NewTaskFlow(name string) *TaskFlow

NewTaskFlow returns a taskflow struct

func (*TaskFlow) Dump added in v0.1.3

func (tf *TaskFlow) Dump(writer io.Writer) error

Dump writes graph dot data into writer

func (*TaskFlow) Name

func (tf *TaskFlow) Name() string

func (*TaskFlow) NewCondition added in v0.1.2

func (tf *TaskFlow) NewCondition(name string, predict func() uint) *Task

NewCondition returns an attached condition task. NOTICE: the return value of predict determines its successor.

func (*TaskFlow) NewSubflow added in v0.1.2

func (tf *TaskFlow) NewSubflow(name string, instantiate func(sf *Subflow)) *Task

NewSubflow returns an attached subflow task. NOTICE: instantiate is invoked only once, to instantiate the subflow.

func (*TaskFlow) NewTask added in v0.1.2

func (tf *TaskFlow) NewTask(name string, f func()) *Task

NewTask returns an attached static task

func (*TaskFlow) Reset

func (tf *TaskFlow) Reset()

Reset resets taskflow

type TaskPriority added in v0.0.9

type TaskPriority uint

Task scheduling priority

type Visualizer

type Visualizer interface {
	// Visualize generates raw DAG text in dot format and writes it to writer
	Visualize(tf *TaskFlow, writer io.Writer) error
}

Directories

Path Synopsis
examples
conditional command
fibonacci command
loop command
simple command
word_count command
