Documentation
¶
Index ¶
- Constants
- Variables
- func EnrichConfig(config *common.Config) error
- func NewPgsqlDataLayer(conf *common.Config, logger common.Logger, metrics common.Metrics) (common.DataLayerService, error)
- type Dataset
- func (d *Dataset) Changes(since string, limit int, latestOnly bool) (cdl.EntityIterator, cdl.LayerError)
- func (d *Dataset) Entities(from string, limit int) (cdl.EntityIterator, cdl.LayerError)
- func (d *Dataset) FullSync(ctx context.Context, batchInfo common.BatchInfo) (common.DatasetWriter, common.LayerError)
- func (d *Dataset) Incremental(ctx context.Context) (common.DatasetWriter, common.LayerError)
- func (d *Dataset) MetaData() map[string]any
- func (d *Dataset) Name() string
- type PgsqlConf
- type PgsqlDatalayer
- func (dl *PgsqlDatalayer) Dataset(dataset string) (common.Dataset, common.LayerError)
- func (dl *PgsqlDatalayer) DatasetDescriptions() []*common.DatasetDescription
- func (dl *PgsqlDatalayer) Stop(ctx context.Context) error
- func (dl *PgsqlDatalayer) UpdateConfiguration(config *cdl.Config) cdl.LayerError
- type PgsqlWriter
- type RowItem
Constants ¶
View Source
const ( TableName = "table_name" FlushThreshold = "flush_threshold" AppendMode = "append_mode" SinceColumn = "since_column" EntityColumn = "entity_column" SinceTable = "since_table" SinceDatatype = "since_datatype" DataQuery = "data_query" )
Variables ¶
View Source
var ( ErrQuery = func(err error) common.LayerError { return common.Errorf(common.LayerErrorInternal, "failed to query database. %w", err) } ErrNotSupported = common.Errorf(common.LayerErrorInternal, "operation not supported in this layer") ErrDatasetNotFound = func(datasetName string) common.LayerError { return common.Errorf(common.LayerErrorBadParameter, "dataset %s not found", datasetName) } ErrConnection = func(e error) common.LayerError { return common.Errorf(common.LayerErrorInternal, "database connection error. %w", e) } ErrBatchSizeMismatch = func(observed, expected int) common.LayerError { return common.Errorf(common.LayerErrorInternal, "batch size mismatch. rows affected: %d, expected: %d", observed, expected) } ErrGeneric = func(msg string, extra ...any) common.LayerError { return common.Errorf(common.LayerErrorInternal, fmt.Sprintf(msg, extra...)) } )
Functions ¶
func EnrichConfig ¶
Types ¶
type Dataset ¶
type Dataset struct {
// contains filtered or unexported fields
}
func (*Dataset) Changes ¶
func (d *Dataset) Changes(since string, limit int, latestOnly bool) (cdl.EntityIterator, cdl.LayerError)
func (*Dataset) Entities ¶
func (d *Dataset) Entities(from string, limit int) (cdl.EntityIterator, cdl.LayerError)
func (*Dataset) FullSync ¶
func (d *Dataset) FullSync(ctx context.Context, batchInfo common.BatchInfo) (common.DatasetWriter, common.LayerError)
func (*Dataset) Incremental ¶
func (d *Dataset) Incremental(ctx context.Context) (common.DatasetWriter, common.LayerError)
type PgsqlDatalayer ¶
type PgsqlDatalayer struct {
// contains filtered or unexported fields
}
func (*PgsqlDatalayer) Dataset ¶
func (dl *PgsqlDatalayer) Dataset(dataset string) (common.Dataset, common.LayerError)
func (*PgsqlDatalayer) DatasetDescriptions ¶
func (dl *PgsqlDatalayer) DatasetDescriptions() []*common.DatasetDescription
func (*PgsqlDatalayer) UpdateConfiguration ¶
func (dl *PgsqlDatalayer) UpdateConfiguration(config *cdl.Config) cdl.LayerError
type PgsqlWriter ¶
type PgsqlWriter struct {
// contains filtered or unexported fields
}
func (*PgsqlWriter) Close ¶
func (o *PgsqlWriter) Close() common.LayerError
func (*PgsqlWriter) Write ¶
func (o *PgsqlWriter) Write(entity *egdm.Entity) common.LayerError
type RowItem ¶
type RowItem struct {
Map map[string]any
Columns []string
Values []any
// contains filtered or unexported fields
}
func (*RowItem) GetPropertyNames ¶
func (*RowItem) NativeItem ¶
Click to show internal directories.
Click to hide internal directories.