跟踪到RetryLoadBalancerInterceptor类
/**
 * Intercepts an outgoing HTTP request and runs it through a retry template.
 *
 * The service name is taken from the host part of the request URI. Inside the
 * retry callback a service instance is taken from the retry context when one is
 * present, otherwise it is chosen through the load balancer; the request is then
 * executed against that instance. If the retry policy reports the response status
 * code as retryable, the response body is copied, the response is closed and a
 * ClientHttpResponseStatusCodeException is thrown.
 *
 * @param request   the outgoing request; its URI host is used as the service name
 * @param body      the request body bytes
 * @param execution the request execution passed on to the request factory
 * @return the response returned by the retry template
 * @throws IOException declared by the interceptor signature
 */
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
        final ClientHttpRequestExecution execution) throws IOException {
    final URI originalUri = request.getURI();
    // Get the service name
    final String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    // Built from serviceName and the LoadBalancerClient; per the walkthrough, the
    // LoadBalancedRetryPolicy wraps a RibbonLoadBalancerContext and a ServiceInstanceChooser
    final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName, loadBalancer);
    RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);
    // execute(...) ends up in the doExecute method
    return template.execute(context -> {
        ServiceInstance serviceInstance = null;
        if (context instanceof LoadBalancedRetryContext) {
            LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;
            serviceInstance = lbContext.getServiceInstance();
        }
        // Fall back to the load balancer when the retry context carried no instance
        if (serviceInstance == null) {
            serviceInstance = loadBalancer.choose(serviceName);
        }
        ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute(
                serviceName, serviceInstance, requestFactory.createRequest(request, body, execution));
        int statusCode = response.getRawStatusCode();
        if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {
            // Copy the body before closing so the exception can still carry it
            byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());
            response.close();
            throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);
        }
        return response;
    }, new LoadBalancedRecoveryCallback() {
        //This is a special case, where both parameters to LoadBalancedRecoveryCallback are
        //the same. In most cases they would be different.
        @Override
        protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {
            return response;
        }
    });
}
复制代码doExecute方法:
// Excerpt of the retry template's doExecute method; the walkthrough elides most of the
// body and the excerpt ends before the method is closed.
// NOTE(review): generic type parameters (T, E) appear to have been stripped from this
// excerpt — confirm against the upstream source.
protected T doExecute(RetryCallback retryCallback, RecoveryCallback recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
    // Part of the code is omitted here
    /*
     * We allow the whole loop to be skipped if the policy or context already
     * forbid the first try. This is used in the case of external retry to allow a
     * recovery in handleRetryExhausted without the callback processing (which
     * would throw an exception).
     */
    // Key part of the execution logic: loop while the policy still allows a retry
    while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
    }
复制代码继续跟踪canRetry方法
/**
 * Decides whether another attempt may be made for the given retry context.
 *
 * On the very first call (retry count 0 and no service instance set yet) a service
 * instance is chosen, stored on the context, and {@code true} is returned. Otherwise
 * the decision is delegated to {@code policy.canRetryNextServer}.
 *
 * @param context the retry context; cast to LoadBalancedRetryContext without a check
 * @return whether a (further) attempt is allowed
 */
@Override
public boolean canRetry(RetryContext context) {
    LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext)context;
    if(lbContext.getRetryCount() == 0 && lbContext.getServiceInstance() == null) {
        //We haven't even tried to make the request yet so return true so we do
        // Store the selected service provider on the context
        lbContext.setServiceInstance(serviceInstanceChooser.choose(serviceName));
        return true;
    }
    return policy.canRetryNextServer(lbContext);
}
复制代码我们跟踪serviceInstanceChooser.choose(serviceName)看看怎么通过serviceName选服务提供者的。
/**
 * Chooses a service instance for the given service id.
 *
 * @param serviceId the id of the service to pick an instance for
 * @return a RibbonServer wrapping the selected server, or {@code null} when
 *         getServer returns no server
 */
@Override
public ServiceInstance choose(String serviceId) {
    // Select a server
    Server server = getServer(serviceId);
    if (server == null) {
        return null;
    }
    return new RibbonServer(serviceId, server, isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
}
复制代码跟踪getServer方法
/**
 * Asks the given load balancer for a server, using the fixed key "default".
 *
 * @param loadBalancer the load balancer to query; may be {@code null}
 * @return the chosen server, or {@code null} when no load balancer is given
 */
protected Server getServer(ILoadBalancer loadBalancer) {
    if (loadBalancer == null) {
        return null;
    }
    // As can be seen, the actual selection is done by the load balancer
    return loadBalancer.chooseServer("default"); // TODO: better handling of key
}
复制代码继续深入
/**
 * Chooses a server for the given key by delegating to the configured rule.
 *
 * @param key the load-balancing key passed through to the rule
 * @return the server picked by the rule, or {@code null} when no rule is set or
 *         the rule throws (the failure is logged as a warning)
 */
public Server chooseServer(Object key) {
    // Lazily create the counter on first use
    if (counter == null) {
        counter = createCounter();
    }
    // Increment the invocation counter
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            // Delegates to the IRule, so IRule is the key to load balancing;
            // the walkthrough summarizes it at the end
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}
复制代码查看Irule的实现
/**
 * Picks a server by filtering the load balancer's full server list through the
 * rule's predicate and choosing among the remaining servers.
 *
 * @param key the load-balancing key handed to the predicate
 * @return the selected server, or {@code null} when the predicate yields none
 */
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    // lb.getAllServers() holds the full list of service providers
    // NOTE(review): the Optional's type parameter appears to have been stripped
    // from this excerpt — confirm against the upstream source
    Optional server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }
}
复制代码跟踪chooseRoundRobinAfterFiltering方法
/**
 * Filters the given servers and returns one of the eligible ones.
 *
 * @param servers         the candidate servers
 * @param loadBalancerKey the key passed to the eligibility filter
 * @return an Optional holding the chosen server, or an absent Optional when no
 *         server is eligible
 */
public Optional chooseRoundRobinAfterFiltering(List servers, Object loadBalancerKey) {
    // Get the servers that survive filtering
    List eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    // incrementAndGetModulo yields an index; list.get(index) picks the server
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
复制代码至此就拿到了具体的服务提供者。
但是到这里还有个问题?
怎么根据服务名拿到server的?
有一个ServerList接口是用于拿到服务列表的。我们使用的loadBalancer(ZoneAwareLoadBalancer)的父类DynamicServerListLoadBalancer类的构造方法里,调用了一个restOfInit方法:
/**
 * Creates the load balancer, stores the server list source, filter and updater,
 * and then runs restOfInit with the client configuration.
 *
 * @param clientConfig      the client configuration, passed to the superclass and to restOfInit
 * @param rule              the load-balancing rule, passed to the superclass
 * @param ping              the ping implementation, passed to the superclass
 * @param serverList        the source used to obtain the server list
 * @param filter            the filter applied to the server list
 * @param serverListUpdater the updater stored for the server list
 */
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
        ServerList serverList, ServerListFilter filter, ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    // Filters of this type additionally receive the load balancer statistics
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    restOfInit(clientConfig);
}
复制代码跟踪restOfInit方法
/**
 * Finishes initialization: loads the server list and, if priming was enabled,
 * primes connections to the reachable servers.
 *
 * The priming flag is remembered, switched off while the list is loaded, and
 * restored afterwards.
 *
 * @param clientConfig the client configuration; only its client name is read here, for logging
 */
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();
    // Fetches the full server list
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    // Restore the original priming setting
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
复制代码继续跟踪updateListOfServers方法
/**
 * Refreshes the server list: obtains the current servers from the server list
 * source, applies the filter when one is set, and passes the result to
 * updateAllServerList. When no server list source is set, an empty list is passed.
 */
public void updateListOfServers() {
    List servers = new ArrayList();
    if (serverListImpl != null) {
        // Query the server list
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}
复制代码继续跟踪源码到obtainServersViaDiscovery方法,
/**
 * Builds the server list from Eureka.
 *
 * For each comma-separated entry in vipAddresses, the instances registered under
 * that address are fetched from the EurekaClient; every instance whose status is UP
 * becomes a DiscoveryEnabledServer (optionally with an overridden port) and is
 * added to the result.
 *
 * @return the discovered servers; an empty list when the EurekaClient is not
 *         available yet or vipAddresses is {@code null}
 */
private List obtainServersViaDiscovery() {
    List serverList = new ArrayList();
    // eurekaClientProvider.get() obtains the EurekaClient
    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList();
    }
    EurekaClient eurekaClient = eurekaClientProvider.get();
    // Per the walkthrough, vipAddresses is the service name
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            // This is where the service's instance information is fetched
            List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                if (ii.getStatus().equals(InstanceStatus.UP)) {
                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }
                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);
                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }
                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}
复制代码综合上面可以看出,最终是通过eurekaClient去拿到服务列表的。
那么如果服务列表发生变化,怎么刷新呢?
是通过CacheRefreshThread在定时线程池里面执行,核心拉取方法是fetchRegistry。
IPing
IPing用于探测服务列表中的服务是否正常,如果不正常,则从Eureka拉取服务列表并更新。
在BaseLoadBalancer里面有一个setupPingTask方法,启动定时任务,30秒一次定时向EurekaClient发送“ping”
/**
 * Creates a load balancer with the given name, rule, statistics holder, ping
 * implementation and ping strategy, and starts the ping task via setupPingTask.
 *
 * @param name         the load balancer name (also used in the debug log line)
 * @param rule         the load-balancing rule, applied through setRule
 * @param stats        the statistics object stored as lbStats
 * @param ping         the ping implementation
 * @param pingStrategy the strategy used when pinging
 */
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats, IPing ping, IPingStrategy pingStrategy) {
    logger.debug("LoadBalancer [{}]: initialized", name);
    this.name = name;
    this.ping = ping;
    this.pingStrategy = pingStrategy;
    setRule(rule);
    // Starts the periodic ping task
    setupPingTask();
    lbStats = stats;
    init();
}
复制代码Iping的具体逻辑在PingTask类里。 Irule总结:
IRule是负载均衡的规则:
我这里默认使用的是ZoneAvoidanceRule,此外还有许多种策略: