kafka专题:kafka单机和集群安装详情,Spring Boot如何整合Kafka

 2023-09-15 阅读 16 评论 0

摘要:文章目录1. kafka单机安装1.1 server.properties核心配置2. kafka集群安装2.1 kafka集群可视化界面3. springboot如何整合kafka4. 生产者、消费者更多属性配置 1. kafka单机安装 安装jdk # 由于Kafka是用Scala语言开发的,运行在JVM上, # 因此在安装Kafka之前需

文章目录

  • 1. kafka单机安装
    • 1.1 server.properties核心配置
  • 2. kafka集群安装
    • 2.1 kafka集群可视化界面
  • 3. springboot如何整合kafka
  • 4. 生产者、消费者更多属性配置


1. kafka单机安装

安装jdk

# 由于Kafka是用Scala语言开发的,运行在JVM上,
# 因此在安装Kafka之前需要先安装JDK,如已安装请忽略
yum install java‐1.8.0‐openjdk* ‐y

安装zookeeper

# 由于kafka依赖zookeeper,所以需要先安装zookeeper,如已安装请忽略
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
cd  apache-zookeeper-3.5.8-bin
cp conf/zoo_sample.cfg conf/zoo.cfg# 启动zookeeper
bin/zkServer.sh start
bin/zkCli.sh 
ls /			#查看zk的根目录相关节点

kafka集群迁移?安装单机kafka

# 1.下载2.4.1 release版本,并解压。如果下不下来,可以用ftp上传
wget https://mirror.bit.edu.cn/apache/kafka/2.4.1/kafka_2.11-2.4.1.tgz  # 2.11是scala的版本,2.4.1是kafka的版本
tar -xzf kafka_2.11-2.4.1.tgz
cd kafka_2.11-2.4.1# 2.修改配置文件config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9092   
#kafka的消息存储文件地址
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181# 3.启动服务
# 启动kafka,运行日志在logs目录的server.log文件里
bin/kafka-server-start.sh -daemon config/server.properties   #后台启动,不会打印日志到控制台
或者用
bin/kafka-server-start.sh config/server.properties &# 4. 验证启动是否成功
# 进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh 
ls /		#查看zk的根目录kafka相关节点
ls /brokers/ids	#查看kafka节点# 停止kafka
bin/kafka-server-stop.sh

当kafka启动成功时,会在zookeeper节点中记录一些元信息
在这里插入图片描述
注意:途中红框里的 /zookeeper节点是zk启动时自带的节点,其他的节点都是kafka启动时在zookeeper中生成的元信息节点,其中 /broker/ids节点记录了kafka的集群信息,如下:
在这里插入图片描述


1.1 server.properties核心配置

属性配置描述
broker.id0每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯一的即可。log.dirs
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。listeners
listenersPLAINTEXT://192.168.65.60:9092server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connectlocalhost:2181zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接方式为 hostname1:port1, hostname2:port2, hostname3:port3log.retention.hours
log.retention.hours168168每个日志文件删除之前保存的时间(7天)。默认数据保存时间对所有topic都一样。
min.insync.replicas11当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enablefalsefalse是否允许删除主题


2. kafka集群安装

kafka集群的作用
①:容灾备份
②:可以部署多个消费者同时消费kafka多个节点,增加消费能力!

        对于kafka来说,一个单独的broker意味着kafka集群中只有一个节点。要想增加kafka集群中的节点数量,只需要多启动几个broker实例即可。本例在一台机器上同时启动三个broker实例构成kafka集群。

# 1.创建其他两个broker的配置文件
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties# 2.分别修改新增的两个配置文件内容
# config/server-1.properties内容如下
# broker.id属性在kafka集群中必须要是唯一
broker.id=1# kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9093   
log.dir=/usr/local/data/kafka-logs-1# kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.65.60:2181# config/server-2.properties内容如下:
broker.id=2
listeners=PLAINTEXT://192.168.65.60:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.con菜单nect=192.168.65.60:2181

kafka集群架构?目前我们已经有一个单机zookeeper实例和一个broker实例在运行了,现在我们只需要在启动2个broker实例即可:

# 启动kafka
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

验证是否启动成功
在这里插入图片描述
zookeeper节点上已接收到kafka的集群节点信息 (0,1,2)表示集群已启动!

2.1 kafka集群可视化界面

安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html

注意:上文链接中配置kafka集群地址时错误,按照下面的配置即可

# 注释这一行,下面添加一行,代表的是zookeeper的地址
# kafka-manager.zkhosts="localhost:2181"       
kafka-manager.zkhosts="192.168.100.100:2181"

启动后添加集群即可
在这里插入图片描述


3. springboot如何整合kafka

kafka集群可用性、3.1 引入spring boot kafka依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

3.2 配置文件
在application.yml中配置生产端和消费端的属性

server:port: 8080spring:kafka:bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094producer: # 生产者retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384 # 成批发送消息,消息达到16k发一波buffer-memory: 33554432 #消息缓存池 32Macks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交# MANUAL_IMMEDIATEack-mode: manual_immediate

3.3 发送消息
启动服务并调用 /send 接口发送消息


@RestController
public class KafkaController {//主题private final static String TOPIC_NAME = "my-replicated-topic";// springboot已封装的发消息模板方法 kafkaTemplate@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")/*** 参数分别为* 1. 主题* 2. 分区* 3. 消息的key* 4. 消息的value*/public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}
}

3.4 接收消息

@Component
public class MyConsumer {/*** @KafkaListener(groupId = "testGroup", topicPartitions = {*             @TopicPartition(topic = "topic1", partitions = {"0", "1"}),*             @TopicPartition(topic = "topic2", partitions = "0",*                     partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))*     },concurrency = "6")*  //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数* @param record*/@KafkaListener(topics = "my-replicated-topic",groupId = "zhugeGroup")public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);//手动提交offset/*** 注意如果要使用手动提交offset,需要以下三点* ①:配置文件配置手动提交方式* ②:加上参数Acknowledgment ack* ③:方法中使用ack.acknowledge()执行手动提交*/ack.acknowledge();}/*//配置多个消费组@KafkaListener(topics = "my-replicated-topic",groupId = "tulingGroup")public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {String value = record.value();System.out.println(value);System.out.println(record);ack.acknowledge();}*/
}


4. 生产者、消费者更多属性配置

生产者:

kafka集群双活,        包括发送消息的持久化机制、失败重试、批量发送消息、消息缓冲区、k-v序列化、发送成功回调(同步和异步) 等参数,这些参数的设置如果要在spring boot中使用,都可以在application.yml中找到相应的属性进行配置,下面代码只是在main方法中做个示例参考,并不是spring boot中的配置!

public class MsgProducer {private final static String TOPIC_NAME = "my-replicated-topic";public static void main(String[] args) throws InterruptedException, ExecutionException {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");/*发出消息持久化机制参数(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。*//*props.put(ProducerConfig.ACKS_CONFIG, "1");*//*发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理*//*props.put(ProducerConfig.RETRIES_CONFIG, 3);//重试间隔设置props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MBprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);*//*kafka本地线程会从缓冲区取数据,批量发送到broker,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去*//*props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);*//*默认值是0,意思就是消息必须立即被发送,但这样会影响性能一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长*//*props.put(ProducerConfig.LINGER_MS_CONFIG, 10);*///把发送的key从字符串序列化为字节数组props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//把发送消息value从字符串序列化为字节数组props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<String, String>(props);int msgNum = 5;final CountDownLatch countDownLatch = new CountDownLatch(msgNum);for (int i = 1; i <= msgNum; i++) {Order order = new Order(i, 100 + i, 1, 1000.00);//指定发送分区/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, order.getOrderId().toString(), JSON.toJSONString(order));*///未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));//等待消息发送成功的同步阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());//异步回调方式发送消息/*producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("发送消息失败:" + exception.getStackTrace());}if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}countDownLatch.countDown();}});*///送积分 TODO}countDownLatch.await(5, TimeUnit.SECONDS);producer.close();}
}

消费者:

        消费者可配置属性包括 是否自动提交offset、消费端心跳检测、新建消费组的消费方式、消费者处理能力太慢自动踢出、指定offset开始消费、指定时间点开始消费 等属性配置,如下:

public class MsgConsumer {private final static String TOPIC_NAME = "my-replicated-topic";private final static String CONSUMER_GROUP_NAME = "testGroup";public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");// 消费分组名props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);// 是否自动提交offset,默认就是trueprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 自动提交offset的间隔时间props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");/*当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费latest(默认) :只消费自己启动之后发送到主题的消息earliest:第一次新建消费组时从头开始消费,因为新建消费组时默认只消费borker之后的新增消息以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)*///props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");/*consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将rebalance方案下发给consumer,这个时间可以稍微短一点*/props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);/*服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,对应的Partition也会被重新分配给其他consumer,默认是10秒*/props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);/*如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费*/props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(TOPIC_NAME));// 消费指定分区//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));//消息回溯消费,就是每次启动都从头开始消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*///指定offset消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*///从指定时间点开始消费/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);//从1小时前开始消费long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;Map<TopicPartition, Long> map = new HashMap<>();for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;Long offset = value.offset();System.out.println("partition-" + key.partition() + "|offset-" + offset);System.out.println();//根据消费里的timestamp确定offsetif (value != null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}}*/while (true) {/** poll() API 是拉取消息的长轮询*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}/*if (records.count() > 0) {// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了consumer.commitSync();// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getStackTrace());}}});}*/}}
}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/4/59732.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息