简介 {#简介}
本文将带你了解在 Spring Data JPA 中使用 Stream(流式)查询的最佳方式。
当需要获取较大的结果集时,使用 Java Stream 的好处是可以逐步迭代查询结果集,避免一次性获取所有数据可能导致的内存溢出异常。
JPA Stream 方法 {#jpa-stream-方法}
自 2.2 版起,你可以使用 JPA 的 getResultStream
方法以 Stream
的形式处理结果集。
getResultStream
使用 JDBC ResultSet
对给定查询返回的记录进行流式处理。特别是在处理大结果集的时候,这种方法很有效率。
Spring Data JPA Stream 查询方法 {#spring-data-jpa-stream-查询方法}
如果要对查询结果集进行流式处理,则需要在 Spring Data JPA 查询方法中返回 Java Stream
类型,如下例所示:
@Repository
public interface PostRepository extends BaseJpaRepository<Post, Long> {
@Query("""
select p
from Post p
where date(p.createdOn) >= :sinceDate
"""
)
@QueryHints(
@QueryHint(name = AvailableHints.HINT_FETCH_SIZE, value = "25")
)
// 返回 Stream
Stream<Post> streamByCreatedOnSince(@Param("sinceDate") LocalDate sinceDate);
}
对于 PostgreSQL 和 MySQL,指定 FETCH_SIZE
JPA QueryHint 是必要的,它指示 JDBC 驱动每次迭代的时候最多预取 25 条记录。否则,PostgreSQL 和 MySQL JDBC 驱动会在遍历底层 ResultSet
之前预取所有查询结果。
有了 streamByCreatedOnSince
方法后,我们现在来实现一个 updatePostCache
业务方法,该方法将获取最新的 Post
实体并更新内存缓存。
@Transactional(readOnly = true)
public void updatePostCache() {
LocalDate yesterday = LocalDate.now().minusDays(1);
try(Stream<Post> postStream = postRepository.streamByCreatedOnSince(yesterday)) {
postStream.forEach(
post -> executorService.submit(() ->
postCache.put(post.getId(), post)
)
);
}
}
注意,updatePostCache
service 方法注解了 @Transactional(readOnly = true)
,因为需要在整个 Stream
遍历的过程中保持数据库连接处于打开状态。
总结 {#总结}
Spring Data JPA 提供了对流式(Stream
)查询方法的支持,但在使用此功能之前有几件事需要注意:
- 首先,需要确保不要一次性检索所有数据。
- 其次,需要确保在遍历
Stream
之前不释放数据库连接。
Ref:https://vladmihalcea.com/spring-data-jpa-stream/