table

package
v0.2.0-rc.3
Published: Mar 31, 2025 License: Apache-2.0 Imports: 33 Imported by: 36

Documentation

Constants

const JSONLChunkSize = 10000

JSONLChunkSize is the number of rows to write in each JSONL file. It is set to the same number of rows that DuckDB uses to infer a schema (10000).
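As a rough illustration of what a fixed chunk size implies, the sketch below computes how many JSONL files a given row count would need. The helper `chunkCount` is hypothetical and not part of this package; it only assumes that every file except possibly the last holds exactly 10000 rows.

```go
package main

import "fmt"

// jsonlChunkSize mirrors the documented JSONLChunkSize value.
const jsonlChunkSize = 10000

// chunkCount is a hypothetical helper (not part of the package). It returns
// how many files are needed to hold rowCount rows when each file holds at
// most chunkSize rows.
func chunkCount(rowCount, chunkSize int) int {
	if rowCount <= 0 || chunkSize <= 0 {
		return 0
	}
	// Integer ceiling division: a partial final chunk still needs a file.
	return (rowCount + chunkSize - 1) / chunkSize
}

func main() {
	for _, n := range []int{0, 1, 10000, 10001, 25000} {
		fmt.Printf("%d rows -> %d chunk file(s)\n", n, chunkCount(n, jsonlChunkSize))
	}
}
```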

Variables

var Factory = newTableFactory()

Factory is a global TableFactory instance

Functions

func ExecutionIdToFileName

func ExecutionIdToFileName(executionId string, chunkNumber int) string

ExecutionIdToFileName converts an execution id and chunk number to a filename, assuming the convention <executionId>-<chunkNumber>.jsonl.

func FileNameToExecutionId

func FileNameToExecutionId(filename string) (string, int, error)

FileNameToExecutionId converts a filename to an execution id and chunk number, assuming the convention <executionId>-<chunkNumber>.jsonl.
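The naming convention can be sketched as a round trip. This is a minimal illustration, not the package's implementation: `toFileName` and `fromFileName` are hypothetical stand-ins, and splitting on the last hyphen is an assumption made here so that execution ids containing hyphens still parse.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

const jsonlExt = ".jsonl"

// toFileName is a hypothetical stand-in that builds
// <executionId>-<chunkNumber>.jsonl for a non-negative chunk number.
func toFileName(executionID string, chunkNumber int) string {
	return fmt.Sprintf("%s-%d%s", executionID, chunkNumber, jsonlExt)
}

// fromFileName is a hypothetical stand-in that reverses toFileName.
// Assumption: the chunk number follows the LAST hyphen in the name.
func fromFileName(name string) (string, int, error) {
	if !strings.HasSuffix(name, jsonlExt) {
		return "", 0, fmt.Errorf("%q does not end in %s", name, jsonlExt)
	}
	base := strings.TrimSuffix(name, jsonlExt)
	idx := strings.LastIndex(base, "-")
	if idx <= 0 {
		return "", 0, fmt.Errorf("%q has no <executionId>-<chunkNumber> part", name)
	}
	chunk, err := strconv.Atoi(base[idx+1:])
	if err != nil {
		return "", 0, fmt.Errorf("invalid chunk number in %q: %w", name, err)
	}
	return base[:idx], chunk, nil
}

func main() {
	name := toFileName("exec-123", 4)
	id, chunk, err := fromFileName(name)
	fmt.Println(name, id, chunk, err)
}
```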

func GetReadCsvChunkQueryFormat

func GetReadCsvChunkQueryFormat(sourceFile string, opts ...CsvToJsonOpts) string

func RegisterCustomTable added in v0.2.0

func RegisterCustomTable[T CustomTable](opts ...TableOption)

RegisterCustomTable registers a custom table type with optional configuration

func RegisterFormat added in v0.2.0

func RegisterFormat[T formats.Format]()

func RegisterFormatPresets added in v0.2.0

func RegisterFormatPresets(presets ...formats.Format)

func RegisterTable

func RegisterTable[R types.RowStruct, T Table[R]]()

RegisterTable registers a collector constructor with the factory. It is called from the package init function of the table implementation.
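The init-time registration pattern described here can be sketched without the SDK. Everything below is a simplified stand-in: `table`, `collector`, `collectorImpl`, `registry`, and `registerTable` are hypothetical local names that only mimic the shape of the documented generic signature, and the real factory may store and construct collectors differently.

```go
package main

import "fmt"

// table is a simplified stand-in for the documented Table[R] interface.
type table[R any] interface {
	Identifier() string
}

// collector is a simplified stand-in for the documented Collector interface.
type collector interface {
	Identifier() string
}

// collectorImpl wraps a table, loosely mirroring CollectorImpl[R].
type collectorImpl[R any] struct {
	tbl table[R]
}

func (c *collectorImpl[R]) Identifier() string { return c.tbl.Identifier() }

// registry is a hypothetical global map of table name to collector constructor.
var registry = map[string]func() collector{}

// registerTable stores a constructor keyed by the table's identifier.
// Assumption: T is a value type whose zero value is usable.
func registerTable[R any, T table[R]]() {
	var t T // zero value, used only to ask the table for its name
	registry[t.Identifier()] = func() collector {
		var fresh T
		return &collectorImpl[R]{tbl: fresh}
	}
}

// logRow and logTable are example types invented for this sketch.
type logRow struct{ Message string }

type logTable struct{}

func (logTable) Identifier() string { return "example_log" }

// init runs when the package is loaded, so the table is registered
// before main starts.
func init() {
	registerTable[logRow, logTable]()
}

func main() {
	for name, ctor := range registry {
		fmt.Println(name, "->", ctor().Identifier())
	}
}
```

Registering from init means a plugin only has to import the table's package for the table to become available, at the cost of relying on import side effects.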

func TestIdentifier added in v0.2.0

func TestIdentifier(t *testing.T, c Collector)

func TestSchema added in v0.2.0

func TestSchema(t *testing.T, c Collector)

func Validate added in v0.2.0

func Validate(t *testing.T, ctor func() Collector)

Types

type ArtifactConversionCollector

type ArtifactConversionCollector struct {
	observable.ObservableImpl

	// the source format
	//formatData *proto.ConfigData
	// the table config
	Format parse.Config
	// contains filtered or unexported fields
}

ArtifactConversionCollector is a collector that converts artifacts directly to JSONL. S is the table config type.

func (*ArtifactConversionCollector) Collect

Collect executes the collection process by telling the source to start collection.

func (*ArtifactConversionCollector) GetFromTime

func (*ArtifactConversionCollector) GetSchema

GetSchema returns the schema of the table, if available. For dynamic tables, the schema is only available at this point if the config contains a schema.

func (*ArtifactConversionCollector) Identifier

func (c *ArtifactConversionCollector) Identifier() string

func (*ArtifactConversionCollector) Init

func (*ArtifactConversionCollector) Notify

Notify implements observable.Observer. It handles all events which collectorFuncMap may receive (these will all come from the source).

func (*ArtifactConversionCollector) OnChunk

func (c *ArtifactConversionCollector) OnChunk(ctx context.Context, chunkNumber int, collectionState json.RawMessage) error

OnChunk is called when we have written a chunk of enriched rows to a JSONL/CSV file. It notifies observers of the chunk.

func (*ArtifactConversionCollector) UpdateCollectionState

func (c *ArtifactConversionCollector) UpdateCollectionState(ctx context.Context, request *types.CollectRequest) error

type ArtifactToJsonConverter

type ArtifactToJsonConverter[S parse.Config] interface {
	GetArtifactConversionQuery(string, string, S) string
	ArtifactToJSON(context.Context, string, string, int, S) (int, int, error)
}

type ArtifactToJsonConverterImpl

type ArtifactToJsonConverterImpl[S parse.Config] struct {
}

func (*ArtifactToJsonConverterImpl[S]) EnrichRow

func (*ArtifactToJsonConverterImpl[S]) GetSourceMetadata

func (c *ArtifactToJsonConverterImpl[S]) GetSourceMetadata(_ S) []*SourceMetadata[*types.DynamicRow]

type ChunkWriter

type ChunkWriter interface {
	WriteChunk(ctx context.Context, rows []any, chunkNumber int) error
}

func NewJSONLWriter

func NewJSONLWriter(destPath string) ChunkWriter

type Collector

type Collector interface {
	observable.Observable

	Init(ctx context.Context, request *types.CollectRequest) error
	Identifier() string
	Collect(context.Context) (int, int, error)
	GetSchema() (*schema.TableSchema, error)
	GetFromTime() *row_source.ResolvedFromTime
}

Collector is an interface which provides methods for collecting table data from a source. It is implemented by the generic CollectorImpl struct.

type CollectorImpl

type CollectorImpl[R types.RowStruct] struct {
	observable.ObservableImpl

	Table Table[R]
	// contains filtered or unexported fields
}

CollectorImpl is a generic implementation of the Collector interface. It is responsible for coordinating the collection process and reporting status. R is the type of the row struct, S is the type of the partition config, T is the type of the table, and U is the type of the connection.

func NewCollectorImpl added in v0.2.0

func NewCollectorImpl[R types.RowStruct](table Table[R]) *CollectorImpl[R]

func (*CollectorImpl[R]) Collect

func (c *CollectorImpl[R]) Collect(ctx context.Context) (int, int, error)

Collect executes the collection process by telling the source to start collection.

func (*CollectorImpl[S]) GetFromTime

func (c *CollectorImpl[S]) GetFromTime() *row_source.ResolvedFromTime

GetFromTime returns the 'resolved' from time of the source

func (*CollectorImpl[R]) GetSchema

func (c *CollectorImpl[R]) GetSchema() (*schema.TableSchema, error)

GetSchema returns the schema of the table

func (*CollectorImpl[R]) Identifier

func (c *CollectorImpl[R]) Identifier() string

func (*CollectorImpl[R]) Init

func (c *CollectorImpl[R]) Init(ctx context.Context, req *types.CollectRequest) error

func (*CollectorImpl[R]) Notify

func (c *CollectorImpl[R]) Notify(ctx context.Context, event events.Event) error

Notify implements observable.Observer. It receives events from the source and handles ONLY Row and Error events.

func (*CollectorImpl[R]) OnChunk

func (c *CollectorImpl[R]) OnChunk(ctx context.Context, chunkNumber int) error

OnChunk is called when we have written a chunk of enriched rows to a JSONL/CSV file. It notifies observers of the chunk.

func (*CollectorImpl[R]) WriteRemainingRows

func (c *CollectorImpl[R]) WriteRemainingRows(ctx context.Context, executionId string) (int, int, error)

type CsvHeaderMode

type CsvHeaderMode string

TODO LOWER CASE SQL KEYWORDS

const (
	CsvHeaderModeAuto CsvHeaderMode = "auto"
	CsvHeaderModeOff  CsvHeaderMode = "off"
	CsvHeaderModeOn   CsvHeaderMode = "on"
)

type CsvTableConfig

type CsvTableConfig struct {
	HeaderMode CsvHeaderMode
	Delimiter  *string
	Comment    *string
	Schema     *schema.TableSchema
	Mappings   map[string]string
}

type CsvToJsonOpts

type CsvToJsonOpts func(*CsvTableConfig)

func WithCsvComment

func WithCsvComment(comment string) CsvToJsonOpts

func WithCsvDelimiter

func WithCsvDelimiter(delimiter string) CsvToJsonOpts

func WithCsvHeaderMode

func WithCsvHeaderMode(headerMode CsvHeaderMode) CsvToJsonOpts

func WithCsvSchema

func WithCsvSchema(schema *schema.TableSchema) CsvToJsonOpts

func WithMappings

func WithMappings(mappings map[string]string) CsvToJsonOpts
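CsvToJsonOpts follows the functional options pattern: each With... function returns a closure that mutates a CsvTableConfig. The sketch below reuses the documented type and function names, but the function bodies, the default header mode, and the `newCsvConfig` helper are assumptions made for illustration. The Schema field is omitted because its type lives in another package.

```go
package main

import "fmt"

type CsvHeaderMode string

const (
	CsvHeaderModeAuto CsvHeaderMode = "auto"
	CsvHeaderModeOff  CsvHeaderMode = "off"
	CsvHeaderModeOn   CsvHeaderMode = "on"
)

// CsvTableConfig mirrors the documented struct, minus the Schema field.
type CsvTableConfig struct {
	HeaderMode CsvHeaderMode
	Delimiter  *string
	Comment    *string
	Mappings   map[string]string
}

type CsvToJsonOpts func(*CsvTableConfig)

// The bodies below are assumed; only the signatures come from the docs.
func WithCsvHeaderMode(headerMode CsvHeaderMode) CsvToJsonOpts {
	return func(c *CsvTableConfig) { c.HeaderMode = headerMode }
}

func WithCsvDelimiter(delimiter string) CsvToJsonOpts {
	return func(c *CsvTableConfig) { c.Delimiter = &delimiter }
}

func WithCsvComment(comment string) CsvToJsonOpts {
	return func(c *CsvTableConfig) { c.Comment = &comment }
}

// newCsvConfig is a hypothetical helper that applies options in order.
// Assumption: the header mode defaults to "auto" when no option sets it.
func newCsvConfig(opts ...CsvToJsonOpts) *CsvTableConfig {
	cfg := &CsvTableConfig{HeaderMode: CsvHeaderModeAuto}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := newCsvConfig(WithCsvHeaderMode(CsvHeaderModeOn), WithCsvDelimiter(";"))
	fmt.Println("header mode:", cfg.HeaderMode)
	fmt.Println("delimiter:", *cfg.Delimiter)
	fmt.Println("comment set:", cfg.Comment != nil)
}
```

Pointer fields such as Delimiter and Comment let a caller distinguish "not set" from an explicitly empty value, which a plain string could not express.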

type CustomTable added in v0.2.0

type CustomTable interface {
	Table[*types.DynamicRow]
	Initialize(formats.Format, *schema.TableSchema)
	GetSchema() (*schema.TableSchema, error)
	GetDefaultFormat() formats.Format
	GetTableDefinition() *schema.TableSchema
}

CustomTable is a generic interface representing a plugin table definition with a format.

type CustomTableImpl added in v0.2.0

type CustomTableImpl struct {
	Schema *schema.TableSchema
	// the format
	Format formats.Format
}

CustomTableImpl is a generic struct representing a plugin table definition with a format.

func (*CustomTableImpl) EnrichRow added in v0.2.0

func (c *CustomTableImpl) EnrichRow(row *types.DynamicRow, sourceEnrichmentFields schema.SourceEnrichment) (*types.DynamicRow, error)

func (*CustomTableImpl) GetDefaultFormat added in v0.2.0

func (c *CustomTableImpl) GetDefaultFormat() formats.Format

func (*CustomTableImpl) GetSchema added in v0.2.0

func (c *CustomTableImpl) GetSchema() (*schema.TableSchema, error)

GetSchema implements the CustomTable interface

func (*CustomTableImpl) Initialize added in v0.2.0

func (c *CustomTableImpl) Initialize(format formats.Format, customTableSchema *schema.TableSchema)

Initialize sets the format and schema for the table

type JSONLWriter

type JSONLWriter struct {
	// contains filtered or unexported fields
}

JSONLWriter implements ChunkWriter and writes rows to JSONL files

func (JSONLWriter) WriteChunk

func (j JSONLWriter) WriteChunk(ctx context.Context, rows []any, chunkNumber int) error

type SourceMetadata

type SourceMetadata[R types.RowStruct] struct {
	SourceName string

	Mapper  mappers.Mapper[R]
	Options []row_source.RowSourceOption
}

type Table

type Table[R types.RowStruct] interface {
	// Identifier returns the table name
	Identifier() string

	// GetSourceMetadata returns the supported sources for the table
	GetSourceMetadata() ([]*SourceMetadata[R], error)
	// EnrichRow is called to enrich the row with common (tp_*) fields
	EnrichRow(R, schema.SourceEnrichment) (R, error)
}

Table is a generic interface representing a plugin table definition. R is the row struct type.

type TableFactory

type TableFactory struct {
	// contains filtered or unexported fields
}

func (*TableFactory) DescribeCustomFormats added in v0.2.0

func (f *TableFactory) DescribeCustomFormats(customFormatData []*proto.FormatData) (*types.DescribeResponse, error)

DescribeCustomFormats describes the custom formats which are provided in the request

func (*TableFactory) DescribeFormats added in v0.2.0

func (f *TableFactory) DescribeFormats(customFormatData ...*proto.FormatData) (*types.DescribeResponse, error)

DescribeFormats describes the formats available for the plugin, including provided custom formats

func (*TableFactory) GetCollector

func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)

func (*TableFactory) GetCollectorMap added in v0.2.0

func (f *TableFactory) GetCollectorMap() map[string]func() Collector

func (*TableFactory) GetSchema

func (f *TableFactory) GetSchema() (schema.SchemaMap, error)

func (*TableFactory) Initialized

func (f *TableFactory) Initialized() bool

type TableOption added in v0.2.0

type TableOption func(*tableConfig)

TableOption is an option function passed to the table factory

func WithName added in v0.2.0

func WithName(name string) TableOption
