R言語で MapReduce −Hadoop Streaming−

統計解析・データマイニングに特化した言語である R言語
Hadoop Streaming を使った R言語でのMapReduce実装を以下に紹介。


Hadoop Streaming で標準入出力を用いデータの受け渡しを行い、files オプションで R の mapper, reducer を配布し実行。
例えば、WordCountはR言語MapReduce実装で以下のように書ける。

mapper.r

#!/usr/bin/Rscript

con = file(description="stdin",open="r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
	line <- unlist(strsplit(line, "\t"))
	for(word in line){
		cat(sprintf("%s\t%s\n", word, 1), sep = "")
	}
}
close(con)

reducer.r

#!/usr/bin/Rscript

env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
	line <- unlist(strsplit(line, "\t"))
	key <- line[1]
	value <- as.integer(line[2])
	if (exists(key, envir = env, inherits = FALSE)) {
		count <- get(key, envir = env)
		assign(key, count + value, envir = env)
	} 
	else {
		assign(key, value, envir = env)
	}
}
close(con)

for (key in ls(env, all = TRUE)) {
	cat(key, "\t", get(key, envir = env), "\n", sep = " ")
}

実行

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-x.xx.x-streaming.jar  \
-files mapper.r, reducer.r \
-input /rtest/input.tsv \
-output /rtest/output \
-mapper mapper.r \
-reducer reducer.r

データ例

入力例:/rtest/input.tsv

a       b       b
a       a       a       d       d
b       b       c       e

出力:/rtest/output/part-00000

a        4
b        4
c        1
d        2
e        1

参考文献

Hadoop

Hadoop