Spark Source Code Reading, Part 3: Submitting a Spark Job

In the previous section we analyzed how the SparkContext is created. In this section we look at how a job is submitted when an action is executed on an RDD, again starting from the program below:

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: SparkWordCount <inputfile> <outputfile>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)
    val file = sc.textFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/README.md")
    val counts = file.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    counts.saveAsTextFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/countReslut.txt")
  }
}

In the program above, the call counts.saveAsTextFile("file:///hadoopLearning/spark-1.5.1-bin-hadoop2.4/countReslut.txt") is an action; when it executes, Spark generates a Job to carry out the computation. Under the hood, saveAsTextFile wraps each element as a (NullWritable, Text) pair and delegates to saveAsHadoopFile in PairRDDFunctions, as sketched below.
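A minimal sketch of that delegation, assuming the Spark 1.5.x code path (simplified, not the verbatim source):

// Simplified sketch of RDD.saveAsTextFile: wrap each element as a
// (NullWritable, Text) pair, then reuse the Hadoop-based save path.
def saveAsTextFile(path: String): Unit = withScope {
  val pairs = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)            // one reusable Text object per partition
      (NullWritable.get(), text)
    }
  }
  // the implicit conversion to PairRDDFunctions provides saveAsHadoopFile
  pairs.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

The actual saveAsHadoopFile then configures a Hadoop JobConf and hands off to saveAsHadoopDataset: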

// Save the RDD to any Hadoop-supported file system (local files, HDFS, etc.),
// using a Hadoop OutputFormat class.
/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing.
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapred.output.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
    hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }

  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }

  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriter.createPathFromString(path, hadoopConf))

  // Delegate to saveAsHadoopDataset to actually save the RDD
  saveAsHadoopDataset(hadoopConf)
}
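For comparison, the WordCount result could also be written through this method directly. A minimal sketch, assuming the counts RDD from the program above; the output directory and the choice of Writable classes are illustrative, not taken from the article:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat

// Write the (word, count) pairs as tab-separated text via the old-API
// Hadoop output path; "/tmp/wordcount-output" is a hypothetical directory.
counts
  .map { case (word, count) => (new Text(word), new IntWritable(count)) }
  .saveAsHadoopFile(
    "file:///tmp/wordcount-output",
    classOf[Text],
    classOf[IntWritable],
    classOf[TextOutputFormat[Text, IntWritable]])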

The final line of saveAsHadoopFile jumps to saveAsHadoopDataset, which in turn calls self.context.runJob, i.e. the runJob method of SparkContext:

/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val wrappedConf = new SerializableConfiguration(hadoopConf)
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)

  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")

  if (isOutputSpecValidationEnabled) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }

  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()

  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    val config = wrappedConf.value
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

    val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L

    Utils.tryWithSafeFinally {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

        // Update bytes written metric every few records
        maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
        recordsWritten += 1
      }
    } {
      writer.close()
    }
    writer.commit()
    bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
    outputMetrics.setRecordsWritten(recordsWritten)
  }

  // Call runJob to execute the RDD computation and write each partition
  self.context.runJob(self, writeToFile)
  writer.commitJob()
}
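self.context.runJob(self, writeToFile) is where the Job is actually submitted. Roughly, the runJob overloads of SparkContext funnel the call down to the DAGScheduler; the following is a simplified sketch of the Spark 1.5.x code path, with logging and a few details omitted, not the verbatim source:

// Sketch only: the two-argument overload runs func on every partition ...
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
  // intermediate overloads collect the per-partition results into an array
  runJob(rdd, func, 0 until rdd.partitions.length)
}

// ... and eventually reaches the overload that hands the job to the DAGScheduler.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)   // clean the closure before shipping it to executors
  logInfo("Starting job: " + callSite.shortForm)
  // The DAGScheduler builds stages from the RDD lineage and submits them as task sets
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

From here the DAGScheduler takes over stage construction and task scheduling.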
