sources

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 14, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package sources provides lazy, context-aware iter.Seq values for common input sources such as readers, files, stdin, and SQL queries.

The sources package is useful when you want to turn external data into sequences that can be consumed directly or composed with the interx and parallel packages. It lets you process CSV rows, lines, and database results incrementally instead of loading entire inputs into memory up front.

Example:

ctx := context.Background()
input := strings.NewReader("name,age\nAlice,30\nBob,25\n")

for row := range sources.CSVRows(ctx, input) {
	fmt.Println(row[0], row[1])
}

Any important notes about behaviour such as laziness, context cancellation, memory usage. These sequences read incrementally and stop immediately when the context is cancelled. File and database helpers acquire resources when iteration starts, and silent variants drop read or scan errors while the WithError variants surface them through iter.Seq2.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func CSVRows

func CSVRows(ctx context.Context, r io.Reader) iter.Seq[[]string]

CSVRows returns a lazy sequence of rows from a CSV reader.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("name,age\nAlice,30\nBob,25\n")

	for row := range sources.CSVRows(context.Background(), input) {
		fmt.Println(row[0], row[1])
	}
}
Output:
name age
Alice 30
Bob 25
Example (Filter)
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)

// skip header and filter active users
first := true
for row := range CSVRows(context.Background(), r) {
	if first {
		first = false
		continue
	}
	if row[2] == "active" {
		fmt.Println(row[0])
	}
}
Output:
Alice
Charlie
Example (Pipeline)
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)

ctx := context.Background()

rows := CSVRows(ctx, r)

first := true
dataRows := interx.Filter(ctx, rows, func(row []string) bool {
	if first {
		first = false
		return false
	}
	return true
})

names := interx.Map(ctx,
	interx.Filter(ctx, dataRows, func(row []string) bool {
		return row[2] == "active"
	}),
	func(row []string) string {
		return row[0]
	},
)

for name := range names {
	fmt.Println(name)
}
Output:
Alice
Charlie

func CSVRowsWithError

func CSVRowsWithError(ctx context.Context, r io.Reader) iter.Seq2[[]string, error]

CSVRowsWithError is like CSVRows but surfaces read errors to the caller.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("name,age\nAlice,30\nBob,25\n")

	for row, err := range sources.CSVRowsWithError(context.Background(), input) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(row[0], row[1])
	}
}
Output:
name age
Alice 30
Bob 25

func DBRows

func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error), args ...any) iter.Seq2[T, error]

DBRows returns a lazy sequence of scanned rows from a SQL query. Accepts optional query arguments via variadic args. Always check the error — a non-nil error means iteration stopped early due to a scan failure, driver error, or context cancellation.

example without args:

for u, err := range sources.DBRows(ctx, db, "SELECT * FROM users", scan) {
    if err != nil {
        log.Println(err)
        return
    }
    process(u)
}

example with args:

for u, err := range sources.DBRows(ctx, db,
    "SELECT * FROM users WHERE status = ?",
    scan,
    "active",
) {
    if err != nil {
        log.Println(err)
        return
    }
    process(u)
}

func FileLines

func FileLines(ctx context.Context, path string) iter.Seq[string]

FileLines opens a file and returns a lazy sequence of its lines.

Example
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	for line := range sources.FileLines(context.Background(), filepath.Join("testdata", "sample.txt")) {
		fmt.Println(line)
	}
}
Output:
hello\nworld\nfoo

func FileLinesWithError

func FileLinesWithError(ctx context.Context, path string) iter.Seq2[string, error]

FileLinesWithError is like FileLines but surfaces errors.

Example
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	for line, err := range sources.FileLinesWithError(context.Background(), filepath.Join("testdata", "sample.txt")) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(line)
	}
}
Output:
hello\nworld\nfoo

func Lines

func Lines(ctx context.Context, r io.Reader) iter.Seq[string]

Lines returns a lazy sequence of lines from any io.Reader.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("line1\nline2\nline3\n")

	for line := range sources.Lines(context.Background(), input) {
		fmt.Println(line)
	}
}
Output:
line1
line2
line3

func LinesWithError

func LinesWithError(ctx context.Context, r io.Reader) iter.Seq2[string, error]

LinesWithError is like Lines but surfaces read errors and oversized lines.

Example
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("line1\nline2\n")

	for line, err := range sources.LinesWithError(context.Background(), input) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(line)
	}
}
Output:
line1
line2

func Stdin

func Stdin(ctx context.Context) iter.Seq[string]

Stdin returns a lazy sequence of lines from standard input.

Example
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	tmpFile, err := os.CreateTemp("", "vortex-stdin-example-*")
	if err != nil {
		return
	}
	defer os.Remove(tmpFile.Name())
	defer tmpFile.Close()

	if _, err := tmpFile.WriteString("one\ntwo\n"); err != nil {
		return
	}
	if _, err := tmpFile.Seek(0, 0); err != nil {
		return
	}

	oldStdin := os.Stdin
	os.Stdin = tmpFile
	defer func() { os.Stdin = oldStdin }()

	for line := range sources.Stdin(context.Background()) {
		fmt.Println(line)
	}
}
Output:
one
two

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL