7.2 Shuffle的框架
本节讲解Shuffle的框架、Shuffle的框架内核、Shuffle数据读写的源码解析。Spark Shuffle从基于Hash的Shuffle,引入了Shuffle Consolidate机制(即文件合并机制),演进到基于Sort的Shuffle实现方式。随着Tungsten计划的引入与优化,引入了基于Tungsten-Sort的Shuffle实现方式。
7.2.1 Shuffle的框架演进
Spark的Shuffle框架演进历史可以从框架本身的演进、Shuffle具体实现机制的演进两部分进行解析。
框架本身的演进可以从面向接口编程的原则出发,结合Build设计模式进行理解。整个Spark的Shuffle框架从Spark 1.1版本开始,提供便于测试、扩展的可插拔式框架。
而对应Shuffle的具体实现机制的演进部分,可以跟踪Shuffle实现细节在各个版本中的变更。具体体现在Shuffle数据的写入或读取,以及读写相关的数据块解析方式。下面简单描述一下整个演进过程。
在Spark 1.1之前,Spark中只实现了一种Shuffle方式,即基于Hash的Shuffle。在基于Hash的Shuffle的实现方式中,每个Mapper阶段的Task都会为每个Reduce阶段的Task生成一个文件,通常会产生大量的文件(即对应为M×R个中间文件,其中,M表示Mapper阶段的Task个数,R表示Reduce阶段的Task个数)。伴随大量的随机磁盘I/O操作与大量的内存开销。
为了缓解上述问题,在Spark 0.8.1版本中为基于Hash的Shuffle的实现引入了Shuffle Consolidate机制(即文件合并机制),将Mapper端生成的中间文件进行合并的处理机制。通过将配置属性spark.shuffle.consolidateFiles设置为true,减少中间生成的文件数量。通过文件合并,可以将中间文件的生成方式修改为每个执行单位(类似于Hadoop的Slot)为每个Reduce阶段的Task生成一个文件。其中,执行单位对应为:每个Mapper阶段的Cores数/每个Task分配的Cores数(默认为1)。最终可以将文件个数从M×R修改为E×C/T×R,其中,E表示Executors个数,C表示可用Cores个数,T表示Task分配的Cores个数。
基于Hash的Shuffle的实现方式中,生成的中间结果文件的个数都会依赖于Reduce阶段的Task个数,即Reduce端的并行度,因此文件数仍然不可控,无法真正解决问题。为了更好地解决问题,在Spark 1.1版本引入了基于Sort的Shuffle实现方式,并且在Spark 1.2版本之后,默认的实现方式也从基于Hash的Shuffle,修改为基于Sort的Shuffle实现方式,即使用的ShuffleManager从默认的hash修改为sort。首先,每个Mapper阶段的Task不会为每个Reduce阶段的Task生成一个单独的文件;而是全部写到一个数据(Data)文件中,同时生成一个索引(Index)文件,Reduce阶段的各个Task可以通过该索引文件获取相关的数据。避免产生大量文件的直接收益就是降低随机磁盘I/O与内存的开销。最终生成的文件个数减少到2M,其中M表示Mapper阶段的Task个数,每个Mapper阶段的Task分别生成两个文件(1个数据文件、1个索引文件),最终的文件个数为M个数据文件与M个索引文件。因此,最终文件个数是2×M个。
随着Tungsten计划的引入与优化,从Spark 1.4版本开始(Tungsten计划目前在Spark 1.5与Spark 1.6两个版本中分别实现了第一与第二两个阶段),在Shuffle过程中也引入了基于Tungsten-Sort的Shuffle实现方式,通过Tungsten项目所做的优化,可以极大提高Spark在数据处理上的性能。
为了更合理、更高效地使用内存,在Spark的Shuffle实现方式演进过程中,引进了外部排序等处理机制(针对基于Sort的Shuffle机制。基于Hash的Shuffle机制从最原始的全部放入内存改为记录级写入)。同时,为了保存Shuffle结果提高性能以及支持资源动态分配等特性,也引进了外部Shuffle服务等机制。
7.2.2 Shuffle的框架内核
Shuffle框架的设计可以从两方面理解:一方面,为了Shuffle模块更加内聚并与其他模块解耦;另一方面,为了更方便替换、测试、扩展Shuffle的不同实现方式。从Spark 1.1版本开始,引进了可插拔式的Shuffle框架(通过将Shuffle相关的实现封装到一个统一的对外接口,提供一种具体实现可插拔的框架)。Spark框架中,通过ShuffleManager来管理各种不同实现机制的Shuffle过程,由ShuffleManager统一构建、管理具体实现子类来实现Shuffle框架的可插拔的Shuffle机制。
在详细描述Shuffle框架实现细节之前,先给出可插拔式Shuffle的整体架构的类图,如图7-2所示。
图7-2 可插拔式Shuffle的整体架构的类图
在DAG的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时会将作业(Job)划分成多个Stage。对应地,在源码实现中,通过在划分Stage的关键点——构建ShuffleDependency时——进行Shuffle注册,获取后续数据读写所需的ShuffleHandle。
Stage阶段划分后,最终每个作业(Job)提交后都会对应生成一个ResultStage与若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage。ResultStage与ShuffleMapStage中的Task分别对应了ResultTask与ShuffleMapTask。一个作业,除了最终的ResultStage,其他若干ShuffleMapStage中的各个ShuffleMapTask都需要将最终的数据根据相应的分区器(Partitioner)对数据进行分组(即将数据重组到新的各个分区中),然后持久化分组后的数据。对应地,每个RDD本身记录了它的数据来源,在计算(compute)时会读取所需数据,对于带有宽依赖的RDD,读取时会获取在ShuffleMapTask中持久化的数据。
从图7-2中可以看到,外部宽依赖相关的RDD与ShuffleManager之间的注册交互,通过该注册,每个RDD自带的宽依赖(ShuffleDependency)内部会维护Shuffle的唯一标识信息ShuffleId以及与Shuffle过程具体读写相关的句柄ShuffleHandle,后续在ShuffleMapTask中启动任务(Task)的运行时,可以通过该句柄获取相关的Shuffle写入器实例,实现具体的数据磁盘写操作。
而在带有宽依赖(ShuffleDependency)的RDD中,执行compute时会去读取上一Stage为其输出的Shuffle数据,此时同样会通过该句柄获取相关的Shuffle读取器实例,实现具体数据的读取操作。需要注意的是,当前Shuffle的读写过程中,与BlockManager的交互,是通过MapOutputTracker来跟踪Shuffle过程中各个任务的输出数据的。在任务完成等场景中,会将对应的MapStatus信息注册到MapOutputTracker中,而在compute数据读取过程中,也会通过该跟踪器来获取上一Stage的输出数据在BlockManager中的位置,然后通过getReader得到的数据读取器,从这些位置中读取数据。
目前对Shuffle的输出进行跟踪的MapOutputTracker并没有和Shuffle数据读写类一样,也封装到Shuffle的框架中。如果从代码聚合与解耦等角度出发,也可以将MapOutputTracker合并到整个Shuffle框架中,然后在Shuffle写入器输出数据之后立即进行注册,在数据读取器读取数据前获取位置等(但对应的DAG等调度部分,也需要进行修改)。
ShuffleManager封装了各种Shuffle机制的具体实现细节,包含的接口与属性如下所示。
(1)registerShuffle:每个RDD在构建它的父依赖(这里特指ShuffleDependency)时,都会先注册到ShuffleManager,获取ShuffleHandler,用于后续数据块的读写等。
(2)getWriter:可以通过ShuffleHandler获取数据块写入器,写数据时通过Shuffle的块解析器shuffleBlockResolver,获取写入位置(通常将写入位置抽象为Bucket,位置的选择则由洗牌的规则,即Shuffle的分区器决定),然后将数据写入到相应位置(理论上,位置可以位于任何能存储数据的地方,包括磁盘、内存或其他存储框架等,目前在可插拔框架的几种实现中,Spark与Hadoop一样都采用磁盘的方式进行存储,主要目的是为了节约内存,同时提高容错性)。
(3)getReader:可以通过ShuffleHandler获取数据块读取器,然后通过Shuffle的块解析器shuffleBlockResolver,获取指定数据块。
(4)unregisterShuffle:与注册对应,用于删除元数据等后续清理操作。
(5)shuffleBlockResolver:Shuffle的块解析器,通过该解析器,为数据块的读写提供支撑层,便于抽象具体的实现细节。
7.2.3 Shuffle框架的源码解析
用户可以通过自定义ShuffleManager接口,并通过指定的配置属性进行设置,也可以通过该配置属性指定Spark已经支持的ShuffleManager具体实现子类。
在SparkEnv源码中可以看到设置的配置属性,以及当前在Spark的ShuffleManager可插拔框架中已经提供的ShuffleManager具体实现。Spark 2.0版本中支持sort、tungsten-sort两种方式。
SparkEnv.scala的源码如下:
从代码中可以看出,ShuffleManager是Spark Shuffle系统提供的一个可插拔式接口,可以通过spark.shuffle.manager配置属性来设置自定义的ShuffleManager。
在Driver和每个Executor的SparkEnv实例化过程中,都会创建一个ShuffleManager,用于管理块数据,提供集群块数据的读写,包括数据的本地读写和读取远程节点的块数据。
Shuffle系统的框架可以以ShuffleManager为入口进行解析。在ShuffleManager中指定了整个Shuffle框架使用的各个组件,包括如何注册到ShuffleManager,以获取一个用于数据读写的处理句柄ShuffleHandle,通过ShuffleHandle获取特定的数据读写接口:ShuffleWriter与ShuffleReader,以及如何获取块数据信息的解析接口ShuffleBlockResolver。下面通过源码分别对这几个比较重要的组件进行解析。
1.ShuffleManager的源码解析
由于ShuffleManager是Spark Shuffle系统提供的一个可插拔式接口,提供具体实现子类或自定义具体实现子类时,都需要重写ShuffleManager类的抽象接口。下面首先分析ShuffleManager的源码。
ShuffleManager.scala的源码如下:
2.ShuffleHandle的源码解析
1. abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
ShuffleHandle比较简单,用于记录Task与Shuffle相关的一些元数据,同时也可以作为不同具体Shuffle实现机制的一种标志信息,控制不同具体实现子类的选择等。
3.ShuffleWriter的源码解析
ShuffleWriter.scala的源码如下:
继承ShuffleWriter的每个具体子类会实现write接口,给出任务在输出时的写记录的具体方法。
4.ShuffleReader的源码解析
ShuffleReader.scala的源码如下:
继承ShuffleReader的每个具体子类会实现read接口,计算时负责从上一阶段Stage的输出数据中读取记录。
5.ShuffleBlockResolver的源码解析
ShuffleBlockResolver的源码如下:
继承ShuffleBlockResolver的每个具体子类会实现getBlockData接口,给出具体的获取块数据的方法。
目前在ShuffleBlockResolver的各个具体子类中,除给出获取数据的接口外,通常会提供如何解析块数据信息的接口,即提供了写数据块时的物理块与逻辑块之间映射关系的解析方法。
7.2.4 Shuffle数据读写的源码解析
1.Shuffle写数据的源码解析
从Spark Shuffle的整体框架中可以看到,ShuffleManager提供了Shuffle相关数据块的写入与读取,即对应的接口getWriter与getReader。
在解析Shuffle框架数据读取过程中,可以构建一个具有ShuffleDependency的RDD,查看执行过程中,Shuffle框架中的数据读写接口getWriter与getReader如何使用,通过这种具体案例的方式来加深对源码的理解。
Spark中Shuffle具体的执行机制可以参考本书的其他章节,在此仅分析与Shuffle直接相关的内容。通过DAG调度机制的解析,可以知道Spark中一个作业可以根据宽依赖切分Stages,而在Stages中,相应的Tasks也包含两种,即ResultTask与ShuffleMapTask。其中,一个ShuffleMapTask会基于ShuffleDependency中指定的分区器,将一个RDD的元素拆分到多个buckets中,此时通过ShuffleManager的getWriter接口获取数据与buckets的映射关系。而ResultTask对应的是一个将输出返回给应用程序Driver端的Task,在该Task执行过程中,最终都会调用RDD的compute对内部数据进行计算,而在带有ShuffleDependency的RDD中,在compute计算时,会通过ShuffleManager的getReader接口,获取上一个Stage的Shuffle输出结果作为本次Task的输入数据。
首先来看ShuffleMapTask中的写数据流程。ShuffleMapTask.scala的源码如下:
2.Shuffle读数据的源码解析
对应的数据读取器,从RDD的5个抽象接口可知,RDD的数据流最终会经过算子操作,即RDD中的compute方法。下面以包含宽依赖的RDD、CoGroupedRDD为例,查看如何获取Shuffle的数据。CoGroupedRDD.scala的源码如下:
从代码中可以看到,带宽依赖的RDD的compute操作中,最终是通过SparkEnv中的ShuffleManager实例的getReader方法,获取数据读取器的,然后再次调用读取器的read读取指定分区范围的Shuffle数据。注意,是带宽依赖的RDD,而非ShuffleRDD,除了ShuffleRDD外,还有其他RDD也可以带上宽依赖的,如前面给出的CoGroupedRDD。
目前支持的几种具体Shuffle实现机制在读取数据的处理上都是一样的。从源码角度可以看到,当前继承了ShuffleReader这一数据读取器的接口的具体子类只有BlockStoreShuffleReader,因此,本章内容仅在此对各种Shuffle实现机制的数据读取进行解析,后续各实现机制中不再重复描述。
源码解析的第一步仍然是查看该类的描述信息,具体格式如下:
从注释上可以看出,读取器负责上一Stage为下一Stage输出数据块的读取。从前面对ShuffleReader接口的解析可知,继承的具体子类需要实现真正的数据读取操作,即实现read方法。因此,该方法便是需要重点关注的源码。
Spark 2.2.1版本的BlockStoreShuffleReader.scala的源码如下:
Spark 2.4.3版本的BlockStoreShuffleReader.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第20行将config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM调整为config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM。
上段代码中第64行删掉。
上段代码中第74行将dep.keyOrdering match调整为val resultIter = dep.keyOrdering match。
上段代码中第87行之后的新增代码,如果任务已完成或取消,使用回调停止排序。
上段代码中第92行之后的新增代码,对resultIter的两种情况进行适配处理。
下面进一步解析数据读取的部分细节。首先是数据块获取、读取的ShuffleBlock-FetcherIterator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地节点或远程节点)分别进行读取。
ShuffleBlockFetcherIterator.scala的源码如下:
与Hadoop一样,Spark计算框架也基于数据本地性,即移动数据而非移动计算的原则,因此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。
另外,数据块的本地性是通过ShuffleBlockFetcherIterator实例构建时所传入的位置信息来判断的,而该信息由MapOutputTracker实例的getMapSizesByExecutorId方法提供,可以参考该方法的返回值类型查看相关的位置信息,返回值类型为:Seq[(BlockManagerId,Seq[(BlockId, Long)])]。其中,BlockManagerId是BlockManager的唯一标识信息,BlockId是数据块的唯一信息,对应的Seq[(BlockId, Long)]表示一组数据块标识ID及其数据块大小的元组信息。
最后简单分析一下如何设置分区内部的排序标识,当需要对分区内的数据进行排序时,会设置RDD中的宽依赖(ShuffleDependency)实例的keyOrdering变量。下面以基于排序的OrderedRDDFunctions提供的sortByKey方法给出解析。
OrderedRDDFunctions.scala的源码如下:
当需要对分区内部的数据进行排序时,构建RDD的同时会设置Key值的排序算法,结合前面的read代码,当指定Key值的排序算法时,就会使用外部排序器对分区内的数据进行排序。