• What is concurrency? Is Python good at it?
• How do we scale up from single-node concurrency to multi-node?
• What does Python's new concurrent.futures stdlib library give us?
• What does asyncio give us?
• How do we evaluate concurrent performance?
Who is GIL? The Global Interpreter Lock:
• Only one thread runs in the Python interpreter at once
• Threads tend to keep the GIL until they finish or block on IO
But…
• This slows us down. Contention!
• To teh codez!
from threading import Thread
from Queue import Queue   # Python 2, matching the talk's xrange-era examples

inQ = Queue()
outQ = Queue()

def worker():
    while True:
        l = inQ.get()        # get work to do
        sumL = sum(l)        # CPU-bound work
        outQ.put(sumL)       # work output

numWorkers = 10
ts = [Thread(target=worker) for i in xrange(numWorkers)]  # create threads
for t in ts:
    t.start()
# main thread carries on
The IO-bound variant:

from threading import Thread
from Queue import Queue
import requests

inQ = Queue()
outQ = Queue()

def worker():
    while True:
        url = inQ.get()                               # get work to do
        resp = requests.get(url)                      # blocking IO
        outQ.put((url, resp.status_code, resp.text))  # work output

numWorkers = 10
ts = [Thread(target=worker) for i in xrange(numWorkers)]
...
The same CPU-bound threaded code as above, assessed:
☹ CPU-bound work doesn't release the GIL, so...
☹ ... no gain from more threads/cores
☹ ... only more contention
☹ Contention points?
☺ Code straightforward
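A quick way to see the GIL's effect yourself (a minimal sketch, not from the talk; the workload size is arbitrary): time the same CPU-bound function run serially and across four threads. On CPython the threaded run is no faster, and often slower.

import time
from threading import Thread

def cpu_work(n=10**7):
    total = 0
    for i in xrange(n):   # pure-Python CPU-bound loop; never releases the GIL
        total += i
    return total

start = time.time()
for _ in xrange(4):
    cpu_work()
print "serial:  %.2fs" % (time.time() - start)

start = time.time()
ts = [Thread(target=cpu_work) for _ in xrange(4)]
for t in ts:
    t.start()
for t in ts:
    t.join()
print "threads: %.2fs" % (time.time() - start)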
The same IO-bound threaded code as above, assessed:
☺ Gives up the GIL during blocking IO
☺ Code straightforward
☹ One blocking IO operation per thread, so...
☹ ... how many threads to start? a pool?
☹ Contention points?
Switching to processes:

from multiprocessing import Process, Queue

inQ = Queue()
outQ = Queue()

def worker(inQ, outQ):
    while True:
        l = inQ.get()
        sumL = sum(l)
        outQ.put(sumL)

numWorkers = 10
ts = [Process(target=worker, args=(inQ, outQ)) for i in xrange(numWorkers)]
...

☺ Not sharing the GIL
☺ No GIL: more processes means concurrent work
☺ Max out all your cores!
☺ Code straightforward
☹ Contention? ☺ (less scary: process abstractions)
We LIKE!
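If you don't want to manage the queues and processes by hand, multiprocessing.Pool wraps the same pattern; a minimal sketch (the work items here are illustrative):

from multiprocessing import Pool

lists = [range(1000) for _ in xrange(100)]   # illustrative work items

pool = Pool(processes=10)
sums = pool.map(sum, lists)   # each list is summed in a worker process
pool.close()
pool.join()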
Threads vs. processes:

                  Threads                                     Processes
Weight?           …                                           … as light ☺
Danger?           high: mutable, shared state; deadlocks ☹    lower, stricter communication ☺
Communication     mutexes/locks, atomic CPU instructions,     OS abstractions: pipes, sockets,
primitives?       thread-safe data structures ☺               shared memory, etc. ☺
If they crash…    the whole program crashes ☹                 only that process crashes ☺
Control?          extremely high ☺                            moderate, through abstractions
GIL?              yes ☹                                       no ☺
IO-bound work often looks like:

def worker1():
    while True:
        resp = handle_io1()           # blocking
        update_shared_state(resp)     # contention: need to lock and update

def worker2():
    while True:
        resp = handle_io2()
        update_shared_state(resp)
The event-loop alternative multiplexes the IO in a single thread:

while True:
    whichIsReady = select(ui, io1, io2)   # simultaneously block on multiple IO operations
    if whichIsReady == io1:
        resp = handle_io1(req)
    if whichIsReady == ui:
        ...

No longer need to lock shared state: everything runs in one thread.
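To make the select() pattern concrete, here is a minimal runnable sketch (not from the talk): a single-threaded echo server multiplexing many client sockets.

import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 8000))
server.listen(5)

socks = [server]
while True:
    readable, _, _ = select.select(socks, [], [])   # block on many sockets at once
    for s in readable:
        if s is server:
            conn, addr = s.accept()                 # new client connection
            socks.append(conn)
        else:
            data = s.recv(4096)
            if data:
                s.sendall(data)                     # echo it back
            else:
                socks.remove(s)                     # client hung up
                s.close()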
Promise/Deferred/Futures: a handle to future work.

def storeInDb(httpPromise):
    dbPromise = db.store(httpPromise)
    return dbPromise

promise = httpReq.fetch()                 # handle to pending IO
imgPromise = parseImgTags(promise)        # chainable with other operations
dbPromise = storeInDb(imgPromise)
dbPromise.onDone(callback=callWhenDone)   # can still use callbacks
# callWhenDone prints "Fetched and stored imgs!"
Coroutines/cooperative multitasking: "I own this thread until I say I'm done."

promise = httpReq.fetch()
imgPromise = parseImgTags(promise)
dbPromise = storeInDb(imgPromise)
myCoroutine.yieldUntil(dbPromise)   # looks like a blocking call, but in reality yields back to the event loop
print "Fetched and stored IMG tags!"

☺ Readability of blocking IO
☺ Performance of non-blocking async IO
Gevent greenlets:

import gevent
from gevent import monkey
monkey.patch_all()   # make blocking libraries like requests yield to gevent
import requests

class Fetcher(object):
    def __init__(self, fetchUrl):
        self.fetchUrl = fetchUrl
        self.resp = None

    def fetch(self):
        # blocking? depends on your definition of "blocking"
        self.resp = requests.get(self.fetchUrl)

def fetchMultiple(urls):
    fetchers = [Fetcher(url) for url in urls]
    handles = []
    for fetcher in fetchers:
        handles.append(gevent.spawn(fetcher.fetch))   # spawn a gevent worker that calls fetch
    gevent.joinall(handles)                           # wait till all are done
Other ways around the GIL:
• C extension modules (scipy, your own module)
• Cython (compile Python -> C)
• Jython/IronPython (JIT to the JVM or .NET CLI)
• GPUs (CUDA, etc.)
• cluster frameworks (discussed later)
Options for multi-node concurrency:
• start here: Redis as message broker; pure Python tasks; pure Python worker infrastructure; simple message patterns (sketched below)
• upgrade here: choose your own message broker; worker infrastructure; advanced message patterns; mix Python + Java or other languages
• at full scale: Java worker infrastructure; advanced message patterns; more complex operationally; high availability & linear scalability ("Lambda Architecture")
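The "start here" tier matches a library like RQ; a minimal sketch, assuming RQ and a local Redis server (sum_list is an illustrative task):

from redis import Redis
from rq import Queue

def sum_list(l):
    return sum(l)

q = Queue(connection=Redis())
job = q.enqueue(sum_list, range(1000))   # task is serialized into Redis
# a separate `rq worker` process picks it up; job.result holds the
# return value once it has run (in practice the task function must
# live in a module the worker can import)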
streamparse: Apache Storm (real-time)
• parallelize through the Python process model
• mixed workloads
  ◦ CPU- and IO-bound
• mixed concurrency models are possible
  ◦ threads within Storm Bolts
  ◦ process pools within Hadoop Tasks
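A minimal streamparse Bolt sketch, assuming streamparse 3.x-style imports (the word-count bolt is illustrative, and the topology wiring is omitted):

from streamparse import Bolt

class WordCountBolt(Bolt):
    def initialize(self, conf, ctx):
        self.counts = {}

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] = self.counts.get(word, 0) + 1
        self.emit([word, self.counts[word]])   # emit to downstream bolts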
asyncore: in the stdlib since 2000! A comment from its source code in 2000:

"There are only two ways to have a program on a single processor do 'more than one thing at a time.' Multi-threaded programming is the simplest and most popular way to do it, but there is another very different technique, that lets you have nearly all the advantages of multi-threading, without actually using multiple threads. It's really only practical if your program is largely I/O bound. If your program is CPU bound, then pre-emptive scheduled threads are probably what you really need. Network servers are rarely CPU-bound, however."
concurrent.futures: a new abstraction for concurrency in Python
• in the stdlib in Python 3.2+
• backport for 2.7 (pip install futures)
• API design like the Java Executor framework
• a Future abstraction as a return value
• an Executor abstraction for running things
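A minimal sketch of the API (the URL list is illustrative): submit() returns a Future immediately; result() collects the return value.

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = ["http://python.org", "http://pypi.org"]

with ThreadPoolExecutor(max_workers=10) as executor:
    future_to_url = dict((executor.submit(requests.get, url), url)
                         for url in urls)
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        resp = future.result()   # re-raises any exception from the worker
        print url, resp.status_code

Swapping in ProcessPoolExecutor for CPU-bound work is a one-line change.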
asyncio: async IO support "rebooted"
• GvR's pet project from 2012-2014
• original implementation called tulip
• released in Python 3.4 as asyncio
• PyCon 2013 keynote by GvR focused on it
• PEP-380 (yield from) utilized by it
The asyncio event loop can:
• schedule callbacks
  ◦ now
  ◦ at a time in the future
  ◦ repeated / periodic
• associate callbacks with file I/O states
• offer a pluggable I/O multiplexing mechanism
  ◦ select()
  ◦ poll(), epoll(), others
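A minimal sketch of that callback scheduling (Python 3.4+):

import asyncio

def hello(name):
    print("hello,", name)

loop = asyncio.get_event_loop()
loop.call_soon(hello, "now")         # run on the next loop iteration
loop.call_later(2, hello, "later")   # run ~2 seconds in the future
loop.call_later(3, loop.stop)        # periodic = re-arm call_later inside the callback
loop.run_forever()
loop.close()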
• yield from to simplify callback hell
• one event loop to rule them all
  ◦ Twisted and Tornado and gevent in the same app!
• offers an asyncio.Future
  ◦ asyncio.Future quacks like futures.Future
  ◦ asyncio.wrap_future is an adapter
  ◦ asyncio.Task is a subclass of Future
What a coroutine can do:

result = yield from future        suspend until the future is done, then return its result
result = yield from coroutine     suspend until the coroutine returns a result
return expression                 return a result to another coroutine
raise exception                   raise an exception to another coroutine
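A minimal sketch of these in action (Python 3.4 style; the sleep-based coroutine is illustrative):

import asyncio

@asyncio.coroutine
def work(delay):
    yield from asyncio.sleep(delay)   # suspend until the sleep future is done
    return delay * 10                 # return a result to another coroutine

@asyncio.coroutine
def main():
    result = yield from work(1)       # looks blocking; actually yields to the event loop
    print("result:", result)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()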
Why don't I like green threads?

"In a simple program using stackless or gevent, it's easy enough to say, 'This is a call that goes to the scheduler -- it uses read() or send() or something. I know that's a blocking call, I'll be careful... I don't need explicit locking because between points A or B, I just need to make sure I don't make any other calls to the scheduler.' However, as code gets longer, it becomes hard to keep track. Sooner or later…" - Guido van Rossum
The DataStax Cassandra driver supports multiple event loop models:
• pip install cassandra-driver
• asyncore and libev event loops preferred
• twisted and gevent also provided
• performance differences in benchmarks are dramatic (ranging from 2k to 18k write ops per sec)
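A minimal sketch of async writes with the driver (the keyspace, table, and data are illustrative):

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect("demo")

future = session.execute_async(
    "INSERT INTO users (id, name) VALUES (%s, %s)", (1, "alice"))
result = future.result()   # block only when you actually need the outcome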
Benchmark plan:
• execute / benchmark naive sync writes
• switch to batched futures
• switch to callback chaining (sketched below)
• try different event loops
• switch to pypy
• discuss how asyncio could clean this up
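The "callback chaining" step, sketched with the driver's ResponseFuture API, continuing the session from the previous sketch (the data is illustrative):

insert = "INSERT INTO users (id, name) VALUES (%s, %s)"
params = [(i, "user%d" % i) for i in range(100)]

def on_error(exc):
    print("write failed:", exc)

def write_next(_previous_result=None):
    if params:
        future = session.execute_async(insert, params.pop())
        future.add_callbacks(write_next, on_error)   # chain the next write off this one

write_next()   # kick off the chain; the event loop drives the rest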