1、概览 {#1概览}
本文将会带你学习在 Spring 应用中实现 Kafka Consumer 重试消费的 2 种方式,及其优缺点。
关于如何在 Spring 中整合 Kafka 的细节,请参阅 这里。
2、项目设置 {#2项目设置}
创建一个新的 Spring Boot 项目,并添加 spring-kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.1</version>
</dependency>
创建一个对象:
public class Greeting {
private String msg;
private String name;
// 构造函数、get、set 方法省略
}
3、Kafka Consumer {#3kafka-consumer}
Kafka Consumer(消费者)是从 Kafka 集群中读取数据的客户端应用程序。它订阅一个或多个 topic,并消费已发布的消息。Producer (生产者)向 topic 发送消息,topic 是存储和发布记录的类别名称。topic 被分为多个分区,以便横向扩展。每个分区都是一个不可更改的消息序列。
Consumer 可以通过指定偏移量(即消息在分区中的位置)来读取特定分区中的消息。Ack(确认)是消费者发送给 Kafka broker 的消息,表示它已成功处理了一条记录。一旦 ACK 被发送,消费者偏移量(consumer offset)将会被更新。
这将确保消息已被消费,并且不会再次传递给当前 Listener。
3.1、Ack 模式 {#31ack-模式}
Ack 模式决定了 broker 何时更新消费者偏移量(consumer offset)。
有三种 Ack 模式:
- auto-commit(自动提交):消费者在收到信息后立即向 broker 发送 Ack 信息。
- after-processing(处理后):消费者只有在成功消费消息后才向 broker 发送 Ack 信息。
- manual(手动):消费者在收到具体指令后才向 broker 发送 Ack 信息。
Ack 模式决定了消费者如何处理从 Kafka 集群读取的消息。
让我们创建一个新的 Bean,ConcurrentKafkaListenerContainerFactory
:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 其他配置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}
我们可以配置几种 Ack 模式:
AckMode.RECORD
:after-processing(处理后)模式,消费者会为其消费的每条信息发送 Ack 消息。AckMode.BATCH
:手动模式,消费者为一批消息发送确认,而不是为每条消息发送 Ack。AckMode.COUNT
:手动模式,消费者会在消费完一定数量的消息后发送 Ack 消息。AckMode.MANUAL
:手动模式,消费者不会对其消费的报文发送 Ack 消息。AckMode.TIME
:手动模式,消费者会在一定时间后发送 Ack 消息。
要在 Kafka 中实现消息处理的重试逻辑,我们需要选择一种 AckMode
。
这种 AckMode
应能让消费者向 broker 指出哪些特定消息已被成功消费。
这样,broker 就可以将任何未确认的消息重新发送给另一个消费者。
在阻塞式重试的情况下,这可能是 RECORD
或 MANUAL
(手动)模式。
4、阻塞式重试 {#4阻塞式重试}
如果初次尝试因临时错误而失败,阻塞式重试可让消费者再次尝试消费信息。
消费者等待一定时间(称为重试延迟周期 - retry backoff period)后,才会再次尝试消费信息。
此外,用户还可以使用固定延迟(fixed delay)或指数级回退策略(exponential backoff strategy)自定义重试延迟周期。
它还可以设置在放弃并将消息标记为失败之前的最大重试次数。
4.1、错误处理 {#41错误处理}
让我们在 Kafka 配置类上定义两个属性:
@Value(value = "${kafka.backoff.interval}")
private Long interval;
@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;
为了处理消费过程中抛出的所有异常,让我们定义一个新的 error handler:
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
// 当所有重试尝试都用尽时执行的逻辑
}, fixedBackOff);
return errorHandler;
}
FixedBackOff
类需要两个参数:
interval
:两次重试之间的等待时间(毫秒)。maxAttempts
:在放弃之前重新尝试操作的最大次数。
在这种策略中,消费者在重试消息消费之前会等待一段固定的时间。
DefaultErrorHandler
使用一个 lambda 函数进行初始化,该函数代表了当所有重试尝试都耗尽时要执行的逻辑。
lambda 函数有两个参数:
consumerRecord
:表示导致错误的 Kafka 记录。exception
:表示抛出的异常。
4.2、Container Factory {#42container-factory}
让我们在 container factory bean 上添加 error handler:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 其他配置
factory.setCommonErrorHandler(errorHandler());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
factory.afterPropertiesSet();
return factory;
}
如果存在重试策略,请将确认模式(ack mode)设置为 AckMode.RECORD
,以确保在处理过程中发生错误时,消费者将重新投递消息。
我们不应将确认模式(ack mode)设置为 AckMode.BATCH
或 AckMode.TIME
,因为消费者将一次性确认多个消息。
这是因为如果在处理消息时发生错误,消费者不会重新投递批处理或时间窗口中的所有消息给自己。
因此重试策略无法正确处理错误。
4.3、可重试异常和不可重试异常 {#43可重试异常和不可重试异常}
我们可以指定哪些异常可重试,哪些不可重试。
修改 ErrorHandler
:
@Bean
public DefaultErrorHandler errorHandler() {
BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
// 当所有重试尝试都用尽时执行的逻辑
}, fixedBackOff);
errorHandler.addRetryableExceptions(SocketTimeoutException.class);
errorHandler.addNotRetryableExceptions(NullPointerException.class);
return errorHandler;
}
如上,我们在消费者中指定了哪些异常类型应触发重试策略。
SocketTimeoutException
被认为是可重试的,而 NullPointerException
被认为是不可重试的。
如果我们没有定义、设置任何可重试异常,则将使用默认的可重试异常集:
4.4、优缺点 {#44优缺点}
在阻塞式重试中,当消息处理失败时,消费者会阻塞,直到重试机制完成重试或达到最大重试次数。
使用阻塞式重试有几个优点和缺点。
通过允许消费者在发生错误时重试消息的消费,阻塞式重试可以提高消息处理流水线的可靠性。即使发生短暂错误,这可以确保消息被成功处理。
阻塞式重试可以通过抽象出重试机制来简化消息处理逻辑的实现。消费者可以专注于处理消息,让重试机制处理任何可能发生的错误。
最后,如果消费者需要等待重试机制完成重试,阻塞式重试可能会在消息处理流水线中引入延迟。这可能会影响系统的整体性能。阻塞式重试还可能导致消费者消耗更多的资源,如CPU和内存,因为它需要等待重试机制完成重试。这可能会影响系统的整体可扩展性。
5、非阻塞式重试 {#5非阻塞式重试}
非阻塞式重试允许消费者在异步的情况下重试消息的消费,而不会阻塞消息监听方法的执行。
5.1、@RetryableTopic {#51retryabletopic}
让我们在 KafkaListener
上添加注解 @RetryableTopic
:
@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {
@KafkaHandler
@RetryableTopic(
backoff = @Backoff(value = 3000L),
attempts = "5",
autoCreateTopics = "false",
include = SocketTimeoutException.class, exclude = NullPointerException.class)
public void handleGreeting(Greeting greeting) {
System.out.println("Greeting received: " + greeting);
}
}
我们通过修改多个属性定制了重试行为,例如
backoff
:该属性指定在重试失败的消息时要使用的退避策略。attempts
:该属性指定在放弃之前消息应该重试的最大次数。autoCreateTopics
:如果 retry topic 和 DLT(死信 Topic)不存在,该属性指定是否自动创建它们。include
:指定需要触发重试的异常。exclude
:指定不需要触发重试的异常。
当信息无法投递到预定 topic 时,它会自动发送到 retry topic 进行重试。
如果在最大尝试次数后,信息仍无法送达,它将被发送到 DLT (死信 topic)进行进一步处理。
5.2、优缺点 {#52优缺点}
实现非阻塞式重试有几个好处:
- 提高性能:非阻塞式重试允许在不阻塞调用线程的情况下重试失败的消息,这可以提高应用程序的整体性能。
- 提高可靠性:非阻塞式重试可以帮助应用程序从故障中恢复并继续处理消息,即使某些消息未能成功传递。
不过,在实现非阻塞式重试时,也要考虑一些潜在的缺点:
- 复杂性增加: 非阻塞式重试会增加应用程序的复杂性,因为我们需要处理重试逻辑和 DLT(死信 Topic)。
- 消息重复消费的风险:如果消息在重试后成功送达,那么消费者可能会重复消费同一消息。我们需要考虑这一风险,并采取措施防止重复消费。
- 消息的顺序:重试的消息以异步方式发送到 retry topic,可能比未重试的消息更晚发送到原始 topic。
6、总结 {#6总结}
在本教程中,我们学习了如何在 Kafka logic 上实现重试逻辑,包括阻塞式和非阻塞式的方法。
参考:https://www.baeldung.com/spring-retry-kafka-consumer