flink-connector-oceanbase

Flink Connector OceanBase Direct Load

English 简体中文

This Flink Connector based on the direct-load feature of OceanBase. It can write data to OceanBase through direct-load in Flink.

For OceanBase’s direct-load feature, see the direct-load document.

Getting Started

You can get the release packages at Releases Page or Maven Central.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

If you’d rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

You can also manually build it from the source code.

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

To use this connector through Flink SQL directly, you need to download the shaded jar file named flink-sql-connector-oceanbase-directload-${project.version}.jar:

Instructions for use:

Demo

Preparation

Create the destination table ‘t_sink’ under the ‘test’ database of the OceanBase MySQL mode.

USE test;
CREATE TABLE `t_sink`
(
  `id`       int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score`    int(10)     DEFAULT NULL,
  PRIMARY KEY (`id`)
);

single-node write

Put the JAR files of dependencies to the ‘lib’ directory of Flink, and then create the destination table with Flink SQL through the sql client.

SET 'execution.runtime-mode' = 'BATCH';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password'
    );

Insert records by Flink SQL.

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

Once executed, the records should have been written to OceanBase.

Multi-node write

1. Create a direct-load task in the code and obtain the execution id
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oceanbase.directload</groupId>
    <artifactId>multi-node-write-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.oceanbase</groupId>
            <artifactId>obkv-table-client</artifactId>
            <version>1.2.13</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.fastjson2</groupId>
          <artifactId>fastjson2</artifactId>
          <version>2.0.53</version>
        </dependency>
    </dependencies>
</project>

For code examples, see the complete sample code below.

Put the JAR files of dependencies to the ‘lib’ directory of Flink, and then create the destination table with Flink SQL through the sql client.

Note, set enable-multi-node-write to true and set execution-id to the execution id obtained in the above steps.

SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '3';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    'enable-multi-node-write' = 'true',
    'execution-id' = '5cIeLwELBIWAxOAKAAAAwhY='
    );

Insert records by Flink SQL.

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

For code examples, see the complete sample code below.

Complete sample code

public class MultiNode {
    private static String host = "127.0.0.1";
    private static int port = 2882;

    private static String userName = "root";
    private static String tenantName = "test";
    private static String password = "password";
    private static String dbName = "test";
    private static String tableName = "t_sink";

    public static void main(String[] args) throws ObDirectLoadException, IOException, InterruptedException {
        // 1. Create a direct-load task and obtain the execution id.
        ObDirectLoadConnection connection = ObDirectLoadManager.getConnectionBuilder()
                        .setServerInfo(host, port)
                        .setLoginInfo(tenantName, userName, password, dbName)
                        .build();
        ObDirectLoadStatement statement = connection.getStatementBuilder()
                        .setTableName(tableName)
                        .build();
        statement.begin();
        ObDirectLoadStatementExecutionId statementExecutionId =
                statement.getExecutionId();
        byte[] executionIdBytes = statementExecutionId.encode();
        // Convert the execution id in byte[] form to string form so that it can be passed to the Flink-SQL job as a parameter.
        String executionId = java.util.Base64.getEncoder().encodeToString(executionIdBytes);
        System.out.println(executionId);

        // 2. After obtaining the executionId, submit the Flink SQL job.

        // 3. Enter the id of the Flink job submitted in the second step on the command line and wait for the Flink job to be completed.
        Scanner scanner = new Scanner((System.in));
        String flinkJobId = scanner.nextLine();

        while (true) {
            // Loop to check the running status of Flink jobs, see: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/rest_api/
            JSONObject jsonObject = JSON.parseObject(new URL("http://localhost:8081/jobs/" + flinkJobId));
            String status = jsonObject.getString("state");
            if ("FINISHED".equals(status)) {
                break;
            }
            Thread.sleep(3_000);
        }

        // 4. After waiting for the Flink job execution to FINISHED, perform the final submission action of the direct-load task.
        statement.commit();

        statement.close();
        connection.close();
    }
}

Once executed, the records should have been written to OceanBase.

Configuration

Option Required by Table API Required by DataStream Default Type Description
host Yes Yes String Hostname used in direct-load.
port Yes Yes Integer Rpc port number used in direct-load.
username Yes Yes String The name of the database user, like 'root'. NOTE: Not the connection username like 'root@sys'.
tenant-name Yes Yes String The tenant name.
password Yes Yes String The password.
schema-name Yes Yes String The schema name or database name.
table-name Yes Yes String The table name.
parallel No No 8 Integer The parallel of the direct-load server. This parameter determines how much CPU resources the server uses to process this import task.
buffer-size No No 1024 Integer The size of the buffer that is written to the OceanBase at one time.
max-error-rows No No 0 Long Maximum tolerable number of error rows.
dup-action No No REPLACE String Action when there is duplicated record of direct-load task. Can be STOP_ON_DUP, REPLACE or IGNORE.
timeout No No 7d Duration The timeout for direct-load task.
heartbeat-timeout No No 60s Duration Client heartbeat timeout in direct-load task.
heartbeat-interval No No 10s Duration Client heartbeat interval in direct-load task.
direct-load.load-method No No full String The direct-load load mode: full, inc, inc_replace.
  • full: full direct-load, default value.
  • inc: normal incremental direct-load, primary key conflict check will be performed, observer-4.3.2 and above support, direct-load.dup-action REPLACE is not supported for the time being.
  • inc_replace: special replace mode incremental direct-load, no primary key conflict check will be performed, directly overwrite the old data (equivalent to the effect of replace), direct-load.dup-action parameter will be ignored, observer-4.3.2 and above support.
enable-multi-node-write No No false Boolean Whether to enable direct-load that supports multi-node writing. Not enabled by default.
execution-id No No String The execution id of the direct-load task. This parameter is only valid when the enable-multi-node-write parameter is true.

References