This is the Flink connector for OceanBase. It reads data from and writes data to OceanBase through a JDBC driver, and supports both the MySQL and Oracle compatibility modes.
You can get the release packages from the Releases page or from Maven Central:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase</artifactId>
<version>${project.version}</version>
</dependency>
If you would rather use the latest snapshot of the upcoming major version, add our Maven snapshot repository and declare the corresponding dependency version.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
You can also manually build it from the source code.
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
To use this connector directly in Flink SQL, download the shaded JAR file named flink-sql-connector-oceanbase-${project.version}.jar.
This project comes with the MySQL JDBC driver 8.0.28 built in. Users of OceanBase EE (Enterprise Edition) who want to use the OceanBase JDBC driver need to add the following dependency manually:
| Dependency Item | Description |
|---|---|
| com.oceanbase:oceanbase-client:2.4.9 | Used for connecting to OceanBase EE. |
Create the source table ‘t_source’ in the ‘test’ database of an OceanBase MySQL-mode tenant and insert some data.
USE test;
CREATE TABLE `t_source` (
`id` int(10) NOT NULL,
`username` varchar(20) DEFAULT NULL,
`score` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
);
INSERT INTO t_source VALUES (1, 'Tom', 99), (2, 'Jerry', 88), (3, 'Alice', 95);
Put the dependency JAR files into Flink's ‘lib’ directory, then create the source table with Flink SQL in the SQL client.
CREATE TABLE t_source
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = 'jdbc:mysql://127.0.0.1:2881/test',
'schema-name' = 'test',
'table-name' = 't_source',
'username' = 'root@test#obcluster',
'password' = 'pswd',
'compatible-mode' = 'MySQL',
'split-size' = '8192',
'fetch-size' = '1024'
);
Query data from the source table.
SELECT * FROM t_source;
Users of OceanBase EE in Oracle mode must use the OceanBase JDBC driver (the MySQL driver does not support Oracle mode) and set 'url', 'driver-class-name' and 'compatible-mode' accordingly.
CREATE TABLE t_source
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SOURCE',
'compatible-mode' = 'Oracle',
'username' = 'SYS@test#obcluster',
'password' = 'pswd',
'split-size' = '8192',
'fetch-size' = '1024'
);
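The Flink table above expects a table named ‘T_SOURCE’ to already exist in the ‘SYS’ schema of the Oracle-mode tenant. The following is a minimal sketch of such a table, mirroring the MySQL-mode example. The Oracle column types (NUMBER, VARCHAR2) are our assumption, so check them against the connector's type mapping before relying on them.

```sql
-- Run in an OceanBase Oracle-mode tenant (for example through obclient), not in the Flink SQL client.
-- Unquoted identifiers are stored in upper case, which matches 'table-name' = 'T_SOURCE'.
CREATE TABLE T_SOURCE (
  ID       NUMBER(10)   NOT NULL,
  USERNAME VARCHAR2(20) DEFAULT NULL,
  SCORE    NUMBER(10)   DEFAULT NULL,
  PRIMARY KEY (ID)
);

-- Single-row INSERT statements are used because standard Oracle syntax
-- does not accept the multi-row VALUES list from the MySQL-mode example.
INSERT INTO T_SOURCE VALUES (1, 'Tom', 99);
INSERT INTO T_SOURCE VALUES (2, 'Jerry', 88);
INSERT INTO T_SOURCE VALUES (3, 'Alice', 95);
COMMIT;
```

A ‘T_SINK’ table for the Oracle-mode sink example later in this document can be created the same way, without the INSERT statements.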
Create the destination table ‘t_sink’ in the ‘test’ database of an OceanBase MySQL-mode tenant.
USE test;
CREATE TABLE `t_sink` (
`id` int(10) NOT NULL,
`username` varchar(20) DEFAULT NULL,
`score` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
);
Put the dependency JAR files into Flink's ‘lib’ directory, then create the destination table with Flink SQL in the SQL client.
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:mysql://127.0.0.1:2881/test',
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root@test#obcluster',
'password' = 'pswd',
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3');
Insert records with Flink SQL.
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(1, 'Tom', 89);
Once the statement has executed, the records should have been written to OceanBase.
Users of OceanBase EE need to set 'url' and 'driver-class-name' for the OceanBase JDBC driver. The following example writes to ‘T_SINK’ in the ‘SYS’ schema of an Oracle-mode tenant.
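To confirm the write, you can query the table directly in OceanBase. The INSERT above contains two records with id 1, and because id is the primary key of ‘t_sink’, the table is expected to hold one row for id 1 and one for id 2. This sketch does not assume which of the two scores for id 1 is kept.

```sql
-- Run in the OceanBase MySQL-mode tenant, not in the Flink SQL client.
USE test;
-- Expect one row per primary key value (id 1 and id 2).
SELECT * FROM t_sink ORDER BY id;
```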
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster',
'password' = 'pswd',
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
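With a source table and a sink table registered in the same Flink SQL session (either the MySQL-mode or the Oracle-mode variants above), the two can be combined into a single job that copies data from one OceanBase table to the other. A minimal sketch, assuming both tables are declared as above with identical columns:

```sql
-- Run in the Flink SQL client after both CREATE TABLE statements.
-- Reads the rows of t_source and writes them to t_sink.
-- The source scan is assumed to be bounded, so the job finishes after one pass.
INSERT INTO t_sink
SELECT id, username, score
FROM t_source;
```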
The following options are common to reading and writing.

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
|---|---|---|---|---|---|
| url | Yes | Yes | | String | JDBC URL. |
| username | Yes | Yes | | String | The username. |
| password | Yes | Yes | | String | The password. |
| schema-name | Yes | Yes | | String | The schema name or database name. |
| table-name | Yes | Yes | | String | The table name. |
| compatible-mode | No | No | MySQL | String | The compatibility mode of OceanBase, either 'MySQL' or 'Oracle'. |
| driver-class-name | No | No | com.mysql.cj.jdbc.Driver | String | The JDBC driver class name. If a different driver is set, its JAR must be added manually. |
| druid-properties | No | No | | String | Druid connection pool properties; multiple values are separated by semicolons. |
| table.oracle-tenant-case-insensitive | No | No | true | Boolean | Whether schema names and column names are treated as case-insensitive under an Oracle-mode tenant. |
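Only the first five options are required, plus 'connector' itself. As a sketch, a table definition that relies on the defaults for everything else (MySQL compatibility mode and the bundled MySQL driver) could look like this; the Flink table name t_minimal is hypothetical.

```sql
-- Hypothetical Flink table t_minimal over the existing OceanBase table test.t_source.
-- Only the required options are set; compatible-mode and driver-class-name keep their defaults.
CREATE TABLE t_minimal
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'   = 'oceanbase',
    'url'         = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name' = 'test',
    'table-name'  = 't_source',
    'username'    = 'root@test#obcluster',
    'password'    = 'pswd'
);
```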

The following options apply only when reading data.

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
|---|---|---|---|---|---|
| split-size | No | No | 8192 | Integer | The number of rows per split for parallel reading. |
| chunk-key-column | No | No | | String | The column used to split the table into chunks. In MySQL mode it defaults to the primary key column; in Oracle mode it defaults to ROWID. If not specified, the table is split automatically according to this default strategy. The specified column must be non-null: rows whose chunk key is NULL are silently excluded from the read results. |
| fetch-size | No | No | 1024 | Integer | The number of rows fetched per round trip for JDBC queries. |
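As an illustration of the read options, the sketch below sets the chunk key explicitly and uses smaller splits over the MySQL-mode ‘t_source’ table. 'id' is chosen because it is the NOT NULL primary key; the nullable ‘score’ column would be unsuitable, since rows with a NULL chunk key are silently skipped. The Flink table name t_source_chunked and the option values are hypothetical.

```sql
-- Hypothetical Flink table t_source_chunked reading test.t_source.
-- chunk-key-column: id is NOT NULL, so no rows are dropped during splitting.
-- split-size: rows per split; fetch-size: rows per JDBC round trip.
CREATE TABLE t_source_chunked
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'        = 'oceanbase',
    'url'              = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name'      = 'test',
    'table-name'       = 't_source',
    'username'         = 'root@test#obcluster',
    'password'         = 'pswd',
    'compatible-mode'  = 'MySQL',
    'chunk-key-column' = 'id',
    'split-size'       = '4096',
    'fetch-size'       = '512'
);
```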

The following options apply only when writing data.

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
|---|---|---|---|---|---|
| sync-write | No | No | false | Boolean | Whether to write data synchronously. If set to 'true', the buffer is not used. |
| buffer-flush.interval | No | No | 1s | Duration | Buffer flush interval. Set to '0' to disable scheduled flushing. |
| buffer-flush.buffer-size | No | No | 1000 | Integer | Buffer size. |
| max-retries | No | No | 3 | Integer | Maximum number of retries on failure. |
| memstore-check.enabled | No | No | true | Boolean | Whether to enable the memstore check. |
| memstore-check.threshold | No | No | 0.9 | Double | Memstore usage threshold, as a ratio of the memstore limit. |
| memstore-check.interval | No | No | 30s | Duration | Memstore check interval. |
| partition.enabled | No | No | false | Boolean | Whether to enable partition calculation and flush records by partition. Only takes effect when 'sync-write' and 'direct-load.enabled' are both 'false'. |
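The sketch below combines several write options for the MySQL-mode ‘t_sink’ table: partition-aware flushing, which per the table above only takes effect while 'sync-write' and 'direct-load.enabled' stay 'false', together with a shorter flush interval and a tighter memstore check. The Flink table name t_sink_tuned and all values are illustrative, not recommendations.

```sql
-- Hypothetical Flink table t_sink_tuned writing to the existing OceanBase table test.t_sink.
-- 'sync-write' is left at its default 'false', which 'partition.enabled' requires.
-- 'direct-load.enabled' is not set here; it is assumed to default to 'false'.
CREATE TABLE t_sink_tuned
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'                = 'oceanbase',
    'url'                      = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name'              = 'test',
    'table-name'               = 't_sink',
    'username'                 = 'root@test#obcluster',
    'password'                 = 'pswd',
    'partition.enabled'        = 'true',
    'buffer-flush.interval'    = '2s',
    'buffer-flush.buffer-size' = '2000',
    'memstore-check.threshold' = '0.8',
    'memstore-check.interval'  = '10s'
);
```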