Documentation
Index ¶
- Constants
- Variables
- func ExecutionIdToFileName(executionId string, chunkNumber int) string
- func FileNameToExecutionId(filename string) (string, int, error)
- func GetReadCsvChunkQueryFormat(sourceFile string, opts ...CsvToJsonOpts) string
- func RegisterCollector(collectorFunc func() Collector)
- func RegisterTable[R types.RowStruct, T Table[R]]()
- func RegisterTableFormat[R types.RowStruct, S parse.Config, T TableWithFormat[R, S]]()
- type ArtifactConversionCollector
- func (c *ArtifactConversionCollector[S]) Collect(ctx context.Context) (int, int, error)
- func (c *ArtifactConversionCollector[S]) GetFromTime() *row_source.ResolvedFromTime
- func (c *ArtifactConversionCollector[S]) GetSchema() (*schema.RowSchema, error)
- func (c *ArtifactConversionCollector[S]) Identifier() string
- func (c *ArtifactConversionCollector[S]) Init(ctx context.Context, req *types.CollectRequest) error
- func (c *ArtifactConversionCollector[S]) Notify(ctx context.Context, event events.Event) error
- func (c *ArtifactConversionCollector[S]) OnChunk(ctx context.Context, chunkNumber int, collectionState json.RawMessage) error
- func (c *ArtifactConversionCollector[S]) UpdateCollectionState(ctx context.Context, request *types.CollectRequest) error
- type ArtifactToJsonConverter
- type ArtifactToJsonConverterImpl
- type ChunkWriter
- type Collector
- type CollectorImpl
- func (c *CollectorImpl[R]) Collect(ctx context.Context) (int, int, error)
- func (c *CollectorImpl[S]) GetFromTime() *row_source.ResolvedFromTime
- func (c *CollectorImpl[R]) GetSchema() (*schema.RowSchema, error)
- func (c *CollectorImpl[R]) Identifier() string
- func (c *CollectorImpl[R]) Init(ctx context.Context, req *types.CollectRequest) error
- func (c *CollectorImpl[R]) Notify(ctx context.Context, event events.Event) error
- func (c *CollectorImpl[R]) OnChunk(ctx context.Context, chunkNumber int) error
- func (c *CollectorImpl[R]) WriteRemainingRows(ctx context.Context, executionId string) (int, int, error)
- type CollectorWithFormat
- type CsvHeaderMode
- type CsvTableConfig
- type CsvToJsonOpts
- type DynamicRow
- func (l *DynamicRow) Enrich(fields schema.CommonFields)
- func (l *DynamicRow) GetCommonFields() schema.CommonFields
- func (l *DynamicRow) InitialiseFromMap(m map[string]string) error
- func (l *DynamicRow) MarshalJSON() ([]byte, error)
- func (l *DynamicRow) ResolveSchema(customTable *types.Table) (*schema.RowSchema, error)
- func (l *DynamicRow) Validate() error
- type JSONLWriter
- type MapInitialisedRow
- type MapOption
- type Mapper
- type SchemaSetter
- type SourceMetadata
- type Table
- type TableFactory
- type TableWithFormat
- type TableWithFormatImpl
Constants ¶
const JSONLChunkSize = 10000
JSONLChunkSize is the number of rows to write to each JSONL file. It is set to the same size that DuckDB uses to infer the schema (10000).
Variables ¶
var Factory = newTableFactory()
Factory is a global TableFactory instance
Functions ¶
func ExecutionIdToFileName ¶
func ExecutionIdToFileName(executionId string, chunkNumber int) string
ExecutionIdToFileName converts an execution id and chunk number to a filename, assuming the convention <executionId>-<chunkNumber>.jsonl.
func FileNameToExecutionId ¶
func FileNameToExecutionId(filename string) (string, int, error)
FileNameToExecutionId converts a filename to an execution id and chunk number, assuming the convention <executionId>-<chunkNumber>.jsonl.
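The naming convention above can be illustrated with a small round trip. This is a minimal sketch, not the package's implementation: toFileName and fromFileName are hypothetical stand-ins, and splitting on the last hyphen (so that execution ids containing hyphens survive) is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toFileName is a hypothetical stand-in for ExecutionIdToFileName.
func toFileName(executionId string, chunkNumber int) string {
	return fmt.Sprintf("%s-%d.jsonl", executionId, chunkNumber)
}

// fromFileName is a hypothetical stand-in for FileNameToExecutionId.
func fromFileName(filename string) (string, int, error) {
	base := strings.TrimSuffix(filename, ".jsonl")
	if base == filename {
		return "", 0, fmt.Errorf("%q does not end in .jsonl", filename)
	}
	// Split on the last hyphen so execution ids that contain hyphens survive (assumption).
	idx := strings.LastIndex(base, "-")
	if idx <= 0 {
		return "", 0, fmt.Errorf("%q has no <executionId>-<chunkNumber> separator", filename)
	}
	chunk, err := strconv.Atoi(base[idx+1:])
	if err != nil {
		return "", 0, fmt.Errorf("invalid chunk number in %q: %w", filename, err)
	}
	return base[:idx], chunk, nil
}

func main() {
	name := toFileName("exec-2024", 3)
	id, chunk, err := fromFileName(name)
	fmt.Println(name, id, chunk, err)
}
```

Returning an error for names that do not match the convention mirrors the three-value return of FileNameToExecutionId.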
func GetReadCsvChunkQueryFormat ¶
func GetReadCsvChunkQueryFormat(sourceFile string, opts ...CsvToJsonOpts) string
func RegisterCollector ¶
func RegisterCollector(collectorFunc func() Collector)
RegisterCollector registers a collector constructor directly. This is only used if we need to specify a custom collector (used for custom tables).
func RegisterTable ¶
func RegisterTable[R types.RowStruct, T Table[R]]()
RegisterTable registers a collector constructor with the factory. This is called from the package init function of the table implementation.
func RegisterTableFormat ¶
func RegisterTableFormat[R types.RowStruct, S parse.Config, T TableWithFormat[R, S]]()
RegisterTableFormat registers a collector constructor for a table which supports Format. This is called from the package init function of the table implementation.
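The registration functions above follow a common Go pattern: a generic function, called from init, stores a constructor in a shared registry. The following is a minimal sketch of that pattern only. The table and collector interfaces, the registry map, and registerTable are simplified, hypothetical stand-ins, and instantiating T through its zero value is an assumption rather than the package's actual mechanism.

```go
package main

import "fmt"

// table and collector are simplified stand-ins for the package's Table and Collector interfaces.
type table interface{ Identifier() string }
type collector interface{ Identifier() string }

// simpleCollector wraps a table and reports the table's identifier.
type simpleCollector struct{ t table }

func (c simpleCollector) Identifier() string { return c.t.Identifier() }

// registry maps table identifiers to collector constructors (hypothetical).
var registry = map[string]func() collector{}

// registerTable stores a constructor for T, keyed by T's identifier.
// It assumes T is a value type whose methods work on a zero value.
func registerTable[T table]() {
	var probe T
	registry[probe.Identifier()] = func() collector {
		var fresh T
		return simpleCollector{t: fresh}
	}
}

// accessLogTable is an example table implementation.
type accessLogTable struct{}

func (accessLogTable) Identifier() string { return "access_log" }

// Registration happens in init, so importing the package is enough to make the table available.
func init() { registerTable[accessLogTable]() }

func main() {
	ctor, ok := registry["access_log"]
	if !ok {
		fmt.Println("table not registered")
		return
	}
	fmt.Println(ctor().Identifier())
}
```

Storing a constructor rather than an instance means each collection request can receive a fresh collector.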
Types ¶
type ArtifactConversionCollector ¶
type ArtifactConversionCollector[S parse.Config] struct {
observable.ObservableImpl
// the table config
Format S
// contains filtered or unexported fields
}
ArtifactConversionCollector is a collector that converts artifacts directly to JSONL. S is the table config type.
func NewArtifactConversionCollector ¶
func NewArtifactConversionCollector[S parse.Config](tableName string, formatData *proto.ConfigData) *ArtifactConversionCollector[S]
func (*ArtifactConversionCollector[S]) Collect ¶
func (c *ArtifactConversionCollector[S]) Collect(ctx context.Context) (int, int, error)
Collect executes the collection process. It tells the source to start collection.
func (*ArtifactConversionCollector[S]) GetFromTime ¶
func (c *ArtifactConversionCollector[S]) GetFromTime() *row_source.ResolvedFromTime
func (*ArtifactConversionCollector[S]) GetSchema ¶
func (c *ArtifactConversionCollector[S]) GetSchema() (*schema.RowSchema, error)
GetSchema returns the schema of the table, if available. For dynamic tables, the schema is only available at this point if the config contains a schema.
func (*ArtifactConversionCollector[S]) Identifier ¶
func (c *ArtifactConversionCollector[S]) Identifier() string
func (*ArtifactConversionCollector[S]) Init ¶
func (c *ArtifactConversionCollector[S]) Init(ctx context.Context, req *types.CollectRequest) error
func (*ArtifactConversionCollector[S]) Notify ¶
func (c *ArtifactConversionCollector[S]) Notify(ctx context.Context, event events.Event) error
Notify implements observable.Observer. It handles all events which collectorFuncMap may receive (these will all come from the source).
func (*ArtifactConversionCollector[S]) OnChunk ¶
func (c *ArtifactConversionCollector[S]) OnChunk(ctx context.Context, chunkNumber int, collectionState json.RawMessage) error
OnChunk is called when a chunk of enriched rows has been written to a JSONL/CSV file. It notifies observers of the chunk.
func (*ArtifactConversionCollector[S]) UpdateCollectionState ¶
func (c *ArtifactConversionCollector[S]) UpdateCollectionState(ctx context.Context, request *types.CollectRequest) error
type ArtifactToJsonConverter ¶
type ArtifactToJsonConverterImpl ¶
func (*ArtifactToJsonConverterImpl[S]) EnrichRow ¶
func (c *ArtifactToJsonConverterImpl[S]) EnrichRow(_ *DynamicRow, _ S, _ schema.SourceEnrichment) (*DynamicRow, error)
func (*ArtifactToJsonConverterImpl[S]) GetSourceMetadata ¶
func (c *ArtifactToJsonConverterImpl[S]) GetSourceMetadata(_ S) []*SourceMetadata[*DynamicRow]
type ChunkWriter ¶
func NewJSONLWriter ¶
func NewJSONLWriter(destPath string) ChunkWriter
type Collector ¶
type Collector interface {
observable.Observable
Init(ctx context.Context, request *types.CollectRequest) error
Identifier() string
Collect(context.Context) (int, int, error)
GetSchema() (*schema.RowSchema, error)
GetFromTime() *row_source.ResolvedFromTime
}
Collector is an interface which provides methods for collecting table data from a source. It is implemented by the generic CollectorImpl struct.
type CollectorImpl ¶
type CollectorImpl[R types.RowStruct] struct {
observable.ObservableImpl
Table Table[R]
// contains filtered or unexported fields
}
CollectorImpl is a generic implementation of the Collector interface. It is responsible for coordinating the collection process and reporting status. R is the type of the row struct.
func (*CollectorImpl[R]) Collect ¶
func (c *CollectorImpl[R]) Collect(ctx context.Context) (int, int, error)
Collect executes the collection process. It tells the source to start collection.
func (*CollectorImpl[S]) GetFromTime ¶
func (c *CollectorImpl[S]) GetFromTime() *row_source.ResolvedFromTime
GetFromTime returns the 'resolved' from time of the source.
func (*CollectorImpl[R]) GetSchema ¶
func (c *CollectorImpl[R]) GetSchema() (*schema.RowSchema, error)
GetSchema returns the schema of the table
func (*CollectorImpl[R]) Identifier ¶
func (c *CollectorImpl[R]) Identifier() string
func (*CollectorImpl[R]) Init ¶
func (c *CollectorImpl[R]) Init(ctx context.Context, req *types.CollectRequest) error
func (*CollectorImpl[R]) Notify ¶
func (c *CollectorImpl[R]) Notify(ctx context.Context, event events.Event) error
Notify implements observable.Observer. It receives events from the source and handles ONLY Row and Error events.
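An observer that reacts only to row and error events is typically written as a type switch. The sketch below assumes hypothetical event types (rowEvent, errorEvent, startedEvent) and a hypothetical sketchCollector. The real events package and its types are not shown in this documentation, and silently ignoring other events is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical event types standing in for the events the source may emit.
type rowEvent struct{ Row map[string]string }
type errorEvent struct{ Err error }
type startedEvent struct{}

// errBoom is a sample error used to demonstrate error propagation.
var errBoom = errors.New("boom")

// sketchCollector counts the rows it has been notified about.
type sketchCollector struct{ rows int }

// Notify handles only row and error events; every other event is ignored (assumption).
func (c *sketchCollector) Notify(ev any) error {
	switch e := ev.(type) {
	case *rowEvent:
		// A real collector would enrich and buffer the row here.
		c.rows++
		return nil
	case *errorEvent:
		// Surface the source's error to the caller.
		return e.Err
	default:
		return nil
	}
}

func main() {
	c := &sketchCollector{}
	fmt.Println(c.Notify(&rowEvent{Row: map[string]string{"status": "200"}}))
	fmt.Println(c.Notify(&startedEvent{}))
	fmt.Println(c.Notify(&errorEvent{Err: errBoom}))
	fmt.Println(c.rows)
}
```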
func (*CollectorImpl[R]) OnChunk ¶
func (c *CollectorImpl[R]) OnChunk(ctx context.Context, chunkNumber int) error
OnChunk is called when a chunk of enriched rows has been written to a JSONL/CSV file. It notifies observers of the chunk.
func (*CollectorImpl[R]) WriteRemainingRows ¶
type CollectorWithFormat ¶
type CollectorWithFormat[R types.RowStruct, S parse.Config] struct {
CollectorImpl[R]
// shadow the table field from the base collector, so we store it as a TableWithFormat,
// to avoid the need for a type assertion
Table TableWithFormat[R, S]
// the table format
Format S
}
CollectorWithFormat is a collector that has a table format. The format is parsed from the source format config.
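The field shadowing used by CollectorWithFormat relies on Go's embedding rules: a field declared on the outer struct hides a field of the same name promoted from an embedded struct. A minimal sketch with hypothetical, simplified types (baseCollector, formatCollector, csvTable) follows. Keeping both fields pointing at the same table is an assumption about how such a type would be wired up.

```go
package main

import "fmt"

// table and formatTable are simplified stand-ins for Table and TableWithFormat.
type table interface{ Identifier() string }

type formatTable interface {
	table
	SetFormat(format string)
}

// baseCollector plays the role of the embedded base collector.
type baseCollector struct {
	Table table
}

// formatCollector embeds baseCollector and shadows its Table field.
type formatCollector struct {
	baseCollector
	// Table hides baseCollector.Table and carries the richer interface type.
	Table formatTable
}

// csvTable is an example table that accepts a format.
type csvTable struct{ format string }

func (t *csvTable) Identifier() string      { return "csv" }
func (t *csvTable) SetFormat(format string) { t.format = format }

// newFormatCollector stores the table in both fields so base-collector code sees the same value.
func newFormatCollector(t formatTable) *formatCollector {
	c := &formatCollector{Table: t}
	c.baseCollector.Table = t
	return c
}

func main() {
	tbl := &csvTable{}
	c := newFormatCollector(tbl)
	// c.Table resolves to the outer field, so SetFormat needs no type assertion.
	c.Table.SetFormat("tsv")
	fmt.Println(c.baseCollector.Table.Identifier(), tbl.format)
}
```

The embedded field remains reachable through its qualified name, here c.baseCollector.Table.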
func NewCollectorWithFormat ¶
func NewCollectorWithFormat[R types.RowStruct, S parse.Config, T TableWithFormat[R, S]]() *CollectorWithFormat[R, S]
func (*CollectorWithFormat[R, S]) Init ¶
func (c *CollectorWithFormat[R, S]) Init(ctx context.Context, req *types.CollectRequest) error
type CsvHeaderMode ¶
type CsvHeaderMode string
const (
CsvHeaderModeAuto CsvHeaderMode = "auto"
CsvHeaderModeOff CsvHeaderMode = "off"
CsvHeaderModeOn CsvHeaderMode = "on"
)
type CsvTableConfig ¶
type CsvToJsonOpts ¶
type CsvToJsonOpts func(*CsvTableConfig)
func WithCsvComment ¶
func WithCsvComment(comment string) CsvToJsonOpts
func WithCsvDelimiter ¶
func WithCsvDelimiter(delimiter string) CsvToJsonOpts
func WithCsvHeaderMode ¶
func WithCsvHeaderMode(headerMode CsvHeaderMode) CsvToJsonOpts
func WithCsvSchema ¶
func WithCsvSchema(schema *schema.RowSchema) CsvToJsonOpts
func WithMappings ¶
func WithMappings(mappings map[string]string) CsvToJsonOpts
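CsvToJsonOpts and the With* functions above follow the functional options pattern: each option is a function that mutates a config struct. The sketch below shows the pattern in isolation. The fields of CsvTableConfig are not shown in this documentation, so csvConfig, its fields, and the defaults chosen here are hypothetical.

```go
package main

import "fmt"

// headerMode mirrors the shape of CsvHeaderMode.
type headerMode string

const (
	headerAuto headerMode = "auto"
	headerOff  headerMode = "off"
	headerOn   headerMode = "on"
)

// csvConfig is a hypothetical stand-in for CsvTableConfig.
type csvConfig struct {
	Delimiter  string
	Comment    string
	HeaderMode headerMode
}

// csvOpt mirrors the shape of CsvToJsonOpts: a function that mutates the config.
type csvOpt func(*csvConfig)

func withDelimiter(delimiter string) csvOpt {
	return func(c *csvConfig) { c.Delimiter = delimiter }
}

func withComment(comment string) csvOpt {
	return func(c *csvConfig) { c.Comment = comment }
}

func withHeaderMode(mode headerMode) csvOpt {
	return func(c *csvConfig) { c.HeaderMode = mode }
}

// newCsvConfig applies the options in order on top of illustrative defaults.
func newCsvConfig(opts ...csvOpt) *csvConfig {
	c := &csvConfig{Delimiter: ",", HeaderMode: headerAuto}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newCsvConfig(withDelimiter("\t"), withComment("#"), withHeaderMode(headerOn))
	fmt.Printf("%q %q %q\n", c.Delimiter, c.Comment, c.HeaderMode)
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.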
type DynamicRow ¶
func NewDynamicRow ¶
func NewDynamicRow() *DynamicRow
func (*DynamicRow) Enrich ¶
func (l *DynamicRow) Enrich(fields schema.CommonFields)
Enrich uses the provided mappings to populate the common fields from mapped column values
func (*DynamicRow) GetCommonFields ¶
func (l *DynamicRow) GetCommonFields() schema.CommonFields
func (*DynamicRow) InitialiseFromMap ¶
func (l *DynamicRow) InitialiseFromMap(m map[string]string) error
InitialiseFromMap initializes the struct from a map of string values
func (*DynamicRow) MarshalJSON ¶
func (l *DynamicRow) MarshalJSON() ([]byte, error)
MarshalJSON overrides JSON serialization to include the dynamic columns
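Overriding MarshalJSON to merge runtime-defined columns with fixed fields can be sketched as follows. The dynamicRow type here is a hypothetical, simplified stand-in for DynamicRow: its fields, the tp_source_type key, and the rule that fixed fields win on a name clash are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dynamicRow is a hypothetical, simplified stand-in for DynamicRow.
type dynamicRow struct {
	// SourceType stands in for one of the common fields.
	SourceType string
	// Columns holds values whose names are only known at runtime.
	Columns map[string]string
}

// MarshalJSON flattens the dynamic columns and the fixed field into one JSON object.
func (r *dynamicRow) MarshalJSON() ([]byte, error) {
	out := make(map[string]any, len(r.Columns)+1)
	for name, value := range r.Columns {
		out[name] = value
	}
	// Written last so the fixed field wins on a name clash (a choice made for this sketch).
	out["tp_source_type"] = r.SourceType
	return json.Marshal(out)
}

func main() {
	row := &dynamicRow{
		SourceType: "file",
		Columns:    map[string]string{"status": "200"},
	}
	// json.Marshal picks up the custom MarshalJSON method on *dynamicRow.
	b, err := json.Marshal(row)
	fmt.Println(string(b), err)
}
```

Building an intermediate map keeps the output flat, so each dynamic column appears as a top-level key rather than nested under a Columns object.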
func (*DynamicRow) ResolveSchema ¶
func (l *DynamicRow) ResolveSchema(customTable *types.Table) (*schema.RowSchema, error)
ResolveSchema returns the (potentially partial) schema for the dynamic row. This will be used for the JSONL-to-parquet conversion.
func (*DynamicRow) Validate ¶
func (l *DynamicRow) Validate() error
type JSONLWriter ¶
type JSONLWriter struct {
// contains filtered or unexported fields
}
JSONLWriter implements ChunkWriter and writes rows to JSONL files
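Writing rows as JSONL means emitting one JSON object per line. The sketch below illustrates that layout together with chunking. It does not reproduce JSONLWriter or the ChunkWriter interface, whose method signatures are not shown here, and encodeJSONL and splitChunks are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// encodeJSONL renders each row as one JSON object per line.
func encodeJSONL(rows []map[string]any) (string, error) {
	var sb strings.Builder
	enc := json.NewEncoder(&sb)
	for i, row := range rows {
		// Encode appends a newline after every value, which gives the JSONL layout.
		if err := enc.Encode(row); err != nil {
			return "", fmt.Errorf("encoding row %d: %w", i, err)
		}
	}
	return sb.String(), nil
}

// splitChunks groups rows into slices of at most size rows each.
func splitChunks(rows []map[string]any, size int) [][]map[string]any {
	if size <= 0 {
		return nil
	}
	var chunks [][]map[string]any
	for start := 0; start < len(rows); start += size {
		end := start + size
		if end > len(rows) {
			end = len(rows)
		}
		chunks = append(chunks, rows[start:end])
	}
	return chunks
}

func main() {
	rows := []map[string]any{{"id": 1}, {"id": 2}, {"id": 3}}
	// A chunk size of 2 is used for the demo; the package constant JSONLChunkSize is 10000.
	for n, chunk := range splitChunks(rows, 2) {
		out, err := encodeJSONL(chunk)
		fmt.Printf("chunk %d (err=%v):\n%s", n, err, out)
	}
}
```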
func (JSONLWriter) WriteChunk ¶
type MapInitialisedRow ¶
MapInitialisedRow is an interface which provides a means to initialise a row struct from a string map. This is used in combination with the GonxMapper/GrokMapper.
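A row type that initialises itself from a string map usually converts each captured string into a typed field. The sketch below assumes a hypothetical accessRow type and a local mapInitialised interface that mirrors the InitialiseFromMap signature listed for DynamicRow. The key names method and status are illustrative.

```go
package main

import (
	"fmt"
	"strconv"
)

// mapInitialised mirrors the InitialiseFromMap signature shown in this documentation.
type mapInitialised interface {
	InitialiseFromMap(m map[string]string) error
}

// accessRow is a hypothetical row struct with one string field and one numeric field.
type accessRow struct {
	Method string
	Status int
}

// InitialiseFromMap copies known keys into typed fields and ignores unknown keys.
func (r *accessRow) InitialiseFromMap(m map[string]string) error {
	r.Method = m["method"]
	if raw, ok := m["status"]; ok {
		status, err := strconv.Atoi(raw)
		if err != nil {
			return fmt.Errorf("invalid status %q: %w", raw, err)
		}
		r.Status = status
	}
	return nil
}

// Compile-time check that *accessRow satisfies the interface.
var _ mapInitialised = (*accessRow)(nil)

func main() {
	row := &accessRow{}
	err := row.InitialiseFromMap(map[string]string{"method": "GET", "status": "404"})
	fmt.Println(row.Method, row.Status, err)
}
```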
type Mapper ¶
type Mapper[R types.RowStruct] interface {
Identifier() string
// Map converts raw rows to the desired format (type 'R')
Map(context.Context, any, ...MapOption[R]) (R, error)
}
Mapper is a generic interface which provides a method for mapping raw source data into row structs. R is the type of the row struct which the mapperFunc outputs.
type SchemaSetter ¶
SchemaSetter is an interface which provides a method to set the schema
type SourceMetadata ¶
type SourceMetadata[R types.RowStruct] struct {
SourceName string
Mapper Mapper[R]
Options []row_source.RowSourceOption
}
type Table ¶
type Table[R types.RowStruct] interface {
// Identifier must return the collection name
Identifier() string
// GetSourceMetadata returns the supported sources for the table
GetSourceMetadata() []*SourceMetadata[R]
// EnrichRow is called to enrich the row with common (tp_*) fields
EnrichRow(R, schema.SourceEnrichment) (R, error)
}
Table is a generic interface representing a plugin table definition. R is the row struct type.
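A table implementation supplies an identifier and an EnrichRow method that fills in the common fields. The sketch below uses a reduced, hypothetical interface (table[R]) that omits GetSourceMetadata, plus stand-in types sourceEnrichment and appLogRow. The SourceType and TpSourceType fields are assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// sourceEnrichment is a simplified stand-in for schema.SourceEnrichment.
type sourceEnrichment struct{ SourceType string }

// table mirrors the shape of Table[R], without GetSourceMetadata.
type table[R any] interface {
	Identifier() string
	EnrichRow(R, sourceEnrichment) (R, error)
}

// appLogRow is a hypothetical row struct.
type appLogRow struct {
	Message string
	// TpSourceType stands in for one of the common tp_* fields.
	TpSourceType string
}

// appLogTable is an example table definition.
type appLogTable struct{}

func (appLogTable) Identifier() string { return "app_log" }

// EnrichRow copies source-level metadata onto the row.
func (appLogTable) EnrichRow(row *appLogRow, e sourceEnrichment) (*appLogRow, error) {
	if row == nil {
		return nil, errors.New("cannot enrich a nil row")
	}
	row.TpSourceType = e.SourceType
	return row, nil
}

// Compile-time check that appLogTable satisfies the reduced interface.
var _ table[*appLogRow] = appLogTable{}

func main() {
	var tbl table[*appLogRow] = appLogTable{}
	row, err := tbl.EnrichRow(&appLogRow{Message: "started"}, sourceEnrichment{SourceType: "file"})
	fmt.Println(tbl.Identifier(), row.TpSourceType, err)
}
```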
type TableFactory ¶
type TableFactory struct {
// contains filtered or unexported fields
}
func (*TableFactory) GetCollector ¶
func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)
func (*TableFactory) GetPartitions ¶
func (f *TableFactory) GetPartitions() map[string]func() Collector
func (*TableFactory) Init ¶
func (f *TableFactory) Init() (err error)
Init builds the map of table constructors and schemas
func (*TableFactory) Initialized ¶
func (f *TableFactory) Initialized() bool
type TableWithFormat ¶
TableWithFormat is a generic interface representing a plugin table definition with a format
type TableWithFormatImpl ¶
TableWithFormatImpl is a generic struct representing a plugin table definition with a format
func (*TableWithFormatImpl[S]) SetFormat ¶
func (c *TableWithFormatImpl[S]) SetFormat(format S)