layer

package
v0.0.0-...-de9adca Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// String keys exported by this package.
// NOTE(review): the names suggest these are looked up in dataset
// configuration — confirm against the code that reads them.
const (
	// Keys without the since_ prefix.
	TableName      = "table_name"
	FlushThreshold = "flush_threshold"
	AppendMode     = "append_mode"
	EntityColumn   = "entity_column"
	DataQuery      = "data_query"

	// Keys carrying the since_ prefix.
	SinceColumn   = "since_column"
	SinceTable    = "since_table"
	SinceDatatype = "since_datatype"
)

Variables

View Source
// Error values and error constructors for this layer. All of them are built
// with common.Errorf and return a common.LayerError.
var (
	// ErrQuery wraps err (via %w) in a LayerErrorInternal error reporting a
	// failed database query.
	ErrQuery = func(err error) common.LayerError {
		return common.Errorf(common.LayerErrorInternal, "failed to query database. %w", err)
	}
	// ErrNotSupported is a fixed LayerErrorInternal error for operations this
	// layer does not support.
	ErrNotSupported = common.Errorf(common.LayerErrorInternal, "operation not supported in this layer")
	// ErrDatasetNotFound returns a LayerErrorBadParameter error naming the
	// dataset that was not found.
	ErrDatasetNotFound = func(datasetName string) common.LayerError {
		return common.Errorf(common.LayerErrorBadParameter, "dataset %s not found", datasetName)
	}
	// ErrConnection wraps e (via %w) in a LayerErrorInternal error reporting a
	// database connection problem.
	ErrConnection = func(e error) common.LayerError {
		return common.Errorf(common.LayerErrorInternal, "database connection error. %w", e)
	}
	// ErrBatchSizeMismatch returns a LayerErrorInternal error reporting the
	// observed and expected row counts.
	ErrBatchSizeMismatch = func(observed, expected int) common.LayerError {
		return common.Errorf(common.LayerErrorInternal, "batch size mismatch. rows affected: %d, expected: %d", observed, expected)
	}
	// ErrGeneric formats msg with extra and returns the result as a
	// LayerErrorInternal error.
	//
	// The rendered text is handed to common.Errorf through a constant "%s"
	// format. Passing it as the format string itself would make any '%' in the
	// rendered message (for example from a formatted value) be interpreted a
	// second time and garble the error text.
	ErrGeneric = func(msg string, extra ...any) common.LayerError {
		return common.Errorf(common.LayerErrorInternal, "%s", fmt.Sprintf(msg, extra...))
	}
)

Functions

func EnrichConfig

func EnrichConfig(config *common.Config) error

func NewPgsqlDataLayer

func NewPgsqlDataLayer(conf *common.Config, logger common.Logger, metrics common.Metrics) (common.DataLayerService, error)

Types

type Dataset

type Dataset struct {
	// contains filtered or unexported fields
}

func (*Dataset) Changes

func (d *Dataset) Changes(since string, limit int, latestOnly bool) (cdl.EntityIterator, cdl.LayerError)

func (*Dataset) Entities

func (d *Dataset) Entities(from string, limit int) (cdl.EntityIterator, cdl.LayerError)

func (*Dataset) FullSync

func (d *Dataset) FullSync(ctx context.Context, batchInfo common.BatchInfo) (common.DatasetWriter, common.LayerError)

func (*Dataset) Incremental

func (d *Dataset) Incremental(ctx context.Context) (common.DatasetWriter, common.LayerError)

func (*Dataset) MetaData

func (d *Dataset) MetaData() map[string]any

func (*Dataset) Name

func (d *Dataset) Name() string

type PgsqlConf

// PgsqlConf groups database connection settings. Every field is a string and
// is mapped to the JSON key given in its struct tag.
// NOTE(review): presumably these describe a PostgreSQL connection, going by
// the type name — confirm against the code that consumes this struct.
type PgsqlConf struct {
	// Hostname is read from / written to the JSON key "host" (note the key
	// differs from the field name).
	Hostname string `json:"host"`
	// Port is carried as a string, not a number; JSON key "port".
	Port     string `json:"port"`
	// Database uses the JSON key "database".
	Database string `json:"database"`
	// User uses the JSON key "user".
	User     string `json:"user"`
	// Password uses the JSON key "password" and is included when the struct
	// is marshalled, since the tag does not exclude it.
	Password string `json:"password"`
	// Schema uses the JSON key "schema".
	Schema   string `json:"schema"`
}

type PgsqlDatalayer

type PgsqlDatalayer struct {
	// contains filtered or unexported fields
}

func (*PgsqlDatalayer) Dataset

func (dl *PgsqlDatalayer) Dataset(dataset string) (common.Dataset, common.LayerError)

func (*PgsqlDatalayer) DatasetDescriptions

func (dl *PgsqlDatalayer) DatasetDescriptions() []*common.DatasetDescription

func (*PgsqlDatalayer) Stop

func (dl *PgsqlDatalayer) Stop(ctx context.Context) error

func (*PgsqlDatalayer) UpdateConfiguration

func (dl *PgsqlDatalayer) UpdateConfiguration(config *cdl.Config) cdl.LayerError

type PgsqlWriter

type PgsqlWriter struct {
	// contains filtered or unexported fields
}

func (*PgsqlWriter) Close

func (o *PgsqlWriter) Close() common.LayerError

func (*PgsqlWriter) Write

func (o *PgsqlWriter) Write(entity *egdm.Entity) common.LayerError

type RowItem

type RowItem struct {
	Map     map[string]any
	Columns []string
	Values  []any
	// contains filtered or unexported fields
}

func (*RowItem) GetPropertyNames

func (r *RowItem) GetPropertyNames() []string

func (*RowItem) GetValue

func (r *RowItem) GetValue(name string) any

func (*RowItem) NativeItem

func (r *RowItem) NativeItem() any

func (*RowItem) SetValue

func (r *RowItem) SetValue(name string, value any)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL