python 筆記,spark發行版筆記13

 2023-12-06 阅读 27 评论 0

摘要:本期概覽: ReceiverTracker架構設計 消息循環系統 python 筆記、ReceiverTracker具體的實現 Spark Streaming作為Spark Core基礎 架構之上的一個應用程序,其中的ReceiverTracker接收到數據之后,具體該怎么進行數據處理呢? 為了弄清楚這個問題

本期概覽:

ReceiverTracker架構設計

消息循環系統

python 筆記、ReceiverTracker具體的實現

Spark Streaming作為Spark Core基礎 架構之上的一個應用程序,其中的ReceiverTracker接收到數據之后,具體該怎么進行數據處理呢?

為了弄清楚這個問題,首先,我們打開源碼

找到ReceiverSupervisorImpl這個類

Apache Spark。

從源碼中可以看出,寫數據是通過ReceivedBlockHandler的對象receivedBlockHandler寫的。寫的過程中有倆種方式,一種是基于WAL方式進行容錯寫。一種是直接寫(相對不安全)。如下圖所示

?

github記筆記?然后存儲數據完成后并報告給Driver,以便Driver對元數據進行存儲,如下所示

用于匯報給Driver的消息類、如下圖所示

上圖談到了Record,要注意到,一般專業的描述處理的數據的大小的時候,應該用多少條記錄來描述更科學,一般說數據規模達到多少多少百億條記錄,而不是說數據規模達到多少PB的數據規模,這樣不是很科學,因為記錄可能有很多字段,比如說,1PB的數據,5個字段,和5PB的數據1個字段是差不多的。所以1PB的數據規模未必比5PB的數據規模體現出一個大數據引擎的數據處理能力。也比如說,有些數據是視頻或者音頻。更不適合說多少個PB來描述規模大小。

上圖說明ReceiverSupervisorImpl中有ReceiverTracker的通信體,能進行與ReceiverTracker的通信

并且ReceiverSupervisorImpl將數據的元數據信息匯報給ReceiverTracker

于是,我們進入ReceiverTracker這個類,這個類是整個流處理數據管理的中心。

?

ReceiverTracker中有endpoint通信體,這個通信體接收來自ReceiverSuperVisorImpl的元數據的數據匯報。

?

?

接下來,我們再進入ReceiverTracker本身,從整體上認識ReceiverTracker。

記錄Receiver的三種狀態,分別為非活躍狀態,正在執行調度任務狀態,活躍狀態

密封關鍵字,說明所有的子類都密封在這里,方便管理

/**

?* This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.

?*/

這個消息用來告知為receiver啟動一個job,?ReceiverTracker有很多這樣的case class用于通信。

private[streaming] case class RestartReceiver(receiver: Receiver[_])

? extends ReceiverTrackerLocalMessage

再比如此類相同的消息

/**

?* This message will trigger ReceiverTrackerEndpoint to send stop signals to all registered

?* receivers.

?*/

private[streaming] case object StopAllReceivers extends ReceiverTrackerLocalMessage

注意:param skipReceiverLaunch Do not launch the receiver. This is useful for testing.,如下圖

簡單的來說,ReceiverTracker可以簡單的說包括Receiver的數據的啟動接收,管理,回收三個過程。

?

事先來個預告,我們將把Streaming流處理的所有的代碼一行行的過濾,講整個streaming通過一滴水看世界。

?

所有的輸入流都會交給grapx對象,因為該對象會將所有的待調度的數據統一調度。

內部還有一個成員叫做ReceiverBlockTracker

ListenerBus非常的重要,后續我們會重點分析ListenerBus的源代碼,它在監控層面起著重要的作用。

在這里,可以看出ReceiverTracker的狀態有如下的4種狀態,分別為

初始化,開始,正在停止中,停止了。

接收到ReceiverSuperVisorImpl遠程發送過來的消息之后進行處理的過程在此。

這也是今天的重點之一。

先寫日志后再進行下一步操作,這里是出于容錯的原因考慮的。

注意:這里如果指定了checkpoint目錄的話,才會使得isWriteAheadLogEnabled為true.

ReceivedBlockTrackerLogEvent其實就是元數據信息。

用一個HashMap結構將Stream 與 BlockQueue中的Block一一對應,可謂是真的巧妙到了極點。

?

再回到我們的消息通信層面。

回復對方,告知對方,addBlock成功。并且保存有數據的元數據信息。

ReceivedBlockTracker類的主要的任務在于將Block分配給沒有分配Block的Stream batch。

這是具體分配Block給batch的代碼。

這里說明具體的分配是以batch time為單位分配的.

再次看看消息通信體。

這里說啟動所有的Receiver.

啟動所有的receiver

這樣,整個數據接收的環節就打通了。

最后做點補充:

該階段是CleanupOldBlocks階段,此時將發送消息給ReceiverSuperVisorImpl,從而讓它執行cleanUpOldBlocks方法。


/** Update a receiver's maximum ingestion rate */

最后stopAllReceivers,結束了。


?

轉載于:https://www.cnblogs.com/lilingi/p/5515381.html

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/4/192159.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息