coretask

package
v0.51.0
Warning

This package is not in the latest version of its module.

Published: Jan 22, 2026 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Constants

const (
	// LocalRunnerTaskStatPhaseWaiting indicates that the task is waiting for its dependencies to complete.
	LocalRunnerTaskStatPhaseWaiting = "WAITING"
	// LocalRunnerTaskStatPhaseRunning indicates that the task is currently running.
	LocalRunnerTaskStatPhaseRunning = "RUNNING"
	// LocalRunnerTaskStatPhaseStopped indicates that the task has finished its execution (either completed or failed).
	LocalRunnerTaskStatPhaseStopped = "STOPPED"
)
const (
	KHISystemPrefix = "khi.google.com/"
)

Variables

DefaultTaskGraphResolver is the default configuration of graph resolver used for constructing complete task graph.

var LabelKeyRequiredTask = NewTaskLabelKey[bool](KHISystemPrefix + "required-task")

LabelKeyRequiredTask is the task label that tells the task resolver to always include the task in the task graph whenever the task is available.

var LabelKeySubsequentTaskRefs = NewTaskLabelKey[[]taskid.UntypedTaskReference](KHISystemPrefix + "subsquent-task-refs")

LabelKeySubsequentTaskRefs is the label holding a list of task references. The referenced tasks are added to the task graph later, and each added task references the task carrying this label.

var LabelKeyTaskSelectionPriority = NewTaskLabelKey[int](KHISystemPrefix + "task-selection-priority")

Functions

func GetTaskResult

func GetTaskResult[T any](ctx context.Context, reference taskid.TaskReference[T]) T

GetTaskResult retrieves the result of a previously executed task.

func GetTaskResultFromLocalRunner

func GetTaskResultFromLocalRunner[TaskResult any](runner *LocalRunner, taskRef taskid.TaskReference[TaskResult]) (TaskResult, bool)

GetTaskResultFromLocalRunner is a helper function to safely extract a specific task's result from a LocalRunner's result map. It provides a type-safe way to access results using a TaskReference.

func GetTaskResultOptional

func GetTaskResultOptional[T any](ctx context.Context, reference taskid.TaskReference[T]) (T, bool)

GetTaskResultOptional retrieves the result of a previously executed task. Prefer GetTaskResult in most cases; this function is for reading a value from a task that the calling task does not explicitly depend on.

func HasDependency

func HasDependency(taskSet *TaskSet, dependencyFrom UntypedTask, dependencyTo UntypedTask) (bool, error)

HasDependency checks whether two tasks have a dependency between them when the task graph is resolved with the given task set.

func NewEqualFilter

func NewEqualFilter[T comparable](labelKey TaskLabelKey[T], value T, includeUndefined bool) filter.TypedMapFilter[T]

NewEqualFilter creates a new filter that matches exact label values.

func NewLabelSet

func NewLabelSet(labelOpts ...LabelOpt) *typedmap.ReadonlyTypedMap

NewLabelSet constructs the LabelSet with the required fields.

func NewRequiredTaskLabel

func NewRequiredTaskLabel() *requiredTaskLabelImpl

NewRequiredTaskLabel returns a LabelOpt that marks the task as always included in the resulting task graph.

func RegisterTasks

func RegisterTasks(registry TaskRegistry, tasks ...UntypedTask) error

RegisterTasks registers multiple tasks into the given registry.

func WrapErrorWithTaskInformation

func WrapErrorWithTaskInformation(ctx context.Context, err error) error

WrapErrorWithTaskInformation annotates the given error with the current task information.

Types

type GraphResolver

type GraphResolver struct {
	// Rules is the list of rules to be applied in each iteration.
	Rules []GraphResolverRule
	// MaxIteration is the maximum number of iterations to perform before considering the resolution failed.
	MaxIteration int
}

GraphResolver iteratively applies a set of rules to determine the final set of tasks for the task graph. It starts with a set of required tasks and expands it based on the rules until a stable state is reached.

func NewGraphResolver

func NewGraphResolver(maxIteration int, rules ...GraphResolverRule) *GraphResolver

NewGraphResolver returns a new instance of GraphResolver with the given rules and configuration.

func (*GraphResolver) Resolve

func (r *GraphResolver) Resolve(requiredTasks []UntypedTask, availableTasks []UntypedTask) ([]UntypedTask, error)

Resolve determines the final set of tasks for the task graph. It iteratively applies the configured rules, starting with the initial `requiredTasks`. The process continues until no more changes are made to the task list (a stable state) or the `MaxIteration` limit is reached.
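The fixed-point loop described above can be illustrated with a minimal, self-contained sketch. It does not use this package: the `rule` type, `resolve`, and `addAfter` are hypothetical stand-ins that operate on plain task names, and returning an error when the limit is reached is an assumption based on the MaxIteration comment.

```go
package main

import (
	"errors"
	"fmt"
)

// rule is a hypothetical stand-in for GraphResolverRule. It receives the
// current task names and reports the updated list and whether it changed.
type rule func(current, available []string) (tasks []string, changed bool)

// resolve applies every rule repeatedly until a full pass changes nothing,
// or fails once maxIteration passes have been used up.
func resolve(required, available []string, maxIteration int, rules ...rule) ([]string, error) {
	current := append([]string(nil), required...)
	for i := 0; i < maxIteration; i++ {
		changed := false
		for _, r := range rules {
			if next, c := r(current, available); c {
				current, changed = next, true
			}
		}
		if !changed {
			return current, nil // stable state reached
		}
	}
	return nil, errors.New("task graph did not stabilize within the iteration limit")
}

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// addAfter returns a rule that adds task `add` once `trigger` is in the graph.
func addAfter(trigger, add string) rule {
	return func(current, available []string) ([]string, bool) {
		if !contains(current, trigger) || contains(current, add) || !contains(available, add) {
			return current, false
		}
		return append(append([]string(nil), current...), add), true
	}
}

func main() {
	tasks, err := resolve([]string{"a"}, []string{"a", "b", "c"}, 10,
		addAfter("a", "b"), addAfter("b", "c"))
	fmt.Println(tasks, err) // prints "[a b c] <nil>"
}
```

In this sketch the second pass changes nothing, so the loop stops after two iterations.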

type GraphResolverRule

type GraphResolverRule interface {
	// Name returns the name of the rule.
	Name() string
	// Resolve applies the rule to the current task graph, potentially adding, modifying, or removing tasks.
	Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)
}

GraphResolverRule provides a rule to change the list of tasks included in the task graph from the given mandatory tasks.

type GraphResolverRuleResult

type GraphResolverRuleResult struct {
	// Changed indicates whether the rule modified the task list.
	Changed bool
	// Tasks is the updated list of tasks after the rule has been applied.
	Tasks []UntypedTask
}

GraphResolverRuleResult represents the result of a single GraphResolverRule execution.

type Interceptor (added in v0.50.0)

type Interceptor func(ctx context.Context, task UntypedTask, next func(context.Context) (any, error)) (any, error)

Interceptor is a function that can intercept the execution of a task. It allows injecting custom logic before and after the task execution.

type LabelOpt

type LabelOpt interface {
	Write(labels *typedmap.TypedMap)
}

LabelOpt implementations wrap the setting of values on task labels.

func FromLabels

func FromLabels(labels *typedmap.ReadonlyTypedMap) []LabelOpt

FromLabels creates a list of LabelOpt values that clone the set of labels from one task to another.

func NewSubsequentTaskRefsTaskLabel

func NewSubsequentTaskRefsTaskLabel(refs ...taskid.UntypedTaskReference) LabelOpt

NewSubsequentTaskRefsTaskLabel returns a LabelOpt that adds subsequent tasks to the current task.

func WithLabelValue

func WithLabelValue[T any](labelKey TaskLabelKey[T], value T) LabelOpt

WithLabelValue creates a LabelOpt to store a single value associated with a label key.
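The combination of typed label keys and LabelOpt values can be sketched without this package. Everything below is a hypothetical stand-in: `labelKey`, `labels`, `withLabelValue`, `newLabelSet`, and `getLabel` only imitate the shape of TaskLabelKey, the typed map, WithLabelValue, and NewLabelSet, and the key names are made up.

```go
package main

import "fmt"

// labelKey carries the value type in its type parameter, so reads and
// writes for one key always agree on the type.
type labelKey[T any] struct{ name string }

// labels is a simplified stand-in for the typed map behind task labels.
type labels map[string]any

// labelOpt imitates LabelOpt: each option writes into the label map.
type labelOpt interface{ Write(l labels) }

type valueOpt[T any] struct {
	key   labelKey[T]
	value T
}

func (o valueOpt[T]) Write(l labels) { l[o.key.name] = o.value }

// withLabelValue stores one value under one typed key.
func withLabelValue[T any](key labelKey[T], value T) labelOpt {
	return valueOpt[T]{key: key, value: value}
}

// newLabelSet applies every option in order to a fresh label map.
func newLabelSet(opts ...labelOpt) labels {
	l := labels{}
	for _, o := range opts {
		o.Write(l)
	}
	return l
}

// getLabel reads a value back with the type fixed by the key.
func getLabel[T any](l labels, key labelKey[T]) (T, bool) {
	v, ok := l[key.name].(T)
	return v, ok
}

func main() {
	priority := labelKey[int]{name: "example.com/priority"}
	required := labelKey[bool]{name: "example.com/required"}

	set := newLabelSet(withLabelValue(priority, 5), withLabelValue(required, true))
	p, ok := getLabel(set, priority)
	fmt.Println(p, ok) // prints "5 true"
}
```

Because the key fixes the value type, passing a string for `priority` would be rejected at compile time rather than discovered at run time.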

func WithSelectionPriority

func WithSelectionPriority(priority int) LabelOpt

type LabelPredicate

type LabelPredicate[T any] = func(v T) bool

type LocalRunner

type LocalRunner struct {
	// contains filtered or unexported fields
}

LocalRunner executes a task graph defined by a TaskSet on the local machine. It manages task dependencies, concurrent execution, and result aggregation.

func NewLocalRunner

func NewLocalRunner(taskSet *TaskSet) (*LocalRunner, error)

NewLocalRunner creates and initializes a new LocalRunner for a given TaskSet. The TaskSet must be runnable (i.e., topologically sorted with all dependencies met). It returns an error if the provided TaskSet is not runnable.

func (*LocalRunner) AddInterceptor (added in v0.50.0)

func (r *LocalRunner) AddInterceptor(interceptor Interceptor)

AddInterceptor adds an interceptor to the runner. Interceptors are executed in the order they are added.

func (*LocalRunner) Result

func (r *LocalRunner) Result() (*typedmap.ReadonlyTypedMap, error)

Result returns the final results of the task graph execution. It returns a map of task results if the execution was successful, or an error if any task failed or the runner has not yet completed. This method should only be called after the channel from Wait() has been closed.

func (*LocalRunner) Run

func (r *LocalRunner) Run(ctx context.Context) error

Run starts the execution of the task graph in a non-blocking manner. It launches a goroutine to manage the entire execution process. It returns an error if the runner has already been started.

func (*LocalRunner) TaskStatuses

func (r *LocalRunner) TaskStatuses() []*LocalRunnerTaskStat

TaskStatuses returns a slice of LocalRunnerTaskStat, providing the status and execution details for each task in the runner's task set. The order of statuses corresponds to the order of tasks in the resolved TaskSet.

func (*LocalRunner) Tasks (added in v0.50.0)

func (r *LocalRunner) Tasks() []UntypedTask

func (*LocalRunner) Wait

func (r *LocalRunner) Wait() <-chan interface{}

Wait returns a channel that is closed when the runner finishes executing all tasks in the graph. This is the primary mechanism for waiting for the completion of the entire task set.

type LocalRunnerTaskStat

type LocalRunnerTaskStat struct {
	Phase     string
	Error     error
	StartTime time.Time
	EndTime   time.Time
}

LocalRunnerTaskStat holds the status and metrics for a single task executed by the LocalRunner.

type RequiredTaskLabelGraphResolverRule

type RequiredTaskLabelGraphResolverRule struct{}

RequiredTaskLabelGraphResolverRule is a resolver rule that adds tasks to the graph if they have the `LabelKeyRequiredTask` label set to true.

func (*RequiredTaskLabelGraphResolverRule) Name

Name implements GraphResolverRule.

func (*RequiredTaskLabelGraphResolverRule) Resolve

func (r *RequiredTaskLabelGraphResolverRule) Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)

Resolve implements GraphResolverRule.

type SubsequentTaskRefsGraphResolverRule

type SubsequentTaskRefsGraphResolverRule struct {
}

SubsequentTaskRefsGraphResolverRule is a resolver rule that adds tasks to the graph based on the `LabelKeySubsequentTaskRefs` label of tasks already in the graph. This rule does not resolve the dependencies of the tasks it adds as subsequent tasks, so it must be used together with the `dependency` resolver rule.

func (*SubsequentTaskRefsGraphResolverRule) Name

Name implements GraphResolverRule.

func (*SubsequentTaskRefsGraphResolverRule) Resolve

func (s *SubsequentTaskRefsGraphResolverRule) Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)

Resolve ensures that subsequent tasks specified by the `LabelKeySubsequentTaskRefs` label are included in the graph. It dynamically updates dependencies to ensure that the subsequent task runs after the task that requested it.

type Task

type Task[TaskResult any] interface {
	UntypedTask
	// ID returns a unique TaskID of taskid.TaskImplementationID[TaskResult]
	// The implementation of this function must return a constant value.
	ID() taskid.TaskImplementationID[TaskResult]

	Run(ctx context.Context) (TaskResult, error)
}

Task is the fundamental interface that all DAG nodes in the KHI task system implement. The implementations of ID and Labels must be deterministic once the application has started. The implementations of Sinks and Source must be pure functions that depend on nothing outside their arguments.

type TaskDependencyGraphResolverRule

type TaskDependencyGraphResolverRule struct {
}

TaskDependencyGraphResolverRule is a resolver rule that adds tasks to the graph to satisfy the dependencies of tasks already in the graph.

func (*TaskDependencyGraphResolverRule) Name

Name implements GraphResolverRule.

func (*TaskDependencyGraphResolverRule) Resolve

func (d *TaskDependencyGraphResolverRule) Resolve(currentGraphTasks []UntypedTask, availableTasks []UntypedTask) (GraphResolverRuleResult, error)

Resolve adds tasks from the available pool to satisfy unmet dependencies of tasks currently in the graph. If multiple tasks can satisfy a single dependency, the one with the highest priority is chosen. It returns an error if a dependency cannot be resolved.
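The selection step, where several available tasks can satisfy one dependency and the highest priority wins, can be sketched as follows. The `candidate` type, its field names, and the task names are hypothetical, and keeping the first candidate on a priority tie is a choice of this sketch only; the tie-breaking behavior of the real rule is not documented here.

```go
package main

import "fmt"

// candidate is a hypothetical available task: Ref is the reference it can
// satisfy, Impl names the concrete task, Priority ranks it among rivals.
type candidate struct {
	Ref      string
	Impl     string
	Priority int
}

// pickImplementation returns the highest-priority candidate for ref,
// or an error when no available task can satisfy the dependency.
func pickImplementation(ref string, available []candidate) (candidate, error) {
	var best candidate
	found := false
	for _, c := range available {
		if c.Ref != ref {
			continue
		}
		if !found || c.Priority > best.Priority {
			best, found = c, true
		}
	}
	if !found {
		return candidate{}, fmt.Errorf("no task satisfies dependency %q", ref)
	}
	return best, nil
}

func main() {
	available := []candidate{
		{Ref: "parse-logs", Impl: "basic-parser", Priority: 0},
		{Ref: "parse-logs", Impl: "fast-parser", Priority: 10},
		{Ref: "read-config", Impl: "default-config", Priority: 0},
	}
	picked, err := pickImplementation("parse-logs", available)
	fmt.Println(picked.Impl, err) // prints "fast-parser <nil>"
}
```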

type TaskImpl

type TaskImpl[TaskResult any] struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask[TaskResult any](taskId taskid.TaskImplementationID[TaskResult], dependencies []taskid.UntypedTaskReference, runFunc func(ctx context.Context) (TaskResult, error), labelOpts ...LabelOpt) *TaskImpl[TaskResult]

func (*TaskImpl[TaskResult]) Dependencies

func (c *TaskImpl[TaskResult]) Dependencies() []taskid.UntypedTaskReference

Dependencies implements Task.

func (*TaskImpl[TaskResult]) ID

func (c *TaskImpl[TaskResult]) ID() taskid.TaskImplementationID[TaskResult]

ID implements Task.

func (*TaskImpl[TaskResult]) Labels

func (c *TaskImpl[TaskResult]) Labels() *typedmap.ReadonlyTypedMap

Labels implements Task.

func (*TaskImpl[TaskResult]) Run

func (c *TaskImpl[TaskResult]) Run(ctx context.Context) (TaskResult, error)

Run implements Task.

func (*TaskImpl[TaskResult]) UntypedID

func (c *TaskImpl[TaskResult]) UntypedID() taskid.UntypedTaskImplementationID

func (*TaskImpl[TaskResult]) UntypedRun

func (c *TaskImpl[TaskResult]) UntypedRun(ctx context.Context) (any, error)

type TaskLabelKey

type TaskLabelKey[LabelValueType any] = typedmap.TypedKey[LabelValueType]

TaskLabelKey is a key of labels given to task.

func NewTaskLabelKey

func NewTaskLabelKey[T any](key string) TaskLabelKey[T]

NewTaskLabelKey returns the key used in labels with type annotation.

type TaskRegistry

type TaskRegistry interface {
	// AddTask registers a type of a task.
	AddTask(task UntypedTask) error
}

TaskRegistry provides a point at which to register tasks.

type TaskRunner

type TaskRunner interface {
	Run(ctx context.Context) error
	Wait() <-chan interface{}
	Result() (*typedmap.ReadonlyTypedMap, error)
	Tasks() []UntypedTask
	AddInterceptor(interceptor Interceptor)
}

TaskRunner receives a runnable TaskSet and runs its tasks in topologically sorted order.

type TaskSet

type TaskSet struct {
	// contains filtered or unexported fields
}

TaskSet is a collection of tasks. It provides several collection operations for constructing the task graph to execute.

func NewTaskSet

func NewTaskSet(tasks []UntypedTask) (*TaskSet, error)

NewTaskSet creates a new TaskSet with the given tasks. Returns an error if there are duplicate task IDs.

func Subset

func Subset[T any](taskSet *TaskSet, mapFilter filter.TypedMapFilter[T]) *TaskSet

Subset returns a new TaskSet filtered using the provided type-safe filter.

func (*TaskSet) Add

func (s *TaskSet) Add(newTask UntypedTask) error

Add adds a task definition to the current TaskSet. It returns an error when the task's ID duplicates one already in the set.

func (*TaskSet) DumpGraphviz

func (s *TaskSet) DumpGraphviz() (string, error)

DumpGraphviz returns the task graph as a Graphviz string for debugging purposes. The generated string can be rendered as a DAG using the `dot` command.

func (*TaskSet) Get

func (s *TaskSet) Get(id string) (UntypedTask, error)

Get returns a task with the given string task ID notation.

func (*TaskSet) GetAll

func (s *TaskSet) GetAll() []UntypedTask

func (*TaskSet) Remove

func (s *TaskSet) Remove(id string) error

Remove removes a task definition from the current TaskSet. It returns an error if the definition does not exist.

func (*TaskSet) ToRunnableTaskSet

func (s *TaskSet) ToRunnableTaskSet() (*TaskSet, error)

ToRunnableTaskSet sorts the task list into topological order.
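For readers unfamiliar with the term, a topological order places every task after all of the tasks it depends on. The sketch below is a minimal, self-contained illustration over task names. `topoSort` is a hypothetical helper, and its handling of unknown dependencies and cycles is an assumption of this sketch, not a description of ToRunnableTaskSet.

```go
package main

import (
	"fmt"
	"sort"
)

// topoSort orders task names so that each task comes after its
// dependencies. deps maps a task name to the names it depends on.
func topoSort(deps map[string][]string) ([]string, error) {
	names := make([]string, 0, len(deps))
	for n := range deps {
		names = append(names, n)
	}
	sort.Strings(names) // fixed iteration order keeps the output stable

	done := map[string]bool{}
	var order []string
	for len(order) < len(names) {
		progressed := false
		for _, n := range names {
			if done[n] {
				continue
			}
			ready := true
			for _, d := range deps[n] {
				if _, ok := deps[d]; !ok {
					return nil, fmt.Errorf("task %q depends on unknown task %q", n, d)
				}
				if !done[d] {
					ready = false
					break
				}
			}
			if ready {
				done[n] = true
				order = append(order, n)
				progressed = true
			}
		}
		if !progressed {
			// No task became ready in a full pass, so the rest form a cycle.
			return nil, fmt.Errorf("dependency cycle detected")
		}
	}
	return order, nil
}

func main() {
	order, err := topoSort(map[string][]string{
		"c": {"a", "b"},
		"b": {"a"},
		"a": {},
	})
	fmt.Println(order, err) // prints "[a b c] <nil>"
}
```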

type UntypedTask

type UntypedTask interface {
	UntypedID() taskid.UntypedTaskImplementationID
	// Labels returns KHITaskLabelSet assigned to this task unit.
	// The implementation of this function must return a constant value.
	Labels() *typedmap.ReadonlyTypedMap

	// Dependencies returns the list of task references. The task runner waits for these dependencies to finish before running this task.
	Dependencies() []taskid.UntypedTaskReference

	UntypedRun(ctx context.Context) (any, error)
}
