再不看就删了!超详细的Ribbon源码解析
Ribbon简介什么是Ribbon?
Ribbon是springcloud下的客户端负载均衡器,消费者在通过服务别名调用服务时,须要通过Ribbon做负载均衡获取现实的服务调用地址,然后通过httpclient的方式举行当地RPC远程调用。
Ribbon原理
Ribbon负载均衡算法主要是轮询算法,分为以下几步:
[*]根据服务别名,从eureka获取服务提供者的列表
[*]将列表缓存到当地
[*]根据具体策略获取服务提供者
Ribbon的核心是负载均衡管理,另还有5个大功能点。如下图:
https://p6.toutiaoimg.com/large/pgc-image/4849dbd5ab6541fb88d03af9432c6ccd
源码分析
事前准备
[*]先搭建一个SpringCloud的项目,也可以从我的github上下载。地址:github.com/mmcLine/spr…
[*]拷贝以下代码 @Configuration public class RestTemplateConfiguration { @Bean @LoadBalanced public RestTemplate getRestTemplate(){ return new RestTemplate(); } } @Autowired private RestTemplate restTemplate; @GetMapping("/testRibbon/{id}") public User getTodayStatistic(@PathVariable("id") Integer id){ String url ="http://STUDY-USER/user/getUserById?id="+id; return restTemplate.getForObject(url, User.class); } 复制代码
代码都准备好了,可以开始分析了。
[*]执行调用
http://localhost:8005/trade/testRibbon/2
为什么这么就能调用到服务提供者的方法?
打断点,可以看到restTemplate里有两个拦截器,根据名字可以推断RetryLoadBalancerInterceptor是关键。
https://p3.toutiaoimg.com/large/pgc-image/744cd74848f640868d1049755972ec9e
跟踪到RetryLoadBalancerInterceptor类
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); //获取到service的name final String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); //根据serviceName和LoadBalancerClient,LoadBalancedRetryPolicy里面包罗了RibbonLoadBalancerContext和ServiceInstanceChooser final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName, loadBalancer); RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy); //执行方法会进入到doExecute方法 return template.execute(context -> { ServiceInstance serviceInstance = null; if (context instanceof LoadBalancedRetryContext) { LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context; serviceInstance = lbContext.getServiceInstance(); } if (serviceInstance == null) { serviceInstance = loadBalancer.choose(serviceName); } ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute( serviceName, serviceInstance, requestFactory.createRequest(request, body, execution)); int statusCode = response.getRawStatusCode(); if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) { byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody()); response.close(); throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy); } return response; }, new LoadBalancedRecoveryCallback() { //This is a special case, where both parameters to LoadBalancedRecoveryCallback are //the same.In most cases they would be different. @Override protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) { return response; } }); }复制代码doExecute方法:
protectedT doExecute(RetryCallback retryCallback, RecoveryCallback recoveryCallback, RetryState state) throws E, ExhaustedRetryException { //省略部门代码 /* * We allow the whole loop to be skipped if the policy or context already * forbid the first try. This is used in the case of external retry to allow a * recovery in handleRetryExhausted without the callback processing (which * would throw an exception). */ //执行逻辑的关键方法 while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { }复制代码继续跟踪canRetry方法
@Override public boolean canRetry(RetryContext context) { LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext)context; if(lbContext.getRetryCount() == 0&& lbContext.getServiceInstance() == null) { //We haven't even tried to make the request yet so return true so we do //设置选中的服务提供者 lbContext.setServiceInstance(serviceInstanceChooser.choose(serviceName)); return true; } return policy.canRetryNextServer(lbContext); }复制代码我们跟踪serviceInstanceChooser.choose(serviceName)看看怎么通过serviceName选服务提供者的。
@Override public ServiceInstance choose(String serviceId) { //选择server Server server = getServer(serviceId); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); }复制代码跟踪getServer方法
protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null) { return null; } //可以看出是loadBalancer在选择 return loadBalancer.chooseServer("default"); // TODO: better handling of key }复制代码继续深入
public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } //有一个调用次数在+1 counter.increment(); if (rule == null) { return null; } else { try { //委托给了IRule,以是Irule是负载均衡的关键,最厥后总结 return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]:Error choosing server for key {}", name, key, e); return null; } } }复制代码查看Irule的实现
public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); //lb.getAllServers里面是全部的服务提供者列表 Optional server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } }复制代码跟踪chooseRoundRobinAfterFiltering方法
public Optional chooseRoundRobinAfterFiltering(List servers, Object loadBalancerKey) { //拿到筛选后的servers List eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } //incrementAndGetModulo方法拿到下标,然后根据list.get取到一个服务 return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); }复制代码至此就拿到了具体的服务提供者。
但是到这里还有个问题?
[*]怎么根据服务名拿到server的?
有一个ServerList接口是用于拿到服务列表的。我们使用的loadBalancer(ZoneAwareLoadBalancer)的父类DynamicServerListLoadBalancer类的构造方法里,有一个restOfinit方法
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList serverList, ServerListFilter filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); }复制代码跟踪restOfInit方法
void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); //用于获取全部的serverList updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); }复制代码继续跟踪updateListOfServers方法
public void updateListOfServers() { List servers = new ArrayList(); if (serverListImpl != null) { //查询serverList servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); }复制代码继续跟踪源码到obtainServersViaDiscovery方法,
private List obtainServersViaDiscovery() { List serverList = new ArrayList(); //eurekaClientProvider.get()会去获取EurekaClient if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList(); } EurekaClient eurekaClient = eurekaClientProvider.get(); //vipAddresses就是serviceName if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client //此处获取到服务的信息 List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }复制代码综合上面可以看出,最终是通过eurekaClient去拿到服务列表的。
那么如果服务列表发生变革怎么刷新呢?
是通过CacheRefreshThread在定时线程池里面执行,核心拉取方法是fetchRegistry
Iping
Iping是用于探测服务列表中的服务是否正常,如果不正常,则从eureka拉取服务列表并更新。
在BaseLoadBalancer里面有一个setupPingTask方法,启动定时任务,30秒一次定时向EurekaClient发送“ping”
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats, IPing ping, IPingStrategy pingStrategy) { logger.debug("LoadBalancer [{}]:initialized", name); this.name = name; this.ping = ping; this.pingStrategy = pingStrategy; setRule(rule); setupPingTask(); lbStats = stats; init(); }复制代码Iping的具体逻辑在PingTask类里。
Irule总结:
Irule是负载均衡的规则:
我这里默认是使用的是ZoneAvoidanceRule,还有许多种策略:
[*]RandomRule: 随机
[*]RoundRobinRule: 轮询
[*]RetryRule: 先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会举行重试,获取可用的服务
[*]WeightedResponseTimeRule: 对RoundRobinRule的扩展,响应速度越快的实例选择权重越大,越容易被选择
[*]BestAvailableRule:会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
[*]AvailabilityFilteringRule:先过滤掉故障实例,再选择并发较小的实例
[*]ZoneAvoidanceRule:默认规则,复合判断server所在地区的性能和server的可用性选择服务器
properties配置方式如下:
STUDY-USER是服务名
STUDY-USER.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RoundRobinRule
作者:别掉头发了
链接:https://juejin.cn/post/7020403359270043685
来源:稀土掘金
著作权归作者全部。商业转载请联系作者获得授权,非商业转载请注明出处。
转发了 转发了
页:
[1]