Documentation
¶
Overview ¶
Package sources provides lazy, context-aware iter.Seq values for common input sources such as readers, files, stdin, and SQL queries.
The sources package is useful when you want to turn external data into sequences that can be consumed directly or composed with the iter and parallel packages. It lets you process CSV rows, lines, and database results incrementally instead of loading entire inputs into memory up front.
Example:
ctx := context.Background()
input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
for row := range sources.CSVRows(ctx, input) {
fmt.Println(row[0], row[1])
}
Any important notes about behaviour such as laziness, context cancellation, memory usage. These sequences read incrementally and stop immediately when the context is cancelled. File and database helpers acquire resources when iteration starts, and silent variants drop read or scan errors while the WithError variants surface them through iter.Seq2.
Index ¶
- func CSVRows(ctx context.Context, r io.Reader) iter.Seq[[]string]
- func CSVRowsWithError(ctx context.Context, r io.Reader) iter.Seq2[[]string, error]
- func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error)) iter.Seq[T]
- func DBRowsWithArgs[T any](ctx context.Context, db querier, query string, args []any, ...) iter.Seq[T]
- func FileLines(ctx context.Context, path string) iter.Seq[string]
- func FileLinesWithError(ctx context.Context, path string) iter.Seq2[string, error]
- func Lines(ctx context.Context, r io.Reader) iter.Seq[string]
- func LinesWithError(ctx context.Context, r io.Reader) iter.Seq2[string, error]
- func Stdin(ctx context.Context) iter.Seq[string]
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CSVRows ¶
CSVRows returns a lazy sequence of rows from a CSV reader.
Example ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
for row := range sources.CSVRows(context.Background(), input) {
fmt.Println(row[0], row[1])
}
}
Output: name age Alice 30 Bob 25
func CSVRowsWithError ¶
CSVRowsWithError is like CSVRows but surfaces read errors to the caller.
Example ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader("name,age\nAlice,30\nBob,25\n")
for row, err := range sources.CSVRowsWithError(context.Background(), input) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(row[0], row[1])
}
}
Output: name age Alice 30 Bob 25
func DBRows ¶
func DBRows[T any](ctx context.Context, db querier, query string, scan func(*sql.Rows) (T, error)) iter.Seq[T]
DBRows returns a lazy sequence of scanned rows from a SQL query.
Example ¶
package main
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"io"
"sync"
"github.com/MostafaMagdSalama/vortex/sources"
)
type stubDriver struct{}
type stubConn struct{}
type stubRows struct {
columns []string
data [][]driver.Value
index int
}
var (
registerStubDriver sync.Once
queryCount int
queryArgsCount int
)
func (stubDriver) Open(name string) (driver.Conn, error) {
return stubConn{}, nil
}
func (stubConn) Prepare(query string) (driver.Stmt, error) { return nil, nil }
func (stubConn) Close() error { return nil }
func (stubConn) Begin() (driver.Tx, error) { return nil, nil }
func (stubConn) QueryContext(_ context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
switch query {
case "SELECT name FROM users":
queryCount++
return &stubRows{
columns: []string{"name"},
data: [][]driver.Value{
{"alice"},
{"bob"},
},
}, nil
case "SELECT value FROM numbers WHERE kind = ?":
queryArgsCount++
return &stubRows{
columns: []string{"value"},
data: [][]driver.Value{
{1},
{2},
{3},
},
}, nil
default:
return &stubRows{columns: []string{"value"}}, nil
}
}
func (r *stubRows) Columns() []string {
return r.columns
}
func (r *stubRows) Close() error {
return nil
}
func (r *stubRows) Next(dest []driver.Value) error {
if r.index >= len(r.data) {
return io.EOF
}
copy(dest, r.data[r.index])
r.index++
return nil
}
func openExampleDB() (*sql.DB, error) {
registerStubDriver.Do(func() {
sql.Register("vortex_stub", stubDriver{})
})
return sql.Open("vortex_stub", "")
}
func main() {
db, err := openExampleDB()
if err != nil {
return
}
defer db.Close()
for name := range sources.DBRows(context.Background(), db, "SELECT name FROM users", func(rows *sql.Rows) (string, error) {
var name string
return name, rows.Scan(&name)
}) {
fmt.Println(name)
}
}
Output: alice bob
func DBRowsWithArgs ¶
func DBRowsWithArgs[T any](ctx context.Context, db querier, query string, args []any, scan func(*sql.Rows) (T, error)) iter.Seq[T]
DBRowsWithArgs is like DBRows but accepts query arguments.
Example ¶
package main
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"io"
"sync"
"github.com/MostafaMagdSalama/vortex/sources"
)
type stubDriver struct{}
type stubConn struct{}
type stubRows struct {
columns []string
data [][]driver.Value
index int
}
var (
registerStubDriver sync.Once
queryCount int
queryArgsCount int
)
func (stubDriver) Open(name string) (driver.Conn, error) {
return stubConn{}, nil
}
func (stubConn) Prepare(query string) (driver.Stmt, error) { return nil, nil }
func (stubConn) Close() error { return nil }
func (stubConn) Begin() (driver.Tx, error) { return nil, nil }
func (stubConn) QueryContext(_ context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
switch query {
case "SELECT name FROM users":
queryCount++
return &stubRows{
columns: []string{"name"},
data: [][]driver.Value{
{"alice"},
{"bob"},
},
}, nil
case "SELECT value FROM numbers WHERE kind = ?":
queryArgsCount++
return &stubRows{
columns: []string{"value"},
data: [][]driver.Value{
{1},
{2},
{3},
},
}, nil
default:
return &stubRows{columns: []string{"value"}}, nil
}
}
func (r *stubRows) Columns() []string {
return r.columns
}
func (r *stubRows) Close() error {
return nil
}
func (r *stubRows) Next(dest []driver.Value) error {
if r.index >= len(r.data) {
return io.EOF
}
copy(dest, r.data[r.index])
r.index++
return nil
}
func openExampleDB() (*sql.DB, error) {
registerStubDriver.Do(func() {
sql.Register("vortex_stub", stubDriver{})
})
return sql.Open("vortex_stub", "")
}
func main() {
db, err := openExampleDB()
if err != nil {
return
}
defer db.Close()
for value := range sources.DBRowsWithArgs(context.Background(), db, "SELECT value FROM numbers WHERE kind = ?", []any{"even"}, func(rows *sql.Rows) (int, error) {
var value int
return value, rows.Scan(&value)
}) {
fmt.Println(value)
}
}
Output: 1 2 3
func FileLines ¶
FileLines opens a file and returns a lazy sequence of its lines.
Example ¶
package main
import (
"context"
"fmt"
"path/filepath"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
for line := range sources.FileLines(context.Background(), filepath.Join("testdata", "sample.txt")) {
fmt.Println(line)
}
}
Output: hello\nworld\nfoo
func FileLinesWithError ¶
FileLinesWithError is like FileLines but surfaces errors.
Example ¶
package main
import (
"context"
"fmt"
"path/filepath"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
for line, err := range sources.FileLinesWithError(context.Background(), filepath.Join("testdata", "sample.txt")) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(line)
}
}
Output: hello\nworld\nfoo
func Lines ¶
Lines returns a lazy sequence of lines from any io.Reader.
Example ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader("line1\nline2\nline3\n")
for line := range sources.Lines(context.Background(), input) {
fmt.Println(line)
}
}
Output: line1 line2 line3
func LinesWithError ¶
LinesWithError is like Lines but surfaces read errors and oversized lines.
Example ¶
package main
import (
"context"
"fmt"
"strings"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
input := strings.NewReader("line1\nline2\n")
for line, err := range sources.LinesWithError(context.Background(), input) {
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(line)
}
}
Output: line1 line2
func Stdin ¶
Stdin returns a lazy sequence of lines from standard input.
Example ¶
package main
import (
"context"
"fmt"
"os"
"github.com/MostafaMagdSalama/vortex/sources"
)
func main() {
tmpFile, err := os.CreateTemp("", "vortex-stdin-example-*")
if err != nil {
return
}
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
if _, err := tmpFile.WriteString("one\ntwo\n"); err != nil {
return
}
if _, err := tmpFile.Seek(0, 0); err != nil {
return
}
oldStdin := os.Stdin
os.Stdin = tmpFile
defer func() { os.Stdin = oldStdin }()
for line := range sources.Stdin(context.Background()) {
fmt.Println(line)
}
}
Output: one two
Types ¶
This section is empty.