上一篇讲到DAGScheduler根据shuffle依赖对作业的整个计算链划分成多个stage之后,就开始提交最后一个ResultStage,而由于stage之间的依赖关系,实际上最终是循着计算链从上到下依次提交stage的。每提交一个stage,就会将这个stage分成多个Task,并且会计算每个Task的偏向位置,将RDD和ShuffleDependency,TaskMetrics等对象序列化用于远程传输,最后把一个stage的所有Task包装成一个任务集,提交给TaskSchedulerImpl运行。本节就来分析一下这个TaskSchedulerImpl。
- TaskSchedulerImpl
- TaskSchedulerImpl.submitTasks
- TaskSchedulerImpl.resourceOffers
- TaskSetManager.resourceOffer
- 总结
太长不看系列
DAGScheduler
对作业计算链按照shuffle依赖划分多个stage,提交一个stage根据个stage的一些信息创建多个Task,包括ShuffleMapTask
和ResultTask
, 并封装成一个任务集(TaskSet),把这个任务集交给TaskScheduler
。TaskSchedulerImpl
将接收到的任务集加入调度池中,然后通知调度后端SchedulerBackend
(在driver上)。CoarseGrainedSchedulerBackend
收到新任务提交的通知后,转发给driverEndpoint
。driverEndpoint
检查下现在可用 executor有哪些,并把这些可用的executor交给TaskSchedulerImpl
。TaskSchedulerImpl
根据获取到的计算资源,根据任务本地性级别的要求以及考虑到黑名单因素,按照round-robin的方式对可用的executor进行轮询分配任务。- 经过多个本地性级别分配,多轮分配后最终得出任务与executor之间的分配关系,并封装成
TaskDescription
形式返回给Driver上的SchedulerBackend
SchedulerBackend
拿到这些分配关系后,就知道哪些任务该发往哪个executor了,通过调用rpc接口将任务通过网络发送给对应executor即可。
TaskSchedulerImpl
首先把TaskSchedulerImpl
的说明翻译一下:TaskSchedulerImpl
的主要作用是调度Task,执行在Driver
上。内部通过SchedulerBackend
进行实际任务的传输。不同的集群类型对应不同的具体的调度后端的实现,例如本地模式的调度后端实现是LocalSchedulerBackend
,而任务调度器的实现只有一种就是TaskSchedulerImpl
。TaskSchedulerImpl
主要处理一些通用的逻辑,例如在多个作业之间决定调度顺序,执行推测执行的逻辑等等。
TaskSchedulerImpl
在使用之前应该先调用initialize()
和 start()
方法,然后再提交任务集。
这里插一句,这两个方法分别在上面地方调用呢?都是在SparkContext初始化的时候调用的,具体可以看SparkContext初始化代码。
关于线程安全的一些提示:由于会存在多线程同时提交任务的情况,所以面向外部的public的方法必须加锁以保持内部状态量,簿记量的一致性。
此外,一些SchedulerBackend的方法会先获取自身的锁,然后获取TaskSchedulerImpl对象的锁,所有应该避免在持有TaskSchedulerImpl对象的锁的情况下再尝试获取调度后端的锁,这样会造成死锁。实际上这句话的意思就是因为有些操作需要同时持有调度后端的锁和TaskSchedulerImpl锁,对于这种需要同时持有多把锁的情况,应该保持获取锁的顺序是一致的,这样就能避免出现死锁的情况。
好了,接下来我们接着任务提交的逻辑继续分析。
TaskSchedulerImpl.submitTasks
1 | override def submitTasks(taskSet: TaskSet) { |
总结submitTask
做的事情
- 获取锁,更新一些簿记量
- 将新的任务集封装为
TaskSetManager
添加到调度池中 - 调用
SchedulerBackEnd.reviveOffers()
,给任务分配可用资源
CoarseGrainedSchedulerBackend.reviveOffers
1 | override def reviveOffers() { |
这个方法通过rpc模块给DriverEndPoint
发送一个消息,本地进程内调用rpc方法,主要是为了代码模块的统一。 在DriverEndpoint.receive方法中,我们可以看到,在接收到ReviveOffers消息后,就会调用makeOffers方法,
DriverEndpoint.makeOffers
1 | private def makeOffers() { |
这个方法主要是将当前所有可用的资源(executor)封装成资源对象(WorkerOffer)交给TaskSchedulerImpl
,TaskSchedulerImpl
会综合考虑任务本地性,黑名单,调度池的调度顺序等因素,返回TaskDescription
集合。
TaskDescription对象是对一个Task的完整描述,包括序列化的任务数据,任务在哪个executor上运行,依赖文件和jar包等信息。
从这里也可以看出,SchedulerBackend的职责其实相对比较少,主要是对executor的管理,以及调用rpc远端服务的引用发送任务数据,大部分的调度工作还是由TaskSchedulerImpl来完成。
接下来我们分析一下Task调度最重要的一个方法,TaskSchedulerImpl.resourceOffers
TaskSchedulerImpl.resourceOffers
这个方法由 SchedulerBackend
调用,SchedulerBackend
会将可用的executor资源告诉TaskSchedulerImpl
,TaskSchedulerImpl
根据TaskSet优先级(调度池),黑名单,本地性等因素给出要实际运行的任务。我们使用round-robin的方式将任务分配到各个executor上,以使得计算资源的 使用更均衡。
1 |
|
- 更新一些簿记量,如物理节点和executor的相互映射关系,机架和host的映射关系,host和executor上运行的任务信息等等; 检查是否有新的可用executor加入
- 触发黑名单的超时检查,被加入黑明单的节点或executor是由一定超时时间的,在超时时间内不能像他们提交任务,而过了超时时间,这些资源将被重新投入使用; 根据最新的黑名单过滤掉在黑名单中的计算资源,包括host和executor
- 通过调度池对所有的任务集按优先级进行排序,获取排序后的任务集
- 对于每一个任务集,按照对executor进行round-robin的方式分配任务,会进行多轮分配,每一轮依次轮询所有的executor,为每一个executor分配一个符合本地性要求的任务
TaskSchedulerImpl.resourceOfferSingleTaskSet
1 | private def resourceOfferSingleTaskSet( |
这个方法就是对所有可用的executor进行一轮round-robin方式的分配,一轮分配中,每个executor最多只能得到一个任务,这样做是为了尽量将任务“打散”,均匀第“撒到”所有executor上。
每一个executor会根据本地性,找到适合自己的task。
TaskSetManager.resourceOffer
1 | TaskNotSerializableException] [ |
这个方法的作用是对给定的executor和本地性级别,分配一个符合要求的任务给这个executor。 最终任务被封装成TaskDescription对象。
小结
在给定的计算资源上分配合适的任务,这个工作主要是由TaskScheduler
和TaskSetManager
两个类协同完成的。
而任务本地性的维护与分配时的检查工作是在TaskSetManage
r中完成的。
接下来,我们分析一下获取到可以实际运行的任务后,SchedulerBackend
是怎么把这些任务发送到制定的executor上执行的。
DriverEndpoint.makeOffers
首先还得接着回到DriverEndpoint.makeOffers
方法,makeOffers方法中通过调用TaskSchedulerImpl.resourceOffers
方法切入TaskSchedulerImpl
,然后就是TaskSchedulerImpl
在做任务分配的工作,最终TaskSchedulerImpl
将分配好的任务以TaskDescription的封装形式返回给DriverEndpoint
(DriverEndpoint是调度后端的一个内部类),然后紧接着调用DriverEndpoint.launchTasks
方法将这些任务传给相应的executor执行。
DriverEndpoint.launchTasks
1 | private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { |
这个方法跟简单,主要就是对TaskDescription进行序列化,然后检查体积是否超过阈值,如果没超过阈值就调用rpc服务引用,将任务发送到指定的executor上。
总结
好了,经过漫长的调用,终于我们的任务要离开driver,驶向executor了,回顾一下任务在driver中从诞生到最终发送的过程,主要有一下几个步骤:
DAGSchedule
r对作业计算链按照shuffle依赖划分多个stage,提交一个stage根据个stage的一些信息创建多个Task,包括ShuffleMapTask
和ResultTask
, 并封装成一个任务集(TaskSet),把这个任务集交给TaskScheduler
。TaskSchedulerImpl
将接收到的任务集加入调度池中,然后通知调度后端SchedulerBackend
。CoarseGrainedSchedulerBackend
收到新任务提交的通知后,检查下现在可用 executor有哪些,并把这些可用的executor交给TaskSchedulerImpl
。TaskSchedulerImpl
根据获取到的计算资源,根据任务本地性级别的要求以及考虑到黑名单因素,按照round-robin的方式对可用的executor进行轮询分配任务。- 经过多个本地性级别分配,多轮分配后最终得出任务与executor之间的分配关系,并封装成
TaskDescription
形式返回给Driver上的SchedulerBackend
SchedulerBackend
拿到这些分配关系后,就知道哪些任务该发往哪个executor了,通过调用rpc接口将任务通过网络发送即可。