Shuffle Map Task运算结果的处理
这个结果的处理,分为两部分,一个是在Executor端是如何直接处理Task的结果的;还有就是Driver端,如果在接到Task运行结束的消息时,如何对Shuffle Write的结果进行处理,从而在调度下游的Task时,下游的Task可以得到其需要的数据。
Executor端的处理 在解析BasicShuffle Writer时,我们知道ShuffleMap Task在Executor上运行时,最终会调用org.apache.spark.scheduler.ShuffleMapTask的runTask:
|
|
那么这个结果最终是如何处理的呢?特别是下游的Task如何获取这些Shuffle的数据呢?还要从Task是如何开始执行开始讲起。在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend。它在接收到LaunchTask的命令后,通过在Driver创建SparkContext时已经创建的org.apache.spark.executor.Executor的实例的launchTask,启动Task:
|
|
最终Task的执行是在org.apache.spark.executor.Executor.TaskRunner#run。
在Executor运行Task时,得到计算结果会存入org.apache.spark.scheduler.DirectTaskResult。
|
|
在将结果回传到Driver时,会根据结果的大小有不同的策略: 1) 如果结果大于1GB,那么直接丢弃这个结果。这个是Spark1.2中新加的策略。可以通过spark.driver.maxResultSize来进行设置。
2) 对于“较大”的结果,将其以taskid为key存入org.apache.spark.storage.BlockManager;如果结果不大,那么直接回传给Driver。那么如何判定这个阈值呢?
这里的回传是直接通过akka的消息传递机制。因此这个大小首先不能超过这个机制设置的消息的最大值。这个最大值是通过spark.akka.frameSize设置的,单位是MBytes,默认值是10MB。除此之外,还有200KB的预留空间。因此这个阈值就是conf.getInt("spark.akka.frameSize",10) * 1024 1024 – 2001024。
3) 其他的直接通过AKKA回传到Driver。
实现源码解析如下:
|
|
而execBackend是org.apache.spark.executor.ExecutorBackend的一个实例,它实际上是Executor与Driver通信的接口:
|
|
TaskRunner会将Task执行的状态汇报给Driver(org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor)。 而Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。 Driver的处理 TaskRunner将Task的执行状态汇报给Driver后,Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。而在这里不同的状态有不同的处理:
1) 如果类型是TaskState.FINISHED,那么调用org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask进行处理。
2) 如果类型是TaskState.FAILED或者TaskState.KILLED或者TaskState.LOST,调用org.apache.spark.scheduler.TaskResultGetter#enqueueFailedTask进行处理。对于TaskState.LOST,还需要将其所在的Executor标记为failed,并且根据更新后的Executor重新调度。
enqueueSuccessfulTask的逻辑也比较简单,就是如果是IndirectTaskResult,那么需要通过blockid来获取结果:sparkEnv.blockManager.getRemoteBytes(blockId);如果是DirectTaskResult,那么结果就无需远程获取了。然后调用
1) org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask
2) org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask
3) org.apache.spark.scheduler.DAGScheduler#taskEnded
4) org.apache.spark.scheduler.DAGScheduler#eventProcessActor
5) org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion
进行处理。核心逻辑都在第5个调用栈。
如果task是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游的Stage,以便于其可以作为下游Stage的输入。这个机制是怎么实现的?
实际上,对于ShuffleMapTask来说,其结果实际上是org.apache.spark.scheduler.MapStatus;其序列化后存入了DirectTaskResult或者IndirectTaskResult中。而DAGScheduler#handleTaskCompletion通过下面的方式来获取这个结果:
val status=event.result.asInstanceOf[MapStatus]
通过将这个status注册到org.apache.spark.MapOutputTrackerMaster,就完成了结果处理的漫长过程:
|
|
而registerMapOutputs的处理也很简单,以Shuffle ID为key将MapStatus的列表存入带有时间戳的HashMap:TimeStampedHashMapInt, Array[MapStatus]。如果设置了cleanup的函数,那么这个HashMap会将超过一定时间(TTL,Time to Live)的数据清理掉。