
The pseudocode for implementing the K-Means clustering algorithm in a distributed Hadoop environment is as follows:

Input: parameter 0: the local text file inputfile holding the sample data;

parameter 1: the SequenceFile inputPath holding the sample data;

parameter 2: the SequenceFile centerPath holding the centroid data;

parameter 3: the path clusterPath under which the clustering-result SequenceFile is stored;

parameter 4: the number of clusters k;

Output: k clusters

Begin

Read inputPath, select the first k points as the initial centroids, and write them to centerPath;

While the termination condition is not met

In the Mapper phase, read inputPath; for the point corresponding to each key, iterate over all centroids and select the nearest one, then emit that centroid's index as the key and the point's id as the value to the Reducer;

In the Reducer phase, merge the values received from the Mapper by key and write the result to clusterPath;

Read clusterPath, recompute the centroids, and write the result to centerPath;

EndWhile

End
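For reference, here is a minimal single-machine sketch of one pass of the loop above, written in plain Java without Hadoop. The class and method names are illustrative only; the actual distributed implementation follows later in the post.

package kmeans;

// Minimal single-machine sketch of one k-means pass (illustrative only).
public class LocalKMeansSketch {

    // points[i] = {x, y}; centers[j] = {x, y}
    static int nearestCenter(double[] p, double[][] centers) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int j = 0; j < centers.length; j++) {
            double dx = p[0] - centers[j][0], dy = p[1] - centers[j][1];
            double d = dx * dx + dy * dy;  // squared distance, as in the Mapper below
            if (d < bestDist) { bestDist = d; best = j; }
        }
        return best;
    }

    // One iteration: assign every point to its nearest center, then recompute centers.
    static void onePass(double[][] points, double[][] centers) {
        double[][] sums = new double[centers.length][2];
        int[] counts = new int[centers.length];
        for (double[] p : points) {
            int j = nearestCenter(p, centers);
            sums[j][0] += p[0];
            sums[j][1] += p[1];
            counts[j]++;
        }
        for (int j = 0; j < centers.length; j++) {
            if (counts[j] > 0) {  // keep the old center if the cluster is empty
                centers[j][0] = sums[j][0] / counts[j];
                centers[j][1] = sums[j][1] / counts[j];
            }
        }
    }
}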

A common measure of clustering quality is the following criterion function, the within-cluster sum of squared distances:

$E = \sum_{i=1}^{k} \sum_{p \in C_i} \lVert p - m_i \rVert^2$

where $C_i$ denotes the i-th cluster and $m_i$ its centroid.

It is reasonable to regard smaller values of this criterion as better clusterings. As the iterations proceed, the criterion value converges to a small value, so the loop can be terminated once the value no longer changes noticeably.
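Read concretely, E is just the total of the per-cluster distance sums that the Reducer will compute below. A hypothetical helper (not part of the original code) that evaluates E for a given assignment might look like this:

package kmeans;

// Hypothetical helper illustrating the criterion function: the sum, over all
// points, of the squared distance to the centroid of the point's cluster.
public class Criterion {

    // points[i] = coordinates of point i; assign[i] = index of its cluster;
    // centers[j] = coordinates of centroid j.
    public static double sse(double[][] points, int[] assign, double[][] centers) {
        double e = 0;
        for (int i = 0; i < points.length; i++) {
            double[] c = centers[assign[i]];
            for (int d = 0; d < points[i].length; d++) {
                double diff = points[i][d] - c[d];
                e += diff * diff;
            }
        }
        return e;
    }
}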

Below is a fragment (10 of the 200 points) of the local file kmeans.txt that holds the sample data; fields are tab-separated:

163	61	20
17	34	25
66	7	10
14	34	34
128	5	41
49	33	24
185	58	20
83	8	14
54	3	17
96	1	13

The first field is the point's id, the second its x-coordinate, and the third its y-coordinate.

(Figure: scatter plot of the 200 sample points.)
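The full 200-point data set is not reproduced in the post. For experimentation, a file in the same tab-separated format can be generated with a sketch like the following; the output path, coordinate ranges, and random seed are all assumptions:

package kmeans;

import java.io.File;
import java.io.PrintWriter;
import java.util.Random;

// Hypothetical generator for a kmeans.txt-style input file; the original
// data set is not reproduced in the post, so ranges and path are assumptions.
public class GenerateInput {
    public static void main(String[] args) throws Exception {
        Random rnd = new Random(42);
        try (PrintWriter out = new PrintWriter(new File("kmeans.txt"))) {
            for (int id = 1; id <= 200; id++) {
                int x = rnd.nextInt(100);  // fields are tab-separated: id, x, y
                int y = rnd.nextInt(100);
                out.println(id + "\t" + x + "\t" + y);
            }
        }
    }
}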

To make the id and coordinates of each point easy to access, the input sample data is stored in a SequenceFile in which the key is the point's id (of type Text) and the value is the point's coordinates, a double[] array. The array is wrapped in the class DoubleArray, which must implement the Writable interface. The class is defined as follows:

DoubleArray.java

package kmeans;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DoubleArray implements Writable {

    private double[] data;

    public DoubleArray() {}

    public DoubleArray(double[] data) {
        set(data);
    }

    public void set(double[] data) {
        this.data = data;
    }

    public double[] get() {
        return data;
    }

    // Serialize the array length followed by each element.
    public void write(DataOutput out) throws IOException {
        int length = 0;
        if (data != null) {
            length = data.length;
        }
        out.writeInt(length);
        for (int i = 0; i < length; i++) {
            out.writeDouble(data[i]);
        }
    }

    public void readFields(DataInput in) throws IOException {
        int length = in.readInt();
        data = new double[length];
        for (int i = 0; i < length; i++) {
            data[i] = in.readDouble();
        }
    }

    // Returns the *squared* Euclidean distance to another point.
    public double distanceTo(DoubleArray point) {
        double[] data1 = point.get();
        double distance = 0;
        for (int i = 0; i < data.length; i++) {
            distance = distance + Math.pow(data[i] - data1[i], 2);
        }
        return distance;
    }

    // Component-wise addition, used to accumulate cluster sums.
    public void plus(DoubleArray point) {
        double[] data1 = point.get();
        for (int i = 0; i < data.length; i++) {
            data[i] = data[i] + data1[i];
        }
    }

    // Divide each component by n to turn a sum into a mean.
    public void averageN(int n) {
        for (int i = 0; i < data.length; i++) {
            data[i] = data[i] / n;
        }
    }
}
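As a quick sanity check of DoubleArray, the hypothetical test class below (not part of the original post) exercises distanceTo, plus, and averageN. Note that distanceTo returns the squared distance, which is exactly what the criterion function needs:

package kmeans;

// Hypothetical smoke test for DoubleArray; not part of the original post.
public class DoubleArrayDemo {
    public static void main(String[] args) {
        DoubleArray a = new DoubleArray(new double[] {63, 61});
        DoubleArray b = new DoubleArray(new double[] {34, 25});
        // distanceTo returns the squared distance: (63-34)^2 + (61-25)^2 = 2137
        System.out.println(a.distanceTo(b));                // 2137.0
        a.plus(b);      // a becomes (97, 86)
        a.averageN(2);  // a becomes (48.5, 43.0)
        System.out.println(a.get()[0] + "," + a.get()[1]);  // 48.5,43.0
    }
}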

In the Mapper phase, to make it possible to compute the criterion function value, each point's id and the squared distance from the point to its nearest centroid must be passed to the Reducer together. These two items are wrapped in the class IdAndDistance, which must also implement the Writable interface:

IdAndDistance.java

package kmeans;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class IdAndDistance implements Writable {

    private String id;
    private double distance;

    public void set(String id, double distance) {
        this.id = id;
        this.distance = distance;
    }

    public IdAndDistance() {}

    public IdAndDistance(String id, double distance) {
        set(id, distance);
    }

    public String getId() {
        return id;
    }

    public double getDistance() {
        return distance;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeDouble(distance);
    }

    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        distance = in.readDouble();
    }
}

Mapper-phase code: KMeansMapper.java

package kmeans;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

public class KMeansMapper extends Mapper<Text, DoubleArray, Text, IdAndDistance> {

    private DoubleArray[] centers = null;

    // Load the current centroids from centerPath before any map() call.
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        centers = new DoubleArray[conf.getInt("numberOfCenters", 4)];
        String centerPath = conf.get("centerPath");
        FileSystem fs = FileSystem.get(URI.create(centerPath), conf);
        Path path = new Path(centerPath);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        Text key = (Text) ReflectionUtils.newInstance(Text.class, conf);
        DoubleArray value = (DoubleArray) ReflectionUtils.newInstance(DoubleArray.class, conf);
        try {
            while (reader.next(key, value)) {
                int index = Integer.parseInt(key.toString());
                double[] shuzu = value.get();
                centers[index] = new DoubleArray(shuzu);
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }

    // Emit (index of nearest centroid) -> (point id, squared distance to it).
    public void map(Text key, DoubleArray value, Context context)
            throws IOException, InterruptedException {
        double minDistance = Double.MAX_VALUE;
        int nearestCenter = 0;
        for (int i = 0; i < centers.length; i++) {
            if (value.distanceTo(centers[i]) < minDistance) {
                nearestCenter = i;
                minDistance = value.distanceTo(centers[i]);
            }
        }
        context.write(new Text(String.valueOf(nearestCenter)),
                new IdAndDistance(key.toString(), minDistance));
    }
}

Reducer-phase code: KMeansReducer.java

package kmeans;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;

public class KMeansReducer extends Reducer<Text, IdAndDistance, Text, Text> {

    // For each centroid, concatenate the ids of its points and append the
    // cluster's total squared distance as the last comma-separated field.
    public void reduce(Text key, Iterable<IdAndDistance> values, Context context)
            throws IOException, InterruptedException {
        double sumOfDistance = 0;
        Iterator<IdAndDistance> ite = values.iterator();
        String cluster = "";
        while (ite.hasNext()) {
            IdAndDistance temp = WritableUtils.clone(ite.next(), context.getConfiguration());
            if (cluster.length() > 0) cluster = cluster + ",";
            cluster = cluster + temp.getId();
            sumOfDistance = sumOfDistance + temp.getDistance();
        }
        cluster = cluster + "," + String.valueOf(sumOfDistance);
        context.write(key, new Text(cluster));
    }
}

Driver code: KMeansDriver.java

package kmeans;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class KMeansDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("numberOfCenters", Integer.parseInt(args[4]));
        String inputfile = args[0];   // full path of the local (Linux) input file
        String inputPath = args[1];   // full path of the sample-data file on HDFS
        String centerPath = args[2];  // full path of the centroid file on HDFS
        String clusterPath = args[3]; // HDFS directory for the clusters, no file name
        conf.set("centerPath", centerPath);
        double s = 0;
        double s1 = Double.MAX_VALUE;
        double shold = 0.1;
        int times = 0;
        writeToSeq(conf, inputPath, inputfile); // convert the sample data to a SequenceFile
        System.out.println("Begin to generate centers");
        int dimention = centersInitial(conf, inputPath, centerPath); // pick initial centroids; returns the points' dimension
        System.out.println("Generating centers for MRJob " + times + " successfully");
        conf.setInt("dimention", dimention);
        FileSystem fs = FileSystem.get(conf);
        Job job = null;
        do {
            System.out.println("MRJob-----------------------------" + times);
            fs.delete(new Path(clusterPath), true); // remove the previous iteration's result
            job = new Job(conf);
            job.setJarByClass(KMeansDriver.class);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setMapperClass(KMeansMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IdAndDistance.class);
            job.setReducerClass(KMeansReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
            SequenceFileOutputFormat.setOutputPath(job, new Path(clusterPath));
            if (job.waitForCompletion(true)) {
                fs.delete(new Path(centerPath), true);
                System.out.println("Begin to generate centers");
                // compute new centroids from the clustering result; returns the criterion value
                double s2 = newCenters(conf, inputPath, centerPath, clusterPath);
                System.out.println("s2 = " + s2);
                times++;
                s = Math.abs(s1 - s2);
                System.out.println("Generating centers for MRJob " + times + " successfully");
                System.out.println("s = " + s);
                s1 = s2;
            }
        } while (s > shold); // stop once the criterion value no longer changes noticeably
        writeClustersToLocalFile(conf, clusterPath); // dump the result to a local text file
    }

    public static void writeToSeq(Configuration conf, String inputPath, String inputfile) throws IOException {
        String uri = inputPath;
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, DoubleArray.class);
        File file = new File(inputfile);
        Scanner input = new Scanner(file);
        while (input.hasNext()) {
            String[] line = input.nextLine().split("\t");
            Text key = new Text(line[0]);
            double[] data = new double[2];
            data[0] = Double.parseDouble(line[1]);
            data[1] = Double.parseDouble(line[2]);
            DoubleArray value = new DoubleArray(data);
            writer.append(key, value);
        }
        input.close();
        writer.close();
    }

    public static int centersInitial(Configuration conf, String inputPath, String centerPath)
            throws IOException {
        int dimention = 0;
        FileSystem inputPathFs = FileSystem.get(URI.create(inputPath), conf);
        Path path1 = new Path(inputPath);
        SequenceFile.Reader inputPathReader = new SequenceFile.Reader(inputPathFs, path1, conf);
        FileSystem centerPathFs = FileSystem.get(URI.create(centerPath), conf);
        Path path2 = new Path(centerPath);
        SequenceFile.Writer centerPathWriter = new SequenceFile.Writer(centerPathFs, conf, path2, Text.class, DoubleArray.class);
        Text key = (Text) ReflectionUtils.newInstance(Text.class, conf);
        DoubleArray value = (DoubleArray) ReflectionUtils.newInstance(DoubleArray.class, conf);
        try {
            int k = 0;
            while (inputPathReader.next(key, value)) {
                // possible improvement: choose the initial centroids at random
                if (k < conf.getInt("numberOfCenters", 5)) {
                    centerPathWriter.append(new Text(String.valueOf(k)), value);
                    dimention = value.get().length;
                    System.out.println("center\t" + String.valueOf(k) + "\t" + "(" + (value.get())[0] + "," + (value.get())[1] + ")");
                } else {
                    break;
                }
                k = k + 1;
            }
        } finally {
            IOUtils.closeStream(inputPathReader);
        }
        centerPathWriter.close();
        return dimention;
    }

    public static double newCenters(Configuration conf, String inputPath, String centerPath, String clusterPath)
            throws IOException {
        double s = 0;
        String[] clusters = new String[conf.getInt("numberOfCenters", 4)];
        DoubleArray[] centers = new DoubleArray[conf.getInt("numberOfCenters", 4)];
        for (int i = 0; i < centers.length; i++) {
            double[] temp = new double[conf.getInt("dimention", 1)];
            for (int k = 0; k < temp.length; k++) temp[k] = 0;
            centers[i] = new DoubleArray(temp);
        }

        // Read the clustering result; the last field of each value is the
        // cluster's total squared distance, which is accumulated into s.
        FileSystem clusterPathFs = FileSystem.get(URI.create(clusterPath + "/part-r-00000"), conf);
        Path path = new Path(clusterPath + "/part-r-00000");
        SequenceFile.Reader clusterReader = new SequenceFile.Reader(clusterPathFs, path, conf);
        Text clusterKey = (Text) ReflectionUtils.newInstance(Text.class, conf);
        Text clusterValue = (Text) ReflectionUtils.newInstance(Text.class, conf);
        int k = 0;
        try {
            while (clusterReader.next(clusterKey, clusterValue)) {
                clusters[Integer.parseInt(clusterKey.toString())] = clusterValue.toString();
                int indexOfDistance = clusterValue.toString().lastIndexOf(",") + 1;
                double sumOfDistance = Double.parseDouble(clusterValue.toString().substring(indexOfDistance));
                s = s + sumOfDistance;
                k = k + 1;
            }
        } finally {
            IOUtils.closeStream(clusterReader);
        }

        // Sum the coordinates of the points belonging to each cluster.
        FileSystem inputPathFs = FileSystem.get(URI.create(inputPath), conf);
        Path path1 = new Path(inputPath);
        SequenceFile.Reader inputPathReader = new SequenceFile.Reader(inputPathFs, path1, conf);
        Text inputKey = (Text) ReflectionUtils.newInstance(Text.class, conf);
        DoubleArray inputValue = (DoubleArray) ReflectionUtils.newInstance(DoubleArray.class, conf);
        try {
            while (inputPathReader.next(inputKey, inputValue)) {
                for (int i = 0; i < conf.getInt("numberOfCenters", 5); i++) {
                    // test whether this point's id occurs in cluster i's id list
                    if (clusters[i].indexOf(inputKey.toString() + ",") == 0
                            || clusters[i].indexOf("," + inputKey.toString() + ",") > 0
                            || clusters[i].indexOf("," + inputKey.toString()) == clusters[i].length() - ("," + inputKey.toString()).length()) {
                        centers[i].plus(inputValue);
                    }
                }
            }
        } finally {
            IOUtils.closeStream(inputPathReader);
        }
        for (int i = 0; i < conf.getInt("numberOfCenters", 5); i++) {
            // the last comma-separated field is the distance sum, not a point
            // id, so the number of points in the cluster is length - 1
            centers[i].averageN(clusters[i].split(",").length - 1);
            System.out.println("center\t" + String.valueOf(i) + "\t" + "(" + (centers[i].get())[0] + "," + (centers[i].get())[1] + ")");
        }

        FileSystem centerPathFs = FileSystem.get(URI.create(centerPath), conf);
        Path path2 = new Path(centerPath);
        SequenceFile.Writer centerPathWriter = new SequenceFile.Writer(centerPathFs, conf, path2, Text.class, DoubleArray.class);
        for (int i = 0; i < conf.getInt("numberOfCenters", 5); i++) {
            centerPathWriter.append(new Text(String.valueOf(i)), centers[i]);
        }
        centerPathWriter.close();

        return s;
    }

    public static void writeClustersToLocalFile(Configuration conf, String clusterPath) throws IOException {
        File file = new File("/home/liujun/kmeans_clusters.txt");
        PrintWriter output = new PrintWriter(file);

        FileSystem clusterPathFs = FileSystem.get(URI.create(clusterPath + "/part-r-00000"), conf);
        Path path = new Path(clusterPath + "/part-r-00000");
        SequenceFile.Reader clusterReader = new SequenceFile.Reader(clusterPathFs, path, conf);
        Text clusterKey = (Text) ReflectionUtils.newInstance(Text.class, conf);
        Text clusterValue = (Text) ReflectionUtils.newInstance(Text.class, conf);
        try {
            while (clusterReader.next(clusterKey, clusterValue)) {
                String[] line = clusterValue.toString().split(",");
                // skip the last field (the distance sum); write only point ids
                for (int i = 0; i < line.length - 1; i++) {
                    output.print(clusterKey.toString() + "\t");
                    output.print(line[i] + "\n");
                }
            }
        } finally {
            IOUtils.closeStream(clusterReader);
        }
        output.close();
    }
}

Steps to run the code:

Step 1: compile and package the code into kmeans.jar;
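Assuming the argument order expected by KMeansDriver above, an invocation might look like the following (all paths are illustrative):

hadoop jar kmeans.jar kmeans.KMeansDriver /home/liujun/kmeans.txt /user/liujun/kmeans/input.seq /user/liujun/kmeans/centers.seq /user/liujun/kmeans/clusters 4

Here args[0] is the local sample file, args[1] through args[3] are HDFS paths for the input SequenceFile, the centroid file, and the cluster output directory, and the final 4 is the number of clusters k.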

