apache druid 与kafka整合使用

 2023-09-15 阅读 13 评论 0

摘要:前言 在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用 本篇,我们结合一个实际的简单的应用场景,来说说apache druid如何在实际项目中进行使用 业务场景 如下所示,是一个很常见的数据

前言

在上一篇,我们了解了apache druid的搭建,以及如何快速导入外部数据源到apache druid中进行数据分析和使用

本篇,我们结合一个实际的简单的应用场景,来说说apache druid如何在实际项目中进行使用

业务场景

如下所示,是一个很常见的数据分析的业务,通常来说,很多实时或准实时的数据(这里理解为外部数据源)需要通过kafka进行中转,即发送到kafka中,

kafka 版本、apache druid提供了导入外部数据源的功能,可以接收来自kafka指定topic的数据,然后支持数据分析,将kafka的数据导入到apache druid之后,再通过程序(后台应用)进行数据读取,根据实际的业务需求读取从kafka中摄取的数据进行逻辑处理

最后,应用程序将数据处理之后,进行写库,或者作为大屏展示的数据进行输出
在这里插入图片描述

以此为基础,可以将这个过程应用到很多与之相关的场景中,比如源数据是来自大数据引擎的处理结果,或者是python程序爬虫得到的结果…

下面我们来对这个过程从操作到代码实现做一个完整的演示

前置准备

  • docker或者linux环境下搭建的zookeeper和kafka,并提前创建一个topic
  • 启动apache druid服务

kafka实战,做一下kafaka的数据测试,验证topic可以正常的收发消息
在这里插入图片描述
在这里插入图片描述

1、apache druid 控制台连接kafka

loada data 选择kafka
在这里插入图片描述
填写kafka的连接信息即可
在这里插入图片描述
然后一路next等待解析,解析完毕,通过顶部的query查看左侧是否出现下面的这个自定义的库名
在这里插入图片描述
上面的意思是,将kafka中某个topic中的数据解析到apache druid的库中,然后就可以通过apache druid对导入的数据进行管理和分析了

我们不妨使用sql查询一下,可以看到刚刚我们做测试的数据都展示出来了
在这里插入图片描述

kafka部署?2、编写程序,定时向kafka推送消息

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;public class KfkaTest {public static void main(String[] args) {AtomicLong atomicLong = new AtomicLong(1);Runnable runnable = new Runnable() {public void run() {//定时向kafka推送消息long l = atomicLong.incrementAndGet();pushMessage(l);}};ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();// 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间service.scheduleAtFixedRate(runnable, 10, 1, TimeUnit.SECONDS);}public static void pushMessage(long num) {Properties properties = new Properties();properties.put("bootstrap.servers", "IP:9092");properties.put("acks", "all");properties.put("retries", "3");properties.put("batch.size", "16384");properties.put("linger.ms", 1);properties.put("buffer.memory", 33554432);//key和value的序列化properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//构造生产者对象KafkaProducer<String, String> producer = new KafkaProducer<>(properties);ObjectMapper objectMapper = new ObjectMapper();Map<String, Object> map = new HashMap<>();map.put("name", "gaoliang:" + num);map.put("age", 19);map.put("city", "深圳");String val = null;try {val = objectMapper.writeValueAsString(map);System.out.println(val);} catch (JsonProcessingException e) {e.printStackTrace();}producer.send(new ProducerRecord<>("study1", "congge ", val));//关闭连接资源producer.close();}}

3、通过程序读取apache druid 的数据

关于这一点,方式就很灵活了,是将读取到的数据做何种处理呢?那就要看业务的具体需求了,比如可以直接通过接口将读取到的最新数据返回给页面做展示呢?还是将数据进行逻辑处理之后入库呢?还是交给其他的服务进一步使用呢?通常来说,进行读取之后,写库和展示的应用场景比较多

下面来演示下,如何在程序中读取apache druid的数据,想必这个是大家关心的

apache kafka实战。直接在pom文件中添加如下依赖

		<dependency><groupId>org.apache.calcite.avatica</groupId><artifactId>avatica-core</artifactId><version>1.15.0</version></dependency>

apache druid官方提供了jdbc的方式对数据进行查询的连接方式,下面直接上代码了

import org.apache.calcite.avatica.AvaticaConnection;
import org.apache.calcite.avatica.AvaticaStatement;import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;public class DruidTest {private static final String DRUID_URL = "jdbc:avatica:remote:url=http://IP:8888/druid/v2/sql/avatica/";private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();/*** 打开连接* @param* @return* @throws SQLException*/public static AvaticaConnection connection() throws SQLException {Properties properties = new Properties();AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(DRUID_URL, properties);threadLocal.set(connection);return connection;}/*** 关闭连接* @throws SQLException*/public static void closeConnection() throws SQLException{System.out.println("关闭线程:"+threadLocal.get());AvaticaConnection conn = threadLocal.get();if(conn != null){conn.close();threadLocal.remove();}}/*** 根据sql查询结果* @param* @param sql* @return* @throws SQLException*/public static ResultSet executeQuery (String sql) throws SQLException{AvaticaStatement statement = connection().createStatement();ResultSet resultSet = statement.executeQuery(sql);return resultSet;}public static void main(String[] args) {try {String sql = "SELECT * FROM \"study1\" limit 10";for (int i = 0; i < 5; i++) {ResultSet resultSet = executeQuery(sql);System.out.println("开始连接"+i + ";   连接线程:"+threadLocal.get());while(resultSet.next()){String name = resultSet.getString("name");System.out.println(name + "   ;   "+ name);}closeConnection();}} catch (SQLException throwables) {throwables.printStackTrace();}}}

在这里插入图片描述

这时候不妨往kafka的study1的topic中再推一条消息
在这里插入图片描述
界面上查询,可以看到,数据已经过来了
在这里插入图片描述
再次运行程序,也能成功读取到
在这里插入图片描述

kafka 教程,以上,通过程序结合控制台讲述了如何利用java程序连接kafka和apache druid的一种简单的业务场景,本篇的处理较为简单,未涉及到具体的功能层面的整合,主要是为使用apache druid进行进一步的深入使用做一个铺垫,希望对看到的同学有用!

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://hbdhgg.com/2/62778.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 匯編語言學習筆記 Inc. 保留所有权利。

底部版权信息