parallel

package
v1.0.0
Published: Mar 17, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package parallel provides concurrent, context-aware sequence transforms for Go iter.Seq and iter.Seq2 values.

The parallel package is useful when each item in a sequence needs independent CPU or I/O work and you want to spread that work across a fixed number of workers. It keeps the same lazy iteration style as the rest of the library, lets callers bound concurrency with a worker count, and stops work through standard context cancellation.

Use ParallelMapSeq, BatchMapSeq, and OrderedParallelMapSeq with plain iter.Seq[T] values such as slices.Values(...). Use ParallelMap, BatchMap, and OrderedParallelMap with iter.Seq2[T, error] values so source errors can flow through the pipeline instead of being dropped.

Example using ParallelMapSeq:

ctx := context.Background()
numbers := slices.Values([]int{1, 2, 3, 4})

for v := range parallel.ParallelMapSeq(ctx, numbers, func(n int) int {
	return n * 2
}, 2) {
	fmt.Println(v)
}

ParallelMapSeq and ParallelMap may yield results out of order when multiple workers are used, and they stop quickly when the context is cancelled or the consumer stops early. BatchMapSeq and BatchMap preserve batch order and do not spin up worker goroutines by themselves.

Constants

This section is empty.

Variables

This section is empty.

Functions

func BatchMap

func BatchMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func([]T) []U, batchSize int) iter.Seq2[U, error]

BatchMap groups items from seq into batches of batchSize items and passes each batch to fn. Errors from the underlying sequence are yielded inline and are never added to a batch.

func BatchMapSeq added in v1.0.0

func BatchMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func([]T) []U, batchSize int) iter.Seq[U]

BatchMapSeq groups items from seq into batches of batchSize items and passes each batch to fn.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	numbers := slices.Values([]int{1, 2, 3, 4})

	for v := range parallel.BatchMapSeq(context.Background(), numbers, func(batch []int) []int {
		out := make([]int, len(batch))
		for i, n := range batch {
			out[i] = n * 10
		}
		return out
	}, 2) {
		fmt.Println(v)
	}
}
Output:
10
20
30
40

func OrderedParallelMap added in v0.1.8

func OrderedParallelMap[T, U any](
	ctx context.Context,
	seq iter.Seq2[T, error],
	fn func(T) U,
	workers int,
) iter.Seq2[U, error]

OrderedParallelMap applies fn to each element of seq concurrently, using the number of goroutines given by workers, and yields results and errors in the original input order.

func OrderedParallelMapSeq added in v1.0.0

func OrderedParallelMapSeq[T, U any](
	ctx context.Context,
	seq iter.Seq[T],
	fn func(T) U,
	workers int,
) iter.Seq[U]

OrderedParallelMapSeq applies fn to each element of seq concurrently using workers goroutines and yields results in the original input order.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	ctx := context.Background()
	numbers := slices.Values([]int{1, 2, 3, 4, 5})

	for v := range parallel.OrderedParallelMapSeq(ctx, numbers, func(n int) int {
		return n * 2
	}, 3) {
		fmt.Println(v)
	}
}
Output:
2
4
6
8
10
Example (Strings)
package main

import (
	"context"
	"fmt"
	"slices"
	"strings"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	ctx := context.Background()
	words := slices.Values([]string{"hello", "world", "foo"})

	for v := range parallel.OrderedParallelMapSeq(ctx, words, strings.ToUpper, 2) {
		fmt.Println(v)
	}
}
Output:
HELLO
WORLD
FOO

func ParallelMap

func ParallelMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) U, workers int) iter.Seq2[U, error]

ParallelMap applies fn to each element of seq concurrently, using the number of goroutines given by workers. Errors from the underlying sequence are passed through unchanged.

Example
package main

import (
	"context"
	"fmt"
	"iter"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func seq2FromSlice[T any](items []T) iter.Seq2[T, error] {
	return func(yield func(T, error) bool) {
		for _, item := range items {
			if !yield(item, nil) {
				return
			}
		}
	}
}

func main() {
	numbers := seq2FromSlice([]int{1, 2, 3})

	for v, err := range parallel.ParallelMap(context.Background(), numbers, func(n int) int {
		return n * 2
	}, 1) {
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(v)
	}
}
Output:
2
4
6

func ParallelMapSeq added in v1.0.0

func ParallelMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) U, workers int) iter.Seq[U]

ParallelMapSeq applies fn to each element of seq concurrently, using the number of goroutines given by workers.

Example
package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

func main() {
	numbers := slices.Values([]int{1, 2, 3})

	for v := range parallel.ParallelMapSeq(context.Background(), numbers, func(n int) int {
		return n * 2
	}, 1) {
		fmt.Println(v)
	}
}
Output:
2
4
6

Types

This section is empty.
