Canal监听MySQL 1、Mysql数据库开启binlog模式
注意:Mysql容器,此处Mysql版本为5.7
#进入容器 docker exec -it mysql /bin/bash #进入配置目录 cd /etc/mysql/mysql.conf.d #修改配置文件 vi mysqld.cnf(1) 修改mysqld.cnf配置文件,添加如下配置:
log-bin=mysql-bin server-id=12345(2) 创建账号 用于测试使用,使用root账号创建用户并授予权限
create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;(3) 重启mysql容器
docker restart mysql 2、Docker下Canal容器安装(1)拉取canal镜像
docker pull docker.io/canal/canal-server(2)创建Canal容器
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server(3)进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。
#进入容器 docker exec -it canal /bin/bash cd canal-server/conf/ #修改 canal.properties vi canal.properties cd example/ #修改 instance.properties vi instance.properties修改canal.properties的id,不能和mysql的server-id重复,如下图: 修改instance.properties,配置数据库连接地址:
这里的canal.instance.filter.regex有多种配置,如下:
可以参考地址如下: https://github.com/alibaba/canal/wiki/AdminGuide
mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 常见例子: 1. 所有表:.* or .*\\..* 2. canal schema下所有表: canal\\..* 3. canal下的以canal打头的表:canal\\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔) 注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)配置完成后,设置开机启动,并记得重启canal。
docker update --restart=always canal docker restart canal 3、Canal Client项目搭建(1)引入依赖
<parent> <artifactId>spring-boot-parent</artifactId> <groupId>org.springframework.boot</groupId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <!--canal依赖--> <dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies>注意:canal依赖stater在中央仓库是不存在的,需要手动放进本地仓库或者你公司里面的nexus
这里以放进本地仓库为例:
首先解压spring-boot-starter-canal-master.zip在spring-boot-starter-canal-master\starter-canal文件夹下执行mvn clean install此时在本地仓库就会存在jar包引入依赖 <!--canal依赖--> <dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>(2)启动类编写
@SpringBootApplication @EnableCanalClient public class CanalApplication { public static void main(String[] args) { SpringApplication.run(CanalApplication.class,args); } }(3)监听器编写
@CanalEventListener public class CanalDataEventListener { /*** * 增加数据监听 * @param eventType * @param rowData */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); } /*** * 修改数据监听 * @param rowData */ @UpdateListenPoint public void onEventUpdate(CanalEntry.RowData rowData) { System.out.println("UpdateListenPoint"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); } /*** * 删除数据监听 * @param eventType */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType) { System.out.println("DeleteListenPoint"); } /*** * 自定义数据修改监听 * @param eventType * @param rowData */ @ListenPoint(destination = "example", schema = "torlesse_test", table = {"tb_user", "tb_order"}, eventType = CanalEntry.EventType.UPDATE) public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.err.println("DeleteListenPoint"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); } @ListenPoint(destination = "example", schema = "test_canal", //所要监听的数据库名 table = {"tb_user"}, //所要监听的数据库表名 eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) public void onEventCustomUpdateForTbUser(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ getChangeValue(eventType,rowData); } public static void getChangeValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ if(eventType == CanalEntry.EventType.DELETE){ rowData.getBeforeColumnsList().forEach(column -> { //获取删除前的数据 System.out.println(column.getName() + " == " + column.getValue()); }); }else { rowData.getBeforeColumnsList().forEach(column -> { //打印改变前的字段名和值 System.out.println(column.getName() + " == " + column.getValue()); }); rowData.getAfterColumnsList().forEach(column -> { //打印改变后的字段名和值 System.out.println(column.getName() + " == " + column.getValue()); }); } } }到此就可以实现Canal监听Mysql
项目gitee地址:test-canal
详情可以查看:
https://github.com/alibaba/canal (阿里官方)https://github.com/alibaba/canal/wiki/AdminGuide (阿里官方)https://github.com/chenqian56131/spring-boot-starter-canal (自制starter)
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |
标签: #canal监听mysql #exec #it #MySQL #binbash进入配置目录cd #mysqldcnf1