Source Code Comparison
Old API
First, let's look at getSplits in the old API, located in org.apache.hadoop.mapred.FileInputFormat. The code below keeps only the core splitting logic:
```java
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  ...
  long totalSize = 0;
  ...
  List<FileStatus> files = new ArrayList<>(stats.length);
  for (FileStatus file: stats) {
    ...
    files.add(file);
    totalSize += file.getLen();
    ...
  }

  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

  ...
  long blockSize = file.getBlockSize();
  long splitSize = computeSplitSize(goalSize, minSize, blockSize);

  long bytesRemaining = length;
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
        length-bytesRemaining, splitSize, clusterMap);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
        splitHosts[0], splitHosts[1]));
    bytesRemaining -= splitSize;
  }

  if (bytesRemaining != 0) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
        length - bytesRemaining, bytesRemaining, clusterMap);
    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
        splitHosts[0], splitHosts[1]));
  }
  ...
}
```
The important parameters:

- totalSize: the total size of all files under the input path
- goalSize: the target split size, computed from totalSize and the numSplits argument (totalSize / numSplits)
- minSize: the minimum split size, the maximum of the configuration parameter mapreduce.input.fileinputformat.split.minsize and minSplitSize = 1; by default minSize is 1
- blockSize: the block size of the file system
- splitSize: the split size, determined by the method below
```java
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
```
By default, if goalSize does not exceed blockSize, the split size is goalSize; otherwise it is blockSize.
Once the split size is determined, the actual splitting can be performed. Note SPLIT_SLOP = 1.1: a new full split is cut only while the ratio of the remaining bytes to the split size is greater than 1.1; otherwise the remaining content becomes one final split.
New API
In the new API, the split-size computation has been improved, and getSplits no longer takes an int numSplits argument. The following is getSplits in the new API, located in org.apache.hadoop.mapreduce.lib.input.FileInputFormat; again, only the core splitting logic is kept:
```java
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  ...
  for (FileStatus file: files) {
    ...
    long length = file.getLen();
    ...
    long blockSize = file.getBlockSize();
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    long bytesRemaining = length;
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(makeSplit(path, length-bytesRemaining, splitSize,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
      bytesRemaining -= splitSize;
    }

    if (bytesRemaining != 0) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
    }
    ...
  }
  return splits;
}
```
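For comparison with the old API, the new computeSplitSize (not included in the abridged listing above; quoted here from memory of the Hadoop source, so treat it as a reference sketch) drops goalSize and instead clamps the block size between minSize and maxSize:

```java
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
```

With the defaults (minSize = 1, maxSize = Long.MAX_VALUE), the split size therefore simply equals the block size.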
The important parameters:

- minSize: the maximum of getFormatMinSplitSize() (which is 1) and the configuration parameter mapreduce.input.fileinputformat.split.minsize
- maxSize: the configuration parameter mapreduce.input.fileinputformat.split.maxsize, defaulting to Long.MAX_VALUE

The splitting logic in the new API has not changed much; only the inputs changed, with goalSize removed and the split size now computed from blockSize, minSize, and maxSize. The SPLIT_SLOP = 1.1 logic is, of course, still present.
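A practical consequence is that in the new API the split size is tuned through the min/max settings rather than a requested number of splits. A minimal driver sketch, where the 64 MB and 256 MB values are only illustrative:

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        // Equivalent to setting mapreduce.input.fileinputformat.split.minsize
        // and mapreduce.input.fileinputformat.split.maxsize in the configuration.
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB

        // With a 128 MB block size this keeps splitSize at 128 MB;
        // lowering maxSize below the block size would shrink the splits instead.
    }
}
```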