flink-connector-oceanbase

Flink Connector OceanBase Direct Load

English 简体中文

本项目是一个基于 OceanBase 旁路导入功能的 Flink Connector,可以在 Flink 中通过旁路导入的方式将数据写入到 OceanBase。

关于 OceanBase 的旁路导入功能,见 旁路导入文档

开始上手

您可以在 Releases 页面 或者 Maven 中央仓库 找到正式的发布版本。

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

您也可以通过源码构建的方式获得程序包。

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

要直接通过 Flink SQL 使用此连接器,您需要下载名为flink-sql-connector-oceanbase-directload-${project.version}.jar的包含所有依赖的 jar 文件:

使用说明:

示例

准备

在 OceanBase 数据库 MySQL 模式下的 test 库中创建目的表 t_sink。

USE test;
CREATE TABLE `t_sink`
(
  `id`       int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score`    int(10)     DEFAULT NULL,
  PRIMARY KEY (`id`)
);

单节点写入

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。

SET 'execution.runtime-mode' = 'BATCH';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password'
    );

插入测试数据

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

执行完成后,即可在 OceanBase 中查询验证。

多节点写入

1、代码中创建旁路导入任务,并获取execution id
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oceanbase.directload</groupId>
    <artifactId>multi-node-write-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.oceanbase</groupId>
            <artifactId>obkv-table-client</artifactId>
            <version>1.2.13</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.fastjson2</groupId>
          <artifactId>fastjson2</artifactId>
          <version>2.0.53</version>
        </dependency>
    </dependencies>
</project>

代码示例见下面的完整示例代码。

2、在上述步骤中获取execution id后,提交Flink任务

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。 注意,将enable-multi-node-write设为true,同时将execution-id设置为上述步骤获取到execution id。

SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '3';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    'enable-multi-node-write' = 'true',
    'execution-id' = '5cIeLwELBIWAxOAKAAAAwhY='
    );

插入测试数据

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

3、等待上述提交的Flink任务执行完成,最后在代码中进行旁路导入任务最后的提交动作

代码示例见下面的完整示例代码。

完整的示例代码

public class MultiNode {
    private static String host = "127.0.0.1";
    private static int port = 2882;

    private static String userName = "root";
    private static String tenantName = "test";
    private static String password = "password";
    private static String dbName = "test";
    private static String tableName = "t_sink";

    public static void main(String[] args) throws ObDirectLoadException, IOException, InterruptedException {
        // 1、构建旁路导入任务,并获取execution id。
        ObDirectLoadConnection connection = ObDirectLoadManager.getConnectionBuilder()
                        .setServerInfo(host, port)
                        .setLoginInfo(tenantName, userName, password, dbName)
                        .build();
        ObDirectLoadStatement statement = connection.getStatementBuilder()
                        .setTableName(tableName)
                        .build();
        statement.begin();
        ObDirectLoadStatementExecutionId statementExecutionId =
                statement.getExecutionId();
        byte[] executionIdBytes = statementExecutionId.encode();
        // 将byte[]形式的execution id转换为字符串形式,方便作为参数传递给Flink-SQL作业。
        String executionId = java.util.Base64.getEncoder().encodeToString(executionIdBytes);
        System.out.println(executionId);

        // 2、获取到executionId后,提交Flink SQL作业。

        // 3、命令行输入第二歩提交的Flink作业的id,等待Flink作业完成。
        Scanner scanner = new Scanner((System.in));
        String flinkJobId = scanner.nextLine();

        while (true) {
            // 循环检测Flink作业的运行状态,见:https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/rest_api/
            JSONObject jsonObject = JSON.parseObject(new URL("http://localhost:8081/jobs/" + flinkJobId));
            String status = jsonObject.getString("state");
            if ("FINISHED".equals(status)) {
                break;
            }
            Thread.sleep(3_000);
        }

        // 4、等待Flink作业执行完成后,进行旁路导入任务的最后提交动作。
        statement.commit();

        statement.close();
        connection.close();
    }
}

执行完成后,即可在 OceanBase 中查询验证。

配置项

参数名 Table API 必需 DataStream 必需 默认值 类型 描述
host String OceanBase数据库的host地址。
port Integer 旁路导入使用的RPC端口。
username String 数据库用户名,比如 'root'。注意:而不是像 'root@sys' 格式的连接用户名。
tenant-name String 租户名。
password String 密码。
schema-name String schema名或DB名。
table-name String 表名。
parallel 8 Integer 旁路导入服务端的并发度。该参数决定了服务端使用多少cpu资源来处理本次导入任务。
buffer-size 1024 Integer 一次写入OceanBase的缓冲区大小。
max-error-rows 0 Long 旁路导入任务最大可容忍的错误行数目。
dup-action REPLACE String 旁路导入任务中主键重复时的处理策略。可以是 STOP_ON_DUP(本次导入失败),REPLACE(替换)或 IGNORE(忽略)。
timeout 7d Duration 旁路导入任务的超时时间。
heartbeat-timeout 60s Duration 旁路导入任务客户端的心跳超时时间。
heartbeat-interval 10s Duration 旁路导入任务客户端的心跳间隔时间。
direct-load.load-method full String 旁路导入导入模式:full, inc, inc_replace
  • full:全量旁路导入,默认值。
  • inc:普通增量旁路导入,会进行主键冲突检查,observer-4.3.2及以上支持,暂时不支持direct-load.dup-action为REPLACE。
  • inc_replace: 特殊replace模式的增量旁路导入,不会进行主键冲突检查,直接覆盖旧数据(相当于replace的效果),direct-load.dup-action参数会被忽略,observer-4.3.2及以上支持。
enable-multi-node-write false Boolean 是否启用支持多节点写入的旁路导入。默认不开启。
execution-id String 旁路导入任务的 execution id。仅当 enable-multi-node-write参数为true时生效。

参考信息