workerpool

package module
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2026 License: Apache-2.0 Imports: 5 Imported by: 24

README

Workerpool

Go Reference CI Go Report Card

A concurrency-limiting worker pool for Go with backpressure and zero dependencies.

Perfect for CPU-bound tasks that need controlled parallelism without unbounded queuing.

Features

  • Backpressure by design - Blocks on submit when workers are busy (no unbounded queues)
  • On-demand workers - Spawns workers as needed, up to configured limit
  • Two result modes - Collect via Drain() or stream via callback
  • Context-aware - Full cancellation support for graceful shutdown
  • Zero dependencies - Pure standard library
  • Simple API - Submit, Drain, Close. That's it.

Installation

go get github.com/cilium/workerpool

Quick Start

wp := workerpool.New(runtime.NumCPU())
defer wp.Close()

// Submit tasks (blocks when all workers are busy)
err := wp.Submit("task-1", func(ctx context.Context) error {
    // Your CPU-bound work here
    return process(data)
})

// Collect results
tasks, _ := wp.Drain()
for _, task := range tasks {
    if err := task.Err(); err != nil {
        log.Printf("Task %s failed: %v", task, err)
    }
}

When to Use This

Use workerpool when:

  • Tasks are CPU-bound and need parallelism control
  • You want backpressure (block submission instead of queuing unbounded tasks)
  • You need simple, predictable concurrency limiting

Don't use if:

  • You need I/O-bound task handling (consider channels or goroutines directly)
  • You want automatic retries, priorities, or complex scheduling
  • You need persistent job queues (use a proper job queue)

Usage Patterns

Pattern 1: Batch Processing with Drain

Process tasks in batches and collect all results at once.

Click to expand full example
package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(ctx context.Context, n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {
		// Check for cancellation periodically (every 10000 iterations)
		if p%10000 == 0 {
			select {
			case <-ctx.Done():
				return false
			default:
			}
		}
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU())
	// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
	// Close sends cancellation to running tasks and waits for them to complete.
	// It's safe to call Close multiple times; subsequent calls return ErrClosed.
	defer func() { _ = wp.Close() }()

	for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
		id := fmt.Sprintf("task #%d", i)
		// Use Submit to submit tasks for processing. Submit blocks when no
		// worker is available to pick up the task.
		err := wp.Submit(id, func(ctx context.Context) error {
			fmt.Println("isprime", n)
			if IsPrime(ctx, n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		// Submit fails when the pool is closed (ErrClosed), being drained
		// (ErrDraining), or the parent context is done (context.Canceled).
		// Check for the error when appropriate.
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain prevents submitting new tasks and blocks until all submitted tasks
	// complete.
	tasks, err := wp.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Iterating over the results is useful if non-nil errors can be expected.
	for _, task := range tasks {
		// Err returns the error that the task returned after execution.
		if err := task.Err(); err != nil {
			fmt.Println("task", task, "failed:", err)
		}
	}

	// Close is called here explicitly to check for errors. The deferred Close
	// will also run but returns ErrClosed (which we can ignore on defer).
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
Pattern 2: Streaming Results with Callback

Use WithResultCallback to process each result as it completes rather than accumulating them for a later Drain call. The callback receives a Result, which extends Task with a Duration() method reporting how long the task took to execute. This is useful for logging, metrics, or long-running pools where unbounded result accumulation is undesirable.

Click to expand full example
package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

func main() {
	wp := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(func(r workerpool.Result) {
		if err := r.Err(); err != nil {
			fmt.Fprintf(os.Stderr, "task %s failed after %s: %v\n", r, r.Duration(), err)
		} else {
			fmt.Printf("task %s completed in %s\n", r, r.Duration())
		}
	}))
	defer func() { _ = wp.Close() }()

	for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
		id := fmt.Sprintf("task #%d", i)
		err := wp.Submit(id, func(ctx context.Context) error {
			if IsPrime(ctx, n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Close waits for all in-flight tasks to complete before returning,
	// ensuring all callback invocations have finished.
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

Important Notes

[!WARNING] Result accumulation: Without WithResultCallback, results accumulate in memory until drained. For large workloads, drain periodically or use the callback mode.

[!NOTE] Backpressure behavior: Submit() blocks when no workers are available. This is intentional to prevent unbounded queuing. Queue tasks yourself if needed.

[!IMPORTANT] Cleanup: Always defer wp.Close() to ensure graceful shutdown and context cancellation.

Documentation

Full API documentation: https://pkg.go.dev/github.com/cilium/workerpool

Documentation

Overview

Package workerpool implements a concurrency limiting worker pool. Worker routines are spawned on demand as tasks are submitted; up to the configured limit of concurrent workers.

When the limit of concurrently running workers is reached, submitting a task blocks until a worker is able to pick it up. This behavior is intentional as it prevents from accumulating tasks which could grow unbounded. Therefore, it is the responsibility of the caller to queue up tasks if that's the intended behavior.

One caveat is that while the number of concurrently running workers is limited, task results are not and they accumulate until they are collected. Therefore, if a large number of tasks can be expected, the workerpool should be periodically drained (e.g. every 10k tasks). Alternatively, use WithResultCallback to process results as they complete without accumulation.

Example

Example demonstrates basic usage of a worker pool with Drain and Close.

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(ctx context.Context, n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {

		if p%10000 == 0 {
			select {
			case <-ctx.Done():
				return false
			default:
			}
		}
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU())
	// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
	// Close sends cancellation to running tasks and waits for them to complete.
	// It's safe to call Close multiple times; subsequent calls return [ErrClosed].
	defer func() { _ = wp.Close() }()

	for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
		id := fmt.Sprintf("task #%d", i)
		// Use Submit to submit tasks for processing. Submit blocks when no
		// worker is available to pick up the task.
		err := wp.Submit(id, func(ctx context.Context) error {
			fmt.Println("isprime", n)
			if IsPrime(ctx, n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		// Submit fails when the pool is closed ([ErrClosed]), being drained
		// ([ErrDraining]), or the parent context is done ([context.Canceled]).
		// Check for the error when appropriate.
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain prevents submitting new tasks and blocks until all submitted tasks
	// complete.
	tasks, err := wp.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Iterating over the results is useful if non-nil errors can be expected.
	for _, task := range tasks {
		// Err returns the error that the task returned after execution.
		if err := task.Err(); err != nil {
			fmt.Println("task", task, "failed:", err)
		}
	}

	// Close is called here explicitly to check for errors. The deferred Close
	// will also run but returns [ErrClosed] (which we can ignore on defer).
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrDraining is returned when an operation is not possible because
	// draining is in progress.
	ErrDraining = errors.New("drain operation in progress")
	// ErrClosed is returned when operations are attempted after a call to [Close].
	ErrClosed = errors.New("worker pool is closed")
	// ErrCallbackSet is returned by [Drain] when a result callback has been
	// registered via [WithResultCallback].
	ErrCallbackSet = errors.New("a result callback is set")
)

Functions

This section is empty.

Types

type Option added in v1.4.0

type Option func(*WorkerPool)

Option configures a WorkerPool.

func WithResultCallback added in v1.4.0

func WithResultCallback(fn func(Result)) Option

WithResultCallback registers fn to be called each time a task completes.

When a callback is set, results are NOT accumulated internally. This means:

  • [Drain] will return ErrCallbackSet instead of collecting results
  • Results are processed immediately upon completion, avoiding memory buildup

The callback fn is invoked from the worker goroutines. This has a few implications: 1. fn must be safe for concurrent use. 2. fn must NOT call [Submit] nor [Close] as it will lead to a deadlock.

WithResultCallback panics if fn is nil.

Example

ExampleWithResultCallback demonstrates using a result callback to process task results immediately without accumulation.

package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(ctx context.Context, n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {

		if p%10000 == 0 {
			select {
			case <-ctx.Done():
				return false
			default:
			}
		}
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(func(r workerpool.Result) {
		if err := r.Err(); err != nil {
			fmt.Fprintf(os.Stderr, "task %s failed after %s: %v\n", r, r.Duration(), err)
		} else {
			fmt.Printf("task %s completed in %s\n", r, r.Duration())
		}
	}))
	defer func() { _ = wp.Close() }()

	for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
		id := fmt.Sprintf("task #%d", i)
		err := wp.Submit(id, func(ctx context.Context) error {
			if IsPrime(ctx, n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Close waits for all in-flight tasks to complete before returning,
	// ensuring all callback invocations have finished.
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

type Result added in v1.4.0

type Result interface {
	Task
	// Duration returns the time taken to execute the task.
	Duration() time.Duration
}

Result is a completed Task that also reports its execution duration. It is passed to the callback registered with WithResultCallback.

type Task

type Task interface {
	// String returns the task identifier.
	fmt.Stringer
	// Err returns the error resulting from processing the
	// unit of work.
	Err() error
}

Task is a unit of work.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool spawns, on demand, a number of worker routines to process submitted tasks concurrently. The number of concurrent routines never exceeds the specified limit.

func New

func New(n int, opts ...Option) *WorkerPool

New creates a new pool of workers where at most n workers process submitted tasks concurrently. New panics if n ≤ 0.

func NewWithContext added in v1.2.0

func NewWithContext(ctx context.Context, n int, opts ...Option) *WorkerPool

NewWithContext creates a new pool of workers where at most n workers process submitted tasks concurrently. NewWithContext panics if n ≤ 0. The context is used as the parent context to the context of the task func passed to [Submit].

func (*WorkerPool) Cap

func (wp *WorkerPool) Cap() int

Cap returns the concurrent workers capacity, see New.

func (*WorkerPool) Close

func (wp *WorkerPool) Close() error

Close closes the worker pool, rendering it unable to process new tasks. Close sends the cancellation signal to any running task via context cancellation and waits indefinitely for all workers to return. If tasks do not respect context cancellation, Close will block until they complete. When a result callback is set via WithResultCallback, all callback invocations are guaranteed to have completed before Close returns.

Close will return ErrClosed if it has already been called. This makes it safe to use with defer immediately after creating the pool (for cleanup on early returns) while still calling Close explicitly to check for errors.

Note: Close cancels running tasks via context, while [Drain] waits for tasks to complete without cancellation. If you want tasks to finish naturally, call [Drain] before Close.

func (*WorkerPool) Drain

func (wp *WorkerPool) Drain() ([]Task, error)

Drain waits until all tasks are completed. This operation prevents submitting new tasks to the worker pool. Drain returns the results of the tasks that have been processed.

Drain is incompatible with the WithResultCallback option. When a result callback is configured, results are processed immediately upon completion rather than being accumulated, so Drain returns ErrCallbackSet.

Unlike [Close], Drain does not cancel task contexts. Tasks run to completion naturally. After Drain, the pool can be closed with [Close] (which will not cancel any tasks since none are running) or more tasks can be submitted.

ErrCallbackSet is returned if the WithResultCallback option is used. ErrDraining is returned if a drain operation is already in progress. ErrClosed is returned if the worker pool is closed.

func (*WorkerPool) Len added in v1.1.0

func (wp *WorkerPool) Len() int

Len returns the count of concurrent workers currently running.

func (*WorkerPool) Submit

func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error

Submit submits f for processing by a worker. The given id is useful for identifying the task once it is completed.

The task function f receives a context that is cancelled when [Close] is called or when the parent context passed to NewWithContext is done. Tasks MUST respect context cancellation and return promptly when ctx.Done() is signaled. Tasks that ignore cancellation will cause [Close] to block indefinitely waiting for them to complete. Use context-aware operations (e.g., select with ctx.Done()) to ensure timely shutdown.

Submit blocks until a routine starts processing the task.

ErrDraining is returned if a drain operation is in progress. ErrClosed is returned if the worker pool is closed. context.Canceled is returned if the parent context is done.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL