Hadoop学习笔记-MapReduce(3)-MapReduce流程

流程初印象

上面是MapReduce的工作流程图的大致版本,主要可以分为以下几个部分。我首先用通俗的话语描述每个流程需要完成的事情,然后再分别介绍每个环节的具体步骤和细节。

  1. MapReduce的输入一般来说是一个或者多个文件,在代码中通过指定路径获取。

  2. 有了文件之后,需要决定这些文件要分配给多少个Mapper,需要如何分配给这些Mapper,即采用什么样的切片机制。这个过程由InputFormat来完成。

  3. 每个Mapper得到自己对应的那部分文件,之后以某种形式形成键值对输入,经过Mapper的逻辑之后形成键值对输出。

    某种形式指可以是<偏移量,一行>或者<偏移量,多行>等,允许用户指定

  4. 每个Mapper的键值对输出,最终都需要持久化成为一个文件,但是并不是一下子形成的,在形成过程中,会涉及到排序,溢写,分区等操作,还有可能Combiner合并操作。最终每个Mapper对应得到一个输出文件。

  5. Mapper的任务完成之后启动Reducer,启动Reducer的个数可以由用户指定。Reducer根据分区从不同的Mapper取来对应的数据,然后将其归并成输入文件。以键值对的形式输入,经过Reducer的逻辑之后形成键值对输出。

  6. 来自Reducer的键值对输出需要持久化为文件,并且每个Reducer最终对应生成一个文件。而键值对应该如何输出成为文件,这个过程由OutputFormat来控制。

InputFormat数据输入

这一部分完成的操作是采用切片机制决定如何将文件分配给不同的Mapper。并且在这一部分,会讲到Job任务提交的过程。

1. 数据切片

在MapReduce中,使用数据切片来对输入的数据进行划分,从而决定一个MapReduce任务需要开启多少个Mapper,称为MapTask并行度。

这里可能会涉及到的概念辨析:

  • 数据块:数据块是HDFS在物理上将数据以块状进行划分存储,数据块是HDFS的存储数据单位

  • 数据切片:数据切片是在逻辑上对输入进行划分,并不会在磁盘上对它进行切分存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask

对于一个MapReduce任务来说,它的MapTask并行度在客户端提交任务的时候携带的切片文件信息来决定,切片文件信息中描述了该如何对输入文件进行切片。

2. Job提交流程

Job的提交流程如下:

  1. 确认当前Job的状态
  2. 处理新旧API之间的兼容性(setUseNewAPI)
  3. connect(连接本地或者连接集群)
  4. 将Job提交到系统
    1. 检查输入和输出路径
    2. 创建提交信息的路径,设置必要的集群信息
    3. 拷贝jar包到集群中
    4. 计算Job的数据切片,生成切片规划文件
    5. 向路径中写入配置文件
    6. 提交Job,返回提交状态

在计算数据切片的过程中,会根据配置信息来获取一个InputFormat的子类用于计算数据切片,默认是FileInputFormat

3. InputFormat

在MapReduce中,InputFormat是一个抽象类,描述了如何来进行数据切片。里面提供了两个抽象方法getSplitscreateRecordReader,其他具体完成数据切片的类都是这个抽象类的子类。

这里说的InputFormat类来自于package org.apache.hadoop.mapreduce。而在Hadoop源码中,还存在一个InputFormat接口,这个接口内容与inputFormat类基本一致,但是它来自于package org.apache.hadoop.mapred

这两个包分别是新旧版本的API。package org.apache.hadoop.mapreduce表示更新的版本,值得注意。

在IDEA中通过ctrl+H查看InputFromat的实现类,如下图所示:

3.1 FileInputFormat

FileInputFormat直接继承了抽象类InputFormat。在FileInputFormat中,实现了InputFormat的抽象方法getSplits,但是没有实现createRecordReader。FileInputFormat仍然是个抽象类

FileInputFormat切片机制:

  1. 简单地按照文件的内容长度进行切分
  2. 切片大小默认等于数据块Block大小
  3. 切片的时候不考虑数据集整体,而是逐个针对每个文件进行单独切片

FileInputFormat切片流程:

  1. 找到输入的路径,遍历其中的每个文件

  2. 对于遍历的每个文件:

    1. 获取文件大小

    2. 计算切片大小

    3. 形成切片(每次切片的时候,都需要判断切完剩下的部分是否大于块的1.1倍,如果大于就可以再分,否则剩余的就作为一个块

      1
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) //SPLIT_SLOP=1.1
    4. 将切片信息写入一个切片规划文件当中

    5. InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。

  3. 提交切片规划文件到Yarn上,Yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数

FileInputFormat中计算切片大小的公式如下:

1
2
3
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1; //默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue; //默认值Long.MAXValue

因此在默认情况下,切片大小 = blocksize。

于是,根据这个计算公式我们可以对切片大小进行调整。

  • maxsize:参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
  • minsize:参数调的比blockSize大,则可以让切片变得比blockSize还大

可以使用的信息:获取切片信息的API

1
2
3
4
// 获取切片的文件名称
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

注意在切片的过程中,会判断文件是否是可以切分的。如果不可以切分,则一个文件还是一个文件。(如部分压缩文件的情况)

3.2 TextInputFormat

TextInputFormat是FileInputFormat的直接子类,实现了其中没有实现的createRecordReader方法。在createRecordReader中它返回了一个LineRecordReader,按行读取每条记录形成键值对。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。

3.3 CombineTextInputFormat

CombineTextInputFormat是FileInputFormat的直接子类,适用于小文件过多的场景。默认的Text Input Format切片机制按照文件进行切片,不管文件多小,至少会形成一个单独的切片。这样如果有大量的小文件的话,就会产生大量的MapTask,处理效率低下。

CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到 一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。

在CombineTextInputFormat中可以使用setMaxInputSplitSize值进行设置。下面是CombineTextInputFormat进行切片的机制:

首先进行虚拟存储过程:

将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不 大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍, 那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时 将文件均分成 2 个虚拟存储块(防止出现太小切片)。

例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个 4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储 文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。

切片过程:

  1. 判断虚拟存储的文件大小是否大于setMaxInputSplitSize的值,如果大于则单独形成一个切片
  2. 如果不大于则和下一个虚拟存储文件进行合并,共同形成一个切片
  3. 继续判断下一个切片

3.4 设置使用的InputFormat

在Driver类中进行配置

1
2
3
4
// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

Map和Reduce流程

在前面的流程当中,以及完成了切片信息的获取,Yarn根据切片信息决定启动Mapper的个数,以及每个Mapper对应的数据。

1. RecordReader

现在每个Mapper有了自己对应的数据,但是我们知道Mapper的输入是键值对形式的,所以需要有一个中间流程将数据转化成键值对的形式,这就是RecordReader做的事情。

在前面的TextInputFormat中我们也提到了它实现的createRecordReader中返回了LineRecordReader。这就是RecordReader的一种,它将数据文件组织成<偏移量, 每行内容>的键值对形式。

2. MapReduce流程

上面是MapReduce的全流程,我们可以看到,在第6步是Mapper里面的逻辑,是用户自定义实现的输入键值对,输出键值对逻辑。在第14步之后的Reducer,则是对应用户自定义实现的输入键值对组,输出键值对的逻辑。

在Mapper和Reducer之间,还有很多流程,这些流程统称为Shuffle过程。

Shuffle机制

1. shuffle过程

在前面的图中我们可以看到,在Mapper和Reducer之间还存在很多步骤,并不是一下到达的,这个过程我们称为Shffle过程。

  1. MapTask收集map方法输出的键值对,放在内存缓冲区中
  2. 内存缓冲区无法一次性存放所有的输出,于是会存在不断的溢写,可能会溢出多个文件
  3. 在溢写的过程中,需要将内存中的文件持久化到本地,在此之前会对文件内部的数据按照分区进行快排,快排的依据是key值。排序完成之后,形成本地文件
  4. 在过程中会可能会出现多个溢写文件,这些文件各自内部按照分区有序,但是最终需要形成一个分区有序的大文件,所以再对所有的溢写文件进行归并排序
  5. 在上面的合并过程中,可以出现Combiner过程(可选)
  6. 启动ReduceTask。ReduceTask按照自己的分区号,去各个MapTask机器上取得对应的分区数据
  7. 它从每个MapTask上远程拷贝相应的数据文件,如果文件大 小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到 一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上
  8. Reducer需要得到一个统一的输入文件,所以当所有数据拷贝完毕之后,需要再对这些不同文件进行归并排序
  9. 合成大文件之后,Shuffle的过程就结束了,后面进入ReduceTask的逻辑运算过程。

注意事项

  1. Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上来说,缓冲区越大,磁盘IO的次数越少,执行的速度也就越快

  2. 缓冲区的大小可以通过参数调整:mapreduce.task.io.sort.mb,默认为100M

  3. Shuffle过程在Map阶段结束之后,Reduce阶段开启之前。如果整个任务根本没有Reduce阶段,那么也就没有Shffle阶段,直接在Map阶段键值对输出那里就退出了。

2. Partition分区

在Reduce阶段,每个Reducer得到的是相同分区的数据。默认情况下进行的Partitioner分区如下:

1
2
3
4
5
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

分区和Reducer的个数之间存在紧密联系,默认的分区逻辑如上所示,按照hashcode的值对Reducer的个数取余,得到不同的分区。

前面说到Mapper的个数和数据切片有关,但是这里Reducer的个数则可以由程序员直接指定,默认只开启一个Reducer。

实际情况中,指定多少个Reducer应该需要通过进行测试来确定。

如果要自定义实现分区逻辑的化,需要自定义类继承Partioner,并且重写其中的getPartition方法,具体可以查看后续笔记。

3. WritableComparable排序

在Shuffle过程中,存在多处排序操作,排序是MapReduce框架中最重要的操作之一。在Mapper溢写过程中,存在快排操作;多个溢写文件的合并,使用的是归并排序;在Reducer获取输入文件的时候,会进行归并排序。

MapTask和ReduceTask均会对数据按照key进行排序,该操作属于Hadoop的默认行为,任何应用程序中的数据均会被排序,而不管逻辑上是否需要。因此在指定键值对的时候(Mapper的输出键值对和Reducer的输入键值对,当然这两者是相同的),key能够排序是必要的。

默认排序按照字典顺序进行排序,如果使用的是自定义类作为key值的化,那么需要实现WritableComparable接口,重写其中的compareTo方法,具体可以查看后续笔记。

4. Combiner合并

Combiner是MapReduce程序中,Mapper和Reducer之外的一种组件,它出现的背景是为了减轻Reducer的负担,所以提前在Mapper阶段进行合并操作。它的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。

Combiner和Reducer做的事情在逻辑上是一致的,而Combiner组件的父类正是Reducer。它和Reducer之间的区别在于运行的位置不同,Combiner是在每一个MapTask所在的节点运行的,而Reducer是接收全局的所有Mapper的输出结果。

我们可以自定义实现Combiner的逻辑,需要自定义Combiner继承Reducer,重写Reducer方法

1
2
3
4
5
6
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
...//合并逻辑
}
}

之后在驱动Driver中进行配置

1
job.setCombinerClass(MyCombiner.class)

而我们前面也说了,Combiner和Reducer完成的事情基本是相同的,所以在可以使用Combiner的情况下,也可以这样指定:

1
job.setCombinerClass(MyReducer.class)

不过需要注意的是,并不是所有情况下都能使用Combiner的,是否能够使用Combiner,需要根据业务逻辑来进行判断。

OutputFormat数据输出

在Reducer得到键值对输出之后,需要最终输出得到结果。从键值对输出到结果,则是OutputFormat完成的事情。OutputFormat是MapReduce输出的基类,所有MapReduce的输出类都继承了OutputFormat类。

下面是MapReduce中自带的一些输出类,默认使用的是TextOutputFormat。

当然我们也可以自定义一个OutputFormat类,我们自定义的MyOutputFormat可以继承上面的类,并且实现里面的getRecordWriter方法。在getRecordWriter中需要返回一个RecordWriter,里面重写对应的write和close方法,来完成我们的输出逻辑。当然最后还是需要在驱动类中完成绑定。


Hadoop学习笔记-MapReduce(3)-MapReduce流程
http://example.com/2022/02/22/Hadoop学习笔记-MapReduce-3-MapReduce流程/
作者
EverNorif
发布于
2022年2月22日
许可协议