51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spring Boot 启用虚拟线程

本文将带你了解什么是虚拟线程、虚拟线程是如何提高应用吞吐量的,以及如何为 Spring Boot 应用启用虚拟线程。

并发编程的演化 {#并发编程的演化}

线程 {#线程}

总所周知,线程(Thread)是计算机中的最小执行单元,由操作系统直接进行调度,每个线程都有自己的执行路径和执行状态,可以独立地运行和并发执行多个任务。

线程是一种重量级的资源,线程的创建、销毁以及在多个线程之间切换都需要耗费 CPU 时间,一个系统可以同时创建、调度的线程数量有限。所以,现在应用基本上都会使用 线程池 来解决这个问题,通过池化线程,可以减少线程频繁 创建销毁 的成本。

例如,Servlet 容器(Tomcat、Undertow、Jetty)的并发模型就是通过线程池,为每一个请求分配一个线程池中的线程进行处理。但是,一旦涉及到阻塞操作(IO、网络请求),当前线程就会被挂起进入等待状态,这个线程就不能去执行其他任务。这就导致了,使用传统线程池并发模型的服务器能同时处理的请求有限。

而,当代 Web 应用基本上都是 IO 密集形应用,请求中执行的业务往往涉及到与数据库进行交互、调用远程服务(Socket IO),本地磁盘文件读写等等,因此使用阻塞式线程是非常低效的。

异步非阻塞编程 {#异步非阻塞编程}

为了解决传统线程在执行 IO 操作时由于阻塞导致的低效,于是,开始有了一种 响应式异步非阻塞 的编程模型。

在 Java 界,这类优秀的框架很多,如 NettyWebFluxVert.x 等等。它们都提倡一个东西,那就是:异步非阻塞 ,只要是涉及到 IO、阻塞的地方,当前线程不会等待操作执行完毕,会立即返回去执行其他可执行的任务。这时候,就需要通过监听器(Listener),或者回调(Callback)来获得操作最终的执行结果。

以 Netty 为例,伪代码如下:

Channel channel = ...;
// 异步写入数据到 Socket
channel.write("Hello").addListener(future -> {
    if(future.isSuccess()) {
        // 写入成功 
    } else {
        // 写入失败
    }
});

其中 channel.write("Hello") 用于向 Socket 写入数据,这就是典型的阻塞操作,在传统阻塞式编程中,该方法就会阻塞,直到数据被完全写入到 Socket 中。但是,在 Netty 中,在 write 方法执行后线程就会立即返回一个 ChannelFuture 对象,不会等待写入完成,因此这个线程仍然可以继续执行其他可执行的任务。

最后,我们需要通过 write 方法所返回的 ChannelFuture 对象来监听写入的结果。执行写入操作和执行监听器的线程极有可能不是同一个

这种非阻塞编程,可以提高应用的吞吐量。没有线程阻塞,只需要少量线程就可以在 N 个请求之间来回切换执行,也就是说可以同时处理的请求就多了。

弊端也很明显,这种编程模型大大 增加了开发者的心智负担,凡是涉及到网络读写、阻塞操作的地方,都需要通过监听器和回调来获取到结果。我想你也不想维护下面这种代码吧(回调地狱):

foo.a(1, res1 -> {
  if (res1.succeeded()) {
    bar.b("abc", 1, res2 -> {
      if (res.succeeded()) {
         baz.c(res3 -> {
           dosomething(res1, res2, res3, res4 -> {
               // (...)
           });
         });
      }
    });
  }
});

协程 {#协程}

当代的编程语言为了解决上面的问题,引入了 协程(Coroutine) 的并发编程概念,例如 Kotlin 中的 Coroutine ,Golang 中的 Goroutine

协程是由程序进行调度执行的异步单元,也就是说成千上万个协程可以由一个普通的线程去调度执行。当协程执行到 IO 阻塞操作时它会被 "挂起",执行它的线程不会阻塞,会被调度到去执行其他的协程。当阻塞状态的协程执行完毕,从阻塞状态恢复后,它又可以被其他的线程所调度,继续执行。

一句话说明协程的好处就是:可以用阻塞式的编程风格,享受到由异步非阻塞带来的性能提升

Java 的虚拟线程 {#java-的虚拟线程}

Golang 大火后就一直被人拿来和 Java 比较,其中 Java 饱受诟病的就是它不支持 协程 这种轻量级的并发模型。

为了跟随潮流,Java 19 引入的一种轻量级线程,即 虚拟线程(Virtual Thread),并在 Java 21 中正式 GA。

虚拟线程是基于 Project Loom 的概念,通过将大量的虚拟线程映射到较少的实际线程上,实现了更高的并发性能。它采用了一种称为 Continuation 的机制,可以在执行过程中暂停和恢复线程的执行状态,以实现更高效的上下文切换。

终于,咱 Javaer 也有了属于自己的 协程

虚拟线程 {#虚拟线程}

来看看 Java 虚拟线程的基本使用和注意点。

启动协程的方式 {#启动协程的方式}

一、创建虚拟线程 Thread 对象,然后调用 start 执行:

// 创建虚拟线程
Thread virtual = Thread.ofVirtual().unstarted(() -> {
    System.out.println("我是虚拟线程:" + Thread.currentThread());
});

// 执行虚拟线程
virtual.start();
virtual.join();

// 输出如下:
// 我是虚拟线程:VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1

二、通过 ExecutorService 执行:

// 创建协程 ExecutorService
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();

// 提交任务到虚拟线程
var future = executorService.submit(() -> {
    System.out.println("我是虚拟线程:" + Thread.currentThread());
});

// 阻塞,等待虚拟线程执行完毕
future.get();

// 输出如下:
// 我是虚拟线程:VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1

三、通过 ThreadFactory 执行:

// 获取协程 ThreadFactory
ThreadFactory factory = Thread.ofVirtual().factory();

// 创建协程
Thread thread = factory.newThread(() -> {
    System.out.println("我是虚拟线程:" + Thread.currentThread());
});

// 运行
thread.start();

// 等待协程执行完毕
thread.join();

// 输出如下:
// 我是虚拟线程:VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1

你会发现,虚拟线程平台线程 ,都是 Thread 对象,Java 推出的虚拟线程继续遵守向后兼容的承诺。

因此,我们可以无缝地把系统中所有的 平台线程 都替换为 虚拟线程,得到性能提升的同时大大减少了迁移的成本。

需要注意的地方 {#需要注意的地方}

不像 Golang,从一开始就设计、内置了 Goroutine。Java 经过这么多年的来的迭代发展,有它的历史包袱,更何况还要保证向后兼容,所以目前 Java 所推出的虚拟线程有一些需要注意的地方。

一、为每个并发任务创建一个虚拟线程,不要不池化虚拟线程 {#一为每个并发任务创建一个虚拟线程不要不池化虚拟线程}

虚拟线程很轻量,你应该为每一个任务创建一个虚拟线程来执行(哪怕任务很小、很短暂)。不建议池化虚拟线程,根据经验,如果你的应用中没有用到 10,000+ 以上的虚拟线程,那么不太可能从虚拟线程中获益。要么是负载太轻,不需要更高的吞吐量,要么是没有足够多的任务。

推荐使用 Executors.newVirtualThreadPerTaskExecutor() 返回的 ExecutorService 来执行异步任务。它并不使用线程池,它会为每个提交的任务创建一个新的虚拟线程来执行。

二、避免使用 synchronized 关键字进行同步 {#二避免使用-synchronized-关键字进行同步}

synchronized 代码块或方法会导致 JDK 的虚拟线程调度器阻塞一个宝贵的操作系统线程(即,平台线程),而如果阻塞操作是在同步代码块或方法之外进行的,则不会发生这种情况。我们称这种情况为 "pinning"。

Oracle 官方

简单理解就是在虚拟线程中使用 synchronized 会阻塞底层的平台线程。推荐 使用 ReentrantLock 代替 synchronized 关键字来进行同步。

三、尽量不要在虚拟线程中使用 ThreadLocal 存储大量对象 {#三尽量不要在虚拟线程中使用-threadlocal-存储大量对象}

虚拟线程是支持使用 ThreadLocal 来存储线程上下文数据的 。但是如前所述,虚拟线程实在是太过于轻量了,随随便便就可以创建数百万个,如果还在 ThreadLocal 存储大量的对象,这对系统来说说产生一定的负荷。

为了解决这个问题,Java 推出了 ScopedValue 的概念,但是目前还在 预览阶段

Spring Boot 启用虚拟线程 {#spring-boot-启用虚拟线程}

上面说了这么多,本节终于进入正题 如何在 Spring Boot 应用中开启虚拟线程?

创建一个 Spring Boot 应用 {#创建一个-spring-boot-应用}

创建一个最基础的 Spring Boot 应用,使用最新的 3.2.4 版本,以及 JDK 21

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
</parent>

<properties>
    <java.version>21</java.version>
</properties>

<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

创建 Application 启动类:

package cn.springdoc.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
@RequestMapping("/")
public class DemoApplication {

    static final Logger log = LoggerFactory.getLogger(DemoApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @GetMapping(produces = MediaType.TEXT_PLAIN_VALUE)
    public String index () {
        
        log.info("收到请求,执行线程是:{}", Thread.currentThread());
        
        return "Hello springdoc.cn";
    }
}

如上,DemoApplication 也是一个 Controller。我们在 index 方法中输出了执行当前请求的线程信息。

默认情况下,Spring Boot 中的嵌入式服务器使用的是传统的线程池模型。启动应用,打开浏览器访问 http://localhost:8080/,为了效果直观,可以多按几次 F5,输出日志如下:

cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:Thread[#41,http-nio-8080-exec-6,5,main]
cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:Thread[#43,http-nio-8080-exec-8,5,main]
cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:Thread[#42,http-nio-8080-exec-7,5,main]
cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:Thread[#44,http-nio-8080-exec-9,5,main]

可以看到,处理请求的是线程池中的线程,这正是传统的并发模型。

启用虚拟线程 {#启用虚拟线程}

接下来,做一些小小的修改。让底层的 Tomcat 服务器使用虚拟线程来处理每一个请求。

application.yaml 中添加配置,启用虚拟线程:

spring:
  threads:
    virtual:
      enabled: true # 启用虚拟线程

然后,添加配置类,为 Tomcat 指定其用于处理请求的 Executor

package cn.springdoc.demo.configuration;

import java.util.concurrent.Executors;

import org.springframework.boot.web.embedded.tomcat.TomcatProtocolHandlerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 
 * Tomcat 配置
 * 
 */
@Configuration
public class TomcatConfiguration {

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
        
        return protocolHandler -> {
            // 使用虚拟线程来处理每一个请求
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

配置 Tomcat 使用 Executors.newVirtualThreadPerTaskExecutor() 返回的 ExecutorService 来处理每一个请求任务,它会为每个请求创建一个虚拟线程来执行。

一切 OK,就这么简单。

重启应用,再次访问主页 http://localhost:8080/

cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:VirtualThread[#39]/runnable@ForkJoinPool-1-worker-1
cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:VirtualThread[#45]/runnable@ForkJoinPool-1-worker-1
cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:VirtualThread[#46]/runnable@ForkJoinPool-1-worker-1
cn.springdoc.demo.DemoApplication        : 收到请求,执行线程是:VirtualThread[#47]/runnable@ForkJoinPool-1-worker-1

怎么样?有感觉没,上述日志中的 VirtualThread... 正说明了当前请求是在虚拟线程中执行的。

自定义虚拟线程的名称 {#自定义虚拟线程的名称}

如果你看到了完整的 Spring Boot 日志,你会发现日志中的虚拟线程的线程名称是空的:

虚拟线程的名称为空

这是因为 Spring Boot 默认获取的是 Threadname 属性作为线程名称,而默认情况下虚拟线程的 name 为空。

要解决这个问题,可以修改一些上面的 TomcatConfiguration 配置类,为虚拟线程指定名称:

@Bean
public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
    
    return protocolHandler -> {
        // 创建 OfVirtual,指定虚拟线程名称的前缀,以及线程编号起始值
        OfVirtual ofVirtual = Thread.ofVirtual().name("virtualthread#", 1);
        // 获取虚拟线程池工厂
        ThreadFactory factory = ofVirtual.factory();
        // 通过该工厂,创建 ExecutorService
        protocolHandler.setExecutor(Executors.newThreadPerTaskExecutor(factory));
    };
}

重启应用,再次访问主页:

虚拟线程的名称前缀为virtualthread#

这次虚拟线程就有了名称,每次创建一个虚拟线程它的编号都会 + 1。

Undertow 服务器开启虚拟线程 {#undertow-服务器开启虚拟线程}

Spring Boot 默认使用 Tomcat 作为嵌入式服务器,也有很多人喜欢用 Undertow。

Undertow 也支持使用虚拟线程,配置类如下:

package cn.springdoc.demo.configuration;

import java.util.concurrent.Executors;

import org.springframework.boot.web.embedded.undertow.UndertowServletWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UndertowConfiguration implements WebServerFactoryCustomizer<UndertowServletWebServerFactory> {

    @Override
    public void customize(UndertowServletWebServerFactory factory) {
        
        factory.addDeploymentInfoCustomizers(deploymentInfo -> {
            deploymentInfo.setExecutor(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtualthread#", 1).factory()));
        });
    }
}

至于怎么切换 Spring Boot 的服务器为 Undertow 这里就不多讲述,你可以参阅 这篇文章

为 @Async 任务启用虚拟线程 {#为-async-任务启用虚拟线程}

不仅仅是请求, Spring Boot 中的异步任务,也可以使用虚拟线程来执行。

添加 AsyncConfigurer 配置类,为 @Async 任务指定 Executor

package cn.springdoc.demo.configuration;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * 
 * 异步任务
 * 
 */
@Configuration
@EnableAsync  // 开启异步任务
public class AsyncTaskConfiguration implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        return new TaskExecutorAdapter(Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("virtual-async#", 1).factory()));
    }
}

基本上和服务器的配置如出一辙,只要是系统中涉及到可以设置 Executor 的地方,你都可以尝试把它替换为虚拟线程的实现。

最后 {#最后}

Java 引入的虚拟线程非常适合 IO 密集型应用,通过少量的平台线程去调度成千上万的虚拟线程能大大提升应用的吞吐量。

而且,虚拟线程和平台线程都是同一个 Thread 对象,大大降低了迁移成本,只需要小小的修改就可以享受虚到拟线程来的性能飙升。

赞(1)
未经允许不得转载:工具盒子 » Spring Boot 启用虚拟线程