51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

在 Spring 应用中实现 Kafka Consumer 重试消费

1、概览 {#1概览}

本文将会带你学习在 Spring 应用中实现 Kafka Consumer 重试消费的 2 种方式,及其优缺点。

关于如何在 Spring 中整合 Kafka 的细节,请参阅 这里

2、项目设置 {#2项目设置}

创建一个新的 Spring Boot 项目,并添加 spring-kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.0.1</version>
</dependency>

创建一个对象:

public class Greeting {

    private String msg;
    private String name;

    // 构造函数、get、set 方法省略
}

3、Kafka Consumer {#3kafka-consumer}

Kafka Consumer(消费者)是从 Kafka 集群中读取数据的客户端应用程序。它订阅一个或多个 topic,并消费已发布的消息。Producer (生产者)向 topic 发送消息,topic 是存储和发布记录的类别名称。topic 被分为多个分区,以便横向扩展。每个分区都是一个不可更改的消息序列。

Consumer 可以通过指定偏移量(即消息在分区中的位置)来读取特定分区中的消息。Ack(确认)是消费者发送给 Kafka broker 的消息,表示它已成功处理了一条记录。一旦 ACK 被发送,消费者偏移量(consumer offset)将会被更新。

这将确保消息已被消费,并且不会再次传递给当前 Listener。

3.1、Ack 模式 {#31ack-模式}

Ack 模式决定了 broker 何时更新消费者偏移量(consumer offset)。

有三种 Ack 模式:

  1. auto-commit(自动提交):消费者在收到信息后立即向 broker 发送 Ack 信息。
  2. after-processing(处理后):消费者只有在成功消费消息后才向 broker 发送 Ack 信息。
  3. manual(手动):消费者在收到具体指令后才向 broker 发送 Ack 信息。

Ack 模式决定了消费者如何处理从 Kafka 集群读取的消息。

让我们创建一个新的 Bean,ConcurrentKafkaListenerContainerFactory

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 其他配置
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

我们可以配置几种 Ack 模式:

  1. AckMode.RECORD:after-processing(处理后)模式,消费者会为其消费的每条信息发送 Ack 消息。
  2. AckMode.BATCH:手动模式,消费者为一批消息发送确认,而不是为每条消息发送 Ack。
  3. AckMode.COUNT:手动模式,消费者会在消费完一定数量的消息后发送 Ack 消息。
  4. AckMode.MANUAL:手动模式,消费者不会对其消费的报文发送 Ack 消息。
  5. AckMode.TIME:手动模式,消费者会在一定时间后发送 Ack 消息。

要在 Kafka 中实现消息处理的重试逻辑,我们需要选择一种 AckMode

这种 AckMode 应能让消费者向 broker 指出哪些特定消息已被成功消费。

这样,broker 就可以将任何未确认的消息重新发送给另一个消费者。

在阻塞式重试的情况下,这可能是 RECORDMANUAL(手动)模式。

4、阻塞式重试 {#4阻塞式重试}

如果初次尝试因临时错误而失败,阻塞式重试可让消费者再次尝试消费信息。

消费者等待一定时间(称为重试延迟周期 - retry backoff period)后,才会再次尝试消费信息。

此外,用户还可以使用固定延迟(fixed delay)或指数级回退策略(exponential backoff strategy)自定义重试延迟周期。

它还可以设置在放弃并将消息标记为失败之前的最大重试次数。

4.1、错误处理 {#41错误处理}

让我们在 Kafka 配置类上定义两个属性:

@Value(value = "${kafka.backoff.interval}")
private Long interval;

@Value(value = "${kafka.backoff.max_failure}")
private Long maxAttempts;

为了处理消费过程中抛出的所有异常,让我们定义一个新的 error handler:

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
        // 当所有重试尝试都用尽时执行的逻辑
    }, fixedBackOff);
    return errorHandler;
}

FixedBackOff 类需要两个参数:

  • interval:两次重试之间的等待时间(毫秒)。
  • maxAttempts:在放弃之前重新尝试操作的最大次数。

在这种策略中,消费者在重试消息消费之前会等待一段固定的时间。

DefaultErrorHandler 使用一个 lambda 函数进行初始化,该函数代表了当所有重试尝试都耗尽时要执行的逻辑。

lambda 函数有两个参数:

  • consumerRecord:表示导致错误的 Kafka 记录。
  • exception:表示抛出的异常。

4.2、Container Factory {#42container-factory}

让我们在 container factory bean 上添加 error handler:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> greetingKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // 其他配置
    factory.setCommonErrorHandler(errorHandler());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    factory.afterPropertiesSet();
    return factory;
}

如果存在重试策略,请将确认模式(ack mode)设置为 AckMode.RECORD,以确保在处理过程中发生错误时,消费者将重新投递消息。

我们不应将确认模式(ack mode)设置为 AckMode.BATCHAckMode.TIME,因为消费者将一次性确认多个消息。

这是因为如果在处理消息时发生错误,消费者不会重新投递批处理或时间窗口中的所有消息给自己。

因此重试策略无法正确处理错误。

4.3、可重试异常和不可重试异常 {#43可重试异常和不可重试异常}

我们可以指定哪些异常可重试,哪些不可重试。

修改 ErrorHandler

@Bean
public DefaultErrorHandler errorHandler() {
    BackOff fixedBackOff = new FixedBackOff(interval, maxAttempts);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, e) -> {
        // 当所有重试尝试都用尽时执行的逻辑
    }, fixedBackOff);
    errorHandler.addRetryableExceptions(SocketTimeoutException.class);
    errorHandler.addNotRetryableExceptions(NullPointerException.class);
    return errorHandler;
}

如上,我们在消费者中指定了哪些异常类型应触发重试策略。

SocketTimeoutException 被认为是可重试的,而 NullPointerException 被认为是不可重试的。

如果我们没有定义、设置任何可重试异常,则将使用默认的可重试异常集:

4.4、优缺点 {#44优缺点}

在阻塞式重试中,当消息处理失败时,消费者会阻塞,直到重试机制完成重试或达到最大重试次数。

使用阻塞式重试有几个优点和缺点。

通过允许消费者在发生错误时重试消息的消费,阻塞式重试可以提高消息处理流水线的可靠性。即使发生短暂错误,这可以确保消息被成功处理。

阻塞式重试可以通过抽象出重试机制来简化消息处理逻辑的实现。消费者可以专注于处理消息,让重试机制处理任何可能发生的错误。

最后,如果消费者需要等待重试机制完成重试,阻塞式重试可能会在消息处理流水线中引入延迟。这可能会影响系统的整体性能。阻塞式重试还可能导致消费者消耗更多的资源,如CPU和内存,因为它需要等待重试机制完成重试。这可能会影响系统的整体可扩展性。

5、非阻塞式重试 {#5非阻塞式重试}

非阻塞式重试允许消费者在异步的情况下重试消息的消费,而不会阻塞消息监听方法的执行。

5.1、@RetryableTopic {#51retryabletopic}

让我们在 KafkaListener 上添加注解 @RetryableTopic

@Component
@KafkaListener(id = "multiGroup", topics = "greeting")
public class MultiTypeKafkaListener {

    @KafkaHandler
    @RetryableTopic(
      backoff = @Backoff(value = 3000L), 
      attempts = "5", 
      autoCreateTopics = "false",
      include = SocketTimeoutException.class, exclude = NullPointerException.class)
    public void handleGreeting(Greeting greeting) {
        System.out.println("Greeting received: " + greeting);
    }
}

我们通过修改多个属性定制了重试行为,例如

  • backoff:该属性指定在重试失败的消息时要使用的退避策略。
  • attempts:该属性指定在放弃之前消息应该重试的最大次数。
  • autoCreateTopics:如果 retry topic 和 DLT(死信 Topic)不存在,该属性指定是否自动创建它们。
  • include:指定需要触发重试的异常。
  • exclude:指定不需要触发重试的异常。

当信息无法投递到预定 topic 时,它会自动发送到 retry topic 进行重试。

如果在最大尝试次数后,信息仍无法送达,它将被发送到 DLT (死信 topic)进行进一步处理。

5.2、优缺点 {#52优缺点}

实现非阻塞式重试有几个好处:

  • 提高性能:非阻塞式重试允许在不阻塞调用线程的情况下重试失败的消息,这可以提高应用程序的整体性能。
  • 提高可靠性:非阻塞式重试可以帮助应用程序从故障中恢复并继续处理消息,即使某些消息未能成功传递。

不过,在实现非阻塞式重试时,也要考虑一些潜在的缺点:

  • 复杂性增加: 非阻塞式重试会增加应用程序的复杂性,因为我们需要处理重试逻辑和 DLT(死信 Topic)。
  • 消息重复消费的风险:如果消息在重试后成功送达,那么消费者可能会重复消费同一消息。我们需要考虑这一风险,并采取措施防止重复消费。
  • 消息的顺序:重试的消息以异步方式发送到 retry topic,可能比未重试的消息更晚发送到原始 topic。

6、总结 {#6总结}

在本教程中,我们学习了如何在 Kafka logic 上实现重试逻辑,包括阻塞式和非阻塞式的方法。


参考:https://www.baeldung.com/spring-retry-kafka-consumer

赞(4)
未经允许不得转载:工具盒子 » 在 Spring 应用中实现 Kafka Consumer 重试消费