spark 上下游shuffle结果的存放获取

 2023-09-15 阅读 22 评论 0

摘要:当一个job在DAGScheduler中被分隔为stage,将会根据其内部的shuffle关系将整个job整理出ShuffleMapStage,而最后结果的ResultStage在提交时,将会不断遍历其parent stage,而本身被加入DAGScheduler的等待集合,只在所有parent的stage执行完毕

当一个job在DAGScheduler中被分隔为stage,将会根据其内部的shuffle关系将整个job整理出ShuffleMapStage,而最后结果的ResultStage在提交时,将会不断遍历其parent stage,而本身被加入DAGScheduler的等待集合,只在所有parent的stage执行完毕之后才会执行任务流程中的child stage。

private def submitStage(stage: Stage) {val jobId = activeJobForStage(stage)if (jobId.isDefined) {logDebug("submitStage(" + stage + ")")if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {val missing = getMissingParentStages(stage).sortBy(_.id)logDebug("missing: " + missing)if (missing.isEmpty) {logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")submitMissingTasks(stage, jobId.get)} else {for (parent <- missing) {submitStage(parent)}waitingStages += stage}}} else {abortStage(stage, "No active job for stage " + stage.id, None)}
}

在上方的代码可以看到,当子stage提交之后,将会不断递归,将parent stage调用该方法进行提交,在这里,最上级的stage将会被通过submitMissingTasks()方法被分成task进行执行,而下游的stage将会放入等待集合,直到上级全部执行完毕才回进入执行计划。mr和spark的shuffle,

 

 

通过submitMissingTasks()方法,将会把当前需要执行的stage转换成task交给Executor执行,ShuffleStage也将会被转换成ShuffleMapTask经由Executor执行。

setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

在Executor执行完ShuffleMapTask的结尾,将会把ShuffleMapTask执行完毕的结果写到BlockManager中,并把其在BlockManager中的地址整理成一份map,作为task执行完毕的结果序列化后返回告知driver端。

 

在driver端,task的执行结果,尤其是ShuffleMapTask的结果的处理,主要分为两步,一步是注册shuffle结果在BlockManager中的位置,另一个便是启动其下游的child stage。

第一步,将会通过一个MapOutputTracker的成员实现。

mapOutputTracker.registerMapOutput(shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)

在处理task处理结果的时候,ShuffleMapTask会通过MapOutputTracker的registerMapOutput()方法将其shuffleid以及分区号和处理结果存放到处于driver端的MapOutputTracker中。

MapOutputTracker维护着一个map,以ShuffleId作为key用来保存shuffle处理结果的一个映射,以便当需要具体使用到的时候可以快速获取其处理结果在BlockManager上的位置。

 

第二步,在其处理完之后,将会调用submitWaitingChildStages()方法尝试将child stage进行执行。

private def submitWaitingChildStages(parent: Stage) {logTrace(s"Checking if any dependencies of $parent are now runnable")logTrace("running: " + runningStages)logTrace("waiting: " + waitingStages)logTrace("failed: " + failedStages)val childStages = waitingStages.filter(_.parents.contains(parent)).toArraywaitingStages --= childStagesfor (stage <- childStages.sortBy(_.firstJobId)) {submitStage(stage)}
}

将会从等待集合获取该stage的所有子stage准备通过本文最一开始的submitStage()方法调用,但是在submitStage()将会再次判断,直到其所有parent全部执行完才会正式执行。

 

 

Child stage如何获取上述的shuffle 结果?

以ShuffleRowRDD的compute()方法为例子。

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]// The range of pre-shuffle partitions that we are fetching at here is// [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].val reader =SparkEnv.get.shuffleManager.getReader(dependency.shuffleHandle,shuffledRowPartition.startPreShufflePartitionIndex,shuffledRowPartition.endPreShufflePartitionIndex,context)reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}

其将会直接构造一个reader尝试读取shuffle的结果。

最后构造的实际上是一个BlockStoreShuffleReader,而顾名思义,shuffle的结果也存放在BlockManager当中。

val wrappedStreams = new ShuffleBlockFetcherIterator(context,blockManager.shuffleClient,blockManager,mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),serializerManager.wrapStream,// Note: we use getSizeAsMb when no suffix is provided for backwards compatibilitySparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))

在其read()方法中,将会构造一个数据流,而数据流的获取目的地坐标,则是通过MapOutputTracker的getMapSizesByExecutorId()方法来获取。

当executor这个方法被调用的时候,首先会通过getStatus()尝试和master获取具体的存放数据,如果不存在将会构造网络请求向driver端MapOutputTracker尝试拉取对应的shuffleId的具体存放位置。当获取之后,shuffle的具体数据也将得到,可以构造数据流的形式作为child stage的数据继续执行。

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/58140.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息