syncer

package
v1.0.0-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2019 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrDMLStatementFound defines an error which means we found unexpected dml statement found in query event.
	ErrDMLStatementFound = errors.New("only support ROW format binlog, unexpected DML statement found in query event")
	// IncompatibleDDLFormat is for incompatible ddl
	IncompatibleDDLFormat = `` /* 392-byte string literal not displayed */

)
View Source
var (
	// ErrNotRowFormat defines an error which means binlog format is not ROW format.
	ErrNotRowFormat = errors.New("binlog format is not ROW format")
)
View Source
var (

	// MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL
	MaxDDLConnectionTimeoutMinute = 10
)
View Source
var (
	// OnlineDDLSchemes is scheme name => online ddl handler
	OnlineDDLSchemes = map[string]func(*config.SubTaskConfig) (OnlinePlugin, error){
		config.PT:    NewPT,
		config.GHOST: NewGhost,
	}
)

Functions

func GenTableID

func GenTableID(schema, table string) (ID string, isSchemaOnly bool)

GenTableID generates table ID

func InitStatusAndMetrics

func InitStatusAndMetrics(addr string)

InitStatusAndMetrics register prometheus metrics and listen for status port.

func RegisterMetrics

func RegisterMetrics(registry *prometheus.Registry)

RegisterMetrics registers metrics

func UnpackTableID

func UnpackTableID(id string) (string, string)

UnpackTableID unpacks table ID to <schema, table> pair

Types

type BinlogType

type BinlogType uint8

BinlogType represents binlog sync type

const (
	RemoteBinlog BinlogType = iota + 1
	LocalBinlog
)

binlog sync type

type CheckPoint

type CheckPoint interface {
	// Init initializes the CheckPoint
	Init() error

	// Close closes the CheckPoint
	Close()

	// Clear clears all checkpoints
	Clear() error

	// Load loads all checkpoints saved by CheckPoint
	Load() error

	// LoadMeta loads checkpoints from meta config item or file
	LoadMeta() error

	// SaveTablePoint saves checkpoint for specified table in memory
	SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)

	// DeleteTablePoint deletes checkpoint for specified table in memory and storage
	DeleteTablePoint(sourceSchema, sourceTable string) error

	// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
	IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool

	// SaveGlobalPoint saves the global binlog stream's checkpoint
	// corresponding to Meta.Save
	SaveGlobalPoint(pos mysql.Position)

	// FlushGlobalPointsExcept flushes the global checkpoint and tables' checkpoints except exceptTables
	// @exceptTables: [[schema, table]... ]
	// corresponding to Meta.Flush
	FlushPointsExcept(exceptTables [][]string) error

	// GlobalPoint returns the global binlog stream's checkpoint
	// corresponding to to Meta.Pos
	GlobalPoint() mysql.Position

	// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
	// corresponding to to Meta.Pos
	FlushedGlobalPoint() mysql.Position

	// CheckGlobalPoint checks whether we should save global checkpoint
	// corresponding to Meta.Check
	CheckGlobalPoint() bool

	// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
	Rollback()

	// GenUpdateForTableSQLs generates REPLACE checkpoint SQLs for tables
	// @tables: [[schema, table]... ]
	GenUpdateForTableSQLs(tables [][]string) ([]string, [][]interface{})

	// String return text of global position
	String() string
}

CheckPoint represents checkpoints status for syncer including global binlog's checkpoint and every table's checkpoint when save checkpoint, we must differ saving in memory from saving (flushing) to DB (or file) permanently for sharding merging, we must save checkpoint in memory to support skip when re-syncing for the special streamer but before all DDLs for a sharding group to be synced and executed, we should not save checkpoint permanently because, when restarting to continue the sync, all sharding DDLs must try-sync again

func NewRemoteCheckPoint

func NewRemoteCheckPoint(cfg *config.SubTaskConfig, id string) CheckPoint

NewRemoteCheckPoint creates a new RemoteCheckPoint

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a live DB connection

type DDLExecInfo

type DDLExecInfo struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

DDLExecInfo used by syncer to execute or ignore sharding DDL it's specific to syncer, and can not be used by other process unit

func NewDDLExecInfo

func NewDDLExecInfo() *DDLExecInfo

NewDDLExecInfo creates a new DDLExecInfo

func (*DDLExecInfo) BlockingDDLs

func (i *DDLExecInfo) BlockingDDLs() []string

BlockingDDLs returns current blocking DDL

func (*DDLExecInfo) Chan

func (i *DDLExecInfo) Chan(ddls []string) <-chan *DDLExecItem

Chan returns a receive only DDLExecItem chan

func (*DDLExecInfo) ClearBlockingDDL

func (i *DDLExecInfo) ClearBlockingDDL()

ClearBlockingDDL clears current blocking DDL

func (*DDLExecInfo) Close

func (i *DDLExecInfo) Close()

Close closes the chan

func (*DDLExecInfo) Renew

func (i *DDLExecInfo) Renew()

Renew renews the chan

func (*DDLExecInfo) Send

func (i *DDLExecInfo) Send(ctx context.Context, item *DDLExecItem) error

Send sends an item (with request) to the chan

type DDLExecItem

type DDLExecItem struct {
	// contains filtered or unexported fields
}

DDLExecItem wraps request and response for a sharding DDL execution

type ExecErrorContext

type ExecErrorContext struct {
	// contains filtered or unexported fields
}

ExecErrorContext records a failed exec SQL information

type Ghost

type Ghost struct {
	// contains filtered or unexported fields
}

Ghost handles gh-ost online ddls (not complete, don't need to review it) _*_gho ghost table _*_ghc ghost changelog table _*_del ghost transh table

func (*Ghost) Apply

func (g *Ghost) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)

Apply implements interface. returns ddls, real schema, real table, error

func (*Ghost) Clear

func (g *Ghost) Clear() error

Clear clears online ddl information

func (*Ghost) Close

func (g *Ghost) Close()

Close implements interface

func (*Ghost) Finish

func (g *Ghost) Finish(schema, table string) error

Finish implements interface

func (*Ghost) GhostName

func (g *Ghost) GhostName(schema, table string) (string, string)

GhostName implements interface

func (*Ghost) InOnlineDDL

func (g *Ghost) InOnlineDDL(schema, table string) bool

InOnlineDDL implements interface

func (*Ghost) RealName

func (g *Ghost) RealName(schema, table string) (string, string)

RealName implements interface

func (*Ghost) SchemeName

func (g *Ghost) SchemeName() string

SchemeName implements interface

func (*Ghost) TableType

func (g *Ghost) TableType(table string) TableType

TableType implements interface

type GhostDDLInfo

type GhostDDLInfo struct {
	Schema string `json:"schema"`
	Table  string `json:"table"`

	DDLs []string `json:"ddls"`
}

GhostDDLInfo stores ghost information and ddls

type Heartbeat

type Heartbeat struct {
	// contains filtered or unexported fields
}

Heartbeat represents a heartbeat mechanism to measures replication lag on mysql and tidb/mysql. Learn from: https://www.percona.com/doc/percona-toolkit/LATEST/pt-heartbeat.html

func GetHeartbeat

func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error)

GetHeartbeat gets singleton instance of Heartbeat

func (*Heartbeat) AddTask

func (h *Heartbeat) AddTask(name string) error

AddTask adds a new task

func (*Heartbeat) RemoveTask

func (h *Heartbeat) RemoveTask(name string) error

RemoveTask removes a previous added task

func (*Heartbeat) TryUpdateTaskTs

func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{})

TryUpdateTaskTs tries to update task's ts

type HeartbeatConfig

type HeartbeatConfig struct {
	// contains filtered or unexported fields
}

HeartbeatConfig represents Heartbeat configurations.

func (*HeartbeatConfig) Equal

func (cfg *HeartbeatConfig) Equal(other *HeartbeatConfig) error

Equal tests whether config equals to other

type LocalMeta

