HBase学习笔记-入门(5)-HBase的Java API操作

使用Maven构建Java项目,需要引入hbase-client的依赖。

连接构建

要完成对应的操作,首先需要建立本地Java程序与集群的连接,而构建连接需要对应的配置文件。

在分布式环境下,客户端访问HBase需要通过ZooKeeper的地址和端口来获取当前活跃的Master和所需的RegionServer地址。配置文件中至少要指定上面的内容。

首先将配置文件放在项目中的resources文件夹中,需要的配置文件有HBase的hbase-site.xml和Hadoop的core-site.xml。可以使用HBaseConfiguration的单例方法获取配置。这个方法会去读取我们刚才resource文件夹中的配置文件。如果文件夹中没有放置对应的配置文件也没有关系,可以获取之后使用set方法进行设置(就像下面的情况)。获取配置之后,创建连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Configuration configuration;
private Connection connection;

@Before
public void init() throws IOException {
// 设置Zookeeper的地址和端口
configuration = HBaseConfiguration.create();
// 设置和hbase-site.xml中相同
configuration.set("hbase.zookeeper.quorum", "hadoop102, hadoop103, hadoop104");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 获取连接
connection = ConnectionFactory.createConnection(configuration);
}

@After
public void close() throws IOException {
connection.close();
}
  • 注意连接使用完毕之后需要关闭

在Java中对于HBase的操作可以分成两大类:对数据表的管理操作和对表中数据的增删改查操作。前者的操作需要获取到Admin对象,后者的操作需要获取到Table对象

数据表管理操作

对于数据表的管理等操作需要先得到Admin对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Configuration configuration;
private Connection connection;
private Admin admin;

@Before
public void init() throws IOException {
// 设置Zookeeper的地址和端口
configuration = HBaseConfiguration.create();
// 设置和hbase-site.xml中相同
configuration.set("hbase.zookeeper.quorum", "hadoop102, hadoop103, hadoop104");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 获取连接
connection = ConnectionFactory.createConnection(configuration);
// 获取HBaseAdmin对象
admin = connection.getAdmin();
}

@After
public void close() throws IOException {
admin.close();
connection.close();
}

创建表

  1. 调用tableExists判断表是否存在
  2. 创建表需要构建表描述器(TableDescriptor)、列族描述器(ColumnFamilyDescriptor)。这两个对象需要通过对应的描述器构建器使用build来创建
  3. 将列族描述器添加到表描述器中
  4. 使用admin.createTable创建表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void createTable() throws IOException {
// 1. 构建表名
TableName tableName = TableName.valueOf("MY_TABLE");

// 2.判断表是否存在
if (admin.tableExists(tableName)) {
System.out.println("表已经存在!");
return;
}

// 3. 构建表描述构建器
// TableDescriptor: 表描述器,描述这个表的相关配置,如列族等
// TableDescriptorBuilder: 表描述构建器,用来构建表描述器
TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);

// 4. 构建列族描述构建器
// ColumnFamilyDescriptor: 列族描述器,描述列族相关信息,一个列族描述器对应一个列族
// ColumnFamilyDescriptorBuilder: 列族表述构造器,用来构建列族描述器
ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("C1"));

// 5. 构建列族描述和表描述
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
TableDescriptor tableDescriptor = tableDescriptorBuilder.build();

// 6. 创建表
admin.createTable(tableDescriptor);
}

由于在HBase中存储的都是byte [ ],所以会经常使用到一个工具类Bytes(hbase包下的Bytes工具类),可以使用这个工具类将字符串、long、double等类型转化成byte [ ]数组,也可以将byte [ ]数组转化成指定类型

删除表

直接调用API即可,注意也是需要先禁用,再删除

1
2
3
4
5
6
7
8
9
10
11
public void deleteTable() throws IOException {
TableName tableName = TableName.valueOf("MY_TABLE");
// 判断表是否存在,如果存在则删除,如果不存在则输出提示信息
if (admin.tableExists(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("表" + tableName.toString() + "已经删除");
} else {
System.out.println("表" + tableName.toString() + "不存在");
}
}

增删改查操作

对于表的增删改查需要先得到Table对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Configuration configuration;
private Connection connection;

@Before
public void init() throws IOException {
// 设置Zookeeper的地址和端口
configuration = HBaseConfiguration.create();
// 设置和hbase-site.xml中相同
configuration.set("hbase.zookeeper.quorum", "hadoop102, hadoop103, hadoop104");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
// 获取连接
connection = ConnectionFactory.createConnection(configuration);
}

@After
public void close() throws IOException {
connection.close();
}

HBase的connection对象是一个重量级的对象,编写代码的时候需要避免重复创建,并且它是线程安全的。

而Table对象施一公轻量级的,使用完之后需要close,它是非线程安全的。

插入数据

  1. 使用HBase连接获取Table对象
  2. 构建RowKey,列族名和列名
  3. 构建put对象
  4. 添加对应列
  5. 使用Table对象执行put操作
  6. 关闭Table对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void put() throws IOException {
//1. 使用HBase连接获取Table对象
TableName tableName = TableName.valueOf("MY_TABLE");
Table table = connection.getTable(tableName);

//2. 构建RowKey,列族名和列名
String rowKey = "00001";
String cfName = "C1";
String colName1 = "Name";
String colName2 = "Age";

//3. 利用行键构建put对象
Put put = new Put(Bytes.toBytes(rowKey));

//4. 添加对应列
put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(colName1), Bytes.toBytes("syh"));
put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(colName2), Bytes.toBytes("21"));

//5. 使用Table对象执行put操作
table.put(put);

//6. 关闭Table对象
table.close();
}

通过rowkey获取数据

  1. 获取Table对象
  2. 使用rowKey构建get对象
  3. 执行get请求,得到Result对象(表示逻辑一行)
  4. 获取所有单元格
  5. 打印rowkey
  6. 迭代打印单元格列表
  7. 关闭表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void get() throws IOException {
// 1. 获取Table对象
TableName tableName = TableName.valueOf("MY_TABLE");
Table table = connection.getTable(tableName);

// 2. 使用rowKey构建Get对象
String rowKey = "00001";
Get get = new Get(Bytes.toBytes(rowKey));

// 3. 执行get请求
Result result = table.get(get);

// 4. 获取所有单元格
List<Cell> cells = result.listCells();

// 5. 打印rowKey
byte[] row = result.getRow();
System.out.println(Bytes.toString(row));

// 6. 迭代打印单元格列表
for (Cell cell : cells) {
// 将字符数组转化成字符串
// 获取列族的名称
String colFamily = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
// 获取列的名称
String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
// 获取值
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

System.out.println(colFamily + ":" + colName + "->" + value);
}

// 7. 关闭表
table.close();
}

注意在循环内部,获取到的都是Array,需要使用offset和length来指定范围

获取所有数据

  1. 获取Table对象
  2. 构建scan请求对象
  3. 执行scan扫描请求,得到ResultScanner对象
  4. 循环ResultScanner对象,每次得到一个Result对象
  5. 对每个Result对象,即对每个逻辑行进行输出
  6. 手动关闭ResultScanner
  7. 关闭表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void scan() throws IOException {
// 1. 获取Table对象
TableName tableName = TableName.valueOf("MY_TABLE");
Table table = connection.getTable(tableName);

// 2. 构建scan请求对象
Scan scan = new Scan();

// 3. 执行scan扫描请求,得到ResultScanner对象
ResultScanner resultScanner = table.getScanner(scan);

// 4. 循环ResultScanner对象,每次得到一个Result对象
// 5. 对每个Result对象,即对每个逻辑行进行输出
for (Result result : resultScanner) {
// 对于每个result(逻辑行)内容的输出可以参考get的代码
List<Cell> cells = result.listCells();
byte[] row = result.getRow();
System.out.println(Bytes.toString(row));

for (Cell cell : cells) {
String colFamily = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

System.out.println(colFamily + ":" + colName + "->" + value);
}
}

// 6. 手动关闭ResultScanner
resultScanner.close();

// 7. 关闭表
table.close();
}

删除数据

  1. 获取Table对象
  2. 根据rowKey构建delete对象
  3. 执行delete请求
  4. 关闭表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void delete() throws IOException {
// 1. 获取Table对象
TableName tableName = TableName.valueOf("MY_TABLE");
Table table = connection.getTable(tableName);

// 2. 根据rowKey构建delete对象
String rowKey = "00001";
Delete delete = new Delete(Bytes.toBytes(rowKey));

// 3. 执行delete请求
table.delete(delete);

// 4. 关闭表
table.close();
}

过滤器使用

过滤器的使用可以结合scan,在构建scan对象之后,利用setFilter进行设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void scanWithFilter() throws IOException {
TableName tableName = TableName.valueOf("MY_TABLE");
Table table = connection.getTable(tableName);

Scan scan = new Scan();

// 构建两个过滤器
SingleColumnValueFilter startFilter = new SingleColumnValueFilter(
Bytes.toBytes("C1"), Bytes.toBytes("Age"), CompareOperator.GREATER_OR_EQUAL, Bytes.toBytes("19"));
SingleColumnValueFilter endFilter = new SingleColumnValueFilter(
Bytes.toBytes("C1"), Bytes.toBytes("Age"), CompareOperator.LESS_OR_EQUAL, Bytes.toBytes("22"));
// 综合多个过滤器,得到过滤器列表
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, startFilter, endFilter);
// 在Scan对象中设置使用过滤器
scan.setFilter(filterList);

ResultScanner resultScanner = table.getScanner(scan);

for (Result result : resultScanner) {
List<Cell> cells = result.listCells();
byte[] row = result.getRow();
System.out.println(Bytes.toString(row));

for (Cell cell : cells) {
String colFamily = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());

System.out.println(colFamily + ":" + colName + "->" + value);
}
}

resultScanner.close();
table.close();
}

可能出现的错误

  1. 出现报错:java.lang.NoSuchMethodError: org.apache.hadoop.security.HadoopKerberosName.setRuleMechanism(Ljava/lang/String;)V

    缺少依赖:hadoop-auth

  2. 没有在本地机器的hosts中配置hadoop102、hadoop103和hadoop104的地址映射


HBase学习笔记-入门(5)-HBase的Java API操作
http://example.com/2022/04/17/HBase学习笔记-入门-5-HBase的Java-API操作/
作者
EverNorif
发布于
2022年4月17日
许可协议