本 Chat 中,我们将基于 Dubbo 的 2.7.8 版本来讲解,主要是从口试官提问的角度,然后通过几个例子层层递进分析源码,让大家能够一步步在口试的氛围中深入相识 Dubbo 核心原理。话不多说了,我们进入第一个口试提问。
Dubbo 整体框架原理
口试官:Dubbo 是用来做什么的?内部的大概原理能讲一下吗?
首先,Dubbo 就是一个 RPC 框架,用来使开辟者实现远程调用如同本地调用一样方便的框架,只需要简朴配置和引入提供者的接口,就可以直接通过接口调用到远程服务提供的实现。 内部的大概原理就是提供者提供接口,并且将接口暴露成为一个可供访问的服务,然后把提供者服务的 IP 和端口注册到注册中心,消耗者在调用本地接口的时候,Dubbo 框架就会先根据接口类路径从注册中心拉取对应的 IP 和端口,然后连接到这个提供者实验业务逻辑,获取到效果后返回到消耗者本地接口的方法中。 听起来有点绕,直接看下图:
现在先有个大概的印象,下面会更具体介绍整个过程,用一步一图的方式分析。
Dubbo 的提供者核心源码和原理
口试官:那服务提供者是怎样将自己的服务暴露出去的,然后消耗者为什么能调用?
在回答这个问题之前,我们先从一个源码中的小例子入手,然后层层深入跟进源码:
private static void startWithExport() throws InterruptedException { // 服务提供者的配置 ServiceConfig service = new ServiceConfig(); service.setInterface(DemoService.class); service.setRef(new DemoServiceImpl()); service.setApplication(new ApplicationConfig("dubbo-demo-api-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); // 最重要的入口,将服务暴露出去 service.export(); System.out.println("dubbo service started"); // 卡住主进程 new CountDownLatch(1).await();}这里的关键就是 service.export() 这行代码,跟进去后会发现,Dubbo 会将各种需要通报的数据比如方法入参、上下文参数等都写入到一个 URL 中,并且根据不同协议实验服务导出,这里我们全部按照默认 Zookeeper 注册中心的实现来跟进,因此可以看到下面这一段关键代码:
// 生成动态代理对象Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);// 真正导出服务Exporter exporter = PROTOCOL.export(wrapperInvoker);这里的 PROTOCOL 会经过 ProtocolListenerWrapper、ProtocolFilterWrapper,然后才会到 RegistryProtocol,而在 RegistryProtocol 中,我们发现在这里是通过 doLocalExport() 再一次履历 ProtocolListenerWrapper、ProtocolFilterWrapper 然后到 DubboProtocol,而在这个过程中,ProtocolFilterWrapper 会实验所有已注册的 Filter。 然后 RegistryProtocol 还会获取注册中心的地址,并将本提供者注册到了注册中心,而注册中心的注册实在就是调用了 ZookeeperRegistry 的 doRegister() 直接将提供者的 URL 作为临时节点写入到了 Zookeeper 的 providers 目次下面。 顺便说一句,想相识更多的关于 Zookeeper 的相干源码和原理知识,欢迎订阅我的另一篇 Chat:口试突击系列:Zookeeper 的核心源码和原理分析。
try { // 将服务注册到zk zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));} catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}注册完成之后,RegistryProtocol 的使命就临时结束了。到这里我们画个图表示一下:
接下来我们重点看一下 DubboProtocol 做了什么。 首先 DubboProtocol 通过调用 Exchangers#bind() 创建了一个 ExchangeServer,而在 bind() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyServer。
@Overridepublic RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException { // 初始化netty服务端 return new NettyServer(url, handler);}看到这里,我们就非常清楚地相识到了,Dubbo 的服务提供者实在就是使用 Netty 框架创建了一个 Server,由此我们可以猜测到,Dubbo 的服务消耗者应该是作为 Netty 的 Client 连接到 Server 这里进行通讯的。 所以 Dubbo 提供者是怎样暴露服务的呢,实在就是干了两件事:一个是将提供者的信息注册到注册中心,一个是启动 NettyServer 作为服务端提供服务。至于 NettyServer 里面具体是怎样初始化以及 Netty 参数优化,我们下面再讲。
Dubbo 的消耗者核心源码和原理
口试官:服务消耗者是怎样仅仅通过一个接口类直接调用到提供者的,并且做到失败重试、负载均衡的?
同样的,我们从一个消耗者的小例子入手,层层分析源码:
private static void runWithRefer() { ReferenceConfig reference = new ReferenceConfig(); reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer")); // 注册中心配置 reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); // 设置接口类 reference.setInterface(DemoService.class); // 关键代码:消耗者入口 DemoService service = reference.get(); String message = service.sayHello("dubbo"); System.out.println(message);}这里我们看到消耗者设置了注册中心配置和接口类,然后通过调用 reference.get() 就实现了服务的引入,接着就是直接调用接口的方法。因此我们从 reference.get() 入手分析。 从 ReferenceConfig 中可以看到,在创建代理类的时候,实在是使用了 REF_PROTOCOL.refer() 实现的:
// 只有一个注册中心的时候if (urls.size() == 1) { // 重点代码,创建服务引用 invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));}而这个 REF_PROTOCOL 也是跟提供者一样,经过了 ProtocolListenerWrapper、ProtocolFilterWrapper 的包装,然后来到了 RegistryProtocol。在 RegistryProtocol 中,首先是获取了注册中心的地址和集群容错策略,然后将本消耗者作为临时节点注册到了注册中心的 consumers,接着从注册中心将接口类的提供者信息拉取到本地缓存起来,并且监听该接口类的提供者列表的变更事件,最后将接口类包装成一个包罗集群容错策略的 Invoker 并返回,Invoker 的默认实现就是 MockClusterInvoker。而 MockClusterInvoker 里面的 invoker 对象实在就是 FailoverClusterInvoker,然后同时这个 invoker 还持有了 interceptors 拦截器链条。估计到时候就是经过层层拦截器链条之后,再调用 FailoverClusterInvoker 实现失败重试功能。关键代码如下:
// 3、包装机器容错策略到invokerInvoker invoker = cluster.join(directory);public class MockClusterWrapper implements Cluster { @Override public Invoker join(Directory directory) throws RpcException { return new MockClusterInvoker(directory, // 这里的cluster默认是FailoverCluster this.cluster.join(directory)); }}这里需要关注一下 RegistryDirectory#subscribe(),这个方法是订阅注册中心的提供者列表,拉取到了提供者的 URL 之后,通过 toInvokers() 将 URL 转换为 Invoker 对象,也就是下面这一段关键代码:
if (enabled) { // 关键代码:这里调用Dubbo协议转换服务到invoker // 这里的protocol.refer(serviceType, url)的效果是ProtocolFilterWrapper invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);}这里的 protocol.refer(serviceType, url) 就会经过 ProtocolFilterWrapper 的包装,ProtocolFilterWrapper 会实验注册的消耗者 Filter,然后才到了 DubboProtocol。而 DubboProtocol 中的关键就是创建了 ExchangeClient,代码如下:
// 创建即时连接else { client = Exchangers.connect(url, requestHandler);}而这个 Exchangers.connect() 我们就很熟悉了,跟上面的提供者差不多,在connect() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyClient,也就是说,Dubbo 的消耗者实在就是作为一个 Netty 的客户端,连接到提供者的 Netty 服务端,然后进行数据传输。 而前面返回的 invoker 对象也在 ReferenceConfig 中创建为 InvokerInvocationHandler 对象,所以 InvokerInvocationHandler 就是具体的代理类了,当调用我们代码定义的接口类的时候,实际就是调用 InvokerInvocationHandler 的 invoke() 方法,代码如下:
// 6、创建服务代理,这里默认调用的是 JavassistProxyFactory 的 getProxy()方法// invoker就是MockClusterInvokerreturn (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public T getProxy(Invoker invoker, Class[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }}所以这里我们梳理一下这个过程:
所以下面我们从 InvokerInvocationHandler 的 invoke() 方法入手,开始分析一次完整的 Dubbo 调用是怎样串起来的。 InvokerInvocationHandler 组装一下 RPC 调用参数然后直接调用 MockClusterInvoker 的 invoke(),并进行异常的捕获处理。然后 MockClusterInvoker 根据参数是否有 Mock 模式,如果有则返回 Mock 效果,如果没有则继承往下调用 FailoverClusterInvoker。 FailoverClusterInvoker 就是根据重试次数循环,捕获到异常后根据负载均衡策略重新选择 invoker 发起调用。关键代码如下:
// 3、使用循环,失败重试RpcException le = null; // last exception.List invoked = new ArrayList(copyInvokers.size()); // invoked invokers.Set providers = new HashSet(len);for (int i = 0; i < len; i++) { // 重试时,进行重新选择,克制重试时invoker列表已经发生变革 // 注意:如果列表发生了变革,那么invoked判断会失效,因为invoker实例已经改变 if (i > 0) { // 3.1、如果当前实例已经被销毁,则抛出异常 checkWhetherDestroyed(); // 3.2、重新获取所有服务提供者 copyInvokers = list(invocation); // check again // 3.3、重新检查一下 checkInvokers(copyInvokers, invocation); } // 3.4、根据负载均衡策略选择invoker Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 3.5、具体发起远程调用 // 这里拿到的invoker对象是RegistryDirectory$InvokerDelegate Result result = invoker.invoke(invocation); ... return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); }}而负载均衡策略的则是在 select() 方法中实现,默认是使用 RandomLoadBalance 也就是随机选择策略,并且在重新选择的时候会先清除已经使用过的 invoker。 相信看到这里,大家就能回答前面的口试题了。 接下来 FailoverClusterInvoker 的 invoke() 会开始经过 Filter 的过滤链条处理,然后最终进入了 DubboInvoker 的 invoke() 方法,在这里会先从线程池获取一个线程,然后将前面初始化好的 Netty 客户端发起远程连接并且传输数据到服务提供者。
Netty 的参数优化
口试官:前面说到 Dubbo 是使用 Netty 作为通讯框架,那么使用 Netty 有什么好处?
这个口试题实在就是在问 Netty 的使用,并且最好能将一些 Netty 的参数调优表现出来。 前面我们提到的,服务提供者的核心实在是初始化了 Netty 服务端并且将 IP 和端口上报给注册中心,然后服务消耗者就从注册中心获取接口类对应的 IP 和端口,便可以通过 Netty 客户端连接到服务端,将数据传输到服务端进行处理,处理完成后吸收返回的数据,转换为接口方法的返回类型,返回给该方法。 因此我们先看一下 Netty 服务端的初始化:
// 设置Netty的boss线程池和worker线程池bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");workerGroup = NettyEventLoopFactory.eventLoopGroup( getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker");首先这里是 reactor 线程模型的使用, 这里设置了2个线程池,bossGroup主要用于处理客户端的accept连接、断开连接等,workerGroup处理数据的传输,包括read、write事件和pipeline链条中的handler。 简朴提一下,reactor 线程模型就是将线程池的职责分开,处理连接的属于低频操纵且阻塞时间相对较长(因为需要进行 TCP 3次握手),处理数据的 IO 传输属于高频且和业务相干性精密,使用互不干扰的线程池可以提高效率。 接下来看一下参数的设置:
// 设置netty的业务处理类final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopFactory.serverSocketChannelClass()) //一样平常来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用,用于断开后的重连。 .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE) // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性 .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)实在除了上面那几个参数外,还有一些比较重要的参数在这块源码中没有表现,因此我从Seata(分布式事务框架)将相干的源码摘出来:
this.serverBootstrap .group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) // TCP 3次握手的队列缓冲区巨细 .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) //一样平常来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。 .option(ChannelOption.SO_REUSEADDR, true) // 连接保活 .childOption(ChannelOption.SO_KEEPALIVE, true) // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送 .childOption(ChannelOption.TCP_NODELAY, true) // 发送数据缓冲区巨细 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) // 吸收数据缓冲区巨细 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) // 控制网络水位,将网络传输速度维持在比较安稳的状态 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())) .localAddress(new InetSocketAddress(listenPort))而这个参数 WRITEBUFFERWATER_MARK 是属于新版本才有的,原理大概就是设置一个高水位、一个低水位,当输出的数据速率高于高水位时,则暂停一下 write 事件,改为处理 read 事件,这样就可以控制网络传输的速度比较安稳,克制大流量打死网卡。 最后看一下处理的链条:
.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 默认不启用SSL if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() // 解码器 .addLast("decoder", adapter.getDecoder()) // 编码器 .addLast("encoder", adapter.getEncoder()) // 心跳检查handler .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) // 业务处理handler .addLast("handler", nettyServerHandler); }});// bind// 绑定本地端口,并启动监听服务ChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();这里我们可以看到 pipeline() 里面加入了很多 handler,实在这里就是使用了责任链设计模式,吸收到的网络请求,都会依次使用每个 handler 处理一下。 编解码器涉及到了怎样处理网络的粘包、拆包的处理。 IdleStateHandler 心跳处理器,这个 handler 实在是 Netty 提供的,目标是维持 server 端和 client 端的长连接,如果没有设置这个 handler,就会经常出现 TCP 的超时时间到了,然后客户端直接断开和服务端的连接,这个笔者之前遇到过这个 bug,现象就是创建连接一段时间之后,没有新的网络传输的时候,莫名其妙地断开了连接。
下面我们再看一下 Netty 客户端的初始化:
protected void doOpen() throws Throwable { // 1、创建业务handler final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // 2、创建启动器并配置 bootstrap = new Bootstrap(); bootstrap.group(NIO_EVENT_LOOP_GROUP) // 连接保活 .option(ChannelOption.SO_KEEPALIVE, true) // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性 .option(ChannelOption.TCP_NODELAY, true) // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) .channel(socketChannelClass()); // 设置连接超时 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout())); // 3、添加handler到连接的pipeline bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); if (getUrl().getParameter(SSL_ENABLED_KEY, false)) { ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler)); } NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug // 解码器 .addLast("decoder", adapter.getDecoder()) // 编码器 .addLast("encoder", adapter.getEncoder()) // 心跳检查handler .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) // 业务处理handler .addLast("handler", nettyClientHandler); String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST); if(socksProxyHost != null) { int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT)); Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort)); ch.pipeline().addFirst(socks5ProxyHandler); } } });}客户端的参数与服务端的大同小异,需要注意的是客户端只需要一个线程池而已。
编解码器怎样处理粘包和拆包
口试官:怎样办理 TCP 网络传输中的拆包和粘包?
拆包是指在网络传输过程中,一份数据被拆分为多次传输,每次只传输了一部分。粘包是指在网络传输中,两份数据合并在一起传输过去了。Dubbo 的网络拆包和粘包的处理是通过在 Netty 的处理链条中添加的编解码器实现的。 Dubbo 的编码器是 DubboCodec 的父类 ExchangeCodec 实现的。我们看一下代码:
public class ExchangeCodec extends TelnetCodec { @Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { // 1、对请求信息进行编码 if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } // 2、对响应信息进行编码 else if (msg instanceof Response) { encodeResponse(channel, buffer, (Response) msg); } // 3、其他信息进行编码 else { super.encode(channel, buffer, msg); } } protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { // 获取序列化扩展实现 Serialization serialization = getSerialization(channel); // 创建Dubbo协议扩展头字节数组,HEADER_LENGTH=16 // header. byte[] header = new byte[HEADER_LENGTH]; // 把魔数0xdabb写入协议头 // set magic number. Bytes.short2bytes(MAGIC, header); // 设置请求类型与序列化类型,标记到协议头 // set request and serialization flag. header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) { header[2] |= FLAG_TWOWAY; } if (req.isEvent()) { header[2] |= FLAG_EVENT; } // 将请求id设置到协议头 // set request id. Bytes.long2bytes(req.getId(), header, 4); // 使用serialization将对象数据部分进行编码,并把协议数据部分写入缓存 // encode request data. int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { encodeRequestData(channel, out, req.getData(), req.getVersion()); } // 刷新缓存 out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); // 检查payload(协议数据部分)是否合法 int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // 将协议头写入缓存 // write buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException { int savedWriteIndex = buffer.writerIndex(); try { // 获取序列化扩展实现 Serialization serialization = getSerialization(channel); // 创建Dubbo协议扩展头字节数组, HEADER_LENGTH为 16 // header. byte[] header = new byte[HEADER_LENGTH]; // 把魔数写入协议头 // set magic number. Bytes.short2bytes(MAGIC, header); // 设立请求类型与序列化类型,标记到协议头 // set request and serialization flag. header[2] = serialization.getContentTypeId(); if (res.isHeartbeat()) { header[2] |= FLAG_EVENT; } // 设置响应类型到第4字节位置 // set response status. byte status = res.getStatus(); header[3] = status; // 将请求ID设直到协议头 // set request id. Bytes.long2bytes(res.getId(), header, 4); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 使用serialization对数据部分进行编码,并把协议数据部分写入缓存 ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // encode response data or error message. if (status == Response.OK) { if (res.isHeartbeat()) { encodeEventData(channel, out, res.getResult()); } else { encodeResponseData(channel, out, res.getResult(), res.getVersion()); } } else { out.writeUTF(res.getErrorMessage()); } // 刷新缓存 out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); // 检查payload (协议数据部分)是否合法 checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // 将协议头写入缓存 // write buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } catch (Throwable t) { // clear buffer buffer.writerIndex(savedWriteIndex); ...... } }}上面的代码实在可以看到 Dubbo 协议的组成,分为 2 大块:header 和 data 部分。header 总包罗了 16 字节,前 2 字节为魔数,标记一个数据帧的开始,然后是1字节的请求类型和序列化标记 id,然后 1 字节是只在响应报文设置的效果码,然后 8 字节是请求 id,最后 4 字节是 body 内容的巨细。 接下来看下解码的部分,同时可以从里面学习到对于网络拆包粘包的处理。InternalDecoder 就是解码的内部实现类。
private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List out) throws Exception { ChannelBuffer message = new NettyBackedChannelBuffer(input); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); // decode object. do { // 先保存未读取之前的位置 int saveReaderIndex = message.readerIndex(); // 调用DubboCodec对数据进行解码 Object msg = codec.decode(channel, message); // 如果遇到拆包,则重置message为之前的位置 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } // 否则,走到这里就是读取乐成的 if (msg != null) { // 把解码乐成的对象放入out列表 out.add(msg); } } } while (message.readable()); }}这里我们可以看到 Dubbo 对于拆包的处理,实在就是判断一下是否遇到了 Codec2.DecodeResult.NEEDMOREINPUT,如果遇到了则放弃前面读取的部分,然后等待下次 read 通道里面的数据。至于粘包的处理,因为这里读取 header 的时候,都是按照固定从长度读取,并且读取 data 的时候,也是按照 header 里面指定的长度读取的,所以读到的效果肯定是完整的,不会出现多余的字节,如果不完整就是走拆包的处理逻辑。 接下来 codec.decode() 就是开始分析协议的内容了:
@Overridepublic Object decode(Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); byte[] header = new byte[Math.min(readable, HEADER_LENGTH)]; // 先读取协议头 buffer.readBytes(header); //分析Dubbo协议数据部分 return decode(channel, buffer, readable, header);}@Overrideprotected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException { // 检查魔数,确认为Dubbo协议帧 // check magic number. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1; i < header.length - 1; i++) { if (header == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break; } } return super.decode(channel, buffer, readable, header); } // 检查是否读取了一个完整的Dubbo协议头 // check length. if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } // 从协议头的最后四个字节读取协议数据部分的巨细 // get data length. int len = Bytes.bytes2int(header, 12); checkPayload(channel, len); // 如采遇到半包问题,则直接返回 int tt = len + HEADER_LENGTH; if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } // 分析协议数据部分 // limit input stream. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { // 这里调用的是子类DubboCodec return decodeBody(channel, is, header); } finally { if (is.available() > 0) { try { if (logger.isWarnEnabled()) { logger.warn("Skip input stream " + is.available()); } StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } }} |