1、概览 {#1概览}
本文将带你了解 Java 19 的 虚拟线程 和 Reactor Webflux 的基本工作原理以及它们的优缺点。
2、代码示例 {#2代码示例}
假如我们开发的是一个电商后台,有如下 "负责计算和发布添加到购物车中的商品价格" 的函数。
class ProductService {
private final String PRODUCT_ADDED_TO_CART_TOPIC = "product-added-to-cart";
private final ProductRepository repository;
private final DiscountService discountService;
private final KafkaTemplate<String, ProductAddedToCartEvent> kafkaTemplate;
// 构造函数
public void addProductToCart(String productId, String cartId) {
Product product = repository.findById(productId)
.orElseThrow(() -> new IllegalArgumentException("not found!"));
Price price = product.basePrice();
if (product.category().isEligibleForDiscount()) {
BigDecimal discount = discountService.discountForProduct(productId);
price.setValue(price.getValue().subtract(discount));
}
var event = new ProductAddedToCartEvent(productId, price.getValue(), price.getCurrency(), cartId);
kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}
}
如上所示,首先使用 MongoRepository
从 MongoDB 数据库中检索 Product
。检索完成后,确定 Product
是否符合折扣条件。如果符合,则使用 DiscountService
执行 HTTP 请求,以确定该产品是否有可用的折扣。
最后,计算 Product
的最终价格。计算完成后,会发送一条 Kafka 消息,其中包含 productId
、cartId
和计算出的价格。
3、WebFlux {#3webflux}
WebFlux 是一个用于构建异步、非阻塞和事件驱动应用的框架。它基于响应式编程原理,利用 Flux
和 Mono
类型来处理异步通信的复杂性。这些类型实现了发布/订阅设计模式,将数据的消费者和生产者解耦。
3.1、Reactive 库 {#31reactive-库}
Spring 生态系统中的许多模块都与 WebFlux 集成,用于响应式编程。
我们要使用到其中一些模块来把前面的代码重构为响应式。
例如,可以将 MongoRepository
转换为 ReactiveMongoRepository
。这一改变意味着我们必须使用 Mono<Product>
而不是 Optional<Product>
:
Mono<Product> product = repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));
同样,可以将 ProductService
改为异步和非阻塞的。例如,可以让它使用 WebClient
来执行 HTTP 请求,从而以 Mono<BigDecimal>
的形式返回折扣:
Mono<BigDecimal> discount = discountService.discountForProduct(productId);
3.2、不变性 {#32不变性}
在函数式和响应式编程范式中,不可变性始终优于可变数据。最初的方法涉及使用 Setter 来改变 Price
对象的值。然而,随着响应式化,需要重构 Price
对象并使其成为不可变的。
例如,可以引入一个专门的方法来应用折扣并生成一个新的 Price
实例,而不是修改现有的实例:
record Price(BigDecimal value, String currency) {
public Price applyDiscount(BigDecimal discount) {
return new Price(value.subtract(discount), currency);
}
}
现在,可以使用 WebFlux 的 map()
方法,根据折扣计算新价格:
Mono<Price> price = discountService.discountForProduct(productId)
.map(discount -> price.applyDiscount(discount));
甚至可以在这里使用方法引用,以保持代码的紧凑性:
Mono<Price> price = discountService.discountForProduct(productId).map(price::applyDiscount);
3.3、函数式管道 {#33函数式管道}
通过 map()
和 flatMap()
等方法,Mono
和 Flux
遵循了 functor 和 monad 模式。这样,就可以将用例描述为不可变数据的转换管道。
尝试确定用例中需要的转换步骤:
- 从原始的
productId
开始 - 将其转化为
Product
- 用
Product
来计算Price
- 使用
Price
来创建event
- 最后,在消息队列中发布
event
现在,重构代码,实现这个函数链:
void addProductToCart(String productId, String cartId) {
Mono<Product> productMono = repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")));
Mono<Price> priceMono = productMono.flatMap(product -> {
if (product.category().isEligibleForDiscount()) {
return discountService.discountForProduct(productId)
.map(product.basePrice()::applyDiscount);
}
return Mono.just(product.basePrice());
});
Mono<ProductAddedToCartEvent> eventMono = priceMono.map(
price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId));
eventMono.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}
现在,内联局部变量,以保持代码紧凑。此外,提取一个用于计算价格的函数,并在 flatMap()
中使用它:
void addProductToCart(String productId, String cartId) {
repository.findById(productId)
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("not found!")))
.flatMap(this::computePrice)
.map(price -> new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId))
.subscribe(event -> kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event));
}
Mono<Price> computePrice(Product product) {
if (product.category().isEligibleForDiscount()) {
return discountService.discountForProduct(product.id())
.map(product.basePrice()::applyDiscount);
}
return Mono.just(product.basePrice());
}
4、虚拟线程 {#4虚拟线程}
通过 Project Loom,Java 引入了虚拟线程(Virtual Threads)作为并行处理的替代解决方案。虚拟线程是由 Java 虚拟机(JVM)管理的轻量级用户模式线程。因此,它们特别适用于 I/O 操作,其中传统线程可能会花费大量时间等待外部资源。
与异步或响应式解决方案相比,虚拟线程能让我们继续使用 "每个请求一个线程" 的模式。换句话说,可以继续按顺序编写 "阻塞式" 代码,而无需将业务逻辑与响应式 API 混合在一起。
4.1、使用虚拟线程 {#41使用虚拟线程}
有几种方法可以利用虚拟线程来执行代码。对于单个方法,如前面示例中演示的方法,可以使用 startVirtualThread()
。这个静态方法是最近添加到 Thread
API 中的,它会在一个新的虚拟线程上执行一个 Runnable
:
public void addProductToCart(String productId, String cartId) {
Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}
private void computePriceAndPublishMessage(String productId, String cartId) {
// ...
}
另外,还可以使用新的静态工厂方法 Executors.newVirtualThreadPerTaskExecutor()
创建依赖虚拟线程的 ExecutorService
。
对于使用 Spring 6 和 Spring Boot 3 的应用,可以利用新的
Executor
并配置 Spring,使其优先使用虚拟线程而不是平台线程。
4.2、兼容性 {#42兼容性}
虚拟线程通过使用更传统的同步编程模型来简化代码。因此,可以以类似于阻塞 I/O 操作的顺序方式编写代码。
还可以从普通的单线程代码无缝切换到虚拟线程,只需极少的改动,甚至无需改动。例如,在前面的例子中,只需使用静态工厂方法 startVirtualThread()
创建一个虚拟线程,并在其中执行逻辑:
void addProductToCart(String productId, String cartId) {
Thread.startVirtualThread(() -> computePriceAndPublishMessage(productId, cartId));
}
void computePriceAndPublishMessage(String productId, String cartId) {
Product product = repository.findById(productId)
.orElseThrow(() -> new IllegalArgumentException("not found!"));
Price price = computePrice(productId, product);
var event = new ProductAddedToCartEvent(productId, price.value(), price.currency(), cartId);
kafkaTemplate.send(PRODUCT_ADDED_TO_CART_TOPIC, cartId, event);
}
Price computePrice(String productId, Product product) {
if (product.category().isEligibleForDiscount()) {
BigDecimal discount = discountService.discountForProduct(productId);
return product.basePrice().applyDiscount(discount);
}
return product.basePrice();
}
4.3、可读性 {#43可读性}
采用 "每个请求一个线程" 的模式,可以更容易地理解和推理业务逻辑。这可以减少与响应式编程范式相关的认知压力(响应式编程全是回调或者监听,可读性实在是差)。
换句话说,虚拟线程允许将技术问题与业务逻辑干净利落地分开。因此,在实现业务用例时,不再需要外部 API。
5、总结 {#5总结}
本文介绍了在 Java 中实现高效并发和异步的两种方式,首先介绍了Reactor WebFlux 和响应式编程,然后介绍了虚拟线程。总的来说,虚拟线程兼容传统阻塞式代码,可以平稳过度。且比响应式代码更具有可读性。
Ref:https://www.baeldung.com/java-reactor-webflux-vs-virtual-threads