Documentation
¶
Index ¶
- Variables
- func CreateLoader(db *sql.DB, cfg *DBConfig, worker int, batchSize int, ...) (ld loader.Loader, err error)
- func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error)
- type CheckpointConfig
- type DBConfig
- type Item
- type KafkaSyncer
- type MysqlSyncer
- type OracleSyncer
- type Syncer
Constants ¶
This section is empty.
Variables ¶
var QueueSizeGauge *prometheus.GaugeVec
QueueSizeGauge to be used.
Functions ¶
func CreateLoader ¶
func CreateLoader( db *sql.DB, cfg *DBConfig, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync, enableDispatch bool, enableCausility bool, ) (ld loader.Loader, err error)
CreateLoader create the Loader instance.
func NewPBSyncer ¶
func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error)
NewPBSyncer sync binlog to files
Types ¶
type CheckpointConfig ¶
type CheckpointConfig struct {
Type string `toml:"type" json:"type"`
Schema string `toml:"schema" json:"schema"`
Table string `toml:"table" json:"table"`
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
Port int `toml:"port" json:"port"`
Security security.Config `toml:"security" json:"security"`
TLS *tls.Config `toml:"-" json:"-"`
//for oracle database
OracleServiceName string `toml:"oracle-service-name" json:"oracle-service-name"`
OracleConnectString string `toml:"oracle-connect-string" json:"oracle-connect-string"`
}
CheckpointConfig is the Checkpoint configuration.
type DBConfig ¶
type DBConfig struct {
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Security security.Config `toml:"security" json:"security"`
TLS *tls.Config `toml:"-" json:"-"`
// if EncryptedPassword is not empty, Password will be ignore.
EncryptedPassword string `toml:"encrypted_password" json:"encrypted_password"`
SyncMode int `toml:"sync-mode" json:"sync-mode"`
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
BinlogFileRetentionTime int `toml:"retention-time" json:"retention-time"`
Params map[string]string `toml:"params" json:"params"`
ReadTimeout time.Duration `toml:"-" json:"-"`
ReadTimeoutStr util.Duration `toml:"read-timeout" json:"read-timeout"`
Merge bool `toml:"merge" json:"merge"`
ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"`
KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"`
KafkaMaxMessageSize int32 `toml:"kafka-max-message-size" json:"kafka-max-message-size"`
TopicName string `toml:"topic-name" json:"topic-name"`
// get it from pd
ClusterID uint64 `toml:"-" json:"-"`
//for oracle database
OracleServiceName string `toml:"oracle-service-name" json:"oracle-service-name"`
OracleConnectString string `toml:"oracle-connect-string" json:"oracle-connect-string"`
}
DBConfig is the DB configuration.
type Item ¶
type Item struct {
Binlog *pb.Binlog
PrewriteValue *pb.PrewriteValue // only for DML
Schema string
Table string
RelayLogPos pb.Pos
// Each item has a schemaVersion. with amend txn feature the prewrite DML's SchemaVersion could change.
// which makes restart & reload history DDL with previous SchemaVersion not reliable.
// so we should save this version as checkpoint.
SchemaVersion int64
// the applied TS executed in downstream, only for tidb
AppliedTS int64
// should skip to replicate this item at downstream
// currently only used for signal the syncer to learn that the downstream schema is changed
// when we don't replicate DDL.
ShouldSkip bool
}
Item contains information about binlog
type KafkaSyncer ¶
type KafkaSyncer struct {
// contains filtered or unexported fields
}
KafkaSyncer sync data to kafka
func NewKafka ¶
func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*KafkaSyncer, error)
NewKafka returns a instance of KafkaSyncer
func (KafkaSyncer) Error ¶
func (s KafkaSyncer) Error() <-chan error
Error implements Syncer interface
func (*KafkaSyncer) SetSafeMode ¶
func (p *KafkaSyncer) SetSafeMode(mode bool) bool
SetSafeMode should be ignore by KafkaSyncer
func (KafkaSyncer) Successes ¶
func (s KafkaSyncer) Successes() <-chan *Item
Successes implements Syncer interface
func (*KafkaSyncer) Sync ¶
func (p *KafkaSyncer) Sync(item *Item) error
Sync implements Syncer interface
type MysqlSyncer ¶
type MysqlSyncer struct {
// contains filtered or unexported fields
}
MysqlSyncer sync binlog to Mysql
func NewMysqlSyncer ¶
func NewMysqlSyncer( cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, relayer relay.Relayer, info *loopbacksync.LoopBackSync, enableDispatch bool, enableCausility bool, ) (*MysqlSyncer, error)
NewMysqlSyncer returns a instance of MysqlSyncer
func (MysqlSyncer) Error ¶
func (s MysqlSyncer) Error() <-chan error
Error implements Syncer interface
func (*MysqlSyncer) SetSafeMode ¶
func (m *MysqlSyncer) SetSafeMode(mode bool) bool
SetSafeMode make the MysqlSyncer to use safe mode or not
func (MysqlSyncer) Successes ¶
func (s MysqlSyncer) Successes() <-chan *Item
Successes implements Syncer interface
func (*MysqlSyncer) Sync ¶
func (m *MysqlSyncer) Sync(item *Item) error
Sync implements Syncer interface
type OracleSyncer ¶
type OracleSyncer struct {
// contains filtered or unexported fields
}
OracleSyncer sync binlog to Oracle
func NewOracleSyncer ¶
func NewOracleSyncer( cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, relayer relay.Relayer, enableDispatch bool, enableCausility bool, tableRouter *router.Table, ) (*OracleSyncer, error)
NewOracleSyncer returns a instance of OracleSyncer
func (OracleSyncer) Error ¶
func (s OracleSyncer) Error() <-chan error
Error implements Syncer interface
func (*OracleSyncer) SetSafeMode ¶
func (m *OracleSyncer) SetSafeMode(mode bool) bool
SetSafeMode make the OracleSyncer to use safe mode or not
func (OracleSyncer) Successes ¶
func (s OracleSyncer) Successes() <-chan *Item
Successes implements Syncer interface
func (*OracleSyncer) Sync ¶
func (m *OracleSyncer) Sync(item *Item) error
Sync implements Syncer interface
type Syncer ¶
type Syncer interface {
// Sync the binlog item to downstream
Sync(item *Item) error
// will be close if Close normally or meet error, call Error() to check it
Successes() <-chan *Item
// Return not nil if fail to sync data to downstream or nil if closed normally
Error() <-chan error
// Close the Syncer, no more item can be added by `Sync`
// will drain all items and return nil if all successfully sync into downstream
Close() error
// SetSafeMode make the Syncer to use safe mode or not. If no need to handle, it should return false
SetSafeMode(mode bool) bool
}
Syncer sync binlog item to downstream