package table
Version: v0.1.0-rc.0

Warning: This package is not in the latest version of its module.
Published: Jan 29, 2025 License: Apache-2.0 Imports: 28 Imported by: 36

Documentation

Index

Constants

const JSONLChunkSize = 10000

JSONLChunkSize is the number of rows to write to each JSONL file. It matches the number of rows DuckDB uses to infer a schema (10000).
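To illustrate how a fixed chunk size splits a stream of rows, the sketch below partitions a slice into chunks of at most JSONLChunkSize rows. The helper splitIntoChunks is hypothetical and not part of this package, and the constant is redeclared locally so the example compiles on its own.

```go
package main

import "fmt"

// JSONLChunkSize mirrors the package constant so this sketch is self-contained.
const JSONLChunkSize = 10000

// splitIntoChunks is a hypothetical helper that partitions rows into
// consecutive chunks of at most size rows; the last chunk holds the remainder.
func splitIntoChunks(rows []any, size int) [][]any {
	if size <= 0 {
		return nil
	}
	var chunks [][]any
	for start := 0; start < len(rows); start += size {
		end := start + size
		if end > len(rows) {
			end = len(rows)
		}
		chunks = append(chunks, rows[start:end])
	}
	return chunks
}

func main() {
	rows := make([]any, 25000)
	for i, c := range splitIntoChunks(rows, JSONLChunkSize) {
		fmt.Printf("chunk %d: %d rows\n", i, len(c))
	}
}
```

With 25000 rows this yields two full chunks of 10000 rows and a final chunk of 5000.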

Variables

var Factory = newTableFactory()

Factory is a global TableFactory instance

Functions

func ExecutionIdToFileName

func ExecutionIdToFileName(executionId string, chunkNumber int) string

ExecutionIdToFileName converts an execution ID and chunk number to a filename, following the convention <executionId>-<chunkNumber>.jsonl.

func FileNameToExecutionId

func FileNameToExecutionId(filename string) (string, int, error)

FileNameToExecutionId converts a filename back to an execution ID and chunk number, assuming the convention <executionId>-<chunkNumber>.jsonl.
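The naming convention shared by these two functions can be sketched as a round trip. This is a hypothetical re-implementation, not the package's code: it assumes the execution ID may itself contain hyphens (so it splits on the last hyphen) and that a directory prefix should be ignored, neither of which the documentation states.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"strings"
)

// executionIdToFileName builds <executionId>-<chunkNumber>.jsonl.
func executionIdToFileName(executionId string, chunkNumber int) string {
	return fmt.Sprintf("%s-%d.jsonl", executionId, chunkNumber)
}

// fileNameToExecutionId reverses the convention. It splits on the LAST
// hyphen so that execution IDs containing hyphens are preserved.
func fileNameToExecutionId(filename string) (string, int, error) {
	base := filepath.Base(filename)
	if !strings.HasSuffix(base, ".jsonl") {
		return "", 0, fmt.Errorf("%q does not have a .jsonl extension", filename)
	}
	stem := strings.TrimSuffix(base, ".jsonl")
	idx := strings.LastIndex(stem, "-")
	if idx <= 0 {
		return "", 0, fmt.Errorf("%q does not match <executionId>-<chunkNumber>.jsonl", filename)
	}
	chunk, err := strconv.Atoi(stem[idx+1:])
	if err != nil {
		return "", 0, fmt.Errorf("invalid chunk number in %q: %w", filename, err)
	}
	return stem[:idx], chunk, nil
}

func main() {
	name := executionIdToFileName("exec-1234", 7)
	fmt.Println(name) // exec-1234-7.jsonl
	id, chunk, err := fileNameToExecutionId(name)
	fmt.Println(id, chunk, err) // exec-1234 7 <nil>
}
```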

func GetReadCsvChunkQueryFormat

func GetReadCsvChunkQueryFormat(sourceFile string, opts ...CsvToJsonOpts) string

func RegisterCollector

func RegisterCollector(collectorFunc func() Collector)

RegisterCollector registers a collector constructor directly. This is only used when a custom collector must be specified (for example, for custom tables).

func RegisterTable

func RegisterTable[R types.RowStruct, T Table[R]]()

RegisterTable registers a collector constructor with the factory. It is called from the package init function of the table implementation.

func RegisterTableFormat

func RegisterTableFormat[R types.RowStruct, S parse.Config, T TableWithFormat[R, S]]()

RegisterTableFormat registers a collector constructor for a table which supports Format. It is called from the package init function of the table implementation.
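The "register from init" pattern these functions rely on can be sketched with a plain map of constructors. Everything here (registry, registerCollector, accessLogCollector) is hypothetical and only approximates what the global Factory might do; the real functions are generic and their internals are not documented.

```go
package main

import (
	"fmt"
	"sort"
)

// Collector is a cut-down stand-in for the package's Collector interface.
type Collector interface {
	Identifier() string
}

// registry maps a table name to a constructor that builds a fresh collector.
var registry = map[string]func() Collector{}

// registerCollector records a constructor under the collector's identifier;
// registering the same name twice is treated as a programming error.
func registerCollector(ctor func() Collector) {
	name := ctor().Identifier()
	if _, exists := registry[name]; exists {
		panic(fmt.Sprintf("collector %q registered twice", name))
	}
	registry[name] = ctor
}

// accessLogCollector is a hypothetical table implementation.
type accessLogCollector struct{}

func (accessLogCollector) Identifier() string { return "access_log" }

// init runs automatically when the package is loaded, so the table
// registers itself without any explicit wiring by the caller.
func init() {
	registerCollector(func() Collector { return accessLogCollector{} })
}

func main() {
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	fmt.Println(names) // [access_log]
}
```

Storing constructors rather than instances means each collection request can get a fresh collector with no shared state.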

Types

type ArtifactConversionCollector

type ArtifactConversionCollector[S parse.Config] struct {
	observable.ObservableImpl

	// the table config
	Format S
	// contains filtered or unexported fields
}

ArtifactConversionCollector is a collector that converts artifacts directly to JSONL. S is the table config type.

func NewArtifactConversionCollector

func NewArtifactConversionCollector[S parse.Config](tableName string, formatData *proto.ConfigData) *ArtifactConversionCollector[S]

func (*ArtifactConversionCollector[S]) Collect

func (c *ArtifactConversionCollector[S]) Collect(ctx context.Context) (int, int, error)

Collect executes the collection process by telling the source to start collection.

func (*ArtifactConversionCollector[S]) GetFromTime

func (*ArtifactConversionCollector[S]) GetSchema

func (c *ArtifactConversionCollector[S]) GetSchema() (*schema.RowSchema, error)

GetSchema returns the schema of the table, if available. For dynamic tables, the schema is only available at this point if the config contains a schema.

func (*ArtifactConversionCollector[S]) Identifier

func (c *ArtifactConversionCollector[S]) Identifier() string

func (*ArtifactConversionCollector[S]) Init

func (*ArtifactConversionCollector[S]) Notify

func (c *ArtifactConversionCollector[S]) Notify(ctx context.Context, event events.Event) error

Notify implements observable.Observer. It handles all events which the collector may receive (these all come from the source).

func (*ArtifactConversionCollector[S]) OnChunk

func (c *ArtifactConversionCollector[S]) OnChunk(ctx context.Context, chunkNumber int, collectionState json.RawMessage) error

OnChunk is called when a chunk of enriched rows has been written to a JSONL/CSV file. It notifies observers of the chunk.

func (*ArtifactConversionCollector[S]) UpdateCollectionState

func (c *ArtifactConversionCollector[S]) UpdateCollectionState(ctx context.Context, request *types.CollectRequest) error

type ArtifactToJsonConverter

type ArtifactToJsonConverter[S parse.Config] interface {
	GetArtifactConversionQuery(string, string, S) string
	ArtifactToJSON(context.Context, string, string, int, S) (int, int, error)
}

type ArtifactToJsonConverterImpl

type ArtifactToJsonConverterImpl[S parse.Config] struct {
}

func (*ArtifactToJsonConverterImpl[S]) EnrichRow

func (*ArtifactToJsonConverterImpl[S]) GetSourceMetadata

func (c *ArtifactToJsonConverterImpl[S]) GetSourceMetadata(_ S) []*SourceMetadata[*DynamicRow]

type ChunkWriter

type ChunkWriter interface {
	WriteChunk(ctx context.Context, rows []any, chunkNumber int) error
}

func NewJSONLWriter

func NewJSONLWriter(destPath string) ChunkWriter

type Collector

type Collector interface {
	observable.Observable

	Init(ctx context.Context, request *types.CollectRequest) error
	Identifier() string
	Collect(context.Context) (int, int, error)
	GetSchema() (*schema.RowSchema, error)
	GetFromTime() *row_source.ResolvedFromTime
}

Collector is an interface which provides methods for collecting table data from a source. It is implemented by the generic CollectorImpl struct.

type CollectorImpl

type CollectorImpl[R types.RowStruct] struct {
	observable.ObservableImpl

	Table Table[R]
	// contains filtered or unexported fields
}

CollectorImpl is a generic implementation of the Collector interface. It is responsible for coordinating the collection process and reporting status. R is the type of the row struct.

func (*CollectorImpl[R]) Collect

func (c *CollectorImpl[R]) Collect(ctx context.Context) (int, int, error)

Collect executes the collection process by telling the source to start collection.

func (*CollectorImpl[S]) GetFromTime

func (c *CollectorImpl[S]) GetFromTime() *row_source.ResolvedFromTime

GetFromTime returns the 'resolved' from time of the source

func (*CollectorImpl[R]) GetSchema

func (c *CollectorImpl[R]) GetSchema() (*schema.RowSchema, error)

GetSchema returns the schema of the table

func (*CollectorImpl[R]) Identifier

func (c *CollectorImpl[R]) Identifier() string

func (*CollectorImpl[R]) Init

func (c *CollectorImpl[R]) Init(ctx context.Context, req *types.CollectRequest) error

func (*CollectorImpl[R]) Notify

func (c *CollectorImpl[R]) Notify(ctx context.Context, event events.Event) error

Notify implements observable.Observer. It receives events from the source and handles only Row and Error events.

func (*CollectorImpl[R]) OnChunk

func (c *CollectorImpl[R]) OnChunk(ctx context.Context, chunkNumber int) error

OnChunk is called when a chunk of enriched rows has been written to a JSONL/CSV file. It notifies observers of the chunk.

func (*CollectorImpl[R]) WriteRemainingRows

func (c *CollectorImpl[R]) WriteRemainingRows(ctx context.Context, executionId string) (int, int, error)

type CollectorWithFormat

type CollectorWithFormat[R types.RowStruct, S parse.Config] struct {
	CollectorImpl[R]

	// shadow the table field from the base collector, so we store it as a TableWithFormat,
	// to avoid the need for a type assertion
	Table TableWithFormat[R, S]
	// the table format
	Format S
}

CollectorWithFormat is a collector that has a table format. The format is parsed from the source format config.

func NewCollectorWithFormat

func NewCollectorWithFormat[R types.RowStruct, S parse.Config, T TableWithFormat[R, S]]() *CollectorWithFormat[R, S]

func (*CollectorWithFormat[R, S]) Init

func (c *CollectorWithFormat[R, S]) Init(ctx context.Context, req *types.CollectRequest) error

type CsvHeaderMode

type CsvHeaderMode string
const (
	CsvHeaderModeAuto CsvHeaderMode = "auto"
	CsvHeaderModeOff  CsvHeaderMode = "off"
	CsvHeaderModeOn   CsvHeaderMode = "on"
)

type CsvTableConfig

type CsvTableConfig struct {
	HeaderMode CsvHeaderMode
	Delimiter  *string
	Comment    *string
	Schema     *schema.RowSchema
	Mappings   map[string]string
}

type CsvToJsonOpts

type CsvToJsonOpts func(*CsvTableConfig)

func WithCsvComment

func WithCsvComment(comment string) CsvToJsonOpts

func WithCsvDelimiter

func WithCsvDelimiter(delimiter string) CsvToJsonOpts

func WithCsvHeaderMode

func WithCsvHeaderMode(headerMode CsvHeaderMode) CsvToJsonOpts

func WithCsvSchema

func WithCsvSchema(schema *schema.RowSchema) CsvToJsonOpts

func WithMappings

func WithMappings(mappings map[string]string) CsvToJsonOpts
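CsvToJsonOpts follows Go's functional-options pattern: each With... function returns a closure that mutates a *CsvTableConfig. The sketch below is a reduced, hypothetical copy (the Schema field is dropped to avoid external imports), and the default header mode of "auto" is an assumption. The pointer fields plausibly let nil mean "not set", which the documentation does not confirm.

```go
package main

import "fmt"

type CsvHeaderMode string

const (
	CsvHeaderModeAuto CsvHeaderMode = "auto"
	CsvHeaderModeOff  CsvHeaderMode = "off"
	CsvHeaderModeOn   CsvHeaderMode = "on"
)

// csvTableConfig is a reduced copy of CsvTableConfig without Schema.
type csvTableConfig struct {
	HeaderMode CsvHeaderMode
	Delimiter  *string
	Comment    *string
	Mappings   map[string]string
}

type csvToJsonOpts func(*csvTableConfig)

func withCsvDelimiter(delimiter string) csvToJsonOpts {
	return func(c *csvTableConfig) { c.Delimiter = &delimiter }
}

func withCsvHeaderMode(mode CsvHeaderMode) csvToJsonOpts {
	return func(c *csvTableConfig) { c.HeaderMode = mode }
}

// newCsvTableConfig starts from assumed defaults and applies the options
// in order, so later options override earlier ones.
func newCsvTableConfig(opts ...csvToJsonOpts) *csvTableConfig {
	cfg := &csvTableConfig{HeaderMode: CsvHeaderModeAuto}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newCsvTableConfig(withCsvDelimiter("|"), withCsvHeaderMode(CsvHeaderModeOff))
	fmt.Println(cfg.HeaderMode, *cfg.Delimiter, cfg.Comment == nil) // off | true
}
```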

type DynamicRow

type DynamicRow struct {
	// dynamic columns
	Columns map[string]string
}

func NewDynamicRow

func NewDynamicRow() *DynamicRow

func (*DynamicRow) Enrich

func (l *DynamicRow) Enrich(fields schema.CommonFields)

Enrich uses the provided mappings to populate the common fields from mapped column values

func (*DynamicRow) GetCommonFields

func (l *DynamicRow) GetCommonFields() schema.CommonFields

func (*DynamicRow) InitialiseFromMap

func (l *DynamicRow) InitialiseFromMap(m map[string]string) error

InitialiseFromMap initializes the struct from a map of string values

func (*DynamicRow) MarshalJSON

func (l *DynamicRow) MarshalJSON() ([]byte, error)

MarshalJSON overrides JSON serialization to include the dynamic columns

func (*DynamicRow) ResolveSchema

func (l *DynamicRow) ResolveSchema(customTable *types.Table) (*schema.RowSchema, error)

ResolveSchema returns the (potentially partial) schema for the dynamic row. This schema is used for the JSONL-to-Parquet conversion.

func (*DynamicRow) Validate

func (l *DynamicRow) Validate() error

type JSONLWriter

type JSONLWriter struct {
	// contains filtered or unexported fields
}

JSONLWriter implements ChunkWriter and writes rows to JSONL files

func (JSONLWriter) WriteChunk

func (j JSONLWriter) WriteChunk(ctx context.Context, rows []any, chunkNumber int) error

type MapInitialisedRow

type MapInitialisedRow interface {
	types.RowStruct
	InitialiseFromMap(m map[string]string) error
}

MapInitialisedRow is an interface which provides a means to initialise a row struct from a string map. It is used in combination with the GonxMapper/GrokMapper.
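Log parsers of the Gonx/Grok kind produce a map of named string captures, so InitialiseFromMap is where a row converts those strings into typed fields. The row struct, key names, and timestamp layout below are hypothetical, and the sketch omits the types.RowStruct constraint that the real interface embeds.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// accessLogRow is a hypothetical row struct with typed fields.
type accessLogRow struct {
	Timestamp time.Time
	Status    int
	Path      string
}

// InitialiseFromMap parses known keys into typed fields.
func (r *accessLogRow) InitialiseFromMap(m map[string]string) error {
	for key, value := range m {
		switch key {
		case "time_local":
			t, err := time.Parse("02/Jan/2006:15:04:05 -0700", value)
			if err != nil {
				return fmt.Errorf("parsing %s: %w", key, err)
			}
			r.Timestamp = t
		case "status":
			s, err := strconv.Atoi(value)
			if err != nil {
				return fmt.Errorf("parsing %s: %w", key, err)
			}
			r.Status = s
		case "request_path":
			r.Path = value
		default:
			// Unknown keys are ignored in this sketch.
		}
	}
	return nil
}

func main() {
	var r accessLogRow
	err := r.InitialiseFromMap(map[string]string{
		"time_local":   "10/Oct/2024:13:55:36 +0000",
		"status":       "404",
		"request_path": "/missing",
	})
	fmt.Println(r.Timestamp.UTC(), r.Status, r.Path, err)
}
```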

type MapOption

type MapOption[R types.RowStruct] func(Mapper[R])

func WithSchema

func WithSchema[R types.RowStruct](schema *schema.RowSchema) MapOption[R]

WithSchema is a MapOption which sets the schema on a Mapper

type Mapper

type Mapper[R types.RowStruct] interface {
	Identifier() string
	// Map converts raw rows to the desired format (type 'R')
	Map(context.Context, any, ...MapOption[R]) (R, error)
}

Mapper is a generic interface which provides a method for mapping raw source data into row structs. R is the type of the row struct which the mapper outputs.

type SchemaSetter

type SchemaSetter interface {
	SetSchema(*schema.RowSchema)
}

SchemaSetter is an interface which provides a method to set the schema

type SourceMetadata

type SourceMetadata[R types.RowStruct] struct {
	SourceName string

	Mapper  Mapper[R]
	Options []row_source.RowSourceOption
}

type Table

type Table[R types.RowStruct] interface {
	// Identifier must return the collection name
	Identifier() string

	// GetSourceMetadata returns the supported sources for the table
	GetSourceMetadata() []*SourceMetadata[R]
	// EnrichRow is called to enrich the row with common (tp_*) fields
	EnrichRow(R, schema.SourceEnrichment) (R, error)
}

Table is a generic interface representing a plugin table definition. R is the row struct type.

type TableFactory

type TableFactory struct {
	// contains filtered or unexported fields
}

func (*TableFactory) GetCollector

func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)

func (*TableFactory) GetPartitions

func (f *TableFactory) GetPartitions() map[string]func() Collector

func (*TableFactory) GetSchema

func (f *TableFactory) GetSchema() (schema.SchemaMap, error)

func (*TableFactory) Init

func (f *TableFactory) Init() (err error)

Init builds the map of table constructors and schemas

func (*TableFactory) Initialized

func (f *TableFactory) Initialized() bool

type TableWithFormat

type TableWithFormat[R types.RowStruct, S parse.Config] interface {
	Table[R]
	SetFormat(S)
}

TableWithFormat is a generic interface representing a plugin table definition with a format

type TableWithFormatImpl

type TableWithFormatImpl[S parse.Config] struct {
	Format S
}

TableWithFormatImpl is a generic struct representing a plugin table definition with a format

func (*TableWithFormatImpl[S]) SetFormat

func (c *TableWithFormatImpl[S]) SetFormat(format S)
