Documentation
¶
Index ¶
- Constants
- Variables
- func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog string) int
- func CheckElementOfSliceInt(arr []int, element int, prefix string, ifExt bool) bool
- func CheckElementOfSliceStr(arr []string, element string, prefix string, ifExt bool) bool
- func CheckIsDir(fd string) (bool, string)
- func CommaSeparatedListToArray(str string) []string
- func CompareBinlogPos(sBinFile string, sPos uint, eBinFile string, ePos uint) int
- func CompareEquelByteSlice(s1 []byte, s2 []byte) bool
- func ConvertRowToExpressRow(row []interface{}, ifIgnorePrimary bool, primaryIdx []int) []SQL.Expression
- func ConvertStrArrToIntferfaceArrForPrint(arr []string) []interface{}
- func CreateMysqlCon(mysqlUrl string) (*sql.DB, error)
- func GenDeleteSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenDeleteSqlsForOneRowsEventRollbackInsert(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenEqualConditions(row []interface{}, colDefs []SQL.NonAliasColumn, uniKey []int, ...) []SQL.BoolExpression
- func GenForwardRollbackSqlFromBinEvent(i uint, cfg *ConfCmd, wg *sync.WaitGroup)
- func GenInsertSqlForRows(rows [][]interface{}, insertSql SQL.InsertStatement, schema string, ...) (string, error)
- func GenInsertSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenInsertSqlsForOneRowsEventRollbackDelete(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenUpdateSetPart(colsTypeNameFromMysql []string, colTypeNames []string, ...) SQL.UpdateStatement
- func GenUpdateSqlsForOneRowsEvent(posStr string, colsTypeNameFromMysql []string, colsTypeName []string, ...) []string
- func GetAbsTableName(schema, table string) string
- func GetBigLongTrxContentLine(blTrx BigLongTrxInfo) string
- func GetBigLongTrxPrintHeaderLine(headers []string) string
- func GetBigLongTrxStatementsStr(st map[string]map[string]uint32) string
- func GetBinlogBasenameAndIndex(binlog string) (string, int)
- func GetBinlogPosAsKey(binlog string, spos, epos uint32) string
- func GetColDefIgnorePrimary(colDefs []SQL.NonAliasColumn, primaryIdx []int) []SQL.NonAliasColumn
- func GetColIndexFromKey(ki KeyInfo, columns []FieldInfo) []int
- func GetDatetimeStr(sec int64, nsec int64, timeFmt string) string
- func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32)
- func GetDbTbFromAbsTbName(name string) (string, string)
- func GetDroppedFieldName(idx int) string
- func GetFiledType(filed string) string
- func GetFirstBinlogPosToParse(cfg *ConfCmd) (string, int64)
- func GetForwardRollbackContentLineWithExtra(sq ForwardRollbackSqlOfPrint, ifExtra bool) string
- func GetForwardRollbackSqlFileName(schema string, table string, filePerTable bool, outDir string, ifRollback bool, ...) string
- func GetLineHeaderStrFromColumnNamesArr(arr []string, sep string) string
- func GetMaxValue(nums ...int) int
- func GetMinValue(nums ...int) int
- func GetMysqlDataTypeNameAndSqlColumn(tpDef string, colName string, tp byte, meta uint16) (string, SQL.NonAliasColumn)
- func GetMysqlUrl(cfg *ConfCmd) string
- func GetNextBinlog(baseName string, indx int) string
- func GetPosStr(name string, spos uint32, epos uint32) string
- func GetSqlFieldsEXpressions(colCnt int, colNames []FieldInfo, tbMap *replication.TableMapEvent) ([]SQL.NonAliasColumn, []string)
- func GetStatsPrintContentLine(st *BinEventStatsPrint) string
- func GetStatsPrintHeaderLine(headers []string) string
- func GetSystemHomeNameAndAdderss() (hostname string, address string)
- func IntSliceToString(iArr []int, sep string, prefix string) string
- func IsUnsigned(filed string) bool
- func NewReplBinlogStreamer(cfg *ConfCmd) *replication.BinlogStreamer
- func ParserAllBinEventsFromRepl(cfg *ConfCmd)
- func PrintExtraInfoForForwardRollbackupSql(cfg *ConfCmd, wg *sync.WaitGroup)
- func ProcessBinEventStats(cfg *ConfCmd, wg *sync.WaitGroup)
- func ReverseFileGo(threadIdx int, rollbackFileChan chan map[string]string, ...)
- func ReverseFileToNewFileOneByOneLineAndKeepTrxBatchRead(srcFile string, destFile string, trxPoses [][]int, keepTrx bool) error
- func SendBinlogEventRepl(cfg *ConfCmd)
- func StrSliceToString(sArr []string, sep, prefix string) string
- type BigLongTrxInfo
- type BinEventHandlingIndx
- type BinEventStats
- type BinEventStatsPrint
- type BinFileParser
- type ConfCmd
- func (this *ConfCmd) CheckCmdOptions()
- func (this *ConfCmd) CheckRequiredOption(v interface{}, prefix string, ifExt bool) bool
- func (this *ConfCmd) CheckValueInRange(opt string, val int, prefix string, ifExt bool) bool
- func (this *ConfCmd) CloseChan()
- func (this *ConfCmd) CloseFH()
- func (this *ConfCmd) CreateDB()
- func (this *ConfCmd) GetDefaultAndRangeValueMsg(opt string) string
- func (this *ConfCmd) GetDefaultValueOfRange(opt string) int
- func (this *ConfCmd) GetMaxValueOfRange(opt string) int
- func (this *ConfCmd) GetMinValueOfRange(opt string) int
- func (this *ConfCmd) IsTargetDml(dml string) bool
- func (this *ConfCmd) OpenStatsResultFiles()
- func (this *ConfCmd) OpenTxResultFiles()
- func (this *ConfCmd) ParseCmdOptions()
- func (this *ConfCmd) PrintUsageMsg()
- type DdlPosInfo
- type ExtraSqlInfoOfPrint
- type FieldInfo
- type ForwardRollbackSqlOfPrint
- type KeyInfo
- type MyBinEvent
- type OrgSqlPrint
- type TablesColumnsInfo
- func (this *TablesColumnsInfo) GetTableColumns(db *sql.DB, dbname string, tbname string) error
- func (this *TablesColumnsInfo) GetTableInfoJson(schema string, table string) (*TblInfoJson, error)
- func (this *TablesColumnsInfo) GetTableKeysInfo(db *sql.DB, dbName string, tbName string) error
- func (this *TablesColumnsInfo) GetTbDefFromDb(cfg *ConfCmd, dbname string, tbname string)
- type TblInfoJson
Constants ¶
View Source
const (
	C_Version = "my2sql V2.0"
	C_validOptMsg = "valid options are: "
	C_joinSepComma = ","
	EventTimeout = 5 * time.Second
	C_unknownColPrefix = "dropped_column_"
	C_unknownColType = "unknown_type"
	C_unknownColTypeCode = mysql.MYSQL_TYPE_NULL
	C_trxBegin = 0
	C_trxCommit = 1
	C_trxRollback = 2
	C_trxProcess = -1
	C_reProcess = 0
	C_reContinue = 1
	C_reBreak = 2
	C_reFileEnd = 3
)
View Source
const (
	//PRIMARY_KEY_LABLE = "primary"
	//UNIQUE_KEY_LABLE = "unique"
	KEY_BINLOG_POS_SEP = "/"
	KEY_DB_TABLE_SEP = "."
	KEY_NONE_BINLOG = "_"
)
Variables ¶
View Source
var (
	GConfCmd *ConfCmd = &ConfCmd{}
	GBinlogTimeLocation *time.Location
	GUseDatabase string = ""
	GOptsValidMode []string = []string{"repl", "file"}
	GOptsValidWorkType []string = []string{"2sql", "rollback", "stats"}
	GOptsValidMysqlType []string = []string{"mysql", "mariadb"}
	GOptsValidFilterSql []string = []string{"insert", "update", "delete"}
	GOptsValueRange map[string][]int = map[string][]int{
		"PrintInterval": []int{1, 600, 30},
		"BigTrxRowLimit": []int{1, 30000, 10},
		"LongTrxSeconds": []int{0, 3600, 1},
		"InsertRows": []int{1, 500, 30},
		"Threads": []int{1, 16, 2},
	}
	GStatsColumns []string = []string{
		"StartTime", "StopTime", "Binlog", "PosRange",
		"Database", "Table",
		"BigTrxs", "BiggestTrx", "LongTrxs", "LongestTrx",
		"Inserts", "Updates", "Deletes", "Trxs", "Statements",
		"Renames", "RenamePoses", "Ddls", "DdlPoses",
	}
	GDdlPrintHeader []string = []string{"datetime", "binlog", "startposition", "stopposition", "sql"}
)
View Source
var (
	ForwardSqlFileNamePrefix string = "forward"
	RollbackSqlFileNamePrefix string = "rollback"
)
View Source
var (
	//gDdlRegexp *regexp.Regexp = regexp.MustCompile(C_ddlRegexp)
	Stats_Result_Header_Column_names []string = []string{"binlog", "starttime", "stoptime", "startpos", "stoppos", "inserts", "updates", "deletes", "database", "table"}
	Stats_DDL_Header_Column_names []string = []string{"datetime", "binlog", "startpos", "stoppos", "sql"}
	Stats_BigLongTrx_Header_Column_names []string = []string{"binlog", "starttime", "stoptime", "startpos", "stoppos", "rows", "duration", "tables"}
)
View Source
var G_Bytes_Column_Types []string = []string{"blob", "json", "geometry", C_unknownColType}
Functions ¶
func CheckBinHeaderCondition ¶
func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog string) int
func CheckElementOfSliceInt ¶
func CheckElementOfSliceStr ¶
func CheckIsDir ¶
func CompareBinlogPos ¶
func CompareEquelByteSlice ¶
func ConvertRowToExpressRow ¶
func ConvertRowToExpressRow(row []interface{}, ifIgnorePrimary bool, primaryIdx []int) []SQL.Expression
func ConvertStrArrToIntferfaceArrForPrint ¶
func ConvertStrArrToIntferfaceArrForPrint(arr []string) []interface{}
func GenDeleteSqlsForOneRowsEvent ¶
func GenDeleteSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifRollback bool, ifprefixDb bool) []string
func GenDeleteSqlsForOneRowsEventRollbackInsert ¶
func GenDeleteSqlsForOneRowsEventRollbackInsert(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifprefixDb bool) []string
func GenEqualConditions ¶
func GenEqualConditions(row []interface{}, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool) []SQL.BoolExpression
func GenInsertSqlForRows ¶
func GenInsertSqlsForOneRowsEventRollbackDelete ¶
func GenInsertSqlsForOneRowsEventRollbackDelete(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, rowsPerSql int, ifprefixDb bool) []string
func GenUpdateSetPart ¶
func GenUpdateSetPart(colsTypeNameFromMysql []string, colTypeNames []string, updateSql SQL.UpdateStatement, colDefs []SQL.NonAliasColumn, rowAfter []interface{}, rowBefore []interface{}, ifFullImage bool) SQL.UpdateStatement
func GetAbsTableName ¶
func GetBigLongTrxContentLine ¶
func GetBigLongTrxContentLine(blTrx BigLongTrxInfo) string
func GetBinlogPosAsKey ¶
func GetColDefIgnorePrimary ¶
func GetColDefIgnorePrimary(colDefs []SQL.NonAliasColumn, primaryIdx []int) []SQL.NonAliasColumn
func GetColIndexFromKey ¶
func GetDbTbAndQueryAndRowCntFromBinevent ¶
func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32)
func GetDbTbFromAbsTbName ¶
func GetDroppedFieldName ¶
func GetFiledType ¶
func GetForwardRollbackContentLineWithExtra ¶
func GetForwardRollbackContentLineWithExtra(sq ForwardRollbackSqlOfPrint, ifExtra bool) string
func GetMaxValue ¶
func GetMinValue ¶
func GetMysqlUrl ¶
func GetNextBinlog ¶
func GetSqlFieldsEXpressions ¶
func GetSqlFieldsEXpressions(colCnt int, colNames []FieldInfo, tbMap *replication.TableMapEvent) ([]SQL.NonAliasColumn, []string)
func GetStatsPrintContentLine ¶
func GetStatsPrintContentLine(st *BinEventStatsPrint) string
func GetStatsPrintHeaderLine ¶
func GetSystemHomeNameAndAdderss ¶
获取 系统的hostname 和 ip地址 (Gets the system's hostname and IP address.)
func IsUnsigned ¶
func NewReplBinlogStreamer ¶
func NewReplBinlogStreamer(cfg *ConfCmd) *replication.BinlogStreamer
func ParserAllBinEventsFromRepl ¶
func ParserAllBinEventsFromRepl(cfg *ConfCmd)
func ProcessBinEventStats ¶
func ReverseFileGo ¶
func SendBinlogEventRepl ¶
func SendBinlogEventRepl(cfg *ConfCmd)
func StrSliceToString ¶
Types ¶
type BigLongTrxInfo ¶
type BigLongTrxInfo struct {
//IsBig bool
//IsLong bool
StartTime uint32
StopTime uint32
Binlog string
StartPos uint32
StopPos uint32
RowCnt uint32 // total row count across all statements
Duration uint32 // how long the trx lasts
Statements map[string]map[string]uint32 // row count for each statement type: insert, update, delete, e.g. {db1.tb1:{insert:0, update:2, delete:10}}
}
type BinEventHandlingIndx ¶
type BinEventHandlingIndx struct {
EventIdx uint64
Finished bool
// contains filtered or unexported fields
}
var (
G_HandlingBinEventIndex *BinEventHandlingIndx
)
type BinEventStats ¶
type BinEventStatsPrint ¶
type BinFileParser ¶
type BinFileParser struct {
Parser *replication.BinlogParser
}
type ConfCmd ¶
type ConfCmd struct {
Mode string
WorkType string
MysqlType string
Host string
Port uint
User string
Passwd string
ServerId uint
Databases []string
Tables []string
//DatabaseRegs []*regexp.Regexp
//ifHasDbReg bool
//TableRegs []*regexp.Regexp
//ifHasTbReg bool
IgnoreDatabases []string
IgnoreTables []string
FilterSql []string
FilterSqlLen int
StartFile string
StartPos uint
StartFilePos mysql.Position
IfSetStartFilePos bool
StopFile string
StopPos uint
StopFilePos mysql.Position
IfSetStopFilePos bool
StartDatetime uint32
StopDatetime uint32
BinlogTimeLocation string
IfSetStartDateTime bool
IfSetStopDateTime bool
MaxFileNumber int
LocalBinFile string
OutputToScreen bool
PrintInterval int
BigTrxRowLimit int
LongTrxSeconds int
IfSetStopParsPoint bool
OutputDir string
//MinColumns bool
FullColumns bool
InsertRows int
KeepTrx bool
SqlTblPrefixDb bool
FilePerTable bool
PrintExtraInfo bool
Threads uint
ReadTblDefJsonFile string
OnlyColFromFile bool
DumpTblDefToFile string
BinlogDir string
GivenBinlogFile string
UseUniqueKeyFirst bool
IgnorePrimaryKeyForInsert bool
ReplaceIntoForInsert bool
//DdlRegexp string
ParseStatementSql bool
IgnoreParsedErrForSql string // if parsing fails for a SQL statement that matches this regexp, only print the error info instead of exiting
IgnoreParsedErrRegexp *regexp.Regexp
EventChan chan MyBinEvent
StatChan chan BinEventStats
OrgSqlChan chan OrgSqlPrint
SqlChan chan ForwardRollbackSqlOfPrint
StatFH *os.File
//DdlFH *os.File
BiglongFH *os.File
BinlogStreamer *replication.BinlogStreamer
FromDB *sql.DB
}
func (*ConfCmd) CheckCmdOptions ¶
func (this *ConfCmd) CheckCmdOptions()
func (*ConfCmd) CheckRequiredOption ¶
func (*ConfCmd) CheckValueInRange ¶
func (*ConfCmd) GetDefaultAndRangeValueMsg ¶
func (*ConfCmd) GetDefaultValueOfRange ¶
func (*ConfCmd) GetMaxValueOfRange ¶
func (*ConfCmd) GetMinValueOfRange ¶
func (*ConfCmd) IsTargetDml ¶
func (*ConfCmd) OpenStatsResultFiles ¶
func (this *ConfCmd) OpenStatsResultFiles()
func (*ConfCmd) OpenTxResultFiles ¶
func (this *ConfCmd) OpenTxResultFiles()
func (*ConfCmd) ParseCmdOptions ¶
func (this *ConfCmd) ParseCmdOptions()
func (*ConfCmd) PrintUsageMsg ¶
func (this *ConfCmd) PrintUsageMsg()
type DdlPosInfo ¶
type ExtraSqlInfoOfPrint ¶
type ExtraSqlInfoOfPrint struct {
// contains filtered or unexported fields
}
type FieldInfo ¶
type FieldInfo struct {
FieldName string `json:"column_name"`
FieldType string `json:"column_type"`
IsUnsigned bool `json:"is_unsigned"`
}
type ForwardRollbackSqlOfPrint ¶
type ForwardRollbackSqlOfPrint struct {
// contains filtered or unexported fields
}
type MyBinEvent ¶
type MyBinEvent struct {
MyPos mysql.Position //this is the end position
EventIdx uint64
BinEvent *replication.RowsEvent
StartPos uint32 // this is the start position
IfRowsEvent bool
SqlType string // insert, update, delete
Timestamp uint32
TrxIndex uint64
TrxStatus int // 0:begin, 1: commit, 2: rollback, -1: in_progress
QuerySql *dsql.SqlInfo // for ddl and binlog which is not row format
OrgSql string // for ddl and binlog which is not row format
}
func (*MyBinEvent) CheckBinEvent ¶
func (this *MyBinEvent) CheckBinEvent(cfg *ConfCmd, ev *replication.BinlogEvent, currentBinlog *string) int
type OrgSqlPrint ¶
type TablesColumnsInfo ¶
type TablesColumnsInfo struct {
// contains filtered or unexported fields
}
var (
G_TablesColumnsInfo TablesColumnsInfo
)
func (*TablesColumnsInfo) GetTableColumns ¶
func (*TablesColumnsInfo) GetTableInfoJson ¶
func (this *TablesColumnsInfo) GetTableInfoJson(schema string, table string) (*TblInfoJson, error)
func (*TablesColumnsInfo) GetTableKeysInfo ¶
func (*TablesColumnsInfo) GetTbDefFromDb ¶
func (this *TablesColumnsInfo) GetTbDefFromDb(cfg *ConfCmd, dbname string, tbname string)
type TblInfoJson ¶
type TblInfoJson struct {
Database string `json:"database"`
Table string `json:"table"`
Columns []FieldInfo `json:"columns"`
PrimaryKey KeyInfo `json:"primary_key"`
UniqueKeys []KeyInfo `json:"unique_keys"`
}
func (*TblInfoJson) GetOneUniqueKey ¶
func (this *TblInfoJson) GetOneUniqueKey(uniqueFirst bool) KeyInfo
Click to show internal directories.
Click to hide internal directories.