背景 {#%E8%83%8C%E6%99%AF}
最近入职了一家新公司,发现新公司很喜欢用MyBatis-Plus(后面简称MP)工具,并且多数据源也是基于MP提供的多数据源能力,只需要在配置文件中定义好多数据源,然后在Mapper上使用@DS注解标注数据源的名称即可。
配置文件示例:
# DataSource Config
spring:
datasource:
dynamic:
primary: db0
# 是否启用严格模式,默认不启动. 严格模式下未匹配到数据源直接报错, 非严格模式下则使用默认数据源primary所设置的数据源
strict: true
datasource:
db0:
driver-class-name: org.h2.Driver
username: root
password: test
url: jdbc:h2:mem:testdb0;MODE=MYSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
schema: classpath:db0/schema-user.sql
data: classpath:db0/data-user.sql
db1:
driver-class-name: org.h2.Driver
username: root
password: test
url: jdbc:h2:mem:testdb1;MODE=MYSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
schema: classpath:db1/schema-address.sql
data: classpath:db1/data-address.sql
Mapper文件示例
@DS("db1")
public interface AddressMapper extends BaseMapper<Address> {
`}`
调用时即可基于@DS注解上的名称自动切换对应的数据源。
然而在自测一个加了事务的更新接口时却报错说找不到对应的表,我感觉非常奇怪,刚刚自测的另一个查询接口好好的,怎么在这个接口就报错表不存在了?
结论先行 {#%E7%BB%93%E8%AE%BA%E5%85%88%E8%A1%8C}
先说结论,这个问题的原因是由于更新的接口在service类的入口处添加了Spring提供的@Transactional注解。
MP提供的多数据源能力的大致原理是,通过一个切面拦截标注了@DS的方法,在切面中将@DS注解的数据源名称放入一个ThreadLocal类(DynamicDataSourceContextHolder)。
然后实现了一个自定义的数据源对象DynamicRoutingDataSource,当获取数据库连接时,会从DynamicDataSourceContextHolder中拿到数据源名称,并且返回对应数据源名称的连接。
没有使用事务的情况下,每次操作数据库都会去获取一次连接,每次获取连接都可以正常拿到Mapper上@DS的数据源名称,而使用了@Transactional之后,在开始执行真正的方法之前先开启一个事务,并且事务所使用的数据源是默认数据源。
排查过程 {#%E6%8E%92%E6%9F%A5%E8%BF%87%E7%A8%8B}
首先通过debug看了下Mybatis-plus动态数据源实现原理:
-
DynamicDataSourceAnnotationInterceptor拦截@DS注解,并将动态数据源名称放入ThreadLocal(DynamicDataSourceContextHolder.push(dsKey))
-
系统中的数据源由mybatis-plus提供的DynamicRoutingDataSource数据源接管,其中存储了多个数据源名称和对应的数据源配置,当调用DataSource.getConnection()时,实际上调用了DynamicRoutingDataSource.getDataSource(String dsKey).getConnection()方法,
-
当Mybatis获取SqlSession时,会调用到DynamicRoutingDataSource.getDatasource()方法获取对应的数据源,在getDatasource()中会读取DynamicDataSourceContextHolder中的动态数据源名称,并获取对应的数据源,如果找不到则获取标记为primiry的数据源。
那为什么使用了@Transaction之后会导致动态数据源失效呢?
如果配置了@Transaction注解,那么就会执行到TransactionInterceptor拦截器。
在TransactionInterceptor拦截器中,在启动事务时,会创建Transaction对象,并且在这之后会创建一个连接对象Connection。
问题在于,由于动态数据源时配置在Mapper接口上,而事务注解一般是放在Service类上,所以在在创建这个连接对象时,并没有经过DynamicDataSourceAnnotationInterceptor拦截器并将数据源信息放入DynamicDataSourceContextHolder中,所以此时获取的连接对象是标记为了primary的数据源。
当TransactionInterceptor拦截器执行完后,事务和连接对象都创建完毕,在这个事务期间调用非primary的数据源的Mapper查询时,会触发获取DefaultSqlSessionFactory#openSessionFromDataSource获取SqlSession,一路debug后最后走到DataSourceUtils#doGetConnection,在这个方法中有个判断,如果当前已经在一个事务中,则会先判断是否已经有现成的connection对象,如果有就直接复用。也就是说,即使后面的查询配置了@DS动态数据源的配置,其查询使用的仍然是在事务创建时就生成的connection对象。
解决办法 {#%E8%A7%A3%E5%86%B3%E5%8A%9E%E6%B3%95}
那如果就是要使用MP的多数据源时使用事务,有没有什么解决办法呢?
方法一:将@DS注解放在@Transaction相同的地方 {#%E6%96%B9%E6%B3%95%E4%B8%80%EF%BC%9A%E5%B0%86%40ds%E6%B3%A8%E8%A7%A3%E6%94%BE%E5%9C%A8%40transaction%E7%9B%B8%E5%90%8C%E7%9A%84%E5%9C%B0%E6%96%B9}
在前面debug的结果可以知道,多数据源失效的原因在于只获取了一次数据库连接,并且在@Transaction注解的入口处没有正确获取到动态数据源名称,针对这两个问题可以将默认的事务传播行为更改为REQUIRES_NEW,从而再次触发获取数据库连接,并且在有@Transaction注解的方法上也添加上动态数据源的注解。如下图:
再次调用debug发现,当要进入handleSetAward方法时,会先由DynamicDataSourceAnnotationInterceptor拦截将@DS的动态数据源名称保存到DynamicDataSourceContextHolder中。
同时由于有@Transaction注解,会再次进入到事务拦截器中,并且会判断是否已经存在事务,如果已经存在事务,并且事务传播行为为REQUIRES_NEW,则会调用startTransaction创建一个新的事务。
此时再创建事务,并且创建connection对象时,可以拿到动态数据库名称,也就能拿到正确的对应的动态数据源了。
不过这个方案非常不优雅,也不推荐。
方法二:使用MP提供的@DSTransactional注解 {#%E6%96%B9%E6%B3%95%E4%BA%8C%EF%BC%9A%E4%BD%BF%E7%94%A8mp%E6%8F%90%E4%BE%9B%E7%9A%84%40dstransactional%E6%B3%A8%E8%A7%A3}
使用@DSTransactional很简单,只需要将最外层使用@Transactional的地方替换成@DSTransactional即可。
使用虽简单,但是本着格物致知的精神,我们来看一下它的实现原理。
首先使用IDEA看看哪里有引用到这个注解,发现并没有哪里有引用,使用全局搜索发现在DynamicDataSourceAutoConfiguration类中有使用。
可以看到在这个配置类中定义了一个切面,拦截所有有DSTransactional注解的方法,并且在DynamicLocalTransactionAdvisor中执行拦截的逻辑。
再次进入DynamicLocalTransactionAdvisor类中,代码如下:
@Slf4j
public class DynamicLocalTransactionAdvisor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
// 如果有xid则直接目标方法
if (!StringUtils.isEmpty(TransactionContext.getXID())) {
return methodInvocation.proceed();
}
boolean state = true;
Object o;
String xid = UUID.randomUUID().toString();
// 如果没有xid,则生成一个,并且将xid放入ThreadLocal(bind方法中就是调用ThreadLocal的set方法)
TransactionContext.bind(xid);
try {
o = methodInvocation.proceed();
} catch (Exception e) {
// 目标方法报错,将state置为false
state = false;
throw e;
} finally {
// 执行ConnectionFactory的notify方法
ConnectionFactory.notify(state);
// 移除xid
TransactionContext.remove();
}
return o;
}
`}`
在这个切面中,首先判断TransactionContext中是否有xid,如果有xid则直接执行目标方法,如果没有xid,则生成一个新的xid,并调用TransactionContext.bind(xid)存入上下文。
如果目标方法抛出了异常,则将state更改false,最终执行ConnectionFactory.notify(state)并移除上下文中的xid。
这个方法中没有对事务的操作,重点应该在ConnectionFactory.notify(state)方法中,继续进入ConnectionFactory.notify方法。
public class ConnectionFactory {
private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =
new ThreadLocal<Map<String, ConnectionProxy>>() {
@Override
protected Map<String, ConnectionProxy> initialValue() {
return new ConcurrentHashMap<>();
}
};
/**
* 将连接对象的autoCommit设置为false,并存入上下文。key是动态数据源名称
*/
public static void putConnection(String ds, ConnectionProxy connection) {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
if (!concurrentHashMap.containsKey(ds)) {
try {
connection.setAutoCommit(false);
} catch (SQLException e) {
e.printStackTrace();
}
concurrentHashMap.put(ds, connection);
}
}
/**
* 获取数据库连接,ds是动态数据源名称
*/
public static ConnectionProxy getConnection(String ds) {
return CONNECTION_HOLDER.get().get(ds);
}
/**
* DSTransactional事务执行完成后执行的通知方法
*/
public static void notify(Boolean state) {
try {
Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
// 遍历当前上下文中的所有连接代理对象
for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
// 调用连接代理对象的notify方法
connectionProxy.notify(state);
}
} finally {
CONNECTION_HOLDER.remove();
}
}
`}`
继续进入connectionProxy.notify方法
@Slf4j
public class ConnectionProxy implements Connection {
private Connection connection;
private String ds;
public ConnectionProxy(Connection connection, String ds) {
this.connection = connection;
this.ds = ds;
}
public void notify(Boolean commit) {
try {
// 前面传入的state,如果是true则提交事务,如果为false则会滚事务
if (commit) {
connection.commit();
} else {
connection.rollback();
}
connection.close();
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
}
}
`}`
到这里,已经大致清楚多数据源的提交和回滚是如何实现的了,那事务的开启是什么时候呢?另外这个ConnectionProxy是MP提供的类,这个类是什么时候创建的呢。
在前面追溯源码的过程中注意到的ConnectionFactory类中有个putConnection方法,在putConnection中将autoCommit设置为了false,那直接打个断点debug一下。
通过debug堆栈信息很快找到了调用的是AbstractRoutingDataSource类的getConnectionProxy方法,在这个方法中创建了一个ConnectionProxy类对原connection进行了一层包装。
通过堆栈继续往上追溯,发现了熟悉的getConnection方法,这个方法正是在排查过程中介绍MP多数据源实现原理的DynamicRoutingDataSource类的父类的方法。画个类图看一下:
具体来看下AbstractRoutingDataSource的getConnection方法:
public abstract class AbstractRoutingDataSource extends AbstractDataSource {
protected abstract DataSource determineDataSource();
@Override
public Connection getConnection() throws SQLException {
String xid = TransactionContext.getXID();
// 如果当前上下文没有xid,则调用子类的determineDataSource()获取数据源,然后获取数据库连接
if (StringUtils.isEmpty(xid)) {
return determineDataSource().getConnection();
}
// 如果上下文有xid
else {
String ds = DynamicDataSourceContextHolder.peek();
// 获取上下文中的动态数据源名称
ds = StringUtils.isEmpty(ds) ? "default" : ds;
// 从先看看这个数据源有没有创建过
ConnectionProxy connection = ConnectionFactory.getConnection(ds);
// 如果没有创建过则创建一个新的数据库连接,并使用ConnectionProxy进行包装
return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
}
}
`}`
到这里,整个DSTransactional的流程大概了解了,总结一下:
-
在DynamicLocalTransactionAdvisor中拦截DSTransactional注解,并生成一个xid保存在上下文中
-
当进行数据库操作需要获取连接对象时,实际调用的时DynamicRoutingDataSource的父类AbstractRoutingDataSource的getConnection方法
-
如果上下文存在xid,则获取上下文保存的动态数据源名称,并根据这个名称先看看该数据源有没有现成的连接对象,如果没有,则创建一个新的连接对象,并用ConnectionProxy进行包装,随后将ConnectionProxy放入上下文中,并将连接对象的autoCommit设置为false。
-
当事务方法执行完后,执行DynamicLocalTransactionAdvisor中的finall代码,将上下文中的所有连接对象进行提交或者回滚。
注意点:
看到这里,不知道你有没有注意到不管是多数据源还是DSTransactional实现的事务都是在getConnection中实现的,也就是说必须要调用getConnection方法才能使多数据源或者DSTransactional实现的事务生效,如果在DSTransactional外层的方法中有Spring的@Transactional注解会发生什么呢?
没错,多数据源以及DSTransactional事务都会失效,所以日常使用的时候一定要注意。