iterx

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package iterx provides lazy, context-aware sequence transformations for Go iter.Seq and iter.Seq2 values.

The iterx package is useful when you want to build allocation-light data pipelines over slices, files, database rows, or generated streams without materializing every intermediate result. Each helper returns a new lazy sequence or consumes one directly, so work only happens when the caller ranges over the final sequence.

API split

iterx provides two variants of every function:

  • Plain variants (FilterSeq, MapSeq, TakeSeq, ...) accept iter.Seq[T] and are suitable for slices, custom generators, or any source that does not yield errors.

  • Error-aware variants (Filter, Map, Take, ...) accept iter.Seq2[T, error] and are suitable for pipelines that start from vortex/sources. Errors pass through the pipeline untouched — fn is only called on valid values.

Plain iter.Seq example

ctx := context.Background()
numbers := slices.Values([]int{1, 2, 3, 4, 5})

for v := range iterx.FilterSeq(ctx, numbers, func(n int) bool {
	return n%2 == 0
}) {
	fmt.Println(v) // 2, 4
}

iter.Seq2 pipeline example (with vortex/sources)

ctx := context.Background()
file, _ := os.Open("users.csv")
defer file.Close()

rows     := sources.CSVRows(ctx, file)
filtered := iterx.Filter(ctx, rows, func(row []string) bool { return row[2] == "active" })
taken    := iterx.Take(ctx, filtered, 10)

for row, err := range taken {
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(row)
}

Memory notes

All functions stream values one at a time except Reverse, which buffers the full sequence in memory before yielding results. Reverse is not suitable for infinite or very large sequences.

Context cancellation

All functions check ctx.Err() at every iteration. Pipelines cancel cleanly the moment the context is cancelled — no goroutines are leaked and no extra items are processed.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Chunk

func Chunk[T any](ctx context.Context, seq iter.Seq2[T, error], n int) iter.Seq2[[]T, error]

Chunk splits a sequence into slices of size n. Errors from the underlying sequence are passed through untouched, and they yield an empty batch with the error.

Example

ExampleChunk demonstrates how to process data in batches, such as for bulk database inserts or writing to chunked APIs.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	logs := slices.Values([]string{"log1", "log2", "log3", "log4", "log5"})

	for batch := range iterx.ChunkSeq(ctx, logs, 2) {
		fmt.Printf("Batch size: %d, items: %v\n", len(batch), batch)
	}
}
Output:
Batch size: 2, items: [log1 log2]
Batch size: 2, items: [log3 log4]
Batch size: 1, items: [log5]

func ChunkSeq added in v1.0.0

func ChunkSeq[T any](ctx context.Context, seq iter.Seq[T], n int) iter.Seq[[]T]

ChunkSeq splits a sequence into slices of size n.

func Contains

func Contains[T comparable](ctx context.Context, seq iter.Seq2[T, error], target T) (bool, error)

Contains returns true if the sequence contains the target value. It returns an error if the context is canceled or the underlying sequence yields an error.

Example

ExampleContains demonstrates checking if an item exists efficiently, as the iterator stops processing as soon as it finds a match.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	users := slices.Values([]string{"alice", "bob", "charlie", "admin"})

	hasAdmin := iterx.ContainsSeq(ctx, users, "admin")
	fmt.Println("Has admin:", hasAdmin)
}
Output:
Has admin: true

func ContainsSeq added in v1.0.0

func ContainsSeq[T comparable](ctx context.Context, seq iter.Seq[T], target T) bool

ContainsSeq returns true if the sequence contains the target value.

func Distinct

func Distinct[T comparable](ctx context.Context, seq iter.Seq2[T, error]) iter.Seq2[T, error]

Distinct filters out duplicate values keeping only the first occurrence. Errors from the underlying sequence are passed through untouched.

Example

ExampleDistinct demonstrates removing duplicates from an incoming stream, such as a sequence of IP addresses.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	ipAddresses := slices.Values([]string{"192.168.1.1", "10.0.0.1", "192.168.1.1", "10.0.0.2"})

	for ip := range iterx.DistinctSeq(ctx, ipAddresses) {
		fmt.Println(ip)
	}
}
Output:
192.168.1.1
10.0.0.1
10.0.0.2

func DistinctSeq added in v1.0.0

func DistinctSeq[T comparable](ctx context.Context, seq iter.Seq[T]) iter.Seq[T]

DistinctSeq filters out duplicate values keeping only the first occurrence.

func Drain

func Drain[T any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) error) error

Drain consumes a sequence and calls fn for each item. Stops immediately if ctx is cancelled, fn returns an error, or the underlying sequence yields an error. Use Drain when your terminal operation can fail — writing to CSV, DB, files. Use ForEach when your terminal operation cannot fail — logging, printing.

Example

ExampleDrain demonstrates exhausting a sequence when the terminal operation can fail, like writing to an io.Writer.

package main

import (
	"context"
	"fmt"
	"slices"
	"strings"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	lines := slices.Values([]string{"header", "data 1", "data 2"})

	// Simulate writing to something that could fail
	var out strings.Builder

	err := iterx.DrainSeq(ctx, lines, func(line string) error {
		_, err := out.WriteString(line + "\n")
		return err // stops early if err != nil
	})

	fmt.Printf("Error: %v, Output:\n%s", err, out.String())
}
Output:
Error: <nil>, Output:
header
data 1
data 2

func DrainSeq added in v1.0.0

func DrainSeq[T any](ctx context.Context, seq iter.Seq[T], fn func(T) error) error

DrainSeq consumes a sequence and calls fn for each item. Stops immediately if ctx is cancelled or fn returns an error. Use DrainSeq when your terminal operation can fail — writing to CSV, DB, files. Use ForEachSeq when your terminal operation cannot fail — logging, printing.

func Filter

func Filter[T any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) bool) iter.Seq2[T, error]

Filter returns a new sequence containing only elements where fn returns true. Errors from the underlying sequence are passed through untouched.

Example

ExampleFilter demonstrates removing unwanted items from a stream.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	numbers := slices.Values([]int{1, 2, 3, 4, 5, 6})

	evens := iterx.FilterSeq(ctx, numbers, func(n int) bool {
		return n%2 == 0
	})

	for v := range evens {
		fmt.Println(v)
	}
}
Output:
2
4
6

func FilterSeq added in v1.0.0

func FilterSeq[T any](ctx context.Context, seq iter.Seq[T], fn func(T) bool) iter.Seq[T]

FilterSeq returns a new sequence containing only elements where fn returns true.

func FlatMap

func FlatMap[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) iter.Seq2[U, error]) iter.Seq2[U, error]

FlatMap transforms each element into a sequence, then flattens all sequences. Errors from the underlying sequence or inner sequences are passed through.

Example

ExampleFlatMap demonstrates expanding a single item into multiple items. In this case: expanding a user into a stream of their roles.

package main

import (
	"context"
	"fmt"
	"iter"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

type DemoUser struct {
	Name  string
	Roles []string
}

func main() {
	ctx := context.Background()
	users := slices.Values([]DemoUser{
		{Name: "Alice", Roles: []string{"admin", "editor"}},
		{Name: "Bob", Roles: []string{"viewer"}},
	})

	roles := iterx.FlatMapSeq(ctx, users, func(u DemoUser) iter.Seq[string] {
		return slices.Values(u.Roles)
	})

	for role := range roles {
		fmt.Println(role)
	}
}
Output:
admin
editor
viewer

func FlatMapSeq added in v1.0.0

func FlatMapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) iter.Seq[U]) iter.Seq[U]

FlatMapSeq transforms each element into a sequence, then flattens all sequences.

func Flatten

func Flatten[T any](ctx context.Context, seq iter.Seq2[[]T, error]) iter.Seq2[T, error]

Flatten converts a sequence of slices into a flat sequence of elements. Errors from the underlying sequence are passed through untouched.

Example

ExampleFlatten demonstrates unrolling an iterator of slices into a flat iterator.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	batches := slices.Values([][]int{
		{1, 2},
		{3},
		{4, 5, 6},
	})

	for v := range iterx.FlattenSeq(ctx, batches) {
		fmt.Println(v)
	}
}
Output:
1
2
3
4
5
6

func FlattenSeq added in v1.0.0

func FlattenSeq[T any](ctx context.Context, seq iter.Seq[[]T]) iter.Seq[T]

FlattenSeq converts a sequence of slices into a flat sequence of elements.

func ForEach

func ForEach[T any](ctx context.Context, seq iter.Seq2[T, error], fn func(T)) error

ForEach calls fn for every element in the sequence. Stops immediately and returns the error if the context is cancelled or the underlying sequence yields an error.

Example

ExampleForEach demonstrates running a non-failing side effect on every item.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	messages := slices.Values([]string{"hello", "world"})

	iterx.ForEachSeq(ctx, messages, func(msg string) {
		fmt.Println("Processed:", msg)
	})
}
Output:
Processed: hello
Processed: world

func ForEachSeq added in v1.0.0

func ForEachSeq[T any](ctx context.Context, seq iter.Seq[T], fn func(T))

ForEachSeq calls fn for every element in the sequence.

func Map

func Map[T, U any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) U) iter.Seq2[U, error]

Map transforms each element using fn. Errors from the underlying sequence are passed through untouched.

Example

ExampleMap demonstrates transforming items from one type to another.

package main

import (
	"context"
	"fmt"
	"slices"
	"strings"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

type DemoUser struct {
	Name  string
	Roles []string
}

func main() {
	ctx := context.Background()
	users := slices.Values([]DemoUser{
		{Name: "Alice"},
		{Name: "Bob"},
	})

	names := iterx.MapSeq(ctx, users, func(u DemoUser) string {
		return strings.ToUpper(u.Name) // U is string
	})

	for name := range names {
		fmt.Println(name)
	}
}
Output:
ALICE
BOB

func MapSeq added in v1.0.0

func MapSeq[T, U any](ctx context.Context, seq iter.Seq[T], fn func(T) U) iter.Seq[U]

MapSeq transforms each element using fn.

func Reverse

func Reverse[T any](ctx context.Context, seq iter.Seq2[T, error]) iter.Seq2[T, error]

Reverse collects the sequence into memory and yields it in reverse order. Any errors encountered during collection or yielding are passed through inline.

Example

ExampleReverse demonstrates yielding the sequence in reverse order.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	steps := slices.Values([]string{"step 1", "step 2", "step 3"})

	for step := range iterx.ReverseSeq(ctx, steps) {
		fmt.Println(step)
	}
}
Output:
step 3
step 2
step 1

func ReverseSeq added in v1.0.0

func ReverseSeq[T any](ctx context.Context, seq iter.Seq[T]) iter.Seq[T]

ReverseSeq collects the sequence into memory and yields it in reverse order.

func Take

func Take[T any](ctx context.Context, seq iter.Seq2[T, error], n int) iter.Seq2[T, error]

Take returns the first n elements. Errors from the underlying sequence are passed through untouched, and do not count towards n.

Example

ExampleTake demonstrates grabbing just the first N elements. Particularly useful with endless streams or large files.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	items := slices.Values([]int{10, 20, 30, 40, 50, 60})

	top3 := iterx.TakeSeq(ctx, items, 3)

	for v := range top3 {
		fmt.Println(v)
	}
}
Output:
10
20
30

func TakeSeq added in v1.0.0

func TakeSeq[T any](ctx context.Context, seq iter.Seq[T], n int) iter.Seq[T]

TakeSeq returns the first n elements.

func TakeWhile

func TakeWhile[T any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) bool) iter.Seq2[T, error]

TakeWhile yields values from the sequence as long as fn returns true. Iteration stops as soon as fn evaluates to false for the first time. Errors from the underlying sequence are passed through untouched and do not cause stopping.

Example

ExampleTakeWhile demonstrates reading a stream until a condition stops being met. For instance: reading lines from a log until a marker is reached.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	logs := slices.Values([]string{"ok", "ok", "ok", "error", "ok"})

	for v := range iterx.TakeWhileSeq(ctx, logs, func(s string) bool {
		return s != "error"
	}) {
		fmt.Println(v)
	}
}
Output:
ok
ok
ok

func TakeWhileSeq added in v1.0.0

func TakeWhileSeq[T any](ctx context.Context, seq iter.Seq[T], fn func(T) bool) iter.Seq[T]

TakeWhileSeq yields values from the sequence as long as fn returns true. Iteration stops as soon as fn evaluates to false for the first time.

func Validate

func Validate[T any](ctx context.Context, seq iter.Seq2[T, error], fn func(T) (bool, string), onError func(ValidationError[T])) iter.Seq2[T, error]

Validate conditionally streams elements by evaluating fn(item). If fn yields {false, reason}, the onError callback is triggered with a ValidationError, and the element is discarded from the resulting sequence.

Example

ExampleValidate demonstrates checking structures and extracting validation metadata on failing entries without stopping the processing pipeline.

package main

import (
	"context"
	"fmt"
	"slices"
	"strings"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	emails := slices.Values([]string{"test@example.com", "invalid-email", "hello@world.com"})

	valid := iterx.ValidateSeq(ctx, emails,
		func(email string) (bool, string) {
			if !strings.Contains(email, "@") {
				return false, "missing @"
			}
			return true, ""
		},
		func(err iterx.ValidationError[string]) {
			fmt.Printf("Validation error: %s - %s\n", err.Item, err.Reason)
		},
	)

	for email := range valid {
		fmt.Println("Processed valid email:", email)
	}
}
Output:
Processed valid email: test@example.com
Validation error: invalid-email - missing @
Processed valid email: hello@world.com

func ValidateSeq added in v1.0.0

func ValidateSeq[T any](ctx context.Context, seq iter.Seq[T], fn func(T) (bool, string), onError func(ValidationError[T])) iter.Seq[T]

ValidateSeq conditionally streams elements by evaluating fn(item). If fn yields {false, reason}, the onError callback is triggered with a ValidationError, and the element is discarded from the resulting sequence.

func Zip

func Zip[A, B any](ctx context.Context, a iter.Seq2[A, error], b iter.Seq2[B, error]) iter.Seq2[[2]any, error]

Zip combines two sequences into pairs, yielding [2]any{a, b} for each corresponding valid element. It skips errors from both sequences independently, yielding the errors directly in the stream. It stops as soon as either sequence runs out of valid elements safely.

Example

ExampleZip demonstrates combining two synchronised streams into pairs.

package main

import (
	"context"
	"fmt"
	"slices"

	"github.com/MostafaMagdSalama/vortex/iterx"
)

func main() {
	ctx := context.Background()
	keys := slices.Values([]string{"A", "B", "C"})
	values := slices.Values([]int{100, 200}) // Notice: shorter sequence!

	for pair := range iterx.ZipSeq(ctx, keys, values) {
		fmt.Printf("%v: %v\n", pair[0], pair[1])
	}
}
Output:
A: 100
B: 200

func ZipSeq added in v1.0.0

func ZipSeq[A, B any](ctx context.Context, a iter.Seq[A], b iter.Seq[B]) iter.Seq[[2]any]

ZipSeq combines two sequences into pairs, yielding [2]any{a, b} for each corresponding element. It stops as soon as the shortest sequence runs out.

Types

type ValidationError

type ValidationError[T any] struct {
	Item   T
	Reason string
}

ValidationError represents an item that failed validation.

func (ValidationError[T]) Error added in v1.0.0

func (e ValidationError[T]) Error() string

Error implements the error interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL