spark core源码分析8 从简单例子看transformation

前面提到过spark自带的一个最简单的例子,也介绍了SparkContext的部分,这节介绍剩余的内容中的transformation。

object SparkPi { def main(args: Array[String]) {val conf = new SparkConf().setAppName("Spark Pi")val spark = new SparkContext(conf)val slices = if (args.length > 0) args(0).toInt else 2val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflowval count = spark.parallelize(1 until n, slices).map { i =>val x = random * 2 – 1val y = random * 2 – 1if (x*x + y*y < 1) 1 else 0}.reduce(_ + _)println("Pi is roughly " + 4.0 * count / n)spark.stop() }}

大家都在书面化的说RDD是什么,我还不如用源码的方式展示更为直观。可以看出来,最基本的RDD其实就是sparkContext和它本身的依赖。

abstract class RDD[T: ClassTag](@transient private var _sc: SparkContext,@transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { … …}我们顺着看parallelize方法的内容。seq是一个scala集合类型。numSlices是设置的并行度,有默认值(配置项或者driver获得的Executor注册上来的总的core)override def defaultParallelism(): Int = { conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))}def parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {assertNotStopped()new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) }

我们继续看ParallelCollectionRDD的内部。它重写了RDD的一些方法,并设置依赖为Nil

private[spark] class ParallelCollectionRDD[T: ClassTag](@transient sc: SparkContext,@transient data: Seq[T],numSlices: Int,locationPrefs: Map[Int, Seq[String]])extends RDD[T](sc, Nil) { // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split // instead. // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal. override def getPartitions: Array[Partition] = {val slices = ParallelCollectionRDD.slice(data, numSlices).toArrayslices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray } override def compute(s: Partition, context: TaskContext): Iterator[T] = {new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator) } override def getPreferredLocations(s: Partition): Seq[String] = {locationPrefs.getOrElse(s.index, Nil) }}

接着,在这个ParallelCollectionRDD上执行map操作。其实就是执行了RDD抽象类中的map方法。

/** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }map方法中的f参数其实就是我们程序中写的map中的操作,之后产生了MapPartitionsRDD。

prev参数就是执行map转换之前的ParallelCollectionRDD

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](prev: RDD[T],f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)preservesPartitioning: Boolean = false) extends RDD[U](prev) { override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None override def getPartitions: Array[Partition] = firstParent[T].partitions override def compute(split: Partition, context: TaskContext): Iterator[U] =f(context, split.index, firstParent[T].iterator(split, context))}

查看父类RDD的构造过程,其实就是默认构建一个one-to-one的依赖,OneToOneDependency意味着RDD的转换过程是一对一的,即分区号是一一对应的。注明的是,依赖中保存的RDD是转换之前的RDD,这里的话就是ParallelCollectionRDD

/** Construct an RDD with just a one-to-one dependency on one parent */ def this(@transient oneParent: RDD[_]) =this(oneParent.context , List(new OneToOneDependency(oneParent)))partitioner其实就是包含了分区的数量以及根据key获得分区号的一个包装。abstract class Partitioner extends Serializable { def numPartitions: Int def getPartition(key: Any): Int}到这里为止,map操作就介绍结束了。同样也没有触发任务的调度,只是RDD的转换。

现在看最后的reduce操作

/** * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */def reduce(f: (T, T) => T): T = withScope { val cleanF = sc.clean(f) //定义一个具体的方法,将迭代器参数中的值从左一个个相加 val reducePartition: Iterator[T] => Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None} } var jobResult: Option[T] = None //定义一个将各分区合并的方法 val mergeResult = (index: Int, taskResult: Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value, taskResult.get))case None => taskResult}} } //执行runJob sc.runJob(this, reducePartition, mergeResult) // Get the final result out of our Option, or throw an exception if the RDD was empty jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}最终调用这个runJob方法,参数:rdd:MapPartitionsRDDfunc:包装了reducePartition方法partitions:0 until rdd.partitions.size,rdd.partitions就是调用了MapPartitionsRDD的getPartitions方法所得。每次转换之后的RDD都会保存之前所有的依赖,所以可以根据依赖关系追溯到第一个RDD。这里MapPartitionsRDD的getPartitions方法是获取第一个RDD的getPartitions方法。allowLocal:falseresultHandler:mergeResult方法/** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. The allowLocal * flag specifies whether the scheduler can run the computation on the driver rather than * shipping it out to the cluster, for short actions like first(). */ def runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],allowLocal: Boolean,resultHandler: (Int, U) => Unit) {if (stopped.get()) {throw new IllegalStateException("SparkContext has been shutdown")}val callSite = getCallSiteval cleanedFunc = clean(func)logInfo("Starting job: " + callSite.shortForm)if (conf.getBoolean("spark.logLineage", false)) {logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)}dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)progressBar.foreach(_.finishAll())rdd.doCheckpoint() }这里最终调用了DAGScheduler的runJob方法 -> submitJob这里产生JobId,,发送JobSubmitted消息到DAG的事件循环中 -> handleJobSubmitted所以,这个reduce实际触发了runJob操作,即spark中称为Action。下一节,就介绍剩下的reduce操作

版权声明:本文为博主原创文章,未经博主允许不得转载。

午餐,晚餐。或许吃得不好,可是却依旧为对方擦去嘴角的油渍。

spark core源码分析8 从简单例子看transformation

相关文章:

你感兴趣的文章:

标签云: