Several months ago a StackOverflow user asked an interesting question:
Imagine we have an iterator, say iter(range(1, 1000)). And we have two functions, each accepting an iterator as the only parameter, say sum() and max(). In SQL world we would call them aggregate functions. Is there any way to obtain results of both without buffering the iterator output?
The question went unnoticed for some time, perhaps because it wasn’t tagged with the customary python-asyncio. I found it challenging, so I revived it with an answer that provides solutions using the various async tools available in Python¹. It soon grew to blog-post length and might be interesting to my readers, so I’m reproducing it here.
¹ Technically not all solutions because it doesn’t cover curio and trio, but they are in this respect close enough to asyncio that the asyncio example can be easily adapted to them.
Two functions, one iterator
Let’s consider how to apply two aggregate functions to the same iterator, which we can only exhaust once. The initial attempt (which hardcodes sum and max for brevity, but is trivially generalizable to an arbitrary number of aggregate functions) might look like this:
```python
def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m
```
This implementation has the downside that it stores all the generated elements in memory at once, despite both functions being perfectly capable of stream processing. The question anticipates this cop-out and explicitly requests the result to be produced without buffering the iterator output. Is it possible to do this?
Serial execution: itertools.tee
It certainly seems possible. After all, Python iterators are external, so every iterator is already capable of suspending itself. How hard can it be to provide an adapter that splits an iterator into two new iterators that provide the same content? Indeed, this is exactly the description of
itertools.tee, which appears perfectly suited to parallel iteration:
```python
import itertools

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX
    m = max(it2)
    return p, m
```
The above produces the correct result, but doesn’t work the way we’d like it to. The trouble is that we’re not iterating in parallel. Aggregate functions like sum and max never suspend – each insists on consuming all of the iterator content before producing the result. So sum will exhaust it1 before max has had a chance to run at all. Exhausting the elements of it1 while leaving it2 alone will cause those elements to be accumulated inside an internal FIFO shared between the two iterators. That’s unavoidable here – since max(it2) must see the same elements, tee has no choice but to accumulate them. (For more interesting details on tee, refer to this post.)
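The accumulation is easy to observe directly; here is a quick standalone sketch (constructed for this post, not part of the original answer):

```python
import itertools

it1, it2 = itertools.tee(iter(range(5)))
# Exhausting it1 first forces tee to stash every element in its
# internal FIFO so that it2 can still see them...
print(sum(it1))   # 10
# ...and it2 indeed replays all of them from that buffer.
print(list(it2))  # [0, 1, 2, 3, 4]
```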
In other words, there is no difference between this implementation and the first one, except that the first one at least makes the buffering explicit. To eliminate buffering, sum and max must run in parallel, not one after the other.
Let’s see what happens if we run the aggregate functions in separate threads, still using
tee to duplicate the original iterator:
```python
import concurrent.futures
import itertools

def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)
    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))
    return sum_future.result(), max_future.result()
```
This makes sum and max actually run in parallel (as much as the GIL permits), with the threads managed by the excellent concurrent.futures module. It has a fatal flaw, however: for tee not to buffer the data, sum and max must process their items at exactly the same rate. If one is even a little bit faster than the other, they will drift apart, and tee will buffer all the intermediate elements. Since there is no way to predict how fast each will run, the amount of buffering is both unpredictable and has the nasty worst case of buffering everything.
To ensure that no buffering occurs,
tee must be replaced with a custom generator that buffers nothing and blocks until all the consumers have observed the previous value before proceeding to the next one. As before, each consumer runs in its own thread, but now the calling thread is busy running a producer, a loop that actually iterates over the source iterator and signals that a new value is available. Here is an implementation:
```python
import concurrent.futures
import threading

def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
                if next_val is STOP:
                    return
                yield next_val
                last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()
    return sum_future.result(), max_future.result()
```
This is quite a lot of code for something so conceptually simple, but it is necessary for correct operation.
produce() loops over the outside iterator and sends the items to the consumers, one value at a time. It uses a barrier, a convenient synchronization primitive added in Python 3.2, to wait until all consumers are done with the old value before overwriting it with the new one in
next_val. Once the new value is actually ready, a condition is broadcast.
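For readers unfamiliar with the primitive, here is a minimal standalone sketch of Barrier semantics (illustrative only, not part of the solution): each thread blocks in wait() until the agreed number of parties has arrived.

```python
import threading

barrier = threading.Barrier(2)
order = []

def worker(name):
    order.append(("before", name))
    barrier.wait()  # blocks until both workers have arrived
    order.append(("after", name))

threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Both "before" entries necessarily precede both "after" entries.
print([phase for phase, _ in order])  # ['before', 'before', 'after', 'after']
```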
consume() is a generator that transmits the produced values as they arrive, until detecting STOP. The code can be generalized to run any number of aggregate functions in parallel by creating consumers in a loop, and adjusting their number when creating the barrier.
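A sketch of that generalization might look as follows (the helper name aggregate_unbuffered is made up for this post; the synchronization machinery is the same as above, with the barrier sized to the number of functions plus one):

```python
import concurrent.futures
import threading

def aggregate_unbuffered(it, *funcs):
    # Hypothetical generalization of max_and_sum_threads to N functions.
    STOP = object()
    next_val = None
    consumed = threading.Barrier(len(funcs) + 1)  # N consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
                if next_val is STOP:
                    return
                yield next_val
                last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(len(funcs)) as executor:
        futures = [executor.submit(lambda f=f: f(consume())) for f in funcs]
        produce()
    return tuple(fut.result() for fut in futures)

print(aggregate_unbuffered(iter(range(1, 101)), sum, max, min))  # (5050, 100, 1)
```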
The downside of this implementation is that it requires creation of threads (possibly alleviated by making the thread pool global) and a lot of very careful synchronization at each iteration pass. This synchronization destroys performance – this version is almost 2000 times slower than the single-threaded
tee, and 475 times slower than the simple but non-deterministic threaded version.
Still, as long as threads are used, there is no avoiding synchronization in some form. To completely eliminate synchronization, we must abandon threads and switch to cooperative multi-tasking. The question is: is it possible to suspend the execution of ordinary synchronous functions like sum and max in order to switch between them?
It turns out that the
greenlet third-party extension module enables exactly that. Greenlets are an implementation of fibers, lightweight micro-threads that switch between each other explicitly. This is sort of like Python generators, which use
yield to suspend, except greenlets offer a much more flexible suspension mechanism, allowing one to choose who to suspend to.
This makes it fairly easy to port the threaded version of
max_and_sum to greenlets:
```python
import greenlet

def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()
    return sum_result[0], max_result[0]
```
The logic is the same, but with less code. As before,
produce produces values retrieved from the source iterator, but its
send doesn’t bother with synchronization, as it doesn’t need to when everything is single-threaded. Instead, it explicitly switches to every consumer in turn to do its thing, with the consumer dutifully switching right back. After going through all consumers, the producer is ready for the next iteration pass.
Results are retrieved using an intermediate single-element list because greenlet doesn’t provide access to the return value of the target function (and neither does threading.Thread, which is why we opted for concurrent.futures in the first place).
There are downsides to using greenlets, though. First, they don’t come with the standard library; you need to install the greenlet extension. Second, greenlet is inherently non-portable because the stack-switching code is not supported by the OS and the compiler, and can be considered somewhat of a hack (although an extremely clever one). A Python targeting WebAssembly or the JVM or GraalVM would be very unlikely to support greenlet. This is not a pressing issue, but it’s definitely something to keep in mind for the long haul.
As of Python 3.5, Python provides native coroutines. Unlike greenlets, and similar to generators, coroutines are distinct from regular functions and must be defined using async def. Coroutines can’t be easily executed from synchronous code; they must instead be processed by a scheduler which drives them to completion. The scheduler is also known as an event loop because its other job is to receive IO events and pass them to the appropriate callbacks and coroutines. In the standard library, this is the role of the asyncio event loop.
Before implementing an asyncio-based
max_and_sum, we must first clear a hurdle. Unlike greenlet, asyncio is only able to suspend the execution of coroutines, not of arbitrary functions. So we need to replace sum and max with coroutines that do essentially the same thing. This is as simple as implementing them in the obvious way, only replacing for with async for, which enables the async iterator to suspend the coroutine while waiting for the next value to arrive:
```python
async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
# from aiostream.stream import accumulate
# def asum(it):
#     return accumulate(it, initializer=0)
# def amax(it):
#     return accumulate(it, max)
```
One could reasonably ask whether providing a new pair of aggregate functions is cheating; after all, the previous solutions were careful to use the existing sum and max built-ins. The answer will depend on the exact interpretation of the question, but I would argue that the new functions are allowed because they are in no way specific to the task at hand. They do the exact same thing the built-ins do, only consuming async iterators. I suspect that the only reason such functions don’t already exist somewhere in the standard library is that coroutines and async iterators are a relatively new feature.
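To sanity-check that an async variant agrees with its built-in counterpart, it can be driven through a trivial iterator-to-async-iterator adapter (the adapter name aiter_from is made up here; asum is the coroutine defined above, and asyncio.run, available since Python 3.7, is used for brevity):

```python
import asyncio

async def aiter_from(it):
    # Hypothetical adapter: expose a plain iterator as an async iterator.
    for elem in it:
        yield elem

async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

print(asyncio.run(asum(aiter_from(range(1, 6)))))  # 15
```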
With that out of the way, we can proceed to write
max_and_sum as a coroutine:
```python
import asyncio

async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()
    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()), produce())
    return s, m
```
Although this version is based on switching between coroutines inside a single thread, just like the one using greenlet, it looks different. asyncio doesn’t provide explicit switching between coroutines; it bases task switching on the await suspension/resumption primitive. The target of
await can be another coroutine, but also an abstract “future”, a value placeholder which will be filled in later by some other coroutine. Once the awaited value becomes available, the event loop automatically resumes execution of the coroutine, with the
await expression evaluating to the provided value. So instead of
produce switching to consumers, it suspends itself by awaiting a future that will arrive once all the consumers have observed the produced value.
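Here is a minimal standalone sketch of this future-based handshake (illustrative only; it uses asyncio.run, available since Python 3.7, for brevity):

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    async def waiter():
        # Suspends here; the event loop resumes us once fut has a value.
        return await fut

    task = asyncio.ensure_future(waiter())
    await asyncio.sleep(0)  # let waiter start and suspend on the future
    fut.set_result(42)      # this is what wakes waiter up
    return await task

print(asyncio.run(main()))  # 42
```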
consume() is an asynchronous generator, which is like an ordinary generator, except it creates an async iterator, which our aggregate coroutines are already prepared to accept by using
async for. An async iterator’s equivalent of
__next__ is called
__anext__ and is a coroutine, allowing the coroutine that exhausts the async iterator to suspend while waiting for the new value to arrive. When a running async generator suspends on an
await, that is observed by
async for as a suspension of the implicit __anext__ invocation. consume() does exactly that when it waits for the values provided by
produce and, as they become available, transmits them to aggregate coroutines like
amax. Waiting is realized using the
next_val future, which carries the next element from
it. Awaiting that future inside
consume() suspends the async generator, and with it the aggregate coroutine.
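This suspension-propagates-through-async-for behavior can be seen in a standalone sketch (again illustrative only, using asyncio.run for brevity):

```python
import asyncio

async def agen():
    for i in range(3):
        await asyncio.sleep(0)  # suspending here also suspends the consumer
        yield i

async def collect():
    result = []
    async for x in agen():  # each step awaits the coroutine __anext__()
        result.append(x)
    return result

print(asyncio.run(collect()))  # [0, 1, 2]
```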
The advantage of this approach compared to greenlets’ explicit switching is that it makes it much easier to combine coroutines that don’t know of each other into the same event loop. For example, one could have two instances of
max_and_sum running in parallel (in the same thread), or run a more complex aggregate function that invokes further async code to do calculations.
The following convenience function shows how to run the above from non-asyncio code:
```python
def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)
```
Measuring and comparing the performance of these approaches to parallel execution can be misleading because sum and max do almost no processing, which over-stresses the overhead of parallelization. Treat these as you would treat any microbenchmarks, with a large grain of salt. Having said that, let’s look at the numbers anyway!
Measurements were produced using Python 3.6. The functions were run only once, given range(10000), and their time measured by subtracting time.time() readings taken before and after the execution. Here are the results:
max_and_sum_buffer and max_and_sum_tee: 0.66 ms – almost exactly the same time for both, with the tee version being a bit faster.
max_and_sum_threads_simple: 2.7 ms. This timing means very little because of non-deterministic buffering, so this might be measuring the time to start two threads and the synchronization internally performed by Python.
max_and_sum_threads: 1.29 seconds, by far the slowest option, ~2000 times slower than the fastest one. This horrible result is likely caused by a combination of the multiple synchronizations performed at each step of the iteration and their interaction with the GIL.
max_and_sum_greenlet: 25.5 ms, slow compared to the initial version, but much faster than the threaded version. With a sufficiently complex aggregate function, one can imagine using this version in production.
max_and_sum_asyncio: 351 ms, almost 14 times slower than the greenlet version. This is a disappointing result because asyncio coroutines are more lightweight than greenlets, and switching between them should be much faster than switching between fibers. It is likely that the overhead of running the coroutine scheduler and the event loop (which in this case is overkill given that the code does no IO) is destroying the performance on this micro-benchmark.
max_and_sum_asyncio with uvloop: 125 ms. This is more than twice the speed of regular asyncio, but still almost 5x slower than greenlet.
Running the examples under PyPy doesn’t bring significant speedup; in fact, most of the examples run slightly slower, even after running them several times to ensure JIT warmup. The asyncio function requires a rewrite to avoid async generators (since PyPy as of this writing implements Python 3.5), and executes in somewhat under 100 ms. This is comparable to CPython+uvloop performance, i.e. better, but not dramatic compared to greenlet.