RocketMq の 介绍篇(2)

RocketMq の 介绍篇(2)

Scroll Down

概述

image1.png

1.Name Server

  • Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步

2.Broker

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个
  • 每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接

3.Producer

  • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。

  • Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。

4.Consumer

  • Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

  • Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。
    Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。

  • 当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。
    消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 @{消费者group}扩展为 @{消费者group},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。

消费

消费模式

  • 集群消费

    • 一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息
  • 广播消费

    • 一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义

获取消息模式

  • Push

    • 推送模式(虽然 RocketMQ 使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景
  • Pull

    • 拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。
      决绝绝大多数场景下,我们会使用 PushConsumer 推送模式

消息类型

  • 普通消息

    • 普通消息也叫做无序消息,简单来说就是没有顺序的消息,producer 只管发送消息,consumer 只管接收消息,至于消息和消息之间的顺序并没有保证,可能先发送的消息先消费,也可能先发送的消息后消费。
      举个简单例子,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息
  • 顺序消息

    • 有序消息就是按照一定的先后顺序的消息类型
      举个例子来说,producer 依次发送 order id 为 1、2、3 的消息到 broker,consumer 接到的消息顺序也就是 1、2、3 ,而不会出现普通消息那样的 2、1、3 等情况
  • 延时消息

    • 延时消息,简单来说就是当 producer 将消息发送到 broker 后,会延时一定时间后才投递给 consumer 进行消费。
      RcoketMQ的延时等级为:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延时。level=1,表示 1 级延时,对应延时 1s。level=2 表示 2 级延时,对应5s,以此类推
  • 事物消息

    image3.png

image4.jpg

存储模型

RocketMQ的消息的存储是由ConsumeQueue和CommitLog 配合来完成的,ConsumeQueue中只存储很少的数据,消息主体都是通过CommitLog来进行读写。

image5.jpg

  • CommitLog:是消息主体以及元数据的存储主体,一个文件集合,,为了讨论方便可以把它当成一个文件,所有消息内容全部持久化到这个文件中。对CommitLog建立一个ConsumeQueue,每个ConsumeQueue对应一个(概念模型中的)MessageQueue,所以只要有Commit Log在,Consume Queue即使数据丢失,仍然可以恢复出来。

    • 每个commitLog大小上限为1G,满1G之后会自动新建CommitLog文件做保存数据用
    • CommitLog的清理机制:
      • 按时间清理,rocketmq默认会清理3天前的commitLog文件;
      • 按磁盘水位清理:当磁盘使用量到达磁盘容量75%,开始清理最老的commitLog文件。
  • ConsumeQueue:是一个消息的逻辑队列,相当于CommitLog的索引文件,存储了这个Queue在CommitLog中的起始offset,log大小和MessageTag的hashCode。每个Topic下的每个Queue都有一个对应的ConsumerQueue文件,例如Topic中有三个队列,每个队列中的消息索引都会有一个编号,编号从0开始,往上递增。并由此一个位点offset的概念,有了这个概念,就可以对Consumer端的消费情况进行队列定义。

    • consumequeue的数据结构包含3部分
      • 消息在commitLog文件实际偏移量(commitLogOffset)
      • 消息大小
      • 消息tag的哈希值

假如集群有一个Broker,Topic为binlog的队列(Consume Queue)数量为4,如下图所示,按顺序发送这5条内容各不相同消息。
image6.jpg

image7.jpg

了解了每个文件都在什么位置存放什么内容,那接下来就正式开始讨论这种存储方案为什么在性能带来的提升。
通常文件读写比较慢,如果对文件进行顺序读写,速度几乎是接近于内存的随机读写,为什么会这么快❓❓❓
⭐原因就是:Page Cache
image8.jpg

  • 先来个直观的感受,整个OS有3.7G的物理内存,用掉了2.7G,应当还剩下1G空闲的内存,但OS给出的却是175M。当然这个数学题肯定不能这么算。
  • OS发现系统的物理内存有大量剩余时,为了提高IO的性能,就会使用多余的内存当做文件缓存,也就是图上的buff / cache,广义我们说的Page Cache就是这些内存的子集。
  • OS在读磁盘时会将当前区域的内容全部读到Cache中,以便下次读时能命中Cache,写磁盘时直接写到Cache中就写返回,由OS的pdflush以某些策略将Cache的数据Flush回磁盘

☁ MQ元数据都落在单个文件上(即commitLog),大量数据IO都在顺序写同一个commitLog,满1G了再写新的,真正意义上的顺序写盘,再加上MQ默认是累计4K才强制从PageCache中刷到磁盘(缓存),所以高并发写性能突出。

☁ MQ读取消息依赖系统PageCache,PageCache命中率越高,读性能越高,Linux平时也会尽量预读数据,使得应用直接访问磁盘的概率降低。

💦 所以Broker的机器需要大内存,尽量缓存足够多的commitLog,让Broker读写消息基本在PageCache中操作。在运行时,如果数据量非常大,可以看到broker的进程占用内存比较多,其实大部分是被缓存住的commitlog。

最佳实践

1.Tag

一个应用尽可能用一个Topic,可以充分利用tags来做消息过滤,减少Topic复杂设计

2.key

每个消息在业务层面的唯一标示码,方便将来定位消息丢失问题  
服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

3.日志

消息发送成功或者失败,要打印消息日志,务必要打印send result和key字段

4.幂等

消息重复的根本原因是:网络不可靠。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
RocketMQ使用的消息原语是At Least Once,所以consumer可能多次收到同一个消息
a. 消费端处理消息的业务逻辑保持幂等性;
b. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。

5.topic配置

broker上的topic要控制在合适的数量。  
Broker向Namesr发心跳时,会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话,网络传输失败,心跳失败,导致Namesrv误认为Broker心跳失败。