Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
// Rule state markers.
// NOTE(review): presumably these are the values stored in Rule.OP (the
// "operation" column) — confirm against the code that reads them.
var (
	// TOAddRule marks a rule that is to be added.
	TOAddRule = 3
	// TODelRule marks a rule that is to be deleted.
	TODelRule = 4
	// RuleInitial marks an initialized rule, which can be loaded straight
	// into memory.
	RuleInitial = 0
)
View Source
// FusionRules holds the parsing rules as slices of Rule keyed by string.
// It starts out as an empty, non-nil map.
var FusionRules = make(map[string][]Rule)
FusionRules 解析规则
View Source
// KafkaKeyMap maps "db.tb" to the table's kafka_key_columns value from MySQL.
// It serves the multi-partition case, where the column's value is written as
// the key of the topic message. It starts out as an empty, non-nil map.
var KafkaKeyMap = make(map[string]string)
KafkaKeyMap key: db.tb v: mysql中kafka_key_columns, 针对多partition场景,将column的值写到topic的key
View Source
// KafkaTopicMap maps "db.tb" to a column name. It serves the multi-topic
// case: an entry means the table needs more than one topic. It starts out as
// an empty, non-nil map.
var KafkaTopicMap = make(map[string]string)
KafkaTopicMap key: db.tb v: column_name 针对多topic场景, 表示此表需要多topic
View Source
// KafkaTopicRuleMap maps "db.tb.column_value" to a topic. It serves the
// multi-topic case, where different column values are routed to different
// topics. It starts out as an empty, non-nil map.
var KafkaTopicRuleMap = make(map[string]string)
KafkaTopicRuleMap key: db.tb.column_value v: topic 针对多topic场景, 不同的column值路由到不同的topic
View Source
// NeedKafka indicates whether parsed data is sent to Kafka. When it is, the
// rules are not validated. It starts at 0.
var NeedKafka int
NeedKafka 表示是否将解析数据发送到kafka; 如果需要发送到kafka, 则不对rule规则进行校验。
Functions ¶
func GetCanalFilterTables ¶
GetCanalFilterTables 组装需要同步表的信息
func MonitorRuleChange ¶
func MonitorRuleChange(cfg *RuleConfig, toKafka int) (bool, string, string)
MonitorRuleChange 监听规则变化
func ResetOperationColumn ¶
func ResetOperationColumn(cfg *RuleConfig) error
ResetOperationColumn 将operation 字段置成 initial state
Types ¶
type KafkaTopicRule ¶
// KafkaTopicRule is the KafkaTopicRule entry of the rule table. Its JSON form
// uses the keys "column_name" and "topic_rule".
// NOTE(review): presumably this is the decoded form of Rule.KafkaTopicRule
// (column "kafka_topic_rule") — confirm against the code that parses it.
type KafkaTopicRule struct {
	// ColumnName is the column name, JSON key "column_name".
	ColumnName string `json:"column_name"`
	// TopicRule is left untyped, JSON key "topic_rule".
	// NOTE(review): presumably a mapping from column value to topic, given
	// that KafkaTopicRuleMap is keyed by db.tb.column_value — confirm.
	TopicRule interface{} `json:"topic_rule"`
}
KafkaTopicRule 规则表中KafkaTopicRule
type Rule ¶
// Rule is one configured synchronization rule. Every field carries a `db` tag
// naming the column it is mapped to.
//
// NOTE(review): the per-field notes below are inferred from the field and
// column names only; confirm them against the rule table schema and the code
// that consumes each field before relying on them.
type Rule struct {
	// Identifiers (columns "id" and "sid").
	ID int `db:"id"`
	SID int `db:"sid"`

	// Presumably the source database and table names; the package-level maps
	// are documented as keyed by "db.tb" — confirm.
	DB string `db:"db"`
	TB string `db:"tb"`

	RefreshType int `db:"refresh_type"`
	ISDump int `db:"is_dump"`

	// Compression settings (presumably ISCompress is a 0/1 flag — confirm).
	ISCompress int `db:"is_compress"`
	CompressLevel int `db:"compress_level"`
	CompressColumns string `db:"compress_columns"`

	// Expiry settings; units are not visible here — TODO confirm.
	ExpireTime int `db:"expire_time"`
	ExpireRange int `db:"expire_range"`

	// Column selection and row filtering.
	ISFullColumn int `db:"is_full_column"`
	ReplicColumns string `db:"replic_columns"`
	FilterRule string `db:"filter_rule"`

	// Redis key construction settings.
	RedisKeyPrefix string `db:"redis_key_prefix"`
	RedisKeyColumns string `db:"redis_key_columns"`
	RedisKeyType string `db:"redis_key_type"`
	ZSetColumn string `db:"redis_zset_score_column"`

	// OP maps to the "operation" column.
	// NOTE(review): presumably holds TOAddRule, TODelRule or RuleInitial — confirm.
	OP int `db:"operation"`

	// Kafka routing settings. KafkaKeyMap is documented as holding the
	// kafka_key_columns value per "db.tb".
	KafkaKeyColumns string `db:"kafka_key_columns"`
	KafkaTopicRule string `db:"kafka_topic_rule"`

	// Delayed-delete settings; semantics and units not visible here — TODO confirm.
	DelayDel int `db:"delay_del"`
	DelayExpireTime int `db:"delay_expire_time"`

	// Kafka-specific column selection.
	KafkaISFullColumn int `db:"kafka_is_full_column"`
	KafkaReplicColumns string `db:"kafka_replic_columns"`
}
Rule 配置的同步规则
Click to show internal directories.
Click to hide internal directories.