ruleEngine

package
v0.0.0-...-717ec9e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package ruleEngine 提供规则引擎配置结构定义

该包定义了规则引擎的配置数据结构,包括: 1. Config - 规则配置结构体,定义数据转换的所有参数 2. EsMapping - Elasticsearch映射配置

配置功能: - 支持多种字段名转换规则(下划线转驼峰、大小写转换等) - 支持字段过滤(包含/排除指定字段) - 支持自定义列映射和默认值 - 支持Lua脚本进行复杂数据处理 - 支持多种目标系统的配置(Redis、MongoDB、Elasticsearch、Kafka等) - 支持日期时间格式化 - 支持审计功能开关

使用场景: - 配置MySQL表数据到目标系统的转换规则 - 定义字段映射和数据类型转换 - 配置目标系统的存储结构

Package ruleEngine 提供了规则引擎功能,用于管理和执行数据处理规则

该包的主要功能包括: 1. 规则引擎的初始化和管理 2. 规则的加载、存储和更新 3. 表结构的获取和缓存 4. 行数据的规则化处理和转换 5. Lua脚本的编译和执行

规则引擎是整个系统的核心组件之一,负责将MySQL binlog事件转换为目标系统所需的格式。 它支持多种数据转换方式,包括字段名转换、数据类型转换和自定义Lua脚本处理。

主要特性: - 支持多种字段名转换规则(驼峰命名、大小写转换等) - 支持自定义Lua脚本进行复杂数据处理 - 表结构自动获取和缓存 - 规则的动态加载和更新 - 高效的规则匹配和执行

Index

Constants

