Arantium Maestum

プログラミング、囲碁、読書の話題

OCaml Effects Tutorialの演習「Async/await」をやってみる

OCaml 5.0にeffect handlerが入った理由の一つである、並行処理をdirect styleで記述できるasync/awaitの実装をやってみる。

この演習:

github.com

演習用のコード・サンプルはこれ:

github.com

Schedulerのインタフェース

この演習ではasync/awaitといった並行プログラミング機能をSchedulerモジュールに実装することになる。

SchedulerのシグニチャSCHEDULERは以下の通り:

module type SCHEDULER = sig
  type 'a promise
  (** Type of promises *)

  val async : (unit -> 'a) -> 'a promise
  (** [async f] runs [f] concurrently *)

  val await : 'a promise -> 'a
  (** [await p] returns the result of the promise. *)

  val yield : unit -> unit
  (** yields control to another task *)

  val run   : (unit -> 'a) -> unit
  (** Runs the scheduler *)
end

Schedulerのインタフェースとして:

  • 並行的に実行されているタスクが最終的に結果を返す「値」を表すpromiseという多相な型
  • thunkされている処理を引数にとり、それを並行的に実行する(そしてすぐにそのタスクの結果のpromiseを返す)async関数
  • promiseに対して「その結果が出るまで一時停止して待つ」ことで値を受け取れるawait関数
  • 並行的に実行されているタスクが一旦停止してコントロールをスケジューラに返すyield関数
  • async/awaitを使っているthunk関数を渡してスケジューラを実行するrun関数

が提供されている。

使用例

使い方としては以下のようになる:

let main () =
  let module S = Scheduler in
  let task name () =
    Printf.printf "starting %s\n%!" name;
    let v = Random.int 100 in
    Printf.printf "yielding %s\n%!" name;
    S.yield ();
    Printf.printf "ending %s with %d\n%!" name v;
    v
  in
  let pa = S.async (task "a") in
  let pb = S.async (task "b") in
  let pc = S.async (fun () -> S.await pa + S.await pb) in
  Printf.printf "Sum is %d\n" (S.await pc);
  assert (S.await pa + S.await pb = S.await pc)

let () = Scheduler.run main

Schedulerの中身

まずSchedulerモジュールはSCHEDULERシグニチャを持ち、Effect.Deepを短いローカル名Dとして使っている:

module Scheduler : SCHEDULER = struct
  module D = Effect.Deep

  ...

end

promiseの定義:

  type 'a _promise =
  | Waiting of ('a,unit) D.continuation list
  | Done of 'a

  type 'a promise = 'a _promise ref

まずイミュータブルな'a _promise型を定義し、それのrefとして'a promiseを定義している。

'a _promiseはWaitingとDoneというバリアントを持つ。Waitingにはこのpromiseの結果を待って一時停止している処理の限定継続がリストとして保持されている。promiseが解決したらこれら限定継続に値を渡して逐次スケジュールしていく必要がある。Doneは単にそのpromiseが解決した結果の値を保持している。

'a promiseがrefである理由は「タスクがそのpromiseに対してawaitする」「promiseの処理が終わって結果が出る」などの状況で内容が動的に変わる必要があるからだ。

async, yield, awaitの定義:

  type _ Effect.t +=
  | Async : (unit -> 'a) -> 'a promise Effect.t
  | Yield : unit Effect.t
  | Await : 'a promise -> 'a Effect.t

  let async f = Effect.perform (Async f)
  let yield () = Effect.perform Yield
  let await p = Effect.perform (Await p)

これらは全部エフェクトとして定義されている。SCHEDULERで定義されている各関数の型とエフェクトのGADTがしっかり合致している。

スケジューラのキュー:

  let q = Queue.create ()
  let enqueue t = Queue.push t q
  let dequeue () =
    if Queue.is_empty q then ()
    else Queue.pop q ()

破壊的変更を加える非関数型なQueueを使っていて、モジュールの保持する値として一つの可変なQueueが作成される。このQueueに対してenqueueで新しいタスクを載せる(タスクはunit -> unitな関数)。dequeueでQueueからタスクを取り出し実行する。タスクが残っていない場合はそのまま()を返す。

エフェクトハンドラを作成するmake_handler関数:

  let rec make_handler : 'a. 'a promise -> ('a, unit) D.handler = fun promise ->
    { D.retc = ...;
      D.exnc = ...;
      D.effc = ...}

handlerをそのまま定義するのではなく何らかのpromiseに対してparametrizeしている。これはasync関数でpromiseを作る際そのpromiseをhandlerに紐づける必要が生じるからだ。またhandlerの中で再帰的にhandlerを作ってmatch_withしたりするので再帰関数になっており、さらに多相である必要があるので明示的な型注釈がついている。

Effect.Deepのドキュメントをみればわかるが、type ('a, 'b) handlerの'a型はhandlerをかけて実行する関数の返り値の型、'bはhandlerのretcでその'a型の返り値を受けてmatch_with f handlerが最終的に返す値の型だ。

なのでmatch_with f x (make_handler promise)という式があってxがa型、promiseがb promise型だった場合、fはa -> b型、make_handler promise(b, unit) handler型でmatch_with f x (make_handler promise)全体はunit型となる。

それではハンドラ内の処理を見ていく。

まずretc:

      D.retc = (fun v ->
          match !promise with
          | Done _ -> failwith "Trying to close a closed promise"
          | Waiting ks ->
                let enqueue_continue k = enqueue (fun () -> D.continue k v) in
                List.iter enqueue_continue ks;
                promise := Done v;
                dequeue ()
          );

retcはmatch_with f x (make_handler promise)f xという関数適用が正常に終わり値を返してきたケースだ。Schedulerモジュールで定義されたhandlerが付いた状態でmatch_withしたということは、特定のpromiseが紐づいている(make_handlerの引数として渡されたpromiseがretcの関数のクロージャに捕捉されている)。f xという処理が正常に終了して値が返ってきた時点で、その値がpromiseの結果の値となる。行う処理は三つ:

  1. このpromiseに対してawaitしている処理すべてに、結果の値を渡して再実行するタスクをスケジュールする
  2. promiseの状態をDoneにする
  3. スケジューラにコントロールを渡して次のタスクを走らせる

これらがmatch !promiseWaiting ksケースで行われている。ちなみにpromiseに紐づくタスクが終了して結果を返すのは最大一回のはずなので、retcが走る時点でpromiseがDoneになっているというのは起きえない(起きたらバグ)。

exncのケース:

      D.exnc = raise;

これは簡単でただ単にmatch_with実行中に上がってきたハンドルされていないエラーは上に投げ直す。

effcに関してはAsync、Yield、Awaitの三種のエフェクトに対してのパターンマッチが書かれている。

まずAsyncエフェクト:

      D.effc = (fun (type b) (eff: b Effect.t) ->
          match eff with
          | Async f -> Some (fun (k: (b,_) D.continuation) ->
                  let promise' = ref (Waiting []) in
                  enqueue (fun () -> D.continue k promise');
                  D.match_with f () (make_handler promise')
          )

x = async fでfが非同期的に実行され、最終的にf ()の結果の値に解決するpromiseに変数xが束縛される。

処理の流れとしては以下の通り:

  1. 新しいpromiseを作成
  2. 「そのpromiseを現在のAsyncエフェクトが発生した地点の限定継続に渡す」という処理をタスクとしてスケジュールする
  3. そのpromiseを使って作ったhandlerをかけてasyncで渡されたthunk関数fを実行する

promiseが限定継続に渡されると、そのpromiseに対してその後awaitする処理が出てきたりする。そしてthunk関数fの処理が正常に終了すると、本記事の上の方で見たretcの部分によって、そのpromiseに登録されているawait中の限定継続に値が渡されていくわけだ。

Asyncエフェクトを処理する際に「async発生元の処理」と「asyncで走らせる処理」の二つが存在していて、上記のコードでは前者をスケジュールして後者をそのまま実行していくようになっている。逆にしてもいいし、何ならどちらもスケジュールして、スケジューラにコントロールを渡してもいい。どうするのが一般的なのかは気になる・・・。

Yieldエフェクト:

          | Yield -> Some (fun k ->
                  enqueue (D.continue k);
                  dequeue ()
          )

yield ()すると自身(現在走っている処理)を一時停止してスケジューラにコントロールを返すことになる。実際の処理としては、「限定継続を再度実行する」タスクをスケジューラに加えて、スケジューラの次のタスクを実行する、というものになる。

Awaitエフェクト:

          | Await promise' -> Some (fun (k: (b,_) D.continuation) ->
            match !promise' with
            | Done v -> D.continue k v
            | Waiting ks ->
                  promise' := Waiting(k::ks);
                  dequeue ()
          )

あるpromiseに対してawaitする場合、もしpromiseが解決済みならその結果の値をすぐに受け取る。もしまだpromiseに紐づいた処理が実行中ならpromiseが解決するまで停止し(スケジューラにコントロールを受け渡して)promiseが解決した時点で結果を受け取り処理再開。

実際の処理としてはAwaitエフェクトに保持されているpromiseに対してパターンマッチ(make_handlerの引数として渡されたpromiseではないことに注意)。Doneならその値を即座に限定継続に渡して続行。Waitingならその限定継続のリストに自身も加えdequeue ()でスケジューラの次のタスクを実行。

Async、Yield、Await以外のエフェクトはハンドルせずにより上位のハンドラに任せる:

          | _ -> None

最後にスケジューラを実際に走らせるrun関数:

  let run main =
    let handler = make_handler (ref (Waiting [])) in
    D.match_with main () handler
end

runに渡されるthunk関数mainもスケジューラで実行する必要があるので、この関数に紐づく空のpromiseを作成してmake_handlerしている。このコードの性質上、このpromiseはawaitされないのでmain関数が正常に終わったらその結果は何にも使われないのだが、ハンドラをつける都合上promiseが必要になる。

run関数の処理は「空のまま使われない新しいpromiseをmake_handlerに渡して作ったhandlerをかけてmain関数をmatch_withで実行する」というもの。

雑考

モナドや特殊構文に頼ることなくdirect styleでasync/awaitなコードを書く機能を、ライブラリ的に作ることができたというのは非常に面白い。このアプローチだとasync/awaitで起こりがちな「colored function」問題、つまりほぼ同じ処理の通常版とasync/await版をいろいろ用意しないといけない状態にならないのもポイント。async/awaitを使う関数を普通の高階関数に渡しても問題なく処理できる。実際にこのアプローチを精緻化して実用的なコードとして提供しているのがEioライブラリだ。このライブラリは近いうちに試してみたい。

未解決のpromiseの中身がそのpromiseに対してawait中の処理の限定継続の束だというのは面白い。promise内部のデータとしては、そのpromiseに紐づいた処理へのリンクが何らなく、handlerのretcのクロージャにpromiseが捕捉されていることから紐づく、というのは非直感的な気もする(悪いと言っているわけではない)。

ミュータブルなQueueがモジュールレベルで単一になっているのが気になる。Schedulerが複数回(あるいは別のスレッドで同時に)使われる場合同じQueueが利用されるのはあまり好ましくない。Queueの作成をrun関数の中で行い、make_handlerの引数の一つとして渡してしまうようなデザインの方が良さそうに思う。

元の演習コードだとrun関数の内部関数としてforkという再帰関数が出てくる(上にhandlerがこの関数の中で定義される)のだが、個人的にはこのforkは消してmatch_withを剥き出しにし、handlerの再帰性とpromiseへの依存を明示した方がわかりやすいように感じたのでリファクタしてある。