Documentation
¶
Overview ¶
Package itertool provides a set of functional helpers for managing and using fun.Iterator implementations, including a parallel Map/Reduce, Merge, and other convenient tools.
Index ¶
- Variables
- func Channel[T any](pipe <-chan T) fun.Iterator[T]
- func CollectChannel[T any](ctx context.Context, iter fun.Iterator[T]) <-chan T
- func CollectSlice[T any](ctx context.Context, iter fun.Iterator[T]) ([]T, error)
- func Filter[T any](ctx context.Context, iter fun.Iterator[T], fn ProcessingFunction[T, T]) fun.Iterator[T]
- func ForEach[T any](ctx context.Context, iter fun.Iterator[T], fn func(context.Context, T) error) (err error)
- func Generate[T any](ctx context.Context, fn func(context.Context) (T, error), opts Options) fun.Iterator[T]
- func Map[T any, O any](ctx context.Context, iter fun.Iterator[T], mapper ProcessingFunction[T, O], ...) fun.Iterator[O]
- func MarshalJSON[T any](ctx context.Context, iter fun.Iterator[T]) ([]byte, error)
- func Merge[T any](ctx context.Context, iters ...fun.Iterator[T]) fun.Iterator[T]
- func Observe[T any](ctx context.Context, iter fun.Iterator[T], obfn func(T)) error
- func ObserveWorker(ctx context.Context, iter fun.Iterator[fun.WorkerFunc], ob func(error))
- func ObserveWorkerPool(ctx context.Context, iter fun.Iterator[fun.WorkerFunc], ob func(error), ...)
- func ParallelForEach[T any](ctx context.Context, iter fun.Iterator[T], fn func(context.Context, T) error, ...) (err error)
- func ParallelObserve[T any](ctx context.Context, iter fun.Iterator[T], obfn func(T), opts Options) error
- func ProcessWork(ctx context.Context, iter fun.Iterator[fun.WorkerFunc]) error
- func Reduce[T any, O any](ctx context.Context, iter fun.Iterator[T], ...) (value O, err error)
- func Slice[T any](in []T) fun.Iterator[T]
- func Split[T any](ctx context.Context, numSplits int, input fun.Iterator[T]) []fun.Iterator[T]
- func Synchronize[T any](in fun.Iterator[T]) fun.Iterator[T]
- func Transform[T any, O any](ctx context.Context, iter fun.Iterator[T], fn ProcessingFunction[T, O]) fun.Iterator[O]
- func UnmarshalJSON[T any](in []byte) fun.Iterator[T]
- func Variadic[T any](in ...T) fun.Iterator[T]
- func WorkerPool(ctx context.Context, iter fun.Iterator[fun.WorkerFunc], opts Options) error
- type Options
- type ProcessingFunction
- func Checker[IN any, OUT any](fn func(context.Context, IN) (OUT, bool)) ProcessingFunction[IN, OUT]
- func Collector[IN any, OUT any](ec *erc.Collector, fn func(context.Context, IN) (OUT, error)) ProcessingFunction[IN, OUT]
- func Mapper[IN any, OUT any](fn func(context.Context, IN) (OUT, error)) ProcessingFunction[IN, OUT]
- func Transformer[IN any, OUT any](fn func(IN) OUT) ProcessingFunction[IN, OUT]
- type RangeFunction
Constants ¶
This section is empty.
Variables ¶
var ErrAbortGenerator = errors.New("abort generator signal")
ErrAbortGenerator is a sentinel error returned by generators to abort. This error is never propagated to calling functions.
Functions ¶
func Channel ¶
Channel produces an iterator for a specified channel. The iterator does not start any background threads.
func CollectChannel ¶
CollectChannel converts an iterator to a channel. The iterator is not closed.
func CollectSlice ¶
CollectSlice converts an iterator to a slice of its values, and closes the iterator when it has been exhausted.
In the case of an error in the underlying iterator the output slice will have the values encountered before the error.
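As an illustrative sketch (assuming this package imports as github.com/tychoish/fun/itertool, and that ctx is a live context.Context), Channel and CollectSlice compose into a simple round trip:

    ch := make(chan string, 2)
    ch <- "hello"
    ch <- "world"
    close(ch)

    iter := itertool.Channel(ch)
    out, err := itertool.CollectSlice(ctx, iter)
    // out is expected to be []string{"hello", "world"}; err is nil
    // unless the underlying iterator reported an error.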
func Filter ¶
func Filter[T any](
    ctx context.Context,
    iter fun.Iterator[T],
    fn ProcessingFunction[T, T],
) fun.Iterator[T]
Filter passes all objects in an iterator through the specified filter function. If the filter function errors, the operation aborts and the error is reported by the returned iterator's Close method. If the include boolean is true, the result of the function is included in the output iterator; otherwise the item is skipped.
Filter is equivalent to Transform with the same input and output types. Filter operations are processed in a different thread, but are not cached or buffered: to process all items, you must consume the output iterator.
The output iterator is produced iteratively as the returned iterator is consumed.
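For example, a hedged sketch that pairs Filter with the Checker constructor to keep only even numbers (the expected output assumes the default single-threaded operation preserves input order):

    evens := itertool.Filter(ctx, itertool.Slice([]int{1, 2, 3, 4}),
        itertool.Checker(func(_ context.Context, in int) (int, bool) {
            return in, in%2 == 0 // include only even values
        }))
    out, err := itertool.CollectSlice(ctx, evens)
    // out is expected to be []int{2, 4} when err is nil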
func ForEach ¶
func ForEach[T any](
    ctx context.Context,
    iter fun.Iterator[T],
    fn func(context.Context, T) error,
) (err error)
ForEach passes each item in the iterator through the specified handler function, returning an error if the handler function errors.
ForEach aborts on the first error and converts any panic into an error which is propagated with other errors.
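A minimal sketch (assuming the fmt import and a live ctx):

    err := itertool.ForEach(ctx, itertool.Slice([]string{"a", "b", "c"}),
        func(_ context.Context, item string) error {
            fmt.Println(item) // any returned error aborts iteration
            return nil
        })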
func Generate ¶ added in v0.2.0
func Generate[T any](
    ctx context.Context,
    fn func(context.Context) (T, error),
    opts Options,
) fun.Iterator[T]
Generate creates an iterator using a generator pattern which produces items until the context is canceled or the generator function returns ErrAbortGenerator. Parallel operation is also available and Generate shares configuration and semantics with the Map operation.
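As a sketch of the generator pattern, the following counter aborts cleanly after three items (using the zero-value Options for single-threaded operation):

    count := 0
    iter := itertool.Generate(ctx,
        func(context.Context) (int, error) {
            count++
            if count > 3 {
                return 0, itertool.ErrAbortGenerator // stop cleanly
            }
            return count, nil
        },
        itertool.Options{})
    out, err := itertool.CollectSlice(ctx, iter)
    // out is expected to be []int{1, 2, 3}, with a nil err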
func Map ¶
func Map[T any, O any](
    ctx context.Context,
    iter fun.Iterator[T],
    mapper ProcessingFunction[T, O],
    opts Options,
) fun.Iterator[O]
Map provides an orthodox functional map implementation based around fun.Iterator. It operates in an asynchronous/streaming manner, so the output iterator must be consumed. The zero value of Options provides reasonable defaults: abort-on-error and single-threaded operation.
If the mapper function errors, the result is not included, and the errors are aggregated and propagated to the Close() method of the resulting iterator. If there is more than one error (as is the case with a panic or with ContinueOnError semantics), the error is an *erc.Stack object. Panics in the map function are converted to errors and handled according to the ContinueOnPanic option.
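A hedged sketch of a parallel Map with ContinueOnError semantics, assuming the strconv import; the failed conversion is skipped and its error surfaces when the output iterator is closed:

    mapper := itertool.Mapper(func(_ context.Context, in string) (int, error) {
        return strconv.Atoi(in)
    })
    out := itertool.Map(ctx, itertool.Slice([]string{"1", "2", "x"}), mapper,
        itertool.Options{NumWorkers: 2, ContinueOnError: true})
    values, err := itertool.CollectSlice(ctx, out)
    // values is expected to hold 1 and 2 (order is not guaranteed when
    // NumWorkers > 1); err reports the failed conversion of "x".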
func MarshalJSON ¶ added in v0.3.3
MarshalJSON is useful for implementing json.Marshaler methods from iterator-supporting types. Wrapping the standard library's json.Marshal, it encodes the iterator's items as a JSON array and returns the resulting byte slice.
func Merge ¶
Merge combines a set of related iterators into a single iterator. Merge starts a goroutine to consume from each input iterator and does not otherwise guarantee the output order.
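For example, a brief sketch merging two slice-backed iterators:

    merged := itertool.Merge(ctx,
        itertool.Slice([]int{1, 2}),
        itertool.Slice([]int{3, 4}))
    out, err := itertool.CollectSlice(ctx, merged)
    // out contains 1 through 4, in no guaranteed order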
func Observe ¶ added in v0.6.4
Observe is a special case of ForEach to support observer-pattern functions. Observer functions should be short running, as they do not take a context and could block unexpectedly.
Unlike fun.Observe, itertool.Observe collects errors from the iterator's Close method and recovers from panics in the observer function, propagating them in the returned error.
func ObserveWorker ¶ added in v0.8.0
ObserveWorker executes the worker functions from the iterator and passes all errors to the observer function. If a worker panics, ObserveWorker will abort, convert the panic into an error, and pass that error to the observer function.
func ObserveWorkerPool ¶ added in v0.8.0
func ObserveWorkerPool(ctx context.Context, iter fun.Iterator[fun.WorkerFunc], ob func(error), opts Options)
ObserveWorkerPool executes the worker functions from the iterator in a worker pool that operates according to the Options configuration, including pool size and error and panic handling. If a worker panics, ObserveWorkerPool converts the panic(s) into an error and passes that error to the observer function.
func ParallelForEach ¶ added in v0.4.0
func ParallelForEach[T any](
    ctx context.Context,
    iter fun.Iterator[T],
    fn func(context.Context, T) error,
    opts Options,
) (err error)
ParallelForEach processes the iterator in parallel, and is essentially an iterator-driven worker pool. The input iterator is split dynamically into iterators for every worker (determined by Options.NumWorkers), with the division between workers determined by their processing speed (e.g., workers do not suffer from head-of-line blocking), and the input iterators are consumed (safely) as work is processed.
Because there is no output in these operations Options.OutputBufferSize is ignored.
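A minimal sketch (assuming the sync/atomic import); the atomic counter keeps the shared state safe across the pool's workers:

    var sum atomic.Int64
    err := itertool.ParallelForEach(ctx, itertool.Slice([]int{1, 2, 3, 4}),
        func(_ context.Context, in int) error {
            sum.Add(int64(in))
            return nil
        },
        itertool.Options{NumWorkers: 4})
    // when err is nil, sum.Load() is expected to be 10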
func ParallelObserve ¶ added in v0.6.4
func ParallelObserve[T any](
    ctx context.Context,
    iter fun.Iterator[T],
    obfn func(T),
    opts Options,
) error
ParallelObserve is a special case of ParallelForEach to support observer-pattern functions. Observer functions should be short running, as they do not take a context and could block unexpectedly.
func ProcessWork ¶ added in v0.8.0
ProcessWork provides a serial implementation of WorkerPool with similar semantics. The operation aborts on the first worker function to error.
func Reduce ¶
func Reduce[T any, O any](
    ctx context.Context,
    iter fun.Iterator[T],
    reducer func(context.Context, T, O) (O, error),
    initalValue O,
) (value O, err error)
Reduce processes an input iterator with a reduce function and outputs the final value. The initial value may be a zero or nil value.
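For example, a sketch of a simple sum, with 0 as the initial accumulator value:

    sum, err := itertool.Reduce(ctx, itertool.Slice([]int{1, 2, 3}),
        func(_ context.Context, in int, acc int) (int, error) {
            return acc + in, nil
        }, 0)
    // sum is expected to be 6 when err is nil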
func Split ¶ added in v0.2.0
Split produces an arbitrary number of iterators which divide the input. The division is lazy and depends on the rate of consumption of output iterators, but every item from the input iterator is sent to exactly one output iterator, each of which can be safely used from a different go routine.
The input iterator is not closed after the output iterators are exhausted. If the context passed to Split is canceled, Split will stop populating the output iterators.
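A hedged sketch of consuming Split's output from two goroutines (assuming the fun package imports as github.com/tychoish/fun, plus the sync and fmt imports):

    iters := itertool.Split(ctx, 2, itertool.Slice([]int{1, 2, 3, 4}))
    wg := &sync.WaitGroup{}
    for _, iter := range iters {
        wg.Add(1)
        go func(it fun.Iterator[int]) {
            defer wg.Done()
            _ = itertool.Observe(ctx, it, func(in int) { fmt.Println(in) })
        }(iter)
    }
    wg.Wait()
    // each input value is printed exactly once, divided between the goroutines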
func Synchronize ¶
Synchronize wraps an existing iterator with one that is protected by a mutex. The underlying implementation provides an Unwrap method.
Even when synchronized in this manner, iterators are generally not safe for concurrent access from multiple goroutines, as Next() and Value() calls may interleave. The Split and Range options provide alternatives for consuming a single iterator from multiple consumers, and the Map operation supports multiple workers.
func Transform ¶ added in v0.4.0
func Transform[T any, O any](
    ctx context.Context,
    iter fun.Iterator[T],
    fn ProcessingFunction[T, O],
) fun.Iterator[O]
Transform processes the input iterator of type T into an output iterator of type O. This is the same as a Filter, but with different input and output types.
While the transformations themselves are processed in a different goroutine, the operations are not buffered or cached, and the output iterator must be consumed to process all of the results. For concurrent processing, use the Map() operation.
func UnmarshalJSON ¶ added in v0.3.3
UnmarshalJSON reads JSON input and produces an iterator over its items. The implementation reads all items from the input before returning.
func Variadic ¶ added in v0.5.0
Variadic is a wrapper around Slice() for more ergonomic use at some call sites.
func WorkerPool ¶ added in v0.8.0
WorkerPool processes the worker functions in the provided iterator. Unlike fun.WorkerPool, this implementation has options for handling errors and aborting on error.
The pool follows the semantics configured by the Options with regard to error handling, panic handling, and parallelism. Errors are collected and propagated in WorkerPool's return value.
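A hedged sketch (assuming fun.WorkerFunc is a func(context.Context) error, as defined in the fun package, and the errors import):

    jobs := itertool.Variadic[fun.WorkerFunc](
        func(context.Context) error { return nil },
        func(context.Context) error { return errors.New("job failed") },
    )
    err := itertool.WorkerPool(ctx, jobs,
        itertool.Options{NumWorkers: 2, ContinueOnError: true})
    // err is expected to report the failed job; with ContinueOnError
    // the remaining workers still run.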
Types ¶
type Options ¶ added in v0.2.0
type Options struct {
    // ContinueOnPanic prevents the entire Map operation from
    // halting when a single map function panics. All panics are
    // converted to errors and propagated to the output iterator's
    // Close() method.
    ContinueOnPanic bool
    // ContinueOnError allows a map or generate function to return
    // an error and allow the work of the broader operation to
    // continue. Errors are aggregated and propagated to the output
    // iterator's Close() method.
    ContinueOnError bool
    // NumWorkers describes the number of parallel workers
    // processing the incoming iterator items and running the map
    // function. All values less than 1 are converted to 1. Any
    // value greater than 1 will result in out-of-sequence results
    // in the output iterator.
    NumWorkers int
    // OutputBufferSize controls how buffered the output pipe on the
    // iterator should be. Typically this should be zero, but
    // there are workloads for which a moderate buffer may be
    // useful.
    OutputBufferSize int
    // IncludeContextExpirationErrors changes the default handling
    // of context cancellation errors. By default, all errors
    // rooted in context cancellation are not propagated to the
    // Close() method; however, when true, these errors are
    // captured. All other error handling semantics
    // (e.g. ContinueOnError) are applicable.
    IncludeContextExpirationErrors bool
}
Options describes the runtime options to several operations. The zero value of this struct provides usable, strict defaults.
type ProcessingFunction ¶ added in v0.4.0
type ProcessingFunction[IN any, OUT any] func(ctx context.Context, input IN) (output OUT, include bool, err error)
ProcessingFunction represents the underlying type used by several processing tools. Use the ProcessingFunction constructors to write business logic more ergonomically.
While the generic signature of ProcessingFunction allows IN and OUT to be different types, they may be the same type, as in filtering operations.
func Checker ¶ added in v0.6.4
Checker wraps a function and *never* propagates an error to the calling operation, but will include or exclude output based on the second boolean output.
func Collector ¶ added in v0.6.4
func Collector[IN any, OUT any](ec *erc.Collector, fn func(context.Context, IN) (OUT, error)) ProcessingFunction[IN, OUT]
Collector adds all errors to the error collector (instructing the iterator to skip these results), but does not return an error to the outer processing operation. This is the same as using ContinueOnError with the Map() operation.
func Mapper ¶ added in v0.6.4
Mapper wraps a simple function to provide a ProcessingFunction that will include the output of all mapped functions, but propagates errors (which usually abort processing) to the larger operation.
func Transformer ¶ added in v0.8.0
func Transformer[IN any, OUT any](fn func(IN) OUT) ProcessingFunction[IN, OUT]
Transformer wraps a simple function to provide a ProcessingFunction for straightforward transformations of iterators.
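For instance, a minimal sketch pairing Transformer with Transform (assuming the strings import):

    upper := itertool.Transformer(strings.ToUpper)
    iter := itertool.Transform(ctx, itertool.Slice([]string{"a", "b"}), upper)
    out, err := itertool.CollectSlice(ctx, iter)
    // out is expected to be []string{"A", "B"} when err is nil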
type RangeFunction ¶ added in v0.2.0
RangeFunction describes a function that operates similar to the range keyword in the language specification, but that bridges the gap between fun.Iterators and range statements.
func Range ¶ added in v0.2.0
Range produces a function that can be used like an iterator, but that is safe for concurrent use from multiple goroutines (e.g. the output function synchronizes the use of Next() and Value()). For example:
    var out T // T is the element type of the underlying iterator
    for rf(ctx, &out) {
        // do work with out
    }
(where rf is the RangeFunction produced by Range)
Range does not provide a convenient way to close or access the error state of the iterator, which you must synchronize on your own. The safety of Range assumes that you do not interact with the iterator outside of the range function.