flink-connector-oceanbase

Flink Connector OceanBase Direct Load

English 简体中文

本项目是一个基于 OceanBase 旁路导入功能的 Flink Connector,可以在 Flink 中通过旁路导入的方式将数据高效写入到 OceanBase。

⚠️ 重要说明

本连接器专为批处理场景设计,具有以下特点:

关于 OceanBase 的旁路导入功能,见 旁路导入文档

开始上手

您可以在 Releases 页面 或者 Maven 中央仓库 找到正式的发布版本。

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

您也可以通过源码构建的方式获得程序包。

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

要直接通过 Flink SQL 使用此连接器,您需要下载名为flink-sql-connector-oceanbase-directload-${project.version}.jar的包含所有依赖的 jar 文件:

使用前提条件

数据源必须是有界流(Bounded Stream),旁路导入 Connector 不支持无界流。

推荐使用 Flink Batch 模式以获得更好的性能:

注意:虽然也可以在 Streaming 模式下使用有界数据源,但 Batch 模式通常能提供更好的性能和资源利用率。

性能调优

示例

准备

在 OceanBase 数据库 MySQL 模式下的 test 库中创建目的表 t_sink。

USE test;
CREATE TABLE `t_sink`
(
  `id`       int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score`    int(10)     DEFAULT NULL,
  PRIMARY KEY (`id`)
);

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。

-- 推荐设置为 BATCH 模式以获得更好性能
SET 'execution.runtime-mode' = 'BATCH';

-- 可选:根据数据量调整并行度以提高吞吐量
SET 'parallelism.default' = '8';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    'parallel' = '8'  -- OceanBase 服务端并行度
);

插入测试数据:

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (3, 'Alice', 95);

执行完成后,即可在 OceanBase 中查询验证。

注意:在 INSERT 语句执行期间(旁路导入进行中),目标表 t_sink 会被锁定,此时只能对该表执行查询操作,无法执行其他写入操作。

配置项

参数名 Table API 必需 DataStream 必需 默认值 类型 描述
host String OceanBase数据库的host地址。
port Integer 旁路导入使用的RPC端口。
username String 数据库用户名,比如 'root'。注意:而不是像 'root@sys' 格式的连接用户名。
tenant-name String 租户名。
password String 密码。
schema-name String schema名或DB名。
table-name String 表名。
parallel 8 Integer 旁路导入服务端的并发度。该参数决定了服务端使用多少cpu资源来处理本次导入任务。
buffer-size 1024 Integer 一次写入OceanBase的缓冲区大小。
max-error-rows 0 Long 旁路导入任务最大可容忍的错误行数目。
dup-action REPLACE String 旁路导入任务中主键重复时的处理策略。可以是 STOP_ON_DUP(本次导入失败),REPLACE(替换)或 IGNORE(忽略)。
timeout 7d Duration 旁路导入任务的超时时间。
heartbeat-timeout 60s Duration 旁路导入任务客户端的心跳超时时间。
heartbeat-interval 10s Duration 旁路导入任务客户端的心跳间隔时间。
direct-load.load-method full String 旁路导入导入模式:full, inc, inc_replace
  • full:全量旁路导入,默认值。
  • inc:普通增量旁路导入,会进行主键冲突检查,observer-4.3.2及以上支持,暂时不支持direct-load.dup-action为REPLACE。
  • inc_replace: 特殊replace模式的增量旁路导入,不会进行主键冲突检查,直接覆盖旧数据(相当于replace的效果),direct-load.dup-action参数会被忽略,observer-4.3.2及以上支持。

参考信息