HBASE结合MapReduce批量导入数据

转载请注明出处:

废话不多说,,直接上代码,你懂得

package hbase;import java.text.SimpleDateFormat;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;/** * HBASE结合MapReduce批量导入 * @author liuyazhuang */public class BatchImport {static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");Text v2 = new Text();protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {final String[] splited = value.toString().split("\t");try {final Date date = new Date(Long.parseLong(splited[0].trim()));final String dateFormat = dateformat1.format(date);String rowKey = splited[1]+":"+dateFormat;v2.set(rowKey+"\t"+value.toString());context.write(key, v2);} catch (NumberFormatException e) {final Counter counter = context.getCounter("BatchImport", "ErrorFormat");counter.increment(1L);System.out.println("出错了"+splited[0]+" "+e.getMessage());}};}static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException {for (Text text : values) {final String[] splited = text.toString().split("\t");final Put put = new Put(Bytes.toBytes(splited[0]));put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));//省略其他字段,调用put.add(….)即可context.write(NullWritable.get(), put);}};}public static void main(String[] args) throws Exception {final Configuration configuration = new Configuration();//设置zookeeperconfiguration.set("hbase.zookeeper.quorum", "hadoop0");//设置hbase表名称configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");//将该值改大,防止hbase超时退出configuration.set("dfs.socket.timeout", "180000");final Job job = new Job(configuration, "HBaseBatchImport");job.setMapperClass(BatchImportMapper.class);job.setReducerClass(BatchImportReducer.class);//设置map的输出,不设置reduce的输出类型job.setMapOutputKeyClass(LongWritable.class);job.setMapOutputValueClass(Text.class);job.setInputFormatClass(TextInputFormat.class);//不再设置输出路径,而是设置输出格式类型job.setOutputFormatClass(TableOutputFormat.class);FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");job.waitForCompletion(true);}}

仿佛一支飘荡在水上的华丽咏叹调。

HBASE结合MapReduce批量导入数据

相关文章:

你感兴趣的文章:

标签云: