table

package
v0.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 26, 2025 License: Apache-2.0 Imports: 37 Imported by: 36

Documentation

Index

Constants

View Source
const JSONLChunkSize = 10000

JSONLChunkSize is the number of rows to write in each JSONL file. It is set to the same size that DuckDB uses to infer a schema (10000).

Variables

View Source
var Factory = newTableFactory()

Factory is a global TableFactory instance

Functions

func ExecutionIdToJsonlFileName added in v0.3.0

func ExecutionIdToJsonlFileName(executionId string, chunkNumber int32) string

ExecutionIdToJsonlFileName converts an execution id and chunk number to a filename, assuming a convention of <executionId>-<chunkNumber>.jsonl

func FileNameToExecutionId

func FileNameToExecutionId(filename string) (string, int, error)

FileNameToExecutionId converts a filename to an execution id, assuming a convention of <executionId>-<chunkNumber>.jsonl

func FormatSupportsDirectConversion added in v0.3.0

func FormatSupportsDirectConversion(formatName string) bool

func RegisterCustomTable added in v0.2.0

func RegisterCustomTable[T CustomTable](opts ...TableOption)

RegisterCustomTable registers a custom table type with optional configuration

func RegisterFormat added in v0.2.0

func RegisterFormat[T formats.Format]()

func RegisterFormatPresets added in v0.2.0

func RegisterFormatPresets(presets ...formats.Format)

func RegisterTable

func RegisterTable[R any, T Table[R]]()

RegisterTable registers a collector constructor with the factory. This is called from the package init function of the table implementation.

func TestIdentifier added in v0.2.0

func TestIdentifier(t *testing.T, c Collector)

func TestSchema added in v0.2.0

func TestSchema(t *testing.T, c Collector)

func Validate added in v0.2.0

func Validate(t *testing.T, ctor func() Collector)

Types

type ArtifactConversionCollector

type ArtifactConversionCollector struct {
	CollectorImpl[*types.DynamicRow]
	// contains filtered or unexported fields
}

ArtifactConversionCollector is a collector that converts artifacts directly to JSONL. S is the table config type.

func NewArtifactConversionCollector

func NewArtifactConversionCollector(table CustomTable) *ArtifactConversionCollector

func (*ArtifactConversionCollector) Close added in v0.3.0

func (c *ArtifactConversionCollector) Close()

Close closes the collector and releases any resources

func (*ArtifactConversionCollector) GetSchema

GetSchema returns the schema of the table

func (*ArtifactConversionCollector) Identifier

func (c *ArtifactConversionCollector) Identifier() string

func (*ArtifactConversionCollector) Init

func (*ArtifactConversionCollector) Notify

Notify implements observable.Observer. It handles all events which collectorFuncMap may receive (these will all come from the source).

type ArtifactToJsonConverter

type ArtifactToJsonConverter[S parse.Config] interface {
	GetArtifactConversionQuery(string, string, S) string
	ArtifactToJSON(context.Context, string, string, int, S) (int, int, error)
}

type ChunkWriter

type ChunkWriter interface {
	WriteChunk(ctx context.Context, rows []any, chunkNumber int32) error
}

func NewJSONLWriter

func NewJSONLWriter(destPath string) ChunkWriter

type Collector

type Collector interface {
	observable.PausableObservable

	Init(ctx context.Context, request *types.CollectRequest) error
	Identifier() string
	Collect(context.Context) (int64, int32, error)
	GetSchema() (*schema.TableSchema, error)
	GetFromTime() *row_source.ResolvedFromTime
	Close()
}

Collector is an interface which provides methods for collecting table data from a source. This is implemented by the generic CollectorImpl struct.

type CollectorImpl

type CollectorImpl[R any] struct {
	observable.PausableObservableImpl
	// contains filtered or unexported fields
}

func (*CollectorImpl[R]) Close added in v0.3.0

func (c *CollectorImpl[R]) Close()

func (*CollectorImpl[R]) Collect

func (c *CollectorImpl[R]) Collect(ctx context.Context) (int64, int32, error)

Collect executes the collection process. It tells our source to start collection.

func (*CollectorImpl[R]) GetFromTime

func (c *CollectorImpl[R]) GetFromTime() *row_source.ResolvedFromTime

GetFromTime returns the 'resolved' from time of the source

func (*CollectorImpl[R]) PauseCollection added in v0.5.0

func (c *CollectorImpl[R]) PauseCollection() error

PauseCollection pauses the source and pauses our own event handling

func (*CollectorImpl[R]) ResumeCollection added in v0.5.0

func (c *CollectorImpl[R]) ResumeCollection() error

ResumeCollection resumes the source and resumes our own event handling

type CustomTable added in v0.2.0

type CustomTable interface {
	Table[*types.DynamicRow]
	Initialize(formats.Format, *schema.TableSchema) error
	GetSchema() *schema.TableSchema
	GetCustomSchema() *schema.TableSchema
	GetDefaultFormat() formats.Format
	GetTableDefinition() *schema.TableSchema
	GetFormat() formats.Format
}

CustomTable is an interface representing a plugin table definition with a format

type CustomTableImpl added in v0.2.0

type CustomTableImpl struct {
	// the full schema, i.e. common fields AND any custom schema,
	// defined either in the plugin config or by the predefined custom table
	Schema *schema.TableSchema

	// the format
	Format formats.Format
	// contains filtered or unexported fields
}

CustomTableImpl is a generic struct representing a plugin table definition with a format

func (*CustomTableImpl) EnrichRow added in v0.2.0

func (c *CustomTableImpl) EnrichRow(row *types.DynamicRow, sourceEnrichmentFields schema.SourceEnrichment) (*types.DynamicRow, error)

func (*CustomTableImpl) GetCustomSchema added in v0.8.0

func (c *CustomTableImpl) GetCustomSchema() *schema.TableSchema

GetCustomSchema returns the custom schema defined in the plugin config or by the predefined custom table. This DOES NOT include common fields.

func (*CustomTableImpl) GetDefaultFormat added in v0.2.0

func (c *CustomTableImpl) GetDefaultFormat() formats.Format

func (*CustomTableImpl) GetFormat added in v0.3.0

func (c *CustomTableImpl) GetFormat() formats.Format

func (*CustomTableImpl) GetSchema added in v0.2.0

func (c *CustomTableImpl) GetSchema() *schema.TableSchema

GetSchema implements the CustomTable interface and returns the full schema for the table, which includes common fields

func (*CustomTableImpl) Initialize added in v0.2.0

func (c *CustomTableImpl) Initialize(format formats.Format, customTableSchema *schema.TableSchema) error

Initialize sets the format and schema for the table

type JSONLWriter

type JSONLWriter struct {
	// contains filtered or unexported fields
}

JSONLWriter implements ChunkWriter and writes rows to JSONL files

func (JSONLWriter) WriteChunk

func (j JSONLWriter) WriteChunk(ctx context.Context, rows []any, chunkNumber int32) error

type RowEnrichmentCollector added in v0.3.0

type RowEnrichmentCollector[R any] struct {
	CollectorImpl[R]
	// contains filtered or unexported fields
}

RowEnrichmentCollector is a generic implementation of the Collector interface. It is responsible for coordinating the collection process and reporting status. R is the type of the row struct.

func NewRowEnrichmentCollector added in v0.3.0

func NewRowEnrichmentCollector[R any](table Table[R]) *RowEnrichmentCollector[R]

func (*RowEnrichmentCollector[R]) Collect added in v0.3.0

func (c *RowEnrichmentCollector[R]) Collect(ctx context.Context) (int64, int32, error)

Collect executes the collection process. It tells our source to start collection.

func (*RowEnrichmentCollector[R]) GetSchema added in v0.3.0

func (c *RowEnrichmentCollector[R]) GetSchema() (*schema.TableSchema, error)

GetSchema returns the schema of the table

func (*RowEnrichmentCollector[R]) Identifier added in v0.3.0

func (c *RowEnrichmentCollector[R]) Identifier() string

func (*RowEnrichmentCollector[R]) Init added in v0.3.0

func (*RowEnrichmentCollector[R]) Notify added in v0.3.0

func (c *RowEnrichmentCollector[R]) Notify(ctx context.Context, event events.Event) error

Notify implements observable.Observer. It receives events from the source and handles ONLY Row and Error events.

type SourceMetadata

type SourceMetadata[R any] struct {
	SourceName string

	Mapper  mappers.Mapper[R]
	Options []row_source.RowSourceOption
}

type Table

type Table[R any] interface {
	// Identifier returns the table name
	Identifier() string

	// GetSourceMetadata returns the supported sources for the table
	GetSourceMetadata() ([]*SourceMetadata[R], error)
	// EnrichRow is called to enrich the row with common (tp_*) fields
	EnrichRow(R, schema.SourceEnrichment) (R, error)
}

Table is a generic interface representing a plugin table definition. R is the row struct type.

type TableFactory

type TableFactory struct {
	// contains filtered or unexported fields
}

func (*TableFactory) DescribeCustomFormats added in v0.2.0

func (f *TableFactory) DescribeCustomFormats(customFormatData []*proto.FormatData) (*types.DescribeResponse, error)

DescribeCustomFormats describes the custom formats which are provided in the request

func (*TableFactory) DescribeFormats added in v0.2.0

func (f *TableFactory) DescribeFormats(customFormatData ...*proto.FormatData) (*types.DescribeResponse, error)

DescribeFormats describes the formats available for the plugin, including provided custom formats

func (*TableFactory) GetCollector

func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)

func (*TableFactory) GetCollectorMap added in v0.2.0

func (f *TableFactory) GetCollectorMap() map[string]func() Collector

func (*TableFactory) GetSchema

func (f *TableFactory) GetSchema() (schema.SchemaMap, error)

func (*TableFactory) Initialized

func (f *TableFactory) Initialized() bool

type TableOption added in v0.2.0

type TableOption func(*tableConfig)

TableOption is an option function passed to the table factory

func WithName added in v0.2.0

func WithName(name string) TableOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL