51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

解决 Java 异常 “java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking”

1、简介 {#1简介}

Spring Webflux 是一个非阻塞的 Web 框架,从底层开始构建,旨在利用多核、下一代处理器的优势,处理大量并发连接(既然是非阻塞框架,线程就不应该被阻塞)。

本文将带你了解在使用 Spring Webflux 时常犯的一个错误。

2、Spring Webflux 线程模型 {#2spring-webflux-线程模型}

为了更好地理解这个问题,我们需要了解 Spring Webflux 的线程模型。

在 Spring Webflux 中,一个小型工作线程池负责处理传入请求。这与 Servlet 模型不同,在 Servlet 模型中,每个请求都有一个专用线程。因此,框架会保护(隔离)这些接受(处理)请求的线程。

理解了这一点后,继续往下看。

3、通过线程阻塞了解 IllegalStateException {#3通过线程阻塞了解-illegalstateexception}

让我们通过一个示例来了解 Spring Webflux 中何时以及为何会出现异常:"java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread"。

以文件搜索 API 为例。该应用从文件系统读取文件,并在文件中搜索用户提供的文本。

3.1、FileService {#31fileservice}

先定义一个 FileService 类,它能以字符串形式读取文件内容:

@Service
public class FileService {
    @Value("${files.base.dir:/tmp/bael-7724}")
    private String filesBaseDir;

    public Mono<String> getFileContentAsString(String fileName) {
        return DataBufferUtils.read(Paths.get(filesBaseDir + "/" + fileName), DefaultDataBufferFactory.sharedInstance, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY)
          .map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
          .reduceWith(StringBuilder::new, StringBuilder::append)
          .map(StringBuilder::toString);
    }
}

注意,FileService 是以响应式(异步)从文件系统读取文件的。

3.2、FileContentSearchService {#32filecontentsearchservice}

使用 FileService 来实现文件搜索服务:

@Service
public class FileContentSearchService {
    @Autowired
    private FileService fileService;

    public Mono<Boolean> blockingSearch(String fileName, String searchTerm) {
        String fileContent = fileService
          .getFileContentAsString(fileName)
          .doOnNext(content -> ThreadLogger.log("1. BlockingSearch"))
          .block();

        boolean isSearchTermPresent = fileContent.contains(searchTerm);

        return Mono.just(isSearchTermPresent);
    }
}

文件搜索服务会根据是否在文件中找到搜索词返回一个 boolean 值。为此,我们调用了 FileServicegetFileContentAsString() 方法。由于我们以异步方式(即 Mono<String>)获取结果,因此我们调用 block() 来获取 String 值。然后,检查 fileContent 是否包含 searchTerm。最后,将结果封装在 Mono 中并返回。

3.3、FileController {#33filecontroller}

最后,定义 FileController,它调用了 FileContentSearchServiceblockingSearch() 方法:

@RestController
@RequestMapping("bael7724/v1/files")
public class FileController {
    ...
    @GetMapping(value = "/{name}/blocking-search")
    Mono<Boolean> blockingSearch(@PathVariable("name") String fileName, @RequestParam String term) {
        return fileContentSearchService.blockingSearch(fileName, term);
    }
}

3.4、重现异常 {#34重现异常}

我们可以看到,Controller 调用了 FileContentSearchService 的方法,而 FileContentSearchService 又调用了 block() 方法。由于这是在一个接受请求的线程上,如果按照当前的设计调用我们的 API,将遇到我们想要避免的臭名昭著的异常:

12:28:51.610 [reactor-http-epoll-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [ea98e542-1]  500 Server Error for HTTP GET "/bael7724/v1/files/a/blocking-search?term=a"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP GET "/bael7724/v1/files/a/blocking-search?term=a" [ExceptionHandlingWebHandler]
Original Stack Trace:
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
	at reactor.core.publisher.Mono.block(Mono.java:1712)
	at com.baeldung.bael7724.service.FileContentSearchService.blockingSearch(FileContentSearchService.java:20)
	at com.baeldung.bael7724.controller.FileController.blockingSearch(FileController.java:35)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)

3.5、根本原因 {#35根本原因}

导致此异常的根本原因是在接受请求的线程上调用了 block()。在上面的示例代码中,block() 方法是在接受请求的线程池中的一个线程上调用的。具体地说,是在标记为 "仅限非阻塞操作" 的线程上调用了 block(),也就是在实现了 ReactorNonBlocking 记接口的线程上调用了 block(),比如那些由 Schedulers.parallel() 启动的线程。

4、解决办法 {#4解决办法}

来看看如何解决这一异常。

4.1、拥抱响应式业务 {#41拥抱响应式业务}

惯用的方法是使用响应式操作,而不是调用 block()

更新代码,使用 map() 操作将 String 转换为 Boolean 值:

public Mono<Boolean> nonBlockingSearch(String fileName, String searchTerm) {
    return fileService.getFileContentAsString(fileName)
      .doOnNext(content -> ThreadLogger.log("1. NonBlockingSearch"))
      .map(content -> content.contains(searchTerm))
      .doOnNext(content -> ThreadLogger.log("2. NonBlockingSearch"));
}

这样,就完全不需要调用 block() 了。当我们运行上述方法时,会发现线程上下文如下:

[1. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506215299Z
[2. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506361786Z
[1. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506465805Z
[2. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506543145Z

上述日志表明,我们在接受请求的同一个线程池中执行了操作。

注意 ,尽管我们没有遇到异常,但最好还是在不同的线程池中运行 I/O 操作,例如从文件中读取数据。

4.2、在 Schedulers.boundedElastic() 线程池上阻塞 {#42在-schedulersboundedelastic-线程池上阻塞}

假设由于某种原因,我们无法避开 block()。那该怎么做呢?

我们的结论是,当在接受请求的线程池上调用 block() 时,异常就发生了。因此,要调用 block(),需要切换线程池:

public Mono<Boolean> workableBlockingSearch(String fileName, String searchTerm) {
    return Mono.just("")
      .doOnNext(s -> ThreadLogger.log("1. WorkableBlockingSearch"))
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(s -> ThreadLogger.log("2. WorkableBlockingSearch"))
      .map(s -> fileService.getFileContentAsString(fileName)
        .block()
        .contains(searchTerm))
      .doOnNext(s -> ThreadLogger.log("3. WorkableBlockingSearch"));
}

要切换线程池,Spring Webflux 提供了两个操作 publishOn()subscribeOn()

我们使用 publishOn(),它可以为 publishOn() 之后的操作更改线程,而不会影响订阅或上游操作。由于线程池现在已切换为 Schedulers.boundedElastic() 线程池,因此可以调用 block()

现在,如果我们运行 workableBlockingSearch() 方法,就会看到以下线程日志:

[1. WorkableBlockingSearch] ThreadName: parallel-2, Time: 2024-06-17T07:40:59.440562518Z
[2. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442161018Z
[3. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442891230Z
[1. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443058091Z
[2. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443181770Z

可以看到,从第 2 项开始,操作确实发生在有 Schedulers.boundedElastic() 线程池上,因此没有发生 IllegalStateException 异常。

4.3、注意事项 {#43注意事项}

这种阻塞解决方案有一些注意事项。

在调用 block() 时,可能会在很多方面出错。

举一个例子,尽管我们使用了 Scheduler 来切换线程上下文,但它并没有按照我们期望的方式运行:

public Mono<Boolean> incorrectUseOfSchedulersSearch(String fileName, String searchTerm) {
    String fileContent = fileService.getFileContentAsString(fileName)
      .doOnNext(content -> ThreadLogger.log("1. IncorrectUseOfSchedulersSearch"))
      .publishOn(Schedulers.boundedElastic())
      .doOnNext(content -> ThreadLogger.log("2. IncorrectUseOfSchedulersSearch"))
      .block();

    boolean isSearchTermPresent = fileContent.contains(searchTerm);

    return Mono.just(isSearchTermPresent);
}

示例如下,我们使用了 publishOn(),但 block() 方法仍会导致异常。运行上述代码时,可以看到以下日志。

[1. IncorrectUseOfSchedulersSearch] ThreadName: Thread-4, Time: 2024-06-17T08:57:02.490298417Z
[2. IncorrectUseOfSchedulersSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T08:57:02.491870410Z
14:27:02.495 [parallel-1] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [53e4bce1]  500 Server Error for HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
    *__checkpoint ⇢ HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-" [ExceptionHandlingWebHandler]
Original Stack Trace:
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
	at reactor.core.publisher.Mono.block(Mono.java:1712)
	at com.baeldung.bael7724.service.FileContentSearchService.incorrectUseOfSchedulersSearch(FileContentSearchService.java:64)
	at com.baeldung.bael7724.controller.FileController.incorrectUseOfSchedulersSearch(FileController.java:48)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)

这表明第二条日志语句确实在 Schedulers.boundedElastic() 线程池上运行。但是,仍然遇到了异常。原因是 block() 仍在同一个接受请求的线程池上运行。

再来看看另一个注意事项。即使我们切换了线程池,也不能使用并行线程池,即 Schedulers.parallel()。如前所述,某些线程池不允许在其线程上调用 block(),并行线程池就是其中之一。

最后,在我们的示例中我们仅使用了 Schedulers.boundedElastic()。相反,我们也可以通过 Schedulers.fromExecutorService() 使用任何自定义线程池。

5、总结 {#5总结}

总之,要有效解决 Spring Webflux 在使用 block() 等阻塞操作时出现 IllegalStateException 的问题,我们应该采用非阻塞的响应式方法。通过利用 map() 等响应式操作符,我们可以在同一个响应式线程池上执行操作,从而无需显式 block()。如果无法避免 block(),那么将执行上下文切换到 boundedElastic Scheduler 或使用 publishOn() 的自定义线程池,就可以将这些操作与接受请求的响应式线程池隔离开来,从而防止异常的发生。

必须注意有些线程池不支持阻塞调用,并确保应用正确的上下文切换,以保持应用的稳定性和性能。


Ref:https://www.baeldung.com/java-fix-illegalstateexception-blocking

赞(2)
未经允许不得转载:工具盒子 » 解决 Java 异常 “java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking”