阿里巴巴DataX安装同步使用
1、安装jdkmaven运行环境
下载安装压缩包后,解压到指定目录/software并加入环境变量/etc/profile

export JAVA_HOME=/software/jdk1.8.0_65
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

export PATH=$PATH:/software/apache-maven-3.6.1/bin

2、下载datax源码进行编译安装
下载源码
git clone https://github.com/alibaba/DataX.git

打包应用
mvn -U clean package assembly:assembly -Dmaven.test.skip=true
3、运行datax
编译成功后进入打包目录,编辑一个配置文件mysql_reader.json从mysql中读出数据并输出

[root@localhost bin]# pwd
/project/DataX/target/datax/datax/bin
[root@localhost bin]# ls
datax.py  dxprof.py  mysql_reader.json  perftrace.py
[root@localhost bin]#
mysql_reader.json文件如下

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "id",
                            "name",
							"create_date"
                        ],
                        "splitPk": "id",
                        # 如需实现增量 "where":" trans_date='2019-01-01' "
                        "connection": [
                            {
                                "table": [
                                    "user"
                                ],
                                "jdbcUrl": [
								    "jdbc:mysql://192.168.80.1:3306/test"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

运行程序
python datax.py mysql_reader.json

当数据量比较大的时候,根据mysql慢查询日志可发现是根据splitPk作为范围查找

2020-01-29 22:37:24.216 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2020-01-29 22:26:55
任务结束时刻                    : 2020-01-29 22:37:24
任务总计耗时                    :                628s
任务平均流量                    :          104.40KB/s
记录写入速度                    :           2124rec/s
读出记录总数                    :             1310720
读写失败总数                    :                   0
4、通过传参的方式批量同步处理表

python datax.py data-trans.json  -p "-DfromTable=credit -DtoTable=credit -DsplitPk=id"


[root@localhost bin]# more data-trans.json 
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": "sa",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "splitPk": "${splitPk}",
                        "connection": [
                            {
                                "table": [
                                    "${fromTable}"
                                ],
                                "jdbcUrl": [
                                    "jdbc:sqlserver://192.168.1.3;DatabaseName=credit;SelectMethod=cursor"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "123456",
                        "column": [
                            "*"
                        ],
                        "session": [
                                "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "select now()"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.1.220:3306/test?useUnicode=true&characterEncoding=utf-8",
                                "table": [
                                    "${toTable}"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
[root@localhost bin]# 
  • 注:如需增量同步,如datax任务的执行周期为天,则可增加where条件筛选源数据实现增量同步;

赞赏(Donation)
微信(Wechat Pay)

donation-wechatpay