Spark学习笔记-SparkSQL(1)-概述以及编程入门

SparkSQL概述

SparkSQL是Spark用于结构化数据处理的一个模块。SparkSQL可以简化RDD的开发,提高开发效率,并且执行效率高。我们可以在SparkSQL中使用SQL语句来完成查询,同时也可以使用SparkSQL提供的接口来完成开发。

Spark具有如下特点:

  1. 易整合:无缝地整合了SQL查询和Spark编程,我们可以很容易在Spark编程中书写SQL语句
  2. 统一的数据访问:我们可以使用相同的方式连接不同的数据源
  3. 兼容Hive:可以在已有的数据仓库上直接运行SQL或者HiveSQL
  4. 标准数据连接:允许通过JDBC或者ODBC来进行连接

SparkSQL中为我们提供了两个编程抽象,DataFrame和DataSet

DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表。DataFrame与RDD的主要区别在于,RDD中仅包括数据,而DataFrame中还包括数据schema的信息,即每一列的名称和类型等,即DataFrame携带更多的结构信息,我们可以将它当作数据库中的一张表来对待。

DataSet是在Spark1.6中添加的一个新抽象,是DataFrame的一个扩展。在RDD中我们关注数据的类型,在DataFrame中我们关注每一列的数据和类型,DataSet则融合了两者的优点,即既有每行数据的类型,又有每列数据的类型。

  • 使用样例类来定义DataSet中数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称
  • DataSet是强类型的,例如DataSet[Car], DataSet[Person]等,可以理解为表中的每一行都是一个数据类型
  • DataFrame是DataSet的一个特例,每一行的数据类型为Row,即DataFrame=DataSet[Row]。Row是一个类型,获取数据的时候需要指定顺序。

简单演示

我们可以在Spark的交互式命令行窗口中简单演示SparkSQL的使用。在前面我们执行Spark应用程序,需要首先构建上下文对象SparkContext。SparkSQL可以理解为对Spark Core的一种封装,不仅在模型上进行了封装,而且对上下文环境对象也进行了封装。我们在查询前,需要构建查询起点SparkSession,在命令行中,已经存在有该参数,名称为spark。

创建DataFrame

创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行创建;从Hive进行查询返回。这里我们使用Spark从数据源进行创建,数据源是一个json文件,其中的每一行是一个json表达式。

1
2
scala> val df = spark.read.json("data.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

展示效果如下:

1
2
3
4
5
6
7
8
scala> df.show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+

注意:如果是从内存中获取数据,Spark可以知道具体的数据类型是什么。如果是数字,则默认作为Int处理。如果是从文件中读取的数字,不能确定是什么类型,则使用bigint接收,可以与Long类型进行转换

使用SQL进行查询

我们可以在SparkSQL中直接使用SQL进行查询,但是这种风格的查询必须要有视图来进行辅助。视图分为全局视图和临时视图。

临时视图创建:

1
scala> df.createOrReplaceTempView("people")

使用SQL进行查询:

1
2
3
4
5
6
7
8
9
10
11
scala> val sqlDF = spark.sql("select * from people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

scala> sqlDF.show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+

全局视图创建:

普通临时表是Session范围内的,如果想要在应用范围内有效,应该使用全局的临时表。在访问的时候,也应该使用全路径进行访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scala> df.createGlobalTempView("people1")

scala> spark.sql("select * from global_temp.people1").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+

scala> spark.newSession().sql("select * from global_temp.people1").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+

使用DSL进行查询

DataFrame中还提供一个领域特定语言(Domain-Specific Language,DSL)来管理结构化的数据,可以在Scala、Java、Python和R中使用DSL。使用DSL则没有必要创建临时视图了。

查看DataFrame的schema信息:

1
2
3
4
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- username: string (nullable = true)

查看某一列:

1
2
3
4
5
6
7
8
scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
| lisi|
| wangwu|
+--------+

对列进行运算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala> df.select($"username", $"age" + 1).show()
+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan| 21|
| lisi| 31|
| wangwu| 41|
+--------+---------+

scala> df.select('username, 'age + 2).show()
+--------+---------+
|username|(age + 2)|
+--------+---------+
|zhangsan| 22|
| lisi| 32|
| wangwu| 42|
+--------+---------+

注意:涉及到对列进行运算的时候,每一列都必须使用$,或者采用引号表达式:单引号+字段

还有其他更多的操作,如filter,groupby等。

创建DataSet

DataSet的创建可以使用样例类序列创建,也可以使用基本类型序列创建

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("zhangsan", 300), Person("lisi", 400)).toDS
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> caseClassDS.show
+--------+---+
| name|age|
+--------+---+
|zhangsan|300|
| lisi|400|
+--------+---+
1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val seqDS = Seq(1, 2, 3, 4, 5).toDS
seqDS: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> seqDS.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+

三种抽象之间的关系

到目前为止,Spark中为我们提供了三种抽象,分别是RDD,DataFrame,DataSet。它们之间有相似之处,也有不同点。

相同点:

  1. 都是Spark平台下的分布式弹性数据集,为处理大型数据提供便利
  2. 都具有惰性机制,直到行动算子才会触发执行
  3. 会根据Spark的内存情况进行自动缓存
  4. 都有分区的概念
  5. DataFrame和DataSet可以使用模式匹配来获取各个字段的值和类型

不同点:

  1. RDD一般和Spark MLlib同时使用,不支持SparkSQL操作
  2. DataFrame每一行的类型固定为Row,支持SparkSQL操作
  3. DataSet和DataFrame具有完全相同的成员函数,区别在于每一行的数据类型不同

它们分别是不同程度的抽象,可以互相转换。

如果需要它们之间的相互操作,需要引入import spark.implicits._。这里的spark不是Scala的包名,而是创建的SparkSession对象的变量名称,所以需要创建SparkSession对象之后再导入。在spark-shell中无需我们手动导入,已经存在。

注意这里的spark对象不能使用var声明,因为Scala支持val修饰的对象进行导入。

RDD与 DataFrame之间的转换

RDD -> DataFrame:toDF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scala> val rdd = sc.makeRDD(List(("zhangsan", 30), ("lisi", 40)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at makeRDD at <console>:24

scala> rdd.toDF.show()
+--------+---+
| _1| _2|
+--------+---+
|zhangsan| 30|
| lisi| 40|
+--------+---+

scala> rdd.toDF("username", "age").show()
+--------+---+
|username|age|
+--------+---+
|zhangsan| 30|
| lisi| 40|
+--------+---+

DataFrame -> RDD:.rdd

1
2
scala> df.rdd.collect
res22: Array[org.apache.spark.sql.Row] = Array([20,zhangsan], [30,lisi], [40,wangwu])

DataFrame和DataSet之间的转换

DataFrame -> DataSet:as

注意需要属性和样例类属性名称保持一致

1
2
3
4
5
6
7
8
scala> case class Person(name: String, age: Long)
defined class Person

scala> val df = sc.makeRDD(List(("zhangsan", 12), ("lisi", 13))).toDF("name", "age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val ds = df.as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int

DataSet -> DataFrame:toDF

1
2
scala> ds.toDF
res32: org.apache.spark.sql.DataFrame = [name: string, age: int]

RDD与DataSet之间的转换

RDD -> DataSet:toDS

SparkSQL能够自动将包含有case类的RDD转换为DataSet。样例类定义的表的结构,样例类的属性通过反射变为表的列名。

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> case class Person(name: String, age: Long)
defined class Person

scala> val dataset = sc.makeRDD(List(Person("zhangsan", 11), Person("lisi", 22))).toDS
dataset: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

scala> dataset.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 11|
| lisi| 22|
+--------+---+

DataSet -> DataFrame:.rdd

1
2
scala> dataset.rdd.collect
res29: Array[Person] = Array(Person(zhangsan,11), Person(lisi,22))

IDEA编程

使用IDEA进行SparkSQL进行编程的话,需要进行依赖的添加:

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.3</version>
</dependency>

简单准备数据环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
object SQL {
def main(args: Array[String]): Unit = {
// 创建SparkSQL的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

// 创建RDD, DataFrame和DataSet
val rdd = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40)))
val df: DataFrame = rdd.toDF("id", "name", "age")
val ds:Dataset[Person] = df.as[Person]

df.show()
ds.show()

// 环境关闭
spark.close()
}

case class Person(id: Int, name: String, age: Int)
}

用户自定义函数

用户可以通过spark.udf功能添加自定义函数

UDF

UDF函数只需要进行注册即可使用。例如这里的函数就是给姓名在查询的使用加一个前缀:

1
2
3
4
5
6
7
8
// 临时视图
ds.createOrReplaceTempView("person")

// 自定义函数UDF
spark.udf.register("myUDF", (name:String) => {
"Name: " + name
})
spark.sql("select myUDF(name), age from person").show()

UDAF

UDAF指的是聚合操作。对于聚合操作来说,我们可以将它的执行过程分为两个步骤,首先进行遍历,将一些临时数据放在Buffer中,之后再对这些Buffer中的数据进行某种特殊的操作,得到聚合结果。

在SparkSQL中,实现UDAF也是类似的逻辑。我们需要继承特定的类,然后重写其中的方法,方法的总体逻辑与上面我们说的执行过程类似。这里可供继承的类有UserDefinedAggregateFunctionAggregator,其中前者是弱类型的,需要使用顺序来确定值,而后者是强类型,可以使用类型名来确定值。前者已经不推荐使用,这里只做简单介绍。下面我们就使用用户自定义聚合函数来实现计算年龄的平均值

UserDefinedAggregateFunction

自定义类继承UserDefinedAggregateFunction,实现其中的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class MyAvg_UDAF extends UserDefinedAggregateFunction {
// 输入数据的结构
override def inputSchema: StructType = {
StructType(
Array(StructField("age", LongType))
)
}

// buffer的结构
override def bufferSchema: StructType = {
StructType(
Array(
StructField("total", LongType),
StructField("count", LongType)
)
)
}

// 输出数据的数据类型
override def dataType: DataType = LongType

// 函数稳定性
override def deterministic: Boolean = true

// 缓冲区buffer初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, 0L)
buffer.update(1, 0L)
}

// 根据输入的值更新buffer数据
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer.update(0, buffer.getLong(0) + input.getLong(0))
buffer.update(1, buffer.getLong(1) + 1)
}

// 合并buffer数据,buffer2的数据合并到buffer1中
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
}

// 计算聚合结果
override def evaluate(buffer: Row): Any = {
buffer.getLong(0) / buffer.getLong(1)
}
}

然后进行注册,在SQL语句中使用

1
2
spark.udf.register("myAvg_udaf", new MyAvg_UDAF())
spark.sql("select myAvg_udaf(age) from person").show()

Aggregator

可以看到上面的操作涉及到很多位置下标的使用,可读性不好也容易混淆。因此Spark提供了强类型的Aggregator,我们也需要自定义类来继承Aggregator,实现其中的方法。

注意这里不要导错对象了,正确路径为import org.apache.spark.sql.expression.Aggregator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 需要指定泛型
case class Buffer(var total: Long, var count: Long)

class MyAvg_Agg extends Aggregator[Long, Buffer, Long] {
// 初始值,缓冲区初始化
override def zero: Buffer = {
Buffer(0L, 0L)
}

// 根据输入来更新缓冲区
override def reduce(buffer: Buffer, in: Long): Buffer = {
buffer.total = buffer.total + in
buffer.count = buffer.count + 1
buffer
}

// 合并缓冲区
override def merge(buffer1: Buffer, buffer2: Buffer): Buffer = {
buffer1.total = buffer1.total + buffer2.total
buffer1.count = buffer1.count + buffer2.count
buffer1
}

// 计算结果
override def finish(buffer: Buffer): Long = {
buffer.total / buffer.count
}

// 缓冲区的编码操作(基本是固定格式,如果是自定义则为product,如果是系统自带则为scalaxxx)
override def bufferEncoder: Encoder[Buffer] = Encoders.product

// 输出的编码操作
override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

之后注册使用

1
2
spark.udf.register("myAvg_agg", functions.udaf(new MyAvg_Agg()))
spark.sql("select myAvg_agg(age) from person").show()

数据加载与保存

通用方式

SparkSQL提供通用方式进行数据的保存和加载,通用方式指的是使用相同的API,根据不同的参数读取和保存不同格式的数据。默认情况下,SparkSQL读取和保存的文件格式为parquet。(下面的spark都是SparkSession对象。)

通用加载:spark.read.load

1
spark.read.format("...")[.option("...")].load("...")
  • format:指定加载的数据格式,包括csv、jdbc、json、orc、parquet、textFile等
  • load:加载路径
  • option:传入相关参数

通用保存:df.write.save(df是一个DataFrame对象)

1
df.write.format("...")[.option("...")].save("...")
  • format:指定保存的数据格式
  • save:指定保存数据的路径
  • option:传入相关参数

不同格式

上面是通过使用format来实现不同文件格式的加载,Spark中也提供专门的操作符来完成这件事:

1
2
3
scala> spark.read.
csv format jdbc json load option options orc parquet schema
table text textFile
  1. json:要求读取的json文件每一行是一个json串
  2. csv:在option中配置csv文件的相关信息,如分隔符,是否包含表头等
1
2
3
4
5
spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("data/user.csv")
  1. mysql:在IDEA中通过JDBC对MySQL进行操作

先引入依赖

1
2
3
4
5
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 读取数据
//1.通用方式
spark.read.format("jdbc")
.option("url", "jdbc:mysql://xxx")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "xxx")
.option("password", "xxx")
.option("dbtable", "user_table")
.load().show

//2.使用jdbc方法
val props: Properties = new Properties()
props.setProperty("user", "xxx")
props.setProperty("password", "xxxx")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://xxx", "user_table", props)
df.show
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 保存数据
//1.通用方式
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://xxx")
.option("user", "xxx")
.option("password", "xxx")
.option("dbtable", "user_table")
.mode(SaveMode.Append)
.save()

//2.通过jdbc方法
val props: Properties = new Properties()
props.setProperty("user", "xxx")
props.setProperty("password", "xxx")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://xxx", "user_table", props)

  1. Hive

SparkSQL可以连接到部署好的Hive上,也可以使用自己的Hive元数据仓库。

如果需要连接外部Hive,应该将hive-site.xml复制到Spark的配置目录文件目录下$SPARK_HOME/conf。如果没有部署外部Hive,那么SparkSQL会在当前目录中创建自己的Hive元数据仓库,名称为metastore_db

内嵌的Hive元数据存储在derby中,默认仓库地址为$SPARK_HOME/spark-warehouse,无需任何配置,直接使用即可。

外部的Hive连接需要以下操作:

  • hive-site.xml拷贝到conf/目录下
  • 将MySQL的驱动复制到jars/目录下

之后可以运行SparkSQL CLI,bin/spark-sql可以开启一个窗口,在其中直接执行SQL语句。

当然也可以运行Spark beeline。与HiveServer2类似,也是通过连接到MetaStore服务来进行元数据的访问。这需要我们启动Spark Thrift Server。它的接口和协议与HiveServer2完全一致,可以和Hive MetaStore进行交互。

1
2
sbin/start-thriftserver.sh
bin/beeline -u jdbc:hive2://xxxx:10000 -n xxx

也可以在代码中操作Hive,首先需要导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>

然后将hive-site.xml文件拷贝到resources目录下。

创建SparkSession对象的时候,需要添加一个参数启动Hive支持

1
2
3
4
5
6
7
//创建 SparkSession
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("sql")
.getOrCreate()

默认创建数据库在本地仓库,需要通过参数修改仓库地址

1
config("spark.sql.warehouse.dir", "hdfs://xxx/xxx/xx")

Spark学习笔记-SparkSQL(1)-概述以及编程入门
http://example.com/2022/07/26/Spark学习笔记-SparkSQL-1-概述以及编程入门/
作者
EverNorif
发布于
2022年7月26日
许可协议