worker

package module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2026 License: MPL-2.0 Imports: 20 Imported by: 0

README

go-worker

Go CodeQL Go Report Card Go Reference

go-worker provides a simple way to manage and execute prioritized tasks concurrently, backed by a TaskManager with a worker pool and a priority queue.

Breaking changes (January 2026)

  • Stop() removed. Use StopGraceful(ctx) or StopNow().
  • Local result streaming uses SubscribeResults(buffer); GetResults()/StreamResults() are removed.
  • RegisterTasks now returns an error.
  • Task.Execute replaces Fn in examples.
  • NewGRPCServer requires a handler map.

Features

  • Task prioritization: tasks are scheduled by priority.
  • Concurrent execution: tasks run in a worker pool with strict rate limiting.
  • Middleware: wrap the TaskManager for logging/metrics, etc.
  • Results: fan-out subscriptions via SubscribeResults.
  • Cancellation: cancel tasks before or during execution.
  • Retries: exponential backoff with capped delays.

Architecture

flowchart LR
    Client[Client code] -->|register tasks| TaskManager
    TaskManager --> Queue[Priority Queue]
    Queue -->|dispatch| Worker1[Worker]
    Queue -->|dispatch| WorkerN[Worker]
    Worker1 --> Results[Result Broadcaster]
    WorkerN --> Results

gRPC Service

go-worker exposes its functionality over gRPC through the WorkerService. The service allows clients to register tasks, stream results, cancel running tasks and query their status.

Handlers and Payloads

The server registers handlers keyed by name. Each handler consists of a Make function that constructs the expected payload type, and a Fn function that executes the task logic using the unpacked payload.

Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function.

handlers := map[string]worker.HandlerSpec{
    "create_user": {
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}

srv := worker.NewGRPCServer(tm, handlers)

Note on deadlines: When the client uses a stream context with a deadline, exceeding the deadline will terminate the stream but does not cancel the tasks running on the server. To properly handle cancellation, use separate contexts for task execution or cancel tasks explicitly.

API Example (gRPC)

tm := worker.NewTaskManagerWithDefaults(context.Background())
handlers := map[string]worker.HandlerSpec{
    "create_user": {
        Make: func() protoreflect.ProtoMessage { return &workerpb.CreateUserPayload{} },
        Fn: func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error) {
            p := payload.(*workerpb.CreateUserPayload)
            return &workerpb.CreateUserResponse{UserId: "1234"}, nil
        },
    },
}

srv := worker.NewGRPCServer(tm, handlers)

gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)
// listen and serve ...

client := workerpb.NewWorkerServiceClient(conn)

// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
    Username: "newuser",
    Email:    "newuser@example.com",
})
if err != nil {
    log.Fatal(err)
}

_, _ = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
    Tasks: []*workerpb.Task{
        {
            Name:           "create_user",
            Payload:        payload,
            CorrelationId:  uuid.NewString(),
            IdempotencyKey: "create_user:newuser@example.com",
            Metadata:       map[string]string{"source": "api_example", "role": "admin"},
        },
    },
})

// cancel by id
_, _ = client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"})

// get task information
res, _ := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
fmt.Println(res.Status)

API Usage Examples

Quick Start
tm := worker.NewTaskManager(context.Background(), 2, 10, 5, 30*time.Second, time.Second, 3)

task := &worker.Task{
    ID:       uuid.New(),
    Priority: 1,
    Ctx:      context.Background(),
    Execute:  func(ctx context.Context, _ ...any) (any, error) { return "hello", nil },
}

if err := tm.RegisterTask(context.Background(), task); err != nil {
    log.Fatal(err)
}

results, cancel := tm.SubscribeResults(1)
res := <-results
cancel()

fmt.Println(res.Result)
Initialization

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:

  • ctx is the base context for the task manager (used for shutdown and derived task contexts)
  • maxWorkers is the number of workers to start. If <= 0, it will default to the number of available CPUs
  • maxTasks is the maximum number of queued tasks and rate limiter burst size, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second. If <= 0, rate limiting is disabled
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)
tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 30*time.Second, 1*time.Second, 3)

Optional retention can be configured to prevent unbounded task registry growth:

tm.SetRetentionPolicy(worker.RetentionPolicy{
    TTL:        24 * time.Hour,
    MaxEntries: 100000,
})

Task lifecycle hooks can be configured for structured logging or tracing:

tm.SetHooks(worker.TaskHooks{
    OnQueued: func(task *worker.Task) {
        // log enqueue
    },
    OnStart: func(task *worker.Task) {
        // log start
    },
    OnFinish: func(task *worker.Task, status worker.TaskStatus, _ any, err error) {
        // log completion
        _ = err
        _ = status
    },
})
Registering Tasks

Register new tasks by calling the RegisterTasks() method of the TaskManager struct and passing in a variadic number of tasks.

id := uuid.New()

task := &worker.Task{
    ID:          id,
    Name:        "Some task",
    Description: "Here goes the description of the task",
    Priority:    10,
    Ctx:         context.Background(),
    Execute: func(ctx context.Context, _ ...any) (any, error) {
        time.Sleep(time.Second)
        return fmt.Sprintf("task %s executed", id), nil
    },
    Retries:    3,
    RetryDelay: 2 * time.Second,
}

task2 := &worker.Task{
    ID:       uuid.New(),
    Priority: 10,
    Ctx:      context.Background(),
    Execute:  func(ctx context.Context, _ ...any) (any, error) { return "Hello, World!", nil },
}

if err := tm.RegisterTasks(context.Background(), task, task2); err != nil {
    log.Fatal(err)
}
Stopping the Task Manager

Use StopGraceful to stop accepting new tasks and wait for completion, or StopNow to cancel tasks immediately.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_ = tm.StopGraceful(ctx)
// or
// tm.StopNow()
Results

Subscribe to results with a dedicated channel per subscriber.

results, cancel := tm.SubscribeResults(10)

ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
defer cancelWait()

_ = tm.Wait(ctx)
cancel()

for res := range results {
    fmt.Println(res)
}
Cancellation

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.

_ = tm.CancelTask(task.ID)

You can cancel all tasks by calling the CancelAll() method of the TaskManager struct.

tm.CancelAll()
Middleware

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.

srv := worker.RegisterMiddleware[worker.Service](tm,
    func(next worker.Service) worker.Service {
        return middleware.NewLoggerMiddleware(next, logger)
    },
)
Example
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    worker "github.com/hyp3rd/go-worker"
    "github.com/hyp3rd/go-worker/middleware"
)

func main() {
    tm := worker.NewTaskManager(context.Background(), 4, 10, 5, 3*time.Second, 30*time.Second, 3)

    var srv worker.Service = worker.RegisterMiddleware[worker.Service](tm,
        func(next worker.Service) worker.Service {
            return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
        },
    )

    task := &worker.Task{
        ID:       uuid.New(),
        Priority: 1,
        Ctx:      context.Background(),
        Execute: func(ctx context.Context, _ ...any) (any, error) {
            return 2 + 5, nil
        },
    }

    _ = srv.RegisterTasks(context.Background(), task)

    results, cancel := srv.SubscribeResults(10)
    defer cancel()

    ctx, cancelWait := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancelWait()
    _ = srv.Wait(ctx)

    for res := range results {
        fmt.Println(res)
    }
}

Versioning

This project follows Semantic Versioning. Release notes are available in CHANGELOG.md.

Contribution Guidelines

We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.

Feature Requests

To propose new ideas, open an issue using the Feature request template.

Newcomer-Friendly Issues

Issues labeled good first issue or help wanted are ideal starting points for new contributors.

Release Notes

See CHANGELOG.md for the history of released versions.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	// ContextDeadlineReached means the context is past its deadline.
	ContextDeadlineReached = TaskStatus(1)
	// RateLimited means the number of concurrent tasks per second exceeded the maximum allowed.
	RateLimited = TaskStatus(2)
	// Cancelled means `CancelTask` was invoked and the `Task` was cancelled.
	Cancelled = TaskStatus(3)
	// Failed means the `Task` failed.
	Failed = TaskStatus(4)
	// Queued means the `Task` is queued.
	Queued = TaskStatus(5)
	// Running means the `Task` is running.
	Running = TaskStatus(6)
	// Invalid means the `Task` is invalid.
	Invalid = TaskStatus(7)
	// Completed means the `Task` is completed.
	Completed = TaskStatus(8)
)

