irpas技术客

FlinkCDC的2.2.0版本怎么监控库中的所有表,增加新表到已有任务?_薛定谔的猫不吃猫粮

网络投稿 2117

FlinkCDC的2.2.0版本怎么监控库中的所有表,增加新表到已有任务? 一、监控全表

? 千呼万唤始出来,之前预告FlinkCDC的2.2.0支持Flink1.14和添加新表,满怀希望!今天一看略显失望,添加新表,不支持动态添加,需要修改tableList之后,从ck中重启,倒是不用重新写新代码了,但是不满足我们目前的需求,失望之一。

二是,api改得有点随意了。

2.0版本监控全表,tableList不设置就行了

DebeziumSourceFunction<String> mySQLSource = MySqlSource.<String>builder() .hostname(parameterTool.get("source1.mysql.jdbc.host")) .port(parameterTool.getInt("source1.mysql.jdbc.port")) .username(parameterTool.get("source1.mysql.jdbc.username")) .password(parameterTool.get("source1.mysql.jdbc.password")) .databaseList(parameterTool.get("source1.mysql.jdbc.database")) //可选配置,如果不指定该参数,则会读取上一个配置下的所有表数据 //指定的时候需要使用db.table的方式明确指定 //.tableList("reported2.epidemic_report_entty") .startupOptions(StartupOptions.latest()) .deserializer(new MyDeseriallizationFun()) .build();

到了2.2版本源码中MySqlSourceConfig类对tableList做了校验,不能为null

this.tableList = checkNotNull(tableList);

对于如何监控所有表,文档中也没有说明。

通过尝试发现,传空串是监控全表,这你。。。行吧,api改得有点随意了,也不考虑版本兼容。。。

正确得写法是

MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname(parameterTool.get("source1.mysql.jdbc.host")) .port(parameterTool.getInt("source1.mysql.jdbc.port")) .databaseList(parameterTool.get("source1.mysql.jdbc.database")) // .scanNewlyAddedTableEnabled(true) .connectTimeout(Duration.ofSeconds(60)) //必选配置,空串的时候为监控全表 //指定的时候需要使用db.table的方式明确指定 // .tableList("") .tableList("") .username(parameterTool.get("source1.mysql.jdbc.username")) .password(parameterTool.get("source1.mysql.jdbc.password")) .startupOptions(StartupOptions.latest()) .deserializer(new MyDeseriallizationFun()) .build(); 二、增加新表到已有的任务

另外想要加新的表,到已有的任务中,需要设置scanNewlyAddedTableEnabled(true)

1.设置savepoint

$ ./bin/flink stop $Existing_Flink_JOB_ID Suspending job "cca7bc1061d61cf15238e92312c2fc20" with a savepoint. Savepoint completed. Path: file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab

2.修改tablelist,增加表

MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("yourHostname") .port(yourPort) .scanNewlyAddedTableEnabled(true) .databaseList("db") .tableList("db.product, db.user, db.address, db.order, db.custom") // set captured tables [product, user, address ,order, custom] .username("yourUsername") .password("yourPassword") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .build(); // your business code

3.从savepoint启动

$ ./bin/flink run \ --detached \ --fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \ ./FlinkCDCExample.jar


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #增加新表到已有任务 #二是api改得有点随意了