node.jsを支えるlibuvのチュートリアル"uvbook" :プロセス
この文書はuvbookの日本語翻訳の一部となります。文書そのものの説明その他については目次をご覧ください。
プロセス
libuvはかなりの量の子プロセス管理機能を提供しており、プラットフォーム間の差異を抽象化し、ストリームや名前付きパイプを用いて子プロセスと通信することを可能にしています。
Unixにおける共通のイディオムは全てのプロセスが一つのことを良好に行えることです。このようなケースでは、プロセスはタスクを完遂するために複数の子プロセスを使用します(シェルがパイプを用いるように)。メッセージを用いるマルチプロセスモデルはスレッドと共有メモリを用いるモデルに比較して簡単になるでしょう。
イベントベースのプログラムに対する共通のマイナス要因は現代のコンピュータにおいて複数のコアを持つ利点を活用しきれない点です。マルチスレッドプログラムではカーネルはスケジューリングを行い、異なるスレッドに異なるコアを割り当てることができます。しかし、イベントループは単一のスレッドです。これに対する回避策は代わりに複数のプロセスを起動することであり、各プロセスはイベントループを処理し、各プロセスは別々のコアを割り当てられます。
子プロセスの起動
最も単純なケースはプロセスを起動し、いつプロセスが終了したかを知ることです。これは uv_spawn
を用いることで達成されます。
spawn/main.c
uv_loop_t *loop; uv_process_t child_req; uv_process_options_t options; int main() { loop = uv_default_loop(); char* args[3]; args[0] = "mkdir"; args[1] = "test-dir"; args[2] = NULL; options.exit_cb = on_exit; options.file = "mkdir"; options.args = args; if (uv_spawn(loop, &child_req, options)) { fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop))); return 1; } return uv_run(loop, UV_RUN_DEFAULT); }
NOTE
options
はグローバル変数であるため、明示的に0で初期化されています。もしoptions
とローカル変数に変える場合、使用しない全てのフィールドをnullに初期化することを忘れないでください。
uv_process_options_t options = {0};
uv_process_t
構造体はウォッチャとしてのみ動作し、全てのオプションは uv_process_options_t
を通じて設定します。プロセスの起動を簡単にするために、 file
と args
フィールドのみを設定する必要があります。 file
は実行するプログラムです。 uv_spawn
は内部的にexecvpを内部的に使用するため、フルパスを渡す必要はありません。最後に、基本的な規則として、 引数の配列は引数の数より一つ大きく、最後の要素はNULLである必要があります。
uv_spawn
の呼び出しのあとに、 uv_process_t.pid
には子プロセスのプロセスIDが格納されています。
exitコールバックは 終了ステータス と exitの原因となった signal の種類とともに呼び出されます。
spawn/main.c
void on_exit(uv_process_t *req, int exit_status, int term_signal) { fprintf(stderr, "Process exited with status %d, signal %d¥n", exit_status, term_signal); uv_close((uv_handle_t*) req, NULL); }
プロセスが終了した後にプロセスのウォッチャをクローズすることが 必要です。
プロセスのパラメータ変更
子プロセスが起動する前に uv_process_options_t
内のフィールドを用いて実行環境を制御することができます。
実行ディレクトリの変更
uv_process_options_t.cwd
を対応するディレクトリに設定します。
環境変数の設定
uv_process_options_t.env
は文字列のヌル終端された配列であり、 各 VAR=VALUE
形式はプロセスの環境変数を設定するために用いられます。これを NULL
にすることにより親プロセスからからの環境変数を受け継ぎます。
オプションフラグ
uv_process_options_t.flags
に下記のビット単位のORフラグを設定することにより、子プロセスの動作を修正します。
UV_PROCESS_SETUID
- これは子の実行ユーザIDをuv_process_options_t.uid
に設定します。UV_PROCESS_SETGID
- これは子の実行グループIDをuv_process_options_t.gid
に設定します。
UID/GIDの変更はUnixのみでサポートされており、 uv_spawn
はWindowsでは UV_ENOTSUP
で失敗します。
UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS
-uv_process_options_t.args
にクオートやエスケープを付与しません。Unixでは無視されます。UV_PROCESS_DETACHED
- 新しいセッションで子プロセスを開始し、親プロセスが終了した後も処理を続けます。下記の例を参照してください。
プロセスをデタッチする
UV_PROCESS_DETACHED
フラグを渡すことで、親の終了が影響しないデーモン、もしくは親と独立した子プロセスを起動することができます。
detach/main.c
int main() { loop = uv_default_loop(); char* args[3]; args[0] = "sleep"; args[1] = "100"; args[2] = NULL; options.exit_cb = NULL; options.file = "sleep"; options.args = args; options.flags = UV_PROCESS_DETACHED; if (uv_spawn(loop, &child_req, options)) { fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop))); return 1; } fprintf(stderr, "Launched sleep with PID %d¥n", child_req.pid); uv_unref((uv_handle_t*) &child_req); return uv_run(loop, UV_RUN_DEFAULT); }
ただ、ウォッチャはまだ子を監視しているため、プログラムは終了しないことをを忘れないでください。もっと 発動したら忘れる ことを求めるなら、 uv_unref()
を使用してください。
プロセスへのシグナル送信
libuvはUnix標準の kill(2)
システムコール、及びWindowsにおいて同様の機能性を実装していますが、一つ警告があります。 SIGTERM
、 SIGINT
と SIGKILL
はプロセスを終了させます。 uv_kill
のシグネチャは以下です。
uv_err_t uv_kill(int pid, int signum);
libuvを用いて開始されたプロセスのために、 uv_process_kill
を代わりに用いることができます。これは pidの代わりに uv_process_t
ウォッチャを第一引数として受け取ります。この場合、ウォッチャに対して uv_close
を 呼び出すことを忘れないでください。
シグナル
TODO: update based on https://github.com/joyent/libuv/issues/668
libuvはWindowsサポートによってUnixシグナルに対するラッパも提供します。
libuvにより ’よく動作する' シグナルを生成するために、あるAPIが 全ての実行中のイベントループ上の全てのハンドラ に対してシグナルを送信します! ハンドラを初期化し、ループに関連付けるためにuv_signal_init()
を使用してください。特定のシグナルを待ち受けるために、ハンドラ関数とともに uv_signal_start()
を使用してください。 各ハンドラは一つのシグナル番号に対してのみ関連付け、 uv_signal_start()
を続けて呼び出すことで、以前の関連付けを上書きすることができます。監視を中止するためには uv_signal_stop()
を使用してください。以下はいろいろな使い方を示した小さな例です。
signal/main.c
#include <stdio.h> #include <unistd.h> #include <uv.h> void signal_handler(uv_signal_t *handle, int signum) { printf("Signal received: %d¥n", signum); uv_signal_stop(handle); } // two signal handlers in one loop void thread1_worker(void *userp) { uv_loop_t *loop1 = uv_loop_new(); uv_signal_t sig1a, sig1b; uv_signal_init(loop1, &sig1a); uv_signal_start(&sig1a, signal_handler, SIGUSR1); uv_signal_init(loop1, &sig1b); uv_signal_start(&sig1b, signal_handler, SIGUSR1); uv_run(loop1, UV_RUN_DEFAULT); } // two signal handlers, each in its own loop void thread2_worker(void *userp) { uv_loop_t *loop2 = uv_loop_new(); uv_loop_t *loop3 = uv_loop_new(); uv_signal_t sig2; uv_signal_init(loop2, &sig2); uv_signal_start(&sig2, signal_handler, SIGUSR1); uv_signal_t sig3; uv_signal_init(loop3, &sig3); uv_signal_start(&sig3, signal_handler, SIGUSR1); while (uv_run(loop2, UV_RUN_NOWAIT) || uv_run(loop3, UV_RUN_NOWAIT)) { } } int main() { printf("PID %d¥n", getpid()); uv_thread_t thread1, thread2; uv_thread_create(&thread1, thread1_worker, 0); uv_thread_create(&thread2, thread2_worker, 0); uv_thread_join(&thread1); uv_thread_join(&thread2); return 0; }
NOTE
uv_run(loop, UV_RUN_NOWAIT)
は これがたった一つのイベントしか処理しない点でuv_run(loop, UV_RUN_ONCE)
と同様です。UV_RUN_NOWAITがすぐに制御を戻すのに対し、 UV_RUN_ONCEは保留されたイベントがない場合にブロックします。NOWAITは他のループが保留された処理を持っていないという理由でループがロック(starved)しないようにするために用いることができます。
SIGUSR1
をプロセスに送信すると、各 uv_signal_t
で1つずつ、4回ハンドラが起動していることを確認することができます。ハンドラはプログラムを終了するために各ハンドルを停止するだけです。全てのハンドラに対してのこの種の送信はとても有用です。単純に各イベントループに SIGINT
のためのウォッチャを追加するだけで、複数のイベントループを使用するサーバは終了する前にデータが安全に保存されることを保証できます。
子プロセスのI/O
通常の、新規に起動したプロセスは自分自身のファイルディスクリプタのセットを持っており、0、1、2、はそれぞれ stdin
、 stdout
、 stderr
に対応しています。ときどきファイルディスクリプタを子プロセスと共有したい場合があります。例えば、アプリケーションはサブコマンドを起動し、発生したエラーをログファイルに書きたいが stdout
は無視したいかもしれません。このために子プロセスの stderr
を表示したいとします。この場合、libuvはファイルディスクリプタの 継承 をサポートしています。この例では、下記のようなテストプログラムを起動します。
proc-streams/test.c
#include <stdio.h> int main() { fprintf(stderr, "This is stderr¥n"); printf("This is stdout¥n"); return 0; }
実際のプログラムである proc-streams
は stderr
のみを継承して実行されます。 uv_process_options_t
の stdio
フィールドを使用することで子プロセスのファイルディスクリプタは設定されます。最初に stdio_count
フィールドに設定するファイルディスクリプタの数を設定します。 uv_process_options_t.stdio
は uv_stdio_container_t
の配列であり、下記のようになります。
typedef struct uv_stdio_container_s { uv_stdio_flags flags; union { uv_stream_t* stream; int fd; } data; } uv_stdio_container_t;
ここで、フラグはいくつかの値を持つことができます。 これが使われない場合に UV_IGNORE
を用いてください。もし最初の3つの stdio
フィールドが UV_IGNORE
に設定された場合、これらは /dev/null
にリダイレクトされます。
既存のディスクリプタに渡したいため、 UV_INHERIT_FD
を使用します。それから fd
を stderr
に設定します。
proc-streams/main.c
int main() { loop = uv_default_loop(); /* ... */ options.stdio_count = 3; uv_stdio_container_t child_stdio[3]; child_stdio[0].flags = UV_IGNORE; child_stdio[1].flags = UV_IGNORE; child_stdio[2].flags = UV_INHERIT_FD; child_stdio[2].data.fd = 2; options.stdio = child_stdio; options.exit_cb = on_exit; options.file = args[0]; options.args = args; if (uv_spawn(loop, &child_req, options)) { fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop))); return 1; } return uv_run(loop, UV_RUN_DEFAULT); }
proc-stream
を実行すると、"This is stderr" という行だけが表示されます。 stdout
を継承するように設定して出力を確認してください。
これはこのようなストリームのリダイレクションを適用したあまり感動のない単純なものです。 flags
を UV_INHERIT_STREAM
に設定し、 data.stream
を親プロセスのストリームに設定することにより、子プロセスはそのプロセスを標準I/Oとして扱うことができます。これはCGIのようなものを実装するのに使用できます。
cgi/tick.c
#include <stdio.h> #include <unistd.h> int main() { int i; for (i = 0; i < 10; i++) { printf("tick¥n"); fflush(stdout); sleep(1); } printf("BOOM!¥n"); return 0; }
このCGIサーバは、この章のコンセプトと :doc:ネットワーク
を結びつけており、全てのクライアントは接続が閉じられるまでに10回"tick"を受信します。
cgi/main.c
void on_new_connection(uv_stream_t *server, int status) { uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t)); uv_tcp_init(loop, client); if (uv_accept(server, (uv_stream_t*) client) == 0) { invoke_cgi_script(client); } else { uv_close((uv_handle_t*) client, NULL); } }
ここでTCP接続を受け付け、 invoke_cgi_script
にソケット (stream) を渡します。
cgi/main.c
void invoke_cgi_script(uv_tcp_t *client) { /* ... finding the executable path and setting up arguments ... */ options.stdio_count = 3; uv_stdio_container_t child_stdio[3]; child_stdio[0].flags = UV_IGNORE; child_stdio[1].flags = UV_INHERIT_STREAM; child_stdio[1].data.stream = (uv_stream_t*) client; child_stdio[2].flags = UV_IGNORE; options.stdio = child_stdio; options.exit_cb = cleanup_handles; options.file = args[0]; options.args = args; child_req.data = (void*) client; if (uv_spawn(loop, &child_req, options)) { fprintf(stderr, "%s¥n", uv_strerror(uv_last_error(loop))); return; } }
CGIスクリプトの stdout
はtickスクリプトが出力する全ての内容をクライアントに送信させるためにソケットに設定されています。プロセスを用いることにより、read/writeのバッファリングをOSに任せることができ、これは簡単さの観点では素晴らしいことです。ただ、プロセスの生成はコストがかかる処理であることは肝に命じてください。
パイプ
libuvの uv_pipe_t
構造体はUnixプログラマーを少し混乱させるもので、なぜならこれはすぐに |
とpipe(7)を呼び出すからです。しかし uv_pipe_t
は無名パイプとは関連がなく、これは2つを用いています。
ストリームAPI - これは
uv_stream_t
の具体的な実装として実行されるものです。FIFOを提供するためのAPIであり、ローカルファイルのI/Oに対するストリーミングインターフェイスです。これは :ref:バッファとストリーム
で言及されるuv_pipe_open
を用いて処理されます。これをTCP/UDPのためにも用いることができますが、この目的のためには既に便利な関数と構造体が用意されています。IPC機構 -
uv_pipe_t
はUnixドメインソケットかWindowsの名前付きパイプ.aspx)を複数のプロセスが通信できるように利用することができます。
親-子のIPC
親と子はパイプを通じた一方向もしくは双方向の通信を行うことができ、パイプは uv_stdio_container_t.flags
にビット単位のUV_CREATE_PIPE
と UV_READABLE_PIPE
もしくは UV_WRITABLE_PIPE
の組み合わせを設定することによって作成することができます。read/writeフラグは子プロセス側の観点のフラグです。
任意のプロセスのIPC
ドメインソケット*1はファイルシステム内にウェルノウンネームとロケーションを持つことができるため関連のないプロセス間でIPCを使用することができます。 オープンソースのデスクトップ環境で使用されているD-BUSシステムはイベント通知のためにドメインソケットを使用しています。さまざまなアプリケーションがオンライン上のコンタクトが来た時や新しいハードウェアが検知された時に反応することができます。MySQLサーバもクライアントとやりとりするドメインソケットを実行します。
ドメインソケットを使用するとき、クライアント-サーバパターンが使用され、ソケットの作成者/オーナがサーバとして動作します。初期化の後は、通信の方法はTCPと変わりがありませんので、エコーサーバの例をもう一度持ち出します。
pipe-echo-server/main.c
int main() { loop = uv_default_loop(); uv_pipe_t server; uv_pipe_init(loop, &server, 0); signal(SIGINT, remove_sock); if (uv_pipe_bind(&server, "echo.sock")) { fprintf(stderr, "Bind error %s¥n", uv_err_name(uv_last_error(loop))); return 1; } if (uv_listen((uv_stream_t*) &server, 128, on_new_connection)) { fprintf(stderr, "Listen error %s¥n", uv_err_name(uv_last_error(loop))); return 2; } return uv_run(loop, UV_RUN_DEFAULT); }
ソケットにはローカルディレクトリに作成されるという意味で echo.sock
と名前をつけました。このソケットはストリームAPIが関係する限りはTCPソケットと変わらない振る舞いをします。このサーバはnetcat
を用いてサーバをテストすることができます。
$ nc -U /path/to/echo.sock
ドメインソケットに接続したいクライアントは以下を使用します::
void uv_pipe_connect(uv_connect_t *req, uv_pipe_t *handle, const char *name, uv_connect_cb cb);
ここで name
は echo.sock
もしくは同様のものです。
パイプを通じたファイルディスクリプタの送信
ドメインソケットのクールな点はドメインソケットを介して送信することによりファイルディスクリプタを交換できる点です。これはプロセスに他のプロセス間でI/Oを受け渡すことを可能にします。ロードバランスを行うサーバ、ワーカプロセスや他の手法を含むアプリケーションは最適なCPU利用を行うことができます。
実演するために、ラウンドロビン方式でクライアントをワーカプロセスに引き渡すエコーサーバの実装を見てみましょう。このプログラムは少し複雑で、書籍内には少しのスニペットしかないので、本当に理解するには全てのコードを読むことをおすすめします。
ファイルディスクリプタがマスターから渡されるので、ワーカプロセスは本当に単純です。
multi-echo-server/worker.c
uv_loop_t *loop; uv_pipe_t queue; int main() { loop = uv_default_loop(); uv_pipe_init(loop, &queue, 1); uv_pipe_open(&queue, 0); uv_read2_start((uv_stream_t*)&queue, alloc_buffer, on_new_connection); return uv_run(loop, UV_RUN_DEFAULT); }
一方、 queue
は マスタープロセスに接続されたパイプであり、送信された新しいファイルディスクリプタです。 read2
関数を用いてファイルディスクリプタに対する興味(interest)を表現します。 uv_pipe_init()
の ipc
引数を1に設定することが重要で、これはパイプがプロセス間通信に使用されることを表します! マスターはファイルハンドルをワーカの標準入力に書き込むため、パイプを uv_pipe_open
を用いて stdin
に接続します。
multi-echo-server/worker.c
void on_new_connection(uv_pipe_t *q, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { if (pending == UV_UNKNOWN_HANDLE) { // error! return; } uv_pipe_t *client = (uv_pipe_t*) malloc(sizeof(uv_pipe_t)); uv_pipe_init(loop, client, 0); if (uv_accept((uv_stream_t*) q, (uv_stream_t*) client) == 0) { fprintf(stderr, "Worker %d: Accepted fd %d¥n", getpid(), client->io_watcher.fd); uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read); } else { uv_close((uv_handle_t*) client, NULL); } }
accept
はこのコード内で奇妙に思えますが、これは的を得ているものです。 accept
がしていることは、伝統的に他のファイルディスクリプタ(リスニングソケット)からファイルディスクリプタ(クライアント)を得ることだからです。これはまさにここでしていることです。 queue
からファイルディスクリプタ (client
) を取得してください。ここからはワーカは標準的なエコーサーバの内容を行います。
マスターに戻って、ロードバランスを可能にするためにワーカがどのように起動されるかを見てみましょう。
multi-echo-server/main.c
uv_loop_t *loop;
struct child_worker {
uv_process_t req;
uv_process_options_t options;
uv_pipe_t pipe;
} *workers;
child_worker
構造体はプロセスと、マスターと各プロセスの間のパイプをラップしています。
multi-echo-server/main.c
void setup_workers() { // ... // launch same number of workers as number of CPUs uv_cpu_info_t *info; int cpu_count; uv_cpu_info(&info, &cpu_count); uv_free_cpu_info(info, cpu_count); child_worker_count = cpu_count; workers = calloc(sizeof(struct child_worker), cpu_count); while (cpu_count--) { struct child_worker *worker = &workers[cpu_count]; uv_pipe_init(loop, &worker->pipe, 1); uv_stdio_container_t child_stdio[3]; child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; child_stdio[0].data.stream = (uv_stream_t*) &worker->pipe; child_stdio[1].flags = UV_IGNORE; child_stdio[2].flags = UV_INHERIT_FD; child_stdio[2].data.fd = 2; worker->options.stdio = child_stdio; worker->options.stdio_count = 3; worker->options.exit_cb = close_process_handle; worker->options.file = args[0]; worker->options.args = args; uv_spawn(loop, &worker->req, worker->options); fprintf(stderr, "Started worker %d¥n", worker->req.pid); } }
ワーカの準備の中で、 uv_cpu_info
というちょっとかっこいい関数を使っており、これはCPUの数を取得することができるのでワーカの数をこれと同じにすることができます。再度第三引数を1にしてパイプをIPCとして初期化することの重要性を述べておきます。それから子プロセスの stdin
を(子プロセスの観点から)読み取り可能なパイプであることを指定します。ここまで全て正攻法です。ワーカは起動し、パイプにファイルディスクリプタが書き込まれるを待ちます。
on_new_connection
(TCP周りは main()
で初期化されています)で、クライアントソケットを待ち受け、ラウンドロビンの中の次のワーカにソケットを渡します。
multi-echo-server/main.c
void on_new_connection(uv_stream_t *server, int status) { if (status == -1) { // error! return; } uv_pipe_t *client = (uv_pipe_t*) malloc(sizeof(uv_pipe_t)); uv_pipe_init(loop, client, 0); if (uv_accept(server, (uv_stream_t*) client) == 0) { uv_write_t *write_req = (uv_write_t*) malloc(sizeof(uv_write_t)); dummy_buf = uv_buf_init(".", 1); struct child_worker *worker = &workers[round_robin_counter]; uv_write2(write_req, (uv_stream_t*) &worker->pipe, &dummy_buf, 1, (uv_stream_t*) client, NULL); round_robin_counter = (round_robin_counter + 1) % child_worker_count; } else { uv_close((uv_handle_t*) client, NULL); } }
もう一度、 uv_write2
が全ての抽象化を制御し、正しい引数としてファイルディスクリプタを渡すことが問題であることを述べておきます。これでマルチプロセスのエコーサーバの出来上がりです。
TODO what do the write2/read2 functions do with the buffers?