worker

package module
v0.0.1
Published: Jan 28, 2023 License: MPL-2.0 Imports: 7 Imported by: 0

README

go-worker

go-worker provides a simple way to run tasks concurrently and in priority order. A TaskManager spawns a pool of workers, and each Task wraps a function that is scheduled according to its priority.

Features

  • Task prioritization: You can register tasks with a priority level influencing the execution order.
  • Concurrent execution: Tasks are executed concurrently by a pool of workers.
  • Rate limiting: You can rate-limit task scheduling by setting a maximum number of jobs per second.
  • Cancellation: Tasks can be canceled before or while they are running.
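
The prioritization feature can be pictured as a priority queue. The sketch below uses the standard library's container/heap and assumes that a lower priority value runs first; the source does not state which direction go-worker uses. The item and queue types are hypothetical and not part of the package.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical stand-in for a task: a name plus a priority.
type item struct {
	name     string
	priority int
}

// queue implements heap.Interface. The lowest priority value is popped first,
// which is an assumption made for this sketch.
type queue []item

func (q queue) Len() int            { return len(q) }
func (q queue) Less(i, j int) bool  { return q[i].priority < q[j].priority }
func (q queue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *queue) Push(x interface{}) { *q = append(*q, x.(item)) }
func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	it := old[n-1]
	*q = old[:n-1]
	return it
}

// drain pushes all items onto a heap and returns their names in pop order.
func drain(items ...item) []string {
	q := &queue{}
	for _, it := range items {
		heap.Push(q, it)
	}
	order := make([]string, 0, len(items))
	for q.Len() > 0 {
		order = append(order, heap.Pop(q).(item).name)
	}
	return order
}

func main() {
	order := drain(item{"task3", 90}, item{"task1", 1}, item{"task4", 15}, item{"task2", 5})
	fmt.Println(order) // prints [task1 task2 task4 task3]
}
```

Flipping the comparison in Less would make higher values run first instead.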

API

Initialization

Create a new TaskManager by calling the NewTaskManager() function and passing in the maximum number of tasks and the rate limit (jobs per second) as parameters.

tm := worker.NewTaskManager(10, 5)
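
The second argument is described as a rate limit in jobs per second. One common way to enforce such a limit is to space job starts by a fixed interval, and the sketch below does that with time.Ticker. It is an illustration only: intervalFor and runLimited are hypothetical helpers, the limit is assumed to be greater than zero, and the library's own limiter may work differently.

```go
package main

import (
	"fmt"
	"time"
)

// intervalFor converts a jobs-per-second limit into the minimum gap between
// job starts. It assumes perSecond > 0.
func intervalFor(perSecond float64) time.Duration {
	return time.Duration(float64(time.Second) / perSecond)
}

// runLimited starts the jobs one after another, no faster than perSecond
// jobs per second, and returns how many jobs it ran.
func runLimited(perSecond float64, jobs []func()) int {
	ticker := time.NewTicker(intervalFor(perSecond))
	defer ticker.Stop()

	ran := 0
	for _, job := range jobs {
		<-ticker.C // wait for the next free slot
		job()
		ran++
	}
	return ran
}

func main() {
	jobs := []func(){
		func() { fmt.Println("job 1") },
		func() { fmt.Println("job 2") },
	}
	ran := runLimited(100, jobs)
	fmt.Println("gap at 5 jobs/s:", intervalFor(5), "jobs run:", ran)
}
```
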
Registering Tasks

Register one or more tasks by calling the RegisterTask() method of the TaskManager struct and passing them in.

task := worker.Task{
    ID:       uuid.New(),
    Priority: 1,
    Fn:       func() interface{} { return "Hello, World!" },
}


task2 := worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Fn:       func() interface{} { return "Hello, World!" },
}

tm.RegisterTask(task, task2)
Starting and Stopping

You can start the task manager and its goroutines by calling the Start() method of the TaskManager struct and passing in the number of workers.

tm.Start(5)

You can stop the task manager and its goroutines by calling the Stop() method of the TaskManager struct.

tm.Stop()
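
To make the start and stop lifecycle concrete, here is a minimal worker pool built from goroutines, a job channel, and a sync.WaitGroup. It sketches the general pattern and is not the TaskManager implementation; the pool type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical fixed-size worker pool.
type pool struct {
	jobs    chan func() interface{}
	results chan interface{}
	wg      sync.WaitGroup
}

// newPool creates a pool whose job and result channels hold up to buffer items.
func newPool(buffer int) *pool {
	return &pool{
		jobs:    make(chan func() interface{}, buffer),
		results: make(chan interface{}, buffer),
	}
}

// start launches numWorkers goroutines that pull jobs until the job channel is closed.
func (p *pool) start(numWorkers int) {
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				p.results <- job()
			}
		}()
	}
}

// stop closes the job channel, waits for the workers to finish, and then
// closes the results channel so that readers can leave their range loop.
func (p *pool) stop() {
	close(p.jobs)
	p.wg.Wait()
	close(p.results)
}

func main() {
	p := newPool(4)
	p.start(2)
	for i := 1; i <= 3; i++ {
		n := i
		p.jobs <- func() interface{} { return n * n }
	}
	p.stop()

	sum := 0
	for r := range p.results {
		sum += r.(int)
	}
	fmt.Println(sum) // prints 14
}
```

The result buffer here is large enough for every job, so stop cannot block on a full results channel; a real pool would need a reader running concurrently.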
Results

Task results are delivered on the TaskManager's results channel, which you obtain by calling the GetResults() method.

for result := range tm.GetResults() {
   // Do something with the result
}
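
A range loop over a channel ends only when the channel is closed, and the source does not say when, or whether, the results channel is closed. If that matters in your program, one option is to read a known number of results with a timeout. The collect helper below is hypothetical and works on any receive-only channel of interface{} values.

```go
package main

import (
	"fmt"
	"time"
)

// collect reads up to n values from ch and gives up once timeout has elapsed.
func collect(ch <-chan interface{}, n int, timeout time.Duration) []interface{} {
	out := make([]interface{}, 0, n)
	deadline := time.After(timeout)
	for len(out) < n {
		select {
		case v, ok := <-ch:
			if !ok {
				return out // channel was closed early
			}
			out = append(out, v)
		case <-deadline:
			return out // timed out before n values arrived
		}
	}
	return out
}

func main() {
	ch := make(chan interface{}, 2)
	ch <- "first"
	ch <- "second"

	// Ask for three values; only two are available, so the timeout ends the wait.
	fmt.Println(collect(ch, 3, 50*time.Millisecond)) // prints [first second]
}
```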

Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.

tm.CancelTask(task.ID)
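
The Task struct carries a context and a cancel function, so cancellation presumably builds on the standard context package. The sketch below shows that general mechanism with a job that stops waiting as soon as its context is cancelled. runCancellable and cancelledBeforeStart are hypothetical helpers; how go-worker interrupts a running Fn is not described in the source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runCancellable simulates a long-running job that stops early when ctx is cancelled.
func runCancellable(ctx context.Context, work time.Duration) string {
	select {
	case <-time.After(work):
		return "completed"
	case <-ctx.Done():
		return "cancelled: " + ctx.Err().Error()
	}
}

// cancelledBeforeStart cancels the context before the job begins,
// the way a caller might cancel a task that is still queued.
func cancelledBeforeStart() string {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return runCancellable(ctx, time.Second)
}

func main() {
	fmt.Println(cancelledBeforeStart()) // prints cancelled: context canceled
}
```

A function that never checks its context cannot be interrupted this way, so long-running work needs to watch ctx.Done() itself.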

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()
Middleware

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.

tm = worker.RegisterMiddleware(tm,
    //middleware.YourMiddleware,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, logger)
    },
)
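
Middleware of the form func(Service) Service follows the decorator pattern: each middleware wraps the service it receives and returns a new one. The sketch below shows that pattern with a hypothetical one-method service interface instead of worker.Service. The chain helper applies middleware so that the first one listed runs first; that ordering is a choice made for this sketch, not a statement about RegisterMiddleware.

```go
package main

import (
	"fmt"
	"strings"
)

// service is a hypothetical one-method interface standing in for worker.Service.
type service interface {
	Start(numWorkers int)
}

// middleware has the same shape as worker.Middleware: it wraps one service in another.
type middleware func(service) service

// base is the innermost service; it records each call in a shared log.
type base struct{ log *[]string }

func (b base) Start(numWorkers int) {
	*b.log = append(*b.log, fmt.Sprintf("start(%d)", numWorkers))
}

// tagged wraps a service and records its own name before delegating.
type tagged struct {
	name string
	next service
	log  *[]string
}

func (t tagged) Start(numWorkers int) {
	*t.log = append(*t.log, t.name)
	t.next.Start(numWorkers)
}

// chain wraps svc so that the first middleware listed is the outermost layer.
func chain(svc service, mw ...middleware) service {
	for i := len(mw) - 1; i >= 0; i-- {
		svc = mw[i](svc)
	}
	return svc
}

// trace builds a two-layer chain, calls Start, and returns the call order.
func trace() string {
	var log []string
	tag := func(name string) middleware {
		return func(next service) service { return tagged{name, next, &log} }
	}
	chain(base{&log}, tag("logging"), tag("metrics")).Start(5)
	return strings.Join(log, " -> ")
}

func main() {
	fmt.Println(trace()) // prints logging -> metrics -> start(5)
}
```
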
Example
package main

import (
    "fmt"
    "log"
    "time"

    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
    "github.com/google/uuid"
)

func main() {
    tm := worker.NewTaskManager(5, 10)
    // Use the standard library's default logger
    logger := log.Default()

    // Apply middleware in the order in which you want it to execute
    tm = worker.RegisterMiddleware(tm,
        //middleware.YourMiddleware,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, logger)
        },
    )

    task := worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Fn:       func() interface{} { return "Hello, World from Task 1!" },
    }

    task2 := worker.Task{
        ID:       uuid.New(),
        Priority: 5,
        Fn:       func() interface{} { return "Hello, World from Task 2!" },
    }

    task3 := worker.Task{
        ID:       uuid.New(),
        Priority: 90,
        Fn: func() interface{} {
            // Simulate a long running task
            time.Sleep(3 * time.Second)
            return "Hello, World from Task 3!"
        },
    }

    task4 := worker.Task{
        ID:       uuid.New(),
        Priority: 15,
        Fn: func() interface{} {
            // Simulate a long running task
            time.Sleep(5 * time.Second)
            return "Hello, World from Task 4!"
        },
    }

    tm.RegisterTask(task, task2, task3, task4)
    tm.Start(5)

    tm.CancelTask(task3.ID)

    // Print results
    for result := range tm.GetResults() {
        fmt.Println(result)
    }

    tasks := tm.GetTasks()
    for _, task := range tasks {
        fmt.Println(task)
    }
}

Conclusion

The worker package manages and executes tasks concurrently and in priority order, with optional rate limiting, cancellation, and middleware.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Types

type Middleware

type Middleware func(Service) Service

Middleware describes a `Service` middleware.

type Service

type Service interface {
	// RegisterTask registers a new task to the worker
	RegisterTask(tasks ...Task)
	// Start the task manager
	Start(numWorkers int)
	// Stop the task manager
	Stop()
	// GetResults gets the results channel
	GetResults() <-chan interface{}
	// GetTask gets a task by its ID
	GetTask(id uuid.UUID) (task Task, ok bool)
	// GetTasks gets all tasks
	GetTasks() []Task
	// CancelAll cancels all tasks
	CancelAll()
	// CancelTask cancels a task by its ID
	CancelTask(id uuid.UUID)
}

Service is an interface for a task manager

func NewTaskManager

func NewTaskManager(maxTasks int, limit float64) Service

NewTaskManager creates a new task manager

func RegisterMiddleware

func RegisterMiddleware(svc Service, mw ...Middleware) Service

RegisterMiddleware registers middlewares to the `Service`.

type Task

type Task struct {
	ID        uuid.UUID          `json:"id"`        // ID is the id of the task
	Priority  int                `json:"priority"`  // Priority is the priority of the task
	Fn        func() interface{} `json:"-"`         // Fn is the function that will be executed by the task
	Ctx       context.Context    `json:"context"`   // Ctx is the context of the task
	Cancel    context.CancelFunc `json:"-"`         // Cancel is the cancel function of the task
	Started   atomic.Int64       `json:"started"`   // Started is the time the task started
	Completed atomic.Int64       `json:"completed"` // Completed is the time the task completed
	Cancelled atomic.Int64       `json:"cancelled"` // Cancelled is the time the task was cancelled
}

Task represents a function that can be executed by the task manager
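
The Started, Completed, and Cancelled fields are atomic.Int64 values, which lets several goroutines read and write them without a mutex. The sketch below shows one way such timestamps could be recorded. It assumes Unix nanoseconds as the unit, which the source does not specify, and the record type is hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// record is a hypothetical, trimmed-down task record with two timestamps.
type record struct {
	started   atomic.Int64 // Unix nanoseconds; the unit is an assumption
	completed atomic.Int64
}

// run executes fn, stamps the start and completion times, and returns the elapsed time.
func (r *record) run(fn func()) time.Duration {
	r.started.Store(time.Now().UnixNano())
	fn()
	r.completed.Store(time.Now().UnixNano())
	return time.Duration(r.completed.Load() - r.started.Load())
}

func main() {
	var r record
	elapsed := r.run(func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("elapsed:", elapsed)
}
```

The record is used through a pointer because values that contain atomic fields should not be copied after first use.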

type TaskManager

type TaskManager struct {
	Registry sync.Map         // Registry is a map of registered tasks
	Results  chan interface{} // Results is the channel of results
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks
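
Registry is a sync.Map, a map type from the standard library that is safe for concurrent use. As a rough sketch of how a registry keyed by task ID could be built on it, the example below stores and looks up entries by a string ID. The entry and registry types are hypothetical, and string keys stand in for uuid.UUID to keep the example free of external dependencies.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical stand-in for a registered task.
type entry struct {
	id       string
	priority int
}

// registry wraps a sync.Map so that it can be shared between goroutines.
type registry struct{ m sync.Map }

// register stores one or more entries under their IDs.
func (r *registry) register(entries ...entry) {
	for _, e := range entries {
		r.m.Store(e.id, e)
	}
}

// get looks up an entry by ID and reports whether it was found.
func (r *registry) get(id string) (entry, bool) {
	v, ok := r.m.Load(id)
	if !ok {
		return entry{}, false
	}
	return v.(entry), true
}

// count walks the map and returns the number of stored entries.
func (r *registry) count() int {
	n := 0
	r.m.Range(func(_, _ interface{}) bool {
		n++
		return true
	})
	return n
}

func main() {
	var r registry
	r.register(entry{"a", 1}, entry{"b", 10})
	e, ok := r.get("b")
	fmt.Println(e.priority, ok, r.count()) // prints 10 true 2
}
```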

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID)

CancelTask cancels a task by its ID

func (*TaskManager) GetResults

func (tm *TaskManager) GetResults() <-chan interface{}

GetResults gets the results channel

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (task Task, ok bool)

GetTask gets a task by its ID

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []Task

GetTasks gets all tasks

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(tasks ...Task)

RegisterTask registers a new task to the task manager

func (*TaskManager) Start

func (tm *TaskManager) Start(numWorkers int)

Start starts the task manager and its goroutines

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the task manager and its goroutines

Directories

Path Synopsis
examples
middleware command
