Documentation
¶
Overview ¶
Package master implements the components for executing goals and sub-tasks across multiple plugins concurrently with proper interleaving and error handling.
Index ¶
- func RunTasksAndAccumulate[Idx comparable, In, Out any](ctx context.Context, inputs Iterable[Idx, In], ...) ([]Out, error)
- func RunTasksAndAccumulateErrors[Idx comparable, In any](ctx context.Context, inputs Iterable[Idx, In], ...) error
- type DepNode
- type DepsGraph
- type Interface
- func (ti *Interface) Cancel(ctx context.Context, pluginTask plugin.Task) error
- func (ti *Interface) Complete(ctx context.Context, pluginTask plugin.Task) error
- func (ti *Interface) Define(values map[string]string)
- func (ti *Interface) GetInterface(name string) plugin.Interface
- func (ti *Interface) Goal(ctx context.Context, name string) (plugin.GoalDescription, error)
- func (ti *Interface) Implements(ctx context.Context) ([]plugin.TaskDescription, error)
- func (ti *Interface) Prepare(ctx context.Context, taskName string) (plugin.Task, error)
- func (ti *Interface) SetTargetName(name string)
- type InterfaceExecutor
- func (e *InterfaceExecutor) Define(values map[string]string)
- func (e *InterfaceExecutor) Execute(ctx context.Context, taskName string) error
- func (e *InterfaceExecutor) ExecuteAllStages(ctx context.Context, group *TaskGroup) error
- func (e *InterfaceExecutor) ExecuteStage(ctx context.Context, stage []plugin.TaskDescription) error
- func (e *InterfaceExecutor) SetTargetName(name string)
- func (e *InterfaceExecutor) TaskGroups(ctx context.Context) ([]*TaskGroup, error)
- type Iterable
- type MapIterator
- type OperationHandler
- type SliceIterator
- type Task
- func (t *Task) Begin(ctx context.Context) (plugin.Operations, error)
- func (t *Task) Check(ctx context.Context) error
- func (t *Task) End(ctx context.Context) (plugin.Operations, error)
- func (t *Task) Finish(ctx context.Context) error
- func (t *Task) Run(ctx context.Context) (plugin.Operations, error)
- func (t *Task) Setup(ctx context.Context) error
- func (t *Task) Teardown(ctx context.Context) error
- type TaskGroup
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RunTasksAndAccumulate ¶
func RunTasksAndAccumulate[Idx comparable, In, Out any]( ctx context.Context, inputs Iterable[Idx, In], task func(context.Context, Idx, In) (Out, error), ) ([]Out, error)
RunTasksAndAccumulate runs the task function for each input returned by the Iterable. Each task is run concurrently. The results and errors are accumulated. The accumulator values and errors and returned once all tasks have completed.
func RunTasksAndAccumulateErrors ¶
func RunTasksAndAccumulateErrors[Idx comparable, In any]( ctx context.Context, inputs Iterable[Idx, In], task func(context.Context, Idx, In) error, ) error
RunTasksAndAccumulateErrors runs all the given task against all the inputs concurrently. Then it returns any errors that were returned by any of those tasks.
Types ¶
type DepNode ¶
type DepNode struct {
// Path is the task path for a task.
Path string
// Tasks is the list of tasks that the task named by Path requires to run
// before it runs.
Tasks []plugin.TaskDescription
}
DepNode is a vertex in the directed acyclic graph that is used to track task requirements for use in ordering tasks.
type DepsGraph ¶
type DepsGraph struct {
// contains filtered or unexported fields
}
DepsGraph is (intended to be) a directed acyclic graph that tracks the requirements dependencies between tasks.
func NewDepsGraph ¶
func NewDepsGraph(goal string, tasks []plugin.TaskDescription) *DepsGraph
NewDepsGraph constructs a DepsGraph for the named goal with the given tasks. This immediately configures the direct acyclic graph (DAG) kept by the DepsGraph object.
This DAG is setup with two sets of dependencies:
Hierarchical dependencies exist to goal from task from sub-task from sub-sub-task and so on. That is, a sub-sub-task has a directed edge to its parent sub-task, which has a directed edge to its parent task, which has a directed edge to its parent goal.
Requirements dependencies exist from required task to task doing the requiring. That is, if /release/publish requires /release/mint, there exists an edge in the DAG pointing fro /release/mint to /release/publish.
The contained DAG has the relationship of requirements inverted. The most required node will have no edges pointing to it. Anything that requires something else will have one or more edges pointing at it in the graph.
func (*DepsGraph) GroupOrder ¶
func (d *DepsGraph) GroupOrder() ([][]plugin.TaskDescription, error)
GroupOrder constructs an ordered list of plugin.TaskDescription groups based upon the DAG stored in DepsGraph. In the DAG, the most required ndoes (verticed) will have no edges directed to them. Therefore, to produce a set of tasks that are grouped and ordered, we do the following:
Find all unmarked nodes that have no edges pointing to them by unmarked nodes. This becomes the next slice of plugin.TaskDescription objects.
Mark all the nodes found in (1).
Repeat (1) and (2) until all nodes are marked.
The result is a slice of slices of plugin.TaskDescription objects, which represent tasks that can safely be run concurrently.
Along the way, at least two error conditions could be encountered, which sill result in this method failing with an error:
If an edge is encountered that refers to a node that does not exist (e.g., a plugin.TaskDescription has a requirement that belongs to another goal), an error is returned. A failed requirement indicates either a issing plugin dependency or a plugin that contains a serious bug.
If while looking for unmarked nodes that have no edges poining to them by unmarked nodes we run into a case where we get zero such nodes, but the number of unmarked nodes is non-zero, then we have a cycle. This is a directed acyclic graph. A cycle indicates that some plugin contains a serious bug.
type Interface ¶
type Interface struct {
// contains filtered or unexported fields
}
Interface is a plugin.Interface implementation that aids in the execution of a set of other plugin.Interface implementations. This is combined with the use of InterfaceExecutor to provide a full set of tools for concurrently executing a goal or task.
func NewInterface ¶
func NewInterface( logger hclog.Logger, cfg *config.Config, is map[string]plugin.Interface, ) *Interface
NewInterface creates a new Interface object for the given configuration and plugins.
func (*Interface) Cancel ¶
Cancel performs cancellation for task in progress. It works to immediatly terminate and close out any resources held by all associated plugins.
func (*Interface) Complete ¶
Complete performs completion for the task in progress. It frees up resources held by the master.Interface as well as telling each plugin to free up any resources associated with task execution on their end for all plugins associated with this task.
func (*Interface) Define ¶
Define records a new value to store in the in-memory properties used during interface execution.
func (*Interface) GetInterface ¶
GetInterface retrieves the plugin.Interface for the named plugin.
func (*Interface) Goal ¶
Goal calls Goal for the given goal name on all associated plugins. If no plugin provides a plugin.GoalDescription for this goal, then plugin.ErrUnsupportedGoal is returned. Otherwise, the first GoalDescription received is returned. If multiple plugins describe a goal with the same name, the behavior is non-deterministic.
func (*Interface) Implements ¶
Implements calls Implements on all the associated plugins and returns a combined list of all the tasks defined by all the plugins. It fails with an error if any plugin fails with an error.
func (*Interface) Prepare ¶
Prepare calls the Prepare method on all plugins which impement the named task. This returns a pointer to a master.Task which is able to execute the task for all these plugins. If no plugin implements the named task, then this method fails with plugin.ErrUnsupportedTask instead.
func (*Interface) SetTargetName ¶
SetTargetName changes the target used to select the configuration used during execution.
type InterfaceExecutor ¶
type InterfaceExecutor struct {
// contains filtered or unexported fields
}
InterfaceExecutor is a tool for executing plugin.Interface objects. It must be paired with the master.Interface to help perform this task.
These exist as separate objects because of the separation of concerns between these two objects. This object is focused on executing all the operations of a task in the correct order and then resolve any errors that occur correctly.
func NewExecutor ¶
func NewExecutor(logger hclog.Logger, m *Interface) *InterfaceExecutor
NewExecutor creates a new InterfaceExecutor paired with the given Interface.
func (*InterfaceExecutor) Define ¶
func (e *InterfaceExecutor) Define(values map[string]string)
Define is used to set properties from the command-line or other locations to be used when running the plugin.Interface.
func (*InterfaceExecutor) Execute ¶
func (e *InterfaceExecutor) Execute( ctx context.Context, taskName string, ) error
Execute will execute a single task and return an error if execution fails.
func (*InterfaceExecutor) ExecuteAllStages ¶
func (e *InterfaceExecutor) ExecuteAllStages( ctx context.Context, group *TaskGroup, ) error
ExecuteAllStages sorts all the stages for execution of a task into groups. These groups may represent implementations of multiple tasks to achieve a goal or a sub-task of a goal. These are each executed concurrently in order from most required to least required to complete the goal or subtask in its entirety.
Lots of error checking is performed. If anything goes wrong, this operation will fail with an error.
func (*InterfaceExecutor) ExecuteStage ¶
func (e *InterfaceExecutor) ExecuteStage( ctx context.Context, stage []plugin.TaskDescription, ) error
ExecuteStage will execute a set of tasks concurrently. Any errors that occur executing this stage will be returned as an Error.
func (*InterfaceExecutor) SetTargetName ¶
func (e *InterfaceExecutor) SetTargetName(name string)
SetTargetName is used to update the target name to use when configuring the plugin.Context used to execute plugin.Interface.
func (*InterfaceExecutor) TaskGroups ¶
func (e *InterfaceExecutor) TaskGroups( ctx context.Context, ) ([]*TaskGroup, error)
TaskGroups builds an returns a slice of TaskGroup objects that will be executed as part of this InterfaceExecutor.
type Iterable ¶
type Iterable[Idx comparable, Val any] interface { // Next increments the internal cursor to refer to the next object. It // returns true if another object exists or false if the end of iteration // has been reached. Next() bool // Id returns the ID value of the current value. Id() Idx // Value returns the value of the current value. Value() Val // Len reports the lengths of the iterated object. Len() int }
Iterable is a helper to the concurrent execution tools RunTasksAndAccumulate and RunTasksAndAccumulateErrors. It provides a generic iterator that can be used to iterate over slices and maps of whatever types.
type MapIterator ¶
type MapIterator[Idx comparable, Val any] struct { // contains filtered or unexported fields }
MapIterator is a generic implementation of Iterable for maps.
func NewMapIterator ¶
func NewMapIterator[Idx comparable, Val any]( is map[Idx]Val, ) *MapIterator[Idx, Val]
NewMapIterator will create an iterator that iterates over a given map.
func (*MapIterator[Idx, Val]) Id ¶
func (i *MapIterator[Idx, Val]) Id() Idx
Id returns the currnet key of the key/value pair iteration.
func (*MapIterator[Idx, Val]) Len ¶
func (i *MapIterator[Idx, Val]) Len() int
Len returns the len of the underlying map.
func (*MapIterator[Idx, Val]) Next ¶
func (i *MapIterator[Idx, Val]) Next() bool
Next returns false if there is no more work to do with this iterator. It returns true and increments the cursor pointer if there is more work to do.
func (*MapIterator[Idx, Val]) Value ¶
func (i *MapIterator[Idx, Val]) Value() Val
Value returns the current value of the key/value pair iteration.
type OperationHandler ¶
type OperationHandler struct {
// contains filtered or unexported fields
}
OperationHandler implements plugin.OperationHandler and is able to execute all the operations for all the plugins associated with executing a particular task, operation, stage, and priority order.
func (*OperationHandler) Call ¶
func (h *OperationHandler) Call(ctx context.Context) error
Call concurrently executes this associated operation and order in all plugins that can perform it. It initializes a plugin.Context for each and passes the associated configuration through to the plugin. Then, it updates the temporary properties for the task using the settings set by the plugin.
type SliceIterator ¶
type SliceIterator[Val any] struct { // contains filtered or unexported fields }
SliceIterator provide a generic implementation of Iterable over slice objects. The Idx type is always int, in this case.
func NewSliceIterator ¶
func NewSliceIterator[Val any]( is []Val, ) *SliceIterator[Val]
NewSliceIterator creates a new iterator for the given slice.
func (*SliceIterator[Val]) Id ¶
func (i *SliceIterator[Val]) Id() int
Id returns the index of teh current slice element during iteration.
func (*SliceIterator[Val]) Len ¶
func (i *SliceIterator[Val]) Len() int
Len returns the len of the underlying slice.
func (*SliceIterator[Val]) Next ¶
func (i *SliceIterator[Val]) Next() bool
Next returns false if there are no more elements in the slice to process. It returns true and increments the index to operate upon if there is another element to process.
func (*SliceIterator[Val]) Value ¶
func (i *SliceIterator[Val]) Value() Val
Value returns the value of the current slice element during iteration.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task implements plugin.Task by running the operations associated with a set of plugins whenever those operations are executed on this task object.
func (*Task) Begin ¶
Begin collects all the prioritized operations for the Begin stage of all associated plugins and returns a set of master.Operation objects that can execute them.
func (*Task) End ¶
End collects all the prioritized operations for the End stage of all associated plugins and returns a set of master.Operation objects that can execute them.
func (*Task) Run ¶
Run collects all the prioritized operations for the Run stage of all associated plugins and returns a set of master.Operation objects that can execute them.
type TaskGroup ¶
type TaskGroup struct {
// Tree names the task path to which this TaskGroup belongs.
Tree string
// Goal is the goal these tasks belong to.
Goal plugin.GoalDescription
// Tasks are the tasks under that goal that are selected by the Tree, given
// the current plugin load-out.
Tasks []plugin.TaskDescription
}
TaskGroup is used to group either all or some sub-group of a goal's tasks for execution and sorting. For a given load-out of plugins, each TaskGroup is unique for a given Tree. The Tree names a task path.
func (*TaskGroup) ExecutionGroups ¶
func (g *TaskGroup) ExecutionGroups() ([][]plugin.TaskDescription, error)
ExecutionGroups returns a prioritized list of tasks that can be executed in stages. Each item in the return slice is a group of tasks that can be executed concurrently as a unit. Each group must finish before the next group starts.