Documentation
¶
Index ¶
- Variables
- func GenTableID(schema, table string) (ID string, isSchemaOnly bool)
- func InitStatusAndMetrics(addr string)
- func RegisterMetrics(registry *prometheus.Registry)
- func UnpackTableID(id string) (string, string)
- type BinlogType
- type CheckPoint
- type Conn
- type DDLExecInfo
- type DDLExecItem
- type ExecErrorContext
- type Ghost
- func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
- func (g *Ghost) Clear() error
- func (g *Ghost) Close()
- func (g *Ghost) Finish(schema, table string) error
- func (g *Ghost) GhostName(schema, table string) (string, string)
- func (g *Ghost) InOnlineDDL(schema, table string) bool
- func (g *Ghost) RealName(schema, table string) (string, string)
- func (g *Ghost) SchemeName() string
- func (g *Ghost) TableType(table string) TableType
- type GhostDDLInfo
- type Heartbeat
- type HeartbeatConfig
- type LocalMeta
- type Meta
- type OnlineDDLStorage
- func (s *OnlineDDLStorage) Clear() error
- func (s *OnlineDDLStorage) Close()
- func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error
- func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo
- func (s *OnlineDDLStorage) Init() error
- func (s *OnlineDDLStorage) Load() error
- func (s *OnlineDDLStorage) Save(ghostSchema, ghostTable, realSchema, realTable, ddl string) error
- type OnlinePlugin
- type PT
- func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
- func (p *PT) Clear() error
- func (p *PT) Close()
- func (p *PT) Finish(schema, table string) error
- func (p *PT) GhostName(schema, table string) (string, string)
- func (p *PT) InOnlineDDL(schema, table string) bool
- func (p *PT) RealName(schema, table string) (string, string)
- func (p *PT) SchemeName() string
- func (p *PT) TableType(table string) TableType
- type RemoteCheckPoint
- func (cp *RemoteCheckPoint) CheckGlobalPoint() bool
- func (cp *RemoteCheckPoint) Clear() error
- func (cp *RemoteCheckPoint) Close()
- func (cp *RemoteCheckPoint) DeleteTablePoint(sourceSchema, sourceTable string) error
- func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error
- func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position
- func (cp *RemoteCheckPoint) GenUpdateForTableSQLs(tables [][]string) ([]string, [][]interface{})
- func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position
- func (cp *RemoteCheckPoint) Init() error
- func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool
- func (cp *RemoteCheckPoint) Load() error
- func (cp *RemoteCheckPoint) LoadMeta() error
- func (cp *RemoteCheckPoint) Rollback()
- func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position)
- func (cp *RemoteCheckPoint) SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)
- func (cp *RemoteCheckPoint) String() string
- type ShardingGroup
- func (sg *ShardingGroup) FirstEndPosUnresolved() *mysql.Position
- func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position
- func (sg *ShardingGroup) InSyncing(source string) bool
- func (sg *ShardingGroup) IsUnresolved() bool
- func (sg *ShardingGroup) Leave(sources []string) error
- func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)
- func (sg *ShardingGroup) Reset()
- func (sg *ShardingGroup) Sources() map[string]bool
- func (sg *ShardingGroup) String() string
- func (sg *ShardingGroup) Tables() [][]string
- func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, int, error)
- func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup
- func (sg *ShardingGroup) UnresolvedTables() [][]string
- type ShardingGroupKeeper
- func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, ...)
- func (k *ShardingGroupKeeper) AdjustGlobalPoint(globalPoint mysql.Position) mysql.Position
- func (k *ShardingGroupKeeper) Clear()
- func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup
- func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup
- func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string) bool
- func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error
- func (k *ShardingGroupKeeper) ResetGroups()
- func (k *ShardingGroupKeeper) TrySync(targetSchema, targetTable, source string, pos, endPos mysql.Position, ...) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, ...)
- func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup
- func (k *ShardingGroupKeeper) UnresolvedTables() [][]string
- type ShardingReSync
- type Syncer
- func (s *Syncer) Close()
- func (s *Syncer) DDLInfo() <-chan *pb.DDLInfo
- func (s *Syncer) Error() interface{}
- func (s *Syncer) ExecuteDDL(ctx context.Context, execReq *pb.ExecDDLRequest) (<-chan error, error)
- func (s *Syncer) Init() error
- func (s *Syncer) InjectSQLs(ctx context.Context, sqls []string) error
- func (s *Syncer) IsFreshTask() (bool, error)
- func (s *Syncer) Pause()
- func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)
- func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)
- func (s *Syncer) Run(ctx context.Context) (err error)
- func (s *Syncer) SetSQLOperator(req *pb.HandleSubTaskSQLsRequest) error
- func (s *Syncer) Status() interface{}
- func (s *Syncer) Type() pb.UnitType
- func (s *Syncer) Update(cfg *config.SubTaskConfig) error
- func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error
- type TableType
Constants ¶
This section is empty.
Variables ¶
var ( // ErrDMLStatementFound defines an error which means an unexpected DML statement was found in a query event. ErrDMLStatementFound = errors.New("only support ROW format binlog, unexpected DML statement found in query event") // IncompatibleDDLFormat is for incompatible ddl IncompatibleDDLFormat = `` /* 392-byte string literal not displayed */ )
var ( // ErrNotRowFormat defines an error which means binlog format is not ROW format. ErrNotRowFormat = errors.New("binlog format is not ROW format") )
var (
// MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL
MaxDDLConnectionTimeoutMinute = 10
)
var ( // OnlineDDLSchemes is scheme name => online ddl handler OnlineDDLSchemes = map[string]func(*config.SubTaskConfig) (OnlinePlugin, error){ config.PT: NewPT, config.GHOST: NewGhost, } )
Functions ¶
func GenTableID ¶
GenTableID generates table ID
func InitStatusAndMetrics ¶
func InitStatusAndMetrics(addr string)
InitStatusAndMetrics registers prometheus metrics and listens on the status port.
func RegisterMetrics ¶
func RegisterMetrics(registry *prometheus.Registry)
RegisterMetrics registers metrics
func UnpackTableID ¶
UnpackTableID unpacks table ID to <schema, table> pair
Types ¶
type BinlogType ¶
type BinlogType uint8
BinlogType represents binlog sync type
const ( RemoteBinlog BinlogType = iota + 1 LocalBinlog )
binlog sync type
type CheckPoint ¶
type CheckPoint interface {
// Init initializes the CheckPoint
Init() error
// Close closes the CheckPoint
Close()
// Clear clears all checkpoints
Clear() error
// Load loads all checkpoints saved by CheckPoint
Load() error
// LoadMeta loads checkpoints from meta config item or file
LoadMeta() error
// SaveTablePoint saves checkpoint for specified table in memory
SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)
// DeleteTablePoint deletes checkpoint for specified table in memory and storage
DeleteTablePoint(sourceSchema, sourceTable string) error
// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool
// SaveGlobalPoint saves the global binlog stream's checkpoint
// corresponding to Meta.Save
SaveGlobalPoint(pos mysql.Position)
// FlushPointsExcept flushes the global checkpoint and tables' checkpoints except exceptTables
// @exceptTables: [[schema, table]... ]
// corresponding to Meta.Flush
FlushPointsExcept(exceptTables [][]string) error
// GlobalPoint returns the global binlog stream's checkpoint
// corresponding to Meta.Pos
GlobalPoint() mysql.Position
// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
// corresponding to Meta.Pos
FlushedGlobalPoint() mysql.Position
// CheckGlobalPoint checks whether we should save global checkpoint
// corresponding to Meta.Check
CheckGlobalPoint() bool
// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
Rollback()
// GenUpdateForTableSQLs generates REPLACE checkpoint SQLs for tables
// @tables: [[schema, table]... ]
GenUpdateForTableSQLs(tables [][]string) ([]string, [][]interface{})
// String returns text of global position
String() string
}
CheckPoint represents checkpoints status for syncer, including the global binlog's checkpoint and every table's checkpoint. When saving a checkpoint, we must distinguish saving in memory from saving (flushing) to DB (or file) permanently. For sharding merging, we must save the checkpoint in memory to support skipping when re-syncing for the special streamer, but before all DDLs for a sharding group have been synced and executed, we should not save the checkpoint permanently, because when restarting to continue the sync, all sharding DDLs must try-sync again.
func NewRemoteCheckPoint ¶
func NewRemoteCheckPoint(cfg *config.SubTaskConfig, id string) CheckPoint
NewRemoteCheckPoint creates a new RemoteCheckPoint
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn represents a live DB connection
type DDLExecInfo ¶
DDLExecInfo is used by syncer to execute or ignore sharding DDL. It's specific to syncer, and cannot be used by other process units.
func (*DDLExecInfo) BlockingDDLs ¶
func (i *DDLExecInfo) BlockingDDLs() []string
BlockingDDLs returns current blocking DDL
func (*DDLExecInfo) Chan ¶
func (i *DDLExecInfo) Chan(ddls []string) <-chan *DDLExecItem
Chan returns a receive only DDLExecItem chan
func (*DDLExecInfo) ClearBlockingDDL ¶
func (i *DDLExecInfo) ClearBlockingDDL()
ClearBlockingDDL clears current blocking DDL
func (*DDLExecInfo) Send ¶
func (i *DDLExecInfo) Send(ctx context.Context, item *DDLExecItem) error
Send sends an item (with request) to the chan
type DDLExecItem ¶
type DDLExecItem struct {
// contains filtered or unexported fields
}
DDLExecItem wraps request and response for a sharding DDL execution
type ExecErrorContext ¶
type ExecErrorContext struct {
// contains filtered or unexported fields
}
ExecErrorContext records a failed exec SQL information
type Ghost ¶
type Ghost struct {
// contains filtered or unexported fields
}
Ghost handles gh-ost online ddls (not complete, don't need to review it): _*_gho is the ghost table, _*_ghc is the ghost changelog table, _*_del is the ghost trash table.
func (*Ghost) Apply ¶
func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
Apply implements interface. returns ddls, real schema, real table, error
func (*Ghost) InOnlineDDL ¶
InOnlineDDL implements interface
type GhostDDLInfo ¶
type GhostDDLInfo struct {
Schema string `json:"schema"`
Table string `json:"table"`
DDLs []string `json:"ddls"`
}
GhostDDLInfo stores ghost information and ddls
type Heartbeat ¶
type Heartbeat struct {
// contains filtered or unexported fields
}
Heartbeat represents a heartbeat mechanism to measure replication lag on mysql and tidb/mysql. Learn from: https://www.percona.com/doc/percona-toolkit/LATEST/pt-heartbeat.html
func GetHeartbeat ¶
func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error)
GetHeartbeat gets singleton instance of Heartbeat
func (*Heartbeat) RemoveTask ¶
RemoveTask removes a previous added task
func (*Heartbeat) TryUpdateTaskTs ¶
TryUpdateTaskTs tries to update task's ts
type HeartbeatConfig ¶
type HeartbeatConfig struct {
// contains filtered or unexported fields
}
HeartbeatConfig represents Heartbeat configurations.
func (*HeartbeatConfig) Equal ¶
func (cfg *HeartbeatConfig) Equal(other *HeartbeatConfig) error
Equal tests whether config equals to other
type LocalMeta ¶
type LocalMeta struct {
sync.RWMutex
BinLogName string `toml:"binlog-name" json:"binlog-name"`
BinLogPos uint32 `toml:"binlog-pos" json:"binlog-pos"`
BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"`
// contains filtered or unexported fields
}
LocalMeta is local meta struct.
func NewLocalMeta ¶
NewLocalMeta creates a new LocalMeta.
type Meta ¶
type Meta interface {
// Load loads meta information.
Load() error
// Save saves meta information.
Save(pos mysql.Position, gtid gtid.Set) error
// Flush writes meta information
Flush() error
// Dirty checks whether meta in memory is dirty (need to Flush)
Dirty() bool
// Pos gets position information.
Pos() mysql.Position
// GTID() returns gtid information.
GTID() gtid.Set
}
Meta is the binlog meta information from sync source. When syncer restarts, we should reload meta info to guarantee continuous transmission.
type OnlineDDLStorage ¶
OnlineDDLStorage stores sharding group online ddls information
func NewOnlineDDLStorage ¶
func NewOnlineDDLStorage(cfg *config.SubTaskConfig) *OnlineDDLStorage
NewOnlineDDLStorage creates a new online ddl storage
func (*OnlineDDLStorage) Clear ¶
func (s *OnlineDDLStorage) Clear() error
Clear clears online ddl information from storage
func (*OnlineDDLStorage) Close ¶
func (s *OnlineDDLStorage) Close()
Close closes database connection
func (*OnlineDDLStorage) Delete ¶
func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error
Delete deletes online ddl information
func (*OnlineDDLStorage) Get ¶
func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo
Get returns ddls by given schema/table
func (*OnlineDDLStorage) Init ¶
func (s *OnlineDDLStorage) Init() error
Init initializes online handler
func (*OnlineDDLStorage) Load ¶
func (s *OnlineDDLStorage) Load() error
Load loads information from storage
func (*OnlineDDLStorage) Save ¶
func (s *OnlineDDLStorage) Save(ghostSchema, ghostTable, realSchema, realTable, ddl string) error
Save saves online ddl information
type OnlinePlugin ¶
type OnlinePlugin interface {
// Apply does:
// * detect online ddl
// * record changes
// * apply online ddl on real table
// returns sqls, replaced/self schema, replaced/self table, error
Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
// InOnlineDDL returns true if an online ddl is unresolved
InOnlineDDL(schema, table string) bool
// Finish would delete online ddl from memory and storage
Finish(schema, table string) error
// TableType returns ghost/real table
TableType(table string) TableType
// RealName returns real table name that removed ghost suffix and handled by table router
RealName(schema, table string) (string, string)
// GhostName returns ghost table name of a table
GhostName(schema, table string) (string, string)
// SchemeName returns scheme name (gh-ost/pt)
SchemeName() string
// Clear clears all online information
Clear() error
// Close closes online ddl plugin
Close()
}
OnlinePlugin handles online ddl solutions like pt, gh-ost
func NewGhost ¶
func NewGhost(cfg *config.SubTaskConfig) (OnlinePlugin, error)
NewGhost returns gh-ost online plugin
func NewPT ¶
func NewPT(cfg *config.SubTaskConfig) (OnlinePlugin, error)
NewPT returns pt online schema changes plugin
type PT ¶
type PT struct {
// contains filtered or unexported fields
}
PT handles pt online schema changes: (_*).*_new is the ghost table, (_*).*_old is the ghost trash table. We don't support the `--new-table-name` flag.
func (*PT) Apply ¶
func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
Apply implements interface. returns ddls, real schema, real table, error
func (*PT) InOnlineDDL ¶
InOnlineDDL implements interface
type RemoteCheckPoint ¶
RemoteCheckPoint implements CheckPoint, using the target database to store info. NOTE: now we sync from relay log, so GTID support is not added yet.
func (*RemoteCheckPoint) CheckGlobalPoint ¶
func (cp *RemoteCheckPoint) CheckGlobalPoint() bool
CheckGlobalPoint implements CheckPoint.CheckGlobalPoint
func (*RemoteCheckPoint) Clear ¶
func (cp *RemoteCheckPoint) Clear() error
Clear implements CheckPoint.Clear
func (*RemoteCheckPoint) Close ¶
func (cp *RemoteCheckPoint) Close()
Close implements CheckPoint.Close
func (*RemoteCheckPoint) DeleteTablePoint ¶
func (cp *RemoteCheckPoint) DeleteTablePoint(sourceSchema, sourceTable string) error
DeleteTablePoint implements CheckPoint.DeleteTablePoint
func (*RemoteCheckPoint) FlushPointsExcept ¶
func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error
FlushPointsExcept implements CheckPoint.FlushPointsExcept
func (*RemoteCheckPoint) FlushedGlobalPoint ¶
func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position
FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
func (*RemoteCheckPoint) GenUpdateForTableSQLs ¶
func (cp *RemoteCheckPoint) GenUpdateForTableSQLs(tables [][]string) ([]string, [][]interface{})
GenUpdateForTableSQLs implements CheckPoint.GenUpdateForTableSQLs
func (*RemoteCheckPoint) GlobalPoint ¶
func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position
GlobalPoint implements CheckPoint.GlobalPoint
func (*RemoteCheckPoint) Init ¶
func (cp *RemoteCheckPoint) Init() error
Init implements CheckPoint.Init
func (*RemoteCheckPoint) IsNewerTablePoint ¶
func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool
IsNewerTablePoint implements CheckPoint.IsNewerTablePoint
func (*RemoteCheckPoint) Load ¶
func (cp *RemoteCheckPoint) Load() error
Load implements CheckPoint.Load
func (*RemoteCheckPoint) LoadMeta ¶
func (cp *RemoteCheckPoint) LoadMeta() error
LoadMeta implements CheckPoint.LoadMeta
func (*RemoteCheckPoint) Rollback ¶
func (cp *RemoteCheckPoint) Rollback()
Rollback implements CheckPoint.Rollback
func (*RemoteCheckPoint) SaveGlobalPoint ¶
func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position)
SaveGlobalPoint implements CheckPoint.SaveGlobalPoint
func (*RemoteCheckPoint) SaveTablePoint ¶
func (cp *RemoteCheckPoint) SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)
SaveTablePoint implements CheckPoint.SaveTablePoint
func (*RemoteCheckPoint) String ¶
func (cp *RemoteCheckPoint) String() string
String implements CheckPoint.String
type ShardingGroup ¶
type ShardingGroup struct {
sync.RWMutex
IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later
// contains filtered or unexported fields
}
ShardingGroup represents a sharding DDL sync group
func NewShardingGroup ¶
func NewShardingGroup(sources []string, isSchemaOnly bool) *ShardingGroup
NewShardingGroup creates a new ShardingGroup
func (*ShardingGroup) FirstEndPosUnresolved ¶
func (sg *ShardingGroup) FirstEndPosUnresolved() *mysql.Position
FirstEndPosUnresolved returns the first DDL End_log_pos if un-resolved, else nil
func (*ShardingGroup) FirstPosUnresolved ¶
func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position
FirstPosUnresolved returns the first DDL pos if un-resolved, else nil
func (*ShardingGroup) InSyncing ¶
func (sg *ShardingGroup) InSyncing(source string) bool
InSyncing checks whether the source is in syncing
func (*ShardingGroup) IsUnresolved ¶
func (sg *ShardingGroup) IsUnresolved() bool
IsUnresolved returns whether it's unresolved
func (*ShardingGroup) Leave ¶
func (sg *ShardingGroup) Leave(sources []string) error
Leave leaves from the sharding group; it doesn't affect the in-syncing process. Use cases:
- drop a database
- drop table
func (*ShardingGroup) Merge ¶
Merge merges new sources into the existing group. Use cases:
- add a new database / table to an existing sharding group
- add new table(s) to parent database's sharding group. If the group is un-resolved, we add it in sources and set it true; otherwise we add it in sources, set it false and increment remain
func (*ShardingGroup) Reset ¶
func (sg *ShardingGroup) Reset()
Reset resets all sources to un-synced state. When the previous sharding DDL has been synced and resolved, we need to reset it.
func (*ShardingGroup) Sources ¶
func (sg *ShardingGroup) Sources() map[string]bool
Sources returns all sources (and whether synced)
func (*ShardingGroup) String ¶
func (sg *ShardingGroup) String() string
String implements Stringer.String
func (*ShardingGroup) Tables ¶
func (sg *ShardingGroup) Tables() [][]string
Tables returns all source tables' <schema, table> pair
func (*ShardingGroup) TrySync ¶
func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, int, error)
TrySync tries to sync the sharding group. If the source was not in the sharding group before, it will be added.
func (*ShardingGroup) UnresolvedGroupInfo ¶
func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup
UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil
func (*ShardingGroup) UnresolvedTables ¶
func (sg *ShardingGroup) UnresolvedTables() [][]string
UnresolvedTables returns all source tables' <schema, table> pair if is unresolved, else returns nil
type ShardingGroupKeeper ¶
ShardingGroupKeeper used to keep ShardingGroup
func NewShardingGroupKeeper ¶
func NewShardingGroupKeeper() *ShardingGroupKeeper
NewShardingGroupKeeper creates a new ShardingGroupKeeper
func (*ShardingGroupKeeper) AddGroup ¶
func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)
AddGroup adds new group(s) according to target schema, table and source IDs
func (*ShardingGroupKeeper) AdjustGlobalPoint ¶
func (k *ShardingGroupKeeper) AdjustGlobalPoint(globalPoint mysql.Position) mysql.Position
AdjustGlobalPoint adjusts globalPoint with sharding groups' lowest first point
func (*ShardingGroupKeeper) Clear ¶
func (k *ShardingGroupKeeper) Clear()
Clear clears all sharding groups
func (*ShardingGroupKeeper) Group ¶
func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup
Group returns target table's group, nil if not exist
func (*ShardingGroupKeeper) Groups ¶
func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup
Groups returns all sharding groups, often used for debug. Caution: do not modify the returned groups directly.
func (*ShardingGroupKeeper) InSyncing ¶
func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string) bool
InSyncing checks whether the source table is in syncing
func (*ShardingGroupKeeper) LeaveGroup ¶
func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error
LeaveGroup leaves group according to target schema, table and source IDs. LeaveGroup doesn't affect the in-syncing process.
func (*ShardingGroupKeeper) ResetGroups ¶
func (k *ShardingGroupKeeper) ResetGroups()
ResetGroups resets group's sync status
func (*ShardingGroupKeeper) TrySync ¶
func (k *ShardingGroupKeeper) TrySync(targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)
TrySync tries to sync the sharding group. It returns:
isSharding: whether the source table is in a sharding group; group: the sharding group; synced: whether the source table's sharding group synced; remain: remain un-synced source table's count
func (*ShardingGroupKeeper) UnresolvedGroups ¶
func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup
UnresolvedGroups returns sharding groups which are un-resolved. Caution: do not modify the returned groups directly.
func (*ShardingGroupKeeper) UnresolvedTables ¶
func (k *ShardingGroupKeeper) UnresolvedTables() [][]string
UnresolvedTables returns all source tables whose DDLs are un-resolved. NOTE: this func only ensures the returned tables are currently un-resolved; if passing the returned tables to another func (like checkpoint), the caller must ensure their sync state does not change in the process.
type ShardingReSync ¶
type ShardingReSync struct {
// contains filtered or unexported fields
}
ShardingReSync represents re-sync info for a sharding DDL group
type Syncer ¶
Syncer can sync your MySQL data to another MySQL database.
func (*Syncer) ExecuteDDL ¶
ExecuteDDL executes or skips a hanging-up DDL when in sharding
func (*Syncer) Init ¶
Init initializes syncer for a sync task, but does not start Process. If it fails, it should not call s.Close. Some checks may move to checker later.
func (*Syncer) InjectSQLs ¶
InjectSQLs injects ddl into syncer as binlog events when meeting an xid/query event. TODO: let user specify special xid/query event position. TODO: inject dml sqls.
func (*Syncer) IsFreshTask ¶
IsFreshTask implements Unit.IsFreshTask
func (*Syncer) Pause ¶
func (s *Syncer) Pause()
Pause pauses the process, and it can be resumed later. The context should be canceled externally. TODO: it is not a true-meaning Pause because you can't stop it by calling Pause only.
func (*Syncer) Process ¶
func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)
Process implements the dm.Unit interface.
func (*Syncer) Resume ¶
func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)
Resume resumes the paused process
func (*Syncer) SetSQLOperator ¶
func (s *Syncer) SetSQLOperator(req *pb.HandleSubTaskSQLsRequest) error
SetSQLOperator sets an SQL operator to syncer
func (*Syncer) Status ¶
func (s *Syncer) Status() interface{}
Status implements SubTaskUnit.Status. It returns status, but does not calc status.
func (*Syncer) Update ¶
func (s *Syncer) Update(cfg *config.SubTaskConfig) error
Update implements Unit.Update. Now, it only supports updating config for routes, filters, column-mappings and black-white-list. No config diff is implemented yet, so it simply re-inits using the new config.
func (*Syncer) UpdateFromConfig ¶
func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error
UpdateFromConfig updates config for `From`