MapReduce TopK

Problem statement: given a daily log of the IP addresses that visit google, compute the K IP addresses with the highest visit counts for that day.
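A concrete (made-up) example with K = 3, assuming the daily log holds exactly one IP per line:

Input log:
1.1.1.1
2.2.2.2
1.1.1.1
2.2.2.2
1.1.1.1
3.3.3.3

Expected top-3 output, in the IP=...,Count=...; format produced by the custom type defined below:
IP=1.1.1.1,Count=3;
IP=2.2.2.2,Count=2;
IP=3.3.3.3,Count=1;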

To solve this, first define a custom data type for an (IP, count) record that implements the WritableComparable interface, so the shuffle can sort records by visit count.

package reverseIndex;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class ipAndcount implements WritableComparable<ipAndcount> {
    private Text ip;
    private IntWritable count;

    public ipAndcount() {
        this.ip = new Text("");
        this.count = new IntWritable(1);
    }

    public ipAndcount(Text ip, IntWritable count) {
        this.ip = ip;
        this.count = count;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        ip.readFields(input);
        count.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        ip.write(output);
        count.write(output);
    }

    @Override
    public int compareTo(ipAndcount o) {
        // Descending by count so the most-visited IPs sort first;
        // break ties on the ip so distinct records never compare equal.
        int byCount = o.count.compareTo(count);
        return byCount == 0 ? ip.compareTo(o.ip) : byCount;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ipAndcount)) {
            return false;
        }
        ipAndcount other = (ipAndcount) o;
        return ip.equals(other.ip) && count.equals(other.count);
    }

    @Override
    public int hashCode() {
        // Must stay consistent with equals(); HashPartitioner relies on it.
        return ip.hashCode() * 163 + count.hashCode();
    }

    @Override
    public String toString() {
        return "IP=" + ip.toString() + ",Count=" + count.toString() + ";";
    }

    public Text getIp() {
        return ip;
    }

    public IntWritable getCount() {
        return count;
    }

    public void setCount(IntWritable count) {
        this.count = count;
    }
}

This problem is best completed as two chained jobs: the first counts each IP and aggregates its total (essentially WordCount); the second sorts those totals and selects the top K for output. Both jobs are wired together in the driver further below.
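First, a minimal sanity check of the comparator is worth having. The sketch below is purely illustrative (the ipAndcountCheck class is hypothetical and not part of either job); it should print true, confirming that the record with the larger count sorts first:

package reverseIndex;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical helper, written only to exercise ipAndcount's ordering.
public class ipAndcountCheck {
    public static void main(String[] args) {
        ipAndcount a = new ipAndcount(new Text("1.1.1.1"), new IntWritable(3));
        ipAndcount b = new ipAndcount(new Text("2.2.2.2"), new IntWritable(9));
        // A positive result means a sorts after b, i.e. the larger count comes first.
        System.out.println(a.compareTo(b) > 0); // prints: true
    }
}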

package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Two chained jobs: job 1 counts the daily visits per IP, job 2 selects the K most-visited IPs.
public class firstK {

    // Job 1 mapper: each input line is assumed to hold exactly one IP address,
    // so the whole line is emitted as the key with a count of 1.
    public static class FindIpMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, one);
        }
    }

    // Job 1 reducer: sums the 1s per IP, exactly like WordCount.
    public static class IpReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Job 2 mapper: reads job 1's "ip<TAB>count" lines via KeyValueTextInputFormat
    // and wraps each pair in an ipAndcount key so the shuffle sorts by count.
    public static class beforeSortIpmapper extends Mapper<Text, Text, ipAndcount, Text> {
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Copy the key: Hadoop reuses the Text instance across map() calls.
            ipAndcount tmp = new ipAndcount(new Text(key),
                    new IntWritable(Integer.parseInt(value.toString())));
            context.write(tmp, new Text());
        }
    }

    // Job 2 reducer: keys arrive sorted by count (descending), so the first k
    // groups are the top K. A single reduce task is required for a global result.
    public static class selectTopKReducer extends Reducer<ipAndcount, Text, ipAndcount, Text> {
        int count = 0;
        int k = 10;

        public void reduce(ipAndcount key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (count < k) {
                context.write(key, null); // TextOutputFormat writes only the key for a null value
                count++;
            }
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Job 1: sum the visits per IP.
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf, "sum ip");
        job1.setJarByClass(firstK.class);
        // Default input/output formats.
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);
        // Input path and the intermediate output path.
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job1, in);
        FileOutputFormat.setOutputPath(job1, out);
        // Map and reduce output types.
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        // Processing classes.
        job1.setMapperClass(FindIpMapper.class);
        job1.setReducerClass(IpReducer.class);
        // The counting job can run reduces in parallel.
        // (A combiner reusing IpReducer would also cut shuffle traffic here.)
        job1.setNumReduceTasks(7);

        // Job 2: select the top K. Note every setting below targets job2.
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "select K");
        job2.setJarByClass(firstK.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        Path in2 = new Path(args[1]);
        Path out2 = new Path(args[2]);
        FileInputFormat.addInputPath(job2, in2);
        FileOutputFormat.setOutputPath(job2, out2);
        job2.setMapOutputKeyClass(ipAndcount.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(ipAndcount.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(beforeSortIpmapper.class);
        job2.setReducerClass(selectTopKReducer.class);
        // A single reducer so the top K is global.
        job2.setNumReduceTasks(1);

        // Express the dependency between the jobs with JobControl.
        JobControl jc = new JobControl("select k ip");
        ControlledJob cjob1 = new ControlledJob(conf);
        cjob1.setJob(job1);
        ControlledJob cjob2 = new ControlledJob(conf2);
        cjob2.setJob(job2);
        jc.addJob(cjob1);
        jc.addJob(cjob2);
        // job2 must wait for job1's output.
        cjob2.addDependingJob(cjob1);
        // JobControl.run() loops until stopped, so run it on its own thread
        // and stop it once both jobs have finished.
        Thread controller = new Thread(jc);
        controller.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();
    }
}
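A sketch of how the chain might be invoked (the jar name and paths here are placeholders): args[0] is the raw log directory, args[1] an intermediate directory that receives job 1's per-IP counts, and args[2] the final output directory. As usual with FileOutputFormat, neither output directory may exist before the run.

hadoop jar topk.jar reverseIndex.firstK /logs/2014-05-01 /tmp/ip-counts /output/top10

Because job 2 runs a single reducer with k = 10, the final directory holds one part-r-00000 file with at most ten lines, one per top IP.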
