Documentation
¶
Index ¶
- func InitConditionHub(w *Worker)
- func InitStatus(lis net.Listener)
- type ConditionHub
- type Config
- type RelayHolder
- func (h *RelayHolder) Close()
- func (h *RelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo
- func (h *RelayHolder) Error() *pb.RelayError
- func (h *RelayHolder) Init() error
- func (h *RelayHolder) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error
- func (h *RelayHolder) Operate(ctx context.Context, req *pb.OperateRelayRequest) error
- func (h *RelayHolder) Result() *pb.ProcessResult
- func (h *RelayHolder) Stage() pb.Stage
- func (h *RelayHolder) Start()
- func (h *RelayHolder) Status() *pb.RelayStatus
- func (h *RelayHolder) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
- func (h *RelayHolder) Update(ctx context.Context, cfg *Config) error
- type Server
- func (s *Server) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) Close()
- func (s *Server) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) FetchDDLInfo(stream pb.Worker_FetchDDLInfoServer) error
- func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) MigrateRelay(ctx context.Context, req *pb.MigrateRelayRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)
- func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskRequest) (*pb.OperateSubTaskResponse, error)
- func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorRequest) (*pb.QueryErrorResponse, error)
- func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)
- func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfigRequest) (*pb.QueryWorkerConfigResponse, error)
- func (s *Server) Start() error
- func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) UpdateRelayConfig(ctx context.Context, req *pb.UpdateRelayRequest) (*pb.CommonWorkerResponse, error)
- func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest) (*pb.CommonWorkerResponse, error)
- type SubTask
- func (st *SubTask) CheckUnit() bool
- func (st *SubTask) ClearDDLInfo()
- func (st *SubTask) ClearDDLLockInfo()
- func (st *SubTask) Close()
- func (st *SubTask) CurrUnit() unit.Unit
- func (st *SubTask) DDLLockInfo() *pb.DDLLockInfo
- func (st *SubTask) Error() interface{}
- func (st *SubTask) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error
- func (st *SubTask) GetDDLInfo() *pb.DDLInfo
- func (st *SubTask) Init() error
- func (st *SubTask) Pause() error
- func (st *SubTask) PrevUnit() unit.Unit
- func (st *SubTask) Result() *pb.ProcessResult
- func (st *SubTask) Resume() error
- func (st *SubTask) Run()
- func (st *SubTask) SaveDDLInfo(info *pb.DDLInfo) error
- func (st *SubTask) SaveDDLLockInfo(info *pb.DDLLockInfo) error
- func (st *SubTask) SendBackDDLInfo(ctx context.Context, info *pb.DDLInfo) bool
- func (st *SubTask) SetSyncerSQLOperator(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error
- func (st *SubTask) Stage() pb.Stage
- func (st *SubTask) Status() interface{}
- func (st *SubTask) StatusJSON() string
- func (st *SubTask) Update(cfg *config.SubTaskConfig) error
- func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error
- type Worker
- func (w *Worker) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) error
- func (w *Worker) Close()
- func (w *Worker) Error(stName string) []*pb.SubTaskError
- func (w *Worker) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error
- func (w *Worker) FetchDDLInfo(ctx context.Context) *pb.DDLInfo
- func (w *Worker) ForbidPurge() (bool, string)
- func (w *Worker) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error
- func (w *Worker) Init() error
- func (w *Worker) MigrateRelay(ctx context.Context, binlogName string, binlogPos uint32) error
- func (w *Worker) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) error
- func (w *Worker) PauseSubTask(name string) error
- func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error
- func (w *Worker) QueryConfig(ctx context.Context) (*Config, error)
- func (w *Worker) QueryError(name string) []*pb.SubTaskError
- func (w *Worker) QueryStatus(name string) []*pb.SubTaskStatus
- func (w *Worker) RecordDDLLockInfo(info *pb.DDLLockInfo) error
- func (w *Worker) ResumeSubTask(name string) error
- func (w *Worker) SendBackDDLInfo(ctx context.Context, info *pb.DDLInfo) bool
- func (w *Worker) Start()
- func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error
- func (w *Worker) Status(stName string) []*pb.SubTaskStatus
- func (w *Worker) StatusJSON(stName string) string
- func (w *Worker) StopSubTask(name string) error
- func (w *Worker) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
- func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error
- func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitConditionHub ¶
func InitConditionHub(w *Worker)
InitConditionHub inits the singleton instance of ConditionHub
Types ¶
type ConditionHub ¶
type ConditionHub struct {
// contains filtered or unexported fields
}
ConditionHub holds a DM-worker and it is used for wait condition detection
func GetConditionHub ¶
func GetConditionHub() *ConditionHub
GetConditionHub returns singleton instance of ConditionHub
type Config ¶
type Config struct {
LogLevel string `toml:"log-level" json:"log-level"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
WorkerAddr string `toml:"worker-addr" json:"worker-addr"`
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
MetaFile string `toml:"meta-file" json:"meta-file"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
ServerID int `toml:"server-id" json:"server-id"`
Flavor string `toml:"flavor" json:"flavor"`
Charset string `toml:"charset" json:"charset"`
// relay synchronous starting point (if specified)
RelayBinLogName string `toml:"relay-binlog-name" json:"relay-binlog-name"`
RelayBinlogGTID string `toml:"relay-binlog-gtid" json:"relay-binlog-gtid"`
SourceID string `toml:"source-id" json:"source-id"`
From config.DBConfig `toml:"from" json:"from"`
// config items for purger
Purge purger.Config `toml:"purge" json:"purge"`
ConfigFile string `json:"config-file"`
// contains filtered or unexported fields
}
Config is the configuration.
func (*Config) UpdateConfigFile ¶
UpdateConfigFile write configure to local file
type RelayHolder ¶
RelayHolder used to hold the relay unit
func NewRelayHolder ¶
func NewRelayHolder(cfg *Config) *RelayHolder
NewRelayHolder creates a new RelayHolder
func (*RelayHolder) EarliestActiveRelayLog ¶
func (h *RelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo
EarliestActiveRelayLog implements RelayOperator.EarliestActiveRelayLog
func (*RelayHolder) Error ¶
func (h *RelayHolder) Error() *pb.RelayError
Error returns relay unit's status
func (*RelayHolder) Operate ¶
func (h *RelayHolder) Operate(ctx context.Context, req *pb.OperateRelayRequest) error
Operate operates relay unit
func (*RelayHolder) Result ¶
func (h *RelayHolder) Result() *pb.ProcessResult
Result returns the result of the relay
func (*RelayHolder) Stage ¶
func (h *RelayHolder) Stage() pb.Stage
Stage returns the stage of the relay
func (*RelayHolder) Status ¶
func (h *RelayHolder) Status() *pb.RelayStatus
Status returns relay unit's status
func (*RelayHolder) SwitchMaster ¶
func (h *RelayHolder) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
SwitchMaster requests relay unit to switch master server
type Server ¶
Server accepts RPC requests dispatches requests to worker sends responses to RPC client
func (*Server) BreakDDLLock ¶
func (s *Server) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) (*pb.CommonWorkerResponse, error)
BreakDDLLock implements WorkerServer.BreakDDLLock
func (*Server) ExecuteDDL ¶
func (s *Server) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) (*pb.CommonWorkerResponse, error)
ExecuteDDL implements WorkerServer.ExecuteDDL
func (*Server) FetchDDLInfo ¶
func (s *Server) FetchDDLInfo(stream pb.Worker_FetchDDLInfoServer) error
FetchDDLInfo implements WorkerServer.FetchDDLInfo we do ping-pong send-receive on stream for DDL (lock) info if error occurred in Send / Recv, just retry in client
func (*Server) HandleSQLs ¶
func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) (*pb.CommonWorkerResponse, error)
HandleSQLs implements WorkerServer.HandleSQLs
func (*Server) MigrateRelay ¶
func (s *Server) MigrateRelay(ctx context.Context, req *pb.MigrateRelayRequest) (*pb.CommonWorkerResponse, error)
MigrateRelay migrate relay to original binlog pos
func (*Server) OperateRelay ¶
func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)
OperateRelay implements WorkerServer.OperateRelay
func (*Server) OperateSubTask ¶
func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskRequest) (*pb.OperateSubTaskResponse, error)
OperateSubTask implements WorkerServer.OperateSubTask
func (*Server) PurgeRelay ¶
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)
PurgeRelay implements WorkerServer.PurgeRelay
func (*Server) QueryError ¶
func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorRequest) (*pb.QueryErrorResponse, error)
QueryError implements WorkerServer.QueryError
func (*Server) QueryStatus ¶
func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)
QueryStatus implements WorkerServer.QueryStatus
func (*Server) QueryWorkerConfig ¶
func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfigRequest) (*pb.QueryWorkerConfigResponse, error)
QueryWorkerConfig return worker config worker config is defined in worker directory now, to avoid circular import, we only return db config
func (*Server) StartSubTask ¶
func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) (*pb.CommonWorkerResponse, error)
StartSubTask implements WorkerServer.StartSubTask
func (*Server) SwitchRelayMaster ¶
func (s *Server) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) (*pb.CommonWorkerResponse, error)
SwitchRelayMaster implements WorkerServer.SwitchRelayMaster
func (*Server) UpdateRelayConfig ¶
func (s *Server) UpdateRelayConfig(ctx context.Context, req *pb.UpdateRelayRequest) (*pb.CommonWorkerResponse, error)
UpdateRelayConfig updates config for relay and (dm-worker)
func (*Server) UpdateSubTask ¶
func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest) (*pb.CommonWorkerResponse, error)
UpdateSubTask implements WorkerServer.UpdateSubTask
type SubTask ¶
type SubTask struct {
sync.RWMutex
// only support sync one DDL lock one time, refine if needed
DDLInfo chan *pb.DDLInfo // DDL info pending to sync
// contains filtered or unexported fields
}
SubTask represents a sub task of data migration
func NewSubTask ¶
func NewSubTask(cfg *config.SubTaskConfig) *SubTask
NewSubTask creates a new SubTask
func (*SubTask) ClearDDLInfo ¶
func (st *SubTask) ClearDDLInfo()
ClearDDLInfo clears current CacheDDLInfo.
func (*SubTask) ClearDDLLockInfo ¶
func (st *SubTask) ClearDDLLockInfo()
ClearDDLLockInfo clears current DDLLockInfo
func (*SubTask) DDLLockInfo ¶
func (st *SubTask) DDLLockInfo() *pb.DDLLockInfo
DDLLockInfo returns current DDLLockInfo, maybe nil
func (*SubTask) Error ¶
func (st *SubTask) Error() interface{}
Error returns the error of the current sub task
func (*SubTask) ExecuteDDL ¶
ExecuteDDL requests current unit to execute a DDL
func (*SubTask) GetDDLInfo ¶
GetDDLInfo returns current CacheDDLInfo.
func (*SubTask) Result ¶
func (st *SubTask) Result() *pb.ProcessResult
Result returns the result of the sub task
func (*SubTask) SaveDDLInfo ¶
SaveDDLInfo saves a CacheDDLInfo.
func (*SubTask) SaveDDLLockInfo ¶
func (st *SubTask) SaveDDLLockInfo(info *pb.DDLLockInfo) error
SaveDDLLockInfo saves a DDLLockInfo
func (*SubTask) SendBackDDLInfo ¶
SendBackDDLInfo sends DDL info back for pending
func (*SubTask) SetSyncerSQLOperator ¶
func (st *SubTask) SetSyncerSQLOperator(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error
SetSyncerSQLOperator sets an operator to syncer.
func (*SubTask) Status ¶
func (st *SubTask) Status() interface{}
Status returns the status of the current sub task
func (*SubTask) StatusJSON ¶
StatusJSON returns the status of the current sub task as json string
func (*SubTask) Update ¶
func (st *SubTask) Update(cfg *config.SubTaskConfig) error
Update update the sub task's config
func (*SubTask) UpdateFromConfig ¶
func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error
UpdateFromConfig updates config for `From`
type Worker ¶
Worker manages sub tasks and process units for data migration
func (*Worker) BreakDDLLock ¶
BreakDDLLock breaks current blocking DDL lock and/or remove current DDLLockInfo
func (*Worker) Error ¶
func (w *Worker) Error(stName string) []*pb.SubTaskError
Error returns the error information of the worker (and sub tasks) if stName is empty, all sub task's error information will be returned
func (*Worker) ExecuteDDL ¶
ExecuteDDL executes (or ignores) DDL (in sharding DDL lock, requested by dm-master)
func (*Worker) FetchDDLInfo ¶
FetchDDLInfo fetches all sub tasks' DDL info which pending to sync
func (*Worker) ForbidPurge ¶
ForbidPurge implements PurgeInterceptor.ForbidPurge
func (*Worker) HandleSQLs ¶
HandleSQLs implements Handler.HandleSQLs.
func (*Worker) MigrateRelay ¶
MigrateRelay migrate relay unit
func (*Worker) OperateRelay ¶
OperateRelay operates relay unit
func (*Worker) PauseSubTask ¶
PauseSubTask pauses a running sub task
func (*Worker) PurgeRelay ¶
PurgeRelay purges relay log files
func (*Worker) QueryConfig ¶
QueryConfig returns worker's config
func (*Worker) QueryError ¶
func (w *Worker) QueryError(name string) []*pb.SubTaskError
QueryError query worker's sub tasks' error
func (*Worker) QueryStatus ¶
func (w *Worker) QueryStatus(name string) []*pb.SubTaskStatus
QueryStatus query worker's sub tasks' status
func (*Worker) RecordDDLLockInfo ¶
func (w *Worker) RecordDDLLockInfo(info *pb.DDLLockInfo) error
RecordDDLLockInfo records the current DDL lock info which pending to sync
func (*Worker) ResumeSubTask ¶
ResumeSubTask resumes a paused sub task
func (*Worker) SendBackDDLInfo ¶
SendBackDDLInfo sends sub tasks' DDL info back to pending
func (*Worker) StartSubTask ¶
func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error
StartSubTask creates a sub task an run it
func (*Worker) Status ¶
func (w *Worker) Status(stName string) []*pb.SubTaskStatus
Status returns the status of the worker (and sub tasks) if stName is empty, all sub task's status will be returned
func (*Worker) StatusJSON ¶
StatusJSON returns the status of the worker as json string
func (*Worker) StopSubTask ¶
StopSubTask stops a running sub task
func (*Worker) SwitchRelayMaster ¶
SwitchRelayMaster switches relay unit's master server
func (*Worker) UpdateRelayConfig ¶
UpdateRelayConfig update subTask ans relay unit configure online
func (*Worker) UpdateSubTask ¶
func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) error
UpdateSubTask update config for a sub task