Upgrade to Pro — share decks privately, control downloads, hide ads and more …

waitany と waitall を作った話

waitany と waitall を作った話

Julia で複数タスクを同時に待つための関数を実装した話。そして、今後の展開について。

Kenta Murata

August 31, 2024
Tweet

More Decks by Kenta Murata

Other Decks in Technology

Transcript

  1. Self introduction 名前 : 村田 賢太 所属 : 株式会社サイカ 言語

    : Julia, Ruby, Python, ほか多数 コミュニティ : JuliaLangJa, JuliaTokyo, ruby-jp, vim-jp GitHub: @mrkn Twitter: @KentaMurata 趣味 : カメラいじり、レンズいじり 2024.08.31 JuliaTokyo #12 3
  2. Julia のタスク タスクは Julia における非同期処理の基本 t = Task(func) とか @task

    マクロなどを使って作る t = @task begin # タスク内の処理をここにかく:w end 作ったタスクは schedule(t) で実行キューに追加される Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 5
  3. Julia におけるタスクの終了待ち タスク t の終了を待つ関数 : wait(t) タスクの終了は 2種類 :

    正常終了と異常終了 t が終了しているとき istaskdone(t) は true t が異常終了しているとき istaskfailed(t) も true タスクの結果取得 : fetch(t) 正常終了では fetch(t) でタスクの結果を取得できる 異常終了では fetch(t) は TaskFailedException を投げる t.exception でタスク内で発生したエラーを取得できる Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 6
  4. 複数タスクを待つ機能が必要な場合の例 (streamlit より引用 ) while not async_objs.must_stop.is_set(): if self._state ==

    RuntimeState.NO_SESSIONS_CONNECTED: # type: ignore[comparison-overlap] # mypy 1.4 incorrectly thinks this if-clause is unreachable, # because it thinks self._state must be INITIAL | ONE_OR_MORE_SESSIONS_CONNECTED. # Wait for new websocket connections (new sessions): _, pending_tasks = await asyncio.wait( # type: ignore[unreachable] ( asyncio.create_task(async_objs.must_stop.wait()), asyncio.create_task(async_objs.has_connection.wait()), ), return_when=asyncio.FIRST_COMPLETED, ) # Clean up pending tasks to avoid memory leaks for task in pending_tasks: task.cancel() else: # ... セッションが1つ以上ある場合の処理など ... # ... 次のページに続く ... Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 9
  5. 複数タスクを待つ機能が必要な場合の例のつづき (streamlit より引用 ) # ... 前のページの while の中の続き ...

    # Wait for new proto messages that need to be sent out: _, pending_tasks = await asyncio.wait( ( asyncio.create_task(async_objs.must_stop.wait()), asyncio.create_task(async_objs.need_send_data.wait()), ), return_when=asyncio.FIRST_COMPLETED, ) # We need to cancel the pending tasks (the `must_stop` one in most situations). # Otherwise, this would stack up one waiting task per loop # (e.g. per forward message). These tasks cannot be garbage collected # causing an increase in memory (-> memory leak). for task in pending_tasks: task.cancel() Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 10
  6. streamlit による複数のタスクを待つ機能の使い方 3つの非同期イベント must_stop : サーバを停止する必要があることを表すイベント has_connection : クライアントからのコネクションが来たことを表すイベント need_send_data

    : クライアントへ送るデータが発生したことを表すイベント 一つ目の asyncio.wait : 一つもセッションがない場合の処理 must_stop と has_connection のうち 1つ以上がセットされるまで待つ つまり、サーバを停止する必要があるか、クライアントからコネクションが来るまで待つ Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 11
  7. streamlit による複数のタスクを待つ機能の使い方 3つの非同期イベント must_stop : サーバを停止する必要があることを表すイベント has_connection : クライアントからのコネクションが来たことを表すイベント need_send_data

    : クライアントへ送るデータが発生したことを表すイベント 二つ目の asyncio.wait : メインループの最後に毎回実行する処理 must_stop と need_send_data のうち 1つ以上がセットされるまで待つ つまり、サーバを停止する必要があるか、クライアントに送るべきデータが発生するまで 待つ Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 12
  8. 複数タスクの待ち方は 3種類ある 3種類の待ち方 i. 指定したタスクのうち少なくとも 1つが終了するまで待つ ii. 指定した全てのタスクが終了するまで待つ iii. 指定した全てのタスクが終了するまで待つが、一つ以上のタスクが異常終了した場合はす

    ぐに待機を終える これらに加えて、 「すべてのタスクを待つが、例外が発生した場合は待機をすぐに終了する」と いうパターンも考えられる Pythonの asyncio.wait は 3つすべてに対応している Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 13
  9. waitany と waitall Julia で複数のタスクを待つための関数が必要になったので自分で作った 「少なくとも一つが終了するまで待つ」関数が waitany 「全てが終了するまで待つ」関数が waitall 機能的に

    Julia の組み込み機能として提供されているべきだと考えたので Pull Request を送っ たらマージしてもらえた Julia v1.12 から使えるようになる予定 waitany と waitall 2024.08.31 JuliaTokyo #12 15
  10. waitany の使い方 1. 基本形 waitany(tasks) -> (done_tasks, remaining_tasks) 少なくとも一つのタスクが終了するまで、現在のタスクの実行をブロックする 待機中のタスクが異常終了したら

    TaskFailedException を発生させる 戻り値は終了したタスクの配列と、まだ終了していないタスクの配列のタプル 2. throw キーワード引数で TaskFailedException を発生させるかどうか指定する waitany(tasks; throw=true) # 基本形と同じ waitany(tasks; throw=false) # 例外を発生しない waitany と waitall 2024.08.31 JuliaTokyo #12 16
  11. waitall の使い方 1. 基本形 waitall(tasks) -> (done_tasks, remainig_tasks) すべてのタスクが終了するまで待つ (戻り値は

    waitany と同じ ) 待機中のタスクが異常終了したらすぐに TaskFailedException を発生させる 2. 異常終了したタスクも含めてすべて終了するまで待つ waitall(tasks; failfast=false) 3. TaskFailedException を発生させない waitall(tasks; throw=false) waitany と waitall 2024.08.31 JuliaTokyo #12 17
  12. 引数 tasks の型 引数 tasks の型は特定していない Task が出てくるイテレータとして使えるものならなんでも使える AbstractVector{Task} だけ特別扱いすると配列のコピーを減らせるので修正したい

    可能なら「 Task が出てくるイテレータになるコンテナ型」を指定したいが、いまの Julia では そういう型指定はできない waitany と waitall 2024.08.31 JuliaTokyo #12 18
  13. なぜ waitany と waitall を分けたか? 「少なくとも 1つが終了するまで待つ」待ち方と「全てが終了するまで待つ」待ち方を引数で切 り替えるより、別の関数にする方が良いと考えたので分た そう考えた理由 関数名が違う方が可読性が高い

    (気がする、少なくとも私にとっては良い ) waitmultiple より名前が短くて良い wait のメソッドを増やすのは wait(::Any) を定義することになってしまう waitany と waitall 2024.08.31 JuliaTokyo #12 19
  14. いま取り組んでいること waitany と waitall を Task 以外の waitable types に対応させること

    Task 以外の waitable types とは Channel Event IO etc. まずは Channel にチャレンジしている 今後の展開 2024.08.31 JuliaTokyo #12 21
  15. Task 以外の waitable types に対応させるやり方 次の手順でどんどん一般化していきたい 1. Channel のみのコレクションに対応させる 2.

    IO のみのコレクションに対応させる 3. Task 、 Channel 、 IO が混在しているコレクションに対応させる 4. Channel と IO に対応できたら select 関数を作りたい 5. 他の waitable types に対応していく 今後の展開 2024.08.31 JuliaTokyo #12 22
  16. 第一引数の扱い方 現在の waitany は第一引数の型を特定せず、 iterate で出てくるものが Task であることを 決め打ちしている waitany(tasks;

    throw=true) = _wait_multiple(tasks, throw) function _wait_multiple(waiting_tasks, throwexc=false, all=false, failfast=false) tasks = Task[] for t in waiting_tasks t isa Task || error("Expected an iterator of `Task` object") push!(tasks, t) end # ... ここから先はタスクを待つコード ... end どうやって Task 以外の型に拡張するか? 今後の展開 2024.08.31 JuliaTokyo #12 24
  17. Task 以外の型への拡張 waitany(seq; throw::Bool=true) とりあえず、現時点では seq の要素はすべて同じ型であることを要請する eltype(seq) が Any

    以外の型なら、それが waitable type かどうかをチェックして wait_multiple を呼べば良さそう eltype(seq) が Any だったら、先頭要素の型 T が waitable type かどうかをチェックし、 残りの要素の型も T である事をチェックしながら Vector{T} に集めて wait_multiple を 呼べば良さそう 上のやり方は異なる watiable types を混ぜて同時に待ちたい場合には機能しないので、そのよ うな場合に対応させる際には別の方法を検討する必要がある 今後の展開 2024.08.31 JuliaTokyo #12 25
  18. この方針に従って waitany を分解する 異なる型が混在しない場合については、次のやり方で対応できそう。 waitany(seq; throw::Bool=true) = _waitany(eltype(seq), seq, throw)

    _waitany(::Nothing, throw::Bool) = ([], []) _waitany(::Type{T}, seq, throw::Bool) where {T} = waitany([x::T for x in seq]; throw=throw) _waitany(::Type{Any}, seq, throw::Bool) = _waitany(peel(seq)..., throw) _waitany(fst::T, rest::R, throw::Bool) where {T, R} = waitany(collectwaitables(fst, rest); throw=throw) waitany(tasks::AbstractVector{Task}; throw::Bool=true) = _wait_multiple(tasks, throw) function collectwaitables(fst::T, rest) where {T} checkwaitable(fst) # fst の型が waitable type かチェックし、違う場合に例外を出す waitables = [fst] for x in rest x isa T || error("Expected an iterator of `$(T)` object") push!(waitables, x) end return waitables end 今後の展開 2024.08.31 JuliaTokyo #12 26