本文最后更新于:2023-08-16T21:27:07+08:00
Parquet
简介
Apache Parquet
是由Twitter和Cloudera合作开发的列式存储项目,也是Google在2010年发表的Dermel论文中描述的内部列式存储格式的开源实现。与传统的列式存储相比,Parquet最大的特点就是支持嵌套格式数据的列式存储。对于大规模的嵌套格式数据来说,Parquet的原生支持能够减少规则化嵌套数据的开销,提升处理大规模数据的效率。
Parquet是一种存储格式,它与语言和平台无关,不需要与任何一种数据处理框架绑定。Parquet在大数据处理框架中十分常用,如果说
HDFS 是大数据时代文件系统的事实标准的话,Parquet
就是大数据时代存储格式的事实标准,有许多组件都支持Parquet存储格式,包括下面的组件:
- 查询引擎:Hive、Impala、Pig、Presto、Drill、Tajo、HAWQ、IBM Big
SQL
- 计算框架:MapReduce、Spark、Cascading、Crunch、Scalding、Kite
- 数据模型:Avro、Thrift、Protocol Buffers、POJOs
parqeut文件默认是无法直接查看的,不过我们可以通过一些工具进行查看。其中一个是parquet-tools
,该工具可以利用pip进行下载,下载之后就可以在命令行中查看parquet文件。(还有一种parquet-tools是以jar包形式存在的,但是目前在官方的github项目地址中似乎已经找不到了。)
1
| pip install parquet-tools -i https://pypi.tuna.tsinghua.edu.cn/simple
|
当然我们也可以在JetBrains中下载Plugin Big Data
Tools,该插件提供了大数据生态的相关工具,其中之一就包含对parquet文件的查看支持。
数据模型
Schema
要理解Parquet的存储格式,首先需要了解它的数据模型。
Parquet支持嵌套的数据模型,数据模型可以利用Schema来描述。在Parquet中,一个数据模型的Schema最上层是message
,其中可以包含一系列字段。每个字段具有三个属性,分别是重复性(Repetition)、类型(Type)以及名称(Name)。其中字段的类型可以是原子类型,例如int、boolean、string等,也可以是一个group,这就表示这个字段是一个嵌套字段。字段的重复性则可以有下面三种情况:
required
:出现有且仅有一次
optional
:可以出现0次或者1次
repeated
:可以出现0次或者多次
这样定义的数据模型非常简洁,同时也具有强大的表达能力。一些复杂的数据类型例如Map、List和Set等也可以利用repeated字段+groups来表示,因此也就不需要再额外对这些类型进行单独定义。
1 2 3 4 5 6
| message ExampleMap { repeated groups Entry { required string key; optional string value; } }
|
这个数据Schema可以用来表示Map数据结构。在一个Map中可以有多个Entry对象,即键值对。每个Entry中又包含了key和value,其中key是必须的,value是可选的。
1 2 3
| message ExampleList { repeatd string ListElement; }
|
这个数据Schema可以表示List或者Set,这表示字段listElemet可以重复出现。
Parquet中提出以树状层级的形式来组织Schema中的字段,树的每个叶子结点对应一个原子类型字段。如此这个模型就可以同时覆盖嵌套结构数据和扁平结构数据(扁平结构数据只是嵌套结构数据的一种特例)。如下是一个数据Schema的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| message Document { required int64 DocId; optional group Links { repeated int64 Backward; repeated int64 Forward; } repeated group Name { repeated group Language { required string Code; optional string Country; } optional string Url; } }
|
这个Schema可以表示成下面的树状结构:
考虑现在存在符合上面Schema的两份数据,数据如下,我们需要将其保存起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| --Data 1: DocId: 10 Links: Forward: 20 Forward: 40 Forward: 60 Name: Language: Code: 'en-us' Conutry: 'us' Language: Code: 'en' Url: 'http://A' Name: Url: 'http://B' Name: Language: Code: 'en-gb' Country: 'gb'
--Data 2: DocId: 20 Links: Backward: 10 Backward: 30 Forward: 80 Name: Url: 'http://C'
|
如果将这样的数据按行存储成关系型表,我们应该会得到如下的表结果:
10 |
|
20 |
'en-us' |
'us' |
'http://A' |
10 |
|
20 |
'en' |
|
'http://A' |
10 |
|
20 |
|
|
'http://B' |
10 |
|
20 |
'en-gb' |
'gb' |
|
10 |
|
40 |
'en-us' |
'us' |
'http://A' |
10 |
|
40 |
'en' |
|
'http://A' |
10 |
|
40 |
|
|
'http://B' |
10 |
|
40 |
'en-gb' |
'gb' |
|
10 |
|
60 |
'en-us' |
'us' |
'http://A' |
10 |
|
60 |
'en' |
|
'http://A' |
10 |
|
60 |
|
|
'http://B' |
10 |
|
60 |
'en-gb' |
'gb' |
|
20 |
10 |
|
|
|
'http://C' |
20 |
30 |
|
|
|
'http://C' |
20 |
|
80 |
|
|
'http://C' |
可以看到,为了表示这种嵌套结构,按行存储的关系表必须将所有的嵌套都进行扁平化,因此这张表一共得到了六列六个字段。但是这会导致相关字段的重复存储,造成冗余,同时会有大量字段为空。而Parquet是按列进行存储的,它仅需要存储对应的最小粒度的原子类型字段即可。对应到Schema的树结构,则仅需要在叶子结点上进行存储即可。下面给出Parquet列式存储模型的示意图:
Striping/Assembly算法
按照上面Parquet列式存储的示意图,我们进行存储的时候仅存储Value是不够的,因为仅存储Value,我们无法将其还原成原始的嵌套结构。例如在上面的例子中,观察Name.Language.Code
中的三个值,两两比较。如果仅存放Value,我们无法确定这两个Code是属于同一个Name的不同Language,还是属于不同Name的不同Language。
而为了能够在列式存储的同时完整地保留嵌套结构信息,Parquet使用Striping/Assembly算法。具体来说,在进行列式存储的时候,除了存储每一个具体的value,还同时存储其他信息以供嵌套结构的还原。这些其他信息包括两个数据,Repetition
Level和Definition Level
Repetiton Level
对于每个Value,它都有对应的一个Repetition
Level。该值表示在该字段路径上哪个节点进行了重复。考虑Schema的树状结构,从根节点出发到达该字段有一条路径。对于当前值和前一个值来说,分别可以得到两条路径,这两条路径的最后一个公共节点所处的层级数,就是当前值对应的repetition
levels,则这个值就是对应的层数。简短一些说,就是当前值所在节点和前一个值所在节点的最近的公共父节点所处的层级。
考虑上面例子中存储的Forward属性,将所有的Forwad
Value映射到树形结构中,可以得到如下的路径:
由于这个例子的树形结构最多只有4层,因此Repetition
Level的范围对应第一到第三层,值的范围就是0~2。因此对于第一个20,它没有前一个值,所以Repetition
Level为0。对于40来说,它和第一个值20,最近公共父节点是Links,所在树的层级是1,因此Repetition
Level为1。对于60来说也是同样的分析。而对于80来说,
它与60的最近公共父节点是Document,因此Repetition Level为0。
Definition Level
但是仅有Repetition
Level是不够的,因为repeated和optional类型的存在,它还无法完全保留嵌套结构信息。因为有可能一条记录中的某一列是没有值的,假设我们不记录这样的值,则会导致本该属于下一条记录的值被当作当前记录的值,从而造成数据结构错误。因此对于这种情况我们需要填充一个占位符来表示。
Definition
Level值的含义表示当前字段路径上有多少个optional或repeated的字段实际进行了定义。这个值只对填充的空值有意义。因为如果是非空值,则表示该值路径上的所有可选字段都已经进行了定义,那么这个值总是等于该路径上所有的Option字段总数。
考虑上面的Name.Language.Code
字段,按照顺序以此在一棵树上列出所有的值路径,同时注意填充空值,因此我们可以得到如下的表示。其中有实际值的字段,Definition
Level值均为2,这表示前面的Name和Language字段都出现了。而为空的字段,两者路径上的Name都是有定义的,但是Language没有,因此Definition
Level的值为1。
这里需要注意的是,实际上空值是不需要存储的,因为按照Definition
Level的特性,只要该值小于路径上Definition
Level的最大值,则表示当前值就是一个空值
总结来说,为了得到实际的列式存储模型,对于每个字段,我们需要得到对应值的路径,同时需要注意空值的填充。之后按照定义计算Repetition
Level与Definition
Level。最终,我们可以得到上面例子的最终列式存储模型,如下图所示:
存储格式
上面介绍了Parquet使用的数据模型,接下来介绍Parquet的存储格式。Parquet文件是以二进制方式存储,不可以直接读取。在Parquet文件中包含该文件的数据和元数据,即文件是自解析的。
这里首先介绍一些在HDFS文件系统和Parquet文件中的相关概念:
- HDFS块(Block):它是HDFS上的最小的副本单位,HDFS会把一个Block存储在本地的一个文件并且维护分散在不同的机器上的多个副本,通常情况下一个Block的大小为256M、512M等。
- HDFS文件(File):一个HDFS的文件,包括数据和元数据,数据分散存储在多个Block中。
- 行组(Row
Group):按照行将数据物理上划分为多个单元,每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,Parquet读写的时候会将整个行组缓存在内存中,所以如果每一个行组的大小是由内存的大小决定的,例如记录占用空间比较小的Schema可以在每一个行组中存储更多的行。
- 列块(Column
Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
- 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。
并行化的基本单元:
- MapReduce:一个MapReduce任务对应一个文件或者一个Row Group
- IO:任务中的IO以Column Chunk为单位读取
- Encoding :编码以Page为单位
下图展示了Parquet的文件格式:
一个文件中可以存储多个行组,文件的首位都是该文件的Magic
Code,用于校验它是否是一个Parquet文件,Parquet 文件格式是自解析的,采用
thrift 格式定义的文件 schema
以及其他元数据信息一起存储在文件的末尾。文件的元数据中包括每一个行组的元数据信息和该文件存储数据的Schema信息。读取的时候首先从文件末尾读取文件元数据信息,再在其中找到感兴趣的
Column Chunk
信息,并依次读取。文件元数据信息放在文件最后是为了方便数据依序一次性写入。
Parquet 总共有3种类型的元数据:文件元数据、列(块)元数据和 page
header 元数据。所有元数据都采用 thrift
协议存储。每一页的开始都会存储该页的元数据
,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,其中就存储了上面我们在数据模型中介绍过的Value、Repetition
Level和Definition
Level。字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引。
在数据类型方面。Parquet
只保留了最精简的部分数据类型,以方便存储和读写。对于其他数据类型,可以使用逻辑类型(Logical
Types)进行扩展,比如:逻辑类型 strings 就映射为带有 UTF8 标识的二进制
byte arrays 进行存储。下面是Parquet中的基本数据类型:
BOOLEAN
: 1 bit boolean
INT32
: 32 bit signed ints
INT64
: 64 bit signed ints
INT96
: 96 bit signed ints
FLOAT
: IEEE 32-bit floating point values
DOUBLE
: IEEE 64-bit floating point values
BYTE_ARRAY
: arbitrarily long byte arrays
总结来说,Parquet存储格式关键点有这几个,首先是Schema信息或者说是元信息存放在文件的尾部;其次是对于数据,会首先按照行进行划分成为多个行组Raw Group
,每个行组又包含多个Column
Chunk,每一列的数据都存放在Column Chunk
中。而每个Column
Chunk中又包含多个Page,每个Page存放一个Value,以及对应的Repetition
Level以及Definition Level。
Java读写parquet文件
写文件
为了在Java中完成parquet文件的读写,我们需要引入如下依赖,当然这里的版本可以根据需要进行调整。
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.0</version> </dependency> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-hadoop</artifactId> <version>1.12.3</version> </dependency>
|
我们首先介绍parquet文件的写入。首先我们需要创建相关的实体类,这里就以上面的Document为例,创建相关的实体类如下,相关get,set,toString,构造方法等没有列出,仅是展示了相关的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class Document { private Long docId; private Link link; private Name name; }
public class Link { private Long backward; private Long forward; }
public class Name { private String url; private Language language; }
public class Language { private String code; private String country; }
|
需要注意的一点是这里我们在parquet
schema中指定了int64,对应的是Long类型。如果类型不对,后续会出现解析错误的报错。
之后就开始学习如何写入parquet文件。为了写入文件,首先我们需要定义parquet的schema。下面的方法调用相关API进行schema的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public MessageType getDocumentSchema() { Types.GroupBuilder<GroupType> linksSchemaBuilder = Types.buildGroup(Type.Repetition.OPTIONAL); linksSchemaBuilder.repeated(PrimitiveType.PrimitiveTypeName.INT64).named("backward") .repeated(PrimitiveType.PrimitiveTypeName.INT64).named("forward"); GroupType linksSchema = linksSchemaBuilder.named("links");
Types.GroupBuilder<GroupType> languageSchemaBuilder = Types.buildGroup(Type.Repetition.REPEATED); languageSchemaBuilder .required(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("code") .optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("country"); GroupType languageSchema = languageSchemaBuilder.named("language");
Types.GroupBuilder<GroupType> nameSchemaBuilder = Types.buildGroup(Type.Repetition.REPEATED); nameSchemaBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("url"); nameSchemaBuilder.addField(languageSchema); GroupType nameSchema = nameSchemaBuilder.named("name");
Types.MessageTypeBuilder documentSchemaBuilder = Types.buildMessage(); documentSchemaBuilder.required(PrimitiveType.PrimitiveTypeName.INT64).named("docId"); documentSchemaBuilder.addFields(linksSchema, nameSchema); return documentSchemaBuilder.named("document"); }
|
对于Schema定义来说,我们最终需要得到的是一个MessageType
类型的对象。可以看到上面有我们属性的方法,例如option,repeated,group等,分别用于构造不同类型的字段。需要注意上面方法中如何对嵌套的Schema进行定义。对上面的方法得到的对象进行输出,我们可以得到如下的结果,可以看到与我们之前的介绍是一致的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| message document { required int64 docId; optional group links { repeated int64 backward; repeated int64 forward; } repeated group name { optional binary url (STRING); repeated group language { required binary code (STRING); optional binary country (STRING); } } }
|
这种构造Schema的方法是调用相关API,但是对于稍微复杂一些的嵌套数据来说,整个定义过程会非常繁琐,于是还有另外一种方式可以可以进行Schema定义。MesspageTypeParser
可以通过给定的字符串来解析出对应的MessageType对象,相较于上面一种方法来说,这种方法更加直观,也更加容易使用。当然其中需要注意的就是对于Schema字符串的书写正确性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public MessageType getDocumentSchemaV2() { MessageType messageType = MessageTypeParser.parseMessageType("message document {\n" + " required int64 docId;\n" + " optional group links {\n" + " repeated int64 backward;\n" + " repeated int64 forward;\n" + " }\n" + " repeated group name {\n" + " optional binary url (STRING);\n" + " repeated group language {\n" + " required binary code (STRING);\n" + " optional binary country (STRING);\n" + " }\n" + " }\n" + "}"); return messageType; }
|
至此,我们已经得到了代表Document的Schema信息,之后就可以进入写入环节。写入parquet文件的核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public void write(List<Document> documents, String targetPath) throws IOException { MessageType documentSchema = getDocumentSchema(); ParquetWriter<Group> parquetWriter = ExampleParquetWriter.builder(new Path(targetPath)) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) .withCompressionCodec(CompressionCodecName.SNAPPY) .withType(documentSchema).build(); SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(documentSchema); for (Document document : documents) { Group group = simpleGroupFactory.newGroup(); group.add("docId", document.getDocId());
Group links = group.addGroup("links"); links.add("backward", document.getLink().getBackward()); links.add("forward", document.getLink().getForward());
Group name = group.addGroup("name"); Group language = name.addGroup("language"); language.add("code", document.getName().getLanguage().getCode()); language.add("country", document.getName().getLanguage().getCountry()); name.add("url", document.getName().getUrl());
parquetWriter.write(group); } parquetWriter.close(); }
|
我们首先需要获取写入对象,之后就可以进行写入了。同样需要注意这里写入的层级关系。之后我们可以调用测试方法,完成写入:
1 2 3 4 5 6 7 8 9 10 11 12
| @Test public void testWrite() throws IOException { MyParquetWriter myParquetWriter = new MyParquetWriter();
Document document = new Document(10l, new Link(20l, 30l), new Name("http://A", new Language("en", "us"))); ArrayList<Document> documents = new ArrayList<>(); documents.add(document); String path = this.getClass().getClassLoader().getResource("").getPath();
myParquetWriter.write(documents, path + "document.parquet"); }
|
调用方法之后,就可以在生成的target/test-classes
目录下看见生成的document.parquet
文件,以及一个校验文件。我们可以使用前面提到的相关工具来查看parquet文件,例如parquet-tools
。利用parquet-tools,我们可以查看新生成的文件,效果如下,可以看到的确是写入成功了。
1 2 3 4 5 6
| parquet-tools show .\document.parquet +---------+-----------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+ | docId | links | name | |---------+-----------------------------------------------------------------------------+-------------------------------------------------------------------------------------------| | 10 | {'backward': array([20], dtype=int64), 'forward': array([30], dtype=int64)} | [{'url': 'http://A', 'language': array([{'code': 'en', 'country': 'us'}], dtype=object)}] | +---------+-----------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+
|
读文件
读文件的流程则相对简单,核心代码如下,这里可以按照字段名称进行读取,也可以按照偏移量进行读取。对于其中的Group对象,也可以使用getType
等方法获取对应的元信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public void read(String path) throws IOException { GroupReadSupport readSupport = new GroupReadSupport(); ParquetReader.Builder<Group> builder = ParquetReader.builder(readSupport, new Path(path)); ParquetReader<Group> reader = builder.build();
Group line = null; while ((line = reader.read()) != null) { long docId = line.getLong("docId", 0); Group links = line.getGroup("links", 0); Group name = line.getGroup("name", 0);
long backward = links.getLong("backward", 0); long forward = links.getLong("forward", 0);
Group language = name.getGroup("language", 0); String code = new String(language.getBinary("code", 0).getBytes()); String country = new String(language.getBinary("country", 0).getBytes()); String url = new String(name.getBinary("url", 0).getBytes());
System.out.println("docId: " + docId); System.out.println("links.backward: " + backward); System.out.println("links.forward: " + forward); System.out.println("name.url: " + url); System.out.println("name.language.code: " + code); System.out.println("name.language.country: " + country); } }
|
执行测试代码,读取刚才我们生成的parquet文件。
1 2 3 4 5 6 7
| @Test public void testRead() throws IOException { MyParquetReader myParquetReader = new MyParquetReader();
String path = this.getClass().getClassLoader().getResource("document.parquet").getPath(); myParquetReader.read(path); }
|
输出如下,同样可以看到是正确读出了其中的内容。
1 2 3 4 5 6
| docId: 10 links.backward: 20 links.forward: 30 name.url: http://A name.language.code: en name.language.country: us
|
读取Schema
我们同样可以读取parquet中处于文件末尾的元信息,然后解析出相关的schema,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public void readSchema(String path) throws IOException { Path file = new Path(path); ParquetMetadata metadata = ParquetFileReader.readFooter(new Configuration(), file); MessageType schema = metadata.getFileMetaData().getSchema();
for (Type field : schema.getFields()) { printField(field); } }
public void printField(Type field) { if (field.isPrimitive()) { String typeName = field.asPrimitiveType().getPrimitiveTypeName().name(); OriginalType originalType = field.asPrimitiveType().getOriginalType(); String originalName = originalType != null ? originalType.name() : ""; System.out.println(field.getName() + ":" + typeName + ":" + originalName); } else { GroupType groupType = field.asGroupType(); for (Type groupTypeField : groupType.getFields()) { printField(groupTypeField); } } }
|
需要注意的是,只有当字段Field是基本字段,我们才能得到相关的基本类型信息,否则需要进一步进行字段的获取。因此这里我们使用了一个递归函数进行字段信息的获取。同样借助刚才生成的document.parquet
进行测试,可以得到如下的结果输出:
1 2 3 4 5 6
| docId:INT64: backward:INT64: forward:INT64: url:BINARY:UTF8 code:BINARY:UTF8 country:BINARY:UTF8
|
参考文章
- 一文讲透大数据列存标准格式
- Parquet | Liam's Blog
- Parquet文件存储格式详细解析
- Overview |
Apache Parquet
- Java方式对Parquet文件进行文件生成和解析
- parquet-tools工具使用
- java解析parquet文件