MapReduce Core Programming (Part 2)

package job;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Example 1: Data deduplication with Map-Reduce.
 *
 * @author ljh
 */
public class Dedup {

    /*
     * 1. The map copies the input value into the output key and emits it directly;
     * the output value does not matter. The shuffle phase then groups identical keys together.
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // line holds the content of each input line
        private static Text line = new Text();

        // The map function simply copies value into line and emits it
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value;
            context.write(line, new Text(""));
        }
    }

    /*
     * 2. The reduce copies the input key into the output key and emits it directly.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // The shuffle has already grouped identical keys, so the reduce just emits each key once
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    /*
     * 3. The main method
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Read the input and output paths from the command line
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: in and out");
            System.exit(2);
        }
        Job job = new Job(conf, "Data deduplication");
        job.setJarByClass(Dedup.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
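To make the behavior concrete, here is a small, made-up input/output pair (the sample data is not from the original post): every distinct line appears exactly once in the output, no matter how many times it occurs in the input, because identical map output keys are grouped by the shuffle and the reducer writes each key a single time.

Sample input (possibly spread over several files):
2012-3-1 a
2012-3-2 b
2012-3-1 a
2012-3-3 c

Sample output:
2012-3-1 a
2012-3-2 b
2012-3-3 c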

package job;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Example 2: Sorting with Map-Reduce; the data to sort is a set of numbers.
 *
 * @author ljh
 */
public class Sort {

    /*
     * 1. The map parses the input value into an IntWritable and uses it as the output key.
     */
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        // Since we are sorting numbers, an IntWritable is enough to hold each value
        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        // The line number of the first output row is 1
        private static IntWritable linenum = new IntWritable(1);

        // The keys arrive already sorted by the framework, so we simply emit them,
        // incrementing the line number by 1 for every value
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    /*
     * 3. The main method
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Read the input and output paths from the command line
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: in and out");
            System.exit(2);
        }
        Job job = new Job(conf, "Data Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        // No combiner is set here. A combiner exists to cut down network traffic by making the map
        // output carry only what is strictly necessary -- the deduplication job above, for instance,
        // can deduplicate locally before the data is transferred and merged. For this sorting job a
        // combiner is unnecessary.
        // job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
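A sketch of the expected result, with made-up numbers: the framework sorts the IntWritable keys before they reach the reducer, and the loop over values inside reduce gives every occurrence of a duplicated number its own line number. Note that a single, globally ordered output like the one below assumes the job runs with one reduce task; with several reduce tasks each part file is only sorted internally unless a total-order partitioner is configured.

Sample input:
32
654
32
15

Sample output (line number, then value):
1	15
2	32
3	32
4	654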

package job;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Example 3: A single-table self-join with Map-Reduce that finds grandparent relationships.
 * The input table holds child-parent pairs; the output table holds grandchild-grandparent pairs.
 *
 * @author ljh
 */
public class STJoin {
    // Used to detect the first reduce call, which must emit the header row
    // (the input header is child-parent, the output header is grandchild-grandparent)
    public static int time = 0;

    /*
     * 1. The map splits the input into child and parent. Each pair is emitted twice: once keyed
     * by the parent as the "left table" and once keyed by the child as the "right table". The
     * value carries a flag marking which table the record belongs to.
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String childName = new String();     // holds the child's name
            String parentName = new String();    // holds the parent's name
            String relationType = new String();  // table flag: 1 means left table, 2 means right table
            String line = value.toString();      // the input line as a string
            int i = 0;
            // Advance i until the tab separator is found, so the line can be split below
            while (line.charAt(i) != '\t') {
                i++;
            }
            // Put child and parent into an array using the separator found above
            String[] values = { line.substring(0, i), line.substring(i + 1) };
            // Skip the header row
            if (values[0].compareTo("child") != 0) {
                // Extract child and parent
                childName = values[0];
                parentName = values[1];
                // Left table: keyed by the parent; left and right records sharing a key form a parent-child pair
                relationType = "1";
                context.write(new Text(values[1]), new Text(relationType + "+" + childName + "+" + parentName));
                // Right table: keyed by the child
                relationType = "2";
                context.write(new Text(values[0]), new Text(relationType + "+" + childName + "+" + parentName));
            }
        }
    }

    /*
     * 2. The reduce receives the left-table and right-table records of one parent-child key:
     *   left table:  {tom, [{1,lucy,tom}, {1,lili,tom}]}
     *   right table: {tom, [{2,tom,hali}, {2,tom,karry}]}
     * After the shuffle they arrive merged:
     *   {tom, [{1,lucy,tom}, {1,lili,tom}, {2,tom,hali}, {2,tom,karry}]}
     * Taking tom's children from the left records and tom's parents from the right records
     * yields the grandchild-grandparent pairs.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // On the first call, emit the header row
            if (time == 0) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grandchildnum = 0;                  // number of grandchildren
            String grandchild[] = new String[10];   // array holding the grandchildren
            int grandparentnum = 0;                 // number of grandparents
            String grandparent[] = new String[10];  // array holding the grandparents
            Iterator<Text> ite = values.iterator(); // the value list for this key
            while (ite.hasNext()) {
                // Take the next record and note its length
                String record = ite.next().toString();
                int len = record.length();
                int i = 2;
                if (len == 0)
                    continue; // skip empty records
                char relationType = record.charAt(0); // left-table or right-table flag
                String childname = new String();
                String parentname = new String();
                // Read the child part of the value
                while (record.charAt(i) != '+') {
                    childname = childname + record.charAt(i);
                    i++;
                }
                i = i + 1;
                // Read the parent part of the value
                while (i < len) {
                    parentname = parentname + record.charAt(i);
                    i++;
                }
                // A left-table record contributes a grandchild; a right-table record contributes a grandparent
                if (relationType == '1') {
                    grandchild[grandchildnum] = childname;
                    grandchildnum++;
                } else {
                    grandparent[grandparentnum] = parentname;
                    grandparentnum++;
                }
            }
            // Emit the Cartesian product of grandchildren and grandparents
            if (grandparentnum != 0 && grandchildnum != 0) {
                for (int m = 0; m < grandchildnum; m++) {
                    for (int n = 0; n < grandparentnum; n++) {
                        context.write(new Text(grandchild[m]), new Text(grandparent[n]));
                    }
                }
            }
        }
    }

    /*
     * 3. The main method
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Read the input and output paths from the command line
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: in and out");
            System.exit(2);
        }
        Job job = new Job(conf, "single table join");
        job.setJarByClass(STJoin.class);
        job.setMapperClass(Map.class);
        // job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
