Documentation ¶
Overview ¶
Package ppln provides generic parallel processing pipelines.
NOTE: this API is currently experimental and may change in future releases.
General usage ¶
This package provides two modes of operation: serial and non-serial. Serial transforms each value of type T1 into a value of type T2, and the outputs are delivered in the same order as the inputs. Non-serial transforms each value of type T1 into zero or more values of type T2. The order of its outputs is arbitrary, but correlated with the order of the inputs.
Each of the functions blocks the calling goroutine until either processing is done (puller has been called on the last value) or the pipeline is stopped.
Stopping ¶
Each user-function (pusher, mapper, puller) may return an error. Returning a non-nil error stops the pipeline prematurely, and that error will be propagated to the caller.
Number of goroutines ¶
Each pipeline creates ngoroutines+2 new goroutines and blocks the calling one. There is one pusher goroutine, one puller goroutine, and ngoroutines mapper goroutines.
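To make the goroutine count concrete, here is a minimal sketch of the layout described above: one pusher goroutine, n mapper goroutines and one puller goroutine, with the caller blocked until all of them finish. It is not the package's implementation. The names pipelineSketch and sortedSquares are hypothetical, error handling and the stop function are omitted, and n is assumed to be at least 1.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pipelineSketch is a hypothetical, simplified stand-in for a non-serial
// pipeline. It starts n+2 goroutines and blocks the caller until they finish.
// Assumes n >= 1. Errors and stopping are intentionally left out.
func pipelineSketch[T1, T2 any](
	n int,
	pusher func(push func(T1)),
	mapper func(a T1, g int) T2,
	puller func(a T2),
) {
	in := make(chan T1)
	out := make(chan T2)

	// One pusher goroutine feeds the input channel.
	go func() {
		pusher(func(a T1) { in <- a })
		close(in)
	}()

	// n mapper goroutines, each with its own 0-based number g.
	var mappers sync.WaitGroup
	for g := 0; g < n; g++ {
		mappers.Add(1)
		go func(g int) {
			defer mappers.Done()
			for a := range in {
				out <- mapper(a, g)
			}
		}(g)
	}

	// One puller goroutine consumes the outputs.
	done := make(chan struct{})
	go func() {
		for b := range out {
			puller(b)
		}
		close(done)
	}()

	// The calling goroutine only waits.
	mappers.Wait()
	close(out)
	<-done
}

// sortedSquares squares 1..count using n mapper goroutines. The results are
// sorted because the arrival order of a non-serial pipeline is arbitrary.
func sortedSquares(n, count int) []int {
	var results []int
	pipelineSketch(n,
		func(push func(int)) {
			for i := 1; i <= count; i++ {
				push(i)
			}
		},
		func(a int, g int) int { return a * a },
		func(b int) { results = append(results, b) },
	)
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(sortedSquares(4, 5)) // [1 4 9 16 25]
}
```

Because only the single puller goroutine appends to results, the sketch needs no mutex around the slice.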
A special case is when ngoroutines==0, in which case no new goroutines are created. Processing is done serially using the calling goroutine.
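The zero-goroutine case can be pictured as every step running inside push itself. The sketch below is an assumption about how such a mode could look, not the package's code. serialInline, squareAll and errNegative are hypothetical names, and the sketch returns the first error it sees. It also shows a pusher consulting stop to quit early once an error has occurred.

```go
package main

import (
	"errors"
	"fmt"
)

// errNegative is a hypothetical error used by the demo mapper.
var errNegative = errors.New("negative input")

// serialInline is a hypothetical stand-in for the ngoroutines==0 case: the
// mapper and puller run on the calling goroutine, inside push.
func serialInline[T1, T2 any](
	pusher func(push func(T1), stop func() bool) error,
	mapper func(a T1, i int, g int) (T2, error),
	puller func(a T2) error,
) error {
	var firstErr error
	i := 0 // 0-based input serial number.
	push := func(a T1) {
		if firstErr != nil {
			return // Already stopped; ignore further inputs.
		}
		b, err := mapper(a, i, 0) // The only "goroutine number" is 0.
		i++
		if err != nil {
			firstErr = err
			return
		}
		if err := puller(b); err != nil {
			firstErr = err
		}
	}
	stop := func() bool { return firstErr != nil }
	if err := pusher(push, stop); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

// squareAll squares the inputs and fails on the first negative value.
func squareAll(inputs []int) ([]int, error) {
	var results []int
	err := serialInline(
		func(push func(int), stop func() bool) error {
			for _, a := range inputs {
				if stop() {
					break // An error was returned; stop pushing.
				}
				push(a)
			}
			return nil
		},
		func(a int, i, g int) (int, error) {
			if a < 0 {
				return 0, errNegative
			}
			return a * a, nil
		},
		func(b int) error {
			results = append(results, b)
			return nil
		})
	return results, err
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3}))  // [1 4 9] <nil>
	fmt.Println(squareAll([]int{1, -2, 3})) // [1] negative input
}
```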
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NonSerial ¶
func NonSerial[T1 any, T2 any](
	ngoroutines int,
	pusher func(push func(T1), stop func() bool) error,
	mapper func(a T1, push func(T2), g int) error,
	puller func(a T2) error) error
NonSerial starts a multi-goroutine transformation pipeline.
Pusher should call push on every input value. Stop reports whether an error was returned and pushing should stop. Mapper receives an input (a), a push function for the results, and a 0-based goroutine number (g). It should call push zero or more times with the results of processing a. Puller acts on a single output. The order of the outputs is arbitrary, but correlated with the order of pusher's inputs.
If one of the functions returns a non-nil error, the pipeline stops and that error is returned. Otherwise, NonSerial returns nil.
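As an illustration of the "zero or more" contract, the function below has the shape of a NonSerial mapper with T1 and T2 both string. It pushes one output per word, so an empty line produces no outputs at all. splitWords is a hypothetical name. To keep the sketch self-contained, it is called directly with a collecting push function rather than through NonSerial.

```go
package main

import (
	"fmt"
	"strings"
)

// splitWords has the signature of a NonSerial mapper (T1 = T2 = string).
// It calls push once per whitespace-separated word in line, which may be
// zero times. The goroutine number g is unused here.
func splitWords(line string, push func(string), g int) error {
	for _, w := range strings.Fields(line) {
		push(w)
	}
	return nil
}

func main() {
	var words []string
	collect := func(w string) { words = append(words, w) }
	for _, line := range []string{"hello world", "", "one two three"} {
		if err := splitWords(line, collect, 0); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println(len(words), words) // 5 [hello world one two three]
}
```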
func Serial ¶
func Serial[T1 any, T2 any](
	ngoroutines int,
	pusher func(push func(T1), stop func() bool) error,
	mapper func(a T1, i int, g int) (T2, error),
	puller func(a T2) error) error
Serial starts a multi-goroutine transformation pipeline that maintains the order of the inputs.
Pusher should call push on every input value. Stop reports whether an error was returned and pushing should stop. Mapper receives an input (a), a 0-based input serial number (i), and a 0-based goroutine number (g), and returns the result of processing a. Puller acts on a single result and is called in the same order as pusher's inputs.
If one of the functions returns a non-nil error, the pipeline stops and that error is returned. Otherwise, Serial returns nil.
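One way to picture how results computed in parallel can still reach puller in input order is a reorder buffer keyed by the serial number i. The sketch below is only an illustration of that idea. Whether Serial works this way internally is not stated here, and reorderer and newReorderer are hypothetical names.

```go
package main

import "fmt"

// reorderer is a hypothetical helper that releases values in serial-number
// order, regardless of the order in which they arrive.
type reorderer[T any] struct {
	next    int       // Serial number of the next value to release.
	pending map[int]T // Values that arrived ahead of their turn.
	emit    func(T)   // Called once per value, in serial order.
}

func newReorderer[T any](emit func(T)) *reorderer[T] {
	return &reorderer[T]{pending: map[int]T{}, emit: emit}
}

// add accepts the value with serial number i and releases every value that
// is now in order. It is not safe for concurrent use.
func (r *reorderer[T]) add(i int, v T) {
	r.pending[i] = v
	for {
		w, ok := r.pending[r.next]
		if !ok {
			return
		}
		delete(r.pending, r.next)
		r.emit(w)
		r.next++
	}
}

func main() {
	var got []string
	r := newReorderer(func(s string) { got = append(got, s) })
	r.add(2, "c") // Held back: 0 and 1 have not arrived yet.
	r.add(0, "a") // Releases "a".
	r.add(1, "b") // Releases "b", then the pending "c".
	fmt.Println(got) // [a b c]
}
```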
Example ¶
ngoroutines := 4
var results []float64
Serial(
	ngoroutines,
	// Read/generate input data.
	func(push func(int), stop func() bool) error {
		for i := 1; i <= 100; i++ {
			push(i)
		}
		return nil
	},
	// Some processing.
	func(a int, i, g int) (float64, error) {
		return float64(a*a) + 0.5, nil
	},
	// Accumulate/forward outputs.
	func(a float64) error {
		results = append(results, a)
		return nil
	})
fmt.Println(results[:3], results[len(results)-3:])
Output: [1.5 4.5 9.5] [9604.5 9801.5 10000.5]
Example (ParallelAggregation) ¶
ngoroutines := 4
results := make([]int, ngoroutines) // Goroutine-specific data and objects.
Serial(
	ngoroutines,
	// Read/generate input data.
	func(push func(int), stop func() bool) error {
		for i := 1; i <= 100; i++ {
			push(i)
		}
		return nil
	},
	// Accumulate in goroutine-specific memory.
	func(a int, i, g int) (int, error) {
		results[g] += a
		return 0, nil // Unused.
	},
	// No outputs.
	func(a int) error { return nil })
// Collect the results of all goroutines.
fmt.Println("Sum of 1-100:", gnum.Sum(results))
Output: Sum of 1-100: 5050
Types ¶
This section is empty.