Documentation
¶
Overview ¶
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func AddNewDB(Name string, ConnectUri string, GTID string, binlogFileName string, ...) *db
- func AddTable(db, schema, tableName, IgnoreTable string, channelId int) error
- func AddTableToServer(db, schemaName, tableName string, ToServerInfo ToServer) error
- func Close()
- func DelChannel(name string, channelID int) bool
- func DelDB(Name string) bool
- func DelTable(db, schema, tableName string) error
- func DoRecoveryByBackupData(fileContent string)
- func DoRecoverySnapshotData()
- func DoSaveSnapshotData()
- func GetDBObj(Name string) *db
- func GetFileQueue(dbName, SchemaName, tableName, ToServerID string) string
- func GetListDb() map[string]DbListStruct
- func GetSchemaAndTableBySplit(schemaAndTableName string) (schemaName, tableName string)
- func GetSchemaAndTableJoin(schema, tableName string) string
- func GetSnapshotData() ([]byte, error)
- func GetSnapshotData2() ([]byte, error)
- func InitStorage()
- func NewConsumeChannel(c *Channel) *consume_channel_obj
- func NewDb(Name string, ConnectUri string, GTID string, binlogFileName string, ...) *db
- func NewDbByNull() *db
- func Recovery(content *json.RawMessage, isStop bool)
- func SaveDBConfigInfo()
- func SaveDBInfoToFileData() interface{}
- func StopAllChannel()
- func UpdateDB(Name string, ConnectUri string, GTID string, binlogFileName string, ...) error
- func UpdateTable(db, schema, tableName, IgnoreTable string) error
- type Channel
- func (Channel *Channel) Close()
- func (Channel *Channel) GetChannel() chan mysql.EventReslut
- func (This *Channel) GetChannelMaxThreadNum() int
- func (Channel *Channel) GetCountChan() chan *count.FlowCount
- func (This *Channel) SetChannelMaxThreadNum(n int)
- func (Channel *Channel) SetFlowCountChan(flowChan chan *count.FlowCount)
- func (Channel *Channel) Start() chan mysql.EventReslut
- func (Channel *Channel) Stop()
- type DbListStruct
- type PositionStruct
- type StatusFlag
- type Table
- type TmpPositioinStruct
- type ToServer
- func (This *ToServer) AddWaitError(WaitErr error, WaitData *pluginDriver.PluginDataType) bool
- func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error
- func (This *ToServer) ConsumeToServer(db *db, SchemaName string, TableName string)
- func (This *ToServer) DealWaitError() bool
- func (This *ToServer) DelWaitError() bool
- func (This *ToServer) FileQueueStart() error
- func (This *ToServer) GetFileQueueInfo() (info filequeue.QueueInfo, err error)
- func (This *ToServer) GetWaitErrorDeal() int
- func (This *ToServer) InitFileQueue(dbName, SchemaName, tableName string) *ToServer
- func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)
- func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)
- func (This *ToServer) SkipBinlog(MyConsumerId int, SkipErrData *pluginDriver.PluginDataType) (err error)
- func (This *ToServer) Start()
- func (This *ToServer) Stop()
- func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32, GTID string, Timestamp uint32) bool
- type ToServerChan
- type ToServerStatus
Constants ¶
This section is empty.
Variables ¶
// Package-level state shared by the functions in this package.
var (
	// AllSchemaAndTablekey is the joined key produced for schema "*" and table "*".
	AllSchemaAndTablekey = GetSchemaAndTableJoin("*", "*")

	// DbList maps a string key to a *db.
	// NOTE(review): presumably keyed by database name — confirm against AddNewDB/GetDBObj.
	DbList map[string]*db

	// DbLock is a package-level mutex.
	// NOTE(review): presumably guards DbList — confirm against the callers.
	DbLock sync.Mutex

	// TmpPositioin is a slice of *TmpPositioinStruct.
	TmpPositioin []*TmpPositioinStruct
)
Functions ¶
func AddTableToServer ¶
func DelChannel ¶
func DoRecoveryByBackupData ¶ added in v1.2.2
func DoRecoveryByBackupData(fileContent string)
func DoRecoverySnapshotData ¶ added in v1.2.2
func DoRecoverySnapshotData()
func DoSaveSnapshotData ¶ added in v1.2.2
func DoSaveSnapshotData()
func GetFileQueue ¶ added in v1.2.2
func GetListDb ¶
func GetListDb() map[string]DbListStruct
func GetSchemaAndTableJoin ¶
func GetSnapshotData ¶ added in v1.2.2
func InitStorage ¶
func InitStorage()
func NewConsumeChannel ¶
func NewConsumeChannel(c *Channel) *consume_channel_obj
func NewDbByNull ¶
func NewDbByNull() *db
func Recovery ¶
func Recovery(content *json.RawMessage, isStop bool)
func SaveDBConfigInfo ¶
func SaveDBConfigInfo()
func SaveDBInfoToFileData ¶
func SaveDBInfoToFileData() interface{}
func StopAllChannel ¶
func StopAllChannel()
func UpdateTable ¶
Types ¶
type Channel ¶
// Channel describes a named consume channel: its thread limit, its current
// thread count and its status. It embeds sync.RWMutex.
type Channel struct {
sync.RWMutex
Name string
MaxThreadNum int // maximum number of threads for the consume channel
CurrentThreadNum int
Status StatusFlag // stop, starting, running, wait
// contains filtered or unexported fields
}
func GetChannel ¶
func NewChannel ¶
func (*Channel) GetChannel ¶
func (Channel *Channel) GetChannel() chan mysql.EventReslut
func (*Channel) GetChannelMaxThreadNum ¶
func (*Channel) GetCountChan ¶
func (*Channel) SetChannelMaxThreadNum ¶
func (*Channel) SetFlowCountChan ¶
func (*Channel) Start ¶
func (Channel *Channel) Start() chan mysql.EventReslut
type DbListStruct ¶
// DbListStruct is the per-database record returned by GetListDb (as the map
// value type) and by GetDbInfo. It carries the connection URI and status,
// channel/table counts, and the current and maximum binlog dump positions.
type DbListStruct struct {
Name string
ConnectUri string
ConnStatus StatusFlag // close, stop, starting, running
ConnErr string
ChannelCount int
LastChannelID int
TableCount int
BinlogDumpFileName string
BinlogDumpPosition uint32
IsGtid bool
Gtid string
BinlogDumpTimestamp uint32
LastEventID uint64
MaxBinlogDumpFileName string
MaxBinlogDumpPosition uint32
ReplicateDoDb map[string]uint8
ServerId uint32
AddTime int64
}
func GetDbInfo ¶ added in v1.2.2
func GetDbInfo(dbname string) *DbListStruct
type PositionStruct ¶
// PositionStruct holds one binlog position: file number, position within the
// file, GTID, timestamp and event ID. It is the argument and result type of
// CompareBinlogPositionAndReturnGreater and CompareBinlogPositionAndReturnLess.
type PositionStruct struct {
BinlogFileNum int
BinlogPosition uint32
GTID string
Timestamp uint32
EventID uint64
}
func CompareBinlogPositionAndReturnGreater ¶
func CompareBinlogPositionAndReturnGreater(Binlog1 *PositionStruct, Binlog2 *PositionStruct) *PositionStruct
func CompareBinlogPositionAndReturnLess ¶
func CompareBinlogPositionAndReturnLess(Binlog1 *PositionStruct, Binlog2 *PositionStruct) *PositionStruct
type StatusFlag ¶
// StatusFlag is a string-typed status value. It is the type of the Status
// and ConnStatus fields on the structs in this package.
type StatusFlag string

// Status values for StatusFlag. DEFAULT is the empty string, i.e. the zero
// value of StatusFlag; every other constant's value is its lower-case name.
// NOTE(review): the -ING/-ED pairs presumably mark an in-progress transition
// and its completed state — confirm against the code that sets them.
const (
	DEFAULT  StatusFlag = ""
	STARTING StatusFlag = "starting"
	RUNNING  StatusFlag = "running"
	STOPPING StatusFlag = "stopping"
	STOPPED  StatusFlag = "stopped"
	CLOSING  StatusFlag = "closing"
	CLOSED   StatusFlag = "closed"
	KILLING  StatusFlag = "killing"
	KILLED   StatusFlag = "killed"
	DELING   StatusFlag = "deling"
	DELED    StatusFlag = "deled"
)
type TmpPositioinStruct ¶
// TmpPositioinStruct holds a map from string keys to *PositionStruct and
// embeds sync.RWMutex. The package-level TmpPositioin variable is a slice of
// pointers to this type.
// NOTE(review): "Positioin" is spelled this way in the exported name;
// renaming it would break callers.
type TmpPositioinStruct struct {
sync.RWMutex
Data map[string]*PositionStruct
}
type ToServer ¶
// ToServer is the value passed to AddTableToServer. It carries the plugin
// settings (PluginName, PluginParam), binlog position bookkeeping, error-wait
// state and file-queue counters. It embeds sync.RWMutex.
type ToServer struct {
sync.RWMutex
Key *string `json:"-"` // key of the parent (upper-level) object
ToServerID int
PluginName string
MustBeSuccess bool
FilterQuery bool
FilterUpdate bool
FieldList []string
ToServerKey string
LastSuccessBinlog *PositionStruct // position info of the last successfully processed event
LastQueueBinlog *PositionStruct // position info of the last event that entered the queue
BinlogFileNum int // supported up to 1.8.x
BinlogPosition uint32 // supported up to 1.8.x
PluginParam map[string]interface{}
Status StatusFlag
ToServerChan *ToServerChan `json:"-"`
Error string
ErrorWaitDeal int
ErrorWaitData *pluginDriver.PluginDataType
LastBinlogFileNum int // last position submitted by the channel to ToServerChan; this field will be removed starting with version 1.8.x
LastBinlogPosition uint32 // if BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition, this position is known to be fine; supported up to 1.8.x
LastBinlogKey []byte `json:"-"` // key under which the data is saved to level (presumably LevelDB — confirm)
QueueMsgCount uint32 // number of messages backed up in the queue
FileQueueStatus bool // whether the file queue is enabled
Notes string
ThreadCount int16 // number of consumer threads
FileQueueUsableCount uint32 // with the file queue configured, the number of times the queue was full after a write to ToServerChan within the FileQueueUsableCountTimeDiff window
FileQueueUsableCountStartTime int64 // time at which counting for FileQueueUsableCount started
// contains filtered or unexported fields
}
func (*ToServer) AddWaitError ¶
func (This *ToServer) AddWaitError(WaitErr error, WaitData *pluginDriver.PluginDataType) bool
func (*ToServer) AppendToFileQueue ¶ added in v1.2.2
func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error
Flushes the data to the on-disk queue.
func (*ToServer) ConsumeToServer ¶
func (*ToServer) DealWaitError ¶
func (*ToServer) DelWaitError ¶
func (*ToServer) FileQueueStart ¶ added in v1.2.2
Enables the file queue.
func (*ToServer) GetFileQueueInfo ¶ added in v1.2.2
Returns basic information about the file queue.
func (*ToServer) GetWaitErrorDeal ¶
func (*ToServer) InitFileQueue ¶ added in v1.2.2
Initializes the file queue.
func (*ToServer) PopFileQueue ¶ added in v1.2.2
func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)
Retrieves the first (front-most) record from the on-disk queue.
func (*ToServer) ReadLastFromFileQueue ¶ added in v1.2.2
func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)
Retrieves the last (rear-most) record from the on-disk queue.
func (*ToServer) SkipBinlog ¶
func (This *ToServer) SkipBinlog(MyConsumerId int, SkipErrData *pluginDriver.PluginDataType) (err error)
Skips the binlog position.
type ToServerChan ¶
// ToServerChan wraps a channel of *pluginDriver.PluginDataType values. A
// ToServer refers to one through its ToServerChan field.
type ToServerChan struct {
To chan *pluginDriver.PluginDataType
}
type ToServerStatus ¶
// ToServerStatus is a string-based type. No constants or methods of this
// type appear in this documentation.
type ToServerStatus string