[Hadoop Tutorial] 7. One-to-Many Joins in Hadoop

A single address can be associated with many companies. This example performs a one-to-many join over two types of input file, address records and company records, producing the association between an address name (e.g. Beijing) and its company names (e.g. Beijing JD, Beijing Red Star).

Development Environment

Hardware: four CentOS 6.5 servers (one Master node, three Slave nodes). Software: Java 1.7.0_45, hadoop-1.2.1

1. Map Phase

The job first reads its input with the default TextInputFormat, which turns each line into a <key, value> pair of byte offset and line content, e.g. <0, "1:Beijing">. The map function then handles each record according to which input file it came from: a value from the address file ("1:Beijing") is emitted as <"1", "address:Beijing">, while a value from the company file ("Beijing Red Star:1") is emitted as <"1", "company:Beijing Red Star">, as shown in the figure:
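The tagging step can be sketched independently of the MapReduce framework. The helper below is a hypothetical standalone version of the map logic (the `tag` method and `TagDemo` class are names introduced here for illustration, not part of the original source):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class TagDemo {
    // Tag a record by its source file: company lines are "name:addressId",
    // address lines are "addressId:name". Returns <addressId, "tag:name">,
    // or null for malformed lines (fewer than two fields), mirroring the
    // mapper's early return.
    static Map.Entry<String, String> tag(String line, boolean isCompany) {
        String[] parts = line.split(":");
        if (parts.length < 2) return null;
        return isCompany
            ? new SimpleEntry<>(parts[1], "company:" + parts[0])
            : new SimpleEntry<>(parts[0], "address:" + parts[1]);
    }

    public static void main(String[] args) {
        System.out.println(tag("Beijing Red Star:1", true));  // 1=company:Beijing Red Star
        System.out.println(tag("1:Beijing", false));          // 1=address:Beijing
    }
}
```

Both record types end up keyed by the shared addressId, which is what lets the shuffle phase bring them together.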

The core map-side code is shown below; for the full source see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.

```java
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Text addressId = new Text();
        Text info = new Text();
        String[] line = value.toString().split(":"); // split each input line on ":"
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (line.length < 2) {
            return;
        }
        if (path.indexOf("company") >= 0) {
            // company file value: "Beijing Red Star:1"
            addressId.set(line[1]);              // "1"
            info.set("company" + ":" + line[0]); // "company:Beijing Red Star"
            context.write(addressId, info);      // <"1", "company:Beijing Red Star">
        } else if (path.indexOf("address") >= 0) {
            // address file value: "1:Beijing"
            addressId.set(line[0]);              // "1"
            info.set("address" + ":" + line[1]); // "address:Beijing"
            context.write(addressId, info);      // <"1", "address:Beijing">
        }
    }
}
```

2. Reduce Phase

The reduce function receives <key, values> pairs such as <"1", ["company:Beijing Red Star", "company:Beijing JD", "address:Beijing"]>. It iterates over the values, and for each one (e.g. "company:Beijing Red Star") uses the tag (company or address) to put the name into either a company list or an address list. Finally it takes the Cartesian product of the two lists to obtain every (company, address) pair and writes the pairs out, as shown in the figure.
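The Cartesian-product step can be tried out on its own in plain Java. This is a sketch of the reduce logic with hypothetical names (`CartesianDemo`, `join`) introduced here for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CartesianDemo {
    // Separate tagged values into company names and address names, then
    // emit every (company, address) pair, i.e. the Cartesian product.
    static List<String[]> join(List<String> values) {
        List<String> companies = new ArrayList<>();
        List<String> addresses = new ArrayList<>();
        for (String v : values) {
            String[] parts = v.split(":", 2); // limit 2: keep ":" inside names intact
            if (parts.length < 2) continue;
            if (parts[0].equals("company")) companies.add(parts[1]);
            else if (parts[0].equals("address")) addresses.add(parts[1]);
        }
        List<String[]> pairs = new ArrayList<>();
        for (String c : companies)
            for (String a : addresses)
                pairs.add(new String[]{c, a});
        return pairs;
    }

    public static void main(String[] args) {
        List<String[]> out = join(Arrays.asList(
            "company:Beijing Red Star", "company:Beijing JD", "address:Beijing"));
        for (String[] p : out) System.out.println(p[0] + "\t" + p[1]);
        // Beijing Red Star	Beijing
        // Beijing JD	Beijing
    }
}
```

With one address and two companies per key, the product yields exactly the two joined rows the example expects.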

The core reduce-side code is shown below; for the full source see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.

```java
public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> companys = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();
        // e.g. ["company:Beijing Red Star", "company:Beijing JD", "address:Beijing"]
        Iterator<Text> it = values.iterator();
        while (it.hasNext()) {
            String value = it.next().toString(); // "company:Beijing Red Star"
            String[] result = value.split(":");
            if (result.length >= 2) {
                if (result[0].equals("company")) {
                    companys.add(result[1]);
                } else if (result[0].equals("address")) {
                    addresses.add(result[1]);
                }
            }
        }
        // Cartesian product of the two lists
        if (0 != companys.size() && 0 != addresses.size()) {
            for (int i = 0; i < companys.size(); i++) {
                for (int j = 0; j < addresses.size(); j++) {
                    // <key, value> -> <"Beijing JD", "Beijing">
                    context.write(new Text(companys.get(i)), new Text(addresses.get(j)));
                }
            }
        }
    }
}
```

3. Driver

The core driver code is shown below; for the full source see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.

```java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: company Join address <companyTableDir> <addressTableDir> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "company Join address");
    // main class of the job
    job.setJarByClass(CompanyJoinAddress.class);
    // map and reduce classes
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    // output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // input and output directories
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // companyTableDir
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));   // addressTableDir
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // out
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```

4. Deployment and Running

1) Start the Hadoop cluster

```shell
[hadoop@K-Master ~]$ start-dfs.sh
[hadoop@K-Master ~]$ start-mapred.sh
[hadoop@K-Master ~]$ jps
5283 SecondaryNameNode
5445 JobTracker
5578 Jps
5109 NameNode
```

2) Deploy the source

```shell
# set up the workspace
[hadoop@K-Master ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
```

Then copy the CompanyJoinAddress folder to the /usr/hadoop/workspace/MapReduce/ directory.
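Before submitting the job to the cluster, the whole join can be sanity-checked in plain local Java by simulating map, shuffle (group by addressId), and reduce in memory. The class name, method name, and sample records below are hypothetical, introduced only for this check:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalJoinCheck {
    // Simulate map + shuffle + reduce for the one-to-many join, in memory.
    static List<String> run(String[] addressLines, String[] companyLines) {
        // shuffle: group tagged values by addressId; TreeMap keeps keys
        // sorted, like the MapReduce sort phase
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : addressLines) {     // "1:Beijing"
            String[] p = line.split(":");
            grouped.computeIfAbsent(p[0], k -> new ArrayList<>()).add("address:" + p[1]);
        }
        for (String line : companyLines) {     // "Beijing JD:1"
            String[] p = line.split(":");
            grouped.computeIfAbsent(p[1], k -> new ArrayList<>()).add("company:" + p[0]);
        }
        // reduce: per key, Cartesian product of company and address names
        List<String> out = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            List<String> companies = new ArrayList<>();
            List<String> addresses = new ArrayList<>();
            for (String v : values) {
                String[] p = v.split(":", 2);
                if (p[0].equals("company")) companies.add(p[1]);
                else if (p[0].equals("address")) addresses.add(p[1]);
            }
            for (String c : companies)
                for (String a : addresses)
                    out.add(c + "\t" + a);
        }
        return out;
    }

    public static void main(String[] args) {
        // hypothetical sample records mirroring the two input files
        String[] address = {"1:Beijing", "2:Guangzhou"};
        String[] company = {"Beijing Red Star:1", "Beijing JD:1", "Guangzhou Bank:2"};
        for (String line : run(address, company)) System.out.println(line);
        // Beijing Red Star	Beijing
        // Beijing JD	Beijing
        // Guangzhou Bank	Guangzhou
    }
}
```

The tab-separated output matches the format the cluster job writes to its part files.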
