This project is built on the SQL Client JAR of the Flink CDC Source Connector.
The JAR package of this project does not bundle the Flink CDC Source Connector, so you need to download the Flink CDC SQL JAR you use manually. Note that this project requires Flink CDC 3.2.0 or later.
If you use Flink Oracle CDC as the source, you also need to download the dependencies of the source connector; see the Dependencies chapter of Oracle CDC Connector.
Add the CLI JAR flink-connector-oceanbase-cli-xxx.jar and the dependency JAR flink-sql-connector-mysql-cdc-xxx.jar to $FLINK_HOME/lib.
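The copy step can be sketched as a small shell helper. This is only an illustration: `install_jars` is a hypothetical function name, not part of this project, and the JAR paths in the example call are placeholders whose `xxx` must be replaced with the real version numbers.

```shell
#!/bin/sh
# install_jars is a hypothetical helper, not something shipped with this project.
# It copies the given JAR files into a Flink lib directory and stops early
# if one of them does not exist.
install_jars() {
  lib_dir=$1
  shift
  for jar in "$@"; do
    if [ ! -f "$jar" ]; then
      echo "missing JAR: $jar" >&2
      return 1
    fi
    cp "$jar" "$lib_dir"/ || return 1
  done
  # List the directory so the result can be checked by eye.
  ls "$lib_dir"
}

# Example call (placeholder paths, keep the real version numbers):
# install_jars "$FLINK_HOME/lib" \
#   ./flink-connector-oceanbase-cli-xxx.jar \
#   ./flink-sql-connector-mysql-cdc-xxx.jar
```

If a Flink cluster is already running, it will most likely need a restart before it picks up JARs newly placed in its lib directory.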
Then prepare the tables and data in the MySQL database.
USE test_db;
CREATE TABLE products
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
(default, "car battery", "12V car battery", 8.1),
(default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
(default, "hammer", "12oz carpenter's hammer", 0.75),
(default, "hammer", "14oz carpenter's hammer", 0.875),
(default, "hammer", "16oz carpenter's hammer", 1.0),
(default, "rocks", "box of assorted rocks", 5.3),
(default, "jacket", "water resistent black wind breaker", 0.1),
(default, "spare tire", "24 inch spare tire", 22.2);
CREATE TABLE customers
(
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT = 1001;
INSERT INTO customers
VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"),
(default, "George", "Bailey", "gbailey@foobar.com"),
(default, "Edward", "Walker", "ed@walker.com"),
(default, "Anne", "Kretchmar", "annek@noanswer.org");
Replace the placeholder values in the following command with your actual database information, then run it to submit a Flink job.
$FLINK_HOME/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
$FLINK_HOME/lib/flink-connector-oceanbase-cli-xxx.jar \
--source-type mysql-cdc \
--source-conf hostname=xxxx \
--source-conf port=3306 \
--source-conf username=root \
--source-conf password=xxxx \
--source-conf database-name=test_db \
--source-conf table-name=.* \
--sink-conf username=xxxx \
--sink-conf password=xxxx \
--sink-conf url=jdbc:mysql://xxxx:xxxx \
--database test_db \
--including-tables ".*"
Check the target OceanBase database; you should find these two tables and their rows.
You can continue inserting test data into the MySQL database. Because this is a CDC task, data inserted into MySQL is synchronized to OceanBase, where you can query and verify it.
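A minimal sketch of such a check, assuming the `products` table created above: the two hypothetical helper functions below only print SQL, so nothing is executed until you pipe their output into a client. The inserted row is made-up sample data.

```shell
#!/bin/sh
# Both function names are hypothetical; they only emit SQL text.

# Statements to run against the MySQL source.
source_sql() {
  cat <<'SQL'
USE test_db;
INSERT INTO products VALUES (default, 'bicycle pump', 'hand pump for tires', 0.6);
SQL
}

# Statements to run against the OceanBase target shortly afterwards;
# the newest row should be the one inserted above.
verify_sql() {
  cat <<'SQL'
USE test_db;
SELECT id, name, description, weight FROM products ORDER BY id DESC LIMIT 1;
SQL
}
```

For example, `source_sql | mysql -h <mysql-host> -P 3306 -u root -p` applies the change on the source, and `verify_sql | mysql -h <oceanbase-host> -P <port> -u <user> -p` reads it back, assuming OceanBase is reachable over the MySQL protocol as the `jdbc:mysql://` sink URL suggests. The hosts, port, and user are placeholders. Allow at least one checkpoint interval (10s in the command above) before verifying.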
Option | Required | Type | Default | Description |
---|---|---|---|---|
--source-type | Yes | Enumeration value | | Source type, can be mysql-cdc. |
--source-conf | Yes | Multi-value parameter | | Configurations of the specific source. |
--sink-conf | Yes | Multi-value parameter | | Configurations of the OceanBase sink. |
--job-name | No | String | ${source-type} Sync | The Flink job name. |
--database | No | String | | The target database in OceanBase. If not set, the source database name is used. |
--table-prefix | No | String | | The target table name prefix in OceanBase. |
--table-suffix | No | String | | The target table name suffix in OceanBase. |
--including-tables | No | String | | Table whitelist pattern. |
--excluding-tables | No | String | | Table blacklist pattern. |
--multi-to-one-origin | No | String | | Source table name patterns that map multiple source tables to one target table. Multiple values are separated by \|. |
--multi-to-one-target | No | String | | Target table names corresponding to --multi-to-one-origin. Multiple values are separated by \|, and the number of values must equal that of --multi-to-one-origin. |
--create-table-only | No | Boolean | false | Whether to synchronize only the table structure. |
--ignore-default-value | No | Boolean | false | Whether to ignore default values. |
--ignore-incompatible | No | Boolean | false | Whether to ignore incompatible data types. |
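Because --multi-to-one-origin and --multi-to-one-target must carry the same number of `|`-separated values, a pre-flight check before submitting the job may help. The sketch below is an assumption-laden illustration: the function names and the table names (`order_.*`, `user_.*`, `orders`, `users`) are hypothetical, and it assumes values are split on every `|`, so a pattern that uses `|` for regex alternation would be miscounted.

```shell
#!/bin/sh
# Count the '|'-separated values in a string (hypothetical helper).
count_values() {
  printf '%s\n' "$1" | awk -F'|' '{ print NF }'
}

# Succeed only when both options carry the same number of values.
check_multi_to_one() {
  [ "$(count_values "$1")" -eq "$(count_values "$2")" ]
}

# Hypothetical sharded source tables merged into two target tables.
origin='order_.*|user_.*'
target='orders|users'

if check_multi_to_one "$origin" "$target"; then
  # Print the options with quotes so the shell does not treat '|' as a pipe.
  printf '%s\n' "--multi-to-one-origin \"$origin\" --multi-to-one-target \"$target\""
else
  echo "value counts differ" >&2
fi
```

The printed options can be appended to the `flink run` command shown earlier. Quoting both values matters, since an unquoted `|` would be interpreted by the shell rather than passed to the CLI.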