Gemini技术窝 发表于 2021-9-30 23:14:19

Dubbo源码深度分析:消费者初始化和Netty客户端应用

摘要:
本文主要解说Dubbo消费者怎样注册到Zookeeper,怎样监听Dubbo提供者的Zookeeper临时目录,并及时更新提供者的注册列表,最后提到Dubbo Consumer的关键步骤,即启动Netty客户端。


消费者注册到注册中心

首先看一下消费者是怎样注册到Zookeeper的,而且生成关键的Invoker对象。
privateInvoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {    RegistryDirectory directory = new RegistryDirectory(type, url);    directory.setRegistry(registry);    directory.setProtocol(protocol);    // ......    // 默认必要注册到注册中心    if (directory.isShouldRegister()) {      // 设置消费者的url,然后将该url注册到zk      directory.setRegisteredConsumerUrl(subscribeUrl);      registry.register(directory.getRegisteredConsumerUrl());    }    // 创建路由规则链    directory.buildRouterChain(subscribeUrl);    // 订阅服务提供者地点    // 从zk拉取服务提供者的地点,并缓存到本地    directory.subscribe(toSubscribeUrl(subscribeUrl));   // 包装机器容错策略到invoker    Invoker invoker = cluster.join(directory);    List listeners = findRegistryProtocolListeners(url);    if (CollectionUtils.isEmpty(listeners)) {      return invoker;    }   RegistryInvokerWrapper registryInvokerWrapper = new RegistryInvokerWrapper(directory, cluster, invoker);    for (RegistryProtocolListener listener : listeners) {      listener.onRefer(this, registryInvokerWrapper);    }    return registryInvokerWrapper;}这里我们先关注一下registry.register(directory.getRegisteredConsumerUrl())。这部分代码就是注册到zk的地方。
public abstract class FailbackRegistry extends AbstractRegistry {    @Override    public void register(URL url) {      //......      super.register(url);      removeFailedRegistered(url);      removeFailedUnregistered(url);      try {            // Sending a registration request to the server side            // 发送注册服务请求到注册中心            doRegister(url);      } //......    }} public class ZookeeperRegistry extends FailbackRegistry {    @Override    public void doRegister(URL url) {      try {            // 将服务注册到zk            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));      } catch (Throwable e) {            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);      }    }}可以看到上面的代码其实跟服务提供者注册是一样的,就是构造一下url,然后作为一个临时节点写入到zk的指定目录下,即consumer目录。这样就完成了消费者的注册,后续假如消费者下线或者网络断开了,Zookeeper都会将该临时节点删除,而且通知有监听该目录的机器节点。
下面我们继承看一下消费者是怎样订阅服务提供者的地点列表,而且注册好Zookeeper的监听,后续有变化则会收到相应的通知,然后更新本地的提供者地点列表。
订阅服务提供者和注册监听

接下去继承看directory.subscribe(),用于订阅服务提供者地点。
public class RegistryDirectory extends AbstractDirectory implements NotifyListener {    public void subscribe(URL url) {      setConsumerUrl(url);      CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);      // 设置zk的监听器,假如服务提供者的列表出现变化,本地的缓存也会相应更新      serviceConfigurationListener = new ReferenceConfigurationListener(this, url);      // 订阅提供者      registry.subscribe(url, this);    }} @Overridepublic void subscribe(URL url, NotifyListener listener) {    super.subscribe(url, listener);    removeFailedSubscribed(url, listener);    try {      // Sending a subscription request to the server side      // 向zk发送订阅请求      doSubscribe(url, listener);    } catch (Exception e) {      //......    }} @Overridepublic void doSubscribe(final URL url, final NotifyListener listener) {    try {      if (ANY_VALUE.equals(url.getServiceInterface())) {            //......      }      // 默认走这里      else {            List urls = new ArrayList();            // 将消费者的url写入到zk的临时节点            for (String path : toCategoriesPath(url)) {                ConcurrentMap listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap());                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));                zkClient.create(path, false);                List children = zkClient.addChildListener(path, zkListener);                if (children != null) {                  urls.addAll(toUrlsWithEmpty(url, path, children));                }            }            // 触发监听者            notify(url, listener, urls);      }    } //......}上面的代码其实就是注册一下zk目录的监听,然后将消费者的url注册到zk中,最后的关键就是触发了监听,然后前面设置的监听类RegistryDirectory就会实行notify。
/*** 接收zk节点变化通知*/@Overridepublic synchronized void notify(List urls) {    //......    // providers    // 获取zk中最新的提供者    List providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());    //......    // 刷新invoker列表    refreshOverrideAndInvoker(providerURLs);}private void refreshOverrideAndInvoker(List urls) {    // mock zookeeper://xxx?mock=return null    overrideDirectoryUrl();    // 转换url为invoker    refreshInvoker(urls);}private void refreshInvoker(List invokerUrls) {    Assert.notNull(invokerUrls, "invokerUrls should not be null");   // 只有一个服务提供者时    //......    // 有多个提供者时    else {      //......      // url转换为invoker对象      Map newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map         //......      List newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values()));      // routerChain设置newInvokers      routerChain.setInvokers(newInvokers);      this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;      this.urlInvokerMap = newUrlInvokerMap;         // 关闭没用到的invokers      try {            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker      } //......    }}其实就是从zk中获取所有的设置信息,包括降级设置、提供者、消费者信息,然后将invokerUrls转换为invoker对象,这一步是关键。因此我们继承深入toInvokers()方法查看具体Dubbo怎样生成invoker对象,也就是所谓的Dubbo消费者接口署理对象。
private Map toInvokers(List urls) {    //......    for (URL providerUrl : urls) {      //......      if (invoker == null) { // Not in the cache, refer again            try {                boolean enabled = true;                if (url.hasParameter(DISABLED_KEY)) {                  enabled = !url.getParameter(DISABLED_KEY, false);                } else {                  enabled = url.getParameter(ENABLED_KEY, true);                }                if (enabled) {                  // 关键代码:这里调用Dubbo协议转换服务到invoker                  invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);                }            } //......    }    keys.clear();    return newUrlInvokerMap;}从上面的这一坨代码中,关键的就是这一行:protocol.refer(serviceType, url),这里还是跟之前的一样先经过2个Wrapper类之后,就会调用DubboProtocol的refer()方法。
DubboProtocol创建连接

接下来继承深入DubboProtocol的refer()方法。
public class DubboProtocol extends AbstractProtocol {    @Override    publicInvoker protocolBindingRefer(Class serviceType, URL url) throws RpcException {      //......      // 关键代码:getClients()      DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers);      invokers.add(invoker);         return invoker;    }   private ExchangeClient[] getClients(URL url) {      // 同一台机器中的差别服务是否共享连接      boolean useShareConnect = false;         //......      // 假如没设置,则默认连接是共享的,否则每个服务单独有本身的连接      if (connections == 0) {            useShareConnect = true;             /*             * xml设置优先级高于属性设置             */            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,                  DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);            // 获取共享NettyClient            shareClients = getSharedClient(url, connections);      }         // 初始化Client      ExchangeClient[] clients = new ExchangeClient;      for (int i = 0; i < clients.length; i++) {            if (useShareConnect) {                // 共享则返回已经存在的                clients = shareClients.get(i);             } else {                // 否则创建新的                clients = initClient(url);            }      }         return clients;    }   /**   * Create new connection   *   * @param url   */    private ExchangeClient initClient(URL url) {      //.....      ExchangeClient client;      try {            // connection should be lazy            // 创建惰性连接            if (url.getParameter(LAZY_CONNECT_KEY, false)) {                client = new LazyConnectExchangeClient(url, requestHandler);             }            // 创建即时连接            else {                client = Exchangers.connect(url, requestHandler);            }         } //......      return client;    }}上面其实看看是否同一台机器上面的服务共享一个netty客户端,然后根据url去创建连接。所以继承跟进关键代码:Exchangers.connect(url, requestHandler)。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {    //......    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");    return getExchanger(url).connect(url, handler);}public class HeaderExchanger implements Exchanger {@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {      return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}} public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {    //......    return getTransporter().connect(url, handler);} public class NettyTransporter implements Transporter {    public static final String NAME = "netty";    @Override    public Client connect(URL url, ChannelHandler handler) throws RemotingException {      return new NettyClient(url, handler);    }}看到这里就很认识了,就是层层递进去找到NettyClient,然后初始化netty客户端而已。
Netty客户端初始化和参数优化

接下来我们看一下Netty客户端怎样初始化,而且Dubbo怎样在使用Netty客户端的时间进行参数优化的。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {    //......    try {      // 关键代码:创建netty客户端      doOpen();    }   try {      // connect.      connect();    } //......} /*** 初始化netty客户端*/@Overrideprotected void doOpen() throws Throwable {    // 1、创建业务handler    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);    // 2、创建启动器并设置    bootstrap = new Bootstrap();    bootstrap.group(NIO_EVENT_LOOP_GROUP)            // 连接保活            .option(ChannelOption.SO_KEEPALIVE, true)            // 不启用开启Nagle算法,Nagle算法会网络网络小包再一次性发送,不启用则是即时发送,进步时效性            .option(ChannelOption.TCP_NODELAY, true)            // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())            .channel(socketChannelClass());   // 设置连接超时    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));    // 3、添加handler到连接的pipeline    bootstrap.handler(new ChannelInitializer() {         @Override      protected void initChannel(SocketChannel ch) throws Exception {            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());             NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug                  // 解码器                  .addLast("decoder", adapter.getDecoder())                  // 编码器                  .addLast("encoder", adapter.getEncoder())                  // 心跳查抄handler                  .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))                  // 业务处理handler                  .addLast("handler", nettyClientHandler);             //......      }    });}上面的代码就表现了一个Netty客户端的典型用法,设置worker线程池,参数设置了KEEPALIVE、NODELAY、CONNECT_TIMEOUT、ALLOCATOR这几个紧张参数,然后添加编解码、心跳和业务处理的handler。
其中参数调优主要是ALLOCATOR的设置,应用了Netty的对象池以便充分使用内存块,提升ByteBuf的分配效率。
更多详细和完整的文章内容请订阅CSDN专栏:Dubbo源码深度分析_Gemini的博客-CSDN博客
假如您感觉本文对您有所帮助的话,请先关注一下作者,转发一下本文,然后私信发“口试题整理资料”给作者,即可获得获得一份丰富的口试题整理资料,里面收录的口试题多达几百道,感谢您的阅读。
页: [1]
查看完整版本: Dubbo源码深度分析:消费者初始化和Netty客户端应用