Hadoop学习笔记-MapReduce(6)-Join应用

背景描述

现在我们有两个文件,称为文件A和文件B,文件B更大,文件A更小。希望完成两个文件的Join操作,即使用文件A中的某个字段来替换B中的字段,Join依据为id相同。

这个任务的重点在于如何完成Join操作,我们可以选择在Reduce阶段完成,也可以在Map阶段完成。

Reduce Join

在这种情况下,Map阶段的工作就是将两个文件都读入,处理成键值对的形式。连接字段作为key,其余内容作为值。并且我们需要打上标签,来表示后面的内容是来自哪一张表的。

而在Reduce阶段,我们可以根据键值对中的标签来区分不同文件,之后再进行合并即可。

这里不知道如何完成的操作应该是如何判断键值对的内容来自哪一张表,可以通过以下过程来完成

1
2
3
4
5
//获取对应文件名称 context为上下文对象
InputSplit split = context.getInputSplit();
FileSplit fileSplit = (FileSplit) split;
filename = fileSplit.getPath().getName();

由于不同文件会产生不同的数据分片,也就会分配给不同的Mapper,所以每个Mapper只需要执行上面的过程一次即可,可以选择将其放在Mapper的setup函数当中(重写setup函数)

这种方案,合并的操作是在Reduce阶段完成的,Reduce阶段的处理压力太大,而Map阶段的运算负载很低,资源利用率不高,在Reduce阶段容易产生数据倾斜。

Map Join

Map join适用于一张表十分小,一张表很大的场景。具体思想就是在Map端缓存多张表,提前处理业务逻辑。增加Map端的业务,来减少Reduce端数据的压力,尽可能减少数据倾斜。

使用DistributedCache,具体流程如下:

  1. 在Mapper的setup阶段,将文件读取到缓存集合当中

    1
    2
    3
    4
    5
    6
    7
    8
    //通过缓存文件得到小表数据 pd.txt
    URI[] cacheFiles = context.getCacheFiles();
    Path path = new Path(cacheFiles[0]);
    //获取文件系统对象,并开流
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataInputStream fis = fs.open(path);
    //通过包装流转换为 reader,方便按行读取
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
  2. 在Driver驱动类中加载缓存

    1
    2
    3
    4
    //缓存普通文件到 Task 运行节点。
    job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
    //如果是集群运行,需要设置 HDFS 路径
    job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

Hadoop学习笔记-MapReduce(6)-Join应用
http://example.com/2022/02/22/Hadoop学习笔记-MapReduce-6-Join应用/
作者
EverNorif
发布于
2022年2月22日
许可协议