Documentation ¶
Index ¶
- Constants
- func File() api.Sink
- type CsvReader
- type FileSource
- func (fs *FileSource) Close(ctx api.StreamContext) error
- func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error
- func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error
- func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)
- type FileSourceConfig
- type FileType
- type FormatReader
- func CreateCsvReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- func CreateJsonReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- func CreateLineReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- func GetReader(fileType FileType, fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
- type JsonReader
- type LineReader
- type ReaderError
Constants ¶
View Source
const ( GZIP = "gzip" ZSTD = "zstd" )
View Source
const (
TupleError int = iota // display error in tuple
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type FileSource ¶
type FileSource struct {
// contains filtered or unexported fields
}
FileSource is the batch source that loads data from a file all at once.
func (*FileSource) Close ¶
func (fs *FileSource) Close(ctx api.StreamContext) error
func (*FileSource) Configure ¶
func (fs *FileSource) Configure(fileName string, props map[string]interface{}) error
func (*FileSource) Load ¶
func (fs *FileSource) Load(ctx api.StreamContext, consumer chan<- api.SourceTuple) error
func (*FileSource) Open ¶
func (fs *FileSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)
type FileSourceConfig ¶
type FileSourceConfig struct {
FileType FileType `json:"fileType"`
Path string `json:"path"`
Interval int `json:"interval"`
IsTable bool `json:"isTable"`
Parallel bool `json:"parallel"`
SendInterval int `json:"sendInterval"`
ActionAfterRead int `json:"actionAfterRead"`
MoveTo string `json:"moveTo"`
HasHeader bool `json:"hasHeader"`
Columns []string `json:"columns"`
IgnoreStartLines int `json:"ignoreStartLines"`
IgnoreEndLines int `json:"ignoreEndLines"`
Delimiter string `json:"delimiter"`
Decompression string `json:"decompression"`
}
type FormatReader ¶
type FormatReader interface {
Read() (map[string]interface{}, error) // Reads the next record. Returns EOF when the input has reached its end.
Close() error
}
func CreateCsvReader ¶
func CreateCsvReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
func CreateJsonReader ¶
func CreateJsonReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
func CreateLineReader ¶
func CreateLineReader(fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
func GetReader ¶
func GetReader(fileType FileType, fileStream io.Reader, config *FileSourceConfig, ctx api.StreamContext) (FormatReader, error)
type JsonReader ¶
type JsonReader struct {
// contains filtered or unexported fields
}
func (*JsonReader) Close ¶
func (r *JsonReader) Close() error
func (*JsonReader) Read ¶
func (r *JsonReader) Read() (map[string]interface{}, error)
type LineReader ¶
type LineReader struct {
// contains filtered or unexported fields
}
func (*LineReader) Close ¶
func (r *LineReader) Close() error
func (*LineReader) Read ¶
func (r *LineReader) Read() (map[string]interface{}, error)
type ReaderError ¶
func BuildError ¶
func BuildError(code int, msg string) *ReaderError
func (ReaderError) Error ¶
func (e ReaderError) Error() string
Source Files ¶
Click to show internal directories.
Click to hide internal directories.