本文最后更新于:2022-04-20T11:22:06+08:00
官方文档链接:Apache HBase ™
Reference Guide-Bulk Loading
Bulk Load的简介
HBase中提供了一些导入数据的方式,前面我们也提到了,包括使用HBase
Shell中的put命令,使用HBase中提供的MapReduce程序,使用Java提供的操作HBase中Table的API等。这些方式都需要与HBase连接,然后再及进行操作,在涉及海量数据存储的情况下,这些方式会给HBase的存储,计算以及网络资源造成较大消耗,并不是一种高效的方式。
Bulk Load就是可以用来解决上面提到的问题。Bulk
Load的方式是使用一个MapReduce任务来将数据以HBase的内部结构StroeFile的形式直接输出到运行的集群上,之后再将这些文件与HBase建立联系。相比于与HBase直接连接的操作,Bulk
Load可以绕过与HBase的交互,包括预写日志、写入MemStore以及溢写flush等操作,因此使用的CPU以及网络资源更少,效率更高,适合海量数据的加载。
Bulk Load 步骤
MapReduce Job完成数据准备
Bulk Load首先需要利用MapReduce Job将数据加载到HDFS文件系统中。
依赖说明:
hbase-client |
HBase客户端 |
hbase-mapreduce |
HBase对MapReduce的支持 |
hadoop-common |
Hadoop通用包 |
hadoop-mapreduce-client-jobclient |
Hadoop MapReduce任务客户端 |
hadoop-mapreduce-client-core |
MapReduce客户端核心库 |
hadoop-hdfs |
HDFS相关操作 |
hadoop-auth |
Hadoop权限认证 |
commons-io |
方便操作文件的Apache的工具类包 |
由于该任务中只需要对数据进行读取并且输出,因此只需要Map阶段即可。
Mapper程序编写:
在HBase中提供了两个类来专门对MapReduce支持:
ImmutableBytesWritable
:对应Row Key
MapReduceExtendedCell
:对应key-value键值对
在Mapper中,设置out_key为ImmutableBytesWritable
,out_value为MapReduceExtendedCell
,Mapper读取文件,之后经过操作逻辑之后转为输出键值对。需要使用KeyValue类来构建单元格,每个需要写入到表中的字段都需要构建出单元格。
(下面的程序假设已经从输入键值对中获取到了Row
Key以及需要写入的字段,均为String形式)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell>.Context context) throws IOException, InterruptedException { String rowKeyString = "00001"; String colFamilyString = "C1"; String colQualifier1 = "col1"; String colQualifier2 = "col2"; String value1 = "value1"; String value2 = "value2";
byte[] rowKeyBytes = Bytes.toBytes(rowKeyString); byte[] colFamilyBytes = Bytes.toBytes(colFamilyString); byte[] colQualifier1Bytes = Bytes.toBytes(colQualifier1); byte[] colQualifier2Bytes = Bytes.toBytes(colQualifier2); byte[] value1Bytes = Bytes.toBytes(value1); byte[] value2Bytes = Bytes.toBytes(value2);
ImmutableBytesWritable out_key_rowKey = new ImmutableBytesWritable(rowKeyBytes);
KeyValue keyValue1 = new KeyValue(rowKeyBytes, colFamilyBytes, colQualifier1Bytes, value1Bytes); KeyValue keyValue2 = new KeyValue(rowKeyBytes, colFamilyBytes, colQualifier2Bytes, value2Bytes);
context.write(out_key_rowKey, new MapReduceExtendedCell(keyValue1)); context.write(out_key_rowKey, new MapReduceExtendedCell(keyValue2)); } }
|
驱动类编写:
与一般MapReduce程序驱动类不同之处在于需要获取利用HFileOutputFormat2
来配置HFile输出
- 加载配置文件
- 创建HBase连接
- 获取Table对象
- 构建MapReduce Job
- MapReduce Job驱动类的固定配置
- 获取HBase Region的分布情况
- 配置HFile输出
- 提交MapReduce任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class MyDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("MY_TABLE"); Table my_table = connection.getTable(tableName);
Job job = Job.getInstance(conf);
job.setJarByClass(MyDriver.class); job.setMapperClass(MyMapper.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(MapReduceExtendedCell.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(MapReduceExtendedCell.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop102:8020/input")); FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop102:8020/output"));
RegionLocator regionLocator = connection.getRegionLocator(tableName);
HFileOutputFormat2.configureIncrementalLoad(job, my_table, regionLocator); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
|
数据加载到HBase
执行上面的MapReduce任务之后,就可以在集群中观察到上传的数据,之后需要将对应数据加载到HBase中,使用HBase中自带的工具即可
1
| hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /myData/output MY_NAMESPACE:MY_TABLE
|
- 后面的两个参数分别为数据在HDFS文件系统中的路径、需要加载到的表名(命名空间:
表名)