本系列将参考Spark源码设计,尝试解读Spark源码中的几个核心模块,并结合源码分析实现。本篇,我会从一次spark作业的运行为切入点,将spark运行过程中涉及到的各个步骤,包括DAG图的划分,任务集的创建,资源分配,任务序列化,任务分发到各个executor,任务执行,任务结果回传driver等等。
太长不看:
DAGScheduler
提交job,包括最后一个RDD- 逆向开始从rdd划分依赖,进而划分stage
- 提交stage
入口:SparkContext.runJob
我们知道spark中的作业执行时懒执行的,懒执行最大的好处是可以把一些算子向流水线一样串在一起,从而形成流式的计算模式,个人认为这个特点也是spark比mapreduce性能高的一种重要原因,避免中间多次落盘。runJob
这个方法是spark中所有行动算子的入口。
1 | /** |
- 首先清除闭包的一些不必要的引用,这一步主要是为了方便序列化,因为一些不必要的引用可能引用了不可序列化的对象,这会导致函数不可序列化。很多时候,用户写的代码并不是很靠谱,spark考虑到这一点,所以这也是为了尽量减少用户的开发难度。
- 调用DAGScheduler执行提交任务的逻辑
DAGScheduler.submitJob
经过一些调用,最终会调用到这个方法。
1 | def submitJob[T, U]( |
这个方法的逻辑也很简单。首先做一些检查,然后向DAG调度器内部的一个事件处理器投递一个作业提交的事件。DAGScheduler
自己有一个事件处理器,是很常规的事件循环处理,使用单线程的方法循环处理事件队列中的事件,逻辑很简单,所以这里不再展开。
投递任务提交任务后,最终会调用DAGScheduler
的handleJobSubmitted
方法。我们可以看到,DAGScheduler
中还有很多其他类似的处理方法,对应了不同的事件类型,事件分发逻辑在DAGSchedulerEventProcessLoop.doOnReceive
方法中,不再展开。
我们仍然回到作业运行这条主线上来,继续看handleJobSubmitted
。
handleJobSubmitted
1 | private[scheduler] def handleJobSubmitted(jobId: Int, |
涉及到的一些簿记量的更新就不再展开了。对于job和jobId,stageId等等的各种映射。
创建最后一个stage,这一步其实会根据shuffle依赖关系对整个RDD的计算关系图(DAG)进行划分,形成不同的stage, 最后一步行动算子会创建
ResultStage
, 然后提交最后一个stage。
可以看到提交实际上是提交的最后的stage,从最后一个stage去顺着rdd的依赖反向寻找前面的stage和task去最终正向提交task。 是不是有点像拓扑排序的意思?
接下来的我们重点分析一下DAG图的划分以及stage的创建,这也是DAGScheduler的主要功能。
stage的划分和创建
DAGScheduler.createResultStage
1 | private def createResultStage( |
重点在于创建父stage。
1 | private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { |
这个方法根据当前的rdd去递归的寻找当前rdd的依赖,并且对于每一个依赖得到对应的stage。
getShuffleDependencies
这个方法用一个栈实现对rdd的深度优先遍历,可以看到在找到shuffle依赖时就记录下来,并且不再继续寻找shuffle依赖前面的依赖。
所以这个方法只会在整个DAG图上找到这个rdd的上一级所有的shuffle依赖,而不会跨越多级shuffle依赖。
1 | private[scheduler] def getShuffleDependencies(rdd: RDD[]): HashSet[ShuffleDependency[, , ]] = { |
在寻找依赖的过程中,找到了shuffle依赖就停止不在继续往前寻找,而对于窄依赖就一直寻找直到找到尽头或者shuffle依赖。
这样其实实现了按照shuffle进行stag的分割。
getOrCreateShuffleMapStage
我们继续看另一个重要的方法,创建shuffle的stage。
1 | private def getOrCreateShuffleMapStage( |
可以看到,这个方法会将所有还没创建stage的祖先shuffle依赖全部创建出来。
我们看一下,创建ShuffleMapStage的具体过程:
1 | def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = { |
其中比较关键的步骤有:
- 创建所有的父stage
- 封装一个ShuffleMapStage对象,比较重要的是mapOutputTracker对象的引用。这个对象主要作用是追踪shuffle过程中map阶段的输出的位置信息,后面我们会讲到map输出是通过shuffleManager对map输出数据进行分区和排序处理并序列化,然后blockManager进行存储,而map输出的位置信息是通过blockId标识,并且都会传回driver,在driver中有一个MapOutputTrackerMaster组件专门负责维护所有stage的所有map任务的输出的位置信息。
- 在mpOutputTrackerMaster注册新创建的stage,其实就是在映射结构里加一条数据
小结
创建ResultStage需要父stage->查找所有的父dependency,然后创建出来每一个父shuffleStage->在创建shuffleStage的时候继续找自己的父shuffleStage
对于stage的创建过程做一个小结:这里涉及到几个方法形成的递归调用;在遍历rdd依赖的过程中按深度优先遍历,每遇到一个shuffle依赖就创建一个stage,所有上游的stage创建完成后,最后再创建一个ResultStage。
stage提交
接下来,我们看一下在作业运行的过程中DAGScheduler负责的最后一步:stage提交
submitStage
首先是submitStage方法。
1 | private def submitStage(stage: Stage) { |
这个方法比较简单:
首先是提交还没有运行过的父stage,把自身放到等待队列中
如果父stage都已经运行完成了,或者不存在父stage,那么提交当前stage,即调用
submitMissingTasks
submitMissingTasks
1 | private def submitMissingTasks(stage: Stage, jobId: Int) { |
这个方法比较长,但是应该说是在DAG调度器提交作业的过程中最重要的方法了。主要做的事情其实就是根据要提交的stage创建一个任务集,每个partition创建一个Task,所有要计算的Task形成一个任务集。
- 更新一些簿记量
- 找出每个Task的偏向位置,对于一般的shuffle stage,通过
mapOutputTracker
来计算Task的偏向位置 - 向事件总线投递一个stage提交的事件
- 对RDD和
ShuffleDependency
或者ResultStage的计算函数func进行序列化,以用于传输 - 序列化任务运行统计量的累加器对象,加器对象序列化有一个比较有意思的地方,在readObject方法中,可以看一下
- 对每个要计算的分区创建一个Task,根据stage类型分为ShuffleMapTask和ResultTask两种
- 最后调用TaskScheduler的方法提交任务
至此,DAGScheduler完成了他的使命,成功将接力棒交给了TaskScheduler,接下来就是TaskScheduler的表演了。