创意电子

标题: 「Eureka源码分析」从源码层面让你认识Eureka工作流程和运作机制 [打印本页]

作者: 爱马士团团长    时间: 2021-9-25 14:46
标题: 「Eureka源码分析」从源码层面让你认识Eureka工作流程和运作机制
还原还原

EurekaServer启动流程分析

EurekaServer 处理服务注册、消息数据复制

EurekaClient 是如何注册到 EurekaServer 的?

刚在如今org.springframework.cloud.netflix.eureka.server.InstanceRegistry的每个方法都打了一个断点,而且EurekaServer已经开始调试运行状态,那我们就随便找一个被@EnableEurekaClient的微服务启动微服务来有用吧,直接运行。
实例注册方法

InstanceRegistry.register( final InstanceInfo info, final  boolean isReplication) 方法进断点了。
应用资源类

主要是处理吸收 Http 的服务请求。
@POST @Consumes({ "application/json" , "application/xml" }) public Response addInstance(InstanceInfo info,                             @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {    logger.debug( "注册实例{} (replication={})" , info.getId(), isReplication);    // 验证 instanceinfo 是否包含所有必需的字段    if (isBlank(info.getId())) {         return Response.status( 400 ).entity( "Missing instanceId" ).build();    } else  if (isBlank(info.getHostName())) {         return Response.status( 400 ).entity( "Missing hostname" ).build();    } else  if (isBlank(info.getAppName())) {         return Response.status( 400 ).entity( "Missing appName" ).build();    } else  if (!appName.equals(info.getAppName())) {         return Response.status( 400 ).entity( "不匹配的appName,盼望" + appName + " 但是是 " + info.getAppName()).build( );    } else  if (info.getDataCenterInfo() == null ) {         return Response.status( 400 ).entity( "Missing dataCenterInfo" ) .build ();    } else  if (info.getDataCenterInfo().getName() == null ) {         return Response.status( 400 ).entity( "Missing dataCenterInfo Name" ) .build ();    }    // 处理客户端大概使用错误的 DataCenterInfo 注册并丢失数据的环境    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();    if (dataCenterInfo instanceof UniqueIdentifier) {        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();        如果(isBlank(dataCenterInfoId)){            布尔实验 = "真" .equalsIgnoreCase(serverConfig.getExperimental( "registration.validation.dataCenterInfoId" ));            如果(实验){                String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " 必须包含有用的 id" ;                返回Response.status( 400 ).entity(entity).build();            } else  if (dataCenterInfo instanceof AmazonInfo) {                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;                字符串有用 ID = amazonInfo。获取(AmazonInfo.MetaDataKey.instanceId);                如果(有用 ID == null){                 amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());                }            }其他{                logger.warn( "在没有合适的 id 的环境下注册 {} 类型的 DataCenterInfo" , dataCenterInfo.getClass());            }        }    }    registry.register(info, "true" .equals(isReplication));    返回Response.status( 204 ).build();  // 204 向后兼容}
@Override public  void  register (终极InstanceInfo信息,终极 布尔值isReplication)  {        handleRegistration(信息,resolveInstanceLeaseDuration(信息),isReplication);        super .register(info, isReplication);}
私人 无效 handleRegistration (InstanceInfo信息,INT leaseDuration,                boolean isReplication)  {         log ( "register " + info.getAppName() + ", vip " + info.getVIPAddress()                        + ",leaseDuration" +leaseDuration + ", isReplication "                        + isReplication);        发布事件(新EurekaInstanceRegisteredEvent(这个,信息,leaseDuration,                        isReplication));}
服务户厕机制

进入PeerAwareInstanceRegistryImpl类的register(final InstanceInfo info, final boolean isReplication)方法;
@覆盖公共 无效 寄存器(终极InstanceInfo信息,终极 布尔isReplication)  {         //注释:续约时间,默认时间是常量值90秒    INT leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;        // 自己:续约时间,也可以从配置文件中取出来,所以说续约时间值也是可以让我们自界说配置的    if (info.getLeaseInfo() != null && info.getLeaseInfo()。 getDurationInSecs() > 0 ) {        LeaseDuration = info.getLeaseInfo().getDurationInSecs();    }        // 注释:将注册方的信息写入 EurekaServer 的注册表,父类为 AbstractInstanceRegistry     super .register(info,leasedDuration,isReplication);        // 注释:EurekaServer节点之间的数据同步,复制到其他    PeerreplicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null , isReplication);}进入super.register(info,leasedDuration,isReplication),如何写入EurekaServer的注册表的,进入AbstractInstanceRegistry.register(InstanceInfo registrant, inthaleDuration, boolean isReplication)方法。

公共 无效 注册(InstanceInfo 注册人,int租赁连续时间,布尔值是复制) {    尝试{        读。锁();                // 这个变量,就是我们的注册表注释,生存在内存中的;         Map gMap = registry. 获取(registrant.getAppName());        REGISTER.increment(isReplication);        if (gMap == null ) {            final ConcurrentHashMap gNewMap = new ConcurrentHashMap();            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);            if (gMap == null ) {                gMap = gNewMap;            }        }        租约 existingLease = gMap。获取(registrant.getId());        // 生存末了一个脏时间戳而不覆盖它,如果已经有租约        if (existingLease != null && (existingLease.getHolder() != null )) {            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();            logger.debug( "现有租约找到 (existing={},provided={}" , existingLastDirtyTimestamp, registrationLastDirtyTimestamp);             if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {                logger.warn( "有一个现有租约,而且现有租约的脏时间戳 {} 大于" +                         " 比正在注册的谁人 {}" , existingLastDirtyTimestamp, registrationLastDirtyTimestamp);                logger.warn( "使用现有的 instanceInfo 而不是新的 instanceInfo 作为注册人" );                注册人 = existingLease.getHolder();            }        } else {             // 租约不存在,因此它是一个新的注册            synchronized ( lock ) {                 if ( this .expectedNumberOfRenewsPerMin > 0 ) {                     // 由于客户端想要取消它,降低阈值                    // (1                     // 30 秒,2 一分钟)                    this .expectedNumberOfRenewsPerMin = this .expectedNumberOfRenewsPerMin + 2 ;                    这个.numberOfRenewsPerMinThreshold =                            ( int ) ( this .expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());                }            }            logger.debug( "没有找到以前的租约信息;这是新注册的" );        }        租约租约=新租约(注册人,租用连续时间);        if (existingLease != null ) {            Lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());        }        gMap.put(registrant.getId(), 租约);        同步(最近注册队列){            最近注册队列。添加( new Pair(                    System.currentTimeMillis(),                    registrant.getAppName() + "(" + registrant.getId() + ")" ));        }        //这是其中重载状态初始状态转移发生        ,如果(!InstanceStatus.UNKNOWN。即是(registrant.getOverriddenStatus())){            logger.debug( "Found overridden status {} for instance {}. 查抄是否需要添加到 "                             + "overrides" , registrant.getOverriddenStatus(), registrant.getId());            如果(!overriddenInstanceStatusMap.containsKey(registrant.getId())) {                logger.info( "未找到覆盖的 id {} 并因此添加它" , registrant.getId());                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());            }        }        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap。获取(registrant.getId());        if (overriddenStatusFromMap != null ) {            logger.info( "从地图存储覆盖状态{}",overriddenStatusFromMap );            registrant.setOverriddenStatus(overriddenStatusFromMap);        }        // 根据覆盖的状态规则设置状态        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);        registrant.setStatusWithoutDirty(overriddenInstanceStatus);        //如果租赁与UP状态,集租赁服务了时间戳注册        ,如果(InstanceStatus.UP。即是(registrant.getStatus())){            租赁.serviceUp();        }        registrant.setActionType(ActionType.ADDED);        最近更改队列。添加(新最近更改的项目(租赁));        registrant.setLastUpdatedTimestamp();        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());        logger.info( "注册实例 {}/{} 状态为 {} (replication={})" ,                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);    }末了{        读。解锁();    }}
集群之间的复制

replicaToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) 的这个方法。
私人无效replicateToPeers(行动行动,字符串appName,字符串ID,                              InstanceInfo 信息/* 可选 */ ,                              InstanceStatus newStatus /* optional */ , boolean isReplication) {     Stopwatch tracer = action.getTimer().start();    尝试{        如果(isReplication){            numberOfReplicationsLastMin.increment();        }        // 如果已经是复制,不要再复制,由于这会造成毒复制                // 注释:如果已经复制过,就不再复制          if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {             return ;        }                // 遍历Eureka Server 集群中的所有节点,进行复制操作        for ( final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {             // 如果url代表本主机,则不要复制到自己。            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {                 continue ;            }                        // 没有复制过,遍历Eureka Server中的节点节点,操作,包罗取消、注册、状态、状态更新等。            replicaInstanceActionsToPeers(action, appName, id, info, newStatus, node);        }    }末了{        tracer.stop();    }}
private  void replicationInstanceActionsToPeers(Action action, String appName,                                             字符串 id、InstanceInfo 信息、InstanceStatus newStatus、                                             PeerEurekaNode 节点) {    试试{        InstanceInfo infoFromRegistry = null ;        CurrentRequestVersion.set(Version.V2);        开关(动作){             case 取消:                node.cancel(appName, id);                打破;            案例 心跳:                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);                infoFromRegistry = getInstanceByAppAndId(appName, id, false );                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false );                打破;            案例 注册:                节点注册(信息);                打破;            case  StatusUpdate:                 infoFromRegistry = getInstanceByAppAndId(appName, id, false );                node.statusUpdate(appName, id, newStatus, infoFromRegistry);                打破;            case  DeleteStatusOverride:                 infoFromRegistry = getInstanceByAppAndId(appName, id, false );                node.deleteStatusOverride(appName, id, infoFromRegistry);                打破;        }    }捕捉(可扔的 t){        logger.error( "无法将信息复制到 {} 以进行操作 {}" , node.getServiceUrl(), action.name(), t);    }}
同级之间的复制机制

PeerEurekaNode.register(final InstanceInfo info) 方法,一窥毕竟如何同步数据。
public  void  register ( final InstanceInfo)  throws Exception {         // 注释:使命到期时间给使命服务处理,默认时间    偏移当前时间 30 秒long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);    batchingDispatcher.process(            taskId( "register" , info),             new InstanceReplicationTask(targetHost, Action.Register, info, null , true ) {                 public EurekaHttpResponse execute ()  {                     return replicationClient.register(info);                }            },            到期时间    );}
公共 静态 TaskDispatcher  createBatchingTaskDispatcher (串ID,                                                                              INT MAXBUFFERSIZE,                                                                              INT workloadSize,                                                                              INT workerCount,                                                                             长maxBatchingDelay,                                                                             长congestionRetryDelayMs,                                                                             长networkFailureRetryMs,                                                                             TaskProcessor taskProcessor)  {         final AcceptorExecutor acceptorExecutor = new AcceptorExecutor(                id、maxBufferSize、workloadSize、maxBatchingDelay、congestionRetryDelayMs、networkFailureRetryMs        );        final TaskExecutors taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);        return  new TaskDispatcher() {             @Override             public  void  process (ID id, T task, long expiryTime)  {                acceptorExecutor.process(id, task, expiryTime);            }            @Override             public  void  shutdown ()  {                acceptorExecutor.shutdown();                taskExecutor.shutdown();            }        };    }
static  TaskExecutors batchExecutors ( final String name,                                                    int workerCount,                                                    final TaskProcessor 处理器,                                                    final AcceptorExecutor acceptorExecutor)  {     final AtomicBoolean isShutdown = new AtomicBoolean();    终极的TaskExecutorMetrics 指标 =新的TaskExecutorMetrics(name);    返回 新的TaskExecutors( newWorkerRunnableFactory (){         @覆盖        公共WorkerRunnable 创建(INT IDX)  {            返回 新BatchWorkerRunnable (“TaskBatchingWorker-” +名称+ ' - ' + IDX,isShutdown,指标,处理器,acceptorExecutor );        }    }, workerCount, isShutdown);}
@Override public  void run() {     try {         while (!isShutdown.get()) {                         // 注释:获取信号量释放batchWorkRequests.release(),返回使命集合列表            List holder = getWork();            指标.registerExpiryTimes(持有人);            List tasks = getTasksOf(holders);                        //注释:将完成使命打包请求Peer节点            ProcessingResult 结果 = processor.process(tasks);            switch (result) {                 case  Success:                     break ;                案例 拥塞:                案例 瞬态错误:                    taskDispatcher.reprocess(持有人,结果);                    打破;                case  PermanentError:                     logger.warn( "由于永世错误而放弃 {} 的 {} 个使命" ,holder.size(), workerName);            }            metrics.registerTaskResult(result, tasks.size());        }    } catch (InterruptedException e) {         // 忽略    } catch (Throwable e) {         // 安全保护,所以我们永世不会以不受控制的方式退出这个循环。        logger.warn( "发现工作线程错误" , e);    }}
@Override public ProcessingResult process(List tasks) {    ReplicationList list = createReplicationListOf(tasks);    try {                 // 注释:这里通过 JerseyReplicationClient 客户端对象直接发送列表请求数据        EurekaHttpResponse response = replicationClient.submitBatchUpdates(list);        int statusCode = response.getStatusCode();        if (!isSuccess(statusCode)) {             if (statusCode == 503 ) {                logger.warn( "Server busy (503) HTTP status code received from peer {};延迟后重新调度使命" , peerId);                返回ProcessingResult.Congestion;            } else {                 // 从服务器返回的不测错误。理想环境下,这应该永世不会发生。                logger.error( "批量更新失败,HTTP 状态码为{};放弃{}复制使命" , statusCode, tasks.size());                返回ProcessingResult.PermanentError;            }        }其他{            handleBatchResponse(tasks, response.getEntity().getResponseList());        }    } catch (Throwable e) {         if (isNetworkConnectException(e)) {            logNetworkErrorSample( null , e);            返回ProcessingResult.TransientError;        }其他{            logger.error( "不重试这个异常,由于它好像不是网络异常" , e);            返回ProcessingResult.PermanentError;        }    }    返回ProcessingResult.Success;}
@覆盖公共EurekaHttpResponse  submitBatchUpdates(ReplicationList replicationList){    ClientResponse 响应 = null ;    试试{        响应 = jerseyApacheClient.resource(serviceUrl)                                // 注释:这才是重点,请求相对路径,peerreplication/batch/                .path(PeerEurekaNode.BATCH_URL_PATH)                .accept(MediaType.APPLICATION_JSON_TYPE)                .type(MediaType.APPLICATION_JSON_TYPE)                .POST(ClientResponse类,replicationList);        如果(isSuccess(response.getStatus())!){            回报anEurekaHttpResponse(response.getStatus(),ReplicationListResponse。类).build();        }        ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse。类);        返回anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();    }末了{        如果(响应!=空){            response.close();        }    }}
@Path( "batch" ) @POST public Response batchReplication(ReplicationList replicationList) {     try {        ReplicationListResponse batchResponse = new ReplicationListResponse();                // 实例:这里将收到的使命列表,方法循环解析处理,主要核心在调度方法中。        for (ReplicationInstanceInfo : replicationList.getReplicationList()) {             try {                batchResponse.addResponse(dispatch(instanceInfo));            }捕获(异常 e){                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null ));                logger.error(instanceInfo.getAction() + "请求处理批处理失败"                         + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);            }        }        返回Response.ok(batchResponse).build();    } catch (Throwable e) {        logger.error( "无法执行批量请求" , e);        返回Response.status(Status.INTERNAL_SERVER_ERROR).build();    }}
私有ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {    ApplicationResource applicationResource = createApplicationResource(instanceInfo);    InstanceResource 资源 = createInstanceResource(instanceInfo, applicationResource);    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());    String instanceStatus = toString(instanceInfo.getStatus());    Builder singleResponseBuilder = new Builder();    switch (instanceInfo.getAction()) {         case 注册:            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);            打破;        案例 心跳:            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);            打破;        案例 取消:            singleResponseBuilder = handleCancel(resource);            打破;        案例状态更新 :            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);            打破;        案例 删除状态覆盖:            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);            打破;    }    返回singleResponseBuilder.build();}
private  static Builder handleRegister (ReplicationInstance instanceInfo, ApplicationResource applicationResource)  {         // 注释:private static final String REPLICATION = "true"; 界说一个常量值,而且还是方法描述ApplicationResource.addInstance    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);    return  new Builder().setStatusCode(Status.OK.getStatusCode());}
EurekaClient启动流程分析

调换运行模式

运行discovery-eureka服务,调试运行provider-user服务,先观察日志先;

2017-10-23  19 :43 :07.688   INFO  1488  ---  [main]  o .s .c .support .DefaultLifecycleProcessor   :在阶段0 中启动 bean  2017-10-23 19 :43 :07.694 INFO --- [main] 1488 ]  .S .C .N .eureka .InstanceInfoFactory        :设置初始实例状态为:STARTING 2017年10月23日19 :43 :07.874                INFO  1488  ---  [主要]   .N .D .provider .DiscoveryJerseyProvider    :使用 JSON 编码 的编解码器 LegacyJacksonJson 2017年10月23日 19 :43 :07.874   INFO  1488  ---  [主要]   .N .D .provider .DiscoveryJerseyProvider    :使用 JSON 解码 编 解码器LegacyJacksonJson 2017-10-23  19 :43 :07.971   INFO  1488  ---  [main]  c.N .D .provider .DiscoveryJerseyProvider    :使用 XML 编码 的编解码器 XStreamXml 2017年10月23日 19 :43 :07.971   INFO  1488  ---  [主要]   .N .D .provider .DiscoveryJerseyProvider    :使用 XML 解码 的编解码器 XStreamXml 2017-10 -23  19 :43 :08.134   INFO  1488  ---  [主要]   .N .D .S .R .aws .ConfigClusterResolver      :解决 尤里卡 端点 经由 配置2017年10月23日 19 :43 :08.344   INFO  1488  ---  [主要]  COM .netflix夏卓.DiscoveryClient     :禁用 增量 属性:假2017年10月23日 19 :43 :08.344   INFO  1488  - --  [main]  com .netflix .discovery .DiscoveryClient     :单一 vip 注册表 革新 属性:空2017年10月23日 19 :43 :08.344   INFO  1488  ---  [主]  COM .netflix夏卓.DiscoveryClient     :部队 全面 注册表 获取:假2017年10月23日 19 :43 :08.344   INFO  1488  ---  [主]  com .netflix .discovery .DiscoveryClient     : Application  is  null : false 2017-10-23  19 :43 :08.344   INFO  1488 ---  [主要]  COM .netflix夏卓.DiscoveryClient     :注册 应用程序 大小 是 零:真2017年10月23日 19 :43 :08.344   INFO  1488  ---  [主要]  COM .netflix夏卓.DiscoveryClient     :应用 版本 是 -1:真2017年10月23日 19 :43 :08.345   INFO  1488  ---  [主要]  COM .netflix夏卓.DiscoveryClient     :获取 的所有 实例 的注册表 信息 从 该 尤里卡 服务器2017年10月23日 19 :43 :08.630   INFO  1488  ---  [主]  COM .netflix夏卓.DiscoveryClient     :该 响应 状态 是 200 2017年10月23日 19 :43 :08.631  信息 1488  ---  [主要]  com .netflix .discovery .DiscoveryClient     :开始 心跳 执行:更新 隔断 是:30 2017年10月23日 19 :43 :08.634   INFO  1488  ---  [主要]   .N夏卓.InstanceInfoReplicator      :InstanceInfoReplicator 按需 更新 答应 速率 每 分钟 是 4 2017年10月23日 19 :43 :08.637  信息 1488  ---  [主要]  com .netflix .discovery .DiscoveryClient     :发现 客户端 初始化 在 时间戳 1508758988637 与 初始 实例 计数:0 2017年10月23日 19 :43 :08.657   INFO  1488  ---  [主要]   .N .E .EurekaDiscoveryClientConfiguration:注册 应用 springms提供商用户 与 尤里卡 与 状态 UP 2017-10-23  19 :43 :08.658   INFO  1488  ---  [主要]  com .netflix夏卓.DiscoveryClient     :锯 本地 状态 变化 事件 StatusChangeEvent  [时间戳= 1508758988658,电流= UP,以前= STARTING] 2017年10月23日 19 :43 :08.659   INFO  1488  ---  [nfoReplicator-0]  COM .netflix夏卓。 DiscoveryClient     : DiscoveryClient_SPRINGMS-PROVIDER-USER / springms-provider-user :192.168.3.101 :7900 :注册 服务... 2017-10-23  19 :43 :08.768   INFO  1488 ---  [main]  s .b .c .e .t .TomcatEmbeddedServletContainer:Tomcat 在端口(s)上启动 :7900 (http) 2017-10-23 19 :43 :08.768 INFO 1488 --- [main] c . n .e .EurekaDiscoveryClientConfiguration:将端口更新为7900 2017-10-23 19 :43 :08.773 INFO 1488 --- [main] c .s .cloud                  .MsProviderUserApplication       :在882 .1秒内启动 ProviderApplication (JVM 运行10.398)  服务提供方主体加载流程

EnableEurekaClient 组件。

@Target (ElementType.TYPE)@Retention (RetentionPolicy.RUNTIME) @Documented @Inherited @EnableDiscoveryClient public @interface EnableEurekaClient {}@EnableEurekaClient

这个注解类自己也使用了注解@EnableDiscoveryClient,那么我们有必要去这个注解类看看。

@target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class)公共@interface EnableDiscoveryClient {}@EnableDiscoveryClient

这个注解类有个比较特殊的 注解@Import ,由此我们料想,这里的大多数逻辑是不是都写 在这个EnableDiscoveryClientImportSelector 类呢?
启用DiscoveryClientImportSelector

@Order(Ordered.LOWEST_PRECEDENCE - 100)公共 类 EnableDiscoveryClientImportSelector                延伸 SpringFactoryImportSelector < EnableDiscoveryClient > {         @覆盖        保护 布尔 的IsEnabled ()  {                返回 新。RelaxedPropertyResolver(getEnvironment())的getProperty(                                 “spring.cloud.discovery.enabled”,Boolean.class , Boolean.TRUE);        }        @Override         protected  boolean  hasDefaultFactory ()  {                 return  true ;        }}EnableDiscoveryClientImportSelector 类继承了 SpringFactoryImportSelector 类,但是重写了一个 isEnabled() 方法,默认值返回 true,为什么会返回 true。

/** *选择并返回了其中的类(ES)的名称应是基于对入口 *在{@link AnnotationMetadata}中的入口@ {@链路配置}类。 */ @Override public String [] selectImports(AnnotationMetadata metadata) {     if (!isEnabled()) {                  return  new  String [ 0 ];        }        AnnotationAttributes 属性 = AnnotationAttributes.fromMap(                        metadata.getAnnotationAttributes( this .annotationClass.getName(), true ));        Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "                         + metadata.getClassName() + " @" + getSimpleName() + "?" );        // 查找所有大概的主动配置类,过滤重复项        List < String > factory = new ArrayList( new LinkedHashSet(SpringFactoriesLoader                        .loadFactoryNames(这个.annotationClass,这个.beanClassLoader)));        if (factories.isEmpty() && !hasDefaultFactory()) {                 throw  new IllegalStateException( "Annotation @" + getSimpleName()                                + " 找到了,但没有实现。你忘记包含一个启动器了吗?" );        }        if (factories.size() > 1 ) {                 // 应该只有一个 DiscoveryClient,但大概不止                一个工厂                log.warn( "More than one implementation" + "of @" + getSimpleName()                                + "(如今依靠@Conditionals 选择一个):" + 工厂);        }        return factory.toArray( new  String [factories.size()]);}EnableDiscoveryClientImportSelector.selectImports

首先通过注解获取一些属性,然后加载了一些类名称,我们进入loadFactoryNames方法看看。

public static  List < String > loadFactoryNames(Class factoryClass, ClassLoader classLoader) {         String factoryClassName = factoryClass.getName();        try {                 // 注释:public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";                 //注释:这个jar包下的一个配置文件                Enumeration urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) :                                ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION));                List < String > result = new ArrayList< String >();                而(urls.hasMoreElements()) {                        URL url = urls.nextElement();                        Properties properties = PropertiesLoaderUtils.loadProperties( new UrlResource(url));                        String factoryClassNames = properties.getProperty(factoryClassName);  result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames)));                }                返回结果;        }        catch (IOException ex) {                 throw  new IllegalArgumentException( "无法加载 [" + factoryClass.getName() +                                 "] 工厂 [" + FACTORIES_RESOURCE_LOCATION + "]" , ex);        }}加载了一个配置文件,配置文件内里写了呢?打开SpringFactoryImportSelector该文件属于什么jar包的spring.factories文件一看。

# AutoConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration= \ org.springframework.cloud.client.CommonsClientAutoConfiguration, \ org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration, \ org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration , \ org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration, \org.springframework.cloud.commons.util.UtilAutoConfiguration# 环境后处理器org.springframework.boot.env.EnvironmentPostProcessor= \ org.springframework.cloud.client.HostInfoEnvironmentPostProcessor都是一些配置后缀的类名,所以这些都是加载的工作堆的配置文件类。
工厂对象内里只有一个类名路径为 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 。
EurekaDiscoveryClientConfiguration

@Configuration @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class)@ConditionalOnProperty(值= “eureka.client.enabled”,matchIfMissing =真)@CommonsLog公共 类 EurekaDiscoveryClientConfiguration 器具 SmartLifecycle,有序{         @覆盖        公共无效启动(){                 //仅集如果nonSecurePort端口设置为0和this.port!= 0                 ,如果(这个.port。获得()!= 0 &&这.instanceConfig.getNonSecurePort()== 0){                        这个.instanceConfig.setNonSecurePort(此.port。获得());                }                //初始化只有当nonSecurePort大于0且尚未运行                ,由于containerPortInitializer的//下面                如果(!此.running。获得()&&这个.instanceConfig.getNonSecurePort()> 0){                        也许InitializeClient();                        如果(log.isInfoEnabled()){                                log.info( "注册应用程序" + this .instanceConfig.getAppname()                                                + “带有状态的尤里卡”                                                 +这个.instanceConfig.getInitialStatus());                        }                        这个.applicationInfoManager                                        .setInstanceStatus( this .instanceConfig.getInitialStatus());                        if ( this .healthCheckHandler != null ) {                                 this .eurekaClient.registerHealthCheck( this .healthCheckHandler);                        }                        这个.context.publishEvent(                                        new InstanceRegisteredEvent( this , this .instanceConfig));                        这个.running。设置(真);                }        }}
// ApplicationInfoManager.setInstanceStatus 的方法public  synchronized  void  setInstanceStatus (InstanceStatus status)  { // 打上断点    InstanceStatus prev = instanceInfo.setStatus(status);    if (prev != null ) {         for (StatusChangeListener listener : listeners.values()) {             try {                listener.notify( new StatusChangeEvent(prev, status));            }捕获(异常 e){                logger.warn( "通知监听失败:{}" , listener.getId(), e);            }        }    }}
public  void  registerStatusChangeListener (StatusChangeListener listener)  { // 打上断点    listeners.put(listener.getId(), listener);}
果不方法其然,EurekaDiscoveryClientConfiguration.start被调用了,紧接着this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus())也进入断点,然后在往下走,又进入的
DiscoveryClient.initScheduledTasks 方法中的通知位置处。
DiscoveryClient 通过@Inject 注解过的构造方法。

@进样DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig配置,DiscoveryClientOptionalArgs指定参数时,提供 backupRegistryProvider) {    如果(参数=!空){        此.healthCheckHandlerProvider = args.healthCheckHandlerProvider;        这个.healthCheckCallbackProvider = args.healthCheckCallbackProvider;        这个.eventListeners.addAll(args.getEventListeners());    } else {         this .healthCheckCallbackProvider = null ;        这个.healthCheckHandlerProvider = null ;    }        这个.applicationInfoManager = applicationInfoManager;    InstanceInfo myInfo = applicationInfoManager.getInfo();    客户端配置 = 配置;    staticClientConfig = clientConfig;    运输配置 = config.getTransportConfig();    实例信息 = 我的信息;    如果(我的信息!= null){        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();    }其他{        logger.warn( "将 instanceInfo 设置为传入的空值" );    }    这个.backupRegistryProvider = backupRegistryProvider;    this .urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);    本地域域应用程序。设置(新应用程序());    fetchRegistryGeneration = new AtomicLong( 0 );    remoteRegionsToFetch = new AtomicReference(clientConfig.fetchRegistryForRemoteRegions());    remoteRegionsRef =新的AtomicReference (remoteRegionsToFetch。得到()==空?空:remoteRegionsToFetch得到().split( “”));    if (config.shouldFetchRegistry()) {         this .registryStalenessMonitor = new ThresholdLevelsMetric( this , METRIC_REGISTRY_PREFIX + "lastUpdateSec_" , new  long []{ 15 L, 30 L, 60 L, 120 L, 240 L, 480 L);    } else {        这个.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;    }    如果(config.shouldRegisterWithEureka()){        此.heartbeatStalenessMonitor =新ThresholdLevelsMetric(此,METRIC_REGISTRATION_PREFIX + “lastHeartbeatSec_” ,新 长[] { 15 L,30 L,60 L,120 L,240 L,480 L});    } else {        这个.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;    }    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {        logger.info( "客户端配置为既不注册也不查询数据。" );        调度程序=空;        heartbeatExecutor = null ;        cacheRefreshExecutor = null ;        eurekaTransport = null ;        instanceRegionChecker = new InstanceRegionChecker( new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());        // 这是一个小本领,答应使用 DiscoveryManager.getInstance() 的现有代码        // 与 DI 的 DiscoveryClient         DiscoveryManager.getInstance().setDiscoveryClient( this );        DiscoveryManager.getInstance().setEurekaClientConfig(config);        initTimestampMs = System.currentTimeMillis();        logger.info( "Discovery Client 在时间戳 {} 初始化,初始实例计数:{}" ,                initTimestampMs,这个.getApplications().size());        返回;  // 不需要设置网络使命,我们就完成了    }    try {                 // 注释:定时使命调度准备        scheduler = Executors.newScheduledThreadPool( 3 ,                 new ThreadFactoryBuilder()                        .setNameFormat( "DiscoveryClient-%d" )                        .setDaemon(真)                        。建造());                // 注释:实例化心跳定时使命线程池        heartbeatExecutor = new ThreadPoolExecutor(                 1 , clientConfig.getHeartbeatExecutorThreadPoolSize(), 0 , TimeUnit.SECONDS,                 new SynchronousQueue(),                 new ThreadFactoryBuilder()                        .setNameFormat( "DiscoveryClient-HeartbeatExecutor-%d" )                        .setDaemon(真)                        。建造()        );  // 使用直接切换                // 注释:实例化革新定时使命线程池        cacheRefreshExecutor = new ThreadPoolExecutor(                 1 ,clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0 , TimeUnit.SECONDS,                 new SynchronousQueue(),                 new ThreadFactoryBuilder()                        .setNameFormat( "DiscoveryClient-CacheRefreshExecutor-%d" )                        .setDaemon(真)                        。建造()        );  // 使用直接切换        eurekaTransport = new EurekaTransport();        scheduleServerEndpointTask(eurekaTransport, args);        azToRegionMapper azToRegionMapper;        如果(clientConfig.shouldUseDnsForFetchingServiceUrls()){            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);        }其他{            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);        }        如果(空!= remoteRegionsToFetch。获得()){            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch。获得().split( “”));        }        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());    } catch (Throwable e) {         throw  new RuntimeException( "初始化DiscoveryClient失败!" , e);    }    if (clientConfig.shouldFetchRegistry() && !fetchRegistry( false )) {        fetchRegistryFromBackup();    }        // 注释:初始化调度使命    initScheduledTasks();    试试{        Monitors.registerObject( this );    } catch (Throwable e) {        logger.warn( "无法注册定时器" , e);    }    // 这是一个小本领,答应使用 DiscoveryManager.getInstance() 的现有代码    // 与 DI 的 DiscoveryClient     DiscoveryManager.getInstance().setDiscoveryClient( this );    DiscoveryManager.getInstance().setEurekaClientConfig(config);    initTimestampMs = System.currentTimeMillis();    logger.info( "Discovery Client 在时间戳 {} 初始化,初始实例计数:{}" ,            initTimestampMs,这个.getApplications().size());}
private  void  initScheduledTasks ()  {     if (clientConfig.shouldFetchRegistry()) {         // 注册表缓存革新定时器                // 注释:隔断时间去拉取服务注册信息,默认时间 30秒        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();                //注释:定时使命,每隔断30秒去拉取一次服务注册信息        scheduler.schedule(                new TimedSupervisorTask(                         "cacheRefresh" ,                        调度程序,                        缓存革新执行器,                        registryFetchIntervalSeconds,                        时间单元.秒,                        expBackOffBound,                        新的缓存革新线程()                ),                registryFetchIntervalSeconds, TimeUnit.SECONDS);    }    if (clientConfig.shouldRegisterWithEureka()) {                 // 注释:隔断发送一次心跳约,默认隔断时间 30 秒        int refreshIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();        logger.info( "启动心跳执行器:" + "更新隔断为:" + refreshIntervalInSecs);        // 心跳定时器                // 注释:定时使命,每隔断 30 秒去想 EurekaServer 发送一次心跳周约        scheduler.schedule(                new TimedSupervisorTask(                         "heartbeat" ,                        调度程序,                        心跳执行器,                        更新IntervalInSecs,                        时间单元.秒,                        expBackOffBound,                        新的心跳线程()                ),                更新IntervalInSecs, TimeUnit.SECONDS);        // InstanceInfo 复制器                // 注释:实例信息复制器,定时革新dataCenterInfo 数据中心信息,默认30instanceInfoReplicator         = new InstanceInfoReplicator(                 this ,                实例信息,                clientConfig.getInstanceInfoReplicationIntervalSeconds(),                2 ); // 突发大小                // 注释:实例化状态变化监听器        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {             @Override             public String getId ()  {                 return  "statusChangeListener" ;            }            @Override             public  void  notify (StatusChangeEvent statusChangeEvent)  {                 if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {                    // 如果涉及 DOWN,                    则以告诫级别记录logger.warn( "Saw local status change event {}" , statusChangeEvent);                }其他{                    logger.info( "看到本地状态变化事件{}" , statusChangeEvent);                }                                // 状态有变化的话,会说这个方法                instanceInfoReplicator.onDemandUpdate();            }        };                // 注释:注册状态变化监听器        if (clientConfig.shouldOnDemandUpdateStatusChange()) {            applicationInfoManager.registerStatusChangeListener(statusChangeListener);        }        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());    }其他{        logger.info( "没有在每个配置中注册尤里卡服务器" );    }}
public  boolean  onDemandUpdate ()  {     if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {        scheduler.submit( new Runnable() {             @Override             public  void  run ()  {                logger.debug( "执行本地InstanceInfo的按需更新" );                未来latestPeriodic = schedulePeriodicRef.get();                if (latestPeriodic != null && !latestPeriodic.isDone()) {                    logger.debug( "取消最新的预定更新,按需更新结束时重新调度" );                    latestPeriodic.cancel( false );                }                                // 注释:这里进行了实例信息革新和注册                InstanceInfoReplicator。这个.run();            }        });        返回 真;    }其他{        logger.warn( "由于速率限制器而忽略按需更新" );        返回 假;    }}
公共 无效运行(){    尝试{        discoveryClient.refreshInstanceInfo();        长dirtyTimestamp = instanceInfo.isDirtyWithTime();        if (dirtyTimestamp != null ) {            discoveryClient.register();            instanceInfo.unsetIsDirty(dirtyTimestamp);        }    }捕捉(可扔的 t){        logger.warn( "实例信息复制器出现问题" , t);    }末了{        Future next = scheduler.schedule( this , replicationIntervalSeconds, TimeUnit.SECONDS);        schedulePeriodicRef.set( next );    }}
布尔 寄存器() 抛出Throwable {    logger.info(PREFIX + appPathIdentifier + ": 注册服务..." );    EurekaHttpResponse httpResponse;    试试{        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    }捕获(异常 e){        logger.warn( "{} - 注册失败 {}" , PREFIX + appPathIdentifier, e.getMessage(), e);        扔e;    }    如果(logger.isInfoEnabled()){        logger.info( "{} - 注册状态:{}" , PREFIX + appPathIdentifier, httpResponse.getStatusCode());    }    返回httpResponse.getStatusCode() == 204 ;}
@覆盖公共EurekaHttpResponse 寄存器(InstanceInfo信息){    String urlPath = "apps/" + info.getAppName();    ClientResponse 响应 = null ;    试试{        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();        addExtraHeaders(resourceBuilder);        响应 = 资源生成器                .header( "Accept-Encoding" , "gzip" )                .type(MediaType.APPLICATION_JSON_TYPE)                .accept(MediaType.APPLICATION_JSON)                                //注释:打包带上当前应用的所有信息的信息                .POST(ClientResponse类,信息);        返回anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();    }末了{        如果(logger.isDebugEnabled()) {            logger.debug( "Jersey HTTP POST {}/{} with instance {}; statusCode={}" , serviceUrl, urlPath, info.getId(),                    响应 ==空?"N/A" : response.getStatus());        }        如果(响应!= null){            response.close();        }    }}





欢迎光临 创意电子 (https://wxcydz.cc/) Powered by Discuz! X3.4