Documentation ¶
Overview ¶
Package itertool provides a set of functional helpers for managing and using fun.Iterator implementations, including a parallel Map/Reduce, Merge, and other convenient tools.
Index ¶
- Variables
- func Channel[T any](pipe <-chan T) fun.Iterator[T]
- func CollectChannel[T any](ctx context.Context, iter fun.Iterator[T]) <-chan T
- func CollectSlice[T any](ctx context.Context, iter fun.Iterator[T]) ([]T, error)
- func Contains[T comparable](ctx context.Context, item T, iter fun.Iterator[T]) bool
- func Generate[T any](ctx context.Context, fn func(context.Context) (T, error), opts Options) fun.Iterator[T]
- func Map[T any, O any](ctx context.Context, iter fun.Iterator[T], ...) fun.Iterator[O]
- func MarshalJSON[T any](ctx context.Context, iter fun.Iterator[T]) ([]byte, error)
- func Merge[T any](ctx context.Context, iters ...fun.Iterator[T]) fun.Iterator[T]
- func ParallelForEach[T any](ctx context.Context, iter fun.Iterator[T], fn func(context.Context, T) error, ...) (err error)
- func Reduce[T any, O any](ctx context.Context, iter fun.Iterator[T], reducer func(T, O) (O, error), ...) (value O, err error)
- func Slice[T any](in []T) fun.Iterator[T]
- func Split[T any](ctx context.Context, numSplits int, input fun.Iterator[T]) []fun.Iterator[T]
- func Synchronize[T any](in fun.Iterator[T]) fun.Iterator[T]
- func UnmarshalJSON[T any](in []byte) fun.Iterator[T]
- func Variadic[T any](in ...T) fun.Iterator[T]
- type Options
- type RangeFunction
Constants ¶
This section is empty.
Variables ¶
var ErrAbortGenerator = errors.New("abort generator signal")
ErrAbortGenerator is a sentinel error returned by generators to abort. This error is never propagated to calling functions.
Functions ¶
func Channel ¶
Channel produces an iterator for a specified channel. The iterator does not start any background threads.
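As a rough illustration of how a channel can back an iterator without any background goroutine, here is a minimal sketch. The `chanIter` type and `collectDemo` helper are hypothetical names for this example, and the Next/Value/Close method set is an assumption about the shape of fun.Iterator, not the package's actual implementation.

```go
package main

import (
	"context"
	"fmt"
)

// chanIter is a hypothetical channel-backed iterator used only for
// illustration. Its method set is assumed, not copied from the package.
type chanIter[T any] struct {
	pipe  <-chan T
	value T
}

// Next blocks until a value arrives, the channel is closed, or the
// context is canceled. It does not start a goroutine.
func (c *chanIter[T]) Next(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return false
	case v, ok := <-c.pipe:
		if !ok {
			return false
		}
		c.value = v
		return true
	}
}

// Value returns the item read by the most recent successful Next call.
func (c *chanIter[T]) Value() T { return c.value }

// Close is a no-op here because the sketch does not own the channel.
func (c *chanIter[T]) Close() error { return nil }

// collectDemo fills a channel, closes it, and drains it via the iterator.
func collectDemo() []int {
	pipe := make(chan int, 3)
	pipe <- 1
	pipe <- 2
	pipe <- 3
	close(pipe)

	it := &chanIter[int]{pipe: pipe}
	ctx := context.Background()

	var out []int
	for it.Next(ctx) {
		out = append(out, it.Value())
	}
	return out
}

func main() {
	fmt.Println(collectDemo())
}
```

Because every read happens inside Next, the caller's goroutine does all of the work, which matches the "no background threads" description above.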
func CollectChannel ¶
CollectChannel converts an iterator to a channel. The iterator is not closed.
func CollectSlice ¶
CollectSlice converts an iterator to a slice of its values, and closes the iterator once it has been exhausted.
In the case of an error in the underlying iterator the output slice will have the values encountered before the error.
func Contains ¶ added in v0.9.1
Contains processes an iterator of a comparable type, returning true after the first element that equals item, and false otherwise.
func Generate ¶ added in v0.2.0
func Generate[T any](
	ctx context.Context,
	fn func(context.Context) (T, error),
	opts Options,
) fun.Iterator[T]
Generate creates an iterator using a generator pattern which produces items until the context is canceled or the generator function returns ErrAbortGenerator. Parallel operation is also available and Generate shares configuration and semantics with the Map operation.
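The generator pattern described above can be sketched with a single worker and a plain channel. This is an assumption-laden stand-in: `generate`, `errAbort`, and `countTo` are hypothetical names, the output is a channel rather than a fun.Iterator, and the sketch stops on any error without recording it, which may differ from the real Options-driven behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errAbort stands in for ErrAbortGenerator in this sketch.
var errAbort = errors.New("abort generator signal")

// generate is a hypothetical single-worker stand-in for Generate. It calls
// fn repeatedly and sends each value on the returned channel until the
// context is canceled or fn returns an error.
func generate[T any](ctx context.Context, fn func(context.Context) (T, error)) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for {
			v, err := fn(ctx)
			if err != nil {
				// errAbort ends production quietly. This sketch also
				// stops on any other error and does not report it.
				return
			}
			select {
			case <-ctx.Done():
				return
			case out <- v:
			}
		}
	}()
	return out
}

// countTo produces 1..limit and then signals the generator to stop.
func countTo(limit int) []int {
	n := 0
	fn := func(context.Context) (int, error) {
		if n >= limit {
			return 0, errAbort
		}
		n++
		return n, nil
	}

	var got []int
	for v := range generate(context.Background(), fn) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(countTo(3))
}
```

The sentinel error is consumed inside the generator loop, so the consumer only ever sees the channel close.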
func Map ¶
func Map[T any, O any](
	ctx context.Context,
	iter fun.Iterator[T],
	mapper func(context.Context, T) (O, error),
	opts Options,
) fun.Iterator[O]
Map provides an orthodox functional map implementation based around fun.Iterator. It operates in an asynchronous, streaming manner, so the output iterator must be consumed. The zero value of Options provides reasonable defaults: abort-on-error and single-threaded map operation.
If the mapper function returns an error, its result is not included in the output, and the error is aggregated and propagated to the `Close()` method of the resulting iterator. If there is more than one error (as can happen with a panic or with ContinueOnError semantics), the error is an *erc.Stack object. Panics in the map function are converted to errors and handled according to the ContinueOnPanic option.
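To make the error-handling rules concrete, here is a simplified, synchronous sketch that works on a slice instead of a streaming iterator. The names `mapAll`, `safeCall`, and `mapDemo` are hypothetical. Unlike the real Map, it returns the collected errors directly rather than through Close(), and it treats a recovered panic like any other error instead of consulting a separate ContinueOnPanic setting.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// safeCall runs mapper and converts a panic into an error.
func safeCall[T, O any](ctx context.Context, mapper func(context.Context, T) (O, error), item T) (out O, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic in mapper: %v", r)
		}
	}()
	return mapper(ctx, item)
}

// mapAll is a hypothetical, non-streaming stand-in for Map. Items whose
// mapper call fails are omitted from the output and their errors are
// collected. When continueOnError is false, processing stops at the
// first failure.
func mapAll[T, O any](
	ctx context.Context,
	in []T,
	mapper func(context.Context, T) (O, error),
	continueOnError bool,
) ([]O, []error) {
	var (
		out  []O
		errs []error
	)
	for _, item := range in {
		if ctx.Err() != nil {
			break
		}
		v, err := safeCall(ctx, mapper, item)
		if err != nil {
			errs = append(errs, err)
			if !continueOnError {
				break
			}
			continue
		}
		out = append(out, v)
	}
	return out, errs
}

// mapDemo parses three strings, one of which is not a number.
func mapDemo(continueOnError bool) ([]int, []error) {
	parse := func(_ context.Context, s string) (int, error) {
		return strconv.Atoi(s)
	}
	return mapAll(context.Background(), []string{"1", "x", "3"}, parse, continueOnError)
}

func main() {
	vals, errs := mapDemo(true)
	fmt.Println(vals, len(errs))

	vals, errs = mapDemo(false)
	fmt.Println(vals, len(errs))
}
```

With continueOnError set, the failing item is skipped and later items are still processed; without it, nothing after the first failure is mapped.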
func MarshalJSON ¶ added in v0.3.3
MarshalJSON is useful for implementing json.Marshaler methods from iterator-supporting types. Wrapping the standard library's json.Marshal method, it produces a byte array of encoded JSON documents.
func Merge ¶
Merge combines a set of related iterators into a single iterator. It starts a thread to consume from each input iterator and does not otherwise guarantee the order of the output.
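The fan-in behavior can be approximated with channels: one goroutine per input forwards into a shared output. In this sketch `merge`, `feed`, and `mergeDemo` are hypothetical names, and channels stand in for fun.Iterator values; it is not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// merge is a hypothetical channel-based stand-in for Merge. It starts one
// goroutine per input and forwards every item to a single output channel.
// The relative order of items from different inputs is not defined.
func merge[T any](ctx context.Context, ins ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				select {
				case <-ctx.Done():
					return
				case out <- v:
				}
			}
		}(in)
	}
	// Close the output once every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// feed returns a closed, pre-filled receive-only channel.
func feed(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mergeDemo merges two inputs and sorts the result, because the merged
// order depends on goroutine scheduling.
func mergeDemo() []int {
	var got []int
	for v := range merge(context.Background(), feed(1, 3), feed(2, 4)) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergeDemo())
}
```

The demo sorts before returning precisely because the merged stream carries no ordering guarantee across inputs.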
func ParallelForEach ¶ added in v0.4.0
func ParallelForEach[T any](
	ctx context.Context,
	iter fun.Iterator[T],
	fn func(context.Context, T) error,
	opts Options,
) (err error)
ParallelForEach processes the iterator in parallel, and is essentially an iterator-driven worker pool. The input iterator is split dynamically into one iterator per worker (the number of workers is set by Options.NumWorkers). The division between workers is determined by their processing speed, so workers should not suffer from head-of-line blocking, and input iterators are consumed safely as work is processed.
Because there is no output in these operations Options.OutputBufferSize is ignored.
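A worker pool of this kind can be sketched with a shared channel: each worker pulls the next item as soon as it is free, so a slow item delays only the worker handling it. The names `parallelForEach` and `sumSquares` are hypothetical, a channel stands in for the input iterator, and stopping on the first error is an assumption modeled on the zero-value Options behavior described on this page.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// parallelForEach is a hypothetical stand-in for ParallelForEach. Workers
// share one input channel, so the division of work follows how quickly
// each worker finishes. The first error cancels the remaining work.
func parallelForEach[T any](
	ctx context.Context,
	items <-chan T,
	numWorkers int,
	fn func(context.Context, T) error,
) error {
	if numWorkers < 1 {
		numWorkers = 1
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case item, ok := <-items:
					if !ok {
						return
					}
					if err := fn(ctx, item); err != nil {
						once.Do(func() { firstErr = err })
						cancel()
						return
					}
				}
			}
		}()
	}
	wg.Wait()
	return firstErr
}

// sumSquares adds up v*v for v in 1..n using the worker pool.
func sumSquares(n, workers int) int64 {
	ch := make(chan int, n)
	for i := 1; i <= n; i++ {
		ch <- i
	}
	close(ch)

	var total int64
	err := parallelForEach[int](context.Background(), ch, workers,
		func(_ context.Context, v int) error {
			atomic.AddInt64(&total, int64(v*v))
			return nil
		})
	if err != nil {
		return -1
	}
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(sumSquares(10, 4))
}
```

There is no output stream in this pattern, only side effects and an error, which is consistent with OutputBufferSize having no role here.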
func Reduce ¶
func Reduce[T any, O any](
	ctx context.Context,
	iter fun.Iterator[T],
	reducer func(T, O) (O, error),
	initalValue O,
) (value O, err error)
Reduce processes an input iterator with a reduce function and outputs the final value. The initial value may be a zero or nil value.
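The reducer's argument order (item first, accumulated value second) is easy to get backwards, so a small slice-based sketch may help. `reduce` and `totalLen` are hypothetical names; the sketch omits the context and iterator, and returning the last good accumulator on error is an assumption rather than documented behavior.

```go
package main

import "fmt"

// reduce is a hypothetical slice-based stand-in for Reduce. The reducer
// receives the current item and the accumulated value, and returns the
// next accumulated value.
func reduce[T, O any](in []T, reducer func(T, O) (O, error), initial O) (O, error) {
	value := initial
	for _, item := range in {
		next, err := reducer(item, value)
		if err != nil {
			// Assumption: report the last good accumulator with the error.
			return value, err
		}
		value = next
	}
	return value, nil
}

// totalLen sums the lengths of the words, starting from a zero value.
func totalLen(words []string) (int, error) {
	return reduce(words, func(w string, acc int) (int, error) {
		return acc + len(w), nil
	}, 0)
}

func main() {
	n, err := totalLen([]string{"a", "bb", "ccc"})
	fmt.Println(n, err)
}
```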
func Split ¶ added in v0.2.0
Split produces an arbitrary number of iterators which divide the input. The division is lazy and depends on the rate of consumption of output iterators, but every item from the input iterator is sent to exactly one output iterator, each of which can be safely used from a different go routine.
The input iterator is not closed after the output iterators are exhausted. If the context passed to Split is canceled, Split will stop populating the output iterators.
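One way to picture the lazy, consumption-driven division is with channels: each output has a forwarder that takes the next input item only after its previous item has been consumed. This is a sketch under assumptions, with `split` and `splitDemo` as hypothetical names and channels standing in for iterators. Note that in this sketch each forwarder may hold one item that has been read but not yet delivered.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// split is a hypothetical channel-based stand-in for Split. Every input
// item is delivered to exactly one output, and faster consumers receive
// more items. Canceling the context stops all forwarders.
func split[T any](ctx context.Context, n int, in <-chan T) []<-chan T {
	outs := make([]<-chan T, 0, n)
	for i := 0; i < n; i++ {
		out := make(chan T)
		outs = append(outs, out)
		go func() {
			defer close(out)
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}()
	}
	return outs
}

// splitDemo divides 1..6 across three outputs, consumes each output in
// its own goroutine, and reports how many items were seen and their sum.
func splitDemo() (count, sum int) {
	in := make(chan int, 6)
	for i := 1; i <= 6; i++ {
		in <- i
	}
	close(in)

	var (
		mu sync.Mutex
		wg sync.WaitGroup
	)
	for _, out := range split[int](context.Background(), 3, in) {
		wg.Add(1)
		go func(out <-chan int) {
			defer wg.Done()
			for v := range out {
				mu.Lock()
				count++
				sum += v
				mu.Unlock()
			}
		}(out)
	}
	wg.Wait()
	return count, sum
}

func main() {
	fmt.Println(splitDemo())
}
```

Which output receives which item varies from run to run, but the count and sum are stable because each item is delivered exactly once.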
func Synchronize ¶
Synchronize wraps an existing iterator with one that is protected by a mutex. The underlying implementation provides an Unwrap method.
Even when synchronized in this manner, Iterators are generally not safe for concurrent access from multiple go routines, as Next() and Value() calls may interleave. The underlying implementation has a special case with fun.IterateOne which does make it safe for concurrent use, if the iterator is only accessed using fun.IterateOne.
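The interleaving problem, and why a combined "advance and read" step avoids it, can be sketched as follows. All names here (`iterator`, `sliceIter`, `syncIter`, `nextValue`, `concurrentSum`) are hypothetical, and the interface is an assumed shape for fun.Iterator. The `nextValue` method only illustrates the idea behind the fun.IterateOne special case; it is not that function.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// iterator is an assumed minimal shape for fun.Iterator.
type iterator[T any] interface {
	Next(context.Context) bool
	Value() T
	Close() error
}

// sliceIter is a simple, unsynchronized iterator over a slice.
type sliceIter[T any] struct {
	items []T
	idx   int
}

func (s *sliceIter[T]) Next(context.Context) bool {
	if s.idx >= len(s.items) {
		return false
	}
	s.idx++
	return true
}

// Value is only valid after Next has returned true.
func (s *sliceIter[T]) Value() T     { return s.items[s.idx-1] }
func (s *sliceIter[T]) Close() error { return nil }

// syncIter guards each method with a mutex. Separate Next and Value calls
// from different goroutines can still interleave between the two locks.
type syncIter[T any] struct {
	mu   sync.Mutex
	iter iterator[T]
}

func (s *syncIter[T]) Next(ctx context.Context) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.iter.Next(ctx)
}

func (s *syncIter[T]) Value() T {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.iter.Value()
}

func (s *syncIter[T]) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.iter.Close()
}

// Unwrap exposes the wrapped iterator.
func (s *syncIter[T]) Unwrap() iterator[T] { return s.iter }

// nextValue advances and reads under a single lock acquisition, so no
// other goroutine can slip in between the two steps.
func (s *syncIter[T]) nextValue(ctx context.Context) (T, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.iter.Next(ctx) {
		var zero T
		return zero, false
	}
	return s.iter.Value(), true
}

// concurrentSum drains 1..n from several goroutines using nextValue.
func concurrentSum(n, workers int) int {
	items := make([]int, n)
	for i := range items {
		items[i] = i + 1
	}
	it := &syncIter[int]{iter: &sliceIter[int]{items: items}}

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	ctx := context.Background()
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				v, ok := it.nextValue(ctx)
				if !ok {
					return
				}
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(concurrentSum(100, 4))
}
```

If the workers called Next and Value separately instead, one goroutine could read a value that another goroutine had just advanced past, so items could be duplicated or skipped.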
func UnmarshalJSON ¶ added in v0.3.3
UnmarshalJSON reads a JSON input and produces an iterator of the items. The implementation reads all items from the slice before returning.
Types ¶
type Options ¶ added in v0.2.0
type Options struct {
// ContinueOnPanic, when true, allows the operation to continue
// after a single map function panics; when false, the first
// panic halts the entire operation. All panics are
// converted to errors and propagated to the output iterator's
// Close() method.
ContinueOnPanic bool
// ContinueOnError allows a map or generate function to return
// an error and allow the work of the broader operation to
// continue. Errors are aggregated and propagated to the output
// iterator's Close() method.
ContinueOnError bool
// NumWorkers describes the number of parallel workers
// processing the incoming iterator items and running the map
// function. All values less than 1 are converted to 1. Any
// value greater than 1 will result in out-of-sequence results
// in the output iterator.
NumWorkers int
// OutputBufferSize controls how buffered the output pipe on the
// iterator should be. Typically this should be zero, but
// there are workloads for which a moderate buffer may be
// useful.
OutputBufferSize int
// IncludeContextExpirationErrors changes the default handling
// of context cancellation errors. By default all errors
// rooted in context cancellation are not propagated to the
// Close() method, however, when true, these errors are
// captured. All other error handling semantics
// (e.g. ContinueOnError) are applicable.
IncludeContextExpirationErrors bool
}
Options describes the runtime options for several operations. The zero value of this struct provides a usable, strict configuration.
type RangeFunction ¶ added in v0.2.0
RangeFunction describes a function that operates similar to the range keyword in the language specification, but that bridges the gap between fun.Iterators and range statements.
func Range ¶ added in v0.2.0
Range produces a function that can be used like an iterator, but that is safe for concurrent use from multiple go routines, because each call synchronizes the Next() and Value() operations. For example, where rf is the function returned by Range:
var out T // T is the iterator's element type
for rf(ctx, &out) {
// do work
}
Range does not provide a convenient way to close or access the error state of the iterator, which you must synchronize on your own. The safety of range assumes that you do not interact with the iterator outside of the range function.