Documentation
¶
Index ¶
- Constants
- Variables
- func ExecutionIdToJsonlFileName(executionId string, chunkNumber int32) string
- func FileNameToExecutionId(filename string) (string, int, error)
- func FormatSupportsDirectConversion(formatName string) bool
- func RegisterCustomTable[T CustomTable](opts ...TableOption)
- func RegisterFormat[T formats.Format]()
- func RegisterFormatPresets(presets ...formats.Format)
- func RegisterTable[R any, T Table[R]]()
- func TestIdentifier(t *testing.T, c Collector)
- func TestSchema(t *testing.T, c Collector)
- func Validate(t *testing.T, ctor func() Collector)
- type ArtifactConversionCollector
- func (c *ArtifactConversionCollector) Close()
- func (c *ArtifactConversionCollector) GetSchema() (*schema.TableSchema, error)
- func (c *ArtifactConversionCollector) Identifier() string
- func (c *ArtifactConversionCollector) Init(ctx context.Context, req *types.CollectRequest) error
- func (c *ArtifactConversionCollector) Notify(ctx context.Context, event events.Event) error
- type ArtifactToJsonConverter
- type ChunkWriter
- type Collector
- type CollectorImpl
- type CustomTable
- type CustomTableImpl
- func (c *CustomTableImpl) EnrichRow(row *types.DynamicRow, sourceEnrichmentFields schema.SourceEnrichment) (*types.DynamicRow, error)
- func (c *CustomTableImpl) GetCustomSchema() *schema.TableSchema
- func (c *CustomTableImpl) GetDefaultFormat() formats.Format
- func (c *CustomTableImpl) GetFormat() formats.Format
- func (c *CustomTableImpl) GetSchema() *schema.TableSchema
- func (c *CustomTableImpl) Initialize(format formats.Format, customTableSchema *schema.TableSchema) error
- type JSONLWriter
- type RowEnrichmentCollector
- func (c *RowEnrichmentCollector[R]) Collect(ctx context.Context) (int64, int32, error)
- func (c *RowEnrichmentCollector[R]) GetSchema() (*schema.TableSchema, error)
- func (c *RowEnrichmentCollector[R]) Identifier() string
- func (c *RowEnrichmentCollector[R]) Init(ctx context.Context, req *types.CollectRequest) error
- func (c *RowEnrichmentCollector[R]) Notify(ctx context.Context, event events.Event) error
- type SourceMetadata
- type Table
- type TableFactory
- func (f *TableFactory) DescribeCustomFormats(customFormatData []*proto.FormatData) (*types.DescribeResponse, error)
- func (f *TableFactory) DescribeFormats(customFormatData ...*proto.FormatData) (*types.DescribeResponse, error)
- func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)
- func (f *TableFactory) GetCollectorMap() map[string]func() Collector
- func (f *TableFactory) GetSchema() (schema.SchemaMap, error)
- func (f *TableFactory) Initialized() bool
- type TableOption
Constants ¶
const JSONLChunkSize = 10000
JSONLChunkSize is the number of rows to write in each JSONL file. It is set to the same size that DuckDB uses to infer a schema (10000).
Variables ¶
var Factory = newTableFactory()
Factory is a global TableFactory instance
Functions ¶
func ExecutionIdToJsonlFileName ¶ added in v0.3.0
ExecutionIdToJsonlFileName converts an execution id and chunk number to a filename, assuming a convention of <executionId>-<chunkNumber>.jsonl
func FileNameToExecutionId ¶
FileNameToExecutionId converts a filename to an execution id, assuming a convention of <executionId>-<chunkNumber>.jsonl
func FormatSupportsDirectConversion ¶ added in v0.3.0
func RegisterCustomTable ¶ added in v0.2.0
func RegisterCustomTable[T CustomTable](opts ...TableOption)
RegisterCustomTable registers a custom table type with optional configuration
func RegisterFormat ¶ added in v0.2.0
func RegisterFormatPresets ¶ added in v0.2.0
func RegisterTable ¶
RegisterTable registers a collector constructor with the factory. This is called from the package init function of the table implementation.
func TestIdentifier ¶ added in v0.2.0
func TestSchema ¶ added in v0.2.0
Types ¶
type ArtifactConversionCollector ¶
type ArtifactConversionCollector struct {
CollectorImpl[*types.DynamicRow]
// contains filtered or unexported fields
}
ArtifactConversionCollector is a collector that converts artifacts directly to JSONL. S is the table config type.
func NewArtifactConversionCollector ¶
func NewArtifactConversionCollector(table CustomTable) *ArtifactConversionCollector
func (*ArtifactConversionCollector) Close ¶ added in v0.3.0
func (c *ArtifactConversionCollector) Close()
Close closes the collector and releases any resources
func (*ArtifactConversionCollector) GetSchema ¶
func (c *ArtifactConversionCollector) GetSchema() (*schema.TableSchema, error)
GetSchema returns the schema of the table
func (*ArtifactConversionCollector) Identifier ¶
func (c *ArtifactConversionCollector) Identifier() string
func (*ArtifactConversionCollector) Init ¶
func (c *ArtifactConversionCollector) Init(ctx context.Context, req *types.CollectRequest) error
type ArtifactToJsonConverter ¶
type ChunkWriter ¶
func NewJSONLWriter ¶
func NewJSONLWriter(destPath string) ChunkWriter
type Collector ¶
type Collector interface {
observable.PausableObservable
Init(ctx context.Context, request *types.CollectRequest) error
Identifier() string
Collect(context.Context) (int64, int32, error)
GetSchema() (*schema.TableSchema, error)
GetFromTime() *row_source.ResolvedFromTime
Close()
}
Collector is an interface which provides methods for collecting table data from a source. This is implemented by the generic CollectorImpl struct.
type CollectorImpl ¶
type CollectorImpl[R any] struct { observable.PausableObservableImpl // contains filtered or unexported fields }
func (*CollectorImpl[R]) Close ¶ added in v0.3.0
func (c *CollectorImpl[R]) Close()
func (*CollectorImpl[R]) Collect ¶
Collect executes the collection process. Tell our source to start collection
func (*CollectorImpl[R]) GetFromTime ¶
func (c *CollectorImpl[R]) GetFromTime() *row_source.ResolvedFromTime
GetFromTime returns the 'resolved' from time of the source
func (*CollectorImpl[R]) PauseCollection ¶ added in v0.5.0
func (c *CollectorImpl[R]) PauseCollection() error
PauseCollection pauses the source and pauses our own event handling
func (*CollectorImpl[R]) ResumeCollection ¶ added in v0.5.0
func (c *CollectorImpl[R]) ResumeCollection() error
ResumeCollection resumes the source and resumes our own event handling
type CustomTable ¶ added in v0.2.0
type CustomTable interface {
Table[*types.DynamicRow]
Initialize(formats.Format, *schema.TableSchema) error
GetSchema() *schema.TableSchema
GetCustomSchema() *schema.TableSchema
GetDefaultFormat() formats.Format
GetTableDefinition() *schema.TableSchema
GetFormat() formats.Format
}
CustomTable is an interface representing a plugin table definition with a format
type CustomTableImpl ¶ added in v0.2.0
type CustomTableImpl struct {
// the full schema, i.e. common fields AND any custom schema,
// defined either in the plugin config or by the predefined custom table
Schema *schema.TableSchema
// the format
Format formats.Format
// contains filtered or unexported fields
}
CustomTableImpl is a generic struct representing a plugin table definition with a format
func (*CustomTableImpl) EnrichRow ¶ added in v0.2.0
func (c *CustomTableImpl) EnrichRow(row *types.DynamicRow, sourceEnrichmentFields schema.SourceEnrichment) (*types.DynamicRow, error)
func (*CustomTableImpl) GetCustomSchema ¶ added in v0.8.0
func (c *CustomTableImpl) GetCustomSchema() *schema.TableSchema
GetCustomSchema returns the custom schema defined in the plugin config or by the predefined custom table. This DOES NOT include common fields.
func (*CustomTableImpl) GetDefaultFormat ¶ added in v0.2.0
func (c *CustomTableImpl) GetDefaultFormat() formats.Format
func (*CustomTableImpl) GetFormat ¶ added in v0.3.0
func (c *CustomTableImpl) GetFormat() formats.Format
func (*CustomTableImpl) GetSchema ¶ added in v0.2.0
func (c *CustomTableImpl) GetSchema() *schema.TableSchema
GetSchema implements the CustomTable interface and returns the full schema for the table, which includes common fields
func (*CustomTableImpl) Initialize ¶ added in v0.2.0
func (c *CustomTableImpl) Initialize(format formats.Format, customTableSchema *schema.TableSchema) error
Initialize sets the format and schema for the table
type JSONLWriter ¶
type JSONLWriter struct {
// contains filtered or unexported fields
}
JSONLWriter implements ChunkWriter and writes rows to JSONL files
func (JSONLWriter) WriteChunk ¶
type RowEnrichmentCollector ¶ added in v0.3.0
type RowEnrichmentCollector[R any] struct { CollectorImpl[R] // contains filtered or unexported fields }
RowEnrichmentCollector is a generic implementation of the Collector interface. It is responsible for coordinating the collection process and reporting status. R is the type of the row struct.
func NewRowEnrichmentCollector ¶ added in v0.3.0
func NewRowEnrichmentCollector[R any](table Table[R]) *RowEnrichmentCollector[R]
func (*RowEnrichmentCollector[R]) Collect ¶ added in v0.3.0
Collect executes the collection process. Tell our source to start collection
func (*RowEnrichmentCollector[R]) GetSchema ¶ added in v0.3.0
func (c *RowEnrichmentCollector[R]) GetSchema() (*schema.TableSchema, error)
GetSchema returns the schema of the table
func (*RowEnrichmentCollector[R]) Identifier ¶ added in v0.3.0
func (c *RowEnrichmentCollector[R]) Identifier() string
func (*RowEnrichmentCollector[R]) Init ¶ added in v0.3.0
func (c *RowEnrichmentCollector[R]) Init(ctx context.Context, req *types.CollectRequest) error
type SourceMetadata ¶
type SourceMetadata[R any] struct { SourceName string Mapper mappers.Mapper[R] Options []row_source.RowSourceOption }
type Table ¶
type Table[R any] interface { // Identifier returns the table name Identifier() string // GetSourceMetadata returns the supported sources for the table GetSourceMetadata() ([]*SourceMetadata[R], error) // EnrichRow is called to enrich the row with common (tp_*) fields EnrichRow(R, schema.SourceEnrichment) (R, error) }
Table is a generic interface representing a plugin table definition. R is the row struct type.
type TableFactory ¶
type TableFactory struct {
// contains filtered or unexported fields
}
func (*TableFactory) DescribeCustomFormats ¶ added in v0.2.0
func (f *TableFactory) DescribeCustomFormats(customFormatData []*proto.FormatData) (*types.DescribeResponse, error)
DescribeCustomFormats describes the custom formats which are provided in the request
func (*TableFactory) DescribeFormats ¶ added in v0.2.0
func (f *TableFactory) DescribeFormats(customFormatData ...*proto.FormatData) (*types.DescribeResponse, error)
DescribeFormats describes the formats available for the plugin, including provided custom formats
func (*TableFactory) GetCollector ¶
func (f *TableFactory) GetCollector(req *types.CollectRequest) (Collector, error)
func (*TableFactory) GetCollectorMap ¶ added in v0.2.0
func (f *TableFactory) GetCollectorMap() map[string]func() Collector
func (*TableFactory) Initialized ¶
func (f *TableFactory) Initialized() bool
type TableOption ¶ added in v0.2.0
type TableOption func(*tableConfig)
TableOption is an option function passed to the table factory
func WithName ¶ added in v0.2.0
func WithName(name string) TableOption