gotaskflow

v0.0.7
Published: Oct 22, 2024 License: Apache-2.0 Imports: 10 Imported by: 1

README

Go-Taskflow

A static DAG (Directed Acyclic Graph) task computing framework for Go, inspired by taskflow-cpp. It builds on Go's native concurrency features and aims for simplicity, and it is suited to managing complex dependencies among concurrent tasks.

Features

  • High extensibility: Easily extend the framework to adapt to various specific use cases.

  • Native Go concurrency model: Leverages Go's goroutines to manage concurrent task execution effectively.

  • User-friendly programming interface: Simplify complex task dependency management using Go.

  • Static/Subflow/Conditional tasking: Define static tasks, condition nodes, and nested subflows to enhance modularity and programmability.

  • Built-in visualization & profiling tools: Generate visual representations of tasks and profile task execution performance using integrated tools, making debugging and optimization easier.

Use Cases

  • Data Pipeline: Orchestrate data processing stages that have complex dependencies.

  • Workflow Automation: Define and run automation workflows where tasks have a clear sequence and dependency structure.

  • Parallel Tasking: Execute independent tasks concurrently to fully utilize CPU resources.

Example

Install the latest version: go get -u github.com/noneback/go-taskflow

package main

import (
	"fmt"
	"log"
	"os"
	"runtime"
	"time"

	gotaskflow "github.com/noneback/go-taskflow"
)

func main() {
	// 1. Create an executor.
	executor := gotaskflow.NewExecutor(uint(runtime.NumCPU() - 1))
	// 2. Prepare all the nodes you need and arrange their dependencies into a DAG.
	tf := gotaskflow.NewTaskFlow("G")
	A, B, C :=
		gotaskflow.NewTask("A", func() {
			fmt.Println("A")
		}),
		gotaskflow.NewTask("B", func() {
			fmt.Println("B")
		}),
		gotaskflow.NewTask("C", func() {
			fmt.Println("C")
		})

	A1, B1, C1 :=
		gotaskflow.NewTask("A1", func() {
			fmt.Println("A1")
		}),
		gotaskflow.NewTask("B1", func() {
			fmt.Println("B1")
		}),
		gotaskflow.NewTask("C1", func() {
			fmt.Println("C1")
		})
	A.Precede(B)
	C.Precede(B)
	A1.Precede(B)
	C.Succeed(A1)
	C.Succeed(B1)

	subflow := gotaskflow.NewSubflow("sub1", func(sf *gotaskflow.Subflow) {
		A2, B2, C2 :=
			gotaskflow.NewTask("A2", func() {
				fmt.Println("A2")
			}),
			gotaskflow.NewTask("B2", func() {
				fmt.Println("B2")
			}),
			gotaskflow.NewTask("C2", func() {
				fmt.Println("C2")
			})
		A2.Precede(B2)
		C2.Precede(B2)
		sf.Push(A2, B2, C2)
	})

	subflow2 := gotaskflow.NewSubflow("sub2", func(sf *gotaskflow.Subflow) {
		A3, B3, C3 :=
			gotaskflow.NewTask("A3", func() {
				fmt.Println("A3")
			}),
			gotaskflow.NewTask("B3", func() {
				fmt.Println("B3")
			}),
			gotaskflow.NewTask("C3", func() {
				fmt.Println("C3")
			})
		A3.Precede(B3)
		C3.Precede(B3)
		sf.Push(A3, B3, C3)
	})

	cond := gotaskflow.NewCondition("binary", func() uint {
		return uint(time.Now().Second() % 2)
	})
	B.Precede(cond)
	cond.Precede(subflow, subflow2)

	// 3. Push all nodes into the TaskFlow.
	tf.Push(A, B, C)
	tf.Push(A1, B1, C1, cond, subflow, subflow2)
	// 4. Run the TaskFlow via the executor.
	executor.Run(tf).Wait()

	// Visualize the DAG if you need to inspect its structure.
	if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
		log.Fatal(err)
	}
	// Profile it if you need to see which tasks are the most time-consuming.
	if err := executor.Profile(os.Stdout); err != nil {
		log.Fatal(err)
	}
}
How to visualize a taskflow
if err := gotaskflow.Visualize(tf, os.Stdout); err != nil {
	log.Fatal(err)
}

Visualize generates a raw string in dot format; use dot to draw the DAG as an SVG.

How to profile a taskflow
if err := executor.Profile(os.Stdout); err != nil {
	log.Fatal(err)
}

Profile also generates a raw string, in flamegraph format; use flamegraph to draw a flame graph SVG.

What's next

  • Conditional Tasking
  • Task Priority Schedule
  • Taskflow Loop Support

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Visualize added in v0.0.5

func Visualize(tf *TaskFlow, writer io.Writer) error

Visualize generates raw DAG text in dot format and writes it to writer.

Types

type Condition added in v0.0.5

type Condition struct {
	// contains filtered or unexported fields
}

Condition Wrapper

type Executor

type Executor interface {
	Wait()                     // Wait blocks until all tasks have finished
	Profile(w io.Writer) error // Profile writes flame graph raw text to w
	Run(tf *TaskFlow) Executor // Run starts scheduling and executing the taskflow
}

Executor schedules and executes a taskflow.

func NewExecutor

func NewExecutor(concurrency uint) Executor

NewExecutor returns an Executor with the specified maximum concurrency.

type NodeType

type NodeType string
const (
	NodeSubflow   NodeType = "subflow"   // subflow
	NodeStatic    NodeType = "static"    // static
	NodeCondition NodeType = "condition" // condition
)

type Static

type Static struct {
	// contains filtered or unexported fields
}

Static Wrapper

type Subflow

type Subflow struct {
	// contains filtered or unexported fields
}

Subflow Wrapper

func (*Subflow) Push

func (sf *Subflow) Push(tasks ...*Task)

Push pushes all tasks into the subflow.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Basic component of Taskflow

func NewCondition added in v0.0.5

func NewCondition(name string, predict func() uint) *Task

NewCondition returns a condition task. The value returned by the predict func determines which successor runs.
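The selection rule can be pictured with a small standalone sketch: the value returned by the predicate is used as an index into the successors, in the order they were passed to Precede. `selectSuccessor` is a hypothetical helper written for illustration, and the bounds check is an assumption of the sketch, not documented behavior of the package.

```go
package main

import "fmt"

// selectSuccessor mimics the documented rule: the predicate's return value is
// used as an index into the successors, in the order they were given.
// The out-of-range handling is an assumption of this sketch.
func selectSuccessor(predict func() uint, successors []string) (string, bool) {
	i := predict()
	if i >= uint(len(successors)) {
		return "", false
	}
	return successors[i], true
}

func main() {
	// Mirrors cond.Precede(subflow, subflow2) from the README example.
	successors := []string{"sub1", "sub2"}
	next, ok := selectSuccessor(func() uint { return 1 }, successors)
	fmt.Println(next, ok) // prints: sub2 true
}
```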

func NewSubflow

func NewSubflow(name string, f func(sf *Subflow)) *Task

NewSubflow returns a subflow task

func NewTask

func NewTask(name string, f func()) *Task

NewTask returns a static task

func (*Task) Name

func (t *Task) Name() string

func (*Task) Precede

func (t *Task) Precede(tasks ...*Task)

Precede makes every task in tasks depend on *this*. In addition, for a condition task, the order of tasks corresponds to the predict result: a result of i selects the i-th task, counting from 0.

func (*Task) Succeed

func (t *Task) Succeed(tasks ...*Task)

Succeed makes *this* depend on every task in tasks.

type TaskFlow

type TaskFlow struct {
	// contains filtered or unexported fields
}

TaskFlow represents a series of tasks organized as a DAG. Tasks must be added via the `Push` API.

func NewTaskFlow

func NewTaskFlow(name string) *TaskFlow

NewTaskFlow returns a taskflow struct

func (*TaskFlow) Name

func (tf *TaskFlow) Name() string

func (*TaskFlow) Push

func (tf *TaskFlow) Push(tasks ...*Task)

Push pushes all tasks into the taskflow.

func (*TaskFlow) Reset

func (tf *TaskFlow) Reset()

Reset resets taskflow

Directories

Path                            Synopsis
examples/conditional (command)
examples/simple (command)
