Spark学习笔记-SparkCore(1)-RDD以及相关算子

RDD

概念

RDD(Resilient Distributed Dataset),弹性分布式数据集,是Spark中最基本的数据处理模型。在代码中,它是一个抽象类,代表一个弹性的、不可变的、可分区、里面的元素可并行计算的集合。

  • 弹性:
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失的自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可以根据需要重新分片
  • 分布式:数据存储在集群的不同节点上
  • 数据集:RDD中封装了计算逻辑,并不保存数据
  • 数据抽象:RDD是一个抽象类,需要子类的具体实现
  • 不可变:RDD中封装了计算逻辑,是不可以改变的。如果想要改变,只能产生新的RDD,在新的RDD里面封装逻辑
  • 可分区:分区后进行并行计算

输入数据首先需要组织成RDD的结果,之后可以调用RDD中的方法(后续进行处理)。多个不同功能的算子组合成复杂的业务逻辑,直到调用行动算子,数据进行处理,让数据走过对应的逻辑得到最终的输出。

RDD中存在分区的概念,由于需要面对分布式计算的场景,实际计算的时候也是将任务分配给其他的Executor来执行。分区机制,即如何将输入数据进行划分,是RDD中一个重要属性。

核心属性

RDD中有五个核心属性:

1
2
3
4
5
6
Internally, each RDD is characterized by five main properties:
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
  • 分区列表:RDD中的分区列表,用于实现分布式计算

    1
    protected def getPartitions: Array[Partition]
  • 分区计算函数:RDD的计算逻辑,Spark在计算的时候,使用分区函数对每个分区中的数据进行处理

    1
    2
    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T]
  • RDD之间的依赖关系:RDD是计算模型的封装,当需要将多个计算模型进行组合的时候,就需要将多个RDD建立依赖关系

    1
    protected def getDependencies: Seq[Dependency[_]] = deps
  • 分区器(可选):当数据为Key-Value类型数据的时候,可以通过设定分区器自定义数据的分区

    1
    @transient val partitioner: Option[Partitioner] = None
  • 首选位置(可选):可以设置分区的物理位置偏好,设定分区优先选择分配给哪个节点

    1
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

执行原理

从计算的角度来看,在数据处理的过程中,需要将计算资源(例如内存、CPU等)和计算模型进行协调和整合。Spark框架在执行的时候,先申请资源,然后将应用程序中的数据处理逻辑分解成一个个的计算任务,然后将任务分发到已经分配资源的计算节点上。数据按照指定的计算模型进行处理,得到最终的计算结果。

以Yarn环境举例来说,首先启动Yarn集群环境,集群中会存在Resource Manager和Node Manager两种角色。之后Spark通过申请资源来创建调度节点和计算节点,分别在对应的节点上得到Driver和Executor。然后Spark框架根据需求将计算逻辑根据分区划分成不同的任务,调度节点将不同分区的数据送到不同的Executor上进行处理。RDD在整个流程中主要用于将逻辑进行封装,生成Task发送给Executor节点执行计算。

基础编程

RDD创建

首先我们需要能够从数据中创建RDD(生成RDD),有以下4种方法

1
2
3
// 前置环境
val sparkConf = new SparkConf().setMaster("local[*]")
val sc = new SparkContext(sparkConf)
  • 从内存中创建RDD
1
2
3
4
5
6
7
// 从内存中的集合创建 
// makeRDD()方法
val seq = Seq[Int](1, 2, 3, 4, 5)
val rdd = sc.makeRDD(seq)

// parallelize()方法
val rdd = sc.parallelize(seq)

sc.makeRDD()sc.parallelize()两种方式都能够从内存的集合创建出RDD,其中makeRDD()方法其实只是parallelize()方法的一层封装

在创建RDD的时候可以传递第二个参数,这个参数表示分区的数量。默认值采用会采用配置参数spark.default.parallelism,如果该配置参数没有被设置的话,则采用当前环境下的最大可用核数。可用使用saveAsTextFile来查看分区效果,默认情况下一个分区会产生一个保存文件,下面的代码最终会在工作目录下的/ouput保存出5个内容文件:part-00000 ~ part-00004

(具体的分区逻辑会在后面进行说明)

1
2
3
4
5
6
7
val sparkConf = new SparkConf().setMaster("local[*]")
sparkConf.set("spark.default.parallelism", "5")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(Seq[Int](1, 2, 3, 4, 5))

rdd.saveAsTextFile("output")
  • 从文件中创建RDD
1
2
3
4
5
6
7
8
9
// 从文件中创建,相对路径默认为工作目录开始
// 本地文件
val rdd = sc.textFile("input/data.txt")
// 本地目录
val rdd = sc.textFile("input")
// 使用通配符
val rdd = sc.textFile("input/data1*.txt")
// 分布式存储系统路径
val rdd = sc.textFile("hdfs://hadoop102:8020/input")

可以有外部存储系统的数据集创建RDD包括:本地的文件系统,所有Hadoop支持的数据系统如HDFS、HBase等。并且其中文件路径有很灵活的使用功能。

有时候我们希望在读取数据的同时知道这些数据的来源,那么可以使用wholeTextFiles方法,得到的rdd读取结果为元组,第一个元素表示文件路径,第二个元素表示文件内容

1
2
val rdd = sc.wholeTextFiles("data")
rdd.collect().foreach(println)

输出效果如下,其中的折行效果来自于输入文件中的换行符,路径为绝对路径:

1
2
3
4
5
6
(file:/D:/Java/Spark/data/1.txt,1
1
)
(file:/D:/Java/Spark/data/2.txt,2
2
)

从文件中同样可以指定分区数量,具体逻辑与从内存中创建有所不同,也会在后续进行对比。

  • 从其他RDD创建

从其他RDD创建指的是,一个RDD运算完成之后,得到的是一个新的RDD

  • 直接创建RDD

直接使用new的方式构造RDD,一般在Spark框架内部使用

RDD并行度与分区

分区:Spark将一个作业切分成多个任务,发送给Executor节点并行计算

并行度:能够同时并行计算的任务数量称为并行度。

从内存中生成RDD和从文件中生成RDD,分区逻辑略有区别,分别体现在默认分区数量以及分区划分上。

读取内存数据:

makeRDD的第二个参数为numSlices,表示分区数量。默认的分区数量采用会采用配置参数spark.default.parallelism,如果该配置参数没有被设置的话,则采用当前环境下的最大可用核数。

分区划分的核心逻辑如下(org.apache.spark.rdd.ParallelCollectionRDDslice方法),其中numSlices之前传入的参数,length为构建RDD的序列的长度,经过下面代码可以得到numSlices个tuple。每个tuple中保存了startend,作为参数调用Array的slice(from: Int, until: Int)方法可以切分子数组,即对应分区中的内容。

1
2
3
4
5
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}

举例来说,下面的代码运行后,在output目录下得到4个输出文件

1
2
3
4
5
6
7
val rdd = sc.makeRDD(List[Int](0, 1, 2, 3, 4, 5, 6, 7, 8, 9), 4)
// numSlices = 4, length = 10
// 0: [0, 2)
// 1: [2, 5)
// 2: [5, 7)
// 3: [7, 10)
rdd.saveAsTextFile("output")

输出文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# part-00000
0
1
# part-00001
2
3
4
# part-00002
5
6
# part-00003
7
8
9

读取文件数据:

textFile的第二个参数为minPartitions,表示最小的分区数量,默认值为defalutMinPartitions,定义如下,即该值只能取1或2:

1
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

textFile中返回的RDD是HadoopRDD类,其中使用了FileInputFormat的getPartition方法,并且是旧的API。分区的核心逻辑就在于Hadoop中的FileInputFormat,关于旧API中的getSplits切片逻辑以及行读取的问题可以查看本博客的其他两篇相关文章,这里直接介绍结论

