[Spark 28] Source Code Analysis of the Spark Shuffle Read Path

 

Spark's shuffle read is a performance killer: the read side has to pull map output data from many map nodes over to the reduce node, and the fetched map outputs then still have to be merged once more on the reduce side (see the conclusion at the end of this post). The WordCount program below is used as the running example.

 

 

package spark.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

object SparkWordCount {
  def main(args: Array[String]) {
    System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2")
    val conf = new SparkConf()
    conf.setAppName("SparkWordCount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("file:///D:/word.in")
    println(rdd.toDebugString)
    val rdd1 = rdd.flatMap(_.split(" "))
    println("rdd1:" + rdd1.toDebugString)
    val rdd2 = rdd1.map((_, 1))
    println("rdd2:" + rdd2.toDebugString)
    // reduceByKey introduces a ShuffleDependency, so the job is split into two stages here
    val rdd3 = rdd2.reduceByKey(_ + _)
    println("rdd3:" + rdd3.toDebugString)
    // saveAsTextFile triggers the job; the second stage reads the shuffle output analyzed below
    rdd3.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis())
    sc.stop
  }
}

 

 

 

1. ResultTask's runTask method

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

2. runTask first deserializes the task binary to obtain rdd and func. Here rdd is the last RDD of this stage (the final RDD), and func is the writeToFile function defined inside PairRDDFunctions.saveAsHadoopDataset; writeToFile performs WordCount's final write to disk (rdd3.saveAsTextFile). Before writeToFile can write anything, the data must first be fetched from the mapper nodes and aggregated, and that is done by the rdd.iterator call in the last line of runTask: func(context, rdd.iterator(partition, context)).

As we saw when analyzing the Spark shuffle write path, this read works by cascading up the lineage: each RDD's iterator falls through to its compute method, which in turn calls its parent RDD's iterator, until the chain reaches the RDD that actually performs the shuffle read. A toy sketch of the cascade follows.
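The sketch below is a minimal, self-contained illustration (toy Mini* classes, not Spark's real RDD API): a mapped RDD delegates to its parent's iterator and applies its function, and the chain bottoms out at the shuffled RDD, which fetches the map outputs instead of reading from a parent.

// Illustrative sketch only; the real classes are RDD, MappedRDD and ShuffledRDD.
trait MiniRDD[T] {
  // No caching or checkpointing in this toy: iterator simply falls through to compute.
  final def iterator(): Iterator[T] = compute()
  def compute(): Iterator[T]
}

// Counterpart of MappedRDD: read the parent, then apply f to every record.
class MiniMappedRDD[T, U](parent: MiniRDD[T], f: T => U) extends MiniRDD[U] {
  override def compute(): Iterator[U] = parent.iterator().map(f)
}

// Counterpart of ShuffledRDD: the cascade ends here, where records are fetched
// from the map-side shuffle outputs rather than from a parent iterator.
class MiniShuffledRDD[T](fetchShuffleOutput: () => Iterator[T]) extends MiniRDD[T] {
  override def compute(): Iterator[T] = fetchShuffleOutput()
}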

 

3. The data read path (rdd.iterator)

3.1 The iterator template method defined in the RDD class

 

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) { // the RDD is persisted: serve the partition from the cache, computing and caching it on a miss
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else { // not persisted: compute the partition, or read it from the checkpoint if this RDD is checkpointed
      computeOrReadCheckpoint(split, context)
    }
  }
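For the first branch, here is a simplified, self-contained sketch of what cacheManager.getOrCompute amounts to. It uses a toy in-memory cache keyed by RDD id and partition index; the real CacheManager goes through the BlockManager and honors the requested StorageLevel.

import scala.collection.mutable

object GetOrComputeSketch {
  // Toy cache: (rddId, partitionIndex) -> materialized rows. Illustration only.
  private val cache = mutable.Map.empty[(Int, Int), Seq[Any]]

  def getOrCompute[T](rddId: Int, partition: Int)(compute: => Iterator[T]): Iterator[T] =
    cache.get((rddId, partition)) match {
      case Some(rows) => rows.iterator.asInstanceOf[Iterator[T]] // cache hit: serve the cached rows
      case None =>
        val rows = compute.toSeq            // cache miss: compute and materialize the partition
        cache((rddId, partition)) = rows    // remember it for later tasks
        rows.iterator
    }
}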

3.2 The second stage of WordCount contains two RDDs: the last one is a MappedRDD and its parent is a ShuffledRDD. The MappedRDD pulls its data from the ShuffledRDD and then applies the function it carries, which is the x => (NullWritable.get(), new Text(x.toString)) defined in RDD.saveAsTextFile:

  def saveAsTextFile(path: String) {
    this.map(x => (NullWritable.get(), new Text(x.toString)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

3.3 MappedRDD's compute method

 

 

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f) // read from the parent RDD's iterator (here the ShuffledRDD), then apply this RDD's function f
}
 

 

3.4 ShuffledRDD's compute method

 

 

 // split is the reduce partition this task has to compute
 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]

    // The ShuffleManager here is SortShuffleManager, but the reader it returns is a HashShuffleReader:
    // even with sort-based shuffle, the read path goes through the hash shuffle reader.
    // The reader is obtained from the shuffleHandle plus a partition range; the range
    // [split.index, split.index + 1) means each call reads exactly one reduce partition.
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
 

 

3.5 SortShuffleManager's getReader method

 

 

/**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    // We currently use the same block store shuffle fetcher as the hash-based shuffle.
    new HashShuffleReader( // constructing the reader only requires the ShuffleHandle, the partition range, and the task context
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }
 

 

3.6 HashShuffleReader's read method

 

 

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer) // serializer for this shuffle dependency

    // Key step: fetch the map output blocks for this shuffle and partition, getting back an iterator over the records
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    // Aggregate the fetched records into a combined iterator
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {

      // The reduce side merges in one of two ways, depending on whether the map side already combined.
      // If the map side combined, the fetched records are (K, C) pairs, so merge the combiners.
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        // If the map side did not combine, the records are raw (K, V) pairs, so build the combiners
        // here using the same Aggregator functions the map side would have used.
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.

    // If the dependency defines a keyOrdering, sort the aggregated records by key using an
    // ExternalSorter; otherwise return them unsorted.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }
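The two merge branches can be illustrated with a small, self-contained sketch (a hypothetical MiniAggregator, not Spark's Aggregator class). For WordCount's reduceByKey(_ + _) the value type V and the combiner type C are both Int and all three functions are addition, so the two paths happen to compute the same thing; for a combineByKey whose combiner is, say, a List, they would not.

import scala.collection.mutable

// Hypothetical stand-in for Spark's Aggregator, for illustration only.
case class MiniAggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Map side did NOT combine: input records are raw (K, V) pairs.
  def combineValuesByKey(records: Iterator[(K, V)]): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, v) <- records)
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    combiners.iterator
  }

  // Map side already combined: input records are (K, C) pairs.
  def combineCombinersByKey(records: Iterator[(K, C)]): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, c) <- records)
      combiners(k) = combiners.get(k) match {
        case Some(existing) => mergeCombiners(existing, c)
        case None           => c
      }
    combiners.iterator
  }
}

// WordCount usage: V = C = Int, all three functions are addition.
// MiniAggregator[String, Int, Int](v => v, _ + _, _ + _)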
 

 

3.7 The BlockStoreShuffleFetcher object defines a single method, fetch

 

 

private[hash] object BlockStoreShuffleFetcher extends Logging {
  def fetch[T](
      shuffleId: Int, // supplied by the caller as dep.shuffleHandle.shuffleId
      reduceId: Int,  // the startPartition passed in by HashShuffleReader, i.e. the reduce partition to fetch
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
  {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager // the local BlockManager

    val startTime = System.currentTimeMillis
    // Ask the MapOutputTracker (held in SparkEnv, which acts as a registry of services shared across the app)
    // for the map output locations of this shuffle: "Called from executors to get the server URIs and
    // output sizes of the map outputs of a given shuffle."
    // statuses is an array of (BlockManagerId, Long) pairs: the BlockManagerId identifies the executor
    // holding a map output (executorId, host, port), and the Long is that output's size for this reduce partition.
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))
    
    // Group the map outputs by the BlockManagerId (executorId, host, port) that holds them;
    // each buffer element is a (mapId, size) pair.
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    
     
    // statuses.zipWithIndex yields ((address, size), index): address is the BlockManagerId of the executor
    // holding a map output, size is that output's length, and index is its position in statuses,
    // which is exactly the mapId.
    for (((address, size), index) <- statuses.zipWithIndex) {
      // Get (or create) the buffer for this address and append (mapId, size) to it.
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }
    
    // Build blocksByAddress, a Seq of (BlockManagerId, Seq[(BlockId, Long)]) pairs: for each address,
    // every (mapId, size) entry becomes (ShuffleBlockId(shuffleId, mapId, reduceId), size), i.e. the
    // concrete shuffle block to fetch from that BlockManager together with its size.
    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }
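    // A small worked example (hypothetical values) of the two groupings above.
    // Suppose shuffleId = 0, reduceId = 2, and three map outputs live on two executors:
    //   statuses        = Array((bmA, 100L), (bmB, 200L), (bmA, 300L))   // index == mapId
    //   splitsByAddress = Map(bmA -> ArrayBuffer((0, 100L), (2, 300L)),
    //                         bmB -> ArrayBuffer((1, 200L)))
    //   blocksByAddress = Seq(
    //     (bmA, Seq((ShuffleBlockId(0, 0, 2), 100L), (ShuffleBlockId(0, 2, 2), 300L))),
    //     (bmB, Seq((ShuffleBlockId(0, 1, 2), 200L))))
    // i.e. one ShuffleBlockId(shuffleId, mapId, reduceId) per map output, grouped by the
    // BlockManager that holds it.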
    
    // Local helper: turn one fetched (BlockId, Try[Iterator[Any]]) pair into an iterator of records,
    // rethrowing a failed fetch as a FetchFailedException.
    def unpackBlock(blockPair: (BlockId, Try[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Success(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case Failure(e) => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block", e)
          }
        }
      }
    }
    
    // ShuffleBlockFetcherIterator does the actual fetching: local blocks via the BlockManager,
    // remote blocks via the shuffle client, throttled by spark.reducer.maxMbInFlight (see 3.8).
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    
    // Apply unpackBlock to every fetched block and flatten the per-block iterators into one record stream
    val itr = blockFetcherItr.flatMap(unpackBlock)
    
    // Wrap in a CompletionIterator so that shuffle read metrics are updated once iteration completes
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })
    // Finally wrap completionIter in an InterruptibleIterator so the task can be cancelled during iteration
    new InterruptibleIterator[T](context, completionIter)
  }
}
 

3.8 The ShuffleBlockFetcherIterator class

a. As the class documentation below shows, ShuffleBlockFetcherIterator fetches the shuffle blocks: blocks that live on the local node are read from the local BlockManager, while remote blocks are fetched through the BlockTransferService (which uses Netty as the underlying transport).

b. The result is an iterator whose elements are (BlockId, values) tuples, so the caller can process blocks in a pipelined fashion as they arrive.

c. The remote fetches are throttled so that the bytes in flight stay below maxBytesInFlight, bounding memory usage.

 

/**
 * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
 * manager. For remote blocks, it fetches them using the provided BlockTransferService.
 *
 * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
 * pipelined fashion as they are received.
 *
 * The implementation throttles the remote fetches so they don't exceed maxBytesInFlight to avoid
 * using too much memory.
 *
 * @param context [[TaskContext]], used for metrics update
 * @param shuffleClient [[ShuffleClient]] for fetching remote blocks
 * @param blockManager [[BlockManager]] for reading local blocks
 * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
 *                        For each block we also require the size (in bytes as a long field) in
 *                        order to throttle the memory usage.
 * @param serializer serializer used to deserialize the data.
 * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
 */
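The throttling mentioned in the doc comment can be sketched as follows. This is illustrative only; FetchRequest and fetchUpToMaxBytes here are not the real internals of ShuffleBlockFetcherIterator. The idea is to keep issuing queued fetch requests as long as the bytes currently in flight stay under maxBytesInFlight.

import scala.collection.mutable

object ThrottleSketch {
  // Hypothetical request type: just the total size of the blocks it asks for.
  case class FetchRequest(sizeBytes: Long)

  // Issue requests from the queue while the in-flight byte count stays under the cap,
  // and return the updated in-flight count. Completed fetches would decrement it and
  // trigger another call, which is how the pipeline keeps moving.
  def fetchUpToMaxBytes(
      pending: mutable.Queue[FetchRequest],
      send: FetchRequest => Unit,
      maxBytesInFlight: Long,
      bytesInFlight: Long): Long = {
    var inFlight = bytesInFlight
    while (pending.nonEmpty && inFlight + pending.head.sizeBytes <= maxBytesInFlight) {
      val request = pending.dequeue()
      inFlight += request.sizeBytes
      send(request)
    }
    inFlight
  }
}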

 

d. In BlockStoreShuffleFetcher, the ShuffleBlockFetcherIterator is constructed as follows; compare the arguments with the constructor parameters above.

Note that the ShuffleClient is obtained from the BlockManager held in SparkEnv:

    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)

 

e. How shuffleClient is defined in BlockManager

 

  // Client to read other executors' shuffle files. This is either an external service, or just the
  // standard BlockTransferService to directly connect to other Executors.
  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) { // the external shuffle service is enabled
    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
  } else {
    blockTransferService // otherwise use blockTransferService: BlockTransferService; since Spark 1.2 this is the Netty-based NettyBlockTransferService
  }
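For reference, the externalShuffleServiceEnabled flag is driven by configuration (spark.shuffle.service.enabled is the standard setting). A minimal example of enabling it, so that shuffleClient becomes an ExternalShuffleClient instead of the NettyBlockTransferService:

import org.apache.spark.SparkConf

object ExternalShuffleServiceDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ExternalShuffleServiceDemo")
      .set("spark.shuffle.service.enabled", "true") // executors then read shuffle files via the external shuffle service
    println(conf.get("spark.shuffle.service.enabled"))
  }
}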

 

3.9 How BlockStoreShuffleFetcher's fetch method uses blockFetcherItr.flatMap(unpackBlock)
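Each element produced by blockFetcherItr is a (BlockId, Try[Iterator[Any]]) pair: the id of a fetched block together with either its deserialized records or the failure. flatMap(unpackBlock) therefore stitches the per-block record iterators into one continuous stream for the reducer, and surfaces a failed fetch as a FetchFailedException. A minimal, self-contained sketch of the pattern (toy types, not Spark's classes):

import scala.util.{Failure, Success, Try}

object UnpackSketch {
  // Toy counterpart of unpackBlock: unwrap one fetched block or rethrow its failure.
  def unpack[T](fetched: (String, Try[Iterator[Any]])): Iterator[T] = fetched._2 match {
    case Success(records) => records.asInstanceOf[Iterator[T]]
    case Failure(cause)   => throw new RuntimeException(s"Failed to fetch block ${fetched._1}", cause)
  }

  // Toy counterpart of blockFetcherItr.flatMap(unpackBlock): concatenate the per-block
  // record iterators into a single stream that the reducer consumes lazily.
  def allRecords[T](blocks: Iterator[(String, Try[Iterator[Any]])]): Iterator[T] =
    blocks.flatMap(unpack[T])
}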

 

Conclusion:

 

On the reduce side a combine step is still performed, merging the fetched map outputs into the final result. This is unavoidable: if the same word A appears in the output of several mappers, the final count for A can only be obtained by merging the partial counts from all of those mappers on the reduce side.
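As a tiny worked example (hypothetical numbers): if one mapper's pre-combined output contains ("A", 3) and another's contains ("A", 5), the reduce side still has to merge them to obtain the final count of 8.

val partialCounts = Seq(("A", 3), ("A", 5))   // one pre-combined entry per mapper
val finalCounts = partialCounts.groupBy(_._1).map { case (word, ps) => word -> ps.map(_._2).sum }
// finalCounts == Map("A" -> 8)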

 

 

 
