网站链接: element-ui dtcms
当前位置: 首页 > 技术博文  > 技术博文

顺藤摸瓜RocketMQ之刷盘机制debug解析

2021/6/3 20:04:12 人评论

文章目录Rocketmq 刷盘机制三个文件indexFileconsumeQueuecommitlog异步刷盘consumerqueue和indexfile文件是什么时候更新的同步刷盘Rocketmq 刷盘机制 笔者这里分析的是4.8版本,这里发现以前的handleflushdisk 以及handleha这两个方法在4.8中已经不会使用到了&…

文章目录

    • Rocketmq 刷盘机制
      • 三个文件
        • indexFile
        • consumeQueue
        • commitlog
      • 异步刷盘
      • consumerqueue和indexfile文件是什么时候更新的
      • 同步刷盘

Rocketmq 刷盘机制

笔者这里分析的是4.8版本,这里发现以前的handleflushdisk 以及handleha这两个方法在4.8中已经不会使用到了,所以这里特地对源码进行深入剖析了,读者可以直接跳到异步刷盘流程阅读。

三个文件

在rocketmq里面存在这样三个文件

  • indexfile
  • consumequeue
  • commitlog

其中indexfile和consumequeue可以理解为索引文件,
commitlog才是真正的存放消息的文件

因为rocketmq要保证性能,发送消息落盘的速度,所以,在落盘上选择了顺序写,这样消息在文件上的顺序就是乱序的,所以就需要维护一个indexfile索引文件,以及消费队列的一个索引

这里先介绍三个文件的api,然后接着分析刷盘流程:所以读者可以先看下面的刷盘流程,然后在回头看三个文件的实现原理,文章篇幅较大

indexFile

indexfile文件存储在store目录下的index文件里面,里面存放的是消息的hashcode和index内容,
在这里插入图片描述

我们来看看indexfile里面都存储的什么东西:

文件由一个文件头组成:长40字节
500w个hashslot,每个4字节
2000w个index条目,每个20字节

所以这里我们可以估算每个indexfile的大小为:40+500w4+2000w20个字节,大约400M左右
在这里插入图片描述

indexfileheader由如下组成:

在这里插入图片描述

那消息的索引是怎么put到这个文件里面的呢?以及是如何获取的呢?

  • 这边我们启动一下环境,然后发送一下消息,就可以进入到该方法

来到putkey方法,this.indexHeader.getIndexCount()是indeshead里面最后4个字节(上图),指index的数量,如果数量小于200w说明没有超过最大的。

然后计算出hash,
slotPos是要存放的hash槽的位置,
absSlotPos是要存放hash槽的具体偏移量(绝对位置)
在这里插入图片描述

接着获取到对应absSlotPos绝对位置的int值,
timeDiff是当前时间与BeginTimestamp的时间差,所以这里4个字节肯定够了

在这里插入图片描述
计算出index的绝对位置,也就是获取index最大,然后顺序往后写
在这里插入图片描述

然后就开始往mappedByteBuffer里面写数据了,主要用到了putInt,putLong方法,写完之后,更新indexHead的信息,

可以看到this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());这段是将index的位置存在了hash槽里面,也就是说,查找一个索引的话是先找到hash槽里面的index位置,然后计算出index的具体偏移量来查找的

如果没有hash冲突,将hashslot++
将indexcount++

这边笔者提出一个问题?为什么要存timeDIff时间?存时间为什么不存绝对时间?

在这里插入图片描述
indexfile怎么解决hash冲突?

如下两段代码:

如果拿到的slotvalue不是0的话,说明存在hash冲突,
然后接下来的this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue)会将该值存入到index里面,也就是index的最后四个字节,也就是说,如果我们拿到index的值的话,发现index的最后四位不是0,那么就继续取出来,也就是说,这里index中相同hash是相连的,所以每个index就有存储hash的必要了,如果hash冲突了,根据最后四位拿出来,还是需要比对一下hash的。
在这里插入图片描述
在这里插入图片描述

接着我们看,是如何从indexfile中查找到消息的offset的?

这边启动执行org.apache.rocketmq.test.client.producer.querymsg.QueryMsgByKeyIT#testQueryMsg这个测试方法,就可以进入debug

来到org.apache.rocketmq.store.index.IndexFile#selectPhyOffset方法:

先将mappedfile锁住,然后根据key计算hash,
拿到slotpos,
计算出slot绝对位置offset
在这里插入图片描述

这边贴出整个方法,以及执行到最后每个变量的值:在右边一目了然

L214:将nextIndexToRead赋值,nextIndexToRead是下一次读取的index的位置
L215:校验
L219:计算出index的绝对位置absIndexPos
L223-L227:拿到index中的值
L239:比较hash值,然后放入phyOffsets中
L242:判断了如果prevIndexRead值合法且不是当前位置,就说明发生了hash冲突,将nextIndexToRead赋值,接着继续循环

所以这个方法执行完成之后phyOffsets会存放所以hash值和预期相同的一些消息地址

所以indexfile是如何解决hash冲突的?这个问题也就迎刃而解

在这里插入图片描述
以上是indexfile的常用连个api

consumeQueue

consumequeue存放在store文件里面,里面的consumequeue文件里面按照topic排放,然后每个topic默认4个队列,里面存放的consumequeue文件

在这里插入图片描述

consumequeue文件存储的单元如下,固定20个字节,单个consumequeue文件默认包含30w个条目,所以单个文件大小大概6M左右,
在这里插入图片描述

这边我们发送消息进入到org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper 的debug:

进行了一些简单的判断之后来到:org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
在这里插入图片描述

将消息的信息put到byteBufferIndex里面,

先是8个字节的offset,然后4个字节的size,最后8个字节tag,当然这里的最后8个字节也有可能存放延迟消息的执行时间戳这里不展开

在这里插入图片描述

而执行到最后就是将byteBufferIndex追加到该队列里面,也就是追加到consumequeue里面

在这里插入图片描述

appendMessage实现逻辑:

在这里插入图片描述

commitlog

commitlog文件存储再store目录下的commitlog目录下:
每个默认1G大小,
在这里插入图片描述

如下是commitlog的中每个msg的形状,这是计算msg的长度代码里面的一段可以瞥见,可以很明白的看到了一个msg的结构是什么样的想要具体点可以看看丁威老师的博客里面的这里我把图片抄过来:
在这里插入图片描述

对比上面和下面的图片就很清晰的看到每个消息长什么样:
在这里插入图片描述

接下来我们分析一下org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)这个核心方法,

同样我们是发送一个消息来进入到debug:

先是获取到wroteOffset也就是文件初始位置加上bytebuffer写入位置,这里初始位置是第一个文件所以是0,
然后判断系统ip是不是ipv6地址,决定长度
然后调用createMessageId生成msgid,这里不展开,有兴趣的读者可以点进方法看下,

在这里插入图片描述

接着计算一些数据的长度:
将topic转化为byte数组,供存储
计算msg长度
校验msg的长度是否比最大长度长,以及最后commitlog最后需要8个字节的留白

在这里插入图片描述

接着将各个数据都放到bytebuffer里面,可以看到msg的格式和上面图片是一样的,

在这里插入图片描述

然后,就直接new了一个result里面放的是PUT_OK返回出去了,这里可以看到这里把msg放到bytebuffer里面就直接返回了,也就是说,只是放到内存映射里面,然后下面刷写到磁盘上,就又broker自己去操作了,那么同步刷盘是怎么实现呢?

  • 同步刷盘,在调用完这个方法之后会同步调用一下force方法

在这里插入图片描述

异步刷盘

Rocketmq的存储是与读写是机遇NIIO的内存映射机制的,就如上面分析的一样,数据是先都put到一个bytebuffer里面,然后根据配置的刷盘策略在不同时间进行刷盘的,

运行一下namesrv和broker,然后发送消息:进入到debug

消息发送之后在之前分析的一个顺藤摸瓜RocketMQ之消息发送debug解析,里面提到了往remoting模块发送了一个sendmsg的一个code的request,而这个code对应的processer在broker里面是SendMessageProcessor这个类,我们跟进这个类:

因为这边默认的是一步刷盘,所以这里走的是asyncProcessRequest这个方法,
在这里插入图片描述

跟进去判断了一下是否是批量发送消息,然后调用了asyncSendMessage方法:

在这里插入图片描述

继续跟进去发现这边校验了一下code,然后将body序列化了,拿到queueid,设置了一些topic,queue信息到msginner里面,这个msginner对象就是消息对象,最终是要store里面存储的。

在这里插入图片描述

记下来设置了一些时间信息,以及主机信息,判断了是否是事务消息,然后调用了this.brokerController.getMessageStore().asyncPutMessage(msgInner),也就是拿到MessageStore对象然后用该对象里面的commitlog对象来进行存储落盘

在这里插入图片描述

继续跟进,发现msg的一些信息都已经设置好了,接着调用commitlog进行落盘,而

在这里插入图片描述
来到org.apache.rocketmq.store.CommitLog#asyncPutMessage这个方法:

设置了存储时间和crc,对事务进行了判断

在这里插入图片描述

接着拿到锁,有设置了存储时间,最后调用了mappedFile.appendMessage方法,

在这里插入图片描述

一直来到这个方法org.apache.rocketmq.store.MappedFile#appendMessagesInner

在该方法里面校验了写位置是否大于最大长度

然后调用了cb.doAppend,而这个方法我们在上面说过的org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)这个方法,然后进行返回,

到这里我们还没有发现其他两个文件是什么时候更新的呢?comsumequeue和indexfile

在这里插入图片描述

接下来就将两个异步任务给返回出去:

一个刷盘任务,一个ha任务

在这里插入图片描述

因为是异步刷盘,所以这里的submitFlushRequest方法里面走下面这个分支:

直接唤醒刷盘线程,然后返回一个completed的future,并且里面的状态是put_ok,然后就回到我们一开始的发送成功的页面了。

在这里插入图片描述

consumerqueue和indexfile文件是什么时候更新的

在我们的broker启动的时候在这个方法里面org.apache.rocketmq.broker.BrokerController#start
在这里插入图片描述

跟进到方法里面,启动了this.reputMessageService.start();这个线程,

在这里插入图片描述

org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run我们直接来到这个线程的run方法:

一个死循环在执行doreput方法:

在这里插入图片描述

在方法里面:执行了DefaultMessageStore.this.doDispatch(dispatchRequest)这个方法 ,继续更近
在这里插入图片描述

是从一个集合里面拿到了CommitLogDispatcher的实现类,然后挨个执行一下:

在这里插入图片描述

然后我们发现:
他的实现类就包含一个构建consumerqueue的和一个构建index的,那我们继续发送消息进入到debug:

在这里插入图片描述

果然这里面存放的三个dispatcher就是其中两个就是我们需要处理的文件的实现类:

在这里插入图片描述

我们来到consumerqueue实现类,发现这里执行了DefaultMessageStore.this.putMessagePositionInfo(request),而再跟进方法里面发现就是调用了putMessagePositionInfoWrapper来将consumerqueue文件写入到磁盘

在这里插入图片描述

同样我们debug到另一个index的实现类

在这里插入图片描述

跟进方法:

执行了putkey方法

在这里插入图片描述

而这个方法正是调用的是idexfile中的putkey方法:这个我们上面已经分析过:

在这里插入图片描述

所以在broker启动的时候就是会执行一个死循环的线程来将consumerqueue和indexfile进行刷盘。

同步刷盘

同步刷盘本质上是对异步的一种等待:

上面异步刷盘提到了submitFlushRequest这个方法,
可以看到在这个方法里面构建了一个groupcommitrequeest,提交给GroupCommitService执行:

然后在GroupCommitService里面是有个轮训看判断该request中的offset是否比commitlog的最大值小,如果小就将这个任务完成。

这里的思想和consumer的消费逻辑类似,都是发送一个request给service去轮询看是否完成。

值的一提的是这里并没有调用get进行阻塞,所以这里也就直接将future返回出去了。这里不知道会不会有问题?

理论上这里应该阻塞一下,然后等待刷盘完成,然后将future返回才是同步刷盘的逻辑。
在这里插入图片描述

相关资讯

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?