网站首页 > 技术教程 正文
下载地址
https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
生成简易模板
-- 查看配置模板
python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
-- 保存模板
python datax.py -r {YOUR_READER} -w {YOUR_WRITER} > YOUR_WRITER.JSON
启动 DataX
cd {YOUR_DATAX_DIR_BIN}
python datax.py ./stream2stream.json
DataX 传参
在JSON配置文件中使用${param}引用参数,在提交任务时使用 **-p "-Dparam=value"** 传入参数值;
-- 执行任务
python bin/datax.py ./job/base_province_2.json -p "-Ddt=2020-06-14"
-- JSON 文件
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://server15:3306/gmall2022"
],
"querySql":[
"select id,name from base_province where id>=3"
]
}
],
"password":"root",
"username":"root"
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"column":[
{
"name":"id",
"type":"bigint"
},
{
"name":"name",
"type":"string"
}
],
"compress":"gzip",
"defaultFS":"hdfs://server16:8020",
"fieldDelimiter":"\t",
"fileName":"base_province",
"fileType":"text",
"path":"/base_province/${dt}",
"writeMode":"append"
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
优化
参数 | 说明 |
job.setting.speed.channel | 总并发数 |
job.setting.speed.record | 总record限速 |
job.setting.speed.byte | 总byte限速 |
core.transport.channel.speed.record | 单个channel的record限速,默认值为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channle的byte限速,总record限速/单个channel的record限速)
示例 :
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576 //单个channel byte限速1M/s
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880 //总byte限速5M/s
}
},
...
}
}
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:
一种是直接更改datax.py脚本;
另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
平台集成
1. 使用Datax API进行集成
Datax提供了丰富的API接口,可以通过调用这些接口实现与云平台的集成。例如,可以使用POST方法调用Datax的startJob接口,启动一个数据同步任务。
import com.aliyun.datax.client.api.DataxClient;
import com.aliyun.datax.client.config.JobConfig;
import com.aliyun.datax.client.exception.DataXException;
public class DataxRunner {
public static void main(String[] args) throws Exception {
// 创建Datax客户端对象
DataxClient client = new DataxClient("http://your_datax_host:your_datax_port");
// 创建作业配置对象
JobConfig jobConfig = new JobConfig();
jobConfig.setProjectName("your_project_name");
jobConfig.setJobName("your_job_name");
// 启动作业
client.startJob(jobConfig);
}
}
2.使用Datax SDK进行集成
import com.aliyun.datax.client.dataxclient.api.IDataxClient;
import com.aliyun.datax.client.dataxclient.entity.JobRequest;
import com.aliyun.datax.client.dataxclient.entity.JobResponse;
import com.aliyun.datax.client.dataxclient.impl.DefaultDataxClient;
public class DataxRunner {
public static void main(String[] args) throws Exception {
// 创建Datax客户端对象
IDataxClient client = new DefaultDataxClient("http://your_datax_host:your_datax_port");
// 创建作业请求对象
JobRequest jobRequest = new JobRequest();
jobRequest.setProjectName("your_project_name");
jobRequest.setJobName("your_job_name");
// 启动作业
JobResponse jobResponse = client.startJob(jobRequest);
System.out.println("Job status: " + jobResponse.getStatus());
System.out.println("Job id: " + jobResponse.getId());
}
}
DataX-WEB 安装部署
1.下载
-- 源码
https://github.com/WeiYe-Jing/datax-web
-- 部署文档
https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/datax-web-deploy.md
2.执行一键安装脚本
进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行
./bin/install.sh
3.数据库初始化(MySQL与 DataX-WEB不在同一台服务器上)
1. 登录 msyql 建库
mysql> create database dataxweb;
2. 初始化脚本
mysql> {INSTALL_PATH}/bin/db/datax-web.sql
3. 修改 datax-web 连接 MySQL 配置
vim {INSTALL_PATH}/modules/datax-admin/conf/bootstrap.properties
#Database
#DB_HOST=
#DB_PORT=
#DB_USERNAME=
#DB_PASSWORD=
#DB_DATABASE=
- 启动服务
一键启动所有服务
./bin/start-all.sh
单一地启动某一模块服务:
./bin/start.sh -m {module_name}
一键取消所有服务
./bin/stop-all.sh
单一地停止某一模块服务:
./bin/stop.sh -m {module_name}
- 查看服务
在Linux环境下使用JPS命令,查看是否出现 DataXAdminApplication 和 DataXExecutorApplication 进程,如果存在这表示项目运行成功
如果项目启动失败,请检查启动日志:modules/datax-admin/bin/console.out 或者 modules/datax-executor/bin/console.out
- 登录
在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口)
http://ip:9527/index.html
输入用户名 admin 密码 123456 就可以直接访问系统
- 配置
1. 在项目目录: /modules/datax-admin/bin/env.properties
配置邮件服务
MAIL_USERNAME=""
MAIL_PASSWORD=""
访问端口
SERVER_PORT=9527
2. 在项目目录下/modules/datax-execute/bin/env.properties
指定 PYTHON_PATH 的路径(datax.py 绝对路径)
PYTHON_PATH=/opt/datax/bin/datax.py
datax json 文件存放位置
JSON_PATH=${BIN}/../json
保持和datax-admin端口一致;默认是9527,如果没改datax-admin的端口,可以忽略
DATAX_ADMIN_PORT=
- 运行日志
部署完成之后,在modules/对应的项目/data/applogs下(用户也可以自己指定日志,修改application.yml 中的logpath地址即可),用户可以根据此日志跟踪项目实际启动情况
如果执行器启动比admin快,执行器会连接失败,日志报"拒绝连接"的错误,一般是先启动admin,再启动executor,30秒之后会重连,如果成功请忽略这个异常。
Shell 脚本
#!/bin/bash
# 设置需要同步的哪一天的数据
# 一般情况下,当天产生的数据,会在第二天的凌晨增量导入到数据仓库中
dt=`date -d yesterday +"%Y-%m-%d"`
# 提取时间分量
year=${dt:0:4}
month=${dt:5:2}
# 设置 Datax的路径
DATAX_HOME=/opt/datax
# 设置jobs的路径
JOBS HOME=/opt/datax/jobs
# 增量导入数据
python3 $DATAX_HOME/bin/datax.py -p"-Ddate=$dt" $JOBS_HOME/incr-xxx.json
# 增量导入订单数据
# 1.检查Hive表中指定的分区是否存在,如果不存在,需要手动创建分区
if [ ! hive -e "show partitions datax_shop.orders partition(year='$year', month='$month')" ]; then
hive -e "alter table datax shop.orders add partition(year='$vear', month='$month')"
fi
# 2.执行增量入订单数据
python3 DATAX_HOME/bin/datax,py -p "-Ddate=Sdt -Dyear=Syear -Dmonth=Smonth" SJOBS-HOME/incr-orders.json
mysqlreader 插件任务切分
问题
1. DataX读取的字段比job中column的字段个数多或者少,或者一行数据转换为多行数据
问题描述:
DataX是将reader读取到的数据,默认是CSV格式,分隔符是\t,单行数据分割符是\n\r,所以当读取到的字段中包含\t时,会将一列数据转换为两列,包含\n\r时,会将一行数据转换为两行。导致writer时,报Reason: actual column number is more than schema column number.actual number: 21异常
处理方式:
1、单独处理制表符
REPLACE(columnName, ' ', '')
2、处理字段中包含制表符、回车符和换行符
REPLACE(REPLACE(REPLACE(columnName, chr(10),' '), chr(13), ''), chr(9),' ')
在 job.content.reader.parameter.connection.querySql 中通过 select SQL 语句替换;
猜你喜欢
- 2024-10-20 【0基础学爬虫】爬虫基础之代理的基本使用
- 2024-10-20 体验IntelliJ IDEA的远程开发(Remote Development)
- 2024-10-20 Mac 上使用 Windows,Parallels Desktop 套装满减优惠
- 2024-10-20 探索X窗口系统 窗口探测工具
- 2024-10-20 快速掌握Linux基础,走好万里长征第一步
- 2024-10-20 网络协议之:haproxy的Proxy Protocol代理协议
- 2024-10-20 linux定时器编程详解(包含代码) 定时器 linux
- 2024-10-20 微软2月Win11更新阻止软件/注册表方式修改默认网络浏览器
- 2024-10-20 Go 每日一库之 rpcx github每日一题
- 2024-10-20 MacOS访达增强工具-TotalFinder mac访达是什么
你 发表评论:
欢迎- 01-09单因素方差分析+作图
- 01-09描述性统计分析 之 均值分析
- 01-0986:重复性和再现性分析GRR(2)-GRR均值极差分析法和方差分析法
- 01-09SPC如何做方差分析,意义又在哪里?
- 01-09MedSPSS小课堂——多因素方差分析
- 01-09MedSPSS小课堂——双因素方差分析
- 01-09SPSS单因素方差分析的操作步骤及结果解读,陈老师SPSS数据分析
- 01-0914单因素方差分析:One-Way ANOVA
- 最近发表
- 标签列表
-
- sd分区 (65)
- raid5数据恢复 (81)
- 地址转换 (73)
- 手机存储卡根目录 (55)
- tcp端口 (74)
- project server (59)
- 双击ctrl (55)
- 鼠标 单击变双击 (67)
- debugview (59)
- 字符动画 (65)
- flushdns (57)
- ps复制快捷键 (57)
- 清除系统垃圾代码 (58)
- web服务器的架设 (67)
- 16进制转换 (69)
- xclient (55)
- ps源文件 (67)
- filezilla server (59)
- 句柄无效 (56)
- word页眉页脚设置 (59)
- ansys实例 (56)
- 6 1 3固件 (59)
- sqlserver2000挂起 (59)
- vm虚拟主机 (55)
- config (61)
本文暂时没有评论,来添加一个吧(●'◡'●)