Explicit continuations with Python coroutines

async/await and the event loop

Python 3.5 officially introduced asynchronous functions, also known as coroutines, and the await keyword. Based on the yield from extension to generators available since Python 3.3, await allows a seemingly blocking operation to instead suspend the execution of the async function, and be resumed later. The mix of synchronous appearance with asynchronous execution makes it a perfect replacement for callback-based programming, the infamous callback hell. Async/await was added to Python for the asyncio ecosystem, but it soon inspired asyncio alternatives such as curio and trio, and got adopted by old players in the async field. And none of this is unique to Python – async and await were previously present in C# and have since made their way into JavaScript, Scala, and even Rust.

A common theme with async/await coroutines is that their use tends to be centered around an event loop. This is because coroutines, much like generators, and in contrast to ordinary functions, cannot be started and left to complete without assistance. Before it produces a result, a coroutine can suspend itself and expect its caller to resume it later, once some condition is met. Multiple coroutines can run concurrently, using await to turn what would normally be a blocking operation into a cooperative context switch. All this requires coroutines to be driven by a dispatcher with which they coordinate their suspensions. Reasons for suspension will vary, but most of them will ultimately boil down to waiting for data, a resource, or a timeout. Since multiplexing IO and timeout events is the job description of polling event loops, a coroutine scheduler is typically integrated with an event loop.

For example, to execute an asyncio coroutine from synchronous code:

import asyncio

async def greet():
    print('hello...')
    await asyncio.sleep(1)
    print('...world')
    return 42

one must run it in the asyncio event loop:

loop = asyncio.new_event_loop()  # get_event_loop() is deprecated outside a running loop
result = loop.run_until_complete(greet())  # waits and displays output
assert result == 42

asyncio can of course do much more than run a single coroutine to completion. It can submit a coroutine to the event loop without waiting for it to finish, allowing independent coroutines to run concurrently. It provides combinators such as gather and wait which make it convenient for coroutines to start other coroutines in parallel, using await to get their results when available. It supports queuing ordinary functions to execute alongside coroutines, as well as running chunks of synchronous code in a thread or process pool and awaiting it as if it were a coroutine. Finally, once submitted to the event loop, coroutines can be manipulated like futures, making it easy to hook them to classic async code that relies on callbacks. Taken together, all these provide a very powerful toolbox for real-world asynchronous programming — as long as the code is run inside the asyncio event loop.
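As a small illustration of that toolbox, the sketch below combines two coroutines and one ordinary blocking function with gather. The names fetch and blocking_square are made up for this example; only asyncio.gather, asyncio.sleep, and loop.run_in_executor are real asyncio calls.

```python
import asyncio
import time

async def fetch(name, delay):
    # stand-in for a network request; asyncio.sleep suspends without blocking
    await asyncio.sleep(delay)
    return name.upper()

def blocking_square(n):
    # ordinary synchronous function, run in the default thread pool below
    time.sleep(0.01)
    return n * n

async def main():
    loop = asyncio.get_running_loop()
    # gather runs all three concurrently and returns results in argument order
    return await asyncio.gather(
        fetch("a", 0.02),
        fetch("b", 0.01),
        loop.run_in_executor(None, blocking_square, 7),
    )

results = asyncio.run(main())
print(results)
```

Note that every line of it still depends on the asyncio event loop being the one in charge.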

But what happens if we can’t run the asyncio event loop? Python callbacks sometimes run embedded inside the event loop of a game, in an application using a foreign GUI toolkit, or in a web server on an appliance. Is there a way to use coroutines in such environments, or are we condemned to revert to callback hell?

One option is to implement a subset of the event loop interface on whatever event loop we are running under. This is in principle explicitly allowed by asyncio, whose event loop is specified by PEP 3156. Doing so in practice is still a large undertaking best left to dedicated projects. For example, gbulb, a project that implements the asyncio event loop on top of GLib’s, contains a decent amount of bridge code, and still warns of the various impedance mismatches between GLib and asyncio. Some differences are fundamental, such as GLib allowing recursive looping while asyncio doesn’t, and some are subtle, such as incompatible approaches to multi-threaded execution or platform-specific differences. Implementing the asyncio event loop just to add some coroutines to an existing application is a non-starter.

It turns out that there are ways of executing coroutines that do not require having an event loop.

Coroutines with continuations

Coroutine-like constructs have been around for much longer than Python-style generators. The Scheme language famously pioneered continuations, a concept more powerful and general than coroutines, but also harder to understand and harder yet to implement efficiently. Scheme didn’t have a concept of coroutines, but it allowed any function to capture a continuation, which would allow it to be later resumed at that exact point. The powerful call/cc primitive could be used to implement all kinds of control flow abstractions, not limited to coroutines. In the case of coroutines, both suspension and resumption can be expressed in terms of capturing a continuation at the right time and applying it later to switch back to the captured execution point. Scheme continuations could be called any number of times, which placed a burden on their efficient implementation. This is why full-fledged continuations, despite their undeniable power, are almost universally shunned by mainstream language implementors.

The success of Python generators showed that more limited forms of suspendable functions can be efficiently implemented and still be well-accepted by programmers. Where a Scheme continuation is almost as heavy as a thread because it needs to store a snapshot of the entire call stack, a running Python generator only needs to store its local variables, and those of its subgenerators if any.

The rise of callback-based programming that led to the invention of async/await sparked a renewed interest in coroutines, including those with explicit continuations. In March 2017 Kotlin 1.1 introduced experimental support for just such coroutines. Kotlin coroutines deftly avoid the downsides of full-fledged Scheme-style continuations by instituting the same limitations as those of Python generators: there is a clear boundary between suspending and non-suspending code, and each continuation can only be resumed once.

Kotlin divides functions into suspending functions, statically marked with a suspend modifier, and regular non-suspending ones. To call a suspending function from normal code, one must invoke the createCoroutine primitive or its high-level cousins launch and async. Once inside a suspending function, one can freely call other suspending functions. Unlike in Python, await is not explicit: calling another suspending function automatically awaits it. Most importantly, a suspending function may choose to suspend itself by calling suspendCoroutine, a function without a direct Python equivalent. suspendCoroutine suspends execution, like Python’s yield, but before doing so it generates a continuation object and passes it to the block supplied by the caller. The coroutine will be resumed when someone calls the resume method on the continuation object.

This example uses suspendCoroutine to resume the same coroutine in a different thread, after a 1-second delay:

suspend fun greet(): Int {
    println("hello...")
    // hand the continuation to a new thread that resumes us a second later
    suspendCoroutine<Unit> { cont ->
        Thread {Thread.sleep(1000); cont.resume(Unit)}.start()
    }
    println("...world")
    return 42
}

It can be invoked from any blocking code by calling the launch function, for example:

fun main(args: Array<String>) {
    launch(Unconfined) {
        greet()
    }
    Thread.sleep(1500)
}

Let’s convert the coroutine to Python syntax for easier analysis:

async def greet():
    print('hello')
    def resume_later(cont):
        t = threading.Timer(1, cont.resume, args=(None,))
        t.start()
    await suspendCoroutine(resume_later)
    print('world')
    return 42

resume_later trivially uses the venerable Timer to spawn a new thread that will call cont.resume(None) a second later. The real magic happens in suspendCoroutine, which:

  • creates a continuation and immediately (before suspending!) invokes resume_later with the continuation;
  • then returns an awaitable object that suspends the coroutine when awaited.

But why is all this a big deal? How is it useful?

Notice how everything is done without relying on the asyncio event loop. In fact, not only does the code not rely on a particular event loop, it doesn’t presuppose the existence of an event loop at all! Using explicit suspend/resume, a coroutine can suspend itself and its callers, having first arranged its own resumption. And it can achieve this relying only on the resources at its disposal, without needing anything like a full-fledged PEP 3156 event loop. For example, a version of asyncio.sleep that works within the GTK event loop might look like this:

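async def glib_sleep(delay):
    # GLib.timeout_add expects the interval in whole milliseconds, and it
    # repeats the callback for as long as the callback returns a true value
    await suspendCoroutine(
        lambda cont: GLib.timeout_add(int(delay * 1000), cont.resume, None))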

GTK animations could use it to sleep between frames without disturbing the rest of the GUI:

async def animate(canvas, shape):
    # animate a shape along a sine curve
    for t in (i / 100 for i in range(0, 100)):
        # t runs from 0 to 1, so 2 * pi * t covers one full period of the sine
        canvas.draw(shape, WIDTH * t, HEIGHT * math.sin(2 * math.pi * t))
        await glib_sleep(1 / FPS)

The usefulness of explicit suspend is not limited to sleeping: it could be used to await any callback, such as update ticks from the GTK frame clock, or arbitrary widget signals. Widget setup code reacting to multiple events such as realize and map must currently be dispersed across several callbacks. With suspend it could be easily expressed as a single coroutine that awaits each signal it’s interested in.

Use cases extend beyond GUI toolkits. Explicit suspend has the potential to bring benefits of coroutines to any callback-based environment, no matter how minimalistic.

Now that we’re hooked, let’s implement suspendCoroutine.

Continuations in Python

How await works

A Python coroutine, also called an async function, is a thin wrapper around a generator, where yield signals a suspension and return signals coroutine completion. The @types.coroutine decorator can be used to promote an ordinary generator into an awaitable coroutine:

import types

@types.coroutine
def inner():
    yield 1
    yield 2
    return "foo"

Other async functions can now use await inner() to delegate their execution to inner(). But what will await return? Let’s try to drive the coroutine to completion using its send method:

>>> coro = inner()
>>> coro.send(None)
1
>>> coro.send(None)
2
>>> coro.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: foo

So, the caller of send() gets the yielded values, but the return value is buried in the StopIteration instance. And this value is picked up and returned by await:

async def outer():
    print(await inner())
    return "bar"

>>> coro = outer()
>>> coro.send(None)
1
>>> coro.send(None)
2
>>> coro.send(None)
foo
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: bar

As far as outer is concerned, the individual values yielded by inner do not exist: they are transferred directly all the way up to the caller of send. On the other hand, the foo value returned by inner is never seen by send; it is consumed by the await in outer. In its StopIteration instance send() only receives the bar value returned by outer. await can be thought of as syntactic sugar for:

# simplistic expansion of result = await x
while True:
    try:
        susp_value = x.send(None)
    except StopIteration as e:
        result = e.value
        break
    # x has chosen to suspend; propagate the suspension to the caller
    yield susp_value

await is implemented with the same code as yield from, which delegates execution to a sub-generator, which is (as authors of asyncio noticed) conceptually the same thing as awaiting a coroutine. The original asyncio was in fact entirely based on generators and yield from.
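The equivalence can be checked directly. The sketch below writes the same outer coroutine twice, once as a generator using yield from and once as an async function using await, and drives both with a small helper. The helper drive and the two outer_* names are introduced here for illustration only.

```python
import types

@types.coroutine
def inner():
    yield 1
    yield 2
    return "foo"

@types.coroutine
def outer_gen():
    # generator-based spelling: delegate to inner() with yield from
    result = yield from inner()
    return result + "bar"

async def outer_async():
    # async/await spelling of exactly the same thing
    return (await inner()) + "bar"

def drive(coro):
    """Collect every suspension value, then the final return value."""
    yielded = []
    while True:
        try:
            yielded.append(coro.send(None))
        except StopIteration as e:
            return yielded, e.value

print(drive(outer_gen()))
print(drive(outer_async()))
```

Both calls report the same suspension values and the same return value, which is what one would expect if await and yield from share their machinery.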

Suspensions in asyncio

The event loop drives a coroutine by doing something similar to what await does: invoking send() until the coroutine signals that it is finished by raising StopIteration. The difference is that the event loop switches to a different coroutine after each send, and switches to waiting for IO or timeouts when there are no runnable coroutines.
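To make the send-and-switch cycle concrete, here is a toy round-robin scheduler. It is only a sketch of the idea, not how asyncio is written: it has no IO polling and no timeouts, and the names switch, worker, and run_all are invented for the example.

```python
import collections
import types

@types.coroutine
def switch():
    # bare suspension: hand control back to whoever is driving us
    yield

async def worker(name, steps, log):
    for i in range(steps):
        log.append((name, i))
        await switch()
    return name

def run_all(coros):
    """Toy scheduler: send() to each coroutine in turn until all finish."""
    ready = collections.deque(coros)
    results = []
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration as e:
            results.append(e.value)  # finished: collect its return value
        else:
            ready.append(coro)       # suspended: give the others a turn
    return results

log = []
results = run_all([worker("a", 2, log), worker("b", 3, log)])
print(log, results)
```

The log shows the two workers interleaving one step at a time, which is the cooperative context switch described above.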

Any coroutine that suspends must be resumed at a later point by the event loop. A naked “suspend me” primitive is not provided by asyncio — every asyncio suspension is paired with the code that arranges for timely resumption. To see how this works, consider a minimal implementation of asyncio.sleep():

@types.coroutine
def simple_sleep(delay):
    # conceptual sketch; `loop` is the running event loop
    me = asyncio.current_task()
    loop.call_later(delay, me.set_result, None)  # call_later takes a relative delay
    yield

At this level, things start to look similar to the Kotlin version. Suspension itself is clearly represented by yield. Prior to suspension, the coroutine arranges for its resumption with loop.call_later. The continuation is apparently represented by the “current task”, the object that owns the coroutine inside the event loop, conveniently provided by asyncio. The task object inherits from asyncio.Future, which suggests resuming it with the set_result() method, analogous to cont.resume(). This is a conceptual picture rather than working code: current versions of asyncio reject set_result() on tasks, and the real asyncio.sleep() creates a fresh Future, schedules its set_result() with call_later, and awaits it. The task registers itself as a callback on that future, so the event loop notices when the future has finished and resumes the coroutine that awaits the sleep.
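For comparison, a variant that does run on current asyncio might look like the following. It is a sketch of the same arrange-then-suspend pattern using a fresh future, not asyncio's actual source, and future_sleep is a name chosen for this example.

```python
import asyncio

async def future_sleep(delay):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # arrange the resumption first ...
    loop.call_later(delay, fut.set_result, None)
    # ... then suspend; the task driving us is woken when fut completes
    await fut

async def main():
    loop = asyncio.get_running_loop()
    t0 = loop.time()
    await future_sleep(0.05)
    return loop.time() - t0

elapsed = asyncio.run(main())
print(f"slept for about {elapsed:.2f}s")
```

Here fut.set_result plays the role of the continuation, but it only works because an asyncio task is watching the future.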

Explicit continuation

Now let’s see if we can write a version of simple_sleep that doesn’t depend on asyncio. First, we’ll need a simple driver to invoke the coroutine from synchronous code:

def start(coro):
    try:
        coro.send(None)
        # at this point the coroutine is suspended
    except StopIteration:
        pass  # the coroutine chose not to suspend

This is very different from an event loop. For one, there is no loop! The name start reflects that we don’t really drive the coroutine, we simply set it in motion, and leave it to do the rest.

Calling start on asyncio-style coroutines such as inner and outer won’t produce a useful result – it will simply pick up the first yielded value and exit immediately, discarding the value. This is exactly what we want — our coroutines know how to resume themselves, so there is no need for start to continue executing after the coroutine has been given a chance to run. Also, if we ever hope to invoke start from a callback-based system, both start and the later continuations must be non-blocking, i.e. each of them must only execute the coroutine code until the next suspension point.

Next, we need to change simple_sleep to use threading.Timer to arrange for its continuation, renaming it to thread_sleep:

@types.coroutine
def thread_sleep(delay):
    me = ???
    t = threading.Timer(delay, me.set_result, args=(None,))
    t.start()
    yield

Here we have a little problem. Exactly what callback do we pass to threading.Timer? Creating a Future like asyncio does wouldn’t help because a freshly created future would not be observed by anyone, so its set_result would have no effect. However, start already has exactly the continuation we’re looking for: it’s coro.send(). But we don’t have, and cannot get, access to the running coroutine object stored somewhere on the stack. If only we could somehow tell start to hand us the continuation, we’d be set.

One way to do so would be for start to leave the current continuation in a global variable, like asyncio does with the current task. But there is an even neater approach, inspired by Curio, and that is to request the continuation from our non-coroutine caller! Something like this:

@types.coroutine
def thread_sleep(delay):
    cont = yield
    t = threading.Timer(delay, cont, args=(None,))
    t.start()
    yield

Now we have two suspensions, one to request the continuation, and the other to actually suspend. We’ll also change start to handle the first one:

def _resume(coro, value):
    # the pattern of catching StopIteration is frequent enough that a
    # convenience function comes useful
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return  # the coroutine chose not to suspend at all
    # we are at first suspension, the coroutine requested the continuation
    cont = lambda value: _resume(coro, value)
    _resume(coro, cont)
    # we are at second suspension, the coroutine is responsible
    # for resuming itself by invoking its continuation

Let’s see if it works on the greet example:

async def greet():
    print('hello...')
    await thread_sleep(1)
    print('...world')

>>> start(greet())
hello...
>>> ...world

It works! start exited immediately, as expected, but the coroutine arranged for itself to continue. As strange as it looks to use _resume to send _resume to the function, it actually works. The await construct drives thread_sleep for as long as it suspends, passing each suspension to its caller until it reaches a non-coroutine caller such as start().
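One limitation of this start is that it answers exactly one continuation request: a coroutine that sleeps twice would stall at its second cont = yield, because nobody is left to reply. A possible generalization, sketched below, has the coroutine yield a distinct marker when it wants its continuation, and has every resumption answer such requests before returning. The GET_CONT marker and the _step helper are assumptions of this sketch, not part of the code shown so far.

```python
import threading
import types

GET_CONT = object()  # hypothetical marker meaning "send me my continuation"

def _step(coro, value):
    try:
        while True:
            request = coro.send(value)
            if request is not GET_CONT:
                return  # a real suspension: wait for the continuation call
            # answer the request at once with a fresh continuation
            value = lambda v=None: _step(coro, v)
    except StopIteration:
        pass  # the coroutine has finished

def start(coro):
    _step(coro, None)

@types.coroutine
def thread_sleep(delay):
    cont = yield GET_CONT
    threading.Timer(delay, cont, args=(None,)).start()
    yield  # suspend until the timer thread calls cont

log = []
done = threading.Event()

async def greet_twice():
    log.append("hello...")
    await thread_sleep(0.05)
    log.append("...world")
    await thread_sleep(0.05)
    log.append("...again")
    done.set()

start(greet_twice())  # returns immediately
done.wait(5)          # only so that this demo can inspect the log
print(log)
```

Like the original, the sketch assumes the timer fires only after the coroutine has reached its suspension point, which holds for any realistic delay.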

Explicit suspension

Of course, we don’t want to write coroutines like thread_sleep using bare generators – we want to have a suspension primitive that is itself awaitable, the equivalent of Kotlin’s suspendCoroutine. Looking at thread_sleep, it is straightforward to generalize it:

@types.coroutine
def suspend(fn):
    cont = yield
    fn(cont)
    cont_retval = yield
    return cont_retval

This works exactly the same as thread_sleep, but leaves it up to the caller to decide what to do with the continuation, by passing it to a function of the caller’s choosing. (This is why Scheme calls its primitive “call with current continuation”.) thread_sleep can now be an actual async function that awaits suspend:

async def thread_sleep(delay):
    def resume_later(cont):
        t = threading.Timer(delay, cont, args=(None,))
        t.start()
    # suspension happens once resume_later has returned
    await suspend(resume_later)

start(greet()) works just like before. The original greet example translated from Kotlin to Python would now also work, just with suspendCoroutine changed to suspend. In fact, other than in the implementation of suspend, we never again have to use raw generators to implement suspension.
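Because suspend returns whatever value the continuation is called with, it can also turn a callback that carries an argument into an awaited result. The sketch below repeats suspend and start from above and uses a plain dictionary, handlers, as a hypothetical stand-in for a toolkit's signal registry.

```python
import types

@types.coroutine
def suspend(fn):
    cont = yield
    fn(cont)
    return (yield)

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return
    _resume(coro, lambda value: _resume(coro, value))

handlers = {}  # hypothetical registry: signal name -> callback
log = []

async def on_click():
    def register(cont):
        handlers["clicked"] = cont
    # suspends here; resumes with the argument passed to the callback
    position = await suspend(register)
    log.append(position)

start(on_click())
suspended_log = list(log)        # nothing logged yet, the coroutine is suspended
handlers.pop("clicked")((3, 4))  # the "toolkit" fires the callback
print(log)
```

No threads are involved here: the coroutine continues inside the call that fires the callback, just as a callback handler would.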

As a final change, the suspend API that accepts a function requires defining a temporary function for even very simple uses. This is not very ergonomic in Python, where suspension would be better expressed as an async context manager:

async def thread_sleep(delay):
    async with suspending() as cont:
        t = threading.Timer(delay, cont, args=(None,))
        t.start()
        # suspension happens at this point

suspending (named because it ends up in suspension, analogous to contextlib.closing) works like suspend, only split in two methods:

class suspending:
    __slots__ = ('_cont',)

    @types.coroutine
    def __aenter__(self):
        cont = yield
        self._cont = cont
        return cont

    @types.coroutine
    def __aexit__(self, *_):
        # we cannot return the result, so we leave it in cont.result
        self._cont.result = yield
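A short usage sketch shows how the value passed to the continuation is read back from cont.result after the block. It reuses the simple start from earlier, where the continuation is an ordinary function object and so accepts the result attribute; the waiting list is a made-up stand-in for a callback registry.

```python
import types

class suspending:
    __slots__ = ('_cont',)

    @types.coroutine
    def __aenter__(self):
        cont = yield
        self._cont = cont
        return cont

    @types.coroutine
    def __aexit__(self, *_):
        self._cont.result = yield

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return
    _resume(coro, lambda value: _resume(coro, value))

waiting = []  # hypothetical place where callbacks are parked
log = []

async def wait_for_reply():
    async with suspending() as cont:
        waiting.append(cont)
        # leaving the block suspends the coroutine
    # resumed: __aexit__ stored the value passed to the continuation
    log.append(cont.result)

start(wait_for_reply())
waiting.pop()("pong")
print(log)
```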

corocc

The code presented above is available as the corocc module on GitHub. It includes tests and several usage examples and is published under the MIT license. Although it is more complete than the snippets shown above, it should at this stage be considered experimental.

The goal of corocc is to bring the power of explicit continuations to Python. This should serve to allow coroutines to be used in a wider range of applications than previously possible.

Relation to greenlets

Invoking explicit continuations sounds in principle similar to the switching of greenlets, “green thread” objects provided by the greenlet library. Beyond the superficial similarity, the two do not share the same design. corocc is based on async functions and the await keyword, themselves based on Python generators and yield from. Greenlets are based on a full-featured green-thread design without distinction between blocking and suspending functions, so that any function may decide to switch context. Also, greenlets are organized into a tree hierarchy with exception propagation upward. These features come at the cost of increased weight and also some limitations.

The two differ in behavior with threading. This is an area where continuation-based coroutines can lift the limitations of coroutines that were never inherent to coroutines themselves, but to the event loop design. There is nothing preventing a corocc coroutine from starting execution in one thread and continuing in another, and so on for every continuation. greenlet on the other hand explicitly documents that “It is not possible to mix or switch between greenlets belonging to different threads.” (Of course, switching a coroutine between threads is not possible in asyncio either.)

Cancellation

Canceling coroutines and futures from the outside is a standard event loop feature. In the explicit continuation paradigm, canceling a coroutine might prevent its continuation, possibly canceling its timeout if it was arranged through corocc.
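One possible shape for this, using only the snippets from this article rather than any corocc API, is to keep a handle on whatever will invoke the continuation, disarm it, and then close the coroutine so that its cleanup code runs. The timers list below is a hypothetical way of exposing that handle.

```python
import threading
import types

@types.coroutine
def suspend(fn):
    cont = yield
    fn(cont)
    return (yield)

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return
    _resume(coro, lambda value: _resume(coro, value))

timers = []  # hypothetical: lets the canceller reach the pending timer
log = []

async def sleeper():
    def resume_later(cont):
        t = threading.Timer(60, cont, args=(None,))
        timers.append(t)
        t.start()
    try:
        await suspend(resume_later)
        log.append("woke up")
    finally:
        log.append("cleanup")

coro = sleeper()
start(coro)
# cancel: first make sure the continuation can no longer fire ...
timers.pop().cancel()
# ... then unwind the coroutine; close() raises GeneratorExit at the suspension
coro.close()
print(log)
```

The order matters: closing the coroutine while its timer is still armed would leave a continuation that later tries to resume a finished coroutine.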

Execution context

asyncio comes with functions gather and wait that combine coroutines running in parallel into a single awaitable object. Both of those are available in corocc, implemented on top of the on-done callback provided by the option of start to connect to a Future. However, fully implementing them requires a generic way to schedule a timeout task, the corocc equivalent of asyncio.sleep().

A generic corocc.sleep() would definitely require some knowledge of the execution context, perhaps obtained from a context provided to corocc.start(). Kotlin’s launch supports a context argument with similar semantics. The context would provide primitives like call_soon and call_later that could be easily adapted to new environments.

corocc.sleep() would use corocc.suspending() to retrieve the start context using the same protocol that currently retrieves the continuation. Then it would suspend itself and use context.call_later() to arrange for the continuation to be invoked later, regardless of the current event loop. The difference between this kind of execution and a PEP 3156 event loop is that this mechanism is explicitly designed to hook into existing event loop systems, not to run the show.
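A rough sketch of how such a context could be threaded through is given below. Everything in it is hypothetical: the GET_CONT and GET_CONTEXT markers, the ManualContext class, and the two-argument start are illustrations of the idea, not corocc's interface, and the sketch uses a bare generator where corocc would use suspending().

```python
import types

GET_CONT = object()     # hypothetical request markers
GET_CONTEXT = object()

class ManualContext:
    """Hypothetical context that records timers instead of running them."""
    def __init__(self):
        self.timers = []

    def call_later(self, delay, callback, *args):
        self.timers.append((delay, callback, args))

    def fire_all(self):
        timers, self.timers = self.timers, []
        for _delay, callback, args in timers:
            callback(*args)

def start(coro, context):
    def step(value):
        try:
            while True:
                request = coro.send(value)
                if request is GET_CONT:
                    value = step       # the continuation is step itself
                elif request is GET_CONTEXT:
                    value = context
                else:
                    return             # a real suspension
        except StopIteration:
            pass
    step(None)

@types.coroutine
def sleep(delay):
    context = yield GET_CONTEXT
    cont = yield GET_CONT
    context.call_later(delay, cont, None)
    yield  # suspend until the context invokes cont

log = []

async def greet():
    log.append("hello...")
    await sleep(1)
    log.append("...world")

ctx = ManualContext()
start(greet(), ctx)
before = list(log)  # only the first line has been logged so far
ctx.fire_all()      # the host environment decides when timers run
print(before, log)
```

Adapting sleep to GLib, a game loop, or a test harness would then mean supplying a different context object with the same call_later method.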

Mixing with… other coroutines?

If corocc coroutines can be integrated into classic event loops, can we fit them into async event loops? Can we await a corocc coroutine from a Curio coroutine and the other way around? This doesn’t look very useful at first because async event loops are natively able to drive coroutines – but it might allow the same coroutine code to work under async and regular event loops!

It is not clear that this can be easily achieved, given the larger liberties provided to explicitly continued code. But with some ground rules and with support in the framework, it might be possible to do it anyway. And when the reward is access to a large body of established and well-tested code, it is well worth a try.