51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

RocketMQ 是如何刷盘的?

你好,我是猿java。

RocketMQ 如何保证发送消息不丢失? 这篇文章中提到了刷盘,这篇文章,我们将详细介绍 RocketMQ 的刷盘机制,包括它写了哪些文件,如何写入磁盘,以及相关的源码分析和示例代码。

本文源码基于 RocketMQ 5.0

RocketMQ 刷盘原理 {#RocketMQ-刷盘原理}

RocketMQ 的刷盘流程主要涉及以下几类文件:

  1. CommitLog 文件:存储所有消息,支持顺序写入和随机读取。
  2. ConsumeQueue 文件:存储消息的逻辑索引,加速消息消费。
  3. Index 文件:存储消息的索引信息,支持根据 Key 或时间区间快速查找消息。

CommitLog 文件 {#CommitLog-文件}

CommitLog文件是 RocketMQ 存储消息的核心文件,所有的消息首先被顺序写入到CommitLog文件中。

消息写入CommitLog文件的过程如下:

  • 消息到达 Broker 后,通过 DefaultMessageStore 类的 putMessage 方法写入。
  • putMessage 方法调用CommitLog类的 putMessage 方法。
  • CommitLog将消息写入到内存映射文件(MappedFile)中。
  • 根据刷盘策略(同步或异步),将数据刷入磁盘。

简化的写入代码如下:

|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 省略部分代码... // 写入消息到 MappedByteBuffer AppendMessageResult appendMessageResult = mappedFile.appendMessage(msg, this.appendMessageCallback); // 触发刷盘 handleDiskFlush(appendMessageResult, putMessageResult, msg); return putMessageResult; } |

读取消息时,通过消息的物理偏移量(offset)从 CommitLog文件中读取,偏移量信息通常存储在ConsumeQueue文件中。

ConsumeQueue 文件 {#ConsumeQueue-文件}

因为CommitLog文件包括当前 Broker 所有 Topic的信息,因此,为了消费者能够快速的定位到某个具体 Topic的信息,需要把CommitLog文件中的消息分别发送到每个 Topic,因此,RocketMQ 使用了一种叫做 ConsumeQueue的文件。

ConsumeQueue文件是消息的逻辑索引文件,该文件的每个条目(Entry)具有固定的大小,每个条目占用 20 字节。具体结构如下:

  • CommitLog Offset (8 bytes):消息在 CommitLog 文件中的物理偏移量。
  • Message Size (4 bytes):消息的大小。
  • Tag Hashcode (8 bytes):消息 Tag 的哈希值,用于消息过滤。

当消息写入CommitLog文件后,RocketMQ 会将消息的偏移量信息写入到相应的ConsumeQueue文件中,每个 Topic 的每个队列都有一个对应的ConsumeQueue文件。

简化的写入代码如下:

|---------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 | public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long cqOffset) { // 省略部分代码... // 写入 ConsumeQueue this.mappedFile.appendMessage(positionInfo, this.appendMessageCallback); // 触发刷盘 handleDiskFlush(); } |

消费者在消费消息时,会首先从ConsumeQueue文件中读取消息的物理偏移量,然后根据偏移量从CommitLog文件中读取消息内容。

Index 文件 {#Index-文件}

Index文件是消息的索引文件,用于根据消息的 Key 或时间区间快速查找消息。它类似于数据库中的索引。

当消息写入CommitLog文件后,RocketMQ 会根据消息的 Key 生成索引,并将索引信息写入到Index文件中。

简化的写入代码如下:

|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 | public void putKey(String key, long offset, long storeTimestamp) { // 计算 Key 的哈希值 int keyHash = key.hashCode(); // 写入索引信息 this.mappedFile.appendMessage(new IndexEntry(keyHash, offset, storeTimestamp)); } |

根据消息的 Key 或时间区间查找消息时,RocketMQ 会首先从Index文件中查找对应的偏移量,然后根据偏移量从CommitLog文件中读取消息内容。

img

刷盘方式 {#刷盘方式}

RocketMQ 的刷盘机制主要分为同步刷盘和异步刷盘两种方式:

  1. 同步刷盘:消息写入 CommitLog 文件后,立即将数据刷入磁盘,然后返回写入成功的响应给生产者。同步刷盘的可靠性高,但性能相对较低。
  2. 异步刷盘:消息写入 CommitLog 文件后,立即返回写入成功的响应给生产者,后台线程负责将数据批量刷入磁盘。异步刷盘的性能高,但可靠性相对较低。

源码分析 {#源码分析}

这个部分,我们将详细的分析 RocketMQ 关于刷盘机制的核心源码。

CommitLog 文件刷盘 {#CommitLog-文件刷盘}

CommitLog 文件的刷盘逻辑主要在 CommitLog.java 类中,以下是简化的代码片段:

|---------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | public class CommitLog { // 刷盘服务类 private final FlushCommitLogService flushCommitLogService; public CommitLog() { this.flushCommitLogService = new FlushRealTimeService(); } // 消息写入方法 public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 省略部分代码... // 写入消息到 MappedByteBuffer appendMessageResult = mappedFile.appendMessage(msg, this.appendMessageCallback); // 触发刷盘 handleDiskFlush(appendMessageResult, putMessageResult, msg); return putMessageResult; } // 处理刷盘逻辑 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.future().get(); if (!flushOK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } // 异步刷盘 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.flushCommitLogService.wakeup(); } else { this.commitLogService.wakeup(); } } } } |

代码解释:

  • 消息写入:当消息到达 CommitLog 时,首先调用 putMessage 方法,这里的 mappedFile.appendMessage 方法将消息写入到内存映射文件(MappedByteBuffer)中。
  • 触发刷盘:接下来调用 handleDiskFlush 方法触发刷盘操作,
  • 刷盘:如果配置为同步刷盘(FlushDiskType.SYNC_FLUSH),则会创建一个 GroupCommitRequest 请求,并将其提交到 GroupCommitService;如果配置为异步刷盘(FlushDiskType.ASYNC_FLUSH),则会唤醒 FlushRealTimeService 或 CommitLogService 执行刷盘操作。

FlushCommitLogService 类 {#FlushCommitLogService-类}

FlushCommitLogService 是一个抽象类,RocketMQ 提供了 FlushRealTimeServiceGroupCommitService 两个具体实现。

  • FlushRealTimeService:用于异步刷盘。
  • GroupCommitService:用于同步刷盘。

以下是 FlushRealTimeService 的简化代码:

|------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | class FlushRealTimeService extends FlushCommitLogService { @Override public void run() { while (!this.isStopped()) { this.waitForRunning(10); this.doFlush(); } } private void doFlush() { // 省略部分代码... mappedFileQueue.flush(0); } } |

FlushRealTimeService 是 RocketMQ 中用于异步刷盘的核心类,它通过后台线程定时将内存中的数据刷入磁盘,以保证数据的持久化。其主要逻辑包括:

  • 定时检查是否需要刷盘;
  • 根据配置参数执行刷盘操作;
  • 更新刷盘时间戳;
  • 等待下一个刷盘周期;

异步刷盘的触发条件 {#异步刷盘的触发条件}

异步刷盘主要在以下几种情况下被触发:

  • 消息写入到 CommitLog 后:当一条消息被写入到 CommitLog 文件后,如果配置为异步刷盘,RocketMQ 会唤醒异步刷盘服务,让其尽快将数据刷入磁盘。

  • 定时触发:RocketMQ 的异步刷盘服务通常会在一个固定的时间间隔内被定时触发,以确保数据在一定的时间内被刷入磁盘。

  • 达到某个阈值:当内存中的数据量达到某个阈值时,也会触发异步刷盘。这种阈值通常是通过配置来控制的,例如内存缓冲区的大小。

总结 {#总结}

RocketMQ 的刷盘机制通过同步刷盘和异步刷盘两种方式,确保消息在高性能和高可靠性之间找到平衡。理解其刷盘机制及源码实现,对于优化和调试RocketMQ系统具有重要意义。因此,还是建议有时间读读 RocketMQ的源码。

赞(5)
未经允许不得转载:工具盒子 » RocketMQ 是如何刷盘的?