View Source
const (
	// DefaultMaxTableInfoCacheSize 默认表结构缓存最大数量
	DefaultMaxTableInfoCacheSize = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// ========== 基础配置 ==========
	// Schema 数据库名称
	// 说明:规则对应的源数据库名
	Schema string `yaml:"schema"`

	// Table 表名称
	// 说明:规则对应的源表名
	Table string `yaml:"table"`

	// OrderByColumn 排序字段
	// 说明:用于数据排序的字段名,某些场景下需要按特定顺序处理数据
	OrderByColumn string `yaml:"order_by_column"`

	// ========== 字段名转换配置 ==========
	// ColumnLowerCase 字段名转换为小写
	// 说明:如果为true,所有字段名转换为小写
	// 优先级:低于ColumnUnderscoreToCamel
	ColumnLowerCase bool `yaml:"column_lower_case"`

	// ColumnUpperCase 字段名转换为大写
	// 说明:如果为true,所有字段名转换为大写
	// 优先级:低于ColumnUnderscoreToCamel
	ColumnUpperCase bool `yaml:"column_upper_case"`

	// ColumnUnderscoreToCamel 下划线命名转驼峰命名
	// 说明:如果为true,将下划线命名转换为驼峰命名(如 user_name -> userName)
	// 优先级:最高,优先于ColumnLowerCase和ColumnUpperCase
	ColumnUnderscoreToCamel bool `yaml:"column_underscore_to_camel"`

	// ========== 字段过滤配置 ==========
	// IncludeColumns 包含的字段列表
	// 说明:逗号分隔的字段名列表,只有这些字段会被处理
	// 空字符串表示不过滤
	// 示例:"id,name,age"
	IncludeColumns string `yaml:"include_columns"`

	// ExcludeColumns 排除的字段列表
	// 说明:逗号分隔的字段名列表,这些字段不会被处理
	// 空字符串表示不排除
	// 示例:"password,secret"
	ExcludeColumns string `yaml:"exclude_columns"`

	// ========== 字段映射配置 ==========
	// ColumnMappings 列映射配置
	// 说明:逗号分隔的列映射配置,格式为"原列名:新列名"
	// 用于字段重命名
	// 示例:"user_name:userName,create_time:createTime"
	ColumnMappings string `yaml:"column_mappings"`

	// DefaultColumnValues 默认列值
	// 说明:逗号分隔的默认值配置,格式为"列名:默认值"
	// 当字段值为NULL时使用默认值
	// 示例:"status:0,deleted:false"
	DefaultColumnValues string `yaml:"default_column_values"`

	// ========== 数据编码配置 ==========
	// ValueEncoder 值编码器
	// 说明:指定值的编码方式,如base64、hex等
	// 空字符串表示不编码
	ValueEncoder string `yaml:"value_encoder"`

	// ValueFormatter 值格式化器
	// 说明:指定值的格式化方式,用于特殊格式转换
	// 空字符串表示不格式化
	ValueFormatter string `yaml:"value_formatter"`

	// ========== Lua脚本配置 ==========
	// LuaScript Lua脚本内容
	// 说明:直接指定Lua脚本内容,用于复杂数据处理
	// 与LuaFilePath二选一,LuaScript优先级更高
	// 脚本可用变量:result(处理后的数据)、action(操作类型)
	// 示例:"result.transformed_field = result.original_field * 2"
	LuaScript string `yaml:"lua_script"`

	// LuaFilePath Lua脚本文件路径
	// 说明:指定Lua脚本文件路径,用于复杂数据处理
	// 与LuaScript二选一,LuaScript优先级更高
	// 相对路径或绝对路径均可
	// 示例:"./scripts/transform.lua"
	LuaFilePath string `yaml:"lua_file_path"`

	// ========== 日期时间配置 ==========
	// DateFormatter 日期格式化字符串
	// 说明:指定日期类型的格式化格式
	// 格式参考Go的time包格式,如"2006-01-02"
	// 示例:"2006-01-02"
	DateFormatter string `yaml:"date_formatter"`

	// DatetimeFormatter 日期时间格式化字符串
	// 说明:指定日期时间类型的格式化格式
	// 格式参考Go的time包格式,如"2006-01-02 15:04:05"
	// 示例:"2006-01-02 15:04:05"
	DatetimeFormatter string `yaml:"datetime_formatter"`

	// ========== 其他配置 ==========
	// ReserveRawData 保留原始数据
	// 说明:如果为true,在结果中保留原始数据字段
	// 用于调试和问题排查
	ReserveRawData bool `yaml:"reserve_raw_data"`

	// Priority 优先级
	// 说明:规则的优先级,范围1-10,数值越大优先级越高
	// 用于多规则冲突时的决策
	// 默认值:1
	Priority int `yaml:"priority"`

	// ========== Redis目标系统配置 ==========
	// RedisStructure Redis数据结构类型
	// 说明:指定数据在Redis中的存储结构
	// 可选值:string(字符串)、hash(哈希)、list(列表)、set(集合)、sortedset(有序集合)
	// 示例:"hash"
	RedisStructure string `yaml:"redis_structure"`

	// RedisKeyPrefix Redis键前缀
	// 说明:Redis键名的前缀,用于区分不同类型的数据
	// 示例:"user:"
	RedisKeyPrefix string `yaml:"redis_key_prefix"`

	// RedisKeyColumn Redis键列名
	// 说明:指定表中的哪个字段作为Redis键
	// 示例:"id"
	RedisKeyColumn string `yaml:"redis_key_column"`

	// RedisKeyFormatter Redis键格式化字符串
	// 说明:Redis键的格式化模板,支持占位符
	// 占位符:{value}表示字段值,{key}表示列名
	// 示例:"user:{value}"
	RedisKeyFormatter string `yaml:"redis_key_formatter"`

	// RedisKeyValue Redis键值
	// 说明:固定Redis键值,用于特殊场景
	// 示例:"fixed_key"
	RedisKeyValue string `yaml:"redis_key_value"`

	// RedisHashFieldPrefix Redis哈希字段前缀
	// 说明:Redis哈希结构中字段名的前缀
	// 示例:"field:"
	RedisHashFieldPrefix string `yaml:"redis_hash_field_prefix"`

	// RedisHashFieldColumn Redis哈希字段列名
	// 说明:指定表中的哪个字段作为Redis哈希字段名
	// 示例:"field_name"
	RedisHashFieldColumn string `yaml:"redis_hash_field_column"`

	// RedisSortedSetScoreColumn Redis有序集合分数列名
	// 说明:指定表中的哪个字段作为Redis有序集合的分数
	// 示例:"score"
	RedisSortedSetScoreColumn string `yaml:"redis_sorted_set_score_column"`

	// ========== MongoDB目标系统配置 ==========
	// MongodbDatabase MongoDB数据库名
	// 说明:MongoDB数据库名称
	// 示例:"test_db"
	MongodbDatabase string `yaml:"mongodb_database"`

	// MongodbCollection MongoDB集合名
	// 说明:MongoDB集合名称,类似关系型数据库的表名
	// 示例:"users"
	MongodbCollection string `yaml:"mongodb_collection"`

	// ========== Elasticsearch目标系统配置 ==========
	// EsIndex Elasticsearch索引名
	// 说明:Elasticsearch索引名称
	// 示例:"users-index"
	EsIndex string `yaml:"es_index"`

	// EsType Elasticsearch类型名
	// 说明:Elasticsearch文档类型名(ES 7.x+已废弃,但保留用于兼容性)
	// 示例:"_doc"
	EsType string `yaml:"es_type"`

	// EsMappings Elasticsearch字段映射配置
	// 说明:定义字段到Elasticsearch的映射关系
	// 用于配置字段类型、分析器、格式等
	EsMappings []EsMapping `yaml:"es_mappings"`

	// ========== Kafka目标系统配置 ==========
	// KafkaTopic Kafka主题名
	// 说明:Kafka的主题名称
	// 示例:"users-topic"
	KafkaTopic string `yaml:"kafka_topic"`

	// ========== RocketMQ目标系统配置 ==========
	// RocketmqTopic RocketMQ主题名
	// 说明:RocketMQ的主题名称
	// 示例:"users-topic"
	RocketmqTopic string `yaml:"rocketmq_topic"`

	// ========== RabbitMQ目标系统配置 ==========
	// RabbitmqQueue RabbitMQ队列名
	// 说明:RabbitMQ的队列名称
	// 示例:"users-queue"
	RabbitmqQueue string `yaml:"rabbitmq_queue"`

	// ========== 审计配置 ==========
	// AuditEnabled 是否启用该表的审计功能
	// 说明:如果为true,该表的数据变更会被记录到审计日志
	// 默认值:false
	AuditEnabled bool `yaml:"audit_enabled"`
}

Config 规则配置结构体

该结构体定义了单个规则的所有配置项,控制数据从MySQL到目标系统的转换过程。 一个规则对应一个源表,定义了如何将该表的数据转换为目标系统所需的格式。

配置分类: 1. 基础配置:Schema、Table、Priority 2. 字段转换配置:ColumnLowerCase、ColumnUpperCase、ColumnUnderscoreToCamel 3. 字段过滤配置:IncludeColumns、ExcludeColumns 4. 字段映射配置:ColumnMappings、DefaultColumnValues 5. 数据编码配置:ValueEncoder、ValueFormatter 6. Lua脚本配置:LuaScript、LuaFilePath 7. 日期时间配置:DateFormatter、DatetimeFormatter 8. 目标系统配置:Redis、MongoDB、Elasticsearch、Kafka、RocketMQ、RabbitMQ 9. 审计配置:AuditEnabled

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig 创建默认规则配置

该函数返回一个包含默认值的规则配置实例。

返回值:

*Config - 默认规则配置,优先级默认为1

默认值:

  • Priority: 1

func (*Config) SetDefaults

func (c *Config) SetDefaults()

SetDefaults 设置默认值

该方法为规则配置设置默认值,确保配置的完整性。

处理流程:

  1. 检查Priority是否为0,如果为0则设置为1

注意:

  • 该方法不会覆盖已设置的非零值

func (*Config) Validate

func (c *Config) Validate() error

Validate 验证规则配置

该方法验证规则配置的有效性,确保必要字段已正确设置。 验证失败时会返回具体的错误信息。

返回值:

error - 验证失败时返回错误,nil表示验证通过

验证项:

  1. 数据库名称不能为空
  2. 表名称不能为空
  3. 优先级必须在1-10之间

type EsMapping

type EsMapping struct {
	// Column 数据库列名
	// 说明:源数据库表中的列名
	// 示例:"user_name"
	Column string `yaml:"column"`

	// Field Elasticsearch字段名
	// 说明:在Elasticsearch中的字段名,可以与列名不同
	// 示例:"userName"
	Field string `yaml:"field"`

	// Type 字段类型
	// 说明:Elasticsearch字段类型,如text、keyword、integer、date、boolean等
	// 示例:"text"
	Type string `yaml:"type"`

	// Analyzer 分词器
	// 说明:全文检索字段使用的分词器,仅text类型有效
	// 示例:"ik_max_word"、"standard"
	Analyzer string `yaml:"analyzer"`

	// Format 格式
	// 说明:字段的格式,主要用于日期类型字段
	// 示例:"yyyy-MM-dd HH:mm:ss"
	Format string `yaml:"format"`
}

EsMapping Elasticsearch字段映射配置

该结构体定义了单个字段的Elasticsearch映射配置,用于指定字段在Elasticsearch中的类型和分析方式。

使用场景: - 指定字段的映射类型 - 配置全文检索的分词器 - 指定日期字段的格式

type Rule

type Rule struct {
	// Config 规则配置
	Config *Config

	// TableInfo 表结构信息
	TableInfo *schema.Table

	// PaddingMap 填充映射
	PaddingMap map[string]interface{}

	// LuaProto Lua脚本原型
	LuaProto *lua.FunctionProto

	// ColumnNameMap 列名映射,key为列名,value为列信息
	ColumnNameMap map[string]*schema.TableColumn
}

Rule 规则结构体,包含规则配置和相关资源

该结构体包含了单个规则的所有信息,包括配置、表结构、填充映射和Lua脚本原型。

type RuleEngine

type RuleEngine struct {
	// contains filtered or unexported fields
}

RuleEngine 规则引擎,负责管理和执行数据处理规则

该结构体负责加载、管理和执行数据处理规则,包括: 1. 初始化MySQL连接,用于获取表结构信息 2. 加载和管理规则配置 3. 处理行数据转换 4. 执行Lua脚本进行数据处理

func NewRuleEngine

func NewRuleEngine() *RuleEngine

NewRuleEngine 创建规则引擎实例

该函数创建并返回一个新的规则引擎实例,初始化规则映射。

返回值:

*RuleEngine - 新创建的规则引擎实例

func (*RuleEngine) GetMySQLClient

func (r *RuleEngine) GetMySQLClient() interface{}

GetMySQLClient 获取 MySQL 客户端(用于 Lua 引擎依赖注入)

该方法暴露 MySQL 客户端,以便在 SDK 层初始化 Lua 状态池时注入依赖。

返回值:

  • interface{}: MySQL 客户端连接

func (*RuleEngine) GetRule

func (r *RuleEngine) GetRule(schema, table string) (*Rule, bool)

GetRule 根据数据库和表名获取规则

该方法根据数据库名称和表名获取对应的规则实例。

参数:

schema - 数据库名称
table - 表名称

返回值:

*Rule - 规则实例,如果不存在则返回nil
bool - 是否存在对应的规则,true表示存在,false表示不存在

func (*RuleEngine) HasRule

func (r *RuleEngine) HasRule(dbSchema, table string) bool

HasRule 检查是否存在指定表的规则

该方法检查规则引擎中是否存在指定表的规则配置。

参数:

dbSchema - 数据库名称
table - 表名

返回值:

bool - 如果存在规则返回true,否则返回false

func (*RuleEngine) Initialize

func (r *RuleEngine) Initialize(ctx context.Context, config *canal.Config, rules []Config) error

Initialize 初始化规则引擎

该方法初始化规则引擎,包括: 1. 初始化MySQL连接,用于获取表结构信息 2. 从配置中加载所有规则 3. 编译规则中的Lua脚本 4. 获取并缓存表结构信息

参数:

ctx - 上下文,用于控制初始化过程
config - Canal配置对象,包含MySQL连接信息
rules - 规则配置列表

返回值:

error - 初始化过程中发生的错误,nil表示成功

func (*RuleEngine) ProcessRow

func (r *RuleEngine) ProcessRow(ctx context.Context, ruleKey string, action string, row interface{}) (interface{}, error)

ProcessRow 处理行数据,根据规则进行转换和处理

该方法是规则引擎的核心方法,负责根据规则处理行数据: 1. 查找对应的规则 2. 根据规则配置转换字段名(如驼峰命名、大小写转换等) 3. 转换列数据类型 4. 执行Lua脚本(如果有)进行进一步处理

参数:

ctx - 上下文,用于日志记录
ruleKey - 规则键,格式为"schema:table"
action - 操作类型,如"insert"、"update"、"delete"
row - 行数据,可以是[]interface{}或map[string]interface{}

返回值:

interface{} - 处理后的数据,通常是map[string]interface{}
error - 处理过程中发生的错误,nil表示成功

func (*RuleEngine) UpdateTableInfo

func (r *RuleEngine) UpdateTableInfo(ctx context.Context, dbSchema, table string) error

UpdateTableInfo 更新表结构信息,从数据库重新获取并更新缓存

该方法用于更新表结构信息,通常在表结构发生变化时调用。 它会从数据库重新获取表结构,并更新缓存和相关规则。

参数:

ctx - 上下文,用于日志记录
dbSchema - 数据库名称
table - 表名称

返回值:

error - 更新过程中发生的错误,nil表示成功

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL