scheduler

package
v1.8.0-alpha-aaaabbbb1234 Latest
Warning

This package is not in the latest version of its module.
Published: Oct 23, 2024 License: GPL-3.0 Imports: 11 Imported by: 0

Documentation

Overview

Package scheduler ensures that all tasks in a pipeline, and in its child pipelines, are loaded and executed in dependency order, running them in parallel where possible.
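
To make "dependency order, in parallel where possible" concrete, here is a minimal standalone sketch. It is not the package's implementation: runAll and executionOrder are hypothetical names, and the sketch assumes the dependency map is acyclic and that every dependency is itself a key of the map.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll is a hypothetical helper: it starts one goroutine per stage and
// lets each stage run only after all stages it depends on have finished.
// Assumption: dependsOn is acyclic and every dependency is also a key.
func runAll(dependsOn map[string][]string, run func(name string)) {
	// One "done" channel per stage; closing it releases all dependants.
	done := make(map[string]chan struct{}, len(dependsOn))
	for name := range dependsOn {
		done[name] = make(chan struct{})
	}

	var wg sync.WaitGroup
	for name, deps := range dependsOn {
		wg.Add(1)
		go func(name string, deps []string) {
			defer wg.Done()
			for _, d := range deps {
				<-done[d] // block until the dependency has finished
			}
			run(name)
			close(done[name])
		}(name, deps)
	}
	wg.Wait()
}

// executionOrder records the order in which stages actually ran.
func executionOrder(dependsOn map[string][]string) []string {
	var (
		mu    sync.Mutex
		order []string
	)
	runAll(dependsOn, func(name string) {
		mu.Lock()
		order = append(order, name)
		mu.Unlock()
	})
	return order
}

func main() {
	order := executionOrder(map[string][]string{
		"format": nil,
		"lint":   nil,
		"build":  {"format", "lint"},
		"test":   {"build"},
	})
	// "format" and "lint" may appear in either order; "build" and "test"
	// always follow their dependencies.
	fmt.Println(order)
}
```

Stages with no path between them (here "format" and "lint") are free to run concurrently, which is the parallelism the overview refers to.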

Constants

const (
	StatusWaiting int32 = iota
	StatusRunning
	StatusSkipped
	StatusDone
	StatusError
	StatusCanceled
)

Stage statuses
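
Because the statuses are plain int32 values, logs and summaries usually need a readable name for them. The sketch below mirrors the constants locally so it compiles on its own; statusName is a hypothetical helper and is not part of the package.

```go
package main

import "fmt"

// Local mirror of the documented constants, so the sketch is standalone.
const (
	StatusWaiting int32 = iota
	StatusRunning
	StatusSkipped
	StatusDone
	StatusError
	StatusCanceled
)

// statusName is a hypothetical helper that maps a status to a label.
func statusName(status int32) string {
	switch status {
	case StatusWaiting:
		return "waiting"
	case StatusRunning:
		return "running"
	case StatusSkipped:
		return "skipped"
	case StatusDone:
		return "done"
	case StatusError:
		return "error"
	case StatusCanceled:
		return "canceled"
	default:
		return "unknown"
	}
}

func main() {
	for s := StatusWaiting; s <= StatusCanceled; s++ {
		fmt.Printf("%d = %s\n", s, statusName(s))
	}
}
```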

const (
	RootNodeName = "root"
)

Variables

var ErrCycleDetected = errors.New("cycle detected")

ErrCycleDetected is returned when an added edge causes a cycle to appear in the graph.

Functions

This section is empty.

Types

type ExecutionGraph

type ExecutionGraph struct {
	Generator map[string]any
	Env       map[string][]string
	// contains filtered or unexported fields
}

ExecutionGraph is a DAG whose nodes are Stages and whose edges are their dependencies.

func NewExecutionGraph

func NewExecutionGraph(name string, stages ...*Stage) (*ExecutionGraph, error)

NewExecutionGraph creates a new ExecutionGraph instance. It accepts zero or more stages and adds them to the resulting graph.

func (*ExecutionGraph) AddStage

func (g *ExecutionGraph) AddStage(stage *Stage) error

AddStage adds a Stage to the ExecutionGraph. If the newly added stage causes a cycle to appear in the graph, it returns an error.
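
One way such a check can work is a depth-first search that reports a cycle when it re-enters a node still on the current path. The sketch below is a standalone illustration under that assumption, not the package's code: graph, addStage, hasCycle and errCycle are hypothetical stand-ins, with errCycle playing the role of ErrCycleDetected.

```go
package main

import (
	"errors"
	"fmt"
)

// errCycle stands in for scheduler.ErrCycleDetected in this sketch.
var errCycle = errors.New("cycle detected")

// graph is a hypothetical minimal DAG: deps[name] lists what name depends on.
type graph struct {
	deps map[string][]string
}

func newGraph() *graph { return &graph{deps: map[string][]string{}} }

// addStage records the stage, then rolls the change back if it closes a cycle.
func (g *graph) addStage(name string, dependsOn ...string) error {
	prev, existed := g.deps[name]
	g.deps[name] = dependsOn
	if g.hasCycle() {
		if existed {
			g.deps[name] = prev
		} else {
			delete(g.deps, name)
		}
		return fmt.Errorf("stage %q: %w", name, errCycle)
	}
	return nil
}

// hasCycle runs a three-state depth-first search over the dependency edges.
func (g *graph) hasCycle() bool {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := map[string]int{}
	var visit func(string) bool
	visit = func(n string) bool {
		switch state[n] {
		case inProgress:
			return true // reached a node that is still on the current path
		case done:
			return false
		}
		state[n] = inProgress
		for _, d := range g.deps[n] {
			if visit(d) {
				return true
			}
		}
		state[n] = done
		return false
	}
	for n := range g.deps {
		if visit(n) {
			return true
		}
	}
	return false
}

func main() {
	g := newGraph()
	_ = g.addStage("a", "c") // "c" is not defined yet, so no cycle so far
	_ = g.addStage("b", "a")
	err := g.addStage("c", "b") // closes the loop a -> c -> b -> a
	fmt.Println(errors.Is(err, errCycle))
}
```

Wrapping the sentinel with %w keeps errors.Is usable by callers while still naming the offending stage.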

func (*ExecutionGraph) BFSNodesFlattened

func (g *ExecutionGraph) BFSNodesFlattened(nodeName string) []*Stage

BFSNodesFlattened returns a breadth-first flattened list of top-level tasks and pipelines. This is useful in summaries, where stages that run in parallel on the same level should appear together, before the stages on the level below, and so on.

When generating CI definitions, the same jobs/steps do not need to be generated over and over again; they are referenced with a needs/depends_on/etc. keyword instead.
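
The level-by-level ordering with each stage listed once can be sketched as a plain breadth-first walk. This is a standalone illustration, not the package's implementation: bfsFlatten is a hypothetical function over a simple children map, and omitting the start node and sorting siblings are assumptions made here for a deterministic result.

```go
package main

import (
	"fmt"
	"sort"
)

// bfsFlatten walks the children map level by level from start and returns
// every reachable name exactly once, in breadth-first order.
func bfsFlatten(children map[string][]string, start string) []string {
	var order []string
	seen := map[string]bool{start: true}
	queue := []string{start}
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]
		// Copy before sorting so the caller's slices are left untouched.
		next := append([]string(nil), children[node]...)
		sort.Strings(next)
		for _, child := range next {
			if seen[child] {
				continue // already listed on an earlier or the same level
			}
			seen[child] = true
			order = append(order, child)
			queue = append(queue, child)
		}
	}
	return order
}

func main() {
	children := map[string][]string{
		"root":   {"lint", "format"},
		"format": {"build"},
		"lint":   {"build"},
		"build":  {"test"},
	}
	fmt.Println(bfsFlatten(children, "root")) // [format lint build test]
}
```

"build" is reachable from both "format" and "lint" but is emitted only once, which matches the stated goal of not regenerating the same job for every parent.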

func (*ExecutionGraph) Children

func (g *ExecutionGraph) Children(node string) map[string]*Stage

func (*ExecutionGraph) Duration

func (g *ExecutionGraph) Duration() time.Duration

Duration returns execution duration

func (*ExecutionGraph) From

func (g *ExecutionGraph) From(name string) []string

From returns the stages that depend on the given stage, i.e. the children of the node.

func (*ExecutionGraph) HasCycle

func (g *ExecutionGraph) HasCycle() error

func (*ExecutionGraph) LastError

func (g *ExecutionGraph) LastError() error

LastError returns the most recent error that occurred during stage execution.

func (*ExecutionGraph) Name

func (g *ExecutionGraph) Name() string

Name returns the name of the graph.

func (*ExecutionGraph) Node

func (g *ExecutionGraph) Node(name string) (*Stage, error)

Node returns the stage with the given name.

func (*ExecutionGraph) Nodes

func (g *ExecutionGraph) Nodes() map[string]*Stage

Nodes returns the ExecutionGraph's stages, which themselves form an n-ary tree. A Stage (Node) may appear multiple times in a scheduling scenario; this is desired behaviour, so that the nodes can be looped over as many times as they appear in the DAG.

func (*ExecutionGraph) To

func (g *ExecutionGraph) To(name string) []string

To returns the stages on which the given stage depends, i.e. the parents of the node.
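
From and To are two views of the same dependency edges. The sketch below shows how both directions can be derived from DependsOn-style declarations; it is a standalone illustration in which edges and buildEdges are hypothetical names, not the package's internals.

```go
package main

import (
	"fmt"
	"sort"
)

// edges is a hypothetical two-way index over the dependency edges.
type edges struct {
	from map[string][]string // stage -> stages that depend on it (children)
	to   map[string][]string // stage -> stages it depends on (parents)
}

// buildEdges derives both directions from a "stage -> depends on" map.
func buildEdges(dependsOn map[string][]string) edges {
	e := edges{from: map[string][]string{}, to: map[string][]string{}}
	for stage, parents := range dependsOn {
		for _, p := range parents {
			e.from[p] = append(e.from[p], stage)
			e.to[stage] = append(e.to[stage], p)
		}
	}
	// Sort for stable output; map iteration order is random in Go.
	for _, m := range []map[string][]string{e.from, e.to} {
		for k := range m {
			sort.Strings(m[k])
		}
	}
	return e
}

func main() {
	e := buildEdges(map[string][]string{
		"build": {"format"},
		"lint":  {"format"},
		"test":  {"build"},
	})
	fmt.Println(e.from["format"]) // [build lint]
	fmt.Println(e.to["test"])     // [build]
}
```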

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler executes an ExecutionGraph.

func NewScheduler

func NewScheduler(r runner.Runner) *Scheduler

NewScheduler creates a new Scheduler instance.

func (*Scheduler) Cancel

func (s *Scheduler) Cancel()

Cancel cancels executing tasks and atomically loads the cancelled code.

func (*Scheduler) Cancelled added in v1.7.5

func (s *Scheduler) Cancelled() int32

func (*Scheduler) Finish

func (s *Scheduler) Finish()

Finish finishes the scheduler's TaskRunner.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(g *ExecutionGraph) error

Schedule starts execution of the given ExecutionGraph

Example
package main

import (
	"fmt"

	"github.com/Ensono/taskctl/pkg/runner"
	"github.com/Ensono/taskctl/pkg/scheduler"
	"github.com/Ensono/taskctl/pkg/task"
)

func main() {
	// Two tasks; "build" must only run after "format" has finished.
	format := task.FromCommands("t1", "go fmt ./...")
	build := task.FromCommands("t2", "go build ./...")

	r, err := runner.NewTaskRunner()
	if err != nil {
		fmt.Println(err)
		return
	}
	s := scheduler.NewScheduler(r)

	// Each stage wraps a task; DependsOn declares the ordering.
	graph, err := scheduler.NewExecutionGraph("t1",
		scheduler.NewStage(func(s *scheduler.Stage) {
			s.Name = "format"
			s.Task = format
		}),
		scheduler.NewStage(func(s *scheduler.Stage) {
			s.Name = "build"
			s.Task = build
			s.DependsOn = []string{"format"}
		}),
	)
	if err != nil {
		fmt.Println(err)
		return
	}

	if err := s.Schedule(graph); err != nil {
		fmt.Println(err)
	}
}

type Stage

type Stage struct {
	Name         string
	Condition    string
	Task         *task.Task
	Pipeline     *ExecutionGraph
	DependsOn    []string
	Dir          string
	AllowFailure bool
	Status       *atomic.Int32
	Env          variables.Container
	Variables    variables.Container
	Start        time.Time
	End          time.Time
	Generator    map[string]any
}

Stage is a structure that describes an execution stage. Stage is a synonym for a Node in the n-ary tree of the execution graph.

func NewStage

func NewStage(opts ...StageOpts) *Stage

func (*Stage) Duration

func (s *Stage) Duration() time.Duration

Duration returns stage's execution duration

func (*Stage) ReadStatus

func (s *Stage) ReadStatus() int32

ReadStatus is a helper to read stage's status atomically

func (*Stage) UpdateStatus

func (s *Stage) UpdateStatus(status int32)

UpdateStatus updates stage's status atomically
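
Since Status is an *atomic.Int32, reads and updates can happen from different goroutines without a mutex. The sketch below illustrates that pattern in isolation; stage, newStage, readStatus and updateStatus are hypothetical lower-case stand-ins, and their bodies are an assumption about how such helpers are typically written, not the package's source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Local stand-ins for the documented status values.
const (
	statusWaiting int32 = iota
	statusRunning
	statusSkipped
	statusDone
)

// stage is a hypothetical stand-in holding only the status field.
type stage struct {
	status *atomic.Int32
}

func newStage() *stage { return &stage{status: &atomic.Int32{}} }

// readStatus loads the current status atomically.
func (s *stage) readStatus() int32 { return s.status.Load() }

// updateStatus stores a new status atomically.
func (s *stage) updateStatus(v int32) { s.status.Store(v) }

func main() {
	s := newStage()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s.updateStatus(statusRunning)
		// ... the stage's work would happen here ...
		s.updateStatus(statusDone)
	}()

	// Safe to call concurrently with the goroutine above.
	_ = s.readStatus()

	wg.Wait()
	fmt.Println(s.readStatus() == statusDone)
}
```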

type StageOpts

type StageOpts func(*Stage)

StageOpts is a functional option for a Stage (Node).

Pass in tasks, pipelines, or other properties using the options pattern.
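
The options pattern mentioned here can be sketched on its own as follows. The types mirror the shape of Stage and StageOpts, but stage, stageOpt, newStage, withName and withDependsOn are hypothetical names used only for illustration; the package itself documents only NewStage and StageOpts.

```go
package main

import "fmt"

// stage is a hypothetical, trimmed-down stand-in for scheduler.Stage.
type stage struct {
	Name         string
	DependsOn    []string
	AllowFailure bool
}

// stageOpt mirrors the shape of StageOpts: a function that mutates a stage.
type stageOpt func(*stage)

// newStage applies the options in order to a zero-valued stage.
func newStage(opts ...stageOpt) *stage {
	s := &stage{}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// withName is a hypothetical reusable option.
func withName(name string) stageOpt {
	return func(s *stage) { s.Name = name }
}

// withDependsOn is a hypothetical reusable option.
func withDependsOn(names ...string) stageOpt {
	return func(s *stage) { s.DependsOn = append(s.DependsOn, names...) }
}

func main() {
	// Options can be named helpers or inline closures, as in the
	// package's own Schedule example.
	s := newStage(
		withName("build"),
		withDependsOn("format"),
		func(s *stage) { s.AllowFailure = true },
	)
	fmt.Println(s.Name, s.DependsOn, s.AllowFailure)
}
```

Because each option is just a function, callers set only the fields they care about and new fields can be added to the struct without changing the constructor's signature.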
