source

package
v0.3.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateJSONFile

func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error)

func SlimCondition

func SlimCondition(maxThread int, minSplitKey, maxSplitKey uint64) [][]uint64

func SplitCondition

func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey uint64) []string

func SplitConditionAccordingMaxGoRoutine

func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax uint64) <-chan string

func SplitConditionAccordingToTimeSplitKey

func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error)

func SplitTimeConditionsByMaxThread

func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string

Types

type DatabendIngesterStatsData

type DatabendIngesterStatsData struct {
	BytesPerSecond float64
	RowsPerSecondd float64
}

type DatabendSourceStatsRecorder

type DatabendSourceStatsRecorder struct {
	// contains filtered or unexported fields
}

func NewDatabendIntesterStatsRecorder

func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder

func (*DatabendSourceStatsRecorder) RecordMetric

func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int)

func (*DatabendSourceStatsRecorder) Stats

type MysqlSource

type MysqlSource struct {
	// contains filtered or unexported fields
}

func NewMysqlSource

func NewMysqlSource(cfg *config.Config) (*MysqlSource, error)

func (*MysqlSource) AdjustBatchSizeAccordingToSourceDbTable

func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

AdjustBatchSizeAccordingToSourceDbTable has a concept called s, s = (maxKey - minKey) / sourceTableRowCount if s == 1 it means the data is uniform in the table, if s is much bigger than 1, it means the data is not uniform in the table

func (*MysqlSource) DeleteAfterSync

func (s *MysqlSource) DeleteAfterSync() error

func (*MysqlSource) GetAllSourceReadRowsCount

func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)

func (*MysqlSource) GetDatabasesAccordingToSourceDbRegex

func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*MysqlSource) GetDbTablesAccordingToSourceDbTables

func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*MysqlSource) GetMinMaxSplitKey

func (s *MysqlSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*MysqlSource) GetMinMaxTimeSplitKey

func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*MysqlSource) GetSourceReadRowsCount

func (s *MysqlSource) GetSourceReadRowsCount() (int, error)

func (*MysqlSource) GetTablesAccordingToSourceTableRegex

func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*MysqlSource) QueryTableData

func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

type NullUint64

type NullUint64 struct {
	Uint64 uint64
	Valid  bool // Valid is true if Uint64 is not NULL
}

NullUint64 represents a uint64 that may be null.

func (*NullUint64) Scan

func (n *NullUint64) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullUint64) Value

func (n NullUint64) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type OracleSource

type OracleSource struct {
	// contains filtered or unexported fields
}

func NewOracleSource

func NewOracleSource(cfg *config.Config) (*OracleSource, error)

func (*OracleSource) AdjustBatchSizeAccordingToSourceDbTable

func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

func (*OracleSource) DeleteAfterSync

func (p *OracleSource) DeleteAfterSync() error

func (*OracleSource) GetAllSourceReadRowsCount

func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)

func (*OracleSource) GetDatabasesAccordingToSourceDbRegex

func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*OracleSource) GetDbTablesAccordingToSourceDbTables

func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*OracleSource) GetMinMaxSplitKey

func (p *OracleSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*OracleSource) GetMinMaxTimeSplitKey

func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*OracleSource) GetSourceReadRowsCount

func (p *OracleSource) GetSourceReadRowsCount() (int, error)

func (*OracleSource) GetTablesAccordingToSourceTableRegex

func (p *OracleSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*OracleSource) QueryTableData

func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

func (*OracleSource) SwitchDatabase

func (p *OracleSource) SwitchDatabase() error

type PostgresSource

type PostgresSource struct {
	// contains filtered or unexported fields
}

func NewPostgresSource

func NewPostgresSource(cfg *config.Config) (*PostgresSource, error)

func (*PostgresSource) AdjustBatchSizeAccordingToSourceDbTable

func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

func (*PostgresSource) DeleteAfterSync

func (p *PostgresSource) DeleteAfterSync() error

func (*PostgresSource) GetAllSourceReadRowsCount

func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)

func (*PostgresSource) GetDatabasesAccordingToSourceDbRegex

func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*PostgresSource) GetDbTablesAccordingToSourceDbTables

func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*PostgresSource) GetMinMaxSplitKey

func (p *PostgresSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*PostgresSource) GetMinMaxTimeSplitKey

func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*PostgresSource) GetSourceReadRowsCount

func (p *PostgresSource) GetSourceReadRowsCount() (int, error)

func (*PostgresSource) GetTablesAccordingToSourceTableRegex

func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*PostgresSource) QueryTableData

func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

func (*PostgresSource) SwitchDatabase

func (p *PostgresSource) SwitchDatabase() error

type SQLServerSource

type SQLServerSource struct {
	// contains filtered or unexported fields
}

func NewSqlServerSource

func NewSqlServerSource(cfg *config.Config) (*SQLServerSource, error)

func (*SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable

func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

func (*SQLServerSource) DeleteAfterSync

func (s *SQLServerSource) DeleteAfterSync() error

func (*SQLServerSource) GetAllSourceReadRowsCount

func (s *SQLServerSource) GetAllSourceReadRowsCount() (int, error)

func (*SQLServerSource) GetDatabasesAccordingToSourceDbRegex

func (s *SQLServerSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*SQLServerSource) GetDbTablesAccordingToSourceDbTables

func (s *SQLServerSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*SQLServerSource) GetMinMaxSplitKey

func (s *SQLServerSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*SQLServerSource) GetMinMaxTimeSplitKey

func (s *SQLServerSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*SQLServerSource) GetSourceReadRowsCount

func (s *SQLServerSource) GetSourceReadRowsCount() (int, error)

func (*SQLServerSource) GetTablesAccordingToSourceTableRegex

func (s *SQLServerSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*SQLServerSource) QueryTableData

func (s *SQLServerSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

type Sourcer

type Sourcer interface {
	AdjustBatchSizeAccordingToSourceDbTable() uint64
	GetSourceReadRowsCount() (int, error)
	GetMinMaxSplitKey() (uint64, uint64, error)
	GetMinMaxTimeSplitKey() (string, string, error)
	DeleteAfterSync() error
	QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
	GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
	GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
	GetAllSourceReadRowsCount() (int, error)
	GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
}

func NewSource

func NewSource(cfg *config.Config) (Sourcer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL