sjdbc入门到放弃

入门到放弃,不是终点,以后还是放弃到再入门。

从去年写了一个强依赖Spring DataSource事务中的方法命名约束来达到读写分离的分表分库小功能后,这一块一直处于比较简单的使用场景,当场景复杂化之后,将会碰到各种诸如事务、xa、一致性以及扩展等更加实际的问题,sjdbc也是去年开始关注的分表分库客户端,从客户端层面通过parseSql实现读写分离场景并可配置化的分表分库插件,相对基于Spring事务场景用途更加广泛,更加简单易用,最关键的是对分表分库情况下的若干情况作了比较好的兼容,接下来就来慢慢总结并深入一下sjdbc的核心吧。

jdk7和jdk8这两篇总结边写边看边学慢慢折腾了近1个月,总算是对这两个版本的新内容有了比较全面的认识,加上之前深入innodb的总结,翻开之前看过的一些开源开源再深入一下这些功能在核心框架中的使用场景,sjdbc是当当开源的分表分库框架,在这个范畴内除了tddl名气稍大,关注度比较高并且全开源的sjdbc是个不错的选择框架,之后再去看看携程使用springcloud定制的配置化方案Apollo,虽然强依赖了太多携程内部的功能,对于netflix eureka的使用也是springcloud一大功能,至于nio在各种分布式网络中的使用让netty风光无限,接下来或许能在元旦后也能写点什么,大数据的功能延续始终找不到可以深入研究的场景,还是先把事干好再观望之。

前戏

读写分离和分表分库很多场合两个人都在一起缠绵,但这两个却是完全不同的两个功能和概念,实现的手段和要解决的那些问题有着本质上的区别,大概要去解决哪些问题呢?

读写分离

  • 主从配置
  • 一致性
  • 扩展策略
  • 轮训策略

分表分库

  • 分库路由策略
  • 分表路由策略
  • 聚合支持
  • 排序支持
  • 小表设计

读写分离

  1. master-slave-data-source
  2. table-rule

相对说读写分离的开发以及实施部署难度远小于分表分库,所以这里先从读写分离入手。

读写分离由数据库自身去完成主从服务器同步功能,程序需要实现的是在读写场景分别路由至主服务器或从服务器(同一线程内融合读写或特定强一致性场景除外),符合大部分读多写少场景下极大减少主服务器的压力,在一主多从场景下可以通过轮训策略进一步减少从服务器的压力,但是会增加主服务器的同步线程的压力,主从比例具体数量根据实际场景决策,比如1:1、1:2、1:3。

读写分离场景下因为主从复制需要依赖mysql binlog进行同步,存在一定的同步延迟,在特定场景下需要能够强制指定读写全部打到主服务器。

分表分库在一定场景下因为容量的增长导致之前设计的分表分库的策略不足以支撑当前容量需要扩容时,需要将一个从库变主库而不需要再次同步直接热启动,此时再给该主库配置至少一个从库为未来扩容做准备,每次以2的倍数形式进行扩容。

在一主多从场景下,从库的高可用可以使用客户端去控制,依据连接是否存在或者出现故障从连接中去除,但是主服务器如果出现问题则需要依赖数据库去实现从变主的操作,整个客户端再依据这种变化动态变化,整个过程会相对复杂很多。

分表分库

  1. data-sources
  2. table-rule

定义数据源是分表分库客户端最重要的操作,规定在客户端将使用那些数据源作为分库的数据源,同时还需要定义这些数据源的路由策略,是通过用户id还是其他场景。

分表是分库中另外一种扩容的方式,在一个数据源中定义出若干个表减少单张表的数据压力,这里同样需要定义表的路由策略,在数据源的路由策略下补充表的场景,可以是一个用户的另外一个业务属性。

分表分库的高可用出现在数据源的可用性上,路由指向的数据源将是本次业务数据需要写入的点,而不能因为该数据源发生故障写入到其他的数据源上,这是违背策略的行为,所以这里的高可用同读写分离的写操作一样,需要数据库本身实现高可用,同时客户端能够获取到最新的信息实现高可用。

SJdbc源码解读

  1. 准备配置数据
  2. 执行逻辑写读操作,创建逻辑连接并配置物理连接
  3. 准备逻辑连接和上下文数据
  4. 执行连接操作获取路由到物理数据源并获得物理连接
  5. 返回连接数据

1. 准备数据

