worker

package module
v0.0.2
Warning: This package is not in the latest version of its module.
Published: Jan 29, 2023 License: MPL-2.0 Imports: 9 Imported by: 0

README

go-worker

go-worker provides a simple way to manage and execute prioritized tasks concurrently, using a TaskManager that spawns a pool of workers. Each Task represents a function scheduled by priority.

Features

  • Task prioritization: You can register tasks with a priority level influencing the execution order.
  • Concurrent execution: Tasks are executed concurrently by a pool of workers.
  • Middleware: You can apply middleware to the TaskManager to add additional functionality.
  • Results: You can access the results of the tasks via the Results channel.
  • Rate limiting: You can rate limit task scheduling by setting a maximum number of jobs per second.
  • Cancellation: Tasks can be canceled before or while they are running.
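To make the prioritization feature concrete, here is a minimal sketch of priority ordering built on the standard `container/heap` package. It is not the library's scheduler: the `job` and `jobQueue` types and the `drain` helper are hypothetical, and the rule that the smallest priority value is popped first is an assumption made only for this sketch.

```go
package main

import (
	"container/heap"
	"fmt"
)

// job is a hypothetical stand-in for a prioritized task.
type job struct {
	name     string
	priority int
}

// jobQueue implements heap.Interface. In this sketch the smallest
// priority value is popped first (an assumption, not library behavior).
type jobQueue []job

func (q jobQueue) Len() int            { return len(q) }
func (q jobQueue) Less(i, j int) bool  { return q[i].priority < q[j].priority }
func (q jobQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *jobQueue) Push(x interface{}) { *q = append(*q, x.(job)) }
func (q *jobQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

// drain pushes every job onto the heap and returns the names in pop order.
func drain(jobs ...job) []string {
	q := &jobQueue{}
	for _, j := range jobs {
		heap.Push(q, j)
	}
	order := make([]string, 0, len(jobs))
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(job).name)
	}
	return order
}

func main() {
	fmt.Println(drain(job{"slow", 90}, job{"first", 1}, job{"second", 5}))
	// prints: [first second slow]
}
```

Registration order does not matter here; only the priority value decides which job is popped next.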

API

Initialization

Create a new TaskManager by calling the NewTaskManager() function, passing the maximum number of tasks that can be executed at once and the rate limit in tasks per second.

tm := worker.NewTaskManager(10, 5)

Registering Tasks

Register new tasks by calling the RegisterTask() method of the TaskManager struct. The method is variadic, so several tasks can be registered in one call.

task := worker.Task{
    ID:       uuid.New(),
    Priority: 1,
    Fn:       func() interface{} { return "Hello, World!" },
}


task2 := worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Fn:       func() interface{} { return "Hello, World!" },
}

tm.RegisterTask(task, task2)

Starting and Stopping

You can start the task manager and its goroutines by calling the Start() method of the TaskManager struct and passing in the number of workers.

tm.Start(5)

You can stop the task manager and its goroutines by calling the Stop() method of the TaskManager struct.

tm.Stop()

Results

Task results are delivered on the Results channel of the TaskManager, which the GetResults() method returns as a receive-only channel.

for result := range tm.GetResults() {
   // Do something with the result
}
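Because results arrive as `interface{}` values, a consumer usually needs a type switch to do anything useful with them. The sketch below uses a plain buffered channel in place of the TaskManager; the `describe` helper is hypothetical. Note that a `range` over a channel only ends once the channel is closed, so the sketch closes it explicitly.

```go
package main

import "fmt"

// describe formats a result whose dynamic type is not known in advance.
func describe(result interface{}) string {
	switch v := result.(type) {
	case int:
		return fmt.Sprintf("int: %d", v)
	case string:
		return fmt.Sprintf("string: %q", v)
	case error:
		return "error: " + v.Error()
	default:
		return fmt.Sprintf("other: %v", v)
	}
}

func main() {
	// A stand-in for a results channel; not the library's channel.
	results := make(chan interface{}, 2)
	results <- 7
	results <- "Hello, World!"
	close(results) // without this, the range below would block forever

	for r := range results {
		fmt.Println(describe(r))
	}
}
```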

Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.

tm.CancelTask(task.ID)
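The Task struct carries a `context.Context` and a `context.CancelFunc`, which suggests cancellation is context-based. As a rough illustration of that general technique (not the library's code), the sketch below waits on either a result or a cancelled context. The helpers `runCancellable`, `cancelBeforeFinish`, and `runToCompletion` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runCancellable runs work in its own goroutine and returns as soon as
// either the work finishes or ctx is cancelled. The work itself is not
// interrupted; only the wait is abandoned.
func runCancellable(ctx context.Context, work func() interface{}) (interface{}, error) {
	done := make(chan interface{}, 1) // buffered so the goroutine can always send
	go func() { done <- work() }()

	select {
	case v := <-done:
		return v, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// cancelBeforeFinish cancels the context before slow work can complete.
func cancelBeforeFinish() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := runCancellable(ctx, func() interface{} {
		time.Sleep(50 * time.Millisecond)
		return "too late"
	})
	return err
}

// runToCompletion runs quick work under a context that is not cancelled early.
func runToCompletion() (interface{}, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return runCancellable(ctx, func() interface{} { return 2 + 5 })
}

func main() {
	fmt.Println(cancelBeforeFinish()) // context canceled
	fmt.Println(runToCompletion())    // 7 <nil>
}
```

A function that should stop mid-run has to check `ctx.Done()` itself; cancelling a context never forcibly stops a goroutine.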

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()

Middleware

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.

tm = worker.RegisterMiddleware(tm,
    //middleware.YourMiddleware,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, logger)
    },
)

Example

package main

import (
    "fmt"
    "time"

    "github.com/google/uuid"
    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
)

func main() {
    tm := worker.NewTaskManager(5, 10)

    // apply middleware in the same order as you want to execute them
    tm = worker.RegisterMiddleware(tm,
        // middleware.YourMiddleware,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
        },
    )

    task := worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Fn: func() interface{} {
            return func(a int, b int) interface{} {
                return a + b
            }(2, 5)
        },
    }

    // Invalid task, it doesn't have a function
    task1 := worker.Task{
        ID:       uuid.New(),
        Priority: 1,
    }

    task2 := worker.Task{
        ID:       uuid.New(),
        Priority: 5,
        Fn:       func() interface{} { return "Hello, World from Task 2!" },
    }

    task3 := worker.Task{
        ID:       uuid.New(),
        Priority: 90,
        Fn: func() interface{} {
            // Simulate a long running task
            time.Sleep(3 * time.Second)
            return "Hello, World from Task 3!"
        },
    }

    task4 := worker.Task{
        ID:       uuid.New(),
        Priority: 15,
        Fn: func() interface{} {
            // Simulate a long running task
            time.Sleep(5 * time.Second)
            return "Hello, World from Task 4!"
        },
    }

    tm.RegisterTask(task, task1, task2, task3, task4)
    tm.Start(5)

    tm.CancelTask(task3.ID)

    // Print results
    for result := range tm.GetResults() {
        fmt.Println(result)
    }

    tasks := tm.GetTasks()
    for _, task := range tasks {
        fmt.Println(task)
    }
}

Conclusion

The worker package provides a way to manage and execute tasks concurrently and by priority, with optional middleware, rate limiting, and cancellation.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Constants

const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = CancelReason(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = CancelReason(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = CancelReason(3)
)

CancelReason values

  • 1: `ContextDeadlineReached`
  • 2: `RateLimited`
  • 3: `Cancelled`
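Numeric reasons are hard to read in logs. One common Go approach, sketched here, is to give the type a `String()` method so `fmt` prints a name instead of a number. The type and constants are redeclared locally in lowercase so the sketch compiles on its own; the package does not document a `String()` method for `CancelReason`, so this is purely illustrative.

```go
package main

import "fmt"

// cancelReason mirrors the documented uint8 type; it is redeclared here
// only so the sketch is self-contained.
type cancelReason uint8

const (
	contextDeadlineReached cancelReason = 1
	rateLimited            cancelReason = 2
	cancelled              cancelReason = 3
)

// String implements fmt.Stringer so the reason prints as a name.
func (r cancelReason) String() string {
	switch r {
	case contextDeadlineReached:
		return "ContextDeadlineReached"
	case rateLimited:
		return "RateLimited"
	case cancelled:
		return "Cancelled"
	default:
		// Covers the zero value and any unknown number.
		return fmt.Sprintf("CancelReason(%d)", uint8(r))
	}
}

func main() {
	fmt.Println(rateLimited, cancelReason(0))
	// prints: RateLimited CancelReason(0)
}
```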

Variables

var (
	// ErrInvalidTaskID is returned when a task has an invalid ID
	ErrInvalidTaskID = errors.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function
	ErrInvalidTaskFunc = errors.New("invalid task function")
	// ErrTaskNotFound is returned when a task is not found
	ErrTaskNotFound = errors.New("task not found")
)
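Sentinel errors like these are normally compared with `errors.Is`, which also matches when the sentinel has been wrapped with extra context. The sketch below redeclares one sentinel locally and uses a hypothetical `lookup` helper over a plain map; it does not call the library.

```go
package main

import (
	"errors"
	"fmt"
)

// errTaskNotFound mirrors the documented sentinel; redeclared locally
// so the sketch is self-contained.
var errTaskNotFound = errors.New("task not found")

// lookup is a hypothetical helper that wraps the sentinel with context.
func lookup(registry map[string]int, id string) (int, error) {
	v, ok := registry[id]
	if !ok {
		return 0, fmt.Errorf("lookup %q: %w", id, errTaskNotFound)
	}
	return v, nil
}

// isNotFound reports whether err is, or wraps, errTaskNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, errTaskNotFound)
}

func main() {
	_, err := lookup(map[string]int{"a": 1}, "b")
	fmt.Println(err, isNotFound(err))
	// prints: lookup "b": task not found true
}
```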

Functions

This section is empty.

Types

type CancelReason added in v0.0.2

type CancelReason uint8

CancelReason is a value used to represent the cancel reason.

type Middleware

type Middleware func(Service) Service

Middleware describes a `Service` middleware.

type Service

type Service interface {
	// RegisterTask registers a new task to the worker
	RegisterTask(tasks ...Task)
	// Start the task manager
	Start(numWorkers int)
	// Stop the task manager
	Stop()
	// GetResults gets the results channel
	GetResults() <-chan interface{}
	// GetTask gets a task by its ID
	GetTask(id uuid.UUID) (task Task, ok bool)
	// GetTasks gets all tasks
	GetTasks() []Task
	// ExecuteTask executes a task given its ID and returns the result
	ExecuteTask(id uuid.UUID) (interface{}, error)
	// CancelAll cancels all tasks
	CancelAll()
	// CancelTask cancels a task by its ID
	CancelTask(id uuid.UUID)
}

Service is an interface for a task manager

func NewTaskManager

func NewTaskManager(maxTasks int, tasksPerSecond float64) Service

NewTaskManager creates a new task manager

  • `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 1
  • `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 1
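The two defaults and the meaning of a per-second rate can be sketched as follows. This is an assumption-laden illustration: the text only says both values default to 1, so treating non-positive input as "unset" is a guess, and reading the rate as a fixed spacing between task starts is one simple interpretation rather than the library's limiter. `normalize` and `interval` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// normalize falls back to 1 for each value that is not positive.
// Assumption: non-positive input is what triggers the documented default.
func normalize(maxTasks int, tasksPerSecond float64) (int, float64) {
	if maxTasks <= 0 {
		maxTasks = 1
	}
	if tasksPerSecond <= 0 {
		tasksPerSecond = 1
	}
	return maxTasks, tasksPerSecond
}

// interval is the spacing between task starts that yields the given rate.
// The rate must be positive; call normalize first.
func interval(tasksPerSecond float64) time.Duration {
	return time.Duration(float64(time.Second) / tasksPerSecond)
}

func main() {
	m, r := normalize(0, 5)
	fmt.Println(m, r, interval(r))
	// prints: 1 5 200ms
}
```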

func RegisterMiddleware

func RegisterMiddleware(svc Service, mw ...Middleware) Service

RegisterMiddleware registers middlewares to the `Service`.
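Since `Middleware` is a function from `Service` to `Service`, registering middleware amounts to wrapping one value in another. The sketch below shows the pattern on a hypothetical one-method `greeter` interface instead of the full `Service`. The `chain` helper applies the list in reverse so that the first middleware listed ends up outermost; whether RegisterMiddleware uses the same order is an assumption to verify against the source.

```go
package main

import "fmt"

// greeter is a hypothetical one-method service used only for this sketch.
type greeter interface {
	Greet(name string) string
}

// plainGreeter is the innermost implementation.
type plainGreeter struct{}

func (plainGreeter) Greet(name string) string { return "hello " + name }

// middleware wraps a greeter with extra behavior.
type middleware func(greeter) greeter

// greeterFunc adapts a plain function to the greeter interface.
type greeterFunc func(string) string

func (f greeterFunc) Greet(name string) string { return f(name) }

// tag returns a middleware that surrounds the inner result with label(...).
func tag(label string) middleware {
	return func(next greeter) greeter {
		return greeterFunc(func(name string) string {
			return label + "(" + next.Greet(name) + ")"
		})
	}
}

// chain wraps svc so that the first middleware listed is the outermost one.
func chain(svc greeter, mws ...middleware) greeter {
	for i := len(mws) - 1; i >= 0; i-- {
		svc = mws[i](svc)
	}
	return svc
}

func main() {
	g := chain(plainGreeter{}, tag("a"), tag("b"))
	fmt.Println(g.Greet("x"))
	// prints: a(b(hello x))
}
```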

type Task

type Task struct {
	ID           uuid.UUID          `json:"id"`            // ID is the id of the task
	Priority     int                `json:"priority"`      // Priority is the priority of the task
	Fn           TaskFunc           `json:"-"`             // Fn is the function that will be executed by the task
	Ctx          context.Context    `json:"context"`       // Ctx is the context of the task
	Cancel       context.CancelFunc `json:"-"`             // Cancel is the cancel function of the task
	Error        atomic.Value       `json:"error"`         // Error is the error of the task
	Started      atomic.Int64       `json:"started"`       // Started is the time the task started
	Completed    atomic.Int64       `json:"completed"`     // Completed is the time the task completed
	Cancelled    atomic.Int64       `json:"cancelled"`     // Cancelled is the time the task was cancelled
	CancelReason CancelReason       `json:"cancel_reason"` // CancelReason is the reason the task was cancelled
}
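The `Started`, `Completed`, and `Cancelled` fields hold times in an `atomic.Int64`. The documentation does not say how the time is encoded, so the sketch below assumes Unix nanoseconds, with zero meaning "not set". The `stamp` type and its methods are hypothetical, and `atomic.Int64` requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// stamp is a hypothetical holder for a start time that can be written
// and read from several goroutines without a mutex.
type stamp struct {
	started atomic.Int64 // Unix nanoseconds; 0 means "not set" (assumption)
}

// markStarted records t as Unix nanoseconds.
func (s *stamp) markStarted(t time.Time) {
	s.started.Store(t.UnixNano())
}

// startedAt returns the recorded time, or false if none was recorded.
func (s *stamp) startedAt() (time.Time, bool) {
	n := s.started.Load()
	if n == 0 {
		return time.Time{}, false
	}
	return time.Unix(0, n).UTC(), true
}

// roundTrips checks the unset state and that a stored time is recovered.
func roundTrips() bool {
	var s stamp
	if _, ok := s.startedAt(); ok {
		return false
	}
	want := time.Date(2023, time.January, 29, 12, 0, 0, 0, time.UTC)
	s.markStarted(want)
	got, ok := s.startedAt()
	return ok && got.Equal(want)
}

func main() {
	fmt.Println(roundTrips()) // true
}
```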

Task represents a function that can be executed by the task manager

func (*Task) IsValid added in v0.0.2

func (t *Task) IsValid() (err error)

IsValid returns an error if the task is invalid
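The exported errors ErrInvalidTaskID and ErrInvalidTaskFunc hint at what a validity check might look for. The sketch below is a guess at that shape, not the library's implementation: `miniTask` is a hypothetical type, and a `[16]byte` stands in for `uuid.UUID` (which is itself a 16-byte array) so that no external import is needed.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the two documented sentinel errors.
var (
	errInvalidTaskID   = errors.New("invalid task id")
	errInvalidTaskFunc = errors.New("invalid task function")
)

// miniTask is a hypothetical, reduced task used only for this sketch.
type miniTask struct {
	id [16]byte // stand-in for uuid.UUID
	fn func() interface{}
}

// isValid returns an error if the ID is all zeros or the function is nil.
// These two checks are assumptions about what "invalid" means.
func (t *miniTask) isValid() error {
	if t.id == ([16]byte{}) {
		return errInvalidTaskID
	}
	if t.fn == nil {
		return errInvalidTaskFunc
	}
	return nil
}

func main() {
	noFn := miniTask{id: [16]byte{1}}
	ok := miniTask{id: [16]byte{1}, fn: func() interface{} { return "hi" }}
	fmt.Println(noFn.isValid(), ok.isValid())
	// prints: invalid task function <nil>
}
```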

type TaskFunc added in v0.0.2

type TaskFunc func() interface{}

TaskFunc is the signature of a `Task` function

type TaskManager

type TaskManager struct {
	Registry sync.Map         // Registry is a map of registered tasks
	Results  chan interface{} // Results is the channel of results
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID)

CancelTask cancels a task by its ID

func (*TaskManager) ExecuteTask added in v0.0.2

func (tm *TaskManager) ExecuteTask(id uuid.UUID) (interface{}, error)

ExecuteTask executes a task given its ID and returns the result

func (*TaskManager) GetResults

func (tm *TaskManager) GetResults() <-chan interface{}

GetResults gets the results channel

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (task Task, ok bool)

GetTask gets a task by its ID

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []Task

GetTasks gets all tasks

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(tasks ...Task)

RegisterTask registers a new task to the task manager

func (*TaskManager) Start

func (tm *TaskManager) Start(numWorkers int)

Start starts the task manager and its goroutines

  • `numWorkers` is the number of workers to start; if not specified, the number of CPUs will be used
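Since `numWorkers` is a plain `int`, "not specified" presumably means zero or a negative value; that reading is an assumption. A fallback of this kind can be sketched with `runtime.NumCPU`, using a hypothetical `effectiveWorkers` helper:

```go
package main

import (
	"fmt"
	"runtime"
)

// effectiveWorkers returns n, or the number of logical CPUs when n is
// not positive. Treating n <= 0 as "not specified" is an assumption.
func effectiveWorkers(n int) int {
	if n <= 0 {
		return runtime.NumCPU()
	}
	return n
}

func main() {
	fmt.Println(effectiveWorkers(5))      // 5
	fmt.Println(effectiveWorkers(0) >= 1) // true
}
```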

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the task manager and its goroutines
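For readers unfamiliar with the start/stop lifecycle of a goroutine pool, here is a minimal sketch of the general pattern using a channel and a `sync.WaitGroup`. It is not the TaskManager's implementation; the `pool` type, its methods, and `runAll` are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, minimal worker pool.
type pool struct {
	jobs chan func() int
	wg   sync.WaitGroup
	mu   sync.Mutex
	sum  int
}

// start launches n goroutines that pull jobs until the channel is closed.
func (p *pool) start(n int) {
	p.jobs = make(chan func() int)
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				v := job()
				p.mu.Lock()
				p.sum += v
				p.mu.Unlock()
			}
		}()
	}
}

// stop closes the job channel and waits for every worker to exit.
func (p *pool) stop() {
	close(p.jobs)
	p.wg.Wait()
}

// runAll starts the workers, submits count jobs that each return 1,
// stops the pool, and returns the accumulated total.
func runAll(workers, count int) int {
	p := &pool{}
	p.start(workers)
	for i := 0; i < count; i++ {
		p.jobs <- func() int { return 1 }
	}
	p.stop()
	return p.sum
}

func main() {
	fmt.Println(runAll(5, 20)) // 20
}
```

Closing the channel is what lets each worker's `range` loop end, and waiting on the `WaitGroup` guarantees all in-flight jobs have finished before the total is read.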

Directories

Path Synopsis
examples
manual command
middleware command
