Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

8.3 Task全生命周期详解

本节讲解Task的生命过程,对Task在Driver和Executor中交互的全生命周期原理和源码进行详解。

8.3.1 Task的生命过程详解

Task的生命过程详解如下。

(1)当Driver中的CoarseGrainedSchedulerBackend给CoarseGrainedExecutorBackend发送LaunchTask之后,CoarseGrainedExecutorBackend收到LaunchTask消息后,首先会反序列化TaskDescription。

(2)Executor会通过launchTask执行Task,在launchTask方法中调用new()函数创建TaskRunner,TaskRunner继承自Runnable接口。

(3)TaskRunner在ThreadPool运行具体的Task,在TaskRunner的run方法中首先会通过调用statusUpdate给Driver发信息汇报自己的状态,说明自己是Running状态。其中execBackend是ExecutorBackend,ExecutorBackend是一个trait,其具体的实现子类是CoarseGrained-ExecutorBackend,其中的statusUpdate方法中将向Driver提交StatusUpdate消息。

(4)TaskRunner内部会做一些准备工作:例如,反序列化Task的依赖,然后通过网络获取需要的文件、Jar等。

(5)然后是反序列Task本身。

(6)调用反序列化后的Task.run方法来执行任务,并获得执行结果。其中Task的run方法调用时会导致Task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator方法,该方法就是我们针对当前Task所对应的Partition进行计算的关键所在,在处理的内部会迭代Partition的元素,并交给我们自定义的function进行处理。

 对于ShuffleMapTask,首先要对RDD以及其依赖关系进行反序列化,最终计算会调用RDD的compute方法。具体计算时有具体的RDD,例如,MapPartitionsRDD的compute。compute方法其中的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码。

 对于ResultTask:调用rdd.iterator方法,最终计算仍然会调用RDD的compute方法。

(7)把执行结果序列化,并根据大小判断不同的结果传回给Driver。

(8)CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果。DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败时的不同情况,最后告诉DAGScheduler任务处理结束的状况。

说明:

①在执行具体Task的业务逻辑前,会进行四次反序列。

· TaskDescription的反序列化。

· 反序列化Task的依赖。

· Task的反序列化。

· RDD反序列化。

②在Spark 1.6中,AkkFrameSize是128MB,所以可以广播非常大的任务;而任务的执行结果最大可以达到1GB。Spark 2.2版本中,CoarseGrainedSchedulerBackend的launchTask方法中序列化任务大小的限制是maxRpcMessageSize为128MB。

8.3.2 Task在Driver和Executor中交互的全生命周期原理和源码详解

在Standalone模式中,Driver中的CoarseGrainedSchedulerBackend给CoarseGrained-ExecutorBackend发送launchTasks消息,CoarseGrainedExecutorBackend收到launchTasks消息以后会调用executor.launchTask。

CoarseGrainedExecutorBackend的receive方法如下,模式匹配收到LaunchTask消息:

(1)LaunchTask判断Executor是否存在,如果Executor不存在,则直接退出,然后会反序列化TaskDescription。

CoarseGrainedExecutorBackend的receive方法的源码如下:

1.  val taskDesc = ser.deserialize[TaskDescription](data.value)

(2)Executor会通过launchTask来执行Task,launchTask方法中分别传入taskId、尝试次数、任务名称、序列化后的任务本身。

CoarseGrainedExecutorBackend的receive方法的源码如下:

进入Executor.scala的launchTask方法,在launchTask方法中调用new()函数创建一个TaskRunner,传入的参数包括taskId、尝试次数、任务名称、序列化后的任务本身。然后放入runningTasks数据结构,在threadPool中执行TaskRunner。

TaskRunner本身是一个Runnable接口。

下面看一下TaskRunner的run方法。TaskMemoryManager是内存的管理,deserialize-StartTime是反序列化开始的时间,setContextClassLoader是ClassLoader加载具体的类。ser是序列化器。

然后调用execBackend.statusUpdate,statusUpdate是ExecutorBackend的方法,Executor-Backend通过statusUpdate给Driver发信息,汇报自己的状态。

其中,execBackend是ExecutorBackend,ExecutorBackend是一个trait,其具体的实现子类是CoarseGrainedExecutorBackend。execBackend实例是在CoarseGrainedExecutorBackend的receive方法收到LaunchTask消息,调用executor.launchTask(this, taskId = taskDesc.taskId,attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)时将CoarseGrainedExecutorBackend自己本身的this实例传进来的。这里调用CoarseGrained-ExecutorBackend的statusUpdate方法。statusUpdate方法将向Driver提交StatusUpdate消息。

CoarseGrainedExecutorBackend的statusUpdate的源码如下:

(3)TaskRunner的run方法中,TaskRunner在ThreadPool中运行具体的Task,在TaskRunner的run方法中首先会通过调用statusUpdate给Driver发信息汇报自己的状态,说明自己是Running状态。

1.  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)

其中,EMPTY_BYTE_BUFFER没有具体内容。

1.  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))

接下来通过Task.deserializeWithDependencies(serializedTask)反序列化Task,得到一个Tuple,获取到taskFiles、taskJars、taskProps、taskBytes等信息。

(4)Executor会通过TaskRunner在ThreadPool中运行具体的Task,TaskRunner内部会做一些准备工作:反序列化Task的依赖。

Executor.scala的源码如下:

然后通过网络来获取需要的文件、Jar等。

Executor.scala的源码如下:

再来看一下updateDependencies方法。从SparkContext收到一组新的文件JARs,下载Task运行需要的依赖Jars,在类加载机中加载新的JARs包。updateDependencies方法的源码如下所示。

Executor.scala的源码如下:

Executor的updateDependencies方法中,Executor运行具体任务时进行下载,下载文件使用synchronized关键字,因为Executor在线程中运行,同一个Stage内部不同的任务线程要共享这些内容,因此ExecutorBackend多条线程资源操作的时候,需要通过同步块加锁。

updateDependencies方法的Utils.fetchFile将文件或目录下载到目标目录,支持各种方式获取文件,包括HTTP,Hadoop兼容的文件系统、标准文件系统的文件,基于URL参数。获取目录只支持从Hadoop兼容的文件系统。如果usecache设置为true,第一次尝试取文件到本地缓存,执行同一应用程序进行共享。usecache主要用于executors,而不是本地模式。如果目标文件已经存在,并有不同于请求文件的内容,将抛出SparkException异常。

Spark 2.2.1版本的doFetchFile方法如下,包括spark、http | https | ftp、file各种协议方式的下载。

Spark 2.4.3版本的Utils.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第7行新增一个函数,返回类型File。

1.  ......
2.  hadoopConf: Configuration): File = {
3.  ......

(5)回到TaskRunner的run方法,所有依赖的Jar都下载完成后,然后是反序列Task本身。

Executor.scala的源码如下:

在执行具体Task的业务逻辑前会进行四次反序列。

① TaskDescription的反序列化。

②反序列化Task的依赖。

③ Task的反序列化。

④ RDD反序列化。

(6)回到TaskRunner的run方法,调用反序列化后的Task.run方法来执行任务并获得执行结果。

其中,Task的run方法调用时会导致Task的抽象方法runTask的调用,在Task的runTask内部会调用RDD的iterator方法,该方法就是针对当前Task所对应的Partition进行计算的关键所在,在处理的内部会迭代Partition的元素并交给自定义的function进行处理。

进入task.run方法,在run方法里面再调用runTask方法。

Spark 2.2.1版本的Task.scala的源码如下:

Spark 2.4.3版本的Task.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第6行context名称调整为taskContext。

进入Task.scala的runTask方法,这里是一个抽象方法,没有具体的实现。

1.  def runTask(context: TaskContext): T

Task包括两种Task:ResultTask和ShuffleMapTask。抽象runTask方法由子类的runTask实现。先看一下ShuffleMapTask的runTask方法,runTask实际运行的时候会调用RDD的iterator,然后针对partition进行计算。

ShuffleMapTask在计算具体的Partition之后实际上会通过shuffleManager获得的shuffleWriter把当前Task计算内容根据具体的shuffleManager实现写入到具体的文件中。操作完成以后会把MapStatus发送给DAGscheduler,Driver的DAGScheduler的MapOutputTracker会收到注册的信息。

同样地,ResultTask的runTask方法也是调用RDD的iterator,然后针对partition进行计算。MapOutputTracker会把ShuffleMapTask执行结果交给ResultTask,ResultTask根据前面Stage的执行结果进行Shuffle,产生整个Job最后的结果。

ResultTask、ShuffleMapTask的runTask方法真正执行的时候,调用RDD的iterator,对Partition进行计算。ResultTask.scala的runTask方法的源码如下:

RDD.scala的iterator方法源码如下:

如果storageLevel不等于NONE,就直接获取或者计算得到RDD的分区;如果storageLevel是空,就从checkpoint中读取或者计算RDD分区。

进入computeOrReadCheckpoint:

最终计算会调用RDD的compute方法。

1.  def compute(split: Partition, context: TaskContext): Iterator[T]

RDD的compute方法中的Partition是一个trait。

RDD的compute方法中的TaskContext里面有很多方法,包括任务是否完成、任务是否中断、任务是否在本地运行、任务运行完成时的监听器、任务运行失败的监听器、stageId、partitionId、重试的次数等。

下面看一下TaskContext具体的实现TaskContextImpl。TaskContextImpl维持了很多上下文信息,如stageId、partitionId、taskAttemptId、重试次数、taskMemoryManager等。

RDD的compute方法具体计算的时候有具体的RDD,如MapPartitionsRDD的compute、传进去的Partition及TaskContext上下文。

MapPartitionsRDD.scala的compute的源码如下:

MapPartitionsRDD.scala的compute中的f就是我们在当前的Stage中计算具体Partition的业务逻辑代码。f是函数,是我们自己写的业务逻辑。Stage从后往前推,把所有的RDD合并变成一个,函数也会变成一个链条,展开成一个很大的函数。Compute返回的是一个Iterator。

Task包括两种Task:ResultTask和ShuffleMapTask。

先看一下ShuffleMapTask的runTask方法,从ShuffleMapTask的角度讲,rdd.iterator获得数据记录以后,对rdd.iterator计算后的Iterator记录进行write。

ResultTask.scala的runTask方法较简单:在ResultTask中,rdd.iterator获得数据记录以后,直接调用func函数。func函数是Task任务反序列化后直接获得的fun函数。

(7)回到TaskRunner的run方法,把执行结果序列化,并根据大小判断不同的结果传回给Driver。

 task.run运行的结果赋值给value。

 resultSer.serialize(value)把task.run的执行结果value序列化。

 maxResultSize > 0 && resultSize > maxResultSize对任务执行结果的大小进行判断,并进行相应的处理。任务执行完以后,任务的执行结果最大可以达到1GB。

如果任务执行结果特别大,超过1GB,日志就会提示超出任务大小限制。返回元数据ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))。

如果任务执行结果小于1GB,大于maxDirectResultSize(128MB),就放入blockManager,返回元数据ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))。

如果任务执行结果小于128MB,就直接返回serializedDirectResult。

TaskRunner的run方法如下所示。

Executor.scala的源码如下:

Spark 2.4.3版本的Executor.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第3行将try语句调整为Utils.tryWithSafeFinally。tryWithSafeFinally方法执行代码块,然后执行finally块,但如果异常发生在finally块,不要抑制原来的异常。这主要是finally out.close块的问题,其中需要调用Close来清除,但如果在Out.Write发生异常,它很可能已损坏,Out.Close也将失败,这将抑制原始的可能更有意义的out.write调用的异常。

其中的maxResultSize大小是1GB,任务的执行结果最大可以达到1GB。

Spark 2.2.1版本的Executor.scala的源码如下:

Spark 2.4.3版本的Executor.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第3行getMaxResultSize方法调整为从配置文件读取MAX_RESULT_SIZE,用于对结果的大小限制。

其中的Executor.scala中的maxDirectResultSize大小,取spark.task.maxDirectResultSize和RpcUtils.maxMessageSizeBytes的最小值。其中spark.rpc.message.maxSize默认配置是128MB。spark.task.maxDirectResultSize在配置文件中进行配置。

Driver发消息给Executor,CoarseGrainedSchedulerBackend的launchTask方法中序列化任务大小的限制是maxRpcMessageSize,大小是128MB。

CoarseGrainedSchedulerBackend.scala的源码如下:

回到TaskRunner的run方法,execBackend.statusUpdate(taskId, TaskState.FINISHED,serializedResult)给Driver发送一个消息,消息中将taskId、TaskState.FINISHED、serializedResult放进去。

statusUpdate方法的源码如下:

(8)CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate来传输执行结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程去分别处理Task执行成功和失败的不同情况,最后告诉DAGScheduler任务处理结束的状况。

CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法如下:

DriverEndpoint的receive方法中,StatusUpdate调用scheduler.statusUpdate,然后释放资源,再次进行资源调度makeOffers(executorId)。

TaskSchedulerImpl的statusUpdate中:

 如果是TaskState.LOST,则记录下原因,将Executor清理掉。

 如果是TaskState.isFinished,则从taskSet中运行的任务中remove掉任务,调用taskResultGetter.enqueueSuccessfulTask处理。

 如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,则调用taskResultGetter.enqueueFailedTask处理。

Spark 2.2.1版本的TaskSchedulerImpl的statusUpdate的源码如下:

Spark 2.4.3版本的TaskSchedulerImpl.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第6行将根据任务ID获取的TaskSetManager调整为Option(taskIdToTaskSet-Manager.get(tid))。

其中,taskResultGetter是TaskResultGetter的实例化对象。

TaskResultGetter.scala的源码如下:

TaskResultGetter.scala的enqueueSuccessfulTask方法中,处理成功任务的时候开辟了一条新线程,先将结果反序列化,然后根据接收的结果类型DirectTaskResult、IndirectTaskResult分别处理。

如果是DirectTaskResult,则直接获得结果并返回。

如果是IndirectTaskResult,就通过blockManager.getRemoteBytes远程获取。获取以后再进行反序列化。

最后是scheduler.handleSuccessfulTask。

TaskSchedulerImpl的handleSuccessfulTask的源码如下:

TaskSchedulerImpl中也有失败任务的相应处理。

TaskSchedulerImpl.scala的源码如下:

TaskSchedulerImpl的handleSuccessfulTask交给TaskSetManager调用handleSuccessfulTask,告诉DAGScheduler任务处理结束的状况,并且Kill掉其他尝试的相同任务(因为一个任务已经尝试成功,其他的相同任务没必要再次去尝试)。

Spark 2.2.1版本的TaskSetManager的handleSuccessfulTask的源码如下:

Spark 2.4.3版本的TaskSetManager.scala的handleSuccessfulTask的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第4行之后新增代码,检查在此之前是否有其他尝试成功。如已完成,但由于另一次尝试成功而未提交,将此任务作为已终止的任务处理。

 上段代码中第16行之前新增killedByOtherAttempt的代码。killedByOtherAttempt是一个HashSet,当任务被其他尝试任务杀死时,将任务的tid添加到此HashSet中。这是在将spark.speculation设置为true时发生的。被别人杀死的任务,executor丢失时不应重新提交。

speculationEnabled默认设置为spark.speculation=false,用于推测执行慢的任务;如果设置为true,successfulTaskDurations使用MedianHeap记录成功任务的持续时间,这样就可以确定什么时候启动推测性任务,这种情况只在启用推测时使用,以避免不使用堆时增加堆中的开销。

TaskSetManager的handleSuccessfulTask中调用了maybeFinishTaskSet。maybeFinishTaskSet的源码如下:

TaskSetManager:单TaskSet的任务调度在TaskSchedulerImpl中进行。TaskSetManager类跟踪每项任务,如果任务重试失败(超过有限的次数),对于TaskSet处理本地调度主要的接口是resourceOffer,询问TaskSet是否要在一个节点上运行任务,进行状态更新statusUpdate,告诉TaskSet的一个任务的状态发生了改变(如已完成)。线程:这个类被设计成只在具有锁的代码TaskScheduler上调用(如事件处理程序),不应该从其他线程调用。

图8-3 Task执行及结果处理原理流程图

总结:Task执行及结果处理原理流程图如图8-3所示。任务从Driver上发送过来,CoarseGrainedSchedulerBackend发送任务,CoarseGrainedExecutorBackend收到任务后,交给Executor处理,Executor会通过launchTask执行Task。TaskRunner内部会做很多准备工作:反序列化Task的依赖,通过网络获取需要的文件、Jar、反序列Task本身等待;然后调用Task的runTask执行,runTask有ShuffleMapTask、ResultTask两种。通过iterator()方法根据业务逻辑循环遍历,如果是ShuffleMapTask,就把MapStatus汇报给MapOutTracker;如果是ResultTask,就从前面的MapOutTracker中获取信息。