itertool

package
v0.9.3 (latest)
Warning

This package is not in the latest version of its module.

Published: May 12, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package itertool provides a set of functional helpers for managing and using fun.Iterator implementations, including a parallel Map/Reduce, Merge, and other convenient tools.

Index

Constants

This section is empty.

Variables

var ErrAbortGenerator = errors.New("abort generator signal")

ErrAbortGenerator is a sentinel error returned by generators to abort. This error is never propagated to calling functions.

Functions

func Channel

func Channel[T any](pipe <-chan T) fun.Iterator[T]

Channel produces an iterator for a specified channel. The iterator does not start any background threads.

func CollectChannel

func CollectChannel[T any](ctx context.Context, iter fun.Iterator[T]) <-chan T

CollectChannel converts an iterator to a channel. The iterator is not closed.

func CollectSlice

func CollectSlice[T any](ctx context.Context, iter fun.Iterator[T]) ([]T, error)

CollectSlice converts an iterator to a slice of its values, and closes the iterator when it has been exhausted.

In the case of an error in the underlying iterator, the output slice contains the values encountered before the error.

func Contains added in v0.9.1

func Contains[T comparable](ctx context.Context, item T, iter fun.Iterator[T]) bool

Contains processes an iterator of a comparable type, returning true as soon as it finds an element equal to item, and false otherwise.

func FromMap added in v0.9.3

func FromMap[K comparable, V any](in map[K]V) fun.Iterator[fun.Pair[K, V]]

FromMap converts a map into an iterator of fun.Pair objects. The iterator is panic-safe and uses one goroutine to track progress through the map. As a result, you should always either exhaust the iterator, cancel the context that you pass to the iterator, or call iterator.Close().

The items in the map are not copied, and the iteration order is randomized, following Go's convention for maps.

Use FromMap in combination with other iterator processing tools (generators, observers, transformers, etc.) to limit the number of times a collection of data must be copied.

func Generate added in v0.2.0

func Generate[T any](
	ctx context.Context,
	fn func(context.Context) (T, error),
	opts Options,
) fun.Iterator[T]

Generate creates an iterator using a generator pattern, which produces items until the context is canceled or the generator function returns ErrAbortGenerator. Parallel operation is also available, and Generate shares configuration and semantics with the Map operation.

func Map

func Map[T any, O any](
	ctx context.Context,
	iter fun.Iterator[T],
	mapper func(context.Context, T) (O, error),
	opts Options,
) fun.Iterator[O]

Map provides an orthodox functional map implementation based around fun.Iterator. It operates in an asynchronous, streaming manner, so the output iterator must be consumed. The zero value of Options provides reasonable defaults: abort-on-error and single-threaded map operation.

If the mapper function returns an error, that result is not included; instead, errors are aggregated and propagated to the `Close()` method of the resulting iterator. If there is more than one error (as is the case with a panic or with ContinueOnError semantics), the error is an *erc.Stack object. Panics in the map function are converted to errors and handled according to the ContinueOnPanic option.

func MarshalJSON added in v0.3.3

func MarshalJSON[T any](ctx context.Context, iter fun.Iterator[T]) ([]byte, error)

MarshalJSON is useful for implementing json.Marshaler methods on iterator-supporting types. It wraps the standard library's json.Marshal function and produces a byte slice of encoded JSON documents.

func Merge

func Merge[T any](ctx context.Context, iters ...fun.Iterator[T]) fun.Iterator[T]

Merge combines a set of related iterators into a single iterator. It starts a goroutine to consume from each input iterator and does not guarantee the order of the output.

func ParallelForEach added in v0.4.0

func ParallelForEach[T any](
	ctx context.Context,
	iter fun.Iterator[T],
	fn func(context.Context, T) error,
	opts Options,
) (err error)

ParallelForEach processes the iterator in parallel and is essentially an iterator-driven worker pool. The input iterator is split dynamically into one iterator per worker (the number of workers is determined by Options.NumWorkers), with the division between workers determined by their processing speed (i.e. workers should not suffer from head-of-line blocking), and the input iterator is consumed (safely) as work is processed.

Because these operations produce no output, Options.OutputBufferSize is ignored.

func Reduce

func Reduce[T any, O any](
	ctx context.Context,
	iter fun.Iterator[T],
	reducer func(T, O) (O, error),
	initalValue O,
) (value O, err error)

Reduce processes an input iterator with a reduce function and outputs the final value. The initial value may be a zero or nil value.

func Slice

func Slice[T any](in []T) fun.Iterator[T]

Slice produces an iterator for an arbitrary slice.

func Split added in v0.2.0

func Split[T any](ctx context.Context, numSplits int, input fun.Iterator[T]) []fun.Iterator[T]

Split produces an arbitrary number of iterators which divide the input. The division is lazy and depends on the rate of consumption of the output iterators, but every item from the input iterator is sent to exactly one output iterator, each of which can be safely used from a different goroutine.

The input iterator is not closed after the output iterators are exhausted. If the context passed to Split is canceled, Split stops populating the output iterators.

func Synchronize

func Synchronize[T any](in fun.Iterator[T]) fun.Iterator[T]

Synchronize wraps an existing iterator with one that is protected by a mutex. The underlying implementation provides an Unwrap method.

Even when synchronized in this manner, iterators are generally not safe for concurrent access from multiple goroutines, as Next() and Value() calls may interleave. The underlying implementation has a special case for fun.IterateOne, which makes it safe for concurrent use if the iterator is only accessed using fun.IterateOne.

func UnmarshalJSON added in v0.3.3

func UnmarshalJSON[T any](in []byte) fun.Iterator[T]

UnmarshalJSON reads a JSON input and produces an iterator of the items. The implementation reads all items from the slice before returning. Any errors encountered are propagated to the Close method of the iterator.

func Variadic added in v0.5.0

func Variadic[T any](in ...T) fun.Iterator[T]

Variadic is a wrapper around Slice() for more ergonomic use at some call sites.

Types

type Options added in v0.2.0

type Options struct {
	// ContinueOnPanic allows the rest of the Map operation to
	// continue when a single map function panics; when false, a
	// panic halts the entire operation. All panics are converted
	// to errors and propagated to the output iterator's Close()
	// method.
	ContinueOnPanic bool
	// ContinueOnError allows a map or generate function to return
	// an error and allow the work of the broader operation to
	// continue. Errors are aggregated and propagated to the output
	// iterator's Close() method.
	// NumWorkers describes the number of parallel workers
	// processing the incoming iterator items and running the map
	// function. All values less than 1 are converted to 1. Any
	// value greater than 1 will result in out-of-sequence results
	// in the output iterator.
	NumWorkers int
	// OutputBufferSize controls how buffered the output pipe on the
	// iterator should be. Typically this should be zero, but
	// there are workloads for which a moderate buffer may be
	// useful.
	OutputBufferSize int
	// IncludeContextExpirationErrors changes the default handling
	// of context cancellation errors. By default all errors
	// rooted in context cancellation are not propagated to the
	// Close() method, however, when true, these errors are
	// captured. All other error handling semantics
	// (e.g. ContinueOnError) are applicable.
	IncludeContextExpirationErrors bool
	// SkipErrorCheck, when specified, should return true if the
	// error should be skipped and false otherwise.
	SkipErrorCheck func(error) bool
}

Options describes the runtime options for several operations. The zero value of this struct provides usable, strict defaults.

type RangeFunction added in v0.2.0

type RangeFunction[T any] func(context.Context, *T) bool

RangeFunction describes a function that operates similarly to the range keyword in the language specification, but that bridges the gap between fun.Iterators and range statements.

func Range added in v0.2.0

func Range[T any](ctx context.Context, iter fun.Iterator[T]) RangeFunction[T]

Range produces a function that can be used like an iterator but that is safe for concurrent use from multiple goroutines (i.e. the function synchronizes calls to Next() and Value()). For example:

rf := itertool.Range(ctx, iter) // iter is assumed to be a fun.Iterator[string]
var out string
for rf(ctx, &out) {
     // do work with out
}

Range does not provide a convenient way to close or access the error state of the iterator, which you must synchronize on your own. The safety of Range assumes that you do not interact with the iterator outside of the range function.
