Flink Yarn-per-job模式提交流程如图所示:
一、程序起点
1. flink/bin/flink
=> exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
2. flink/bin/config.sh(相关环境配置都在这里)
=> JAVA_RUN=java
=> JVM_ARGS="" => # Use conf/flink-conf.yaml
=>INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
3. 执行java -cp 就会开启JVM虚拟机,在虚拟机上开启CliFrontend进程,然后开始执行main方法
说明:java -cp和 -classpath一样,是指定类运行所依赖其他类的路径。
java -cp =》开启JVM虚拟机 =》开启Process(CliFrontend)=》程序入口CliFrontend.main
4. Flink提交任务的入口类为CliFrontend。找到这个类的main方法:
在IDEA中全局查找(ctrl + n):org.apache.flink.client.cli.CliFrontend,找到CliFrontend类,并找到main方法
/*** Submits the job based on the arguments.(根据参数提交作业)*/public static void main(final String[] args) { ... ... final CliFrontend cli = new CliFrontend( configuration, customCommandLines); ... ...}二、程序入口
CliFrontend.java
public static void main(final String[] args) { ... ... final CliFrontend cli = new CliFrontend( configuration, customCommandLines); ... ... int retCode =SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseParameters(args)); ... ...}public int parseParameters(String[] args) { ... ... // get action String action = args[0]; // remove action from parameters final String[] params =Arrays.copyOfRange(args, 1, args.length); // do action switch (action) { case ACTION_RUN: run(params); return 0; case ACTION_LIST: list(params); return 0; case ACTION_INFO: info(params); return 0; case ACTION_CANCEL: cancel(params); return 0; case ACTION_STOP: stop(params); return 0; case ACTION_SAVEPOINT: savepoint(params); return 0; …… } ... ...}三、解析输入参数
CliFrontend.java
protected void run(String[] args) throws Exception { ... ... //获取默认的运行参数 final Options commandOptions =CliFrontendParser.getRunCommandOptions(); // 解析参数,返回commandLine final CommandLine commandLine = getCommandLine(commandOptions, args, true); ... ...}public CommandLine getCommandLine(final OptionscommandOptions, final String[] args, final boolean stopAtNonOptions) throwsCliArgsException { final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions,customCommandLineOptions); return CliFrontendParser.parse(commandLineOptions, args,stopAtNonOptions);}DefaultParser.javaDefaultParser.java
public class CliFrontendParser { // 选项列表 static final Option HELP_OPTION = new Option("h", "help", false, "Show the helpmessage for the CLI Frontend or the action."); static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file."); static final Option CLASS_OPTION = new Option("c", "class", true, "Class with theprogram entry point (\"main()\" method). Only needed if the " + "JAR file doesnot specify the class in its manifest.");... ...}DefaultParser.java
public CommandLine parse(Options options, String[] arguments,Properties properties, boolean stopAtNonOption) throws ParseException{ ... ... if (arguments != null) { for (String argument : arguments) { handleToken(argument); } }... ...}private void handleToken(String token) throwsParseException{ currentToken = token; if (skipParsing) { cmd.addArg(token); } else if ("--".equals(token)) { skipParsing = true; } else if (currentOption != null && currentOption.acceptsArg() && isArgument(token)) { currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token)); } else if (token.startsWith("--")){ // 解析--形式的参数 handleLongOption(token); } else if (token.startsWith("-") && !"-".equals(token)){ // 解析 -形式的参数 handleShortAndLongOption(token); } else { handleUnknownToken(token); } if (currentOption != null && !currentOption.acceptsArg()) { currentOption = null; }}private void handleLongOption(String token) throwsParseException{ if (token.indexOf('=') == -1){ //解析--L、-L、--l、-l形式的参数(不包含=) handleLongOptionWithoutEqual(token); } else{ // 解析--L=V、-L=V、--l=V、-l=V形式的参数(包含=) handleLongOptionWithEqual(token); }}各种情况的解析,逻辑大体相同:去除-或--前缀,校验参数,以其中一个为例
private void handleLongOptionWithoutEqual(String token)throws ParseException{ // 校验参数是否合法 List matchingOpts = options.getMatchingOptions(token); if (matchingOpts.isEmpty()) { handleUnknownToken(currentToken); } else if (matchingOpts.size() > 1) { throw newAmbiguousOptionException(token, matchingOpts); } else{// 参数添加到执行命令 handleOption(options.getOption(matchingOpts.get(0))); }}Options.java:
public List getMatchingOptions(String opt){ // 去除 -或 -- 前缀 opt = Util.stripLeadingHyphens(opt); List matchingOpts = newArrayList(); // for a perfect match return the singleoption only if (longOpts.keySet().contains(opt)) { return Collections.singletonList(opt); } for (String longOpt : longOpts.keySet()) { if (longOpt.startsWith(opt)) { matchingOpts.add(longOpt); } } return matchingOpts;}DefaultParser.java
private void handleOption(Option option) throwsParseException{ // check the previous option beforehandling the next one checkRequiredArgs(); option = (Option) option.clone(); updateRequiredOptions(option); cmd.addOption(option); if (option.hasArg()) { currentOption = option; } else { currentOption = null; }}四、选择创建哪种类型的客户端
CliFrontend.java
public static void main(final String[] args) { ... ... final List customCommandLines = loadCustomCommandLines( configuration, configurationDirectory); ... ... final CliFrontend cli = new CliFrontend( configuration, customCommandLines); ... ...}这里依次添加了 Yarn和Default(Standalone)两种客户端(后面根据isActive()选择):
public static List loadCustomCommandLines(Configurationconfiguration, String configurationDirectory) { ListcustomCommandLines = new ArrayList(); customCommandLines.add(newGenericCLI(configuration, configurationDirectory)); // Commandline interface of the YARN session, with a special initialization here // toprefix all options with y/yarn. final String flinkYarnSessionCLI ="org.apache.flink.yarn.cli.FlinkYarnSessionCli"; try { customCommandLines.add( loadCustomCommandLine(flinkYarnSessionCLI, configuration, configurationDirectory, "y", "yarn")); } catch (NoClassDefFoundError |Exception e) { final StringerrorYarnSessionCLI ="org.apache.flink.yarn.cli.FallbackYarnSessionCli"; try { LOG.info("LoadingFallbackYarnSessionCli"); customCommandLines.add( loadCustomCommandLine(errorYarnSessionCLI,configuration)); } catch (Exception exception){ LOG.warn("Couldnot load CLI class {}.", flinkYarnSessionCLI, e); } } // Tips:DefaultCLI must be added at last, because getActiveCustomCommandLine(..) willget the // active CustomCommandLine in order andDefaultCLI isActive always return true. customCommandLines.add(new DefaultCLI(configuration)); return customCommandLines;}在run()内里,进行客户端的选择:
protected void run(String[] args) throws Exception { ... ... final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine));... ...}public CustomCommandLine validateAndGetActiveCommandLine(CommandLinecommandLine) {... ... for (CustomCommandLine cli :customCommandLines) { ... ... //在FlinkYarnSessionCli为active时优先返回FlinkYarnSessionCli。 //对于DefaultCli,它的isActive方法总是返回true。 if (cli.isActive(commandLine)) { return cli; } }... ...}FlinkYarnSessionCli.java => Yarn客户端isActive的判断逻辑:
public boolean isActive(CommandLine commandLine) { final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(),null); //是否指定为per-job模式,即指定”-m yarn-cluster”; ID = "yarn-cluster" final boolean yarnJobManager =ID.equals(jobManagerOption); // 是否存在flink在yarn的appID,即yarn-session模式是否启动 final boolean hasYarnAppId =commandLine.hasOption(applicationId.getOpt()) ||configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent(); // executor的名字为"yarn-session" 或 "yarn-per-job" final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET)) ||YarnJobClusterExecutor.NAME.equals(configuration.get(DeploymentOptions.TARGET)); // return hasYarnExecutor || yarnJobManager|| hasYarnAppId || (isYarnPropertiesFileMode(commandLine) &&yarnApplicationIdFromYarnProperties != null);}五、获取有效设置
CliFrontend.java
protected void run(String[] args) throws Exception { ... ... final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine,commandLine, programOptions, jobJars);... ...}private Configuration getEffectiveConfiguration( final CommandLine commandLine, final ProgramOptionsprogramOptions, final List jobJars)throws FlinkException {... ... final Configuration executorConfig =checkNotNull(activeCustomCommandLine) .applyCommandLineOptionsToConfiguration(commandLine);... ...}FlinkYarnSessionCli.java
public Configuration applyCommandLineOptionsToConfiguration(CommandLinecommandLine) throws FlinkException { // we ignore the addressOption becauseit can only contain "yarn-cluster" final ConfigurationeffectiveConfiguration = new Configuration(configuration); applyDescriptorOptionToConfig(commandLine,effectiveConfiguration); final ApplicationId applicationId =getApplicationId(commandLine); if (applicationId != null) { final StringzooKeeperNamespace; if (commandLine.hasOption(zookeeperNamespace.getOpt())){ zooKeeperNamespace =commandLine.getOptionValue(zookeeperNamespace.getOpt()); } else { zooKeeperNamespace =effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString()); } effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace); effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID,ConverterUtils.toString(applicationId)); // TARGET 就是execution.target,目标执行器 //决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME); } else { effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME); } if (commandLine.hasOption(jmMemory.getOpt())) { String jmMemoryVal =commandLine.getOptionValue(jmMemory.getOpt()); if(!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) { jmMemoryVal +="m"; } effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY,jmMemoryVal); } if (commandLine.hasOption(tmMemory.getOpt())) { String tmMemoryVal =commandLine.getOptionValue(tmMemory.getOpt()); if(!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) { tmMemoryVal +="m"; } effectiveConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,MemorySize.parse(tmMemoryVal)); } if (commandLine.hasOption(slots.getOpt())) { effectiveConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,Integer.parseInt(commandLine.getOptionValue(slots.getOpt()))); }... ...}六、调用用户代码的main方法
CliFrontend.java
protected void run(String[] args) throws Exception { ... ... executeProgram(effectiveConfiguration,program);... ...}protected void executeProgram(final Configurationconfiguration, final PackagedProgram program) throws ProgramInvocationException{ ClientUtils.executeProgram(newDefaultExecutorServiceLoader(), configuration, program, false, false);}ClientUtils.java
public static void executeProgram( PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, PackagedProgram program, boolean enforceSingleJobExecution, boolean suppressSysout) throws ProgramInvocationException { checkNotNull(executorServiceLoader); final ClassLoader userCodeClassLoader =program.getUserCodeClassLoader(); final ClassLoader contextClassLoader =Thread.currentThread().getContextClassLoader(); try { //设置当前的classloader为用户代码的classloader Thread.currentThread().setContextClassLoader(userCodeClassLoader); LOG.info("Startingprogram (detached: {})",!configuration.getBoolean(DeploymentOptions.ATTACHED)); //用户代码中的getExecutionEnvironment会返回该Environment ContextEnvironment.setAsContext( executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout); StreamContextEnvironment.setAsContext( executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout); try { //调用用户代码的main方法 program.invokeInteractiveModeForExecution(); } finally { ContextEnvironment.unsetAsContext(); StreamContextEnvironment.unsetAsContext(); } } finally { Thread.currentThread().setContextClassLoader(contextClassLoader); }}PackagedProgram.java
public void invokeInteractiveModeForExecution() throws ProgramInvocationException{
callMainMethod(mainClass, args);
}
private static void callMainMethod(Class entryClass,String[] args) throws ProgramInvocationException { ... ... mainMethod = entryClass.getMethod("main", String[].class); ... ... //反射调用main函数 mainMethod.invoke(null, (Object) args); ... ...}七、调用执行环境的execute方法
StreamExecutionEnvironment.java
public JobExecutionResult execute() throwsException { return execute(DEFAULT_JOB_NAME);}public JobExecutionResult execute(String jobName)throws Exception { ... ... return execute(getStreamGraph(jobName));}public JobExecutionResult execute(StreamGraphstreamGraph) throws Exception { final JobClient jobClient = executeAsync(streamGraph); ... ...}public JobClient executeAsync(StreamGraph streamGraph)throws Exception { ... ... //根据提交模式选择匹配的factory final PipelineExecutorFactoryexecutorFactory = executorServiceLoader.getExecutorFactory(configuration);... ... //选择合适的executor提交任务 CompletableFuturejobClientFuture = executorFactory .getExecutor(configuration) .execute(streamGraph, configuration);... ...}<hr>往期精彩内容:
大咖分享 | 通过制作一个迷你版Flink来学习Flink源码
Flink进阶之使用布隆过滤器实现UV统计
Flink进阶之使用CEP实现恶意登陆检测
Flink进阶之滑动窗口统计实时热门商品
用flink能替换spark的批处理功能吗