hadoop中文分词、词频统计及排序

需求如下:

有如图所示的输入文件。其中第一列代表ip地址,之后的偶数列代表搜索词,数字(奇数列)代表搜索次数,使用"\t"分隔。现在需要对搜索词进行分词并统计词频,此处不考虑搜索次数,可能是翻页,亦不考虑搜索链接的行为。

这里中文分词使用了IK分词包,直接将源码放入src中。感谢IK分词。

程序如下:

<span style="font-size:14px;">package seg;import java.io.ByteArrayInputStream;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.Reader;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import org.wltea.analyzer.core.IKSegmenter;import org.wltea.analyzer.core.Lexeme;/** * @author zhf * @version 创建时间:2014年8月16日 下午3:04:40 */public class SegmentTool extends Configured implements Tool{public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new SegmentTool(), args);System.exit(exitCode);}@Overridepublic int run(String[] arg0) throws Exception {Configuration conf = new Configuration();String[] args = new GenericOptionsParser(conf,arg0).getRemainingArgs();if(args.length != 2){System.err.println("Usage:seg.SegmentTool <input> <output>");System.exit(2);}Job job = new Job(conf,"nseg.jar");FileSystem fs = FileSystem.get(conf);if(fs.exists(new Path(args[1])))fs.delete(new Path(args[1]),true);job.setJarByClass(SegmentTool.class);job.setMapperClass(SegmentMapper.class);job.setCombinerClass(SegReducer.class);job.setReducerClass(SegReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));return job.waitForCompletion(true) ? 0 : 1;}public static class SegmentMapper extends Mapper<LongWritable,Text,Text,IntWritable>{private IKSegmenter iks = new IKSegmenter(true);private Text word = new Text();private final static IntWritable one = new IntWritable(1);public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{String line = value.toString().trim();String[] str = line.split("\t");for(int i=1;i<str.length;i+=2){String tmp = str[i];if(tmp.startsWith("http"))continue;List<String> list = segment(tmp);for(String s : list){word.set(s);context.write(word, one);}}}private List<String> segment(String str) throws IOException{byte[] byt = str.getBytes();InputStream is = new ByteArrayInputStream(byt);Reader reader = new InputStreamReader(is);iks.reset(reader);Lexeme lexeme;List<String> list = new ArrayList<String>();while((lexeme = iks.next()) != null){String text = lexeme.getLexemeText();list.add(text);}return list;}}public static class SegReducer extends Reducer<Text,IntWritable,Text,IntWritable>{private IntWritable result = new IntWritable();public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException{int sum = 0;for(IntWritable val : values)sum += val.get();result.set(sum);context.write(key, result);}}}</span>使用的hadoop环境为:Hadoop 2.3.0-cdh5.0.0。需要引入三个hadoop相关的jar :hadoop-mapreduce-client-core-2.0.0-cdh4.6.0.jar、hadoop-common-2.0.0-cdh4.6.0.jar、commons-cli-1.2.jar。

打包后,执行命令:yarn jar seg.jar seg.SegmentTool /test/user/zhf/input /test/user/zhf/output

输出结果部分如下:

<span style="font-size:18px;">阿迪达斯1附近 2陈22陈乔恩 1陈奕迅 1陈毅 2限额 4陕西 4除个别 1隐私 1隔壁 1集成 4集锦 1雨中 2雪5露1青7青岛 2</span>

但是并没有排序,如果数据量比较小,可以采用linux命令:sort -k2 -n -r kw_result.txt > kw_freq.txt进行排序。

数据量大的话,可以将结果导入Hive,因为只有两列了,,hive -e "select key,count from kw_table sort by count desc;" > kw_freq.txt 即可得到有序的结果。

亦可以将之前的ouput作为下一个job的input,实现排序。需要反转map输出的key和value。

记录沿途的心情。那样的生活才是我想要的。

hadoop中文分词、词频统计及排序

相关文章:

你感兴趣的文章:

标签云: