flink-connector-oceanbase

English 简体中文

This is the Flink connector for OceanBase, which can be used to read data from and write data to OceanBase via JDBC driver. It supports both MySQL and Oracle compatibility modes.

Getting Started

You can get the release packages at Releases Page or Maven Central.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>

If you’d rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

You can also manually build it from the source code.

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

To use this connector through Flink SQL directly, you need to download the shaded jar file named flink-sql-connector-oceanbase-${project.version}.jar:

This project has built-in MySQL driver 8.0.28. For users of OceanBase EE who want to use OceanBase JDBC driver, they need to manually introduce the following dependencies:

Dependency Item Description
com.oceanbase:oceanbase-client:2.4.9 Used for connecting to OceanBase EE.

Demo

Source

Preparation

Create the source table ‘t_source’ under the ‘test’ database of the OceanBase MySQL mode and insert some data.

USE test;
CREATE TABLE `t_source` (
  `id` int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
);
INSERT INTO t_source VALUES (1, 'Tom', 99), (2, 'Jerry', 88), (3, 'Alice', 95);

Put the JAR files of dependencies to the ‘lib’ directory of Flink, and then create the source table with Flink SQL through the sql client.

CREATE TABLE t_source
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name' = 'test',
    'table-name' = 't_source',
    'username' = 'root@test#obcluster',
    'password' = 'pswd',
    'compatible-mode' = 'MySQL',
    'split-size' = '8192',
    'fetch-size' = '1024'
);

Query data from the source table.

SELECT * FROM t_source;

For users of OceanBase EE with Oracle mode, you must use the OceanBase JDBC driver (MySQL driver is not supported for Oracle mode), and specify the url and compatible-mode.

CREATE TABLE t_source
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
    'driver-class-name' = 'com.oceanbase.jdbc.Driver',
    'schema-name' = 'SYS',
    'table-name' = 'T_SOURCE',
    'compatible-mode' = 'Oracle',
    'username' = 'SYS@test#obcluster',
    'password' = 'pswd',
    'split-size' = '8192',
    'fetch-size' = '1024'
);

Sink

Preparation

Create the destination table ‘t_sink’ under the ‘test’ database of the OceanBase MySQL mode.

USE test;
CREATE TABLE `t_sink` (
  `id` int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

Put the JAR files of dependencies to the ‘lib’ directory of Flink, and then create the destination table with Flink SQL through the sql client.

CREATE TABLE t_sink
(
  id       INT,
  username VARCHAR,
  score    INT,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'oceanbase',
  'url' = 'jdbc:mysql://127.0.0.1:2881/test',
  'schema-name' = 'test',
  'table-name' = 't_sink',
  'username' = 'root@test#obcluster',
  'password' = 'pswd',
  'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
  'buffer-flush.interval' = '1s',
  'buffer-flush.buffer-size' = '5000',
  'max-retries' = '3');

Insert records by Flink SQL.

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

Once executed, the records should have been written to OceanBase.

For users of OceanBase EE, you need to specify the url and driver-class-name corresponding to the OceanBase JDBC driver.

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase',
    'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
    'driver-class-name' = 'com.oceanbase.jdbc.Driver',
    'schema-name' = 'SYS',
    'table-name' = 'T_SINK',
    'username' = 'SYS@test#obcluster',
    'password' = 'pswd',
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
    );

Configuration

Common Options

Option Required by Table API Required by DataStream Default Type Description
url Yes Yes String JDBC url.
username Yes Yes String The username.
password Yes Yes String The password.
schema-name Yes Yes String The schema name or database name.
table-name Yes Yes String The table name.
compatible-mode No No MySQL String The compatible mode of OceanBase, can be 'MySQL' or 'Oracle'.
driver-class-name No No com.mysql.cj.jdbc.Driver String The driver class name, use 'com.mysql.cj.jdbc.Driver' by default. If other value is set, you need to introduce the driver manually.
druid-properties No No String Druid connection pool properties, multiple values are separated by semicolons.
table.oracle-tenant-case-insensitive No No true Boolean By default, under the Oracle tenant, schema names and column names are case-insensitive.

Source Options

Option Required by Table API Required by DataStream Default Type Description
split-size No No 8192 Integer The number of rows per split for parallel reading.
chunk-key-column No No String The column used for splitting chunks. In MySQL mode, defaults to the primary key column. In Oracle mode, defaults to ROWID. If not specified, the table will be split automatically based on the default strategy. The specified column must be a non-null column. NULL values in the chunk key column will be silently excluded from the read results.
fetch-size No No 1024 Integer The number of rows fetched per round trip for JDBC queries.

Sink Options

Option Required by Table API Required by DataStream Default Type Description
sync-write No No false Boolean Whether to write data synchronously, will not use buffer if it's set to 'true'.
buffer-flush.interval No No 1s Duration Buffer flush interval. Set '0' to disable scheduled flushing.
buffer-flush.buffer-size No No 1000 Integer Buffer size.
max-retries No No 3 Integer Max retry times on failure.
memstore-check.enabled No No true Boolean Whether enable memstore check.
memstore-check.threshold No No 0.9 Double Memstore usage threshold ratio relative to the limit value.
memstore-check.interval No No 30s Duration Memstore check interval.
partition.enabled No No false Boolean Whether to enable partition calculation and flush records by partitions. Only works when 'sync-write' and 'direct-load.enabled' are 'false'.

References

https://issues.apache.org/jira/browse/FLINK-25569

https://github.com/apache/flink-connector-jdbc

https://github.com/oceanbase/obconnector-j