嗨,你好呀,我是猿java
Kafka的 ACK机制是确保消息成功传递和处理的重要机制.这篇文章,我们将详细分析 Kafka ACK机制,包括其原理、源码分析、使用场景以及优缺点。
ACK 方式 {#ACK-方式}
Kafka的 ACK机制主要用于确保生产者发送的消息能够被可靠地写入到 Kafka集群的 Topic中。ACK机制的核心思想是生产者发送消息后,需要等待 Kafka集群的确认(ACK),才认为消息发送成功。
Kafka的 ACK机制主要有三种级别:
acks=0
生产者不等待服务器的确认,消息发送后即认为成功,不管消息是否真正写入 Kafka,这种方式效率最高,但可靠性最低,数据可能存在丢失。
acks=1
生产者会等待来自 Leader分区的确认。Leader分区接收到消息并写入本地日志后即返回确认。这种方式在 Leader分区可用时可靠,但如果 Leader分区发生故障,可能会丢失数据。从 Kafka 2.0 开始,默认值是 acks=1
acks=all(或-1)
生产者等待所有 ISR(In-Sync Replica,同步副本)分区的确认。只有当消息被写入所有同步副本后才返回确认,这种方式最可靠,但性能较低。
ISR的工作原理 {#ISR的工作原理}
ISR,全称 In-Sync Replicas,翻译为同步副本,它是指某个分区中的一组与 Leader副本保持同步的副本,即这些副本包含了 Leader副本中的所有已确认消息。ISR是 Kafka 集群中用于保证数据可靠性的一个关键概念。
-
Leader和 Follower:在 Kafka中,每个分区都有一个 Leader和若干个 Follower,Leader负责处理所有的读写请求,而 Follower则从 Leader那里拉取数据并进行同步。
-
同步副本(ISR):ISR是一个动态的集合,包含了 Leader和所有与 Leader保持同步的 Follower,只有在 ISR中的副本才被认为是可靠的,因为它们包含了与 Leader相同的数据。
-
ACK机制与 ISR :当生产者发送消息并设置
acks=all
时,Kafka只有在消息被写入 ISR中的所有副本后才会返回确认,这确保了消息即使在 Leader故障的情况下也不会丢失,因为 ISR中的其他副本可以选举为新的 Leader。
ISR的维护 {#ISR的维护}
Kafka通过以下机制来维护ISR:
-
加入ISR:当一个 Follower副本成功地追上了 Leader副本的日志(即复制了 Leader的所有新的消息),它会被加入到 ISR中。
-
移出ISR :当一个 Follower副本落后于 Leader超过一定的时间(由参数
replica.lag.time.max.ms
控制),它会被移出 ISR。
ISR源码分析 {#ISR源码分析}
以下是 Kafka中维护ISR的关键代码片段(以 Kafka 2.x版本为例):
|------------------------------------------------------------------------------------------||
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class Partition { private Set<Replica> isr; // 当前分区的ISR集合 public void updateISR() { // 获取所有副本的状态 List<Replica> replicas = getReplicas(); // 计算新的ISR集合 Set<Replica> newIsr = new HashSet<>(); for (Replica replica : replicas) { if (replica.isInSync()) { newIsr.add(replica); } } // 更新ISR if (!newIsr.equals(this.isr)) { this.isr = newIsr; // 触发ISR变化的事件 onISRChanged(); } } } class Replica { public boolean isInSync() { // 判断该副本是否与Leader同步 return this.logEndOffset >= leaderLogEndOffset - replicaLagMaxMessages; } }
|
源码分析 {#源码分析}
以 Kafka的 Producer端代码为例,下面是简化后的发送消息时处理ACK机制的关键代码片段:
|------------------------------------------------------------||
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // 构建请求 ProduceRequest request = new ProduceRequest(record, callback); // 发送请求 Future<RecordMetadata> future = this.sender.send(request); // 根据ACK配置处理确认 if (this.acks == 0) { // 不等待确认,直接返回成功 callback.onCompletion(null, null); } else if (this.acks == 1) { // 等待Leader确认 RecordMetadata metadata = future.get(); callback.onCompletion(metadata, null); } else if (this.acks == -1 || this.acks == "all") { // 等待所有ISR确认 RecordMetadata metadata = future.get(); callback.onCompletion(metadata, null); } return future; }
|
优缺点 {#优缺点}
acks=0
- 优点:性能最高,延迟最低。
- 缺点:消息可能丢失,可靠性最低。
acks=1
- 优点:在性能和可靠性之间取得平衡。
- 缺点:如果领导者在消息写入后但未同步给副本前崩溃,消息可能丢失。
acks=all
- 优点:最高的可靠性,确保消息被所有同步副本确认。
- 缺点:性能较低,延迟较高。
缺点 {#缺点}
- 性能影响:更高的ACK级别会带来更高的延迟,降低吞吐量。
- 复杂性:需要根据具体应用场景选择合适的ACK配置,增加了系统设计的复杂性。
适用场景 {#适用场景}
- acks=0:适用于对消息丢失不敏感且追求高吞吐量的场景,例如日志收集、监控数据等。
- acks=1:适用于对消息有一定可靠性要求,但对性能要求较高的场景,例如实时数据处理。
- acks=all:适用于对消息可靠性要求极高且可以接受较低吞吐量的场景,例如金融交易、订单处理等。
总结 {#总结}
本文我们分析了 Kafka的 ACK机制以及 ISR机制,从全局来看, Kafka 和 RocketMQ有着异曲同工之妙,Kafka的 ack=all
对应 RocketMQ的同步发送,ack=1
对应 RocketMQ的异步发送,ack=0
对应 RocketMQ的单向发送。
总体来说,Kafka的 ACK机制为消息的可靠传递提供了不同级别的保障,开发者可以根据具体的应用需求选择合适的 ACK配置,以在性能和可靠性之间取得平衡。
交流学习 {#交流学习}
最后,把猿哥的座右铭送给你:投资自己才是最大的财富。 如果你觉得文章有帮助,请帮忙转发给更多的好友,或关注公众号:猿java,持续输出硬核文章。