worker

package module
v0.1.3
Warning

This package is not in the latest version of its module.

Published: Jan 22, 2026 License: MPL-2.0 Imports: 17 Imported by: 0

README

go-worker


go-worker provides a simple way to manage and execute tasks concurrently and by priority, using a TaskManager that spawns a pool of workers. Each Task wraps a function and is scheduled according to its priority.

Features

  • Task prioritization: You can register tasks with a priority level influencing the execution order.
  • Concurrent execution: Tasks are executed concurrently by a pool of workers.
  • Middleware: You can apply middleware to the TaskManager to add additional functionality.
  • Results: You can access the results of the tasks via the Results channel.
  • Rate limiting: You can rate limit task scheduling by setting a maximum number of tasks per second.
  • Cancellation: You can cancel Tasks before or while they are running.

Architecture

flowchart LR
    Client[Client code] -->|register tasks| TaskManager
    TaskManager --> Queue[Priority Queue]
    Queue -->|dispatch| Worker1[Worker]
    Queue -->|dispatch| WorkerN[Worker]
    Worker1 --> Results[Results Channel]
    WorkerN --> Results

gRPC Service

go-worker exposes its functionality over gRPC through the WorkerService. The service allows clients to register tasks, stream results, cancel running tasks and query their status.

Handlers and Payloads

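The server dispatches tasks to handlers keyed by task name; the handlers are passed to NewGRPCServer as a map[string]HandlerSpec. Each handler consists of a Make function that constructs an empty value of the expected payload type, and an Fn function that executes the task logic using the unpacked payload.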

Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function.

Here is an example of registering a handler and sending a task with a payload:

// Server side: handlers are passed to NewGRPCServer, keyed by task name
handlers := map[string]worker.HandlerSpec{
    "create_user": {
        // Make returns an empty payload for the server to unpack the Any into
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            // Handle user creation logic here, e.g. with p.GetUsername() and p.GetEmail()
            _ = p
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}
srv := worker.NewGRPCServer(tm, handlers)

// Client side: wrap the payload in a google.protobuf.Any
payload, err := anypb.New(&workerpb.CreateUserPayload{
    Username: "newuser",
    Email:    "newuser@example.com",
})
if err != nil {
    log.Fatal(err)
}

task := &workerpb.Task{
    Name:    "create_user",
    Payload: payload,
}
_, err = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
    Tasks: []*workerpb.Task{task},
})
if err != nil {
    log.Fatal(err)
}

Note on deadlines: When the client streams results with a context that carries a deadline, exceeding the deadline terminates the stream but does not cancel the tasks running on the server. To stop server-side work, cancel the tasks explicitly with CancelTask, and keep task execution on a context that is separate from the stream's. A close_on_completion flag, if your version implements it, closes the stream once the tasks complete.
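The distinction in the note can be demonstrated with plain contexts. In this hypothetical sketch (not go-worker code), a "stream" context with a short deadline expires while the "task" runs under its own cancellable context; the task keeps running until it is cancelled explicitly, which is the role CancelTask plays on the server.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runTask simulates server-side work: it finishes after one second unless
// its own context is cancelled first.
func runTask(ctx context.Context, done chan<- error) {
	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	select {
	case <-timer.C:
		done <- nil
	case <-ctx.Done():
		done <- ctx.Err()
	}
}

// demo reports whether the task was still running after the stream's
// deadline expired, and whether an explicit cancel then stopped it.
func demo() (runningAfterStreamDeadline, stoppedByCancel bool) {
	// The task gets its own context, independent of the client stream.
	taskCtx, cancelTask := context.WithCancel(context.Background())
	defer cancelTask()
	done := make(chan error, 1)
	go runTask(taskCtx, done)

	// The stream context has a short deadline, which expires here.
	streamCtx, cancelStream := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancelStream()
	<-streamCtx.Done()

	select {
	case err := <-done:
		// The task ended before we cancelled it.
		return false, errors.Is(err, context.Canceled)
	default:
	}

	// Explicit cancellation stops the task.
	cancelTask()
	return true, errors.Is(<-done, context.Canceled)
}

func main() {
	running, cancelled := demo()
	fmt.Println("still running after stream deadline:", running)
	fmt.Println("stopped by explicit cancel:", cancelled)
}
```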

API Example

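// Server side
tm := worker.NewTaskManagerWithDefaults(context.Background())
// handlers is the map[string]worker.HandlerSpec of task handlers, keyed by task name
srv := worker.NewGRPCServer(tm, handlers)

gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)

lis, err := net.Listen("tcp", ":50051") // example address
if err != nil {
    log.Fatal(err)
}
go func() {
    if err := gs.Serve(lis); err != nil {
        log.Fatal(err)
    }
}()

// Client side (uuid is github.com/google/uuid)
conn, err := grpc.NewClient("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
    log.Fatal(err)
}
defer conn.Close()
client := workerpb.NewWorkerServiceClient(conn)

// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
    Username: "newuser",
    Email:    "newuser@example.com",
})
if err != nil {
    log.Fatal(err)
}

if _, err := client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
    Tasks: []*workerpb.Task{
        {
            Name:           "create_user",
            Payload:        payload,
            CorrelationId:  uuid.NewString(),
            IdempotencyKey: "create_user:newuser@example.com",
            Metadata:       map[string]string{"source": "api_example", "role": "admin"},
        },
    },
}); err != nil {
    log.Fatal(err)
}

// cancel by id
if _, err := client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"}); err != nil {
    log.Println(err)
}

// get task information
res, err := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
if err != nil {
    log.Fatal(err)
}
fmt.Println(res.Status)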

API Usage Examples

Quick Start
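tm := worker.NewTaskManager(context.Background(), 2, 10, 5, time.Second, time.Second, 3)
defer tm.Stop()

task := &worker.Task{
    ID:       uuid.New(),
    Priority: 1,
    Execute:  func(ctx context.Context, args ...any) (any, error) { return "hello", nil },
}
if err := tm.RegisterTask(context.Background(), task); err != nil {
    log.Fatal(err)
}

// StreamResults returns the results channel; read the one expected result
res := <-tm.StreamResults()
fmt.Println(res.Result, res.Error)
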
Initialization

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:

  • ctx is the context for the task manager
  • maxWorkers is the number of workers to start. If 0 is specified, it defaults to the number of available CPUs
  • maxTasks is the maximum number of tasks that can be executed at once, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second, defaults to 1
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3

tm := worker.NewTaskManager(context.Background(), 4, 10, 5, time.Second*30, time.Second*30, 3)
Registering Tasks

Register new tasks by calling the RegisterTasks() method of the TaskManager, passing a context followed by any number of *Task values.

id := uuid.New()
j := 1 // task number, used in the file name and in the result message

task := &worker.Task{
    ID:          id,
    Name:        "Some task",
    Description: "Here goes the description of the task",
    Priority:    10,
    Execute: func(ctx context.Context, args ...any) (any, error) {
        emptyFile, err := os.Create(filepath.Join("examples", "test", "res", fmt.Sprintf("1st__EmptyFile___%v.txt", j)))
        if err != nil {
            // return the error instead of exiting, so the task can be retried
            return nil, err
        }
        if err := emptyFile.Close(); err != nil {
            return nil, err
        }
        time.Sleep(time.Second)
        return fmt.Sprintf("** task number %v with id %s executed", j, id), nil
    },
    Retries:    10,
    RetryDelay: 3 * time.Second, // a bare 3 would mean 3 nanoseconds
}

task2 := &worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Execute:  func(ctx context.Context, args ...any) (any, error) { return "Hello, World!", nil },
}

tm.RegisterTasks(context.Background(), task, task2)
Stopping the Task Manager

You can stop the task manager and its goroutines by calling the Stop() method of the TaskManager struct.

tm.Stop()
Results

Task results are delivered on the TaskManager's Results channel. StreamResults() returns that channel for consumption, while GetResults() returns the results as a []Result.

for result := range tm.StreamResults() {
    // Do something with the result
    fmt.Println(result.Result, result.Error)
}

Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing the task ID.

tm.CancelTask(task.ID)

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()
Middleware

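You can apply middleware by calling the generic RegisterMiddleware() function, passing the service to wrap followed by the middleware functions; it returns the wrapped service. Declare the variable as worker.Service so that it matches the middleware type func(worker.Service) worker.Service.

var srv worker.Service = tm
srv = worker.RegisterMiddleware(srv,
    //middleware.YourMiddleware,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
    },
)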
Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
)

func main() {
    tm := worker.NewTaskManager(context.Background(), 4, 10, 5, time.Second*3, time.Second*30, 3)

    // Stop the task manager and its workers on exit
    defer tm.Stop()

    var srv worker.Service = tm
    // apply middleware in the same order as you want to execute them
    srv = worker.RegisterMiddleware(srv,
        // middleware.YourMiddleware,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
        },
    )

    task := &worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Execute: func(ctx context.Context, args ...any) (any, error) {
            add := func(a, b int) int { return a + b }
            return add(2, 5), nil
        },
    }

    // Invalid task: it has no Execute function
    task1 := &worker.Task{
        ID:       uuid.New(),
        Priority: 10,
        // Execute: func(ctx context.Context, args ...any) (any, error) { return "Hello, World from Task 1!", nil },
    }

    task2 := &worker.Task{
        ID:       uuid.New(),
        Priority: 5,
        Execute: func(ctx context.Context, args ...any) (any, error) {
            time.Sleep(time.Second * 2)
            return "Hello, World from Task 2!", nil
        },
    }

    task3 := &worker.Task{
        ID:       uuid.New(),
        Priority: 90,
        Execute: func(ctx context.Context, args ...any) (any, error) {
            // Simulate a long running task
            // time.Sleep(3 * time.Second)
            return "Hello, World from Task 3!", nil
        },
    }

    task4 := &worker.Task{
        ID:       uuid.New(),
        Priority: 150,
        Execute: func(ctx context.Context, args ...any) (any, error) {
            // Simulate a long running task
            time.Sleep(1 * time.Second)
            return "Hello, World from Task 4!", nil
        },
    }

    srv.RegisterTasks(context.Background(), task, task1, task2, task3)

    srv.CancelTask(task3.ID)

    srv.RegisterTask(context.Background(), task4)

    // Wait for the queued tasks to finish (or the timeout to elapse) before reading results
    tm.Wait(10 * time.Second)

    // Print results
    for _, result := range srv.GetResults() {
        fmt.Println(result.String())
    }

    for _, t := range srv.GetTasks() {
        fmt.Println(t)
    }
}

Versioning

This project follows Semantic Versioning. Release notes are available in CHANGELOG.md.

Contribution Guidelines

We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.

Feature Requests

To propose new ideas, open an issue using the Feature request template.

Newcomer-Friendly Issues

Issues labeled good first issue or help wanted are ideal starting points for new contributors.

Release Notes

See CHANGELOG.md for the history of released versions.

Conclusion

The worker package provides an efficient way to manage and execute tasks concurrently and by priority. It is configurable (number of workers, concurrent task limit, rate limit, timeout, and retry policy) and can be extended with middleware.

License

This project is licensed under the Mozilla Public License 2.0 (MPL-2.0) - see the LICENSE file for details.

Documentation

Index

Constants

const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)

TaskStatus values

  • 1: `ContextDeadlineReached`
  • 2: `RateLimited`
  • 3: `Cancelled`
  • 4: `Failed`
  • 5: `Queued`
  • 6: `Running`
  • 7: `Invalid`
  • 8: `Completed`
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
)

Variables

var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
)

Errors returned by the TaskManager.

Functions

func RegisterMiddleware

func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T

RegisterMiddleware applies the given middlewares to the provided service and returns the wrapped service.
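Because Middleware[T] is just func(T) T, each middleware is a decorator that wraps one implementation of T in another. The sketch below redeclares a Middleware type with the documented shape and a hypothetical apply helper. The wrapping order it uses (first middleware outermost, so it runs first) is an assumption chosen to match the README's advice to list middleware in execution order, not a description of RegisterMiddleware's internals. Greeter, tagged, and withTag exist only for illustration.

```go
package main

import "fmt"

// Middleware has the same shape as go-worker's Middleware[T].
type Middleware[T any] func(T) T

// Greeter is a hypothetical service interface used only for illustration.
type Greeter interface {
	Greet(name string) string
}

type baseGreeter struct{}

func (baseGreeter) Greet(name string) string { return "hello " + name }

// tagged wraps another Greeter and brackets its output with a label,
// which makes the wrapping order visible.
type tagged struct {
	label string
	next  Greeter
}

func (t tagged) Greet(name string) string {
	return t.label + "(" + t.next.Greet(name) + ")"
}

// withTag returns a middleware that adds one tagged layer.
func withTag(label string) Middleware[Greeter] {
	return func(next Greeter) Greeter { return tagged{label: label, next: next} }
}

// apply wraps svc so that mw[0] becomes the outermost layer and runs first
// (the ordering assumed for this sketch).
func apply[T any](svc T, mw ...Middleware[T]) T {
	for i := len(mw) - 1; i >= 0; i-- {
		svc = mw[i](svc)
	}
	return svc
}

func main() {
	var g Greeter = baseGreeter{}
	g = apply(g, withTag("logger"), withTag("metrics"))
	fmt.Println(g.Greet("gopher")) // logger(metrics(hello gopher))
}
```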

Types

type GRPCServer added in v0.1.1

type GRPCServer struct {
	// contains filtered or unexported fields
}

GRPCServer implements the generated WorkerServiceServer interface.

func NewGRPCServer added in v0.1.1

func NewGRPCServer(svc Service, handlers map[string]HandlerSpec) *GRPCServer

NewGRPCServer creates a new gRPC server backed by the provided Service.

func (*GRPCServer) CancelTask added in v0.1.1

CancelTask cancels an active task by its ID.

func (*GRPCServer) GetTask added in v0.1.1

GetTask returns information about a task by its ID.

func (*GRPCServer) RegisterTasks added in v0.1.1

RegisterTasks registers one or more tasks with the underlying service.

func (*GRPCServer) StreamResults added in v0.1.1

StreamResults streams task results back to the client.

type HandlerSpec added in v0.1.1

type HandlerSpec struct {
	// Make returns a zero value of the payload message to unmarshal into.
	Make func() protoreflect.ProtoMessage
	// Fn does the work. Your Task.Execute will call this.
	Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}

HandlerSpec describes a single handler for a gRPC method.

type MetricsSnapshot added in v0.1.0

type MetricsSnapshot struct {
	Scheduled int64
	Completed int64
	Failed    int64
	Cancelled int64
}

MetricsSnapshot represents a snapshot of task metrics.

type Middleware

type Middleware[T any] func(T) T

Middleware describes a generic middleware.

type Result added in v0.0.4

type Result struct {
	Task   *Task // the task that produced the result
	Result any   // the result of the task
	Error  error // the error returned by the task
}

Result is a task result.

func (*Result) String added in v0.0.4

func (r *Result) String() string

String returns a string representation of the result.

type Service

type Service interface {
	// contains filtered or unexported methods
}

Service is an interface for a task manager.

type Task

type Task struct {
	ID          uuid.UUID          `json:"id"`          // ID is the id of the task
	Name        string             `json:"name"`        // Name is the name of the task
	Description string             `json:"description"` // Description is the description of the task
	Priority    int                `json:"priority"`    // Priority is the priority of the task
	Execute     TaskFunc           `json:"-"`           // Execute is the function that will be executed by the task
	Ctx         context.Context    `json:"context"`     // Ctx is the context of the task
	CancelFunc  context.CancelFunc `json:"-"`           // CancelFunc is the cancel function of the task
	Status      TaskStatus         `json:"task_status"` // Status stores the status of the task
	Result      atomic.Value       `json:"result"`      // Result is the result of the task
	Error       atomic.Value       `json:"error"`       // Error is the error of the task
	Started     atomic.Int64       `json:"started"`     // Started is the time the task started
	Completed   atomic.Int64       `json:"completed"`   // Completed is the time the task completed
	Cancelled   atomic.Int64       `json:"cancelled"`   // Cancelled is the time the task was cancelled
	Retries     int                `json:"retries"`     // Retries is the maximum number of retries for failed tasks
	RetryDelay  time.Duration      `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
	// contains filtered or unexported fields
}

Task represents a function that can be executed by the task manager.

func NewTask added in v0.0.7

func NewTask(ctx context.Context, fn TaskFunc) (*Task, error)

NewTask creates a new task with the provided function and context.

func (*Task) CancelledChan added in v0.0.4

func (task *Task) CancelledChan() <-chan struct{}

CancelledChan returns a channel which gets closed when the task is cancelled.

func (*Task) IsValid added in v0.0.2

func (task *Task) IsValid() (err error)

IsValid returns an error if the task is invalid.

func (*Task) ShouldSchedule added in v0.0.5

func (task *Task) ShouldSchedule() error

ShouldSchedule returns an error if the task should not be scheduled.

func (*Task) WaitCancelled added in v0.0.4

func (task *Task) WaitCancelled()

WaitCancelled waits for the task to be cancelled.

type TaskFunc added in v0.0.2

type TaskFunc func(ctx context.Context, args ...any) (any, error)

TaskFunc is the signature of a `Task`'s Execute function.

type TaskManager

type TaskManager struct {
	Registry           sync.Map          // Registry is a map of registered tasks
	Results            chan Result       // Results is the channel of results
	Tasks              chan *Task        // Tasks is the channel of tasks
	RegisterTaskErrors *ewrap.ErrorGroup // RegisterTaskErrors groups the errors that occurred while registering tasks
	ExecuteTaskErrors  *ewrap.ErrorGroup // ExecuteTaskErrors groups the errors that occurred while executing tasks
	Cancelled          chan *Task        // Cancelled is the channel of cancelled tasks
	Timeout            time.Duration     // Timeout is the default timeout for tasks
	MaxWorkers         int               // MaxWorkers is the maximum number of workers that can be started
	MaxTasks           int               // MaxTasks is the maximum number of tasks that can be executed at once
	RetryDelay         time.Duration     // RetryDelay is the delay between retries
	MaxRetries         int               // MaxRetries is the maximum number of retries
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks.

func NewTaskManager

func NewTaskManager(
	ctx context.Context,
	maxWorkers, maxTasks int,
	tasksPerSecond float64,
	timeout, retryDelay time.Duration,
	maxRetries int,
) *TaskManager

NewTaskManager creates a new task manager

  • `ctx` is the context for the task manager
  • `maxWorkers` is the number of workers to start; if 0, the number of CPUs is used
  • `maxTasks` is the maximum number of tasks that can be executed at once, defaults to 10
  • `tasksPerSecond` is the rate limit of tasks that can be executed per second, defaults to 1
  • `timeout` is the default timeout for tasks, defaults to 5 minutes
  • `retryDelay` is the default delay between retries, defaults to 1 second
  • `maxRetries` is the default maximum number of retries, defaults to 3

func NewTaskManagerWithDefaults added in v0.0.5

func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager

NewTaskManagerWithDefaults creates a new task manager with default values

  • `maxWorkers`: `runtime.NumCPU()`
  • `maxTasks`: 10
  • `tasksPerSecond`: 5
  • `timeout`: 5 minutes
  • `retryDelay`: 1 second
  • `maxRetries`: 3

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks.

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID)

CancelTask cancels a task by its ID.

func (*TaskManager) ExecuteTask added in v0.0.2

func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)

ExecuteTask executes a task given its ID and returns the result

  • It gets the task by ID and locks the mutex to access the task data.
  • If the task has already been started, it cancels it and returns an error.
  • If the task is invalid, it sends it to the cancelled channel and returns an error.
  • If the task is already running, it returns an error.
  • It creates a new context for this task, waits for the result to be available, and returns it.
  • It reserves a token from the limiter and waits for the task execution.
  • If the token reservation fails, it waits for a delay and tries again.
  • It executes the task and sends the result to the results channel.
  • If the task execution fails, it retries the task up to max retries with a delay between retries.
  • If the task fails with all retries exhausted, it cancels the task and returns an error.

func (*TaskManager) GetActiveTasks added in v0.0.4

func (tm *TaskManager) GetActiveTasks() int

GetActiveTasks returns the number of active tasks.

func (*TaskManager) GetCancelledTasks added in v0.0.7

func (tm *TaskManager) GetCancelledTasks() <-chan *Task

GetCancelledTasks gets the cancelled tasks channel. Example usage:

// get the cancelled tasks
cancelledTasks := tm.GetCancelledTasks()

select {
case task := <-cancelledTasks:
    fmt.Printf("Task %s was cancelled\n", task.ID.String())
default:
    fmt.Println("No tasks have been cancelled yet")
}

func (*TaskManager) GetMetrics added in v0.1.0

func (tm *TaskManager) GetMetrics() MetricsSnapshot

GetMetrics returns a snapshot of current metrics.

func (*TaskManager) GetResults

func (tm *TaskManager) GetResults() []Result

GetResults returns the task results as a slice.

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)

GetTask gets a task by its ID.

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []*Task

GetTasks gets all tasks.

func (*TaskManager) IsEmpty added in v0.0.7

func (tm *TaskManager) IsEmpty() bool

IsEmpty checks if the task scheduler queue is empty.

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error

RegisterTask registers a new task to the task manager.

func (*TaskManager) RegisterTasks added in v0.0.4

func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task)

RegisterTasks registers multiple tasks to the task manager at once.

func (*TaskManager) SetMaxWorkers added in v0.1.0

func (tm *TaskManager) SetMaxWorkers(maxWorkers int)

SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.

func (*TaskManager) StartWorkers added in v0.0.4

func (tm *TaskManager) StartWorkers(ctx context.Context)

StartWorkers starts the task manager and its goroutines.

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the task manager and waits for all tasks to finish.

func (*TaskManager) StreamResults added in v0.0.5

func (tm *TaskManager) StreamResults() <-chan Result

StreamResults streams the results channel.

func (*TaskManager) Wait added in v0.0.4

func (tm *TaskManager) Wait(timeout time.Duration)

Wait waits for all tasks to complete or for the timeout to elapse.

type TaskStatus added in v0.0.4

type TaskStatus uint8

TaskStatus is a value used to represent the task status.

func (TaskStatus) String added in v0.0.4

func (ts TaskStatus) String() string

String returns the string representation of the task status.
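For an enumeration like TaskStatus, String() is usually a lookup table or switch over the constants. The sketch below redeclares the documented values (1 through 8) locally. The exact strings go-worker returns are not shown on this page, so the names used here are assumptions.

```go
package main

import "fmt"

// TaskStatus mirrors go-worker's TaskStatus type.
type TaskStatus uint8

// The values match the documented constants.
const (
	ContextDeadlineReached TaskStatus = iota + 1
	RateLimited
	Cancelled
	Failed
	Queued
	Running
	Invalid
	Completed
)

// statusNames is indexed by TaskStatus; index 0 is unused. The strings are
// assumptions, not necessarily what go-worker returns.
var statusNames = [...]string{
	ContextDeadlineReached: "ContextDeadlineReached",
	RateLimited:            "RateLimited",
	Cancelled:              "Cancelled",
	Failed:                 "Failed",
	Queued:                 "Queued",
	Running:                "Running",
	Invalid:                "Invalid",
	Completed:              "Completed",
}

// String returns the name of a known status, or TaskStatus(n) otherwise.
func (ts TaskStatus) String() string {
	if int(ts) < len(statusNames) && statusNames[ts] != "" {
		return statusNames[ts]
	}
	return fmt.Sprintf("TaskStatus(%d)", uint8(ts))
}

func main() {
	fmt.Println(Running, TaskStatus(42)) // Running TaskStatus(42)
}
```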

Directories

Path Synopsis
__examples
grpc command
manual command
middleware command
multi command
pkg
