Flink YARN per-job 模式的作业提交流程如图所示:
1、启动Yarn客户端
AbstractJobClusterExecutor.java
public CompletableFuture execute(@Nonnull finalPipeline pipeline, @Nonnull final Configuration configuration) throws Exception{ final JobGraph jobGraph =ExecutorUtils.getJobGraph(pipeline, configuration); // 创建并启动yarn客户端 try (finalClusterDescriptor clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)){ final ExecutionConfigAccessorconfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration); // 获取集群配置参数 final ClusterSpecificationclusterSpecification = clusterClientFactory.getClusterSpecification(configuration); // 部署集群 final ClusterClientProvider clusterClientProvider =clusterDescriptor .deployJobCluster(clusterSpecification,jobGraph, configAccessor.getDetachedMode()); LOG.info("Job has beensubmitted with JobID " + jobGraph.getJobID()); return CompletableFuture.completedFuture( newClusterClientJobClientAdapter(clusterClientProvider,jobGraph.getJobID())); }}YarnClusterClientFactory.java
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration){... ... return getClusterDescriptor(configuration);} private YarnClusterDescriptor getClusterDescriptor(Configurationconfiguration) { final YarnClient yarnClient = YarnClient.createYarnClient(); final YarnConfigurationyarnConfiguration = new YarnConfiguration(); yarnClient.init(yarnConfiguration); yarnClient.start(); return new YarnClusterDescriptor( configuration, yarnConfiguration, yarnClient, YarnClientYarnClusterInformationRetriever.create(yarnClient), false);}2、获取集群配置参数
AbstractContainerizedClusterClientFactory.java
public ClusterSpecification getClusterSpecification(Configurationconfiguration) {... ... final int jobManagerMemoryMB =JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY) .getTotalProcessMemorySize() .getMebiBytes(); final int taskManagerMemoryMB = TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption( configuration,TaskManagerOptions.TOTAL_PROCESS_MEMORY)) .getTotalProcessMemorySize() .getMebiBytes(); int slotsPerTaskManager =configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); return newClusterSpecification.ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMB) .setTaskManagerMemoryMB(taskManagerMemoryMB) .setSlotsPerTaskManager(slotsPerTaskManager) .createClusterSpecification();}3、部署集群
YarnClusterDescriptor.java
public ClusterClientProvider deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException{ try { return deployInternal( clusterSpecification, "Flink per-jobcluster", getYarnJobClusterEntrypoint(), //获取YarnJobClusterEntrypoint,启动AM的入口 jobGraph, detached); } catch (Exception e) { throw new ClusterDeploymentException("Couldnot deploy Yarn job cluster.", e); }}上传 jar 包和配置文件到 HDFS
YarnClusterDescriptor.java
private ClusterClientProvider deployInternal( ClusterSpecificationclusterSpecification, String applicationName, String yarnClusterEntrypoint, @Nullable JobGraph jobGraph, boolean detached) throwsException {... ...// 创建应用 final YarnClientApplicationyarnApplication = yarnClient.createApplication();... ... ApplicationReport report = startAppMaster( flinkConfiguration, applicationName, yarnClusterEntrypoint, jobGraph, yarnClient, yarnApplication, validClusterSpecification);... ...}private ApplicationReport startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplicationyarnApplication, ClusterSpecificationclusterSpecification) throws Exception {... ... // 初始化文件系统(HDFS) final FileSystem fs = FileSystem.get(yarnConfiguration);... ...ApplicationSubmissionContextappContext =yarnApplication.getApplicationSubmissionContext(); final List providedLibDirs = getRemoteSharedPaths(configuration);// 上传文件的工具类final YarnApplicationFileUploader fileUploader=YarnApplicationFileUploader.from( fs, fs.getHomeDirectory(), providedLibDirs, appContext.getApplicationId(), getFileReplication());... ... final ApplicationId appId =appContext.getApplicationId();... ... if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)){ // yarn重试次数,默认2 appContext.setMaxAppAttempts( configuration.getInteger( YarnConfigOptions.APPLICATION_ATTEMPTS.key(), YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); activateHighAvailabilitySupport(appContext); } else { //不是高可用重试次数为1 appContext.setMaxAppAttempts( configuration.getInteger( YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1)); }... ... // 多次调用上传HDFS的方法,分别是: // => systemShipFiles:日志的配置文件、lib/目录下除了dist的jar包 // => shipOnlyFiles:plugins/目录下的文件 // => userJarFiles:用户代码的jar包fileUploader.registerMultipleLocalResources (... ...);... ... 
// 上传和配置ApplicationMaster的jar包:flink-dist*.jar final YarnLocalResourceDescriptorlocalResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);... ...//fileUploader.registerSingleLocalResource( jobGraphFilename, newPath(tmpJobGraphFile.toURI()), "", true, false);... ... // 上传flink配置文件 String flinkConfigKey ="flink-conf.yaml"; Path remotePathConf = setupSingleLocalResource( flinkConfigKey, fs, appId, newPath(tmpConfigurationFile.getAbsolutePath()), localResources, homeDir, "");... ... // 将JobGraph写入tmp文件并添加到本地资源,并上传到HDFS fileUploader.registerSingleLocalResource( jobGraphFilename, newPath(tmpJobGraphFile.toURI()), "", true, false);... ...// 上传flink配置文件String flinkConfigKey = "flink-conf.yaml";fileUploader.registerSingleLocalResource( flinkConfigKey, newPath(tmpConfigurationFile.getAbsolutePath()), "", true, true);... ...final JobManagerProcessSpec processSpec =JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap( flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY); //封装启动AM container的Java命令 final ContainerLaunchContext amContainer = setupApplicationMasterContainer( yarnClusterEntrypoint, hasKrb5, processSpec);... ... appContext.setApplicationName(customApplicationName);appContext.setApplicationType(applicationType!= null ? applicationType : "Apache Flink");appContext.setAMContainerSpec(amContainer);appContext.setResource(capability);... ... yarnClient.submitApplication(appContext);... ... }封装 AM 参数和命令
YarnClusterDescriptor.java
ContainerLaunchContext setupApplicationMasterContainer( String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) { // respect custom JVM options in theYAML file String javaOpts =flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS); if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length()> 0) { javaOpts += " " +flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS); } //applicable only for YarnMiniClustersecure test run //krb5.conf file will be available aslocal resource in JM/TM container if (hasKrb5) { javaOpts += "-Djava.security.krb5.conf=krb5.conf"; } // 创建AM的容器启动上下文 ContainerLaunchContext amContainer =Records.newRecord(ContainerLaunchContext.class); final Map startCommandValues = new HashMap(); startCommandValues.put("java","$JAVA_HOME/bin/java"); String jvmHeapMem =JobManagerProcessUtils.generateJvmParametersStr(processSpec,flinkConfiguration); startCommandValues.put("jvmmem", jvmHeapMem); startCommandValues.put("jvmopts", javaOpts); startCommandValues.put("logging",YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration)); startCommandValues.put("class",yarnClusterEntrypoint); startCommandValues.put("redirects", "1> " +ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " + "2> " +ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err"); startCommandValues.put("args", ""); final String commandTemplate =flinkConfiguration .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE, ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE); final String amCommand = BootstrapTools.getStartCommand(commandTemplate,startCommandValues); amContainer.setCommands(Collections.singletonList(amCommand)); LOG.debug("Application Masterstart command: " + amCommand); return amContainer;}封装 AM 参数:
private ApplicationReport startAppMaster( Configuration configuration, String applicationName, String yarnClusterEntrypoint, JobGraph jobGraph, YarnClient yarnClient, YarnClientApplicationyarnApplication, ClusterSpecificationclusterSpecification) throws Exception { ... ... final ContainerLaunchContext amContainer = setupApplicationMasterContainer( yarnClusterEntrypoint, hasKrb5, processSpec); ... ... // 封装AM 的classpath和环境参数 final Map appMasterEnv = new HashMap(); // set user specified appmaster environment variables appMasterEnv.putAll( ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,configuration)); // set Flink app class path appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,classPathBuilder.toString()); // set Flink on YARN internalconfiguration values appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR,localResourceDescFlinkJar.toString()); appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,fileUploader.getHomeDir().toString()); appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList())); appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace()); appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES,fileUploader.getApplicationDir().toUri().toString()); //https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,UserGroupInformation.getCurrentUser().getUserName()); if (localizedKeytabPath !=null) { appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath); String principal =configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal); if (remotePathKeytab!= null) { 
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH,remotePathKeytab.toString()); } } //To support Yarn SecureIntegration Test Scenario if (remoteYarnSiteXmlPath !=null) { appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH,remoteYarnSiteXmlPath.toString()); } if (remoteKrb5Path != null) { appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString()); } // set classpath from YARNconfiguration Utils.setupYarnClassPath(yarnConfiguration,appMasterEnv); //设置 AM 参数 amContainer.setEnvironment(appMasterEnv); ... ... yarnClient.submitApplication(appContext);... ... }4、提交应用
YarnClientImpl.java
public ApplicationId submitApplication(ApplicationSubmissionContextappContext) throws YarnException, IOException { ApplicationId applicationId =appContext.getApplicationId(); ... ... SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); //TODO: YARN-1763:Handle RM failoversduring the submitApplication call.rmClient.submitApplication(request);... ...}ApplicationClientProtocolPBClientImpl.java
public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throwsYarnException,IOException {//取出报文 SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto(); //将报文发送发送到服务端,并将返回结果构成response try { return newSubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto)); } catch (ServiceException e) { RPCUtil.unwrapAndThrowException(e); return null; }}ApplicationClientProtocolPBServiceImpl.java
public SubmitApplicationResponseProto submitApplication(RpcController arg0,SubmitApplicationRequestProto proto) throws ServiceException {//服务端重新构建报文 SubmitApplicationRequestPBImpl request = newSubmitApplicationRequestPBImpl(proto); ...... SubmitApplicationResponse response = real.submitApplication(request); return((SubmitApplicationResponsePBImpl)response).getProto(); ......}ClientRMService.java
public SubmitApplicationResponse submitApplication(SubmitApplicationRequestrequest) throws YarnException { ... ... //将应用请求提交到Yarn上的RMAppManager去提交使命 this.rmAppManager.submitApplication(submissionContext,System.currentTimeMillis(), user); ... ...}5、创建Dispatcher、ResourceManager
Per-job 模式下,AM container 的启动入口是 YarnJobClusterEntrypoint 类中的 main() 方法。
YarnJobClusterEntrypoint.java
public staticvoid main(String[] args) { ... ... Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory,env); YarnJobClusterEntrypoint yarnJobClusterEntrypoint = newYarnJobClusterEntrypoint(configuration); ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);}ClusterEntrypoint.java
private void runCluster(Configuration configuration,PluginManager pluginManager) throws Exception { synchronized (lock) { initializeServices(configuration,pluginManager); ... ... //1、创建dispatcher、ResourceManager对象的工厂类 // 此中有从本地重新创建JobGraph的过程 finalDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory= createDispatcherResourceManagerComponentFactory(configuration); //2、通过工厂类创建dispatcher、ResourceManager对象 // Entry 启动RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等 clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, newRpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), this); ... ... }}DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create( Configuration configuration, Executor ioExecutor, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, ArchivedExecutionGraphStore archivedExecutionGraphStore, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception { LeaderRetrievalService dispatcherLeaderRetrievalService = null; LeaderRetrievalService resourceManagerRetrievalService = null; WebMonitorEndpoint webMonitorEndpoint = null; ResourceManager resourceManager = null; ResourceManagerMetricGroup resourceManagerMetricGroup = null; DispatcherRunner dispatcherRunner =null; try { dispatcherLeaderRetrievalService =highAvailabilityServices.getDispatcherLeaderRetriever(); resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever(); final LeaderGatewayRetriever dispatcherGatewayRetriever = newRpcGatewayRetriever( rpcService, DispatcherGateway.class, DispatcherId::fromUuid, 10, Time.milliseconds(50L)); final LeaderGatewayRetriever resourceManagerGatewayRetriever = newRpcGatewayRetriever( rpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 10, Time.milliseconds(50L)); ... ... // 创建吸收前端Rest请求的节点 webMonitorEndpoint =restEndpointFactory.createRestEndpoint( configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, blobServer, executor, metricFetcher, highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler); log.debug("StartingDispatcher REST endpoint."); webMonitorEndpoint.start(); ... ... 
// 创建ResourceManager对象,返回的是new YarnResourceManager // 调理过程:AbstractDispatcherResourceManagerComponentFactory // -> ActiveResourceManagerFactory // -> YarnResourceManagerFactory resourceManager = resourceManagerFactory.createResourceManager( configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, newClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), resourceManagerMetricGroup); ... ... // 创建dispatcherRunner对象并启动 log.debug("StartingDispatcher."); dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, newHaServicesJobGraphStoreFactory(highAvailabilityServices), ioExecutor, rpcService, partialDispatcherServices); // 启动ResourceManager log.debug("StartingResourceManager."); resourceManager.start(); resourceManagerRetrievalService.start(resourceManagerGatewayRetriever); dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever); return newDispatcherResourceManagerComponent( dispatcherRunner, resourceManager, dispatcherLeaderRetrievalService, resourceManagerRetrievalService, webMonitorEndpoint); } ... ...}创建 YarnResourceManager
ResourceManagerFactory.java
public ResourceManager createResourceManager( Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, MetricRegistry metricRegistry, String hostname) throwsException { final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry,hostname); final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry,hostname); final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices( configuration, rpcService,highAvailabilityServices, slotManagerMetricGroup); return createResourceManager( configuration, resourceId, rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler, clusterInformation, webInterfaceUrl, resourceManagerMetricGroup, resourceManagerRuntimeServices);}YarnResourceManagerFactory.java
public ResourceManager createResourceManager( Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, ResourceManagerMetricGroup resourceManagerMetricGroup, ResourceManagerRuntimeServices resourceManagerRuntimeServices) { return new YarnResourceManager( rpcService, resourceId, configuration, System.getenv(), highAvailabilityServices, heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), ResourceManagerPartitionTrackerImpl::new, resourceManagerRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, webInterfaceUrl, resourceManagerMetricGroup);}创建YarnResourceManager时,创建了SlotManager
ResourceManagerFactory.java
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices( Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException { return ResourceManagerRuntimeServices.fromConfiguration( createResourceManagerRuntimeServicesConfiguration(configuration), highAvailabilityServices, rpcService.getScheduledExecutor(), slotManagerMetricGroup);}ResourceManagerRuntimeServices.java
public static ResourceManagerRuntimeServices fromConfiguration( ResourceManagerRuntimeServicesConfiguration configuration, HighAvailabilityServices highAvailabilityServices, ScheduledExecutor scheduledExecutor, SlotManagerMetricGroup slotManagerMetricGroup) { final SlotManager slotManager = createSlotManager(configuration,scheduledExecutor, slotManagerMetricGroup); final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout()); return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);}创建并启动 Dispatcher
DefaultDispatcherRunnerFactory.java
public DispatcherRunner createDispatcherRunner( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, JobGraphStoreFactory jobGraphStoreFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception { final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory =dispatcherLeaderProcessFactoryFactory.createFactory( jobGraphStoreFactory, ioExecutor, rpcService, partialDispatcherServices, fatalErrorHandler); return DefaultDispatcherRunner.create( leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory);}DefaultDispatcherRunner.java
public static DispatcherRunner create( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception { final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner( leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory); return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner,leaderElectionService);}DispatcherRunnerLeaderElectionLifecycleManager.java
public static DispatcherRunner createFor(T dispatcherRunner,LeaderElectionService leaderElectionService) throws Exception { return new DispatcherRunnerLeaderElectionLifecycleManager(dispatcherRunner,leaderElectionService);} private DispatcherRunnerLeaderElectionLifecycleManager(TdispatcherRunner, LeaderElectionService leaderElectionService) throws Exception{ this.dispatcherRunner =dispatcherRunner; this.leaderElectionService =leaderElectionService; // 启动dispacher的leader选举 leaderElectionService.start(dispatcherRunner);}StandaloneLeaderElectionService.java
public void start(LeaderContender newContender) throws Exception { ... ... contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);}DefaultDispatcherRunner.java
public void grantLeadership(UUID leaderSessionID) { runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));}private void startNewDispatcherLeaderProcess(UUIDleaderSessionID) { ... ... previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));}AbstractDispatcherLeaderProcess.java
public final void start() { runIfStateIs( State.CREATED, this::startInternal);}private void startInternal() { log.info("Start {}.",getClass().getSimpleName()); state = State.RUNNING; onStart();}JobDispatcherLeaderProcess.java
protected void onStart() { final DispatcherGatewayServicedispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), Collections.singleton(jobGraph), ThrowingJobGraphWriter.INSTANCE); completeDispatcherSetup(dispatcherService);}public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( DispatcherId fencingToken, Collection recoveredJobs, JobGraphWriter jobGraphWriter){ ... ... // 启动dispacher dispatcher.start(); ... ...}启动 ResourceManager
DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create( Configuration configuration, Executor ioExecutor, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, ArchivedExecutionGraphStore archivedExecutionGraphStore, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception { ... ... // 启动ResourceManager log.debug("StartingResourceManager."); resourceManager.start(); ... ...}ResourceManager.java
public void onStart() throws Exception { ... ... startResourceManagerServices(); ... ...}private void startResourceManagerServices() throwsException { try { leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService(); initialize(); leaderElectionService.start(this); jobLeaderIdService.start(new JobLeaderIdActionsImpl()); registerTaskExecutorMetrics(); } catch (Exception e) { handleStartResourceManagerServicesException(e); }}6、Dispatcher启动JobManager
Dispatcher.java
public void onStart() throws Exception { try { // 启动Dispatcher startDispatcherServices(); } ... ... // 启动Job startRecoveredJobs(); ... ...}Dispatcher.java
CompletableFuture createJobManagerRunner(JobGraphjobGraph, long initializationTimestamp) { final RpcService rpcService =getRpcService(); return CompletableFuture.supplyAsync( () -> { try { JobManagerRunnerrunner = jobManagerRunnerFactory.createJobManagerRunner( jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, newDefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup), fatalErrorHandler, initializationTimestamp); // 启动 JobManagerRunner runner.start(); return runner; } ......}JobManagerRunnerImpl.java
public void start() throws Exception { try { leaderElectionService.start(this); } catch (Exception e) { log.error("Could notstart the JobManager because the leader election service did not start.",e); throw new Exception("Couldnot start the leader election service.", e); }}StandaloneLeaderElectionService.java
public void start(LeaderContendernewContender) throws Exception { ... ... contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);}JobManagerRunnerImpl.java
public void grantLeadership(final UUID leaderSessionID){ synchronized (lock) { if (shutdown) { log.debug("JobManagerRunnercannot be granted leadership because it is already shut down."); return; } leadershipOperation =leadershipOperation.thenCompose( (ignored) -> { synchronized(lock) { // 校验作业的调理状态然后启动作业管理器 returnverifyJobSchedulingStatusAndStartJobManager(leaderSessionID); } }); handleException(leadershipOperation,"Could not start the job manager."); }}private CompletableFuture verifyJobSchedulingStatusAndStartJobManager(UUIDleaderSessionId) { final CompletableFuture jobSchedulingStatusFuture = getJobSchedulingStatus(); return jobSchedulingStatusFuture.thenCompose( jobSchedulingStatus -> { if(jobSchedulingStatus == JobSchedulingStatus.DONE) { returnjobAlreadyDone(); } else { return startJobMaster(leaderSessionId); } });}private CompletionStage startJobMaster(UUIDleaderSessionId) { ... ... startFuture = jobMasterService.start(new JobMasterId(leaderSessionId)); ... ...}JobMaster.java
public CompletableFuture start(finalJobMasterId newJobMasterId) throws Exception { // make sure we receive RPC and asynccalls start(); return callAsyncWithoutFencing(() ->startJobExecution(newJobMasterId),RpcUtils.INF_TIMEOUT);}private Acknowledge startJobExecution(JobMasterId newJobMasterId)throws Exception {... ...// 启动JobMaster startJobMasterServices(); log.info("Starting execution ofjob {} ({}) under job master id {}.", jobGraph.getName(),jobGraph.getJobID(), newJobMasterId); // 重置开始调理 resetAndStartScheduler();... ...}7、ResourceManager启动SlotManager
ResourceManager.java
public final void onStart() throws Exception { ... ... startResourceManagerServices(); ... ...}private void startResourceManagerServices() throwsException { try { leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService(); initialize(); leaderElectionService.start(this); jobLeaderIdService.start(newJobLeaderIdActionsImpl()); registerTaskExecutorMetrics(); } catch (Exception e) { handleStartResourceManagerServicesException(e); }}创建 Yarn 的 RM 和 NM 客户端
ActiveResourceManager.java
protected void initialize() throws ResourceManagerException{ try { resourceManagerDriver.initialize( this, newGatewayMainThreadExecutor(), ioExecutor); } catch (Exception e) { throw newResourceManagerException("Cannot initialize resource provider.", e); }}AbstractResourceManagerDriver.java
public final void initialize( ResourceEventHandler resourceEventHandler, ScheduledExecutor mainThreadExecutor, Executor ioExecutor) throwsException { this.resourceEventHandler =Preconditions.checkNotNull(resourceEventHandler); this.mainThreadExecutor =Preconditions.checkNotNull(mainThreadExecutor); this.ioExecutor =Preconditions.checkNotNull(ioExecutor); initializeInternal();}YarnResourceManagerDriver.java
protected void initializeInternal() throws Exception { final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler(); try { // 创建和启动yarn的resourcemanager客户端 resourceManagerClient =yarnResourceManagerClientFactory.createResourceManagerClient( yarnHeartbeatIntervalMillis, yarnContainerEventHandler); resourceManagerClient.init(yarnConfig); resourceManagerClient.start(); final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster(); getContainersFromPreviousAttempts(registerApplicationMasterResponse); taskExecutorProcessSpecContainerResourcePriorityAdapter= newTaskExecutorProcessSpecContainerResourcePriorityAdapter( registerApplicationMasterResponse.getMaximumResourceCapability(), ExternalResourceUtils.getExternalResources(flinkConfig,YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX)); } catch (Exception e) { throw newResourceManagerException("Could not start resource manager client.",e); } // 创建和启动yarn的nodemanager客户端 nodeManagerClient =yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler); nodeManagerClient.init(yarnConfig); nodeManagerClient.start();}启动 SlotManager
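ResourceManager.java(ResourceManager 调用 leaderElectionService.start(this) 后,StandaloneLeaderElectionService 通过 grantLeadership() 授予其 leadership,随后执行 startServicesOnLeadership())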
private void startServicesOnLeadership() { startHeartbeatServices(); slotManager.start(getFencingToken(),getMainThreadExecutor(), new ResourceActionsImpl()); onLeadership();}SlotManagerImpl.java
public void start(ResourceManagerId newResourceManagerId,Executor newMainThreadExecutor, ResourceActions newResourceActions) { LOG.info("Starting theSlotManager."); this.resourceManagerId =Preconditions.checkNotNull(newResourceManagerId); mainThreadExecutor =Preconditions.checkNotNull(newMainThreadExecutor); resourceActions =Preconditions.checkNotNull(newResourceActions); started = true; taskManagerTimeoutsAndRedundancyCheck =scheduledExecutor.scheduleWithFixedDelay( () ->mainThreadExecutor.execute( () -> checkTaskManagerTimeoutsAndRedundancy()), 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); slotRequestTimeoutCheck =scheduledExecutor.scheduleWithFixedDelay( () ->mainThreadExecutor.execute( () -> checkSlotRequestTimeouts()), 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); registerSlotManagerMetrics();}void checkTaskManagerTimeoutsAndRedundancy() { if (!taskManagerRegistrations.isEmpty()){ long currentTime =System.currentTimeMillis(); ArrayListtimedOutTaskManagers = new ArrayList(taskManagerRegistrations.size()); // first retrieve the timedout TaskManagers for (TaskManagerRegistrationtaskManagerRegistration : taskManagerRegistrations.values()) { if (currentTime -taskManagerRegistration.getIdleSince() >=taskManagerTimeout.toMilliseconds()) { // we collectthe instance ids first in order to avoid concurrent modifications by the //ResourceActions.releaseResource call timedOutTaskManagers.add(taskManagerRegistration); } } int slotsDiff =redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size(); if (freeSlots.size() == slots.size()){ // No need tokeep redundant taskManagers if no job is running. // 假如没有job在运行,开释taskmanager releaseTaskExecutors(timedOutTaskManagers,timedOutTaskManagers.size()); } else if (slotsDiff > 0) { // Keep enoughredundant taskManagers from time to time. 
// 保证随时有富足的taskmanager intrequiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker); allocateRedundantTaskManagers(requiredTaskManagers); } else { // second we triggerthe release resource callback which can decide upon the resource release int maxReleaseNum =(-slotsDiff) / numSlotsPerWorker; releaseTaskExecutors(timedOutTaskManagers,Math.min(maxReleaseNum, timedOutTaskManagers.size())); } }}8、JobManager申请Slot
启动 SlotPool
接第 6 步:JobMaster 启动时会启动 SlotPool,并向 ResourceManager 注册。
private void startJobMasterServices() throwsException { // 启动心跳服务 startHeartbeatServices(); // 启动slotPool slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor()); // 连接到之前已知的ResourceManager reconnectToResourceManager(newFlinkException("Starting JobMaster component.")); // 启动后slotpool开始向slot manager请求slot resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());}向 ResourceManager 注册
经过下面的层层调用:
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> ResourceManagerLeaderListener.notifyLeaderAddress()
-> JobMaster的notifyOfNewResourceManagerLeader()
-> reconnectToResourceManager()
-> tryConnectToResourceManager()
-> connectToResourceManager()
/**
 * Creates a {@code ResourceManagerConnection} to the current ResourceManager leader address
 * and starts it.
 */
private void connectToResourceManager() {
    // ... ...

    resourceManagerConnection =
            new ResourceManagerConnection(
                    log,
                    jobGraph.getJobID(),
                    resourceId,
                    getAddress(),
                    getFencingToken(),
                    resourceManagerAddress.getAddress(),
                    resourceManagerAddress.getResourceManagerId(),
                    scheduledExecutorService);

    resourceManagerConnection.start();
}
RegisteredRpcConnection.java
public void start() { ... ... final RetryingRegistration newRegistration = createNewRegistration(); if (REGISTRATION_UPDATER.compareAndSet(this,null, newRegistration)) { newRegistration.startRegistration(); } else { // concurrent start operation newRegistration.cancel(); }}privateRetryingRegistration createNewRegistration() { RetryingRegistrationnewRegistration = checkNotNull(generateRegistration()); ... ...}JobMaster.java的内部类ResourceManagerConnection
/**
 * Creates the retrying registration of this JobMaster at the ResourceManager.
 *
 * <p>Every attempt calls {@code registerJobManager} on the ResourceManager gateway with this
 * JobMaster's id, resource id, RPC address and job id, bounded by the attempt's timeout.
 */
protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>
        generateRegistration() {
    return new RetryingRegistration<
            ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
            log,
            getRpcService(),
            "ResourceManager",
            ResourceManagerGateway.class,
            getTargetAddress(),
            getTargetLeaderId(),
            jobMasterConfiguration.getRetryingRegistrationConfiguration()) {

        @Override
        protected CompletableFuture<RegistrationResponse> invokeRegistration(
                ResourceManagerGateway gateway,
                ResourceManagerId fencingToken,
                long timeoutMillis) {
            Time timeout = Time.milliseconds(timeoutMillis);

            return gateway.registerJobManager(
                    jobMasterId, jobManagerResourceID, jobManagerRpcAddress, jobID, timeout);
        }
    };
}
SlotPool 申请 slot
注册成功后调用onRegistrationSuccess(),随后向ResourceManager申请slot:
JobMaster.java的内部类ResourceManagerConnection
protected void onRegistrationSuccess(finalJobMasterRegistrationSuccess success) { runAsync(() -> { // filter out outdatedconnections //noinspection ObjectEquality if (this ==resourceManagerConnection) { establishResourceManagerConnection(success); } });}private void establishResourceManagerConnection(finalJobMasterRegistrationSuccess success) { ... ... slotPool.connectToResourceManager(resourceManagerGateway); ... ...}SlotPoolImpl.javapublic void connectToResourceManager(@NonnullResourceManagerGateway resourceManagerGateway) { this.resourceManagerGateway =checkNotNull(resourceManagerGateway); // work on all slots waiting for thisconnection for (PendingRequest pendingRequest :waitingForResourceManager.values()) { // 向ResourceManager申请slot requestSlotFromResourceManager(resourceManagerGateway,pendingRequest); } // all sent off waitingForResourceManager.clear();}private void requestSlotFromResourceManager( final ResourceManagerGatewayresourceManagerGateway, final PendingRequestpendingRequest) { ... ... CompletableFuturermResponse = resourceManagerGateway.requestSlot( jobMasterId, new SlotRequest(jobId,allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout); ... ...}ResourceManager.java:由ResourceManager里的SlotManager处理请求
public CompletableFuture requestSlot( JobMasterId jobMasterId, SlotRequest slotRequest, final Time timeout) { ... ... try { // SlotManager处理slot请求 slotManager.registerSlotRequest(slotRequest); }... ...}SlotManagerImpl.java
public boolean registerSlotRequest(SlotRequest slotRequest)throws ResourceManagerException { checkInit(); ... ... PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest); pendingSlotRequests.put(slotRequest.getAllocationId(),pendingSlotRequest); try { internalRequestSlot(pendingSlotRequest); } ... ...}private void internalRequestSlot(PendingSlotRequestpendingSlotRequest) throws ResourceManagerException { final ResourceProfile resourceProfile =pendingSlotRequest.getResourceProfile(); OptionalConsumer.of(findMatchingSlot(resourceProfile)) .ifPresent(taskManagerSlot-> allocateSlot(taskManagerSlot, pendingSlotRequest)) .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));}private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequestpendingSlotRequest) throws ResourceManagerException { ... ... if (!pendingTaskManagerSlotOptional.isPresent()){ pendingTaskManagerSlotOptional= allocateResource(resourceProfile); } ... ...}9、ResourceManager申请资源
ResourceManager.java
/** Requests a new worker with the given resource spec; must be called in the main thread. */
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
    validateRunsInMainThread();
    return startNewWorker(workerResourceSpec);
}
ActiveResourceManager.java
/**
 * Requests a new worker and always returns {@code true}; the outcome of the request is carried
 * by the future returned from the resource manager driver.
 */
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
    requestNewWorker(workerResourceSpec);
    return true;
}

/**
 * Derives the TaskExecutor process spec from the Flink configuration and the worker resource
 * spec, then asks the resource manager driver for a matching resource.
 */
private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
    // derive the TaskExecutor process spec from the configuration
    final TaskExecutorProcessSpec taskExecutorProcessSpec =
            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                    flinkConfig, workerResourceSpec);

    // ... ...

    // request the resource
    CompletableFuture<WorkerType> requestResourceFuture =
            resourceManagerDriver.requestResource(taskExecutorProcessSpec);

    // ... ...
}
YarnResourceManagerDriver.java
public CompletableFuture requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) { checkInitialized(); final CompletableFuture requestResourceFuture = newCompletableFuture(); final OptionalpriorityAndResourceOpt = taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec); if(!priorityAndResourceOpt.isPresent()) { requestResourceFuture.completeExceptionally( newResourceManagerException( String.format("Couldnot compute the container Resource from the given TaskExecutorProcessSpec %s." + "Thisusually indicates the requested resource is larger than Yarn's max containerresource limit.", taskExecutorProcessSpec))); } else { final Priority priority =priorityAndResourceOpt.get().getPriority(); final Resource resource =priorityAndResourceOpt.get().getResource(); resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority)); // make sure we transmit therequest fast and receive fast news of granted allocations resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis); requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec,ignore -> new LinkedList()).add(requestResourceFuture); log.info("Requesting newTaskExecutor container with resource {}, priority {}.",taskExecutorProcessSpec, priority); } return requestResourceFuture;}10、TaskManager启动
YarnTaskExecutorRunner.java
public static void main(String[] args) { EnvironmentInformation.logEnvironmentInfo(LOG,"YARN TaskExecutor runner", args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); runTaskManagerSecurely(args);}private static void runTaskManagerSecurely(String[] args) { try { LOG.debug("Allenvironment variables: {}", ENV); final String currDir =ENV.get(Environment.PWD.key()); LOG.info("Current workingDirectory: {}", currDir); final Configurationconfiguration = TaskManagerRunner.loadConfiguration(args); setupAndModifyConfiguration(configuration,currDir, ENV); TaskManagerRunner.runTaskManagerSecurely(configuration); } catch (Throwable t) { final ThrowablestrippedThrowable = ExceptionUtils.stripException(t,UndeclaredThrowableException.class); // make sure that everythingwhatever ends up in the log LOG.error("YARN TaskManagerinitialization failed.", strippedThrowable); System.exit(INIT_ERROR_EXIT_CODE); }}TaskManagerRunner.java
/** Starts the TaskManager by starting its TaskExecutor service. */
public void start() throws Exception {
    taskExecutorService.start();
}
TaskExecutorToServiceAdapter.java
/** Starts the wrapped TaskExecutor. */
public void start() {
    taskExecutor.start();
}
TaskExecutor.java
/**
 * Start hook of the TaskExecutor: starts its services, then the registration timeout. A failure
 * while starting the services is reported via {@code onFatalError} and rethrown as a
 * {@code TaskManagerException}.
 */
public void onStart() throws Exception {
    try {
        startTaskExecutorServices();
    } catch (Throwable t) {
        final TaskManagerException exception =
                new TaskManagerException(
                        String.format("Could not start the TaskExecutor %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }

    startRegistrationTimeout();
}
11、向ResourceManager注册
TaskExecutor.java
private void startTaskExecutorServices() throwsException { try { // start by connecting to theResourceManager resourceManagerLeaderRetriever.start(newResourceManagerLeaderListener()); // tell the task slot tablewho's responsible for the task slot actions taskSlotTable.start(newSlotActionsImpl(), getMainThreadExecutor()); // start the job leaderservice jobLeaderService.start(getAddress(),getRpcService(), haServices, new JobLeaderListenerImpl()); fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(),blobCacheService.getPermanentBlobService()); } catch (Exception e) { handleStartTaskExecutorServicesException(e); }}resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> ResourceManagerLeaderListener的notifyLeaderAddress()
-> TaskExecutor的notifyOfNewResourceManagerLeader()
-> TaskExecutor的reconnectToResourceManager()
-> TaskExecutor的tryConnectToResourceManager()
-> TaskExecutor的connectToResourceManager()
-> TaskExecutor的resourceManagerConnection.start()
其中 start() 会执行 createNewRegistration() -> generateRegistration():
TaskExecutorToResourceManagerConnection.java
/** Creates the retrying registration of this TaskExecutor at the ResourceManager. */
protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess>
        generateRegistration() {
    return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
            log,
            rpcService,
            getTargetAddress(),
            getTargetLeaderId(),
            retryingRegistrationConfiguration,
            taskExecutorRegistration);
}
开始注册:newRegistration.startRegistration() 会调用 invokeRegistration():
TaskExecutorToResourceManagerConnection.java的内部类ResourceManagerRegistration
private static class ResourceManagerRegistration extendsRetryingRegistration { private final TaskExecutorRegistration taskExecutorRegistration; ResourceManagerRegistration( Logger log, RpcService rpcService, String targetAddress, ResourceManagerIdresourceManagerId, RetryingRegistrationConfigurationretryingRegistrationConfiguration, TaskExecutorRegistrationtaskExecutorRegistration) { super(log, rpcService,"ResourceManager", ResourceManagerGateway.class, targetAddress,resourceManagerId, retryingRegistrationConfiguration); this.taskExecutorRegistration= taskExecutorRegistration; } @Override protectedCompletableFuture invokeRegistration( ResourceManagerGatewayresourceManager, ResourceManagerId fencingToken, long timeoutMillis) throwsException { Time timeout =Time.milliseconds(timeoutMillis); return resourceManager.registerTaskExecutor( taskExecutorRegistration, timeout); }}注册成功调用onRegistrationSuccess
/** Logs the successful registration and notifies the registration listener. */
protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
    log.info(
            "Successful registration at resource manager {} under registration id {}.",
            getTargetAddress(),
            success.getRegistrationId());

    registrationListener.onRegistrationSuccess(this, success);
}
TaskExecutor.java的内部类ResourceManagerRegistrationListener
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnectionconnection, TaskExecutorRegistrationSuccess success) { final ResourceID resourceManagerId =success.getResourceManagerId(); final InstanceIDtaskExecutorRegistrationId = success.getRegistrationId(); final ClusterInformationclusterInformation = success.getClusterInformation(); final ResourceManagerGatewayresourceManagerGateway = connection.getTargetGateway(); runAsync( () -> { // filter out outdatedconnections //noinspectionObjectEquality if(resourceManagerConnection == connection) { try { establishResourceManagerConnection( resourceManagerGateway, resourceManagerId, taskExecutorRegistrationId, clusterInformation); } catch(Throwable t) { log.error("EstablishingResource Manager connection in Task Executor failed", t); } } });}private void establishResourceManagerConnection( ResourceManagerGateway resourceManagerGateway, ResourceID resourceManagerResourceId, InstanceID taskExecutorRegistrationId, ClusterInformation clusterInformation) { // 向ResourceManager注册slot finalCompletableFuture slotReportResponseFuture =resourceManagerGateway.sendSlotReport( getResourceID(), taskExecutorRegistrationId, taskSlotTable.createSlotReport(getResourceID()), taskManagerConfiguration.getTimeout()); ... ...}ResourceManager.java
/**
 * Receives the slot report of a registered TaskExecutor and registers it, with its slots, at
 * the SlotManager.
 *
 * @return an acknowledged future if the registration id matches the known registration of this
 *     TaskExecutor; otherwise a future failed with a {@code ResourceManagerException}
 */
public CompletableFuture<Acknowledge> sendSlotReport(
        ResourceID taskManagerResourceId,
        InstanceID taskManagerRegistrationId,
        SlotReport slotReport,
        Time timeout) {
    // NOTE(review): taskExecutors is presumably a Map, so get(...) may return null for an
    // unknown ResourceID; the result is dereferenced below without a null check.
    final WorkerRegistration<WorkerType> workerTypeWorkerRegistration =
            taskExecutors.get(taskManagerResourceId);

    if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
        if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
            onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        return FutureUtils.completedExceptionally(
                new ResourceManagerException(
                        String.format(
                                "Unknown TaskManager registration id %s.",
                                taskManagerRegistrationId)));
    }
}
SlotManagerImpl.java
public boolean registerTaskManager(final TaskExecutorConnectiontaskExecutorConnection, SlotReport initialSlotReport) { checkInit(); LOG.debug("Registering TaskManager{} under {} at the SlotManager.",taskExecutorConnection.getResourceID().getStringWithMetadata(),taskExecutorConnection.getInstanceID()); // we identify task managers by theirinstance id // 通过实例id判定某个taskmanager是否已经注册过 if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())){ // 报告已注册过的taskmanager的slot分配环境,更新slot环境 reportSlotStatus(taskExecutorConnection.getInstanceID(),initialSlotReport); return false; } else { if(isMaxSlotNumExceededAfterRegistration(initialSlotReport)) { LOG.info("Thetotal number of slots exceeds the max limitation {}, release the excess resource.",maxSlotNum); resourceActions.releaseResource(taskExecutorConnection.getInstanceID(),new FlinkException("The total number of slots exceeds the maxlimitation.")); return false; } // first register theTaskManager ArrayList reportedSlots= new ArrayList(); for (SlotStatus slotStatus :initialSlotReport) { reportedSlots.add(slotStatus.getSlotID()); } TaskManagerRegistrationtaskManagerRegistration = new TaskManagerRegistration( taskExecutorConnection, reportedSlots); taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(),taskManagerRegistration); // next register the new slots for (SlotStatus slotStatus :initialSlotReport) { // 注册新的slot,根据slot请求进行分配 registerSlot( slotStatus.getSlotID(), slotStatus.getAllocationID(), slotStatus.getJobID(), slotStatus.getResourceProfile(), taskExecutorConnection); } return true; } }12、ResourceManager分配Slot
SlotManagerImpl.java
private void registerSlot( SlotID slotId, AllocationID allocationId, JobID jobId, ResourceProfileresourceProfile, TaskExecutorConnectiontaskManagerConnection) { if (slots.containsKey(slotId)) { // remove the old slot first // 移除旧slot removeSlot( slotId, newSlotManagerException( String.format( "Re-registrationof slot %s. This indicates that the TaskExecutor has re-connected.", slotId))); } // 创建和注册TaskManager的slot final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId,resourceProfile, taskManagerConnection); final PendingTaskManagerSlotpendingTaskManagerSlot; if (allocationId == null) { pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile); } else { pendingTaskManagerSlot = null; } if (pendingTaskManagerSlot == null) { updateSlot(slotId,allocationId, jobId); } else { pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId()); final PendingSlotRequestassignedPendingSlotRequest =pendingTaskManagerSlot.getAssignedPendingSlotRequest(); // 分配slot给请求 if (assignedPendingSlotRequest== null) { handleFreeSlot(slot); } else { assignedPendingSlotRequest.unassignPendingTaskManagerSlot(); allocateSlot(slot,assignedPendingSlotRequest); } }}private void allocateSlot(TaskManagerSlottaskManagerSlot, PendingSlotRequest pendingSlotRequest) { ... ... taskManagerRegistration.markUsed(); // RPC call to the task manager CompletableFuturerequestFuture = gateway.requestSlot( slotId, pendingSlotRequest.getJobId(), allocationId, pendingSlotRequest.getResourceProfile(), pendingSlotRequest.getTargetAddress(), resourceManagerId, taskManagerRequestTimeout); ... ...}13、TaskManager提供Slot
TaskExecutor.java
public CompletableFuture requestSlot( final SlotID slotId, final JobID jobId, final AllocationID allocationId, final ResourceProfile resourceProfile, final String targetAddress, final ResourceManagerIdresourceManagerId, final Time timeout) { ... ... try { // 分配taskmanager上的slot allocateSlot( slotId, jobId, allocationId, resourceProfile); } catch (SlotAllocationException sae) { returnFutureUtils.completedExceptionally(sae); } final JobTable.Job job; try { job =jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId,targetAddress)); } catch (Exception e) { // free the allocated slot try { taskSlotTable.freeSlot(allocationId); } catch (SlotNotFoundExceptionslotNotFoundException) { // slot no longerexistent, this should actually never happen, because we've // just allocated theslot. So let's fail hard in this case! onFatalError(slotNotFoundException); } // release local state underthe allocation id. localStateStoresManager.releaseLocalStateForAllocationId(allocationId); // sanity check if(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { onFatalError(newException("Could not free slot " + slotId)); } returnFutureUtils.completedExceptionally(new SlotAllocationException("Could notcreate new job.", e)); } if (job.isConnected()) { // 连接上job,提供slot给JobManager offerSlotsToJobManager(jobId); } return CompletableFuture.completedFuture(Acknowledge.get());}private void internalOfferSlotsToJobManager(JobTable.ConnectionjobManagerConnection) { final JobID jobId =jobManagerConnection.getJobId(); if(taskSlotTable.hasAllocatedSlots(jobId)) { log.info("Offer reservedslots to the leader of job {}.", jobId); final JobMasterGatewayjobMasterGateway = jobManagerConnection.getJobManagerGateway(); finalIterator reservedSlotsIterator =taskSlotTable.getAllocatedSlots(jobId); final JobMasterId jobMasterId= jobManagerConnection.getJobMasterId(); finalCollection reservedSlots = new HashSet(2); while(reservedSlotsIterator.hasNext()) { SlotOffer offer 
=reservedSlotsIterator.next().generateSlotOffer(); reservedSlots.add(offer); } CompletableFutureacceptedSlotsFuture = jobMasterGateway.offerSlots( getResourceID(), reservedSlots, taskManagerConfiguration.getTimeout()); acceptedSlotsFuture.whenCompleteAsync( handleAcceptedSlotOffers(jobId,jobMasterGateway, jobMasterId, reservedSlots), getMainThreadExecutor()); } else { log.debug("There are nounassigned slots for the job {}.", jobId); }}JobMaster.java
/**
 * Receives slot offers from a TaskManager and forwards them to the slot pool.
 *
 * @return the offers accepted by the slot pool, or a failed future if the TaskManager is not
 *     registered at this JobMaster
 */
public CompletableFuture<Collection<SlotOffer>> offerSlots(
        final ResourceID taskManagerId, final Collection<SlotOffer> slots, final Time timeout) {

    Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager =
            registeredTaskManagers.get(taskManagerId);

    if (taskManager == null) {
        return FutureUtils.completedExceptionally(
                new Exception("Unknown TaskManager " + taskManagerId));
    }

    final TaskManagerLocation taskManagerLocation = taskManager.f0;
    final TaskExecutorGateway taskExecutorGateway = taskManager.f1;

    final RpcTaskManagerGateway rpcTaskManagerGateway =
            new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());

    return CompletableFuture.completedFuture(
            slotPool.offerSlots(taskManagerLocation, rpcTaskManagerGateway, slots));
}
SlotPoolImpl.java
public Collection offerSlots( TaskManagerLocation taskManagerLocation, TaskManagerGateway taskManagerGateway, Collectionoffers) { ArrayList result = newArrayList(offers.size()); for (SlotOffer offer : offers) { if (offerSlot( taskManagerLocation, taskManagerGateway, offer)) { result.add(offer); } } return result;}boolean offerSlot( final TaskManagerLocationtaskManagerLocation, final TaskManagerGatewaytaskManagerGateway, final SlotOffer slotOffer) { ... ... // use the slot to fulfill pendingrequest, in requested order // 按照请求顺序,利用slot来完成挂起的请求 tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); // we accepted the request in any case.slot will be released after it idled for // too long and timed out return true;}<hr>往期精彩内容:
用flink能替换spark的批处理功能吗
Flink进阶之滑动窗口统计实时热门商品
Flink进阶之利用CEP实现恶意登陆检测
重磅!Flink源码解析环境准备及提交流程之环境准备
大咖分享 | 通过制作一个迷你版Flink来学习Flink源码 |