Upgrade to Pro — share decks privately, control downloads, hide ads and more …

asyncio - how sausage is made

asyncio - how sausage is made

You can find multiple presentations on how to develop with asyncio. But do you know how the underlying abstractions, like the event loop, coroutines, futures and tasks, work under the covers? Let me show you how sausage is made!

Marek Stępniowski

June 26, 2014
Tweet

More Decks by Marek Stępniowski

Other Decks in Programming

Transcript

  1. asyncore asynchat ! github.com/mstepniowski/pygadu # -*- Mode: Python -*- #

    Id: asyncore.py,v 2.51 2000/09/07 # 22:29:26 rushing Exp # Author: Sam Rushing # =============================== # Copyright 1996 by Sam Rushing
  2. • CPU bound vs I/O bound • Avoid waiting for

    I/O • Schedule tasks for later • Solve embarrassingly parallel problem • C10k Warmup
  3. asyncFunction1(function (result, error) { asyncFunction2(function (result, error) { asyncFunction3(function (result,

    error) { asyncFunction4(function (result, error) { asyncFunction5(function (result, error) { asyncFunction6(function (result, error) { asyncFunction7(function (result, error) { asyncFunction8(function (result, error) { // do something useful }) }) }) }) }) }) }) })
  4. • Basic components for working with asynchronous I/O in Python

    • Based on PEP-3156 • No callback hell, thanks to yield from • Will convince you to try Python 3 asyncio
  5. def update_index(urls, word): data = [] for url in urls:

    data.append(request_url(url)) ! result = process_data(data) update_database(result) update_cache(result) boring
  6. @asyncio.coroutine def update_index(urls, word): tasks = [] for url in

    urls: tasks.append(asyncio.async(request_url(url))) data = yield from asyncio.gather(*tasks) result = yield from process_data(data) yield from update_database(result) asyncio.get_event_loop().call_soon(update_cache(result)) let’s
  7. call_later(delay, func, *args) call_at(when, func, *args) call_soon(func, *args) add_reader(fd, func,

    *args) add_writer(fd, func, *args) remove_reader(fd) remove_writer(fd) run_until_complete() run_forever() stop()
  8. def run_forever(self): self._running = True try: while True: try: self._run_once()

    except _StopError: break finally: self._running = False Inside
  9. def _raise_stop_error(self, *args): raise _StopError ! def run_until_complete(self, future): future

    = tasks.async(future, loop=self) future.add_done_callback(self._raise_stop_error) self.run_forever() future.remove_done_callback(self._raise_stop_error) return future.result() Inside
  10. def _run_once(self): timeout = 0 if self._ready else self._get_timeout() event_list

    = self._selector.select(timeout) self._process_events(event_list) ! end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) self._ready.append(handle) ! ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if not handle._cancelled: handle._run() Inside
  11. def _run_once(self): timeout = 0 if self._ready else self._get_timeout() event_list

    = self._selector.select(timeout) self._process_events(event_list) ! end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) self._ready.append(handle) ! ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if not handle._cancelled: handle._run() Inside
  12. def _run_once(self): timeout = 0 if self._ready else self._get_timeout() event_list

    = self._selector.select(timeout) self._process_events(event_list) ! end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) self._ready.append(handle) ! ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if not handle._cancelled: handle._run() Inside
  13. def _run_once(self): timeout = 0 if self._ready else self._get_timeout() event_list

    = self._selector.select(timeout) self._process_events(event_list) ! end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) self._ready.append(handle) ! ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if not handle._cancelled: handle._run() Inside
  14. • Coroutine • a generator function • decorated with @coroutine

    • calls other coroutines using yield from • can return values • Future • promise of a result or an error • Task • future which runs a coroutine
  15. @asyncio.coroutine def update_index(urls, word): tasks = [] for url in

    urls: tasks.append(asyncio.async(request_url(url))) data = yield from asyncio.gather(*tasks) result = yield from process_data(data) yield from update_database(result) asyncio.get_event_loop().call_soon(update_cache(result)) Coroutines what
  16. loop = asyncio.get_event_loop() @asyncio.coroutine def handle_client(client, addr): while True: data

    = yield from loop.sock_recv(client, 4096) if not data: break client.send(data) ! @asyncio.coroutine def accept_connections(server_socket): while True: client, addr = yield from loop.sock_accept(server_socket) asyncio.async(handle_client(client, addr)) ! server = socket.socket() server.bind('127.0.0.1', 1234) server.listen(128) server.set_blocking(False) loop.run_until_complete(accept_connections(server))
  17. Futures >>> future = asyncio.Future() >>> future.result() Traceback (most recent

    call last) ... InvalidStateError: Result is not ready. ! >>> future.set_result(42) >>> future.result() 42
  18. Futures >>> future = asyncio.Future() >>> future.add_done_callback( ... lambda f:

    print('result: %r' % f.result())) >>> future.set_result(42)
  19. Futures >>> future = asyncio.Future() >>> future.add_done_callback( ... lambda f:

    print('result: %r' % f.result())) >>> future.set_result(42) >>> loop = asyncio.get_event_loop() >>> loop.run_until_complete(future) result: 42 42
  20. Hint: yield from works with Futures. `r = yield from

    f` will block until r is available.
  21. @asyncio.coroutine def faux_sync_funtion(*args): future = asyncio.Future() def callback(result, error): if

    error is None: future.set_result(result) else: future.set_exception(Exception(error)) async_function(callback, *args) return (yield from future) No more callbacks
  22. Tasks • Unicorns covered by fairy dust • Coroutines wrapped

    in a Future • Work with yield from r = yield from Task(coroutine)
  23. def _run_once(self): timeout = 0 if self._ready else self._get_timeout() event_list

    = self._selector.select(timeout) self._process_events(event_list) ! end_time = self.time() + self._clock_resolution while self._scheduled: handle = self._scheduled[0] if handle._when >= end_time: break handle = heapq.heappop(self._scheduled) self._ready.append(handle) ! ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() if not handle._cancelled: handle._run() handle._run
  24. def _step(self, value=None, exc=None): self.__class__._current_tasks[self._loop] = self try: if exc

    is not None: result = self._coroutine.throw(exc) elif value is not None: result = self._coroutine.send(value) else: result = next(self._coroutine) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) ! … Inside
  25. def _step(self, value=None, exc=None): self.__class__._current_tasks[self._loop] = self try: if exc

    is not None: result = self._coroutine.throw(exc) elif value is not None: result = self._coroutine.send(value) else: result = next(self._coroutine) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) ! … Inside
  26. def _step(self, value=None, exc=None): self.__class__._current_tasks[self._loop] = self try: if exc

    is not None: result = self._coroutine.throw(exc) elif value is not None: result = self._coroutine.send(value) else: result = next(self._coroutine) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) ! … Inside
  27. def _step(self, value=None, exc=None): … if isinstance(result, futures.Future): result.add_done_callback(self._wakeup) self._fut_waiter

    = result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False elif result is None: self._loop.call_soon(self._step) else: self._loop.call_soon(self._step, exc=RuntimeError()) ! def _wakeup(self, future): try: value = future.result() except Exception as exc: self._step(None, exc) else: self._step(value, None) Inside
  28. def _step(self, value=None, exc=None): … if isinstance(result, futures.Future): result.add_done_callback(self._wakeup) self._fut_waiter

    = result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False elif result is None: self._loop.call_soon(self._step) else: self._loop.call_soon(self._step, exc=RuntimeError()) ! def _wakeup(self, future): try: value = future.result() except Exception as exc: self._step(None, exc) else: self._step(value, None) Inside
  29. def _step(self, value=None, exc=None): … if isinstance(result, futures.Future): result.add_done_callback(self._wakeup) self._fut_waiter

    = result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False elif result is None: self._loop.call_soon(self._step) else: self._loop.call_soon(self._step, exc=RuntimeError()) ! def _wakeup(self, future): try: value = future.result() except Exception as exc: self._step(None, exc) else: self._step(value, None) Inside
  30. • Transport • represents a connection (socket, pipe…) • talks

    in bytes • Protocol • implements the application flow loop.create_connection(callback) loop.create_server(callback) loop.open_connection(callback) loop.start_server(callback) What I didn’t cover?