csv

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2025 · License: MIT · Imports: 21 · Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewCSVSource

func NewCSVSource(config *config.BaseConfig) (core.Source, error)

NewCSVSource creates a new CSV source connector

func NewSimpleCSVSource

func NewSimpleCSVSource(config *config.BaseConfig) (core.Source, error)

NewSimpleCSVSource creates a new simple CSV source.

Deprecated: Use NewCSVSource instead. This will be removed in v3.0.0.

Types

type CSVChunk

type CSVChunk struct {
	ID       int
	Lines    []string
	StartRow int
}

CSVChunk represents a chunk of CSV data to be processed

type CSVSource

type CSVSource struct {
	*base.BaseConnector
	// contains filtered or unexported fields
}

CSVSource is a production-ready CSV source connector using BaseConnector

func (*CSVSource) Close

func (s *CSVSource) Close(ctx context.Context) error

Close closes the CSV source connector

func (*CSVSource) Discover

func (s *CSVSource) Discover(ctx context.Context) (*core.Schema, error)

Discover returns the schema of the CSV file

func (*CSVSource) Initialize

func (s *CSVSource) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize initializes the CSV source connector

func (*CSVSource) Read

func (s *CSVSource) Read(ctx context.Context) (*core.RecordStream, error)

Read returns a stream of records from the CSV file

func (*CSVSource) ReadBatch

func (s *CSVSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)

ReadBatch returns batches of records from the CSV file

func (*CSVSource) Subscribe

func (s *CSVSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)

Subscribe is not supported for CSV files; it is a CDC-only method.

func (*CSVSource) SupportsBatch

func (s *CSVSource) SupportsBatch() bool

SupportsBatch returns true since CSV supports batch reads

func (*CSVSource) SupportsIncremental

func (s *CSVSource) SupportsIncremental() bool

SupportsIncremental returns true since CSV can support incremental reads

func (*CSVSource) SupportsRealtime

func (s *CSVSource) SupportsRealtime() bool

SupportsRealtime returns false since CSV is not real-time

type ParallelCSVConfig

type ParallelCSVConfig struct {
	NumWorkers int // Number of parallel workers (0 = auto)
	ChunkSize  int // Lines per chunk
	Headers    []string
	SkipHeader bool
	Delimiter  rune
	ParseFunc  func([]string) (*models.Record, error)
}

ParallelCSVConfig configures the parallel CSV parser

type ParallelCSVParser

type ParallelCSVParser struct {
	// contains filtered or unexported fields
}

ParallelCSVParser implements parallel CSV parsing for improved performance

func NewParallelCSVParser

func NewParallelCSVParser(config ParallelCSVConfig, logger *zap.Logger) *ParallelCSVParser

NewParallelCSVParser creates a new parallel CSV parser

func (*ParallelCSVParser) GetMetrics

func (p *ParallelCSVParser) GetMetrics() (rowsParsed, errors int64)

GetMetrics returns parsing metrics

func (*ParallelCSVParser) ParseFile

func (p *ParallelCSVParser) ParseFile(reader io.Reader) (<-chan *models.Record, <-chan error)

ParseFile parses a CSV file in parallel

func (*ParallelCSVParser) Stop

func (p *ParallelCSVParser) Stop()

Stop stops the parallel parser

type ParallelCSVTransform

type ParallelCSVTransform struct {
	// contains filtered or unexported fields
}

ParallelCSVTransform applies transformations to CSV records in parallel

func NewParallelCSVTransform

func NewParallelCSVTransform(transform func(*models.Record) (*models.Record, error), numWorkers int, logger *zap.Logger) *ParallelCSVTransform

NewParallelCSVTransform creates a transform that processes records in parallel

func (*ParallelCSVTransform) Process

func (t *ParallelCSVTransform) Process(ctx context.Context, input <-chan *models.Record) <-chan *models.Record

Process processes records in parallel while maintaining order

type SimpleCSVSource

type SimpleCSVSource struct {
	// contains filtered or unexported fields
}

SimpleCSVSource is a minimal CSV source implementation

func (*SimpleCSVSource) Close

func (s *SimpleCSVSource) Close(ctx context.Context) error

Close closes the CSV source

func (*SimpleCSVSource) Discover

func (s *SimpleCSVSource) Discover(ctx context.Context) (*core.Schema, error)

Discover discovers available streams/tables

func (*SimpleCSVSource) DiscoverSchema

func (s *SimpleCSVSource) DiscoverSchema(ctx context.Context) (*core.Schema, error)

DiscoverSchema discovers the schema of the CSV file

func (*SimpleCSVSource) GetPosition

func (s *SimpleCSVSource) GetPosition() core.Position

GetPosition returns the current position

func (*SimpleCSVSource) GetState

func (s *SimpleCSVSource) GetState() core.State

GetState returns the current state

func (*SimpleCSVSource) Health

func (s *SimpleCSVSource) Health(ctx context.Context) error

Health checks the health of the source

func (*SimpleCSVSource) Initialize

func (s *SimpleCSVSource) Initialize(ctx context.Context, config *config.BaseConfig) error

Initialize initializes the CSV source

func (*SimpleCSVSource) LoadState

func (s *SimpleCSVSource) LoadState(ctx context.Context) (core.State, error)

LoadState loads the saved state

func (*SimpleCSVSource) Metrics

func (s *SimpleCSVSource) Metrics() map[string]interface{}

Metrics returns metrics for the source

func (*SimpleCSVSource) Read

Read reads records from the CSV file

func (*SimpleCSVSource) ReadBatch

func (s *SimpleCSVSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)

ReadBatch reads records in batches

func (*SimpleCSVSource) SaveState

func (s *SimpleCSVSource) SaveState(ctx context.Context, state core.State) error

SaveState saves the current state

func (*SimpleCSVSource) SetPosition

func (s *SimpleCSVSource) SetPosition(position core.Position) error

SetPosition sets the current position

func (*SimpleCSVSource) SetState

func (s *SimpleCSVSource) SetState(state core.State) error

SetState sets the current state

func (*SimpleCSVSource) Subscribe

func (s *SimpleCSVSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)

Subscribe subscribes to real-time changes

func (*SimpleCSVSource) SupportsBatch

func (s *SimpleCSVSource) SupportsBatch() bool

SupportsBatch returns whether the source supports batch reading

func (*SimpleCSVSource) SupportsIncremental

func (s *SimpleCSVSource) SupportsIncremental() bool

SupportsIncremental returns whether the source supports incremental sync

func (*SimpleCSVSource) SupportsRealtime

func (s *SimpleCSVSource) SupportsRealtime() bool

SupportsRealtime returns whether the source supports real-time updates

func (*SimpleCSVSource) SupportsStreaming

func (s *SimpleCSVSource) SupportsStreaming() bool

SupportsStreaming returns whether the source supports streaming

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL