Documentation
¶
Overview ¶
Package ruleEngine 提供规则引擎配置结构定义
该包定义了规则引擎的配置数据结构,包括: 1. Config - 规则配置结构体,定义数据转换的所有参数 2. EsMapping - Elasticsearch映射配置
配置功能: - 支持多种字段名转换规则(下划线转驼峰、大小写转换等) - 支持字段过滤(包含/排除指定字段) - 支持自定义列映射和默认值 - 支持Lua脚本进行复杂数据处理 - 支持多种目标系统的配置(Redis、MongoDB、Elasticsearch、Kafka等) - 支持日期时间格式化 - 支持审计功能开关
使用场景: - 配置MySQL表数据到目标系统的转换规则 - 定义字段映射和数据类型转换 - 配置目标系统的存储结构
Package ruleEngine 提供了规则引擎功能,用于管理和执行数据处理规则 ¶
该包的主要功能包括: 1. 规则引擎的初始化和管理 2. 规则的加载、存储和更新 3. 表结构的获取和缓存 4. 行数据的规则化处理和转换 5. Lua脚本的编译和执行
规则引擎是整个系统的核心组件之一,负责将MySQL binlog事件转换为目标系统所需的格式。 它支持多种数据转换方式,包括字段名转换、数据类型转换和自定义Lua脚本处理。
主要特性: - 支持多种字段名转换规则(驼峰命名、大小写转换等) - 支持自定义Lua脚本进行复杂数据处理 - 表结构自动获取和缓存 - 规则的动态加载和更新 - 高效的规则匹配和执行
Index ¶
- Constants
- type Config
- type EsMapping
- type Rule
- type RuleEngine
- func (r *RuleEngine) GetMySQLClient() interface{}
- func (r *RuleEngine) GetRule(schema, table string) (*Rule, bool)
- func (r *RuleEngine) HasRule(dbSchema, table string) bool
- func (r *RuleEngine) Initialize(ctx context.Context, config *canal.Config, rules []Config) error
- func (r *RuleEngine) ProcessRow(ctx context.Context, ruleKey string, action string, row interface{}) (interface{}, error)
- func (r *RuleEngine) UpdateTableInfo(ctx context.Context, dbSchema, table string) error
Constants ¶
const (
// DefaultMaxTableInfoCacheSize 默认表结构缓存最大数量
DefaultMaxTableInfoCacheSize = 100
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// ========== 基础配置 ==========
// Schema 数据库名称
// 说明:规则对应的源数据库名
Schema string `yaml:"schema"`
// Table 表名称
// 说明:规则对应的源表名
Table string `yaml:"table"`
// OrderByColumn 排序字段
// 说明:用于数据排序的字段名,某些场景下需要按特定顺序处理数据
OrderByColumn string `yaml:"order_by_column"`
// ========== 字段名转换配置 ==========
// ColumnLowerCase 字段名转换为小写
// 说明:如果为true,所有字段名转换为小写
// 优先级:低于ColumnUnderscoreToCamel
ColumnLowerCase bool `yaml:"column_lower_case"`
// ColumnUpperCase 字段名转换为大写
// 说明:如果为true,所有字段名转换为大写
// 优先级:低于ColumnUnderscoreToCamel
ColumnUpperCase bool `yaml:"column_upper_case"`
// ColumnUnderscoreToCamel 下划线命名转驼峰命名
// 说明:如果为true,将下划线命名转换为驼峰命名(如 user_name -> userName)
// 优先级:最高,优先于ColumnLowerCase和ColumnUpperCase
ColumnUnderscoreToCamel bool `yaml:"column_underscore_to_camel"`
// ========== 字段过滤配置 ==========
// IncludeColumns 包含的字段列表
// 说明:逗号分隔的字段名列表,只有这些字段会被处理
// 空字符串表示不过滤
// 示例:"id,name,age"
IncludeColumns string `yaml:"include_columns"`
// ExcludeColumns 排除的字段列表
// 说明:逗号分隔的字段名列表,这些字段不会被处理
// 空字符串表示不排除
// 示例:"password,secret"
ExcludeColumns string `yaml:"exclude_columns"`
// ========== 字段映射配置 ==========
// ColumnMappings 列映射配置
// 说明:逗号分隔的列映射配置,格式为"原列名:新列名"
// 用于字段重命名
// 示例:"user_name:userName,create_time:createTime"
ColumnMappings string `yaml:"column_mappings"`
// DefaultColumnValues 默认列值
// 说明:逗号分隔的默认值配置,格式为"列名:默认值"
// 当字段值为NULL时使用默认值
// 示例:"status:0,deleted:false"
DefaultColumnValues string `yaml:"default_column_values"`
// ========== 数据编码配置 ==========
// ValueEncoder 值编码器
// 说明:指定值的编码方式,如base64、hex等
// 空字符串表示不编码
ValueEncoder string `yaml:"value_encoder"`
// ValueFormatter 值格式化器
// 说明:指定值的格式化方式,用于特殊格式转换
// 空字符串表示不格式化
ValueFormatter string `yaml:"value_formatter"`
// ========== Lua脚本配置 ==========
// LuaScript Lua脚本内容
// 说明:直接指定Lua脚本内容,用于复杂数据处理
// 与LuaFilePath二选一,LuaScript优先级更高
// 脚本可用变量:result(处理后的数据)、action(操作类型)
// 示例:"result.transformed_field = result.original_field * 2"
LuaScript string `yaml:"lua_script"`
// LuaFilePath Lua脚本文件路径
// 说明:指定Lua脚本文件路径,用于复杂数据处理
// 与LuaScript二选一,LuaScript优先级更高
// 相对路径或绝对路径均可
// 示例:"./scripts/transform.lua"
LuaFilePath string `yaml:"lua_file_path"`
// ========== 日期时间配置 ==========
// DateFormatter 日期格式化字符串
// 说明:指定日期类型的格式化格式
// 格式参考Go的time包格式,如"2006-01-02"
// 示例:"2006-01-02"
DateFormatter string `yaml:"date_formatter"`
// DatetimeFormatter 日期时间格式化字符串
// 说明:指定日期时间类型的格式化格式
// 格式参考Go的time包格式,如"2006-01-02 15:04:05"
// 示例:"2006-01-02 15:04:05"
DatetimeFormatter string `yaml:"datetime_formatter"`
// ========== 其他配置 ==========
// ReserveRawData 保留原始数据
// 说明:如果为true,在结果中保留原始数据字段
// 用于调试和问题排查
ReserveRawData bool `yaml:"reserve_raw_data"`
// Priority 优先级
// 说明:规则的优先级,范围1-10,数值越大优先级越高
// 用于多规则冲突时的决策
// 默认值:1
Priority int `yaml:"priority"`
// ========== Redis目标系统配置 ==========
// RedisStructure Redis数据结构类型
// 说明:指定数据在Redis中的存储结构
// 可选值:string(字符串)、hash(哈希)、list(列表)、set(集合)、sortedset(有序集合)
// 示例:"hash"
RedisStructure string `yaml:"redis_structure"`
// RedisKeyPrefix Redis键前缀
// 说明:Redis键名的前缀,用于区分不同类型的数据
// 示例:"user:"
RedisKeyPrefix string `yaml:"redis_key_prefix"`
// RedisKeyColumn Redis键列名
// 说明:指定表中的哪个字段作为Redis键
// 示例:"id"
RedisKeyColumn string `yaml:"redis_key_column"`
// RedisKeyFormatter Redis键格式化字符串
// 说明:Redis键的格式化模板,支持占位符
// 占位符:{value}表示字段值,{key}表示列名
// 示例:"user:{value}"
RedisKeyFormatter string `yaml:"redis_key_formatter"`
// RedisKeyValue Redis键值
// 说明:固定Redis键值,用于特殊场景
// 示例:"fixed_key"
RedisKeyValue string `yaml:"redis_key_value"`
// RedisHashFieldPrefix Redis哈希字段前缀
// 说明:Redis哈希结构中字段名的前缀
// 示例:"field:"
RedisHashFieldPrefix string `yaml:"redis_hash_field_prefix"`
// RedisHashFieldColumn Redis哈希字段列名
// 说明:指定表中的哪个字段作为Redis哈希字段名
// 示例:"field_name"
RedisHashFieldColumn string `yaml:"redis_hash_field_column"`
// RedisSortedSetScoreColumn Redis有序集合分数列名
// 说明:指定表中的哪个字段作为Redis有序集合的分数
// 示例:"score"
RedisSortedSetScoreColumn string `yaml:"redis_sorted_set_score_column"`
// ========== MongoDB目标系统配置 ==========
// MongodbDatabase MongoDB数据库名
// 说明:MongoDB数据库名称
// 示例:"test_db"
MongodbDatabase string `yaml:"mongodb_database"`
// MongodbCollection MongoDB集合名
// 说明:MongoDB集合名称,类似关系型数据库的表名
// 示例:"users"
MongodbCollection string `yaml:"mongodb_collection"`
// ========== Elasticsearch目标系统配置 ==========
// EsIndex Elasticsearch索引名
// 说明:Elasticsearch索引名称
// 示例:"users-index"
EsIndex string `yaml:"es_index"`
// EsType Elasticsearch类型名
// 说明:Elasticsearch文档类型名(ES 7.x+已废弃,但保留用于兼容性)
// 示例:"_doc"
EsType string `yaml:"es_type"`
// EsMappings Elasticsearch字段映射配置
// 说明:定义字段到Elasticsearch的映射关系
// 用于配置字段类型、分析器、格式等
EsMappings []EsMapping `yaml:"es_mappings"`
// ========== Kafka目标系统配置 ==========
// KafkaTopic Kafka主题名
// 说明:Kafka的主题名称
// 示例:"users-topic"
KafkaTopic string `yaml:"kafka_topic"`
// ========== RocketMQ目标系统配置 ==========
// RocketmqTopic RocketMQ主题名
// 说明:RocketMQ的主题名称
// 示例:"users-topic"
RocketmqTopic string `yaml:"rocketmq_topic"`
// ========== RabbitMQ目标系统配置 ==========
// RabbitmqQueue RabbitMQ队列名
// 说明:RabbitMQ的队列名称
// 示例:"users-queue"
RabbitmqQueue string `yaml:"rabbitmq_queue"`
// ========== 审计配置 ==========
// AuditEnabled 是否启用该表的审计功能
// 说明:如果为true,该表的数据变更会被记录到审计日志
// 默认值:false
AuditEnabled bool `yaml:"audit_enabled"`
}
Config 规则配置结构体
该结构体定义了单个规则的所有配置项,控制数据从MySQL到目标系统的转换过程。 一个规则对应一个源表,定义了如何将该表的数据转换为目标系统所需的格式。
配置分类: 1. 基础配置:Schema、Table、Priority 2. 字段转换配置:ColumnLowerCase、ColumnUpperCase、ColumnUnderscoreToCamel 3. 字段过滤配置:IncludeColumns、ExcludeColumns 4. 字段映射配置:ColumnMappings、DefaultColumnValues 5. 数据编码配置:ValueEncoder、ValueFormatter 6. Lua脚本配置:LuaScript、LuaFilePath 7. 日期时间配置:DateFormatter、DatetimeFormatter 8. 目标系统配置:Redis、MongoDB、Elasticsearch、Kafka、RocketMQ、RabbitMQ 9. 审计配置:AuditEnabled
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig 创建默认规则配置
该函数返回一个包含默认值的规则配置实例。
返回值:
*Config - 默认规则配置,优先级默认为1
默认值:
- Priority: 1
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
SetDefaults 设置默认值
该方法为规则配置设置默认值,确保配置的完整性。
处理流程:
- 检查Priority是否为0,如果为0则设置为1
注意:
- 该方法不会覆盖已设置的非零值
type EsMapping ¶
type EsMapping struct {
// Column 数据库列名
// 说明:源数据库表中的列名
// 示例:"user_name"
Column string `yaml:"column"`
// Field Elasticsearch字段名
// 说明:在Elasticsearch中的字段名,可以与列名不同
// 示例:"userName"
Field string `yaml:"field"`
// Type 字段类型
// 说明:Elasticsearch字段类型,如text、keyword、integer、date、boolean等
// 示例:"text"
Type string `yaml:"type"`
// Analyzer 分词器
// 说明:全文检索字段使用的分词器,仅text类型有效
// 示例:"ik_max_word"、"standard"
Analyzer string `yaml:"analyzer"`
// Format 格式
// 说明:字段的格式,主要用于日期类型字段
// 示例:"yyyy-MM-dd HH:mm:ss"
Format string `yaml:"format"`
}
EsMapping Elasticsearch字段映射配置
该结构体定义了单个字段的Elasticsearch映射配置,用于指定字段在Elasticsearch中的类型和分析方式。
使用场景: - 指定字段的映射类型 - 配置全文检索的分词器 - 指定日期字段的格式
type Rule ¶
type Rule struct {
// Config 规则配置
Config *Config
// TableInfo 表结构信息
TableInfo *schema.Table
// PaddingMap 填充映射
PaddingMap map[string]interface{}
// LuaProto Lua脚本原型
LuaProto *lua.FunctionProto
// ColumnNameMap 列名映射,key为列名,value为列信息
ColumnNameMap map[string]*schema.TableColumn
}
Rule 规则结构体,包含规则配置和相关资源
该结构体包含了单个规则的所有信息,包括配置、表结构、填充映射和Lua脚本原型。
type RuleEngine ¶
type RuleEngine struct {
// contains filtered or unexported fields
}
RuleEngine 规则引擎,负责管理和执行数据处理规则
该结构体负责加载、管理和执行数据处理规则,包括: 1. 初始化MySQL连接,用于获取表结构信息 2. 加载和管理规则配置 3. 处理行数据转换 4. 执行Lua脚本进行数据处理
func NewRuleEngine ¶
func NewRuleEngine() *RuleEngine
NewRuleEngine 创建规则引擎实例
该函数创建并返回一个新的规则引擎实例,初始化规则映射。
返回值:
*RuleEngine - 新创建的规则引擎实例
func (*RuleEngine) GetMySQLClient ¶
func (r *RuleEngine) GetMySQLClient() interface{}
GetMySQLClient 获取 MySQL 客户端(用于 Lua 引擎依赖注入)
该方法暴露 MySQL 客户端,以便在 SDK 层初始化 Lua 状态池时注入依赖。
返回值:
- interface{}: MySQL 客户端连接
func (*RuleEngine) GetRule ¶
func (r *RuleEngine) GetRule(schema, table string) (*Rule, bool)
GetRule 根据数据库和表名获取规则
该方法根据数据库名称和表名获取对应的规则实例。
参数:
schema - 数据库名称 table - 表名称
返回值:
*Rule - 规则实例,如果不存在则返回nil bool - 是否存在对应的规则,true表示存在,false表示不存在
func (*RuleEngine) HasRule ¶
func (r *RuleEngine) HasRule(dbSchema, table string) bool
HasRule 检查是否存在指定表的规则
该方法检查规则引擎中是否存在指定表的规则配置。
参数:
dbSchema - 数据库名称 table - 表名
返回值:
bool - 如果存在规则返回true,否则返回false
func (*RuleEngine) Initialize ¶
Initialize 初始化规则引擎
该方法初始化规则引擎,包括: 1. 初始化MySQL连接,用于获取表结构信息 2. 从配置中加载所有规则 3. 编译规则中的Lua脚本 4. 获取并缓存表结构信息
参数:
ctx - 上下文,用于控制初始化过程 config - Canal配置对象,包含MySQL连接信息 rules - 规则配置列表
返回值:
error - 初始化过程中发生的错误,nil表示成功
func (*RuleEngine) ProcessRow ¶
func (r *RuleEngine) ProcessRow(ctx context.Context, ruleKey string, action string, row interface{}) (interface{}, error)
ProcessRow 处理行数据,根据规则进行转换和处理
该方法是规则引擎的核心方法,负责根据规则处理行数据: 1. 查找对应的规则 2. 根据规则配置转换字段名(如驼峰命名、大小写转换等) 3. 转换列数据类型 4. 执行Lua脚本(如果有)进行进一步处理
参数:
ctx - 上下文,用于日志记录
ruleKey - 规则键,格式为"schema:table"
action - 操作类型,如"insert"、"update"、"delete"
row - 行数据,可以是[]interface{}或map[string]interface{}
返回值:
interface{} - 处理后的数据,通常是map[string]interface{}
error - 处理过程中发生的错误,nil表示成功
func (*RuleEngine) UpdateTableInfo ¶
func (r *RuleEngine) UpdateTableInfo(ctx context.Context, dbSchema, table string) error
UpdateTableInfo 更新表结构信息,从数据库重新获取并更新缓存
该方法用于更新表结构信息,通常在表结构发生变化时调用。 它会从数据库重新获取表结构,并更新缓存和相关规则。
参数:
ctx - 上下文,用于日志记录 dbSchema - 数据库名称 table - 表名称
返回值:
error - 更新过程中发生的错误,nil表示成功