Documentation ¶
Overview ¶
Package workerpool provides a wrapper around github.com/cilium/workerpool to facilitate processing items from a concurrentqueue.Queue using a pool of workers.
One caveat: although the number of concurrently running workers is limited, the number of task results is not, and results accumulate until they are collected. This applies to the Run methods (Run and RunWithResults), which create one task per queue item, but not to the RunWorkers methods, which create one task per worker. For large queues this may lead to high memory usage, so it is advisable to use the RunWorkers methods where possible.
Index ¶
- func Run[T any](jobFunc func(context.Context, T) error, q *queue.Queue[T], ...) ([]cwp.Task, error)
- func RunWithResults[I any, O any](jobFunc func(context.Context, I, chan<- O) error, q *queue.Queue[I], ...) ([]cwp.Task, error)
- func RunWorkers[T any](jobFunc func(ctx context.Context, elems <-chan T, dlq *queue.Queue[T]) error, ...) ([]cwp.Task, error)
- func RunWorkersWithResults[I any, O any](...) ([]cwp.Task, error)
- type OptionFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Run ¶
func Run[T any]( jobFunc func(context.Context, T) error, q *queue.Queue[T], opts ...OptionFunc[T]) ([]cwp.Task, error)
Run creates a pool of workers to run the given jobFunc in parallel.
One instance of jobFunc is created for each element dequeued from q. The number of concurrently running instances of jobFunc is limited to numWorkers, which defaults to the number of CPU cores available.
Run returns a slice of Tasks representing the results of processing each element from the queue. If an error occurs during processing, it is returned as the second return value.
The total number of tasks returned is equal to the number of items dequeued from the queue.
If a dead letter queue is provided via WithDeadLetterQueue, the currently processed item will be added to it if jobFunc returns an error.
The provided queue is drained before returning, regardless of whether an error occurred. Any remaining items will be added to the dead letter queue if one is provided.
It is the responsibility of the caller to ensure that the queue is populated with all intended work items before calling Run; if the queue is empty, no work will be performed. Similarly, if the queue contains more items than intended, those extra items will also be processed.
func RunWithResults ¶
func RunWithResults[I any, O any]( jobFunc func(context.Context, I, chan<- O) error, q *queue.Queue[I], out chan<- O, opts ...OptionFunc[I]) ([]cwp.Task, error)
RunWithResults creates a pool of workers to run the given jobFunc in parallel providing an output channel for results.
Type I is the input type dequeued from q and type O is the output (result) type from jobFunc which is sent to the out channel. The caller must ensure that the out channel is sufficiently buffered or actively consumed to avoid deadlock.
One instance of jobFunc is created for each element dequeued from q. The number of concurrently running instances of jobFunc is limited to numWorkers, which defaults to the number of CPU cores available.
RunWithResults returns a slice of Tasks representing the results of processing each element from the queue. If an error occurs during processing, it is returned as the second return value.
The total number of tasks returned is equal to the number of items dequeued from the queue.
If a dead letter queue is provided via WithDeadLetterQueue, the currently processed item will be added to it if jobFunc returns an error.
The provided queue is drained before returning, regardless of whether an error occurred. Any remaining items will be added to the dead letter queue if one is provided.
It is the responsibility of the caller to ensure that the queue is populated with all intended work items before calling RunWithResults; if the queue is empty, no work will be performed. Similarly, if the queue contains more items than intended, those extra items will also be processed.
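The deadlock warning about the out channel can be illustrated with a standard-library sketch. It does not call this package; squareAll is a hypothetical helper that shows one safe arrangement, in which a consumer goroutine is started before any producer sends.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll sends one result per input to an unbuffered channel from
// concurrently running goroutines and returns everything that was received.
func squareAll(inputs []int) []int {
	out := make(chan int) // unbuffered: every send needs a waiting receiver

	// Start the consumer first. Without it, each producer below would block
	// on its send and wg.Wait would never return.
	var collected []int
	done := make(chan struct{})
	go func() {
		defer close(done)
		for v := range out {
			collected = append(collected, v)
		}
	}()

	var wg sync.WaitGroup
	for _, n := range inputs {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			out <- n * n
		}(n)
	}

	wg.Wait()  // all producers have sent their result
	close(out) // lets the consumer loop finish
	<-done     // collected is safe to read after this point
	return collected
}

func main() {
	sum := 0
	for _, v := range squareAll([]int{1, 2, 3}) {
		sum += v
	}
	fmt.Println(sum) // prints "14"
}
```

An alternative is to give the channel a buffer at least as large as the number of expected results, at the cost of holding all results in memory.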
func RunWorkers ¶
func RunWorkers[T any]( jobFunc func(ctx context.Context, elems <-chan T, dlq *queue.Queue[T]) error, q *queue.Queue[T], opts ...OptionFunc[T]) ([]cwp.Task, error)
RunWorkers creates a pool of workers to run the given jobFunc in parallel.
Type T is the type of the elements dequeued from q, which jobFunc receives through the elems channel.
The number of instances of jobFunc is limited to numWorkers; however, each instance of jobFunc is responsible for dequeuing work items from q. Thus, the number of tasks created is equal to numWorkers, which defaults to the number of CPU cores available.
RunWorkers returns a slice of Tasks representing the results of each worker. If an error occurs during processing, it is returned as the second return value.
If a dead letter queue is provided via WithDeadLetterQueue, then the dlq argument to jobFunc will be non-nil. It is then the responsibility of jobFunc to enqueue any failed items to the dlq.
The total number of tasks returned is equal to the number of workers.
The provided queue is drained before returning, regardless of whether an error occurred. Any remaining items will be added to the dead letter queue if one is provided.
It is the responsibility of the caller to ensure that the queue is populated with all intended work items before calling RunWorkers; if the queue is empty, no work will be performed. Similarly, if the queue contains more items than intended, those extra items will also be processed.
func RunWorkersWithResults ¶
func RunWorkersWithResults[I any, O any]( jobFunc func(ctx context.Context, in <-chan I, out chan<- O, dlq *queue.Queue[I]) error, q *queue.Queue[I], out chan<- O, opts ...OptionFunc[I]) ([]cwp.Task, error)
RunWorkersWithResults creates a pool of workers to run the given jobFunc in parallel providing an output channel for results.
Type I is the input type dequeued from q and type O is the output (result) type from jobFunc which is sent to the out channel. The caller must ensure that the out channel is sufficiently buffered or actively consumed to avoid deadlock.
The number of instances of jobFunc is limited to numWorkers; however, each instance of jobFunc is responsible for dequeuing work items from q. Thus, the number of tasks created is equal to numWorkers, which defaults to the number of CPU cores available.
RunWorkersWithResults returns a slice of Tasks representing the results of each worker. If an error occurs during processing, it is returned as the second return value.
If a dead letter queue is provided via WithDeadLetterQueue, then the dlq argument to jobFunc will be non-nil. It is then the responsibility of jobFunc to enqueue any failed items to the dlq.
The total number of tasks returned is equal to the number of workers.
The provided queue is drained before returning, regardless of whether an error occurred. Any remaining items will be added to the dead letter queue if one is provided.
It is the responsibility of the caller to ensure that the queue is populated with all intended work items before calling RunWorkersWithResults; if the queue is empty, no work will be performed. Similarly, if the queue contains more items than intended, those extra items will also be processed.
Types ¶
type OptionFunc ¶
type OptionFunc[T any] func(*poolOptions[T])
OptionFunc is a function that configures a worker pool.
func WithContext ¶
func WithContext[T any](ctx context.Context) OptionFunc[T]
WithContext is a constructor function to set the parent context for the worker pool.
This context is passed to all workers, which should return promptly when the context is cancelled.
If no context is provided, context.Background() is used.
func WithDeadLetterQueue ¶
func WithDeadLetterQueue[T any](dlq *queue.Queue[T]) OptionFunc[T]
WithDeadLetterQueue is a constructor function to set a dead letter queue for failed tasks.
How this queue is populated depends on which Run method you are using. See the documentation for each Run method for details.
func WithFastDrain ¶
func WithFastDrain[T any](fast bool) OptionFunc[T]
WithFastDrain is a constructor function to set whether to use fast draining of the input queue to the dead letter queue (if set) at the end of the pool run. You should only use this if you are sure that the input queue is not still being populated by another goroutine.
func WithNumWorkers ¶
func WithNumWorkers[T any](n int) OptionFunc[T]
WithNumWorkers is a constructor function to set the number of workers in the worker pool. The default is the number of CPU cores available.
The underlying worker pool will panic if n ≤ 0.
func WithTaskNamer ¶
func WithTaskNamer[T any](namer func(int, T) string) OptionFunc[T]
WithTaskNamer is a constructor function to set a function that generates task names based on the task ID and the task input value.
For RunWorkers methods, the input value will always be the zero value of T. This is because the task function in these methods receives a channel from which it reads input values, rather than receiving the input value directly.
The default task namer returns the string representation of the task ID.
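A namer with the func(int, T) string shape accepted by WithTaskNamer might look like the sketch below. The job type and the name formats are hypothetical; the zero-value branch covers the RunWorkers case described above, where the namer receives no real input value.

```go
package main

import "fmt"

// job is a hypothetical work item type.
type job struct{ Path string }

// nameJob builds a task name from the task ID and the input value. When the
// input is the zero value, as it is for the RunWorkers methods, it falls back
// to a name based on the ID alone.
func nameJob(id int, j job) string {
	if j == (job{}) {
		return fmt.Sprintf("worker-%d", id)
	}
	return fmt.Sprintf("job-%d:%s", id, j.Path)
}

func main() {
	fmt.Println(nameJob(3, job{Path: "a.txt"})) // prints "job-3:a.txt"
	fmt.Println(nameJob(1, job{}))              // prints "worker-1"
}
```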