來源 | Alice菌
責編 | Carol
封圖 |? CSDN 下載于視覺中國
出品 | CSDN(ID:CSDNnews)
相信很多小伙伴已經接觸過 SparkStreaming 了,理論就不講太多了,今天的內容主要是為大家帶來的是 SparkStreaming 整合 Kafka 的教程。
kafka python、文中含代碼,感興趣的朋友可以復制動手試試!
Kafka回顧
kafka c++,正式開始之前,先讓我們來對Kafka回顧一波。
核心概念圖解
Broker:安裝Kafka服務的機器就是一個broker
Producer:消息的生產者,負責將數據寫入到broker中(push)
Consumer:消息的消費者,負責從kafka中拉取數據(pull),老版本的消費者需要依賴zk,新版本的不需要
Topic:?主題,相當于是數據的一個分類,不同topic存放不同業務的數據?–主題:區分業務
Replication:副本,數據保存多少份(保證數據不丟失)?–副本:數據安全
Partition:分區,是一個物理的分區,一個分區就是一個文件,一個Topic可以有1~n個分區,每個分區都有自己的副本?–分區:并發讀寫
Consumer Group:消費者組,一個topic可以有多個消費者/組同時消費,多個消費者如果在一個消費者組中,那么他們不能重復消費數據?–消費者組:提高消費者消費速度、方便統一管理
注意[1]:一個Topic可以被多個消費者或者組訂閱,一個消費者/組也可以訂閱多個主題
注意[2]:讀數據只能從Leader讀, 寫數據也只能往Leader寫,Follower會從Leader那里同步數據過來做副本!!!
常用命令
啟動kafka
/export/servers/kafka/bin/kafka-server-start.sh?-daemon?/export/servers/kafka/config/server.properties?
停止kafka
/export/servers/kafka/bin/kafka-server-stop.sh?
查看topic信息
/export/servers/kafka/bin/kafka-topics.sh?--list?--zookeeper?node01:2181
創建topic
/export/servers/kafka/bin/kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?3?--partitions?3?--topic?test
查看某個topic信息
/export/servers/kafka/bin/kafka-topics.sh?--describe?--zookeeper?node01:2181?--topic?test
刪除topic
/export/servers/kafka/bin/kafka-topics.sh?--zookeeper?node01:2181?--delete?--topic?test
啟動生產者–控制臺的生產者一般用于測試
/export/servers/kafka/bin/kafka-console-producer.sh?--broker-list?node01:9092?--topic?spark_kafka
啟動消費者–控制臺的消費者一般用于測試
/export/servers/kafka/bin/kafka-console-consumer.sh?--zookeeper?node01:2181?--topic?spark_kafka--from-beginning
消費者連接到borker的地址
/export/servers/kafka/bin/kafka-console-consumer.sh?--bootstrap-server?node01:9092,node02:9092,node03:9092?--topic?spark_kafka?--from-beginning?
這同時也是一個面試題的熱點。
開發中我們經常會利用SparkStreaming實時地讀取kafka中的數據然后進行處理,在spark1.3版本后,kafkaUtils里面提供了兩種創建DStream的方法:
1、Receiver接收方式:
KafkaUtils.createDstream(開發中不用,了解即可,但是面試可能會問)。
Receiver作為常駐的Task運行在Executor等待數據,但是一個Receiver效率低,需要開啟多個,再手動合并數據(union),再進行處理,很麻煩
Receiver哪臺機器掛了,可能會丟失數據,所以需要開啟WAL(預寫日志)保證數據安全,那么效率又會降低!
Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。
spark在消費的時候為了保證數據不丟也會在Checkpoint中存一份offset,可能會出現數據不一致
所以不管從何種角度來說,Receiver模式都不適合在開發中使用了,已經淘汰了
2、Direct直連方式
KafkaUtils.createDirectStream(開發中使用,要求掌握)
Direct方式是直接連接kafka分區來獲取數據,從每個分區直接讀取數據大大提高了并行能力
Direct方式調用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況
當然也可以自己手動維護,把offset存在mysql、redis中
所以基于Direct模式可以在開發中使用,且借助Direct模式的特點+手動操作可以保證數據的Exactly once 精準一次
總結:
Receiver接收方式
多個Receiver接受數據效率高,但有丟失數據的風險
開啟日志(WAL)可防止數據丟失,但寫兩遍數據效率低。
Zookeeper維護offset有重復消費數據可能。
使用高層次的API
Direct直連方式
不使用Receiver,直接到kafka分區中讀取數據
不使用日志(WAL)機制
Spark自己維護offset
使用低層次的API
擴展:關于消息語義
注意:
開發中SparkStreaming和kafka集成有兩個版本:0.8及0.10+
0.8版本有Receiver和Direct模式(但是0.8版本生產環境問題較多,在Spark2.3之后不支持0.8版本了)。
0.10以后只保留了direct模式(Reveiver模式不適合生產環境),并且0.10版本API有變化(更加強大)
結論:
我們學習和開發都直接使用0.10版本中的direct模式,但是關于Receiver和Direct的區別面試的時候要能夠答得上來
spark-streaming-kafka-0-8(了解)
1.Receiver
KafkaUtils.createDstream使用了receivers來接收數據,利用的是Kafka高層次的消費者api,偏移量由Receiver維護在zk中,對于所有的receivers接收到的數據將會保存在Spark executors中,然后通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日志,它同步將接受到數據保存到分布式文件系統上比如HDFS。保證數據在出錯的情況下可以恢復出來。盡管這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是啟用了WAL效率會較低,且無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
(官方現在已經不推薦這種整合方式。)
準備工作
1)啟動zookeeper集群
zkServer.sh?start
2)啟動kafka集群
kafka-server-start.sh??/export/servers/kafka/config/server.properties
3.創建topic
kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?1?--partitions?3?--topic?spark_kafka
4.通過shell命令向topic發送消息
kafka-console-producer.sh?--broker-list?node01:9092?--topic??spark_kafka
5.添加kafka的pom依賴
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version>
</dependency>
API
通過receiver接收器獲取kafka中topic數據,可以并行運行更多的接收器讀取kafak topic中的數據,這里為3個
?val?receiverDStream:?immutable.IndexedSeq[ReceiverInputDStream[(String,?String)]]?=?(1?to?3).map(x?=>?{val?stream:?ReceiverInputDStream[(String,?String)]?=?KafkaUtils.createStream(ssc,?zkQuorum,?groupId,?topics)stream})
如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2)
代碼演示
import?org.apache.spark.streaming.dstream.{DStream,?ReceiverInputDStream}
import?org.apache.spark.streaming.kafka.KafkaUtils
import?org.apache.spark.streaming.{Seconds,?StreamingContext}
import?org.apache.spark.{SparkConf,?SparkContext}import?scala.collection.immutableobject?SparkKafka?{def?main(args:?Array[String]):?Unit?=?{//1.創建StreamingContextval?config:?SparkConf?=?
new?SparkConf().setAppName("SparkStream").setMaster("local[*]").set("spark.streaming.receiver.writeAheadLog.enable",?"true")
//開啟WAL預寫日志,保證數據源端可靠性val?sc?=?new?SparkContext(config)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))ssc.checkpoint("./kafka")
//==============================================//2.準備配置參數val?zkQuorum?=?"node01:2181,node02:2181,node03:2181"val?groupId?=?"spark"val?topics?=?Map("spark_kafka"?->?2)//2表示每一個topic對應分區都采用2個線程去消費,
//ssc的rdd分區和kafka的topic分區不一樣,增加消費線程數,并不增加spark的并行處理數據數量//3.通過receiver接收器獲取kafka中topic數據,可以并行運行更多的接收器讀取kafak?topic中的數據,這里為3個val?receiverDStream:?immutable.IndexedSeq[ReceiverInputDStream[(String,?String)]]?=?(1?to?3).map(x?=>?{val?stream:?ReceiverInputDStream[(String,?String)]?=?KafkaUtils.createStream(ssc,?zkQuorum,?groupId,?topics)stream})//4.使用union方法,將所有receiver接受器產生的Dstream進行合并val?allDStream:?DStream[(String,?String)]?=?ssc.union(receiverDStream)//5.獲取topic的數據(String,?String)?第1個String表示topic的名稱,第2個String表示topic的數據val?data:?DStream[String]?=?allDStream.map(_._2)
//==============================================//6.WordCountval?words:?DStream[String]?=?data.flatMap(_.split("?"))val?wordAndOne:?DStream[(String,?Int)]?=?words.map((_,?1))val?result:?DStream[(String,?Int)]?=?wordAndOne.reduceByKey(_?+?_)result.print()ssc.start()ssc.awaitTermination()}
}
Direct方式會定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量范圍在每個batch里面處理數據,Spark通過調用kafka簡單的消費者API讀取一定范圍的數據。
Direct的缺點是無法使用基于zookeeper的kafka監控工具
Direct相比基于Receiver方式有幾個優點:
簡化并行
不需要創建多個kafka輸入流,然后union它們,sparkStreaming將會創建和kafka分區數一樣的rdd的分區數,而且會從kafka中并行讀取數據,spark中RDD的分區數和kafka中的分區數據是一一對應的關系。
高效?
Receiver實現數據的零丟失是將數據預先保存在WAL中,會復制一遍數據,會導致數據被拷貝兩次,第一次是被kafka復制,另一次是寫到WAL中。而Direct不使用WAL消除了這個問題。
恰好一次語義(Exactly-once-semantics)
Receiver讀取kafka數據是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數據保存在WAL中保證數據不丟失,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數據被消費了多次。
? ? ? ? Direct的Exactly-once-semantics(EOS)通過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。
API
KafkaUtils.createDirectStream[String,?String,?StringDecoder,?StringDecoder](ssc,?kafkaParams,?topics)
代碼演示
import?kafka.serializer.StringDecoder
import?org.apache.spark.streaming.dstream.{DStream,?InputDStream}
import?org.apache.spark.streaming.kafka.KafkaUtils
import?org.apache.spark.streaming.{Seconds,?StreamingContext}
import?org.apache.spark.{SparkConf,?SparkContext}object?SparkKafka2?{def?main(args:?Array[String]):?Unit?=?{//1.創建StreamingContextval?config:?SparkConf?=?
new?SparkConf().setAppName("SparkStream").setMaster("local[*]")val?sc?=?new?SparkContext(config)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))ssc.checkpoint("./kafka")//==============================================//2.準備配置參數val?kafkaParams?=?Map("metadata.broker.list"?->?"node01:9092,node02:9092,node03:9092",?"group.id"?->?"spark")val?topics?=?Set("spark_kafka")val?allDStream:?InputDStream[(String,?String)]?=?KafkaUtils.createDirectStream[String,?String,?StringDecoder,?StringDecoder](ssc,?kafkaParams,?topics)//3.獲取topic的數據val?data:?DStream[String]?=?allDStream.map(_._2)//==============================================//WordCountval?words:?DStream[String]?=?data.flatMap(_.split("?"))val?wordAndOne:?DStream[(String,?Int)]?=?words.map((_,?1))val?result:?DStream[(String,?Int)]?=?wordAndOne.reduceByKey(_?+?_)result.print()ssc.start()ssc.awaitTermination()}
}
說明
spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發中使用
pom.xml
<!--<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version>
</dependency>-->
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version>
</dependency>
API:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
創建topic
/export/servers/kafka/bin/kafka-topics.sh?--create?--zookeeper?node01:2181?--replication-factor?3?--partitions?3?--topic?spark_kafka
啟動生產者
/export/servers/kafka/bin/kafka-console-producer.sh?--broker-list?node01:9092,node01:9092,node01:9092?--topic?spark_kafka
代碼演示
import?org.apache.kafka.clients.consumer.ConsumerRecord
import?org.apache.kafka.common.serialization.StringDeserializer
import?org.apache.spark.streaming.dstream.{DStream,?InputDStream}
import?org.apache.spark.streaming.kafka010.{ConsumerStrategies,?KafkaUtils,?LocationStrategies}
import?org.apache.spark.streaming.{Seconds,?StreamingContext}
import?org.apache.spark.{SparkConf,?SparkContext}object?SparkKafkaDemo?{def?main(args:?Array[String]):?Unit?=?{//1.創建StreamingContext//spark.master?should?be?set?as?local[n],?n?>?1val?conf?=?new?SparkConf().setAppName("wc").setMaster("local[*]")val?sc?=?new?SparkContext(conf)sc.setLogLevel("WARN")val?ssc?=?new?StreamingContext(sc,Seconds(5))//5表示5秒中對數據進行切分形成一個RDD//準備連接Kafka的參數val?kafkaParams?=?Map[String,?Object]("bootstrap.servers"?->?"node01:9092,node02:9092,node03:9092","key.deserializer"?->?classOf[StringDeserializer],"value.deserializer"?->?classOf[StringDeserializer],"group.id"?->?"SparkKafkaDemo",//earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費//latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據//none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常//這里配置latest自動重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費,沒有偏移量從新來的數據開始消費"auto.offset.reset"?->?"latest",//false表示關閉自動提交.由spark幫你提交到Checkpoint或程序員手動維護"enable.auto.commit"?->?(false:?java.lang.Boolean))val?topics?=?Array("spark_kafka")//2.使用KafkaUtil連接Kafak獲取數據val?recordDStream:?InputDStream[ConsumerRecord[String,?String]]?=?KafkaUtils.createDirectStream[String,?String](ssc,LocationStrategies.PreferConsistent,//位置策略,源碼強烈推薦使用該策略,會讓Spark的Executor和Kafka的Broker均勻對應ConsumerStrategies.Subscribe[String,?String](topics,?kafkaParams))//消費策略,源碼強烈推薦使用該策略//3.獲取VALUE數據val?lineDStream:?DStream[String]?=?recordDStream.map(_.value())//_指的是ConsumerRecordval?wrodDStream:?DStream[String]?=?lineDStream.flatMap(_.split("?"))?//_指的是發過來的value,即一行數據val?wordAndOneDStream:?DStream[(String,?Int)]?=?wrodDStream.map((_,1))val?result:?DStream[(String,?Int)]?=?wordAndOneDStream.reduceByKey(_+_)result.print()ssc.start()//開啟ssc.awaitTermination()//等待優雅停止}
}
好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過程,并帶大家復習了一波Kafka的基礎知識,如果對你有用的話,麻煩動手手點個“在看”吧~
本文由作者首發 CSDN 博客,原文鏈接:
https://blog.csdn.net/weixin_44318830/article/details/105612516
【END】
更多精彩推薦
?開源激蕩 30 年:從免費社區到價值數十億美元公司
?理解 AI 最偉大的成就之一:卷積神經網絡的局限性
GitHub 標星 10,000+,Apache 頂級項目 ShardingSphere 的開源之路
港科大鄭光廷院士問診未來,揭露 AI 最新應用與實踐
?大促下的智能運維挑戰:阿里如何抗住“雙11貓晚”?
?以太坊2.0中的Custody Game及MPC實現
?很用心的為你寫了 9 道 MySQL 面試題,建議收藏!
你點的每個“在看”,我都認真當成了喜歡
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态