Spark学习笔记-SparkCore(4)-Spark核心运行机制
Spark核心组件
Spark的核心组件包括Driver和Executor。
Driver: Spark的驱动器节点,用于执行Spark任务中的main方法。在作业流程中,Driver主要负责:
- 将用户程序转化为作业Job
- 在Executor之间进行任务调度(Task)
- 跟踪Executor的执行情况
- 通过UI展示查询运行情况
Executor:负责在Spark作业中运行具体的任务Task,任务之间彼此独立。Spark应用启动的时候,ExecutorBackend节点同时启动,其中的Executor对象完成实际的运算工作。如果有ExecutorBackend节点发生故障或者崩溃,Spark应用会将出错节点上的任务调度到其他Executor节点上执行。Executor主要负责:
- 负责运行组成Spark应用的任务,并将结果返回给驱动器(Driver)
- 通过自身的块管理器为用户程序中要求缓存的RDD提供内存管理。要求缓存的RDD直接缓存在Executor进程中,在任务运行时可以充分利用缓存
Spark有多种部署模式,但是整体有一个通用的流程概述:
- 程序提交之后,启动Driver程序(Driver的位置可能因提交方式不同而不同)
- Driver向集群管理器注册应用程序
- 集群管理器根据任务的配置文件分配Executor并启动,Executor向Driver注册
- Driver中代码执行到行动算子时开始反向推导,根据宽依赖对作业Job进行划分,划分成多个阶段Stage,每个阶段对应一个TaskSet,其中包含多个Task。之后使用可用的Executor执行Task的执行
- 根据本地化原则,Task会被分发到指定的Executor去执行。在任务执行的过程中,Executor会不断与Driver进行通信,报告任务的运行情况
Spark部署模式
部署模式概览
Spark中支持多种部署模式,主要有以下几种:
- 本地模式:不使用集群,而是在本地通过启动多个线程的方式完成并行计算
- Standalone模式:使用Spark原生的集群管理器,可以单独部署到一个集群中而无需依赖其他的资源管理系统
- Yarn模式:使用Yarn资源管理框架进行资源的管理和调度。根据Driver在集群中位置的不同,又可以分为Yarn Cluster和Yarn Client模式
- Apache Mesos模式:Mesos是一个分布式资源管理框架,可以允许其他的框架部署在它之上。Spark在开发之初就考虑到支持Mesos,Spark运行在Mesos上会更加灵活。分为粗粒度模式和细粒度模式。
- K8S:容器部署模式
Yarn部署模式
根据Driver的位置不同,Yarn部署模式分为Yarn Cluster以及Yarn Client。在Yarn部署模式下,需要关注ResourceManager、NodeManager以及ApplicationMaster与Driver、Executor之间的关系。
Yarn Cluster模式
运行流程如下:
- 利用
spark-submit
脚本进行程序的提交,启动SparkSubmit
的JVM进程 SparkSubmit
中的main方法调用YarnClusterApplication
对象中的main方法YarnClusterApplication
创建Yarn客户端用来向连接Yarn,向Yarn发送执行指令bin/java ApplicationMaster
,即要求运行ApplicationMaster
- Yarn框架接收到指令,在对应的
NodeManager
中启动ApplicationMaster
- 在
ApplicationMaster
中启动Driver线程,执行用户的程序 ApplicationMaster
向ResourceManager
注册,进行资源的申请,即获取到可用的NodeManager
,在其上运行Container- 获取资源之后,
ApplicationMaster
向NodeManager
发送指令bin/java YarnCoarseGrainedExecutorBackend
,该进程用于与Driver的通信,注册已经启动的Executor进程,然后启动计算对象Executor等待接收任务 - Driver线程继续执行完成作业的调度和任务的执行,包括任务的阶段划分等
- Driver进行任务的分配,并监控任务的运行
注意:这里的
SparkSubmit
、ApplicationMaster
、CoarseGrainedExecutorBackend
是独立的进程,Driver是独立的线程;Executor
是运行在CoarseGrainedExecutorBackend
中的对象。我们平常所说的Executor也可能指的是进程CoarseGrainedExecutorBackend
Yarn Client模式
Yarn Client的运行流程与Yarn
Cluster类似,只是Driver线程在本地启动,向ResourceManager
申请运行ExecutorLauncher
。当然ExecutorLauncher
实际上还是调用了ApplicationMaster
的main方法。
运行流程如下:
- 利用
spark-submit
脚本进行程序的提交,启动SparkSubmit
的JVM进程 SparkSubmit
中的main方法调用用户代码的main方法- 创建Driver线程,执行用户的作业,并创建
YarnClientScheduleBackend
YarnClientScheduleBackend
创建Yarn客户端用来向连接Yarn,向Yarn发送执行指令bin/java ExecutorLauncher
- Yarn框架接收到指令,在对应的
NodeManager
中启动ExecutorLauncher
(实际上还是调用了ApplicationMaster
的main方法) ApplicationMaster
向ResourceManager
注册,进行资源的申请- 获取资源之后,
ApplicationMaster
向NodeManager
发送指令bin/java YarnCoarseGrainedExecutorBackend
,该进程用于与Driver的通信,注册已经启动的Executor进程,然后启动计算对象Executor等待接收任务(此时的Driver是一个在提交处本地的线程) - Driver线程继续执行完成作业的调度和任务的执行,包括任务的阶段划分等
- Driver进行任务的分配,并监控任务的运行
Standalone部署模式
在Standalone部署模式下,集群中有两个重要组成部分,分别是
- Master:类似于Yarn中的ResourceManager,是一个进程,主要负责资源的调度和分配,并进行集群的监控等职责
- Worker:类似于Yarn中的NodeManager,是一个进程,一个Worker运行在集群中的一台服务器上,主要负责两个职责,一个是用自己的内存存储RDD的某些分区,另一个是启动其他进程和Executor线程,对RDD的分区进行并行计算
Standalone部署模式下同样有Cluster和Client模式,总体也与Yarn部署模式的运行机制比较类似。
Standalone Cluster模式
流程如下:
- 任务提交,Master找到一个Worker来启动Driver(Driver由集群中的Worker运行)
- Dirver启动后向Master注册,并申请启动Executor
- Master寻找可用资源,启动Worker,并在这些Worker上分配Executor
- Executor启动后向Driver反向注册
- Driver执行到行动算子后,开始划分阶段Stage,每个Stage生成对应的TaskSet,将Task分发到各个Executor上执行
Standalone Client模式
Client与Cluster的主要区别在于Driver的启动位置,在Client模式中,Driver在任务提交的本地机器上运行
流程如下:
- 任务提交,启动Driver(在提交任务的本地机器上)
- Driver启动后向Master注册,申请启动Executor
- Master寻找可用资源,启动Worker,并在这些Worker上分配Executor
- Executor启动后向Driver反向注册
- Driver执行到行动算子后,开始划分阶段Stage,每个Stage生成对应的TaskSet,将Task分发到各个Executor上执行
Spark应用执行
概述
我们首先需要明确几个概念,一个Spark应用程序包括Job、Stage以及Task三个概念:
- Job以行动算子为界,每遇到一个行动算子,则会触发一个Job
- Stage是Job的划分,以RDD宽依赖为界,每遇到一个Shuffle就进行一次划分
- Task是Stage的划分,以分区数来衡量。Stage中最后一个RDD的分区数即为Task的数量
Spark应用执行的过程主要是Driver的工作流程。
Driver线程首先进行SparkContext对象的初始化,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster来申请资源;另一方面,根据用户代码逻辑进行任务的调度,将任务下发到空闲的Executor。这个过程会涉及到阶段Stage的划分、任务Task的切分等。
当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,启动完成之后反向注册,注册成功之后保持与Driver的心跳,同时等待Driver分发任务。当分发的任务执行完毕之后,将任务状态上报给Driver。
一个RDD通过转换算子会生成新的RDD,从而形成了RDD血缘关系图。这是一个有向无环图,即DAG。通过行动算子的调用,触发生成Job并调度执行。在执行过程中,会使用到两个调度器,DAGScheduler
和TaskScheduler
- DAGScheduler:负责Stage级别的调度,主要是将Job切分成若干个Stage,并将每个Stage打包成TaskSet交给TaskScheduler进行调度
- TaskScheduler:负责Task级别的调度,将DAGScheduler得到的TaskSet按照指定的调度策略分发到Executor上执行。调度过程中,ShcedulerBackend负责提供可用资源,其中的SchedulerBackend有多种实现,可以对接不同的资源管理系统
Driver初始化SparkContext过程中,会分别初始化DAGScheduler,TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend和HeartbeatReceiver。
SchedulerBackend通过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor上执行。
HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活状况,并通知到TaskScheduler
Stage调度
DAGScheduler负责Stage级别的调度,主要是将Job切分成若干个Stage,并将每个Stage打包成TaskSet交给TaskScheduler进行调度
程序运行到行动算子,则会触发一个Job。Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分成若干个Stages。划分策略如下,基本上是一个深度优先搜索算法:
- 首先划分一个最终的Stage,称为ResultStage。它是由行动算子决定的。
- 由最终的RDD不断向前回溯,判断父依赖是否是宽依赖,即Shuffle依赖。如果是则继续划分一个Stage,称为ShuffleMapStage。它是由对应的Shuffle转换算子决定的
最终得到的阶段数目应该等于流程中Shuffle依赖的个数+1(一个ResultStage)。
Stage的运行是有先后顺序的,只有当前面的Stage运行完成之后,才能提交运行下一个Stage。Stage提交的时候会将Task信息序列化并打包成TaskSet交给TaskScheduler,其中的Task数量与当前Stage中最后一个RDD的分区有关,一个分区对应一个Task。
DAGScheduler会监控Stage的运行状态,如果Executor丢失,或者Task进行Fetch失败,则需要重新提交对应的Stage。其他情况,则会在TaskScheduler的调度过程中进行出错重试。(即前者情况下,Stage信息丢失,需要重新提交)
相对来说,DAGScheduler做的事情较为简单,包括在Stage层面划分,提交Stage以及监控其相关状态信息
Task调度
TaskScheduler负责Task级别的调度,将DAGScheduler得到的TaskSet按照指定的调度策略分发到Executor上执行。
TaskScheduler将TaskSet封装为TaskSetManager加入调度队列中。一个TaskSetManager负责监控和管理同一个Stage中的Task,而TaskScheduler也是以TaskSetManager为单位进行任务的调度。
前面提到在TaskScheduler初始化后会启动SchedulerBackend,它负责接收Executor的注册信息,并维护Executor的状态。SchedulerBackend会定时询问TaskScheduler是否需要执行任务。在接收到询问后,TaskScheduler按照指定的调度策略选择出TaskSetManager去执行。之后,TaskScheduler调用SchedulerBackend的方法,经过对应一系列处理之后得到可用资源,TaskSchduler基于这些资源来进行Task的运行。
TaskScheduler默认支持两种调度策略,FIFO以及FAIR策略。默认为FIFO策略。
在从调度队列中拿到TaskSetManager后,就需要按照一定的规则取出Task交给TaskScheduler,再由TaskScheduler交给SchedulerBackend,发到对应的Executor上执行。
在任务调度的过程中,还会涉及到一个本地化调度的概念,即确定每个分区应该在哪个Executor上运行。本地化调度指的是Spark倾向于以最好的本地化级别来调度Task,但是可能不是每个Task都能做到最好的本地化级别。假如当前Task无法以最好的本地化级别运行,这可能是因为对应的Executor繁忙,Spark会先等待一段时间,如果还不行就降低本地化级别,重复操作。具体的本地化原则调度原理如下,可以通过调整spark.locality
相关参数来调整:
- 默认等待时间为3秒
- 若超时,则下降到下一个本地化级别重新分配
- 数据发生传输的时候,Task首先从被本地的BlockManager获取数据,若本地没有数据,则调用getRemote方法从数据所在节点的BlockManager获取数据,返回至该节点
本地化级别由高到低有下面的设置:
级别 | 说明 |
---|---|
PROCESS_LOCAL | 进程本地化,Task和数据在同一个Executor中。性能最好 |
NODE_LOCAL | 节点本地化,Task和数据在同一个节点中,但是Task和数据不在同一Executor中,数据需要在进程间进行传输 |
RACK_LOACL | 机架本地化,Task和数据在同一个机架的两个节点上,数据需要通过网络在节点之间传输 |
NO_PREF | 对于Task来说,从哪里获取数据性能都相同,无偏好 |
ANY | 数据在任意地方,性能最差 |
TaskScheduler还有失败重试和黑名单机制。简单来说,TaskScheduler会监控Task的执行状态,对于失败的Task,会记录它失败的次数,如果失败次数没有超过最大重试次数,就将其放回调度队列中,否则整个Application时报。在记录失败Task的时候,会记录它上次失败所在的Executor以及节点位置,下次再进行调度的时候,会使用黑名单机制,避免它被调度到上次失败的节点上,起到一定的容错作用。
Spark Shuffle
概述
Spark Shuffle发生在ShuffleMapStage中。该阶段的结束伴随着Shuffle文件的写磁盘。宽依赖上游RDD一个分区数据会进入下游RDD的多个分区,数据发生移动,称之为Shuffle。Shuffle一般分为两个阶段,一个是产生Shuffle数据的阶段,即上游RDD写数据到磁盘上;第二个是使用Shuffle数据的阶段,即后续的RDD不同分区获取对应的数据。Map阶段和Reduce阶段通过生产与消费Shuffle中间文件的方式,来完成集群范围内的数据交换。
Shuffle Write: Shuffle写入临时文件的过程叫做Shuffle
Write,Spark目前支持三种Writer,分别为SortShuffleWriter
、BypassMergeSortShuffleWriter
和UnsafeShuffleWriter
Shuffle Read: Shuffle拉取数据的过程叫做Shuffle
Read。对于Map Task生成的中间文件,Reduce
Task需要通过网络从不同节点拉取属于自己的数据内容。Shuffle
Reader的实现被封装在BlockStoreShuffleReader
中
Shuffle演变
Spark中的Shuffle是一个不断演变的过程。在Spark初始版本中,引入Hash Based Shuffle,之后基于Hash Shuffle引入文件合并机制,对Hash Based Shuffle进行改进。之后在Spark 1.1版本中引入Sort Based Shuffle,1.4引入Tungsten-Sort Based Shuffle。在Spark 2.0版本之后,仅支持Sort和Tungsten-Sort两种Shuffle方式,不再支持基于Hash的Shuffle方式
Hash Based Shuffle
在Hash Based
Shuffle中,每个分区上,MapTask会根据Reducer的数量创建出对应数量的文件假定为x
,假如一共有y
个MapTask,则会创建出\(x \times
y\)个中间文件。可以看出,在这种方式下,生成的小文件太多,对文件系统的压力很大,并且也不利于IO吞吐,会严重影响性能。
针对小文件的问题,Spark基于Hash Shuffle引入了文件合并File
Consolidation机制,得到优化后的Hash Shuffle。优化的Hash Based
Shuffle主要思想就是通过共同输出文件以降低文件数,将在同一个CPU核心上运行的多个MapTask的输出合并到统一文件,这样一个Core输出的文件个数就是x
个。在同一个Core上先后运行的两个MapTask的输出对应到同一个文件不同的segment,称为一个FileSegment。
Sort Based Shuffle
虽然Hash Shuffle引入了文件合并机制,但是还是无法从根本上解决文件数过多的问题,于是引入Sort Based Shuffle。Sort Shuffle有三种运行机制,普通运行机制、bypass机制以及Tungsten 运行机制,分别对应三种不同的Shuffle。
普通模式的工作原理类似于MapReduce中的Shuffle过程。在普通模式下,每个MapTask会先将数据写入内存结构中,当达到某个临界阈值之后,就会将内存中的数据溢写到磁盘上。在溢写之前会对数据进行排序,分批写入磁盘文件。在一个Task将数据写入内存的过程中,会出现多次的磁盘溢写操作,也就会产生多个临时文件,最终会将所有临时文件进行合并,得到一个最终的磁盘文件。最终得到的文件,按照parittionId从小到大排序。
由于一个Task对应一个磁盘文件,为下游Stage不同Task(下游RDD不同分区)准备的数据都在这一个文件中,所以还会单独写一份索引文件,用来标识下游各个Task的数据在文件中的位置(start offset和end offset)
bypass Sort Based Shuffle
在Reducer端任务数比较少的情况下,Hash Based Shuffle的效率明显高于Sort Based Shuffle,因此基于Sort Shuffle,Spark还提供了一个回退方案,就是bypass运行机制。它的思想与Hash Based Shuffle类似,每个MapTask会为下游的每个ReduceTask生成一个对应的临时磁盘文件,唯一的区别是在最后会将这些文件进行合并,同时生成索引文件,标识对应的位置。
相比于Hash Based Shuffle,bypass机制提供了更少的最终磁盘文件,Shuffle read的性能更好。
相比于普通的Sort Based Shuffle,bypass机制为每个ReduceTask生成一个临时文件,只在最后进行一次合并,中间没有排序的操作,也就节省了这部分的性能开销。但是由于会为每个ReduceTask(分区)分配一个临时文件,如果ReduceTask过多的话,会对文件系统造成很大压力,因此bypass机制的触发条件如下:
- shuffle reduce
task的数量小于等于
spark.shuffle.sort.bypassMergeThreshold
参数的值,默认为200 - 不能有map端的聚合(例如reduceByKey,因为它对于临时文件来说)
Tungsten-Sort Based Shuffle
从Spark1.5开始,Spark启动钨丝计划,目的是优化内存和CPU的使用,从而进一步提高Spark的性能。由于需要基于JDK Unsafe API来使用堆外内存,因此Tungsten-Sort Based Shuffle又称为Unsafe Shuffle。
该Shuffle的做法类似于Sort Based Shuffle,但是将数据记录使用二进制的方式进行存储,直接在序列化的二进制数据上Sort,而不是在Java对象上。这样一方面能够减少内存的使用和GC的开销;另一方面也可以避免Shuffle过程中频繁的序列化和反序列化。
不过使用Tungsten-Sort Based Shuffle有几个限制条件:
- Shuffle map阶段不能有聚合操作
- 分区数不能超过一定大小(\(2^{24}-1\),这是24bit的partitionID的最大表示范围)
而从Spark 1.6开始,把Sort Shuffle和Tungsten-Sort Based Shuffle全部统一到了Sort Shuffle中,如果检测到满足Tungsten-Sort Based Shuffle条件,会自动采用Tungsten-Sort Based Shuffle,否则采用Sort Shuffle。
Sort Based Shuffle的优缺点:
优点:
- 小文件的数量大量减少,在Mapper端的内存占用变少
- 使得Spark不仅可以处理小规模的数据,对于大规模的数据也不会很容易达到性能瓶颈
缺点:
- 强制在Mapper端进行排序,即使数据本身可能并不需要排序,导致性能损耗
Spark 内存管理
概述
作为一个JVM进程,Executor的内存管理建立在JVM的内存管理之上。Spark对堆内(On-Heap)空间进行更加详细的分配,同时,Spark引入了堆外(Off-Heap)内存,使Spark可以直接在系统内存中开辟空间,进一步优化了内存的使用。堆内内存受到JVM的统一管理,我们能够控制的程度有限;堆外内存直接向操作系统进行内存的申请和释放,我们能够控制的程度较高
堆内内存大小的配置可以通过启动时配置-executor-memory
,也可以在配置参数spark.executor.memory
中指定。Executor中运行的Task共享堆内内存,其中又被划分成几个部分:
- 存储内存:Task缓存RDD数据以及广播数据的时候占用的内存
- 执行内存:任务在执行Shuffle的时候占用的内存
- 其他内存:对象实例占用的空间等
堆内内存的管理主要还是通过JVM来完成的,我们无法精确控制堆内内存的申请和释放,但是Spark通过对存储内存和执行内存各自独立的规划管理,决定是否要在存储内存中缓存新的RDD,以及是否为新的任务分配执行内存,一定程度上提高了内存的利用率。
而为了进一步优化内存的使用,提高Shuffle时的效率,Spark引入了堆外内存,使得Spark可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。堆外内存直接受到操作系统管理,减少了不必要的内存开销以及频繁的GC扫描与回收,提升了性能。堆外内存可以被精确的申请和释放。(利用JDK Unsafe API)
在默认情况下,堆外内存并不启用,可以通过配置spark.memory.offHeap.enabled
参数启用,并由参数spark.memory.offHeap.size
设定堆外空间的大小。
内存空间分配
静态内存管理
Spark最初采用静态内存管理机制,存储内存、执行内存和其他内存的大小在应用程序运行期间保持固定,用户在程序启动之前可以进行配置。
堆内内存划分如下:
堆内内存主要分为三部分,存储内存、执行内存以及其他内存。
- 存储内存:默认占系统内存的60%,该数值对应
spark.storage.memoryFraction
。存储内存中实际使用的部分并不是全部,预留部分空间防止OOM。实际可以使用的部分所占的比例默认为0.9,该数值对应spark.storage.safetyFraction
- 执行内存:默认占系统内存的20%,该数值对应
spark.shuffle.memoryFraction
。同存储内存一样,执行内存也预留了一部分空间,实际使用的空间所占比例默认为0.8,该数值对应spark.shuffle.safetyFraction
- 其他内存:剩余部分即为其他内存
存储内存和执行内存预留空间的目的是防止OOM,因为Spark堆内内存的大小记录是不准确的,需要留出保险空间。
对于存储内存和执行内存的预留部分,是一种逻辑上的规划,在具体使用的时候,Spark并没有区别对待,而是和其他内存一样,交给JVM进行管理
堆外内存的分配则较为简单,只划分为存储内存和执行内存。默认存储内存占用50%,该数值对应spark.memory.storageFraction
,剩余空间则为执行内存。由于堆外内存占用的空间可以被精确计算,所以不需要再预留空间保险。
静态内存管理机制实现较为简单,但是如果用户不熟悉Spark的存储机制,没有根据具体的数据规模和计算任务做相应的配置,很容易影响性能。Spark后续引入了新的内存管理机制,而处于兼容的目的,仍然保留的静态内存管理机制
统一内存管理
Spark1.6之后引入统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲空间。堆内存划分如下:
主要内存划分如下:
- 可用内存(Usable Memory):等于系统内存减去预留内存
- 统一内存:默认占可用内存的60%,其中包括了存储内存和执行内存
- 存储内存:默认占统一内存的0.5,数值对应
spark.stroage.storageFraction
- 执行内存:默认占统一内存的0.5
- 存储内存:默认占统一内存的0.5,数值对应
- 其他内存:默认占可用内存的40%
- 统一内存:默认占可用内存的60%,其中包括了存储内存和执行内存
- 预留内存(Reserved Memory):默认为300M
该机制中最重要的优化在于动态占用机制:
- 设置了基本的存储内存和执行内存区域,由
spark.storage.storageFraction
参数确定 - 双方的空间都不足时,则存储到磁盘;若己方空间不足而对方空间剩余时,则可以借用对方的空间
- 执行内存的空间被占用后,可以要求对方归还借用的空间
- 存储内存的空间被占用后,无法让对方归还借用的空间(主要是考虑到Shuffle过程中的较多因素,实现起来比较复杂)