ppln

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2023 License: MIT Imports: 3 Imported by: 2

Documentation

Overview

Package ppln provides generic parallel processing pipelines.

NOTE: this API is currently experimental and may change in future releases.

General usage

This package provides two modes of operation: serial and non-serial. Serial transforms each value of type T1 to a value of type T2. The outputs are ordered in the same order of the inputs. Non-serial transforms each value of type T1 to zero or more values of type T2. The order of the outputs is arbitrary, but correlated with the order of inputs.

Each of the functions blocks the calling function until either the processing is done (puller was called on the last value) or until stopped.

Stopping

Each user-function (pusher, mapper, puller) may return an error. Returning a non-nil error stops the pipeline prematurely, and that error will be propagated to the caller.

Number of goroutines

Each pipeline creates ngoroutines+2 new goroutines and blocks the calling one. There is one pusher goroutine, one puller goroutine, and ngoroutines mapper goroutines.

A special case is when ngoroutines==0, in which case no new goroutines are created. Processing is done serially using the calling goroutine.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NonSerial

func NonSerial[T1 any, T2 any](
	ngoroutines int,
	pusher func(push func(T1), stop func() bool) error,
	mapper func(a T1, push func(T2), g int) error,
	puller func(a T2) error) error

NonSerial starts a multi-goroutine transformation pipeline.

Pusher should call push on every input value. Stop indicates if an error was returned and pushing should stop. Mapper receives an input (a), a push function for the results, 0-based goroutine number (g). It should call push zero or more times with the processing results of a. Puller acts on a single output. The order of the outputs is arbitrary, but correlated with the order of pusher's inputs.

If one of the functions returns a non-nil error, the process stops and the error is returned. Otherwise returns nil.

func Serial

func Serial[T1 any, T2 any](
	ngoroutines int,
	pusher func(push func(T1), stop func() bool) error,
	mapper func(a T1, i int, g int) (T2, error),
	puller func(a T2) error) error

Serial starts a multi-goroutine transformation pipeline that maintains the order of the inputs.

Pusher should call push on every input value. Stop indicates if an error was returned and pushing should stop. Mapper receives an input (a), 0-based input serial number (i), 0-based goroutine number (g), and returns the result of processing a. Puller acts on a single result, and will be called by the same order of pusher's input.

If one of the functions returns a non-nil error, the process stops and the error is returned. Otherwise returns nil.

Example
ngoroutines := 4
var results []float64

Serial(
	ngoroutines,
	// Read/generate input data.
	func(push func(int), stop func() bool) error {
		for i := 1; i <= 100; i++ {
			push(i)
		}
		return nil
	},
	// Some processing.
	func(a int, i, g int) (float64, error) {
		return float64(a*a) + 0.5, nil
	},
	// Accumulate/forward outputs.
	func(a float64) error {
		results = append(results, a)
		return nil
	})

fmt.Println(results[:3], results[len(results)-3:])
Output:

[1.5 4.5 9.5] [9604.5 9801.5 10000.5]
Example (ParallelAggregation)
ngoroutines := 4
results := make([]int, ngoroutines) // Goroutine-specific data and objects.

Serial(
	ngoroutines,
	// Read/generate input data.
	func(push func(int), stop func() bool) error {
		for i := 1; i <= 100; i++ {
			push(i)
		}
		return nil
	},
	// Accumulate in goroutine-specific memory.
	func(a int, i, g int) (int, error) {
		results[g] += a
		return 0, nil // Unused.
	},
	// No outputs.
	func(a int) error { return nil })

// Collect the results of all goroutines.
fmt.Println("Sum of 1-100:", gnum.Sum(results))
Output:

Sum of 1-100: 5050

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL