Documentation
¶
Index ¶
- Constants
- Variables
- func GetTaskResult[T any](ctx context.Context, reference taskid.TaskReference[T]) T
- func GetTaskResultFromLocalRunner[TaskResult any](runner *LocalRunner, taskRef taskid.TaskReference[TaskResult]) (TaskResult, bool)
- func GetTaskResultOptional[T any](ctx context.Context, reference taskid.TaskReference[T]) (T, bool)
- func HasDependency(taskSet *TaskSet, dependencyFrom UntypedTask, dependencyTo UntypedTask) (bool, error)
- func NewEqualFilter[T comparable](labelKey TaskLabelKey[T], value T, includeUndefined bool) filter.TypedMapFilter[T]
- func NewLabelSet(labelOpts ...LabelOpt) *typedmap.ReadonlyTypedMap
- func NewRequiredTaskLabel() *requiredTaskLabelImpl
- func RegisterTasks(registry TaskRegistry, tasks ...UntypedTask) error
- func WrapErrorWithTaskInformation(ctx context.Context, err error) error
- type GraphResolver
- type GraphResolverRule
- type GraphResolverRuleResult
- type LabelOpt
- type LabelPredicate
- type LocalRunner
- type LocalRunnerTaskStat
- type RequiredTaskLabelGraphResolverRule
- type SubsequentTaskRefsGraphResolverRule
- type Task
- type TaskDependencyGraphResolverRule
- type TaskImpl
- func (c *TaskImpl[TaskResult]) Dependencies() []taskid.UntypedTaskReference
- func (c *TaskImpl[TaskResult]) ID() taskid.TaskImplementationID[TaskResult]
- func (c *TaskImpl[TaskResult]) Labels() *typedmap.ReadonlyTypedMap
- func (c *TaskImpl[TaskResult]) Run(ctx context.Context) (TaskResult, error)
- func (c *TaskImpl[TaskResult]) UntypedID() taskid.UntypedTaskImplementationID
- func (c *TaskImpl[TaskResult]) UntypedRun(ctx context.Context) (any, error)
- type TaskLabelKey
- type TaskRegistry
- type TaskRunner
- type TaskSet
- type UntypedTask
Constants ¶
const ( // LocalRunnerTaskStatPhaseWaiting indicates that the task is waiting for its dependencies to complete. LocalRunnerTaskStatPhaseWaiting = "WAITING" // LocalRunnerTaskStatPhaseRunning indicates that the task is currently running. LocalRunnerTaskStatPhaseRunning = "RUNNING" // LocalRunnerTaskStatPhaseStopped indicates that the task has finished its execution (either completed or failed). LocalRunnerTaskStatPhaseStopped = "STOPPED" )
const (
KHISystemPrefix = "khi.google.com/"
)
Variables ¶
var DefaultTaskGraphResolver = NewGraphResolver(100, &RequiredTaskLabelGraphResolverRule{}, &TaskDependencyGraphResolverRule{}, &SubsequentTaskRefsGraphResolverRule{}, )
DefaultTaskGraphResolver is the default configuration of graph resolver used for constructing complete task graph.
var LabelKeyRequiredTask = NewTaskLabelKey[bool](KHISystemPrefix + "required-task")
LabelKeyRequiredTask is the task label to tell task resolver to always include the task in the task graph when the task is available.
var LabelKeySubsequentTaskRefs = NewTaskLabelKey[[]taskid.UntypedTaskReference](KHISystemPrefix + "subsquent-task-refs")
LabelKeySubsequentTaskRefs is the label key for a list of task references. The referenced tasks are included in the task graph later, and each included task references this task.
var LabelKeyTaskSelectionPriority = NewTaskLabelKey[int](KHISystemPrefix + "task-selection-priority")
Functions ¶
func GetTaskResult ¶
func GetTaskResult[T any](ctx context.Context, reference taskid.TaskReference[T]) T
GetTaskResult retrieves the result of a previously executed task.
func GetTaskResultFromLocalRunner ¶
func GetTaskResultFromLocalRunner[TaskResult any](runner *LocalRunner, taskRef taskid.TaskReference[TaskResult]) (TaskResult, bool)
GetTaskResultFromLocalRunner is a helper function to safely extract a specific task's result from a LocalRunner's result map. It provides a type-safe way to access results using a TaskReference.
func GetTaskResultOptional ¶
GetTaskResultOptional retrieves the result of a previously executed task. Use GetTaskResult in most cases; this function is for getting the value of a task from another task that does not explicitly depend on it.
func HasDependency ¶
func HasDependency(taskSet *TaskSet, dependencyFrom UntypedTask, dependencyTo UntypedTask) (bool, error)
HasDependency checks whether two tasks have a dependency between them when the task graph is resolved with the given task set.
func NewEqualFilter ¶
func NewEqualFilter[T comparable](labelKey TaskLabelKey[T], value T, includeUndefined bool) filter.TypedMapFilter[T]
NewEqualFilter creates a new filter that matches exact label values
func NewLabelSet ¶
func NewLabelSet(labelOpts ...LabelOpt) *typedmap.ReadonlyTypedMap
NewLabelSet constructs the LabelSet with required fields.
func NewRequiredTaskLabel ¶
func NewRequiredTaskLabel() *requiredTaskLabelImpl
NewRequiredTaskLabel returns a LabelOpt that marks the task as always included in the resulting task graph.
func RegisterTasks ¶
func RegisterTasks(registry TaskRegistry, tasks ...UntypedTask) error
RegisterTasks registers multiple tasks into given registry.
Types ¶
type GraphResolver ¶
type GraphResolver struct {
// Rules is the list of rules to be applied in each iteration.
Rules []GraphResolverRule
// MaxIteration is the maximum number of iterations to perform before considering the resolution failed.
MaxIteration int
}
GraphResolver iteratively applies a set of rules to determine the final set of tasks for the task graph. It starts with a set of required tasks and expands it based on the rules until a stable state is reached.
func NewGraphResolver ¶
func NewGraphResolver(maxIteration int, rules ...GraphResolverRule) *GraphResolver
NewGraphResolver returns a new instance of GraphResolver with given rules and configurations.
func (*GraphResolver) Resolve ¶
func (r *GraphResolver) Resolve(requiredTasks []UntypedTask, availableTasks []UntypedTask) ([]UntypedTask, error)
Resolve determines the final set of tasks for the task graph. It iteratively applies the configured rules, starting with the initial `requiredTasks`. The process continues until no more changes are made to the task list (a stable state) or the `MaxIteration` limit is reached.
type GraphResolverRule ¶
type GraphResolverRule interface {
// Name returns the name of the rule.
Name() string
// Resolve applies the rule to the current task graph, potentially adding, modifying, or removing tasks.
Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)
}
GraphResolverRule provides a rule to change the list of tasks included in the task graph from the given mandatory tasks.
type GraphResolverRuleResult ¶
type GraphResolverRuleResult struct {
// Changed indicates whether the rule modified the task list.
Changed bool
// Tasks is the updated list of tasks after the rule has been applied.
Tasks []UntypedTask
}
GraphResolverRuleResult represents the result of a single GraphResolverRule execution.
type LabelOpt ¶
LabelOpt implementations wrap the setting of values on the task labels.
func FromLabels ¶
func FromLabels(labels *typedmap.ReadonlyTypedMap) []LabelOpt
FromLabels creates a list of LabelOpt to clone the set of labels from a task to the other.
func NewSubsequentTaskRefsTaskLabel ¶
func NewSubsequentTaskRefsTaskLabel(refs ...taskid.UntypedTaskReference) LabelOpt
NewSubsequentTaskRefsTaskLabel returns a LabelOpt to add subsequent task to the current task.
func WithLabelValue ¶
func WithLabelValue[T any](labelKey TaskLabelKey[T], value T) LabelOpt
WithLabelValue creates a LabelOpt to store a single value associated to a label key.
func WithSelectionPriority ¶
type LabelPredicate ¶
type LocalRunner ¶
type LocalRunner struct {
// contains filtered or unexported fields
}
LocalRunner executes a task graph defined by a TaskSet on the local machine. It manages task dependencies, concurrent execution, and result aggregation.
func NewLocalRunner ¶
func NewLocalRunner(taskSet *TaskSet) (*LocalRunner, error)
NewLocalRunner creates and initializes a new LocalRunner for a given TaskSet. The TaskSet must be runnable (i.e., topologically sorted with all dependencies met). It returns an error if the provided TaskSet is not runnable.
func (*LocalRunner) Result ¶
func (r *LocalRunner) Result() (*typedmap.ReadonlyTypedMap, error)
Result returns the final results of the task graph execution. It returns a map of task results if the execution was successful, or an error if any task failed or the runner has not yet completed. This method should only be called after the channel from Wait() has been closed.
func (*LocalRunner) Run ¶
func (r *LocalRunner) Run(ctx context.Context) error
Run starts the execution of the task graph in a non-blocking manner. It launches a goroutine to manage the entire execution process. It returns an error if the runner has already been started.
func (*LocalRunner) TaskStatuses ¶
func (r *LocalRunner) TaskStatuses() []*LocalRunnerTaskStat
TaskStatuses returns a slice of LocalRunnerTaskStat, providing the status and execution details for each task in the runner's task set. The order of statuses corresponds to the order of tasks in the resolved TaskSet.
func (*LocalRunner) Wait ¶
func (r *LocalRunner) Wait() <-chan interface{}
Wait returns a channel that is closed when the runner finishes executing all tasks in the graph. This is the primary mechanism for waiting for the completion of the entire task set.
type LocalRunnerTaskStat ¶
LocalRunnerTaskStat holds the status and metrics for a single task executed by the LocalRunner.
type RequiredTaskLabelGraphResolverRule ¶
type RequiredTaskLabelGraphResolverRule struct{}
RequiredTaskLabelGraphResolverRule is a resolver rule that adds tasks to the graph if they have the `LabelKeyRequiredTask` label set to true.
func (*RequiredTaskLabelGraphResolverRule) Name ¶
func (r *RequiredTaskLabelGraphResolverRule) Name() string
Name implements GraphResolverRule.
func (*RequiredTaskLabelGraphResolverRule) Resolve ¶
func (r *RequiredTaskLabelGraphResolverRule) Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)
Resolve implements GraphResolverRule.
type SubsequentTaskRefsGraphResolverRule ¶
type SubsequentTaskRefsGraphResolverRule struct {
}
SubsequentTaskRefsGraphResolverRule is a resolver rule that adds tasks to the graph based on the `LabelKeySubsequentTaskRefs` label of tasks already in the graph. This rule won't resolve dependencies of tasks added as the subsequent task. This must be used with the `dependency` resolver rule.
func (*SubsequentTaskRefsGraphResolverRule) Name ¶
func (s *SubsequentTaskRefsGraphResolverRule) Name() string
Name implements GraphResolverRule.
func (*SubsequentTaskRefsGraphResolverRule) Resolve ¶
func (s *SubsequentTaskRefsGraphResolverRule) Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)
Resolve ensures that subsequent tasks specified by the `LabelKeySubsequentTaskRefs` label are included in the graph. It dynamically updates dependencies to ensure that the subsequent task runs after the task that requested it.
type Task ¶
type Task[TaskResult any] interface { UntypedTask // ID returns an unique TaskID of taskid.TaskImplementationID[TaskResult] // The implementation of this function must return a constant value. ID() taskid.TaskImplementationID[TaskResult] Run(ctx context.Context) (TaskResult, error) }
Task is the fundamental interface that all DAG nodes in the KHI task system implement. The implementations of ID and Labels must be deterministic once the application has started. The implementations of Sinks and Source must be pure functions that do not depend on anything outside of their arguments.
type TaskDependencyGraphResolverRule ¶
type TaskDependencyGraphResolverRule struct {
}
TaskDependencyGraphResolverRule is a resolver rule that adds tasks to the graph to satisfy the dependencies of tasks already in the graph.
func (*TaskDependencyGraphResolverRule) Name ¶
func (d *TaskDependencyGraphResolverRule) Name() string
Name implements GraphResolverRule.
func (*TaskDependencyGraphResolverRule) Resolve ¶
func (d *TaskDependencyGraphResolverRule) Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)
Resolve adds tasks from the available pool to satisfy unmet dependencies of tasks currently in the graph. If multiple tasks can satisfy a single dependency, the one with the highest priority is chosen. It returns an error if a dependency cannot be resolved.
type TaskImpl ¶
type TaskImpl[TaskResult any] struct { // contains filtered or unexported fields }
func NewTask ¶
func NewTask[TaskResult any](taskId taskid.TaskImplementationID[TaskResult], dependencies []taskid.UntypedTaskReference, runFunc func(ctx context.Context) (TaskResult, error), labelOpts ...LabelOpt) *TaskImpl[TaskResult]
func (*TaskImpl[TaskResult]) Dependencies ¶
func (c *TaskImpl[TaskResult]) Dependencies() []taskid.UntypedTaskReference
Dependencies implements Task.
func (*TaskImpl[TaskResult]) ID ¶
func (c *TaskImpl[TaskResult]) ID() taskid.TaskImplementationID[TaskResult]
ID implements Task.
func (*TaskImpl[TaskResult]) Labels ¶
func (c *TaskImpl[TaskResult]) Labels() *typedmap.ReadonlyTypedMap
Labels implements Task.
func (*TaskImpl[TaskResult]) UntypedID ¶
func (c *TaskImpl[TaskResult]) UntypedID() taskid.UntypedTaskImplementationID
type TaskLabelKey ¶
TaskLabelKey is a key of labels given to task.
func NewTaskLabelKey ¶
func NewTaskLabelKey[T any](key string) TaskLabelKey[T]
NewTaskLabelKey returns the key used in labels with type annotation.
type TaskRegistry ¶
type TaskRegistry interface {
// AddTask registers a type of a task.
AddTask(task UntypedTask) error
}
TaskRegistry provides a point to register tasks.
type TaskRunner ¶
type TaskRunner interface {
Run(ctx context.Context) error
Wait() <-chan interface{}
Result() (*typedmap.ReadonlyTypedMap, error)
}
TaskRunner receives the runnable TaskSet and runs its tasks in topologically sorted order.
type TaskSet ¶
type TaskSet struct {
// contains filtered or unexported fields
}
TaskSet is a collection of tasks. It has several collection operation features for constructing the task graph to execute.
func NewTaskSet ¶
func NewTaskSet(tasks []UntypedTask) (*TaskSet, error)
NewTaskSet creates a new TaskSet with the given tasks. Returns an error if there are duplicate task IDs.
func Subset ¶
func Subset[T any](taskSet *TaskSet, mapFilter filter.TypedMapFilter[T]) *TaskSet
Subset returns a new TaskSet filtered using the provided type-safe filter
func (*TaskSet) Add ¶
func (s *TaskSet) Add(newTask UntypedTask) error
Add adds a task definition to the current TaskSet. It returns an error when the task's ID duplicates one already in the set.
func (*TaskSet) DumpGraphviz ¶
DumpGraphviz returns task graph as graphviz string for debugging purpose. The generated string can be converted to DAG graph using `dot` command.
func (*TaskSet) Get ¶
func (s *TaskSet) Get(id string) (UntypedTask, error)
Get returns a task with the given string task ID notation.
func (*TaskSet) GetAll ¶
func (s *TaskSet) GetAll() []UntypedTask
func (*TaskSet) Remove ¶
Remove removes a task definition from the current TaskSet. It returns an error if the definition does not exist.
func (*TaskSet) ToRunnableTaskSet ¶
ToRunnableTaskSet sorts the given task list in topological order.
type UntypedTask ¶
type UntypedTask interface {
UntypedID() taskid.UntypedTaskImplementationID
// Labels returns KHITaskLabelSet assigned to this task unit.
// The implementation of this function must return a constant value.
Labels() *typedmap.ReadonlyTypedMap
// Dependencies returns the list of task references. The task runner will wait for these dependent tasks to be done before running this task.
Dependencies() []taskid.UntypedTaskReference
UntypedRun(ctx context.Context) (any, error)
}