有不少读者反馈,参考上篇文章《Hive 终于等来了 Flink》部署 Flink 并集成 Hive 时,出现一些 bug 以及兼容性等问题。虽已等来,却未可用。所以笔者增加了这一篇文章,作为姊妹篇。
在上篇文章中,笔者使用的 CDH 版本为 5.16.2,其中 Hive 版本为 1.1.0(CDH 5.x 系列 Hive 版本都不高于 1.1.0,是不是不可理解),Flink 源代码本身对 Hive 1.1.0 版本兼容性不好,存在不少问题。为了兼容目前版本,笔者基于 CDH 5.16.2 环境,对 Flink 代码进行了修改,重新打包并部署。
其实经过很多开源项目的实战,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情况下,替换一些 Jar 包,是可以解决兼容性的问题。对于笔者的环境来说,可以使用 Hive 1.2.1 版本的一些 Jar 包来代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的开始部分,笔者会解决这个问题,然后再补充上篇文章缺少的实战内容。
根据读者的反馈,笔者将所有的问题总结为三类:
有的读者不太清楚,如何配置 Flink 连接 Hive 的 Catalog,这里补充一个完整的 conf/sql-client-hive.yaml 示例:
catalogs:- name: staginghive type: hive hive-conf-dir: /etc/hive/conf hive-version: 1.2.1execution: planner: blink type: batch time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type: fallbackdeployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048
cdh spark _sql?sql-client-hive.yaml 配置文件里面包含:
类似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 脚本。在 Flink 1.10 版本中,Flink SQL CLI 改进了很多功能,笔者后面讲解。
sql-client.sh 使用方式如下:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
笔者在上篇文章中提到过,在部署 Flink 的环境上部署 CDH gateway,包括 Hadoop、Hive 客户端,另外还需要配置一些环境变量,如下:
export HADOOP_CONF_DIR=/etc/hadoop/confexport YARN_CONF_DIR=/etc/hadoop/confexport HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hiveexport HIVE_CONF_DIR=/etc/hive/conf
先查看一下 Flink 家目录下的 lib 目录:
$ tree liblib├── flink-connector-hive_2.11-1.10.0.jar├── flink-dist_2.11-1.10.0.jar├── flink-hadoop-compatibility_2.11-1.10.0.jar├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar├── flink-table_2.11-1.10.0.jar├── flink-table-blink_2.11-1.10.0.jar├── hive-exec-1.1.0-cdh5.16.2.jar├── hive-metastore-1.1.0-cdh5.16.2.jar├── libfb303-0.9.3.jar├── log4j-1.2.17.jar└── slf4j-log4j12-1.7.15.jar
如果上面前两个问题都解决后,执行如下命令:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
cloudera提供哪几种安装cdh的方法、报错,报错,还是报错:
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
其实在运行 sql-client.sh 脚本前,需要指定 Hadoop 环境的依赖包的路径,建议不要报错一个添加一个,除非有的读者喜欢。这里笔者提示一个方便的方式,即设置 HADOOPCLASSPATH(可以添加到 ~/.bashprofile 中)环境变量:
export HADOOP_CLASSPATH=`hadoop classpath`
再次执行:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
很抱歉,继续报错:
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
这里就是 Hive 1.1.0 版本的 Jar 包与 Flink 出现版本不兼容性的问题了,解决方法是:
$ tree liblib├── flink-connector-hive_2.11-1.10.0.jar├── flink-dist_2.11-1.10.0.jar├── flink-hadoop-compatibility_2.11-1.10.0.jar├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar├── flink-table_2.11-1.10.0.jar├── flink-table-blink_2.11-1.10.0.jar├── hive-exec-1.2.1.jar├── hive-metastore-1.2.1.jar├── libfb303-0.9.2.jar├── log4j-1.2.17.jar└── slf4j-log4j12-1.7.15.jar
CDH下线HDFS、最后再执行:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
这时,读者就可以看到手握栗子的可爱小松鼠了。
在 Flink 1.10 版本(目前为 RC1 阶段) 中,Flink 社区对 SQL CLI 做了大量的改动,比如支持 View、支持更多的数据类型和 DDL 语句、支持分区读写、支持 INSERT OVERWRITE 等,实现了更多的 TableEnvironment API 的功能,更加方便用户使用。
接下来,笔者详细讲解 Flink SQL CLI。
执行下面命令,登录 Flink SQL 客户端:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yamlFlink SQL>
执行 HELP,查看 Flink SQL 支持的命令,如下为大部分常用的:
cdh部署?为了方便读者进行实验,笔者使用 ssb-dbgen 生成测试数据,读者也可以使用测试环境已有的数据来进行实验。
具体如何在 Hive 中一键式创建表并插入数据,可以参考笔者早期的项目 https://github.com/MLikeWater/ssb-kylin。
查看上个步骤中创建的 Hive 表:
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;+--------------+--+| tab_name |+--------------+--+| customer || dates || lineorder || p_lineorder || part || supplier |+--------------+--+
读者可以对 Hive 进行各种查询,对比后面 Flink SQL 查询的结果。
登录 Flink SQL CLI,并查询 catalogs:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yamlFlink SQL> show catalogs;default_catalogstaginghiveFlink SQL> use catalog staginghive;
通过 show catalogs 获取配置的所有 catalog。由于笔者在 sql-client-hive.yaml 文件中设置了默认的 catalog,即为 staginghive。如果需要切换到其他 catalog,可以使用 usecatalog xxx。
spark和hive的整合,通过 Flink SQL 查询 Hive 数据库和表:
# 查询数据库Flink SQL> show databases;...ssbtmp...Flink SQL> use ssb;# 查询表Flink SQL> show tables;customerdateslineorderp_lineorderpartsupplier# 查询表结构Flink SQL> DESCRIBE customer;root|-- c_custkey: INT|-- c_name: STRING|-- c_address: STRING|-- c_city: STRING|-- c_nation: STRING|-- c_region: STRING|-- c_phone: STRING|-- c_mktsegment: STRING
这里需要注意,Hive 的元数据在 Flink catalog 中都以小写字母使用。
接下来,在 Flink SQL CLI 中查询一些 SQL 语句,完整 SQL 参考 https://github.com/MLikeWater/ssb-kylin 的 README。
目前 Flink SQL 解析 Hive 视图元数据时,会遇到一些 Bug,比如执行 Q1.1 SQL:
Flink SQL> select sum(v_revenue) as revenue> from p_lineorder> left join dates on lo_orderdate = d_datekey> where d_year = 1993> and lo_discount between 1 and 3> and lo_quantity < 25;[ERROR] Could not execute SQL statement. Reason:org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder' not found; did you mean 'LINEORDER'?
Flink SQL 找不到视图中的实体表。
p_lineorder 表是 Hive 中的一张视图,创建表的语句如下:
CREATE VIEW P_LINEORDER ASSELECT LO_ORDERKEY,LO_LINENUMBER,LO_CUSTKEY,LO_PARTKEY,LO_SUPPKEY,LO_ORDERDATE,LO_ORDERPRIOTITY,LO_SHIPPRIOTITY,LO_QUANTITY,LO_EXTENDEDPRICE,LO_ORDTOTALPRICE,LO_DISCOUNT,LO_REVENUE,LO_SUPPLYCOST,LO_TAX,LO_COMMITDATE,LO_SHIPMODE,LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUEFROM ssb.LINEORDER;
spark hive。但是对于 Hive 中视图的定义,Flink SQL 并没有很好地处理元数据。为了后面 SQL 的顺利执行,这里笔者在 Hive 中删除并重建该视图:
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder asselect lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriotity,lo_shippriotity,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_extendedprice*lo_discount as v_revenuefrom ssb.lineorder;
然后继续在 Flink SQL CLI 中查询 Q1.1 SQL:
Flink SQL> select sum(v_revenue) as revenue> from p_lineorder> left join dates on lo_orderdate = d_datekey> where d_year = 1993> and lo_discount between 1 and 3> and lo_quantity < 25;revenue894280292647
继续查询 Q2.1 SQL:
Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand> from p_lineorder> left join dates on lo_orderdate = d_datekey> left join part on lo_partkey = p_partkey> left join supplier on lo_suppkey = s_suppkey> where p_category = 'MFGR#12' and s_region = 'AMERICA'> group by d_year, p_brand> order by d_year, p_brand;lo_revenue d_year p_brand819634128 1998 MFGR#1206877651232 1998 MFGR#1207754489428 1998 MFGR#1208816369488 1998 MFGR#1209668482306 1998 MFGR#1210660366608 1998 MFGR#1211862902570 1998 MFGR#1212...
最后再查询一个 Q4.3 SQL:
Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit> from p_lineorder> left join dates on lo_orderdate = d_datekey> left join customer on lo_custkey = c_custkey> left join supplier on lo_suppkey = s_suppkey> left join part on lo_partkey = p_partkey> where c_region = 'AMERICA'and s_nation = 'UNITED STATES'> and (d_year = 1997 or d_year = 1998)> and p_category = 'MFGR#14'> group by d_year, s_city, p_brand> order by d_year, s_city, p_brand;d_year s_city p_brand profit1998 UNITED ST9 MFGR#1440 6665681
如果读者感兴趣的话,可以查询剩余的 SQL,当然也可以和 Spark SQL 进行比较。另外 Flink SQL 也支持 EXPLAIN,查询 SQL 的执行计划。
同样,可以在 Flink SQL CLI 中创建和删除视图,如下:
Flink SQL> create view p_lineorder2 as> select lo_orderkey,> lo_linenumber,> lo_custkey,> lo_partkey,> lo_suppkey,> lo_orderdate,> lo_orderpriotity,> lo_shippriotity,> lo_quantity,> lo_extendedprice,> lo_ordtotalprice,> lo_discount,> lo_revenue,> lo_supplycost,> lo_tax,> lo_commitdate,> lo_shipmode,> lo_extendedprice * lo_discount as v_revenue> from ssb.lineorder;[INFO] View has been created.
spark使用。这里笔者需要特别强调的是,目前 Flink 无法删除 Hive 中的视图:
Flink SQL> drop view p_lineorder;[ERROR] Could not execute SQL statement. Reason:The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.
Hive 数据库中创建一张分区表:
CREATE TABLE IF NOT EXISTS flink_partition_test ( id int, name string) PARTITIONED BY (day string, type string)stored as textfile;
接着,通过 Flink SQL 插入和查询数据:
# 插入静态分区的数据Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';# 查询Flink SQL> select * from flink_partition_test;id name day type100001 Flink001 2020-02-01 Flink# 插入动态分区Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';# 查询Flink SQL> select * from flink_partition_test;id name day type100002 Spark 2020-02-02 SparkSQL100001 FlinkSQL 2020-02-01 Flink# 动态和静态分区结合使用类似,不再演示# 覆盖插入数据Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';id name day type100002 Spark 2020-02-02 SparkSQL100001 FlinkSQL 2020-02-01 Flink
字段 day 在 Flink 属于关键字,要特殊处理。
Flink SQL 支持内置的函数和自定义函数。对于内置的函数,可以执行 show functions 进行查看,这一块笔者以后会单独介绍如何创建自定义函数。
Flink SQL 支持设置环境参数,可以使用 set 命令查看和设置参数:
Flink SQL> set;deployment.gateway-address=deployment.gateway-port=0deployment.m=yarn-clusterdeployment.response-timeout=5000deployment.yjm=1024deployment.yn=2deployment.ys=5deployment.ytm=2048execution.current-catalog=staginghiveexecution.current-database=ssbexecution.max-idle-state-retention=0execution.max-parallelism=128execution.max-table-result-rows=1000000execution.min-idle-state-retention=0execution.parallelism=1execution.periodic-watermarks-interval=200execution.planner=blinkexecution.restart-strategy.type=fallbackexecution.result-mode=tableexecution.time-characteristic=event-timeexecution.type=batchFlink SQL> set deployment.yjm = 2048;
cdh集成flink,在本文中,笔者通过 Flink SQL 比较详细地去操作 Hive 数据库,以及 Flink SQL 提供的一些功能。
当然,目前 Flink SQL 操作 Hive 数据库还是存在一些问题:
Flink 社区发展很快,所有这些问题只是暂时的,随着新版本的发布会被逐个解决。
如果 Flink SQL 目前不满足的需求,建议使用 API 方式来解决问题。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态