Hadoop Source Code: Differences in FileInputFormat Split Logic Between the Old and New APIs

Source Code Comparison

Old API

Let's start with getSplits in the old API, located in org.apache.hadoop.mapred.FileInputFormat. The code below keeps only the core splitting logic.

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  ...
  long totalSize = 0;                           // compute total size
  ...

  List<FileStatus> files = new ArrayList<>(stats.length);
  for (FileStatus file: stats) {                // check we have valid files
    ...
    files.add(file);
    totalSize += file.getLen();
    ...
  }

  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

  ...
  long blockSize = file.getBlockSize();
  long splitSize = computeSplitSize(goalSize, minSize, blockSize);

  long bytesRemaining = length;
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
        length-bytesRemaining, splitSize, clusterMap);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
        splitHosts[0], splitHosts[1]));
    bytesRemaining -= splitSize;
  }

  if (bytesRemaining != 0) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
        - bytesRemaining, bytesRemaining, clusterMap);
    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
        splitHosts[0], splitHosts[1]));
  }
  ...
}

The important parameters are:

  • totalSize: the sum of the sizes of all input files

  • goalSize: the target split size, computed from totalSize and the numSplits argument passed in (totalSize / numSplits)

  • minSize: the minimum split size, taken as the maximum of the configuration parameter mapreduce.input.fileinputformat.split.minsize and the minSplitSize field (default 1); by default minSize is 1

  • blockSize: the block size of the file system

  • splitSize: the split size, determined by the following method

    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
      return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    By default (minSize = 1), if goalSize does not exceed blockSize, the split size is goalSize; otherwise it is blockSize.
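As a quick sanity check of this formula, here is a small standalone calculation (the input size, block size, and numSplits values are made up for illustration, not taken from Hadoop):

public class OldApiSplitSizeExample {
  // Same formula as the old API: max(minSize, min(goalSize, blockSize))
  static long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  public static void main(String[] args) {
    long totalSize = 1024L * 1024 * 1024;      // assume 1 GB of total input
    long blockSize = 128L * 1024 * 1024;       // assume 128 MB HDFS blocks
    long minSize   = 1;                        // default minSize

    // numSplits = 16 -> goalSize = 64 MB < blockSize, so splitSize = goalSize
    long goalSize = totalSize / 16;
    System.out.println(computeSplitSize(goalSize, minSize, blockSize)); // 67108864 (64 MB)

    // numSplits = 2 -> goalSize = 512 MB > blockSize, so splitSize = blockSize
    goalSize = totalSize / 2;
    System.out.println(computeSplitSize(goalSize, minSize, blockSize)); // 134217728 (128 MB)
  }
}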

Once the split size is determined, the splitting itself can proceed. Note the constant SPLIT_SLOP = 1.1: a new split of size splitSize is produced only while the ratio of the remaining bytes to splitSize is greater than 1.1; otherwise all remaining bytes become one final split.
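The effect of SPLIT_SLOP can be reproduced in isolation. The following is a minimal sketch, not the actual Hadoop class: it mirrors only the loop structure above, and the file lengths in main are made-up values.

import java.util.ArrayList;
import java.util.List;

public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1;   // same constant as in FileInputFormat

  // Returns (offset, length) pairs using the same loop structure as getSplits
  static List<long[]> split(long fileLength, long splitSize) {
    List<long[]> splits = new ArrayList<>();
    long bytesRemaining = fileLength;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits.add(new long[]{fileLength - bytesRemaining, splitSize});
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      // a tail no larger than 1.1 * splitSize becomes a single final split
      splits.add(new long[]{fileLength - bytesRemaining, bytesRemaining});
    }
    return splits;
  }

  public static void main(String[] args) {
    // 130 MB file, 128 MB splits: 130/128 ≈ 1.02 <= 1.1, so ONE split of 130 MB
    System.out.println(split(130L << 20, 128L << 20).size());   // 1
    // 150 MB file, 128 MB splits: 150/128 ≈ 1.17 > 1.1, so 128 MB + 22 MB
    System.out.println(split(150L << 20, 128L << 20).size());   // 2
  }
}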

New API

In the new API, the split-size calculation is refined and getSplits no longer takes an int numSplits argument. Below is getSplits from the new API, located in org.apache.hadoop.mapreduce.lib.input.FileInputFormat; again, only the core splitting logic is kept:

public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);

  ...

  for (FileStatus file: files) {
    ...
    long length = file.getLen();

    ...

    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    long bytesRemaining = length;
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                 blkLocations[blkIndex].getHosts(),
                 blkLocations[blkIndex].getCachedHosts()));
      bytesRemaining -= splitSize;
    }

    if (bytesRemaining != 0) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                 blkLocations[blkIndex].getHosts(),
                 blkLocations[blkIndex].getCachedHosts()));
    }

    ...

  }
  return splits;
}

The important parameters are:

  • minSize: the maximum of the configuration parameter mapreduce.input.fileinputformat.split.minsize and 1; when not configured, minSize = 1

  • maxSize: the value of the configuration parameter mapreduce.input.fileinputformat.split.maxsize; when not configured, maxSize = Long.MAX_VALUE

  • blockSize: the block size of the file system

  • splitSize: the split size, determined by the following method

    protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
      return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    By default (minSize = 1, maxSize = Long.MAX_VALUE), the split size is blockSize.
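A small standalone check of the new formula (the block size and the min/max values are made up for illustration):

public class NewApiSplitSizeExample {
  // Same formula as the new API: max(minSize, min(maxSize, blockSize))
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;   // assume 128 MB HDFS blocks

    // Defaults: minSize = 1, maxSize = Long.MAX_VALUE -> splitSize = blockSize
    System.out.println(computeSplitSize(blockSize, 1, Long.MAX_VALUE));             // 134217728 (128 MB)

    // Lowering maxSize below the block size shrinks each split
    System.out.println(computeSplitSize(blockSize, 1, 64L * 1024 * 1024));          // 67108864 (64 MB)

    // Raising minSize above the block size enlarges each split
    System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE)); // 268435456 (256 MB)
  }
}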

The splitting logic itself changes little in the new API: only the inputs to computeSplitSize change, with goalSize removed. The SPLIT_SLOP = 1.1 logic is still present.
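Since goalSize (and therefore numSplits) no longer plays a role, the practical way to influence splitSize in the new API is through the min/max settings, either via the configuration keys above or via the helper setters on FileInputFormat. A rough sketch of job setup, with most details omitted; the input path and sizes here are placeholders, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitSizeConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "split-size-demo");
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/input"));   // placeholder path

    // Equivalent to setting mapreduce.input.fileinputformat.split.maxsize / minsize.
    // With 128 MB blocks, a 64 MB maxSize halves each split;
    // a 256 MB minSize would instead make each split span two blocks' worth of data.
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

    // ... set mapper/reducer, output path, then job.waitForCompletion(true)
  }
}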

