Hadoop学习笔记-MapReduce(1)-MapReduce概述

MapReduce概述

MapReduce是一个分布式运算程序的编程框架,在这个框架下,我们很容易就能编写出基于Hadoop的应用。它的核心功能是将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发地运行在一个Hadoop集群上。我们只需要基于这个框架进行业务逻辑的编写即可。

  • 优点:
    • 易于编程:用户只需要简单地实现一些接口,就可以完成一个分布式程序
    • 良好的扩展性:计算资源可以通过简单地增加机器来进行扩展
    • 高容错性:如果有节点挂了,上面的计算任务可以自动转移到另外的节点上运行,这个过程由Hadoop内部自动进行完成
    • 适合PB以上海量数据的离线处理
  • 缺点:
    • 不擅长进行实时计算
    • 不擅长流式计算:MapReduce的输入数据集是静态的
    • 不擅长DAG(有向无环图)的计算:即前一个程序的输出是后一个程序的输入。MapReduce可以完成这样的工作,但是由于每个MapReduce作业的输入结果都会写入到磁盘,完成这种情形会造成大量的磁盘IO,导致性能非常低下

Spark Streaming和Flink擅长流式计算。

Spark擅长DAG的计算。

MapReduce核心思想

一个MapReduce运算程序一般分为两个阶段,Map阶段Reduce阶段

  • 在Map阶段中,会有多个Mapper并发实例。它们完全并行运行,互不相干
  • 每个Mapper都是以键值对的形式读入数据,以键值对的形式输出数据,并且这里键值对的形式是可以修改和定义的
  • 在Reduce阶段中,同样有多个Reducer并发实例。它们完全并行运行,互不相干。一个Reducer对应后续的一个结果文件的输出
  • Reducer的输入和输出同样是键值对的形式,并且Reducer的输入来自于前面的Mapper的输出
  • MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业 务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行

MapReduce运行进程

一个完整的MapReduce程序在分布式运行的时候有三类实例进程:

  1. MrAppMaster:负责整个程序的过程调度以及状态协调
  2. MapTask:负责Map阶段的整个数据处理流程
  3. ReduceTask:负责Reduce阶段的整个数据处理流程

WordCount示例的实现

1. 描述

WordCount是官方提供的示例程序之一,也算是MapReduce编程的Hello World程序。它的输入是文件,输出是文件中每个单词(以空格分隔)出现的次数,并且会按照字典序升序进行排列。

首先这里先提供MapReduce常用的数据序列化类型。在指定键值对的类型的时候,需要用到的是Hadoop Writable类型。(如果是自定义类型的话,后续会讲到需要实现Writable接口)

Java类型 Hadoop Writable类型 Java类型 Hadoop Writable类型
Boolean BooleanWritable Double DoubleWritable
Byte ByteWritable String Text
Int IntWritable Map MapWritable
Float FloatWritable Array ArrayWritable
Long LongWritable Null NullWritable

这里需要注意String对应的是Text,其余类型只需要添加Writable后缀即可。

书写MapReduce程序需要遵循它的编程规范。需要用户编写的程序主要分成三个部分Mapper、Reducer和Driver。

Mapper阶段

  1. 用户自定义Mapper要继承父类Mapper
  2. 在泛型中指定输入和输出对应的KV对的类型
  3. 重写父类中的方法(可重写的方法包括setup、map、cleanup)
  4. 主要的业务逻辑写在map方法中
  5. map方法会对每个键值对执行一次

Reducer阶段

  1. 用户自定义的Reducer要继承父类Reducer
  2. 在泛型中指定输入和输出对应的KV对的类型,其中输入的键值对类型应该和Mapper的输出键值对类型相同
  3. 重写父类中的方法(可重写的方法包括setup、reduce、cleanup)
  4. 主要的业务逻辑写在reduce方法中
  5. reduce方法会对键值相同的键值对组执行一次。即一次得到的是一个key值以及对应的多个value值

Driver阶段

  1. Driver相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是 封装了MapReduce程序相关运行参数的job对象。
  2. 相当于整个MapReduce程序的入口
  3. 在这里我们会进行整个程序的关联
  4. 这里的代码基本类似,完成的是一些模板式的工作

2. 代码实现

在书写代码之前应该要准备相关的环境,使用Maven管理依赖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
<scope>test</scope>
</dependency>
</dependencies>

以下代码的实现尤其需要注意导包的正确与否,需要导入正确的包才能正确地运行。

WordCountMapper

首先实现Mapper,命名为WordCountMapper。我们自己写的类需要继承Mapper类。

这里,由于我们没有修改输入的形式,所以采用的是默认的方式,即按行读取文件。传入的是键值对<long, String>的形式,前者表示当前的文件指针的偏移值,后者表示这一行的内容。于是在Mapper中指定,应该使用hadoop writable类型,为<LongWritable, Text>

在Map阶段,我们会将每个词都分出来,并且每分出一个词,就为它计数为1,这样,Reduce阶段就只需要将每个词的的计数进行对应相加,就可以得到这个词出现的总次数。因此Map阶段的输出键值对应该是这样的形式<Text, IntWritable>,前者表示词word的内容,后者表示出现了一次(1)。

之后我们需要重写父类中的map方法,具体完成的逻辑就是将每一行取出来,然后对这一行的内容进行一个拆分,得到一个个对应的word,每个word都以键值对的形式写入。写入利用函数中传入的context对象,这是贯穿过程始终的上下文对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outK = new Text();
private IntWritable outV = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// 这里的key就是偏移量,value就是一行的内容
String line = value.toString();
String[] words = line.split(" ");

for (String word : words) {
outK.set(word);
// 写入输出键值对
context.write(outK, outV);
}
}
}

WordCountReducer

之后实现Reducer。这里命名为WordCountReducer,同样需要继承对应的Reducer。

指定键值对泛型中的类型,输入键值对的类型同Map阶段输出的类型一致,所以是<Text, IntWritable>。Reduce阶段完成对word出现次数的统计,输出每个词出现的次数,所以输出的键值对类型也是<Text, IntWritable>

确定了键值对的类型之后,就是实现父类中的reduce方法。这个reduce方法会对每个key值相同的所有键值对组执行一次。这里,传入的键值对组就表示这个key在各个地方出现的次数,我们只需要完成累加即可传出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// key对应一个key,values对应在所有得到的键值对中key对应的所有values
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
outV.set(sum);
// key表示word内容,无需改写
context.write(key, outV);
}
}

WordCountDriver

之后需要完成Driver的书写。Driver中完成的任务是流程化,模板化的,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 固定套路
// 1. 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2. 设置jar包路径
// 通过反射机制可以获取全类名,后续也是同样的原理
job.setJarByClass(WordCountDriver.class);

// 3. 关联mapper和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 4. 设置map输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5. 设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 6. 设置输出路径和输出路径
// FileInputFormat.setInputPaths(job, new Path("D:/input/inputword"));
// FileOutputFormat.setOutputPath(job, new Path("D:/output/output1"));
// 动态获取输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7. 提交job(job成功完成返回ture)
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}

3. 本地测试

在本地运行需要在本地也存在Hadoop环境才可以运行。准备好对应的环境之后,运行WordCountDriver类即可。

4. 提交到集群测试

提交到集群上运行首先需要将程序打成jar包然后上传到服务器端。

之后在服务器端运行即可;

1
hadoop jar wc.jar com.syh.mapreduce.wordcount.WordCountDriver /input /output

Hadoop学习笔记-MapReduce(1)-MapReduce概述
http://example.com/2022/02/20/Hadoop学习笔记-MapReduce-1-MapReduce概述/
作者
EverNorif
发布于
2022年2月20日
许可协议