Spark技术内幕: Shuffle详解(二)

本文主要关注ShuffledRDD的Shuffle Read是如何从其他的node上读取数据的。

上文讲到了获取如何获取的策略都在org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks中。可以见注释。

protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {// Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5// nodes, rather than blocking on reading output from one node.// 为了快速的得到数据,每次都会启动5个线程去最多5个node上取数据;// 每次请求的数据不会超过spark.reducer.maxMbInFlight(默认值为48MB) / 5。// 这样做的原因有几个:// 1. 避免占用目标机器的过多带宽,在千兆网卡为主流的今天,带宽还是比较重要的。// 如果一个连接将要占用48M的带宽,,这个Network IO可能会成为瓶颈。// 2. 请求数据可以平行化,这样请求数据的时间可以大大减少。请求数据的总时间就是那个请求最长的。// 如果不是并行请求,那么总时间将是所有的请求时间之和。// 而设置spark.reducer.maxMbInFlight,也是为了不要占用过多的内存val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)// Split local and remote blocks. Remote blocks are further split into FetchRequests of size// at most maxBytesInFlight in order to limit the amount of data in flight.val remoteRequests = new ArrayBuffer[FetchRequest]var totalBlocks = 0for ((address, blockInfos) <- blocksByAddress) { // address实际上是executor_idtotalBlocks += blockInfos.sizeif (address == blockManagerId) { //数据在本地,那么直接走local read// Filter out zero-sized blockslocalBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)_numBlocksToFetch += localBlocksToFetch.size} else {val iterator = blockInfos.iteratorvar curRequestSize = 0Lvar curBlocks = new ArrayBuffer[(BlockId, Long)]while (iterator.hasNext) {// blockId 是org.apache.spark.storage.ShuffleBlockId,// 格式:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceIdval (blockId, size) = iterator.next()// Skip empty blocksif (size > 0) { //过滤掉为大小为0的文件curBlocks += ((blockId, size))remoteBlocksToFetch += blockId_numBlocksToFetch += 1curRequestSize += size} else if (size < 0) {throw new BlockException(blockId, "Negative block size " + size)}if (curRequestSize >= targetRequestSize) { // 避免一次请求的数据量过大// Add this FetchRequestremoteRequests += new FetchRequest(address, curBlocks)curBlocks = new ArrayBuffer[(BlockId, Long)]logDebug(s"Creating fetch request of $curRequestSize at $address")curRequestSize = 0}}// Add in the final requestif (!curBlocks.isEmpty) { // 将剩余的请求放到最后一个request中。remoteRequests += new FetchRequest(address, curBlocks)}}}logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +totalBlocks + " blocks")remoteRequests}

美不美乡中水,亲不亲故乡人。

Spark技术内幕: Shuffle详解(二)

相关文章:

你感兴趣的文章:

标签云: