Hadoop源码-TextInputFormat解决跨分区的行读取问题

跨分区场景

我们知道Hadoop中会先对输入数据进行分片操作。针对每个分区,使用RecordReader来读取数据,组织成键值对的形传给Map函数进行处理。默认情况下我们使用的是TextInputFormat,它的读取逻辑是将文件按行读取,以行首字节在文件中的偏移量作为key进行传递。但是很有可能出现的情况是,数据分区的时候会将一行内容从中切开,即一个行记录出现在相邻的两个分区当中。读取到残缺的行显然是会影响实际的业务逻辑的,在Hadoop数据读取的源码中有相应的处理。

源码解析

TextInputFormat中创建了一个RecordReader,其中new了一个LineRecordReader,该类用于读取一行。其中的关键逻辑是调用了LineReader的readLine方法,该类的所在包为package org.apache.hadoop.util

LineReader的readLine方法会根据是否有用户传入的分隔符走不同的逻辑,其中readDefaultLine方法使用的是默认的行分隔符(CR、LF、CRLF)

1
2
3
4
5
6
7
8
public int readLine(Text str, int maxLineLength,
int maxBytesToConsume) throws IOException {
if (this.recordDelimiterBytes != null) {
return readCustomLine(str, maxLineLength, maxBytesToConsume);
} else {
return readDefaultLine(str, maxLineLength, maxBytesToConsume);
}
}

LineReader的readDefaultLine方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 从文件中读取一行,行分隔符是CR('\r')、LF('\n')或者CRLF('\r\n')
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
str.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //行结束符的长度(不同操作系统对此定义不同)
boolean prevCharCR = false; // 标记之前的字符是否是CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
// 如果当前在buffer中的数据已经读取完毕,则加载一批数据到buffer中
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0; // 重置从buffer中读取的起始位置
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
}
// 获取一行的长度,在这里需要判断行结束符的长度,由于行结束符在不同操作系统中的定义不同,因此需要结合CR进行判断
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn; // 读取一行的长度
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
// 读取到了一行或者超过读取长度了则退出
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}

在进行readLine的时候,我们总是从buffer中读取数据,如果buffer中的数据读取完毕,我们会加载下一批数据进入buffer。在读取一行的时候,我们会在buffer中寻找行结束符,然后确定一行的长度。注意这里的buffer读取是直接从文件中操作的,并不会受分区的限制,因此在读取的时候是可能跨分区读取的。因此在读取的时候,如果存在跨分区的行,我们也是会完整的读取

这样的操作就会导致一个问题,即我们需要判断上一个分区是否跨分区读取过,从而避免遗漏或者重复读取处于分区之间的行。

这里我们首先查看读取文件的LineRecordReader.next()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public synchronized boolean next(LongWritable key, Text value)
throws IOException {

// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos);

int newSize = 0;
if (pos == 0) {
newSize = skipUtfByteOrderMark(value);
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}

if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}

// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}

return false;
}

其中的关键在于while的判断逻辑。在切片规划中,我们的每个切片规划信息中包含filePath、start、length,三者可以确定当前切片是在哪个文件中哪里起始,长度是多少。这里的end即为start+length。在一般的文件读取当中,我们读取的文件内容,行偏移量范围应该为[start, end),但是这里的while条件是<=end。具体分析,切分规划有两种可能,一种是恰好切分到行尾,这样的话end偏移量就是下一行的行首,按照while的逻辑我们会将这一行进行读取;另一种是将该分区中的最后一行从中间切开,那么end偏移量还是在本行而不是下一行,按照readLine的逻辑,我们会将这一行读取完毕。可以看到,上面两种逻辑,我们都读取了下一个分区中的第一行

因此,在LineRecordReader中,除了第一个分区之外,会跳过每一个分区的第一行,从下一行开始读取。源代码中判断逻辑如下:

1
2
3
4
5
6
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}

总结

总结来说,Hadoop中的切片操作是一种宏观上的规划,为的是降低数据的规模,但是这是逻辑上的切片规划。在实际读取数据到不同切片的时候存在微观上的差别。在每个分区的数据读取中,我们会跳过第一行(除了第一个分区),并且读取下一个分区中的第一行(可能完整,也可能不完整),从而保证跨分区数据读取的正确性。


Hadoop源码-TextInputFormat解决跨分区的行读取问题
http://example.com/2022/05/09/Hadoop源码-TextInputFormat解决跨分区的行读取问题/
作者
EverNorif
发布于
2022年5月9日
许可协议