scheduler

package
v0.9.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2025 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

Package scheduler ensures all tasks in a pipeline and child pipelines are loaded and executed in the order they need to be, parallelizing where possible.

Index

Examples

Constants

View Source
const (
	StatusWaiting int32 = iota
	StatusRunning
	StatusSkipped
	StatusDone
	StatusError
	StatusCanceled
)

Stage statuses

View Source
const (
	RootNodeName = "root"
)

Variables

View Source
var (
	// ErrCycleDetected occurs when added edge causes cycle to appear
	ErrCycleDetected = errors.New("cycle detected")
	// ErrNodeNotFound occurs when node is not found in the graph
	ErrNodeNotFound = errors.New("node not found")
	ErrRunTimeFault = errors.New("execution fault")
)

Functions

This section is empty.

Types

type ExecutionGraph

type ExecutionGraph struct {
	Generator map[string]any
	Env       map[string]string
	EnvFile   *utils.Envfile
	// contains filtered or unexported fields
}

ExecutionGraph is a DAG whose nodes are Stages and edges are their dependencies

func NewExecutionGraph

func NewExecutionGraph(name string, stages ...*Stage) (*ExecutionGraph, error)

NewExecutionGraph creates new ExecutionGraph instance. It accepts zero or more stages and adds them to resulted graph

func (*ExecutionGraph) AddStage

func (g *ExecutionGraph) AddStage(stage *Stage) error

AddStage adds Stage to ExecutionGraph. If newly added stage causes a cycle to appear in the graph it return an error

func (*ExecutionGraph) BFSNodesFlattened

func (g *ExecutionGraph) BFSNodesFlattened(nodeName string) StageList

BFSNodesFlattened returns a Breadth-First-Search flattened list of top level tasks/pipelines This is useful in summaries as we want the things that run in parallel on the same level to show in that order before the level below and so on.

When generating CI definitions - we don't need to generate the same jobs/steps over and over again they will be referenced with a needs/depends_on/etc... keyword.

Returns a slice of stages in this level of the graph.

func (*ExecutionGraph) Children

func (g *ExecutionGraph) Children(node string) map[string]*Stage

func (*ExecutionGraph) Denormalize

func (g *ExecutionGraph) Denormalize() (*ExecutionGraph, error)

Denormalize performs a recursive DFS traversal on the ExecutionGraph from the root node and creates a new stage reference.

In order to be able to call the same pipeline from another pipeline, we want to create a new pointer to it, this will avoid race conditions in times/outputs/env vars/etc... We can also set separate vars and environment variables

The denormalized pipeline will include all the same stages and nested pipelines, but with all names rebuilt using the cascaded ancestors as prefixes

func (*ExecutionGraph) Duration

func (g *ExecutionGraph) Duration() time.Duration

Duration returns execution duration

func (*ExecutionGraph) Error

func (g *ExecutionGraph) Error() error

func (*ExecutionGraph) Flatten

func (graph *ExecutionGraph) Flatten(nodeName string, ancestralParentNames []string, flattenedStage map[string]*Stage)

Flatten is a recursive helper function to clone nodes with unique paths.

Each new instance will have a separate memory address allocation. Will be used for denormalization.

func (*ExecutionGraph) LastError

func (g *ExecutionGraph) LastError() error

LastError returns latest error appeared during stages execution

func (*ExecutionGraph) Name

func (g *ExecutionGraph) Name() string

Name returns the name of the graph

func (*ExecutionGraph) Node

func (g *ExecutionGraph) Node(name string) (*Stage, error)

Node returns stage by its name

func (*ExecutionGraph) Nodes

func (g *ExecutionGraph) Nodes() map[string]*Stage

Nodes returns ExecutionGraph stages - an n-ary tree itself Stage (Node) may appear multiple times in a scheduling scenario, this is desired behaviour to loop over the nodes as many times as they appear in a DAG manner.

func (*ExecutionGraph) Parents

func (g *ExecutionGraph) Parents(name string) map[string]*Stage

Parents returns stages on whiсh given stage depends on

func (*ExecutionGraph) VisitNodes

func (g *ExecutionGraph) VisitNodes(callback func(node *Stage) (done bool), recursive bool)

VisitNodes visits all nodes in a given graph or recursively through all subgraphs in a parented graph

func (*ExecutionGraph) WithAlias

func (g *ExecutionGraph) WithAlias(v string) *ExecutionGraph

func (*ExecutionGraph) WithStageError

func (g *ExecutionGraph) WithStageError(stage *Stage, err error)

type GraphError

type GraphError struct {
	// contains filtered or unexported fields
}

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler executes ExecutionGraph

func NewScheduler

func NewScheduler(r runner.Runner) *Scheduler

NewScheduler create new Scheduler instance

func (*Scheduler) Cancel

func (s *Scheduler) Cancel()

Cancel cancels executing tasks atomically loads the cancelled code

func (*Scheduler) Cancelled

func (s *Scheduler) Cancelled() int32

func (*Scheduler) Finish

func (s *Scheduler) Finish()

Finish finishes scheduler's TaskRunner

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(g *ExecutionGraph) error

Schedule starts execution of the given ExecutionGraph

Example
package main

import (
	"fmt"

	"github.com/Ensono/eirctl/scheduler"

	"github.com/Ensono/eirctl/runner"
	"github.com/Ensono/eirctl/task"
)

func main() {
	format := task.FromCommands("t1", "go fmt ./...")
	build := task.FromCommands("t2", "go build ./..")
	r, _ := runner.NewTaskRunner()
	s := scheduler.NewScheduler(r)

	graph, err := scheduler.NewExecutionGraph("t1",
		scheduler.NewStage("format", func(s *scheduler.Stage) {
			s.Task = format
		}),
		scheduler.NewStage("build", func(s *scheduler.Stage) {
			s.Task = build
			s.DependsOn = []string{"format"}
		}),
	)
	if err != nil {
		return
	}

	err = s.Schedule(graph)
	if err != nil {
		fmt.Println(err)
	}
}

type Stage

type Stage struct {
	Name      string
	Condition string
	Task      *task.Task
	Pipeline  *ExecutionGraph
	// Alias is a pointer to the source pipeline
	// this can be referenced multiple times
	// the denormalization process will dereference these
	Alias        string
	DependsOn    []string
	Dir          string
	AllowFailure bool

	Generator map[string]any
	// contains filtered or unexported fields
}

Stage is a structure that describes execution stage Stage is a synonym for a Node in a the unary tree of the execution graph/tree

func NewStage

func NewStage(name string, opts ...StageOpts) *Stage

func (*Stage) Duration

func (s *Stage) Duration() time.Duration

Duration returns stage's execution duration

func (*Stage) End

func (s *Stage) End() time.Time

func (*Stage) Env

func (s *Stage) Env() *variables.Variables

func (*Stage) EnvFile added in v0.8.0

func (s *Stage) EnvFile() *utils.Envfile

func (*Stage) FromStage

func (s *Stage) FromStage(originalStage *Stage, existingGraph *ExecutionGraph, ancestralParents []string) *Stage

func (*Stage) ReadStatus

func (s *Stage) ReadStatus() int32

ReadStatus is a helper to read stage's status atomically

func (*Stage) Start

func (s *Stage) Start() time.Time

func (*Stage) UpdateStatus

func (s *Stage) UpdateStatus(status int32)

UpdateStatus updates stage's status atomically

func (*Stage) Variables

func (s *Stage) Variables() *variables.Variables

func (*Stage) WithEnd

func (s *Stage) WithEnd(v time.Time) *Stage

func (*Stage) WithEnv

func (s *Stage) WithEnv(v *variables.Variables)

func (*Stage) WithEnvFile added in v0.8.0

func (s *Stage) WithEnvFile(v *utils.Envfile)

func (*Stage) WithStart

func (s *Stage) WithStart(v time.Time) *Stage

func (*Stage) WithVariables

func (s *Stage) WithVariables(v *variables.Variables)

type StageList

type StageList []*Stage

func (StageList) Len

func (s StageList) Len() int

Len returns the length of the StageList

func (StageList) Less

func (s StageList) Less(i, j int) bool

Less defines the comparison logic for sorting the StageList It needs to put all parents at the top and children towards the bottom

func (StageList) Swap

func (s StageList) Swap(i, j int)

Swap swaps two elements in the StageList

type StageOpts

type StageOpts func(*Stage)

StageOpts is the Node options

Pass in tasks/pipelines or other properties using the options pattern

type StageTable

type StageTable map[string]*Stage

StageTable is a simple hash table of denormalized stages into a flat hash table (map)

NOTE: used for read only at this point

func (StageTable) NthLevelChildren

func (st StageTable) NthLevelChildren(prefix string, depth int) []*Stage

NthLevelChildren retrieves the nodes by prefix and depth specified

removing the base prefix and looking at the depth of the keyprefix per stage

func (StageTable) RecurseParents

func (st StageTable) RecurseParents(prefix string) []*Stage

RecurseParents walks all the parents recursively and appends to the list in revers order

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL