parallel

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package parallel provides concurrent, context-aware sequence transforms for Go iter.Seq and iter.Seq2 values.

The parallel package is useful when each item in a sequence needs independent CPU or I/O work and you want to spread that work across a fixed number of workers. It keeps the same lazy iteration style as the rest of the library while letting callers control concurrency with standard context cancellation.

Use ParallelMapSeq, BatchMapSeq, and OrderedParallelMapSeq with plain iter.Seq[T] values such as slices.Values(...). Use ParallelMap, BatchMap, and OrderedParallelMap with iter.Seq2[T, error] values so source errors can flow through the pipeline instead of being dropped.

Example:

ctx := context.Background()
numbers := slices.Values([]int{1, 2, 3, 4})

for v := range parallel.ParallelMapSeq(ctx, numbers, func(n int) int {
	return n * 2
}, 2) {
	fmt.Println(v)
}

ParallelMapSeq and ParallelMap may yield results out of order when multiple workers are used, and they stop quickly when the context is cancelled or the consumer stops early. BatchMapSeq and BatchMap preserve batch order and do not spin up worker goroutines by themselves.

Error-returning variants

ParallelMapErr, OrderedParallelMapErr, ParallelMapSeqErr, and OrderedParallelMapSeqErr accept fn func(T) (U, error). Errors returned by fn are wrapped with vortex.Wrap and yielded inline alongside source-sequence errors. The pipeline is not terminated when fn returns an error — processing continues with the remaining items.

Panic safety

Worker and source-sequence panics are recovered and surfaced as wrapped errors through the returned iter.Seq2 (for Seq2 and *Err variants) or cause the pipeline to cancel cleanly (for plain-Seq variants without an error channel). A worker that panics exits without processing further items — the pool shrinks by one for the rest of that call. Throughput degrades but ordering and error delivery remain correct.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func BatchMap

func BatchMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func([]T) []U, batchSize int) iter.Seq2[U, error]

BatchMap groups items into batches of size n and processes each batch. Errors from the underlying sequence are yielded inline and do not enter batches.

func BatchMapSeq added in v1.0.0

func BatchMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func([]T) []U, batchSize int) iter.Seq[U]

BatchMapSeq groups items into batches of size n and processes each batch.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	numbers := slices.Values([]int{1, 2, 3, 4})

	for v := range parallel.BatchMapSeq(context.Background(), numbers, func(batch []int) []int {
		out := make([]int, len(batch))
		for i, n := range batch {
			out[i] = n * 10
		}
		return out
	}, 2) {
		fmt.Println(v)
	}
}
Output:
10
20
30
40

func OrderedParallelMap added in v0.1.8

func OrderedParallelMap[T, U any](
	ctx context.Context,
	seq iter.Seq2[T, error],
	fn func(T) U,
	workers int,
) iter.Seq2[U, error]

OrderedParallelMap applies fn to each element of seq concurrently using worker goroutines and yields results and errors in the original input order.

func OrderedParallelMapErr added in v1.0.1

func OrderedParallelMapErr[T, U any](
	ctx context.Context,
	seq iter.Seq2[T, error],
	fn func(T) (U, error),
	workers int,
) iter.Seq2[U, error]

OrderedParallelMapErr applies fn concurrently with `workers` goroutines and yields results and errors in the original input order. Errors (from seq, fn, or worker panics) occupy the slot where the failing item would have been; surrounding items are not shifted.

fn must be safe to call concurrently. A panic inside fn is recovered and surfaced as a worker error so it cannot crash the program.

func OrderedParallelMapSeq added in v1.0.0

func OrderedParallelMapSeq[T, U any](
	ctx context.Context,
	seq iter.Seq[T],
	fn func(T) U,
	workers int,
) iter.Seq[U]

OrderedParallelMapSeq applies fn to each element of seq concurrently using workers goroutines and yields results in the original input order.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	ctx := context.Background()
	numbers := slices.Values([]int{1, 2, 3, 4, 5})

	for v := range parallel.OrderedParallelMapSeq(ctx, numbers, func(n int) int {
		return n * 2
	}, 3) {
		fmt.Println(v)
	}
}
Output:
2
4
6
8
10
Example (Strings)
package main

import (
	"context"
	"fmt"
	"slices"
	"strings"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	ctx := context.Background()
	words := slices.Values([]string{"hello", "world", "foo"})

	for v := range parallel.OrderedParallelMapSeq(ctx, words, strings.ToUpper, 2) {
		fmt.Println(v)
	}
}
Output:
HELLO
WORLD
FOO

func OrderedParallelMapSeqErr added in v1.0.1

func OrderedParallelMapSeqErr[T, U any](
	ctx context.Context,
	seq iter.Seq[T],
	fn func(T) (U, error),
	workers int,
) iter.Seq2[U, error]

OrderedParallelMapSeqErr is the ordered variant of ParallelMapSeqErr. Results are yielded in input order. fn errors and worker panics are surfaced through the returned iter.Seq2[U, error].

func ParallelMap

func ParallelMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) U, workers int) iter.Seq2[U, error]

ParallelMap processes each element concurrently with n workers. Errors from the underlying sequence are passed through untouched.

Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func seq2FromSlice[T any](items []T) iter.Seq2[T, error] {
	return func(yield func(T, error) bool) {
		for _, item := range items {
			if !yield(item, nil) {
				return
			}
		}
	}
}

func main() {
	numbers := seq2FromSlice([]int{1, 2, 3})

	for v, err := range parallel.ParallelMap(context.Background(), numbers, func(n int) int {
		return n * 2
	}, 1) {
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(v)
	}
}
Output:
2
4
6

func ParallelMapErr added in v1.0.1

func ParallelMapErr[T, U any](
	ctx context.Context,
	seq iter.Seq2[T, error],
	fn func(T) (U, error),
	workers int,
) iter.Seq2[U, error]

ParallelMapErr applies fn concurrently with `workers` goroutines. Both errors yielded by seq and errors returned by fn are wrapped with vortex.Wrap and yielded inline. Output order is unspecified.

fn must be safe to call concurrently. A panic inside fn is recovered and surfaced as a worker error so it cannot crash the program.

func ParallelMapSeq added in v1.0.0

func ParallelMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) U, workers int) iter.Seq[U]

ParallelMapSeq processes each element concurrently with n workers.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	numbers := slices.Values([]int{1, 2, 3})

	for v := range parallel.ParallelMapSeq(context.Background(), numbers, func(n int) int {
		return n * 2
	}, 1) {
		fmt.Println(v)
	}
}
Output:
2
4
6

func ParallelMapSeqErr added in v1.0.1

func ParallelMapSeqErr[T, U any](
	ctx context.Context,
	seq iter.Seq[T],
	fn func(T) (U, error),
	workers int,
) iter.Seq2[U, error]

ParallelMapSeqErr is like ParallelMapErr but takes a plain iter.Seq[T]. fn errors and worker panics are surfaced through the returned iter.Seq2[U, error]. Output order is unspecified.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL