1、概览 {#1概览}
本文将带你了解如何使用 Spring Modulith 来监听 Spring Application Event 并自动将其发布到 Kafka Topic。
2、事务操作和 Message Broker {#2事务操作和-message-broker}
假设,正在开发一个保存文章的功能:
@Service
class Baeldung {
private final ArticleRepository articleRepository;
// 构造函数
@Transactional
public void createArticle(Article article) {
validateArticle(article);
article = addArticleTags(article);
// ...其他业务逻辑
articleRepository.save(article);
}
}
此外,还需要向系统的其他部分通知这一新文章。其他模块或服务会根据此做出反应,例如创建报告或向网站读者发送新闻邮件。
最简单的方法是使用 KafkaOperations
向 baeldung.articles.published
Kafka Topic 发送消息,并使用文章的 slug()
作为关 Key:
@Service
class Baeldung {
private final ArticleRepository articleRepository;
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
// 构造函数
@Transactional
public void createArticle(Article article) {
// ... 业务逻辑
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
messageProducer.send(
"baeldung.articles.published",
article.slug(),
new ArticlePublishedEvent(article.slug(), article.title())
).join();
}
}
然而,这种方法并不理想,原因有几个。从设计的角度来看,将 Domain Service 与消息生产者耦合在一起了。此外,Domain Service 直接依赖于较低级别的组件,违反了基本的干净架构(Clean Architecture)规则之一。
而且,这种方法还会影响性能,因为所有事情都是在 @Transacional
方法中发生的。因此,为保存文章而获取的数据库连接将一直处于打开状态,直到消息成功发布。
最后,保存实体和发布消息是一个原子操作。换句话说,如果生产者未能发布事件,数据库事务就会回滚。
3、使用 Spring Event 进行依赖反转 {#3使用-spring-event-进行依赖反转}
可以利用 Spring Event 来改进解决方案的设计,目的是避免直接从 Domain Service 向 Kafka 发布消息。
首先,移除 KafkaOperations
依赖,转而发布内部的 Application Event:
@Service
public class Baeldung {
private final ApplicationEventPublisher applicationEvents;
private final ArticleRepository articleRepository;
// 构造函数
@Transactional
public void createArticle(Article article) {
// ... 业务逻辑
validateArticle(article);
article = addArticleTags(article);
article = articleRepository.save(article);
applicationEvents.publishEvent(
new ArticlePublishedEvent(article.slug(), article.title()));
}
}
除此之外,基础架构层还有一个专门的 Kafka Producer,该组件将监听 ArticlePublishedEvent
,并将发布委托给底层的 KafkaOperations
Bean:
@Component
class ArticlePublishedKafkaProducer {
private final KafkaOperations<String, ArticlePublishedEvent> messageProducer;
// 构造函数
@EventListener
public void publish(ArticlePublishedEvent article) {
Assert.notNull(article.slug(), "Article Slug must not be null!");
messageProducer.send("baeldung.articles.published", article.splug(), event);
}
}
有了这个抽象,基础架构组件现在依赖于 Domain Service 产生的事件。换句话说,这成功地降低了耦合度,并反转了依赖关系。此外,如果其他模块对文章(Article
)的创建事件感兴趣,它们现在可以无缝监听这些 Application Event,并做出相应的反应。
4、原子操作与非原子运算 {#4原子操作与非原子运算}
现在,来深入考虑性能问题。首先,必须确定当与 Message Broker 的通信失败时,回滚是否是所需的行为。这种选择因具体情况而异。
如果不需要这种原子性,就必须释放数据库连接并异步发布事件。为了模拟这种情况,可以尝试创建一篇没有 slug
的文章,这回导致 ArticlePublishedKafkaProducer::publish
失败:
@Test
void whenPublishingMessageFails_thenArticleIsStillSavedToDB() {
var article = new Article(null, "Introduction to Spring Boot", "John Doe", "<p> Spring Boot is [...] </p>");
baeldung.createArticle(article);
assertThat(repository.findAll())
.hasSize(1).first()
.extracting(Article::title, Article::author)
.containsExactly("Introduction to Spring Boot", "John Doe");
}
如果现在运行测试,它将会失败。这是因为 ArticlePublishedKafkaProducer
会抛出异常,导致 Domain Service 回滚事务。不过,可以用 @TransactionalEventListener
和 @Async
替换 @EventListener
注解,使事件监听器成为异步的:
@Async
@TransactionalEventListener
public void publish(ArticlePublishedEvent event) {
Assert.notNull(event.slug(), "Article Slug must not be null!");
messageProducer.send("baeldung.articles.published", event);
}
如果现在重新运行测试,就会发现日志中输出了异常,事件未被发布,实体已被保存到数据库中。此外,数据库连接被提前释放,允许其他线程使用。
5、使用 Spring Modulith 实现事件外部化 {#5使用-spring-modulith-实现事件外部化}
上文通过两步骤的方法成功解决了原始代码示例中的设计和性能问题:
- 使用 Spring Application Event 进行依赖反转
- 利用
@TransactionalEventListener
和@Async
进行异步发布
使用 Spring Modulith 可以进一步简化代码,为这种模式提供内置支持。首先在 pom.xml
中添加 spring-modulith-events-api
的 maven 依赖:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-api</artifactId>
<version>1.1.2</version>
</dependency>
该模块可配置为监听 Application Event,并自动将其外部化到各种 消息系统。这里继续使用最开始的示例,重点关注 Kafka。为了实现整合,需要添加 spring-modulith-events-kafka
依赖:
<dependency>
<groupId>org.springframework.modulith</groupId>
<artifactId>spring-modulith-events-kafka</artifactId>
<version>1.1.2</version>
</dependency>
现在,需要更新 ArticlePublishedEvent
,并用 @Externalized
对其进行注解。该注解需要路由目标的名称和 Key。换句话说,就是 Kafka Topic 和消息 Key。对于 Key,可以使用 SpEL 表达式来调用 Article::slug()
:
@Externalized("baeldung.article.published::#{slug()}")
public record ArticlePublishedEvent(String slug, String title) {
}
6、事件外部化配置 {#6事件外部化配置}
尽管 @Externalized
注解的值对于简洁的 SpEL 表达式很有用,但在某些情况下,还是要避免使用它:
- 在表达式过于复杂的情况下
- 需要将 Topic 消息与 Application Event 分开时
- 需要为 Application Event 和外部化事件建立不同的 Model
对于这些场景,可以使用 EventExternalizationConfiguration
的 Builder 配置必要的路由和事件映射。之后,只需将此配置作为 Spring Bean 公开即可:
@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {
return EventExternalizationConfiguration.externalizing()
.select(EventExternalizationConfiguration.annotatedAsExternalized())
.route(
ArticlePublishedEvent.class,
it -> RoutingTarget.forTarget("baeldung.articles.published").andKey(it.slug())
)
.mapping(
ArticlePublishedEvent.class,
it -> new ArticlePublishedKafkaEvent(it.slug(), it.title())
)
.build();
}
这个时候,可以删除 ArticlePublishedEvent
中的路由信息,并保留没有值的 @Externalized
注解:
@Externalized
public record ArticlePublishedEvent(String slug, String title) {
}
7、总结 {#7总结}
本文通过 "在事务中发布消息" 的业务场景,介绍了如何使用 Spring Modulith 来监听 Application Event,并自动将其发布到 Kafka Topic,通过这种方法,可以异步地将事件外部化,并尽快释放数据库连接。
Ref:https://www.baeldung.com/spring-modulith-event-externalization