irpas技术客

Canal + RocketMQ 同步 MySQL 数据到 Redis 实战_银河小徐

irpas 962

原文地址:https://blog.xaoxu.cn/archives/canal-rocketmq-sync-mysql-data-to-redis

前言 缓存和数据库一致性问题

读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新,就容易出现缓存(Redis)和数据库(MySQL)间的数据一致性问题。因为写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题。

无论是 先删除Redis缓存,再写MySQL数据库,还是 先写MySQL数据库,再删除Redis缓存,都有可能出现数据不一致的情况:

先删除Redis缓存,再写MySQL数据库:如果删除了Redis缓存,还没有来得及写MySQL数据库,另一个线程就来读取,发现缓存为空,则去数据库中读取数据写入缓存,此时缓存中的数据就是脏数据。先写MySQL数据库,再删除Redis缓存:如果先写了MySQL数据库,在删除Redis缓存前,写缓存的线程宕机了,没有删除掉缓存,则也会出现数据不一致情况。 解决方案 方案一:采用延时双删策略

在写库前后都进行 redis.del(key) 操作,并且设定合理的超时时间。具体实现步骤如下:

先删除缓存再写数据库(在1步骤后,2步骤前,可能有 读 请求因缓存未命中,读到脏数据,再次将脏数据存入缓存)休眠500毫秒再次删除缓存(删除可能在1-2步之间被存入的脏数据) public void update(String key, Object data){ redis.delKey(key); db.updateData(data); Thread.sleep(500); redis.delKey(key); } 为什么要休眠一段时间呢?休眠多长时间合适呢?

至于休眠多长时间需要视自己的项目情况而定,考虑项目中读取数据的业务逻辑耗时,同时还要考虑Redis和数据库主从同步的耗时来确定自己的休眠时间。 目的:确保请求结束时,写请求可以删除读请求造成的缓存脏数据。

缺点

1、代码耦合性太高 2、白白增加接口请求耗时

方案二:异步更新缓存(基于订阅 MySQL Binlog的同步机制)

MySQL Binlog增量订阅消费+消息队列+增量数据更新到redis

读Redis:热数据基本都在Redis写MySQL:增删改都是操作MySQL更新Redis数据:MySQL的数据操作Binlog,然后更新到Redis

那么我们如何订阅Binlog呢?如何将数据发送到消息队列呢?如何消费呢?不要慌,继续往下看,带你进入实战环节!

实战

基于MySQL Binlog同步数据保障一致性的架构图大致如下:

Canal是什么?

我这里就不为大家再去啰嗦介绍一遍了,推荐大家阅读下Canal的简介,地址:https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B

Canal工作原理 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议mysql master收到dump请求,开始推送binary log给slave(也就是canal)canal解析binary log对象(原始为byte流)

当大家仔细阅读完官方的介绍之后,对Canal也是有了一个初步的了解,接下来我们就进入实战环节。

环境准备 MySQL配置

1、对于自建 MySQL,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。

2、授权 Canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES; SHOW GRANTS FOR 'canal' Canal的安装与配置 Canal Admin安装和配置

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

1、下载 canal-admin,访问 release 页面,选择需要的包下载,如以 1.1.4 版本为例:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

2、解压缩

mkdir /canal/canal-admin tar zxvf canal.admin-1.1.4.tar.gz -C /canal/canal-admin

解压完成后,进入 /canal/canal-admin 目录,可以看到如下结构:

drwxr-xr-x 7 xiaoxuxuy staff 224B Mar 15 16:36 bin drwxr-xr-x 9 xiaoxuxuy staff 288B Mar 13 20:27 conf drwxr-xr-x 90 xiaoxuxuy staff 2.8K Mar 13 20:24 lib drwxrwxrwx 5 xiaoxuxuy staff 160B Mar 17 10:07 logs

3、配置修改

vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin

4、初始化元数据库

mysql -h127.1 -uroot -p # 导入初始化SQL > source conf/canal_manager.sql 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql

5、启动

sh bin/startup.sh

启动成功后,可以通过 http://127.0.0.1:8089/ 访问即可看到 Canal 登录页面,默认密码:admin/123456

6、关闭

sh bin/stop.sh Canal Deployer安装和配置

1、下载 canal-deployer,访问 release 页面,选择需要的包下载,如以 1.1.4 版本为例:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2、解压缩

mkdir /canal/canal-deployer tar zxvf canal.deployer-1.1.4.tar.gz -C /canal/canal-deployer

解压完成后,进入 /canal/canal-deployer 目录,可以看到如下结构:

drwxr-xr-x 7 xiaoxuxuy staff 224B Mar 15 18:32 bin drwxr-xr-x 11 xiaoxuxuy staff 352B Mar 14 17:09 conf drwxr-xr-x 83 xiaoxuxuy staff 2.6K Mar 13 20:27 lib drwxrwxrwx 5 xiaoxuxuy staff 160B Mar 14 17:09 logs

3、修改 canal_local.properties 文件配置来覆盖 canal.properties 文件

