普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
运维指南
  • CDC数据抽取示例
  • 环境准备说明
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# CDC数据抽取示例

本示例主要演示从Mysql orderinfo表中实时读取数据输出COMPATIBLE_DEBEZIUM_JSON格式的数据,通过JsonPath组件解析json数据提取需要的字段(比如除了源表字段,还需额外添加operation_type(操作类型)、change_time(变更时间))、使用SQL函数TO_DATE(string, formatString)转换日期时间、FROM_UNIXTIME(unixtime, formatString,timeZone)转换时间戳,使用DefineSinkType组件定义接收字段的存储类型(对于开启自动建表时有效),最后将结果数据同步到目标表。

主要步骤如下:

  • CDC数据抽取示例
    • 环境准备说明
    • 准备数据
    • 新建同步作业
    • 拖拽图元
    • 配置组件属性
    • 运行
    • 查看数据
    • 提交版本

# 环境准备说明

提示:

1、源表和目标表都必须定义主键;
2、目标表不存在时,模型运行时会自动创建表,表字段和存储类型与DefineSinkType组件中定义的一致;
3、目标表存在时,要确保源表与目标表的数据结构、字段类型一致。同时源表开启了 cdc 配置。

# 准备数据

创建表 orderinfo 作为源表,并给源表orderinfoINSERT 一些数据。

-- ---------------------------------------
-- Table structure for orderinfo
-- ---------------------------------------
DROP TABLE IF EXISTS `orderinfo`;

CREATE TABLE `orderinfo` (
  `ordercode` varchar(32) NOT NULL,
  `amount` int DEFAULT NULL,
  `price` varchar(32) DEFAULT NULL,
  `manufacturer` varchar(32) DEFAULT NULL,
  `createtime` timestamp NULL DEFAULT NULL,
  `createuser` varchar(32) DEFAULT NULL,
  `updatetime` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`ordercode`)
);


-- ----------------------------
-- Records of orderinfo
-- ----------------------------

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('001', 2000, '5900', '天威科技11', '2024-12-13 00:00:00', '魏雨', '2025-12-13 00:00:00');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('002', 1800, '9000', '天威科技22', '2018-12-13 00:00:00', '魏雨', '2018-12-13 00:00:00');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('003', 1800, '5900', '天威科技', '2018-12-13 00:00:00', '魏雨', '2018-12-13 00:00:00');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('004', 1800, '5900', '天威科技', '2018-12-13 00:00:00', '魏雨1', '2018-12-13 00:00:00');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('005', 2000, '5000', '微云科技', '2018-12-13 00:00:00', '王林', '2019-12-01 00:20:11');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('006', 3000, '666', '西安金属', '2018-12-13 00:00:00', '王强', '2022-12-13 00:00:00');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('007', 2300, '11589', '星峰来了', '2018-12-13 00:00:00', '王强', '2025-01-01 00:00:00');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('008', 5100, '12909', '星云制造1', '2018-12-13 00:00:00', '魏雨', '2026-01-01 12:24:23');

INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('009', 1800, '5900', '天威科技66', '2018-12-12 16:00:00', '魏雨', '2018-12-12 16:00:00');

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:CDC-transform-JdbcSink。

# 拖拽图元

依次拖拽数据源中的CDC组件、转换中的SQL组件、JsonPath组件、SQL组件、DefineSinkType组件、目标中的关系型数据库组件,依次连线。如下图所示:

cdcsource

# 配置组件属性

1、双击"CDC"组件,根据下图所示步骤依次配置。

cdcsource

debezium参数说明:

"debezium": {
    "key.converter.schemas.enable": false, //控制消息键(Key)中是否包含 Schema 信息
    "value.converter.schemas.enable": false //控制消息值(Value)中是否包含 Schema 信息
}

2、双击"SQL"组件,根据下图所示步骤依次配置。

SQL

3、双击"JsonPath"组件,根据下图所示步骤依次配置。

JsonPath

通过JsonPath解析上游value字段,提取属性,上游vaule字段的数据结构如下:


{
    "before": {
        "ordercode": "002",
        "amount": 1800,
        "price": "9000",
        "manufacturer": "天威科技qq",
        "createtime": "2018-12-12T16:00:00Z",
        "createuser": "魏雨",
        "updatetime": "2018-12-12T16:00:00Z"
    },
    "after": {
        "ordercode": "002",
        "amount": 1800,
        "price": "9000",
        "manufacturer": "天威科技22",
        "createtime": "2018-12-12T16:00:00Z",
        "createuser": "魏雨",
        "updatetime": "2018-12-12T16:00:00Z"
    },
    "source": {
        "version": "1.9.8.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1774943439000,
        "snapshot": "false",
        "db": "cdc_test",
        "sequence": null,
        "table": "orderinfo",
        "server_id": 1,
        "gtid": null,
        "file": "binlog.000063",
        "pos": 64097,
        "row": 0,
        "thread": 288455,
        "query": null
    },
    "op": "u",
    "ts_ms": 1774943439315,
    "transaction": null
}

字段含义:


before: 表示更新前的行数据。

after: 表示更新后的行数据。

source: 表示这个变更事件的源(数据库)信息。

version: Debezium连接器的版本

connector: 连接器类型

name: 连接器的逻辑名称,这里是"mysql_binlog_source",对应配置中的database.server.name

ts_ms: 数据库提交该变更的时间戳(毫秒)

snapshot: 是否快照,这里为"false",表示这不是在初始快照阶段

db: 数据库名,这里是"cdc_test"

sequence: 序列号

table: 表名,这里是"orderinfo"

server_id: MySQL服务器的ID

gtid: MySQL的全局事务标识符

file: binlog文件名,这里是"binlog.000063"

pos: 事件在binlog中的位置(偏移量),

row: 事件在事务中的行号(从0开始)

thread: 执行该事务的线程ID

query: 导致变更的SQL语句(默认Debezium不捕获SQL)

op: 操作类型,这里是"u",表示更新(UPDATE)。
其他可能值:
"c": 创建(INSERT)
"u": 更新(UPDATE)
"d": 删除(DELETE)
"r": 读取(快照读取)

ts_ms: Debezium处理该事件的时间戳(毫秒)
注意:这个时间可能略晚于数据库提交时间(source.ts_ms),因为网络传输和Debezium处理需要时间。

transaction: 事务信息。

实际应用中的注意事项

  • 时间戳处理:
    优先使用 source.ts_ms作为业务变更时间
    ts_ms是客户端捕获到此次变更的时间戳,source.ts_ms是源数据实际发生变更的时间戳
  • null 值处理:
    删除操作时 after为 null
    插入操作时 before为 null

4、双击"SQL"组件,根据下图所示步骤依次配置。

SQL

5、双击"DefineSinkType"组件,根据下图所示步骤依次配置。

DefineSinkType

6、双击"关系型数据库"组件,根据下图所示步骤依次配置。

JdbcSink

上游CDC Source选择数据输出格式为:compatible_debezium_json时,源表名默认为default

7、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

run

# 查看数据

通过图中操作查看数据同步情况 源表数据 result

目标表数据 result

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

submit

← CDC同步关系型数据库示例 ClickHouse同步示例 →