在本系列教程的前几章中,我们分析了事务在 Spring Cloud Stream Kafka 应用中的工作原理。了解了事务发挥作用的不同环境,包括生产者和消费者应用,以及应用如何正确使用事务。现在,这些基本要素已经介绍完毕,让我们继续了解事务的另一个方面:在发生错误时回滚事务。当错误发生时,事务处理系统无法提交事务,事务管理器就会回滚事务,不会为下游消费者保留任何内容。如果应用能够决定回滚机制的工作方式,将大有裨益。Spring Cloud Stream 通过 Spring 对 Apache Kafla 的基本支持,为回滚定制提供了便利。在处理生产者和消费者(消费-处理-生产)事务性应用时,我们必须注意一些事情。接下来,让我们一起深入了解这些内容。
生产者发起的事务 {#生产者发起的事务}
下面是我们在 上一篇文章 中看到的代码片段。
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "my data: " + i);
}
}
如果事务方法抛出异常,我们应该怎么办?从 Spring Cloud Stream 的角度来看,我们不需要做任何操作。事务拦截器会启动回滚,最终由 Kafka 中的事务协调器中断事务。最后,异常会传播给调用者,然后调用者可以决定在错误是暂时的情况下重新触发事务方法。由于这是一个生产者发起的事务,框架不会进行重试。这种情况很简单,因为在事务回滚期间,从应用或框架的角度来看,我们不需要做任何操作。如果发生错误,它将被保证回滚。然而,需要注意的是,即使事务被回滚,Kafka 日志中可能存在未提交的记录。使用隔离级别为 read_uncommitted
(默认级别)的消费者仍然会接收这些记录。因此,消费者应用程序必须确保使用 read_committed
隔离级别,这样才不会收到上游事务回滚的任何记录。
生产者发起的事务与外部事务同步 {#生产者发起的事务与外部事务同步}
在本系列教程的上一章,我们看到了这种情况。与第一种情况一样,如果方法抛出异常并发生回滚,即使 Kafka 事务与数据库事务同步,应用也无需做任何事情来处理错误。数据库和 Kafka 发布的事务都会回滚。
消费者发起的事务回滚 {#消费者发起的事务回滚}
如果生产者发起的事务回滚是如此简单,你可能会想知道为什么我们要专门为这个主题撰写一篇完整的文章。什么时候应用程序需要提供特定的回滚策略呢?当你有一个正在进行的消费者发起的事务时,这就有了意义,因为我们需要特别关注如何处理消费记录的状态和它们的偏移量。让我们重新看一下在系列中 之前的教程 中运行的消费者发起的事务方法代码。
public Consumer<PersonEvent> process(TxCode txCode) {
return txCode::run;
}
@Component
class TxCode {
@Transactional
void run(PersonEvent pe) {
Person person = new Person();
person.setName(pe.getName());
Person savedPerson = repository.save(person);
PersonEvent event = new PersonEvent();
event.setName(savedPerson.getName());
event.setType("PersonSaved");
streamBridge.send("process-out-0", event);
}
}
正如你所记得的,这是一种消费-处理-生产模式,以事务方式从头到尾进行。如果事务方法抛出异常怎么办?在这里,我们需要了解框架如何在回滚事务时处理消费的记录。Spring for Apache Kafka 的底层消息监听器容器允许设置 回滚处理器(Rollback Processor)
消息监听器容器会调用 AfterRollbackProcessor
API,同时调用上一次消费者轮询的剩余记录,并将失败记录放在列表的开头。这些实现使用 Topic/分区信息来确保在下一次轮询中再次获取失败记录。当应用程序在 Spring Cloud Stream 中启用事务时,我们使用名为 DefaultAfterRollbackProcessor
的默认实现来实现 AfterRollbackProcessor
API。因此,当事务回滚时,默认情况下该实现会启动。让我们来看看 AfterRollbackProcessor
运行时会发生什么。
Spring Cloud Stream 允许你通过消费者绑定设置方法调用重试的最大次数。例如,spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts
。最大尝试次数值包括初始尝试。默认值为 3
次。如果想禁用重试,可将此值设为 1
。在这种情况下,框架只尝试记录 1 次。该值包含记录的首次尝试。因此,在默认值为 3 次的情况下, Binder 会在首次尝试后重试两次。
当用户方法抛出异常时,容器最初启动的事务就会回滚。由于我们处于事务上下文中,容器会使用事务模板(transaction template)在新事务中调用 AfterRollbackProcessor
的 process
方法,从而启动一个新的 Kafka 事务。在运行 AfterRollbackProcessor
的处理方法时,它会根据最大尝试次数配置检查是否还有待处理的重试。如果发现更多重试次数,它就会提交当前事务,这是一个 "空" 操作,因为在检查期间什么都没发生。消费者会使用失败的记录进行 seek 操作,以便下一次轮询返回这条失败的记录。然后,消费者轮询更多记录,从而重新返回失败的记录。整个流程重新开始并继续。如果再次失败,则重复进行,直到用完所有可用的重试。一旦所有重试次数用完,AfterRollbackProcessor
就会调用已注册的 Recoverer。Spring Cloud Stream 注册的 Recoverer 会将错误记录发送到错误通道(error channel)。之后,输入(恢复的)记录的偏移将被发送到新事务。此后,当前事务提交,原子式地将偏移量发送到事务并提交记录的偏移量。至此,整个过程结束。恢复的记录不会包含在消费者的 seek 操作中,下一次轮询会返回新记录。
如果恢复因任何原因失败,容器会表现为重试次数未用尽,并进入无限重试。如上所述,当恢复成功时,失败的记录不会包含在 seek 中,因此下一次轮询不会返回该记录。
假设应用程序将最大尝试次数设置为 2 次,并且记录 2 次都失败,以下是在使用事务时事件的顺序:
- 消费者轮询记录,Spring Kafka 的监听器在
TransactionTemplate
的execute
方法中被调用,从而触发KafkaTransactionManager
启动新事务。 - 最终,监听器会调用用户方法,该方法使用
@Transactional
注解。 - 事务拦截器会拦截事务方法,并使用其事务管理器启动一个新的 JPA 事务。
- 当到达数据库操作时,不会发生提交或回滚,因为我们正在执行方法。
StreamBridge
调用send
方法,将消息发布到 Kafka Topic。这里不会启动新的 Kafka 事务,因为已经有一个 Kafka 事务正在进行中。KafkaTemplate
使用同一个事务资源(生产者)来发布。- 该方法的任何操作都会抛出异常,而事务拦截器会捕获异常并对 JPA 事务执行回滚。
- 异常会传播回 Spring Kafka 中的消息监听器容器,监听器通过
TransactionTemplate
的execute
方法调用用户方法。然后,它会回滚 Kafka 事务。 - 此时,容器会在一个新事务中调用
AfterRollbackProcessor
,因为我们正处于事务上下文中。它在其TransactionTemplate
上启动另一个执行操作,通过KafkaTransactionManager
创建一个新的 Kafka 事务。 TransactionTemplate
的execute
方法会调用AfterRollbackProcessor
API 中的process
方法,并立即返回,因为还剩下一次重试(因为我们最多只能尝试两次)。- 然后,容器会提交新的 Kafka 事务,关闭事务而不做任何操作 - 本质上这是一个 "空" 操作的操作。
- 接下来的消费者轮询会重新传递失败的记录,容器会在新事务中再次调用监听器来重试(步骤 1)。
- 重复步骤 2 - 8。
TransactionTemplate
的execute
方法会调用AfterRollbackProcessor
的process
方法,并发现已没有重试机会。process
方法调用已注册的 recoverer。由于我们将其作为 Spring Cloud Stream 应用程序运行,因此默认的 recoverer 会发送到 error channel。- 记录恢复后,恢复记录的偏移量(最初由消费者消费)会通过事务上的生产者发送给事务。
AfterRollbackProcessor
中的process
方法返回后,容器会调用事务上的提交操作,以原子方式向事务发送偏移量,并执行消费者偏移量提交。
为什么我们需要在上述第 8 步中创建一个新事务,并在尝试失败后每次调用 AfterRollbackProcessor
?为什么不能在提交原始 Kafka 事务之前调用 AfterRollbackProcessor
?虽然在每次尝试失败后创建一个新的 Kafka 事务来执行回滚后任务听起来像是不必要的开销,但这是必要的。当原始事务发生回滚时,它不会向事务发送偏移量。如果发生重试,容器会在新事务中再次调用监听器,如此循环往复,直到重试次数耗尽、记录恢复为止。容器创建并回滚的事务数量可能与未向事务发送偏移量的最大尝试次数一样多。每次原始事务回滚时,容器都会为 AfterRollbackProcessor
调用启动一个相应的新事务,其提交都是 "空" 操作(恢复后的最后一次提交除外)。恢复记录后,最后一次调用会将偏移量发送给事务,以便原子提交偏移量,并在 Kafka 端进行必要的事务清理。因此,我们可以看到,要将偏移量发送到事务,我们需要在一个新事务中调用 AfterRollbackProcessor
。
自定义 AfterRollbackProcessor {#自定义-afterrollbackprocessor}
如果应用程序希望自定义回滚后任务,而不是使用 Spring Cloud Stream 使用的默认 - DefaultAfterRollbackProcessor
,则可使用 ListenerContainerCustomizer
提供自定义 AfterRollbackProcessor
。如下:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
return (container, destination, group) -> container.setAfterRollbackProcessor(
new DefaultAfterRollbackProcessor<byte[], byte[]>(
(record, exception) -> System.out.println("Discarding failed record: " + record),
new FixedBackOff(0L, 1)));
}
在提供上述自定义功能时,Recoverer 会记录错误并继续运行。DefaultAfterRollbackProcessor
的构造函数也采取了不重试的回退(backoff)方式。因此,在本例中,只要方法中第一次出现异常,就会通过日志记录恢复记录。
记录恢复过程中的事务性 DLQ 发布 {#记录恢复过程中的事务性-dlq-发布}
作为恢复过程的一部分,Spring Cloud Stream 可让你在所有重试都失败后将失败记录发送到唯一的 DLQ (死信队列) Topic。我们提到,Spring Cloud Stream Kafka Binder 使用的 DefaultAfterRollbackProcessor
会将记录发送到 error channel。当应用程序启用 DLQ 时,Binder 会将失败记录发送到一个特殊的 DLT Topic。具体细节不在我们的事务讨论范围之内。不过,问题在于 DLT 发布是否具有事务性。在设置 DLQ 基础架构时,如果应用程序使用了事务(即提供了 transaction-id-prefix
),Binder 就会使用与 KafkaTransactionManager
相同的原始事务生产者工厂。因此,该框架能保证以事务方式发布到 DLT。
本文介绍了在 Spring Cloud Stream Kafka 应用中使用事务的所有主要构建模块。在本系列教程的下一部分,我们将了解 Kafka 中事务的实际应用,即常用的 "仅一次" 语义,以及如何在 Spring Cloud Stream Kafka 应用中启用它们。
参考:https://spring.io/blog/2023/10/11/transactional-rollback-strategies-with-spring-cloud-stream-and-apache-kafka