Mahout Programming : K-Means Clustering

Mahout でのデータマイニング。mahout.clustering.kmeans を使ったクラスター分析を実装してみた。Mahoutは、Hadoop上で動くデータマイニング機械学習の各種アルゴリズムが実装されているライブラリ。

Mahout/Hadoop設定

mahout/hadoop の設定手順は以下のエントリにまとめてある。

Data for Clustering

R言語のデータで有名なiris(アヤメ)を使ってみる。

Rから出力したirisデータから数値部分を抽出したものを iris.csv として用いる。

iris.csv


構成

  • KmeansTest.java: Main に mahout.clustering.kmeans を用いクラスター分析を行うための一連の手続きを記述。
  • CsvUtil.java: 数値のCSVデータから、mahout.clustering.kmeans の実行の引数で与える List を作成している。

KmeansTest.java

package mahouttest.test;

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.kmeans.Cluster;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class KmeansTest {

	public static void writePointToFile(List<Vector> points, String fileName,
			FileSystem fs, Configuration conf) throws IOException {
		// Writer
		Path path = new Path(fileName);
		SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
				LongWritable.class, VectorWritable.class);

		// Write to File
		VectorWritable vec = new VectorWritable();
		long recordNum = 0;
		for (Vector point : points) {
			vec.set(point);
			writer.append(new LongWritable(recordNum), vec);
		}
		writer.close();
	}

	public static void main(String args[]) throws Exception {
		
		// Directory
		File testData = new File("testdata");
		if (!testData.exists()) {
			testData.mkdir();
		}
		testData = new File("testdata/points");
		if (!testData.exists()) {
			testData.mkdir();
		}

		// Data for kmeans (Iris data)
		List<Vector> vectors = CsvUtil.readVectors("C:/data/iris.csv");

		// Config
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(conf);
		writePointToFile(vectors, "testdata/points/file1", fs, conf);
		// Path
		Path path = new Path("testdata/clusters/part-00000");
		SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
				Text.class, Cluster.class);

		// Initial Cluster
		int clusterNum = 3;
		for (int i = 0; i < clusterNum; i++) {
			Vector vec = vectors.get(i);
			Cluster cluster = new Cluster(vec, i);
			cluster.addPoint(cluster.getCenter());
			writer.append(new Text(cluster.getIdentifier()), cluster);
		}
		writer.close();

		// Run KMeans
		KMeansDriver.runJob("testdata/points", "testdata/clusters", "output",
				EuclideanDistanceMeasure.class.getName(), 0.001, 1000, 10);

		// Display Results
		SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(
				"output/points/part-00000"), conf);
		Text key = new Text();
		Text value = new Text();
		while (reader.next(key, value)) {
			System.out.println(key.toString() + ",clusterId ="
					+ value.toString());
		}
	}
}
CsvUtil.java
package mahouttest.test;

import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class CsvUtil {

	public static List<List<Double>> read(String filename) {
		ArrayList<List<Double>> data = new ArrayList<List<Double>>();

		// ReadFile
		String inputString = null;
		try {
			File file = new File(filename);
			byte[] b = new byte[(int) file.length()];
			FileInputStream fis = new FileInputStream(file);
			fis.read(b);
			inputString = new String(b);
		} catch (Exception e) {
			return data;
		}
		if (inputString == null) {
			return data;
		}

		inputString = inputString.replaceAll("\r", "");
		String[] rowdata = inputString.split("\n");
		for (String str : rowdata) {
			String[] values = str.split(",");
			ArrayList<Double> row = new ArrayList<Double>();
			for (String value : values) {
				row.add(Double.valueOf(value));
			}
			data.add(row);
		}
		return data;
	}

	public static List<Vector> readVectors(String filename) {
		List<List<Double>> data = read(filename);
		return convert(data);
	}

	public static List<Vector> convert(List<List<Double>> data) {
		List<Vector> vectors = new ArrayList<Vector>();

		int j = 0;
		for (List<Double> rowdata : data) {
			double[] value = new double[rowdata.size()];
			for (int i = 0; i < rowdata.size(); i++) {
				value[i] = rowdata.get(i);
			}
			Vector vector = new DenseVector("Vector:" + String.valueOf(j++),
					value);
			vectors.add(vector);
		}
		return vectors;
	}
}

KMeansTest 実行結果

KMeansDriver#runJob 実行 でMapReduceが開始

2010/07/18 23:11:37 org.slf4j.impl.JCLLoggerAdapter info
情報: Input: testdata/points Clusters In: testdata/clusters Out: output Distance: org.apache.mahout.common.distance.EuclideanDistanceMeasure
2010/07/18 23:11:37 org.slf4j.impl.JCLLoggerAdapter info
情報: convergence: 0.0010 max Iterations: 10 num Reduce Tasks: 1 Input Vectors: org.apache.mahout.math.VectorWritable
2010/07/18 23:11:37 org.slf4j.impl.JCLLoggerAdapter info
情報: Iteration 0
2010/07/18 23:11:38 org.apache.hadoop.metrics.jvm.JvmMetrics init
情報: Initializing JVM Metrics with processName=JobTracker, sessionId=
2010/07/18 23:11:38 org.apache.hadoop.mapred.JobClient configureCommandLineOptions
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2010/07/18 23:11:38 org.apache.hadoop.mapred.FileInputFormat listStatus
情報: Total input paths to process : 1
2010/07/18 23:11:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
情報: Running job: job_local_0001
2010/07/18 23:11:39 org.apache.hadoop.mapred.FileInputFormat listStatus
情報: Total input paths to process : 1
2010/07/18 23:11:39 org.apache.hadoop.mapred.MapTask runOldMapper
情報: numReduceTasks: 1
2010/07/18 23:11:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
情報: io.sort.mb = 100
2010/07/18 23:11:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
情報: data buffer = 79691776/99614720
2010/07/18 23:11:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
情報: record buffer = 262144/327680
2010/07/18 23:11:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
情報: Starting flush of map output
2010/07/18 23:11:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
情報:  map 0% reduce 0%
...

出力 output/points/part-00000 に Clustering 結果が出力されていることが分かる。

...
Vector:0,clusterId =2
Vector:1,clusterId =2
Vector:2,clusterId =2
Vector:3,clusterId =2
Vector:4,clusterId =2
Vector:5,clusterId =2
Vector:6,clusterId =2
Vector:7,clusterId =2
Vector:8,clusterId =2
Vector:9,clusterId =2
Vector:10,clusterId =2
...

注意

  • Hadoopでは出力先にすでファイルにがあると出力されないので、output先には何もない形で実行する。

参考

クラスター分析入門―ファジィクラスタリングの理論と応用

クラスター分析入門―ファジィクラスタリングの理論と応用

Rで学ぶクラスタ解析

Rで学ぶクラスタ解析

k-means clustering - Wikipedia