Documentation ¶
Overview ¶
Package scheduler ensures all tasks in a pipeline and child pipelines are loaded and executed in the order they need to be, parallelizing where possible.
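The idea of running tasks in dependency order while parallelizing where possible can be illustrated with a small standalone sketch. It does not use this package's types and is not a description of how Scheduler is implemented: `runInWaves` and the `deps` map are hypothetical names chosen for illustration. Each pass collects every node whose dependencies have finished and runs that group concurrently.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runInWaves is a hypothetical helper, not part of this package.
// deps maps a node name to the names it depends on. The returned slice
// lists the waves in execution order; nodes within one wave run concurrently.
func runInWaves(deps map[string][]string, run func(name string)) ([][]string, error) {
	done := make(map[string]bool, len(deps))
	var waves [][]string
	for len(done) < len(deps) {
		// Collect every unfinished node whose dependencies are all done.
		var ready []string
		for name, parents := range deps {
			if done[name] {
				continue
			}
			ok := true
			for _, p := range parents {
				if !done[p] {
					ok = false
					break
				}
			}
			if ok {
				ready = append(ready, name)
			}
		}
		if len(ready) == 0 {
			return nil, fmt.Errorf("cycle or missing dependency detected")
		}
		sort.Strings(ready) // deterministic reporting order only

		var wg sync.WaitGroup
		for _, name := range ready {
			wg.Add(1)
			go func(n string) {
				defer wg.Done()
				run(n)
			}(name)
		}
		wg.Wait()

		for _, name := range ready {
			done[name] = true
		}
		waves = append(waves, ready)
	}
	return waves, nil
}

func main() {
	deps := map[string][]string{
		"format": nil,
		"lint":   nil,
		"build":  {"format", "lint"},
	}
	waves, err := runInWaves(deps, func(name string) { fmt.Println("running", name) })
	fmt.Println(waves, err)
}
```

Here "format" and "lint" share a wave and may print in either order, while "build" always runs after both.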
Index ¶
- Constants
- Variables
- type ExecutionGraph
- func (g *ExecutionGraph) AddStage(stage *Stage) error
- func (g *ExecutionGraph) BFSNodesFlattened(nodeName string) StageList
- func (g *ExecutionGraph) Children(node string) map[string]*Stage
- func (g *ExecutionGraph) Denormalize() (*ExecutionGraph, error)
- func (g *ExecutionGraph) Duration() time.Duration
- func (g *ExecutionGraph) Error() error
- func (graph *ExecutionGraph) Flatten(nodeName string, ancestralParentNames []string, ...)
- func (g *ExecutionGraph) LastError() error
- func (g *ExecutionGraph) Name() string
- func (g *ExecutionGraph) Node(name string) (*Stage, error)
- func (g *ExecutionGraph) Nodes() map[string]*Stage
- func (g *ExecutionGraph) Parents(name string) map[string]*Stage
- func (g *ExecutionGraph) VisitNodes(callback func(node *Stage) (done bool), recursive bool)
- func (g *ExecutionGraph) WithAlias(v string) *ExecutionGraph
- func (g *ExecutionGraph) WithStageError(stage *Stage, err error)
- type GraphError
- type Scheduler
- type Stage
- func (s *Stage) Duration() time.Duration
- func (s *Stage) End() time.Time
- func (s *Stage) Env() *variables.Variables
- func (s *Stage) EnvFile() *utils.Envfile
- func (s *Stage) FromStage(originalStage *Stage, existingGraph *ExecutionGraph, ancestralParents []string) *Stage
- func (s *Stage) ReadStatus() int32
- func (s *Stage) Start() time.Time
- func (s *Stage) UpdateStatus(status int32)
- func (s *Stage) Variables() *variables.Variables
- func (s *Stage) WithEnd(v time.Time) *Stage
- func (s *Stage) WithEnv(v *variables.Variables)
- func (s *Stage) WithEnvFile(v *utils.Envfile)
- func (s *Stage) WithStart(v time.Time) *Stage
- func (s *Stage) WithVariables(v *variables.Variables)
- type StageList
- type StageOpts
- type StageTable
Examples ¶
Constants ¶
const (
	StatusWaiting int32 = iota
	StatusRunning
	StatusSkipped
	StatusDone
	StatusError
	StatusCanceled
)
Stage statuses
const (
RootNodeName = "root"
)
Variables ¶
Functions ¶
This section is empty.
Types ¶
type ExecutionGraph ¶
type ExecutionGraph struct {
Generator map[string]any
Env map[string]string
EnvFile *utils.Envfile
// contains filtered or unexported fields
}
ExecutionGraph is a DAG whose nodes are Stages and edges are their dependencies
func NewExecutionGraph ¶
func NewExecutionGraph(name string, stages ...*Stage) (*ExecutionGraph, error)
NewExecutionGraph creates a new ExecutionGraph instance. It accepts zero or more stages and adds them to the resulting graph.
func (*ExecutionGraph) AddStage ¶
func (g *ExecutionGraph) AddStage(stage *Stage) error
AddStage adds a Stage to the ExecutionGraph. If the newly added stage causes a cycle to appear in the graph, it returns an error.
func (*ExecutionGraph) BFSNodesFlattened ¶
func (g *ExecutionGraph) BFSNodesFlattened(nodeName string) StageList
BFSNodesFlattened returns a breadth-first-search flattened list of top-level tasks/pipelines. This is useful in summaries, as we want the things that run in parallel on the same level to show in that order before the level below, and so on.
When generating CI definitions, we don't need to generate the same jobs/steps over and over again; they will be referenced with a needs/depends_on/etc... keyword.
Returns a slice of stages in this level of the graph.
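The level-by-level ordering described above can be sketched with a plain queue-based breadth-first traversal. This is a standalone illustration under the assumption that children are stored as a name-to-names map; `bfsFlatten` is a hypothetical helper, not the package's code.

```go
package main

import "fmt"

// bfsFlatten is a hypothetical helper. children maps a node name to its
// direct children in declaration order. The result lists every node below
// root, one level at a time, with shared nodes listed only once.
func bfsFlatten(children map[string][]string, root string) []string {
	var order []string
	seen := map[string]bool{root: true}
	queue := []string{root}
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]
		for _, child := range children[node] {
			if seen[child] {
				continue // already emitted via another parent
			}
			seen[child] = true
			order = append(order, child)
			queue = append(queue, child)
		}
	}
	return order
}

func main() {
	children := map[string][]string{
		"root":   {"format", "lint"},
		"format": {"build"},
		"lint":   {"build"},
		"build":  {"publish"},
	}
	fmt.Println(bfsFlatten(children, "root")) // [format lint build publish]
}
```

Because "build" is reached from both "format" and "lint", it appears once, which mirrors the point about not regenerating the same job repeatedly.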
func (*ExecutionGraph) Denormalize ¶
func (g *ExecutionGraph) Denormalize() (*ExecutionGraph, error)
Denormalize performs a recursive DFS traversal on the ExecutionGraph from the root node and creates a new stage reference.
In order to be able to call the same pipeline from another pipeline, we want to create a new pointer to it. This avoids race conditions in times/outputs/env vars/etc... We can also set separate vars and environment variables.
The denormalized pipeline will include all the same stages and nested pipelines, but with all names rebuilt using the cascaded ancestors as prefixes
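A rough sketch of the renaming idea, assuming a simplified tree and a "->" separator between ancestor names. Both the `node` type and the separator are assumptions made for illustration; the real package may join names differently.

```go
package main

import (
	"fmt"
	"strings"
)

// node is a hypothetical stand-in for a Stage that may wrap a nested pipeline.
type node struct {
	Name     string
	Children []*node
}

// separator is an assumption; the real package may use a different one.
const separator = "->"

// denormalize returns a deep copy of n in which every name is prefixed
// with the names of its ancestors. The source tree is left untouched.
func denormalize(n *node, ancestors []string) *node {
	// Copy the ancestor slice so sibling branches never share a backing array.
	path := append(append([]string{}, ancestors...), n.Name)
	clone := &node{Name: strings.Join(path, separator)}
	for _, child := range n.Children {
		clone.Children = append(clone.Children, denormalize(child, path))
	}
	return clone
}

func main() {
	// The same pipeline pointer is referenced from two parents.
	shared := &node{Name: "test", Children: []*node{{Name: "unit"}}}
	root := &node{Name: "root", Children: []*node{
		{Name: "dev", Children: []*node{shared}},
		{Name: "prod", Children: []*node{shared}},
	}}

	flat := denormalize(root, nil)
	for _, env := range flat.Children {
		for _, pipeline := range env.Children {
			fmt.Println(pipeline.Name, pipeline.Children[0].Name)
		}
	}
}
```

After the copy, the two occurrences of "test" are distinct values with distinct names, so each could carry its own timings and variables.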
func (*ExecutionGraph) Duration ¶
func (g *ExecutionGraph) Duration() time.Duration
Duration returns execution duration
func (*ExecutionGraph) Error ¶
func (g *ExecutionGraph) Error() error
func (*ExecutionGraph) Flatten ¶
func (graph *ExecutionGraph) Flatten(nodeName string, ancestralParentNames []string, flattenedStage map[string]*Stage)
Flatten is a recursive helper function to clone nodes with unique paths.
Each new instance will have a separate memory address allocation. Will be used for denormalization.
func (*ExecutionGraph) LastError ¶
func (g *ExecutionGraph) LastError() error
LastError returns the latest error that occurred during stage execution
func (*ExecutionGraph) Name ¶
func (g *ExecutionGraph) Name() string
Name returns the name of the graph
func (*ExecutionGraph) Node ¶
func (g *ExecutionGraph) Node(name string) (*Stage, error)
Node returns stage by its name
func (*ExecutionGraph) Nodes ¶
func (g *ExecutionGraph) Nodes() map[string]*Stage
Nodes returns the ExecutionGraph's stages, which themselves form an n-ary tree. A Stage (Node) may appear multiple times in a scheduling scenario; this is the desired behaviour, so that nodes are looped over as many times as they appear in the DAG.
func (*ExecutionGraph) Parents ¶
func (g *ExecutionGraph) Parents(name string) map[string]*Stage
Parents returns the stages on which the given stage depends
func (*ExecutionGraph) VisitNodes ¶
func (g *ExecutionGraph) VisitNodes(callback func(node *Stage) (done bool), recursive bool)
VisitNodes visits all nodes in a given graph or recursively through all subgraphs in a parented graph
func (*ExecutionGraph) WithAlias ¶
func (g *ExecutionGraph) WithAlias(v string) *ExecutionGraph
func (*ExecutionGraph) WithStageError ¶
func (g *ExecutionGraph) WithStageError(stage *Stage, err error)
type GraphError ¶
type GraphError struct {
// contains filtered or unexported fields
}
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler executes ExecutionGraph
func NewScheduler ¶
NewScheduler creates a new Scheduler instance
func (*Scheduler) Cancel ¶
func (s *Scheduler) Cancel()
Cancel cancels executing tasks. It atomically loads the cancelled code.
func (*Scheduler) Schedule ¶
func (s *Scheduler) Schedule(g *ExecutionGraph) error
Schedule starts execution of the given ExecutionGraph
Example ¶
package main

import (
	"fmt"

	"github.com/Ensono/eirctl/runner"
	"github.com/Ensono/eirctl/scheduler"
	"github.com/Ensono/eirctl/task"
)

func main() {
	// Two tasks: "build" must only run once "format" has finished.
	format := task.FromCommands("t1", "go fmt ./...")
	build := task.FromCommands("t2", "go build ./...")

	// The second return value is ignored here for brevity.
	r, _ := runner.NewTaskRunner()
	s := scheduler.NewScheduler(r)

	graph, err := scheduler.NewExecutionGraph("t1",
		scheduler.NewStage("format", func(s *scheduler.Stage) {
			s.Task = format
		}),
		scheduler.NewStage("build", func(s *scheduler.Stage) {
			s.Task = build
			s.DependsOn = []string{"format"}
		}),
	)
	if err != nil {
		fmt.Println(err)
		return
	}

	err = s.Schedule(graph)
	if err != nil {
		fmt.Println(err)
	}
}
type Stage ¶
type Stage struct {
Name string
Condition string
Task *task.Task
Pipeline *ExecutionGraph
// Alias is a pointer to the source pipeline
// this can be referenced multiple times
// the denormalization process will dereference these
Alias string
DependsOn []string
Dir string
AllowFailure bool
Generator map[string]any
// contains filtered or unexported fields
}
Stage is a structure that describes an execution stage. Stage is a synonym for a Node in the n-ary tree of the execution graph/tree.
func (*Stage) FromStage ¶
func (s *Stage) FromStage(originalStage *Stage, existingGraph *ExecutionGraph, ancestralParents []string) *Stage
func (*Stage) ReadStatus ¶
ReadStatus is a helper to read stage's status atomically
func (*Stage) UpdateStatus ¶
UpdateStatus updates stage's status atomically
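Atomic status access of this kind is commonly built on sync/atomic. The sketch below is an assumption about the general technique, not the package's actual code: the `stage` type, its methods, and the lowercase status constants are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Lowercase copies of a few statuses, for illustration only.
const (
	statusWaiting int32 = iota
	statusRunning
	statusSkipped
	statusDone
)

// stage is a hypothetical stand-in; the real Stage keeps its status unexported.
type stage struct {
	status int32
}

// readStatus loads the status without a data race.
func (s *stage) readStatus() int32 { return atomic.LoadInt32(&s.status) }

// updateStatus stores the status without a data race.
func (s *stage) updateStatus(v int32) { atomic.StoreInt32(&s.status, v) }

func main() {
	s := &stage{} // zero value is statusWaiting

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.updateStatus(statusRunning)
		s.updateStatus(statusDone)
	}()
	wg.Wait()

	fmt.Println(s.readStatus() == statusDone) // true
}
```

Using atomic loads and stores lets a polling goroutine inspect a stage's status while another goroutine is executing it, without taking a mutex.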
func (*Stage) WithEnvFile ¶ added in v0.8.0
func (*Stage) WithVariables ¶
type StageList ¶
type StageList []*Stage
type StageOpts ¶
type StageOpts func(*Stage)
StageOpts is the Node options
Pass in tasks/pipelines or other properties using the options pattern
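The options pattern mentioned here can be sketched in isolation as follows. The `stage`, `stageOpt`, `newStage`, `dependsOn`, and `allowFailure` names are hypothetical and only mirror the shape of StageOpts; they are not exported by this package.

```go
package main

import "fmt"

// stage is a hypothetical, trimmed-down stand-in for Stage.
type stage struct {
	Name         string
	DependsOn    []string
	AllowFailure bool
}

// stageOpt mirrors the shape of StageOpts: a function that mutates a stage.
type stageOpt func(*stage)

// newStage builds a stage and applies each option in order.
func newStage(name string, opts ...stageOpt) *stage {
	s := &stage{Name: name}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// dependsOn is a reusable option that appends dependencies.
func dependsOn(names ...string) stageOpt {
	return func(s *stage) { s.DependsOn = append(s.DependsOn, names...) }
}

// allowFailure is a reusable option that marks the stage as non-fatal.
func allowFailure() stageOpt {
	return func(s *stage) { s.AllowFailure = true }
}

func main() {
	s := newStage("build", dependsOn("format", "lint"), allowFailure())
	fmt.Printf("%+v\n", *s)
}
```

Options can be written inline as closures or, as here, wrapped in small named constructors that can be reused across stages.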
type StageTable ¶
StageTable is a simple flat hash table (map) of denormalized stages
NOTE: used for read only at this point
func (StageTable) NthLevelChildren ¶
func (st StageTable) NthLevelChildren(prefix string, depth int) []*Stage
NthLevelChildren retrieves the nodes under the specified prefix at the specified depth.
It removes the base prefix from each stage's key and looks at the depth of the remaining key prefix.
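One way to picture the prefix-and-depth lookup is the sketch below. It assumes keys are ancestor names joined with "->" and that depth counts the segments left after the prefix is removed; both are assumptions, and `nthLevelChildren` here is a hypothetical function over plain string keys rather than the method on StageTable.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// separator is an assumption about how denormalized keys are joined.
const separator = "->"

// nthLevelChildren returns the keys that sit under prefix and whose
// remaining path, once the prefix is removed, has exactly depth segments.
func nthLevelChildren(table map[string]struct{}, prefix string, depth int) []string {
	base := prefix + separator
	var out []string
	for key := range table {
		if !strings.HasPrefix(key, base) {
			continue
		}
		rest := strings.TrimPrefix(key, base)
		if rest == "" {
			continue
		}
		if len(strings.Split(rest, separator)) == depth {
			out = append(out, key)
		}
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	table := map[string]struct{}{
		"root->dev":             {},
		"root->dev->test":       {},
		"root->dev->test->unit": {},
		"root->prod":            {},
	}
	fmt.Println(nthLevelChildren(table, "root", 1))      // [root->dev root->prod]
	fmt.Println(nthLevelChildren(table, "root->dev", 1)) // [root->dev->test]
}
```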
func (StageTable) RecurseParents ¶
func (st StageTable) RecurseParents(prefix string) []*Stage
RecurseParents walks all the parents recursively and appends them to the list in reverse order