ShardingSphere 之 Scaling

简介

ShardingSphere-Scaling 是一个提供给用户的通用的 ShardingSphere 数据接入迁移,及弹性伸缩的解决方案。

使用方式

目前的弹性伸缩解决方案为:临时地使用两个数据库集群,伸缩完成后切换的方式实现。

伸缩总揽

这种实现方式有以下优点:

  1. 伸缩过程中,原始数据没有任何影响
  2. 伸缩失败无风险
  3. 不受分片策略限制

同时也存在一定的缺点:

  1. 在一定时间内存在冗余服务器
  2. 所有数据都需要移动

执行步骤

image-20201216161515781

准备阶段

在准备阶段,弹性伸缩模块会进行数据源连通性及权限的校验,同时进行存量数据的统计、日志位点的记录,最后根据数据量和用户设置的并行度,对任务进行分片。

存量数据迁移阶段

执行在准备阶段拆分好的存量数据迁移作业,存量迁移阶段采用 JDBC 查询的方式,直接从数据节点中读取数据,并使用新规则写入到新集群中。

增量数据同步阶段

由于存量数据迁移耗费的时间受到数据量和并行度等因素影响,此时需要对这段时间内业务新增的数据进行同步。 不同的数据库使用的技术细节不同,但总体上均为基于复制协议或 WAL 日志实现的变更数据捕获功能。

  • MySQL:订阅并解析 binlog
  • PostgreSQL:采用官方逻辑复制 test_decoding

这些捕获的增量数据,同样会由弹性伸缩模块根据新规则写入到新数据节点中。当增量数据基本同步完成时(由于业务系统未停止,增量数据是不断的),则进入规则切换阶段。

规则切换阶段

在此阶段,可能存在一定时间的业务只读窗口期,通过业务停止写入或设置数据库只读或 ShardingSphere 的熔断机制,让旧数据节点中的数据短暂静态,确保增量同步已完全完成。

这个窗口期时间短则数秒,长则数分钟,取决于数据量和用户是否需要对数据进行强校验。 确认完成后,Apache ShardingSphere 可通过配置中心修改配置,将业务导向新规则的集群,弹性伸缩完成。

我们由于有写业务无法无法完全停止写入,目前只能尽可能减少写入,然后开启双写,并且停止迁移,进行数据校验,对有问题数据进行补处理。

架构设计

ScalingTask

任务最小执行单元

主要实现类:

  • InventoryDataScalingTask(History)存量
  • IncrementalDataScalingTask(RealTime) 增量

Dumper(Reader)

从原库获取数据,核心操作主要有:

  • setChannel(Channel channel); 设置 Channel
  • dump(); 从原库获取数据,并保存数据到 channel 中

主要实现类:

  • JDBCDumper
  • LogDumper

Importer(Writer)

把数据写入数据到新库,核心操作主要有:

  • setChannel(Channel channel); 设置 Channel
  • write(); 写入数据到数据库

Channel

1
2
3
4
5
6
7
public interface Channel {
void pushRecord(Record dataRecord) throws InterruptedException;
List<Record> fetchRecords(int batchSize, int timeout);
void ack();
void close();
}

主要实现类:

  • MemoryChannel
  • DistributionChannel

存量迁移

image-20201216175534892

  1. 获取 minId、maxId,并根据 id 和 concurrency 把迁移 id 区间平均分配到各个 Task(InventoryDataScalingTask)。

    minId=1,maxId=100,concurrency=10,则

    task1 id 区间为 1-10,task2 id 区间为 11-20 …

  2. 每个 Task 都有一个 dumper 从原库批量获取 DataRecord,并 push 到 MemoryChannel,BlockingQueue 长度为 10000,超过则阻塞等待

  3. Importer 从 MemoryChannel 批量获取 DataRecord 并依次插入到新库中,然后 ack()。

  4. 当 dumper 获取不到数据时,则 push 一个 FinishedRecord 到 MemoryChannel,dumper 结束。

  5. Importer 获取到 FinishedRecord 时,Importer 结束,Task 执行完毕。

增量迁移

image-20201216175621918

  1. 每次 Task(InventoryDataScalingTask)执行完成,callBack 检测是否所有 Task(InventoryDataScalingTask)执行完成。都执行完成开启增量迁移 Task(IncrementalDataScalingTask)。

  2. 增量只有一个 dumper 从迁移开始时记录的 binlog 位置 subscribe 数据,并根据 hashcode 和 channelNumber 取模 push 到 Memorychannel。

    String index = Integer.toString(Math.abs(dataRecord.hashCode()) % channelNumber);

  3. Importer 从 MemoryChannel 批量获取 DataRecord 并依次插入/更新到新库中,然后 ack()。


ShardingSphere 之 Scaling
https://zhengshuoo.github.io/posts/003-shardingsphere-scaling-introduce
作者
zhengshuo
发布于
2021年1月6日
许可协议