Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

3.5 RDD内部的计算机制

RDD的多个Partition分别由不同的Task处理。Task分为两类:shuffleMapTask、resultTask。本节基于源码对RDD的计算过程进行深度解析。

3.5.1 Task解析

Task是计算运行在集群上的基本计算单位。一个Task负责处理RDD的一个Partition,一个RDD的多个Partition会分别由不同的Task去处理,通过之前对RDD的窄依赖关系的讲解,我们可以发现在RDD的窄依赖中,子RDD中Partition的个数基本都大于等于父RDD中Partition的个数,所以Spark计算中对于每一个Stage分配的Task的数目是基于该Stage中最后一个RDD的Partition的个数来决定的。最后一个RDD如果有100个Partition,则Spark对这个Stage分配100个Task。

Task运行于Executor上,而Executor位于CoarseGrainedExecutorBackend(JVM进程)中。

Spark Job中,根据Task所处Stage的位置,我们将Task分为两类:第一类为shuffleMapTask,指Task所处的Stage不是最后一个Stage,也就是Stage的计算结果还没有输出,而是通过Shuffle交给下一个Stage使用;第二类为resultTask,指Task所处Stage是DAG中最后一个Stage,也就是Stage计算结果需要进行输出等操作,计算到此已经结束;简单地说,Spark Job中除了最后一个Stage的Task为resultTask,其他所有Task都为shuffleMapTask。

3.5.2 计算过程深度解析

Spark中的Job本身内部是由具体的Task构成的,基于Spark程序内部的调度模式,即根据宽依赖的关系,划分不同的Stage,最后一个Stage依赖倒数第二个Stage等,我们从最后一个Stage获取结果;在Stage内部,我们知道有一系列的任务,这些任务被提交到集群上的计算节点进行计算,计算节点执行计算逻辑时,复用位于Executor中线程池中的线程,线程中运行的任务调用具体Task的run方法进行计算,此时,如果调用具体Task的run方法,就需要考虑不同Stage内部具体Task的类型,Spark规定最后一个Stage中的Task的类型为resultTask,因为我们需要获取最后的结果,所以前面所有Stage的Task是shuffleMapTask。

RDD在进行计算前,Driver给其他Executor发送消息,让Executor启动Task,在Executor启动Task成功后,通过消息机制汇报启动成功信息给Driver。Task计算示意图如图3-6所示。

图3-6 Task计算示意图

详细情况如下:Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutor-Backend发送LaunchTask消息。

(1)首先反序列化TaskDescription。

CoarseGrainedExecutorBackend.scala的receive的源码如下:

launchTask中调用了decode方法,解析读取dataIn、taskId、attemptNumber、executorId、name、index等信息,读取相应的JAR、文件、属性,返回TaskDescription值。

Spark 2.2.1版本的TaskDescription.scala的decode的源码如下:

Spark 2.4.3版本的TaskDescription.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第7行之后新增加partitionId的变量。

(2)Executor会通过launchTask执行Task。

(3)Executor的launchTask方法创建一个TaskRunner实例在threadPool来运行具体的Task。

Executor.scala的launchTask的源码如下:

在TaskRunner的run方法首先会通过statusUpdate给Driver发信息汇报自己的状态,说明自己处于running状态。同时,TaskRunner内部会做一些准备工作,如反序列化Task的依赖,通过网络获取需要的文件、Jar等;然后反序列化Task本身。

Spark 2.2.1版本的Executor.scala的run方法的源码如下:

Spark 2.4.3版本的Executor.scala的run源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第15行代码变量taskStart的名称调整为taskStartTime。

1.  var taskStartTime: Long = 0

(4)调用反序列化后的Task.run方法来执行任务,并获得执行结果。

Spark 2.2.1版本的Executor.scala的run方法的源码如下:

Spark 2.4.3版本的Executor.scala的run源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第2行taskStart名称调整为taskStartTime。

 上段代码中第7行try方法调整为Utils.tryWithSafeFinally方法。

task.run方法调用了runTask的方法,而runTask方法是一个抽象方法,runTask方法内部会调用RDD的iterator()方法,该方法就是针对当前Task对应的Partition进行计算的关键所在,在处理的方法内部会迭代Partition的元素,并交给我们自定义的function进行处理。

Task.scala的run方法的源码如下:

task有两个子类,分别是ShuffleMapTask和ResultTask,下面分别对两者进行讲解。

1.ShuffleMapTask

ShuffleMapTask.scala的源码如下:

首先,ShuffleMapTask会反序列化RDD及其依赖关系,然后通过调用RDD的iterator方法进行计算,而iterator方法中进行的最终运算的方法是compute()。

RDD.scala的iterator方法的源码如下:

其中,RDD.scala的computeOrReadCheckpoint的源码如下:

RDD的compute方法是一个抽象方法,每个RDD都需要重写的方法。

此时,选择查看MapPartitionsRDD已经实现的compute方法,可以发现compute方法的实现是通过f方法实现的,而f方法就是我们创建MapPartitionsRDD时输入的操作函数。

Spark 2.2.1版本的MapPartitionsRDD.scala的源码如下:

Spark 2.4.3版本MapPartitionsRDD.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第4行之后新增2个参数isFromBarrier、isOrderSensitive,isFromBarrier参数指示此RDD是否从RDDBarrier转换,至少含有一个RDDBarrier的Stage阶段将转变为屏障阶段(BarrierS tage)。isOrderSensitive参数指示函数是否区分顺序。

 上段代码中第18行之后新增isBarrier_、getOutputDeterministicLevel方法。%

注意:通过迭代器的不断叠加,将每个RDD的小函数合并成一个大的函数流。

然后在计算具体的Partition之后,通过shuffleManager获得的shuffleWriter把当前Task计算的结果根据具体的shuffleManager实现写入到具体的文件中,操作完成后会把MapStatus发送给Driver端的DAGScheduler的MapOutputTracker。

2.ResultTask

Driver端的DAGScheduler的MapOutputTracker把shuffleMapTask执行的结果交给ResultTask,ResultTask根据前面Stage的执行结果进行shuffle后产生整个job最后的结果。

ResultTask.scala的runTask的源码如下:

而ResultTask的runTask方法中反序列化生成func函数,最后通过func函数计算出最终的结果。