基于Spring Event,实现同步转异步,解决定时任务扫表导致数据库连接池不够的问题 {#基于spring-event实现同步转异步解决定时任务扫表导致数据库连接池不够的问题}
背景 {#背景}
有这样一个业务场景,作为一个金融产品,很多用户会在借款后发生逾期,每一笔逾期都是一笔单独的借据,在贷后催收环节中,需要基于用户维度做聚合,把多笔借据合并成一个案件进行统一的催收。
那么就需要一个把多笔借据合并成一个案件的操作。最开始采用的方案就是定时任务扫表,每天早上凌晨5-8点之间进行定时任务扫表,然后进行案件的合并。
但是随着业务量的增多,扫表经常会扫不完,于是业务上为了提效,把定时任务改为分布式任务,借助多实例进行批量扫表。
但是这样做就导致数据库扛不住了,数据库的连接池经常在跑任务的时候被打满。于是就需要想办法解决这个问题。
技术选型 {#技术选型}
关于这个问题,有挺多方案的。
首先就是可以选择分库分表,把原来的单独分成多个库,这样整体的连接数就多了,也就可以扛得住并发扫表了。但是这个方案比较重,分表后也会带来一系列问题。
于是考虑了另外一种方案,那就是基于"同步转异步"的思想,在借据生成的时候,就进行合并,而不是定时任务批量合并。
这样就可以把集中地流量分散到每一条借据生成的过程中,而且这个过程允许失败,一旦失败了,通过定时任务补偿即可。
但是这么做就会导致借据生成这部分逻辑很复杂,需要考虑到合并案件的事情,耦合性太深了。于是就基于Spring Event,把借据生成和案件合并进行解耦。
所以,整体方案就是基于Spring Event,实现同步转异步,解决定时任务扫表导致数据库连接池不够的问题。
在方案改造前,每次扫表需要处理的数据量有20万条,改造后,只需要1000左右的数据量需要扫表处理,大大提升系统的可用性。
你做了什么 {#你做了什么}
在借据生成的方法中,增加一个事件发送:
protected BaseManageResponse genenrateLoan(LoanGenerateEvent loanGenerateEvent) {
BaseManageResponse manageResponse = new BaseManageResponse();
<span class="hljs-keyword">try</span> {
<span class="hljs-comment">// 开启事务</span>
<span class="hljs-keyword">return</span> transactionTemplate.execute(transactionStatus -> {
<span class="hljs-comment">// 核心逻辑执行</span>
doGenerateLoan(loanGenerateEvent);
<span class="hljs-comment">// 发送一个案件入催完成的事件</span>
<span class="hljs-keyword">try</span> {
applicationContext.publishEvent(<span class="hljs-keyword">new</span> <span class="hljs-title class_">CaseStartFinishEvent</span>(loanGenerateEven));
} <span class="hljs-keyword">catch</span> (Exception e) {
LOG.warn(<span class="hljs-string">"publishLoanGenerateEventEvent failed"</span>, e);
}
<span class="hljs-comment">// 结果返回</span>
<span class="hljs-keyword">return</span> manageResponse.successResponse(caseModel);
});
} <span class="hljs-keyword">catch</span> (Exception e) {
<span class="hljs-type">LoanGenerateStream</span> <span class="hljs-variable">existStream</span> <span class="hljs-operator">=</span> queryExistStream(request);
<span class="hljs-keyword">if</span> (existStream != <span class="hljs-literal">null</span>) {
<span class="hljs-keyword">return</span> manageResponse.duplicatedResponse(existStream);
}
<span class="hljs-keyword">throw</span> e;
}
`}
`
这里在applicationContext.publishEvent(new CaseStartFinishEvent(caseModel.getCaseItem()));
中发送一个事件,并且用try-catch包上,一旦失败了,不影响主流程。
然后再定义一个监听器,处理这个事件:
/**
* 案件中心内部事件监听器
*/
@Component
public class CollectionCaseEventListener {
<span class="hljs-meta">@Autowired</span>
<span class="hljs-keyword">private</span> CaseManageService caseManageService;
<span class="hljs-meta">@Autowired</span>
<span class="hljs-keyword">private</span> DistributeLockSupport distributeLockSupport;
<span class="hljs-meta">@EventListener(CaseStartFinishEvent.class)</span>
<span class="hljs-meta">@Async("caseStartFinishExecutor")</span>
<span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title function_">onApplicationEvent</span><span class="hljs-params">(CaseStartFinishEvent event)</span> {
<span class="hljs-type">LoanGenerateEvent</span> <span class="hljs-variable">loanGenerateEvent</span> <span class="hljs-operator">=</span> (LoanGenerateEvent) event.getSource();
<span class="hljs-comment">// 加分布式锁,避免并发情况下导致创建多条案件</span>
<span class="hljs-keyword">if</span> (!distributeLockSupport.acquireLock(loanGenerateEvent.getUserId(), loanGenerateEvent.getBizId(), <span class="hljs-number">10000</span>)) {
<span class="hljs-keyword">return</span>;
}
<span class="hljs-keyword">try</span> {
<span class="hljs-type">CaseMergeEvent</span> <span class="hljs-variable">mergeEvent</span> <span class="hljs-operator">=</span> <span class="hljs-keyword">new</span> <span class="hljs-title class_">CaseMergeEvent</span>();
mergeEvent.setCaseItemId(loanGenerateEvent.getId());
mergeEvent.setUserId(loanGenerateEvent.getUserId());
mergeEvent.setUserIdType(loanGenerateEvent.getUserIdType());
mergeEvent.setIdentifier(UUID.randomUUID().toString());
mergeEvent.setProduct(loanGenerateEvent.getProduct());
mergeEvent.setBizId(loanGenerateEvent.getBizId());
mergeEvent.setBizDate(<span class="hljs-keyword">new</span> <span class="hljs-title class_">Date</span>());
caseManageService.merge(mergeEvent);
} <span class="hljs-keyword">finally</span> {
distributeLockSupport.releaseLock(loanGenerateEvent.getUserId(), loanGenerateEvent.getBizId());
}
}
`}
`
这里主要有一个点需要提一下,就是加了分布式锁,避免并发导致重复。即一锁二查三更新。