RocketMQ源码解析-通信模块

RocketMQ源码解析-通信模块

Scroll Down

这篇文章和 《RocketMQ源码解析-开篇》 隔了非常久,肥壕真是惭愧不已。一方面是忙于工作(摸鱼),另一方面一直纠结从哪个方面入手会让大家更加容易理解,而且如果贴上太多的源码,阅读的效果可能会适得其反。

所以为了提高文章的阅读质量,肥壕决定删繁就简,摒弃过度的源码解析,结合更多的设计图,目的是:看完直呼好家伙

在消息队列架构中,各个角色可能随时都要进行通信交互,数据传输。

因此,通信模块在消息队列设计中是不可或缺的核心模块。

而且一个优秀良好的网络通信模块,很大程度上决定了消息传输的能力和整体性能。

本文就从 RocketMQ 的通信模块源码解析,深入学习高性能的网络通信模块究竟是如何实现的。

RocketMQ 消息队列的整体架构图👇

关于 RocketMQ 架构中各角色的作用和功能可以查看: ,肥壕就不再复述。

这里我们重点关注是各角色之间的通信关系:

NameServer

  • Name Server 每隔 10 s 扫描所有存活 Broker 的连接,如果 NameServer 超过 2 min 没有收到心跳,则 NameServer 断开与 Broker 的连接。

Broker

  • 每个 Broker 与所有的 NameServer 节点建立长连接,每隔 30s 汇报 Topic 信息到所有 NameServer。

Producer

  • Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,默认每隔 30s 从 NameServer 获取所有 Topic 队列的最新情况,这意味着如果 Broker不可用,Producer 最多 30s 能够感知,在此期间内发往 Broker 的所有消息都会失败。
  • Producer 与 提供 topic 服务的 Broker 建立长连接,默认 30s 向所有关联的 Broker 发送心跳,Broker 每隔 10s 扫描所有存活的连接,如果 Broker 在 2min 内没有收到心跳数据,则关闭与 Producer 的连接。

Consumer

  • Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,默认每隔 30s 从 NameServer 获取 Topic 的最新队列情况,这意味着 Broker 不可用时,Consumer 最多最需要 30s 才能感知。
  • Consumer 每隔 30s 向所有关联的 Broker 发送心跳,Broker 每隔 10s 扫描所有存活的连接,若某个连接 2min 内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的所有 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后继续消费。

可以看出,RocketMQ 架构中各角色之间形成一个比较复杂的通信网络,每一条链路都有可能影响整个消息队里通信的性能。

rocketmq-remoting 模块是 RocketMQ 中负责网络通信的核心模块,也是我们这次阅读主要的模块。(本文使用的 RocketMQ 版本是 4.4.1)

RocketMQ 的通信模块是基于 Netty 扩展的,在阅读通信模块部分的源码前,大家最好对 Netty 有一个基础的入门了解,知道 Netty 的整体通信模型,NIO 线程模型的知识。这样在下面的源码解析中就不会犯迷糊。

RocketMQ 多线程模型

Remoting 的网络通信是基于 Netty 实现,所以整个通信架构都是基于 Netty 模型扩展来的。

1. Remoting 通信模块结构

我们先来看看 Remoting 通信模块的类结构图:

  • RemotingService:最上层接口,定义了三个方法
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
  • RemotingServer:定义了服务端的接口,继承了最上层接口 RemotingService
// 注册处理器
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
    final ExecutorService executor);

// 注册默认处理器
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

int localListenPort();

// 根据请求码(code)获取不同的处理器
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

// 同步通信,返回 RemotingCommand
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
    final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
    RemotingTimeoutException;

// 异步通信
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback) throws InterruptedException,
    RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

// 单向通信
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
    RemotingSendRequestException;
  • RemotingClient:定义了客户端的接口,并且继承了最上层接口 RemotingService,定义的方法与 RemotingServer 相似。
  • NettyRemotingAbstract:Netty 通信抽象类,定义并封装了服务端和客户端公共方法。
  • NettyRemotingServer:服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类。
  • NettyRemotingClient:客户端的实现类,实现了 RemotingClient 接口,继承 NettyRemotingAbstract 抽象类。

简单说,RocketMQ 在 Netty 通信的基础框架上对通信的 Server 和 Client 进行了抽象和封装的处理,使结构更为简洁和易扩展。

2. Netty 的多线程模型

Netty 是一个高性能,异步事件驱动的 NIO 框架,使用 Reactor 模式构建的线程模型。

Reactor 三种线程模型:

  • 单线程 Reactor 模型

    所有的 I/O 操作都在一个 NIO 线程完成,同时负责客户端的连接和 read/write 操作。

    缺点:一个 NIO 线程即要负责 I/O 连接又要负责 I/O 读写,可能会导致线程负载过高, 处理性能越来越低效,甚至会导致 CPU 跑飞,系统宕机的风险。

  • 多线程 Reactor 模型

    与单线程模型最大的区别是,有一组 NIO 线程负责 I/O 读写,将 I/O 连接与读写分离开,提高 I/O 的读写速率。

    这也是大部分场景所使用的模型,能够支撑日常高并发连接的业务场景。

  • 主从线程模型

    如果是并发百万的客户端连接,单个 Acceptor 线程可能就会显得力不从心,有性能上的瓶颈。而主从线程模型的特点是:将原本负责 I/O 连接的单个线程替换成 NIO 线程池。

3. RocketMQ 的线程模型

RocketMQ 则采用了多线 Reactor 程模型的设计实现网络通信:

通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,记住主要的数字:(1 + N + M1 + M2

  • 一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。
  • RocketMQ 的源码中会自动根据 OS 的类型选择 NIOEpoll,也可以通过参数配置,然后监听真正的网络数据。
  • 拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),
  • 在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。
  • 而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
线程数线程名线程具体说明
1NettyBoss_%dReactor 主线程
NNettyServerEPOLLSelector_%d_%dReactor 线程池
M1NettyServerCodecThread_%dWorker线程池
M2RemotingExecutorThread_%d业务processor处理线程池

更详细的内容可以参考官方文档的说明 [Apache RocketMQ开发者指南-设计] ,当然,前提是要熟悉 Netty 的整个调用链路和整体的设计结构,不熟悉的同学先自行学习。

消息的协议设计与编解码

我们知道网络传输的数据是二进制格式的,Server 与 Client 之间发送消息与接收消息的时候,需要对其进行序列化和反序列化,所以也可以理解为数据的解码和编码的过程。

但是解码和编码也必须要按相同的消息协议进行,就好比这一段二进制的消息是一封信,哪里个位置是开头,哪里位置是内容,哪里位置是结尾,写信人和收信人都必须要有相同的约定。

所以要保证消息能够正确的发送与接收,就必须保证彼此使用一致的消息协议编解码方式,不然就会出现

👳‍♂️:ミ耗釨尾汁o巴さ

🤷‍♂️: ???

RocketMQ 为了更高效地在网络中传输消息和对收到的消息读取,自定义通信协议和消息的编解码。

先来看一下 RocketMQ 自定义的通信协议的格式:

可见传输内容主要可以分为以下 4 部分:

(1) 消息长度:总长度,四个字节存储,占用一个 int 类型;

(2) 序列化类型 & 消息头长度:同样占用一个 int 类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;

(3) 消息头数据:经过序列化后的消息头数据;

(4) 消息主体数据:消息主体的二进制字节数据内容;

RemotingCommand 类是消息协议的数据封装,不但包含了所有的数据结构,还包含了编码解码操作。

RemotingCommand 类的成员变量如下:

Header字段类型Request说明Response说明
codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误
languageLanguageCode请求方实现的语言应答方实现的语言
versionint请求方程序的版本应答方程序的版本
opaqueint相当于requestId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回
flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志
remarkString传输自定义文本信息传输自定义文本信息
extFieldsHashMap<String, String>请求自定义扩展信息响应自定义扩展信息

具体的编解码操作都在 RemotingCommand 类实现,肥壕来简单讲解一下消息的编码过程,让大家了解一下 RocketMQ 对消息做了哪些自定义的规范和处理。

进去 NettyEncoder 类编码器看:

@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
    throws Exception {
    try {
        // 1. Message Length + Serialization type + Header Length + Data Header
        ByteBuffer header = remotingCommand.encodeHeader();
        out.writeBytes(header);
        // 2. Message Body
        byte[] body = remotingCommand.getBody();
        if (body != null) {
            out.writeBytes(body);
        }
    } catch (Exception e) {
        log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
        if (remotingCommand != null) {
            log.error(remotingCommand.toString());
        }
        RemotingUtil.closeChannel(ctx.channel());
    }
}

对照上图 RocketMQ 自定义的通信协议的格式

步骤 1 :remotingCommand.encodeHeader() 将通信协议的前三部分都转成 byte

步骤 2: remotingCommand.getBody() 将消息内容转成 byte

最终转成能在网络中传输的二进制数据,我们再深入 remotingCommand.encodeHeader()

