1、概览 {#1概览}
本文将会带你了解在 Java 响应式编程中如何将 Flux<DataBuffer>
读取到 InputStream
。
2、请求设置 {#2请求设置}
首先,使用 Spring Reactive WebClient 发起 GET 请求。使用由 gorest.co.in
托管的公共 API 端点来进行测试:
String REQUEST_ENDPOINT = "https://gorest.co.in/public/v2/users";
接下来,定义 getWebClient()
方法,用于获取 WebClient
类的新实例:
static WebClient getWebClient() {
WebClient.Builder webClientBuilder = WebClient.builder();
return webClientBuilder.build();
}
至此,我们就可以向 /public/v2/users
端点发出 GET 请求了。注意,必须以 Flux<DataBuffer>
对象的形式获取响应体。
3、BodyExtractors
和 DataBufferUtils
{#3bodyextractors-和-databufferutils}
我们可以使用 spring-webflux 中 BodyExtractors
类的 toDataBuffers()
方法将响应体提取到 Flux<DataBuffer>
中。
将 body
构建为 Flux<DataBuffer>
类型的实例:
Flux<DataBuffer> body = client
.get(
.uri(REQUEST_ENDPOINT)
.exchangeToFlux( clientResponse -> {
return clientResponse.body(BodyExtractors.toDataBuffers());
});
接下来,需要将这些 DataBuffer
Stream 收集到一个 InputStream
中,实现这一目标的好方法是使用 PipedInputStream
和 PipedOutputStream
。
我们打算向管道输出流(PipedOutputStream
)写入内容,并最终从管道输入流(PipedInputStream
)读取内容。
创建这两个相连的流:
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(1024*10);
inputStream.connect(outputStream);
注意,默认大小为 1024
字节。但是,预计从 Flux<DataBuffer>
收集到的结果可能会超过默认值。因此,需要明确指定一个更大的值,在本例中就是 1024 * 10
*。
最后,使用 DataBufferUtils
类中的 write()
方法,将 body 作为 publisher 写入 outputStream
:
DataBufferUtils.write(body, outputStream).subscribe();
注意,在声明时,已将 inputStream
连接到 outputStream
。因此,现在已经可以从 inputStream
读取数据了。
4、从管道输入流(PipedInputStream)读取数据 {#4从管道输入流pipedinputstream读取数据}
首先,定义一个方法 readContent()
,以 String
的形式读取 InputStream
:
String readContent(InputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
byte[] tmp = new byte[stream.available()];
int byteCount = stream.read(tmp, 0, tmp.length);
contentStringBuffer.append(new String(tmp));
return String.valueOf(contentStringBuffer);
}
接下来,典型的做法 是在不同的线程中读取 PipedInputStream
,所以创建 readContentFromPipedInputStream()
方法,在内部创建一个新的线程,通过调用 readContent()
方法将 PipedInputStream
中的内容读取为 String
:
String readContentFromPipedInputStream(PipedInputStream stream) throws IOException {
StringBuffer contentStringBuffer = new StringBuffer();
try {
Thread pipeReader = new Thread(() -> {
try {
contentStringBuffer.append(readContent(stream));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
pipeReader.start();
pipeReader.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stream.close();
}
return String.valueOf(contentStringBuffer);
}
现在,看看它的运行效果。
WebClient webClient = getWebClient();
InputStream inputStream = getResponseAsInputStream(webClient, REQUEST_ENDPOINT);
Thread.sleep(3000);
String content = readContentFromPipedInputStream((PipedInputStream) inputStream);
logger.info("response content: \n{}", content.replace("}","}\n"));
由于系统是异步的,所以在从从数据流中读取数据之前通过 Thread.sleep
把当前线程暂停 3 秒,目的是为了能够看到完整的响应。另外,在输出日志的时候,添加了一个换行符,把长的日志输出分成了多行,
最后,执行代码,验证输出的结果:
20:45:04.120 [main] INFO com.baeldung.databuffer.DataBufferToInputStream - response content:
[{"id":2642,"name":"Bhupen Trivedi","email":"bhupen_trivedi@renner-pagac.name","gender":"male","status":"active"}
,{"id":2637,"name":"Preity Patel","email":"patel_preity@abshire.info","gender":"female","status":"inactive"}
,{"id":2633,"name":"Brijesh Shah","email":"brijesh_shah@morar.co","gender":"male","status":"inactive"}
...
,{"id":2623,"name":"Mohini Mishra","email":"mishra_mohini@hamill-ledner.info","gender":"female","status":"inactive"}
]
一切OK。
5、总结 {#5总结}
本文介绍了如何通过管道流以及 BodyExtractors
和 DataBufferUtils
类中的工具方法,将 Flux<DataBuffer>
读取到 InputStream
中
参考:https://www.baeldung.com/spring-reactive-read-flux-into-inputstream