hadoop join之reduce side join

在介绍这个实例之前,请各位参考:。

reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。在这个例子中我们假设有两个数据文件如下:

user.csv文件:

"ID","NAME","SEX""1","user1","0""2","user2","0""3","user3","0""4","user4","1""5","user5","0""6","user6","0""7","user7","1""8","user8","0""9","user9","0"

order.csv文件:

"USER_ID","NAME""1","order1""2","order2""3","order3""4","order4""7","order7""8","order8""9","order9"

目前网上的例子大多是基于0.20以前版本的API写的,所以咱们采用新的API来写,具体代码如下:

public class MyJoin{public static class MapClass extendsMapper<LongWritable, Text, Text, Text>{//最好在map方法外定义变量,以减少map计算时创建对象的个数private Text key = new Text();private Text value = new Text();private String[] keyValue = null;@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{//采用的数据输入格式是TextInputFormat,//文件被分为一系列以换行或者制表符结束的行,,//key是每一行的位置(偏移量,LongWritable类型),//value是每一行的内容,Text类型,所有我们要把key从value中解析出来keyValue = value.toString().split(",", 2);this.key.set(keyValue[0]);this.value.set(keyValue[1]);context.write(this.key, this.value);}}public static class Reduce extends Reducer<Text, Text, Text, Text>{//最好在reduce方法外定义变量,以减少reduce计算时创建对象的个数private Text value = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{StringBuilder valueStr = new StringBuilder();//values中的每一个值是不同数据文件中的具有相同key的值//即是map中输出的多个文件相同key的value值集合for(Text val : values){valueStr.append(val);valueStr.append(",");}this.value.set(valueStr.deleteCharAt(valueStr.length()-1).toString());context.write(key, this.value);}}public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = new Job(conf, "MyJoin");job.setJarByClass(MyJoin.class);job.setMapperClass(MapClass.class);job.setReducerClass(Reduce.class);//job.setCombinerClass(Reduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//分别采用TextInputFormat和TextOutputFormat作为数据的输入和输出格式//如果不设置,这也是Hadoop默认的操作方式job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

你不能左右天气,但你能转变你的心情

hadoop join之reduce side join

相关文章:

你感兴趣的文章:

标签云: