在Hadoop分布式环境下实现K-Means聚类算法的伪代码如下:
输入:参数0–存储样本数据的文本文件inputfile;
参数1–存储样本数据的SequenceFile文件inputPath;
参数2–存储质心数据的SequenceFile文件centerPath;
参数3–存储聚类结果文件(SequenceFile文件)所处的路径clusterPath;
参数4–类的数量k;
输出:k个类
Begin
读取inputPath,从中选取前k个点作为初始质心,将质心数据写入centerPath;
While 聚类终止条件不满足
在Mapper阶段,读取inputPath,对于key所对应的点,遍历所有的质心,选择最近的质心,将该质心的编号作为键,
该点的编号作为值传递给Reducer;
在Reducer阶段,将Mapper阶段传递过来的值根据键归并输出,结果写入clusterPath;
读取clusterPath,重新计算质心,将结果写入centerPath;
EndWhile
End
判断聚类效果好坏的常见指标是下述的准则函数值(误差平方和):
$E=\sum_{i=1}^{k}\sum_{p\in C_i}\lVert p-m_i\rVert^{2}$,其中 $C_i$ 为第 $i$ 个簇,$m_i$ 为 $C_i$ 的质心。
有理由认为上述值越小,聚类效果越好,随着循环的不断进行,上述准则函数值会收敛到一个很小的值,所以可以用这个值不再明显变化作为聚类循环的终止条件。
以下是存储样本数据(总共200个点)的本地文件kmeans.txt的部分片段(10个点):
163 61 20
17 34 25
66 7 10
14 34 34
128 5 41
49 33 24
185 58 20
83 8 14
54 3 17
96 1 13
其中第一个字段为点的id,第二个字段是点的横坐标,第三个字段是点的纵坐标。
将上述点可视化,见下图:
为了便于访问待聚类的点的ID及其坐标,将输入样本数据存储在SequenceFile格式的文件中,
其中key是点的ID,数据类型为Text,点的坐标是一个double[]型的数组,将该数组封装在类DoubleArray中,这个类需要继承Writable接口,
类DoubleArray的定义如下:DoubleArray.java
package kmeans;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class DoubleArray implements Writable {private double[] data;public DoubleArray() {}public DoubleArray(double[] data) {set(data);}public void set(double[] data) {this.data = data;}public double[] get() {return data;}public void write(DataOutput out) throws IOException {int length = 0;if (data != null) {length = data.length;}out.writeInt(length);for (int i = 0; i < length; i++) {out.writeDouble(data[i]);}}public void readFields(DataInput in) throws IOException {int length = in.readInt();data = new double[length];for (int i = 0; i < length; i++) {data[i] = in.readDouble();}}public double distanceTo(DoubleArray point) {double[] data1 = point.get();double distance = 0;for (int i = 0; i < data.length; i++) {distance = distance + Math.pow(data[i] – data1[i], 2);}return distance;}public void plus(DoubleArray point) {double[] data1 = point.get();for (int i = 0; i < data.length; i++) {data[i] = data[i] + data1[i];}}public void averageN(int n) {for (int i = 0; i < data.length; i++) {data[i] = data[i]/n;}}}
在Mapper阶段,为了便于计算准则函数的值,需要向Reducer传递隶属于某个质心的点的编号以及该点到该质心的距离的平方,为此将这两项数据封装在类IdAndDistance中,该类需要继承Writable接口,代码如下:IdAndDistance.java
package kmeans;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

/**
 * Mapper output value: a point's id together with the squared distance from
 * that point to its nearest center. Carrying the distance lets the Reducer
 * accumulate the SSE criterion value for the whole clustering.
 */
public class IdAndDistance implements Writable {
    private String id;
    private double distance;

    public IdAndDistance() {}

    public IdAndDistance(String id, double distance) {
        set(id, distance);
    }

    public void set(String id, double distance) {
        this.id = id;
        this.distance = distance;
    }

    public String getId() {
        return id;
    }

    public double getDistance() {
        return distance;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeDouble(distance);
    }

    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        distance = in.readDouble();
    }
}
Mapper阶段代码:KMeansMapper.java
package kmeans;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * For every input point (key = point id, value = coordinates) emits
 * (nearest center index, (point id, squared distance to that center)).
 */
public class KMeansMapper extends Mapper<Text, DoubleArray, Text, IdAndDistance> {

