flink-connector-oceanbase

Flink Connector OceanBase

English 简体中文

本项目是一个 OceanBase 的 Flink Connector,可以在 Flink 中通过 JDBC 驱动将数据写入到 OceanBase。

开始上手

您可以在 Releases 页面 或者 Maven 中央仓库 找到正式的发布版本。

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>

如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:

<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

您也可以通过源码构建的方式获得程序包。

git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests

SQL JAR

要直接通过 Flink SQL 使用此连接器,您需要下载名为flink-sql-connector-oceanbase-${project.version}.jar的包含所有依赖的 jar 文件:

本项目内置了 MySQL 驱动 8.0.28,对于想使用 OceanBase JDBC 驱动的 OceanBase 数据库企业版的用户,需要手动引入以下依赖:

依赖名称 说明
com.oceanbase:oceanbase-client:2.4.9 用于连接到 OceanBase 数据库企业版。

示例

准备

在 OceanBase 数据库 MySQL 模式下的 test 库中创建目的表 t_sink。

USE test;
CREATE TABLE `t_sink`
(
  `id`       int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score`    int(10)     DEFAULT NULL,
  PRIMARY KEY (`id`)
);

将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root@test#obcluster',
    'password' = 'pswd',
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
    );

插入测试数据

INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);

执行完成后,即可在 OceanBase 中检索验证。

对于 OceanBase 数据库企业版的用户,需要指定 OceanBase JDBC 驱动对应的 urldriver-class-name

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase',
    'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',
    'driver-class-name' = 'com.oceanbase.jdbc.Driver',
    'schema-name' = 'SYS',
    'table-name' = 'T_SINK',
    'username' = 'SYS@test#obcluster',
    'password' = 'pswd',
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
    );

配置项

参数名 Table API 必需 DataStream 必需 默认值 类型 描述
url String 数据库的 JDBC url。
username String 连接用户名。
password String 连接密码。
schema-name 不支持 String 连接的 schema 名或 db 名。
table-name 不支持 String 表名。
driver-class-name com.mysql.cj.jdbc.Driver String 驱动类名,默认为 'com.mysql.cj.jdbc.Driver',如果设置了其他值,需要手动引入对应的依赖。
druid-properties String Druid 连接池属性,多个值用分号分隔。
sync-write false Boolean 是否开启同步写,设置为 true 时将不使用 buffer 直接写入数据库。
buffer-flush.interval 1s Duration 缓冲区刷新周期。设置为 '0' 时将关闭定期刷新。
buffer-flush.buffer-size 1000 Integer 缓冲区大小。
max-retries 3 Integer 失败重试次数。
memstore-check.enabled true Boolean 是否开启内存检查。
memstore-check.threshold 0.9 Double 内存使用的阈值相对最大限制值的比例。
memstore-check.interval 30s Duration 内存使用检查周期。
partition.enabled false Boolean 是否启用分区计算功能,按照分区来写数据。仅当 'sync-write' 和 'direct-load.enabled' 都为 false 时生效。
table.oracle-tenant-case-insensitive true Boolean 默认情况下,在 Oracle 租户下,Schema名和列名不区分大小写。

参考信息

https://issues.apache.org/jira/browse/FLINK-25569

https://github.com/apache/flink-connector-jdbc

https://github.com/oceanbase/obconnector-j