package source
v0.2.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 18, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Functions

func NewReaderJSON

func NewReaderJSON(path string, r io.Reader) iosystem.Source

NewReaderJSON creates a Source that yields a single document from the given reader with content type application/json. Use this when the reader contains structured JSON data.

Types

type None

type None struct {
	// contains filtered or unexported fields
}

func NewNone

func NewNone() *None

NewNone creates a Source that yields a single empty document. It takes no reader and holds no resources.

func (*None) Close

func (s *None) Close() error

Close does nothing since None doesn't own any resources.

func (*None) Next

func (s *None) Next(ctx context.Context) (*iosystem.Document, error)

Next returns a single empty document on the first call and io.EOF on subsequent calls.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader wraps an io.Reader as a single-document Source. This is useful for integrating with spool.ForEach or other scenarios where you have an io.Reader and want to use it with Pipeline.

Example with spool:

spool.ForEach(ctx, root, func(ctx, path, r, w) error {
    // Parameter types omitted for brevity.
    src := source.NewReader(path, r)
    snk := sink.NewWriterSink(w)
    return pipeline.Run(ctx, src, snk)
})

func NewReader

func NewReader(path string, r io.Reader) *Reader

NewReader creates a Source that yields a single document from the given reader.

func NewStdin

func NewStdin() *Reader

NewStdin creates a source that reads from os.Stdin.

func (*Reader) Close

func (s *Reader) Close() error

Close does nothing since Reader doesn't own the underlying io.Reader. The reader's lifecycle is managed by the caller (e.g., spool).

func (*Reader) Next

func (s *Reader) Next(ctx context.Context) (*iosystem.Document, error)

Next returns the document on first call, then io.EOF.

type Storage added in v0.2.0

type Storage struct {
	// contains filtered or unexported fields
}

Storage traverses a filesystem directory and yields a document for each file. It uses fs.WalkDir in a background goroutine for progressive file discovery. The goroutine is lazily initialized on the first call to Next(). Directories are skipped; only regular files are yielded as documents.

func NewStorage added in v0.2.0

func NewStorage(storage storage.Storage, prefix iosystem.Key) (*Storage, error)

NewStorage creates a source that progressively walks a filesystem directory. The storage parameter should support directory listing in the style of fs.ReadDirFS (e.g., a value from lfs.New or stream.NewStorage). The prefix parameter specifies the starting directory path. For directories, the path should end with `/` as per stream library conventions.

The walk operation is lazily initialized on the first call to Next(), running in a background goroutine that feeds file paths through a buffered channel. This means:

  • No startup delay: NewStorage returns immediately
  • Directory validation is deferred until Next() is called
  • Memory efficient: at most 100 paths (the channel buffer size) are held in memory at a time
  • Works with both local filesystem (lfs) and S3 (stream)

func (*Storage) Close added in v0.2.0

func (w *Storage) Close() error

Close signals the background walker to stop and drains the path channel to prevent goroutine leaks. It's safe to call Close() even if Next() was never called.

func (*Storage) Next added in v0.2.0

func (w *Storage) Next(ctx context.Context) (*iosystem.Document, error)

Next returns the next file document or io.EOF when all files are read. Files are wrapped in an auto-closer to prevent descriptor leaks. The first call to Next() lazily initializes the background walker goroutine.
