51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

将数据发送到 Kafka 中的特定分区

1、简介 {#1简介}

Apache Kafka 是一个分布式流平台,擅长处理海量实时数据流。Kafka 将数据组织成 Topic(主题),并进一步将 Topic 划分为 Partition(分区)。每个分区都是一个独立的 Channel(通道),可实现并行处理和容错。

本文将带你了解如何把数据发送到 Kafka 中特定的分区。

2、理解 Kafka 分区 {#2理解-kafka-分区}

首先来了解一下 Kafka 分区的基本概念。

2.1、什么是 Kafka 分区? {#21什么是-kafka-分区}

当生产者向 Kafka Topic 发送消息时,Kafka 会使用指定的分区策略将这些消息组织到分区中。分区是一个基本单元,代表了线性、有序的消息序列。消息一旦产生,就会根据所选的分区策略被分配到一个特定的分区。随后,消息会被附加到该分区中日志的末尾。

2.2、并行消费与消费组 {#22并行消费与消费组}

一个 Kafka Topic 可分为多个分区,一个消费组(Consumer Group)可被分配到这些分区的一个子集。组内的每个消费者都会独立处理来自其分配分区的消息。这种并行处理机制提高了整体吞吐量和可扩展性,使 Kafka 能够高效地处理大量数据。

2.3、顺序保证 {#23顺序保证}

在单个分区中,Kafka 可确保按照接收到的相同顺序处理消息。这保证了依赖消息顺序的应用(如金融交易或事件日志)的顺序处理。不过,需要注意的是,由于网络延迟和其他操作因素,接收消息的顺序可能与最初发送消息的顺序不同。

在不同的分区中,Kafka 并不保证顺序。来自不同分区的消息可能会被并发处理,从而带来事件顺序变化的可能性。在设计依赖于严格消息顺序的应用时,需要考虑到这个特性。

2.4、容错和高可用 {#24容错和高可用}

分区还有助于 Kafka 实现出色的容错能力。每个分区都可以在多个 Broker 之间复制。如果 Broker 发生故障,副本分区仍可被访问,并确保对数据的持续访问。

Kafka 集群可以将消费者无缝重定向到健康的 Broker,从而保持数据的可用性和系统的高可靠性。

3、为什么要将数据发送到特定分区? {#3为什么要将数据发送到特定分区}

3.1、数据亲和性 {#31数据亲和性}

数据亲和性是指有意将相关数据归入同一分区。通过将相关数据发送到特定分区,可以确保这些数据一起处理,从而提高处理效率。

例如,考虑一个场景,我们可能希望确保客户的订单位于同一个分区中,以便进行订单追踪和分析。保证特定客户的所有订单都进入同一个分区可以简化追踪和分析过程。

3.2、负载均衡 {#32负载均衡}

此外,在分区之间均匀地分配数据有助于确保最佳的资源利用率。在分区之间平均分配数据有助于优化 Kafka 集群内的资源利用率。通过根据负载情况向分区发送数据,可以防止出现资源瓶颈,确保每个分区都能接收到可管理的均衡工作量。

3.3、优先顺序 {#33优先顺序}

在某些情况下,并非所有数据都具有相同的优先级或紧迫性。Kafka 的分区功能可将关键数据引导到专用分区进行快速处理,从而实现关键数据的优先级排序。与不太重要的数据相比,这种优先级排序可确保高优先级的消息得到及时关注和更快处理。

4、向特定分区发送数据的方式 {#4向特定分区发送数据的方式}

Kafka 提供了将消息分配到分区的各种策略,从而提供了数据分布和处理的灵活性。下面是一些可用于将消息发送到特定分区的常用方法。

4.1、粘性分区器(Sticky Partitioner) {#41粘性分区器sticky-partitioner}

在 Kafka 2.4 及以上版本中,粘性分区器(Sticky Partitioner)的目的是将没有 Key 的消息保持在同一个分区中。不过,这种行为并不是绝对的,它会与批处理设置(如 batch.sizelinger.ms)相互作用。

为了优化消息传递,Kafka 会在将消息发送到 Broker 之前将其分组为多个批次。batch.size 设置(默认为 16,384 字节)控制着最大批处理的大小,影响着消息在粘性分区器下的同一分区中停留的时间。

linger.ms 配置(默认值:0 毫秒)会在发送批处理前引入延迟,可能会延长无 Key 消息的粘性行为。

在下面的测试用例中,假设默认的批处理配置保持不变。发送三条消息,但不显式分配 Key,它们最初被分配到同一个分区:

kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

Set<Integer> uniquePartitions = records.stream()
  .map(ReceivedMessage::getPartition)
  .collect(Collectors.toSet());

Assert.assertEquals(1, uniquePartitions.size());

4.2、基于 Key 的方式 {#42基于-key-的方式}

在基于 Key 的方法中,Kafka 会将具有相同 Key 的消息导向同一个分区,从而优化相关数据的处理。这是通过哈希函数实现的,确保将消息 Key 地映射到分区。

测试如下,具有相同 Key partitionA 的消息始终位于同一分区。

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 4);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Map<String, List<ReceivedMessage>> messagesByKey = groupMessagesByKey(records);

messagesByKey.forEach((key, messages) -> {
    int expectedPartition = messages.get(0)
      .getPartition();
    for (ReceivedMessage message : messages) {
        assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition);
    }
});

此外,在基于 Key 的方法中,具有相同 Key 的消息会按照其在特定分区内产生的顺序被一致接收。这就保证了分区内消息顺序的保持,尤其是对于相关的消息而言。

测试如下,按特定顺序生成带有 Key partitionA 的消息,测试主动验证这些消息在分区内是否按相同顺序被接收:

kafkaProducer.send("order-topic", "partitionA", "message1");
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

StringBuilder resultMessage = new StringBuilder();
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";

assertEquals("Messages with the same key should be received in the order they were produced within a partition", 
  expectedMessage, resultMessage.toString());

4.3、自定义分区 {#43自定义分区}

为了实现更精准的控制,Kafka 允许定义自定义分区器。这些类实现了 Partitioner 接口,使我们能够根据消息内容、元数据或其他因素编写逻辑,以确定目标分区。

假如,在向 Kafka Topic 分派订单时,需要根据客户类型创建自定义分区逻辑。具体来说,优质客户的订单将被定向到一个分区,而普通客户的订单将被定向到另一个分区。

首先,创建一个名为 CustomPartitioner 的类,继承自 Kafka Partitioner 接口。在这个类中,用自定义逻辑覆写 partition() 方法,以确定每条消息的目标分区:

public class CustomPartitioner implements Partitioner {
    private static final int PREMIUM_PARTITION = 0;
    private static final int NORMAL_PARTITION = 1;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String customerType = extractCustomerType(key.toString());
        return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;
    }

    private String extractCustomerType(String key) {
        String[] parts = key.split("_");
        return parts.length > 1 ? parts[1] : "normal";
    }
   
    // 其他方法
}

接下来,在 Kafka 中应用这个自定义分区器,需要在生产者配置中设置 PARTITIONER_CLASS_CONFIG 属性。Kafka 将根据 CustomPartitioner 类中定义的逻辑,使用该分区器来确定每条消息的分区。

setProducerToUseCustomPartitioner() 方法用于设置 Kafka 生产者使用的 CustomPartitioner

private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
    DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

    return new KafkaTemplate<>(producerFactory);
}

然后,构建一个测试用例,以确保自定义分区逻辑正确地将高级客户订单和普通客户订单路由到各自的分区:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

4.4、直接指定分区 {#44直接指定分区}

在 Topic 间手动迁移数据或调整分区间的数据分布时,直接指定分区有助于控制消息的位置。Kafka 提供了使用 ProductRecord 构造函数直接向特定分区发送消息的功能,该构造函数接受一个分区编号。通过指定分区编号,可以为每条消息明确指定目标分区。

测试如下,指定 send() 方法的第二个参数为分区编号:

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

for (ReceivedMessage record : records) {
    if ("123_premium".equals(record.getKey())) {
        assertEquals("Premium order message should be in partition 0", 0, record.getPartition());
    } else if ("456_normal".equals(record.getKey())) {
        assertEquals("Normal order message should be in partition 1", 1, record.getPartition());
    }
}

5、从特定分区消费 {#5从特定分区消费}

要在消费者端消费 Kafka 中特定分区的数据,可以使用 KafkaConsumer.assign() 方法指定要订阅的分区。这样可以对消费进行细粒度控制,但需要手动管理分区偏移量。

下面是使用 assign() 方法从特定分区消费消息的示例:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

6、潜在的挑战和考虑因素 {#6潜在的挑战和考虑因素}

当将消息发送到特定分区时,存在分区之间负载不均衡的风险。如果用于分区的逻辑不能将消息均匀地分布在所有分区上,就会出现这种情况。此外,扩展 Kafka 集群(涉及添加或删除 Broker)可能会触发分区重新分配。在重新分配期间,Broker 可能会移动分区,可能会破坏消息的顺序或导致暂时的不可用性。

因此,应该使用 Kafka 工具或指标定期监控每个分区的负载。例如,Kafka 管理客户端(Kafka Admin Client)和 Micrometer 可以帮助我们深入了解分区的健康状况和性能。使用管理客户端检索有关 Topic、分区及其当前状态的信息,并使用 Micrometer 进行指标监控。

此外,需要预见到主动调整分区策略或水平扩展 Kafka 集群的需要,以有效地管理特定分区上增加的负载。还可以考虑增加分区的数量或调整 Key 范围,以实现更均匀的分布。

7、总结 {#7总结}

总之,在 Apache Kafka 中向特定分区发送消息的功能为优化数据处理和提高整体系统效率提供了强大的可能性。

本文介绍了把消息发送到特定 Kafka 分区的各种方法,包括基于 Key 的方法、自定义分区和直接指定分配。每种方法都具有不同的优势,可以根据应用的具体要求进行定制。


Ref:https://www.baeldung.com/kafka-send-data-partition

赞(2)
未经允许不得转载:工具盒子 » 将数据发送到 Kafka 中的特定分区