source

package
v0.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateJSONFile

func GenerateJSONFile(columns []string, data [][]interface{}) (string, int, error)

func SlimCondition added in v0.1.8

func SlimCondition(maxThread int, minSplitKey, maxSplitKey uint64) [][]uint64

func SplitCondition added in v0.1.8

func SplitCondition(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey uint64) []string

func SplitConditionAccordingMaxGoRoutine added in v0.1.8

func SplitConditionAccordingMaxGoRoutine(sourceSplitKey string, batchSize, minSplitKey, maxSplitKey, allMax uint64) <-chan string

func SplitConditionAccordingToTimeSplitKey added in v0.1.8

func SplitConditionAccordingToTimeSplitKey(cfg *config.Config, minTimeSplitKey, maxTimeSplitKey string) ([]string, error)

func SplitTimeConditionsByMaxThread added in v0.1.8

func SplitTimeConditionsByMaxThread(conditions []string, maxThread int) [][]string

Types

type DatabendIngesterStatsData added in v0.1.3

type DatabendIngesterStatsData struct {
	BytesPerSecond float64
	RowsPerSecondd float64
}

type DatabendSourceStatsRecorder added in v0.1.3

type DatabendSourceStatsRecorder struct {
	// contains filtered or unexported fields
}

func NewDatabendIntesterStatsRecorder added in v0.1.3

func NewDatabendIntesterStatsRecorder() *DatabendSourceStatsRecorder

func (*DatabendSourceStatsRecorder) RecordMetric added in v0.1.3

func (stats *DatabendSourceStatsRecorder) RecordMetric(rows int)

func (*DatabendSourceStatsRecorder) Stats added in v0.1.3

type MysqlSource added in v0.1.8

type MysqlSource struct {
	// contains filtered or unexported fields
}

func NewMysqlSource added in v0.1.8

func NewMysqlSource(cfg *config.Config) (*MysqlSource, error)

func (*MysqlSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.1.8

func (s *MysqlSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

AdjustBatchSizeAccordingToSourceDbTable has a concept called s, s = (maxKey - minKey) / sourceTableRowCount if s == 1 it means the data is uniform in the table, if s is much bigger than 1, it means the data is not uniform in the table

func (*MysqlSource) DeleteAfterSync added in v0.1.8

func (s *MysqlSource) DeleteAfterSync() error

func (*MysqlSource) GetAllSourceReadRowsCount added in v0.1.8

func (s *MysqlSource) GetAllSourceReadRowsCount() (int, error)

func (*MysqlSource) GetDatabasesAccordingToSourceDbRegex added in v0.1.8

func (s *MysqlSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*MysqlSource) GetDbTablesAccordingToSourceDbTables added in v0.1.8

func (s *MysqlSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*MysqlSource) GetMinMaxSplitKey added in v0.1.8

func (s *MysqlSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*MysqlSource) GetMinMaxTimeSplitKey added in v0.1.8

func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*MysqlSource) GetSourceReadRowsCount added in v0.1.8

func (s *MysqlSource) GetSourceReadRowsCount() (int, error)

func (*MysqlSource) GetTablesAccordingToSourceTableRegex added in v0.1.8

func (s *MysqlSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*MysqlSource) QueryTableData added in v0.1.8

func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

type NullUint64 added in v0.3.7

type NullUint64 struct {
	Uint64 uint64
	Valid  bool // Valid is true if Uint64 is not NULL
}

NullUint64 represents a uint64 that may be null.

func (*NullUint64) Scan added in v0.3.7

func (n *NullUint64) Scan(value interface{}) error

Scan implements the Scanner interface.

func (NullUint64) Value added in v0.3.7

func (n NullUint64) Value() (driver.Value, error)

Value implements the driver Valuer interface.

type OracleSource added in v0.2.0

type OracleSource struct {
	// contains filtered or unexported fields
}

func NewOracleSource added in v0.2.0

func NewOracleSource(cfg *config.Config) (*OracleSource, error)

func (*OracleSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.2.0

func (p *OracleSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

func (*OracleSource) DeleteAfterSync added in v0.2.0

func (p *OracleSource) DeleteAfterSync() error

func (*OracleSource) GetAllSourceReadRowsCount added in v0.2.0

func (p *OracleSource) GetAllSourceReadRowsCount() (int, error)

func (*OracleSource) GetDatabasesAccordingToSourceDbRegex added in v0.2.0

func (p *OracleSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*OracleSource) GetDbTablesAccordingToSourceDbTables added in v0.2.0

func (p *OracleSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*OracleSource) GetMinMaxSplitKey added in v0.2.0

func (p *OracleSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*OracleSource) GetMinMaxTimeSplitKey added in v0.2.0

func (p *OracleSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*OracleSource) GetSourceReadRowsCount added in v0.2.0

func (p *OracleSource) GetSourceReadRowsCount() (int, error)

func (*OracleSource) GetTablesAccordingToSourceTableRegex added in v0.2.0

func (p *OracleSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*OracleSource) QueryTableData added in v0.2.0

func (p *OracleSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

func (*OracleSource) SwitchDatabase added in v0.2.0

func (p *OracleSource) SwitchDatabase() error

type PostgresSource added in v0.1.8

type PostgresSource struct {
	// contains filtered or unexported fields
}

func NewPostgresSource added in v0.1.8

func NewPostgresSource(cfg *config.Config) (*PostgresSource, error)

func (*PostgresSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.1.8

func (p *PostgresSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

func (*PostgresSource) DeleteAfterSync added in v0.1.8

func (p *PostgresSource) DeleteAfterSync() error

func (*PostgresSource) GetAllSourceReadRowsCount added in v0.1.8

func (p *PostgresSource) GetAllSourceReadRowsCount() (int, error)

func (*PostgresSource) GetDatabasesAccordingToSourceDbRegex added in v0.1.8

func (p *PostgresSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*PostgresSource) GetDbTablesAccordingToSourceDbTables added in v0.1.8

func (p *PostgresSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*PostgresSource) GetMinMaxSplitKey added in v0.1.8

func (p *PostgresSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*PostgresSource) GetMinMaxTimeSplitKey added in v0.1.8

func (p *PostgresSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*PostgresSource) GetSourceReadRowsCount added in v0.1.8

func (p *PostgresSource) GetSourceReadRowsCount() (int, error)

func (*PostgresSource) GetTablesAccordingToSourceTableRegex added in v0.1.8

func (p *PostgresSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*PostgresSource) QueryTableData added in v0.1.8

func (p *PostgresSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

func (*PostgresSource) SwitchDatabase added in v0.1.8

func (p *PostgresSource) SwitchDatabase() error

type SQLServerSource added in v0.3.0

type SQLServerSource struct {
	// contains filtered or unexported fields
}

func NewSqlServerSource added in v0.3.0

func NewSqlServerSource(cfg *config.Config) (*SQLServerSource, error)

func (*SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable added in v0.3.0

func (s *SQLServerSource) AdjustBatchSizeAccordingToSourceDbTable() uint64

func (*SQLServerSource) DeleteAfterSync added in v0.3.0

func (s *SQLServerSource) DeleteAfterSync() error

func (*SQLServerSource) GetAllSourceReadRowsCount added in v0.3.0

func (s *SQLServerSource) GetAllSourceReadRowsCount() (int, error)

func (*SQLServerSource) GetDatabasesAccordingToSourceDbRegex added in v0.3.0

func (s *SQLServerSource) GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)

func (*SQLServerSource) GetDbTablesAccordingToSourceDbTables added in v0.3.0

func (s *SQLServerSource) GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)

func (*SQLServerSource) GetMinMaxSplitKey added in v0.3.0

func (s *SQLServerSource) GetMinMaxSplitKey() (uint64, uint64, error)

func (*SQLServerSource) GetMinMaxTimeSplitKey added in v0.3.0

func (s *SQLServerSource) GetMinMaxTimeSplitKey() (string, string, error)

func (*SQLServerSource) GetSourceReadRowsCount added in v0.3.0

func (s *SQLServerSource) GetSourceReadRowsCount() (int, error)

func (*SQLServerSource) GetTablesAccordingToSourceTableRegex added in v0.3.0

func (s *SQLServerSource) GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)

func (*SQLServerSource) QueryTableData added in v0.3.0

func (s *SQLServerSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)

type Sourcer

type Sourcer interface {
	AdjustBatchSizeAccordingToSourceDbTable() uint64
	GetSourceReadRowsCount() (int, error)
	GetMinMaxSplitKey() (uint64, uint64, error)
	GetMinMaxTimeSplitKey() (string, string, error)
	DeleteAfterSync() error
	QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error)
	GetDatabasesAccordingToSourceDbRegex(sourceDatabasePattern string) ([]string, error)
	GetTablesAccordingToSourceTableRegex(sourceTablePattern string, databases []string) (map[string][]string, error)
	GetAllSourceReadRowsCount() (int, error)
	GetDbTablesAccordingToSourceDbTables() (map[string][]string, error)
}

func NewSource

func NewSource(cfg *config.Config) (Sourcer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL