你好,我是猿java。
在 RocketMQ 如何保证发送消息不丢失? 这篇文章中提到了刷盘,这篇文章,我们将详细介绍 RocketMQ 的刷盘机制,包括它写了哪些文件,如何写入磁盘,以及相关的源码分析和示例代码。
本文源码基于 RocketMQ 5.0
RocketMQ 刷盘原理 {#RocketMQ-刷盘原理}
RocketMQ 的刷盘流程主要涉及以下几类文件:
- CommitLog 文件:存储所有消息,支持顺序写入和随机读取。
- ConsumeQueue 文件:存储消息的逻辑索引,加速消息消费。
- Index 文件:存储消息的索引信息,支持根据 Key 或时间区间快速查找消息。
CommitLog 文件 {#CommitLog-文件}
CommitLog
文件是 RocketMQ 存储消息的核心文件,所有的消息首先被顺序写入到CommitLog
文件中。
消息写入CommitLog
文件的过程如下:
- 消息到达 Broker 后,通过 DefaultMessageStore 类的 putMessage 方法写入。
- putMessage 方法调用
CommitLog
类的 putMessage 方法。 CommitLog
将消息写入到内存映射文件(MappedFile)中。- 根据刷盘策略(同步或异步),将数据刷入磁盘。
简化的写入代码如下:
|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8
| public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 省略部分代码... // 写入消息到 MappedByteBuffer AppendMessageResult appendMessageResult = mappedFile.appendMessage(msg, this.appendMessageCallback); // 触发刷盘 handleDiskFlush(appendMessageResult, putMessageResult, msg); return putMessageResult; }
|
读取消息时,通过消息的物理偏移量(offset)从 CommitLog
文件中读取,偏移量信息通常存储在ConsumeQueue
文件中。
ConsumeQueue 文件 {#ConsumeQueue-文件}
因为CommitLog
文件包括当前 Broker 所有 Topic的信息,因此,为了消费者能够快速的定位到某个具体 Topic的信息,需要把CommitLog
文件中的消息分别发送到每个 Topic,因此,RocketMQ 使用了一种叫做 ConsumeQueue
的文件。
ConsumeQueue
文件是消息的逻辑索引文件,该文件的每个条目(Entry)具有固定的大小,每个条目占用 20 字节。具体结构如下:
- CommitLog Offset (8 bytes):消息在 CommitLog 文件中的物理偏移量。
- Message Size (4 bytes):消息的大小。
- Tag Hashcode (8 bytes):消息 Tag 的哈希值,用于消息过滤。
当消息写入CommitLog
文件后,RocketMQ 会将消息的偏移量信息写入到相应的ConsumeQueue
文件中,每个 Topic 的每个队列都有一个对应的ConsumeQueue
文件。
简化的写入代码如下:
|---------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9
| public void putMessagePositionInfoWrapper(long offset, int size, long tagsCode, long cqOffset) { // 省略部分代码... // 写入 ConsumeQueue this.mappedFile.appendMessage(positionInfo, this.appendMessageCallback); // 触发刷盘 handleDiskFlush(); }
|
消费者在消费消息时,会首先从ConsumeQueue
文件中读取消息的物理偏移量,然后根据偏移量从CommitLog
文件中读取消息内容。
Index 文件 {#Index-文件}
Index
文件是消息的索引文件,用于根据消息的 Key 或时间区间快速查找消息。它类似于数据库中的索引。
当消息写入CommitLog
文件后,RocketMQ 会根据消息的 Key 生成索引,并将索引信息写入到Index
文件中。
简化的写入代码如下:
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7
| public void putKey(String key, long offset, long storeTimestamp) { // 计算 Key 的哈希值 int keyHash = key.hashCode(); // 写入索引信息 this.mappedFile.appendMessage(new IndexEntry(keyHash, offset, storeTimestamp)); }
|
根据消息的 Key 或时间区间查找消息时,RocketMQ 会首先从Index
文件中查找对应的偏移量,然后根据偏移量从CommitLog
文件中读取消息内容。
刷盘方式 {#刷盘方式}
RocketMQ 的刷盘机制主要分为同步刷盘和异步刷盘两种方式:
- 同步刷盘:消息写入 CommitLog 文件后,立即将数据刷入磁盘,然后返回写入成功的响应给生产者。同步刷盘的可靠性高,但性能相对较低。
- 异步刷盘:消息写入 CommitLog 文件后,立即返回写入成功的响应给生产者,后台线程负责将数据批量刷入磁盘。异步刷盘的性能高,但可靠性相对较低。
源码分析 {#源码分析}
这个部分,我们将详细的分析 RocketMQ 关于刷盘机制的核心源码。
CommitLog 文件刷盘 {#CommitLog-文件刷盘}
CommitLog 文件的刷盘逻辑主要在 CommitLog.java
类中,以下是简化的代码片段:
|---------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class CommitLog { // 刷盘服务类 private final FlushCommitLogService flushCommitLogService; public CommitLog() { this.flushCommitLogService = new FlushRealTimeService(); } // 消息写入方法 public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 省略部分代码... // 写入消息到 MappedByteBuffer appendMessageResult = mappedFile.appendMessage(msg, this.appendMessageCallback); // 触发刷盘 handleDiskFlush(appendMessageResult, putMessageResult, msg); return putMessageResult; } // 处理刷盘逻辑 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.future().get(); if (!flushOK) { putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } // 异步刷盘 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.flushCommitLogService.wakeup(); } else { this.commitLogService.wakeup(); } } } }
|
代码解释:
- 消息写入:当消息到达 CommitLog 时,首先调用 putMessage 方法,这里的 mappedFile.appendMessage 方法将消息写入到内存映射文件(MappedByteBuffer)中。
- 触发刷盘:接下来调用 handleDiskFlush 方法触发刷盘操作,
- 刷盘:如果配置为同步刷盘(FlushDiskType.SYNC_FLUSH),则会创建一个 GroupCommitRequest 请求,并将其提交到 GroupCommitService;如果配置为异步刷盘(FlushDiskType.ASYNC_FLUSH),则会唤醒 FlushRealTimeService 或 CommitLogService 执行刷盘操作。
FlushCommitLogService 类 {#FlushCommitLogService-类}
FlushCommitLogService
是一个抽象类,RocketMQ 提供了 FlushRealTimeService
和 GroupCommitService
两个具体实现。
- FlushRealTimeService:用于异步刷盘。
- GroupCommitService:用于同步刷盘。
以下是 FlushRealTimeService
的简化代码:
|------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class FlushRealTimeService extends FlushCommitLogService { @Override public void run() { while (!this.isStopped()) { this.waitForRunning(10); this.doFlush(); } } private void doFlush() { // 省略部分代码... mappedFileQueue.flush(0); } }
|
FlushRealTimeService
是 RocketMQ 中用于异步刷盘的核心类,它通过后台线程定时将内存中的数据刷入磁盘,以保证数据的持久化。其主要逻辑包括:
- 定时检查是否需要刷盘;
- 根据配置参数执行刷盘操作;
- 更新刷盘时间戳;
- 等待下一个刷盘周期;
异步刷盘的触发条件 {#异步刷盘的触发条件}
异步刷盘主要在以下几种情况下被触发:
-
消息写入到 CommitLog 后:当一条消息被写入到 CommitLog 文件后,如果配置为异步刷盘,RocketMQ 会唤醒异步刷盘服务,让其尽快将数据刷入磁盘。
-
定时触发:RocketMQ 的异步刷盘服务通常会在一个固定的时间间隔内被定时触发,以确保数据在一定的时间内被刷入磁盘。
-
达到某个阈值:当内存中的数据量达到某个阈值时,也会触发异步刷盘。这种阈值通常是通过配置来控制的,例如内存缓冲区的大小。
总结 {#总结}
RocketMQ 的刷盘机制通过同步刷盘和异步刷盘两种方式,确保消息在高性能和高可靠性之间找到平衡。理解其刷盘机制及源码实现,对于优化和调试RocketMQ系统具有重要意义。因此,还是建议有时间读读 RocketMQ的源码。