在这篇http://bit1129.iteye.com/blog/2186325博文中,分析了hash based shuffle write开启consolidationFiles选项的过程。本文,则关注将Iteratable
1. 如下代码是HashShuffleWriter.write方法
在将partition的数据写入到磁盘前,进行map端的shuffle
/** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { ///对输入的partition对应Iteratable集合进行map端combine val iter = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { //如果定义了dep.aggregator以及dep.mapSideCombine则进行map端combine dep.aggregator.get.combineValuesByKey(records, context) } else { records } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") records } for (elem <- iter) { val bucketId = dep.partitioner.getPartition(elem._1) shuffle.writers(bucketId).write(elem) } }
2. 调用dep.aggregator.get.combineValuesByKey(records, context)进行map端combine
其中aggregator是Aggregator类型的对象,它在构造时需要传入如下参数:
case class Aggregator[K, V, C] ( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) { //代码体..... }
3. Aggregator.combineValuesByKey方法体:
@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0") def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = combineValuesByKey(iter, null) //iter:是map端输入的可遍历的数据集合 def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { if (!isSpillEnabled) { //不启用spill到磁盘,那么数据集合中的所有数据将在内存中进行combine val combiners = new AppendOnlyMap[K,C] //combiners是一个AppendOnlyMap, 可以想象成内存内的HashMap var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (iter.hasNext) { //遍历集合 kv = iter.next() //kv._1是Key,changeValue接收两个参数,kv键以及update combiners.changeValue(kv._1, update) } combiners.iterator //返回 } else { //构造参数哪来的? val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) combiners.insertAll(iter) // Update task metrics if context is not null // TODO: Make context non optional in a future release Option(context).foreach { c => c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled } combiners.iterator } }
4. 只在内存内combine(AppendOnlyMap)
4.1 使用AppendOnlyMap,代码的关键是update函数,以及combiner.changeValue(kv._1, update)
val combiners = new AppendOnlyMap[K,C] var kv: Product2[K, V] = null ///通过闭包特性,update函数内可以访问kv val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (iter.hasNext) { kv = iter.next() //kv键值对的key以及update函数作为入参 combiners.changeValue(kv._1, update) } combiners.iterator
4.2 AppendOnlyMap的changeValue方法
/** * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value * for key, if any, or null otherwise. Returns the newly updated value. */ //key:kv键值对的key, //updateFunc的函数类型是(Boolean,V)=>V def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] if (k.eq(null)) { ///如果键值null,对键为空的处理 if (!haveNullValue) { incrementSize() } nullValue = updateFunc(haveNullValue, nullValue) // haveNullValue = true return nullValue } //通过对k进行rehash算出它Hash值 var pos = rehash(k.hashCode) & mask var i = 1 while (true) { //data是数组 //kv期望放到2*pos位置,其中k放到2*pos的位置,v放到2*pos+1的位置 val curKey = data(2 * pos) //if-else if-else做寻址探测 //如果data(2*pos)位置上的key已经存在,且与k相同,那么表示它们需要聚合 if (k.eq(curKey) || k.equals(curKey)) { //调用updateFunc,入参是值已经存在(true),那个位置上的值(old value) val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V]) //赋心智 data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] //返回新值 return newValue } else if (curKey.eq(null)) { //HashMap上data(2 * pos)的值为null, //将kv的v写入,updateFunc的入参是值不存在,同时旧值为null val newValue = updateFunc(false, null.asInstanceOf[V]) //将data(2*pos)的值由null改为k data(2 * pos) = k //设置新值 data(2 * pos + 1) = newValue.asInstanceOf[AnyRef] //AppendOnlyMap的长度增1,表示新元素加入 incrementSize() return newValue } else { //重新寻址,寻址的算法,每次的步长是平方探测,1,2,4,8? val delta = i pos = (pos + delta) & mask i += 1 } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy }
4.3 incrementSize肩负着resize table的职责
/** Increase table size by 1, rehashing if necessary */ private def incrementSize() { curSize += 1 if (curSize > growThreshold) { growTable() } }
5. 内存+磁盘combine(ExternalAppendOnlyMap)
特点:
1. 内存放不下可以放到磁盘
2. 放到磁盘前,首先进行排序
3. 最后对所有spill到磁盘的文件做归并排序
6. 向MapOutputTrackerMaster汇报写入数据的位置以及文件中FileSegment的位置
http://www.cnblogs.com/fxjwind/p/3522219.html
相关推荐
Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的 Shuffle。
hash_map
关于hash_map的用法与解释: #include <hash_map> #include #include using namespace std; //define the class class ClassA{ public: ClassA(int a):c_a(a){} int getvalue()const { return c_a;} void ...
代码重点是hash_table,附加std::map与其做对比,实现的是一条sql语句:select c_nationkey, c_mktsegment, count(*), max(c_acctbal) from aaa_customer_1g group by c_nationkey, c_mktsegment order by c_...
linux 下hash_map的基本原理及使用,希望对大家有帮助。
一个用Hash算法实现的map,可以实际项目所使用。 希望能帮助大家学习。
网上下载的一个哈希表.再次分享一下
大数据 Map 哈希 价值五亿美元 Big Data Map hash value of five hundred million U.S. dollars
哈希映射 hash map hash_map基于hash table(哈希表)。哈希表最大的优点,就是把数据的存储和查找消耗的时间大大降低,几乎可以看成是常数时间;而代价仅仅是消耗比较多的内存。然而在当前可利用内存越来越多的情况...
abaqus用户子程序,Hashin失效准则,abaqus显示分析,使用三维实体单元
本实例实现了一个hash_map,key是string类型,即可以存储索引是string的数据,希望对大家有帮助
在使用hash_map的时候,发现他对字符串的支持不是很好,就特写了一个str hash map 的程序,设置和提取键值的性能是hash_map 的20 倍左右。 特意拿出来给大家分享,如果有改进的, 请大家指出。
3维hashin失效准则~复合材料层合板
通过可靠的统计和精确的估算,我们能够在这些领域做出好的决定:选择散列连接(hash join)操作的正确构建端(build side),选择正确的连接算法(如broadcast hash join与 shuffled hash join), 调整连接的顺序...
This version combines the previous CPU-based hashcat (now called hashcat-legacy) and GPU-based oclHashcat. Hashcat is released as open source software under the MIT license. Current Version ...
Hashcat is the self-proclaimed world's fastest password recovery tool. It had a proprietary code base until 2015, but is now released as free software. Versions are available for Linux, OS X, and ...
Linked Hash Map Example java 源码
因为AHash是keyed hash,每个map会产生完全不同的hash,不知道key是无法预测的。 这可以防止 DOS 攻击,其中攻击者发送大量哈希冲突的项目,这些项目被用作哈希映射中的键。 这也避免了通过从一个映射读取并写入另...
1、Spark中的HashShufle的有哪些不足 2、 conslidate是如何优化Hash shuffle时在map端产生的小文件 3、spark.def
三维hashin失效准则,用于复合材料失效模拟