官方解释:目前conf下会包含canal.properties/canal_local.properties两个文件,考虑历史版本兼容性,默认配置会以canal.properties为主。

# register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster =

4、启动

sh bin/startup.sh local

启动成功后,我们在 Canal Admin Web UI 中刷新 server 管理,可以看到 canal server 已经启动成功。

5、修改 Canal Server 配置文件,修改消息队列相关配置

我这里已 RocketMQ 为示例,RocketMQ 安装参考:RocketMQ QuickStart

启动 RocketMQ UI 管理页面,我这里用 Docker 启动了,根据自己喜欢的方式启动即可:

docker run --name rocketmq-ui -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.0.7:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8082:8080 -itd styletang/rocketmq-console-ng:1.0.0

然后我们回到 Canal Server 配置文件,我的内容如下:

################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 canal.zkServers = # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, RocketMQ canal.serverMode = RocketMQ # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = ################################################# ######### destinations ############# ################################################# canal.destinations = cms_article,cms_user # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## canal.mq.servers = 192.168.0.7:9876 canal.mq.retries = 3 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = canal.mq.producerGroup = cms # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

主要简单修改了以下几个位置:

# ... # tcp, kafka, RocketMQ canal.serverMode = RocketMQ # ... canal.destinations = cms_article,cms_user # ... canal.mq.servers = 192.168.0.7:9876 # ... canal.mq.flatMessage = true # ... canal.mq.producerGroup = cms # ...

修改完成后,记得点击保存!保存后会自动重启。

Canal Admin WebUI 中配置 Instance 管理

新建 Instance -> 填写 Instance 名称:cms_article -> 选择所属集群/主机 -> 载入模板 -> 修改配置信息如下:

################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=cms-manage.article # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=cms_article # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################

配置好之后,点击保存,此时在 Instances 管理中就可以看到新创建实例信息。

这里我对应着 Instance 创建了两张表进行演示。

