Mahout: Canopy and K-means Clustering

MahoutのCanopyとK-Meansを用い、Canopy生成しCanopy Centroidを用いたK-Means Clustering実行できる Driverの実装法を解説します。次のようなコマンドライン呼び出しで、質の良い Canopy+K-Meansの一連のClusteringの手続きを実行できます。以下では、org.apache.mahout.clustering.canopykmeans packageを作り、追加実装しコンパイルしたJobファイルを $MAHOUT_HOME/bin/mahout-core-0.4-job.jar に置いています。MahoutのVersionは最新の0.4です。

実行例

$HADOOP_HOME/bin/hadoop jar $MAHOUT_HOME/bin/mahout-core-0.4-job.jar \
org.apache.mahout.clustering.canopykmeans.CanopyKmeansDriver \
 -i testdata/vectors \
 -o testdata/cluster \
 -dm org.apache.mahout.common.distance.EuclideanDistanceMeasure \
 -t1 2000 \
 -t2 1500 \
 -cl \
 -k 20 \
 -cd 0.1 \
 -x 100 \
 -ow \
 -xm mapreduce

実行時、以下のOptionを指定できます。

実行Option

Job-Specific Options:                                                           
  --input (-i) input                           Path to job input directory 
  --output (-o) output                         The directory pathname for output                        
  --distanceMeasure (-dm) distanceMeasure      The classname of the DistanceMeasure. Default is SquaredEuclidean                 
  --t1 (-t1) t1                                T1 max threshold value               
  --t2 (-t2) t2                                T2 min threshold value               
  --overwrite (-ow)                            If present, overwrite the output directory before running job     
  --convergenceDelta (-cd) convergenceDelta    The convergence delta value. Default is 0.5                   
  --maxIter (-x) maxIter                       The maximum number of iterations.   
  --clustering (-cl)                           If present, run kmeans clustering
  --method (-xm) method                        The execution method to use: sequential or mapreduce. Default is mapreduce                   
  --help (-h)                                  Print out help                   


アルゴリズム、ソースも含め、以下に解説する。(※Mahout は Hadoop上で動作する大規模データマイニング機械学習のライブラリ)

Canopy Generation/Clustering

Canopy Clustering ではクラスタ数を指定せず、指定距離範囲だけ重心が離れたクラスタを導出するアルゴリズム。k-means 等、通常の分割するクラスタ数を指定し Clustering が行われ、そのクラスタ分割数が適切であるか否かは別途検討が必要である。一方、Canopy Clustering では距離範囲 [T2, T1]を指定し、その距離範囲だけ離れた適切なデータ点の組を算出することにより、指定距離範囲で重心が離れた形でのクラスタリングを行うことが出来る。「第9回データマイニング+WEB勉強会@東京」でトークした資料も以下に記載。

第9回 データマイニング+WEB 勉強会@東京 ( #TokyoWebmining #9) −2nd Week−方法論・ソーシャル祭り− を開催しました - hamadakoichi blog

Canopy Clustering and K-means Clustering

Canopy中心(Canopy Centroids)は、k-means アルゴリズムのよい初期点して活用できる。通常のK-meansでは初期点を Random に選ぶため適切にクラスタ分割される保証はない。例えば、点の分布に偏りがある場合、同じ塊の中の点を選んでしまい適切な距離離れたクラスタを得ることができない。Canopy Centroids は上記のように指定距離離れていることが保証されているので、よりクラスタ初期点となる。Hadoop Sumit Booz Allen Hamilton : http://www.slideshare.net/ydn/3-biometric-hadoopsummit2010

CanopyKmeansDriver

Mahoutを用い Canopy Centroids算出し K-Meansを実行する CanopyKmeansDriverを実行するDriverを実装してみた。追加実装しコンパイルしたJobファイルを $MAHOUT_HOME/bin/mahout-core-0.4-job.jar に置いています。org.apache.mahout.clustering.canopykmeans というPackageに追加しています。
算出した Canopy Centroidsの全てを初期ClusterとしてK-Meansを実行でき、-k オプションで指定すると Canopy Centroidsの中からK個を選びK-means Clustering することもできる。testdata/vectors をVectorの入力Directory、testdata/clusterを結果Clusterの出力Directoryとする。

Mahout Clustering の各入力形式である Vectors のSequenceFile を生成する Data Converter は以前のエントリに実行法・ソースを記載している。Mahout: Data Converter for Clustering - hamadakoichi blog

package org.apache.mahout.clustering.canopykmeans;

import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

public class CanopyKmeansDriver extends AbstractJob {
	private static final String CANOPIES_DIR = "canopies";

	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new CanopyKmeansDriver(), args);
	}

	@Override
	public int run(String[] args) throws IOException, ClassNotFoundException,
			InterruptedException, InstantiationException, IllegalAccessException {

		addInputOption();
		addOutputOption();
		addOption(DefaultOptionCreator.distanceMeasureOption().create());
		addOption(DefaultOptionCreator.t1Option().create());
		addOption(DefaultOptionCreator.t2Option().create());
		addOption(DefaultOptionCreator.numClustersOption().create());
		addOption(DefaultOptionCreator.clusteringOption().create());
		addOption(DefaultOptionCreator.overwriteOption().create());
		addOption(DefaultOptionCreator.convergenceOption().create());
		addOption(DefaultOptionCreator.maxIterationsOption().create());
		addOption(DefaultOptionCreator.methodOption().create());
		
		if (parseArguments(args) == null) {
			return -1;
		}

		Path input = getInputPath();
		Path output = getOutputPath();
		if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
			HadoopUtil.overwriteOutput(output);
		}
		String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
		double t1 = Double.parseDouble(getOption(DefaultOptionCreator.T1_OPTION));
		double t2 = Double.parseDouble(getOption(DefaultOptionCreator.T2_OPTION));
		double convergenceDelta = Double.parseDouble(getOption(DefaultOptionCreator.CONVERGENCE_DELTA_OPTION));
		int maxIterations = Integer.parseInt(getOption(DefaultOptionCreator.MAX_ITERATIONS_OPTION));
		
		ClassLoader loader = Thread.currentThread().getContextClassLoader();
		DistanceMeasure measure = loader.loadClass(measureClass).asSubclass(DistanceMeasure.class).newInstance();
		
		boolean runClustering = hasOption(DefaultOptionCreator.CLUSTERING_OPTION);
		boolean runSequential = getOption(DefaultOptionCreator.METHOD_OPTION).equalsIgnoreCase(DefaultOptionCreator.SEQUENTIAL_METHOD);
		
		run(getConf(), input, output, measure, t1, t2, convergenceDelta,maxIterations, runClustering, runSequential);
		return 0;
	}

	public static void run(Configuration conf, Path input, Path output,
			DistanceMeasure measure, double t1, double t2,
			double convergenceDelta, int maxIterations, boolean runClustering, boolean runSequential) throws IOException,
			InterruptedException, ClassNotFoundException,
			InstantiationException, IllegalAccessException {

		// Run Canopy and Kmeans Clustering
		Path canopyCentroids = new Path(output, CANOPIES_DIR);
		boolean runTmpClustering = false;
		
		CanopyDriver.run(conf, input, canopyCentroids, measure, t1, t2, runTmpClustering, runSequential);
		KMeansDriver.run(conf, input, new Path(canopyCentroids, "clusters-*"), output, measure,
				convergenceDelta, maxIterations, runClustering, runSequential);
	}
}

Clusterの中身をみる

ClusterDumperを用いSequenceFile の Clusterの中身をみることができる。
org.apache.mahout.utils.clustering.ClusterDumper

実行法:

$MAHOUT_HOME/bin/mahout bin/mahout clusterdump \
^K-s clusterDirPath \
^K-o outputTextPath^K

実行例:

$MAHOUT_HOME/bin/mahout bin/mahout clusterdump \
^K -s testdata/clusters/clusters* \
^K -o cluster_results.txt^K

Vectorの中身をみる

VectorDumperを用い SequenceFileの Vectorの中身もみることができる。
org.apache.mahout.utils.vectors.VectorDumper

実行法:

$MAHOUT_HOME/bin/mahout vectordump \
 -s vectorDirPath \
 ^K-o outputTextPath^K

実行例:

$MAHOUT_HOME/bin/mahout vectordump \
 -s testdata/vectors/part-m-00000 \
 ^K-o vector_results.txt^K