English | 简体中文 |
本项目基于 Flink CDC Source 连接器 的 SQL 客户端 JAR。
本项目的 JAR 包中未包含 Flink CDC Source 连接器,因此您需要手动下载使用的 Flink CDC SQL JAR。请注意,本项目要求 Flink CDC 为 3.2.0 或更高版本。
如果您使用 Flink Oracle CDC 作为源端,您还需要下载源连接器的依赖项,请参阅 Oracle CDC 连接器 的 依赖项 章节。
将 CLI JAR flink-connector-oceanbase-cli-xxx.jar
和依赖 JAR flink-sql-connector-mysql-cdc-xxx.jar
添加到 $FLINK_HOME/lib
。
然后在 MySQL 数据库中准备表和数据。
USE test_db;
CREATE TABLE products
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
(default, "car battery", "12V car battery", 8.1),
(default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
(default, "hammer", "12oz carpenter's hammer", 0.75),
(default, "hammer", "14oz carpenter's hammer", 0.875),
(default, "hammer", "16oz carpenter's hammer", 1.0),
(default, "rocks", "box of assorted rocks", 5.3),
(default, "jacket", "water resistent black wind breaker", 0.1),
(default, "spare tire", "24 inch spare tire", 22.2);
CREATE TABLE customers
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT = 1001;
INSERT INTO customers
VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"),
(default, "George", "Bailey", "gbailey@foobar.com"),
(default, "Edward", "Walker", "ed@walker.com"),
(default, "Anne", "Kretchmar", "annek@noanswer.org");
将以下命令替换为您的真实数据库信息,并执行它以提交 Flink 作业。
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
lib/flink-connector-oceanbase-cli-xxx.jar \
--source-type mysql-cdc \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx \
--database test_db \
--including-tables ".*"
请将以上的数据库信息替换为您真实的数据库信息,当出现类似于以下的信息时,任务构建成功并提交。
检查目标 OceanBase 数据库,你应该找到这两个表和多行数据。
你可以继续将测试数据插入到 MySQL 数据库,由于是CDC任务,MySQL中插入数据后,即可在 OceanBase 中查询验证同步过来的数据。
参数 | 是否必需 | 类型 | 默认值 | 说明 |
---|---|---|---|---|
--source-type | 是 | 枚举值 | 源端类型,可以是 mysql-cdc 。 |
|
--source-conf | 是 | 多值参数 | 指定类型的源端的配置参数。 | |
--sink-conf | 是 | 多值参数 | OceanBase 写连接器的配置参数。 | |
--job-name | 否 | String | ${source-type} Sync | Flink 任务名称。 |
--database | 否 | String | OceanBase 中目标 db 的名称,不设置时将使用源端的 db 名称。 | |
--table-prefix | 否 | String | OceanBase 中目标表名称的前缀。 | |
--table-suffix | 否 | String | OceanBase 中目标表名称的后缀。 | |
--including-tables | 否 | String | 源端表的白名单模式。 | |
--excluding-tables | 否 | String | 源端表的黑名单模式。 | |
--multi-to-one-origin | 否 | String | 将多个源表映射到一个目标表的源表名称模式,多个值以| 分隔。 |
|
--multi-to-one-target | 否 | String | --multi-to-one-origin 对应的目标表名,多个值之间以| 分隔,长度必须等于--multi-to-one-origin 。 |
|
--create-table-only | 否 | Boolean | false | 是否只同步库表结构。 |
--ignore-default-value | 否 | Boolean | false | 是否忽略默认值。 |
--ignore-incompatible | 否 | Boolean | false | 是否忽略不兼容的数据类型。 |