[PyCon KR 2019] Real-world asyncio

Joongi Kim
August 18, 2019

  1. § CTO @ Lablup Inc. · Backend.AI 주 개발자 §

    https://github.com/achimnol § asyncio와의 인연 · Python 3.3 시절부터 asyncio 써왔음 (표준라이브러리 등재는 3.4) · pyzmq asyncio 성능 개선 이슈 제안 (zeromq/pyzmq#859) · PyCon APAC 2016 aiohttp 스프린트 참여 About Me
  2. § Cancellation 개념 § 나의 발등을 찍었던 믿는 도끼 사례들

    § Structured Concurrency § 그동안 제안된 대안들 § 요약 Contents
  3. § 비동기 I/O(연산 아님!)를 이벤트루프 기반으로 코루틴 형식으로 표현 및

    실행 · 내부적으로는 generator처럼 취급됨 § async def로 코루틴 함수(coroutine function)를 선언 · 동기 코드에서 비동기 코드로 진입 : asyncio.run(f()) · 흐름 분기(=새로운 "micro-thread" 생성) : asyncio.create_task(f()) · 흐름 유지(=함수호출) : await f() · 추가 편의 기능 : async for, async with Asyncio in 1 slide Python 3.7에서 추가된 사용자 친화 API
  4. § Coroutine이 외부 요인에 의해 중단되었음을 알려주는 역할 § 모든

    await 구문에서 발생할 수 있음 · await가 없는 코드는 아무리 길고 오래 걸려도 cancel 불가능 · 당하는 입장에서는 CancelledError 예외로 보이며, 이에 대한 처리를 통해 clean-up 가능 § Task 또는 Future에 대한 참조를 가진 쪽에서 .cancel() 메소드를 부름으로써 발생 Cancellation async def do(): print("step 1") await asyncio.sleep(1) print("step 2") await asyncio.sleep(1) print("step 3") async def not_cancellable(): for i in range(1000000): do_some_calc(i) 이런 작업을 cancel 가능하게 만들고 싶으면 중간중간 await asyncio.sleep(0)을 끼워줘야 한다.
  5. § asyncio에서 비동기 작업의 "취소" 방법 · 다른 작업을 취소하려면

    : future 또는 task 객체의 .cancel() 메소드 호출 – 참고) await를 붙일 수 있는 것은 모두 future 아니면 task이다. – future는 즉시 취소 및 완료 상태가 되지만, task는 clean up 작업을 위해 추가로 비동기 작업을 더 해야 할 수 있으므로, await를 항상 한번 더 걸어주어야 한다. · 스스로를 취소하려면 : raise asyncio.CancelledError – Q. asyncio.current_task().cancel() 과의 차이점?! Cancellation in Action
  6. 1: Swallowed cancellation try: ... except Exception: ... try: ...

    except: ... Considered Harmful! A good practice try: ... except asyncio.CancelledError: ... raise except Exception: ... A must practice
  7. § https://bugs.python.org/issue32528 § Python의 예외 클래스 계층 · BaseException –

    KeyboardInterrupt, SystemExit, GeneratorExit – Exception – RuntimeError, ValueError, TypeError, IOError, ... – 다른 라이브러리에서 정의한 모든 사용자 정의 예외 § CancelledError re-raise 안 해도 되는 경우 : 자신이 Task의 최상위 Coroutine이면서, 명시적으로 resource clean up을 하고자 할 때 (=라이브러리 만드는 중이면 항상 할 것) 1: Swallowed cancellation from Python 3.8! + asyncio.CancelledError
  8. Proper cancellation async def coro(): try: print('working') await asyncio.sleep(1) print('done')

    except asyncio.CancelledError: print('cancelled') raise # bubble up! finally: await asyncio.sleep(1) print('cleaned up') import asyncio async def main(): t = asyncio.create_task(coro()) try: await asyncio.wait_for(t, timeout=0.5) except asyncio.TimeoutError: print('timeout detected') import asyncio async def main(): t = asyncio.create_task(coro()) await asyncio.sleep(0.5) t.cancel() try: await t except asyncio.CancelledError: print('cancellation detected') working cancelled cleaned up cancellation detected working cancelled cleaned up timeout detected 걸린 시간 : 1.5초
  9. § 증상 : 몇 분 정도 실습 진행하면 계속 서버가

    죽음 (무한 서버 재시작) § 원인 : aiopg transaction block의 cancel 처리 버그 + 잦은 browser refresh · aiopg.sa.exc.InvalidRequestError: Cannot release a connection with not finished transaction · 이 오류 자체로 서버가 죽는 것이 아니라, 그 오류가 누적되면서 회수되지 않은 connection으로 인해 DB connection pool을 전부 소모해서 장애 발생 § (임시) 해결책 : aiopg transaction을 모두 asyncio.shield로 감쌈 · https://github.com/lablup/backend.ai-manager/issues/140 2: Not cancellable library
  10. § await로 부를 수 있는 coroutine을 shield로 감싸면 · await를

    실행하는 task (outer)를 취소하더라도 · 감싸진 coroutine (do)은 취소되지 않음 § 주의 사항 · 해당 coroutine은 "orphan" 상태가 되므로 내부의 예외처리를 잘 해주어야 함 · 이벤트 루프가 계속 돌고있어야 그 coroutine이 끝까지 실행을 완료할 수 있음 Shielding from cancellation async def do(): ... async def outer(): try: await asyncio.shield(do()) except asyncio.CancelledError: raise async def main(): t = asyncio.create_task(outer()) await asyncio.sleep(0) # let outer proceed t.cancel() ...
  11. aiojobs async def coro(timeout): await asyncio.sleep(timeout) async def main(): scheduler

    = await aiojobs.create_scheduler() for i in range(100): # spawn jobs await scheduler.spawn(coro(i / 10)) await asyncio.sleep(5.0) # not all scheduled jobs are finished at the moment # wait completion of started jobs & cancel not started jobs await scheduler.close()
  12. aiojobs + aiohttp from aiohttp import web from aiojobs.aiohttp import

    setup, spawn import aiojobs async def handler(request): await spawn(request, coro()) return web.Response() app = web.Application() app.router.add_get('/', handler) setup(app) from aiojobs.aiohttp import atomic @atomic async def handler(request): return web.Response() aiojobs: 일단 request handler가 실행되면, 그 handler의 실행은 끝까지 하도록 보장 (process shutdown 시에도 진행 중이던 handler들이 모두 끝나도록 일정 시간 기다림) coro() 내부 코드는 외부로부터의 cancel을 걱정할 필요가 없어지지만... • 장시간 돌아가는 websocket/streaming? • response 객체가 갑자기 "정상동작"하지 않는다면? aiohttp는 handler 처리 도중이라도 연결이 끊기거나 하면 handler task를 바로 cancel!
  13. § 네트워크 처리 루프를 cancel하였을 때 종료 처리를 위해 사용

    중이던 네트워크 연결을 재사용할 수 없는 경우가 있음 · 예) aioredis를 이용해 Redis 서버의 blocking call을 await하는 상태에서 해당 coroutine을 cancel한 후, 종료 처리 과정에서 Redis 서버를 접근하고자 할 땐 기존 connection을 사용할 수 없고 새 connection을 만들어야 함 · 이런 현상이 발생하는 이유 : aioredis 내부 구현이 cancel이 발생했을 때 protocol 상태를 원래 상태로 되돌리지 못함 (버그라기보다는 Redis 프로토콜 상 단일 connection에서 blocking call 취소가 불가능. 다중 connection인 경우 "CLIENT UNBLOCK" 명령을 사용할 수 있으나 aioredis에서 아직 미지원) 3: Not cancellable library
  14. § Redis pub/sub 권장 구현 · "SUB" 명령은 해당 커넥션을

    더 이상 다른 용도로 못 쓰게 함 (cancel 처리를 통해 "UNSUBSCRIBE"하는 것이 불가능함) · Redis connection pool을 두고 aiohttp request handler에서 매번 SUB 명령을 실행하는 커넥션을 만들면 connection pool이 금방 소진될 수 있음! Match Lifetime of Coro & App
  15. 4: partial functioncoroutine >>> import functools, inspect >>> async def

    do(x): await asyncio.sleep(x) ... >>> inspect.iscoroutinefunction(do) True >>> do2 = functools.partial(do, x=1) >>> inspect.iscoroutinefunction(do2) False >>> import aiotools >>> do2 = aiotools.apartial(do, x=1) >>> inspect.iscoroutinefunction(do2) True async def dispatch_callbacks(): for cb in registered_callbacks: if inspect.iscoroutinefunction(cb): await cb() else: cb() Common pattern: Pitfall: ?!?! Solution: RuntimeWarning: coroutine 'cb' was never awaited 그냥 warning으로 끝나는 게 아니라 불려야 할 코드가 안 불려서 심각한 버그로 이어짐...
  16. § 무엇이 빠졌기에 asyncio를 쓰면서 이런 어려움들이 발생했는가? · 구조적

    프로그래밍 · type 검사 / type 보존 What's missing? sequential goto condition loop function ··· ··· ··· ··· 왜 goto가 나쁘다고 할까? Abstraction을 깨기 때문! 이 구조들의 공통점은 항상 원래 맥락으로 되돌아옴이 강제·보장된다는 것 Diagrams from https://vorpus.org/blog/notes-on-structured-concurrency- or-go-statement-considered-harmful/
  17. § Fire-and-forget 패턴 - 취소를 포함한 예외 처리 주체가 불명확해짐

    · 예) Go 언어의 go 구문, Python asyncio의 create_task() · 그 안에서 예외가 발생하면 누가 잡는가? (loop.set_exception_handler?) Structured Concurrency fire-and-forget structured-concurrency ··· ··· ··· 구조적 프로그래밍 관점에서 보면 goto와 동일
  18. Happy Eyeballs (RFC 8305) failure cancelled cancelled success time connection

    to addr1 connection to addr2 connection to addr3 connection to addr4 250 msec (or TCP SYN retransmit delay)
  19. § aiojobs의 단점 · 여전히 개별 task 내에서 발생하는 예외나

    timeout에 대해서는 물음표 상태 · task들이 만들어낸 nested task들에 대한 일괄 취소 불가 § curio (by David Beazley) · 일괄 취소가 가능한 TaskGroup API를 처음 제안 § trio (by Nathaniel J. Smith) · 맥락 분기는 항상 nursery (=TaskGroup) 통해서만 하도록 강제 · MultiError를 통해 다중 작업의 예외를 모두 보존하여 한번에 받을 수 있음 trio & curio
  20. § trio : cancellation scope (ref: https://www.youtube.com/watch?v=oLkfnc_UMcE) · trio.move_on_after() +

    nursery.cancel_scope.cancel() · trio.open_tcp_stream()의 DNS resolve 용도로 사용 · Twisted로 수백줄 필요한 구현을 단 40줄 정도로 구현 § asyncio : "staggered race" – Python 3.8의 문서화되지 않은 내부 API로 asyncio.staggered.staggered_race() 함수 제공 – loop.create_connection()의 DNS resolve 용도로 사용 – trio 없이 asyncio로 구현 원하는 경우 참고할 만한 구현체 Async Happy Eyeballs
  21. § await asyncio.gather(*aws, return_exceptions=False) -> List[Result | Exception] § await

    asyncio.shield(aw) -> Result § await asyncio.wait_for(aw, timeout) -> Result § await asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) -> Set[Task | Future], Set[Task | Future] · FIRST_COMPLETED / FIRST_EXCEPTION § asyncio.as_completed(aws, timeout=None) -> Iterable[Future] § asyncio.{Lock, Event, Condition, Semaphore, BoundedSemaphore} Structured Concurrency in asyncio Happy Eyeballs를 이걸로 구현하지 않은 이유 : 1) 각 task 시작 시간을 원하는 간격으로 설정할 수 없고 2) task 실패 시 다음 task를 동적으로 추가할 수 없기 때문
  22. § curio와 trio는 3rd-party event loop 구현이므로 현재 Python 표준

    라이브러리인 asyncio 기반 생태계와는 호환되지 않음 § 당연히, asyncio 생태계에도 nursery 추상화를 도입하고자 하는 움직임이 있음 · aionursery · TaskGroups "nursery" in asyncio
  23. § ...망했습니다. · 이유는 모든 coroutine에 자동으로 붙는 "hook"을 만들

    수가 없어서! · 그리고 모든 low-level coroutine (socket, subprocess, ...)이 명시적으로 cancel 처리를 지원해야 함 aionursery https://www.reddit.com/r/Python/comments/bgnvfo/ do_people_actually_hate_asyncio_or_is_it_just_a/
  24. § Python 3.8에 넣을 예정이었던 API · loop.create_supervisor() · TaskGroups

    API · asyncio.shield() as context manager § 개발 지연으로 바로 표준라이브러리에 반영되지는 못한 상태 TaskGroups https://twitter.com/1st1/status/1028032168327565312
  25. § CancelledError BaseException § New unified "Stream" interface (StreamReader, StreamWriter

    now deprecated) · write() & await write_drain() is now just await write() § UDP implemented for ProactorEventLoop, ProactorEventLoop is now default on Windows. § Happy Eyeballs is implemented for loop.create_connection() Python 3.8
  26. § 모든 것에 시작이 있으면 끝도 있다. § Coroutine을 어떻게

    잘 종료 또는 중단할 것인가? § Structured Concurrency · asyncio가 처음 만들어질 때는 생각하지 못했던 문제들 · 아마도 Python 3.9 ~ 3.10 정도 되면 잘 지원될 것으로 예상 Summary
