flink-connector-oceanbase

Flink Connector OceanBase Direct Load


This Flink Connector is based on the direct-load feature of OceanBase, enabling high-performance bulk data loading from Flink to OceanBase.

⚠️ Important Notes

This connector is designed specifically for batch processing: it accepts only bounded data sources, and the target table is locked while a direct-load task is in progress.

For more details on OceanBase’s direct-load feature, see the direct-load document.

Getting Started

You can get the release packages from the Releases page or Maven Central.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

If you’d rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

You can also manually build it from the source code.

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

To use this connector directly through Flink SQL, download the shaded JAR file named flink-sql-connector-oceanbase-directload-${project.version}.jar.

Prerequisites

Data sources must be bounded streams. The direct-load connector does not support unbounded streams.

Flink Batch mode is recommended for better performance.

Note: While you can use bounded data sources in Streaming mode, Batch mode typically provides better performance and resource utilization.
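
As a minimal sketch of what a bounded source can look like, the following Flink SQL uses Flink's built-in datagen connector, which stops emitting once 'number-of-rows' is reached. The table name bounded_source, its columns, and the row count are illustrative assumptions and not part of this connector; any other bounded source could take its place.

```sql
-- Run the job in batch mode, as recommended above.
SET 'execution.runtime-mode' = 'BATCH';

-- Hypothetical bounded source: datagen ends after 'number-of-rows' rows.
CREATE TEMPORARY TABLE bounded_source
(
    id       INT,
    username VARCHAR,
    score    INT
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '1000',
    -- A sequence keeps the id values unique.
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000',
    'fields.username.length' = '8',
    'fields.score.min' = '0',
    'fields.score.max' = '100'
);
```

A statement such as INSERT INTO t_sink SELECT id, username, score FROM bounded_source could then load these rows through a direct-load sink table like the one defined in the Demo section.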

Performance Tuning

Demo

Preparation

Create the destination table ‘t_sink’ in the ‘test’ database of an OceanBase MySQL-mode tenant.

USE test;
CREATE TABLE `t_sink`
(
  `id`       int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score`    int(10)     DEFAULT NULL,
  PRIMARY KEY (`id`)
);

Put the dependency JAR files into Flink’s ‘lib’ directory, then use the SQL client to create the destination table in Flink SQL.

-- Recommended to set BATCH mode for better performance
SET 'execution.runtime-mode' = 'BATCH';

-- Optional: Adjust parallelism based on data volume to improve throughput
SET 'parallelism.default' = '8';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    'parallel' = '8'  -- OceanBase server-side parallelism
);

Insert records using Flink SQL:

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (3, 'Alice', 95);

Once executed, the records should have been written to OceanBase.
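
To check the result, a query like the one below can be run against OceanBase. This is only a sketch: it assumes a MySQL-mode client connected to the same tenant, and the ‘test’ database and ‘t_sink’ table created in the Preparation step.

```sql
-- Run in an OceanBase MySQL-mode client, not in the Flink SQL client.
USE test;

-- List the loaded rows in primary key order.
SELECT id, username, score
FROM t_sink
ORDER BY id;
```

If the table was empty before the load, the three rows inserted above (Tom, Jerry, and Alice) are the expected result.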

Note: During the execution of the INSERT statement (while direct-load is in progress), the target table t_sink will be locked. Only SELECT queries are allowed; INSERT/UPDATE/DELETE operations are not permitted.

Configuration

| Option | Required by Table API | Required by DataStream | Default | Type | Description |
|---|---|---|---|---|---|
| host | Yes | Yes | | String | Hostname used in direct-load. |
| port | Yes | Yes | | Integer | RPC port number used in direct-load. |
| username | Yes | Yes | | String | The name of the database user, like 'root'. NOTE: Not the connection username like 'root@sys'. |
| tenant-name | Yes | Yes | | String | The tenant name. |
| password | Yes | Yes | | String | The password. |
| schema-name | Yes | Yes | | String | The schema name or database name. |
| table-name | Yes | Yes | | String | The table name. |
| parallel | No | No | 8 | Integer | Server-side parallelism of the direct-load task. It determines how much CPU the server uses to process the import. |
| buffer-size | No | No | 1024 | Integer | Size of the buffer written to OceanBase at one time. |
| max-error-rows | No | No | 0 | Long | Maximum number of error rows tolerated. |
| dup-action | No | No | REPLACE | String | Action to take when the direct-load task encounters a duplicate record. One of STOP_ON_DUP, REPLACE, or IGNORE. |
| timeout | No | No | 7d | Duration | Timeout of the direct-load task. |
| heartbeat-timeout | No | No | 60s | Duration | Client heartbeat timeout of the direct-load task. |
| heartbeat-interval | No | No | 10s | Duration | Client heartbeat interval of the direct-load task. |
| direct-load.load-method | No | No | full | String | The direct-load load mode: full, inc, or inc_replace. |

  • full: full direct-load (the default).
  • inc: normal incremental direct-load. A primary key conflict check is performed. Supported by observer-4.3.2 and above. Setting direct-load.dup-action to REPLACE is not supported for the time being.
  • inc_replace: incremental direct-load in replace mode. No primary key conflict check is performed and old data is overwritten directly (equivalent to REPLACE). The direct-load.dup-action parameter is ignored. Supported by observer-4.3.2 and above.
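
As a hedged sketch of how the optional settings might be combined, the Flink SQL below defines a sink that uses incremental direct-load. The option keys are taken from the table above. The table name t_sink_inc, the connection values, and the chosen option values are illustrative assumptions, not recommendations.

```sql
-- Hypothetical sink definition; connection values are placeholders.
CREATE TABLE t_sink_inc
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    -- Incremental load with primary key conflict check (observer-4.3.2 and above).
    'direct-load.load-method' = 'inc',
    -- REPLACE is not supported with inc, so skip duplicates instead.
    'dup-action' = 'IGNORE',
    -- Tolerate up to 10 error rows before the task fails.
    'max-error-rows' = '10',
    'buffer-size' = '2048',
    'timeout' = '1d'
);
```

With 'direct-load.load-method' set to 'inc_replace' instead, the dup-action setting would be ignored, as described in the list above.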

References