Hadoop Streaming
id:naoya:20080511:1210506301 のエントリのコメント欄で kzk さんに教えていただいた Hadoop Streaming を試しています。
Hadoop はオープンソースの MapReduce + 分散ファイルシステムです。Java で作られています。Yahoo! Inc のバックエンドや、Facebook、Amazon.com などでも利用されているとのことです。詳しくは http://codezine.jp/a/article/aid/2448.aspx (kzk さんによる連載記事)を参照してください。
Hadoop Streaming
記事にもあります通り、Hadoop 拡張の Hadoop Streaming を使うと標準入出力を介するプログラムを記述するだけで、Hadoop による MapReduce を利用することができます。つまり、Java 以外の任意の言語で MapReduce が可能ということです。
例によって Apache のアクセスログからステータスコードの回数を数える計算を行ってみます。言語は Perl です。
#!/usr/local/bin/perl use strict; use warnings; while (<>) { chomp; my @segments = split /\s+/; printf "%s\t%s\n", $segments[8], 1; }
という Mapper (map.pl) と
#!/usr/local/bin/perl use strict; use warnings; my %count; while (<>) { chomp; my ($key, $value) = split /\t/; $count{$key}++; } while (my ($key, $value) = each %count) { printf "%s\t%s\n", $key, $value; }
という Reducer (reduce.pl) を書いて Hadoop に入力を与えると、Hadoop クラスタに分散されて計算が実行されます。map() への入力は標準入力を介して行ベースで渡ってきます。reduce() への入力は key-value ペアがタブ区切りです。
[naoya@colinux hadoop]% hadoop jar ~/hadoop/contrib/hadoop-0.15.3-streaming.jar \ -input httpd_logs \ -output logc_output \ -mapper /home/naoya/work/hadoop/analog/map.pl \ -reducer /home/naoya/work/hadoop/analog/reduce.pl \ -inputformat TextInputFormat \ -outputformat TextOutputFormat
上記のようにコマンドラインで、Mapper と Reducer に map.pl、reduce.pl を与えてやります。
出力は HDFS (Hadoop による分散ファイルシステム) 上に保存されます。HDFS のコマンドで cat します。
[naoya@colinux hadoop]% hadoop dfs -cat logc_output/* 304 262 200 4606 500 43 404 24
MapReduce における reduce() への入力データ構造
Hadoop Streaming は標準入出力を扱うだけで良く、これはUNIX プログラマにとっては非常になじみ深いプログラミングモデルであり、大きな利点であると思います。
ところで、Google 論文 にある Reduce の疑似コードを今一度見てみましょう。
reduce(String key, Iterator values): // key: a word // values: a list of counts int result = 0; for each v in values: result += ParseInt(v); Emit(AsString(result));
reduce に渡ってくる入力は key-value ペアではなく key-values ペアです。map() で emit された key-value ペアは、reduce() に渡る前に、同一のキーでまとめられて、且つキーの順にソートされます。すなわち
# key => values (Iterator) 200 => [ 1,1,1,1,1,1,1,1,1,... ] 304 => [ 1,1,1,1,1,1,1,1,1,... ] 404 => [ 1,1,1,1,1,1,1,1,1,... ] 500 => [ 1,1,1,1,1,1,1,1,1,... ]
という形のデータ構造になっています。値はリストです。値はメモリに収まりきらないほど大きなリストになる可能性もあるので、Iterator になっています。
MapReduce の計算モデルが簡易でありつつ幅広い分野に応用が利くのは、この reduce() への入力データ構造が一つの鍵になっています。
最たるものはキーが昇順にソートされている点です。例えば MapReduce で検索エンジンの転置インデックスを作る場合は (単語 => ドキュメントIDのリスト) という入力が渡ってくることになります。この入力をストリームで処理して次々と emit してやると、reduce の出力もそのままキー、つまり単語順でソートされることになります。単語順でソートされていればその単語群に対する検索のアルゴリズムでアドバンテージが得られます。
これが、入力が不特定な順番で渡ってくるとなると、メモリに一度全部出力を溜めてから最後にソートする必要が出てきます。MapReduce で扱うようなデータは非常に膨大ですから、多くの場合単一のサーバーのメモリ内でのソートは難しい規模になるでしょう。このソートの必要性を排除するために、MapReduce は reduce() 前に分散環境であらかじめソートを行います。
Hadoop Streaming の reduce() への入力はソートこそされているものの、フラットな行入力です。つまり、
200 1 200 1 200 1 ... 304 1 304 1 ...
という具合です。このフラットな構造を単純に扱ったのでは MapReduce の利点が一つ失われてしまいます。現に、先のログ解析ではメモリに結果を一度溜めてしまっています。また、カウンタをハッシュテーブルで実装しているため出力がキー順になっていません。
Hadoop Streaming をより使いやすく
そこで、Hadoop Streaming の入力を構造化された状態で扱えるようにする簡単な Perl フレームワークを作りました。
map.pl は以下のように書きます。
#!/usr/bin/env perl package Analog::Mapper; use Moose; with 'Hadoop::Mapper'; sub map { my ($self, $key, $value) = @_; my @segments = split /\s+/, $value; $self->emit($segments[8] => 1); } package main; use FindBin::libs; Analog::Mapper->run;
一方の reduce.pl は以下です。
#!/usr/bin/env perl package Analog::Reducer; use Moose; with 'Hadoop::Reducer'; sub reduce { my ($self, $key, $values) = @_; my $count = 0; while ($values->has_next) { $count++; $values->next; } $self->emit($key, $count); } package main; use FindBin::libs; Analog::Reducer->run;
$values がイテレータになっている点にご注目ください。イテレータを回すことでストリームデータを構造化された入力として扱うことができます。
実際にこのフレームワークを使って計算させてみます。結果は以下のようになります。キーの順に並びます。
[naoya@colinux hadoop]% hadoop dfs -cat logc_output/* 200 4606 304 262 404 24 500 43
ソースは以下で公開しています。
半日弱ででっち上げたフレームワークなので、まだテストもありませんし、バグもありそうです。また、Hadoop Streaming はオプションにより入出力のフォーマットを切り替えることができますが、現時点では TextInputFormat / TextOutputFormat にしか対応していません。
Hadoop は Java 向け、ということでこれまで食わず嫌いでいましたが、Hadoop Streaming を知って俄然興味が沸いてきました。もう少し細部まで調べてみようと思います。kzk さん、ありがとうございます。連載の今後も期待しています。