irpas技术客

flink从kafka读取数据并传到mysql数据库_雨落★蝶舞_flink kafka mysql

irpas 4717

记一次flink踩坑教训

上代码

public class FlinkKafkaConsumer1 { public static void main(String[] args) throws Exception{ //1.获取环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.创建消费者 Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"需要连接的hadoopIP:9092"); // properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first", new SimpleStringSchema(), properties); //3.消费者与对应的flink流关联 DataStreamSource<String> dataStream = env.addSource(kafkaConsumer); SingleOutputStreamOperator<Tuple2<String, Integer>> dataSource = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] file = value.split(","); for ( String word : file ) { out.collect(new Tuple2<>(word, 1)); } } }) .keyBy(0) .sum(1); //连接数据库 dataSource.addSink(new MyJDBCSink()); dataSource.print(); //4.执行操作 env.execute(); } private static class MyJDBCSink extends RichSinkFunction<Tuple2<String,Integer>> { //定义sql连接、预编译器 Connection conn = null; PreparedStatement insertStmt = null; PreparedStatement updateStmt = null; //初始化 、 创建连接 、 和 预编译语句 @Override public void open(Configuration parameters) throws Exception { conn = DriverManager.getConnection("jdbc:mysql://IP:端口号/数据库名","数据库的账号","数据库的密码"); insertStmt = conn.prepareStatement("INSERT INTO test_yuluo (word, number) VALUES (?, ?)"); updateStmt = conn.prepareStatement("UPDATE test_yuluo SET word = ? WHERE number = ? "); } // 执行更新语句,注意不要留 super @Override public void invoke(Tuple2<String, Integer> value, Context context) throws Exception { updateStmt.setString(1, value.f0); updateStmt.setInt(2,value.f1); updateStmt.execute(); // 如果刚才 update 语句没有更新,那么插入 if ( updateStmt.getUpdateCount() == 0 ) { insertStmt.setString(1, value.f0); insertStmt.setInt(2,value.f1); insertStmt.execute(); } } @Override public void close() throws Exception { insertStmt.close(); updateStmt.close(); conn.close(); } } }

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,“需要连接的hadoopIP:9092”); conn = DriverManager.getConnection(“jdbc:mysql://IP:端口号/数据库名”,“数据库的账号”,“数据库的密码”); 注意:如上位置应该替换成自己项目的 我是连接的服务器,项目在本地 ,需要在服务器中先在kafka的bin目录中启动, ./kafka-console-producer.sh --broker-list 需要连接的hadoopIP:9092 --topic 项目中kafka的topic 然后启动本地的项目就报错了,错误显示如下 java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset 然后我看了本地的hadoop版本,下载了一个hadoop包在本地,配置了环境变量和path,重新启动idea,重新启动项目,还是报这个错误,(本人电脑是Windows10 专业版),抱着试一试的态度,重启电脑,重启服务和项目,嗯 这个错误消失了,就感觉很离谱,报了另一个错误,如下

原因是下载的Hadoop中没有winutils.exe 文件,又找了这个文件放在hadoop的bin目录下,重新启动,嗯,项目已经跑通了 虽然是个小的demo,但是一个很小的bug 也导致自己耽搁了不少时间,仅此记录一下. 我把我用的版本的压缩包上传到网盘了,需要的自己取吧,里面的bin目录是3.1.0版本的,自己本地配置的就是这一套,省的你们自己在找了,直接下载解压配置环境变量就可以 环境变量配置就不说了,不会的就自己百度,环境变量把配置完之后记得把bin目录底下的 这两个文件扔到 C:\Windows\System32 目录底下即可,希望大家都可以少走弯路少踩坑.

hadoop3.1.3压缩包


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #Kafka #MySQL