MapReduce实现Reduce端Join操作实例

使用案例:联接两张表

Table EMP:(新建文件EMP,第一行属性名不要)
Name	Sex	Age	DepNo
zhang	male	20	1
li	female	25	2
wang	female	30	3
zhou	male	35	2

Table DEP:(新建文件DEP,第一行属性名不要)
DepNo	DepName
1	Sales
2	Dev
3	Mgt

Inner join:
select Name,Sex,Age,DepName from EMP inner join DEP on EMP.DepNo=DEP.DepNo

Result:
Name	Sex	Age	DepName
zhang	male	20	Sales
li	female	25	Dev
wang	female	30	Mgt
zhou	male	35	Dev

接下来使用MapReduce实现Join操作。

Reduce端进行Join操作

Reduce端联接比Map端联接更普遍,因为输入的数据不需要特定的结构;由于所有数据都必须经过shuffle过程,它的效率较低,但是编写简单。

基本思路:
1、Map端读取所有文件,并在输出的内容里加上标识,代表数据是从哪个文件里来的;
2、在reduce处理函数里,按照标识对数据进行保存;
3、然后根据Key的Join来求出结果并直接输出。

package Join;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.WritableComparable;{private String Name=””;private String Sex=””;private int Age=0;private int DepNo=0;private String DepName=””;private String table=””;public EmpJoinDep() {}public EmpJoinDep(EmpJoinDep empJoinDep) {this.Name = empJoinDep.getName();this.Sex = empJoinDep.getSex();this.Age = empJoinDep.getAge();this.DepNo = empJoinDep.getDepNo();this.DepName = empJoinDep.getDepName();this.table = empJoinDep.getTable();}public String getName() {return Name;}(String name) {Name = name;}public String getSex() {return Sex;}(String sex) {this.Sex = sex;}() {return Age;}(int age) {this.Age = age;}() {return DepNo;}(int depNo) {DepNo = depNo;}public String getDepName() {return DepName;}(String depName) {DepName = depName;}public String getTable() {return table;}(String table) {this.table = table;}(DataOutput out) throws IOException {out.writeUTF(Name);out.writeUTF(Sex);out.writeInt(Age);out.writeInt(DepNo);out.writeUTF(DepName);out.writeUTF(table);}(DataInput in) throws IOException {this.Name = in.readUTF();this.Sex = in.readUTF();this.Age = in.readInt();this.DepNo = in.readInt();this.DepName = in.readUTF();this.table = in.readUTF();}(Object o) {return 0;}@Overridepublic String toString() {return “EmpJoinDep [Name=” + Name + “, Sex=” + Sex + “, Age=” + Age+ “, DepName=” + DepName + “]”;}}package Join;import javaimport javaimport javaimport javaimport orgimport orgimport orgimport orgimport orgimport orgimport orgimport orgimport orgimport orgimport orgimport orgpublic class ReduceJoin {private final static String INPUT_PATH = “hdfs://liguodong:8020/inputjoin”;private final static String OUTPUT_PATH = “hdfs://liguodong:8020/outputmapjoin”;public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, EmpJoinDep>{private EmpJoinDep empJoinDep = new EmpJoinDep();@Overrideprotected void map(LongWritable key, Text value, Context context)throws 
IOException, InterruptedException {String[] values = value.toString().split(“\\s+”);if(values.length==4){empJoinDep.setName(values[0]);empJoinDep.setSex(values[1]);empJoinDep.setAge(Integer.parseInt(values[2]));empJoinDep.setDepNo(Integer.parseInt(values[3]));empJoinDep.setTable(“EMP”);context.write(new IntWritable(Integer.parseInt(values[3])), empJoinDep);}if(values.length==2){empJoinDep.setDepNo(Integer.parseInt(values[0]));empJoinDep.setDepName(values[1]);empJoinDep.setTable(“DEP”);context.write(new IntWritable(Integer.parseInt(values[0])), empJoinDep);}}}public static class MyReducer extends Reducer<IntWritable, EmpJoinDep, NullWritable, EmpJoinDep>{@Overrideprotected void reduce(IntWritable key, Iterable<EmpJoinDep> values,Context context)throws IOException, InterruptedException {String depName = “”;List<EmpJoinDep> list = new LinkedList<EmpJoinDep>();//1 emp//1 depfor (EmpJoinDep val : values) {list.add(new EmpJoinDep(val));//如果是部门表,如果部门编号为1,则获取该部门的名字。if(val.getTable().equals(“DEP”)){depName = val.getDepName();}}//如果上面部门编号是1,则这里也是1。for (EmpJoinDep listjoin : list) {//如果是员工表,则需要设置员工的所属部门。if(listjoin.getTable().equals(“EMP”)){listjoin.setDepName(depName);context.write(NullWritable.get(), listjoin);}}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);if(fileSystem.exists(new Path(OUTPUT_PATH))){fileSystem.delete(new Path(OUTPUT_PATH),true);}Job job = Job.getInstance(conf, “Reduce Join”); job.setJarByClass(ReduceJoin.class);FileInputFormat.addInputPath(job, new Path(INPUT_PATH)); job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(EmpJoinDep.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(EmpJoinDep.class);FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));System.exit(job.waitForCompletion(true) ? 
0 : 1);} }不甚酒力,体会不了酒的美味,但却能感受知已的妙处。

MapReduce实现Reduce端Join操作实例

相关文章:

你感兴趣的文章:

标签云: