ppln

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 17, 2025 License: MIT Imports: 5 Imported by: 2

Documentation

Overview

Package ppln provides generic parallel processing pipelines.

General usage

This package provides two modes of operation: serial and non-serial. In Serial, outputs are delivered in the same order as the inputs. In NonSerial, the order of outputs is arbitrary, but correlated with the order of inputs.

Each of the functions blocks the caller until either processing is done (output has been called on the last value) or an error is returned.

Stopping

Each user-function (input, transform, output) may return an error. Returning a non-nil error stops the pipeline prematurely, and that error is returned to the caller.
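The stopping contract can be illustrated with a minimal, single-goroutine sketch. It uses only the standard library and is not this package's implementation; run, process and errTooBig are hypothetical names introduced for illustration. It shows that outputs produced before the failure are kept, and that the first non-nil error is what the caller receives.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooBig is a hypothetical error used to stop the pipeline early.
var errTooBig = errors.New("value too big")

// run is a hypothetical, single-goroutine model of the stopping contract:
// the first non-nil error from transform or output is returned to the caller.
func run(inputs []int, transform func(int) (int, error), output func(int) error) error {
	for _, a := range inputs {
		b, err := transform(a)
		if err != nil {
			return err
		}
		if err := output(b); err != nil {
			return err
		}
	}
	return nil
}

// process squares the values 1..5 and fails on the first value above limit.
func process(limit int) ([]int, error) {
	var got []int
	err := run([]int{1, 2, 3, 4, 5},
		func(a int) (int, error) {
			if a > limit {
				return 0, errTooBig
			}
			return a * a, nil
		},
		func(b int) error {
			got = append(got, b)
			return nil
		})
	return got, err
}

func main() {
	got, err := process(3)
	fmt.Println(got, err) // [1 4 9] value too big
}
```

With a limit of 5 or more, no stage fails and process returns all five squares with a nil error.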

Experimental

This package relies on the experimental iter package. To use it, Go 1.22 is required, with GOEXPERIMENT=rangefunc set.

Functions

func NonSerial

func NonSerial[T1 any, T2 any](
	ngoroutines int,
	input iter.Seq2[T1, error],
	transform func(a T1, g int) (T2, error),
	output func(a T2) error) error

NonSerial starts a multi-goroutine transformation pipeline.

Input is an iterator over the input values to be transformed. It will be called in a thread-safe manner. Transform receives an input (a) and a 0-based goroutine number (g), and returns the result of processing a. Output acts on a single result, and will be called in a thread-safe manner. The order of outputs is arbitrary, but correlated with the order of inputs.

If one of the functions returns a non-nil error, the process stops and the error is returned. Otherwise returns nil.
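As a rough mental model of the unordered mode, the following sketch builds a small worker pool from the standard library alone. It is an assumption-laden simplification, not this package's implementation: nonSerial and squares are hypothetical names, the input is a plain slice rather than an iterator, and error handling is omitted. It shows transform receiving a 0-based goroutine number and output being guarded so that only one call runs at a time.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// nonSerial is a hypothetical, simplified model of an unordered pipeline.
// Each of the n workers calls transform with its own goroutine number g.
func nonSerial(n int, inputs []int, transform func(a, g int) int, output func(int)) {
	jobs := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex // Serializes calls to output.
	for g := 0; g < n; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for a := range jobs {
				b := transform(a, g)
				mu.Lock()
				output(b)
				mu.Unlock()
			}
		}(g)
	}
	for _, a := range inputs {
		jobs <- a
	}
	close(jobs)
	wg.Wait()
}

// squares collects the squared inputs. The arrival order is arbitrary,
// so the result is sorted to make it deterministic.
func squares(n int, inputs []int) []int {
	var out []int
	nonSerial(n, inputs,
		func(a, g int) int { return a * a },
		func(b int) { out = append(out, b) })
	sort.Ints(out)
	return out
}

func main() {
	fmt.Println(squares(4, []int{1, 2, 3, 4, 5})) // [1 4 9 16 25]
}
```

The sort is only there because nothing in this sketch constrains the order in which workers finish.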

func RangeInput added in v1.0.1

func RangeInput(start, stop int) iter.Seq2[int, error]

RangeInput returns a function that iterates over a range of integers, starting at start and ending at (and excluding) stop, to be used as the input function in Serial and NonSerial.
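To make the shape of such an input concrete, here is a minimal sketch of a half-open range iterator. It mirrors the form of iter.Seq2[int, error] with a plain function type so that it does not depend on the iter package. The names seq2, rangeInput and collect are hypothetical, and this is not the package's own code.

```go
package main

import "fmt"

// seq2 mirrors the shape of iter.Seq2[int, error] as a plain function type.
type seq2 func(yield func(int, error) bool)

// rangeInput is a hypothetical stand-in for an input like RangeInput:
// it yields start, start+1, ..., stop-1, each with a nil error.
func rangeInput(start, stop int) seq2 {
	return func(yield func(int, error) bool) {
		for i := start; i < stop; i++ {
			if !yield(i, nil) {
				return // The consumer asked to stop early.
			}
		}
	}
}

// collect drains an iterator into a slice, stopping at the first error.
func collect(in seq2) ([]int, error) {
	var out []int
	var err error
	in(func(v int, e error) bool {
		if e != nil {
			err = e
			return false
		}
		out = append(out, v)
		return true
	})
	return out, err
}

func main() {
	vals, err := collect(rangeInput(1, 4))
	fmt.Println(vals, err) // [1 2 3] <nil>
}
```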

func Serial

func Serial[T1 any, T2 any](
	ngoroutines int,
	input iter.Seq2[T1, error],
	transform func(a T1, i int, g int) (T2, error),
	output func(a T2) error) error

Serial starts a multi-goroutine transformation pipeline that maintains the order of the inputs.

Input is an iterator over the input values to be transformed. It will be called in a thread-safe manner. Transform receives an input (a), a 0-based input serial number (i), and a 0-based goroutine number (g), and returns the result of processing a. Output acts on a single result, and will be called in the same order as the inputs, in a thread-safe manner.

If one of the functions returns a non-nil error, the process stops and the error is returned. Otherwise returns nil.

Example
ngoroutines := 4
var results []float64

Serial[int, float64](ngoroutines,
	// Read/generate input data.
	RangeInput(1, 101),
	// Some processing.
	func(a, i, g int) (float64, error) {
		return float64(a*a) + 0.5, nil
	},
	// Accumulate/forward outputs.
	func(a float64) error {
		results = append(results, a)
		return nil
	})

fmt.Println(results[:3], results[len(results)-3:])
Output:

[1.5 4.5 9.5] [9604.5 9801.5 10000.5]

Example (ParallelAggregation)
ngoroutines := 4
results := make([]int, ngoroutines) // Goroutine-specific data and objects.

Serial(
	ngoroutines,
	// Read/generate input data.
	RangeInput(1, 101),
	// Accumulate in goroutine-specific memory.
	func(a int, i, g int) (int, error) {
		results[g] += a
		return 0, nil // Unused.
	},
	// No outputs.
	func(a int) error { return nil })

// Collect the results of all goroutines.
fmt.Println("Sum of 1-100:", gnum.Sum(results))
Output:

Sum of 1-100: 5050
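One possible way to preserve input order while transforming in parallel is to tag each input with its serial number and buffer results that arrive early. The sketch below uses only the standard library and is not necessarily how Serial works internally; serial, result and squaresInOrder are hypothetical names, the input is a plain slice, and error handling is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// result pairs a value with the serial number of the input it came from.
type result struct {
	i int // 0-based input serial number.
	v int // Input value on the way in, transformed value on the way out.
}

// serial is a hypothetical, simplified model of an order-preserving pipeline.
func serial(n int, inputs []int, transform func(a, i, g int) int, output func(int)) {
	jobs := make(chan result)
	results := make(chan result)
	var wg sync.WaitGroup
	for g := 0; g < n; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for j := range jobs {
				results <- result{j.i, transform(j.v, j.i, g)}
			}
		}(g)
	}
	// Feed the inputs, then close results once every worker is done.
	go func() {
		for i, a := range inputs {
			jobs <- result{i, a}
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()
	// Buffer out-of-order results and emit them by serial number.
	pending := map[int]int{}
	next := 0
	for r := range results {
		pending[r.i] = r.v
		for {
			v, ok := pending[next]
			if !ok {
				break
			}
			output(v)
			delete(pending, next)
			next++
		}
	}
}

// squaresInOrder returns the squared inputs in input order.
func squaresInOrder(n int, inputs []int) []int {
	var out []int
	serial(n, inputs,
		func(a, i, g int) int { return a * a },
		func(b int) { out = append(out, b) })
	return out
}

func main() {
	fmt.Println(squaresInOrder(4, []int{1, 2, 3, 4, 5})) // [1 4 9 16 25]
}
```

Because output is only ever called from the goroutine that drains the results, this sketch needs no lock around it.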

func SliceInput added in v1.0.1

func SliceInput[T any](s []T) iter.Seq2[T, error]

SliceInput returns a function that iterates over a slice, to be used as the input function in Serial and NonSerial.
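For illustration, a slice-backed input of this shape might look like the sketch below. It writes the iterator as a plain generic function type rather than iter.Seq2, so it does not depend on the iter package. The names sliceInput and firstN are hypothetical and this is not the package's own code. It also shows that a consumer can stop the iteration early by returning false from yield.

```go
package main

import "fmt"

// sliceInput is a hypothetical stand-in for an input like SliceInput:
// it yields each element of s in order, each with a nil error.
func sliceInput[T any](s []T) func(yield func(T, error) bool) {
	return func(yield func(T, error) bool) {
		for _, v := range s {
			if !yield(v, nil) {
				return // The consumer asked to stop early.
			}
		}
	}
}

// firstN consumes at most n values from the iterator and returns them.
func firstN[T any](in func(yield func(T, error) bool), n int) []T {
	var out []T
	if n <= 0 {
		return out
	}
	in(func(v T, _ error) bool {
		out = append(out, v)
		return len(out) < n // Returning false stops the iteration.
	})
	return out
}

func main() {
	words := []string{"a", "b", "c", "d"}
	fmt.Println(firstN(sliceInput(words), 2)) // [a b]
}
```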

