Overview ¶
Package taskgraph provides declarative modelling and orchestration of workflows which look like directed acyclic graphs.
Taskgraph was designed to be used for building operators which reconcile Kubernetes custom resources (custom resource reconciliation is often a multi-step process, where each step has dependencies on other steps). However, it is not in any way tied to Kubernetes, and can be used for other workflows as well.
Tasks form the nodes in the graph, and the dependencies between them form the edges between the tasks. The dependencies are modeled as typed `Key`s; the tasks declare which keys they depend on and which Keys they provide values for. There is thus an edge in the graph between the task which produces a key and each task which depends on the key. It is an error for multiple tasks to provide the same key.
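The relationship between keys and edges can be illustrated with a minimal sketch. It does not use the taskgraph API: the `task` struct and the `edges` function are hypothetical stand-ins that use plain string key names, and they only show how edges could be derived and how a doubly-provided key could be rejected.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a hypothetical stand-in for a taskgraph task: a name plus the
// names of the keys it depends on and provides.
type task struct {
	name     string
	depends  []string
	provides []string
}

// edges returns "producer -> consumer" pairs, or an error if two tasks
// provide the same key.
func edges(tasks []task) ([]string, error) {
	producer := map[string]string{}
	for _, t := range tasks {
		for _, k := range t.provides {
			if prev, ok := producer[k]; ok {
				return nil, fmt.Errorf("key %q provided by both %s and %s", k, prev, t.name)
			}
			producer[k] = t.name
		}
	}
	var out []string
	for _, t := range tasks {
		for _, k := range t.depends {
			// Keys with no producer are graph inputs and contribute no edge.
			if p, ok := producer[k]; ok {
				out = append(out, p+" -> "+t.name)
			}
		}
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	tasks := []task{
		{name: "reverseInput", depends: []string{"input"}, provides: []string{"reversed"}},
		{name: "isPalindrome", depends: []string{"input", "reversed"}, provides: []string{"result"}},
	}
	fmt.Println(edges(tasks))
}
```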
When executing a graph, keys are bound to values; tasks which depend on a key can then read the bound value and use it to perform their own work. It is an error for a task to not bind a key which the task declares that it provides.
Execution of a graph starts with the "source" tasks whose dependencies are already fulfilled (either because they do not depend on any keys, or because the dependencies have been provided as inputs to the graph). As tasks complete and produce bindings for their provided keys, the dependent tasks are triggered (with checks to wait for all dependencies to be available). As a result, execution of the graph is performed as a parallelised breadth first search of the graph.
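The execution model can be sketched with goroutines and channels. This is a simplified illustration under stated assumptions, not the package's scheduler: the `node` type and `run` function are hypothetical, values are plain ints, and each node simply blocks until its dependencies are bound rather than being triggered by its producers. The sketch assumes a valid acyclic graph in which every dependency is either an input or provided by some node; otherwise it would block forever.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical task: it runs once every key in deps has been bound.
type node struct {
	name     string
	deps     []string
	provides string
	fn       func(vals map[string]int) int
}

// run executes all nodes concurrently. Each key has a channel which is closed
// once the key is bound; a node waits on the channels of its dependencies.
func run(nodes []node, inputs map[string]int) map[string]int {
	var mu sync.Mutex
	vals := map[string]int{}
	ready := map[string]chan struct{}{}
	for k, v := range inputs {
		vals[k] = v
		ch := make(chan struct{})
		close(ch) // inputs are bound before execution starts
		ready[k] = ch
	}
	for _, n := range nodes {
		ready[n.provides] = make(chan struct{})
	}
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(n node) {
			defer wg.Done()
			for _, d := range n.deps {
				<-ready[d] // wait until the dependency is bound
			}
			mu.Lock()
			snapshot := map[string]int{}
			for _, d := range n.deps {
				snapshot[d] = vals[d]
			}
			mu.Unlock()
			res := n.fn(snapshot)
			mu.Lock()
			vals[n.provides] = res
			mu.Unlock()
			close(ready[n.provides]) // release the dependent nodes
		}(n)
	}
	wg.Wait()
	return vals
}

func main() {
	got := run([]node{
		{name: "double", deps: []string{"x"}, provides: "d",
			fn: func(v map[string]int) int { return 2 * v["x"] }},
		{name: "plusOne", deps: []string{"d"}, provides: "r",
			fn: func(v map[string]int) int { return v["d"] + 1 }},
	}, map[string]int{"x": 3})
	fmt.Println(got["r"])
}
```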
This package contains some creative APIs which work around limitations in Go generics; chief among these is the `ID` type, used in place of slices of keys because Go slices are invariant.
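The invariance problem can be shown with a small sketch. The `id` and `key` types here are hypothetical simplifications, not the package's definitions; they only illustrate why a type-parameter-less identifier lets keys of different value types share one list.

```go
package main

import "fmt"

// id is a hypothetical type-parameter-less identifier.
type id struct{ name string }

// key is a hypothetical typed key; its type parameter records the value type.
type key[T any] struct{ name string }

// ID drops the type parameter so that keys of any type can be listed together.
func (k key[T]) ID() id { return id{k.name} }

// describe accepts IDs, so keys of different value types can share one slice.
func describe(deps ...id) string {
	out := ""
	for i, d := range deps {
		if i > 0 {
			out += ","
		}
		out += d.name
	}
	return out
}

func main() {
	ks := key[string]{"input"}
	kb := key[bool]{"result"}
	// A []key[string] cannot hold kb, and it cannot be passed where a slice of
	// some other element type is expected, because Go slices are invariant.
	// Converting each key to its ID avoids the problem.
	fmt.Println(describe(ks.ID(), kb.ID()))
}
```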
Example ¶
package main

import (
	"context"
	"fmt"
	"log"

	tg "github.com/thought-machine/taskgraph"
)

var (
	keyInput    = tg.NewKey[string]("input")
	keyReversed = tg.NewNamespacedKey[string]("input", "reversed")
	keyResult   = tg.NewKey[bool]("result")
)

var taskReverseInput = tg.Reflect[string]{
	Name:      "reverseInput",
	ResultKey: keyReversed,
	Fn: func(input string) (string, error) {
		var res string
		for _, v := range input {
			res = string(v) + res
		}
		return res, nil
	},
	Depends: []any{keyInput},
}.Locate()

var taskIsPalindrome = tg.Reflect[bool]{
	Name:      "isPalindrome",
	ResultKey: keyResult,
	Fn: func(input, reversed string) (bool, error) {
		return input == reversed, nil
	},
	Depends: []any{keyInput, keyReversed},
}.Locate()

var graphIsPalindrome = tg.Must(tg.New("example_graph", tg.WithTasks(taskReverseInput, taskIsPalindrome)))

func main() {
	res, err := graphIsPalindrome.Run(context.Background(), keyInput.Bind("racecar"))
	if err != nil {
		log.Fatal(err)
	}
	val, err := keyResult.Get(res)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%t\n", val)
}
Output: true
Index ¶
- Variables
- func ErrorsMaybe(maybes ...MaybeStatus) error
- func MissingMaybe(maybes map[string]MaybeStatus) []string
- func Must[T any](val T, err error) T
- func RegisterMetrics(registry prometheus.Registerer)
- func SelectSingleMaybe[T any](maybes ...Maybe[T]) (T, error)
- type BindStatus
- type Binder
- type Binding
- type Condition
- type ConditionAnd
- type ConditionOr
- type Conditional
- type Graph
- type GraphOption
- type GraphOptions
- type ID
- type Key
- type Maybe
- type MaybeStatus
- type ReadOnlyKey
- type Reflect
- type ReflectMulti
- type Task
- func AllBound(name string, result Key[bool], deps ...ID) Task
- func NewTask(name string, fn func(context.Context, Binder) ([]Binding, error), ...) Task
- func NoOutputTask(name string, fn func(ctx context.Context, b Binder) error, depends ...ID) Task
- func SimpleTask[T any](name string, key Key[T], fn func(ctx context.Context, b Binder) (T, error), ...) Task
- func SimpleTask1[A1, Res any](name string, resKey Key[Res], ...) Task
- func SimpleTask2[A1, A2, Res any](name string, resKey Key[Res], ...) Task
- type TaskSet
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrExposedKeyNotProvided is returned from Graph.AsTask() when a key requested to be exposed is
	// not provided by any task in the graph.
	ErrExposedKeyNotProvided = errors.New("key(s) exposed but not provided by graph")
	// ErrDuplicateTaskNames is returned from New() if multiple tasks with the same name are passed to
	// it.
	ErrDuplicateTaskNames = errors.New("duplicate task names")
	// ErrDuplicateProvidedKeys is returned from New() if multiple tasks provide the same key.
	ErrDuplicateProvidedKeys = errors.New("keys provided by multiple tasks")
	// ErrGraphCycle is returned from New() if there is a cycle in the graph tasks (i.e. if a task A
	// depends on a key which is produced by some task B which depends indirectly on a key produced by
	// task A).
	ErrGraphCycle = errors.New("found cycle in graph")
	// ErrTooManyTasks is returned from New() if too many tasks are passed to it. This is a sanity
	// check to avoid taking too long to check for cycles. The limit could be increased if the cycle
	// checking is optimised.
	ErrTooManyTasks = wrapStackErrorf("too many tasks in graph (limit %d)", taskLimit)
	// ErrMissingInputs is returned from Graph.Run() if the provided inputs do not satisfy all of the
	// graph's dependencies (i.e. all task dependencies that are not provided by some other task in
	// the graph).
	ErrMissingInputs = errors.New("missing inputs")
)
var (
	// ErrIsAbsent is returned from Key[T].Get() when the Key has been bound as absent without
	// providing a more specific error.
	ErrIsAbsent = errors.New("is absent")
	// ErrIsPending is returned from Key[T].Get() when the Key has not been bound.
	ErrIsPending = errors.New("is pending")
	// ErrWrongType is returned from Key[T].Get() when the Binder contains a Binding for the Key's ID,
	// but that Binding contains a value which is not of type T. This can only happen if 2 Keys are
	// created with the same ID but different types.
	ErrWrongType = errors.New("wrong type")
)
var (
	// ErrMultipleMaybesPresent is returned from SelectSingleMaybe if multiple inputs are present.
	ErrMultipleMaybesPresent = errors.New("multiple present maybes passed to SelectSingleMaybe")
	// ErrNoMaybesPresent is returned from SelectSingleMaybe if no inputs are present.
	ErrNoMaybesPresent = errors.New("no present maybes passed to SelectSingleMaybe")
)
var (
	// ErrDuplicateBinding is returned when Binder.Store is called with a binding whose ID has already
	// been stored (which implies that the graph being executed contains multiple tasks producing
	// bindings for the same Key).
	ErrDuplicateBinding = errors.New("duplicate binding")
)
Functions ¶
func ErrorsMaybe ¶
func ErrorsMaybe(maybes ...MaybeStatus) error
ErrorsMaybe detects if there are any errors in the maybes. It returns a single error wrapping any errors present.
func MissingMaybe ¶
func MissingMaybe(maybes map[string]MaybeStatus) []string
MissingMaybe returns the map keys of any maybes which are not present.
func Must ¶
func Must[T any](val T, err error) T
Must wraps around a function returning a value and error, calls log.Fatal if the error is non-nil, and otherwise returns the value. This is intended to be used for constructing top-level Graph variables:
var (
	myGraph = taskgraph.Must(taskgraph.New("my_graph", taskgraph.WithTasks(myTask)))
)
func RegisterMetrics ¶
func RegisterMetrics(registry prometheus.Registerer)
RegisterMetrics registers all taskgraph metrics with a prometheus registry.
func SelectSingleMaybe ¶
func SelectSingleMaybe[T any](maybes ...Maybe[T]) (T, error)
SelectSingleMaybe finds the single Maybe in the given list which is present. It returns an error if no present Maybe is found, or if multiple present Maybes are found.
Types ¶
type BindStatus ¶
type BindStatus int
BindStatus represents the tristate of a Binding.
const (
	// Pending represents where the key is unbound (i.e. that no task has yet provided a binding for
	// the key, and no input binding was provided).
	Pending BindStatus = iota
	// Absent represents where the key is explicitly unbound (i.e. that the task which provides it was
	// unable to provide a value). The binding will contain an error, which is ErrIsAbsent by default
	// but can be another error to propagate information between tasks. This allows for errors which
	// do not terminate the execution of a graph.
	Absent
	// Present represents where the key is bound to a valid value (i.e. the task was able to provide a
	// value, or the value was bound as an input).
	Present
)
func (BindStatus) String ¶
func (bs BindStatus) String() string
type Binder ¶
type Binder interface {
// Store adds bindings to the binder that can be retrieved with Get().
Store(...Binding) error
// Has returns whether the given IDs have all been bound (as Present or Absent).
Has(...ID) bool
// Get returns a previously stored binding. If no binding with the given ID has yet been stored, a binding with Status() = Pending is generated.
Get(ID) Binding
// GetAll returns all stored bindings. This is typically used only for tests.
GetAll() []Binding
}
A Binder is the state store for tasks in a graph.
func NewOverlayBinder ¶
NewOverlayBinder creates a new overlay binder.
type Binding ¶
type Binding interface {
// ID returns the ID of the key which is bound by this Binding.
ID() ID
// Status returns the status of this binding.
Status() BindStatus
// Value returns the value bound to the key. This should only be called if Status() returns Present.
Value() any
// Error returns the error bound to the key. This should only be called if Status() returns Absent.
Error() error
}
A Binding is a tristate wrapper around a key ID and an optional value or error. See the documentation for BindStatus for details of the 3 states. Bindings are produced by calling the Bind, BindAbsent, or BindError methods on a Key.
type Condition ¶
type Condition interface {
// Evaluate should return whether the conditional task should be executed.
Evaluate(ctx context.Context, b Binder) (bool, error)
// Deps should return the IDs of the keys used by the Evaluate function.
Deps() []ID
}
Condition defines a condition for a Conditional task.
type ConditionAnd ¶
type ConditionAnd []ReadOnlyKey[bool]
ConditionAnd evaluates to true if and only if all of the keys it contains are bound to true.
type ConditionOr ¶
type ConditionOr []ReadOnlyKey[bool]
ConditionOr evaluates to true if any of the keys it contains are bound to true.
type Conditional ¶
type Conditional struct {
NamePrefix string
Wrapped TaskSet
Condition Condition
DefaultBindings []Binding
// contains filtered or unexported fields
}
Conditional wraps tasks such that they are only run if the given Condition evaluates to true. If it evaluates to false, the bindings in DefaultBindings are used, with any missing keys provided by the wrapped tasks bound as absent.
Note that the tasks will not run until all of the wrapped tasks' dependencies and all of the condition's dependencies have been bound.
To run tasks if keys of any type have been bound to some value (i.e. not bound as absent), use Presence() to wrap the key. To check for specific values, use Mapped() to wrap the key.
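The fallback behaviour described above can be sketched without the taskgraph types. The `result` struct and `conditional` function are hypothetical; they only illustrate the rule that, when the condition is false, provided keys take their default value if one exists and are otherwise marked absent.

```go
package main

import "fmt"

// result is a hypothetical outcome for one provided key.
type result struct {
	value  string
	absent bool
}

// conditional runs the wrapped work only if cond is true. Otherwise every key
// in provides gets its default value, or is marked absent if it has none.
func conditional(cond bool, provides []string, defaults map[string]string,
	run func() map[string]string) map[string]result {
	out := map[string]result{}
	if cond {
		for k, v := range run() {
			out[k] = result{value: v}
		}
		return out
	}
	for _, k := range provides {
		if v, ok := defaults[k]; ok {
			out[k] = result{value: v}
		} else {
			out[k] = result{absent: true}
		}
	}
	return out
}

func main() {
	got := conditional(false, []string{"a", "b"}, map[string]string{"a": "default"},
		func() map[string]string { return map[string]string{"a": "1", "b": "2"} })
	fmt.Println(got["a"].value, got["b"].absent)
}
```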
func (Conditional) Locate ¶
func (c Conditional) Locate() Conditional
Locate annotates the Conditional with its location in the source code, to make error messages easier to understand. Calling it is required.
type Graph ¶
type Graph interface {
// Check whether the given input bindings are sufficient to run the graph.
//
// This is intended to be run by a genrule at build time to assert that all keys required by tasks
// in the graph are provided either as an input or by some other task in the graph. It also checks
// that there are no duplicate inputs.
Check(inputs ...Binding) error
// Run executes the task graph with the given inputs, returning a Binder containing the bound
// values from all tasks (but not any of the input bindings).
//
// It is advisable to set a timeout on the passed context, although it is up to the individual
// tasks to listen for context cancellation.
Run(ctx context.Context, inputs ...Binding) (Binder, error)
// AsTask produces a Task which runs this Graph in full to allow composition of graphs. The task
// depends on all keys which are required by any task within it and not provided by any task
// within it. The task provides only the key IDs passed to this method; and only their bindings
// will be available in the result of any graph the task is included in (any bindings produced by
// tasks within this graph whose IDs were not passed to this method will be suppressed).
//
// Bindings for the exposed keys are added to the binder of the parent task as soon as they are
// generated by tasks within this graph, which means that tasks outside this graph which depend on
// the exposed keys can start running as soon as the producing task completes, rather than waiting
// for this entire task to complete.
AsTask(exposeKeys ...ID) (Task, error)
// Graphviz produces a graphviz representation of the graph, with the tasks as nodes and the
// dependencies as edges. This output can be pasted into tools like
// https://dreampuf.github.io/GraphvizOnline or https://dot-to-ascii.ggerganov.com/ to view the
// structure of the graph.
//
// The includeInputs parameter controls whether graph inputs are included in the output; including
// them tends to make the graph significantly more complicated and harder for the graphviz engine
// to lay out in a useful way.
Graphviz(includeInputs bool) string
}
A Graph represents a declarative workflow of tasks.
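To give a feel for the kind of text a Graphviz representation contains, here is a small sketch which renders a dependency map in the DOT language. The `toDot` function and its input shape are hypothetical, and the exact output of Graph.Graphviz is not specified here and may differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// toDot renders a map from producing task to consuming tasks as a DOT digraph.
// Names are sorted so that the output is deterministic.
func toDot(name string, consumers map[string][]string) string {
	producers := make([]string, 0, len(consumers))
	for p := range consumers {
		producers = append(producers, p)
	}
	sort.Strings(producers)

	var sb strings.Builder
	fmt.Fprintf(&sb, "digraph %q {\n", name)
	for _, p := range producers {
		cs := append([]string(nil), consumers[p]...) // copy before sorting
		sort.Strings(cs)
		for _, c := range cs {
			fmt.Fprintf(&sb, "  %q -> %q;\n", p, c)
		}
	}
	sb.WriteString("}\n")
	return sb.String()
}

func main() {
	fmt.Print(toDot("example_graph", map[string][]string{
		"reverseInput": {"isPalindrome"},
	}))
}
```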
type GraphOption ¶
type GraphOption func(opts *GraphOptions) error
func WithTasks ¶
func WithTasks(tasks ...TaskSet) GraphOption
func WithTracer ¶
func WithTracer(tracer trace.Tracer) GraphOption
type GraphOptions ¶
type GraphOptions struct {
// contains filtered or unexported fields
}
type ID ¶
type ID struct {
// contains filtered or unexported fields
}
ID represents a type-parameter-less identifier for a Key.
type Key ¶
type Key[T any] interface {
	ReadOnlyKey[T]
	// Bind this key to the given value.
	Bind(val T) Binding
	// Bind this key as absent (see the comment on the Absent BindStatus). This is equivalent to
	// calling BindError(ErrIsAbsent)
	BindAbsent() Binding
	// Bind this key as absent with a specific error (see the comment on the Absent BindStatus).
	BindError(err error) Binding
}
A Key identifies an input and/or output to a task or graph, which can be bound to a value.
func NewKey ¶
NewKey creates a new Key. This should typically be called at the top level of a package as a var.
func NewNamespacedKey ¶
NewNamespacedKey creates a new namespaced Key. This should typically be called at the top level of a package as a var.
type Maybe ¶
type Maybe[T any] struct {
	// contains filtered or unexported fields
}
Maybe encapsulates a value and error. This is primarily intended to be used by Optional(key), which in turn is primarily intended to be used to make it easier to build tasks with reflection where their dependencies may be absent.
func MaybeErr ¶
MaybeErr constructs a Maybe containing the given error and no value. If the error is nil, ErrIsAbsent is used.
type MaybeStatus ¶
The MaybeStatus interface includes only the uniformly-typed functions of a Maybe, which is useful for status aggregation.
type ReadOnlyKey ¶
type ReadOnlyKey[T any] interface {
	// ID returns the type-parameter-less ID which identifies the key.
	ID() ID
	// Location returns the file and line where this key was defined.
	Location() string
	// Get retrieves the value for this key from the binder.
	Get(Binder) (T, error)
}
ReadOnlyKey represents a key which can be read from a binder, but not bound itself.
func Mapped ¶
func Mapped[In, Out any](key ReadOnlyKey[In], fn func(In) Out) ReadOnlyKey[Out]
Mapped returns a ReadOnlyKey which applies the given mapping function when Get() is called. This is primarily intended for generating keys to use with Conditional().
func Optional ¶
func Optional[T any](base ReadOnlyKey[T]) ReadOnlyKey[Maybe[T]]
Optional returns a key that will wrap any error from the base key in a Maybe, such that Get will never return an error. This is intended for use with reflection, where a task's parameters may be absent.
func Presence ¶
func Presence[T any](key ReadOnlyKey[T]) ReadOnlyKey[bool]
Presence returns a ReadOnlyKey which returns whether the underlying key is present in the binder.
type Reflect ¶
type Reflect[T any] struct {
	// Name of the built task
	Name string
	// The key which Fn produces a value for
	ResultKey Key[T]
	// The task function. This function should:
	// * Optionally take a context.Context as the first argument
	// * Take one argument for each entry in Depends whose type matches the type parameter of the Key
	// * Return T or (T, error)
	Fn any
	// A list of Key[X], where X may be different for each key (which is why this is []any). These
	// keys are used to provide the arguments to Fn. Each key is expected to be bound as present when
	// the Task is run; it is up to the user to either wrap the built task in a Conditional() or to
	// use Optional() keys where necessary.
	Depends []any
	// contains filtered or unexported fields
}
A Reflect uses reflection to build a Task providing a single value, avoiding the need for the task function to call myKey.Get(binder) and check the error for each dependency. This is a struct to provide poor-man's named arguments.
func (Reflect[T]) Build ¶
Build the task from the parameters in the Reflect struct. This is exposed for testing; prefer using Reflect[T] as a TaskSet rather than calling Build() directly.
type ReflectMulti ¶
type ReflectMulti struct {
// Name of the built task
Name string
// The task function. This function should:
// * Optionally take a context.Context as the first argument
// * Take one argument for each entry in Depends whose type matches the type parameter of the Key
// * Return []Binding or ([]Binding, error)
Fn any
// The list of key IDs which the task provides.
Provides []ID
// A list of Key[X], where X may be different for each key (which is why this is []any). These
// keys are used to provide the arguments to Fn. Each key is expected to be bound as present when
// the Task is run; it is up to the user to either wrap the built task in a Conditional() or to
// use Optional() keys where necessary.
Depends []any
// contains filtered or unexported fields
}
A ReflectMulti uses reflection to build a Task providing multiple values, avoiding the need for the task function to call myKey.Get(binder) and check the error for each dependency. This is a struct to provide poor-man's named arguments.
func (ReflectMulti) Build ¶
func (r ReflectMulti) Build() (Task, error)
Build the task from the parameters in the ReflectMulti struct. This is exposed for testing; prefer using ReflectMulti as a TaskSet rather than calling Build() directly.
func (ReflectMulti) Locate ¶
func (r ReflectMulti) Locate() ReflectMulti
Locate annotates the ReflectMulti with its location in the source code, to make error messages easier to understand. Calling it is recommended, but not required if the ReflectMulti is wrapped in a Conditional.
func (ReflectMulti) Tasks ¶
func (r ReflectMulti) Tasks() []Task
Tasks satisfies the TaskSet interface to avoid the need to call Build(). It is equivalent to calling Must(Build()).
type Task ¶
type Task interface {
// A task can be considered to be a singleton set.
TaskSet
// Name returns the name of the task given when the task was created.
Name() string
// Depends returns the IDs of the keys on which this task depends (i.e. the keys which must be
// provided as a graph input or by another task before this task can be executed)
Depends() []ID
// Provides returns the IDs of the keys for which this task provides bindings (i.e. the list of
// Bindings returned by Execute must exactly match this list aside from ordering).
Provides() []ID
// Execute performs the unit of work for this task, consuming its dependencies from the given
// Binder, and returning Bindings for each key the task has declared that it provides. Any error
// returned from Execute() will terminate the processing of the entire graph.
Execute(context.Context, Binder) ([]Binding, error)
// Location returns the file and line where this task was defined.
Location() string
}
A Task represents a small unit of work within the graph (tasks form the nodes of the graph).
func AllBound ¶
func AllBound(name string, result Key[bool], deps ...ID) Task
AllBound returns a task which binds the result key to true without reading its dependencies.
This is intended to be used with conditional tasks to wait for multiple tasks to be completed.
func NewTask ¶
func NewTask(name string, fn func(context.Context, Binder) ([]Binding, error), depends, provides []ID) Task
NewTask builds a task with any number of inputs and outputs.
func NoOutputTask ¶
func NoOutputTask(name string, fn func(ctx context.Context, b Binder) error, depends ...ID) Task
NoOutputTask builds a task which may consume inputs but produces no output bindings.
func SimpleTask ¶
func SimpleTask[T any](name string, key Key[T], fn func(ctx context.Context, b Binder) (T, error), depends ...ID) Task
SimpleTask builds a task which produces a single output binding.
func SimpleTask1 ¶
func SimpleTask1[A1, Res any](name string, resKey Key[Res], fn func(ctx context.Context, arg1 A1) (Res, error), depKey1 ReadOnlyKey[A1]) Task
SimpleTask1 builds a task from a function taking a single argument and returning a single value plus an error.
func SimpleTask2 ¶
func SimpleTask2[A1, A2, Res any](name string, resKey Key[Res], fn func(ctx context.Context, arg1 A1, arg2 A2) (Res, error), depKey1 ReadOnlyKey[A1], depKey2 ReadOnlyKey[A2]) Task
SimpleTask2 builds a task from a function taking two arguments and returning a single value plus an error.
type TaskSet ¶
type TaskSet interface {
Tasks() []Task
}
A TaskSet defines a nestable collection of tasks. A Task fulfils the TaskSet interface, acting as a singleton set.
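The nesting behaviour can be sketched with a pair of small types. The lower-case `taskSet`, `task` and `set` names are hypothetical; they show how a single task can act as a singleton set and how nested sets flatten into one list.

```go
package main

import "fmt"

// taskSet is a hypothetical analogue of TaskSet.
type taskSet interface {
	tasks() []task
}

// task is a hypothetical task which is also a singleton set of itself.
type task struct{ name string }

func (t task) tasks() []task { return []task{t} }

// set is a nestable collection of tasks and other sets.
type set []taskSet

// tasks flattens the set recursively, preserving order.
func (s set) tasks() []task {
	var out []task
	for _, e := range s {
		out = append(out, e.tasks()...)
	}
	return out
}

func main() {
	nested := set{task{"a"}, set{task{"b"}, task{"c"}}}
	for _, t := range nested.tasks() {
		fmt.Println(t.name)
	}
}
```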
func NewTaskSet ¶
NewTaskSet creates a new TaskSet from Tasks or TaskSets (or a combination of both).
Directories
¶
| Path | Synopsis |
|---|---|
| | Package taskgraphtest provides utilities for testing Graphs and Tasks built using the taskgraph package. |