本文最后更新于:2022-07-26T12:43:50+08:00
SparkSQL概述
SparkSQL是Spark用于结构化数据处理的一个模块。SparkSQL可以简化RDD的开发,提高开发效率,并且执行效率高。我们可以在SparkSQL中使用SQL语句来完成查询,同时也可以使用SparkSQL提供的接口来完成开发。
Spark具有如下特点:
- 易整合:无缝地整合了SQL查询和Spark编程,我们可以很容易在Spark编程中书写SQL语句
- 统一的数据访问:我们可以使用相同的方式连接不同的数据源
- 兼容Hive:可以在已有的数据仓库上直接运行SQL或者HiveSQL
- 标准数据连接:允许通过JDBC或者ODBC来进行连接
SparkSQL中为我们提供了两个编程抽象,DataFrame和DataSet
DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表。DataFrame与RDD的主要区别在于,RDD中仅包括数据,而DataFrame中还包括数据schema的信息,即每一列的名称和类型等,即DataFrame携带更多的结构信息,我们可以将它当作数据库中的一张表来对待。
DataSet是在Spark1.6中添加的一个新抽象,是DataFrame的一个扩展。在RDD中我们关注数据的类型,在DataFrame中我们关注每一列的数据和类型,DataSet则融合了两者的优点,即既有每行数据的类型,又有每列数据的类型。
- 使用样例类来定义DataSet中数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称
- DataSet是强类型的,例如
DataSet[Car], DataSet[Person]
等,可以理解为表中的每一行都是一个数据类型
- DataFrame是DataSet的一个特例,每一行的数据类型为
Row
,即DataFrame=DataSet[Row]
。Row是一个类型,获取数据的时候需要指定顺序。
简单演示
我们可以在Spark的交互式命令行窗口中简单演示SparkSQL的使用。在前面我们执行Spark应用程序,需要首先构建上下文对象SparkContext。SparkSQL可以理解为对Spark
Core的一种封装,不仅在模型上进行了封装,而且对上下文环境对象也进行了封装。我们在查询前,需要构建查询起点SparkSession
,在命令行中,已经存在有该参数,名称为spark。
创建DataFrame
创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行创建;从Hive进行查询返回。这里我们使用Spark从数据源进行创建,数据源是一个json文件,其中的每一行是一个json表达式。
1 2
| scala> val df = spark.read.json("data.json") df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
|
展示效果如下:
1 2 3 4 5 6 7 8
| scala> df.show() +---+--------+ |age|username| +---+--------+ | 20|zhangsan| | 30| lisi| | 40| wangwu| +---+--------+
|
注意:如果是从内存中获取数据,Spark可以知道具体的数据类型是什么。如果是数字,则默认作为Int处理。如果是从文件中读取的数字,不能确定是什么类型,则使用bigint接收,可以与Long类型进行转换
使用SQL进行查询
我们可以在SparkSQL中直接使用SQL进行查询,但是这种风格的查询必须要有视图来进行辅助。视图分为全局视图和临时视图。
临时视图创建:
1
| scala> df.createOrReplaceTempView("people")
|
使用SQL进行查询:
1 2 3 4 5 6 7 8 9 10 11
| scala> val sqlDF = spark.sql("select * from people") sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
scala> sqlDF.show() +---+--------+ |age|username| +---+--------+ | 20|zhangsan| | 30| lisi| | 40| wangwu| +---+--------+
|
全局视图创建:
普通临时表是Session范围内的,如果想要在应用范围内有效,应该使用全局的临时表。在访问的时候,也应该使用全路径进行访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| scala> df.createGlobalTempView("people1")
scala> spark.sql("select * from global_temp.people1").show() +---+--------+ |age|username| +---+--------+ | 20|zhangsan| | 30| lisi| | 40| wangwu| +---+--------+
scala> spark.newSession().sql("select * from global_temp.people1").show() +---+--------+ |age|username| +---+--------+ | 20|zhangsan| | 30| lisi| | 40| wangwu| +---+--------+
|
使用DSL进行查询
DataFrame中还提供一个领域特定语言(Domain-Specific
Language,DSL)来管理结构化的数据,可以在Scala、Java、Python和R中使用DSL。使用DSL则没有必要创建临时视图了。
查看DataFrame的schema信息:
1 2 3 4
| scala> df.printSchema root |-- age: long (nullable = true) |-- username: string (nullable = true)
|
查看某一列:
1 2 3 4 5 6 7 8
| scala> df.select("username").show() +--------+ |username| +--------+ |zhangsan| | lisi| | wangwu| +--------+
|
对列进行运算:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| scala> df.select($"username", $"age" + 1).show() +--------+---------+ |username|(age + 1)| +--------+---------+ |zhangsan| 21| | lisi| 31| | wangwu| 41| +--------+---------+
scala> df.select('username, 'age + 2).show() +--------+---------+ |username|(age + 2)| +--------+---------+ |zhangsan| 22| | lisi| 32| | wangwu| 42| +--------+---------+
|
注意:涉及到对列进行运算的时候,每一列都必须使用$,或者采用引号表达式:单引号+字段
还有其他更多的操作,如filter,groupby等。
创建DataSet
DataSet的创建可以使用样例类序列创建,也可以使用基本类型序列创建
1 2 3 4 5 6 7 8 9 10 11 12 13
| scala> case class Person(name: String, age: Long) defined class Person
scala> val caseClassDS = Seq(Person("zhangsan", 300), Person("lisi", 400)).toDS caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> caseClassDS.show +--------+---+ | name|age| +--------+---+ |zhangsan|300| | lisi|400| +--------+---+
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| scala> val seqDS = Seq(1, 2, 3, 4, 5).toDS seqDS: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> seqDS.show +-----+ |value| +-----+ | 1| | 2| | 3| | 4| | 5| +-----+
|
三种抽象之间的关系
到目前为止,Spark中为我们提供了三种抽象,分别是RDD,DataFrame,DataSet。它们之间有相似之处,也有不同点。
相同点:
- 都是Spark平台下的分布式弹性数据集,为处理大型数据提供便利
- 都具有惰性机制,直到行动算子才会触发执行
- 会根据Spark的内存情况进行自动缓存
- 都有分区的概念
- DataFrame和DataSet可以使用模式匹配来获取各个字段的值和类型
不同点:
- RDD一般和Spark MLlib同时使用,不支持SparkSQL操作
- DataFrame每一行的类型固定为Row,支持SparkSQL操作
- DataSet和DataFrame具有完全相同的成员函数,区别在于每一行的数据类型不同
它们分别是不同程度的抽象,可以互相转换。
如果需要它们之间的相互操作,需要引入import spark.implicits._
。这里的spark不是Scala的包名,而是创建的SparkSession对象的变量名称,所以需要创建SparkSession对象之后再导入。在spark-shell中无需我们手动导入,已经存在。
注意这里的spark对象不能使用var声明,因为Scala支持val修饰的对象进行导入。
RDD与 DataFrame之间的转换
RDD -> DataFrame:toDF
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| scala> val rdd = sc.makeRDD(List(("zhangsan", 30), ("lisi", 40))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[35] at makeRDD at <console>:24
scala> rdd.toDF.show() +--------+---+ | _1| _2| +--------+---+ |zhangsan| 30| | lisi| 40| +--------+---+
scala> rdd.toDF("username", "age").show() +--------+---+ |username|age| +--------+---+ |zhangsan| 30| | lisi| 40| +--------+---+
|
DataFrame -> RDD:.rdd
1 2
| scala> df.rdd.collect res22: Array[org.apache.spark.sql.Row] = Array([20,zhangsan], [30,lisi], [40,wangwu])
|
DataFrame和DataSet之间的转换
DataFrame -> DataSet:as
注意需要属性和样例类属性名称保持一致
1 2 3 4 5 6 7 8
| scala> case class Person(name: String, age: Long) defined class Person
scala> val df = sc.makeRDD(List(("zhangsan", 12), ("lisi", 13))).toDF("name", "age") df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> val ds = df.as[Person] ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: int
|
DataSet -> DataFrame:toDF
1 2
| scala> ds.toDF res32: org.apache.spark.sql.DataFrame = [name: string, age: int]
|
RDD与DataSet之间的转换
RDD -> DataSet:toDS
。
SparkSQL能够自动将包含有case类的RDD转换为DataSet。样例类定义的表的结构,样例类的属性通过反射变为表的列名。
1 2 3 4 5 6 7 8 9 10 11 12 13
| scala> case class Person(name: String, age: Long) defined class Person
scala> val dataset = sc.makeRDD(List(Person("zhangsan", 11), Person("lisi", 22))).toDS dataset: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> dataset.show +--------+---+ | name|age| +--------+---+ |zhangsan| 11| | lisi| 22| +--------+---+
|
DataSet -> DataFrame:.rdd
1 2
| scala> dataset.rdd.collect res29: Array[Person] = Array(Person(zhangsan,11), Person(lisi,22))
|
IDEA编程
使用IDEA进行SparkSQL进行编程的话,需要进行依赖的添加:
1 2 3 4 5
| <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.3</version> </dependency>
|
简单准备数据环境:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| object SQL { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL") val spark = SparkSession.builder().config(sparkConf).getOrCreate() import spark.implicits._
val rdd = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40))) val df: DataFrame = rdd.toDF("id", "name", "age") val ds:Dataset[Person] = df.as[Person]
df.show() ds.show()
spark.close() }
case class Person(id: Int, name: String, age: Int) }
|
用户自定义函数
用户可以通过spark.udf功能添加自定义函数
UDF
UDF函数只需要进行注册即可使用。例如这里的函数就是给姓名在查询的使用加一个前缀:
1 2 3 4 5 6 7 8
| ds.createOrReplaceTempView("person")
spark.udf.register("myUDF", (name:String) => { "Name: " + name }) spark.sql("select myUDF(name), age from person").show()
|
UDAF
UDAF指的是聚合操作。对于聚合操作来说,我们可以将它的执行过程分为两个步骤,首先进行遍历,将一些临时数据放在Buffer中,之后再对这些Buffer中的数据进行某种特殊的操作,得到聚合结果。
在SparkSQL中,实现UDAF也是类似的逻辑。我们需要继承特定的类,然后重写其中的方法,方法的总体逻辑与上面我们说的执行过程类似。这里可供继承的类有UserDefinedAggregateFunction
和Aggregator
,其中前者是弱类型的,需要使用顺序来确定值,而后者是强类型,可以使用类型名来确定值。前者已经不推荐使用,这里只做简单介绍。下面我们就使用用户自定义聚合函数来实现计算年龄的平均值
UserDefinedAggregateFunction
自定义类继承UserDefinedAggregateFunction
,实现其中的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| class MyAvg_UDAF extends UserDefinedAggregateFunction { override def inputSchema: StructType = { StructType( Array(StructField("age", LongType)) ) }
override def bufferSchema: StructType = { StructType( Array( StructField("total", LongType), StructField("count", LongType) ) ) }
override def dataType: DataType = LongType
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = { buffer.update(0, 0L) buffer.update(1, 0L) }
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer.update(0, buffer.getLong(0) + input.getLong(0)) buffer.update(1, buffer.getLong(1) + 1) }
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0)) buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1)) }
override def evaluate(buffer: Row): Any = { buffer.getLong(0) / buffer.getLong(1) } }
|
然后进行注册,在SQL语句中使用
1 2
| spark.udf.register("myAvg_udaf", new MyAvg_UDAF()) spark.sql("select myAvg_udaf(age) from person").show()
|
Aggregator
可以看到上面的操作涉及到很多位置下标的使用,可读性不好也容易混淆。因此Spark提供了强类型的Aggregator,我们也需要自定义类来继承Aggregator
,实现其中的方法。
注意这里不要导错对象了,正确路径为import org.apache.spark.sql.expression.Aggregator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| case class Buffer(var total: Long, var count: Long)
class MyAvg_Agg extends Aggregator[Long, Buffer, Long] { override def zero: Buffer = { Buffer(0L, 0L) }
override def reduce(buffer: Buffer, in: Long): Buffer = { buffer.total = buffer.total + in buffer.count = buffer.count + 1 buffer }
override def merge(buffer1: Buffer, buffer2: Buffer): Buffer = { buffer1.total = buffer1.total + buffer2.total buffer1.count = buffer1.count + buffer2.count buffer1 }
override def finish(buffer: Buffer): Long = { buffer.total / buffer.count }
override def bufferEncoder: Encoder[Buffer] = Encoders.product
override def outputEncoder: Encoder[Long] = Encoders.scalaLong }
|
之后注册使用
1 2
| spark.udf.register("myAvg_agg", functions.udaf(new MyAvg_Agg())) spark.sql("select myAvg_agg(age) from person").show()
|
数据加载与保存
通用方式
SparkSQL提供通用方式进行数据的保存和加载,通用方式指的是使用相同的API,根据不同的参数读取和保存不同格式的数据。默认情况下,SparkSQL读取和保存的文件格式为parquet。(下面的spark都是SparkSession对象。)
通用加载:spark.read.load
1
| spark.read.format("...")[.option("...")].load("...")
|
- format:指定加载的数据格式,包括csv、jdbc、json、orc、parquet、textFile等
- load:加载路径
- option:传入相关参数
通用保存:df.write.save
(df是一个DataFrame对象)
1
| df.write.format("...")[.option("...")].save("...")
|
- format:指定保存的数据格式
- save:指定保存数据的路径
- option:传入相关参数
不同格式
上面是通过使用format来实现不同文件格式的加载,Spark中也提供专门的操作符来完成这件事:
1 2 3
| scala> spark.read. csv format jdbc json load option options orc parquet schema table text textFile
|
- json:要求读取的json文件每一行是一个json串
- csv:在option中配置csv文件的相关信息,如分隔符,是否包含表头等
1 2 3 4 5
| spark.read.format("csv") .option("sep", ";") .option("inferSchema", "true") .option("header", "true") .load("data/user.csv")
|
- mysql:在IDEA中通过JDBC对MySQL进行操作
先引入依赖
1 2 3 4 5
| <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
spark.read.format("jdbc") .option("url", "jdbc:mysql://xxx") .option("driver", "com.mysql.jdbc.Driver") .option("user", "xxx") .option("password", "xxx") .option("dbtable", "user_table") .load().show
val props: Properties = new Properties() props.setProperty("user", "xxx") props.setProperty("password", "xxxx") val df: DataFrame = spark.read.jdbc("jdbc:mysql://xxx", "user_table", props) df.show
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
ds.write .format("jdbc") .option("url", "jdbc:mysql://xxx") .option("user", "xxx") .option("password", "xxx") .option("dbtable", "user_table") .mode(SaveMode.Append) .save()
val props: Properties = new Properties() props.setProperty("user", "xxx") props.setProperty("password", "xxx") ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://xxx", "user_table", props)
|
- Hive
SparkSQL可以连接到部署好的Hive上,也可以使用自己的Hive元数据仓库。
如果需要连接外部Hive,应该将hive-site.xml复制到Spark的配置目录文件目录下$SPARK_HOME/conf
。如果没有部署外部Hive,那么SparkSQL会在当前目录中创建自己的Hive元数据仓库,名称为metastore_db
。
内嵌的Hive元数据存储在derby中,默认仓库地址为$SPARK_HOME/spark-warehouse
,无需任何配置,直接使用即可。
外部的Hive连接需要以下操作:
- 将
hive-site.xml
拷贝到conf/
目录下
- 将MySQL的驱动复制到
jars/
目录下
之后可以运行SparkSQL
CLI,bin/spark-sql
可以开启一个窗口,在其中直接执行SQL语句。
当然也可以运行Spark
beeline。与HiveServer2类似,也是通过连接到MetaStore服务来进行元数据的访问。这需要我们启动Spark
Thrift Server。它的接口和协议与HiveServer2完全一致,可以和Hive
MetaStore进行交互。
1 2
| sbin/start-thriftserver.sh bin/beeline -u jdbc:hive2://xxxx:10000 -n xxx
|
也可以在代码中操作Hive,首先需要导入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.27</version> </dependency>
|
然后将hive-site.xml文件拷贝到resources目录下。
创建SparkSession对象的时候,需要添加一个参数启动Hive支持
1 2 3 4 5 6 7
| val spark: SparkSession = SparkSession .builder() .enableHiveSupport() .master("local[*]") .appName("sql") .getOrCreate()
|
默认创建数据库在本地仓库,需要通过参数修改仓库地址
1
| config("spark.sql.warehouse.dir", "hdfs://xxx/xxx/xx")
|