Documentation ¶
Overview ¶
Package ppln provides generic parallel processing pipelines.
General usage ¶
This package provides two modes of operation: Serial and NonSerial. In Serial, outputs are delivered in the same order as the inputs. In NonSerial, the order of outputs is not guaranteed, although it is correlated with the order of the inputs.
Both functions block the caller until either processing is done (output has been called on the last value) or an error is returned.
Stopping ¶
Each user-function (input, transform, output) may return an error. Returning a non-nil error stops the pipeline prematurely, and that error is returned to the caller.
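As a rough illustration of how an input function can report an error, the sketch below uses only the standard library. Seq2 is declared locally with the same shape as iter.Seq2 so that the code builds without the experimental package. The names parseInts and collect are hypothetical and not part of ppln; collect only imitates the stop-on-first-error behavior described above.

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// Seq2 mirrors the shape of iter.Seq2. It is declared locally so that this
// sketch builds without the experimental iter package.
type Seq2[K, V any] func(yield func(K, V) bool)

// parseInts (hypothetical) yields one integer per line of text. A line that
// fails to parse is yielded as a non-nil error, after which iteration ends.
func parseInts(text string) Seq2[int, error] {
	return func(yield func(int, error) bool) {
		sc := bufio.NewScanner(strings.NewReader(text))
		for sc.Scan() {
			n, err := strconv.Atoi(sc.Text())
			if err != nil {
				yield(0, err)
				return
			}
			if !yield(n, nil) {
				return
			}
		}
	}
}

// collect (hypothetical) drains the iterator and stops at the first error,
// imitating how a pipeline stops and hands the error back to its caller.
func collect(input Seq2[int, error]) ([]int, error) {
	var out []int
	var firstErr error
	input(func(n int, err error) bool {
		if err != nil {
			firstErr = err
			return false
		}
		out = append(out, n)
		return true
	})
	return out, firstErr
}

func main() {
	nums, err := collect(parseInts("1\n2\nx\n4"))
	fmt.Println(nums, err != nil) // prints: [1 2] true
}
```

The values read before the bad line are kept, and nothing after it is processed.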
Experimental ¶
This package relies on the experimental iter package. To use it, build with Go 1.22 and set GOEXPERIMENT=rangefunc.
Functions ¶
func NonSerial ¶
func NonSerial[T1 any, T2 any](
	ngoroutines int,
	input iter.Seq2[T1, error],
	transform func(a T1, g int) (T2, error),
	output func(a T2) error,
) error
NonSerial starts a multi-goroutine transformation pipeline.
Input is an iterator over the values to be transformed. It is called in a thread-safe manner, so it does not need to be thread-safe itself. Transform receives an input (a) and a 0-based goroutine number (g), and returns the result of processing a. Output acts on a single result and is also called in a thread-safe manner. The order of outputs is not guaranteed, although it is correlated with the order of the inputs.
If any of the functions returns a non-nil error, the pipeline stops and that error is returned. Otherwise, NonSerial returns nil.
func RangeInput ¶ added in v1.0.1
RangeInput returns an iterator over a range of integers, from start (inclusive) to stop (exclusive), for use as the input argument of Serial and NonSerial.
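To make the half-open range concrete, here is a small sketch of an iterator with the same described behavior. It is an assumption about the shape, not the package's source: Seq2 is a local stand-in for iter.Seq2, and rangeSeq is a hypothetical name.

```go
package main

import "fmt"

// Seq2 mirrors the shape of iter.Seq2 (local stand-in for this sketch).
type Seq2[K, V any] func(yield func(K, V) bool)

// rangeSeq (hypothetical) yields start, start+1, ..., stop-1, each paired
// with a nil error. It stops early if the consumer returns false.
func rangeSeq(start, stop int) Seq2[int, error] {
	return func(yield func(int, error) bool) {
		for i := start; i < stop; i++ {
			if !yield(i, nil) {
				return
			}
		}
	}
}

func main() {
	sum := 0
	rangeSeq(1, 101)(func(i int, err error) bool {
		sum += i
		return true
	})
	fmt.Println(sum) // prints: 5050
}
```

With start 1 and stop 101, the iterator yields exactly the integers 1 through 100.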
func Serial ¶
func Serial[T1 any, T2 any](
	ngoroutines int,
	input iter.Seq2[T1, error],
	transform func(a T1, i int, g int) (T2, error),
	output func(a T2) error,
) error
Serial starts a multi-goroutine transformation pipeline that maintains the order of the inputs.
Input is an iterator over the values to be transformed. It is called in a thread-safe manner, so it does not need to be thread-safe itself. Transform receives an input (a), a 0-based input serial number (i), and a 0-based goroutine number (g), and returns the result of processing a. Output acts on a single result and is called in the same order as the inputs, in a thread-safe manner.
If any of the functions returns a non-nil error, the pipeline stops and that error is returned. Otherwise, Serial returns nil.
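One common way to keep outputs in input order while transforming in parallel is a reorder buffer keyed by the input serial number. The sketch below shows that idea with the standard library only. It is not taken from the package's source, it takes a slice instead of an iterator, and it omits error handling to keep the focus on ordering. The names ordered and result are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// result pairs a transformed value with the serial number of its input.
type result[T any] struct {
	i int
	v T
}

// ordered (hypothetical) transforms inputs on n goroutines and calls output
// in input order, from the calling goroutine only.
func ordered[T1, T2 any](n int, inputs []T1,
	transform func(a T1, i, g int) T2, output func(a T2)) {
	jobs := make(chan int)
	results := make(chan result[T2])

	// Feed input serial numbers to the workers.
	go func() {
		for i := range inputs {
			jobs <- i
		}
		close(jobs)
	}()

	var wg sync.WaitGroup
	for g := 0; g < n; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := range jobs {
				results <- result[T2]{i, transform(inputs[i], i, g)}
			}
		}(g)
	}
	go func() {
		wg.Wait()
		close(results)
	}()

	// Reorder buffer: hold back results that arrive ahead of their turn.
	pending := map[int]T2{}
	next := 0
	for r := range results {
		pending[r.i] = r.v
		for {
			v, ok := pending[next]
			if !ok {
				break
			}
			delete(pending, next)
			output(v)
			next++
		}
	}
}

func main() {
	inputs := make([]int, 100)
	for i := range inputs {
		inputs[i] = i + 1
	}
	var out []float64
	ordered(4, inputs,
		func(a, i, g int) float64 { return float64(a*a) + 0.5 },
		func(v float64) { out = append(out, v) })
	fmt.Println(out[:3], out[len(out)-3:])
	// prints: [1.5 4.5 9.5] [9604.5 9801.5 10000.5]
}
```

In this sketch a single slow item makes the buffer grow until that item completes, so a production version would likely bound the number of items in flight.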
Example ¶
ngoroutines := 4
var results []float64

Serial[int, float64](ngoroutines,
	// Read/generate input data.
	RangeInput(1, 101),
	// Some processing.
	func(a, i, g int) (float64, error) {
		return float64(a*a) + 0.5, nil
	},
	// Accumulate/forward outputs.
	func(a float64) error {
		results = append(results, a)
		return nil
	})

fmt.Println(results[:3], results[len(results)-3:])

Output:
[1.5 4.5 9.5] [9604.5 9801.5 10000.5]
Example (ParallelAggregation) ¶
ngoroutines := 4
results := make([]int, ngoroutines) // Goroutine-specific data and objects.

Serial(
	ngoroutines,
	// Read/generate input data.
	RangeInput(1, 101),
	// Accumulate in goroutine-specific memory.
	// Each goroutine writes only to its own slot, so no locking is needed.
	func(a int, i, g int) (int, error) {
		results[g] += a
		return 0, nil // Unused.
	},
	// No outputs.
	func(a int) error { return nil })

// Collect the results of all goroutines.
fmt.Println("Sum of 1-100:", gnum.Sum(results))

Output:
Sum of 1-100: 5050