自由課題

学んだり、考えたり、試したりしたこと。

node.jsを支えるlibuvのチュートリアル"uvbook" :ユーティリティ

この文書はuvbookの日本語翻訳の一部となります。文書そのものの説明その他については目次をご覧ください。

ユーティリティ

この章は共通のタスクで有用なツールやテクニックを列挙します。libev man pageはlibuvに適用可能な単純なAPIの変更を通じていくつかのパターンを取り扱っています。また、章を割くほどでもないlibuv APIの部分についても言及します。

タイマ

タイマは開始されてから定められた時間が経過した後にコールバックを起動します。libuvのタイマは一度きりではなく一定の間隔で起動するように設定することもできます。

単純な使用法はウォッチャを初期化して timeout と必要であれば repeat を設定して開始します。タイマはいつでも停止することができます。

    uv_timer_t timer_req;

    uv_timer_init(loop, &timer_req);
    uv_timer_start(&timer_req, callback, 5000, 2000);

は繰り返しタイマを開始し、 uv_timer_start の実行後最初は5秒( timeout )で開始し、その後2秒ごと( repeat )に繰り返します。

    uv_timer_stop(&timer_req);

をタイマを停止するために使用します。この関数はコールバックの中からも安全に使用できます。

繰り返し間隔はいつでも変更することができ、

    uv_timer_set_repeat(uv_timer_t *timer, int64_t repeat);

可能なときに 変更が反映されます。もしこの関数がタイマコールバックから呼び出された場合、以下のような動作となります。

  • もしタイマが繰り返さない場合、タイマは既に停止ししています。 uv_timer_start を再び使用してください。
  • もしタイマが繰り返す場合、次のタイムアウトは既にスケジューリングされていますので、タイマの間隔が更新される前に古い繰り返し間隔が一度だけ使用されます。

ユーティリティ関数の

    int uv_timer_again(uv_timer_t *)

繰り返すタイマー にのみ適用できタイマを停止し、初期の timeout と 古い repeat の値がセットされた repeat により開始することと等価です。もしタイマが開始されなかった場合、失敗( エラーコードは UV_EINVAL )し、-1を返します。

実際のタイマの例は 次項 にあります。

イベントループ参照カウント

イベントループは動作中のウォッチャが存在している間だけ動作します。このシステムはウォッチャが開始された時にイベントループの参照カウントを加算し、ウォッチャが停止した時に参照カウントを減算することによって動作します。また、ハンドルの参照カウントを手動で変更することにより変更できます。

    void uv_ref(uv_handle_t*);
    void uv_unref(uv_handle_t*);

上記の関数は、ウォッチャが動作している間にループを脱出することや、ループを生かしておくためにカスタムオブジェクトを使用することを可能にするために使用されます。

以前は繰り返しタイマと共に使用することができました。X秒毎に動作するガーベッジコレクタや、定期的に他のマシンにハートビートを送信するネットワークサービスを使用することができますが、通常の停止シーケンスかエラー時にこれらを停止したくない場合があります。もしくは、全ての他のウォッチャが処理を完了した時にプログラムを停止したい場合があります。このケースではタイマが唯一のウォッチャであり uv_run が停止するようにタイマを作成直後にunrefするだけです。

続いてはlibuvのメソッドがJS APIの中で使用されている、node.jsの中のものです。 uv_handle_t (全てのウォッチャのスーパークラス) はJSオブジェクト毎に作成され、ref/unrefをすることができます。

ref-timer/main.c

uv_loop_t *loop;
uv_timer_t gc_req;
uv_timer_t fake_job_req;

int main() {
    loop = uv_default_loop();

    uv_timer_init(loop, &gc_req);
    uv_unref((uv_handle_t*) &gc_req);

    uv_timer_start(&gc_req, gc, 0, 2000);

    // could actually be a TCP download or something
    uv_timer_init(loop, &fake_job_req);
    uv_timer_start(&fake_job_req, fake_job, 9000, 0);
    return uv_run(loop, UV_RUN_DEFAULT);
}

ガーベッジコレクタタイマを初期化し、すぐに unref します。9秒後、フェイクのジョブが完了するとガーベッジコレクタがまだ動作しているのにも関わらずプログラムは自動的に停止します。

アイドルウォッチャパターン

アイドル(待機中)ウォッチャのコールバックはイベントループが他の待機中のイベントが無かったときにのみ起動されます。このような状況では、コールバックは各ループの繰り返しにつき一度だけ呼び出されます。このアイドルコールバックはとても低い優先度の処理として使用されます。例えば、待機中の区間に開発者が解析するための日ごとのアプリケーションパフォーマンスのサマリを割り当てることや、SETI計算を行うためにアプリケーションのCPUを使用することもできます :) アイドルウォッチャはGUIアプリケーションにも有効です。イベントループをファイルダウンロードに使用するとします。TCPソケットが接続されたままで他のイベントが存在しなかった場合、イベントループは一時停止 ( ブロック )し、これはプログレスバーが進まずユーザはアプリケーションがクラッシュしたと思うでしょう。このようなケースではアイドルウォッチャを追加(queue up)し、UIを使用可能な状態に保ちます。

idle-compute/main.c

uv_loop_t *loop;
uv_fs_t stdin_watcher;
uv_idle_t idler;
char buffer[1024];

int main() {
    loop = uv_default_loop();

    uv_idle_init(loop, &idler);

    uv_fs_read(loop, &stdin_watcher, 1, buffer, 1024, -1, on_type);
    uv_idle_start(&idler, crunch_away);
    return uv_run(loop, UV_RUN_DEFAULT);
}

まずアイドルウォッチャを初期化し、興味がある実際のイベントと一緒に追加します。 crunch_away はユーザが何かを入力してリターンキーを押下するまで繰り返し呼び出されます。その後ループが入力データの処理を行う間少し停止し、その後アイドルコールバックは再び呼び出され続けます。

idle-compute/main.c

void crunch_away(uv_idle_t* handle, int status) {
    // 地球外生命体に関する計算をする
    // タンパク質の折り畳み計算をする
    // πの計算をする
    // などなど
    fprintf(stderr, "Computing PI...¥n");
    // 端末を埋め尽くしてしまわないように以下をする
    uv_idle_stop(handle);
}

ワーカスレッドへのデータ送信

uv_queue_work を使用するとき、通常はワーカスレッドに複雑なデータを渡す必要があります。この解決方法は struct を使用し、 uv_work_t.data にこれを指し示すように設定することです。 ちょっとしたバリエーションとしては uv_work_t そのものをこのstructの最初のメンバとする(baton *1 と呼ばれます)ことです。こうすることでworkリクエストと全ての関連するデータを一回のfree呼び出しで後片付けすることができます。

    struct ftp_baton {
        uv_work_t req;
        char *host;
        int port;
        char *username;
        char *password;
    }
    ftp_baton *baton = (ftp_baton*) malloc(sizeof(ftp_baton));
    baton->req.data = (void*) baton;
    baton->host = strdup("my.webhost.com");
    baton->port = 21;
    // ...

    uv_queue_work(loop, &baton->req, ftp_session, ftp_cleanup);

それではbatonを作成し、タスクに紐付けてみましょう。

これでタスク関数は必要とするデータを抽出することができます。

    void ftp_session(uv_work_t *req) {
        ftp_baton *baton = (ftp_baton*) req->data;

        fprintf(stderr, "Connecting to %s\n", baton->host);
    }

    void ftp_cleanup(uv_work_t *req) {
        ftp_baton *baton = (ftp_baton*) req->data;

        free(baton->host);
        // ...
        free(baton);
    }

そしてウォッチャも開放することでbatonを開放します。

ポーリングを伴う外部I/O

通常第三者のライブラリは彼ら自身のI/Oを処理し、ソケットや他のファイルを内部的に保持し続けます。このケースでは通常のストリームI/O操作を使用することが不可能ですが、ライブラリはlibuvイベントループと統合することができます。必要なことはライブラリが内部のファイルディスクリプタにアクセスでき、アプリケーションで決めたとおりに少しづつタスクを処理する関数を提供することだけです。いくつかのライブラリはそれでもそのようなアクセスを許可せず、I/Oトランザクション全体を処理し、その後制御を戻す標準的なブロッキング関数を提供するかもしれません。これらはイベントループスレッド内で使用するには適さないため、libuv-work-queueを代わりに使ってください。もちろんこれはライブラリに対する細かい制御を失うことを意味します。

libuvの uv_poll の部分は単純にOSの通知機構を通じてファイルディスクリプタを監視するだけです。ある意味では、libuvが実装する全てのI/O操作は uv_poll のようなコードが裏で支えているとも言えます。OSがポーリングされたファイルディスクリプタの状態変更を通知するときはいつでも、libuvは関連付けられたコールバックを起動します。

それではファイルをダウンロードするためにlibcurlを使用した単純なダウンロードマネージャを見てみましょう。libcurlに全ての制御を渡してしまうより、libuvのイベントループを使用し、libuvがI/Oの準備ができていることを通知するときダウンロードを進めるためにノンブロッキングで非同期のmultiインターフェイスを使用します。

uvwget/main.c - The setup

#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
#include <curl/curl.h>

uv_loop_t *loop;
CURLM *curl_handle;
uv_timer_t timeout;

int main(int argc, char **argv) {
    loop = uv_default_loop();

    if (argc <= 1)
        return 0;

    if (curl_global_init(CURL_GLOBAL_ALL)) {
        fprintf(stderr, "Could not init cURL¥n");
        return 1;
    }

    uv_timer_init(loop, &timeout);

    curl_handle = curl_multi_init();
    curl_multi_setopt(curl_handle, CURLMOPT_SOCKETFUNCTION, handle_socket);
    curl_multi_setopt(curl_handle, CURLMOPT_TIMERFUNCTION, start_timeout);

    while (argc-- > 1) {
        add_download(argv[argc], argc);
    }

    uv_run(loop, UV_RUN_DEFAULT);
    curl_multi_cleanup(curl_handle);
    return 0;
}

それぞれのライブラリがlibuvと統合される方法は様々です。libcurlの場合、2つのコールバックを登録することができます。ソケットのコールバックである handle_socket はソケットの状態が変わったときにはいつでも呼びだされるため、ポーリングを開始する必要があります。 start_timeout は次のタイムアウト間隔を通知するためにlibcurlによって呼び出され、I/O状態に関わらずlibcurlを呼び出します。これはlibcurlがエラーを処理するか、ダウンロードが進行したことを検知するためです。

ダウンローダは下記のように呼び出されます::

$ ./uvwget [url1] [url2] ...

ですので各引数をURLとして加えます。

uvwget/main.c - Adding urls

typedef struct curl_context_s {
    uv_poll_t poll_handle;
    curl_socket_t sockfd;
} curl_context_t;

curl_context_t *create_curl_context(curl_socket_t sockfd) {
    int r;
    curl_context_t *context;

    context = (curl_context_t*) malloc(sizeof *context);

    context->sockfd = sockfd;

    r = uv_poll_init_socket(loop, &context->poll_handle, sockfd);
    context->poll_handle.data = context;

    return context;
}

libcurlを直接ファイルに書くように制御しますが、もし望むのであればそれ以上のことを行わせることもできます。

start_timeout は最初すぐにlibcurlによって呼び出され、動作するように設定されます。これはタイムアウトした時はいつでも CURL_SOCKET_TIMEOUT と共に curl_multi_socket_action を呼び出すlibuvのタイマを単純に開始します。 curl_multi_socket_action はlibcurlを呼び出すものであり、ソケットが状態を変化させた時はいつでも呼び出されます。しかしここに踏み込む前に handle_socket が呼び出された時はいつでもソケットをポーリングする必要があります。

uvwget/main.c - Setting up polling

int handle_socket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) {
    curl_context_t *curl_context;
    if (action == CURL_POLL_IN || action == CURL_POLL_OUT) {
        if (socketp) {
            curl_context = (curl_context_t*) socketp;
        }
        else {
            curl_context = create_curl_context(s);
        }
        curl_multi_assign(curl_handle, s, (void *) curl_context);
    }

    switch (action) {
        case CURL_POLL_IN:
            uv_poll_start(&curl_context->poll_handle, UV_READABLE, curl_perform);
            break;
        case CURL_POLL_OUT:
            uv_poll_start(&curl_context->poll_handle, UV_WRITABLE, curl_perform);
            break;
        case CURL_POLL_REMOVE:
            if (socketp) {
                uv_poll_stop(&((curl_context_t*)socketp)->poll_handle);
                destroy_curl_context((curl_context_t*) socketp);                
                curl_multi_assign(curl_handle, s, NULL);
            }
            break;
        default:
            abort();
    }

    return 0;
}

ここでソケットのfd saction に対して興味があることを表明します。もし存在しない場合あ、各ソケットに対して uv_poll_t ハンドルを生成し、 curl_multi_assign を使用しソケットとハンドルを関連付けます。 このようにしてコールバックが呼び出された時はいつでも socketp はソケットを指し示します。

ダウンロードが終了もしくは失敗したとき、libcurlはpollの除去をリクエストします。これで動作を停止し、pollハンドルを開放します。

libcurlが監視したいイベントに依存し、 UV_READABLE もしくは UV_WRITABLE をポーリングします。これでソケットが読み取りもしくは書き込み可能になった時はいつでもlibuvはpollコールバックを呼び出します。 同じハンドルが受け付け可能で複数回 uv_poll_start を呼び出すと、単にイベントマスクを新しい値で更新するだけです。 curl_perform はこのプログラムの核心です。

uvwget/main.c - Setting up polling

void curl_perform(uv_poll_t *req, int status, int events) {
    uv_timer_stop(&timeout);
    int running_handles;
    int flags = 0;
    if (events & UV_READABLE) flags |= CURL_CSELECT_IN;
    if (events & UV_WRITABLE) flags |= CURL_CSELECT_OUT;

    curl_context_t *context;

    context = (curl_context_t*)req;

    curl_multi_socket_action(curl_handle, context->sockfd, flags, &running_handles);

    char *done_url;

    CURLMsg *message;
    int pending;
    while ((message = curl_multi_info_read(curl_handle, &pending))) {
        switch (message->msg) {
            case CURLMSG_DONE:
                curl_easy_getinfo(message->easy_handle, CURLINFO_EFFECTIVE_URL, &done_url);
                printf("%s DONE¥n", done_url);

                curl_multi_remove_handle(curl_handle, message->easy_handle);
                curl_easy_cleanup(message->easy_handle);

                break;
            default:
                fprintf(stderr, "CURLMSG default¥n");
                abort();
        }
    }
}

最初にすべきことはタイマを停止することです。なぜならその間に何らかの進行があったからです。その後コールバックのきっかけとなったものに応じてlibcurlにその内容を伝達します。それから curl_multi_socket_action を変更のあったソケットと発生したイベントに関するフラグと共に呼び出します。この時点ではlibcurlは内部のタスクを少しずつ処理し、可能な限り速く制御を戻そうとします。これはまさにイベント駆動のプログラムがメインスレッドの中で望むことです。libcurlは転送の進行状況について自分自身のキューにメッセージをキューイングします。このケースでは完了した転送についてのみ興味があります。そのため、これらのメッセージを取り出し、転送が終了したもののハンドルを後かたづけします。

ウォッチャを確認・準備する

TODO

ライブラリの読み込み

libuvは動的に共有ライブラリを読み込むためのクラスプラットフォームのAPIを提供しています。このAPIは独自のプラグイン/エクステンション/モジュールシステムを実装するために使え、node.jsによってバインディングのための require() のサポートを実装するために使用されています。使用方法はライブラリが正しいシンボルをエクスポートするのと同じくらい簡単です。健全性(sanity)に注意し、第三者のコードをロードするときはセキュリティチェックを行わないと予期しない挙動を示すようになるでしょう。この例はプラグインの名前を印刷する以外にはなにもしないシンプルな実装です。

プラグインの著者に提供されているインターフェイスについて見てみましょう。

#ifndef UVBOOK_PLUGIN_SYSTEM
#define UVBOOK_PLUGIN_SYSTEM

void mfp_register(const char *name);

#endif
#include <stdio.h>

void mfp_register(const char *name) {
    fprintf(stderr, "Registered plugin ¥"%s¥"¥n", name);
}

プラグインの著者がアプリケーション内*2で有用な機能を使用するための関数を加えることもできます。このAPIを用いた単純なプラグインは以下です::

#include "plugin.h"

void initialize() {
    mfp_register("Hello World!");
}

インターフェイスは全てのプラグインがアプリケーションによって呼び出される initialize 関数を持っている必要があるという定義をしています。プラグインは共有ライブラリとしてコンパイルされ、アプリケーションを実行することによりロードされます::

$ ./plugin libhello.dylib
Loading libhello.dylib
Registered plugin "Hello World!"

これは 共有ライブラリである libhello.dylib を最初にロードするために uv_dlopen を使用することによって処理されます。その後 uv_dlsym を用いて initialize 関数にアクセスし、これを呼び出します。

plugin/main.c

#include "plugin.h"

typedef void (*init_plugin_function)();

int main(int argc, char **argv) {
    if (argc == 1) {
        fprintf(stderr, "Usage: %s [plugin1] [plugin2] ...¥n", argv[0]);
        return 0;
    }

    uv_lib_t *lib = (uv_lib_t*) malloc(sizeof(uv_lib_t));
    while (--argc) {
        fprintf(stderr, "Loading %s¥n", argv[argc]);
        if (uv_dlopen(argv[argc], lib)) {
            fprintf(stderr, "Error: %s¥n", uv_dlerror(lib));
            continue;
        }

        init_plugin_function init_plugin;
        if (uv_dlsym(lib, "initialize", (void **) &init_plugin)) {
            fprintf(stderr, "dlsym error: %s¥n", uv_dlerror(lib));
            continue;
        }

        init_plugin();
    }

    return 0;
}

uv_dlopen は共有ライブラリへのパスを要求し、隠蔽された(opaque) uv_lib_t ポインタに設定します。成功時には0を、エラー時には-1を返却します。エラーメッセージを取得するには uv_dlerror を使用します。

uv_dlsym は第二引数のシンボルへのポインタを第三引数に格納します。 init_plugin_function はアプリケーションのプラグインの中に探している関数へのポインタです。

TTY

テキスト端末は緩やかに標準化されたコマンドセットによる基本的なフォーマッティングを長い間サポートしています。このフォーマッティングは端末の出力の可読性を改善するためにたびたび用いられます。例えば grep --colour があります。libuvは uv_tty_t (ストリーム)による抽象化と、全てのプラットフォームを通じてANSIのエスケープコードを実装するための関数を提供しています。このことはlibuvはANSIのコードをWindowsにおける等価のものに変換し、端末の情報を取得するための関数を提供していることを意味します。

最初にすべきことはこれが読み書きすべきファイルディスクリプタとともに uv_tty_t を初期化することです。以下のようにします。

    int uv_tty_init(uv_loop_t*, uv_tty_t*, uv_file fd, int readable)

もし readable が falseの場合、このストリームにたいする uv_writeブロッキング されます。

uv_tty_set_mode を大部分のTTYフォーマット、フローコントロールや他の設定を可能にする normal (0) モードに設定するのが最良でしょう。 生の(raw)(1) モードもサポートされています。

端末の状態をリストアするためにプログラムの終了時に uv_tty_reset_mode を呼び出すことを忘れないでください。これが良いマナーというものです。他の良いマナーとしてはリダイレクトに対する注意があります。もしユーザがコマンドの出力をファイルにリダイレクトした場合、制御シーケンスは可読性や grep を損なうものであってはなりません。ファイルディスクリプタが本当にTTYかどうか確認するために、 uv_guess_handle をファイルディスクリプタとともに呼び出し、戻り値を UV_TTY と比較してください。

以下は赤の背景上に白いテキストで表示を行う単純なサンプルです:

tty/main.c

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>

uv_loop_t *loop;
uv_tty_t tty;
int main() {
    loop = uv_default_loop();

    uv_tty_init(loop, &tty, 1, 0);
    uv_tty_set_mode(&tty, 0);
    
    if (uv_guess_handle(1) == UV_TTY) {
        uv_write_t req;
        uv_buf_t buf;
        buf.base = "¥033[41;37m";
        buf.len = strlen(buf.base);
        uv_write(&req, (uv_stream_t*) &tty, &buf, 1, NULL);
    }

    uv_write_t req;
    uv_buf_t buf;
    buf.base = "Hello TTY¥n";
    buf.len = strlen(buf.base);
    uv_write(&req, (uv_stream_t*) &tty, &buf, 1, NULL);
    uv_tty_reset_mode();
    return uv_run(loop, UV_RUN_DEFAULT);
}

最後のTTYヘルパは uv_tty_get_winsize() で、端末の幅と高さを取得するために使われ、成功時に 0 を返却します。以下に関数と文字位置のエスケープコードを使用してアニメーションを処理する小さなプログラムを示します。

tty-gravity/main.c

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <uv.h>

uv_loop_t *loop;
uv_tty_t tty;
uv_timer_t tick;
uv_write_t write_req;
int width, height;
int pos = 0;
char *message = "  Hello TTY  ";

void update(uv_timer_t *req, int status) {
    char data[500];

    uv_buf_t buf;
    buf.base = data;
    buf.len = sprintf(data, "¥033[2J¥033[H¥033[%dB¥033[%luC¥033[42;37m%s",
                            pos,
                            (unsigned long) (width-strlen(message))/2,
                            message);
    uv_write(&write_req, (uv_stream_t*) &tty, &buf, 1, NULL);

    pos++;
    if (pos > height) {
        uv_tty_reset_mode();
        uv_timer_stop(&tick);
    }
}

int main() {
    loop = uv_default_loop();

    uv_tty_init(loop, &tty, 1, 0);
    uv_tty_set_mode(&tty, 0);
    
    if (uv_tty_get_winsize(&tty, &width, &height)) {
        fprintf(stderr, "Could not get TTY information¥n");
        uv_tty_reset_mode();
        return 1;
    }

    fprintf(stderr, "Width %d, height %d¥n", width, height);
    uv_timer_init(loop, &tick);
    uv_timer_start(&tick, update, 200, 200);
    return uv_run(loop, UV_RUN_DEFAULT);
}

エスケープコードは以下です。

コード 意味
2 J スクリーンの一部をクリアします。2はスクリーン全体を表します。
H カーソルを特定の位置に移動します。デフォルトは左上です。
n B カーソルをn行下に移動します。
n C カーソルをn列右に移動します。
m 表示設定の文字列に従います。この場合緑の背景(40+2)、白いテキスト(30+7)です。

これがよくフォーマットされた出力を生成するために有用であることがわかるでしょうし、コンソールベースのアーケードゲームでも想像力をくすぐられるでしょう。もっとよい制御を行うためにncursesを試すこともできます。

*1:この文脈におけるバトンを最初に目にしたのは、node.jsのバインディング記述に関するKonstantin Käfer'sの素晴らしいスライドの中でした。-- http://kkaefer.github.com/node-cpp-modules/#baton

*2:mfpは空想上のプラグインです。