Rust global variables demystified

Rust has a reputation as a language unfriendly to global variables. While this reputation is not entirely undeserved, most of it stems from the guarantees afforded by Rust and not from a desire to stifle the programmer’s creativity. In this article we’ll show how to use global variables, and how to overcome the limitations.

Is it even ok to use global variables?

Global variables are a controversial topic in programming circles, with many educators taking the moral high ground and condemning them as code smell, a shortcut, a crutch, a hallmark of throw-away code, and <insert favorite insult>. While there is good reason for the hostility, there is also an abundance of situations where global variables are either appropriate or actually the only way to proceed. For example, pre-compiled regular expressions or the state of a logger are cases where you’d probably want to use globals in preference to sending the state all the way from top-level to the bottom-most part of your program. Many low-level system APIs, such as those of signal handlers or hardware-triggered interrupts, have callbacks that don’t receive a “state” argument, so communication between the callback and the rest of the system must go through globals. There are other examples, and in this article we’ll assume that you have good reasons for using globals that your CS professor would approve of. If that’s not the case, don’t worry, we won’t tell.

Declaring globals

A Rust global variable is declared much like any other variable, except it’s declared at top-level and uses static instead of let:

static LOG_LEVEL: u8 = 0;

Besides using the keyword static in place of let, we must spell out the type, as the compiler refuses to infer it for statics. This holds even when the type is unambiguous – static LOG_LEVEL = 0u8 won’t compile.

Note that what we’ve declared is actually a static, which is not necessarily global. If the same declaration appeared in a function, the variable would be visible only inside the function, but its value would still be shared among all invocations of the function and last until the end of the program. This article talks about global variables, but most of the content equally applies to static variables defined inside functions, the only difference being that of visibility.
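To make the distinction concrete, here is a minimal sketch of a static declared inside a function. The names next_id and COUNTER are hypothetical, and the example uses an atomic (covered later in this article) so that the value can change without unsafe:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Returns 0, 1, 2, ... on successive calls.
pub fn next_id() -> u32 {
    // COUNTER is visible only inside next_id(), but it is shared by all
    // invocations and lives until the end of the program.
    static COUNTER: AtomicU32 = AtomicU32::new(0);
    // fetch_add returns the value held before the increment.
    COUNTER.fetch_add(1, Ordering::Relaxed)
}
```

Code outside next_id() cannot name COUNTER at all, which is the only difference from a module-level static.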

The global works much like you’d expect: you can read it from any function in the module, you can import it (if declared pub) from another module and then read it from there. You can even borrow it and get a reference with a 'static lifetime – because the global variable lasts until the end of the program. Neat.
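As a small sketch of the 'static borrow, the hypothetical functions below hand out and consume a reference to the global. Neither log_level_ref nor describe is part of any library; they exist only to show that the borrow type-checks:

```rust
static LOG_LEVEL: u8 = 0;

/// Borrowing a global yields a reference that is valid for the rest of the program.
pub fn log_level_ref() -> &'static u8 {
    &LOG_LEVEL
}

/// A hypothetical consumer that insists on a 'static reference.
pub fn describe(level: &'static u8) -> String {
    format!("log level is {}", level)
}
```

A reference to a local variable could not be passed to describe(), because a local does not outlive its function.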

What you can’t do is assign to the global because it’s not declared as mut. This is where things get interesting.

Mutable globals – atomics and locks

Ideally we’d like to declare our global as mut and have a public API that manipulates it – say, a function that reads it and another that writes it:

static mut LOG_LEVEL: u8 = 0;

pub fn get_log_level() -> u8 {
    LOG_LEVEL
}

pub fn set_log_level(level: u8) {
    LOG_LEVEL = level;
}

The compiler rejects both the getter and the setter with a similar error message:

error[E0133]: use of mutable static is unsafe and requires unsafe function or block
 --> src/lib.rs:8:5
  |
8 |     LOG_LEVEL = level;
  |     ^^^^^^^^^^^^^^^^^ use of mutable static

The underlying problem is that a global variable is potentially visible from multiple threads. The above functions don’t synchronize their access to the global, so there’s nothing to prevent set_log_level() from being called in one thread while another thread calls get_log_level() or set_log_level(), either of which would constitute a data race. Rust requires an unsafe block to signal that such synchronization has been implemented by the code that surrounds access to the mutable static. Alternatively, the whole function can be unsafe to signal that the burden of synchronization is transferred to its callers. Since we don’t in fact have such synchronization (and it’s unclear how a caller of set_log_level() and get_log_level() would even ensure it), we won’t attempt to “fix” it by adding an unsafe to get the code to compile. We want to access globals without advanced reasoning about unsafe and undefined behavior.

Since we’re dealing with a potential data race, let’s address it with the mechanisms used elsewhere in Rust to avoid data races – locks and atomics. In case of a u8, we can simply replace u8 with AtomicU8:

use std::sync::atomic::{AtomicU8, Ordering};

static LOG_LEVEL: AtomicU8 = AtomicU8::new(0);

pub fn get_log_level() -> u8 {
    LOG_LEVEL.load(Ordering::Relaxed)
}

pub fn set_log_level(level: u8) {
    LOG_LEVEL.store(level, Ordering::Relaxed);
}

The global variable is no longer mut, so no unsafe is needed. The code is thread-safe and as performant as an unsafe version would be – on x86 the relaxed atomic load compiles into an ordinary load. If you need stricter ordering guarantees between LOG_LEVEL and other data in the program, you can use Ordering::SeqCst instead.
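To see the thread-safety claim in action, here is a sketch that sets the level from several threads at once. The helper set_from_threads is hypothetical and exists only for this illustration; which thread wins is not deterministic, but the result is always one of the values written:

```rust
use std::sync::atomic::{AtomicU8, Ordering};
use std::thread;

static LOG_LEVEL: AtomicU8 = AtomicU8::new(0);

pub fn get_log_level() -> u8 {
    LOG_LEVEL.load(Ordering::Relaxed)
}

pub fn set_log_level(level: u8) {
    LOG_LEVEL.store(level, Ordering::Relaxed);
}

/// Hypothetical helper: `n` threads set the level to 1..=n concurrently;
/// returns the level observed once all of them have finished.
pub fn set_from_threads(n: u8) -> u8 {
    let handles: Vec<_> = (1..=n)
        .map(|level| thread::spawn(move || set_log_level(level)))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    get_log_level()
}
```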

But what if we need something that won’t neatly fit into an atomic – say, a string? This compiles, but won’t allow us to modify the global:

static LOG_FILE: String = String::new();

pub fn get_log_file() -> &'static str {
    &LOG_FILE
}

Since there is no AtomicString, we need to use a proper lock:

use std::sync::Mutex;

// XXX - doesn't compile
static LOG_FILE: Mutex<String> = Mutex::new(String::new());

pub fn get_log_file() -> String {
    LOG_FILE.lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *LOG_FILE.lock().unwrap() = file;
}

Note that get_log_file() must return a fresh copy of the string. Returning a reference would require a lifetime, and there is no lifetime to associate with the global variable other than 'static, and 'static is incorrect (and wouldn’t compile) because set_log_file() can modify it at any time.

The above doesn’t compile for a different reason:

error[E0015]: calls in statics are limited to constant functions, tuple structs and tuple variants
 --> src/lib.rs:3:34
  |
3 | static LOG_FILE: Mutex<String> = Mutex::new(String::new());
  |                                  ^^^^^^^^^^^^^^^^^^^^^^^^^

For more information about this error, try `rustc --explain E0015`.

What’s going on here? Why did String::new() compile, but Mutex::new(String::new()) didn’t?

The difference is that the globals we declared so far were just pieces of data whose initial values were available at compilation time. The compiler didn’t need to generate any initialization code for static LOG_LEVEL: u8 = 0 – it only reserved a byte in the executable’s data segment and ensured that it contained 0 at compile time. String::new() also works because it is a const fn, a function specifically marked as runnable at compile time. It can be marked like that because an empty string doesn’t allocate, so the string returned by String::new() can be represented in the executable by a triple of (0 [length], 0 [capacity], NonNull::dangling() [constant representing unallocated pointer]). Nothing to do at run time. On the other hand, static LOG_FILE: String = String::from("foo") wouldn’t compile because String::from() requires a run-time allocation and is therefore not a const fn.
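The same rule applies to your own code. As a sketch, the hypothetical Settings type below has a const fn constructor, which makes it usable in a static initializer; all names here are made up for the example:

```rust
/// Hypothetical settings type used only for this illustration.
pub struct Settings {
    pub level: u8,
    pub name: &'static str,
}

impl Settings {
    /// A const fn can be evaluated at compile time, so it is allowed
    /// in the initializer of a static.
    pub const fn new(level: u8, name: &'static str) -> Settings {
        Settings { level, name }
    }
}

/// Another const fn; the addition is computed by the compiler.
const fn default_level() -> u8 {
    1 + 2
}

static SETTINGS: Settings = Settings::new(default_level(), "app");

// Allowed because String::new() is a const fn and doesn't allocate.
static EMPTY: String = String::new();
```

Removing const from either function turns the declaration of SETTINGS into the E0015 error shown above.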

std::sync::Mutex::new() is not const fn because it requires an allocation in order to keep the system mutex at a fixed address. And even if we used an allocation-free mutex (such as parking_lot::Mutex which supports a const fn constructor on nightly Rust), we’d face the same issue if we wanted to start off with a non-empty string, a data structure coming from a library we don’t control, or information only available at run-time, such as fresh randomness or the current time. In general, we don’t want to be constrained to const fn functions when initializing global variables.

As a side note, C++ supports initializing global variables with arbitrary code by simply allowing the compiler to generate code that runs before main() (or during dlopen() in the case of dynamic libraries). This approach is convenient for simple values, but when used in real programs it led to issues with initialization order, aptly named the static initialization order fiasco. To avoid that problem, as well as issues with libraries that require explicit initialization, Rust doesn’t allow pre-main initialization, opting instead for the approach C++ calls the construct on first use idiom.

We will review three ways to initialize a global variable with arbitrary data, two of them based on external (but extremely well reviewed and widely used) crates, and one based on the standard library.

Once cell

The once_cell crate provides a OnceCell type that can be used to define global variables. Here is how one would use OnceCell for LOG_FILE:

use once_cell::sync::OnceCell;
use std::sync::Mutex;

static LOG_FILE: OnceCell<Mutex<String>> = OnceCell::new();

fn ensure_log_file() -> &'static Mutex<String> {
    LOG_FILE.get_or_init(|| Mutex::new(String::new()))
}

pub fn get_log_file() -> String {
    ensure_log_file().lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *ensure_log_file().lock().unwrap() = file;
}

Looking at the implementation of get_log_file() and set_log_file(), it is immediately apparent that they implement the “construct on first use” idiom – both functions call a method that ensures that the inner value is initialized (and that this is only done once), and retrieve a reference to the globally-stored value. This value can then be manipulated in the usual way through interior mutability.

OnceCell<T> is conceptually similar to a RefCell<Option<T>>. Like an Option, it has two states, one empty and another with a useful value. Like RefCell<Option<T>>, it uses interior mutability to allow setting the inner value using just a shared reference. But unlike Option, once set to a non-empty value, the stored value can never be set again. This allows a non-empty OnceCell to give out shared references to the inner data, which RefCell<Option> wouldn’t be allowed to do (it could at best return a Ref<T>) because the contents may change at any time. OnceCell is also thread-safe, so it would actually compare to Mutex<Option<T>>, except it uses an atomic to efficiently check whether the cell has been set.
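The set-once behavior can be tried without external crates on toolchains that ship std::sync::OnceLock (Rust 1.70 and later). The sketch below assumes that OnceLock mirrors once_cell::sync::OnceCell for the few calls it uses; GREETING and greeting() are hypothetical names:

```rust
use std::sync::OnceLock;

static GREETING: OnceLock<String> = OnceLock::new();

/// Initializes the cell on first use and hands out a shared
/// 'static reference to its contents.
pub fn greeting() -> &'static str {
    GREETING.get_or_init(|| String::from("hello"))
}
```

Before the first call GREETING.get() returns None, and once the cell is filled a later GREETING.set(...) returns an error instead of replacing the value. That is what makes it sound to give out plain shared references.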

once_cell also provides a Lazy type that makes the above even simpler, removing the need for a separate ensure_log_file() function:

use std::sync::Mutex;
use once_cell::sync::Lazy;

static LOG_FILE: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new(String::new()));

pub fn get_log_file() -> String {
    LOG_FILE.lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *LOG_FILE.lock().unwrap() = file;
}

Our globals now work pretty much the way we wanted them. Lazy<T> performs a sleight of hand that even allows us to call methods like lock() directly on LOG_FILE. It achieves this by implementing Deref, the trait normally used to treat lightweight containers (typically smart pointers) like the values they contain. Deref is the mechanism that allows you to call methods of Foo on Box<Foo>, or methods of &str on String, and so on. Lazy<T> wraps a OnceCell<T> and implements Deref<Target = T> with a deref() that returns self.once_cell.get_or_init(|| self.init.take().unwrap()), where init is the closure passed to Lazy::new().

The Lazy version still uses the construct on first use idiom, it’s just hidden behind the magic of Deref. In some cases this can yield surprising results because the actual type of LOG_FILE is not Mutex<String>, it’s Lazy<Mutex<String>>, so if you use it in a context that expects exactly &Mutex<String> and where deref coercion doesn’t apply (for example, when the type is inferred for a generic parameter), it will fail to compile. It’s not a big deal because you can always obtain the actual &Mutex<String> with &*LOG_FILE (equivalent to LOG_FILE.deref()), but it is something to be aware of.
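To show how little magic is involved, here is a simplified, hypothetical stand-in for Lazy built on std::sync::OnceLock (assuming Rust 1.70 or later). It is not how once_cell implements Lazy: this sketch stores a plain function pointer rather than an arbitrary closure, and the names MyLazy and log_file_mutex are invented for the example:

```rust
use std::ops::Deref;
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for once_cell::sync::Lazy, for illustration only.
pub struct MyLazy<T> {
    cell: OnceLock<T>,
    init: fn() -> T,
}

impl<T> MyLazy<T> {
    pub const fn new(init: fn() -> T) -> Self {
        MyLazy { cell: OnceLock::new(), init }
    }
}

impl<T> Deref for MyLazy<T> {
    type Target = T;

    fn deref(&self) -> &T {
        // Construct on first use; later calls return the stored value.
        self.cell.get_or_init(self.init)
    }
}

// The non-capturing closure coerces to the fn() pointer that new() expects.
static LOG_FILE: MyLazy<Mutex<String>> = MyLazy::new(|| Mutex::new(String::new()));

/// Explicitly reborrow to get the inner type rather than the wrapper.
pub fn log_file_mutex() -> &'static Mutex<String> {
    &*LOG_FILE
}

pub fn get_log_file() -> String {
    log_file_mutex().lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    // The method call auto-derefs through MyLazy to reach Mutex::lock().
    *LOG_FILE.lock().unwrap() = file;
}
```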

The OnceCell and Lazy types are in the process of getting stabilized, so we can expect them to become part of the standard library in the near future.

Lazy static

Another popular library for creating global variables is the lazy_static crate, which defines a macro that hides even the lazy initialization, allowing you to write code that looks almost like an ordinary declaration:

use lazy_static::lazy_static;
use std::sync::Mutex;

lazy_static! {
    static ref LOG_FILE: Mutex<String> = Mutex::new(String::new());
}

// get_log_file() and set_log_file() defined as with once_cell::Lazy

pub fn get_log_file() -> String {
    LOG_FILE.lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *LOG_FILE.lock().unwrap() = file;
}

An invocation of lazy_static! is just syntax sugar for defining a Lazy value. Under the hood everything works exactly the same as in the example that used once_cell::Lazy (except lazy_static defines its own lazy_static::Lazy). Like with once_cell::Lazy, the actual type of LOG_FILE is not Mutex<String>, but a different type which uses Deref to give out &'static Mutex<String> on method calls. Some details differ, e.g. lazy_static constructs a dummy type also named LOG_FILE and implements Deref on that, while hiding the actual Lazy<T> value in a static variable defined in a function – but the end result is exactly the same.

If you’re curious, you can run cargo expand on code generated by lazy_static! {...} to learn exactly what it does.

Standard library – Once+unsafe

Until OnceCell stabilizes, the standard library doesn’t offer a way to implement global variables initialized with non-const functions without unsafe code. In most cases this should be avoided because you can use once_cell or lazy_static. But if you must only depend on the standard library, if you want tighter control, or if you just want to learn how it’s done, here is an example that uses std::sync::Once to implement a mutable global:

use std::mem::MaybeUninit;
use std::sync::{Mutex, Once};

fn ensure_log_file() -> &'static Mutex<String> {
    static mut LOG_FILE: MaybeUninit<Mutex<String>> = MaybeUninit::uninit();
    static LOG_FILE_ONCE: Once = Once::new();

    // Safety: initializing the variable is only done once, and reading is
    // possible only after initialization.
    unsafe {
        LOG_FILE_ONCE.call_once(|| {
            LOG_FILE.write(Mutex::new(String::new()));
        });
        // We've initialized it at this point, so it's safe to return the reference.
        LOG_FILE.assume_init_ref()
    }
}

// get_log_file() and set_log_file() defined as with once_cell::OnceCell

pub fn get_log_file() -> String {
    ensure_log_file().lock().unwrap().clone()
}

pub fn set_log_file(file: String) {
    *ensure_log_file().lock().unwrap() = file;
}

Once ensures that MaybeUninit is initialized only once, which OnceCell guaranteed in previous versions of the code. Once is also efficient, using atomics to optimize the fast path when the variable has already been initialized. The definition of the variable, now static rather than global, is placed inside the function to prevent code outside ensure_log_file() from accessing it directly. All accesses inside ensure_log_file() are synchronized through call_once(), writer by running inside it, and readers by waiting for it to complete, which makes the access data-race-free.

Once the initialization of LOG_FILE is complete, ensure_log_file() can proceed to return the reference to the inside of the MaybeUninit, using assume_init_ref().
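The once-only guarantee of call_once() can be checked in isolation, with no unsafe involved. In this sketch INIT_RUNS counts how many times the initialization closure actually runs when several threads race to trigger it; all names are hypothetical:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Once;
use std::thread;

static INIT: Once = Once::new();
static INIT_RUNS: AtomicUsize = AtomicUsize::new(0);

/// Runs the initialization closure at most once, no matter how many
/// threads call this function or how often.
pub fn ensure_init() {
    INIT.call_once(|| {
        INIT_RUNS.fetch_add(1, Ordering::SeqCst);
    });
}

/// Hypothetical helper: race `threads` threads on ensure_init() and
/// report how many times the closure ran in total.
pub fn init_runs_after_racing(threads: usize) -> usize {
    let handles: Vec<_> = (0..threads)
        .map(|_| thread::spawn(ensure_init))
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    INIT_RUNS.load(Ordering::SeqCst)
}
```

Threads that lose the race block inside call_once() until the winner finishes, which is why readers in ensure_log_file() never observe a half-initialized value.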

Which option to choose

In most cases involving real-world types, you’ll want to use once_cell or lazy_static, depending on which syntax you prefer. They have the exact same run-time characteristics, and you won’t go wrong with choosing either. Of course, when once_cell stabilizes in the stdlib, that will become the obvious choice.

There are two exceptions:

  1. When your globals are const-initialized and you don’t need to modify them, you can just declare them as static or const. The difference between the two is that static guarantees that they are stored in only one place and const doesn’t (it inlines them where they’re used).

  2. When you need to modify your globals, but their type is supported by std::sync::atomic: bool, u8 through u64, i8 through i64, usize, or isize. In that case you can declare the variable as static with the appropriate atomic type, and use the atomic API to read and modify it.
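Both exceptions fit in a few lines. The following sketch uses made-up names (MAX_RETRIES, APP_NAME, VERBOSE, REQUESTS) purely for illustration:

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Exception 1: const-initialized and never modified.
const MAX_RETRIES: u32 = 3;     // const: the value is inlined at each use
static APP_NAME: &str = "demo"; // static: stored in exactly one place

// Exception 2: modified at run time, but the type has an atomic counterpart.
static VERBOSE: AtomicBool = AtomicBool::new(false);
static REQUESTS: AtomicUsize = AtomicUsize::new(0);

pub fn set_verbose(on: bool) {
    VERBOSE.store(on, Ordering::Relaxed);
}

pub fn is_verbose() -> bool {
    VERBOSE.load(Ordering::Relaxed)
}

/// Increments the request counter and returns the new total.
pub fn record_request() -> usize {
    REQUESTS.fetch_add(1, Ordering::Relaxed) + 1
}
```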

A case not covered by this article is thread-local variables, which can also be global. Those are provided by the thread_local! macro from the standard library, and allow the use of non-Sync types in globals.
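For a quick taste, here is a minimal sketch of a thread-local global holding a RefCell, a type that is not Sync and therefore could not be placed in an ordinary static. The names SCRATCH, push_scratch and scratch_len_in_new_thread are hypothetical:

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Each thread gets its own independent copy of this vector.
    static SCRATCH: RefCell<Vec<u32>> = RefCell::new(Vec::new());
}

/// Appends to the current thread's vector and returns its new length.
pub fn push_scratch(value: u32) -> usize {
    SCRATCH.with(|cell| {
        let mut items = cell.borrow_mut();
        items.push(value);
        items.len()
    })
}

/// Hypothetical helper: a freshly spawned thread sees its own, empty copy.
pub fn scratch_len_in_new_thread() -> usize {
    thread::spawn(|| SCRATCH.with(|cell| cell.borrow().len()))
        .join()
        .unwrap()
}
```

Because no other thread can ever reach the value, no lock or atomic is needed, at the cost of the data not being shared.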

Upgradable parking_lot::RwLock might not be what you expect

Let’s say we’re building a simple table indexed by integers starting with 0. Although the keys are contiguous, the table is loaded from key-value pairs that arrive in arbitrary order. The number of elements is not known in advance and can be anywhere from a handful up to a hundred million, but is expected to fit in working memory, given a reasonable representation. Due to these constraints the table loader naturally represents the data as Vec<Option<Payload>>. It starts out empty and sports a set() method that fills the payload at the given position, resizing the table as needed:

/// Called during loading, place `val` at `pos`
fn set(&mut self, pos: usize, val: Payload) {
    if pos >= self.data.len() {
        self.data.resize_with(pos + 1, Default::default);
    }
    self.data[pos] = Some(val);
}

The table payload is comparatively expensive to compute (it is deserialized from JSON), so that’s done from multiple worker threads, and set() must be thread-safe. The obvious approach is to place the whole table under a lock, using the fantastic parking_lot crate to provide the implementation. Under a mutex data would be a Mutex<Vec<Option<Payload>>>:

fn set(&self, pos: usize, val: Payload) {
    let mut data = self.data.lock();  // no unwrap() because parking_lot is awesome!
    if pos >= data.len() {
        data.resize_with(pos + 1, Default::default);
    }
    data[pos] = Some(val);
}

/// Called once loading is done to extract the data
fn into_data(self) -> Vec<Option<Payload>> {
    self.data.into_inner()
}

This simple solution has a serious problem – now the whole of set() operates with the big lock acquired. On a machine with 32 physical cores, there is a lot of contention around calls to set(), and the worker threads end up spending about 80% of their time idly waiting for the lock. The worst part is that the coarse lock is only really needed to resize the table, which occurs very rarely, because just one large key immediately removes the need for resizes for all smaller keys. The large majority of calls to set() just access non-overlapping slots in data and have no real need for locking.

Fortunately parking_lot’s Mutex is so incredibly well crafted that it only costs one (1) byte of space. Even if it ends up taking up more space due to padding, just compare that to a POSIX mutex that requires 40 bytes on x86-64 Linux, and must be heap-allocated because it’s UB to move it after initialization! With parking_lot fine-grained locking is quite feasible, even on tables with hundreds of millions of elements. Individual locks will be uncontended and therefore acquirable without system calls, and virtually free compared to the big lock. If we knew the table size in advance, we’d represent it as Vec<Mutex<Option<Payload>>> and be done with it.

However, we still need some kind of lock around the whole table as well, in order to be able to resize it when necessary. Since resizing only happens occasionally, we can use a RwLock, acquiring a shared read lock to access the table and its individual row (which has its own mutex for updating), and request an exclusive write lock only to resize the whole table. The type for data now becomes RwLock<Vec<Mutex<Option<Payload>>>>. (Don’t be scared by the piled-up generics, just read them in the order of appearance: read-write lock containing a vector of mutexes which protect optional payloads.)

To top it off, the parking_lot README advertises a feature of its RwLock that looks exactly like what we need:

RwLock supports atomically upgrading an “upgradable” read lock into a write lock.

Perfect – we acquire the read lock for the whole table, and only upgrade it to a write lock if a resize is needed. After the resize, we downgrade it back to a read lock and proceed. With resizing taken care of, we access the inner mutexes through the shared read lock, acquire the finer-grained mutex without any contention, and store the data. Here is an implementation of that idea:

// XXX
pub fn set(&self, pos: usize, val: Payload) {
    let data = self.data.upgradable_read();
    if data.len() <= pos {   // need to resize the table
        let mut wdata = RwLockUpgradableReadGuard::upgrade(data);
        wdata.resize_with(pos + 1, Default::default);
        let data = RwLockWriteGuard::downgrade(wdata);
        *data[pos].lock() = Some(val);
    } else {
        *data[pos].lock() = Some(val);
    }
}

fn into_data(self) -> Vec<Option<Payload>> {
    self.data.into_inner().into_iter().map(Mutex::into_inner).collect()
}

However, benchmarking this code in production shows no improvement whatsoever compared to the version with one big mutex. What’s going on?

After spending a lot of time trying to figure out where else in the program there could be a lock causing contention (including investigating the allocator, among other suspects turned innocent), I finally came back to this code and carefully reviewed the documentation of upgradable_read, which says the following:

Locks this rwlock with upgradable read access, blocking the current thread until it can be acquired. The calling thread will be blocked until there are no more writers or other upgradable reads which hold the lock. There may be other readers currently inside the lock when this method returns.

So, an upgradable read is locking out other upgradable reads, even if they never exercise their option to upgrade the lock! Since every access to the table from set() is an upgradable read, this made it behave just like a mutex. I feel that this should be more prominently stressed in the docs, as it is easy to miss and makes upgradable reads useless for this scenario. The correct way to implement set() is by manually releasing the read lock and reacquiring a write lock:

pub fn set(&self, pos: usize, val: Payload) {
    let mut data = self.data.read();
    if data.len() <= pos {
        // "upgrade" the lock
        drop(data);
        let mut wdata = self.data.write();
        // check that someone else hasn't resized the table in the meantime
        if wdata.len() <= pos {
            wdata.resize_with(pos + 1, Default::default);
        }
        // now "downgrade" it back again
        drop(wdata);
        data = self.data.read();
    }
    *data[pos].lock() = Some(val);
}

The upgrade and the downgrade are of course no longer atomic, but we don’t really care, we just perform another check after the upgrade in case another thread beat us to the punch and resized the table. And most importantly, this version of the code utilized all the available cores, bringing load times from 7 minutes to 1m45s to parse and load ~70 GiB of JSON data. (Just going through all the data with cat takes 32s, so this is pretty good.)
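For readers who want to experiment with the pattern, here is a self-contained sketch of the final design. To stay dependency-free it swaps in std::sync::RwLock and std::sync::Mutex for the parking_lot types, which is why the unwrap() calls are back. Payload is a placeholder type alias, and Table and load_in_parallel are hypothetical names:

```rust
use std::sync::{Mutex, RwLock};
use std::thread;

type Payload = String; // placeholder for the real payload type

#[derive(Default)]
pub struct Table {
    data: RwLock<Vec<Mutex<Option<Payload>>>>,
}

impl Table {
    pub fn set(&self, pos: usize, val: Payload) {
        let mut data = self.data.read().unwrap();
        if data.len() <= pos {
            // "upgrade": release the read lock, then take the write lock
            drop(data);
            let mut wdata = self.data.write().unwrap();
            // another thread may have resized the table in the meantime
            if wdata.len() <= pos {
                wdata.resize_with(pos + 1, Default::default);
            }
            // "downgrade": release the write lock, take a read lock again
            drop(wdata);
            data = self.data.read().unwrap();
        }
        *data[pos].lock().unwrap() = Some(val);
    }

    pub fn into_data(self) -> Vec<Option<Payload>> {
        self.data
            .into_inner()
            .unwrap()
            .into_iter()
            .map(|slot| slot.into_inner().unwrap())
            .collect()
    }
}

/// Hypothetical driver: each worker fills an interleaved set of slots.
pub fn load_in_parallel(workers: usize, per_worker: usize) -> Vec<Option<Payload>> {
    let table = Table::default();
    thread::scope(|scope| {
        for w in 0..workers {
            let table_ref = &table;
            scope.spawn(move || {
                for i in 0..per_worker {
                    let pos = i * workers + w;
                    table_ref.set(pos, format!("item {}", pos));
                }
            });
        }
    });
    table.into_data()
}
```

The sketch says nothing about performance; it only shows that the re-check after reacquiring the write lock keeps the table consistent when workers race to resize it.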

The immediate moral of the story is: be careful when adopting non-standard features of fundamental APIs like the synchronization devices. The documentation stated the behavior clearly, and quite a bit of debugging would have been avoided by reading it in advance. Even an otherwise excellent library like parking_lot sometimes violates the principle of least surprise.

The deeper lesson is that fearless concurrency, and the more general “if it compiles, it works” principle have their limits. Rust has spoiled us by providing a nice experience with multi-threading, so much that it’s easy to be genuinely surprised when a program that successfully compiles ends up misbehaving in a non-obvious way.

Rust async is colored, and that’s not a big deal

In the last several years async-friendly languages and APIs have received a large amount of attention. One contentious point in the language design space is the “colored functions”, or the division of functions into async and non-async ones. The term was introduced by the now-famous 2015 article titled What Color is Your Function?, which uses color as a metaphor for the often painful mismatch between sync and async functions in JavaScript and other languages with explicitly async functions. Since 2015 many more languages have jumped on the async bandwagon, so many more programmers are now getting familiar with the metaphor. Given that some languages managed to provide async IO without colored functions, such as Go, Zig, and in the future likely Java, the discussion around function colors is picking up once again and is raised in the context of Rust. Some people have even tried to argue that the bad rap of colored functions doesn’t apply to Rust’s async because it’s not colored in the first place. An article from several days ago is titled “Rust’s async isn’t f#@king colored!”, and similar arguments have appeared on reddit. I’m not picking on any specific post, but I’d like to provide a response to that sort of argument in general.

In this article I will show that Rust async functions are colored, by both the original definition and in practice. This is not meant as a criticism of Rust async, though – I don’t see function colors as an insurmountable issue, but as a reflection of the fundamental difference between the async and sync models of the world. Languages that hide that difference do so by introducing compromises that might not be acceptable in a systems language like Rust or C++ – for example, by entirely forbidding the use of system threads, or by complicating the invocation of foreign or OS-level blocking calls. Colored functions are also present in at least C#, Python, Kotlin, and C++, so they’re not a quirk of JavaScript and Rust. And additional features of Rust async do make it easier to connect async code with traditional blocking code, something that is just not possible in JavaScript.

Colored functions

“What Color is Your Function?” starts off by describing an imaginary language that perversely defines two types of functions: red and blue. The language enforces a set of seemingly arbitrary rules regarding how the two are allowed to interact:

  1. Every function has a color.
  2. The way you call a function depends on its color.
  3. You can only call a red function from within another red function.
  4. Red functions are more painful to call.
  5. Some core library functions are red.

Without knowing the details, a reasonable person would agree that the described language is not particularly well designed. Of course, readers of this article in 2021 will not find it hard to recognize the analogy with async: red functions are async functions, and blue functions are just ordinary functions. For example, #2 and #4 refer to the fact that calling an async function requires either explicit callback chaining or await, whereas a sync function can just be called. #3 refers to the fact that await and callback resolution work only inside async functions, and JavaScript doesn’t provide a way to block the current non-async function until a promise (async value) is resolved. The article portrays async functions as a leaky abstraction that profoundly and negatively affects the language, starting with the above rules.

The rules of async make async code contagious, because using just one async function in one place requires all the callers up the stack to become async. This splits the ecosystem into async and non-async libraries, with little possibility to use them interchangeably. The article describes async functions as functions that operate on classic JavaScript callbacks, but further argues that async/await, which was novel at the time, doesn’t help with the issue. Although await constitutes a massive ergonomic improvement for calling async from async (#4), it does nothing to alleviate the split – you still cannot call async code from non-async code because await requires async.

Function colors in Rust async

How does all this apply to Rust? Many people believe that it applies only in part, or not at all. Several objections have been raised:

  1. Rust async functions are in essence ordinary functions that happen to return values that implement the Future trait. async fn is just syntactic sugar for defining such a function, but you can make one yourself using an ordinary fn as long as your function returns a type that implements Future. Async functions, the argument goes, are simply functions that return Future<Output = T> instead of T, and as such are no more “special” than functions that return a Result<T> instead of T – so rule #1 (“every function has a color”) doesn’t apply.
  2. Unlike JavaScript, Rust async executors provide a block_on() primitive that invokes an async function from a non-async context and blocks until the result is available – so rule #3 (“you can only call a red function from within another red function”) doesn’t apply.
  3. Again unlike JavaScript, Rust async provides spawn_blocking() which invokes a blocking sync function from an async context, temporarily suspending the current async function without blocking the rest of the async environment. This one doesn’t correspond to a rule from the original article because JavaScript doesn’t support blocking sync functions.
  4. Rule #5 (“some core library functions are red”) doesn’t apply because Rust’s stdlib is sync-only.
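The first two objections can be made concrete with nothing but the standard library. In the sketch below, delayed_add is an ordinary fn that returns a hand-written Future, and block_on is a deliberately minimal, hypothetical executor built on std::task::Wake. Real executors such as the ones shipped by async runtimes are far more elaborate, and all the names here are invented for the example:

```rust
use std::future::{ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A hand-written future: pending on the first poll, ready on the second.
pub struct Delayed {
    value: u32,
    polled: bool,
}

impl Future for Delayed {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(self.value)
        } else {
            self.polled = true;
            cx.waker().wake_by_ref(); // ask the executor to poll us again
            Poll::Pending
        }
    }
}

/// An ordinary fn that plays the role of an async fn by returning a Future.
pub fn delayed_add(a: u32, b: u32) -> impl Future<Output = u32> {
    Delayed { value: a + b, polled: false }
}

/// Wakes the blocked thread by unparking it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls the future on the current thread and parks
/// the thread whenever the future reports that it is not ready yet.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Sync code obtaining the result of a future by blocking on it.
pub fn sum_from_sync_code() -> u32 {
    block_on(delayed_add(1, 2)) + block_on(ready(4))
}
```

Note that sum_from_sync_code() gets its numbers only by going through an executor and blocking the thread, which is exactly the part that the following sections examine.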

If these arguments are correct, the only color rule that remains is rule #4, “red functions are more painful to call”, and that part is almost completely alleviated by await. The original JavaScript problems where async functions “don’t compose in expressions because of the callbacks” or “have different error-handling” simply don’t exist with await, in either JavaScript or Rust. Taking these arguments at face value, it would seem that the whole function-color problem is made up or at least wildly exaggerated from some exotic JavaScript problems that Rust async doesn’t inherit. Unfortunately, this is not the case.

First, the split between the sync and async ecosystems is immediately apparent to anyone who looks at the ecosystem. The very existence of async_std, a crate with the explicit purpose to provide an “async version of the Rust standard library”, shows that the regular standard library is not usable in an async context. If function colors weren’t present in Rust, the ordinary stdlib would be used in both sync and async code, as is the case in Go, where a distinction between “sync” and “async” is never made to begin with.

Then what of the above objections? Let’s go through them one by one and see how they hold up under scrutiny.

Aren’t Rust async functions just ordinary functions with a wacky return type?

While this is true in a technical sense, the same is also true in JavaScript and almost all languages with colored async (with the exception of Kotlin) in exactly the same way. JavaScript async functions are syntactic sugar for functions that create and return a Promise. Python’s async functions are regular callables that immediately return a coroutine object. That doesn’t change the fact that in all those languages the caller must handle the returned Promise (coroutine object in Python, Future in Rust) in ways that differ from handling normal values returned from functions. For example, you cannot pass an async function to Iterator::filter() because Iterator::filter() expects a function that returns an actual bool, not an opaque value that just might produce a bool at some point in the future. No matter what you put in the body of your async function, it will never return bool, and extracting the bool requires executor magic that creates other problems, as we’ll see below. Regardless of whether it’s technically possible to call an async function from a sync context, inability to retrieve its result is at the core of function color distinction.

Ok, but doesn’t the same apply to Result? Functions that need a u32 aren’t particularly happy to receive a Result<u32, SomeError>. A generic function that accepts u32, such as Iterator::min(), has no idea what to do with Result<u32, SomeError>. And yet people don’t go around claiming that Result somehow “colors” their functions. I admit that this argument has merit – Result indeed introduces a semantic shift that is not always easy to bridge, including in the example we used above, Iterator::filter(). There is even a proposal to add 21 new iterator methods such as try_filter(), try_min_by_key(), try_is_partitioned(), and so on, in order to support doing IO in your filter function (and key function, etc.). Doing this completely generically might require Haskell-style monads or at least some form of higher-kinded types. All this indicates that supporting both Result and non-Result types in fully generic code is far from a trivial matter. But is that enough to justify the claim that Result and Future are equivalent in how they affect functions that must handle them? I would say it’s not, and here is why.

If the recipient of a Result doesn’t care about the error case, it can locally resolve Result to the actual value by unwrapping it. If it doesn’t want to panic on error, it can choose to convert the error to a fallback value, or skip the processing of the value. While it can use the ? operator to propagate the error to its caller, it is not obliged to do so. The recipient of a Future doesn’t have that option – it can either .await the future, in which case it must become async itself, or it must ask an executor to resolve the future, in which case it must have access to an executor, and license to block. What it cannot do is get to the underlying value without interaction with the async environment.
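To make the contrast concrete, here is a minimal sketch of the local options open to the recipient of a Result. The helper names (parse, parse_or_zero, smallest_valid, sum_all) are made up for this illustration, and only the standard library is used.

```rust
use std::num::ParseIntError;

// A fallible source of u32 values (hypothetical helper for this sketch).
fn parse(s: &str) -> Result<u32, ParseIntError> {
    s.trim().parse::<u32>()
}

// Option 1: resolve the Result locally with a fallback value.
fn parse_or_zero(s: &str) -> u32 {
    parse(s).unwrap_or(0)
}

// Option 2: skip the values that failed and keep processing the rest.
fn smallest_valid(inputs: &[&str]) -> Option<u32> {
    inputs.iter().filter_map(|s| parse(s).ok()).min()
}

// Option 3: propagate the error to the caller with `?`.
// This is a choice, not an obligation.
fn sum_all(inputs: &[&str]) -> Result<u32, ParseIntError> {
    let mut total = 0;
    for s in inputs {
        total += parse(s)?;
    }
    Ok(total)
}
```

The first two functions get at the u32 without changing their own signature. A Future offers no counterpart to unwrap_or() or ok(): reaching its output always involves either .await or an executor.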

Verdict: Rule #1 mostly applies to Rust – async functions are special because they return values that require async context to retrieve the actual payload.

Doesn’t block_on() offer a convenient way to invoke an async function from a non-async context?

Yes, provided you are actually allowed to use it. Libraries are expected to work with the executor provided by the environment and don’t have an executor lying around which they can just call to resolve async code. The standard library, for example, is certainly not allowed to assume any particular executor, and there are currently no traits that abstract over third-party executors.

But even if you had access to an executor, there is a more fundamental problem with block_on(). Consider a sync function fn foo() that, during its run, needs to obtain the value from an async function async fn bar(). To do so, foo() does something like let bar_result = block_on(bar()). But that means that foo() is no longer just a non-async function, it’s now a blocking non-async function. What does that mean? It means that foo() can block for arbitrarily long while waiting for bar() to complete. Async functions are not allowed to call functions like foo() for the same reason they’re not allowed to call thread::sleep() or TcpStream::connect() – calling a blocking function from async code halts the whole executor thread until the blocking function returns. In case of that happening in multiple threads, or in case of a single-threaded executor, that freezes the whole async system. This is not described in the original function color article because neither block_on() nor blocking functions exist in stock JavaScript. But the implications are clear: a function that uses block_on() is no longer blue, but it’s not red either – it’s of a new color, let’s call it purple.

If this looks like it’s changing the landscape, that’s because it is. And it gets worse. Consider another async function, xyzzy(), that needs to call foo(). If foo() were a blue/non-async function, xyzzy() would just call it and be done with it, the way it’d call HashMap::get() or Option::take() without thinking. But foo() is a purple function which blocks on block_on(bar()), and xyzzy() is not allowed to call it. The irony is that both xyzzy() and bar() are async and if xyzzy() could just await bar() directly, everything would be fine. The fact that xyzzy() calls bar() through the non-async foo() is what creates the problem – foo’s use of block_on() breaks the chain of suspensions required for bar() to communicate to xyzzy() that it needs to suspend until further notice. The ability to propagate suspension from the bottom-most awaitee all the way to the executor is the actual reason why async must be contagious. By eliminating async from the signature of foo() one also eliminates much of the advantage of bar() being async, along with the possibility of calling foo() from async code.
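The foo()/bar() situation can be reproduced with the standard library alone. The block_on() below is a deliberately naive, hypothetical stand-in for what executors provide: it parks the calling thread until the future is ready. It is a sketch for illustration, not the implementation used by any particular executor crate.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Unparks the blocked thread when the future signals it can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Naive block_on: poll the future, and park the current thread while it
// is pending. The calling thread can do nothing else in the meantime.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

// A "red" function.
async fn bar() -> u32 {
    42
}

// A "purple" function: not async in its signature, but it blocks its
// thread until bar() completes.
fn foo() -> u32 {
    block_on(bar())
}
```

Nothing in the signature of foo() reveals that it blocks. If xyzzy() called it from inside an executor, the thread::park() in block_on() would park the executor thread itself.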

Verdict: rule #3 applies because block_on() changes a blue function into something that is neither red nor callable from red.

Doesn’t spawn_blocking() resolve the issue of awaiting blocking functions in async contexts?

spawn_blocking() is a neat bridge between sync and async code: it takes a sync function that might take a long time to execute, and instead of calling it, submits it to a thread pool for execution. It returns a Future, so you can await spawn_blocking(|| some_blocking_call()) like you’d await a true async function without the issues associated with block_on(). This is because the Future returned by spawn_blocking() is pending until the thread pool reports that it’s done executing the submitted sync function. In our extended color metaphor, spawn_blocking() is an adapter that converts a purple function into a red function. Its main intended use cases are CPU-bound functions that might take a long time to execute, as well as blocking functions that just don’t have a good async alternative. Examples of the latter are functions that work with the file system, which still don’t have a good async alternative, or legacy blocking code behind FFI (think ancient database drivers and the like).

Problems arise when code tries to avoid multiple function colors and use block_on() or spawn_blocking() to hide the “color” of the implementation. For example, a library might be implemented using async code internally, but use block_on() to expose only a sync API. Someone might then use that library in an async context and wrap the sync calls in spawn_blocking(). What would be the consequences if that was done across the board? Recall that the important advantage of async is the ability to scale the number of concurrent agents (futures) without increasing the number of OS threads. As long as the agents are mostly IO-bound, you can have literally millions of them executing (most of them being suspended at any given time) on a single thread. But if an async function like the above xyzzy() uses spawn_blocking() to await a purple function like foo(), which itself uses block_on() to await an async function like bar(), then we have a problem: the number of xyzzy() instances that can run concurrently and make progress is now limited by the number of threads in the thread pool employed by spawn_blocking(). If you need to spawn a large number of tasks awaiting xyzzy() concurrently, most of them will need to wait for a slot in the thread pool to open up before their foo() functions even begin executing. And all this because foo() blocks on bar(), which is again ironic because bar(), being an async function, is designed to scale independently of the number of threads available to execute it.

The above is not just a matter of performance degradation; in the worst case spawn_blocking(|| block_on(...)) can deadlock. Consider what happens if one async function behind spawn_blocking(|| block_on(...)) needs data from another async function started the same way in order to proceed. It is possible that the other async function cannot make progress because it is waiting for a slot in the thread pool to even begin executing. And the slot won’t free up because it is taken by the first async function, which also runs inside a spawn_blocking() invocation. The slot is never going to change owner, and a deadlock occurs. This can’t happen with async functions that are directly executed as async tasks because those don’t require a slot in a fixed-size pool. They can all be in a suspended state waiting for something to happen to any of them, and resume execution at any moment. In an async system the number of OS threads deployed by the executor doesn’t limit the number of async functions that can work concurrently. (There are executors that use a single thread to drive all futures.)

Verdict: spawn_blocking() is fine to use with CPU-bound or true blocking code, but it’s not a good idea to use it with block_on() because the advantages of async are then lost and there is a possibility of deadlock.

But Rust’s stdlib is sync-only.

That’s technically true, but Rust’s stdlib is intentionally minimal. Important parts of functionality associated with Rust are delegated to external crates, with great success. Many of these external crates now require async, or even a specific executor like tokio. So while the standard library is async-free, you cannot ignore async while programming in Rust.

Verdict: technically true but not useful in a language with a minimalistic standard library.

Dealing with a two-colored world

Again, the above is not a criticism of Rust async, but merely of the claim that it’s not colored. Once we accept that it is, it becomes clear that, unlike JavaScript, Rust actually does provide the tools we need to deal with the mismatch. We can:

  1. Accept that sync and async are two separate worlds, and not try to hide it. In particular, don’t write “sync” interfaces that use block_on() to hide async ones, and the other way around with spawn_blocking(). If you absolutely must hide the async interfaces behind sync ones, then do so immediately at the entry point, document that you’re doing so, and provide a public interface to the underlying native call.
  2. Respecting the above, use block_on() and spawn_blocking() in application-level code on the boundaries between the two worlds.
  3. In more complex scenarios, create clear and documented boundaries between the two worlds and use channels to communicate between them. This technique is already used for both multi-threaded and async code, so it should come as no surprise to future maintainers. Ideally you’d use channels that provide both a sync and an async interface, but if those are not available, use async channels with block_on() on the sync side.

Patterns of fallible iteration

Since Rust is a language without exceptions, errors are signaled by returning a Result: an enum that denotes either the function’s successful return value, or information about the error. This can seem like it requires a lot of error-handling boilerplate, but for most code this is in fact not the case. Rust’s powerful type system, the near-ubiquitous error-returning convention, and the ? operator combine convenience of exceptions with the safety of explicit return values. However, there are situations when error handling is buried inside callbacks and propagating the error takes a bit of effort. One such situation is when processing iterators that can return errors, where the ? error-handling operator cannot be used in the usual way.

This article will present a fallible iterator and show several ways to handle its errors without resorting to panic.

Explicit loop vs iterator

Reading a file is a typical example of fallible iteration. While the importance of handling errors when opening a file is widely acknowledged because the file can be missing or unreadable, errors while reading the file happen comparatively rarely and are often converted to panic with unwrap(). But such errors can and do occur, for example when the file is behind a network file system, or when it is backed by a pipe or hardware device, and should be handled cleanly. For demonstration purposes we’ll use a simple function that reads a file line by line, parses each line as an integer, and returns the maximum value. To indicate the possibility of failure, the function will return Result<u64, io::Error>, written more shortly as io::Result<u64>:

fn max_line(file_name: &str) -> io::Result<u64> {
    // ...
}

This signature gives the callers freedom to decide how to handle the errors indicated by max_line – they can unwrap() the return value to panic in case of error, they can match the error and handle the error variant, or they can use ? to propagate the error, if it occurs, to their caller. Here is a straightforward implementation of max_line, itself making copious use of the strange-looking ? operator:

use std::fs::File;
use std::io::{self, BufRead, BufReader};

fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let mut max = 0;
    for line_result in BufReader::new(file).lines() {
        let line = line_result?;
        let n = line
            .parse::<u64>()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
        if n > max {
            max = n;
        }
    }
    Ok(max)
}

? is a postfix operator best described as use-or-return. If x is Result<T, SomeError>, the type of the expression x? will be T. When x is Ok(val), x? will evaluate to val, and when x is Err(e), the operator will just return the error result from the function. Applying the ? operator to Result actually requires the function to be declared as returning Result with a compatible error variant as return value, and fails to compile if this is not the case. BufRead::lines() provides lines as Results, so the expression let line = line_result? desugars to something like:

let line = match line_result {
    Ok(s) => s,               // a string, use it
    Err(e) => return Err(e),  // an error, return it
};

A minor complication arises from FromStr::parse for u64 not returning an io::Error, but a ParseIntError. Since we don’t particularly care about distinguishing between these two kinds of error condition, we use map_err() to unceremoniously convert ParseIntError into an io::Error, retaining the error message.

Readers experienced in idiomatic Rust will notice that the above function calculates the maximum using a for loop and a mutable accumulator, effectively reimplementing Iterator::max. Since BufReader::lines() already returns an iterator, it seems like an obvious improvement to combine it with map() and max(), something like this:

// XXX
fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    BufReader::new(file)
        .lines()
        .map(|line_result| {
            let line = line_result?;
            let n = line.parse::<u64>()
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
            Ok(n)
        })
        .max()
        // unwrap_or() because max() returns an Option which is None 
        // when the iterator is empty
        .unwrap_or(Ok(0))
}

Unsurprisingly, this fails to compile. The compiler complains that “the trait std::cmp::Ord is not implemented for std::io::Error”, which is Rust’s way of saying that Iterator::max() has no idea how to compare the io::Result<u64> values the closure passed to map() is producing. But the problem lies deeper than a simple inability to compare. In the original for loop the ? operator could return from the max_line function, effectively aborting the iteration in case of error. In the iterator version, the parsing happens inside a closure, which takes away the power of ? to abort max_line(). In case of error, ? used in the closure returns an error result from the closure, but the iteration continues.

Stopping iteration on error will require a different approach, with several options depending on what we need to do with the items produced by the iterator.

Collecting items into container – collect

If you need to collect items from a fallible iterator, Rust offers an elegant idiom for aborting the iteration on error. Instead of normally collecting into a container, you can collect into Result<Container>. The effect will be exactly what we are after: collect() will return a Result that will be Ok(container) if all the values were Ok, and Err(e) if an error was encountered, in which case the iteration will have stopped after consuming the error. Applied to max_line, it would look like this:

// correct, but uses an intermediate vector
fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let numbers_result: io::Result<Vec<u64>> = BufReader::new(file)
        .lines()
        .map(parse_line)
        .collect();
    let max = numbers_result?.into_iter().max().unwrap_or(0);
    Ok(max)
}

This performs the processing in two steps: first, the numbers are collected into a result that contains either a vector of numbers, or an error. We use ? to grab the vector or return the error and, provided there was no error, proceed to find the maximum.
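The stop-at-first-error behavior of collecting into a Result can be observed without touching the file system. The following sketch parses in-memory strings and counts how many items the iterator was asked to produce; parse_all_counting is a made-up name for this illustration.

```rust
use std::num::ParseIntError;

// Collects parsed numbers into a Result and reports how many lines the
// iterator actually looked at before collect() finished.
fn parse_all_counting(lines: &[&str]) -> (Result<Vec<u64>, ParseIntError>, usize) {
    let mut attempts = 0;
    let result: Result<Vec<u64>, ParseIntError> = lines
        .iter()
        .map(|line| {
            attempts += 1;
            line.parse::<u64>()
        })
        .collect();
    (result, attempts)
}
```

Called with ["1", "2", "x", "4"], the function returns an error and an attempt count of 3: the fourth line is never parsed.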

Side note: since the closure passed to map will remain unchanged in subsequent examples, we moved it to a separate helper function:

fn parse_line(line_result: io::Result<String>) -> io::Result<u64> {
    let line = line_result?;
    let n = line
        .parse::<u64>()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
    Ok(n)
}

The above max_line has the same error-handling behavior as the original for loop, but at the cost of allocating a potentially huge vector of numbers only to find the maximum. If you need the items in the vector for other purposes, that’s the approach you want to take. If you need to process the items in a streaming fashion, read on.

Consuming items – try_fold

max_line() is privileged to control the iteration from start to end: it sets up an iterator and consumes it, either with the for loop or letting a folding function like max() do it. Other than collect() discussed above, Iterator provides two additional consuming methods that stop on error: try_fold and try_for_each.

While a try_max() doesn’t exist, it is easy to emulate it because we can replace max() with fold(0, |max, n| std::cmp::max(max, n)). Likewise, try_max() is neatly expressible in terms of try_fold:

fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    BufReader::new(file)
        .lines()
        .map(parse_line)
        .try_fold(0u64, |max, n_result| Ok(std::cmp::max(max, n_result?)))
}

Not quite as pretty as try_max() would have been, but works exactly as the original for loop, and operates on the iterator. We don’t even need to use the ? operator for the result because max_line() returns an io::Result<u64> and that’s exactly what try_fold() gives us. If you can use this in your code, that’s almost certainly what you want.
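The same pattern can be tried on in-memory data. The sketch below uses a hypothetical max_until_error helper that also counts the items pulled from the iterator, to show that try_fold() stops consuming at the first error. It uses Result::map in the closure instead of ?, which is equivalent here.

```rust
// Returns the maximum of the Ok values, or the first error encountered,
// together with the number of items that were pulled from the iterator.
fn max_until_error(items: &[Result<u64, String>]) -> (Result<u64, String>, usize) {
    let mut seen = 0;
    let result = items
        .iter()
        .cloned()
        .inspect(|_| seen += 1)
        // An Err item short-circuits the fold; an Ok item updates the maximum.
        .try_fold(0u64, |max, item| item.map(|n| max.max(n)));
    (result, seen)
}
```

With the input [Ok(3), Ok(9), Err("bad line"), Ok(100)] the result is the error and seen is 3, so the trailing Ok(100) is never examined.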

The limitation of this approach is that it requires that you control how the iterator is consumed. If instead of max() we needed to invoke a function that accepts an iterator provided by a third party, we would have problems. Not every function that consumes an iterator can be easily replaced by a home-grown version based on try_fold or try_for_each. For example, the itertools crate contains a number of useful iterator adapters to solve real-world problems, including itertools::merge_by. We could use it to efficiently merge two sorted streams of records into a single stream, maintaining the sort order:

let iter1 = reader1.lines().map(Record::from_string);
let iter2 = reader2.lines().map(Record::from_string);
let mut out = Writer::new(...);
// XXX where to put try_fold or try_for_each?
for rec in itertools::merge_by(iter1, iter2, comparator) {
    out.write(rec.to_string())?;
}

Handling errors reported by iter1 and iter2 is not a simple matter of using try_fold or try_for_each because the record iterators are consumed by itertools::merge_by. As of this writing there is no try_merge_by in itertools, and it’s not clear there should be one, because adding try_merge_by would imply adding a fallible version of other adapters, eventually doubling the API surface of the module and the trait. There has to be a better way.

Stop-at-error iterator adapter – scan

Stopping at an error is really a special case of stopping the iteration on an arbitrary condition – something Iterator easily handles with take_while. It would seem that adapting an iterator to stop on error is as simple as tacking take_while(Result::is_ok) onto it, possibly adding a map to extract the values. The result looks like this:

// XXX
fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let max = BufReader::new(file)
        .lines()
        .map(parse_line)
        .take_while(|n_result| match n_result {
            Ok(_) => true, // keep going
            Err(e) => {
                // XXX e is &io::Error
                false      // stop
            }
        })
        .map(Result::unwrap)
        .max()
        .unwrap_or(0);
    Ok(max)
}

This is closer to what we want, but not quite there yet. Iteration stops on error, but we don’t have the error value, nor any indication whether an error occurred. Since take_while() passes its callback a shared reference to the item, the closure cannot move the error to a captured local variable – the compiler would complain of a “move out of shared reference”. Also, take_while is a filter and not a transformer, so it requires the unsightly .map(Result::unwrap) making it look like the program might panic on error, when it will in fact correctly stop iterating.

We can address all the issues with take_while() by switching to its big brother Iterator::scan(). Like take_while(), scan() supports stopping the iteration, but it also allows transforming the value and storing intermediate state. We’ll use all those features, ending up with this:

fn max_line(file_name: &str) -> io::Result<u64> {
    let file = File::open(file_name)?;
    let mut err = Ok(());
    let max = BufReader::new(file)
        .lines()
        .map(parse_line)
        .scan(&mut err, until_err)
        .max()
        .unwrap_or(0);
    err?;
    Ok(max)
}

There are several new things going on, so let’s go through them one by one.

First, we initialize err to a Result which we’ll use as the place to store the error if one occurs. Its initial value is a placeholder – we could have used a type like Option<io::Error> and initialized it to None, but it’s nicer to use a Result because it will then work with ? in our function. As err serves only to detect the error, it has no meaningful Ok variant, and we initialize it with a unit value.

Second, after the unchanged map(parse_line), we chain the iterator to the scan() adapter, passing it a utility function which we’ll show shortly. The function passed to scan() provides values that will be returned from iterator’s next(), which means it stops the iteration by returning None. And that’s precisely what until_err does: returns Some(item) if the result is Ok(item), and otherwise stores the error result into the provided reference and returns None.

Third, since max() isn’t privy to our setup, it will return the maximum of the numbers it receives from the iterator. If there was no error, that will be the whole file and in case of error, we’ll get the maximum of lines until the first error. (Which is equivalent to what the max variable would have contained in case of error in our original loop implementation.) So after the call to max(), we must remember to check whether the iteration was prematurely aborted, and if so, return the error. err? does that and is simply a shorthand for if let Err(err) = err { return Err(err) }. Finally, once we’ve gotten errors out of the way and have proven that max() has observed all the lines from the file, we can return it from the function.

The remaining piece is the until_err helper, which looks like this:

fn until_err<T, E>(err: &mut &mut Result<(), E>, item: Result<T, E>) -> Option<T> {
    match item {
        Ok(item) => Some(item),
        Err(e) => {
            **err = Err(e);
            None
        }
    }
}

The signature looks daunting because the function will be called with a reference to the value passed to scan, which is itself a reference to our local variable of type Result<(), _>. (This signature allows passing temporary values to scan(), which we cannot do because we need to inspect err after the iteration to check whether an error was encountered.) The logic of the function is straightforward and described above: a match mapping Ok(item) to Some(item) and Err(e) to None, the latter stopping the iteration and storing e for later inspection.

To conclude, here is how scan() would apply to the code that uses itertools::merge_by:

let (mut err1, mut err2) = (Ok(()), Ok(()));
let iter1 = reader1.lines().map(Record::from_string).scan(&mut err1, until_err);
let iter2 = reader2.lines().map(Record::from_string).scan(&mut err2, until_err);
let mut out = Writer::new(...);
for rec in itertools::merge_by(iter1, iter2, comparator) {
    out.write(rec.to_string())?;
}
// the merged iterator holds the mutable borrows of err1 and err2 while it is
// alive, so the input errors can only be checked after the loop has finished
err1?;
err2?;

Once merge_by is set up correctly, the remaining for loop can be easily transformed to use try_for_each() (or a separate scan() and for_each() pair!) to handle the write errors:

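itertools::merge_by(iter1, iter2, comparator)
    .try_for_each(|rec| -> io::Result<()> {
        out.write(rec.to_string())?;
        Ok(())
    })?;
// err1 and err2 are mutably borrowed by the iterator, so check them once
// it has been consumed
err1?;
err2?;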

Summary

Fallible iteration is often omitted from introductory materials, but Rust does provide several idioms to handle errors in items:

  • If you need to collect the items, use collect::<Result<Container, _>>().
  • If you control how the iterator is consumed, use try_fold() or try_for_each() to stop at first error. Those two methods are even provided by Rayon’s parallel iterators.
  • Otherwise, use scan(&mut err, until_err) and use the iterator as usual. You’ll just need to live with the until_err helper function (which you can also write as a closure) and remember to check err for error after the iterator has been exhausted.
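As a sketch of the closure variant mentioned in the last point, the until_err logic can be written inline. The function name max_or_err and the use of String as the error type are assumptions made for this illustration.

```rust
// Finds the maximum of the Ok items, stopping at the first error and
// returning it. The scan() closure plays the role of until_err.
fn max_or_err(items: Vec<Result<u64, String>>) -> Result<u64, String> {
    let mut err: Result<(), String> = Ok(());
    let max = items
        .into_iter()
        .scan(&mut err, |err, item| match item {
            Ok(value) => Some(value),
            Err(e) => {
                // Remember the error and end the iteration.
                **err = Err(e);
                None
            }
        })
        .max()
        .unwrap_or(0);
    // Easy to forget: max() saw only the items before the error.
    err?;
    Ok(max)
}
```

For [Ok(1), Ok(5), Ok(2)] this returns Ok(5), and for [Ok(1), Err("boom"), Ok(9)] it returns the error without looking at the 9.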

Note: the original version of this article used Iterator::sum() as the method that consumes the iterator, but a reddit reader pointed out that sum() actually stops on first error automatically.
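The behavior of sum() on an iterator of Results can be checked with a small sketch. The helper name sum_lines is made up; the counter only serves to show how many lines were parsed.

```rust
use std::num::ParseIntError;

// Sums the parsed lines; sum() over Result items yields a Result and
// stops at the first Err it encounters.
fn sum_lines(lines: &[&str]) -> (Result<u64, ParseIntError>, usize) {
    let mut attempts = 0;
    let total = lines
        .iter()
        .map(|line| {
            attempts += 1;
            line.parse::<u64>()
        })
        .sum::<Result<u64, ParseIntError>>();
    (total, attempts)
}
```

For ["1", "2", "x", "4"] the total is an error and only three lines are parsed.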

Parallel stream processing with Rayon

Most Rust programmers have heard of Rayon, a crate that makes it almost magically easy to introduce parallelism to a program. In this article we’ll examine how to apply Rayon to basic stream processing.

Running the examples

To run the examples in this blog post, create a directory, run cargo init --bin in it and edit the generated Cargo.toml to include the following dependencies:

[dependencies]
rayon = "1.3.1"
serde_json = "1.0.57"

The code from the examples just goes to src/main.rs. You can run it with cargo run --release or build it with cargo build --release and run it as target/release/<directory-name>. Do use the release mode when checking the performance of anything in Rust, as the numbers you’ll get in debug mode are completely meaningless.

Basics of Rayon

Rayon is a library for parallel processing without the hassle. You don’t need to manage channels, events, futures, or even know about a thread pool, you just tell Rayon what you want performed in parallel, and Rayon does it. To borrow the example from the github page, here is a sequential program that sums up squares of the elements of a slice:

fn sum_of_squares(input: &[u64]) -> u64 {
    input.iter()
         .map(|&i| i * i)
         .sum()
}

To make it parallel, all you need to do is change iter to par_iter:

use rayon::prelude::*;

fn sum_of_squares(input: &[u64]) -> u64 {
    input.par_iter()
         .map(|&i| i * i)
         .sum()
}

Rayon delivers on its promise and produces a parallel version that runs ~1.5x faster, as tested on my machine using a vector of a billion integers. This is fairly impressive given that the individual map and reduce operations in this example boil down to just a CPU instruction each. Rayon’s implementation uses a technique called work stealing to ensure efficient distribution of tasks among threads. The idea behind work stealing is that each thread maintains a local queue of tasks submitted by code running in that thread. By default the thread runs local tasks, and only when it runs out of those does it go ahead and “steal” tasks from queues of other threads. If all threads are equally busy, each just services its own tasks, maximizing data locality and reducing the number of context switches.

The Rayon API provides two distinct abstractions: fork-join and parallel iterators. Fork-join is the lower-level primitive that is deceptively simple: it consists of a single function, join(), which accepts two closures and executes them, potentially in parallel. rayon::join(A, B) pushes B to the thread-local queue and starts executing A – in the same thread. By the time A returns, it’s possible that B had already been stolen by another thread, in which case the current thread waits for it to finish, stealing tasks from other threads while waiting. If B had not been stolen, join() just executes it as well, being no worse off than if the code had been sequential.
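The shape of fork-join can be sketched with the standard library's scoped threads. To be clear, the join() below is a hypothetical stand-in and not Rayon's implementation: it spawns an OS thread for B on every call instead of pushing it to a work-stealing queue, so it only illustrates the calling convention. The cutoff value is an arbitrary assumption.

```rust
use std::thread;

// Runs `a` on the current thread and `b` on a freshly spawned scoped
// thread, returning both results. (Rayon would queue `b` for stealing.)
fn join<A, B, RA, RB>(a: A, b: B) -> (RA, RB)
where
    A: FnOnce() -> RA,
    B: FnOnce() -> RB + Send,
    RB: Send,
{
    thread::scope(|s| {
        let handle_b = s.spawn(b);
        let result_a = a();
        let result_b = handle_b.join().expect("closure b panicked");
        (result_a, result_b)
    })
}

// Divide and conquer: split the slice in half until the pieces are small
// enough to process sequentially.
fn sum_of_squares(input: &[u64]) -> u64 {
    const SEQUENTIAL_CUTOFF: usize = 1024; // arbitrary for this sketch
    if input.len() <= SEQUENTIAL_CUTOFF {
        return input.iter().map(|&i| i * i).sum();
    }
    let (left, right) = input.split_at(input.len() / 2);
    let (a, b) = join(|| sum_of_squares(left), || sum_of_squares(right));
    a + b
}
```

The recursive structure is the same one would write with rayon::join(); the difference is in the cost of each fork, which Rayon keeps low by reusing its pool threads.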

Fork-join elegantly maps to divide-and-conquer algorithms, but Rayon is best known and loved for its other abstraction built on top of fork-join, the parallel iterators. They enrich the standard library collections with a par_iter() method that returns a ParallelIterator, an object that closely resembles Iterator but which, when exhausted, executes closures passed to methods like map() and fold() in parallel. A simple use rayon::prelude::* enables this functionality, adding the par_iter() method to containers like Vec<T> or &[T].

Invoked on a slice, as in the sum_of_squares example, par_iter() divides it up into smaller chunks which it processes in parallel using rayon::join() behind the scenes. Methods like sum() and the more general fold() and reduce() aggregate the output, also in parallel, eventually returning a single value. A consequence of the semantics of ParallelIterator is that you cannot just use it in a regular for loop, you have to exhaust it using methods it provides such as the already mentioned sum(), fold(), and reduce(), but also for_each() and others. The closures passed to the methods that accept them must be callable from multiple threads, which the compiler will diligently check, as we’ll see below.

An excellent introduction to Rayon is provided by its author in the classic blog post which you should definitely check out.

Stream processing

An aspect of parallelism often overlooked when presenting Rayon is processing of streaming data. When processing a stream we don’t have a handy container to divide up, all we have are items arriving one at a time with no idea how many of them there will be in total. This makes some parallel processing techniques inaccessible, so we have to do things slightly differently.

As an example, let’s process a stream of JSON-encoded values in the one-record-per-line jsonl format. We will calculate the average of the value field in each record, with the sequential code looking like this:

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

Clear enough – iterate over input lines, drop the lines we can’t deserialize into a JSON object, remove objects without a value field or those whose value is not a number, sum them up, and divide by the count of valid records. The function is generic over the type of its input, so we can call it with any input that implements io::Read, such as a File, the standard input handle returned by io::stdin().lock(), or even a &[u8]:

fn main() {
    let json = r#"{"value": 1}
{"value": 15.3, "foo": "bar"}
{}
{"value": 100}"#
        .as_bytes();
    assert_eq!(avg_values_jsonl(json), 116.3 / 3.);
}

To make avg_values_jsonl parallel we can’t just add par_iter() into the mix like we did for sum_of_squares(). par_iter() is only defined for concrete container types because it relies on knowing the number of items in a container and being able to divide them into subparts to operate on, which doesn’t apply to generic iterators.

However, Rayon provides a different mechanism specifically for the streaming use case: a par_bridge() method that the ParallelBridge trait adds to ordinary iterators. The method adapts an ordinary iterator into a parallel one, employing clever design to take individual items from the iterator and balance them between Rayon’s threads. Most importantly, the resulting adapter implements ParallelIterator, which means you can use it exactly as you would use the value returned by par_iter() on a collection.

Let’s try to insert par_bridge() into the iteration chain:

use rayon::prelude::*;

use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read) -> f64 {
    let input = BufReader::new(input);
    let mut cnt = 0usize;
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()  // this is new
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt += 1;
            value.as_f64()
        })
        .sum();
    total / cnt as f64
}

Unsurprisingly, our first attempt fails to compile:

   Compiling playground v0.1.0 (/home/hniksic/work/playground)
error[E0599]: no method named `par_bridge` found for struct `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>` in the current scope
   --> src/main.rs:12:10
    |
12  |           .par_bridge()
    |            ^^^^^^^^^^ method not found in `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>`
    |
    = note: the method `par_bridge` exists but the following trait bounds were not satisfied:
            `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::iter::Iterator`
            which is required by `&std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`
            `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: std::marker::Send`
            which is required by `&mut std::iter::Map<std::io::Lines<std::io::BufReader<impl Read>>, fn(std::result::Result<std::string::String, std::io::Error>) -> std::string::String {std::result::Result::<std::string::String, std::io::Error>::unwrap}>: rayon::iter::par_bridge::ParallelBridge`

Charming, isn’t it? (But still not as bad as Tokio.)

The error message is inscrutable because the iterator on which we’re attempting to invoke par_bridge() has a complex generic type, coming from map() wrapping lines() wrapping BufReader wrapping the generic type of input. The actual problem is explained in the “note” which says that “the method par_bridge exists but the following trait bounds were not satisfied: <gibberish>: std::marker::Send”. The iterator returned by input.lines() doesn’t implement Send because it contains the value moved from input, whose type is only known to implement the Read trait. Without Send, Rayon doesn’t have permission to send the iterator to another thread, which it might need to do. If the function were allowed to compile as written, calling it with an input which is not Send, perhaps because it contains Rc<_> or another non-Send type, would open the door to data races and crashes. Luckily for us, rustc prevents this and rejects the code due to the missing bound, even if the error message could be a bit smoother.
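The role of Send is easier to see in a smaller setting. In the sketch below, on_worker is a hypothetical helper (not part of Rayon or the standard library) that moves a value to a scoped thread and runs a closure on it there. It accepts an Arc<String> because Arc counts references atomically, while the variant mentioned in the comment is rejected by the compiler for the same reason our iterator was.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: moves `value` to another thread, runs `f` on it
/// there, and returns the result. The `Send` bounds are what gives us
/// permission to move the value and the closure across threads.
fn on_worker<T, R>(value: T, f: impl FnOnce(T) -> R + Send) -> R
where
    T: Send,
    R: Send,
{
    thread::scope(|s| s.spawn(move || f(value)).join().unwrap())
}

fn shared_len() -> usize {
    let text = Arc::new(String::from("hello"));
    let for_worker = Arc::clone(&text);
    // Arc<String> is Send, so this compiles.
    let len = on_worker(for_worker, |t| t.len());
    // Using std::rc::Rc instead of Arc above would not compile, because
    // Rc<String> is not Send: its reference count is not thread-safe.
    len + text.len()
}
```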

Once we understand the problem, the fix is simple: add the Send trait bound in addition to Read, declaring input as input: impl Read + Send. With that change we get a different compile error:

error[E0594]: cannot assign to `cnt`, as it is a captured variable in a `Fn` closure
  --> src/main.rs:17:13
   |
17 |             cnt += 1;
   |             ^^^^^^^^ cannot assign

The problem here is that the closure mutates shared state, the cnt counter. That requires the closure to capture cnt by unique (mutable) reference, which makes it an FnMut closure. This was perfectly fine in single-threaded code, but Rayon plans to call the closure from multiple threads, so it requests an Fn closure. The compiler rejects the assignment in an Fn closure and saves us from the embarrassment of a data race. Nice!

Neither of these issues is specific to Rayon; we would encounter the exact same errors if we tried to pass the closure to multiple threads using other means. We can fix the assignment by switching cnt to an AtomicUsize, which can be safely modified through a shared reference:

use rayon::prelude::*;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::io::{BufRead, BufReader, Read};
use serde_json::{Map, Value};

fn avg_values_jsonl(input: impl Read + Send) -> f64 {
    let input = BufReader::new(input);

    let cnt = AtomicUsize::new(0);
    let total: f64 = input
        .lines()
        .map(Result::unwrap)
        .par_bridge()
        .filter_map(|line| serde_json::from_str(&line).ok())
        .filter_map(|obj: Map<String, Value>| obj.get("value").cloned())
        .filter_map(|value| {
            cnt.fetch_add(1, Ordering::Relaxed);
            value.as_f64()
        })
        .sum();
    total / cnt.into_inner() as f64
}

And this compiles!
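Since the two errors are not specific to Rayon, the same fix can be demonstrated with plain standard-library threads. The sketch below uses a hypothetical helper, for_each_threaded, which requires an Fn + Sync closure just like Rayon's adapters do, and a counter that is updated through a shared reference. The names and the chunking strategy are assumptions made for the example.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: calls `f` on every item, spreading the calls
/// over up to `nthreads` scoped threads. Because several threads call
/// `f` at once, it must be `Fn` (not `FnMut`) and `Sync`.
fn for_each_threaded<F>(items: &[u64], nthreads: usize, f: F)
where
    F: Fn(u64) + Sync,
{
    let nthreads = nthreads.max(1);
    let chunk_len = ((items.len() + nthreads - 1) / nthreads).max(1);
    thread::scope(|s| {
        for chunk in items.chunks(chunk_len) {
            let f = &f; // every thread gets a shared reference to the closure
            s.spawn(move || chunk.iter().for_each(|&x| f(x)));
        }
    });
}

fn count_even(items: &[u64]) -> usize {
    // A plain `let mut cnt = 0` incremented in the closure would make the
    // closure FnMut and fail to compile, as in the Rayon version.
    let cnt = AtomicUsize::new(0);
    for_each_threaded(items, 4, |x| {
        if x % 2 == 0 {
            cnt.fetch_add(1, Ordering::Relaxed);
        }
    });
    cnt.into_inner()
}
```

Relaxed ordering suffices here because the final value is read only after all threads have been joined.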

After verifying that the parallel code produces correct results, we can proceed to compare its performance with that of the sequential version. One way is to modify the main() function to produce much more data and time the result:

fn main() {
    let json: String = (0..1_000_000).map(|i| format!(r#"{{"foo": "bar", "value": {}}}
"#, i)).collect();
    let t0 = std::time::Instant::now();
    assert_eq!(avg_values_jsonl(json.as_bytes()), 499999.5);
    println!("time: {}", t0.elapsed().as_secs_f64());
}

The above rather unscientific benchmark run on an old four-core desktop completes the sequential version in 0.51s and the Rayon version in 0.25s, showing a 2x speedup from parallel execution. Let’s see what happens when we add more keys to each JSON line to emulate a heavier workload per record:

fn main() {
    let some_keys = r#""foo1": "bar", "foo2": "bar", "foo3": "bar", "foo4": "bar", "foo5": "bar", "foo6": "bar", "foo7": "bar", "foo8": "bar", "foo9": "bar", "foo10": "bar""#;
    let json: String = (0..1_000_000).map(|i| format!(r#"{{{}, "value": {}}}
"#, some_keys, i)).collect();
    let t0 = std::time::Instant::now();
    assert_eq!(avg_values_jsonl(json.as_bytes()), 499999.5);
    println!("time: {}", t0.elapsed().as_secs_f64());
}

In this variant the sequential version takes 2.6s and the Rayon version 0.81s, a 3.2x speedup. As expected, making individual tasks more substantial increases the benefit of multiple cores.

Preserving order

The previous example summarizes the entire stream into a single number. But stream processing often needs to map input records into output records, possibly filtering them, but without upsetting their order. The classic Unix streaming commands like grep, cut or sed work exactly like that.

At first parallel processing and maintaining record order appear at odds with each other because parallel execution inherently removes ordering. But if only the transformations are run in parallel, it should be possible to associate an index with the input records as they are read, and use those indexes to reestablish the original order on output. Though perhaps not as straightforward as processing slices, processing streaming data in parallel seems perfectly feasible, even though none of the classic Unix streaming tools do it. (To be fair, that is because they predate today’s threading by decades, and also because they rely on parallelization on the pipeline level – piping grep to sed does run both commands in parallel.)
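The index-tagging idea can be sketched with standard-library threads and a channel. In the hypothetical transform_in_order below, each worker tags its output with the index of the input record, and the caller sorts by that index before returning. The uppercase transformation, the round-robin assignment of records to workers, and all names are placeholders chosen for this illustration. For simplicity it takes the whole input as a vector, which real streaming code would not do.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical sketch: transforms lines on several threads and restores
/// the input order using the index attached to each record.
fn transform_in_order(lines: Vec<String>, nthreads: usize) -> Vec<String> {
    let nthreads = nthreads.max(1);
    let (tx, rx) = mpsc::channel::<(usize, String)>();
    thread::scope(|s| {
        for t in 0..nthreads {
            let tx = tx.clone();
            let lines = &lines;
            s.spawn(move || {
                // Worker t handles records t, t + nthreads, t + 2 * nthreads, ...
                for (idx, line) in lines.iter().enumerate().skip(t).step_by(nthreads) {
                    tx.send((idx, line.to_uppercase())).unwrap();
                }
            });
        }
    });
    // All workers are done; drop the last sender so the receiver terminates.
    drop(tx);
    let mut tagged: Vec<(usize, String)> = rx.into_iter().collect();
    // Results arrive in arbitrary order; the index restores the original one.
    tagged.sort_by_key(|(idx, _)| *idx);
    tagged.into_iter().map(|(_, out)| out).collect()
}
```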

To experiment with order-preserving streaming, let’s transform JSONL containing arrays into a simple tab-delimited CSV:

use serde_json::Value;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

fn json_to_csv(input: impl Read, output: impl Write) -> io::Result<()> {
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    for line in input.lines() {
        let line = line?;
        let rec: Vec<Value> = match serde_json::from_str(&line) {
            Ok(rec) => rec,
            Err(_) => continue, // should log the error...
        };
        for (idx, val) in rec.iter().enumerate() {
            output.write_all(val.to_string().as_bytes())?;
            if idx != rec.len() - 1 {
                output.write_all(b"\t")?;
            }
        }
        output.write_all(b"\n")?;
    }
    Ok(())
}

fn main() {
    let json = r#"[1, 2, 3]
["foo", "bar", "baz"]
"#;
    let mut output = vec![];
    json_to_csv(json.as_bytes(), &mut output).unwrap();
    let output = std::str::from_utf8(&output).unwrap();
    assert_eq!(output, "1\t2\t3\n\"foo\"\t\"bar\"\t\"baz\"\n");
}

Like the previous example, this one also reads the input line by line, deserializing each line into a JSON value. The value is now required to be an array of items, which are written to the output stream as a very naive form of CSV that just uses a tab as a separator.

There are two obstacles to running this in parallel using Rayon. The first, minor one, is that the code is structured as a for loop which makes copious use of the ? operator to bail out in case of IO error. Parallel execution doesn’t like for loops, so we’ll need to find a way to rewrite it using for_each, while still handling errors. The bigger obstacle is that you cannot just write the output from inside closures invoked by Rayon because they’ll be called in parallel, so their outputs will intermingle. In addition to that, the par_bridge() adapter, successfully used in the previous example, is explicitly documented not to guarantee preserving the order of the original iterator.

Fortunately, there is a way out of both obstacles. Somewhat surprisingly, Rayon preserves original order when items are collected using the collect() method on the parallel iterator. This means that we don’t have to manually associate input records with order and reorder them on output, Rayon will do it for us, as long as we collect the output into a container. While we can’t slurp the whole input into a vector, we can work in batches: collect a fixed number of input lines into a Vec<String> batch. Then we can call par_iter() on the batch to process the lines in parallel, using map() to transform each input line into the corresponding output line. Finally we can collect the batch of output lines into a new vector whose order matches the order of input, and write them out.

Let’s examine the parallel version piece by piece:

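use rayon::prelude::*;
use serde_json::Value;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};

fn json_to_csv(input: impl Read, output: impl Write) -> io::Result<()> {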
    let input = BufReader::new(input);
    let mut output = BufWriter::new(output);
    let mut line_iter = input.lines();

In this version we don’t need to add Send bounds to input (or output) because we’ll do the reading and writing from the thread we’re called from. We do need to extract the input iterator into a separate variable because we’ll pull the lines manually:

    loop {
        let mut batch = vec![];
        for _ in 0..16384 {
            if let Some(line) = line_iter.next() {
                batch.push(line?);
            } else {
                break;
            }
        }
        if batch.is_empty() {
            break;
        }

We obtain a batch of input lines which we’ll process in parallel. Increasing the batch size generally improves performance, but past the size of several thousand lines, the returns begin to diminish and the memory use and latency start to become noticeable. Since the batch is collected in a single thread, we can also keep using the ? operator to bail out of the function in case of IO error.
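As an aside, the batch-filling loop can also be written with iterator adapters. The sketch below is a possible alternative, not the code used in this article: next_batch is a hypothetical helper that takes up to size lines from the iterator and stops at the first IO error, relying on the fact that collecting into io::Result<Vec<_>> short-circuits on Err.

```rust
use std::io;

/// Hypothetical helper: pulls up to `size` lines from `lines`.
/// Returns an empty vector once the input is exhausted, and the first
/// IO error encountered, if any.
fn next_batch(
    lines: &mut impl Iterator<Item = io::Result<String>>,
    size: usize,
) -> io::Result<Vec<String>> {
    // by_ref() borrows the iterator, so the lines we don't take stay
    // available for the next call.
    lines.by_ref().take(size).collect()
}
```

With such a helper the outer loop would call next_batch(&mut line_iter, 16384)? and break when the returned batch is empty.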

        let output_lines: Vec<_> = batch
            .par_iter()
            .map(|line| {
                let mut outline: Vec<u8> = vec![];
                let rec: Vec<Value> = match serde_json::from_str(line) {
                    Ok(rec) => rec,
                    Err(_) => return outline,
                };
                for (idx, val) in rec.iter().enumerate() {
                    outline.write_all(val.to_string().as_bytes()).unwrap();
                    if idx != rec.len() - 1 {
                        outline.write_all(b"\t").unwrap();
                    }
                }
                outline.write_all(b"\n").unwrap();
                outline
            })
            .collect();

Here we process the batch in parallel, transforming each input line into an output line. In case of error the output line will be left empty and not affect the output. We could have also used filter_map and returned Option<Vec<u8>> from the closure, but errors are expected to be rare, so we optimize for the common case and return the Vec directly. The closure passed to map() is called from multiple threads in parallel and therefore out of order, but the final collect() magically reassembles them back to the original order. The unwrap() used on all IO calls in the closure is not the cop-out it looks like because we can prove those unwraps will never panic. The output of all the writes is a Vec<u8> writing to which never results in an IO error – the only error that can happen is failing to grow the vector, and that will just abort the process.
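The formatting step is easy to try out in isolation. The following sketch pulls it into a hypothetical tab_join helper that works on plain string slices instead of JSON values, an assumption made to keep the example free of dependencies. The unwrap() calls are the same kind as in the closure above: the destination is a Vec<u8>, so the writes never return an IO error.

```rust
use std::io::Write;

/// Hypothetical helper: joins fields with tabs and terminates the
/// line with a newline, writing into an in-memory buffer.
fn tab_join(fields: &[&str]) -> Vec<u8> {
    let mut outline: Vec<u8> = Vec::new();
    for (idx, field) in fields.iter().enumerate() {
        if idx != 0 {
            // Separator goes before every field except the first.
            outline.write_all(b"\t").unwrap();
        }
        outline.write_all(field.as_bytes()).unwrap();
    }
    outline.write_all(b"\n").unwrap();
    outline
}
```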

What remains is to print the output lines:

        for line in output_lines {
            output.write_all(&line)?;
        }
    }
    Ok(())
}

To measure performance we can use the same approach as when benchmarking avg_values_jsonl: create a large input and measure the time it takes to process it.

fn main() {
    let json = r#"[1, 2, 3]
["foo", "bar", "baz"]
"#;
    let mut big = vec![];
    for _ in 0..1_000_000 {
        big.extend(json.as_bytes());
    }
    let mut output = vec![];
    let t0 = std::time::Instant::now();
    json_to_csv(big.as_slice(), &mut output).unwrap();
    println!("{}", t0.elapsed().as_secs_f64());
}

The sequential version finishes in 1.75s and the parallel version in 0.79s, a 2.2x speedup.

There are possible further optimizations to this approach. One is that the batch could be smaller, but consist of groups of lines. The map() closure would receive a group of lines instead of a single line, so the whole group of input lines would transform to a single Vec<u8> with multiple output lines, reducing the number of allocated strings. Another opportunity for optimization is that both input and output are currently performed without processing running during either. As suggested here, one could use rayon::join() to perform the reading of next input batch and the processing/output of the previous batch in parallel.

Based on experience and these experiments, Rayon can be put to good use for stream transformation and aggregation. It is especially straightforward when the order is not important and one can use par_bridge(), otherwise it requires explicit buffering, but still much less work than implementing parallel computation manually. Making the latter use case more convenient is requested in an issue.

Minimalistic blocking bounded queue in C++

While working on speeding up a Python extension written in C++, I needed a queue to distribute work among threads, and possibly to gather their results. What I had in mind was something akin to Python’s queue module – more specifically:

  • a blocking MPMC queue with fixed capacity;
  • that supports push, try_push, pop, and try_pop;
  • that depends only on the C++11 standard library;
  • and has a simple and robust header-only implementation.

Since I could choose the size of the unit of work arbitrarily, the performance of the queue as measured by microbenchmarks wasn’t a priority, as long as it wasn’t abysmal. Likewise, the queue capacity would be fairly small, on the order of the number of cores on the system, so the queue wouldn’t need to be particularly efficient at allocation – while a ring buffer would be optimal for storage, a linked list or deque would work as well. Other non-requirements included:

  • strong exception safety – while certainly an indicator of quality, the work queue did not require it because its payload would consist of smart pointers, either std::unique_ptr<T> or std::shared_ptr<T>, neither of which throws on move/copy.
  • lock-free/wait-free mutation – again, most definitely a plus (so long as the queue can fall back to blocking when necessary), but not a requirement because the workers would spend the majority of time doing actual work.
  • timed versions of blocking operations – useful in general, but I didn’t need them.

Googling C++ bounded blocking MPMC queue gives quite a few results, but surprisingly, most of them don’t really meet the requirements set out at the beginning.

For example, this queue is written by Dmitry Vyukov and endorsed by Martin Thompson, so it’s definitely of the highest quality, and way beyond something I (or most people) would be able to come up with on my own. However, it’s a lock-free queue that doesn’t expose any kinds of blocking operations by design. According to the author, waiting for events such as the queue no longer being empty or full is a concern that should be handled outside the queue. This position has a lot of merit, especially in more complex scenarios where you might need to wait on multiple queues at once, but it complicates the use in many simple cases, such as the work distribution I was implementing. In these use cases a thread communicates with only one queue at a time. When the queue is unavailable due to being empty or full, that is a signal of backpressure and there is nothing else the thread can do but sleep until the situation changes. If done right, this sleep should neither hog the CPU nor introduce unnecessary latency, so it is at least convenient that it be implemented as part of the queue.

The next implementation that looks really promising is Erik Rigtorp’s MPMCQueue. It has a small implementation, supports both blocking and non-blocking variants of enqueue and dequeue operations, and covers inserting elements by move, copy, and emplace. The author claims that it’s battle-tested in several high-profile EA games, as well as in a low-latency trading platform. However, a closer look at push() and pop() betrays a problem with the blocking operations. For example, pop() contains the following loop:

    while (turn(tail) * 2 + 1 != slot.turn.load(std::memory_order_acquire))
      ;

Waiting is implemented with a busy loop, so that the waiting thread is not put to sleep when unable to pop, but it instead loops until this is possible. In usage that means that if the producers stall for any reason, such as reading new data from a slow network disk, consumers will angrily pounce on the CPU waiting for new items to appear in the queue. And if the producers are faster than the consumers, then it’s them who will spend CPU to wait for some free space to appear in the queue. In the latter case, the busy-looping producers will actually take the CPU cycles away from the workers doing useful work, prolonging the wait. This is not to say that Erik’s queue is not of high quality, just that it doesn’t fit this use case. I suspect that applications the queue was designed for very rarely invoke the blocking versions of these operations, and when they do, they do so in scenarios where they are confident that the blockage is about to clear up.

Then there are a lot of queue implementations on stackoverflow and various personal blogs that seem like they could be used, but don’t stand up to scrutiny. For example, this queue comes pretty high in search results, definitely supports blocking, and appears to achieve the stated requirements. Compiling it does produce some nasty warnings, though:

warning: operation on 'm_popIndex' may be undefined [-Wsequence-point]

Looking at the source, the assignment m_popIndex = ++m_popIndex % m_size is an infamous case of undefined behavior in C and C++. The author probably thought he found a way to avoid parentheses in m_popIndex = (m_popIndex + 1) % m_size, but C++ doesn’t work like that. The problem is easily fixed, but it leaves a bad overall impression. Another issue is that it requires a semaphore implementation, using boost’s inter-process semaphore by default. The page does provide a replacement semaphore, but the two in combination become unreasonably heavy-weight: the queue contains no less than three mutexes, two condition variables, and two counters. The simple queues implemented with mutexes and condition variables tend to have a single mutex and one or two condition variables, depending on whether the queue is bounded.

It was at this point that it occurred to me that it would actually be more productive to just take a simple and well-tested classic queue and port it to C++ than to review and correct examples found by googling. I started from Python’s queue implementation and ended up with the following:

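#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

template<typename T>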
class queue {
  std::deque<T> content;
  size_t capacity;

  std::mutex mutex;
  std::condition_variable not_empty;
  std::condition_variable not_full;

  queue(const queue &) = delete;
  queue(queue &&) = delete;
  queue &operator = (const queue &) = delete;
  queue &operator = (queue &&) = delete;

 public:
  queue(size_t capacity): capacity(capacity) {}

  void push(T &&item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_full.wait(lk, [this]() { return content.size() < capacity; });
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
  }

  bool try_push(T &&item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      if (content.size() == capacity)
        return false;
      content.push_back(std::move(item));
    }
    not_empty.notify_one();
    return true;
  }

  void pop(T &item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      not_empty.wait(lk, [this]() { return !content.empty(); });
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
  }

  bool try_pop(T &item) {
    {
      std::unique_lock<std::mutex> lk(mutex);
      if (content.empty())
        return false;
      item = std::move(content.front());
      content.pop_front();
    }
    not_full.notify_one();
    return true;
  }
};

Notes on the implementation:

  • push() and try_push() only accept rvalue references, so elements get moved into the queue, and if you have them in a variable, you might need to use queue.push(std::move(el)). This property allows the use of std::unique_ptr<T> as the queue element. If you have copyable types that you want to copy into the queue, you can always use queue.push(T(el)).

  • pop() and try_pop() accept a reference to the item rather than returning the item. This provides the strong exception guarantee – if moving the element from the front of the queue to item throws, the queue remains unchanged. Yes, I know I said I didn’t care for exception guarantees in my use case, but this design is shared by most C++ queues, and the exception guarantee follows from it quite naturally, so it’s a good idea to embrace it. I would have preferred pop() to just return an item, but that would require the C++17 std::optional (or equivalent) for the return type of try_pop(), and I couldn’t use C++17 in the project. The downside of accepting a reference is that to pop an item you must first be able to construct an empty one. Of course, that’s not a problem if you use the smart pointers which default-construct to hold nullptr, but it’s worth mentioning. Again, the same design is used by other C++ queues, so it’s apparently an acceptable choice.

  • The use of mutex and condition variables makes this queue utterly boring to people excited about parallel and lock-free programming. Still, those primitives are implemented very efficiently in modern C++, without the need to use system calls in the non-contended case. In benchmarks against fancier queues I was never able to measure the difference on the workload of the application I was using.
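For readers more at home in Rust, the same single-mutex, two-condition-variable design can be sketched as follows. This is an illustrative port written for this comparison, not the code used in the project: the BoundedQueue name and the choice to return values (Option<T> from try_pop, Result<(), T> from try_push) instead of filling a reference are assumptions that come naturally in Rust.

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

/// Illustrative port of the blocking bounded queue.
pub struct BoundedQueue<T> {
    content: Mutex<VecDeque<T>>,
    capacity: usize,
    not_empty: Condvar,
    not_full: Condvar,
}

impl<T> BoundedQueue<T> {
    pub fn new(capacity: usize) -> Self {
        BoundedQueue {
            content: Mutex::new(VecDeque::new()),
            capacity,
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
        }
    }

    /// Blocks while the queue is full.
    pub fn push(&self, item: T) {
        let guard = self.content.lock().unwrap();
        let mut guard = self
            .not_full
            .wait_while(guard, |q| q.len() >= self.capacity)
            .unwrap();
        guard.push_back(item);
        drop(guard); // release the lock before waking a consumer
        self.not_empty.notify_one();
    }

    /// Returns the item back to the caller if the queue is full.
    pub fn try_push(&self, item: T) -> Result<(), T> {
        let mut guard = self.content.lock().unwrap();
        if guard.len() >= self.capacity {
            return Err(item);
        }
        guard.push_back(item);
        drop(guard);
        self.not_empty.notify_one();
        Ok(())
    }

    /// Blocks while the queue is empty.
    pub fn pop(&self) -> T {
        let guard = self.content.lock().unwrap();
        let mut guard = self.not_empty.wait_while(guard, |q| q.is_empty()).unwrap();
        let item = guard.pop_front().expect("queue is non-empty after wait");
        drop(guard);
        self.not_full.notify_one();
        item
    }

    /// Returns None if the queue is empty.
    pub fn try_pop(&self) -> Option<T> {
        // The lock guard is a temporary dropped at the end of this statement.
        let item = self.content.lock().unwrap().pop_front()?;
        self.not_full.notify_one();
        Some(item)
    }
}
```

As in the C++ version, the lock is released before notifying, and each operation wakes at most one thread waiting on the opposite condition.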

Is this queue as battle-tested as the ones by Mr Vyukov and Mr Rigtorp? Certainly not! But it does work fine in production, it has passed code review by in-house experts, and most importantly, the code is easy to follow and understand.

The remaining question is – is this a classic example of NIH? Is there really no other way than to implement your own queue? How do others do it? To shed some light on this, I invite the reader to read Dmitry Vyukov’s classification of queues. In short, he categorizes queues across several dimensions: by whether they support multiple producers, consumers, or both, by the underlying data structure, by maximum size, overflow behavior, support for priorities, ordering guarantees, and many many others! Differences in these choices vastly affect the implementation, and there is no one class that fits into all use cases. If you have needs for extremely low latency, definitely look into the lock-free queues like the ones from Vyukov or Rigtorp. If your needs are matched by the “classic” blocking queues, then a simple implementation like the one shown in this article might be the right choice. I would still prefer to have one good implementation written by experts that tries to cover the middle ground, for example a C++ equivalent of Rust’s crossbeam channels. But that kind of thing doesn’t appear to exist for C++ yet, at least not as a free-standing class.

EDIT
As pointed out on reddit, the last paragraph is not completely correct. There is a queue implementation that satisfies the requirements and has most of the crossbeam channels functionality: the moodycamel queue. I did stumble on that queue when originally researching the queues, but missed it when writing the article later, since it didn’t come up on the first page of results when googling for C++ bounded blocking MPMC queue. This queue was too complex to drop into the project I was working on, since its size would have dwarfed the rest of the code. But if size doesn’t scare you and you’re looking for a full-featured high quality queue implementation, do consider that one.

Control your magnetic tapes

Today I made a typo in the shell: instead of m trc (m being my alias for less and trc the name of some trace file), I accidentally typed mt rc. To my surprise, the system didn’t respond with zsh: command not found: mt. It responded with this message:

mt: invalid argument ‘rc’ for ‘operation’
Valid arguments are:
  - ‘eof’, ‘weof’
  - ‘fsf’
  - ‘bsf’
  - ‘fsr’
  - ‘bsr’
  - ‘rewind’
  - ‘offline’, ‘rewoffl’, ‘eject’
  - ‘status’
  - ‘bsfm’
  - ‘eom’
  - ‘retension’
  - ‘erase’
  - ‘asf’
  - ‘fsfm’
  - ‘seek’

Say, what? I’ve been using Unix-like systems for REDACTED years, but I don’t remember ever having used an mt command. And so I checked the fine manual:

MT(1)                              GNU CPIO                              MT(1)

NAME
       mt - control magnetic tape drive operation

SYNOPSIS
       mt [-V] [-f device] [--file=device] [--rsh-command=command] [--version]
       operation [count]

DESCRIPTION
       This manual page documents the GNU version  of  mt.   mt  performs  the
       given operation, which must be one of the tape operations listed below,
       on a tape drive.

       The  default  tape  device  to  operate  on  is  taken  from  the  file
       /usr/include/sys/mtio.h  when  mt is compiled.  It can be overridden by
       giving a device file name in the environment variable TAPE or by a com‐
       mand  line  option  (see  below),  which also overrides the environment
       variable.
...

Now I admit that I am not too keen to upgrade my Ubuntu, but it’s not that ancient, either. It turns out that an Ubuntu distribution released in 2018 actually installs a program to control magnetic tape drive operation! I know, I know, the program is likely useful for other things as well, and tar is technically also short for “tape archiver”, but it still feels a bit silly to carry this cruft into the 21st century.

And people say that Windows is arcane in its support for DOS graphics and x86 addressing modes.

Parallel iteration in Python

Several months ago a StackOverflow user asked an interesting question:

Imagine we have an iterator, say iter(range(1, 1000)). And we have two functions, each accepting an iterator as the only parameter, say sum() and max(). In SQL world we would call them aggregate functions. Is there any way to obtain results of both without buffering the iterator output?

The question went unnoticed for some time, perhaps because it didn’t specify the customary python-asyncio. I found it challenging, so I revived it with an answer that provides solutions using the various async tools available in Python¹. It soon rose to blog-post-length and might be interesting to my readers, so I’m reproducing it here.

¹ Technically not all solutions because it doesn’t cover curio and trio, but they are in this respect close enough to asyncio that the asyncio example can be easily adapted to them.

Two functions, one iterator

Let’s consider how to apply two aggregate functions to the same iterator, which we can only exhaust once. The initial attempt (which hardcodes sum and max for brevity, but is trivially generalizable to an arbitrary number of aggregate functions) might look like this:

def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m

This implementation has the downside that it stores all the generated elements in memory at once, despite both functions being perfectly capable of stream processing. The question anticipates this cop-out and explicitly requests the result to be produced without buffering the iterator output. Is it possible to do this?

Serial execution: itertools.tee

It certainly seems possible. After all, Python iterators are external, so every iterator is already capable of suspending itself. How hard can it be to provide an adapter that splits an iterator into two new iterators that provide the same content? Indeed, this is exactly the description of itertools.tee, which appears perfectly suited to parallel iteration:

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX
    m = max(it2)
    return p, m

The above produces the correct result, but doesn’t work the way we’d like it to. The trouble is that we’re not iterating in parallel. Aggregate functions like sum and max never suspend – each insists on consuming all of the iterator content before producing the result. So sum will exhaust it1 before max has had a chance to run at all. Exhausting elements of it1 while leaving it2 alone will cause those elements to be accumulated inside an internal FIFO shared between the two iterators. That’s unavoidable here – since max(it2) must see the same elements, tee has no choice but to accumulate them. (For more interesting details on tee, refer to this post.)

In other words, there is no difference between this implementation and the first one, except that the first one at least makes the buffering explicit. To eliminate buffering, sum and max must run in parallel, not one after the other.
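The hidden buffering can be made visible with a small experiment. The sketch below wraps the source in a hypothetical counting generator (the names counting and pulled are illustrative, not part of the original answer) and checks how far the source has advanced by the time sum returns:

```python
import itertools

def counting(it, counter):
    # Pass elements through unchanged, recording how many were pulled.
    for elem in it:
        counter[0] += 1
        yield elem

pulled = [0]
it1, it2 = itertools.tee(counting(range(1, 1000), pulled))

total = sum(it1)
# sum() has drained the whole source before max() even starts,
# so tee had to keep every element around for it2.
pulled_after_sum = pulled[0]
largest = max(it2)

print(pulled_after_sum, total, largest)
```

By the time sum finishes, all 999 elements have been pulled from the source, which is only possible if tee is holding them for the second iterator.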

Threads: concurrent.futures

Let’s see what happens if we run the aggregate functions in separate threads, still using tee to duplicate the original iterator:

def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))

    return sum_future.result(), max_future.result()

Now sum and max actually run in parallel (as much as the GIL permits), threads being managed by the excellent concurrent.futures module. It has a fatal flaw, however: for tee not to buffer the data, sum and max must process their items at exactly the same rate. If one is even a little bit faster than the other, they will drift apart, and tee will buffer all intermediate elements. Since there is no way to predict how fast each will run, the amount of buffering is both unpredictable and has the nasty worst case of buffering everything.

To ensure that no buffering occurs, tee must be replaced with a custom generator that buffers nothing and blocks until all the consumers have observed the previous value before proceeding to the next one. As before, each consumer runs in its own thread, but now the calling thread is busy running a producer, a loop that actually iterates over the source iterator and signals that a new value is available. Here is an implementation:

def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = 0  # same as the initial val_id, so the first wait blocks until send() runs
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()

    return sum_future.result(), max_future.result()

This is quite some amount of code for something so simple conceptually, but it is necessary for correct operation.

produce() loops over the outside iterator and sends the items to the consumers, one value at a time. It uses a barrier, a convenient synchronization primitive added in Python 3.2, to wait until all consumers are done with the old value before overwriting it with the new one in next_val. Once the new value is actually ready, a condition is broadcast. consume() is a generator that transmits the produced values as they arrive, until detecting STOP. The code can be generalized to run any number of aggregate functions in parallel by creating consumers in a loop, and adjusting their number when creating the barrier.
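One possible shape of that generalization is sketched here. The name aggregate_threads is hypothetical, and the sketch assumes every aggregate function consumes its iterator to the end. It also starts last_val_id at the same value as val_id, so that a consumer cannot mistake the initial state for a delivered value.

```python
import concurrent.futures
import threading

def aggregate_threads(it, *funcs):
    # Hypothetical generalization: one consumer thread per function.
    STOP = object()
    next_val = None
    val_id = 0
    consumed = threading.Barrier(len(funcs) + 1)  # consumers + producer
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()  # everyone is done with the previous value
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def consume():
        last_val_id = 0  # equal to the initial val_id
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
            if next_val is STOP:
                return
            yield next_val
            last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(len(funcs)) as executor:
        futures = [executor.submit(fn, consume()) for fn in funcs]
        for elem in it:
            send(elem)
        send(STOP)
    return tuple(f.result() for f in futures)

results = aggregate_threads(iter(range(1, 100)), sum, max, min)
print(results)
```

The only structural changes from the two-consumer version are the barrier size and the list of futures; the synchronization protocol itself is untouched.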

The downside of this implementation is that it requires creation of threads (possibly alleviated by making the thread pool global) and a lot of very careful synchronization at each iteration pass. This synchronization destroys performance – this version is almost 2000 times slower than the single-threaded tee, and 475 times slower than the simple but non-deterministic threaded version.

Still, as long as threads are used, there is no avoiding synchronization in some form. To completely eliminate synchronization, we must abandon threads and switch to cooperative multi-tasking. The question is: is it possible to suspend execution of ordinary synchronous functions like sum and max in order to switch between them?

Fibers: greenlet

It turns out that the greenlet third-party extension module enables exactly that. Greenlets are an implementation of fibers, lightweight micro-threads that switch between each other explicitly. This is sort of like Python generators, which use yield to suspend, except greenlets offer a much more flexible suspension mechanism, allowing one to choose who to suspend to.

This makes it fairly easy to port the threaded version of max_and_sum to greenlets:

def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()

    return sum_result[0], max_result[0]

The logic is the same, but with less code. As before, produce produces values retrieved from the source iterator, but its send doesn’t bother with synchronization, as it doesn’t need to when everything is single-threaded. Instead, it explicitly switches to every consumer in turn to do its thing, with the consumer dutifully switching right back. After going through all consumers, the producer is ready for the next iteration pass.

Results are retrieved using an intermediate single-element list because greenlet doesn’t provide access to the return value of the target function (and neither does threading.Thread, which is why we opted for concurrent.futures above).

There are downsides to using greenlets, though. First, they don’t come with the standard library; you need to install the greenlet extension. Then, greenlet is inherently non-portable because the stack-switching code is not supported by the OS and the compiler and can be considered somewhat of a hack (although an extremely clever one). A Python targeting WebAssembly or JVM or GraalVM would be very unlikely to support greenlet. This is not a pressing issue, but it’s definitely something to keep in mind for the long haul.

Coroutines: asyncio

As of Python 3.5, Python provides native coroutines. Unlike greenlets, and similar to generators, coroutines are distinct from regular functions and must be defined using async def. Coroutines can’t be easily executed from synchronous code, they must instead be processed by a scheduler which drives them to completion. The scheduler is also known as an event loop because its other job is to receive IO events and pass them to appropriate callbacks and coroutines. In the standard library, this is the role of the asyncio module.

Before implementing an asyncio-based max_and_sum, we must first resolve a hurdle. Unlike greenlet, asyncio is only able to suspend execution of coroutines, not of arbitrary functions. So we need to replace sum and max with coroutines that do essentially the same thing. This is as simple as implementing them in the obvious way, only replacing for with async for, enabling the async iterator to suspend the coroutine while waiting for the next value to arrive:

async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
#    return accumulate(it, initializer=0)
#def amax(it):
#    return accumulate(it, max)

One could reasonably ask if providing a new pair of aggregate functions is cheating; after all, the previous solutions were careful to use existing sum and max built-ins. The answer will depend on the exact interpretation of the question, but I would argue that the new functions are allowed because they are in no way specific to the task at hand. They do the exact same thing the built-ins do, but consuming async iterators. I suspect that the only reason such functions don’t already exist somewhere in the standard library is that coroutines and async iterators are a relatively new feature.

With that out of the way, we can proceed to write max_and_sum as a coroutine:

async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()

    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
                                   produce())
    return s, m

Although this version is based on switching between coroutines inside a single thread, just like the one using greenlet, it looks different. asyncio doesn’t provide explicit switching of coroutines, it bases task switching on the await suspension/resumption primitive. The target of await can be another coroutine, but also an abstract “future”, a value placeholder which will be filled in later by some other coroutine. Once the awaited value becomes available, the event loop automatically resumes execution of the coroutine, with the await expression evaluating to the provided value. So instead of produce switching to consumers, it suspends itself by awaiting a future that will arrive once all the consumers have observed the produced value.

consume() is an asynchronous generator, which is like an ordinary generator, except it creates an async iterator, which our aggregate coroutines are already prepared to accept by using async for. An async iterator’s equivalent of __next__ is called __anext__ and is a coroutine, allowing the coroutine that exhausts the async iterator to suspend while waiting for the new value to arrive. When a running async generator suspends on an await, that is observed by async for as a suspension of the implicit __anext__ invocation. consume() does exactly that when it waits for the values provided by produce and, as they become available, transmits them to aggregate coroutines like asum and amax. Waiting is realized using the next_val future, which carries the next element from it. Awaiting that future inside consume() suspends the async generator, and with it the aggregate coroutine.
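To make the protocol concrete, here is a minimal hand-written async iterator, without the async generator syntax. Countdown and collect are hypothetical names used only for illustration; the point is that __anext__ is a coroutine that may suspend before returning the next value, and signals exhaustion with StopAsyncIteration.

```python
import asyncio

class Countdown:
    # A hand-written async iterator yielding n, n-1, ..., 1.
    def __init__(self, n):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n == 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # a suspension point inside __anext__
        self.n -= 1
        return self.n + 1

async def collect(ait):
    # async for awaits __anext__ behind the scenes
    return [elem async for elem in ait]

values = asyncio.run(collect(Countdown(3)))
print(values)
```

An async generator such as consume() produces an object with the same two methods, with the generator body supplying the logic of __anext__.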

The advantage of this approach compared to greenlets’ explicit switching is that it makes it much easier to combine coroutines that don’t know of each other into the same event loop. For example, one could have two instances of max_and_sum running in parallel (in the same thread), or run a more complex aggregate function that invoked further async code to do calculations.
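As a rough illustration of that point, the sketch below runs two independent instances of max_and_sum_asyncio in one event loop using asyncio.gather. It repeats the functions from this section so that it runs on its own, with one substitution that is mine rather than the original answer's: asyncio.get_running_loop() in place of get_event_loop(). The main coroutine is a hypothetical wrapper.

```python
import asyncio

async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

async def max_and_sum_asyncio(it):
    loop = asyncio.get_running_loop()  # substitution for get_event_loop()
    STOP = object()
    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                # last consumer: release the others and reset the futures
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()), produce())
    return s, m

async def main():
    # Two instances that know nothing of each other share one event loop.
    return await asyncio.gather(
        max_and_sum_asyncio(iter(range(1, 11))),
        max_and_sum_asyncio(iter(range(1, 101))),
    )

results = asyncio.run(main())
print(results)
```

Each instance keeps its own futures and counter in its closure, so the event loop can interleave them without any extra coordination.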

The following convenience function shows how to run the above from non-asyncio code:

def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)

Performance

Measuring and comparing performance of these approaches to parallel execution can be misleading because sum and max do almost no processing, which over-stresses the overhead of parallelization. Treat these as you would treat any microbenchmarks, with a large grain of salt. Having said that, let’s look at the numbers anyway!

Measurements were produced using Python 3.6. The functions were run only once and given range(10000), their time measured by subtracting time.time() before and after the execution. Here are the results:

  • max_and_sum_buffer and max_and_sum_tee: 0.66 ms – almost exact same time for both, with the tee version being a bit faster.
  • max_and_sum_threads_simple: 2.7 ms. This timing means very little because of non-deterministic buffering, so this might be measuring the time to start two threads and the synchronization internally performed by Python.
  • max_and_sum_threads: 1.29 seconds, by far the slowest option, ~2000 times slower than the fastest one. This horrible result is likely caused by a combination of the multiple synchronizations performed at each step of the iteration and their interaction with the GIL.
  • max_and_sum_greenlet: 25.5 ms, slow compared to the initial version, but much faster than the threaded version. With a sufficiently complex aggregate function, one can imagine using this version in production.
  • max_and_sum_asyncio: 351 ms, almost 14 times slower than the greenlet version. This is a disappointing result because asyncio coroutines are more lightweight than greenlets, and switching between them should be much faster than switching between fibers. It is likely that the overhead of running the coroutine scheduler and the event loop (which in this case is overkill given that the code does no IO) is destroying the performance on this micro-benchmark.
  • max_and_sum_asyncio using uvloop: 125 ms. This is more than twice the speed of regular asyncio, but still almost 5x slower than greenlet.
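The measurement method described above amounts to something like the following sketch. The helper name time_once is hypothetical, and only the tee variant is timed here to keep the example short. Absolute numbers will differ between machines and Python versions.

```python
import itertools
import time

def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    return sum(it1), max(it2)

def time_once(fn, n=10000):
    # Single run, wall-clock time taken before and after the call.
    t0 = time.time()
    result = fn(iter(range(n)))
    elapsed = time.time() - t0
    return result, elapsed

result, elapsed = time_once(max_and_sum_tee)
print(f"{result}: {elapsed * 1000:.2f} ms")
```

A single run with time.time() is noisy; repeating the call and taking the minimum would give steadier numbers, at the cost of departing from how the figures above were obtained.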

Running the examples under PyPy doesn’t bring significant speedup, in fact most of the examples run slightly slower, even after running them several times to ensure JIT warmup. The asyncio function requires a rewrite not to use async generators (since PyPy as of this writing implements Python 3.5), and executes in somewhat under 100ms. This is comparable to CPython+uvloop performance, i.e. better than plain asyncio, but still well behind greenlet.

Explicit continuations with Python coroutines

async/await and the event loop

Python 3.5 officially introduced asynchronous functions, also known as coroutines, and the await keyword. Based on the yield from extension to generators available since Python 3.3, await allows a seemingly blocking operation to instead suspend the execution of the async function, and be resumed later. The mix of synchronous appearance with asynchronous execution makes it a perfect replacement for callback-based programming, the infamous callback hell. Async/await was added to Python for the asyncio ecosystem, but it soon inspired asyncio alternatives such as curio and trio, and got adopted by old players in the async field. And none of this is unique to Python – async and await were previously present in C# and have since made their way into JavaScript, Scala, and even Rust.

A common theme with async/await coroutines is that their use tends to be centered around an event loop. This is because coroutines, much like generators, and in contrast to ordinary functions, cannot be started and left to complete without assistance. Before it produces a result, a coroutine can suspend itself and expect its caller to resume it later, once some condition is met. Multiple coroutines can run concurrently, using await to turn what would normally be a blocking operation into a cooperative context switch. All this requires coroutines to be driven by a dispatcher with which they coordinate their suspensions. Reasons for suspension will vary, but most of them will ultimately boil down to waiting for data, a resource, or a timeout. Since multiplexing IO and timeout events is the job description of polling event loops, a coroutine scheduler is typically integrated with an event loop.

For example, to execute an asyncio coroutine from synchronous code:

async def greet():
    print('hello...')
    await asyncio.sleep(1)
    print('...world')
    return 42

one must run it in the asyncio event loop:

loop = asyncio.get_event_loop()
result = loop.run_until_complete(greet())  # waits and displays output
assert result == 42

asyncio can of course do much more than run a single coroutine to completion. It can submit a coroutine to the event loop without waiting for it to finish, allowing independent coroutines to run concurrently. It provides combinators such as gather and wait which make it convenient for coroutines to start other coroutines in parallel, using await to get their results when available. It supports queuing ordinary functions to execute alongside coroutines, as well as running chunks of synchronous code in a thread or process pool and awaiting it as if it were a coroutine. Finally, once submitted to the event loop, coroutines can be manipulated like futures, making it easy to hook them to classic async code that relies on callbacks. Taken together, all these provide a very powerful toolbox for real-world asynchronous programming — as long as the code is run inside the asyncio event loop.
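A couple of these facilities are shown in the short sketch below. The functions double, blocking_square and main are made up for the example; gather and run_in_executor are the asyncio calls being illustrated, and asyncio.run (available since Python 3.7) is used as a shorthand for creating the loop and running it to completion.

```python
import asyncio
import time

async def double(x):
    await asyncio.sleep(0.01)  # stands in for real asynchronous work
    return 2 * x

def blocking_square(x):
    time.sleep(0.01)  # ordinary blocking code
    return x * x

async def main():
    loop = asyncio.get_running_loop()
    # run two coroutines concurrently and collect their results in order
    doubled = await asyncio.gather(double(1), double(2))
    # run synchronous code in the default thread pool and await it
    squared = await loop.run_in_executor(None, blocking_square, 3)
    return doubled, squared

results = asyncio.run(main())
print(results)
```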

But what happens if we can’t run the asyncio event loop? Python callbacks sometimes run embedded inside the event loop of a game, in an application using a foreign GUI toolkit, or in a web server on an appliance. Is there a way to use coroutines in such environments, or are we condemned to revert to callback hell?

One option is to implement a subset of the event loop interface on whatever event loop we are running under. This is in principle explicitly allowed by asyncio, whose event loop is specified by PEP 3156. Doing so in practice is still a large undertaking best left to dedicated projects. For example, gbulb, a project that implements the asyncio event loop on top of GLib’s, contains a decent amount of bridge code, and still warns of the various impedance mismatches between GLib and asyncio. Some differences are fundamental, such as GLib allowing recursive looping while asyncio doesn’t, and some are subtle, such as incompatible approaches to multi-threaded execution or platform-specific differences. Implementing the asyncio event loop just to add some coroutines to an existing application is a non-starter.

It turns out that there are ways of executing coroutines that do not require having an event loop.

Coroutines with continuations

Coroutine-like constructs have been around for much longer than Python-style generators. The Scheme language famously pioneered continuations, a concept more powerful and general than coroutines, but also harder to understand and harder yet to efficiently implement. Scheme didn’t have a concept of coroutines, but it allowed any function to capture a continuation, which would allow it to be later resumed at that exact point. The powerful call/cc primitive could be used to implement all kinds of control flow abstractions, not limited to coroutines. In case of coroutines, both suspension and resumption can be expressed in terms of requesting a continuation at the right time, and applying it later to switch to the previous execution point. Scheme continuations could be called any number of times, which posed a burden on their efficient implementation. This is why full-fledged continuations, despite their undeniable power, are almost universally shunned by mainstream language implementors.

The success of Python generators showed that more limited forms of suspendable functions can be efficiently implemented and still be well-accepted by programmers. Where a Scheme continuation is almost as heavy as a thread because it needs to store a snapshot of the entire call stack, a running Python generator only needs to store its local variables, and those of its subgenerators if any.
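The state a suspended generator carries can be inspected directly. In this small sketch (running_total is a made-up example), the suspended generator's frame holds just its own local variables, which is all that is needed to resume it:

```python
def running_total():
    total = 0
    while True:
        received = yield total
        total += received

gen = running_total()
next(gen)      # run up to the first yield
gen.send(5)
gen.send(7)

# The suspended frame retains only the generator's own locals.
saved_locals = dict(gen.gi_frame.f_locals)
print(saved_locals)
```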

The rise of callback-based programming that led to the invention of async/await sparked a renewed interest in coroutines, including those with explicit continuations. In March 2017 Kotlin 1.1 introduced experimental support for just such coroutines. Kotlin coroutines deftly avoid the downsides of full-fledged Scheme-style continuations by instituting the same limitations as those of Python generators: there is a clear boundary between suspending and non-suspending code, and each continuation can only be resumed once.

Kotlin divides functions into suspending functions, statically marked with a suspend modifier, and regular non-suspending ones. To call a suspending function from normal code, one must invoke the createCoroutine primitive or its high-level cousins launch and async. Once inside a suspending function, one can freely call other suspending functions. Unlike in Python, await is not explicit, calling another suspending function automatically awaits it. Most importantly, a suspending function may choose to suspend itself by calling suspendCoroutine, a function without a direct Python equivalent. suspendCoroutine suspends execution, like Python’s yield, but before doing so, generates a continuation object and passes it to the coroutine. The coroutine will be resumed when someone calls the resume method on the continuation object.

This example uses suspendCoroutine to resume the same coroutine in a different thread, after a 1-second delay:

suspend fun greet(): Int {
    println("hello...")
    suspendCoroutine<Unit> { cont ->
        Thread {Thread.sleep(1000); cont.resume(Unit)}.start()
    }
    println("...world")
    return 42
}

It can be invoked from any blocking code by calling the launch function, for example:

fun main(args: Array<String>) {
    launch(Unconfined) {
        greet()
    }
    Thread.sleep(1500)
}

Let’s convert the coroutine to Python syntax for easier analysis:

async def greet():
    print('hello')
    def resume_later(cont):
        t = threading.Timer(1, cont.resume, args=(None,))
        t.start()
    await suspendCoroutine(resume_later)
    print('world')
    return 42

resume_later trivially uses the venerable Timer to spawn a new thread that will call cont.resume(None) a second later. The real magic happens in suspendCoroutine, which:

  • creates a continuation and immediately (before suspending!) invokes resume_later with the continuation;
  • then returns an awaitable object that suspends the coroutine when awaited.

But why is all this a big deal? How is it useful?

Notice how everything is done without relying on the asyncio event loop. In fact, not only doesn’t the code rely on a particular event loop, it doesn’t presuppose the existence of an event loop at all! Using explicit suspend/resume, a coroutine can suspend itself and its callers, previously arranging its resumption. And it can achieve this relying only on the resources at its disposal, without needing anything like a full-fledged PEP 3156 event loop. For example, a version of asyncio.sleep that works within the GTK event loop might look like this:

async def glib_sleep(delay):
    await suspendCoroutine(
        lambda cont: GLib.timeout_add(delay * 1000, cont.resume, None))

GTK animations could use it to sleep between frames without disturbing the rest of the GUI:

async def animate(canvas, shape):
    # animate a shape along a sine curve
    for t in (i / 100 for i in range(0, 100)):
        canvas.draw(shape, WIDTH * t, HEIGHT * math.sin(2 * math.pi * t))
        await glib_sleep(1 / FPS)

Usefulness of explicit suspend is not limited to sleeping; it could be used to await any callback, such as update ticks from the GTK frame clock, or arbitrary widget signals. Widget setup code reacting to multiple events such as realize and map must currently be dispersed across several callbacks. With suspend they could be easily expressed as a single coroutine that awaits each signal it’s interested in.

Use cases extend beyond GUI toolkits. Explicit suspend has the potential to bring benefits of coroutines to any callback-based environment, no matter how minimalistic.

Now that we’re hooked, let’s implement suspendCoroutine.

Continuations in Python

How await works

A Python coroutine, also called an async function, is a thin wrapper around a generator, where yield signals a suspension and return signals coroutine completion. The @types.coroutine decorator can be used to promote an ordinary generator into an awaitable coroutine:

@types.coroutine
def inner():
    yield 1
    yield 2
    return "foo"

Other async functions can now use await inner() to delegate their execution to inner(). But what will await return? Let’s try to drive the coroutine to completion using its send method:

>>> coro = inner()
>>> coro.send(None)
1
>>> coro.send(None)
2
>>> coro.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: foo

So, the caller of send() gets the yielded values, but the return value is buried in the StopIteration instance. And this value is picked up and returned by await:

async def outer():
    print(await inner())
    return "bar"

>>> coro = outer()
>>> coro.send(None)
1
>>> coro.send(None)
2
>>> coro.send(None)
foo
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: bar

As far as outer is concerned, the individual values yielded by inner do not exist, they are directly transferred all the way up to the caller of send. On the other hand, the foo value returned by inner is never seen by send, it is consumed by the await in outer. In its StopIteration instance send() only receives the bar value returned by outer. await can be thought of as syntactic sugar for:

# simplistic expansion of result = await x
while True:
    try:
        susp_value = x.send(None)
    except StopIteration as e:
        result = e.value
        break
    # x has chosen to suspend; propagate the suspension to the caller
    yield susp_value

await is implemented with the same code as yield from, which delegates execution to a sub-generator, which is (as authors of asyncio noticed) conceptually the same thing as awaiting a coroutine. The original asyncio was in fact entirely based on generators and yield from.
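The split between suspension values and the final result can be captured in a small driver. In this sketch, drive is a hypothetical helper that keeps calling send(None), collecting whatever the coroutine chain yields, and returns the value carried by StopIteration; outer is a slight variation that combines the awaited result into its own return value.

```python
import types

@types.coroutine
def inner():
    yield 1
    yield 2
    return "foo"

async def outer():
    seen = await inner()  # receives inner's return value, not its yields
    return seen + "bar"

def drive(coro):
    # Collect every suspension value, then the final return value.
    suspensions = []
    while True:
        try:
            suspensions.append(coro.send(None))
        except StopIteration as e:
            return suspensions, e.value

suspensions, result = drive(outer())
print(suspensions, result)
```

The yielded values 1 and 2 pass straight through outer to the driver, while "foo" is visible only inside outer.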

Suspensions in asyncio

The event loop drives a coroutine by doing something similar to what await does: invoking send() until the coroutine signals that it is finished by raising StopIteration. The difference is that the event loop switches to a different coroutine after each send, and switches to waiting for IO or timeouts when there are no runnable coroutines.
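A toy version of such a loop, with no IO or timeouts at all, fits in a few lines. This is only a sketch of the idea and not how asyncio is implemented; pause, worker and run_all are hypothetical names.

```python
import collections
import types

@types.coroutine
def pause():
    # A bare suspension: hand control back to whoever called send().
    yield

async def worker(name, steps, log):
    for i in range(steps):
        log.append((name, i))
        await pause()
    return name

def run_all(coros):
    # Round-robin: one send() per coroutine, then move on to the next.
    ready = collections.deque(coros)
    finished = []
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration as e:
            finished.append(e.value)
        else:
            ready.append(coro)  # still running, schedule it again
    return finished

log = []
finished = run_all([worker("a", 2, log), worker("b", 2, log)])
print(log, finished)
```

The log shows the two workers interleaving at every suspension, which is the behavior a real event loop adds IO and timer handling to.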

Any coroutine that suspends must be resumed at a later point by the event loop. A naked “suspend me” primitive is not provided by asyncio — every asyncio suspension is paired with the code that arranges for timely resumption. To see how this works, consider a minimal implementation of asyncio.sleep():

@types.coroutine
def simple_sleep(delay):
    me = asyncio.Task.current_task()
    loop.call_later(delay, me.set_result, None)  # call_later takes a relative delay
    yield

At this level, things start to look similar to the Kotlin version. Suspension itself is clearly represented by yield. Prior to suspension, the coroutine arranges for its resumption with loop.call_later. The continuation is apparently represented by the “current task”, the object that owns the coroutine inside the event loop, conveniently provided by asyncio. The task object inherits from asyncio.Future, which means it is resumed with the set_result() method, analogous to cont.resume(). Since the task is tracked by the event loop, it will automatically notice that it has finished and will resume a coroutine that awaits simple_sleep.

Explicit continuation

Now let’s see if we can write a version of simple_sleep that doesn’t depend on asyncio. First, we’ll need a simple driver to invoke the coroutine from synchronous code:

def start(coro):
    try:
        coro.send(None)
        # at this point the coroutine is suspended
    except StopIteration:
        pass  # the coroutine chose not to suspend

This is very different from an event loop. For one, there is no loop! The name start reflects that we don’t really drive the coroutine, we simply set it in motion, and leave it to do the rest.

Calling start on asyncio-style coroutines such as inner and outer won’t produce a useful result – it will simply pick up the first yielded value and exit immediately, discarding the value. This is exactly what we want — our coroutines know how to resume themselves, so there is no need for start to continue executing after the coroutine has been given a chance to run. Also, if we ever hope to invoke start from a callback-based system, both start and the later continuations must be non-blocking, i.e. each of them must only execute the coroutine code until the next suspension point.
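The effect of calling start on such a coroutine can be checked with the inspect module. The sketch below repeats the inner and outer definitions and the start function so that it runs on its own; after start returns, the coroutine is simply left suspended at its first yield.

```python
import inspect
import types

@types.coroutine
def inner():
    yield 1
    yield 2
    return "foo"

async def outer():
    await inner()
    return "bar"

def start(coro):
    try:
        coro.send(None)
    except StopIteration:
        pass

coro = outer()
start(coro)
# The first yielded value was discarded and nobody will resume the coroutine.
state = inspect.getcoroutinestate(coro)
print(state)
coro.close()  # tidy up the abandoned coroutine
```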

Next, we need to change simple_sleep to use threading.Timer to arrange for its continuation, renaming it to thread_sleep:

@types.coroutine
def thread_sleep(delay):
    me = ???
    t = threading.Timer(delay, me.set_result, args=(None,))
    t.start()
    yield

Here we have a little problem. Exactly what callback do we pass to threading.Timer? Creating a Future like asyncio does wouldn’t help because a freshly created future would not be observed by anyone, so its set_result would have no effect. However, start already has exactly the continuation we’re looking for: it’s coro.send(). But we don’t have, and cannot get, access to the running coroutine object stored somewhere on the stack. If only we could somehow tell start to send us the continuation, we’d be set.

One way to do so would be for start to leave the current continuation in a global variable, like asyncio does with the current task. But there is an even neater approach, inspired by Curio, and that is to request the continuation from our non-coroutine caller! Something like this:

@types.coroutine
def thread_sleep(delay):
    cont = yield
    t = threading.Timer(delay, cont, args=(None,))
    t.start()
    yield

Now we have two suspensions, one to request the continuation, and the other to actually suspend. We’ll also change start to handle the first one:

def _resume(coro, value):
    # the pattern of catching StopIteration is frequent enough that a
    # convenience function comes useful
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return  # the coroutine chose not to suspend at all
    # we are at first suspension, the coroutine requested the continuation
    cont = lambda value: _resume(coro, value)
    _resume(coro, cont)
    # we are at second suspension, the coroutine is responsible
    # for resuming itself by invoking its continuation

Let’s see if it works on the greet example:

async def greet():
    print('hello...')
    await thread_sleep(1)
    print('...world')

>>> start(greet())
hello...
>>> ...world

It works! start exited immediately, as expected, but the coroutine arranged for itself to continue. It may look strange to use _resume to send the coroutine a continuation that itself calls _resume, but it does the job. The await construct drives thread_sleep for as long as it suspends, passing each suspension to its caller until it reaches a non-coroutine caller such as start().

Explicit suspension

Of course, we don’t want to write coroutines like thread_sleep using bare generators – we want to have a suspension primitive that is itself awaitable, the equivalent of Kotlin’s suspendCoroutine. Looking at thread_sleep, it is straightforward to generalize it:

@types.coroutine
def suspend(fn):
    cont = yield
    fn(cont)
    cont_retval = yield
    return cont_retval

This works exactly the same as thread_sleep, but leaves it up to the caller to decide what to do with the continuation, by passing it to a function of the caller’s choosing. (This is why Scheme calls its primitive “call with current continuation”.) thread_sleep can now be an actual async function that awaits suspend:

async def thread_sleep(delay):
    def resume_later(cont):
        t = threading.Timer(delay, cont, args=(None,))
        t.start()
        # suspension happens at this point
    await suspend(resume_later)

start(greet()) works just like before. The original greet example translated from Kotlin to Python would now also work, just with suspendCoroutine changed to suspend. In fact, other than in the implementation of suspend, we never again have to use raw generators to implement suspension.
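
Unlike thread_sleep, suspend also returns whatever value was passed to the continuation. The following minimal sketch shows this, reusing the simplified _resume and start from above (not the corocc versions). The ask coroutine, the value 42, and the short timer delay are made up for illustration.

```python
import threading
import types

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return
    # first suspension: hand the continuation to the coroutine
    _resume(coro, lambda value: _resume(coro, value))

@types.coroutine
def suspend(fn):
    cont = yield
    fn(cont)
    return (yield)  # the value later passed to cont

results = []
done = threading.Event()

async def ask():
    def answer_later(cont):
        # hypothetical use: deliver an answer from a timer thread
        threading.Timer(0.05, cont, args=(42,)).start()
    value = await suspend(answer_later)
    results.append(value)
    done.set()

start(ask())
done.wait(timeout=2)  # start() returned immediately, so wait for the timer
print(results)
```

Note that the function passed to suspend must not invoke the continuation synchronously: at that point the coroutine is still running, and sending into a running generator raises ValueError.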

As a final change, note that the suspend API requires defining a temporary function for even very simple uses. This is not very ergonomic in Python, where suspension would be better expressed as an async context manager:

async def thread_sleep(delay):
    async with suspending() as cont:
        t = threading.Timer(delay, cont, args=(None,))
        t.start()
        # suspension happens at this point

suspending (named because it ends up in suspension, analogous to contextlib.closing) works like suspend, only split into two methods:

class suspending:
    __slots__ = ('_cont',)

    @types.coroutine
    def __aenter__(self):
        cont = yield
        self._cont = cont
        return cont

    @types.coroutine
    def __aexit__(self, *_):
        # we cannot return the result, so we leave it in cont.result
        self._cont.result = yield
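
To see how a value passed to the continuation is retrieved with this API, here is a minimal sketch that again reuses the simplified _resume and start from earlier in the article. The ask coroutine and the timer delay are made up for illustration, and the sketch assumes the continuation is a plain function object, which accepts arbitrary attributes such as result.

```python
import threading
import types

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return
    _resume(coro, lambda value: _resume(coro, value))

class suspending:
    __slots__ = ('_cont',)

    @types.coroutine
    def __aenter__(self):
        cont = yield
        self._cont = cont
        return cont

    @types.coroutine
    def __aexit__(self, *_):
        # the value sent by the continuation ends up in cont.result
        self._cont.result = yield

results = []
done = threading.Event()

async def ask():
    async with suspending() as cont:
        threading.Timer(0.05, cont, args=(42,)).start()
    # we only get here after the timer has invoked cont(42)
    results.append(cont.result)
    done.set()

start(ask())
done.wait(timeout=2)
print(results)
```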

corocc

The code presented above is available as the corocc module on GitHub. It includes tests and several usage examples and is published under the MIT license. Although it is more complete than the snippets shown above, it should at this stage be considered experimental.

The goal of corocc is to bring the power of explicit continuations to Python. This should serve to allow coroutines to be used in a wider range of applications than previously possible.

Relation to greenlets

Invoking explicit continuations sounds in principle similar to the switching of greenlets, “green thread” objects provided by the greenlet library. Beyond the superficial similarity, the two do not share the same design. corocc is based on async functions and the await keyword, themselves based on Python generators and yield from. Greenlets are based on a full-featured green-thread design without distinction between blocking and suspending functions, so that any function may decide to switch context. Also, greenlets are organized into a tree hierarchy with exception propagation upward. These features come at the cost of increased weight and also some limitations.

The two differ in behavior with threading. This is an area where continuation-based coroutines can lift the limitations of coroutines that were never inherent to coroutines themselves, but to the event loop design. There is nothing preventing a corocc coroutine from starting execution in one thread and continuing in another, and so on for every continuation. greenlet on the other hand explicitly documents that “It is not possible to mix or switch between greenlets belonging to different threads.” (Of course, switching a coroutine between threads is not possible in asyncio either.)
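
The thread hop can be observed with the simplified start and thread_sleep from earlier in the article (not the corocc versions). In this sketch the hop coroutine is a made-up example that records the thread identifier before and after its suspension.

```python
import threading
import types

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro):
    if not _resume(coro, None):
        return
    _resume(coro, lambda value: _resume(coro, value))

@types.coroutine
def thread_sleep(delay):
    cont = yield
    threading.Timer(delay, cont, args=(None,)).start()
    yield

seen = []  # thread identifiers observed by the coroutine
finished = threading.Event()

async def hop():
    seen.append(threading.get_ident())  # the thread that called start()
    await thread_sleep(0.05)
    seen.append(threading.get_ident())  # the timer thread that resumed us
    finished.set()

start(hop())
finished.wait(timeout=2)
print(seen[0] != seen[1])
```

The two identifiers differ because the second half of hop runs inside the Timer thread that invoked the continuation, while the first half ran in the thread that called start.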

Cancellation

Canceling coroutines and futures from the outside is a standard event loop feature. In the explicit continuation paradigm, canceling a coroutine might prevent its continuation, possibly canceling its timeout if it was arranged through corocc.

Execution context

asyncio comes with functions gather and wait that combine coroutines running in parallel into a single awaitable object. Both of those are available in corocc, implemented on top of the on-done callback provided by the option of start to connect to a Future. However, fully implementing them requires a generic way to schedule a timeout task, the corocc equivalent of asyncio.sleep().

A generic corocc.sleep() would definitely require some knowledge of the execution context, perhaps obtained from a context provided to corocc.start(). Kotlin’s launch supports a context argument with similar semantics. The context would provide primitives like call_soon and call_later that could be easily adapted to new environments.

corocc.sleep() would use corocc.suspending() to retrieve the start context using the same protocol that currently retrieves the continuation. Then it would suspend itself and use context.call_later() to arrange for the continuation to be invoked later, regardless of the current event loop. The difference between this kind of execution and a PEP 3156 event loop is that this mechanism is explicitly designed to hook into existing event loop systems, not to run the show.
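
One possible shape of this design is sketched below. Everything in it is an assumption made for illustration rather than corocc’s actual API: the TimerContext class, the context argument to start, and the choice to attach the context to the continuation as an attribute. For brevity, sleep is written as a raw generator instead of going through suspending.

```python
import threading
import types

class TimerContext:
    """Hypothetical execution context backed by threading.Timer."""
    def call_later(self, delay, fn, *args):
        timer = threading.Timer(delay, fn, args=args)
        timer.start()
        return timer

def _resume(coro, value):
    try:
        coro.send(value)
        return True
    except StopIteration:
        return False

def start(coro, context):
    if not _resume(coro, None):
        return
    def cont(value):
        return _resume(coro, value)
    # assumption: the context travels along with the continuation
    cont.context = context
    _resume(coro, cont)

@types.coroutine
def sleep(delay):
    cont = yield                                # request the continuation
    cont.context.call_later(delay, cont, None)  # schedule our own wake-up
    yield                                       # actually suspend

log = []
finished = threading.Event()

async def greet():
    log.append('hello...')
    await sleep(0.05)
    log.append('...world')
    finished.set()

start(greet(), TimerContext())
finished.wait(timeout=2)
print(log)
```

With this arrangement sleep knows nothing about threads. A different context object, for example one that forwards call_later to an existing event loop, could be passed to start without changing the coroutine code.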

Mixing with… other coroutines?

If corocc coroutines can be integrated into classic event loops, can we fit them into async event loops? Can we await a corocc coroutine from a Curio coroutine and the other way around? This doesn’t look very useful at first because async event loops are natively able to drive coroutines – but it might allow the same coroutine code to work under async and regular event loops!

It is not clear that this can be easily achieved, given the larger liberties provided to explicitly continued code. But with some ground rules and with support in the framework, it might be possible to do it anyway. And when the reward is access to a large body of established and well-tested code, it is well worth a try.

Asyncio reading list

asyncio is the Python library for asynchronous programming and coroutine-based concurrency, bundled with the language since version 3.4. As asynchronous programming and coroutines are currently hotly discussed in the context of different programming languages (e.g. Kotlin, Rust, and JavaScript, to name only a few), I decided to take a look at asyncio. Soon I found it fascinating and before I knew it I started spending time on the asyncio StackOverflow tag.

A StackOverflow participant asked me to recommend resources for learning about asyncio. While the official documentation provides a good reference manual, it doesn’t work as a tutorial – despite the authors’ best intentions, it is hard to follow unless one already understands the underpinnings. Fortunately there are quite a lot of introductory articles about asyncio. As with learning any novel concept, it takes some time to wrap one’s head around it, and one often needs to look at the same thing from many different angles before reaching that a-ha! moment that makes it all worth it. Here is a list of resources that I personally found enlightening.

Basic

  • A guide to asynchronous programming in Python with asyncio – covers the basic concepts. Introduces coroutines, futures, tasks, and event loop with simple examples.
  • Exploring Python 3’s Asyncio by Example – a more detailed introductory text. It shows spawning coroutines in parallel and using aiohttp for real-world tasks. It is written in the older yield from syntax, but don’t be put off by that, just imagine await in place of yield from, as their semantics in asyncio are identical. (Note that generator-based coroutines were deprecated in Python 3.8 and removed from asyncio in 3.11, so on current Python versions the examples need to be ported to async def and await.)

Intermediate

  • AsyncIO for the Working Python Developer – covers the basic stuff, but also the wait flags and timeouts, task cancellation, and exception handling.
  • asyncio PyMOTW article – a very detailed reference for asyncio with examples for every covered concept. Includes examples of interaction between asyncio and synchronous code.

Advanced

  • How the heck does async/await work in Python 3.5? – an in-depth explanation of asyncio beginning with generators and moving on to yield from and await. Ends with a simple event loop implementation.
  • Python Concurrency From the Ground Up – a talk by the inimitable Dave Beazley. During the talk, Dave implements a full-featured event loop with support for coroutine dispatch, timeouts, and IO polling, typing and testing all of it in front of a live audience. A friend likened this talk to the programming equivalent of jazz improvisation combined with stand-up comedy. Although the actual asyncio event loop implementation differs from the one presented there, this talk is a fantastic aid for understanding the dispatch of Python coroutines, and how it relates to the classic “event loop” concept. If you’re curious about coroutine-native event loops as presented in the talk, look up Dave’s curio and Nathaniel J. Smith’s trio libraries.