# Wait for new proto messages that need to be sent out: _, pending_tasks = await asyncio.wait( ( asyncio.create_task(async_objs.must_stop.wait()), asyncio.create_task(async_objs.need_send_data.wait()), ), return_when=asyncio.FIRST_COMPLETED, ) # We need to cancel the pending tasks (the `must_stop` one in most situations). # Otherwise, this would stack up one waiting task per loop # (e.g. per forward message). These tasks cannot be garbage collected # causing an increase in memory (-> memory leak). for task in pending_tasks: task.cancel() Juliaにおけるタスク終了の待ち方 2024.08.31 JuliaTokyo #12 10
throw=true) = _wait_multiple(tasks, throw) function _wait_multiple(waiting_tasks, throwexc=false, all=false, failfast=false) tasks = Task[] for t in waiting_tasks t isa Task || error("Expected an iterator of `Task` object") push!(tasks, t) end # ... ここから先はタスクを待つコード ... end どうやって Task 以外の型に拡張するか? 今後の展開 2024.08.31 JuliaTokyo #12 24
_waitany(::Nothing, throw::Bool) = ([], []) _waitany(::Type{T}, seq, throw::Bool) where {T} = waitany([x::T for x in seq]; throw=throw) _waitany(::Type{Any}, seq, throw::Bool) = _waitany(peel(seq)..., throw) _waitany(fst::T, rest::R, throw::Bool) where {T, R} = waitany(collectwaitables(fst, rest); throw=throw) waitany(tasks::AbstractVector{Task}; throw::Bool=true) = _wait_multiple(tasks, throw) function collectwaitables(fst::T, rest) where {T} checkwaitable(fst) # fst の型が waitable type かチェックし、違う場合に例外を出す waitables = [fst] for x in rest x isa T || error("Expected an iterator of `$(T)` object") push!(waitables, x) end return waitables end 今後の展開 2024.08.31 JuliaTokyo #12 26