type LocalMeta struct {
	sync.RWMutex

	BinLogName string `toml:"binlog-name" json:"binlog-name"`
	BinLogPos  uint32 `toml:"binlog-pos" json:"binlog-pos"`
	BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"`
	// contains filtered or unexported fields
}

LocalMeta is local meta struct.

func NewLocalMeta

func NewLocalMeta(filename, flavor string) *LocalMeta

NewLocalMeta creates a new LocalMeta.

func (*LocalMeta) Dirty

func (lm *LocalMeta) Dirty() bool

Dirty implements Meta.Dirty

func (*LocalMeta) Flush

func (lm *LocalMeta) Flush() error

Flush implements Meta.Flush interface.

func (*LocalMeta) GTID

func (lm *LocalMeta) GTID() gtid.Set

GTID implements Meta.GTID interface

func (*LocalMeta) Load

func (lm *LocalMeta) Load() error

Load implements Meta.Load interface.

func (*LocalMeta) Pos

func (lm *LocalMeta) Pos() mysql.Position

Pos implements Meta.Pos interface.

func (*LocalMeta) Save

func (lm *LocalMeta) Save(pos mysql.Position, gs gtid.Set) error

Save implements Meta.Save interface.

func (*LocalMeta) String

func (lm *LocalMeta) String() string

type Meta

type Meta interface {
	// Load loads meta information.
	Load() error

	// Save saves meta information.
	Save(pos mysql.Position, gtid gtid.Set) error

	// Flush write meta information
	Flush() error

	// Dirty checks whether meta in memory is dirty (need to Flush)
	Dirty() bool

	// Pos gets position information.
	Pos() mysql.Position

	// GTID() returns gtid information.
	GTID() gtid.Set
}

Meta is the binlog meta information from sync source. When syncer restarts, we should reload meta info to guarantee continuous transmission.

type OnlineDDLStorage

type OnlineDDLStorage struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

OnlineDDLStorage stores sharding group online ddls information

func NewOnlineDDLStorage

func NewOnlineDDLStorage(cfg *config.SubTaskConfig) *OnlineDDLStorage

NewOnlineDDLStorage creates a new online ddl storager

func (*OnlineDDLStorage) Clear

func (s *OnlineDDLStorage) Clear() error

Clear clears online ddl information from storage

func (*OnlineDDLStorage) Close

func (s *OnlineDDLStorage) Close()

Close closes database connection

func (*OnlineDDLStorage) Delete

func (s *OnlineDDLStorage) Delete(ghostSchema, ghostTable string) error

Delete deletes online ddl informations

func (*OnlineDDLStorage) Get

func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo

Get returns ddls by given schema/table

func (*OnlineDDLStorage) Init

func (s *OnlineDDLStorage) Init() error

Init initials online handler

func (*OnlineDDLStorage) Load

func (s *OnlineDDLStorage) Load() error

Load loads information from storage

func (*OnlineDDLStorage) Save

func (s *OnlineDDLStorage) Save(ghostSchema, ghostTable, realSchema, realTable, ddl string) error

Save saves online ddl information

type OnlinePlugin

type OnlinePlugin interface {
	// Applys does:
	// * detect online ddl
	// * record changes
	// * apply online ddl on real table
	// returns sqls, replaced/self schema, repliaced/slef table, error
	Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
	// InOnlineDDL returns true if an online ddl is unresolved
	InOnlineDDL(schema, table string) bool
	// Finish would delete online ddl from memory and storage
	Finish(schema, table string) error
	// TableType returns ghhost/real table
	TableType(table string) TableType
	// RealName returns real table name that removed ghost suffix and handled by table router
	RealName(schema, table string) (string, string)
	// GhostName returns ghost table name of a table
	GhostName(schema, table string) (string, string)
	// SchemaName returns scheme name (gh-ost/pt)
	SchemeName() string
	// Clear clears all online information
	Clear() error
	// Close closes online ddl plugin
	Close()
}

OnlinePlugin handles online ddl solutions like pt, gh-ost

func NewGhost

func NewGhost(cfg *config.SubTaskConfig) (OnlinePlugin, error)

NewGhost returns gh-oat online plugin

func NewPT

func NewPT(cfg *config.SubTaskConfig) (OnlinePlugin, error)

NewPT returns pt online schema changes plugin

type PT

type PT struct {
	// contains filtered or unexported fields
}

PT handles pt online schema changes (_*).*_new ghost table (_*).*_old ghost transh table we don't support `--new-table-name` flag

func (*PT) Apply

func (p *PT) Apply(tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)

Apply implements interface. returns ddls, real schema, real table, error

func (*PT) Clear

func (p *PT) Clear() error

Clear clears online ddl information

func (*PT) Close

func (p *PT) Close()

Close implements interface

func (*PT) Finish

func (p *PT) Finish(schema, table string) error

Finish implements interface

func (*PT) GhostName

func (p *PT) GhostName(schema, table string) (string, string)

GhostName implements interface

func (*PT) InOnlineDDL

func (p *PT) InOnlineDDL(schema, table string) bool

InOnlineDDL implements interface

func (*PT) RealName

func (p *PT) RealName(schema, table string) (string, string)

RealName implements interface

func (*PT) SchemeName

func (p *PT) SchemeName() string

SchemeName implements interface

func (*PT) TableType

func (p *PT) TableType(table string) TableType

TableType implements interface

type RemoteCheckPoint

type RemoteCheckPoint struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

RemoteCheckPoint implements CheckPoint which using target database to store info NOTE: now we sync from relay log, so not add GTID support yet

func (*RemoteCheckPoint) CheckGlobalPoint

func (cp *RemoteCheckPoint) CheckGlobalPoint() bool

CheckGlobalPoint implements CheckPoint.CheckGlobalPoint

func (*RemoteCheckPoint) Clear

func (cp *RemoteCheckPoint) Clear() error

Clear implements CheckPoint.Clear

func (*RemoteCheckPoint) Close

func (cp *RemoteCheckPoint) Close()

Close implements CheckPoint.Close

func (*RemoteCheckPoint) DeleteTablePoint

func (cp *RemoteCheckPoint) DeleteTablePoint(sourceSchema, sourceTable string) error

DeleteTablePoint implements CheckPoint.DeleteTablePoint

func (*RemoteCheckPoint) FlushPointsExcept

func (cp *RemoteCheckPoint) FlushPointsExcept(exceptTables [][]string) error

FlushPointsExcept implements CheckPoint.FlushPointsExcept

func (*RemoteCheckPoint) FlushedGlobalPoint

func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position

FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint

func (*RemoteCheckPoint) GenUpdateForTableSQLs

func (cp *RemoteCheckPoint) GenUpdateForTableSQLs(tables [][]string) ([]string, [][]interface{})

GenUpdateForTableSQLs implements CheckPoint.GenUpdateForTableSQLs

func (*RemoteCheckPoint) GlobalPoint

func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position

GlobalPoint implements CheckPoint.GlobalPoint

func (*RemoteCheckPoint) Init

func (cp *RemoteCheckPoint) Init() error

Init implements CheckPoint.Init

func (*RemoteCheckPoint) IsNewerTablePoint

func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool

IsNewerTablePoint implements CheckPoint.IsNewerTablePoint

func (*RemoteCheckPoint) Load

func (cp *RemoteCheckPoint) Load() error

Load implements CheckPoint.Load

func (*RemoteCheckPoint) LoadMeta

func (cp *RemoteCheckPoint) LoadMeta() error

LoadMeta implements CheckPoint.LoadMeta

func (*RemoteCheckPoint) Rollback

func (cp *RemoteCheckPoint) Rollback()

Rollback implements CheckPoint.Rollback

func (*RemoteCheckPoint) SaveGlobalPoint

func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position)

SaveGlobalPoint implements CheckPoint.SaveGlobalPoint

func (*RemoteCheckPoint) SaveTablePoint

func (cp *RemoteCheckPoint) SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)

SaveTablePoint implements CheckPoint.SaveTablePoint

func (*RemoteCheckPoint) String

func (cp *RemoteCheckPoint) String() string

String implements CheckPoint.String

type ShardingGroup

type ShardingGroup struct {
	sync.RWMutex

	IsSchemaOnly bool // whether is a schema (database) only DDL TODO: zxc add schema-level syncing support later
	// contains filtered or unexported fields
}

ShardingGroup represents a sharding DDL sync group

func NewShardingGroup

func NewShardingGroup(sources []string, isSchemaOnly bool) *ShardingGroup

NewShardingGroup creates a new ShardingGroup

func (*ShardingGroup) FirstEndPosUnresolved

func (sg *ShardingGroup) FirstEndPosUnresolved() *mysql.Position

FirstEndPosUnresolved returns the first DDL End_log_pos if un-resolved, else nil

func (*ShardingGroup) FirstPosUnresolved

func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position

FirstPosUnresolved returns the first DDL pos if un-resolved, else nil

func (*ShardingGroup) InSyncing

func (sg *ShardingGroup) InSyncing(source string) bool

InSyncing checks whether the source is in syncing

func (*ShardingGroup) IsUnresolved

func (sg *ShardingGroup) IsUnresolved() bool

IsUnresolved return whether it's unresolved

func (*ShardingGroup) Leave

func (sg *ShardingGroup) Leave(sources []string) error

Leave leaves from sharding group it, doesn't affect in syncing process used cases

  • drop a database
  • drop table

func (*ShardingGroup) Merge

func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)

Merge merges new sources to exists used cases

  • add a new database / table to exists sharding group
  • add new table(s) to parent database's sharding group if group is un-resolved, we add it in sources and set it true othereise add it in source, set it false and increment remain

func (*ShardingGroup) Reset

func (sg *ShardingGroup) Reset()

Reset resets all sources to un-synced state when the previous sharding DDL synced and resolved, we need reset it

func (*ShardingGroup) Sources

func (sg *ShardingGroup) Sources() map[string]bool

Sources returns all sources (and whether synced)

func (*ShardingGroup) String

func (sg *ShardingGroup) String() string

String implements Stringer.String

func (*ShardingGroup) Tables

func (sg *ShardingGroup) Tables() [][]string

Tables returns all source tables' <schema, table> pair

func (*ShardingGroup) TrySync

func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, int, error)

TrySync tries to sync the sharding group if source not in sharding group before, it will be added

func (*ShardingGroup) UnresolvedGroupInfo

func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup

UnresolvedGroupInfo returns pb.ShardingGroup if is unresolved, else returns nil

func (*ShardingGroup) UnresolvedTables

func (sg *ShardingGroup) UnresolvedTables() [][]string

UnresolvedTables returns all source tables' <schema, table> pair if is unresolved, else returns nil

type ShardingGroupKeeper

type ShardingGroupKeeper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

ShardingGroupKeeper used to keep ShardingGroup

func NewShardingGroupKeeper

func NewShardingGroupKeeper() *ShardingGroupKeeper

NewShardingGroupKeeper creates a new ShardingGroupKeeper

func (*ShardingGroupKeeper) AddGroup

func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)

AddGroup adds new group(s) according to target schema, table and source IDs

func (*ShardingGroupKeeper) AdjustGlobalPoint

func (k *ShardingGroupKeeper) AdjustGlobalPoint(globalPoint mysql.Position) mysql.Position

AdjustGlobalPoint adjusts globalPoint with sharding groups' lowest first point

func (*ShardingGroupKeeper) Clear

func (k *ShardingGroupKeeper) Clear()

Clear clears all sharding groups

func (*ShardingGroupKeeper) Group

func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup

Group returns target table's group, nil if not exist

func (*ShardingGroupKeeper) Groups

func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup

Groups returns all sharding groups, often used for debug caution: do not modify the returned groups directly

func (*ShardingGroupKeeper) InSyncing

func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string) bool

InSyncing checks whether the source table is in syncing

func (*ShardingGroupKeeper) LeaveGroup

func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error

LeaveGroup leaves group according to target schema, table and source IDs LeaveGroup doesn't affect in syncing process

func (*ShardingGroupKeeper) ResetGroups

func (k *ShardingGroupKeeper) ResetGroups()

ResetGroups resets group's sync status

func (*ShardingGroupKeeper) TrySync

func (k *ShardingGroupKeeper) TrySync(targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)

TrySync tries to sync the sharding group returns

isSharding: whether the source table is in a sharding group
group: the sharding group
synced: whether the source table's sharding group synced
remain: remain un-synced source table's count

func (*ShardingGroupKeeper) UnresolvedGroups

func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup

UnresolvedGroups returns sharding groups which are un-resolved caution: do not modify the returned groups directly

func (*ShardingGroupKeeper) UnresolvedTables

func (k *ShardingGroupKeeper) UnresolvedTables() [][]string

UnresolvedTables returns all source tables which with DDLs are un-resolved NOTE: this func only ensure the returned tables are current un-resolved if passing the returned tables to other func (like checkpoint), must ensure their sync state not changed in this progress

type ShardingReSync

type ShardingReSync struct {
	// contains filtered or unexported fields
}

ShardingReSync represents re-sync info for a sharding DDL group

type Syncer

type Syncer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Syncer can sync your MySQL data to another MySQL database.

func NewSyncer

func NewSyncer(cfg *config.SubTaskConfig) *Syncer

NewSyncer creates a new Syncer.

func (*Syncer) Close

func (s *Syncer) Close()

Close closes syncer.

func (*Syncer) DDLInfo

func (s *Syncer) DDLInfo() <-chan *pb.DDLInfo

DDLInfo returns a chan from which can receive DDLInfo

func (*Syncer) Error

func (s *Syncer) Error() interface{}

Error implements SubTaskUnit.Error

func (*Syncer) ExecuteDDL

func (s *Syncer) ExecuteDDL(ctx context.Context, execReq *pb.ExecDDLRequest) (<-chan error, error)

ExecuteDDL executes or skips a hanging-up DDL when in sharding

func (*Syncer) Init

func (s *Syncer) Init() error

Init initializes syncer for a sync task, but not start Process. if fail, it should not call s.Close. some check may move to checker later.

func (*Syncer) InjectSQLs

func (s *Syncer) InjectSQLs(ctx context.Context, sqls []string) error

InjectSQLs injects ddl into syncer as binlog events while meet xid/query event TODO: let user to specify special xid/query event position TODO: inject dml sqls

func (*Syncer) IsFreshTask

func (s *Syncer) IsFreshTask() (bool, error)

IsFreshTask implements Unit.IsFreshTask

func (*Syncer) Pause

func (s *Syncer) Pause()

Pause pauses the process, and it can be resumed later should cancel context from external TODO: it is not a true-meaning Pause because you can't stop it by calling Pause only.

func (*Syncer) Process

func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)

Process implements the dm.Unit interface.

func (*Syncer) Resume

func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)

Resume resumes the paused process

func (*Syncer) Run

func (s *Syncer) Run(ctx context.Context) (err error)

Run starts running for sync, we should guarantee it can rerun when paused.

func (*Syncer) SetSQLOperator

func (s *Syncer) SetSQLOperator(req *pb.HandleSubTaskSQLsRequest) error

SetSQLOperator sets an SQL operator to syncer

func (*Syncer) Status

func (s *Syncer) Status() interface{}

Status implements SubTaskUnit.Status it returns status, but does not calc status

func (*Syncer) Type

func (s *Syncer) Type() pb.UnitType

Type implements Unit.Type

func (*Syncer) Update

func (s *Syncer) Update(cfg *config.SubTaskConfig) error

Update implements Unit.Update now, only support to update config for routes, filters, column-mappings, black-white-list now no config diff implemented, so simply re-init use new config

func (*Syncer) UpdateFromConfig

func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error

UpdateFromConfig updates config for `From`

type TableType

type TableType string

TableType is type of table

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL