Hadoop学习笔记-MapReduce(3)-MapReduce流程
流程初印象
上面是MapReduce的工作流程图的大致版本,主要可以分为以下几个部分。我首先用通俗的话语描述每个流程需要完成的事情,然后再分别介绍每个环节的具体步骤和细节。
MapReduce的输入一般来说是一个或者多个文件,在代码中通过指定路径获取。
有了文件之后,需要决定这些文件要分配给多少个Mapper,需要如何分配给这些Mapper,即采用什么样的切片机制。这个过程由InputFormat来完成。
每个Mapper得到自己对应的那部分文件,之后以某种形式形成键值对输入,经过Mapper的逻辑之后形成键值对输出。
某种形式指可以是<偏移量,一行>或者<偏移量,多行>等,允许用户指定
每个Mapper的键值对输出,最终都需要持久化成为一个文件,但是并不是一下子形成的,在形成过程中,会涉及到排序,溢写,分区等操作,还有可能Combiner合并操作。最终每个Mapper对应得到一个输出文件。
Mapper的任务完成之后启动Reducer,启动Reducer的个数可以由用户指定。Reducer根据分区从不同的Mapper取来对应的数据,然后将其归并成输入文件。以键值对的形式输入,经过Reducer的逻辑之后形成键值对输出。
来自Reducer的键值对输出需要持久化为文件,并且每个Reducer最终对应生成一个文件。而键值对应该如何输出成为文件,这个过程由OutputFormat来控制。
InputFormat数据输入
这一部分完成的操作是采用切片机制决定如何将文件分配给不同的Mapper。并且在这一部分,会讲到Job任务提交的过程。
1. 数据切片
在MapReduce中,使用数据切片来对输入的数据进行划分,从而决定一个MapReduce任务需要开启多少个Mapper,称为MapTask并行度。
这里可能会涉及到的概念辨析:
数据块:数据块是HDFS在物理上将数据以块状进行划分存储,数据块是HDFS的存储数据单位
数据切片:数据切片是在逻辑上对输入进行划分,并不会在磁盘上对它进行切分存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask
对于一个MapReduce任务来说,它的MapTask并行度在客户端提交任务的时候携带的切片文件信息来决定,切片文件信息中描述了该如何对输入文件进行切片。
2. Job提交流程
Job的提交流程如下:
- 确认当前Job的状态
- 处理新旧API之间的兼容性(setUseNewAPI)
- connect(连接本地或者连接集群)
- 将Job提交到系统
- 检查输入和输出路径
- 创建提交信息的路径,设置必要的集群信息
- 拷贝jar包到集群中
- 计算Job的数据切片,生成切片规划文件
- 向路径中写入配置文件
- 提交Job,返回提交状态
在计算数据切片的过程中,会根据配置信息来获取一个InputFormat的子类用于计算数据切片,默认是FileInputFormat。
3. InputFormat
在MapReduce中,InputFormat是一个抽象类,描述了如何来进行数据切片。里面提供了两个抽象方法getSplits
和createRecordReader
,其他具体完成数据切片的类都是这个抽象类的子类。
这里说的InputFormat类来自于
package org.apache.hadoop.mapreduce
。而在Hadoop源码中,还存在一个InputFormat接口,这个接口内容与inputFormat类基本一致,但是它来自于package org.apache.hadoop.mapred
这两个包分别是新旧版本的API。
package org.apache.hadoop.mapreduce
表示更新的版本,值得注意。
在IDEA中通过ctrl+H
查看InputFromat的实现类,如下图所示:
3.1 FileInputFormat
FileInputFormat直接继承了抽象类InputFormat。在FileInputFormat中,实现了InputFormat的抽象方法getSplits,但是没有实现createRecordReader。FileInputFormat仍然是个抽象类。
FileInputFormat切片机制:
- 简单地按照文件的内容长度进行切分
- 切片大小默认等于数据块Block大小
- 切片的时候不考虑数据集整体,而是逐个针对每个文件进行单独切片
FileInputFormat切片流程:
找到输入的路径,遍历其中的每个文件
对于遍历的每个文件:
获取文件大小
计算切片大小
形成切片(每次切片的时候,都需要判断切完剩下的部分是否大于块的1.1倍,如果大于就可以再分,否则剩余的就作为一个块)
1
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) //SPLIT_SLOP=1.1
将切片信息写入一个切片规划文件当中
InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。
提交切片规划文件到Yarn上,Yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
FileInputFormat中计算切片大小的公式如下:
1 |
|
因此在默认情况下,切片大小 = blocksize。
于是,根据这个计算公式我们可以对切片大小进行调整。
- maxsize:参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
- minsize:参数调的比blockSize大,则可以让切片变得比blockSize还大
可以使用的信息:获取切片信息的API
1
2
3
4
// 获取切片的文件名称
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
注意在切片的过程中,会判断文件是否是可以切分的。如果不可以切分,则一个文件还是一个文件。(如部分压缩文件的情况)
3.2 TextInputFormat
TextInputFormat是FileInputFormat的直接子类,实现了其中没有实现的createRecordReader方法。在createRecordReader中它返回了一个LineRecordReader,按行读取每条记录形成键值对。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。
3.3 CombineTextInputFormat
CombineTextInputFormat是FileInputFormat的直接子类,适用于小文件过多的场景。默认的Text Input Format切片机制按照文件进行切片,不管文件多小,至少会形成一个单独的切片。这样如果有大量的小文件的话,就会产生大量的MapTask,处理效率低下。
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到 一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。
在CombineTextInputFormat中可以使用setMaxInputSplitSize值进行设置。下面是CombineTextInputFormat进行切片的机制:
首先进行虚拟存储过程:
将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不 大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍, 那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时 将文件均分成 2 个虚拟存储块(防止出现太小切片)。
例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个 4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储 文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。
切片过程:
- 判断虚拟存储的文件大小是否大于setMaxInputSplitSize的值,如果大于则单独形成一个切片
- 如果不大于则和下一个虚拟存储文件进行合并,共同形成一个切片
- 继续判断下一个切片
3.4 设置使用的InputFormat
在Driver类中进行配置
1
2
3
4
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
Map和Reduce流程
在前面的流程当中,以及完成了切片信息的获取,Yarn根据切片信息决定启动Mapper的个数,以及每个Mapper对应的数据。
1. RecordReader
现在每个Mapper有了自己对应的数据,但是我们知道Mapper的输入是键值对形式的,所以需要有一个中间流程将数据转化成键值对的形式,这就是RecordReader做的事情。
在前面的TextInputFormat中我们也提到了它实现的createRecordReader
中返回了LineRecordReader
。这就是RecordReader的一种,它将数据文件组织成<偏移量, 每行内容>
的键值对形式。
2. MapReduce流程
上面是MapReduce的全流程,我们可以看到,在第6步是Mapper里面的逻辑,是用户自定义实现的输入键值对,输出键值对逻辑。在第14步之后的Reducer,则是对应用户自定义实现的输入键值对组,输出键值对的逻辑。
在Mapper和Reducer之间,还有很多流程,这些流程统称为Shuffle过程。
Shuffle机制
1. shuffle过程
在前面的图中我们可以看到,在Mapper和Reducer之间还存在很多步骤,并不是一下到达的,这个过程我们称为Shffle过程。
- MapTask收集map方法输出的键值对,放在内存缓冲区中
- 内存缓冲区无法一次性存放所有的输出,于是会存在不断的溢写,可能会溢出多个文件
- 在溢写的过程中,需要将内存中的文件持久化到本地,在此之前会对文件内部的数据按照分区进行快排,快排的依据是key值。排序完成之后,形成本地文件
- 在过程中会可能会出现多个溢写文件,这些文件各自内部按照分区有序,但是最终需要形成一个分区有序的大文件,所以再对所有的溢写文件进行归并排序
- 在上面的合并过程中,可以出现Combiner过程(可选)
- 启动ReduceTask。ReduceTask按照自己的分区号,去各个MapTask机器上取得对应的分区数据
- 它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上
- Reducer需要得到一个统一的输入文件,所以当所有数据拷贝完毕之后,需要再对这些不同文件进行归并排序
- 合成大文件之后,Shuffle的过程就结束了,后面进入ReduceTask的逻辑运算过程。
注意事项:
Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上来说,缓冲区越大,磁盘IO的次数越少,执行的速度也就越快
缓冲区的大小可以通过参数调整:mapreduce.task.io.sort.mb,默认为100M
Shuffle过程在Map阶段结束之后,Reduce阶段开启之前。如果整个任务根本没有Reduce阶段,那么也就没有Shffle阶段,直接在Map阶段键值对输出那里就退出了。
2. Partition分区
在Reduce阶段,每个Reducer得到的是相同分区的数据。默认情况下进行的Partitioner分区如下:
1 |
|
分区和Reducer的个数之间存在紧密联系,默认的分区逻辑如上所示,按照hashcode的值对Reducer的个数取余,得到不同的分区。
前面说到Mapper的个数和数据切片有关,但是这里Reducer的个数则可以由程序员直接指定,默认只开启一个Reducer。
实际情况中,指定多少个Reducer应该需要通过进行测试来确定。
如果要自定义实现分区逻辑的化,需要自定义类继承Partioner,并且重写其中的getPartition方法,具体可以查看后续笔记。
3. WritableComparable排序
在Shuffle过程中,存在多处排序操作,排序是MapReduce框架中最重要的操作之一。在Mapper溢写过程中,存在快排操作;多个溢写文件的合并,使用的是归并排序;在Reducer获取输入文件的时候,会进行归并排序。
MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。因此在指定键值对的时候(Mapper的输出键值对和Reducer的输入键值对,当然这两者是相同的),key能够排序是必要的。
默认排序按照字典顺序进行排序,如果使用的是自定义类作为key值的化,那么需要实现WritableComparable接口,重写其中的compareTo方法,具体可以查看后续笔记。
4. Combiner合并
Combiner是MapReduce程序中,Mapper和Reducer之外的一种组件,它出现的背景是为了减轻Reducer的负担,所以提前在Mapper阶段进行合并操作。它的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。
Combiner和Reducer做的事情在逻辑上是一致的,而Combiner组件的父类正是Reducer。它和Reducer之间的区别在于运行的位置不同,Combiner是在每一个MapTask所在的节点运行的,而Reducer是接收全局的所有Mapper的输出结果。
我们可以自定义实现Combiner的逻辑,需要自定义Combiner继承Reducer,重写Reducer方法
1 |
|
之后在驱动Driver中进行配置
1 |
|
而我们前面也说了,Combiner和Reducer完成的事情基本是相同的,所以在可以使用Combiner的情况下,也可以这样指定:
1 |
|
不过需要注意的是,并不是所有情况下都能使用Combiner的,是否能够使用Combiner,需要根据业务逻辑来进行判断。
OutputFormat数据输出
在Reducer得到键值对输出之后,需要最终输出得到结果。从键值对输出到结果,则是OutputFormat完成的事情。OutputFormat是MapReduce输出的基类,所有MapReduce的输出类都继承了OutputFormat类。
下面是MapReduce中自带的一些输出类,默认使用的是TextOutputFormat。
当然我们也可以自定义一个OutputFormat类,我们自定义的MyOutputFormat可以继承上面的类,并且实现里面的getRecordWriter方法。在getRecordWriter中需要返回一个RecordWriter,里面重写对应的write和close方法,来完成我们的输出逻辑。当然最后还是需要在驱动类中完成绑定。