Spark学习笔记-SparkSQL(2)-DataFrame相关API的使用
DataFrame
在SparkSQL中,DataFrame就类似于一张关系型数据库表。我们可以通过相关API来对DataFrame进行操作,就相当于是在数据库中对表进行操作。完整的操作API可以参考官方文档:ScalaDoc
-
DataSet。可以看到这里的文档实际上是属于DataSet的,DataFrame可以看作是DataSet的一个特例,即DataFrame = DataSet[Row]
。
数据准备
为了进行后续的说明,首先我们需要进行数据准备,即构造出一个带有数据的DataFrame。DataFrame可以从现有的表文件中生成,例如csv、parquet、json等格式的数据,也可以直接从RDD对象中生成。
如果是从现有的表文件中读取数据,会使用到
read
函数。记得在读取的时候可以使用.option()
来指定相关的参数,例如读取表头,推断类型等。
这里我们直接从RDD对象中生成,数据准备的代码如下:
1 |
|
我们可以将对应的结果进行展示,使用df.show()
即可,得到如下的结果:
1 |
|
我们还可以打印这个表的schema,如下所示:
1 |
|
输出如下:
1 |
|
由于我们的DataFrame是直接从RDD中生成的,并没有指定列名,因此这里使用了默认的列名_1,_2,_3
。
这里有一个小tips:在默认情况下,我们使用IDEA进行本地Spark项目运行的时候,在控制台中会显示出很多日志信息,这是因为默认的日志配置中会输出所有INFO级别以上的日志。这些日志能够帮助我们更好地了解程序运行的情况,进行错误的排查,但是众多的日志输出也会干扰我们对实际结果的观察。
为了不打印这些日志,我们可以找到默认使用的log4j.properties
。默认使用的配置文件位置在org/apache/spark/log4j-defaults.properties
,我们也可以在Spark对应的安装目录中找到模板文件SPARK_HOME/conf/log4j.properties.template
。将日志文件放在resources目录下,并将其中第一行有效配置log4j.rootCategory=INFO, console
修改为log4j.rootCategory=ERROR, console
,这样就只会打印ERROR级别以上的日志了。
1 |
|
数据查询
最基本的数据展示就是使用show()
方法,它有多种重载形式。
show()
:默认只显示前20条记录show(numRows: Int)
:传入的参数numRows表示需要显示前多少条记录show(truncate: Boolean)
:有时候一个字段的内容可能很长,超过了20个字符。truncate表示是否最多只显示20个字符,默认值为trueshow(numRows: Int, truncate: Boolean)
:前面两者的综合效果
还有其他相关的方法可以帮助我们进行部分数据的查询,获取若干行数据。这些数据会以Row
或者Array[Row]
的形式返回
first
:获取第一行数据head
:获取第一行数据head(n: Int)
:获取前n行数据take(n: Int)
:获取前n行数据takeAsList(n: Int)
:获取前n行数据,并以List的形式展现出来
limit(n: Int)
方法也可以获取DataFrame的前n行记录,不过这是得到一个新的DataFrame对象。与take和head不同,limit方法并不是action操作,不会触发Job的执行。
describe
方法可以帮助我们获取各个字段的统计信息,该方法接受一个或者若干个表示列名的字符串,如果不传参数,则默认处理所有的字段。结果仍然是一个DataFrame对象。统计信息包括count、mean、stddev、min、max等。
1 |
|
结果如下:
1 |
|
在SQL中,最为基本的查询操作就是Select操作,在SparkSQL中也提供相关API。包括select
与selectExpr
。在select可以传入字符串作为需要查询的列名,也可以直接传入Column参数,此时可以进行一些特殊处理。在selectExpr中,则可以直接传入一些查询字符串,进行操作或者指定别名等,更加灵活:
1 |
|
还有一些方法可以获取指定的列:
col(colName: String)
:获取某一个指定的字段apply(colName: String)
:获取某一个指定的字段,同时由于是apply方法,因此可以直接使用括号进行调用
数据处理
对于DataFrame中列的处理,除了上面的查询之外,还可以进行增加和删除。列的删除可以使用drop方法,与上面类似,它可以接受String类型的参数,也可以接受Column的参数。drop返回一个新的DataFrame对象,去除指定字段,保留其他字段。drop一次只能去除一个字段。
1 |
|
在DataFrame中增加一列,使用的是withColumn
方法。该方法签名为withColumn(colName: String, col: Column)
。该方法可以向DataFrame中新增一列,如果colName已经存在,则会覆盖对应的列。
1 |
|
还有一个与它名称类似的方法,为withColumnRenamed
。该方法用于字段的重命名,如果指定的字段名不存在,则不进行任何操作。
1 |
|
关于DataFrame的数据去重,与SQL中类似,可以使用distinct
方法。它默认会对全表进行去重,比较的条件是所有的字段。另外一种方式是根据指定字段进行去重,对应方法为dropDuplicates
。该方法可以接收多个字符串,也可以接受字符串列表,其中可以传入需要指定去重的字段。
1 |
|
DataFrame的数据排序相关的方法为sort
和orderBy
。这两个方法可以按照指定字段进行排序,默认为升序。如果需要降序,则只需要增加一个-
即可。
1 |
|
union
则表示两个DataFrame的简单拼接,与SQL中的union
all的效果一致。同时还提供一个unionAll
API,与union
是等价的,unionALL
内部就是直接调用了union
。
1 |
|
collect
和collectAsList
可以获取所有数据到数组中,需要注意的就是它们的返回值不同。前者返回的是Scala中的Array,而后者返回的是Java中的List。
1 |
|
聚合计算
在DataFrame中可以使用聚合函数来进行字段的聚合计算,例如计算某个字段的最值。在聚合函数中指定需要计算的最值。当然除了最值之外,还可以使用其他的相关聚合函数。
1 |
|
agg
本身返回的是一个DataFrame,只不过这个DataFrame只有一行。
1 |
|
上面操作会打印出下面的内容:
1 |
|
聚合函数也通常与groupBy进行连用,与SQL的表达类似,这里主要关注生成的数据格式(为了测试需要,这里临时在原始DataFrame的数据中将第一行数据重复了三遍)
1 |
|
输出如下:
1 |
|
条件过滤
在DataFrame中,可以使用filter或者where来进行条件过滤,过滤掉相关的行。过滤之后仍然返回一个DataFrame。
1 |
|
得到如下结果:
1 |
|
连接处理
在SQL语言中,非常常用的操作就是join操作,在DataFrame中同样提供,下面主要介绍join API的使用方式。为了演示操作,这里创建了一个新的DataFrame
1 |
|
join的相关使用如下:
1 |
|
具体的结果这里就不再进行展示了,都是非常符合直觉的结果。其中在灵活的条件指定中,我们使用的是Column之间的比较。Column提供===
来表示相等,对应的=!=
表示不相等。在指定join类型的时候,表示join类型的字符串只能是第三个参数,同时必须是下面这些值之一:inner,
cross, outer, full, fullouter, full_outer, left, leftouter, left_outer,
right, rightouter, right_outer, semi, leftsemi, left_semi, anti,
leftanti,
left_anti。在需要使用join类型的时候,一定要认真观察它提供给我们的方法签名。