创意电子

标题: 重磅!Flink源码解析环境准备及提交换程之任务提交换程 [打印本页]

作者: 尚硅谷教育    时间: 2021-9-29 09:59
标题: 重磅!Flink源码解析环境准备及提交换程之任务提交换程
Flink Yarn-per-job模式提交换程如图所示:

                               
登录/注册后可看大图

1、启动Yarn客户端

AbstractJobClusterExecutor.java
public CompletableFuture execute(@Nonnull finalPipeline pipeline, @Nonnull final Configuration configuration) throws Exception{         final JobGraph jobGraph =ExecutorUtils.getJobGraph(pipeline, configuration);         // 创建并启动yarn客户端         try (finalClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)){                  final ExecutionConfigAccessorconfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);                  // 获取集群配置参数                  final ClusterSpecificationclusterSpecification = clusterClientFactory.getClusterSpecification(configuration);                  // 部署集群                  final ClusterClientProvider clusterClientProvider =clusterDescriptor                                   .deployJobCluster(clusterSpecification,jobGraph, configAccessor.getDetachedMode());                  LOG.info("Job has beensubmitted with JobID " + jobGraph.getJobID());                   return CompletableFuture.completedFuture(                                   newClusterClientJobClientAdapter(clusterClientProvider,jobGraph.getJobID()));         }}YarnClusterClientFactory.java
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration){... ...         return getClusterDescriptor(configuration);} private YarnClusterDescriptor getClusterDescriptor(Configurationconfiguration) {         final YarnClient yarnClient = YarnClient.createYarnClient();         final YarnConfigurationyarnConfiguration = new YarnConfiguration();          yarnClient.init(yarnConfiguration);         yarnClient.start();          return new YarnClusterDescriptor(                          configuration,                          yarnConfiguration,                          yarnClient,                          YarnClientYarnClusterInformationRetriever.create(yarnClient),                          false);}
2、获取集群配置参数

AbstractContainerizedClusterClientFactory.java
public ClusterSpecification getClusterSpecification(Configurationconfiguration) {... ...         final int jobManagerMemoryMB =JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(                  configuration,                  JobManagerOptions.TOTAL_PROCESS_MEMORY)         .getTotalProcessMemorySize()         .getMebiBytes();          final int taskManagerMemoryMB = TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(                          configuration,TaskManagerOptions.TOTAL_PROCESS_MEMORY))                  .getTotalProcessMemorySize()                  .getMebiBytes();          int slotsPerTaskManager =configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);          return newClusterSpecification.ClusterSpecificationBuilder()                  .setMasterMemoryMB(jobManagerMemoryMB)                  .setTaskManagerMemoryMB(taskManagerMemoryMB)                  .setSlotsPerTaskManager(slotsPerTaskManager)                  .createClusterSpecification();}
3、部署集群

YarnClusterDescriptor.java
public ClusterClientProvider deployJobCluster(         ClusterSpecification clusterSpecification,         JobGraph jobGraph,         boolean detached) throws ClusterDeploymentException{         try {                  return deployInternal(                          clusterSpecification,                          "Flink per-jobcluster",                          getYarnJobClusterEntrypoint(),  //获取YarnJobClusterEntrypoint,启动AM的入口                          jobGraph,                          detached);         } catch (Exception e) {                  throw new ClusterDeploymentException("Couldnot deploy Yarn job cluster.", e);         }}上传 jar 包和配置文件到 HDFS
YarnClusterDescriptor.java
private ClusterClientProvider deployInternal(                  ClusterSpecificationclusterSpecification,                  String applicationName,                  String yarnClusterEntrypoint,                  @Nullable JobGraph jobGraph,                  boolean detached) throwsException {... ...// 创建应用         final YarnClientApplicationyarnApplication = yarnClient.createApplication();... ...         ApplicationReport report = startAppMaster(                          flinkConfiguration,                          applicationName,                          yarnClusterEntrypoint,                          jobGraph,                          yarnClient,                          yarnApplication,                          validClusterSpecification);... ...}private ApplicationReport startAppMaster(                  Configuration configuration,                  String applicationName,                  String yarnClusterEntrypoint,                  JobGraph jobGraph,                  YarnClient yarnClient,                  YarnClientApplicationyarnApplication,                  ClusterSpecificationclusterSpecification) throws Exception {... ...         // 初始化文件系统(HDFS)         final FileSystem fs = FileSystem.get(yarnConfiguration);... ...ApplicationSubmissionContextappContext =yarnApplication.getApplicationSubmissionContext(); final List providedLibDirs = getRemoteSharedPaths(configuration);// 上传文件的工具类final YarnApplicationFileUploader fileUploader=YarnApplicationFileUploader.from(         fs,         fs.getHomeDirectory(),         providedLibDirs,         appContext.getApplicationId(),         getFileReplication());... ...         final ApplicationId appId =appContext.getApplicationId();... ...         if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)){                  // yarn重试次数,默认2                  appContext.setMaxAppAttempts(                                   configuration.getInteger(                                                     YarnConfigOptions.APPLICATION_ATTEMPTS.key(),                                                     YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));                   activateHighAvailabilitySupport(appContext);         } else {                  //不是高可用重试次数为1                  appContext.setMaxAppAttempts(                                   configuration.getInteger(                                                     YarnConfigOptions.APPLICATION_ATTEMPTS.key(),                                                     1));         }... ...                 // 多次调用上传HDFS的方法,分别是:         // => systemShipFiles:日志的配置文件、lib/目录下除了dist的jar包         // => shipOnlyFiles:plugins/目录下的文件         // => userJarFiles:用户代码的jar包fileUploader.registerMultipleLocalResources (... ...);... ...         // 上传和配置ApplicationMaster的jar包:flink-dist*.jar         final YarnLocalResourceDescriptorlocalResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);... ...//fileUploader.registerSingleLocalResource(                                            jobGraphFilename,                                            newPath(tmpJobGraphFile.toURI()),                                            "",                                            true,                                            false);... ...         // 上传flink配置文件         String flinkConfigKey ="flink-conf.yaml";         Path remotePathConf = setupSingleLocalResource(                  flinkConfigKey,                  fs,                  appId,                  newPath(tmpConfigurationFile.getAbsolutePath()),                  localResources,                  homeDir,                  "");... ...         // 将JobGraph写入tmp文件并添加到本地资源,并上传到HDFS         fileUploader.registerSingleLocalResource(                  jobGraphFilename,                  newPath(tmpJobGraphFile.toURI()),                  "",                  true,                  false);... ...// 上传flink配置文件String flinkConfigKey = "flink-conf.yaml";fileUploader.registerSingleLocalResource(         flinkConfigKey,         newPath(tmpConfigurationFile.getAbsolutePath()),         "",         true,         true);... ...final JobManagerProcessSpec processSpec =JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(                          flinkConfiguration,                          JobManagerOptions.TOTAL_PROCESS_MEMORY);         //封装启动AM container的Java命令         final ContainerLaunchContext amContainer = setupApplicationMasterContainer(                          yarnClusterEntrypoint,                          hasKrb5,                          processSpec);... ... appContext.setApplicationName(customApplicationName);appContext.setApplicationType(applicationType!= null ? applicationType : "Apache Flink");appContext.setAMContainerSpec(amContainer);appContext.setResource(capability);... ...         yarnClient.submitApplication(appContext);... ... }封装 AM 参数和命令
YarnClusterDescriptor.java
ContainerLaunchContext setupApplicationMasterContainer(                  String yarnClusterEntrypoint,                  boolean hasKrb5,                  JobManagerProcessSpec processSpec) {                 // respect custom JVM options in theYAML file         String javaOpts =flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);         if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length()> 0) {                  javaOpts += " " +flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);         }         //applicable only for YarnMiniClustersecure test run         //krb5.conf file will be available aslocal resource in JM/TM container         if (hasKrb5) {                  javaOpts += "-Djava.security.krb5.conf=krb5.conf";         }          // 创建AM的容器启动上下文         ContainerLaunchContext amContainer =Records.newRecord(ContainerLaunchContext.class);          final Map startCommandValues = new HashMap();         startCommandValues.put("java","$JAVA_HOME/bin/java");          String jvmHeapMem =JobManagerProcessUtils.generateJvmParametersStr(processSpec,flinkConfiguration);         startCommandValues.put("jvmmem", jvmHeapMem);          startCommandValues.put("jvmopts", javaOpts);         startCommandValues.put("logging",YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));          startCommandValues.put("class",yarnClusterEntrypoint);         startCommandValues.put("redirects",                  "1> " +ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +                  "2> " +ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");         startCommandValues.put("args", "");          final String commandTemplate =flinkConfiguration                          .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,                                            ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);         final String amCommand =                  BootstrapTools.getStartCommand(commandTemplate,startCommandValues);          amContainer.setCommands(Collections.singletonList(amCommand));          LOG.debug("Application Masterstart command: " + amCommand);          return amContainer;}封装 AM 参数:
private ApplicationReport startAppMaster(                  Configuration configuration,                  String applicationName,                  String yarnClusterEntrypoint,                  JobGraph jobGraph,                  YarnClient yarnClient,                  YarnClientApplicationyarnApplication,                  ClusterSpecificationclusterSpecification) throws Exception {                   ... ...                  final ContainerLaunchContext amContainer = setupApplicationMasterContainer(                                   yarnClusterEntrypoint,                                   hasKrb5,                                   processSpec);                  ... ...                  // 封装AM 的classpath和环境参数                  final Map appMasterEnv = new HashMap();                  // set user specified appmaster environment variables                  appMasterEnv.putAll(                  ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,configuration));                  // set Flink app class path                  appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,classPathBuilder.toString());                   // set Flink on YARN internalconfiguration values                  appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR,localResourceDescFlinkJar.toString());                  appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());                  appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,fileUploader.getHomeDir().toString());                  appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));                  appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());                  appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES,fileUploader.getApplicationDir().toUri().toString());                   //https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name                  appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,UserGroupInformation.getCurrentUser().getUserName());                   if (localizedKeytabPath !=null) {                          appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);                          String principal =configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);                          appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);                          if (remotePathKeytab!= null) {                                   appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH,remotePathKeytab.toString());                          }                  }                   //To support Yarn SecureIntegration Test Scenario                  if (remoteYarnSiteXmlPath !=null) {                          appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH,remoteYarnSiteXmlPath.toString());                  }                  if (remoteKrb5Path != null) {                          appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());                  }                   // set classpath from YARNconfiguration                  Utils.setupYarnClassPath(yarnConfiguration,appMasterEnv);                             //设置 AM 参数                  amContainer.setEnvironment(appMasterEnv);                  ... ...                  yarnClient.submitApplication(appContext);... ...  }
4、提交应用

YarnClientImpl.java
public ApplicationId submitApplication(ApplicationSubmissionContextappContext) throws YarnException, IOException {    ApplicationId applicationId =appContext.getApplicationId();    ... ...    SubmitApplicationRequest request =       Records.newRecord(SubmitApplicationRequest.class);   request.setApplicationSubmissionContext(appContext);     //TODO: YARN-1763:Handle RM failoversduring the submitApplication call.rmClient.submitApplication(request);... ...}ApplicationClientProtocolPBClientImpl.java
public SubmitApplicationResponse submitApplication(    SubmitApplicationRequest request) throwsYarnException,IOException {//取出报文    SubmitApplicationRequestProto requestProto =        ((SubmitApplicationRequestPBImpl)request).getProto();    //将报文发送发送到服务端,并将返回结果构成response    try {      return newSubmitApplicationResponsePBImpl(proxy.submitApplication(null,        requestProto));    } catch (ServiceException e) {      RPCUtil.unwrapAndThrowException(e);      return null;    }}ApplicationClientProtocolPBServiceImpl.java
public SubmitApplicationResponseProto submitApplication(RpcController arg0,SubmitApplicationRequestProto proto) throws ServiceException {//服务端重新构建报文    SubmitApplicationRequestPBImpl request = newSubmitApplicationRequestPBImpl(proto);    ......     SubmitApplicationResponse response = real.submitApplication(request);    return((SubmitApplicationResponsePBImpl)response).getProto();    ......}ClientRMService.java
public SubmitApplicationResponse submitApplication(SubmitApplicationRequestrequest) throws YarnException {         ... ...         //将应用请求提交到Yarn上的RMAppManager去提交使命         this.rmAppManager.submitApplication(submissionContext,System.currentTimeMillis(), user);         ... ...}
5、创建Dispatcher、ResourceManager

Per-job模式的AM container加载运行入口是YarnJobClusterEntryPoint中的main()方法
YarnJobClusterEntrypoint.java
public staticvoid main(String[] args) {         ... ...         Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory,env);          YarnJobClusterEntrypoint yarnJobClusterEntrypoint = newYarnJobClusterEntrypoint(configuration);          ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);}ClusterEntrypoint.java
private void runCluster(Configuration configuration,PluginManager pluginManager) throws Exception {         synchronized (lock) {                  initializeServices(configuration,pluginManager);                  ... ...                  //1、创建dispatcher、ResourceManager对象的工厂类                   //       此中有从本地重新创建JobGraph的过程                  finalDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory= createDispatcherResourceManagerComponentFactory(configuration);                  //2、通过工厂类创建dispatcher、ResourceManager对象                  //   Entry 启动RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等                  clusterComponent = dispatcherResourceManagerComponentFactory.create(                          configuration,                          ioExecutor,                          commonRpcService,                          haServices,                          blobServer,                          heartbeatServices,                          metricRegistry,                          archivedExecutionGraphStore,                         newRpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),                          this);                  ... ...         }}DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create(                  Configuration configuration,                  Executor ioExecutor,                  RpcService rpcService,                  HighAvailabilityServices highAvailabilityServices,                  BlobServer blobServer,                  HeartbeatServices heartbeatServices,                  MetricRegistry metricRegistry,                  ArchivedExecutionGraphStore archivedExecutionGraphStore,                  MetricQueryServiceRetriever metricQueryServiceRetriever,                  FatalErrorHandler fatalErrorHandler) throws Exception {          LeaderRetrievalService dispatcherLeaderRetrievalService = null;         LeaderRetrievalService resourceManagerRetrievalService = null;         WebMonitorEndpoint webMonitorEndpoint = null;         ResourceManager resourceManager = null;         ResourceManagerMetricGroup resourceManagerMetricGroup = null;         DispatcherRunner dispatcherRunner =null;          try {                  dispatcherLeaderRetrievalService =highAvailabilityServices.getDispatcherLeaderRetriever();                   resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();                   final LeaderGatewayRetriever dispatcherGatewayRetriever = newRpcGatewayRetriever(                          rpcService,                          DispatcherGateway.class,                          DispatcherId::fromUuid,                          10,                          Time.milliseconds(50L));                   final LeaderGatewayRetriever resourceManagerGatewayRetriever = newRpcGatewayRetriever(                          rpcService,                          ResourceManagerGateway.class,                          ResourceManagerId::fromUuid,                          10,                          Time.milliseconds(50L));                   ... ...                                   // 创建吸收前端Rest请求的节点                  webMonitorEndpoint =restEndpointFactory.createRestEndpoint(                          configuration,                          dispatcherGatewayRetriever,                          resourceManagerGatewayRetriever,                          blobServer,                          executor,                          metricFetcher,                          highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),                          fatalErrorHandler);                   log.debug("StartingDispatcher REST endpoint.");                  webMonitorEndpoint.start();                   ... ...                  // 创建ResourceManager对象,返回的是new YarnResourceManager        // 调理过程:AbstractDispatcherResourceManagerComponentFactory        //                 -> ActiveResourceManagerFactory        //                 -> YarnResourceManagerFactory                  resourceManager = resourceManagerFactory.createResourceManager(                          configuration,                          ResourceID.generate(),                          rpcService,                          highAvailabilityServices,                          heartbeatServices,                          fatalErrorHandler,                          newClusterInformation(hostname, blobServer.getPort()),                          webMonitorEndpoint.getRestBaseUrl(),                          resourceManagerMetricGroup);                   ... ...                   // 创建dispatcherRunner对象并启动                  log.debug("StartingDispatcher.");                  dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(                          highAvailabilityServices.getDispatcherLeaderElectionService(),                          fatalErrorHandler,                          newHaServicesJobGraphStoreFactory(highAvailabilityServices),                          ioExecutor,                          rpcService,                          partialDispatcherServices);                   // 启动ResourceManager                  log.debug("StartingResourceManager.");                  resourceManager.start();                   resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);                  dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);                   return newDispatcherResourceManagerComponent(                          dispatcherRunner,                          resourceManager,                          dispatcherLeaderRetrievalService,                          resourceManagerRetrievalService,                          webMonitorEndpoint);          }         ... ...}创建 YarnResourceManager
ResourceManagerFactory.java
public ResourceManager createResourceManager(                  Configuration configuration,                  ResourceID resourceId,                  RpcService rpcService,                  HighAvailabilityServices highAvailabilityServices,                  HeartbeatServices heartbeatServices,                  FatalErrorHandler fatalErrorHandler,                  ClusterInformation clusterInformation,                  @Nullable String webInterfaceUrl,                  MetricRegistry metricRegistry,                  String hostname) throwsException {          final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry,hostname);         final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry,hostname);          final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(                  configuration, rpcService,highAvailabilityServices, slotManagerMetricGroup);          return createResourceManager(                  configuration,                  resourceId,                  rpcService,                  highAvailabilityServices,                  heartbeatServices,                  fatalErrorHandler,                  clusterInformation,                  webInterfaceUrl,                  resourceManagerMetricGroup,                  resourceManagerRuntimeServices);}YarnResourceManagerFactory.java
public ResourceManager createResourceManager(                  Configuration configuration,                  ResourceID resourceId,                  RpcService rpcService,                  HighAvailabilityServices highAvailabilityServices,                  HeartbeatServices heartbeatServices,                  FatalErrorHandler fatalErrorHandler,                  ClusterInformation clusterInformation,                  @Nullable String webInterfaceUrl,                  ResourceManagerMetricGroup resourceManagerMetricGroup,                  ResourceManagerRuntimeServices resourceManagerRuntimeServices) {          return new YarnResourceManager(                  rpcService,                  resourceId,                  configuration,                  System.getenv(),                  highAvailabilityServices,                  heartbeatServices,                  resourceManagerRuntimeServices.getSlotManager(),                  ResourceManagerPartitionTrackerImpl::new,                  resourceManagerRuntimeServices.getJobLeaderIdService(),                  clusterInformation,                  fatalErrorHandler,                  webInterfaceUrl,                  resourceManagerMetricGroup);}创建YarnResourceManager时,创建了SlotManager
ResourceManagerFactory.java
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(                  Configuration configuration,                  RpcService rpcService,                  HighAvailabilityServices highAvailabilityServices,                  SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {          return ResourceManagerRuntimeServices.fromConfiguration(                  createResourceManagerRuntimeServicesConfiguration(configuration),                  highAvailabilityServices,                  rpcService.getScheduledExecutor(),                  slotManagerMetricGroup);}ResourceManagerRuntimeServices.java
public static ResourceManagerRuntimeServices fromConfiguration(                  ResourceManagerRuntimeServicesConfiguration configuration,                  HighAvailabilityServices highAvailabilityServices,                  ScheduledExecutor scheduledExecutor,                  SlotManagerMetricGroup slotManagerMetricGroup) {          final SlotManager slotManager = createSlotManager(configuration,scheduledExecutor, slotManagerMetricGroup);          final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(                  highAvailabilityServices,                  scheduledExecutor,                  configuration.getJobTimeout());          return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);}创建并启动 Dispatcher
DefaultDispatcherRunnerFactory.java
public DispatcherRunner createDispatcherRunner(                  LeaderElectionService leaderElectionService,                  FatalErrorHandler fatalErrorHandler,                  JobGraphStoreFactory jobGraphStoreFactory,                  Executor ioExecutor,                  RpcService rpcService,                  PartialDispatcherServices partialDispatcherServices) throws Exception {          final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =dispatcherLeaderProcessFactoryFactory.createFactory(                  jobGraphStoreFactory,                  ioExecutor,                  rpcService,                  partialDispatcherServices,                  fatalErrorHandler);          return DefaultDispatcherRunner.create(                  leaderElectionService,                  fatalErrorHandler,                  dispatcherLeaderProcessFactory);}DefaultDispatcherRunner.java
public static DispatcherRunner create(                  LeaderElectionService leaderElectionService,                  FatalErrorHandler fatalErrorHandler,                  DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {         final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(                  leaderElectionService,                  fatalErrorHandler,                  dispatcherLeaderProcessFactory);         return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner,leaderElectionService);}DispatcherRunnerLeaderElectionLifecycleManager.java
public static  DispatcherRunner createFor(T dispatcherRunner,LeaderElectionService leaderElectionService) throws Exception {         return new DispatcherRunnerLeaderElectionLifecycleManager(dispatcherRunner,leaderElectionService);} private DispatcherRunnerLeaderElectionLifecycleManager(TdispatcherRunner, LeaderElectionService leaderElectionService) throws Exception{         this.dispatcherRunner =dispatcherRunner;         this.leaderElectionService =leaderElectionService;         // 启动dispacher的leader选举         leaderElectionService.start(dispatcherRunner);}StandaloneLeaderElectionService.java
public void start(LeaderContender newContender) throws Exception {         ... ...         contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);}DefaultDispatcherRunner.java
public void grantLeadership(UUID leaderSessionID) {         runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));}private void startNewDispatcherLeaderProcess(UUIDleaderSessionID) {         ... ...         previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));}AbstractDispatcherLeaderProcess.java
public final void start() {         runIfStateIs(                  State.CREATED,                  this::startInternal);}private void startInternal() {         log.info("Start {}.",getClass().getSimpleName());         state = State.RUNNING;         onStart();}JobDispatcherLeaderProcess.java
protected void onStart() {         final DispatcherGatewayServicedispatcherService = dispatcherGatewayServiceFactory.create(                  DispatcherId.fromUuid(getLeaderSessionId()),                  Collections.singleton(jobGraph),                  ThrowingJobGraphWriter.INSTANCE);          completeDispatcherSetup(dispatcherService);}public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(                  DispatcherId fencingToken,                  Collection recoveredJobs,                  JobGraphWriter jobGraphWriter){         ... ...         // 启动dispacher         dispatcher.start();         ... ...}启动 ResourceManager
DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create(                  Configuration configuration,                  Executor ioExecutor,                  RpcService rpcService,                  HighAvailabilityServices highAvailabilityServices,                  BlobServer blobServer,                  HeartbeatServices heartbeatServices,                  MetricRegistry metricRegistry,                  ArchivedExecutionGraphStore archivedExecutionGraphStore,                  MetricQueryServiceRetriever metricQueryServiceRetriever,                  FatalErrorHandler fatalErrorHandler) throws Exception {         ... ...                  // 启动ResourceManager                  log.debug("StartingResourceManager.");                  resourceManager.start();         ... ...}ResourceManager.java
public void onStart() throws Exception {         ... ...                  startResourceManagerServices();         ... ...}private void startResourceManagerServices() throwsException {         try {                  leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();                   initialize();                   leaderElectionService.start(this);                  jobLeaderIdService.start(new JobLeaderIdActionsImpl());                   registerTaskExecutorMetrics();         } catch (Exception e) {                  handleStartResourceManagerServicesException(e);         }}
6、Dispatcher启动JobManager

Dispatcher.java
public void onStart() throws Exception {         try {                  // 启动Dispatcher                  startDispatcherServices();         }         ... ...         // 启动Job         startRecoveredJobs();         ... ...}Dispatcher.java
CompletableFuture createJobManagerRunner(JobGraphjobGraph, long initializationTimestamp) {         final RpcService rpcService =getRpcService();         return CompletableFuture.supplyAsync(                  () -> {                          try {                                   JobManagerRunnerrunner = jobManagerRunnerFactory.createJobManagerRunner(                                            jobGraph,                                            configuration,                                            rpcService,                                            highAvailabilityServices,                                            heartbeatServices,                                            jobManagerSharedServices,                                            newDefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),                                            fatalErrorHandler,                                            initializationTimestamp);                                   // 启动 JobManagerRunner                                   runner.start();                                   return runner;                          }                  ......}JobManagerRunnerImpl.java
public void start() throws Exception {         try {                  leaderElectionService.start(this);         } catch (Exception e) {                  log.error("Could notstart the JobManager because the leader election service did not start.",e);                  throw new Exception("Couldnot start the leader election service.", e);         }}StandaloneLeaderElectionService.java
public void start(LeaderContendernewContender) throws Exception {         ... ...         contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);}JobManagerRunnerImpl.java
public void grantLeadership(final UUID leaderSessionID){         synchronized (lock) {                  if (shutdown) {                          log.debug("JobManagerRunnercannot be granted leadership because it is already shut down.");                          return;                  }                   leadershipOperation =leadershipOperation.thenCompose(                          (ignored) -> {                                   synchronized(lock) {                                            // 校验作业的调理状态然后启动作业管理器                                            returnverifyJobSchedulingStatusAndStartJobManager(leaderSessionID);                                   }                          });                   handleException(leadershipOperation,"Could not start the job manager.");         }}private CompletableFuture verifyJobSchedulingStatusAndStartJobManager(UUIDleaderSessionId) {         final CompletableFuture jobSchedulingStatusFuture = getJobSchedulingStatus();          return jobSchedulingStatusFuture.thenCompose(                  jobSchedulingStatus -> {                          if(jobSchedulingStatus == JobSchedulingStatus.DONE) {                                   returnjobAlreadyDone();                          } else {                                   return startJobMaster(leaderSessionId);                          }                  });}private CompletionStage startJobMaster(UUIDleaderSessionId) {         ... ...                  startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));         ... ...}JobMaster.java
public CompletableFuture start(finalJobMasterId newJobMasterId) throws Exception {         // make sure we receive RPC and asynccalls         start();          return callAsyncWithoutFencing(() ->startJobExecution(newJobMasterId),RpcUtils.INF_TIMEOUT);}private Acknowledge startJobExecution(JobMasterId newJobMasterId)throws Exception {... ...// 启动JobMaster         startJobMasterServices();          log.info("Starting execution ofjob {} ({}) under job master id {}.", jobGraph.getName(),jobGraph.getJobID(), newJobMasterId);         // 重置开始调理         resetAndStartScheduler();... ...}
7、ResourceManager启动SlotManager

ResourceManager.java
public final void onStart() throws Exception {         ... ...                  startResourceManagerServices();         ... ...}private void startResourceManagerServices() throwsException {         try {                  leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();                   initialize();                   leaderElectionService.start(this);                  jobLeaderIdService.start(newJobLeaderIdActionsImpl());                   registerTaskExecutorMetrics();         } catch (Exception e) {                  handleStartResourceManagerServicesException(e);         }}创建 Yarn 的 RM 和 NM 客户端
ActiveResourceManager.java
protected void initialize() throws ResourceManagerException{         try {                  resourceManagerDriver.initialize(                                   this,                                   newGatewayMainThreadExecutor(),                                   ioExecutor);         } catch (Exception e) {                  throw newResourceManagerException("Cannot initialize resource provider.", e);         }}AbstractResourceManagerDriver.java
public final void initialize(                  ResourceEventHandler resourceEventHandler,                  ScheduledExecutor mainThreadExecutor,                  Executor ioExecutor) throwsException {         this.resourceEventHandler =Preconditions.checkNotNull(resourceEventHandler);         this.mainThreadExecutor =Preconditions.checkNotNull(mainThreadExecutor);         this.ioExecutor =Preconditions.checkNotNull(ioExecutor);          initializeInternal();}YarnResourceManagerDriver.java
protected void initializeInternal() throws Exception {         final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();         try {                  // 创建和启动yarn的resourcemanager客户端                  resourceManagerClient =yarnResourceManagerClientFactory.createResourceManagerClient(                          yarnHeartbeatIntervalMillis,                          yarnContainerEventHandler);                  resourceManagerClient.init(yarnConfig);                  resourceManagerClient.start();                   final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();                  getContainersFromPreviousAttempts(registerApplicationMasterResponse);                  taskExecutorProcessSpecContainerResourcePriorityAdapter=                          newTaskExecutorProcessSpecContainerResourcePriorityAdapter(                                   registerApplicationMasterResponse.getMaximumResourceCapability(),                                   ExternalResourceUtils.getExternalResources(flinkConfig,YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));         } catch (Exception e) {                  throw newResourceManagerException("Could not start resource manager client.",e);         }                 // 创建和启动yarn的nodemanager客户端         nodeManagerClient =yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);         nodeManagerClient.init(yarnConfig);         nodeManagerClient.start();}启动 SlotManager
StandaloneLeaderElectionService.java
private void startServicesOnLeadership() {         startHeartbeatServices();          slotManager.start(getFencingToken(),getMainThreadExecutor(), new ResourceActionsImpl());          onLeadership();}SlotManagerImpl.java
public void start(ResourceManagerId newResourceManagerId,Executor newMainThreadExecutor, ResourceActions newResourceActions) {         LOG.info("Starting theSlotManager.");          this.resourceManagerId =Preconditions.checkNotNull(newResourceManagerId);         mainThreadExecutor =Preconditions.checkNotNull(newMainThreadExecutor);         resourceActions =Preconditions.checkNotNull(newResourceActions);          started = true;          taskManagerTimeoutsAndRedundancyCheck =scheduledExecutor.scheduleWithFixedDelay(                  () ->mainThreadExecutor.execute(                          () -> checkTaskManagerTimeoutsAndRedundancy()),                  0L,                  taskManagerTimeout.toMilliseconds(),                  TimeUnit.MILLISECONDS);          slotRequestTimeoutCheck =scheduledExecutor.scheduleWithFixedDelay(                  () ->mainThreadExecutor.execute(                          () -> checkSlotRequestTimeouts()),                  0L,                  slotRequestTimeout.toMilliseconds(),                  TimeUnit.MILLISECONDS);          registerSlotManagerMetrics();}void checkTaskManagerTimeoutsAndRedundancy() {         if (!taskManagerRegistrations.isEmpty()){                  long currentTime =System.currentTimeMillis();                   ArrayListtimedOutTaskManagers = new ArrayList(taskManagerRegistrations.size());                   // first retrieve the timedout TaskManagers                  for (TaskManagerRegistrationtaskManagerRegistration : taskManagerRegistrations.values()) {                          if (currentTime -taskManagerRegistration.getIdleSince() >=taskManagerTimeout.toMilliseconds()) {                                   // we collectthe instance ids first in order to avoid concurrent modifications by the                                   //ResourceActions.releaseResource call                                   timedOutTaskManagers.add(taskManagerRegistration);                          }                  }                                   int slotsDiff =redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();                  if (freeSlots.size() == slots.size()){                          // No need tokeep redundant taskManagers if no job is running.                          // 假如没有job在运行,开释taskmanager                          releaseTaskExecutors(timedOutTaskManagers,timedOutTaskManagers.size());                  } else if (slotsDiff > 0) {                          // Keep enoughredundant taskManagers from time to time.            // 保证随时有富足的taskmanager                          intrequiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);                          allocateRedundantTaskManagers(requiredTaskManagers);                  } else {                          // second we triggerthe release resource callback which can decide upon the resource release                          int maxReleaseNum =(-slotsDiff) / numSlotsPerWorker;                          releaseTaskExecutors(timedOutTaskManagers,Math.min(maxReleaseNum, timedOutTaskManagers.size()));                  }         }}
8、JobManager申请Slot

启动 SlotPool
接6,JobMaster启动时,启动SlotPool,向ResourceManager注册
private void startJobMasterServices() throwsException {         // 启动心跳服务         startHeartbeatServices();          // 启动slotPool         slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());          // 连接到之前已知的ResourceManager         reconnectToResourceManager(newFlinkException("Starting JobMaster component."));          // 启动后slotpool开始向slot manager请求slot         resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());}向 ResourceManager 注册
颠末下面层层调用:
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> notifyOfNewResourceManagerLeader()
-> reconnectToResourceManager()
-> tryConnectToResourceManager()
-> connectToResourceManager()
private void connectToResourceManager() {         ... ...          resourceManagerConnection = new ResourceManagerConnection(                  log,                  jobGraph.getJobID(),                  resourceId,                  getAddress(),                  getFencingToken(),                  resourceManagerAddress.getAddress(),                  resourceManagerAddress.getResourceManagerId(),                  scheduledExecutorService);          resourceManagerConnection.start();}RegisteredRpcConnection.java
public void start() {         ... ...          final RetryingRegistration newRegistration = createNewRegistration();          if (REGISTRATION_UPDATER.compareAndSet(this,null, newRegistration)) {                  newRegistration.startRegistration();         } else {                  // concurrent start operation                  newRegistration.cancel();         }}privateRetryingRegistration createNewRegistration() {         RetryingRegistrationnewRegistration = checkNotNull(generateRegistration());          ... ...}JobMaster.java的内部类ResourceManagerConnection
protected RetryingRegistration generateRegistration() {         return newRetryingRegistration(                  log,                  getRpcService(),                  "ResourceManager",                  ResourceManagerGateway.class,                  getTargetAddress(),                  getTargetLeaderId(),                  jobMasterConfiguration.getRetryingRegistrationConfiguration()){                   @Override                  protectedCompletableFuture invokeRegistration(                                   ResourceManagerGatewaygateway, ResourceManagerId fencingToken, long timeoutMillis) {                          Time timeout =Time.milliseconds(timeoutMillis);                           return gateway.registerJobManager(                                   jobMasterId,                                   jobManagerResourceID,                                   jobManagerRpcAddress,                                   jobID,                                   timeout);                  }         };}SlotPool 申请 slot
注册成功调用onRegistrationSuccess(),向ResourceManager进行slot的申请:
JobMaster.java的内部类ResourceManagerConnection
protected void onRegistrationSuccess(finalJobMasterRegistrationSuccess success) {         runAsync(() -> {                  // filter out outdatedconnections                  //noinspection ObjectEquality                  if (this ==resourceManagerConnection) {                          establishResourceManagerConnection(success);                  }         });}private void establishResourceManagerConnection(finalJobMasterRegistrationSuccess success) {         ... ...         slotPool.connectToResourceManager(resourceManagerGateway);         ... ...}SlotPoolImpl.javapublic void connectToResourceManager(@NonnullResourceManagerGateway resourceManagerGateway) {         this.resourceManagerGateway =checkNotNull(resourceManagerGateway);          // work on all slots waiting for thisconnection         for (PendingRequest pendingRequest :waitingForResourceManager.values()) {                  // 向ResourceManager申请slot                  requestSlotFromResourceManager(resourceManagerGateway,pendingRequest);         }          // all sent off         waitingForResourceManager.clear();}private void requestSlotFromResourceManager(                  final ResourceManagerGatewayresourceManagerGateway,                  final PendingRequestpendingRequest) {         ... ...         CompletableFuturermResponse = resourceManagerGateway.requestSlot(                  jobMasterId,                  new SlotRequest(jobId,allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),                  rpcTimeout);          ... ...}ResourceManager.java:由ResourceManager里的SlotManager处理请求
public CompletableFuture requestSlot(                  JobMasterId jobMasterId,                  SlotRequest slotRequest,                  final Time timeout) {          ... ...                          try {                                   // SlotManager处理slot请求                                   slotManager.registerSlotRequest(slotRequest);                          }... ...}SlotManagerImpl.java
public boolean registerSlotRequest(SlotRequest slotRequest)throws ResourceManagerException {         checkInit();          ... ...                   PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);                   pendingSlotRequests.put(slotRequest.getAllocationId(),pendingSlotRequest);                   try {                          internalRequestSlot(pendingSlotRequest);                  }         ... ...}private void internalRequestSlot(PendingSlotRequestpendingSlotRequest) throws ResourceManagerException {         final ResourceProfile resourceProfile =pendingSlotRequest.getResourceProfile();          OptionalConsumer.of(findMatchingSlot(resourceProfile))                  .ifPresent(taskManagerSlot-> allocateSlot(taskManagerSlot, pendingSlotRequest))                  .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));}private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequestpendingSlotRequest) throws ResourceManagerException {         ... ...         if (!pendingTaskManagerSlotOptional.isPresent()){                  pendingTaskManagerSlotOptional= allocateResource(resourceProfile);         }          ... ...}
9、ResourceManager申请资源

ResourceManager.java
public boolean allocateResource(WorkerResourceSpec workerResourceSpec){         validateRunsInMainThread();         return startNewWorker(workerResourceSpec);}ActiveResourceManager.javapublic boolean startNewWorker(WorkerResourceSpecworkerResourceSpec) {         requestNewWorker(workerResourceSpec);         return true;}private void requestNewWorker(WorkerResourceSpecworkerResourceSpec) {         // 从配置中获取taskexecutor配置         final TaskExecutorProcessSpectaskExecutorProcessSpec =                          TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig,workerResourceSpec);... ...// 申请资源         CompletableFuture requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);... ...}YarnResourceManagerDriver.java
public CompletableFuture requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {         checkInitialized();          final CompletableFuture requestResourceFuture = newCompletableFuture();          final OptionalpriorityAndResourceOpt =         taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);          if(!priorityAndResourceOpt.isPresent()) {                  requestResourceFuture.completeExceptionally(                          newResourceManagerException(                                   String.format("Couldnot compute the container Resource from the given TaskExecutorProcessSpec %s." +                                                     "Thisusually indicates the requested resource is larger than Yarn's max containerresource limit.",                                            taskExecutorProcessSpec)));         } else {                  final Priority priority =priorityAndResourceOpt.get().getPriority();                  final Resource resource =priorityAndResourceOpt.get().getResource();                  resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));                   // make sure we transmit therequest fast and receive fast news of granted allocations                  resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);                   requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec,ignore -> new LinkedList()).add(requestResourceFuture);                   log.info("Requesting newTaskExecutor container with resource {}, priority {}.",taskExecutorProcessSpec, priority);         }          return requestResourceFuture;}
10、TaskManager启动

YarnTaskExecutorRunner.java
public static void main(String[] args) {         EnvironmentInformation.logEnvironmentInfo(LOG,"YARN TaskExecutor runner", args);         SignalHandler.register(LOG);         JvmShutdownSafeguard.installAsShutdownHook(LOG);          runTaskManagerSecurely(args);}private static void runTaskManagerSecurely(String[] args) {         try {                  LOG.debug("Allenvironment variables: {}", ENV);                   final String currDir =ENV.get(Environment.PWD.key());                  LOG.info("Current workingDirectory: {}", currDir);                   final Configurationconfiguration = TaskManagerRunner.loadConfiguration(args);                  setupAndModifyConfiguration(configuration,currDir, ENV);                   TaskManagerRunner.runTaskManagerSecurely(configuration);         }         catch (Throwable t) {                  final ThrowablestrippedThrowable = ExceptionUtils.stripException(t,UndeclaredThrowableException.class);                  // make sure that everythingwhatever ends up in the log                  LOG.error("YARN TaskManagerinitialization failed.", strippedThrowable);                  System.exit(INIT_ERROR_EXIT_CODE);         }}TaskManagerRunner.java
public void start() throws Exception {         taskExecutorService.start();}TaskExecutorToServiceAdapter.java
public void start() {         taskExecutor.start();}TaskExecutor.java
public void onStart() throws Exception {         try {                  startTaskExecutorServices();         } catch (Throwable t) {                  final TaskManagerExceptionexception = new TaskManagerException(String.format("Could not start theTaskExecutor %s", getAddress()), t);                  onFatalError(exception);                  throw exception;         }          startRegistrationTimeout();}
11、向ResourceManager注册

TaskExecutor.java
private void startTaskExecutorServices() throwsException {         try {                  // start by connecting to theResourceManager                  resourceManagerLeaderRetriever.start(newResourceManagerLeaderListener());                   // tell the task slot tablewho's responsible for the task slot actions                  taskSlotTable.start(newSlotActionsImpl(), getMainThreadExecutor());                   // start the job leaderservice                  jobLeaderService.start(getAddress(),getRpcService(), haServices, new JobLeaderListenerImpl());                   fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(),blobCacheService.getPermanentBlobService());         } catch (Exception e) {                  handleStartTaskExecutorServicesException(e);         }}resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> TaskExecutor的notifyOfNewResourceManagerLeader()
-> TaskExecutor的reconnectToResourceManager()
-> TaskExecutor的tryConnectToResourceManager()
-> TaskExecutor的connectToResourceManager()
-> TaskExecutor的resourceManagerConnection.start()
执行 createNewRegistration()->generateRegistration()
TaskExecutorToResourceManagerConnection.java
protected RetryingRegistration generateRegistration() {         return newTaskExecutorToResourceManagerConnection.ResourceManagerRegistration(                  log,                  rpcService,                  getTargetAddress(),                  getTargetLeaderId(),                  retryingRegistrationConfiguration,                  taskExecutorRegistration);}开始注册newRegistration. startRegistration()会调用invokeRegistration():
TaskExecutorToResourceManagerConnection.java的内部类ResourceManagerRegistration
private static class ResourceManagerRegistration                  extendsRetryingRegistration {          private final TaskExecutorRegistration taskExecutorRegistration;          ResourceManagerRegistration(                          Logger log,                          RpcService rpcService,                          String targetAddress,                          ResourceManagerIdresourceManagerId,                          RetryingRegistrationConfigurationretryingRegistrationConfiguration,                          TaskExecutorRegistrationtaskExecutorRegistration) {                   super(log, rpcService,"ResourceManager", ResourceManagerGateway.class, targetAddress,resourceManagerId, retryingRegistrationConfiguration);                  this.taskExecutorRegistration= taskExecutorRegistration;         }          @Override         protectedCompletableFuture invokeRegistration(                          ResourceManagerGatewayresourceManager, ResourceManagerId fencingToken, long timeoutMillis) throwsException {                   Time timeout =Time.milliseconds(timeoutMillis);                  return resourceManager.registerTaskExecutor(                          taskExecutorRegistration,                          timeout);         }}注册成功调用onRegistrationSuccess
protected void onRegistrationSuccess(TaskExecutorRegistrationSuccesssuccess) {         log.info("Successful registrationat resource manager {} under registration id {}.",                  getTargetAddress(),success.getRegistrationId());          registrationListener.onRegistrationSuccess(this, success);}TaskExecutor.java的内部类ResourceManagerRegistrationListener
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnectionconnection, TaskExecutorRegistrationSuccess success) {         final ResourceID resourceManagerId =success.getResourceManagerId();         final InstanceIDtaskExecutorRegistrationId = success.getRegistrationId();         final ClusterInformationclusterInformation = success.getClusterInformation();         final ResourceManagerGatewayresourceManagerGateway = connection.getTargetGateway();          runAsync(                  () -> {                          // filter out outdatedconnections                          //noinspectionObjectEquality                          if(resourceManagerConnection == connection) {                                   try {                                            establishResourceManagerConnection(                                                     resourceManagerGateway,                                                     resourceManagerId,                                                     taskExecutorRegistrationId,                                                     clusterInformation);                                   } catch(Throwable t) {                                            log.error("EstablishingResource Manager connection in Task Executor failed", t);                                   }                          }                  });}private void establishResourceManagerConnection(                  ResourceManagerGateway resourceManagerGateway,                  ResourceID resourceManagerResourceId,                  InstanceID taskExecutorRegistrationId,                  ClusterInformation clusterInformation) {                 // 向ResourceManager注册slot         finalCompletableFuture slotReportResponseFuture =resourceManagerGateway.sendSlotReport(                  getResourceID(),                  taskExecutorRegistrationId,                  taskSlotTable.createSlotReport(getResourceID()),                  taskManagerConfiguration.getTimeout());          ... ...}ResourceManager.java
public CompletableFuture sendSlotReport(ResourceIDtaskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReportslotReport, Time timeout) {         final WorkerRegistration workerTypeWorkerRegistration =taskExecutors.get(taskManagerResourceId);          if(workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)){                  if (slotManager.registerTaskManager(workerTypeWorkerRegistration,slotReport)) {                          onWorkerRegistered(workerTypeWorkerRegistration.getWorker());                  }                  returnCompletableFuture.completedFuture(Acknowledge.get());         } else {                  return FutureUtils.completedExceptionally(newResourceManagerException(String.format("Unknown TaskManager registrationid %s.", taskManagerRegistrationId)));  }}       SlotManagerImpl.java
public boolean registerTaskManager(final TaskExecutorConnectiontaskExecutorConnection, SlotReport initialSlotReport) {         checkInit();          LOG.debug("Registering TaskManager{} under {} at the SlotManager.",taskExecutorConnection.getResourceID().getStringWithMetadata(),taskExecutorConnection.getInstanceID());          // we identify task managers by theirinstance id         // 通过实例id判定某个taskmanager是否已经注册过         if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())){                  // 报告已注册过的taskmanager的slot分配环境,更新slot环境                  reportSlotStatus(taskExecutorConnection.getInstanceID(),initialSlotReport);                  return false;         } else {                  if(isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {                          LOG.info("Thetotal number of slots exceeds the max limitation {}, release the excess resource.",maxSlotNum);                          resourceActions.releaseResource(taskExecutorConnection.getInstanceID(),new FlinkException("The total number of slots exceeds the maxlimitation."));                          return false;                  }                   // first register theTaskManager                  ArrayList reportedSlots= new ArrayList();                   for (SlotStatus slotStatus :initialSlotReport) {                          reportedSlots.add(slotStatus.getSlotID());                  }                   TaskManagerRegistrationtaskManagerRegistration = new TaskManagerRegistration(                          taskExecutorConnection,                          reportedSlots);                   taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(),taskManagerRegistration);                   // next register the new slots                  for (SlotStatus slotStatus :initialSlotReport) {                          // 注册新的slot,根据slot请求进行分配                          registerSlot(                                   slotStatus.getSlotID(),                                   slotStatus.getAllocationID(),                                   slotStatus.getJobID(),                                   slotStatus.getResourceProfile(),                                   taskExecutorConnection);                  }                   return true;         } }
12、ResourceManager分配Slot

SlotManagerImpl.java
private void registerSlot(                  SlotID slotId,                  AllocationID allocationId,                  JobID jobId,                  ResourceProfileresourceProfile,                  TaskExecutorConnectiontaskManagerConnection) {          if (slots.containsKey(slotId)) {                  // remove the old slot first                  // 移除旧slot                  removeSlot(                          slotId,                          newSlotManagerException(                                   String.format(                                            "Re-registrationof slot %s. This indicates that the TaskExecutor has re-connected.",                                            slotId)));         }          // 创建和注册TaskManager的slot         final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId,resourceProfile, taskManagerConnection);          final PendingTaskManagerSlotpendingTaskManagerSlot;          if (allocationId == null) {                  pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);         } else {                  pendingTaskManagerSlot = null;         }          if (pendingTaskManagerSlot == null) {                  updateSlot(slotId,allocationId, jobId);         } else {                  pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());                  final PendingSlotRequestassignedPendingSlotRequest =pendingTaskManagerSlot.getAssignedPendingSlotRequest();                   // 分配slot给请求                  if (assignedPendingSlotRequest== null) {                          handleFreeSlot(slot);                  } else {                          assignedPendingSlotRequest.unassignPendingTaskManagerSlot();                          allocateSlot(slot,assignedPendingSlotRequest);                  }         }}private void allocateSlot(TaskManagerSlottaskManagerSlot, PendingSlotRequest pendingSlotRequest) {         ... ...         taskManagerRegistration.markUsed();          // RPC call to the task manager         CompletableFuturerequestFuture = gateway.requestSlot(                  slotId,                  pendingSlotRequest.getJobId(),                  allocationId,                  pendingSlotRequest.getResourceProfile(),                  pendingSlotRequest.getTargetAddress(),                  resourceManagerId,                  taskManagerRequestTimeout);          ... ...}
13、TaskManager提供Slot

TaskExecutor.java
public CompletableFuture requestSlot(         final SlotID slotId,         final JobID jobId,         final AllocationID allocationId,         final ResourceProfile resourceProfile,         final String targetAddress,         final ResourceManagerIdresourceManagerId,         final Time timeout) {         ... ...          try {                   // 分配taskmanager上的slot                  allocateSlot(                          slotId,                          jobId,                          allocationId,                          resourceProfile);         } catch (SlotAllocationException sae) {                  returnFutureUtils.completedExceptionally(sae);         }          final JobTable.Job job;          try {                  job =jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId,targetAddress));         } catch (Exception e) {                  // free the allocated slot                  try {                          taskSlotTable.freeSlot(allocationId);                  } catch (SlotNotFoundExceptionslotNotFoundException) {                          // slot no longerexistent, this should actually never happen, because we've                          // just allocated theslot. So let's fail hard in this case!                          onFatalError(slotNotFoundException);                  }                   // release local state underthe allocation id.                  localStateStoresManager.releaseLocalStateForAllocationId(allocationId);                   // sanity check                  if(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {                          onFatalError(newException("Could not free slot " + slotId));                  }                   returnFutureUtils.completedExceptionally(new SlotAllocationException("Could notcreate new job.", e));         }          if (job.isConnected()) {                  // 连接上job,提供slot给JobManager                  offerSlotsToJobManager(jobId);         }          return CompletableFuture.completedFuture(Acknowledge.get());}private void internalOfferSlotsToJobManager(JobTable.ConnectionjobManagerConnection) {         final JobID jobId =jobManagerConnection.getJobId();          if(taskSlotTable.hasAllocatedSlots(jobId)) {                  log.info("Offer reservedslots to the leader of job {}.", jobId);                   final JobMasterGatewayjobMasterGateway = jobManagerConnection.getJobManagerGateway();                   finalIterator reservedSlotsIterator =taskSlotTable.getAllocatedSlots(jobId);                  final JobMasterId jobMasterId= jobManagerConnection.getJobMasterId();                   finalCollection reservedSlots = new HashSet(2);                   while(reservedSlotsIterator.hasNext()) {                          SlotOffer offer =reservedSlotsIterator.next().generateSlotOffer();                          reservedSlots.add(offer);                  }                   CompletableFutureacceptedSlotsFuture = jobMasterGateway.offerSlots(                          getResourceID(),                          reservedSlots,                          taskManagerConfiguration.getTimeout());                   acceptedSlotsFuture.whenCompleteAsync(                          handleAcceptedSlotOffers(jobId,jobMasterGateway, jobMasterId, reservedSlots),                          getMainThreadExecutor());         } else {                  log.debug("There are nounassigned slots for the job {}.", jobId);         }}JobMaster.java
public CompletableFuture offerSlots(                  final ResourceIDtaskManagerId,                  final Collectionslots,                  final Time timeout) {          Tuple2 taskManager =registeredTaskManagers.get(taskManagerId);          if (taskManager == null) {                  returnFutureUtils.completedExceptionally(new Exception("Unknown TaskManager" + taskManagerId));         }          final TaskManagerLocationtaskManagerLocation = taskManager.f0;         final TaskExecutorGatewaytaskExecutorGateway = taskManager.f1;          final RpcTaskManagerGatewayrpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway,getFencingToken());          return CompletableFuture.completedFuture(                  slotPool.offerSlots(                          taskManagerLocation,                          rpcTaskManagerGateway,                          slots));}SlotPoolImpl.java
public Collection offerSlots(                  TaskManagerLocation taskManagerLocation,                  TaskManagerGateway taskManagerGateway,                  Collectionoffers) {          ArrayList result = newArrayList(offers.size());          for (SlotOffer offer : offers) {                  if (offerSlot(                          taskManagerLocation,                          taskManagerGateway,                          offer)) {                           result.add(offer);                  }         }          return result;}boolean offerSlot(                  final TaskManagerLocationtaskManagerLocation,                  final TaskManagerGatewaytaskManagerGateway,                  final SlotOffer slotOffer) {          ... ...          // use the slot to fulfill pendingrequest, in requested order         // 按照请求顺序,利用slot来完成挂起的请求         tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);          // we accepted the request in any case.slot will be released after it idled for         // too long and timed out         return true;}<hr>往期精彩内容:
用flink能替换spark的批处理功能吗


Flink进阶之滑动窗口统计实时热门商品
Flink进阶之利用CEP实现恶意登陆检测
重磅!Flink源码解析环境准备及提交换程之环境准备
大咖分享 | 通过制作一个迷你版Flink来学习Flink源码




欢迎光临 创意电子 (https://wxcydz.cc/) Powered by Discuz! X3.4