irpas技术客

实践数据湖iceberg 第八课 hive与iceberg集成_*星星之火*_hive iceberg

irpas 6678

系列文章目录

实践数据湖iceberg 第一课 入门 实践数据湖iceberg 第二课 iceberg基于hadoop的底层数据格式 实践数据湖iceberg 第三课 在sqlclient中,以sql方式从kafka读数据到iceberg 实践数据湖iceberg 第四课 在sqlclient中,以sql方式从kafka读数据到iceberg(升级版本到flink1.12.7) 实践数据湖iceberg 第五课 hive catalog特点 实践数据湖iceberg 第六课 从kafka写入到iceberg失败问题 解决 实践数据湖iceberg 第七课 实时写入到iceberg 实践数据湖iceberg 第八课 hive与iceberg集成 实践数据湖iceberg 第九课 合并小文件 实践数据湖iceberg 第十课 快照删除 实践数据湖iceberg 第十一课 测试分区表完整流程(造数、建表、合并、删快照) 实践数据湖iceberg 第十二课 catalog是什么 实践数据湖iceberg 第十三课 metadata比数据文件大很多倍的问题 实践数据湖iceberg 第十四课 元数据合并(解决元数据随时间增加而元数据膨胀的问题) 实践数据湖iceberg 第十五课 spark安装与集成iceberg(jersey包冲突) 实践数据湖iceberg 第十六课 通过spark3打开iceberg的认知之门 实践数据湖iceberg 第十七课 hadoop2.7,spark3 on yarn运行iceberg配置 实践数据湖iceberg 第十八课 多种客户端与iceberg交互启动命令(常用命令) 实践数据湖iceberg 第十九课 flink count iceberg,无结果问题 实践数据湖iceberg 第二十课 flink + iceberg CDC场景(版本问题,测试失败) 实践数据湖iceberg 第二十一课 flink1.13.5 + iceberg0.131 CDC(测试成功INSERT,变更操作失败) 实践数据湖iceberg 第二十二课 flink1.13.5 + iceberg0.131 CDC(CRUD测试成功) 实践数据湖iceberg 第二十三课 flink-sql从checkpoint重启 实践数据湖iceberg 第二十四课 iceberg元数据详细解析 实践数据湖iceberg 第二十五课 后台运行flink sql 增删改的效果 实践数据湖iceberg 第二十六课 checkpoint设置方法 实践数据湖iceberg 第二十七课 flink cdc 测试程序故障重启:能从上次checkpoint点继续工作 实践数据湖iceberg 第二十八课 把公有仓库上不存在的包部署到本地仓库 实践数据湖iceberg 第二十九课 如何优雅高效获取flink的jobId 实践数据湖iceberg 第三十课 mysql->iceberg,不同客户端有时区问题 实践数据湖iceberg 更多的内容目录


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录 系列文章目录前言1. 准备iceberg与hive结合的包2. hive中测试iceberg2.1. 在hive中建表2.2 直接读取flink-sql生成的iceberg表,报错2.3 为什么2.1直接在hive上建立的icerberg能直接查?从flinksql创建的icerberg报错?2.4 解决 总结


前言 本节学习目标: 测试从hive入口,进行iceberg格式的表CRUD

提示:以下是本篇文章正文内容,下面案例可供参考

1. 准备iceberg与hive结合的包

把 iceberg-hive-runtime-0.12.1.jar 放到hive的lib下。 下载 iceberg-hive-runtime-0.12.1.jar 包,放到HIVE_HOME/lib下。 下载地址 link.

2. hive中测试iceberg 2.1. 在hive中建表

代码如下(示例):

创建表脚本:

CREATE TABLE table_a ( id bigint, name string ) PARTITIONED BY ( dept string ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

运行看看

hive (iceberg_db)> CREATE TABLE table_a ( > id bigint, name string > ) PARTITIONED BY ( > dept string > ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'; OK Time taken: 0.253 seconds hive (iceberg_db)> insert into table_a values(1,'dev'); FAILED: SemanticException [Error 10044]: Line 1:12 Cannot insert into target table because column number/types are different 'table_a': Table insclause-0 has 3 columns, but query has 2 columns. hive (iceberg_db)> insert into table_a values(1,'tom','dev'); WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. Query ID = root_20220120165250_f1a960bc-af68-4b80-b722-211d5fb282ed Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks is set to 0 since there's no reduce operator Starting Job = job_1642579431487_0015, Tracking URL = http://hadoop101:8088/proxy/application_1642579431487_0015/ Kill Command = /opt/module/hadoop/bin/hadoop job -kill job_1642579431487_0015 Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0 2022-01-20 16:52:56,533 Stage-3 map = 0%, reduce = 0% 2022-01-20 16:53:02,790 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 4.22 sec MapReduce Total cumulative CPU time: 4 seconds 220 msec Ended Job = job_1642579431487_0015 MapReduce Jobs Launched: Stage-Stage-3: Map: 1 Cumulative CPU: 4.22 sec HDFS Read: 114527 HDFS Write: 3531 SUCCESS Total MapReduce CPU Time Spent: 4 seconds 220 msec OK _col0 _col1 _col2 Time taken: 13.967 seconds hive (iceberg_db)> select * from table_a; OK table_a.id table_a.name table_a.dept 1 tom dev Time taken: 0.143 seconds, Fetched: 1 row(s) 2.2 直接读取flink-sql生成的iceberg表,报错

hive中读由flink-sql 生成的iceberg表,报错

hive (iceberg_db6)> select count(*) from behavior_log_ib6; WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. Query ID = root_20220120170353_9d501412-4843-4c1d-9b53-d4c31b91daa5 Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> java.io.IOException: java.util.concurrent.ExecutionException: java.io.IOException: Cannot create an instance of InputFormat class org.apache.hadoop.mapred.FileInputFormat as specified in mapredWork! at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:519) at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:328) at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320) at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290) at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287) at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:575) at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:570) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:570) at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:561) at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:414) at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:151) at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:199) at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:100) at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:2183) at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1839) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1526) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1237) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1227) at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233) at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184) at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:403) at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:821) at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:759) at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:686) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.util.RunJar.run(RunJar.java:221) at org.apache.hadoop.util.RunJar.main(RunJar.java:136) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Cannot create an instance of InputFormat class org.apache.hadoop.mapred.FileInputFormat as specified in mapredWork! at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getNonCombinablePathIndices(CombineHiveInputFormat.java:476) at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:508) ... 37 more Caused by: java.io.IOException: Cannot create an instance of InputFormat class org.apache.hadoop.mapred.FileInputFormat as specified in mapredWork! at org.apache.hadoop.hive.ql.io.HiveInputFormat.getInputFormatFromCache(HiveInputFormat.java:330) at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat$CheckNonCombinablePathCallable.call(CombineHiveInputFormat.java:107) at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat$CheckNonCombinablePathCallable.call(CombineHiveInputFormat.java:83) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: java.lang.InstantiationException at org.apache.hive.common.util.ReflectionUtil.newInstance(ReflectionUtil.java:85) at org.apache.hadoop.hive.ql.io.HiveInputFormat.getInputFormatFromCache(HiveInputFormat.java:322) ... 6 more Caused by: java.lang.InstantiationException at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.hive.common.util.ReflectionUtil.newInstance(ReflectionUtil.java:83) ... 7 more Job Submission failed with exception 'java.io.IOException(java.util.concurrent.ExecutionException: java.io.IOException: Cannot create an instance of InputFormat class org.apache.hadoop.mapred.FileInputFormat as specified in mapredWork!)' FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. java.util.concurrent.ExecutionException: java.io.IOException: Cannot create an instance of InputFormat class org.apache.hadoop.mapred.FileInputFormat as specified in mapredWork!

灵魂拷问: 为什么2.1直接在hive上建立的icerberg能直接查?从flinksql创建的icerberg报错?

2.3 为什么2.1直接在hive上建立的icerberg能直接查?从flinksql创建的icerberg报错?

尝试了把/opt/software/iceberg-flink-runtime-0.12.1.jar 放到hive/lib 下 没有用 头疼,问题先挂起来

2.4 解决 hive (default)> add jar /opt/module/hive/lib/iceberg-hive-runtime-0.13.0.jar ; Added [/opt/module/hive/lib/iceberg-hive-runtime-0.13.0.jar] to class path hive (iceberg_db)> select * from table_a; OK table_a.id table_a.name table_a.dept 1 tom dev Time taken: 0.669 seconds, Fetched: 1 row(s)

count:

hive (iceberg_db)> select count(*) from table_a; WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases. Query ID = root_20220424103028_0f95ac5b-74a5-4201-a418-438b20942e9e Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1647389655116_0048, Tracking URL = http://hadoop101:8088/proxy/application_1647389655116_0048/ Kill Command = /opt/module/hadoop/bin/hadoop job -kill job_1647389655116_0048 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 2022-04-24 10:30:34,390 Stage-1 map = 0%, reduce = 0% 2022-04-24 10:30:40,571 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 3.13 sec 2022-04-24 10:30:45,718 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 4.67 sec MapReduce Total cumulative CPU time: 4 seconds 670 msec Ended Job = job_1647389655116_0048 MapReduce Jobs Launched: Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 4.67 sec HDFS Read: 201410 HDFS Write: 101 SUCCESS Total MapReduce CPU Time Spent: 4 seconds 670 msec OK _c0 1 Time taken: 17.836 seconds, Fetched: 1 row(s)

table_a结构:

hive (iceberg_db)> show create table table_a; OK createtab_stmt CREATE TABLE `table_a`( `id` bigint COMMENT 'from deserializer', `name` string COMMENT 'from deserializer', `dept` string COMMENT 'from deserializer') ROW FORMAT SERDE 'org.apache.iceberg.mr.hive.HiveIcebergSerDe' STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://ns/user/hive/warehouse/iceberg_db.db/table_a'
总结

心情很糟。。。

待续。。。

搞定!(之前卡住,还有别的原因,非iceberg表也count不来,经常抛oom,但就是查几条数据,set mapreduce.task.io.sort.mb=10把内存调小就OK, 测试环境机器内存小)


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #hive #iceberg #第一课实践数据湖iceberg #第二课 #第三课