MapReduce之Combiner组件

简述

Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入;

在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载;

并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了。如果可以使用Combiner,一般情况下,和我们的reduce函数是一致的。

什么时候运行Combiner?

1、当job设置了Combiner,并且spill的个数到min.num.spill.for.combine(默认是3)的时候,那么combiner就会Merge之前执行; 2、但是有的情况下,Merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在Merge之后执行; 3、Combiner也有可能不运行,,Combiner会考虑当时集群的一个负载情况。如果集群负载量很大,会尽量提早执行完map,空出资源,所以,就不会去执行。

实例代码:package MyCombiner;import java.io.IOException;import java.net.URI;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public static String INPUT_PATH = “hdfs://master:8020/input”;private final static String OUTPUT_PATH = “hdfs://master:8020/output.txt”;public static <Text word = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] str = value.toString().split(“\\s+”);for (String string : str) {System.out.println(string);word.set(string);context.write(word, one);}}}public static <Text, IntWritable,Text, IntWritable>{private IntWritable result = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum+=val.get();}result.set(sum);context.write(key,result);}}public static void main(String[] args) throws Exception {//1、配置 Configuration conf = new Configuration();final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);if(fileSystem.exists(new Path(OUTPUT_PATH))){fileSystem.delete(new Path(OUTPUT_PATH),true);}Job job = Job.getInstance(conf, “word count”);//2、打包运行必须执行的方法job.setJarByClass(CombinerExp.class);//3、输入路径FileInputFormat.addInputPath(job, new Path(INPUT_PATH));//4、Mapjob.setMapperClass(MyMapper.class);//5、Combinerjob.setCombinerClass(MyReducer.class);//6、Reducer//job.setReducerClass(MyReducer.class);job.setNumReduceTasks(0);//reduce个数默认是1job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//7、 输出路径FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));//8、提交作业System.exit(job.waitForCompletion(true) ? 0 : 1);} }[root@master liguodong]# hdfs dfs -ls -R /input/-rw-r–r– :15 /input/input1-rw-r–r– :15 /input/input2当我们只有map和combine而没有reduce时,combine并不会执行。而输出的结果并没有被求和。[root@master liguodong]# hdfs dfs -ls -R /output/-rw-r–r– :17 /output/_SUCCESS-rw-r–r– :17 /output/part-m-00000-rw-r–r– :17 /output/part-m-00001[root@master liguodong]# hdfs dfs -cat /output/part-m-00000hello 1you1hello 1everyone1hello 1hadoop 1[root@master liguodong]# hdfs dfs -cat /output/part-m-00001hello 1you1hello 1me1hi1baby 1当我们把第79行注释取消,将80行注释的时候,将会执行combine函数。[main] INFO org.apache.hadoop.mapreduce.Job – Counters: 32File System Counters……Map-Reduce FrameworkMap input records=6Map output records=12……Input split bytes=192Combine input records=12Combine output records=9……Reduce input records=9Reduce output records=7Spilled Records=18……Virtual memory (bytes) snapshot=CountersBytes Read=65File Output Format CountersBytes Written=51[root@master hadoop]# hdfs dfs -ls -R /output/-rw-r–r– :41 /output/_SUCCESS-rw-r–r– :41 /output/part-r-00000[root@master hadoop]# hdfs dfs -cat /output/pa*baby 1everyone1hadoop 1hello 5hi1me1you2

孝敬父母、疼爱孩子、体贴爱人、善待朋友。

MapReduce之Combiner组件

相关文章:

你感兴趣的文章:

标签云: