51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Reactor WebFlux 与虚拟线程(Virtual Thread)的对比

1、概览 {#1概览}

本文将带你了解 Java 19 的 虚拟线程Reactor Webflux 的基本工作原理以及它们的优缺点。

2、代码示例 {#2代码示例}

假如我们开发的是一个电商后台,有如下 "负责计算和发布添加到购物车中的商品价格" 的函数。

class ProductService {
    private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart";

    private final ProductRepository repository;
    private final DiscountService discountService;
    private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate;

    // 构造函数

    public void addProductToCart(String productId, String cartId) {
        Product product = repository.findById(productId)
          .orElseThrow(() -> new IllegalArgumentException("not found!"));

        Price price = product.basePrice();
        if (product.category().isEligibleForDiscount()) {
            BigDecimal discount = discountService.discountForProduct(productId);
            price.setValue(price.getValue().subtract(discount));
        }

        var event = new ProductAddedToCartEvent(productId, price.getValue(), price.getCurrency(), cartId);
        kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
    }

}

如上所示,首先使用 MongoRepository 从 MongoDB 数据库中检索 Product。检索完成后,确定 Product 是否符合折扣条件。如果符合,则使用 DiscountService 执行 HTTP 请求,以确定该产品是否有可用的折扣。

最后,计算 Product 的最终价格。计算完成后,会发送一条 Kafka 消息,其中包含 productIdcartId 和计算出的价格。

3、WebFlux {#3webflux}

WebFlux 是一个用于构建异步、非阻塞和事件驱动应用的框架。它基于响应式编程原理,利用 FluxMono 类型来处理异步通信的复杂性。这些类型实现了发布/订阅设计模式,将数据的消费者和生产者解耦。

3.1、Reactive 库 {#31reactive-库}

Spring 生态系统中的许多模块都与 WebFlux 集成,用于响应式编程。

我们要使用到其中一些模块来把前面的代码重构为响应式。

例如,可以将 MongoRepository 转换为 ReactiveMongoRepository。这一改变意味着我们必须使用 Mono<Product> 而不是 Optional<Product>

Mono<Product> product = repository.findById(productId)
  .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));

同样,可以将 ProductService 改为异步和非阻塞的。例如,可以让它使用 WebClient 来执行 HTTP 请求,从而以 Mono<BigDecimal> 的形式返回折扣:

Mono<BigDecimal> discount = discountService.discountForProduct(productId);

3.2、不变性 {#32不变性}

在函数式和响应式编程范式中,不可变性始终优于可变数据。最初的方法涉及使用 Setter 来改变 Price 对象的值。然而,随着响应式化,需要重构 Price 对象并使其成为不可变的。

例如,可以引入一个专门的方法来应用折扣并生成一个新的 Price 实例,而不是修改现有的实例:

record Price(BigDecimal value, String currency) {  
    public Price applyDiscount(BigDecimal discount) {
        return new Price(value.subtract(discount), currency);
    }
}

现在,可以使用 WebFlux 的 map() 方法,根据折扣计算新价格:

Mono<Price> price = discountService.discountForProduct(productId)
  .map(discount -> price.applyDiscount(discount));

甚至可以在这里使用方法引用,以保持代码的紧凑性:

Mono<Price> price = discountService.discountForProduct(productId).map(price::applyDiscount);

3.3、函数式管道 {#33函数式管道}

通过 map()flatMap() 等方法,MonoFlux 遵循了 functormonad 模式。这样,就可以将用例描述为不可变数据的转换管道。

尝试确定用例中需要的转换步骤:

  • 从原始的 productId 开始
  • 将其转化为 Product
  • Product 来计算 Price
  • 使用 Price 来创建 event
  • 最后,在消息队列中发布 event

现在,重构代码,实现这个函数链:

void addProductToCart(String productId, String cartId) {
    Mono<Product> productMono = repository.findById(productId)
      .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));

    Mono<Price> priceMono = productMono.flatMap(product -> {
        if (product.category().isEligibleForDiscount()) {
            return discountService.discountForProduct(productId)
              .map(product.basePrice()::applyDiscount);
        }
        return Mono.just(product.basePrice());
    });

    Mono<ProductAddedToCartEvent> eventMono = priceMono.map(
      price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId));

    eventMono.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}

现在,内联局部变量,以保持代码紧凑。此外,提取一个用于计算价格的函数,并在 flatMap() 中使用它:

void addProductToCart(String productId, String cartId) {
    repository.findById(productId)
      .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")))
      .flatMap(this::computePrice)
      .map(price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId))
      .subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}

Mono<Price> computePrice(Product product) {
    if (product.category().isEligibleForDiscount()) {
        return discountService.discountForProduct(product.id())
          .map(product.basePrice()::applyDiscount);
    }
    return Mono.just(product.basePrice());
}

4、虚拟线程 {#4虚拟线程}

通过 Project Loom,Java 引入了虚拟线程(Virtual Threads)作为并行处理的替代解决方案。虚拟线程是由 Java 虚拟机(JVM)管理的轻量级用户模式线程。因此,它们特别适用于 I/O 操作,其中传统线程可能会花费大量时间等待外部资源。

与异步或响应式解决方案相比,虚拟线程能让我们继续使用 "每个请求一个线程" 的模式。换句话说,可以继续按顺序编写 "阻塞式" 代码,而无需将业务逻辑与响应式 API 混合在一起。

4.1、使用虚拟线程 {#41使用虚拟线程}

有几种方法可以利用虚拟线程来执行代码。对于单个方法,如前面示例中演示的方法,可以使用 startVirtualThread()。这个静态方法是最近添加到 Thread API 中的,它会在一个新的虚拟线程上执行一个 Runnable

public void addProductToCart(String productId, String cartId) {
    Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}

private void computePriceAndPublishMessage(String productId, String cartId) {
    // ...
}

另外,还可以使用新的静态工厂方法 Executors.newVirtualThreadPerTaskExecutor() 创建依赖虚拟线程的 ExecutorService

对于使用 Spring 6Spring Boot 3 的应用,可以利用新的 Executor 并配置 Spring,使其优先使用虚拟线程而不是平台线程。

4.2、兼容性 {#42兼容性}

虚拟线程通过使用更传统的同步编程模型来简化代码。因此,可以以类似于阻塞 I/O 操作的顺序方式编写代码。

还可以从普通的单线程代码无缝切换到虚拟线程,只需极少的改动,甚至无需改动。例如,在前面的例子中,只需使用静态工厂方法 startVirtualThread() 创建一个虚拟线程,并在其中执行逻辑:

void addProductToCart(String productId, String cartId) {
    Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}

void computePriceAndPublishMessage(String productId, String cartId) {
    Product product = repository.findById(productId)
      .orElseThrow(() -> new IllegalArgumentException("not found!"));

    Price price = computePrice(productId, product);

    var event = new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId);
    kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}

Price computePrice(String productId, Product product) {
    if (product.category().isEligibleForDiscount()) {
        BigDecimal discount = discountService.discountForProduct(productId);
        return product.basePrice().applyDiscount(discount);
    }
    return product.basePrice();
}

4.3、可读性 {#43可读性}

采用 "每个请求一个线程" 的模式,可以更容易地理解和推理业务逻辑。这可以减少与响应式编程范式相关的认知压力(响应式编程全是回调或者监听,可读性实在是差)。

换句话说,虚拟线程允许将技术问题与业务逻辑干净利落地分开。因此,在实现业务用例时,不再需要外部 API。

5、总结 {#5总结}

本文介绍了在 Java 中实现高效并发和异步的两种方式,首先介绍了Reactor WebFlux 和响应式编程,然后介绍了虚拟线程。总的来说,虚拟线程兼容传统阻塞式代码,可以平稳过度。且比响应式代码更具有可读性。


Ref:https://www.baeldung.com/java-reactor-webflux-vs-virtual-threads

赞(1)
未经允许不得转载:工具盒子 » Reactor WebFlux 与虚拟线程(Virtual Thread)的对比