| English | 简体中文 |
This is a Flink connector for OBKV HBase that provides flat table structure and usage patterns. It writes data to OceanBase through obkv-hbase-client-java.
CREATE TABLE t_sink (
rowkey STRING,
family1 ROW <column1 STRING, column2 STRING>, -- Nested ROW structure
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase',
'schema-name' = 'test',
'table-name' = 'htable1',
...
);
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING, -- Flat structure, direct column definitions
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'columnFamily' = 'f', -- Column family specified in configuration
'schema-name' = 'test',
'table-name' = 'htable1',
...
);
You can find official releases on the Releases page or Maven Central.
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase2</artifactId>
<version>${project.version}</version>
</dependency>
For the latest snapshot versions, configure the Maven snapshot repository:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase2</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
You can also build from source:
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
To use this connector directly with Flink SQL, download the fat JAR flink-sql-connector-obkv-hbase2-${project.version}.jar:
Create a table in OceanBase named htable1$f, where htable1 is the HBase table name and f is the column family.
use test;
CREATE TABLE `htable1$f`
(
`K` varbinary(1024) NOT NULL,
`Q` varbinary(256) NOT NULL,
`T` bigint(20) NOT NULL,
`V` varbinary(1048576) NOT NULL,
PRIMARY KEY (`K`, `Q`, `T`)
);
Place the required JAR files in Flink’s lib directory, then create the target table using SQL Client.
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING,
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456'
);
-- Insert data
INSERT INTO t_sink VALUES ('row1', 'value1', 'value2');
INSERT INTO t_sink VALUES ('row2', 'value3', 'value4');
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING,
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'odp-mode' = 'true',
'odp-ip' = '127.0.0.1',
'odp-port' = '2885',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321'
);
The connector provides flexible partial column update capabilities, supporting different ways to control which columns to update:
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING,
column2 STRING,
column3 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'ignoreNullWhenUpdate' = 'true', -- Skip NULL columns for partial updates
'excludeUpdateColumns' = 'column3' -- Permanently exclude column3 from updates
);
-- Only update column1, column2 and column3 remain unchanged
INSERT INTO t_sink (rowkey, column1) VALUES ('1', 'new_value');
Two update control methods:
ignoreNullWhenUpdate=true: Skip columns with NULL values, any NULL column will not be written to OBKV-HBaseexcludeUpdateColumns: Permanently exclude specified columns, these columns will never be updatedDynamic column mode allows you to specify column names at runtime, suitable for scenarios with unknown column names.
CREATE TABLE t_sink (
rowkey STRING,
columnKey STRING, -- Dynamic column name
columnValue STRING, -- Dynamic column value
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'dynamicColumnSink' = 'true' -- Enable dynamic column mode
);
-- Insert dynamic column data
INSERT INTO t_sink VALUES ('row1', 'dynamic_col_1', 'value1');
INSERT INTO t_sink VALUES ('row1', 'dynamic_col_2', 'value2');
Note: Timestamp columns are not written to HBase by default and are only used to control HBase version time. If you need to store the timestamp column as a data column, do not configure it in tsColumn or tsMap.
CREATE TABLE t_sink (
rowkey STRING,
ts_col BIGINT, -- Timestamp column (not written to HBase)
column1 STRING,
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'tsColumn' = 'ts_col', -- Use ts_col as timestamp for all columns
'tsInMills' = 'true' -- Timestamp unit is milliseconds
);
-- Result: Only 2 columns in HBase: column1, column2 (ts_col is automatically excluded)
Note: Timestamp columns are not written to HBase by default and are only used to control HBase version time.
CREATE TABLE t_sink (
rowkey STRING,
ts_col1 BIGINT, -- Timestamp column 1 (not written to HBase)
ts_col2 BIGINT, -- Timestamp column 2 (not written to HBase)
column1 STRING,
column2 STRING,
column3 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'tsMap' = 'ts_col1:column1;ts_col1:column2;ts_col2:column3', -- column1 and column2 use ts_col1, column3 uses ts_col2
'tsInMills' = 'true'
);
-- Result: Only 3 columns in HBase: column1, column2, column3 (ts_col1 and ts_col2 are automatically excluded)
| Option Name | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | Yes | String | Must be set to ‘obkv-hbase2’ to use this connector. | |
| schema-name | Yes | String | The database name in OceanBase. | |
| table-name | Yes | String | HBase table name (without column family suffix). | |
| username | Yes | String | Username. | |
| password | Yes | String | Password. | |
| odp-mode | No | false | Boolean | Whether to connect to OBKV via ODP. Set to ‘true’ to connect via ODP, otherwise connect via config url. |
| url | No | String | Cluster config url, can be queried by SHOW PARAMETERS LIKE 'obconfig_url'. Required when ‘odp-mode’ is ‘false’. |
|
| sys.username | No | String | Username of sys tenant. Required when ‘odp-mode’ is ‘false’. | |
| sys.password | No | String | Password of sys tenant. Required when ‘odp-mode’ is ‘false’. | |
| odp-ip | No | String | IP address of ODP. Required when ‘odp-mode’ is ‘true’. | |
| odp-port | No | 2885 | Integer | RPC port of ODP. Optional when ‘odp-mode’ is ‘true’. |
| hbase.properties | No | String | Properties to configure ‘obkv-hbase-client-java’, separated by semicolons. Format: ‘key1=value1;key2=value2’. | |
| columnFamily | No | f | String | HBase column family name. |
| rowkeyDelimiter | No | : | String | Delimiter for composite primary keys. |
| writePkValue | No | false | Boolean | Whether to write primary key values as column values in HBase. |
| bufferSize | No | 5000 | Integer | Buffer size for batch writing. |
| ignoreNullWhenUpdate | No | true | Boolean | Whether to ignore null values when updating. When set to ‘true’, columns with null values are skipped for partial updates; when set to ‘false’, null values are written to HBase. |
| ignoreDelete | No | false | Boolean | Whether to ignore delete operations. When set to ‘true’, delete operations are not executed. |
| excludeUpdateColumns | No | String | Column names to exclude from updates, separated by commas. These columns will not be updated. | |
| dynamicColumnSink | No | false | Boolean | Whether to enable dynamic column mode. When enabled, non-PK columns must be exactly 2 columns (columnKey and columnValue), both must be VARCHAR type. |
| tsColumn | No | String | Timestamp column name. When specified, the value of this column will be used as the timestamp for all columns. If this is set, ‘tsMap’ will be ignored. | |
| tsMap | No | String | Timestamp mapping configuration. Format: ‘tsColumn0:column0;tsColumn0:column1;tsColumn1:column2’, meaning column0 and column1 use tsColumn0’s value as timestamp, column2 uses tsColumn1’s value as timestamp. | |
| tsInMills | No | true | Boolean | Whether timestamp unit is milliseconds. When set to ‘false’, timestamp unit is seconds. |
This connector provides the following core features:
tsColumn and tsMap are set, tsColumn takes precedence.tableName$columnFamily format.Bytes.toBytes() for encoding, compatible with OBKV HBase Client SDK.