vortex
A Go library for lazy iterators, parallel processing, and resilient pipelines.
Built on Go 1.23's iter.Seq — zero external dependencies.
Install
go get github.com/MostafaMagdSalama/vortex@latest
Requirements
Go 1.23 or later.
Packages
| Package | What it does |
|---|---|
| vortex/iter | Lazy sequences: Filter, Map, Take, FlatMap |
| vortex/parallel | Parallel processing: ParallelMap, BatchMap, WorkerPoolMap |
| vortex/reduce | Aggregation: AsyncReduce, Fold, Collect |
| vortex/resilience | Fault tolerance: Retry, Backoff, CircuitBreaker |
| vortex/sources | Data sources: CSVRows, DBRows, Lines, FileLines |
Benchmarks
CSV file — 1,000,000 rows (Windows)
| Approach | Peak memory | Notes |
|---|---|---|
| Eager (load all) | 287 MB | loads entire file into RAM |
| Lazy (vortex) | 3 MB | one row at a time |
95x less memory with lazy processing.
file size    eager peak       vortex peak
──────────   ─────────────    ───────────
1M rows      287 MB           3 MB
10M rows     ~2.8 GB          3 MB
100M rows    out of memory    3 MB
Database — 1,000,000 rows (Windows)
| Approach | Peak memory | Rows read | Notes |
|---|---|---|---|
| Eager (load all) | 247 MB | 1,000,000 | loads all rows before processing |
| Lazy (vortex) | 397 KB | 10 | stops the moment it has what it needs |
636x less memory with lazy processing.
without vortex:
  memory after loading all rows: 134 MB
  memory after filtering: 204 MB
  memory after extracting names: 247 MB
  rows loaded: 1,000,000

with vortex:
  memory after creating source: 393 KB
  memory after defining filter: 393 KB
  memory after defining map: 393 KB
  memory after defining take: 393 KB
  peak memory: 397 KB
  rows read: 10 out of 1,000,000
The lazy approach stops reading from the database the moment
it has enough results — it never touches the remaining 999,990 rows.
Examples
Lazy filtering
import (
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iter"
)

numbers := slices.Values([]int{1, 2, 3, 4, 5})
for v := range iter.Filter(numbers, func(n int) bool { return n > 2 }) {
	fmt.Println(v) // 3, 4, 5
}
Parallel processing
import (
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/parallel"
)

numbers := slices.Values([]int{1, 2, 3, 4, 5})
for v := range parallel.ParallelMap(numbers, func(n int) int {
	return n * 2
}, 4) {
	fmt.Println(v) // 2, 4, 6, 8, 10 (unordered)
}
Batch processing
// numbers is the same sequence as in the parallel processing example
for v := range parallel.BatchMap(numbers, func(batch []int) []int {
	results := make([]int, len(batch))
	for i, v := range batch {
		results[i] = v * 2
	}
	return results
}, 3) {
	fmt.Println(v)
}
CSV pipeline
import (
	"fmt"
	"log"
	"os"

	"github.com/MostafaMagdSalama/vortex/iter"
	"github.com/MostafaMagdSalama/vortex/sources"
)

file, err := os.Open("users.csv")
if err != nil {
	log.Fatal(err)
}
defer file.Close()

for user := range iter.Filter(
	sources.CSVRows(file),
	// guard against short rows before reading the status column
	func(row []string) bool { return len(row) > 3 && row[3] == "active" },
) {
	fmt.Println(user)
}
Database pipeline
import (
	"fmt"

	"github.com/MostafaMagdSalama/vortex/iter"
	"github.com/MostafaMagdSalama/vortex/sources"
)

// db, User, and scanUser are defined elsewhere in your application.
// Reads one row at a time and stops as soon as Take is satisfied.
names := iter.Map(
	iter.Take(
		iter.Filter(
			sources.DBRows(db, "SELECT id, name, email, status FROM users", scanUser),
			func(u User) bool { return u.Status == "active" },
		),
		5,
	),
	func(u User) string { return u.Name },
)
for name := range names {
	fmt.Println(name)
}
Retry with backoff
import (
	"context"

	"github.com/MostafaMagdSalama/vortex/resilience"
)

err := resilience.Retry(context.Background(), resilience.DefaultRetry, func() error {
	return callSomeAPI()
})
Circuit breaker
cb := resilience.NewCircuitBreaker(5, 10*time.Second)
err := cb.Execute(func() error {
	return callSomeAPI()
})
if errors.Is(err, resilience.ErrCircuitOpen) {
	// service is down, circuit is open
}
Composing retry + circuit breaker
cb := resilience.NewCircuitBreaker(5, 10*time.Second)
err := resilience.Retry(ctx, resilience.DefaultRetry, func() error {
	return cb.Execute(func() error {
		return callSomeAPI()
	})
})
License
MIT