Last updated: 2022-05-09T10:56:56+08:00
Source Code Comparison
Old API
Let's first look at getSplits in the old API, located in the package org.apache.hadoop.mapred.FileInputFormat. The code below keeps only the core split logic.
```java
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  ...
  long totalSize = 0;
  ...

  // Sum up the size of every input file.
  List<FileStatus> files = new ArrayList<>(stats.length);
  for (FileStatus file: stats) {
    ...
    files.add(file);
    totalSize += file.getLen();
    ...
  }

  // goalSize: total input size divided by the requested number of splits.
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

  ...
  long blockSize = file.getBlockSize();
  long splitSize = computeSplitSize(goalSize, minSize, blockSize);

  // Carve off full splits while more than SPLIT_SLOP (1.1x) of a split remains.
  long bytesRemaining = length;
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
        length-bytesRemaining, splitSize, clusterMap);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
        splitHosts[0], splitHosts[1]));
    bytesRemaining -= splitSize;
  }

  // Whatever is left (at most 1.1x splitSize) becomes the final split.
  if (bytesRemaining != 0) {
    String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
        - bytesRemaining, bytesRemaining, clusterMap);
    splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
        splitHosts[0], splitHosts[1]));
  }
  ...
}
```
The important parameters:
- totalSize: the total size of all files under the input directory
- goalSize: the target split size, computed from totalSize and the numSplits argument (totalSize / numSplits)
- minSize: the minimum split size, taken as the maximum of the configuration parameter mapreduce.input.fileinputformat.split.minsize and minSplitSize = 1; by default minSize is 1
- blockSize: the block size of the file system
- splitSize: the split size, determined by the following method

```java
protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
```

- By default, if goalSize does not exceed blockSize, the split size is goalSize; otherwise it is blockSize (see the numeric sketch below).
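As a quick numeric sketch (the figures and the class name OldApiSplitSizeSketch are my own example, not taken from the Hadoop source): a job reads a single 1 GB file stored with 128 MB blocks and asks for numSplits = 4.

```java
// Hypothetical numbers illustrating the old-API split-size computation.
public class OldApiSplitSizeSketch {
  public static void main(String[] args) {
    long totalSize = 1024L * 1024 * 1024;     // 1 GB of input in the directory
    int  numSplits = 4;                       // requested number of map tasks
    long goalSize  = totalSize / numSplits;   // 256 MB
    long minSize   = 1;                       // default minimum split size
    long blockSize = 128L * 1024 * 1024;      // 128 MB HDFS block

    // computeSplitSize(goalSize, minSize, blockSize)
    long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
    System.out.println(splitSize);            // 134217728 bytes = 128 MB
  }
}
```

Because goalSize (256 MB) is larger than blockSize, splitSize falls back to one block, and the job ends up with 8 splits rather than the requested 4.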
 
Once the split size is determined, the slicing can proceed. Note the constant SPLIT_SLOP = 1.1: a further split is carved off only while the ratio of the remaining bytes to the split size is greater than 1.1; otherwise the remaining content becomes one final split.
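To see the effect of the 1.1 threshold in isolation, here is a minimal, self-contained sketch of that loop (the class SplitSlopDemo is my own; it keeps only the split lengths and drops the path and host lookups):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSlopDemo {
  private static final double SPLIT_SLOP = 1.1;

  // Returns the length of each split for a file of the given length.
  static List<Long> splitLengths(long length, long splitSize) {
    List<Long> splits = new ArrayList<>();
    long bytesRemaining = length;
    // Carve off full splits only while more than 1.1x splitSize remains.
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits.add(splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits.add(bytesRemaining);   // the tail becomes one final split
    }
    return splits;
  }

  public static void main(String[] args) {
    // 135 MB file, 128 MB split size: 135/128 ≈ 1.05 <= 1.1,
    // so the whole file stays in a single 135 MB split
    // instead of a 128 MB split plus a 7 MB tail.
    System.out.println(splitLengths(135L << 20, 128L << 20));
  }
}
```

The slop factor exists precisely to avoid scheduling a map task over such a tiny tail.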
New API
In the new API, the split-size computation has been improved, and getSplits no longer takes an int numSplits argument. Below is getSplits from the new API, located in org.apache.hadoop.mapreduce.lib.input.FileInputFormat; again, only the core split logic is kept:
```java
public List<InputSplit> getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);

  ...

  for (FileStatus file: files) {
    ...
    long length = file.getLen();

    ...

    long blockSize = file.getBlockSize();
    // splitSize now derives from blockSize, minSize and maxSize; goalSize is gone.
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

    // Same SPLIT_SLOP (1.1x) rule as in the old API.
    long bytesRemaining = length;
    while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(makeSplit(path, length-bytesRemaining, splitSize,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
      bytesRemaining -= splitSize;
    }

    if (bytesRemaining != 0) {
      int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
      splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
          blkLocations[blkIndex].getHosts(),
          blkLocations[blkIndex].getCachedHosts()));
    }

    ...

  }
  return splits;
}
```
As for the important parameters: the split logic in the new API is largely unchanged; what changed are the inputs to the computation: numSplits is no longer passed in and goalSize has been dropped. The SPLIT_SLOP = 1.1 rule still applies.
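For reference, here is a hedged sketch of how the new parameters combine; the body of computeSplitSize is paraphrased from my reading of the call site above and the Hadoop source, the class name NewApiSplitSizeSketch is my own, and the default values are assumptions rather than quotes from this article:

```java
// Assumed defaults: minSize = 1 and maxSize = Long.MAX_VALUE, i.e. neither
// mapreduce.input.fileinputformat.split.minsize nor
// mapreduce.input.fileinputformat.split.maxsize is set.
public class NewApiSplitSizeSketch {
  public static void main(String[] args) {
    long minSize   = 1;
    long maxSize   = Long.MAX_VALUE;
    long blockSize = 128L * 1024 * 1024;   // 128 MB HDFS block

    // computeSplitSize(blockSize, minSize, maxSize):
    // clamp blockSize into the range [minSize, maxSize]
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
    System.out.println(splitSize);         // 134217728 bytes = 128 MB
  }
}
```

With default settings each split is therefore one block; lowering maxSize below the block size produces more, smaller splits, while raising minSize above the block size merges blocks into fewer, larger ones.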