本文最后更新于:2022-05-10T21:07:16+08:00
累加器
现在我们考虑这样的应用场景,我们在Driver即算子的外部创建一个变量,然后在算子内部操作这个变量,但是我们还希望这个变量最终的结果能够返回给Driver。前面的闭包检测能够完成变量的传递,但是由于在不同的Executor上变量其实是一个副本,也无法完成数据的传回以及合并操作,因此就引出累加器的概念。
Spark中的累加器用来叭Executor端变量的信息聚合到Driver端。在Driver程序中定义的变量,每个Executor端的每个Task都会得到这个变量的一份新的副本,每个Task更新这些副本的值之后,传回Driver端进行merge操作,得到最终的结果。累加器又可以分为系统累加器和自定义累加器。
系统累加器:包括longAccumulator
,doubleAccumulator
,collectionAccumulator
1 2 3 4 5 6 7 8 9
| val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
val sumAcc = sc.longAccumulator("sum")
rdd.foreach(sumAcc.add(_))
println(sumAcc.value)
|
累加器的执行可能会随着行动算子的执行而触发,可能会出现少加或者多加的情况。在一般情况下,累加器会放置在行动算子中进行操作。
- 少加:在转换算子中调用累加器,如果没有行动算子,则累加器也不会执行
- 多加:在转换算子中调用累加器,如果有多个行动算子,那么累加器也会执行多次
自定义累加器:我们可以自定义累加器,来实现自己的累加逻辑。自定义累加器需要继承AccumulatorV2
,设定泛型并重写其中的抽象方法,下面是WordCount的累加器实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Rdd_Test {
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("hello", "spark", "hello", "scala")) val myAccumulator = new MyAccumulator() sc.register(myAccumulator) rdd.foreach(word => { myAccumulator.add(word) }) println(myAccumulator.value)
sc.stop() }
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] { private var myMap = mutable.Map[String, Int]()
override def isZero: Boolean = myMap.isEmpty
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new MyAccumulator()
override def reset(): Unit = myMap.clear()
override def add(v: String): Unit = { val newCount = myMap.getOrElse(v, 0) + 1 myMap.update(v, newCount) }
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = { val map1 = this.myMap val map2 = other.value
map2.foreach({ case (word, count) => { val newCount = map1.getOrElse(word, 0) + count map1.update(word, newCount) } }) }
override def value: mutable.Map[String, Int] = myMap } }
|
广播变量
前面我们反复提到在RDD算子内部的操作在Executor端完成,对应到分区的概念,而算子外部的操作在Driver端完成。并且我们也提到的闭包检测的概念,在算子内部的数据会发送给对应的分区。但是这里的数据分发是以分区为单位的,即以Task为单位。如果一个物理节点Executor上有多个分区,有多个Task,那么会存在数据冗余的现象。
如果每个分区上只需要一个可读的值,那么可以考虑使用广播变量。广播变量的分发是以Executor为单位的,而不是像闭包数据那样以分区任务为单位,这样就可以明显降低数据冗余。广播变量会将自己保存在Executor中的内存中。
1 2 3 4 5 6 7 8
| val list = List(1, 2, 3, 4, 5)
val broadcast = sc.broadcast(list)
rdd.foreach(_ => { println(broadcast.value) })
|