Documentation ¶
Overview ¶
Package workerpool implements a concurrency-limiting worker pool. Worker routines are spawned on demand as tasks are submitted, up to the configured limit of concurrent workers.
When the limit of concurrently running workers is reached, submitting a task blocks until a worker is able to pick it up. This behavior is intentional: it prevents submitted tasks from accumulating without bound. It is therefore the caller's responsibility to queue up tasks if queuing is the intended behavior.
One caveat is that while the number of concurrently running workers is limited, the number of stored task results is not: results accumulate until they are collected. Therefore, if a large number of tasks is expected, the worker pool should be drained periodically (e.g. every 10k tasks). Alternatively, use WithResultCallback to process results as they complete, without accumulation.
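The blocking submission described above can be pictured with a small standard-library sketch. This is not the package's implementation; limiter, submit, wait and peakConcurrency are hypothetical names chosen for illustration, and a buffered channel stands in for the pool's worker limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter is a hypothetical stand-in for a concurrency-limiting pool.
// It is NOT the workerpool implementation.
type limiter struct {
	slots chan struct{} // holds one token per running worker
	wg    sync.WaitGroup
}

func newLimiter(n int) *limiter {
	return &limiter{slots: make(chan struct{}, n)}
}

// submit blocks while n workers are busy, then runs f in a new goroutine.
func (l *limiter) submit(f func()) {
	l.slots <- struct{}{} // blocks until a slot is free
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer func() { <-l.slots }() // free the slot once f has returned
		f()
	}()
}

// wait blocks until every submitted function has returned.
func (l *limiter) wait() { l.wg.Wait() }

// peakConcurrency submits tasks short jobs to a limiter of size limit and
// reports the highest number of jobs observed running at the same time.
func peakConcurrency(limit, tasks int) int64 {
	l := newLimiter(limit)
	var running, peak atomic.Int64
	for i := 0; i < tasks; i++ {
		l.submit(func() {
			cur := running.Add(1)
			// Raise the recorded peak if this job pushed it higher.
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			running.Add(-1)
		})
	}
	l.wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(3, 20))
}
```

Because submit blocks on the channel send, the submitting loop itself is throttled and no backlog of pending tasks builds up. A caller that wants a backlog has to keep one explicitly.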
Example ¶
Example demonstrates basic usage of a worker pool with Drain and Close.
package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(ctx context.Context, n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {
		if p%10000 == 0 {
			select {
			case <-ctx.Done():
				return false
			default:
			}
		}
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU())
	// Defer Close to ensure cleanup on early return (e.g., errors during Submit).
	// Close sends cancellation to running tasks and waits for them to complete.
	// It's safe to call Close multiple times; subsequent calls return [ErrClosed].
	defer func() { _ = wp.Close() }()

	for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
		id := fmt.Sprintf("task #%d", i)
		// Use Submit to submit tasks for processing. Submit blocks when no
		// worker is available to pick up the task.
		err := wp.Submit(id, func(ctx context.Context) error {
			fmt.Println("isprime", n)
			if IsPrime(ctx, n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		// Submit fails when the pool is closed ([ErrClosed]), being drained
		// ([ErrDraining]), or the parent context is done ([context.Canceled]).
		// Check for the error when appropriate.
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Drain prevents submitting new tasks and blocks until all submitted tasks
	// complete.
	tasks, err := wp.Drain()
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return
	}

	// Iterating over the results is useful if non-nil errors can be expected.
	for _, task := range tasks {
		// Err returns the error that the task returned after execution.
		if err := task.Err(); err != nil {
			fmt.Println("task", task, "failed:", err)
		}
	}

	// Close is called here explicitly to check for errors. The deferred Close
	// will also run but returns [ErrClosed] (which we can ignore on defer).
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrDraining is returned when an operation is not possible because
	// draining is in progress.
	ErrDraining = errors.New("drain operation in progress")

	// ErrClosed is returned when operations are attempted after a call to [Close].
	ErrClosed = errors.New("worker pool is closed")

	// ErrCallbackSet is returned by [Drain] when a result callback has been
	// registered via [WithResultCallback].
	ErrCallbackSet = errors.New("a result callback is set")
)
Functions ¶
This section is empty.
Types ¶
type Option ¶ added in v1.4.0
type Option func(*WorkerPool)
Option configures a WorkerPool.
func WithResultCallback ¶ added in v1.4.0
func WithResultCallback(fn func(Result)) Option
WithResultCallback registers fn to be called each time a task completes.
When a callback is set, results are NOT accumulated internally. This means:
- [Drain] will return ErrCallbackSet instead of collecting results
- Results are processed immediately upon completion, avoiding memory buildup
The callback fn is invoked from the worker goroutines. This has a few implications:
1. fn must be safe for concurrent use.
2. fn must NOT call [Submit] or [Close], as doing so leads to a deadlock.
WithResultCallback panics if fn is nil.
Example ¶
ExampleWithResultCallback demonstrates using a result callback to process task results immediately without accumulation.
package main

import (
	"context"
	"fmt"
	"os"
	"runtime"

	"github.com/cilium/workerpool"
)

// IsPrime returns true if n is prime, false otherwise.
func IsPrime(ctx context.Context, n int64) bool {
	if n < 2 {
		return false
	}
	for p := int64(2); p*p <= n; p++ {
		if p%10000 == 0 {
			select {
			case <-ctx.Done():
				return false
			default:
			}
		}
		if n%p == 0 {
			return false
		}
	}
	return true
}

func main() {
	wp := workerpool.New(runtime.NumCPU(), workerpool.WithResultCallback(func(r workerpool.Result) {
		if err := r.Err(); err != nil {
			fmt.Fprintf(os.Stderr, "task %s failed after %s: %v\n", r, r.Duration(), err)
		} else {
			fmt.Printf("task %s completed in %s\n", r, r.Duration())
		}
	}))
	defer func() { _ = wp.Close() }()

	for i, n := 0, int64(1_000_000_000_000_000_000); i < 100; i, n = i+1, n+1 {
		id := fmt.Sprintf("task #%d", i)
		err := wp.Submit(id, func(ctx context.Context) error {
			if IsPrime(ctx, n) {
				fmt.Println(n, "is prime!")
			}
			return nil
		})
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			return
		}
	}

	// Close waits for all in-flight tasks to complete before returning,
	// ensuring all callback invocations have finished.
	if err := wp.Close(); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
type Result ¶ added in v1.4.0
type Result interface {
Task
// Duration returns the time taken to execute the task.
Duration() time.Duration
}
Result is a completed Task that also reports its execution duration. It is passed to the callback registered with WithResultCallback.
type Task ¶
type Task interface {
// String returns the task identifier.
fmt.Stringer
// Err returns the error resulting from processing the
// unit of work.
Err() error
}
Task is a unit of work.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool spawns, on demand, a number of worker routines to process submitted tasks concurrently. The number of concurrent routines never exceeds the specified limit.
func New ¶
func New(n int, opts ...Option) *WorkerPool
New creates a new pool of workers where at most n workers process submitted tasks concurrently. New panics if n ≤ 0.
func NewWithContext ¶ added in v1.2.0
func NewWithContext(ctx context.Context, n int, opts ...Option) *WorkerPool
NewWithContext creates a new pool of workers where at most n workers process submitted tasks concurrently. NewWithContext panics if n ≤ 0. The context is used as the parent context to the context of the task func passed to [Submit].
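The parent/child relationship between contexts can be seen with the standard library alone. The sketch below assumes the task context is derived from the parent in the usual way (here with context.WithCancel); how the package actually derives it is not stated in this documentation, and childSeesParentCancel is a hypothetical name.

```go
package main

import (
	"context"
	"fmt"
)

// childSeesParentCancel derives a child context from a parent, cancels the
// parent, and reports whether the child observes the cancellation. The child
// plays the role of the context handed to a task func.
func childSeesParentCancel() bool {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithCancel(parent)
	defer cancelChild()

	cancelParent()
	<-child.Done() // unblocks once the cancellation has reached the child
	return child.Err() == context.Canceled
}

func main() {
	fmt.Println(childSeesParentCancel()) // prints "true"
}
```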
func (*WorkerPool) Cap ¶
func (wp *WorkerPool) Cap() int
Cap returns the maximum number of concurrent workers; see New.
func (*WorkerPool) Close ¶
func (wp *WorkerPool) Close() error
Close closes the worker pool, rendering it unable to process new tasks. Close sends the cancellation signal to any running task via context cancellation and waits indefinitely for all workers to return. If tasks do not respect context cancellation, Close will block until they complete. When a result callback is set via WithResultCallback, all callback invocations are guaranteed to have completed before Close returns.
Close will return ErrClosed if it has already been called. This makes it safe to use with defer immediately after creating the pool (for cleanup on early returns) while still calling Close explicitly to check for errors.
Note: Close cancels running tasks via context, while [Drain] waits for tasks to complete without cancellation. If you want tasks to finish naturally, call [Drain] before Close.
func (*WorkerPool) Drain ¶
func (wp *WorkerPool) Drain() ([]Task, error)
Drain waits until all submitted tasks have completed. While the drain is in progress, new tasks cannot be submitted to the worker pool. Drain returns the results of the tasks that have been processed.
Drain is incompatible with the WithResultCallback option. When a result callback is configured, results are processed immediately upon completion rather than being accumulated, so Drain returns ErrCallbackSet.
Unlike [Close], Drain does not cancel task contexts. Tasks run to completion naturally. After Drain, the pool can be closed with [Close] (which will not cancel any tasks since none are running) or more tasks can be submitted.
ErrCallbackSet is returned if the WithResultCallback option is used. ErrDraining is returned if a drain operation is already in progress. ErrClosed is returned if the worker pool is closed.
func (*WorkerPool) Len ¶ added in v1.1.0
func (wp *WorkerPool) Len() int
Len returns the number of workers currently running.
func (*WorkerPool) Submit ¶
func (wp *WorkerPool) Submit(id string, f func(ctx context.Context) error) error
Submit submits f for processing by a worker. The given id is useful for identifying the task once it is completed.
The task function f receives a context that is cancelled when [Close] is called or when the parent context passed to NewWithContext is done. Tasks MUST respect context cancellation and return promptly when ctx.Done() is signaled. Tasks that ignore cancellation will cause [Close] to block indefinitely waiting for them to complete. Use context-aware operations (e.g., select with ctx.Done()) to ensure timely shutdown.
Submit blocks until a routine starts processing the task.
ErrDraining is returned if a drain operation is in progress. ErrClosed is returned if the worker pool is closed. context.Canceled is returned if the parent context is done.