Mahout Programming : K-Means Clustering
Mahout でのデータマイニング。mahout.clustering.kmeans を使ったクラスター分析を実装してみた。Mahoutは、Hadoop上で動くデータマイニング・機械学習の各種アルゴリズムが実装されているライブラリ。
クラスター分析
クラスター分析の方法論自体の内容は以下のエントリにまとめてある。
- R言語プログラミング: クラスター分析 - 階層的クラスタリング - hamadakoichi blog
- 第2回データマイニング+WEB 勉強会@東京 (Tokyo.Webmining#2) を開催しました - 「はじめてでもわかる R言語によるクラスター分析」 - hamadakoichi blog
- 第3回 データマイニング+WEB 勉強会@東京 (Tokyo.Webmining#3) を開催します - hamadakoichi blog
- R言語プログラミング: クラスター分析 - 混合分布モデル - hamadakoichi blog
- R言語プログラミング: クラスター分析 - k-means - hamadakoichi blog
- 第3回データマイニング+WEB 勉強会@東京 (Tokyo.Webmining#3) を開催しました−「R言語による クラスター分析 -活用編-」 - hamadakoichi blog
Reference
Data for Clustering
R言語のデータで有名なiris(アヤメ)を使ってみる。
Rから出力したirisデータから数値部分を抽出したものを iris.csv として用いる。
iris.csv
構成
KmeansTest.java
package mahouttest.test; import java.io.File; import java.io.IOException; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.mahout.clustering.kmeans.Cluster; import org.apache.mahout.clustering.kmeans.KMeansDriver; import org.apache.mahout.common.distance.EuclideanDistanceMeasure; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; public class KmeansTest { public static void writePointToFile(List<Vector> points, String fileName, FileSystem fs, Configuration conf) throws IOException { // Writer Path path = new Path(fileName); SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, LongWritable.class, VectorWritable.class); // Write to File VectorWritable vec = new VectorWritable(); long recordNum = 0; for (Vector point : points) { vec.set(point); writer.append(new LongWritable(recordNum), vec); } writer.close(); } public static void main(String args[]) throws Exception { // Directory File testData = new File("testdata"); if (!testData.exists()) { testData.mkdir(); } testData = new File("testdata/points"); if (!testData.exists()) { testData.mkdir(); } // Data for kmeans (Iris data) List<Vector> vectors = CsvUtil.readVectors("C:/data/iris.csv"); // Config Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); writePointToFile(vectors, "testdata/points/file1", fs, conf); // Path Path path = new Path("testdata/clusters/part-00000"); SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, Text.class, Cluster.class); // Initial Cluster int clusterNum = 3; for (int i = 0; i < clusterNum; i++) { Vector vec = vectors.get(i); Cluster cluster = new Cluster(vec, i); cluster.addPoint(cluster.getCenter()); writer.append(new Text(cluster.getIdentifier()), cluster); } writer.close(); // Run KMeans KMeansDriver.runJob("testdata/points", "testdata/clusters", "output", EuclideanDistanceMeasure.class.getName(), 0.001, 1000, 10); // Display Results SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path( "output/points/part-00000"), conf); Text key = new Text(); Text value = new Text(); while (reader.next(key, value)) { System.out.println(key.toString() + ",clusterId =" + value.toString()); } } }
CsvUtil.java
package mahouttest.test; import java.io.File; import java.io.FileInputStream; import java.util.ArrayList; import java.util.List; import org.apache.mahout.math.DenseVector; import org.apache.mahout.math.Vector; public class CsvUtil { public static List<List<Double>> read(String filename) { ArrayList<List<Double>> data = new ArrayList<List<Double>>(); // ReadFile String inputString = null; try { File file = new File(filename); byte[] b = new byte[(int) file.length()]; FileInputStream fis = new FileInputStream(file); fis.read(b); inputString = new String(b); } catch (Exception e) { return data; } if (inputString == null) { return data; } inputString = inputString.replaceAll("\r", ""); String[] rowdata = inputString.split("\n"); for (String str : rowdata) { String[] values = str.split(","); ArrayList<Double> row = new ArrayList<Double>(); for (String value : values) { row.add(Double.valueOf(value)); } data.add(row); } return data; } public static List<Vector> readVectors(String filename) { List<List<Double>> data = read(filename); return convert(data); } public static List<Vector> convert(List<List<Double>> data) { List<Vector> vectors = new ArrayList<Vector>(); int j = 0; for (List<Double> rowdata : data) { double[] value = new double[rowdata.size()]; for (int i = 0; i < rowdata.size(); i++) { value[i] = rowdata.get(i); } Vector vector = new DenseVector("Vector:" + String.valueOf(j++), value); vectors.add(vector); } return vectors; } }
KMeansTest 実行結果
KMeansDriver#runJob 実行 でMapReduceが開始
2010/07/18 23:11:37 org.slf4j.impl.JCLLoggerAdapter info 情報: Input: testdata/points Clusters In: testdata/clusters Out: output Distance: org.apache.mahout.common.distance.EuclideanDistanceMeasure 2010/07/18 23:11:37 org.slf4j.impl.JCLLoggerAdapter info 情報: convergence: 0.0010 max Iterations: 10 num Reduce Tasks: 1 Input Vectors: org.apache.mahout.math.VectorWritable 2010/07/18 23:11:37 org.slf4j.impl.JCLLoggerAdapter info 情報: Iteration 0 2010/07/18 23:11:38 org.apache.hadoop.metrics.jvm.JvmMetrics init 情報: Initializing JVM Metrics with processName=JobTracker, sessionId= 2010/07/18 23:11:38 org.apache.hadoop.mapred.JobClient configureCommandLineOptions 警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 2010/07/18 23:11:38 org.apache.hadoop.mapred.FileInputFormat listStatus 情報: Total input paths to process : 1 2010/07/18 23:11:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 情報: Running job: job_local_0001 2010/07/18 23:11:39 org.apache.hadoop.mapred.FileInputFormat listStatus 情報: Total input paths to process : 1 2010/07/18 23:11:39 org.apache.hadoop.mapred.MapTask runOldMapper 情報: numReduceTasks: 1 2010/07/18 23:11:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 情報: io.sort.mb = 100 2010/07/18 23:11:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 情報: data buffer = 79691776/99614720 2010/07/18 23:11:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 情報: record buffer = 262144/327680 2010/07/18 23:11:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush 情報: Starting flush of map output 2010/07/18 23:11:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 情報: map 0% reduce 0% ...
出力 output/points/part-00000 に Clustering 結果が出力されていることが分かる。
... Vector:0,clusterId =2 Vector:1,clusterId =2 Vector:2,clusterId =2 Vector:3,clusterId =2 Vector:4,clusterId =2 Vector:5,clusterId =2 Vector:6,clusterId =2 Vector:7,clusterId =2 Vector:8,clusterId =2 Vector:9,clusterId =2 Vector:10,clusterId =2 ...
注意
- Hadoopでは出力先にすでファイルにがあると出力されないので、output先には何もない形で実行する。
参考
- 作者: 宮本定明
- 出版社/メーカー: 森北出版
- 発売日: 1999/10
- メディア: 単行本
- クリック: 36回
- この商品を含むブログ (8件) を見る
- 作者: 新納浩幸
- 出版社/メーカー: オーム社
- 発売日: 2007/11/01
- メディア: 単行本
- 購入: 9人 クリック: 207回
- この商品を含むブログ (29件) を見る