上一篇中,我们分析了shuffle在map阶段的写过程。简单回顾一下,主要是将ShuffleMapTask计算的结果数据在内存中按照分区和key进行排序,过程中由于内存限制会溢写出多个磁盘文件,最后会对所有的文件和内存中剩余的数据进行归并排序并溢写到一个文件中,同时会记录每个分区(reduce端分区)的数据在文件中的偏移,并且把分区和偏移的映射关系写到一个索引文件中。
- shuffle读过程源码分析
- RDD.groupByKey
- RDD.combineByKeyWithClassTag
- ShuffleRDD.compute
- SortShuffleManager.getReader
- BlockStoreShuffleReader.read
- ShuffleBlockFetcherIterator
- ShuffleBlockFetcherIterator.initialize
- ShuffleBlockFetcherIterator.splitLocalRemoteBlocks
- ShuffleBlockFetcherIterator.fetchUpToMaxBytes
- ShuffleBlockFetcherIterator.next
shuffle读过程源码分析
好了,简单回顾了写过程后,我们不禁思考,reduce阶段的数据读取的具体过程是什么样的?数据读取的发生的时机是什么?
首先应该回答后一个问题:数据读取发生的时机是什么?我们知道,rdd的计算链根据shuffle
被切分为不同的stage
,一个stage
的开始阶段一般就是从读取上一阶段的数据开始,也就是说stage
读取数据的过程其实就是reduce
过程。
然后经过该stage的计算链后得到结果数据,把这些数据写入到磁盘供下一个stage
读取,这个写入的过程实际上就是map输出过程,而这个过程我们之前已经分析过了。
本篇我们要分析的是reduce阶段读取数据的过程。
啰嗦了这么一大段,其实就是为了引出数据读取的入口,还是要回到ShuffleMapTask
,这里我只贴部分代码:
1 | // shuffle管理器 |
读取数据的代码其实就是rdd.iterator(partition, context)
,iterator
方法主要是处理rdd缓存的逻辑,如果有缓存就会从缓存中读取(通过BlockManager
),如果没有缓存就会进行实际的计算,发现最终调用RDD.compute
方法进行实际的计算,这个方法是一个抽象方法,是由子类实现的具体的计算逻辑,用户代码中对于RDD做的一些变换操作实际上最终都会体现在compute方法中。
另一方面,我们知道,map,filter这类算子不是shuffle操作,不会导致stage的划分,所以我们想看shuffle读过程就要找一个Shuffle类型的操作,我们看一下RDD.groupBy,最终调用了groupByKey方法
RDD.groupByKey
1 | def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { |
最终调用了combineByKeyWithClassTag
RDD.combineByKeyWithClassTag
1 | def combineByKeyWithClassTag[C]( |
做一些判断,检查一些非法情况,然后处理一下分区器,最后返回一个ShuffledRDD,所以接下来我们分析一下ShuffleRDD的compute方法
有一个很有特色的地方,如果分区器的分区和当前的分区一样,其实是不需要做shuffle的。也就是说groupByKey其实不一定会触发shuffle。
ShuffleRDD.compute
通过shuffleManager获取一个读取器,数据读取的逻辑在读取器里。
1 | override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = { |
SortShuffleManager.getReader
无需多说,直接看BlockStoreShuffleReader
1 | override def getReader[K, C]( |
BlockStoreShuffleReader.read
显然,这个方法才是核心所在。总结一下主要步骤:
- 获取一个包装的迭代器ShuffleBlockFetcherIterator,它迭代的元素是blockId和这个block对应的读取流,很显然这个类就是实现reduce阶段数据读取的关键
- 将原始读取流转换成反序列化后的迭代器
- 将迭代器转换成能够统计度量值的迭代器,这一系列的转换和java中对于流的各种装饰器很类似
- 将迭代器包装成能够相应中断的迭代器。每读一条数据就会检查一下任务有没有被杀死,这种做法是为了尽量及时地响应杀死任务的请求,比如从driver端发来杀死任务的消息。
- 利用聚合器对结果进行聚合。这里再次利用了AppendonlyMap这个数据结构,前面shuffle写阶段也用到这个数据结构,它的内部是一个以数组作为底层数据结构的,以线性探测法线性的hash表。
- 最后对结果进行排序。
所以很显然,我们想知道的shuffle读取数据的具体逻辑就藏在ShuffleBlockFetcherIterator中
1 | private[spark] class BlockStoreShuffleReader[K, C]( |
ShuffleBlockFetcherIterator
这个类比较复杂,仔细看在类初始化的代码中会调用initialize方法。
其次,我们应该注意它的构造器中的参数,
1 | val wrappedStreams = new ShuffleBlockFetcherIterator( |
ShuffleBlockFetcherIterator.initialize
- 首先将本地的block和远程的block分隔开
- 然后开始发送请求拉取远程数据。这个过程中会有一些约束条件限制拉取数据请求的数量,主要是正在获取的总数据量的限制,请求并发数限制;每个远程地址同时拉取的块数也会有限制,但是这个阈值默认是Integer.MAX_VALUE
- 获取本地的block数据
其中,获取本地数据较为简单,主要就是通过本节点的BlockManager来获取块数据,并通过索引文件获取数据指定分区的数据。
我们着重分析远程拉取的部分
1 | private[this] def initialize(): Unit = { |
ShuffleBlockFetcherIterator.splitLocalRemoteBlocks
我们首先来看如何切分远程和本地的数据块,总结一下这个方法:
首先将同时拉取的数据量的大小除以5作为每次请求拉取的数据量的限制,这么做的原因是为了允许同时从5个节点拉取数据,因为节点的网络环境可能并不稳定,同时从多个节点拉取数据有助于减少网络波动对性能带来的影响,而对整体的同时拉取数据量的限制主要是为了限制本机网络流量的使用
循环遍历每一个节点地址(这里是BlockManagerId),
如果地址与本机地址相同,那么对应的blocks就是本地block
对于远程block,则要根据同时拉取数据量大小的限制将每个节点的所有block切分成多个请求(FetchRequest),确保这些请求单次的拉取数据量不会太大
1 | private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = { |
ShuffleBlockFetcherIterator.fetchUpToMaxBytes
回到initialize方法中,在完成本地与远程block的切分后,我们得到了一批封装好的数据拉取请求,将这些请求加到队列中,接下来要做的是通过rpc客户端发送这些请求,
这个方法逻辑还是相对简单,主要逻辑就是两个循环,先发送延缓队列中的请求,然后发送正常的请求;之所以会有延缓队列是因为这些请求在第一次待发送时因为数据量超过阈值或者请求数量超过阈值而不能发送,所以就被放到延缓队列中,而这里的处理也是优先发送延缓队列中的请求。每个请求在发送前必须要满足下面几个条件才会被发送:
当前正在拉取的数据量不能超过阈值maxReqsInFlight(默认48m);这里会有一个问题,如果某个block的数据量超过maxReqsInFlight值呢?这种情况下会等当前已经没有进行中的数据拉取请求才会发送这个请求,因为在对当前请求数据量阈值进行判断时会检查bytesInFlight == 0,如果这个条件满足就不会检查本次请求的数据量是否会超过阈值。
当前正在拉取的请求数据量不能超过阈值(默认Int.MaxValue)
每个远程地址的同时请求数量也会有限制(默认Int.MaxValue)
最后符合条件的请求就会被发送,这里要提出的一点是如果一次请求的数据量超过maxReqSizeShuffleToMem值,那么就会写入磁盘的一个临时文件中,而这个阈值的默认值是Long.MaxValue,所以默认情况下是没有限制的。
1 | // 发送请求 |
ShuffleBlockFetcherIterator.next
通过上一个方法的分析,我们能够看出来,初始化时发起的拉取数据的请求并未将所有请求全部发送出去,并且还会有请求因为超过阈值而被放入延缓队列中,那么这些未发送的请求是什么时候被再次发送的呢?答案就在next方法中。我们知道ShuffleBlockFetcherIterator是一个迭代器,所以外部调用者对元素的访问是通过next方法,所以很容易想到next方法中肯定会有发送拉取数据请求的逻辑。
总结一下:
首先从结果队列中获取一个拉取成功的结果(结果队列是一个阻塞队列,如果没有拉取成功的结果会阻塞调用者)
拿到一个结果后检查这个结果是拉取成功还是拉取失败,如果失败则直接抛异常(重试的逻辑实在rpc客户端实现的,不是在这里实现)
如果是一个成功的结果,首先要更新一下一些任务度量值,更新一些内部的簿记量,如正在拉取的数据量
将拉取到的字节缓冲包装成一个字节输入流
通过外部传进来的函数对流再包装一次,通过外部传进来的函数再包装一次,一般是解压缩和解密
而且流被压缩或者加密过,如果块的大小比较小,那么要将这个流拷贝一份,这样就会实际出发解压缩和解密,以此来尽早暴露块损坏的 问题
最后一句关键语句,再次发起一轮拉取数据请求的发 送,因为经过next处理之后,已经有拉取成功的数据了,正在拉取的数据量和请求数量可能减小了,这就为发送新的请求腾出空间
1 | override def next(): (BlockId, InputStream) = { |
总结
到此,我们就把shuffle读的过程大概分析完了。整体下来,感觉主干逻辑不是很复杂,但是里面有很多细碎逻辑,所以上面的分析还是比较碎,这里把整个过程的主干逻辑再提炼一下,以便能有个整体的认识:
首先,在一些shuffle类型的RDD中,它的计算方法compute会通过ShuffleManager获取一个block数据读取器BlockStoreShuffleReader
通过BlockStoreShuffleReader中的read方法进行数据的读取,一个reduce端分区的数据一般会依赖于所有的map端输出的分区数据,所以数据一般会在多个executor(注意是executor节点,通过BlockManagerId唯一标识,一个物理节点可能会运行多个executor节点)节点上,而且每个executor节点也可能会有多个block,在shuffle写过程的分析中我们也提到,每个map最后时输出一个数据文件和索引文件,也就是一个block,但是因为一个节点
这个方法通过ShuffleBlockFetcherIterator对象封装了远程拉取数据的复杂逻辑,并且最终将拉取到的数据封装成流的迭代器的形式
对所有的block的流进行层层装饰,包括反序列化,任务度量值(读入数据条数)统计,每条数据可中断,对数据进行聚合
对聚合后的数据进行排序
所以,从这里我们也能看出来,新版的shuffle机制中,也就是SortShuffleManager,用户代码对于shuffle之后的rdd拿到的是经过排序的数据(如果指定排序器的话)。