`
bit1129
  • 浏览: 1051214 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark四十七】Hadoop Map Shuffle Reduce的过程

 
阅读更多

深入Spark Shuffle之前,首先总结下Hadoop MapReduce的Shuffle过程,了解了Hadoop MR的shuffle过程,有助于对Spark的Shuffle过程的理解

 

Hadoo MapReduce的Shuffle总体流程图

 

问题:下图中Map端的parttion sort and Spill to disk的一个矩形框分成3块表示什么意思?

在spill到磁盘前要做parttion操作,每个parttion一块,因此每次spill会产生包含多个parttion的文件

因为一个map操作可能会产生多次spill,每个spill对应一个文件,Map需要将所有这些spill文件进行合并成一个,每个spill文件相同分区的数据合并到一起(merge on disk操作),map任务结束时输出一个文件,这个文件包含了所有的输出数据,文件内按Partition进行分段



Hadoop MapReduce shuffle的特性:

1. Reducer从Map端拉取属于自己Partition的数据时,该Partition的数据已经在Map端排好序。Reducer将属于它的所有的partition拉取过去后,进行Reducer端的归并排序(归并排序的原因是Reducer会从多个Mapper拉取相应的Partition,

Reducer需要将所有这些Partition进行排序)

2. 如果客户端定义了Combiner,那么在数据在排好序后,会调用CombinerClass对数据已经combine,然后才spill到磁盘。这就是说Sort操作在Combine操作之前执行,而Partititon操作在Sort之前执行,也就是Parttion->Sort->Combine的过程

3. 整个Shuffle的过程包含如下几部分

foreach inputsplit do: map->output to memory buffer->partition(根据Reducer的个数进行分区)->map side sort(partition内排序)->combine(Partition内Combine)->spill->map side merge(因为可能spill出多个文件,最后需要merge成一个大文件)->reducer fetch from Mapper->reducer side merge->reduce to ouput

 

map side什么时候开始partition流程,是在Map结果或者内存缓冲即将满需要spill到磁盘时做

 

 

Hadoop MapReduce shuffle的过程:

Map端

1. Mapper产生的数据首先写入到内存缓冲区,默认大小是100M。在内存使用了80%的时候,开始将数据spill到磁盘上。在spill到磁盘前,在内存中做如下操作:

1.1首先对数据按照Reducer的数目进行分区

1.2分区完成后,对每个分区的数据按照Key进行排序

1.3每个分区的数据排序完成后,如果Map端定义了Combiner,那么对该分区的数据进行Map端combine,这有利于压缩写到磁盘所需要的空间以及发送到reducer的数据

2.spill file

2.1 针对每个partiton,mapper可能会spill磁盘多次。每当内存缓冲区满,要spill到磁盘时,Hadoop不是追加的方式,而是新建一个新的spill文件,这个文件内的数据是按照Key排序的

2.2 在Map任务结束前,会对磁盘上的所有的spill文件进行一个总排序,这样Map任务结束时,map产生了一个唯一的且排序的输出文件。如果map产生了多余3个的spill file,那么在产生这个唯一的文件之前,再做一次combine操作,将位于不同文件的相同Key进行combine

3.数据压缩

为了压缩mapper输出数据,一方面节省占用的磁盘空间,另一方面减少数据传输量,可以使用数据压缩。默认情况下,数据压缩是没有开启的。可以将mapred.compress.map.out设置为true开启即可。同时,也可以为mapper设置压缩算法,

常用的压缩算法有gzip,lzo和snappy,实际生产环境下,gzip已经很少使用,lzo和snappy各有优缺点,需根据实际情况选择,

4.数据传输

Hadoop使用HTTP将map节点的数据传输到reduce节点,默认是五个线程去拉取数据。

 

总结:

1.Map只产生一个Partion内部排序的文件(这个文件分割成多个Partition),。也就是说,Reduce拉取数据时,需要按照offset来取,这个信息记录于什么位置来着。是ApplicationMaster

 

Reduce端

0. Reducer从每个Mapper拉取过来的数据都是在Mapper端排好序的

1. 1个reducer可能到多个map端拉取属于自己需要处理的map输出文件。拉取的策略是,只要有map输出文件完成,那么reducer就去拉取,而不是等到所有的map都完成了才去拉取。

2. reducer如何得知map已经产生了一个分区的输出文件?在Hadoop2中,mapper直接通知ApplicationMaster。在Hadoop1中,mapper通知TaskTracker,任务已经执行完成,而TaskTracker则通知JobTracker,那么JobTracker则会通知Reducer已经有Mapper任务执行完成并且数据的位置在什么地方(此处可见,JobTracker确实承担了很多的职责)

3. reducer拉取分区数据后,如果拉过来的数据量较小,那么直接加载到内存;如果较大,则存放到磁盘上。这跟Mapper端的处理过程类似,此时Reducer的内存大小是50M,随着拉取的数据越来越多,内存容不下,Reducer开启Spill到磁盘操作

4. 当所有的分区数据拉取过来后,就开始了merge sort阶段,将单个已排序的文件进行总排序。

5.当所有的文件归并排序完成后,就开始了reduce阶段,即把排序的数据传给reducer

 

Hadoop Map/Reduce Shuffle总结

Hadoop Map/Reduce Shuffle过程简单的总结为

map->partition(根据Reducer的个数 进行分区)->map side sort(partition内排序)->combine(Partition内Combine)->spill->map side merge(因为可能spill出多个文件,最后需要merge成一个大文件)->fetch->reduce side merge->reduce

 

问题:

1. 每个partition经过Map后得到一个排序的文件,那么这个文件中的数据只被一个Reducer消费还是被所有的Reducer消费?

是被所有的Reducer消费,也就是说,一个Map输出文件包含了很多个Partition,Reducer只关心属于自己的Partition。在上面的那幅图中也清楚的看到,一个Map产生的最终输出文件包含了3个Partition(merge on disk右侧连在一起的三个矩形框),而每个Partition由reducer进行消费。

2. 每个partition经过Map后得到一个排序的文件,那么所有的Map总共产生Map数目的文件。

这里的Partition是输入的Partition数,即Input Split的输入产生的Partition数目。

在Spark的Sort Based的shuffle中,最终也是产生了m个(m是输出文件。在Spark的Hash Based的shuffle中,产生的输出文件个数是M*R

3.注意:流程是在排序前就要partition,然后partition内部做排序。然后分区数据会被送往处理它的Reducer

4. Sort Based Shuffle的优势在哪里?

Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并).外排序怎么理解?是指数据可以排序后放到磁盘上,然后可以做大规模数据的排序?

 
 


 

 

 


 

 


 

 

  • 大小: 43.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics