Hive学习笔记-Hive优化(1)-Hive性能优化
Hive分区表,分桶表和索引
Hive的设计思想是通过元数据解析描述后,将HDFS上的文件映射成表。当用户通过HQL语句对Hive中的表进行复杂数据处理和计算时,默认将其转换成分布式计算MapReduce程序,然后读取HDFS中的数据。
在执行查询计划的时候,Hive会使用表的最后一级目录作为底层处理数据的输入。如果没有特殊处理,最后一级目录就是表的目录,里面存放了表中的所有数据,因此这个时候就是全表扫描。通过explain
命令可以查看执行计划,其中可以看到实际读取的HDFS文件路径。
分区表会将不同分区的数据存放在单独的HDFS目录中,这样在查询的时候,根据查询条件,就可以只读取对应分区的数据作为输入,减少不必要的数据加载,提高程序的性能。
分桶表则可以解决Hive中join的问题。这点会在后续join问题部分进行深入介绍。
在Hive中还存在索引的设计。Hive允许用户为字段构建索引,提高数据的查询速率。当为某张表的某个字段创建索引时,Hive中会自动创建一张索引表,该表中记录了该字段的每个值与数据实际物理位置之间的关系,例如数据所在的HDFS文件地址,所在文件中的偏移量offset等信息。
但是索引功能的支持从Hive0.7版本开始支持,但是到Hive3.0开始不再支持。因为Hive中构建和维护索引的操作过于复杂,首先Hive构建索引的过程是通过一个MapReduce程序来实现的,并且每次Hive中原始数据表的数据发生更新的时候,索引表不会自动更新,而必须手动执行一个alter index命令来通过MapReduce再次更新索引表,导致整体性能较差,维护相对繁琐。
Hive表数据优化
文件格式
Hive数据的存储底层还是HDFS,所有的数据读写都基于HDFS文件来实现。为了提高对HDFS文件读写的性能,Hive中提供了多种文件存储格式,包括TextFile、SequenceFile、ORC、Parquet等,在建表的时候进行指定。
TextFile是Hive中默认的文件格式,存储形式为按行存储。T
- 优点:最简单的数据格式,可以直接查看,可以使用任意分隔符进行分割
- 缺点:耗费存储空间,IO性能较低。结合压缩的时候Hive不进行数据切分合并,不能进行并行操作,查询效率低。按行存储,读取列的性能差
SequenceFile是Hadoop中用来存储序列化键值对的一种文件格式,可以作为MapReduce作业的输入输出,因此Hive也支持这种格式
- 优点:以二进制KV形式存储数据,与底层交互更加友好,性能更快。可压缩、可分割、优化磁盘利用率和I/O,可并行操作数据,查询效率高
- 缺点:存储空间消耗最大,与Hadoop生态系统之外的工具不兼容
Parquet是一种支持嵌套结果的列式存储文件格式,作为大数据系统中OLAP查询的优化方案,它已经被多种查询引擎原生支持,并且部分高性能引擎将其作为默认的文件存储格式
- 优点:更高效的压缩和编码,可压缩、可分割、优化磁盘利用率和IO,可用于多种数据处理框架
- 缺点:不支持update、insert、delete、ACID
ORC(OptimizedRC File)也是一种Hadoop生态圈中的列式存储格式。最初产生自Hive,用于降低Hadoop数据存储空间和加速Hive查询速度。
- 优点:列式存储,存储效率非常高。可压缩,高效的列存取。查询效率高,支持索引,支持矢量化查询
- 缺点:加载时性能消耗较大,读取全量数据时性能较差
需要注意的是,以上的文件格式除了TextFile,都无法直接通过数据导入形成。因为load命令只会完成数据的纯移动或者复制,而不会修改数据的文件格式。正确的方式应该是在创建表的时候指定文件格式,然后利用insert+select从原始表中查询后插入指定了文件格式的表中。
数据压缩
Hive压缩实际上说的就是MapReduce的压缩,可以指定不同阶段的压缩算法。Hadoop中支持的压缩在Hive中都可以直接使用。
在Hive中使用压缩,需要对MapReduce和Hive进行相应的配置
1 |
|
存储优化
存储优化的第一个问题是小文件的问题。
Hive的存储本质是HDFS,而HDFS并不利于小文件的存储。在使用Hive进行处理分析的时候,要尽量避免小文件的生成。Hive中提供了一个特殊的机制,可以自动判断是否是小文件,如果是小文件可以自动将小文件进行合并
1 |
|
上面解决的情况是输出为小文件的情况,而Hive中也提供一种输入类CombineHiveInputFormat
用于解决输入是小文件的情况,它将小文件进行合并之后再进行处理
1 |
|
存储优化的第二个问题是ORC的文件索引。
在使用ORC文件时,为了加快读取ORC文件中的数据内容,ORC提供了两种索引机制:Row Group Index 和 Bloom Filter Index,可以帮助提高查询ORC文件的性能。当用户写入数据时,可以指定构建索引,当用户查询数据时,可以根据索引提前对数据进行过滤,避免不必要的数据扫描。
一个ORC文件会包含一个或者多个stripes,可以理解为原始数据的一部分。每个stripe中,包含了其中数据的每个column的min/max。当查询中有大小判断的操作时,可以根据最大最小值,跳过一定不包含结果的stripe。这里为每个stripe建立的包含min/max值的索引,就称为Row
Group
Index。建立ORC格式表的时候,指定表参数orc.create.index=true
之后就会建立Row
Group
Index。而为了使得该索引有效利用,向表中加载数据的时候,必须对需要使用索引的字段进行排序
1 |
|
Bloom Filter
Index即为字段建立布隆过滤器的数据结构。当查询条件中包含对该字段的等值过滤时,可以通过布隆过滤器判断该stripe中是否存在,如果返回不存在则直接跳过该stripe。建表的时候通过指定表参数orc.bloom.filter.columns=columnName...
来指定为哪些字段建立布隆过滤器索引
1 |
|
Hive的默认查询执行引擎一次处理一行,而矢量化查询执行是一种Hive针对ORC文件操作的特性,目的是按照每批1024行读取数据,并且一次性对整个记录整合(而不是对单条记录)应用操作,提升了像过滤, 联合, 聚合等等操作的性能。注意:要使用矢量化查询执行,就必须以ORC格式存储数据。
1 |
|
Hive作业执行优化
explain查询计划
explain会解析HQL语句,将整个HQL语句的实现步骤、依赖关系、实现过程都会进行解析返回,可以了解一条HQL语句在底层是如何实现数据的查询及处理的过程,辅助用户对Hive进行优化。
常用语法命令如下:
1 |
|
- formatted:对执行计划进行格式化展示
- extended:提供更详细的信息
- dependency:以json格式返回查询所依赖的表和分区列表
- authorization:列出需要被授权的条目,包括输入和输出
Hive中每个查询计划由以下几个部分组成:
- 抽象语法树(AST):Hive使用Antlr解析生成器,可以自动地将HQL生成为抽象语法树
- Stage依赖关系:会列出运行查询划分的stage阶段以及之间的依赖关系
- Stage内容:包含了每个stage非常重要的信息,比如运行时的operator和sort orders等具体的信息
MapReduce属性优化
MapReduce属性优化包括多个方面,如本地模式,JVM重用和并行执行等。
本地模式指的是直接在本地计算,允许程序不提交给Yarn。这是指在Hive的过程中,有一些数据量不大的表也会转换成MapReduce进行处理。如果提交到集群,需要申请资源,等待分配,最后再运行的一系列流程,比较繁琐。而本身数据量不大,导致整体效率较低。而本地计算模式,允许程序不提交给Yarn,直接在本地运行以便提高小数据量程序的性能。
1 |
|
Hadoop默认会为每个Task启动一个JVM运行,而在JVM启动的时候内存开销较大。Job数据量大的情况,如果单个Task数据量比较小,也会申请JVM,这就导致了资源紧张及浪费的情况;JVM重用可以使得JVM实例在同一个job中重新使用N次,当一个Task运行结束以后,JVM不会进行释放,而是继续供下一个Task运行,直到运行了N个Task以后,就会释放;N的值可以在Hadoop的mapred-site.xml文件中进行配置,通常在10-20之间。
1 |
|
Hive在实现HQL计算运行时,会解析为多个Stage,有时候Stage彼此之间有依赖关系,只能挨个执行,但是在一些别的场景下,很多的Stage之间是没有依赖关系的,例如Union语句,Join语句等等,这些Stage没有依赖关系,但是Hive依旧默认挨个执行每个Stage,这样会导致性能非常差,我们可以通过修改参数,开启并行执行,当多个Stage之间没有依赖关系时,允许多个Stage并行执行,提高性能。
1 |
|
join优化
在Hive中,join的底层通过MapReduce来实现。而为了提高MapReduce的性能,Hive中提供了多种join的方案,例如适合小表join大表的map join;大表join大表的reduce join;大表join的优化方案bucket join等
map join适合的场景是小表join大表或者小表join小表。map join会在每个MapTask的内存中都存放一份完整的小表数据,而大表数据通过多个MapTask并行处理,大表的每个部分都可以与小表的完整数据进行join。由于这种方式不需要启动ReduceTask,所以底层不需要经过shuffle,但是需要占用内存空间来存放较小的数据文件。
在Hive中默认开启了map join,会尽量使用map
join来实现join,即能够使用map join的join都使用map
join。hive.auto.convert.join=true
。在Hive中对于小表的大小限制如下:
1 |
|
reduce join的场景是没法使用map join的大表join大表情况。这时候由于内存中无法存放某张大表的所有数据,就没法使用map join。reduce join指的是在Reduce阶段完成join。两张表按照关联字段进行分组,然后通过shuffle过程进入到同一个Reduce Task完成join操作
可以看到上面的reduce join会经过shuffle阶段,因此效率并不是很高。Hive还提供bucket join对大表join大表的情况进行优化。这就是前面提到的分桶表。我们首先将两张表都按照相同的字段进行分桶,然后分桶值对应的数据之间进行join,减少了比较次数,提高了性能。
使用bucket join的前提是开启了对应的功能,同时要求分桶字段=join字段,并且两个表桶个数相等或者成倍数(这样才能根据hash规则得到的余数找到对应规则)
1 |
|
bucket join又分成两个层次,第一个就是基于分桶的字段进行排序,即是上面提到的。另一个层次更进一步,基于有序的数据进行join,全称为Sort Merge Bucker Join(SMB)。这要求开启下面的配置,同时分桶字段=排序字段=join字段,桶的个数相等或者成倍数
1 |
|
优化器
当一个程序中如果有一些操作彼此之间有关联性,是可以在一个MapReduce中实现的,但是Hive会不智能的选择,Hive会使用两个MapReduce来完成这两个操作。例如:当我们执行
select …… from table group by id order by id desc
。该SQL语句转换为MapReduce时,我们可以有两种方案来实现:
- 第一个MapReduce做group by,经过shuffle阶段对id做分组;第二个MapReduce对第一个MapReduce的结果做order by,经过shuffle阶段对id进行排序
- 因为都是对id处理,可以使用一个MapReduce的shuffle既可以做分组也可以排序
显然第二种方式性能较高,但是Hive默认选择第一种方案实现。在Hive中可以开启关联优化,对有关联关系的操作进行解析的时候,可以尽量放在同一个MapReduce任务中实现,配置开启如下:
1 |
|
Hive中的优化器引擎有RBO和CBO:
- RBO,Rule Basic Optimizer:是基于规则的优化器,根据设定好的规则对程序进行优化
- CBO,Cost Basic Optimizer:是基于代价的优化器,根据不同场景所需要付出的代价来合适选择优化的方案,对数据的分布的信息【数值出现的次数,条数,分布】来综合判断用哪种处理的方案是最佳方案
Hive中默认使用RBO优化器引擎,但是我们也可以配置底层的优化器引擎为CBO引擎:
1 |
|
CBO引擎通过Analyze分析器来辅助判断每种方案的计算代价。Analyze分析器用于提前运行一个MapReduce程序将表或者分区的信息构建一些元数据【表的信息、分区信息、列的信息】,搭配CBO引擎一起使用
1 |
|
谓词下推 PPD
谓词下推Predicate Pushdown(PPD)基本思想:将过滤表达式尽可能移动至靠近数据源的位置,以使真正执行时能直接跳过无关的数据。简单点说就是在不影响最终结果的情况下,尽量将过滤条件提前执行。Hive中谓词下推后,过滤条件会下推到map端,提前执行过滤,减少map到reduce的传输数据,提升整体性能。开启参数如下,默认是开启状态:
1 |
|
数据倾斜
分布式计算中最常见,最容易遇到的问题就是数据倾斜。数据倾斜指的是分布式程序中大多数的Task已经运行结束了,而某一个Task一直在运行,由于数据量过大而始终无法结束。导致数据倾斜的原因一般都是数据分配的问题。
group by就是一种很有可能导致数据倾斜的操作。如果数据本身是倾斜的,按照MapReduce中Hash分区规则之后,肯定会出现数据倾斜的问题。根本原因是因为分区规则导致的,所以可以通过以下几种方案来解决group by导致的数据倾斜的问题。
方案一是开启Map端聚合。这指的是通过减少shuffle数据量和Reduce计算的执行时间,避免每个Task数差异过大导致数据倾斜。
1 |
|
方案二是实现随机分区。
1 |
|
方案三是数据倾斜之后进行自动负载均衡。开启这个参数之后,当前程序会自动通过两个MapReduce程序来完成。第一个MapReduce将数据自动随机分布到Reducer中,每个Reducer做部分聚合操作输出结果;第二个MapReduce将上一步聚合的结果再按照业务进行处理,保证相同的分布到一起,最终聚合得到结果
1 |
|
join操作是另外一个很容易导致数据倾斜的操作。join操作时,如果两张表比较大,无法实现map Join,只能走reduce Join,那么当关联字段中某一种值过多的时候依旧会导致数据倾斜的问题;面对join产生的数据倾斜,核心的思想是尽量避免reduce Join的产生,优先使用map Join来实现;但往往很多的join场景不满足map Join的需求,那么可以以下几种方案来解决join产生的数据倾斜问题。
方案一是提前过滤。提前过滤即将大表数据变成小表数据,实现map join。
方案二是使用bucket join。前面反复提到通过分桶表可以优化join的执行效率,避免数据倾斜。
方案三是使用skew join。skew join是Hive中一种专门为了避免数据倾斜而设计的特殊的join过程。这种join的原理是将map join和reduce join进行合并,如果某个值出现了数据倾斜,就会将产生数据倾斜的数据单独使用map join来实现。其他没有产生数据倾斜的数据由reduce join来实现,这样就避免了reduce join中产生数据倾斜的问题。最终将map join的结果和reduce join的结果进行union合并。配置如下:
1 |
|
Fetch抓取
Hive会将我们写的SQL语句转换成MapReduce程序,但是在某些情况下,Hive的查询没有必要使用MapReduce程序。这就是Fectch抓取。类似于语句select * from table_name
,只需要简单读取table_name
对应存储目录下的文件,然后输出查询结果即可。不过只有一些简单的查询可以被优化,不能包括子查询、聚合查询、侧视图、join等复杂操作。
控制该行为的参数是hive.fetch.task.conversion
。字段描述如下:
1 |
|
- 若属性为null,则不使用fetch优化,任何语句都会转化成MapReduce程序
- 若属性为minimal,则只有
select *
语句,在分区列上的过滤语句(包括having
和where
),和limit
关键字可以被优化成fetch任务 - 若属性为more,则简单的
select
语句(可以全表查询、特定列查询、或者函数表达式等,包括UDF函数,但是表生成函数UDTF还不支持)、过滤语句(where
或having
)、和limit
关键字可以被优化成fetch任务
在Hive 0.10.0版本开始加入该属性,默认值为minmal;Hive 0.14.0之后,默认值为more