4.3 TaskScheduler解析
TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。
(1)为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息。
(2)遇到Straggle任务时,会放到其他节点进行重试。
(3)向DAGScheduler汇报执行情况,包括在Shuffle输出丢失的时候报告fetch failed错误等信息。
4.3.1 TaskScheduler原理剖析
DAGScheduler将划分的一系列的Stage(每个Stage封装一个TaskSet),按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。接下来我们分析TaskScheduler接收到DAGScheduler的Stage任务后,是如何来管理Stage(TaskSet)的生命周期的。
首先,回顾一下DAGScheduler在SparkContext中实例化的时候,TaskScheduler以及SchedulerBackend就已经先在SparkContext的createTaskScheduler创建出实例对象了。
虽然Spark支持多种部署模式(包括Local、Standalone、YARN、Mesos等),但是底层调度器TaskScheduler接口的实现类都是TaskSchedulerImpl。并且,考虑到方便读者对TaskScheduler的理解,对于SchedulerBackend的实现,我们也只专注Standalone部署模式下的具体实现StandaloneSchedulerBackend来作分析。
TaskSchedulerImpl在createTaskScheduler方法中实例化后,就立即调用自己的initialize方法把StandaloneSchedulerBackend的实例对象传进来,从而赋值给TaskSchedulerImpl的backend。在TaskSchedulerImpl的initialize方法中,根据调度模式的配置创建实现了SchedulerBuilder接口的相应的实例对象,并且创建的对象会立即调用buildPools创建相应数量的Pool存放和管理TaskSetManager的实例对象。实现SchedulerBuilder接口的具体类都是SchedulerBuilder的内部类。
(1)FIFOSchedulableBuilder:调度模式是SchedulingMode.FIFO,使用先进先出策略调度。先进先出(FIFO)为默认模式。在该模式下只有一个TaskSetManager池。
(2)FairSchedulableBuilder:调度模式是SchedulingMode.FAIR,使用公平策略调度。
在createTaskScheduler方法返回后,TaskSchedulerImpl通过DAGScheduler的实例化过程设置DAGScheduler的实例对象。然后调用自己的start方法。在TaskSchedulerImpl调用start方法的时候,会调用StandaloneSchedulerBackend的start方法,在StandaloneSchedulerBackend的start方法中,会最终注册应用程序AppClient。TaskSchedulerImpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。
TaskSchedulerImpl启动后,就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理。TaskSchedulerImpl在submitTasks中初始化一个TaskSetManager,对其生命周期进行管理,当TaskSchedulerImpl得到Worker节点上的Executor计算资源的时候,会通过TaskSetManager发送具体的Task到Executor上执行计算。
如果Task执行过程中有错误导致失败,会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。TaskSetManager会将失败的Task再次添加到待执行Task队列中。Spark Task允许失败的次数默认是4次,在TaskSchedulerImpl初始化的时候,通过spark.task.maxFailures设置该值。
如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler,DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。
通过下面的调度链,Executor把Task执行的结果返回给调度器(Scheduler)。
(1)Executor.run。
(2)CoarseGrainedExecutorBackend.statusUpdate(发送StatusUpdate消息)。
(3)CoarseGrainedSchedulerBackend.receive(处理StatusUpdate消息)。
(4)TaskSchedulerImpl.statusUpdate。
(5)TaskResultGetter.enqueueSuccessfulTask或者enqueueFailedTask。
(6)TaskSchedulerImpl.handleSuccessfulTask或者handleFailedTask。
(7)TaskSetManager.handleSuccessfulTask或者handleFailedTask。
(8)DAGScheduler.taskEnded。
(9)DAGScheduler.handleTaskCompletion。
在上面的调度链中值得关注的是:第(7)步中,TaskSetManager的handleFailedTask方法会将失败的Task再次添加到待执行Task队列中。在第(6)步中,TaskSchedulerImpl的handleFailedTask方法在TaskSetManager的handleFailedTask方法返回后,会调用CoarseGrainedSchedulerBackend的reviveOffers方法给重新执行的Task获取资源。
4.3.2 TaskScheduler源码解析
TaskScheduler是Spark的底层调度器。底层调度器负责Task本身的调度运行。
下面编写一个简单的测试代码,setMaster("local-cluster[1, 1, 1024]")设置为Spark本地伪分布式开发模式,从代码的运行日志中观察Spark框架的运行情况。
在IDEA中运行代码,运行结果中打印的日志如下:
日志中显示:StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.189.100:59722,表明StandaloneAppClient的ClientEndpoint注册给master。日志中显示StandaloneAppClient$ClientEndpoint: Executor added获取了Executor。具体是通过Standalone-AppClient的ClientEndpoint来管理Executor。日志中显示StandaloneSchedulerBackend:SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio:0.0,说明Standalone- SchedulerBackend已经准备好。
这里是在IDEA本地伪分布式运行的(通过count的action算子启动了Job)。如果是通过Spark-shell运行程序来观察日志,当启动Spark-shell本身的时候,命令终端反馈回来的主要是ClientEndpoint和StandaloneSchedulerBackend,因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext,并注册当前的应用程序给Master,且从集群中获得ExecutorBackend计算资源。
IDEA本地伪分布式运行,Job启动的日志如下:
count是action算子触发了Job;然后DAGScheduler获取Final stage: ResultStage,提交Submitting ResultStage。最后提交任务给TaskSetManager,启动任务。任务完成后,DAGScheduler完成Job。
DAGScheduler划分好Stage后,会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSet。TaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(如重试、慢任务进行推测式执行等)。
TaskSet是一个数据结构,TaskSet包含了一系列高层调度器交给底层调度器的任务的集合。第一个成员是Tasks,第二个成员task属于哪个Stage,stageAttemptId是尝试的ID,priority优先级,调度的时候有一个调度池,调度归并调度的优先级。
TaskSetManager实例化的时候完成TaskSchedulerImpl的工作,接收TaskSet任务的集合,maxTaskFailures是任务失败重试的次数。
TaskSetManager.scala的源码如下:
TaskSetManager的blacklistTracker的成员变量,用于黑名单列表executors及nodes的跟踪。
TaskScheduler与SchedulerBackend总体的底层任务调度的过程如下。
(1)TaskSchedulerImpl.submitTasks:主要作用是将TaskSet加入到TaskSetManager中进行管理。
DAGScheduler.scala收到JobSubmitted消息,调用handleJobSubmitted方法。
在handleJobSubmitted方法中提交submitStage。
submitStage方法调用submitMissingTasks提交task。
DAGScheduler.scala的submitMissingTasks里面调用了taskScheduler.submitTasks。
Spark 2.2.1版本的DAGScheduler.scala的源码如下:
Spark 2.4.3版本的DAGScheduler.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7行、第11行、第18行的stage.latestInfo.attemptId调整为stage.latestInfo.attemptNumber,从Spark 2.3.0版本以后,将attemptId改为使用attemptNumber。
上段代码中第19行删除。
taskScheduler是一个接口trait,这里没有具体的实现。
taskScheduler的子类是TaskSchedulerImpl,TaskSchedulerImpl中submitTasks的具体实现如下。
Spark 2.2.1版本的TaskSchedulerImpl.scala的submitTasks的源码。
Spark 2.4.3版本的TaskSchedulerImpl.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第8行之后新增任务集管理器TaskSetManagers,检查任务集是否为Zombie的代码。
删掉上段代码中第10~16行的代码。
高层调度器把任务集合传给了TaskSet,任务可能是ShuffleMapTask,也可能是ResultTask。获得taskSet.tasks任务赋值给变量tasks。然后使用了同步块synchronized,在同步块中调用createTaskSetManager,创建createTaskSetManager。createTaskSetManager代码如下。
TaskSchedulerImpl.scala的源码如下:
TaskSchedulerImpl.scala的createTaskSetManager会调用new()函数创建一个TaskSetManager,传进来的this是其本身TaskSchedulerImpl、任务集taskSet、最大失败重试次数maxTaskFailures。maxTaskFailures是在构建TaskSchedulerImpl时传入的。
而TaskSchedulerImpl是在SparkContext中创建的。SparkContext的源码如下:
在SparkContext.scala中,通过createTaskScheduler创建taskScheduler,而在createTaskScheduler方法中,模式匹配到Standalone的模式,用new函数创建一个TaskSchedulerImpl。
TaskSchedulerImpl的构造方法将获取配置文件中的config.MAX_TASK_FAILURES,MAX_TASK_FAILURES默认的最大失败重试次数是4次。
回到TaskSchedulerImpl,createTaskSetManager创建了TaskSetManager后,非常关键的一行代码是schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)。
(2)SchedulableBuilder.addTaskSetManager:SchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在哪个ExecutorBackend中。
schedulableBuilder是应用程序级别的调度器。SchedulableBuilder是一个接口trait,建立调度树。buildPools:建立树节点pools。addTaskSetManager:建立叶子节点TaskSetManagers。
schedulableBuilder支持两种调度模式:FIFOSchedulableBuilder、FairSchedulableBuilder。FIFOSchedulableBuilder是先进先出调度模式。FairSchedulableBuilder是公平调度模式。调度策略可以通过spark-env.sh中的spark.scheduler.mode进行具体设置,默认是FIFO的方式。
回到TaskSchedulerImpl的submitTasks,看一下schedulableBuilder.addTaskSetManager中的调度模式schedulableBuilder。
1. var schedulableBuilder: SchedulableBuilder = null
schedulableBuilder是SparkContext中new TaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。
具体调度模式有FIFO和FAIR两种,对应的SchedulableBuilder也有两种,即FIFOSchedulableBuilder、FairSchedulableBuilder。initialize方法中的schedulingMode模式默认是FIFO。
回到TaskSchedulerImpl的submitTasks,schedulableBuilder.addTaskSetManager之后,关键的一行代码是backend.reviveOffers()。
(3)CoarseGrainedSchedulerBackend.reviveOffers:给DriverEndpoint发送ReviveOffers。SchedulerBackend.scala的reviveOffers方法没有具体实现。
CoarseGrainedSchedulerBackend是SchedulerBackend的子类。CoarseGrainedScheduler-Backend的reviveOffers方法如下:
1. override def reviveOffers() { 2. driverEndpoint.send(ReviveOffers) 3. }
CoarseGrainedSchedulerBackend的reviveOffers方法中给DriverEndpoint发送ReviveOffers消息,而ReviveOffers本身是一个空的case object对象,ReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候,会发送ReviveOffers这个消息作为触发器。
1. case object ReviveOffers extends CoarseGrainedClusterMessage
TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中。
driverEndpoint.send(ReviveOffers)将ReviveOffers消息发送给driverEndpoint,而不是发送给StandaloneAppClient,因为driverEndpoint是程序的调度器。driverEndpoint的receive方法中模式匹配到ReviveOffers消息,就调用makeOffers方法。
(4)在DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)。
Spark 2.2.1版本的CoarseGrainedSchedulerBackend.scala的源码如下:
Spark 2.4.3版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第7行构建WorkerOffer实例时,新建一个地址address参数,address是可选的hostPort字符串,在同一主机上启动多个执行器executors时,address提供的信息比host更有用。
其中的executorData类如下,包括freeCores、totalCores等信息。
在makeOffers中首先找到可以利用的activeExecutors,然后创建workOffers。workOffers是一个数据结构case class,表示具体的Executor可能的资源。这里只考虑CPU cores,不考虑内存,因为之前内存已经分配完成。
Spark 2.2.1版本的WorkerOffer.scala的源码如下:
Spark 2.4.3版本的WorkerOffer.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第2行WorkerOffer类增加一个address成员变量。
makeOffers方法中,TaskSchedulerImpl.resourceOffers为每个Task具体分配计算资源,输入offers: IndexedSeq[WorkerOffer]一维数组是可用的计算资源,ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组Seq[Seq[TaskDescription]]定义每个任务的数据本地性及放在哪个Executor上执行。
TaskDescription包括executorId,TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上。而确定Task具体运行在哪个ExecutorBackend上的算法由TaskSetManager的resourceOffer方法决定。
Spark 2.2.1版本的TaskDescription.scala的源码如下:
Spark 2.4.3版本的TaskDescription.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第6行之后新增一个成员变量partitionId。
resourceOffers由群集管理器调用提供slaves的资源,根据优先级顺序排列任务,以循环的方式填充每个节点的任务,使得集群的任务运行均衡。
Spark 2.2.1版本的TaskSchedulerImpl.scala的源码如下:
Spark 2.4.3版本的TaskSchedulerImpl.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第35行将o.cores调整为o.cores / CPUS_PER_TASK。
上段代码中第35行之后新增代码计算可用的槽位。
上段代码中第49~68行替换为新增的代码。
resourceOffers中:
标记每一个活着的slave,记录它的主机名,并跟踪是否增加了新的Executor。感知集群动态资源的状况。
offers是集群有哪些可用的资源,循环遍历offers,hostToExecutors是否包含当前的host,如果不包含,就将Executor加进去。因为这里是最新请求,获取机器有哪些可用的计算资源。
getRackForHost是数据本地性,默认情况下,在一个机架Rack里面,生产环境中可能分若干个机架Rack。
重要的一行代码val shuffledOffers = shuffleOffers(filteredOffers):将可用的计算资源打散。
tasks将获得洗牌后的shuffledOffers通过map转换,对每个worker用了ArrayBuffer[TaskDescription],每个Executor可以放几个[TaskDescription],就可以运行多少个任务。即多少个Cores,就可以分配多少任务。ArrayBuffer是一个一维数组,数组的长度根据当前机器的CPU个数决定。
ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)说明当前ExecutorBackend上可以分配多少个Task,并行运行多少Task,和RDD的分区个数是两个概念:这里不是决定Task的个数,RDD的分区数在创建RDD时就已经决定了。这里,具体任务调度是指Task分配在哪些机器上,每台机器上分配多少Task,一次能分配多少Task。
在TaskSchedulerImpl中的initialize中创建rootPool,将schedulingMode调度模式传进去。rootPool的叶子节点是TaskSetManagers,按照一定的算法计算Stage的TaskSet调度的优先顺序。
for循环遍历sortedTaskSets,如果有新的可用的Executor,通过taskSet.executorAdded()加入taskSet。
TastSetManager的executorAdded方法如下:
数据本地优先级从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY。其中,NO_PREF是指机器本地性,一台机器上有很多Node,Node的优先级高于机器本地性。
resourceOffers中追求最高级别的优先级本地性源码如下:
循环遍历sortedTaskSets,对其中的每个taskSet,首先考虑myLocalityLevels的优先性,myLocalityLevels计算数据本地性的Level,将PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY循环一遍。myLocalityLevels是通过computeValidLocalityLevels方法获取到的。
TaskSetManager.scala的computeValidLocalityLevels的源码如下:
Spark 2.2.1版本的TaskSchedulerImpl.scala的resourceOfferSingleTaskSet的源码如下:
Spark 2.4.3版本的TaskSchedulerImpl.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第6行之后新增一个参数addressesWithDescs: ArrayBuffer[(String,TaskDescription)])。
上段代码中第21行之后新增屏障任务主机更新的代码。
resourceOfferSingleTaskSet方法中的CPUS_PER_TASK是每个Task默认采用一个线程进行计算的。TaskSchedulerImpl.scala中CPUS_PER_TASK的源码如下:
resourceOfferSingleTaskSet方法中的taskSet.resourceOffer,通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level。
Spark 2.2.1版本的TaskSetManager.scala的源码如下:
Spark 2.4.3版本的TaskSetManager.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第14行构建TaskDescription实例时,新增传入task.partitionId成员变量。
1. ...... 2. task.partitionId, 3. ......
以上内容都在做一件事情:获取Locality Level本地性的层次。DagScheduler告诉我们任务运行在哪台机器上,DAGScheduler是从数据层面考虑preferedLocation的,DAGScheduler从RDD的层面确定就可以;而TaskScheduler是从具体计算Task的角度考虑计算的本地性,TaskScheduler是更具体的底层调度。本地性的两个层面:①数据的本地性;②计算的本地性。
总结:scheduler.resourceOffers确定了每个Task具体运行在哪个ExecutorBackend上;resourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?
①通过Random.shuffle方法重新洗牌所有的计算资源,以寻求计算的负载均衡。
②根据每个ExecutorBackend的cores的个数声明类型为TaskDescription的ArrayBuffer数组。
③如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的、完整的可用计算资源。
④追求最高级别的优先级本地性。
⑤通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level。
回到CoarseGrainedSchedulerBackend.scala的launchTasks方法。
Spark 2.2.1版本的CoarseGrainedSchedulerBackend.scala的源码如下:
Spark 2.4.3版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行调整为使用Option方法。
(5)通过launchTasks把任务发送给ExecutorBackend去执行。
launchTasks首先进行序列化,但序列化Task的大小不能太大,如果超过maxRpcMessageSize,则提示出错信息。
RpcUtils.scala中maxRpcMessageSize的定义,spark.rpc.message.maxSize默认设置是128MB。
Task进行广播时的maxSizeInMB大小是128MB,如果任务大于等于128MB,则Task直接被丢弃掉;如果小于128MB,会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上。
CoarseGrainedSchedulerBackend.scala的launchTasks方法:通过executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))交给Task要运行的ExecutorBackend,给它发送一个消息LaunchTask,发送序列化的Task。
CoarseGrainedExecutorBackend就收到了launchTasks消息,启动executor.launchTask。