1. 首页
  2. rocketmq源码分析

017-十七、RocketMQ源码分析文件清除机制

作者:唯有坚持不懈 | 出处:https://blog.csdn.net/prestigeding/article/details/78888290

由于 RocketMQ 操作 CommitLogConsumeQueue 文件,都是基于内存映射方法并在启动的时候,会加载 commitlogConsumeQueue 目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要一种机制来删除已过期的文件。

RocketMQ 顺序写 CommitlogConsumeQueue 文件,所有写操作全部落在最后一个 CommitLogConsumeQueue 文件上,之前的文件在下一个文件创建后,将不会再被更新。

RocketMQ 清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ 不会管这个这个文件上的消息是否被全部消费。默认每个文件的过期时间为72小时。通过在 Broker 配置文件中设置 fileReservedTime 来改变过期时间,单位为小时。接下来详细分析 RocketMQ 是如何设计与实现上述机制的。

DefaultMessageStore\#addScheduleTask:


this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

RocketMQ 会每隔10s调度一次 cleanFilesPeriodically,已检测是否需要清除过期文件。执行频率可以通过设置 cleanResourceInterval,默认为10s。

DefaultMessageStore\#cleanFilesPeriodically


private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); }

主要清除 CommitLogConsumeQueue 的过期文件。CommitLogConsumeQueue 对于过期文件的删除算法、逻辑大同小异,本文将以 CommitLog 过期文件为例来详细分析其实现原理。

DefaultMessageStore$CleanCommitLogService\#run


public void run() { try { this.deleteExpiredFiles(); this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }

整个执行过程分为两个大的步骤,第一个步骤:尝试删除过期文件;第二个步骤:重试删除被 hange (由于被其他线程引用在第一阶段未删除的文件),在这里再重试一次。

DefaultMessageStore$CleanCommitLogService\#deleteExpiredFiles


long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

Step1:解释一下这个三个配置属性的含义。


* fileReservedTime:文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除。 * deletePhysicFilesInterval:删除物理文件的间隔,因为在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的间隔时间。 * destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留的最大时间,在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除。

DefaultMessageStore$CleanCommitLogService#deleteExpiredFiles:


boolean timeup = this.isTimeToDelete(); boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { //继续执行删除逻辑 return; } else { // 本次删除任务无作为。 }

Step2:RocketMQ 在如下三种情况任意满足之一的情况下将继续执行删除文件操作。


* 到了删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。 * 判断磁盘空间是否充足,如果不充足,则返回true,表示应该触发过期文件删除操作。 * 预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。

重点分析一下磁盘不足的判断依据。

DefaultMessageStore$CleanCommitLogService\#isSpaceToDelete


double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; // @1 cleanImmediately = false; { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); // @2 if (physicRatio > diskSpaceWarningLevelRatio) { // @3 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } cleanImmediately = true; } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } }

代码@1:获取 maxUsedSpaceRatio,表示 commitlogconsumequeue 文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件。

代码@2:通过File\#getTotalSpace()获取 commitlog 所在磁盘分区总的存储容量,通过File\#getFreeSpace()获取 commitlog 目录所在磁盘文件剩余容量并得出当前该分区的物理磁盘使用率 physicRatio

代码@3:RocketMQ另外提供了两个与磁盘空间使用率相关的系统级参数:

  • -Drocketmq.broker.diskSpaceWarningLevelRatio=0.90:如果磁盘分区使用率超过该阔值,将设置磁盘不可写,此时会拒绝新消息的写入。
  • -Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85:如果磁盘分区使用超过该阔值,建议立即执行过期文件清除,但不会拒绝新消息的写入。

判断磁盘是否可用,用当前已使用物理磁盘率 maxUsedSpaceRatiodiskSpaceWarningLevelRatiodiskSpaceCleanForciblyRatio,如果当前磁盘使用率达到上述阔值,将返回 true 表示磁盘已满,需要进行过期文件删除操作。

Step3:然后根据文件的最后一次更新时间与当前时间做比较,判断是否过期,如果已过期,调用MappedFile的destory。
MappedFile#shutdown


public void shutdown(final long intervalForcibly) { if (this.available) { this.available = false; this.firstShutdownTimestamp = System.currentTimeMillis(); this.release(); } else if (this.getRefCount() > 0) { if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { this.refCount.set(-1000 - this.getRefCount()); this.release(); } } }

如果 availabletrue,表示第一次执行 shutdown 方法,首先设置 availablefalse,并记录 firstShutdownTimestamp 时间戳,如果当前该文件被其他线程引用,则本次不强制删除,如果没有其他线程在使用该文件,则清除 MappedFile 相关资源,并最终执行File\#delete()方法清除文件。在拒绝被删除保护期内(destroyMapedFileIntervalForcibly)每执行一次清理任务,将引用次数减去1000,引用数小于1后,该文件最终将被删除。

关于 ConsumeQueue 的过期文件删除机制与 Commitlog 文件机制类似,本文就不重复讲解。

本文重点是理解如下参数的含义:fileReservedTime、deletePhysicFilesInterval、destroyMapedFileIntervalForcibly、-Drocketmq.broker.diskSpaceWarningLevelRatio
\-Drocketmq.broker.diskSpaceCleanForciblyRatio
与获取磁盘分区总容量与剩余容量的方法。


备注:本文是《RocketMQ技术内幕》的前期素材,建议关注笔者的书籍:《RocketMQ技术内幕》。

写完了如果写得有什么问题,希望读者能够给小编留言,也可以点击[此处扫下面二维码关注微信公众号](https://www.ycbbs.vip/?p=28 "此处扫下面二维码关注微信公众号")

看完两件小事

如果你觉得这篇文章对你挺有启发,我想请你帮我两个小忙:

  1. 关注我们的 GitHub 博客,让我们成为长期关系
  2. 把这篇文章分享给你的朋友 / 交流群,让更多的人看到,一起进步,一起成长!
  3. 关注公众号 「方志朋」,公众号后台回复「666」 免费领取我精心整理的进阶资源教程
  4. JS中文网,Javascriptc中文网是中国领先的新一代开发者社区和专业的技术媒体,一个帮助开发者成长的社区,是给开发者用的 Hacker News,技术文章由为你筛选出最优质的干货,其中包括:Android、iOS、前端、后端等方面的内容。目前已经覆盖和服务了超过 300 万开发者,你每天都可以在这里找到技术世界的头条内容。

    本文著作权归作者所有,如若转载,请注明出处

    转载请注明:文章转载自「 Java极客技术学习 」https://www.javajike.com

    标题:017-十七、RocketMQ源码分析文件清除机制

    链接:https://www.javajike.com/article/1693.html

« 018-十八、源码研究RocketMQ主从同步机制(HA)
016-十六、RocketMQ源码分析顺序消息消费实现原理»

相关推荐

QR code