1、概览 {#1概览}
Spring Cloud AWS 是一个旨在简化与 AWS 服务交互的项目。SQS(Simple Queue Service)是 AWS 的一种解决方案,用于以可扩展的方式发送和接收异步消息。
本文将带你了解针对于 Spring Cloud AWS 3.0 完全重写的 Spring Cloud AWS SQS Integration。
该框架为处理 SQS 队列提供了熟悉的 Spring 抽象,如 SqsTemplate
和 @SqsListener
注解。
本文以一个事件驱动的场景为例,介绍了如何发送和接收消息。并展示使用 Testcontainers
(一种管理一次性 Docker 容器的工具)和 LocalStack(可在本地模拟类似 AWS 的环境,用于测试)设置集成测试的策略。
2、依赖 {#2依赖}
Spring Cloud AWS BOM 可确保项目之间的版本兼容。它声明了包括 Spring Boot 在内的许多依赖项的版本。
添加 Spring Cloud AWS BOM 到 pom.xml
:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
我们需要的主要依赖关系是 SQS Starter,它包含项目中所有与 SQS 相关的类。SQS Integration 不依赖于 Spring Boot,可在任何标准 Java 应用中独立使用:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
对于 Spring Boot 应用(如本教程中构建的应用),我们应该添加项目的 Core Starter,因为它允许我们利用 Spring Boot 的 SQS 自动配置以及 AWS 配置(如 credentials
和 region
):
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
3、设置本地测试环境 {#3设置本地测试环境}
在本节中,我们将使用 Testcontainers
设置 LocalStack
环境,以便在本地环境中测试代码。注意,本教程中的示例也可以直接针对 AWS 执行。
3.1、依赖 {#31依赖}
要使用 JUnit 5 运行 LocalStack
和 TestContainers
,还需要两个额外的依赖项:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
还需要 awaitility 库,以断言异步消息消费:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
3.2、配置 {#32配置}
现在,创建一个具有管理容器逻辑的类,测试套件可以继承该类。我们将其命名为 BaseSqsIntegrationTests
。对于继承该类的每个测试套件,Testcontainers
都将创建并启动一个新容器,这对于将每个套件的数据相互隔离至关重要。
@SpringBootTest
注解是初始化 Spring Context 所必需的,而 @Testcontainers
注解则将我们的 Testcontainers
注解与 JUnit 的运行时关联起来,这样容器就能在测试套件运行时启动,并在测试完成后停止:
@SpringBootTest
@Testcontainers
public class BaseSqsIntegrationTest {
// 在此处添加测试配置
}
现在声明 LocalStackContainer
。为了让框架自动管理容器的生命周期,@Container
注解是必须的:
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
最后,把 Spring Cloud AWS 框架用于自动配置的属性与 LocalStack
绑定。我们将在运行时获取容器端口和主机,因为 Testcontainers
会为我们提供随机端口,这非常适合并行测试。为此,可以使用 @DynamicPropertySource
注解:
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
// ...可在此处添加其他 AWS 服务端点
}
这就是我们使用 LocalStack
、Testcontainers
和 Spring Cloud AWS 实现 Spring Boot 测试所需的全部内容。在运行测试之前,还需要确保在本地环境中安装且运行了 Docker 引擎。
4、设置队列名称 {#4设置队列名称}
可以用 Spring Boot 的 application.yml
来设置队列名称。
在本教程中,创建三个队列:
events:
queues:
user-created-by-name-queue: user_created_by_name_queue
user-created-record-queue: user_created_record_queue
user-created-event-type-queue: user_created_event_type_queue
创建一个 POJO 来表示这些属性:
@ConfigurationProperties(prefix = "events.queues")
public class EventQueuesProperties {
private String userCreatedByNameQueue;
private String userCreatedRecordQueue;
private String userCreatedEventTypeQueue;
// Getter / Setter
}
最后,需要在 @Configuration
注解的类或 Spring Application main 类中使用 @EnableConfigurationProperties
注解,让 Spring Boot 知道要在其中注入 application.yml
属性:
@SpringBootApplication
@EnableConfigurationProperties(EventQueuesProperties.class)
public class SpringCloudAwsApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudAwsApplication.class, args);
}
}
现在,可以在需要队列名称时注入值本身或 POJO。
默认情况下,Spring Cloud AWS SQS 会在未找到队列时创建队列,这有助于我们快速建立开发环境。在生产环境中,应用不应拥有创建队列的权限,因此如果找不到队列,它将无法启动。也可以将框架配置为在找不到队列时显式启动失败。
5、发送和接收消息 {#5发送和接收消息}
使用 Spring Cloud AWS 向 SQS 收发消息有多种方法。在此,本文介绍最常见的几种方法,使用 SqsTemplate
发送消息和使用 @SqsListener
注解接收消息。
5.1、场景 {#51场景}
在我们场景,我们将模拟一个事件驱动的应用,它通过在本地 Repository 中保存相关信息来响应 UserCreatedEvent
。
创建 User
Entity:
public record User(String id, String name, String email) {
}
创建一个简单的基于内存的 UserRepository
:
@Repository
public class UserRepository {
private final Map<String, User> persistedUsers = new ConcurrentHashMap<>();
public void save(User userToSave) {
persistedUsers.put(userToSave.id(), userToSave);
}
public Optional<User> findById(String userId) {
return Optional.ofNullable(persistedUsers.get(userId));
}
public Optional<User> findByName(String name) {
return persistedUsers.values().stream()
.filter(user -> user.name().equals(name))
.findFirst();
}
}
最后,创建一个 UserCreatedEvent
Java Record 类:
public record UserCreatedEvent(String id, String username, String email) {
}
5.2、设置 {#52设置}
为了测试我们的场景,需要创建一个 SpringCloudAwsSQSLiveTest
类,该类将继承我们之前创建的 BaseSqsIntegrationTest
。我们将自动注入三个依赖:框架自动配置的 SqsTemplate
、UserRepository
(以便断言消息处理是否成功)以及包含队列名称的 EventQueuesProperties
POJO:
public class SpringCloudAwsSQSLiveTest extends BaseSqsIntegrationTest {
private static final Logger logger = LoggerFactory.getLogger(SpringCloudAwsSQSLiveTest.class);
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private EventQueuesProperties eventQueuesProperties;
// ...
}
创建一个包含监听器的 UserEventListeners
类,并将其声明为 Spring @Component
:
@Component
public class UserEventListeners {
private static final Logger logger = LoggerFactory.getLogger(UserEventListeners.class);
public static final String EVENT_TYPE_CUSTOM_HEADER = "eventType";
private final UserRepository userRepository;
public UserEventListeners(UserRepository userRepository) {
this.userRepository = userRepository;
}
// 在此添加监听器
}
5.3、String 消息 {#53string-消息}
第一个示例,我们将发送一条 String 消息,在监听器中接收该消息,并将其持久化到 Repository 中。然后,检索 Repository,以确保应用正确地持久化数据。
首先,在测试类中创建一个发送消息的测试:
@Test
void givenAStringPayload_whenSend_shouldReceive() {
// given
var userName = "Albert";
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedByNameQueue())
.payload(userName));
logger.info("Message sent with payload {}", userName);
// then
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findByName(userName)
.isPresent());
}
执行测试,我们应该可以在控制台看到类似如下的日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload Albert
注意,测试失败的原因是我们还没有该队列的监听器。
让我们在监听器类中设置监听器,以便从队列中接收消息,并使测试通过:
@SqsListener("${events.queues.user-created-by-name-queue}")
public void receiveStringMessage(String username) {
logger.info("Received message: {}", username);
userRepository.save(new User(UUID.randomUUID()
.toString(), username, null));
}
现在,运行测试,应该能在日志中看到结果:
INFO [ntContainer#0-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: Albert
测试通过。
注意,我们使用了 Spring 的属性解析功能从之前创建的 application.yml
中获取队列名称。
5.4、POJO 和 Record 消息 {#54pojo-和-record-消息}
我们已经发送和接收了 String
,现在让我们用 Java Record(之前创建的 UserCreatedEvent
)来设置一个场景。
首先,编写失败测试:
@Test
void givenARecordPayload_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedRecordQueue())
.payload(payload));
// then
logger.info("Message sent with payload: {}", payload);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
在测试失败之前,你应该能看到类似如下的日志:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Message sent with payload: UserCreatedEvent[id=67f52cf6-c750-4200-9a02-345bda0516f8, username=John, email=john@baeldung.com]
现在,创建相应的监听器,使测试通过:
@SqsListener("${events.queues.user-created-record-queue}")
public void receiveRecordMessage(UserCreatedEvent event) {
logger.info("Received message: {}", event);
userRepository.save(new User(event.id(), event.username(), event.email()));
}
输出结果如下,说明消息已收到,测试通过:
INFO [ntContainer#1-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message: UserCreatedEvent[id=2d66df3d-2dbd-4aed-8fc0-ddd08416ed12, username=John, email=john@baeldung.com]
框架会自动配置 Spring Context
中可用的任何 ObjectMapper
Bean,以处理消息的序列化和反序列化。我们可以配置自己的 ObjectMapper
,并以多种方式自定义序列化,但这超出了本教程的范围,这里不多解释。
5.5、Spring Message 和 Header {#55spring-message-和-header}
最后一种场景,发送带有自定义 Header 的 Record
,并以 Spring Message
实例的形式接收消息,同时在方法签名中添加自定义 Header 和标准 SQS Header。框架会自动将所有 SQS 消息属性(message attributes) 转换为消息 Header,包括用户提供的任何属性。
首先,创建一个失败的测试:
@Test
void givenCustomHeaders_whenSend_shouldReceive() {
// given
String userId = UUID.randomUUID()
.toString();
var payload = new UserCreatedEvent(userId, "John", "john@baeldung.com");
var headers = Map.<String, Object> of(EVENT_TYPE_CUSTOM_HEADER, "UserCreatedEvent");
// when
sqsTemplate.send(to -> to.queue(eventQueuesProperties.getUserCreatedEventTypeQueue())
.payload(payload)
.headers(headers));
// then
logger.info("Sent message with payload {} and custom headers: {}", payload, headers);
await().atMost(Duration.ofSeconds(3))
.until(() -> userRepository.findById(userId)
.isPresent());
}
测试失败,生成的日志和上文类似:
INFO [ main] c.b.s.c.a.sqs.SpringCloudAwsSQSLiveTest : Sent message with payload UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, email=john@baeldung.com] and custom headers: {eventType=UserCreatedEvent}
现在,添加相应的监听器,使测试通过:
@SqsListener("${events.queues.user-created-event-type-queue}")
public void customHeaderMessage(Message<UserCreatedEvent> message, @Header(EVENT_TYPE_CUSTOM_HEADER) String eventType,
@Header(SQS_APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) Long firstReceive) {
logger.info("Received message {} with event type {}. First received at approximately {}.", message, eventType, firstReceive);
UserCreatedEvent payload = message.getPayload();
userRepository.save(new User(payload.id(), payload.username(), payload.email()));
}
重新运行测试,输出日志如下,表明测试成功:
INFO [ntContainer#2-1] c.b.s.cloud.aws.sqs.UserEventListeners : Received message GenericMessage [payload=UserCreatedEvent[id=575de854-82de-44e4-8dfe-8fdc9f6ae4a1, username=John, email=john@baeldung.com], headers=...
在这个示例中,我们接收到一条消息,其 Payload 是反序列化后的 UserCreatedEvent
Record 和两个 Header。为确保整个项目的一致性,应使用 SqsHeader
类常量来检索 SQS 标准 Header。
6、总结 {#6总结}
本文使用了一个事件驱动场景,通过不同的示例介绍了如何使用 Spring Cloud AWS SQS 3.0 发送和接收消息。
Ref:https://www.baeldung.com/java-spring-cloud-aws-v3-intro