Documentation ¶
Index ¶
- Constants
- Variables
- type Middleware
- type Result
- type Service
- type Task
- type TaskFunc
- type TaskManager
- func (tm *TaskManager) CancelAll()
- func (tm *TaskManager) CancelAllAndWait()
- func (tm *TaskManager) CancelTask(id uuid.UUID)
- func (tm *TaskManager) Close()
- func (tm *TaskManager) ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error)
- func (tm *TaskManager) GetActiveTasks() int
- func (tm *TaskManager) GetCancelled() <-chan Task
- func (tm *TaskManager) GetResults() []Result
- func (tm *TaskManager) GetTask(id uuid.UUID) (task *Task, err error)
- func (tm *TaskManager) GetTasks() []Task
- func (tm *TaskManager) RegisterTask(ctx context.Context, task Task) error
- func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...Task)
- func (tm *TaskManager) StartWorkers()
- func (tm *TaskManager) StreamResults() <-chan Result
- func (tm *TaskManager) Wait(timeout time.Duration)
- type TaskStatus
Constants ¶
const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
)
TaskStatus values
- 1: `ContextDeadlineReached`
- 2: `RateLimited`
- 3: `Cancelled`
- 4: `Failed`
- 5: `Queued`
- 6: `Running`
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second
	DefaultTasksPerSecond = 5
	// DefaultTimeout is the default timeout for tasks
	DefaultTimeout = 5
	// DefaultRetryDelay is the default delay between retries
	DefaultRetryDelay = 1
	// DefaultMaxRetries is the default maximum number of retries
	DefaultMaxRetries = 3
)
Variables ¶
var (
	// ErrInvalidTaskID is returned when a task has an invalid ID
	ErrInvalidTaskID = errors.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function
	ErrInvalidTaskFunc = errors.New("invalid task function")
	// ErrTaskNotFound is returned when a task is not found
	ErrTaskNotFound = errors.New("task not found")
	// ErrTaskTimeout is returned when a task times out
	ErrTaskTimeout = errors.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled
	ErrTaskCancelled = errors.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started
	ErrTaskAlreadyStarted = errors.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed
	ErrTaskCompleted = errors.New("task completed")
)
Errors returned by the TaskManager
Functions ¶
This section is empty.
Types ¶
type Middleware ¶
Middleware describes a `Service` middleware.
type Result ¶ added in v0.0.4
type Result struct {
Task *Task // the task that produced the result
Result interface{} // the result of the task
Error error // the error returned by the task
}
Result is a task result
type Service ¶
type Service interface {
// RegisterTask registers a new task to the worker
RegisterTask(ctx context.Context, task Task) error
// RegisterTasks registers multiple tasks to the worker
RegisterTasks(ctx context.Context, tasks ...Task)
// StartWorkers starts the task manager's workers
StartWorkers()
// Wait for all tasks to finish
Wait(timeout time.Duration)
// Close closes the task manager
Close()
// CancelAllAndWait cancels all tasks and waits for them to finish
CancelAllAndWait()
// CancelAll cancels all tasks
CancelAll()
// CancelTask cancels a task by its ID
CancelTask(id uuid.UUID)
// GetActiveTasks returns the number of active tasks
GetActiveTasks() int
// StreamResults streams the `Result` channel
StreamResults() <-chan Result
// GetResults returns the results as a slice of `Result`
GetResults() []Result
// GetCancelled gets the cancelled tasks channel
GetCancelled() <-chan Task
// GetTask gets a task by its ID
GetTask(id uuid.UUID) (task *Task, err error)
// GetTasks gets all tasks
GetTasks() []Task
// ExecuteTask executes a task given its ID and returns the result
ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error)
}
Service is an interface for a task manager
func RegisterMiddleware ¶
func RegisterMiddleware(svc Service, mw ...Middleware) Service
RegisterMiddleware registers middlewares to the `Service`.
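The definition of `Middleware` is not shown here. A common shape for this kind of API is a function that takes a `Service` and returns a decorated `Service`; the sketch below assumes that shape and uses a cut-down, one-method `Service`. The `fixed`, `counting` and `withCounter` names are hypothetical, and the ordering of wrappers in the real package may differ.

```go
package main

import "fmt"

// Service is a cut-down stand-in for the documented interface.
type Service interface {
	GetActiveTasks() int
}

// Middleware is ASSUMED to be a function that decorates a Service.
type Middleware func(Service) Service

// RegisterMiddleware applies each middleware in order, so the last one
// supplied becomes the outermost wrapper.
func RegisterMiddleware(svc Service, mw ...Middleware) Service {
	for _, m := range mw {
		svc = m(svc)
	}
	return svc
}

// fixed is a trivial Service that always reports the same count.
type fixed int

func (f fixed) GetActiveTasks() int { return int(f) }

// counting records how many times GetActiveTasks is called.
type counting struct {
	next  Service
	calls *int
}

func (c counting) GetActiveTasks() int {
	*c.calls++
	return c.next.GetActiveTasks()
}

// withCounter builds a Middleware that counts calls into *calls.
func withCounter(calls *int) Middleware {
	return func(next Service) Service {
		return counting{next: next, calls: calls}
	}
}

func main() {
	calls := 0
	svc := RegisterMiddleware(fixed(3), withCounter(&calls))
	active := svc.GetActiveTasks()
	fmt.Println(active, calls) // 3 1
}
```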
type Task ¶
type Task struct {
ID uuid.UUID `json:"id"` // ID is the id of the task
Name string `json:"name"` // Name is the name of the task
Description string `json:"description"` // Description is the description of the task
Priority int `json:"priority"` // Priority is the priority of the task
Fn TaskFunc `json:"-"` // Fn is the function that will be executed by the task
Ctx context.Context `json:"context"` // Ctx is the context of the task
CancelFunc context.CancelFunc `json:"-"` // CancelFunc is the cancel function of the task
Status TaskStatus `json:"task_status"` // Status stores the status of the task
Error atomic.Value `json:"error"` // Error is the error of the task
Started atomic.Int64 `json:"started"` // Started is the time the task started
Completed atomic.Int64 `json:"completed"` // Completed is the time the task completed
Cancelled atomic.Int64 `json:"cancelled"` // Cancelled is the time the task was cancelled
Retries int `json:"retries"` // Retries is the maximum number of retries for failed tasks
RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
// contains filtered or unexported fields
}
Task represents a function that can be executed by the task manager
func (*Task) CancelledChan ¶ added in v0.0.4
func (task *Task) CancelledChan() <-chan struct{}
CancelledChan returns a channel that will be closed when the task is cancelled
func (*Task) ShouldSchedule ¶ added in v0.0.5
ShouldSchedule returns an error if the task should not be scheduled
func (*Task) WaitCancelled ¶ added in v0.0.4
func (task *Task) WaitCancelled()
WaitCancelled waits for the task to be cancelled
type TaskFunc ¶ added in v0.0.2
type TaskFunc func() (interface{}, error)
TaskFunc is the signature of a `Task` function
type TaskManager ¶
type TaskManager struct {
Registry sync.Map // Registry is a map of registered tasks
Results chan Result // Results is the channel of results
Tasks chan Task // Tasks is the channel of tasks
Cancelled chan Task // Cancelled is the channel of cancelled tasks
Timeout time.Duration // Timeout is the default timeout for tasks
MaxWorkers int // MaxWorkers is the maximum number of workers that can be started
MaxTasks int // MaxTasks is the maximum number of tasks that can be executed at once
RetryDelay time.Duration // RetryDelay is the delay between retries
MaxRetries int // MaxRetries is the maximum number of retries
// contains filtered or unexported fields
}
TaskManager is a struct that manages a pool of goroutines that can execute tasks
func NewTaskManager ¶
func NewTaskManager(maxWorkers int, maxTasks int, tasksPerSecond float64, timeout time.Duration, retryDelay time.Duration, maxRetries int) *TaskManager
NewTaskManager creates a new task manager
- `maxWorkers` is the number of workers to start; if not specified, the number of CPUs is used
- `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 10
- `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 5 (`DefaultTasksPerSecond`)
- `timeout` is the default timeout for tasks, defaults to 5 minutes
- `retryDelay` is the default delay between retries, defaults to 1 second
- `maxRetries` is the default maximum number of retries, defaults to 3
func NewTaskManagerWithDefaults ¶ added in v0.0.5
func NewTaskManagerWithDefaults() *TaskManager
NewTaskManagerWithDefaults creates a new task manager with default values
- `maxWorkers`: `runtime.NumCPU()`
- `maxTasks`: 10
- `tasksPerSecond`: 5
- `timeout`: 5 minutes
- `retryDelay`: 1 second
- `maxRetries`: 3
func (*TaskManager) CancelAllAndWait ¶ added in v0.0.4
func (tm *TaskManager) CancelAllAndWait()
CancelAllAndWait cancels all tasks and waits for them to finish
func (*TaskManager) CancelTask ¶
func (tm *TaskManager) CancelTask(id uuid.UUID)
CancelTask cancels a task by its ID
func (*TaskManager) Close ¶ added in v0.0.4
func (tm *TaskManager) Close()
Close stops the task manager and waits for all tasks to finish
func (*TaskManager) ExecuteTask ¶ added in v0.0.2
func (tm *TaskManager) ExecuteTask(id uuid.UUID, timeout time.Duration) (interface{}, error)
ExecuteTask executes a task given its ID and returns the result
- It gets the task by ID and locks the mutex to access the task data.
- If the task has already been started, it cancels it and returns an error.
- If the task is invalid, it sends it to the cancelled channel and returns an error.
- If the task is already running, it returns an error.
- It creates a new context for this task, waits for the result to become available, and returns it.
- It reserves a token from the limiter and waits for the task execution.
- If the token reservation fails, it waits for a delay and tries again.
- It executes the task and sends the result to the results channel.
- If the task execution fails, it retries the task up to max retries with a delay between retries.
- If the task fails with all retries exhausted, it cancels the task and returns an error.
func (*TaskManager) GetActiveTasks ¶ added in v0.0.4
func (tm *TaskManager) GetActiveTasks() int
GetActiveTasks returns the number of active tasks
func (*TaskManager) GetCancelled ¶ added in v0.0.4
func (tm *TaskManager) GetCancelled() <-chan Task
GetCancelled gets the cancelled tasks channel
func (*TaskManager) GetResults ¶
func (tm *TaskManager) GetResults() []Result
GetResults returns the results as a slice
func (*TaskManager) GetTask ¶
func (tm *TaskManager) GetTask(id uuid.UUID) (task *Task, err error)
GetTask gets a task by its ID
func (*TaskManager) RegisterTask ¶
func (tm *TaskManager) RegisterTask(ctx context.Context, task Task) error
RegisterTask registers a new task to the task manager
func (*TaskManager) RegisterTasks ¶ added in v0.0.4
func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...Task)
RegisterTasks registers multiple tasks to the task manager at once
func (*TaskManager) StartWorkers ¶ added in v0.0.4
func (tm *TaskManager) StartWorkers()
StartWorkers starts the task manager and its goroutines
func (*TaskManager) StreamResults ¶ added in v0.0.5
func (tm *TaskManager) StreamResults() <-chan Result
StreamResults streams the results channel
func (*TaskManager) Wait ¶ added in v0.0.4
func (tm *TaskManager) Wait(timeout time.Duration)
Wait waits for all tasks to complete or for the timeout to elapse
type TaskStatus ¶ added in v0.0.4
type TaskStatus uint8
TaskStatus is a value used to represent the task status.
func (TaskStatus) String ¶ added in v0.0.4
func (ts TaskStatus) String() string
String returns the string representation of the task status.