ithreads でスレッドプール

マルチスレッドなサーバー実装を色々模索していて、Perl の ithreads で遊ぶ。ithreads は Linux の pthread にリンクさせた perl なら一応 NPTL で動いてくれるので、pthread アプリケーションの設計を試すのにも良い。

試しににやってみたのは、たとえば mod_perl とかで重い SQL でブロックするのが嫌なときとかにそれを別プロセスに丸投げしてやる、その丸投げされる側のサーバー実装。(やりたいことだけに関して言うと、TheSchwartz に似てる)

  • クライアントとサーバーの IPC は UNIX ドメインソケット
  • メッセージングのプロトコルJSON
  • サーバーはクライアントからのリクエストをバッファリングしたら、SQL を実行する前にクライアントとの接続を切断
  • この時点でクライアントは制御が戻る
  • サーバーは内部ではフロントエンド / バックエンドの二つの部分に分かれていて
    • フロントエンドが I/O 多重化シングルスレッドでクライアントリクエストをバッファリング、エンキュー
    • バックエンドがキューを監視してマルチスレッドでデキュー、JSON を parse して SQL を実行

という仕様。

#!/usr/local/bin/perl
use strict;
use warnings;

use threads;
use threads::shared;

use IO::Socket;
use IO::Select;
use Path::Class;
use JSON::Syck qw/Load/;
use DBI;
use Readonly;

Readonly my $dsn => "dbi:mysql:database=myapp";
Readonly my $dbuser => "melody";
Readonly my $dbpass => "nelson";

## shared variables
my $queue : shared;
$queue = &share([]);
my $mutex : shared;
my $cond  : shared;

my $socket = file('/var', 'tmp', 'mocod.sock');
$socket->remove if -e $socket;

my $nthreads = shift || 5;

my $listen = IO::Socket::UNIX->new(
    Type => SOCK_STREAM,
    Local => $socket,
    Listen => SOMAXCONN
) or die $!;

for (1..$nthreads) {
    threads->new(\&handle_request)->detach;
}

my $select = IO::Select->new;
$select->add($listen);

while (1) {
    my @ready = $select->can_read;
    for my $sock (@ready) {
        if ($sock eq $listen) {
            my $con = $listen->accept or die "accept: $!";
            $select->add($con);
        } else {
            my $nread = $sock->sysread(my $buffer, 1024);
            $select->remove($sock);
            $sock->close;

            if ($buffer) {
                lock($mutex);
                push @$queue, $buffer;
                lock($cond);
                cond_signal($cond);
            }
        }
    }
}

sub handle_request {
    while (1) {
        my $json = do {
            lock($mutex);
            cond_wait($cond, $mutex) while (@$queue == 0);
            shift @$queue;
        };
        my $data = JSON::Syck::Load($json);
        my $dbh = DBI->connect_cached($dsn, $dbuser, $dbpass)
            or die $DBI::errstr;
        my $sth = $dbh->prepare_cached($data->{sql}, undef, 1)
            or warn $dbh->errstr and next;
        $sth->execute(@{$data->{bind}}) or warn $sth->errstr;
        $sth->finish;
    }
}

クライアントとの I/O が 1,024 バイト固定というあたりで実装端折ってるんだけど、これで一応意図したとおりに動く。

% ps auxw -L | egrep '(mocod|NLWP)'
USER       PID   LWP %CPU NLWP %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
naoya    17764 17764  2.1    6  1.5  62092 15784 pts/2    Sl+  17:46   0:00 perl threaded_mocod.pl
naoya    17764 17765  0.0    6  1.5  62092 15784 pts/2    Sl+  17:46   0:00 perl threaded_mocod.pl
naoya    17764 17766  0.0    6  1.5  62092 15784 pts/2    Sl+  17:46   0:00 perl threaded_mocod.pl
naoya    17764 17767  0.0    6  1.5  62092 15784 pts/2    Sl+  17:46   0:00 perl threaded_mocod.pl
naoya    17764 17768  0.0    6  1.5  62092 15784 pts/2    Sl+  17:46   0:00 perl threaded_mocod.pl
naoya    17764 17769  0.0    6  1.5  62092 15784 pts/2    Sl+  17:46   0:00 perl threaded_mocod.pl

と、ps するとメインスレッドと併せて計6本、NPTL スレッドが立ち上がる。

#!/usr/local/bin/perl
use strict;
use warnings;
use IO::Socket::UNIX;
use JSON::Any;
use Path::Class qw/file/;

my $socket = file('/var', 'tmp', 'mocod.sock');
my $json = JSON::Any->new;

my $con = IO::Socket::UNIX->new(
    Type => SOCK_STREAM,
    Peer => $socket,
) or die $@;

$con->syswrite($json->to_json({
    sql  => "INSERT INTO log values(NULL, ?, ?)",
    bind => [qw/naoya foobar/],
}));
$con->close;

とかこんなクライアントを書いて、JSONSQLプレースホルダを投げてやるとサーバー側でそれが実行される。クライアントはリクエストを送るところだけブロックするので最小限のやりとりで済みます。IPC を UNIX ドメインソケットでなく、もっと軽量なものにすればよりクライアントに負担がかからない設計にもできるでしょう。(たとえばクライアント/サーバーで同期しながら共有メモリ... というのだと http://cheesy.dip.jp/tutorialog/archives/6 の実装とかが面白い。)

ithreads 周り。

  • ちょっと良くわかってないのが threads::shared の cond_signal() / cond_wait() あたり。pthreads のそれと同じように扱いたいがために mutex 相当のロック変数を確保したりしてるけど、なんか冗長なような。
  • スレッド間の共有リソースにソケットオブジェクトを入れられない(この実装だと $queue に push @$queue, $socket できない) とかはこういうときに不便。
  • CPAN モジュールがスレッドセーフなのかが全然自明でないのがアレ。

ithreads はプロトタイピングには良いです。自分の理解レベルでは実際のサービスで使うのにはまだちょっと怖いです。

Pthreadsプログラミング