irpas技术客

DataX oracle同步mysql(全量和增量)_闪耀的瞬间_datax oracle到mysql

网络 793

本篇博客说说DataX如何进行全量和增量数据同步,虽然用演示oracle同步到mysql,但其他数据库之间的同步都差不多

1.DataX介绍

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 Github主页地址:https://github.com/alibaba/DataX DataX工具下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

DataX采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件

Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

下面是DataX3.0支持的插件,也就是能在这些数据源直接根据json脚本配置互相同步数据

类型数据源Reader(读)Writer(写)文档RDBMS 关系型数据库MySQL√√读 、写Oracle√√读 、写OceanBase√√读 、写SQLServer√√读 、写PostgreSQL√√读 、写DRDS√√读 、写达梦√√读 、写通用RDBMS(支持所有关系型数据库)√√读 、写阿里云数仓数据存储ODPS√√读 、写ADS√写OSS√√读 、写OCS√√读 、写NoSQL数据存储OTS√√读 、写Hbase0.94√√读 、写Hbase1.1√√读 、写MongoDB√√读 、写Hive√√读 、写无结构化数据存储TxtFile√√读 、写FTP√√读 、写HDFS√√读 、写Elasticsearch√写
2.DataX实战 2.1.DataX基础环境搭建

1.把下载好的 datax.tar.gz 上传到Linux

2.解压 tar -xzvf datax.tar.gz ,会有/datax 目录,进入cd datax 目录

3.先删除datax目录中的所有隐藏文件,否则执行脚本会失败

find ./ -name '._*' -print0 |xargs -0 rm -rf 4.执行测试脚本:./bin/datax.py job/job.json ,看到下图的效果,说明环境正常 2.2.DataX 全量同步数据,oracle 到 mysql

从上面的介绍中可以知道,datax是通过不同的插件去同步数据的,每个插件都有 reader 和 writer ,要从oracle同步数据到mysql,执行: ./bin/datax.py -r oraclereader -w mysqlwriter ,获取示例json配置,然后去修改里面的参数

#在job 目录中创建 vi oracle_to_mysql.json,这是改完后能同步的参数配置 { "job": { "content": [ { "reader": { "name": "oraclereader", "parameter": { "column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], "splitPk": "INVESTOR_ID", "where" : "INVESTOR_ID is not null", "connection": [ { "jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], "table": ["CXX.CUSTOMER"] } ], "password": "123456", "username": "admin" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "customer_no", "customer_name", "id_type", "id_no", "create_time" ], "connection": [ { "jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", "table": ["customer_datax"] } ], "username": "admin", "password": "123456", "preSql": [], "session": ["set session sql_mode='ANSI'"], "writeMode": "update" } } } ], "setting": { "speed": { "channel": "3" } } } }

执行全量同步:./bin/datax.py job/oracle_to_mysql.json ,可以看到有 1045条记录被同步到mysql了

2.3.DataX 增量同步数据,oracle 到 mysql

增量同步需要Linux的crontab定时任务配合,再通过shell脚本计算时间,并传递到json脚本的Where条件中,"where" : "CREATE_TIME > unix_to_oracle(${create_time}) and CREATE_TIME <= unix_to_oracle(${end_time})"

${create_time} 和 ${end_time}由shell脚本计算 vi /home/datax/servers/datax/job/oracle_to_mysql.json

{ "job": { "content": [ { "reader": { "name": "oraclereader", "parameter": { "column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], "splitPk": "INVESTOR_ID", "where" : "CREATE_TIME > unix_to_oracle(${create_time}) and CREATE_TIME <= unix_to_oracle(${end_time})", "connection": [ { "jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], "table": ["CXX.CUSTOMER"] } ], "password": "123456", "username": "admin" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "customer_no", "customer_name", "id_type", "id_no", "create_time" ], "connection": [ { "jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", "table": ["customer_datax"] } ], "username": "admin", "password": "123456", "preSql": [], "session": ["set session sql_mode='ANSI'"], "writeMode": "update" } } } ], "setting": { "speed": { "channel": "3" } } } }

vi /home/datax/servers/datax/job/increment_sync.sh

#!/bin/bash source /etc/profile # 截至时间设置为当前时间戳 end_time=$(date +%s) # 开始时间设置为300s前时间戳 create_time=$(($end_time - 300)) # 执行datax脚本,传入时间范围 /home/datax/servers/datax/bin/datax.py /home/datax/servers/datax/job/oracle_to_mysql.json -p "-Dcreate_time=$create_time -Dend_time=$end_time" &

并给increment_sync.sh赋可执行权限:chmod -R 777 increment_sync.sh

然后设置 crontab 定时任务,每5分钟执行一次,和上面脚本中的300s对应

crontab -e */5 * * * * /home/datax/servers/datax/job/increment_sync.sh >/dev/null 2>&1

tip:oralce是没有 unix_to_oracle 函数的,需要自行在oracle中创建

create or replace function unix_to_oracle(in_number NUMBER) return date is begin return(TO_DATE('19700101','yyyymmdd') + in_number/86400 +TO_NUMBER(SUBSTR(TZ_OFFSET(sessiontimezone),1,3))/24); end unix_to_oracle;

好了,此时就完成了增量同步

3.DataX同步流程: 1.第一次部署datax时,手动执行全量同步脚本,同步已有客户数据2.再进行增量同步,用Linux的crontab和脚本配合,能按时间进行增量同步3.oracle同步mysql时,有几种同步模式,建议"writeMode"设置为"update": 3.1.mysql的"writeMode"设置为"insert",在有重复数据记录时,不会同步,直接跳过,就算oracle中该条数据已经修改了,也不会同步3.2.mysql的"writeMode"设置为"replace",在有重复数据记录时,会先删除mysql中的记录,再把oracle中的记录新增进去3.3.mysql的"writeMode"设置为"update",在有重复数据记录时,会把oracle中的列,覆盖mysql中的列,未配置同步的列,不会覆盖 4.需要在oracle库创建uninx_to_date() 函数 4.增量同步方式优化

上面需要在oracle库创建一个uninx_to_date()函数,下面用shell脚本把uninx时间戳转为yyyy-MM-dd hh:mm:ss 类型,然后传到oracle_to_mysql.json配置中,就不用创建这个uninx_to_date()函数了

increment_sync.sh 脚本获取字符串类型时间:

#!/bin/bash source /etc/profile #当前时间戳 cur_time=$(date +%s) #结束时间 end_time="'$(date -d @$cur_time +"%Y-%m-%d %H:%M:%S")'" #开始时间,为当前时间的前300s create_time="'$(date -d @$(($cur_time-120)) +"%Y-%m-%d %H:%M:%S")'" # 执行datax脚本,传入时间范围 /home/datax/servers/datax/bin/datax.py /home/datax/servers/datax/job/oracle_to_mysql.json -p "-Dcreate_time=$create_time -Dend_time=$end_time" &

修改oracle_to_mysql.json的where参数,去掉uninx_to_date()函数

{ "job": { "content": [ { "reader": { "name": "oraclereader", "parameter": { "column": ["INVESTOR_ID","INVESTOR_NAME","ID_TYPE","ID_NO","CREATE_TIME"], "splitPk": "INVESTOR_ID", "where" : "CREATE_TIME >to_date('${create_time}','yyyy-mm-dd hh24:mi:ss') and CREATE_TIME <= to_date('${end_time}','yyyy-mm-dd hh24:mi:ss')", "connection": [ { "jdbcUrl": ["jdbc:oracle:thin:@172.17.112.177:1521:helowin"], "table": ["CXX.CUSTOMER"] } ], "password": "123456", "username": "admin" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [ "customer_no", "customer_name", "id_type", "id_no", "create_time" ], "connection": [ { "jdbcUrl": "jdbc:mysql://172.17.112.176:3306/customer_db?useUnicode=true&characterEncoding=UTF-8", "table": ["customer_datax"] } ], "username": "admin", "password": "123456", "preSql": [], "session": ["set session sql_mode='ANSI'"], "writeMode": "update" } } } ], "setting": { "speed": { "channel": "3" } } } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #dataX #oracle到mysql #sync #oracle #To #MySQL