| English | 简体中文 |
本项目是一个基于 OceanBase 旁路导入功能的 Flink Connector,可以在 Flink 中通过旁路导入的方式将数据高效写入到 OceanBase。
本连接器专为批处理场景设计,具有以下特点:
关于 OceanBase 的旁路导入功能,见 旁路导入文档。
您可以在 Releases 页面 或者 Maven 中央仓库 找到正式的发布版本。
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-directload</artifactId>
<version>${project.version}</version>
</dependency>
如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-oceanbase-directload</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
您也可以通过源码构建的方式获得程序包。
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
要直接通过 Flink SQL 使用此连接器,您需要下载名为flink-sql-connector-oceanbase-directload-${project.version}.jar的包含所有依赖的 jar 文件:
数据源必须是有界流(Bounded Stream),旁路导入 Connector 不支持无界流。
推荐使用 Flink Batch 模式以获得更好的性能:
Table API / Flink SQL:
SET 'execution.runtime-mode' = 'BATCH';
DataStream API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
注意:虽然也可以在 Streaming 模式下使用有界数据源,但 Batch 模式通常能提供更好的性能和资源利用率。
并行度调整:支持多节点并行写入,可以通过调整 Flink Task 的并行度来提高写入吞吐量
SET 'parallelism.default' = '8'; -- 根据数据量调整并行度
服务端并行度:通过 parallel 参数配置 OceanBase 服务端处理导入任务的 CPU 资源
在 OceanBase 数据库 MySQL 模式下的 test 库中创建目的表 t_sink。
USE test;
CREATE TABLE `t_sink`
(
`id` int(10) NOT NULL,
`username` varchar(20) DEFAULT NULL,
`score` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
);
将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。
-- 推荐设置为 BATCH 模式以获得更好性能
SET 'execution.runtime-mode' = 'BATCH';
-- 可选:根据数据量调整并行度以提高吞吐量
SET 'parallelism.default' = '8';
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root',
'tenant-name' = 'test',
'password' = 'password',
'parallel' = '8' -- OceanBase 服务端并行度
);
插入测试数据:
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(3, 'Alice', 95);
执行完成后,即可在 OceanBase 中查询验证。
注意:在 INSERT 语句执行期间(旁路导入进行中),目标表 t_sink 会被锁定,此时只能对该表执行查询操作,无法执行其他写入操作。
| 参数名 | Table API 必需 | DataStream 必需 | 默认值 | 类型 | 描述 |
|---|---|---|---|---|---|
| host | 是 | 是 | String | OceanBase数据库的host地址。 | |
| port | 是 | 是 | Integer | 旁路导入使用的RPC端口。 | |
| username | 是 | 是 | String | 数据库用户名,比如 'root'。注意:而不是像 'root@sys' 格式的连接用户名。 | |
| tenant-name | 是 | 是 | String | 租户名。 | |
| password | 是 | 是 | String | 密码。 | |
| schema-name | 是 | 是 | String | schema名或DB名。 | |
| table-name | 是 | 是 | String | 表名。 | |
| parallel | 否 | 否 | 8 | Integer | 旁路导入服务端的并发度。该参数决定了服务端使用多少cpu资源来处理本次导入任务。 |
| buffer-size | 否 | 否 | 1024 | Integer | 一次写入OceanBase的缓冲区大小。 |
| max-error-rows | 否 | 否 | 0 | Long | 旁路导入任务最大可容忍的错误行数目。 |
| dup-action | 否 | 否 | REPLACE | String | 旁路导入任务中主键重复时的处理策略。可以是 STOP_ON_DUP(本次导入失败),REPLACE(替换)或 IGNORE(忽略)。 |
| timeout | 否 | 否 | 7d | Duration | 旁路导入任务的超时时间。 |
| heartbeat-timeout | 否 | 否 | 60s | Duration | 旁路导入任务客户端的心跳超时时间。 |
| heartbeat-interval | 否 | 否 | 10s | Duration | 旁路导入任务客户端的心跳间隔时间。 |
| direct-load.load-method | 否 | 否 | full | String | 旁路导入导入模式:full, inc, inc_replace。
|