Documentation
¶
Index ¶
- func NewCSVSource(config *config.BaseConfig) (core.Source, error)
- func NewSimpleCSVSource(config *config.BaseConfig) (core.Source, error)
- type CSVChunk
- type CSVSource
- func (s *CSVSource) Close(ctx context.Context) error
- func (s *CSVSource) Discover(ctx context.Context) (*core.Schema, error)
- func (s *CSVSource) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (s *CSVSource) Read(ctx context.Context) (*core.RecordStream, error)
- func (s *CSVSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
- func (s *CSVSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
- func (s *CSVSource) SupportsBatch() bool
- func (s *CSVSource) SupportsIncremental() bool
- func (s *CSVSource) SupportsRealtime() bool
- type ParallelCSVConfig
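- type ParallelCSVParser
- func NewParallelCSVParser(config ParallelCSVConfig, logger *zap.Logger) *ParallelCSVParser
- func (p *ParallelCSVParser) GetMetrics() (rowsParsed, errors int64)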
- type ParallelCSVTransform
- type SimpleCSVSource
- func (s *SimpleCSVSource) Close(ctx context.Context) error
- func (s *SimpleCSVSource) Discover(ctx context.Context) (*core.Schema, error)
- func (s *SimpleCSVSource) DiscoverSchema(ctx context.Context) (*core.Schema, error)
- func (s *SimpleCSVSource) GetPosition() core.Position
- func (s *SimpleCSVSource) GetState() core.State
- func (s *SimpleCSVSource) Health(ctx context.Context) error
- func (s *SimpleCSVSource) Initialize(ctx context.Context, config *config.BaseConfig) error
- func (s *SimpleCSVSource) LoadState(ctx context.Context) (core.State, error)
- func (s *SimpleCSVSource) Metrics() map[string]interface{}
- func (s *SimpleCSVSource) Read(ctx context.Context) (*core.RecordStream, error)
- func (s *SimpleCSVSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
- func (s *SimpleCSVSource) SaveState(ctx context.Context, state core.State) error
- func (s *SimpleCSVSource) SetPosition(position core.Position) error
- func (s *SimpleCSVSource) SetState(state core.State) error
- func (s *SimpleCSVSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
- func (s *SimpleCSVSource) SupportsBatch() bool
- func (s *SimpleCSVSource) SupportsIncremental() bool
- func (s *SimpleCSVSource) SupportsRealtime() bool
- func (s *SimpleCSVSource) SupportsStreaming() bool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewCSVSource ¶
func NewCSVSource(config *config.BaseConfig) (core.Source, error)
NewCSVSource creates a new CSV source connector
func NewSimpleCSVSource ¶
func NewSimpleCSVSource(config *config.BaseConfig) (core.Source, error)
NewSimpleCSVSource creates a new simple CSV source.
Deprecated: Use NewCSVSource instead. This will be removed in v3.0.0.
Types ¶
type CSVSource ¶
type CSVSource struct {
	*base.BaseConnector
	// contains filtered or unexported fields
}
CSVSource is a production-ready CSV source connector using BaseConnector
func (*CSVSource) Initialize ¶
func (s *CSVSource) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize initializes the CSV source connector
func (*CSVSource) SupportsBatch ¶
func (s *CSVSource) SupportsBatch() bool
SupportsBatch returns true since CSV supports batch reads
func (*CSVSource) SupportsIncremental ¶
func (s *CSVSource) SupportsIncremental() bool
SupportsIncremental returns true since CSV can support incremental reads
func (*CSVSource) SupportsRealtime ¶
func (s *CSVSource) SupportsRealtime() bool
SupportsRealtime returns false since CSV is not real-time
type ParallelCSVConfig ¶
type ParallelCSVConfig struct {
	NumWorkers int // Number of parallel workers (0 = auto)
	ChunkSize  int // Lines per chunk
	Headers    []string
	SkipHeader bool
	Delimiter  rune
	ParseFunc  func([]string) (*models.Record, error)
}
ParallelCSVConfig configures the parallel CSV parser
type ParallelCSVParser ¶
type ParallelCSVParser struct {
// contains filtered or unexported fields
}
ParallelCSVParser implements parallel CSV parsing for improved performance
func NewParallelCSVParser ¶
func NewParallelCSVParser(config ParallelCSVConfig, logger *zap.Logger) *ParallelCSVParser
NewParallelCSVParser creates a new parallel CSV parser
func (*ParallelCSVParser) GetMetrics ¶
func (p *ParallelCSVParser) GetMetrics() (rowsParsed, errors int64)
GetMetrics returns parsing metrics
type ParallelCSVTransform ¶
type ParallelCSVTransform struct {
// contains filtered or unexported fields
}
ParallelCSVTransform applies transformations to CSV records in parallel
type SimpleCSVSource ¶
type SimpleCSVSource struct {
// contains filtered or unexported fields
}
SimpleCSVSource is a minimal CSV source implementation
func (*SimpleCSVSource) Close ¶
func (s *SimpleCSVSource) Close(ctx context.Context) error
Close closes the CSV source
func (*SimpleCSVSource) DiscoverSchema ¶
func (s *SimpleCSVSource) DiscoverSchema(ctx context.Context) (*core.Schema, error)
DiscoverSchema discovers the schema of the CSV file
func (*SimpleCSVSource) GetPosition ¶
func (s *SimpleCSVSource) GetPosition() core.Position
GetPosition returns the current position
func (*SimpleCSVSource) GetState ¶
func (s *SimpleCSVSource) GetState() core.State
GetState returns the current state
func (*SimpleCSVSource) Health ¶
func (s *SimpleCSVSource) Health(ctx context.Context) error
Health checks the health of the source
func (*SimpleCSVSource) Initialize ¶
func (s *SimpleCSVSource) Initialize(ctx context.Context, config *config.BaseConfig) error
Initialize initializes the CSV source
func (*SimpleCSVSource) Metrics ¶
func (s *SimpleCSVSource) Metrics() map[string]interface{}
Metrics returns metrics for the source
func (*SimpleCSVSource) Read ¶
func (s *SimpleCSVSource) Read(ctx context.Context) (*core.RecordStream, error)
Read reads records from the CSV file
func (*SimpleCSVSource) ReadBatch ¶
func (s *SimpleCSVSource) ReadBatch(ctx context.Context, batchSize int) (*core.BatchStream, error)
ReadBatch reads records in batches
func (*SimpleCSVSource) SetPosition ¶
func (s *SimpleCSVSource) SetPosition(position core.Position) error
SetPosition sets the current position
func (*SimpleCSVSource) SetState ¶
func (s *SimpleCSVSource) SetState(state core.State) error
SetState sets the current state
func (*SimpleCSVSource) Subscribe ¶
func (s *SimpleCSVSource) Subscribe(ctx context.Context, tables []string) (*core.ChangeStream, error)
Subscribe subscribes to real-time changes
func (*SimpleCSVSource) SupportsBatch ¶
func (s *SimpleCSVSource) SupportsBatch() bool
SupportsBatch returns whether the source supports batch reading
func (*SimpleCSVSource) SupportsIncremental ¶
func (s *SimpleCSVSource) SupportsIncremental() bool
SupportsIncremental returns whether the source supports incremental sync
func (*SimpleCSVSource) SupportsRealtime ¶
func (s *SimpleCSVSource) SupportsRealtime() bool
SupportsRealtime returns whether the source supports real-time updates
func (*SimpleCSVSource) SupportsStreaming ¶
func (s *SimpleCSVSource) SupportsStreaming() bool
SupportsStreaming returns whether the source supports streaming