Documentation
¶
Index ¶
- func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error)
- func SlimCondition(maxThread int, minSplitKey, maxSplitKey int64) [][]int64
- func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey int64) []string
- func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax int64) <-chan string
- func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error)
- func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string
- type DatabendIngesterStatsData
- type DatabendSourceStatsRecorder
- type MysqlSource
- func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() int64
- func (s *MysqlSource) DeleteAfterSync() error
- func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)
- func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error)
- func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (s *MysqlSource) GetSourceReadRowsCount() (int, error)
- func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- type PostgresSource
- func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() int64
- func (p *PostgresSource) DeleteAfterSync() error
- func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)
- func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
- func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
- func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error)
- func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)
- func (p *PostgresSource) GetSourceReadRowsCount() (int, error)
- func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
- func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
- func (p *PostgresSource) SwitchDatabase() error
- type Sourcer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenerateJSONFile ¶
func SlimCondition ¶ added in v0.1.8
func SplitCondition ¶ added in v0.1.8
func SplitConditionAccordingMaxGoRoutine ¶ added in v0.1.8
func SplitConditionAccordingToTimeSplitKey ¶ added in v0.1.8
func SplitTimeConditionsByMaxThread ¶ added in v0.1.8
Types ¶
type DatabendIngesterStatsData ¶ added in v0.1.3
type DatabendSourceStatsRecorder ¶ added in v0.1.3
type DatabendSourceStatsRecorder struct {
// contains filtered or unexported fields
}
func NewDatabendIntesterStatsRecorder ¶ added in v0.1.3
func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder
func (*DatabendSourceStatsRecorder) RecordMetric ¶ added in v0.1.3
func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int)
func (*DatabendSourceStatsRecorder) Stats ¶ added in v0.1.3
func (stats *DatabendSourceStatsRecorder) Stats(statsWindow time.Duration) DatabendIngesterStatsData
type MysqlSource ¶ added in v0.1.8
type MysqlSource struct {
// contains filtered or unexported fields
}
func NewMysqlSource ¶ added in v0.1.8
func NewMysqlSource(cfg *config.Config) (*MysqlSource, error)
func (*MysqlSource) AdjustBatchSizeAccordingToSourceDbTable ¶ added in v0.1.8
func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() int64
AdjustBatchSizeAccordingToSourceDbTable relies on a ratio called s, defined as s = (maxKey - minKey) / sourceTableRowCount. If s == 1, the data is uniform in the table; if s is much bigger than 1, the data is not uniform in the table.
func (*MysqlSource) DeleteAfterSync ¶ added in v0.1.8
func (s *MysqlSource) DeleteAfterSync() error
func (*MysqlSource) GetAllSourceReadRowsCount ¶ added in v0.1.8
func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)
func (*MysqlSource) GetDatabasesAccordingToSourceDbRegex ¶ added in v0.1.8
func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*MysqlSource) GetDbTablesAccordingToSourceDbTables ¶ added in v0.1.8
func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*MysqlSource) GetMinMaxSplitKey ¶ added in v0.1.8
func (s *MysqlSource) GetMinMaxSplitKey() (int64, int64, error)
func (*MysqlSource) GetMinMaxTimeSplitKey ¶ added in v0.1.8
func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*MysqlSource) GetSourceReadRowsCount ¶ added in v0.1.8
func (s *MysqlSource) GetSourceReadRowsCount() (int, error)
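func (*MysqlSource) GetTablesAccordingToSourceTableRegex ¶ added in v0.1.8
func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)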
func (*MysqlSource) QueryTableData ¶ added in v0.1.8
func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
type PostgresSource ¶ added in v0.1.8
type PostgresSource struct {
// contains filtered or unexported fields
}
func NewPostgresSource ¶ added in v0.1.8
func NewPostgresSource(cfg *config.Config) (*PostgresSource, error)
func (*PostgresSource) AdjustBatchSizeAccordingToSourceDbTable ¶ added in v0.1.8
func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() int64
func (*PostgresSource) DeleteAfterSync ¶ added in v0.1.8
func (p *PostgresSource) DeleteAfterSync() error
func (*PostgresSource) GetAllSourceReadRowsCount ¶ added in v0.1.8
func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)
func (*PostgresSource) GetDatabasesAccordingToSourceDbRegex ¶ added in v0.1.8
func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
func (*PostgresSource) GetDbTablesAccordingToSourceDbTables ¶ added in v0.1.8
func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
func (*PostgresSource) GetMinMaxSplitKey ¶ added in v0.1.8
func (p *PostgresSource) GetMinMaxSplitKey() (int64, int64, error)
func (*PostgresSource) GetMinMaxTimeSplitKey ¶ added in v0.1.8
func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)
func (*PostgresSource) GetSourceReadRowsCount ¶ added in v0.1.8
func (p *PostgresSource) GetSourceReadRowsCount() (int, error)
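func (*PostgresSource) GetTablesAccordingToSourceTableRegex ¶ added in v0.1.8
func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)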
func (*PostgresSource) QueryTableData ¶ added in v0.1.8
func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
func (*PostgresSource) SwitchDatabase ¶ added in v0.1.8
func (p *PostgresSource) SwitchDatabase() error
type Sourcer ¶
type Sourcer interface {
AdjustBatchSizeAccordingToSourceDbTable() int64
GetSourceReadRowsCount() (int, error)
GetMinMaxSplitKey() (int64, int64, error)
GetMinMaxTimeSplitKey() (string, string, error)
DeleteAfterSync() error
QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
GetAllSourceReadRowsCount() (int, error)
GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
}
Click to show internal directories.
Click to hide internal directories.