Documentation
¶
Index ¶
- Constants
- Variables
- func ExecutionIdToFileName(executionId string, chunkNumber int) string
- func FileNameToExecutionId(filename string) (string, int, error)
- func GetReadCsvChunkQueryFormat(sourceFile string, opts ...CsvToJsonOpts) string
- func RegisterCustomTable[T CustomTable](opts ...TableOption)
- func RegisterFormat[T formats.Format](presets ...T)
- func RegisterTable[R types.RowStruct, T Table[R]]()
- func TestIdentifier(t *testing.T, c Collector)
- func TestSchema(t *testing.T, c Collector)
- func Validate(t *testing.T, ctor func() Collector)
- type ArtifactConversionCollector
- func (c *ArtifactConversionCollector) Collect(ctx context.Context) (int, int, error)
- func (c *ArtifactConversionCollector) GetFromTime() *row_source.ResolvedFromTime
- func (c *ArtifactConversionCollector) GetSchema() *schema.TableSchema
- func (c *ArtifactConversionCollector) Identifier() string
- func (c *ArtifactConversionCollector) Init(ctx context.Context, req *types.CollectRequest) error
- func (c *ArtifactConversionCollector) Notify(ctx context.Context, event events.Event) error
- func (c *ArtifactConversionCollector) OnChunk(ctx context.Context, chunkNumber int, collectionState json.RawMessage) error
- func (c *ArtifactConversionCollector) UpdateCollectionState(ctx context.Context, request *types.CollectRequest) error
- type ArtifactToJsonConverter
- type ArtifactToJsonConverterImpl
- type ChunkWriter
- type Collector
- type CollectorImpl
- func (c *CollectorImpl[R]) Collect(ctx context.Context) (int, int, error)
- func (c *CollectorImpl[S]) GetFromTime() *row_source.ResolvedFromTime
- func (c *CollectorImpl[R]) GetSchema() *schema.TableSchema
- func (c *CollectorImpl[R]) Identifier() string
- func (c *CollectorImpl[R]) Init(ctx context.Context, req *types.CollectRequest) error
- func (c *CollectorImpl[R]) Notify(ctx context.Context, event events.Event) error
- func (c *CollectorImpl[R]) OnChunk(ctx context.Context, chunkNumber int) error
- func (c *CollectorImpl[R]) WriteRemainingRows(ctx context.Context, executionId string) (int, int, error)
- type CsvHeaderMode
- type CsvTableConfig
- type CsvToJsonOpts
- type CustomTable
- type CustomTableImpl
- type JSONLWriter
- type SourceMetadata
- type Table
- type TableFactory
- func (f *TableFactory) DescribeFormats(customFormatConfigs []*proto.FormatData) (presetDescriptions, customFormatDescriptions formats.FormatDescriptionMap, ...)
- func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)
- func (f *TableFactory) GetCollectorMap() map[string]func() Collector
- func (f *TableFactory) GetFormat(formatConfig *types.FormatConfigData) (formats.Format, error)
- func (f *TableFactory) GetSchema() (schema.SchemaMap, error)
- func (f *TableFactory) Initialized() bool
- type TableOption
Constants ¶
const JSONLChunkSize = 10000
JSONLChunkSize is the number of rows to write to each JSONL file. It is set to the same number of rows that DuckDB uses to infer a schema (10000).
Variables ¶
var Factory = newTableFactory()
Factory is a global TableFactory instance
Functions ¶
func ExecutionIdToFileName ¶
ExecutionIdToFileName converts an execution id and chunk number to a filename, using the convention <executionId>-<chunkNumber>.jsonl.
func FileNameToExecutionId ¶
FileNameToExecutionId converts a filename back to an execution id and chunk number, assuming the convention <executionId>-<chunkNumber>.jsonl.
func GetReadCsvChunkQueryFormat ¶
func GetReadCsvChunkQueryFormat(sourceFile string, opts ...CsvToJsonOpts) string
func RegisterCustomTable ¶ added in v0.2.0
func RegisterCustomTable[T CustomTable](opts ...TableOption)
RegisterCustomTable registers a custom table type with optional configuration
func RegisterFormat ¶ added in v0.2.0
func RegisterTable ¶
RegisterTable registers a collector constructor with the factory. It is called from the package init function of the table implementation.
func TestIdentifier ¶ added in v0.2.0
func TestSchema ¶ added in v0.2.0
Types ¶
type ArtifactConversionCollector ¶
type ArtifactConversionCollector struct {
observable.ObservableImpl
// the source format
//formatData *proto.ConfigData
// the table config
Format parse.Config
// contains filtered or unexported fields
}
ArtifactConversionCollector is a collector that converts artifacts directly to JSONL.
func NewArtifactConversionCollector ¶
func NewArtifactConversionCollector(Table[*types.DynamicRow]) *ArtifactConversionCollector
func (*ArtifactConversionCollector) Collect ¶
Collect executes the collection process: it tells the source to start collection.
func (*ArtifactConversionCollector) GetFromTime ¶
func (c *ArtifactConversionCollector) GetFromTime() *row_source.ResolvedFromTime
func (*ArtifactConversionCollector) GetSchema ¶
func (c *ArtifactConversionCollector) GetSchema() *schema.TableSchema
GetSchema returns the schema of the table, if available. For dynamic tables, the schema is only available at this point if the config contains a schema.
func (*ArtifactConversionCollector) Identifier ¶
func (c *ArtifactConversionCollector) Identifier() string
func (*ArtifactConversionCollector) Init ¶
func (c *ArtifactConversionCollector) Init(ctx context.Context, req *types.CollectRequest) error
func (*ArtifactConversionCollector) Notify ¶
Notify implements observable.Observer. It handles all events the collector may receive (these all come from the source).
func (*ArtifactConversionCollector) OnChunk ¶
func (c *ArtifactConversionCollector) OnChunk(ctx context.Context, chunkNumber int, collectionState json.RawMessage) error
OnChunk is called when a chunk of enriched rows has been written to a JSONL/CSV file. It notifies observers of the chunk.
func (*ArtifactConversionCollector) UpdateCollectionState ¶
func (c *ArtifactConversionCollector) UpdateCollectionState(ctx context.Context, request *types.CollectRequest) error
type ArtifactToJsonConverter ¶
type ArtifactToJsonConverterImpl ¶
func (*ArtifactToJsonConverterImpl[S]) EnrichRow ¶
func (c *ArtifactToJsonConverterImpl[S]) EnrichRow(_ *types.DynamicRow, _ S, _ schema.SourceEnrichment) (*types.DynamicRow, error)
func (*ArtifactToJsonConverterImpl[S]) GetSourceMetadata ¶
func (c *ArtifactToJsonConverterImpl[S]) GetSourceMetadata(_ S) []*SourceMetadata[*types.DynamicRow]
type ChunkWriter ¶
func NewJSONLWriter ¶
func NewJSONLWriter(destPath string) ChunkWriter
type Collector ¶
type Collector interface {
observable.Observable
Init(ctx context.Context, request *types.CollectRequest) error
Identifier() string
Collect(context.Context) (int, int, error)
GetSchema() *schema.TableSchema
GetFromTime() *row_source.ResolvedFromTime
}
Collector is an interface which provides methods for collecting table data from a source. It is implemented by the generic CollectorImpl struct.
type CollectorImpl ¶
type CollectorImpl[R types.RowStruct] struct { observable.ObservableImpl Table Table[R] // contains filtered or unexported fields }
CollectorImpl is a generic implementation of the Collector interface. It is responsible for coordinating the collection process and reporting status. R is the type of the row struct.
func NewCollectorImpl ¶ added in v0.2.0
func NewCollectorImpl[R types.RowStruct](table Table[R]) *CollectorImpl[R]
func (*CollectorImpl[R]) Collect ¶
Collect executes the collection process: it tells the source to start collection.
func (*CollectorImpl[S]) GetFromTime ¶
func (c *CollectorImpl[S]) GetFromTime() *row_source.ResolvedFromTime
GetFromTime returns the 'resolved' from time of the source
func (*CollectorImpl[R]) GetSchema ¶
func (c *CollectorImpl[R]) GetSchema() *schema.TableSchema
GetSchema returns the schema of the table
func (*CollectorImpl[R]) Identifier ¶
func (c *CollectorImpl[R]) Identifier() string
func (*CollectorImpl[R]) Init ¶
func (c *CollectorImpl[R]) Init(ctx context.Context, req *types.CollectRequest) error
func (*CollectorImpl[R]) Notify ¶
Notify implements observable.Observer. It receives events from the source and handles ONLY Row and Error events.
func (*CollectorImpl[R]) OnChunk ¶
func (c *CollectorImpl[R]) OnChunk(ctx context.Context, chunkNumber int) error
OnChunk is called when a chunk of enriched rows has been written to a JSONL/CSV file. It notifies observers of the chunk.
func (*CollectorImpl[R]) WriteRemainingRows ¶
type CsvHeaderMode ¶
type CsvHeaderMode string
const ( CsvHeaderModeAuto CsvHeaderMode = "auto" CsvHeaderModeOff CsvHeaderMode = "off" CsvHeaderModeOn CsvHeaderMode = "on" )
type CsvTableConfig ¶
type CsvTableConfig struct {
HeaderMode CsvHeaderMode
Delimiter *string
Comment *string
Schema *schema.TableSchema
Mappings map[string]string
}
type CsvToJsonOpts ¶
type CsvToJsonOpts func(*CsvTableConfig)
func WithCsvComment ¶
func WithCsvComment(comment string) CsvToJsonOpts
func WithCsvDelimiter ¶
func WithCsvDelimiter(delimiter string) CsvToJsonOpts
func WithCsvHeaderMode ¶
func WithCsvHeaderMode(headerMode CsvHeaderMode) CsvToJsonOpts
func WithCsvSchema ¶
func WithCsvSchema(schema *schema.TableSchema) CsvToJsonOpts
func WithMappings ¶
func WithMappings(mappings map[string]string) CsvToJsonOpts
type CustomTable ¶ added in v0.2.0
type CustomTable interface {
Table[*types.DynamicRow]
GetSchema() *schema.TableSchema
Initialize(formats.Format, *schema.TableSchema)
GetSupportedFormats() *formats.SupportedFormats
GetTableDefinition() *schema.TableSchema
}
CustomTable is a generic interface representing a plugin table definition with a format
type CustomTableImpl ¶ added in v0.2.0
type CustomTableImpl struct {
Schema *schema.TableSchema
// the format
Format formats.Format
}
CustomTableImpl is a generic struct representing a plugin table definition with a format
func (*CustomTableImpl) EnrichRow ¶ added in v0.2.0
func (c *CustomTableImpl) EnrichRow(row *types.DynamicRow, sourceEnrichmentFields schema.SourceEnrichment) (*types.DynamicRow, error)
func (*CustomTableImpl) GetSchema ¶ added in v0.2.0
func (c *CustomTableImpl) GetSchema() *schema.TableSchema
GetSchema implements the CustomTable interface
func (*CustomTableImpl) Initialize ¶ added in v0.2.0
func (c *CustomTableImpl) Initialize(format formats.Format, customTableSchema *schema.TableSchema)
Initialize sets the format and schema for the table
type JSONLWriter ¶
type JSONLWriter struct {
// contains filtered or unexported fields
}
JSONLWriter implements ChunkWriter and writes rows to JSONL files
func (JSONLWriter) WriteChunk ¶
type SourceMetadata ¶
type SourceMetadata[R types.RowStruct] struct { SourceName string Mapper mappers.Mapper[R] Options []row_source.RowSourceOption }
type Table ¶
type Table[R types.RowStruct] interface { // Identifier returns the table name Identifier() string // GetSourceMetadata returns the supported sources for the table GetSourceMetadata() ([]*SourceMetadata[R], error) // EnrichRow is called to enrich the row with common (tp_*) fields EnrichRow(R, schema.SourceEnrichment) (R, error) }
Table is a generic interface representing a plugin table definition. R is the row struct type.
type TableFactory ¶
type TableFactory struct {
// contains filtered or unexported fields
}
func (*TableFactory) DescribeFormats ¶ added in v0.2.0
func (f *TableFactory) DescribeFormats(customFormatConfigs []*proto.FormatData) (presetDescriptions, customFormatDescriptions formats.FormatDescriptionMap, formatTypes []string, err error)
DescribeFormats returns format description maps for the preset formats and for the given custom format configs, together with a list of format types.
func (*TableFactory) GetCollector ¶
func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)
func (*TableFactory) GetCollectorMap ¶ added in v0.2.0
func (f *TableFactory) GetCollectorMap() map[string]func() Collector
func (*TableFactory) GetFormat ¶
func (f *TableFactory) GetFormat(formatConfig *types.FormatConfigData) (formats.Format, error)
GetFormat returns a format instance for the given format config
func (*TableFactory) Initialized ¶
func (f *TableFactory) Initialized() bool
type TableOption ¶ added in v0.2.0
type TableOption func(*tableConfig)
TableOption is an option function passed to the table factory
func WithName ¶ added in v0.2.0
func WithName(name string) TableOption