master

package
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package master implements the components for executing goals and sub-tasks across multiple plugins concurrently with proper interleaving and error handling.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunTasksAndAccumulate

func RunTasksAndAccumulate[Idx comparable, In, Out any](
	ctx context.Context,
	inputs Iterable[Idx, In],
	task func(context.Context, Idx, In) (Out, error),
) ([]Out, error)

RunTasksAndAccumulate runs the task function for each input returned by the Iterable. Each task is run concurrently. The results and errors are accumulated. The accumulator values and errors and returned once all tasks have completed.

func RunTasksAndAccumulateErrors

func RunTasksAndAccumulateErrors[Idx comparable, In any](
	ctx context.Context,
	inputs Iterable[Idx, In],
	task func(context.Context, Idx, In) error,
) error

RunTasksAndAccumulateErrors runs all the given task against all the inputs concurrently. Then it returns any errors that were returned by any of those tasks.

Types

type DepNode

type DepNode struct {
	// Path is the task path for a task.
	Path string

	// Tasks is the list of tasks that the task named by Path requires to run
	// before it runs.
	Tasks []plugin.TaskDescription
}

DepNode is a vertex in the directed acyclic graph that is used to track task requirements for use in ordering tasks.

type DepsGraph

type DepsGraph struct {
	// contains filtered or unexported fields
}

DepsGraph is (intended to be) a directed acyclic graph that tracks the requirements dependencies between tasks.

func NewDepsGraph

func NewDepsGraph(goal string, tasks []plugin.TaskDescription) *DepsGraph

NewDepsGraph constructs a DepsGraph for the named goal with the given tasks. This immediately configures the direct acyclic graph (DAG) kept by the DepsGraph object.

This DAG is setup with two sets of dependencies:

  1. Hierarchical dependencies exist to goal from task from sub-task from sub-sub-task and so on. That is, a sub-sub-task has a directed edge to its parent sub-task, which has a directed edge to its parent task, which has a directed edge to its parent goal.

  2. Requirements dependencies exist from required task to task doing the requiring. That is, if /release/publish requires /release/mint, there exists an edge in the DAG pointing fro /release/mint to /release/publish.

The contained DAG has the relationship of requirements inverted. The most required node will have no edges pointing to it. Anything that requires something else will have one or more edges pointing at it in the graph.

func (*DepsGraph) GroupOrder

func (d *DepsGraph) GroupOrder() ([][]plugin.TaskDescription, error)

GroupOrder constructs an ordered list of plugin.TaskDescription groups based upon the DAG stored in DepsGraph. In the DAG, the most required ndoes (verticed) will have no edges directed to them. Therefore, to produce a set of tasks that are grouped and ordered, we do the following:

  1. Find all unmarked nodes that have no edges pointing to them by unmarked nodes. This becomes the next slice of plugin.TaskDescription objects.

  2. Mark all the nodes found in (1).

  3. Repeat (1) and (2) until all nodes are marked.

The result is a slice of slices of plugin.TaskDescription objects, which represent tasks that can safely be run concurrently.

Along the way, at least two error conditions could be encountered, which sill result in this method failing with an error:

  1. If an edge is encountered that refers to a node that does not exist (e.g., a plugin.TaskDescription has a requirement that belongs to another goal), an error is returned. A failed requirement indicates either a issing plugin dependency or a plugin that contains a serious bug.

  2. If while looking for unmarked nodes that have no edges poining to them by unmarked nodes we run into a case where we get zero such nodes, but the number of unmarked nodes is non-zero, then we have a cycle. This is a directed acyclic graph. A cycle indicates that some plugin contains a serious bug.

type Interface

type Interface struct {
	// contains filtered or unexported fields
}

Interface is a plugin.Interface implementation that aids in the execution of a set of other plugin.Interface implementations. This is combined with the use of InterfaceExecutor to provide a full set of tools for concurrently executing a goal or task.

func NewInterface

func NewInterface(
	logger hclog.Logger,
	cfg *config.Config,
	is map[string]plugin.Interface,
) *Interface

NewInterface creates a new Interface object for the given configuration and plugins.

func (*Interface) Cancel

func (ti *Interface) Cancel(
	ctx context.Context,
	pluginTask plugin.Task,
) error

Cancel performs cancellation for task in progress. It works to immediatly terminate and close out any resources held by all associated plugins.

func (*Interface) Complete

func (ti *Interface) Complete(
	ctx context.Context,
	pluginTask plugin.Task,
) error

Complete performs completion for the task in progress. It frees up resources held by the master.Interface as well as telling each plugin to free up any resources associated with task execution on their end for all plugins associated with this task.

func (*Interface) Define

func (ti *Interface) Define(values map[string]string)

Define records a new value to store in the in-memory properties used during interface execution.

func (*Interface) GetInterface

func (ti *Interface) GetInterface(name string) plugin.Interface

GetInterface retrieves the plugin.Interface for the named plugin.

func (*Interface) Goal

func (ti *Interface) Goal(
	ctx context.Context,
	name string,
) (plugin.GoalDescription, error)

Goal calls Goal for the given goal name on all associated plugins. If no plugin provides a plugin.GoalDescription for this goal, then plugin.ErrUnsupportedGoal is returned. Otherwise, the first GoalDescription received is returned. If multiple plugins describe a goal with the same name, the behavior is non-deterministic.

func (*Interface) Implements

func (ti *Interface) Implements(ctx context.Context) ([]plugin.TaskDescription, error)

Implements calls Implements on all the associated plugins and returns a combined list of all the tasks defined by all the plugins. It fails with an error if any plugin fails with an error.

func (*Interface) Prepare

func (ti *Interface) Prepare(
	ctx context.Context,
	taskName string,
) (plugin.Task, error)

Prepare calls the Prepare method on all plugins which impement the named task. This returns a pointer to a master.Task which is able to execute the task for all these plugins. If no plugin implements the named task, then this method fails with plugin.ErrUnsupportedTask instead.

func (*Interface) SetTargetName

func (ti *Interface) SetTargetName(name string)

SetTargetName changes the target used to select the configuration used during execution.

type InterfaceExecutor

type InterfaceExecutor struct {
	// contains filtered or unexported fields
}

InterfaceExecutor is a tool for executing plugin.Interface objects. It must be paired with the master.Interface to help perform this task.

These exist as separate objects because of the separation of concerns between these two objects. This object is focused on executing all the operations of a task in the correct order and then resolve any errors that occur correctly.

func NewExecutor

func NewExecutor(logger hclog.Logger, m *Interface) *InterfaceExecutor

NewExecutor creates a new InterfaceExecutor paired with the given Interface.

func (*InterfaceExecutor) Define

func (e *InterfaceExecutor) Define(values map[string]string)

Define is used to set properties from the command-line or other locations to be used when running the plugin.Interface.

func (*InterfaceExecutor) Execute

func (e *InterfaceExecutor) Execute(
	ctx context.Context,
	taskName string,
) error

Execute will execute a single task and return an error if execution fails.

func (*InterfaceExecutor) ExecuteAllStages

func (e *InterfaceExecutor) ExecuteAllStages(
	ctx context.Context,
	group *TaskGroup,
) error

ExecuteAllStages sorts all the stages for execution of a task into groups. These groups may represent implementations of multiple tasks to achieve a goal or a sub-task of a goal. These are each executed concurrently in order from most required to least required to complete the goal or subtask in its entirety.

Lots of error checking is performed. If anything goes wrong, this operation will fail with an error.

func (*InterfaceExecutor) ExecuteStage

func (e *InterfaceExecutor) ExecuteStage(
	ctx context.Context,
	stage []plugin.TaskDescription,
) error

ExecuteStage will execute a set of tasks concurrently. Any errors that occur executing this stage will be returned as an Error.

func (*InterfaceExecutor) SetTargetName

func (e *InterfaceExecutor) SetTargetName(name string)

SetTargetName is used to update the target name to use when configuring the plugin.Context used to execute plugin.Interface.

func (*InterfaceExecutor) TaskGroups

func (e *InterfaceExecutor) TaskGroups(
	ctx context.Context,
) ([]*TaskGroup, error)

TaskGroups builds an returns a slice of TaskGroup objects that will be executed as part of this InterfaceExecutor.

type Iterable

type Iterable[Idx comparable, Val any] interface {
	// Next increments the internal cursor to refer to the next object. It
	// returns true if another object exists or false if the end of iteration
	// has been reached.
	Next() bool

	// Id returns the ID value of the current value.
	Id() Idx

	// Value returns the value of the current value.
	Value() Val

	// Len reports the lengths of the iterated object.
	Len() int
}

Iterable is a helper to the concurrent execution tools RunTasksAndAccumulate and RunTasksAndAccumulateErrors. It provides a generic iterator that can be used to iterate over slices and maps of whatever types.

type MapIterator

type MapIterator[Idx comparable, Val any] struct {
	// contains filtered or unexported fields
}

MapIterator is a generic implementation of Iterable for maps.

func NewMapIterator

func NewMapIterator[Idx comparable, Val any](
	is map[Idx]Val,
) *MapIterator[Idx, Val]

NewMapIterator will create an iterator that iterates over a given map.

func (*MapIterator[Idx, Val]) Id

func (i *MapIterator[Idx, Val]) Id() Idx

Id returns the currnet key of the key/value pair iteration.

func (*MapIterator[Idx, Val]) Len

func (i *MapIterator[Idx, Val]) Len() int

Len returns the len of the underlying map.

func (*MapIterator[Idx, Val]) Next

func (i *MapIterator[Idx, Val]) Next() bool

Next returns false if there is no more work to do with this iterator. It returns true and increments the cursor pointer if there is more work to do.

func (*MapIterator[Idx, Val]) Value

func (i *MapIterator[Idx, Val]) Value() Val

Value returns the current value of the key/value pair iteration.

type OperationHandler

type OperationHandler struct {
	// contains filtered or unexported fields
}

OperationHandler implements plugin.OperationHandler and is able to execute all the operations for all the plugins associated with executing a particular task, operation, stage, and priority order.

func (*OperationHandler) Call

func (h *OperationHandler) Call(ctx context.Context) error

Call concurrently executes this associated operation and order in all plugins that can perform it. It initializes a plugin.Context for each and passes the associated configuration through to the plugin. Then, it updates the temporary properties for the task using the settings set by the plugin.

type SliceIterator

type SliceIterator[Val any] struct {
	// contains filtered or unexported fields
}

SliceIterator provide a generic implementation of Iterable over slice objects. The Idx type is always int, in this case.

func NewSliceIterator

func NewSliceIterator[Val any](
	is []Val,
) *SliceIterator[Val]

NewSliceIterator creates a new iterator for the given slice.

func (*SliceIterator[Val]) Id

func (i *SliceIterator[Val]) Id() int

Id returns the index of teh current slice element during iteration.

func (*SliceIterator[Val]) Len

func (i *SliceIterator[Val]) Len() int

Len returns the len of the underlying slice.

func (*SliceIterator[Val]) Next

func (i *SliceIterator[Val]) Next() bool

Next returns false if there are no more elements in the slice to process. It returns true and increments the index to operate upon if there is another element to process.

func (*SliceIterator[Val]) Value

func (i *SliceIterator[Val]) Value() Val

Value returns the value of the current slice element during iteration.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task implements plugin.Task by running the operations associated with a set of plugins whenever those operations are executed on this task object.

func (*Task) Begin

func (t *Task) Begin(ctx context.Context) (plugin.Operations, error)

Begin collects all the prioritized operations for the Begin stage of all associated plugins and returns a set of master.Operation objects that can execute them.

func (*Task) Check

func (t *Task) Check(ctx context.Context) error

Check executes the Check operation on all associated plugins concurrently.

func (*Task) End

func (t *Task) End(ctx context.Context) (plugin.Operations, error)

End collects all the prioritized operations for the End stage of all associated plugins and returns a set of master.Operation objects that can execute them.

func (*Task) Finish

func (t *Task) Finish(ctx context.Context) error

Finish executes the Finish operation on all associated plugins concurrently.

func (*Task) Run

func (t *Task) Run(ctx context.Context) (plugin.Operations, error)

Run collects all the prioritized operations for the Run stage of all associated plugins and returns a set of master.Operation objects that can execute them.

func (*Task) Setup

func (t *Task) Setup(ctx context.Context) error

Setup executes the Setup operation on all associated plugins concurrently.

func (*Task) Teardown

func (t *Task) Teardown(ctx context.Context) error

Teardown executes the Teardown operation on all associated plugins concurrently.

type TaskGroup

type TaskGroup struct {
	// Tree names the task path to which this TaskGroup belongs.
	Tree string

	// Goal is the goal these tasks belong to.
	Goal plugin.GoalDescription

	// Tasks are the tasks under that goal that are selected by the Tree, given
	// the current plugin load-out.
	Tasks []plugin.TaskDescription
}

TaskGroup is used to group either all or some sub-group of a goal's tasks for execution and sorting. For a given load-out of plugins, each TaskGroup is unique for a given Tree. The Tree names a task path.

func (*TaskGroup) ExecutionGroups

func (g *TaskGroup) ExecutionGroups() ([][]plugin.TaskDescription, error)

ExecutionGroups returns a prioritized list of tasks that can be executed in stages. Each item in the return slice is a group of tasks that can be executed concurrently as a unit. Each group must finish before the next group starts.

func (*TaskGroup) SubTasks

func (g *TaskGroup) SubTasks() []*TaskGroup

SubTasks returns a complete list of sub-tasks for the given task group. This will include every possible task group under the given task group.

func (*TaskGroup) TaskNames

func (g *TaskGroup) TaskNames() string

TaskNames is a comma-separated list of all tasks in the group.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL