You can find multiple presentations on how to develop with asyncio. But do you know how the underlying abstractions like the event loop, coroutines, futures, and tasks work under the covers? Let me show you how the sausage is made!
urls: tasks.append(asyncio.async(request_url(url))) data = yield from asyncio.gather(*tasks) result = yield from process_data(data) yield from update_database(result) asyncio.get_event_loop().call_soon(update_cache(result)) let’s
urls: tasks.append(asyncio.async(request_url(url))) data = yield from asyncio.gather(*tasks) result = yield from process_data(data) yield from update_database(result) asyncio.get_event_loop().call_soon(update_cache(result)) Coroutines what
= yield from loop.sock_recv(client, 4096) if not data: break client.send(data) ! @asyncio.coroutine def accept_connections(server_socket): while True: client, addr = yield from loop.sock_accept(server_socket) asyncio.async(handle_client(client, addr)) ! server = socket.socket() server.bind('127.0.0.1', 1234) server.listen(128) server.set_blocking(False) loop.run_until_complete(accept_connections(server))
error is None: future.set_result(result) else: future.set_exception(Exception(error)) async_function(callback, *args) return (yield from future) No more callbacks
is not None: result = self._coroutine.throw(exc) elif value is not None: result = self._coroutine.send(value) else: result = next(self._coroutine) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) ! … Inside
is not None: result = self._coroutine.throw(exc) elif value is not None: result = self._coroutine.send(value) else: result = next(self._coroutine) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) ! … Inside
is not None: result = self._coroutine.throw(exc) elif value is not None: result = self._coroutine.send(value) else: result = next(self._coroutine) except StopIteration as exc: self.set_result(exc.value) except futures.CancelledError as exc: super().cancel() # I.e., Future.cancel(self). except Exception as exc: self.set_exception(exc) ! … Inside
= result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False elif result is None: self._loop.call_soon(self._step) else: self._loop.call_soon(self._step, exc=RuntimeError()) ! def _wakeup(self, future): try: value = future.result() except Exception as exc: self._step(None, exc) else: self._step(value, None) Inside
= result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False elif result is None: self._loop.call_soon(self._step) else: self._loop.call_soon(self._step, exc=RuntimeError()) ! def _wakeup(self, future): try: value = future.result() except Exception as exc: self._step(None, exc) else: self._step(value, None) Inside
= result if self._must_cancel: if self._fut_waiter.cancel(): self._must_cancel = False elif result is None: self._loop.call_soon(self._step) else: self._loop.call_soon(self._step, exc=RuntimeError()) ! def _wakeup(self, future): try: value = future.result() except Exception as exc: self._step(None, exc) else: self._step(value, None) Inside
in bytes • Protocol • implements the application flow loop.create_connection(callback) loop.create_server(callback) loop.open_connection(callback) loop.start_server(callback) What didn’t I cover?