51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

限制 Webclient 的并发请求数量

1、简介 {#1简介}

本文介绍了一些限制 WebClient 并发请求数量的方式。

2、服务端 {#2服务端}

限制 WebClient 并发请求数量是为了避免服务器因大量并发请求而宕机。有些服务自身也提供了一些限制策略。

2.1、一个简单的 Controller {#21一个简单的-controller}

为了演示,先定义一个简单的 @RestController,它返回固定范围的随机数字:

@RestController
@RequestMapping("/random")
public class RandomController {
@GetMapping
Integer getRandom() {
    return new Random().nextInt(50);
}

}

接下来,我们将模拟一些耗时的操作,并限制并发请求的数量。

2.2、服务器限制并发请求数 {#22服务器限制并发请求数}

修改服务,模拟一个更真实的场景。

首先,限制服务器可接受的并发请求数,并在达到限制时抛出异常。

其次,增加处理响应的延迟,模拟耗时的操作。

创建 Concurrency 用于限制并发数量:

public class Concurrency {
public static final int MAX_CONCURRENT = 5;
static final AtomicInteger CONCURRENT_REQUESTS = new AtomicInteger();

public static int protect(IntSupplier supplier) { try { if (CONCURRENT_REQUESTS.incrementAndGet() > MAX_CONCURRENT) { throw new UnsupportedOperationException("max concurrent requests reached"); }

    TimeUnit.SECONDS.sleep(2);
    return supplier.getAsInt();
} finally {
    CONCURRENT_REQUESTS.decrementAndGet();
}

}

}

更改端点以使用它:

@GetMapping
Integer getRandom() {
    return Concurrency.protect(() -> new Random().nextInt(50));
}

现在,当请求超过 MAX_CONCURRENT 时,端点会拒绝处理请求,并向客户端返回错误信息。

2.3、一个简单的客户端 {#23一个简单的客户端}

以下的所有示例都将遵循这种模式,生成包含 n 个请求的 Flux,并向服务发出 GET 请求:

Flux.range(1, n)
  .flatMap(i -> {
    // GET 请求
  });

为了减少模板代码,用一个可以在所有示例中重复使用的方法来实现请求部分。

接收一个 WebClient,调用 get(),然后使用 ParameterizedTypeReference 泛型检索响应体:

public interface RandomConsumer {
static <T> Mono<T> get(WebClient client) {
    return client.get()
      .retrieve()
      .bodyToMono(new ParameterizedTypeReference<T>() {});
}

}

3、zipWith(Flux.interval()) {#3zipwithfluxinterval}

第一个示例,使用 zipWith() 方法来在来固定延迟后发起请求。

public class ZipWithInterval {
public static Flux<Integer> fetch(
  WebClient client, int requests, int delay) {
    return Flux.range(1, requests)
      .zipWith(Flux.interval(Duration.ofMillis(delay)))
      .flatMap(i -> RandomConsumer.get(client));
}

}

每次请求之前都会延迟 delay 毫秒。

4、Flux.delayElements() {#4fluxdelayelements}

Flux 有一种更直接的延迟消费方式:

public class DelayElements {
public static Flux<Integer> fetch(
  WebClient client, int requests, int delay) {
    return Flux.range(1, requests)
      .delayElements(Duration.ofMillis(delay))
      .flatMap(i -> RandomConsumer.get(client));
}

}

使用 delayElements(),延迟会直接应用于 Subscriber.onNext() 信号。换句话说,它会延迟 Flux.range() 中的每个元素。因此,传入 flatMap() 的函数将受到影响,需要更长时间才能启动。例如,如果 delay 值为 1000,我们的请求开始前将延迟一秒钟。

4.1、调整解决方案 {#41调整解决方案}

如果我们没有提供足够长的延迟时间,就会出现错误:

@Test
void givenSmallDelay_whenDelayElements_thenExceptionThrown() {
    int delay = 100;
int requests = 10;
assertThrows(InternalServerError.class, () -> {
  DelayElements.fetch(client, requests, delay)
    .blockLast();
});

}

这是因为我们每个请求需要等待 100 毫秒,但每个请求在服务器端需要两秒钟才能完成。因此,很快就达到了并发请求限制,并收到 500 错误。

如果增加足够的延迟,就可以避免请求受到限制。但这样一来,等待的时间就会过长,可能会严重影响性能。

既然我们知道服务器的并发限制数,接下来让我们看看有什么更合适的方法来处理这个问题。

5、使用 flatMap() 进行并发控制 {#5使用-flatmap-进行并发控制}

考虑到服务的并发限制,最佳选择是最多并行发送 Concurrency.MAX_CONCURRENT 个请求。为此,可以在 flatMap() 中添加一个额外的参数来指定最大并行处理的数量:

public class LimitConcurrency {
public static Flux<Integer> fetch(
  WebClient client, int requests, int concurrency) {
    return Flux.range(1, requests)
      .flatMap(i -> RandomConsumer.get(client), concurrency);
}

}

该参数可保证最大并发请求数不超过 concurrency,并保证我们的处理不会出现不必要的延迟:

@Test
void givenLimitInsideServerRange_whenLimitedConcurrency_thenNoExceptionThrown() {
    int limit = Concurrency.MAX_CONCURRENT;
int requests = 10;
assertDoesNotThrow(() -> {
  LimitConcurrency.fetch(client, TOTAL_REQUESTS, limit)
    .blockLast();
});

}

6、使用 Resilience4j RateLimiter {#6使用-resilience4j-ratelimiter}

Resilience4j 是一个多功能库,用于处理应用中的容错问题。可以使用它来限制一定时间间隔内的并发请求数,包括超时。

首先,添加 resilience4j-reactorresilience4j-ratelimiter 依赖:

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-reactor</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-ratelimiter</artifactId>
    <version>1.7.1</version>
</dependency>

然后,使用 RateLimiter.of() 创建 Rate Limiter,提供名称、发送新请求的时间间隔、并发限制和超时:

public class Resilience4jRateLimit {
public static Flux&lt;Integer&gt; fetch(
  WebClient client, int requests, int concurrency, int interval) {
    RateLimiter limiter = RateLimiter.of(&quot;my-rate-limiter&quot;, RateLimiterConfig.custom()
      .limitRefreshPeriod(Duration.ofMillis(interval))
      .limitForPeriod(concurrency)
      .timeoutDuration(Duration.ofMillis(interval * concurrency))
      .build());
// ...

}

}

现在,通过 transformDeferred() 将其纳入 Flux,这样它就能控制 GET 请求速率:

return Flux.range(1, requests)
  .flatMap(i -> RandomConsumer.get(client)
    .transformDeferred(RateLimiterOperator.of(limiter))
  );

你会注意到,如果定义的时间间隔太短,仍然会出现问题。

这种方式适用于我们需要与其他操作共享 Rate Limiter 的场景。

7、用 Guava 进行精确地限制 {#7用-guava-进行精确地限制}

Guava 有一个通用的 Rate Limiter,可以很好地满足我们的需求。由于它使用令牌桶算法,因此只会在必要时阻塞,而不会像 Flux.delayElements() 那样每次都阻塞。

首先,需要在 pom.xml 中添加 guava

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

调用 RateLimiter.create(),并将每秒要发送的最大请求数传给它。然后,在发送请求前调用 Limiter 的 acquire() 方法,对请求执行进行限制:

public class GuavaRateLimit {
public static Flux&lt;Integer&gt; fetch(
  WebClient client, int requests, int requestsPerSecond) {
    RateLimiter limiter = RateLimiter.create(requestsPerSecond);
return Flux.range(1, requests)
  .flatMap(i -&amp;gt; {
    limiter.acquire();

    return RandomConsumer.get(client);
  });

}

}

这种解决方案简单且效果极佳,它不会让代码阻塞得比必要的时间更长。

8、总结 {#8总结}

在本文中,我们介绍了几种可用来限制 WebClient 并发请求数的方法。


参考:https://www.baeldung.com/spring-webclient-limit-requests-per-second

赞(4)
未经允许不得转载:工具盒子 » 限制 Webclient 的并发请求数量