dbqcore

package module
v0.0.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2025 | License: Apache-2.0 | Imports: 14 | Imported by: 0

README

dbqcore

The DataBridge Quality Core library (dbqcore) is part of dbqctl.

Documentation

Index

Constants

View Source
// On-fail actions accepted in a Check's OnFail field (YAML key "on_fail").
//
// NOTE(review): these are untyped string constants, while Check.OnFail has
// type OnFailAction. Untyped constants convert implicitly, so comparisons
// such as check.OnFail == OnFailActionError compile as-is.
const (
	OnFailActionError   = "error"
	OnFailActionWarning = "warn"
)
View Source
// Check types.
//
// CheckTypeRawQuery ("raw_query") presumably identifies checks that run a
// user-supplied query (see Check.Query). Confirm against the check runner.
const (
	CheckTypeRawQuery = "raw_query"
)
View Source
// Version is the release version string of the dbqcore library.
const (
	Version = "v0.0.6"
)

Variables

This section is empty.

Functions

func GetDbqCoreLibVersion added in v0.0.2

// GetDbqCoreLibVersion returns a string, presumably the library version
// (likely the Version constant). Confirm against the implementation.
func GetDbqCoreLibVersion() string

Types

type Check

// Check is a single data-quality check inside a Validation, decoded from YAML.
//
// Only ID is required. Description, OnFail ("error" or "warn", see the
// OnFailAction* constants) and Query (a raw query) are optional and tagged
// omitempty.
type Check struct {
	ID          string       `yaml:"id"`
	Description string       `yaml:"description,omitempty"` // optional
	OnFail      OnFailAction `yaml:"on_fail,omitempty"`     // optional (error, warn)
	Query       string       `yaml:"query,omitempty"`       // optional raw query
}

type ChecksConfig

// ChecksConfig is the root of a checks configuration, as returned by
// LoadChecksConfig. It holds a Version string and the list of Validations,
// each of which targets one dataset.
type ChecksConfig struct {
	Version     string       `yaml:"version"`
	Validations []Validation `yaml:"validations"`
}

func LoadChecksConfig

// LoadChecksConfig loads a ChecksConfig from the file named fileName.
//
// NOTE(review): the config types carry yaml tags, so this presumably parses
// YAML. Confirm against the implementation.
func LoadChecksConfig(fileName string) (*ChecksConfig, error)

type ClickhouseDbqConnector

// ClickhouseDbqConnector is the DbqConnector implementation for ClickHouse
// (per its name). Its fields are unexported; obtain one via
// NewClickhouseDbqConnector.
type ClickhouseDbqConnector struct {
	// contains filtered or unexported fields
}

func (*ClickhouseDbqConnector) ImportDatasets

// ImportDatasets returns dataset names, presumably those matching filter.
// The filter syntax is not visible here; confirm against the implementation.
func (c *ClickhouseDbqConnector) ImportDatasets(filter string) ([]string, error)

func (*ClickhouseDbqConnector) Ping

// Ping presumably checks connectivity to the data source. The returned
// string likely carries server information. Confirm against the
// implementation.
func (c *ClickhouseDbqConnector) Ping() (string, error)

func (*ClickhouseDbqConnector) ProfileDataset

// ProfileDataset computes TableMetrics for dataset.
//
// NOTE(review): maxConcurrent presumably bounds concurrent per-column work
// (likely via TaskPool), and sample presumably toggles collection of
// TableMetrics.RowsSample. Confirm both.
func (c *ClickhouseDbqConnector) ProfileDataset(dataset string, sample bool, maxConcurrent int) (*TableMetrics, error)

func (*ClickhouseDbqConnector) RunCheck

// RunCheck runs check against dataset.
//
// NOTE(review): defaultWhere is presumably the Validation's Where clause. The
// bool presumably reports pass/fail, and the string carries details. Confirm
// against the implementation.
func (c *ClickhouseDbqConnector) RunCheck(check *Check, dataset string, defaultWhere string) (bool, string, error)

type ColumnInfo

// ColumnInfo describes one dataset column: its name, data type, comment
// and ordinal position. The three string fields are declared together; the
// field order (Name, Type, Comment, Position) is unchanged.
type ColumnInfo struct {
	Name, Type, Comment string
	Position            uint
}

type ColumnMetrics

// ColumnMetrics holds the profiling results for a single column and is
// serialized to JSON.
//
// The pointer fields are type-specific and omitted from JSON when nil.
// BlankCount applies to string columns. MinValue, MaxValue, AvgValue and
// StddevValue (population standard deviation) apply to numeric columns.
// MostFrequentValue is a pointer so that a NULL most-frequent value can be
// represented as nil.
//
// NOTE(review): because MostFrequentValue is tagged omitempty, a nil
// (NULL) value is dropped from the JSON output. JSON consumers therefore
// cannot tell "NULL is most frequent" from "not computed".
// NOTE(review): NullCount is uint64 while BlankCount is *int64; confirm the
// signedness difference is intentional.
type ColumnMetrics struct {
	ColumnName          string   `json:"col_name"`
	ColumnComment       string   `json:"col_comment"`
	ColumnPosition      uint     `json:"col_position"`
	DataType            string   `json:"data_type"`
	NullCount           uint64   `json:"null_count"`
	BlankCount          *int64   `json:"blank_count,omitempty"`         // string only
	MinValue            *float64 `json:"min_value,omitempty"`           // numeric only
	MaxValue            *float64 `json:"max_value,omitempty"`           // numeric only
	AvgValue            *float64 `json:"avg_value,omitempty"`           // numeric only
	StddevValue         *float64 `json:"stddev_value,omitempty"`        // numeric only (Population StdDev)
	MostFrequentValue   *string  `json:"most_frequent_value,omitempty"` // pointer to handle NULL as most frequent
	ProfilingDurationMs int64    `json:"profiling_duration_ms"`
}

type ConfigDetails

// ConfigDetails holds a DataSource's connection settings (YAML key
// "configuration"): host, port, username, password and an optional database
// name.
//
// NOTE(review): Password is decoded as a plain string from the YAML config.
// Consider supporting an environment-variable or secret-store indirection.
type ConfigDetails struct {
	Host     string `yaml:"host"`
	Port     int    `yaml:"port"`
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	Database string `yaml:"database,omitempty"`
}

type DataSource

// DataSource is one entry under "datasources" in a DbqConfig. It holds:
//   - an ID;
//   - a Type (presumably the backend kind, e.g. ClickHouse; confirm the
//     accepted values);
//   - its connection settings;
//   - the list of dataset names associated with it.
type DataSource struct {
	ID            string        `yaml:"id"`
	Type          string        `yaml:"type"`
	Configuration ConfigDetails `yaml:"configuration"`
	Datasets      []string      `yaml:"datasets"`
}

type DbqConfig

// DbqConfig is the root of the dbq configuration. It holds a Version string
// and the configured DataSources (YAML key "datasources").
type DbqConfig struct {
	Version     string       `yaml:"version"`
	DataSources []DataSource `yaml:"datasources"`
}

type DbqConnector

// DbqConnector is the set of operations a data-source backend provides.
// NewClickhouseDbqConnector returns a value of this type.
//
// Method contracts below are inferred from the signatures; confirm them
// against the implementations:
//   - Ping checks connectivity and returns a descriptive string.
//   - ImportDatasets lists dataset names, presumably filtered by filter.
//   - ProfileDataset builds TableMetrics for dataset, with maxConcurrent
//     presumably bounding concurrent work.
//   - RunCheck runs check against dataset with defaultWhere applied and
//     presumably reports pass/fail plus a detail string.
type DbqConnector interface {
	Ping() (string, error)
	ImportDatasets(filter string) ([]string, error)
	ProfileDataset(dataset string, sample bool, maxConcurrent int) (*TableMetrics, error)
	RunCheck(check *Check, dataset string, defaultWhere string) (bool, string, error)
}

func NewClickhouseDbqConnector

// NewClickhouseDbqConnector creates a DbqConnector for dataSource that logs
// via logger.
//
// NOTE(review): it presumably connects using dataSource.Configuration and
// may fail on bad settings. Confirm whether it dials eagerly.
func NewClickhouseDbqConnector(dataSource DataSource, logger *slog.Logger) (DbqConnector, error)

type OnFailAction

// OnFailAction names what to do when a Check fails: "error" or "warn"
// (see OnFailActionError and OnFailActionWarning).
type OnFailAction string

type TableMetrics

// TableMetrics is the profiling result for a whole dataset, as returned by
// ProfileDataset, and is serialized to JSON.
//
// ProfiledAt and ProfilingDurationMs are int64. Their units are presumably a
// Unix timestamp and milliseconds respectively; confirm. ColumnsMetrics is
// presumably keyed by column name. RowsSample holds sample rows as
// string-keyed maps. DbqErrors presumably collects errors encountered during
// profiling.
//
// NOTE(review): encoding/json encodes an error interface value through its
// dynamic type. Common error types (those from errors.New or fmt.Errorf)
// have only unexported fields, so DbqErrors typically serializes as a list
// of {} and the messages are lost. Consider a []string mirror or a custom
// MarshalJSON.
type TableMetrics struct {
	ProfiledAt          int64                     `json:"profiled_at"`
	TableName           string                    `json:"table_name"`
	DatabaseName        string                    `json:"database_name"`
	TotalRows           uint64                    `json:"total_rows"`
	ColumnsMetrics      map[string]*ColumnMetrics `json:"columns_metrics"`
	RowsSample          []map[string]any          `json:"rows_sample"`
	ProfilingDurationMs int64                     `json:"profiling_duration_ms"`
	DbqErrors           []error                   `json:"_dbq_errors"`
}

type TaskPool added in v0.0.5

// TaskPool runs enqueued tasks, presumably on at most poolSize workers (see
// NewTaskPool), and exposes task errors via Errors. Its fields are
// unexported; create one with NewTaskPool.
type TaskPool struct {
	// contains filtered or unexported fields
}

func NewTaskPool added in v0.0.5

// NewTaskPool creates a TaskPool sized by poolSize (presumably the maximum
// number of concurrently running tasks) that logs via logger.
func NewTaskPool(poolSize int, logger *slog.Logger) *TaskPool

func (*TaskPool) Enqueue added in v0.0.5

// Enqueue submits task under the identifier id, which presumably labels log
// entries and errors. Whether Enqueue blocks when the pool is saturated is
// not visible here; confirm.
func (tp *TaskPool) Enqueue(id string, task func() error)

func (*TaskPool) Errors added in v0.0.6

// Errors returns the errors collected by the pool, presumably those returned
// by enqueued tasks. Presumably it should be called after Join to see the
// complete list; confirm.
func (tp *TaskPool) Errors() []error

func (*TaskPool) Join added in v0.0.5

// Join presumably waits for all enqueued tasks to finish; confirm against
// the implementation.
func (tp *TaskPool) Join()

type Validation

// Validation groups the Checks to run against one Dataset. Where is an
// optional filter clause, presumably passed to RunCheck as defaultWhere.
type Validation struct {
	Dataset string  `yaml:"dataset"`
	Where   string  `yaml:"where,omitempty"` // Optional where clause
	Checks  []Check `yaml:"checks"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL