package source

v0.1.13
Published: Jan 3, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewFile

func NewFile(fsys fs.FS, paths ...string) (iosystem.Source, error)

NewFile creates a source that reads files from the given filesystem. The fs.FS parameter is provided by the client (e.g., lfs.FS, stream.FS). Paths are passed as-is to fs.Open(); the client is responsible for proper formatting.

func NewReaderJSON

func NewReaderJSON(path string, r io.Reader) iosystem.Source

NewReaderJSON creates a Source that yields a single document from the given reader with content type application/json. Use this when the reader contains structured JSON data.

func NewUnion

func NewUnion(sources ...iosystem.Source) (iosystem.Source, error)

NewUnion creates a source that merges multiple sources into one document.

Types

type FS

type FS struct {
	// contains filtered or unexported fields
}

FS traverses a filesystem directory and yields documents for each file. It uses fs.WalkDir in a background goroutine for progressive file discovery. The goroutine is lazily initialized on the first call to Next(). Directories are skipped; only regular files are yielded as documents.

func NewFS

func NewFS(fsys fs.FS, dir string) (*FS, error)

NewFS creates a source that walks through a filesystem directory progressively. The fs.FS parameter should implement fs.ReadDirFS (e.g., from lfs.New or stream.NewFS). The dir parameter specifies the starting directory path. For directories, the path should end with `/` as per stream library conventions.

The walk operation is lazily initialized on the first call to Next(), running in a background goroutine that feeds file paths through a buffered channel. This means:

  • No startup delay: NewFS returns immediately
  • Directory validation is deferred until Next() is called
  • Memory efficient: at most the buffer size (100 paths) is held in memory at a time
  • Works with both local filesystem (lfs) and S3 (stream)

func (*FS) Close

func (w *FS) Close() error

Close signals the background walker to stop and drains the path channel to prevent goroutine leaks. It's safe to call Close() even if Next() was never called.

func (*FS) Next

func (w *FS) Next(ctx context.Context) (*iosystem.Document, error)

Next returns the next file document or io.EOF when all files are read. Files are wrapped in an auto-closer to prevent descriptor leaks. The first call to Next() lazily initializes the background walker goroutine.

type File

type File struct {
	// contains filtered or unexported fields
}

File reads one or more files sequentially from a filesystem. A single file is a special case where paths contains one element.

func (*File) Close

func (f *File) Close() error

Close implements iosystem.Source.

func (*File) Next

func (f *File) Next(ctx context.Context) (*iosystem.Document, error)

Next returns the next file document or io.EOF when all files are read. Files are wrapped in an auto-closer to prevent descriptor leaks.

type None

type None struct {
	// contains filtered or unexported fields
}

func NewNone

func NewNone() *None

NewNone creates a Source that yields a single empty document.

func (*None) Close

func (s *None) Close() error

Close does nothing since None doesn't own any resources.

func (*None) Next

func (s *None) Next(ctx context.Context) (*iosystem.Document, error)

Next returns a single empty document on the first call and io.EOF on subsequent calls.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader wraps an io.Reader as a single-document Source. This is useful for integrating with spool.ForEach or other scenarios where you have an io.Reader and want to use it with Pipeline.

Example with spool:

spool.ForEach(ctx, root, func(ctx, path, r, w) error {
    source := source.NewReader(path, r)
    sink := sink.NewWriterSink(w)
    return pipeline.Run(ctx, source, sink)
})

func NewReader

func NewReader(path string, r io.Reader) *Reader

NewReader creates a Source that yields a single document from the given reader.

func NewStdin

func NewStdin() *Reader

NewStdin creates a source that reads from os.Stdin.

func (*Reader) Close

func (s *Reader) Close() error

Close does nothing since Reader doesn't own the reader. The reader lifecycle is managed by the caller (e.g., spool).

func (*Reader) Next

func (s *Reader) Next(ctx context.Context) (*iosystem.Document, error)

Next returns the document on first call, then io.EOF.

type Union

type Union struct {
	// contains filtered or unexported fields
}

Union combines multiple sources into a single document. It reads all documents from all sources and concatenates their content. This is useful for --merge mode where multiple files should be processed as one.

func (*Union) Close

func (m *Union) Close() error

Close closes all underlying sources.

func (*Union) Next

func (m *Union) Next(ctx context.Context) (*iosystem.Document, error)

Next returns a single merged document containing all content from all sources on the first call, and io.EOF on subsequent calls.
