Hive学习笔记-Hive优化(1)-Hive性能优化

Hive分区表,分桶表和索引

Hive的设计思想是通过元数据解析描述后,将HDFS上的文件映射成表。当用户通过HQL语句对Hive中的表进行复杂数据处理和计算时,默认将其转换成分布式计算MapReduce程序,然后读取HDFS中的数据。

在执行查询计划的时候,Hive会使用表的最后一级目录作为底层处理数据的输入。如果没有特殊处理,最后一级目录就是表的目录,里面存放了表中的所有数据,因此这个时候就是全表扫描。通过explain命令可以查看执行计划,其中可以看到实际读取的HDFS文件路径。

分区表会将不同分区的数据存放在单独的HDFS目录中,这样在查询的时候,根据查询条件,就可以只读取对应分区的数据作为输入,减少不必要的数据加载,提高程序的性能。

分桶表则可以解决Hive中join的问题。这点会在后续join问题部分进行深入介绍。

在Hive中还存在索引的设计。Hive允许用户为字段构建索引,提高数据的查询速率。当为某张表的某个字段创建索引时,Hive中会自动创建一张索引表,该表中记录了该字段的每个值与数据实际物理位置之间的关系,例如数据所在的HDFS文件地址,所在文件中的偏移量offset等信息。

但是索引功能的支持从Hive0.7版本开始支持,但是到Hive3.0开始不再支持。因为Hive中构建和维护索引的操作过于复杂,首先Hive构建索引的过程是通过一个MapReduce程序来实现的,并且每次Hive中原始数据表的数据发生更新的时候,索引表不会自动更新,而必须手动执行一个alter index命令来通过MapReduce再次更新索引表,导致整体性能较差,维护相对繁琐。

Hive表数据优化

文件格式

Hive数据的存储底层还是HDFS,所有的数据读写都基于HDFS文件来实现。为了提高对HDFS文件读写的性能,Hive中提供了多种文件存储格式,包括TextFile、SequenceFile、ORC、Parquet等,在建表的时候进行指定。

TextFile是Hive中默认的文件格式,存储形式为按行存储。T

  • 优点:最简单的数据格式,可以直接查看,可以使用任意分隔符进行分割
  • 缺点:耗费存储空间,IO性能较低。结合压缩的时候Hive不进行数据切分合并,不能进行并行操作,查询效率低。按行存储,读取列的性能差

SequenceFile是Hadoop中用来存储序列化键值对的一种文件格式,可以作为MapReduce作业的输入输出,因此Hive也支持这种格式

  • 优点:以二进制KV形式存储数据,与底层交互更加友好,性能更快。可压缩、可分割、优化磁盘利用率和I/O,可并行操作数据,查询效率高
  • 缺点:存储空间消耗最大,与Hadoop生态系统之外的工具不兼容

Parquet是一种支持嵌套结果的列式存储文件格式,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式

  • 优点:更高效的压缩和编码,可压缩、可分割、优化磁盘利用率和IO,可用于多种数据处理框架
  • 缺点:不支持update、insert、delete、ACID

ORC(OptimizedRC File)也是一种Hadoop生态圈中的列式存储格式。最初产生自Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。

  • 优点:列式存储,存储效率非常高。可压缩,高效的列存取。查询效率高,支持索引,支持矢量化查询
  • 缺点:加载时性能消耗较大,读取全量数据时性能较差

需要注意的是,以上的文件格式除了TextFile,都无法直接通过数据导入形成。因为load命令只会完成数据的纯移动或者复制,而不会修改数据的文件格式。正确的方式应该是在创建表的时候指定文件格式,然后利用insert+select从原始表中查询后插入指定了文件格式的表中。

数据压缩

Hive压缩实际上说的就是MapReduce的压缩,可以指定不同阶段的压缩算法。Hadoop中支持的压缩在Hive中都可以直接使用。

在Hive中使用压缩,需要对MapReduce和Hive进行相应的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
--开启hive中间传输数据压缩功能
--1)开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;
--2)开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;
--3)设置mapreduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;

--开启Reduce输出阶段压缩
--1)开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
--2)开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
--3)设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
--4)设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

存储优化

存储优化的第一个问题是小文件的问题。

Hive的存储本质是HDFS,而HDFS并不利于小文件的存储。在使用Hive进行处理分析的时候,要尽量避免小文件的生成。Hive中提供了一个特殊的机制,可以自动判断是否是小文件,如果是小文件可以自动将小文件进行合并

1
2
3
4
5
6
7
8
-- 如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
set hive.merge.mapfiles=true;
-- 如果hive的程序,有Map和ReduceTask,将ReduceTask产生的所有小文件进行合并
set hive.merge.mapredfiles=true;
-- 每一个合并的文件的大小(244M)
set hive.merge.size.per.task=256000000;
-- 平均每个文件的大小,如果小于这个值就会进行合并(15M)
set hive.merge.smallfiles.avgsize=16000000

上面解决的情况是输出为小文件的情况,而Hive中也提供一种输入类CombineHiveInputFormat用于解决输入是小文件的情况,它将小文件进行合并之后再进行处理

1
2
-- 设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

存储优化的第二个问题是ORC的文件索引。

在使用ORC文件时,为了加快读取ORC文件中的数据内容,ORC提供了两种索引机制:Row Group IndexBloom Filter Index,可以帮助提高查询ORC文件的性能。当用户写入数据时,可以指定构建索引,当用户查询数据时,可以根据索引提前对数据进行过滤,避免不必要的数据扫描。

一个ORC文件会包含一个或者多个stripes,可以理解为原始数据的一部分。每个stripe中,包含了其中数据的每个column的min/max。当查询中有大小判断的操作时,可以根据最大最小值,跳过一定不包含结果的stripe。这里为每个stripe建立的包含min/max值的索引,就称为Row Group Index。建立ORC格式表的时候,指定表参数orc.create.index=true之后就会建立Row Group Index。而为了使得该索引有效利用,向表中加载数据的时候,必须对需要使用索引的字段进行排序

1
2
3
4
5
6
7
8
9
---存储优化-ORC文件索引
--1、开启索引配置
set hive.optimize.index.filter=true;
--2、创建表并制定构建索引
create table table_name
stored as orc tblproperties ("orc.create.index"="true")
as select * from table_source
distribute by stime
sort by stime;

Bloom Filter Index即为字段建立布隆过滤器的数据结构。当查询条件中包含对该字段的等值过滤时,可以通过布隆过滤器判断该stripe中是否存在,如果返回不存在则直接跳过该stripe。建表的时候通过指定表参数orc.bloom.filter.columns=columnName...来指定为哪些字段建立布隆过滤器索引

1
2
3
4
5
6
--创建表指定创建布隆索引
create table table_name
stored as orc tblproperties ("orc.create.index"="true","orc.bloom.filter.columns"="stime,userid")
as select * from table_source
distribute by stime
sort by stime;

Hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。注意:要使用矢量化查询执行,就必须以ORC格式存储数据。

1
2
3
-- 开启矢量化查询
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

Hive作业执行优化

explain查询计划

explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返回,可以了解一条HQL语句在底层是如何实现数据的查询及处理的过程,辅助用户对Hive进行优化。

常用语法命令如下:

1
explain [formatted|extended|dependency|authorization] query
  • formatted:对执行计划进行格式化展示
  • extended:提供更详细的信息
  • dependency:以json格式返回查询所依赖的表和分区列表
  • authorization:列出需要被授权的条目,包括输入和输出

Hive中每个查询计划由以下几个部分组成:

  1. 抽象语法树(AST):Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
  2. Stage依赖关系:会列出运行查询划分的stage阶段以及之间的依赖关系
  3. Stage内容:包含了每个stage非常重要的信息,比如运行时的operator和sort orders等具体的信息

MapReduce属性优化

MapReduce属性优化包括多个方面,如本地模式,JVM重用和并行执行等。

本地模式指的是直接在本地计算,允许程序不提交给Yarn。这是指在Hive的过程中,有一些数据量不大的表也会转换成MapReduce进行处理。如果提交到集群,需要申请资源,等待分配,最后再运行的一系列流程,比较繁琐。而本身数据量不大,导致整体效率较低。而本地计算模式,允许程序不提交给Yarn,直接在本地运行以便提高小数据量程序的性能。

1
2
--开启本地模式
set hive.exec.mode.local.auto = true;

Hadoop默认会为每个Task启动一个JVM运行,而在JVM启动的时候内存开销较大。Job数据量大的情况,如果单个Task数据量比较小,也会申请JVM,这就导致了资源紧张及浪费的情况;JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放;N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。

1
2
3
-- Hadoop3之前的配置,在mapred-site.xml中添加以下参数
-- Hadoop3中已不再支持该选项
mapreduce.job.jvm.numtasks=10

Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的,例如Union语句,Join语句等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。

1
2
3
4
-- 开启Stage并行化,默认为false
set hive.exec.parallel=true;
-- 指定并行化线程数,默认为8
set hive.exec.parallel.thread.number=16;

join优化

在Hive中,join的底层通过MapReduce来实现。而为了提高MapReduce的性能,Hive中提供了多种join的方案,例如适合小表join大表的map join;大表join大表的reduce join;大表join的优化方案bucket join等

map join适合的场景是小表join大表或者小表join小表。map join会在每个MapTask的内存中都存放一份完整的小表数据,而大表数据通过多个MapTask并行处理,大表的每个部分都可以与小表的完整数据进行join。由于这种方式不需要启动ReduceTask,所以底层不需要经过shuffle,但是需要占用内存空间来存放较小的数据文件。

在Hive中默认开启了map join,会尽量使用map join来实现join,即能够使用map join的join都使用map join。hive.auto.convert.join=true。在Hive中对于小表的大小限制如下:

1
2
3
4
-- 2.0版本之前的控制属性
hive.mapjoin.smalltable.filesize=25M
-- 2.0版本开始由以下参数控制
hive.auto.convert.join.noconditionaltask.size=512000000

reduce join的场景是没法使用map join的大表join大表情况。这时候由于内存中无法存放某张大表的所有数据,就没法使用map join。reduce join指的是在Reduce阶段完成join。两张表按照关联字段进行分组,然后通过shuffle过程进入到同一个Reduce Task完成join操作

可以看到上面的reduce join会经过shuffle阶段,因此效率并不是很高。Hive还提供bucket join对大表join大表的情况进行优化。这就是前面提到的分桶表。我们首先将两张表都按照相同的字段进行分桶,然后分桶值对应的数据之间进行join,减少了比较次数,提高了性能。

使用bucket join的前提是开启了对应的功能,同时要求分桶字段=join字段,并且两个表桶个数相等或者成倍数(这样才能根据hash规则得到的余数找到对应规则)

1
2
--开启bucket join
set hive.optimize.bucketmapjoin = true

bucket join又分成两个层次,第一个就是基于分桶的字段进行排序,即是上面提到的。另一个层次更进一步,基于有序的数据进行join,全称为Sort Merge Bucker Join(SMB)。这要求开启下面的配置,同时分桶字段=排序字段=join字段,桶的个数相等或者成倍数

1
2
3
4
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

优化器

当一个程序中如果有一些操作彼此之间有关联性,是可以在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个MapReduce来完成这两个操作。例如:当我们执行 select …… from table group by id order by id desc。该SQL语句转换为MapReduce时,我们可以有两种方案来实现:

  1. 第一个MapReduce做group by,经过shuffle阶段对id做分组;第二个MapReduce对第一个MapReduce的结果做order by,经过shuffle阶段对id进行排序
  2. 因为都是对id处理,可以使用一个MapReduce的shuffle既可以做分组也可以排序

显然第二种方式性能较高,但是Hive默认选择第一种方案实现。在Hive中可以开启关联优化,对有关联关系的操作进行解析的时候,可以尽量放在同一个MapReduce任务中实现,配置开启如下:

1
set hive.optimize.correlation=true;

Hive中的优化器引擎有RBO和CBO:

  • RBO,Rule Basic Optimizer:是基于规则的优化器,根据设定好的规则对程序进行优化
  • CBO,Cost Basic Optimizer:是基于代价的优化器,根据不同场景所需要付出的代价来合适选择优化的方案,对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案是最佳方案

Hive中默认使用RBO优化器引擎,但是我们也可以配置底层的优化器引擎为CBO引擎:

1
2
3
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;

CBO引擎通过Analyze分析器来辅助判断每种方案的计算代价。Analyze分析器用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,搭配CBO引擎一起使用

1
2
3
4
5
6
7
8
9
10
11
12
-- 构建分区信息元数据
analyze table tablename
[partition(partcol1[=val1], partcol2[=val2], ...)]
compute statistics [noscan];

-- 构建列的元数据
analyze table tablename
[partition(partcol1[=val1], partcol2[=val2], ...)]
compute statistics for columns ( columns name1, columns name2...) [noscan];

-- 查看元数据
desc formatted [tablename] [columnname];

谓词下推 PPD

谓词下推Predicate Pushdown(PPD)基本思想:将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。简单点说就是在不影响最终结果的情况下,尽量将过滤条件提前执行。Hive中谓词下推后,过滤条件会下推到map端,提前执行过滤,减少map到reduce的传输数据,提升整体性能。开启参数如下,默认是开启状态:

1
set hive.optimize.ppd = true

数据倾斜

分布式计算中最常见,最容易遇到的问题就是数据倾斜。数据倾斜指的是分布式程序中大多数的Task已经运行结束了,而某一个Task一直在运行,由于数据量过大而始终无法结束。导致数据倾斜的原因一般都是数据分配的问题。

group by就是一种很有可能导致数据倾斜的操作。如果数据本身是倾斜的,按照MapReduce中Hash分区规则之后,肯定会出现数据倾斜的问题。根本原因是因为分区规则导致的,所以可以通过以下几种方案来解决group by导致的数据倾斜的问题。

方案一是开启Map端聚合。这指的是通过减少shuffle数据量和Reduce计算的执行时间,避免每个Task数差异过大导致数据倾斜。

1
set hive.map.aggr=true;

方案二是实现随机分区

1
2
3
select * from table distribute by rand();
--distribute by用于指定底层按照哪个字段作为Key实现分区
--通过rank函数随机值实现随机分区,避免数据倾斜

方案三是数据倾斜之后进行自动负载均衡。开启这个参数之后,当前程序会自动通过两个MapReduce程序来完成。第一个MapReduce将数据自动随机分布到Reducer中,每个Reducer做部分聚合操作输出结果;第二个MapReduce将上一步聚合的结果再按照业务进行处理,保证相同的分布到一起,最终聚合得到结果

1
set hive.groupby.skewindata=true;

join操作是另外一个很容易导致数据倾斜的操作。join操作时,如果两张表比较大,无法实现map Join,只能走reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题;面对join产生的数据倾斜,核心的思想是尽量避免reduce Join的产生,优先使用map Join来实现;但往往很多的join场景不满足map Join的需求,那么可以以下几种方案来解决join产生的数据倾斜问题。

方案一是提前过滤。提前过滤即将大表数据变成小表数据,实现map join。

方案二是使用bucket join。前面反复提到通过分桶表可以优化join的执行效率,避免数据倾斜。

方案三是使用skew join。skew join是Hive中一种专门为了避免数据倾斜而设计的特殊的join过程。这种join的原理是将map join和reduce join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用map join来实现。其他没有产生数据倾斜的数据由reduce join来实现,这样就避免了reduce join中产生数据倾斜的问题。最终将map join的结果和reduce join的结果进行union合并。配置如下:

1
2
3
4
5
6
7
8
9
10
-- 开启运行过程中skewjoin
set hive.optimize.skewjoin=true;
-- 如果这个key的出现的次数超过这个范围
set hive.skewjoin.key=100000;
-- 在编译时判断是否会产生数据倾斜
set hive.optimize.skewjoin.compiletime=true;
-- 不合并,提升性能
set hive.optimize.union.remove=true;
-- 如果Hive的底层走的是MapReduce,必须开启这个属性,才能实现不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;

Fetch抓取

Hive会将我们写的SQL语句转换成MapReduce程序,但是在某些情况下,Hive的查询没有必要使用MapReduce程序。这就是Fectch抓取。类似于语句select * from table_name,只需要简单读取table_name对应存储目录下的文件,然后输出查询结果即可。不过只有一些简单的查询可以被优化,不能包括子查询、聚合查询、侧视图、join等复杂操作。

控制该行为的参数是hive.fetch.task.conversion。字段描述如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
<description>
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have
any aggregations or distincts (which incurs RS), lateral views and joins.
0. none : disable hive.fetch.task.conversion
1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
2. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
</description>
</property>
  • 若属性为null,则不使用fetch优化,任何语句都会转化成MapReduce程序
  • 若属性为minimal,则只有select *语句,在分区列上的过滤语句(包括havingwhere),和limit关键字可以被优化成fetch任务
  • 若属性为more,则简单的select语句(可以全表查询、特定列查询、或者函数表达式等,包括UDF函数,但是表生成函数UDTF还不支持)、过滤语句(wherehaving)、和limit关键字可以被优化成fetch任务

在Hive 0.10.0版本开始加入该属性,默认值为minmal;Hive 0.14.0之后,默认值为more


Hive学习笔记-Hive优化(1)-Hive性能优化
http://example.com/2022/07/05/Hive学习笔记-Hive优化-1-Hive性能优化/
作者
EverNorif
发布于
2022年7月5日
许可协议