在读取文件的时候,默认情况下会使用goalSize作为分区大小,goalSize的大小通过我们传入的minPartitions以及所有文件的总大小来确定。goalSize = totalSize / minPartitions。由于存在切片逻辑是对输入的每个文件来说的,每个文件的最终切片大小可能达不到goalSize,因此最终的分片数量可能会高于minPartitions。得到切片规划之后,读取数据到对应分区中的逻辑可以参考TextInputFormat的行读取,总结来说,在每个分区的数据读取中,我们会跳过第一行(除了第一个分区),并且读取下一个分区中的第一行(可能完整,也可能不完整),从而保证跨分区数据读取的正确性。如果是多文件读取,则在totalSize的计算的时候需要计算所有文件的大小总和,而在分区划分的时候则是按照逐个文件单独进行的。

举例来说,我们存在下面的文件data/1.txt,总大小为19字节(下面显示写出了CR和LF,为Windows操作系统中的行结束符)

1
2
3
4
5
6
7
1 CR LF
2 CR LF
3 CR LF
4 CR LF
5 CR LF
6 CR LF
7

运行下面的代码:

1
2
val rdd = sc.textFile("data/1.txt")
rdd.saveAsTextFile("output")

计算得到分区大小为19 / 2 = 9字节,由于10 / 9 = 1.1..... > 1.1,最终会产生三个切片文件,切片规划如下:

1
2
3
(fileName: .../data/1.txt, start: 0, length: 9, ...)
(fileName: .../data/1.txt, start: 9, length: 9, ...)
(fileName: .../data/1.txt, start: 18, length: 1, ...)

在第一个切片中,我们读取偏移量[0, 9]的所在行(由于while的特殊逻辑,这里会读取到下个分区的第一行),得到如下数据

1
2
3
4
1 LF
2 LF
3 LF
4 LF

在第二个切片中,读取偏移量[9, 18]所在行(不是第一个切片,忽略第一行,读取下个分区的第一行),得到如下数据

1
2
3
5 LF
6 LF
7 LF

在第三个切片中,读取偏移量[18,19]所在行,由于忽略了分区中的第一行,已经不存在数据了,因此得到空文件

1

实际运行代码,得到的结果和我们分析的是一样的,得到part-00000part-00002,大小分别为8字节、6字节、0字节

我们还可以利用glom()方法来查看分区数据,glom函数的功能是将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

1
2
3
4
5
val rdd = sc.textFile("data/1.txt")
rdd.glom().collect().foreach(arr => {
println(arr)
println(arr.mkString(","))
})

其中arr表示分区数组。glom()会得到一个Array列表,其中每个元素都代表一个分区

RDD算子

RDD算子即RDD中提供给我们的方法,可以大致梳理出如下脉络。其中RDD转换算子在描述计算逻辑,只有当执行RDD行动算子的时候,才会真正将数据送入,进行执行。RDD算子与普通的方法调用有所不同,RDD算子的外部操作都是在Driver端执行的,而RDD算子内部的操作则是在Executor端

  • RDD转换算子:根据数据处理方式的不同将算子整体上分为Value、双Value和Key-Value类型
    • Value类型:map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartitions、sortBy
    • 双Value类型:intersection、union、subtract、zip
    • Key-Value类型:partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、sortByKey、join、leftOuterJoin、cogroup
  • RDD行动算子
    • reduce、collect、count、first、take、takeOrdered、aggregate、fold、countByKey、save相关算子、foreach

RDD转换算子

value类型

map

函数声明:

1
def map[U: ClassTag](f: T => U): RDD[U]

函数说明:

将集合中的数据逐条进行映射转换。T表示集合中的数据类型,U表示转换后的数据类型。

前面提到RDD中有分区的概念。在分区内,数据是逐个执行相应逻辑的,只有当前面的数据全部逻辑执行完毕之后,才会执行下一个数据,即分区内数据的执行是有序的。但是不同分区在不同Executor上执行,执行没有严格的先后顺序,即分区间的执行是无序的

mapPartitions

函数声明:

1
2
3
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

函数说明:

将待处理的数据以分区为单位发送到计算节点进行处理。前面的map是分区内的每一条数据进行对应的处理,而mapPartitions是处理分区内的所有数据。

这里会将整个分区的数据加载到内存中进行引用,如果处理完的数据不被释放,会存在对象的引用。在内存较小,数据量较大的情况下,容易出现内存溢出的情况。

注意这里的返回数据类型,f函数接受一个迭代器类型,返回一个迭代器类型。下面的代码可以完成分区内最大值的查找,返回的最大值包装成一个列表,然后返回其迭代器

1
2
3
4
5
6
7
val rdd = sc.makeRDD(List[Int](0, 1, 2, 3, 4, 5, 6, 7, 8, 9), 4)
// [0, 1][2, 3, 4][5, 6][7, 8, 9]
val res = rdd.mapPartitions(
iter =>{
List(iter.max).iterator
}
)

map和mapPartitions的区别

从数据处理的角度来看:map算子是逐个处理分区中的数据,mapPartition算子是以分区为单位进行批处理操作

从功能的角度来看:map算子的主要目的是将数据源中的数据进行转换和改变,但是不会减少或增多数据;mapPartitions算子接受和返回迭代器类型

迭代器的注意事项:需要注意的是Scala中的迭代器具有懒执行的特性,只有需要的时候才会计算,这点在利用mapPartitions的时候尤其需要注意。通常情况下,我们会在mapPartitions中进行数据库连接之类的操作,然后对iter中的每一条进行操作,如下所示。但是这里使用map的话,由于懒执行,真正的执行是被推后了的,也就是说等到真正的执行,我们的数据库实际上已经关闭连接了,一因此这里需要使用foreach来真正进行执行。

1
2
3
4
5
6
mapPartitions(iter => {
// 连接数据库
// iter.map(...) 懒执行
// 应该使用iter.foreach(...)
// 断开连接
})

mapPartitionsWithIndex

函数声明:

1
2
3
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

函数说明:

类似于mapPartitions操作,将整个分区的数据发送给计算节点,不过同时可以获取到当前的索引。函数f的输入类型为一个元组,第一个元素即表示分区的索引,第二个元素迭代器即表示分区的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 代码
val rdd = sc.makeRDD(List[Int](0, 1, 2, 3, 4, 5, 6, 7, 8, 9), 4)
// [0, 1][2, 3, 4][5, 6][7, 8, 9]
val res = rdd.mapPartitionsWithIndex(
(index, data) => {
data.map((index, _))
}
)
res.collect().foreach(println)
// 输出内容如下
(0,0)
(0,1)
(1,2)
(1,3)
(1,4)
(2,5)
(2,6)
(3,7)
(3,8)
(3,9)

flatMap

函数声明:

1
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

函数说明:

首先对数据进行映射处理,之后再进行扁平化操作,因此该算子也称为扁平映射。

glom

函数声明:

1
def glom(): RDD[Array[T]]

函数说明:

将同一个分区的数据直接转换成相同类型的数组进行处理,分区不变。(注意这个算子是转换算子,而不是行动算子)

groupBy

函数声明:

1
2
3
def groupBy[K](
f: T => K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

函数说明:

将数据按照指定的规则进行分组,打乱后的数据重新分组。返回的rdd是一个Key-Value类型的结果,其中相同key的数据在同一个分组中。groupBy可能发生shuffle操作,其中会满足同一个分组内的数据在一个分区当中,但是并不是说一个分区内只有一个组。在极限情况下,所有数据可能被分在同一个分区。

shuffle:前面的操作,默认情况下分区不变,指的是本来在一个分区的数据经过处理之后仍然在这个分区,但是groupBy会有一个打乱重新组合的过程,在这个过程中可能存在情况,数据原本在一个分区,经过处理之后被分配到另一个分区中,这个过程就叫做shuffle。我们需要知道跨分区的操作性能是要比分区内的操作低的,所以groupBy的性能较低。

filter

函数声明:

1
def filter(f: T => Boolean): RDD[T]

函数说明:

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。传入的f函数返回一个布尔变量,用于判断是否符合规则。当数据进行筛选过滤之后,分区不变,但是分区内的数据可能不均衡,容易出现数据倾斜。

sample

函数声明:

1
2
3
4
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

函数说明:

按照指定的规则从数据集中抽取数据,具体的规则可以根据传入的参数进行确定。第一个参数表示是否放回,如果为true,则选择泊松算法;如果为false,则选择伯努利算法。两种算法都需要后面两个参数,但是含义不同

在有放回的泊松算法中,参数2表示每一个元素被抽取到的次数的数学期望,需要大于等于0,参数3表示随机数种子。

在无放回的伯努利算法中,参数2表示每一个元素被抽取到的概率,概率在[0, 1]之间,参数3表示随机数种子。

可以不给随机数种子,默认参数为当前系统时间的时间戳。

distinct

函数声明:

1
def distinct(): RDD[T]

函数说明:

将数据集中重复的数据去重。

集合的distinct VS. RDD算子中的distinct

distinct完成的功能都是完成数据的去重,但是具体的逻辑有所不同。在集合的distinct中,底层实现使用的是HashSet的结构;在RDD算子中的distinct中,底层使用的是连续的算子调用,核心逻辑如下,利用reduceByKey每次聚合只留其中一个重复。

1
map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

coalesce

函数声明:

1
2
3
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null): RDD[T] = withScope

函数说明:

改变分区数量。第一个参数为目标分区数量,第二个参数shuffle表示是否经过shuffle过程,如果为false,则不存在打乱重新组合的过程。如果需要缩减分区数量,是否经过shuffle过程都可以完成,没有shuffle的话则相当于合并分区。如果需要扩大分区,则一定需要shuffle过程。

repartitions

函数声明:

1
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

函数说明:

重新分配分区,该操作内部其实执行的还是coalescs操作,并且其中的shuffle默认值为true。使用repartitions,无论是扩大分区还是缩减分区,都会经过shuffle过程。

1
coalesce(numPartitions, shuffle = true)

sortBy

函数声明:

1
2
3
4
5
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

函数说明:

该操作用于数据排序。在排序之前,可以将数据通过f函数进行处理,之后按照f函数的处理结果进行排序。默认为升序排序,排序后产生的新的RDD分区数目默认与原RDD的分区数一致,其中存在shuffle的过程。分区规则与前面提到的一致。

双value类型

intersection

函数声明:

1
def intersection(other: RDD[T]): RDD[T]

函数说明:

返回两个数据集合的交集

union

函数声明:

1
def union(other: RDD[T]): RDD[T]

函数说明:

返回两个数据集合的并集。这里的集合是广义上的集合,并集中可能存在重复元素。

subtract

函数声明:

1
def subtract(other: RDD[T]): RDD[T]

函数说明:

返回两个数据集合的差集。

zip

函数声明:

1
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

函数说明:

类似于拉链操作,但是与集合的拉链操作不同,zip算子要求两个数据源的分区数量保持一致,每个分区中的数据量也要保持一致。数据类型可以不一致。

上面的双value类型算子中,并、交、差集都要求两个数据源的数据类型保持一致,但是zip算子并不要求类型一致

Key-Value类型

在RDD类中并没有Key-Value类型的RDD算子,但是在Scala中存在隐式转换,转换成PairRDDFunctions类则可以找到对应的算子

partitionBy

函数声明:

1
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

函数说明:

将数据按照指定Partitioner重新进行分区。Spark中默认的分区器是HashPartitioner。如果重分区的分区器和当前RDD的分区器一样,底层会有一个判断逻辑,不做任何操作。也可以自定义数据分区器进行数据分区。

