flink-connector-oceanbase

Using Flink CDC as Source

Dependencies

This project is based on the SQL Client JAR of the Flink CDC source connector.

The Flink CDC source connector is not bundled in the JAR package of this project, so you need to manually download the Flink CDC SQL JAR that you plan to use. Note that this project requires Flink CDC 3.2.0 or later.

If you are using Flink Oracle CDC as the source, you also need to download the dependencies of that source connector; see the Dependencies chapter of the Oracle CDC Connector documentation.
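For example, a minimal sketch of downloading the MySQL CDC SQL JAR from Maven Central; the version 3.2.0 and the exact repository path are assumptions, so check Maven Central for the release you actually use:

wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.0/flink-sql-connector-mysql-cdc-3.2.0.jar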

Preparation

Add the CLI JAR flink-connector-oceanbase-cli-xxx.jar and the dependency JAR flink-sql-connector-mysql-cdc-xxx.jar to $FLINK_HOME/lib.
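For example, assuming both JARs have been downloaded to the current directory (the file names here are placeholders; substitute the real version numbers):

cp flink-connector-oceanbase-cli-xxx.jar flink-sql-connector-mysql-cdc-xxx.jar $FLINK_HOME/lib/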

Then prepare the tables and data in the MySQL database.

USE test_db;

CREATE TABLE products
(
  id          INTEGER      NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name        VARCHAR(255) NOT NULL DEFAULT 'flink',
  description VARCHAR(512),
  weight      FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default, "scooter", "Small 2-wheel scooter", 3.14),
       (default, "car battery", "12V car battery", 8.1),
       (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", 0.8),
       (default, "hammer", "12oz carpenter's hammer", 0.75),
       (default, "hammer", "14oz carpenter's hammer", 0.875),
       (default, "hammer", "16oz carpenter's hammer", 1.0),
       (default, "rocks", "box of assorted rocks", 5.3),
       (default, "jacket", "water resistent black wind breaker", 0.1),
       (default, "spare tire", "24 inch spare tire", 22.2);

CREATE TABLE customers
(
  id         INTEGER      NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name  VARCHAR(255) NOT NULL,
  email      VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT = 1001;

INSERT INTO customers
VALUES (default, "Sally", "Thomas", "sally.thomas@acme.com"),
       (default, "George", "Bailey", "gbailey@foobar.com"),
       (default, "Edward", "Walker", "ed@walker.com"),
       (default, "Anne", "Kretchmar", "annek@noanswer.org");

Submit Job via CLI

Replace the placeholders in the following command with your real database information, then execute it to submit a Flink job.

$FLINK_HOME/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    lib/flink-connector-oceanbase-cli-xxx.jar \
    --source-type mysql-cdc \
    --source-conf hostname=xxxx \
    --source-conf port=3306 \
    --source-conf username=root \
    --source-conf password=xxxx \
    --source-conf database-name=test_db \
    --source-conf table-name=.* \
    --sink-conf username=xxxx \
    --sink-conf password=xxxx \
    --sink-conf url=jdbc:mysql://xxxx:xxxx \
    --database test_db \
    --including-tables ".*"
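After submission, you can confirm that the job is running with the standard Flink CLI; by default the job name will be "mysql-cdc Sync" (see the --job-name option below):

$FLINK_HOME/bin/flink list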

Check and Verify

Check the target OceanBase database; you should find the two tables and their row data there.

You can continue to insert test data into the MySQL database. Since this is a CDC task, once the data is inserted into MySQL, you can query and verify the synchronized data in OceanBase.
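For example, a quick check, assuming the tables above were synchronized and you connect to OceanBase with a MySQL-compatible client:

-- Insert a new row into the source table in MySQL
INSERT INTO test_db.products
VALUES (default, "scooter", "Big 2-wheel scooter", 5.18);

-- After a short delay, query the synchronized table in OceanBase
SELECT * FROM test_db.products ORDER BY id;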

Options

| Option | Required | Type | Default | Description |
|--------|----------|------|---------|-------------|
| --source-type | Yes | Enumeration value | | Source type, can be mysql-cdc. |
| --source-conf | Yes | Multi-value parameter | | Configurations of the specific source. |
| --sink-conf | Yes | Multi-value parameter | | Configurations of the OceanBase sink. |
| --job-name | No | String | ${source-type} Sync | The Flink job name. |
| --database | No | String | | The target database in OceanBase; if not set, the source database name is used. |
| --table-prefix | No | String | | The target table name prefix in OceanBase. |
| --table-suffix | No | String | | The target table name suffix in OceanBase. |
| --including-tables | No | String | | Table whitelist pattern. |
| --excluding-tables | No | String | | Table blacklist pattern. |
| --multi-to-one-origin | No | String | | The source table name patterns that map multiple source tables to one target table; multiple values are separated by \|. |
| --multi-to-one-target | No | String | | The target table names corresponding to --multi-to-one-origin; multiple values are separated by \|, and the number of values should equal that of --multi-to-one-origin. |
| --create-table-only | No | Boolean | false | Whether to only synchronize the table structure. |
| --ignore-default-value | No | Boolean | false | Whether to ignore default values. |
| --ignore-incompatible | No | Boolean | false | Whether to ignore incompatible data types. |
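For example, a sketch of combining the table-mapping options; the pattern order_.* and the target name orders are hypothetical and only illustrate how multiple sharded source tables could be merged into one OceanBase table:

$FLINK_HOME/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    lib/flink-connector-oceanbase-cli-xxx.jar \
    --source-type mysql-cdc \
    --source-conf hostname=xxxx \
    --source-conf port=3306 \
    --source-conf username=root \
    --source-conf password=xxxx \
    --source-conf database-name=test_db \
    --source-conf table-name=.* \
    --sink-conf username=xxxx \
    --sink-conf password=xxxx \
    --sink-conf url=jdbc:mysql://xxxx:xxxx \
    --database test_db \
    --including-tables "order_.*|products" \
    --multi-to-one-origin "order_.*" \
    --multi-to-one-target "orders"

Under these assumed patterns, source tables matching order_.* would be written to a single target table named orders, while products would be synchronized to a table of the same name.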