`
bit1129
  • 浏览: 1051396 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark七十一】Hash Based Shuffle之三Shuffle Write Map side combine

 
阅读更多

在这篇http://bit1129.iteye.com/blog/2186325博文中,分析了hash based shuffle write开启consolidationFiles选项的过程。本文,则关注将Iteratable

 

1. 如下代码是HashShuffleWriter.write方法

在将partition的数据写入到磁盘前,进行map端的shuffle

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    ///对输入的partition对应Iteratable集合进行map端combine
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) { //如果定义了dep.aggregator以及dep.mapSideCombine则进行map端combine
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem)
    }
  }

 

 

2. 调用dep.aggregator.get.combineValuesByKey(records, context)进行map端combine

其中aggregator是Aggregator类型的对象,它在构造时需要传入如下参数:

 

 

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

//代码体.....

}

 

3. Aggregator.combineValuesByKey方法体:

 

 @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
    combineValuesByKey(iter, null)

  //iter:是map端输入的可遍历的数据集合
  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
                         context: TaskContext): Iterator[(K, C)] = {

    if (!isSpillEnabled) { //不启用spill到磁盘,那么数据集合中的所有数据将在内存中进行combine
      val combiners = new AppendOnlyMap[K,C] //combiners是一个AppendOnlyMap, 可以想象成内存内的HashMap
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (iter.hasNext) { //遍历集合
        kv = iter.next()
        //kv._1是Key,changeValue接收两个参数,kv键以及update
        combiners.changeValue(kv._1, update) 
      }
      combiners.iterator //返回
    } else {
      //构造参数哪来的?
      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
      
      combiners.insertAll(iter)
      // Update task metrics if context is not null
      // TODO: Make context non optional in a future release
      Option(context).foreach { c =>
        c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
        c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
      }
      combiners.iterator
    }
  }

 

 

4. 只在内存内combine(AppendOnlyMap)

 

4.1 使用AppendOnlyMap,代码的关键是update函数,以及combiner.changeValue(kv._1, update)

 

      val combiners = new AppendOnlyMap[K,C]
      var kv: Product2[K, V] = null
      ///通过闭包特性,update函数内可以访问kv
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (iter.hasNext) {
        kv = iter.next()
        //kv键值对的key以及update函数作为入参
        combiners.changeValue(kv._1, update)
      }
      combiners.iterator

 

4.2 AppendOnlyMap的changeValue方法

 

/**
   * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
   * for key, if any, or null otherwise. Returns the newly updated value.
   */
  //key:kv键值对的key,
  //updateFunc的函数类型是(Boolean,V)=>V
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) { ///如果键值null,对键为空的处理
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue) //
      haveNullValue = true
      return nullValue
    }
    //通过对k进行rehash算出它Hash值
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      //data是数组
      //kv期望放到2*pos位置,其中k放到2*pos的位置,v放到2*pos+1的位置
      val curKey = data(2 * pos)
  
      //if-else if-else做寻址探测
      //如果data(2*pos)位置上的key已经存在,且与k相同,那么表示它们需要聚合
      if (k.eq(curKey) || k.equals(curKey)) {
        //调用updateFunc,入参是值已经存在(true),那个位置上的值(old value)
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        //赋心智
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        //返回新值
        return newValue
      } else if (curKey.eq(null)) {
        //HashMap上data(2 * pos)的值为null,
        //将kv的v写入,updateFunc的入参是值不存在,同时旧值为null
        val newValue = updateFunc(false, null.asInstanceOf[V])
        //将data(2*pos)的值由null改为k
        data(2 * pos) = k
        //设置新值
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        //AppendOnlyMap的长度增1,表示新元素加入
        incrementSize()
        return newValue
      } else {
        //重新寻址,寻址的算法,每次的步长是平方探测,1,2,4,8?
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

 

4.3 incrementSize肩负着resize table的职责

  /** Increase table size by 1, rehashing if necessary */
  private def incrementSize() {
    curSize += 1
    if (curSize > growThreshold) {
      growTable()
    }
  }

 

 

 

5. 内存+磁盘combine(ExternalAppendOnlyMap)

特点:

1. 内存放不下可以放到磁盘

2. 放到磁盘前,首先进行排序

3. 最后对所有spill到磁盘的文件做归并排序

 

 6. 向MapOutputTrackerMaster汇报写入数据的位置以及文件中FileSegment的位置

http://www.cnblogs.com/fxjwind/p/3522219.html

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics