EchoサーバーをI/O Multiplexingに育てる

グリーンスレッドの実装を考えていたらいつの間にかタイトルのようなことをやっていた。 実装した全部載せはリポジトリに置いてあるので、何を考えたかを吐き出す。

github.com

simple echo

echo サーバー( "hello" と受け取って "hello" と返す)のように、 クライアントからの読み込みと書き込み間でコンテキストが発生する通信のサーバーの実装を考える。 シンプルに単一な accept ループを使う場合、一つのクライアントとの読み書きを終えるまで次のクライアントとは何もできない。 read/write の処理ではブロックするし、同期的に全ての読み書きが処理される。 なので処理中に現れる新規のクライアントは listen で指定された backlog に積まれ、次の accept が来るのを待つことになる。

  • simple_echo.c
int main(int argc, char** argv) {
        soc = socket(AF_INET, SOCK_STREAM, 0) == -1;
        bind(soc, (struct sockaddr *)&saddr, saddrlen);
        listen(soc, CONNECTION);

        while(1) {
                acc = accept(soc, (struct sockaddr *)&caddr, &caddrlen);

                do {
                        read(acc, buf, sizeof(buf));
                        write(acc, buf, strlen(buf));
                } while (strcmp(buf, "Bye!!") != 0);

                read(acc, buf, sizeof(buf));

                close(acc);
        }

        close(soc);
        return 0;
}

asynchronous echo

クライアントは同時に複数捌けたほうが効率は良いので、 まずは同期的な処理を改善する。 accept は単体であれば非同期に呼び出しても問題はなく、読み書きも fd 単位で独立していれば非同期にできる。 非同期処理には thread(または process )を用いる。 この方法にも色々種類はあって、例えば accept 毎に thread(process)を作成する方法や、 予め thread(process)を作成しておいてそれぞれで accept ループを実行する方法がある。

thread か process かによっても違いは生じる。 thread は process に比べて軽量であるが、ヒープやファイルディスクリプタを共有しているため、 適切にロックを入れることによって実行順序やデッドロックを考慮する必要がある。 逆に process はまったく別の(実行単位としての)プロセスとしてプログラムが実行されるため、 上記のように注意する点は少ないが、例えば、fork を呼び出すまでに使用していた fd は fork 後に再度有効になるなど、多少は考慮する点がある。

実装は thread を accept 毎に立ち上げるもの、process を accept 毎に立ち上げるもの、 予め固定数の thread を立ち上げておくもの、予め固定数の process を立ち上げておくものを実装した(つまり全部)。 一応コネクション数には上限を設けている。

I/O Multiplexing echo

非同期的に処理を行うことによって複数のクライアントを同時に扱うことができるようになったが、 各 thread(process)別に見てみると、単一なクライアントとの処理を全て終えるまで他のクライアントに関する処理は行っていない。 加えて、read/write 処理中は fd が無効であれば待ちが発生するためブロックされている。 これを epoll を使用して改善を行う。

I/O Multiplexing な状態であるとは、同時に複数の fd に対して状態監視を行うことで、単一の fd の状態によって全体がブロックされるのを防いでいることである。 これを複数 thread(process)な状態で適用する方法は色々あるが、以下の方針を取ることにした。

  • 対象とするプログラムは pre_thread_echopre_fork_echo
  • accept ループは本体行う
  • 各 thread(process)は epoll_wait(2) で受け取った fd に対して一度きりの I/O 操作を行う

これによってどういう状態になるかというと、 各 thread(process)は複数のクライアントと同時にやり取りを行うようになる。 thread(process)別に見るとクライアントとのやり取りは同期的であるが、 read/write による個別のブロッキングを避けるため、待ちの状態が発生しなくなり、効率的に読み書きを行うことが可能になる。

補足

epoll は今回 EPOLLONESHOT を適用して epoll_wait で一度受け取った fd は無効化するようにしていたが、 accept と異なってこの処理は複数 thread(process)間で同期的に行う必要があった*1。 ここが今回のハマりポイントで、気がつくまで結構時間を取られた。 epoll_pre_thread_echo.c での実装では、結局 epoll_wait 時には lock を入れなければならなかった。 lock フリーな実装を行う方法として、read/write は結局一つの thread しか成功しないため、 ノンブロッキングな状態で read/write 実行し返り値を見る方法を考えたが、 例えば read する対象のサイズが大きいため realloc が必要になる場合、 複数回の read を発行する必要があり、別 thread での read 防ぐために lock が必要になるなど、本末転倒。 よって実装では epoll_wait に lock を入れている。

epoll_pre_fork_echo.c の実装は不可解な処理をしているが、それは複数 process 間で fd の受け渡しを行うために行っている。 メインプロセス中の accept の返り値でクライアントとの有効な fd を受け取るが、もちろん子プロセスでは有効なはずはない。 そのため、メインプロセスと子プロセス間で sendmsg/recvmsg を使ったやり取りを行った fd を受け渡す。 この辺は全然理解せずに使用したので、いつかまた仕組みを詳しく見たい。

ちなみに、epoll を用いたこれらの実装では、クライアントと echo なんてしていなくて、実際は固定の文字列を返しているだけである。 というのも、I/O Multiplexing な状態で echo のコンテキストスイッチが結構面倒で、 やればできるし実装はいいやということで投げている(epoll_event 構造体メンバのdata.ptrにポインタを入れるだけなので楽といえば楽)。

performance

今回は特に計測を目的としていないが、一応どれほど違いがあるのかについて計測は行った。 結果だけ述べると epoll_pre_fork_echo.c が最も速く、次点で epoll_pre_thread_echo.c という結果だった。 epoll を用いて I/O Multiplexing を行っている実装が高速なのは納得であるが、 実は epoll_pre_fork_echo.cepoll_pre_thread_echoc.c にも明確に差が出ている。 これは上記の lock の問題が効いているのかなぁと思う。真面目に実装したら変わるのだろうか。

ちなみに計測は golang の適当なスクリプトで行った。 https://github.com/s4ichi/io_multiplexing_echo_server/blob/master/client.go

reference

Rack::Timeout::RequestTimeoutException の仕組み

Ruby の webserver に使われるミドルウェアである Rack に、timeout を管理する拡張 gem の rack-timeout がある。 その仕事として例えば、レスポンスを返すまでに指定した秒数以上の時間がかかる場合、その秒数で Rack::Timeout::RequestTimeoutException を例外として発生させる。

github.com

タイムアウトすると聞いただけでは、処理を中断させて例外を raise しているんだろうと想像できる。 そのためタイムアウトした処理の場所や環境が取得できないように思えるが、その例外の backtrace にはきちんとタイムアウトした処理の行の backtrace が載っている。 どうせRubyなんだしできるんだろうなと思っていたけど、直感で方法が思いつかなかったのでコード読んでみた*1

rack-timeout/lib/rack/timeout/core.rb

timeout = RT::Scheduler::Timeout.new do |app_thread|
  register_state_change.call :timed_out
  app_thread.raise(RequestTimeoutException.new(env))
end

response = timeout.timeout(info.timeout) do
  begin  @app.call(env)
  rescue RequestTimeoutException => e
    raise RequestTimeoutError.new(env), e.message, e.backtrace
  ensure
    register_state_change.call :completed
  end
end

github

この辺読むだけで良かった、RT::Scheduler::Timeout.new 後にアプリケーションのスレッドで RequestTimeoutException を raise しているのがわかる。 Thread#raiseそのスレッドで例外を発生させる。 しかも backtrace はそのスレッドが処理をしていた行となるので完全に便利。 つまるところアプリケーションの本来の処理を行うスレッドと別のスレッドで秒数をカウントし、 秒数がオーバーしたらアプリケーションのスレッドに向けて例外を発生させるだけというシンプルなものだった。

ちなみにRT::Scheduler::Timeout は下のようなコードになっていて、Thread.currentでアプリケーションスレッドを参照し、 @scheduler で時間に関する処理をしつつ、タイムアウト時には @on_timeout.call を呼んでいる。@scheduler の中身に関しては本質的ではなかったので触れない。

rack-timeout/lib/rack/timeout/support/timeout.rb

ON_TIMEOUT = ->thr { thr.raise Error, "execution expired" }

def initialize(&on_timeout)
  @on_timeout = on_timeout || ON_TIMEOUT
  @scheduler  = Rack::Timeout::Scheduler.singleton
end

def timeout(secs, &block)
  return block.call if secs.nil? || secs.zero?
  thr = Thread.current
  job = @scheduler.run_in(secs) { @on_timeout.call thr }
  return block.call
ensure
  job.cancel! if job
end

github

ちなみに簡素な作りで良ければこういう感じに書ける。

class TimeoutError < StandardError; end

def call_with_timeout(time, &block)
  thr = Thread.current

  Thread.new do
    sleep(time)
    thr.raise(TimeoutError.new)
  end

  return block.call
end

begin
  call_with_timeout(2) do
    3.times do
      sleep(1)
      puts "Hello Thread!"
    end
  end
rescue TimeoutError => e
  puts "timeout!!"
end

実行結果

$ ruby call_with_timeout.rb
Hello Thread!
timeout!!

*1:この記事のコード例は github から引用し、不要な部分を削減しているため注意

Big bag of pagesで型情報を節約する

言語実装アドベントカレンダー20174日目の記事です。

言語実装、特に動的型付け言語の実装においては、実行時に値を扱う際、値本体の他に型などのメタ情報を持たせる必要がある。 静的に解析が可能な言語と違って実行時にしか解析ができないからだ。 しかし、言語の特性を考慮して実装をしないと非効率的なメモリの使い方をしてしまう場合がある。 その対応策として Big bag of pages というメモリの扱い方があるので、どういう部分で有用なのか紹介する。

例えば、C言語で純粋に実装をすると、ゲスト言語*1上で整数を解釈する場合

// guest language input: 35

typedef struct lang_value {
        union {
                int i;
                double d;
                struct fat_struct object;
        } val;
        enum lang_type type;
} lang_value;

lang_value* p = malloc(sizeof(lang_value));
p->val.i = 35;
p->type = Int;

と記述するのが最も簡易的であるし、様々なアーキテクチャ間での互換性を保つことができる。 しかし、この方法で例えば1 + 2 + 3 + ... といった整数値の確保を大量に行うのであれば、 unionによってintのサイズより大きく確保されている構造体を扱う必要があるし、 メタ情報も毎回確保しなければならないので非効率になる。

こういった実装を効率化する方法として、javascriptやmruby、CRubyのなどの処理系では、 ビットの未使用区間を再利用することで、ポインタそのものに型情報を付与するNaN boxingやTagged pointerの手法が取られたりしている。

Big bag of pages

Big bag of pages(以下、bibop)は、ビットそのものに工夫をするといった方法を取らず、ヒープ領域を独自に管理することによって、 値のメタ情報の提供を外部テーブルに任せる仕組みである。具体的には下図のように、ヒープに対して予め型ごとにページを決定しておき、 実行時は決められた領域の先頭アドレスを返すだけでメタ情報がメモリアクセスをすること無く引くことができる。

f:id:everysick:20171203181621j:plain 単純な構想では番地ごとにメタ情報が必要なように見えるが、図のようにパターンがある場合は型情報はビット演算で算出することも可能である。 また、こうして割り当てられている型ごとのページは、それぞれの型の純粋なサイズで確保されるため、unionを使った共通部分の不要なメモリを削減することもできる。

glibcなどの提供するメモリ管理機構におんぶ抱っこすることができないため、実装コストは大きくなるが、 メモリ使用量・メモリアクセス共に効率化を図ることができる。

サンプル実装

sbrk(2)を用いて非常に小さなサンプルを実装した。free_listなどの実装を含んでいないため、メタ情報に使用状況を載せている。 また、出力に用いた部分の実装などは本質ではないため省いている。必要であればリポジトリを参考*2

#define TYPE_NUM 3
#define MAX_PAGE 6

enum type {
    Integer,
    Float,
    Char
};

enum state {
    Free,
    Used
};

typedef struct page_info {
    type t;
    void* p;
    int state;
} page_info;

int main(int argc, char** argv) {
    int i, j;

    page_info pages[MAX_PAGE];

    void *start_address, *end_address;
    size_t total_size;
    size_t type_size[3] = {
        sizeof(int),
        sizeof(float),
        sizeof(char),
    };

    total_size = type_size[0] + type_size[1] + type_size[2];
    start_address = sbrk(0);

    // 型の要求サイズ * ページ分ヒープを拡張
    end_address = sbrk(total_size * MAX_PAGE);

    // ページの先頭アドレスとメタ情報を紐付ける
    for (i = 0; i < MAX_PAGE; i++) {
        size_t size_offset = 0;

        for (j = 0; j < (i % TYPE_NUM); j++) {
            size_offset += type_size[j];
        }

        pages[i].t = (type)(i % TYPE_NUM);
        pages[i].p = start_address + (total_size * (i / TYPE_NUM)) + size_offset;
        pages[i].state = Free;
    }

    // 全ページ出力
    print_page_info(pages);

    // ページの4つ目(int)に対して値を割り当てる
    int* same_value = (int*)pages[3].p;
    pages[3].state = Used;
    *same_value = 100;

    // 全ページ出力
    print_page_info(pages);

    return 0;
}

出力はこんな感じ

start_address: 0x1ef4000
end_address: 0x1ef4000
page[0]:
    pointer: 0x1ef4000
    type is Integer
    State: free
page[1]:
    pointer: 0x1ef4004
    type is Float
    State: free
page[2]:
    pointer: 0x1ef4008
    type is Char
    State: free
page[3]:
    pointer: 0x1ef4009
    type is Integer
    State: free
page[4]:
    pointer: 0x1ef400d
    type is Float
    State: free
page[5]:
    pointer: 0x1ef4011
    type is Char
    State: free


Try allocate integer value 100 to pages[3]
page[0]:
    pointer: 0x1ef4000
    type is Integer
    State: free
page[1]:
    pointer: 0x1ef4004
    type is Float
    State: free
page[2]:
    pointer: 0x1ef4008
    type is Char
    State: free
page[3]:
    pointer: 0x1ef4009
    type is Integer
    Value: 100
page[4]:
    pointer: 0x1ef400d
    type is Float
    State: free
page[5]:
    pointer: 0x1ef4011
    type is Char
    State: free

page[3]に意図した通りの値が載ってる。よかったね。

*1:実装する言語をホスト、実装される言語をゲストと呼び分ける

*2:GitHub - s4ichi/big_bag_of_pages: sample implementation of big bag of pages