RocketMQ源码解析-开篇

RocketMQ源码解析-开篇

Scroll Down

前言

咳咳..... RocketMQ 源码解析 第一篇 🎬 Action!!!

其实嘛在第一次使用 RocketMQ 后就有对源码研究的想法,并不是说它相比与其他 MQ 有非常独特的优势,而是肥壕觉得有这么简单的几个理由吧,还是很值得我们研读一番滴~

  • 基于 Java 栈的中间件

    作为阿里系的开源产品,自然大多数都是 Java 为主。所以对于 Java 开发者来说是阅读起来还是比较相对轻松的。当然现在的也支持 C/C++, Python, Go多个语言版本

  • 有助于提升个人代码的风格和技巧

    枯燥冗长的代码写多了,编程风格和技巧难免会有瓶颈,通过阅读源码可以学习到更多深层次的技术点和优秀的设计模式。

以后简历上可以放心写上 “精通 RocketMQ”, 无论面试官问题如何刁钻再也不怕心虚了😎😎😎 ~ 哈哈哈

正文

当然,如果开始就一头扎进撸源码,估计看一个月也是头昏脑涨,也没看出个所以然。所以这也是肥壕为什么会先写这一篇源码解析的开篇章🤨。

让我们先来瞧瞧 RocketMQ 整体的架构设计

分别介绍一下上面的几个主要概念

Producer

消息生产者,负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

Consumer

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费

NameServer

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrver 实例组成集群,但相互独立,没有信息交换

BrokerServer

消息中转角色,负责存储消息、转发消息。 代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Message

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ 中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message IDKey查询消息的功能。

Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位

Tag

为消息设置的标志,用于同一主题下区分不同类型的消息。可以理解为 Topic 的二级分类。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

消费模式

消息消费模式有两种:Clustering(集群消费)和 Broadcasting(广播消费)

默认是 Clustering 模式,该模式下同一个消费者集群中订阅一个主题,一个消息只会被一个消费者消费。

Broadcasting 模式消息会发给消费者组中的每一个消费者进行消费。

消费方式

Consumer 端有两种消费形式:Pull(拉取式消费)、Push(推动式消费)

拉取式消费:主动调用 Consumer 的拉消息方法从 Broker 服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费:模式下 Broker 收到数据后会主动推送给消费端,该消费模式一般实时性较高。(其实 Push 模式只是对 Pull 模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息)

消息类型

消息类型有两种:Normal Message(普通消息)Ordered Message(顺序消费)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

消息通信方式

RocketMQ 消息队列中支持通信的方式主要有三种:

  • 同步(sync)
  • 异步(async)
  • 单向(oneway)

其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。

上面一顿猛如虎的专业术语介绍,不知道大家看完后会不会有点懵,反正肥壕现在已经是一脸懵逼了😰


闲话少说,talk is cheap,show me the code.

Github 地址:https://github.com/apache/rocketmq

输入 git clone ..... 然后就可以去喝杯茶,升个懒腰

源码下载好了,我们先来看一下工程的目录结构 👇

介绍一下几个核心的模块

  • namesrv:命名发现服务,broker 服务的管理与路由
  • broker:核心组件,接收 producer发送的消息和消息的存储与consumer 的消息消费
  • client:客户端实现,producerconsumer的实现模块
  • store:存储层实现,消息持久化、索引服务、高可用 HA 服务实现
  • remoting:通信层实现,基于 Netty 的底层封装,服务间的交互通讯都依赖此模块
  • filter:消息过滤服务,相当于在brokerconsumer中间加入了一个 filter 代理
  • common:模块间通用的功能类、方法、配置文件、常量等
  • tools:命令管理工具,提供了消息查询、topic 管理等功能
  • example:官方提供的例子,对典型的功能比如 order message,push consumer,pull consumer 的用法进行了示范

PS:上面几个核心的模块,后面呢肥壕会分多个章节从源码深入详细讲解的啦💁‍♂️


既然是开篇嘛,那我们总得把 RocketMQ 的各个部分都得了解个7788啦~

了解了上面 RocketMQ 的架构设计之后,我们再来看一下各角色之间具体的交互流程吧

Emmmm...上面这个图可能稍微有点复杂,但是肥壕相信智商在线的你们应该能够看个大概哈[狗头]

我们重点了解这三个概念就好了

  1. CommitLog消息存储的主体结构,简单来说就是存储Producer发送的消息。要知道所有的消息都是需要落盘的,所以这些消息都是要写入文件。每个文件默认1G(为什么默认为1G,大家可以想一下) ,文件满了写入下一个文件。
  2. ConsumeQueue消息消费队列,可以理解为基于 Topic 的 commitlog 索引文件。Topic 上的 Queue 与 消费者的 ConsumeQueue 一一对应,比如 Topic 有 1,2,3,4个队列,消费者A分配到1,2两个队列(此处涉及消费者负载均衡),那么消费者A的ConsumerQueue就是对应 Topic1,2 的两个queue。引入ConsumeQueue 主要是为了提高消息消费的性能,它存储了起始物理偏移量 offset消息大小 size消息 Tag 的 HashCode 值
  3. IndexFile索引文件,简单说就是 commitLog 的索引集合文件。固定的单个 IndexFile 文件大小约为400M,一个IndexFile 可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现 HashMap 结构,故rocketmq 的索引文件其底层实现为 hash 索引。

页缓存与内存映射

PageCache(页缓存)文件系统缓存,加速文件的读写速度。我们都知道磁盘 IO 和内存 IO 的速度可是相差了好几个数量级。加入页缓存的目的就是为了使程序对文件进行顺序读写的速度接近于内存的读写速度。所以简单来说,对于数据的写入,OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。

MMAP(内存映射):将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址进程虚拟地址空间中一段虚拟地址的一一对映关系。简单来说,就是实现磁盘文件虚拟内存的直接传输,减少了内核态到用户态的数据拷贝

另外这里给大家说明白的一点是,这个 mmap 技术在进行文件映射的时候,一般有大小限制,在 1.5GB~2GB 之间。所以 RocketMQ 才让CommitLog单个文件在1GB,ConsumeQueue文件在5.72MB,不会太大。

PS: 可能有些小伙伴觉得看了之后,对这两个概念还不是非常清楚[狗头]。其实肥壕起初看了之后也是觉得一头雾水,后面查阅了各种资料才了解个所以然。有关 PageCacheMMAP这一块的知识,后面肥壕会专门整理一篇分享文章滴😁😁😁

消息刷盘

  1. 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
  2. 异步刷盘:能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。

消息重复

消息领域有一个对消息投递的QoS定义,分为:

  • 至少一次(At least once)
  • 最多一次(At most once)
  • 仅一次( Exactly once)

QoS:Quality of Service,服务质量

几乎所有的MQ产品都声称自己做到了At least once。

既然是至少一次,那避免不了消息重复,尤其是在分布式网络环境下。

引用 RocketMQ 联合创始人的介绍:

这个缺憾归根结底也可以看做是 TCP 协议的一部分,如失败重传。业务上往往对消息重复又很敏感,RocketMQ目前的版本是不支持去重的,我们通常建议用户通过外置全局存储自己做判重处理。在下一代的特性规划里,我们会内置解决方案。先说下业界通用做法,像Artemis,IronMQ等,通过在服务端全局存储判重。这是一个IO敏感的操作,为服务端带来一定的负载。而RocketMQ则希望通过采取二次判重策略,有效降低服务端IO。

在这方面来说 RocketMQ 相比于 Kafka 来说可能稍微逊色一些,毕竟 Kafka 三个模式都支持。

事务消息

Apache RocketMQ 在 4.3.0 版中已经支持分布式事务消息,这里 RocketMQ 采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。

我们重点关注一下

Half Message(半消息)

Producer 把消息发送到 Broker 端时,该消息是不能被 Consumer 消费的,需要 Producer 对消息进行二次确认后,才能被消费。

消息回查

由于网络抖动或者 Producer 重启,导致 Producer 一直没有对 Half Message 进行二次确认Broker 端对未确定状态的消息发起回查,将消息发送到对应的 Producer 端(同一个Group的Producer),由 Producer 根据消息来检查本地事务的状态,进而执行 Commit 或者 Rollback 。值得注意的是,RocketMqQ 并不会无休止的的信息事务状态回查,默认回查15次,如果15次回查还是无法得知事务状态,默认回滚该消息。

回溯消费

回溯消费是指 Consumer 已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker 在向 Consumer 投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer 系统故障,恢复后需要重新消费1小时前的数据,那么 Broker 要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒。

定时消息

定时消息(延迟队列)是指消息发送到 broker 后,不会立即被消费,等待特定时间投递给真正的 topic。 broker 有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义 messageDelayLevel。注意,messageDelayLevel是 broker 的属性,不属于某个 topic。发消息时,设置 delayLevel 等级即可:msg.setDelayLevel(level)。level有以下三种情况:

  • level == 0,消息为非延迟消息
  • 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
  • level > maxLevel,则level== maxLevel,例如level==20,延迟2h

定时消息会暂存在名为 SCHEDULE_TOPIC_XXXX 的 topic 中,并根据 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一个 queue 只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的 topic。

需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

消息重试

Consumer 消费消息其实也是需要一个 A 的操作,即 Comsumer 消费成功后需要返回 Broker 一个确认消息,如果没有返回则 Broker 认为这条消息消费失败,失败后会再重试消费该消息,默认重试16次

RocketMQ 对于重试消息的处理是先保存至 Topic 名称为SCHEDULE_TOPIC_XXXX的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至%RETRY%+consumerGroup的重试队列中,而且重试次数越多投递延时就越大

肥壕对于消息消费失败的处理方式是

死信队列

当消息达到最大的重试次数之后(默认16次),若消费依然失败,消息就会被投递到 Topic 名称为 %DLQ%+consumerGroup 的DLQ 死信队列,死信队列用于处理无法被正常消费的消息

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

总结

开篇章没有讲解很多的源码,都是一些较为枯燥的概念和知识点。但是没有这些基础概念的认识,估计后面深入源码解析的章节里面,大家可能会随时一脸懵逼啦 😅

有关 RocketMQ 大部分的知识点应该都总结得差不多啦,相信大家对此都有一个初步的了解了(其实很多内容也是参考了源码中的文档)

PS:肥壕呢也是刚开始写博客,作为一名萌新写手可能写作效率呢是稍微弱了那么一丢丢。像这篇文章肥壕也是利用下班的时间肝出来的,写完后还要多次修补,大概花费了一周多时间。但是我觉得既然决定要做了,就必须尽力做到最好,这不仅是对大家负责,也是对自己的负责。后面的深入源码篇也会坚持下去的,可能更新时间比较久,但是肥壕保证一定是高质好文。

还有,既然是萌新写手嘛可能有很多术语或者语句表达起来有点晦涩或者毛病,希望大伙能够多多体谅。当然如果哪些技术点有错误或者有争议,同时也希望大伙能够及时提出,欢迎评论📫~