上一篇,我们主要分析了一次作业的提交过程,严格说是在driver
端的过程,作业提交之后经过DAGScheduler
根据shuffle依赖关系划分成多个stage,依次提交每个stage,将每个stage创建于分区数相同数量的Task,并包装成一个任务集,交给TaskSchedulerImpl
进行分配。
TaskSchedulerImpl
则会根据SchedulerBackEnd
提供的计算资源(executor),并考虑任务本地性,黑名单,调度池的调度顺序等因素对任务按照round-robin的方式进行分配,并将Task与executor的分配关系包装成TaskDescription
返回给SchedulerBackEnd
。
然后SchedulerBackEnd
就会根据收到的TaskDescription
将任务再次序列化之后发送到对应的executor
上执行。
本篇,我们就来分析一下Task在executor
上的执行过程。
太长不看系列:
- 首先executor端的rpc服务端点(比如
CoarseGrainedExecutorBackend
)收到LaunchTask
的消息,并对传过来的任务数据 data 进行反序列化成TaskDescription
. - 将任务交给Executor对象运行
- Executor根据传过来的
TaskDescription
对象创建一个TaskRunner
对象,并放到线程池中运行。这里的线程池用的是Executors.newCachedThreadPool,空闲是不会有线程在跑 TaskRunner
对任务进一步反序列化,调用Task.run
方法执行任务运行逻辑ShuffleMapTask
类型的任务会将rdd计算结果数据经过排序合并之后写到一个文件中,并写一个索引文件ResultTask
类型任务会根据func执行计算。
- 任务运行完成后会更新一些任务统计量和度量系统中的一些统计量
- 最后会根据结果序列化后的大小选择不同的方式将结果传回driver。
CoarseGrainedExecutorBackend
SchedulerBackend
特质有两个实现子类CoarseGrainedExecutorBackend
和LocalSchedulerBackend
。在local部署模式下使用LocalSchedulerBackend
模式,在其他模式下使用CoarseGrainedExecutorBackend
。这两个子类也继承了ExecutorBackend
特质。
我们将主要分析后者。
CoarseGrainedExecutorBackend
是Driver
和Executor
通信的后端接口。
任务执行入口Executor.launchTask
首先,我们知道CoarseGrainedExecutorBackend
是yarn模式下的executor的实现类,这时一个rpc服务端,所以我们根据rpc客户端也就是CoarseGrainedSchedulerBackEnd
发送的消息,然后在服务端找到处理对应消息的方法,顺藤摸瓜就能找到Task执行的入口。
通过上一篇的分析知道发送任务时,CoarseGrainedSchedulerBackEnd
发送的是一个LaunchTask
类型的消息,我们看一下CoarseGrainedExecutorBackend.receive
方法,其中对于LaunchTask
消息的处理如下:
1 | case LaunchTask(data) => |
可以看到,实际上任务交给内部的Executor
对象来处理,实际上Executor
对象承担了executor
端的绝大部分逻辑,可以认为CoarseGrainedExecutorBackend
仅仅是充当在executor机器上的rpc消息中转的角色,充当spark的rpc框架中端点的角色,而实际的任务执行的逻辑则是由Executor
对象来完成的。
Executor概述
我们先来看一下Executor类的说明:Spark 执行器,由线程池支持以运行任务。
1 | /** |
Executor内部有一个线程池用来运行任务,Mesos, YARN, 和 standalone模式都是用这个类作为任务运行的逻辑。此外Executor对象持有SparkEnv
的引用,以此来使用spark的一些基础设施,包括rpc引用。
我们还是以任务运行为线索分析这个类的代码。
Executor.launchTask
1 | def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = { |
executor
收到TaskDescription
以后封装成为了一个TaskRunner
,然后放到线程池去执行。从这个地方也能看出来,在executor端,一个task对应一个线程。
TaskRunner
这个类实现Runnable
接口,包括了一个run方法,可以让线程池去执行任务。所以接下来我们就看一下TaskRunner
这个类。
TaskRunner.run
其中有一些统计量我就不说了,比如任务运行时间统计,cpu耗时统计,gc耗时统计等等,这里有一点可以积累的地方是MXBean,cpu,gc耗时都是通过获取jvm内置的相关的MXBean获取到的,入口类是ManagementFactory
,具体的可以细看,这里不再展开。
1 | override def run(): Unit = { |
总结一下这个方法的主要步骤:
创建Task需要的TaskMemoryManager。
向driver发送一个更新任务状态的消息,通知driver这个task处于运行的状态。
对任务进行反序列化生成Task对象,根据任务类型可能是ShuffleMapTask或者ResultTask。得到任务需要的taskFiles(文件), taskJars(任务jar包), taskProps(任务属性), taskBytes(任务本身)。
- taskProps放入ThreadLocal
- 从taskFiles, taskJars得到任务依赖。
- 对taskBytes再次反序列得到task实例。
检查任务有没有被杀死,如果被杀死则抛出一个异常;(driver随时都可能发送一个杀死任务的消息)
调用
Task.run
方法执行任务的运行逻辑任务运行结束后,清除未正常释放的内存资源和block锁资源,进行资源回收。
更新度量系统中的相关统计量
将任务运行的结果数据序列化。检测序列化后的体积,有两个阈值:maxResultSize和maxDirectResultSize,
- 如果超过maxResultSize直接丢弃结果,就是不往
blockmanager
里面写数据,这样driver
端在试图通过blockmanager
远程拉取数据的时候就获取不到数据,这时driver就知道这个任务的结果数据太大,失败了; - 而对于体积超过maxDirectResultSize的情况,会将任务结果数据通过
blockmanager
写到本地内存和磁盘,然后将block信息发送给driver, driver会根据这些信息来这个节点拉取数据; - 如果体积小于maxDirectResultSize,则直接通过rpc接口将结果数据发送给driver。
- 如果超过maxResultSize直接丢弃结果,就是不往
最后还会有对任务失败的各种总异常的处理。
Task.run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem): T = {
// 调用BlockManager的registerTask,注册信息。
SparkEnv.get.blockManager.registerTask(taskAttemptId)
// 创建任务上下文
context = new TaskContextImpl(
stageId,
stageAttemptId, // stageAttemptId and stageAttemptNumber are semantically equal
partitionId,
taskAttemptId,
attemptNumber,
taskMemoryManager,
localProperties,
// 度量系统就是SparkEnv的度量对象
metricsSystem,
metrics)
// 将任务尝试的上下文保存到ThreadLocal中
TaskContext.setTaskContext(context)
// 获取运行任务尝试的线程
taskThread = Thread.currentThread()
// 如果被kill,将任务和上下文标记为kill
if (_reasonIfKilled != null) {
kill(interruptThread = false, _reasonIfKilled)
}
new CallerContext(
"TASK",
SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
appId,
appAttemptId,
jobId,
Option(stageId),
Option(stageAttemptId),
Option(taskAttemptId),
Option(attemptNumber))
.setCurrentContext()
try {
runTask(context) // 调用子类实现的runTask方法尝试任务。
} catch {
case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
try {
context.markTaskFailed(e)
} catch {
case t: Throwable =>
e.addSuppressed(t)
}
throw e
} finally {
context.markTaskCompleted()
try {
Utils.tryLogNonFatalError {
// 释放内存快管理器中该任务使用的内存,最终是通过内存管理器来释放的
// 实际上就是更新内存管理器内部的一些用于记录内存使用情况的簿记量
// 真正的内存回收肯定还是有gc来完成的
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
val memoryManager = SparkEnv.get.memoryManager
// 内存释放之后,需要通知其他在等待内存资源的 线程
memoryManager.synchronized { memoryManager.notifyAll() }
}
} finally {
// 移除ThreadLocal中保存的当前任务尝试线程的上下文
TaskContext.unset()
}
}
}代码总结:
调用BlockManager的registerTask,注册信息。
创建任务尝试的上下文TaskContextImpl,并设置到一个ThreadLocal变量中
检查任务是否被杀死
创建调用者上下文CallerContext
调用子类实现的runTask方法执行实际的任务逻辑
无论task是否成功,都会在finally调用TaskContextImpl的markTaskComplete方法。
最后会释放在shuffle过程中申请的用于数据unroll的内存资源,释放堆内和堆外内存,唤醒其他任务。移除ThreadLocal中保存的当前Task线程的TaskContextImpl
所以,接下来我们要分析的肯定就是runTask方法,而这个方法是个抽象方法,在ResultTask
和ShuffleMapTask
有不同的实现。
ResultTask.runTask
1 | override def runTask(context: TaskContext): U = { |
总结:
- 反序列化task,得到RDD和需要执行的func
- 调用RDD的iterator方法进行迭代计算和最终处理。
ShuffleMapTask.runTask
这个方法还是大概逻辑还是很简单的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39override def runTask(context: TaskContext): MapStatus = {
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// 反序列化RDD和shuffle, 关键的步骤
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
// shuffle管理器
val manager = SparkEnv.get.shuffleManager
// 获取一个shuffle写入器
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// 主要是删除中间过程的溢写文件,向内存管理器释放申请的内存
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception => log.debug("Could not stop writer", e)
}
throw e
}
}
- 对任务反序列化,得到RDD和shuffleDependency。
- 调用
sortShuffleManager
的getWriter方法,获得对执行分区的数据进行磁盘写的sortShuffleWriter
。 - 通过rdd的iterator方法获取当前task对应的分区的计算结果(结果一一个迭代器的形式返回)
- 利用shuffleManager通过blockManager写入到文件block中,然后将block信息传回driver上报给BlockManagerMaster。
这里可以看到rdd计算的核心方法就是iterator方法SortShuffleWriter的write方法可以分为几个步骤:
- 将上游rdd计算出的数据(通过调用rdd.iterator方法)写入内存缓冲区,
- 在写的过程中如果超过 内存阈值就会溢写磁盘文件,可能会写多个文件
- 最后将溢写的文件和内存中剩余的数据一起进行归并排序后写入到磁盘中形成一个大的数据文件
- 这个排序是先按分区排序,在按key排序
- 在最后归并排序后写的过程中,没写一个分区就会手动刷写一遍,并记录下这个分区数据在文件中的位移。所以实际上最后写完一个task的数据后,磁盘上会有两个文件:数据文件和记录每个reduce端partition数据位移的索引文件
所以实际上重要的步骤有两个:通过RDD的计算链获取计算结果;将计算结果经过排序和分区写到文件中。这里我先分析第二个步骤。
SortShuffleWriter.write
spark在2.0之后shuffle管理器改成了排序shuffle管理器,即SortShuffleManager
,所以这里通过SortShuffleManager
管理器获取到的在一般情况下都是SortShuffleWriter
,当然在满足bypass条件(map端不需要combine,并且分区数小于200)的情况下会使用BypassMergeSortShuffleWriter
。
1 | override def write(records: Iterator[Product2[K, V]]): Unit = { |
总结一下这个方法的主要逻辑:
- 首先获取一个排序器,并检查是否有map端的合并器
- 将rdd计算结果数据写入排序器,过程中可能会溢写过个磁盘文件
- 最后将多个碎小的溢写文件和内存缓冲区的数据进行归并排序,写到一个文件中
- 将每个分区数据在文件中的偏移量写到一个索引文件中,用于reduce阶段拉取数据时使用
- 返回一个MapStatus对象,封装了当前executor上的blockManager的id和每个分区在数据文件中的位移量
总结
总结一下任务在executor端的执行流程:
- 首先executor端的rpc服务端点收到
LaunchTask
的消息,并对传过来的任务数据进行反序列化成TaskDescription
. - 将任务交给Executor对象运行
- Executor根据传过来的
TaskDescription
对象创建一个TaskRunner
对象,并放到线程池中运行。这里的线程池用的是Executors.newCachedThreadPool,空闲是不会有线程在跑 TaskRunner
对任务进一步反序列化,调用Task.run
方法执行任务运行逻辑ShuffleMapTask
类型的任务会将rdd计算结果数据经过排序合并之后写到一个文件中,并写一个索引文件ResultTask
类型任务会根据func执行计算。
- 任务运行完成后会更新一些任务统计量和度量系统中的一些统计量
- 最后会根据结果序列化后的大小选择不同的方式将结果传回driver。