Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Asyncio Evolved: Enhanced Exception Handling wi...

Junya Fukuda
September 04, 2023

Asyncio Evolved: Enhanced Exception Handling with TaskGroup in Python 3.11(PyConTW 2023)

In Python 3.11, asyncio.TaskGroup() and asyncio timeout context managers were added. Additionally, get_event_loop() was deprecated in 3.10, and loop objects were further obscured, making the already less intimidating asyncio even less scary.

I will discuss the basic usage of asyncio in Python 3.11, which has become simpler and more flexible, focusing on exception and cancellation handling.

I hope that this will be helpful for those who choose asyncio when writing web applications using ASGI frameworks or Python scripts where asynchronous I/O is useful.

Session: https://tw.pycon.org/2023/en-us/conference/talk/280
Twitter: https://twitter.com/JunyaFff

Junya Fukuda

September 04, 2023
Tweet

More Decks by Junya Fukuda

Other Decks in Technology

Transcript

  1. Who am I? 👤 •Junya Fukudaʢ@JunyaFffʣcalled “Jun” not “Junior” •Photos

    📷 Share 🐦 👍 •Software Engineer at Groove X, inc. Here is today's code ෱ా൏໵
  2. Who am I? 👤 •book •Python Practical Recipes Here is

    today's code •Junya Fukudaʢ@JunyaFffʣcalled “Jun” not “Junior” •Photos 📷 Share 🐦 👍 •Software Engineer at Groove X, inc. ෱ా൏໵
  3. Who am I? 👤 •book •Python Practical Recipes •Expert Python

    Programming - Fourth Edition •Written by Tarek Ziadé, Michał Jaworski Here is today's code •Junya Fukudaʢ@JunyaFffʣcalled “Jun” not “Junior” •Photos 📷 Share 🐦 👍 •Software Engineer at Groove X, inc. ෱ా൏໵
  4. Who am I? 👤 •book •Python Practical Recipes •Expert Python

    Programming - Fourth Edition •Written by Tarek Ziadé, Michał Jaworski Here is today's code •Junya Fukudaʢ@JunyaFffʣcalled “Jun” not “Junior” •Photos 📷 Share 🐦 👍 •Software Engineer at Groove X, inc. ෱ా൏໵
  5. Today's Goal •Focus on exception handling and cancellation. •Introducing the

    new API asyncio.TaskGroup. •Writing asyncio code in a "reliable" and "safe" way •The new standard "Hello-ish world"
  6. async def main(): print(f"{time.ctime()} Hello!") await asyncio.sleep(1.0) print(f"{time.ctime()} GoodBye!") loop

    = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task) pending = asyncio.all_tasks(loop=loop) for task in pending: task.cancel() group = asyncio.gather(*pending, return_exceptions=True) loop.run_until_complete(group) loop.close()
  7. Motivation (why are we talking about this?) •I would like

    to revamp this code with asyncio.TaskGroup
  8. Scope of Today's Talk •Talk. •Exception handling in asyncio (also

    new except* syntax) •Cancel Task •asyncio's New Hello World •Don't talk. •Speci fi c examples in the application •What's happening at ASGI Here is today's code
  9. Overview of asyncio •asyncio: Python's async library. •Continues to evolve.

    •Handling multiple tasks concurrently. •Useful for I/O-bound tasks like database or HTTP requests.
  10. Why Not Using Asyncio? Reasons to Consider •Synchronous processing, which

    we're used to, is suf fi cient. •Primarily CPU-bound tasks: •Asynchronous processing seems like it might behave unpredictably. •We might choose to write in another language Mainly focusing on machine learning and data analysis with minimal IO (such as Go, JavaScript (nodejs/deno..), Rust, C#, Swift, etc.)
  11. Why Not Using Asyncio? Reasons to Consider •Synchronous processing, which

    we're used to, is suf fi cient. •Primarily CPU-bound tasks: •Asynchronous processing seems like it might behave unpredictably. •We might choose to write in another language Mainly focusing on machine learning and data analysis with minimal IO (such as Go, JavaScript (nodejs/deno..), Rust, C#, Swift, etc.)
  12. Why Not Using Asyncio? Reasons to Consider •Synchronous processing, which

    we're used to, is suf fi cient. •Primarily CPU-bound tasks: •Asynchronous processing seems like it might behave unpredictably. •We might choose to write in another language Mainly focusing on machine learning and data analysis with minimal IO (such as Go, JavaScript (nodejs/deno..), Rust, C#, Swift, etc.)
  13. Why Not Using Asyncio? Reasons to Consider •Synchronous processing, which

    we're used to, is suf fi cient. •Primarily CPU-bound tasks: •Asynchronous processing seems like it might behave unpredictably. •We might choose to write in another language Mainly focusing on machine learning and data analysis with minimal IO (such as Go, JavaScript (nodejs/deno..), Rust, C#, Swift, etc.) Ultimately, it depends on the requirements. If it's for the web, it's mostly about I/O. It might be a good idea to create it with asyncio.
  14. Why Not Using Asyncio? Reasons to Consider •Synchronous processing, which

    we're used to, is suf fi cient. •Primarily CPU-bound tasks: •Asynchronous processing seems like it might behave unpredictably. •We might choose to write in another language Mainly focusing on machine learning and data analysis with minimal IO (such as Go, JavaScript (nodejs/deno..), Rust, C#, Swift, etc.)
  15. Have you ever mentioned or been asked this during a

    review? "What happens if an exception occurs in this process?" ʢ´-`ʣ.ŇoO
  16. Are you using asyncio? •Asynchronous processing seems to have unpredictable

    behavior. •that's true •Seems like it might behave unpredictably
  17. Are you using asyncio? •Asynchronous processing seems to have unpredictable

    behavior. •that's true •Seems like it might behave unpredictably •When an exception occurs, it's unclear what happens •“What happens to this concurrently running task if an exception occursʁ” •During an exception, what on earth happens to the other tasks...
  18. Are you using asyncio? •Asynchronous processing seems to have unpredictable

    behavior. •that's true •Seems like it might behave unpredictably •When an exception occurs, it's unclear what happens •“What happens to this concurrently running task if an exception occursʁ” •During an exception, what on earth happens to the other tasks... •A new feature that solves these issues
  19. •Let's start with the of fi cial documentation •The basic

    feature of asyncio.TaskGroup •multiple tasks are handled concurrently. •Similar functionality was available even before Python 3.10. •asyncio.gather() and asyncio.wait() asycio.TaskGroup
  20. asycio.TaskGroup import asyncio async def main(): async with asyncio.TaskGroup() as

    tg: task1 = tg.create_task(some_coro(...)) task2 = tg.create_task(another_coro(...)) print("Both tasks have completed now.")
  21. •Handle multiple tasks at once Capturing concurrent exceptions •Naturally, exceptions

    may occur at the same time •Let's see how to capture it in Python 3.10 or earlier •Using the high-level API asyncio.gather() added in Python 3.7 •Options for exceptions to asyncio.gather()ʮreturn_exceptionsʯ •return_exceptions False •return_exceptions True •Simultaneous execution of speci fi ed tasks (asynchronous functions)
  22. Capturing concurrent exceptions - Python 3.10 or earlier •Options for

    exceptions to asyncio.gather() ”return_exceptions” •return_exceptions False •return_exceptions True
  23. async def main(): “""return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions
  24. async def main(): “""return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions
  25. async def main(): “""return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions
  26. $ python310 1_asyncio_gather_except.py async def main(): “""return_exceptions False""" try: results

    = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions
  27. $ python310 1_asyncio_gather_except.py err=ValueError() $ python310 async def main(): “""return_exceptions

    False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions
  28. $ python310 1_asyncio_gather_except.py err=ValueError() $ python310 async def main(): “""return_exceptions

    False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions ValueError only TypeError cannot be caught
  29. $ python310 1_asyncio_gather_except.py err=ValueError() $ python310 ValueError only TypeError cannot

    be caught Only the fi rst exception is captured The rest are suppressed. async def main(): “""return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}") asyncio.run(main()) asyncio.gather - return_exceptions False Capturing Concurrent Exceptions
  30. Capturing concurrent exceptions - Python 3.10 or earlier •Options for

    exceptions to asyncio.gather() ”return_exceptions” •return_exceptions False •return_exceptions True
  31. Capturing concurrent exceptions - Python 3.10 or earlier •Options for

    exceptions to asyncio.gather() ”return_exceptions” •return_exceptions False •return_exceptions True return_exceptions If speci fi ed Get a list of results after all tasks have been executed Need to check the return list.
  32. async def main(): """return_exceptions=True""" results = await asyncio.gather( coro_success(), coro_value_err(),

    coro_type_err(), return_exceptions=True) print(f"{results=}") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  33. async def main(): """return_exceptions=True""" results = await asyncio.gather( coro_success(), coro_value_err(),

    coro_type_err(), return_exceptions=True) print(f"{results=}") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  34. async def main(): """return_exceptions=True""" results = await asyncio.gather( coro_success(), coro_value_err(),

    coro_type_err(), return_exceptions=True) print(f"{results=}") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  35. $ python310 2_asyncio_gather_except_true.py async def main(): """return_exceptions=True""" results = await

    asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  36. $ python310 2_asyncio_gather_except_true.py async def main(): """return_exceptions=True""" results = await

    asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") $ python310 asyncio.gather - return_exceptions True Capturing Concurrent Exceptions results=['success', ValueError(), TypeError()]
  37. $ python310 2_asyncio_gather_except_true.py async def main(): """return_exceptions=True""" results = await

    asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") results=['success', ValueError(), TypeError()] $ python310 Get all results in a list Need to see what exceptions were raised asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  38. for result in results: match result: case ValueError(): print("ValueError") case

    TypeError(): print("TypeError") async def main(): """return_exceptions=True""" results = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  39. for result in results: match result: case ValueError(): print("ValueError") case

    TypeError(): print("TypeError") async def main(): """return_exceptions=True""" results = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  40. $ python310 2_asyncio_gather_except_true.py async def main(): """return_exceptions=True""" results = await

    asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") for result in results: match result: case ValueError(): print("ValueError") case TypeError(): print("TypeError") asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  41. $ python310 2_asyncio_gather_except_true.py async def main(): """return_exceptions=True""" results = await

    asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") results=[‘DONE', ValueError(), TypeError()] ValueError() for result in results: match result: case ValueError(): print("ValueError") case TypeError(): print("TypeError") TypeError() asyncio.gather - return_exceptions True Capturing Concurrent Exceptions
  42. asyncio.gather - return_exceptions True $ python310 2_asyncio_gather_except_true.py async def main():

    """return_exceptions=True""" results = await asyncio.gather( coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True) print(f"{results=}") results=[‘DONE', ValueError(), TypeError()] ValueError() for result in results: match result: case ValueError(): print("ValueError") case TypeError(): print("TypeError") TypeError() Need to check results All tasks need to be performed Capturing Concurrent Exceptions
  43. Capturing Concurrent Exceptions Python 3.10 and earlier issues •Unable to

    capture except for the fi rst exception. •Cannot log all exceptions that occur •return_exceptions False
  44. Capturing Concurrent Exceptions Python 3.10 and earlier issues •Unable to

    capture except for the fi rst exception. •Cannot log all exceptions that occur •Need to check the return list. •All tasks must be performed •return_exceptions False •return_exceptions True
  45. Capturing Concurrent Exceptions Python 3.10 and earlier issues The solution

    is asyncio.TaskGroup and the new "except*" syntax
  46. async def main(): async with asyncio.TaskGroup() as tg: task1 =

    tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] Capturing Exceptions in asycio.TaskGroup
  47. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] Capturing Exceptions in asycio.TaskGroup
  48. async def main(): async with asyncio.TaskGroup() as tg: task1 =

    tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") try: Capturing Exceptions in asycio.TaskGroup
  49. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") Capturing Exceptions in asycio.TaskGroup
  50. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") Using except* Capturing Exceptions in asycio.TaskGroup
  51. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") Capturing Exceptions in asycio.TaskGroup
  52. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") $ python311 3_asyncio_taskgroup_except.py Capturing Exceptions in asycio.TaskGroup
  53. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") $ python311 3_asyncio_taskgroup_except.py err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) err=ExceptionGroup('unhandled errors in a TaskGroup', [TypeError()]) $ python311 Capturing Exceptions in asycio.TaskGroup
  54. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") $ python311 3_asyncio_taskgroup_except.py err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) err=ExceptionGroup('unhandled errors in a TaskGroup', [TypeError()]) $ python311 Two Exception are captured. Capturing Exceptions in asycio.TaskGroup
  55. try: async def main(): async with asyncio.TaskGroup() as tg: task1

    = tg.create_task(coro_success()) task2 = tg.create_task(coro_value_err()) task3 = tg.create_task(coro_type_err()) results = [task1.result(), task2.result(), task3.result()] except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}") Capturing Exceptions in asycio.TaskGroup
  56. except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f"{err=}")

    except* except* Capturing Exceptions in asycio.TaskGroup
  57. Capturing Exceptions in asycio.TaskGroup except* ValueError as err: print(f"{err=}") except*

    TypeError as err: print(f"{err=}") except* except* err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) err=ExceptionGroup('unhandled errors in a TaskGroup', [TypeError()])
  58. new syntax except* except* •PEP654 Motivation 5 cases •Concurrent errors

    Born for what? •There is no good way to handle exceptions sent by multiple tasks. •asyncio.gather()’s issues
  59. asycio.TaskGroup IUUQTHJUIVCDPNQZUIPODQZUIPOCMPCNBJO-JCBTZODJPUBTLHSPVQTQZ class TaskGroup: ... async def __aexit__(self, et, exc,

    tb): ... if self._errors: # Exceptions are heavy objects that can have object # cycles (bad for GC); let's not keep a reference to # a bunch of them. errors = self._errors self._errors = None me = BaseExceptionGroup('unhandled errors in a TaskGroup', errors) raise me from None ExceptionGroup
  60. class TaskGroup: ... async def __aexit__(self, et, exc, tb): ...

    if self._errors: # Exceptions are heavy objects that can have object # cycles (bad for GC); let's not keep a reference to # a bunch of them. errors = self._errors self._errors = None me = BaseExceptionGroup('unhandled errors in a TaskGroup', errors) raise me from None IUUQTHJUIVCDPNQZUIPODQZUIPOCMPCNBJO-JCBTZODJPUBTLHSPVQTQZ asycio.TaskGroup ExceptionGroup
  61. IUUQTHJUIVCDPNQZUIPODQZUIPOCMPCNBJO-JCBTZODJPUBTLHSPVQTQZ class TaskGroup: ... async def __aexit__(self, et, exc, tb):

    ... if self._errors: # Exceptions are heavy objects that can have object # cycles (bad for GC); let's not keep a reference to # a bunch of them. errors = self._errors self._errors = None me = BaseExceptionGroup('unhandled errors in a TaskGroup', errors) raise me from None asycio.TaskGroup ExceptionGroup
  62. •Exception Groups and except: Irit Katriel at PyCon UK •https://www.youtube.com/watch?v=uARIj9eAZcQ

    •The 2021 Python Language Summit: PEP 654 — Exception Groups and except* •https://pyfound.blogspot.com/2021/05/the-2021-python-language-summit-pep-654.html • How Exception Groups Will Improve Error Handling in AsyncIO - Łukasz Langa | Power IT Conference •https://www.youtube.com/watch?v=Lfe2zsGS0Js reference
  63. Cancel task when exception occurs •Concurrent task behavior during exceptionsʁ

    •Tasks may continue running. •Example: asyncio.gather()
  64. Cases where tasks remain - asyncio.gather - return_exceptions false async

    def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  65. Cases where tasks remain - asyncio.gather - return_exceptions false async

    def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  66. async def main(): """return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main()) Cases where tasks remain - asyncio.gather - return_exceptions false
  67. Cases where tasks remain - asyncio.gather - return_exceptions false async

    def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  68. $ python310 4_remaining_tasks.py Cases where tasks remain - asyncio.gather -

    return_exceptions false async def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  69. $ python310 4_remaining_tasks.py Cases where tasks remain - asyncio.gather -

    return_exceptions false async def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main()) err=ValueError() DONE
  70. $ python310 4_remaining_tasks.py err=ValueError() DONE coro_long: Task is still running.

    Cases where tasks remain - asyncio.gather - return_exceptions false async def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  71. $ python310 4_remaining_tasks.py err=ValueError() DONE coro_long: Task is still running.

    If an exception occurs Other tasks remain running Hard to predict behavior Cases where tasks remain - asyncio.gather - return_exceptions false async def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  72. $ python310 4_remaining_tasks.py err=ValueError() DONE coro_long: Task is still running.

    I want to stop. I want to roll back Cases where tasks remain - asyncio.gather - return_exceptions false If an exception occurs Other tasks remain running Hard to predict behavior async def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main())
  73. Cases where tasks remain - asyncio.gather - return_exceptions false $

    python310 4_remaining_tasks.py coro_long: Task is still running. ɾࢭΊ͍ͨ ɾϩʔϧόοΫ͍ͨ͠ ྫ֎͕ൃੜͨ͠৔߹ ଞͷλεΫ͸ಈ͍ͨ·· 
 ಈ࡞Λ༧ଌͮ͠Β͍ I want to stop. I want to roll back If an exception occurs Other tasks remain running Hard to predict behavior async def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f"{err=}") print("DONE") asyncio.run(main()) err=ValueError() DONE
  74. Cancel task when exception occurs •If an exception occurs, what

    happens to the remaining tasks that were running at the same timeʁ •In some cases, the remaining tasks are executed as they are.
  75. Cancel task when exception occurs •In some cases, the remaining

    tasks are executed as they are. •Cancel a task (Cancellations were not added on 3.11) •Cancellation Basics •Cancel when using asyncio.gather() before 3.10 •Let's fi x the process that leaves the task •If an exception occurs, what happens to the remaining tasks that were running at the same timeʁ
  76. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  77. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  78. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  79. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  80. •task.cancel() •Cancel request to a task •CancelledError exception is sent

    to the cancelled task •receive asyncio.CancelledError in an asynchronous function •Resend asyncio.CancelledError Added post-processing for cancellations
  81. •task.cancel() •Cancel request to a task •CancelledError exception is sent

    to the cancelled task •receive asyncio.CancelledError in an asynchronous function •Resend asyncio.CancelledError •I'm a little confused. Let's try to implement it. Added post-processing for cancellations
  82. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.")

    return "DONE?" except asyncio.CancelledError as err: try:
  83. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.")

    return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError
  84. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  85. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  86. •Request a task to cancel - Task.cancel() Instructions to check

    and cancel remaining tasks •Determine completion of a task - Task.done() •Wait for post-cancel process to complete - asyncio.sleep() •Let's modify the code.
  87. async def main(): """return_exceptions False""" try: print(f"{results=}") except ValueError as

    err: print(f"{err=}") except TypeError as err: print(f”{err=}”) results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) asyncio.run(main()) Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() print("DONE")
  88. async def main(): """return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_long() ) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) print("DONE") Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather()
  89. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) Easy to check task status The process is the same print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() print("DONE")
  90. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: Identifying un fi nished tasks with Task.done() Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() print("DONE")
  91. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: task.cancel() # Cancel unfinished tasks Task.cancel() for un fi nished tasks Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() print("DONE")
  92. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: Waiting for completion asyncio.sleep() await asyncio.sleep(1) Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() print("DONE") task.cancel() # Cancel unfinished tasks
  93. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: After modi fi cation await asyncio.sleep(1) Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() print("DONE") task.cancel() # Cancel unfinished tasks
  94. async def main(): """return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) print("DONE") asyncio.run(main()) $ python310 4_remaining_tasks.py err=ValueError() DONE coro_long: Task is still running. Cases where tasks remain - asyncio.gather - return_exceptions false Review Execution before modi fi cation
  95. async def main(): """return_exceptions False""" try: results = await asyncio.gather(

    coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) print("DONE") asyncio.run(main()) $ python310 4_remaining_tasks.py coro_long: Task is still running. Cases where tasks remain - asyncio.gather - return_exceptions false Review Execution before modi fi cation Output from remaining a task err=ValueError() DONE
  96. Cases where tasks remain - asyncio.gather - return_exceptions false async

    def main(): """return_exceptions False""" try: results = await asyncio.gather( coro_success(), coro_value_err(), coro_long() ) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) print("DONE") asyncio.run(main()) $ python310 4_remaining_tasks.py coro_long: Task is still running. Review Execution before modi fi cation Output from remaining a task err=ValueError() DONE
  97. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.")

    return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError
  98. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.")

    return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError
  99. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: task.cancel() # Cancel unfinished tasks print("DONE") await asyncio.sleep(1) Run it async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather()
  100. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) $ python310 5_canceling_task_gather.py async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() task.cancel() # Cancel unfinished tasks print("DONE")
  101. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) $ python310 5_canceling_task_gather.py err=ValueError() async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() task.cancel() # Cancel unfinished tasks print("DONE")
  102. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) $ python310 5_canceling_task_gather.py err=ValueError() coro_long: Task was cancelled. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() task.cancel() # Cancel unfinished tasks print("DONE")
  103. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) $ python310 5_canceling_task_gather.py err=ValueError() coro_long: Task was cancelled. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError DONE Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() task.cancel() # Cancel unfinished tasks print("DONE")
  104. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) If canceled On the asynchronous function side Catch Cancel $ python310 5_canceling_task_gather.py err=ValueError() coro_long: Task was cancelled. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() task.cancel() # Cancel unfinished tasks print("DONE") DONE
  105. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) asyncio.run(main()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) $ python310 5_canceling_task_gather.py err=ValueError() coro_long: Task was cancelled. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError basic principle of cancellation Cases where tasks remain → Add instructions to con fi rm and cancel tasks Python 3.10 or earlier - asyncio.gather() task.cancel() # Cancel unfinished tasks print("DONE") DONE If canceled On the asynchronous function side Catch Cancel
  106. Cancel task when exception occurs •Let's fi x the process

    that leaves the task •Asynchronous function (coro_long()) to be canceled •Instructions to check and cancel remaining tasks •Wait for post-cancellation processing to complete. Added post-processing for cancellations •There are three perspectives for modi fi cation
  107. •At the time of exiting the context manager, •Remaining tasks

    are canceled. •Wait for cancellation process to complete. Cancellation process in asycio.TaskGroup
  108. •At the time of exiting the context manager, •Remaining tasks

    are canceled. •Wait for cancellation process to complete. •That's all! •In short, nothing special needs to be done. It will be canceled. Let's refactor the code we just wrote. Cancellation process in asycio.TaskGroup
  109. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) asyncio.run(main()) print(f"{results=}") for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) Cancellation process in asycio.TaskGroup Let's refactor it. results = await asyncio.gather(*[task1, task2, task3]) except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) print("DONE") task.cancel() # Cancel unfinished tasks
  110. async def main(): """return_exceptions False""" try: task1 = asyncio.create_task(coro_success()) task2

    = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) results = await asyncio.gather(*[task1, task2, task3]) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks
  111. async def main(): """Cancel in TaskGroup""" try: asyncio.run(main()) for task

    in [task1, task2, task3]: if task.done() is False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: task1 = asyncio.create_task(coro_success()) task2 = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) results = await asyncio.gather(*[task1, task2, task3]) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks
  112. asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is

    False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: task1 = asyncio.create_task(coro_success()) task2 = asyncio.create_task(coro_value_err()) task3 = asyncio.create_task(coro_long()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks async def main(): """Cancel in TaskGroup""" try:
  113. asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is

    False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks async def main(): """Cancel in TaskGroup""" try:
  114. asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is

    False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks async def main(): """Cancel in TaskGroup""" try:
  115. asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is

    False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: print(f"{results=}") except ValueError as err: print(f"{err=}") except TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks async def main(): """Cancel in TaskGroup""" try:
  116. asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is

    False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks async def main(): """Cancel in TaskGroup""" try:
  117. asyncio.run(main()) for task in [task1, task2, task3]: if task.done() is

    False: await asyncio.sleep(1) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") task.cancel() # Cancel unfinished tasks async def main(): """Cancel in TaskGroup""" try:
  118. asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as

    err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE") async def main(): """Cancel in TaskGroup""" try:
  119. async def main(): """Cancel in TaskGroup""" try: asyncio.run(main()) async with

    asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup Let's refactor it. print("DONE")
  120. Let's run it. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task

    is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) $ python311 6_cancelled_task_taskgroup.py Cancellation process in asycio.TaskGroup print("DONE") async def main(): """Cancel in TaskGroup""" try:
  121. Let's run it. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task

    is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) $ python311 6_cancelled_task_taskgroup.py coro_long: Task was cancelled. Cancellation process in asycio.TaskGroup print("DONE") async def main(): """Cancel in TaskGroup""" try:
  122. Let's run it. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task

    is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) $ python311 6_cancelled_task_taskgroup.py err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) coro_long: Task was cancelled. Cancellation process in asycio.TaskGroup print("DONE") async def main(): """Cancel in TaskGroup""" try:
  123. Let's run it. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task

    is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) $ python311 6_cancelled_task_taskgroup.py err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) coro_long: Task was cancelled. DONE Cancellation process in asycio.TaskGroup print("DONE") async def main(): """Cancel in TaskGroup""" try:
  124. Capturing Exceptions and Processing after canceling The order of Capturing

    and processing after cancellation is swapped. async def coro_long(): await asyncio.sleep(1) print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) $ python311 6_cancelled_task_taskgroup.py err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) coro_long: Task was cancelled. Cancellation process in asycio.TaskGroup print("DONE") DONE async def main(): """Cancel in TaskGroup""" try:
  125. In TaskGroup complete Predictable behavior async def coro_long(): await asyncio.sleep(1)

    print("coro_long: Task is still running.") return "DONE?" except asyncio.CancelledError as err: try: print("coro_long: Task was cancelled.") raise asyncio.CancelledError asyncio.run(main()) async with asyncio.TaskGroup() as g: print(f"{results=}") except* ValueError as err: print(f"{err=}") except* TypeError as err: print(f”{err=}”) results = [task1.result(), task2.result(), task3.result()] task1 = g.create_task(coro_success()) task2 = g.create_task(coro_value_err()) task3 = g.create_task(coro_long()) Cancellation process in asycio.TaskGroup $ python311 6_cancelled_task_taskgroup.py err=ExceptionGroup('unhandled errors in a TaskGroup', [ValueError()]) coro_long: Task was cancelled. print("DONE") DONE async def main(): """Cancel in TaskGroup""" try:
  126. asycio.TaskGroup Features •Capturing Concurrent Exceptions •Cancel task when exception occurs

    •Simply catch exceptions in a conventional way of writing •Cancel without indication •Except* will not spare any concurrent exceptions. •No longer need to be aware of tasks when executing them
  127. asycio.TaskGroup Features •Capturing Concurrent Exceptions •Cancel task when exception occurs

    •Simply catch exceptions in a conventional way of writing •Cancel without indication •Except* will not spare any concurrent exceptions. •No longer need to be aware of tasks when executing them •Writing asyncio code in a "reliable" and "safe" way •The new standard "Hello-ish world"
  128. import asyncio async def main(): async with asyncio.TaskGroup() as tg:

    task1 = tg.create_task(some_coro(...)) task2 = tg.create_task(another_coro(...)) New “Hello-ish world”
  129. import asyncio async def main(): async with asyncio.TaskGroup() as tg:

    task1 = tg.create_task(some_coro(...)) task2 = tg.create_task(another_coro(...)) try: except* BaseException as e: print(f"{e=}") asyncio.run(main()) New “Hello-ish world”
  130. New “Hello-ish world” import asyncio async def main(): async with

    asyncio.TaskGroup() as tg: task1 = tg.create_task(some_coro(...)) task2 = tg.create_task(another_coro(...)) try: except* BaseException as e: print(f"{e=}") asyncio.run(main()) Python 3.11
  131. •Quickstart - Example 3-2. The “Hello-ish World” of Asyncio import

    asyncio import time async def main(): print(f"{time.ctime()} Hello!") await asyncio.sleep(1.0) print(f"{time.ctime()} GoodBye!") loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task) pending = asyncio.all_tasks(loop=loop) for task in pending: task.cancel() group = asyncio.gather(*pending, return_exceptions=True) loop.run_until_complete(group) loop.close()
  132. •Quickstart - Example 3-2. import asyncio import time async def

    main(): print(f"{time.ctime()} Hello!") await asyncio.sleep(1.0) print(f"{time.ctime()} GoodBye!") The “Hello-ish World” of Asyncio Checking task status Waiting for cancellation process loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task) pending = asyncio.all_tasks(loop=loop) for task in pending: task.cancel() group = asyncio.gather(*pending, return_exceptions=True) loop.run_until_complete(group) loop.close()
  133. •Quickstart - Example 3-2. import asyncio import time async def

    coro(): print(f"{time.ctime()} Hello!") await asyncio.sleep(1.0) print(f"{time.ctime()} GoodBye!") The “Hello-ish World” of Asyncio Checking task status Waiting for cancellation process async def main(): async with asyncio.TaskGroup() as tg: task = tg.create_task(coro(...)) try: except* BaseException as e: print(f"{e=}") asyncio.run(main())
  134. Summary •Capturing and canceling exceptions in asyncio.TaskGroup •asyncio for external

    I/O •Because of external dependence, exceptions are likely to occur. •Easy to write and read •Let's touch TaskGroup. •Exception/cancellation triggers a review of the existing code.
  135. Summary •Capturing and canceling exceptions in asyncio.TaskGroup •asyncio for external

    I/O •Because of external dependence, exceptions are likely to occur. •Easy to write and read •Let's touch TaskGroup. •Exception/cancellation triggers a review of the existing code. •A more comfortable asyncio life
  136. •Task Status Appendix: Task Status •PENDING •FINISHED •CANCELLED •You can

    check with task._state •The sample code also outputs the state. Please try to run it. Here is the code
  137. Appendix: HTTPX with TaskGroup import asyncio from time import time

    import httpx URL = "https://pokeapi.co/api/v2/pokemon/" async def access_url_poke(client, num: int) -> str: r = await client.get(f"{URL}{num}") pokemon = r.json() # JSONύʔε return pokemon["name"] # ϙέ໊Λൈ͘ async def main_poke(): """httpxͰϙέϞϯ151ඖҾͬ͜ൈ͘""" start = time() async with httpx.AsyncClient() as client: try: async with asyncio.TaskGroup() as tg: tasks = [ tg.create_task(access_url_poke(client, number)) for number in range(1, 151) ] except* BaseException as e: print(f"{e=}") results = [task.result() for task in tasks] print(results) print("time: ", time() - start) asyncio.run(main_poke()) Here is the code
  138. Appendix: New features that are still available! import asyncio async

    def main(): try: async with asyncio.TaskGroup() as tg: task1 = tg.create_task(some_coro(...)) task2 = tg.create_task(another_coro(...)) except* BaseException as e: print(f"{e=}") with asyncio.Runner() as runner: runner.run(main()) 3.11 What's New What is asyncio.Runner! For interactive sessions? run multiple times. Here is the code
  139. Appendix: How to Cancel and Track Future States •It helps

    to think of Futures as being in one of four states: •Waiting •Done, holding a result •Done, holding an exception /FPQZUIPOJD3FBTPOJOHBCPVUBTZODJP4FNBQIPSF IUUQOFPQZUIPOJDCMPHTQPUDPNSFBTPOJOHBCPVUBTZODJPTFNBQIPSFIUNM •Done, but cancelled
  140. reference •Using Asyncio in Python - Oreilly & Associates Inc

    •Python 3.11 Preview: Task and Exception Groups - realpython.com •https://realpython.com/python311-exception-groups/ •New in Python 3.11 (Part 4) PEP 654 Exception Groups and TaskGroups •https://www.python.jp/news/wnpython311/except-grp.html •The 2021 Python Language Summit: PEP 654 — Exception Groups and except* •https://pyfound.blogspot.com/2021/05/the-2021-python-language-summit-pep-654.html • How Exception Groups Will Improve Error Handling in AsyncIO - Łukasz Langa | Power IT Conference •https://www.youtube.com/watch?v=Lfe2zsGS0Js
  141. Who am I? 👤 •Junya Fukudaʢ@JunyaFffʣcalled “Jun” not “Junior” •Photos

    📷 Tweets 🐦 👍 •Software Engineer at Groove X, inc •book •Python Practical Recipes •Expert Python Programming - Fourth Edition •Written by Tarek Ziadé, Michał Jaworski Here is today's code
  142. Groove X is ... •We manufacture and sell a home

    robot called LOVOT •LOVOT is “Love ✖︎ Robot”
  143. Do you have a any Question? Feel free to ask

    questions on X 🐦. (@junya ff f)