当一个job在DAGScheduler中被分隔为stage,将会根据其内部的shuffle关系将整个job整理出ShuffleMapStage,而最后结果的ResultStage在提交时,将会不断遍历其parent stage,而本身被加入DAGScheduler的等待集合,只在所有parent的stage执行完毕之后才会执行任务流程中的child stage。
private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)if (missing.isEmpty) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {for (parent <- missing) {submitStage(parent)}waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id, None)}
}
在上方的代码可以看到,当子stage提交之后,将会不断递归,将parent stage调用该方法进行提交,在这里,最上级的stage将会被通过submitMissingTasks()方法被分成task进行执行,而下游的stage将会放入等待集合,直到上级全部执行完毕才回进入执行计划。mr和spark的shuffle,
通过submitMissingTasks()方法,将会把当前需要执行的stage转换成task交给Executor执行,ShuffleStage也将会被转换成ShuffleMapTask经由Executor执行。
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
在Executor执行完ShuffleMapTask的结尾,将会把ShuffleMapTask执行完毕的结果写到BlockManager中,并把其在BlockManager中的地址整理成一份map,作为task执行完毕的结果序列化后返回告知driver端。
在driver端,task的执行结果,尤其是ShuffleMapTask的结果的处理,主要分为两步,一步是注册shuffle结果在BlockManager中的位置,另一个便是启动其下游的child stage。
第一步,将会通过一个MapOutputTracker的成员实现。
mapOutputTracker.registerMapOutput(shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
在处理task处理结果的时候,ShuffleMapTask会通过MapOutputTracker的registerMapOutput()方法将其shuffleid以及分区号和处理结果存放到处于driver端的MapOutputTracker中。
MapOutputTracker维护着一个map,以ShuffleId作为key用来保存shuffle处理结果的一个映射,以便当需要具体使用到的时候可以快速获取其处理结果在BlockManager上的位置。
第二步,在其处理完之后,将会调用submitWaitingChildStages()方法尝试将child stage进行执行。
private def submitWaitingChildStages(parent: Stage) {logTrace(s"Checking if any dependencies of $parent are now runnable")logTrace("running: " + runningStages)logTrace("waiting: " + waitingStages)logTrace("failed: " + failedStages)val childStages = waitingStages.filter(_.parents.contains(parent)).toArraywaitingStages --= childStagesfor (stage <- childStages.sortBy(_.firstJobId)) {submitStage(stage)}
}
将会从等待集合获取该stage的所有子stage准备通过本文最一开始的submitStage()方法调用,但是在submitStage()将会再次判断,直到其所有parent全部执行完才会正式执行。
Child stage如何获取上述的shuffle 结果?
以ShuffleRowRDD的compute()方法为例子。
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]// The range of pre-shuffle partitions that we are fetching at here is// [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].val reader =SparkEnv.get.shuffleManager.getReader(dependency.shuffleHandle,shuffledRowPartition.startPreShufflePartitionIndex,shuffledRowPartition.endPreShufflePartitionIndex,context)reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}
其将会直接构造一个reader尝试读取shuffle的结果。
最后构造的实际上是一个BlockStoreShuffleReader,而顾名思义,shuffle的结果也存放在BlockManager当中。
val wrappedStreams = new ShuffleBlockFetcherIterator(context,blockManager.shuffleClient,blockManager,mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),serializerManager.wrapStream,// Note: we use getSizeAsMb when no suffix is provided for backwards compatibilitySparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
在其read()方法中,将会构造一个数据流,而数据流的获取目的地坐标,则是通过MapOutputTracker的getMapSizesByExecutorId()方法来获取。
当executor这个方法被调用的时候,首先会通过getStatus()尝试和master获取具体的存放数据,如果不存在将会构造网络请求向driver端MapOutputTracker尝试拉取对应的shuffleId的具体存放位置。当获取之后,shuffle的具体数据也将得到,可以构造数据流的形式作为child stage的数据继续执行。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态