Spark学习笔记-SparkCore(2)-RDD序列化,依赖,持久化以及分区

RDD序列化

我们前面提到过,在RDD算子以外的代码都是在Driver端执行的,而算子内部的代码都是在Executor端执行的。而一个非常常见的情景是,在算子内部提供的函数会使用到算子外的数据,这样就形成了闭包的效果。为了能够正确实现操作,在Driver端的算子外的数据会传输给每个Task,这样在Task执行的时候也能够使用到对应的数据而不会产生错误。

例如下面的代码,我们在map算子内部使用到了算子外部的数据num,但是程序能够正确执行,因为会将num传送给每个Task。

1
2
3
4
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
val num = 10
val res = rdd.map(_ + num)
res.collect().foreach(println)

既然需要将算子外的数据进行传输,那么就意味着这个数据需要能够序列化,否则就无法传输。举例来说,在下面的代码中,我们定义了一个类,并在算子外部new得到的对应的类,并且在算子内部使用了这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.spark.{SparkConf, SparkContext}

object Rdd_Test {
class User {
val num: Int = 30
}

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
val user = new User()
val res = rdd.map(_ + user.num)
res.collect().foreach(println)

}
}

由于我们自定义的类没有实现序列化,因此无法在网络上传输,运行会报错

1
Task not serializable

于是我们只需要将自定义的类混入序列化特征Serializable即可

1
2
3
class User extends Serializable {
val num: Int = 30
}

当然我们也可以使用样例类来完成对应的功能,因为样例类在编译的时候会自动混入序列化特征

1
2
3
case class User() {
val num: Int = 30
}

需要注意的是,如果我们的RDD中传入的数据为空,调用上述没有实现序列化的代码依旧会报错,因为上面的闭包检测的过程是在执行任务计算之前的,闭包检测会检测闭包内的对象是否可以进行序列化

Java提供的序列化能够实现任何类的序列化,但是是一个比较重量级的序列化,得到的序列化结果大小较大。Spark处于性能的考虑,在2.0之后开始支持新的Kryo序列化机制。Kyro序列化的速度比Serializable快得多,并且得到的结果大小较小。当RDD在Shuffle数据的时候,简单数据类型、数组、字符串类型等已经在Spark内部使用Kryo来序列化

要使用Kyro序列化,同样需要实现Serializable特征,同时在创建配置文件的时候进行相关的配置和注册

1
2
3
4
5
val conf: SparkConf = new SparkConf().setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[User]))

这里的User类需要实现Serializable特征或者使用样例类

RDD依赖关系

依赖与血缘关系

一个RDD通过算子操作后可以得到一个新的RDD,我们可以说这个新的RDD依赖于旧的RDD。这样的关系我们称为依赖关系,即相邻两个RDD之间的关系称为依赖关系。而新的RDD又会经过一系列算子的操作,依赖关系彼此相连,形成血缘关系,即多个连续的RDD的依赖关系称为血缘关系

我们说RDD中不会保存数据,但是为了提高容错性,RDD之间的关系会进行保存,即RDD保存了依赖关系和血缘关系。RDD支持粗粒度的转换,即在大量记录上执行的单个操作,RDD会将创建的一系列依赖关系记录下来,以便可以完成恢复操作,根据这些信息来重新运算,恢复丢失的数据分区。

何为恢复:由于RDD中不保存数据,如果一个RDD中执行出错,由于不保存数据,恢复的操作就是重新执行,因此需要记录前面需要执行的操作,即记录血缘关系。

RDD有两个方法toDebugStringdependencies,可以分别获取当前RDD的血缘关系和依赖关系。

血缘关系获取:

1
2
3
4
5
6
7
8
9
10
11
12
val lines = sc.textFile("data/1.txt")
println(lines.toDebugString)
println()
val words = lines.flatMap(_.split(" "))
println(words.toDebugString)
println()
val wordToOne = words.map(word => (word, 1))
println(wordToOne.toDebugString)
println()
val res = wordToOne.reduceByKey(_ + _)
println(res.toDebugString)
println()

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
(2) data/1.txt MapPartitionsRDD[1] at textFile at Rdd_Test.scala:16 []
| data/1.txt HadoopRDD[0] at textFile at Rdd_Test.scala:16 []

(2) MapPartitionsRDD[2] at flatMap at Rdd_Test.scala:19 []
| data/1.txt MapPartitionsRDD[1] at textFile at Rdd_Test.scala:16 []
| data/1.txt HadoopRDD[0] at textFile at Rdd_Test.scala:16 []

(2) MapPartitionsRDD[3] at map at Rdd_Test.scala:22 []
| MapPartitionsRDD[2] at flatMap at Rdd_Test.scala:19 []
| data/1.txt MapPartitionsRDD[1] at textFile at Rdd_Test.scala:16 []
| data/1.txt HadoopRDD[0] at textFile at Rdd_Test.scala:16 []

