本文是系列教程的第 2 部分,在这一部分中,我们将通过 Spring Cloud Stream 和 Apache Kafka 详细介绍事务。在 上一节 中,我们了解了事务的基本概念。本文将深入了解一些实现细节。
在本文中,我们主要从生产者的角度来了解事务如何与 Spring Cloud Stream 和 Apache Kafka 配合使用。
Spring Cloud Stream 中的生产者 {#spring-cloud-stream-中的生产者}
在深入了解生产者发起的事务之前,我们先来了解一下简单生产者的基本知识。在 Spring Cloud Stream 中,有几种编写生产者(在消息传递领域也称为发布者)的方法。如果你的用例需要定时生成数据,你可以编写一个 java.util.function.Supplier
方法,如下所示。
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
如代码所示,将上述 Supplier
作为 Spring Bean 提供时,Spring Cloud Stream 会将其视为发布者,由于我们在 Apache Kafka 的上下文中,因此它会将 POJO 记录发送到 Kafka Topic。
默认情况下,Spring Cloud Stream 每秒调用一次 Supplier,但你可以通过配置更改该计划。更多详情,请参阅 参考文档。
如果你不想轮询 Supplier,但又想控制其发布频率,该怎么办?Spring Cloud Stream 通过名为 StreamBridge
的开箱即用实现 StreamOperations
API 提供了一种便捷的方法。下面是一个示例。
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
在这种情况下,应用使用 REST 端点触发通过 StreamBridge
发布数据。因此任何第三方都可以通过调用 REST 端点启动数据发布。
在这种基础的生产者中使用事务,是否合适?
现在,我们已经了解了 Spring Cloud Stream 为发布记录提供的两种策略,下面让我们回到讨论的主题:事务发布 。假设我们希望在使用其中一个或多个生产者的同时确保数据完整性并获得事务保证。在这种情况下,问题是我们是否首先需要使用事务来实现这些目标。在上述两个示例中,如何确保记录是以事务方式发布的?简而言之,在这些类型的发布用例中,应避免使用事务。这些示例中的记录发布都是单发送场景。使用同步生产者,我们可以实现相同的语义事务保证。默认情况下,生产者是异步的,在同步模式下运行时,生产者会确保在向客户端发送响应之前将记录写入 Leader 和所有副本。你可以通过将 spring.cloud.stream.kafka.bindings.<binding-name>.producer.sync
属性设置为 true
来启用同步发布。
总之,在设计纯生产者应用时,要谨慎使用事务。如果使用 Supplier
或通过 StreamBridge
一次发送一条记录,我们不建议使用事务,因为将生产者转换为同步模式运行可以达到同样的效果,而不会产生事务开销。上述讨论引出了一个值得思考的问题。对于纯生产者应用,什么时候才有必要使用事务并从中获益?正如本教程系列的 上一节 所述,这完全取决于应用的用例。就生产者而言,这意味着我们只有在进行多次相关发布,或者除了发布之外还需要与外部事务管理器同步时,才需要使用事务。接下来将介绍前一种情况,本系列教程的下一篇将介绍后一种情况。
在 Spring Cloud Stream Kafka Binder 中启用事务 {#在-spring-cloud-stream-kafka-binder-中启用事务}
在 Spring Cloud Stream 的 Kafka binder 中启用事务的主要驱动是一个属性:spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
。当该属性具有有效的前缀字符串时,Spring Cloud Stream 中的 Kafka binder 就会确保底层 KafkaTemplate
通过使用事务来发布数据。顺便提一下,该属性可指示 Spring Cloud Stream 在使用处理器模式(消费-处理-生产 或 读-处理-写 模式)时使消费者具有事务感知能力。
事务实践 {#事务实践}
回到上面的单个 Supplier
或 StreamBridge
的例子,引入事务来了解事务组件的主要用途。如前所述,我们不必在这些情况下使用事务,因为这会增加更多开销。不过,这样做有助于我们理解事务。
代码如下:
@SpringBootApplication
@RestController
public class SimpleSpringCloudStreamProducer {
@Bean
public Supplier<Pojo> mySupplier() {
return () -> {
new Pojo();
};
}
@Autowired
StreamBridge streamBridge;
@PostMapping("/send-data")
public void publishData() {
streamBridge.send("mySupplier-out-0", new Pojo());
}
}
现在,提供所需的属性。
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: my-transactional-producer-
由于我们在应用的配置中提供了该属性,所以每当在这个示例中调用 Supplier(通过框架)或有人通过 StreamBridge#send
方法调用 REST 端点时,向 Kafka Topic 的底层发布就会变成完全事务性的。
当触发 Supplier 时,Kafka binder 使用 KafkaTemplate
发布数据。当 binder 检测到应用提供了 transaction-id-prefix
属性时,每次 KafkaTemplate#send
调用都是通过 KafkaTemplate#executeInTransaction
方法完成的。因此,请放心,框架是以事务方式向 Kafka Topic 进行底层发布的。从应用的角度来看,应用开发者唯一需要为事务目的提供的就是 transaction-id-prefix
属性。
在开发或调试事务应用时,可以将日志记录级别设置为 TRACE
,这样相关的底层事务类就能为我们提供正在发生的事情的详细信息。
例如,如果将下列包的日志记录级别设置为 TRACE
,就会在日志中看到大量的信息。
logging:
level:
org.springframework.transaction: TRACE
org.springframework.kafka.transaction: TRACE
org.springframework.kafka.producer: TRACE
org.springframework.kafka.core: TRACE
每次框架调用 Supplier 方法时,我们都能从日志中观察到以下信息:
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] beginTransaction()
o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord
o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=myTopic1, partition=null, headers=RecordHeaders(headers = ...
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@1426370c] commitTransaction()
从 TRACE 日志中可以看到,每次它以事务方式发布记录时,都会形成一个序列:beginTransaction 、Sending 、Sent 和 commitTransaction 。如果运行应用,你会观察到每秒都会看到这些序列,因为这是 Spring Cloud Stream 调用 Supplier
方法的默认调度频率。
同样的事务流程也适用于 StreamBridge#send
。当 Spring Cloud Stream 调用 send 方法时,输出绑定使用的底层 KafkaTemplate
会确保记录在事务中发布,因为我们提供了 transaction-id-prefix
。
发布多条记录的事务 {#发布多条记录的事务}
介绍完这些入门知识后,让我们来谈谈在哪些情况下使用事务是合理的。正如我们之前讨论过的,需要将多条记录作为单个原子单元发布,这就是需要使用事务的一种有效情况。
看看下面的代码示例:
public void publish(StreamBridge streamBridge) {
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
如你所见,这是一个为了说明问题所在而设计的例子。我们发布的不是一条记录,而是多条记录。发布到多个 Topic 也是同样有效的方法。我们可能会认为,通过设置 transaction-id-prefix
属性,我们可以在单个事务中快速封装多条记录的发布。然而,事实并非如此。我们仍然需要提供 prefix 属性。但是,仅凭这一点,每次发送仍会在其专用事务中进行。为了确保所有五条记录的整个端到端发布过程以原子方式进行,我们需要在方法中应用 Spring 框架核心的 @Transactional
注解。此外,我们还必须提供一个事务管理器 bean - KafkaTransactionManager
,它使用由 Spring Cloud Stream Kafka binder 创建的相同生产者工厂。
下面是我们现在的代码和应用的配置:
@SpringBootApplication
@RestController
public class SpringCloudStreamProducer {
@Autowired
StreamBridge streamBridge;
@Autowired Sender sender;
@Autowired
DefaultBinderFactory binderFactory;
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamProducer.class, args);
}
@PostMapping("/send-data")
public void publishData() throws InterruptedException {
sender.send(streamBridge);
}
@Component
static class Sender {
@Transactional
public void send(StreamBridge streamBridge)
{
for (int i = 0; i < 5; i++) {
streamBridge.send("mySupplier-out-0", "data-" + i);
}
}
}
@Bean
KafkaTransactionManager customKafkaTransactionManager() {
KafkaMessageChannelBinder kafka = (KafkaMessageChannelBinder)this.binderFactory.getBinder("kafka", MessageChannel.class);
ProducerFactory<byte[], byte[]> transactionalProducerFactory = kafka.getTransactionalProducerFactory();
KafkaTransactionManager kafkaTransactionManager = new KafkaTransactionManager(transactionalProducerFactory);
return kafkaTransactionManager;
}
}
相应的配置:
spring:
cloud:
stream:
bindings:
mySupplier-out-0:
destination: my-topic
kafka:
binder:
Transaction:
transaction-id-prefix: mySupplier-
producer:
configuration:
retries: 1
acks: all
请注意,上述代码中的事务方法(注解了 @Transactional
的方法)不能在本类中直接使用 this
调用。如果调用的是同一类中的方法,或者是非 Spring 管理 Bean 的不同类之间的方法,则不存在代理,事务拦截器也不会启动。JVM 在运行时并不知道代理和拦截机制。在方法上添加 @Transactional
注解时,Spring 会在幕后为该方法创建事务代理。当 Spring Cloud Stream 调用事务方法时,代理会拦截该调用,然后通过代理对象进行实际调用。
我们提供的自定义 KafkaTransactionManager
Bean 有两个作用。首先,它使 Spring Boot 应用 @EnableTransactionManagerment
。它还提供了 binder 内部使用的相同生产者工厂,以便事务注解在应用事务时使用适当的资源。
当 Spring Boot 检测到可用的事务管理器 bean 时,它会自动应用 @EnableTransactionManagement
注解,该注解负责检测 @Transactional
注解,然后通过 Spring AOP 代理和 Advice 机制添加拦截器。换句话说,Spring AOP 会为 @Transactional
方法创建一个代理,并包含 AOP Advice。如果不应用 @EnableTransactionManagement
注解,Spring 不会触发任何代理和拦截机制。由于 EnableTransactionManagement
注解对于上述各种原因至关重要,因此我们必须提供一个事务管理器 Bean。否则,方法上的 Transactional
注解就没有任何作用。
注意,我们是从 binder 中获取事务生产者工厂,并将其用于 KafkaTransactionManager
的构造函数中。当该 bean 出现在应用中时,现在所有记录的整个发布过程都是在单个事务的范围内进行的。我们在 TRACE 日志中只看到 beginTransaction...commitTransaction 的单个序列,这意味着只有一个事务来执行所有发布操作。
其事件背后的顺序如下:
- 一旦调用注解为
Transactional
的方法,事务拦截器就会通过 AOP 代理机制启动,并使用自定义的KafkaTransactionManager
启动一个新事务。 - 当事务管理器开始事务时,事务管理器使用的资源 - 事务资源持有者(又名,从生产者工厂获得的生产者)- 就与事务绑定。
- 当该方法调用
StreamBridge#send
方法时,底层KafkaTemplate
将使用自定义KafkaTransactionManager
创建的相同事务资源。由于事务已在进行中,因此不会启动另一个事务,但发布会在同一个事务生产者上进行。 - 在多次调用
send
方法时,它不会启动新的事务。相反,它会通过原始事务中使用的相同生产者资源进行发布。 - 方法退出时,如果没有错误,拦截器会要求事务管理器提交事务。如果方法中的任何发送操作或其他操作抛出异常,拦截器就会要求事务管理器回滚事务。这些调用最终会触发
KafkaResourceHolder
的commit
或rollback
方法,从而调用 Kafka 生产者提交或回滚事务。
由于我们的示例中只有一个自定义 KafkaTransactionManager
Bean,因此我们可以直接使用 Transactional
注解。如果我们有多个自定义 KafkaTransactionManager
Bean,我们就必须用正确的 Bean 名称来限定 @Transactional
注解。
在不使用自定义 KafkaTransactionManager
的情况下运行应用 {#在不使用自定义-kafkatransactionmanager-的情况下运行应用}
如果我们移除自定义的 KafkaTransactionManager
并运行此应用,可以看到它创建了五个单独的事务,而不是一个单一的事务。如果启用 TRACE
日志,就能在日志中看到 beginTransaction...commitTransaction 的五个序列。
你可以通过编写一个事务性消费者 Spring Cloud Stream 应用并将其隔离级别设置为 read_committed
,来验证此行为。具体方法是使用 spring.cloud.stream.kafka.binder.configuration.isolation.level
属性并将其值设置为 read_committed
。出于测试目的,在 for 循环中添加 Thread.sleep
或其他等待机制来模拟每次 StreamBridge#send
之后的行为。我们可以看到,无论等待与否,只要每次 send
方法调用返回,消费者就会收到数据,从而证明整个操作并非由一个事务执行,而是每次 send
都发生在自己的事务中。
我们看到每次发送都有单独的事务,这是因为 Transactional
注解没有完成我们期望它完成的工作。Transactional
注解只有在事务管理器 Bean 可用且其生产者工厂与 Binder 使用的工厂相同的情况下才会起作用。
如果 Spring Boot 在配置中检测到 transaction-id-prefix
属性(通过 spring.kafka.producer.transaction-id-prefix
)时,它会自动配置一个 KafkaTransactionManager
。然而,由于我们处于 Spring Cloud Stream 的上下文中,我们必须使用 spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix
,因为这是我们向框架发出的信号,要求它为 Binder 和相关的事务性生产者工厂创建一个内部事务管理器。如果我们提供正确的 spring.kafka
前缀,让 Spring Boot 为我们自动配置一个 KafkaTransactionManager
,会怎么样呢?虽然这非常诱人,但实际上不起作用,因为自动配置的事务管理器使用的是与 Binder 使用的生产者工厂不同的工厂。因此,我们必须提供一个自定义的 KafkaTransactionManager
,它使用与 Binder 相同的生产者工厂。这正是我们上面所做的。
在本教程系列的下一部分,我们将了解如何与外部事务管理器同步生产者和消费者发起的事务。
参考:https://spring.io/blog/2023/09/28/producer-initiated-transactions-in-spring-cloud-stream-kafka-applications