ithreads でスレッドプール
マルチスレッドなサーバー実装を色々模索していて、Perl の ithreads で遊ぶ。ithreads は Linux の pthread にリンクさせた perl なら一応 NPTL で動いてくれるので、pthread アプリケーションの設計を試すのにも良い。
試しににやってみたのは、たとえば mod_perl とかで重い SQL でブロックするのが嫌なときとかにそれを別プロセスに丸投げしてやる、その丸投げされる側のサーバー実装。(やりたいことだけに関して言うと、TheSchwartz に似てる)
- クライアントとサーバーの IPC は UNIX ドメインソケット
- メッセージングのプロトコルは JSON
- サーバーはクライアントからのリクエストをバッファリングしたら、SQL を実行する前にクライアントとの接続を切断
- この時点でクライアントは制御が戻る
- サーバーは内部ではフロントエンド / バックエンドの二つの部分に分かれていて
という仕様。
#!/usr/local/bin/perl use strict; use warnings; use threads; use threads::shared; use IO::Socket; use IO::Select; use Path::Class; use JSON::Syck qw/Load/; use DBI; use Readonly; Readonly my $dsn => "dbi:mysql:database=myapp"; Readonly my $dbuser => "melody"; Readonly my $dbpass => "nelson"; ## shared variables my $queue : shared; $queue = &share([]); my $mutex : shared; my $cond : shared; my $socket = file('/var', 'tmp', 'mocod.sock'); $socket->remove if -e $socket; my $nthreads = shift || 5; my $listen = IO::Socket::UNIX->new( Type => SOCK_STREAM, Local => $socket, Listen => SOMAXCONN ) or die $!; for (1..$nthreads) { threads->new(\&handle_request)->detach; } my $select = IO::Select->new; $select->add($listen); while (1) { my @ready = $select->can_read; for my $sock (@ready) { if ($sock eq $listen) { my $con = $listen->accept or die "accept: $!"; $select->add($con); } else { my $nread = $sock->sysread(my $buffer, 1024); $select->remove($sock); $sock->close; if ($buffer) { lock($mutex); push @$queue, $buffer; lock($cond); cond_signal($cond); } } } } sub handle_request { while (1) { my $json = do { lock($mutex); cond_wait($cond, $mutex) while (@$queue == 0); shift @$queue; }; my $data = JSON::Syck::Load($json); my $dbh = DBI->connect_cached($dsn, $dbuser, $dbpass) or die $DBI::errstr; my $sth = $dbh->prepare_cached($data->{sql}, undef, 1) or warn $dbh->errstr and next; $sth->execute(@{$data->{bind}}) or warn $sth->errstr; $sth->finish; } }
クライアントとの I/O が 1,024 バイト固定というあたりで実装端折ってるんだけど、これで一応意図したとおりに動く。
% ps auxw -L | egrep '(mocod|NLWP)' USER PID LWP %CPU NLWP %MEM VSZ RSS TTY STAT START TIME COMMAND naoya 17764 17764 2.1 6 1.5 62092 15784 pts/2 Sl+ 17:46 0:00 perl threaded_mocod.pl naoya 17764 17765 0.0 6 1.5 62092 15784 pts/2 Sl+ 17:46 0:00 perl threaded_mocod.pl naoya 17764 17766 0.0 6 1.5 62092 15784 pts/2 Sl+ 17:46 0:00 perl threaded_mocod.pl naoya 17764 17767 0.0 6 1.5 62092 15784 pts/2 Sl+ 17:46 0:00 perl threaded_mocod.pl naoya 17764 17768 0.0 6 1.5 62092 15784 pts/2 Sl+ 17:46 0:00 perl threaded_mocod.pl naoya 17764 17769 0.0 6 1.5 62092 15784 pts/2 Sl+ 17:46 0:00 perl threaded_mocod.pl
と、ps するとメインスレッドと併せて計6本、NPTL スレッドが立ち上がる。
#!/usr/local/bin/perl use strict; use warnings; use IO::Socket::UNIX; use JSON::Any; use Path::Class qw/file/; my $socket = file('/var', 'tmp', 'mocod.sock'); my $json = JSON::Any->new; my $con = IO::Socket::UNIX->new( Type => SOCK_STREAM, Peer => $socket, ) or die $@; $con->syswrite($json->to_json({ sql => "INSERT INTO log values(NULL, ?, ?)", bind => [qw/naoya foobar/], })); $con->close;
とかこんなクライアントを書いて、JSON で SQL とプレースホルダを投げてやるとサーバー側でそれが実行される。クライアントはリクエストを送るところだけブロックするので最小限のやりとりで済みます。IPC を UNIX ドメインソケットでなく、もっと軽量なものにすればよりクライアントに負担がかからない設計にもできるでしょう。(たとえばクライアント/サーバーで同期しながら共有メモリ... というのだと http://cheesy.dip.jp/tutorialog/archives/6 の実装とかが面白い。)
ithreads 周り。
- ちょっと良くわかってないのが threads::shared の cond_signal() / cond_wait() あたり。pthreads のそれと同じように扱いたいがために mutex 相当のロック変数を確保したりしてるけど、なんか冗長なような。
- スレッド間の共有リソースにソケットオブジェクトを入れられない(この実装だと $queue に push @$queue, $socket できない) とかはこういうときに不便。
- CPAN モジュールがスレッドセーフなのかが全然自明でないのがアレ。
ithreads はプロトタイピングには良いです。自分の理解レベルでは実際のサービスで使うのにはまだちょっと怖いです。