尚硅谷教育 发表于 2021-9-28 10:00:31

重磅!Flink源码解析环境准备及提交流程之环境准备

Flink Yarn-per-job模式提交流程如图所示:
https://p26.toutiaoimg.com/large/pgc-image/3cb37ec15e8544e3973bfd8af0afa703
一、步伐起点
1. flink\bin\flink
=> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}"-classpath "`manglePathList"$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
2. flink/bin/config.sh(相关情况设置都在这里)
=> JAVA_RUN=java
=> JVM_ARGS="" => # Use conf/flink-conf.yaml
=>INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
3. 实行java -cp 就会开启JVM假造机,在假造机上开启CliFrontend进程,然后开始实行main方法
说明:java -cp和 -classpath一样,是指定类运行所依赖其他类的路径。
java -cp =》开启JVM假造机 =》开启Process(CliFrontend)=》步伐入口CliFrontend.main
4. Flink提交任务的入口类为CliFrontend。找到这个类的main方法:
在IDEA中全局查找(ctrl + n):org.apache.flink.client.cli.CliFrontend,找到CliFrontend类,并找到main方法
/*** Submits the job based on the arguments.(根据参数提交作业)*/public static void main(final String[] args) {       ... ...       final CliFrontend cli = new CliFrontend(                            configuration,                            customCommandLines);       ... ...}二、步伐入口
CliFrontend.java
public static void main(final String[] args) {         ... ...         final CliFrontend cli = new CliFrontend(                                 configuration,                                 customCommandLines);         ... ...      int retCode =SecurityUtils.getInstalledContext()                            .runSecured(() -> cli.parseParameters(args));         ... ...}public int parseParameters(String[] args) {         ... ...         // get action         String action = args;          // remove action from parameters         final String[] params =Arrays.copyOfRange(args, 1, args.length);                        // do action                  switch (action) {                        case ACTION_RUN:                                 run(params);                                 return 0;                        case ACTION_LIST:                                 list(params);                                 return 0;                        case ACTION_INFO:                                 info(params);                                 return 0;                        case ACTION_CANCEL:                                 cancel(params);                                 return 0;                        case ACTION_STOP:                                 stop(params);                                 return 0;                        case ACTION_SAVEPOINT:                                 savepoint(params);                                 return 0;                        ……                  }         ... ...}三、解析输入参数
CliFrontend.java
protected void run(String[] args) throws Exception {       ... ...       //获取默认的运行参数       final Options commandOptions =CliFrontendParser.getRunCommandOptions();         // 解析参数,返回commandLine       final CommandLine commandLine = getCommandLine(commandOptions, args, true);       ... ...}public CommandLine getCommandLine(final OptionscommandOptions, final String[] args, final boolean stopAtNonOptions) throwsCliArgsException {         final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions,customCommandLineOptions);         return CliFrontendParser.parse(commandLineOptions, args,stopAtNonOptions);}DefaultParser.javaDefaultParser.java
public class CliFrontendParser {         // 选项列表         static final Option HELP_OPTION = new Option("h", "help", false,                        "Show the helpmessage for the CLI Frontend or the action.");          static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");          static final Option CLASS_OPTION = new Option("c", "class", true,                        "Class with theprogram entry point (\"main()\" method). Only needed if the " +                        "JAR file doesnot specify the class in its manifest.");... ...}DefaultParser.java
public CommandLine parse(Options options, String[] arguments,Properties properties, boolean stopAtNonOption)            throws ParseException{    ... ...    if (arguments != null)    {      for (String argument : arguments)      {            handleToken(argument);      }    }... ...}private void handleToken(String token) throwsParseException{    currentToken = token;   if (skipParsing)    {      cmd.addArg(token);    }    else if ("--".equals(token))    {      skipParsing = true;    }    else if (currentOption != null &¤tOption.acceptsArg() && isArgument(token))    {      currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token));    }    else if (token.startsWith("--")){         // 解析--情势的参数      handleLongOption(token);    }    else if (token.startsWith("-") && !"-".equals(token)){         // 解析 -情势的参数      handleShortAndLongOption(token);    }    else    {      handleUnknownToken(token);    }   if (currentOption != null && !currentOption.acceptsArg())    {      currentOption = null;    }}private void handleLongOption(String token) throwsParseException{    if (token.indexOf('=') == -1){         //解析–L、-L、--l、-l情势的参数(不包含=)      handleLongOptionWithoutEqual(token);    }    else{         // 解析--L=V、-L=V、--l=V、-l=V情势的参数(包含=)      handleLongOptionWithEqual(token);    }}各种情况的解析,逻辑大要相同:去除-或--前缀,校验参数,以此中一个为例
private void handleLongOptionWithoutEqual(String token)throws ParseException{         // 校验参数是否合法    List matchingOpts = options.getMatchingOptions(token);    if (matchingOpts.isEmpty())    {      handleUnknownToken(currentToken);    }    else if (matchingOpts.size() > 1)    {      throw newAmbiguousOptionException(token, matchingOpts);    }    else{// 参数添加到实行命令      handleOption(options.getOption(matchingOpts.get(0)));    }}Options.java:
public List getMatchingOptions(String opt){         // 去除 -或 -- 前缀    opt = Util.stripLeadingHyphens(opt);       List matchingOpts = newArrayList();   // for a perfect match return the singleoption only    if (longOpts.keySet().contains(opt))    {      return Collections.singletonList(opt);    }   for (String longOpt : longOpts.keySet())    {      if (longOpt.startsWith(opt))      {            matchingOpts.add(longOpt);      }    }       return matchingOpts;}DefaultParser.java
private void handleOption(Option option) throwsParseException{    // check the previous option beforehandling the next one    checkRequiredArgs();   option = (Option) option.clone();   updateRequiredOptions(option);   cmd.addOption(option);   if (option.hasArg())    {      currentOption = option;    }    else    {      currentOption = null;    }}四、选择创建哪种类型的客户端
CliFrontend.java
public static void main(final String[] args) {         ... ...         final List customCommandLines = loadCustomCommandLines(                        configuration,                        configurationDirectory);         ... ...         final CliFrontend cli = new CliFrontend(                                 configuration,                                 customCommandLines);         ... ...}这里依次添加了 Yarn和Default(Standalone)两种客户端(后面根据isActive()选择):
public static List loadCustomCommandLines(Configurationconfiguration, String configurationDirectory) {         ListcustomCommandLines = new ArrayList();         customCommandLines.add(newGenericCLI(configuration, configurationDirectory));          //       Commandline interface of the YARN session, with a special initialization here         //       toprefix all options with y/yarn.         final String flinkYarnSessionCLI ="org.apache.flink.yarn.cli.FlinkYarnSessionCli";         try {                  customCommandLines.add(                        loadCustomCommandLine(flinkYarnSessionCLI,                                 configuration,                                 configurationDirectory,                                 "y",                                 "yarn"));         } catch (NoClassDefFoundError |Exception e) {                  final StringerrorYarnSessionCLI ="org.apache.flink.yarn.cli.FallbackYarnSessionCli";                  try {                        LOG.info("LoadingFallbackYarnSessionCli");                        customCommandLines.add(                                          loadCustomCommandLine(errorYarnSessionCLI,configuration));                  } catch (Exception exception){                        LOG.warn("Couldnot load CLI class {}.", flinkYarnSessionCLI, e);                  }         }          //       Tips:DefaultCLI must be added at last, because getActiveCustomCommandLine(..) willget the         //             active CustomCommandLine in order andDefaultCLI isActive always return true.         customCommandLines.add(new DefaultCLI(configuration));          return customCommandLines;}在run()内里,进行客户端的选择:
protected void run(String[] args) throws Exception {       ... ...       final CustomCommandLine activeCommandLine =                                 validateAndGetActiveCommandLine(checkNotNull(commandLine));... ...}public CustomCommandLine validateAndGetActiveCommandLine(CommandLinecommandLine) {... ...         for (CustomCommandLine cli :customCommandLines) {         ... ...         //在FlinkYarnSessionCli为active时优先返回FlinkYarnSessionCli。                  //对于DefaultCli,它的isActive方法总是返回true。                  if (cli.isActive(commandLine)) {                        return cli;                  }         }... ...}FlinkYarnSessionCli.java => Yarn客户端isActive的判断逻辑:
public boolean isActive(CommandLine commandLine) {         final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(),null);         //是否指定为per-job模式,即指定”-m yarn-cluster”; ID = "yarn-cluster"         final boolean yarnJobManager =ID.equals(jobManagerOption);         // 是否存在flink在yarn的appID,即yarn-session模式是否启动         final boolean hasYarnAppId =commandLine.hasOption(applicationId.getOpt())                        ||configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();         // executor的名字为"yarn-session" 或 "yarn-per-job"         final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET))                         ||YarnJobClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET));         //      return hasYarnExecutor || yarnJobManager|| hasYarnAppId || (isYarnPropertiesFileMode(commandLine) &&yarnApplicationIdFromYarnProperties != null);}五、获取有效设置
CliFrontend.java
protected void run(String[] args) throws Exception {       ... ...       final Configuration effectiveConfiguration = getEffectiveConfiguration(                                 activeCommandLine,commandLine, programOptions, jobJars);... ...}private Configuration getEffectiveConfiguration(                  final CommandLine commandLine,                  final ProgramOptionsprogramOptions,                  final List jobJars)throws FlinkException {... ...         final Configuration executorConfig =checkNotNull(activeCustomCommandLine)                                 .applyCommandLineOptionsToConfiguration(commandLine);... ...}FlinkYarnSessionCli.java
public Configuration applyCommandLineOptionsToConfiguration(CommandLinecommandLine) throws FlinkException {         // we ignore the addressOption becauseit can only contain "yarn-cluster"         final ConfigurationeffectiveConfiguration = new Configuration(configuration);          applyDescriptorOptionToConfig(commandLine,effectiveConfiguration);          final ApplicationId applicationId =getApplicationId(commandLine);         if (applicationId != null) {                  final StringzooKeeperNamespace;                  if (commandLine.hasOption(zookeeperNamespace.getOpt())){                        zooKeeperNamespace =commandLine.getOptionValue(zookeeperNamespace.getOpt());                  } else {                        zooKeeperNamespace =effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());                  }                   effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);                  effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID,ConverterUtils.toString(applicationId));                  // TARGET 就是execution.target,目的实行器                  //决定后面什么类型的实行器提交任务:yarn-session、yarn-per-job                  effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);         } else {               effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);         }          if (commandLine.hasOption(jmMemory.getOpt())) {                  String jmMemoryVal =commandLine.getOptionValue(jmMemory.getOpt());                  if(!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {                        jmMemoryVal +="m";                  }                  effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY,jmMemoryVal);         }          if (commandLine.hasOption(tmMemory.getOpt())) {                  String tmMemoryVal =commandLine.getOptionValue(tmMemory.getOpt());                  if(!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {                        tmMemoryVal +="m";                  }                  effectiveConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse(tmMemoryVal));         }          if (commandLine.hasOption(slots.getOpt())) {                  effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,Integer.parseInt(commandLine.getOptionValue(slots.getOpt())));         }... ...}六、调用用户代码的main方法
CliFrontend.java
protected void run(String[] args) throws Exception {       ... ...       executeProgram(effectiveConfiguration,program);... ...}protected void executeProgram(final Configurationconfiguration, final PackagedProgram program) throws ProgramInvocationException{         ClientUtils.executeProgram(newDefaultExecutorServiceLoader(), configuration, program, false, false);}ClientUtils.java
public static void executeProgram(                  PipelineExecutorServiceLoader executorServiceLoader,                  Configuration configuration,                  PackagedProgram program,                  boolean enforceSingleJobExecution,                  boolean suppressSysout) throws ProgramInvocationException {         checkNotNull(executorServiceLoader);         final ClassLoader userCodeClassLoader =program.getUserCodeClassLoader();         final ClassLoader contextClassLoader =Thread.currentThread().getContextClassLoader();         try {                  //设置当前的classloader为用户代码的classloader                  Thread.currentThread().setContextClassLoader(userCodeClassLoader);                   LOG.info("Startingprogram (detached: {})",!configuration.getBoolean(DeploymentOptions.ATTACHED));         //用户代码中的getExecutionEnvironment会返回该Environment                  ContextEnvironment.setAsContext(                        executorServiceLoader,                        configuration,                        userCodeClassLoader,                        enforceSingleJobExecution,                        suppressSysout);                   StreamContextEnvironment.setAsContext(                        executorServiceLoader,                        configuration,                        userCodeClassLoader,                        enforceSingleJobExecution,                        suppressSysout);                   try {                        //调用用户代码的main方法                        program.invokeInteractiveModeForExecution();                  } finally {                        ContextEnvironment.unsetAsContext();                        StreamContextEnvironment.unsetAsContext();                  }         } finally {                  Thread.currentThread().setContextClassLoader(contextClassLoader);         }}PackagedProgram.java
public void invokeInteractiveModeForExecution() throws ProgramInvocationException{
callMainMethod(mainClass, args);
}
private static void callMainMethod(Class entryClass,String[] args) throws ProgramInvocationException {         ... ...         mainMethod = entryClass.getMethod("main", String[].class);         ... ...       //反射调用main函数         mainMethod.invoke(null, (Object) args);         ... ...}七、调用实行情况的execute方法
StreamExecutionEnvironment.java
public JobExecutionResult execute() throwsException {         return execute(DEFAULT_JOB_NAME);}public JobExecutionResult execute(String jobName)throws Exception {         ... ...         return execute(getStreamGraph(jobName));}public JobExecutionResult execute(StreamGraphstreamGraph) throws Exception {         final JobClient jobClient = executeAsync(streamGraph);         ... ...}public JobClient executeAsync(StreamGraph streamGraph)throws Exception {         ... ...         //根据提交模式选择匹配的factory         final PipelineExecutorFactoryexecutorFactory =                  executorServiceLoader.getExecutorFactory(configuration);... ...         //选择合适的executor提交任务         CompletableFuturejobClientFuture = executorFactory                  .getExecutor(configuration)                  .execute(streamGraph, configuration);... ...}<hr>往期精彩内容:
大咖分享 | 通过制作一个迷你版Flink来学习Flink源码
Flink进阶之使用布隆过滤器实现UV统计
Flink进阶之使用CEP实现恶意登陆检测
Flink进阶之滑动窗口统计实时热门商品
用flink能替换spark的批处理功能吗
页: [1]
查看完整版本: 重磅!Flink源码解析环境准备及提交流程之环境准备