1、概览 {#1概览}
消息确认是 MQ 系统中的一种标准机制,它向 Message Broker 发出信号,表明消息已被消费,不应再次传递。在亚马逊的 SQS(Simple Queue Servic)中,确认是通过删除队列中的信息来执行的。
本文将带你了解 Spring Cloud AWS SQS v3 中开箱即提供的三种确认模式: ON_SUCCESS
、MANUAL
和 ALWAYS
。
本文将利用 Spring Cloud AWS SQS V3 入门文章 中的环境和测试设置,使用事件驱动场景来说明用例。
2、依赖 {#2依赖}
首先,导入 Spring Cloud AWS BOM,以确保 pom.xml
中的所有依赖项相互兼容:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws</artifactId>
<version>3.1.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
还要添加 Core 和 SQS starter 依赖:
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
</dependency>
最后,添加测试所需的依赖,即带有 JUnit 5 的 LocalStack 和 TestContainers 、用于验证异步消息消费的 awaitility 库,以及用于处理断言的 AssertJ:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
3、设置本地测试环境 {#3设置本地测试环境}
首先,使用 Testcontainers 配置一个本地测试的 LocalStack 环境:
@Testcontainers
public class BaseSqsLiveTest {
private static final String LOCAL_STACK_VERSION = "localstack/localstack:2.3.2";
@Container
static LocalStackContainer localStack = new LocalStackContainer(DockerImageName.parse(LOCAL_STACK_VERSION));
@DynamicPropertySource
static void overrideProperties(DynamicPropertyRegistry registry) {
registry.add("spring.cloud.aws.region.static", () -> localStack.getRegion());
registry.add("spring.cloud.aws.credentials.access-key", () -> localStack.getAccessKey());
registry.add("spring.cloud.aws.credentials.secret-key", () -> localStack.getSecretKey());
registry.add("spring.cloud.aws.sqs.endpoint", () -> localStack.getEndpointOverride(SQS)
.toString());
}
}
这种设置使测试变得简单且可重复,本教程中的代码也可直接用于 AWS。
4、设置队列名称 {#4设置队列名称}
默认情况下,Spring Cloud AWS SQS 会自动创建任何 @SqsListener
注解方法中指定的队列。首先,在 application.yaml
中定义队列名称:
events:
queues:
order-processing-retry-queue: order_processing_retry_queue
order-processing-async-queue: order_processing_async_queue
order-processing-no-retries-queue: order_processing_no_retries_queue
acknowledgment:
order-processing-no-retries-queue: ALWAYS
其中一个监听器定义了 acknowledgment
属性: ALWAYS
。
还可以添加一些 product.id,以便在整个示例中使用:
product:
id:
smartphone: 123e4567-e89b-12d3-a456-426614174000
wireless-headphones: 123e4567-e89b-12d3-a456-426614174001
laptop: 123e4567-e89b-12d3-a456-426614174002
tablet: 123e4567-e89b-12d3-a456-426614174004
为了在应用中以 POJO 的形式获取这些属性,需要创建两个 @ConfigurationProperties
类,其中一个用于队列:
@ConfigurationProperties(prefix = "events.queues")
public class EventsQueuesProperties {
private String orderProcessingRetryQueue;
private String orderProcessingAsyncQueue;
private String orderProcessingNoRetriesQueue;
// getter 和 setter
}
另一个用于 product:
@ConfigurationProperties("product.id")
public class ProductIdProperties {
private UUID smartphone;
private UUID wirelessHeadphones;
private UUID laptop;
// getter 和 setter
}
最后,使用 @EnableConfigurationProperties
在 @Configuration
类中启用配置属性:
@EnableConfigurationProperties({ EventsQueuesProperties.class, ProductIdProperties.class})
@Configuration
public class OrderProcessingConfiguration {
}
5、处理成功后确认消息 {#5处理成功后确认消息}
@SqsListener
的默认确认模式是 ON_SUCCESS
。在这种模式下,如果监听器方法执行完毕而没有抛出异常,消息就会被确认。
为了说明这种行为,创建一个简单的监听器,它将接收 OrderCreatedEvent
(订单创建事件),检查 InventoryService
(库存服务),如果请求的项目和数量有库存,则将订单状态更改为 PROCESSED
。
5.1、创建服务 {#51创建服务}
首先,创建 OrderService
,它负责更新订单状态:
@Service
public class OrderService {
Map<UUID, OrderStatus> ORDER_STATUS_STORAGE = new ConcurrentHashMap<>();
public void updateOrderStatus(UUID orderId, OrderStatus status) {
ORDER_STATUS_STORAGE.put(orderId, status);
}
public OrderStatus getOrderStatus(UUID orderId) {
return ORDER_STATUS_STORAGE.getOrDefault(orderId, OrderStatus.UNKNOWN);
}
}
然后,创建 InventoryService
。使用 Map
来模拟数据库,通过 ProductIdProperties
来填充,ProductIDProperties
会自动注入 application.yaml
文件中的值:
@Service
public class InventoryService implements InitializingBean {
private ProductIdProperties productIdProperties;
private Map<UUID, Integer> inventory;
public InventoryService(ProductIdProperties productIdProperties) {
this.productIdProperties = productIdProperties;
}
@Override
public void afterPropertiesSet() {
this.inventory = new ConcurrentHashMap<>(Map.of(productIdProperties.getSmartphone(), 10,
productIdProperties.getWirelessHeadphones(), 15,
productIdProperties.getLaptop(), 5);
}
}
InitializingBean
接口提供了 afterPropertiesSet
方法,这是 Spring 在解析了 Bean 的所有依赖(在本例中是 ProductIdProperties
Bean)后调用的生命周期方法。
添加一个 checkInventory
方法,用于验证库存中是否有所需数量的产品。如果产品不存在,它将抛出 ProductNotFoundException
异常;如果产品存在但数量不足,它将抛出 OutOfStockException
异常。在第二种情况下,我们还模拟随机补货,这样经过几次重试后,处理最终会成功:
public void checkInventory(UUID productId, int quantity) {
Integer stock = inventory.get(productId);
if (stock < quantity) {
inventory.put(productId, stock + (int) (Math.random() * 5));
throw new OutOfStockException(
"Product with id %s is out of stock. Quantity requested: %s ".formatted(productId, quantity));
};
inventory.put(productId, stock - quantity);
}
5.2、创建监听器 {#52创建监听器}
创建第一个监听器,使用 @Component
注解,并通过 Spring 的构造函数依赖注入机制注入服务:
@Component
public class OrderProcessingListeners {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListeners.class);
private InventoryService inventoryService;
private OrderService orderService;
public OrderProcessingListeners(InventoryService inventoryService, OrderService orderService) {
this.inventoryService = inventoryService;
this.orderService = orderService;
}
}
接下来,编写监听器方法:
@SqsListener(value = "${events.queues.order-processing-retry-queue}", id = "retry-order-processing-container", messageVisibilitySeconds = "1")
public void stockCheckRetry(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED);
logger.info("Message processed successfully: {}", orderCreatedEvent);
}
value
属性是通过 application.yaml
自动注入的队列名称。由于 ON_SUCCESS
是默认的确认模式,因此不需要在注解中指定它。
5.3、设置测试类 {#53设置测试类}
为了确保逻辑按预期运行,创建一个测试类:
@SpringBootTest
class OrderProcessingApplicationLiveTest extends BaseSqsLiveTest {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingApplicationLiveTest.class);
@Autowired
private EventsQueuesProperties eventsQueuesProperties;
@Autowired
private ProductIdProperties productIdProperties;
@Autowired
private SqsTemplate sqsTemplate;
@Autowired
private OrderService orderService;
@Autowired
private MessageListenerContainerRegistry registry;
}
还添加一个名为 assertQueueIsEmpty
的方法。在该方法中,使用自动注入的 MessageListenerContainerRegistry
来获取容器,然后停止容器以确保它没有消费任何消息。Registry 包含由 @SqsListener
注解创建的所有容器:
private void assertQueueIsEmpty(String queueName, String containerId) {
logger.info("Stopping container {}", containerId);
var container = Objects
.requireNonNull(registry.getContainerById(containerId), () -> "could not find container " + containerId);
container.stop();
// ...
}
容器停止后,使用 SqsTemplate
在队列中查找消息。如果确认成功,则不应返回任何消息。还把 pollTimeout
设置为大于可见性(message visibility)超时的值,这样如果消息尚未被删除,就会在指定的时间间隔内再次发送。
下面是 assertQueueIsEmpty
方法的另一部分:
// ...
logger.info("Checking for messages in queue {}", queueName);
var message = sqsTemplate.receive(from -> from.queue(queueName)
.pollTimeout(Duration.ofSeconds(5)));
assertThat(message).isEmpty();
logger.info("No messages found in queue {}", queueName);
5.4、测试 {#54测试}
在第一个测试中,向队列发送一个 OrderCreatedEvent
(订单创建事件),其中包含一个数量大于库存量的产品订单。当异常通过监听器方法时,它会向框架发出消息处理失败的信号,消息应在消息可见性(message visibility)时间窗口结束后再次发送。
为了加快测试速度,我们在注解中将消息可见性秒数(messageVisibilitySeconds
)设为 1
,但通常情况下,这一配置是在队列本身中完成的,默认值为 30
秒。
使用 Spring Cloud AWS 提供的自动配置 SqsTemplate
创建事件并发送。然后,使用 Awaitility
等待订单状态变为 PROCESSED
,最后,断言队列为空,这意味着确认成功:
@Test
public void givenOnSuccessAcknowledgementMode_whenProcessingThrows_shouldRetry() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingRetryQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getLaptop(), 10));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "retry-order-processing-container");
}
注意,我们将 @SqsListener
注解中指定的 containerId
传递给了 assertQueueIsEmpty
方法。
现在运行测试。首先,要确保 Docker 正在运行,然后执行测试。在容器初始化日志之后,我们应该能看到应用的日志信息:
Message received: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
然后,由于库存不足,应该会出现一次或多次异常:
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174002 is out of stock. Quantity requested: 10
而且,由于我们添加了补充逻辑,我们最终应该看到消息处理成功了:
Message processed successfully: OrderCreatedEvent[id=83f27bf2-1bd4-460a-9006-d784ec7eff47, productId=123e4567-e89b-12d3-a456-426614174002, quantity=10]
最后,确保确认成功:
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container retry-order-processing-container
INFO 2699 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container retry-order-processing-container stopped
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_retry_queue
INFO 2699 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_retry_queue
注意,测试完成后可能会抛出 "connection refused" 错误,这是因为 Docker 容器在框架停止轮询消息之前就已停止,可以放心地忽略这些错误。
6、手动确认 {#6手动确认}
框架支持手动确认信息,这对于需要更好地控制确认过程的情况下非常有用。
6.1、创建监听器 {#61创建监听器}
为了说明这一点,我们创建一个异步场景,在这个场景中,InventoryService
的连接速度很慢,我们希望在它完成之前释放监听器线程:
@SqsListener(value = "${events.queues.order-processing-async-queue}", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL, id = "async-order-processing-container", messageVisibilitySeconds = "3")
public void slowStockCheckAsynchronous(OrderCreatedEvent orderCreatedEvent, Acknowledgement acknowledgement) {
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSING);
CompletableFuture.runAsync(() -> inventoryService.slowCheckInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity()))
.thenRun(() -> orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.PROCESSED))
.thenCompose(voidFuture -> acknowledgement.acknowledgeAsync())
.thenRun(() -> logger.info("Message for order {} acknowledged", orderCreatedEvent.id()));
logger.info("Releasing processing thread.");
}
在此逻辑中,我们使用 Java 的 CompletableFuture
异步运行库存检查。将 Acknowledge
对象添加到监听器方法中,并将 SqsListenerAcknowledgementMode.MANUAL
添加到注解的 acknowledgementMode
属性中。该属性是字符串,可接受属性占位符和 SpEL 。只有当我们将 AcknowledgementMode
设置为 MANUAL
时,Acknowledgement
对象才可用。
注意,示例中利用了 Spring Boot 自动配置(它提供了合理的默认值)和 @SqsListener
注解属性来更改确认模式。另一种方法是声明一个 SqsMessageListenerContainerFactory
Bean,它允许设置更复杂的配置。
6.2、模拟慢连接 {#62模拟慢连接}
现在,在 InventoryService
类中添加 slowCheckInventory
方法,使用 Thread.sleep
模拟慢连接:
public void slowCheckInventory(UUID productId, int quantity) {
simulateBusyConnection();
checkInventory(productId, quantity);
}
private void simulateBusyConnection() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
6.3、测试 {#63测试}
接下来,编写测试:
@Test
public void givenManualAcknowledgementMode_whenManuallyAcknowledge_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingAsyncQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getSmartphone(), 1));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.PROCESSED));
assertQueueIsEmpty(queueName, "async-order-processing-container");
}
这一次,我们请求的是库存中的可用数量,所以应该不会出现错误。
运行测试,我们会看到一条日志信息,显示已收到消息:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message received: OrderCreatedEvent[id=013740a3-0a45-478a-b085-fbd634fbe66d, productId=123e4567-e89b-12d3-a456-426614174000, quantity=1]
然后,我们会看到线程的释放信息:
INFO 2786 --- [ing-container-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Releasing processing thread.
这是因为我们在异步处理和确认消息。大约两秒钟后,就会看到日志显示消息已确认:
INFO 2786 --- [onPool-worker-1] c.b.s.c.a.s.a.l.OrderProcessingListeners : Message for order 013740a3-0a45-478a-b085-fbd634fbe66d acknowledged
最后,可以看到停止容器和断言队列为空的日志:
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container async-order-processing-container
INFO 2786 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container async-order-processing-container stopped
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_async_queue
INFO 2786 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_async_queue
7、不论成功和失败都确认消息 {#7不论成功和失败都确认消息}
最后一种确认模式是 ALWAYS
,无论监听器方法是否抛出错误,框架都会确认消息。
7.1、创建监听器 {#71创建监听器}
模拟一个销售事件,在这个事件中,我们的库存是有限的,而且我们不想重新处理任何信息,无论是否有任何失败。
使用之前在 application.yml
中定义的属性将确认模式设置为 ALWAYS
:
@SqsListener(value = "${events.queues.order-processing-no-retries-queue}", acknowledgementMode = ${events.acknowledgment.order-processing-no-retries-queue}, id = "no-retries-order-processing-container", messageVisibilitySeconds = "3")
public void stockCheckNoRetries(OrderCreatedEvent orderCreatedEvent) {
logger.info("Message received: {}", orderCreatedEvent);
orderService.updateOrderStatus(orderCreatedEvent.id(), OrderStatus.RECEIVED);
inventoryService.checkInventory(orderCreatedEvent.productId(), orderCreatedEvent.quantity());
logger.info("Message processed: {}", orderCreatedEvent);
}
在测试中,创建一个数量大于库存的订单:
7.2、测试 {#72测试}
@Test
public void givenAlwaysAcknowledgementMode_whenProcessThrows_shouldAcknowledge() {
var orderId = UUID.randomUUID();
var queueName = eventsQueuesProperties.getOrderProcessingNoRetriesQueue();
sqsTemplate.send(queueName, new OrderCreatedEvent(orderId, productIdProperties.getWirelessHeadphones(), 20));
Awaitility.await()
.atMost(Duration.ofMinutes(1))
.until(() -> orderService.getOrderStatus(orderId)
.equals(OrderStatus.RECEIVED));
assertQueueIsEmpty(queueName, "no-retries-order-processing-container");
}
现在,即使抛出 OutOfStockException
,消息也会被确认,并且不会对消息进行重试:
Message received: OrderCreatedEvent[id=7587f1a2-328f-4791-8559-ee8e85b25259, productId=123e4567-e89b-12d3-a456-426614174001, quantity=20]
Caused by: com.baeldung.spring.cloud.aws.sqs.acknowledgement.exception.OutOfStockException: Product with id 123e4567-e89b-12d3-a456-426614174001 is out of stock. Quantity requested: 20
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Stopping container no-retries-order-processing-container
INFO 2835 --- [main] a.c.s.l.AbstractMessageListenerContainer : Container no-retries-order-processing-container stopped
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : Checking for messages in queue order_processing_no_retries_queue
INFO 2835 --- [main] a.s.a.OrderProcessingApplicationLiveTest : No messages found in queue order_processing_no_retries_queue
8、总结 {#8总结}
本文通过一个事件驱动场景来介绍了 Spring Cloud AWS v3 SQS 提供的三种消息确认模式: ON_SUCCESS
(默认)、MANUAL
和 ALWAYS
。
Ref:https://www.baeldung.com/java-spring-cloud-aws-v3-message-acknowledgement