本系列将参考Spark源码设计,尝试解读Spark源码中的几个核心模块,并结合源码分析实现。
接着上一篇,本篇,我们分析一下实现磁盘存储的功能类DiskStore
,这个类相对简单。在正式展开之前,我觉得有必要大概分析一下BlockManager
的背景,或者说它的运行环境,运行的作用范围。Blockmanager
这个类其实在运行时的每个节点都会有一个实例(包括driver和executor进程),因为不论是driver端进行广播变量的创建,还是executor端shuffle过程中写shuffle块,或者是任务运行时结果太大需要通过BlockManager
传输,或者是RDD的缓存,其实在每个运行节点上都会通过Blockmanager
来管理程序内部对于本地的内存和磁盘的读写.所以综上,我想表达的核心意思就是每个进程(driver和executor)都有一个Blockmanager
实例,而这些Blockmanager
实例是通过BlockManagerId
类来进行唯一区分的,BlockManagerId
实际上是对进程物理位置的封装。
DiskStore
DiskStore.put
首先我们来看一个最常用的写入方法
1 | def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { |
这个方法很简单,没什么好说的,但是调用了一个比较重要的类DiskBlockManager,这个类的功能就是对磁盘上的目录和文件进行管理,会在磁盘上按照一定规则创建一些目录和子目录,在分配文件名时也会尽量均匀第分配在这些目录和子目录下。
DiskStore.putBytes
这个方法就不说了,简单处理一下直接调用put方法。
1 | def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { |
DiskStore.getBytes
我们来看一下这个方法,首先通过DiskBlockManager
获取对应的文件名,然后将其包装成一个BlockData
对象,分为加密和不加密两种。
1 | def getBytes(blockId: BlockId): BlockData = { |
DiskBlockData
这个类作为磁盘文件的包装类,主要功能是提供了几个方便的接口,将磁盘文件中的数据读取出来并生成缓冲对象。
这个类中有两个重要的方法toChunkedByteBuffer
和toByteBuffer
,toByteBuffer
就不说了,调用ReadableByteChannel.read(ByteBuffer dst)
方法读取文件数据,我们看一下toChunkedByteBuffer
DiskBlockData.toChunkedByteBuffer
这个方法也很简单,在数据量比较大的时候,由于每次申请的内存块大小有限制maxMemoryMapBytes,所以需要切分成多个块
1 | override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { |
DiskBlockManager
这个类之前也分析过,主要是用来管理spark运行过程中写入的一些临时文件,以及目录的管理。
首先会根据参数配置创建本地目录(可以是逗号分隔的多个目录),参数的优先顺序是:如果是运行在yarn上,则会使用yarn参数LOCAL_DIRS配置的本地目录;否则获取环境变量SPARK_LOCAL_DIRS的值;否则获取spark.local.dir参数的值;最后如果都没有配置,那么就用java系统参数java.io.tmpdir的值作为临时目录。
其次,关于文件在目录之间分配的问题,使用文件名的hash值对目录数量取余的方法来尽量将文件均匀地分配到不同的目录下。
另外一点要说的是文件名的命名规则,是根据不同作用的Block来区别命名的,例如RDD缓存写入的block的id就是RDDBlockId,它的文件名拼接规则是”rdd_” + rddId + “_” + splitIndex