1、简介 {#1简介}
Spring Webflux 是一个非阻塞的 Web 框架,从底层开始构建,旨在利用多核、下一代处理器的优势,处理大量并发连接(既然是非阻塞框架,线程就不应该被阻塞)。
本文将带你了解在使用 Spring Webflux 时常犯的一个错误。
2、Spring Webflux 线程模型 {#2spring-webflux-线程模型}
为了更好地理解这个问题,我们需要了解 Spring Webflux 的线程模型。
在 Spring Webflux 中,一个小型工作线程池负责处理传入请求。这与 Servlet 模型不同,在 Servlet 模型中,每个请求都有一个专用线程。因此,框架会保护(隔离)这些接受(处理)请求的线程。
理解了这一点后,继续往下看。
3、通过线程阻塞了解 IllegalStateException {#3通过线程阻塞了解-illegalstateexception}
让我们通过一个示例来了解 Spring Webflux 中何时以及为何会出现异常:"java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread"。
以文件搜索 API 为例。该应用从文件系统读取文件,并在文件中搜索用户提供的文本。
3.1、FileService {#31fileservice}
先定义一个 FileService
类,它能以字符串形式读取文件内容:
@Service
public class FileService {
@Value("${files.base.dir:/tmp/bael-7724}")
private String filesBaseDir;
public Mono<String> getFileContentAsString(String fileName) {
return DataBufferUtils.read(Paths.get(filesBaseDir + "/" + fileName), DefaultDataBufferFactory.sharedInstance, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY)
.map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
.reduceWith(StringBuilder::new, StringBuilder::append)
.map(StringBuilder::toString);
}
}
注意,FileService
是以响应式(异步)从文件系统读取文件的。
3.2、FileContentSearchService {#32filecontentsearchservice}
使用 FileService
来实现文件搜索服务:
@Service
public class FileContentSearchService {
@Autowired
private FileService fileService;
public Mono<Boolean> blockingSearch(String fileName, String searchTerm) {
String fileContent = fileService
.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. BlockingSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
}
文件搜索服务会根据是否在文件中找到搜索词返回一个 boolean
值。为此,我们调用了 FileService
的 getFileContentAsString()
方法。由于我们以异步方式(即 Mono<String>
)获取结果,因此我们调用 block()
来获取 String
值。然后,检查 fileContent
是否包含 searchTerm
。最后,将结果封装在 Mono
中并返回。
3.3、FileController {#33filecontroller}
最后,定义 FileController
,它调用了 FileContentSearchService
的 blockingSearch()
方法:
@RestController
@RequestMapping("bael7724/v1/files")
public class FileController {
...
@GetMapping(value = "/{name}/blocking-search")
Mono<Boolean> blockingSearch(@PathVariable("name") String fileName, @RequestParam String term) {
return fileContentSearchService.blockingSearch(fileName, term);
}
}
3.4、重现异常 {#34重现异常}
我们可以看到,Controller
调用了 FileContentSearchService
的方法,而 FileContentSearchService
又调用了 block()
方法。由于这是在一个接受请求的线程上,如果按照当前的设计调用我们的 API,将遇到我们想要避免的臭名昭著的异常:
12:28:51.610 [reactor-http-epoll-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [ea98e542-1] 500 Server Error for HTTP GET "/bael7724/v1/files/a/blocking-search?term=a"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/a/blocking-search?term=a" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.blockingSearch(FileContentSearchService.java:20)
at com.baeldung.bael7724.controller.FileController.blockingSearch(FileController.java:35)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
3.5、根本原因 {#35根本原因}
导致此异常的根本原因是在接受请求的线程上调用了 block()
。在上面的示例代码中,block()
方法是在接受请求的线程池中的一个线程上调用的。具体地说,是在标记为 "仅限非阻塞操作" 的线程上调用了 block()
,也就是在实现了 Reactor
的 NonBlocking
记接口的线程上调用了 block()
,比如那些由 Schedulers.parallel()
启动的线程。
4、解决办法 {#4解决办法}
来看看如何解决这一异常。
4.1、拥抱响应式业务 {#41拥抱响应式业务}
惯用的方法是使用响应式操作,而不是调用 block()
。
更新代码,使用 map()
操作将 String
转换为 Boolean
值:
public Mono<Boolean> nonBlockingSearch(String fileName, String searchTerm) {
return fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. NonBlockingSearch"))
.map(content -> content.contains(searchTerm))
.doOnNext(content -> ThreadLogger.log("2. NonBlockingSearch"));
}
这样,就完全不需要调用 block()
了。当我们运行上述方法时,会发现线程上下文如下:
[1. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506215299Z
[2. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506361786Z
[1. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506465805Z
[2. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506543145Z
上述日志表明,我们在接受请求的同一个线程池中执行了操作。
注意 ,尽管我们没有遇到异常,但最好还是在不同的线程池中运行 I/O 操作,例如从文件中读取数据。
4.2、在 Schedulers.boundedElastic() 线程池上阻塞 {#42在-schedulersboundedelastic-线程池上阻塞}
假设由于某种原因,我们无法避开 block()
。那该怎么做呢?
我们的结论是,当在接受请求的线程池上调用 block()
时,异常就发生了。因此,要调用 block()
,需要切换线程池:
public Mono<Boolean> workableBlockingSearch(String fileName, String searchTerm) {
return Mono.just("")
.doOnNext(s -> ThreadLogger.log("1. WorkableBlockingSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(s -> ThreadLogger.log("2. WorkableBlockingSearch"))
.map(s -> fileService.getFileContentAsString(fileName)
.block()
.contains(searchTerm))
.doOnNext(s -> ThreadLogger.log("3. WorkableBlockingSearch"));
}
要切换线程池,Spring Webflux 提供了两个操作 publishOn()
和 subscribeOn()
。
我们使用 publishOn()
,它可以为 publishOn()
之后的操作更改线程,而不会影响订阅或上游操作。由于线程池现在已切换为 Schedulers.boundedElastic()
线程池,因此可以调用 block()
。
现在,如果我们运行 workableBlockingSearch()
方法,就会看到以下线程日志:
[1. WorkableBlockingSearch] ThreadName: parallel-2, Time: 2024-06-17T07:40:59.440562518Z
[2. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442161018Z
[3. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442891230Z
[1. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443058091Z
[2. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443181770Z
可以看到,从第 2 项开始,操作确实发生在有 Schedulers.boundedElastic()
线程池上,因此没有发生 IllegalStateException
异常。
4.3、注意事项 {#43注意事项}
这种阻塞解决方案有一些注意事项。
在调用 block()
时,可能会在很多方面出错。
举一个例子,尽管我们使用了 Scheduler
来切换线程上下文,但它并没有按照我们期望的方式运行:
public Mono<Boolean> incorrectUseOfSchedulersSearch(String fileName, String searchTerm) {
String fileContent = fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. IncorrectUseOfSchedulersSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(content -> ThreadLogger.log("2. IncorrectUseOfSchedulersSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
示例如下,我们使用了 publishOn()
,但 block()
方法仍会导致异常。运行上述代码时,可以看到以下日志。
[1. IncorrectUseOfSchedulersSearch] ThreadName: Thread-4, Time: 2024-06-17T08:57:02.490298417Z
[2. IncorrectUseOfSchedulersSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T08:57:02.491870410Z
14:27:02.495 [parallel-1] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [53e4bce1] 500 Server Error for HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.incorrectUseOfSchedulersSearch(FileContentSearchService.java:64)
at com.baeldung.bael7724.controller.FileController.incorrectUseOfSchedulersSearch(FileController.java:48)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
这表明第二条日志语句确实在 Schedulers.boundedElastic()
线程池上运行。但是,仍然遇到了异常。原因是 block()
仍在同一个接受请求的线程池上运行。
再来看看另一个注意事项。即使我们切换了线程池,也不能使用并行线程池,即 Schedulers.parallel()
。如前所述,某些线程池不允许在其线程上调用 block()
,并行线程池就是其中之一。
最后,在我们的示例中我们仅使用了 Schedulers.boundedElastic()
。相反,我们也可以通过 Schedulers.fromExecutorService()
使用任何自定义线程池。
5、总结 {#5总结}
总之,要有效解决 Spring Webflux 在使用 block()
等阻塞操作时出现 IllegalStateException
的问题,我们应该采用非阻塞的响应式方法。通过利用 map()
等响应式操作符,我们可以在同一个响应式线程池上执行操作,从而无需显式 block()
。如果无法避免 block()
,那么将执行上下文切换到 boundedElastic
Scheduler 或使用 publishOn()
的自定义线程池,就可以将这些操作与接受请求的响应式线程池隔离开来,从而防止异常的发生。
必须注意有些线程池不支持阻塞调用,并确保应用正确的上下文切换,以保持应用的稳定性和性能。
Ref:https://www.baeldung.com/java-fix-illegalstateexception-blocking