Deep Understanding of Spark: Core Ideas and Source Code Analysis (《深入理解Spark:核心思想與源碼分析》): SparkContext Initialization (Part 3), Starting the TaskScheduler...

2023-11-19


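For the preface of "Deep Understanding of Spark: Core Ideas and Source Code Analysis", see the post "Deep Understanding of Spark: Core Ideas and Source Code Analysis Is Officially Published".

For Chapter 1 of the book, see "Chapter 1: Environment Preparation".

For Chapter 2 of the book, see "Chapter 2: Spark Design Philosophy and Basic Architecture".

Because Chapter 3 of the book is long, I am presenting it in four separate posts.

For the first part of Chapter 3, see "Deep Understanding of Spark: Core Ideas and Source Code Analysis: SparkContext Initialization (Part 1)".

For the second part of Chapter 3, see "Deep Understanding of Spark: Core Ideas and Source Code Analysis: SparkContext Initialization (Part 2)".

This post presents the third part of Chapter 3: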

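3.8 Starting the TaskScheduler

Section 3.7 described how the TaskScheduler is created. Before the TaskScheduler can do any work, it has to be started, which is done with the following code.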

taskScheduler.start()

When the TaskScheduler starts, all it actually does is call the start method of its backend.

  override def start() {backend.start()}
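The delegation above can be illustrated with a dependency-free sketch. SchedulerBackend, SketchTaskScheduler and RecordingBackend below are hypothetical, heavily simplified stand-ins, not Spark's classes. The only point is that starting the scheduler does nothing except start whichever backend it was constructed with.

```scala
// Hypothetical, simplified stand-ins for Spark's SchedulerBackend and TaskSchedulerImpl.
trait SchedulerBackend {
  def start(): Unit
  def stop(): Unit
}

class SketchTaskScheduler(backend: SchedulerBackend) {
  // The scheduler has no transport logic of its own: starting it starts the backend,
  // which owns the link to executors (a local actor, a cluster manager, ...).
  def start(): Unit = backend.start()
  def stop(): Unit = backend.stop()
}

// A backend that only records its state, standing in for something like LocalBackend.
class RecordingBackend extends SchedulerBackend {
  @volatile var started = false
  def start(): Unit = { started = true }
  def stop(): Unit = { started = false }
}
```

If RecordingBackend is replaced by another SchedulerBackend implementation, the way executors are reached changes and the scheduler itself stays untouched.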

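Taking LocalBackend as an example, starting LocalBackend registers a LocalActor with the actorSystem, as shown in Listing 3-30 (in the post "Deep Understanding of Spark: Core Ideas and Source Code Analysis: SparkContext Initialization (Part 2)").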

3.8.1 Creating the LocalActor

Creating the LocalActor mainly involves building the local Executor, as shown in Listing 3-36.

Listing 3-36  Implementation of LocalActor

private[spark] class LocalActor(
    scheduler: TaskSchedulerImpl,
    executorBackend: LocalBackend,
    private val totalCores: Int)
  extends Actor with ActorLogReceive with Logging {

  import context.dispatcher   // to use Akka's scheduler.scheduleOnce()

  // Cores not currently used by running tasks
  private var freeCores = totalCores

  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  private val localExecutorHostname = "localhost"

  // In local mode the single Executor lives inside the driver process
  val executor = new Executor(
    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

  override def receiveWithLogging = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        // A finished task returns its cores, so resources are offered again
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)

    case StopExecutor =>
      executor.stop()
  }

  // reviveOffers() is also defined in this class but is omitted from this listing
}
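Without Akka and without the real Executor, the core of LocalActor is bookkeeping of free cores. The sketch below uses simplified, hypothetical message types (ReviveOffers, and a StatusUpdate carrying only a finished flag). It models reviveOffers, which Listing 3-36 calls but does not show, as launching pending tasks while at least cpusPerTask cores are free. As far as we understand it, Spark's real reviveOffers goes through TaskSchedulerImpl.resourceOffers, which this sketch does not model.

```scala
import scala.collection.mutable

// Simplified, hypothetical message types modelled on the ones LocalActor handles.
sealed trait LocalMessage
case object ReviveOffers extends LocalMessage
final case class StatusUpdate(taskId: Long, finished: Boolean) extends LocalMessage

/** Single-threaded sketch of LocalActor's core accounting (no Akka, no real Executor). */
class LocalCoreTracker(totalCores: Int, cpusPerTask: Int, pending: mutable.Queue[Long]) {
  private var freeCores = totalCores
  val running = mutable.Set.empty[Long]

  def free: Int = freeCores

  def receive(msg: LocalMessage): Unit = msg match {
    case ReviveOffers => reviveOffers()
    case StatusUpdate(taskId, finished) =>
      if (finished) {
        // A finished task gives its cores back, then we try to schedule more work.
        running -= taskId
        freeCores += cpusPerTask
        reviveOffers()
      }
  }

  // Launch pending tasks while there are enough free cores for one more task.
  private def reviveOffers(): Unit =
    while (freeCores >= cpusPerTask && pending.nonEmpty) {
      val taskId = pending.dequeue()
      running += taskId
      freeCores -= cpusPerTask
    }
}
```

Each finished task returns cpusPerTask cores and immediately triggers another round of offers. This is the same feedback loop as the StatusUpdate branch in Listing 3-36.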

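The construction of the Executor, shown in Listing 3-37, consists mainly of the following steps:

1) Create and register the ExecutorSource. What is ExecutorSource for? It is described in detail in Section 3.10.2.

2) Obtain the SparkEnv. In non-local mode, a new SparkEnv has to be created when the CoarseGrainedExecutorBackend on a Worker registers an Executor with the CoarseGrainedSchedulerBackend on the Driver. In local mode the existing SparkEnv (SparkEnv.get) is reused. The port of the Executor's ActorSystem can be configured with the spark.executor.port property (the default is 0, meaning a random port).

3) Create and register the ExecutorActor, which receives the messages sent to the Executor.

4) Create urlClassLoader. Why is this ClassLoader needed? In non-local mode a single Worker may host several Executors. Each Executor sets up its own urlClassLoader to load the classes in the jars uploaded with its job, which isolates the class-loading environments of different jobs from each other.

5) Create the thread pool in which the Executor runs TaskRunner tasks (TaskRunner is introduced in Section 5.5). The pool is created by calling Utils.newDaemonCachedThreadPool; see Appendix A for its implementation.

6) Start the Executor's heartbeat thread, which sends heartbeats to the Driver.

Construction also sets up the maximum Akka message frame size (10485760 bytes, i.e. 10 MB, by default), the limit on the total size of task results (1073741824 bytes, i.e. 1 GB, by default) and runningTasks (the map of currently running tasks). It also makes the newly created ClassLoader the serializer's default ClassLoader.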

Listing 3-37  Construction of the Executor

  val executorSource = new ExecutorSource(this, executorId)

  private val env = {
    if (!isLocal) {
      // Non-local mode: build a dedicated SparkEnv for this Executor
      val port = conf.getInt("spark.executor.port", 0)
      val _env = SparkEnv.createExecutorEnv(
        conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
      SparkEnv.set(_env)
      _env.metricsSystem.registerSource(executorSource)
      _env.blockManager.initialize(conf.getAppId)
      _env
    } else {
      // Local mode: reuse the driver's SparkEnv
      SparkEnv.get
    }
  }

  private val executorActor = env.actorSystem.actorOf(
    Props(new ExecutorActor(executorId)), "ExecutorActor")

  private val urlClassLoader = createClassLoader()
  private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
  env.serializer.setDefaultClassLoader(urlClassLoader)

  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
  private val maxResultSize = Utils.getMaxResultSize(conf)

  val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

  startDriverHeartbeater()
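Step 5 relies on Utils.newDaemonCachedThreadPool, whose implementation is in Appendix A and is not reproduced here. The following is our own JDK-only sketch of what such a helper has to do, not Spark's code. It builds a cached pool whose ThreadFactory marks every thread as a daemon and names it "<prefix>-<n>". DaemonPools is a hypothetical name.

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPools {
  /** A cached pool whose threads are daemons named "<prefix>-<n>", n counting from 0. */
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        // Daemon threads do not keep the JVM alive once all non-daemon threads finish.
        t.setDaemon(true)
        t
      }
    }
    // newCachedThreadPool creates threads on demand and reclaims threads idle for 60 s;
    // the returned object is a ThreadPoolExecutor, which exposes the counts that
    // ExecutorSource reads (getActiveCount, getPoolSize, ...).
    Executors.newCachedThreadPool(factory).asInstanceOf[ThreadPoolExecutor]
  }
}
```

Because the threads are daemons, the task launch pool never blocks JVM shutdown. The name prefix makes executor task threads easy to spot in a thread dump.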

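3.8.2 Creating and Registering the ExecutorSource

ExecutorSource supplies the executor's metrics to the metrics system. It registers each metric through the register method of its metricRegistry. These metrics are the thread-pool metrics threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size and threadpool.maxPool_size. For each of the schemes hdfs and file, it also registers the file-system metrics read_bytes, write_bytes, read_ops, largeRead_ops and write_ops, for example filesystem.hdfs.write_bytes and filesystem.file.write_bytes. The implementation of ExecutorSource is shown in Listing 3-38. For the concrete implementations of the Metric interface, see Appendix D.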

Listing 3-38  Implementation of ExecutorSource

import scala.collection.JavaConversions._   // lets .filter be called on the Java list below

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem

import org.apache.spark.metrics.source.Source

private[spark] class ExecutorSource(val executor: Executor, executorId: String) extends Source {

  private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

  private def registerFileSystemStat[T](
      scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
    metricRegistry.register(MetricRegistry.name("filesystem", scheme, name), new Gauge[T] {
      override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
    })
  }

  override val metricRegistry = new MetricRegistry()

  override val sourceName = "executor"

  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getActiveCount()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getPoolSize()
  })

  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })

  // Gauge for file system stats of this executor
  for (scheme <- Array("hdfs", "file")) {
    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
  }
}
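The sketch below shows how the threadpool.* gauges behave without pulling in Codahale Metrics. It uses tiny hypothetical stand-ins (Gauge, TinyMetricRegistry, ThreadPoolSource); only the metric names mirror Listing 3-38. A gauge stores a function rather than a number, so its value is computed from the thread pool each time it is read.

```scala
import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}

// Minimal, hypothetical stand-ins for the Codahale Metrics types used by ExecutorSource.
trait Gauge[T] { def getValue: T }

class TinyMetricRegistry {
  private val gauges = new ConcurrentHashMap[String, Gauge[_]]()

  /** Dotted names, in the style of MetricRegistry.name("threadpool", "activeTasks"). */
  def name(parts: String*): String = parts.mkString(".")

  /** Registers a gauge; a name may be registered only once. */
  def register[T](name: String, gauge: Gauge[T]): Gauge[T] = {
    if (gauges.putIfAbsent(name, gauge) != null)
      throw new IllegalArgumentException(s"A metric named $name already exists")
    gauge
  }

  /** Gauges are pulled: the value is computed only when the gauge is read. */
  def read(name: String): Option[Any] = Option(gauges.get(name)).map(_.getValue)
}

class ThreadPoolSource(pool: ThreadPoolExecutor) {
  val sourceName = "executor"
  val metricRegistry = new TinyMetricRegistry

  metricRegistry.register(metricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
    override def getValue: Int = pool.getActiveCount
  })
  metricRegistry.register(metricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
    override def getValue: Int = pool.getMaximumPoolSize
  })
}
```

In this sketch, registering the same name twice throws IllegalArgumentException. We assume this mirrors the duplicate-name behaviour that the catch clause in Listing 3-39 guards against.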

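After the ExecutorSource is created, it is registered with the MetricsSystem through the MetricsSystem's registerSource method. In Listing 3-37 this happens only in non-local mode, right after the new SparkEnv is created. registerSource calls the register method of the MetricsSystem's own MetricRegistry to register the Source's metricRegistry under the name returned by buildRegistryName. If that name is already taken, the resulting IllegalArgumentException is caught and only logged (see Listing 3-39). For more about MetricRegistry, see Appendix D.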

Listing 3-39  How MetricsSystem registers a Source

  def registerSource(source: Source) {
    sources += source
    try {
      val regName = buildRegistryName(source)
      registry.register(regName, source.metricRegistry)
    } catch {
      // A duplicate name is not fatal: log it and carry on
      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
    }
  }
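The registration policy of Listing 3-39 can be sketched without any metrics library. SimpleSource and SimpleMetricsSystem are hypothetical. The naming rule is our assumption about buildRegistryName, whose code is not shown here: metrics are prefixed with appId.executorId when both are known, and use the bare source name otherwise. A duplicate registration is caught and reported, never propagated.

```scala
import scala.collection.mutable

// Hypothetical simplification: a source is a name plus metric-name -> value-supplier pairs.
final case class SimpleSource(sourceName: String, metrics: Map[String, () => Any])

class SimpleMetricsSystem(appId: Option[String], executorId: Option[String]) {
  val sources = mutable.ArrayBuffer.empty[SimpleSource]
  private val registry = mutable.LinkedHashMap.empty[String, () => Any]

  // Assumed naming rule: "<appId>.<executorId>.<sourceName>" when both ids are known.
  private def buildRegistryName(source: SimpleSource): String =
    (appId, executorId) match {
      case (Some(app), Some(exec)) => s"$app.$exec.${source.sourceName}"
      case _                       => source.sourceName
    }

  // Stand-in for registering a whole metric set: refuses to overwrite existing names.
  private def registerAll(prefix: String, metrics: Map[String, () => Any]): Unit = {
    val names = metrics.keys.map(k => s"$prefix.$k")
    if (names.exists(registry.contains))
      throw new IllegalArgumentException(s"A metric under $prefix already exists")
    metrics.foreach { case (k, f) => registry(s"$prefix.$k") = f }
  }

  def registerSource(source: SimpleSource): Unit = {
    sources += source
    try registerAll(buildRegistryName(source), source.metrics)
    catch {
      // As in Listing 3-39: a duplicate registration is reported, not fatal.
      case e: IllegalArgumentException => println(s"Metrics already registered: ${e.getMessage}")
    }
  }

  def metricNames: Seq[String] = registry.keys.toSeq
}
```

As in the listing, the source is appended to sources before the registration attempt, so a duplicate still appears in that list.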

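3.8.3 Building and Registering the ExecutorActor

ExecutorActor is very simple. When it receives the TriggerThreadDump message sent by the Spark UI, it replies with the stack traces of all threads. The code is as follows.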

  override def receiveWithLogging = {
    case TriggerThreadDump =>
      sender ! Utils.getThreadDump()
  }
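Utils.getThreadDump is not shown in this chapter. A comparable thread dump can be produced with the JDK's ThreadMXBean. The sketch below uses the hypothetical names ThreadSnapshot and ThreadDumps. It collects the id, name, state and stack trace of every live thread and sorts the entries by thread id.

```scala
import java.lang.management.ManagementFactory

// Hypothetical record of one thread's state at the time of the dump.
final case class ThreadSnapshot(id: Long, name: String, state: Thread.State, stackTrace: String)

object ThreadDumps {
  /** Snapshot of all live threads in this JVM, sorted by thread id. */
  def threadDump(): Seq[ThreadSnapshot] =
    ManagementFactory.getThreadMXBean
      .dumpAllThreads(true, true) // include locked monitors and synchronizers
      .toSeq
      .filter(_ != null) // defensive: skip entries for threads that are gone
      .sortBy(_.getThreadId)
      .map { info =>
        ThreadSnapshot(
          info.getThreadId,
          info.getThreadName,
          info.getThreadState,
          info.getStackTrace.map(_.toString).mkString("\n"))
      }
}
```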

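3.8.4 Creating Spark's Own ClassLoader

createClassLoader first obtains currentLoader, which becomes the parent of the ClassLoader being created. It then builds an array of URLs from currentJars. The spark.files.userClassPathFirst property decides whether classes are loaded from the user's classpath first. Finally it creates either an ExecutorURLClassLoader or a ChildExecutorURLClassLoader, as shown in Listing 3-40.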

Listing 3-40  Creating Spark's own ClassLoader

  private def createClassLoader(): MutableURLClassLoader = {
    val currentLoader = Utils.getContextOrSparkClassLoader

    // Only the file name of each jar URI is kept, so jars are resolved in the working directory
    val urls = currentJars.keySet.map { uri =>
      new File(uri.split("/").last).toURI.toURL
    }.toArray
    val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
    userClassPathFirst match {
      case true => new ChildExecutorURLClassLoader(urls, currentLoader)
      case false => new ExecutorURLClassLoader(urls, currentLoader)
    }
  }
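The URL step in createClassLoader keeps only the last path segment of each jar URI, so every jar is resolved as a bare file name in the executor's current working directory. The sketch below isolates that step. The assumption that the jar has already been downloaded into the working directory is inferred from how the listing builds the path; it is not shown in code here. JarUrls is a hypothetical name.

```scala
import java.io.File
import java.net.URL

object JarUrls {
  /**
   * Maps each jar URI (e.g. one served by the driver) to a file: URL for the file with
   * the same name in the current working directory, as createClassLoader does.
   */
  def localJarUrls(jarUris: Iterable[String]): Array[URL] =
    jarUris.toArray.map(uri => new File(uri.split("/").last).toURI.toURL)
}
```

For example, a URI ending in /jars/app.jar becomes a file: URL pointing at app.jar in the working directory, whatever host the URI named.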

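See Appendix A for the implementation of Utils.getContextOrSparkClassLoader. Both ExecutorURLClassLoader and ChildExecutorURLClassLoader mix in MutableURLClassLoader, but only ExecutorURLClassLoader extends URLClassLoader directly and therefore loads classes parent-first. ChildExecutorURLClassLoader instead wraps a private URLClassLoader (userClassLoader) whose parent is null. Its findClass tries userClassLoader first and falls back to the parent loader only when that throws ClassNotFoundException. See Listing 3-41.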

Listing 3-41  Implementation of ChildExecutorURLClassLoader and ExecutorURLClassLoader

private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends MutableURLClassLoader {

  // null parent: this loader sees only the bootstrap classes and the user's jars
  private object userClassLoader extends URLClassLoader(urls, null) {
    override def addURL(url: URL) {
      super.addURL(url)
    }
    override def findClass(name: String): Class[_] = {
      super.findClass(name)
    }
  }

  private val parentClassLoader = new ParentClassLoader(parent)

  // Try the user's jars first, then fall back to the parent loader
  override def findClass(name: String): Class[_] = {
    try {
      userClassLoader.findClass(name)
    } catch {
      case e: ClassNotFoundException => {
        parentClassLoader.loadClass(name)
      }
    }
  }

  def addURL(url: URL) {
    userClassLoader.addURL(url)
  }

  def getURLs() = {
    userClassLoader.getURLs()
  }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
  extends URLClassLoader(urls, parent) with MutableURLClassLoader {

  override def addURL(url: URL) {
    super.addURL(url)
  }
}
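The child-first idea behind ChildExecutorURLClassLoader can be sketched with the JDK alone. ChildFirstClassLoader below is a hypothetical simplification, not Spark's class. User jars are searched before the given parent, and the parent is consulted only on ClassNotFoundException. One deliberate design choice is that the sketch passes null as its own ClassLoader parent. As a result, the inherited loadClass consults only the bootstrap loader (JDK core classes) before calling findClass.

```scala
import java.net.{URL, URLClassLoader}

class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader) extends ClassLoader(null) {
  // Loader for user jars with a null parent: it sees bootstrap classes plus `urls` only.
  private object userLoader extends URLClassLoader(urls, null) {
    // Widen access so the outer loader can call findClass directly.
    override def findClass(name: String): Class[_] = super.findClass(name)
  }

  // Reached for every class the bootstrap loader cannot supply:
  // user jars first, then the real parent.
  override protected def findClass(name: String): Class[_] =
    try userLoader.findClass(name)
    catch { case _: ClassNotFoundException => parent.loadClass(name) }

  def getURLs: Array[URL] = userLoader.getURLs
}
```

With a non-null parent, ClassLoader.loadClass would ask that parent first, and findClass would only see classes the parent cannot load. That would defeat the child-first order, which is why the sketch passes null.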

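If REPL interaction is needed (that is, when spark.repl.class.uri is set), addReplClassLoaderIfNeeded is also called to create replClassLoader, as shown in Listing 3-42.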

Listing 3-42  Implementation of addReplClassLoaderIfNeeded

  private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
    val classUri = conf.get("spark.repl.class.uri", null)
    if (classUri != null) {
      logInfo("Using REPL class URI: " + classUri)
      val userClassPathFirst: java.lang.Boolean =
        conf.getBoolean("spark.files.userClassPathFirst", false)
      try {
        // Looked up by name, so the REPL class is not referenced at compile time here
        val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
          .asInstanceOf[Class[_ <: ClassLoader]]
        val constructor = klass.getConstructor(classOf[SparkConf], classOf[String],
          classOf[ClassLoader], classOf[Boolean])
        constructor.newInstance(conf, classUri, parent, userClassPathFirst)
      } catch {
        case _: ClassNotFoundException =>
          logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")
          System.exit(1)
          null
      }
    } else {
      parent
    }
  }
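addReplClassLoaderIfNeeded loads org.apache.spark.repl.ExecutorClassLoader by name, presumably because the core module cannot reference REPL classes at compile time. The same reflective pattern is sketched below with a hypothetical helper. Unlike the listing, it returns None instead of calling System.exit(1), so the caller decides what a missing class means.

```scala
object OptionalClasses {
  /**
   * Instantiates `className` through the public constructor matching `paramTypes`
   * if the class is on the classpath; returns None if it is not.
   */
  def instantiateIfPresent(
      className: String,
      paramTypes: Seq[Class[_]],
      args: Seq[AnyRef]): Option[AnyRef] =
    try {
      val klass = Class.forName(className)
      val constructor = klass.getConstructor(paramTypes: _*)
      Some(constructor.newInstance(args: _*).asInstanceOf[AnyRef])
    } catch {
      case _: ClassNotFoundException => None
    }
}
```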

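3.8.5 Starting the Executor's Heartbeat Thread

The Executor's heartbeat is started by startDriverHeartbeater, shown in Listing 3-43. The heartbeat interval is configured by the spark.executor.heartbeatInterval property and defaults to 10000 ms. By default, the timeout is 30 seconds, the number of retries is 3 and the retry interval is 3000 ms. The matching Actor reference is found with actorSystem.actorSelection(url), where url is akka.tcp://sparkDriver@$driverHost:$driverPort/user/HeartbeatReceiver. Finally, a daemon thread is created. It first sleeps for a random 10000 to 20000 ms so that the heartbeats of different Executors do not fall into step, and afterwards it sleeps 10000 ms between rounds. In each round it takes the latest metrics of the running Tasks from runningTasks, wraps them together with executorId and blockManagerId into a Heartbeat message, and sends that message to HeartbeatReceiver.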
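The timeout, retry count and retry interval above drive a bounded ask-and-retry loop. Without Akka, the pattern can be sketched with Scala futures. Each attempt waits at most timeout for a reply, a failed attempt is followed by a pause of retryInterval, and the last failure is rethrown. Retry.askWithRetries is a hypothetical helper, modelled only loosely on AkkaUtils.askWithReply, whose code is not shown here.

```scala
import scala.annotation.tailrec
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object Retry {
  /**
   * Calls `ask` up to `attempts` times. Each reply is awaited for at most `timeout`;
   * after a failed attempt the thread sleeps `retryInterval` before trying again.
   */
  @tailrec
  def askWithRetries[T](attempts: Int, retryInterval: FiniteDuration, timeout: FiniteDuration)(
      ask: () => Future[T]): T =
    Try(Await.result(ask(), timeout)) match {
      case Success(reply) => reply
      case Failure(_) if attempts > 1 =>
        Thread.sleep(retryInterval.toMillis)
        askWithRetries(attempts - 1, retryInterval, timeout)(ask)
      case Failure(e) => throw e
    }
}
```

With the defaults described above, the call would correspond to askWithRetries(3, 3000.millis, 30.seconds).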

Listing 3-43  Starting the Executor's heartbeat thread

  def startDriverHeartbeater() {
    val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
    val timeout = AkkaUtils.lookupTimeout(conf)
    val retryAttempts = AkkaUtils.numRetries(conf)
    val retryIntervalMs = AkkaUtils.retryWaitMs(conf)
    val heartbeatReceiverRef = AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

    val t = new Thread() {
      override def run() {
        // Sleep a random interval so the heartbeats don't end up in sync
        Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])

        while (!isStopped) {
          val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
          val curGCTime = gcTime

          for (taskRunner <- runningTasks.values()) {
            if (!taskRunner.attemptedTask.isEmpty) {
              Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                metrics.updateShuffleReadMetrics
                metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
                if (isLocal) {
                  // Same JVM as the driver: send a deep copy rather than the live object
                  val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
                } else {
                  // It will be copied by serialization
                  tasksMetrics += ((taskRunner.taskId, metrics))
                }
              }
            }
          }

          val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
          try {
            val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
              retryAttempts, retryIntervalMs, timeout)
            if (response.reregisterBlockManager) {
              logWarning("Told to re-register on heartbeat")
              env.blockManager.reregister()
            }
          } catch {
            case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
          }

          Thread.sleep(interval)
        }
      }
    }
    t.setDaemon(true)
    t.setName("Driver Heartbeater")
    t.start()
  }
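The shape of the loop in Listing 3-43 can be reproduced with a plain daemon thread: one random initial delay, then a fixed interval, with failures logged rather than fatal. Heartbeater is hypothetical, and its beat callback stands in for collecting task metrics and sending the Heartbeat message.

```scala
import scala.util.Random
import scala.util.control.NonFatal

/**
 * Daemon thread that waits a random time in [intervalMs, 2 * intervalMs) once, so that
 * executors started together do not beat in sync, then calls `beat` every `intervalMs`.
 */
class Heartbeater(intervalMs: Long, beat: () => Unit) {
  @volatile private var stopped = false

  private val thread = new Thread("Driver Heartbeater") {
    override def run(): Unit =
      try {
        Thread.sleep(intervalMs + (Random.nextDouble() * intervalMs).toLong)
        while (!stopped) {
          // A failed beat is reported and the loop keeps going, as in the listing.
          try beat()
          catch { case NonFatal(e) => System.err.println(s"Issue sending heartbeat: $e") }
          Thread.sleep(intervalMs)
        }
      } catch {
        case _: InterruptedException => () // stop() interrupted a sleep
      }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
}
```

Unlike the listing, stop() here also interrupts the thread so that it does not finish its current sleep first. This is a choice made for the sketch.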

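What is the heartbeat thread for? It has two purposes:

• Updating the metrics of the tasks currently being processed;

• Telling the BlockManagerMaster that the BlockManager on this Executor is still alive.

The rest of this section analyzes the heartbeat implementation in detail; feel free to skip it.

After TaskSchedulerImpl is initialized, the heartbeat receiver HeartbeatReceiver is created. HeartbeatReceiver receives the heartbeats of all Executors assigned to the current Driver application. It passes the Tasks, Task metrics, heartbeats and so on to TaskSchedulerImpl and DAGScheduler for further processing. The code that creates the heartbeat receiver is as follows.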

  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

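After receiving a heartbeat message, HeartbeatReceiver calls the TaskScheduler's executorHeartbeatReceived method. It then replies with a HeartbeatResponse whose flag is the negation of that method's result. The code is as follows.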

  override def receiveWithLogging = {
    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
      val response = HeartbeatResponse(
        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
      sender ! response
  }

The implementation of executorHeartbeatReceived is as follows.

    val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
      taskMetrics.flatMap { case (id, metrics) =>
        taskIdToTaskSetId.get(id)
          .flatMap(activeTaskSets.get)
          .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
      }
    }
    dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)

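This code iterates over taskMetrics and uses taskIdToTaskSetId and activeTaskSets to find the TaskSetManager of each task. The flatMap skips any task whose TaskSetManager cannot be found. The code then packs the taskId, TaskSetManager.stageId, TaskSetManager.taskSet.attempt and the TaskMetrics into metricsWithStageIds, an array of type Array[(Long, Int, Int, TaskMetrics)]. Finally it calls the DAGScheduler's executorHeartbeatReceived method, which is implemented as follows.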

    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
    implicit val timeout = Timeout(600 seconds)

    Await.result(
      blockManagerMaster.driverActor ? BlockManagerHeartbeat(blockManagerId),
      timeout.duration).asInstanceOf[Boolean]

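The DAGScheduler wraps executorId and metricsWithStageIds in a SparkListenerExecutorMetricsUpdate event and posts it to listenerBus; this event is used to update the various metrics of the Stage. It then sends a BlockManagerHeartbeat message to the BlockManagerMasterActor held by BlockManagerMaster and waits up to 600 seconds for the Boolean reply. On receiving the message, BlockManagerMasterActor executes its heartbeatReceived method (introduced in Section 4.3.1). heartbeatReceived ultimately updates the time at which BlockManagerMaster last saw the BlockManager, that is, the _lastSeenMs of the BlockManagerInfo corresponding to the BlockManagerId (see Listing 3-44). If the BlockManagerId is unknown, heartbeatReceived returns false, unless it belongs to the driver in non-local mode. That false is passed back through the DAGScheduler and TaskSchedulerImpl, and HeartbeatReceiver turns it into reregisterBlockManager = true. The Executor's heartbeat thread then calls env.blockManager.reregister().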
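listenerBus.post only hands the event over for delivery. It is the subsequent Await.result that blocks the heartbeat path. The sketch below shows an asynchronous bus in that spirit: post enqueues, and a single daemon thread delivers events to listeners in posting order. AsyncListenerBus and ExecutorMetricsUpdate are hypothetical. We believe Spark's LiveListenerBus is also queue-based, but its code is not shown here.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

// Hypothetical event, loosely modelled on SparkListenerExecutorMetricsUpdate.
final case class ExecutorMetricsUpdate(execId: String, taskCount: Int)

class AsyncListenerBus[E] {
  private val queue = new LinkedBlockingQueue[E]()
  private val listeners = new CopyOnWriteArrayList[E => Unit]()

  // One dispatcher thread, so listeners see events in the order they were posted.
  private val dispatcher = new Thread("listener-bus") {
    override def run(): Unit =
      try {
        while (true) {
          val event = queue.take() // blocks until an event arrives
          listeners.asScala.foreach { listener =>
            // A failing listener must not stop delivery to the others.
            try listener(event)
            catch { case NonFatal(e) => System.err.println(s"Listener failed: $e") }
          }
        }
      } catch {
        case _: InterruptedException => () // stop() was called
      }
  }
  dispatcher.setDaemon(true)

  def addListener(listener: E => Unit): Unit = { listeners.add(listener); () }
  def start(): Unit = dispatcher.start()
  def post(event: E): Unit = queue.put(event) // returns without waiting for listeners
  def stop(): Unit = dispatcher.interrupt()
}
```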

Listing 3-44  Heartbeat handling in BlockManagerMasterActor

private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
  if (!blockManagerInfo.contains(blockManagerId)) {
    // Unknown BlockManager: only the driver's, in non-local mode, is accepted
    blockManagerId.isDriver && !isLocal
  } else {
    blockManagerInfo(blockManagerId).updateLastSeenMs()
    true
  }
}
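The decision in Listing 3-44 and the negation in HeartbeatReceiver combine into one rule: an unknown BlockManager (other than the driver's in non-local mode) is told to re-register. The sketch below reduces BlockManagerId to a hypothetical BmId and takes an injectable clock so that the last-seen update can be observed. BlockManagerRegistry and HeartbeatFlow are also hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical simplification of BlockManagerId.
final case class BmId(executorId: String, isDriver: Boolean)

class BlockManagerRegistry(isLocal: Boolean, clock: () => Long) {
  private val lastSeenMs = mutable.Map.empty[BmId, Long]

  def register(id: BmId): Unit = { lastSeenMs(id) = clock() }

  /** Same decision as Listing 3-44. */
  def heartbeatReceived(id: BmId): Boolean =
    if (!lastSeenMs.contains(id)) {
      id.isDriver && !isLocal
    } else {
      lastSeenMs(id) = clock()
      true
    }

  def lastSeen(id: BmId): Option[Long] = lastSeenMs.get(id)
}

final case class HeartbeatResponse(reregisterBlockManager: Boolean)

object HeartbeatFlow {
  /** HeartbeatReceiver inverts the answer: "not known" means "please re-register". */
  def respond(registry: BlockManagerRegistry, id: BmId): HeartbeatResponse =
    HeartbeatResponse(!registry.heartbeatReceived(id))
}
```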

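In local mode, the Executor's heartbeat communication can be represented by Figure 3-3.

Figure 3-3  Executor heartbeat communication

Note: In non-local mode the Executor sends heartbeats in the same way. The main difference is that the Executor does not run in the same process as the Driver, and may not even be on the same node.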

Next, the BlockManager is initialized, with the following code.

env.blockManager.initialize(applicationId)

For the details of the initialization, see Chapter 4.

To be continued...

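Afterword: I gave up seven months of weekends and evenings to study Spark's source code and design. The result, the book "Deep Understanding of Spark: Core Ideas and Source Code Analysis", has now been officially published and is on sale at Amazon, JD.com, Dangdang, Tmall and other sites; interested readers are welcome to buy it. When I started studying the source, Spark was at version 1.2.0. After more than seven months of research and nearly four months of the publisher's process, Spark itself has moved on quickly, and the latest release is now 1.6.0. The other two Spark source-code books on the market cover versions 0.9.0 and 1.2.0 respectively, so their authors seem to have run into the same problem. Because research and publishing both take time, a book cannot keep pace with Spark, and I ask for your understanding. However, Spark's core has changed relatively little, so if you are not too particular about the version, this book is still a reasonable choice.

JD.com (currently 30 yuan off every 100 yuan spent): http://item.jd.com/11846120.html

Dangdang: http://product.dangdang.com/23838168.html

Reposted from: https://www.cnblogs.com/jiaan-geng/p/5249682.html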


Original link: https://hbdhgg.com/3/181876.html
