马士兵老师 发表于 2021-10-29 17:02:56

RocketMQ源码详解 | Broker篇 · 其一:线程模型与接收链路

Netty 组件

如果你还记得上一节的内容的话那应该知道,NettyRomotingAbstract 有两个实现类,分别是 NettyRemotingClient 和 NettyRemotingServer ,我们已经知道了前者的实现,如今我们再来看看后者
https://p9.toutiaoimg.com/large/pgc-image/ed4d8e5aaee445cfa98bd975ef897d9b
NettyRemotingServer

这个类很长,我们先来看它的属性
/*    引导类和dispatch线程与select线程池   */private final ServerBootstrap serverBootstrap;private final EventLoopGroup eventLoopGroupSelector;private final EventLoopGroup eventLoopGroupBoss;// 配置类private final NettyServerConfig nettyServerConfig; // 用来执行 callback 函数的线程池private final ExecutorService publicExecutor;// 自定义的 Channel 事件监听器private final ChannelEventListener channelEventListener; // 扫描已经超时的 ResponseFeatureprivate final Timer timer = new Timer("ServerHouseKeepingService", true);// 工作线程private DefaultEventExecutorGroup defaultEventExecutorGroup;private int port = 0; private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";private static final String TLS_HANDLER_NAME = "sslHandler";private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; // sharable handlersprivate HandshakeHandler handshakeHandler;private NettyEncoder encoder;private NettyConnectManageHandler connectionManageHandler;private NettyServerHandler serverHandler;我们主要关心 serverBootStrap 的启动



起首是它的初始化,初始化代码较长,主要做了三件事:

[*]初始化 callback 函数执行线程池
[*]在 Linux 平台上启用 epoll
[*]使用可能存在的 SSL



然后是重头戏,其具体的创建
ServerBootstrap childHandler =this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 半连接队列长度.option(ChannelOption.SO_BACKLOG, 1024)// 开启内核中的 net.ipv4.tcp_tw_reuse 选项.option(ChannelOption.SO_REUSEADDR, true)// 关闭操纵系统的连接维护,由本身去干.option(ChannelOption.SO_KEEPALIVE, false)// 禁用 Nagle 算法.childOption(ChannelOption.TCP_NODELAY, true)// 设定发送缓冲区和接收缓冲区大小.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()).childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())// 设置监听端口(0.0.0.0:xx).localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort())).childHandler(new ChannelInitializer() {    @Override    public void initChannel(SocketChannel ch) throws Exception {      ch.pipeline()      // 设置握手处理器      .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)      .addLast(defaultEventExecutorGroup,               // 设置编解码器               encoder,               new NettyDecoder(),               // 注册 Netty 的心跳查抄               new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),               // 管理连接,超时处理,维护channelTables与存活的连接               connectionManageHandler,               // 实际上的处理收到的请求               serverHandler                );    }});这里需要关注的点许多,我们按照顺序来看
起首是线程模型,在这里我们可以看出它是 1(eventLoopGroupBoss) - N(eventLoopGroupSelector) - M(defaultEventExecutorGroup) 的线程模型,即有 一个 Acceptor,N 个 Select 线程,和 M 个 IO 线程。
如果了解过 Reactor 模型的话可以看出这属于主从多 Reactor 模式,在 Nginx、Kakfa、Tomcat 都能看到类似的设计。



然后需要关注的是 SO_BACKLOG,这里指定了半队列的长度为 1024
backlog
在 TCP 的三次握手中,backlog 用于处理从 SYN RECEIVED 到 ESTABLISHED 状态之间的套接字。
其中具有 SYN 队列和 accept 队列:
SYN 队列长度由系统调整。当服务器端收到一个 SYN 包时,将其放入 SYN 队列并返回 ACK+SYN。队满则抛弃,客户端超时后重发。accept 队列长度由步伐调整(也就是我们通过 SO_BACKLOG 设置的长度)。当服务器端收到之前本身发送的 SYN 的 ACK 时,会将套接字放入这里。大多数时间这里的数据可以很快的被步伐通过 accept() 取出。队满时抛弃到来的 ACK 包(虽然客户端已经进入了 ESTABLISHED 状态,但由于 tcp 的慢启动,并不会造成太大影响),客户端重发到一定次数仍未被放入 accept 队列时会被发送 RST 包。同时在 Linux 中,这里队满时会对 SYN 队列的接收速率举行控制。



再通过 SO_REUSEADDR 开启了内核的 net.ipv4.tcp_tw_reuse 选项
net.ipv4.tcp_tw_reuse
这个选项主要用在具有大量短连接的应用。
问题:
在具有大量短连接时,服务器端上具有太多属于同一个客户端的处于 TIME_WAIT 状态的连接,而导致该客户端不能建立新的连接。
处理方法:
在 Linux 中,TCP 的 TIME_WAIT 时间默认为 1 分钟,而 TIME_WAIT 被设计出来的主要目标有两个:
避免新的连接收到旧的连接的重发数据包确保长途端不是在 LAST_ACK 状态
在开启这个选项后,如果 TIME_WAIT 状态的连接过多,会使用在 TCP 可选头部中的时间戳选项,来和之前存储的时间戳对比,若该大,则从 TIME_WAIT 状态的存活连接中随机选取一个并分配给该 TCP 连接。
对于需要解决问题 1,由于旧的连接的重发包具有过期的时间戳,所以会被抛弃;
对于问题 2 ,当处于 LAST_ACK 的一端收到新的 TCP 连接的 SYN 包后,会将其抛弃,然后重发 FIN 包,处于 SYN_SEND 状态的一端收到这种错误的包后会发送 RST 包,然后再发送 SYN 包重试。



然后使用 SO_KEEPALIVE 关闭操纵系统自带的 KeepAlive 机制。
这是因为操纵系统的连接维护默认为 2 小时,对其修改需要系统调用,且当协议被切换为 UDP 时会失效,故我们在后面使用了 IdleStateHandler 来注册 Netty 本身实现的心跳检测



接着将 TCP_NODELAY 设置为 True 来禁用 Nagle 算法。
这是因为 Nagle 算法会期待当前 TCP 的包到达了充足的大小才会发送,这会造成发送延迟



再往后看可以发现是先注册了 HandshakeHandler,我们来看它干了什么
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {// 标记当前位置以便规复。因为我们接下来需要查看第一个字节以确定内容是否以 TLS 握手开始msg.markReaderIndex();   byte b = msg.getByte(0);   // 握手的魔数,如果是说明这是个tls握手if (b == HANDSHAKE_MAGIC_CODE) {    switch (tlsMode) {      // 禁用 SSL      case DISABLED:      ctx.close();      log.warn("Clients intend to establish an SSL connection while this server is running in SSL disabled mode");      break;      // 可用或必须使用 SSL      case PERMISSIVE:      case ENFORCING:      if (null != sslContext) {          // 添加 SSL handler          ctx.pipeline()            // SSL 隧道            .addAfter(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, TLS_HANDLER_NAME, sslContext.newHandler(ctx.channel().alloc()))            // 用来包管文件在零拷贝时也进入能被 SSL 加密            .addAfter(defaultEventExecutorGroup, TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());          log.info("Handlers prepended to channel pipeline to establish SSL connection");      } else {          ctx.close();          log.error("Trying to establish an SSL connection but sslContext is null");      }      break;       default:      log.warn("Unknown TLS mode");      break;    }} else if (tlsMode == TlsMode.ENFORCING) {    ctx.close();    log.warn("Clients intend to establish an insecure connection while this server is running in SSL enforcing mode");}   // 规复read索引,以便握手协商可以正常举行。msg.resetReaderIndex();   try {    // 完成 SSL 的判定后将被于本 pipeline 中移除    ctx.pipeline().remove(this);} catch (NoSuchElementException e) {    log.error("Error while removing HandshakeHandler", e);}   // 交给下一个 handlerctx.fireChannelRead(msg.retain());}从代码我们可以知道,这个 Handler 用于判断是否使用 SSL 对连接举行加密,有的话则使用



然后是我们之条件到过的 IdleStateHandler ,它的几个参数分别是:

[*]读超时时间
[*]写超时时间
[*]读写超时时间
而我们在这将 1 和 2 都设置为了 0,即不举行触发
一旦超时,它将会产生 IdleStateEvent ,在下一个 Handler NettyConnectManageHandler 中,我们可以看到它被捕获了
@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {    IdleStateEvent event = (IdleStateEvent) evt;    if (event.state().equals(IdleState.ALL_IDLE)) {      final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());      log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);      RemotingUtil.closeChannel(ctx.channel());      if (NettyRemotingServer.this.channelEventListener != null) {      NettyRemotingServer.this          .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));      }    }}   ctx.fireUserEventTriggered(evt);}


最后其他的组件都和上一章讲过差不多,故不再重复。接下来主要看一个和 Client 差别的地方。



ChannelEventListener

在上一章了解 Client 时,NettyConnectManageHandler 中在每一个状态中都有以下代码
if (NettyRemotingServer.this.channelEventListener != null) {NettyRemotingServer.this    .putNettyEvent(new NettyEvent(NettyEventType./*XXX*/, remoteAddress, ctx.channel()));}Client 由于没有注册 channelEventListener 而没有使用,在 NettyRemotingServer 中则在执行构造器时注册了 ClientHousekeepingService ,当然是 Broekr 端,另有一个是 BrokerHousekeepingService ,用于 NameServer
public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {    @Override    public void run() {      try {      ClientHousekeepingService.this.scanExceptionChannel();      } catch (Throwable e) {      log.error("Error occurred when scan not active client channels.", e);      }    }}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);} private void scanExceptionChannel() {this.brokerController.getProducerManager().scanNotActiveChannel();this.brokerController.getConsumerManager().scanNotActiveChannel();this.brokerController.getFilterServerManager().scanNotActiveChannel();}从实现就能看出来,这个类是在定期扫描过期的 Channel 并移除,同时通过监听事件在其 close、exception、idle 时移除



NettyRemotingAbstract

最后回到 NettyRemotingAbstract 的 processRequestCommand 方法,虽然在上一节中已经看过了,不过我们再来详细看一次
final Pair matched = this.processorTable.get(cmd.getCode());final Pair pair = null == matched ? this.defaultRequestProcessor : matched;起首我们可以知道在 processorTable 中存放着响应码和其对应的请求处理器与执行线程池,如果没有会使用默认处理器。
然后是使用其对应的线程池来执行业务请求,并使用处理回调函数
try {doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingResponseCallback callback = response -> { /* xxx */ };// 如果是异步请求处理器,则将回调函数交给其if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {    AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();    processor.asyncProcessRequest(ctx, cmd, callback);} else {    NettyRequestProcessor processor = pair.getObject1();    RemotingCommand response = processor.processRequest(ctx, cmd);    // 否则举行同步的调用    callback.callback(response);}} catch (Throwable e) {        /* xxx*/}


那么,这些响应函数和线程池是在什么时间放入的呢?通过追踪,我们发现了 BrokerController 类,其在初始化时调用的 registerProcessor 函数如下:
// 用于处理消息的发送请求SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/** * PullMessageProcessor */this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); /** * ReplyMessageProcessor */ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);replyMessageProcessor.registerSendMessageHook(sendMessageHookList); this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor); /*以下略   */我们主要观察到了几个重点:

[*]每一类业务处理都由该业务范例对应的线程池来处理
[*]同时维护 remotingServer 和 fastRemotingServer 两个处理服务如果你对在第一节提到过的 VIP 另有印象的话,应该可以想起 VIP 端口就是 普通端标语-2。而这里的 fastRemotingServer,监控的就是 VIP 端口



至此,我们终于可以画出 RocketMQ 在 Broker 端的线程模型了
https://p6.toutiaoimg.com/large/pgc-image/e8bc79fb82294d5bace2d7e6ffef4fde

原文链接:https://www.cnblogs.com/enoc/p/rocketmq-so-no-san.html

愉悦的旭日H8 发表于 2021-10-29 18:34:36

转发了

懂球 发表于 2021-10-29 18:30:49

转发了
页: [1]
查看完整版本: RocketMQ源码详解 | Broker篇 · 其一:线程模型与接收链路