Apache Kafka 編程實戰您可能感性的文章:
kafka有什么用。Apache-Kafka簡介
Apache Kafka安裝和使用
Apache-Kafka核心概念
Apache-Kafka核心組件和流程-協調器
Apache-Kafka核心組件和流程(副本管理器)
Apache-Kafka 核心組件和流程-控制器
Apache-Kafka核心組件和流程-日志管理器
....
通過前幾章的學習,我們已經從宏觀層面了解了kafka的設計理念。包括kafka集群的組成、消息的主題、主題的分區、分區的副本等內容。接下來我們會繼續深入,了解kafka的主要組件以及核心的流程,最后還會介紹kafka的消息是如何存儲的。此章非常重要,通過本章和上一章的學習,你已經能夠掌握kafka 80%的核心內容。當然隨著學習的深入,難度也會越來越大,有任何問題歡迎留言或者私信。
Kafka主要的組件如下
我們將會逐個進行講解,講解過長還將保持前面章節的特點,多用有形的圖表幫助讀者理解。本篇博客先講解控制器部分。
1、控制器
在前一章的學習中,我們已經知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務。其實控制器也是一個broker,控制器也叫leader broker。他除了具有一般broker的功能外,還負責分區leader的選取,也就是負責選舉partition的leader replica。控制器是kafka核心中的核心,需要重點學習和理解。
控制器選舉
kafka每個broker啟動的時候,都會實例化一個KafkaController,并將broker的id注冊到zookeeper,這在第二章中已經通過例子做過講解。集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。
包括集群啟動在內,有三種情況觸發控制器選舉:
1、集群啟動
2、控制器所在代理發生故障
3、zookeeper心跳感知,控制器與自己的session過期
按照慣例,先看圖。我們根據下圖來講解集群啟動時,控制器選舉過程。
image
假設此集群有三個broker,同時啟動。
(一)3個broker從zookeeper獲取/controller臨時節點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經存在leader。
(二)如果還沒有選舉出leader,那么此節點是不存在的,返回-1。如果返回的不是-1,而是leader的json數據,那么說明已經有leader存在,選舉結束。
(三)三個broker發現返回-1,了解到目前沒有leader,于是均會觸發向臨時節點/controller寫入自己的信息。最先寫入的就會成為leader。
(四)假設broker 0的速度最快,他先寫入了/controller節點,那么他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節點已經存在了。
經過以上四步,broker 0成功寫入/controller節點,其它broker寫入失敗了,所以broker 0成功當選leader。
此外zk中還有controller_epoch節點,存儲了leader的變更次數,初始值為0,以后leader每變一次,該值+1。所有向控制器發起的請求,都會攜帶此值。如果控制器和自己內存中比較,請求值小,說明kafka集群已經發生了新的選舉,此請求過期,此請求無效。如果請求值大于控制器內存的值,說明已經有新的控制器當選了,自己已經退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。
由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節點寫入自身信息。
控制器初始化
控制器的初始化,其實是初始化控制器所用到的組件及監聽器,準備元數據。
前面提到過每個broker都會實例化并啟動一個KafkaController。KafkaController和他的組件關系,以及各個組件的介紹如下圖:
image
圖中箭頭為組件層級關系,組件下面還會再初始化其他組件。可見控制器內部還是有些復雜的,主要有以下組件:
1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區分配方案、每個分區的AR、leader、ISR等信息。
2、一系列的listener,通過對zookeeper的監聽,觸發相應的操作,黃色的框的均為listener
3、分區和副本狀態機,管理分區和副本。
4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關回調方法。
5、分區leader選舉器,PartitionLeaderSelector
6、主題刪除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態機處理后產生的request,然后統一發送出去。
8、控制器平衡操作的KafkaScheduler,僅在broker作為leader時有效。
圖片是我根據資料所總結,個人認為對于理解kafkaController的全貌很有幫助。本章節后面講到相應組件和流程時,還需要反復回來理解此圖,思考組件所處的位置,對整體的作用。
故障轉移
故障轉移其實就是leader所在broker發生故障,leader轉移為其他的broker。轉移的過程就是重新選舉leader的過程。
重新選舉leader后,需要為該broker注冊相應權限,調用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。
1、注冊分區管理的相關監聽器
監聽名稱監聽zookeeper節點作用PartitionsReassignedListener/admin/reassign_partitions節點變化將會引發分區重分配IsrChangeNotificationListener/isr_change_notification處理分區的ISR發生變化引發的操作PreferredReplicaElectionListener/admin/preferred_replica_election將優先副本選舉為leader副本
2、注冊主題管理的相關監聽
監聽名稱監聽zookeeper節點作用TopicChangeListener/brokers/topics監聽主題發生變化時進行相應操作DeleteTopicsListener/admin/delete_topics完成服務器端刪除主題的相應操作。否則客戶端刪除主題僅僅是表示刪除
3、注冊代理變化監聽器
監聽名稱監聽zookeeper節點作用BrokerChangeListener/brokers/ids代理發生增減的時候進行相應的處理
4、重新初始化ControllerContext,
5、啟動控制器和其他代理之間通信的ControllerChannelManager
6、創建用于刪除主題的TopicDeletionManager對象,并啟動。
7、啟動分區狀態機和副本狀態機
8、輪詢每個主題,添加監聽分區變化的PartitionModificationsListener
9、如果設置了分區平衡定時操作,那么創建分區平衡的定時任務,默認300秒檢查并執行。
除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:
1、/controller_epoch值+1,并且更新到ControllerContext
2、檢查是否出發分區重分配,并做相關操作
3、檢查需要將優先副本選為leader,并做相關操作
4、向kafka集群所有代理發送更新元數據的請求。
下面來看代理下線的方法onControllerResignation
1、該方法中注銷了控制器的權限。取消在zookeeper中對于分區、副本感知的相應監聽器的監聽。
2、關閉啟動的各個組件
3、最后把ControllerContext中記錄控制器版本的數值清零,并設置當前broker為RunnignAsBroker,變為普通的broker。
通過對控制器啟動過程的學習,我們應該已經對kafka工作的原理有了了解,核心是監聽zookeeper的相關節點,節點變化時觸發相應的操作。其它的處理流程都是相類似的。本篇教程接下來做簡要介紹,想要了解詳情的,可以先找其它資料。我后續也會再補充更為詳細的教程。
代理上下線
有新的broker加入集群時,稱為代理上線。反之,當broker關閉,推出集群時,稱為代理下線。
代理上線:
1、新代理啟動時向/brokers/ids寫數據
2、BrokerChangeListener監聽到變化。對新上線節點調用controllerChannelManager.addBroker(),完成新上線代理網絡層初始化
3、調用KafkaController.onBrokerStartup()處理
3.1通過向所有代理發送UpdateMetadataRequest,告訴所有代理有新代理加入
3.2根據分配給新上線節點的副本集合,對副本狀態做變遷。對分區也進行處理。
3.3觸發一次leader選舉,確認新加入的是否為分區leader
3.4輪詢分配給新broker的副本,調用KafkaController.onPartitionReassignment(),執行分區副本分配
3.5恢復因新代理上線暫停的刪除主題操作線程
代理下線:
1、查找下線節點集合
2、輪詢下線節點,調用controllerChannelManager.removeBroker(),關閉每個下線節點網絡連接。清空下線節點消息隊列,關閉下線節點request請求
3、輪詢下線節點,調用KafkaController.onBrokerFailure處理
3.1處理leader副本在下線節點上上的分區,重新選出leader副本,發送updateMetadataRequest請求。
3.2處理下線節點上的副本集合,做下線處理,從ISR集合中刪除,不再同步,發送updateMetadataRequest請求。
4、向集群全部存活代理發送updateMetadataRequest請求
主題管理
通過分區狀態機及副本狀態機來進行主題管理
1、創建主題
/brokers/topics下創建主題對應子節點
TopicChangeListener監聽此節點
變化時獲取重入鎖ReentrantLock,調用handleChildChange方法進行處理。
通過對比zookeeper中/brokers/topics存儲的主題集合及控制器的ControllerContext中緩存的主題集合的差集,得到新增的主題。反過來求差集,得到刪除的主題。
接下來遍歷新增的主題集合,進行主題操作的實質性操作。之前僅僅是在zookeeper中添加了主題。新增主題涉及的操作有分區、副本狀態的轉化、分區leader的分配、分區存儲日志的創建等。
2、刪除主題
/admin/delete_topics創建刪除主題的子節點
DeleteTopicsListener監聽此節點,
變化時獲取重入鎖ReentrantLock,進行處理
具體的刪除邏輯再次就不再詳述。
分區管理
1、分區自動平衡
onControllerFailover方法中啟動分區自動平衡任務。定時檢查是否失去平衡。
自動平衡的操作就是把優先副本選為分區leader,AR中第一個副本為優先副本。
先查出所有可用副本,以分區AR頭節點分組。
輪詢代理節點,判斷分區不平衡率是否超過10%(leader為非優先副本的分區/該代理分區總數),則調用onPreferredReplicaElection(),讓優先副本成為leader。達到自動平衡。
分區平衡操作的流程已經在第三章做了很詳細的講解,此處不再重復,可以參考kafka核心概念。
2、分區重分配
當zk節點/admin/reassign_partitions變化時,觸發分區重分配操作。該節點存儲分區重分配的方案。
通過計算主題分區原AR(OAR)和重新分配后的AR(RAR),分別做相應處理:
1、OAR+RAR:更新到該主題分區AR,并通知副本節點同步。leader_epoch+1
2、RAR-OAR:副本設為NewReplica。
3、(OAR+RAR)- RAR:需要下線的副本,做下線操作
具體流程不再詳述
小結:關于控制器的相關知識點就先講到這里,控制器初始化中的那張圖需要充分去理解,理解了此圖,對控制器內部的構造,以及控制器要做什么事情、如何做的,就已經掌握了。
你真的不關注一下嘛~
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态