# CDC数据抽取示例
本示例主要演示从Mysql orderinfo表中实时读取数据输出COMPATIBLE_DEBEZIUM_JSON格式的数据,通过JsonPath组件解析json数据提取需要的字段(比如除了源表字段,还需额外添加operation_type(操作类型)、change_time(变更时间))、使用SQL函数TO_DATE(string, formatString)转换日期时间、FROM_UNIXTIME(unixtime, formatString,timeZone)转换时间戳,使用DefineSinkType组件定义接收字段的存储类型(对于开启自动建表时有效),最后将结果数据同步到目标表。
主要步骤如下:
# 环境准备说明
1、源表和目标表都必须定义主键;
2、目标表不存在时,模型运行时会自动创建表,表字段和存储类型与DefineSinkType组件中定义的一致;
3、目标表存在时,要确保源表与目标表的数据结构、字段类型一致。同时源表开启了 cdc 配置。
# 准备数据
创建表 orderinfo 作为源表,并给源表orderinfoINSERT 一些数据。
-- ---------------------------------------
-- Table structure for orderinfo
-- ---------------------------------------
DROP TABLE IF EXISTS `orderinfo`;
CREATE TABLE `orderinfo` (
`ordercode` varchar(32) NOT NULL,
`amount` int DEFAULT NULL,
`price` varchar(32) DEFAULT NULL,
`manufacturer` varchar(32) DEFAULT NULL,
`createtime` timestamp NULL DEFAULT NULL,
`createuser` varchar(32) DEFAULT NULL,
`updatetime` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`ordercode`)
);
-- ----------------------------
-- Records of orderinfo
-- ----------------------------
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('001', 2000, '5900', '天威科技11', '2024-12-13 00:00:00', '魏雨', '2025-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('002', 1800, '9000', '天威科技22', '2018-12-13 00:00:00', '魏雨', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('003', 1800, '5900', '天威科技', '2018-12-13 00:00:00', '魏雨', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('004', 1800, '5900', '天威科技', '2018-12-13 00:00:00', '魏雨1', '2018-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('005', 2000, '5000', '微云科技', '2018-12-13 00:00:00', '王林', '2019-12-01 00:20:11');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('006', 3000, '666', '西安金属', '2018-12-13 00:00:00', '王强', '2022-12-13 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('007', 2300, '11589', '星峰来了', '2018-12-13 00:00:00', '王强', '2025-01-01 00:00:00');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('008', 5100, '12909', '星云制造1', '2018-12-13 00:00:00', '魏雨', '2026-01-01 12:24:23');
INSERT INTO orderinfo
(ordercode, amount, price, manufacturer, createtime, createuser, updatetime)
VALUES('009', 1800, '5900', '天威科技66', '2018-12-12 16:00:00', '魏雨', '2018-12-12 16:00:00');
# 新建同步作业
点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:CDC-transform-JdbcSink。
# 拖拽图元
依次拖拽数据源中的CDC组件、转换中的SQL组件、JsonPath组件、SQL组件、DefineSinkType组件、目标中的关系型数据库组件,依次连线。如下图所示:

# 配置组件属性
1、双击"CDC"组件,根据下图所示步骤依次配置。

debezium参数说明:
"debezium": {
"key.converter.schemas.enable": false, //控制消息键(Key)中是否包含 Schema 信息
"value.converter.schemas.enable": false //控制消息值(Value)中是否包含 Schema 信息
}
2、双击"SQL"组件,根据下图所示步骤依次配置。

3、双击"JsonPath"组件,根据下图所示步骤依次配置。

通过JsonPath解析上游value字段,提取属性,上游vaule字段的数据结构如下:
{
"before": {
"ordercode": "002",
"amount": 1800,
"price": "9000",
"manufacturer": "天威科技qq",
"createtime": "2018-12-12T16:00:00Z",
"createuser": "魏雨",
"updatetime": "2018-12-12T16:00:00Z"
},
"after": {
"ordercode": "002",
"amount": 1800,
"price": "9000",
"manufacturer": "天威科技22",
"createtime": "2018-12-12T16:00:00Z",
"createuser": "魏雨",
"updatetime": "2018-12-12T16:00:00Z"
},
"source": {
"version": "1.9.8.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1774943439000,
"snapshot": "false",
"db": "cdc_test",
"sequence": null,
"table": "orderinfo",
"server_id": 1,
"gtid": null,
"file": "binlog.000063",
"pos": 64097,
"row": 0,
"thread": 288455,
"query": null
},
"op": "u",
"ts_ms": 1774943439315,
"transaction": null
}
字段含义:
before: 表示更新前的行数据。
after: 表示更新后的行数据。
source: 表示这个变更事件的源(数据库)信息。
version: Debezium连接器的版本
connector: 连接器类型
name: 连接器的逻辑名称,这里是"mysql_binlog_source",对应配置中的database.server.name
ts_ms: 数据库提交该变更的时间戳(毫秒)
snapshot: 是否快照,这里为"false",表示这不是在初始快照阶段
db: 数据库名,这里是"cdc_test"
sequence: 序列号
table: 表名,这里是"orderinfo"
server_id: MySQL服务器的ID
gtid: MySQL的全局事务标识符
file: binlog文件名,这里是"binlog.000063"
pos: 事件在binlog中的位置(偏移量),
row: 事件在事务中的行号(从0开始)
thread: 执行该事务的线程ID
query: 导致变更的SQL语句(默认Debezium不捕获SQL)
op: 操作类型,这里是"u",表示更新(UPDATE)。
其他可能值:
"c": 创建(INSERT)
"u": 更新(UPDATE)
"d": 删除(DELETE)
"r": 读取(快照读取)
ts_ms: Debezium处理该事件的时间戳(毫秒)
注意:这个时间可能略晚于数据库提交时间(source.ts_ms),因为网络传输和Debezium处理需要时间。
transaction: 事务信息。
实际应用中的注意事项
- 时间戳处理:
优先使用 source.ts_ms作为业务变更时间
ts_ms是客户端捕获到此次变更的时间戳,source.ts_ms是源数据实际发生变更的时间戳 - null 值处理:
删除操作时 after为 null
插入操作时 before为 null
4、双击"SQL"组件,根据下图所示步骤依次配置。

5、双击"DefineSinkType"组件,根据下图所示步骤依次配置。

6、双击"关系型数据库"组件,根据下图所示步骤依次配置。

上游CDC Source选择数据输出格式为:compatible_debezium_json时,源表名默认为default
7、Ctrl+S保存该模型。
# 运行
点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

# 查看数据
通过图中操作查看数据同步情况
源表数据

目标表数据

# 提交版本
当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。
提交后的版本,可以在作业调度中进行"定时"调度配置。
