1、概览 {#1概览}
Apache Pulsar 是一个分布式发布/订阅消息系统。Apache Pulsar 提供的功能与 Apache Kafka 类似,但 Pulsar 的目标是克服 Kafka 的高延迟、低吞吐量、难以扩展和地理复制等局限性。在处理需要实时处理的大量数据时,Apache Pulsar 是一个不错的选择。
在本教程中,我们将学习如在 Spring Boot 应用中整合 Apache Pulsar,以及如何使用 Pulsar 的 Spring Boot Starter 提供的 PulsarTemplate
和 PulsarListener
。我们还将了解如何根据自己的需求修改它们的默认配置。
2、Maven 依赖 {#2maven-依赖}
首先,先根据 Apache Pulsar 简介中所述,运行独立的 Apache Pulsar 服务器。
然后,将 spring-pulsar-spring-boot-starter 库添加到项目中:
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>
3、PulsarClient {#3pulsarclient}
要与 Pulsar 服务器交互,我们需要配置一个 PulsarClient
。默认情况下,Spring 会自动配置一个 PulsarClient
,连接到 localhost:6650
上的 Pulsar 服务器:
spring:
pulsar:
client:
service-url: pulsar://localhost:6650
我们可以更改配置,从自定义的地址上建立连接。
要连接到 ssl 下的 pulsar 服务器 ,可以使用 "pulsar+ssl" 代替 "pulsar"。我们还可以通过在 application.yml
中添加 spring.pulsar.client.**
属性来配置连接超时、身份验证和内存限制等属性。
4、指定自定义对象的 Schema {#4指定自定义对象的-schema}
在应用程序中使用一个简单的 User
类:
public class User {
private String email;
private String firstName;
// standard constructors, getters and setters
}
Spring-Pulsar 会自动检测原始数据类型并生成相关 schema。但是,如果我们需要使用自定义 JSON 对象,就必须为 PulsarClient
配置其 schema 信息:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.baeldung.springpulsar.User
schema-info:
schema-type: JSON
这里,message-type
属性接受消息类的完整名称,而 schema-type
则提供要使用的 schema 类型信息。对于复杂对象,schema-type
属性可接受 AVRO
或 JSON
值。
虽然使用 properties 文件指定 schema 是首选方法,但我们也可以通过 bean 来提供 schema:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
}
}
此配置应同时添加到生产者和消费者应用程序中。
5、生产者 {#5生产者}
要在 Pulsar topic 上发布消息,我们将使用 PulsarTemplate
。PulsarTemplate
实现了 PulsarOperations
接口,并提供了以同步和异步形式发布记录的方法。send
方法会阻塞调用以提供同步操作能力,而 sendAsync
方法则提供异步非阻塞操作。
在本教程中,我们将使用同步操作来发布记录。
5.1、发布消息 {#51发布消息}
Spring Boot 会自动配置一个随时可用的 PulsarTemplate
,用于向指定 topic 发布记录。
让我们创建一个向队列发布 String
消息的生产者:
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<String> stringTemplate;
private static final String STRING_TOPIC = "string-topic";
public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
stringTemplate.send(STRING_TOPIC, str);
}
}
现在,让我们尝试向新队列发送一个 User
对象:
@Autowired
private PulsarTemplate<User> template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}
在上面的代码片段中,我们使用 PulsarTemplate
向 Apache Pulsar 的 user-topic
topic 发送了一个 User
class 对象。
5.2、自定义生产者配置 {#52自定义生产者配置}
PulsarTemplate
接受 TypedMessageBuilderCustomizer
来配置发送的信息,并接受 ProducerBuilderCustomizer
来定制生产者的属性。
我们可以使用 TypedMessageBuilderCustomizer
来配置消息延迟、在特定时间发送、禁用复制以及提供其他属性:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withMessageCustomizer(mc -> {
mc.deliverAfter(10L, TimeUnit.SECONDS);
})
.send();
}
ProducerBuilderCustomizer
可用于添加访问模式、自定义消息路由和拦截器,以及启用或禁用分块(chunking)和批处理:
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.newMessage(user)
.withProducerCustomizer(pc -> {
pc.accessMode(ProducerAccessMode.Shared);
})
.send();
}
6、消费者 {#6消费者}
在向 topic 发布消息后,我们现在要为同一个 topic 建立一个 listener。要启用对 topic 的监听,需要用 @PulsarListener
注解 listener 方法。
Spring Boot 会为 listener 方法配置所有必要的组件。
我们还需要使用 @EnablePulsar
注解来启用 PulsarListener
。
6.1. 接收消息 {#61-接收消息}
首先要为前一节创建的 "string-topic" 创建一个 listener 方法:
@Service
public class PulsarConsumer {
private static final String STRING_TOPIC = "string-topic";
@PulsarListener(
subscriptionName = "string-topic-subscription",
topics = STRING_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void stringTopicListener(String str) {
LOGGER.info("Received String message: {}", str);
}
}
在 PulsarListener
注解中,我们在 topicName
属性中配置了该方法将监听的 topic,并在 subscriptionName
属性中给出了订阅名称。
现在,让我们为 User
类使用的 user-topic
创建一个 listener 方法:
private static final String USER_TOPIC = "user-topic";
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
除了先前的 Listener 方法中提供的属性外,我们还添加了一个 schemaType
属性,其值与生产者中的值相同。
还需要在 main class 上添加 @EnablePulsar
注解:
@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {
public static void main(String[] args) {
SpringApplication.run(SpringPulsarApplication.class, args);
}
}
6.2、自定义消费者配置 {#62自定义消费者配置}
除订阅名称和 schema type 外,PulsarListener
还可用于配置自动启动、批处理和确认模式等属性:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD,
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在这里,我们将确认模式设置为 Record
,并将确认超时设置为 60 秒。
7、使用死信 Topic {#7使用死信-topic}
如果信息确认超时或服务器接收到 nack
,Pulsar 就会尝试重发一定次数的信息。这些重试次数用完后,这些未送达的信息会被发送到称为死信队列(DLQ)的队列中。
此选项仅适用于共享(Shared)订阅类型。要为我们的 user-topic
队列配置 DLQ,我们首先要创建一个 DeadLetterPolicy
Bean,它将定义尝试重新交付的次数以及用作 DLQ 的队列名称:
private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.deadLetterTopic(USER_DEAD_LETTER_TOPIC)
.build();
}
现在,我们将把该策略添加到之前创建的 PulsarListener
中:
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON,
deadLetterPolicy = "deadLetterPolicy",
properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
在这里,我们将 userTopicListener
配置为使用之前创建的 deadLetterPolicy
,并将确认时间配置为 60 秒。
我们可以创建一个单独的 Listener 来处理 DQL 中的信息:
@PulsarListener(
subscriptionName = "dead-letter-topic-subscription",
topics = USER_DEAD_LETTER_TOPIC,
subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}
8、总结 {#8总结}
在本教程中,我们学习了如何在 Spring Boot 应用程序中整合,使用 Apache Pulsar,以及如何更改生产者和消费者的默认配置。
参考:
https://www.baeldung.com/spring-boot-apache-pulsar