摘要:
本文主要解说Dubbo消费者怎样注册到Zookeeper,怎样监听Dubbo提供者的Zookeeper临时目录,并及时更新提供者的注册列表,最后提到Dubbo Consumer的关键步骤,即启动Netty客户端。
消费者注册到注册中心
首先看一下消费者是怎样注册到Zookeeper的,而且生成关键的Invoker对象。
private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { RegistryDirectory directory = new RegistryDirectory(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // ...... // 默认必要注册到注册中心 if (directory.isShouldRegister()) { // 设置消费者的url,然后将该url注册到zk directory.setRegisteredConsumerUrl(subscribeUrl); registry.register(directory.getRegisteredConsumerUrl()); } // 创建路由规则链 directory.buildRouterChain(subscribeUrl); // 订阅服务提供者地点 // 从zk拉取服务提供者的地点,并缓存到本地 directory.subscribe(toSubscribeUrl(subscribeUrl)); // 包装机器容错策略到invoker Invoker invoker = cluster.join(directory); List listeners = findRegistryProtocolListeners(url); if (CollectionUtils.isEmpty(listeners)) { return invoker; } RegistryInvokerWrapper registryInvokerWrapper = new RegistryInvokerWrapper(directory, cluster, invoker); for (RegistryProtocolListener listener : listeners) { listener.onRefer(this, registryInvokerWrapper); } return registryInvokerWrapper;}这里我们先关注一下registry.register(directory.getRegisteredConsumerUrl())。这部分代码就是注册到zk的地方。
public abstract class FailbackRegistry extends AbstractRegistry { @Override public void register(URL url) { //...... super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side // 发送注册服务请求到注册中心 doRegister(url); } //...... }} public class ZookeeperRegistry extends FailbackRegistry { @Override public void doRegister(URL url) { try { // 将服务注册到zk zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }}可以看到上面的代码其实跟服务提供者注册是一样的,就是构造一下url,然后作为一个临时节点写入到zk的指定目录下,即consumer目录。这样就完成了消费者的注册,后续假如消费者下线或者网络断开了,Zookeeper都会将该临时节点删除,而且通知有监听该目录的机器节点。
下面我们继承看一下消费者是怎样订阅服务提供者的地点列表,而且注册好Zookeeper的监听,后续有变化则会收到相应的通知,然后更新本地的提供者地点列表。
订阅服务提供者和注册监听
接下去继承看directory.subscribe(),用于订阅服务提供者地点。
public class RegistryDirectory extends AbstractDirectory implements NotifyListener { public void subscribe(URL url) { setConsumerUrl(url); CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this); // 设置zk的监听器,假如服务提供者的列表出现变化,本地的缓存也会相应更新 serviceConfigurationListener = new ReferenceConfigurationListener(this, url); // 订阅提供者 registry.subscribe(url, this); }} @Overridepublic void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // Sending a subscription request to the server side // 向zk发送订阅请求 doSubscribe(url, listener); } catch (Exception e) { //...... }} @Overridepublic void doSubscribe(final URL url, final NotifyListener listener) { try { if (ANY_VALUE.equals(url.getServiceInterface())) { //...... } // 默认走这里 else { List urls = new ArrayList(); // 将消费者的url写入到zk的临时节点 for (String path : toCategoriesPath(url)) { ConcurrentMap listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds))); zkClient.create(path, false); List children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 触发监听者 notify(url, listener, urls); } } //......}上面的代码其实就是注册一下zk目录的监听,然后将消费者的url注册到zk中,最后的关键就是触发了监听,然后前面设置的监听类RegistryDirectory就会实行notify。
/*** 接收zk节点变化通知*/@Overridepublic synchronized void notify(List urls) { //...... // providers // 获取zk中最新的提供者 List providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); //...... // 刷新invoker列表 refreshOverrideAndInvoker(providerURLs);}private void refreshOverrideAndInvoker(List urls) { // mock zookeeper://xxx?mock=return null overrideDirectoryUrl(); // 转换url为invoker refreshInvoker(urls);}private void refreshInvoker(List invokerUrls) { Assert.notNull(invokerUrls, "invokerUrls should not be null"); // 只有一个服务提供者时 //...... // 有多个提供者时 else { //...... // url转换为invoker对象 Map newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map //...... List newInvokers = Collections.unmodifiableList(new ArrayList(newUrlInvokerMap.values())); // routerChain设置newInvokers routerChain.setInvokers(newInvokers); this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; this.urlInvokerMap = newUrlInvokerMap; // 关闭没用到的invokers try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } //...... }}其实就是从zk中获取所有的设置信息,包括降级设置、提供者、消费者信息,然后将invokerUrls转换为invoker对象,这一步是关键。因此我们继承深入toInvokers()方法查看具体Dubbo怎样生成invoker对象,也就是所谓的Dubbo消费者接口署理对象。
private Map toInvokers(List urls) { //...... for (URL providerUrl : urls) { //...... if (invoker == null) { // Not in the cache, refer again try { boolean enabled = true; if (url.hasParameter(DISABLED_KEY)) { enabled = !url.getParameter(DISABLED_KEY, false); } else { enabled = url.getParameter(ENABLED_KEY, true); } if (enabled) { // 关键代码:这里调用Dubbo协议转换服务到invoker invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl); } } //...... } keys.clear(); return newUrlInvokerMap;}从上面的这一坨代码中,关键的就是这一行:protocol.refer(serviceType, url),这里还是跟之前的一样先经过2个Wrapper类之后,就会调用DubboProtocol的refer()方法。
DubboProtocol创建连接
接下来继承深入DubboProtocol的refer()方法。
public class DubboProtocol extends AbstractProtocol { @Override public Invoker protocolBindingRefer(Class serviceType, URL url) throws RpcException { //...... // 关键代码:getClients() DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } private ExchangeClient[] getClients(URL url) { // 同一台机器中的差别服务是否共享连接 boolean useShareConnect = false; //...... // 假如没设置,则默认连接是共享的,否则每个服务单独有本身的连接 if (connections == 0) { useShareConnect = true; /* * xml设置优先级高于属性设置 */ String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null); connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr); // 获取共享NettyClient shareClients = getSharedClient(url, connections); } // 初始化Client ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (useShareConnect) { // 共享则返回已经存在的 clients = shareClients.get(i); } else { // 否则创建新的 clients = initClient(url); } } return clients; } /** * Create new connection * * @param url */ private ExchangeClient initClient(URL url) { //..... ExchangeClient client; try { // connection should be lazy // 创建惰性连接 if (url.getParameter(LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } // 创建即时连接 else { client = Exchangers.connect(url, requestHandler); } } //...... return client; }}上面其实看看是否同一台机器上面的服务共享一个netty客户端,然后根据url去创建连接。所以继承跟进关键代码:Exchangers.connect(url, requestHandler)。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { //...... url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler);}public class HeaderExchanger implements Exchanger { @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); }} public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { //...... return getTransporter().connect(url, handler);} public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public Client connect(URL url, ChannelHandler handler) throws RemotingException { return new NettyClient(url, handler); }}看到这里就很认识了,就是层层递进去找到NettyClient,然后初始化netty客户端而已。
Netty客户端初始化和参数优化
接下来我们看一下Netty客户端怎样初始化,而且Dubbo怎样在使用Netty客户端的时间进行参数优化的。
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { //...... try { // 关键代码:创建netty客户端 doOpen(); } try { // connect. connect(); } //......} /*** 初始化netty客户端*/@Overrideprotected void doOpen() throws Throwable { // 1、创建业务handler final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // 2、创建启动器并设置 bootstrap = new Bootstrap(); bootstrap.group(NIO_EVENT_LOOP_GROUP) // 连接保活 .option(ChannelOption.SO_KEEPALIVE, true) // 不启用开启Nagle算法,Nagle算法会网络网络小包再一次性发送,不启用则是即时发送,进步时效性 .option(ChannelOption.TCP_NODELAY, true) // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) .channel(socketChannelClass()); // 设置连接超时 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout())); // 3、添加handler到连接的pipeline bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { int heartbeatInterval = UrlUtils.getHeartbeat(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug // 解码器 .addLast("decoder", adapter.getDecoder()) // 编码器 .addLast("encoder", adapter.getEncoder()) // 心跳查抄handler .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) // 业务处理handler .addLast("handler", nettyClientHandler); //...... } });}上面的代码就表现了一个Netty客户端的典型用法,设置worker线程池,参数设置了KEEPALIVE、NODELAY、CONNECT_TIMEOUT、ALLOCATOR这几个紧张参数,然后添加编解码、心跳和业务处理的handler。
其中参数调优主要是ALLOCATOR的设置,应用了Netty的对象池以便充分使用内存块,提升ByteBuf的分配效率。
更多详细和完整的文章内容请订阅CSDN专栏:Dubbo源码深度分析_Gemini的博客-CSDN博客
假如您感觉本文对您有所帮助的话,请先关注一下作者,转发一下本文,然后私信发“口试题整理资料”给作者,即可获得获得一份丰富的口试题整理资料,里面收录的口试题多达几百道,感谢您的阅读。 |