51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

使用 Webflux R2dbc 和 Postgres 构建响应式 Spring Boot 应用

在本文中,你将学习如何使用 Spring WebFlux、R2DBC 和 Postgres 数据库实现和测试响应式(Reactive) Spring Boot 应用程序。我们将使用最新版本的 Spring Boot 3 创建两个用 Kotlin 编写的简单应用程序。我们的应用程序通过 HTTP 公开一些 REST 端点。为了测试它们之间的通信以及与 Postgres 数据库的集成,我们将使用 Testcontainers 和 Netty Mock Server。

源码 {#源码}

你可以可以克隆我的 GitHub repository。它包含 employee-serviceorganization-service 两个应用程序。之后,你只需按照我的说明操作即可。

依赖 {#依赖}

第一步,我们将添加几个与 Kotlin 相关的依赖。除了标准库,我们还可以加入 Kotlin 对 Jackson(JSON 序列化/反序列化)的支持:

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>

我们还需要包含两个 Spring Boot Starter。为了创建响应式 Spring @Controller,我们需要使用 Spring WebFlux 模块。有了 Spring Boot Data R2DBC Starter,我们就能以响应方式使用 Spring Data Repository。最后,我们还需要加入 R2DBC 提供的 Postgres 驱动程序。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

我们的项目中有几个测试依赖。我们需要包含标准的 Spring Boot Test Starter、支持 JUnit 5、Postgres 和 R2DBC 的 Testcontainers,以及用于模拟响应式 API 的 Mock Server Netty 模块。还添加了 spring-boot-testcontainers 模块,以利用 Spring Boot 和 Testcontainers 之间的内置集成。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>r2dbc</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mock-server</groupId>
  <artifactId>mockserver-netty</artifactId>
  <version>5.15.0</version>
  <scope>test</scope>
</dependency>

最后一个依赖关系是可选的。我们可以在应用程序中加入 Spring Boot Actuator。它将 R2DBC 连接状态添加到健康检查中,并将若干指标与连接池状态结合起来。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

实现 Spring Reactive 应用程序 {#实现-spring-reactive-应用程序}

下面是第一个应用程序 - employee-service 的 model 类:

class Employee(val name: String, 
               val salary: Int, 
               val organizationId: Int) {
    @Id var id: Int? = null
}

这是 repository 接口。它需要继承 R2dbcRepository 接口。与标准 Spring Data Repository 一样,我们可以定义多个 find 方法。不过,它们使用 Reactor MonoFlux 封装返回对象,而不是实体。

interface EmployeeRepository: R2dbcRepository<Employee, Int> {
    fun findByOrganizationId(organizationId: Int): Flux<Employee>
}

下面是 @RestController 的实现。我们需要注入 EmployeeRepository Bean。然后,我们使用 repository Bean 以响应式的方式与数据库交互。我们的端点也会返回由 Reactor MonoFlux 封装的对象。有三个 find 端点和一个 POST 端点:

@RestController
@RequestMapping("/employees")
class EmployeeController {

@Autowired lateinit var repository : EmployeeRepository

@GetMapping // (1) fun findAll() : Flux<Employee> = repository.findAll()

@GetMapping("/{id}") // (2) fun findById(@PathVariable id: Int) : Mono<Employee> = repository.findById(id)

@GetMapping("/organization/{organizationId}") // (3) fun findByOrganizationId(@PathVariable organizationId: Int): Flux<Employee> = repository.findByOrganizationId(organizationId)

@PostMapping // (4) fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)

}

  1. 检索所有 employee。
  2. 根据 employee id 检索。
  3. 根据 organization id 检索所有 employee。
  4. 添加新的 employee。

我们还需要在 Spring Boot application.yml 中配置数据库连接设置:

spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123

这是我们的 main class。我们希望应用程序在启动时在数据库中创建一个表。使用 R2DBC 时,我们需要准备一段代码,以便使用 schema.sql 文件填充 schema。

@SpringBootApplication
class EmployeeApplication {

@Bean fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? { val initializer = ConnectionFactoryInitializer() initializer.setConnectionFactory(connectionFactory) initializer.setDatabasePopulator( ResourceDatabasePopulator(ClassPathResource("schema.sql"))) return initializer }

}

fun main(args: Array<String>) { runApplication<EmployeeApplication>(*args) }

然后将 schema.sql 文件放到 src/main/resources 目录中即可。

CREATE TABLE employee (
  id SERIAL PRIMARY KEY, 
  name VARCHAR(255), 
  salary INT, 
  organization_id INT
);

让我们切换到 organization-service。实现过程非常相似。这是我们的 domain model 类:

class Organization(var name: String) {
    @Id var id: Int? = null
}

我们的应用程序要与 employee-service 进行通信。因此,我们需要定义 WebClient Bean。它将从 application properties 中获取目标服务的地址。

@SpringBootApplication
class OrganizationApplication {

@Bean fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? { val initializer = ConnectionFactoryInitializer() initializer.setConnectionFactory(connectionFactory) initializer.setDatabasePopulator( ResourceDatabasePopulator(ClassPathResource("schema.sql"))) return initializer }

@Value("${employee.client.url}") private lateinit var employeeUrl: String

@Bean fun webClient(builder: WebClient.Builder): WebClient { return builder.baseUrl(employeeUrl).build() } }

fun main(args: Array<String>) { runApplication<OrganizationApplication>(*args) }

There is also the repository interface OrganizationRepository. Our @RestController uses a repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service. As the response from the GET /employees/{id}/with-employees it returns the OrganizationDTO.

还有一个 repository 接口 OrganizationRepository。我们的 @RestController 使用 repository Bean 与数据库交互,并使用 WebClient Bean 调用 employee-service 暴露的端点。GET /employees/{id}/with-employees 的响应会返回 OrganizationDTO

@RestController
@RequestMapping("/organizations")
class OrganizationController {

@Autowired lateinit var repository : OrganizationRepository @Autowired lateinit var client : WebClient

@GetMapping fun findAll() : Flux<Organization> = repository.findAll()

@GetMapping("/{id}") fun findById(@PathVariable id : Int): Mono<Organization> = repository.findById(id)

@GetMapping("/{id}/with-employees") fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> { val employees : Flux<Employee> = client.get().uri("/employees/organization/$id") .retrieve().bodyToFlux(Employee::class.java) val org : Mono<Organization> = repository.findById(id) return org.zipWith(employees.collectList()).log() .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) } }

@PostMapping fun add(@RequestBody employee: Organization) : Mono<Organization> = repository.save(employee)

}

下面是我们的 DTO 实现:

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}

集成测试 {#集成测试}

完成实现后,我们就可以准备几个集成测试了。正如我在开头提到的,我们将使用 Testcontainers 在测试期间运行 Postgres 容器。我们的测试运行应用程序,并利用自动配置的 WebTestClient 实例调用 API 端点 (1) 。我们需要在测试前启动 Postgres 容器。因此,我们需要在 companion object 部分定义容器 bean (2) 。有了 @ServiceConnection 注解,我们就不必手动设置属性了,Spring Boot 会帮我们完成 (3)

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class EmployeeControllerTests {

@Autowired private lateinit var webTestClient: WebTestClient // (1)

companion object { // (2)

  @Container
  @ServiceConnection // (3)
  val container = PostgreSQLContainer&lt;Nothing&gt;(&quot;postgres:14&quot;).apply {
     withDatabaseName(&quot;spring&quot;)
     withUsername(&quot;spring&quot;)
     withPassword(&quot;spring123&quot;)
  }

}

@Test @Order(1) fun shouldStart() {

}

@Test @Order(2) fun shouldAddEmployee() { webTestClient.post().uri("/employees") .contentType(MediaType.APPLICATION_JSON) .bodyValue(Employee("Test", 1000, 1)) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }

@Test @Order(3) fun shouldFindEmployee() { webTestClient.get().uri("/employees/1") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }

@Test @Order(3) fun shouldFindEmployees() { webTestClient.get().uri("/employees") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody().jsonPath("$.length()").isEqualTo(1) .jsonPath("$[0].id").isNotEmpty }

}

organization-service 的测试类要复杂一些。这是因为我们需要模拟与 employee-service 的通信。为此,我们使用了 ClientAndServer 对象 (1) 。它在所有测试之前启动一次 (2) ,在测试之后停止 (3) 。我们模拟 organization-service 调用的 GET /employees/organization/{id} 端点 (4) 。然后,我们调用 organization-service GET /organizations/{id}/with-employees 端点 (5)。最后,我们将验证它是否在 JSON 响应中返回 employee 列表。

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class OrganizationControllerTests {

@Autowired private lateinit var webTestClient: WebTestClient

companion object {

  @Container
  @ServiceConnection
  val container = PostgreSQLContainer&lt;Nothing&gt;(&quot;postgres:14&quot;).apply {
     withDatabaseName(&quot;spring&quot;)
     withUsername(&quot;spring&quot;)
     withPassword(&quot;spring123&quot;)
  }

  private var mockServer: ClientAndServer? = null // (1)

  @BeforeAll
  @JvmStatic
  internal fun beforeAll() { // (2)
     mockServer = ClientAndServer.startClientAndServer(8090);
  }

  @AfterAll
  @JvmStatic
  internal fun afterAll() { // (3)
     mockServer!!.stop()
  }

}

@Test @Order(1) fun shouldStart() {

}

@Test @Order(2) fun shouldAddOrganization() { webTestClient.post().uri("/organizations") .contentType(MediaType.APPLICATION_JSON) .bodyValue(Organization("Test")) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }

@Test @Order(3) fun shouldFindOrganization() { webTestClient.get().uri("/organizations/1") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody() .jsonPath("$.id").isNotEmpty }

@Test @Order(3) fun shouldFindOrganizations() { webTestClient.get().uri("/organizations") .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().is2xxSuccessful .expectBody().jsonPath("$.length()").isEqualTo(1) .jsonPath("$[0].id").isNotEmpty }

@Test @Order(3) fun shouldFindOrganizationWithEmployees() { // (4) mockServer!!.when(request() .withMethod("GET") .withPath("/employees/organization/1")) .respond(response() .withStatusCode(200) .withContentType(MediaType.APPLICATION_JSON) .withBody(createEmployees()))

  webTestClient.get().uri(&quot;/organizations/1/with-employees&quot;)
      .accept(MediaType.APPLICATION_JSON) // (5)
      .exchange()
      .expectStatus().is2xxSuccessful
      .expectBody()
      .jsonPath(&quot;$.id&quot;).isNotEmpty
      .jsonPath(&quot;$.employees.length()&quot;).isEqualTo(2)
      .jsonPath(&quot;$.employees[0].id&quot;).isEqualTo(1)
      .jsonPath(&quot;$.employees[1].id&quot;).isEqualTo(2)

}

private fun createEmployees(): String { val employees: List<Employee> = listOf<Employee>( Employee(1, "Test1", 10000, 1), Employee(2, "Test2", 20000, 1) ) return jacksonObjectMapper().writeValueAsString(employees) } }

你可以在自己电脑上运行这些测试,验证所有测试是否已成功完成。clone repository 后,你需要运行 Docker 并使用以下 Maven 命令构建应用程序:

$ mvn clean package

我们还可以在 CircleCI 上为应用程序准备构建定义(build definition)。由于我们需要运行 Testcontainers,因此需要一台运行有 Docker 守护进程的机器。下面是 .circle/config.yml 文件中为 CircleCI 构建 pipeline 的配置:

# .circleci/config.yml
version: 2.1

jobs: build: docker: - image: 'cimg/openjdk:20.0' steps: - checkout - run: name: Analyze on SonarCloud command: mvn verify sonar:sonar -DskipTests

executors: machine_executor_amd64: machine: image: ubuntu-2204:2022.04.2 environment: architecture: "amd64" platform: "linux/amd64"

orbs: maven: circleci/maven@1.4.1

workflows: maven_test: jobs: - maven/test: executor: machine_executor_amd64 - build: context: SonarCloud

下面是在 CircleCI 上的构建结果:

Build results on CircleCI

如果你正在运行 Docker,也可以使用 Postgres 容器启动我们的 Spring Boot 响应式应用程序。这要归功于 spring-boot-testcontainers 模块。有一个专用的 @TestConfiguration 类可用于在开发模式下运行 Postgres:

@TestConfiguration
class PostgresContainerDevMode {
@Bean
@ServiceConnection
fun postgresql(): PostgreSQLContainer&lt;*&gt;? {
    return PostgreSQLContainer(&quot;postgres:14.0&quot;)
        .withUsername(&quot;spring&quot;)
        .withPassword(&quot;spring123&quot;)
}

}

现在,我们需要定义 test main class,它将使用 PostgresContainerDevMode 类中提供的配置。

class EmployeeApplicationTest

fun main(args: Array<String>) { fromApplication<EmployeeApplication>() .with(PostgresContainerDevMode::class) .run(*args) }

要在 Docker 上运行 Dev Postgres 应用程序,只需执行以下 Maven 命令:

$ mvn spring-boot:test-run

参考:https://piotrminkowski.com/2023/07/28/reactive-spring-boot-with-webflux-r2dbc-and-postgres/

赞(3)
未经允许不得转载:工具盒子 » 使用 Webflux R2dbc 和 Postgres 构建响应式 Spring Boot 应用