Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeBatchPositionValue(s string) (interface{}, error)
- func DetectScanColumns(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, ...) ([]string, error)
- func EncodeBatchPositionValue(v interface{}) (string, error)
- func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumns []string, ...) ([]interface{}, []interface{})
- func GenerateNextScanQueryAndArgs(fullTableName string, scanColumns []string, currentMinValues []interface{}, ...) (string, []interface{})
- func GenerateScanQueryAndArgs(fullTableName string, scanColumns []string, currentMinValues []interface{}, ...) (string, []interface{})
- func GetMaxMin(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, []TablePosition, bool, error)
- func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
- func GetStartBinlog(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)
- func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)
- func GreaterThanMax(db *sql.DB, fullTableName string, scanColumns []string, ...) (bool, error)
- func InitTablePosition(db *sql.DB, positionCache position_cache.PositionCacheInterface, ...) (bool, error)
- func IsScanColumnsForDump(scanColumns []string) bool
- func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
- func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
- func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg
- func NewMsg(rowPtrs []interface{}, columnTypes []*sql.ColumnType, ...) *core.Msg
- func NextBatchStartPoint(db *sql.DB, fullTableName string, scanColumns []string, ...) (nextMinValues []interface{}, continueNext bool, pivotIndex int, err error)
- func NextScanElementForChunk(db *sql.DB, fullTableName string, columnTypes []*sql.ColumnType, ...) (nextRowValues []interface{}, exists bool, err error)
- func PutCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string, ...) error
- func PutDone(cache position_cache.PositionCacheInterface, fullTableName string) error
- func PutEstimatedCount(cache position_cache.PositionCacheInterface, fullTableName string, ...) error
- func PutMaxMin(cache position_cache.PositionCacheInterface, fullTableName string, ...) error
- func ScanValuesFromRowValues(rowValues []interface{}, scanIndexes []int) []interface{}
- func SetupInitialPosition(cache position_cache.PositionCacheInterface, sourceDB *sql.DB) error
- type BatchPositionValueV1
- type BatchPositionValueV1Beta1
- type BatchPositionVersionMigrationWrapper
- type PluginConfig
- type TableConfig
- func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
- func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, ignoreTables []TableConfig, ...) ([]*schema_store.Table, []TableConfig)
- func InitializePositionAndDeleteScannedTable(db *sql.DB, positionCache position_cache.PositionCacheInterface, ...) ([]*schema_store.Table, []TableConfig, [][]string, []int64, error)
- type TablePosition
- type TableScanner
- func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
- func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
- func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, ...)
- func (tableScanner *TableScanner) Start() error
- func (tableScanner *TableScanner) Wait()
- type TableStats
- type TableStatsV1
- type TableWork
Constants ¶
View Source
const (
	Unknown           = "*"
	PlainString       = "string"
	PlainInt          = "int"
	PlainUInt         = "uint"
	PlainBytes        = "bytes"
	PlainTime         = "time"
	SQLNullInt64      = "sqlNullInt64"
	SQLNullString     = "sqlNullString"
	SQLNullBool       = "sqlNullBool"
	SQLNullTime       = "sqlNullTime"
	SQLRawBytes       = "sqlRawBytes"
	ScanColumnForDump = "*"
	SchemaVersionV1   = "v1.0"
)
View Source
const Name = "mysql-batch"
Variables ¶
View Source
var (
	BatchQueryDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "gravity",
			Subsystem: "output",
			Name:      "batch_query_duration",
			Help:      "bucketed histogram of batch fetch duration time",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{metrics.PipelineTag})

	JobFetchedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "gravity",
		Subsystem: "output",
		Name:      "job_fetched_count",
		Help:      "Number of data rows fetched by scanner",
	}, []string{metrics.PipelineTag})
)
Functions ¶
func DecodeBatchPositionValue ¶ added in v0.9.19
func DetectScanColumns ¶ added in v0.9.27
func DetectScanColumns(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, maxFullDumpRowsCountLimit int64) ([]string, error)
DetectScanColumns finds the columns used to scan the table. First, we try primary keys, then unique keys; as a last resort, we dump the whole table. Note that composite unique keys are not supported.
func EncodeBatchPositionValue ¶ added in v0.9.19
func FindMaxMinValueFromDB ¶
func GenerateNextScanQueryAndArgs ¶ added in v0.9.27
func GenerateScanQueryAndArgs ¶ added in v0.9.27
func GenerateScanQueryAndArgs( fullTableName string, scanColumns []string, currentMinValues []interface{}, batch int, pivotIndex int, condition string) (string, []interface{})
pivotIndex is the index into scanColumns.
func GetMaxMin ¶ added in v0.9.17
func GetMaxMin(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, []TablePosition, bool, error)
func GetScanIdx ¶
func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
func GetStartBinlog ¶ added in v0.9.17
func GetStartBinlog(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)
func GetTableColumnTypes ¶
func GreaterThanMax ¶ added in v0.9.27
func InitTablePosition ¶ added in v0.9.18
func InitTablePosition( db *sql.DB, positionCache position_cache.PositionCacheInterface, tableDef *schema_store.Table, scanColumns []string, tableConfig TableConfig, estimatedRowCount *int64) (bool, error)
func IsScanColumnsForDump ¶ added in v0.9.27
func NewBarrierMsg ¶ added in v0.9.17
func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
func NewCloseInputStreamMsg ¶
func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
func NewCreateTableMsg ¶
func NewMsg ¶
func NewMsg(
rowPtrs []interface{},
columnTypes []*sql.ColumnType,
sourceTableDef *schema_store.Table,
callbackFunc core.MsgCallbackFunc,
positions []TablePosition,
scanTime time.Time) *core.Msg
NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the value is parsed as a time.
func NextBatchStartPoint ¶ added in v0.9.27
func NextScanElementForChunk ¶ added in v0.9.27
func PutCurrentPos ¶ added in v0.9.17
func PutCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string, pos []TablePosition, incScanCount bool) error
func PutDone ¶ added in v0.9.27
func PutDone(cache position_cache.PositionCacheInterface, fullTableName string) error
func PutEstimatedCount ¶ added in v0.9.18
func PutEstimatedCount(cache position_cache.PositionCacheInterface, fullTableName string, estimatedCount int64) error
func PutMaxMin ¶ added in v0.9.17
func PutMaxMin(cache position_cache.PositionCacheInterface, fullTableName string, max []TablePosition, min []TablePosition) error
func ScanValuesFromRowValues ¶ added in v0.9.27
func ScanValuesFromRowValues(rowValues []interface{}, scanIndexes []int) []interface{}
func SetupInitialPosition ¶ added in v0.9.17
func SetupInitialPosition(cache position_cache.PositionCacheInterface, sourceDB *sql.DB) error
Types ¶
type BatchPositionValueV1 ¶ added in v0.9.27
type BatchPositionValueV1 struct {
SchemaVersion string `toml:"schema-version" json:"schema-version"`
Start config.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"`
TableStates map[string]TableStatsV1 `toml:"table-stats" json:"table-stats"`
}
type BatchPositionValueV1Beta1 ¶ added in v0.9.27
type BatchPositionValueV1Beta1 struct {
SchemaVersion string `toml:"schema-version" json:"schema-version"`
Start config.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"`
TableStates map[string]TableStats `toml:"table-stats" json:"table-stats"`
}
type BatchPositionVersionMigrationWrapper ¶ added in v0.9.27
type BatchPositionVersionMigrationWrapper struct {
SchemaVersion string `toml:"schema-version" json:"schema-version"`
}
type PluginConfig ¶
type PluginConfig struct {
Source *config.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple
SourceSlave *config.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`
SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config" json:"source-probe-config"`
PositionRepo *config.GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`
TableConfigs []TableConfig `mapstructure:"table-configs" json:"table-configs"`
IgnoreTables []TableConfig `mapstructure:"ignore-tables" json:"ignore-tables"`
NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"`
TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"`
MaxFullDumpCount int64 `mapstructure:"max-full-dump-count" toml:"max-full-dump-count" json:"max-full-dump-count"`
BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"`
}
func (*PluginConfig) ValidateAndSetDefault ¶
func (cfg *PluginConfig) ValidateAndSetDefault() error
type TableConfig ¶
type TableConfig struct {
Schema string `mapstructure:"schema" toml:"schema" json:"schema"`
// Table is an array of strings; each string is a glob expression
// that describes the table name
Table []string `mapstructure:"table" toml:"table" json:"table"`
// ScanColumn is an array of strings that forces the scan columns used for these tables
ScanColumn []string `mapstructure:"scan-column" toml:"scan-column" json:"scan-column"`
Condition string `mapstructure:"condition" toml:"condition" json:"condition"`
}
func DeleteEmptyTables ¶ added in v0.9.18
func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
func GetTables ¶
func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, ignoreTables []TableConfig, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)
GetTables returns a list of table definitions based on the schema and table name patterns. We only support a single sourceDB for now.
func InitializePositionAndDeleteScannedTable ¶ added in v0.9.19
func InitializePositionAndDeleteScannedTable( db *sql.DB, positionCache position_cache.PositionCacheInterface, scanColumnsArray [][]string, estimatedRowCount []int64, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig, [][]string, []int64, error)
type TablePosition ¶ added in v0.9.17
type TablePosition struct {
Value interface{} `toml:"value" json:"value,omitempty"`
Type string `toml:"type" json:"type"`
Column string `toml:"column" json:"column"`
}
func GetCurrentPos ¶ added in v0.9.17
func GetCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, bool, bool, error)
func (TablePosition) MapString ¶ added in v0.9.17
func (p TablePosition) MapString() (map[string]string, error)
func (TablePosition) MarshalJSON ¶ added in v0.9.17
func (p TablePosition) MarshalJSON() ([]byte, error)
func (*TablePosition) UnmarshalJSON ¶ added in v0.9.17
func (p *TablePosition) UnmarshalJSON(value []byte) error
type TableScanner ¶
type TableScanner struct {
// contains filtered or unexported fields
}
func NewTableScanner ¶
func NewTableScanner( pipelineName string, tableWorkC chan *TableWork, db *sql.DB, positionCache position_cache.PositionCacheInterface, emitter core.Emitter, throttle *time.Ticker, schemaStore schema_store.SchemaStore, cfg *PluginConfig, ctx context.Context) *TableScanner
func (*TableScanner) AfterMsgCommit ¶
func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
func (*TableScanner) FindAll ¶
func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
func (*TableScanner) LoopInBatch ¶
func (tableScanner *TableScanner) LoopInBatch( db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumns []string, max []TablePosition, min []TablePosition, batch int)
LoopInBatch iterates over the table using SQL like this: `SELECT * FROM a WHERE some_key > some_value LIMIT 10000`. It gets the min and max values of the column and iterates batch by batch.
func (*TableScanner) Start ¶
func (tableScanner *TableScanner) Start() error
func (*TableScanner) Wait ¶
func (tableScanner *TableScanner) Wait()
type TableStats ¶ added in v0.9.18
type TableStats struct {
Max *TablePosition `toml:"max" json:"max"`
Min *TablePosition `toml:"min" json:"min"`
Current *TablePosition `toml:"current" json:"current"`
EstimatedRowCount int64 `json:"estimated-count"`
ScannedCount int64 `json:"scanned-count"`
Done bool `json:"done"`
}
type TableStatsV1 ¶ added in v0.9.27
type TableStatsV1 struct {
Max []TablePosition `toml:"max" json:"max"`
Min []TablePosition `toml:"min" json:"min"`
Current []TablePosition `toml:"current" json:"current"`
EstimatedRowCount int64 `json:"estimated-count"`
ScannedCount int64 `json:"scanned-count"`
Done bool `json:"done"`
}
type TableWork ¶
type TableWork struct {
TableDef *schema_store.Table
TableConfig *TableConfig
ScanColumns []string
EstimatedRowCount int64
Condition string
}
Click to show internal directories.
Click to hide internal directories.