| English | 简体中文 |
本项目是一个 OBKV HBase 的 Flink Connector,提供扁平表结构和使用方式,可以在 Flink 中通过 obkv-hbase-client-java 将数据写入到 OceanBase。
CREATE TABLE t_sink (
rowkey STRING,
family1 ROW <column1 STRING, column2 STRING>, -- 嵌套 ROW 结构
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase',
'schema-name' = 'test',
'table-name' = 'htable1',
...
);
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING, -- 扁平结构,直接定义列
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'columnFamily' = 'f', -- 通过配置指定列族
'schema-name' = 'test',
'table-name' = 'htable1',
...
);
您可以在 Releases 页面 或者 Maven 中央仓库 找到正式的发布版本。
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase2</artifactId>
<version>${project.version}</version>
</dependency>
如果你想要使用最新的快照版本,可以通过配置 Maven 快照仓库来指定:
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>flink-connector-obkv-hbase2</artifactId>
<version>${project.version}</version>
</dependency>
<repositories>
<repository>
<id>sonatype-snapshots</id>
<name>Sonatype Snapshot Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
您也可以通过源码构建的方式获得程序包。
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
要直接通过 Flink SQL 使用此连接器,您需要下载名为 flink-sql-connector-obkv-hbase2-${project.version}.jar 的包含所有依赖的 jar 文件:
在 OceanBase 中创建一张表 htable1$f,该表的名称中,htable1 对应 HBase 的表名,f 对应 HBase 中的 column family。
use test;
CREATE TABLE `htable1$f`
(
`K` varbinary(1024) NOT NULL,
`Q` varbinary(256) NOT NULL,
`T` bigint(20) NOT NULL,
`V` varbinary(1048576) NOT NULL,
PRIMARY KEY (`K`, `Q`, `T`)
);
将需要用到的依赖的 JAR 文件放到 Flink 的 lib 目录下,之后通过 SQL Client 在 Flink 中创建目的表。
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING,
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456'
);
-- 插入数据
INSERT INTO t_sink VALUES ('row1', 'value1', 'value2');
INSERT INTO t_sink VALUES ('row2', 'value3', 'value4');
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING,
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'odp-mode' = 'true',
'odp-ip' = '127.0.0.1',
'odp-port' = '2885',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321'
);
连接器提供了灵活的部分列更新能力,支持通过不同方式控制哪些列需要更新:
CREATE TABLE t_sink (
rowkey STRING,
column1 STRING,
column2 STRING,
column3 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'ignoreNullWhenUpdate' = 'true', -- 跳过 NULL 列,实现部分列更新
'excludeUpdateColumns' = 'column3' -- 永久排除 column3,不更新此列
);
-- 只更新 column1,column2 和 column3 保持不变
INSERT INTO t_sink (rowkey, column1) VALUES ('1', 'new_value');
两种更新控制方式:
ignoreNullWhenUpdate=true:跳过值为 NULL 的列,此时任何为 NULL 的列都不会写入到 OBKV-HBase 中excludeUpdateColumns:永久排除指定列,这些列永远不会被更新动态列模式允许你在运行时动态指定列名,适用于列名不固定的场景。
CREATE TABLE t_sink (
rowkey STRING,
columnKey STRING, -- 动态列名
columnValue STRING, -- 动态列值
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'dynamicColumnSink' = 'true' -- 启用动态列模式
);
-- 插入动态列数据
INSERT INTO t_sink VALUES ('row1', 'dynamic_col_1', 'value1');
INSERT INTO t_sink VALUES ('row1', 'dynamic_col_2', 'value2');
注意:时间戳列默认不会被写入 HBase,只用于控制 HBase 版本时间。如果需要将时间戳列也作为数据列存储,请不要在 tsColumn 或 tsMap 中配置该列。
CREATE TABLE t_sink (
rowkey STRING,
ts_col BIGINT, -- 时间戳列(不会被写入 HBase)
column1 STRING,
column2 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'tsColumn' = 'ts_col', -- 使用 ts_col 作为所有列的时间戳
'tsInMills' = 'true' -- 时间戳单位为毫秒
);
-- 结果:HBase 中只有 2 列:column1, column2(ts_col 自动被排除)
注意:时间戳列默认不会被写入 HBase,只用于控制 HBase 版本时间。
CREATE TABLE t_sink (
rowkey STRING,
ts_col1 BIGINT, -- 时间戳列1(不会被写入 HBase)
ts_col2 BIGINT, -- 时间戳列2(不会被写入 HBase)
column1 STRING,
column2 STRING,
column3 STRING,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'obkv-hbase2',
'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
'schema-name' = 'test',
'table-name' = 'htable1',
'columnFamily' = 'f',
'username' = 'root@test#obcluster',
'password' = '654321',
'sys.username' = 'root',
'sys.password' = '123456',
'tsMap' = 'ts_col1:column1;ts_col1:column2;ts_col2:column3', -- column1和column2使用ts_col1,column3使用ts_col2
'tsInMills' = 'true'
);
-- 结果:HBase 中只有 3 列:column1, column2, column3(ts_col1 和 ts_col2 自动被排除)
| 参数名 | 是否必需 | 默认值 | 类型 | 描述 |
|---|---|---|---|---|
| connector | 是 | String | 必须设置为 ‘obkv-hbase2’ 以使用此连接器。 | |
| schema-name | 是 | String | OceanBase 的 database 名。 | |
| table-name | 是 | String | HBase 表名(不含列族后缀)。 | |
| username | 是 | String | 用户名。 | |
| password | 是 | String | 密码。 | |
| odp-mode | 否 | false | Boolean | 是否通过 ODP 连接到 OBKV。设置为 ‘true’ 时通过 ODP 连接,否则通过 config url 直连。 |
| url | 否 | String | 集群的 config url,可以通过 SHOW PARAMETERS LIKE 'obconfig_url' 查询。当 ‘odp-mode’ 为 ‘false’ 时必填。 |
|
| sys.username | 否 | String | sys 租户的用户名,当 ‘odp-mode’ 为 ‘false’ 时必填。 | |
| sys.password | 否 | String | sys 租户用户的密码,当 ‘odp-mode’ 为 ‘false’ 时必填。 | |
| odp-ip | 否 | String | ODP 的 IP 地址,当 ‘odp-mode’ 为 ‘true’ 时必填。 | |
| odp-port | 否 | 2885 | Integer | ODP 的 RPC 端口,当 ‘odp-mode’ 为 ‘true’ 时可选。 |
| hbase.properties | 否 | String | 配置 ‘obkv-hbase-client-java’ 的属性,多个值用分号分隔,格式:’key1=value1;key2=value2’。 | |
| columnFamily | 否 | f | String | HBase 列族名称。 |
| rowkeyDelimiter | 否 | : | String | 复合主键的分隔符。 |
| writePkValue | 否 | false | Boolean | 是否将主键值也写入 HBase 列值中。 |
| bufferSize | 否 | 5000 | Integer | 批量写入的缓冲区大小。 |
| ignoreNullWhenUpdate | 否 | false | Boolean | 是否忽略空值更新。设置为 ‘true’ 时,跳过值为 null 的列,实现部分列更新;设置为 ‘false’ 时,会将 null 值写入 HBase。 |
| ignoreDelete | 否 | false | Boolean | 是否忽略删除操作。设置为 ‘true’ 时,不会执行删除操作。 |
| excludeUpdateColumns | 否 | String | 排除更新的列名,多个列用逗号分隔。这些列不会被更新。 | |
| dynamicColumnSink | 否 | false | Boolean | 是否启用动态列模式。启用后,非主键列必须恰好为 2 列(columnKey 和 columnValue),都必须是 VARCHAR 类型。 |
| tsColumn | 否 | String | 时间戳列名。指定后,该列的值将作为所有列的时间戳。如果设置了此项,’tsMap’ 将被忽略。 | |
| tsMap | 否 | String | 时间戳映射配置,格式:’tsColumn0:column0;tsColumn0:column1;tsColumn1:column2’,表示 column0 和 column1 使用 tsColumn0 的值作为时间戳,column2 使用 tsColumn1 的值作为时间戳。 | |
| tsInMills | 否 | true | Boolean | 时间戳的单位是否为毫秒。设置为 ‘false’ 时,时间戳单位为秒。 |
本连接器提供以下核心功能:
tsColumn 和 tsMap,tsColumn 优先生效。tableName$columnFamily 格式。Bytes.toBytes() 进行编码,与 OBKV HBase Client SDK 兼容。