准备物理的DataSource和对应的连接池Connection,此处准备的数据后面会被作为逻辑连接的方法使用,使用Spring配置作为案例,和实际业务接近并且配置参数一目了然,在无事务或者autocommit=true的情况下的配置如下:

  1. 定义多个物理数据源datasource,定义到schema级别,如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<bean id="dbtbl_0" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${dbtbl_0.driver}"/>
<property name="url" value="${dbtbl_0.url}"/>
<property name="username" value="${dbtbl_0.username}"/>
<property name="password" value="${dbtbl_0.password}"/>
</bean>
<bean id="dbtbl_1" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${dbtbl_1.driver}"/>
<property name="url" value="${dbtbl_1.url}"/>
<property name="username" value="${dbtbl_1.username}"/>
<property name="password" value="${dbtbl_1.password}"/>
</bean>
<bean id="dbtbl_config" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${dbtbl_config.driver}"/>
<property name="url" value="${dbtbl_config.url}"/>
<property name="username" value="${dbtbl_config.username}"/>
<property name="password" value="${dbtbl_config.password}"/>
</bean>

分别代表分库需求的3个物理库

  1. 定义逻辑抽象的数据源ShardingDataSource,将物理数据源按照配置节点依次配置进去,首先定义分库的策略,其次单库的分表策略,最后按照策略对接抽象对象和实际对象的关系,在Spring配置中这些都是通过表达式或者表达式对应的策略定义类去分配的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!--分库的策略 [表达式:用户id按照2取模]-->
<rdb:strategy id="databaseStrategy" sharding-columns="user_id" algorithm-expression="dbtbl_${user_id.longValue() % 2}"/>
<!--分表的策略 [表达式:订单id按照4取模]-->
<rdb:strategy id="orderTableStrategy" sharding-columns="order_id" algorithm-expression="t_order_${order_id.longValue() % 4}"/>
<!--分表的策略 [根据关键key策略的定义类取值]-->
<rdb:strategy id="orderItemTableStrategy" sharding-columns="order_id" algorithm-class="com.dangdang.ddframe.rdb.sharding.example.config.spring.algorithm.SingleKeyModuloTableShardingAlgorithm"/>
<!--定义逻辑关系,定义使用到的物理数据源到抽象数据源-->
<rdb:data-source id="shardingDataSource">
<rdb:sharding-rule data-sources="dbtbl_0,dbtbl_1,dbtbl_config">
<rdb:table-rules>
<!--定义抽象逻辑表和实际表的关系-->
<rdb:table-rule logic-table="t_config" actual-tables="dbtbl_config.t_config"/>
<!--定义抽象逻辑表和实际表的关系-->
<rdb:table-rule logic-table="t_order" actual-tables="dbtbl_${0..1}.t_order_${0..3}" database-strategy="databaseStrategy" table-strategy="orderTableStrategy"/>
<!--定义抽象逻辑表和实际表的关系-->
<rdb:table-rule logic-table="t_order_item" actual-tables="dbtbl_${0..1}.t_order_item_0,dbtbl_${0..1}.t_order_item_1,dbtbl_${0..1}.t_order_item_2,dbtbl_${0..1}.t_order_item_3" database-strategy="databaseStrategy" table-strategy="orderItemTableStrategy"/>
</rdb:table-rules>
<!--定义默认情况下的分库和分表的策略使用,也就是显示定义外的部分-->
<rdb:default-database-strategy sharding-columns="none" algorithm-class="com.dangdang.ddframe.rdb.sharding.api.strategy.database.NoneDatabaseShardingAlgorithm"/>
<rdb:default-table-strategy sharding-columns="none" algorithm-class="com.dangdang.ddframe.rdb.sharding.api.strategy.table.NoneTableShardingAlgorithm"/>
</rdb:sharding-rule>
</rdb:data-source>

上述自定义的xmlschama是根据Spring的schema规范定制的namespace,通过简单的配置即可实现复杂的对应关系,能在Spring容器中更加方便快捷获取到对应的Bean,在Spring中最终定义的Bean如下:

  1. 常用的DataSource,这里用的是org.apache.commons.dbcp.BasicDataSource,平常可能会用TomcaDataSource或者DruidDataSource
  2. 策略类算法ShardingAlgorithm,用于分库或者分表的策略,分别在分库和分表场景对应了其继承类TableShardingAlgorithm,DatabaseShardingAlgorithm,具体到这两种场景使用到的算法使用SingleKeyShardingAlgorithm,MultipleKeysShardingAlgorithm,可以将二者整合实现便可以定义出笛卡尔积个匹配规则:

    • 分库SingleKey
    • 分库MultiKey
    • 分表SingleKey
    • 分表MultiKey
  3. 定义逻辑的数据源shardingDataSource,面向用户接口的都是这个数据源,物理数据源在业务代码中是透明的

  4. 定义分库的规则Rule:DataSourceRule
  5. 定义分表的规则Rule:TableRule,表的规则在库的规则之下,只有路由到确定的库之后才有具体的表路由,所以在表路由中需要配置

    • 逻辑表名,约定的表命名规范的前缀部分
    • 物理实际表名,按照表达式形式的配置
    • 分库的策略算法
    • 分表的策略算法

到此为止整个逻辑过程还算简单,将真实的业务场景通过配置的形式反应在Spring容器中,难点在于不同的实际业务场景通过现有的逻辑或者需要定制开发逻辑将这些逻辑反应在上述的Spring容器中,这里只是举例了单个key和多个key组合的情况,其他更加复杂的需要自己去定制。

2. 执行逻辑写读操作,创建逻辑连接并配置物理连接

在实际的业务代码中最常用的增啥改查操作,在Service层面通常定义成Save,Delete,Update,Select操作,这些操作最终会通过Dao层包装成PrepareStatment反应到数据库的Connection中,这里首先会执行业务的Service代码:

1
2
3
4
5
OrderService service = applicationContext.getBean(OrderService.class);
service.insert();
service.select();
service.delete();
service.select();

Dao层的包装过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void insert() {
String orderSql = "INSERT INTO `t_order` (`order_id`, `user_id`, `status`) VALUES (?, ?, ?)";
String orderItemSql = "INSERT INTO `t_order_item` (`order_item_id`, `order_id`, `user_id`, `status`) VALUES (?, ?, ?, ?)";
for (int orderId = 1; orderId <= 4; orderId++) {
for (int userId = 1; userId <= 2; userId++) {
//从这里开始进入逻辑连接阶段,也就是下面的第三步
try (Connection connection = shardingDataSource.getConnection()) {
PreparedStatement preparedStatement = connection.prepareStatement(orderSql);
preparedStatement.setInt(1, orderId);
preparedStatement.setInt(2, userId);
preparedStatement.setString(3, "insert");
preparedStatement.execute();
preparedStatement.close();
preparedStatement = connection.prepareStatement(orderItemSql);
int orderItemId = orderId + 4;
preparedStatement.setInt(1, orderItemId);
preparedStatement.setInt(2, orderId);
preparedStatement.setInt(3, userId);
preparedStatement.setString(4, "insert");
preparedStatement.execute();
preparedStatement.close();
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
ex.printStackTrace();
}
}
}
}

这个过程在不同的业务中会根据实际使用的ORM框架比如MyBatis/SpringTempalte/Hibernate等进行选型,但是本质上都是一样的。

3. 准备逻辑连接和上下文数据

在单一数据源的简单数据库连接中,会直接从数据库的连接池中获取到对应的链接,在SJdbc框架中,这里的链接使用了继承类ShardingConnection,这个是我认为的SJdbc中最关键核心的一个类,在每一次Dao操作中都会声明一个新的连接。

1
2
3
4
5
6
7
8
9
Connection connection = shardingDataSource.getConnection()
//这里使用了Metrics进行数据跟踪(先忽略),在最后返回了一个新的逻辑连接,并且把整个Spring的配置作为上下文参数传递给了这个连接,也就是在这个连接的声明周期中,Spring容器中的分表分库配置都可以被调用
public ShardingConnection getConnection() throws SQLException {
MetricsContext.init(shardingProperties);
return new ShardingConnection(shardingContext);
}

看一下这个上下文参数都包含了哪些信息,规则信息,路由引擎,执行引擎,到这里为止只用到了规则信息

1
2
3
4
5
6
7
8
9
10
@RequiredArgsConstructor
@Getter
public final class ShardingContext {
private final ShardingRule shardingRule;
private final SQLRouteEngine sqlRouteEngine;
private final ExecutorEngine executorEngine;
}

继续执行,insert操作,获取准备的包装的PrepareStatment,

1
2
3
4
5
6
7
8
PreparedStatement preparedStatement = connection.prepareStatement(orderSql);
//这里返回了一个新的包装的PrepareStatment类型,将上面得到的逻辑连接赋予了这个类
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}

给上面的对象赋予业务中的数据,入库的数据就是这些了

1
2
3
preparedStatement.setInt(1, orderId);
preparedStatement.setInt(2, userId);
preparedStatement.setString(3, "insert");

到这里为止,所有的数据都已经准备完成,只欠一声令下了。

4. 执行连接操作获取路由到物理数据源并获得物理连接

1
preparedStatement.execute();

从这里开始就进入到Sjdbc作为客户端框架的内核了,这里将会做两件灰常重要的事情

  • 分析sql内容并根据路由规则计算最终的物理数据源和表
  • 获得物理数据源的连接池连接执行sql语句

由之前的代码可以得知preparedStatement对象已经包含了上述所有的物理数据库、上下文路由规则等信息,preparedStatement .execute()方法将会执行哪些呢,还需要看一下execute源码

1
2
//这里创建了一个执行对象,并且构造了执行引擎和路由数据,最后再做了一个执行动作
new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute();

具体深入上述的代码:

  • 构造执行引擎
  • 构造路由数据
  • 执行对象

    执行引擎从前面可以看到是上下文的一个成员变量,所以这里获取到的执行引擎应该是在上下文创建时就已经声明了的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//节选ShardingDataSource代码,可以看到执行引擎包含的信息
executorEngine = new ExecutorEngine(shardingProperties);
try {
shardingContext = new ShardingContext(shardingRule, new SQLRouteEngine(shardingRule, DatabaseType.valueFrom(getDatabaseProductName(shardingRule))), executorEngine);
} catch (final SQLException ex) {
throw new ShardingJdbcException(ex);
}
//再回到执行引擎本身
public ExecutorEngine(final ShardingProperties shardingProperties) {
int executorMinIdleSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MIN_IDLE_SIZE);
int executorMaxSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_SIZE);
long executorMaxIdleTimeoutMilliseconds = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_MAX_IDLE_TIMEOUT_MILLISECONDS);
executorService = MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(executorMinIdleSize, executorMaxSize, executorMaxIdleTimeoutMilliseconds, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())));
}

构造路由信息在PrePareStatment中的routeSql代码返回,最后返回了一个PreparedStatementExecutorWrapper的列表,可以看的出来,一个语句最终可能会路由到多个数据源的多个表中,所以这里通过这个方法全部返回了,具体是如何分析得出这个结果呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
//这里得到了所有的路由信息,这个方法用到了preparedSQLRouter,这是什么呢?看下面
SQLRouteResult sqlRouteResult = preparedSQLRouter.route(getParameters());
MergeContext mergeContext = sqlRouteResult.getMergeContext();
setMergeContext(mergeContext);
setGeneratedKeyContext(sqlRouteResult.getGeneratedKeyContext());
for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {
PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());
replayMethodsInvocation(preparedStatement);
getParameters().replayMethodsInvocation(preparedStatement);
result.add(wrap(preparedStatement, each));
}
return result;
}

在ShardingPreparedStatement初始化的时候这个成员变量就已经生成了,从路由引擎中的数据准备中获取到,这个对象包含哪些信息呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
preparedSQLRouter = shardingConnection.getShardingContext().getSqlRouteEngine().prepareSQL(sql);
private final String logicSql;
private final SQLRouteEngine engine;
private final ShardingRule shardingRule;
private SQLParsedResult sqlParsedResult;
private Optional<TableRule> tableRuleOptional;
/**
* 使用参数进行SQL路由.
* 当第一次路由时进行SQL解析,之后的路由复用第一次的解析结果.
*
* @param parameters SQL中的参数
* @return 路由结果
*/
public SQLRouteResult route(final List<Object> parameters) {
if (null == sqlParsedResult) {
sqlParsedResult = engine.parseSQL(logicSql, parameters);
tableRuleOptional = shardingRule.tryFindTableRule(sqlParsedResult.getRouteContext().getTables().iterator().next().getName());
} else {
generateId(parameters);
for (ConditionContext each : sqlParsedResult.getConditionContexts()) {
each.setNewConditionValue(parameters);
}
}
return engine.routeSQL(sqlParsedResult, parameters);
}
private void generateId(final List<Object> parameters) {
if (!tableRuleOptional.isPresent()) {
return;
}
TableRule tableRule = tableRuleOptional.get();
GeneratedKeyContext generatedKeyContext = sqlParsedResult.getGeneratedKeyContext();
for (String each : generatedKeyContext.getColumns()) {
Object id = tableRule.generateId(each);
parameters.add(id);
generatedKeyContext.putValue(each, id);
}
}

上面的route方法就返回了对应的全部路由信息,这里都依赖了一个非常重要的核心解析算法:parseSql,分析sql得到最后真实的物理数据源及表数据,在这里读写分离的动作也一并包含在这个过程中,只不过读写分离在这里还需要另外增加单线程同一数据库的限制要求以及强制主库等行为支持。

1
2
sqlParsedResult = engine.parseSQL(logicSql, parameters);
tableRuleOptional = shardingRule.tryFindTableRule(sqlParsedResult.getRouteContext().getTables().iterator().next().getName());

最后执行做了哪些事情呢?再看一下execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 执行SQL请求.
*
* @return true表示执行DQL, false表示执行的DML
*/
public boolean execute() {
Context context = MetricsContext.start("ShardingPreparedStatement-execute");
eventPostman.postExecutionEvents();
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
try {
//前面的预测没有问题,执行原生方法executeInternal去执行了从获取到的物理路由列表信息
if (1 == preparedStatementExecutorWrappers.size()) {
PreparedStatementExecutorWrapper preparedStatementExecutorWrapper = preparedStatementExecutorWrappers.iterator().next();
return executeInternal(preparedStatementExecutorWrapper, isExceptionThrown, dataMap);
}
List<Boolean> result = executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, Boolean>() {
@Override
public Boolean execute(final PreparedStatementExecutorWrapper input) throws Exception {
synchronized (input.getPreparedStatement().getConnection()) {
return executeInternal(input, isExceptionThrown, dataMap);
}
}
});
return (null == result || result.isEmpty()) ? false : result.get(0);
} finally {
MetricsContext.stop(context);
}
}

看看原生执行方法做了那些事情,包装对象最后恢复原形

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private boolean executeInternal(final PreparedStatementExecutorWrapper preparedStatementExecutorWrapper,
final boolean isExceptionThrown, final Map<String, Object> dataMap) {
boolean result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
//从包装类中拿到原生的对象,最后对该对象执行execute操作
result = preparedStatementExecutorWrapper.getPreparedStatement().execute();
} catch (final SQLException ex) {
eventPostman.postExecutionEventsAfterExecution(preparedStatementExecutorWrapper, EventExecutionType.EXECUTE_FAILURE, Optional.of(ex));
ExecutorExceptionHandler.handleException(ex);
return false;
}
eventPostman.postExecutionEventsAfterExecution(preparedStatementExecutorWrapper);
return result;
}

5. 返回连接数据

从执行后运行了整个框架的计算逻辑,最后回到这里返回了一个boolean值表示执行是否成功,在增删改查4个场景中会返回不同的结果,这里以其中一个举例,在返回的过程中会根据实际情况比如返回的要求返回不同的结果,比如insert返回主键在自增场景下。

1
2
3
4
5
6
7
public boolean execute() throws SQLException {
try {
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute();
} finally {
clearRouteContext();
}
}

事务

前面的整个流程都是基于autocommit=true的场景,但是实际业务中可没有这简单,在ACID要求比较高的场景中务必需要加入事务对数据进行控制,在单一数据源中可以直接使用数据库事务进行业务上的隔离,但是在多数据源时xa事务就显得比较棘手,实现上远没有那么简单,SJdbc在当前这个版本通过柔性事务最大努力送达的方式实现最终一致性,计划中会增加TCC类型的一致性方案,这里的最终一致性都是补偿形式的方法,也就是后期通过补偿获得一致性,在某些时间段内可能会存在不一致性的情况。

柔性事务使用通知或者定时的方法在事务后对之前的操作进行一个一致性补充,具体是如何做到的呢?通过一个案例来描述吧。

图片

官方定义的内容:

适用场景

  1. 根据主键删除数据。
  2. 更新记录永久状态,如更新通知送达状态。

使用限制

使用最大努力送达型柔性事务的SQL需要满足幂等性。

  1. INSERT语句要求必须包含主键,且不能是自增主键。
  2. UPDATE语句要求幂等,不能是UPDATE xxx SET x=x+1
  3. DELETE语句无要求。