51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spring事务注解@Transactional导致mybatis-plus动态数据源@DS失效

背景 {#%E8%83%8C%E6%99%AF}

最近入职了一家新公司,发现新公司很喜欢用MyBatis-Plus(后面简称MP)工具,并且多数据源也是基于MP提供的多数据源能力,只需要在配置文件中定义好多数据源,然后在Mapper上使用@DS注解标注数据源的名称即可。

配置文件示例:

# DataSource Config
spring:
  datasource:
    dynamic:
      primary: db0
      # 是否启用严格模式,默认不启动. 严格模式下未匹配到数据源直接报错, 非严格模式下则使用默认数据源primary所设置的数据源
      strict: true
      datasource:
        db0:
          driver-class-name: org.h2.Driver
          username: root
          password: test
          url: jdbc:h2:mem:testdb0;MODE=MYSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
          schema: classpath:db0/schema-user.sql
          data: classpath:db0/data-user.sql
        db1:
          driver-class-name: org.h2.Driver
          username: root
          password: test
          url: jdbc:h2:mem:testdb1;MODE=MYSQL;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false
          schema: classpath:db1/schema-address.sql
          data: classpath:db1/data-address.sql

Mapper文件示例

@DS("db1")
public interface AddressMapper extends BaseMapper<Address> {
`}`

调用时即可基于@DS注解上的名称自动切换对应的数据源。


然而在自测一个加了事务的更新接口时却报错说找不到对应的表,我感觉非常奇怪,刚刚自测的另一个查询接口好好的,怎么在这个接口就报错表不存在了?

结论先行 {#%E7%BB%93%E8%AE%BA%E5%85%88%E8%A1%8C}

先说结论,这个问题的原因是由于更新的接口在service类的入口处添加了Spring提供的@Transactional注解。

MP提供的多数据源能力的大致原理是,通过一个切面拦截标注了@DS的方法,在切面中将@DS注解的数据源名称放入一个ThreadLocal类(DynamicDataSourceContextHolder)。

然后实现了一个自定义的数据源对象DynamicRoutingDataSource,当获取数据库连接时,会从DynamicDataSourceContextHolder中拿到数据源名称,并且返回对应数据源名称的连接。

没有使用事务的情况下,每次操作数据库都会去获取一次连接,每次获取连接都可以正常拿到Mapper上@DS的数据源名称,而使用了@Transactional之后,在开始执行真正的方法之前先开启一个事务,并且事务所使用的数据源是默认数据源。


排查过程 {#%E6%8E%92%E6%9F%A5%E8%BF%87%E7%A8%8B}

首先通过debug看了下Mybatis-plus动态数据源实现原理:

  1. DynamicDataSourceAnnotationInterceptor拦截@DS注解,并将动态数据源名称放入ThreadLocal(DynamicDataSourceContextHolder.push(dsKey))

  2. 系统中的数据源由mybatis-plus提供的DynamicRoutingDataSource数据源接管,其中存储了多个数据源名称和对应的数据源配置,当调用DataSource.getConnection()时,实际上调用了DynamicRoutingDataSource.getDataSource(String dsKey).getConnection()方法,

  3. 当Mybatis获取SqlSession时,会调用到DynamicRoutingDataSource.getDatasource()方法获取对应的数据源,在getDatasource()中会读取DynamicDataSourceContextHolder中的动态数据源名称,并获取对应的数据源,如果找不到则获取标记为primiry的数据源。


那为什么使用了@Transaction之后会导致动态数据源失效呢?

如果配置了@Transaction注解,那么就会执行到TransactionInterceptor拦截器。

在TransactionInterceptor拦截器中,在启动事务时,会创建Transaction对象,并且在这之后会创建一个连接对象Connection。

问题在于,由于动态数据源时配置在Mapper接口上,而事务注解一般是放在Service类上,所以在在创建这个连接对象时,并没有经过DynamicDataSourceAnnotationInterceptor拦截器并将数据源信息放入DynamicDataSourceContextHolder中,所以此时获取的连接对象是标记为了primary的数据源。


当TransactionInterceptor拦截器执行完后,事务和连接对象都创建完毕,在这个事务期间调用非primary的数据源的Mapper查询时,会触发获取DefaultSqlSessionFactory#openSessionFromDataSource获取SqlSession,一路debug后最后走到DataSourceUtils#doGetConnection,在这个方法中有个判断,如果当前已经在一个事务中,则会先判断是否已经有现成的connection对象,如果有就直接复用。也就是说,即使后面的查询配置了@DS动态数据源的配置,其查询使用的仍然是在事务创建时就生成的connection对象。



解决办法 {#%E8%A7%A3%E5%86%B3%E5%8A%9E%E6%B3%95}

那如果就是要使用MP的多数据源时使用事务,有没有什么解决办法呢?

方法一:将@DS注解放在@Transaction相同的地方 {#%E6%96%B9%E6%B3%95%E4%B8%80%EF%BC%9A%E5%B0%86%40ds%E6%B3%A8%E8%A7%A3%E6%94%BE%E5%9C%A8%40transaction%E7%9B%B8%E5%90%8C%E7%9A%84%E5%9C%B0%E6%96%B9}

在前面debug的结果可以知道,多数据源失效的原因在于只获取了一次数据库连接,并且在@Transaction注解的入口处没有正确获取到动态数据源名称,针对这两个问题可以将默认的事务传播行为更改为REQUIRES_NEW,从而再次触发获取数据库连接,并且在有@Transaction注解的方法上也添加上动态数据源的注解。如下图:

再次调用debug发现,当要进入handleSetAward方法时,会先由DynamicDataSourceAnnotationInterceptor拦截将@DS的动态数据源名称保存到DynamicDataSourceContextHolder中。

同时由于有@Transaction注解,会再次进入到事务拦截器中,并且会判断是否已经存在事务,如果已经存在事务,并且事务传播行为为REQUIRES_NEW,则会调用startTransaction创建一个新的事务。

此时再创建事务,并且创建connection对象时,可以拿到动态数据库名称,也就能拿到正确的对应的动态数据源了。

不过这个方案非常不优雅,也不推荐。

方法二:使用MP提供的@DSTransactional注解 {#%E6%96%B9%E6%B3%95%E4%BA%8C%EF%BC%9A%E4%BD%BF%E7%94%A8mp%E6%8F%90%E4%BE%9B%E7%9A%84%40dstransactional%E6%B3%A8%E8%A7%A3}

使用@DSTransactional很简单,只需要将最外层使用@Transactional的地方替换成@DSTransactional即可。

使用虽简单,但是本着格物致知的精神,我们来看一下它的实现原理。


首先使用IDEA看看哪里有引用到这个注解,发现并没有哪里有引用,使用全局搜索发现在DynamicDataSourceAutoConfiguration类中有使用。

可以看到在这个配置类中定义了一个切面,拦截所有有DSTransactional注解的方法,并且在DynamicLocalTransactionAdvisor中执行拦截的逻辑。

再次进入DynamicLocalTransactionAdvisor类中,代码如下:

@Slf4j
public class DynamicLocalTransactionAdvisor implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation methodInvocation) throws Throwable {
    // 如果有xid则直接目标方法
    if (!StringUtils.isEmpty(TransactionContext.getXID())) {
        return methodInvocation.proceed();
    }
    boolean state = true;
    Object o;
    String xid = UUID.randomUUID().toString();
    // 如果没有xid,则生成一个,并且将xid放入ThreadLocal(bind方法中就是调用ThreadLocal的set方法)
    TransactionContext.bind(xid);
    try {
        o = methodInvocation.proceed();
    } catch (Exception e) {
        // 目标方法报错,将state置为false
        state = false;
        throw e;
    } finally {
        // 执行ConnectionFactory的notify方法
        ConnectionFactory.notify(state);
        // 移除xid
        TransactionContext.remove();
    }
    return o;
}

}


在这个切面中,首先判断TransactionContext中是否有xid,如果有xid则直接执行目标方法,如果没有xid,则生成一个新的xid,并调用TransactionContext.bind(xid)存入上下文。

如果目标方法抛出了异常,则将state更改false,最终执行ConnectionFactory.notify(state)并移除上下文中的xid。

这个方法中没有对事务的操作,重点应该在ConnectionFactory.notify(state)方法中,继续进入ConnectionFactory.notify方法。

public class ConnectionFactory {
private static final ThreadLocal&amp;lt;Map&amp;lt;String, ConnectionProxy&amp;gt;&amp;gt; CONNECTION_HOLDER =
        new ThreadLocal&amp;lt;Map&amp;lt;String, ConnectionProxy&amp;gt;&amp;gt;() {
            @Override
            protected Map&amp;lt;String, ConnectionProxy&amp;gt; initialValue() {
                return new ConcurrentHashMap&amp;lt;&amp;gt;();
            }
        };

/**
* 将连接对象的autoCommit设置为false,并存入上下文。key是动态数据源名称
*/
public static void putConnection(String ds, ConnectionProxy connection) {
    Map&amp;lt;String, ConnectionProxy&amp;gt; concurrentHashMap = CONNECTION_HOLDER.get();
    if (!concurrentHashMap.containsKey(ds)) {
        try {
            connection.setAutoCommit(false);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        concurrentHashMap.put(ds, connection);
    }
}

/**
* 获取数据库连接,ds是动态数据源名称
*/
public static ConnectionProxy getConnection(String ds) {
    return CONNECTION_HOLDER.get().get(ds);
}

/**
* DSTransactional事务执行完成后执行的通知方法
*/
public static void notify(Boolean state) {
    try {
        Map&amp;lt;String, ConnectionProxy&amp;gt; concurrentHashMap = CONNECTION_HOLDER.get();
        // 遍历当前上下文中的所有连接代理对象
        for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
            // 调用连接代理对象的notify方法
            connectionProxy.notify(state);
        }
    } finally {
        CONNECTION_HOLDER.remove();
    }
}

}


继续进入connectionProxy.notify方法

@Slf4j
public class ConnectionProxy implements Connection {
private Connection connection;

private String ds;

public ConnectionProxy(Connection connection, String ds) {
    this.connection = connection;
    this.ds = ds;
}

public void notify(Boolean commit) {
    try {
        // 前面传入的state,如果是true则提交事务,如果为false则会滚事务
        if (commit) {
            connection.commit();
        } else {
            connection.rollback();
        }
        connection.close();
    } catch (Exception e) {
        log.error(e.getLocalizedMessage(), e);
    }
}

}


到这里,已经大致清楚多数据源的提交和回滚是如何实现的了,那事务的开启是什么时候呢?另外这个ConnectionProxy是MP提供的类,这个类是什么时候创建的呢。

在前面追溯源码的过程中注意到的ConnectionFactory类中有个putConnection方法,在putConnection中将autoCommit设置为了false,那直接打个断点debug一下。

通过debug堆栈信息很快找到了调用的是AbstractRoutingDataSource类的getConnectionProxy方法,在这个方法中创建了一个ConnectionProxy类对原connection进行了一层包装。

通过堆栈继续往上追溯,发现了熟悉的getConnection方法,这个方法正是在排查过程中介绍MP多数据源实现原理的DynamicRoutingDataSource类的父类的方法。画个类图看一下:

具体来看下AbstractRoutingDataSource的getConnection方法:

public abstract class AbstractRoutingDataSource extends AbstractDataSource {
    protected abstract DataSource determineDataSource();
@Override
public Connection getConnection() throws SQLException {
    String xid = TransactionContext.getXID();
    // 如果当前上下文没有xid,则调用子类的determineDataSource()获取数据源,然后获取数据库连接
    if (StringUtils.isEmpty(xid)) {
        return determineDataSource().getConnection();
    }
    // 如果上下文有xid 
    else {
        String ds = DynamicDataSourceContextHolder.peek();
        // 获取上下文中的动态数据源名称
        ds = StringUtils.isEmpty(ds) ? &quot;default&quot; : ds;
        // 从先看看这个数据源有没有创建过
        ConnectionProxy connection = ConnectionFactory.getConnection(ds);
        // 如果没有创建过则创建一个新的数据库连接,并使用ConnectionProxy进行包装
        return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
    }
}

}



到这里,整个DSTransactional的流程大概了解了,总结一下:

  1. 在DynamicLocalTransactionAdvisor中拦截DSTransactional注解,并生成一个xid保存在上下文中

  2. 当进行数据库操作需要获取连接对象时,实际调用的时DynamicRoutingDataSource的父类AbstractRoutingDataSource的getConnection方法

  3. 如果上下文存在xid,则获取上下文保存的动态数据源名称,并根据这个名称先看看该数据源有没有现成的连接对象,如果没有,则创建一个新的连接对象,并用ConnectionProxy进行包装,随后将ConnectionProxy放入上下文中,并将连接对象的autoCommit设置为false。

  4. 当事务方法执行完后,执行DynamicLocalTransactionAdvisor中的finall代码,将上下文中的所有连接对象进行提交或者回滚。


注意点:

看到这里,不知道你有没有注意到不管是多数据源还是DSTransactional实现的事务都是在getConnection中实现的,也就是说必须要调用getConnection方法才能使多数据源或者DSTransactional实现的事务生效,如果在DSTransactional外层的方法中有Spring的@Transactional注解会发生什么呢?

没错,多数据源以及DSTransactional事务都会失效,所以日常使用的时候一定要注意。


赞(1)
未经允许不得转载:工具盒子 » Spring事务注解@Transactional导致mybatis-plus动态数据源@DS失效