TData
一个高效、可靠的MySQL binlog同步工具,支持将MySQL数据实时同步到多种目标系统。
项目简介
TData是一个基于MySQL binlog的实时数据同步系统,采用事件驱动架构,支持批量处理、优先级队列、动态调整批量大小、智能告警等高级特性。系统设计清晰,模块化程度高,具有良好的扩展性和可维护性。
核心优势
- 高性能:批量处理、对象池、异步写入、事件驱动等优化策略
- 高可用:自动重连、重试机制、位置持久化、断路器保护等高可用设计
- 可扩展:发送器扩展、规则扩展等扩展性设计
- 易维护:清晰的架构分层、合理的设计模式、完善的监控指标
- 智能告警:多渠道告警、去重聚合、实时通知等智能运维能力
功能特性
-
实时同步:基于MySQL binlog实现实时数据同步
-
多目标支持:支持同步到Redis、MongoDB、RocketMQ、Kafka、RabbitMQ等多种目标系统
-
灵活配置:支持通过配置文件灵活定义同步规则
-
高性能:支持批量处理、动态批量大小调整、事件驱动通知、并发控制
-
高可靠:支持位置存储、断点续传、故障自动恢复
-
监控支持:内置Prometheus指标导出,支持监控系统运行状态
-
智能重试:支持发送失败后的智能重试机制
-
断路器模式:内置断路器模式,自动熔断保护,提高系统容错能力
-
事件驱动架构:使用Channel实现批次异步通知,避免无效轮询,提升性能
-
Lua脚本支持:支持通过Lua脚本自定义数据处理逻辑
-
优雅关闭:支持信号处理,确保资源正确释放
-
多渠道告警:支持钉钉、企业微信、邮件、Slack等多渠道告警
-
智能告警聚合:告警去重和聚合机制,避免告警轰炸
-
死信队列持久化:死信队列持久化到文件,防止数据丢失
-
位置存储优化:位置存储立即写入,确保数据可靠性
技术栈
- Go:主要开发语言,版本要求1.21+
- go-mysql/canal:用于解析MySQL binlog
- redis/go-redis:Redis客户端
- Prometheus:监控指标收集
- gogf/gf:基础框架,提供日志、配置等功能
- yuin/gopher-lua:Lua脚本支持
项目结构
tdata/
├── cmd/
│ └── transfer/ # 应用程序入口
│ └── main.go
├── pkg/
│ ├── alert/ # 告警层:多渠道告警通知(钉钉、企业微信、邮件、Slack)
│ │ ├── aggregator.go # 告警聚合器
│ │ ├── config.go # 告警配置
│ │ ├── deduplicator.go # 告警去重器
│ │ ├── dingtalk.go # 钉钉告警实现
│ │ ├── email.go # 邮件告警实现
│ │ ├── interface.go # 告警接口定义
│ │ ├── loader.go # 告警配置加载
│ │ ├── manager.go # 告警管理器
│ │ ├── slack.go # Slack告警实现
│ │ ├── types.go # 告警类型定义
│ │ └── wecom.go # 企业微信告警实现
│ ├── circuit/ # 断路器:熔断保护
│ │ ├── breaker.go # 断路器实现
│ │ └── breaker_test.go # 断路器测试
│ ├── collector/ # 采集层:Canal客户端和事件处理器
│ │ ├── batch_manager.go # 批量管理器
│ │ ├── canal_client.go # Canal客户端实现
│ │ ├── canal_config_builder.go # Canal配置构建器
│ │ ├── canal_connector.go # Canal连接器
│ │ ├── canal_logger.go # Canal日志记录器
│ │ ├── canal_monitor.go # Canal监控器
│ │ ├── canal_reconnector.go # Canal重连器
│ │ ├── canal_types.go # Canal类型定义
│ │ ├── config.go # 采集器配置
│ │ ├── constants.go # 常量定义
│ │ ├── event_adjustment.go # 事件调整
│ │ ├── event_batch.go # 事件批量处理
│ │ ├── event_config.go # 事件配置
│ │ ├── event_handler.go # 事件处理器
│ │ ├── event_init.go # 事件初始化
│ │ ├── event_position.go # 事件位置处理
│ │ ├── event_statistics.go # 事件统计
│ │ ├── event_test.go # 事件测试
│ │ └── logger.go # 日志记录器
│ ├── container/ # 容器:依赖注入和生命周期管理
│ │ └── container.go # 容器实现
│ ├── coordinator/ # 协调层:批量协调器
│ │ └── batch_coordinator.go # 批量协调器
│ ├── errors/ # 错误处理
│ │ └── errors.go # 错误定义
│ ├── luaengine/ # 处理层:Lua脚本引擎
│ │ └── luaengine.go # Lua引擎实现
│ ├── metrics/ # 监控层:Prometheus指标导出
│ │ ├── collector.go # 指标收集器
│ │ ├── config.go # 监控配置
│ │ ├── constants.go # 指标常量
│ │ ├── exporter.go # Prometheus导出器
│ │ ├── metrics.go # 指标定义
│ │ └── metrics_test.go # 指标测试
│ ├── redis/ # Redis客户端实现
│ │ ├── config.go # Redis配置
│ │ ├── constants.go # Redis常量
│ │ ├── consumer.go # Redis消费者
│ │ ├── handler.go # Redis处理器
│ │ ├── handler_test.go # Redis处理器测试
│ │ ├── mapper.go # Redis映射器
│ │ ├── option.go # Redis选项
│ │ └── sender.go # Redis发送器
│ ├── rule/ # 规则引擎
│ │ ├── config.go # 规则配置
│ │ ├── rule.go # 规则引擎实现
│ │ └── rule_test.go # 规则引擎测试
│ ├── sdk/ # SDK客户端接口和实现
│ │ ├── builder.go # SDK构建器
│ │ ├── client.go # SDK客户端
│ │ ├── config.go # SDK配置
│ │ ├── config_loader.go # 配置加载器
│ │ ├── config_test.go # 配置测试
│ │ ├── handler.go # 处理器接口
│ │ └── options.go # 配置选项
│ ├── sender/ # 发送器抽象和实现
│ │ ├── config.go # 发送器配置
│ │ ├── redis_adapter.go # Redis适配器
│ │ ├── redis_sender.go # Redis发送器
│ │ └── sender.go # 发送器接口
│ ├── storage/ # 存储层:位置存储
│ │ ├── config.go # 存储配置
│ │ ├── position.go # 位置存储实现
│ │ └── position_test.go # 位置存储测试
│ ├── types/ # 公共类型定义
│ │ └── types.go # 类型定义
│ └── util/ # 工具函数
│ ├── string.go # 字符串工具
│ └── string_test.go # 字符串工具测试
├── docs/ # 文档目录
│ ├── 代码注释规范.md # 代码注释编写规范
│ ├── 告警模块完整指南.md # 告警模块完整指南
│ ├── 审计功能启动指南.md # 审计功能启动指南
│ ├── Lua规则使用文档.md # Lua脚本使用文档
│ ├── 监控运维指南.md # 监控和运维指南
│ ├── 快速部署指南.md # 快速部署指南
│ ├── 配置说明.md # 配置文件详细说明
│ ├── 日志编写规范.md # 日志编写规范
│ ├── 使用示例.md # 使用场景和示例
│ └── 术语翻译表.md # 技术术语翻译对照表
├── store/ # 数据存储目录
│ └── mysql_position.json # MySQL位置存储文件
├── logs/ # 日志目录
├── app.yml # 应用配置文件
├── app.example.yaml # 配置示例文件
├── prometheus.yml # Prometheus监控配置
├── prometheus_alerts.yml # Prometheus告警规则配置
├── go.mod # Go模块定义
├── go.sum # Go模块依赖校验
└── README.md # 项目说明文档
架构分层
┌─────────────────────────────────────────────────────────────┐
│ 应用层 │
│ (cmd/transfer/main.go) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ Client 层 │
│ (pkg/client/client.go) │
│ - 统一 API 接口 │
│ - 配置管理 │
│ - 生命周期管理 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 容器层 │
│ (pkg/container/container.go) │
│ - 依赖注入管理 │
│ - 组件生命周期管理 │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 业务逻辑层 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 收集器层 │ │ 规则引擎 │ │ 发送器层 │ │
│ │ (collector) │ │ (rule) │ │ (sender) │ │
│ │ │ │ │ │ │ │
│ │ - CanalClient│ │ - RuleEngine │ │ - Sender │ │
│ │ - EventHandler│ │ - Rule │ │ - RedisSender│ │
│ │ - QueueMgr │ │ - LuaEngine │ │ │ │
│ │ - BatchMgr │ │ │ │ │ │
│ │ - WorkerPool │ │ │ │ │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 存储层 │
│ (pkg/storage) │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 文件存储 │ │ Redis 存储 │ │
│ │ (file) │ │ (redis) │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ 基础设施层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 类型定义 │ │ 监控指标 │ │ 工具库 │ │
│ │ (types) │ │(metrics) │ │ (util) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
快速开始
安装
# 编译程序
go build -o TData cmd/transfer/main.go
# 或直接运行
go run cmd/transfer/main.go
配置
- 选择配置文件:
# 使用示例配置文件
cp app.example.yaml app.yml
-
修改配置文件:
app.yml:配置MySQL连接、系统设置、目标系统、告警等所有配置项
- 同步规则配置在
sender.rules 部分
运行
# 使用配置文件启动
./TData -config app.yml
# 或使用 go run
go run cmd/transfer/main.go -config app.yml
配置说明
应用配置(app.yml)
完整配置示例:
# Web服务配置
web:
enable: true # 是否启用Web服务
port: 8060 # Web服务监听端口
# 监控配置
metrics:
enabled: true # 是否启用监控
port: 9595 # 监控端口
# 日志配置
logger:
level: "info" # 日志级别:debug, info, warn, error
store: "logs" # 日志存储路径
stdout: true # 是否输出到控制台
# 采集器配置
collector:
# MySQL连接配置
mysql:
addr: "127.0.0.1:3306" # MySQL地址,格式:host:port
user: "root" # 用户名
pass: "123456" # 密码
charset: "utf8mb4" # 字符集
slave_id: 1001 # 从库ID,用于标识当前从库
flavor: "mysql" # 数据库类型:mysql或mariadb
# 系统配置
system:
data_dir: "./store" # 数据目录
maxprocs: 4 # 最大协程数,推荐值:CPU核心数
max_flush_interval: 5000 # 最大刷新间隔(毫秒)
skip_no_pk_table: false # 跳过无主键表
enable_dynamic_bulk_size: true # 是否启用动态批量大小
# 位置存储配置
position_storage:
type: "redis" # 存储类型:file, redis, etcd
file:
file_name: "./store/mysql_position.json" # 位置存储文件名
redis:
addrs: "127.0.0.1:6379"
pass: ""
database: 0
group_type: "single"
master_name: "master"
key: "mysql_position"
batch_size: 1
batch_interval: 200
# 告警配置
alert:
enabled: true # 是否启用告警
send_mode: "parallel" # 发送模式:parallel(并行)或 serial(串行)
# 去重配置
deduplicator:
enabled: true # 是否启用去重
window_size: 300 # 去重时间窗口(秒)
max_entries: 1000 # 最大缓存条目数
# 聚合配置
aggregator:
enabled: true # 是否启用聚合
window_size: 60 # 聚合时间窗口(秒)
flush_interval: 60 # 定时刷新间隔(秒)
max_alerts_per_group: 50 # 每组最大告警数
# 钉钉告警配置
dingtalk:
enabled: true # 是否启用钉钉告警
webhook: "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN" # 钉钉机器人Webhook URL
secret: "" # 钉钉机器人加签密钥(可选)
timeout: 10 # 请求超时时间(秒)
max_retry: 3 # 最大重试次数
retry_delay: 1 # 重试延迟(秒)
# 发送器配置
sender:
target: "Redis" # 目标类型:Redis, MongoDB, Elastic, RocketMQ, Kafka, RabbitMQ
# Redis配置
redis:
addrs: "127.0.0.1:6379" # Redis地址
group_type: "single" # 集群类型
master_name: "" # Master节点名称
pass: "" # Redis密码
database: 0 # Redis数据库索引
# RabbitMQ配置
rabbitmq:
addr: amqp://guest:guest@127.0.0.1:5672/
# 规则配置
rules:
- schema: test
table: user
redis_structure: sortedset
redis_key_prefix: 'user:'
order_by_column: "id"
column_underscore_to_camel: false
value_encoder: "json"
架构设计
核心组件
-
Collector(采集器):负责连接MySQL,解析binlog事件
- CanalClient:管理MySQL连接和binlog同步
- EventHandler:处理各种binlog事件
- BatchManager:管理批量缓冲区,支持事件驱动通知
-
RuleEngine(规则引擎):根据规则处理数据转换
- 表结构缓存
- 字段名转换(驼峰、大小写)
- 数据类型转换
- Lua脚本支持
-
Sender(发送器):将数据发送到目标系统
- Sender接口:定义发送器的核心方法
- RedisSender:Redis发送器实现
- 断路器保护:集成CircuitBreaker实现熔断保护
-
PositionStorage(位置存储):存储同步位置,支持断点续传
- 文件存储:将位置数据保存到本地文件
- Redis存储:将位置数据保存到Redis服务器
-
Metrics(监控指标):收集和导出系统指标
- 系统状态指标
- 队列指标
- 批量处理指标
- Worker指标
- 错误指标
- 业务指标
-
AlertManager(告警管理器):管理系统告警,支持多渠道发送
- 去重机制:基于指纹和时间窗口的智能去重
- 聚合机制:相同类型的告警聚合成一条消息
- 多渠道支持:钉钉、企业微信、邮件、Slack
- 并行/串行发送模式
-
CircuitBreaker(断路器):保护关键路径,防止级联故障
- 三种状态:Closed(闭合)、Open(打开)、HalfOpen(半开)
- 自动熔断:失败达到阈值时快速失败
- 自动恢复:半开状态探测系统恢复
-
BatchCoordinator(批次协调器):协调批次处理,支持并发控制
- 事件驱动订阅:通过Channel订阅批次通知
- 并发控制:使用信号量控制最大并发数
- 异步处理:Goroutine异步发送批次
- 优雅关闭:双重退出机制(Context + Channel)
数据流
MySQL Binlog
↓
[CanalClient] - 解析 binlog 事件
↓
[EventHandler] - 处理各种事件
├─→ OnRow (行事件)
├─→ OnXID (事务提交)
├─→ OnRotate (binlog 切换)
└─→ OnDDL (DDL 语句)
↓
[QueueManager] - 优先级队列
↓
[WorkerPool] - 工作协程池
↓
[handleRowRequests] - 处理行请求
↓
[RuleEngine] - 规则引擎
├─→ 字段名转换
├─→ 数据类型转换
└─→ Lua 脚本处理
↓
[BatchManager] - 批量缓冲区
├─→ 达到批量大小
└─→ 达到最大刷新间隔
↓
【事件驱动通知】非阻塞发送到 batchChannel
↓
[BatchCoordinator] - 批次协调器
├─→ 订阅 BatchChannel
├─→ 事件循环监听
└─→ 并发控制(信号量)
↓
[Sender] - 发送器(带断路器保护)
├─→ CircuitBreaker 检查
├─→ RedisSender 发送
└─→ 并发控制
↓
[Redis] - 目标系统
位置数据流向
MySQL Binlog Position
↓
[EventHandler] - 位置事件
├─→ OnXID
├─→ OnRotate
└─→ OnPosSynced
↓
[QueueManager] - 优先级队列(最高优先级)
↓
[WorkerPool] - 工作协程
↓
[handlePosRequest] - 处理位置请求
↓
[PositionStorage] - 位置存储
├─→ filePositionStorage
│ ├─→ 异步写入通道
│ ├─→ 批量缓冲区
│ └─→ 原子写入文件
└─→ redisPositionStorage
└─→ Redis SET
系统架构图
整体架构图
graph TB
subgraph 应用层["应用层"]
CMD[cmd/transfer/main.go<br/>应用入口]
end
subgraph Client层["Client层"]
SDK[pkg/client/client.go<br/>Client客户端]
end
subgraph 容器层["容器层"]
CONTAINER[pkg/container/container.go<br/>依赖注入管理]
end
subgraph 业务逻辑层["业务逻辑层"]
COLLECTOR[pkg/collector/<br/>采集器层<br/>CanalClient<br/>EventHandler<br/>BatchManager]
RULE[pkg/rule/<br/>规则引擎<br/>RuleEngine<br/>LuaEngine]
COORDINATOR[pkg/coordinator/<br/>协调层<br/>BatchCoordinator]
SENDER[pkg/sender/<br/>发送器层<br/>RedisSender<br/>断路器保护]
end
subgraph 基础设施层["基础设施层"]
ALERT[pkg/alert/<br/>告警系统<br/>多渠道支持]
CIRCUIT[pkg/circuit/<br/>断路器<br/>熔断保护]
STORAGE[pkg/storage/<br/>位置存储<br/>文件/Redis]
METRICS[pkg/metrics/<br/>监控指标<br/>Prometheus]
end
CMD --> SDK
SDK --> CONTAINER
CONTAINER --> COLLECTOR
COLLECTOR --> RULE
COLLECTOR --> COORDINATOR
COORDINATOR --> SENDER
SENDER --> CIRCUIT
SENDER --> STORAGE
SENDER --> ALERT
SENDER --> METRICS
COLLECTOR --> METRICS
事件驱动数据流图
sequenceDiagram
participant MySQL as MySQL Server
participant Canal as CanalClient
participant EH as EventHandler
participant BM as BatchManager
participant BC as BatchCoordinator
participant Sender as RedisSender
participant Redis as Redis
MySQL->>Canal: binlog 事件
Canal->>EH: 解析事件
EH->>EH: handleRowRequests()
EH->>BM: Add(requests)
BM->>BM: mutex.Lock()
BM->>BM: append(buffer, requests)
BM->>BM: batch = flush()
BM->>BM: 从对象池获取切片
BM->>BM: batch = append(batch, buffer)
BM->>BM: buffer = buffer[:0]
BM->>BM: 更新统计信息
BM->>BM: select非阻塞发送
BM->>BM: batchChannel <- batch
BM->>BM: mutex.Unlock()
BC->>BM: SubscribeBatch()
BC->>BC: batchProcessor() 启动
loop 事件监听
BC->>BM: 接收 <-batchChannel
alt channel关闭
BC->>BC: return (优雅退出)
alt 批次有效
BC->>BC: processBatch(batch)
BC->>BC: 获取信号量
par 异步发送
BC->>Sender: Send(ctx, batch)
Sender->>Sender: CircuitBreaker检查
alt 断路器打开
Sender->>BC: 跳过发送
else 断路器闭合
Sender->>Sender: Pipeline批量执行
Sender->>Redis: 管道命令
Redis-->>Sender: 执行结果
alt 发送成功
Sender->>BC: MarkSuccess()
BC->>BC: 释放信号量
else 发送失败
Sender->>BC: MarkFailure()
BC->>BC: 释放信号量
end
end
end
告警系统架构图
graph TB
subgraph 告警管理器["AlertManager"]
DEDUP[去重器 Deduplicator<br/>时间窗口去重]
AGGREGATOR[聚合器 Aggregator<br/>告警聚合]
NOTIFIERS[发送器列表<br/>DingTalk/WeCom<br/>Email/Slack]
end
subgraph 去重机制["去重机制"]
FP1[生成指纹<br/>Level + Title + Source]
CHECK[检查时间窗口<br/>5分钟]
RECORD[记录已发送<br/>fingerprint + timestamp]
LRU[LRU清理<br/>超过1000条]
end
subgraph 聚合机制["聚合机制"]
BUCKET[聚合桶<br/>相同类型告警]
WINDOW[时间窗口<br/>1分钟]
TRIGGER[触发聚合<br/>超时/超数]
BUILD[构建聚合消息<br/>统计重复次数]
end
DEDUP --> CHECK
CHECK --> RECORD
RECORD --> LRU
BUCKET --> WINDOW
WINDOW --> TRIGGER
TRIGGER --> BUILD
告警源 --> 告警管理器
告警管理器 --> 去重机制
去重机制 --> AGGREGATOR
AGGREGATOR --> NOTIFIERS
NOTIFIERS --> 钉钉[钉钉]
NOTIFIERS --> 企业微信[企业微信]
NOTIFIERS --> 邮件[邮件]
NOTIFIERS --> Slack[Slack]
断路器状态转换图
stateDiagram-v2
[*] --> Closed: 初始状态
Closed --> Open: 失败次数≥5次<br/>failure_threshold
note right of Closed
允许所有请求通过
失败计数器重置
end note
Open --> HalfOpen: 超时30秒<br/>timeout
note right of Open
拒绝所有请求
记录最后失败时间
end note
HalfOpen --> Closed: 成功次数≥3次<br/>success_threshold
HalfOpen --> Open: 探测失败<br/>或超限
note right of HalfOpen
允许部分请求探测<br/>最多1次
半开状态请求计数
end note
HalfOpen --> [*]: 优雅关闭
Closed --> [*]: 优雅关闭
Open --> [*]: 优雅关闭
批次处理并发控制图
graph LR
subgraph BatchManager["BatchManager 生产者"]
FLUSH[flush()<br/>创建批次]
CHANNEL[batchChannel<br/>容量100]
NONBLOCK[非阻塞发送<br/>select + default]
end
subgraph BatchCoordinator["BatchCoordinator 消费者"]
PROCESSOR[batchProcessor()<br/>事件循环]
SUBSCRIBE[SubscribeBatch()<br/>订阅channel]
SELECTOR[select监听<br/>context + channel]
PROCESS[processBatch()<br/>处理批次]
SEMAPHORE[sendSemaphore<br/>信号量控制]
GOROUTINE[goroutine<br/>异步发送]
end
subgraph 并发控制["并发控制"]
ACQUIRE[获取信号量<br/><-sendSemaphore]
RELEASE[释放信号量<br/><-sendSemaphore]
MAX[最大并发数<br/>maxConcurrent]
end
FLUSH --> NONBLOCK
NONBLOCK --> CHANNEL
CHANNEL --> PROCESSOR
PROCESSOR --> SUBSCRIBE
PROCESSOR --> SELECTOR
SELECTOR --> PROCESS
PROCESS --> ACQUIRE
ACQUIRE --> GOROUTINE
GOROUTINE --> RELEASE
RELEASE --> MAX
CHANNEL -.丢弃满时.-> NONBLOCK
监控
内置 Prometheus 指标导出,支持实时监控系统运行状态。提供完整的监控指标和健康检查端点。
监控端点:
/metrics - Prometheus 指标采集端点
/health - 健康检查
/ - 监控主页,提供指标和健康检查的链接
详细文档:完整的监控运维指南、指标说明和告警规则配置请参考 docs/监控运维指南.md
高级特性
事件驱动架构
采用发布-订阅模式实现批次异步通知,消除无效轮询开销:
- 非阻塞发送:BatchManager 通过 Channel 非阻塞发送批次通知
- 事件订阅:BatchCoordinator 订阅 Channel 接收批次事件
- 并发控制:使用信号量控制最大并发数,防止资源耗尽
- 优雅关闭:双重退出机制(Context取消 + Channel关闭)
- 对象池优化:复用切片减少 GC 压力
关键代码示例:
// BatchManager - 发布批次
select {
case bm.batchChannel <- batch:
glog.Debug(nil, "批次已发送到 channel")
default:
glog.Warning(nil, "batchChannel 已满,批次通知丢失")
}
// BatchCoordinator - 订阅批次
batchChan := bm.SubscribeBatch()
for {
select {
case <-bc.ctx.Done():
return
case batch, ok := <-batchChan:
if !ok || batch == nil {
return
}
bc.processBatch(batch)
}
}
断路器模式
内置断路器模式,当目标系统出现故障时,会自动熔断,避免系统过载:
- Closed(闭合):正常状态,允许所有请求通过
- Open(打开):故障状态,快速拒绝所有请求
- HalfOpen(半开):恢复状态,允许部分请求探测系统健康状况
状态转换条件:
- Closed → Open:失败次数达到阈值(默认5次)
- Open → HalfOpen:超时时间到达(默认30秒)
- HalfOpen → Closed:成功次数达到阈值(默认3次)
- HalfOpen → Open:探测失败或超过最大请求数
关键配置参数:
| 参数 |
默认值 |
说明 |
| timeout |
30秒 |
打开状态超时时间 |
| failure_threshold |
5 |
失败阈值 |
| success_threshold |
3 |
成功阈值 |
| max_half_open_requests |
1 |
半开状态最大请求数 |
使用示例:
// 创建断路器
cb := circuit.NewCircuitBreaker(
circuit.WithTimeout(30*time.Second),
circuit.WithFailureThreshold(5),
circuit.WithSuccessThreshold(3),
)
// 使用断路器保护操作
err := cb.Execute(func() error {
return redisClient.Ping(ctx).Err()
})
if err != nil {
if circuit.IsCircuitOpen(err) {
// 断路器打开,跳过操作
log.Println("断路器打开,跳过Redis操作")
}
}
动态批量大小调整
系统会根据处理时间和队列长度自动调整批量大小,优化系统性能:
- 当处理时间较短且队列长度增加时,自动增大批量大小
- 当处理时间较长或队列长度减少时,自动减小批量大小
智能告警系统
完整的告警管理系统,支持多渠道发送、智能去重、自动聚合:
核心特性:
- 告警去重:基于指纹和时间窗口的智能去重,避免告警轰炸
- 告警聚合:自动聚合相同类型告警,减少消息数量
- 多渠道支持:钉钉、企业微信、邮件、Slack
- 灵活发送模式:并行或串行发送
详细文档:完整的告警配置和使用指南请参考 docs/告警模块完整指南.md
Lua脚本支持
支持通过 Lua 脚本自定义数据处理逻辑。系统提供丰富的预加载模块(scriptOps、dbOps、httpOps、redisOps、mqOps、mongodbOps、esOps),可直接在脚本中使用,无需手动 require。
核心特性:
- 模块自动加载,简化脚本编写
- 支持 HTTP 请求(GET、POST、PUT、DELETE),支持 JSON、表单、文件上传
- 支持 SQL 查询、Redis 操作、消息队列等多种数据操作
- 智能重试机制、性能指标收集、结构化日志记录
详细文档:完整的 Lua 脚本使用指南、API 参考和最佳实践请参考 docs/Lua规则使用文档.md
故障排查
常见问题
-
连接MySQL失败
- 检查MySQL地址、端口、用户名、密码是否正确
- 确认MySQL服务是否正常运行
- 检查MySQL用户是否有足够的权限
- 查看日志文件获取详细错误信息
-
同步数据丢失
- 检查位置存储是否正常工作
- 检查批量大小和批量间隔配置
- 确认目标系统连接是否正常
- 查看日志中的错误信息
-
性能问题
- 调整批量大小和批量间隔
- 增加工作协程数量
- 检查系统资源使用情况(CPU、内存)
- 启用监控查看性能指标
-
Lua脚本错误
- 检查Lua脚本语法是否正确
- 使用测试数据验证脚本逻辑
- 查看日志中的Lua执行错误
- 确认Lua脚本文件路径是否正确
日志查看
日志文件位置:./logs/transfer.log
日志级别说明:
- debug:详细的调试信息
- info:常规运行信息
- warn:警告信息
- error:错误信息
监控指标
访问监控端点:http://localhost:9595/metrics
关键指标:
- transfer_queue_length:队列长度,正常情况下应该保持稳定
- transfer_batch_size:批量大小,根据系统负载动态调整
- transfer_worker_count:工作协程数量,应该根据队列长度动态调整
- transfer_errors_total:错误总数,应该保持在较低水平
- transfer_destination_errors_total:目标系统错误,应该为零或很低
贡献指南
- Fork项目
- 创建特性分支
- 提交代码
- 推送到分支
- 创建Pull Request
许可证
Apache-2.0 license