    private DoubleArray[] centers = null;

    /** Loads the current centers from centerPath once, before any map() call. */
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        centers = new DoubleArray[conf.getInt("numberOfCenters", 4)];
        String centerPath = conf.get("centerPath");
        FileSystem fs = FileSystem.get(URI.create(centerPath), conf);
        Path path = new Path(centerPath);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Text key = (Text) ReflectionUtils.newInstance(Text.class, conf);
        DoubleArray value = (DoubleArray) ReflectionUtils.newInstance(DoubleArray.class, conf);
        try {
            while (reader.next(key, value)) {
                // The SequenceFile key is the center index ("0".."k-1").
                // readFields() allocates a fresh array per record, so reusing
                // the 'value' instance here does not alias earlier centers.
                int index = Integer.parseInt(key.toString());
                centers[index] = new DoubleArray(value.get());
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }

    public void map(Text key, DoubleArray value, Context context)
            throws IOException, InterruptedException {
        double minDistance = Double.MAX_VALUE;
        int nearestCenter = 0;
        for (int i = 0; i < centers.length; i++) {
            // Compute each distance once (the original called distanceTo twice).
            double d = value.distanceTo(centers[i]);
            if (d < minDistance) {
                nearestCenter = i;
                minDistance = d;
            }
        }
        context.write(new Text(String.valueOf(nearestCenter)),
                new IdAndDistance(key.toString(), minDistance));
    }
}
Reducer阶段代码:KMeansReducer.java
package kmeans;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Concatenates the ids of all points assigned to one center into a
 * comma-separated string and appends the summed squared distance as the
 * LAST comma-field: "id1,id2,...,idN,sumOfDistance".
 */
public class KMeansReducer extends Reducer<Text, IdAndDistance, Text, Text> {

    public void reduce(Text key, Iterable<IdAndDistance> values, Context context)
            throws IOException, InterruptedException {
        double sumOfDistance = 0;
        Iterator<IdAndDistance> ite = values.iterator();
        StringBuilder cluster = new StringBuilder();
        while (ite.hasNext()) {
            // Hadoop reuses the value object while iterating; clone before keeping.
            IdAndDistance temp = WritableUtils.clone(ite.next(), context.getConfiguration());
            if (cluster.length() > 0) {
                cluster.append(",");
            }
            cluster.append(temp.getId());
            sumOfDistance += temp.getDistance();
        }
        cluster.append(",").append(String.valueOf(sumOfDistance));
        context.write(key, new Text(cluster.toString()));
    }
}
驱动程序代码:KMeansDriver.java
package kmeans;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * Driver: converts the local sample file to a SequenceFile, picks the first k
 * points as initial centers, then iterates map-reduce rounds until the SSE
 * criterion value stops changing by more than a threshold.
 *
 * args: 0=local input file, 1=HDFS SequenceFile of points, 2=HDFS center file,
 *       3=HDFS cluster output directory, 4=k (number of centers).
 */
public class KMeansDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("numberOfCenters", Integer.parseInt(args[4]));
        String inputfile = args[0];   // local file with full path
        String inputPath = args[1];   // HDFS file with full path
        String centerPath = args[2];  // HDFS file with full path
        String clusterPath = args[3]; // HDFS directory for cluster output
        conf.set("centerPath", centerPath);
        double s = 0;
        double s1 = Double.MAX_VALUE;
        double shold = 0.1;
        int times = 0;
        writeToSeq(conf, inputPath, inputfile); // convert samples to SequenceFile
        System.out.println("Begin to generate centers");
        // Pick initial centers and learn the dimensionality of the points.
        int dimention = centersInitial(conf, inputPath, centerPath);
        System.out.println("Generating centers for MRJob " + times + " successfully");
        conf.setInt("dimention", dimention);
        FileSystem fs = FileSystem.get(conf);
        Job job = null;
        do {
            System.out.println("MRJob-----------------------------" + times);
            fs.delete(new Path(clusterPath), true); // drop previous round's output
            job = new Job(conf);
            job.setJarByClass(KMeansDriver.class);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setMapperClass(KMeansMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IdAndDistance.class);
            job.setReducerClass(KMeansReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
            SequenceFileOutputFormat.setOutputPath(job, new Path(clusterPath));
            // Fixed: waitForCompletion was called twice, which re-submits an
            // already-completed Job and throws IllegalStateException.
            if (job.waitForCompletion(true)) {
                fs.delete(new Path(centerPath), true);
                System.out.println("Begin to generate centers");
                // Recompute centers from the clustering and get the SSE value.
                double s2 = newCenters(conf, inputPath, centerPath, clusterPath);
                System.out.println("s2 = " + s2);
                times++;
                s = Math.abs(s1 - s2); // fixed: en dash -> minus operator
                System.out.println("Generating centers for MRJob " + times + " successfully");
                System.out.println("s = " + s);
                s1 = s2;
            } else {
                // A failed round would otherwise loop forever with s unchanged.
                System.err.println("MRJob " + times + " failed");
                System.exit(1);
            }
        } while (s > shold); // stop when the criterion value no longer changes much
        writeClustersToLocalFile(conf, clusterPath); // dump result to local text file
    }

    /**
     * Converts the local tab-separated sample file (id \t x \t y) into a
     * SequenceFile of (Text id, DoubleArray coordinates).
     */
    public static void writeToSeq(Configuration conf, String inputPath, String inputfile)
            throws IOException {
        String uri = inputPath;
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Writer writer =
                new SequenceFile.Writer(fs, conf, path, Text.class, DoubleArray.class);
        File file = new File(inputfile);
        Scanner input = new Scanner(file);
        while (input.hasNext()) {
            String[] line = input.nextLine().split("\t");
            Text key = new Text(line[0]);
            double[] data = new double[2];
            data[0] = Double.parseDouble(line[1]);
            data[1] = Double.parseDouble(line[2]);
            DoubleArray value = new DoubleArray(data);
            writer.append(key, value);
        }
        input.close();
        writer.close();
    }

    /**
     * Writes the first k input points to centerPath as the initial centers
     * (key = center index) and returns the dimensionality of the points.
     * Improvement direction: choose initial centers at random.
     */
    public static int centersInitial(Configuration conf, String inputPath, String centerPath)
            throws IOException {
        int dimention = 0;
        FileSystem inputPathFs = FileSystem.get(URI.create(inputPath), conf);
        Path path1 = new Path(inputPath);
        SequenceFile.Reader inputPathReader = new SequenceFile.Reader(inputPathFs, path1, conf);
        FileSystem centerPathFs = FileSystem.get(URI.create(centerPath), conf);
        Path path2 = new Path(centerPath);
        SequenceFile.Writer centerPathWriter =
                new SequenceFile.Writer(centerPathFs, conf, path2, Text.class, DoubleArray.class);
        Text key = (Text) ReflectionUtils.newInstance(Text.class, conf);
        DoubleArray value = (DoubleArray) ReflectionUtils.newInstance(DoubleArray.class, conf);
        try {
            int k = 0;
            while (inputPathReader.next(key, value)) {
                // Fallback default unified to 4 everywhere (was 4 or 5 at random).
                if (k < conf.getInt("numberOfCenters", 4)) {
                    centerPathWriter.append(new Text(String.valueOf(k)), value);
                    dimention = value.get().length;
                    System.out.println("center\t" + String.valueOf(k) + "\t"
                            + "(" + (value.get())[0] + "," + (value.get())[1] + ")");
                } else {
                    break;
                }
                k = k + 1;
            }
        } finally {
            IOUtils.closeStream(inputPathReader);
        }
        centerPathWriter.close();
        return dimention;
    }

    /**
     * Reads the clustering of the last round, recomputes each center as the
     * mean of its member points, writes the new centers to centerPath, and
     * returns the total SSE criterion value of the round.
     */
    public static double newCenters(Configuration conf, String inputPath, String centerPath,
            String clusterPath) throws IOException {
        double s = 0;
        int numberOfCenters = conf.getInt("numberOfCenters", 4);
        String[] clusters = new String[numberOfCenters];
        DoubleArray[] centers = new DoubleArray[numberOfCenters];
        for (int i = 0; i < centers.length; i++) {
            // Start each new center as a zero vector of the right dimension.
            centers[i] = new DoubleArray(new double[conf.getInt("dimention", 1)]);
        }
        FileSystem clusterPathFs =
                FileSystem.get(URI.create(clusterPath + "/part-r-00000"), conf);
        Path path = new Path(clusterPath + "/part-r-00000");
        SequenceFile.Reader clusterReader = new SequenceFile.Reader(clusterPathFs, path, conf);
        Text clusterKey = (Text) ReflectionUtils.newInstance(Text.class, conf);
        Text clusterValue = (Text) ReflectionUtils.newInstance(Text.class, conf);
        try {
            while (clusterReader.next(clusterKey, clusterValue)) {
                clusters[Integer.parseInt(clusterKey.toString())] = clusterValue.toString();
                // The reducer appended the summed distance as the last comma-field.
                int indexOfDistance = clusterValue.toString().lastIndexOf(",") + 1;
                s += Double.parseDouble(clusterValue.toString().substring(indexOfDistance));
            }
        } finally {
            IOUtils.closeStream(clusterReader);
        }
        FileSystem inputPathFs = FileSystem.get(URI.create(inputPath), conf);
        Path path1 = new Path(inputPath);
        SequenceFile.Reader inputPathReader = new SequenceFile.Reader(inputPathFs, path1, conf);
        Text inputKey = (Text) ReflectionUtils.newInstance(Text.class, conf);
        DoubleArray inputValue = (DoubleArray) ReflectionUtils.newInstance(DoubleArray.class, conf);
        try {
            while (inputPathReader.next(inputKey, inputValue)) {
                for (int i = 0; i < numberOfCenters; i++) {
                    // Fixed: guard against a cluster that received no points
                    // (the original dereferenced null here).
                    if (clusters[i] == null) {
                        continue;
                    }
                    // Exact comma-delimited token match of the point id.
                    String id = inputKey.toString();
                    if (clusters[i].indexOf(id + ",") == 0
                            || clusters[i].indexOf("," + id + ",") > 0
                            || clusters[i].indexOf("," + id)
                                    == clusters[i].length() - ("," + id).length()) {
                        centers[i].plus(inputValue);
                    }
                }
            }
        } finally {
            IOUtils.closeStream(inputPathReader);
        }
        for (int i = 0; i < numberOfCenters; i++) {
            if (clusters[i] == null) {
                continue;
            }
            // Fixed: split(",").length counts the trailing distance field too,
            // so the original divided by N+1 instead of the member count N.
            centers[i].averageN(clusters[i].split(",").length - 1);
            System.out.println("center\t" + String.valueOf(i) + "\t"
                    + "(" + (centers[i].get())[0] + "," + (centers[i].get())[1] + ")");
        }
        FileSystem centerPathFs = FileSystem.get(URI.create(centerPath), conf);
        Path path2 = new Path(centerPath);
        SequenceFile.Writer centerPathWriter =
                new SequenceFile.Writer(centerPathFs, conf, path2, Text.class, DoubleArray.class);
        for (int i = 0; i < numberOfCenters; i++) {
            centerPathWriter.append(new Text(String.valueOf(i)), centers[i]);
        }
        centerPathWriter.close();
        return s;
    }

    /**
     * Writes the final clustering to a local text file, one "centerIndex \t
     * pointId" pair per line; the trailing distance field is skipped.
     */
    public static void writeClustersToLocalFile(Configuration conf, String clusterPath)
            throws IOException {
        File file = new File("/home/liujun/kmeans_clusters.txt");
        PrintWriter output = new PrintWriter(file);
        FileSystem clusterPathFs =
                FileSystem.get(URI.create(clusterPath + "/part-r-00000"), conf);
        Path path = new Path(clusterPath + "/part-r-00000");
        SequenceFile.Reader clusterReader = new SequenceFile.Reader(clusterPathFs, path, conf);
        Text clusterKey = (Text) ReflectionUtils.newInstance(Text.class, conf);
        Text clusterValue = (Text) ReflectionUtils.newInstance(Text.class, conf);
        try {
            while (clusterReader.next(clusterKey, clusterValue)) {
                String[] line = clusterValue.toString().split(",");
                // fixed: en dash -> minus operator in the loop bound
                for (int i = 0; i < line.length - 1; i++) {
                    output.print(clusterKey.toString() + "\t");
                    output.print(line[i] + "\n");
                }
            }
        } finally {
            IOUtils.closeStream(clusterReader);
        }
        output.close();
    }
}
代码运行步骤:
第一步:将代码编译打包,生成kmeans.jar;
每个人心中,都会有一个古镇情怀,流水江南,烟笼人家。