Documentation
¶
Overview ¶
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright [2018] [jc3wish]
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func AddNewDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) *db
- func AddTable(db string, schema string, tableName string, channelId int) error
- func AddTableToServer(db string, schemaName string, tableName string, ToServerInfo ToServer) error
- func Close()
- func CompareBinlogPositionAndReturnGreater(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, ...) (int, uint32)
- func CompareBinlogPositionAndReturnLess(BinlogFileNum1 int, BinlogPosition1 uint32, BinlogFileNum2 int, ...) (int, uint32)
- func DelChannel(name string, channelID int) bool
- func DelDB(Name string) bool
- func DelTable(db string, schema string, tableName string) error
- func DoRecoveryByBackupData(fileContent string)
- func DoRecoverySnapshotData()
- func DoSaveSnapshotData()
- func GetDBObj(Name string) *db
- func GetFileQueue(dbName, SchemaName, tableName, ToServerID string) string
- func GetListDb() map[string]DbListStruct
- func GetSchemaAndTableBySplit(schemaAndTableName string) (schemaName, tableName string)
- func GetSchemaAndTableJoin(schema, tableName string) string
- func GetSnapshotData() ([]byte, error)
- func GetSnapshotData2() ([]byte, error)
- func InitStorage()
- func NewConsumeChannel(c *Channel) *consume_channel_obj
- func NewDb(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) *db
- func Recovery(content *json.RawMessage, isStop bool)
- func SaveDBConfigInfo()
- func SaveDBInfoToFileData() interface{}
- func StopAllChannel()
- func UpdateDB(Name string, ConnectUri string, binlogFileName string, binlogPostion uint32, ...) error
- type Channel
- func (Channel *Channel) Close()
- func (Channel *Channel) GetChannel() chan mysql.EventReslut
- func (This *Channel) GetChannelMaxThreadNum() int
- func (Channel *Channel) GetCountChan() chan *count.FlowCount
- func (This *Channel) SetChannelMaxThreadNum(n int)
- func (Channel *Channel) SetFlowCountChan(flowChan chan *count.FlowCount)
- func (Channel *Channel) Start() chan mysql.EventReslut
- func (Channel *Channel) Stop()
- type DbListStruct
- type Table
- type TmpPositioinStruct
- type ToServer
- func (This *ToServer) AddWaitError(WaitErr error, WaitData interface{}) bool
- func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error
- func (This *ToServer) ConsumeToServer(db *db, SchemaName string, TableName string)
- func (This *ToServer) DealWaitError() bool
- func (This *ToServer) DelWaitError() bool
- func (This *ToServer) FileQueueStart() error
- func (This *ToServer) GetFileQueueInfo() (info filequeue.QueueInfo, err error)
- func (This *ToServer) GetWaitErrorDeal() int
- func (This *ToServer) InitFileQueue(dbName, SchemaName, tableName string) *ToServer
- func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)
- func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)
- func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32) bool
- type ToServerChan
Constants ¶
This section is empty.
Variables ¶
var AllSchemaAndTablekey string = GetSchemaAndTableJoin("*", "*")
var DbList map[string]*db
var DbLock sync.Mutex
var TmpPositioin []*TmpPositioinStruct
Functions ¶
func AddTableToServer ¶
func DelChannel ¶
func DoRecoveryByBackupData ¶ added in v1.2.2
func DoRecoveryByBackupData(fileContent string)
func DoRecoverySnapshotData ¶ added in v1.2.2
func DoRecoverySnapshotData()
func DoSaveSnapshotData ¶ added in v1.2.2
func DoSaveSnapshotData()
func GetFileQueue ¶ added in v1.2.2
func GetListDb ¶
func GetListDb() map[string]DbListStruct
func GetSchemaAndTableJoin ¶
func GetSnapshotData ¶ added in v1.2.2
func InitStorage ¶
func InitStorage()
func NewConsumeChannel ¶
func NewConsumeChannel(c *Channel) *consume_channel_obj
func Recovery ¶
func Recovery(content *json.RawMessage, isStop bool)
func SaveDBConfigInfo ¶
func SaveDBConfigInfo()
func SaveDBInfoToFileData ¶
func SaveDBInfoToFileData() interface{}
func StopAllChannel ¶
func StopAllChannel()
Types ¶
type Channel ¶
type Channel struct {
sync.Mutex
Name string
MaxThreadNum int // 消费通道的最大线程数
CurrentThreadNum int
Status string //stop ,starting,running,wait
// contains filtered or unexported fields
}
func GetChannel ¶
func NewChannel ¶
func (*Channel) GetChannel ¶
func (Channel *Channel) GetChannel() chan mysql.EventReslut
func (*Channel) GetChannelMaxThreadNum ¶
func (*Channel) GetCountChan ¶
func (*Channel) SetChannelMaxThreadNum ¶
func (*Channel) SetFlowCountChan ¶
func (*Channel) Start ¶
func (Channel *Channel) Start() chan mysql.EventReslut
type DbListStruct ¶
type DbListStruct struct {
Name string
ConnectUri string
ConnStatus string //close,stop,starting,running
ConnErr string
ChannelCount int
LastChannelID int
TableCount int
BinlogDumpFileName string
BinlogDumpPosition uint32
BinlogDumpTimestamp uint32
MaxBinlogDumpFileName string
MaxBinlogDumpPosition uint32
ReplicateDoDb map[string]uint8
ServerId uint32
AddTime int64
}
func GetDbInfo ¶ added in v1.2.2
func GetDbInfo(dbname string) *DbListStruct
type TmpPositioinStruct ¶
type ToServer ¶
type ToServer struct {
sync.RWMutex
Key *string `json:"-"` // 上一级的key
ToServerID int
PluginName string
MustBeSuccess bool
FilterQuery bool
FilterUpdate bool
FieldList []string
ToServerKey string
BinlogFileNum int
BinlogPosition uint32
PluginParam map[string]interface{}
Status string
ToServerChan *ToServerChan `json:"-"`
Error string
ErrorWaitDeal int
ErrorWaitData interface{}
LastBinlogFileNum int // 由 channel 提交到 ToServerChan 的最后一个位点
LastBinlogPosition uint32 // 假如 BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition 则说明这个位点是没有问题的
LastBinlogKey []byte `json:"-"` // 将数据保存到 level 的key
QueueMsgCount uint32 // 队列里的堆积的数量
FileQueueStatus bool // 是否启动文件队列
Notes string
ThreadCount int16 // 消费线程数量
FileQueueUsableCount uint32 // 在开始文件队列的配置下,每次写入 ToServerChan 后 ,在 FileQueueUsableCountTimeDiff 时间内 队列都是满的次数
FileQueueUsableCountStartTime int64 // 开始统计 FileQueueUsableCount 计算的时间
CosumerPluginParamMap map[uint16]interface{} `json:"-"` // 用以区分多个消费者的身份
CosumerIdInrc uint16 // 消费者自增id
// contains filtered or unexported fields
}
func (*ToServer) AddWaitError ¶
func (*ToServer) AppendToFileQueue ¶ added in v1.2.2
func (This *ToServer) AppendToFileQueue(data *pluginDriver.PluginDataType) error
将数据刷到磁盘队列中
func (*ToServer) ConsumeToServer ¶
func (*ToServer) DealWaitError ¶
func (*ToServer) DelWaitError ¶
func (*ToServer) FileQueueStart ¶ added in v1.2.2
文件队列启用
func (*ToServer) GetFileQueueInfo ¶ added in v1.2.2
查看文件队列基本信息
func (*ToServer) GetWaitErrorDeal ¶
func (*ToServer) InitFileQueue ¶ added in v1.2.2
初始化文件队列
func (*ToServer) PopFileQueue ¶ added in v1.2.2
func (This *ToServer) PopFileQueue() (*pluginDriver.PluginDataType, error)
从磁盘队列中取出最前面一条数据
func (*ToServer) ReadLastFromFileQueue ¶ added in v1.2.2
func (This *ToServer) ReadLastFromFileQueue() (*pluginDriver.PluginDataType, error)
从磁盘队列中取出最后面一条数据
type ToServerChan ¶
type ToServerChan struct {
To chan *pluginDriver.PluginDataType
}