This Flink Connector is based on the direct-load feature of OceanBase, enabling high-performance bulk data loading from Flink to OceanBase.
This connector is specifically designed for batch processing scenarios with the following characteristics:
For more details on OceanBase’s direct-load feature, see the direct-load document.
You can get the release packages from the Releases page or from Maven Central.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-directload</artifactId>
<version>${project.version}</version>
</dependency>
If you’d rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-directload</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
You can also manually build it from the source code.
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
To use this connector directly through Flink SQL, download the shaded JAR file named flink-sql-connector-oceanbase-directload-${project.version}.jar.
Data sources must be bounded streams. The direct-load connector does not support unbounded streams.
Flink Batch mode is recommended for better performance:
Table API / Flink SQL:
SET 'execution.runtime-mode' = 'BATCH';
DataStream API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
Note: While you can use bounded data sources in Streaming mode, Batch mode typically provides better performance and resource utilization.
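As a minimal sketch of what a bounded source looks like in Flink SQL, the table below uses Flink's built-in datagen connector. Setting number-of-rows makes the source finite, so a job that reads from it terminates on its own. The table name bounded_scores and the row count are illustrative assumptions and are not part of this connector.

```sql
-- Hypothetical bounded source: datagen stops after emitting 1000 rows.
CREATE TEMPORARY TABLE bounded_scores (
    id INT,
    username STRING,
    score INT
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '1000',          -- makes the source bounded
    'fields.id.kind' = 'sequence',      -- unique ids 1..1000
    'fields.id.start' = '1',
    'fields.id.end' = '1000',
    'fields.username.length' = '10',    -- random 10-character strings
    'fields.score.min' = '0',
    'fields.score.max' = '100'
);
```

Omitting number-of-rows and the sequence bounds would turn this into an unbounded source, which the direct-load connector does not support.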
Parallelism Adjustment: The connector supports multi-node parallel writing. Increase the Flink task parallelism to improve throughput.
SET 'parallelism.default' = '8'; -- Adjust based on data volume
Server-side Parallelism: Use the parallel option to configure how much CPU the OceanBase server devotes to processing the import task.
Create the destination table ‘t_sink’ in the ‘test’ database of an OceanBase MySQL-mode tenant.
USE test;
CREATE TABLE `t_sink`
(
`id` int(10) NOT NULL,
`username` varchar(20) DEFAULT NULL,
`score` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
);
Put the JAR files of dependencies into the ‘lib’ directory of Flink, then create the destination table using Flink SQL through the SQL client.
-- Recommended to set BATCH mode for better performance
SET 'execution.runtime-mode' = 'BATCH';
-- Optional: Adjust parallelism based on data volume to improve throughput
SET 'parallelism.default' = '8';
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root',
'tenant-name' = 'test',
'password' = 'password',
'parallel' = '8' -- OceanBase server-side parallelism
);
Insert records using Flink SQL:
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(3, 'Alice', 95);
Once the statement has finished executing, the records should be present in OceanBase.
Note: During the execution of the INSERT statement (while direct-load is in progress), the target table t_sink will be locked. Only SELECT queries are allowed; INSERT/UPDATE/DELETE operations are not permitted.
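For loads larger than a few literal rows, the same sink can be fed from any bounded source with INSERT INTO ... SELECT. The sketch below assumes a CSV file at the hypothetical path /tmp/scores.csv whose columns match t_sink, read through Flink's filesystem connector. The table name csv_scores is likewise illustrative.

```sql
-- Hypothetical bounded source: a CSV file read with Flink's filesystem connector.
CREATE TEMPORARY TABLE csv_scores (
    id INT,
    username STRING,
    score INT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///tmp/scores.csv',  -- assumed location; replace with your own
    'format' = 'csv'
);

-- Bulk-load every row of the file into OceanBase through the direct-load sink.
INSERT INTO t_sink
SELECT id, username, score
FROM csv_scores;
```

Because a file source is finite, this job can run with 'execution.runtime-mode' set to 'BATCH', as recommended above.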
| Option | Required by Table API | Required by DataStream | Default | Type | Description |
|---|---|---|---|---|---|
| host | Yes | Yes | | String | Hostname used in direct-load. |
| port | Yes | Yes | | Integer | RPC port number used in direct-load. |
| username | Yes | Yes | | String | The name of the database user, like 'root'. NOTE: Not the connection username like 'root@sys'. |
| tenant-name | Yes | Yes | | String | The tenant name. |
| password | Yes | Yes | | String | The password. |
| schema-name | Yes | Yes | | String | The schema name or database name. |
| table-name | Yes | Yes | | String | The table name. |
| parallel | No | No | 8 | Integer | The parallelism of the direct-load server. This parameter determines how much CPU the server uses to process this import task. |
| buffer-size | No | No | 1024 | Integer | The size of the buffer that is written to OceanBase at one time. |
| max-error-rows | No | No | 0 | Long | Maximum tolerable number of error rows. |
| dup-action | No | No | REPLACE | String | Action to take when the direct-load task encounters a duplicate record. Can be STOP_ON_DUP, REPLACE or IGNORE. |
| timeout | No | No | 7d | Duration | The timeout for the direct-load task. |
| heartbeat-timeout | No | No | 60s | Duration | Client heartbeat timeout in the direct-load task. |
| heartbeat-interval | No | No | 10s | Duration | Client heartbeat interval in the direct-load task. |
| direct-load.load-method | No | No | full | String | The direct-load load mode: full, inc, inc_replace. |
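The optional settings go in the same WITH clause as the required ones. The following is a sketch only: the Flink table name t_sink_tuned is hypothetical, and the non-default values are chosen purely for illustration rather than as recommendations.

```sql
-- Hypothetical Flink table name; it maps to the same OceanBase table test.t_sink.
CREATE TABLE t_sink_tuned
(
    id INT,
    username VARCHAR,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    'dup-action' = 'IGNORE',      -- instead of the default REPLACE
    'max-error-rows' = '100',     -- tolerate up to 100 error rows (default 0)
    'buffer-size' = '2048',       -- default 1024
    'timeout' = '1d'              -- default 7d
);
```

Any option left out of the WITH clause falls back to the default listed in the table.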