Mahout: Data Converter for Clustering
Mahout Clustering 実行の入力形式へ変換する DataConverter を実装してみた。以下に Source Code、Command line 実行方法も含め紹介します。
Mahout は Hadoop上で動作する大規模 Data Mining/Machine Learning の Library。Mahout Clustering Packageには、K-Means, Fuzzy K-Means, Canopy, Mean Shift, Latent Dirichlet Allocation 等、豊富な実装が提供されている。ただ Random Forest, FPGrowth 等の他のアルゴリズムと異なり、Mahout Clustering 実装は、専用のデータ入力形式を必要とする。発展途上による情報の少なさ、および、入力データ生成の手続きを各自が実装する必要があることが、初学者の敷居を高くしているのではないかと感じている。
大規模データ活用したいより多くの人々がそれを実現できるようにしたいと考え、Mahout Clustering の入力形式へデータ変換する Converter 実装を以下に紹介します。
「タブ区切りのテキスト形式」の"Id, 各座標"データから「Vector の SequenceFile 形式」へ変換する。Mahout では、座標点(Vector)の数値表現に関し、通常の"密(dense)"な場合に適した 「DenseVector」、"疎(sparse)"な場合に適した "SparseVector" という、2 つの Data Class が提供されている。以下の実装では DenseVector を用いているが、SparseVectorへの置き換えも容易に行える。また、"タブ区切り"の部分も同様に、他のテキスト形式に容易に置き換えられる。以下、org.apache.mahout.clustering.tools Packageに追加、実装している。引数等のコメントも付記した形で Source Code を以下に記載します。
実装
TextToVectorConverter
Command line 実行できる Converter Class。DenseVectorizeMapper 呼び出しによりデータ変換を行う。--input (or -i ) , --output (or -o ) Option でHadoop File System (HDFS) 上のパス指定し、テキストファイルを入力形式ファイルへ変換できる。
package org.apache.mahout.clustering.tools; import java.io.IOException; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.mahout.common.AbstractJob; import org.apache.mahout.math.VectorWritable; /** * This class converts a set of input data in the text file format to vectors in * the sequence file format. The Text file input should have a {@link Text} key * containing a unique identifier and {@link Text} values * * @author hamadakoichi */ public class TextToVectorConverter extends AbstractJob { /** * Convert input data in the {@link Text} format to vectors in the * {@link SequenceFile} format. * * @param input * input directory of the {@link Text} format * @param output * output directory where * {@link org.apache.mahout.math.DenseVector}s in the * {@link SequenceFile} format are generated */ public static void convert(Configuration conf, Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException { Job job = new Job(conf); job.setJobName("TextToDenseVectorConverter"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(VectorWritable.class); job.setMapperClass(DenseVectorizeMapper.class); job.setNumReduceTasks(0); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setJarByClass(TextToVectorConverter.class); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); if (job.waitForCompletion(true) == false) { throw new InterruptedException( "TextToDenseVectorConverter Job failed processing" + input.toString()); } } public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new TextToVectorConverter(), args); } @Override public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException { addInputOption(); addOutputOption(); if (parseArguments(args) == null) { return -1; } Path input = getInputPath(); Path output = getOutputPath(); run(getConf(), input, output); return 0; } /** * Run the job using supplied arguments * * @param conf * @param textInputDir * input directory of the {@link Text} format * @param seqOutputDir * output directory where * {@link org.apache.mahout.math.DenseVector}s in the * {@link SequenceFile} format are generated **/ public static void run(Configuration conf, Path textInputDir, Path seqOutputDir) throws IOException, InterruptedException, ClassNotFoundException, InstantiationException, IllegalAccessException { TextToVectorConverter.convert(conf, textInputDir, seqOutputDir); } }
DenseVectorizeMapper
"Id, N次元座標値" 値を持つ、タブ区切りテキストファイルを、DenseVector に変換する Mapper。最近のHadoop 0.21 以降で推奨される形式で実装している。
package org.apache.mahout.clustering.tools; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.mahout.math.DenseVector; import org.apache.mahout.math.NamedVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable; /** * ConvertMapper for DenseVector * * @author hamadakoichi */ public class DenseVectorizeMapper extends Mapper<LongWritable, Text, Text, VectorWritable> { private Text id = new Text(); private VectorWritable point = new VectorWritable(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] rowdata = line.split("\t", -1); String sid = rowdata[0]; id.set(sid); double[] dpoint = new double[rowdata.length - 1]; for (int i = 1; i < rowdata.length; i++) { dpoint[i - 1] = Double.valueOf(rowdata[i]); } Vector vpoint = new DenseVector(dpoint); NamedVector nvec = new NamedVector(vpoint, sid); point.set(nvec); context.write(id, point); } }
実行方法
Input File 形式
各行に"Id, N次元座標値" 値を持つ、タブ区切りテキスト。
id, value1, value2, ..., valueN.
Command line 実行
上記、作成した org.apache.mahout.clustering.tools Package も追加し コンパイル・作成しなおした mahout-core-0.4-Job.jar ファイルを次のように実行。出力先 testdata/vector.seq に出力される Vector の SequenceFile を 各 Mahout Clustering 実行の入力として使用できる。
$HADOOP_HOME/bin/hadoop jar $MAHOUT_HOME/mahout-core-0.4-job.jar \ org.apache.mahout.clustering.tools.TextToVectorConverter \ -i testdata/input.tsv \ -o testdata/vector \
testdata/vector/part-m-00000 に変換されたSequenceFileができる。
Option
Job-Specific Options: --input (-i) input Text-file input-directory (Tab Separated). --output (-o) output Sequence-file output-directory (Vectors).