kafka python,超詳細!一文詳解 SparkStreaming 如何整合 Kafka !附代碼可實踐

 2023-10-22 阅读 24 评论 0

摘要:來源 | Alice菌責編 | Carol封圖 |? CSDN 下載于視覺中國出品 | CSDN(ID:CSDNnews)相信很多小伙伴已經接觸過 SparkStreaming 了,理論就不講太多了,今天的內容主要是為大家帶來的是 SparkStreaming 整合 Kafka 的教程。kafka python、文中含代碼

來源 | Alice菌

責編 | Carol

封圖 |? CSDN 下載于視覺中國

出品 | CSDN(ID:CSDNnews)

相信很多小伙伴已經接觸過 SparkStreaming 了,理論就不講太多了,今天的內容主要是為大家帶來的是 SparkStreaming 整合 Kafka 的教程。

kafka python、文中含代碼,感興趣的朋友可以復制動手試試!

Kafka回顧

kafka c++,正式開始之前,先讓我們來對Kafka回顧一波。

  • 核心概念圖解

Broker:安裝Kafka服務的機器就是一個broker

Producer:消息的生產者,負責將數據寫入到broker中(push)

Consumer:消息的消費者,負責從kafka中拉取數據(pull),老版本的消費者需要依賴zk,新版本的不需要

Topic:?主題,相當于是數據的一個分類,不同topic存放不同業務的數據?–主題:區分業務

Replication:副本,數據保存多少份(保證數據不丟失)?–副本:數據安全

Partition:分區,是一個物理的分區,一個分區就是一個文件,一個Topic可以有1~n個分區,每個分區都有自己的副本?–分區:并發讀寫

Consumer Group:消費者組,一個topic可以有多個消費者/組同時消費,多個消費者如果在一個消費者組中,那么他們不能重復消費數據?–消費者組:提高消費者消費速度、方便統一管理

注意[1]:一個Topic可以被多個消費者或者組訂閱,一個消費者/組也可以訂閱多個主題

注意[2]:讀數據只能從Leader讀, 寫數據也只能往Leader寫,Follower會從Leader那里同步數據過來做副本!!!

  • 常用命令

啟動kafka

/export/servers/kafka/bin/kafka-server-start.sh?-daemon?/export/servers/kafka/config/server.properties?

停止kafka

/export/servers/kafka/bin/kafka-server-stop.sh?

查看topic信息

/export/servers/kafka/bin/kafka-topics.sh?--list?--zookeeper?node01:2181

創建topic

/export/servers/kafka/bin/kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?3?--partitions?3?--topic?test

查看某個topic信息

/export/servers/kafka/bin/kafka-topics.sh?--describe?--zookeeper?node01:2181?--topic?test

刪除topic

/export/servers/kafka/bin/kafka-topics.sh?--zookeeper?node01:2181?--delete?--topic?test

啟動生產者–控制臺的生產者一般用于測試

/export/servers/kafka/bin/kafka-console-producer.sh?--broker-list?node01:9092?--topic?spark_kafka

啟動消費者–控制臺的消費者一般用于測試

/export/servers/kafka/bin/kafka-console-consumer.sh?--zookeeper?node01:2181?--topic?spark_kafka--from-beginning

消費者連接到borker的地址

/export/servers/kafka/bin/kafka-console-consumer.sh?--bootstrap-server?node01:9092,node02:9092,node03:9092?--topic?spark_kafka?--from-beginning?

整合kafka兩種模式說明

這同時也是一個面試題的熱點。

開發中我們經常會利用SparkStreaming實時地讀取kafka中的數據然后進行處理,在spark1.3版本后,kafkaUtils里面提供了兩種創建DStream的方法:

1、Receiver接收方式:

  • KafkaUtils.createDstream(開發中不用,了解即可,但是面試可能會問)。

  • Receiver作為常駐的Task運行在Executor等待數據,但是一個Receiver效率低,需要開啟多個,再手動合并數據(union),再進行處理,很麻煩

  • Receiver哪臺機器掛了,可能會丟失數據,所以需要開啟WAL(預寫日志)保證數據安全,那么效率又會降低!

  • Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。

  • spark在消費的時候為了保證數據不丟也會在Checkpoint中存一份offset,可能會出現數據不一致

  • 所以不管從何種角度來說,Receiver模式都不適合在開發中使用了,已經淘汰了

2、Direct直連方式

  • KafkaUtils.createDirectStream(開發中使用,要求掌握)

  • Direct方式是直接連接kafka分區來獲取數據,從每個分區直接讀取數據大大提高了并行能力

  • Direct方式調用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況

  • 當然也可以自己手動維護,把offset存在mysql、redis中

  • 所以基于Direct模式可以在開發中使用,且借助Direct模式的特點+手動操作可以保證數據的Exactly once 精準一次

總結:

  • Receiver接收方式

  1. 多個Receiver接受數據效率高,但有丟失數據的風險

  2. 開啟日志(WAL)可防止數據丟失,但寫兩遍數據效率低。

  3. Zookeeper維護offset有重復消費數據可能。

  4. 使用高層次的API

  • Direct直連方式

  1. 不使用Receiver,直接到kafka分區中讀取數據

  2. 不使用日志(WAL)機制

  3. Spark自己維護offset

  4. 使用低層次的API

擴展:關于消息語義
注意:

開發中SparkStreaming和kafka集成有兩個版本:0.8及0.10+

0.8版本有Receiver和Direct模式(但是0.8版本生產環境問題較多,在Spark2.3之后不支持0.8版本了)。

0.10以后只保留了direct模式(Reveiver模式不適合生產環境),并且0.10版本API有變化(更加強大)

結論:

我們學習和開發都直接使用0.10版本中的direct模式,但是關于Receiver和Direct的區別面試的時候要能夠答得上來

spark-streaming-kafka-0-8(了解)

1.Receiver

KafkaUtils.createDstream使用了receivers來接收數據,利用的是Kafka高層次的消費者api,偏移量由Receiver維護在zk中,對于所有的receivers接收到的數據將會保存在Spark executors中,然后通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日志,它同步將接受到數據保存到分布式文件系統上比如HDFS。保證數據在出錯的情況下可以恢復出來。盡管這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是啟用了WAL效率會較低,且無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。

(官方現在已經不推薦這種整合方式。)

  • 準備工作

1)啟動zookeeper集群

zkServer.sh?start

2)啟動kafka集群

kafka-server-start.sh??/export/servers/kafka/config/server.properties

3.創建topic

kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?1?--partitions?3?--topic?spark_kafka

4.通過shell命令向topic發送消息

kafka-console-producer.sh?--broker-list?node01:9092?--topic??spark_kafka

5.添加kafka的pom依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version>
</dependency>
  • API

通過receiver接收器獲取kafka中topic數據,可以并行運行更多的接收器讀取kafak topic中的數據,這里為3個

?val?receiverDStream:?immutable.IndexedSeq[ReceiverInputDStream[(String,?String)]]?=?(1?to?3).map(x?=>?{val?stream:?ReceiverInputDStream[(String,?String)]?=?KafkaUtils.createStream(ssc,?zkQuorum,?groupId,?topics)stream})

如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2)

代碼演示

import?org.apache.spark.streaming.dstream.{DStream,?ReceiverInputDStream}
import?org.apache.spark.streaming.kafka.KafkaUtils
import?org.apache.spark.streaming.{Seconds,?StreamingContext}
import?org.apache.spark.{SparkConf,?SparkContext}import?scala.collection.immutableobject?SparkKafka?{def?main(args:?Array[String]):?Unit?=?{//1.創建StreamingContextval?config:?SparkConf?=?
new?SparkConf().setAppName("SparkStream").setMaster("local[*]").set("spark.streaming.receiver.writeAheadLog.enable",?"true")
//開啟WAL預寫日志,保證數據源端可靠性val?sc?=?new?SparkContext(config)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))ssc.checkpoint("./kafka")
//==============================================//2.準備配置參數val?zkQuorum?=?"node01:2181,node02:2181,node03:2181"val?groupId?=?"spark"val?topics?=?Map("spark_kafka"?->?2)//2表示每一個topic對應分區都采用2個線程去消費,
//ssc的rdd分區和kafka的topic分區不一樣,增加消費線程數,并不增加spark的并行處理數據數量//3.通過receiver接收器獲取kafka中topic數據,可以并行運行更多的接收器讀取kafak?topic中的數據,這里為3個val?receiverDStream:?immutable.IndexedSeq[ReceiverInputDStream[(String,?String)]]?=?(1?to?3).map(x?=>?{val?stream:?ReceiverInputDStream[(String,?String)]?=?KafkaUtils.createStream(ssc,?zkQuorum,?groupId,?topics)stream})//4.使用union方法,將所有receiver接受器產生的Dstream進行合并val?allDStream:?DStream[(String,?String)]?=?ssc.union(receiverDStream)//5.獲取topic的數據(String,?String)?第1個String表示topic的名稱,第2個String表示topic的數據val?data:?DStream[String]?=?allDStream.map(_._2)
//==============================================//6.WordCountval?words:?DStream[String]?=?data.flatMap(_.split("?"))val?wordAndOne:?DStream[(String,?Int)]?=?words.map((_,?1))val?result:?DStream[(String,?Int)]?=?wordAndOne.reduceByKey(_?+?_)result.print()ssc.start()ssc.awaitTermination()}
}

2.Direct

Direct方式會定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量范圍在每個batch里面處理數據,Spark通過調用kafka簡單的消費者API讀取一定范圍的數據。

  • Direct的缺點是無法使用基于zookeeper的kafka監控工具

  • Direct相比基于Receiver方式有幾個優點:

  1. 簡化并行

    不需要創建多個kafka輸入流,然后union它們,sparkStreaming將會創建和kafka分區數一樣的rdd的分區數,而且會從kafka中并行讀取數據,spark中RDD的分區數和kafka中的分區數據是一一對應的關系。

  2. 高效?

    Receiver實現數據的零丟失是將數據預先保存在WAL中,會復制一遍數據,會導致數據被拷貝兩次,第一次是被kafka復制,另一次是寫到WAL中。而Direct不使用WAL消除了這個問題。

  3. 恰好一次語義(Exactly-once-semantics)

    Receiver讀取kafka數據是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數據保存在WAL中保證數據不丟失,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數據被消費了多次。

? ? ? ? Direct的Exactly-once-semantics(EOS)通過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。

  • API

KafkaUtils.createDirectStream[String,?String,?StringDecoder,?StringDecoder](ssc,?kafkaParams,?topics)

代碼演示

import?kafka.serializer.StringDecoder
import?org.apache.spark.streaming.dstream.{DStream,?InputDStream}
import?org.apache.spark.streaming.kafka.KafkaUtils
import?org.apache.spark.streaming.{Seconds,?StreamingContext}
import?org.apache.spark.{SparkConf,?SparkContext}object?SparkKafka2?{def?main(args:?Array[String]):?Unit?=?{//1.創建StreamingContextval?config:?SparkConf?=?
new?SparkConf().setAppName("SparkStream").setMaster("local[*]")val?sc?=?new?SparkContext(config)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))ssc.checkpoint("./kafka")//==============================================//2.準備配置參數val?kafkaParams?=?Map("metadata.broker.list"?->?"node01:9092,node02:9092,node03:9092",?"group.id"?->?"spark")val?topics?=?Set("spark_kafka")val?allDStream:?InputDStream[(String,?String)]?=?KafkaUtils.createDirectStream[String,?String,?StringDecoder,?StringDecoder](ssc,?kafkaParams,?topics)//3.獲取topic的數據val?data:?DStream[String]?=?allDStream.map(_._2)//==============================================//WordCountval?words:?DStream[String]?=?data.flatMap(_.split("?"))val?wordAndOne:?DStream[(String,?Int)]?=?words.map((_,?1))val?result:?DStream[(String,?Int)]?=?wordAndOne.reduceByKey(_?+?_)result.print()ssc.start()ssc.awaitTermination()}
}


spark-streaming-kafka-0-10

  • 說明

spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發中使用

  • pom.xml

<!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version>
</dependency>-->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version>
</dependency>
  • API:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

  • 創建topic

/export/servers/kafka/bin/kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?3?--partitions?3?--topic?spark_kafka
  • 啟動生產者

/export/servers/kafka/bin/kafka-console-producer.sh?--broker-list?node01:9092,node01:9092,node01:9092?--topic?spark_kafka
  • 代碼演示

import?org.apache.kafka.clients.consumer.ConsumerRecord
import?org.apache.kafka.common.serialization.StringDeserializer
import?org.apache.spark.streaming.dstream.{DStream,?InputDStream}
import?org.apache.spark.streaming.kafka010.{ConsumerStrategies,?KafkaUtils,?LocationStrategies}
import?org.apache.spark.streaming.{Seconds,?StreamingContext}
import?org.apache.spark.{SparkConf,?SparkContext}object?SparkKafkaDemo?{def?main(args:?Array[String]):?Unit?=?{//1.創建StreamingContext//spark.master?should?be?set?as?local[n],?n?>?1val?conf?=?new?SparkConf().setAppName("wc").setMaster("local[*]")val?sc?=?new?SparkContext(conf)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))//5表示5秒中對數據進行切分形成一個RDD//準備連接Kafka的參數val?kafkaParams?=?Map[String,?Object]("bootstrap.servers"?->?"node01:9092,node02:9092,node03:9092","key.deserializer"?->?classOf[StringDeserializer],"value.deserializer"?->?classOf[StringDeserializer],"group.id"?->?"SparkKafkaDemo",//earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費//latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據//none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常//這里配置latest自動重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費,沒有偏移量從新來的數據開始消費"auto.offset.reset"?->?"latest",//false表示關閉自動提交.由spark幫你提交到Checkpoint或程序員手動維護"enable.auto.commit"?->?(false:?java.lang.Boolean))val?topics?=?Array("spark_kafka")//2.使用KafkaUtil連接Kafak獲取數據val?recordDStream:?InputDStream[ConsumerRecord[String,?String]]?=?KafkaUtils.createDirectStream[String,?String](ssc,LocationStrategies.PreferConsistent,//位置策略,源碼強烈推薦使用該策略,會讓Spark的Executor和Kafka的Broker均勻對應ConsumerStrategies.Subscribe[String,?String](topics,?kafkaParams))//消費策略,源碼強烈推薦使用該策略//3.獲取VALUE數據val?lineDStream:?DStream[String]?=?recordDStream.map(_.value())//_指的是ConsumerRecordval?wrodDStream:?DStream[String]?=?lineDStream.flatMap(_.split("?"))?//_指的是發過來的value,即一行數據val?wordAndOneDStream:?DStream[(String,?Int)]?=?wrodDStream.map((_,1))val?result:?DStream[(String,?Int)]?=?wordAndOneDStream.reduceByKey(_+_)result.print()ssc.start()//開啟ssc.awaitTermination()//等待優雅停止}
}

好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過程,并帶大家復習了一波Kafka的基礎知識,如果對你有用的話,麻煩動手手點個“在看”吧~

本文由作者首發 CSDN 博客,原文鏈接:

https://blog.csdn.net/weixin_44318830/article/details/105612516

【END】

更多精彩推薦

?開源激蕩 30 年:從免費社區到價值數十億美元公司

?理解 AI 最偉大的成就之一:卷積神經網絡的局限性

GitHub 標星 10,000+,Apache 頂級項目 ShardingSphere 的開源之路

港科大鄭光廷院士問診未來,揭露 AI 最新應用與實踐

?大促下的智能運維挑戰:阿里如何抗住“雙11貓晚”?

?以太坊2.0中的Custody Game及MPC實現

?很用心的為你寫了 9 道 MySQL 面試題,建議收藏!

你點的每個“在看”,我都認真當成了喜歡

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/3/161940.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息