Arantium Maestum

プログラミング、囲碁、読書の話題

OCamlとLwtでマルチクライアントなエコーサーバを実装してみる

OCamlで非同期的プログラミングをするための著名なライブラリが二つある。Jane StreetのAsyncと`Ocsigenプロジェクト発のLwtだ。

個別の機能のネーミングは多少違うが、概念的には非常に似通っているようで、協力的マルチタスクによる非同期プログラミングをモナドAPIで提供している。

OCamlPythonなどと同じく、ガベージコレクションの関係でネイティブスレッドによる並列実行があまりうまくいかない(multicore OCaml早く来てくれ・・・)ので、協調的マルチタスクが主流になるのはある意味必然的。

Lwtの勉強のために、同時に複数の(ローカルな)クライアントが接続可能なエコーサーバを最低限に近い形で実装してみる。

以下のサイトを参考にしている:

www.baturin.org

今回のコード:

Minimal Lwt multi-client echo server based on https://www.baturin.org/code/lwt-counter-server/ · GitHub

コード解説

まずは頻出のモジュールをopenしておく:

open Lwt
open Lwt.Syntax

Lwt.SyntaxはLwtをOCamllet*モナド記法で扱えるようにするためのモジュール。Lwtに限らずモナドAPIを提供するOCamlライブラリではこういうSyntaxモジュールを用意するのが推奨されているようだ。

ここからは、プログラムの大枠から話すためにコードを下から上に辿っていく。

まずはmainに相当するlet () = ...部分:

let () =
  let open Lwt_unix in
  let sock = socket PF_INET SOCK_STREAM 0 in
  let sock_addr = ADDR_INET(Unix.inet_addr_loopback, 9000) in
  async (fun () -> bind sock sock_addr);
  listen sock 10;
  Lwt_main.run @@ server_loop sock

Unix関連のユーティリティが定義されているLwt_unixをローカルスコープで開いておいて、ソケットを作成、特定のアドレス(この場合はポート9000)に束縛、listenでアクセスし始める。ループしながら外部からの接続を受け入れるserver_loop関数にそのソケットを渡し、その関数が返すpromise的なスレッドをメインスレッド(終了するまでプログラムが走り続ける)としてLwt_main.runで実行する、という流れになっている。

server_loopの実装は以下の通り:

let rec server_loop sock =
  let* conn_fd, _ = Lwt_unix.accept sock in
  async (fun () -> accept_conn conn_fd);
  server_loop sock

ループなので再帰関数になっている。

第一行ではソケットから接続が入ってくるのを待つLwt_unix.accept sockが返すpromiseを、OCamlモナドlet*記法でファイルデスクリプタ+αに展開している。let*を使うことで、このpromiseが解決するまでこのスレッド内部の処理はブロックされ、以降の行は実行されない。このpromiseの解決を待っている間、他のスレッドが実行されることになる(もし他のスレッドが存在していれば)。

次の行ではpromiseが解決して受け取ったconnectionファイルデスクリプタを(後述する)accept_conn関数に渡す処理となっている。前の行と違い、async (fun () -> ...);という構文を使うことによってpromiseの解決を待たずに次の行の実行に移るようになっている。ignore @@ accept_conn conn_fd;でも似たような結果になるが、accept_conn conn_fdのpromiseがエラーを返したときにignoreだと文字通り無視されてしまうのに対してasync (fun () ...を使うとエラーがちゃんとログに出力される。

最後の行はループのための再帰

accept_conn関数:

let accept_conn conn_fd =
  let ic = Lwt_io.of_fd ~mode:Lwt_io.Input conn_fd in
  let oc = Lwt_io.of_fd ~mode:Lwt_io.Output conn_fd in
  request_response_loop ic oc

接続のファイルデスクリプタを受けとり、そのデスクリプタからLwt_io.of_fdを使って入力用と出力用チャンネルを作り、接続からの入力を非同期的ループで処理するrequest_response_loop関数に渡している。

request_response_loop関数:

let rec request_response_loop ic oc =
  let* msg = Lwt_io.read_line_opt ic in
  match msg with
  | Some text ->
      let* () = Lwt_io.write_line oc text in
      request_response_loop ic oc
  | None -> return ()

Lwt_io.read_line_opt icで非同期的に入力チャンネルからの文字列を受けとり(ただし入力チャンネルが閉じられた場合などはNone、文字列が受け取れた場合はSome ...となる)、それを受けとるまでスレッドをblockしながらmsgに束縛。

msgにパターンマッチして、Some textだった場合は出力チャンネルにtextを渡してループ。

let* () = ...とblockingにしているがnon-blockingでも良さそうな気もしている。non-blockingの方が次の入力を早く受けとって処理を開始できる。がそうすると同じクライアントからのメッセージを返す順番が保証されないかもしれない。同じ出力チャンネルに対するLwt_io.write_lineが非同期ではあっても順番を遵守するかどうかがポイントだが、まだ調べていない。

msgNoneの場合はreturn ()でユニットを非同期的に返す。

以上でコードは終わり。ネットワーク関連のそれなりに低レベルな処理をしながら28行に収まっている。

ビルドのためにduneスクリプトを同じディレクトリに入れておく:

(executable
  (name server)
  (libraries lwt.unix))

実行手順

dune exec ./server.exe

としてサーバを立ち上げてから、別のターミナルからtelnet localhost 9000とクライアントとしてポートにアクセスすればいい。複数のターミナルから同時に繋げて、好きな文字列を入力すると全く同じ文字列が返される。

$ telnet localhost 9000
...
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hi
hi
ho
ho
hum
hum

マルチクライアントからシングルクライアントへ

let rec server_loop sock =
  let* conn_fd, _ = Lwt_unix.accept sock in
  async (fun () -> accept_conn conn_fd);
  server_loop sock

let rec server_loop sock =
  let* conn_fd, _ = Lwt_unix.accept sock in
  let* () = accept_conn conn_fd in
  server_loop sock

と変えるとコネクションの処理がblockingとなりシーケンシャルに実行されるので、最初にアクセスしたクライアントが接続を閉じるまで他のクライアントは待たされるようになる。

establish_server_with_client_address

今回は勉強のために一から書いているが、実際にはLwt_ioestablish_server_with_client_socketestablish_server_with_client_addressという関数が用意されていて、特に後者は~no_close:trueというパラメータを渡しておけばserver_loopaccept_connとまったく同じことをしてくれるようだ。大抵の場合はこの関数で事足りるのではないだろうか。