taskengine

v0.4.0
Published: Apr 28, 2022 License: GPL-3.0 Imports: 5 Imported by: 2

README

taskengine

Package taskengine concurrently executes a set of tasks assigned to multiple different workers.

Each worker can work on all or a subset of the tasks.

When a worker is ready, the next task to execute is dynamically chosen according to the current status of the tasks, so as to maximize the throughput of successfully executed tasks.

After the first success result is found, the remaining jobs for the same task are canceled.

The main types defined by the package are:

  • Engine
  • Task
  • Worker
  • WorkerTasks
  • Event

Engine

NewEngine

The NewEngine function initializes a new Engine object given the list of workers and the tasks of each worker.

func NewEngine(ws []*Worker, wts WorkerTasks) (*Engine, error)

Execute

The Execute method returns a chan that receives the workers' results after each task execution.

func (eng *Engine) Execute(ctx context.Context, mode Mode) (chan Result, error)

The Execute method internally uses the ExecuteEvents method and filters the returned results based on the Mode parameter.

The Mode enum type represents the mode of execution:

  • AllResults: for each task returns the results of all the workers: success, error or canceled. Multiple success results might be returned if they happen almost at the same time.

  • SuccessOrErrorResults: for each task returns the success or error results. The canceled results are not returned. Multiple success results might be returned if they happen almost at the same time.

  • ResultsUntilFirstSuccess: for each task returns the results preceding the first success (included). At most one success is returned.

  • FirstSuccessOrLastResult: for each task returns only one result: the first success or the last result. At most one success is returned.
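
To make the difference between the modes concrete, here is a minimal sketch that applies each mode to the results of a single task, listed in arrival order. It is not the package's implementation: the status type and the filterResults function are hypothetical, and only the Mode constants mirror the names documented above.

```go
package main

import "fmt"

// Mode mirrors the constants documented for the package.
type Mode int

const (
	AllResults Mode = iota
	SuccessOrErrorResults
	ResultsUntilFirstSuccess
	FirstSuccessOrLastResult
)

// status is a hypothetical stand-in for the outcome of one worker's result.
type status string

const (
	success  status = "success"
	failure  status = "error"
	canceled status = "canceled"
)

// filterResults returns the results of a single task that the given mode
// would keep. The input lists every worker's result in arrival order.
func filterResults(mode Mode, results []status) []status {
	var kept []status
	seenSuccess := false
	for i, r := range results {
		isLast := i == len(results)-1
		var keep bool
		switch mode {
		case AllResults:
			keep = true
		case SuccessOrErrorResults:
			keep = r != canceled
		case ResultsUntilFirstSuccess:
			keep = !seenSuccess
		case FirstSuccessOrLastResult:
			keep = !seenSuccess && (r == success || isLast)
		}
		if keep {
			kept = append(kept, r)
		}
		if r == success {
			seenSuccess = true
		}
	}
	return kept
}

func main() {
	results := []status{failure, success, canceled}
	for mode := AllResults; mode <= FirstSuccessOrLastResult; mode++ {
		fmt.Println(mode, filterResults(mode, results))
	}
}
```

For the sequence error, success, canceled, the sketch keeps all three results under AllResults, the first two under SuccessOrErrorResults and ResultsUntilFirstSuccess, and only the success under FirstSuccessOrLastResult.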

ExecuteEvents

The ExecuteEvents method returns a chan that receives all the Events generated by each task execution. For each (worker, task) pair, a Start event is emitted, followed by a final event that can be Success, Canceled or Error.

func (eng *Engine) ExecuteEvents(ctx context.Context) (chan *Event, error)

This method is useful to track the execution of the tasks: while Execute can only return the result on completion of execution, the ExecuteEvents method also returns the Start event at the beginning of execution (with a nil result).
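
As an illustration of this kind of tracking, the sketch below consumes a stream of events and reports the largest number of (worker, task) executions that were in flight at the same time. The event type here is a reduced, hypothetical stand-in for the package's Event, the events are produced by a local helper rather than by an engine, and the loop assumes the channel is closed once all events have been sent.

```go
package main

import "fmt"

// event is a reduced, hypothetical stand-in for the package's Event type.
type event struct {
	worker string
	task   string
	start  bool // true for the Start event, false for the final event
}

// peakInFlight consumes events until the channel is closed and returns the
// maximum number of (worker, task) executions running at the same time.
func peakInFlight(events <-chan event) int {
	running := make(map[[2]string]bool)
	peak := 0
	for e := range events {
		key := [2]string{e.worker, e.task}
		if e.start {
			running[key] = true
		} else {
			delete(running, key)
		}
		if len(running) > peak {
			peak = len(running)
		}
	}
	return peak
}

// sampleEvents returns a closed, pre-filled channel that plays the role of
// the channel an engine would return.
func sampleEvents() <-chan event {
	evs := []event{
		{"w1", "t1", true},
		{"w2", "t1", true},
		{"w1", "t1", false},
		{"w2", "t1", false},
		{"w1", "t2", true},
		{"w1", "t2", false},
	}
	ch := make(chan event, len(evs))
	for _, e := range evs {
		ch <- e
	}
	close(ch)
	return ch
}

func main() {
	fmt.Println(peakInFlight(sampleEvents()))
}
```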

Task

A Task represents a unit of work to be executed. Each task can be assigned to one or more workers. Two tasks are considered equivalent if they have the same TaskID.

NOTE: tasks with the same TaskID can be different objects carrying different information; this allows a task object assigned to a worker to contain information specific to that worker.

type Task interface {
    TaskID() TaskID // Unique ID of the task
}
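
A minimal sketch of a concrete task follows. The quoteTask type, its fields and the example URLs are hypothetical; TaskID and Task are local copies of the documented definitions so that the example compiles on its own. It shows two task objects that are equivalent (same TaskID) while carrying worker-specific data.

```go
package main

import "fmt"

// TaskID and Task are local copies of the documented definitions.
type TaskID string

type Task interface {
	TaskID() TaskID
}

// quoteTask is a hypothetical task: retrieve a quote for Symbol.
// URL is information specific to the worker the task is assigned to.
type quoteTask struct {
	Symbol string
	URL    string
}

// TaskID identifies the task by its symbol only, so the same symbol assigned
// to different workers counts as the same task.
func (t *quoteTask) TaskID() TaskID { return TaskID(t.Symbol) }

// sameTask reports whether two tasks are equivalent, i.e. share a TaskID.
func sameTask(a, b Task) bool { return a.TaskID() == b.TaskID() }

func main() {
	a := &quoteTask{Symbol: "ACME", URL: "https://site-a.example/ACME"}
	b := &quoteTask{Symbol: "ACME", URL: "https://site-b.example/quote?s=ACME"}
	fmt.Println(sameTask(a, b), a.URL == b.URL)
}
```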

Worker

Each Worker has a WorkFunc that performs the task. Multiple instances of the same worker can be used to concurrently execute different tasks assigned to the worker.

type Worker struct {
    WorkerID  WorkerID   // Unique ID of the worker
    Instances int        // Number of worker instances
    Work      WorkFunc   // The work function
}

The WorkFunc receives a context, the *Worker, the instance number of the worker and the Task, and returns an object that satisfies the Result interface.

type WorkFunc func(context.Context, *Worker, int, Task) Result

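The Result interface has a String method and an Error method.

type Result interface {
    String() string // String representation of the result
    Error() error   // Error returned by the work function
}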
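
The following sketch shows what a work function might look like. The types at the top are local copies of the documented declarations (Result follows the declaration in the type documentation, which includes String as well as Error); sleepTask, sleepResult, sleepWork and runOnce are hypothetical names used only for this example. The work function is called directly here, not through an engine.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented types, so the sketch compiles on its own.
type (
	TaskID   string
	WorkerID string
)

type Task interface{ TaskID() TaskID }

type Result interface {
	String() string
	Error() error
}

type WorkFunc func(context.Context, *Worker, int, Task) Result

type Worker struct {
	WorkerID  WorkerID
	Instances int
	Work      WorkFunc
}

// sleepTask and sleepResult are hypothetical types for this example.
type sleepTask struct {
	id    TaskID
	delay time.Duration
}

func (t sleepTask) TaskID() TaskID { return t.id }

type sleepResult struct {
	msg string
	err error
}

func (r sleepResult) String() string { return r.msg }
func (r sleepResult) Error() error   { return r.err }

// sleepWork waits for the task's delay, or gives up as soon as ctx is done.
func sleepWork(ctx context.Context, w *Worker, inst int, t Task) Result {
	st, ok := t.(sleepTask)
	if !ok {
		return sleepResult{err: errors.New("unexpected task type")}
	}
	select {
	case <-ctx.Done():
		return sleepResult{err: ctx.Err()}
	case <-time.After(st.delay):
		msg := fmt.Sprintf("%s[%d] finished %s", w.WorkerID, inst, st.id)
		return sleepResult{msg: msg}
	}
}

// runOnce calls the work function directly, optionally with a context that
// is already canceled, and returns the error carried by the result.
func runOnce(cancelFirst bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	w := &Worker{WorkerID: "w1", Instances: 1, Work: sleepWork}
	task := sleepTask{id: "t1", delay: 10 * time.Millisecond}
	return w.Work(ctx, w, 0, task).Error()
}

func main() {
	fmt.Println(runOnce(false), runOnce(true))
}
```

Returning ctx.Err() when the context is done lets the caller tell a canceled execution apart from a genuine failure.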

The Error method is used to determine the status of the task execution based on the error returned:

  • Success: if error is nil
  • Canceled: if error is context.Canceled
  • Error: otherwise
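
The rule above can be sketched as a small classification function. The statusOf and sampleStatuses names are hypothetical, and the use of errors.Is is an assumption: the text does not say whether the package compares the error with == or also accepts wrapped errors.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// statusOf maps the error carried by a result to the documented status names.
// Assumption: a wrapped context.Canceled also counts as Canceled.
func statusOf(err error) string {
	switch {
	case err == nil:
		return "Success"
	case errors.Is(err, context.Canceled):
		return "Canceled"
	default:
		return "Error"
	}
}

// sampleStatuses classifies a few representative errors.
func sampleStatuses() []string {
	return []string{
		statusOf(nil),
		statusOf(context.Canceled),
		statusOf(fmt.Errorf("fetch: %w", context.Canceled)),
		statusOf(context.DeadlineExceeded),
		statusOf(errors.New("boom")),
	}
}

func main() {
	fmt.Println(sampleStatuses())
}
```

Note that, under the rule as stated, context.DeadlineExceeded falls into the Error case, since only context.Canceled is listed as Canceled.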

WorkerTasks

The WorkerTasks type is a map from each WorkerID to the list of tasks assigned to that worker.

type WorkerTasks map[WorkerID]Tasks
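
As a minimal sketch, a WorkerTasks value can be written as a map literal. The types below are local copies of the documented definitions; simpleTask and workersPerTask are hypothetical helpers added for the example, the latter counting how many workers each TaskID is assigned to.

```go
package main

import "fmt"

// Local copies of the documented types.
type (
	TaskID   string
	WorkerID string
)

type Task interface{ TaskID() TaskID }

type Tasks []Task

type WorkerTasks map[WorkerID]Tasks

// simpleTask is a hypothetical task whose value is its own ID.
type simpleTask string

func (t simpleTask) TaskID() TaskID { return TaskID(t) }

// workersPerTask counts, for each TaskID, the workers it is assigned to.
func workersPerTask(wts WorkerTasks) map[TaskID]int {
	counts := make(map[TaskID]int)
	for _, tasks := range wts {
		for _, t := range tasks {
			counts[t.TaskID()]++
		}
	}
	return counts
}

func main() {
	wts := WorkerTasks{
		"w1": {simpleTask("t1"), simpleTask("t2"), simpleTask("t3")},
		"w2": {simpleTask("t2"), simpleTask("t3")},
	}
	counts := workersPerTask(wts)
	fmt.Println(counts["t1"], counts["t2"], counts["t3"])
}
```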

Event

The Event type contains the information needed to track a task execution. Event objects are emitted by the engine.ExecuteEvents method. For each (worker, task) pair, a Start event is emitted, followed by a final event that can be Success, Canceled or Error.

type Event struct {
    Result     Result // nil for Start event
    WorkerID   WorkerID
    WorkerInst int
    Task       Task
    TaskStat   TaskStat
    TimeStart  time.Time
    TimeEnd    time.Time // same as TimeStart for Start event
}

Type

The Type method returns the EventType of the event.

const (
    EventNil EventType = iota
    EventStart
    EventSuccess
    EventError
    EventCanceled
)

Documentation

Functions

func FilterEventFunc added in v0.4.0

func FilterEventFunc(mode Mode) func(*Event) bool

FilterEventFunc returns a function that, given an *Event, returns true if the event satisfies the criteria of the Mode argument.

func IsFirstSuccessOrLastResult added in v0.4.0

func IsFirstSuccessOrLastResult(e *Event) bool

IsFirstSuccessOrLastResult returns true if it is the first success result or it is the last result and no success was previously found.

func IsResult added in v0.4.0

func IsResult(e *Event) bool

IsResult returns true if the event has a non-nil result, i.e. it is not a Start event.

func IsResultUntilFirstSuccess added in v0.4.0

func IsResultUntilFirstSuccess(e *Event) bool

IsResultUntilFirstSuccess returns true for all the results until the first success (included).

func IsSuccessOrError added in v0.4.0

func IsSuccessOrError(e *Event) bool

IsSuccessOrError returns true if the event is a result and that result is a success or an error. It returns false for a canceled result.

Types

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

The Engine type is the main struct used to execute the tasks. It internally stores the information about the workers and the tasks of each worker.

func NewEngine

func NewEngine(ws []*Worker, wts WorkerTasks) (*Engine, error)

NewEngine initializes a new Engine object from the list of workers and the tasks of each worker. It performs some sanity checks and returns an error if it finds inconsistencies.
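
The exact checks are not listed here. Purely as an illustration, the sketch below validates a configuration against the constraints stated elsewhere on this page (unique WorkerID, Instances of at least 1), plus two assumed checks (no nil worker, no tasks assigned to an unknown worker). The validate function and the reduced Worker and WorkerTasks types are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type WorkerID string

// Worker is a reduced copy of the documented struct (Work is omitted).
type Worker struct {
	WorkerID  WorkerID
	Instances int
}

// WorkerTasks holds task IDs only, to keep the sketch short.
type WorkerTasks map[WorkerID][]string

// validate is a hypothetical set of sanity checks, not the package's own.
func validate(ws []*Worker, wts WorkerTasks) error {
	seen := make(map[WorkerID]bool, len(ws))
	for _, w := range ws {
		if w == nil {
			return errors.New("nil worker") // assumed check
		}
		if seen[w.WorkerID] {
			return fmt.Errorf("duplicate worker id %q", w.WorkerID)
		}
		if w.Instances < 1 {
			return fmt.Errorf("worker %q: instances must be >= 1", w.WorkerID)
		}
		seen[w.WorkerID] = true
	}
	for id := range wts {
		if !seen[id] {
			return fmt.Errorf("tasks assigned to unknown worker %q", id) // assumed check
		}
	}
	return nil
}

func main() {
	ws := []*Worker{{WorkerID: "w1", Instances: 2}, {WorkerID: "w1", Instances: 1}}
	fmt.Println(validate(ws, WorkerTasks{"w1": {"t1"}}))
}
```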

func (*Engine) Execute

func (eng *Engine) Execute(ctx context.Context, mode Mode) (chan Result, error)

Execute returns a chan that receives the results generated by task execution. It calls the ExecuteEvents method and filters the returned results based on the Mode parameter.
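
The relationship between the two methods can be sketched as a goroutine that reads events, applies a predicate and forwards the results it keeps. This is an assumption about the general shape, not the package's code: forward, okResult and countForwarded are hypothetical, Event and Result are reduced local copies, and isResult only mirrors the documented behaviour of IsResult.

```go
package main

import "fmt"

// Result and Event are reduced local copies of the documented types.
type Result interface{ Error() error }

type Event struct {
	Result Result // nil for a Start event
}

// isResult mirrors the documented IsResult predicate.
func isResult(e *Event) bool { return e != nil && e.Result != nil }

// forward copies the Result of every event accepted by keep to a new
// channel, which is closed once the events channel is drained.
func forward(events <-chan *Event, keep func(*Event) bool) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for e := range events {
			if keep(e) {
				out <- e.Result
			}
		}
	}()
	return out
}

// okResult is a hypothetical successful result.
type okResult struct{}

func (okResult) Error() error { return nil }

// countForwarded feeds two Start events and two final events through
// forward and counts the results that come out.
func countForwarded() int {
	events := make(chan *Event, 4)
	events <- &Event{}                   // Start
	events <- &Event{Result: okResult{}} // final
	events <- &Event{}                   // Start
	events <- &Event{Result: okResult{}} // final
	close(events)

	n := 0
	for range forward(events, isResult) {
		n++
	}
	return n
}

func main() {
	fmt.Println(countForwarded())
}
```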

func (*Engine) ExecuteEvents

func (eng *Engine) ExecuteEvents(ctx context.Context) (chan *Event, error)

ExecuteEvents returns a chan that receives all the Events generated by each task execution. For each (worker, task) pair, a Start event is emitted, followed by a final event that can be Success, Canceled or Error.

The method is useful to track the execution of the tasks: while the Execute method can only return the result on completion of execution, the ExecuteEvents method also returns the Start event at the beginning of execution (with a nil result).

type Event

type Event struct {
	Result     Result // nil for Start event
	WorkerID   WorkerID
	WorkerInst int
	Task       Task
	TaskStat   TaskStat
	TimeStart  time.Time
	TimeEnd    time.Time // same as TimeStart for Start event
}

The Event type contains the information about a task execution. Event objects are emitted by the engine.ExecuteEvents method. For each (worker, task) pair, a Start event is emitted, followed by a final event that can be a Success, Canceled or Error event. The event.Type() method returns the type of the event.

func (*Event) String

func (e *Event) String() string

String returns a representation of an event.

func (*Event) Type

func (e *Event) Type() EventType

Type returns the type of the Event.

type EventType

type EventType int

const (
	EventNil EventType = iota
	EventStart
	EventSuccess
	EventError
	EventCanceled
)

func (EventType) MarshalJSON added in v0.4.0

func (t EventType) MarshalJSON() ([]byte, error)

MarshalJSON returns the JSON representation of the EventType.

func (EventType) String

func (t EventType) String() string

String representation of an EventType.

type Mode

type Mode int

Mode of execution for each task.

const (
	// For each task returns the result of all the workers: success, error or canceled.
	// Multiple success results can be returned.
	AllResults Mode = iota

	// For each task returns the success or error results.
	// The canceled results are not returned.
	// Multiple success results can be returned.
	SuccessOrErrorResults

	// For each task returns the results preceding the first success (included).
	// At most one success is returned.
	ResultsUntilFirstSuccess

	// For each task returns only one result: the first success or the last result.
	FirstSuccessOrLastResult
)

Values of mode of execution for each task.

type Result

type Result interface {

	// String representation of the Result.
	String() string

	// The error returned by the Work function.
	// It is used to determine the status of the task execution as follows:
	//    Success:  error is nil
	//    Canceled: error is context.Canceled
	//    Error:    otherwise
	Error() error
}

Result is the interface that must be matched by the output of the Work function.

type Task

type Task interface {
	TaskID() TaskID
}

Task is a unit of work that can be executed by a worker. Two or more tasks with the same TaskID are equivalent, and possibly only one of them will be executed. Two or more tasks with the same TaskID can contain different information useful to a specific worker.

type TaskID

type TaskID string

TaskID type definition.

type TaskStat

type TaskStat struct {
	Todo    int // how many workers have to do the task.
	Doing   int // how many workers are doing the task.
	Done    int // how many workers have done the task.
	Success int // how many workers have done the task with success.
}

TaskStat type object tracks the number of workers dealing with the task. It is used to dynamically choose the next task to execute.

func (*TaskStat) Completed

func (stat *TaskStat) Completed() bool

Completed returns true if no worker still has to do the task and no worker is currently doing it.
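
Read against the TaskStat fields, this condition amounts to Todo and Doing both being zero. The sketch below illustrates that reading on a local copy of the struct; the start and finish helpers are hypothetical and only assume how the counters might move as workers pick up and finish a task.

```go
package main

import "fmt"

// TaskStat is a local copy of the documented struct.
type TaskStat struct {
	Todo    int // how many workers have to do the task
	Doing   int // how many workers are doing the task
	Done    int // how many workers have done the task
	Success int // how many workers have done the task with success
}

// completed reports whether no worker has to do or is doing the task.
func (s TaskStat) completed() bool { return s.Todo == 0 && s.Doing == 0 }

// start is a hypothetical transition: one worker picks up the task.
func (s *TaskStat) start() {
	s.Todo--
	s.Doing++
}

// finish is a hypothetical transition: one worker ends its execution.
func (s *TaskStat) finish(ok bool) {
	s.Doing--
	s.Done++
	if ok {
		s.Success++
	}
}

// lifecycle records completed() at each step for a task assigned to two workers.
func lifecycle() []bool {
	s := TaskStat{Todo: 2}
	states := []bool{s.completed()}
	s.start()
	s.start()
	states = append(states, s.completed())
	s.finish(false)
	states = append(states, s.completed())
	s.finish(true)
	states = append(states, s.completed())
	return states
}

func main() {
	fmt.Println(lifecycle())
}
```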

func (TaskStat) String

func (stat TaskStat) String() string

String representation of a TaskStat object.

type Tasks

type Tasks []Task

Tasks is a slice of tasks.

type WorkFunc

type WorkFunc func(context.Context, *Worker, int, Task) Result

WorkFunc is the worker function to execute a given task. The int parameter represents the worker instance.

type Worker

type Worker struct {

	// Unique ID of the worker
	WorkerID WorkerID

	// Number of worker instances. Must be greater than or equal to 1.
	Instances int

	// The work function
	Work WorkFunc
}

Worker is the unit (identified by WorkerID) that receives the tasks and executes its WorkFunc function to return the results. The Instances field is the number of instances of the worker.

type WorkerID

type WorkerID string

WorkerID type definition.

type WorkerTasks

type WorkerTasks map[WorkerID]Tasks

WorkerTasks is a map representing the task list of each worker.

func (WorkerTasks) Clone

func (wts WorkerTasks) Clone() WorkerTasks

Clone method returns a cloned copy of the WorkerTasks object.
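
How deep the copy goes is not stated here. As one possible reading, the sketch below copies the map and each task slice while sharing the Task values themselves; cloneWorkerTasks and simpleTask are hypothetical, and the other types are local copies of the documented definitions.

```go
package main

import "fmt"

// Local copies of the documented types.
type (
	TaskID   string
	WorkerID string
)

type Task interface{ TaskID() TaskID }

type Tasks []Task

type WorkerTasks map[WorkerID]Tasks

// simpleTask is a hypothetical task whose value is its own ID.
type simpleTask string

func (t simpleTask) TaskID() TaskID { return TaskID(t) }

// cloneWorkerTasks copies the map and every task slice, so that changes to
// the copy's map or slices do not affect the original. Task values are shared.
func cloneWorkerTasks(wts WorkerTasks) WorkerTasks {
	if wts == nil {
		return nil
	}
	out := make(WorkerTasks, len(wts))
	for id, tasks := range wts {
		out[id] = append(Tasks(nil), tasks...)
	}
	return out
}

func main() {
	orig := WorkerTasks{"w1": {simpleTask("t1"), simpleTask("t2")}}
	cp := cloneWorkerTasks(orig)
	cp["w1"][0] = simpleTask("changed")
	cp["w2"] = Tasks{simpleTask("t3")}
	fmt.Println(len(orig), orig["w1"][0].TaskID())
}
```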
