Spark学习笔记-SparkCore(4)-Spark核心运行机制

Spark核心组件

Spark的核心组件包括Driver和Executor。

Driver: Spark的驱动器节点,用于执行Spark任务中的main方法。在作业流程中,Driver主要负责:

  • 将用户程序转化为作业Job
  • 在Executor之间进行任务调度(Task)
  • 跟踪Executor的执行情况
  • 通过UI展示查询运行情况

Executor:负责在Spark作业中运行具体的任务Task,任务之间彼此独立。Spark应用启动的时候,ExecutorBackend节点同时启动,其中的Executor对象完成实际的运算工作。如果有ExecutorBackend节点发生故障或者崩溃,Spark应用会将出错节点上的任务调度到其他Executor节点上执行。Executor主要负责:

  • 负责运行组成Spark应用的任务,并将结果返回给驱动器(Driver)
  • 通过自身的块管理器为用户程序中要求缓存的RDD提供内存管理。要求缓存的RDD直接缓存在Executor进程中,在任务运行时可以充分利用缓存

Spark有多种部署模式,但是整体有一个通用的流程概述:

  1. 程序提交之后,启动Driver程序(Driver的位置可能因提交方式不同而不同)
  2. Driver向集群管理器注册应用程序
  3. 集群管理器根据任务的配置文件分配Executor并启动,Executor向Driver注册
  4. Driver中代码执行到行动算子时开始反向推导,根据宽依赖对作业Job进行划分,划分成多个阶段Stage,每个阶段对应一个TaskSet,其中包含多个Task。之后使用可用的Executor执行Task的执行
  5. 根据本地化原则,Task会被分发到指定的Executor去执行。在任务执行的过程中,Executor会不断与Driver进行通信,报告任务的运行情况

Spark部署模式

部署模式概览

Spark中支持多种部署模式,主要有以下几种:

  1. 本地模式:不使用集群,而是在本地通过启动多个线程的方式完成并行计算
  2. Standalone模式:使用Spark原生的集群管理器,可以单独部署到一个集群中而无需依赖其他的资源管理系统
  3. Yarn模式:使用Yarn资源管理框架进行资源的管理和调度。根据Driver在集群中位置的不同,又可以分为Yarn Cluster和Yarn Client模式
  4. Apache Mesos模式:Mesos是一个分布式资源管理框架,可以允许其他的框架部署在它之上。Spark在开发之初就考虑到支持Mesos,Spark运行在Mesos上会更加灵活。分为粗粒度模式和细粒度模式。
  5. K8S:容器部署模式

Yarn部署模式

根据Driver的位置不同,Yarn部署模式分为Yarn Cluster以及Yarn Client。在Yarn部署模式下,需要关注ResourceManager、NodeManager以及ApplicationMaster与Driver、Executor之间的关系。

Yarn Cluster模式

运行流程如下:

  1. 利用spark-submit脚本进行程序的提交,启动SparkSubmit的JVM进程
  2. SparkSubmit中的main方法调用YarnClusterApplication对象中的main方法
  3. YarnClusterApplication创建Yarn客户端用来向连接Yarn,向Yarn发送执行指令bin/java ApplicationMaster,即要求运行ApplicationMaster
  4. Yarn框架接收到指令,在对应的NodeManager中启动ApplicationMaster
  5. ApplicationMaster中启动Driver线程,执行用户的程序
  6. ApplicationMasterResourceManager注册,进行资源的申请,即获取到可用的NodeManager,在其上运行Container
  7. 获取资源之后,ApplicationMasterNodeManager发送指令bin/java YarnCoarseGrainedExecutorBackend,该进程用于与Driver的通信,注册已经启动的Executor进程,然后启动计算对象Executor等待接收任务
  8. Driver线程继续执行完成作业的调度和任务的执行,包括任务的阶段划分等
  9. Driver进行任务的分配,并监控任务的运行

注意:这里的SparkSubmitApplicationMasterCoarseGrainedExecutorBackend是独立的进程,Driver是独立的线程;Executor是运行在CoarseGrainedExecutorBackend中的对象。我们平常所说的Executor也可能指的是进程CoarseGrainedExecutorBackend

Yarn Client模式

Yarn Client的运行流程与Yarn Cluster类似,只是Driver线程在本地启动,向ResourceManager申请运行ExecutorLauncher。当然ExecutorLauncher实际上还是调用了ApplicationMaster的main方法。

运行流程如下:

  1. 利用spark-submit脚本进行程序的提交,启动SparkSubmit的JVM进程
  2. SparkSubmit中的main方法调用用户代码的main方法
  3. 创建Driver线程,执行用户的作业,并创建YarnClientScheduleBackend
  4. YarnClientScheduleBackend创建Yarn客户端用来向连接Yarn,向Yarn发送执行指令bin/java ExecutorLauncher
  5. Yarn框架接收到指令,在对应的NodeManager中启动ExecutorLauncher(实际上还是调用了ApplicationMaster的main方法)
  6. ApplicationMasterResourceManager注册,进行资源的申请
  7. 获取资源之后,ApplicationMasterNodeManager发送指令bin/java YarnCoarseGrainedExecutorBackend,该进程用于与Driver的通信,注册已经启动的Executor进程,然后启动计算对象Executor等待接收任务(此时的Driver是一个在提交处本地的线程)
  8. Driver线程继续执行完成作业的调度和任务的执行,包括任务的阶段划分等
  9. Driver进行任务的分配,并监控任务的运行

Standalone部署模式

在Standalone部署模式下,集群中有两个重要组成部分,分别是

  • Master:类似于Yarn中的ResourceManager,是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责
  • Worker:类似于Yarn中的NodeManager,是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某些分区,另一个是启动其他进程和Executor线程,对RDD的分区进行并行计算

Standalone部署模式下同样有Cluster和Client模式,总体也与Yarn部署模式的运行机制比较类似。

Standalone Cluster模式

流程如下:

  1. 任务提交,Master找到一个Worker来启动Driver(Driver由集群中的Worker运行)
  2. Dirver启动后向Master注册,并申请启动Executor
  3. Master寻找可用资源,启动Worker,并在这些Worker上分配Executor
  4. Executor启动后向Driver反向注册
  5. Driver执行到行动算子后,开始划分阶段Stage,每个Stage生成对应的TaskSet,将Task分发到各个Executor上执行

Standalone Client模式

Client与Cluster的主要区别在于Driver的启动位置,在Client模式中,Driver在任务提交的本地机器上运行

流程如下:

  1. 任务提交,启动Driver(在提交任务的本地机器上)
  2. Driver启动后向Master注册,申请启动Executor
  3. Master寻找可用资源,启动Worker,并在这些Worker上分配Executor
  4. Executor启动后向Driver反向注册
  5. Driver执行到行动算子后,开始划分阶段Stage,每个Stage生成对应的TaskSet,将Task分发到各个Executor上执行

Spark应用执行

概述

我们首先需要明确几个概念,一个Spark应用程序包括Job、Stage以及Task三个概念:

  1. Job以行动算子为界,每遇到一个行动算子,则会触发一个Job
  2. Stage是Job的划分,以RDD宽依赖为界,每遇到一个Shuffle就进行一次划分
  3. Task是Stage的划分,以分区数来衡量。Stage中最后一个RDD的分区数即为Task的数量

Spark应用执行的过程主要是Driver的工作流程。

Driver线程首先进行SparkContext对象的初始化,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster来申请资源;另一方面,根据用户代码逻辑进行任务的调度,将任务下发到空闲的Executor。这个过程会涉及到阶段Stage的划分、任务Task的切分等。

当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,启动完成之后反向注册,注册成功之后保持与Driver的心跳,同时等待Driver分发任务。当分发的任务执行完毕之后,将任务状态上报给Driver。

一个RDD通过转换算子会生成新的RDD,从而形成了RDD血缘关系图。这是一个有向无环图,即DAG。通过行动算子的调用,触发生成Job并调度执行。在执行过程中,会使用到两个调度器,DAGSchedulerTaskScheduler

  • DAGScheduler:负责Stage级别的调度,主要是将Job切分成若干个Stage,并将每个Stage打包成TaskSet交给TaskScheduler进行调度
  • TaskScheduler:负责Task级别的调度,将DAGScheduler得到的TaskSet按照指定的调度策略分发到Executor上执行。调度过程中,ShcedulerBackend负责提供可用资源,其中的SchedulerBackend有多种实现,可以对接不同的资源管理系统

Driver初始化SparkContext过程中,会分别初始化DAGScheduler,TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend和HeartbeatReceiver。

SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor上执行。

HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler

Stage调度

DAGScheduler负责Stage级别的调度,主要是将Job切分成若干个Stage,并将每个Stage打包成TaskSet交给TaskScheduler进行调度

程序运行到行动算子,则会触发一个Job。Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分成若干个Stages。划分策略如下,基本上是一个深度优先搜索算法:

  1. 首先划分一个最终的Stage,称为ResultStage。它是由行动算子决定的。
  2. 由最终的RDD不断向前回溯,判断父依赖是否是宽依赖,即Shuffle依赖。如果是则继续划分一个Stage,称为ShuffleMapStage。它是由对应的Shuffle转换算子决定的

最终得到的阶段数目应该等于流程中Shuffle依赖的个数+1(一个ResultStage)。

Stage的运行是有先后顺序的,只有当前面的Stage运行完成之后,才能提交运行下一个Stage。Stage提交的时候会将Task信息序列化并打包成TaskSet交给TaskScheduler,其中的Task数量与当前Stage中最后一个RDD的分区有关,一个分区对应一个Task。

DAGScheduler会监控Stage的运行状态,如果Executor丢失,或者Task进行Fetch失败,则需要重新提交对应的Stage。其他情况,则会在TaskScheduler的调度过程中进行出错重试。(即前者情况下,Stage信息丢失,需要重新提交)

相对来说,DAGScheduler做的事情较为简单,包括在Stage层面划分,提交Stage以及监控其相关状态信息

Task调度

TaskScheduler负责Task级别的调度,将DAGScheduler得到的TaskSet按照指定的调度策略分发到Executor上执行。

TaskScheduler将TaskSet封装为TaskSetManager加入调度队列中。一个TaskSetManager负责监控和管理同一个Stage中的Task,而TaskScheduler也是以TaskSetManager为单位进行任务的调度。

前面提到在TaskScheduler初始化后会启动SchedulerBackend,它负责接收Executor的注册信息,并维护Executor的状态。SchedulerBackend会定时询问TaskScheduler是否需要执行任务。在接收到询问后,TaskScheduler按照指定的调度策略选择出TaskSetManager去执行。之后,TaskScheduler调用SchedulerBackend的方法,经过对应一系列处理之后得到可用资源,TaskSchduler基于这些资源来进行Task的运行。

TaskScheduler默认支持两种调度策略,FIFO以及FAIR策略。默认为FIFO策略。

在从调度队列中拿到TaskSetManager后,就需要按照一定的规则取出Task交给TaskScheduler,再由TaskScheduler交给SchedulerBackend,发到对应的Executor上执行。

在任务调度的过程中,还会涉及到一个本地化调度的概念,即确定每个分区应该在哪个Executor上运行。本地化调度指的是Spark倾向于以最好的本地化级别来调度Task,但是可能不是每个Task都能做到最好的本地化级别。假如当前Task无法以最好的本地化级别运行,这可能是因为对应的Executor繁忙,Spark会先等待一段时间,如果还不行就降低本地化级别,重复操作。具体的本地化原则调度原理如下,可以通过调整spark.locality相关参数来调整:

  1. 默认等待时间为3秒
  2. 若超时,则下降到下一个本地化级别重新分配
  3. 数据发生传输的时候,Task首先从被本地的BlockManager获取数据,若本地没有数据,则调用getRemote方法从数据所在节点的BlockManager获取数据,返回至该节点

本地化级别由高到低有下面的设置:

级别 说明
PROCESS_LOCAL 进程本地化,Task和数据在同一个Executor中。性能最好
NODE_LOCAL 节点本地化,Task和数据在同一个节点中,但是Task和数据不在同一Executor中,数据需要在进程间进行传输
RACK_LOACL 机架本地化,Task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间传输
NO_PREF 对于Task来说,从哪里获取数据性能都相同,无偏好
ANY 数据在任意地方,性能最差

TaskScheduler还有失败重试和黑名单机制。简单来说,TaskScheduler会监控Task的执行状态,对于失败的Task,会记录它失败的次数,如果失败次数没有超过最大重试次数,就将其放回调度队列中,否则整个Application时报。在记录失败Task的时候,会记录它上次失败所在的Executor以及节点位置,下次再进行调度的时候,会使用黑名单机制,避免它被调度到上次失败的节点上,起到一定的容错作用。

Spark Shuffle

概述

Spark Shuffle发生在ShuffleMapStage中。该阶段的结束伴随着Shuffle文件的写磁盘。宽依赖上游RDD一个分区数据会进入下游RDD的多个分区,数据发生移动,称之为Shuffle。Shuffle一般分为两个阶段,一个是产生Shuffle数据的阶段,即上游RDD写数据到磁盘上;第二个是使用Shuffle数据的阶段,即后续的RDD不同分区获取对应的数据。Map阶段和Reduce阶段通过生产与消费Shuffle中间文件的方式,来完成集群范围内的数据交换。

Shuffle Write: Shuffle写入临时文件的过程叫做Shuffle Write,Spark目前支持三种Writer,分别为SortShuffleWriterBypassMergeSortShuffleWriterUnsafeShuffleWriter

Shuffle Read: Shuffle拉取数据的过程叫做Shuffle Read。对于Map Task生成的中间文件,Reduce Task需要通过网络从不同节点拉取属于自己的数据内容。Shuffle Reader的实现被封装在BlockStoreShuffleReader

Shuffle演变

Spark中的Shuffle是一个不断演变的过程。在Spark初始版本中,引入Hash Based Shuffle,之后基于Hash Shuffle引入文件合并机制,对Hash Based Shuffle进行改进。之后在Spark 1.1版本中引入Sort Based Shuffle,1.4引入Tungsten-Sort Based Shuffle。在Spark 2.0版本之后,仅支持Sort和Tungsten-Sort两种Shuffle方式,不再支持基于Hash的Shuffle方式

Hash Based Shuffle

在Hash Based Shuffle中,每个分区上,MapTask会根据Reducer的数量创建出对应数量的文件假定为x,假如一共有y个MapTask,则会创建出\(x \times y\)个中间文件。可以看出,在这种方式下,生成的小文件太多,对文件系统的压力很大,并且也不利于IO吞吐,会严重影响性能。

针对小文件的问题,Spark基于Hash Shuffle引入了文件合并File Consolidation机制,得到优化后的Hash Shuffle。优化的Hash Based Shuffle主要思想就是通过共同输出文件以降低文件数,将在同一个CPU核心上运行的多个MapTask的输出合并到统一文件,这样一个Core输出的文件个数就是x个。在同一个Core上先后运行的两个MapTask的输出对应到同一个文件不同的segment,称为一个FileSegment。

Sort Based Shuffle

虽然Hash Shuffle引入了文件合并机制,但是还是无法从根本上解决文件数过多的问题,于是引入Sort Based Shuffle。Sort Shuffle有三种运行机制,普通运行机制、bypass机制以及Tungsten 运行机制,分别对应三种不同的Shuffle。

普通模式的工作原理类似于MapReduce中的Shuffle过程。在普通模式下,每个MapTask会先将数据写入内存结构中,当达到某个临界阈值之后,就会将内存中的数据溢写到磁盘上。在溢写之前会对数据进行排序,分批写入磁盘文件。在一个Task将数据写入内存的过程中,会出现多次的磁盘溢写操作,也就会产生多个临时文件,最终会将所有临时文件进行合并,得到一个最终的磁盘文件。最终得到的文件,按照parittionId从小到大排序。

由于一个Task对应一个磁盘文件,为下游Stage不同Task(下游RDD不同分区)准备的数据都在这一个文件中,所以还会单独写一份索引文件,用来标识下游各个Task的数据在文件中的位置(start offset和end offset)

bypass Sort Based Shuffle

在Reducer端任务数比较少的情况下,Hash Based Shuffle的效率明显高于Sort Based Shuffle,因此基于Sort Shuffle,Spark还提供了一个回退方案,就是bypass运行机制。它的思想与Hash Based Shuffle类似,每个MapTask会为下游的每个ReduceTask生成一个对应的临时磁盘文件,唯一的区别是在最后会将这些文件进行合并,同时生成索引文件,标识对应的位置。

相比于Hash Based Shuffle,bypass机制提供了更少的最终磁盘文件,Shuffle read的性能更好。

相比于普通的Sort Based Shuffle,bypass机制为每个ReduceTask生成一个临时文件,只在最后进行一次合并,中间没有排序的操作,也就节省了这部分的性能开销。但是由于会为每个ReduceTask(分区)分配一个临时文件,如果ReduceTask过多的话,会对文件系统造成很大压力,因此bypass机制的触发条件如下:

  1. shuffle reduce task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200
  2. 不能有map端的聚合(例如reduceByKey,因为它对于临时文件来说)

Tungsten-Sort Based Shuffle

从Spark1.5开始,Spark启动钨丝计划,目的是优化内存和CPU的使用,从而进一步提高Spark的性能。由于需要基于JDK Unsafe API来使用堆外内存,因此Tungsten-Sort Based Shuffle又称为Unsafe Shuffle。

该Shuffle的做法类似于Sort Based Shuffle,但是将数据记录使用二进制的方式进行存储,直接在序列化的二进制数据上Sort,而不是在Java对象上。这样一方面能够减少内存的使用和GC的开销;另一方面也可以避免Shuffle过程中频繁的序列化和反序列化。

不过使用Tungsten-Sort Based Shuffle有几个限制条件:

  1. Shuffle map阶段不能有聚合操作
  2. 分区数不能超过一定大小(\(2^{24}-1\),这是24bit的partitionID的最大表示范围)

而从Spark 1.6开始,把Sort Shuffle和Tungsten-Sort Based Shuffle全部统一到了Sort Shuffle中,如果检测到满足Tungsten-Sort Based Shuffle条件,会自动采用Tungsten-Sort Based Shuffle,否则采用Sort Shuffle。

Sort Based Shuffle的优缺点:

优点:

  • 小文件的数量大量减少,在Mapper端的内存占用变少
  • 使得Spark不仅可以处理小规模的数据,对于大规模的数据也不会很容易达到性能瓶颈

缺点:

  • 强制在Mapper端进行排序,即使数据本身可能并不需要排序,导致性能损耗

Spark 内存管理

概述

作为一个JVM进程,Executor的内存管理建立在JVM的内存管理之上。Spark对堆内(On-Heap)空间进行更加详细的分配,同时,Spark引入了堆外(Off-Heap)内存,使Spark可以直接在系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到JVM的统一管理,我们能够控制的程度有限;堆外内存直接向操作系统进行内存的申请和释放,我们能够控制的程度较高

堆内内存大小的配置可以通过启动时配置-executor-memory,也可以在配置参数spark.executor.memory中指定。Executor中运行的Task共享堆内内存,其中又被划分成几个部分:

  • 存储内存:Task缓存RDD数据以及广播数据的时候占用的内存
  • 执行内存:任务在执行Shuffle的时候占用的内存
  • 其他内存:对象实例占用的空间等

堆内内存的管理主要还是通过JVM来完成的,我们无法精确控制堆内内存的申请和释放,但是Spark通过对存储内存和执行内存各自独立的规划管理,决定是否要在存储内存中缓存新的RDD,以及是否为新的任务分配执行内存,一定程度上提高了内存的利用率。

而为了进一步优化内存的使用,提高Shuffle时的效率,Spark引入了堆外内存,使得Spark可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。堆外内存直接受到操作系统管理,减少了不必要的内存开销以及频繁的GC扫描与回收,提升了性能。堆外内存可以被精确的申请和释放。(利用JDK Unsafe API)

在默认情况下,堆外内存并不启用,可以通过配置spark.memory.offHeap.enabled参数启用,并由参数spark.memory.offHeap.size设定堆外空间的大小。

内存空间分配

静态内存管理

Spark最初采用静态内存管理机制,存储内存、执行内存和其他内存的大小在应用程序运行期间保持固定,用户在程序启动之前可以进行配置。

堆内内存划分如下:

堆内内存主要分为三部分,存储内存、执行内存以及其他内存。

  • 存储内存:默认占系统内存的60%,该数值对应spark.storage.memoryFraction。存储内存中实际使用的部分并不是全部,预留部分空间防止OOM。实际可以使用的部分所占的比例默认为0.9,该数值对应spark.storage.safetyFraction
  • 执行内存:默认占系统内存的20%,该数值对应spark.shuffle.memoryFraction。同存储内存一样,执行内存也预留了一部分空间,实际使用的空间所占比例默认为0.8,该数值对应spark.shuffle.safetyFraction
  • 其他内存:剩余部分即为其他内存

存储内存和执行内存预留空间的目的是防止OOM,因为Spark堆内内存的大小记录是不准确的,需要留出保险空间。

对于存储内存和执行内存的预留部分,是一种逻辑上的规划,在具体使用的时候,Spark并没有区别对待,而是和其他内存一样,交给JVM进行管理

堆外内存的分配则较为简单,只划分为存储内存和执行内存。默认存储内存占用50%,该数值对应spark.memory.storageFraction,剩余空间则为执行内存。由于堆外内存占用的空间可以被精确计算,所以不需要再预留空间保险。

静态内存管理机制实现较为简单,但是如果用户不熟悉Spark的存储机制,没有根据具体的数据规模和计算任务做相应的配置,很容易影响性能。Spark后续引入了新的内存管理机制,而处于兼容的目的,仍然保留的静态内存管理机制

统一内存管理

Spark1.6之后引入统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲空间。堆内存划分如下:

主要内存划分如下:

  • 可用内存(Usable Memory):等于系统内存减去预留内存
    • 统一内存:默认占可用内存的60%,其中包括了存储内存和执行内存
      • 存储内存:默认占统一内存的0.5,数值对应spark.stroage.storageFraction
      • 执行内存:默认占统一内存的0.5
    • 其他内存:默认占可用内存的40%
  • 预留内存(Reserved Memory):默认为300M

该机制中最重要的优化在于动态占用机制:

  1. 设置了基本的存储内存和执行内存区域,由spark.storage.storageFraction参数确定
  2. 双方的空间都不足时,则存储到磁盘;若己方空间不足而对方空间剩余时,则可以借用对方的空间
  3. 执行内存的空间被占用后,可以要求对方归还借用的空间
  4. 存储内存的空间被占用后,无法让对方归还借用的空间(主要是考虑到Shuffle过程中的较多因素,实现起来比较复杂)

参考文章

  1. 【Spark重点难点】你以为的Shuffle和真正的Shuffle - 王知无 - 博客园 (cnblogs.com)
  2. 彻底搞懂spark的shuffle过程(shuffle write)_大跃ET的博客-CSDN博客
  3. Spark的shuffle过程为什么要排序? - 知乎 (zhihu.com)

Spark学习笔记-SparkCore(4)-Spark核心运行机制
http://example.com/2022/07/21/Spark学习笔记-SparkCore-4-Spark核心运行机制/
作者
EverNorif
发布于
2022年7月21日
许可协议