Spark学习笔记-SparkSQL(2)-DataFrame相关API的使用

DataFrame

在SparkSQL中,DataFrame就类似于一张关系型数据库表。我们可以通过相关API来对DataFrame进行操作,就相当于是在数据库中对表进行操作。完整的操作API可以参考官方文档:ScalaDoc - DataSet。可以看到这里的文档实际上是属于DataSet的,DataFrame可以看作是DataSet的一个特例,即DataFrame = DataSet[Row]

数据准备

为了进行后续的说明,首先我们需要进行数据准备,即构造出一个带有数据的DataFrame。DataFrame可以从现有的表文件中生成,例如csv、parquet、json等格式的数据,也可以直接从RDD对象中生成。

如果是从现有的表文件中读取数据,会使用到read函数。记得在读取的时候可以使用.option()来指定相关的参数,例如读取表头,推断类型等。

这里我们直接从RDD对象中生成,数据准备的代码如下:

1
2
3
4
5
6
7
8
9
// 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

// 创建RDD, DataFrame和DataSet
val rdd = spark.sparkContext.makeRDD(
List((1, "A", 30), (2, "AB", 40), (3, "ABC", 50), (4, "DE", 20)))
var df: DataFrame = rdd.toDF("id", "name", "age")

我们可以将对应的结果进行展示,使用df.show()即可,得到如下的结果:

1
2
3
4
5
6
7
8
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| A| 30|
| 2| AB| 40|
| 3|ABC| 50|
| 4| DE| 20|
+---+---+---+

我们还可以打印这个表的schema,如下所示:

1
println(df.schema)

输出如下:

1
StructType(StructField(_1,IntegerType,false), StructField(_2,StringType,true), StructField(_3,IntegerType,false))

由于我们的DataFrame是直接从RDD中生成的,并没有指定列名,因此这里使用了默认的列名_1,_2,_3


这里有一个小tips:在默认情况下,我们使用IDEA进行本地Spark项目运行的时候,在控制台中会显示出很多日志信息,这是因为默认的日志配置中会输出所有INFO级别以上的日志。这些日志能够帮助我们更好地了解程序运行的情况,进行错误的排查,但是众多的日志输出也会干扰我们对实际结果的观察。

为了不打印这些日志,我们可以找到默认使用的log4j.properties。默认使用的配置文件位置在org/apache/spark/log4j-defaults.properties,我们也可以在Spark对应的安装目录中找到模板文件SPARK_HOME/conf/log4j.properties.template。将日志文件放在resources目录下,并将其中第一行有效配置log4j.rootCategory=INFO, console修改为log4j.rootCategory=ERROR, console,这样就只会打印ERROR级别以上的日志了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# For deploying Spark ThriftServer
# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false

数据查询

最基本的数据展示就是使用show()方法,它有多种重载形式。

  • show():默认只显示前20条记录
  • show(numRows: Int):传入的参数numRows表示需要显示前多少条记录
  • show(truncate: Boolean):有时候一个字段的内容可能很长,超过了20个字符。truncate表示是否最多只显示20个字符,默认值为true
  • show(numRows: Int, truncate: Boolean):前面两者的综合效果

还有其他相关的方法可以帮助我们进行部分数据的查询,获取若干行数据。这些数据会以Row或者Array[Row]的形式返回

  • first:获取第一行数据
  • head:获取第一行数据
  • head(n: Int):获取前n行数据
  • take(n: Int):获取前n行数据
  • takeAsList(n: Int):获取前n行数据,并以List的形式展现出来

limit(n: Int)方法也可以获取DataFrame的前n行记录,不过这是得到一个新的DataFrame对象。与take和head不同,limit方法并不是action操作,不会触发Job的执行。

describe方法可以帮助我们获取各个字段的统计信息,该方法接受一个或者若干个表示列名的字符串,如果不传参数,则默认处理所有的字段。结果仍然是一个DataFrame对象。统计信息包括count、mean、stddev、min、max等。

1
df.describe().show()

结果如下:

1
2
3
4
5
6
7
8
9
+-------+------------------+----+------------------+
|summary| _1| _2| _3|
+-------+------------------+----+------------------+
| count| 4| 4| 4|
| mean| 2.5|null| 35.0|
| stddev|1.2909944487358056|null|12.909944487358056|
| min| 1| A| 20|
| max| 4| DE| 50|
+-------+------------------+----+------------------+

在SQL中,最为基本的查询操作就是Select操作,在SparkSQL中也提供相关API。包括selectselectExpr。在select可以传入字符串作为需要查询的列名,也可以直接传入Column参数,此时可以进行一些特殊处理。在selectExpr中,则可以直接传入一些查询字符串,进行操作或者指定别名等,更加灵活:

1
2
3
4
5
6
7
8
// 字符串指定列名
df.select("_1", "_2").show()

// 获取Column列
df.select(df("_1"), df("_3") + 1).show()

// 使用字符串操作表达式
df.selectExpr("_1 as id", "_2 as name", "round(_3) as age").show()

还有一些方法可以获取指定的列:

  • col(colName: String):获取某一个指定的字段
  • apply(colName: String):获取某一个指定的字段,同时由于是apply方法,因此可以直接使用括号进行调用

数据处理

对于DataFrame中列的处理,除了上面的查询之外,还可以进行增加和删除。列的删除可以使用drop方法,与上面类似,它可以接受String类型的参数,也可以接受Column的参数。drop返回一个新的DataFrame对象,去除指定字段,保留其他字段。drop一次只能去除一个字段。

1
2
df.drop("_1")
df.drop(df("_2"))

在DataFrame中增加一列,使用的是withColumn方法。该方法签名为withColumn(colName: String, col: Column)。该方法可以向DataFrame中新增一列,如果colName已经存在,则会覆盖对应的列。

1
2
3
// 新增一列名称为new,值为随机值
// 需要注意引入的package路径:import org.apache.spark.sql.functions.rand
df.withColumn("new", rand())

还有一个与它名称类似的方法,为withColumnRenamed。该方法用于字段的重命名,如果指定的字段名不存在,则不进行任何操作。

1
df.withColumnRenamed("_3", "age")

关于DataFrame的数据去重,与SQL中类似,可以使用distinct方法。它默认会对全表进行去重,比较的条件是所有的字段。另外一种方式是根据指定字段进行去重,对应方法为dropDuplicates。该方法可以接收多个字符串,也可以接受字符串列表,其中可以传入需要指定去重的字段。

1
2
3
4
5
6
7
// distinct
df.distinct()

// dropDuplicates
df.dropDuplicates("_1", "_2")
df.dropDuplicates(Seq("_1", "_2"))
df.dropDuplicates(Array("_1", "_2"))

DataFrame的数据排序相关的方法为sortorderBy。这两个方法可以按照指定字段进行排序,默认为升序。如果需要降序,则只需要增加一个-即可。

1
2
3
4
5
6
7
// orderBy
df.orderBy("_3").show()
df.orderBy(- df("_3")).show()

// sort 与 orderBy 用法一致
df.sort("_3").show()
df.sort(- df("_3")).show()

union则表示两个DataFrame的简单拼接,与SQL中的union all的效果一致。同时还提供一个unionAllAPI,与union是等价的,unionALL内部就是直接调用了union

1
2
df.union(df)
df.unionAll(df)

collectcollectAsList可以获取所有数据到数组中,需要注意的就是它们的返回值不同。前者返回的是Scala中的Array,而后者返回的是Java中的List。

1
2
val rows1:Array[Row] = df.collect()
val rows2:util.List[Row]= df.collectAsList()

聚合计算

在DataFrame中可以使用聚合函数来进行字段的聚合计算,例如计算某个字段的最值。在聚合函数中指定需要计算的最值。当然除了最值之外,还可以使用其他的相关聚合函数。

1
2
3
4
5
// 最值计算
// package: import org.apache.spark.sql.functions.{min, max}
val row: Row = df.agg(min("_3"), max("_3")).head()
val minValue = row.getInt(0)
val maxValue = row.getInt(1)

agg本身返回的是一个DataFrame,只不过这个DataFrame只有一行。

1
df.agg("_1" -> "mean", "_2" -> "max", "_3" -> "sum").show()

上面操作会打印出下面的内容:

1
2
3
4
5
+-------+-------+-------+
|avg(_1)|max(_2)|sum(_3)|
+-------+-------+-------+
| 2.5| DE| 140|
+-------+-------+-------+

聚合函数也通常与groupBy进行连用,与SQL的表达类似,这里主要关注生成的数据格式(为了测试需要,这里临时在原始DataFrame的数据中将第一行数据重复了三遍)

1
df.groupBy("_2").count().show()

输出如下:

1
2
3
4
5
6
7
| _2|count|
+---+-----+
| DE| 1|
| A| 3|
| AB| 1|
|ABC| 1|
+---+-----+

条件过滤

在DataFrame中,可以使用filter或者where来进行条件过滤,过滤掉相关的行。过滤之后仍然返回一个DataFrame。

1
2
3
4
5
6
7
// filter
df.filter(df.col("_3").geq(25) && df.col("_3").leq(45)).show()
df.filter("_3 >= 25 and _3 <= 45").show()

// where
df.where(df.col("_3").geq(25) && df.col("_3").leq(45)).show()
df.where("_3 >= 25 and _3 <= 45").show()

得到如下结果:

1
2
3
4
5
6
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1| A| 30|
| 2| AB| 40|
+---+---+---+

连接处理

在SQL语言中,非常常用的操作就是join操作,在DataFrame中同样提供,下面主要介绍join API的使用方式。为了演示操作,这里创建了一个新的DataFrame

1
2
3
4
val rdd2 = spark.sparkContext.makeRDD(
List((1, "A"), (2, "AB"))
)
val df2: DataFrame = rdd2.toDF()

join的相关使用如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 笛卡尔积
df.join(df2).show()

// using某一个字段
df.join(df2, "_1").show()

// using多个字段
df.join(df2, Seq("_1", "_2")).show()

// 更加灵活的条件指定
df.join(df2, df("_1") === df("_2")).show()
df.join(df2, df("_1") =!= df("_2")).show()

// 指定join类型
df.join(df2, Seq("_1"), "right").show()
df.join(df2, df("_1") === df2("_1"), "right").show()

具体的结果这里就不再进行展示了,都是非常符合直觉的结果。其中在灵活的条件指定中,我们使用的是Column之间的比较。Column提供===来表示相等,对应的=!=表示不相等。在指定join类型的时候,表示join类型的字符串只能是第三个参数,同时必须是下面这些值之一:inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti, left_anti。在需要使用join类型的时候,一定要认真观察它提供给我们的方法签名。

参考文章

  1. Spark-SQL之DataFrame操作大全
  2. Spark-ScalaDoc-Dataset

Spark学习笔记-SparkSQL(2)-DataFrame相关API的使用
http://example.com/2023/03/19/Spark学习笔记-SparkSQL-2-DataFrame相关API的使用/
作者
EverNorif
发布于
2023年3月19日
许可协议