OCamlとLwtでマルチクライアントなエコーサーバを実装してみる
OCamlで非同期的プログラミングをするための著名なライブラリが二つある。Jane StreetのAsyncと`Ocsigenプロジェクト発のLwtだ。
個別の機能のネーミングは多少違うが、概念的には非常に似通っているようで、協力的マルチタスクによる非同期プログラミングをモナドAPIで提供している。
OCamlはPythonなどと同じく、ガベージコレクションの関係でネイティブスレッドによる並列実行があまりうまくいかない(multicore OCaml早く来てくれ・・・)ので、協調的マルチタスクが主流になるのはある意味必然的。
Lwtの勉強のために、同時に複数の(ローカルな)クライアントが接続可能なエコーサーバを最低限に近い形で実装してみる。
以下のサイトを参考にしている:
今回のコード:
コード解説
まずは頻出のモジュールをopenしておく:
open Lwt open Lwt.Syntax
Lwt.Syntax
はLwtをOCamlのlet*
モナド記法で扱えるようにするためのモジュール。Lwtに限らずモナドAPIを提供するOCamlライブラリではこういうSyntax
モジュールを用意するのが推奨されているようだ。
ここからは、プログラムの大枠から話すためにコードを下から上に辿っていく。
まずはmain
に相当するlet () = ...
部分:
let () = let open Lwt_unix in let sock = socket PF_INET SOCK_STREAM 0 in let sock_addr = ADDR_INET(Unix.inet_addr_loopback, 9000) in async (fun () -> bind sock sock_addr); listen sock 10; Lwt_main.run @@ server_loop sock
Unix関連のユーティリティが定義されているLwt_unix
をローカルスコープで開いておいて、ソケットを作成、特定のアドレス(この場合はポート9000)に束縛、listen
でアクセスし始める。ループしながら外部からの接続を受け入れるserver_loop
関数にそのソケットを渡し、その関数が返すpromise的なスレッドをメインスレッド(終了するまでプログラムが走り続ける)としてLwt_main.run
で実行する、という流れになっている。
server_loop
の実装は以下の通り:
let rec server_loop sock = let* conn_fd, _ = Lwt_unix.accept sock in async (fun () -> accept_conn conn_fd); server_loop sock
ループなので再帰関数になっている。
第一行ではソケットから接続が入ってくるのを待つLwt_unix.accept sock
が返すpromiseを、OCamlのモナドlet*
記法でファイルデスクリプタ+αに展開している。let*
を使うことで、このpromiseが解決するまでこのスレッド内部の処理はブロックされ、以降の行は実行されない。このpromiseの解決を待っている間、他のスレッドが実行されることになる(もし他のスレッドが存在していれば)。
次の行ではpromiseが解決して受け取ったconnectionファイルデスクリプタを(後述する)accept_conn
関数に渡す処理となっている。前の行と違い、async (fun () -> ...);
という構文を使うことによってpromiseの解決を待たずに次の行の実行に移るようになっている。ignore @@ accept_conn conn_fd;
でも似たような結果になるが、accept_conn conn_fd
のpromiseがエラーを返したときにignore
だと文字通り無視されてしまうのに対してasync (fun () ...
を使うとエラーがちゃんとログに出力される。
最後の行はループのための再帰。
accept_conn
関数:
let accept_conn conn_fd = let ic = Lwt_io.of_fd ~mode:Lwt_io.Input conn_fd in let oc = Lwt_io.of_fd ~mode:Lwt_io.Output conn_fd in request_response_loop ic oc
接続のファイルデスクリプタを受けとり、そのデスクリプタからLwt_io.of_fd
を使って入力用と出力用チャンネルを作り、接続からの入力を非同期的ループで処理するrequest_response_loop
関数に渡している。
request_response_loop
関数:
let rec request_response_loop ic oc = let* msg = Lwt_io.read_line_opt ic in match msg with | Some text -> let* () = Lwt_io.write_line oc text in request_response_loop ic oc | None -> return ()
Lwt_io.read_line_opt ic
で非同期的に入力チャンネルからの文字列を受けとり(ただし入力チャンネルが閉じられた場合などはNone
、文字列が受け取れた場合はSome ...
となる)、それを受けとるまでスレッドをblockしながらmsg
に束縛。
msg
にパターンマッチして、Some text
だった場合は出力チャンネルにtext
を渡してループ。
let* () = ...
とblockingにしているがnon-blockingでも良さそうな気もしている。non-blockingの方が次の入力を早く受けとって処理を開始できる。がそうすると同じクライアントからのメッセージを返す順番が保証されないかもしれない。同じ出力チャンネルに対するLwt_io.write_line
が非同期ではあっても順番を遵守するかどうかがポイントだが、まだ調べていない。
msg
がNone
の場合はreturn ()
でユニットを非同期的に返す。
以上でコードは終わり。ネットワーク関連のそれなりに低レベルな処理をしながら28行に収まっている。
ビルドのためにduneスクリプトを同じディレクトリに入れておく:
(executable (name server) (libraries lwt.unix))
実行手順
dune exec ./server.exe
としてサーバを立ち上げてから、別のターミナルからtelnet localhost 9000
とクライアントとしてポートにアクセスすればいい。複数のターミナルから同時に繋げて、好きな文字列を入力すると全く同じ文字列が返される。
$ telnet localhost 9000 ... Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hi hi ho ho hum hum
マルチクライアントからシングルクライアントへ
let rec server_loop sock = let* conn_fd, _ = Lwt_unix.accept sock in async (fun () -> accept_conn conn_fd); server_loop sock
を
let rec server_loop sock = let* conn_fd, _ = Lwt_unix.accept sock in let* () = accept_conn conn_fd in server_loop sock
と変えるとコネクションの処理がblockingとなりシーケンシャルに実行されるので、最初にアクセスしたクライアントが接続を閉じるまで他のクライアントは待たされるようになる。
establish_server_with_client_address
今回は勉強のために一から書いているが、実際にはLwt_ioにestablish_server_with_client_socket
とestablish_server_with_client_address
という関数が用意されていて、特に後者は~no_close:true
というパラメータを渡しておけばserver_loop
とaccept_conn
とまったく同じことをしてくれるようだ。大抵の場合はこの関数で事足りるのではないだろうか。