taskgraph

v1.0.0 (Latest)
Published: Sep 1, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

README

Taskgraph

Package taskgraph provides declarative modelling and orchestration of workflows which look like directed acyclic graphs.

Taskgraph was designed to be used for building operators which reconcile Kubernetes custom resources (custom resource reconciliation is often a multi-step process, where each step has dependencies on other steps). However, it is not in any way tied to Kubernetes, and can be used for other workflows as well.

Tasks form the nodes in the graph, and the dependencies between them form the edges. The dependencies are modeled as typed Keys; tasks declare which Keys they depend on and which Keys they provide values for. There is thus an edge in the graph between the task which produces a Key and each task which depends on that Key. It is an error for multiple tasks to provide the same Key.
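The way edges follow from key declarations can be sketched without the library. The `task` struct, the string key IDs, and the `edges` helper below are hypothetical stand-ins used only for illustration; they are not taskgraph's types or its implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a hypothetical stand-in for a taskgraph Task: a name plus the
// string IDs of the keys it depends on and provides.
type task struct {
	name     string
	depends  []string
	provides []string
}

// edges returns "producer -> consumer" pairs, or an error if two tasks
// provide the same key.
func edges(tasks []task) ([]string, error) {
	producer := map[string]string{} // key ID -> name of the providing task
	for _, t := range tasks {
		for _, k := range t.provides {
			if prev, ok := producer[k]; ok {
				return nil, fmt.Errorf("key %q provided by both %q and %q", k, prev, t.name)
			}
			producer[k] = t.name
		}
	}
	var out []string
	for _, t := range tasks {
		for _, k := range t.depends {
			// Keys with no producer must be supplied as graph inputs,
			// so they do not create an edge.
			if p, ok := producer[k]; ok {
				out = append(out, p+" -> "+t.name)
			}
		}
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	tasks := []task{
		{name: "reverseInput", depends: []string{"input"}, provides: []string{"reversed"}},
		{name: "isPalindrome", depends: []string{"input", "reversed"}, provides: []string{"result"}},
	}
	es, err := edges(tasks)
	fmt.Println(es, err)
}
```

Here "input" has no producer, so it has to be supplied when the graph is run, while "reversed" creates the single edge from reverseInput to isPalindrome.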

When executing a graph, keys are bound to values; tasks which depend on a key can then read the bound value and use it to perform their own work. It is an error for a task not to bind a key which it declares that it provides.

Execution of a graph starts with the "source" tasks whose dependencies are already fulfilled (either because they do not depend on any keys, or because the dependencies have been provided as inputs to the graph). As tasks complete and produce bindings for their provided keys, each dependent task is triggered once all of its dependencies are available. As a result, execution of the graph is performed as a parallelised breadth-first search of the graph.
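The execution model can be illustrated with a small, stdlib-only sketch. The `node` type and `execute` function are hypothetical and are not how taskgraph is implemented; the sketch also assumes every dependency is either an input or provided by some node, otherwise the waiting goroutine would block forever.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical task: it waits for its dependencies, then runs.
type node struct {
	name    string
	depends []string // names of the keys this node reads
	provide string   // name of the single key this node binds
	run     func(in map[string]int) int
}

// execute runs every node in its own goroutine. Each key has a channel that
// is closed once the key is bound, so dependants unblock as soon as all of
// their dependencies are available.
func execute(nodes []node, inputs map[string]int) map[string]int {
	var mu sync.Mutex
	values := map[string]int{}
	ready := map[string]chan struct{}{}
	for k, v := range inputs {
		values[k] = v
		ch := make(chan struct{})
		close(ch) // inputs are bound before any node starts
		ready[k] = ch
	}
	for _, n := range nodes {
		ready[n.provide] = make(chan struct{})
	}
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(n node) {
			defer wg.Done()
			in := map[string]int{}
			for _, d := range n.depends {
				<-ready[d] // wait until the dependency is bound
				mu.Lock()
				in[d] = values[d]
				mu.Unlock()
			}
			out := n.run(in)
			mu.Lock()
			values[n.provide] = out
			mu.Unlock()
			close(ready[n.provide]) // trigger dependants
		}(n)
	}
	wg.Wait()
	return values
}

func main() {
	nodes := []node{
		// Declared out of order on purpose: ordering comes from the keys.
		{name: "sum", depends: []string{"x", "y"}, provide: "z",
			run: func(in map[string]int) int { return in["x"] + in["y"] }},
		{name: "double", depends: []string{"x"}, provide: "y",
			run: func(in map[string]int) int { return 2 * in["x"] }},
	}
	fmt.Println(execute(nodes, map[string]int{"x": 3})["z"]) // prints 9
}
```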

This package contains some creative APIs which work around limitations in Go generics; chief among these is the ID type, which is used in place of slices of keys because Go slices are invariant.
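The invariance problem can be shown with a short sketch. The lowercase `key` and `id` types here are hypothetical simplifications written for this illustration; they only mimic the idea of erasing a key's type parameter and are not the package's definitions.

```go
package main

import "fmt"

// id is a hypothetical type-parameter-less identifier, playing the role the
// text describes for the ID type.
type id struct{ name string }

// key is a hypothetical typed key carrying its value type as a type parameter.
type key[T any] struct{ name string }

// ID erases the type parameter so keys of different value types share a type.
func (k key[T]) ID() id { return id{name: k.name} }

// describe accepts IDs, so callers can mix keys of any value type.
func describe(deps ...id) []string {
	names := make([]string, 0, len(deps))
	for _, d := range deps {
		names = append(names, d.name)
	}
	return names
}

func main() {
	input := key[string]{name: "input"}
	count := key[int]{name: "count"}

	// A []key[string] cannot hold count, and Go will not convert a
	// []key[string] into a slice of some common supertype, so the keys are
	// reduced to IDs before being collected.
	fmt.Println(describe(input.ID(), count.ID())) // prints [input count]
}
```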

Limitations and sharp edges

  • Taskgraph has no deadlock detection. In theory the fact that the graph is acyclic should prevent deadlock, but in practice there may be other potential causes for deadlock that have not been considered.
  • Taskgraph runs every task in its own goroutine, with no limitation on how many tasks can be running at the same time.
  • Taskgraph's execution is not easy to debug or understand. While there is a small amount of logging and tracing, there is no way to inspect the data being passed through the graph.
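Because there is no built-in limit on concurrently running tasks, one possible mitigation is for expensive task functions to share a semaphore of their own. The `limiter` type below is a hypothetical stdlib-only helper, not a taskgraph feature; task functions would call its `do` method around their heavy work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter is a hypothetical counting semaphore built on a buffered channel.
type limiter chan struct{}

// do runs fn while holding one of the limiter's slots.
func (l limiter) do(fn func()) {
	l <- struct{}{}
	defer func() { <-l }()
	fn()
}

// peakConcurrency starts n goroutines that all work through the limiter and
// reports the highest number observed inside the limited section at once.
func peakConcurrency(n, limit int) int32 {
	l := make(limiter, limit)
	var running, peak int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.do(func() {
				cur := atomic.AddInt32(&running, 1)
				// Raise peak to cur unless another goroutine already did.
				for {
					old := atomic.LoadInt32(&peak)
					if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
						break
					}
				}
				atomic.AddInt32(&running, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println(peakConcurrency(20, 3) <= 3) // prints true
}
```

The goroutines themselves are still created eagerly; only the work inside `do` is bounded.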

Documentation

Overview


Example
package main

import (
	"context"
	"fmt"
	"log"

	tg "github.com/thought-machine/taskgraph"
)

var (
	keyInput    = tg.NewKey[string]("input")
	keyReversed = tg.NewNamespacedKey[string]("input", "reversed")
	keyResult   = tg.NewKey[bool]("result")
)

var taskReverseInput = tg.Reflect[string]{
	Name:      "reverseInput",
	ResultKey: keyReversed,
	Fn: func(input string) (string, error) {
		var res string
		for _, v := range input {
			res = string(v) + res
		}
		return res, nil
	},
	Depends: []any{keyInput},
}.Locate()

var taskIsPalindrome = tg.Reflect[bool]{
	Name:      "isPalindrome",
	ResultKey: keyResult,
	Fn: func(input, reversed string) (bool, error) {
		return input == reversed, nil
	},
	Depends: []any{keyInput, keyReversed},
}.Locate()

var graphIsPalindrome = tg.Must(tg.New("example_graph", tg.WithTasks(taskReverseInput, taskIsPalindrome)))

func main() {
	res, err := graphIsPalindrome.Run(context.Background(), keyInput.Bind("racecar"))
	if err != nil {
		log.Fatal(err)
	}
	val, err := keyResult.Get(res)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%t\n", val)

}
Output:

true

Constants

This section is empty.

Variables

var (
	// ErrExposedKeyNotProvided is returned from Graph.AsTask() when a key requested to be exposed is
	// not provided by any task in the graph.
	ErrExposedKeyNotProvided = errors.New("key(s) exposed but not provided by graph")

	// ErrDuplicateTaskNames is returned from New() if multiple tasks with the same name are passed to
	// it.
	ErrDuplicateTaskNames = errors.New("duplicate task names")

	// ErrDuplicateProvidedKeys is returned from New() if multiple tasks provide the same key.
	ErrDuplicateProvidedKeys = errors.New("keys provided by multiple tasks")

	// ErrGraphCycle is returned from New() if there is a cycle in the graph tasks (i.e. if a task A
	// depends on a key which is produced by some task B which depends indirectly on a key produced by
	// task A).
	ErrGraphCycle = errors.New("found cycle in graph")

	// ErrTooManyTasks is returned from New() if too many tasks are passed to it. This is a sanity
	// check to avoid taking too long to check for cycles. The limit could be increased if the cycle
	// checking is optimised.
	ErrTooManyTasks = wrapStackErrorf("too many tasks in graph (limit %d)", taskLimit)

	// ErrMissingInputs is returned from Graph.Run() if the provided inputs do not satisfy all of the
	// graph's dependencies (i.e. all task dependencies that are not provided by some other task in
	// the graph).
	ErrMissingInputs = errors.New("missing inputs")
)
var (
	// ErrIsAbsent is returned from Key[T].Get() when the Key has been bound as absent without
	// providing a more specific error.
	ErrIsAbsent = errors.New("is absent")

	// ErrIsPending is returned from Key[T].Get() when the Key has not been bound.
	ErrIsPending = errors.New("is pending")

	// ErrWrongType is returned from Key[T].Get() when the Binder contains a Binding for the Key's ID,
	// but that Binding contains a value which is not of type T. This can only happen if 2 Keys are
	// created with the same ID but different types.
	ErrWrongType = errors.New("wrong type")
)
var (
	// ErrMultipleMaybesPresent is returned from SelectSingleMaybe if multiple inputs are present.
	ErrMultipleMaybesPresent = errors.New("multiple present maybes passed to SelectSingleMaybe")
	// ErrNoMaybesPresent is returned from SelectSingleMaybe if no inputs are present.
	ErrNoMaybesPresent = errors.New("no present maybes passed to SelectSingleMaybe")
)
var (
	// ErrDuplicateBinding is returned when Binder.Store is called with a binding whose ID has already
	// been stored (which implies that the graph being executed contains multiple tasks producing
	// bindings for the same Key).
	ErrDuplicateBinding = errors.New("duplicate binding")
)
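
The condition described for ErrGraphCycle can be checked with a depth-first search. This is one common approach shown as a sketch; the documentation does not say which algorithm New() actually uses, and the `findCycle` helper and its map-of-names input are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var errCycle = errors.New("found cycle in graph")

// findCycle does a depth-first search over deps, where deps[a] lists the
// tasks that task a depends on. It returns an error wrapping errCycle if a
// task is reached again while it is still being explored.
func findCycle(deps map[string][]string) error {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := map[string]int{} // missing entries read as unvisited
	var visit func(n string) error
	visit = func(n string) error {
		switch state[n] {
		case inProgress:
			return fmt.Errorf("%w: reached task %q twice", errCycle, n)
		case done:
			return nil
		}
		state[n] = inProgress
		for _, d := range deps[n] {
			if err := visit(d); err != nil {
				return err
			}
		}
		state[n] = done
		return nil
	}
	for n := range deps {
		if err := visit(n); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	acyclic := map[string][]string{"c": {"b"}, "b": {"a"}, "a": nil}
	cyclic := map[string][]string{"a": {"b"}, "b": {"a"}}
	fmt.Println(findCycle(acyclic) == nil, errors.Is(findCycle(cyclic), errCycle))
}
```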

Functions

func ErrorsMaybe

func ErrorsMaybe(maybes ...MaybeStatus) error

ErrorsMaybe detects if there are any errors in the maybes. It returns a single error wrapping any errors present.

func MissingMaybe

func MissingMaybe(maybes map[string]MaybeStatus) []string

MissingMaybe returns the names (map keys) of any maybes which are not present.

func Must

func Must[T any](val T, err error) T

Must wraps around a function returning a value and error, calls log.Fatal if the error is non-nil, and otherwise returns the value. This is intended to be used for constructing top-level Graph variables:

var (
  myGraph = taskgraph.Must(taskgraph.New("my_graph", taskgraph.WithTasks(myTask)))
)

func RegisterMetrics

func RegisterMetrics(registry prometheus.Registerer)

RegisterMetrics registers all taskgraph metrics with a prometheus registry.

func SelectSingleMaybe

func SelectSingleMaybe[T any](maybes ...Maybe[T]) (T, error)

SelectSingleMaybe finds the single Maybe in the given list which is present. It returns an error if no present Maybe is found, or if multiple present Maybes are found.
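
The selection rule can be sketched with a simplified value-or-error type. The `maybe` type, `selectSingle` function, and error variables below are hypothetical re-creations for illustration only, and treating "present" as "error is nil" is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// maybe is a hypothetical stand-in for Maybe[T]: a value or an error.
type maybe[T any] struct {
	val T
	err error
}

// present is assumed here to mean that no error is stored.
func (m maybe[T]) present() bool { return m.err == nil }

var (
	errMultiple = errors.New("multiple present maybes")
	errNone     = errors.New("no present maybes")
)

// selectSingle returns the value of the only present maybe, or an error if
// zero or more than one are present.
func selectSingle[T any](maybes ...maybe[T]) (T, error) {
	var zero T
	var found *maybe[T]
	for i := range maybes {
		if !maybes[i].present() {
			continue
		}
		if found != nil {
			return zero, errMultiple
		}
		found = &maybes[i]
	}
	if found == nil {
		return zero, errNone
	}
	return found.val, nil
}

func main() {
	missing := maybe[string]{err: errors.New("is absent")}
	fromConfig := maybe[string]{val: "eu-west-1"}
	fmt.Println(selectSingle(missing, fromConfig))
}
```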

Types

type BindStatus

type BindStatus int

BindStatus represents the tristate of a Binding.

const (
	// Pending represents where the key is unbound (i.e. that no task has yet provided a binding for
	// the key, and no input binding was provided).
	Pending BindStatus = iota

	// Absent represents where the key is explicitly unbound (i.e. that the task which provides it was
	// unable to provide a value). The binding will contain an error, which is ErrIsAbsent by default
	// but can be another error to propagate information between tasks. This allows for errors which
	// do not terminate the execution of a graph.
	Absent

	// Present represents where the key is bound to a valid value (i.e. the task was able to provide a
	// value, or the value was bound as an input).
	Present
)

func (BindStatus) String

func (bs BindStatus) String() string

type Binder

type Binder interface {
	// Store adds bindings to the binder that can be retrieved with Get().
	Store(...Binding) error

	// Has returns whether the given IDs have all been bound (as Present or Absent).
	Has(...ID) bool

	// Get a previously stored binding. If no binding with the given ID has yet been stored, a binding with Status() = Pending is generated.
	Get(ID) Binding

	// GetAll returns all stored bindings. This is typically used only for tests.
	GetAll() []Binding
}

A Binder is the state store for tasks in a graph.
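
The contract described by the interface comments can be sketched as a small in-memory store. Everything here is a hypothetical simplification (string IDs, a struct instead of the Binding interface, a mutex for concurrent access) and is not the implementation returned by NewBinder.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// status mirrors the three states described for BindStatus.
type status int

const (
	pending status = iota
	absent
	present
)

// binding is a hypothetical, simplified Binding with a string ID.
type binding struct {
	id     string
	status status
	value  any
	err    error
}

var errDuplicate = errors.New("duplicate binding")

// binder is a minimal in-memory store guarded by a mutex (an assumption,
// since tasks may run concurrently).
type binder struct {
	mu       sync.RWMutex
	bindings map[string]binding
}

func newBinder() *binder { return &binder{bindings: map[string]binding{}} }

// store adds bindings, rejecting an ID that has already been stored.
func (b *binder) store(bs ...binding) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, bd := range bs {
		if _, ok := b.bindings[bd.id]; ok {
			return fmt.Errorf("%w: %s", errDuplicate, bd.id)
		}
		b.bindings[bd.id] = bd
	}
	return nil
}

// has reports whether all of the given IDs have been stored.
func (b *binder) has(ids ...string) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, id := range ids {
		if _, ok := b.bindings[id]; !ok {
			return false
		}
	}
	return true
}

// get returns the stored binding, or a pending binding if none exists.
func (b *binder) get(id string) binding {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if bd, ok := b.bindings[id]; ok {
		return bd
	}
	return binding{id: id, status: pending}
}

func main() {
	b := newBinder()
	_ = b.store(binding{id: "input", status: present, value: "racecar"})
	fmt.Println(b.has("input"), b.get("result").status == pending)
	fmt.Println(b.store(binding{id: "input", status: absent}))
}
```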

func NewBinder

func NewBinder() Binder

NewBinder returns a new binder.

func NewOverlayBinder

func NewOverlayBinder(base, overlay Binder) Binder

NewOverlayBinder creates a new overlay binder.

func TestOnlyNewGraphTaskBinder

func TestOnlyNewGraphTaskBinder(internal, external Binder, exposeKeys set.Set[ID]) Binder

TestOnlyNewGraphTaskBinder creates a new graph task binder. This is exported for testing, and should not be called in production code.

type Binding

type Binding interface {
	// ID returns the ID of the key which is bound by this Binding.
	ID() ID

	// Status returns the status of this binding.
	Status() BindStatus

	// Value returns the value bound to the key. This should only be called if Status() returns Present.
	Value() any

	// Error returns the error bound to the key. This should only be called if Status() returns Absent.
	Error() error
}

A Binding is a tristate wrapper around a key ID and an optional value or error. See the documentation for BindStatus for details of the 3 states. Bindings are produced by calling the Bind, BindAbsent, or BindError methods on a Key.

type Condition

type Condition interface {
	// Evaluate should return whether the conditional task should be executed.
	Evaluate(ctx context.Context, b Binder) (bool, error)
	// Deps should return the IDs of the keys used by the Evaluate function.
	Deps() []ID
}

Condition defines a condition for a Conditional task.

type ConditionAnd

type ConditionAnd []ReadOnlyKey[bool]

ConditionAnd evaluates to true if and only if all of the keys it contains are bound to true.

func (ConditionAnd) Deps

func (ca ConditionAnd) Deps() []ID

Deps is Condition.Deps.

func (ConditionAnd) Evaluate

func (ca ConditionAnd) Evaluate(_ context.Context, b Binder) (bool, error)

Evaluate is Condition.Evaluate.

type ConditionOr

type ConditionOr []ReadOnlyKey[bool]

ConditionOr evaluates to true if any of the keys it contains are bound to true.

func (ConditionOr) Deps

func (co ConditionOr) Deps() []ID

Deps is Condition.Deps.

func (ConditionOr) Evaluate

func (co ConditionOr) Evaluate(_ context.Context, b Binder) (bool, error)

Evaluate is Condition.Evaluate.

type Conditional

type Conditional struct {
	NamePrefix      string
	Wrapped         TaskSet
	Condition       Condition
	DefaultBindings []Binding
	// contains filtered or unexported fields
}

Conditional wraps tasks such that they are only run if the given Condition evaluates to true. If it evaluates to false, the bindings in DefaultBindings are used, with any missing keys provided by the wrapped tasks bound as absent.

Note that the tasks will not run until all of the wrapped tasks' dependencies and all of the condition's dependencies have been bound.

To run tasks if keys of any type have been bound to some value (i.e. not bound as absent), use Presence() to wrap the key. To check for specific values, use Mapped() to wrap the key.

func (Conditional) Locate

func (c Conditional) Locate() Conditional

Locate annotates the Conditional with its location in the source code, to make error messages easier to understand. Calling it is required.

func (Conditional) Tasks

func (c Conditional) Tasks() []Task

Tasks satisfies TaskSet.Tasks.

type Graph

type Graph interface {
	// Check whether the given input bindings are sufficient to run the graph.
	//
	// This is intended to be run by a genrule at build time to assert that all keys required by tasks
	// in the graph are provided either as an input or by some other task in the graph. It also checks
	// that there are no duplicate inputs.
	Check(inputs ...Binding) error

	// Run executes the task graph with the given inputs, returning a Binder containing the bound
	// values from all tasks (but not any of the input bindings).
	//
	// It is advisable to set a timeout on the passed context, although it is up to the individual
	// tasks to listen for context cancellation.
	Run(ctx context.Context, inputs ...Binding) (Binder, error)

	// AsTask produces a Task which runs this Graph in full to allow composition of graphs. The task
	// depends on all keys which are required by any task within it and not provided by any task
	// within it. The task provides only the key IDs passed to this method; and only their bindings
	// will be available in the result of any graph the task is included in (any bindings produced by
	// tasks within this graph whose IDs were not passed to this method will be suppressed).
	//
	// Bindings for the exposed keys are added to the binder of the parent task as soon as they are
	// generated by tasks within this graph, which means that tasks outside this graph which depend on
	// the exposed keys can start running as soon as the producing task completes, rather than waiting
	// for this entire task to complete.
	AsTask(exposeKeys ...ID) (Task, error)

	// Graphviz produces a graphviz representation of the graph, with the tasks as nodes and the
	// dependencies as edges. This output can be pasted into tools like
	// https://dreampuf.github.io/GraphvizOnline or https://dot-to-ascii.ggerganov.com/ to view the
	// structure of the graph.
	//
	// The includeInputs parameter controls whether graph inputs are included in the output; including
	// them tends to make the graph significantly more complicated and harder for the graphviz engine
	// to lay out in a useful way.
	Graphviz(includeInputs bool) string
}

A Graph represents a declarative workflow of tasks.

func New

func New(name string, opts ...GraphOption) (Graph, error)

New creates a new Graph. Exactly one WithTasks option should be passed.

Ideally, Graphs should be created on program startup, rather than creating them dynamically.

type GraphOption

type GraphOption func(opts *GraphOptions) error

func WithTasks

func WithTasks(tasks ...TaskSet) GraphOption

func WithTracer

func WithTracer(tracer trace.Tracer) GraphOption

type GraphOptions

type GraphOptions struct {
	// contains filtered or unexported fields
}

type ID

type ID struct {
	// contains filtered or unexported fields
}

ID represents a type-parameter-less identifier for a Key.

func (ID) String

func (i ID) String() string

type Key

type Key[T any] interface {
	ReadOnlyKey[T]

	// Bind this key to the given value.
	Bind(val T) Binding

	// Bind this key as absent (see the comment on the Absent BindStatus). This is equivalent to
	// calling BindError(ErrIsAbsent)
	BindAbsent() Binding

	// Bind this key as absent with a specific error (see the comment on the Absent BindStatus).
	BindError(err error) Binding
}

A Key identifies an input and/or output to a task or graph, which can be bound to a value.

func NewKey

func NewKey[T any](id string) Key[T]

NewKey creates a new Key. This should typically be called at the top level of a package as a var.

func NewNamespacedKey

func NewNamespacedKey[T any](namespace, id string) Key[T]

NewNamespacedKey creates a new namespaced Key. This should typically be called at the top level of a package as a var.

type Maybe

type Maybe[T any] struct {
	// contains filtered or unexported fields
}

Maybe encapsulates a value and error. This is primarily intended to be used by Optional(key), which in turn is primarily intended to be used to make it easier to build tasks with reflection where their dependencies may be absent.

func MaybeErr

func MaybeErr[T any](err error) Maybe[T]

MaybeErr constructs a Maybe containing the given error and no value. If the error is nil, ErrIsAbsent is used.

func MaybeOf

func MaybeOf[T any](val T) Maybe[T]

MaybeOf constructs a Maybe containing the given value and no error.

func WrapMaybe

func WrapMaybe[T any](val T, err error) Maybe[T]

WrapMaybe takes the output of a function returning (T, error) and encapsulates it into a Maybe[T]. The value is presumed to be valid if err is nil.

func (Maybe[T]) Error

func (m Maybe[T]) Error() error

Error returns the encapsulated error.

func (Maybe[T]) Get

func (m Maybe[T]) Get() (T, error)

Get the encapsulated value and error.

func (Maybe[T]) Present

func (m Maybe[T]) Present() bool

Present reports whether the Maybe contains a value.

type MaybeStatus

type MaybeStatus interface {
	Error() error
	Present() bool
}

The MaybeStatus interface includes only the uniformly-typed methods of a Maybe, which is useful for status aggregation.

type ReadOnlyKey

type ReadOnlyKey[T any] interface {
	// ID returns the type-parameter-less ID which identifies the key.
	ID() ID

	// Location returns the file and line where this key was defined.
	Location() string

	// Get retrieves the value for this key from the binder.
	Get(Binder) (T, error)
}

ReadOnlyKey represents a key which can be read from a binder, but not bound itself.

func Mapped

func Mapped[In, Out any](key ReadOnlyKey[In], fn func(In) Out) ReadOnlyKey[Out]

Mapped returns a ReadOnlyKey which applies the given mapping function when Get() is called. This is primarily intended for generating keys to use with Conditional().
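
The idea of a derived read-only key can be sketched as follows. The lowercase `readOnlyKey`, `baseKey`, and `mapped` names are hypothetical simplifications that read from a plain map rather than a Binder, and passing the base key's error through unchanged is an assumption about behaviour, not something the documentation states.

```go
package main

import "fmt"

// readOnlyKey is a hypothetical, simplified analogue of ReadOnlyKey[T] that
// reads from a plain map instead of a Binder.
type readOnlyKey[T any] interface {
	get(store map[string]any) (T, error)
}

// baseKey looks its value up by name and checks the stored type.
type baseKey[T any] struct{ name string }

func newKey[T any](name string) readOnlyKey[T] { return baseKey[T]{name: name} }

func (k baseKey[T]) get(store map[string]any) (T, error) {
	var zero T
	raw, ok := store[k.name]
	if !ok {
		return zero, fmt.Errorf("key %q is not bound", k.name)
	}
	val, ok := raw.(T)
	if !ok {
		return zero, fmt.Errorf("key %q holds a %T", k.name, raw)
	}
	return val, nil
}

// mappedKey applies fn to the inner key's value each time get is called.
type mappedKey[In, Out any] struct {
	inner readOnlyKey[In]
	fn    func(In) Out
}

func (m mappedKey[In, Out]) get(store map[string]any) (Out, error) {
	in, err := m.inner.get(store)
	if err != nil {
		var zero Out
		return zero, err // assumption: errors pass through unchanged
	}
	return m.fn(in), nil
}

func mapped[In, Out any](key readOnlyKey[In], fn func(In) Out) readOnlyKey[Out] {
	return mappedKey[In, Out]{inner: key, fn: fn}
}

func main() {
	replicas := newKey[int]("replicas")
	scaledUp := mapped(replicas, func(n int) bool { return n > 1 })
	ok, err := scaledUp.get(map[string]any{"replicas": 3})
	fmt.Println(ok, err) // prints true <nil>
}
```

A boolean key derived this way is the kind of value a condition over keys can consume.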

func Not

func Not(key ReadOnlyKey[bool]) ReadOnlyKey[bool]

Not negates a boolean key.

func Optional

func Optional[T any](base ReadOnlyKey[T]) ReadOnlyKey[Maybe[T]]

Optional returns a key that will wrap any error from the base key in a Maybe, such that Get will never return an error. This is intended for use with reflection, where a task's parameters may be absent.

func Presence

func Presence[T any](key ReadOnlyKey[T]) ReadOnlyKey[bool]

Presence returns a ReadOnlyKey which reports whether the underlying key is present in the binder.

type Reflect

type Reflect[T any] struct {
	// Name of the built task
	Name string

	// The key which Fn produces a value for
	ResultKey Key[T]

	// The task function. This function should:
	//  * Optionally take a context.Context as the first argument
	//  * Take one argument for each entry in Depends whose type matches the type parameter of the Key
	//  * Return T or (T, error)
	Fn any

	// A list of Key[X], where X may be different for each key (which is why this is []any). These
	// keys are used to provide the arguments to Fn. Each key is expected to be bound as present when
	// the Task is run; it is up to the user to either wrap the built task in a Conditional() or to
	// use Optional() keys where necessary.
	Depends []any
	// contains filtered or unexported fields
}

A Reflect uses reflection to build a Task providing a single value, avoiding the need for the task function to call myKey.Get(binder) and check the error for each dependency. This is a struct to provide poor-man's named arguments.
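
The general technique of calling a task function through reflection can be sketched with the standard reflect package. The `call` helper below is hypothetical and far simpler than whatever Reflect does internally: it has no context.Context handling, rejects variadic functions, and only accepts functions returning (T, error).

```go
package main

import (
	"fmt"
	"reflect"
)

var errorType = reflect.TypeOf((*error)(nil)).Elem()

// call invokes fn with args via reflection. fn must be a non-variadic
// function whose parameters match args and which returns (T, error).
func call[T any](fn any, args ...any) (T, error) {
	var zero T
	fv := reflect.ValueOf(fn)
	if fv.Kind() != reflect.Func {
		return zero, fmt.Errorf("fn is a %T, not a function", fn)
	}
	ft := fv.Type()
	if ft.IsVariadic() || ft.NumIn() != len(args) {
		return zero, fmt.Errorf("fn must take exactly %d fixed arguments", len(args))
	}
	if ft.NumOut() != 2 || !ft.Out(1).Implements(errorType) {
		return zero, fmt.Errorf("fn must return (value, error)")
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		av := reflect.ValueOf(a)
		if !av.IsValid() || !av.Type().AssignableTo(ft.In(i)) {
			return zero, fmt.Errorf("argument %d has type %T, want %s", i, a, ft.In(i))
		}
		in[i] = av
	}
	out := fv.Call(in)
	if errVal := out[1].Interface(); errVal != nil {
		return zero, errVal.(error)
	}
	res, ok := out[0].Interface().(T)
	if !ok {
		return zero, fmt.Errorf("result has type %s", out[0].Type())
	}
	return res, nil
}

func main() {
	isPalindrome := func(input, reversed string) (bool, error) {
		return input == reversed, nil
	}
	fmt.Println(call[bool](isPalindrome, "racecar", "racecar")) // prints true <nil>
}
```

The checks before `Call` matter because `reflect.Value.Call` panics on mismatched arguments rather than returning an error.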

func (Reflect[T]) Build

func (r Reflect[T]) Build() (Task, error)

Build the task from the parameters in the Reflect struct. This is exposed for testing; prefer using Reflect[T] as a TaskSet rather than calling Build() directly.

func (Reflect[T]) Locate

func (r Reflect[T]) Locate() Reflect[T]

Locate annotates the Reflect with its location in the source code, to make error messages easier to understand. Calling it is recommended, but not required if the Reflect is wrapped in a Conditional.

func (Reflect[T]) Tasks

func (r Reflect[T]) Tasks() []Task

Tasks satisfies the TaskSet interface to avoid the need to call Build(). It is equivalent to calling Must(Build()).

type ReflectMulti

type ReflectMulti struct {
	// Name of the built task
	Name string

	// The task function. This function should:
	//  * Optionally take a context.Context as the first argument
	//  * Take one argument for each entry in Depends whose type matches the type parameter of the Key
	//  * Return []Binding or ([]Binding, error)
	Fn any

	// The list of key IDs which the task provides.
	Provides []ID

	// A list of Key[X], where X may be different for each key (which is why this is []any). These
	// keys are used to provide the arguments to Fn. Each key is expected to be bound as present when
	// the Task is run; it is up to the user to either wrap the built task in a Conditional() or to
	// use Optional() keys where necessary.
	Depends []any
	// contains filtered or unexported fields
}

A ReflectMulti uses reflection to build a Task providing multiple values, avoiding the need for the task function to call myKey.Get(binder) and check the error for each dependency. This is a struct to provide poor-man's named arguments.

func (ReflectMulti) Build

func (r ReflectMulti) Build() (Task, error)

Build the task from the parameters in the ReflectMulti struct. This is exposed for testing; prefer using ReflectMulti as a TaskSet rather than calling Build() directly.

func (ReflectMulti) Locate

func (r ReflectMulti) Locate() ReflectMulti

Locate annotates the ReflectMulti with its location in the source code, to make error messages easier to understand. Calling it is recommended, but not required if the ReflectMulti is wrapped in a Conditional.

func (ReflectMulti) Tasks

func (r ReflectMulti) Tasks() []Task

Tasks satisfies the TaskSet interface to avoid the need to call Build(). It is equivalent to calling Must(Build()).

type Task

type Task interface {
	// A task can be considered to be a singleton set.
	TaskSet

	// Name returns the name of the task given when the task was created.
	Name() string
	// Depends returns the IDs of the keys on which this task depends (i.e. the keys which must be
	// provided as a graph input or by another task before this task can be executed)
	Depends() []ID
	// Provides returns the IDs of the keys for which this task provides bindings (i.e. the list of
	// Bindings returned by Execute must exactly match this list aside from ordering).
	Provides() []ID
	// Execute performs the unit of work for this task, consuming its dependencies from the given
	// Binder, and returning Bindings for each key the task has declared that it provides. Any error
	// returned from Execute() will terminate the processing of the entire graph.
	Execute(context.Context, Binder) ([]Binding, error)
	// Location returns the file and line where this task was defined.
	Location() string
}

A Task represents a small unit of work within the graph (tasks form the nodes of the graph).

func AllBound

func AllBound(name string, result Key[bool], deps ...ID) Task

AllBound returns a task which binds the result key to true without reading its dependencies.

This is intended to be used with conditional tasks to wait for multiple tasks to be completed.

func NewTask

func NewTask(name string, fn func(context.Context, Binder) ([]Binding, error), depends, provides []ID) Task

NewTask builds a task with any number of inputs and outputs.

func NoOutputTask

func NoOutputTask(name string, fn func(ctx context.Context, b Binder) error, depends ...ID) Task

NoOutputTask builds a task which may consume inputs but produces no output bindings.

func SimpleTask

func SimpleTask[T any](name string, key Key[T], fn func(ctx context.Context, b Binder) (T, error), depends ...ID) Task

SimpleTask builds a task which produces a single output binding.

func SimpleTask1

func SimpleTask1[A1, Res any](name string, resKey Key[Res], fn func(ctx context.Context, arg1 A1) (Res, error), depKey1 ReadOnlyKey[A1]) Task

SimpleTask1 builds a task from a function taking a single argument and returning a single value plus an error.

func SimpleTask2

func SimpleTask2[A1, A2, Res any](name string, resKey Key[Res], fn func(ctx context.Context, arg1 A1, arg2 A2) (Res, error), depKey1 ReadOnlyKey[A1], depKey2 ReadOnlyKey[A2]) Task

SimpleTask2 builds a task from a function taking two arguments and returning a single value plus an error.

type TaskSet

type TaskSet interface {
	Tasks() []Task
}

A TaskSet defines a nestable collection of tasks. A Task fulfils the TaskSet interface, acting as a singleton set.

func NewTaskSet

func NewTaskSet(els ...TaskSet) TaskSet

NewTaskSet creates a new TaskSet from Tasks or TaskSets (or a combination of both).

Directories

Path           Synopsis
taskgraphtest  Package taskgraphtest provides utilities for testing Graphs and Tasks built using the taskgraph package.
