liushaobo的专栏

Assistance.java 辅助类,功能详见注释

package KMeans;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.util.LineReader;import java.io.IOException;import java.util.*;public class Assistance {//读取聚类中心点信息:聚类中心ID、聚类中心点public static List<ArrayList<Float>> getCenters(String inputpath){List<ArrayList<Float>> result = new ArrayList<ArrayList<Float>>();Configuration conf = new Configuration();try {FileSystem hdfs = FileSystem.get(conf);Path in = new Path(inputpath);FSDataInputStream fsIn = hdfs.open(in);LineReader lineIn = new LineReader(fsIn, conf);Text line = new Text();while (lineIn.readLine(line) > 0){String record = line.toString();/*因为Hadoop输出键值对时会在键跟值之间添加制表符,所以用空格代替之。*/String[] fields = record.replace("\t", " ").split(" ");List<Float> tmplist = new ArrayList<Float>();for (int i = 0; i < fields.length; ++i){tmplist.add(Float.parseFloat(fields[i]));}result.add((ArrayList<Float>) tmplist);}fsIn.close();} catch (IOException e){e.printStackTrace();}return result;}//删除上一次MapReduce作业的结果public static void deleteLastResult(String path){Configuration conf = new Configuration();try {FileSystem hdfs = FileSystem.get(conf);Path path1 = new Path(path);hdfs.delete(path1, true);} catch (IOException e){e.printStackTrace();}}//计算相邻两次迭代结果的聚类中心的距离,判断是否满足终止条件public static boolean isFinished(String oldpath, String newpath, int k, float threshold)throws IOException{List<ArrayList<Float>> oldcenters = Assistance.getCenters(oldpath);List<ArrayList<Float>> newcenters = Assistance.getCenters(newpath);float distance = 0;for (int i = 0; i < k; ++i){for (int j = 1; j < oldcenters.get(i).size(); ++j){float tmp = Math.abs(oldcenters.get(i).get(j) – newcenters.get(i).get(j));distance += Math.pow(tmp, 2);}}System.out.println("Distance = " + distance + " Threshold = " + threshold);if (distance < threshold)return true;/*如果不满足终止条件,,则用本次迭代的聚类中心更新聚类中心*/Assistance.deleteLastResult(oldpath);Configuration conf = new Configuration();FileSystem hdfs = FileSystem.get(conf);hdfs.copyToLocalFile(new Path(newpath), new Path("/home/hadoop/class/oldcenter.data"));hdfs.delete(new Path(oldpath), true);hdfs.moveFromLocalFile(new Path("/home/hadoop/class/oldcenter.data"), new Path(oldpath));return false;}}

KMeansDriver.java 作业驱动类

package KMeans;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException;public class KMeansDriver{public static void main(String[] args) throws Exception{int repeated = 0;/*不断提交MapReduce作业指导相邻两次迭代聚类中心的距离小于阈值或到达设定的迭代次数*/do {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 6){System.err.println("Usage: <int> <out> <oldcenters> <newcenters> <k> <threshold>");System.exit(2);}conf.set("centerpath", otherArgs[2]);conf.set("kpath", otherArgs[4]);Job job = new Job(conf, "KMeansCluster");//新建MapReduce作业job.setJarByClass(KMeansDriver.class);//设置作业启动类Path in = new Path(otherArgs[0]);Path out = new Path(otherArgs[1]);FileInputFormat.addInputPath(job, in);//设置输入路径FileSystem fs = FileSystem.get(conf);if (fs.exists(out)){//如果输出路径存在,则先删除之fs.delete(out, true);}FileOutputFormat.setOutputPath(job, out);//设置输出路径job.setMapperClass(KMeansMapper.class);//设置Map类job.setReducerClass(KMeansReducer.class);//设置Reduce类job.setOutputKeyClass(IntWritable.class);//设置输出键的类job.setOutputValueClass(Text.class);//设置输出值的类job.waitForCompletion(true);//启动作业++repeated;System.out.println("We have repeated " + repeated + " times.");} while (repeated < 10 && (Assistance.isFinished(args[2], args[3], Integer.parseInt(args[4]), Float.parseFloat(args[5])) == false));//根据最终得到的聚类中心对数据集进行聚类Cluster(args);}public static void Cluster(String[] args)throws IOException, InterruptedException, ClassNotFoundException{Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 6){System.err.println("Usage: <int> <out> <oldcenters> <newcenters> <k> <threshold>");System.exit(2);}conf.set("centerpath", otherArgs[2]);conf.set("kpath", otherArgs[4]);Job job = new Job(conf, "KMeansCluster");job.setJarByClass(KMeansDriver.class);Path in = new Path(otherArgs[0]);Path out = new Path(otherArgs[1]);FileInputFormat.addInputPath(job, in);FileSystem fs = FileSystem.get(conf);if (fs.exists(out)){fs.delete(out, true);}FileOutputFormat.setOutputPath(job, out);//因为只是将样本点聚类,不需要reduce操作,故不设置Reduce类job.setMapperClass(KMeansMapper.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(Text.class);job.waitForCompletion(true);}}KMeansMapper.java

别让别人徘徊的脚步踩碎你明天美好的梦想,

liushaobo的专栏

相关文章:

你感兴趣的文章:

标签云: