This Flink connector is based on the direct-load feature of OceanBase. It can write data to OceanBase through direct-load from Flink.
For more information about OceanBase's direct-load feature, see the direct-load documentation.
You can get the release packages at the Releases page or from Maven Central.

```xml
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>
```
If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.

```xml
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-oceanbase-directload</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
```
You can also manually build it from the source code.

```shell
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
```
To use this connector through Flink SQL directly, you need to download the shaded jar file named `flink-sql-connector-oceanbase-directload-${project.version}.jar`.

Direct-load requires Flink's batch execution mode. In Flink SQL, enable it with:

```sql
SET 'execution.runtime-mode' = 'BATCH';
```
Or with the DataStream API:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```
Create the destination table `t_sink` in the `test` database of an OceanBase MySQL-mode tenant.

```sql
USE test;
CREATE TABLE `t_sink`
(
    `id`       int(10) NOT NULL,
    `username` varchar(20) DEFAULT NULL,
    `score`    int(10) DEFAULT NULL,
    PRIMARY KEY (`id`)
);
```
Put the JAR files of the dependencies into the `lib` directory of Flink, and then create the destination table with Flink SQL through the SQL client.

```sql
SET 'execution.runtime-mode' = 'BATCH';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password'
);
```
Insert records with Flink SQL.

```sql
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);
```
Once executed, the records should have been written to OceanBase.
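Note that the rows above contain a duplicate primary key (`id = 1`). How direct-load resolves such duplicates is controlled by the `dup-action` option (default `REPLACE`, see the options table below). The sketch here is a hypothetical model of the three policies' documented semantics, not the server implementation; in particular, the assumption that `REPLACE` keeps the later-arriving row is for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DupActionSketch {

    /** Apply one dup-action policy to (id -> value) rows arriving in order. */
    static Map<Integer, String> resolve(String dupAction, List<Map.Entry<Integer, String>> rows) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> row : rows) {
            if (!table.containsKey(row.getKey())) {
                table.put(row.getKey(), row.getValue());
            } else if ("REPLACE".equals(dupAction)) {
                // assumed for illustration: the later duplicate row wins
                table.put(row.getKey(), row.getValue());
            } else if ("IGNORE".equals(dupAction)) {
                // first row wins, duplicate silently dropped
            } else if ("STOP_ON_DUP".equals(dupAction)) {
                throw new IllegalStateException("duplicate key: " + row.getKey());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> rows = List.of(
                Map.entry(1, "Tom/99"), Map.entry(2, "Jerry/88"), Map.entry(1, "Tom/89"));
        System.out.println(resolve("REPLACE", rows)); // {1=Tom/89, 2=Jerry/88}
        System.out.println(resolve("IGNORE", rows));  // {1=Tom/99, 2=Jerry/88}
    }
}
```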
The multi-node write sample below depends on `obkv-table-client` and `fastjson2`:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oceanbase.directload</groupId>
    <artifactId>multi-node-write-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.oceanbase</groupId>
            <artifactId>obkv-table-client</artifactId>
            <version>1.2.13</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.53</version>
        </dependency>
    </dependencies>
</project>
```
For code examples, see the complete sample code below.
Put the JAR files of the dependencies into the `lib` directory of Flink, and then create the destination table with Flink SQL through the SQL client. Note: set `enable-multi-node-write` to `true` and set `execution-id` to the execution id obtained in the above steps.

```sql
SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '3';

CREATE TABLE t_sink
(
    id       INT,
    username VARCHAR,
    score    INT,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'oceanbase-directload',
    'host' = '127.0.0.1',
    'port' = '2882',
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root',
    'tenant-name' = 'test',
    'password' = 'password',
    'enable-multi-node-write' = 'true',
    'execution-id' = '5cIeLwELBIWAxOAKAAAAwhY='
);
```
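The `execution-id` value is simply the Base64 string form of the `byte[]` id produced by the direct-load statement. A standalone roundtrip check (the id bytes here are arbitrary stand-ins, not a real execution id):

```java
import java.util.Arrays;
import java.util.Base64;

public class ExecutionIdRoundTrip {
    public static void main(String[] args) {
        // Stand-in for statementExecutionId.encode(); real ids come from the
        // direct-load statement, these are just arbitrary bytes.
        byte[] executionIdBytes = {0x5c, 0x22, 0x2d, 0x0b, 0x04, (byte) 0x85};

        // String form passed as the 'execution-id' connector option.
        String executionId = Base64.getEncoder().encodeToString(executionIdBytes);

        // Decoding the option value recovers the original byte[] id.
        byte[] decoded = Base64.getDecoder().decode(executionId);
        System.out.println(Arrays.equals(decoded, executionIdBytes)); // true
    }
}
```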
Insert records with Flink SQL.

```sql
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);
```
For code examples, see the complete sample code below.
```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
// The ObDirectLoad* classes ship with the obkv-table-client dependency; the
// package path below may vary with the client version.
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadConnection;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadManager;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatement;
import com.alipay.oceanbase.rpc.direct_load.ObDirectLoadStatementExecutionId;
import com.alipay.oceanbase.rpc.direct_load.exception.ObDirectLoadException;

import java.io.IOException;
import java.net.URL;
import java.util.Base64;
import java.util.Scanner;

public class MultiNode {
    private static String host = "127.0.0.1";
    private static int port = 2882;
    private static String userName = "root";
    private static String tenantName = "test";
    private static String password = "password";
    private static String dbName = "test";
    private static String tableName = "t_sink";

    public static void main(String[] args) throws ObDirectLoadException, IOException, InterruptedException {
        // 1. Create a direct-load task and obtain the execution id.
        ObDirectLoadConnection connection = ObDirectLoadManager.getConnectionBuilder()
                .setServerInfo(host, port)
                .setLoginInfo(tenantName, userName, password, dbName)
                .build();
        ObDirectLoadStatement statement = connection.getStatementBuilder()
                .setTableName(tableName)
                .build();
        statement.begin();
        ObDirectLoadStatementExecutionId statementExecutionId = statement.getExecutionId();
        byte[] executionIdBytes = statementExecutionId.encode();
        // Convert the execution id from byte[] form to string form so that it
        // can be passed to the Flink SQL job as a parameter.
        String executionId = Base64.getEncoder().encodeToString(executionIdBytes);
        System.out.println(executionId);

        // 2. After obtaining the executionId, submit the Flink SQL job.

        // 3. Enter the id of the Flink job submitted in the second step on the
        // command line and wait for the Flink job to complete.
        Scanner scanner = new Scanner(System.in);
        String flinkJobId = scanner.nextLine();
        while (true) {
            // Loop to check the running status of the Flink job, see:
            // https://nightlies.apache.org/flink/flink-docs-master/zh/docs/ops/rest_api/
            JSONObject jsonObject = JSON.parseObject(new URL("http://localhost:8081/jobs/" + flinkJobId));
            String status = jsonObject.getString("state");
            if ("FINISHED".equals(status)) {
                break;
            }
            Thread.sleep(3_000);
        }

        // 4. Once the Flink job has reached FINISHED, perform the final commit
        // of the direct-load task.
        statement.commit();
        statement.close();
        connection.close();
    }
}
```
Once executed, the records should have been written to OceanBase.
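Step 3's polling loop can be exercised in isolation. This self-contained sketch stubs the status source (the real code reads the job state from the Flink REST API at `http://localhost:8081/jobs/<jobId>`):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class JobStatusPoller {

    /** Poll until the supplier reports FINISHED; returns the number of polls. */
    static int waitUntilFinished(Supplier<String> status, long sleepMillis) {
        int polls = 0;
        while (true) {
            polls++;
            if ("FINISHED".equals(status.get())) {
                return polls;
            }
            try {
                Thread.sleep(sleepMillis); // the sample sleeps 3s between polls
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        // Canned status sequence standing in for the REST API responses.
        Iterator<String> canned = List.of("RUNNING", "RUNNING", "FINISHED").iterator();
        System.out.println(waitUntilFinished(canned::next, 1L)); // 3
    }
}
```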
| Option | Required by Table API | Required by DataStream | Default | Type | Description |
|---|---|---|---|---|---|
| host | Yes | Yes | | String | Hostname used in direct-load. |
| port | Yes | Yes | | Integer | RPC port number used in direct-load. |
| username | Yes | Yes | | String | The name of the database user, like 'root'. NOTE: This is not the connection username like 'root@sys'. |
| tenant-name | Yes | Yes | | String | The tenant name. |
| password | Yes | Yes | | String | The password. |
| schema-name | Yes | Yes | | String | The schema name or database name. |
| table-name | Yes | Yes | | String | The table name. |
| parallel | No | No | 8 | Integer | The parallelism of the direct-load server. This parameter determines how many CPU resources the server uses to process this import task. |
| buffer-size | No | No | 1024 | Integer | The size of the buffer that is written to OceanBase at one time. |
| max-error-rows | No | No | 0 | Long | The maximum tolerable number of error rows. |
| dup-action | No | No | REPLACE | String | Action taken when the direct-load task encounters duplicated records. Can be `STOP_ON_DUP`, `REPLACE` or `IGNORE`. |
| timeout | No | No | 7d | Duration | The timeout for the direct-load task. |
| heartbeat-timeout | No | No | 60s | Duration | Client heartbeat timeout for the direct-load task. |
| heartbeat-interval | No | No | 10s | Duration | Client heartbeat interval for the direct-load task. |
| direct-load.load-method | No | No | full | String | The direct-load load mode: `full`, `inc` or `inc_replace`. |
| enable-multi-node-write | No | No | false | Boolean | Whether to enable direct-load with multi-node writing. Not enabled by default. |
| execution-id | No | No | | String | The execution id of the direct-load task. This parameter is only valid when `enable-multi-node-write` is `true`. |
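The Duration-typed options above take short strings such as `7d`, `60s` and `10s`. Flink parses these with its own utilities; the illustrative parser below only covers the simple `<number><unit>` shape shown in the table and is not the connector's actual parsing code.

```java
import java.time.Duration;

public class DurationOptionSketch {

    /** Parse strings like "7d", "60s", "10s" into a java.time.Duration. */
    static Duration parse(String value) {
        int i = 0;
        while (i < value.length() && Character.isDigit(value.charAt(i))) {
            i++;
        }
        long amount = Long.parseLong(value.substring(0, i));
        String unit = value.substring(i).trim();
        switch (unit) {
            case "d":   return Duration.ofDays(amount);
            case "h":   return Duration.ofHours(amount);
            case "min": return Duration.ofMinutes(amount);
            case "s":   return Duration.ofSeconds(amount);
            case "ms":  return Duration.ofMillis(amount);
            default:    throw new IllegalArgumentException("unsupported unit: " + unit);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("7d"));  // PT168H
        System.out.println(parse("60s")); // PT1M
    }
}
```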