読者です 読者をやめる 読者になる 読者になる

自由課題

学んだり、考えたり、試したりしたこと。

node.jsを支えるlibuvのチュートリアル"uvbook" :プロセス

C node.js

この文書はuvbookの日本語翻訳の一部となります。文書そのものの説明その他については目次をご覧ください。

プロセス

libuvはかなりの量の子プロセス管理機能を提供しており、プラットフォーム間の差異を抽象化し、ストリームや名前付きパイプを用いて子プロセスと通信することを可能にしています。

Unixにおける共通のイディオムは全てのプロセスが一つのことを良好に行えることです。このようなケースでは、プロセスはタスクを完遂するために複数の子プロセスを使用します(シェルがパイプを用いるように)。メッセージを用いるマルチプロセスモデルはスレッドと共有メモリを用いるモデルに比較して簡単になるでしょう。

イベントベースのプログラムに対する共通のマイナス要因は現代のコンピュータにおいて複数のコアを持つ利点を活用しきれない点です。マルチスレッドプログラムではカーネルはスケジューリングを行い、異なるスレッドに異なるコアを割り当てることができます。しかし、イベントループは単一のスレッドです。これに対する回避策は代わりに複数のプロセスを起動することであり、各プロセスはイベントループを処理し、各プロセスは別々のコアを割り当てられます。

子プロセスの起動

最も単純なケースはプロセスを起動し、いつプロセスが終了したかを知ることです。これは uv_spawn を用いることで達成されます。

spawn/main.c

uv_loop_t *loop;
uv_process_t child_req;
uv_process_options_t options;

int main() {
    loop = uv_default_loop();

    char* args[3];
    args[0] = "mkdir";
    args[1] = "test-dir";
    args[2] = NULL;

    options.exit_cb = on_exit;
    options.file = "mkdir";
    options.args = args;

    if (uv_spawn(loop, &child_req, options)) {
        fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop)));
        return 1;
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

NOTE
optionsグローバル変数であるため、明示的に0で初期化されています。もし options とローカル変数に変える場合、使用しない全てのフィールドをnullに初期化することを忘れないでください。

    uv_process_options_t options = {0};

uv_process_t 構造体はウォッチャとしてのみ動作し、全てのオプションは uv_process_options_t を通じて設定します。プロセスの起動を簡単にするために、 fileargs フィールドのみを設定する必要があります。 file は実行するプログラムです。 uv_spawn は内部的にexecvpを内部的に使用するため、フルパスを渡す必要はありません。最後に、基本的な規則として、 引数の配列は引数の数より一つ大きく、最後の要素はNULLである必要があります。

uv_spawn の呼び出しのあとに、 uv_process_t.pid には子プロセスのプロセスIDが格納されています。

exitコールバックは 終了ステータス と exitの原因となった signal の種類とともに呼び出されます。

spawn/main.c

void on_exit(uv_process_t *req, int exit_status, int term_signal) {
    fprintf(stderr, "Process exited with status %d, signal %d¥n", exit_status, term_signal);
    uv_close((uv_handle_t*) req, NULL);
}

プロセスが終了した後にプロセスのウォッチャをクローズすることが 必要です。

プロセスのパラメータ変更

子プロセスが起動する前に uv_process_options_t 内のフィールドを用いて実行環境を制御することができます。

実行ディレクトリの変更

uv_process_options_t.cwd を対応するディレクトリに設定します。

環境変数の設定

uv_process_options_t.env は文字列のヌル終端された配列であり、 各 VAR=VALUE 形式はプロセスの環境変数を設定するために用いられます。これを NULL にすることにより親プロセスからからの環境変数を受け継ぎます。

オプションフラグ

uv_process_options_t.flags に下記のビット単位のORフラグを設定することにより、子プロセスの動作を修正します。

  • UV_PROCESS_SETUID - これは子の実行ユーザIDを uv_process_options_t.uid に設定します。 UV_PROCESS_SETGID - これは子の実行グループIDを uv_process_options_t.gid に設定します。

UID/GIDの変更はUnixのみでサポートされており、 uv_spawnWindowsでは UV_ENOTSUP で失敗します。

  • UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS - uv_process_options_t.args にクオートやエスケープを付与しません。Unixでは無視されます。
  • UV_PROCESS_DETACHED - 新しいセッションで子プロセスを開始し、親プロセスが終了した後も処理を続けます。下記の例を参照してください。

プロセスをデタッチする

UV_PROCESS_DETACHED フラグを渡すことで、親の終了が影響しないデーモン、もしくは親と独立した子プロセスを起動することができます。

detach/main.c

int main() {
    loop = uv_default_loop();

    char* args[3];
    args[0] = "sleep";
    args[1] = "100";
    args[2] = NULL;

    options.exit_cb = NULL;
    options.file = "sleep";
    options.args = args;
    options.flags = UV_PROCESS_DETACHED;

    if (uv_spawn(loop, &child_req, options)) {
        fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop)));
        return 1;
    }
    fprintf(stderr, "Launched sleep with PID %d¥n", child_req.pid);
    uv_unref((uv_handle_t*) &child_req);

    return uv_run(loop, UV_RUN_DEFAULT);
}

ただ、ウォッチャはまだ子を監視しているため、プログラムは終了しないことをを忘れないでください。もっと 発動したら忘れる ことを求めるなら、 uv_unref() を使用してください。

プロセスへのシグナル送信

libuvはUnix標準の kill(2) システムコール、及びWindowsにおいて同様の機能性を実装していますが、一つ警告があります。 SIGTERMSIGINTSIGKILL はプロセスを終了させます。 uv_killシグネチャは以下です。

uv_err_t uv_kill(int pid, int signum);

libuvを用いて開始されたプロセスのために、 uv_process_kill を代わりに用いることができます。これは pidの代わりに uv_process_t ウォッチャを第一引数として受け取ります。この場合、ウォッチャに対して uv_close呼び出すことを忘れないでください。

シグナル

TODO: update based on https://github.com/joyent/libuv/issues/668

libuvはWindowsサポートによってUnixシグナルに対するラッパも提供します。

libuvにより ’よく動作する' シグナルを生成するために、あるAPI全ての実行中のイベントループ上の全てのハンドラ に対してシグナルを送信します! ハンドラを初期化し、ループに関連付けるためにuv_signal_init() を使用してください。特定のシグナルを待ち受けるために、ハンドラ関数とともに uv_signal_start() を使用してください。 各ハンドラは一つのシグナル番号に対してのみ関連付け、 uv_signal_start() を続けて呼び出すことで、以前の関連付けを上書きすることができます。監視を中止するためには uv_signal_stop() を使用してください。以下はいろいろな使い方を示した小さな例です。

signal/main.c

#include <stdio.h>
#include <unistd.h>
#include <uv.h>

void signal_handler(uv_signal_t *handle, int signum)
{
    printf("Signal received: %d¥n", signum);
    uv_signal_stop(handle);
}

// two signal handlers in one loop
void thread1_worker(void *userp)
{
    uv_loop_t *loop1 = uv_loop_new();

    uv_signal_t sig1a, sig1b;
    uv_signal_init(loop1, &sig1a);
    uv_signal_start(&sig1a, signal_handler, SIGUSR1);

    uv_signal_init(loop1, &sig1b);
    uv_signal_start(&sig1b, signal_handler, SIGUSR1);

    uv_run(loop1, UV_RUN_DEFAULT);
}

// two signal handlers, each in its own loop
void thread2_worker(void *userp)
{
    uv_loop_t *loop2 = uv_loop_new();
    uv_loop_t *loop3 = uv_loop_new();

    uv_signal_t sig2;
    uv_signal_init(loop2, &sig2);
    uv_signal_start(&sig2, signal_handler, SIGUSR1);

    uv_signal_t sig3;
    uv_signal_init(loop3, &sig3);
    uv_signal_start(&sig3, signal_handler, SIGUSR1);

    while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) {
    }
}

int main()
{
    printf("PID %d¥n", getpid());

    uv_thread_t thread1, thread2;

    uv_thread_create(&thread1, thread1_worker, 0);
    uv_thread_create(&thread2, thread2_worker, 0);

    uv_thread_join(&thread1);
    uv_thread_join(&thread2);
    return 0;
}

NOTE
uv_run(loop, UV_RUN_NOWAIT) は これがたった一つのイベントしか処理しない点で uv_run(loop, UV_RUN_ONCE) と同様です。UV_RUN_NOWAITがすぐに制御を戻すのに対し、 UV_RUN_ONCEは保留されたイベントがない場合にブロックします。NOWAITは他のループが保留された処理を持っていないという理由でループがロック(starved)しないようにするために用いることができます。

SIGUSR1 をプロセスに送信すると、各 uv_signal_t で1つずつ、4回ハンドラが起動していることを確認することができます。ハンドラはプログラムを終了するために各ハンドルを停止するだけです。全てのハンドラに対してのこの種の送信はとても有用です。単純に各イベントループに SIGINT のためのウォッチャを追加するだけで、複数のイベントループを使用するサーバは終了する前にデータが安全に保存されることを保証できます。

子プロセスのI/O

通常の、新規に起動したプロセスは自分自身のファイルディスクリプタのセットを持っており、0、1、2、はそれぞれ stdinstdoutstderr に対応しています。ときどきファイルディスクリプタを子プロセスと共有したい場合があります。例えば、アプリケーションはサブコマンドを起動し、発生したエラーをログファイルに書きたいが stdout は無視したいかもしれません。このために子プロセスの stderr を表示したいとします。この場合、libuvはファイルディスクリプタ継承 をサポートしています。この例では、下記のようなテストプログラムを起動します。

proc-streams/test.c

#include <stdio.h>

int main()
{
    fprintf(stderr, "This is stderr¥n");
    printf("This is stdout¥n");
    return 0;
}

実際のプログラムである proc-streamsstderr のみを継承して実行されます。 uv_process_options_tstdio フィールドを使用することで子プロセスのファイルディスクリプタは設定されます。最初に stdio_count フィールドに設定するファイルディスクリプタの数を設定します。 uv_process_options_t.stdiouv_stdio_container_t の配列であり、下記のようになります。

typedef struct uv_stdio_container_s {
  uv_stdio_flags flags;

  union {
    uv_stream_t* stream;
    int fd;
  } data;
} uv_stdio_container_t;

ここで、フラグはいくつかの値を持つことができます。 これが使われない場合に UV_IGNORE を用いてください。もし最初の3つの stdio フィールドが UV_IGNORE に設定された場合、これらは /dev/null にリダイレクトされます。

既存のディスクリプタに渡したいため、 UV_INHERIT_FD を使用します。それから fdstderr に設定します。

proc-streams/main.c

int main() {
    loop = uv_default_loop();


 /* ... */

    options.stdio_count = 3;
    uv_stdio_container_t child_stdio[3];
    child_stdio[0].flags = UV_IGNORE;
    child_stdio[1].flags = UV_IGNORE;
    child_stdio[2].flags = UV_INHERIT_FD;
    child_stdio[2].data.fd = 2;
    options.stdio = child_stdio;

    options.exit_cb = on_exit;
    options.file = args[0];
    options.args = args;


    if (uv_spawn(loop, &child_req, options)) {
        fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop)));
        return 1;
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

proc-stream を実行すると、"This is stderr" という行だけが表示されます。 stdout を継承するように設定して出力を確認してください。

これはこのようなストリームのリダイレクションを適用したあまり感動のない単純なものです。 flagsUV_INHERIT_STREAM に設定し、 data.stream を親プロセスのストリームに設定することにより、子プロセスはそのプロセスを標準I/Oとして扱うことができます。これはCGIのようなものを実装するのに使用できます。

CGI スクリプト/実行ファイルの例は以下になります:

cgi/tick.c

#include <stdio.h>
#include <unistd.h>

int main() {
    int i;
    for (i = 0; i < 10; i++) {
        printf("tick¥n");
        fflush(stdout);
        sleep(1);
    }
    printf("BOOM!¥n");
    return 0;
}

このCGIサーバは、この章のコンセプトと :doc:ネットワーク を結びつけており、全てのクライアントは接続が閉じられるまでに10回"tick"を受信します。

cgi/main.c

void on_new_connection(uv_stream_t *server, int status) {
    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        invoke_cgi_script(client);
    }
    else {
        uv_close((uv_handle_t*) client, NULL);
    }
}

ここでTCP接続を受け付け、 invoke_cgi_script にソケット (stream) を渡します。

cgi/main.c

void invoke_cgi_script(uv_tcp_t *client) {

    /* ... finding the executable path and setting up arguments ... */

    options.stdio_count = 3;
    uv_stdio_container_t child_stdio[3];
    child_stdio[0].flags = UV_IGNORE;
    child_stdio[1].flags = UV_INHERIT_STREAM;
    child_stdio[1].data.stream = (uv_stream_t*) client;
    child_stdio[2].flags = UV_IGNORE;
    options.stdio = child_stdio;

    options.exit_cb = cleanup_handles;
    options.file = args[0];
    options.args = args;

    child_req.data = (void*) client;
    if (uv_spawn(loop, &child_req, options)) {
        fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop)));
        return;
    }
}

CGIスクリプトstdout はtickスクリプトが出力する全ての内容をクライアントに送信させるためにソケットに設定されています。プロセスを用いることにより、read/writeのバッファリングをOSに任せることができ、これは簡単さの観点では素晴らしいことです。ただ、プロセスの生成はコストがかかる処理であることは肝に命じてください。

パイプ

libuvの uv_pipe_t 構造体はUnixプログラマーを少し混乱させるもので、なぜならこれはすぐに |pipe(7)を呼び出すからです。しかし uv_pipe_t は無名パイプとは関連がなく、これは2つを用いています。

  1. ストリームAPI - これは uv_stream_t の具体的な実装として実行されるものです。FIFOを提供するためのAPIであり、ローカルファイルのI/Oに対するストリーミングインターフェイスです。これは :ref:バッファとストリーム で言及される uv_pipe_open を用いて処理されます。これをTCP/UDPのためにも用いることができますが、この目的のためには既に便利な関数と構造体が用意されています。

  2. IPC機構 - uv_pipe_tUnixドメインソケットWindowsの名前付きパイプ.aspx)を複数のプロセスが通信できるように利用することができます。

親-子のIPC

親と子はパイプを通じた一方向もしくは双方向の通信を行うことができ、パイプは uv_stdio_container_t.flags にビット単位のUV_CREATE_PIPEUV_READABLE_PIPE もしくは UV_WRITABLE_PIPE の組み合わせを設定することによって作成することができます。read/writeフラグは子プロセス側の観点のフラグです。

任意のプロセスのIPC

ドメインソケット*1ファイルシステム内にウェルノウンネームとロケーションを持つことができるため関連のないプロセス間でIPCを使用することができます。 オープンソースのデスクトップ環境で使用されているD-BUSシステムはイベント通知のためにドメインソケットを使用しています。さまざまなアプリケーションがオンライン上のコンタクトが来た時や新しいハードウェアが検知された時に反応することができます。MySQLサーバもクライアントとやりとりするドメインソケットを実行します。

ドメインソケットを使用するとき、クライアント-サーバパターンが使用され、ソケットの作成者/オーナがサーバとして動作します。初期化の後は、通信の方法はTCPと変わりがありませんので、エコーサーバの例をもう一度持ち出します。

pipe-echo-server/main.c

int main() {
    loop = uv_default_loop();

    uv_pipe_t server;
    uv_pipe_init(loop, &server, 0);

    signal(SIGINT, remove_sock);

    if (uv_pipe_bind(&server, "echo.sock")) {
        fprintf(stderr, "Bind error %s¥n", uv_err_name(uv_last_error(loop)));
        return 1;
    }
    if (uv_listen((uv_stream_t*) &server, 128, on_new_connection)) {
        fprintf(stderr, "Listen error %s¥n", uv_err_name(uv_last_error(loop)));
        return 2;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
}

ソケットにはローカルディレクトリに作成されるという意味で echo.sock と名前をつけました。このソケットはストリームAPIが関係する限りはTCPソケットと変わらない振る舞いをします。このサーバはnetcatを用いてサーバをテストすることができます。

$ nc -U /path/to/echo.sock

ドメインソケットに接続したいクライアントは以下を使用します::

void uv_pipe_connect(uv_connect_t *req, uv_pipe_t *handle, const char *name, uv_connect_cb cb);

ここで nameecho.sock もしくは同様のものです。

パイプを通じたファイルディスクリプタの送信

ドメインソケットのクールな点はドメインソケットを介して送信することによりファイルディスクリプタを交換できる点です。これはプロセスに他のプロセス間でI/Oを受け渡すことを可能にします。ロードバランスを行うサーバ、ワーカプロセスや他の手法を含むアプリケーションは最適なCPU利用を行うことができます。

WARNING
Windows上ではTCPソケットを表すファイルディスクリプタだけを受け渡しすることができます。

実演するために、ラウンドロビン方式でクライアントをワーカプロセスに引き渡すエコーサーバの実装を見てみましょう。このプログラムは少し複雑で、書籍内には少しのスニペットしかないので、本当に理解するには全てのコードを読むことをおすすめします。

ファイルディスクリプタがマスターから渡されるので、ワーカプロセスは本当に単純です。

multi-echo-server/worker.c

uv_loop_t *loop;
uv_pipe_t queue;


int main() {
    loop = uv_default_loop();

    uv_pipe_init(loop, &queue, 1);
    uv_pipe_open(&queue, 0);
    uv_read2_start((uv_stream_t*)&queue, alloc_buffer, on_new_connection);
    return uv_run(loop, UV_RUN_DEFAULT);
}

一方、 queue は マスタープロセスに接続されたパイプであり、送信された新しいファイルディスクリプタです。 read2 関数を用いてファイルディスクリプタに対する興味(interest)を表現します。 uv_pipe_init()ipc 引数を1に設定することが重要で、これはパイプがプロセス間通信に使用されることを表します! マスターはファイルハンドルをワーカの標準入力に書き込むため、パイプを uv_pipe_open を用いて stdin に接続します。

multi-echo-server/worker.c

void on_new_connection(uv_pipe_t *q, ssize_t nread, uv_buf_t buf, uv_handle_type pending) {
    if (pending == UV_UNKNOWN_HANDLE) {
        // error!
        return;
    }

    uv_pipe_t *client = (uv_pipe_t*) malloc(sizeof(uv_pipe_t));
    uv_pipe_init(loop, client, 0);
    if (uv_accept((uv_stream_t*) q, (uv_stream_t*) client) == 0) {
        fprintf(stderr, "Worker %d: Accepted fd %d¥n", getpid(), client->io_watcher.fd);
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }
    else {
        uv_close((uv_handle_t*) client, NULL);
    }
}

acceptはこのコード内で奇妙に思えますが、これは的を得ているものです。 accept がしていることは、伝統的に他のファイルディスクリプタ(リスニングソケット)からファイルディスクリプタ(クライアント)を得ることだからです。これはまさにここでしていることです。 queue からファイルディスクリプタ (client) を取得してください。ここからはワーカは標準的なエコーサーバの内容を行います。

マスターに戻って、ロードバランスを可能にするためにワーカがどのように起動されるかを見てみましょう。

multi-echo-server/main.c

uv_loop_t *loop;

struct child_worker {
    uv_process_t req;
    uv_process_options_t options;
    uv_pipe_t pipe;
} *workers;

child_worker 構造体はプロセスと、マスターと各プロセスの間のパイプをラップしています。

multi-echo-server/main.c

void setup_workers() {
    // ...

    // launch same number of workers as number of CPUs
    uv_cpu_info_t *info;
    int cpu_count;
    uv_cpu_info(&info, &cpu_count);
    uv_free_cpu_info(info, cpu_count);

    child_worker_count = cpu_count;

    workers = calloc(sizeof(struct child_worker), cpu_count);
    while (cpu_count--) {
        struct child_worker *worker = &workers[cpu_count];
        uv_pipe_init(loop, &worker->pipe, 1);

        uv_stdio_container_t child_stdio[3];
        child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
        child_stdio[0].data.stream = (uv_stream_t*) &worker->pipe;
        child_stdio[1].flags = UV_IGNORE;
        child_stdio[2].flags = UV_INHERIT_FD;
        child_stdio[2].data.fd = 2;

        worker->options.stdio = child_stdio;
        worker->options.stdio_count = 3;

        worker->options.exit_cb = close_process_handle;
        worker->options.file = args[0];
        worker->options.args = args;

        uv_spawn(loop, &worker->req, worker->options); 
        fprintf(stderr, "Started worker %d¥n", worker->req.pid);
    }
}

ワーカの準備の中で、 uv_cpu_info というちょっとかっこいい関数を使っており、これはCPUの数を取得することができるのでワーカの数をこれと同じにすることができます。再度第三引数を1にしてパイプをIPCとして初期化することの重要性を述べておきます。それから子プロセスの stdin を(子プロセスの観点から)読み取り可能なパイプであることを指定します。ここまで全て正攻法です。ワーカは起動し、パイプにファイルディスクリプタが書き込まれるを待ちます。

on_new_connection (TCP周りは main() で初期化されています)で、クライアントソケットを待ち受け、ラウンドロビンの中の次のワーカにソケットを渡します。

multi-echo-server/main.c

void on_new_connection(uv_stream_t *server, int status) {
    if (status == -1) {
        // error!
        return;
    }

    uv_pipe_t *client = (uv_pipe_t*) malloc(sizeof(uv_pipe_t));
    uv_pipe_init(loop, client, 0);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_write_t *write_req = (uv_write_t*) malloc(sizeof(uv_write_t));
        dummy_buf = uv_buf_init(".", 1);
        struct child_worker *worker = &workers[round_robin_counter];
        uv_write2(write_req, (uv_stream_t*) &worker->pipe, &dummy_buf, 1, (uv_stream_t*) client, NULL);
        round_robin_counter = (round_robin_counter + 1) % child_worker_count;
    }
    else {
        uv_close((uv_handle_t*) client, NULL);
    }
}

もう一度、 uv_write2 が全ての抽象化を制御し、正しい引数としてファイルディスクリプタを渡すことが問題であることを述べておきます。これでマルチプロセスのエコーサーバの出来上がりです。

TODO what do the write2/read2 functions do with the buffers?

*1:同様にこの章ではドメインソケットはWindows上では名前付きパイプを用いて実装されています。