# PDI-时间戳Hive到关系型数据增量同步示例
本示例主要介绍 Primeton DI 处理引擎下,使用基于时间戳的 Hive 到 MySQL 进行增量数据同步的场景。
主要步骤如下:
# 准备数据
创建 Hive 来源数据库 test2,在来源数据库 test2 中创建一个表 orderinfo_2。
CREATE TABLE `test2.orderinfo_2`(
`ordercode` string,
`amount` int,
`price` string,
`manufacturer2` string,
`createtime` timestamp,
`createuser` string,
`updatetime` timestamp)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'field.delim'=',',
'serialization.format'=',')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://node1.primeton.com:8020/user/hive/warehouse/test2.db/orderinfo_2'
TBLPROPERTIES (
'transient_lastDdlTime'='1705563868');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('001', 1000, '5000', '星云制造', '2023-10-25', '张无忌4', '2024-01-04 12:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('002', 50022, '1000', '西安金属', '2018-12-13', '张无忌4', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('003', 1500, '7000', '枫禾林木', '2018-12-13', '魏雨', '2024-01-11 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('004', 1000, '75000', '星峰', '2019-01-13', '王林', '2019-01-13 12:23:01');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('005', 1300, '8000', '天威科技', '2018-12-13', '王林', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('006', 2000, '5000', '微云科', '2023-10-25', '王林', '2023-10-25 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('007', 3000, '9001', '西安金属', '2024-12-13', '王强', '2020-12-13 11:11:11');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('008', 12300, '11589', '星峰', '2018-12-13', '王强', '2024-02-04 12:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('009', 5100, '22909', '星云制造', '2018-12-13', '魏雨', '2018-12-13 23:59:59');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer2, createtime, createuser, updatetime)
VALUES('010', 1800, '15900', '天威科技', '2018-12-13', '魏雨', '2018-12-13 00:00:00');
# 新建通用模板作业
点击资源树节点上的【...】,选择弹出菜单【新建通用模板作业】,填写"模板名称",选择"模板类型",点击【确定】按钮。
# 配置来源和目标数据源、数据库名称
选择来源和目标表的"数据源"、"库名称"。
# 配置表名映射
- 点击【添加表】,在弹窗中选择将要同步的源表名称。
⚠️ 提示:单表同步时选择一张表,多表同步时选择多张表,整库迁移时选择全部表。
- 配置字段映射。配置完成后,变为“字段映射 (已配置)”
# 同步配置
默认“表添加模式”为“表不存在时创建”;默认“数据添加模式”为“追加数据”;“默认大小写转换模式”为“不转换”;“忽略错误继续执行”为不勾选;可根据需要修改。
# 通用配置
在通用配置中可以配置任务优先级、Worker 分组、命名参数、本地参数、超时告警。
可以参考示例关系型表数据同步示例 中的"通用配置"说明。
# 指标日志
在指标日志中可以设置是否启用指标日志,当开启后,可以在【作业调度-【任务实例】中使用“查看数据量”统计数据量。
# 保存草稿
如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)
可以参考示例关系型表数据同步示例 中的"保存草稿"说明。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。
# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。
提交后的版本,可以在作业调度中进行"定时"调度配置。
可以参考示例关系型表数据同步示例 中的"提交版本"说明。