1、简介 {#1简介}
本文将带你了解在 Spring Boot 中如何配置 SSL 认证以连接到 Apache Kafka Broker。
安全套接字层(SSL)实际上已被弃用,自 2015 年起被传输层安全(TLS)所取代。不过,由于历史原因,Kafka(和 Java)仍然使用 "SSL"。
2、SSL 概览 {#2ssl-概览}
默认情况下,Apache Kafka 以明文形式发送所有数据,且不进行任何身份认证。
首先,可以为 Broker 和客户端之间的加密配置 SSL。默认情况下,这需要使用公钥加密进行单向身份认证,由客户端验证服务器证书。
此外,服务器还可以使用单独的机制(如 SSL 或 SASL)对客户端进行身份认证,从而实现双向身份认证或相互 TLS(mTLS)。基本上,双向 SSL 认证确保客户端和服务器都使用 SSL 证书来认证对方的身份,并在双向上相互信任。
在本文中,Broker 使用 SSL 对客户端进行身份验证,使用 Keystore 和 Truststore 保存证书和密钥。
每个 Broker 都需要自己的 Keystore,其中包含私钥和公共证书。客户端使用其 Truststore 来验证该证书并信任服务器。同样,每个客户端也需要自己的 Keystore,其中包含私钥和公共证书。服务器使用其 Truststore 来验证和信任客户端的证书,并建立安全连接。
Truststore 可以包含一个可以签署证书的证书颁发机构(CA)。在这种情况下,Broker 或客户端会信任由 Truststore 中的 CA 签发的任何证书。这就简化了证书验证,因为添加新客户端或 Broker 无需更改 Truststore。
3、依赖和设置 {#3依赖和设置}
创建一个简单的 Spring Boot 示例应用,在 pom.xml
中添加 spring-kafka
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
使用 Docker Compose 文件来配置和测试 Kafka 服务器设置。
首先,不配置任何 SSL:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
启动容器:
docker-compose up
这会以默认配置拉起 Broker。
4、Broker 配置 {#4broker-配置}
来看看建立安全连接所需的最低配置。
4.1、单机 Broker {#41单机-broker}
本例使用单机 Broker 来介绍启用 SSL 身份认证所需的最低配置。
首先,需要在 server.properties
中配置 Broker 通过 9093 端口监听 SSL 连接:
listeners=PLAINTEXT://kafka1:9092,SSL://kafka1:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
接着,使用 keystore
和 truststore
相关属性配置证书和凭证。
ssl.keystore.location=/certs/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.truststore.location=/certs/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.key.password=password
最后,配置 Broker 对客户端进行身份认证,从而实现双向身份认证:
ssl.client.auth=required
4.2、Docker Compose {#42docker-compose}
由于我们使用 Docker Compose 来管理 Broker,现将上述所有属性添加到 docker-compose.yml
文件的 environment
中:
kafka:
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper
ports:
- 9092:9092
- 9093:9093
environment:
...
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,SSL://localhost:9093
KAFKA_SSL_CLIENT_AUTH: 'required'
KAFKA_SSL_KEYSTORE_FILENAME: '/certs/kafka.server.keystore.jks'
KAFKA_SSL_KEYSTORE_CREDENTIALS: '/certs/kafka_keystore_credentials'
KAFKA_SSL_KEY_CREDENTIALS: '/certs/kafka_sslkey_credentials'
KAFKA_SSL_TRUSTSTORE_FILENAME: '/certs/kafka.server.truststore.jks'
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: '/certs/kafka_truststore_credentials'
volumes:
- ./certs/:/etc/kafka/secrets/certs
如上,在 ports
配置公开了 SSL 端口(9093)。此外,还在 volumes
配置挂载了 certs
项目文件夹。其中包含所需的证书和相关凭证。
现在,使用 Compose 重新启动应用。可以在 Broker 日志中看到 SSL 相关的信息:
...
kafka_1 | uid=1000(appuser) gid=1000(appuser) groups=1000(appuser)
kafka_1 | ===> Configuring ...
<strong>kafka_1 | SSL is enabled.</strong>
....
kafka_1 | [2021-08-20 22:45:10,772] INFO KafkaConfig values:
<strong>kafka_1 | advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
kafka_1 | ssl.client.auth = required</strong>
<strong>kafka_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]</strong>
kafka_1 | ssl.endpoint.identification.algorithm = https
kafka_1 | ssl.key.password = [hidden]
kafka_1 | ssl.keymanager.algorithm = SunX509
<strong>kafka_1 | ssl.keystore.location = /etc/kafka/secrets/certs/kafka.server.keystore.jks</strong>
kafka_1 | ssl.keystore.password = [hidden]
kafka_1 | ssl.keystore.type = JKS
kafka_1 | ssl.principal.mapping.rules = DEFAULT
<strong>kafka_1 | ssl.protocol = TLSv1.3</strong>
kafka_1 | ssl.trustmanager.algorithm = PKIX
kafka_1 | ssl.truststore.certificates = null
<strong>kafka_1 | ssl.truststore.location = /etc/kafka/secrets/certs/kafka.server.truststore.jks</strong>
kafka_1 | ssl.truststore.password = [hidden]
kafka_1 | ssl.truststore.type = JKS
....
5、Spring Boot 客户端 {#5spring-boot-客户端}
服务器设置完成后,现在来创建和 Broker 进行 SSL 双向认证所需的 Spring Boot 组件。
5.1、Producer {#51producer}
首先,使用 KafkaTemplate
向指定的 Topic 发送一条消息:
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message, String topic) {
log.info("Producing message: {}", message);
kafkaTemplate.send(topic, "key", message)
.addCallback(
result -> log.info("Message sent to topic: {}", message),
ex -> log.error("Failed to send message", ex)
);
}
}
send
方法是一个异步操作。因此,添加了一个简单的回调,一旦 Broker 收到消息,它就会输出日志。
5.2、Consumer {#52consumer}
接着,使用 @KafkaListener
创建一个简单的消费者。它会连接到 Broker,并从和生产者相同的 Topic 中消费消息:
public class KafkaConsumer {
public static final String TOPIC = "test-topic";
public final List<String> messages = new ArrayList<>();
@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, String> consumerRecord) {
log.info("Received payload: '{}'", consumerRecord.toString());
messages.add(consumerRecord.value());
}
}
在本例中,为了简单,消费者只需将消息存储在 List
中。在实际系统中,消费者会接收消息,并根据应用的业务逻辑对其进行处理。
5.3、配置 {#53配置}
最后,在 application.yml
中添加必要的配置:
spring:
kafka:
security:
protocol: "SSL"
bootstrap-servers: localhost:9093
ssl:
trust-store-location: classpath:/client-certs/kafka.client.truststore.jks
trust-store-password: <password>
key-store-location: classpath:/client-certs/kafka.client.keystore.jks
key-store-password: <password>
# producer/consumer 的额外配置
如上,通过 Spring Boot 提供的属性配置了生产者和消费者。由于这两个组件都连接到同一个 Broker,可以将所有必要的属性声明在 spring.kafka
下。然而,如果生产者和消费者连接到不同的 Broker,则需要分别在 spring.kafka.producer
和 spring.kafka.consumer
下指定这些属性。
在配置的 ssl
部分,指定了 JKS Truststore,以验证 Kafka Broker。其中包含签署了 Broker 证书的 CA 的证书。此外,还提供了 Spring 客户端 Keystore 的路径,其中包含由 CA 签发的证书,该证书应存在于 Broker 端的 Truststore 中。
5.4、测试 {#54测试}
由于使用的是 Compose,因此可以使用 Testcontainers 框架为生产者和消费者创建一个端到端的测试:
@ActiveProfiles("ssl")
@Testcontainers
@SpringBootTest(classes = KafkaSslApplication.class)
class KafkaSslApplicationLiveTest {
private static final String KAFKA_SERVICE = "kafka";
private static final int SSL_PORT = 9093;
@Container
public DockerComposeContainer<?> container =
new DockerComposeContainer<>(KAFKA_COMPOSE_FILE)
.withExposedService(KAFKA_SERVICE, SSL_PORT, Wait.forListeningPort());
@Autowired
private KafkaProducer kafkaProducer;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
void givenSslIsConfigured_whenProducerSendsMessageOverSsl_thenConsumerReceivesOverSsl() {
String message = generateSampleMessage();
kafkaProducer.sendMessage(message, TOPIC);
await().atMost(Duration.ofMinutes(2))
.untilAsserted(() -&gt; assertThat(kafkaConsumer.messages).containsExactly(message));
}
private static String generateSampleMessage() {
return UUID.randomUUID().toString();
}
}
运行测试时,Testcontainers 会使用 Compose 文件(包括 SSL 配置)启动 Kafka Broker。应用也会启动并使用其 SSL 配置,通过加密和身份认证的连接与 Broker 建立连接。由于这是一连串异步事件,所以使用 await
轮询消费者消息存储中的预期消息。以此来验证所有配置以及 Broker 和客户端之间的双向身份认证是否成功。
6、总结 {#6总结}
本文介绍了 Kafka Broker 如何配置 SSL 双向认证,以及如何在 Spring Boot 中配置 SSL 双向认证连接到 Kafka Borker 进行生产和消费。
Ref:https://www.baeldung.com/spring-boot-kafka-ssl