- RocketMQ技术内幕:RocketMQ架构设计与实现原理(第2版)
- 丁威 张登 周继锋
- 4583字
- 2022-08-25 16:04:36
4.1 存储概要设计
RocketMQ存储的文件主要包括CommitLog文件、ConsumeQueue文件、Index文件。RocketMQ将所有主题的消息存储在同一个文件中,确保消息发送时按顺序写文件,尽最大的能力确保消息发送的高性能与高吞吐量。因为消息中间件一般是基于消息主题的订阅机制,所以给按照消息主题检索消息带来了极大的不便。为了提高消息消费的效率,RocketMQ引入了ConsumeQueue消息消费队列文件,每个消息主题包含多个消息消费队列,每一个消息队列有一个消息文件。Index索引文件的设计理念是为了加速消息的检索性能,根据消息的属性从CommitLog文件中快速检索消息。
RocketMQ是一款高性能的消息中间件,存储部分的设计是重点,存储的核心是I/O访问性能,本章也会重点剖析RocketMQ是如何提高I/O访问性能的。我们先看一下RocketMQ数据流向,如图4-1所示。
图4-1 RocketMQ消息存储设计原理图
1)CommitLog:消息存储,所有消息主题的消息都存储在CommitLog文件中。
2)ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到ConsumeQuene文件中,供消息消费者消费。
3)Index:消息索引,主要存储消息key与offset的对应关系。
4.1.1 RocketMQ存储文件的组织方式
RocketMQ在消息写入过程中追求极致的磁盘顺序写,所有主题的消息全部写入一个文件,即CommitLog文件。所有消息按抵达顺序依次追加到CommitLog文件中,消息一旦写入,不支持修改。CommitLog文件的布局如图4-2所示。
基于文件编程与基于内存编程一个很大的不同是基于内存编程时我们有现成的数据结构,例如List、HashMap,对数据的读写非常方便,那么一条一条消息存入CommitLog文件后,该如何查找呢?
图4-2 CommitLog文件布局
正如关系型数据库会为每条数据引入一个ID字段,基于文件编程也会为每条消息引入一个身份标志:消息物理偏移量,即消息存储在文件的起始位置。
正是有了物理偏移量的概念,CommitLog文件的命名方式也是极具技巧性,使用存储在该文件的第一条消息在整个CommitLog文件组中的偏移量来命名,例如第一个CommitLog文件为0000000000000000000,第二个CommitLog文件为00000000001073741824,依次类推。
这样做的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。
CommitlLog文件的设计理念是追求极致的消息写,但我们知道消息消费模型是基于主题订阅机制的,即一个消费组是消费特定主题的消息。根据主题从CommitlLog文件中检索消息,这绝不是一个好主意,这样只能从文件的第一条消息逐条检索,其性能可想而知,为了解决基于topic的消息检索问题,RocketMQ引入了ConsumeQueue文件,ConsumeQueue文件的结构如图4-3所示。
图4-3 ConsumeQueue文件结构
ConsumeQueue文件是消息消费队列文件,是CommitLog文件基于topic的索引文件,主要用于消费者根据topic消费消息,其组织方式为/topic/queue,同一个队列中存在多个消息文件。ConsumeQueue的设计极具技巧,每个条目长度固定(8字节CommitLog物理偏移量、4字节消息长度、8字节tag哈希码)。这里不是存储tag的原始字符串,而是存储哈希码,目的是确保每个条目的长度固定,可以使用访问类似数组下标的方式快速定位条目,极大地提高了ConsumeQueue文件的读取性能。消息消费者根据topic、消息消费进度(ConsumeQueue逻辑偏移量),即第几个ConsumeQueue条目,这样的消费进度去访问消息,通过逻辑偏移量logicOffset×20,即可找到该条目的起始偏移量(ConsumeQueue文件中的偏移量),然后读取该偏移量后20个字节即可得到一个条目,无须遍历ConsumeQueue文件。
RocketMQ与Kafka相比具有一个强大的优势,就是支持按消息属性检索消息,引入ConsumeQueue文件解决了基于topic查找消息的问题,但如果想基于消息的某一个属性进行查找,ConsumeQueue文件就无能为力了。故RocketMQ又引入了Index索引文件,实现基于文件的哈希索引。Index文件的存储结构如图4-4所示。
图4-4 Index文件存储结构
Index文件基于物理磁盘文件实现哈希索引。Index文件由40字节的文件头、500万个哈希槽、2000万个Index条目组成,每个哈希槽4字节、每个Index条目含有20个字节,分别为4字节索引key的哈希码、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目(哈希冲突的链表结构)。
ConsumeQueue、Index文件都是根据CommitLog文件异步转发的,其转发实现如图4-5所示。
图4-5 消息消费运行逻辑
下面详细介绍关键逻辑。
1. CommitLog
消息主体以及元数据的存储主体,存储消息生产端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1GB,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824。第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
2. Dispatch操作
短轮询:longPollingEnable=false,第一次未拉取到消息后等待shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1s。
长轮询:longPollingEnable=true,以消费者端设置的挂起超时时间为依据,受Default MQPullConsumer的brokerSuspendMaxTimeMillis控制,默认20s,长轮询有两个线程来相互实现。PullRequestHoldService默认每隔5s重试一次。DefaultMessageStore#ReputMessageService方法在每当有消息到达后,转发消息,然后调用PullRequestHoldService线程中的拉取任务,尝试拉取,每处理一次,线程休眠1ms,继续下一次检查。
3. ConsumerQueue
消息消费队列引入的目的是提高消息消费的性能,由于RocketMQ是基于topic的订阅模式,消息消费是针对topic进行的,如果要根据topic遍历CommitLog文件中的消息是非常低效的。消息消费端可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue作为消费消息的索引,保存了指定topic下的队列消息在CommitLog中的起始物理偏移量,消息大小和消息tag的哈希码。ConsumeQueue文件可以看作基于topic的CommitLog索引文件,故ConsumeQueue文件夹的组织方式为topic/queue/file三层组织结构,具体存储路径为$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样,ConsumeQueue文件采取定长设计,每一个条目20字节,分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag哈希码,单个文件由30万个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72MB。
4. Consumer
先从rebalanceImpl实例的本地缓存变量topicSubscribeInfoTable中,获取该topic主题下的消息消费队列集合mqSet。
然后以topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端发送获取该消费组下消费者ID列表的RPC通信请求(Broker端基于前面消息消费端上报的心跳包数据构建的consumerTable做出响应返回,业务请求码为GET_CONSUMER_LIST_BY_GROUP)。
接着对topic下的消息消费队列、消费者ID进行排序,然后用消息队列分配策略算法(默认为消息队列的平均分配算法),计算待拉取的消息队列。这里的平均分配算法类似于分页算法,求出每一页需要包含的平均大小和每个页面记录的范围,遍历整个范围,计算当前消息消费端应该分配到的记录(这里即为MessageQueue)。
最后调用updateProcessQueueTableInRebalance()方法,具体的做法是先将分配到的消息队列集合与processQueueTable做一个过滤比对,为过滤后的消息队列集合中的每个MessageQueue创建一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中,其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中。创建拉取请求对象pullRequest添加到拉取列表pullRequestList中,最后执行dispatchPullRequest()方法,将PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待服务线程取出后向Broker端发起拉取消息的请求。
4.1.2 内存映射
虽然基于磁盘的顺序写消息可以极大提高I/O的写效率,但如果基于文件的存储采用常规的Java文件操作API,例如FileOutputStream等,将性能提升会很有限,故RocketMQ又引入了内存映射,将磁盘文件映射到内存中,以操作内存的方式操作磁盘,将性能又提升了一个档次。
在Java中可通过FileChannel的map方法创建内存映射文件。在Linux服务器中由该方法创建的文件使用的就是操作系统的页缓存(pagecache)。Linux操作系统中内存使用策略时会尽可能地利用机器的物理内存,并常驻内存中,即页缓存。在操作系统的内存不够的情况下,采用缓存置换算法,例如LRU将不常用的页缓存回收,即操作系统会自动管理这部分内存。
如果RocketMQ Broker进程异常退出,存储在页缓存中的数据并不会丢失,操作系统会定时将页缓存中的数据持久化到磁盘,实现数据安全可靠。不过如果是机器断电等异常情况,存储在页缓存中的数据也有可能丢失。
4.1.3 灵活多变的刷盘策略
有了顺序写和内存映射的加持,RocketMQ的写入性能得到了极大的保证,但凡事都有利弊,引入了内存映射和页缓存机制,消息会先写入页缓存,此时消息并没有真正持久化到磁盘。那么Broker收到客户端的消息后,是存储到页缓存中就直接返回成功,还是要持久化到磁盘中才返回成功呢?
这是一个“艰难”的选择,是在性能与消息可靠性方面进行权衡。为此,RocketMQ提供了两种策略:同步刷盘、异步刷盘。
1. 同步刷盘
同步刷盘在RocketMQ的实现中称作组提交,其设计理念如图4-6所示。
图4-6 同步刷盘流程
2. 异步刷盘
同步刷盘的优点是能保证消息不丢失,即向客户端返回成功就代表这条消息已被持久化到磁盘,但这是以牺牲写入性能为代价的,不过因为RocketMQ的消息是先写入pagecache,所以消息丢失的可能性较小,如果能容忍一定概率的消息丢失或者在丢失后能够低成本的快速重推,可以考虑使用异步刷盘策略。
异步刷盘指的是broker将消息存储到pagecache后就立即返回成功,然后开启一个异步线程定时执行FileChannel的force方法,将内存中的数据定时写入磁盘,默认间隔时间为500ms。
4.1.4 transientStorePoolEnable机制
RocketMQ为了降低pagecache的使用压力,引入了transientStorePoolEnable机制,即内存级别的读写分离机制。
默认情况下,RocketMQ将消息写入pagecache,消息消费时从pagecache中读取,这样在高并发时pagecache的压力会比较大,容易出现瞬时broker busy的异常。RocketMQ通过transientStorePoolEnable机制,将消息先写入堆外内存并立即返回,然后异步将堆外内存中的数据提交到pagecache,再异步刷盘到磁盘中。因为堆外内存中的数据并未提交,所以认为是不可信的数据,消息在消费时不会从堆外内存中读取,而是从pagecache中读取,这样就形成了内存级别的读写分离,即写入消息时主要面对堆外内存,而读取消息时主要面对pagecache。
该机制使消息直接写入堆外内存,然后异步写入pagecache,相比每条消息追加直接写入pagechae,最大的优势是实现了批量化消息写入。
该机制的缺点是如果由于某些意外操作导致broker进程异常退出,已经放入pagecache的数据不会丢失,而存储在堆外内存的数据会丢失。
4.1.5 文件恢复机制
我们知道,RocketMQ主要的数据存储文件包括CommitLog、ConsumeQueue和Index,而ConsumeQueue、Index文件是根据CommitLog文件异步构建的。既然是异步操作,这两者之间的数据就不可能始终保持一致,那么,重启broker时需要如何恢复数据呢?我们考虑如下异常场景。
1)消息采用同步刷盘方式写入CommitLog文件,准备转发给ConsumeQueue文件时由于断电等异常,导致存储失败。
2)在刷盘的时候,突然记录了100MB消息,准备将这100MB消息写入磁盘,由于机器突然断电,只写入50MB消息到CommitLog文件。
3)在RocketMQ存储目录下有一个检查点(Checkpoint)文件,用于记录CommitLog等文件的刷盘点。但将数据写入CommitLog文件后才会将刷盘点记录到检查点文件中,有可能在从刷盘点写入检查点文件前数据就丢失了。
在RocketMQ中有broker异常停止恢复和正常停止恢复两种场景。这两种场景的区别是定位从哪个文件开始恢复的逻辑不一样,大致思路如下。
1)尝试恢复ConsumeQueue文件,根据文件的存储格式(8字节物理偏移量、4字节长度、8字节tag哈希码),找到最后一条完整的消息格式所对应的物理偏移量,用maxPhysical OfConsumequeue表示。
2)尝试恢复CommitLog文件,先通过文件的魔数判断该文件是否为ComitLog文件,然后按照消息的存储格式寻找最后一条合格的消息,拿到其物理偏移量,如果CommitLog文件的有效偏移量小于ConsumeQueue文件存储的最大物理偏移量,将会删除ConsumeQueue中多余的内容,如果大于,说明ConsuemQueue文件存储的内容少于CommitLog文件,则会重推数据。
那么如何定位要恢复的文件呢?
正常停止刷盘的情况下,先从倒数第三个文件开始进行恢复,然后按照消息的存储格式进行查找,如果该文件中所有的消息都符合消息存储格式,则继续查找下一个文件,直到找到最后一条消息所在的位置。
异常停止刷盘的情况下,RocketMQ会借助检查点文件,即存储的刷盘点,定位恢复的文件。刷盘点记录的是CommitLog、ConsuemQueue、Index文件最后的刷盘时间戳,但并不是只认为该时间戳之前的消息是有效的,超过这个时间戳之后的消息就是不可靠的。
异常停止刷盘时,从最后一个文件开始寻找,在寻找时读取该文件第一条消息的存储时间,如果这个存储时间小于检查点文件中的刷盘时间,就可以从这个文件开始恢复,如果这个文件中第一条消息的存储时间大于刷盘点,说明不能从这个文件开始恢复,需要寻找上一个文件,因为检查点文件中的刷盘点代表的是100%可靠的消息。