parser

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CSVConfig

type CSVConfig struct {
	Delimiter      string `json:"delimiter"`       // Default: ","
	HasHeader      bool   `json:"has_header"`      // Default: true
	SkipRows       int    `json:"skip_rows"`       // Number of rows to skip at the beginning
	QuoteCharacter string `json:"quote_character"` // Default: "\""
}

CSVConfig holds CSV-specific parsing configuration

type CSVParser

type CSVParser struct {
	// contains filtered or unexported fields
}

CSVParser implements the Parser interface for CSV files

func NewCSVParser

func NewCSVParser(config CSVConfig, stream *types.Stream) *CSVParser

NewCSVParser creates a new CSV parser with the given configuration

func (*CSVParser) InferSchema

func (p *CSVParser) InferSchema(_ context.Context, reader io.Reader) (*types.Stream, error)

InferSchema reads the first few rows of a CSV file to infer the schema. Uses small samples to avoid loading the entire file into memory.

func (*CSVParser) StreamRecords

func (p *CSVParser) StreamRecords(ctx context.Context, reader io.Reader, callback RecordCallback) error

StreamRecords reads and streams CSV records with context support

type JSONConfig

type JSONConfig struct {
	LineDelimited bool `json:"line_delimited"` // Default: true (JSONL format)
}

JSONConfig holds JSON-specific parsing configuration

type JSONParser

type JSONParser struct {
	// contains filtered or unexported fields
}

JSONParser implements the Parser interface for JSON files

func NewJSONParser

func NewJSONParser(config JSONConfig, stream *types.Stream) *JSONParser

NewJSONParser creates a new JSON parser with the given configuration

func (*JSONParser) InferSchema

func (p *JSONParser) InferSchema(_ context.Context, reader io.Reader) (*types.Stream, error)

InferSchema reads the first few records of a JSON file to infer the schema. Supports JSONL (line-delimited), JSON Array, and single JSON object formats.

func (*JSONParser) StreamRecords

func (p *JSONParser) StreamRecords(ctx context.Context, reader io.Reader, callback RecordCallback) error

StreamRecords reads and streams JSON records with context support

type ParquetConfig

type ParquetConfig struct {
	StreamingEnabled bool `json:"streaming_enabled"` // Default: true - use range requests
}

ParquetConfig holds Parquet-specific parsing configuration

type ParquetParser

type ParquetParser struct {
	// contains filtered or unexported fields
}

ParquetParser implements the Parser interface for Parquet files. Note: Parquet schema inference doesn't need to read data, just metadata.

func NewParquetParser

func NewParquetParser(config ParquetConfig, stream *types.Stream) *ParquetParser

NewParquetParser creates a new Parquet parser with the given configuration

func (*ParquetParser) InferSchema

func (p *ParquetParser) InferSchema(_ context.Context, reader io.Reader) (*types.Stream, error)

InferSchema reads Parquet file metadata to infer the schema. For Parquet, the schema is stored in file metadata, so we don't need to read data. NOTE: reader must be io.ReaderAt for Parquet (use S3RangeReader or bytes.Reader).

func (*ParquetParser) StreamRecords

func (p *ParquetParser) StreamRecords(ctx context.Context, reader io.Reader, callback RecordCallback) error

StreamRecords reads and streams Parquet records with context support. NOTE: reader must be io.ReaderAt for Parquet (use S3RangeReader or bytes.Reader).

type ParquetReaderWrapper

type ParquetReaderWrapper struct {
	// contains filtered or unexported fields
}

ParquetReaderWrapper wraps an io.ReaderAt with size info and implements io.Seeker. This allows the Parquet parser to determine the file size via Seek. Used when reading from sources like S3 that provide ReaderAt but not Seeker.

func NewParquetReaderWrapper

func NewParquetReaderWrapper(readerAt io.ReaderAt, size int64) *ParquetReaderWrapper

NewParquetReaderWrapper creates a new wrapper for io.ReaderAt that also implements io.Seeker

func (*ParquetReaderWrapper) Read

func (w *ParquetReaderWrapper) Read(p []byte) (n int, err error)

func (*ParquetReaderWrapper) ReadAt

func (w *ParquetReaderWrapper) ReadAt(p []byte, off int64) (n int, err error)

func (*ParquetReaderWrapper) Seek

func (w *ParquetReaderWrapper) Seek(offset int64, whence int) (int64, error)

type Parser

type Parser interface {
	// InferSchema reads a small sample from the reader to infer schema
	// Should not load entire file into memory
	InferSchema(ctx context.Context, reader io.Reader) (*types.Stream, error)

	// StreamRecords reads records from the reader and calls callback for each record
	// Supports context cancellation to prevent resource leaks
	// Batching is handled by the destination layer, not the parser
	StreamRecords(ctx context.Context, reader io.Reader, callback RecordCallback) error
}

Parser defines the interface for file format parsers. This interface separates parsing logic from storage operations (S3, GCS, etc.).

type RecordCallback

type RecordCallback func(ctx context.Context, record map[string]any) error

RecordCallback is called for each record during streaming. Return an error to stop processing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL