普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
FAQ
  • CDC增量数据同步示例
  • 准备数据
  • 新建通用模板作业
  • 配置来源和目标数据源、数据库名称
  • 配置表名映射
  • 同步配置
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# CDC增量数据同步示例

本示例主要介绍Seatunnel处理引擎下,使用CDC增量同步模板实现实时同步多表增量数据场景。支持的数据库、版本、驱动参见:通用模板作业-CDC增量同步

单表增量数据同步:将关系型数据源的一张表的增量数据(发生变化的数据)同步到另一个数据源。

多表增量数据同步:将关系型数据源的多张表的增量数据(发生变化的数据)同步到另一个数据源。

主要步骤如下:

  • 准备数据
  • 新建通用模板作业
  • 配置数据源/库
  • 配置表名映射
  • 同步配置
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# 准备数据

以 MySQL 数据库为例。

  1. 开启数据库cdc配置,可参考MySQL-CDC配置。

  2. 创建数据库:dws_source_dev,在数据库 dws_source_dev 中创建一个表 t_xueji。

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    -- ----------------------------
    -- Table structure for t_xueji
    -- ----------------------------
    DROP TABLE IF EXISTS `t_xueji`;
    CREATE TABLE `t_xueji` (
      `id` int(11)  NOT  NULL PRIMARY KEY,
      `xuehao` varchar(255) DEFAULT NULL,
      `banji` varchar(255) DEFAULT NULL,
      `xueji` varchar(255) DEFAULT NULL,
      `xingbie` int(11) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Records of t_xueji
    -- ----------------------------
    BEGIN;
    INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (1, '2023-0001', '1班', 'H', 1);
    INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (2, '2023-0002', '1班', 'H', 1);
    INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (3, '2023-0003', '1班', 'H', 0);
    INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (4, '2023-0004', '1班', 'H', 0);
    
    COMMIT;
    
    SET FOREIGN_KEY_CHECKS = 1;
    

# 新建通用模板作业

点击资源树节点上的【...】,选择弹出菜单【新建通用模板作业】,填写"模板名称",选择"模板类型",点击【确定】按钮。

image-20241129173135275

# 配置来源和目标数据源、数据库名称

选择来源和目标表的"数据源"、"库名称"。

image-20241129173305632

# 配置表名映射

  1. 点击【添加表】,在弹窗中选择将要同步的源表名称。

⚠️ 提示:单表同步时选择一张表,多表同步时选择多张表,整库迁移时选择全部表。

image-20241129173439263

  1. 默认来源表与目标表的表名称一样,用户可以通过给目标表名称加前后缀的方式修改"目标表名称"。(可选配置)

image-20241129173451661

# 同步配置

默认“表添加模式”为“表不存在时创建”;默认“数据添加模式”为“同步前删除数据”;“默认大小写转换模式”为“不转换”;可根据需要修改。

# 通用配置

在通用配置中可以配置任务优先级、Worker 分组、本地参数、失败重试、超时机制、部署方式。

# 保存草稿

如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)

可以参考示例关系型表数据同步示例 中的"保存草稿"说明。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

CDC增量同步模型,运行成功后不会停止,任务一直监听源表数据变化实时同步。如果需要停止任务,需要手动点击停止按钮。

image-20241129173507621

image-20241129173624899

当用户增加/修改/删除源表的一条数据时,目标表中的数据会同步增加/修改/删除。

⚠️ 提示:目标表不存在时会自动创建。

在源表中执行以下脚本,模拟数据新增、修改、删除。

INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (5, '2023-0005', '1班', 'H', 1);
INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (6, '2023-0006', '1班', 'X', 0);
INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (7, '2023-0007', '1班', 'X', 0);
INSERT INTO `t_xueji` (`id`, `xuehao`, `banji`, `xueji`, `xingbie`) VALUES (8, '2023-0008', '1班', 'X', 1);
update t_xueji set banji='3班'  where id =2;
delete from t_xueji where id=1;

查看目标表中数据,实时同步新增、修改、删除的数据。

image-20241129173847557

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

可以参考示例关系型表数据同步示例 中的"提交版本"说明。

← Seatunnel-StarRocks到MongoDB同步示例 Seatunnel-时间戳关系型数据库增量同步示例 →