Bulk Load-HBase数据导入最佳实践

一、概述

HBase本身提供了很多种数据导入的方式,通常有两种常用方式:

1、使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase

2、另一种方式就是使用HBase原生Client API

这两种方式由于需要频繁的与数据所存储的RegionServer通信,一次性入库大量数据时,特别占用资源,所以都不是最有效的。了解过HBase底层原理的应该都知道,HBase在HDFS中是以HFile文件结构存储的,一个比较高效便捷的方法就是使用 “Bulk Loading”方法直接生成HFile,即HBase提供的HFileOutputFormat类。

二、Bulk Load基本原理

Bulk Load处理由两个主要步骤组成

1、准备数据文件

Bulk Load的第一步,会运行一个Mapreduce作业,其中使用到了HFileOutputFormat输出HBase数据文件:StoreFile。HFileOutputFormat的作用在于使得输出的HFile文件可以适应单个region,使用TotalOrderPartitioner类将map输出结果分区到各个不同的key区间中,每个key区间都对应着HBase表的region。

2、导入HBase表

第二步使用completebulkload工具将第一步的结果文件依次交给负责文件对应region的RegionServer,并将文件move到region在HDFS上的存储目录中,一旦完成,将数据开放给clients。

如果在bulk load准备导入或在准备导入与完成导入的临界点上发现region的边界已经改变,completebulkload工具会自动split数据文件到新的边界上,但是这个过程并不是最佳实践,所以用户在使用时需要最小化准备导入与导入集群间的延时,特别是当其他client在同时使用其他工具向同一张表导入数据。

注意:

bulk load的completebulkload步骤,就是简单的将importtsv或HFileOutputFormat的结果文件导入到某张表中,使用类似以下命令

hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable

命令会很快执行完成,将/user/todd/myoutput下的HFile文件导入到mytable表中。注意:如果目标表不存在,工具会自动创建表。

三、生成HFile程序说明:1、最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。2、最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。3、MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。4、MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。5、MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。

四、示例

1、创建表

create 'hfiletable','fm1','fm2'

2、准备原始数据

key1fm1:col1value1key1fm1:col2value2key1fm2:col1value3key4fm1:col1value4

3、导入HBase MR

import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FsShell;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;public class BulkLoadJob {static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);public static class BulkLoadMap extendsMapper<LongWritable, Text, ImmutableBytesWritable, Put> {public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] valueStrSplit = value.toString().split("\t");String hkey = valueStrSplit[0];String family = valueStrSplit[1].split(":")[0];String column = valueStrSplit[1].split(":")[1];String hvalue = valueStrSplit[2];final byte[] rowKey = Bytes.toBytes(hkey);final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);Put HPut = new Put(rowKey);byte[] cell = Bytes.toBytes(hvalue);HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell);context.write(HKey, HPut);}}public static void main(String[] args) throws Exception {Configuration conf = HBaseConfiguration.create();String inputPath = args[0];String outputPath = args[1];HTable hTable = null;try {Job job = Job.getInstance(conf, "ExampleRead");job.setJarByClass(BulkLoadJob.class);job.setMapperClass(BulkLoadJob.BulkLoadMap.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);// speculationjob.setSpeculativeExecution(false);job.setReduceSpeculativeExecution(false);// in/out formatjob.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(HFileOutputFormat2.class);FileInputFormat.setInputPaths(job, inputPath);FileOutputFormat.setOutputPath(job, new Path(outputPath));hTable = new HTable(conf, args[2]);HFileOutputFormat2.configureIncrementalLoad(job, hTable);if (job.waitForCompletion(true)) {FsShell shell = new FsShell(conf);try {shell.run(new String[]{"-chmod", "-R", "777", args[1]});} catch (Exception e) {logger.error("Couldnt change the file permissions ", e);throw new IOException(e);}//加载到hbase表LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);loader.doBulkLoad(new Path(outputPath), hTable);} else {logger.error("loading failed.");System.exit(1);}} catch (IllegalArgumentException e) {e.printStackTrace();} finally {if (hTable != null) {hTable.close();}}}}可是我知道结果是惨淡的,但还是心存希望!

Bulk Load-HBase数据导入最佳实践

相关文章:

你感兴趣的文章:

标签云: