Documentation
¶
Index ¶
- func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error)
- func SlimCondition(maxThread int, minSplitKey, maxSplitKey uint64) [][]uint64
- func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey uint64) []string
- func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax uint64) <-chan string
- func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error)
- func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string
- type DatabendIngesterStatsData
- type DatabendSourceStatsRecorder
- type MysqlSource
- func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
- func (s *MysqlSource) DeleteAfterSync() error
- func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)
- func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (s *MysqlSource) GetMinMaxSplitKey() (uint64, uint64, error)
- func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (s *MysqlSource) GetSourceReadRowsCount() (int, error)
- func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- type NullUint64
- type OracleSource
- func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
- func (p *OracleSource) DeleteAfterSync() error
- func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)
- func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (p *OracleSource) GetMinMaxSplitKey() (uint64, uint64, error)
- func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (p *OracleSource) GetSourceReadRowsCount() (int, error)
- func (p *OracleSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- func (p *OracleSource) SwitchDatabase() error
- type PostgresSource
- func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
- func (p *PostgresSource) DeleteAfterSync() error
- func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)
- func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (p *PostgresSource) GetMinMaxSplitKey() (uint64, uint64, error)
- func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (p *PostgresSource) GetSourceReadRowsCount() (int, error)
- func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- func (p *PostgresSource) SwitchDatabase() error
- type SQLServerSource
- func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
- func (s *SQLServerSource) DeleteAfterSync() error
- func (s *SQLServerSource) GetAllSourceReadRowsCount() (int, error)
- func (s *SQLServerSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (s *SQLServerSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (s *SQLServerSource) GetMinMaxSplitKey() (uint64, uint64, error)
- func (s *SQLServerSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (s *SQLServerSource) GetSourceReadRowsCount() (int, error)
- func (s *SQLServerSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (s *SQLServerSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- type Sourcer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateJSONFile ¶
func SlimCondition ¶
func SplitCondition ¶
Types ¶
type DatabendSourceStatsRecorder ¶
type DatabendSourceStatsRecorder struct {
// contains filtered or unexported fields
}
func NewDatabendIntesterStatsRecorder ¶
func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder
func (*DatabendSourceStatsRecorder) RecordMetric ¶
func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int)
func (*DatabendSourceStatsRecorder) Stats ¶
func (stats *DatabendSourceStatsRecorder) Stats(statsWindow time.Duration) DatabendIngesterStatsData
type MysqlSource ¶
type MysqlSource struct {
// contains filtered or unexported fields
}
func NewMysqlSource ¶
func NewMysqlSource(cfg *config.Config) (*MysqlSource, error)
func (*MysqlSource) AdjustBatchSizeAccordingToSourceDbTable ¶
func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
AdjustBatchSizeAccordingToSourceDbTable relies on a ratio called s, defined as s = (maxKey - minKey) / sourceTableRowCount. If s == 1, the data is uniformly distributed in the table; if s is much bigger than 1, the data is not uniformly distributed in the table.
func (*MysqlSource) DeleteAfterSync ¶
func (s *MysqlSource) DeleteAfterSync() error
func (*MysqlSource) GetAllSourceReadRowsCount ¶
func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)
func (*MysqlSource) GetDatabasesAccordingToSourceDbRegex ¶
func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*MysqlSource) GetDbTablesAccordingToSourceDbTables ¶
func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*MysqlSource) GetMinMaxSplitKey ¶
func (s *MysqlSource) GetMinMaxSplitKey() (uint64, uint64, error)
func (*MysqlSource) GetMinMaxTimeSplitKey ¶
func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*MysqlSource) GetSourceReadRowsCount ¶
func (s *MysqlSource) GetSourceReadRowsCount() (int, error)
func (*MysqlSource) GetTablesAccordingToSourceTableRegex ¶
func (*MysqlSource) QueryTableData ¶
func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
type NullUint64 ¶
NullUint64 represents a uint64 that may be null.
func (*NullUint64) Scan ¶
func (n *NullUint64) Scan(value interface{}) error
Scan implements the Scanner interface.
type OracleSource ¶
type OracleSource struct {
// contains filtered or unexported fields
}
func NewOracleSource ¶
func NewOracleSource(cfg *config.Config) (*OracleSource, error)
func (*OracleSource) AdjustBatchSizeAccordingToSourceDbTable ¶
func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
func (*OracleSource) DeleteAfterSync ¶
func (p *OracleSource) DeleteAfterSync() error
func (*OracleSource) GetAllSourceReadRowsCount ¶
func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)
func (*OracleSource) GetDatabasesAccordingToSourceDbRegex ¶
func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*OracleSource) GetDbTablesAccordingToSourceDbTables ¶
func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*OracleSource) GetMinMaxSplitKey ¶
func (p *OracleSource) GetMinMaxSplitKey() (uint64, uint64, error)
func (*OracleSource) GetMinMaxTimeSplitKey ¶
func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*OracleSource) GetSourceReadRowsCount ¶
func (p *OracleSource) GetSourceReadRowsCount() (int, error)
func (*OracleSource) GetTablesAccordingToSourceTableRegex ¶
func (*OracleSource) QueryTableData ¶
func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
func (*OracleSource) SwitchDatabase ¶
func (p *OracleSource) SwitchDatabase() error
type PostgresSource ¶
type PostgresSource struct {
// contains filtered or unexported fields
}
func NewPostgresSource ¶
func NewPostgresSource(cfg *config.Config) (*PostgresSource, error)
func (*PostgresSource) AdjustBatchSizeAccordingToSourceDbTable ¶
func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
func (*PostgresSource) DeleteAfterSync ¶
func (p *PostgresSource) DeleteAfterSync() error
func (*PostgresSource) GetAllSourceReadRowsCount ¶
func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)
func (*PostgresSource) GetDatabasesAccordingToSourceDbRegex ¶
func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*PostgresSource) GetDbTablesAccordingToSourceDbTables ¶
func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*PostgresSource) GetMinMaxSplitKey ¶
func (p *PostgresSource) GetMinMaxSplitKey() (uint64, uint64, error)
func (*PostgresSource) GetMinMaxTimeSplitKey ¶
func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*PostgresSource) GetSourceReadRowsCount ¶
func (p *PostgresSource) GetSourceReadRowsCount() (int, error)
func (*PostgresSource) GetTablesAccordingToSourceTableRegex ¶
func (*PostgresSource) QueryTableData ¶
func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
func (*PostgresSource) SwitchDatabase ¶
func (p *PostgresSource) SwitchDatabase() error
type SQLServerSource ¶
type SQLServerSource struct {
// contains filtered or unexported fields
}
func NewSqlServerSource ¶
func NewSqlServerSource(cfg *config.Config) (*SQLServerSource, error)
func (*SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable ¶
func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() uint64
func (*SQLServerSource) DeleteAfterSync ¶
func (s *SQLServerSource) DeleteAfterSync() error
func (*SQLServerSource) GetAllSourceReadRowsCount ¶
func (s *SQLServerSource) GetAllSourceReadRowsCount() (int, error)
func (*SQLServerSource) GetDatabasesAccordingToSourceDbRegex ¶
func (s *SQLServerSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*SQLServerSource) GetDbTablesAccordingToSourceDbTables ¶
func (s *SQLServerSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*SQLServerSource) GetMinMaxSplitKey ¶
func (s *SQLServerSource) GetMinMaxSplitKey() (uint64, uint64, error)
func (*SQLServerSource) GetMinMaxTimeSplitKey ¶
func (s *SQLServerSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*SQLServerSource) GetSourceReadRowsCount ¶
func (s *SQLServerSource) GetSourceReadRowsCount() (int, error)
func (*SQLServerSource) GetTablesAccordingToSourceTableRegex ¶
func (*SQLServerSource) QueryTableData ¶
func (s *SQLServerSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
type Sourcer ¶
type Sourcer interface {
AdjustBatchSizeAccordingToSourceDbTable() uint64
GetSourceReadRowsCount() (int, error)
GetMinMaxSplitKey() (uint64, uint64, error)
GetMinMaxTimeSplitKey() (string, string, error)
DeleteAfterSync() error
QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
GetAllSourceReadRowsCount() (int, error)
GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
}
Click to show internal directories.
Click to hide internal directories.