在本文中,你将学习如何使用 Spring WebFlux、R2DBC 和 Postgres 数据库实现和测试响应式(Reactive) Spring Boot 应用程序。我们将使用最新版本的 Spring Boot 3 创建两个用 Kotlin 编写的简单应用程序。我们的应用程序通过 HTTP 公开一些 REST 端点。为了测试它们之间的通信以及与 Postgres 数据库的集成,我们将使用 Testcontainers 和 Netty Mock Server。
源码 {#源码}
你可以可以克隆我的 GitHub repository。它包含 employee-service
和 organization-service
两个应用程序。之后,你只需按照我的说明操作即可。
依赖 {#依赖}
第一步,我们将添加几个与 Kotlin 相关的依赖。除了标准库,我们还可以加入 Kotlin 对 Jackson(JSON 序列化/反序列化)的支持:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>
我们还需要包含两个 Spring Boot Starter。为了创建响应式 Spring @Controller
,我们需要使用 Spring WebFlux 模块。有了 Spring Boot Data R2DBC Starter,我们就能以响应方式使用 Spring Data Repository。最后,我们还需要加入 R2DBC 提供的 Postgres 驱动程序。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<scope>runtime</scope>
</dependency>
我们的项目中有几个测试依赖。我们需要包含标准的 Spring Boot Test Starter、支持 JUnit 5、Postgres 和 R2DBC 的 Testcontainers,以及用于模拟响应式 API 的 Mock Server Netty 模块。还添加了 spring-boot-testcontainers
模块,以利用 Spring Boot 和 Testcontainers 之间的内置集成。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>r2dbc</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.18.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>5.15.0</version>
<scope>test</scope>
</dependency>
最后一个依赖关系是可选的。我们可以在应用程序中加入 Spring Boot Actuator。它将 R2DBC 连接状态添加到健康检查中,并将若干指标与连接池状态结合起来。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
实现 Spring Reactive 应用程序 {#实现-spring-reactive-应用程序}
下面是第一个应用程序 - employee-service
的 model 类:
class Employee(val name: String,
val salary: Int,
val organizationId: Int) {
@Id var id: Int? = null
}
这是 repository 接口。它需要继承 R2dbcRepository
接口。与标准 Spring Data Repository 一样,我们可以定义多个 find 方法。不过,它们使用 Reactor Mono
或 Flux
封装返回对象,而不是实体。
interface EmployeeRepository: R2dbcRepository<Employee, Int> {
fun findByOrganizationId(organizationId: Int): Flux<Employee>
}
下面是 @RestController
的实现。我们需要注入 EmployeeRepository
Bean。然后,我们使用 repository Bean 以响应式的方式与数据库交互。我们的端点也会返回由 Reactor Mono
和 Flux
封装的对象。有三个 find 端点和一个 POST 端点:
@RestController @RequestMapping("/employees") class EmployeeController {
@Autowired lateinit var repository : EmployeeRepository
@GetMapping // (1) fun findAll() : Flux<Employee> = repository.findAll()
@GetMapping("/{id}") // (2) fun findById(@PathVariable id: Int) : Mono<Employee> = repository.findById(id)
@GetMapping("/organization/{organizationId}") // (3) fun findByOrganizationId(@PathVariable organizationId: Int): Flux<Employee> = repository.findByOrganizationId(organizationId)
@PostMapping // (4) fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)
}
- 检索所有 employee。
- 根据 employee
id
检索。 - 根据 organization
id
检索所有 employee。 - 添加新的 employee。
我们还需要在 Spring Boot application.yml
中配置数据库连接设置:
spring:
application:
name: employee-service
r2dbc:
url: r2dbc:postgresql://localhost:5432/spring
username: spring
password: spring123
这是我们的 main class。我们希望应用程序在启动时在数据库中创建一个表。使用 R2DBC 时,我们需要准备一段代码,以便使用 schema.sql
文件填充 schema。
@SpringBootApplication class EmployeeApplication {
@Bean fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? { val initializer = ConnectionFactoryInitializer() initializer.setConnectionFactory(connectionFactory) initializer.setDatabasePopulator( ResourceDatabasePopulator(ClassPathResource("schema.sql"))) return initializer }
}
fun main(args: Array<String>) { runApplication<EmployeeApplication>(*args) }
然后将 schema.sql 文件放到 src/main/resources
目录中即可。
CREATE TABLE employee (
id SERIAL PRIMARY KEY,
name VARCHAR(255),
salary INT,
organization_id INT
);
让我们切换到 organization-service
。实现过程非常相似。这是我们的 domain model 类:
class Organization(var name: String) {
@Id var id: Int? = null
}
我们的应用程序要与 employee-service
进行通信。因此,我们需要定义 WebClient
Bean。它将从 application properties 中获取目标服务的地址。
@SpringBootApplication class OrganizationApplication {
@Bean fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? { val initializer = ConnectionFactoryInitializer() initializer.setConnectionFactory(connectionFactory) initializer.setDatabasePopulator( ResourceDatabasePopulator(ClassPathResource("schema.sql"))) return initializer }
@Value("${employee.client.url}") private lateinit var employeeUrl: String
@Bean fun webClient(builder: WebClient.Builder): WebClient { return builder.baseUrl(employeeUrl).build() } }
fun main(args: Array<String>) { runApplication<OrganizationApplication>(*args) }
There is also the repository interface OrganizationRepository
. Our @RestController
uses a repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service
. As the response from the GET /employees/{id}/with-employees
it returns the OrganizationDTO
.
还有一个 repository 接口 OrganizationRepository
。我们的 @RestController
使用 repository Bean 与数据库交互,并使用 WebClient
Bean 调用 employee-service
暴露的端点。GET /employees/{id}/with-employees
的响应会返回 OrganizationDTO
。
@RestController @RequestMapping("/organizations") class OrganizationController {
@Autowired lateinit var repository : OrganizationRepository @Autowired lateinit var client : WebClient
@GetMapping fun findAll() : Flux<Organization> = repository.findAll()
@GetMapping("/{id}") fun findById(@PathVariable id : Int): Mono<Organization> = repository.findById(id)
@GetMapping("/{id}/with-employees") fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> { val employees : Flux<Employee> = client.get().uri("/employees/organization/$id") .retrieve().bodyToFlux(Employee::class.java) val org : Mono<Organization> = repository.findById(id) return org.zipWith(employees.collectList()).log() .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) } }
@PostMapping fun add(@RequestBody employee: Organization) : Mono<Organization> = repository.save(employee)
}
下面是我们的 DTO 实现:
data class OrganizationDTO(var id: Int?, var name: String) {
var employees : MutableList<Employee> = ArrayList()
constructor(employees: MutableList<Employee>) : this(null, "") {
this.employees = employees
}
constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
this.employees = employees
}
}
集成测试 {#集成测试}
完成实现后,我们就可以准备几个集成测试了。正如我在开头提到的,我们将使用 Testcontainers 在测试期间运行 Postgres 容器。我们的测试运行应用程序,并利用自动配置的 WebTestClient
实例调用 API 端点 (1) 。我们需要在测试前启动 Postgres 容器。因此,我们需要在 companion object
部分定义容器 bean (2) 。有了 @ServiceConnection
注解,我们就不必手动设置属性了,Spring Boot 会帮我们完成 (3)。
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @Testcontainers @TestMethodOrder(OrderAnnotation::class) public class EmployeeControllerTests {
@Autowired private lateinit var webTestClient: WebTestClient // (1)
companion object { // (2)
@Container @ServiceConnection // (3) val container = PostgreSQLContainer<Nothing>("postgres:14").apply { withDatabaseName("spring") withUsername("spring") withPassword("spring123") }
}
@Test @Order(1) fun shouldStart() {
}
@Test @Order(2) fun shouldAddEmployee() { webTestClient.post().uri("/employees") .contentType(MediaType.APPLICATION_JSON) .bodyValue(Employee("Test", 1000, 1)) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }
@Test @Order(3) fun shouldFindEmployee() { webTestClient.get().uri("/employees/1") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }
@Test @Order(3) fun shouldFindEmployees() { webTestClient.get().uri("/employees") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody().jsonPath("$.length()").isEqualTo(1) .jsonPath("$[0].id").isNotEmpty }
}
organization-service
的测试类要复杂一些。这是因为我们需要模拟与 employee-service
的通信。为此,我们使用了 ClientAndServer
对象 (1) 。它在所有测试之前启动一次 (2) ,在测试之后停止 (3) 。我们模拟 organization-service
调用的 GET /employees/organization/{id}
端点 (4) 。然后,我们调用 organization-service GET /organizations/{id}/with-employees
端点 (5)。最后,我们将验证它是否在 JSON 响应中返回 employee 列表。
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @Testcontainers @TestMethodOrder(OrderAnnotation::class) public class OrganizationControllerTests {
@Autowired private lateinit var webTestClient: WebTestClient
companion object {
@Container @ServiceConnection val container = PostgreSQLContainer<Nothing>("postgres:14").apply { withDatabaseName("spring") withUsername("spring") withPassword("spring123") } private var mockServer: ClientAndServer? = null // (1) @BeforeAll @JvmStatic internal fun beforeAll() { // (2) mockServer = ClientAndServer.startClientAndServer(8090); } @AfterAll @JvmStatic internal fun afterAll() { // (3) mockServer!!.stop() }
}
@Test @Order(1) fun shouldStart() {
}
@Test @Order(2) fun shouldAddOrganization() { webTestClient.post().uri("/organizations") .contentType(MediaType.APPLICATION_JSON) .bodyValue(Organization("Test")) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }
@Test @Order(3) fun shouldFindOrganization() { webTestClient.get().uri("/organizations/1") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }
@Test @Order(3) fun shouldFindOrganizations() { webTestClient.get().uri("/organizations") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody().jsonPath("$.length()").isEqualTo(1) .jsonPath("$[0].id").isNotEmpty }
@Test @Order(3) fun shouldFindOrganizationWithEmployees() { // (4) mockServer!!.
when
(request() .withMethod("GET") .withPath("/employees/organization/1")) .respond(response() .withStatusCode(200) .withContentType(MediaType.APPLICATION_JSON) .withBody(createEmployees()))webTestClient.get().uri("/organizations/1/with-employees") .accept(MediaType.APPLICATION_JSON) // (5) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty .jsonPath("$.employees.length()").isEqualTo(2) .jsonPath("$.employees[0].id").isEqualTo(1) .jsonPath("$.employees[1].id").isEqualTo(2)
}
private fun createEmployees(): String { val employees: List<Employee> = listOf<Employee>( Employee(1, "Test1", 10000, 1), Employee(2, "Test2", 20000, 1) ) return jacksonObjectMapper().writeValueAsString(employees) } }
你可以在自己电脑上运行这些测试,验证所有测试是否已成功完成。clone repository 后,你需要运行 Docker 并使用以下 Maven 命令构建应用程序:
$ mvn clean package
我们还可以在 CircleCI 上为应用程序准备构建定义(build definition)。由于我们需要运行 Testcontainers,因此需要一台运行有 Docker 守护进程的机器。下面是 .circle/config.yml
文件中为 CircleCI 构建 pipeline 的配置:
# .circleci/config.yml version: 2.1
jobs: build: docker: - image: 'cimg/openjdk:20.0' steps: - checkout - run: name: Analyze on SonarCloud command: mvn verify sonar:sonar -DskipTests
executors: machine_executor_amd64: machine: image: ubuntu-2204:2022.04.2 environment: architecture: "amd64" platform: "linux/amd64"
orbs: maven: circleci/maven@1.4.1
workflows: maven_test: jobs: - maven/test: executor: machine_executor_amd64 - build: context: SonarCloud
下面是在 CircleCI 上的构建结果:
如果你正在运行 Docker,也可以使用 Postgres 容器启动我们的 Spring Boot 响应式应用程序。这要归功于 spring-boot-testcontainers
模块。有一个专用的 @TestConfiguration
类可用于在开发模式下运行 Postgres:
@TestConfiguration class PostgresContainerDevMode {
@Bean @ServiceConnection fun postgresql(): PostgreSQLContainer<*>? { return PostgreSQLContainer("postgres:14.0") .withUsername("spring") .withPassword("spring123") }
}
现在,我们需要定义 test main class,它将使用 PostgresContainerDevMode
类中提供的配置。
class EmployeeApplicationTest
fun main(args: Array<String>) { fromApplication<EmployeeApplication>() .with(PostgresContainerDevMode::class) .run(*args) }
要在 Docker 上运行 Dev Postgres 应用程序,只需执行以下 Maven 命令:
$ mvn spring-boot:test-run
参考:https://piotrminkowski.com/2023/07/28/reactive-spring-boot-with-webflux-r2dbc-and-postgres/