51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Spring Boot 整合 Kafka Stream

1、简介 {#1简介}

流式数据在现实生活中的一些例子包括传感器数据、股票市场事件流和系统日志。在本文中,我们通过构建一个简单的字数统计流式应用来介绍如何在 Spring Boot 中使用 Kafka Streams。

2、概览 {#2概览}

Kafka Streams 在 Kafka Topic 和关系型数据库表之间提供了一种对偶性。它使我们能够对一个或多个流式事件进行连接、分组、聚合和过滤等操作。

Kafka 流的一个重要概念是处理器拓扑(Processor Topology)。处理器拓扑是 Kafka Stream 对一个或多个事件流进行操作的蓝图。从本质上讲,处理器拓扑可视为有向无环图。在这个图中,节点分为源节点、处理器节点和汇节点,而边则代表流事件的流向。

位于拓扑结构顶端的源接收来自 Kafka 的流数据,将其向下传递到执行自定义操作的处理器节点,并通过汇节点流出到新的 Kafka Topicc。在进行核心处理的同时,还利用检查点(Checkpoint)定期保存数据流的状态,以实现容错和弹性。

3、依赖 {#3依赖}

首先在 POM 中添加 spring-kafkakafka-streams 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.7.8</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId
    <artifactId>kafka-streams</artifactId>
    <version>2.7.1</version>
</dependency> 

4、示例 {#4示例}

示例应用从输入的 Kafka Topic 中读取流式事件。读取记录后,它会对记录进行处理,分割文本并计算单个字数。随后,它将更新的字数发送到 Kafka 输出。除了输出 Topic 外,还要创建一个简单的 REST 服务,通过 HTTP 端点公开该计数。

总之,输出 Topic 将不断更新从输入事件中提取的单词及其更新计数。

4.1、配置 {#41配置}

在 Java 配置类中定义 Kafka Stream 配置:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        return new KafkaStreamsConfiguration(props);
    }

    // 其他配置
}

如上,使用了 @EnableKafkaStreams 注解来自动配置所需的组件。这个自动配置需要一个名为 DEFAULT_STREAMS_CONFIG_BEAN_NAME 指定的 KafkaStreamsConfiguration Bean。Spring Boot 使用这个配置并创建一个 KafkaStreams 客户端来管理应用生命周期。

在这个示例中,为配置提供了 application id、bootstrap server 连接详情和 Serializer / Deserializer。

4.2、拓扑 {#42拓扑}

配置设置完成后,为应用构建拓扑结构,以记录输入信息的字数:

@Component
public class WordCountProcessor {

    private static final Serde<String> STRING_SERDE = Serdes.String();

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, String> messageStream = streamsBuilder
          .stream("input-topic", Consumed.with(STRING_SERDE, STRING_SERDE));

        KTable<String, Long> wordCounts = messageStream
          .mapValues((ValueMapper<String, String>) String::toLowerCase)
          .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
          .groupBy((key, word) -> word, Grouped.with(STRING_SERDE, STRING_SERDE))
          .count();

        wordCounts.toStream().to("output-topic");
    }
}

如上,定义了一个配置方法,并用 @Autowired 对其进行了注解。Spring 会处理此注解,并将容器中匹配的 StreamsBuilder Bean 注入到参数。或者,也可以在配置类中创建一个 Bean 来生成拓扑结构。

通过 StreamsBuilder 可以访问所有 Kafka Streams API,并使其成为一个常规的 Kafka Streams 应用。在本例中,使用这种高级 DSL 来定义应用的转换操作:

  • 使用指定的 key 和 value Serializer / Deserializer 从输入 Topic 创建 KStream
  • 通过转换、拆分、分组,然后计算数据,创建 KTable
  • 将结果具体化为输出流。

从本质上讲,Spring Boot 在管理 KStream 实例生命周期的同时,为 Streams API 提供了一个非常薄的封装。它为拓扑创建和配置所需的组件,并执行 Streams 应用。重要的是,这可以让我们专注于核心业务逻辑,而 Spring 则负责管理生命周期。

4.3、REST 服务 {#43rest-服务}

通过声明步骤定义管道(Pipeline)后,创建 REST Controller,提供端点以便将消息 POST 到输入 Topic,并 GET 指定单词的计数。重要的是,应用从 Kafka Streams 状态存储而不是输出 Topic 中检索数据。

首先,修改之前的 KTable,并将聚合计数具体化为本地状态存储。这样就可以通过 REST Controller 进行查询:

KTable<String, Long> wordCounts = textStream
  .mapValues((ValueMapper<String, String>) String::toLowerCase)
  .flatMapValues(value -> Arrays.asList(value.split("\\W+")))
  .groupBy((key, value) -> value, Grouped.with(STRING_SERDE, STRING_SERDE))
  .count(Materialized.as("counts"));

之后,更新 Controller,从 counts 状态存储中检索计数值:

@GetMapping("/count/{word}")
public Long getWordCount(@PathVariable String word) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> counts = kafkaStreams.store(
      StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore())
    );
    return counts.get(word);
}

如上,factoryBeanStreamsBuilderFactoryBean 的一个实例,它被注入到 Controller 中。这提供了由该工厂 Bean 管理的 KafkaStreams 实例。因此,我们可以获得之前创建的由 KTable 表示的 key/value counts 状态存储。此时,可以使用它从本地状态存储中获取请求的单词的当前计数。

5、测试 {#5测试}

测试是开发和验证应用拓扑的关键部分。Spring Kafka 测试库和 Testcontainers 都在不同层面为测试应用提供了出色的支持。

5.1、单元测试 {#51单元测试}

首先,使用 TopologyTestDriver 为拓扑结构设置一个单元测试。这是测试 Kafka Streams 应用的主要测试工具:

@Test
void givenInputMessages_whenProcessed_thenWordCountIsProduced() {
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    wordCountProcessor.buildPipeline(streamsBuilder);
    Topology topology = streamsBuilder.build();

    try (TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, new Properties())) {
        TestInputTopic<String, String> inputTopic = topologyTestDriver
          .createInputTopic("input-topic", new StringSerializer(), new StringSerializer());
        
        TestOutputTopic<String, Long> outputTopic = topologyTestDriver
          .createOutputTopic("output-topic", new StringDeserializer(), new LongDeserializer());

        inputTopic.pipeInput("key", "hello world");
        inputTopic.pipeInput("key2", "hello");

        assertThat(outputTopic.readKeyValuesToList())
          .containsExactly(
            KeyValue.pair("hello", 1L),
            KeyValue.pair("world", 1L),
            KeyValue.pair("hello", 2L)
          );
    }
}

首先将业务逻辑从 WordCountProcessor 中封装到测试中的 Topology 中。现在,可以使用 TopologyTestDriver 为测试创建输入和输出 Topic。最重要的是,这样就不需要运行代理,同时还能验证管道(Pipeline)行为。换句话说,它让我们无需使用真正的 Kafka Broker 就能快速、轻松地验证管道行为。

5.2、集成测试 {#52集成测试}

最后,使用 Testcontainers 框架来端到端测试应用。这将使用运行中的 Kafka Broker,并启动应用进行完整的测试:

@Testcontainers
@SpringBootTest(classes = KafkaStreamsApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
class KafkaStreamsApplicationLiveTest {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();

    // 其他的测试设置

    @Test
    void givenInputMessages_whenPostToEndpoint_thenWordCountsReceivedOnOutput() throws Exception {
        postMessage("test message");

        startOutputTopicConsumer();

        // 断言输出 Topic 的计数正确
        assertThat(output.poll(2, MINUTES)).isEqualTo("test:1");
        assertThat(output.poll(2, MINUTES)).isEqualTo("message:1");

        // 断言来自 REST 服务的计数正确
        assertThat(getCountFromRestServiceFor("test")).isEqualTo(1);
        assertThat(getCountFromRestServiceFor("message")).isEqualTo(1);
    }
}

如上,向 REST Controller 发送了一个 POST,而 REST Controller 又将消息发送到 Kafka 输入 Topic。在设置中,还启动了一个 Kafka 消费者。它异步监听 Kafka 输出 Topic,并用接收到的字数更新 BlockingQueue

在测试执行期间,应用处理输入信息。随后,可以使用 REST 服务验证 Topic 和状态存储的输出是否符合预期。

6、总结 {#6总结}

本文介绍了如何使用 Kafka Streams 和 Spring Boot 创建一个简单的事件驱动应用来处理消息。


参考:https://www.baeldung.com/spring-boot-kafka-streams

赞(0)
未经允许不得转载:工具盒子 » Spring Boot 整合 Kafka Stream