Spark学习笔记-SparkCore(2)-RDD序列化,依赖,持久化以及分区
RDD序列化
我们前面提到过,在RDD算子以外的代码都是在Driver端执行的,而算子内部的代码都是在Executor端执行的。而一个非常常见的情景是,在算子内部提供的函数会使用到算子外的数据,这样就形成了闭包的效果。为了能够正确实现操作,在Driver端的算子外的数据会传输给每个Task,这样在Task执行的时候也能够使用到对应的数据而不会产生错误。
例如下面的代码,我们在map算子内部使用到了算子外部的数据num
,但是程序能够正确执行,因为会将num
传送给每个Task。
1 |
|
既然需要将算子外的数据进行传输,那么就意味着这个数据需要能够序列化,否则就无法传输。举例来说,在下面的代码中,我们定义了一个类,并在算子外部new得到的对应的类,并且在算子内部使用了这个类。
1 |
|
由于我们自定义的类没有实现序列化,因此无法在网络上传输,运行会报错
1 |
|
于是我们只需要将自定义的类混入序列化特征Serializable
即可
1 |
|
当然我们也可以使用样例类来完成对应的功能,因为样例类在编译的时候会自动混入序列化特征
1 |
|
需要注意的是,如果我们的RDD中传入的数据为空,调用上述没有实现序列化的代码依旧会报错,因为上面的闭包检测的过程是在执行任务计算之前的,闭包检测会检测闭包内的对象是否可以进行序列化
Java提供的序列化能够实现任何类的序列化,但是是一个比较重量级的序列化,得到的序列化结果大小较大。Spark处于性能的考虑,在2.0之后开始支持新的Kryo序列化机制。Kyro序列化的速度比Serializable快得多,并且得到的结果大小较小。当RDD在Shuffle数据的时候,简单数据类型、数组、字符串类型等已经在Spark内部使用Kryo来序列化
要使用Kyro序列化,同样需要实现
Serializable
特征,同时在创建配置文件的时候进行相关的配置和注册
1
2
3
4
5
val conf: SparkConf = new SparkConf().setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类
.registerKryoClasses(Array(classOf[User]))这里的User类需要实现
Serializable
特征或者使用样例类
RDD依赖关系
依赖与血缘关系
一个RDD通过算子操作后可以得到一个新的RDD,我们可以说这个新的RDD依赖于旧的RDD。这样的关系我们称为依赖关系,即相邻两个RDD之间的关系称为依赖关系。而新的RDD又会经过一系列算子的操作,依赖关系彼此相连,形成血缘关系,即多个连续的RDD的依赖关系称为血缘关系。
我们说RDD中不会保存数据,但是为了提高容错性,RDD之间的关系会进行保存,即RDD保存了依赖关系和血缘关系。RDD支持粗粒度的转换,即在大量记录上执行的单个操作,RDD会将创建的一系列依赖关系记录下来,以便可以完成恢复操作,根据这些信息来重新运算,恢复丢失的数据分区。
何为恢复:由于RDD中不保存数据,如果一个RDD中执行出错,由于不保存数据,恢复的操作就是重新执行,因此需要记录前面需要执行的操作,即记录血缘关系。
RDD有两个方法toDebugString
和dependencies
,可以分别获取当前RDD的血缘关系和依赖关系。
血缘关系获取:
1 |
|
输出如下:
1 |
|
其中的数字表示分区数量,并且注意到|
和+-
符号,表示的含义是否会经过Shuffle过程
依赖关系获取:
1 |
|
输出如下:
1 |
|
宽依赖与窄依赖
注意到前面在获取依赖的时候,得到的依赖分为两种,OneToOneDependency
和ShuffleDependency
。依赖是新旧RDD之间的关系,两类依赖之间的差别在于RDD分区之间关系
OneToOneDependency
:表示旧的RDD中的一个分区的数据只会传入新RDD的一个分区,不经过Shuffle过程。一对一或者多对一,又被称为窄依赖ShuffleDependecy
:表示旧的RDD中的一个分区的数据会传入新RDD中的不同分区,经过Shuffle过程。一对多,又被称为宽依赖
宽窄依赖名称由来:
OneToOneDependency
类的底层继承了NarrowDependency
,而ShuffleDependency
类的底层继承了Dependency
类,因此我们称第一个为窄依赖,而为了表示对应称第二个为宽依赖
1
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
阶段划分
在RDD整体的执行流程中,实际上是分为多个阶段的。如果RDD之间的依赖是窄依赖,这就意味着一个分区中的数据无需等待其他分区中的数据处理完成,那么这些处理就可以处于同一个阶段。但是如果RDD之间的依赖是宽依赖,则新的一个分区中的数据依赖于旧的多个分区,因此需要等待其他分区中的数据处理完成。
Spark根据Shuffle依赖来划分阶段,首先整体的逻辑是一个阶段,然后每遇到一个Shuffle依赖则会增加一个阶段。阶段之间的执行必须按照先后顺序,一个阶段执行完毕之后才能执行下一个阶段。整体流程的阶段数量 = Shuffle依赖数量 + 1
任务划分
RDD的任务划分,其中会分为如下的部分:Application、Job、Stage和Task
- Application:初始化一个SparkContext即会生成一个Application
- Job:一个行动算子会生成一个Job
- Stage:阶段的数量等于宽依赖数量的个数+1
- Task:在一个Stage中,最后一个RDD的分区数量就是Task的个数
RDD持久化
重用分析
在一些场景下,我们可能需要重复利用到一些前置的运算逻辑,然后做不同的行动算子的处理。例如下面的代码,我们需要重用前面map的逻辑,然后使用两次不同的行动算子逻辑进行处理。
1 |
|
我们很容易想到上面的代码,并且上面的代码确实能够得到正确的结果,但是其中的重用逻辑却值得分析。我们知道RDD中不会保存数据,因此我们实际上重用的不是数据,而是RDD对象,是RDD逻辑。当执行res1的reduce行动算子的时候,会从数据开始走一遍map的逻辑,执行res2的reduce行动算子的时候,同样也会从数据开始从头走一遍map的逻辑。我们可以在map的传入函数中增加输出功能,如下所示
1 |
|
输出结果如下,可以看出res1和res2的执行都触发了前面的mapRdd的逻辑执行。
1 |
|
注意这里不要将map中的逻辑写成下面的形式:
1
2
3
4
val mapRdd = rdd.map({
println("-map-")
_ + 1
})如果写成上面的形式,最终"-map-"只会输出一次,实际传入map的函数只有
x=>{x+1}
,可以参考 Scala匿名函数中下划线简化的注意事项
临时缓存
上面我们分析了重用对象的逻辑,发现数据是没有被重用的。但是我们需要重用也是有办法的,首先介绍临时缓存。具体来说,RDD通过Cache
或者Persist
方法将前面的计算结果缓存,默认情况下会将数据缓存到JVM的堆内存当中。我们可以修改上面的代码,在调用reduce之前将mapRdd缓存下来。这样在后面调用reduce的时候,就不会重新调用mapRdd以及之前的逻辑,而是会采用缓存中的数据。
1 |
|
其中cache方法的底层调用的还是persist方法,默认缓存只存放在内存中。而persist方法可以更改存储级别,在StorageLevel中可以查看所有支持的存储级别,大致分为MEMORY_ONLY
,MEMORY_ONLY_SER
,MEMORY_AND_DISK
,MEMORY_AND_DISK_SER
,DISK_ONLY
还有其他几个需要注意的点:
- 持久化操作是在行动算子执行时候即提交任务时完成的,如果没有行动算子触发任务执行,持久化操作也不会进行
- Spark会自动对一些Shuffle操作的中间数据进行持久化操作(例如reduceByKey),这样做的目的是为了当一个节点Shuffle失败后避免重新计算整个输入,但是在实际应用的时候,如果想要重用数据,仍然建议调用persist或者cache
持久检查点
上面的缓存操作只是临时缓存,而创建检查点可以将RDD的中间结果写入磁盘。在中间阶段做检查点容错,如果检查点之后RDD出现问题,则检查点之间的RDD均不需要再次检查,可以从检查点开始重新检查血缘,减小开销
同样的,检查点持久化操作需要通过行动算子进行触发。检查点路径保存的文件在作业执行完毕之后不会被删除。
1 |
|
需要注意,首先我们需要设置检查点存放的位置,并且注意到这里后续存在三个reduce行动算子,而输出如下。其中输出了四次"-map-",是因为在checkpoint()方法中会重新触发一个job执行checkpoint。
1 |
|
可以分析上面结果输出的原因,经过reduce行动算子触发之后,首先会过一遍前面的逻辑,直到checkpoint方法,输出了两个-map-
。之后在checkpoint方法中会重新创建一个Job执行前面的逻辑来创建checkpoint。因此会再输出两个-map-
。后面的三个reduce可以对前面的数据进行重用,因此直接输出最终的结果。我们通常会将cache和checkpoint配合使用,避免重新执行一遍前面的逻辑。增加之后,上面的代码运行只会输出两个-map-
。
1 |
|
缓存和检查点的区别
- cache缓存只是将数据保存起来,不会切断血缘关系,会添加一层cache的依赖。而checkpoints会切断血缘关系,重新建立新的血缘关系
- persist将数据临时存储在磁盘文件中进行数据重用,在涉及到磁盘IO的情况下,性能较低,但是数据安全。如果作业执行完毕,临时保存的数据文件会丢失。
- checkpoint将数据长久地保存在磁盘文件中进行数据重用,一般保存在hdfs中。涉及磁盘IO,性能较低,但是数据安全。并且为了提高效率,通常和
cache
配合使用
RDD分区器
Spark自带分区器
Spark目前支持Hash分区和Range分区,以及用户自定义分区。Hash分区为默认分区。分区器直接决定RDD中分区的个数,RDD中每条数据经过Shuffle后进入哪个分区。
- 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区值为None
- 每个RDD的分区ID范围为
[0, numPartitions-1)
,分别对应numPartitions个分区(类似Hadoop中的分区概念)
Hash分区:对于给定的Key,计算其hashCode,对分区个数进行取余
Range分区:将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。
自定义分区器
我们也可以自定义分区器,我们自己实现的类需要继承Partitioner,实现其中的特定方法。下面是示例代码,根据Key的含义进行分区划分
1 |
|