本系列将参考Spark源码设计,尝试解读Spark源码中的几个核心模块,并结合源码分析实现。
根据之前的一系列分析,我们对spark作业从创建到调度分发,到执行,最后结果回传driver的过程有了一个大概的了解。但是在分析源码的过程中也留下了大量的问题,最主要的就是涉及到的spark中重要的几个基础模块,我们对这些基础设施的内部细节并不是很了解,之前走读源码时基本只是大概了解每个模块的作用以及对外的主要接口,这些重要的模块包括BlockMananger
, MemoryMananger
, ShuffleManager
, MapOutputTracker
, rpc模块NettyRPCEnv
,以及BroadcastManager
。
而对于调度系统涉及到的几个类包括DAGSchedulerManager
, TaskSchedulerManager
, CoarseGrainedSchedulerBackend
, CoarseGrainedExecutorBackend
, Executor
, TaskRunner
,我们之前已经做了较为详细的分析,因此这几个模块暂告一段落。
本篇,我们来看一下spark中最基础的一个的模块–存储系统BlockManager
的内部实现。
BlockManager调用时机
首先,我们来整理一下在一个作业的运行过程中都有哪些地方使用到了BlockManager。
DAGScheduler.getCacheLocs
。这个方法的调用是在提交一个stage时,需要获取分区的偏向位置时会调用该方法。我们知道rdd是可以缓存的,而rdd的缓存就是通过blockManager来管理的,有一个专门的RDDBlockId用来表示一个RDD缓存块的唯一标识。最终调用的方法是:
blockManagerMaster.getLocations(blockIds)
广播变量。在
DAGscheduler
中提交stage时需要把rdd和ShuffleDependency(对于ResultStage则是一个函数)对象序列化用于网络传输,实际上序列化后的字节数组是通过broadcastManager
组件进行网络传输的,而broadcastManager
实际又是通过BlockMananger
来将要广播的数据存储成block,并在executor端发送rpc请求向BlockManangerMaster
请求数据。每个广播变量会对应一个TorrentBroadcast对象,TorrentBroadcast对象内的writeBlocks和readBlocks是读写广播变量的方法,最终调用的方法是:
blockManager.putSingle
和blockManager.putBytes
Shuffle的map阶段输出。如果我们没有启动外部shuffle服务及ExternalShuffle,那么就会用spark自己的shuffle机制,在map阶段输出时通过blockManager对输出的文件进行管理。shuffle这部分主要使用的是DiskBlockManager组件。
最终调用的是:DiskBlockManager相关方法包括
createTempShuffleBlock
,getDiskWriter
。
DiskBlockObjectWriter相关方法,包括write
方法和commitAndGet
方法任务运行结果序列化后传回driver。这里分为两种情况,如果结果序列化后体积较小,小于maxDirectResultSize,则直接通过rpc接口传回,如果体积较大,就需要先通过blockManager写入executor几点的内存和磁盘中,然后在driver端进行拉取。
最终调用的是:
blockManager.putBytes
此外,我们还注意到,以上几种情形中使用的BlockId都是不同的,具体可以看一下BlockId.scala文件中关于各种BlockId的定义。
所以,接下来,我们的思路就很清晰了,以上面提到的对BlockManager的方法调用为切入点进行分析。
BlockManagerMaster.getLocations
这个方法用于获取指定的blockId对应的块所在存储位置。
1 | def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = { |
这里向driverEndpoint
发送了一个GetLocations
消息,注意这里的driverEndpoint
并不是DriverEndpoint
的端点引用,在SparkEnv的构造过程我们可以看到,这是一个BlockManagerMasterEndpoint
端点的引用。所以我们需要在BlockManagerMasterEndpoint
中寻找对于该消息的处理。注意,由于这里调用了ask方法,所以在服务端是由receiveAndReply方法来处理并响应的。
BlockManagerMasterEndpoint.receiveAndReply
我们截取了对GetLocations处理的部分代码
1 | case GetLocationsMultipleBlockIds(blockIds) => |
调用的是getLocations方法:
1 | private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { |
这个方法很简单,就是直接从缓存中查找blockId对应的位置,位置信息用BlockManagerId封装。那么缓存中的信息什么时候加进去呢?当然是写入新的block并更新block位置信息的时候,后面的会分析到。
BlockManager.putSingle
这个方法写入一个有单个对象组成的块,
1 | def putSingle[T: ClassTag]( |
可以看到,把对象包装成了一个只有一个元素的迭代器,然后调用putIterator
方法,最后调用doPutIterator
方法
BlockManager.doPutIterator
上面的方法,最终调用了doPutIterator
方法。
1 | private def doPutIterator[T]( |
总结一下这段代码的主要逻辑:
- 如果存储等级允许存入内存,那么优先存入内存中。根据存储的数据是否需要序列化分别选择调用memoryStore的不同方法。
- 如果存储等级不允许内存,那么只能存入磁盘中,存入磁盘中的数据一定是经过序列化的,这点要注意。
- 向BlockManagerMaster汇报刚写入的块的位置信息
- 更新任务度量系统中关于块信息的相关统计值
- 如果副本数大于1,那么需要进行额外的复制
从上面的步骤可以看到,在完成数据写入后,会通过rpc调用向BlockManagerMaster
汇报块的信息,这也解答了blockManagerMaster.getLocations
方法从内存的map结构中查询块的位置信息的来源。
单纯就存储数据来说,最重要的无疑是内存管理器MemoryStore
和磁盘管理器DiskStore
。
对于MemoryStore
和DiskStore
调用的存储方法有:
1 | memoryStore.putIteratorAsValues |
blockManager.putBytes
我们再来接着看另一个写入方法,putBytes
,即写入字节数组数据。它的实际写入的逻辑在doPutBytes
方法中,我们看一下这个方法:blockManager.doPutBytes
这个方法的主要步骤与doPutIterator
方法差不多。只不过doPutIterator
方法插入的是java对象,如果存储级别要求序列化或者存储到磁盘时,需要将对象序列化。
1 | private def doPutBytes[T]( |
对于MemoryStore
和DiskStore
调用的方法有:
1 | memoryStore.putBytes |
总结
综上,我们把一个spark作业运行过程中需要调用到BlockManager
的时机以及调用的BlockManager
的一些写入数据的方法大致整理了一下。BlockManager
主要是通过内部的两个组件MemoryStore
和DiskStore
来管理数据向内存或磁盘写入的。此外DiskBlockManager
组件主要是用来管理Block和磁盘文件之间的对应关系,分配文件路径,管理本地文件系统路径等作用。对于MemoryStore
和DiskStore
的调用主要有如下几个方法:
1 | memoryStore.putIteratorAsValues |