MapReduce编程框架之Shuffle详述
Shuffle是什麽
Shuffle:数据混洗,核心机制有数据分区、排序、局部聚合、缓存、拉取、再合并排序。
Shuffle是MapReduce处理流程中的一个核心过程,由下图可以看出,它的每一个处理步骤是分散在各个mapTask和ReduceTask节点上完成的,整体分为3个操作:Partitioner(分区,NumReduceTask只有一个或者没有分区操作将不会起作用)、Sort(排序,根据key排序,没有reducer阶段,那么就不会对key排序)、Combiner(合并,局部value的合并,可选组件)。
MapReuce中的Shuffle
我们先捋一捋MapReduce的执行流程:
我们知道,一个大文件需要处理,在HDFS上是以block块的形式存放,Hadop2.x之后的每个block默认为128M,默认副本为3份,运行每个map任务会处理一个split,一般split大小和block相同,那么有多少block就有多少个map任务。
每个map任务处理完输入的split后会把结果写入到内存的一个环形缓冲区,写入过程中会进行简单的排序,默认大小为100M,当缓冲区的大小使用超过一定的阀值(默认为80%),一个后台的线程就会启动把缓冲区中的数据溢写(spill)到本地磁盘中,同时Mapper继续向环形缓冲区中写入数据。
数据溢写入到磁盘之前,首先会根据Reducer的数量划分成同数量的分区(默认为HashPartition),每个分区中的数据都会有后台线程根据map任务的输出结果进行内排序(字典数序,自然数顺序或自定义顺序comparator),如果有combiner(作用是使map输出更紧凑,写到本地磁盘和传给reducer的数据更少),Mapper会在溢写到磁盘之前排好序的输出上运行,最后在本地生成分好区排好序的小文件;如果小文件数量达到默认值10(mapreduce.task.io.sort.factor),则会合并成一个大文件,这个结果文件的分区存在一个映射关系,比如 01024 字节内容为 0 号分区内容,10252048 字节内容为 1 号分区内容;如果map向环形缓冲区写入数据的速度大于向本地写入数据,环形缓冲区就会被写满,向环形缓冲区写入的数据的线程就会阻塞直至缓冲区中的内容全部溢写到磁盘后再次启动,到阀值后会向本地磁盘新建一个溢写文件。
Reduce任务启动,Reducer个数由mapred-site.xml的mapreduce.job.reducers配置决定,或者初始化job时调用Job.setNumReduceTask来设置;Reducer中的一个线程定期向MRAppMastrer询问Mapper输出结果文件位置,mapper结束后会向MRAppMaster汇报信息;从而Reducer得知Mapper状态,得到map结果文件目录。
当有一个Mapper结束时,reduce任务进入复制阶段,通过http协议(hadoop内置了netty容器)把所有Mapper结果文件的对应的分区数据复制过来,比如:编号为0的reduce复制maop结果文件中0号分区数据,1号reduce复制map结果文件中1号分区的数据等;Reducer可以并行复制Mappere的结果,默认线程数为5(mapred-site.xml:mapreduce.reduce.shuffle.parallelcoopies);
另外:如果map结果文件相当小,则会被直接复制到reduceNodeManager的内存中(缓冲区大小由mapred.site.xml:mapreduce.shuffle.input.buffer.percent指定,默认为0.7),一旦缓冲区达到reduce的阀值大小0.66或写入到reduceNodeManager内存中文件个数达到map输出阀值1000(mapred-site.xml:mapreduce.reduce.merge.inmen.threshold),reduce就会把map结果文件合并溢写到本地。
复制阶段完成后,Ruducer进入到Merge阶段,循环的合并map结果文件,维持其顺序排列,合并因子默认为10(mapred-site.xml:mapreduce.task.io.start.factor),经过不断的Merge后得到一个”最终文件”,可能存储在磁盘也可能存在内存中。
“最终文件”输入到reduce进行计算,计算结果输入到HDFS文件系统中存储。
shuffle流程
回到map阶段,mapTask 是收集 map()方法输出的 K-V 对,放到内存缓冲区 kvbuffer中(环形缓冲区:内存中的一种首尾相连的数据结构,kvbuffer 包含数据区和索引区)
从内存缓冲区中的数据区的数据不断溢出本地磁盘文件 file.out,可能会溢出多次,则会有多个文件,相应的内存缓冲区中的索引区数据溢出为磁盘索引文件 file.out.index
多个溢出文件会被合并成大的溢出文件
在溢出过程中,及合并的过程中,都要调用 partitoner 进行分区和针对 key 进行排序
在数据量大的时候,可以对 mapTask 结果启用压缩,将 mapreduce.map.output.compress设为 true,并使用 mapreduce.map.output.compress.codec 设置使用的压缩算法,可以提高数据传输到 reducer 端的效率
reduceTask 根据自己的分区号,去各个 mapTask 机器上取相应的结果分区数据,取到同一个分区的来自不同 mapTask 的结果文件,reduceTask 会将这些文件再进行合并(归并排序)
合并成大文件后,shuffle 的过程也就结束了,后面进入 reduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 group,调用用户自定义的 reduce()方法)
Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快
缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认 100M
缓冲区的溢写比也可以通过参数调整,参数:mapreduce.map.sort.spill.percent 默认 0.8
带上shuffle后mapreduce执行流程图
为什么需要环形缓冲区
Map过程中环形缓冲区是指数据被map处理之后会先放入内存,内存中的这片区域就是环形缓冲区。数据从内存要写入磁盘中时,数据会被先写入到磁盘缓冲区,磁盘缓冲区满了再把数据写入磁盘。
磁盘缓冲区是为了平滑不同I/O设备的速度差。
磁盘是分区分块存储的。如果是机械硬盘,是分磁道和扇区的。当磁头转到一个扇区的某磁道时,开始读取数据,如果只读取了 100KB 的数据,这时操作系统就想,磁头转到这儿看不容易啊,反正来都来了,顺带多读点数据吧,万一用的着呢。
所以,读取数据的时候也是通过缓冲区的。
如果应用的数据存放在不同的磁道,不同的扇区,那么读取的效率是很低的,这被称为磁盘碎片,所以 windows 有个操作叫“整理磁盘碎片”。