ppln

package
v0.1.12
Published: Jun 30, 2022 License: MIT Imports: 3 Imported by: 2

Documentation

Overview

Package ppln provides generic parallel processing pipelines.

NOTE: this API is currently experimental and may change in future releases.

General usage

This package provides two modes of operation: serial and non-serial. Serial transforms each value of type T1 to exactly one value of type T2, and the outputs are delivered in the same order as the inputs. Non-serial transforms each value of type T1 to zero or more values of type T2. The order of the outputs is arbitrary, but correlated with the order of the inputs.

Each of the pipeline functions blocks the calling goroutine until either the processing is done (puller was called on the last value) or the pipeline is stopped.

Stopping

Each user function (pusher, mapper, puller) receives a Stopper instance. It can be used to stop the pipeline prematurely by calling Stop, and to check whether Stop was called by calling Stopped. After Stop is called, no further calls to mapper and puller will be made. Pusher should check Stopped and return if it reports true.
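
The stopping protocol described above can be sketched without the package itself. In the sketch below, stopFlag is a hypothetical stand-in that only mimics the documented Stop/Stopped behavior, and the small driver loop stands in for a single-goroutine pipeline. It is not this package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// stopFlag is a hypothetical stand-in for ppln's Stopper. It only mimics
// the documented behavior: after Stop is called, Stopped returns true.
type stopFlag struct {
	once sync.Once
	ch   chan struct{}
}

func newStopFlag() *stopFlag { return &stopFlag{ch: make(chan struct{})} }

// Stop marks the flag as stopped. Calling it more than once is safe.
func (s *stopFlag) Stop() { s.once.Do(func() { close(s.ch) }) }

// Stopped reports whether Stop was called.
func (s *stopFlag) Stopped() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

// firstSquaresBelow pushes 1, 2, 3, ... through a toy single-goroutine
// pipeline and stops once a square reaches limit.
func firstSquaresBelow(limit int) []int {
	s := newStopFlag()
	var out []int

	mapper := func(a int) int { return a * a }
	puller := func(a int) {
		if a >= limit {
			s.Stop() // Stop prematurely; no more values are wanted.
			return
		}
		out = append(out, a)
	}
	push := func(a int) {
		if s.Stopped() {
			return // Mimics "no further calls to mapper and puller".
		}
		puller(mapper(a))
	}

	// Pusher: an unbounded generator that checks Stopped and exits.
	for i := 1; !s.Stopped(); i++ {
		push(i)
	}
	return out
}

func main() {
	fmt.Println(firstSquaresBelow(50)) // [1 4 9 16 25 36 49]
}
```

The point of the sketch is the pusher loop: because the input is unbounded, the pipeline only ends if the pusher checks the stop flag itself.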

Number of goroutines

Each pipeline creates ngoroutines+2 new goroutines and blocks the calling one. There is one pusher goroutine, one puller goroutine, and ngoroutines mapper goroutines.

A special case is when ngoroutines==1, in which case no new goroutines are created. Processing is done serially using the calling goroutine.
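
To make the goroutine count concrete, here is a rough sketch of the documented layout using only channels and sync.WaitGroup: one pusher goroutine, n mapper goroutines, and one puller goroutine, with the caller blocked until all of them finish. The function sumOfSquares and its channel wiring are assumptions made for illustration; the package's internals may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// sumOfSquares wires up a pusher goroutine, n mapper goroutines and a
// puller goroutine (n+2 in total), then blocks until all have finished.
// It illustrates the documented layout, not the package's internals.
func sumOfSquares(n, upTo int) int {
	in := make(chan int)
	out := make(chan int)
	done := make(chan struct{})
	sum := 0

	// Pusher goroutine: generates the inputs.
	go func() {
		for i := 1; i <= upTo; i++ {
			in <- i
		}
		close(in)
	}()

	// Mapper goroutines: n workers share the input channel.
	var wg sync.WaitGroup
	for g := 0; g < n; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for a := range in {
				out <- a * a
			}
		}()
	}

	// Puller goroutine: the only one that touches sum.
	go func() {
		for a := range out {
			sum += a
		}
		close(done)
	}()

	wg.Wait()  // All mappers finished.
	close(out) // Lets the puller drain and exit.
	<-done     // The caller stays blocked until the puller is done.
	return sum
}

func main() {
	fmt.Println(sumOfSquares(4, 100)) // 338350
}
```

Because only the puller goroutine writes to sum, and the caller reads it after receiving from done, no mutex is needed in this sketch.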

Functions

func NonSerial

func NonSerial[T1 any, T2 any](
	ngoroutines int,
	pusher func(push func(T1), s Stopper),
	mapper func(a T1, push func(T2), g int, s Stopper),
	puller func(a T2, s Stopper))

NonSerial starts a multi-goroutine transformation pipeline.

Pusher should call push on every input value. Mapper receives an input (a), a push function for the results, 0-based goroutine number (g), and a Stopper. It should call push zero or more times with the processing results of a. Puller acts on a single output. The order of the outputs is arbitrary, but correlated with the order of pusher's inputs.
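
The call shape of a one-to-many pipeline might look like the sketch below. To stay self-contained it does not import the package: nonSerial and stopper are hypothetical single-goroutine stand-ins that mirror the parameter shapes of NonSerial (without ngoroutines), and countWords is an illustrative helper. The real function runs mapper on several goroutines and may deliver outputs in a different order, so the sketch aggregates into a map, where order does not matter.

```go
package main

import (
	"fmt"
	"strings"
)

// stopper is a placeholder for ppln's Stopper; this sketch never stops early.
type stopper chan struct{}

// nonSerial is a hypothetical single-goroutine stand-in that mirrors the
// parameter shapes of NonSerial. It always passes goroutine number 0.
func nonSerial[T1 any, T2 any](
	pusher func(push func(T1), s stopper),
	mapper func(a T1, push func(T2), g int, s stopper),
	puller func(a T2, s stopper)) {
	s := make(stopper)
	pusher(func(a T1) {
		mapper(a, func(b T2) { puller(b, s) }, 0, s)
	}, s)
}

// countWords splits each line into words (zero or more outputs per input)
// and counts every word it sees.
func countWords(lines []string) map[string]int {
	counts := map[string]int{}
	nonSerial(
		// Pusher: one input per line.
		func(push func(string), s stopper) {
			for _, line := range lines {
				push(line)
			}
		},
		// Mapper: pushes zero or more words for each line.
		func(line string, push func(string), g int, s stopper) {
			for _, w := range strings.Fields(line) {
				push(w)
			}
		},
		// Puller: acts on a single output at a time.
		func(w string, s stopper) { counts[w]++ })
	return counts
}

func main() {
	counts := countWords([]string{"a b a", "", "b c"})
	fmt.Println(counts["a"], counts["b"], counts["c"]) // 2 2 1
}
```

The empty line in the input shows the "zero outputs" case: its mapper call never invokes push.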

func Serial

func Serial[T1 any, T2 any](
	ngoroutines int,
	pusher func(push func(T1), s Stopper),
	mapper func(a T1, i int, g int, s Stopper) T2,
	puller func(a T2, s Stopper))

Serial starts a multi-goroutine transformation pipeline that maintains the order of the inputs.

Pusher should call push on every input value. Mapper receives an input (a), the 0-based serial number of that input (i), the 0-based goroutine number (g), and a Stopper, and returns the result of processing a. Puller acts on a single result, and is called in the same order as pusher's inputs.
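
One common way to restore input order when mappers finish out of order is a reorder buffer keyed by the input serial number. The sketch below shows that general technique; the result type and reorder helper are hypothetical, and nothing here is taken from this package's code, which may use a different approach.

```go
package main

import "fmt"

// result pairs a mapper output with the serial number of its input.
type result struct {
	i   int
	val string
}

// reorder takes results in arbitrary arrival order and returns their values
// ordered by serial number, releasing each value as soon as all earlier
// ones have been released. Hypothetical helper, not the package's code.
func reorder(arrivals []result) []string {
	pending := map[int]string{} // Results that arrived too early.
	next := 0                   // Serial number expected next.
	var out []string
	for _, r := range arrivals {
		pending[r.i] = r.val
		for {
			v, ok := pending[next]
			if !ok {
				break
			}
			out = append(out, v) // This is where puller would be called.
			delete(pending, next)
			next++
		}
	}
	return out
}

func main() {
	arrivals := []result{{2, "c"}, {0, "a"}, {1, "b"}, {3, "d"}}
	fmt.Println(reorder(arrivals)) // [a b c d]
}
```

In this scheme a slow early input delays every later output, which is the price of ordered delivery compared with the non-serial mode.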

Example
ngoroutines := 4
var results []float64

Serial(
	ngoroutines,
	// Read/generate input data.
	func(push func(int), s Stopper) {
		for i := 1; i <= 100; i++ {
			push(i)
		}
	},
	// Some processing.
	func(a int, i, g int, s Stopper) float64 {
		return float64(a*a) + 0.5
	},
	// Accumulate/forward outputs.
	func(a float64, s Stopper) {
		results = append(results, a)
	})

fmt.Println(results[:3], results[len(results)-3:])
Output:

[1.5 4.5 9.5] [9604.5 9801.5 10000.5]
Example (ParallelAggregation)
ngoroutines := 4
results := make([]int, ngoroutines) // Goroutine-specific data and objects.

Serial(
	ngoroutines,
	// Read/generate input data.
	func(push func(int), s Stopper) {
		for i := 1; i <= 100; i++ {
			push(i)
		}
	},
	// Accumulate in goroutine-specific memory.
	func(a int, i, g int, s Stopper) int {
		results[g] += a
		return 0 // Unused.
	},
	// No outputs.
	func(a int, s Stopper) {})

// Collect the results of all goroutines.
fmt.Println("Sum of 1-100:", gnum.Sum(results))
Output:

Sum of 1-100: 5050

Types

type Stopper

type Stopper chan struct{}

A Stopper is used in pipelines to communicate that processing should stop.

func (Stopper) Stop

func (s Stopper) Stop()

Stop marks the pipeline as stopped, so that Stopped returns true. After Stop is called, no further calls to mapper and puller will be made. Pusher should check Stopped and return if it reports true.

func (Stopper) Stopped

func (s Stopper) Stopped() bool

Stopped returns whether Stop was called.
