Versions in this module Expand all Collapse all v0 v0.3.9 Jan 27, 2026 Changes in this version + func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error) + func SlimCondition(maxThread int, minSplitKey, maxSplitKey uint64) [][]uint64 + func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey uint64) []string + func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax uint64) <-chan string + func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error) + func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string + type DatabendIngesterStatsData struct + BytesPerSecond float64 + RowsPerSecondd float64 + type DatabendSourceStatsRecorder struct + func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder + func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int) + func (stats *DatabendSourceStatsRecorder) Stats(statsWindow time.Duration) DatabendIngesterStatsData + type MysqlSource struct + func NewMysqlSource(cfg *config.Config) (*MysqlSource, error) + func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 + func (s *MysqlSource) DeleteAfterSync() error + func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error) + func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error) + func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error) + func (s *MysqlSource) GetMinMaxSplitKey() (uint64, uint64, error) + func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error) + func (s *MysqlSource) GetSourceReadRowsCount() (int, error) + func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error) + func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error) + type NullUint64 struct + Uint64 uint64 + Valid bool + func (n *NullUint64) Scan(value interface{}) error + func (n NullUint64) Value() (driver.Value, error) + type OracleSource struct + func NewOracleSource(cfg *config.Config) (*OracleSource, error) + func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 + func (p *OracleSource) DeleteAfterSync() error + func (p *OracleSource) GetAllSourceReadRowsCount() (int, error) + func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error) + func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error) + func (p *OracleSource) GetMinMaxSplitKey() (uint64, uint64, error) + func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error) + func (p *OracleSource) GetSourceReadRowsCount() (int, error) + func (p *OracleSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error) + func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error) + func (p *OracleSource) SwitchDatabase() error + type PostgresSource struct + func NewPostgresSource(cfg *config.Config) (*PostgresSource, error) + func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 + func (p *PostgresSource) DeleteAfterSync() error + func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error) + func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error) + func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error) + func (p *PostgresSource) GetMinMaxSplitKey() (uint64, uint64, error) + func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error) + func (p *PostgresSource) GetSourceReadRowsCount() (int, error) + func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error) + func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error) + func (p *PostgresSource) SwitchDatabase() error + type SQLServerSource struct + func NewSqlServerSource(cfg *config.Config) (*SQLServerSource, error) + func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() uint64 + func (s *SQLServerSource) DeleteAfterSync() error + func (s *SQLServerSource) GetAllSourceReadRowsCount() (int, error) + func (s *SQLServerSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error) + func (s *SQLServerSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error) + func (s *SQLServerSource) GetMinMaxSplitKey() (uint64, uint64, error) + func (s *SQLServerSource) GetMinMaxTimeSplitKey() (string, string, error) + func (s *SQLServerSource) GetSourceReadRowsCount() (int, error) + func (s *SQLServerSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error) + func (s *SQLServerSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error) + type Sourcer interface + AdjustBatchSizeAccordingToSourceDbTable func() uint64 + DeleteAfterSync func() error + GetAllSourceReadRowsCount func() (int, error) + GetDatabasesAccordingToSourceDbRegex func(sourceDatabasePattern string) ([]string, error) + GetDbTablesAccordingToSourceDbTables func() (map[string][]string, error) + GetMinMaxSplitKey func() (uint64, uint64, error) + GetMinMaxTimeSplitKey func() (string, string, error) + GetSourceReadRowsCount func() (int, error) + GetTablesAccordingToSourceTableRegex func(sourceTablePattern string, databases []string) (map[string][]string, error) + QueryTableData func(threadNum int, conditionSql string) ([][]interface{}, []string, error) + func NewSource(cfg *config.Config) (Sourcer, error)