wpa

package
v0.14.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Jobs

func Jobs[T Job](seq iter.Seq[T]) iter.Seq[fnx.Worker]

Jobs is a simple wrapper to convert a sequence of Job objects to fnx.Workers.

func Pull

func Pull[T any, HF Handler[T]](ctx context.Context, seq iter.Seq[T], hf HF) iter.Seq[error]

Pull processes items sequentially with abort-on-error semantics, although ers.ErrCurrentOpSkipped are ignored. Terminating errors (io.EOF, ers.ErrCurrentOpAbort) stop processing without yielding an error

Execution is lazy and depends on a consumer of the output sequence consuming the sequence.

func PullAll

func PullAll[T any, HF Handler[T]](ctx context.Context, seq iter.Seq[T], hf HF) iter.Seq[error]

PullAll processes all items sequentially returning all non-nil errors to the output sequence.

Execution is lazy and depends on a consumer of the output sequence consuming the sequence.

func PullWithPool

func PullWithPool[T any, HF Handler[T]](
	ctx context.Context,
	seq iter.Seq[T],
	hf HF,
	opts ...opt.Provider[*WorkerGroupConf],
) iter.Seq[error]

PullWithPool processes items concurrently using a worker pool and yields errors to the returned sequence. Worker pool size and error handling behavior and filtering are configurable.

Execution is lazy, and does not begin until the sequence is iterated. While there is no _extra_ buffering, each of the pool's worker effectively buffers an item from the pool, if the consumer backs up.

func Run

func Run[T Job](seq iter.Seq[T]) fnx.Worker

Run executes jobs sequentially with abort-on-error handling. Panics are converted to errors. Run returns immediately on the first error (except ers.ErrCurrentOpSkip which is ignored). Terminating errors (io.EOF, ers.ErrCurrentOpAbort) return nil. Context errors abort execution. Jobs are executed serially.

Execution of job processing does not begin until the returned worker function is called. The error value of the worker function is the (filtered) error produced by the erroring job, (or nil, if there were no errors.)

func RunAll

func RunAll[T Job](seq iter.Seq[T]) fnx.Worker

RunAll executes all jobs sequentially, collecting all errors. Panics are converted to errors. Unlike Run, RunAll exeuctes all jobs, even after an error.

Execution of job processing does not begin until the returned worker function is called. The worker's error is the aggregate error for all jobs.

func RunWithPool

func RunWithPool[T Job](seq iter.Seq[T], opts ...opt.Provider[*WorkerGroupConf]) fnx.Worker

RunWithPool executes jobs concurrently using a worker pool. Panics are converted to errors; continue-on-error, continue-on-panic, and custom error filtering are available via the configuration options. Worker pool size defaults to the number of CPUs, but is also configurable.

Execution of job processing does not begin until the returned worker function is called. RunWithPool blocks until all tasks have returned, and the return value is always the aggregated errors for all jobs that produced an error (or panic).

func TaskHandler

func TaskHandler[J Job](ctx context.Context, job J) error

TaskHandler runs a task. Because this function which has the signature of an fnx.Handler function, can be used as a Handler for WithHandler operations, as needed.

func WithHandler

func WithHandler[T any, H Handler[T]](op H) interface {
	For(seq iter.Seq[T]) iter.Seq[fnx.Worker]
	Run() interface{ For(iter.Seq[T]) fnx.Worker }
	RunAll() interface{ For(iter.Seq[T]) fnx.Worker }
	RunWithPool(opts ...opt.Provider[*WorkerGroupConf]) interface{ For(iter.Seq[T]) fnx.Worker }

	ForEach(seq iter.Seq[T]) interface {
		Pull(context.Context) iter.Seq[error]
		PullAll(context.Context) iter.Seq[error]
		PullWithPool(context.Context, ...opt.Provider[*WorkerGroupConf]) iter.Seq[error]
	}
}

WithHandler provides an ergonomic bridge between sequences with arbitrary types and handler functions, and the worker pools that the execution in the Run and Pull operations.

func WorkerGroupConfAddExcludeErrors

func WorkerGroupConfAddExcludeErrors(errs ...error) opt.Provider[*WorkerGroupConf]

WorkerGroupConfAddExcludeErrors appends the provided errors to the ExcludedErrors value. The provider will never exclude an error that is rooted in ers.ErrRecoveredPanic..

func WorkerGroupConfContinueOnError

func WorkerGroupConfContinueOnError() opt.Provider[*WorkerGroupConf]

WorkerGroupConfContinueOnError toggles the option that allows the operation to continue when the operation encounters an error. Otherwise, any option will lead to an abort.

func WorkerGroupConfContinueOnPanic

func WorkerGroupConfContinueOnPanic() opt.Provider[*WorkerGroupConf]

WorkerGroupConfContinueOnPanic toggles the option that allows the operation to continue when encountering a panic.

func WorkerGroupConfCustomValidatorAppend

func WorkerGroupConfCustomValidatorAppend(vf func(*WorkerGroupConf) error) opt.Provider[*WorkerGroupConf]

WorkerGroupConfCustomValidatorAppend provides a chance to add additional worker group configuration checks (or in practice, default settings), when callers need to enforce more strict validation constraints.

func WorkerGroupConfCustomValidatorReset

func WorkerGroupConfCustomValidatorReset() opt.Provider[*WorkerGroupConf]

WorkerGroupConfCustomValidatorReset removes all custom validation functions.

func WorkerGroupConfDefaults

func WorkerGroupConfDefaults() opt.Provider[*WorkerGroupConf]

WorkerGroupConfDefaults sets the "continue-on-error" option and the "number-of-worers-equals-numcpus" options.

func WorkerGroupConfDisableErrorCollector

func WorkerGroupConfDisableErrorCollector() opt.Provider[*WorkerGroupConf]

WorkerGroupConfDisableErrorCollector disales the default error collector, that collects all non-filtered errors. Use this option for all long-running operations that may collect a large number of errors. These could be.

func WorkerGroupConfIncludeContextErrors

func WorkerGroupConfIncludeContextErrors() opt.Provider[*WorkerGroupConf]

WorkerGroupConfIncludeContextErrors toggles the option that forces the operation to include context errors in the output. By default they are not included.

func WorkerGroupConfNumWorkers

func WorkerGroupConfNumWorkers(num int) opt.Provider[*WorkerGroupConf]

WorkerGroupConfNumWorkers sets the number of workers configured. It is not possible to set this value to less than 1: negative values and 0 are always ignored.

func WorkerGroupConfSet

func WorkerGroupConfSet(opt *WorkerGroupConf) opt.Provider[*WorkerGroupConf]

WorkerGroupConfSet overrides the option with the provided option.

func WorkerGroupConfWithErrorCollector

func WorkerGroupConfWithErrorCollector(ec *erc.Collector) opt.Provider[*WorkerGroupConf]

WorkerGroupConfWithErrorCollector sets an error collector implementation for later use in the Worker GroupOptions.

func WorkerGroupConfWorkerPerCPU

func WorkerGroupConfWorkerPerCPU() opt.Provider[*WorkerGroupConf]

WorkerGroupConfWorkerPerCPU sets the number of workers to the number of detected CPUs by the runtime (e.g. runtime.NumCPU()).

Types

type Handler

type Handler[T any] interface {
	fnx.Handler[T] | fn.Handler[T] | Reader[T]
	Job(T) func(context.Context) error
}

Handler provides a type constraint for processing data, to be used with the WithHandler().For() function in order to use the Run/RunAll/RunWithPool functions with arbitrary sequences.

type Job

type Job interface {
	fnx.Worker | fnx.Operation | Task | Thunk
	Job() func(context.Context) error
}

Job describes the union of the fnx.Worker and fnx.Operation types, allowing the wpa functions to operate on both types without runtime type casting.

type Reader

type Reader[T any] func(T) error

Reader describes a common function signature for use in the WithHandler().For() operation.

func (Reader[T]) Job

func (hf Reader[T]) Job(in T) func(context.Context) error

Job converts a Reader method into an fnx.Worker to satisfy the Handler[T] constraint for use with the WithHandler().For() operation.

type Task

type Task func() error

Task represents a simple error-returning function that takes no arguments. Tasks are more simple units of work for use with worker pool and execution functions in the wpa package.

func (Task) Job

func (sf Task) Job() func(ctx context.Context) error

Job evaluates converts a Task into a panic-safe worker function suitable for use in worker pool implementations (e.g., wpa.RunWithPool). This exists to satisfy the wpa.Job type constraint, enabling interoperability with existing function types.

type Thunk

type Thunk func()

Thunk represents a single niladic function that can be used interchangeably in wpa pool and worker execution.

func (Thunk) Job

func (tf Thunk) Job() func(ctx context.Context) error

Job converts a Thunk into a panic-safe worker function suitable for use in worker pool implementations (e.g., wpa.RunWithPool). This exists to satisfy the wpa.Job type constraint, enabling interoperability with existing function types.

type WorkerGroupConf

type WorkerGroupConf struct {
	// NumWorkers describes the number of parallel workers
	// processing the incoming items and running the map
	// function. All values less than 1 are converted to 1. Any
	// value greater than 1 will result in out-of-sequence results
	// in the output.
	NumWorkers int
	// ContinueOnPanic prevents the operations from halting when a
	// single processing function panics. In all modes mode panics
	// are converted to errors and propagated to the caller.
	ContinueOnPanic bool
	// ContinueOnError allows a processing function to return an
	// error and allow the work of the broader operation to
	// continue. Errors are aggregated propagated to the caller.
	ContinueOnError bool
	// IncludeContextExpirationErrors changes the default handling
	// of context cancellation errors. By default all errors
	// rooted in context cancellation are not propagated to the
	// Close() method, however, when true, these errors are
	// captured. All other error handling semantics
	// (e.g. ContinueOnError) are applicable.
	IncludeContextExpirationErrors bool
	// ExcludedErrors is a list of that should not be included
	// in the collected errors of the
	// output. ers.ErrRecoveredPanic is always included and io.EOF
	// is never included.
	ExcludedErrors []error
	// DisableErrorCollection disables the automatic and default
	// error collection. When true, no default ErrorCollector is
	// used.
	DisableErrorCollection bool
	// ErrorCollector provides a way to connect an existing error
	// collector to a worker group.
	ErrorCollector *erc.Collector
	// CustomValidators are used by some worker group
	// implementations to add specific validation rules.
	CustomValidators []func(*WorkerGroupConf) error
}

WorkerGroupConf describes the runtime options to several operations operations. The zero value of this struct provides a usable strict operation.

func (*WorkerGroupConf) CanContinueOnError

func (o *WorkerGroupConf) CanContinueOnError(err error) (out bool)

CanContinueOnError checks an error, collecting it as needed using the WorkerGroupConf, and then returning true if processing should continue and false otherwise.

Neither io.EOF nor ers.ErrCurrentOpSkip errors are ever observed. All panic errors are observed. Context cancellation errors are observed only when configured. as well as context cancellation errors when configured.

func (*WorkerGroupConf) Filter

func (o *WorkerGroupConf) Filter(err error) error

Filter is a method which can be used as an erc.Filter function, and used in worker pool implementations process as configured.

func (*WorkerGroupConf) Validate

func (o *WorkerGroupConf) Validate() error

Validate ensures that the configuration is valid, and returns an error if there are impossible configurations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL