ShardingSphere 之 Scaling
简介
ShardingSphere-Scaling 是一个提供给用户的通用的 ShardingSphere 数据接入迁移,及弹性伸缩的解决方案。
使用方式
目前的弹性伸缩解决方案为:临时地使用两个数据库集群,伸缩完成后切换的方式实现。
这种实现方式有以下优点:
- 伸缩过程中,原始数据没有任何影响
- 伸缩失败无风险
- 不受分片策略限制
同时也存在一定的缺点:
- 在一定时间内存在冗余服务器
- 所有数据都需要移动
执行步骤
准备阶段
在准备阶段,弹性伸缩模块会进行数据源连通性及权限的校验,同时进行存量数据的统计、日志位点的记录,最后根据数据量和用户设置的并行度,对任务进行分片。
存量数据迁移阶段
执行在准备阶段拆分好的存量数据迁移作业,存量迁移阶段采用 JDBC 查询的方式,直接从数据节点中读取数据,并使用新规则写入到新集群中。
增量数据同步阶段
由于存量数据迁移耗费的时间受到数据量和并行度等因素影响,此时需要对这段时间内业务新增的数据进行同步。 不同的数据库使用的技术细节不同,但总体上均为基于复制协议或 WAL 日志实现的变更数据捕获功能。
- MySQL:订阅并解析 binlog
- PostgreSQL:采用官方逻辑复制 test_decoding
这些捕获的增量数据,同样会由弹性伸缩模块根据新规则写入到新数据节点中。当增量数据基本同步完成时(由于业务系统未停止,增量数据是不断的),则进入规则切换阶段。
规则切换阶段
在此阶段,可能存在一定时间的业务只读窗口期,通过业务停止写入或设置数据库只读或 ShardingSphere 的熔断机制,让旧数据节点中的数据短暂静态,确保增量同步已完全完成。
这个窗口期时间短则数秒,长则数分钟,取决于数据量和用户是否需要对数据进行强校验。 确认完成后,Apache ShardingSphere 可通过配置中心修改配置,将业务导向新规则的集群,弹性伸缩完成。
我们由于有写业务无法无法完全停止写入,目前只能尽可能减少写入,然后开启双写,并且停止迁移,进行数据校验,对有问题数据进行补处理。
架构设计
ScalingTask
任务最小执行单元
主要实现类:
- InventoryDataScalingTask(History)存量
- IncrementalDataScalingTask(RealTime) 增量
Dumper(Reader)
从原库获取数据,核心操作主要有:
- setChannel(Channel channel); 设置 Channel
- dump(); 从原库获取数据,并保存数据到 channel 中
主要实现类:
- JDBCDumper
- LogDumper
Importer(Writer)
把数据写入数据到新库,核心操作主要有:
- setChannel(Channel channel); 设置 Channel
- write(); 写入数据到数据库
Channel
1 |
|
主要实现类:
- MemoryChannel
- DistributionChannel
存量迁移
-
获取 minId、maxId,并根据 id 和 concurrency 把迁移 id 区间平均分配到各个 Task(InventoryDataScalingTask)。
minId=1,maxId=100,concurrency=10,则
task1 id 区间为 1-10,task2 id 区间为 11-20 …
-
每个 Task 都有一个 dumper 从原库批量获取 DataRecord,并 push 到 MemoryChannel,BlockingQueue 长度为 10000,超过则阻塞等待
-
Importer 从 MemoryChannel 批量获取 DataRecord 并依次插入到新库中,然后 ack()。
-
当 dumper 获取不到数据时,则 push 一个 FinishedRecord 到 MemoryChannel,dumper 结束。
-
Importer 获取到 FinishedRecord 时,Importer 结束,Task 执行完毕。
增量迁移
-
每次 Task(InventoryDataScalingTask)执行完成,callBack 检测是否所有 Task(InventoryDataScalingTask)执行完成。都执行完成开启增量迁移 Task(IncrementalDataScalingTask)。
-
增量只有一个 dumper 从迁移开始时记录的 binlog 位置 subscribe 数据,并根据 hashcode 和 channelNumber 取模 push 到 Memorychannel。
String index = Integer.toString(Math.abs(dataRecord.hashCode()) % channelNumber);
-
Importer 从 MemoryChannel 批量获取 DataRecord 并依次插入/更新到新库中,然后 ack()。