flink-connector-oceanbase


This is the Flink connector for OceanBase, which sinks data to OceanBase through a JDBC driver.

Getting Started

Release packages are available from the Releases page or Maven Central.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>

If you’d rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

You can also manually build it from the source code.

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

To use this connector directly through Flink SQL, download the shaded JAR file named flink-sql-connector-oceanbase-${project.version}.jar.

This project bundles MySQL driver 8.0.28. Users of OceanBase EE who want to use the OceanBase JDBC driver need to add the following dependency manually:

| Dependency Item | Description |
| --- | --- |
| com.oceanbase:oceanbase-client:2.4.9 | Used for connecting to OceanBase EE. |
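
Because a manually added driver only helps if it is actually on the classpath, a quick check can save a confusing startup failure. The following is a minimal sketch, not part of the connector: the class name `DriverCheck` and the method `isDriverAvailable` are hypothetical, and only the two driver class names come from this document.

```java
public class DriverCheck {

    // Returns true if the named class can be located on the classpath.
    // Hypothetical helper for illustration; the connector does not ship it.
    static boolean isDriverAvailable(String driverClassName) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(driverClassName, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Driver class names as they appear in this document.
        String[] drivers = {"com.mysql.cj.jdbc.Driver", "com.oceanbase.jdbc.Driver"};
        for (String driver : drivers) {
            System.out.println(driver + " available: " + isDriverAvailable(driver));
        }
    }
}
```

Run it with the same classpath as the Flink job (for example, the JAR files in Flink's lib directory) so the result reflects what the job will see.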

Demo

Preparation

Create the destination table ‘t_sink’ in the ‘test’ database of OceanBase in MySQL mode.

USE test;
CREATE TABLE `t_sink` (
  `id` int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

Put the dependency JAR files in the ‘lib’ directory of Flink, then create the destination table with Flink SQL through the SQL client.

CREATE TABLE t_sink
(
  id       INT,
  username VARCHAR,
  score    INT,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'oceanbase',
  'url' = 'jdbc:mysql://127.0.0.1:2881/test',
  'schema-name' = 'test',
  'table-name' = 't_sink',
  'username' = 'root@test#obcluster',
  'password' = 'pswd',
  'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
  'buffer-flush.interval' = '1s',
  'buffer-flush.buffer-size' = '5000',
  'max-retries' = '3');

Insert records by Flink SQL.

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

Once executed, the records should be written to OceanBase.
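
The demo inserts two rows with the same primary key (id 1). Assuming the sink upserts by primary key, which is the usual behavior for Flink sinks declared with a PRIMARY KEY but is not stated in this document, the later row would replace the earlier one. The sketch below only models that assumption with an in-memory map; `UpsertSketch` and `applyUpserts` are hypothetical names and nothing here calls the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UpsertSketch {

    // Models "last write wins per primary key": each row is {id, username, score}.
    static Map<Integer, String> applyUpserts(Object[][] rows) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Object[] row : rows) {
            int id = (Integer) row[0];
            // put() overwrites any earlier row with the same id.
            table.put(id, row[1] + "," + row[2]);
        }
        return table;
    }

    public static void main(String[] args) {
        // The same three rows as the INSERT statement in the demo.
        Object[][] rows = {
            {1, "Tom", 99},
            {2, "Jerry", 88},
            {1, "Tom", 89}
        };
        System.out.println(applyUpserts(rows));
    }
}
```

Under that assumption, querying t_sink in OceanBase would show two rows, with score 89 for id 1. If the table shows something different, check the write semantics of the connector version in use.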

Users of OceanBase EE need to specify the url and driver-class-name corresponding to the OceanBase JDBC driver.

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase',
    'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
    'driver-class-name' = 'com.oceanbase.jdbc.Driver',
    'schema-name' = 'SYS',
    'table-name' = 'T_SINK',
    'username' = 'SYS@test#obcluster',
    'password' = 'pswd',
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
    );

Configuration

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
| --- | --- | --- | --- | --- | --- |
| url | Yes | Yes | | String | JDBC url. |
| username | Yes | Yes | | String | The username. |
| password | Yes | Yes | | String | The password. |
| schema-name | Yes | Not supported | | String | The schema name or database name. |
| table-name | Yes | Not supported | | String | The table name. |
| driver-class-name | No | No | com.mysql.cj.jdbc.Driver | String | The driver class name; 'com.mysql.cj.jdbc.Driver' is used by default. If another value is set, you need to introduce the driver manually. |
| druid-properties | No | No | | String | Druid connection pool properties; multiple values are separated by semicolons. |
| sync-write | No | No | false | Boolean | Whether to write data synchronously. The buffer is not used if this is set to 'true'. |
| buffer-flush.interval | No | No | 1s | Duration | Buffer flush interval. Set '0' to disable scheduled flushing. |
| buffer-flush.buffer-size | No | No | 1000 | Integer | Buffer size. |
| max-retries | No | No | 3 | Integer | Maximum number of retries on failure. |
| memstore-check.enabled | No | No | true | Boolean | Whether to enable the memstore check. |
| memstore-check.threshold | No | No | 0.9 | Double | Memstore usage threshold ratio relative to the limit value. |
| memstore-check.interval | No | No | 30s | Duration | Memstore check interval. |
| partition.enabled | No | No | false | Boolean | Whether to enable partition calculation and flush records by partitions. Only works when 'sync-write' and 'direct-load.enabled' are 'false'. |
| table.oracle-tenant-case-insensitive | No | No | true | Boolean | By default, under the Oracle tenant, schema names and column names are case-insensitive. |
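
The 'druid-properties' value is a single string of key=value pairs separated by semicolons, as in the demo ('druid.initialSize=10;druid.maxActive=100;'). The sketch below illustrates that format by splitting such a string into java.util.Properties. It is an assumption-laden illustration, not the connector's own parser: `DruidPropertiesSketch` and `parse` are hypothetical names, and the handling of whitespace and malformed entries is a choice made here.

```java
import java.util.Properties;

public class DruidPropertiesSketch {

    // Splits "k1=v1;k2=v2;" into Properties. Hypothetical helper for illustration only.
    static Properties parse(String value) {
        Properties props = new Properties();
        if (value == null) {
            return props;
        }
        for (String entry : value.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing or doubled semicolon
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + trimmed);
            }
            props.setProperty(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
        }
        return props;
    }

    public static void main(String[] args) {
        // The value used in the demo table definitions.
        Properties props = parse("druid.initialSize=10;druid.maxActive=100;");
        System.out.println("druid.initialSize = " + props.getProperty("druid.initialSize"));
        System.out.println("druid.maxActive = " + props.getProperty("druid.maxActive"));
    }
}
```

Checking a value this way before putting it into a table definition makes typos such as a missing '=' visible early.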

References

https://issues.apache.org/jira/browse/FLINK-25569

https://github.com/apache/flink-connector-jdbc

https://github.com/oceanbase/obconnector-j