worker

package
v1.0.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2019 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitConditionHub

func InitConditionHub(w *Worker)

InitConditionHub inits the singleton instance of ConditionHub

func InitStatus

func InitStatus(lis net.Listener)

InitStatus initializes the HTTP status server

Types

type ConditionHub

type ConditionHub struct {
	// contains filtered or unexported fields
}

ConditionHub holds a DM-worker and it is used for wait condition detection

func GetConditionHub

func GetConditionHub() *ConditionHub

GetConditionHub returns singleton instance of ConditionHub

type Config

type Config struct {
	LogLevel  string `toml:"log-level" json:"log-level"`
	LogFile   string `toml:"log-file" json:"log-file"`
	LogRotate string `toml:"log-rotate" json:"log-rotate"`

	WorkerAddr string `toml:"worker-addr" json:"worker-addr"`

	EnableGTID  bool   `toml:"enable-gtid" json:"enable-gtid"`
	AutoFixGTID bool   `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
	MetaFile    string `toml:"meta-file" json:"meta-file"`
	RelayDir    string `toml:"relay-dir" json:"relay-dir"`
	ServerID    int    `toml:"server-id" json:"server-id"`
	Flavor      string `toml:"flavor" json:"flavor"`
	Charset     string `toml:"charset" json:"charset"`

	// relay synchronous starting point (if specified)
	RelayBinLogName string `toml:"relay-binlog-name" json:"relay-binlog-name"`
	RelayBinlogGTID string `toml:"relay-binlog-gtid" json:"relay-binlog-gtid"`

	SourceID string          `toml:"source-id" json:"source-id"`
	From     config.DBConfig `toml:"from" json:"from"`

	// config items for purger
	Purge purger.Config `toml:"purge" json:"purge"`

	ConfigFile string `json:"config-file"`
	// contains filtered or unexported fields
}

Config is the configuration.

func NewConfig

func NewConfig() *Config

NewConfig creates a new base config for worker.

func (*Config) Clone

func (c *Config) Clone() *Config

Clone clones a config

func (*Config) Parse

func (c *Config) Parse(arguments []string) error

Parse parses flag definitions from the argument list.

func (*Config) Reload

func (c *Config) Reload() error

Reload reloads the configuration from ConfigFile.

func (*Config) String

func (c *Config) String() string

func (*Config) Toml

func (c *Config) Toml() (string, error)

Toml returns TOML format representation of config

func (*Config) UpdateConfigFile

func (c *Config) UpdateConfigFile(content string) error

UpdateConfigFile writes the configuration to a local file.

type RelayHolder

type RelayHolder struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RelayHolder is used to hold the relay unit.

func NewRelayHolder

func NewRelayHolder(cfg *Config) *RelayHolder

NewRelayHolder creates a new RelayHolder

func (*RelayHolder) Close

func (h *RelayHolder) Close()

Close closes the holder

func (*RelayHolder) EarliestActiveRelayLog

func (h *RelayHolder) EarliestActiveRelayLog() *streamer.RelayLogInfo

EarliestActiveRelayLog implements RelayOperator.EarliestActiveRelayLog

func (*RelayHolder) Error

func (h *RelayHolder) Error() *pb.RelayError

Error returns the relay unit's error information.

func (*RelayHolder) Init

func (h *RelayHolder) Init() error

Init initializes the holder

func (*RelayHolder) Migrate

func (h *RelayHolder) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error

Migrate resets the binlog name and binlog position for the relay unit.

func (*RelayHolder) Operate

func (h *RelayHolder) Operate(ctx context.Context, req *pb.OperateRelayRequest) error

Operate operates relay unit

func (*RelayHolder) Result

func (h *RelayHolder) Result() *pb.ProcessResult

Result returns the result of the relay

func (*RelayHolder) Stage

func (h *RelayHolder) Stage() pb.Stage

Stage returns the stage of the relay

func (*RelayHolder) Start

func (h *RelayHolder) Start()

Start starts running the relay.

func (*RelayHolder) Status

func (h *RelayHolder) Status() *pb.RelayStatus

Status returns relay unit's status

func (*RelayHolder) SwitchMaster

func (h *RelayHolder) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error

SwitchMaster requests relay unit to switch master server

func (*RelayHolder) Update

func (h *RelayHolder) Update(ctx context.Context, cfg *Config) error

Update updates the relay config online.

type Server

type Server struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Server accepts RPC requests, dispatches them to the worker, and sends responses back to the RPC client.

func NewServer

func NewServer(cfg *Config) *Server

NewServer creates a new Server

func (*Server) BreakDDLLock

func (s *Server) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) (*pb.CommonWorkerResponse, error)

BreakDDLLock implements WorkerServer.BreakDDLLock

func (*Server) Close

func (s *Server) Close()

Close closes the RPC server.

func (*Server) ExecuteDDL

func (s *Server) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) (*pb.CommonWorkerResponse, error)

ExecuteDDL implements WorkerServer.ExecuteDDL

func (*Server) FetchDDLInfo

func (s *Server) FetchDDLInfo(stream pb.Worker_FetchDDLInfoServer) error

FetchDDLInfo implements WorkerServer.FetchDDLInfo. It does a ping-pong send-receive on the stream for DDL (lock) info; if an error occurs in Send / Recv, the client should just retry.

func (*Server) HandleSQLs

HandleSQLs implements WorkerServer.HandleSQLs

func (*Server) MigrateRelay

func (s *Server) MigrateRelay(ctx context.Context, req *pb.MigrateRelayRequest) (*pb.CommonWorkerResponse, error)

MigrateRelay migrates the relay to the original binlog position.

func (*Server) OperateRelay

func (s *Server) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) (*pb.OperateRelayResponse, error)

OperateRelay implements WorkerServer.OperateRelay

func (*Server) OperateSubTask

func (s *Server) OperateSubTask(ctx context.Context, req *pb.OperateSubTaskRequest) (*pb.OperateSubTaskResponse, error)

OperateSubTask implements WorkerServer.OperateSubTask

func (*Server) PurgeRelay

func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error)

PurgeRelay implements WorkerServer.PurgeRelay

func (*Server) QueryError

func (s *Server) QueryError(ctx context.Context, req *pb.QueryErrorRequest) (*pb.QueryErrorResponse, error)

QueryError implements WorkerServer.QueryError

func (*Server) QueryStatus

func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*pb.QueryStatusResponse, error)

QueryStatus implements WorkerServer.QueryStatus

func (*Server) QueryWorkerConfig

func (s *Server) QueryWorkerConfig(ctx context.Context, req *pb.QueryWorkerConfigRequest) (*pb.QueryWorkerConfigResponse, error)

QueryWorkerConfig returns the worker config. The worker config is currently defined in the worker directory, so to avoid a circular import only the DB config is returned.

func (*Server) Start

func (s *Server) Start() error

Start starts serving.

func (*Server) StartSubTask

func (s *Server) StartSubTask(ctx context.Context, req *pb.StartSubTaskRequest) (*pb.CommonWorkerResponse, error)

StartSubTask implements WorkerServer.StartSubTask

func (*Server) SwitchRelayMaster

func (s *Server) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) (*pb.CommonWorkerResponse, error)

SwitchRelayMaster implements WorkerServer.SwitchRelayMaster

func (*Server) UpdateRelayConfig

func (s *Server) UpdateRelayConfig(ctx context.Context, req *pb.UpdateRelayRequest) (*pb.CommonWorkerResponse, error)

UpdateRelayConfig updates the config for the relay unit and dm-worker.

func (*Server) UpdateSubTask

func (s *Server) UpdateSubTask(ctx context.Context, req *pb.UpdateSubTaskRequest) (*pb.CommonWorkerResponse, error)

UpdateSubTask implements WorkerServer.UpdateSubTask

type SubTask

type SubTask struct {
	sync.RWMutex

	// only support sync one DDL lock one time, refine if needed
	DDLInfo chan *pb.DDLInfo // DDL info pending to sync
	// contains filtered or unexported fields
}

SubTask represents a sub task of data migration

func NewSubTask

func NewSubTask(cfg *config.SubTaskConfig) *SubTask

NewSubTask creates a new SubTask

func (*SubTask) CheckUnit

func (st *SubTask) CheckUnit() bool

CheckUnit checks whether current unit is sync unit

func (*SubTask) ClearDDLInfo

func (st *SubTask) ClearDDLInfo()

ClearDDLInfo clears current CacheDDLInfo.

func (*SubTask) ClearDDLLockInfo

func (st *SubTask) ClearDDLLockInfo()

ClearDDLLockInfo clears current DDLLockInfo

func (*SubTask) Close

func (st *SubTask) Close()

Close stops the sub task

func (*SubTask) CurrUnit

func (st *SubTask) CurrUnit() unit.Unit

CurrUnit returns current dm unit

func (*SubTask) DDLLockInfo

func (st *SubTask) DDLLockInfo() *pb.DDLLockInfo

DDLLockInfo returns current DDLLockInfo, maybe nil

func (*SubTask) Error

func (st *SubTask) Error() interface{}

Error returns the error of the current sub task

func (*SubTask) ExecuteDDL

func (st *SubTask) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error

ExecuteDDL requests current unit to execute a DDL

func (*SubTask) GetDDLInfo

func (st *SubTask) GetDDLInfo() *pb.DDLInfo

GetDDLInfo returns current CacheDDLInfo.

func (*SubTask) Init

func (st *SubTask) Init() error

Init initializes the sub task processing units

func (*SubTask) Pause

func (st *SubTask) Pause() error

Pause pauses the running sub task

func (*SubTask) PrevUnit

func (st *SubTask) PrevUnit() unit.Unit

PrevUnit returns the previous dm unit

func (*SubTask) Result

func (st *SubTask) Result() *pb.ProcessResult

Result returns the result of the sub task

func (*SubTask) Resume

func (st *SubTask) Resume() error

Resume resumes the paused sub task; it is similar to Run.

func (*SubTask) Run

func (st *SubTask) Run()

Run runs the sub task

func (*SubTask) SaveDDLInfo

func (st *SubTask) SaveDDLInfo(info *pb.DDLInfo) error

SaveDDLInfo saves a CacheDDLInfo.

func (*SubTask) SaveDDLLockInfo

func (st *SubTask) SaveDDLLockInfo(info *pb.DDLLockInfo) error

SaveDDLLockInfo saves a DDLLockInfo

func (*SubTask) SendBackDDLInfo

func (st *SubTask) SendBackDDLInfo(ctx context.Context, info *pb.DDLInfo) bool

SendBackDDLInfo sends DDL info back for pending

func (*SubTask) SetSyncerSQLOperator

func (st *SubTask) SetSyncerSQLOperator(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error

SetSyncerSQLOperator sets an operator to syncer.

func (*SubTask) Stage

func (st *SubTask) Stage() pb.Stage

Stage returns the stage of the sub task

func (*SubTask) Status

func (st *SubTask) Status() interface{}

Status returns the status of the current sub task

func (*SubTask) StatusJSON

func (st *SubTask) StatusJSON() string

StatusJSON returns the status of the current sub task as json string

func (*SubTask) Update

func (st *SubTask) Update(cfg *config.SubTaskConfig) error

Update updates the sub task's config.

func (*SubTask) UpdateFromConfig

func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error

UpdateFromConfig updates config for `From`

type Worker

type Worker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Worker manages sub tasks and process units for data migration

func NewWorker

func NewWorker(cfg *Config) *Worker

NewWorker creates a new Worker

func (*Worker) BreakDDLLock

func (w *Worker) BreakDDLLock(ctx context.Context, req *pb.BreakDDLLockRequest) error

BreakDDLLock breaks current blocking DDL lock and/or remove current DDLLockInfo

func (*Worker) Close

func (w *Worker) Close()

Close stops working and releases resources

func (*Worker) Error

func (w *Worker) Error(stName string) []*pb.SubTaskError

Error returns the error information of the worker (and sub tasks). If stName is empty, the error information of all sub tasks is returned.

func (*Worker) ExecuteDDL

func (w *Worker) ExecuteDDL(ctx context.Context, req *pb.ExecDDLRequest) error

ExecuteDDL executes (or ignores) DDL (in sharding DDL lock, requested by dm-master)

func (*Worker) FetchDDLInfo

func (w *Worker) FetchDDLInfo(ctx context.Context) *pb.DDLInfo

FetchDDLInfo fetches all sub tasks' DDL info which is pending to sync

func (*Worker) ForbidPurge

func (w *Worker) ForbidPurge() (bool, string)

ForbidPurge implements PurgeInterceptor.ForbidPurge

func (*Worker) HandleSQLs

func (w *Worker) HandleSQLs(ctx context.Context, req *pb.HandleSubTaskSQLsRequest) error

HandleSQLs implements Handler.HandleSQLs.

func (*Worker) Init

func (w *Worker) Init() error

Init initializes the worker

func (*Worker) MigrateRelay

func (w *Worker) MigrateRelay(ctx context.Context, binlogName string, binlogPos uint32) error

MigrateRelay migrates the relay unit.

func (*Worker) OperateRelay

func (w *Worker) OperateRelay(ctx context.Context, req *pb.OperateRelayRequest) error

OperateRelay operates relay unit

func (*Worker) PauseSubTask

func (w *Worker) PauseSubTask(name string) error

PauseSubTask pauses a running sub task

func (*Worker) PurgeRelay

func (w *Worker) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) error

PurgeRelay purges relay log files

func (*Worker) QueryConfig

func (w *Worker) QueryConfig(ctx context.Context) (*Config, error)

QueryConfig returns worker's config

func (*Worker) QueryError

func (w *Worker) QueryError(name string) []*pb.SubTaskError

QueryError queries the errors of the worker's sub tasks.

func (*Worker) QueryStatus

func (w *Worker) QueryStatus(name string) []*pb.SubTaskStatus

QueryStatus queries the status of the worker's sub tasks.

func (*Worker) RecordDDLLockInfo

func (w *Worker) RecordDDLLockInfo(info *pb.DDLLockInfo) error

RecordDDLLockInfo records the current DDL lock info which is pending to sync

func (*Worker) ResumeSubTask

func (w *Worker) ResumeSubTask(name string) error

ResumeSubTask resumes a paused sub task

func (*Worker) SendBackDDLInfo

func (w *Worker) SendBackDDLInfo(ctx context.Context, info *pb.DDLInfo) bool

SendBackDDLInfo sends sub tasks' DDL info back to pending

func (*Worker) Start

func (w *Worker) Start()

Start starts working

func (*Worker) StartSubTask

func (w *Worker) StartSubTask(cfg *config.SubTaskConfig) error

StartSubTask creates a sub task and runs it.

func (*Worker) Status

func (w *Worker) Status(stName string) []*pb.SubTaskStatus

Status returns the status of the worker (and sub tasks). If stName is empty, the status of all sub tasks is returned.

func (*Worker) StatusJSON

func (w *Worker) StatusJSON(stName string) string

StatusJSON returns the status of the worker as json string

func (*Worker) StopSubTask

func (w *Worker) StopSubTask(name string) error

StopSubTask stops a running sub task

func (*Worker) SwitchRelayMaster

func (w *Worker) SwitchRelayMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error

SwitchRelayMaster switches relay unit's master server

func (*Worker) UpdateRelayConfig

func (w *Worker) UpdateRelayConfig(ctx context.Context, content string) error

UpdateRelayConfig updates the subTask and relay unit configuration online.

func (*Worker) UpdateSubTask

func (w *Worker) UpdateSubTask(cfg *config.SubTaskConfig) error

UpdateSubTask updates the config for a sub task.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL