Spark学习笔记-SparkCore(3)-累加器和广播变量

累加器

现在我们考虑这样的应用场景,我们在Driver即算子的外部创建一个变量,然后在算子内部操作这个变量,但是我们还希望这个变量最终的结果能够返回给Driver。前面的闭包检测能够完成变量的传递,但是由于在不同的Executor上变量其实是一个副本,也无法完成数据的传回以及合并操作,因此就引出累加器的概念。

Spark中的累加器用来叭Executor端变量的信息聚合到Driver端。在Driver程序中定义的变量,每个Executor端的每个Task都会得到这个变量的一份新的副本,每个Task更新这些副本的值之后,传回Driver端进行merge操作,得到最终的结果。累加器又可以分为系统累加器和自定义累加器。

系统累加器:包括longAccumulatordoubleAccumulatorcollectionAccumulator

1
2
3
4
5
6
7
8
9
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)

// 获取系统累加器,后面传入的name参数是该累加器在UI界面的标识符
val sumAcc = sc.longAccumulator("sum")

rdd.foreach(sumAcc.add(_))

// 获取累加器的值
println(sumAcc.value)

累加器的执行可能会随着行动算子的执行而触发,可能会出现少加或者多加的情况。在一般情况下,累加器会放置在行动算子中进行操作。

  • 少加:在转换算子中调用累加器,如果没有行动算子,则累加器也不会执行
  • 多加:在转换算子中调用累加器,如果有多个行动算子,那么累加器也会执行多次

自定义累加器:我们可以自定义累加器,来实现自己的累加逻辑。自定义累加器需要继承AccumulatorV2,设定泛型并重写其中的抽象方法,下面是WordCount的累加器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Rdd_Test {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)

val rdd = sc.makeRDD(List("hello", "spark", "hello", "scala"))
// 创建自定义累加器
val myAccumulator = new MyAccumulator()
// 向Spark进行注册
sc.register(myAccumulator)
// 在算子中调用自定义累加器
rdd.foreach(word => {
myAccumulator.add(word)
})
// 输出累加器结果
println(myAccumulator.value)

sc.stop()
}

// 自定义累加器,继承AccumulatorV2,确定泛型,实现抽象方法
// 泛型IN:累加器输入的数据类型
// 泛型OUT:累加器输出的数据类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {
private var myMap = mutable.Map[String, Int]()

// 判断累加器是否是初始状态
override def isZero: Boolean = myMap.isEmpty

// 累加器的复制
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new MyAccumulator()

// 重置累加器
override def reset(): Unit = myMap.clear()

// 自定义累加逻辑
override def add(v: String): Unit = {
val newCount = myMap.getOrElse(v, 0) + 1
myMap.update(v, newCount)
}

// 不同分区的累加器合并(将other合并到自己)
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
val map1 = this.myMap
val map2 = other.value

map2.foreach({
case (word, count) => {
val newCount = map1.getOrElse(word, 0) + count
map1.update(word, newCount)
}
})
}

// 返回累加器的值
override def value: mutable.Map[String, Int] = myMap
}
}

广播变量

前面我们反复提到在RDD算子内部的操作在Executor端完成,对应到分区的概念,而算子外部的操作在Driver端完成。并且我们也提到的闭包检测的概念,在算子内部的数据会发送给对应的分区。但是这里的数据分发是以分区为单位的,即以Task为单位。如果一个物理节点Executor上有多个分区,有多个Task,那么会存在数据冗余的现象。

如果每个分区上只需要一个可读的值,那么可以考虑使用广播变量。广播变量的分发是以Executor为单位的,而不是像闭包数据那样以分区任务为单位,这样就可以明显降低数据冗余。广播变量会将自己保存在Executor中的内存中。

1
2
3
4
5
6
7
8
val list = List(1, 2, 3, 4, 5)
// 广播变量的声明
val broadcast = sc.broadcast(list)

// 广播变量的使用
rdd.foreach(_ => {
println(broadcast.value)
})

Spark学习笔记-SparkCore(3)-累加器和广播变量
http://example.com/2022/05/10/Spark学习笔记-SparkCore-3-累加器和广播变量/
作者
EverNorif
发布于
2022年5月10日
许可协议