public ByteBuffer encodeHeader(final int bodyLength) {
    // 1> header length size
    int length = 4;

    // 2> header data length
    byte[] headerData;
    headerData = this.headerEncode();

    length += headerData.length;

    // 3> body data length
    length += bodyLength;

    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

    // 1.message length
    result.putInt(length);

    // 2.serialization type + header length
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

    // 3.header data
    result.put(headerData);

    result.flip();

    return result;
}

这里其实逻辑也是比较清晰的,就是把自定义协议的前三部分分别转成 byte。不过这里有个比较有意思的方法:markProtocolType(headerData.length, serializeTypeCurrentRPC)

public static byte[] markProtocolType(int source, SerializeType type) {
    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);
    return result;
}

可能有些同学没看明白上面的位运算,肥壕呢一开始看到也是一脸懵逼,后面经过一番资料研究之后其实还是很好理解的。可以参考:https://www.cnblogs.com/mcsfx/p/11027160.html

消息的通信方式和通信流程

RocketMQ 通信方式主要有三种:

  • 同步(sync)
  • 异步(async)
  • 单向(oneway)

下文就以 同步(sync) 通信模式,重点分析一下客户端的发送流程。

1. Client 发送请求消息

客户端(发送者)发送消息的时候,一般都会直接调用 DefaultMQProducerImpl 类中的 send(Message msg),而这个方法默认是同步通信模式的。而这个方法最后会调用到 NettyRemotingClient 类中的 invokeSync方法,拿到与服务器 Channel ,然后调用 NettyRemotingAbstract 类中的 invokeSyncImpl 方法,给服务端发送消息。

invokeAsyncImpl 发送消息的源码如下(已附上相关注释):

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    // 相当于requestID,每个请求都会生成一个唯一ID,每次加一
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // 使用 Netty 的 Channel 发送请求数据到服务端
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // 执行这个方法同时也会调用 countDownLatch countDown()方法
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 使用 countDownLatch 实现同步
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
  • opaque:请求标识码,同个客户端连接每个请求都会生成唯一的请求码。
  • ResponseFuture: 是获取发送消息结果的封装对象,这里 RocketMQ 使用 CountDownLatch 计数器实现同步通信模式。创建对象的时候也会默认创建一个 countDownLatch = new CountDownLatch(1) ,调用 Channel 发送消息后会调用 waitResponse(timeoutMillis)(实际调用countDownLatch.await()) 阻塞等待结果,然后在 Channel 的回调函数中会释放这个计数器。
  • responseTable:保存请求标识码和响应结果的映射表。在同步模式中即用即取,发挥作用不大,主要是作用于异步通信模式,因网络丢失问题,对异步调用做补偿处理等。

2. Server 接收消息和处理逻辑

Server 端接收消息的核心处理入口在 NettyServerHandler 类的 channelRead0 方法中,并调用负责处理请求消息的核心方法 processRequestCommand

/**
 * Process incoming request command issued by remote peer.
 *
 * @param ctx channel handler context.
 * @param cmd request command.
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
  // 根据请求业务码,获取对应的处理类和线程池  
  final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // 前置处理
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    // 核心处理方法
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    // 后置处理
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };
        // 如果拒绝请求为true
        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            // 封装task,使用当前业务的processor绑定的线程池执行
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

在处理请求命令的方法中,RocketMQ 使用了一系列的设计模式,把消息业务代码抽象化,使整个调用方法的代码逻辑更为简洁和易扩展。

  • processorTable:业务码与业务处理器、业务线程池的映射表
/**
 * This container holds all processors per request code, aka, for each incoming request, we may look up the
 * responding processor in this map to handle the request.
 */
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

可以看到,如果要对业务进行修改和扩展,只需改动对应的业务处理器即可,扩展性是非常高的。并且都会封装成 RequestTask 线程,由对应的业务线程池异步执行任务。

总结

本篇文章最核心的是深入解析 RocketMQ 的通信模块,分别从多线程模型消息协议设计与编解码消息通信方式这几个方面并结合代码较为深入的了解整个通信模块的设计和交互流程。

RocketMQ 作为一个出色优秀的消息队列框架,其底层必然少不了一个性能高效的通讯架构支撑。

当然,一个高效的网络通信架构,除了有优秀的通信设计,还需要确保通信的稳定性。比如:客户端如何确保发送消息不丢失、客户端的负载均衡等,这些后面肥壕再一一讲解吧。

本文内容如有理解不到位的地方,欢迎大家留言探讨~

普通的改变,将改变普通

我是宅小年,一个在互联网低调前行的小青年

关注公众号「宅小年」,个人博客 📖 edisonz.cn,阅读更多分享文章