深入Spark Shuffle之前,首先总结下Hadoop MapReduce的Shuffle过程,了解了Hadoop MR的shuffle过程,有助于对Spark的Shuffle过程的理解
Hadoo MapReduce的Shuffle总体流程图
问题:下图中Map端的parttion sort and Spill to disk的一个矩形框分成3块表示什么意思?
在spill到磁盘前要做parttion操作,每个parttion一块,因此每次spill会产生包含多个parttion的文件
因为一个map操作可能会产生多次spill,每个spill对应一个文件,Map需要将所有这些spill文件进行合并成一个,每个spill文件相同分区的数据合并到一起(merge on disk操作),map任务结束时输出一个文件,这个文件包含了所有的输出数据,文件内按Partition进行分段
Hadoop MapReduce shuffle的特性:
1. Reducer从Map端拉取属于自己Partition的数据时,该Partition的数据已经在Map端排好序。Reducer将属于它的所有的partition拉取过去后,进行Reducer端的归并排序(归并排序的原因是Reducer会从多个Mapper拉取相应的Partition,
Reducer需要将所有这些Partition进行排序)
2. 如果客户端定义了Combiner,那么在数据在排好序后,会调用CombinerClass对数据已经combine,然后才spill到磁盘。这就是说Sort操作在Combine操作之前执行,而Partititon操作在Sort之前执行,也就是Parttion->Sort->Combine的过程
3. 整个Shuffle的过程包含如下几部分
foreach inputsplit do: map->output to memory buffer->partition(根据Reducer的个数进行分区)->map side sort(partition内排序)->combine(Partition内Combine)->spill->map side merge(因为可能spill出多个文件,最后需要merge成一个大文件)->reducer fetch from Mapper->reducer side merge->reduce to ouput
map side什么时候开始partition流程,是在Map结果或者内存缓冲即将满需要spill到磁盘时做
Hadoop MapReduce shuffle的过程:
Map端
1. Mapper产生的数据首先写入到内存缓冲区,默认大小是100M。在内存使用了80%的时候,开始将数据spill到磁盘上。在spill到磁盘前,在内存中做如下操作:
1.1首先对数据按照Reducer的数目进行分区
1.2分区完成后,对每个分区的数据按照Key进行排序
1.3每个分区的数据排序完成后,如果Map端定义了Combiner,那么对该分区的数据进行Map端combine,这有利于压缩写到磁盘所需要的空间以及发送到reducer的数据
2.spill file
2.1 针对每个partiton,mapper可能会spill磁盘多次。每当内存缓冲区满,要spill到磁盘时,Hadoop不是追加的方式,而是新建一个新的spill文件,这个文件内的数据是按照Key排序的
2.2 在Map任务结束前,会对磁盘上的所有的spill文件进行一个总排序,这样Map任务结束时,map产生了一个唯一的且排序的输出文件。如果map产生了多余3个的spill file,那么在产生这个唯一的文件之前,再做一次combine操作,将位于不同文件的相同Key进行combine
3.数据压缩
为了压缩mapper输出数据,一方面节省占用的磁盘空间,另一方面减少数据传输量,可以使用数据压缩。默认情况下,数据压缩是没有开启的。可以将mapred.compress.map.out设置为true开启即可。同时,也可以为mapper设置压缩算法,
常用的压缩算法有gzip,lzo和snappy,实际生产环境下,gzip已经很少使用,lzo和snappy各有优缺点,需根据实际情况选择,
4.数据传输
Hadoop使用HTTP将map节点的数据传输到reduce节点,默认是五个线程去拉取数据。
总结:
1.Map只产生一个Partion内部排序的文件(这个文件分割成多个Partition),。也就是说,Reduce拉取数据时,需要按照offset来取,这个信息记录于什么位置来着。是ApplicationMaster
Reduce端
0. Reducer从每个Mapper拉取过来的数据都是在Mapper端排好序的
1. 1个reducer可能到多个map端拉取属于自己需要处理的map输出文件。拉取的策略是,只要有map输出文件完成,那么reducer就去拉取,而不是等到所有的map都完成了才去拉取。
2. reducer如何得知map已经产生了一个分区的输出文件?在Hadoop2中,mapper直接通知ApplicationMaster。在Hadoop1中,mapper通知TaskTracker,任务已经执行完成,而TaskTracker则通知JobTracker,那么JobTracker则会通知Reducer已经有Mapper任务执行完成并且数据的位置在什么地方(此处可见,JobTracker确实承担了很多的职责)
3. reducer拉取分区数据后,如果拉过来的数据量较小,那么直接加载到内存;如果较大,则存放到磁盘上。这跟Mapper端的处理过程类似,此时Reducer的内存大小是50M,随着拉取的数据越来越多,内存容不下,Reducer开启Spill到磁盘操作
4. 当所有的分区数据拉取过来后,就开始了merge sort阶段,将单个已排序的文件进行总排序。
5.当所有的文件归并排序完成后,就开始了reduce阶段,即把排序的数据传给reducer
Hadoop Map/Reduce Shuffle总结
Hadoop Map/Reduce Shuffle过程简单的总结为
map->partition(根据Reducer的个数 进行分区)->map side sort(partition内排序)->combine(Partition内Combine)->spill->map side merge(因为可能spill出多个文件,最后需要merge成一个大文件)->fetch->reduce side merge->reduce
问题:
1. 每个partition经过Map后得到一个排序的文件,那么这个文件中的数据只被一个Reducer消费还是被所有的Reducer消费?
是被所有的Reducer消费,也就是说,一个Map输出文件包含了很多个Partition,Reducer只关心属于自己的Partition。在上面的那幅图中也清楚的看到,一个Map产生的最终输出文件包含了3个Partition(merge on disk右侧连在一起的三个矩形框),而每个Partition由reducer进行消费。
2. 每个partition经过Map后得到一个排序的文件,那么所有的Map总共产生Map数目的文件。
这里的Partition是输入的Partition数,即Input Split的输入产生的Partition数目。
在Spark的Sort Based的shuffle中,最终也是产生了m个(m是输出文件。在Spark的Hash Based的shuffle中,产生的输出文件个数是M*R
3.注意:流程是在排序前就要partition,然后partition内部做排序。然后分区数据会被送往处理它的Reducer
相关推荐
Hadoop Mapreduce过程shuffle过程全解析,Shuffle过程
hadoop的map reduce 学习手册,很实用
Hadoop Map-Reduce教程,hadoop,mapreduce
Hadoop Map Reduce教程,介绍hadoop map/reduce框架的各个方面
Hadoop学习总结之三:Map-Reduce入门
Hadoop学习总结之四:Map-Reduce的过程解析
hadoop中map/reduce自学资料合集
讲述了Windows平台的Hadoop安装... 最后,以最简单的求和为例,剖析Hadoop的Map/Reduce工作机制,对于初学Hadoop及Map/Reduce的读者有很大的帮助。相信通过最简单的求和为例,读者可步入Hadoop的Map/Reduce开发者行列。
人脸识别,车辆识别,一人一档,一车一档 hadoop map reduce hbase
简单说一下hadoop和spark的shuffle过程
hadoop map reduce 的中文简易教程,能轻松帮助普通用户不需了解太多hadoop底层知识就能实现分布式编程,很好的入门教程。
hadoop开发文档
Hadoop Map Reduce 教程.doc
NULL 博文链接:https://sgq0085.iteye.com/blog/1879442
spark-2.2.0-yarn-shuffle.jar
Hadoop Map-Reduce数据分析
hadoop,指南,map,reduce,hdfs,分布式,云计算,各部分都有详细说明
hadoop_spark_数据算法hadoop_spark_数据算法hadoop_spark_数据算法hadoop_spark_数据算法
Spark所需的hadoop2.7.1相关资源 hadoop2.7.1版本的hadoop.dll,winutils.exe 适用Spark2.0.0+版本
The Joins query by using Hadoop and map reduce