HBase学习笔记-高级(3)-HBase Bulk Loading

官方文档链接:Apache HBase ™ Reference Guide-Bulk Loading

Bulk Load的简介

HBase中提供了一些导入数据的方式,前面我们也提到了,包括使用HBase Shell中的put命令,使用HBase中提供的MapReduce程序,使用Java提供的操作HBase中Table的API等。这些方式都需要与HBase连接,然后再及进行操作,在涉及海量数据存储的情况下,这些方式会给HBase的存储,计算以及网络资源造成较大消耗,并不是一种高效的方式。

Bulk Load就是可以用来解决上面提到的问题。Bulk Load的方式是使用一个MapReduce任务来将数据以HBase的内部结构StroeFile的形式直接输出到运行的集群上,之后再将这些文件与HBase建立联系。相比于与HBase直接连接的操作,Bulk Load可以绕过与HBase的交互,包括预写日志、写入MemStore以及溢写flush等操作,因此使用的CPU以及网络资源更少,效率更高,适合海量数据的加载。

Bulk Load 步骤

MapReduce Job完成数据准备

Bulk Load首先需要利用MapReduce Job将数据加载到HDFS文件系统中。

依赖说明:

依赖 功能
hbase-client HBase客户端
hbase-mapreduce HBase对MapReduce的支持
hadoop-common Hadoop通用包
hadoop-mapreduce-client-jobclient Hadoop MapReduce任务客户端
hadoop-mapreduce-client-core MapReduce客户端核心库
hadoop-hdfs HDFS相关操作
hadoop-auth Hadoop权限认证
commons-io 方便操作文件的Apache的工具类包

由于该任务中只需要对数据进行读取并且输出,因此只需要Map阶段即可。

Mapper程序编写

在HBase中提供了两个类来专门对MapReduce支持:

  1. ImmutableBytesWritable:对应Row Key
  2. MapReduceExtendedCell:对应key-value键值对

在Mapper中,设置out_key为ImmutableBytesWritable,out_value为MapReduceExtendedCell,Mapper读取文件,之后经过操作逻辑之后转为输出键值对。需要使用KeyValue类来构建单元格,每个需要写入到表中的字段都需要构建出单元格。

(下面的程序假设已经从输入键值对中获取到了Row Key以及需要写入的字段,均为String形式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell>.Context context) throws IOException, InterruptedException {
// 从in_key和in_value中获取到下面的内容,包括行键,列族名,列标识符,字段值
String rowKeyString = "00001";
String colFamilyString = "C1";
String colQualifier1 = "col1";
String colQualifier2 = "col2";
String value1 = "value1";
String value2 = "value2";

// 转化成bytes数组备用
byte[] rowKeyBytes = Bytes.toBytes(rowKeyString);
byte[] colFamilyBytes = Bytes.toBytes(colFamilyString);
byte[] colQualifier1Bytes = Bytes.toBytes(colQualifier1);
byte[] colQualifier2Bytes = Bytes.toBytes(colQualifier2);
byte[] value1Bytes = Bytes.toBytes(value1);
byte[] value2Bytes = Bytes.toBytes(value2);

// 利用行键构建构建out_key
ImmutableBytesWritable out_key_rowKey = new ImmutableBytesWritable(rowKeyBytes);

// 利用KeyValue类构建单元格,每个需要写入到表中的字段都需要构建单元格
KeyValue keyValue1 = new KeyValue(rowKeyBytes, colFamilyBytes, colQualifier1Bytes, value1Bytes);
KeyValue keyValue2 = new KeyValue(rowKeyBytes, colFamilyBytes, colQualifier2Bytes, value2Bytes);

// 利用键值对构建out_value
context.write(out_key_rowKey, new MapReduceExtendedCell(keyValue1));
context.write(out_key_rowKey, new MapReduceExtendedCell(keyValue2));
}
}

驱动类编写

与一般MapReduce程序驱动类不同之处在于需要获取利用HFileOutputFormat2来配置HFile输出

  1. 加载配置文件
  2. 创建HBase连接
  3. 获取Table对象
  4. 构建MapReduce Job
  5. MapReduce Job驱动类的固定配置
  6. 获取HBase Region的分布情况
  7. 配置HFile输出
  8. 提交MapReduce任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class MyDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 加载配置文件
Configuration conf = HBaseConfiguration.create();

// 2. 创建HBase连接
Connection connection = ConnectionFactory.createConnection(conf);

// 3. 获取Table对象
TableName tableName = TableName.valueOf("MY_TABLE");
Table my_table = connection.getTable(tableName);

// 4. 构建MapReduce Job
Job job = Job.getInstance(conf);

// 5. MapReduce Job驱动类的固定配置
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyMapper.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(MapReduceExtendedCell.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(MapReduceExtendedCell.class);

FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop102:8020/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop102:8020/output"));

// 6. 获取HBase Region的分布情况
RegionLocator regionLocator = connection.getRegionLocator(tableName);

// 7. 配置HFile输出
HFileOutputFormat2.configureIncrementalLoad(job, my_table, regionLocator);

// 8. 提交MapReduce任务
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

数据加载到HBase

执行上面的MapReduce任务之后,就可以在集群中观察到上传的数据,之后需要将对应数据加载到HBase中,使用HBase中自带的工具即可

1
hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /myData/output MY_NAMESPACE:MY_TABLE
  • 后面的两个参数分别为数据在HDFS文件系统中的路径、需要加载到的表名(命名空间: 表名)

HBase学习笔记-高级(3)-HBase Bulk Loading
http://example.com/2022/04/20/HBase学习笔记-高级-3-HBase-Bulk-Loading/
作者
EverNorif
发布于
2022年4月20日
许可协议