1、概览 {#1概览}
本文将带你了解 Spring Kafka 中的 "Trusted Packages" 功能,了解其背后的动机以及用法。
2、先决条件 {#2先决条件}
一般来说,Spring Kafka 模块允许我们指定一些关于发送的 POJO 的元数据。它通常采用 Kafka Message Header 的形式。
例如,可以这样配置 ProducerFactory
:
@Bean
public ProducerFactory<Object, SomeData> producerFactory() {
JsonSerializer<SomeData> jsonSerializer = new JsonSerializer<>();
jsonSerializer.setAddTypeInfo(true);
return new DefaultKafkaProducerFactory<>(
producerFactoryConfig(),
new StringOrBytesSerializer(),
jsonSerializer
);
}
@Data
@AllArgsConstructor
static class SomeData {
private String id;
private String type;
private String status;
private Instant timestamp;
}
然后,使用上面用 producerFactory
配置的 KafkaTemplate
在一个 Topic 中生产一条新消息:
public void sendDataIntoKafka() {
SomeData someData = new SomeData("1", "active", "sent", Instant.now());
kafkaTemplate.send(new ProducerRecord<>("sourceTopic", null, someData));
}
此时,我们会在 Kafka 消费者的控制台中收到以下信息:
CreateTime:1701021806470 __TypeId__:com.baeldung.example.SomeData null {"id":"1","type":"active","status":"sent","timestamp":1701021806.153965150}
可以看到,消息中 POJO 的类型信息就在 Header 信息中。当然,这是 Spring Kafka 独有的特性。也就是说,从 Kafka 或其他框架的角度来看,这些 Header 只是元数据。因此,我们可以假设消费者和生产者都使用 Spring 来处理 Kafka 消息。
3、Trusted Packages 特性 {#3trusted-packages-特性}
在某些情况下,这是一个非常有用的功能。当 Topic 中的消息具有不同的 Payload 类型时,为消费者提示 Payload 的类型将非常有用。
不过,一般来说,我们知道 Topic 中会出现哪些类型的消息。因此,限制消费者可能接受的 Payload 类型可能是个好主意。这就是 Spring Kafka "Trusted Packages" 的意义所在。
4、用法 {#4用法}
"Trusted Packages" 是 Spring Kafka 中的一个功能,它在反序列化器(deserializer)级别进行配置。如果配置了 Trusted Packages,Spring 将查找传入消息的 type Header。然后,它会检查消息中提供的所有类型是否都是受信任的 --- 包括 Key 和 Value。
这主要是指在 相应 Header 中指定的 Key 和 Value 的 Java 类必须位于受信任的包内。如果一切正常,Spring 就会将消息传递,进一步反序列化。如果 Header 不存在,Spring 将直接反序列化对象,而不会检查 "Trusted Packages":
@Bean
public ConsumerFactory<String, SomeData> someDataConsumerFactory() {
JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
payloadJsonDeserializer.addTrustedPackages("com.baeldung.example");
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),
new StringDeserializer(),
payloadJsonDeserializer
);
}
如果用星号(*
) 代替具体的 package
,Spring 会信任所有包:
JsonDeserializer<SomeData> payloadJsonDeserializer = new JsonDeserializer<>();
payloadJsonDeserializer.trustedPackages("*");
然而,在这种情况下,使用 "Trusted packages" 并没有任何作用,只会产生额外的开销。
5.1、第一个动机: 一致性 {#51第一个动机-一致性}
这个功能之所以很棒,有两个主要原因。首先,如果集群中出现问题,我们可以快速失败。想象一下,某个生产者意外地向一个他不应该发布消息的 Topic 发布了消息。这可能会造成很多问题,特别是如果我们成功地对传入的消息进行反序列化。在这种情况下,整个系统的行为可能是不确定的。
因此,如果生产者发布的消息中包含类型信息,并且消费者知道它信任的类型,那么所有这些问题都可以避免。当然,这假设生产者的消息类型与消费者期望的类型不同。但是这个假设是相当合理的,因为这个生产者本来就不应该在这个主题中发布消息。
5.2、第二个动机: 安全 {#52第二个动机-安全}
但最重要的还是安全问题。在前面的例子中,我们强调生产者无意中向 Topic 发布了信息。但这也可能是一种有意攻击。恶意生产者可能故意将信息发布到特定 Topic 中,以利用反序列化漏洞。因此,通过防止对不需要的消息进行反序列化,Spring 提供了额外的安全措施来降低安全风险。
在这里非常重要的一点是,"Trusted Packages" 功能并不能解决 "Header 伪造" 攻击。在这种情况下,攻击者篡改消息的 Header,使接收者误以为该消息是合法的并且来自受信任的源头。因此,通过提供正确的类型 Header,攻击者仍然可能欺骗 Spring,后者将继续进行消息的反序列化。但这个问题非常复杂,不是本文讨论的主题。总的来说,Spring 只是提供了额外的安全措施,以最大程度减少黑客得逞的风险。
6、总结 {#6总结}
本文介绍了Spring Kafka 的 "Trusted Packages" 特性,该功能为分布式消息系统提供了额外的一致性和安全性。
Ref:https://www.baeldung.com/spring-kafka-trusted-packages-feature