51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spring Boot 整合 Apache Pulsar 入门指南

1、概览 {#1概览}

Apache Pulsar 是一个分布式发布/订阅消息系统。Apache Pulsar 提供的功能与 Apache Kafka 类似,但 Pulsar 的目标是克服 Kafka 的高延迟、低吞吐量、难以扩展和地理复制等局限性。在处理需要实时处理的大量数据时,Apache Pulsar 是一个不错的选择。

在本教程中,我们将学习如在 Spring Boot 应用中整合 Apache Pulsar,以及如何使用 Pulsar 的 Spring Boot Starter 提供的 PulsarTemplatePulsarListener。我们还将了解如何根据自己的需求修改它们的默认配置。

2、Maven 依赖 {#2maven-依赖}

首先,先根据 Apache Pulsar 简介中所述,运行独立的 Apache Pulsar 服务器。

然后,将 spring-pulsar-spring-boot-starter 库添加到项目中:

<dependency>
    <groupId>org.springframework.pulsar</groupId>
    <artifactId>spring-pulsar-spring-boot-starter</artifactId>
    <version>0.2.0</version>
</dependency>

3、PulsarClient {#3pulsarclient}

要与 Pulsar 服务器交互,我们需要配置一个 PulsarClient。默认情况下,Spring 会自动配置一个 PulsarClient,连接到 localhost:6650 上的 Pulsar 服务器:

spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650

我们可以更改配置,从自定义的地址上建立连接。

要连接到 ssl 下的 pulsar 服务器 ,可以使用 "pulsar+ssl" 代替 "pulsar"。我们还可以通过在 application.yml 中添加 spring.pulsar.client.** 属性来配置连接超时、身份验证和内存限制等属性。

4、指定自定义对象的 Schema {#4指定自定义对象的-schema}

在应用程序中使用一个简单的 User 类:

public class User {

    private String email;
    private String firstName;

    // standard constructors, getters and setters
}

Spring-Pulsar 会自动检测原始数据类型并生成相关 schema。但是,如果我们需要使用自定义 JSON 对象,就必须为 PulsarClient 配置其 schema 信息:

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.baeldung.springpulsar.User
          schema-info:
            schema-type: JSON

这里,message-type 属性接受消息类的完整名称,而 schema-type 则提供要使用的 schema 类型信息。对于复杂对象,schema-type 属性可接受 AVROJSON 值。

虽然使用 properties 文件指定 schema 是首选方法,但我们也可以通过 bean 来提供 schema:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
    }
}

此配置应同时添加到生产者和消费者应用程序中。

5、生产者 {#5生产者}

要在 Pulsar topic 上发布消息,我们将使用 PulsarTemplatePulsarTemplate 实现了 PulsarOperations 接口,并提供了以同步和异步形式发布记录的方法。send 方法会阻塞调用以提供同步操作能力,而 sendAsync 方法则提供异步非阻塞操作。

在本教程中,我们将使用同步操作来发布记录。

5.1、发布消息 {#51发布消息}

Spring Boot 会自动配置一个随时可用的 PulsarTemplate,用于向指定 topic 发布记录。

让我们创建一个向队列发布 String 消息的生产者:

@Component
public class PulsarProducer {

    @Autowired
    private PulsarTemplate<String> stringTemplate;

    private static final String STRING_TOPIC = "string-topic";

    public void sendStringMessageToPulsarTopic(String str) throws PulsarClientException {
        stringTemplate.send(STRING_TOPIC, str);
    }
}

现在,让我们尝试向新队列发送一个 User 对象:

@Autowired
private PulsarTemplate<User> template;

private static final String USER_TOPIC = "user-topic";

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.send(USER_TOPIC, user);
}

在上面的代码片段中,我们使用 PulsarTemplate 向 Apache Pulsar 的 user-topic topic 发送了一个 User class 对象。

5.2、自定义生产者配置 {#52自定义生产者配置}

PulsarTemplate 接受 TypedMessageBuilderCustomizer 来配置发送的信息,并接受 ProducerBuilderCustomizer 来定制生产者的属性。

我们可以使用 TypedMessageBuilderCustomizer 来配置消息延迟、在特定时间发送、禁用复制以及提供其他属性:

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.newMessage(user)
      .withMessageCustomizer(mc -> {
        mc.deliverAfter(10L, TimeUnit.SECONDS);
      })
      .send();
}

ProducerBuilderCustomizer 可用于添加访问模式、自定义消息路由和拦截器,以及启用或禁用分块(chunking)和批处理:

public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
    template.newMessage(user)
      .withProducerCustomizer(pc -> {
        pc.accessMode(ProducerAccessMode.Shared);
      })
      .send();
}

6、消费者 {#6消费者}

在向 topic 发布消息后,我们现在要为同一个 topic 建立一个 listener。要启用对 topic 的监听,需要用 @PulsarListener 注解 listener 方法。

Spring Boot 会为 listener 方法配置所有必要的组件。

我们还需要使用 @EnablePulsar 注解来启用 PulsarListener

6.1. 接收消息 {#61-接收消息}

首先要为前一节创建的 "string-topic" 创建一个 listener 方法:

@Service
public class PulsarConsumer {

    private static final String STRING_TOPIC = "string-topic";

    @PulsarListener(
      subscriptionName = "string-topic-subscription",
      topics = STRING_TOPIC,
      subscriptionType = SubscriptionType.Shared
    )
    public void stringTopicListener(String str) {
        LOGGER.info("Received String message: {}", str);
    }
}

PulsarListener 注解中,我们在 topicName 属性中配置了该方法将监听的 topic,并在 subscriptionName 属性中给出了订阅名称。

现在,让我们为 User 类使用的 user-topic 创建一个 listener 方法:

private static final String USER_TOPIC = "user-topic";

@PulsarListener(
    subscriptionName = "user-topic-subscription",
    topics = USER_TOPIC,
    schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

除了先前的 Listener 方法中提供的属性外,我们还添加了一个 schemaType 属性,其值与生产者中的值相同。

还需要在 main class 上添加 @EnablePulsar 注解:

@EnablePulsar
@SpringBootApplication
public class SpringPulsarApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringPulsarApplication.class, args);
    }
}

6.2、自定义消费者配置 {#62自定义消费者配置}

除订阅名称和 schema type 外,PulsarListener 还可用于配置自动启动、批处理和确认模式等属性:

@PulsarListener(
  subscriptionName = "user-topic-subscription",
  topics = USER_TOPIC,
  subscriptionType = SubscriptionType.Shared,
  schemaType = SchemaType.JSON,
  ackMode = AckMode.RECORD,
  properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

在这里,我们将确认模式设置为 Record,并将确认超时设置为 60 秒。

7、使用死信 Topic {#7使用死信-topic}

如果信息确认超时或服务器接收到 nack,Pulsar 就会尝试重发一定次数的信息。这些重试次数用完后,这些未送达的信息会被发送到称为死信队列(DLQ)的队列中。

此选项仅适用于共享(Shared)订阅类型。要为我们的 user-topic 队列配置 DLQ,我们首先要创建一个 DeadLetterPolicy Bean,它将定义尝试重新交付的次数以及用作 DLQ 的队列名称:

private static final String USER_DEAD_LETTER_TOPIC = "user-dead-letter-topic";
@Bean
DeadLetterPolicy deadLetterPolicy() {
    return DeadLetterPolicy.builder()
      .maxRedeliverCount(10)
      .deadLetterTopic(USER_DEAD_LETTER_TOPIC)
      .build();
}

现在,我们将把该策略添加到之前创建的 PulsarListener 中:

@PulsarListener(
  subscriptionName = "user-topic-subscription",
  topics = USER_TOPIC,
  subscriptionType = SubscriptionType.Shared,
  schemaType = SchemaType.JSON,
  deadLetterPolicy = "deadLetterPolicy",
  properties = {"ackTimeout=60s"}
)
public void userTopicListener(User user) {
    LOGGER.info("Received user object with email: {}", user.getEmail());
}

在这里,我们将 userTopicListener 配置为使用之前创建的 deadLetterPolicy,并将确认时间配置为 60 秒。

我们可以创建一个单独的 Listener 来处理 DQL 中的信息:

@PulsarListener(
  subscriptionName = "dead-letter-topic-subscription",
  topics = USER_DEAD_LETTER_TOPIC,
  subscriptionType = SubscriptionType.Shared
)
public void userDlqTopicListener(User user) {
    LOGGER.info("Received user object in user-DLQ with email: {}", user.getEmail());
}

8、总结 {#8总结}

在本教程中,我们学习了如何在 Spring Boot 应用程序中整合,使用 Apache Pulsar,以及如何更改生产者和消费者的默认配置。


参考:https://www.baeldung.com/spring-boot-apache-pulsar

赞(2)
未经允许不得转载:工具盒子 » Spring Boot 整合 Apache Pulsar 入门指南