Documentation ¶
Overview ¶
Package sources provides lazy, context-aware iter.Seq values for common input sources such as readers, files, stdin, and SQL queries.
The sources package is useful when you want to turn external data into sequences that can be consumed directly or composed with the interx and parallel packages. It lets you process CSV rows, lines, and database results incrementally instead of loading entire inputs into memory up front.
Example:
ctx := context.Background()
input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
for row := range sources.CSVRows(ctx, input) {
	fmt.Println(row[0], row[1])
}
These sequences read incrementally and stop once the context is cancelled. File and database helpers acquire their resources when iteration starts, not when the sequence is created. The silent variants drop read or scan errors, while the WithError variants surface them through iter.Seq2.
Index ¶
- func CSVRows(ctx context.Context, r io.Reader) iter.Seq[[]string]
- func CSVRowsWithError(ctx context.Context, r io.Reader) iter.Seq2[[]string, error]
- func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error), ...) iter.Seq2[T, error]
- func FileLines(ctx context.Context, path string) iter.Seq[string]
- func FileLinesWithError(ctx context.Context, path string) iter.Seq2[string, error]
- func Lines(ctx context.Context, r io.Reader) iter.Seq[string]
- func LinesWithError(ctx context.Context, r io.Reader) iter.Seq2[string, error]
- func Stdin(ctx context.Context) iter.Seq[string]
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CSVRows ¶
CSVRows returns a lazy sequence of rows from a CSV reader.
Example ¶
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
	for row := range sources.CSVRows(context.Background(), input) {
		fmt.Println(row[0], row[1])
	}
}

Output:
name age
Alice 30
Bob 25
Example (Filter) ¶
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)

// Skip the header row and print the names of active users.
first := true
for row := range sources.CSVRows(context.Background(), r) {
	if first {
		first = false
		continue
	}
	if row[2] == "active" {
		fmt.Println(row[0])
	}
}

Output:
Alice
Charlie
Example (Pipeline) ¶
input := "name,email,status\nAlice,alice@example.com,active\nBob,bob@example.com,inactive\nCharlie,charlie@example.com,active\n"
r := strings.NewReader(input)
ctx := context.Background()

rows := sources.CSVRows(ctx, r)

// Drop the header: the filter rejects only the first row it sees.
first := true
dataRows := interx.Filter(ctx, rows, func(row []string) bool {
	if first {
		first = false
		return false
	}
	return true
})

// Keep active users, then project each row to its name column.
names := interx.Map(ctx,
	interx.Filter(ctx, dataRows, func(row []string) bool {
		return row[2] == "active"
	}),
	func(row []string) string {
		return row[0]
	},
)

for name := range names {
	fmt.Println(name)
}

Output:
Alice
Charlie
func CSVRowsWithError ¶
CSVRowsWithError is like CSVRows but surfaces read errors to the caller.
Example ¶
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
	for row, err := range sources.CSVRowsWithError(context.Background(), input) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(row[0], row[1])
	}
}

Output:
name age
Alice 30
Bob 25
func DBRows ¶
func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error), args ...any) iter.Seq2[T, error]
DBRows returns a lazy sequence of scanned rows from a SQL query. It accepts optional query arguments through the variadic args parameter. Always check the error: a non-nil error means iteration stopped early because of a scan failure, a driver error, or context cancellation.
Example without args:
for u, err := range sources.DBRows(ctx, db, "SELECT * FROM users", scan) {
	if err != nil {
		log.Println(err)
		return
	}
	process(u)
}
Example with args:
for u, err := range sources.DBRows(ctx, db,
	"SELECT * FROM users WHERE status = ?",
	scan,
	"active",
) {
	if err != nil {
		log.Println(err)
		return
	}
	process(u)
}
func FileLines ¶
FileLines opens a file and returns a lazy sequence of its lines.
Example ¶
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	for line := range sources.FileLines(context.Background(), filepath.Join("testdata", "sample.txt")) {
		fmt.Println(line)
	}
}

Output:
hello
world
foo
func FileLinesWithError ¶
FileLinesWithError is like FileLines but surfaces errors.
Example ¶
package main

import (
	"context"
	"fmt"
	"path/filepath"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	for line, err := range sources.FileLinesWithError(context.Background(), filepath.Join("testdata", "sample.txt")) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(line)
	}
}

Output:
hello
world
foo
func Lines ¶
Lines returns a lazy sequence of lines from any io.Reader.
Example ¶
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("line1\nline2\nline3\n")
	for line := range sources.Lines(context.Background(), input) {
		fmt.Println(line)
	}
}

Output:
line1
line2
line3
func LinesWithError ¶
LinesWithError is like Lines but surfaces read errors and oversized lines.
Example ¶
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	input := strings.NewReader("line1\nline2\n")
	for line, err := range sources.LinesWithError(context.Background(), input) {
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(line)
	}
}

Output:
line1
line2
func Stdin ¶
Stdin returns a lazy sequence of lines from standard input.
Example ¶
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/MostafaMagdSalama/vortex/sources"
)

func main() {
	tmpFile, err := os.CreateTemp("", "vortex-stdin-example-*")
	if err != nil {
		return
	}
	defer os.Remove(tmpFile.Name())
	defer tmpFile.Close()

	if _, err := tmpFile.WriteString("one\ntwo\n"); err != nil {
		return
	}
	if _, err := tmpFile.Seek(0, 0); err != nil {
		return
	}

	oldStdin := os.Stdin
	os.Stdin = tmpFile
	defer func() { os.Stdin = oldStdin }()

	for line := range sources.Stdin(context.Background()) {
		fmt.Println(line)
	}
}

Output:
one
two
Types ¶
This section is empty.