Documentation
¶
Overview ¶
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func AddNewDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) *db
- func AddTable(db string, schema string, tableName string, channelId int) error
- func AddTableToServer(db string, schemaName string, tableName string, ToServerInfo ToServer) error
- func CompareBinlogPositionAndReturnGreater(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, ...) (int, uint32)
- func CompareBinlogPositionAndReturnLess(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, ...) (int, uint32)
- func DelChannel(name string, channelID int) bool
- func DelDB(Name string) bool
- func DelTable(db string, schema string, tableName string) error
- func GetDBObj(Name string) *db
- func GetListDb() map[string]DbListStruct
- func GetSchemaAndTableBySplit(schemaAndTableName string) (schemaName, tableName string)
- func GetSchemaAndTableJoin(schema, tableName string) string
- func InitStorage()
- func InitStrageChan(ch chan int8)
- func NewConsumeChannel(c *Channel) *consume_channel_obj
- func NewDb(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) *db
- func Recovery(content *json.RawMessage)
- func SaveDBConfigInfo()
- func SaveDBInfoToFileData() interface{}
- func StopAllChannel()
- func UpdateDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) error
- type Channel
- func (Channel *Channel) Close()
- func (Channel *Channel) GetChannel() chan mysql.EventReslut
- func (This *Channel) GetChannelMaxThreadNum() int
- func (Channel *Channel) GetCountChan() chan *count.FlowCount
- func (This *Channel) SetChannelMaxThreadNum(n int)
- func (Channel *Channel) SetFlowCountChan(flowChan chan *count.FlowCount)
- func (Channel *Channel) Start() chan mysql.EventReslut
- func (Channel *Channel) Stop()
- type DbListStruct
- type Table
- type TmpPositioinStruct
- type ToServer
- func (This *ToServer) AddWaitError(WaitErr error, WaitData interface{}) bool
- func (This *ToServer) ConsumeToServer(db *db, SchemaName string, TableName string)
- func (This *ToServer) DealWaitError() bool
- func (This *ToServer) DelWaitError() bool
- func (This *ToServer) GetWaitErrorDeal() int
- func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32) bool
- type ToServerChan
Constants ¶
This section is empty.
Variables ¶
var DbList map[string]*db
var DbLock sync.Mutex
var TmpPositioin []*TmpPositioinStruct
Functions ¶
func AddTableToServer ¶
func DelChannel ¶
func GetListDb ¶
func GetListDb() map[string]DbListStruct
func GetSchemaAndTableJoin ¶
func InitStorage ¶
func InitStorage()
func InitStrageChan ¶
func InitStrageChan(ch chan int8)
func NewConsumeChannel ¶
func NewConsumeChannel(c *Channel) *consume_channel_obj
func Recovery ¶
func Recovery(content *json.RawMessage)
func SaveDBConfigInfo ¶
func SaveDBConfigInfo()
func SaveDBInfoToFileData ¶
func SaveDBInfoToFileData() interface{}
func StopAllChannel ¶
func StopAllChannel()
Types ¶
type Channel ¶
type Channel struct {
sync.Mutex
Name string
MaxThreadNum int // 消费通道的最大线程数
CurrentThreadNum int
Status string //stop ,starting,running,wait
// contains filtered or unexported fields
}
func GetChannel ¶
func NewChannel ¶
func (*Channel) GetChannel ¶
func (Channel *Channel) GetChannel() chan mysql.EventReslut
func (*Channel) GetChannelMaxThreadNum ¶
func (*Channel) GetCountChan ¶
func (*Channel) SetChannelMaxThreadNum ¶
func (*Channel) SetFlowCountChan ¶
func (*Channel) Start ¶
func (Channel *Channel) Start() chan mysql.EventReslut
type DbListStruct ¶
type DbListStruct struct {
Name string
ConnectUri string
ConnStatus string //close,stop,starting,running
ConnErr string
ChannelCount int
LastChannelID int
TableCount int
BinlogDumpFileName string
BinlogDumpPosition uint32
MaxBinlogDumpFileName string
MaxBinlogDumpPosition uint32
ReplicateDoDb map[string]uint8
ServerId uint32
AddTime int64
}
type TmpPositioinStruct ¶
type ToServer ¶
type ToServer struct {
sync.RWMutex
ToServerID int
PluginName string
MustBeSuccess bool
FilterQuery bool
FilterUpdate bool
FieldList []string
ToServerKey string
BinlogFileNum int
BinlogPosition uint32
PluginParam map[string]interface{}
Status string
ToServerChan *ToServerChan `json:"-"`
Error string
ErrorWaitDeal int
ErrorWaitData interface{}
PluginConn driver.ConnFun `json:"-"`
PluginConnKey string `json:"-"`
PluginParamObj interface{} `json:"-"`
LastBinlogFileNum int // 由 channel 提交到 ToServerChan 的最后一个位点
LastBinlogPosition uint32 // 假如 BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition 则说明这个位点是没有问题的
LastBinlogKey []byte `json:"-"` // 将数据保存到 level 的key
QueueMsgCount uint32 // 队列里的堆积的数量
}
func (*ToServer) AddWaitError ¶
func (*ToServer) ConsumeToServer ¶
func (*ToServer) DealWaitError ¶
func (*ToServer) DelWaitError ¶
func (*ToServer) GetWaitErrorDeal ¶
type ToServerChan ¶
type ToServerChan struct {
To chan *pluginDriver.PluginDataType
}