Reading the Spark Source Code, Part 1: The Spark Application Submission Flow

Author: 摇摆少年梦 (WeChat: zhouzhihubeyond)

Submitting an application with the spark-submit script

To run a Spark application, you package it into a jar and submit it to the cluster with the spark-submit script. A typical submit command looks like this:

root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordCount --executor-memory 1g /root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar hdfs://ns1/README.md hdfs://ns1/SparkWordCountResult
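The jar being submitted is an ordinary Spark application compiled against the Spark API. For orientation, a minimal word-count program of the kind submitted above might look like the following sketch (this is not necessarily the exact SparkWordCount used in the command; it simply reads the input path from the first argument and writes the counts to the second):

import org.apache.spark.{SparkConf, SparkContext}

// Minimal word-count sketch: args(0) is the input path, args(1) the output path.
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}

The two hdfs://ns1/... paths at the end of the spark-submit command are exactly the args(0) and args(1) this program receives.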

To understand the whole flow, let's start with the spark-submit script itself. Its contents (Spark 1.5.0) are as follows:

#!/usr/bin/env bash

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

So spark-submit simply delegates to spark-class. The spark-class script loads the Spark environment variables, locates the assembly jar containing the Spark classes (spark-assembly-1.5.0-hadoop2.4.0.jar for Spark 1.5.0), and then invokes org.apache.spark.launcher.Main, which builds the final launch command. The main body of spark-class is as follows:

export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

. "$SPARK_HOME"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
  ASSEMBLY_DIR="$SPARK_HOME/lib"
else
  ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
  echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
  echo "$ASSEMBLY_JARS" 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# The launcher library prints the command to run, one argument at a time
# terminated by a NULL character; read it into an array and exec it.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")

exec "${CMD[@]}"
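As an aside, the launcher library that spark-class calls here is also exposed as a public API: org.apache.spark.launcher.SparkLauncher (available since Spark 1.4) builds and starts the same spark-submit command programmatically. The following is only a sketch of that usage, reusing the Spark home, host and paths from the example command above; it is not part of the submission flow being traced:

import org.apache.spark.launcher.SparkLauncher

// Sketch: start the same WordCount job through the launcher library instead of
// invoking the spark-submit shell script by hand. Host and paths are taken from
// the example command and would need adjusting for a real cluster.
object LaunchWordCount {
  def main(args: Array[String]): Unit = {
    val sparkProcess: Process = new SparkLauncher()
      .setSparkHome("/hadoopLearning/spark-1.5.0-bin-hadoop2.4")
      .setAppResource("/root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar")
      .setMainClass("SparkWordCount")
      .setMaster("spark://sparkmaster:7077")
      .setConf(SparkLauncher.EXECUTOR_MEMORY, "1g")
      .addAppArgs("hdfs://ns1/README.md", "hdfs://ns1/SparkWordCountResult")
      .launch()
    sparkProcess.waitFor() // block until the spark-submit child process exits
  }
}

Internally it spawns the same spark-submit child process, so everything described below applies to it as well.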

From the code above we can see that org.apache.spark.launcher.Main produces the command that starts org.apache.spark.deploy.SparkSubmit. Part of the SparkSubmit source is shown below:

// The main method of SparkSubmit
def main(args: Array[String]): Unit = {
  // Parse the arguments supplied at submit time (see Figure 1)
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // For an ordinary job submission, submit(appArgs) is executed
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

Figure 1: the arguments parsed by appArgs = new SparkSubmitArguments(args)
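Figure 1 is not reproduced here, but its content can be read off the submit command at the top of this article. The following is a rough, hand-written illustration of where each command-line value ends up; it is a stand-in, not the real SparkSubmitArguments class:

// Hand-written stand-in for illustration only; not the real SparkSubmitArguments.
object SubmitArgsIllustration {
  case class ParsedArgs(
      master: String,
      mainClass: String,
      executorMemory: String,
      primaryResource: String,
      childArgs: Seq[String])

  def main(args: Array[String]): Unit = {
    val appArgs = ParsedArgs(
      master = "spark://sparkmaster:7077",   // from --master
      mainClass = "SparkWordCount",          // from --class
      executorMemory = "1g",                 // from --executor-memory
      primaryResource = "/root/IdeaProjects/SparkWordCount/out/artifacts/SparkWordCount_jar/SparkWordCount.jar",
      childArgs = Seq("hdfs://ns1/README.md", "hdfs://ns1/SparkWordCountResult"))
    println(appArgs) // roughly what verbose mode would print for appArgs
  }
}

Since neither --kill nor --status was given, the action resolves to SparkSubmitAction.SUBMIT, which is why the match in main() ends up calling submit(appArgs).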

Now step into the submit method:

/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
private def submit(args: SparkSubmitArguments): Unit = {
  // Resolve the child arguments, classpath, system properties and main class (see Figure 2)
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  // doRunMain() is a method defined locally inside submit()
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // Invoke runMain
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      // Invoke runMain
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      // Call the doRunMain method defined inside submit
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    // Call the doRunMain method defined inside submit
    doRunMain()
  }
}

Figure 2: the launch parameters resolved by prepareSubmitEnvironment at submit time
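The second step described in the scaladoc of submit() happens inside runMain, which is not listed above: it puts childClasspath on a class loader, applies sysProps, loads childMainClass, and invokes its main method through reflection. The following is only a simplified sketch of that idea, not the actual runMain source (the real method uses Spark's MutableURLClassLoader and carries extra verbose output and error handling):

import java.net.{URL, URLClassLoader}

// Simplified sketch of what runMain does conceptually.
object RunMainSketch {
  def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String): Unit = {
    // Make the application jar(s) visible to a class loader
    val urls: Array[URL] = childClasspath.map(p => new java.io.File(p).toURI.toURL).toArray
    val loader = new URLClassLoader(urls, Thread.currentThread().getContextClassLoader)
    Thread.currentThread().setContextClassLoader(loader)

    // Propagate the resolved system properties (spark.master, spark.app.name, ...)
    sysProps.foreach { case (k, v) => System.setProperty(k, v) }

    // Load the child main class and invoke its static main(Array[String]) via reflection
    val mainClass = Class.forName(childMainClass, true, loader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, childArgs.toArray)
  }
}

In client mode childMainClass is the user's own class (SparkWordCount in our example), while in standalone cluster mode it is a Spark-internal client class that asks the master to launch the driver, which is why submit() distinguishes the REST gateway from the legacy gateway above.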
