Documentation
¶
Overview ¶
Package parallel provides concurrent, context-aware sequence transforms for Go iter.Seq and iter.Seq2 values.
The parallel package is useful when each item in a sequence needs independent CPU or I/O work and you want to spread that work across a fixed number of workers. It keeps the same lazy iteration style as the rest of the library while letting callers control concurrency with standard context cancellation.
Use ParallelMapSeq, BatchMapSeq, and OrderedParallelMapSeq with plain iter.Seq[T] values such as slices.Values(...). Use ParallelMap, BatchMap, and OrderedParallelMap with iter.Seq2[T, error] values so source errors can flow through the pipeline instead of being dropped.
Example:
ctx := context.Background()
numbers := slices.Values([]int{1, 2, 3, 4})
for v := range parallel.ParallelMapSeq(ctx, numbers, func(n int) int {
return n * 2
}, 2) {
fmt.Println(v)
}
ParallelMapSeq and ParallelMap may yield results out of order when multiple workers are used, and they stop quickly when the context is cancelled or the consumer stops early. BatchMapSeq and BatchMap preserve batch order and do not spin up worker goroutines by themselves.
Index ¶
- func BatchMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func([]T) []U, batchSize int) iter.Seq2[U, error]
- func BatchMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func([]T) []U, batchSize int) iter.Seq[U]
- func OrderedParallelMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) U, workers int) iter.Seq2[U, error]
- func OrderedParallelMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) U, workers int) iter.Seq[U]
- func ParallelMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) U, workers int) iter.Seq2[U, error]
- func ParallelMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) U, workers int) iter.Seq[U]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BatchMap ¶
func BatchMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func([]T) []U, batchSize int) iter.Seq2[U, error]
BatchMap groups items into batches of size n and processes each batch. Errors from the underlying sequence are yielded inline and do not enter batches.
func BatchMapSeq ¶ added in v1.0.0
func BatchMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func([]T) []U, batchSize int) iter.Seq[U]
BatchMapSeq groups items into batches of size n and processes each batch.
Example ¶
package main
import (
"context"
"fmt"
"slices"
"github.com/MostafaMagdSalama/vortex/parallel"
)
func main() {
numbers := slices.Values([]int{1, 2, 3, 4})
for v := range parallel.BatchMapSeq(context.Background(), numbers, func(batch []int) []int {
out := make([]int, len(batch))
for i, n := range batch {
out[i] = n * 10
}
return out
}, 2) {
fmt.Println(v)
}
}
Output: 10 20 30 40
func OrderedParallelMap ¶ added in v0.1.8
func OrderedParallelMap[T, U any]( ctx context.Context, seq iter.Seq2[T, error], fn func(T) U, workers int, ) iter.Seq2[U, error]
OrderedParallelMap applies fn to each element of seq concurrently using worker goroutines and yields results and errors in the original input order.
func OrderedParallelMapSeq ¶ added in v1.0.0
func OrderedParallelMapSeq[T, U any]( ctx context.Context, seq iter.Seq[T], fn func(T) U, workers int, ) iter.Seq[U]
OrderedParallelMapSeq applies fn to each element of seq concurrently using workers goroutines and yields results in the original input order.
Example ¶
package main
import (
"context"
"fmt"
"slices"
"github.com/MostafaMagdSalama/vortex/parallel"
)
func main() {
ctx := context.Background()
numbers := slices.Values([]int{1, 2, 3, 4, 5})
for v := range parallel.OrderedParallelMapSeq(ctx, numbers, func(n int) int {
return n * 2
}, 3) {
fmt.Println(v)
}
}
Output: 2 4 6 8 10
Example (Strings) ¶
package main
import (
"context"
"fmt"
"slices"
"strings"
"github.com/MostafaMagdSalama/vortex/parallel"
)
func main() {
ctx := context.Background()
words := slices.Values([]string{"hello", "world", "foo"})
for v := range parallel.OrderedParallelMapSeq(ctx, words, strings.ToUpper, 2) {
fmt.Println(v)
}
}
Output: HELLO WORLD FOO
func ParallelMap ¶
func ParallelMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) U, workers int) iter.Seq2[U, error]
ParallelMap processes each element concurrently with n workers. Errors from the underlying sequence are passed through untouched.
Example ¶
package main
import (
"context"
"fmt"
"iter"
"github.com/MostafaMagdSalama/vortex/parallel"
)
func seq2FromSlice[T any](items []T) iter.Seq2[T, error] {
return func(yield func(T, error) bool) {
for _, item := range items {
if !yield(item, nil) {
return
}
}
}
}
func main() {
numbers := seq2FromSlice([]int{1, 2, 3})
for v, err := range parallel.ParallelMap(context.Background(), numbers, func(n int) int {
return n * 2
}, 1) {
if err != nil {
fmt.Println("error:", err)
continue
}
fmt.Println(v)
}
}
Output: 2 4 6
func ParallelMapSeq ¶ added in v1.0.0
func ParallelMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) U, workers int) iter.Seq[U]
ParallelMapSeq processes each element concurrently with n workers.
Example ¶
package main
import (
"context"
"fmt"
"slices"
"github.com/MostafaMagdSalama/vortex/parallel"
)
func main() {
numbers := slices.Values([]int{1, 2, 3})
for v := range parallel.ParallelMapSeq(context.Background(), numbers, func(n int) int {
return n * 2
}, 1) {
fmt.Println(v)
}
}
Output: 2 4 6
Types ¶
This section is empty.