irpas技术客

Flink 1.12: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory'


Configuring a Flink SQL table in Java that reads from Kafka.

For example:

```java
tableEnv.executeSql(
    "CREATE TABLE invalidCtp (\n" +
    "  sys_name STRING,\n" +
    "  broker_id STRING,\n" +
    "  investor_id STRING,\n" +
    "  row_rank BIGINT\n" +
    ") WITH (\n" +
    "  'connector' = 'kafka',\n" +
    "  'topic' = 'invalidCtpDetail',\n" +
    "  'properties.bootstrap.servers' = '47.104.234.54:9092',\n" +
    // "  'connector.startup-mode' = 'latest-offset',\n" +  // legacy option, replaced by scan.startup.mode
    "  'scan.startup.mode' = 'earliest-offset',\n" +
    // "  'kafka.auto.offset.reset' = 'latest',\n" +
    "  'format' = 'json'\n" +
    ")");
```

The job runs locally, but on the server it fails with: Flink 1.12 Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath

Solution:

Add the following dependency to pom.xml (you can also download the matching version from the site below):

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka_2.11/1.12.1

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
```
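The dependency above uses Maven property placeholders. For the artifact version linked above (Scala 2.11, Flink 1.12.1), the corresponding properties would look like this; the property names are conventional, so adjust them to whatever your pom already defines:

```xml
<properties>
    <scala.binary.version>2.11</scala.binary.version>
    <flink.version>1.12.1</flink.version>
</properties>
```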

Then place that connector jar (flink-sql-connector-kafka_${scala.binary.version}) into the lib directory of the Flink installation.
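For example, roughly like this ($FLINK_HOME stands in for your Flink installation directory, and the jar name matches the Maven coordinates above; both are assumptions about your environment):

```shell
# Copy the shaded SQL connector jar into Flink's lib directory,
# then restart the cluster so the new jar is picked up.
cp flink-sql-connector-kafka_2.11-1.12.1.jar $FLINK_HOME/lib/
$FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh
```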

After adding this jar, submitting a job may fail with a new error: java.lang.ClassCastException: LinkedMap cannot be cast to LinkedMap
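This confusing "X cannot be cast to X" error happens when the same class is loaded by two different classloaders (here, Flink's framework classloader and its child-first user-code classloader): the two Class objects have the same name but are not assignment-compatible. The sketch below demonstrates the general mechanism with a hypothetical Payload class and a deliberately non-delegating loader; none of these names come from Flink itself.

```java
import java.io.InputStream;
import java.lang.reflect.Constructor;

public class ClassLoaderDemo {

    // A loader that never delegates to the application classloader,
    // loosely analogous to Flink's child-first user-code classloader.
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader() {
            super(null); // only the bootstrap loader as parent
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            try (InputStream in = ClassLoaderDemo.class
                    .getResourceAsStream("/" + name.replace('.', '/') + ".class")) {
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (Exception e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Load the same class file through two independent loaders.
        Class<?> c1 = new IsolatingLoader().loadClass("Payload");
        Class<?> c2 = new IsolatingLoader().loadClass("Payload");
        System.out.println("same name:  " + c1.getName().equals(c2.getName())); // true
        System.out.println("same class: " + (c1 == c2));                        // false

        Constructor<?> ctor = c1.getDeclaredConstructor();
        ctor.setAccessible(true);
        Object obj = ctor.newInstance();
        try {
            c2.cast(obj); // an instance of one Payload is not an instance of the other
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, same shape as the Flink error");
        }
    }
}

// Placeholder class that the demo loads twice.
class Payload {
}
```

Making Flink resolve such classes through a single (parent) classloader, as the fix below does, removes the duplicate Class objects and with them the exception.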

Solution:

Add the following to conf/flink-conf.yaml and restart Flink:

```yaml
classloader.resolve-order: parent-first
```
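Switching the resolve order to parent-first affects all user-code classes. If you would rather keep the default child-first behavior and only force the conflicting classes through the parent classloader, Flink also offers a pattern list; the sketch below assumes the clash comes from commons-collections (the package that contains LinkedMap), which the error message suggests but does not prove:

```yaml
# conf/flink-conf.yaml - narrower alternative: keep child-first resolution,
# but always load the conflicting package from the parent classloader
classloader.parent-first-patterns.additional: org.apache.commons.collections
```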


