普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
FAQ
  • Seatunnel-时间戳关系型数据库增量同步示例
  • 准备数据
  • 新建通用模板作业
  • 配置来源和目标数据源、数据库名称
  • 配置表名映射
  • 同步配置
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# Seatunnel-时间戳关系型数据库增量同步示例

本示例主要介绍 Seatunnel 处理引擎下,使用时间戳关系型数据库增量同步进行单表迁移、多表数据迁移、整库数据迁移的增量同步场景。

主要步骤如下:

  • 准备数据
  • 新建通用模板作业
  • 配置数据源/库
  • 配置表名映射
  • 同步配置
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# 准备数据

以 MySQL 数据库为来源数据库。创建数据库:dws_source_dev、dws_front_dev,在数据库 dws_source_dev 中创建一个表 orderinfo。

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for orderinfo
-- ----------------------------
drop table if EXISTS orderinfo;
create table orderinfo
(
  ordercode    VARCHAR(32) not null comment '订单号',
  amount       int(11) comment '数量',
  price        VARCHAR(32) comment '价格',
  manufacturer VARCHAR(32) comment '厂商',
  createtime   DATE comment '创建时间',
  createuser   VARCHAR(32) comment '创建人',
  updatetime   timestamp comment '更新时间'
) ENGINE=INNODB DEFAULT CHARSET=utf8;

alter table orderinfo add constraint ORDER_PRIMARYKEY primary key (ORDERCODE);

-- ----------------------------
-- Records of orderinfo
-- ----------------------------
BEGIN;
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime) 
values ('001', 1000, '5000', 'xingyun', '2018-12-13', 'zhangcong', '2018-12-13 00:00:00');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('002', 500, '1000', '西安金属', '2018-12-13', '王强', '2018-12-15 00:00:00');


COMMIT;
SET FOREIGN_KEY_CHECKS = 1;

# 新建通用模板作业

点击资源树节点上的【...】,选择弹出菜单【新建通用模板作业】,填写"模板名称",选择"模板类型",点击【确定】按钮。

image-20241129175520747

# 配置来源和目标数据源、数据库名称

选择来源和目标表的"数据源"、"库名称"。

image-20241129175646818

# 配置表名映射

  1. 点击【添加表】,在弹窗中选择将要同步的源表名称。

⚠️ 提示:单表同步时选择一张表,多表同步时选择多张表,整库迁移时选择全部表。

image-20241129175732854

  1. 默认来源表与目标表的表名称一样,用户可以通过给目标表名称加前后缀的方式修改"目标表名称"。(可选配置)

image-20241129175806668

  1. 配置字段映射。配置完成后,变为“字段映射 (已配置)”

image-20241129180027429

image-20241129180044529

image-20241129180107188

# 同步配置

默认“表添加模式”为“表不存在时创建”;默认“数据添加模式”为“同步前删除数据”;“默认大小写转换模式”为“不转换”;可根据需要修改。

# 通用配置

在通用配置中可以配置任务优先级、Worker 分组、本地参数、失败重试、超时机制、部署方式。

# 保存草稿

如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)

可以参考示例关系型表数据同步示例 中的"保存草稿"说明。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

  1. 上述模型中目标表已存在且配置了字段映射,但目标表无数据,故第一次运行时会同步所有数据。

当目标表不存在时,会自动创建目标表。

image-20241129180349457

  1. 当后续再运行作业时,只会同步大于目标表updatetime字段值的数据(字段映射页面配置的【目标表字段】)。

执行如下脚本,插入两条数据,更新一条数据的updatetime

insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('004', 20000, '75000', '星峰', '2018-12-13', '王林', '2018-12-13 00:00:00');
insert into orderinfo (ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
values ('005', 1300, '8000', '天威科技', '2018-12-13', '王林', '2019-12-01 00:00:00');

update orderinfo set updatetime='2020-01-01' where ordercode='001'

在源表执行上述脚本后,再次运行模型并查看数据:ordercode为004的数据未同步(upadtetime字段早于目标表updatetime最大时间)、ordercode为001的updatetime更新成功。

image-20241129181153226

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

可以参考示例关系型表数据同步示例 中的"提交版本"说明。

← Seatunnel-CDC增量同步示例 Seatunnel-时间戳关系型数据库到Hive增量同步示例 →