reduceByKey

函数声明:

1
2
3
def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

函数说明:

将数据按照相同的Key,对Value进行聚合。注意这里为两两聚合,需要保持数据类型的一致。如果某个key的数据只有一个,那么是不会参与计算的。

groupByKey

函数声明:

1
2
3
4
5
def groupByKey(): RDD[(K, Iterable[V])]

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

函数说明:

根据数据源的数据,按照Key对Value进行分组。形成一个对偶元组,元组中的第一个元素为key,第二个元素为相同key的value集合

reduceByKeygroupByKey的区别:

从Shuffle的角度来看:reduceByKey和groupByKey都存在Shuffle的操作,但是reduceByKey可以在Shuffle之前对分区内相同Key的数据进行预聚合(combine)功能,这样可以减少落盘的数据量,性能较高。而groupByKey只是进行分组,不存在数据量减少的问题。

从功能的角度来看,reduceByKey其实包含了分组和聚合的功能,而groupByKey只能进行分组,不能聚合。

上面的落盘,指的是在执行过程中,程序在执行完毕之前无法判断自己的分组是否已经没有下一个元素,于是需要等待到执行完毕才能进行下一步的聚合和分区。如果数据量过大,则在内存中等待的数据可能导致内存溢出,因此数据会落盘等待。

aggregateByKey

函数声明:

1
2
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]

函数说明:

将数据根据不同的规则进行分区内计算和分区间计算。在进行分区间计算的时候还提供有初始值。

第一个参数列表中的参数为初始值,类型(泛型)为U;第二个参数列表中有两个参数,参数1表示分区内的计算规则,参数2表示分区间的计算规则。分区内的计算规则要求最后得到的类型为U,而分区间的计算规则最终也要得到U。aggregateByKey最终的返回数据结果应该和初始值的类型保持一致。(注意这里计算规则的类型指定)

foldByKey

函数声明:

1
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

函数说明:

功能类似于上面的aggregateByKey,当分区内计算规则和分区内计算规则相同的时候,aggregateByKey就可以简化为foldByKey。需要注意的是这里传入的初始值的数据类型和RDD中的数据类型一致

combineByKey

函数声明:

1
2
3
4
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]

函数说明:

最通用的对于Key-Value类型RDD进行聚集操作的函数,类似于aggregateByKey,但是combineByKey允许用户返回值的类型与输入不一致,并且不设置初始值。有三个重要参数:

  • createCombiner:将相同Key的第一个参数
  • mergeValue:分区内的计算规则
  • mergeCombiners:分区间的计算规则

不同聚集方法的区别和联系:

  • reduceByKey:相同的Key,第一个数据不进行任何计算,分区内和分区间的计算规则相同
  • foldByKey:相同的Key,第一个数据和初始值进行分区内计算,分区内和分区间的计算规则相同
  • aggregateByKey:相同的Key,第一个数据和初始值进行分区内计算,分区内和分区间的计算规则可以不同
  • combineByKey:相同的Key,如果第一个数据结构不满足要求,可以转换结构,分区内和分区间的计算规则可以不同

实际上,它们的底层都是combineByKeyWithClassTag,只是完成了不同程度的封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// reduceByKey:
combineByKeyWithClassTag[V](
(v: V) => v, // 第一个值不会参与计算
func, // 分区内计算规则
func, // 分区间计算规则
)

//aggregateByKey :
combineByKeyWithClassTag[U](
(v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
cleanedSeqOp, // 分区内计算规则
combOp, // 分区间计算规则
)

//foldByKey:
combineByKeyWithClassTag[V](
(v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
cleanedFunc, // 分区内计算规则
cleanedFunc, // 分区间计算规则
)

//combineByKey :
combineByKeyWithClassTag(
createCombiner, // 相同key的第一条数据进行的处理函数
mergeValue, // 表示分区内数据的处理函数
mergeCombiners, // 表示分区间数据的处理函数
)

sortByKey

函数声明:

1
2
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

函数说明:

在Key-Value类型的RDD上调用,Key必须实现Ordered特征。调用函数返回按照Key排序的RDD

join

函数声明:

1
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

函数说明:

类似数据表之间的join操作,在类型为(K, V)(K, W)的RDD上调用,返回一个相同Key对应所有元素连接在一起的(K, (V, W))的RDD。

  • 如果Key存在不相同没法匹配,则在最后的结果中也不会出现。
  • 如果两个数据源中Key有多个相同的,会依次匹配,可能会出现笛卡尔积的现象

leftOuterJoin

函数声明:

1
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

函数说明:

类似SQL语句的左外连接,当然同时还有右外连接rightOuterJoin

cogroup

函数声明:

1
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

函数说明:

先组内分组,然后进行连接

1
2
3
4
5
val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2),("b", 8), ("c", 3)))
val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6), ("c", 7)))
val res = rdd1.cogroup(rdd2)

res.collect().foreach(println)

输出结果为:

1
2
3
(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2, 8),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer(6, 7)))

RDD行动算子

行动算子

所谓的行动算子其实就是触发作业Job执行的方法。行动算子的底层代码会调用环境对象的runJob方法,创建ActiveJob并提交执行。

reduce

函数声明:

1
def reduce(f: (T, T) => T)

函数说明:

聚集RDD中所有的数据,先聚合分区内的数据,再聚合分区间的数据。注意这里的类型,参与运算的类型以及生成结果的类型都需要保持一致。

collect

函数声明:

1
def collect(): Array[T]

函数说明:

在驱动程序当中以数组Array的形式返回数据集的所有元素

count

函数声明:

1
def count(): Long

函数说明:

返回RDD中元素的个数

first

函数声明:

1
def first(): T

函数说明:

返回RDD中的第一个元素

take

函数声明:

1
def take(num: Int): Array[T]

函数说明:

返回一个由RDD的前n个元素组成的数组

takeOrdered

函数声明:

1
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

函数说明:

返回该RDD排序后的前n个元素组成的数组。默认为升序,如果需要降序,可以传入第二个参数,其中的Int表示比较时采用的泛型

1
rdd.takeOrdered(8)(Ordering.Int.reverse)

aggregate

函数声明:

1
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

函数说明:

分区的数据通过初始值与分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。可以指定分区内的聚合逻辑和分区间的聚合逻辑。

需要注意的是,这里的初始值会参与分区内计算,也会参与分区间计算,而在转换算子中类似的aggregateByKey,初始值只会参与分区内的计算。

fold

函数声明:

1
def fold(zeroValue: T)(op: (T, T) => T): T

函数说明:

上面aggregateByKey的简化操作,其中分区内和分区间的计算逻辑相同。

countByKey

函数声明:

1
def countByKey(): Map[K, Long] 

函数说明:

统计每种Key的个数

save相关算子

函数声明:

1
2
3
4
5
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_<:CompressionCodec]] = None): Unit

函数说明:

将数据保存至不同格式的文件中,上面的函数分别表示

  • 保存成文本文件
  • 序列化成对象保存在文件中
  • 保存成Sequencefile文件。这种方式要求数据的格式必须为Key-Value类型

foreach

函数声明:

1
2
3
4
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

函数说明:

分布式遍历RDD中的每个元素,调用指定的函数。注意这里和集合的foreach操作不同。集合的foreach操作其实是在节点中的内存集合的循环遍历,而foreach算子是在Executor端内存数据的循环遍历,是分布式的。分区内的执行有序,而分区间的执行无序。


Spark学习笔记-SparkCore(1)-RDD以及相关算子
http://example.com/2022/05/08/Spark学习笔记-SparkCore-1-RDD以及相关算子/
作者
EverNorif
发布于
2022年5月8日
许可协议