DROP TABLE IF EXISTS `article`; CREATE TABLE `article` ( `id` int(11) NOT NULL AUTO_INCREMENT, `content` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4; DROP TABLE IF EXISTS `user`; CREATE TABLE `user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

代码实现 引入依赖 <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> 添加RocketMQ配置文件,Redis自行配置 # 配置rocketmq rocketmq: name-server: 192.168.0.7:9876 producer: group: cms #生产者 consumer: group: cms #消费者 新增Canal监听SQL类型枚举 /** * Canal监听SQL类型 * * @author xiaoxuxuy * @date 2022/3/13 11:19 下午 */ @SuppressWarnings("AlibabaEnumConstantsMustHaveComment") public enum SqlType { INSERT("INSERT", "插入"), UPDATE("UPDATE", "更新"), DELETE("DELETE", "删除"); private final String type; private final String name; SqlType(String type, String name) { this.type = type; this.name = name; } public String getType() { return this.type; } public String getName() { return this.name; } } 新增User Model对象、Article Model对象 import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.extension.activerecord.Model; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; /** * @author xiaoxuxuy */ @Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) @ApiModel(description = "用户Model", value = "用户Model") public class User extends Model<User> { private static final long serialVersionUID = 1L; @ApiModelProperty("主键") @TableId(value = "id", type = IdType.AUTO) private Long id; @ApiModelProperty("名字") private String name; } import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.extension.activerecord.Model; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; /** * @author xiaoxuxuy */ @Data @EqualsAndHashCode(callSuper = false) @Accessors(chain = true) @ApiModel(description = "文章Model", value = "文章Model") public class Article extends Model<Article> { private static final long serialVersionUID = 1L; @ApiModelProperty("主键") @TableId(value = "id", type = IdType.AUTO) private Long id; @ApiModelProperty("文章内容") private String content; } Canal同步服务 import com.alibaba.otter.canal.protocol.FlatMessage; import java.util.Collection; /** * Canal同步服务 * * @author xiaoxuxuy * @date 2022/3/13 11:58 下午 */ public interface CanalSyncService<T> { /** * 处理数据 * * @param flatMessage CanalMQ数据 */ void process(FlatMessage flatMessage); /** * DDL语句处理 * * @param flatMessage CanalMQ数据 */ void ddl(FlatMessage flatMessage); /** * 插入 * * @param list 新增数据 */ void insert(Collection<T> list); /** * 更新 * * @param list 更新数据 */ void update(Collection<T> list); /** * 删除 * * @param list 删除数据 */ void delete(Collection<T> list); } 抽象Canal-RocketMQ通用处理服务 import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.annotation.TableId; import com.scaffolding.example.canal.enums.SqlType; import com.scaffolding.example.canal.service.CanalSyncService; import com.scaffolding.example.exception.BusinessException; import com.scaffolding.example.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import com.alibaba.otter.canal.protocol.FlatMessage; import com.google.common.collect.Sets; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.*; /** * 抽象Canal-RocketMQ通用处理服务 * * @author xiaoxuxuy * @date 2022/3/13 7:07 下午 */ @Slf4j public abstract class AbstractCanalRocketMqRedisService<T> implements CanalSyncService<T> { @Resource private RedisTemplate redisTemplate; @Resource private RedisUtils redisUtils; private Class<T> classCache; /** * 获取Model名称 * * @return Model名称 */ protected abstract String getModelName(); /** * 处理数据 * * @param flatMessage CanalMQ数据 */ @Override public void process(FlatMessage flatMessage) { if (flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set<T> data = getData(flatMessage); if (SqlType.INSERT.getType().equals(flatMessage.getType())) { insert(data); } if (SqlType.UPDATE.getType().equals(flatMessage.getType())) { update(data); } if (SqlType.DELETE.getType().equals(flatMessage.getType())) { delete(data); } } /** * DDL语句处理 * * @param flatMessage CanalMQ数据 */ @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,删库清空,更新字段处理 } /** * 插入 * * @param list 新增数据 */ @Override public void insert(Collection<T> list) { insertOrUpdate(list); } /** * 更新 * * @param list 更新数据 */ @Override public void update(Collection<T> list) { insertOrUpdate(list); } /** * 删除 * * @param list 删除数据 */ @Override public void delete(Collection<T> list) { Set<String> keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } redisUtils.delAll(keys); } /** * 插入或者更新redis * * @param list 数据 */ @SuppressWarnings("unchecked") private void insertOrUpdate(Collection<T> list) { redisTemplate.executePipelined((RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); // 序列化key byte[] redisKey = redisTemplate.getKeySerializer().serialize(key); // 序列化value byte[] redisValue = redisTemplate.getValueSerializer().serialize(data); redisConnection.set(Objects.requireNonNull(redisKey), Objects.requireNonNull(redisValue)); } return null; }); } /** * 封装redis的key * * @param t 原对象 * @return key */ protected String getWrapRedisKey(T t) { return getModelName() + ":" + getIdValue(t); } /** * 获取类泛型 * * @return 泛型Class */ @SuppressWarnings("unchecked") protected Class<T> getTypeArgument() { if (classCache == null) { classCache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return classCache; } /** * 获取Object标有@TableId注解的字段值 * * @param t 对象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 获取Class标有@TableId注解的字段名称 * * @return id字段名称 */ protected Field getIdField() { Class<T> clz = getTypeArgument(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { TableId annotation = field.getAnnotation(TableId.class); if (annotation != null) { return field; } } log.error("PO类未设置@TableId注解"); throw new BusinessException("PO类未设置@TableId注解"); } /** * 转换Canal的FlatMessage中data成泛型对象 * * @param flatMessage Canal发送MQ信息 * @return 泛型对象集合 */ protected Set<T> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { T t = JSON.parseObject(JSON.toJSONString(map), getTypeArgument()); targetData.add(t); } return targetData; } } Canal消息的订阅代码

RocketMQ 是支持广播消费的,只需要在消费端进行配置即可,默认情况下使用的是集群消费,这就意味着如果我们配置了多个消费者实例,只会有一个实例消费消息。

对于更新 Redis 来说,一个实例消费消息,完成 Redis 的更新,这就够了。

import com.alibaba.otter.canal.protocol.FlatMessage; import com.scaffolding.example.canal.Model.Article; import com.scaffolding.example.canal.service.impl.AbstractCanalRocketMqRedisService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Service; /** * @author xiaoxuxuy * @date 2022/3/14 5:37 下午 */ @Slf4j @Service @RocketMQMessageListener(topic = "cms_article", consumerGroup = "article") public class TestArticleConsumer extends AbstractCanalRocketMqRedisService<Article> implements RocketMQListener<FlatMessage>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(FlatMessage flatMessage) { log.info("consumer message {}", flatMessage); try { process(flatMessage); } catch (Exception e) { log.warn(String.format("message [%s] 消费失败", flatMessage), e); throw new RuntimeException(e); } } @Getter private final String modelName = Article.class.getSimpleName(); @Override public void prepareStart(DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } } import com.alibaba.otter.canal.protocol.FlatMessage; import com.scaffolding.example.canal.Model.User; import com.scaffolding.example.canal.service.impl.AbstractCanalRocketMqRedisService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Service; /** * @author xiaoxuxuy * @date 2022/3/14 5:37 下午 */ @Slf4j @Service @RocketMQMessageListener(topic = "cms_user", consumerGroup = "user") public class TestUserConsumer extends AbstractCanalRocketMqRedisService<User> implements RocketMQListener<FlatMessage>, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(FlatMessage flatMessage) { log.info("consumer message {}", flatMessage); try { process(flatMessage); } catch (Exception e) { log.warn(String.format("message [%s] 消费失败", flatMessage), e); throw new RuntimeException(e); } } @Getter private final String modelName = User.class.getSimpleName(); @Override public void prepareStart(DefaultMQPushConsumer consumer) { // set consumer consume message from now consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis())); } } 测试

插入article表一条数据:

观察 Redis 存储内容如下:

看到 RocketMQ 数据成功消费存储到 Redis 中。

Enjoy life !!!🤩🤩🤩


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #canal #rocketmq #同步 #MySQL #数据到 #redis #实战