TaskStatus values.

View Source
const (
	// DefaultMaxTasks is the default maximum number of tasks that can be executed at once.
	DefaultMaxTasks = 10
	// DefaultTasksPerSecond is the default rate limit of tasks that can be executed per second.
	DefaultTasksPerSecond = 5
	// DefaultTimeout is the default timeout for tasks.
	DefaultTimeout = 5 * time.Minute
	// DefaultRetryDelay is the default delay between retries.
	DefaultRetryDelay = 1 * time.Second
	// DefaultMaxRetries is the default maximum number of retries.
	DefaultMaxRetries = 3
	// ErrMsgContextDone is the error message used when the context is done.
	ErrMsgContextDone = "context done"
)

Variables

View Source
var (
	// ErrInvalidTaskID is returned when a task has an invalid ID.
	ErrInvalidTaskID = ewrap.New("invalid task id")
	// ErrInvalidTaskFunc is returned when a task has an invalid function.
	ErrInvalidTaskFunc = ewrap.New("invalid task function")
	// ErrInvalidTaskContext is returned when a task has an invalid context.
	ErrInvalidTaskContext = ewrap.New("invalid task context")
	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = ewrap.New("task not found")
	// ErrTaskTimeout is returned when a task times out.
	ErrTaskTimeout = ewrap.New("task timeout")
	// ErrTaskCancelled is returned when a task is cancelled.
	ErrTaskCancelled = ewrap.New("task cancelled")
	// ErrTaskAlreadyStarted is returned when a task is already started.
	ErrTaskAlreadyStarted = ewrap.New("task already started")
	// ErrTaskCompleted is returned when a task is already completed.
	ErrTaskCompleted = ewrap.New("task completed")
)

Errors returned by the TaskManager.

Functions

func RegisterMiddleware

func RegisterMiddleware[T any](svc T, mw ...Middleware[T]) T

RegisterMiddleware registers middlewares to the provided service.

Types

type GRPCServer added in v0.1.1

type GRPCServer struct {
	// contains filtered or unexported fields
}

GRPCServer implements the generated WorkerServiceServer interface.

func NewGRPCServer added in v0.1.1

func NewGRPCServer(svc Service, handlers map[string]HandlerSpec) *GRPCServer

NewGRPCServer creates a new gRPC server backed by the provided Service.

func (*GRPCServer) CancelTask added in v0.1.1

CancelTask cancels an active task by its ID.

func (*GRPCServer) GetTask added in v0.1.1

GetTask returns information about a task by its ID.

func (*GRPCServer) RegisterTasks added in v0.1.1

RegisterTasks registers one or more tasks with the underlying service.

func (*GRPCServer) StreamResults added in v0.1.1

StreamResults streams task results back to the client.

type HandlerSpec added in v0.1.1

type HandlerSpec struct {
	// Make returns a zero value of the payload message to unmarshal into.
	Make func() protoreflect.ProtoMessage
	// Fn does the work. Your Task.Execute will call this.
	Fn func(ctx context.Context, payload protoreflect.ProtoMessage) (any, error)
}

HandlerSpec describes a single handler for a gRPC method.

type MetricsSnapshot added in v0.1.0

type MetricsSnapshot struct {
	Scheduled        int64
	Running          int64
	Completed        int64
	Failed           int64
	Cancelled        int64
	Retried          int64
	ResultsDropped   int64
	QueueDepth       int
	TaskLatencyCount int64
	TaskLatencyTotal time.Duration
	TaskLatencyMax   time.Duration
}

MetricsSnapshot represents a snapshot of task metrics.

type Middleware

type Middleware[T any] func(T) T

Middleware describes a generic middleware.

type Result added in v0.0.4

type Result struct {
	Task   *Task // the task that produced the result
	Result any   // the result of the task
	Error  error // the error returned by the task
}

Result is a task result.

func (*Result) String added in v0.0.4

func (r *Result) String() string

String returns a string representation of the result.

type RetentionPolicy added in v0.1.4

type RetentionPolicy struct {
	TTL             time.Duration
	MaxEntries      int
	CleanupInterval time.Duration
}

RetentionPolicy controls how completed tasks are kept in the registry.

type Service

type Service interface {
	// contains filtered or unexported methods
}

Service is an interface for a task manager.

type Task

type Task struct {
	ID          uuid.UUID          `json:"id"`          // ID is the id of the task
	Name        string             `json:"name"`        // Name is the name of the task
	Description string             `json:"description"` // Description is the description of the task
	Priority    int                `json:"priority"`    // Priority is the priority of the task
	Execute     TaskFunc           `json:"-"`           // Execute is the function that will be executed by the task
	Ctx         context.Context    `json:"-"`           // Ctx is the context of the task
	CancelFunc  context.CancelFunc `json:"-"`           // CancelFunc is the cancel function of the task

	Retries    int           `json:"retries"`     // Retries is the maximum number of retries for failed tasks
	RetryDelay time.Duration `json:"retry_delay"` // RetryDelay is the time delay between retries for failed tasks
	// contains filtered or unexported fields
}

Task represents a function that can be executed by the task manager.

Note: access Task state via methods to avoid data races.

func NewTask added in v0.0.7

func NewTask(ctx context.Context, fn TaskFunc) (*Task, error)

NewTask creates a new task with the provided function and context.

func (*Task) CancelledAt added in v0.1.4

func (task *Task) CancelledAt() time.Time

CancelledAt returns the cancellation time if set.

func (*Task) CancelledChan added in v0.0.4

func (task *Task) CancelledChan() <-chan struct{}

CancelledChan returns a channel which gets closed when the task is cancelled.

func (*Task) CompletedAt added in v0.1.4

func (task *Task) CompletedAt() time.Time

CompletedAt returns the completion time if set.

func (*Task) Error added in v0.0.2

func (task *Task) Error() error

Error returns the task error if available.

func (*Task) IsValid added in v0.0.2

func (task *Task) IsValid() (err error)

IsValid returns an error if the task is invalid.

func (*Task) Result added in v0.0.8

func (task *Task) Result() any

Result returns the task result if available.

func (*Task) ShouldSchedule added in v0.0.5

func (task *Task) ShouldSchedule() error

ShouldSchedule returns an error if the task should not be scheduled.

func (*Task) StartedAt added in v0.1.4

func (task *Task) StartedAt() time.Time

StartedAt returns the start time if set.

func (*Task) Status added in v0.0.4

func (task *Task) Status() TaskStatus

Status returns the current task status.

type TaskFunc added in v0.0.2

type TaskFunc func(ctx context.Context, args ...any) (any, error)

TaskFunc signature of `Task` function.

type TaskHooks added in v0.1.5

type TaskHooks struct {
	OnQueued func(task *Task)
	OnStart  func(task *Task)
	OnFinish func(task *Task, status TaskStatus, result any, err error)
	OnRetry  func(task *Task, delay time.Duration, attempt int)
}

TaskHooks defines optional callbacks for task lifecycle events.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager is a struct that manages a pool of goroutines that can execute tasks.

func NewTaskManager

func NewTaskManager(
	ctx context.Context,
	maxWorkers, maxTasks int,
	tasksPerSecond float64,
	timeout, retryDelay time.Duration,
	maxRetries int,
) *TaskManager

NewTaskManager creates a new task manager.

  • ctx is the context for the task manager
  • maxWorkers is the number of workers to start, if <=0, the number of CPUs will be used
  • maxTasks is the maximum number of tasks that can be queued at once, defaults to 10
  • tasksPerSecond is the rate limit of tasks that can be executed per second; <=0 disables rate limiting
  • timeout is the default timeout for tasks, defaults to 5 minutes
  • retryDelay is the default delay between retries, defaults to 1 second
  • maxRetries is the default maximum number of retries, defaults to 3 (0 disables retries)

func NewTaskManagerWithDefaults added in v0.0.5

func NewTaskManagerWithDefaults(ctx context.Context) *TaskManager

NewTaskManagerWithDefaults creates a new task manager with default values.

  • maxWorkers: runtime.NumCPU()
  • maxTasks: 10
  • tasksPerSecond: 5
  • timeout: 5 minutes
  • retryDelay: 1 second
  • maxRetries: 3

func (*TaskManager) CancelAll

func (tm *TaskManager) CancelAll()

CancelAll cancels all tasks.

func (*TaskManager) CancelTask

func (tm *TaskManager) CancelTask(id uuid.UUID) error

CancelTask cancels a task by its ID.

func (*TaskManager) ExecuteTask added in v0.0.2

func (tm *TaskManager) ExecuteTask(ctx context.Context, id uuid.UUID, timeout time.Duration) (any, error)

ExecuteTask executes a task given its ID and returns the result.

func (*TaskManager) GetActiveTasks added in v0.0.4

func (tm *TaskManager) GetActiveTasks() int

GetActiveTasks returns the number of running tasks.

func (*TaskManager) GetMetrics added in v0.1.0

func (tm *TaskManager) GetMetrics() MetricsSnapshot

GetMetrics returns a snapshot of current metrics.

func (*TaskManager) GetTask

func (tm *TaskManager) GetTask(id uuid.UUID) (*Task, error)

GetTask gets a task by its ID.

func (*TaskManager) GetTasks

func (tm *TaskManager) GetTasks() []*Task

GetTasks gets all tasks.

func (*TaskManager) IsEmpty added in v0.0.7

func (tm *TaskManager) IsEmpty() bool

IsEmpty checks if the task scheduler queue is empty.

func (*TaskManager) RegisterTask

func (tm *TaskManager) RegisterTask(ctx context.Context, task *Task) error

RegisterTask registers a new task to the task manager.

func (*TaskManager) RegisterTasks added in v0.0.4

func (tm *TaskManager) RegisterTasks(ctx context.Context, tasks ...*Task) error

RegisterTasks registers multiple tasks to the task manager at once.

func (*TaskManager) SetHooks added in v0.1.5

func (tm *TaskManager) SetHooks(hooks TaskHooks)

SetHooks configures callbacks for task lifecycle events.

func (*TaskManager) SetMaxWorkers added in v0.1.0

func (tm *TaskManager) SetMaxWorkers(maxWorkers int)

SetMaxWorkers adjusts the number of worker goroutines. Increasing the number spawns additional workers; decreasing signals workers to stop.

func (*TaskManager) SetRetentionPolicy added in v0.1.4

func (tm *TaskManager) SetRetentionPolicy(policy RetentionPolicy)

SetRetentionPolicy configures task registry retention.

func (*TaskManager) StartWorkers added in v0.0.4

func (tm *TaskManager) StartWorkers(ctx context.Context)

StartWorkers starts the task manager's workers and scheduler (idempotent).

func (*TaskManager) StopGraceful added in v0.1.4

func (tm *TaskManager) StopGraceful(ctx context.Context) error

StopGraceful stops accepting new tasks and waits for completion before stopping workers.

func (*TaskManager) StopNow added in v0.1.4

func (tm *TaskManager) StopNow()

StopNow cancels running tasks and stops workers immediately.

func (*TaskManager) SubscribeResults added in v0.1.4

func (tm *TaskManager) SubscribeResults(buffer int) (<-chan Result, func())

SubscribeResults returns a results channel and an unsubscribe function.

func (*TaskManager) Wait added in v0.0.4

func (tm *TaskManager) Wait(ctx context.Context) error

Wait waits for all tasks to complete or context cancellation.

type TaskStatus added in v0.0.4

type TaskStatus uint8

TaskStatus is a value used to represent the task status.

func (TaskStatus) String added in v0.0.4

func (ts TaskStatus) String() string

String returns the string representation of the task status.

Directories

Path Synopsis
__examples
grpc command
manual command
middleware command
multi command
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL