本文最后更新于:2022-05-09T09:49:08+08:00
跨分区场景
我们知道Hadoop中会先对输入数据进行分片操作。针对每个分区,使用RecordReader
来读取数据,组织成键值对的形传给Map函数进行处理。默认情况下我们使用的是TextInputFormat
,它的读取逻辑是将文件按行读取,以行首字节在文件中的偏移量作为key进行传递。但是很有可能出现的情况是,数据分区的时候会将一行内容从中切开,即一个行记录出现在相邻的两个分区当中。读取到残缺的行显然是会影响实际的业务逻辑的,在Hadoop数据读取的源码中有相应的处理。
源码解析
TextInputFormat
中创建了一个RecordReader
,其中new了一个LineRecordReader
,该类用于读取一行。其中的关键逻辑是调用了LineReader
的readLine方法,该类的所在包为package org.apache.hadoop.util
LineReader
的readLine方法会根据是否有用户传入的分隔符走不同的逻辑,其中readDefaultLine方法使用的是默认的行分隔符(CR、LF、CRLF)
1 2 3 4 5 6 7 8 public int readLine (Text str, int maxLineLength, int maxBytesToConsume) throws IOException { if (this .recordDelimiterBytes != null ) { return readCustomLine(str, maxLineLength, maxBytesToConsume); } else { return readDefaultLine(str, maxLineLength, maxBytesToConsume); } }
LineReader
的readDefaultLine方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 private int readDefaultLine (Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); int txtLength = 0 ; int newlineLength = 0 ; boolean prevCharCR = false ; long bytesConsumed = 0 ; do { int startPosn = bufferPosn; if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0 ; if (prevCharCR) { ++bytesConsumed; } bufferLength = fillBuffer(in, buffer, prevCharCR); if (bufferLength <= 0 ) { break ; } } for (; bufferPosn < bufferLength; ++bufferPosn) { if (buffer[bufferPosn] == LF) { newlineLength = (prevCharCR) ? 2 : 1 ; ++bufferPosn; break ; } if (prevCharCR) { newlineLength = 1 ; break ; } prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; if (prevCharCR && newlineLength == 0 ) { --readLength; } bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0 ) { str.append(buffer, startPosn, appendLength); txtLength += appendLength; } } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); if (bytesConsumed > Integer.MAX_VALUE) { throw new IOException ("Too many bytes before newline: " + bytesConsumed); } return (int )bytesConsumed; }
在进行readLine的时候,我们总是从buffer中读取数据,如果buffer中的数据读取完毕,我们会加载下一批数据进入buffer。在读取一行的时候,我们会在buffer中寻找行结束符,然后确定一行的长度。注意这里的buffer读取是直接从文件中操作的,并不会受分区的限制,因此在读取的时候是可能跨分区读取的。因此在读取的时候,如果存在跨分区的行,我们也是会完整的读取 。
这样的操作就会导致一个问题,即我们需要判断上一个分区是否跨分区读取过,从而避免遗漏或者重复读取处于分区之间的行。
这里我们首先查看读取文件的LineRecordReader.next()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public synchronized boolean next (LongWritable key, Text value) throws IOException { while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { key.set(pos); int newSize = 0 ; if (pos == 0 ) { newSize = skipUtfByteOrderMark(value); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if (newSize == 0 ) { return false ; } if (newSize < maxLineLength) { return true ; } LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } return false ; }
其中的关键在于while的判断逻辑 。在切片规划中,我们的每个切片规划信息中包含filePath、start、length
,三者可以确定当前切片是在哪个文件中哪里起始,长度是多少。这里的end
即为start+length
。在一般的文件读取当中,我们读取的文件内容,行偏移量范围应该为[start, end)
,但是这里的while条件是<=end
。具体分析,切分规划有两种可能,一种是恰好切分到行尾,这样的话end
偏移量就是下一行的行首,按照while的逻辑我们会将这一行进行读取;另一种是将该分区中的最后一行从中间切开,那么end
偏移量还是在本行而不是下一行,按照readLine
的逻辑,我们会将这一行读取完毕。可以看到,上面两种逻辑,我们都读取了下一个分区中的第一行 。
因此,在LineRecordReader
中,除了第一个分区之外,会跳过每一个分区的第一行,从下一行开始读取。源代码中判断逻辑如下:
1 2 3 4 5 6 if (start != 0 ) { start += in.readLine(new Text (), 0 , maxBytesToConsume(start)); }
总结
总结来说,Hadoop中的切片操作是一种宏观 上的规划,为的是降低数据的规模,但是这是逻辑上的切片规划 。在实际读取数据到不同切片的时候存在微观上的差别 。在每个分区的数据读取中,我们会跳过第一行(除了第一个分区),并且读取下一个分区中的第一行(可能完整,也可能不完整),从而保证跨分区数据读取的正确性。