Documentation
Overview ¶
Package taskengine concurrently executes a set of tasks assigned to multiple workers.
Each worker can work on all or a subset of the tasks.
When a worker is ready, the next task to execute is chosen dynamically, based on the current status of the tasks, so as to maximize the throughput of successfully executed tasks.
After the first successful result is found, the remaining jobs for the same task are canceled.
Index ¶
- func FilterEventFunc(mode Mode) func(*Event) bool
- func IsFirstSuccessOrLastResult(e *Event) bool
- func IsResult(e *Event) bool
- func IsResultUntilFirstSuccess(e *Event) bool
- func IsSuccessOrError(e *Event) bool
- type Engine
- type Event
- type EventType
- type Mode
- type Result
- type Task
- type TaskID
- type TaskStat
- type Tasks
- type WorkFunc
- type Worker
- type WorkerID
- type WorkerTasks
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FilterEventFunc ¶ added in v0.4.0
FilterEventFunc returns a function that, given an *Event, returns true if the event satisfies the criteria of the Mode argument.
func IsFirstSuccessOrLastResult ¶ added in v0.4.0
IsFirstSuccessOrLastResult returns true if it is the first success result or it is the last result and no success was previously found.
func IsResult ¶ added in v0.4.0
IsResult returns true if the event has a non-nil result, i.e. it is not a Start event.
func IsResultUntilFirstSuccess ¶ added in v0.4.0
IsResultUntilFirstSuccess returns true for all the results until the first success (included).
func IsSuccessOrError ¶ added in v0.4.0
IsSuccessOrError returns true if the event is a result and that result is a success or an error. It returns false for a canceled result.
Types ¶
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the main struct used to execute the tasks. It internally stores the information about the workers and the tasks of each worker.
func NewEngine ¶
func NewEngine(ws []*Worker, wts WorkerTasks) (*Engine, error)
NewEngine initializes a new engine object from the list of workers and the tasks of each worker. It performs some sanity checks and returns an error in case of inconsistencies.
func (*Engine) Execute ¶
Execute returns a chan that receives the results generated by tasks execution. It calls the ExecuteEvents method and filters the returned results based on the Mode parameter.
func (*Engine) ExecuteEvents ¶
ExecuteEvents returns a chan that receives all the Events generated by each task execution. For each (worker, task) pair, a Start event is emitted, followed by a final event that can be Success, Canceled or Error.
The method is useful to track the execution of the tasks: while the Execute method can only return the result on completion of execution, the ExecuteEvents method also returns the Start event at the beginning of execution (with a nil result).
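The relationship between the two methods, where Execute filters the stream produced by ExecuteEvents, can be sketched as a small channel pipeline. The names `filterResults`, `collect`, `event`, `result` and `text` are hypothetical stand-ins, not the package's API, and the real method signatures are not shown in this documentation.

```go
package main

import "fmt"

// Local stand-ins for the package's Result and Event types.
type result interface{ String() string }

type event struct {
	Result result // nil for a Start event
}

// text is a hypothetical result type used only for this sketch.
type text string

func (t text) String() string { return string(t) }

// filterResults forwards the Result of every event accepted by keep and
// closes the output channel once the input channel is closed.
func filterResults(events <-chan *event, keep func(*event) bool) <-chan result {
	out := make(chan result)
	go func() {
		defer close(out)
		for e := range events {
			if keep(e) {
				out <- e.Result
			}
		}
	}()
	return out
}

// collect feeds a fixed event sequence through filterResults and gathers
// the string form of every result that passes the filter.
func collect(evs []*event, keep func(*event) bool) []string {
	in := make(chan *event)
	go func() {
		defer close(in)
		for _, e := range evs {
			in <- e
		}
	}()
	var got []string
	for r := range filterResults(in, keep) {
		got = append(got, r.String())
	}
	return got
}

func main() {
	// Two (worker, task) pairs: each emits a Start event, then a final event.
	evs := []*event{{}, {Result: text("w1: ok")}, {}, {Result: text("w2: ok")}}
	isResult := func(e *event) bool { return e.Result != nil }
	fmt.Println(collect(evs, isResult)) // [w1: ok w2: ok]
}
```

Any predicate with the shape `func(*event) bool` can be passed as `keep`, which is the role the function returned by FilterEventFunc plays for a given Mode.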
type Event ¶
type Event struct {
Result Result // nil for Start event
WorkerID WorkerID
WorkerInst int
Task Task
TaskStat TaskStat
TimeStart time.Time
TimeEnd time.Time // same as TimeStart for Start event
}
Event contains the information about a task execution. Event objects are emitted by the engine.ExecuteEvents method. For each (worker, task) pair, a Start event is emitted, followed by a final event that can be a Success, Canceled or Error event. The event.Type() method returns the type of the event.
type EventType ¶
type EventType int
func (EventType) MarshalJSON ¶ added in v0.4.0
MarshalJSON returns the JSON representation of the EventType.
type Mode ¶
type Mode int
Mode of execution for each task.
const (
// For each task returns the result of all the workers: success, error or canceled.
// Multiple success results can be returned.
AllResults Mode = iota

// For each task returns the success or error results.
// The canceled results are not returned.
// Multiple success results can be returned.
SuccessOrErrorResults

// For each task returns the results preceding the first success (included).
// At most one success is returned.
ResultsUntilFirstSuccess

// For each task returns only one result: the first success or the last result.
FirstSuccessOrLastResult
)
Values of mode of execution for each task.
type Result ¶
type Result interface {
// String representation of the Result.
String() string
// The error returned by the Work function.
// It is used to determine the status of the task execution as follows:
// Success: error is nil
// Canceled: error is context.Canceled
// Error: otherwise
Error() error
}
Result is the interface that the output of the Work function must implement.
type Task ¶
type Task interface {
TaskID() TaskID
}
Task is a unit of work that can be executed by a worker. Two or more tasks with the same TaskID are equivalent, and possibly only one of them will be executed. Two or more tasks with the same TaskID can contain different information useful to a specific worker.
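A minimal sketch of this idea: two tasks share a TaskID but carry different worker-specific details. The `lookup` type, its `url` field and the `equivalent` helper are hypothetical, and `TaskID` is assumed here to be a string-based type, since its definition is not shown above.

```go
package main

import "fmt"

// TaskID is assumed to be a string-based type for this sketch.
type TaskID string

// Task mirrors the interface documented above.
type Task interface {
	TaskID() TaskID
}

// lookup is a hypothetical task: the same item can be fetched from
// different sites, each needing its own URL.
type lookup struct {
	id  TaskID
	url string // worker-specific detail, not part of the task identity
}

func (l lookup) TaskID() TaskID { return l.id }

// equivalent reports whether two tasks are the same unit of work,
// i.e. whether a success on one makes the other unnecessary.
func equivalent(a, b Task) bool { return a.TaskID() == b.TaskID() }

func main() {
	a := lookup{id: "item-42", url: "https://site-a.example/item/42"}
	b := lookup{id: "item-42", url: "https://site-b.example/?id=42"}
	fmt.Println(equivalent(a, b), a.url == b.url) // true false
}
```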
type TaskStat ¶
type TaskStat struct {
Todo int // how many workers have to do the task.
Doing int // how many workers are doing the task.
Done int // how many workers have done the task.
Success int // how many workers have done the task with success.
}
TaskStat type object tracks the number of workers dealing with the task. It is used to dynamically choose the next task to execute.
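The selection rule itself is not documented here. Purely as an illustration of how such counters can drive the choice, the sketch below uses an invented heuristic: prefer tasks without a success yet, then tasks that fewer workers are currently doing, then tasks with fewer workers still waiting to try them. The `less` and `pickNext` functions are hypothetical and should not be read as the engine's algorithm.

```go
package main

import "fmt"

// TaskStat mirrors the struct documented above.
type TaskStat struct {
	Todo    int // how many workers have to do the task
	Doing   int // how many workers are doing the task
	Done    int // how many workers have done the task
	Success int // how many workers have done the task with success
}

// less reports whether a task with stats a should be tried before a task
// with stats b, according to the invented heuristic of this sketch.
func less(a, b TaskStat) bool {
	if (a.Success > 0) != (b.Success > 0) {
		return a.Success == 0 // tasks without a success come first
	}
	if a.Doing != b.Doing {
		return a.Doing < b.Doing // avoid duplicating work in progress
	}
	return a.Todo < b.Todo // fewer remaining candidates: more urgent
}

// pickNext returns the index of the preferred task, or -1 if stats is empty.
func pickNext(stats []TaskStat) int {
	best := -1
	for i, s := range stats {
		if best < 0 || less(s, stats[best]) {
			best = i
		}
	}
	return best
}

func main() {
	stats := []TaskStat{
		{Todo: 2, Doing: 1},            // already in progress elsewhere
		{Todo: 3},                      // untouched
		{Todo: 1, Done: 1, Success: 1}, // already solved
	}
	fmt.Println(pickNext(stats)) // 1
}
```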
type WorkFunc ¶
WorkFunc is the worker function to execute a given task. The int parameter represents the worker instance.
type Worker ¶
type Worker struct {
// Unique ID of the worker
WorkerID WorkerID
// Number of worker instances. Must be greater than or equal to 1
Instances int
// The work function
Work WorkFunc
}
Worker is the unit (identified by WorkerID) that receives the tasks and executes a specific WorkFunc function to return the results. The Instances field represents the number of instances of each worker.
type WorkerTasks ¶
WorkerTasks is a map representing the task list of each worker.
func (WorkerTasks) Clone ¶
func (wts WorkerTasks) Clone() WorkerTasks
Clone method returns a cloned copy of the WorkerTasks object.
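A clone of a map of slices usually has to copy each slice as well, otherwise the copy and the original share the same backing arrays. The sketch below shows that pattern under stated assumptions: `WorkerTasks` is taken to be a map from `WorkerID` to `Tasks`, `Tasks` is simplified to a slice of task IDs, and `WorkerID` is a string. The documentation above does not show these definitions or say how deep the real Clone goes.

```go
package main

import "fmt"

// Assumed shapes for this sketch only.
type WorkerID string
type Tasks []string // simplified: task IDs instead of Task values
type WorkerTasks map[WorkerID]Tasks

// Clone copies the map and every task slice, so that changes made to the
// clone (new keys, replaced elements) do not affect the original.
func (wts WorkerTasks) Clone() WorkerTasks {
	if wts == nil {
		return nil
	}
	out := make(WorkerTasks, len(wts))
	for id, ts := range wts {
		out[id] = append(Tasks(nil), ts...)
	}
	return out
}

func main() {
	orig := WorkerTasks{"w1": {"t1", "t2"}, "w2": {"t2"}}
	c := orig.Clone()
	c["w1"][0] = "changed"
	c["w3"] = Tasks{"t9"}
	fmt.Println(orig["w1"][0], len(orig), len(c)) // t1 2 3
}
```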