English
Follow
@Sphere_Ex

基于Sharding-JDBC的订单分库⽅案

本⽅案基于Apache开源项⽬ ShardingSphere 采 Sharding-JDBC 进⾏订单⽔平分库设计(数据分⽚)缓解存储压⼒。
03/29/2023by 杨敬敏 Ginee后端工程师

前言

随着业务的发展,数据量迅速增⻓,单实例 Mysql 数据库的磁盘存储容量很快就会达到上限(阿⾥云 RDB 最⼤上限6T)。因此为了解决⽣产环境的存储瓶颈,需要对数据库进⾏分库设计,加⼤存储空间。本⽅案基于Apache开源项⽬ ShardingSphere 采 Sharding-JDBC 进⾏订单⽔平分库设计(数据分⽚)缓解存储压⼒。

方案对比

⽬前主流的分库分表⽅案就是 Sharding-JDBC 和 Mycat,下⾯主要对⽐这两种。

img

经上述对⽐,可以发现 Sharding-JDBC 是直连数据库、没有中间层的⽅式,性能上更优,同时由于 Sharding-JDBC 集成在应⽤代码内,并不会增加额外的运维成本,使开发者可以专注于⾃⾝代码逻辑。另外,Sharding-JDBC ⽬前仅⽀持 Java,考虑到⽬前团队技术栈主要以 Java 为主,所以采⽤ Sharding-JDBC 的分库⽅案成为了⼀个不错的选择。 下图反映了 Sharding-JDBC 在应⽤中所处的位置:

img

原理

Sharding-JDBC 提供了精确分⽚扩展接⼝ org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm. PreciseShardingAlgorithm,只需要实现其doSharding ⽅法,基于分⽚键做⼀定的算法转换,返回指定的数据源,便可以达到分库的效果。 ⽬前我们使⽤的是⼀致性 hash 算法,⽤于定位分⽚键所在数据源,同时引⼊虚拟节点,可以避免数据倾斜问题(即有的数据源数据多,有的数据源数据少,导致数据源设备压⼒不均衡)算法原理如图:

img

**然后基于该分库的分⽚键&分⽚算法,我们还需在 application.yml 中增加 sharding 数据源配置;同时需要增加分表相关的配置信息。**最后项⽬启动时,sharding 会⾃动加载这些配置初始化数据源。在请求时从请求参数中提取到分⽚键,应⽤分⽚算法,找到⽬标数据源,操作该数据源。

实战

在原有订单数据源 datasource_name 的基础上,新加3台订单数据库实例 datasource_name_1&datasource_name_2&datasource_name_3,扩充后共4台订单数据库实例,磁盘存储最⼤可以达到原有容量的4倍。

img

4.1引⼊maven依赖

xml
<dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>4.1.1.001</version> </dependency>

这⾥的4.1.1.001版本是在官⽹ sharding 4.1.1版本上⾃⾏扩充了⼀部分内容:本⾝根据业务分⽚键对业务数据进⾏分⽚后,同⼀类型的数据只会在⼀个数据源中,sharding 只能操作这⼀个数据源的数据,但对于⼀部分分⽚键落⼊新数据源后,意味着数据源发⽣了变动,⽽在旧库数据未迁⼊到新库时,会存在⼀定的兼容时期,此时,改造源码实现了如下双操作(除插⼊外):a、双查询:查询新旧数据源的数据,然后业务项⽬⾥使⽤切⾯进⾏查询汇总去重,根据 id+updateDatetime 进⾏去重,保留最新数据。b、双更新:更新数据时,会同时更新新旧数据源中满⾜条件的数据。c、双删除:删除数据时,会同时删除新旧数据源中满⾜条件的数据。双删的⽬的是,为了避免只 删除了⼀个库的数据,⽽由于双查导致另⼀个库的数据被查询出来误⽤。d、单插⼊:插⼊数据时,只会插⼊新分⽚算法所落⼊的数据源中。这样做的好处是,在不影响业 务的情况下,也减少了⼀部分数据迁移的⼯作量,即分库上线后新数据直接插⼊新库,不⽤做任何处理。

改造源码如下:

java
package org.apache.shardingsphere.core.rule; /** * Databases and tables sharding rule. */ @Getter public class ShardingRule implements BaseRule { // 定义:旧的分库分片策略 private final ShardingStrategy oldDatabaseShardingStrategy; public ShardingRule(final ShardingRuleConfiguration shardingRuleConfig, final Collection<String> dataSourceNames) { Preconditions.checkArgument(null != shardingRuleConfig, "ShardingRuleConfig cannot be null."); Preconditions.checkArgument(null != dataSourceNames && !dataSourceNames.isEmpty(), "Data sources cannot be empty."); this.ruleConfiguration = shardingRuleConfig; shardingDataSourceNames = new ShardingDataSourceNames(shardingRuleConfig, dataSourceNames); tableRules = createTableRules(shardingRuleConfig); broadcastTables = shardingRuleConfig.getBroadcastTables(); bindingTableRules = createBindingTableRules(shardingRuleConfig.getBindingTableGroups()); // 加载旧的分库分片策略 ShardingStrategyConfiguration oldDatabaseShardingStrategyConfig = shardingRuleConfig.getOldDatabaseShardingStrategyConfig(); if (oldDatabaseShardingStrategyConfig != null) { oldDatabaseShardingStrategy = createDefaultShardingStrategy(oldDatabaseShardingStrategyConfig); } else { oldDatabaseShardingStrategy = null; } defaultDatabaseShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultDatabaseShardingStrategyConfig()); defaultTableShardingStrategy = createDefaultShardingStrategy(shardingRuleConfig.getDefaultTableShardingStrategyConfig()); defaultShardingKeyGenerator = createDefaultKeyGenerator(shardingRuleConfig.getDefaultKeyGeneratorConfig()); masterSlaveRules = createMasterSlaveRules(shardingRuleConfig.getMasterSlaveRuleConfigs()); encryptRule = createEncryptRule(shardingRuleConfig.getEncryptRuleConfig()); } }
java
package org.apache.shardingsphere.sharding.route.engine; /** * Sharding route decorator. */ public final class ShardingRouteDecorator implements RouteDecorator<ShardingRule> { @SuppressWarnings("unchecked") @Override public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) { SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext(); // SQL参数集合 List<Object> parameters = routeContext.getParameters(); ShardingStatementValidatorFactory.newInstance( sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters)); // 获取分片条件 ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule); boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule); if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) { checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions); mergeShardingConditions(shardingConditions); } ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties); RouteResult routeResult = null; // ShardingStandardRoutingEngine特殊处理 if(shardingRouteEngine instanceof ShardingStandardRoutingEngine) { SQLStatement sqlStatement = sqlStatementContext.getSqlStatement(); // 查询/更新/删除操作,开启双操作模式 boolean isCompatibleMode = (sqlStatement instanceof SelectStatement) || (sqlStatement instanceof UpdateStatement) || (sqlStatement instanceof DeleteStatement); routeResult = shardingRouteEngine.route(shardingRule, isCompatibleMode); } else { routeResult = shardingRouteEngine.route(shardingRule); } if (needMergeShardingValues) { Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery."); } return new RouteContext(sqlStatementContext, parameters, routeResult); } }
java
package org.apache.shardingsphere.sharding.route.engine.type.standard; /** * Sharding standard routing engine. */ public final class ShardingStandardRoutingEngine extends BaseShardingRoutingEngine { /** * 寻找sharding路由结点信息 * @param shardingRule * @param tableRule * @param databaseShardingValues * @param tableShardingValues * @param isCompatibleMode 兼容模式(即同时操作新旧分片算法定位到的数据库,查询/更新/删除自动开启兼容模式) * @return */ private Collection<DataNode> route0(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final List<RouteValue> tableShardingValues, final boolean isCompatibleMode) { Collection<DataNode> result = new LinkedList<>(); // 双查 & 双更 & 双删核心逻辑 // 1. 加载旧数据节点路由信息 ShardingStrategy oldDatabaseShardingStrategy = shardingRule.getOldDatabaseShardingStrategy(); Collection<String> oldDataSources = null; if (oldDatabaseShardingStrategy != null && isCompatibleMode) { oldDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues, true); for (String each : oldDataSources) { result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues, true)); } } // 2. 加载新数据节点路由信息 Collection<String> routedDataSources = routeDataSources(shardingRule, tableRule, databaseShardingValues, false); for (String each : routedDataSources) { // 新数据节点未被加载时,加载新数据节点 if(oldDataSources == null || !oldDataSources.contains(each)) { result.addAll(routeTables(shardingRule, tableRule, each, tableShardingValues, false)); } } return result; } private Collection<String> routeDataSources(final ShardingRule shardingRule, final TableRule tableRule, final List<RouteValue> databaseShardingValues, final boolean isOld) { if (databaseShardingValues.isEmpty()) { return tableRule.getActualDatasourceNames(); } // 根据isOld参数,区分加载新/旧分片策略 Collection<String> result = new LinkedHashSet<>(); if (isOld) { result.addAll(shardingRule.getOldDatabaseShardingStrategy().doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties)); } else { result.addAll(shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(tableRule.getActualDatasourceNames(), databaseShardingValues, this.properties)); } Preconditions.checkState(!result.isEmpty(), "no database route info"); Preconditions.checkState(tableRule.getActualDatasourceNames().containsAll(result), "Some routed data sources do not belong to configured data sources. routed data sources: `%s`, configured data sources: `%s`", result, tableRule.getActualDatasourceNames()); return result; } }

4.2数据分片基于业务进⾏分库的分⽚键选择,最好能均匀的将数据拆分到不同数据库实例。本⽅案假设选择⽤⼾ID作为分库的分⽚键,然后⾃定义新的分库分⽚算法,算法命名为 DatasourceShardingAlgorithmV2,算法内容如下:

java
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm; import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.*; /** * 新的分库数据分片算法 * * @author jingmin.yang * @date 2022/4/18 19:23 */ public class DatasourceShardingAlgorithmV2 implements PreciseShardingAlgorithm<String> { // 排序存储结构:SortedMap<虚拟节点,物理节点> private static final SortedMap<Integer, String> virtualToRealMap = new TreeMap<Integer, String>(); /** * 基于所有数据源初始化订单分库虚拟节点 * @return */ static { List<String> datasources = new ArrayList<String>(); datasources.add("datasource-name"); datasources.add("datasource-name-1"); datasources.add("datasource-name-2"); datasources.add("datasource-name-3"); // 初始化分库虚拟节点数据 init(virtualToRealMap, datasources); } @Override public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<String> preciseShardingValue) { // shardingKey即为分片键值 String shardingKey = preciseShardingValue.getValue(); return getShardingDatasource(virtualToRealMap, shardingKey); } /** * 获取分库物理节点 * @param virtualToRealMap 分库虚拟节点 * @param shardingKey 分库的分片键标识 * @author: jingmin.yang * @create: 2021/9/10 10:41 * @return */ static String getShardingDatasource(SortedMap<Integer, String> virtualToRealMap, String shardingKey) { int hashValue = hash(shardingKey); // 顺时针找最近的虚拟节点 SortedMap<Integer, String> subVirtualToRealMap = virtualToRealMap.tailMap(hashValue); if (subVirtualToRealMap.isEmpty()) { return virtualToRealMap.get(virtualToRealMap.firstKey()); } return subVirtualToRealMap.get(subVirtualToRealMap.firstKey()); } /** * 初始化hash环 * * @param: virtualToRealMap * @param: realNodes * @author: jingmin.yang * @create: 2021/9/14 16:29 */ static void init(SortedMap<Integer, String> virtualToRealMap, List<String> realNodes) { for(String node: realNodes) { virtualToRealMap.put(hash(node), node); // 虚拟出指定数量的虚拟节点 int count = 0; int i = 0; int virtualNum = realNodes.size(); while (count < virtualNum) { i++; String virtualNode = node + "#" + i; // 计算hash值 int hashValue = hash(virtualNode); if (!virtualToRealMap.containsKey(hashValue)) { // 虚拟节点放到环上去 数据结构:红黑树 virtualToRealMap.put(hashValue, node); count++; } } } } /** * * MurmurHash是一种非加密型哈希函数,适用于一般的哈希检索操作,与其它流行的哈希函数相比,对于规律 * 性较强的key,MurmurHash的随机分布(离散性)特征表现更好,而且具有低碰撞率。Redis对节点进行 * shard时采用的是这种算法。 * * @param: key 分片键 * @author: jingmin.yang * @create: 2021/9/14 19:17 */ static int hash(String key) { ByteBuffer buf = ByteBuffer.wrap(key.getBytes()); int seed = 305441741; ByteOrder byteOrder = buf.order(); buf.order(ByteOrder.LITTLE_ENDIAN); int m = 1540483477; byte r = 24; int h; int k; for (h = seed ^ buf.remaining(); buf.remaining() >= 4; h ^= k) { k = buf.getInt(); k *= m; k ^= k >>> r; k *= m; h *= m; } if (buf.remaining() > 0) { ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); finish.put(buf).rewind(); h ^= finish.getInt(); h *= m; } h ^= h >>> 13; h *= m; h ^= h >>> 15; buf.order(byteOrder); if (h < 0) { h = Math.abs(h); } return h; } }

测试:

java
public static void main(String[] args) { System.out.println(DatasourceShardingAlgorithmV2.getShardingDatasource(virtualToRealMap, "U0000000001")); }

结果:

Plaintext
datasource-name-1

4.3规则配置Sharding-JDBC 可以通过 Java, YAML, Spring 命名空间 和 Spring Boot Starter 这 4种⽅式进⾏配置,开发者可根据场景选择适合的配置⽅式。当前⽅案使⽤ YAML ⽅式进⾏规则配置,application.ym 内容如下(红⾊部分为⾃⾏扩展内容):

yaml
spring: shardingsphere: # sharding数据源配置 datasource: # 对应下方自定义的数据源名称 names: datasource-name,datasource-name-1,datasource-name-2,datasource-name-3 # 数据源基础配置 datasource-name: # 数据源驱动类型 type: com.alibaba.druid.pool.DruidDataSource # 数据库驱动类 driver-class-name: com.mysql.jdbc.Driver # 数据库连接地址 url: jdbc:mysql://127.0.0.1:3306/datasource_name_0 # 数据库用户名 username: username # 数据库用户密码 password: password # 自定义第二个数据源(新数据源1) datasource-name-1: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/datasource_name_1 username: username password: password # 自定义第三个数据源(新数据源2) datasource-name-2: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/datasource_name_2 username: username password: password # 自定义第四个数据源(新数据源3) datasource-name-3: type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/datasource_name_3 username: username password: password # 采用sharding-jdbc进行分库分表设计 sharding: # 默认数据源 default-data-source-name: datasource-name # oldDatabaseStrategy是自行扩展配置(可选),用于双操作(双查询&双更新&双删除)中获取原有数据源,pom需要对应引入4.1.1.001自行扩展版本 oldDatabaseStrategy: standard: # 分库分片键 shardingColumn: sharding_key # 原有的数据源分片算法,本算法返回的是固定的原有数据源datasource-name preciseAlgorithmClassName: com.yang.sharding.DatasourceShardingAlgorithm # 订单分库新的分片算法,根据sharding_key进行分库 default-database-strategy: standard: # 分库分片键 shardingColumn: sharding_key # 新的分库分片算法 preciseAlgorithmClassName: com.yang.sharding.DatasourceShardingAlgorithmV2 # 分库分表配置,只有分库&分表的表需要在此配置,不分库不分表不需要配置,数据会存入默认数据源 tables: # table_0使用分库+分表 table_0: actual-data-nodes: datasource-name.table_0_$->{0..63},datasource-name-$->{1..3}.table_0_$->{0..63} logic-table: table_0 # 数据库表策略配置 table-strategy: standard: # 表分片键 shardingColumn: sharding_key # 表分片算法 preciseAlgorithmClassName: com.yang.TableShardingAlgorithm # table_1使用分库,不使用分表 table_1: actual-data-nodes: datasource-name.table_1, datasource-name-$->{1..3}.table_1 logic-table: table_1

4.4代码改造准备由于 sharding 是对分⽚键使⽤分⽚算法,获取到对应的数据源进⾏操作,⽽分⽚键是从请求 sql 中提取⽽来,所以需要在代码层⾯改造⽀持分库,所有 CRUD 操作必须要带上分⽚键参数。

sql
<!-- 这里假设数据库是MySQL,分片键是user_id字段 --> <!-- 新增操作 --> INSERT INTO order_info('id', 'user_id', 'update_time') VALUES('O0000000001', 'U0000000001', NOW()); <!-- 查询操作 --> SELECT * FROM order_info WHERE user_id='U0000000001'; <!-- 更新操作 --> UPDATE order_info SET update_time=NOW() WHERE user_id='U0000000001'; <!-- 删除操作 --> DELETE FROM order_info WHERE user_id='U0000000001';

总结

**⾄此,基于 Sharding-JDBC 完整的订单分库改造完成,包含选择分库分⽚键、定义分库数据分⽚算法,添加数据源&表配置,代码改造等。**本⽅案已有实际应⽤案例,如有需要的⼩伙伴可作为参考,有问题也可直接联系我,⼀起讨论学习。

Share Article
wechat qrcode

Scan to Follow
Us on WeChat

ShardingSphere 助力当当 WMS:订单效率提升 30%、节约成本上千万
金鹰系统数据库分库分表策略
Experience the power of our next-gen database engine with a free trial.
Products
SphereEx-DBPlusEngine
SphereEx-Tools
SphereEx-Services
Socials
wechat qrcode

Scan to Follow
Us on WeChat

© 2024 SphereEx. All Rights Reserved.
Privacy PolicyTerms Of UseDisclaimerCookie PolicyDo Not Sell My Personal Information