(2) ShuffledRDD[4] at reduceByKey at Rdd_Test.scala:25 []
+-(2) MapPartitionsRDD[3] at map at Rdd_Test.scala:22 []
| MapPartitionsRDD[2] at flatMap at Rdd_Test.scala:19 []
| data/1.txt MapPartitionsRDD[1] at textFile at Rdd_Test.scala:16 []
| data/1.txt HadoopRDD[0] at textFile at Rdd_Test.scala:16 []

其中的数字表示分区数量,并且注意到|+-符号,表示的含义是否会经过Shuffle过程

依赖关系获取:

1
2
3
4
5
6
7
8
9
10
11
12
val lines = sc.textFile("data/1.txt")
println(lines.dependencies)
println()
val words = lines.flatMap(_.split(" "))
println(words.dependencies)
println()
val wordToOne = words.map(word => (word, 1))
println(wordToOne.dependencies)
println()
val res = wordToOne.reduceByKey(_ + _)
println(res.dependencies)
println()

输出如下:

1
2
3
4
5
6
7
List(org.apache.spark.OneToOneDependency@32e54a9d)

List(org.apache.spark.OneToOneDependency@56da7487)

List(org.apache.spark.OneToOneDependency@59cda16e)

List(org.apache.spark.ShuffleDependency@6e7c351d)

宽依赖与窄依赖

注意到前面在获取依赖的时候,得到的依赖分为两种,OneToOneDependencyShuffleDependency。依赖是新旧RDD之间的关系,两类依赖之间的差别在于RDD分区之间关系

  • OneToOneDependency:表示旧的RDD中的一个分区的数据只会传入新RDD的一个分区,不经过Shuffle过程。一对一或者多对一,又被称为窄依赖
  • ShuffleDependecy:表示旧的RDD中的一个分区的数据会传入新RDD中的不同分区,经过Shuffle过程。一对多,又被称为宽依赖

宽窄依赖名称由来:OneToOneDependency类的底层继承了NarrowDependency,而ShuffleDependency类的底层继承了Dependency类,因此我们称第一个为窄依赖,而为了表示对应称第二个为宽依赖

1
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

阶段划分

在RDD整体的执行流程中,实际上是分为多个阶段的。如果RDD之间的依赖是窄依赖,这就意味着一个分区中的数据无需等待其他分区中的数据处理完成,那么这些处理就可以处于同一个阶段。但是如果RDD之间的依赖是宽依赖,则新的一个分区中的数据依赖于旧的多个分区,因此需要等待其他分区中的数据处理完成。

Spark根据Shuffle依赖来划分阶段,首先整体的逻辑是一个阶段,然后每遇到一个Shuffle依赖则会增加一个阶段。阶段之间的执行必须按照先后顺序,一个阶段执行完毕之后才能执行下一个阶段。整体流程的阶段数量 = Shuffle依赖数量 + 1

任务划分

RDD的任务划分,其中会分为如下的部分:Application、Job、Stage和Task

  • Application:初始化一个SparkContext即会生成一个Application
  • Job:一个行动算子会生成一个Job
  • Stage:阶段的数量等于宽依赖数量的个数+1
  • Task:在一个Stage中,最后一个RDD的分区数量就是Task的个数

RDD持久化

重用分析

在一些场景下,我们可能需要重复利用到一些前置的运算逻辑,然后做不同的行动算子的处理。例如下面的代码,我们需要重用前面map的逻辑,然后使用两次不同的行动算子逻辑进行处理。

1
2
3
4
5
6
7
8
9
val rdd = sc.makeRDD(List(1, 2))

val mapRdd = rdd.map(num => num+1)

val res1 = mapRdd.reduce(_ + _)
val res2 = mapRdd.reduce(_ * _)

println(res1)
println(res2)

我们很容易想到上面的代码,并且上面的代码确实能够得到正确的结果,但是其中的重用逻辑却值得分析。我们知道RDD中不会保存数据,因此我们实际上重用的不是数据,而是RDD对象,是RDD逻辑。当执行res1的reduce行动算子的时候,会从数据开始走一遍map的逻辑,执行res2的reduce行动算子的时候,同样也会从数据开始从头走一遍map的逻辑。我们可以在map的传入函数中增加输出功能,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(1, 2))

val mapRdd = rdd.map(
num => {
println("-map-")
num + 1
}
)

val res1 = mapRdd.reduce(_ + _)
val res2 = mapRdd.reduce(_ * _)

println("res1=" + res1)
println("res2=" + res2)

输出结果如下,可以看出res1和res2的执行都触发了前面的mapRdd的逻辑执行。

1
2
3
4
5
6
-map-
-map-
-map-
-map-
res1=5
res2=6

注意这里不要将map中的逻辑写成下面的形式:

1
2
3
4
val mapRdd = rdd.map({
println("-map-")
_ + 1
})

如果写成上面的形式,最终"-map-"只会输出一次,实际传入map的函数只有x=>{x+1},可以参考 Scala匿名函数中下划线简化的注意事项

临时缓存

上面我们分析了重用对象的逻辑,发现数据是没有被重用的。但是我们需要重用也是有办法的,首先介绍临时缓存。具体来说,RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会将数据缓存到JVM的堆内存当中。我们可以修改上面的代码,在调用reduce之前将mapRdd缓存下来。这样在后面调用reduce的时候,就不会重新调用mapRdd以及之前的逻辑,而是会采用缓存中的数据。

1
2
3
mapRdd.cache()
// 或者
mapRdd.persist(StorageLevel.DISK_ONLY)

其中cache方法的底层调用的还是persist方法,默认缓存只存放在内存中。而persist方法可以更改存储级别,在StorageLevel中可以查看所有支持的存储级别,大致分为MEMORY_ONLYMEMORY_ONLY_SERMEMORY_AND_DISKMEMORY_AND_DISK_SERDISK_ONLY

还有其他几个需要注意的点:

  • 持久化操作是在行动算子执行时候即提交任务时完成的,如果没有行动算子触发任务执行,持久化操作也不会进行
  • Spark会自动对一些Shuffle操作的中间数据进行持久化操作(例如reduceByKey),这样做的目的是为了当一个节点Shuffle失败后避免重新计算整个输入,但是在实际应用的时候,如果想要重用数据,仍然建议调用persist或者cache

持久检查点

上面的缓存操作只是临时缓存,而创建检查点可以将RDD的中间结果写入磁盘。在中间阶段做检查点容错,如果检查点之后RDD出现问题,则检查点之间的RDD均不需要再次检查,可以从检查点开始重新检查血缘,减小开销

同样的,检查点持久化操作需要通过行动算子进行触发。检查点路径保存的文件在作业执行完毕之后不会被删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
sc.setCheckpointDir("./checkpoint")

val rdd = sc.makeRDD(List(1, 2))

val mapRdd = rdd.map(
num => {
println("-map-")
num + 1
}
)
mapRdd.checkpoint()

val res1 = mapRdd.reduce(_ + _)
val res2 = mapRdd.reduce(_ * _)
val res3 = mapRdd.reduce(_ - _)

println("res1=" + res1)
println("res2=" + res2)
println("res3=" + res3)

需要注意,首先我们需要设置检查点存放的位置,并且注意到这里后续存在三个reduce行动算子,而输出如下。其中输出了四次"-map-",是因为在checkpoint()方法中会重新触发一个job执行checkpoint。

1
2
3
4
5
6
7
-map-
-map-
-map-
-map-
res1=5
res2=6
res3=1

可以分析上面结果输出的原因,经过reduce行动算子触发之后,首先会过一遍前面的逻辑,直到checkpoint方法,输出了两个-map-。之后在checkpoint方法中会重新创建一个Job执行前面的逻辑来创建checkpoint。因此会再输出两个-map-。后面的三个reduce可以对前面的数据进行重用,因此直接输出最终的结果。我们通常会将cache和checkpoint配合使用,避免重新执行一遍前面的逻辑。增加之后,上面的代码运行只会输出两个-map-

1
2
mapRDD.cache()
mapRdd.checkpoint()

缓存和检查点的区别

  1. cache缓存只是将数据保存起来,不会切断血缘关系,会添加一层cache的依赖。而checkpoints会切断血缘关系,重新建立新的血缘关系
  2. persist将数据临时存储在磁盘文件中进行数据重用,在涉及到磁盘IO的情况下,性能较低,但是数据安全。如果作业执行完毕,临时保存的数据文件会丢失。
  3. checkpoint将数据长久地保存在磁盘文件中进行数据重用,一般保存在hdfs中。涉及磁盘IO,性能较低,但是数据安全。并且为了提高效率,通常和cache配合使用

RDD分区器

Spark自带分区器

Spark目前支持Hash分区和Range分区,以及用户自定义分区。Hash分区为默认分区。分区器直接决定RDD中分区的个数,RDD中每条数据经过Shuffle后进入哪个分区。

  • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区值为None
  • 每个RDD的分区ID范围为[0, numPartitions-1),分别对应numPartitions个分区(类似Hadoop中的分区概念)

Hash分区:对于给定的Key,计算其hashCode,对分区个数进行取余

Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。

自定义分区器

我们也可以自定义分区器,我们自己实现的类需要继承Partitioner,实现其中的特定方法。下面是示例代码,根据Key的含义进行分区划分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Rdd_Test {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List(
("xxx", "123123"),
("xxx", "123123"),
("xxxx", "123123"),
("xxxxx", "123123"),
("x", "123123"),
("xx", "123123"),
))
val partRdd = rdd.partitionBy(new MyPartitioner)

partRdd.saveAsTextFile("output")

}

class MyPartitioner extends Partitioner {
// 分区数量
override def numPartitions: Int = 4

// 分区逻辑
override def getPartition(key: Any): Int = {
key match {
case "x" => 0
case "xx" => 1
case "xxx" => 2
case _ => 3
}
}
}
}

Spark学习笔记-SparkCore(2)-RDD序列化,依赖,持久化以及分区
http://example.com/2022/05/10/Spark学习笔记-SparkCore-2-RDD序列化-依赖-持久化以及分区/
作者
EverNorif
发布于
2022年5月10日
许可协议